// Package event ... // // Description : event ... // // Author : go_developer@163.com<白茶清欢> // // Date : 2024-06-25 16:06 package event import ( "context" "errors" "fmt" "git.zhangdeman.cn/zhangdeman/consts" "git.zhangdeman.cn/zhangdeman/event/abstract" "git.zhangdeman.cn/zhangdeman/event/define" "github.com/redis/go-redis/v9" "time" ) var ( RedisEventPubSubClient abstract.IEvent ) // InitRedisPubSubEvent 初始化redis事件驱动, 基于 pub / sub 指令 // // Author : go_developer@163.com<白茶清欢> // // Date : 16:24 2024/6/25 func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) { instance := &RedisEventPubSub{ redisClient: redisClient, pubSubConfig: pubSubConfig, } instance.SetRedisClient(redisClient, pubSubConfig) RedisEventPubSubClient = instance } // RedisEventPubSub 基于redis的事件驱动 // // Author : go_developer@163.com<白茶清欢> // // Date : 16:07 2024/6/25 type RedisEventPubSub struct { redisClient *redis.Client // redis客户端 pubSubConfig *define.RedisEventPubSubConfig // 事件配置 messageChan chan *define.EventData // 消息队列 stopConsumer chan bool // 停止消费者 isStop bool // 是否已停止 panicCallback abstract.PanicCallback // panic回调 } func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) (*define.SendResult, error) { //TODO implement me panic("implement me") } func (r *RedisEventPubSub) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) { //TODO implement me panic("implement me") } // GetConsumeEventChan 获取消息channel, 自行实现消费 // // Author : go_developer@163.com<白茶清欢> // // Date : 17:13 2024/6/26 func (r *RedisEventPubSub) GetConsumeEventChan() (<-chan *define.EventData, error) { if r.isStop { return nil, errors.New("event instance has stop") } return r.messageChan, nil } // SetPanicCallback 出现任何panic的回调 // // Author : go_developer@163.com<白茶清欢> // // Date : 18:02 2024/6/26 func (r *RedisEventPubSub) SetPanicCallback(panicCallback abstract.PanicCallback) { if nil == panicCallback { panicCallback = define.DefaultPanicCallback } r.panicCallback = panicCallback } // ConsumeEvent 获取数据消费实例 // // Author : go_developer@163.com<白茶清欢> // // Date : 17:06 2024/6/26 func (r *RedisEventPubSub) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error { if nil == handler { return errors.New("handler is nil") } var ( messageChan <-chan *define.EventData err error ) if messageChan, err = r.GetConsumeEventChan(); nil != err { return err } if nil == failureCallback { failureCallback = define.DefaultFailCallbackHandler } if nil == successCallback { successCallback = define.DefaultSuccessCallbackHandler } go func() { defer func() { if panicErr := recover(); nil != panicErr { fmt.Println(r) } }() for !r.isStop || (r.isStop && len(messageChan) == 0) { select { case eventData := <-messageChan: handlerResult, handlerErr := handler(eventData) if nil != handlerErr { // 失败回调 failureCallback(eventData, handlerResult, handlerErr) break } else { // 成功回调 successCallback(eventData, handlerResult) } } } }() return nil } // Destroy 销毁事件实例 // // Author : go_developer@163.com<白茶清欢> // // Date : 15:42 2024/6/26 func (r *RedisEventPubSub) Destroy() { if r.isStop { // 已停止 return } r.stopConsumer <- true // 停止消费者 messageChan := make(chan bool, 1) go func() { for { if len(r.messageChan) == 0 { messageChan <- true return } } }() select { case <-time.After(time.Millisecond * time.Duration(r.pubSubConfig.CloseMaxWaitTime)): // 定时器到期 return case <-messageChan: // 没有待消费数据 return } } func (r *RedisEventPubSub) DriverType() string { return consts.EventDriverRedis } // SetRedisClient 设置redis客户端 // // Author : go_developer@163.com<白茶清欢> // // Date : 16:30 2024/6/25 func (r *RedisEventPubSub) SetRedisClient(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) { if nil == pubSubConfig { pubSubConfig = &define.RedisEventPubSubConfig{ Topic: "", PartitionNum: 0, MessageBufferSize: 0, } } if len(pubSubConfig.Topic) == 0 { pubSubConfig.Topic = define.DefaultRedisTopic } if pubSubConfig.PartitionNum <= 0 { pubSubConfig.PartitionNum = define.DefaultRedisPartitionNum } if pubSubConfig.MessageBufferSize <= 0 { r.pubSubConfig.MessageBufferSize = define.DefaultRedisMessageBufferSize } if pubSubConfig.CloseMaxWaitTime <= 0 { r.pubSubConfig.CloseMaxWaitTime = define.DefaultRedisCloseMaxWaitTime } r.redisClient = redisClient r.pubSubConfig = pubSubConfig r.stopConsumer = make(chan bool, 1) r.messageChan = make(chan *define.EventData, r.pubSubConfig.MessageBufferSize) }