diff --git a/delay/abstract.go b/delay/abstract.go index 486d569..23d0c14 100644 --- a/delay/abstract.go +++ b/delay/abstract.go @@ -28,7 +28,7 @@ type IProduce interface { // Date : 10:19 2022/7/7 type IConsumer interface { // Consume 消费数据 - Consume(ctx context.Context) ([]*ZRangeData, error) + Consume(ctx context.Context) ([]*ProduceData, error) // ConsumeWithHandler 消费数据并使用handler处理 ConsumeWithHandler(ctx context.Context, handler IHandler) error } @@ -40,5 +40,5 @@ type IConsumer interface { // Date : 10:26 2022/7/7 type IHandler interface { // Handle 处理消费到的数据 - Handle(queData []*ZRangeData) error + Handle(queData []*ProduceData) error } diff --git a/delay/redis_dispatch_consumer.go b/delay/redis_dispatch_consumer.go index c05219c..c90957d 100644 --- a/delay/redis_dispatch_consumer.go +++ b/delay/redis_dispatch_consumer.go @@ -9,10 +9,10 @@ package delay import ( "context" - "encoding/json" "errors" "time" + "git.zhangdeman.cn/zhangdeman/serialize" "github.com/redis/go-redis/v9" ) @@ -33,21 +33,28 @@ type redisConsumer struct { cfg *ConsumerConfig } -func (r *redisConsumer) Consume(ctx context.Context) ([]*ZRangeData, error) { +func (r *redisConsumer) Consume(ctx context.Context) ([]*ProduceData, error) { if nil == ctx { ctx = context.Background() } zRangeResult := r.redisInstance.ZRange(ctx, r.cfg.QueueName, 0, time.Now().UnixNano()/1e6) if err := zRangeResult.Err(); nil != err { - return make([]*ZRangeData, 0), err + return make([]*ProduceData, 0), err } // 格式化数据 var ( - result []*ZRangeData + result []*ProduceData ) - if err := json.Unmarshal([]byte(zRangeResult.String()), &result); nil != err { - return make([]*ZRangeData, 0), err + valueList := zRangeResult.Val() + if len(valueList) == 0 { + return result, nil } + for _, item := range valueList { + d := &ProduceData{} + serialize.JSON.UnmarshalWithNumberForStringIgnoreError(item, d) + result = append(result, d) + } + return result, nil } @@ -56,7 +63,7 @@ func (r *redisConsumer) ConsumeWithHandler(ctx context.Context, handler IHandler return errors.New("handler instance is nil") } var ( - msgList []*ZRangeData + msgList []*ProduceData err error )