From fe92fa5374f34c36d87ff7031ef8dcec2b89f78f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Wed, 6 Jul 2022 18:41:37 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0redis=E5=BB=B6=E8=BF=9F?= =?UTF-8?q?=E9=98=9F=E5=88=97=E7=AC=AC=E4=B8=80=E7=89=88=E7=94=9F=E4=BA=A7?= =?UTF-8?q?=E8=80=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- delay/abstract.go | 6 +++++- delay/define.go | 12 +++++++++++- delay/redis_produce.go | 36 +++++++++++++++++++++++++++++++++++- 3 files changed, 51 insertions(+), 3 deletions(-) diff --git a/delay/abstract.go b/delay/abstract.go index 9966067..842c641 100644 --- a/delay/abstract.go +++ b/delay/abstract.go @@ -7,6 +7,10 @@ // Date : 2022-07-06 18:06 package delay +import ( + "context" +) + // IProduce 生产者约束 // // Author : go_developer@163.com<白茶清欢> @@ -14,5 +18,5 @@ package delay // Date : 18:06 2022/7/6 type IProduce interface { // Produce 生产数据 - Produce(data *Queue) error + Produce(ctx context.Context, data ...*Queue) error } diff --git a/delay/define.go b/delay/define.go index 944a00c..29a5e54 100644 --- a/delay/define.go +++ b/delay/define.go @@ -14,6 +14,16 @@ package delay // Date : 17:56 2022/7/6 type Queue struct { Name string `json:"name"` // 队列名称 - DelayTime int `json:"delay_time"` // 延迟执行时间 + DelayTime int64 `json:"delay_time"` // 延迟执行时间 Data map[string]interface{} `json:"data"` // 入队数据 + err error // 数据入队的异常信息 +} + +// Err 获取入队异常 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:37 2022/7/6 +func (q *Queue) Err() error { + return q.err } diff --git a/delay/redis_produce.go b/delay/redis_produce.go index f8e1672..31b100f 100644 --- a/delay/redis_produce.go +++ b/delay/redis_produce.go @@ -8,6 +8,11 @@ package delay import ( + "context" + "encoding/json" + "sync" + "time" + "github.com/go-redis/redis/v8" ) @@ -34,6 +39,35 @@ type redisProduce struct { // Author : go_developer@163.com<白茶清欢> // // Date : 18:03 2022/7/6 -func (rp *redisProduce) Produce(data *Queue) error { +func (rp *redisProduce) Produce(ctx context.Context, data ...*Queue) error { + if len(data) == 0 { + return nil + } + if nil == ctx { + ctx = context.Background() + } + + wg := &sync.WaitGroup{} + wg.Add(len(data)) + for _, queueData := range data { + go func(inputQueueData *Queue) { + defer wg.Done() + inputQueueData.err = rp.client.ZAdd(ctx, inputQueueData.Name, rp.buildAddMember(inputQueueData)).Err() + }(queueData) + } + wg.Wait() return nil } + +// buildAddMember ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:22 2022/7/6 +func (rp *redisProduce) buildAddMember(queueData *Queue) *redis.Z { + byteData, _ := json.Marshal(queueData) + return &redis.Z{ + Score: float64(time.Now().Unix() + queueData.DelayTime), + Member: string(byteData), + } +}