消息能力升级 #1

Merged
zhangdeman merged 17 commits from feature/upgrade into master 2024-07-17 15:45:22 +08:00
20 changed files with 867 additions and 810 deletions

119
abstract/IEvent.go Normal file
View File

@ -0,0 +1,119 @@
// Package abstract ...
//
// Description : abstract ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2024-03-11 12:02
package abstract
import (
"context"
"git.zhangdeman.cn/zhangdeman/event/define"
)
// EventHandler 事件数据处理函数
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 17:08 2024/6/26
type EventHandler func(eventData *define.EventData) (map[string]any, error)
// SendFailCallback 发送事件失败的回调
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 17:31 2024/6/26
type SendFailCallback func(ctx context.Context, eventData *define.EventData, eventResult *define.SendResult, err error)
// SendSuccessCallback 发送事件成功的回调
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 17:32 2024/6/26
type SendSuccessCallback func(ctx context.Context, eventResult *define.SendResult)
// ConsumeFailCallbackHandler 时间处理成功回调
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 17:18 2024/6/26
type ConsumeFailCallbackHandler func(eventData *define.EventData, handleResult map[string]any, err error)
// ConsumeSuccessCallback 时间处理失败回调
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 17:20 2024/6/26
type ConsumeSuccessCallback func(eventData *define.EventData, handleResult map[string]any)
// PanicCallback panic回调, 根据不同异常类型, eventData / handleResult 均可能为nil
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 18:07 2024/6/26
type PanicCallback func(err any, eventData *define.EventData, handleResult map[string]any)
// EventParseFailCallback ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 18:48 2024/6/27
type EventParseFailCallback func(err error, eventData string)
// IEvent 事件接口定义
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 19:08 2023/8/14
type IEvent interface {
// SendEvent 发送事件(同步)
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 12:04 2024/3/11
SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error)
// SendEventAsync 发送事件(异步)
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 15:58 2024/6/25
SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback SendSuccessCallback, sendFailCallback SendFailCallback)
// GetConsumeEventChan 或去消息消费的channel, 自行实现消费
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 17:11 2024/6/26
GetConsumeEventChan() (<-chan *define.EventData, error)
// ConsumeEvent 消费事件
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 12:05 2024/3/11
ConsumeEvent(handler EventHandler, successCallback ConsumeSuccessCallback, failureCallback ConsumeFailCallbackHandler) error
// Destroy 事件实例销毁时, 执行的方法
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 12:05 2024/3/11
Destroy()
// DriverType 获取驱动类型
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 12:06 2024/3/11
DriverType() string
// SetPanicCallback 设置失败回调的处理函数
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 18:39 2024/6/27
SetPanicCallback(panicCallback PanicCallback)
// SetEventParseFailCallback 设置数据解析失败的处理函数
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 18:51 2024/6/27
SetEventParseFailCallback(parseFailCallbackCallback EventParseFailCallback)
}

View File

@ -1,62 +0,0 @@
// Package abstract ...
//
// Description : 事件发送前预处理接口约束
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2023-02-01 14:06
package abstract
import (
"net/http"
)
// IPreSendHandler ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:06 2023/2/1
type IPreSendHandler interface {
// GetRequestParam 构造 base info 时, 可能需要从请求参数中提取公共数据
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:13 2023/2/1
GetRequestParam() map[string]interface{}
// GetRequestHeader ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:14 2023/2/1
// 构造 base info 时, 可能需要从请求参数中提取公共数据
GetRequestHeader() http.Header
// GetResponseData 响应数据
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:15 2023/2/1
GetResponseData() map[string]interface{}
// GetExtensionData 获取扩展数据
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:19 2023/2/1
GetExtensionData() map[string]interface{}
// GetEventData 获取事件数据, 建议返回结构体或者结构体指针, 内部会自动补齐缺失的数据
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:16 2023/2/1
GetEventData() interface{}
// NeedSend 判断是否需要发送事件, 若不需要发送, 可在第一个返回值中记录不需要发送的原因
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 15:24 2023/2/2
NeedSend() (map[string]interface{}, bool)
}

View File

@ -1,47 +0,0 @@
// Package abstract ...
//
// Description : abstract ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2023-02-01 14:21
package abstract
import "git.zhangdeman.cn/zhangdeman/event"
// ISendEventHandler 发送事件处理器
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:21 2023/2/1
type ISendEventHandler interface {
// Send ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:21 2023/2/1
// 事件发送成功之后, 可以返回一些业务数据, 这些业务数据会回调给SuccessCallback
// 事件发送成功之后, 可以返回一些业务数据 以及 err, 这些业务数据会回调给FailCallback
Send(data []byte) (map[string]interface{}, error)
// SuccessCallback 事件发送成功的回调
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:21 2023/2/1
SuccessCallback(data map[string]interface{})
// FailCallback 事件发送失败的回调
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:22 2023/2/1
FailCallback(data map[string]interface{}, err error)
// NoSendCallback 不需要发送事件的回调
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 15:26 2023/2/2
NoSendCallback(data interface{}, res event.SendResult)
}

42
base.go Normal file
View File

@ -0,0 +1,42 @@
// Package event ...
//
// Description : event ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2024-07-17 12:31
package event
import (
"git.zhangdeman.cn/zhangdeman/event/abstract"
"git.zhangdeman.cn/zhangdeman/event/define"
)
type base struct {
panicCallback abstract.PanicCallback
parseFailCallback abstract.EventParseFailCallback
}
// SetPanicCallback 出现任何panic的回调
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 18:02 2024/6/26
func (b *base) SetPanicCallback(panicCallback abstract.PanicCallback) {
if nil == panicCallback {
panicCallback = define.DefaultPanicCallback
}
b.panicCallback = panicCallback
}
// SetEventParseFailCallback 设置事件解析失败回回调函数
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 18:42 2024/6/27
func (b *base) SetEventParseFailCallback(parseFailCallbackCallback abstract.EventParseFailCallback) {
if nil == parseFailCallbackCallback {
parseFailCallbackCallback = define.DefaultParseFailCallbackFunc
}
b.parseFailCallback = parseFailCallbackCallback
}

View File

@ -1,42 +0,0 @@
// Package event ...
//
// Description : event ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2023-02-02 16:36
package event
import "net/http"
// DefaultPreSendHandler IPreSendHandler 默认实现
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 16:36 2023/2/2
type DefaultPreSendHandler struct {
}
func (d *DefaultPreSendHandler) GetRequestParam() map[string]interface{} {
return map[string]interface{}{}
}
func (d *DefaultPreSendHandler) GetRequestHeader() http.Header {
return make(map[string][]string, 0)
}
func (d *DefaultPreSendHandler) GetResponseData() map[string]interface{} {
return map[string]interface{}{}
}
func (d *DefaultPreSendHandler) GetExtensionData() map[string]interface{} {
return map[string]interface{}{}
}
func (d *DefaultPreSendHandler) GetEventData() interface{} {
return map[string]interface{}{}
}
func (d *DefaultPreSendHandler) NeedSend() (map[string]interface{}, bool) {
return map[string]interface{}{}, true
}

View File

@ -1,27 +0,0 @@
// Package event ...
//
// Description : event ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2023-02-02 16:39
package event
type DefaultSendEventHandler struct {
}
func (d *DefaultSendEventHandler) Send(data []byte) (map[string]interface{}, error) {
return map[string]interface{}{}, nil
}
func (d *DefaultSendEventHandler) SuccessCallback(data map[string]interface{}) {
}
func (d *DefaultSendEventHandler) FailCallback(data map[string]interface{}, err error) {
}
func (d *DefaultSendEventHandler) NoSendCallback(data interface{}, res SendResult) {
}

109
define.go
View File

@ -1,109 +0,0 @@
// Package event ...
//
// Description : 各种常量定义
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2023-02-01 14:23
package event
import "reflect"
const (
// OutEventTag 事件数据输出key的标签
OutEventTag = "event"
// JsonTag json输出的标签
JsonTag = "json"
// IgnoreTagValue 不做输出的标签值
IgnoreTagValue = "-"
// MappingTag 参数映射标签
MappingTag = "mapping"
// PriorityTag 处理数据时, mapping数据查找的优先级, 默认值 : field
PriorityTag = "priority"
// OmitemptyTag ...
OmitemptyTag = "omitempty"
)
const (
// MappingLocationAll 自动探测所有路径
MappingLocationAll = "all"
// MappingLocationParam 从参数读取
MappingLocationParam = "param"
// MappingLocationHeader 从请求header读取
MappingLocationHeader = "header"
// MappingLocationResponse 从响应数据读取
MappingLocationResponse = "response"
// MappingLocationExtension 从扩展数据读取
MappingLocationExtension = "extension"
)
var (
mappingLocationList = []string{
MappingLocationHeader,
MappingLocationParam,
MappingLocationResponse,
MappingLocationExtension,
}
)
// SetMappingLocationList 只持续该默认的查找优先级
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 18:17 2023/2/1
func SetMappingLocationList(locationList []string) {
if len(locationList) > 0 {
mappingLocationList = locationList
}
}
// MappingRuleItem 规则
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 17:36 2023/2/1
type MappingRuleItem struct {
Location string `json:"location"` // 数据所在位置, header-请求头 param-参数获取 response-响应数据获取 extension-扩展数据读取 all-自动按照header/param/response/extension的顺序查询
Field string `json:"field"` // 查询的字段名称
}
// StructField 结构体字段信息
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:35 2023/2/1
type StructField struct {
Idx int // 字段在结构体的索引
Name string // 字段名
Type reflect.Kind // 字段类型
IsPtr bool // 是否为指针
JsonTag string // json标签
EventTag string // 事件标签
MappingRuleList []MappingRuleItem // 规则列表
}
// StructInfo 结构体信息
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 16:17 2023/2/1
type StructInfo struct {
Flag string // 结构体标识
IsStruct bool // 是否为结构体
IsStructPtr bool // 是否为结构体指针
StructFieldList []*StructField // 结构体字段列表
}
// SendResult ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 15:27 2023/2/2
type SendResult struct {
IsNeedSend bool `json:"is_need_send"` // 是否需要发送
NoSendDetail map[string]interface{} `json:"no_send_detail"` // 未发送详细原因
Data interface{} `json:"data"` // 发送的数据
IsSuccess bool `json:"is_success"` // 是否处理成功
FailReason string `json:"fail_reason"` // 失败原因
SendResult map[string]interface{} `json:"send_result"` // 发送结果
}

14
define/consts.go Normal file
View File

@ -0,0 +1,14 @@
// Package define ...
//
// Description : define ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2024-03-11 12:06
package define
const (
DriverTypeEtcd = "ETCD" // etcd驱动
DriverTypeKafka = "KAFKA" // kafka驱动
DriverTypeREDIS = "REDIS" // redis驱动
)

37
define/data.go Normal file
View File

@ -0,0 +1,37 @@
// Package define ...
//
// Description : define ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2024-03-11 11:40
package define
// EventData ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:41 2024/3/11
type EventData struct {
EventType string `json:"event_type"` // 事件类型
TraceID string `json:"trace_id"` // 事件追踪ID
Host string `json:"host"` // 触发事件host
Timestamp int64 `json:"timestamp"` // 触发时间,纳秒级时间戳
SystemTimestamp int64 `json:"system_timestamp"` // 发送时的系统时间
Key string `json:"key"` // 会基于当前值进行hash, 决定消息分区, 不指定则随机生成
Data any `json:"data"` // 发送的数据
}
// SendResult 发送结果
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 15:56 2024/6/25
type SendResult struct {
Data *EventData `json:"data"` // 发送的数据
Partition int `json:"partition_num"` // 分区索引编号
Topic string `json:"topic"` // 使用的真实topic
IsSuccess bool `json:"is_success"` // 是否发送成功
FailReason string `json:"fail_reason"` // 失败原因
Extension map[string]any `json:"extension"` // 扩展数据
}

59
define/handler.go Normal file
View File

@ -0,0 +1,59 @@
// Package define ...
//
// Description : define ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2024-06-26 17:21
package define
import "context"
// DefaultSuccessCallbackHandler ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 17:21 2024/6/26
func DefaultSuccessCallbackHandler(eventData *EventData, handleResult map[string]any) {
}
// DefaultFailCallbackHandler ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 18:11 2024/6/26
func DefaultFailCallbackHandler(eventData *EventData, handleResult map[string]any, err error) {
}
// DefaultPanicCallback ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 18:11 2024/6/26
func DefaultPanicCallback(err any, eventData *EventData, handleResult map[string]any) {}
// DefaultSendFailCallback ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:45 2024/6/27
func DefaultSendFailCallback(ctx context.Context, eventData *EventData, eventResult *SendResult, err error) {
}
// DefaultSendSuccessCallback ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:46 2024/6/27
func DefaultSendSuccessCallback(ctx context.Context, eventResult *SendResult) {}
// DefaultParseFailCallbackFunc ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 18:49 2024/6/27
func DefaultParseFailCallbackFunc(err error, eventData string) {
}

23
define/memory.go Normal file
View File

@ -0,0 +1,23 @@
// Package define ...
//
// Description : define ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2024-07-17 11:25
package define
const (
DefaultMemoryChannelSize = 1024 // 默认的消息channel大小
DefaultMemoryCloseMaxWaitTime = 5000 // 默认最大等待 : 5s
)
// MemoryEventConfig 内存事件的配置
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:26 2024/7/17
type MemoryEventConfig struct {
MessageBufferSize int `json:"message_buffer_size"` // 消息缓冲区大小
CloseMaxWaitTime int `json:"close_max_wait_time"` // 关闭消息实例, 最大等待时长, 单位 : ms
}

27
define/redis.go Normal file
View File

@ -0,0 +1,27 @@
// Package define ...
//
// Description : define ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2024-06-25 16:24
package define
const (
DefaultRedisPartitionNum = 1
DefaultRedisTopic = "EVENT_TOPIC_C6DBE0AAE846C5C0DE35802107326B1E"
DefaultRedisMessageBufferSize = 1024
DefaultRedisCloseMaxWaitTime = 5000 // 默认最大等待 : 5s
)
// RedisEventPubSubConfig redis事件配置
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 16:25 2024/6/25
type RedisEventPubSubConfig struct {
Topic string `json:"topic"` // topic key, 不指定随机生成
PartitionNum int `json:"partition_num"` // 多少个分区, 默认值 : 1
MessageBufferSize int `json:"message_buffer_size"` // 消息缓冲区大小
CloseMaxWaitTime int `json:"close_max_wait_time"` // 关闭消息实例, 最大等待时长, 单位 : ms
}

17
go.mod
View File

@ -5,20 +5,25 @@ go 1.21
toolchain go1.22.1 toolchain go1.22.1
require ( require (
git.zhangdeman.cn/zhangdeman/util v0.0.0-20231227095334-7eb5cdbf9253 git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240709134122-e1e2a2e421de
github.com/tidwall/gjson v1.17.1 git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd
git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240627031706-9ff1c213bb50
github.com/redis/go-redis/v9 v9.5.3
) )
require ( require (
github.com/Jeffail/gabs v1.4.0 // indirect git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211 // indirect
git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0 // indirect
git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e // indirect
github.com/BurntSushi/toml v1.4.0 // indirect
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-ini/ini v1.67.0 // indirect github.com/go-ini/ini v1.67.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mozillazg/go-pinyin v0.20.0 // indirect github.com/mozillazg/go-pinyin v0.20.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tidwall/gjson v1.17.1 // indirect
github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect

48
go.sum
View File

@ -1,45 +1,53 @@
git.zhangdeman.cn/zhangdeman/util v0.0.0-20230113095943-b4b3e261e0c4 h1:1WclY9P8l8o/NZ3ZR/mupm8LtowjQ/Q4UNGXR32f0OQ= git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38 h1:+o+BI5GGlwJelPLWL8ciDJqxw/G8dv+FU6OztGSnEjo=
git.zhangdeman.cn/zhangdeman/util v0.0.0-20230113095943-b4b3e261e0c4/go.mod h1:zTir/0IWdK3E7n0GiaogyWHADAQnBtTdl2I6Z2/OPqw= git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38/go.mod h1:IXXaZkb7vGzGnGM5RRWrASAuwrVSNxuoe0DmeXx5g6k=
git.zhangdeman.cn/zhangdeman/util v0.0.0-20231227095334-7eb5cdbf9253 h1:GO3oZa5a2sqwAzGcLDJtQzmshSWRmoP7IDS8bwFqvC4= git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240709134122-e1e2a2e421de h1:ksjcMHupU0Bw0BJxJp3dajmWqGdqV7k2eVohN5O3S9Q=
git.zhangdeman.cn/zhangdeman/util v0.0.0-20231227095334-7eb5cdbf9253/go.mod h1:VpPjBlwz8U+OxZuxzHQBv1aEEZ3pStH6bZvT21ADEbI= git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240709134122-e1e2a2e421de/go.mod h1:IXXaZkb7vGzGnGM5RRWrASAuwrVSNxuoe0DmeXx5g6k=
github.com/Jeffail/gabs v1.4.0 h1://5fYRRTq1edjfIrQGvdkcd22pkYUrHZ5YC/H2GJVAo= git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211 h1:I/wOsRpCSRkU9vo1u703slQsmK0wnNeZzsWQOGtIAG0=
github.com/Jeffail/gabs v1.4.0/go.mod h1:6xMvQMK4k33lb7GUUpaAPh6nKMmemQeg5d4gn7/bOXc= git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211/go.mod h1:SrtvrQRdzt+8KfYzvosH++gWxo2ShPTzR1m3VQ6uX7U=
git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0 h1:gUDlQMuJ4xNfP2Abl1Msmpa3fASLWYkNlqDFF/6GN0Y=
git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0/go.mod h1:VHb9qmhaPDAQDcS6vUiDCamYjZ4R5lD1XtVsh55KsMI=
git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd h1:2Y37waOVCmVvx0Rp8VGEptE2/2JVMImtxB4dKKDk/3w=
git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd/go.mod h1:6+7whkCmb4sJDIfH3HxNuXRveaM0gCCNWd2uXZqNtIE=
git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e h1:Q973S6CcWr1ICZhFI1STFOJ+KUImCl2BaIXm6YppBqI=
git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e/go.mod h1:VpPjBlwz8U+OxZuxzHQBv1aEEZ3pStH6bZvT21ADEbI=
git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240627031706-9ff1c213bb50 h1:olo34i2Gq5gX7bYPv5TR4X5l5CrYFtu9UCElkYlmL2c=
git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240627031706-9ff1c213bb50/go.mod h1:US/pcq2vstE3iyxIHf53w8IeXKkZys7bj/ozLWkRYeE=
github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0=
github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ= github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ=
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg= github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-ini/ini v1.66.6 h1:h6k2Bb0HWS/BXXHCXj4QHjxPmlIU4NK+7MuLp9SD+4k= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/go-ini/ini v1.66.6/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mozillazg/go-pinyin v0.20.0 h1:BtR3DsxpApHfKReaPO1fCqF4pThRwH9uwvXzm+GnMFQ= github.com/mozillazg/go-pinyin v0.20.0 h1:BtR3DsxpApHfKReaPO1fCqF4pThRwH9uwvXzm+GnMFQ=
github.com/mozillazg/go-pinyin v0.20.0/go.mod h1:iR4EnMMRXkfpFVV5FMi4FNB6wGq9NV6uDWbUuPhP4Yc= github.com/mozillazg/go-pinyin v0.20.0/go.mod h1:iR4EnMMRXkfpFVV5FMi4FNB6wGq9NV6uDWbUuPhP4Yc=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU=
github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/tidwall/gjson v1.14.1 h1:iymTbGkQBhveq21bEvAQ81I0LEBork8BFe1CUZXdyuo=
github.com/tidwall/gjson v1.14.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U= github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U=
github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

180
memory.go Normal file
View File

@ -0,0 +1,180 @@
// Package event ...
//
// Description : event ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2024-07-17 11:21
package event
import (
"context"
"errors"
"fmt"
"git.zhangdeman.cn/zhangdeman/consts"
"git.zhangdeman.cn/zhangdeman/event/abstract"
"git.zhangdeman.cn/zhangdeman/event/define"
)
var (
MemoryEventClient abstract.IEvent
)
// InitMemoryEvent 初始化事件实例
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:24 2024/7/17
func InitMemoryEvent(cfg *define.MemoryEventConfig) {
instance := &MemoryEvent{
base: &base{
panicCallback: define.DefaultPanicCallback,
parseFailCallback: define.DefaultParseFailCallbackFunc,
},
}
instance.Init(cfg)
MemoryEventClient = instance
}
// MemoryEvent 基于内存实现的消息队列(仅适用于单机)
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:22 2024/7/17
type MemoryEvent struct {
*base
cfg *define.MemoryEventConfig
messageChannel chan *define.EventData
}
// SendEvent 发送事件
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 12:28 2024/7/17
func (m *MemoryEvent) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) {
if nil == eventData {
return nil, errors.New("event data is nil")
}
m.messageChannel <- eventData
return &define.SendResult{
Data: eventData,
Partition: 0,
Topic: "",
IsSuccess: true,
FailReason: "success",
Extension: nil,
}, nil
}
// SendEventAsync 异步发送事件
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 12:29 2024/7/17
func (m *MemoryEvent) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) {
if nil == sendFailCallback {
sendFailCallback = define.DefaultSendFailCallback
}
if nil == sendSuccessCallback {
sendSuccessCallback = define.DefaultSendSuccessCallback
}
go func() {
var (
res *define.SendResult
err error
)
defer func() {
if r := recover(); nil != r {
m.panicCallback(err, eventData, map[string]any{
"send_result": res,
})
}
}()
res, err = m.SendEvent(ctx, eventData)
if nil != err {
sendFailCallback(ctx, eventData, res, err)
return
} else {
sendSuccessCallback(ctx, res)
}
}()
}
// GetConsumeEventChan 消费的消息chan
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 12:41 2024/7/17
func (m *MemoryEvent) GetConsumeEventChan() (<-chan *define.EventData, error) {
return m.messageChannel, nil
}
// ConsumeEvent 消费事件
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:14 2024/7/17
func (m *MemoryEvent) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error {
messageChan, _ := m.GetConsumeEventChan()
if nil == successCallback {
successCallback = define.DefaultSuccessCallbackHandler
}
if nil == failureCallback {
failureCallback = define.DefaultFailCallbackHandler
}
go func() {
defer func() {
if r := recover(); nil != r {
fmt.Println("出现异常", r)
}
}()
for eventData := range messageChan {
var (
err error
handleResult map[string]any
)
if handleResult, err = handler(eventData); nil != err {
failureCallback(eventData, handleResult, err)
} else {
successCallback(eventData, handleResult)
}
}
}()
return nil
}
// Destroy 实例销毁
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:17 2024/7/17
func (m *MemoryEvent) Destroy() {
return
}
func (m *MemoryEvent) DriverType() string {
return consts.EventDriverMemory
}
// Init 初始化
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 12:08 2024/7/17
func (m *MemoryEvent) Init(cfg *define.MemoryEventConfig) {
if nil == cfg {
cfg = &define.MemoryEventConfig{}
}
if cfg.MessageBufferSize <= 0 {
cfg.MessageBufferSize = define.DefaultMemoryChannelSize
}
if cfg.CloseMaxWaitTime <= 0 {
cfg.CloseMaxWaitTime = define.DefaultMemoryCloseMaxWaitTime
}
m.cfg = cfg
// 初始化内存 channel
m.messageChannel = make(chan *define.EventData, cfg.MessageBufferSize)
}

50
memory_test.go Normal file
View File

@ -0,0 +1,50 @@
// Package event ...
//
// Description : event ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2024-07-17 14:17
package event
import (
"context"
"encoding/json"
"fmt"
"git.zhangdeman.cn/zhangdeman/event/define"
"testing"
"time"
)
func TestInitMemoryEvent(t *testing.T) {
InitMemoryEvent(&define.MemoryEventConfig{
MessageBufferSize: 1024,
CloseMaxWaitTime: 5000,
})
go func() {
for {
time.Sleep(time.Second)
MemoryEventClient.SendEventAsync(context.Background(), &define.EventData{
EventType: "TEST",
TraceID: time.Now().Format("2006-01-02 15:04:05"),
Host: "",
Timestamp: time.Now().Unix(),
SystemTimestamp: 0,
Key: "",
Data: nil,
}, func(ctx context.Context, eventResult *define.SendResult) {
fmt.Println("消息发送成功")
}, func(ctx context.Context, eventData *define.EventData, eventResult *define.SendResult, err error) {
fmt.Println("消息发送失败")
})
}
}()
MemoryEventClient.ConsumeEvent(func(eventData *define.EventData) (map[string]any, error) {
byteData, _ := json.Marshal(eventData)
fmt.Println(string(byteData))
return map[string]any{}, nil
}, nil, nil)
for {
}
}

258
redis_pub_sub.go Normal file
View File

@ -0,0 +1,258 @@
// Package event ...
//
// Description : event ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2024-06-25 16:06
package event
import (
"context"
"errors"
"fmt"
"git.zhangdeman.cn/zhangdeman/consts"
"git.zhangdeman.cn/zhangdeman/event/abstract"
"git.zhangdeman.cn/zhangdeman/event/define"
"git.zhangdeman.cn/zhangdeman/serialize"
"git.zhangdeman.cn/zhangdeman/wrapper"
"github.com/redis/go-redis/v9"
"time"
)
var (
RedisEventPubSubClient abstract.IEvent
)
// InitRedisPubSubEvent 初始化redis事件驱动, 基于 pub / sub 指令
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 16:24 2024/6/25
func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) {
instance := &RedisEventPubSub{
base: &base{
panicCallback: define.DefaultPanicCallback,
parseFailCallback: define.DefaultParseFailCallbackFunc,
},
redisClient: redisClient,
pubSubConfig: pubSubConfig,
}
instance.SetRedisClient(redisClient, pubSubConfig)
instance.StartConsumer() // 启动消费者
RedisEventPubSubClient = instance
}
// RedisEventPubSub 基于redis的事件驱动
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 16:07 2024/6/25
type RedisEventPubSub struct {
*base
redisClient *redis.Client // redis客户端
pubSubConfig *define.RedisEventPubSubConfig // 事件配置
messageChan chan *define.EventData // 消息队列
stopConsumer chan bool // 停止消费者
isStop bool // 是否已停止
parseFailCallback abstract.EventParseFailCallback // 数据解析失败回调
}
// SendEvent 发布时间
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 10:58 2024/6/27
func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) {
partition := int(wrapper.String(eventData.Key).HashNumber().Value % uint64(r.pubSubConfig.PartitionNum))
realTopic := fmt.Sprintf("%v_%v", r.pubSubConfig.Topic, partition)
res := r.redisClient.Publish(ctx, realTopic, eventData)
if nil == res {
return nil, errors.New("can not get send event result")
}
return &define.SendResult{
Data: eventData,
Partition: partition,
Topic: realTopic,
IsSuccess: res.Err() == nil,
FailReason: wrapper.TernaryOperator.String(res.Err() == nil, "success", wrapper.String(res.Err().Error())).Value(),
Extension: nil,
}, res.Err()
}
// SendEventAsync 异步发送事件
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:29 2024/6/27
func (r *RedisEventPubSub) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) {
go func() {
if nil == sendFailCallback {
sendFailCallback = define.DefaultSendFailCallback
}
if nil == sendSuccessCallback {
sendSuccessCallback = define.DefaultSendSuccessCallback
}
var (
sendResult *define.SendResult
handlerErr error
)
defer func() {
if panicErr := recover(); nil != panicErr {
r.panicCallback(panicErr, eventData, map[string]any{
"send_result": sendResult,
})
}
}()
if sendResult, handlerErr = r.SendEvent(ctx, eventData); nil != handlerErr {
sendFailCallback(ctx, eventData, sendResult, handlerErr)
} else {
sendSuccessCallback(ctx, sendResult)
}
}()
}
// GetConsumeEventChan 获取消息channel, 自行实现消费
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 17:13 2024/6/26
func (r *RedisEventPubSub) GetConsumeEventChan() (<-chan *define.EventData, error) {
if r.isStop {
return nil, errors.New("event instance has stop")
}
return r.messageChan, nil
}
// ConsumeEvent 获取数据消费实例
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 17:06 2024/6/26
func (r *RedisEventPubSub) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error {
if nil == handler {
return errors.New("handler is nil")
}
var (
messageChan <-chan *define.EventData
err error
)
if messageChan, err = r.GetConsumeEventChan(); nil != err {
return err
}
if nil == failureCallback {
failureCallback = define.DefaultFailCallbackHandler
}
if nil == successCallback {
successCallback = define.DefaultSuccessCallbackHandler
}
for eventData := range messageChan {
handlerResult, handleErr := handler(eventData)
if nil != handleErr {
failureCallback(eventData, handlerResult, err)
} else {
successCallback(eventData, handlerResult)
}
}
return nil
}
// StartConsumer 启动消费者
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 18:36 2024/6/27
func (r *RedisEventPubSub) StartConsumer() {
for partition := 0; partition < r.pubSubConfig.PartitionNum; partition++ {
go func(realPartition int) {
defer func() {
if panicErr := recover(); nil != panicErr {
r.panicCallback(panicErr, nil, nil)
}
}()
// 启动消费者
realTopic := fmt.Sprintf("%v_%v", r.pubSubConfig.Topic, realPartition)
messageChannel := r.redisClient.Subscribe(context.Background(), realTopic).Channel()
for message := range messageChannel {
var (
eventData define.EventData
err error
)
if err = serialize.JSON.UnmarshalWithNumber([]byte(message.Payload), &eventData); nil != err {
// TODO : 事件数据解析失败的回调
continue
}
r.messageChan <- &eventData
}
}(partition)
}
}
// Destroy 销毁事件实例
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 15:42 2024/6/26
func (r *RedisEventPubSub) Destroy() {
if r.isStop {
// 已停止
return
}
r.stopConsumer <- true // 停止消费者
messageChan := make(chan bool, 1)
go func() {
for {
if len(r.messageChan) == 0 {
messageChan <- true
return
}
}
}()
select {
case <-time.After(time.Millisecond * time.Duration(r.pubSubConfig.CloseMaxWaitTime)):
// 定时器到期
return
case <-messageChan:
// 没有待消费数据
return
}
}
func (r *RedisEventPubSub) DriverType() string {
return consts.EventDriverRedis
}
// SetRedisClient 设置redis客户端
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 16:30 2024/6/25
func (r *RedisEventPubSub) SetRedisClient(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) {
if nil == pubSubConfig {
pubSubConfig = &define.RedisEventPubSubConfig{
Topic: "",
PartitionNum: 0,
MessageBufferSize: 0,
}
}
if len(pubSubConfig.Topic) == 0 {
pubSubConfig.Topic = define.DefaultRedisTopic
}
if pubSubConfig.PartitionNum <= 0 {
pubSubConfig.PartitionNum = define.DefaultRedisPartitionNum
}
if pubSubConfig.MessageBufferSize <= 0 {
r.pubSubConfig.MessageBufferSize = define.DefaultRedisMessageBufferSize
}
if pubSubConfig.CloseMaxWaitTime <= 0 {
r.pubSubConfig.CloseMaxWaitTime = define.DefaultRedisCloseMaxWaitTime
}
r.redisClient = redisClient
r.pubSubConfig = pubSubConfig
r.stopConsumer = make(chan bool, 1)
r.messageChan = make(chan *define.EventData, r.pubSubConfig.MessageBufferSize)
}

19
redis_pub_sub_test.go Normal file
View File

@ -0,0 +1,19 @@
// Package event ...
//
// Description : event ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2024-06-25 16:51
package event
import (
"fmt"
"git.zhangdeman.cn/zhangdeman/wrapper"
"strings"
"testing"
)
func TestInitRedisPubSubEvent(t *testing.T) {
fmt.Println(strings.ToUpper(wrapper.StringFromRandom(128, "").Md5().Value))
}

View File

@ -1,440 +0,0 @@
// Package event ...
//
// Description : event ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2023-02-01 14:30
package event
import (
"encoding/json"
"git.zhangdeman.cn/zhangdeman/event/abstract"
"git.zhangdeman.cn/zhangdeman/util"
"github.com/tidwall/gjson"
"net/http"
"reflect"
"strings"
"sync"
)
var (
// ReflectTypeInstance 反射实例
ReflectTypeInstance *ReflectType
ReflectValueInstance *ReflectValue
)
func init() {
ReflectTypeInstance = &ReflectType{
lock: &sync.RWMutex{},
cacheTable: make(map[string]*StructInfo),
}
ReflectValueInstance = &ReflectValue{}
}
// ReflectType 反射数据类型
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:31 2023/2/1
type ReflectType struct {
// 数据锁
lock *sync.RWMutex
// 反射结果缓存
cacheTable map[string]*StructInfo
}
// Do 反射获取数据类型
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:34 2023/2/1
//
// 为特定结构体生成全局唯一的标识, 并进行缓存, 加速反射结果获取
func (rt *ReflectType) Do(dataFlag string, data interface{}) *StructInfo {
rt.lock.Lock()
defer rt.lock.Unlock()
if cacheResult, exist := rt.cacheTable[dataFlag]; exist {
// 缓存存在, 直接是有缓存结果
return cacheResult
}
// 缓存不存在, 解析
res := &StructInfo{
Flag: dataFlag,
IsStruct: false,
IsStructPtr: false,
StructFieldList: make([]*StructField, 0),
}
isPtr := false
reflectType := reflect.TypeOf(data)
if reflectType.Kind() == reflect.Ptr {
isPtr = true
reflectType = reflectType.Elem()
}
if reflectType.Kind() != reflect.Struct {
// 非结构体,无需反射
rt.cacheTable[dataFlag] = res
return res
}
res.IsStruct = true
res.IsStructPtr = isPtr
for idx := 0; idx < reflectType.NumField(); idx++ {
field := &StructField{
Idx: idx,
Name: reflectType.Field(idx).Name,
JsonTag: reflectType.Field(idx).Tag.Get(JsonTag),
EventTag: reflectType.Field(idx).Tag.Get(OutEventTag),
MappingRuleList: make([]MappingRuleItem, 0),
}
rt.fillFieldType(field, reflectType.Field(idx).Type)
rt.fillMappingRule(field, reflectType.Field(idx).Tag.Get(MappingTag))
res.StructFieldList = append(res.StructFieldList, field)
}
rt.cacheTable[dataFlag] = res
return res
}
// fillFieldType 填充字段类型
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 20:46 2023/2/1
func (rt *ReflectType) fillFieldType(field *StructField, dataType reflect.Type) {
switch dataType.Kind() {
case reflect.Float32:
fallthrough
case reflect.Float64:
field.Type = reflect.Float64
case reflect.String:
field.Type = reflect.String
case reflect.Struct:
field.Type = reflect.Struct
case reflect.Slice:
field.Type = reflect.Slice
case reflect.Map:
field.Type = reflect.Map
case reflect.Ptr:
field.IsPtr = true
// 指针再次判断基础类型
field.Type = dataType.Elem().Kind()
default:
if strings.Contains(dataType.String(), "int") {
field.Type = reflect.Int64
} else {
field.Type = reflect.Interface
}
}
}
// fillTagInfo 填充标签信息
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 17:11 2023/2/1
func (rt *ReflectType) fillTagInfo(field *StructField) {
if len(field.JsonTag) == 0 {
field.JsonTag = field.Name
}
// jsonTag 去掉 omitempty
jsonTagValArr := strings.Split(field.JsonTag, ",")
for _, item := range jsonTagValArr {
if len(item) > 0 && item != OmitemptyTag {
field.JsonTag = item
break
}
}
// 没有设置event tag,则和 json tag保持一致
if len(field.EventTag) == 0 {
field.EventTag = field.JsonTag
}
}
// fillMappingRule 解析参数映射规则
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 17:14 2023/2/1
//
// mapping:"user_id:param#user_id|header#id"
func (rt *ReflectType) fillMappingRule(field *StructField, inputMappingVal string) {
if len(inputMappingVal) == 0 {
// 没有指定规则, 有默认规则
for _, location := range mappingLocationList {
field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{
Location: location,
Field: field.JsonTag,
})
}
return
}
mappingArr := strings.Split(inputMappingVal, ",")
for _, item := range mappingArr {
item = strings.TrimSpace(item)
// 要赋值的字段名
itemArr := strings.Split(item, ":")
if len(itemArr) != 2 {
// 配置格式错误, 跳过
continue
}
mapRuleArr := strings.Split(strings.TrimSpace(itemArr[1]), "|")
for _, itemMapRule := range mapRuleArr {
itemMapRule = strings.TrimLeft(itemMapRule, "#")
itemMapRuleArr := strings.Split(itemMapRule, "#")
// 注意 : # 为特殊分隔符, 如配置成 mapping:"project_id:#source_project_id#xxx_project_id" 实际等价于 mapping:"project_id:#source_project_id" 多余配置自动跳过
if len(itemMapRuleArr[0]) < 2 {
// 没有指定位置默认all, 即配置格式: mapping:"project_id:#source_project_id"
itemMapRuleArr[0] = MappingLocationAll
itemMapRuleArr = []string{MappingLocationAll, itemMapRuleArr[0]}
}
switch itemMapRuleArr[0] {
// 从header读取
case MappingLocationHeader:
field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{
Location: MappingLocationHeader,
Field: itemMapRuleArr[1],
})
// 从请求参数读取
case MappingLocationParam:
field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{
Location: MappingLocationParam,
Field: itemMapRuleArr[1],
})
// 从响应数据读取
case MappingLocationResponse:
field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{
Location: MappingLocationResponse,
Field: itemMapRuleArr[1],
})
// 从扩展数据读取
case MappingLocationExtension:
field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{
Location: MappingLocationExtension,
Field: itemMapRuleArr[1],
})
// 全部读取一遍
case MappingLocationAll:
fallthrough
default:
for _, itemLocation := range mappingLocationList {
field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{
Location: itemLocation,
Field: itemMapRuleArr[1],
})
}
}
}
}
}
// ReflectValue 反射值的实例
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 10:53 2023/2/2
type ReflectValue struct {
}
// Do 通过反射机制对data进行数据填充此逻辑要求 data 必须是结构体或者结构体指针
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:52 2023/2/2
func (rv *ReflectValue) Do(dataFlag string, data interface{}, preSendHandler abstract.IPreSendHandler) {
structInfo := ReflectTypeInstance.Do(dataFlag, data)
if !structInfo.IsStruct {
return
}
reflectValue := reflect.ValueOf(data)
if structInfo.IsStructPtr {
reflectValue = reflectValue.Elem()
}
for _, fieldInfo := range structInfo.StructFieldList {
if !rv.isZeroInputFieldValue(reflectValue, fieldInfo) {
// 不是零值, 无需处理
continue
}
// 是零值, 填充默认值
}
}
// isZeroInputFieldValue 判断对应的字段是否为对应类型默认的零值
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 12:04 2023/2/2
func (rv *ReflectValue) isZeroInputFieldValue(reflectValue reflect.Value, fieldInfo *StructField) bool {
inputVal := reflectValue.Field(fieldInfo.Idx).Interface()
switch fieldInfo.Type {
case reflect.Float64:
var f float64
if err := util.ConvertAssign(&f, inputVal); nil == err {
if f != 0 {
return false
}
return true
}
case reflect.String:
var s string
if err := util.ConvertAssign(&s, inputVal); nil == err {
if len(s) > 0 {
return false
}
return true
}
case reflect.Int64:
var i int64
if err := util.ConvertAssign(&i, inputVal); nil == err {
if i != 0 {
return false
}
return true
}
case reflect.Map:
if nil == inputVal {
return true
}
var m map[interface{}]interface{}
if err := util.ConvertAssign(&m, inputVal); nil == err {
if len(m) != 0 {
return false
}
return true
}
case reflect.Slice:
if nil == inputVal {
return true
}
var sl []interface{}
if err := util.ConvertAssign(&sl, inputVal); nil == err {
if len(sl) != 0 {
return false
}
return true
}
case reflect.Interface:
if inputVal == nil {
return true
}
return false
default:
// 默认不处理
return false
}
return false
}
// fillFieldValue 填充字段值
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:00 2023/2/2
func (rv *ReflectValue) fillFieldValue(reflectValue reflect.Value, fieldInfo *StructField, preSendHandler abstract.IPreSendHandler) {
paramByte, _ := json.Marshal(preSendHandler.GetRequestParam())
header := preSendHandler.GetRequestHeader()
responseByte, _ := json.Marshal(preSendHandler.GetResponseData())
extensionByte, _ := json.Marshal(preSendHandler.GetExtensionData())
for _, item := range fieldInfo.MappingRuleList {
if rv.setDataValue(reflectValue, fieldInfo, item, header, paramByte, responseByte, extensionByte) {
// 找到数据,并且赋值成功,结束继续查找
break
}
}
}
// getInputValue 获取输入的值
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:24 2023/2/2
func (rv *ReflectValue) setDataValue(reflectVal reflect.Value, fieldInfo *StructField, rule MappingRuleItem, header http.Header, paramByte, responseByte, extensionByte []byte) bool {
inputVal := rv.getInputValue(rule, header, paramByte, responseByte, extensionByte)
if len(inputVal) == 0 {
return false
}
switch fieldInfo.Type {
case reflect.String:
reflectVal.Field(fieldInfo.Idx).SetString(inputVal)
return true
case reflect.Int64:
var i int64
if err := util.ConvertAssign(&i, inputVal); nil != err {
return false
}
if i == 0 {
return false
}
if fieldInfo.IsPtr {
reflectVal.Field(fieldInfo.Idx).Set(reflect.ValueOf(&i))
} else {
reflectVal.Field(fieldInfo.Idx).SetInt(i)
}
return true
case reflect.Float64:
var f float64
if err := util.ConvertAssign(&f, inputVal); nil != err {
return false
}
if f == 0 {
return false
}
if fieldInfo.IsPtr {
reflectVal.Field(fieldInfo.Idx).Set(reflect.ValueOf(&f))
} else {
reflectVal.Field(fieldInfo.Idx).SetFloat(f)
}
return true
case reflect.Interface:
reflectVal.Field(fieldInfo.Idx).Set(reflect.ValueOf(&inputVal))
return true
case reflect.Slice:
fallthrough
case reflect.Map:
fallthrough
case reflect.Struct:
var v interface{}
if err := json.Unmarshal([]byte(inputVal), &v); nil != err {
return false
}
reflectVal.Field(fieldInfo.Idx).Set(reflect.ValueOf(&v))
return true
default:
return false
}
}
// getInputValue 获取输入的值
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:43 2023/2/2
func (rv *ReflectValue) getInputValue(rule MappingRuleItem, header http.Header, paramByte, responseByte, extensionByte []byte) string {
switch rule.Location {
case MappingLocationHeader:
return header.Get(rule.Field)
case MappingLocationParam:
return gjson.GetBytes(paramByte, rule.Field).String()
case MappingLocationResponse:
return gjson.GetBytes(responseByte, rule.Field).String()
case MappingLocationExtension:
return gjson.GetBytes(extensionByte, rule.Field).String()
case MappingLocationAll:
str := header.Get(rule.Field)
if len(str) == 0 {
str = gjson.GetBytes(paramByte, rule.Field).String()
}
if len(str) == 0 {
str = gjson.GetBytes(responseByte, rule.Field).String()
}
if len(str) == 0 {
str = gjson.GetBytes(extensionByte, rule.Field).String()
}
return str
default:
return ""
}
}

57
send.go
View File

@ -1,57 +0,0 @@
// Package event ...
//
// Description : event ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2023-02-02 15:15
package event
import (
"encoding/json"
"git.zhangdeman.cn/zhangdeman/event/abstract"
)
// SendEvent 发送事件
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 15:16 2023/2/2
func SendEvent(dataFlag string, data interface{}, preSendHandler abstract.IPreSendHandler, sendEventHandler abstract.ISendEventHandler) SendResult {
if nil == preSendHandler {
preSendHandler = &DefaultPreSendHandler{}
}
if nil == sendEventHandler {
sendEventHandler = &DefaultSendEventHandler{}
}
res := SendResult{}
if res.NoSendDetail, res.IsNeedSend = preSendHandler.NeedSend(); !res.IsNeedSend {
sendEventHandler.NoSendCallback(data, res)
return res
}
// 通过反射填充数据
ReflectValueInstance.Do(dataFlag, data, preSendHandler)
res.Data = data
var (
byteData []byte
err error
)
if byteData, err = json.Marshal(data); nil != err {
res.IsSuccess = false
res.FailReason = err.Error()
sendEventHandler.FailCallback(map[string]interface{}{
"err": err.Error(),
"reason": "data marshal fail",
}, err)
return res
}
if res.SendResult, err = sendEventHandler.Send(byteData); nil != err {
sendEventHandler.FailCallback(res.SendResult, err)
return res
}
sendEventHandler.SuccessCallback(res.SendResult)
return res
}