From ad44a847189dec865dcd02b231d2a170fdc499e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Mon, 13 Apr 2026 17:04:21 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20=E5=8D=87=E7=BA=A7=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E7=BA=A6=E6=9D=9F=E4=BB=A5=E5=8F=8A=E5=AE=9A=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- abstract/IConsumer.go | 14 ++++++++++ abstract/IConsumerHandler.go | 28 +++++++++++++++++++ abstract/IDelayQueue.go | 22 +++++++++++++++ abstract/IProducer.go | 21 ++++++++++++++ abstract/IProducerHandler.go | 46 +++++++++++++++++++++++++++++++ define/data.go | 53 ++++++++++++++++++++++++++++++++++++ go.mod | 3 ++ go.sum | 8 ++++++ 8 files changed, 195 insertions(+) create mode 100644 abstract/IConsumer.go create mode 100644 abstract/IConsumerHandler.go create mode 100644 abstract/IDelayQueue.go create mode 100644 abstract/IProducer.go create mode 100644 abstract/IProducerHandler.go create mode 100644 define/data.go diff --git a/abstract/IConsumer.go b/abstract/IConsumer.go new file mode 100644 index 0000000..5e31d08 --- /dev/null +++ b/abstract/IConsumer.go @@ -0,0 +1,14 @@ +// Package abstract ... +// +// Description : abstract ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2025-08-22 16:33 +package abstract + +// IConsumer 消费者接口约束 +type IConsumer interface { + Start() error // 启动消费者 + Stop() // 停止消费者 +} diff --git a/abstract/IConsumerHandler.go b/abstract/IConsumerHandler.go new file mode 100644 index 0000000..e9ec2de --- /dev/null +++ b/abstract/IConsumerHandler.go @@ -0,0 +1,28 @@ +// Package abstract ... +// +// Description : abstract ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2025-08-27 08:54 +package abstract + +import ( + "context" + + "git.zhangdeman.cn/zhangdeman/queue/define" +) + +// IConsumerHandler 消费者的一些处理逻辑 +type IConsumerHandler interface { + Lock(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData) error // 加锁 + Unlock(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData) error // 释放锁 + MessageLogic(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData) error // 处理订阅到的消息 + PanicCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData, e any) // 出现异常panic的回调 + SubscribeFailureCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, err error) // 订阅失败的回调 + UnmarshalFailureCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, msgData string, err error) // 数据解析失败的回调 + Async(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData) bool // 每一条数据可独立判断是否进行异步处理 + LockFailureCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData, err error) // 加锁失败的回调 + UnlockFailureCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData, err error) // 释放锁失败的回调 + MessageLogicFailureCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData, err error) // 消息数据处理失败的回调 +} diff --git a/abstract/IDelayQueue.go b/abstract/IDelayQueue.go new file mode 100644 index 0000000..52f967e --- /dev/null +++ b/abstract/IDelayQueue.go @@ -0,0 +1,22 @@ +// Package abstract ... +// +// Description : abstract ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2025-08-29 12:08 +package abstract + +import ( + "context" + + "git.zhangdeman.cn/zhangdeman/queue/define" + redisPkgDefine "git.zhangdeman.cn/zhangdeman/redis/define" +) + +// IDelayQueue 延迟队列接口约束 +type IDelayQueue interface { + Send(ctx context.Context, delayTime int64, data *define.EventData) *redisPkgDefine.RedisResult // 生产事件 + Distribute(ctx context.Context) // 时间到期分发 + Stop() // 停止延迟队列 +} diff --git a/abstract/IProducer.go b/abstract/IProducer.go new file mode 100644 index 0000000..9e71a66 --- /dev/null +++ b/abstract/IProducer.go @@ -0,0 +1,21 @@ +// Package abstract ... +// +// Description : abstract ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2025-08-22 16:33 +package abstract + +import ( + "context" + + "git.zhangdeman.cn/zhangdeman/queue/define" +) + +// IProducer 生产者接口约束 +type IProducer interface { + Sync(ctx context.Context, data *define.EventData) // 同步发送事件 + Async(ctx context.Context, data *define.EventData) // 异步发送事件 + GetProducerHandler() IProducerHandler // 获取生产者处理器 +} diff --git a/abstract/IProducerHandler.go b/abstract/IProducerHandler.go new file mode 100644 index 0000000..38b7ba0 --- /dev/null +++ b/abstract/IProducerHandler.go @@ -0,0 +1,46 @@ +// Package abstract ... +// +// Description : abstract ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2025-08-27 08:54 +package abstract + +import ( + "context" + + "git.zhangdeman.cn/zhangdeman/queue/define" +) + +// IProducerHandler 生产者数据处理的接口约束 +type IProducerHandler interface { + Lock(ctx context.Context, data *define.EventData) error // 逻辑加锁 + Unlock(ctx context.Context, data *define.EventData) error // 逻辑释放锁 + SuccessCallback(ctx context.Context, data *define.EventData, result *define.EventProduceResult) // 成功回调 + FailureCallback(ctx context.Context, data *define.EventData, result *define.EventProduceResult) // 失败回调 + PanicCallback(ctx context.Context, data *define.EventData, e any) // panic回调 +} + +type DefaultProducerHandler struct { +} + +func (d DefaultProducerHandler) Lock(ctx context.Context, data *define.EventData) error { + return nil +} + +func (d DefaultProducerHandler) Unlock(ctx context.Context, data *define.EventData) error { + return nil +} + +func (d DefaultProducerHandler) SuccessCallback(ctx context.Context, data *define.EventData, result *define.EventProduceResult) { + return +} + +func (d DefaultProducerHandler) FailureCallback(ctx context.Context, data *define.EventData, result *define.EventProduceResult) { + return +} + +func (d DefaultProducerHandler) PanicCallback(ctx context.Context, data *define.EventData, e any) { + return +} diff --git a/define/data.go b/define/data.go new file mode 100644 index 0000000..cb801c7 --- /dev/null +++ b/define/data.go @@ -0,0 +1,53 @@ +// Package define ... +// +// Description : define ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2025-08-22 16:38 +package define + +// EventData 事件数据 +// - Timestamp: 事件的时间, 毫秒时间戳, 不设置默认当前时间 +// - SystemTimestamp: 系统时间(无需设置,自动填充) +// - TraceID: 事件trace_id, 关联到触发事件的请求, 若不设置和 MsgID 保持一致 +// - Hostname: 发送事件的机器名称(无需设置,自动填充) +// - HostIp: 发送事件的机器IP(无需设置,自动填充) +// - Type: 事件类型, 基于不同的类型, 对data有不同的处理规则 +// - Version: 业务迭代, Data 的数据结构可能发生变化, 使用 version 标记 Data 如何解析, 使用者自行设置于理解 +// - Data: 事件业务数据, 使用者自行设置与理解 +// - Key: 用于哈希队列分片,不设置与MsgID一致 +// - MsgID: 系统随机生成, 每条消息的唯一标识 +type EventData struct { + Timestamp int64 `json:"timestamp"` // 事件的时间, 毫秒时间戳, 不设置默认当前时间 + SystemTimestamp int64 `json:"system_timestamp"` // 系统时间(无需设置,自动填充) + TraceID string `json:"trace_id"` // 事件trace_id, 关联到触发事件的请求, 若不设置和 MsgID 保持一致 + Hostname string `json:"hostname"` // 发送事件的机器名称(无需设置,自动填充) + HostIp string `json:"host_ip"` // 发送事件的机器IP(无需设置,自动填充) + Type string `json:"type"` // 事件类型, 基于不同的类型, 对data有不同的处理规则 + Data string `json:"data"` // 事件业务数据, 统一使用字符串 + Version string `json:"version"` // 业务迭代, Data 的数据结构可能发生变化, 使用 Version 标记 Data 如何解析, 使用者自行设置与理解 + Key string `json:"key"` // 用于哈希队列分片,不设置与MsgID一致 + MsgID string `json:"msg_id"` // 系统随机生成, 表示唯一一条消息 + Processor string `json:"processor"` // 处理器标识, 必须设置!!! +} + +// EventProduceResult 事件生产结果 +// - Queue: 事件消息实际发送到哪一个队列 +// - Success: 事件消息是否发送成功 +// - Err: 事件消息发送失败是的异常信息 +// - Cost: 事件消息发送的耗时 +type EventProduceResult struct { + Queue string `json:"queue"` // 事件发送在哪一个队列 + Success bool `json:"success"` // 事件发送结果 + Err error `json:"err"` // 发送失败时, 失败原因 + Cost int64 `json:"cost"` // 事件发送耗时 +} + +// EventConsumerServerInfo 事件消费的系统信息 +type EventConsumerServerInfo struct { + SystemTimestamp int64 `json:"system_timestamp"` // 系统时间 + Hostname string `json:"hostname"` // 消费事件的机器名称 + HostIp string `json:"host_ip"` // 消费事件的机器IP + Queue string `json:"queue"` // 从哪个队列消费到的数据 +} diff --git a/go.mod b/go.mod index 41b1547..31666e3 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module git.zhangdeman.cn/zhangdeman/queue go 1.24.0 require ( + git.zhangdeman.cn/zhangdeman/redis v0.0.0-20260413082618-2adbe2bcd3a5 git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20260112135254-c9ba29f9f674 git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20260321023345-6c6e467e3a14 github.com/redis/go-redis/v9 v9.18.0 @@ -18,6 +19,8 @@ require ( github.com/sbabiv/xml2map v1.2.1 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.1 // indirect gopkg.in/ini.v1 v1.67.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 6758bc8..fc6551a 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ git.zhangdeman.cn/zhangdeman/consts v0.0.0-20260413082525-fb90982b1256 h1:zYkRoH git.zhangdeman.cn/zhangdeman/consts v0.0.0-20260413082525-fb90982b1256/go.mod h1:5p8CEKGBxi7qPtTXDI3HDmqKAfIm5i/aBWdrbkbdNjc= git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20251013024601-da007da2fb42 h1:VjYrb4adud7FHeiYS9XA0B/tOaJjfRejzQAlwimrrDc= git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20251013024601-da007da2fb42/go.mod h1:VHb9qmhaPDAQDcS6vUiDCamYjZ4R5lD1XtVsh55KsMI= +git.zhangdeman.cn/zhangdeman/redis v0.0.0-20260413082618-2adbe2bcd3a5 h1:WjPlJ6CoXi2yFl4NGyyCWWpB2KbBP5LH2Hia6Xeecsc= +git.zhangdeman.cn/zhangdeman/redis v0.0.0-20260413082618-2adbe2bcd3a5/go.mod h1:y/0RBrC0CipiYJlBZdkvNRU4ROw9wZt+UqzZJBxC7Ng= git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20260112135254-c9ba29f9f674 h1:75JJ09HPqWi9qm7XD+vV6p5TaCMQgDsae/EbsLiE1t4= git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20260112135254-c9ba29f9f674/go.mod h1:EXrvDs830GzqhDNTR5TgKVbT3ADRgyUb2pFerwF4rLc= git.zhangdeman.cn/zhangdeman/util v0.0.0-20260105024213-3d76b1bcde5a h1:IGUsWz204BTQlD2l4kenlwJQS4Av2RS2kfUHZ5QVrmw= @@ -54,6 +56,12 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= +go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/ini.v1 v1.67.1 h1:tVBILHy0R6e4wkYOn3XmiITt/hEVH4TFMYvAX2Ytz6k= -- 2.36.6 From c14e5f84dcadcf19ec64e40384c547be00634c1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Mon, 13 Apr 2026 17:27:56 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feat:=20=E9=87=8D=E6=96=B0=E8=AE=BE?= =?UTF-8?q?=E8=AE=A1=20delay=20consumer=20producer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- delay/redis_consumer.go | 211 +++++++++++++++++++++++++++++++ delay/redis_delay.go | 129 +++++++++++++++++++ delay/redis_dispatch_consumer.go | 103 --------------- delay/redis_produce.go | 156 ++++++++++++++--------- go.mod | 14 +- go.sum | 26 ++++ 6 files changed, 475 insertions(+), 164 deletions(-) create mode 100644 delay/redis_consumer.go create mode 100644 delay/redis_delay.go delete mode 100644 delay/redis_dispatch_consumer.go diff --git a/delay/redis_consumer.go b/delay/redis_consumer.go new file mode 100644 index 0000000..69eae57 --- /dev/null +++ b/delay/redis_consumer.go @@ -0,0 +1,211 @@ +// Package delay ... +// +// Description : event ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2025-08-22 18:55 +package delay + +import ( + "context" + "errors" + "fmt" + "os" + "strings" + "sync" + "time" + + "git.zhangdeman.cn/zhangdeman/network/util" + "git.zhangdeman.cn/zhangdeman/queue/abstract" + "git.zhangdeman.cn/zhangdeman/queue/define" + "git.zhangdeman.cn/zhangdeman/redis" + "git.zhangdeman.cn/zhangdeman/serialize" + redisV9 "github.com/redis/go-redis/v9" + "github.com/tidwall/gjson" +) + +// NewRedisConsumer 启动一个消费者实例 +func NewRedisConsumer( + ctx context.Context, + queueName string, queueCnt int, redisFlag string, + consumerHandler abstract.IConsumerHandler, +) abstract.IConsumer { + if nil == consumerHandler { + panic("consumer handler instance is nil") + } + if queueName == "" || redisFlag == "" { + panic("init redis producer: queue name or redis flag is empty") + } + // 验证redis实例是否存在 + if _, err := redis.Client.GetRealClientWithError(redisFlag); nil != err { + panic(err.Error()) + } + if queueCnt <= 0 { + queueCnt = 1 // 默认单队列 + } + + if nil == ctx { + ctx = context.Background() + } + c := &redisConsumer{ + lock: &sync.RWMutex{}, + hasStop: false, + ctx: ctx, + stopChan: make(chan bool), + redisFlag: redisFlag, + queueName: queueName, + queueCnt: queueCnt, + consumerHandler: consumerHandler, + } + return c +} + +type redisConsumer struct { + lock *sync.RWMutex + hasStop bool + stopChan chan bool // 停止请求的chan + ctx context.Context // 请求处理上下文 + redisFlag string // redis 标识, 必须是使用统一的 pkg/redis 进行管理的 + queueName string // 队列名称,基础公共前缀,尾部后缀会自动根据 shard cnt 进行哈希 + queueCnt int // 队列数量 + consumerHandler abstract.IConsumerHandler // 消息处理实例 +} + +// Start 启动消费者 +func (r *redisConsumer) Start() error { + var realQueueList []string + for i := 0; i < r.queueCnt; i++ { + realQueueList = append(realQueueList, fmt.Sprintf("%v_%v", r.queueName, i)) + } + + hostname, _ := os.Hostname() + serverInfo := &define.EventConsumerServerInfo{ + SystemTimestamp: time.Now().UnixMilli(), + Hostname: hostname, + HostIp: util.IP.GetHostIP(), + Queue: strings.Join(realQueueList, ","), + } + for { + isFinish := false + select { + case <-r.ctx.Done(): // context被取消(对应程序系统默认行为,一般对应程序停止运行) + if nil != r.lock { + r.lock.Lock() + r.hasStop = true + r.lock.Unlock() + } else { + r.hasStop = true + } + isFinish = true + case <-r.stopChan: // 对应开发者介入处理行为, 主进程不停止运行, 但是消费者要退出 + isFinish = true + default: // 60s 无数据进入下一轮等待, 或者 消费到数据正常处理 + if r.hasStop { + // 已停止 + return nil + } + res := redis.Wrapper.BRPop(r.ctx, r.redisFlag, realQueueList, 60) + if nil != res.Err { + if errors.Is(res.Err, redisV9.Nil) { + // 判断是否为未订阅到消息 + continue + } + r.notifyMessageSubscribeFailure(serverInfo, res.Err) + time.Sleep(time.Second * 10) + continue + } + + // 订阅成功, 判断是否无数据 + if res.Result == "" { + // 直接进入下一轮阻塞等待 + continue + } + + // 订阅到数据, 数据解析 + // BRPOP 返回数据是数组 0: 读取到数据的队列名 1: 读取到的数据 + // 前置redis操作统一序列化, 此处读取到的是一个序列化后的字符串 `["queue_name", "value"]` + serverInfo.Queue = gjson.Get(res.Result, "0").String() + var ( + err error + formatData define.EventData + ) + if err = serialize.JSON.UnmarshalWithNumberForString(gjson.Get(res.Result, "1").String(), &formatData); nil != err { + // 回调数据解析失败的处理函数 + r.notifyMessageUnmarshalFailure(serverInfo, res.Result, err) + continue + } + + // 处理消息(同步或异步) + r.handler(serverInfo, &formatData) + } + if isFinish { + break + } + } + return nil +} + +// notifyMessageSubscribeFailure 通知订阅失败 +func (r *redisConsumer) notifyMessageSubscribeFailure(serverInfo *define.EventConsumerServerInfo, err error) { + defer func() { + // 防一手 panic, 订阅失败回调出现panic, 不回调panic处理方法 + recover() + }() + r.consumerHandler.SubscribeFailureCallback(r.ctx, serverInfo, err) +} + +// 回调消息返序列化失败的处理逻辑 +func (r *redisConsumer) notifyMessageUnmarshalFailure(serverInfo *define.EventConsumerServerInfo, res string, err error) { + defer func() { + // 防一手 panic, 返回劣化失败回调出现panic, 不回调panic处理方法 + recover() + }() + r.consumerHandler.UnmarshalFailureCallback(r.ctx, serverInfo, res, err) +} + +// handler 处理订阅到的数据 +func (r *redisConsumer) handler(serverInfo *define.EventConsumerServerInfo, data *define.EventData) { + // 处理消息(同步或异步) + if r.consumerHandler.Async(r.ctx, serverInfo, data) { + // 异步处理 + go func() { + r.dealMessageData(serverInfo, data) + }() + } else { + r.dealMessageData(serverInfo, data) + } +} + +// dealMessageData 处理消息数据 +func (r *redisConsumer) dealMessageData(serverInfo *define.EventConsumerServerInfo, data *define.EventData) { + defer func() { + // 同步处理 + if err := recover(); nil != err { + // 出现panic, 回调 panic 处理 + r.consumerHandler.PanicCallback(r.ctx, serverInfo, data, err) + } + }() + + // 加锁, 若果不需要加锁, 实现接口时函数体留空即可 + if err := r.consumerHandler.Lock(r.ctx, serverInfo, data); nil != err { + r.consumerHandler.LockFailureCallback(r.ctx, serverInfo, data, err) + return + } + defer func() { + // 释放锁 + if err := r.consumerHandler.Unlock(r.ctx, serverInfo, data); nil != err { + r.consumerHandler.UnlockFailureCallback(r.ctx, serverInfo, data, err) + return + } + }() + // 执行对消息的处理 + if err := r.consumerHandler.MessageLogic(r.ctx, serverInfo, data); nil != err { + r.consumerHandler.MessageLogicFailureCallback(r.ctx, serverInfo, data, err) + } +} + +// Stop 停止消费者 +func (r *redisConsumer) Stop() { + r.stopChan <- true +} diff --git a/delay/redis_delay.go b/delay/redis_delay.go new file mode 100644 index 0000000..53d9032 --- /dev/null +++ b/delay/redis_delay.go @@ -0,0 +1,129 @@ +// Package delay ... +// +// Description : 基于redis实现的延迟事件队列 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2025-08-29 09:11 +package delay + +import ( + "context" + "time" + + "git.zhangdeman.cn/zhangdeman/queue/abstract" + "git.zhangdeman.cn/zhangdeman/queue/define" + redisPkg "git.zhangdeman.cn/zhangdeman/redis" + redisPkgDefine "git.zhangdeman.cn/zhangdeman/redis/define" + "git.zhangdeman.cn/zhangdeman/serialize" + "github.com/tidwall/gjson" +) + +// NewRedisDelayEvent 获取延迟队列实例 +// 参数说明: +// - redisFlag: 使用那个redis实例 +// - queueName: 延迟队列名称 +// - pullTimeInterval: 延迟队列扫描的时间间隔, 单位秒 +// - distributeProducer: 消息生产者示例, 扫描到的数据通过此实例向二级任务队列分发 +func NewRedisDelayEvent(redisFlag string, queueName string, pullTimeInterval int64, distributeProducer abstract.IProducer) abstract.IDelayQueue { + if queueName == "" || redisFlag == "" { + panic("init redis delay event: queue name or redis flag is empty") + } + if nil == distributeProducer { + panic("distributeProducer is nil") + } + // 验证redis实例是否存在 + if _, err := redisPkg.Client.GetRealClientWithError(redisFlag); nil != err { + panic(err.Error()) + } + if pullTimeInterval <= 0 { + pullTimeInterval = 60 + } + return &redisDelayEvent{ + stopChan: make(chan bool, 1), + redisFlag: redisFlag, + delayQueueName: queueName, + pullTimeInterval: pullTimeInterval, + distributeProducer: distributeProducer, + } +} + +type redisDelayEvent struct { + stopChan chan bool // 停止请求的chan + redisFlag string // 使用的redis实例 + delayQueueName string // 延迟队列名称 + pullTimeInterval int64 // 延迟队列多久拉取一次数据 + distributeProducer abstract.IProducer // 事件分发的生产者 +} + +// Send 生成一条延时事件 +// 参数说明: +// - delayTime: 延时时长, 单位: s +// - data: 要发送的数据 +func (rde redisDelayEvent) Send(ctx context.Context, delayTime int64, data *define.EventData) *redisPkgDefine.RedisResult { + if delayTime <= 0 { + // 默认延迟 1min + delayTime = 60 + } + if delayTime >= time.Now().Unix() { + // 指定具体的延迟时间, 重置延时时间为相对时间 + delayTime = delayTime - time.Now().Unix() + } + return redisPkg.Wrapper.ZAdd(ctx, rde.redisFlag, rde.delayQueueName, time.Now().Unix()+delayTime, serialize.JSON.MarshalForStringIgnoreError(data)) +} + +// Distribute 到期事件事件分发 +func (rde redisDelayEvent) Distribute(ctx context.Context) { + for { + hasFinish := false + select { + case <-ctx.Done(): + // 收到上下文已完成事件, 退出 + hasFinish = true + case <-rde.stopChan: + // 程序调用stop方法, 退出 + hasFinish = true + default: + now := time.Now() + // 计时器, 固定时长扫描一次 + // ZRangeAndRemByScore 此方法内部已经自动包装了删除逻辑 + eventDataListRes := redisPkg.Wrapper.ZRangeAndRemByScore(ctx, rde.redisFlag, rde.delayQueueName, 0, now.Unix()) + // ZRange 返回数据时二维数组 [["数据部分", "设置的数据score"]] + resList := gjson.Parse(eventDataListRes.Result).Array() + if len(resList) > 0 { + for _, itemData := range resList { + var ( + resFormat define.EventData + err error + ) + + if err = serialize.JSON.UnmarshalWithNumberForString(itemData.Get("0").String(), &resFormat); nil != err { + // 数据解析失败, 触发失败回调 + rde.distributeProducer.GetProducerHandler().FailureCallback(ctx, &define.EventData{ + Type: "UNMARSHAL_FAILURE", + Data: itemData.Get("0").String(), + }, &define.EventProduceResult{ + Queue: "", + Success: false, + Err: err, + Cost: 0, + }) + continue + } else { + // 生产数据, 向耳机队列分发任务 + rde.distributeProducer.Sync(ctx, &resFormat) + } + } + } + // 间隔指定时间拉取一次 + time.Sleep(time.Second * time.Duration(rde.pullTimeInterval)) + } + if hasFinish { + break + } + } +} + +func (rde redisDelayEvent) Stop() { + rde.stopChan <- true +} diff --git a/delay/redis_dispatch_consumer.go b/delay/redis_dispatch_consumer.go deleted file mode 100644 index c90957d..0000000 --- a/delay/redis_dispatch_consumer.go +++ /dev/null @@ -1,103 +0,0 @@ -// Package delay ... -// -// Description : delay ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 2022-07-07 10:32 -package delay - -import ( - "context" - "errors" - "time" - - "git.zhangdeman.cn/zhangdeman/serialize" - "github.com/redis/go-redis/v9" -) - -// NewRedisConsumer redis延迟队列消费者 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 14:10 2022/7/8 -func NewRedisConsumer(redisInstance *redis.Client, cfg *ConsumerConfig) IConsumer { - return &redisConsumer{ - redisInstance: redisInstance, - cfg: cfg, - } -} - -type redisConsumer struct { - redisInstance *redis.Client - cfg *ConsumerConfig -} - -func (r *redisConsumer) Consume(ctx context.Context) ([]*ProduceData, error) { - if nil == ctx { - ctx = context.Background() - } - zRangeResult := r.redisInstance.ZRange(ctx, r.cfg.QueueName, 0, time.Now().UnixNano()/1e6) - if err := zRangeResult.Err(); nil != err { - return make([]*ProduceData, 0), err - } - // 格式化数据 - var ( - result []*ProduceData - ) - valueList := zRangeResult.Val() - if len(valueList) == 0 { - return result, nil - } - for _, item := range valueList { - d := &ProduceData{} - serialize.JSON.UnmarshalWithNumberForStringIgnoreError(item, d) - result = append(result, d) - } - - return result, nil -} - -func (r *redisConsumer) ConsumeWithHandler(ctx context.Context, handler IHandler) error { - if nil == handler { - return errors.New("handler instance is nil") - } - var ( - msgList []*ProduceData - err error - ) - - if msgList, err = r.Consume(ctx); nil != err { - return err - } - - // 未订阅到消息 - if len(msgList) == 0 { - return nil - } - return handler.Handle(msgList) - /* wg := &sync.WaitGroup{} - wg.Add(len(msgList)) - for _, item := range msgList { - - go func(msgData *ZRangeData) { - defer wg.Done() - hashValue, exist := msgData.Data.Data[r.cfg.HashKey] - if !exist || hashValue == nil { - hashValue = msgData.Data.MsgID - } - shard := util.Hash.GetHashIDMod(hashValue, r.cfg.SonQueueCnt) - realQueue := fmt.Sprintf(r.cfg.QueueName+"_%d", shard) - r.redisInstance.LPush() - }(item) - } - wg.Wait() - return nil*/ -} - -type redisConsumerMsgHandler struct { -} - -func (r redisConsumerMsgHandler) Handle(queData []*ZRangeData) error { - panic("implement me") -} diff --git a/delay/redis_produce.go b/delay/redis_produce.go index ccab36d..d412d7c 100644 --- a/delay/redis_produce.go +++ b/delay/redis_produce.go @@ -1,78 +1,116 @@ -// Package delay ... +// Package event ... // -// Description : delay ... +// Description : 基于redis实现事件生产 + 消费 // // Author : go_developer@163.com<白茶清欢> // -// Date : 2022-07-06 17:59 +// Date : 2025-08-22 16:34 package delay import ( "context" - "sync" + "fmt" + "os" "time" + "git.zhangdeman.cn/zhangdeman/network/util" + "git.zhangdeman.cn/zhangdeman/queue/abstract" + "git.zhangdeman.cn/zhangdeman/queue/define" + "git.zhangdeman.cn/zhangdeman/redis" "git.zhangdeman.cn/zhangdeman/serialize" "git.zhangdeman.cn/zhangdeman/wrapper/op_string" - "github.com/redis/go-redis/v9" ) -// NewRedisQueue 获取redis队列实例 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 18:09 2022/7/6 -func NewRedisQueue(redisInstance *redis.Client) IProduce { - return &redisProduce{client: redisInstance} -} - -// withRedis 使用redis实现延迟队列 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 17:59 2022/7/6 -type redisProduce struct { - client *redis.Client -} - -// Produce 生产数据 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 18:03 2022/7/6 -func (rp *redisProduce) Produce(ctx context.Context, data ...*Queue) error { - if len(data) == 0 { - return nil +// NewRedisProducer 获取基于redis的生产者实例 +func NewRedisProducer( + queueName string, queueCnt int, redisFlag string, + producerHandler abstract.IProducerHandler, +) abstract.IProducer { + if queueName == "" || redisFlag == "" { + panic("init redis producer: queue name or redis flag is empty") } - if nil == ctx { - ctx = context.Background() + // 验证redis实例是否存在 + if _, err := redis.Client.GetRealClientWithError(redisFlag); nil != err { + panic(err.Error()) } - - wg := &sync.WaitGroup{} - wg.Add(len(data)) - for _, queueData := range data { - go func(inputQueueData *Queue) { - defer wg.Done() - inputQueueData.err = rp.client.ZAdd(ctx, inputQueueData.Name, rp.buildAddMember(inputQueueData)).Err() - }(queueData) + if queueCnt <= 0 { + queueCnt = 1 // 默认单队列 } - wg.Wait() - return nil -} - -// buildAddMember ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 18:22 2022/7/6 -func (rp *redisProduce) buildAddMember(queueData *Queue) redis.Z { - return redis.Z{ - Score: float64(time.Now().Unix() + queueData.DelayTime), - Member: serialize.JSON.MarshalForStringIgnoreError(&ProduceData{ - MsgID: op_string.RandomMd5().Value, - Timestamp: time.Now().UnixNano() / 1e6, - Host: "", - Data: queueData.Data, - }), + if nil == producerHandler { + producerHandler = abstract.DefaultProducerHandler{} + } + return &redisProducer{ + queueCnt: queueCnt, + queueName: queueName, + redisFlag: redisFlag, + producerHandler: producerHandler, } } + +type redisProducer struct { + redisFlag string // redis 标识, 必须是使用统一的 pkg/redis 进行管理的 + queueName string // 队列名称,基础公共前缀,尾部后缀会自动根据 shard cnt 进行哈希 + queueCnt int // 队列数量 + producerHandler abstract.IProducerHandler // 生产数据的处理逻辑 +} + +// fillEventData 填充一些系统数据数据 +func (r *redisProducer) fillEventData(data *define.EventData) { + if len(data.Key) == 0 { + data.Key = data.MsgID // 不设置和MsgID一致 + } + if len(data.TraceID) == 0 { + data.TraceID = data.Key + } + data.SystemTimestamp = time.Now().UnixMilli() // 系统时间 + if data.Timestamp <= 0 { + data.Timestamp = data.SystemTimestamp + } + data.Hostname, _ = os.Hostname() // 服务器 hostname + data.HostIp = util.IP.GetHostIP() // 服务器IP +} + +// Sync 同步发送 +func (r *redisProducer) Sync(ctx context.Context, data *define.EventData) { + if nil == data { + return + } + r.fillEventData(data) + realQueue := r.getRealQueue(data) + sendRedisRes := redis.Wrapper.LPush(ctx, r.redisFlag, realQueue, serialize.JSON.MarshalForStringIgnoreError(data)) + res := &define.EventProduceResult{ + Queue: realQueue, + Success: sendRedisRes.Err == nil, + Err: sendRedisRes.Err, + Cost: sendRedisRes.UsedTime, + } + if res.Success { + r.producerHandler.SuccessCallback(ctx, data, res) + } else { + r.producerHandler.FailureCallback(ctx, data, res) + } +} + +// Async 异步发送 +func (r *redisProducer) Async(ctx context.Context, data *define.EventData) { + go func() { + defer func() { + // 如果出现panic, 则触发失败回调 + if err := recover(); nil != err { + r.producerHandler.PanicCallback(ctx, data, err) + } + }() + r.Sync(ctx, data) + }() +} + +// GetProducerHandler 获取producer处理器 +func (r *redisProducer) GetProducerHandler() abstract.IProducerHandler { + return r.producerHandler +} + +// getRealQueue 获取真实的队列key +func (r *redisProducer) getRealQueue(data *define.EventData) string { + queueIdx := op_string.HashNumber(data.Key).Value % uint64(r.queueCnt) + return fmt.Sprintf("%v_%v", r.queueName, queueIdx) +} diff --git a/go.mod b/go.mod index 31666e3..80aa224 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,14 @@ module git.zhangdeman.cn/zhangdeman/queue -go 1.24.0 +go 1.25.0 require ( - git.zhangdeman.cn/zhangdeman/redis v0.0.0-20260413082618-2adbe2bcd3a5 + git.zhangdeman.cn/zhangdeman/network v0.0.0-20260406142525-48e87d040519 + git.zhangdeman.cn/zhangdeman/redis v0.0.0-20260413092650-825ae95ecb29 git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20260112135254-c9ba29f9f674 git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20260321023345-6c6e467e3a14 github.com/redis/go-redis/v9 v9.18.0 + github.com/tidwall/gjson v1.18.0 ) require ( @@ -15,9 +17,17 @@ require ( git.zhangdeman.cn/zhangdeman/util v0.0.0-20260105024213-3d76b1bcde5a // indirect github.com/BurntSushi/toml v1.6.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/go-redis/redismock/v9 v9.2.0 // indirect + github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/mozillazg/go-pinyin v0.21.0 // indirect + github.com/mssola/user_agent v0.6.0 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/sbabiv/xml2map v1.2.1 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/tidwall/match v1.2.0 // indirect + github.com/tidwall/pretty v1.2.1 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.1 // indirect diff --git a/go.sum b/go.sum index fc6551a..d8ddb3a 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,13 @@ git.zhangdeman.cn/zhangdeman/consts v0.0.0-20260413082525-fb90982b1256 h1:zYkRoH86j+4BoVGCWUY6zA+FIQNmul/c8whmmYK3bT4= git.zhangdeman.cn/zhangdeman/consts v0.0.0-20260413082525-fb90982b1256/go.mod h1:5p8CEKGBxi7qPtTXDI3HDmqKAfIm5i/aBWdrbkbdNjc= +git.zhangdeman.cn/zhangdeman/network v0.0.0-20260406142525-48e87d040519 h1:+895a/Efz/rrYSCXP95VQlIWN2rE2d2PFAevTT0D5To= +git.zhangdeman.cn/zhangdeman/network v0.0.0-20260406142525-48e87d040519/go.mod h1:0UcxxT2AoxJwR3SwsPAqA8Od+qY40kIdD8WU+TT0wYg= git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20251013024601-da007da2fb42 h1:VjYrb4adud7FHeiYS9XA0B/tOaJjfRejzQAlwimrrDc= git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20251013024601-da007da2fb42/go.mod h1:VHb9qmhaPDAQDcS6vUiDCamYjZ4R5lD1XtVsh55KsMI= git.zhangdeman.cn/zhangdeman/redis v0.0.0-20260413082618-2adbe2bcd3a5 h1:WjPlJ6CoXi2yFl4NGyyCWWpB2KbBP5LH2Hia6Xeecsc= git.zhangdeman.cn/zhangdeman/redis v0.0.0-20260413082618-2adbe2bcd3a5/go.mod h1:y/0RBrC0CipiYJlBZdkvNRU4ROw9wZt+UqzZJBxC7Ng= +git.zhangdeman.cn/zhangdeman/redis v0.0.0-20260413092650-825ae95ecb29 h1:o+B0oeJkY2JAiw/zGbavwhrd0KZ6ZpG4LAxMLWaUOwg= +git.zhangdeman.cn/zhangdeman/redis v0.0.0-20260413092650-825ae95ecb29/go.mod h1:y/0RBrC0CipiYJlBZdkvNRU4ROw9wZt+UqzZJBxC7Ng= git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20260112135254-c9ba29f9f674 h1:75JJ09HPqWi9qm7XD+vV6p5TaCMQgDsae/EbsLiE1t4= git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20260112135254-c9ba29f9f674/go.mod h1:EXrvDs830GzqhDNTR5TgKVbT3ADRgyUb2pFerwF4rLc= git.zhangdeman.cn/zhangdeman/util v0.0.0-20260105024213-3d76b1bcde5a h1:IGUsWz204BTQlD2l4kenlwJQS4Av2RS2kfUHZ5QVrmw= @@ -21,8 +25,12 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU= +github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/go-redis/redismock/v9 v9.2.0 h1:ZrMYQeKPECZPjOj5u9eyOjg8Nnb0BS9lkVIZ6IpsKLw= +github.com/go-redis/redismock/v9 v9.2.0/go.mod h1:18KHfGDK4Y6c2R0H38EUGWAdc7ZQS9gfYxc94k7rWT0= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= @@ -31,6 +39,14 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mozillazg/go-pinyin v0.21.0 h1:Wo8/NT45z7P3er/9YSLHA3/kjZzbLz5hR7i+jGeIGao= +github.com/mozillazg/go-pinyin v0.21.0/go.mod h1:iR4EnMMRXkfpFVV5FMi4FNB6wGq9NV6uDWbUuPhP4Yc= +github.com/mssola/user_agent v0.6.0 h1:uwPR4rtWlCHRFyyP9u2KOV0u8iQXmS7Z7feTrstQwk4= +github.com/mssola/user_agent v0.6.0/go.mod h1:TTPno8LPY3wAIEKRpAtkdMT0f8SE24pLRGPahjCH4uw= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs= @@ -52,6 +68,14 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= +github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/match v1.2.0 h1:0pt8FlkOwjN2fPt4bIl4BoNxb98gGHN2ObFEDkrfZnM= +github.com/tidwall/match v1.2.0/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= @@ -62,8 +86,10 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/ini.v1 v1.67.1 h1:tVBILHy0R6e4wkYOn3XmiITt/hEVH4TFMYvAX2Ytz6k= gopkg.in/ini.v1 v1.67.1/go.mod h1:x/cyOwCgZqOkJoDIJ3c1KNHMo10+nLGAhh+kn3Zizss= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -- 2.36.6