feat: 重新设计 delay consumer producer

This commit is contained in:
2026-04-13 17:27:56 +08:00
parent ad44a84718
commit c14e5f84dc
6 changed files with 475 additions and 164 deletions

211
delay/redis_consumer.go Normal file
View File

@@ -0,0 +1,211 @@
// Package delay ...
//
// Description : event ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2025-08-22 18:55
package delay
import (
"context"
"errors"
"fmt"
"os"
"strings"
"sync"
"time"
"git.zhangdeman.cn/zhangdeman/network/util"
"git.zhangdeman.cn/zhangdeman/queue/abstract"
"git.zhangdeman.cn/zhangdeman/queue/define"
"git.zhangdeman.cn/zhangdeman/redis"
"git.zhangdeman.cn/zhangdeman/serialize"
redisV9 "github.com/redis/go-redis/v9"
"github.com/tidwall/gjson"
)
// NewRedisConsumer 启动一个消费者实例
func NewRedisConsumer(
ctx context.Context,
queueName string, queueCnt int, redisFlag string,
consumerHandler abstract.IConsumerHandler,
) abstract.IConsumer {
if nil == consumerHandler {
panic("consumer handler instance is nil")
}
if queueName == "" || redisFlag == "" {
panic("init redis producer: queue name or redis flag is empty")
}
// 验证redis实例是否存在
if _, err := redis.Client.GetRealClientWithError(redisFlag); nil != err {
panic(err.Error())
}
if queueCnt <= 0 {
queueCnt = 1 // 默认单队列
}
if nil == ctx {
ctx = context.Background()
}
c := &redisConsumer{
lock: &sync.RWMutex{},
hasStop: false,
ctx: ctx,
stopChan: make(chan bool),
redisFlag: redisFlag,
queueName: queueName,
queueCnt: queueCnt,
consumerHandler: consumerHandler,
}
return c
}
type redisConsumer struct {
lock *sync.RWMutex
hasStop bool
stopChan chan bool // 停止请求的chan
ctx context.Context // 请求处理上下文
redisFlag string // redis 标识, 必须是使用统一的 pkg/redis 进行管理的
queueName string // 队列名称,基础公共前缀,尾部后缀会自动根据 shard cnt 进行哈希
queueCnt int // 队列数量
consumerHandler abstract.IConsumerHandler // 消息处理实例
}
// Start 启动消费者
func (r *redisConsumer) Start() error {
var realQueueList []string
for i := 0; i < r.queueCnt; i++ {
realQueueList = append(realQueueList, fmt.Sprintf("%v_%v", r.queueName, i))
}
hostname, _ := os.Hostname()
serverInfo := &define.EventConsumerServerInfo{
SystemTimestamp: time.Now().UnixMilli(),
Hostname: hostname,
HostIp: util.IP.GetHostIP(),
Queue: strings.Join(realQueueList, ","),
}
for {
isFinish := false
select {
case <-r.ctx.Done(): // context被取消(对应程序系统默认行为,一般对应程序停止运行)
if nil != r.lock {
r.lock.Lock()
r.hasStop = true
r.lock.Unlock()
} else {
r.hasStop = true
}
isFinish = true
case <-r.stopChan: // 对应开发者介入处理行为, 主进程不停止运行, 但是消费者要退出
isFinish = true
default: // 60s 无数据进入下一轮等待, 或者 消费到数据正常处理
if r.hasStop {
// 已停止
return nil
}
res := redis.Wrapper.BRPop(r.ctx, r.redisFlag, realQueueList, 60)
if nil != res.Err {
if errors.Is(res.Err, redisV9.Nil) {
// 判断是否为未订阅到消息
continue
}
r.notifyMessageSubscribeFailure(serverInfo, res.Err)
time.Sleep(time.Second * 10)
continue
}
// 订阅成功, 判断是否无数据
if res.Result == "" {
// 直接进入下一轮阻塞等待
continue
}
// 订阅到数据, 数据解析
// BRPOP 返回数据是数组 0: 读取到数据的队列名 1: 读取到的数据
// 前置redis操作统一序列化, 此处读取到的是一个序列化后的字符串 `["queue_name", "value"]`
serverInfo.Queue = gjson.Get(res.Result, "0").String()
var (
err error
formatData define.EventData
)
if err = serialize.JSON.UnmarshalWithNumberForString(gjson.Get(res.Result, "1").String(), &formatData); nil != err {
// 回调数据解析失败的处理函数
r.notifyMessageUnmarshalFailure(serverInfo, res.Result, err)
continue
}
// 处理消息(同步或异步)
r.handler(serverInfo, &formatData)
}
if isFinish {
break
}
}
return nil
}
// notifyMessageSubscribeFailure 通知订阅失败
func (r *redisConsumer) notifyMessageSubscribeFailure(serverInfo *define.EventConsumerServerInfo, err error) {
defer func() {
// 防一手 panic, 订阅失败回调出现panic, 不回调panic处理方法
recover()
}()
r.consumerHandler.SubscribeFailureCallback(r.ctx, serverInfo, err)
}
// 回调消息返序列化失败的处理逻辑
func (r *redisConsumer) notifyMessageUnmarshalFailure(serverInfo *define.EventConsumerServerInfo, res string, err error) {
defer func() {
// 防一手 panic, 返回劣化失败回调出现panic, 不回调panic处理方法
recover()
}()
r.consumerHandler.UnmarshalFailureCallback(r.ctx, serverInfo, res, err)
}
// handler 处理订阅到的数据
func (r *redisConsumer) handler(serverInfo *define.EventConsumerServerInfo, data *define.EventData) {
// 处理消息(同步或异步)
if r.consumerHandler.Async(r.ctx, serverInfo, data) {
// 异步处理
go func() {
r.dealMessageData(serverInfo, data)
}()
} else {
r.dealMessageData(serverInfo, data)
}
}
// dealMessageData 处理消息数据
func (r *redisConsumer) dealMessageData(serverInfo *define.EventConsumerServerInfo, data *define.EventData) {
defer func() {
// 同步处理
if err := recover(); nil != err {
// 出现panic, 回调 panic 处理
r.consumerHandler.PanicCallback(r.ctx, serverInfo, data, err)
}
}()
// 加锁, 若果不需要加锁, 实现接口时函数体留空即可
if err := r.consumerHandler.Lock(r.ctx, serverInfo, data); nil != err {
r.consumerHandler.LockFailureCallback(r.ctx, serverInfo, data, err)
return
}
defer func() {
// 释放锁
if err := r.consumerHandler.Unlock(r.ctx, serverInfo, data); nil != err {
r.consumerHandler.UnlockFailureCallback(r.ctx, serverInfo, data, err)
return
}
}()
// 执行对消息的处理
if err := r.consumerHandler.MessageLogic(r.ctx, serverInfo, data); nil != err {
r.consumerHandler.MessageLogicFailureCallback(r.ctx, serverInfo, data, err)
}
}
// Stop 停止消费者
func (r *redisConsumer) Stop() {
r.stopChan <- true
}

129
delay/redis_delay.go Normal file
View File

@@ -0,0 +1,129 @@
// Package delay ...
//
// Description : 基于redis实现的延迟事件队列
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2025-08-29 09:11
package delay
import (
"context"
"time"
"git.zhangdeman.cn/zhangdeman/queue/abstract"
"git.zhangdeman.cn/zhangdeman/queue/define"
redisPkg "git.zhangdeman.cn/zhangdeman/redis"
redisPkgDefine "git.zhangdeman.cn/zhangdeman/redis/define"
"git.zhangdeman.cn/zhangdeman/serialize"
"github.com/tidwall/gjson"
)
// NewRedisDelayEvent 获取延迟队列实例
// 参数说明:
// - redisFlag: 使用那个redis实例
// - queueName: 延迟队列名称
// - pullTimeInterval: 延迟队列扫描的时间间隔, 单位秒
// - distributeProducer: 消息生产者示例, 扫描到的数据通过此实例向二级任务队列分发
func NewRedisDelayEvent(redisFlag string, queueName string, pullTimeInterval int64, distributeProducer abstract.IProducer) abstract.IDelayQueue {
if queueName == "" || redisFlag == "" {
panic("init redis delay event: queue name or redis flag is empty")
}
if nil == distributeProducer {
panic("distributeProducer is nil")
}
// 验证redis实例是否存在
if _, err := redisPkg.Client.GetRealClientWithError(redisFlag); nil != err {
panic(err.Error())
}
if pullTimeInterval <= 0 {
pullTimeInterval = 60
}
return &redisDelayEvent{
stopChan: make(chan bool, 1),
redisFlag: redisFlag,
delayQueueName: queueName,
pullTimeInterval: pullTimeInterval,
distributeProducer: distributeProducer,
}
}
type redisDelayEvent struct {
stopChan chan bool // 停止请求的chan
redisFlag string // 使用的redis实例
delayQueueName string // 延迟队列名称
pullTimeInterval int64 // 延迟队列多久拉取一次数据
distributeProducer abstract.IProducer // 事件分发的生产者
}
// Send 生成一条延时事件
// 参数说明:
// - delayTime: 延时时长, 单位: s
// - data: 要发送的数据
func (rde redisDelayEvent) Send(ctx context.Context, delayTime int64, data *define.EventData) *redisPkgDefine.RedisResult {
if delayTime <= 0 {
// 默认延迟 1min
delayTime = 60
}
if delayTime >= time.Now().Unix() {
// 指定具体的延迟时间, 重置延时时间为相对时间
delayTime = delayTime - time.Now().Unix()
}
return redisPkg.Wrapper.ZAdd(ctx, rde.redisFlag, rde.delayQueueName, time.Now().Unix()+delayTime, serialize.JSON.MarshalForStringIgnoreError(data))
}
// Distribute 到期事件事件分发
func (rde redisDelayEvent) Distribute(ctx context.Context) {
for {
hasFinish := false
select {
case <-ctx.Done():
// 收到上下文已完成事件, 退出
hasFinish = true
case <-rde.stopChan:
// 程序调用stop方法, 退出
hasFinish = true
default:
now := time.Now()
// 计时器, 固定时长扫描一次
// ZRangeAndRemByScore 此方法内部已经自动包装了删除逻辑
eventDataListRes := redisPkg.Wrapper.ZRangeAndRemByScore(ctx, rde.redisFlag, rde.delayQueueName, 0, now.Unix())
// ZRange 返回数据时二维数组 [["数据部分", "设置的数据score"]]
resList := gjson.Parse(eventDataListRes.Result).Array()
if len(resList) > 0 {
for _, itemData := range resList {
var (
resFormat define.EventData
err error
)
if err = serialize.JSON.UnmarshalWithNumberForString(itemData.Get("0").String(), &resFormat); nil != err {
// 数据解析失败, 触发失败回调
rde.distributeProducer.GetProducerHandler().FailureCallback(ctx, &define.EventData{
Type: "UNMARSHAL_FAILURE",
Data: itemData.Get("0").String(),
}, &define.EventProduceResult{
Queue: "",
Success: false,
Err: err,
Cost: 0,
})
continue
} else {
// 生产数据, 向耳机队列分发任务
rde.distributeProducer.Sync(ctx, &resFormat)
}
}
}
// 间隔指定时间拉取一次
time.Sleep(time.Second * time.Duration(rde.pullTimeInterval))
}
if hasFinish {
break
}
}
}
func (rde redisDelayEvent) Stop() {
rde.stopChan <- true
}

View File

@@ -1,103 +0,0 @@
// Package delay ...
//
// Description : delay ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2022-07-07 10:32
package delay
import (
"context"
"errors"
"time"
"git.zhangdeman.cn/zhangdeman/serialize"
"github.com/redis/go-redis/v9"
)
// NewRedisConsumer redis延迟队列消费者
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:10 2022/7/8
func NewRedisConsumer(redisInstance *redis.Client, cfg *ConsumerConfig) IConsumer {
return &redisConsumer{
redisInstance: redisInstance,
cfg: cfg,
}
}
type redisConsumer struct {
redisInstance *redis.Client
cfg *ConsumerConfig
}
func (r *redisConsumer) Consume(ctx context.Context) ([]*ProduceData, error) {
if nil == ctx {
ctx = context.Background()
}
zRangeResult := r.redisInstance.ZRange(ctx, r.cfg.QueueName, 0, time.Now().UnixNano()/1e6)
if err := zRangeResult.Err(); nil != err {
return make([]*ProduceData, 0), err
}
// 格式化数据
var (
result []*ProduceData
)
valueList := zRangeResult.Val()
if len(valueList) == 0 {
return result, nil
}
for _, item := range valueList {
d := &ProduceData{}
serialize.JSON.UnmarshalWithNumberForStringIgnoreError(item, d)
result = append(result, d)
}
return result, nil
}
func (r *redisConsumer) ConsumeWithHandler(ctx context.Context, handler IHandler) error {
if nil == handler {
return errors.New("handler instance is nil")
}
var (
msgList []*ProduceData
err error
)
if msgList, err = r.Consume(ctx); nil != err {
return err
}
// 未订阅到消息
if len(msgList) == 0 {
return nil
}
return handler.Handle(msgList)
/* wg := &sync.WaitGroup{}
wg.Add(len(msgList))
for _, item := range msgList {
go func(msgData *ZRangeData) {
defer wg.Done()
hashValue, exist := msgData.Data.Data[r.cfg.HashKey]
if !exist || hashValue == nil {
hashValue = msgData.Data.MsgID
}
shard := util.Hash.GetHashIDMod(hashValue, r.cfg.SonQueueCnt)
realQueue := fmt.Sprintf(r.cfg.QueueName+"_%d", shard)
r.redisInstance.LPush()
}(item)
}
wg.Wait()
return nil*/
}
type redisConsumerMsgHandler struct {
}
func (r redisConsumerMsgHandler) Handle(queData []*ZRangeData) error {
panic("implement me")
}

View File

@@ -1,78 +1,116 @@
// Package delay ...
// Package event ...
//
// Description : delay ...
// Description : 基于redis实现事件生产 + 消费
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2022-07-06 17:59
// Date : 2025-08-22 16:34
package delay
import (
"context"
"sync"
"fmt"
"os"
"time"
"git.zhangdeman.cn/zhangdeman/network/util"
"git.zhangdeman.cn/zhangdeman/queue/abstract"
"git.zhangdeman.cn/zhangdeman/queue/define"
"git.zhangdeman.cn/zhangdeman/redis"
"git.zhangdeman.cn/zhangdeman/serialize"
"git.zhangdeman.cn/zhangdeman/wrapper/op_string"
"github.com/redis/go-redis/v9"
)
// NewRedisQueue 获取redis队列实例
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 18:09 2022/7/6
func NewRedisQueue(redisInstance *redis.Client) IProduce {
return &redisProduce{client: redisInstance}
}
// withRedis 使用redis实现延迟队列
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 17:59 2022/7/6
type redisProduce struct {
client *redis.Client
}
// Produce 生产数据
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 18:03 2022/7/6
func (rp *redisProduce) Produce(ctx context.Context, data ...*Queue) error {
if len(data) == 0 {
return nil
// NewRedisProducer 获取基于redis的生产者实例
func NewRedisProducer(
queueName string, queueCnt int, redisFlag string,
producerHandler abstract.IProducerHandler,
) abstract.IProducer {
if queueName == "" || redisFlag == "" {
panic("init redis producer: queue name or redis flag is empty")
}
if nil == ctx {
ctx = context.Background()
// 验证redis实例是否存在
if _, err := redis.Client.GetRealClientWithError(redisFlag); nil != err {
panic(err.Error())
}
wg := &sync.WaitGroup{}
wg.Add(len(data))
for _, queueData := range data {
go func(inputQueueData *Queue) {
defer wg.Done()
inputQueueData.err = rp.client.ZAdd(ctx, inputQueueData.Name, rp.buildAddMember(inputQueueData)).Err()
}(queueData)
if queueCnt <= 0 {
queueCnt = 1 // 默认单队列
}
wg.Wait()
return nil
}
// buildAddMember ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 18:22 2022/7/6
func (rp *redisProduce) buildAddMember(queueData *Queue) redis.Z {
return redis.Z{
Score: float64(time.Now().Unix() + queueData.DelayTime),
Member: serialize.JSON.MarshalForStringIgnoreError(&ProduceData{
MsgID: op_string.RandomMd5().Value,
Timestamp: time.Now().UnixNano() / 1e6,
Host: "",
Data: queueData.Data,
}),
if nil == producerHandler {
producerHandler = abstract.DefaultProducerHandler{}
}
return &redisProducer{
queueCnt: queueCnt,
queueName: queueName,
redisFlag: redisFlag,
producerHandler: producerHandler,
}
}
type redisProducer struct {
redisFlag string // redis 标识, 必须是使用统一的 pkg/redis 进行管理的
queueName string // 队列名称,基础公共前缀,尾部后缀会自动根据 shard cnt 进行哈希
queueCnt int // 队列数量
producerHandler abstract.IProducerHandler // 生产数据的处理逻辑
}
// fillEventData 填充一些系统数据数据
func (r *redisProducer) fillEventData(data *define.EventData) {
if len(data.Key) == 0 {
data.Key = data.MsgID // 不设置和MsgID一致
}
if len(data.TraceID) == 0 {
data.TraceID = data.Key
}
data.SystemTimestamp = time.Now().UnixMilli() // 系统时间
if data.Timestamp <= 0 {
data.Timestamp = data.SystemTimestamp
}
data.Hostname, _ = os.Hostname() // 服务器 hostname
data.HostIp = util.IP.GetHostIP() // 服务器IP
}
// Sync 同步发送
func (r *redisProducer) Sync(ctx context.Context, data *define.EventData) {
if nil == data {
return
}
r.fillEventData(data)
realQueue := r.getRealQueue(data)
sendRedisRes := redis.Wrapper.LPush(ctx, r.redisFlag, realQueue, serialize.JSON.MarshalForStringIgnoreError(data))
res := &define.EventProduceResult{
Queue: realQueue,
Success: sendRedisRes.Err == nil,
Err: sendRedisRes.Err,
Cost: sendRedisRes.UsedTime,
}
if res.Success {
r.producerHandler.SuccessCallback(ctx, data, res)
} else {
r.producerHandler.FailureCallback(ctx, data, res)
}
}
// Async 异步发送
func (r *redisProducer) Async(ctx context.Context, data *define.EventData) {
go func() {
defer func() {
// 如果出现panic, 则触发失败回调
if err := recover(); nil != err {
r.producerHandler.PanicCallback(ctx, data, err)
}
}()
r.Sync(ctx, data)
}()
}
// GetProducerHandler 获取producer处理器
func (r *redisProducer) GetProducerHandler() abstract.IProducerHandler {
return r.producerHandler
}
// getRealQueue 获取真实的队列key
func (r *redisProducer) getRealQueue(data *define.EventData) string {
queueIdx := op_string.HashNumber(data.Key).Value % uint64(r.queueCnt)
return fmt.Sprintf("%v_%v", r.queueName, queueIdx)
}