From 237457c65c9d0ea66dd3d6a180cecac32841baf7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Sun, 12 Apr 2026 23:44:23 +0800 Subject: [PATCH] feat: update producer --- delay/abstract.go | 4 ++-- delay/redis_dispatch_consumer.go | 21 ++++++++++++++------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/delay/abstract.go b/delay/abstract.go index 486d569..23d0c14 100644 --- a/delay/abstract.go +++ b/delay/abstract.go @@ -28,7 +28,7 @@ type IProduce interface { // Date : 10:19 2022/7/7 type IConsumer interface { // Consume 消费数据 - Consume(ctx context.Context) ([]*ZRangeData, error) + Consume(ctx context.Context) ([]*ProduceData, error) // ConsumeWithHandler 消费数据并使用handler处理 ConsumeWithHandler(ctx context.Context, handler IHandler) error } @@ -40,5 +40,5 @@ type IConsumer interface { // Date : 10:26 2022/7/7 type IHandler interface { // Handle 处理消费到的数据 - Handle(queData []*ZRangeData) error + Handle(queData []*ProduceData) error } diff --git a/delay/redis_dispatch_consumer.go b/delay/redis_dispatch_consumer.go index c05219c..c90957d 100644 --- a/delay/redis_dispatch_consumer.go +++ b/delay/redis_dispatch_consumer.go @@ -9,10 +9,10 @@ package delay import ( "context" - "encoding/json" "errors" "time" + "git.zhangdeman.cn/zhangdeman/serialize" "github.com/redis/go-redis/v9" ) @@ -33,21 +33,28 @@ type redisConsumer struct { cfg *ConsumerConfig } -func (r *redisConsumer) Consume(ctx context.Context) ([]*ZRangeData, error) { +func (r *redisConsumer) Consume(ctx context.Context) ([]*ProduceData, error) { if nil == ctx { ctx = context.Background() } zRangeResult := r.redisInstance.ZRange(ctx, r.cfg.QueueName, 0, time.Now().UnixNano()/1e6) if err := zRangeResult.Err(); nil != err { - return make([]*ZRangeData, 0), err + return make([]*ProduceData, 0), err } // 格式化数据 var ( - result []*ZRangeData + result []*ProduceData ) - if err := json.Unmarshal([]byte(zRangeResult.String()), &result); nil != err { - return make([]*ZRangeData, 0), err + valueList := zRangeResult.Val() + if len(valueList) == 0 { + return result, nil } + for _, item := range valueList { + d := &ProduceData{} + serialize.JSON.UnmarshalWithNumberForStringIgnoreError(item, d) + result = append(result, d) + } + return result, nil } @@ -56,7 +63,7 @@ func (r *redisConsumer) ConsumeWithHandler(ctx context.Context, handler IHandler return errors.New("handler instance is nil") } var ( - msgList []*ZRangeData + msgList []*ProduceData err error )