规划消费者

This commit is contained in:
白茶清欢 2022-07-08 16:53:42 +08:00
parent 85f9e62590
commit d049110155
3 changed files with 121 additions and 5 deletions

View File

@ -9,8 +9,6 @@ package delay
import (
"context"
"github.com/go-redis/redis/v8"
)
// IProduce 生产者约束
@ -30,9 +28,9 @@ type IProduce interface {
// Date : 10:19 2022/7/7
type IConsumer interface {
// Consume 消费数据
Consume(queueName string) ([]*redis.Z, error)
Consume(ctx context.Context) ([]*ZRangeData, error)
// ConsumeWithHandler 消费数据并使用handler处理
ConsumeWithHandler(queueName string, handler IHandler) error
ConsumeWithHandler(ctx context.Context, handler IHandler) error
}
// IHandler 消息的处理
@ -42,5 +40,5 @@ type IConsumer interface {
// Date : 10:26 2022/7/7
type IHandler interface {
// Handle 处理消费到的数据
Handle(queData []*ProduceData) error
Handle(queData []*ZRangeData) error
}

View File

@ -19,6 +19,16 @@ type ProduceData struct {
Data map[string]interface{} `json:"data"` // 传入的业务数据
}
// ZRangeData zRange读取的数据
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:24 2022/7/8
type ZRangeData struct {
Score int64 // 数据分值
Data *ProduceData // 实际业务数据
}
// Queue 队列数据
//
// Author : go_developer@163.com<白茶清欢>
@ -39,3 +49,15 @@ type Queue struct {
func (q *Queue) Err() error {
return q.err
}
// ConsumerConfig 消费者配置
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:12 2022/7/8
type ConsumerConfig struct {
QueueName string // 队列名称
SonQueueCnt int // 二级队列数量
SonQueName string // 二级队列名称
HashKey string // hash消息写到哪个二级队列的key, 若不配置或者key不存在, 使用 ProduceData.MsgID
}

View File

@ -0,0 +1,96 @@
// Package delay ...
//
// Description : delay ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2022-07-07 10:32
package delay
import (
"context"
"encoding/json"
"errors"
"time"
"github.com/go-redis/redis/v8"
)
// NewRedisConsumer redis延迟队列消费者
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:10 2022/7/8
func NewRedisConsumer(redisInstance *redis.Client, cfg *ConsumerConfig) IConsumer {
return &redisConsumer{
redisInstance: redisInstance,
cfg: cfg,
}
}
type redisConsumer struct {
redisInstance *redis.Client
cfg *ConsumerConfig
}
func (r *redisConsumer) Consume(ctx context.Context) ([]*ZRangeData, error) {
if nil == ctx {
ctx = context.Background()
}
zRangeResult := r.redisInstance.ZRange(ctx, r.cfg.QueueName, 0, time.Now().UnixNano()/1e6)
if err := zRangeResult.Err(); nil != err {
return make([]*ZRangeData, 0), err
}
// 格式化数据
var (
result []*ZRangeData
)
if err := json.Unmarshal([]byte(zRangeResult.String()), &result); nil != err {
return make([]*ZRangeData, 0), err
}
return result, nil
}
func (r *redisConsumer) ConsumeWithHandler(ctx context.Context, handler IHandler) error {
if nil == handler {
return errors.New("handler instance is nil")
}
var (
msgList []*ZRangeData
err error
)
if msgList, err = r.Consume(ctx); nil != err {
return err
}
// 未订阅到消息
if len(msgList) == 0 {
return nil
}
return handler.Handle(msgList)
/* wg := &sync.WaitGroup{}
wg.Add(len(msgList))
for _, item := range msgList {
go func(msgData *ZRangeData) {
defer wg.Done()
hashValue, exist := msgData.Data.Data[r.cfg.HashKey]
if !exist || hashValue == nil {
hashValue = msgData.Data.MsgID
}
shard := util.Hash.GetHashIDMod(hashValue, r.cfg.SonQueueCnt)
realQueue := fmt.Sprintf(r.cfg.QueueName+"_%d", shard)
r.redisInstance.LPush()
}(item)
}
wg.Wait()
return nil*/
}
type redisConsumerMsgHandler struct {
}
func (r redisConsumerMsgHandler) Handle(queData []*ZRangeData) error {
panic("implement me")
}