diff --git a/abstract/IConsumer.go b/abstract/IConsumer.go new file mode 100644 index 0000000..5e31d08 --- /dev/null +++ b/abstract/IConsumer.go @@ -0,0 +1,14 @@ +// Package abstract ... +// +// Description : abstract ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2025-08-22 16:33 +package abstract + +// IConsumer 消费者接口约束 +type IConsumer interface { + Start() error // 启动消费者 + Stop() // 停止消费者 +} diff --git a/abstract/IConsumerHandler.go b/abstract/IConsumerHandler.go new file mode 100644 index 0000000..e9ec2de --- /dev/null +++ b/abstract/IConsumerHandler.go @@ -0,0 +1,28 @@ +// Package abstract ... +// +// Description : abstract ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2025-08-27 08:54 +package abstract + +import ( + "context" + + "git.zhangdeman.cn/zhangdeman/queue/define" +) + +// IConsumerHandler 消费者的一些处理逻辑 +type IConsumerHandler interface { + Lock(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData) error // 加锁 + Unlock(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData) error // 释放锁 + MessageLogic(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData) error // 处理订阅到的消息 + PanicCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData, e any) // 出现异常panic的回调 + SubscribeFailureCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, err error) // 订阅失败的回调 + UnmarshalFailureCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, msgData string, err error) // 数据解析失败的回调 + Async(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData) bool // 每一条数据可独立判断是否进行异步处理 + LockFailureCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData, err error) // 加锁失败的回调 + UnlockFailureCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData, err error) // 释放锁失败的回调 + MessageLogicFailureCallback(ctx context.Context, serverInfo *define.EventConsumerServerInfo, data *define.EventData, err error) // 消息数据处理失败的回调 +} diff --git a/abstract/IDelayQueue.go b/abstract/IDelayQueue.go new file mode 100644 index 0000000..52f967e --- /dev/null +++ b/abstract/IDelayQueue.go @@ -0,0 +1,22 @@ +// Package abstract ... +// +// Description : abstract ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2025-08-29 12:08 +package abstract + +import ( + "context" + + "git.zhangdeman.cn/zhangdeman/queue/define" + redisPkgDefine "git.zhangdeman.cn/zhangdeman/redis/define" +) + +// IDelayQueue 延迟队列接口约束 +type IDelayQueue interface { + Send(ctx context.Context, delayTime int64, data *define.EventData) *redisPkgDefine.RedisResult // 生产事件 + Distribute(ctx context.Context) // 时间到期分发 + Stop() // 停止延迟队列 +} diff --git a/abstract/IProducer.go b/abstract/IProducer.go new file mode 100644 index 0000000..9e71a66 --- /dev/null +++ b/abstract/IProducer.go @@ -0,0 +1,21 @@ +// Package abstract ... +// +// Description : abstract ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2025-08-22 16:33 +package abstract + +import ( + "context" + + "git.zhangdeman.cn/zhangdeman/queue/define" +) + +// IProducer 生产者接口约束 +type IProducer interface { + Sync(ctx context.Context, data *define.EventData) // 同步发送事件 + Async(ctx context.Context, data *define.EventData) // 异步发送事件 + GetProducerHandler() IProducerHandler // 获取生产者处理器 +} diff --git a/abstract/IProducerHandler.go b/abstract/IProducerHandler.go new file mode 100644 index 0000000..38b7ba0 --- /dev/null +++ b/abstract/IProducerHandler.go @@ -0,0 +1,46 @@ +// Package abstract ... +// +// Description : abstract ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2025-08-27 08:54 +package abstract + +import ( + "context" + + "git.zhangdeman.cn/zhangdeman/queue/define" +) + +// IProducerHandler 生产者数据处理的接口约束 +type IProducerHandler interface { + Lock(ctx context.Context, data *define.EventData) error // 逻辑加锁 + Unlock(ctx context.Context, data *define.EventData) error // 逻辑释放锁 + SuccessCallback(ctx context.Context, data *define.EventData, result *define.EventProduceResult) // 成功回调 + FailureCallback(ctx context.Context, data *define.EventData, result *define.EventProduceResult) // 失败回调 + PanicCallback(ctx context.Context, data *define.EventData, e any) // panic回调 +} + +type DefaultProducerHandler struct { +} + +func (d DefaultProducerHandler) Lock(ctx context.Context, data *define.EventData) error { + return nil +} + +func (d DefaultProducerHandler) Unlock(ctx context.Context, data *define.EventData) error { + return nil +} + +func (d DefaultProducerHandler) SuccessCallback(ctx context.Context, data *define.EventData, result *define.EventProduceResult) { + return +} + +func (d DefaultProducerHandler) FailureCallback(ctx context.Context, data *define.EventData, result *define.EventProduceResult) { + return +} + +func (d DefaultProducerHandler) PanicCallback(ctx context.Context, data *define.EventData, e any) { + return +} diff --git a/define/data.go b/define/data.go new file mode 100644 index 0000000..cb801c7 --- /dev/null +++ b/define/data.go @@ -0,0 +1,53 @@ +// Package define ... +// +// Description : define ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2025-08-22 16:38 +package define + +// EventData 事件数据 +// - Timestamp: 事件的时间, 毫秒时间戳, 不设置默认当前时间 +// - SystemTimestamp: 系统时间(无需设置,自动填充) +// - TraceID: 事件trace_id, 关联到触发事件的请求, 若不设置和 MsgID 保持一致 +// - Hostname: 发送事件的机器名称(无需设置,自动填充) +// - HostIp: 发送事件的机器IP(无需设置,自动填充) +// - Type: 事件类型, 基于不同的类型, 对data有不同的处理规则 +// - Version: 业务迭代, Data 的数据结构可能发生变化, 使用 version 标记 Data 如何解析, 使用者自行设置于理解 +// - Data: 事件业务数据, 使用者自行设置与理解 +// - Key: 用于哈希队列分片,不设置与MsgID一致 +// - MsgID: 系统随机生成, 每条消息的唯一标识 +type EventData struct { + Timestamp int64 `json:"timestamp"` // 事件的时间, 毫秒时间戳, 不设置默认当前时间 + SystemTimestamp int64 `json:"system_timestamp"` // 系统时间(无需设置,自动填充) + TraceID string `json:"trace_id"` // 事件trace_id, 关联到触发事件的请求, 若不设置和 MsgID 保持一致 + Hostname string `json:"hostname"` // 发送事件的机器名称(无需设置,自动填充) + HostIp string `json:"host_ip"` // 发送事件的机器IP(无需设置,自动填充) + Type string `json:"type"` // 事件类型, 基于不同的类型, 对data有不同的处理规则 + Data string `json:"data"` // 事件业务数据, 统一使用字符串 + Version string `json:"version"` // 业务迭代, Data 的数据结构可能发生变化, 使用 Version 标记 Data 如何解析, 使用者自行设置与理解 + Key string `json:"key"` // 用于哈希队列分片,不设置与MsgID一致 + MsgID string `json:"msg_id"` // 系统随机生成, 表示唯一一条消息 + Processor string `json:"processor"` // 处理器标识, 必须设置!!! +} + +// EventProduceResult 事件生产结果 +// - Queue: 事件消息实际发送到哪一个队列 +// - Success: 事件消息是否发送成功 +// - Err: 事件消息发送失败是的异常信息 +// - Cost: 事件消息发送的耗时 +type EventProduceResult struct { + Queue string `json:"queue"` // 事件发送在哪一个队列 + Success bool `json:"success"` // 事件发送结果 + Err error `json:"err"` // 发送失败时, 失败原因 + Cost int64 `json:"cost"` // 事件发送耗时 +} + +// EventConsumerServerInfo 事件消费的系统信息 +type EventConsumerServerInfo struct { + SystemTimestamp int64 `json:"system_timestamp"` // 系统时间 + Hostname string `json:"hostname"` // 消费事件的机器名称 + HostIp string `json:"host_ip"` // 消费事件的机器IP + Queue string `json:"queue"` // 从哪个队列消费到的数据 +} diff --git a/go.mod b/go.mod index 41b1547..31666e3 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module git.zhangdeman.cn/zhangdeman/queue go 1.24.0 require ( + git.zhangdeman.cn/zhangdeman/redis v0.0.0-20260413082618-2adbe2bcd3a5 git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20260112135254-c9ba29f9f674 git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20260321023345-6c6e467e3a14 github.com/redis/go-redis/v9 v9.18.0 @@ -18,6 +19,8 @@ require ( github.com/sbabiv/xml2map v1.2.1 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.1 // indirect gopkg.in/ini.v1 v1.67.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 6758bc8..fc6551a 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ git.zhangdeman.cn/zhangdeman/consts v0.0.0-20260413082525-fb90982b1256 h1:zYkRoH git.zhangdeman.cn/zhangdeman/consts v0.0.0-20260413082525-fb90982b1256/go.mod h1:5p8CEKGBxi7qPtTXDI3HDmqKAfIm5i/aBWdrbkbdNjc= git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20251013024601-da007da2fb42 h1:VjYrb4adud7FHeiYS9XA0B/tOaJjfRejzQAlwimrrDc= git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20251013024601-da007da2fb42/go.mod h1:VHb9qmhaPDAQDcS6vUiDCamYjZ4R5lD1XtVsh55KsMI= +git.zhangdeman.cn/zhangdeman/redis v0.0.0-20260413082618-2adbe2bcd3a5 h1:WjPlJ6CoXi2yFl4NGyyCWWpB2KbBP5LH2Hia6Xeecsc= +git.zhangdeman.cn/zhangdeman/redis v0.0.0-20260413082618-2adbe2bcd3a5/go.mod h1:y/0RBrC0CipiYJlBZdkvNRU4ROw9wZt+UqzZJBxC7Ng= git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20260112135254-c9ba29f9f674 h1:75JJ09HPqWi9qm7XD+vV6p5TaCMQgDsae/EbsLiE1t4= git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20260112135254-c9ba29f9f674/go.mod h1:EXrvDs830GzqhDNTR5TgKVbT3ADRgyUb2pFerwF4rLc= git.zhangdeman.cn/zhangdeman/util v0.0.0-20260105024213-3d76b1bcde5a h1:IGUsWz204BTQlD2l4kenlwJQS4Av2RS2kfUHZ5QVrmw= @@ -54,6 +56,12 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= +go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/ini.v1 v1.67.1 h1:tVBILHy0R6e4wkYOn3XmiITt/hEVH4TFMYvAX2Ytz6k=