diff --git a/middleware/etcd/define.go b/middleware/etcd/define.go index 3b1c3aa..630176e 100644 --- a/middleware/etcd/define.go +++ b/middleware/etcd/define.go @@ -44,3 +44,23 @@ type CancelWatcherHandler func(key string, data interface{}) // TimeoutWatcherHandler 超时之后的回调函数 type TimeoutWatcherHandler func(key string, timeout time.Duration) + +// LeaseKeepALiveHandler 续期成功的处理 +type LeaseKeepALiveHandler func(data *LeaseKeepAliveData) + +// LeaseKeepAliveData 自动续期的数据结构 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 8:28 下午 2021/11/23 +type LeaseKeepAliveData struct { + Key string `json:"key"` // 续期key + StartTime int64 `json:"start_time"` // 开始续期时间 + LastLeaseTime int64 `json:"last_lease_time"` // 上一次续期事件时间 + LeaseCnt int64 `json:"lease_cnt"` // 续期次数 + HasFinish bool `json:"has_finish"` // 是否完成 + LeaseFinishType string `json:"lease_finish_type"` // 续期完成类型 + LeaseFinishTime int64 `json:"lease_finish_time"` // 续期完成时间 + LeaseDetail *clientv3.LeaseKeepAliveResponse `json:"lease_detail"` // 续约数据 + Data map[string]interface{} `json:"data"` // 携带的数据 +} diff --git a/middleware/etcd/lease.go b/middleware/etcd/lease.go new file mode 100644 index 0000000..eb04d6d --- /dev/null +++ b/middleware/etcd/lease.go @@ -0,0 +1,178 @@ +// Package etcd ... +// +// Description : 租约相关操作 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2021-11-23 6:03 下午 +package etcd + +import ( + "context" + "time" + + "github.com/pkg/errors" + "go.etcd.io/etcd/clientv3" +) + +// LeaseOnce 申请一个一次性租约 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 6:06 下午 2021/11/23 +func LeaseOnce(ctx context.Context, key string, val string, ttl int64) error { + if ttl <= 0 { + return errors.New("lease time must be more than 0") + } + + if nil == ctx { + ctx = context.Background() + } + + var ( + resp *clientv3.LeaseGrantResponse + err error + cancelFunc context.CancelFunc + ) + ctx, cancelFunc = context.WithCancel(ctx) + defer cancelFunc() + // 创建一个5秒的租约 + if resp, err = Client.Grant(ctx, ttl); err != nil { + return errors.New("lease grant error : " + err.Error()) + } + + // ttl 秒钟之后, 这个key就会被移除 + if _, err = Client.Put(context.TODO(), key, val, clientv3.WithLease(resp.ID)); err != nil { + return errors.New("lease key put fail : " + err.Error()) + } + return nil +} + +// LeaseKeepAliveForever 无限续租一个key +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 7:40 下午 2021/11/23 +func LeaseKeepAliveForever(ctx context.Context, key string, val string, ttl int64, keepAliveHandler LeaseKeepALiveHandler) error { + if ttl <= 0 { + return errors.New("lease time must be more than 0") + } + if nil == ctx { + ctx = context.TODO() + } + var ( + resp *clientv3.LeaseGrantResponse + respChan <-chan *clientv3.LeaseKeepAliveResponse + err error + ) + // 创建一个5秒的租约 + if resp, err = Client.Grant(ctx, ttl); err != nil { + return errors.New("lease grant error : " + err.Error()) + } + + // ttl 秒钟之后, 这个key就会被移除 + if _, err = Client.Put(context.TODO(), key, val, clientv3.WithLease(resp.ID)); err != nil { + return errors.New("lease key put fail : " + err.Error()) + } + // the key will be kept forever + if respChan, err = Client.KeepAlive(ctx, resp.ID); nil != err { + return errors.New("lease keep alive fail : " + err.Error()) + } + leaseData := &LeaseKeepAliveData{ + Key: key, + StartTime: time.Now().Unix(), + LastLeaseTime: 0, + LeaseCnt: 0, + HasFinish: false, + LeaseFinishType: "", + LeaseFinishTime: 0, + LeaseDetail: nil, + Data: make(map[string]interface{}), + } + // 监听 chan + for ka := range respChan { + leaseData.LeaseCnt++ + leaseData.LeaseDetail = ka + leaseData.LastLeaseTime = time.Now().Unix() + if nil != keepAliveHandler { + keepAliveHandler(leaseData) + } + } + return nil +} + +// LeaseKeepAliveWithDuration 设置最大支持续期的时间, 中途可随时取消 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 8:05 下午 2021/11/23 +func LeaseKeepAliveWithDuration(ctx context.Context, key string, val string, ttl int64, keepAliveHandler LeaseKeepALiveHandler, cancelLeaseChan chan *LeaseKeepAliveData, maxCnt int64) (*LeaseKeepAliveData, error) { + if ttl <= 0 { + return nil, errors.New("lease time must be more than 0") + } + + if nil == cancelLeaseChan { + cancelLeaseChan = make(chan *LeaseKeepAliveData, 1) + } + var cancelFunc context.CancelFunc + if nil == ctx { + ctx, cancelFunc = context.WithCancel(context.Background()) + } else { + ctx, cancelFunc = context.WithCancel(ctx) + } + defer cancelFunc() + var ( + resp *clientv3.LeaseGrantResponse + respChan <-chan *clientv3.LeaseKeepAliveResponse + err error + ) + leaseData := &LeaseKeepAliveData{ + Key: key, + StartTime: 0, + LastLeaseTime: 0, + LeaseCnt: 0, + LeaseFinishType: "", + LeaseFinishTime: 0, + Data: make(map[string]interface{}), + HasFinish: false, + } + // 创建一个 ttl 秒的租约 + if resp, err = Client.Grant(ctx, ttl); err != nil { + return nil, errors.New("lease grant error : " + err.Error()) + } + leaseData.StartTime = time.Now().Unix() + + // ttl 秒钟之后, 这个key就会被移除 + if _, err = Client.Put(context.TODO(), key, val, clientv3.WithLease(resp.ID)); err != nil { + return nil, errors.New("lease key put fail : " + err.Error()) + } + // the key will be kept forever + if respChan, err = Client.KeepAlive(ctx, resp.ID); nil != err { + return nil, errors.New("lease keep alive fail : " + err.Error()) + } + + for { + select { + case <-cancelLeaseChan: + leaseData.HasFinish = true + leaseData.LeaseFinishTime = time.Now().Unix() + leaseData.LeaseFinishType = "SIGNAL_CANCEL" + case leaseResp := <-respChan: + leaseData.LeaseCnt++ + leaseData.LastLeaseTime = time.Now().Unix() + leaseData.LeaseDetail = leaseResp + if nil != keepAliveHandler { + keepAliveHandler(leaseData) + } + if leaseData.LeaseCnt >= maxCnt { + leaseData.HasFinish = true + leaseData.LeaseFinishType = "OVER_MAX_CNT" + leaseData.LeaseFinishTime = time.Now().Unix() + } + } + if leaseData.HasFinish { + break + } + } + return leaseData, nil +} diff --git a/middleware/etcd/string_test.go b/middleware/etcd/string_test.go index 031a7ad..21c2805 100644 --- a/middleware/etcd/string_test.go +++ b/middleware/etcd/string_test.go @@ -159,3 +159,57 @@ func TestWatchKeyOnceForTimeout(t *testing.T) { WatchKeyOnce(nil, key, dealFunc, timeout, timeoutFunc) } + +// TestLeaseOnce ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 7:37 下午 2021/11/23 +func TestLeaseOnce(t *testing.T) { + key := "lock" + fmt.Println(LeaseOnce(nil, key, "lock", 10)) + for i := 0; i < 15; i++ { + fmt.Println(Get(nil, key, 1)) + time.Sleep(time.Second) + } +} + +// TestLeaseKeepAliveForever ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 7:54 下午 2021/11/23 +func TestLeaseKeepAliveForever(t *testing.T) { + key := "lock" + keepAliveHandler := func(data *LeaseKeepAliveData) { + fmt.Println(key, data.LeaseDetail.ID, data.LeaseDetail.TTL, data.LeaseCnt) + } + go func() { + fmt.Println(LeaseKeepAliveForever(nil, key, "lock", 10, keepAliveHandler)) + }() + for i := 0; i < 15; i++ { + r, e := Get(nil, key, 1) + fmt.Println("读取", r, e) + time.Sleep(time.Second) + } +} + +// TestLeaseKeepAliveWithDuration ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:56 下午 2021/11/23 +func TestLeaseKeepAliveWithDuration(t *testing.T) { + key := "lock" + keepAliveHandler := func(data *LeaseKeepAliveData) { + fmt.Println(key, data.LeaseDetail.ID, data.LeaseDetail.TTL, data.LeaseCnt) + } + go func() { + fmt.Println(LeaseKeepAliveWithDuration(nil, key, "lock", 1, keepAliveHandler, nil, 5)) + }() + for i := 0; i < 15; i++ { + r, e := Get(nil, key, 1) + fmt.Println("读取", r, e) + time.Sleep(time.Second) + } +} diff --git a/middleware/etcd/watch.go b/middleware/etcd/watch.go index 4e43dc7..9340e75 100644 --- a/middleware/etcd/watch.go +++ b/middleware/etcd/watch.go @@ -13,7 +13,7 @@ import ( "time" ) -// WatchKey 监听key的变化 +// WatchKey 监听key的变化,永久监听 // // Author : go_developer@163.com<白茶清欢> // @@ -26,6 +26,7 @@ func WatchKey(ctx context.Context, watchKey string, callbackFunc WatcherHandler) if nil == ctx { ctx = context.Background() } + rch := Client.Watch(ctx, watchKey) // <-chan WatchResponse for watchResp := range rch { for _, ev := range watchResp.Events { @@ -47,6 +48,11 @@ func WatchKeyWithCancel(ctx context.Context, watchKey string, callbackFunc Watch if nil == ctx { ctx = context.Background() } + var ( + cancelFunc context.CancelFunc + ) + ctx, cancelFunc = context.WithCancel(ctx) + defer cancelFunc() rch := Client.Watch(context.Background(), watchKey) // <-chan WatchResponse @@ -92,6 +98,11 @@ func WatchKeyOnce(ctx context.Context, watchKey string, callbackFunc WatcherHand if nil == ctx { ctx = context.Background() } + var ( + cancelFunc context.CancelFunc + ) + ctx, cancelFunc = context.WithCancel(ctx) + defer cancelFunc() rch := Client.Watch(ctx, watchKey) // <-chan WatchResponse select { case <-time.After(timeout):