From 43cf6f27c9e1c14d064ea9ca5d90831b1be50526 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Wed, 24 Nov 2021 00:06:07 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=B8=A6=E6=AC=A1=E6=95=B0?= =?UTF-8?q?=E7=9A=84=E7=A7=9F=E7=BA=A6=E7=BB=AD=E6=9C=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- middleware/etcd/define.go | 19 ++++++- middleware/etcd/lease.go | 93 +++++++++++++++++++++++++++++++++- middleware/etcd/string_test.go | 24 ++++++++- 3 files changed, 132 insertions(+), 4 deletions(-) diff --git a/middleware/etcd/define.go b/middleware/etcd/define.go index 3a65392..630176e 100644 --- a/middleware/etcd/define.go +++ b/middleware/etcd/define.go @@ -46,4 +46,21 @@ type CancelWatcherHandler func(key string, data interface{}) type TimeoutWatcherHandler func(key string, timeout time.Duration) // LeaseKeepALiveHandler 续期成功的处理 -type LeaseKeepALiveHandler func(key string, leaseDetail *clientv3.LeaseKeepAliveResponse) +type LeaseKeepALiveHandler func(data *LeaseKeepAliveData) + +// LeaseKeepAliveData 自动续期的数据结构 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 8:28 下午 2021/11/23 +type LeaseKeepAliveData struct { + Key string `json:"key"` // 续期key + StartTime int64 `json:"start_time"` // 开始续期时间 + LastLeaseTime int64 `json:"last_lease_time"` // 上一次续期事件时间 + LeaseCnt int64 `json:"lease_cnt"` // 续期次数 + HasFinish bool `json:"has_finish"` // 是否完成 + LeaseFinishType string `json:"lease_finish_type"` // 续期完成类型 + LeaseFinishTime int64 `json:"lease_finish_time"` // 续期完成时间 + LeaseDetail *clientv3.LeaseKeepAliveResponse `json:"lease_detail"` // 续约数据 + Data map[string]interface{} `json:"data"` // 携带的数据 +} diff --git a/middleware/etcd/lease.go b/middleware/etcd/lease.go index 22f4108..21822c1 100644 --- a/middleware/etcd/lease.go +++ b/middleware/etcd/lease.go @@ -9,6 +9,7 @@ package etcd import ( "context" + "time" "github.com/pkg/errors" "go.etcd.io/etcd/clientv3" @@ -72,11 +73,101 @@ func LeaseKeepAliveForever(ctx context.Context, key string, val string, ttl int6 if respChan, err = Client.KeepAlive(ctx, resp.ID); nil != err { return errors.New("lease keep alive fail : " + err.Error()) } + leaseData := &LeaseKeepAliveData{ + Key: key, + StartTime: time.Now().Unix(), + LastLeaseTime: 0, + LeaseCnt: 0, + HasFinish: false, + LeaseFinishType: "", + LeaseFinishTime: 0, + LeaseDetail: nil, + Data: make(map[string]interface{}), + } // 监听 chan for ka := range respChan { + leaseData.LeaseCnt++ + leaseData.LeaseDetail = ka + leaseData.LastLeaseTime = time.Now().Unix() if nil != keepAliveHandler { - keepAliveHandler(key, ka) + keepAliveHandler(leaseData) } } return nil } + +// LeaseKeepAliveWithDuration 设置最大支持续期的时间, 中途可随时取消 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 8:05 下午 2021/11/23 +func LeaseKeepAliveWithDuration(ctx context.Context, key string, val string, ttl int64, keepAliveHandler LeaseKeepALiveHandler, cancelLeaseChan chan *LeaseKeepAliveData, maxCnt int64) (*LeaseKeepAliveData, error) { + if ttl <= 0 { + return nil, errors.New("lease time must be more than 0") + } + + if nil == cancelLeaseChan { + cancelLeaseChan = make(chan *LeaseKeepAliveData, 1) + } + var cancelFunc context.CancelFunc + if nil == ctx { + ctx, cancelFunc = context.WithCancel(context.Background()) + } else { + ctx, cancelFunc = context.WithCancel(ctx) + } + defer cancelFunc() + var ( + resp *clientv3.LeaseGrantResponse + respChan <-chan *clientv3.LeaseKeepAliveResponse + err error + ) + leaseData := &LeaseKeepAliveData{ + Key: key, + StartTime: 0, + LastLeaseTime: 0, + LeaseCnt: 0, + LeaseFinishType: "", + LeaseFinishTime: 0, + Data: make(map[string]interface{}), + HasFinish: false, + } + // 创建一个 ttl 秒的租约 + if resp, err = Client.Grant(ctx, ttl); err != nil { + return nil, errors.New("lease grant error : " + err.Error()) + } + leaseData.StartTime = time.Now().Unix() + + // ttl 秒钟之后, 这个key就会被移除 + if _, err = Client.Put(context.TODO(), key, val, clientv3.WithLease(resp.ID)); err != nil { + return nil, errors.New("lease key put fail : " + err.Error()) + } + // the key will be kept forever + if respChan, err = Client.KeepAlive(ctx, resp.ID); nil != err { + return nil, errors.New("lease keep alive fail : " + err.Error()) + } + + for { + select { + case <-cancelLeaseChan: + leaseData.HasFinish = true + leaseData.LeaseFinishTime = time.Now().Unix() + leaseData.LeaseFinishType = "SIGNAL_CANCEL" + case leaseResp := <-respChan: + leaseData.LeaseCnt++ + leaseData.LastLeaseTime = time.Now().Unix() + leaseData.LeaseDetail = leaseResp + if nil != keepAliveHandler { + keepAliveHandler(leaseData) + } + if leaseData.LeaseCnt >= maxCnt { + leaseData.HasFinish = true + leaseData.LeaseFinishType = "OVER_MAX_CNT" + leaseData.LeaseFinishTime = time.Now().Unix() + } + } + if leaseData.HasFinish { + break + } + } + return leaseData, nil +} diff --git a/middleware/etcd/string_test.go b/middleware/etcd/string_test.go index 4fb4780..21c2805 100644 --- a/middleware/etcd/string_test.go +++ b/middleware/etcd/string_test.go @@ -181,8 +181,8 @@ func TestLeaseOnce(t *testing.T) { // Date : 7:54 下午 2021/11/23 func TestLeaseKeepAliveForever(t *testing.T) { key := "lock" - keepAliveHandler := func(key string, data *clientv3.LeaseKeepAliveResponse) { - fmt.Println(key, data.ID, data.TTL, data.Size(), data.String()) + keepAliveHandler := func(data *LeaseKeepAliveData) { + fmt.Println(key, data.LeaseDetail.ID, data.LeaseDetail.TTL, data.LeaseCnt) } go func() { fmt.Println(LeaseKeepAliveForever(nil, key, "lock", 10, keepAliveHandler)) @@ -193,3 +193,23 @@ func TestLeaseKeepAliveForever(t *testing.T) { time.Sleep(time.Second) } } + +// TestLeaseKeepAliveWithDuration ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:56 下午 2021/11/23 +func TestLeaseKeepAliveWithDuration(t *testing.T) { + key := "lock" + keepAliveHandler := func(data *LeaseKeepAliveData) { + fmt.Println(key, data.LeaseDetail.ID, data.LeaseDetail.TTL, data.LeaseCnt) + } + go func() { + fmt.Println(LeaseKeepAliveWithDuration(nil, key, "lock", 1, keepAliveHandler, nil, 5)) + }() + for i := 0; i < 15; i++ { + r, e := Get(nil, key, 1) + fmt.Println("读取", r, e) + time.Sleep(time.Second) + } +}