增加随时可取消的watch监控能力

This commit is contained in:
白茶清欢 2021-11-23 16:31:25 +08:00
parent c3f8e40133
commit 64f7670559
2 changed files with 42 additions and 1 deletions

View File

@ -38,3 +38,6 @@ const (
// WatcherHandler 监听key变化的处理函数 // WatcherHandler 监听key变化的处理函数
type WatcherHandler func(event *clientv3.Event) type WatcherHandler func(event *clientv3.Event)
// CancelWatcherHandler 取消监听后的处理函数
type CancelWatcherHandler func(key string, data interface{})

View File

@ -21,10 +21,48 @@ func WatchKey(ctx context.Context, watchKey string, callbackFunc WatcherHandler)
// 变化之后,没有任何逻辑处理,视为不需要监听变化 // 变化之后,没有任何逻辑处理,视为不需要监听变化
return return
} }
rch := Client.Watch(context.Background(), watchKey) // <-chan WatchResponse if nil == ctx {
ctx = context.Background()
}
rch := Client.Watch(ctx, watchKey) // <-chan WatchResponse
for watchResp := range rch { for watchResp := range rch {
for _, ev := range watchResp.Events { for _, ev := range watchResp.Events {
callbackFunc(ev) callbackFunc(ev)
} }
} }
} }
// WatchKeyWithCancel 可以随时取消的
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 3:16 下午 2021/11/23
func WatchKeyWithCancel(ctx context.Context, watchKey string, callbackFunc WatcherHandler, cancelChan chan interface{}, cancelDealFunc CancelWatcherHandler) {
if nil == callbackFunc {
// 变化之后,没有任何逻辑处理,视为不需要监听变化
return
}
if nil == ctx {
ctx = context.Background()
}
rch := Client.Watch(context.Background(), watchKey) // <-chan WatchResponse
hasFinish := false
for {
select {
case cancelData := <-cancelChan:
if nil != cancelDealFunc {
cancelDealFunc(watchKey, cancelData)
}
hasFinish = true
case watchResp := <-rch:
for _, ev := range watchResp.Events {
callbackFunc(ev)
}
}
if hasFinish {
break
}
}
}