From 0b5e55cefdaf9c26f1123e1fc4b4627e6b5ece89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Tue, 23 Nov 2021 16:55:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=9B=91=E5=90=AC=E4=B8=80?= =?UTF-8?q?=E6=AC=A1key=E5=8F=98=E5=8C=96=E7=9A=84=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- middleware/etcd/define.go | 3 +++ middleware/etcd/string_test.go | 45 ++++++++++++++++++++++++++++++++++ middleware/etcd/watch.go | 38 ++++++++++++++++++++++++++++ 3 files changed, 86 insertions(+) diff --git a/middleware/etcd/define.go b/middleware/etcd/define.go index 5f0feca..3b1c3aa 100644 --- a/middleware/etcd/define.go +++ b/middleware/etcd/define.go @@ -41,3 +41,6 @@ type WatcherHandler func(event *clientv3.Event) // CancelWatcherHandler 取消监听后的处理函数 type CancelWatcherHandler func(key string, data interface{}) + +// TimeoutWatcherHandler 超时之后的回调函数 +type TimeoutWatcherHandler func(key string, timeout time.Duration) diff --git a/middleware/etcd/string_test.go b/middleware/etcd/string_test.go index db7bd80..031a7ad 100644 --- a/middleware/etcd/string_test.go +++ b/middleware/etcd/string_test.go @@ -114,3 +114,48 @@ func TestWatchKeyWithCancelByChangeCallback(t *testing.T) { }() WatchKeyWithCancel(nil, key, dealFunc, cancelChan, cancelFunc) } + +// TestWatchKeyOnce ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 4:50 下午 2021/11/23 +func TestWatchKeyOnce(t *testing.T) { + key := "name" + dealFunc := func(data *clientv3.Event) { + fmt.Println(string(data.Kv.Key), string(data.Kv.Value), data.Kv.Version, data.Kv.CreateRevision, data.Kv.ModRevision) + } + + timeoutFunc := func(key string, timeout time.Duration) { + fmt.Println("监听超时", key, timeout) + } + + timeout := 10 * time.Second + go func() { + for i := 0; i < 30; i++ { + _ = Put(nil, key, fmt.Sprintf("test-%d", i), 0) + time.Sleep(time.Second) + } + }() + WatchKeyOnce(nil, key, dealFunc, timeout, timeoutFunc) +} + +// TestWatchKeyOnceForTimeout ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 4:53 下午 2021/11/23 +func TestWatchKeyOnceForTimeout(t *testing.T) { + key := "name" + dealFunc := func(data *clientv3.Event) { + fmt.Println(string(data.Kv.Key), string(data.Kv.Value), data.Kv.Version, data.Kv.CreateRevision, data.Kv.ModRevision) + } + + timeoutFunc := func(key string, timeout time.Duration) { + fmt.Println("监听超时", key, timeout) + } + + timeout := time.Second + + WatchKeyOnce(nil, key, dealFunc, timeout, timeoutFunc) +} diff --git a/middleware/etcd/watch.go b/middleware/etcd/watch.go index fd9de85..4e43dc7 100644 --- a/middleware/etcd/watch.go +++ b/middleware/etcd/watch.go @@ -9,6 +9,8 @@ package etcd import ( "context" + "math" + "time" ) // WatchKey 监听key的变化 @@ -66,3 +68,39 @@ func WatchKeyWithCancel(ctx context.Context, watchKey string, callbackFunc Watch } } } + +// WatchKeyOnce 监听一次key +// +// timeout 若 <= 0 , 则会一直等待当前key发生变化为止 +// +// timeout 若 > 0 , 则在制定时间内为监听到变化, 立即自动取消, +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 4:35 下午 2021/11/23 +func WatchKeyOnce(ctx context.Context, watchKey string, callbackFunc WatcherHandler, timeout time.Duration, timeoutHandler TimeoutWatcherHandler) { + if nil == callbackFunc { + // 变化之后,没有任何逻辑处理,视为不需要监听变化 + return + } + + if timeout == 0 { + // 不指定超时时间,默认设置成int64最大值 + timeout = math.MaxInt16 * time.Second + } + + if nil == ctx { + ctx = context.Background() + } + rch := Client.Watch(ctx, watchKey) // <-chan WatchResponse + select { + case <-time.After(timeout): + if nil != timeoutHandler { + timeoutHandler(watchKey, timeout) + } + case watchResp := <-rch: + for _, ev := range watchResp.Events { + callbackFunc(ev) + } + } +}