feature/upgrade_redis_queue #1

Merged
zhangdeman merged 2 commits from feature/upgrade_redis_queue into master 2026-04-13 17:38:13 +08:00
8 changed files with 195 additions and 0 deletions
Showing only changes of commit ad44a84718 - Show all commits

14
abstract/IConsumer.go Normal file
View File

@@ -0,0 +1,14 @@
// Package abstract ...
//
// Description : abstract ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2025-08-22 16:33
package abstract
// IConsumer 消费者接口约束
type IConsumer interface {
Start() error // 启动消费者
Stop() // 停止消费者
}

View File

@@ -0,0 +1,28 @@
// Package abstract ...
//
// Description : abstract ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2025-08-27 08:54
package abstract
import (
"context"
"git.zhangdeman.cn/zhangdeman/queue/define"
)
// IConsumerHandler 消费者的一些处理逻辑
type IConsumerHandler interface {
Lock(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData) error // 加锁
Unlock(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData) error // 释放锁
MessageLogic(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData) error // 处理订阅到的消息
PanicCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData, e any) // 出现异常panic的回调
SubscribeFailureCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, err error) // 订阅失败的回调
UnmarshalFailureCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, msgData string, err error) // 数据解析失败的回调
Async(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData) bool // 每一条数据可独立判断是否进行异步处理
LockFailureCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData, err error) // 加锁失败的回调
UnlockFailureCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData, err error) // 释放锁失败的回调
MessageLogicFailureCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData, err error) // 消息数据处理失败的回调
}

22
abstract/IDelayQueue.go Normal file
View File

@@ -0,0 +1,22 @@
// Package abstract ...
//
// Description : abstract ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2025-08-29 12:08
package abstract
import (
"context"
"git.zhangdeman.cn/zhangdeman/queue/define"
redisPkgDefine "git.zhangdeman.cn/zhangdeman/redis/define"
)
// IDelayQueue 延迟队列接口约束
type IDelayQueue interface {
Send(ctx context.Context, delayTime int64, data *define.EventData) *redisPkgDefine.RedisResult // 生产事件
Distribute(ctx context.Context) // 时间到期分发
Stop() // 停止延迟队列
}

21
abstract/IProducer.go Normal file
View File

@@ -0,0 +1,21 @@
// Package abstract ...
//
// Description : abstract ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2025-08-22 16:33
package abstract
import (
"context"
"git.zhangdeman.cn/zhangdeman/queue/define"
)
// IProducer 生产者接口约束
type IProducer interface {
Sync(ctx context.Context, data *define.EventData) // 同步发送事件
Async(ctx context.Context, data *define.EventData) // 异步发送事件
GetProducerHandler() IProducerHandler // 获取生产者处理器
}

View File

@@ -0,0 +1,46 @@
// Package abstract ...
//
// Description : abstract ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2025-08-27 08:54
package abstract
import (
"context"
"git.zhangdeman.cn/zhangdeman/queue/define"
)
// IProducerHandler 生产者数据处理的接口约束
type IProducerHandler interface {
Lock(ctx context.Context, data *define.EventData) error // 逻辑加锁
Unlock(ctx context.Context, data *define.EventData) error // 逻辑释放锁
SuccessCallback(ctx context.Context, data *define.EventData, result *define.EventProduceResult) // 成功回调
FailureCallback(ctx context.Context, data *define.EventData, result *define.EventProduceResult) // 失败回调
PanicCallback(ctx context.Context, data *define.EventData, e any) // panic回调
}
type DefaultProducerHandler struct {
}
func (d DefaultProducerHandler) Lock(ctx context.Context, data *define.EventData) error {
return nil
}
func (d DefaultProducerHandler) Unlock(ctx context.Context, data *define.EventData) error {
return nil
}
func (d DefaultProducerHandler) SuccessCallback(ctx context.Context, data *define.EventData, result *define.EventProduceResult) {
return
}
func (d DefaultProducerHandler) FailureCallback(ctx context.Context, data *define.EventData, result *define.EventProduceResult) {
return
}
func (d DefaultProducerHandler) PanicCallback(ctx context.Context, data *define.EventData, e any) {
return
}

53
define/data.go Normal file
View File

@@ -0,0 +1,53 @@
// Package define ...
//
// Description : define ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2025-08-22 16:38
package define
// EventData 事件数据
// - Timestamp: 事件的时间, 毫秒时间戳, 不设置默认当前时间
// - SystemTimestamp: 系统时间(无需设置,自动填充)
// - TraceID: 事件trace_id, 关联到触发事件的请求, 若不设置和 MsgID 保持一致
// - Hostname: 发送事件的机器名称(无需设置,自动填充)
// - HostIp: 发送事件的机器IP(无需设置,自动填充)
// - Type: 事件类型, 基于不同的类型, 对data有不同的处理规则
// - Version: 业务迭代, Data 的数据结构可能发生变化, 使用 version 标记 Data 如何解析, 使用者自行设置于理解
// - Data: 事件业务数据, 使用者自行设置与理解
// - Key: 用于哈希队列分片,不设置与MsgID一致
// - MsgID: 系统随机生成, 每条消息的唯一标识
type EventData struct {
Timestamp int64 `json:"timestamp"` // 事件的时间, 毫秒时间戳, 不设置默认当前时间
SystemTimestamp int64 `json:"system_timestamp"` // 系统时间(无需设置,自动填充)
TraceID string `json:"trace_id"` // 事件trace_id, 关联到触发事件的请求, 若不设置和 MsgID 保持一致
Hostname string `json:"hostname"` // 发送事件的机器名称(无需设置,自动填充)
HostIp string `json:"host_ip"` // 发送事件的机器IP(无需设置,自动填充)
Type string `json:"type"` // 事件类型, 基于不同的类型, 对data有不同的处理规则
Data string `json:"data"` // 事件业务数据, 统一使用字符串
Version string `json:"version"` // 业务迭代, Data 的数据结构可能发生变化, 使用 Version 标记 Data 如何解析, 使用者自行设置与理解
Key string `json:"key"` // 用于哈希队列分片,不设置与MsgID一致
MsgID string `json:"msg_id"` // 系统随机生成, 表示唯一一条消息
Processor string `json:"processor"` // 处理器标识, 必须设置!!!
}
// EventProduceResult 事件生产结果
// - Queue: 事件消息实际发送到哪一个队列
// - Success: 事件消息是否发送成功
// - Err: 事件消息发送失败是的异常信息
// - Cost: 事件消息发送的耗时
type EventProduceResult struct {
Queue string `json:"queue"` // 事件发送在哪一个队列
Success bool `json:"success"` // 事件发送结果
Err error `json:"err"` // 发送失败时, 失败原因
Cost int64 `json:"cost"` // 事件发送耗时
}
// EventConsumerServerInfo 事件消费的系统信息
type EventConsumerServerInfo struct {
SystemTimestamp int64 `json:"system_timestamp"` // 系统时间
Hostname string `json:"hostname"` // 消费事件的机器名称
HostIp string `json:"host_ip"` // 消费事件的机器IP
Queue string `json:"queue"` // 从哪个队列消费到的数据
}

3
go.mod
View File

@@ -3,6 +3,7 @@ module git.zhangdeman.cn/zhangdeman/queue
go 1.24.0
require (
git.zhangdeman.cn/zhangdeman/redis v0.0.0-20260413082618-2adbe2bcd3a5
git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20260112135254-c9ba29f9f674
git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20260321023345-6c6e467e3a14
github.com/redis/go-redis/v9 v9.18.0
@@ -18,6 +19,8 @@ require (
github.com/sbabiv/xml2map v1.2.1 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.1 // indirect
gopkg.in/ini.v1 v1.67.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

8
go.sum
View File

@@ -2,6 +2,8 @@ git.zhangdeman.cn/zhangdeman/consts v0.0.0-20260413082525-fb90982b1256 h1:zYkRoH
git.zhangdeman.cn/zhangdeman/consts v0.0.0-20260413082525-fb90982b1256/go.mod h1:5p8CEKGBxi7qPtTXDI3HDmqKAfIm5i/aBWdrbkbdNjc=
git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20251013024601-da007da2fb42 h1:VjYrb4adud7FHeiYS9XA0B/tOaJjfRejzQAlwimrrDc=
git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20251013024601-da007da2fb42/go.mod h1:VHb9qmhaPDAQDcS6vUiDCamYjZ4R5lD1XtVsh55KsMI=
git.zhangdeman.cn/zhangdeman/redis v0.0.0-20260413082618-2adbe2bcd3a5 h1:WjPlJ6CoXi2yFl4NGyyCWWpB2KbBP5LH2Hia6Xeecsc=
git.zhangdeman.cn/zhangdeman/redis v0.0.0-20260413082618-2adbe2bcd3a5/go.mod h1:y/0RBrC0CipiYJlBZdkvNRU4ROw9wZt+UqzZJBxC7Ng=
git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20260112135254-c9ba29f9f674 h1:75JJ09HPqWi9qm7XD+vV6p5TaCMQgDsae/EbsLiE1t4=
git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20260112135254-c9ba29f9f674/go.mod h1:EXrvDs830GzqhDNTR5TgKVbT3ADRgyUb2pFerwF4rLc=
git.zhangdeman.cn/zhangdeman/util v0.0.0-20260105024213-3d76b1bcde5a h1:IGUsWz204BTQlD2l4kenlwJQS4Av2RS2kfUHZ5QVrmw=
@@ -54,6 +56,12 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc=
go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/ini.v1 v1.67.1 h1:tVBILHy0R6e4wkYOn3XmiITt/hEVH4TFMYvAX2Ytz6k=