diff --git a/delay/abstract.go b/delay/abstract.go index d491388..486d569 100644 --- a/delay/abstract.go +++ b/delay/abstract.go @@ -9,8 +9,6 @@ package delay import ( "context" - - "github.com/go-redis/redis/v8" ) // IProduce 生产者约束 @@ -30,9 +28,9 @@ type IProduce interface { // Date : 10:19 2022/7/7 type IConsumer interface { // Consume 消费数据 - Consume(queueName string) ([]*redis.Z, error) + Consume(ctx context.Context) ([]*ZRangeData, error) // ConsumeWithHandler 消费数据并使用handler处理 - ConsumeWithHandler(queueName string, handler IHandler) error + ConsumeWithHandler(ctx context.Context, handler IHandler) error } // IHandler 消息的处理 @@ -42,5 +40,5 @@ type IConsumer interface { // Date : 10:26 2022/7/7 type IHandler interface { // Handle 处理消费到的数据 - Handle(queData []*ProduceData) error + Handle(queData []*ZRangeData) error } diff --git a/delay/define.go b/delay/define.go index 4870186..aa978c2 100644 --- a/delay/define.go +++ b/delay/define.go @@ -19,6 +19,16 @@ type ProduceData struct { Data map[string]interface{} `json:"data"` // 传入的业务数据 } +// ZRangeData zRange读取的数据 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 14:24 2022/7/8 +type ZRangeData struct { + Score int64 // 数据分值 + Data *ProduceData // 实际业务数据 +} + // Queue 队列数据 // // Author : go_developer@163.com<白茶清欢> @@ -39,3 +49,15 @@ type Queue struct { func (q *Queue) Err() error { return q.err } + +// ConsumerConfig 消费者配置 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 14:12 2022/7/8 +type ConsumerConfig struct { + QueueName string // 队列名称 + SonQueueCnt int // 二级队列数量 + SonQueName string // 二级队列名称 + HashKey string // hash消息写到哪个二级队列的key, 若不配置或者key不存在, 使用 ProduceData.MsgID +} diff --git a/delay/redis_dispatch_consumer.go b/delay/redis_dispatch_consumer.go new file mode 100644 index 0000000..c9a5c81 --- /dev/null +++ b/delay/redis_dispatch_consumer.go @@ -0,0 +1,96 @@ +// Package delay ... +// +// Description : delay ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2022-07-07 10:32 +package delay + +import ( + "context" + "encoding/json" + "errors" + "time" + + "github.com/go-redis/redis/v8" +) + +// NewRedisConsumer redis延迟队列消费者 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 14:10 2022/7/8 +func NewRedisConsumer(redisInstance *redis.Client, cfg *ConsumerConfig) IConsumer { + return &redisConsumer{ + redisInstance: redisInstance, + cfg: cfg, + } +} + +type redisConsumer struct { + redisInstance *redis.Client + cfg *ConsumerConfig +} + +func (r *redisConsumer) Consume(ctx context.Context) ([]*ZRangeData, error) { + if nil == ctx { + ctx = context.Background() + } + zRangeResult := r.redisInstance.ZRange(ctx, r.cfg.QueueName, 0, time.Now().UnixNano()/1e6) + if err := zRangeResult.Err(); nil != err { + return make([]*ZRangeData, 0), err + } + // 格式化数据 + var ( + result []*ZRangeData + ) + if err := json.Unmarshal([]byte(zRangeResult.String()), &result); nil != err { + return make([]*ZRangeData, 0), err + } + return result, nil +} + +func (r *redisConsumer) ConsumeWithHandler(ctx context.Context, handler IHandler) error { + if nil == handler { + return errors.New("handler instance is nil") + } + var ( + msgList []*ZRangeData + err error + ) + + if msgList, err = r.Consume(ctx); nil != err { + return err + } + + // 未订阅到消息 + if len(msgList) == 0 { + return nil + } + return handler.Handle(msgList) + /* wg := &sync.WaitGroup{} + wg.Add(len(msgList)) + for _, item := range msgList { + + go func(msgData *ZRangeData) { + defer wg.Done() + hashValue, exist := msgData.Data.Data[r.cfg.HashKey] + if !exist || hashValue == nil { + hashValue = msgData.Data.MsgID + } + shard := util.Hash.GetHashIDMod(hashValue, r.cfg.SonQueueCnt) + realQueue := fmt.Sprintf(r.cfg.QueueName+"_%d", shard) + r.redisInstance.LPush() + }(item) + } + wg.Wait() + return nil*/ +} + +type redisConsumerMsgHandler struct { +} + +func (r redisConsumerMsgHandler) Handle(queData []*ZRangeData) error { + panic("implement me") +}