From 29cb4a4fbac2dd05a6b05ad8f22d5d648401c4b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Tue, 23 Nov 2021 19:39:52 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=AE=BE=E7=BD=AE?= =?UTF-8?q?=E4=B8=80=E6=AC=A1=E6=80=A7=E6=9C=89=E6=95=88=E7=A7=9F=E7=BA=A6?= =?UTF-8?q?=E7=9A=84=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- middleware/etcd/lease.go | 43 ++++++++++++++++++++++++++++++++++ middleware/etcd/string_test.go | 14 +++++++++++ 2 files changed, 57 insertions(+) create mode 100644 middleware/etcd/lease.go diff --git a/middleware/etcd/lease.go b/middleware/etcd/lease.go new file mode 100644 index 0000000..4cc8966 --- /dev/null +++ b/middleware/etcd/lease.go @@ -0,0 +1,43 @@ +// Package etcd ... +// +// Description : 租约相关操作 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2021-11-23 6:03 下午 +package etcd + +import ( + "context" + + "github.com/pkg/errors" + "go.etcd.io/etcd/clientv3" +) + +// LeaseOnce 申请一个一次性租约 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 6:06 下午 2021/11/23 +func LeaseOnce(ctx context.Context, key string, val string, ttl int64) error { + if ttl <= 0 { + return errors.New("lease time must be more than 0") + } + if nil == ctx { + ctx = context.TODO() + } + var ( + resp *clientv3.LeaseGrantResponse + err error + ) + // 创建一个5秒的租约 + if resp, err = Client.Grant(ctx, ttl); err != nil { + return errors.New("lease grant error : " + err.Error()) + } + + // ttl 秒钟之后, 这个key就会被移除 + if _, err = Client.Put(context.TODO(), key, val, clientv3.WithLease(resp.ID)); err != nil { + return errors.New("lease key put fail : " + err.Error()) + } + return nil +} diff --git a/middleware/etcd/string_test.go b/middleware/etcd/string_test.go index 031a7ad..c7f8c9a 100644 --- a/middleware/etcd/string_test.go +++ b/middleware/etcd/string_test.go @@ -159,3 +159,17 @@ func TestWatchKeyOnceForTimeout(t *testing.T) { WatchKeyOnce(nil, key, dealFunc, timeout, timeoutFunc) } + +// TestLeaseOnce ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 7:37 下午 2021/11/23 +func TestLeaseOnce(t *testing.T) { + key := "lock" + fmt.Println(LeaseOnce(nil, key, "lock", 10)) + for i := 0; i < 15; i++ { + fmt.Println(Get(nil, key, 1)) + time.Sleep(time.Second) + } +} -- 2.36.6 From a9eabd0f195ebec4f691c0be77114b0f4b438786 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Tue, 23 Nov 2021 20:02:15 +0800 Subject: [PATCH 2/5] =?UTF-8?q?lease=E5=A2=9E=E5=8A=A0=E6=B0=B8=E4=B9=85?= =?UTF-8?q?=E7=BB=AD=E6=9C=9F=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- middleware/etcd/define.go | 3 +++ middleware/etcd/lease.go | 39 ++++++++++++++++++++++++++++++++++ middleware/etcd/string_test.go | 20 +++++++++++++++++ 3 files changed, 62 insertions(+) diff --git a/middleware/etcd/define.go b/middleware/etcd/define.go index 3b1c3aa..3a65392 100644 --- a/middleware/etcd/define.go +++ b/middleware/etcd/define.go @@ -44,3 +44,6 @@ type CancelWatcherHandler func(key string, data interface{}) // TimeoutWatcherHandler 超时之后的回调函数 type TimeoutWatcherHandler func(key string, timeout time.Duration) + +// LeaseKeepALiveHandler 续期成功的处理 +type LeaseKeepALiveHandler func(key string, leaseDetail *clientv3.LeaseKeepAliveResponse) diff --git a/middleware/etcd/lease.go b/middleware/etcd/lease.go index 4cc8966..22f4108 100644 --- a/middleware/etcd/lease.go +++ b/middleware/etcd/lease.go @@ -41,3 +41,42 @@ func LeaseOnce(ctx context.Context, key string, val string, ttl int64) error { } return nil } + +// LeaseKeepAliveForever 无限续租一个key +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 7:40 下午 2021/11/23 +func LeaseKeepAliveForever(ctx context.Context, key string, val string, ttl int64, keepAliveHandler LeaseKeepALiveHandler) error { + if ttl <= 0 { + return errors.New("lease time must be more than 0") + } + if nil == ctx { + ctx = context.TODO() + } + var ( + resp *clientv3.LeaseGrantResponse + respChan <-chan *clientv3.LeaseKeepAliveResponse + err error + ) + // 创建一个5秒的租约 + if resp, err = Client.Grant(ctx, ttl); err != nil { + return errors.New("lease grant error : " + err.Error()) + } + + // ttl 秒钟之后, 这个key就会被移除 + if _, err = Client.Put(context.TODO(), key, val, clientv3.WithLease(resp.ID)); err != nil { + return errors.New("lease key put fail : " + err.Error()) + } + // the key will be kept forever + if respChan, err = Client.KeepAlive(ctx, resp.ID); nil != err { + return errors.New("lease keep alive fail : " + err.Error()) + } + // 监听 chan + for ka := range respChan { + if nil != keepAliveHandler { + keepAliveHandler(key, ka) + } + } + return nil +} diff --git a/middleware/etcd/string_test.go b/middleware/etcd/string_test.go index c7f8c9a..4fb4780 100644 --- a/middleware/etcd/string_test.go +++ b/middleware/etcd/string_test.go @@ -173,3 +173,23 @@ func TestLeaseOnce(t *testing.T) { time.Sleep(time.Second) } } + +// TestLeaseKeepAliveForever ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 7:54 下午 2021/11/23 +func TestLeaseKeepAliveForever(t *testing.T) { + key := "lock" + keepAliveHandler := func(key string, data *clientv3.LeaseKeepAliveResponse) { + fmt.Println(key, data.ID, data.TTL, data.Size(), data.String()) + } + go func() { + fmt.Println(LeaseKeepAliveForever(nil, key, "lock", 10, keepAliveHandler)) + }() + for i := 0; i < 15; i++ { + r, e := Get(nil, key, 1) + fmt.Println("读取", r, e) + time.Sleep(time.Second) + } +} -- 2.36.6 From 43cf6f27c9e1c14d064ea9ca5d90831b1be50526 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Wed, 24 Nov 2021 00:06:07 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=B8=A6=E6=AC=A1?= =?UTF-8?q?=E6=95=B0=E7=9A=84=E7=A7=9F=E7=BA=A6=E7=BB=AD=E6=9C=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- middleware/etcd/define.go | 19 ++++++- middleware/etcd/lease.go | 93 +++++++++++++++++++++++++++++++++- middleware/etcd/string_test.go | 24 ++++++++- 3 files changed, 132 insertions(+), 4 deletions(-) diff --git a/middleware/etcd/define.go b/middleware/etcd/define.go index 3a65392..630176e 100644 --- a/middleware/etcd/define.go +++ b/middleware/etcd/define.go @@ -46,4 +46,21 @@ type CancelWatcherHandler func(key string, data interface{}) type TimeoutWatcherHandler func(key string, timeout time.Duration) // LeaseKeepALiveHandler 续期成功的处理 -type LeaseKeepALiveHandler func(key string, leaseDetail *clientv3.LeaseKeepAliveResponse) +type LeaseKeepALiveHandler func(data *LeaseKeepAliveData) + +// LeaseKeepAliveData 自动续期的数据结构 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 8:28 下午 2021/11/23 +type LeaseKeepAliveData struct { + Key string `json:"key"` // 续期key + StartTime int64 `json:"start_time"` // 开始续期时间 + LastLeaseTime int64 `json:"last_lease_time"` // 上一次续期事件时间 + LeaseCnt int64 `json:"lease_cnt"` // 续期次数 + HasFinish bool `json:"has_finish"` // 是否完成 + LeaseFinishType string `json:"lease_finish_type"` // 续期完成类型 + LeaseFinishTime int64 `json:"lease_finish_time"` // 续期完成时间 + LeaseDetail *clientv3.LeaseKeepAliveResponse `json:"lease_detail"` // 续约数据 + Data map[string]interface{} `json:"data"` // 携带的数据 +} diff --git a/middleware/etcd/lease.go b/middleware/etcd/lease.go index 22f4108..21822c1 100644 --- a/middleware/etcd/lease.go +++ b/middleware/etcd/lease.go @@ -9,6 +9,7 @@ package etcd import ( "context" + "time" "github.com/pkg/errors" "go.etcd.io/etcd/clientv3" @@ -72,11 +73,101 @@ func LeaseKeepAliveForever(ctx context.Context, key string, val string, ttl int6 if respChan, err = Client.KeepAlive(ctx, resp.ID); nil != err { return errors.New("lease keep alive fail : " + err.Error()) } + leaseData := &LeaseKeepAliveData{ + Key: key, + StartTime: time.Now().Unix(), + LastLeaseTime: 0, + LeaseCnt: 0, + HasFinish: false, + LeaseFinishType: "", + LeaseFinishTime: 0, + LeaseDetail: nil, + Data: make(map[string]interface{}), + } // 监听 chan for ka := range respChan { + leaseData.LeaseCnt++ + leaseData.LeaseDetail = ka + leaseData.LastLeaseTime = time.Now().Unix() if nil != keepAliveHandler { - keepAliveHandler(key, ka) + keepAliveHandler(leaseData) } } return nil } + +// LeaseKeepAliveWithDuration 设置最大支持续期的时间, 中途可随时取消 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 8:05 下午 2021/11/23 +func LeaseKeepAliveWithDuration(ctx context.Context, key string, val string, ttl int64, keepAliveHandler LeaseKeepALiveHandler, cancelLeaseChan chan *LeaseKeepAliveData, maxCnt int64) (*LeaseKeepAliveData, error) { + if ttl <= 0 { + return nil, errors.New("lease time must be more than 0") + } + + if nil == cancelLeaseChan { + cancelLeaseChan = make(chan *LeaseKeepAliveData, 1) + } + var cancelFunc context.CancelFunc + if nil == ctx { + ctx, cancelFunc = context.WithCancel(context.Background()) + } else { + ctx, cancelFunc = context.WithCancel(ctx) + } + defer cancelFunc() + var ( + resp *clientv3.LeaseGrantResponse + respChan <-chan *clientv3.LeaseKeepAliveResponse + err error + ) + leaseData := &LeaseKeepAliveData{ + Key: key, + StartTime: 0, + LastLeaseTime: 0, + LeaseCnt: 0, + LeaseFinishType: "", + LeaseFinishTime: 0, + Data: make(map[string]interface{}), + HasFinish: false, + } + // 创建一个 ttl 秒的租约 + if resp, err = Client.Grant(ctx, ttl); err != nil { + return nil, errors.New("lease grant error : " + err.Error()) + } + leaseData.StartTime = time.Now().Unix() + + // ttl 秒钟之后, 这个key就会被移除 + if _, err = Client.Put(context.TODO(), key, val, clientv3.WithLease(resp.ID)); err != nil { + return nil, errors.New("lease key put fail : " + err.Error()) + } + // the key will be kept forever + if respChan, err = Client.KeepAlive(ctx, resp.ID); nil != err { + return nil, errors.New("lease keep alive fail : " + err.Error()) + } + + for { + select { + case <-cancelLeaseChan: + leaseData.HasFinish = true + leaseData.LeaseFinishTime = time.Now().Unix() + leaseData.LeaseFinishType = "SIGNAL_CANCEL" + case leaseResp := <-respChan: + leaseData.LeaseCnt++ + leaseData.LastLeaseTime = time.Now().Unix() + leaseData.LeaseDetail = leaseResp + if nil != keepAliveHandler { + keepAliveHandler(leaseData) + } + if leaseData.LeaseCnt >= maxCnt { + leaseData.HasFinish = true + leaseData.LeaseFinishType = "OVER_MAX_CNT" + leaseData.LeaseFinishTime = time.Now().Unix() + } + } + if leaseData.HasFinish { + break + } + } + return leaseData, nil +} diff --git a/middleware/etcd/string_test.go b/middleware/etcd/string_test.go index 4fb4780..21c2805 100644 --- a/middleware/etcd/string_test.go +++ b/middleware/etcd/string_test.go @@ -181,8 +181,8 @@ func TestLeaseOnce(t *testing.T) { // Date : 7:54 下午 2021/11/23 func TestLeaseKeepAliveForever(t *testing.T) { key := "lock" - keepAliveHandler := func(key string, data *clientv3.LeaseKeepAliveResponse) { - fmt.Println(key, data.ID, data.TTL, data.Size(), data.String()) + keepAliveHandler := func(data *LeaseKeepAliveData) { + fmt.Println(key, data.LeaseDetail.ID, data.LeaseDetail.TTL, data.LeaseCnt) } go func() { fmt.Println(LeaseKeepAliveForever(nil, key, "lock", 10, keepAliveHandler)) @@ -193,3 +193,23 @@ func TestLeaseKeepAliveForever(t *testing.T) { time.Sleep(time.Second) } } + +// TestLeaseKeepAliveWithDuration ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:56 下午 2021/11/23 +func TestLeaseKeepAliveWithDuration(t *testing.T) { + key := "lock" + keepAliveHandler := func(data *LeaseKeepAliveData) { + fmt.Println(key, data.LeaseDetail.ID, data.LeaseDetail.TTL, data.LeaseCnt) + } + go func() { + fmt.Println(LeaseKeepAliveWithDuration(nil, key, "lock", 1, keepAliveHandler, nil, 5)) + }() + for i := 0; i < 15; i++ { + r, e := Get(nil, key, 1) + fmt.Println("读取", r, e) + time.Sleep(time.Second) + } +} -- 2.36.6 From c21e16138d5dc3c5ee46033d83b4c815f6efcbb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Wed, 24 Nov 2021 00:08:14 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=B8=80=E6=AC=A1?= =?UTF-8?q?=E8=A1=8C=E7=9B=91=E5=90=AC=E4=B8=BA=E5=8F=96=E6=B6=88=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- middleware/etcd/lease.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/middleware/etcd/lease.go b/middleware/etcd/lease.go index 21822c1..eb04d6d 100644 --- a/middleware/etcd/lease.go +++ b/middleware/etcd/lease.go @@ -24,13 +24,18 @@ func LeaseOnce(ctx context.Context, key string, val string, ttl int64) error { if ttl <= 0 { return errors.New("lease time must be more than 0") } + if nil == ctx { - ctx = context.TODO() + ctx = context.Background() } + var ( - resp *clientv3.LeaseGrantResponse - err error + resp *clientv3.LeaseGrantResponse + err error + cancelFunc context.CancelFunc ) + ctx, cancelFunc = context.WithCancel(ctx) + defer cancelFunc() // 创建一个5秒的租约 if resp, err = Client.Grant(ctx, ttl); err != nil { return errors.New("lease grant error : " + err.Error()) -- 2.36.6 From 7bef73334ce833236570ef71134ff6ee1572cf68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Wed, 24 Nov 2021 00:11:14 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dwatcher=E5=8F=96=E6=B6=88?= =?UTF-8?q?=E5=90=8E=EF=BC=8C=E7=9B=91=E5=90=AC=E5=8D=8F=E7=A8=8B=E4=BB=8D?= =?UTF-8?q?=E5=9C=A8=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- middleware/etcd/watch.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/middleware/etcd/watch.go b/middleware/etcd/watch.go index 4e43dc7..9340e75 100644 --- a/middleware/etcd/watch.go +++ b/middleware/etcd/watch.go @@ -13,7 +13,7 @@ import ( "time" ) -// WatchKey 监听key的变化 +// WatchKey 监听key的变化,永久监听 // // Author : go_developer@163.com<白茶清欢> // @@ -26,6 +26,7 @@ func WatchKey(ctx context.Context, watchKey string, callbackFunc WatcherHandler) if nil == ctx { ctx = context.Background() } + rch := Client.Watch(ctx, watchKey) // <-chan WatchResponse for watchResp := range rch { for _, ev := range watchResp.Events { @@ -47,6 +48,11 @@ func WatchKeyWithCancel(ctx context.Context, watchKey string, callbackFunc Watch if nil == ctx { ctx = context.Background() } + var ( + cancelFunc context.CancelFunc + ) + ctx, cancelFunc = context.WithCancel(ctx) + defer cancelFunc() rch := Client.Watch(context.Background(), watchKey) // <-chan WatchResponse @@ -92,6 +98,11 @@ func WatchKeyOnce(ctx context.Context, watchKey string, callbackFunc WatcherHand if nil == ctx { ctx = context.Background() } + var ( + cancelFunc context.CancelFunc + ) + ctx, cancelFunc = context.WithCancel(ctx) + defer cancelFunc() rch := Client.Watch(ctx, watchKey) // <-chan WatchResponse select { case <-time.After(timeout): -- 2.36.6