gopkg/middleware/etcd/watch.go

138 lines
3.3 KiB
Go
Raw Normal View History

2021-11-23 15:13:50 +08:00
// Package etcd...
//
// Description : 监听key的变化
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2021-11-23 2:58 下午
package etcd
import (
"context"
2021-11-23 16:55:05 +08:00
"math"
"time"
2021-11-26 14:31:25 +08:00
"go.etcd.io/etcd/clientv3"
2021-11-23 15:13:50 +08:00
)
2021-11-26 14:31:25 +08:00
// WatchKeyWithOption ...
2021-11-23 15:13:50 +08:00
//
// Author : go_developer@163.com<白茶清欢>
//
2021-11-26 14:31:25 +08:00
// Date : 2:29 下午 2021/11/26
func WatchKeyWithOption(ctx context.Context, watchKey string, callbackFunc WatcherHandler, optionList ...clientv3.OpOption) {
2021-11-23 15:13:50 +08:00
if nil == callbackFunc {
// 变化之后,没有任何逻辑处理,视为不需要监听变化
return
}
if nil == ctx {
ctx = context.Background()
}
2021-11-26 15:56:30 +08:00
rch := Client.Watch(ctx, watchKey, optionList...) // <-chan WatchResponse
2021-11-23 15:13:50 +08:00
for watchResp := range rch {
for _, ev := range watchResp.Events {
callbackFunc(ev)
}
}
}
2021-11-26 14:31:25 +08:00
// WatchKey 监听key的变化,永久监听
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2:58 下午 2021/11/23
func WatchKey(ctx context.Context, watchKey string, callbackFunc WatcherHandler) {
WatchKeyWithOption(ctx, watchKey, callbackFunc)
}
// WatchWithKeyPrefix ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2:30 下午 2021/11/26
func WatchWithKeyPrefix(ctx context.Context, watchKey string, callbackFunc WatcherHandler) {
WatchKeyWithOption(ctx, watchKey, callbackFunc, clientv3.WithPrefix())
}
// WatchKeyWithCancel 可以随时取消的
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 3:16 下午 2021/11/23
func WatchKeyWithCancel(ctx context.Context, watchKey string, callbackFunc WatcherHandler, cancelChan chan interface{}, cancelDealFunc CancelWatcherHandler) {
if nil == callbackFunc {
// 变化之后,没有任何逻辑处理,视为不需要监听变化
return
}
if nil == ctx {
ctx = context.Background()
}
var (
cancelFunc context.CancelFunc
)
ctx, cancelFunc = context.WithCancel(ctx)
defer cancelFunc()
rch := Client.Watch(context.Background(), watchKey) // <-chan WatchResponse
hasFinish := false
for {
select {
case cancelData := <-cancelChan:
if nil != cancelDealFunc {
cancelDealFunc(watchKey, cancelData)
}
hasFinish = true
case watchResp := <-rch:
for _, ev := range watchResp.Events {
callbackFunc(ev)
}
}
if hasFinish {
break
}
}
}
2021-11-23 16:55:05 +08:00
// WatchKeyOnce 监听一次key
//
// timeout 若 <= 0 , 则会一直等待当前key发生变化为止
//
// timeout 若 > 0 , 则在制定时间内为监听到变化, 立即自动取消,
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 4:35 下午 2021/11/23
func WatchKeyOnce(ctx context.Context, watchKey string, callbackFunc WatcherHandler, timeout time.Duration, timeoutHandler TimeoutWatcherHandler) {
if nil == callbackFunc {
// 变化之后,没有任何逻辑处理,视为不需要监听变化
return
}
if timeout == 0 {
// 不指定超时时间默认设置成int64最大值
timeout = math.MaxInt16 * time.Second
}
if nil == ctx {
ctx = context.Background()
}
var (
cancelFunc context.CancelFunc
)
ctx, cancelFunc = context.WithCancel(ctx)
defer cancelFunc()
2021-11-23 16:55:05 +08:00
rch := Client.Watch(ctx, watchKey) // <-chan WatchResponse
select {
case <-time.After(timeout):
if nil != timeoutHandler {
timeoutHandler(watchKey, timeout)
}
case watchResp := <-rch:
for _, ev := range watchResp.Events {
callbackFunc(ev)
}
}
}