// Package delay ... // // Description : delay ... // // Author : go_developer@163.com<白茶清欢> // // Date : 2022-07-06 17:59 package delay import ( "context" "sync" "time" "git.zhangdeman.cn/zhangdeman/serialize" "git.zhangdeman.cn/zhangdeman/wrapper/op_string" "github.com/redis/go-redis/v9" ) // NewRedisQueue 获取redis队列实例 // // Author : go_developer@163.com<白茶清欢> // // Date : 18:09 2022/7/6 func NewRedisQueue(redisInstance *redis.Client) IProduce { return &redisProduce{client: redisInstance} } // withRedis 使用redis实现延迟队列 // // Author : go_developer@163.com<白茶清欢> // // Date : 17:59 2022/7/6 type redisProduce struct { client *redis.Client } // Produce 生产数据 // // Author : go_developer@163.com<白茶清欢> // // Date : 18:03 2022/7/6 func (rp *redisProduce) Produce(ctx context.Context, data ...*Queue) error { if len(data) == 0 { return nil } if nil == ctx { ctx = context.Background() } wg := &sync.WaitGroup{} wg.Add(len(data)) for _, queueData := range data { go func(inputQueueData *Queue) { defer wg.Done() inputQueueData.err = rp.client.ZAdd(ctx, inputQueueData.Name, rp.buildAddMember(inputQueueData)).Err() }(queueData) } wg.Wait() return nil } // buildAddMember ... // // Author : go_developer@163.com<白茶清欢> // // Date : 18:22 2022/7/6 func (rp *redisProduce) buildAddMember(queueData *Queue) redis.Z { return redis.Z{ Score: float64(time.Now().Unix() + queueData.DelayTime), Member: serialize.JSON.MarshalForStringIgnoreError(&ProduceData{ MsgID: op_string.RandomMd5().Value, Timestamp: time.Now().UnixNano() / 1e6, Host: "", Data: queueData.Data, }), } }