redis驱动发送事件

This commit is contained in:
白茶清欢 2024-06-27 11:50:13 +08:00
parent 6f56ff3c4f
commit 414ad47e72
6 changed files with 95 additions and 19 deletions

View File

@ -24,14 +24,14 @@ type EventHandler func(eventData *define.EventData) (map[string]any, error)
// Author : go_developer@163.com<白茶清欢> // Author : go_developer@163.com<白茶清欢>
// //
// Date : 17:31 2024/6/26 // Date : 17:31 2024/6/26
type SendFailCallback func(ctx context.Context, eventResult *define.SendResult) type SendFailCallback func(ctx context.Context, eventData *define.EventData, eventResult *define.SendResult, err error)
// SendSuccessCallback 发送事件成功的回调 // SendSuccessCallback 发送事件成功的回调
// //
// Author : go_developer@163.com<白茶清欢> // Author : go_developer@163.com<白茶清欢>
// //
// Date : 17:32 2024/6/26 // Date : 17:32 2024/6/26
type SendSuccessCallback func(ctx context.Context, eventResult *define.SendResult, err error) type SendSuccessCallback func(ctx context.Context, eventResult *define.SendResult)
// ConsumeFailCallbackHandler 时间处理成功回调 // ConsumeFailCallbackHandler 时间处理成功回调
// //
@ -65,7 +65,7 @@ type IEvent interface {
// Author : go_developer@163.com<白茶清欢> // Author : go_developer@163.com<白茶清欢>
// //
// Date : 12:04 2024/3/11 // Date : 12:04 2024/3/11
SendEvent(ctx context.Context, eventData *define.EventData, sendSuccessCallback SendSuccessCallback, sendFailCallback SendFailCallback) (*define.SendResult, error) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error)
// SendEventAsync 发送事件(异步) // SendEventAsync 发送事件(异步)
// //

View File

@ -29,7 +29,7 @@ type EventData struct {
// Date : 15:56 2024/6/25 // Date : 15:56 2024/6/25
type SendResult struct { type SendResult struct {
Data *EventData `json:"data"` // 发送的数据 Data *EventData `json:"data"` // 发送的数据
PartitionNum int `json:"partition_num"` // 分区索引编号 Partition int `json:"partition_num"` // 分区索引编号
Topic string `json:"topic"` // 使用的真实topic Topic string `json:"topic"` // 使用的真实topic
IsSuccess bool `json:"is_success"` // 是否发送成功 IsSuccess bool `json:"is_success"` // 是否发送成功
FailReason string `json:"fail_reason"` // 失败原因 FailReason string `json:"fail_reason"` // 失败原因

View File

@ -7,6 +7,8 @@
// Date : 2024-06-26 17:21 // Date : 2024-06-26 17:21
package define package define
import "context"
// DefaultSuccessCallbackHandler ... // DefaultSuccessCallbackHandler ...
// //
// Author : go_developer@163.com<白茶清欢> // Author : go_developer@163.com<白茶清欢>
@ -31,3 +33,18 @@ func DefaultFailCallbackHandler(eventData *EventData, handleResult map[string]an
// //
// Date : 18:11 2024/6/26 // Date : 18:11 2024/6/26
func DefaultPanicCallback(err any, eventData *EventData, handleResult map[string]any) {} func DefaultPanicCallback(err any, eventData *EventData, handleResult map[string]any) {}
// DefaultSendFailCallback ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:45 2024/6/27
func DefaultSendFailCallback(ctx context.Context, eventData *EventData, eventResult *SendResult, err error) {
}
// DefaultSendSuccessCallback ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:46 2024/6/27
func DefaultSendSuccessCallback(ctx context.Context, eventResult *SendResult) {}

9
go.mod
View File

@ -5,12 +5,16 @@ go 1.21
toolchain go1.22.1 toolchain go1.22.1
require ( require (
git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38 // indirect git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38
git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240627031706-9ff1c213bb50
github.com/redis/go-redis/v9 v9.5.3
)
require (
git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211 // indirect git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211 // indirect
git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0 // indirect git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0 // indirect
git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd // indirect git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd // indirect
git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e // indirect git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e // indirect
git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240612083858-8d056baada2e // indirect
github.com/BurntSushi/toml v1.4.0 // indirect github.com/BurntSushi/toml v1.4.0 // indirect
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect
@ -18,7 +22,6 @@ require (
github.com/go-ini/ini v1.67.0 // indirect github.com/go-ini/ini v1.67.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mozillazg/go-pinyin v0.20.0 // indirect github.com/mozillazg/go-pinyin v0.20.0 // indirect
github.com/redis/go-redis/v9 v9.5.3 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tidwall/gjson v1.17.1 // indirect github.com/tidwall/gjson v1.17.1 // indirect
github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/match v1.1.1 // indirect

15
go.sum
View File

@ -8,14 +8,20 @@ git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd h1:2Y3
git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd/go.mod h1:6+7whkCmb4sJDIfH3HxNuXRveaM0gCCNWd2uXZqNtIE= git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd/go.mod h1:6+7whkCmb4sJDIfH3HxNuXRveaM0gCCNWd2uXZqNtIE=
git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e h1:Q973S6CcWr1ICZhFI1STFOJ+KUImCl2BaIXm6YppBqI= git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e h1:Q973S6CcWr1ICZhFI1STFOJ+KUImCl2BaIXm6YppBqI=
git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e/go.mod h1:VpPjBlwz8U+OxZuxzHQBv1aEEZ3pStH6bZvT21ADEbI= git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e/go.mod h1:VpPjBlwz8U+OxZuxzHQBv1aEEZ3pStH6bZvT21ADEbI=
git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240612083858-8d056baada2e h1:+PeWa2QdYBWnL32CfAAgy0dlaRCVNmYZDH4q+9w7Gfg= git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240627031706-9ff1c213bb50 h1:olo34i2Gq5gX7bYPv5TR4X5l5CrYFtu9UCElkYlmL2c=
git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240612083858-8d056baada2e/go.mod h1:US/pcq2vstE3iyxIHf53w8IeXKkZys7bj/ozLWkRYeE= git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240627031706-9ff1c213bb50/go.mod h1:US/pcq2vstE3iyxIHf53w8IeXKkZys7bj/ozLWkRYeE=
github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0=
github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ= github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ=
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg= github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
@ -24,10 +30,14 @@ github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mozillazg/go-pinyin v0.20.0 h1:BtR3DsxpApHfKReaPO1fCqF4pThRwH9uwvXzm+GnMFQ= github.com/mozillazg/go-pinyin v0.20.0 h1:BtR3DsxpApHfKReaPO1fCqF4pThRwH9uwvXzm+GnMFQ=
github.com/mozillazg/go-pinyin v0.20.0/go.mod h1:iR4EnMMRXkfpFVV5FMi4FNB6wGq9NV6uDWbUuPhP4Yc= github.com/mozillazg/go-pinyin v0.20.0/go.mod h1:iR4EnMMRXkfpFVV5FMi4FNB6wGq9NV6uDWbUuPhP4Yc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU= github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU=
github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U= github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U=
github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
@ -35,6 +45,7 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -14,6 +14,7 @@ import (
"git.zhangdeman.cn/zhangdeman/consts" "git.zhangdeman.cn/zhangdeman/consts"
"git.zhangdeman.cn/zhangdeman/event/abstract" "git.zhangdeman.cn/zhangdeman/event/abstract"
"git.zhangdeman.cn/zhangdeman/event/define" "git.zhangdeman.cn/zhangdeman/event/define"
"git.zhangdeman.cn/zhangdeman/wrapper"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"time" "time"
) )
@ -50,14 +51,58 @@ type RedisEventPubSub struct {
panicCallback abstract.PanicCallback // panic回调 panicCallback abstract.PanicCallback // panic回调
} }
func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) (*define.SendResult, error) { // SendEvent 发布时间
//TODO implement me //
panic("implement me") // Author : go_developer@163.com<白茶清欢>
//
// Date : 10:58 2024/6/27
func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) {
partition := int(wrapper.String(eventData.Key).HashNumber().Value % uint64(r.pubSubConfig.PartitionNum))
realTopic := fmt.Sprintf("%v_%v", r.pubSubConfig.Topic, partition)
res := r.redisClient.Publish(ctx, realTopic, eventData)
if nil == res {
return nil, errors.New("can not get send event result")
}
return &define.SendResult{
Data: eventData,
Partition: partition,
Topic: realTopic,
IsSuccess: res.Err() == nil,
FailReason: wrapper.TernaryOperator.String(res.Err() == nil, "success", wrapper.String(res.Err().Error())).Value(),
Extension: nil,
}, res.Err()
} }
// SendEventAsync 异步发送事件
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:29 2024/6/27
func (r *RedisEventPubSub) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) { func (r *RedisEventPubSub) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) {
//TODO implement me go func() {
panic("implement me") if nil == sendFailCallback {
sendFailCallback = define.DefaultSendFailCallback
}
if nil == sendSuccessCallback {
sendSuccessCallback = define.DefaultSendSuccessCallback
}
var (
sendResult *define.SendResult
handlerErr error
)
defer func() {
if panicErr := recover(); nil != panicErr {
r.panicCallback(panicErr, eventData, map[string]any{
"send_result": sendResult,
})
}
}()
if sendResult, handlerErr = r.SendEvent(ctx, eventData); nil != handlerErr {
sendFailCallback(ctx, eventData, sendResult, handlerErr)
} else {
sendSuccessCallback(ctx, sendResult)
}
}()
} }
// GetConsumeEventChan 获取消息channel, 自行实现消费 // GetConsumeEventChan 获取消息channel, 自行实现消费