130 lines
4.1 KiB
Go
130 lines
4.1 KiB
Go
// Package delay ...
|
|
//
|
|
// Description : 基于redis实现的延迟事件队列
|
|
//
|
|
// Author : go_developer@163.com<白茶清欢>
|
|
//
|
|
// Date : 2025-08-29 09:11
|
|
package delay
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"git.zhangdeman.cn/zhangdeman/queue/abstract"
|
|
"git.zhangdeman.cn/zhangdeman/queue/define"
|
|
redisPkg "git.zhangdeman.cn/zhangdeman/redis"
|
|
redisPkgDefine "git.zhangdeman.cn/zhangdeman/redis/define"
|
|
"git.zhangdeman.cn/zhangdeman/serialize"
|
|
"github.com/tidwall/gjson"
|
|
)
|
|
|
|
// NewRedisDelayEvent 获取延迟队列实例
|
|
// 参数说明:
|
|
// - redisFlag: 使用那个redis实例
|
|
// - queueName: 延迟队列名称
|
|
// - pullTimeInterval: 延迟队列扫描的时间间隔, 单位秒
|
|
// - distributeProducer: 消息生产者示例, 扫描到的数据通过此实例向二级任务队列分发
|
|
func NewRedisDelayEvent(redisFlag string, queueName string, pullTimeInterval int64, distributeProducer abstract.IProducer) abstract.IDelayQueue {
|
|
if queueName == "" || redisFlag == "" {
|
|
panic("init redis delay event: queue name or redis flag is empty")
|
|
}
|
|
if nil == distributeProducer {
|
|
panic("distributeProducer is nil")
|
|
}
|
|
// 验证redis实例是否存在
|
|
if _, err := redisPkg.Client.GetRealClientWithError(redisFlag); nil != err {
|
|
panic(err.Error())
|
|
}
|
|
if pullTimeInterval <= 0 {
|
|
pullTimeInterval = 60
|
|
}
|
|
return &redisDelayEvent{
|
|
stopChan: make(chan bool, 1),
|
|
redisFlag: redisFlag,
|
|
delayQueueName: queueName,
|
|
pullTimeInterval: pullTimeInterval,
|
|
distributeProducer: distributeProducer,
|
|
}
|
|
}
|
|
|
|
type redisDelayEvent struct {
|
|
stopChan chan bool // 停止请求的chan
|
|
redisFlag string // 使用的redis实例
|
|
delayQueueName string // 延迟队列名称
|
|
pullTimeInterval int64 // 延迟队列多久拉取一次数据
|
|
distributeProducer abstract.IProducer // 事件分发的生产者
|
|
}
|
|
|
|
// Send 生成一条延时事件
|
|
// 参数说明:
|
|
// - delayTime: 延时时长, 单位: s
|
|
// - data: 要发送的数据
|
|
func (rde redisDelayEvent) Send(ctx context.Context, delayTime int64, data *define.EventData) *redisPkgDefine.RedisResult {
|
|
if delayTime <= 0 {
|
|
// 默认延迟 1min
|
|
delayTime = 60
|
|
}
|
|
if delayTime >= time.Now().Unix() {
|
|
// 指定具体的延迟时间, 重置延时时间为相对时间
|
|
delayTime = delayTime - time.Now().Unix()
|
|
}
|
|
return redisPkg.Wrapper.ZAdd(ctx, rde.redisFlag, rde.delayQueueName, time.Now().Unix()+delayTime, serialize.JSON.MarshalForStringIgnoreError(data))
|
|
}
|
|
|
|
// Distribute 到期事件事件分发
|
|
func (rde redisDelayEvent) Distribute(ctx context.Context) {
|
|
for {
|
|
hasFinish := false
|
|
select {
|
|
case <-ctx.Done():
|
|
// 收到上下文已完成事件, 退出
|
|
hasFinish = true
|
|
case <-rde.stopChan:
|
|
// 程序调用stop方法, 退出
|
|
hasFinish = true
|
|
default:
|
|
now := time.Now()
|
|
// 计时器, 固定时长扫描一次
|
|
// ZRangeAndRemByScore 此方法内部已经自动包装了删除逻辑
|
|
eventDataListRes := redisPkg.Wrapper.ZRangeAndRemByScore(ctx, rde.redisFlag, rde.delayQueueName, 0, now.Unix())
|
|
// ZRange 返回数据时二维数组 [["数据部分", "设置的数据score"]]
|
|
resList := gjson.Parse(eventDataListRes.Result).Array()
|
|
if len(resList) > 0 {
|
|
for _, itemData := range resList {
|
|
var (
|
|
resFormat define.EventData
|
|
err error
|
|
)
|
|
|
|
if err = serialize.JSON.UnmarshalWithNumberForString(itemData.Get("0").String(), &resFormat); nil != err {
|
|
// 数据解析失败, 触发失败回调
|
|
rde.distributeProducer.GetProducerHandler().FailureCallback(ctx, &define.EventData{
|
|
Type: "UNMARSHAL_FAILURE",
|
|
Data: itemData.Get("0").String(),
|
|
}, &define.EventProduceResult{
|
|
Queue: "",
|
|
Success: false,
|
|
Err: err,
|
|
Cost: 0,
|
|
})
|
|
continue
|
|
} else {
|
|
// 生产数据, 向耳机队列分发任务
|
|
rde.distributeProducer.Sync(ctx, &resFormat)
|
|
}
|
|
}
|
|
}
|
|
// 间隔指定时间拉取一次
|
|
time.Sleep(time.Second * time.Duration(rde.pullTimeInterval))
|
|
}
|
|
if hasFinish {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (rde redisDelayEvent) Stop() {
|
|
rde.stopChan <- true
|
|
}
|