增加redis延迟队列第一版生产者
This commit is contained in:
		| @ -7,6 +7,10 @@ | ||||
| // Date : 2022-07-06 18:06 | ||||
| package delay | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| ) | ||||
|  | ||||
| // IProduce 生产者约束 | ||||
| // | ||||
| // Author : go_developer@163.com<白茶清欢> | ||||
| @ -14,5 +18,5 @@ package delay | ||||
| // Date : 18:06 2022/7/6 | ||||
| type IProduce interface { | ||||
| 	// Produce 生产数据 | ||||
| 	Produce(data *Queue) error | ||||
| 	Produce(ctx context.Context, data ...*Queue) error | ||||
| } | ||||
|  | ||||
| @ -14,6 +14,16 @@ package delay | ||||
| // Date : 17:56 2022/7/6 | ||||
| type Queue struct { | ||||
| 	Name      string                 `json:"name"`       // 队列名称 | ||||
| 	DelayTime int                    `json:"delay_time"` // 延迟执行时间 | ||||
| 	DelayTime int64                  `json:"delay_time"` // 延迟执行时间 | ||||
| 	Data      map[string]interface{} `json:"data"`       // 入队数据 | ||||
| 	err       error                  // 数据入队的异常信息 | ||||
| } | ||||
|  | ||||
| // Err 获取入队异常 | ||||
| // | ||||
| // Author : go_developer@163.com<白茶清欢> | ||||
| // | ||||
| // Date : 18:37 2022/7/6 | ||||
| func (q *Queue) Err() error { | ||||
| 	return q.err | ||||
| } | ||||
|  | ||||
| @ -8,6 +8,11 @@ | ||||
| package delay | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/go-redis/redis/v8" | ||||
| ) | ||||
|  | ||||
| @ -34,6 +39,35 @@ type redisProduce struct { | ||||
| // Author : go_developer@163.com<白茶清欢> | ||||
| // | ||||
| // Date : 18:03 2022/7/6 | ||||
| func (rp *redisProduce) Produce(data *Queue) error { | ||||
| func (rp *redisProduce) Produce(ctx context.Context, data ...*Queue) error { | ||||
| 	if len(data) == 0 { | ||||
| 		return nil | ||||
| 	} | ||||
| 	if nil == ctx { | ||||
| 		ctx = context.Background() | ||||
| 	} | ||||
|  | ||||
| 	wg := &sync.WaitGroup{} | ||||
| 	wg.Add(len(data)) | ||||
| 	for _, queueData := range data { | ||||
| 		go func(inputQueueData *Queue) { | ||||
| 			defer wg.Done() | ||||
| 			inputQueueData.err = rp.client.ZAdd(ctx, inputQueueData.Name, rp.buildAddMember(inputQueueData)).Err() | ||||
| 		}(queueData) | ||||
| 	} | ||||
| 	wg.Wait() | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // buildAddMember ... | ||||
| // | ||||
| // Author : go_developer@163.com<白茶清欢> | ||||
| // | ||||
| // Date : 18:22 2022/7/6 | ||||
| func (rp *redisProduce) buildAddMember(queueData *Queue) *redis.Z { | ||||
| 	byteData, _ := json.Marshal(queueData) | ||||
| 	return &redis.Z{ | ||||
| 		Score:  float64(time.Now().Unix() + queueData.DelayTime), | ||||
| 		Member: string(byteData), | ||||
| 	} | ||||
| } | ||||
|  | ||||
		Reference in New Issue
	
	Block a user