// Package delay ... // // Description : delay ... // // Author : go_developer@163.com<白茶清欢> // // Date : 2022-07-06 17:59 package delay import ( "context" "encoding/json" "sync" "time" "git.zhangdeman.cn/zhangdeman/util" "github.com/go-redis/redis/v8" ) // NewRedisQueue 获取redis队列实例 // // Author : go_developer@163.com<白茶清欢> // // Date : 18:09 2022/7/6 func NewRedisQueue(redisInstance *redis.Client) IProduce { return &redisProduce{client: redisInstance} } // withRedis 使用redis实现延迟队列 // // Author : go_developer@163.com<白茶清欢> // // Date : 17:59 2022/7/6 type redisProduce struct { client *redis.Client } // Produce 生产数据 // // Author : go_developer@163.com<白茶清欢> // // Date : 18:03 2022/7/6 func (rp *redisProduce) Produce(ctx context.Context, data ...*Queue) error { if len(data) == 0 { return nil } if nil == ctx { ctx = context.Background() } wg := &sync.WaitGroup{} wg.Add(len(data)) for _, queueData := range data { go func(inputQueueData *Queue) { defer wg.Done() inputQueueData.err = rp.client.ZAdd(ctx, inputQueueData.Name, rp.buildAddMember(inputQueueData)).Err() }(queueData) } wg.Wait() return nil } // buildAddMember ... // // Author : go_developer@163.com<白茶清欢> // // Date : 18:22 2022/7/6 func (rp *redisProduce) buildAddMember(queueData *Queue) *redis.Z { byteData, _ := json.Marshal(&ProduceData{ MsgID: util.String.Md5(util.String.GenRandom("", 16)), Timestamp: time.Now().UnixNano() / 1e6, Host: util.IP.GetHostIP(), Data: queueData.Data, }) return &redis.Z{ Score: float64(time.Now().Unix() + queueData.DelayTime), Member: string(byteData), } }