diff --git a/middleware/etcd/define.go b/middleware/etcd/define.go index b2f3d00..5f0feca 100644 --- a/middleware/etcd/define.go +++ b/middleware/etcd/define.go @@ -38,3 +38,6 @@ const ( // WatcherHandler 监听key变化的处理函数 type WatcherHandler func(event *clientv3.Event) + +// CancelWatcherHandler 取消监听后的处理函数 +type CancelWatcherHandler func(key string, data interface{}) diff --git a/middleware/etcd/watch.go b/middleware/etcd/watch.go index c48605a..fd9de85 100644 --- a/middleware/etcd/watch.go +++ b/middleware/etcd/watch.go @@ -21,10 +21,48 @@ func WatchKey(ctx context.Context, watchKey string, callbackFunc WatcherHandler) // 变化之后,没有任何逻辑处理,视为不需要监听变化 return } - rch := Client.Watch(context.Background(), watchKey) // <-chan WatchResponse + if nil == ctx { + ctx = context.Background() + } + rch := Client.Watch(ctx, watchKey) // <-chan WatchResponse for watchResp := range rch { for _, ev := range watchResp.Events { callbackFunc(ev) } } } + +// WatchKeyWithCancel 可以随时取消的 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 3:16 下午 2021/11/23 +func WatchKeyWithCancel(ctx context.Context, watchKey string, callbackFunc WatcherHandler, cancelChan chan interface{}, cancelDealFunc CancelWatcherHandler) { + if nil == callbackFunc { + // 变化之后,没有任何逻辑处理,视为不需要监听变化 + return + } + if nil == ctx { + ctx = context.Background() + } + + rch := Client.Watch(context.Background(), watchKey) // <-chan WatchResponse + + hasFinish := false + for { + select { + case cancelData := <-cancelChan: + if nil != cancelDealFunc { + cancelDealFunc(watchKey, cancelData) + } + hasFinish = true + case watchResp := <-rch: + for _, ev := range watchResp.Events { + callbackFunc(ev) + } + } + if hasFinish { + break + } + } +}