etcd/watch.go

138 lines
3.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package etcd...
//
// Description : watches etcd keys for changes
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2021-11-23 2:58 下午
package etcd
import (
"context"
"math"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
// WatchKeyWithOption watches watchKey and invokes callbackFunc once for every
// event received on the watch channel. optionList is forwarded unchanged to
// the underlying client's Watch call.
//
// The call returns immediately when callbackFunc is nil. A nil ctx is replaced
// with context.Background(). Otherwise the call blocks until the watch channel
// is closed.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2:29 PM 2021/11/26
func (wc *WrapperClient) WatchKeyWithOption(ctx context.Context, watchKey string, callbackFunc WatcherHandler, optionList ...clientv3.OpOption) {
	// Without a handler nothing would happen on a change, so treat it as
	// "no watch required".
	if callbackFunc == nil {
		return
	}
	if ctx == nil {
		ctx = context.Background()
	}

	responses := wc.client.Watch(ctx, watchKey, optionList...)
	for {
		resp, open := <-responses
		if !open {
			// The watch channel was closed; nothing more will arrive.
			return
		}
		for i := range resp.Events {
			callbackFunc(resp.Events[i])
		}
	}
}
// WatchKey watches watchKey for changes by delegating to WatchKeyWithOption
// without any extra watch options. It is intended as a long-running
// (permanent) watch.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2:58 PM 2021/11/23
func (wc *WrapperClient) WatchKey(ctx context.Context, watchKey string, callbackFunc WatcherHandler) {
wc.WatchKeyWithOption(ctx, watchKey, callbackFunc)
}
// WatchWithKeyPrefix delegates to WatchKeyWithOption, passing
// clientv3.WithPrefix() as the only watch option so that watchKey is used as
// a key prefix.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2:30 PM 2021/11/26
func (wc *WrapperClient) WatchWithKeyPrefix(ctx context.Context, watchKey string, callbackFunc WatcherHandler) {
	prefixOption := clientv3.WithPrefix()
	wc.WatchKeyWithOption(ctx, watchKey, callbackFunc, prefixOption)
}
// WatchKeyWithCancel watches watchKey and invokes callbackFunc for every
// event until a value is received on cancelChan, the watch channel is closed,
// or ctx is cancelled.
//
// Parameters:
//   - ctx: parent context; nil is replaced with context.Background().
//   - watchKey: the key to watch.
//   - callbackFunc: called for each event; when nil the function returns
//     immediately without watching.
//   - cancelChan: receiving any value (or a close) on this channel stops the
//     watch.
//   - cancelDealFunc: optional; called with watchKey and the value received
//     from cancelChan before returning.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 3:16 PM 2021/11/23
func (wc *WrapperClient) WatchKeyWithCancel(ctx context.Context, watchKey string, callbackFunc WatcherHandler, cancelChan chan interface{}, cancelDealFunc CancelWatcherHandler) {
	// Without a handler nothing would happen on a change, so treat it as
	// "no watch required".
	if callbackFunc == nil {
		return
	}
	if ctx == nil {
		ctx = context.Background()
	}

	ctx, cancelFunc := context.WithCancel(ctx)
	defer cancelFunc()

	// The watch must be bound to the cancellable ctx: the deferred cancelFunc
	// is what releases the watcher once this function returns, and it lets the
	// caller's context stop the watch as well.
	rch := wc.client.Watch(ctx, watchKey)
	for {
		select {
		case cancelData := <-cancelChan:
			if cancelDealFunc != nil {
				cancelDealFunc(watchKey, cancelData)
			}
			return
		case watchResp, ok := <-rch:
			if !ok {
				// The watch channel is closed (context cancelled or client
				// shut down); receiving again would spin on zero values.
				return
			}
			for _, ev := range watchResp.Events {
				callbackFunc(ev)
			}
		}
	}
}
// WatchKeyOnce watches watchKey until the first watch response arrives,
// invokes callbackFunc for each event in that response, and then returns.
//
// Parameters:
//   - ctx: parent context; nil is replaced with context.Background().
//   - watchKey: the key to watch.
//   - callbackFunc: called for each event of the first response; when nil the
//     function returns immediately without watching.
//   - timeout: when <= 0 the call waits until the key changes (no effective
//     timeout); when > 0 and no response arrives within that duration, the
//     watch is cancelled and timeoutHandler is invoked.
//   - timeoutHandler: optional; called with watchKey and timeout on expiry.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 4:35 PM 2021/11/23
func (wc *WrapperClient) WatchKeyOnce(ctx context.Context, watchKey string, callbackFunc WatcherHandler, timeout time.Duration, timeoutHandler TimeoutWatcherHandler) {
	// Without a handler nothing would happen on a change, so treat it as
	// "no watch required".
	if callbackFunc == nil {
		return
	}
	if timeout <= 0 {
		// No timeout requested: use the largest representable duration so the
		// timer effectively never fires. This also covers negative values,
		// which would otherwise expire immediately.
		timeout = time.Duration(math.MaxInt64)
	}
	if ctx == nil {
		ctx = context.Background()
	}

	ctx, cancelFunc := context.WithCancel(ctx)
	defer cancelFunc()

	rch := wc.client.Watch(ctx, watchKey)

	// Use an explicit timer so it can be stopped as soon as an event arrives,
	// rather than lingering until it expires.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-timer.C:
		if timeoutHandler != nil {
			timeoutHandler(watchKey, timeout)
		}
	case watchResp := <-rch:
		// A closed channel yields a zero-value response with no events, so
		// the loop simply does nothing in that case.
		for _, ev := range watchResp.Events {
			callbackFunc(ev)
		}
	}
}