diff --git a/delay/abstract.go b/delay/abstract.go index 9966067..842c641 100644 --- a/delay/abstract.go +++ b/delay/abstract.go @@ -7,6 +7,10 @@ // Date : 2022-07-06 18:06 package delay +import ( + "context" +) + // IProduce 生产者约束 // // Author : go_developer@163.com<白茶清欢> @@ -14,5 +18,5 @@ package delay // Date : 18:06 2022/7/6 type IProduce interface { // Produce 生产数据 - Produce(data *Queue) error + Produce(ctx context.Context, data ...*Queue) error } diff --git a/delay/define.go b/delay/define.go index 944a00c..29a5e54 100644 --- a/delay/define.go +++ b/delay/define.go @@ -14,6 +14,16 @@ package delay // Date : 17:56 2022/7/6 type Queue struct { Name string `json:"name"` // 队列名称 - DelayTime int `json:"delay_time"` // 延迟执行时间 + DelayTime int64 `json:"delay_time"` // 延迟执行时间 Data map[string]interface{} `json:"data"` // 入队数据 + err error // 数据入队的异常信息 +} + +// Err 获取入队异常 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:37 2022/7/6 +func (q *Queue) Err() error { + return q.err } diff --git a/delay/redis_produce.go b/delay/redis_produce.go index f8e1672..31b100f 100644 --- a/delay/redis_produce.go +++ b/delay/redis_produce.go @@ -8,6 +8,11 @@ package delay import ( + "context" + "encoding/json" + "sync" + "time" + "github.com/go-redis/redis/v8" ) @@ -34,6 +39,35 @@ type redisProduce struct { // Author : go_developer@163.com<白茶清欢> // // Date : 18:03 2022/7/6 -func (rp *redisProduce) Produce(data *Queue) error { +func (rp *redisProduce) Produce(ctx context.Context, data ...*Queue) error { + if len(data) == 0 { + return nil + } + if nil == ctx { + ctx = context.Background() + } + + wg := &sync.WaitGroup{} + wg.Add(len(data)) + for _, queueData := range data { + go func(inputQueueData *Queue) { + defer wg.Done() + inputQueueData.err = rp.client.ZAdd(ctx, inputQueueData.Name, rp.buildAddMember(inputQueueData)).Err() + }(queueData) + } + wg.Wait() return nil } + +// buildAddMember ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:22 2022/7/6 +func (rp *redisProduce) buildAddMember(queueData *Queue) *redis.Z { + byteData, _ := json.Marshal(queueData) + return &redis.Z{ + Score: float64(time.Now().Unix() + queueData.DelayTime), + Member: string(byteData), + } +}