From 943d5f36cb067f845176bbaf4a181ee618af0c2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Tue, 25 Jun 2024 16:47:29 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=84=E5=88=92=E5=9F=BA=E4=BA=8Eredis=20pub?= =?UTF-8?q?/sub=20=E7=9A=84=E9=A9=B1=E5=8A=A8=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- abstract/IEvent.go | 6 --- define/data.go | 11 +++-- define/redis.go | 22 +++++++++ go.mod | 22 +++++++++ go.sum | 40 ++++++++++++++++ redis_pub_sub.go | 112 +++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 203 insertions(+), 10 deletions(-) create mode 100644 define/redis.go create mode 100644 redis_pub_sub.go diff --git a/abstract/IEvent.go b/abstract/IEvent.go index 70dfa96..54029fe 100644 --- a/abstract/IEvent.go +++ b/abstract/IEvent.go @@ -18,12 +18,6 @@ import ( // // Date : 19:08 2023/8/14 type IEvent interface { - // Construct 初始化事件实例 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 12:04 2024/3/11 - Construct() error // SendEvent 发送事件(同步) // // Author : go_developer@163.com<白茶清欢> diff --git a/define/data.go b/define/data.go index 2da5538..baeb823 100644 --- a/define/data.go +++ b/define/data.go @@ -18,6 +18,7 @@ type EventData struct { Host string `json:"host"` // 触发事件host Timestamp int64 `json:"timestamp"` // 触发时间,纳秒级时间戳 SystemTimestamp int64 `json:"system_timestamp"` // 发送时的系统时间 + Key string `json:"key"` // 会基于当前值进行hash, 决定消息分区, 不指定则随机生成 Data any `json:"data"` // 发送的数据 } @@ -27,8 +28,10 @@ type EventData struct { // // Date : 15:56 2024/6/25 type SendResult struct { - Data *EventData `json:"data"` // 发送的数据 - IsSuccess bool `json:"is_success"` // 是否发送成功 - FailReason string `json:"fail_reason"` // 失败原因 - Extension map[string]any `json:"extension"` // 扩展数据 + Data *EventData `json:"data"` // 发送的数据 + PartitionNum int `json:"partition_num"` // 分区索引编号 + Topic string `json:"topic"` // 使用的真实topic + IsSuccess bool `json:"is_success"` // 是否发送成功 + FailReason string `json:"fail_reason"` // 失败原因 + Extension map[string]any `json:"extension"` // 扩展数据 } diff --git a/define/redis.go b/define/redis.go new file mode 100644 index 0000000..63755e4 --- /dev/null +++ b/define/redis.go @@ -0,0 +1,22 @@ +// Package define ... +// +// Description : define ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-06-25 16:24 +package define + +const ( + DefaultPartitionNum = 1 +) + +// RedisEventPubSubConfig redis事件配置 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 16:25 2024/6/25 +type RedisEventPubSubConfig struct { + Topic string `json:"topic"` // topic key, 不指定随机生成 + PartitionNum int `json:"partition_num"` // 多少个分区, 默认值 : 1 +} diff --git a/go.mod b/go.mod index 1a3001e..6e362bf 100644 --- a/go.mod +++ b/go.mod @@ -3,3 +3,25 @@ module git.zhangdeman.cn/zhangdeman/event go 1.21 toolchain go1.22.1 + +require ( + git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38 // indirect + git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211 // indirect + git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0 // indirect + git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd // indirect + git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e // indirect + git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240612083858-8d056baada2e // indirect + github.com/BurntSushi/toml v1.4.0 // indirect + github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/go-ini/ini v1.67.0 // indirect + github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/mozillazg/go-pinyin v0.20.0 // indirect + github.com/redis/go-redis/v9 v9.5.3 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/tidwall/gjson v1.17.1 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index e69de29..0a006c1 100644 --- a/go.sum +++ b/go.sum @@ -0,0 +1,40 @@ +git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38 h1:+o+BI5GGlwJelPLWL8ciDJqxw/G8dv+FU6OztGSnEjo= +git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38/go.mod h1:IXXaZkb7vGzGnGM5RRWrASAuwrVSNxuoe0DmeXx5g6k= +git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211 h1:I/wOsRpCSRkU9vo1u703slQsmK0wnNeZzsWQOGtIAG0= +git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211/go.mod h1:SrtvrQRdzt+8KfYzvosH++gWxo2ShPTzR1m3VQ6uX7U= +git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0 h1:gUDlQMuJ4xNfP2Abl1Msmpa3fASLWYkNlqDFF/6GN0Y= +git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0/go.mod h1:VHb9qmhaPDAQDcS6vUiDCamYjZ4R5lD1XtVsh55KsMI= +git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd h1:2Y37waOVCmVvx0Rp8VGEptE2/2JVMImtxB4dKKDk/3w= +git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd/go.mod h1:6+7whkCmb4sJDIfH3HxNuXRveaM0gCCNWd2uXZqNtIE= +git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e h1:Q973S6CcWr1ICZhFI1STFOJ+KUImCl2BaIXm6YppBqI= +git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e/go.mod h1:VpPjBlwz8U+OxZuxzHQBv1aEEZ3pStH6bZvT21ADEbI= +git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240612083858-8d056baada2e h1:+PeWa2QdYBWnL32CfAAgy0dlaRCVNmYZDH4q+9w7Gfg= +git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240612083858-8d056baada2e/go.mod h1:US/pcq2vstE3iyxIHf53w8IeXKkZys7bj/ozLWkRYeE= +github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= +github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ= +github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= +github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mozillazg/go-pinyin v0.20.0 h1:BtR3DsxpApHfKReaPO1fCqF4pThRwH9uwvXzm+GnMFQ= +github.com/mozillazg/go-pinyin v0.20.0/go.mod h1:iR4EnMMRXkfpFVV5FMi4FNB6wGq9NV6uDWbUuPhP4Yc= +github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU= +github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U= +github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/redis_pub_sub.go b/redis_pub_sub.go new file mode 100644 index 0000000..0956c35 --- /dev/null +++ b/redis_pub_sub.go @@ -0,0 +1,112 @@ +// Package event ... +// +// Description : event ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-06-25 16:06 +package event + +import ( + "context" + "git.zhangdeman.cn/zhangdeman/event/abstract" + "git.zhangdeman.cn/zhangdeman/event/define" + "git.zhangdeman.cn/zhangdeman/wrapper" + "github.com/redis/go-redis/v9" +) + +var ( + RedisEventPubSubClient abstract.IEvent +) + +// InitRedisPubSubEvent 初始化redis事件驱动, 基于 pub / sub 指令 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 16:24 2024/6/25 +func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) { + instance := &RedisEventPubSub{ + redisClient: redisClient, + pubSubConfig: pubSubConfig, + } + instance.SetRedisClient(redisClient, pubSubConfig) + RedisEventPubSubClient = instance +} + +// RedisEventPubSub 基于redis的事件驱动 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 16:07 2024/6/25 +type RedisEventPubSub struct { + redisClient *redis.Client // redis客户端 + pubSubConfig *define.RedisEventPubSubConfig // 事件配置 + +} + +func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) { + //TODO implement me + panic("implement me") +} + +func (r *RedisEventPubSub) SendEventAsync(ctx context.Context, eventData *define.EventData) { + //TODO implement me + panic("implement me") +} + +func (r *RedisEventPubSub) SendFailCallback(ctx context.Context, eventResult *define.SendResult) { + //TODO implement me + panic("implement me") +} + +func (r *RedisEventPubSub) SendSuccessCallback(ctx context.Context, eventResult *define.SendResult, err error) { + //TODO implement me + panic("implement me") +} + +func (r *RedisEventPubSub) ConsumeEvent() (<-chan *define.EventData, error) { + //TODO implement me + panic("implement me") +} + +func (r *RedisEventPubSub) ConsumeFailCallback(eventData *define.EventData, err error) { + //TODO implement me + panic("implement me") +} + +func (r *RedisEventPubSub) ConsumeSuccessCallback(eventData *define.EventData) { + //TODO implement me + panic("implement me") +} + +func (r *RedisEventPubSub) Destroy() { + //TODO implement me + panic("implement me") +} + +func (r *RedisEventPubSub) DriverType() string { + //TODO implement me + panic("implement me") +} + +// SetRedisClient 设置redis客户端 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 16:30 2024/6/25 +func (r *RedisEventPubSub) SetRedisClient(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) { + if nil == pubSubConfig { + pubSubConfig = &define.RedisEventPubSubConfig{ + Topic: "", + PartitionNum: 0, + } + } + if len(pubSubConfig.Topic) == 0 { + pubSubConfig.Topic = "EVENT_TOPIC_" + wrapper.StringFromRandom(128, "").Md5().Value + } + if pubSubConfig.PartitionNum <= 0 { + pubSubConfig.PartitionNum = define.DefaultPartitionNum + } + r.redisClient = redisClient + r.pubSubConfig = pubSubConfig +}