From 64f7670559d1007a3126d46cb588451af1c9b132 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Tue, 23 Nov 2021 16:31:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E9=9A=8F=E6=97=B6=E5=8F=AF?= =?UTF-8?q?=E5=8F=96=E6=B6=88=E7=9A=84watch=E7=9B=91=E6=8E=A7=E8=83=BD?= =?UTF-8?q?=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- middleware/etcd/define.go | 3 +++ middleware/etcd/watch.go | 40 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/middleware/etcd/define.go b/middleware/etcd/define.go index b2f3d00..5f0feca 100644 --- a/middleware/etcd/define.go +++ b/middleware/etcd/define.go @@ -38,3 +38,6 @@ const ( // WatcherHandler 监听key变化的处理函数 type WatcherHandler func(event *clientv3.Event) + +// CancelWatcherHandler 取消监听后的处理函数 +type CancelWatcherHandler func(key string, data interface{}) diff --git a/middleware/etcd/watch.go b/middleware/etcd/watch.go index c48605a..fd9de85 100644 --- a/middleware/etcd/watch.go +++ b/middleware/etcd/watch.go @@ -21,10 +21,48 @@ func WatchKey(ctx context.Context, watchKey string, callbackFunc WatcherHandler) // 变化之后,没有任何逻辑处理,视为不需要监听变化 return } - rch := Client.Watch(context.Background(), watchKey) // <-chan WatchResponse + if nil == ctx { + ctx = context.Background() + } + rch := Client.Watch(ctx, watchKey) // <-chan WatchResponse for watchResp := range rch { for _, ev := range watchResp.Events { callbackFunc(ev) } } } + +// WatchKeyWithCancel 可以随时取消的 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 3:16 下午 2021/11/23 +func WatchKeyWithCancel(ctx context.Context, watchKey string, callbackFunc WatcherHandler, cancelChan chan interface{}, cancelDealFunc CancelWatcherHandler) { + if nil == callbackFunc { + // 变化之后,没有任何逻辑处理,视为不需要监听变化 + return + } + if nil == ctx { + ctx = context.Background() + } + + rch := Client.Watch(context.Background(), watchKey) // <-chan WatchResponse + + hasFinish := false + for { + select { + case cancelData := <-cancelChan: + if nil != cancelDealFunc { + cancelDealFunc(watchKey, cancelData) + } + hasFinish = true + case watchResp := <-rch: + for _, ev := range watchResp.Events { + callbackFunc(ev) + } + } + if hasFinish { + break + } + } +}