Files
queue/delay/redis_consumer.go

212 lines
6.1 KiB
Go

// Package delay ...
//
// Description : event ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2025-08-22 18:55
package delay
import (
"context"
"errors"
"fmt"
"os"
"strings"
"sync"
"time"
"git.zhangdeman.cn/zhangdeman/network/util"
"git.zhangdeman.cn/zhangdeman/queue/abstract"
"git.zhangdeman.cn/zhangdeman/queue/define"
"git.zhangdeman.cn/zhangdeman/redis"
"git.zhangdeman.cn/zhangdeman/serialize"
redisV9 "github.com/redis/go-redis/v9"
"github.com/tidwall/gjson"
)
// NewRedisConsumer 启动一个消费者实例
func NewRedisConsumer(
ctx context.Context,
queueName string, queueCnt int, redisFlag string,
consumerHandler abstract.IConsumerHandler,
) abstract.IConsumer {
if nil == consumerHandler {
panic("consumer handler instance is nil")
}
if queueName == "" || redisFlag == "" {
panic("init redis producer: queue name or redis flag is empty")
}
// 验证redis实例是否存在
if _, err := redis.Client.GetRealClientWithError(redisFlag); nil != err {
panic(err.Error())
}
if queueCnt <= 0 {
queueCnt = 1 // 默认单队列
}
if nil == ctx {
ctx = context.Background()
}
c := &redisConsumer{
lock: &sync.RWMutex{},
hasStop: false,
ctx: ctx,
stopChan: make(chan bool),
redisFlag: redisFlag,
queueName: queueName,
queueCnt: queueCnt,
consumerHandler: consumerHandler,
}
return c
}
type redisConsumer struct {
lock *sync.RWMutex
hasStop bool
stopChan chan bool // 停止请求的chan
ctx context.Context // 请求处理上下文
redisFlag string // redis 标识, 必须是使用统一的 pkg/redis 进行管理的
queueName string // 队列名称,基础公共前缀,尾部后缀会自动根据 shard cnt 进行哈希
queueCnt int // 队列数量
consumerHandler abstract.IConsumerHandler // 消息处理实例
}
// Start 启动消费者
func (r *redisConsumer) Start() error {
var realQueueList []string
for i := 0; i < r.queueCnt; i++ {
realQueueList = append(realQueueList, fmt.Sprintf("%v_%v", r.queueName, i))
}
hostname, _ := os.Hostname()
serverInfo := &define.EventConsumerServerInfo{
SystemTimestamp: time.Now().UnixMilli(),
Hostname: hostname,
HostIp: util.IP.GetHostIP(),
Queue: strings.Join(realQueueList, ","),
}
for {
isFinish := false
select {
case <-r.ctx.Done(): // context被取消(对应程序系统默认行为,一般对应程序停止运行)
if nil != r.lock {
r.lock.Lock()
r.hasStop = true
r.lock.Unlock()
} else {
r.hasStop = true
}
isFinish = true
case <-r.stopChan: // 对应开发者介入处理行为, 主进程不停止运行, 但是消费者要退出
isFinish = true
default: // 60s 无数据进入下一轮等待, 或者 消费到数据正常处理
if r.hasStop {
// 已停止
return nil
}
res := redis.Wrapper.BRPop(r.ctx, r.redisFlag, realQueueList, 60)
if nil != res.Err {
if errors.Is(res.Err, redisV9.Nil) {
// 判断是否为未订阅到消息
continue
}
r.notifyMessageSubscribeFailure(serverInfo, res.Err)
time.Sleep(time.Second * 10)
continue
}
// 订阅成功, 判断是否无数据
if res.Result == "" {
// 直接进入下一轮阻塞等待
continue
}
// 订阅到数据, 数据解析
// BRPOP 返回数据是数组 0: 读取到数据的队列名 1: 读取到的数据
// 前置redis操作统一序列化, 此处读取到的是一个序列化后的字符串 `["queue_name", "value"]`
serverInfo.Queue = gjson.Get(res.Result, "0").String()
var (
err error
formatData define.EventData
)
if err = serialize.JSON.UnmarshalWithNumberForString(gjson.Get(res.Result, "1").String(), &formatData); nil != err {
// 回调数据解析失败的处理函数
r.notifyMessageUnmarshalFailure(serverInfo, res.Result, err)
continue
}
// 处理消息(同步或异步)
r.handler(serverInfo, &formatData)
}
if isFinish {
break
}
}
return nil
}
// notifyMessageSubscribeFailure 通知订阅失败
func (r *redisConsumer) notifyMessageSubscribeFailure(serverInfo *define.EventConsumerServerInfo, err error) {
defer func() {
// 防一手 panic, 订阅失败回调出现panic, 不回调panic处理方法
recover()
}()
r.consumerHandler.SubscribeFailureCallback(r.ctx, serverInfo, err)
}
// 回调消息返序列化失败的处理逻辑
func (r *redisConsumer) notifyMessageUnmarshalFailure(serverInfo *define.EventConsumerServerInfo, res string, err error) {
defer func() {
// 防一手 panic, 返回劣化失败回调出现panic, 不回调panic处理方法
recover()
}()
r.consumerHandler.UnmarshalFailureCallback(r.ctx, serverInfo, res, err)
}
// handler 处理订阅到的数据
func (r *redisConsumer) handler(serverInfo *define.EventConsumerServerInfo, data *define.EventData) {
// 处理消息(同步或异步)
if r.consumerHandler.Async(r.ctx, serverInfo, data) {
// 异步处理
go func() {
r.dealMessageData(serverInfo, data)
}()
} else {
r.dealMessageData(serverInfo, data)
}
}
// dealMessageData 处理消息数据
func (r *redisConsumer) dealMessageData(serverInfo *define.EventConsumerServerInfo, data *define.EventData) {
defer func() {
// 同步处理
if err := recover(); nil != err {
// 出现panic, 回调 panic 处理
r.consumerHandler.PanicCallback(r.ctx, serverInfo, data, err)
}
}()
// 加锁, 若果不需要加锁, 实现接口时函数体留空即可
if err := r.consumerHandler.Lock(r.ctx, serverInfo, data); nil != err {
r.consumerHandler.LockFailureCallback(r.ctx, serverInfo, data, err)
return
}
defer func() {
// 释放锁
if err := r.consumerHandler.Unlock(r.ctx, serverInfo, data); nil != err {
r.consumerHandler.UnlockFailureCallback(r.ctx, serverInfo, data, err)
return
}
}()
// 执行对消息的处理
if err := r.consumerHandler.MessageLogic(r.ctx, serverInfo, data); nil != err {
r.consumerHandler.MessageLogicFailureCallback(r.ctx, serverInfo, data, err)
}
}
// Stop 停止消费者
func (r *redisConsumer) Stop() {
r.stopChan <- true
}