diff --git a/middleware/etcd/define.go b/middleware/etcd/define.go index 5f0feca..3b1c3aa 100644 --- a/middleware/etcd/define.go +++ b/middleware/etcd/define.go @@ -41,3 +41,6 @@ type WatcherHandler func(event *clientv3.Event) // CancelWatcherHandler 取消监听后的处理函数 type CancelWatcherHandler func(key string, data interface{}) + +// TimeoutWatcherHandler 超时之后的回调函数 +type TimeoutWatcherHandler func(key string, timeout time.Duration) diff --git a/middleware/etcd/string_test.go b/middleware/etcd/string_test.go index db7bd80..031a7ad 100644 --- a/middleware/etcd/string_test.go +++ b/middleware/etcd/string_test.go @@ -114,3 +114,48 @@ func TestWatchKeyWithCancelByChangeCallback(t *testing.T) { }() WatchKeyWithCancel(nil, key, dealFunc, cancelChan, cancelFunc) } + +// TestWatchKeyOnce ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 4:50 下午 2021/11/23 +func TestWatchKeyOnce(t *testing.T) { + key := "name" + dealFunc := func(data *clientv3.Event) { + fmt.Println(string(data.Kv.Key), string(data.Kv.Value), data.Kv.Version, data.Kv.CreateRevision, data.Kv.ModRevision) + } + + timeoutFunc := func(key string, timeout time.Duration) { + fmt.Println("监听超时", key, timeout) + } + + timeout := 10 * time.Second + go func() { + for i := 0; i < 30; i++ { + _ = Put(nil, key, fmt.Sprintf("test-%d", i), 0) + time.Sleep(time.Second) + } + }() + WatchKeyOnce(nil, key, dealFunc, timeout, timeoutFunc) +} + +// TestWatchKeyOnceForTimeout ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 4:53 下午 2021/11/23 +func TestWatchKeyOnceForTimeout(t *testing.T) { + key := "name" + dealFunc := func(data *clientv3.Event) { + fmt.Println(string(data.Kv.Key), string(data.Kv.Value), data.Kv.Version, data.Kv.CreateRevision, data.Kv.ModRevision) + } + + timeoutFunc := func(key string, timeout time.Duration) { + fmt.Println("监听超时", key, timeout) + } + + timeout := time.Second + + WatchKeyOnce(nil, key, dealFunc, timeout, timeoutFunc) +} diff --git a/middleware/etcd/watch.go b/middleware/etcd/watch.go index fd9de85..4e43dc7 100644 --- a/middleware/etcd/watch.go +++ b/middleware/etcd/watch.go @@ -9,6 +9,8 @@ package etcd import ( "context" + "math" + "time" ) // WatchKey 监听key的变化 @@ -66,3 +68,39 @@ func WatchKeyWithCancel(ctx context.Context, watchKey string, callbackFunc Watch } } } + +// WatchKeyOnce 监听一次key +// +// timeout 若 <= 0 , 则会一直等待当前key发生变化为止 +// +// timeout 若 > 0 , 则在制定时间内为监听到变化, 立即自动取消, +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 4:35 下午 2021/11/23 +func WatchKeyOnce(ctx context.Context, watchKey string, callbackFunc WatcherHandler, timeout time.Duration, timeoutHandler TimeoutWatcherHandler) { + if nil == callbackFunc { + // 变化之后,没有任何逻辑处理,视为不需要监听变化 + return + } + + if timeout == 0 { + // 不指定超时时间,默认设置成int64最大值 + timeout = math.MaxInt16 * time.Second + } + + if nil == ctx { + ctx = context.Background() + } + rch := Client.Watch(ctx, watchKey) // <-chan WatchResponse + select { + case <-time.After(timeout): + if nil != timeoutHandler { + timeoutHandler(watchKey, timeout) + } + case watchResp := <-rch: + for _, ev := range watchResp.Events { + callbackFunc(ev) + } + } +}