// Package event ... // // Description : event ... // // Author : go_developer@163.com<白茶清欢> // // Date : 2024-06-25 16:06 package event import ( "context" "errors" "fmt" "git.zhangdeman.cn/zhangdeman/consts" "git.zhangdeman.cn/zhangdeman/event/abstract" "git.zhangdeman.cn/zhangdeman/event/define" "git.zhangdeman.cn/zhangdeman/serialize" "git.zhangdeman.cn/zhangdeman/wrapper" "github.com/redis/go-redis/v9" "time" ) var ( RedisEventPubSubClient abstract.IEvent ) // InitRedisPubSubEvent 初始化redis事件驱动, 基于 pub / sub 指令 // // Author : go_developer@163.com<白茶清欢> // // Date : 16:24 2024/6/25 func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) { instance := &RedisEventPubSub{ base: &base{ panicCallback: define.DefaultPanicCallback, parseFailCallback: define.DefaultParseFailCallbackFunc, }, redisClient: redisClient, pubSubConfig: pubSubConfig, } instance.SetRedisClient(redisClient, pubSubConfig) instance.StartConsumer() // 启动消费者 RedisEventPubSubClient = instance } // RedisEventPubSub 基于redis的事件驱动 // // Author : go_developer@163.com<白茶清欢> // // Date : 16:07 2024/6/25 type RedisEventPubSub struct { *base redisClient *redis.Client // redis客户端 pubSubConfig *define.RedisEventPubSubConfig // 事件配置 messageChan chan *define.EventData // 消息队列 stopConsumer chan bool // 停止消费者 isStop bool // 是否已停止 parseFailCallback abstract.EventParseFailCallback // 数据解析失败回调 } // SendEvent 发布时间 // // Author : go_developer@163.com<白茶清欢> // // Date : 10:58 2024/6/27 func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) { partition := int(wrapper.String(eventData.Key).HashNumber().Value % uint64(r.pubSubConfig.PartitionNum)) realTopic := fmt.Sprintf("%v_%v", r.pubSubConfig.Topic, partition) res := r.redisClient.Publish(ctx, realTopic, eventData) if nil == res { return nil, errors.New("can not get send event result") } return &define.SendResult{ Data: eventData, Partition: partition, Topic: realTopic, IsSuccess: res.Err() == nil, FailReason: wrapper.TernaryOperator.String(res.Err() == nil, "success", wrapper.String(res.Err().Error())).Value(), Extension: nil, }, res.Err() } // SendEventAsync 异步发送事件 // // Author : go_developer@163.com<白茶清欢> // // Date : 11:29 2024/6/27 func (r *RedisEventPubSub) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) { go func() { if nil == sendFailCallback { sendFailCallback = define.DefaultSendFailCallback } if nil == sendSuccessCallback { sendSuccessCallback = define.DefaultSendSuccessCallback } var ( sendResult *define.SendResult handlerErr error ) defer func() { if panicErr := recover(); nil != panicErr { r.panicCallback(panicErr, eventData, map[string]any{ "send_result": sendResult, }) } }() if sendResult, handlerErr = r.SendEvent(ctx, eventData); nil != handlerErr { sendFailCallback(ctx, eventData, sendResult, handlerErr) } else { sendSuccessCallback(ctx, sendResult) } }() } // GetConsumeEventChan 获取消息channel, 自行实现消费 // // Author : go_developer@163.com<白茶清欢> // // Date : 17:13 2024/6/26 func (r *RedisEventPubSub) GetConsumeEventChan() (<-chan *define.EventData, error) { if r.isStop { return nil, errors.New("event instance has stop") } return r.messageChan, nil } // ConsumeEvent 获取数据消费实例 // // Author : go_developer@163.com<白茶清欢> // // Date : 17:06 2024/6/26 func (r *RedisEventPubSub) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error { if nil == handler { return errors.New("handler is nil") } var ( messageChan <-chan *define.EventData err error ) if messageChan, err = r.GetConsumeEventChan(); nil != err { return err } if nil == failureCallback { failureCallback = define.DefaultFailCallbackHandler } if nil == successCallback { successCallback = define.DefaultSuccessCallbackHandler } for eventData := range messageChan { handlerResult, handleErr := handler(eventData) if nil != handleErr { failureCallback(eventData, handlerResult, err) } else { successCallback(eventData, handlerResult) } } return nil } // StartConsumer 启动消费者 // // Author : go_developer@163.com<白茶清欢> // // Date : 18:36 2024/6/27 func (r *RedisEventPubSub) StartConsumer() { for partition := 0; partition < r.pubSubConfig.PartitionNum; partition++ { go func(realPartition int) { defer func() { if panicErr := recover(); nil != panicErr { r.panicCallback(panicErr, nil, nil) } }() // 启动消费者 realTopic := fmt.Sprintf("%v_%v", r.pubSubConfig.Topic, realPartition) messageChannel := r.redisClient.Subscribe(context.Background(), realTopic).Channel() for message := range messageChannel { var ( eventData define.EventData err error ) if err = serialize.JSON.UnmarshalWithNumber([]byte(message.Payload), &eventData); nil != err { // TODO : 事件数据解析失败的回调 continue } r.messageChan <- &eventData } }(partition) } } // Destroy 销毁事件实例 // // Author : go_developer@163.com<白茶清欢> // // Date : 15:42 2024/6/26 func (r *RedisEventPubSub) Destroy() { if r.isStop { // 已停止 return } r.stopConsumer <- true // 停止消费者 messageChan := make(chan bool, 1) go func() { for { if len(r.messageChan) == 0 { messageChan <- true return } } }() select { case <-time.After(time.Millisecond * time.Duration(r.pubSubConfig.CloseMaxWaitTime)): // 定时器到期 return case <-messageChan: // 没有待消费数据 return } } func (r *RedisEventPubSub) DriverType() string { return consts.EventDriverRedis } // SetRedisClient 设置redis客户端 // // Author : go_developer@163.com<白茶清欢> // // Date : 16:30 2024/6/25 func (r *RedisEventPubSub) SetRedisClient(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) { if nil == pubSubConfig { pubSubConfig = &define.RedisEventPubSubConfig{ Topic: "", PartitionNum: 0, MessageBufferSize: 0, } } if len(pubSubConfig.Topic) == 0 { pubSubConfig.Topic = define.DefaultRedisTopic } if pubSubConfig.PartitionNum <= 0 { pubSubConfig.PartitionNum = define.DefaultRedisPartitionNum } if pubSubConfig.MessageBufferSize <= 0 { r.pubSubConfig.MessageBufferSize = define.DefaultRedisMessageBufferSize } if pubSubConfig.CloseMaxWaitTime <= 0 { r.pubSubConfig.CloseMaxWaitTime = define.DefaultRedisCloseMaxWaitTime } r.redisClient = redisClient r.pubSubConfig = pubSubConfig r.stopConsumer = make(chan bool, 1) r.messageChan = make(chan *define.EventData, r.pubSubConfig.MessageBufferSize) }