// Package etcd... // // Description : 监听key的变化 // // Author : go_developer@163.com<白茶清欢> // // Date : 2021-11-23 2:58 下午 package etcd import ( "context" "math" "time" clientv3 "go.etcd.io/etcd/client/v3" ) // WatchKeyWithOption ... // // Author : go_developer@163.com<白茶清欢> // // Date : 2:29 下午 2021/11/26 func (wc *WrapperClient) WatchKeyWithOption(ctx context.Context, watchKey string, callbackFunc WatcherHandler, optionList ...clientv3.OpOption) { if nil == callbackFunc { // 变化之后,没有任何逻辑处理,视为不需要监听变化 return } if nil == ctx { ctx = context.Background() } rch := wc.client.Watch(ctx, watchKey, optionList...) // <-chan WatchResponse for watchResp := range rch { for _, ev := range watchResp.Events { callbackFunc(ev) } } } // WatchKey 监听key的变化,永久监听 // // Author : go_developer@163.com<白茶清欢> // // Date : 2:58 下午 2021/11/23 func (wc *WrapperClient) WatchKey(ctx context.Context, watchKey string, callbackFunc WatcherHandler) { wc.WatchKeyWithOption(ctx, watchKey, callbackFunc) } // WatchWithKeyPrefix ... // // Author : go_developer@163.com<白茶清欢> // // Date : 2:30 下午 2021/11/26 func (wc *WrapperClient) WatchWithKeyPrefix(ctx context.Context, watchKey string, callbackFunc WatcherHandler) { wc.WatchKeyWithOption(ctx, watchKey, callbackFunc, clientv3.WithPrefix()) } // WatchKeyWithCancel 可以随时取消的 // // Author : go_developer@163.com<白茶清欢> // // Date : 3:16 下午 2021/11/23 func (wc *WrapperClient) WatchKeyWithCancel(ctx context.Context, watchKey string, callbackFunc WatcherHandler, cancelChan chan interface{}, cancelDealFunc CancelWatcherHandler) { if nil == callbackFunc { // 变化之后,没有任何逻辑处理,视为不需要监听变化 return } if nil == ctx { ctx = context.Background() } var ( cancelFunc context.CancelFunc ) ctx, cancelFunc = context.WithCancel(ctx) defer cancelFunc() rch := wc.client.Watch(context.Background(), watchKey) // <-chan WatchResponse hasFinish := false for { select { case cancelData := <-cancelChan: if nil != cancelDealFunc { cancelDealFunc(watchKey, cancelData) } hasFinish = true case watchResp := <-rch: for _, ev := range watchResp.Events { callbackFunc(ev) } } if hasFinish { break } } } // WatchKeyOnce 监听一次key // // timeout 若 <= 0 , 则会一直等待当前key发生变化为止 // // timeout 若 > 0 , 则在制定时间内为监听到变化, 立即自动取消, // // Author : go_developer@163.com<白茶清欢> // // Date : 4:35 下午 2021/11/23 func (wc *WrapperClient) WatchKeyOnce(ctx context.Context, watchKey string, callbackFunc WatcherHandler, timeout time.Duration, timeoutHandler TimeoutWatcherHandler) { if nil == callbackFunc { // 变化之后,没有任何逻辑处理,视为不需要监听变化 return } if timeout == 0 { // 不指定超时时间,默认设置成int64最大值 timeout = math.MaxInt16 * time.Second } if nil == ctx { ctx = context.Background() } var ( cancelFunc context.CancelFunc ) ctx, cancelFunc = context.WithCancel(ctx) defer cancelFunc() rch := wc.client.Watch(ctx, watchKey) // <-chan WatchResponse select { case <-time.After(timeout): if nil != timeoutHandler { timeoutHandler(watchKey, timeout) } case watchResp := <-rch: for _, ev := range watchResp.Events { callbackFunc(ev) } } }