From 6f56ff3c4f8749c5b249896336e9850de9852198 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Wed, 26 Jun 2024 18:15:34 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=8E=A5=E5=8F=A3=E6=8A=BD?= =?UTF-8?q?=E8=B1=A1=20+=20redis=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- abstract/IEvent.go | 72 +++++++++++++++++++++----------- define/handler.go | 33 +++++++++++++++ redis_pub_sub.go | 101 ++++++++++++++++++++++++++++++++++----------- 3 files changed, 159 insertions(+), 47 deletions(-) create mode 100644 define/handler.go diff --git a/abstract/IEvent.go b/abstract/IEvent.go index 54029fe..3e7997f 100644 --- a/abstract/IEvent.go +++ b/abstract/IEvent.go @@ -12,6 +12,48 @@ import ( "git.zhangdeman.cn/zhangdeman/event/define" ) +// EventHandler 事件数据处理函数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:08 2024/6/26 +type EventHandler func(eventData *define.EventData) (map[string]any, error) + +// SendFailCallback 发送事件失败的回调 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:31 2024/6/26 +type SendFailCallback func(ctx context.Context, eventResult *define.SendResult) + +// SendSuccessCallback 发送事件成功的回调 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:32 2024/6/26 +type SendSuccessCallback func(ctx context.Context, eventResult *define.SendResult, err error) + +// ConsumeFailCallbackHandler 时间处理成功回调 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:18 2024/6/26 +type ConsumeFailCallbackHandler func(eventData *define.EventData, handleResult map[string]any, err error) + +// ConsumeSuccessCallback 时间处理失败回调 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:20 2024/6/26 +type ConsumeSuccessCallback func(eventData *define.EventData, handleResult map[string]any) + +// PanicCallback panic回调, 根据不同异常类型, eventData / handleResult 均可能为nil +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:07 2024/6/26 +type PanicCallback func(err any, eventData *define.EventData, handleResult map[string]any) + // IEvent 事件接口定义 // // Author : go_developer@163.com<白茶清欢> @@ -23,44 +65,26 @@ type IEvent interface { // Author : go_developer@163.com<白茶清欢> // // Date : 12:04 2024/3/11 - SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) + SendEvent(ctx context.Context, eventData *define.EventData, sendSuccessCallback SendSuccessCallback, sendFailCallback SendFailCallback) (*define.SendResult, error) // SendEventAsync 发送事件(异步) // // Author : go_developer@163.com<白茶清欢> // // Date : 15:58 2024/6/25 - SendEventAsync(ctx context.Context, eventData *define.EventData) - // SendFailCallback 发送失败的回调方法 + SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback SendSuccessCallback, sendFailCallback SendFailCallback) + // GetConsumeEventChan 或去消息消费的channel, 自行实现消费 // // Author : go_developer@163.com<白茶清欢> // - // Date : 12:08 2024/3/11 - SendFailCallback(ctx context.Context, eventResult *define.SendResult) - // SendSuccessCallback 发送成功的回调 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 12:08 2024/3/11 - SendSuccessCallback(ctx context.Context, eventResult *define.SendResult, err error) + // Date : 17:11 2024/6/26 + GetConsumeEventChan() (<-chan *define.EventData, error) // ConsumeEvent 消费事件 // // Author : go_developer@163.com<白茶清欢> // // Date : 12:05 2024/3/11 - ConsumeEvent() (<-chan *define.EventData, error) - // ConsumeFailCallback 消费失败的回调, eventData 可能为 NIL - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 12:09 2024/3/11 - ConsumeFailCallback(eventData *define.EventData, err error) - // ConsumeSuccessCallback 消费成功的回调 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 12:10 2024/3/11 - ConsumeSuccessCallback(eventData *define.EventData) + ConsumeEvent(handler EventHandler, successCallback ConsumeSuccessCallback, failureCallback ConsumeFailCallbackHandler) error // Destroy 事件实例销毁时, 执行的方法 // // Author : go_developer@163.com<白茶清欢> diff --git a/define/handler.go b/define/handler.go new file mode 100644 index 0000000..c6722ca --- /dev/null +++ b/define/handler.go @@ -0,0 +1,33 @@ +// Package define ... +// +// Description : define ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-06-26 17:21 +package define + +// DefaultSuccessCallbackHandler ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:21 2024/6/26 +func DefaultSuccessCallbackHandler(eventData *EventData, handleResult map[string]any) { + +} + +// DefaultFailCallbackHandler ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:11 2024/6/26 +func DefaultFailCallbackHandler(eventData *EventData, handleResult map[string]any, err error) { + +} + +// DefaultPanicCallback ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:11 2024/6/26 +func DefaultPanicCallback(err any, eventData *EventData, handleResult map[string]any) {} diff --git a/redis_pub_sub.go b/redis_pub_sub.go index e6eefd2..ccb3466 100644 --- a/redis_pub_sub.go +++ b/redis_pub_sub.go @@ -9,6 +9,8 @@ package event import ( "context" + "errors" + "fmt" "git.zhangdeman.cn/zhangdeman/consts" "git.zhangdeman.cn/zhangdeman/event/abstract" "git.zhangdeman.cn/zhangdeman/event/define" @@ -40,45 +42,94 @@ func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisE // // Date : 16:07 2024/6/25 type RedisEventPubSub struct { - redisClient *redis.Client // redis客户端 - pubSubConfig *define.RedisEventPubSubConfig // 事件配置 - messageChan chan *define.EventData // 消息队列 - stopConsumer chan bool // 停止消费者 + redisClient *redis.Client // redis客户端 + pubSubConfig *define.RedisEventPubSubConfig // 事件配置 + messageChan chan *define.EventData // 消息队列 + stopConsumer chan bool // 停止消费者 + isStop bool // 是否已停止 + panicCallback abstract.PanicCallback // panic回调 } -func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) { +func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) (*define.SendResult, error) { //TODO implement me panic("implement me") } -func (r *RedisEventPubSub) SendEventAsync(ctx context.Context, eventData *define.EventData) { +func (r *RedisEventPubSub) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) { //TODO implement me panic("implement me") } -func (r *RedisEventPubSub) SendFailCallback(ctx context.Context, eventResult *define.SendResult) { - //TODO implement me - panic("implement me") +// GetConsumeEventChan 获取消息channel, 自行实现消费 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:13 2024/6/26 +func (r *RedisEventPubSub) GetConsumeEventChan() (<-chan *define.EventData, error) { + if r.isStop { + return nil, errors.New("event instance has stop") + } + return r.messageChan, nil } -func (r *RedisEventPubSub) SendSuccessCallback(ctx context.Context, eventResult *define.SendResult, err error) { - //TODO implement me - panic("implement me") +// SetPanicCallback 出现任何panic的回调 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:02 2024/6/26 +func (r *RedisEventPubSub) SetPanicCallback(panicCallback abstract.PanicCallback) { + if nil == panicCallback { + panicCallback = define.DefaultPanicCallback + } + r.panicCallback = panicCallback } -func (r *RedisEventPubSub) ConsumeEvent() (<-chan *define.EventData, error) { - //TODO implement me - panic("implement me") -} +// ConsumeEvent 获取数据消费实例 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:06 2024/6/26 +func (r *RedisEventPubSub) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error { + if nil == handler { + return errors.New("handler is nil") + } + var ( + messageChan <-chan *define.EventData + err error + ) -func (r *RedisEventPubSub) ConsumeFailCallback(eventData *define.EventData, err error) { - //TODO implement me - panic("implement me") -} + if messageChan, err = r.GetConsumeEventChan(); nil != err { + return err + } + if nil == failureCallback { + failureCallback = define.DefaultFailCallbackHandler + } + if nil == successCallback { + successCallback = define.DefaultSuccessCallbackHandler + } + go func() { + defer func() { + if panicErr := recover(); nil != panicErr { + fmt.Println(r) + } + }() + for !r.isStop || (r.isStop && len(messageChan) == 0) { + select { + case eventData := <-messageChan: + handlerResult, handlerErr := handler(eventData) + if nil != handlerErr { + // 失败回调 + failureCallback(eventData, handlerResult, handlerErr) + break + } else { + // 成功回调 + successCallback(eventData, handlerResult) + } + } + } + }() -func (r *RedisEventPubSub) ConsumeSuccessCallback(eventData *define.EventData) { - //TODO implement me - panic("implement me") + return nil } // Destroy 销毁事件实例 @@ -87,6 +138,10 @@ func (r *RedisEventPubSub) ConsumeSuccessCallback(eventData *define.EventData) { // // Date : 15:42 2024/6/26 func (r *RedisEventPubSub) Destroy() { + if r.isStop { + // 已停止 + return + } r.stopConsumer <- true // 停止消费者 messageChan := make(chan bool, 1) go func() {