增加监听一次key变化的处理

This commit is contained in:
白茶清欢 2021-11-23 16:55:05 +08:00
parent ada41fa40a
commit 0b5e55cefd
3 changed files with 86 additions and 0 deletions

View File

@ -41,3 +41,6 @@ type WatcherHandler func(event *clientv3.Event)
// CancelWatcherHandler 取消监听后的处理函数 // CancelWatcherHandler 取消监听后的处理函数
type CancelWatcherHandler func(key string, data interface{}) type CancelWatcherHandler func(key string, data interface{})
// TimeoutWatcherHandler 超时之后的回调函数
type TimeoutWatcherHandler func(key string, timeout time.Duration)

View File

@ -114,3 +114,48 @@ func TestWatchKeyWithCancelByChangeCallback(t *testing.T) {
}() }()
WatchKeyWithCancel(nil, key, dealFunc, cancelChan, cancelFunc) WatchKeyWithCancel(nil, key, dealFunc, cancelChan, cancelFunc)
} }
// TestWatchKeyOnce ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 4:50 下午 2021/11/23
func TestWatchKeyOnce(t *testing.T) {
key := "name"
dealFunc := func(data *clientv3.Event) {
fmt.Println(string(data.Kv.Key), string(data.Kv.Value), data.Kv.Version, data.Kv.CreateRevision, data.Kv.ModRevision)
}
timeoutFunc := func(key string, timeout time.Duration) {
fmt.Println("监听超时", key, timeout)
}
timeout := 10 * time.Second
go func() {
for i := 0; i < 30; i++ {
_ = Put(nil, key, fmt.Sprintf("test-%d", i), 0)
time.Sleep(time.Second)
}
}()
WatchKeyOnce(nil, key, dealFunc, timeout, timeoutFunc)
}
// TestWatchKeyOnceForTimeout ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 4:53 下午 2021/11/23
func TestWatchKeyOnceForTimeout(t *testing.T) {
key := "name"
dealFunc := func(data *clientv3.Event) {
fmt.Println(string(data.Kv.Key), string(data.Kv.Value), data.Kv.Version, data.Kv.CreateRevision, data.Kv.ModRevision)
}
timeoutFunc := func(key string, timeout time.Duration) {
fmt.Println("监听超时", key, timeout)
}
timeout := time.Second
WatchKeyOnce(nil, key, dealFunc, timeout, timeoutFunc)
}

View File

@ -9,6 +9,8 @@ package etcd
import ( import (
"context" "context"
"math"
"time"
) )
// WatchKey 监听key的变化 // WatchKey 监听key的变化
@ -66,3 +68,39 @@ func WatchKeyWithCancel(ctx context.Context, watchKey string, callbackFunc Watch
} }
} }
} }
// WatchKeyOnce 监听一次key
//
// timeout 若 <= 0 , 则会一直等待当前key发生变化为止
//
// timeout 若 > 0 , 则在制定时间内为监听到变化, 立即自动取消,
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 4:35 下午 2021/11/23
func WatchKeyOnce(ctx context.Context, watchKey string, callbackFunc WatcherHandler, timeout time.Duration, timeoutHandler TimeoutWatcherHandler) {
if nil == callbackFunc {
// 变化之后,没有任何逻辑处理,视为不需要监听变化
return
}
if timeout == 0 {
// 不指定超时时间默认设置成int64最大值
timeout = math.MaxInt16 * time.Second
}
if nil == ctx {
ctx = context.Background()
}
rch := Client.Watch(ctx, watchKey) // <-chan WatchResponse
select {
case <-time.After(timeout):
if nil != timeoutHandler {
timeoutHandler(watchKey, timeout)
}
case watchResp := <-rch:
for _, ev := range watchResp.Events {
callbackFunc(ev)
}
}
}