diff --git a/abstract/IEvent.go b/abstract/IEvent.go new file mode 100644 index 0000000..12cd6a6 --- /dev/null +++ b/abstract/IEvent.go @@ -0,0 +1,119 @@ +// Package abstract ... +// +// Description : abstract ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-03-11 12:02 +package abstract + +import ( + "context" + "git.zhangdeman.cn/zhangdeman/event/define" +) + +// EventHandler 事件数据处理函数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:08 2024/6/26 +type EventHandler func(eventData *define.EventData) (map[string]any, error) + +// SendFailCallback 发送事件失败的回调 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:31 2024/6/26 +type SendFailCallback func(ctx context.Context, eventData *define.EventData, eventResult *define.SendResult, err error) + +// SendSuccessCallback 发送事件成功的回调 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:32 2024/6/26 +type SendSuccessCallback func(ctx context.Context, eventResult *define.SendResult) + +// ConsumeFailCallbackHandler 时间处理成功回调 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:18 2024/6/26 +type ConsumeFailCallbackHandler func(eventData *define.EventData, handleResult map[string]any, err error) + +// ConsumeSuccessCallback 时间处理失败回调 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:20 2024/6/26 +type ConsumeSuccessCallback func(eventData *define.EventData, handleResult map[string]any) + +// PanicCallback panic回调, 根据不同异常类型, eventData / handleResult 均可能为nil +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:07 2024/6/26 +type PanicCallback func(err any, eventData *define.EventData, handleResult map[string]any) + +// EventParseFailCallback ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:48 2024/6/27 +type EventParseFailCallback func(err error, eventData string) + +// IEvent 事件接口定义 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 19:08 2023/8/14 +type IEvent interface { + // SendEvent 发送事件(同步) + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 12:04 2024/3/11 + SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) + + // SendEventAsync 发送事件(异步) + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 15:58 2024/6/25 + SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback SendSuccessCallback, sendFailCallback SendFailCallback) + // GetConsumeEventChan 或去消息消费的channel, 自行实现消费 + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 17:11 2024/6/26 + GetConsumeEventChan() (<-chan *define.EventData, error) + // ConsumeEvent 消费事件 + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 12:05 2024/3/11 + ConsumeEvent(handler EventHandler, successCallback ConsumeSuccessCallback, failureCallback ConsumeFailCallbackHandler) error + // Destroy 事件实例销毁时, 执行的方法 + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 12:05 2024/3/11 + Destroy() + // DriverType 获取驱动类型 + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 12:06 2024/3/11 + DriverType() string + // SetPanicCallback 设置失败回调的处理函数 + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 18:39 2024/6/27 + SetPanicCallback(panicCallback PanicCallback) + // SetEventParseFailCallback 设置数据解析失败的处理函数 + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 18:51 2024/6/27 + SetEventParseFailCallback(parseFailCallbackCallback EventParseFailCallback) +} diff --git a/abstract/pre_send_event_handler.go b/abstract/pre_send_event_handler.go deleted file mode 100644 index 9883a4d..0000000 --- a/abstract/pre_send_event_handler.go +++ /dev/null @@ -1,62 +0,0 @@ -// Package abstract ... -// -// Description : 事件发送前预处理接口约束 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 2023-02-01 14:06 -package abstract - -import ( - "net/http" -) - -// IPreSendHandler ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 14:06 2023/2/1 -type IPreSendHandler interface { - // GetRequestParam 构造 base info 时, 可能需要从请求参数中提取公共数据 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 14:13 2023/2/1 - GetRequestParam() map[string]interface{} - - // GetRequestHeader ... - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 14:14 2023/2/1 - // 构造 base info 时, 可能需要从请求参数中提取公共数据 - GetRequestHeader() http.Header - - // GetResponseData 响应数据 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 14:15 2023/2/1 - GetResponseData() map[string]interface{} - - // GetExtensionData 获取扩展数据 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 14:19 2023/2/1 - GetExtensionData() map[string]interface{} - - // GetEventData 获取事件数据, 建议返回结构体或者结构体指针, 内部会自动补齐缺失的数据 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 14:16 2023/2/1 - GetEventData() interface{} - - // NeedSend 判断是否需要发送事件, 若不需要发送, 可在第一个返回值中记录不需要发送的原因 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 15:24 2023/2/2 - NeedSend() (map[string]interface{}, bool) -} diff --git a/abstract/send_event_handler.go b/abstract/send_event_handler.go deleted file mode 100644 index 5b9de5b..0000000 --- a/abstract/send_event_handler.go +++ /dev/null @@ -1,47 +0,0 @@ -// Package abstract ... -// -// Description : abstract ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 2023-02-01 14:21 -package abstract - -import "git.zhangdeman.cn/zhangdeman/event" - -// ISendEventHandler 发送事件处理器 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 14:21 2023/2/1 -type ISendEventHandler interface { - // Send ... - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 14:21 2023/2/1 - // 事件发送成功之后, 可以返回一些业务数据, 这些业务数据会回调给SuccessCallback - // 事件发送成功之后, 可以返回一些业务数据 以及 err, 这些业务数据会回调给FailCallback - Send(data []byte) (map[string]interface{}, error) - - // SuccessCallback 事件发送成功的回调 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 14:21 2023/2/1 - SuccessCallback(data map[string]interface{}) - - // FailCallback 事件发送失败的回调 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 14:22 2023/2/1 - FailCallback(data map[string]interface{}, err error) - - // NoSendCallback 不需要发送事件的回调 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 15:26 2023/2/2 - NoSendCallback(data interface{}, res event.SendResult) -} diff --git a/base.go b/base.go new file mode 100644 index 0000000..99e5470 --- /dev/null +++ b/base.go @@ -0,0 +1,42 @@ +// Package event ... +// +// Description : event ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-07-17 12:31 +package event + +import ( + "git.zhangdeman.cn/zhangdeman/event/abstract" + "git.zhangdeman.cn/zhangdeman/event/define" +) + +type base struct { + panicCallback abstract.PanicCallback + parseFailCallback abstract.EventParseFailCallback +} + +// SetPanicCallback 出现任何panic的回调 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:02 2024/6/26 +func (b *base) SetPanicCallback(panicCallback abstract.PanicCallback) { + if nil == panicCallback { + panicCallback = define.DefaultPanicCallback + } + b.panicCallback = panicCallback +} + +// SetEventParseFailCallback 设置事件解析失败回回调函数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:42 2024/6/27 +func (b *base) SetEventParseFailCallback(parseFailCallbackCallback abstract.EventParseFailCallback) { + if nil == parseFailCallbackCallback { + parseFailCallbackCallback = define.DefaultParseFailCallbackFunc + } + b.parseFailCallback = parseFailCallbackCallback +} diff --git a/default_pre_send_handler.go b/default_pre_send_handler.go deleted file mode 100644 index 11d93b2..0000000 --- a/default_pre_send_handler.go +++ /dev/null @@ -1,42 +0,0 @@ -// Package event ... -// -// Description : event ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 2023-02-02 16:36 -package event - -import "net/http" - -// DefaultPreSendHandler IPreSendHandler 默认实现 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 16:36 2023/2/2 -type DefaultPreSendHandler struct { -} - -func (d *DefaultPreSendHandler) GetRequestParam() map[string]interface{} { - return map[string]interface{}{} -} - -func (d *DefaultPreSendHandler) GetRequestHeader() http.Header { - return make(map[string][]string, 0) -} - -func (d *DefaultPreSendHandler) GetResponseData() map[string]interface{} { - return map[string]interface{}{} -} - -func (d *DefaultPreSendHandler) GetExtensionData() map[string]interface{} { - return map[string]interface{}{} -} - -func (d *DefaultPreSendHandler) GetEventData() interface{} { - return map[string]interface{}{} -} - -func (d *DefaultPreSendHandler) NeedSend() (map[string]interface{}, bool) { - return map[string]interface{}{}, true -} diff --git a/default_send_event_handler.go b/default_send_event_handler.go deleted file mode 100644 index 466644c..0000000 --- a/default_send_event_handler.go +++ /dev/null @@ -1,27 +0,0 @@ -// Package event ... -// -// Description : event ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 2023-02-02 16:39 -package event - -type DefaultSendEventHandler struct { -} - -func (d *DefaultSendEventHandler) Send(data []byte) (map[string]interface{}, error) { - return map[string]interface{}{}, nil -} - -func (d *DefaultSendEventHandler) SuccessCallback(data map[string]interface{}) { - -} - -func (d *DefaultSendEventHandler) FailCallback(data map[string]interface{}, err error) { - -} - -func (d *DefaultSendEventHandler) NoSendCallback(data interface{}, res SendResult) { - -} diff --git a/define.go b/define.go deleted file mode 100644 index 0c083a8..0000000 --- a/define.go +++ /dev/null @@ -1,109 +0,0 @@ -// Package event ... -// -// Description : 各种常量定义 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 2023-02-01 14:23 -package event - -import "reflect" - -const ( - // OutEventTag 事件数据输出key的标签 - OutEventTag = "event" - // JsonTag json输出的标签 - JsonTag = "json" - // IgnoreTagValue 不做输出的标签值 - IgnoreTagValue = "-" - // MappingTag 参数映射标签 - MappingTag = "mapping" - // PriorityTag 处理数据时, mapping数据查找的优先级, 默认值 : field - PriorityTag = "priority" - // OmitemptyTag ... - OmitemptyTag = "omitempty" -) - -const ( - // MappingLocationAll 自动探测所有路径 - MappingLocationAll = "all" - // MappingLocationParam 从参数读取 - MappingLocationParam = "param" - // MappingLocationHeader 从请求header读取 - MappingLocationHeader = "header" - // MappingLocationResponse 从响应数据读取 - MappingLocationResponse = "response" - // MappingLocationExtension 从扩展数据读取 - MappingLocationExtension = "extension" -) - -var ( - mappingLocationList = []string{ - MappingLocationHeader, - MappingLocationParam, - MappingLocationResponse, - MappingLocationExtension, - } -) - -// SetMappingLocationList 只持续该默认的查找优先级 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 18:17 2023/2/1 -func SetMappingLocationList(locationList []string) { - if len(locationList) > 0 { - mappingLocationList = locationList - } -} - -// MappingRuleItem 规则 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 17:36 2023/2/1 -type MappingRuleItem struct { - Location string `json:"location"` // 数据所在位置, header-请求头 param-参数获取 response-响应数据获取 extension-扩展数据读取 all-自动按照header/param/response/extension的顺序查询 - Field string `json:"field"` // 查询的字段名称 -} - -// StructField 结构体字段信息 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 14:35 2023/2/1 -type StructField struct { - Idx int // 字段在结构体的索引 - Name string // 字段名 - Type reflect.Kind // 字段类型 - IsPtr bool // 是否为指针 - JsonTag string // json标签 - EventTag string // 事件标签 - MappingRuleList []MappingRuleItem // 规则列表 -} - -// StructInfo 结构体信息 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 16:17 2023/2/1 -type StructInfo struct { - Flag string // 结构体标识 - IsStruct bool // 是否为结构体 - IsStructPtr bool // 是否为结构体指针 - StructFieldList []*StructField // 结构体字段列表 -} - -// SendResult ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 15:27 2023/2/2 -type SendResult struct { - IsNeedSend bool `json:"is_need_send"` // 是否需要发送 - NoSendDetail map[string]interface{} `json:"no_send_detail"` // 未发送详细原因 - Data interface{} `json:"data"` // 发送的数据 - IsSuccess bool `json:"is_success"` // 是否处理成功 - FailReason string `json:"fail_reason"` // 失败原因 - SendResult map[string]interface{} `json:"send_result"` // 发送结果 -} diff --git a/define/consts.go b/define/consts.go new file mode 100644 index 0000000..d22c369 --- /dev/null +++ b/define/consts.go @@ -0,0 +1,14 @@ +// Package define ... +// +// Description : define ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-03-11 12:06 +package define + +const ( + DriverTypeEtcd = "ETCD" // etcd驱动 + DriverTypeKafka = "KAFKA" // kafka驱动 + DriverTypeREDIS = "REDIS" // redis驱动 +) diff --git a/define/data.go b/define/data.go new file mode 100644 index 0000000..09d2427 --- /dev/null +++ b/define/data.go @@ -0,0 +1,37 @@ +// Package define ... +// +// Description : define ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-03-11 11:40 +package define + +// EventData ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:41 2024/3/11 +type EventData struct { + EventType string `json:"event_type"` // 事件类型 + TraceID string `json:"trace_id"` // 事件追踪ID + Host string `json:"host"` // 触发事件host + Timestamp int64 `json:"timestamp"` // 触发时间,纳秒级时间戳 + SystemTimestamp int64 `json:"system_timestamp"` // 发送时的系统时间 + Key string `json:"key"` // 会基于当前值进行hash, 决定消息分区, 不指定则随机生成 + Data any `json:"data"` // 发送的数据 +} + +// SendResult 发送结果 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 15:56 2024/6/25 +type SendResult struct { + Data *EventData `json:"data"` // 发送的数据 + Partition int `json:"partition_num"` // 分区索引编号 + Topic string `json:"topic"` // 使用的真实topic + IsSuccess bool `json:"is_success"` // 是否发送成功 + FailReason string `json:"fail_reason"` // 失败原因 + Extension map[string]any `json:"extension"` // 扩展数据 +} diff --git a/define/handler.go b/define/handler.go new file mode 100644 index 0000000..305660f --- /dev/null +++ b/define/handler.go @@ -0,0 +1,59 @@ +// Package define ... +// +// Description : define ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-06-26 17:21 +package define + +import "context" + +// DefaultSuccessCallbackHandler ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:21 2024/6/26 +func DefaultSuccessCallbackHandler(eventData *EventData, handleResult map[string]any) { + +} + +// DefaultFailCallbackHandler ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:11 2024/6/26 +func DefaultFailCallbackHandler(eventData *EventData, handleResult map[string]any, err error) { + +} + +// DefaultPanicCallback ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:11 2024/6/26 +func DefaultPanicCallback(err any, eventData *EventData, handleResult map[string]any) {} + +// DefaultSendFailCallback ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:45 2024/6/27 +func DefaultSendFailCallback(ctx context.Context, eventData *EventData, eventResult *SendResult, err error) { +} + +// DefaultSendSuccessCallback ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:46 2024/6/27 +func DefaultSendSuccessCallback(ctx context.Context, eventResult *SendResult) {} + +// DefaultParseFailCallbackFunc ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:49 2024/6/27 +func DefaultParseFailCallbackFunc(err error, eventData string) { + +} diff --git a/define/memory.go b/define/memory.go new file mode 100644 index 0000000..a5a5d04 --- /dev/null +++ b/define/memory.go @@ -0,0 +1,23 @@ +// Package define ... +// +// Description : define ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-07-17 11:25 +package define + +const ( + DefaultMemoryChannelSize = 1024 // 默认的消息channel大小 + DefaultMemoryCloseMaxWaitTime = 5000 // 默认最大等待 : 5s +) + +// MemoryEventConfig 内存事件的配置 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:26 2024/7/17 +type MemoryEventConfig struct { + MessageBufferSize int `json:"message_buffer_size"` // 消息缓冲区大小 + CloseMaxWaitTime int `json:"close_max_wait_time"` // 关闭消息实例, 最大等待时长, 单位 : ms +} diff --git a/define/redis.go b/define/redis.go new file mode 100644 index 0000000..60dfcc9 --- /dev/null +++ b/define/redis.go @@ -0,0 +1,27 @@ +// Package define ... +// +// Description : define ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-06-25 16:24 +package define + +const ( + DefaultRedisPartitionNum = 1 + DefaultRedisTopic = "EVENT_TOPIC_C6DBE0AAE846C5C0DE35802107326B1E" + DefaultRedisMessageBufferSize = 1024 + DefaultRedisCloseMaxWaitTime = 5000 // 默认最大等待 : 5s +) + +// RedisEventPubSubConfig redis事件配置 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 16:25 2024/6/25 +type RedisEventPubSubConfig struct { + Topic string `json:"topic"` // topic key, 不指定随机生成 + PartitionNum int `json:"partition_num"` // 多少个分区, 默认值 : 1 + MessageBufferSize int `json:"message_buffer_size"` // 消息缓冲区大小 + CloseMaxWaitTime int `json:"close_max_wait_time"` // 关闭消息实例, 最大等待时长, 单位 : ms +} diff --git a/go.mod b/go.mod index 85f3283..ac2d75d 100644 --- a/go.mod +++ b/go.mod @@ -5,20 +5,25 @@ go 1.21 toolchain go1.22.1 require ( - git.zhangdeman.cn/zhangdeman/util v0.0.0-20231227095334-7eb5cdbf9253 - github.com/tidwall/gjson v1.17.1 + git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240709134122-e1e2a2e421de + git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd + git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240627031706-9ff1c213bb50 + github.com/redis/go-redis/v9 v9.5.3 ) require ( - github.com/Jeffail/gabs v1.4.0 // indirect + git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211 // indirect + git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0 // indirect + git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e // indirect + github.com/BurntSushi/toml v1.4.0 // indirect github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/go-ini/ini v1.67.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mozillazg/go-pinyin v0.20.0 // indirect - github.com/pkg/errors v0.9.1 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/tidwall/gjson v1.17.1 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index c9259d3..ceba8db 100644 --- a/go.sum +++ b/go.sum @@ -1,45 +1,53 @@ -git.zhangdeman.cn/zhangdeman/util v0.0.0-20230113095943-b4b3e261e0c4 h1:1WclY9P8l8o/NZ3ZR/mupm8LtowjQ/Q4UNGXR32f0OQ= -git.zhangdeman.cn/zhangdeman/util v0.0.0-20230113095943-b4b3e261e0c4/go.mod h1:zTir/0IWdK3E7n0GiaogyWHADAQnBtTdl2I6Z2/OPqw= -git.zhangdeman.cn/zhangdeman/util v0.0.0-20231227095334-7eb5cdbf9253 h1:GO3oZa5a2sqwAzGcLDJtQzmshSWRmoP7IDS8bwFqvC4= -git.zhangdeman.cn/zhangdeman/util v0.0.0-20231227095334-7eb5cdbf9253/go.mod h1:VpPjBlwz8U+OxZuxzHQBv1aEEZ3pStH6bZvT21ADEbI= -github.com/Jeffail/gabs v1.4.0 h1://5fYRRTq1edjfIrQGvdkcd22pkYUrHZ5YC/H2GJVAo= -github.com/Jeffail/gabs v1.4.0/go.mod h1:6xMvQMK4k33lb7GUUpaAPh6nKMmemQeg5d4gn7/bOXc= +git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38 h1:+o+BI5GGlwJelPLWL8ciDJqxw/G8dv+FU6OztGSnEjo= +git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38/go.mod h1:IXXaZkb7vGzGnGM5RRWrASAuwrVSNxuoe0DmeXx5g6k= +git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240709134122-e1e2a2e421de h1:ksjcMHupU0Bw0BJxJp3dajmWqGdqV7k2eVohN5O3S9Q= +git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240709134122-e1e2a2e421de/go.mod h1:IXXaZkb7vGzGnGM5RRWrASAuwrVSNxuoe0DmeXx5g6k= +git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211 h1:I/wOsRpCSRkU9vo1u703slQsmK0wnNeZzsWQOGtIAG0= +git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211/go.mod h1:SrtvrQRdzt+8KfYzvosH++gWxo2ShPTzR1m3VQ6uX7U= +git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0 h1:gUDlQMuJ4xNfP2Abl1Msmpa3fASLWYkNlqDFF/6GN0Y= +git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0/go.mod h1:VHb9qmhaPDAQDcS6vUiDCamYjZ4R5lD1XtVsh55KsMI= +git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd h1:2Y37waOVCmVvx0Rp8VGEptE2/2JVMImtxB4dKKDk/3w= +git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd/go.mod h1:6+7whkCmb4sJDIfH3HxNuXRveaM0gCCNWd2uXZqNtIE= +git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e h1:Q973S6CcWr1ICZhFI1STFOJ+KUImCl2BaIXm6YppBqI= +git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e/go.mod h1:VpPjBlwz8U+OxZuxzHQBv1aEEZ3pStH6bZvT21ADEbI= +git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240627031706-9ff1c213bb50 h1:olo34i2Gq5gX7bYPv5TR4X5l5CrYFtu9UCElkYlmL2c= +git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240627031706-9ff1c213bb50/go.mod h1:US/pcq2vstE3iyxIHf53w8IeXKkZys7bj/ozLWkRYeE= +github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= +github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ= github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-ini/ini v1.66.6 h1:h6k2Bb0HWS/BXXHCXj4QHjxPmlIU4NK+7MuLp9SD+4k= -github.com/go-ini/ini v1.66.6/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mozillazg/go-pinyin v0.20.0 h1:BtR3DsxpApHfKReaPO1fCqF4pThRwH9uwvXzm+GnMFQ= github.com/mozillazg/go-pinyin v0.20.0/go.mod h1:iR4EnMMRXkfpFVV5FMi4FNB6wGq9NV6uDWbUuPhP4Yc= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU= +github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/tidwall/gjson v1.14.1 h1:iymTbGkQBhveq21bEvAQ81I0LEBork8BFe1CUZXdyuo= -github.com/tidwall/gjson v1.14.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U= github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= -github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/memory.go b/memory.go new file mode 100644 index 0000000..5cf03e7 --- /dev/null +++ b/memory.go @@ -0,0 +1,180 @@ +// Package event ... +// +// Description : event ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-07-17 11:21 +package event + +import ( + "context" + "errors" + "fmt" + "git.zhangdeman.cn/zhangdeman/consts" + "git.zhangdeman.cn/zhangdeman/event/abstract" + "git.zhangdeman.cn/zhangdeman/event/define" +) + +var ( + MemoryEventClient abstract.IEvent +) + +// InitMemoryEvent 初始化事件实例 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:24 2024/7/17 +func InitMemoryEvent(cfg *define.MemoryEventConfig) { + instance := &MemoryEvent{ + base: &base{ + panicCallback: define.DefaultPanicCallback, + parseFailCallback: define.DefaultParseFailCallbackFunc, + }, + } + instance.Init(cfg) + MemoryEventClient = instance +} + +// MemoryEvent 基于内存实现的消息队列(仅适用于单机) +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:22 2024/7/17 +type MemoryEvent struct { + *base + cfg *define.MemoryEventConfig + messageChannel chan *define.EventData +} + +// SendEvent 发送事件 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 12:28 2024/7/17 +func (m *MemoryEvent) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) { + if nil == eventData { + return nil, errors.New("event data is nil") + } + m.messageChannel <- eventData + return &define.SendResult{ + Data: eventData, + Partition: 0, + Topic: "", + IsSuccess: true, + FailReason: "success", + Extension: nil, + }, nil +} + +// SendEventAsync 异步发送事件 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 12:29 2024/7/17 +func (m *MemoryEvent) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) { + if nil == sendFailCallback { + sendFailCallback = define.DefaultSendFailCallback + } + if nil == sendSuccessCallback { + sendSuccessCallback = define.DefaultSendSuccessCallback + } + go func() { + var ( + res *define.SendResult + err error + ) + defer func() { + if r := recover(); nil != r { + m.panicCallback(err, eventData, map[string]any{ + "send_result": res, + }) + } + }() + res, err = m.SendEvent(ctx, eventData) + + if nil != err { + sendFailCallback(ctx, eventData, res, err) + return + } else { + sendSuccessCallback(ctx, res) + } + }() +} + +// GetConsumeEventChan 消费的消息chan +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 12:41 2024/7/17 +func (m *MemoryEvent) GetConsumeEventChan() (<-chan *define.EventData, error) { + return m.messageChannel, nil +} + +// ConsumeEvent 消费事件 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 14:14 2024/7/17 +func (m *MemoryEvent) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error { + messageChan, _ := m.GetConsumeEventChan() + if nil == successCallback { + successCallback = define.DefaultSuccessCallbackHandler + } + if nil == failureCallback { + failureCallback = define.DefaultFailCallbackHandler + } + go func() { + defer func() { + if r := recover(); nil != r { + fmt.Println("出现异常", r) + } + }() + for eventData := range messageChan { + var ( + err error + handleResult map[string]any + ) + if handleResult, err = handler(eventData); nil != err { + failureCallback(eventData, handleResult, err) + } else { + successCallback(eventData, handleResult) + } + } + }() + + return nil +} + +// Destroy 实例销毁 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 14:17 2024/7/17 +func (m *MemoryEvent) Destroy() { + return +} + +func (m *MemoryEvent) DriverType() string { + return consts.EventDriverMemory +} + +// Init 初始化 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 12:08 2024/7/17 +func (m *MemoryEvent) Init(cfg *define.MemoryEventConfig) { + if nil == cfg { + cfg = &define.MemoryEventConfig{} + } + if cfg.MessageBufferSize <= 0 { + cfg.MessageBufferSize = define.DefaultMemoryChannelSize + } + if cfg.CloseMaxWaitTime <= 0 { + cfg.CloseMaxWaitTime = define.DefaultMemoryCloseMaxWaitTime + } + m.cfg = cfg + // 初始化内存 channel + m.messageChannel = make(chan *define.EventData, cfg.MessageBufferSize) +} diff --git a/memory_test.go b/memory_test.go new file mode 100644 index 0000000..6754657 --- /dev/null +++ b/memory_test.go @@ -0,0 +1,50 @@ +// Package event ... +// +// Description : event ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-07-17 14:17 +package event + +import ( + "context" + "encoding/json" + "fmt" + "git.zhangdeman.cn/zhangdeman/event/define" + "testing" + "time" +) + +func TestInitMemoryEvent(t *testing.T) { + InitMemoryEvent(&define.MemoryEventConfig{ + MessageBufferSize: 1024, + CloseMaxWaitTime: 5000, + }) + go func() { + for { + time.Sleep(time.Second) + MemoryEventClient.SendEventAsync(context.Background(), &define.EventData{ + EventType: "TEST", + TraceID: time.Now().Format("2006-01-02 15:04:05"), + Host: "", + Timestamp: time.Now().Unix(), + SystemTimestamp: 0, + Key: "", + Data: nil, + }, func(ctx context.Context, eventResult *define.SendResult) { + fmt.Println("消息发送成功") + }, func(ctx context.Context, eventData *define.EventData, eventResult *define.SendResult, err error) { + fmt.Println("消息发送失败") + }) + } + }() + MemoryEventClient.ConsumeEvent(func(eventData *define.EventData) (map[string]any, error) { + byteData, _ := json.Marshal(eventData) + fmt.Println(string(byteData)) + return map[string]any{}, nil + }, nil, nil) + for { + + } +} diff --git a/redis_pub_sub.go b/redis_pub_sub.go new file mode 100644 index 0000000..b6d2c39 --- /dev/null +++ b/redis_pub_sub.go @@ -0,0 +1,258 @@ +// Package event ... +// +// Description : event ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-06-25 16:06 +package event + +import ( + "context" + "errors" + "fmt" + "git.zhangdeman.cn/zhangdeman/consts" + "git.zhangdeman.cn/zhangdeman/event/abstract" + "git.zhangdeman.cn/zhangdeman/event/define" + "git.zhangdeman.cn/zhangdeman/serialize" + "git.zhangdeman.cn/zhangdeman/wrapper" + "github.com/redis/go-redis/v9" + "time" +) + +var ( + RedisEventPubSubClient abstract.IEvent +) + +// InitRedisPubSubEvent 初始化redis事件驱动, 基于 pub / sub 指令 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 16:24 2024/6/25 +func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) { + instance := &RedisEventPubSub{ + base: &base{ + panicCallback: define.DefaultPanicCallback, + parseFailCallback: define.DefaultParseFailCallbackFunc, + }, + redisClient: redisClient, + pubSubConfig: pubSubConfig, + } + instance.SetRedisClient(redisClient, pubSubConfig) + instance.StartConsumer() // 启动消费者 + RedisEventPubSubClient = instance +} + +// RedisEventPubSub 基于redis的事件驱动 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 16:07 2024/6/25 +type RedisEventPubSub struct { + *base + redisClient *redis.Client // redis客户端 + pubSubConfig *define.RedisEventPubSubConfig // 事件配置 + messageChan chan *define.EventData // 消息队列 + stopConsumer chan bool // 停止消费者 + isStop bool // 是否已停止 + parseFailCallback abstract.EventParseFailCallback // 数据解析失败回调 +} + +// SendEvent 发布时间 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 10:58 2024/6/27 +func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) { + partition := int(wrapper.String(eventData.Key).HashNumber().Value % uint64(r.pubSubConfig.PartitionNum)) + realTopic := fmt.Sprintf("%v_%v", r.pubSubConfig.Topic, partition) + res := r.redisClient.Publish(ctx, realTopic, eventData) + if nil == res { + return nil, errors.New("can not get send event result") + } + return &define.SendResult{ + Data: eventData, + Partition: partition, + Topic: realTopic, + IsSuccess: res.Err() == nil, + FailReason: wrapper.TernaryOperator.String(res.Err() == nil, "success", wrapper.String(res.Err().Error())).Value(), + Extension: nil, + }, res.Err() +} + +// SendEventAsync 异步发送事件 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:29 2024/6/27 +func (r *RedisEventPubSub) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) { + go func() { + if nil == sendFailCallback { + sendFailCallback = define.DefaultSendFailCallback + } + if nil == sendSuccessCallback { + sendSuccessCallback = define.DefaultSendSuccessCallback + } + var ( + sendResult *define.SendResult + handlerErr error + ) + defer func() { + if panicErr := recover(); nil != panicErr { + r.panicCallback(panicErr, eventData, map[string]any{ + "send_result": sendResult, + }) + } + }() + if sendResult, handlerErr = r.SendEvent(ctx, eventData); nil != handlerErr { + sendFailCallback(ctx, eventData, sendResult, handlerErr) + } else { + sendSuccessCallback(ctx, sendResult) + } + }() +} + +// GetConsumeEventChan 获取消息channel, 自行实现消费 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:13 2024/6/26 +func (r *RedisEventPubSub) GetConsumeEventChan() (<-chan *define.EventData, error) { + if r.isStop { + return nil, errors.New("event instance has stop") + } + return r.messageChan, nil +} + +// ConsumeEvent 获取数据消费实例 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:06 2024/6/26 +func (r *RedisEventPubSub) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error { + if nil == handler { + return errors.New("handler is nil") + } + var ( + messageChan <-chan *define.EventData + err error + ) + + if messageChan, err = r.GetConsumeEventChan(); nil != err { + return err + } + if nil == failureCallback { + failureCallback = define.DefaultFailCallbackHandler + } + if nil == successCallback { + successCallback = define.DefaultSuccessCallbackHandler + } + + for eventData := range messageChan { + handlerResult, handleErr := handler(eventData) + if nil != handleErr { + failureCallback(eventData, handlerResult, err) + } else { + successCallback(eventData, handlerResult) + } + } + + return nil +} + +// StartConsumer 启动消费者 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:36 2024/6/27 +func (r *RedisEventPubSub) StartConsumer() { + for partition := 0; partition < r.pubSubConfig.PartitionNum; partition++ { + go func(realPartition int) { + defer func() { + if panicErr := recover(); nil != panicErr { + r.panicCallback(panicErr, nil, nil) + } + }() + // 启动消费者 + realTopic := fmt.Sprintf("%v_%v", r.pubSubConfig.Topic, realPartition) + messageChannel := r.redisClient.Subscribe(context.Background(), realTopic).Channel() + for message := range messageChannel { + var ( + eventData define.EventData + err error + ) + + if err = serialize.JSON.UnmarshalWithNumber([]byte(message.Payload), &eventData); nil != err { + // TODO : 事件数据解析失败的回调 + continue + } + r.messageChan <- &eventData + } + }(partition) + } +} + +// Destroy 销毁事件实例 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 15:42 2024/6/26 +func (r *RedisEventPubSub) Destroy() { + if r.isStop { + // 已停止 + return + } + r.stopConsumer <- true // 停止消费者 + messageChan := make(chan bool, 1) + go func() { + for { + if len(r.messageChan) == 0 { + messageChan <- true + return + } + } + }() + select { + case <-time.After(time.Millisecond * time.Duration(r.pubSubConfig.CloseMaxWaitTime)): + // 定时器到期 + return + case <-messageChan: + // 没有待消费数据 + return + } +} + +func (r *RedisEventPubSub) DriverType() string { + return consts.EventDriverRedis +} + +// SetRedisClient 设置redis客户端 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 16:30 2024/6/25 +func (r *RedisEventPubSub) SetRedisClient(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) { + if nil == pubSubConfig { + pubSubConfig = &define.RedisEventPubSubConfig{ + Topic: "", + PartitionNum: 0, + MessageBufferSize: 0, + } + } + if len(pubSubConfig.Topic) == 0 { + pubSubConfig.Topic = define.DefaultRedisTopic + } + if pubSubConfig.PartitionNum <= 0 { + pubSubConfig.PartitionNum = define.DefaultRedisPartitionNum + } + if pubSubConfig.MessageBufferSize <= 0 { + r.pubSubConfig.MessageBufferSize = define.DefaultRedisMessageBufferSize + } + if pubSubConfig.CloseMaxWaitTime <= 0 { + r.pubSubConfig.CloseMaxWaitTime = define.DefaultRedisCloseMaxWaitTime + } + r.redisClient = redisClient + r.pubSubConfig = pubSubConfig + r.stopConsumer = make(chan bool, 1) + r.messageChan = make(chan *define.EventData, r.pubSubConfig.MessageBufferSize) +} diff --git a/redis_pub_sub_test.go b/redis_pub_sub_test.go new file mode 100644 index 0000000..2c044d5 --- /dev/null +++ b/redis_pub_sub_test.go @@ -0,0 +1,19 @@ +// Package event ... +// +// Description : event ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-06-25 16:51 +package event + +import ( + "fmt" + "git.zhangdeman.cn/zhangdeman/wrapper" + "strings" + "testing" +) + +func TestInitRedisPubSubEvent(t *testing.T) { + fmt.Println(strings.ToUpper(wrapper.StringFromRandom(128, "").Md5().Value)) +} diff --git a/reflect.go b/reflect.go deleted file mode 100644 index b1abfdc..0000000 --- a/reflect.go +++ /dev/null @@ -1,440 +0,0 @@ -// Package event ... -// -// Description : event ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 2023-02-01 14:30 -package event - -import ( - "encoding/json" - "git.zhangdeman.cn/zhangdeman/event/abstract" - "git.zhangdeman.cn/zhangdeman/util" - "github.com/tidwall/gjson" - "net/http" - "reflect" - "strings" - "sync" -) - -var ( - // ReflectTypeInstance 反射实例 - ReflectTypeInstance *ReflectType - ReflectValueInstance *ReflectValue -) - -func init() { - ReflectTypeInstance = &ReflectType{ - lock: &sync.RWMutex{}, - cacheTable: make(map[string]*StructInfo), - } - ReflectValueInstance = &ReflectValue{} -} - -// ReflectType 反射数据类型 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 14:31 2023/2/1 -type ReflectType struct { - // 数据锁 - lock *sync.RWMutex - // 反射结果缓存 - cacheTable map[string]*StructInfo -} - -// Do 反射获取数据类型 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 14:34 2023/2/1 -// -// 为特定结构体生成全局唯一的标识, 并进行缓存, 加速反射结果获取 -func (rt *ReflectType) Do(dataFlag string, data interface{}) *StructInfo { - rt.lock.Lock() - defer rt.lock.Unlock() - if cacheResult, exist := rt.cacheTable[dataFlag]; exist { - // 缓存存在, 直接是有缓存结果 - return cacheResult - } - // 缓存不存在, 解析 - res := &StructInfo{ - Flag: dataFlag, - IsStruct: false, - IsStructPtr: false, - StructFieldList: make([]*StructField, 0), - } - isPtr := false - reflectType := reflect.TypeOf(data) - if reflectType.Kind() == reflect.Ptr { - isPtr = true - reflectType = reflectType.Elem() - } - if reflectType.Kind() != reflect.Struct { - // 非结构体,无需反射 - rt.cacheTable[dataFlag] = res - return res - } - res.IsStruct = true - res.IsStructPtr = isPtr - for idx := 0; idx < reflectType.NumField(); idx++ { - field := &StructField{ - Idx: idx, - Name: reflectType.Field(idx).Name, - JsonTag: reflectType.Field(idx).Tag.Get(JsonTag), - EventTag: reflectType.Field(idx).Tag.Get(OutEventTag), - MappingRuleList: make([]MappingRuleItem, 0), - } - rt.fillFieldType(field, reflectType.Field(idx).Type) - rt.fillMappingRule(field, reflectType.Field(idx).Tag.Get(MappingTag)) - res.StructFieldList = append(res.StructFieldList, field) - } - rt.cacheTable[dataFlag] = res - return res -} - -// fillFieldType 填充字段类型 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 20:46 2023/2/1 -func (rt *ReflectType) fillFieldType(field *StructField, dataType reflect.Type) { - switch dataType.Kind() { - case reflect.Float32: - fallthrough - case reflect.Float64: - field.Type = reflect.Float64 - case reflect.String: - field.Type = reflect.String - case reflect.Struct: - field.Type = reflect.Struct - case reflect.Slice: - field.Type = reflect.Slice - case reflect.Map: - field.Type = reflect.Map - case reflect.Ptr: - field.IsPtr = true - // 指针再次判断基础类型 - field.Type = dataType.Elem().Kind() - default: - if strings.Contains(dataType.String(), "int") { - field.Type = reflect.Int64 - } else { - field.Type = reflect.Interface - } - } -} - -// fillTagInfo 填充标签信息 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 17:11 2023/2/1 -func (rt *ReflectType) fillTagInfo(field *StructField) { - if len(field.JsonTag) == 0 { - field.JsonTag = field.Name - } - - // jsonTag 去掉 omitempty - jsonTagValArr := strings.Split(field.JsonTag, ",") - for _, item := range jsonTagValArr { - if len(item) > 0 && item != OmitemptyTag { - field.JsonTag = item - break - } - } - - // 没有设置event tag,则和 json tag保持一致 - if len(field.EventTag) == 0 { - field.EventTag = field.JsonTag - } -} - -// fillMappingRule 解析参数映射规则 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 17:14 2023/2/1 -// -// mapping:"user_id:param#user_id|header#id" -func (rt *ReflectType) fillMappingRule(field *StructField, inputMappingVal string) { - - if len(inputMappingVal) == 0 { - // 没有指定规则, 有默认规则 - for _, location := range mappingLocationList { - field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{ - Location: location, - Field: field.JsonTag, - }) - } - return - } - mappingArr := strings.Split(inputMappingVal, ",") - for _, item := range mappingArr { - item = strings.TrimSpace(item) - // 要赋值的字段名 - itemArr := strings.Split(item, ":") - if len(itemArr) != 2 { - // 配置格式错误, 跳过 - continue - } - mapRuleArr := strings.Split(strings.TrimSpace(itemArr[1]), "|") - for _, itemMapRule := range mapRuleArr { - itemMapRule = strings.TrimLeft(itemMapRule, "#") - itemMapRuleArr := strings.Split(itemMapRule, "#") - // 注意 : # 为特殊分隔符, 如配置成 mapping:"project_id:#source_project_id#xxx_project_id" 实际等价于 mapping:"project_id:#source_project_id" 多余配置自动跳过 - if len(itemMapRuleArr[0]) < 2 { - // 没有指定位置,默认all, 即配置格式: mapping:"project_id:#source_project_id" - itemMapRuleArr[0] = MappingLocationAll - itemMapRuleArr = []string{MappingLocationAll, itemMapRuleArr[0]} - } - - switch itemMapRuleArr[0] { - // 从header读取 - case MappingLocationHeader: - field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{ - Location: MappingLocationHeader, - Field: itemMapRuleArr[1], - }) - // 从请求参数读取 - case MappingLocationParam: - field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{ - Location: MappingLocationParam, - Field: itemMapRuleArr[1], - }) - // 从响应数据读取 - case MappingLocationResponse: - field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{ - Location: MappingLocationResponse, - Field: itemMapRuleArr[1], - }) - // 从扩展数据读取 - case MappingLocationExtension: - field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{ - Location: MappingLocationExtension, - Field: itemMapRuleArr[1], - }) - // 全部读取一遍 - case MappingLocationAll: - fallthrough - default: - for _, itemLocation := range mappingLocationList { - field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{ - Location: itemLocation, - Field: itemMapRuleArr[1], - }) - } - } - } - } -} - -// ReflectValue 反射值的实例 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 10:53 2023/2/2 -type ReflectValue struct { -} - -// Do 通过反射机制,对data进行数据填充,此逻辑要求 data 必须是结构体或者结构体指针 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 11:52 2023/2/2 -func (rv *ReflectValue) Do(dataFlag string, data interface{}, preSendHandler abstract.IPreSendHandler) { - structInfo := ReflectTypeInstance.Do(dataFlag, data) - if !structInfo.IsStruct { - return - } - reflectValue := reflect.ValueOf(data) - if structInfo.IsStructPtr { - reflectValue = reflectValue.Elem() - } - - for _, fieldInfo := range structInfo.StructFieldList { - if !rv.isZeroInputFieldValue(reflectValue, fieldInfo) { - // 不是零值, 无需处理 - continue - } - // 是零值, 填充默认值 - } -} - -// isZeroInputFieldValue 判断对应的字段是否为对应类型默认的零值 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 12:04 2023/2/2 -func (rv *ReflectValue) isZeroInputFieldValue(reflectValue reflect.Value, fieldInfo *StructField) bool { - inputVal := reflectValue.Field(fieldInfo.Idx).Interface() - switch fieldInfo.Type { - case reflect.Float64: - var f float64 - if err := util.ConvertAssign(&f, inputVal); nil == err { - if f != 0 { - return false - } - return true - } - case reflect.String: - var s string - if err := util.ConvertAssign(&s, inputVal); nil == err { - if len(s) > 0 { - return false - } - return true - } - case reflect.Int64: - var i int64 - if err := util.ConvertAssign(&i, inputVal); nil == err { - if i != 0 { - return false - } - return true - } - case reflect.Map: - if nil == inputVal { - return true - } - var m map[interface{}]interface{} - if err := util.ConvertAssign(&m, inputVal); nil == err { - if len(m) != 0 { - return false - } - return true - } - case reflect.Slice: - if nil == inputVal { - return true - } - var sl []interface{} - if err := util.ConvertAssign(&sl, inputVal); nil == err { - if len(sl) != 0 { - return false - } - return true - } - case reflect.Interface: - if inputVal == nil { - return true - } - return false - default: - // 默认不处理 - return false - } - return false -} - -// fillFieldValue 填充字段值 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 14:00 2023/2/2 -func (rv *ReflectValue) fillFieldValue(reflectValue reflect.Value, fieldInfo *StructField, preSendHandler abstract.IPreSendHandler) { - paramByte, _ := json.Marshal(preSendHandler.GetRequestParam()) - header := preSendHandler.GetRequestHeader() - responseByte, _ := json.Marshal(preSendHandler.GetResponseData()) - extensionByte, _ := json.Marshal(preSendHandler.GetExtensionData()) - for _, item := range fieldInfo.MappingRuleList { - if rv.setDataValue(reflectValue, fieldInfo, item, header, paramByte, responseByte, extensionByte) { - // 找到数据,并且赋值成功,结束继续查找 - break - } - } -} - -// getInputValue 获取输入的值 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 14:24 2023/2/2 -func (rv *ReflectValue) setDataValue(reflectVal reflect.Value, fieldInfo *StructField, rule MappingRuleItem, header http.Header, paramByte, responseByte, extensionByte []byte) bool { - inputVal := rv.getInputValue(rule, header, paramByte, responseByte, extensionByte) - if len(inputVal) == 0 { - return false - } - switch fieldInfo.Type { - case reflect.String: - reflectVal.Field(fieldInfo.Idx).SetString(inputVal) - return true - case reflect.Int64: - var i int64 - if err := util.ConvertAssign(&i, inputVal); nil != err { - return false - } - if i == 0 { - return false - } - if fieldInfo.IsPtr { - reflectVal.Field(fieldInfo.Idx).Set(reflect.ValueOf(&i)) - } else { - reflectVal.Field(fieldInfo.Idx).SetInt(i) - } - return true - case reflect.Float64: - var f float64 - if err := util.ConvertAssign(&f, inputVal); nil != err { - return false - } - if f == 0 { - return false - } - if fieldInfo.IsPtr { - reflectVal.Field(fieldInfo.Idx).Set(reflect.ValueOf(&f)) - } else { - reflectVal.Field(fieldInfo.Idx).SetFloat(f) - } - return true - case reflect.Interface: - reflectVal.Field(fieldInfo.Idx).Set(reflect.ValueOf(&inputVal)) - return true - case reflect.Slice: - fallthrough - case reflect.Map: - fallthrough - case reflect.Struct: - var v interface{} - if err := json.Unmarshal([]byte(inputVal), &v); nil != err { - return false - } - reflectVal.Field(fieldInfo.Idx).Set(reflect.ValueOf(&v)) - return true - default: - return false - } -} - -// getInputValue 获取输入的值 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 14:43 2023/2/2 -func (rv *ReflectValue) getInputValue(rule MappingRuleItem, header http.Header, paramByte, responseByte, extensionByte []byte) string { - switch rule.Location { - case MappingLocationHeader: - return header.Get(rule.Field) - case MappingLocationParam: - return gjson.GetBytes(paramByte, rule.Field).String() - case MappingLocationResponse: - return gjson.GetBytes(responseByte, rule.Field).String() - case MappingLocationExtension: - return gjson.GetBytes(extensionByte, rule.Field).String() - case MappingLocationAll: - str := header.Get(rule.Field) - if len(str) == 0 { - str = gjson.GetBytes(paramByte, rule.Field).String() - } - if len(str) == 0 { - str = gjson.GetBytes(responseByte, rule.Field).String() - } - if len(str) == 0 { - str = gjson.GetBytes(extensionByte, rule.Field).String() - } - return str - default: - return "" - } -} diff --git a/send.go b/send.go deleted file mode 100644 index ef717c0..0000000 --- a/send.go +++ /dev/null @@ -1,57 +0,0 @@ -// Package event ... -// -// Description : event ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 2023-02-02 15:15 -package event - -import ( - "encoding/json" - "git.zhangdeman.cn/zhangdeman/event/abstract" -) - -// SendEvent 发送事件 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 15:16 2023/2/2 -func SendEvent(dataFlag string, data interface{}, preSendHandler abstract.IPreSendHandler, sendEventHandler abstract.ISendEventHandler) SendResult { - if nil == preSendHandler { - preSendHandler = &DefaultPreSendHandler{} - } - - if nil == sendEventHandler { - sendEventHandler = &DefaultSendEventHandler{} - } - res := SendResult{} - - if res.NoSendDetail, res.IsNeedSend = preSendHandler.NeedSend(); !res.IsNeedSend { - sendEventHandler.NoSendCallback(data, res) - return res - } - // 通过反射填充数据 - ReflectValueInstance.Do(dataFlag, data, preSendHandler) - res.Data = data - var ( - byteData []byte - err error - ) - - if byteData, err = json.Marshal(data); nil != err { - res.IsSuccess = false - res.FailReason = err.Error() - sendEventHandler.FailCallback(map[string]interface{}{ - "err": err.Error(), - "reason": "data marshal fail", - }, err) - return res - } - if res.SendResult, err = sendEventHandler.Send(byteData); nil != err { - sendEventHandler.FailCallback(res.SendResult, err) - return res - } - sendEventHandler.SuccessCallback(res.SendResult) - return res -}