消息能力升级 #1
119
abstract/IEvent.go
Normal file
119
abstract/IEvent.go
Normal file
@ -0,0 +1,119 @@
|
|||||||
|
// Package abstract ...
|
||||||
|
//
|
||||||
|
// Description : abstract ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 2024-03-11 12:02
|
||||||
|
package abstract
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"git.zhangdeman.cn/zhangdeman/event/define"
|
||||||
|
)
|
||||||
|
|
||||||
|
// EventHandler 事件数据处理函数
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 17:08 2024/6/26
|
||||||
|
type EventHandler func(eventData *define.EventData) (map[string]any, error)
|
||||||
|
|
||||||
|
// SendFailCallback 发送事件失败的回调
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 17:31 2024/6/26
|
||||||
|
type SendFailCallback func(ctx context.Context, eventData *define.EventData, eventResult *define.SendResult, err error)
|
||||||
|
|
||||||
|
// SendSuccessCallback 发送事件成功的回调
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 17:32 2024/6/26
|
||||||
|
type SendSuccessCallback func(ctx context.Context, eventResult *define.SendResult)
|
||||||
|
|
||||||
|
// ConsumeFailCallbackHandler 时间处理成功回调
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 17:18 2024/6/26
|
||||||
|
type ConsumeFailCallbackHandler func(eventData *define.EventData, handleResult map[string]any, err error)
|
||||||
|
|
||||||
|
// ConsumeSuccessCallback 时间处理失败回调
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 17:20 2024/6/26
|
||||||
|
type ConsumeSuccessCallback func(eventData *define.EventData, handleResult map[string]any)
|
||||||
|
|
||||||
|
// PanicCallback panic回调, 根据不同异常类型, eventData / handleResult 均可能为nil
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 18:07 2024/6/26
|
||||||
|
type PanicCallback func(err any, eventData *define.EventData, handleResult map[string]any)
|
||||||
|
|
||||||
|
// EventParseFailCallback ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 18:48 2024/6/27
|
||||||
|
type EventParseFailCallback func(err error, eventData string)
|
||||||
|
|
||||||
|
// IEvent 事件接口定义
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 19:08 2023/8/14
|
||||||
|
type IEvent interface {
|
||||||
|
// SendEvent 发送事件(同步)
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 12:04 2024/3/11
|
||||||
|
SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error)
|
||||||
|
|
||||||
|
// SendEventAsync 发送事件(异步)
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 15:58 2024/6/25
|
||||||
|
SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback SendSuccessCallback, sendFailCallback SendFailCallback)
|
||||||
|
// GetConsumeEventChan 或去消息消费的channel, 自行实现消费
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 17:11 2024/6/26
|
||||||
|
GetConsumeEventChan() (<-chan *define.EventData, error)
|
||||||
|
// ConsumeEvent 消费事件
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 12:05 2024/3/11
|
||||||
|
ConsumeEvent(handler EventHandler, successCallback ConsumeSuccessCallback, failureCallback ConsumeFailCallbackHandler) error
|
||||||
|
// Destroy 事件实例销毁时, 执行的方法
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 12:05 2024/3/11
|
||||||
|
Destroy()
|
||||||
|
// DriverType 获取驱动类型
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 12:06 2024/3/11
|
||||||
|
DriverType() string
|
||||||
|
// SetPanicCallback 设置失败回调的处理函数
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 18:39 2024/6/27
|
||||||
|
SetPanicCallback(panicCallback PanicCallback)
|
||||||
|
// SetEventParseFailCallback 设置数据解析失败的处理函数
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 18:51 2024/6/27
|
||||||
|
SetEventParseFailCallback(parseFailCallbackCallback EventParseFailCallback)
|
||||||
|
}
|
@ -1,62 +0,0 @@
|
|||||||
// Package abstract ...
|
|
||||||
//
|
|
||||||
// Description : 事件发送前预处理接口约束
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 2023-02-01 14:06
|
|
||||||
package abstract
|
|
||||||
|
|
||||||
import (
|
|
||||||
"net/http"
|
|
||||||
)
|
|
||||||
|
|
||||||
// IPreSendHandler ...
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 14:06 2023/2/1
|
|
||||||
type IPreSendHandler interface {
|
|
||||||
// GetRequestParam 构造 base info 时, 可能需要从请求参数中提取公共数据
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 14:13 2023/2/1
|
|
||||||
GetRequestParam() map[string]interface{}
|
|
||||||
|
|
||||||
// GetRequestHeader ...
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 14:14 2023/2/1
|
|
||||||
// 构造 base info 时, 可能需要从请求参数中提取公共数据
|
|
||||||
GetRequestHeader() http.Header
|
|
||||||
|
|
||||||
// GetResponseData 响应数据
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 14:15 2023/2/1
|
|
||||||
GetResponseData() map[string]interface{}
|
|
||||||
|
|
||||||
// GetExtensionData 获取扩展数据
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 14:19 2023/2/1
|
|
||||||
GetExtensionData() map[string]interface{}
|
|
||||||
|
|
||||||
// GetEventData 获取事件数据, 建议返回结构体或者结构体指针, 内部会自动补齐缺失的数据
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 14:16 2023/2/1
|
|
||||||
GetEventData() interface{}
|
|
||||||
|
|
||||||
// NeedSend 判断是否需要发送事件, 若不需要发送, 可在第一个返回值中记录不需要发送的原因
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 15:24 2023/2/2
|
|
||||||
NeedSend() (map[string]interface{}, bool)
|
|
||||||
}
|
|
@ -1,47 +0,0 @@
|
|||||||
// Package abstract ...
|
|
||||||
//
|
|
||||||
// Description : abstract ...
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 2023-02-01 14:21
|
|
||||||
package abstract
|
|
||||||
|
|
||||||
import "git.zhangdeman.cn/zhangdeman/event"
|
|
||||||
|
|
||||||
// ISendEventHandler 发送事件处理器
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 14:21 2023/2/1
|
|
||||||
type ISendEventHandler interface {
|
|
||||||
// Send ...
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 14:21 2023/2/1
|
|
||||||
// 事件发送成功之后, 可以返回一些业务数据, 这些业务数据会回调给SuccessCallback
|
|
||||||
// 事件发送成功之后, 可以返回一些业务数据 以及 err, 这些业务数据会回调给FailCallback
|
|
||||||
Send(data []byte) (map[string]interface{}, error)
|
|
||||||
|
|
||||||
// SuccessCallback 事件发送成功的回调
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 14:21 2023/2/1
|
|
||||||
SuccessCallback(data map[string]interface{})
|
|
||||||
|
|
||||||
// FailCallback 事件发送失败的回调
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 14:22 2023/2/1
|
|
||||||
FailCallback(data map[string]interface{}, err error)
|
|
||||||
|
|
||||||
// NoSendCallback 不需要发送事件的回调
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 15:26 2023/2/2
|
|
||||||
NoSendCallback(data interface{}, res event.SendResult)
|
|
||||||
}
|
|
42
base.go
Normal file
42
base.go
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
// Package event ...
|
||||||
|
//
|
||||||
|
// Description : event ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 2024-07-17 12:31
|
||||||
|
package event
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.zhangdeman.cn/zhangdeman/event/abstract"
|
||||||
|
"git.zhangdeman.cn/zhangdeman/event/define"
|
||||||
|
)
|
||||||
|
|
||||||
|
type base struct {
|
||||||
|
panicCallback abstract.PanicCallback
|
||||||
|
parseFailCallback abstract.EventParseFailCallback
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetPanicCallback 出现任何panic的回调
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 18:02 2024/6/26
|
||||||
|
func (b *base) SetPanicCallback(panicCallback abstract.PanicCallback) {
|
||||||
|
if nil == panicCallback {
|
||||||
|
panicCallback = define.DefaultPanicCallback
|
||||||
|
}
|
||||||
|
b.panicCallback = panicCallback
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetEventParseFailCallback 设置事件解析失败回回调函数
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 18:42 2024/6/27
|
||||||
|
func (b *base) SetEventParseFailCallback(parseFailCallbackCallback abstract.EventParseFailCallback) {
|
||||||
|
if nil == parseFailCallbackCallback {
|
||||||
|
parseFailCallbackCallback = define.DefaultParseFailCallbackFunc
|
||||||
|
}
|
||||||
|
b.parseFailCallback = parseFailCallbackCallback
|
||||||
|
}
|
@ -1,42 +0,0 @@
|
|||||||
// Package event ...
|
|
||||||
//
|
|
||||||
// Description : event ...
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 2023-02-02 16:36
|
|
||||||
package event
|
|
||||||
|
|
||||||
import "net/http"
|
|
||||||
|
|
||||||
// DefaultPreSendHandler IPreSendHandler 默认实现
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 16:36 2023/2/2
|
|
||||||
type DefaultPreSendHandler struct {
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DefaultPreSendHandler) GetRequestParam() map[string]interface{} {
|
|
||||||
return map[string]interface{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DefaultPreSendHandler) GetRequestHeader() http.Header {
|
|
||||||
return make(map[string][]string, 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DefaultPreSendHandler) GetResponseData() map[string]interface{} {
|
|
||||||
return map[string]interface{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DefaultPreSendHandler) GetExtensionData() map[string]interface{} {
|
|
||||||
return map[string]interface{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DefaultPreSendHandler) GetEventData() interface{} {
|
|
||||||
return map[string]interface{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DefaultPreSendHandler) NeedSend() (map[string]interface{}, bool) {
|
|
||||||
return map[string]interface{}{}, true
|
|
||||||
}
|
|
@ -1,27 +0,0 @@
|
|||||||
// Package event ...
|
|
||||||
//
|
|
||||||
// Description : event ...
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 2023-02-02 16:39
|
|
||||||
package event
|
|
||||||
|
|
||||||
type DefaultSendEventHandler struct {
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DefaultSendEventHandler) Send(data []byte) (map[string]interface{}, error) {
|
|
||||||
return map[string]interface{}{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DefaultSendEventHandler) SuccessCallback(data map[string]interface{}) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DefaultSendEventHandler) FailCallback(data map[string]interface{}, err error) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DefaultSendEventHandler) NoSendCallback(data interface{}, res SendResult) {
|
|
||||||
|
|
||||||
}
|
|
109
define.go
109
define.go
@ -1,109 +0,0 @@
|
|||||||
// Package event ...
|
|
||||||
//
|
|
||||||
// Description : 各种常量定义
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 2023-02-01 14:23
|
|
||||||
package event
|
|
||||||
|
|
||||||
import "reflect"
|
|
||||||
|
|
||||||
const (
|
|
||||||
// OutEventTag 事件数据输出key的标签
|
|
||||||
OutEventTag = "event"
|
|
||||||
// JsonTag json输出的标签
|
|
||||||
JsonTag = "json"
|
|
||||||
// IgnoreTagValue 不做输出的标签值
|
|
||||||
IgnoreTagValue = "-"
|
|
||||||
// MappingTag 参数映射标签
|
|
||||||
MappingTag = "mapping"
|
|
||||||
// PriorityTag 处理数据时, mapping数据查找的优先级, 默认值 : field
|
|
||||||
PriorityTag = "priority"
|
|
||||||
// OmitemptyTag ...
|
|
||||||
OmitemptyTag = "omitempty"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// MappingLocationAll 自动探测所有路径
|
|
||||||
MappingLocationAll = "all"
|
|
||||||
// MappingLocationParam 从参数读取
|
|
||||||
MappingLocationParam = "param"
|
|
||||||
// MappingLocationHeader 从请求header读取
|
|
||||||
MappingLocationHeader = "header"
|
|
||||||
// MappingLocationResponse 从响应数据读取
|
|
||||||
MappingLocationResponse = "response"
|
|
||||||
// MappingLocationExtension 从扩展数据读取
|
|
||||||
MappingLocationExtension = "extension"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
mappingLocationList = []string{
|
|
||||||
MappingLocationHeader,
|
|
||||||
MappingLocationParam,
|
|
||||||
MappingLocationResponse,
|
|
||||||
MappingLocationExtension,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
// SetMappingLocationList 只持续该默认的查找优先级
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 18:17 2023/2/1
|
|
||||||
func SetMappingLocationList(locationList []string) {
|
|
||||||
if len(locationList) > 0 {
|
|
||||||
mappingLocationList = locationList
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// MappingRuleItem 规则
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 17:36 2023/2/1
|
|
||||||
type MappingRuleItem struct {
|
|
||||||
Location string `json:"location"` // 数据所在位置, header-请求头 param-参数获取 response-响应数据获取 extension-扩展数据读取 all-自动按照header/param/response/extension的顺序查询
|
|
||||||
Field string `json:"field"` // 查询的字段名称
|
|
||||||
}
|
|
||||||
|
|
||||||
// StructField 结构体字段信息
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 14:35 2023/2/1
|
|
||||||
type StructField struct {
|
|
||||||
Idx int // 字段在结构体的索引
|
|
||||||
Name string // 字段名
|
|
||||||
Type reflect.Kind // 字段类型
|
|
||||||
IsPtr bool // 是否为指针
|
|
||||||
JsonTag string // json标签
|
|
||||||
EventTag string // 事件标签
|
|
||||||
MappingRuleList []MappingRuleItem // 规则列表
|
|
||||||
}
|
|
||||||
|
|
||||||
// StructInfo 结构体信息
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 16:17 2023/2/1
|
|
||||||
type StructInfo struct {
|
|
||||||
Flag string // 结构体标识
|
|
||||||
IsStruct bool // 是否为结构体
|
|
||||||
IsStructPtr bool // 是否为结构体指针
|
|
||||||
StructFieldList []*StructField // 结构体字段列表
|
|
||||||
}
|
|
||||||
|
|
||||||
// SendResult ...
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 15:27 2023/2/2
|
|
||||||
type SendResult struct {
|
|
||||||
IsNeedSend bool `json:"is_need_send"` // 是否需要发送
|
|
||||||
NoSendDetail map[string]interface{} `json:"no_send_detail"` // 未发送详细原因
|
|
||||||
Data interface{} `json:"data"` // 发送的数据
|
|
||||||
IsSuccess bool `json:"is_success"` // 是否处理成功
|
|
||||||
FailReason string `json:"fail_reason"` // 失败原因
|
|
||||||
SendResult map[string]interface{} `json:"send_result"` // 发送结果
|
|
||||||
}
|
|
14
define/consts.go
Normal file
14
define/consts.go
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
// Package define ...
|
||||||
|
//
|
||||||
|
// Description : define ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 2024-03-11 12:06
|
||||||
|
package define
|
||||||
|
|
||||||
|
const (
|
||||||
|
DriverTypeEtcd = "ETCD" // etcd驱动
|
||||||
|
DriverTypeKafka = "KAFKA" // kafka驱动
|
||||||
|
DriverTypeREDIS = "REDIS" // redis驱动
|
||||||
|
)
|
37
define/data.go
Normal file
37
define/data.go
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
// Package define ...
|
||||||
|
//
|
||||||
|
// Description : define ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 2024-03-11 11:40
|
||||||
|
package define
|
||||||
|
|
||||||
|
// EventData ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 11:41 2024/3/11
|
||||||
|
type EventData struct {
|
||||||
|
EventType string `json:"event_type"` // 事件类型
|
||||||
|
TraceID string `json:"trace_id"` // 事件追踪ID
|
||||||
|
Host string `json:"host"` // 触发事件host
|
||||||
|
Timestamp int64 `json:"timestamp"` // 触发时间,纳秒级时间戳
|
||||||
|
SystemTimestamp int64 `json:"system_timestamp"` // 发送时的系统时间
|
||||||
|
Key string `json:"key"` // 会基于当前值进行hash, 决定消息分区, 不指定则随机生成
|
||||||
|
Data any `json:"data"` // 发送的数据
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendResult 发送结果
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 15:56 2024/6/25
|
||||||
|
type SendResult struct {
|
||||||
|
Data *EventData `json:"data"` // 发送的数据
|
||||||
|
Partition int `json:"partition_num"` // 分区索引编号
|
||||||
|
Topic string `json:"topic"` // 使用的真实topic
|
||||||
|
IsSuccess bool `json:"is_success"` // 是否发送成功
|
||||||
|
FailReason string `json:"fail_reason"` // 失败原因
|
||||||
|
Extension map[string]any `json:"extension"` // 扩展数据
|
||||||
|
}
|
59
define/handler.go
Normal file
59
define/handler.go
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
// Package define ...
|
||||||
|
//
|
||||||
|
// Description : define ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 2024-06-26 17:21
|
||||||
|
package define
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
// DefaultSuccessCallbackHandler ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 17:21 2024/6/26
|
||||||
|
func DefaultSuccessCallbackHandler(eventData *EventData, handleResult map[string]any) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// DefaultFailCallbackHandler ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 18:11 2024/6/26
|
||||||
|
func DefaultFailCallbackHandler(eventData *EventData, handleResult map[string]any, err error) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// DefaultPanicCallback ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 18:11 2024/6/26
|
||||||
|
func DefaultPanicCallback(err any, eventData *EventData, handleResult map[string]any) {}
|
||||||
|
|
||||||
|
// DefaultSendFailCallback ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 11:45 2024/6/27
|
||||||
|
func DefaultSendFailCallback(ctx context.Context, eventData *EventData, eventResult *SendResult, err error) {
|
||||||
|
}
|
||||||
|
|
||||||
|
// DefaultSendSuccessCallback ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 11:46 2024/6/27
|
||||||
|
func DefaultSendSuccessCallback(ctx context.Context, eventResult *SendResult) {}
|
||||||
|
|
||||||
|
// DefaultParseFailCallbackFunc ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 18:49 2024/6/27
|
||||||
|
func DefaultParseFailCallbackFunc(err error, eventData string) {
|
||||||
|
|
||||||
|
}
|
23
define/memory.go
Normal file
23
define/memory.go
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
// Package define ...
|
||||||
|
//
|
||||||
|
// Description : define ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 2024-07-17 11:25
|
||||||
|
package define
|
||||||
|
|
||||||
|
const (
|
||||||
|
DefaultMemoryChannelSize = 1024 // 默认的消息channel大小
|
||||||
|
DefaultMemoryCloseMaxWaitTime = 5000 // 默认最大等待 : 5s
|
||||||
|
)
|
||||||
|
|
||||||
|
// MemoryEventConfig 内存事件的配置
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 11:26 2024/7/17
|
||||||
|
type MemoryEventConfig struct {
|
||||||
|
MessageBufferSize int `json:"message_buffer_size"` // 消息缓冲区大小
|
||||||
|
CloseMaxWaitTime int `json:"close_max_wait_time"` // 关闭消息实例, 最大等待时长, 单位 : ms
|
||||||
|
}
|
27
define/redis.go
Normal file
27
define/redis.go
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
// Package define ...
|
||||||
|
//
|
||||||
|
// Description : define ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 2024-06-25 16:24
|
||||||
|
package define
|
||||||
|
|
||||||
|
const (
|
||||||
|
DefaultRedisPartitionNum = 1
|
||||||
|
DefaultRedisTopic = "EVENT_TOPIC_C6DBE0AAE846C5C0DE35802107326B1E"
|
||||||
|
DefaultRedisMessageBufferSize = 1024
|
||||||
|
DefaultRedisCloseMaxWaitTime = 5000 // 默认最大等待 : 5s
|
||||||
|
)
|
||||||
|
|
||||||
|
// RedisEventPubSubConfig redis事件配置
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 16:25 2024/6/25
|
||||||
|
type RedisEventPubSubConfig struct {
|
||||||
|
Topic string `json:"topic"` // topic key, 不指定随机生成
|
||||||
|
PartitionNum int `json:"partition_num"` // 多少个分区, 默认值 : 1
|
||||||
|
MessageBufferSize int `json:"message_buffer_size"` // 消息缓冲区大小
|
||||||
|
CloseMaxWaitTime int `json:"close_max_wait_time"` // 关闭消息实例, 最大等待时长, 单位 : ms
|
||||||
|
}
|
17
go.mod
17
go.mod
@ -5,20 +5,25 @@ go 1.21
|
|||||||
toolchain go1.22.1
|
toolchain go1.22.1
|
||||||
|
|
||||||
require (
|
require (
|
||||||
git.zhangdeman.cn/zhangdeman/util v0.0.0-20231227095334-7eb5cdbf9253
|
git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240709134122-e1e2a2e421de
|
||||||
github.com/tidwall/gjson v1.17.1
|
git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd
|
||||||
|
git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240627031706-9ff1c213bb50
|
||||||
|
github.com/redis/go-redis/v9 v9.5.3
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/Jeffail/gabs v1.4.0 // indirect
|
git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211 // indirect
|
||||||
|
git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0 // indirect
|
||||||
|
git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e // indirect
|
||||||
|
github.com/BurntSushi/toml v1.4.0 // indirect
|
||||||
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect
|
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect
|
||||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||||
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||||
github.com/go-ini/ini v1.67.0 // indirect
|
github.com/go-ini/ini v1.67.0 // indirect
|
||||||
github.com/mitchellh/go-homedir v1.1.0 // indirect
|
github.com/mitchellh/go-homedir v1.1.0 // indirect
|
||||||
github.com/mozillazg/go-pinyin v0.20.0 // indirect
|
github.com/mozillazg/go-pinyin v0.20.0 // indirect
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
|
||||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
|
||||||
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||||
|
github.com/tidwall/gjson v1.17.1 // indirect
|
||||||
github.com/tidwall/match v1.1.1 // indirect
|
github.com/tidwall/match v1.1.1 // indirect
|
||||||
github.com/tidwall/pretty v1.2.1 // indirect
|
github.com/tidwall/pretty v1.2.1 // indirect
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
|
48
go.sum
48
go.sum
@ -1,45 +1,53 @@
|
|||||||
git.zhangdeman.cn/zhangdeman/util v0.0.0-20230113095943-b4b3e261e0c4 h1:1WclY9P8l8o/NZ3ZR/mupm8LtowjQ/Q4UNGXR32f0OQ=
|
git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38 h1:+o+BI5GGlwJelPLWL8ciDJqxw/G8dv+FU6OztGSnEjo=
|
||||||
git.zhangdeman.cn/zhangdeman/util v0.0.0-20230113095943-b4b3e261e0c4/go.mod h1:zTir/0IWdK3E7n0GiaogyWHADAQnBtTdl2I6Z2/OPqw=
|
git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38/go.mod h1:IXXaZkb7vGzGnGM5RRWrASAuwrVSNxuoe0DmeXx5g6k=
|
||||||
git.zhangdeman.cn/zhangdeman/util v0.0.0-20231227095334-7eb5cdbf9253 h1:GO3oZa5a2sqwAzGcLDJtQzmshSWRmoP7IDS8bwFqvC4=
|
git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240709134122-e1e2a2e421de h1:ksjcMHupU0Bw0BJxJp3dajmWqGdqV7k2eVohN5O3S9Q=
|
||||||
git.zhangdeman.cn/zhangdeman/util v0.0.0-20231227095334-7eb5cdbf9253/go.mod h1:VpPjBlwz8U+OxZuxzHQBv1aEEZ3pStH6bZvT21ADEbI=
|
git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240709134122-e1e2a2e421de/go.mod h1:IXXaZkb7vGzGnGM5RRWrASAuwrVSNxuoe0DmeXx5g6k=
|
||||||
github.com/Jeffail/gabs v1.4.0 h1://5fYRRTq1edjfIrQGvdkcd22pkYUrHZ5YC/H2GJVAo=
|
git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211 h1:I/wOsRpCSRkU9vo1u703slQsmK0wnNeZzsWQOGtIAG0=
|
||||||
github.com/Jeffail/gabs v1.4.0/go.mod h1:6xMvQMK4k33lb7GUUpaAPh6nKMmemQeg5d4gn7/bOXc=
|
git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211/go.mod h1:SrtvrQRdzt+8KfYzvosH++gWxo2ShPTzR1m3VQ6uX7U=
|
||||||
|
git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0 h1:gUDlQMuJ4xNfP2Abl1Msmpa3fASLWYkNlqDFF/6GN0Y=
|
||||||
|
git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0/go.mod h1:VHb9qmhaPDAQDcS6vUiDCamYjZ4R5lD1XtVsh55KsMI=
|
||||||
|
git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd h1:2Y37waOVCmVvx0Rp8VGEptE2/2JVMImtxB4dKKDk/3w=
|
||||||
|
git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd/go.mod h1:6+7whkCmb4sJDIfH3HxNuXRveaM0gCCNWd2uXZqNtIE=
|
||||||
|
git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e h1:Q973S6CcWr1ICZhFI1STFOJ+KUImCl2BaIXm6YppBqI=
|
||||||
|
git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e/go.mod h1:VpPjBlwz8U+OxZuxzHQBv1aEEZ3pStH6bZvT21ADEbI=
|
||||||
|
git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240627031706-9ff1c213bb50 h1:olo34i2Gq5gX7bYPv5TR4X5l5CrYFtu9UCElkYlmL2c=
|
||||||
|
git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240627031706-9ff1c213bb50/go.mod h1:US/pcq2vstE3iyxIHf53w8IeXKkZys7bj/ozLWkRYeE=
|
||||||
|
github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0=
|
||||||
|
github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
|
||||||
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ=
|
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ=
|
||||||
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg=
|
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg=
|
||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||||
|
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||||
|
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
||||||
|
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/go-ini/ini v1.66.6 h1:h6k2Bb0HWS/BXXHCXj4QHjxPmlIU4NK+7MuLp9SD+4k=
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||||
github.com/go-ini/ini v1.66.6/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||||
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
|
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
|
||||||
github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
|
github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
|
||||||
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
|
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
|
||||||
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
|
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
|
||||||
github.com/mozillazg/go-pinyin v0.20.0 h1:BtR3DsxpApHfKReaPO1fCqF4pThRwH9uwvXzm+GnMFQ=
|
github.com/mozillazg/go-pinyin v0.20.0 h1:BtR3DsxpApHfKReaPO1fCqF4pThRwH9uwvXzm+GnMFQ=
|
||||||
github.com/mozillazg/go-pinyin v0.20.0/go.mod h1:iR4EnMMRXkfpFVV5FMi4FNB6wGq9NV6uDWbUuPhP4Yc=
|
github.com/mozillazg/go-pinyin v0.20.0/go.mod h1:iR4EnMMRXkfpFVV5FMi4FNB6wGq9NV6uDWbUuPhP4Yc=
|
||||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
|
||||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU=
|
||||||
|
github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
|
||||||
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
|
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
|
||||||
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
|
||||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
|
||||||
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
|
|
||||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
|
||||||
github.com/tidwall/gjson v1.14.1 h1:iymTbGkQBhveq21bEvAQ81I0LEBork8BFe1CUZXdyuo=
|
|
||||||
github.com/tidwall/gjson v1.14.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
|
||||||
github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U=
|
github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U=
|
||||||
github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
||||||
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
|
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
|
||||||
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
|
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
|
||||||
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
|
|
||||||
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
||||||
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
|
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
|
||||||
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
180
memory.go
Normal file
180
memory.go
Normal file
@ -0,0 +1,180 @@
|
|||||||
|
// Package event ...
|
||||||
|
//
|
||||||
|
// Description : event ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 2024-07-17 11:21
|
||||||
|
package event
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"git.zhangdeman.cn/zhangdeman/consts"
|
||||||
|
"git.zhangdeman.cn/zhangdeman/event/abstract"
|
||||||
|
"git.zhangdeman.cn/zhangdeman/event/define"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
MemoryEventClient abstract.IEvent
|
||||||
|
)
|
||||||
|
|
||||||
|
// InitMemoryEvent 初始化事件实例
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 11:24 2024/7/17
|
||||||
|
func InitMemoryEvent(cfg *define.MemoryEventConfig) {
|
||||||
|
instance := &MemoryEvent{
|
||||||
|
base: &base{
|
||||||
|
panicCallback: define.DefaultPanicCallback,
|
||||||
|
parseFailCallback: define.DefaultParseFailCallbackFunc,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
instance.Init(cfg)
|
||||||
|
MemoryEventClient = instance
|
||||||
|
}
|
||||||
|
|
||||||
|
// MemoryEvent 基于内存实现的消息队列(仅适用于单机)
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 11:22 2024/7/17
|
||||||
|
type MemoryEvent struct {
|
||||||
|
*base
|
||||||
|
cfg *define.MemoryEventConfig
|
||||||
|
messageChannel chan *define.EventData
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendEvent 发送事件
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 12:28 2024/7/17
|
||||||
|
func (m *MemoryEvent) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) {
|
||||||
|
if nil == eventData {
|
||||||
|
return nil, errors.New("event data is nil")
|
||||||
|
}
|
||||||
|
m.messageChannel <- eventData
|
||||||
|
return &define.SendResult{
|
||||||
|
Data: eventData,
|
||||||
|
Partition: 0,
|
||||||
|
Topic: "",
|
||||||
|
IsSuccess: true,
|
||||||
|
FailReason: "success",
|
||||||
|
Extension: nil,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendEventAsync 异步发送事件
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 12:29 2024/7/17
|
||||||
|
func (m *MemoryEvent) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) {
|
||||||
|
if nil == sendFailCallback {
|
||||||
|
sendFailCallback = define.DefaultSendFailCallback
|
||||||
|
}
|
||||||
|
if nil == sendSuccessCallback {
|
||||||
|
sendSuccessCallback = define.DefaultSendSuccessCallback
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
var (
|
||||||
|
res *define.SendResult
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); nil != r {
|
||||||
|
m.panicCallback(err, eventData, map[string]any{
|
||||||
|
"send_result": res,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
res, err = m.SendEvent(ctx, eventData)
|
||||||
|
|
||||||
|
if nil != err {
|
||||||
|
sendFailCallback(ctx, eventData, res, err)
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
sendSuccessCallback(ctx, res)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetConsumeEventChan 消费的消息chan
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 12:41 2024/7/17
|
||||||
|
func (m *MemoryEvent) GetConsumeEventChan() (<-chan *define.EventData, error) {
|
||||||
|
return m.messageChannel, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConsumeEvent 消费事件
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 14:14 2024/7/17
|
||||||
|
func (m *MemoryEvent) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error {
|
||||||
|
messageChan, _ := m.GetConsumeEventChan()
|
||||||
|
if nil == successCallback {
|
||||||
|
successCallback = define.DefaultSuccessCallbackHandler
|
||||||
|
}
|
||||||
|
if nil == failureCallback {
|
||||||
|
failureCallback = define.DefaultFailCallbackHandler
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); nil != r {
|
||||||
|
fmt.Println("出现异常", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
for eventData := range messageChan {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
handleResult map[string]any
|
||||||
|
)
|
||||||
|
if handleResult, err = handler(eventData); nil != err {
|
||||||
|
failureCallback(eventData, handleResult, err)
|
||||||
|
} else {
|
||||||
|
successCallback(eventData, handleResult)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Destroy 实例销毁
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 14:17 2024/7/17
|
||||||
|
func (m *MemoryEvent) Destroy() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MemoryEvent) DriverType() string {
|
||||||
|
return consts.EventDriverMemory
|
||||||
|
}
|
||||||
|
|
||||||
|
// Init 初始化
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 12:08 2024/7/17
|
||||||
|
func (m *MemoryEvent) Init(cfg *define.MemoryEventConfig) {
|
||||||
|
if nil == cfg {
|
||||||
|
cfg = &define.MemoryEventConfig{}
|
||||||
|
}
|
||||||
|
if cfg.MessageBufferSize <= 0 {
|
||||||
|
cfg.MessageBufferSize = define.DefaultMemoryChannelSize
|
||||||
|
}
|
||||||
|
if cfg.CloseMaxWaitTime <= 0 {
|
||||||
|
cfg.CloseMaxWaitTime = define.DefaultMemoryCloseMaxWaitTime
|
||||||
|
}
|
||||||
|
m.cfg = cfg
|
||||||
|
// 初始化内存 channel
|
||||||
|
m.messageChannel = make(chan *define.EventData, cfg.MessageBufferSize)
|
||||||
|
}
|
50
memory_test.go
Normal file
50
memory_test.go
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
// Package event ...
|
||||||
|
//
|
||||||
|
// Description : event ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 2024-07-17 14:17
|
||||||
|
package event
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"git.zhangdeman.cn/zhangdeman/event/define"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestInitMemoryEvent(t *testing.T) {
|
||||||
|
InitMemoryEvent(&define.MemoryEventConfig{
|
||||||
|
MessageBufferSize: 1024,
|
||||||
|
CloseMaxWaitTime: 5000,
|
||||||
|
})
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
MemoryEventClient.SendEventAsync(context.Background(), &define.EventData{
|
||||||
|
EventType: "TEST",
|
||||||
|
TraceID: time.Now().Format("2006-01-02 15:04:05"),
|
||||||
|
Host: "",
|
||||||
|
Timestamp: time.Now().Unix(),
|
||||||
|
SystemTimestamp: 0,
|
||||||
|
Key: "",
|
||||||
|
Data: nil,
|
||||||
|
}, func(ctx context.Context, eventResult *define.SendResult) {
|
||||||
|
fmt.Println("消息发送成功")
|
||||||
|
}, func(ctx context.Context, eventData *define.EventData, eventResult *define.SendResult, err error) {
|
||||||
|
fmt.Println("消息发送失败")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
MemoryEventClient.ConsumeEvent(func(eventData *define.EventData) (map[string]any, error) {
|
||||||
|
byteData, _ := json.Marshal(eventData)
|
||||||
|
fmt.Println(string(byteData))
|
||||||
|
return map[string]any{}, nil
|
||||||
|
}, nil, nil)
|
||||||
|
for {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
258
redis_pub_sub.go
Normal file
258
redis_pub_sub.go
Normal file
@ -0,0 +1,258 @@
|
|||||||
|
// Package event ...
|
||||||
|
//
|
||||||
|
// Description : event ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 2024-06-25 16:06
|
||||||
|
package event
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"git.zhangdeman.cn/zhangdeman/consts"
|
||||||
|
"git.zhangdeman.cn/zhangdeman/event/abstract"
|
||||||
|
"git.zhangdeman.cn/zhangdeman/event/define"
|
||||||
|
"git.zhangdeman.cn/zhangdeman/serialize"
|
||||||
|
"git.zhangdeman.cn/zhangdeman/wrapper"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
RedisEventPubSubClient abstract.IEvent
|
||||||
|
)
|
||||||
|
|
||||||
|
// InitRedisPubSubEvent 初始化redis事件驱动, 基于 pub / sub 指令
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 16:24 2024/6/25
|
||||||
|
func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) {
|
||||||
|
instance := &RedisEventPubSub{
|
||||||
|
base: &base{
|
||||||
|
panicCallback: define.DefaultPanicCallback,
|
||||||
|
parseFailCallback: define.DefaultParseFailCallbackFunc,
|
||||||
|
},
|
||||||
|
redisClient: redisClient,
|
||||||
|
pubSubConfig: pubSubConfig,
|
||||||
|
}
|
||||||
|
instance.SetRedisClient(redisClient, pubSubConfig)
|
||||||
|
instance.StartConsumer() // 启动消费者
|
||||||
|
RedisEventPubSubClient = instance
|
||||||
|
}
|
||||||
|
|
||||||
|
// RedisEventPubSub 基于redis的事件驱动
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 16:07 2024/6/25
|
||||||
|
type RedisEventPubSub struct {
|
||||||
|
*base
|
||||||
|
redisClient *redis.Client // redis客户端
|
||||||
|
pubSubConfig *define.RedisEventPubSubConfig // 事件配置
|
||||||
|
messageChan chan *define.EventData // 消息队列
|
||||||
|
stopConsumer chan bool // 停止消费者
|
||||||
|
isStop bool // 是否已停止
|
||||||
|
parseFailCallback abstract.EventParseFailCallback // 数据解析失败回调
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendEvent 发布时间
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 10:58 2024/6/27
|
||||||
|
func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) {
|
||||||
|
partition := int(wrapper.String(eventData.Key).HashNumber().Value % uint64(r.pubSubConfig.PartitionNum))
|
||||||
|
realTopic := fmt.Sprintf("%v_%v", r.pubSubConfig.Topic, partition)
|
||||||
|
res := r.redisClient.Publish(ctx, realTopic, eventData)
|
||||||
|
if nil == res {
|
||||||
|
return nil, errors.New("can not get send event result")
|
||||||
|
}
|
||||||
|
return &define.SendResult{
|
||||||
|
Data: eventData,
|
||||||
|
Partition: partition,
|
||||||
|
Topic: realTopic,
|
||||||
|
IsSuccess: res.Err() == nil,
|
||||||
|
FailReason: wrapper.TernaryOperator.String(res.Err() == nil, "success", wrapper.String(res.Err().Error())).Value(),
|
||||||
|
Extension: nil,
|
||||||
|
}, res.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendEventAsync 异步发送事件
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 11:29 2024/6/27
|
||||||
|
func (r *RedisEventPubSub) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) {
|
||||||
|
go func() {
|
||||||
|
if nil == sendFailCallback {
|
||||||
|
sendFailCallback = define.DefaultSendFailCallback
|
||||||
|
}
|
||||||
|
if nil == sendSuccessCallback {
|
||||||
|
sendSuccessCallback = define.DefaultSendSuccessCallback
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
sendResult *define.SendResult
|
||||||
|
handlerErr error
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
if panicErr := recover(); nil != panicErr {
|
||||||
|
r.panicCallback(panicErr, eventData, map[string]any{
|
||||||
|
"send_result": sendResult,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
if sendResult, handlerErr = r.SendEvent(ctx, eventData); nil != handlerErr {
|
||||||
|
sendFailCallback(ctx, eventData, sendResult, handlerErr)
|
||||||
|
} else {
|
||||||
|
sendSuccessCallback(ctx, sendResult)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetConsumeEventChan 获取消息channel, 自行实现消费
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 17:13 2024/6/26
|
||||||
|
func (r *RedisEventPubSub) GetConsumeEventChan() (<-chan *define.EventData, error) {
|
||||||
|
if r.isStop {
|
||||||
|
return nil, errors.New("event instance has stop")
|
||||||
|
}
|
||||||
|
return r.messageChan, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConsumeEvent 获取数据消费实例
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 17:06 2024/6/26
|
||||||
|
func (r *RedisEventPubSub) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error {
|
||||||
|
if nil == handler {
|
||||||
|
return errors.New("handler is nil")
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
messageChan <-chan *define.EventData
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
if messageChan, err = r.GetConsumeEventChan(); nil != err {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if nil == failureCallback {
|
||||||
|
failureCallback = define.DefaultFailCallbackHandler
|
||||||
|
}
|
||||||
|
if nil == successCallback {
|
||||||
|
successCallback = define.DefaultSuccessCallbackHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
for eventData := range messageChan {
|
||||||
|
handlerResult, handleErr := handler(eventData)
|
||||||
|
if nil != handleErr {
|
||||||
|
failureCallback(eventData, handlerResult, err)
|
||||||
|
} else {
|
||||||
|
successCallback(eventData, handlerResult)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartConsumer 启动消费者
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 18:36 2024/6/27
|
||||||
|
func (r *RedisEventPubSub) StartConsumer() {
|
||||||
|
for partition := 0; partition < r.pubSubConfig.PartitionNum; partition++ {
|
||||||
|
go func(realPartition int) {
|
||||||
|
defer func() {
|
||||||
|
if panicErr := recover(); nil != panicErr {
|
||||||
|
r.panicCallback(panicErr, nil, nil)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
// 启动消费者
|
||||||
|
realTopic := fmt.Sprintf("%v_%v", r.pubSubConfig.Topic, realPartition)
|
||||||
|
messageChannel := r.redisClient.Subscribe(context.Background(), realTopic).Channel()
|
||||||
|
for message := range messageChannel {
|
||||||
|
var (
|
||||||
|
eventData define.EventData
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
if err = serialize.JSON.UnmarshalWithNumber([]byte(message.Payload), &eventData); nil != err {
|
||||||
|
// TODO : 事件数据解析失败的回调
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
r.messageChan <- &eventData
|
||||||
|
}
|
||||||
|
}(partition)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Destroy 销毁事件实例
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 15:42 2024/6/26
|
||||||
|
func (r *RedisEventPubSub) Destroy() {
|
||||||
|
if r.isStop {
|
||||||
|
// 已停止
|
||||||
|
return
|
||||||
|
}
|
||||||
|
r.stopConsumer <- true // 停止消费者
|
||||||
|
messageChan := make(chan bool, 1)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
if len(r.messageChan) == 0 {
|
||||||
|
messageChan <- true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Millisecond * time.Duration(r.pubSubConfig.CloseMaxWaitTime)):
|
||||||
|
// 定时器到期
|
||||||
|
return
|
||||||
|
case <-messageChan:
|
||||||
|
// 没有待消费数据
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RedisEventPubSub) DriverType() string {
|
||||||
|
return consts.EventDriverRedis
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetRedisClient 设置redis客户端
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 16:30 2024/6/25
|
||||||
|
func (r *RedisEventPubSub) SetRedisClient(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) {
|
||||||
|
if nil == pubSubConfig {
|
||||||
|
pubSubConfig = &define.RedisEventPubSubConfig{
|
||||||
|
Topic: "",
|
||||||
|
PartitionNum: 0,
|
||||||
|
MessageBufferSize: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(pubSubConfig.Topic) == 0 {
|
||||||
|
pubSubConfig.Topic = define.DefaultRedisTopic
|
||||||
|
}
|
||||||
|
if pubSubConfig.PartitionNum <= 0 {
|
||||||
|
pubSubConfig.PartitionNum = define.DefaultRedisPartitionNum
|
||||||
|
}
|
||||||
|
if pubSubConfig.MessageBufferSize <= 0 {
|
||||||
|
r.pubSubConfig.MessageBufferSize = define.DefaultRedisMessageBufferSize
|
||||||
|
}
|
||||||
|
if pubSubConfig.CloseMaxWaitTime <= 0 {
|
||||||
|
r.pubSubConfig.CloseMaxWaitTime = define.DefaultRedisCloseMaxWaitTime
|
||||||
|
}
|
||||||
|
r.redisClient = redisClient
|
||||||
|
r.pubSubConfig = pubSubConfig
|
||||||
|
r.stopConsumer = make(chan bool, 1)
|
||||||
|
r.messageChan = make(chan *define.EventData, r.pubSubConfig.MessageBufferSize)
|
||||||
|
}
|
19
redis_pub_sub_test.go
Normal file
19
redis_pub_sub_test.go
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
// Package event ...
|
||||||
|
//
|
||||||
|
// Description : event ...
|
||||||
|
//
|
||||||
|
// Author : go_developer@163.com<白茶清欢>
|
||||||
|
//
|
||||||
|
// Date : 2024-06-25 16:51
|
||||||
|
package event
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"git.zhangdeman.cn/zhangdeman/wrapper"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestInitRedisPubSubEvent(t *testing.T) {
|
||||||
|
fmt.Println(strings.ToUpper(wrapper.StringFromRandom(128, "").Md5().Value))
|
||||||
|
}
|
440
reflect.go
440
reflect.go
@ -1,440 +0,0 @@
|
|||||||
// Package event ...
|
|
||||||
//
|
|
||||||
// Description : event ...
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 2023-02-01 14:30
|
|
||||||
package event
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"git.zhangdeman.cn/zhangdeman/event/abstract"
|
|
||||||
"git.zhangdeman.cn/zhangdeman/util"
|
|
||||||
"github.com/tidwall/gjson"
|
|
||||||
"net/http"
|
|
||||||
"reflect"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
// ReflectTypeInstance 反射实例
|
|
||||||
ReflectTypeInstance *ReflectType
|
|
||||||
ReflectValueInstance *ReflectValue
|
|
||||||
)
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
ReflectTypeInstance = &ReflectType{
|
|
||||||
lock: &sync.RWMutex{},
|
|
||||||
cacheTable: make(map[string]*StructInfo),
|
|
||||||
}
|
|
||||||
ReflectValueInstance = &ReflectValue{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReflectType 反射数据类型
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 14:31 2023/2/1
|
|
||||||
type ReflectType struct {
|
|
||||||
// 数据锁
|
|
||||||
lock *sync.RWMutex
|
|
||||||
// 反射结果缓存
|
|
||||||
cacheTable map[string]*StructInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
// Do 反射获取数据类型
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 14:34 2023/2/1
|
|
||||||
//
|
|
||||||
// 为特定结构体生成全局唯一的标识, 并进行缓存, 加速反射结果获取
|
|
||||||
func (rt *ReflectType) Do(dataFlag string, data interface{}) *StructInfo {
|
|
||||||
rt.lock.Lock()
|
|
||||||
defer rt.lock.Unlock()
|
|
||||||
if cacheResult, exist := rt.cacheTable[dataFlag]; exist {
|
|
||||||
// 缓存存在, 直接是有缓存结果
|
|
||||||
return cacheResult
|
|
||||||
}
|
|
||||||
// 缓存不存在, 解析
|
|
||||||
res := &StructInfo{
|
|
||||||
Flag: dataFlag,
|
|
||||||
IsStruct: false,
|
|
||||||
IsStructPtr: false,
|
|
||||||
StructFieldList: make([]*StructField, 0),
|
|
||||||
}
|
|
||||||
isPtr := false
|
|
||||||
reflectType := reflect.TypeOf(data)
|
|
||||||
if reflectType.Kind() == reflect.Ptr {
|
|
||||||
isPtr = true
|
|
||||||
reflectType = reflectType.Elem()
|
|
||||||
}
|
|
||||||
if reflectType.Kind() != reflect.Struct {
|
|
||||||
// 非结构体,无需反射
|
|
||||||
rt.cacheTable[dataFlag] = res
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
res.IsStruct = true
|
|
||||||
res.IsStructPtr = isPtr
|
|
||||||
for idx := 0; idx < reflectType.NumField(); idx++ {
|
|
||||||
field := &StructField{
|
|
||||||
Idx: idx,
|
|
||||||
Name: reflectType.Field(idx).Name,
|
|
||||||
JsonTag: reflectType.Field(idx).Tag.Get(JsonTag),
|
|
||||||
EventTag: reflectType.Field(idx).Tag.Get(OutEventTag),
|
|
||||||
MappingRuleList: make([]MappingRuleItem, 0),
|
|
||||||
}
|
|
||||||
rt.fillFieldType(field, reflectType.Field(idx).Type)
|
|
||||||
rt.fillMappingRule(field, reflectType.Field(idx).Tag.Get(MappingTag))
|
|
||||||
res.StructFieldList = append(res.StructFieldList, field)
|
|
||||||
}
|
|
||||||
rt.cacheTable[dataFlag] = res
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
|
|
||||||
// fillFieldType 填充字段类型
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 20:46 2023/2/1
|
|
||||||
func (rt *ReflectType) fillFieldType(field *StructField, dataType reflect.Type) {
|
|
||||||
switch dataType.Kind() {
|
|
||||||
case reflect.Float32:
|
|
||||||
fallthrough
|
|
||||||
case reflect.Float64:
|
|
||||||
field.Type = reflect.Float64
|
|
||||||
case reflect.String:
|
|
||||||
field.Type = reflect.String
|
|
||||||
case reflect.Struct:
|
|
||||||
field.Type = reflect.Struct
|
|
||||||
case reflect.Slice:
|
|
||||||
field.Type = reflect.Slice
|
|
||||||
case reflect.Map:
|
|
||||||
field.Type = reflect.Map
|
|
||||||
case reflect.Ptr:
|
|
||||||
field.IsPtr = true
|
|
||||||
// 指针再次判断基础类型
|
|
||||||
field.Type = dataType.Elem().Kind()
|
|
||||||
default:
|
|
||||||
if strings.Contains(dataType.String(), "int") {
|
|
||||||
field.Type = reflect.Int64
|
|
||||||
} else {
|
|
||||||
field.Type = reflect.Interface
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// fillTagInfo 填充标签信息
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 17:11 2023/2/1
|
|
||||||
func (rt *ReflectType) fillTagInfo(field *StructField) {
|
|
||||||
if len(field.JsonTag) == 0 {
|
|
||||||
field.JsonTag = field.Name
|
|
||||||
}
|
|
||||||
|
|
||||||
// jsonTag 去掉 omitempty
|
|
||||||
jsonTagValArr := strings.Split(field.JsonTag, ",")
|
|
||||||
for _, item := range jsonTagValArr {
|
|
||||||
if len(item) > 0 && item != OmitemptyTag {
|
|
||||||
field.JsonTag = item
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 没有设置event tag,则和 json tag保持一致
|
|
||||||
if len(field.EventTag) == 0 {
|
|
||||||
field.EventTag = field.JsonTag
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// fillMappingRule 解析参数映射规则
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 17:14 2023/2/1
|
|
||||||
//
|
|
||||||
// mapping:"user_id:param#user_id|header#id"
|
|
||||||
func (rt *ReflectType) fillMappingRule(field *StructField, inputMappingVal string) {
|
|
||||||
|
|
||||||
if len(inputMappingVal) == 0 {
|
|
||||||
// 没有指定规则, 有默认规则
|
|
||||||
for _, location := range mappingLocationList {
|
|
||||||
field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{
|
|
||||||
Location: location,
|
|
||||||
Field: field.JsonTag,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
mappingArr := strings.Split(inputMappingVal, ",")
|
|
||||||
for _, item := range mappingArr {
|
|
||||||
item = strings.TrimSpace(item)
|
|
||||||
// 要赋值的字段名
|
|
||||||
itemArr := strings.Split(item, ":")
|
|
||||||
if len(itemArr) != 2 {
|
|
||||||
// 配置格式错误, 跳过
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
mapRuleArr := strings.Split(strings.TrimSpace(itemArr[1]), "|")
|
|
||||||
for _, itemMapRule := range mapRuleArr {
|
|
||||||
itemMapRule = strings.TrimLeft(itemMapRule, "#")
|
|
||||||
itemMapRuleArr := strings.Split(itemMapRule, "#")
|
|
||||||
// 注意 : # 为特殊分隔符, 如配置成 mapping:"project_id:#source_project_id#xxx_project_id" 实际等价于 mapping:"project_id:#source_project_id" 多余配置自动跳过
|
|
||||||
if len(itemMapRuleArr[0]) < 2 {
|
|
||||||
// 没有指定位置,默认all, 即配置格式: mapping:"project_id:#source_project_id"
|
|
||||||
itemMapRuleArr[0] = MappingLocationAll
|
|
||||||
itemMapRuleArr = []string{MappingLocationAll, itemMapRuleArr[0]}
|
|
||||||
}
|
|
||||||
|
|
||||||
switch itemMapRuleArr[0] {
|
|
||||||
// 从header读取
|
|
||||||
case MappingLocationHeader:
|
|
||||||
field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{
|
|
||||||
Location: MappingLocationHeader,
|
|
||||||
Field: itemMapRuleArr[1],
|
|
||||||
})
|
|
||||||
// 从请求参数读取
|
|
||||||
case MappingLocationParam:
|
|
||||||
field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{
|
|
||||||
Location: MappingLocationParam,
|
|
||||||
Field: itemMapRuleArr[1],
|
|
||||||
})
|
|
||||||
// 从响应数据读取
|
|
||||||
case MappingLocationResponse:
|
|
||||||
field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{
|
|
||||||
Location: MappingLocationResponse,
|
|
||||||
Field: itemMapRuleArr[1],
|
|
||||||
})
|
|
||||||
// 从扩展数据读取
|
|
||||||
case MappingLocationExtension:
|
|
||||||
field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{
|
|
||||||
Location: MappingLocationExtension,
|
|
||||||
Field: itemMapRuleArr[1],
|
|
||||||
})
|
|
||||||
// 全部读取一遍
|
|
||||||
case MappingLocationAll:
|
|
||||||
fallthrough
|
|
||||||
default:
|
|
||||||
for _, itemLocation := range mappingLocationList {
|
|
||||||
field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{
|
|
||||||
Location: itemLocation,
|
|
||||||
Field: itemMapRuleArr[1],
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReflectValue 反射值的实例
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 10:53 2023/2/2
|
|
||||||
type ReflectValue struct {
|
|
||||||
}
|
|
||||||
|
|
||||||
// Do 通过反射机制,对data进行数据填充,此逻辑要求 data 必须是结构体或者结构体指针
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 11:52 2023/2/2
|
|
||||||
func (rv *ReflectValue) Do(dataFlag string, data interface{}, preSendHandler abstract.IPreSendHandler) {
|
|
||||||
structInfo := ReflectTypeInstance.Do(dataFlag, data)
|
|
||||||
if !structInfo.IsStruct {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
reflectValue := reflect.ValueOf(data)
|
|
||||||
if structInfo.IsStructPtr {
|
|
||||||
reflectValue = reflectValue.Elem()
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, fieldInfo := range structInfo.StructFieldList {
|
|
||||||
if !rv.isZeroInputFieldValue(reflectValue, fieldInfo) {
|
|
||||||
// 不是零值, 无需处理
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// 是零值, 填充默认值
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// isZeroInputFieldValue 判断对应的字段是否为对应类型默认的零值
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 12:04 2023/2/2
|
|
||||||
func (rv *ReflectValue) isZeroInputFieldValue(reflectValue reflect.Value, fieldInfo *StructField) bool {
|
|
||||||
inputVal := reflectValue.Field(fieldInfo.Idx).Interface()
|
|
||||||
switch fieldInfo.Type {
|
|
||||||
case reflect.Float64:
|
|
||||||
var f float64
|
|
||||||
if err := util.ConvertAssign(&f, inputVal); nil == err {
|
|
||||||
if f != 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
case reflect.String:
|
|
||||||
var s string
|
|
||||||
if err := util.ConvertAssign(&s, inputVal); nil == err {
|
|
||||||
if len(s) > 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
case reflect.Int64:
|
|
||||||
var i int64
|
|
||||||
if err := util.ConvertAssign(&i, inputVal); nil == err {
|
|
||||||
if i != 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
case reflect.Map:
|
|
||||||
if nil == inputVal {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
var m map[interface{}]interface{}
|
|
||||||
if err := util.ConvertAssign(&m, inputVal); nil == err {
|
|
||||||
if len(m) != 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
case reflect.Slice:
|
|
||||||
if nil == inputVal {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
var sl []interface{}
|
|
||||||
if err := util.ConvertAssign(&sl, inputVal); nil == err {
|
|
||||||
if len(sl) != 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
case reflect.Interface:
|
|
||||||
if inputVal == nil {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
default:
|
|
||||||
// 默认不处理
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// fillFieldValue 填充字段值
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 14:00 2023/2/2
|
|
||||||
func (rv *ReflectValue) fillFieldValue(reflectValue reflect.Value, fieldInfo *StructField, preSendHandler abstract.IPreSendHandler) {
|
|
||||||
paramByte, _ := json.Marshal(preSendHandler.GetRequestParam())
|
|
||||||
header := preSendHandler.GetRequestHeader()
|
|
||||||
responseByte, _ := json.Marshal(preSendHandler.GetResponseData())
|
|
||||||
extensionByte, _ := json.Marshal(preSendHandler.GetExtensionData())
|
|
||||||
for _, item := range fieldInfo.MappingRuleList {
|
|
||||||
if rv.setDataValue(reflectValue, fieldInfo, item, header, paramByte, responseByte, extensionByte) {
|
|
||||||
// 找到数据,并且赋值成功,结束继续查找
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// getInputValue 获取输入的值
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 14:24 2023/2/2
|
|
||||||
func (rv *ReflectValue) setDataValue(reflectVal reflect.Value, fieldInfo *StructField, rule MappingRuleItem, header http.Header, paramByte, responseByte, extensionByte []byte) bool {
|
|
||||||
inputVal := rv.getInputValue(rule, header, paramByte, responseByte, extensionByte)
|
|
||||||
if len(inputVal) == 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
switch fieldInfo.Type {
|
|
||||||
case reflect.String:
|
|
||||||
reflectVal.Field(fieldInfo.Idx).SetString(inputVal)
|
|
||||||
return true
|
|
||||||
case reflect.Int64:
|
|
||||||
var i int64
|
|
||||||
if err := util.ConvertAssign(&i, inputVal); nil != err {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if i == 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if fieldInfo.IsPtr {
|
|
||||||
reflectVal.Field(fieldInfo.Idx).Set(reflect.ValueOf(&i))
|
|
||||||
} else {
|
|
||||||
reflectVal.Field(fieldInfo.Idx).SetInt(i)
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
case reflect.Float64:
|
|
||||||
var f float64
|
|
||||||
if err := util.ConvertAssign(&f, inputVal); nil != err {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if f == 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if fieldInfo.IsPtr {
|
|
||||||
reflectVal.Field(fieldInfo.Idx).Set(reflect.ValueOf(&f))
|
|
||||||
} else {
|
|
||||||
reflectVal.Field(fieldInfo.Idx).SetFloat(f)
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
case reflect.Interface:
|
|
||||||
reflectVal.Field(fieldInfo.Idx).Set(reflect.ValueOf(&inputVal))
|
|
||||||
return true
|
|
||||||
case reflect.Slice:
|
|
||||||
fallthrough
|
|
||||||
case reflect.Map:
|
|
||||||
fallthrough
|
|
||||||
case reflect.Struct:
|
|
||||||
var v interface{}
|
|
||||||
if err := json.Unmarshal([]byte(inputVal), &v); nil != err {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
reflectVal.Field(fieldInfo.Idx).Set(reflect.ValueOf(&v))
|
|
||||||
return true
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// getInputValue 获取输入的值
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 14:43 2023/2/2
|
|
||||||
func (rv *ReflectValue) getInputValue(rule MappingRuleItem, header http.Header, paramByte, responseByte, extensionByte []byte) string {
|
|
||||||
switch rule.Location {
|
|
||||||
case MappingLocationHeader:
|
|
||||||
return header.Get(rule.Field)
|
|
||||||
case MappingLocationParam:
|
|
||||||
return gjson.GetBytes(paramByte, rule.Field).String()
|
|
||||||
case MappingLocationResponse:
|
|
||||||
return gjson.GetBytes(responseByte, rule.Field).String()
|
|
||||||
case MappingLocationExtension:
|
|
||||||
return gjson.GetBytes(extensionByte, rule.Field).String()
|
|
||||||
case MappingLocationAll:
|
|
||||||
str := header.Get(rule.Field)
|
|
||||||
if len(str) == 0 {
|
|
||||||
str = gjson.GetBytes(paramByte, rule.Field).String()
|
|
||||||
}
|
|
||||||
if len(str) == 0 {
|
|
||||||
str = gjson.GetBytes(responseByte, rule.Field).String()
|
|
||||||
}
|
|
||||||
if len(str) == 0 {
|
|
||||||
str = gjson.GetBytes(extensionByte, rule.Field).String()
|
|
||||||
}
|
|
||||||
return str
|
|
||||||
default:
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
}
|
|
57
send.go
57
send.go
@ -1,57 +0,0 @@
|
|||||||
// Package event ...
|
|
||||||
//
|
|
||||||
// Description : event ...
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 2023-02-02 15:15
|
|
||||||
package event
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"git.zhangdeman.cn/zhangdeman/event/abstract"
|
|
||||||
)
|
|
||||||
|
|
||||||
// SendEvent 发送事件
|
|
||||||
//
|
|
||||||
// Author : go_developer@163.com<白茶清欢>
|
|
||||||
//
|
|
||||||
// Date : 15:16 2023/2/2
|
|
||||||
func SendEvent(dataFlag string, data interface{}, preSendHandler abstract.IPreSendHandler, sendEventHandler abstract.ISendEventHandler) SendResult {
|
|
||||||
if nil == preSendHandler {
|
|
||||||
preSendHandler = &DefaultPreSendHandler{}
|
|
||||||
}
|
|
||||||
|
|
||||||
if nil == sendEventHandler {
|
|
||||||
sendEventHandler = &DefaultSendEventHandler{}
|
|
||||||
}
|
|
||||||
res := SendResult{}
|
|
||||||
|
|
||||||
if res.NoSendDetail, res.IsNeedSend = preSendHandler.NeedSend(); !res.IsNeedSend {
|
|
||||||
sendEventHandler.NoSendCallback(data, res)
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
// 通过反射填充数据
|
|
||||||
ReflectValueInstance.Do(dataFlag, data, preSendHandler)
|
|
||||||
res.Data = data
|
|
||||||
var (
|
|
||||||
byteData []byte
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
|
|
||||||
if byteData, err = json.Marshal(data); nil != err {
|
|
||||||
res.IsSuccess = false
|
|
||||||
res.FailReason = err.Error()
|
|
||||||
sendEventHandler.FailCallback(map[string]interface{}{
|
|
||||||
"err": err.Error(),
|
|
||||||
"reason": "data marshal fail",
|
|
||||||
}, err)
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
if res.SendResult, err = sendEventHandler.Send(byteData); nil != err {
|
|
||||||
sendEventHandler.FailCallback(res.SendResult, err)
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
sendEventHandler.SuccessCallback(res.SendResult)
|
|
||||||
return res
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user