// Package delay ... // // Description : event ... // // Author : go_developer@163.com<白茶清欢> // // Date : 2025-08-22 18:55 package delay import ( "context" "errors" "fmt" "os" "strings" "sync" "time" "git.zhangdeman.cn/zhangdeman/network/util" "git.zhangdeman.cn/zhangdeman/queue/abstract" "git.zhangdeman.cn/zhangdeman/queue/define" "git.zhangdeman.cn/zhangdeman/redis" "git.zhangdeman.cn/zhangdeman/serialize" redisV9 "github.com/redis/go-redis/v9" "github.com/tidwall/gjson" ) // NewRedisConsumer 启动一个消费者实例 func NewRedisConsumer( ctx context.Context, queueName string, queueCnt int, redisFlag string, consumerHandler abstract.IConsumerHandler, ) abstract.IConsumer { if nil == consumerHandler { panic("consumer handler instance is nil") } if queueName == "" || redisFlag == "" { panic("init redis producer: queue name or redis flag is empty") } // 验证redis实例是否存在 if _, err := redis.Client.GetRealClientWithError(redisFlag); nil != err { panic(err.Error()) } if queueCnt <= 0 { queueCnt = 1 // 默认单队列 } if nil == ctx { ctx = context.Background() } c := &redisConsumer{ lock: &sync.RWMutex{}, hasStop: false, ctx: ctx, stopChan: make(chan bool), redisFlag: redisFlag, queueName: queueName, queueCnt: queueCnt, consumerHandler: consumerHandler, } return c } type redisConsumer struct { lock *sync.RWMutex hasStop bool stopChan chan bool // 停止请求的chan ctx context.Context // 请求处理上下文 redisFlag string // redis 标识, 必须是使用统一的 pkg/redis 进行管理的 queueName string // 队列名称,基础公共前缀,尾部后缀会自动根据 shard cnt 进行哈希 queueCnt int // 队列数量 consumerHandler abstract.IConsumerHandler // 消息处理实例 } // Start 启动消费者 func (r *redisConsumer) Start() error { var realQueueList []string for i := 0; i < r.queueCnt; i++ { realQueueList = append(realQueueList, fmt.Sprintf("%v_%v", r.queueName, i)) } hostname, _ := os.Hostname() serverInfo := &define.EventConsumerServerInfo{ SystemTimestamp: time.Now().UnixMilli(), Hostname: hostname, HostIp: util.IP.GetHostIP(), Queue: strings.Join(realQueueList, ","), } for { isFinish := false select { case <-r.ctx.Done(): // context被取消(对应程序系统默认行为,一般对应程序停止运行) if nil != r.lock { r.lock.Lock() r.hasStop = true r.lock.Unlock() } else { r.hasStop = true } isFinish = true case <-r.stopChan: // 对应开发者介入处理行为, 主进程不停止运行, 但是消费者要退出 isFinish = true default: // 60s 无数据进入下一轮等待, 或者 消费到数据正常处理 if r.hasStop { // 已停止 return nil } res := redis.Wrapper.BRPop(r.ctx, r.redisFlag, realQueueList, 60) if nil != res.Err { if errors.Is(res.Err, redisV9.Nil) { // 判断是否为未订阅到消息 continue } r.notifyMessageSubscribeFailure(serverInfo, res.Err) time.Sleep(time.Second * 10) continue } // 订阅成功, 判断是否无数据 if res.Result == "" { // 直接进入下一轮阻塞等待 continue } // 订阅到数据, 数据解析 // BRPOP 返回数据是数组 0: 读取到数据的队列名 1: 读取到的数据 // 前置redis操作统一序列化, 此处读取到的是一个序列化后的字符串 `["queue_name", "value"]` serverInfo.Queue = gjson.Get(res.Result, "0").String() var ( err error formatData define.EventData ) if err = serialize.JSON.UnmarshalWithNumberForString(gjson.Get(res.Result, "1").String(), &formatData); nil != err { // 回调数据解析失败的处理函数 r.notifyMessageUnmarshalFailure(serverInfo, res.Result, err) continue } // 处理消息(同步或异步) r.handler(serverInfo, &formatData) } if isFinish { break } } return nil } // notifyMessageSubscribeFailure 通知订阅失败 func (r *redisConsumer) notifyMessageSubscribeFailure(serverInfo *define.EventConsumerServerInfo, err error) { defer func() { // 防一手 panic, 订阅失败回调出现panic, 不回调panic处理方法 recover() }() r.consumerHandler.SubscribeFailureCallback(r.ctx, serverInfo, err) } // 回调消息返序列化失败的处理逻辑 func (r *redisConsumer) notifyMessageUnmarshalFailure(serverInfo *define.EventConsumerServerInfo, res string, err error) { defer func() { // 防一手 panic, 返回劣化失败回调出现panic, 不回调panic处理方法 recover() }() r.consumerHandler.UnmarshalFailureCallback(r.ctx, serverInfo, res, err) } // handler 处理订阅到的数据 func (r *redisConsumer) handler(serverInfo *define.EventConsumerServerInfo, data *define.EventData) { // 处理消息(同步或异步) if r.consumerHandler.Async(r.ctx, serverInfo, data) { // 异步处理 go func() { r.dealMessageData(serverInfo, data) }() } else { r.dealMessageData(serverInfo, data) } } // dealMessageData 处理消息数据 func (r *redisConsumer) dealMessageData(serverInfo *define.EventConsumerServerInfo, data *define.EventData) { defer func() { // 同步处理 if err := recover(); nil != err { // 出现panic, 回调 panic 处理 r.consumerHandler.PanicCallback(r.ctx, serverInfo, data, err) } }() // 加锁, 若果不需要加锁, 实现接口时函数体留空即可 if err := r.consumerHandler.Lock(r.ctx, serverInfo, data); nil != err { r.consumerHandler.LockFailureCallback(r.ctx, serverInfo, data, err) return } defer func() { // 释放锁 if err := r.consumerHandler.Unlock(r.ctx, serverInfo, data); nil != err { r.consumerHandler.UnlockFailureCallback(r.ctx, serverInfo, data, err) return } }() // 执行对消息的处理 if err := r.consumerHandler.MessageLogic(r.ctx, serverInfo, data); nil != err { r.consumerHandler.MessageLogicFailureCallback(r.ctx, serverInfo, data, err) } } // Stop 停止消费者 func (r *redisConsumer) Stop() { r.stopChan <- true }