diff --git a/go.mod b/go.mod index 273f21e..4bad366 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ replace google.golang.org/grpc => google.golang.org/grpc v1.26.0 require ( github.com/Shopify/sarama v1.30.0 + github.com/coreos/etcd v3.3.27+incompatible github.com/ddliu/go-httpclient v0.6.9 github.com/gin-gonic/gin v1.7.5 github.com/go-redis/redis/v8 v8.11.4 @@ -32,7 +33,6 @@ require ( github.com/StackExchange/wmi v1.2.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/coreos/bbolt v1.3.4 // indirect - github.com/coreos/etcd v3.3.27+incompatible // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect diff --git a/middleware/etcd/define.go b/middleware/etcd/define.go index 630176e..d2b3f59 100644 --- a/middleware/etcd/define.go +++ b/middleware/etcd/define.go @@ -34,6 +34,8 @@ const ( DefaultPutTimeout = time.Second // DefaultGetTimeout get 默认超时时间 DefaultGetTimeout = time.Second + // DefaultDeleteTimeout 删除的超时时间 + DefaultDeleteTimeout = time.Second ) // WatcherHandler 监听key变化的处理函数 diff --git a/middleware/etcd/delete.go b/middleware/etcd/delete.go new file mode 100644 index 0000000..0a62445 --- /dev/null +++ b/middleware/etcd/delete.go @@ -0,0 +1,54 @@ +// Package etcd... +// +// Description : 删除相关操作 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2021-11-24 12:42 下午 +package etcd + +import ( + "context" + "errors" + "time" + + "go.etcd.io/etcd/clientv3" +) + +// DeleteWithOption ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2:02 下午 2021/11/24 +func DeleteWithOption(ctx context.Context, key string, timeout time.Duration, optionList ...clientv3.OpOption) (*clientv3.DeleteResponse, error) { + if nil == ctx { + ctx = context.Background() + } + + if timeout <= 0 { + timeout = DefaultDeleteTimeout + } + + var ( + cancelFunc context.CancelFunc + deleteResponse *clientv3.DeleteResponse + err error + ) + ctx, cancelFunc = context.WithTimeout(ctx, timeout) + defer cancelFunc() + + if deleteResponse, err = Client.Delete(ctx, key, optionList...); nil != err { + return nil, errors.New("delete key fail : " + err.Error()) + } + + return deleteResponse, nil +} + +// DeleteWithKeyPrefix 基于 key 前缀, 删除key +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2:12 下午 2021/11/24 +func DeleteWithKeyPrefix(ctx context.Context, keyPrefix string, timeout time.Duration) (*clientv3.DeleteResponse, error) { + return DeleteWithOption(ctx, keyPrefix, timeout, clientv3.WithPrefix()) +} diff --git a/middleware/etcd/delete_test.go b/middleware/etcd/delete_test.go new file mode 100644 index 0000000..dbcbf49 --- /dev/null +++ b/middleware/etcd/delete_test.go @@ -0,0 +1,22 @@ +// Package etcd... +// +// Description : etcd... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2021-11-24 2:11 下午 +package etcd + +import ( + "fmt" + "testing" +) + +// TestDeleteWithKeyPrefix ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2:11 下午 2021/11/24 +func TestDeleteWithKeyPrefix(t *testing.T) { + fmt.Println(DeleteWithKeyPrefix(nil, "/test", 0)) +} diff --git a/middleware/etcd/lease.go b/middleware/etcd/lease.go index eb04d6d..2402985 100644 --- a/middleware/etcd/lease.go +++ b/middleware/etcd/lease.go @@ -42,10 +42,11 @@ func LeaseOnce(ctx context.Context, key string, val string, ttl int64) error { } // ttl 秒钟之后, 这个key就会被移除 - if _, err = Client.Put(context.TODO(), key, val, clientv3.WithLease(resp.ID)); err != nil { + if _, err = Client.Put(ctx, key, val, clientv3.WithLease(resp.ID)); err != nil { return errors.New("lease key put fail : " + err.Error()) } - return nil + _, err = Client.KeepAliveOnce(ctx, resp.ID) + return err } // LeaseKeepAliveForever 无限续租一个key diff --git a/middleware/etcd/lease_test.go b/middleware/etcd/lease_test.go new file mode 100644 index 0000000..d90da03 --- /dev/null +++ b/middleware/etcd/lease_test.go @@ -0,0 +1,68 @@ +// Package etcd... +// +// Description : etcd... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2021-11-24 11:40 上午 +package etcd + +import ( + "fmt" + "testing" + "time" +) + +// TestLeaseOnce ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 7:37 下午 2021/11/23 +func TestLeaseOnce(t *testing.T) { + key := "lock" + fmt.Println(LeaseOnce(nil, key, "lock", 10)) + for i := 0; i < 15; i++ { + fmt.Println(Get(nil, key, 1)) + time.Sleep(time.Second) + } +} + +// TestLeaseKeepAliveForever ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 7:54 下午 2021/11/23 +func TestLeaseKeepAliveForever(t *testing.T) { + key := "lock" + keepAliveHandler := func(data *LeaseKeepAliveData) { + fmt.Println(key, data.LeaseDetail.ID, data.LeaseDetail.TTL, data.LeaseCnt) + } + go func() { + fmt.Println(LeaseKeepAliveForever(nil, key, "lock", 10, keepAliveHandler)) + }() + for i := 0; i < 15; i++ { + r, e := Get(nil, key, 1) + fmt.Println("读取", r, e) + time.Sleep(time.Second) + } +} + +// TestLeaseKeepAliveWithDuration ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:56 下午 2021/11/23 +func TestLeaseKeepAliveWithDuration(t *testing.T) { + key := "lock" + keepAliveHandler := func(data *LeaseKeepAliveData) { + fmt.Println(key, data.LeaseDetail.ID, data.LeaseDetail.TTL, data.LeaseCnt) + } + go func() { + fmt.Println(LeaseKeepAliveWithDuration(nil, key, "lock", 1, keepAliveHandler, nil, 5)) + }() + for i := 0; i < 15; i++ { + r, e := Get(nil, key, 1) + fmt.Println("读取", r, e) + time.Sleep(time.Second) + } +} diff --git a/middleware/etcd/string.go b/middleware/etcd/string.go index 9186483..49d7088 100644 --- a/middleware/etcd/string.go +++ b/middleware/etcd/string.go @@ -37,12 +37,12 @@ func Put(ctx context.Context, key string, val string, operateTimeout time.Durati return err } -// Get 读取数据 +// GetWithOption 使用各种option选项读取数据 // // Author : go_developer@163.com<白茶清欢> // -// Date : 12:09 下午 2021/11/23 -func Get(ctx context.Context, key string, operateTimeout time.Duration) ( +// Date : 11:17 上午 2021/11/24 +func GetWithOption(ctx context.Context, key string, operateTimeout time.Duration, optionList ...clientv3.OpOption) ( []*mvccpb.KeyValue, error, ) { @@ -57,11 +57,45 @@ func Get(ctx context.Context, key string, operateTimeout time.Duration) ( if nil == ctx { ctx = context.Background() } - ctx, cancel = context.WithTimeout(context.Background(), time.Second) - result, err = Client.Get(ctx, key) - cancel() - if err != nil { + ctx, cancel = context.WithTimeout(context.Background(), operateTimeout) + defer cancel() + if result, err = Client.Get(ctx, key, optionList...); err != nil { return nil, err } return result.Kvs, nil } + +// Get 读取数据,按照key精确匹配 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 12:09 下午 2021/11/23 +func Get(ctx context.Context, key string, operateTimeout time.Duration) ( + []*mvccpb.KeyValue, + error, +) { + return GetWithOption(ctx, key, operateTimeout) +} + +// GetWithPrefix 使用key前缀查找 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 10:58 上午 2021/11/24 +func GetWithPrefix(ctx context.Context, keyPrefix string, operateTimeout time.Duration) ( + map[string]*mvccpb.KeyValue, + error, +) { + var ( + result []*mvccpb.KeyValue + err error + ) + if result, err = GetWithOption(ctx, keyPrefix, operateTimeout, clientv3.WithPrefix()); nil != err { + return make(map[string]*mvccpb.KeyValue), err + } + formatResult := make(map[string]*mvccpb.KeyValue) + for _, item := range result { + formatResult[string(item.Key)] = item + } + return formatResult, nil +} diff --git a/middleware/etcd/string_test.go b/middleware/etcd/string_test.go index 21c2805..c5aca44 100644 --- a/middleware/etcd/string_test.go +++ b/middleware/etcd/string_test.go @@ -10,7 +10,8 @@ package etcd import ( "fmt" "testing" - "time" + + "git.zhangdeman.cn/zhangdeman/gopkg/util" "go.etcd.io/etcd/clientv3" ) @@ -42,174 +43,15 @@ func TestGet(t *testing.T) { fmt.Println(Get(nil, "name", 0)) } -// TestWatchKey ... +// TestGetWithPrefix 根据key前缀读取数据 // // Author : go_developer@163.com<白茶清欢> // -// Date : 3:06 下午 2021/11/23 -func TestWatchKey(t *testing.T) { - key := "name" - dealFunc := func(data *clientv3.Event) { - fmt.Println(string(data.Kv.Key), string(data.Kv.Value), data.Kv.Version, data.Kv.CreateRevision, data.Kv.ModRevision) - } - go func() { - for i := 0; i < 30; i++ { - _ = Put(nil, key, fmt.Sprintf("test-%d", i), 0) - time.Sleep(time.Second) - } - }() - WatchKey(nil, key, dealFunc) -} - -// TestWatchKeyWithCancel ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 3:57 下午 2021/11/23 -func TestWatchKeyWithCancel(t *testing.T) { - key := "name" - dealFunc := func(data *clientv3.Event) { - fmt.Println(string(data.Kv.Key), string(data.Kv.Value), data.Kv.Version, data.Kv.CreateRevision, data.Kv.ModRevision) - } - cancelFunc := func(key string, data interface{}) { - fmt.Println("取消监听 : ", key, data) - } - cancelChan := make(chan interface{}, 1) - go func() { - for i := 0; i < 30; i++ { - _ = Put(nil, key, fmt.Sprintf("test-%d", i), 0) - time.Sleep(time.Second) - } - time.Sleep(10 * time.Second) - cancelChan <- "Hello World" - }() - WatchKeyWithCancel(nil, key, dealFunc, cancelChan, cancelFunc) -} - -// TestWatchKeyWithCancelByChangeCallback ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 4:27 下午 2021/11/23 -func TestWatchKeyWithCancelByChangeCallback(t *testing.T) { - key := "name" - cancelChan := make(chan interface{}, 1) - - dealFunc := func(data *clientv3.Event) { - fmt.Println(string(data.Kv.Key), string(data.Kv.Value), data.Kv.Version, data.Kv.CreateRevision, data.Kv.ModRevision) - if string(data.Kv.Value) == "test-29" { - cancelChan <- "Hello World!" - } - } - - cancelFunc := func(key string, data interface{}) { - fmt.Println("取消监听 : ", key, data) - } - - go func() { - for i := 0; i < 30; i++ { - _ = Put(nil, key, fmt.Sprintf("test-%d", i), 0) - time.Sleep(time.Second) - } - }() - WatchKeyWithCancel(nil, key, dealFunc, cancelChan, cancelFunc) -} - -// TestWatchKeyOnce ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 4:50 下午 2021/11/23 -func TestWatchKeyOnce(t *testing.T) { - key := "name" - dealFunc := func(data *clientv3.Event) { - fmt.Println(string(data.Kv.Key), string(data.Kv.Value), data.Kv.Version, data.Kv.CreateRevision, data.Kv.ModRevision) - } - - timeoutFunc := func(key string, timeout time.Duration) { - fmt.Println("监听超时", key, timeout) - } - - timeout := 10 * time.Second - go func() { - for i := 0; i < 30; i++ { - _ = Put(nil, key, fmt.Sprintf("test-%d", i), 0) - time.Sleep(time.Second) - } - }() - WatchKeyOnce(nil, key, dealFunc, timeout, timeoutFunc) -} - -// TestWatchKeyOnceForTimeout ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 4:53 下午 2021/11/23 -func TestWatchKeyOnceForTimeout(t *testing.T) { - key := "name" - dealFunc := func(data *clientv3.Event) { - fmt.Println(string(data.Kv.Key), string(data.Kv.Value), data.Kv.Version, data.Kv.CreateRevision, data.Kv.ModRevision) - } - - timeoutFunc := func(key string, timeout time.Duration) { - fmt.Println("监听超时", key, timeout) - } - - timeout := time.Second - - WatchKeyOnce(nil, key, dealFunc, timeout, timeoutFunc) -} - -// TestLeaseOnce ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 7:37 下午 2021/11/23 -func TestLeaseOnce(t *testing.T) { - key := "lock" - fmt.Println(LeaseOnce(nil, key, "lock", 10)) - for i := 0; i < 15; i++ { - fmt.Println(Get(nil, key, 1)) - time.Sleep(time.Second) - } -} - -// TestLeaseKeepAliveForever ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 7:54 下午 2021/11/23 -func TestLeaseKeepAliveForever(t *testing.T) { - key := "lock" - keepAliveHandler := func(data *LeaseKeepAliveData) { - fmt.Println(key, data.LeaseDetail.ID, data.LeaseDetail.TTL, data.LeaseCnt) - } - go func() { - fmt.Println(LeaseKeepAliveForever(nil, key, "lock", 10, keepAliveHandler)) - }() - for i := 0; i < 15; i++ { - r, e := Get(nil, key, 1) - fmt.Println("读取", r, e) - time.Sleep(time.Second) - } -} - -// TestLeaseKeepAliveWithDuration ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 11:56 下午 2021/11/23 -func TestLeaseKeepAliveWithDuration(t *testing.T) { - key := "lock" - keepAliveHandler := func(data *LeaseKeepAliveData) { - fmt.Println(key, data.LeaseDetail.ID, data.LeaseDetail.TTL, data.LeaseCnt) - } - go func() { - fmt.Println(LeaseKeepAliveWithDuration(nil, key, "lock", 1, keepAliveHandler, nil, 5)) - }() - for i := 0; i < 15; i++ { - r, e := Get(nil, key, 1) - fmt.Println("读取", r, e) - time.Sleep(time.Second) +// Date : 11:01 上午 2021/11/24 +func TestGetWithPrefix(t *testing.T) { + prefix := "/test/dir/" + for i := 0; i < 10; i++ { + _ = Put(nil, fmt.Sprintf("%s%d", prefix, i), util.GenRandomString("", 8), 0) } + fmt.Println(GetWithPrefix(nil, prefix, 0)) } diff --git a/middleware/etcd/watch_test.go b/middleware/etcd/watch_test.go new file mode 100644 index 0000000..18dd513 --- /dev/null +++ b/middleware/etcd/watch_test.go @@ -0,0 +1,134 @@ +// Package etcd... +// +// Description : etcd... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2021-11-24 11:40 上午 +package etcd + +import ( + "fmt" + "testing" + "time" + + "go.etcd.io/etcd/clientv3" +) + +// TestWatchKey ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 3:06 下午 2021/11/23 +func TestWatchKey(t *testing.T) { + key := "name" + dealFunc := func(data *clientv3.Event) { + fmt.Println(string(data.Kv.Key), string(data.Kv.Value), data.Kv.Version, data.Kv.CreateRevision, data.Kv.ModRevision) + } + go func() { + for i := 0; i < 30; i++ { + _ = Put(nil, key, fmt.Sprintf("test-%d", i), 0) + time.Sleep(time.Second) + } + }() + WatchKey(nil, key, dealFunc) +} + +// TestWatchKeyWithCancel ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 3:57 下午 2021/11/23 +func TestWatchKeyWithCancel(t *testing.T) { + key := "name" + dealFunc := func(data *clientv3.Event) { + fmt.Println(string(data.Kv.Key), string(data.Kv.Value), data.Kv.Version, data.Kv.CreateRevision, data.Kv.ModRevision) + } + cancelFunc := func(key string, data interface{}) { + fmt.Println("取消监听 : ", key, data) + } + cancelChan := make(chan interface{}, 1) + go func() { + for i := 0; i < 30; i++ { + _ = Put(nil, key, fmt.Sprintf("test-%d", i), 0) + time.Sleep(time.Second) + } + time.Sleep(10 * time.Second) + cancelChan <- "Hello World" + }() + WatchKeyWithCancel(nil, key, dealFunc, cancelChan, cancelFunc) +} + +// TestWatchKeyWithCancelByChangeCallback ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 4:27 下午 2021/11/23 +func TestWatchKeyWithCancelByChangeCallback(t *testing.T) { + key := "name" + cancelChan := make(chan interface{}, 1) + + dealFunc := func(data *clientv3.Event) { + fmt.Println(string(data.Kv.Key), string(data.Kv.Value), data.Kv.Version, data.Kv.CreateRevision, data.Kv.ModRevision) + if string(data.Kv.Value) == "test-29" { + cancelChan <- "Hello World!" + } + } + + cancelFunc := func(key string, data interface{}) { + fmt.Println("取消监听 : ", key, data) + } + + go func() { + for i := 0; i < 30; i++ { + _ = Put(nil, key, fmt.Sprintf("test-%d", i), 0) + time.Sleep(time.Second) + } + }() + WatchKeyWithCancel(nil, key, dealFunc, cancelChan, cancelFunc) +} + +// TestWatchKeyOnce ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 4:50 下午 2021/11/23 +func TestWatchKeyOnce(t *testing.T) { + key := "name" + dealFunc := func(data *clientv3.Event) { + fmt.Println(string(data.Kv.Key), string(data.Kv.Value), data.Kv.Version, data.Kv.CreateRevision, data.Kv.ModRevision) + } + + timeoutFunc := func(key string, timeout time.Duration) { + fmt.Println("监听超时", key, timeout) + } + + timeout := 10 * time.Second + go func() { + for i := 0; i < 30; i++ { + _ = Put(nil, key, fmt.Sprintf("test-%d", i), 0) + time.Sleep(time.Second) + } + }() + WatchKeyOnce(nil, key, dealFunc, timeout, timeoutFunc) +} + +// TestWatchKeyOnceForTimeout ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 4:53 下午 2021/11/23 +func TestWatchKeyOnceForTimeout(t *testing.T) { + key := "name" + dealFunc := func(data *clientv3.Event) { + fmt.Println(string(data.Kv.Key), string(data.Kv.Value), data.Kv.Version, data.Kv.CreateRevision, data.Kv.ModRevision) + } + + timeoutFunc := func(key string, timeout time.Duration) { + fmt.Println("监听超时", key, timeout) + } + + timeout := time.Second + + WatchKeyOnce(nil, key, dealFunc, timeout, timeoutFunc) +}