Files
queue/delay/redis_dispatch_consumer.go
2026-04-12 23:44:23 +08:00

104 lines
2.2 KiB
Go

// Package delay ...
//
// Description : delay ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2022-07-07 10:32
package delay
import (
"context"
"errors"
"time"
"git.zhangdeman.cn/zhangdeman/serialize"
"github.com/redis/go-redis/v9"
)
// NewRedisConsumer redis延迟队列消费者
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:10 2022/7/8
func NewRedisConsumer(redisInstance *redis.Client, cfg *ConsumerConfig) IConsumer {
return &redisConsumer{
redisInstance: redisInstance,
cfg: cfg,
}
}
type redisConsumer struct {
redisInstance *redis.Client
cfg *ConsumerConfig
}
func (r *redisConsumer) Consume(ctx context.Context) ([]*ProduceData, error) {
if nil == ctx {
ctx = context.Background()
}
zRangeResult := r.redisInstance.ZRange(ctx, r.cfg.QueueName, 0, time.Now().UnixNano()/1e6)
if err := zRangeResult.Err(); nil != err {
return make([]*ProduceData, 0), err
}
// 格式化数据
var (
result []*ProduceData
)
valueList := zRangeResult.Val()
if len(valueList) == 0 {
return result, nil
}
for _, item := range valueList {
d := &ProduceData{}
serialize.JSON.UnmarshalWithNumberForStringIgnoreError(item, d)
result = append(result, d)
}
return result, nil
}
func (r *redisConsumer) ConsumeWithHandler(ctx context.Context, handler IHandler) error {
if nil == handler {
return errors.New("handler instance is nil")
}
var (
msgList []*ProduceData
err error
)
if msgList, err = r.Consume(ctx); nil != err {
return err
}
// 未订阅到消息
if len(msgList) == 0 {
return nil
}
return handler.Handle(msgList)
/* wg := &sync.WaitGroup{}
wg.Add(len(msgList))
for _, item := range msgList {
go func(msgData *ZRangeData) {
defer wg.Done()
hashValue, exist := msgData.Data.Data[r.cfg.HashKey]
if !exist || hashValue == nil {
hashValue = msgData.Data.MsgID
}
shard := util.Hash.GetHashIDMod(hashValue, r.cfg.SonQueueCnt)
realQueue := fmt.Sprintf(r.cfg.QueueName+"_%d", shard)
r.redisInstance.LPush()
}(item)
}
wg.Wait()
return nil*/
}
type redisConsumerMsgHandler struct {
}
func (r redisConsumerMsgHandler) Handle(queData []*ZRangeData) error {
panic("implement me")
}