From 45d4019f4da7f1d574c5b8a9cf35712d5e64d5f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Mon, 11 Mar 2024 11:40:05 +0800 Subject: [PATCH 01/17] code clean --- abstract/pre_send_event_handler.go | 62 ---- abstract/send_event_handler.go | 47 --- default_pre_send_handler.go | 42 --- default_send_event_handler.go | 27 -- define.go | 109 ------- go.mod | 20 -- go.sum | 45 --- reflect.go | 440 ----------------------------- send.go | 57 ---- 9 files changed, 849 deletions(-) delete mode 100644 abstract/pre_send_event_handler.go delete mode 100644 abstract/send_event_handler.go delete mode 100644 default_pre_send_handler.go delete mode 100644 default_send_event_handler.go delete mode 100644 define.go delete mode 100644 reflect.go delete mode 100644 send.go diff --git a/abstract/pre_send_event_handler.go b/abstract/pre_send_event_handler.go deleted file mode 100644 index 9883a4d..0000000 --- a/abstract/pre_send_event_handler.go +++ /dev/null @@ -1,62 +0,0 @@ -// Package abstract ... -// -// Description : 事件发送前预处理接口约束 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 2023-02-01 14:06 -package abstract - -import ( - "net/http" -) - -// IPreSendHandler ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 14:06 2023/2/1 -type IPreSendHandler interface { - // GetRequestParam 构造 base info 时, 可能需要从请求参数中提取公共数据 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 14:13 2023/2/1 - GetRequestParam() map[string]interface{} - - // GetRequestHeader ... - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 14:14 2023/2/1 - // 构造 base info 时, 可能需要从请求参数中提取公共数据 - GetRequestHeader() http.Header - - // GetResponseData 响应数据 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 14:15 2023/2/1 - GetResponseData() map[string]interface{} - - // GetExtensionData 获取扩展数据 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 14:19 2023/2/1 - GetExtensionData() map[string]interface{} - - // GetEventData 获取事件数据, 建议返回结构体或者结构体指针, 内部会自动补齐缺失的数据 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 14:16 2023/2/1 - GetEventData() interface{} - - // NeedSend 判断是否需要发送事件, 若不需要发送, 可在第一个返回值中记录不需要发送的原因 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 15:24 2023/2/2 - NeedSend() (map[string]interface{}, bool) -} diff --git a/abstract/send_event_handler.go b/abstract/send_event_handler.go deleted file mode 100644 index 5b9de5b..0000000 --- a/abstract/send_event_handler.go +++ /dev/null @@ -1,47 +0,0 @@ -// Package abstract ... -// -// Description : abstract ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 2023-02-01 14:21 -package abstract - -import "git.zhangdeman.cn/zhangdeman/event" - -// ISendEventHandler 发送事件处理器 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 14:21 2023/2/1 -type ISendEventHandler interface { - // Send ... - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 14:21 2023/2/1 - // 事件发送成功之后, 可以返回一些业务数据, 这些业务数据会回调给SuccessCallback - // 事件发送成功之后, 可以返回一些业务数据 以及 err, 这些业务数据会回调给FailCallback - Send(data []byte) (map[string]interface{}, error) - - // SuccessCallback 事件发送成功的回调 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 14:21 2023/2/1 - SuccessCallback(data map[string]interface{}) - - // FailCallback 事件发送失败的回调 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 14:22 2023/2/1 - FailCallback(data map[string]interface{}, err error) - - // NoSendCallback 不需要发送事件的回调 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 15:26 2023/2/2 - NoSendCallback(data interface{}, res event.SendResult) -} diff --git a/default_pre_send_handler.go b/default_pre_send_handler.go deleted file mode 100644 index 11d93b2..0000000 --- a/default_pre_send_handler.go +++ /dev/null @@ -1,42 +0,0 @@ -// Package event ... -// -// Description : event ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 2023-02-02 16:36 -package event - -import "net/http" - -// DefaultPreSendHandler IPreSendHandler 默认实现 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 16:36 2023/2/2 -type DefaultPreSendHandler struct { -} - -func (d *DefaultPreSendHandler) GetRequestParam() map[string]interface{} { - return map[string]interface{}{} -} - -func (d *DefaultPreSendHandler) GetRequestHeader() http.Header { - return make(map[string][]string, 0) -} - -func (d *DefaultPreSendHandler) GetResponseData() map[string]interface{} { - return map[string]interface{}{} -} - -func (d *DefaultPreSendHandler) GetExtensionData() map[string]interface{} { - return map[string]interface{}{} -} - -func (d *DefaultPreSendHandler) GetEventData() interface{} { - return map[string]interface{}{} -} - -func (d *DefaultPreSendHandler) NeedSend() (map[string]interface{}, bool) { - return map[string]interface{}{}, true -} diff --git a/default_send_event_handler.go b/default_send_event_handler.go deleted file mode 100644 index 466644c..0000000 --- a/default_send_event_handler.go +++ /dev/null @@ -1,27 +0,0 @@ -// Package event ... -// -// Description : event ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 2023-02-02 16:39 -package event - -type DefaultSendEventHandler struct { -} - -func (d *DefaultSendEventHandler) Send(data []byte) (map[string]interface{}, error) { - return map[string]interface{}{}, nil -} - -func (d *DefaultSendEventHandler) SuccessCallback(data map[string]interface{}) { - -} - -func (d *DefaultSendEventHandler) FailCallback(data map[string]interface{}, err error) { - -} - -func (d *DefaultSendEventHandler) NoSendCallback(data interface{}, res SendResult) { - -} diff --git a/define.go b/define.go deleted file mode 100644 index 0c083a8..0000000 --- a/define.go +++ /dev/null @@ -1,109 +0,0 @@ -// Package event ... -// -// Description : 各种常量定义 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 2023-02-01 14:23 -package event - -import "reflect" - -const ( - // OutEventTag 事件数据输出key的标签 - OutEventTag = "event" - // JsonTag json输出的标签 - JsonTag = "json" - // IgnoreTagValue 不做输出的标签值 - IgnoreTagValue = "-" - // MappingTag 参数映射标签 - MappingTag = "mapping" - // PriorityTag 处理数据时, mapping数据查找的优先级, 默认值 : field - PriorityTag = "priority" - // OmitemptyTag ... - OmitemptyTag = "omitempty" -) - -const ( - // MappingLocationAll 自动探测所有路径 - MappingLocationAll = "all" - // MappingLocationParam 从参数读取 - MappingLocationParam = "param" - // MappingLocationHeader 从请求header读取 - MappingLocationHeader = "header" - // MappingLocationResponse 从响应数据读取 - MappingLocationResponse = "response" - // MappingLocationExtension 从扩展数据读取 - MappingLocationExtension = "extension" -) - -var ( - mappingLocationList = []string{ - MappingLocationHeader, - MappingLocationParam, - MappingLocationResponse, - MappingLocationExtension, - } -) - -// SetMappingLocationList 只持续该默认的查找优先级 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 18:17 2023/2/1 -func SetMappingLocationList(locationList []string) { - if len(locationList) > 0 { - mappingLocationList = locationList - } -} - -// MappingRuleItem 规则 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 17:36 2023/2/1 -type MappingRuleItem struct { - Location string `json:"location"` // 数据所在位置, header-请求头 param-参数获取 response-响应数据获取 extension-扩展数据读取 all-自动按照header/param/response/extension的顺序查询 - Field string `json:"field"` // 查询的字段名称 -} - -// StructField 结构体字段信息 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 14:35 2023/2/1 -type StructField struct { - Idx int // 字段在结构体的索引 - Name string // 字段名 - Type reflect.Kind // 字段类型 - IsPtr bool // 是否为指针 - JsonTag string // json标签 - EventTag string // 事件标签 - MappingRuleList []MappingRuleItem // 规则列表 -} - -// StructInfo 结构体信息 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 16:17 2023/2/1 -type StructInfo struct { - Flag string // 结构体标识 - IsStruct bool // 是否为结构体 - IsStructPtr bool // 是否为结构体指针 - StructFieldList []*StructField // 结构体字段列表 -} - -// SendResult ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 15:27 2023/2/2 -type SendResult struct { - IsNeedSend bool `json:"is_need_send"` // 是否需要发送 - NoSendDetail map[string]interface{} `json:"no_send_detail"` // 未发送详细原因 - Data interface{} `json:"data"` // 发送的数据 - IsSuccess bool `json:"is_success"` // 是否处理成功 - FailReason string `json:"fail_reason"` // 失败原因 - SendResult map[string]interface{} `json:"send_result"` // 发送结果 -} diff --git a/go.mod b/go.mod index 85f3283..1a3001e 100644 --- a/go.mod +++ b/go.mod @@ -3,23 +3,3 @@ module git.zhangdeman.cn/zhangdeman/event go 1.21 toolchain go1.22.1 - -require ( - git.zhangdeman.cn/zhangdeman/util v0.0.0-20231227095334-7eb5cdbf9253 - github.com/tidwall/gjson v1.17.1 -) - -require ( - github.com/Jeffail/gabs v1.4.0 // indirect - github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/go-ini/ini v1.67.0 // indirect - github.com/mitchellh/go-homedir v1.1.0 // indirect - github.com/mozillazg/go-pinyin v0.20.0 // indirect - github.com/pkg/errors v0.9.1 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/spaolacci/murmur3 v1.1.0 // indirect - github.com/tidwall/match v1.1.1 // indirect - github.com/tidwall/pretty v1.2.1 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect -) diff --git a/go.sum b/go.sum index c9259d3..e69de29 100644 --- a/go.sum +++ b/go.sum @@ -1,45 +0,0 @@ -git.zhangdeman.cn/zhangdeman/util v0.0.0-20230113095943-b4b3e261e0c4 h1:1WclY9P8l8o/NZ3ZR/mupm8LtowjQ/Q4UNGXR32f0OQ= -git.zhangdeman.cn/zhangdeman/util v0.0.0-20230113095943-b4b3e261e0c4/go.mod h1:zTir/0IWdK3E7n0GiaogyWHADAQnBtTdl2I6Z2/OPqw= -git.zhangdeman.cn/zhangdeman/util v0.0.0-20231227095334-7eb5cdbf9253 h1:GO3oZa5a2sqwAzGcLDJtQzmshSWRmoP7IDS8bwFqvC4= -git.zhangdeman.cn/zhangdeman/util v0.0.0-20231227095334-7eb5cdbf9253/go.mod h1:VpPjBlwz8U+OxZuxzHQBv1aEEZ3pStH6bZvT21ADEbI= -github.com/Jeffail/gabs v1.4.0 h1://5fYRRTq1edjfIrQGvdkcd22pkYUrHZ5YC/H2GJVAo= -github.com/Jeffail/gabs v1.4.0/go.mod h1:6xMvQMK4k33lb7GUUpaAPh6nKMmemQeg5d4gn7/bOXc= -github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ= -github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-ini/ini v1.66.6 h1:h6k2Bb0HWS/BXXHCXj4QHjxPmlIU4NK+7MuLp9SD+4k= -github.com/go-ini/ini v1.66.6/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= -github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= -github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= -github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= -github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= -github.com/mozillazg/go-pinyin v0.20.0 h1:BtR3DsxpApHfKReaPO1fCqF4pThRwH9uwvXzm+GnMFQ= -github.com/mozillazg/go-pinyin v0.20.0/go.mod h1:iR4EnMMRXkfpFVV5FMi4FNB6wGq9NV6uDWbUuPhP4Yc= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= -github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/tidwall/gjson v1.14.1 h1:iymTbGkQBhveq21bEvAQ81I0LEBork8BFe1CUZXdyuo= -github.com/tidwall/gjson v1.14.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= -github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U= -github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= -github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= -github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= -github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= -github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= -github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= -github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/reflect.go b/reflect.go deleted file mode 100644 index b1abfdc..0000000 --- a/reflect.go +++ /dev/null @@ -1,440 +0,0 @@ -// Package event ... -// -// Description : event ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 2023-02-01 14:30 -package event - -import ( - "encoding/json" - "git.zhangdeman.cn/zhangdeman/event/abstract" - "git.zhangdeman.cn/zhangdeman/util" - "github.com/tidwall/gjson" - "net/http" - "reflect" - "strings" - "sync" -) - -var ( - // ReflectTypeInstance 反射实例 - ReflectTypeInstance *ReflectType - ReflectValueInstance *ReflectValue -) - -func init() { - ReflectTypeInstance = &ReflectType{ - lock: &sync.RWMutex{}, - cacheTable: make(map[string]*StructInfo), - } - ReflectValueInstance = &ReflectValue{} -} - -// ReflectType 反射数据类型 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 14:31 2023/2/1 -type ReflectType struct { - // 数据锁 - lock *sync.RWMutex - // 反射结果缓存 - cacheTable map[string]*StructInfo -} - -// Do 反射获取数据类型 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 14:34 2023/2/1 -// -// 为特定结构体生成全局唯一的标识, 并进行缓存, 加速反射结果获取 -func (rt *ReflectType) Do(dataFlag string, data interface{}) *StructInfo { - rt.lock.Lock() - defer rt.lock.Unlock() - if cacheResult, exist := rt.cacheTable[dataFlag]; exist { - // 缓存存在, 直接是有缓存结果 - return cacheResult - } - // 缓存不存在, 解析 - res := &StructInfo{ - Flag: dataFlag, - IsStruct: false, - IsStructPtr: false, - StructFieldList: make([]*StructField, 0), - } - isPtr := false - reflectType := reflect.TypeOf(data) - if reflectType.Kind() == reflect.Ptr { - isPtr = true - reflectType = reflectType.Elem() - } - if reflectType.Kind() != reflect.Struct { - // 非结构体,无需反射 - rt.cacheTable[dataFlag] = res - return res - } - res.IsStruct = true - res.IsStructPtr = isPtr - for idx := 0; idx < reflectType.NumField(); idx++ { - field := &StructField{ - Idx: idx, - Name: reflectType.Field(idx).Name, - JsonTag: reflectType.Field(idx).Tag.Get(JsonTag), - EventTag: reflectType.Field(idx).Tag.Get(OutEventTag), - MappingRuleList: make([]MappingRuleItem, 0), - } - rt.fillFieldType(field, reflectType.Field(idx).Type) - rt.fillMappingRule(field, reflectType.Field(idx).Tag.Get(MappingTag)) - res.StructFieldList = append(res.StructFieldList, field) - } - rt.cacheTable[dataFlag] = res - return res -} - -// fillFieldType 填充字段类型 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 20:46 2023/2/1 -func (rt *ReflectType) fillFieldType(field *StructField, dataType reflect.Type) { - switch dataType.Kind() { - case reflect.Float32: - fallthrough - case reflect.Float64: - field.Type = reflect.Float64 - case reflect.String: - field.Type = reflect.String - case reflect.Struct: - field.Type = reflect.Struct - case reflect.Slice: - field.Type = reflect.Slice - case reflect.Map: - field.Type = reflect.Map - case reflect.Ptr: - field.IsPtr = true - // 指针再次判断基础类型 - field.Type = dataType.Elem().Kind() - default: - if strings.Contains(dataType.String(), "int") { - field.Type = reflect.Int64 - } else { - field.Type = reflect.Interface - } - } -} - -// fillTagInfo 填充标签信息 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 17:11 2023/2/1 -func (rt *ReflectType) fillTagInfo(field *StructField) { - if len(field.JsonTag) == 0 { - field.JsonTag = field.Name - } - - // jsonTag 去掉 omitempty - jsonTagValArr := strings.Split(field.JsonTag, ",") - for _, item := range jsonTagValArr { - if len(item) > 0 && item != OmitemptyTag { - field.JsonTag = item - break - } - } - - // 没有设置event tag,则和 json tag保持一致 - if len(field.EventTag) == 0 { - field.EventTag = field.JsonTag - } -} - -// fillMappingRule 解析参数映射规则 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 17:14 2023/2/1 -// -// mapping:"user_id:param#user_id|header#id" -func (rt *ReflectType) fillMappingRule(field *StructField, inputMappingVal string) { - - if len(inputMappingVal) == 0 { - // 没有指定规则, 有默认规则 - for _, location := range mappingLocationList { - field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{ - Location: location, - Field: field.JsonTag, - }) - } - return - } - mappingArr := strings.Split(inputMappingVal, ",") - for _, item := range mappingArr { - item = strings.TrimSpace(item) - // 要赋值的字段名 - itemArr := strings.Split(item, ":") - if len(itemArr) != 2 { - // 配置格式错误, 跳过 - continue - } - mapRuleArr := strings.Split(strings.TrimSpace(itemArr[1]), "|") - for _, itemMapRule := range mapRuleArr { - itemMapRule = strings.TrimLeft(itemMapRule, "#") - itemMapRuleArr := strings.Split(itemMapRule, "#") - // 注意 : # 为特殊分隔符, 如配置成 mapping:"project_id:#source_project_id#xxx_project_id" 实际等价于 mapping:"project_id:#source_project_id" 多余配置自动跳过 - if len(itemMapRuleArr[0]) < 2 { - // 没有指定位置,默认all, 即配置格式: mapping:"project_id:#source_project_id" - itemMapRuleArr[0] = MappingLocationAll - itemMapRuleArr = []string{MappingLocationAll, itemMapRuleArr[0]} - } - - switch itemMapRuleArr[0] { - // 从header读取 - case MappingLocationHeader: - field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{ - Location: MappingLocationHeader, - Field: itemMapRuleArr[1], - }) - // 从请求参数读取 - case MappingLocationParam: - field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{ - Location: MappingLocationParam, - Field: itemMapRuleArr[1], - }) - // 从响应数据读取 - case MappingLocationResponse: - field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{ - Location: MappingLocationResponse, - Field: itemMapRuleArr[1], - }) - // 从扩展数据读取 - case MappingLocationExtension: - field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{ - Location: MappingLocationExtension, - Field: itemMapRuleArr[1], - }) - // 全部读取一遍 - case MappingLocationAll: - fallthrough - default: - for _, itemLocation := range mappingLocationList { - field.MappingRuleList = append(field.MappingRuleList, MappingRuleItem{ - Location: itemLocation, - Field: itemMapRuleArr[1], - }) - } - } - } - } -} - -// ReflectValue 反射值的实例 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 10:53 2023/2/2 -type ReflectValue struct { -} - -// Do 通过反射机制,对data进行数据填充,此逻辑要求 data 必须是结构体或者结构体指针 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 11:52 2023/2/2 -func (rv *ReflectValue) Do(dataFlag string, data interface{}, preSendHandler abstract.IPreSendHandler) { - structInfo := ReflectTypeInstance.Do(dataFlag, data) - if !structInfo.IsStruct { - return - } - reflectValue := reflect.ValueOf(data) - if structInfo.IsStructPtr { - reflectValue = reflectValue.Elem() - } - - for _, fieldInfo := range structInfo.StructFieldList { - if !rv.isZeroInputFieldValue(reflectValue, fieldInfo) { - // 不是零值, 无需处理 - continue - } - // 是零值, 填充默认值 - } -} - -// isZeroInputFieldValue 判断对应的字段是否为对应类型默认的零值 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 12:04 2023/2/2 -func (rv *ReflectValue) isZeroInputFieldValue(reflectValue reflect.Value, fieldInfo *StructField) bool { - inputVal := reflectValue.Field(fieldInfo.Idx).Interface() - switch fieldInfo.Type { - case reflect.Float64: - var f float64 - if err := util.ConvertAssign(&f, inputVal); nil == err { - if f != 0 { - return false - } - return true - } - case reflect.String: - var s string - if err := util.ConvertAssign(&s, inputVal); nil == err { - if len(s) > 0 { - return false - } - return true - } - case reflect.Int64: - var i int64 - if err := util.ConvertAssign(&i, inputVal); nil == err { - if i != 0 { - return false - } - return true - } - case reflect.Map: - if nil == inputVal { - return true - } - var m map[interface{}]interface{} - if err := util.ConvertAssign(&m, inputVal); nil == err { - if len(m) != 0 { - return false - } - return true - } - case reflect.Slice: - if nil == inputVal { - return true - } - var sl []interface{} - if err := util.ConvertAssign(&sl, inputVal); nil == err { - if len(sl) != 0 { - return false - } - return true - } - case reflect.Interface: - if inputVal == nil { - return true - } - return false - default: - // 默认不处理 - return false - } - return false -} - -// fillFieldValue 填充字段值 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 14:00 2023/2/2 -func (rv *ReflectValue) fillFieldValue(reflectValue reflect.Value, fieldInfo *StructField, preSendHandler abstract.IPreSendHandler) { - paramByte, _ := json.Marshal(preSendHandler.GetRequestParam()) - header := preSendHandler.GetRequestHeader() - responseByte, _ := json.Marshal(preSendHandler.GetResponseData()) - extensionByte, _ := json.Marshal(preSendHandler.GetExtensionData()) - for _, item := range fieldInfo.MappingRuleList { - if rv.setDataValue(reflectValue, fieldInfo, item, header, paramByte, responseByte, extensionByte) { - // 找到数据,并且赋值成功,结束继续查找 - break - } - } -} - -// getInputValue 获取输入的值 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 14:24 2023/2/2 -func (rv *ReflectValue) setDataValue(reflectVal reflect.Value, fieldInfo *StructField, rule MappingRuleItem, header http.Header, paramByte, responseByte, extensionByte []byte) bool { - inputVal := rv.getInputValue(rule, header, paramByte, responseByte, extensionByte) - if len(inputVal) == 0 { - return false - } - switch fieldInfo.Type { - case reflect.String: - reflectVal.Field(fieldInfo.Idx).SetString(inputVal) - return true - case reflect.Int64: - var i int64 - if err := util.ConvertAssign(&i, inputVal); nil != err { - return false - } - if i == 0 { - return false - } - if fieldInfo.IsPtr { - reflectVal.Field(fieldInfo.Idx).Set(reflect.ValueOf(&i)) - } else { - reflectVal.Field(fieldInfo.Idx).SetInt(i) - } - return true - case reflect.Float64: - var f float64 - if err := util.ConvertAssign(&f, inputVal); nil != err { - return false - } - if f == 0 { - return false - } - if fieldInfo.IsPtr { - reflectVal.Field(fieldInfo.Idx).Set(reflect.ValueOf(&f)) - } else { - reflectVal.Field(fieldInfo.Idx).SetFloat(f) - } - return true - case reflect.Interface: - reflectVal.Field(fieldInfo.Idx).Set(reflect.ValueOf(&inputVal)) - return true - case reflect.Slice: - fallthrough - case reflect.Map: - fallthrough - case reflect.Struct: - var v interface{} - if err := json.Unmarshal([]byte(inputVal), &v); nil != err { - return false - } - reflectVal.Field(fieldInfo.Idx).Set(reflect.ValueOf(&v)) - return true - default: - return false - } -} - -// getInputValue 获取输入的值 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 14:43 2023/2/2 -func (rv *ReflectValue) getInputValue(rule MappingRuleItem, header http.Header, paramByte, responseByte, extensionByte []byte) string { - switch rule.Location { - case MappingLocationHeader: - return header.Get(rule.Field) - case MappingLocationParam: - return gjson.GetBytes(paramByte, rule.Field).String() - case MappingLocationResponse: - return gjson.GetBytes(responseByte, rule.Field).String() - case MappingLocationExtension: - return gjson.GetBytes(extensionByte, rule.Field).String() - case MappingLocationAll: - str := header.Get(rule.Field) - if len(str) == 0 { - str = gjson.GetBytes(paramByte, rule.Field).String() - } - if len(str) == 0 { - str = gjson.GetBytes(responseByte, rule.Field).String() - } - if len(str) == 0 { - str = gjson.GetBytes(extensionByte, rule.Field).String() - } - return str - default: - return "" - } -} diff --git a/send.go b/send.go deleted file mode 100644 index ef717c0..0000000 --- a/send.go +++ /dev/null @@ -1,57 +0,0 @@ -// Package event ... -// -// Description : event ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 2023-02-02 15:15 -package event - -import ( - "encoding/json" - "git.zhangdeman.cn/zhangdeman/event/abstract" -) - -// SendEvent 发送事件 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 15:16 2023/2/2 -func SendEvent(dataFlag string, data interface{}, preSendHandler abstract.IPreSendHandler, sendEventHandler abstract.ISendEventHandler) SendResult { - if nil == preSendHandler { - preSendHandler = &DefaultPreSendHandler{} - } - - if nil == sendEventHandler { - sendEventHandler = &DefaultSendEventHandler{} - } - res := SendResult{} - - if res.NoSendDetail, res.IsNeedSend = preSendHandler.NeedSend(); !res.IsNeedSend { - sendEventHandler.NoSendCallback(data, res) - return res - } - // 通过反射填充数据 - ReflectValueInstance.Do(dataFlag, data, preSendHandler) - res.Data = data - var ( - byteData []byte - err error - ) - - if byteData, err = json.Marshal(data); nil != err { - res.IsSuccess = false - res.FailReason = err.Error() - sendEventHandler.FailCallback(map[string]interface{}{ - "err": err.Error(), - "reason": "data marshal fail", - }, err) - return res - } - if res.SendResult, err = sendEventHandler.Send(byteData); nil != err { - sendEventHandler.FailCallback(res.SendResult, err) - return res - } - sendEventHandler.SuccessCallback(res.SendResult) - return res -} -- 2.36.6 From 82785601dfbc0c18cab294766dbc3b87f6b8459e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Mon, 11 Mar 2024 12:10:44 +0800 Subject: [PATCH 02/17] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=9F=BA=E7=A1=80?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=AE=9A=E4=B9=89=20+=20=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E7=BA=A6=E6=9D=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- abstract/IEvent.go | 72 ++++++++++++++++++++++++++++++++++++++++++++++ define/consts.go | 14 +++++++++ define/data.go | 22 ++++++++++++++ 3 files changed, 108 insertions(+) create mode 100644 abstract/IEvent.go create mode 100644 define/consts.go create mode 100644 define/data.go diff --git a/abstract/IEvent.go b/abstract/IEvent.go new file mode 100644 index 0000000..b3f843f --- /dev/null +++ b/abstract/IEvent.go @@ -0,0 +1,72 @@ +// Package abstract ... +// +// Description : abstract ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-03-11 12:02 +package abstract + +import "git.zhangdeman.cn/zhangdeman/event/define" + +// IEvent 事件接口定义 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 19:08 2023/8/14 +type IEvent interface { + // Construct 初始化事件实例 + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 12:04 2024/3/11 + Construct() error + // SendEvent 发送事件 + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 12:04 2024/3/11 + SendEvent(eventTData *define.EventData) (map[string]interface{}, error) + // SendFailCallback 发送失败的回调方法 + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 12:08 2024/3/11 + SendFailCallback(eventTData *define.EventData, err error) + // SendSuccessCallback 发送成功的回调 + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 12:08 2024/3/11 + SendSuccessCallback(eventTData *define.EventData, err error) + // ConsumeEvent 消费事件 + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 12:05 2024/3/11 + ConsumeEvent() (<-chan *define.EventData, error) + // ConsumeFailCallback 消费失败的回调, eventData 可能为 NIL + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 12:09 2024/3/11 + ConsumeFailCallback(eventData *define.EventData, err error) + // ConsumeSuccessCallback 消费成功的回调 + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 12:10 2024/3/11 + ConsumeSuccessCallback(eventData *define.EventData) + // Destroy 事件实例销毁时, 执行的方法 + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 12:05 2024/3/11 + Destroy() + // GetDriverType 获取驱动类型 + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 12:06 2024/3/11 + GetDriverType() string +} diff --git a/define/consts.go b/define/consts.go new file mode 100644 index 0000000..d22c369 --- /dev/null +++ b/define/consts.go @@ -0,0 +1,14 @@ +// Package define ... +// +// Description : define ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-03-11 12:06 +package define + +const ( + DriverTypeEtcd = "ETCD" // etcd驱动 + DriverTypeKafka = "KAFKA" // kafka驱动 + DriverTypeREDIS = "REDIS" // redis驱动 +) diff --git a/define/data.go b/define/data.go new file mode 100644 index 0000000..773c68f --- /dev/null +++ b/define/data.go @@ -0,0 +1,22 @@ +// Package define ... +// +// Description : define ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-03-11 11:40 +package define + +// EventData ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:41 2024/3/11 +type EventData struct { + EventType string `json:"event_type"` // 事件类型 + TraceID string `json:"trace_id"` // 事件追踪ID + Host string `json:"host"` // 触发事件host + Timestamp int64 `json:"timestamp"` // 触发时间,纳秒级时间戳 + SystemTimestamp int64 `json:"system_timestamp"` // 发送时的系统时间 + Data interface{} `json:"data"` // 发送的数据 +} -- 2.36.6 From ea7920cceb36eba2ab38f815d82787d5a9dc2f8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Tue, 25 Jun 2024 16:03:45 +0800 Subject: [PATCH 03/17] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E5=AE=9A=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- abstract/IEvent.go | 20 +++++++++++++++----- define/data.go | 24 ++++++++++++++++++------ 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/abstract/IEvent.go b/abstract/IEvent.go index b3f843f..8ebd7bb 100644 --- a/abstract/IEvent.go +++ b/abstract/IEvent.go @@ -7,7 +7,10 @@ // Date : 2024-03-11 12:02 package abstract -import "git.zhangdeman.cn/zhangdeman/event/define" +import ( + "context" + "git.zhangdeman.cn/zhangdeman/event/define" +) // IEvent 事件接口定义 // @@ -21,24 +24,31 @@ type IEvent interface { // // Date : 12:04 2024/3/11 Construct() error - // SendEvent 发送事件 + // SendEvent 发送事件(同步) // // Author : go_developer@163.com<白茶清欢> // // Date : 12:04 2024/3/11 - SendEvent(eventTData *define.EventData) (map[string]interface{}, error) + SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) + + // SendEventAsync 发送事件(异步) + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 15:58 2024/6/25 + SendEventAsync(ctx context.Context, eventData *define.EventData) // SendFailCallback 发送失败的回调方法 // // Author : go_developer@163.com<白茶清欢> // // Date : 12:08 2024/3/11 - SendFailCallback(eventTData *define.EventData, err error) + SendFailCallback(ctx context.Context, eventResult *define.SendResult) // SendSuccessCallback 发送成功的回调 // // Author : go_developer@163.com<白茶清欢> // // Date : 12:08 2024/3/11 - SendSuccessCallback(eventTData *define.EventData, err error) + SendSuccessCallback(ctx context.Context, eventResult *define.SendResult, err error) // ConsumeEvent 消费事件 // // Author : go_developer@163.com<白茶清欢> diff --git a/define/data.go b/define/data.go index 773c68f..2da5538 100644 --- a/define/data.go +++ b/define/data.go @@ -13,10 +13,22 @@ package define // // Date : 11:41 2024/3/11 type EventData struct { - EventType string `json:"event_type"` // 事件类型 - TraceID string `json:"trace_id"` // 事件追踪ID - Host string `json:"host"` // 触发事件host - Timestamp int64 `json:"timestamp"` // 触发时间,纳秒级时间戳 - SystemTimestamp int64 `json:"system_timestamp"` // 发送时的系统时间 - Data interface{} `json:"data"` // 发送的数据 + EventType string `json:"event_type"` // 事件类型 + TraceID string `json:"trace_id"` // 事件追踪ID + Host string `json:"host"` // 触发事件host + Timestamp int64 `json:"timestamp"` // 触发时间,纳秒级时间戳 + SystemTimestamp int64 `json:"system_timestamp"` // 发送时的系统时间 + Data any `json:"data"` // 发送的数据 +} + +// SendResult 发送结果 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 15:56 2024/6/25 +type SendResult struct { + Data *EventData `json:"data"` // 发送的数据 + IsSuccess bool `json:"is_success"` // 是否发送成功 + FailReason string `json:"fail_reason"` // 失败原因 + Extension map[string]any `json:"extension"` // 扩展数据 } -- 2.36.6 From c5b0d49c25f4ebaed93582bc605dc9a7b95bb558 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Tue, 25 Jun 2024 16:05:37 +0800 Subject: [PATCH 04/17] =?UTF-8?q?=E9=A9=B1=E5=8A=A8=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- abstract/IEvent.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/abstract/IEvent.go b/abstract/IEvent.go index 8ebd7bb..70dfa96 100644 --- a/abstract/IEvent.go +++ b/abstract/IEvent.go @@ -73,10 +73,10 @@ type IEvent interface { // // Date : 12:05 2024/3/11 Destroy() - // GetDriverType 获取驱动类型 + // DriverType 获取驱动类型 // // Author : go_developer@163.com<白茶清欢> // // Date : 12:06 2024/3/11 - GetDriverType() string + DriverType() string } -- 2.36.6 From 943d5f36cb067f845176bbaf4a181ee618af0c2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Tue, 25 Jun 2024 16:47:29 +0800 Subject: [PATCH 05/17] =?UTF-8?q?=E8=A7=84=E5=88=92=E5=9F=BA=E4=BA=8Eredis?= =?UTF-8?q?=20pub/sub=20=E7=9A=84=E9=A9=B1=E5=8A=A8=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- abstract/IEvent.go | 6 --- define/data.go | 11 +++-- define/redis.go | 22 +++++++++ go.mod | 22 +++++++++ go.sum | 40 ++++++++++++++++ redis_pub_sub.go | 112 +++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 203 insertions(+), 10 deletions(-) create mode 100644 define/redis.go create mode 100644 redis_pub_sub.go diff --git a/abstract/IEvent.go b/abstract/IEvent.go index 70dfa96..54029fe 100644 --- a/abstract/IEvent.go +++ b/abstract/IEvent.go @@ -18,12 +18,6 @@ import ( // // Date : 19:08 2023/8/14 type IEvent interface { - // Construct 初始化事件实例 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 12:04 2024/3/11 - Construct() error // SendEvent 发送事件(同步) // // Author : go_developer@163.com<白茶清欢> diff --git a/define/data.go b/define/data.go index 2da5538..baeb823 100644 --- a/define/data.go +++ b/define/data.go @@ -18,6 +18,7 @@ type EventData struct { Host string `json:"host"` // 触发事件host Timestamp int64 `json:"timestamp"` // 触发时间,纳秒级时间戳 SystemTimestamp int64 `json:"system_timestamp"` // 发送时的系统时间 + Key string `json:"key"` // 会基于当前值进行hash, 决定消息分区, 不指定则随机生成 Data any `json:"data"` // 发送的数据 } @@ -27,8 +28,10 @@ type EventData struct { // // Date : 15:56 2024/6/25 type SendResult struct { - Data *EventData `json:"data"` // 发送的数据 - IsSuccess bool `json:"is_success"` // 是否发送成功 - FailReason string `json:"fail_reason"` // 失败原因 - Extension map[string]any `json:"extension"` // 扩展数据 + Data *EventData `json:"data"` // 发送的数据 + PartitionNum int `json:"partition_num"` // 分区索引编号 + Topic string `json:"topic"` // 使用的真实topic + IsSuccess bool `json:"is_success"` // 是否发送成功 + FailReason string `json:"fail_reason"` // 失败原因 + Extension map[string]any `json:"extension"` // 扩展数据 } diff --git a/define/redis.go b/define/redis.go new file mode 100644 index 0000000..63755e4 --- /dev/null +++ b/define/redis.go @@ -0,0 +1,22 @@ +// Package define ... +// +// Description : define ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-06-25 16:24 +package define + +const ( + DefaultPartitionNum = 1 +) + +// RedisEventPubSubConfig redis事件配置 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 16:25 2024/6/25 +type RedisEventPubSubConfig struct { + Topic string `json:"topic"` // topic key, 不指定随机生成 + PartitionNum int `json:"partition_num"` // 多少个分区, 默认值 : 1 +} diff --git a/go.mod b/go.mod index 1a3001e..6e362bf 100644 --- a/go.mod +++ b/go.mod @@ -3,3 +3,25 @@ module git.zhangdeman.cn/zhangdeman/event go 1.21 toolchain go1.22.1 + +require ( + git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38 // indirect + git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211 // indirect + git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0 // indirect + git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd // indirect + git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e // indirect + git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240612083858-8d056baada2e // indirect + github.com/BurntSushi/toml v1.4.0 // indirect + github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/go-ini/ini v1.67.0 // indirect + github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/mozillazg/go-pinyin v0.20.0 // indirect + github.com/redis/go-redis/v9 v9.5.3 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/tidwall/gjson v1.17.1 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index e69de29..0a006c1 100644 --- a/go.sum +++ b/go.sum @@ -0,0 +1,40 @@ +git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38 h1:+o+BI5GGlwJelPLWL8ciDJqxw/G8dv+FU6OztGSnEjo= +git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38/go.mod h1:IXXaZkb7vGzGnGM5RRWrASAuwrVSNxuoe0DmeXx5g6k= +git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211 h1:I/wOsRpCSRkU9vo1u703slQsmK0wnNeZzsWQOGtIAG0= +git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211/go.mod h1:SrtvrQRdzt+8KfYzvosH++gWxo2ShPTzR1m3VQ6uX7U= +git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0 h1:gUDlQMuJ4xNfP2Abl1Msmpa3fASLWYkNlqDFF/6GN0Y= +git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0/go.mod h1:VHb9qmhaPDAQDcS6vUiDCamYjZ4R5lD1XtVsh55KsMI= +git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd h1:2Y37waOVCmVvx0Rp8VGEptE2/2JVMImtxB4dKKDk/3w= +git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd/go.mod h1:6+7whkCmb4sJDIfH3HxNuXRveaM0gCCNWd2uXZqNtIE= +git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e h1:Q973S6CcWr1ICZhFI1STFOJ+KUImCl2BaIXm6YppBqI= +git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e/go.mod h1:VpPjBlwz8U+OxZuxzHQBv1aEEZ3pStH6bZvT21ADEbI= +git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240612083858-8d056baada2e h1:+PeWa2QdYBWnL32CfAAgy0dlaRCVNmYZDH4q+9w7Gfg= +git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240612083858-8d056baada2e/go.mod h1:US/pcq2vstE3iyxIHf53w8IeXKkZys7bj/ozLWkRYeE= +github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= +github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ= +github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= +github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mozillazg/go-pinyin v0.20.0 h1:BtR3DsxpApHfKReaPO1fCqF4pThRwH9uwvXzm+GnMFQ= +github.com/mozillazg/go-pinyin v0.20.0/go.mod h1:iR4EnMMRXkfpFVV5FMi4FNB6wGq9NV6uDWbUuPhP4Yc= +github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU= +github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U= +github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/redis_pub_sub.go b/redis_pub_sub.go new file mode 100644 index 0000000..0956c35 --- /dev/null +++ b/redis_pub_sub.go @@ -0,0 +1,112 @@ +// Package event ... +// +// Description : event ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-06-25 16:06 +package event + +import ( + "context" + "git.zhangdeman.cn/zhangdeman/event/abstract" + "git.zhangdeman.cn/zhangdeman/event/define" + "git.zhangdeman.cn/zhangdeman/wrapper" + "github.com/redis/go-redis/v9" +) + +var ( + RedisEventPubSubClient abstract.IEvent +) + +// InitRedisPubSubEvent 初始化redis事件驱动, 基于 pub / sub 指令 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 16:24 2024/6/25 +func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) { + instance := &RedisEventPubSub{ + redisClient: redisClient, + pubSubConfig: pubSubConfig, + } + instance.SetRedisClient(redisClient, pubSubConfig) + RedisEventPubSubClient = instance +} + +// RedisEventPubSub 基于redis的事件驱动 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 16:07 2024/6/25 +type RedisEventPubSub struct { + redisClient *redis.Client // redis客户端 + pubSubConfig *define.RedisEventPubSubConfig // 事件配置 + +} + +func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) { + //TODO implement me + panic("implement me") +} + +func (r *RedisEventPubSub) SendEventAsync(ctx context.Context, eventData *define.EventData) { + //TODO implement me + panic("implement me") +} + +func (r *RedisEventPubSub) SendFailCallback(ctx context.Context, eventResult *define.SendResult) { + //TODO implement me + panic("implement me") +} + +func (r *RedisEventPubSub) SendSuccessCallback(ctx context.Context, eventResult *define.SendResult, err error) { + //TODO implement me + panic("implement me") +} + +func (r *RedisEventPubSub) ConsumeEvent() (<-chan *define.EventData, error) { + //TODO implement me + panic("implement me") +} + +func (r *RedisEventPubSub) ConsumeFailCallback(eventData *define.EventData, err error) { + //TODO implement me + panic("implement me") +} + +func (r *RedisEventPubSub) ConsumeSuccessCallback(eventData *define.EventData) { + //TODO implement me + panic("implement me") +} + +func (r *RedisEventPubSub) Destroy() { + //TODO implement me + panic("implement me") +} + +func (r *RedisEventPubSub) DriverType() string { + //TODO implement me + panic("implement me") +} + +// SetRedisClient 设置redis客户端 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 16:30 2024/6/25 +func (r *RedisEventPubSub) SetRedisClient(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) { + if nil == pubSubConfig { + pubSubConfig = &define.RedisEventPubSubConfig{ + Topic: "", + PartitionNum: 0, + } + } + if len(pubSubConfig.Topic) == 0 { + pubSubConfig.Topic = "EVENT_TOPIC_" + wrapper.StringFromRandom(128, "").Md5().Value + } + if pubSubConfig.PartitionNum <= 0 { + pubSubConfig.PartitionNum = define.DefaultPartitionNum + } + r.redisClient = redisClient + r.pubSubConfig = pubSubConfig +} -- 2.36.6 From 0eeb2ef42e73ca606afe890c8e5f7e884297a71b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Tue, 25 Jun 2024 19:39:49 +0800 Subject: [PATCH 06/17] save code --- define/redis.go | 3 ++- redis_pub_sub.go | 6 ++---- redis_pub_sub_test.go | 19 +++++++++++++++++++ 3 files changed, 23 insertions(+), 5 deletions(-) create mode 100644 redis_pub_sub_test.go diff --git a/define/redis.go b/define/redis.go index 63755e4..5892399 100644 --- a/define/redis.go +++ b/define/redis.go @@ -8,7 +8,8 @@ package define const ( - DefaultPartitionNum = 1 + DefaultRedisPartitionNum = 1 + DefaultRedisTopic = "EVENT_TOPIC_C6DBE0AAE846C5C0DE35802107326B1E" ) // RedisEventPubSubConfig redis事件配置 diff --git a/redis_pub_sub.go b/redis_pub_sub.go index 0956c35..6db7505 100644 --- a/redis_pub_sub.go +++ b/redis_pub_sub.go @@ -11,7 +11,6 @@ import ( "context" "git.zhangdeman.cn/zhangdeman/event/abstract" "git.zhangdeman.cn/zhangdeman/event/define" - "git.zhangdeman.cn/zhangdeman/wrapper" "github.com/redis/go-redis/v9" ) @@ -41,7 +40,6 @@ func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisE type RedisEventPubSub struct { redisClient *redis.Client // redis客户端 pubSubConfig *define.RedisEventPubSubConfig // 事件配置 - } func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) { @@ -102,10 +100,10 @@ func (r *RedisEventPubSub) SetRedisClient(redisClient *redis.Client, pubSubConfi } } if len(pubSubConfig.Topic) == 0 { - pubSubConfig.Topic = "EVENT_TOPIC_" + wrapper.StringFromRandom(128, "").Md5().Value + pubSubConfig.Topic = define.DefaultRedisTopic } if pubSubConfig.PartitionNum <= 0 { - pubSubConfig.PartitionNum = define.DefaultPartitionNum + pubSubConfig.PartitionNum = define.DefaultRedisPartitionNum } r.redisClient = redisClient r.pubSubConfig = pubSubConfig diff --git a/redis_pub_sub_test.go b/redis_pub_sub_test.go new file mode 100644 index 0000000..2c044d5 --- /dev/null +++ b/redis_pub_sub_test.go @@ -0,0 +1,19 @@ +// Package event ... +// +// Description : event ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-06-25 16:51 +package event + +import ( + "fmt" + "git.zhangdeman.cn/zhangdeman/wrapper" + "strings" + "testing" +) + +func TestInitRedisPubSubEvent(t *testing.T) { + fmt.Println(strings.ToUpper(wrapper.StringFromRandom(128, "").Md5().Value)) +} -- 2.36.6 From 87b2742e360a4c5bf17dfbfc8bf4e026b5e49317 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Wed, 26 Jun 2024 15:46:00 +0800 Subject: [PATCH 07/17] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8Eredis?= =?UTF-8?q?=E7=9A=84=E4=BA=8B=E4=BB=B6=E9=94=80=E6=AF=81=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- define/redis.go | 12 ++++++++---- redis_pub_sub.go | 45 +++++++++++++++++++++++++++++++++++++++------ 2 files changed, 47 insertions(+), 10 deletions(-) diff --git a/define/redis.go b/define/redis.go index 5892399..60dfcc9 100644 --- a/define/redis.go +++ b/define/redis.go @@ -8,8 +8,10 @@ package define const ( - DefaultRedisPartitionNum = 1 - DefaultRedisTopic = "EVENT_TOPIC_C6DBE0AAE846C5C0DE35802107326B1E" + DefaultRedisPartitionNum = 1 + DefaultRedisTopic = "EVENT_TOPIC_C6DBE0AAE846C5C0DE35802107326B1E" + DefaultRedisMessageBufferSize = 1024 + DefaultRedisCloseMaxWaitTime = 5000 // 默认最大等待 : 5s ) // RedisEventPubSubConfig redis事件配置 @@ -18,6 +20,8 @@ const ( // // Date : 16:25 2024/6/25 type RedisEventPubSubConfig struct { - Topic string `json:"topic"` // topic key, 不指定随机生成 - PartitionNum int `json:"partition_num"` // 多少个分区, 默认值 : 1 + Topic string `json:"topic"` // topic key, 不指定随机生成 + PartitionNum int `json:"partition_num"` // 多少个分区, 默认值 : 1 + MessageBufferSize int `json:"message_buffer_size"` // 消息缓冲区大小 + CloseMaxWaitTime int `json:"close_max_wait_time"` // 关闭消息实例, 最大等待时长, 单位 : ms } diff --git a/redis_pub_sub.go b/redis_pub_sub.go index 6db7505..e6eefd2 100644 --- a/redis_pub_sub.go +++ b/redis_pub_sub.go @@ -9,9 +9,11 @@ package event import ( "context" + "git.zhangdeman.cn/zhangdeman/consts" "git.zhangdeman.cn/zhangdeman/event/abstract" "git.zhangdeman.cn/zhangdeman/event/define" "github.com/redis/go-redis/v9" + "time" ) var ( @@ -40,6 +42,8 @@ func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisE type RedisEventPubSub struct { redisClient *redis.Client // redis客户端 pubSubConfig *define.RedisEventPubSubConfig // 事件配置 + messageChan chan *define.EventData // 消息队列 + stopConsumer chan bool // 停止消费者 } func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) { @@ -77,14 +81,34 @@ func (r *RedisEventPubSub) ConsumeSuccessCallback(eventData *define.EventData) { panic("implement me") } +// Destroy 销毁事件实例 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 15:42 2024/6/26 func (r *RedisEventPubSub) Destroy() { - //TODO implement me - panic("implement me") + r.stopConsumer <- true // 停止消费者 + messageChan := make(chan bool, 1) + go func() { + for { + if len(r.messageChan) == 0 { + messageChan <- true + return + } + } + }() + select { + case <-time.After(time.Millisecond * time.Duration(r.pubSubConfig.CloseMaxWaitTime)): + // 定时器到期 + return + case <-messageChan: + // 没有待消费数据 + return + } } func (r *RedisEventPubSub) DriverType() string { - //TODO implement me - panic("implement me") + return consts.EventDriverRedis } // SetRedisClient 设置redis客户端 @@ -95,8 +119,9 @@ func (r *RedisEventPubSub) DriverType() string { func (r *RedisEventPubSub) SetRedisClient(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) { if nil == pubSubConfig { pubSubConfig = &define.RedisEventPubSubConfig{ - Topic: "", - PartitionNum: 0, + Topic: "", + PartitionNum: 0, + MessageBufferSize: 0, } } if len(pubSubConfig.Topic) == 0 { @@ -105,6 +130,14 @@ func (r *RedisEventPubSub) SetRedisClient(redisClient *redis.Client, pubSubConfi if pubSubConfig.PartitionNum <= 0 { pubSubConfig.PartitionNum = define.DefaultRedisPartitionNum } + if pubSubConfig.MessageBufferSize <= 0 { + r.pubSubConfig.MessageBufferSize = define.DefaultRedisMessageBufferSize + } + if pubSubConfig.CloseMaxWaitTime <= 0 { + r.pubSubConfig.CloseMaxWaitTime = define.DefaultRedisCloseMaxWaitTime + } r.redisClient = redisClient r.pubSubConfig = pubSubConfig + r.stopConsumer = make(chan bool, 1) + r.messageChan = make(chan *define.EventData, r.pubSubConfig.MessageBufferSize) } -- 2.36.6 From 6f56ff3c4f8749c5b249896336e9850de9852198 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Wed, 26 Jun 2024 18:15:34 +0800 Subject: [PATCH 08/17] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E6=8A=BD=E8=B1=A1=20+=20redis=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- abstract/IEvent.go | 72 +++++++++++++++++++++----------- define/handler.go | 33 +++++++++++++++ redis_pub_sub.go | 101 ++++++++++++++++++++++++++++++++++----------- 3 files changed, 159 insertions(+), 47 deletions(-) create mode 100644 define/handler.go diff --git a/abstract/IEvent.go b/abstract/IEvent.go index 54029fe..3e7997f 100644 --- a/abstract/IEvent.go +++ b/abstract/IEvent.go @@ -12,6 +12,48 @@ import ( "git.zhangdeman.cn/zhangdeman/event/define" ) +// EventHandler 事件数据处理函数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:08 2024/6/26 +type EventHandler func(eventData *define.EventData) (map[string]any, error) + +// SendFailCallback 发送事件失败的回调 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:31 2024/6/26 +type SendFailCallback func(ctx context.Context, eventResult *define.SendResult) + +// SendSuccessCallback 发送事件成功的回调 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:32 2024/6/26 +type SendSuccessCallback func(ctx context.Context, eventResult *define.SendResult, err error) + +// ConsumeFailCallbackHandler 时间处理成功回调 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:18 2024/6/26 +type ConsumeFailCallbackHandler func(eventData *define.EventData, handleResult map[string]any, err error) + +// ConsumeSuccessCallback 时间处理失败回调 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:20 2024/6/26 +type ConsumeSuccessCallback func(eventData *define.EventData, handleResult map[string]any) + +// PanicCallback panic回调, 根据不同异常类型, eventData / handleResult 均可能为nil +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:07 2024/6/26 +type PanicCallback func(err any, eventData *define.EventData, handleResult map[string]any) + // IEvent 事件接口定义 // // Author : go_developer@163.com<白茶清欢> @@ -23,44 +65,26 @@ type IEvent interface { // Author : go_developer@163.com<白茶清欢> // // Date : 12:04 2024/3/11 - SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) + SendEvent(ctx context.Context, eventData *define.EventData, sendSuccessCallback SendSuccessCallback, sendFailCallback SendFailCallback) (*define.SendResult, error) // SendEventAsync 发送事件(异步) // // Author : go_developer@163.com<白茶清欢> // // Date : 15:58 2024/6/25 - SendEventAsync(ctx context.Context, eventData *define.EventData) - // SendFailCallback 发送失败的回调方法 + SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback SendSuccessCallback, sendFailCallback SendFailCallback) + // GetConsumeEventChan 或去消息消费的channel, 自行实现消费 // // Author : go_developer@163.com<白茶清欢> // - // Date : 12:08 2024/3/11 - SendFailCallback(ctx context.Context, eventResult *define.SendResult) - // SendSuccessCallback 发送成功的回调 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 12:08 2024/3/11 - SendSuccessCallback(ctx context.Context, eventResult *define.SendResult, err error) + // Date : 17:11 2024/6/26 + GetConsumeEventChan() (<-chan *define.EventData, error) // ConsumeEvent 消费事件 // // Author : go_developer@163.com<白茶清欢> // // Date : 12:05 2024/3/11 - ConsumeEvent() (<-chan *define.EventData, error) - // ConsumeFailCallback 消费失败的回调, eventData 可能为 NIL - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 12:09 2024/3/11 - ConsumeFailCallback(eventData *define.EventData, err error) - // ConsumeSuccessCallback 消费成功的回调 - // - // Author : go_developer@163.com<白茶清欢> - // - // Date : 12:10 2024/3/11 - ConsumeSuccessCallback(eventData *define.EventData) + ConsumeEvent(handler EventHandler, successCallback ConsumeSuccessCallback, failureCallback ConsumeFailCallbackHandler) error // Destroy 事件实例销毁时, 执行的方法 // // Author : go_developer@163.com<白茶清欢> diff --git a/define/handler.go b/define/handler.go new file mode 100644 index 0000000..c6722ca --- /dev/null +++ b/define/handler.go @@ -0,0 +1,33 @@ +// Package define ... +// +// Description : define ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-06-26 17:21 +package define + +// DefaultSuccessCallbackHandler ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:21 2024/6/26 +func DefaultSuccessCallbackHandler(eventData *EventData, handleResult map[string]any) { + +} + +// DefaultFailCallbackHandler ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:11 2024/6/26 +func DefaultFailCallbackHandler(eventData *EventData, handleResult map[string]any, err error) { + +} + +// DefaultPanicCallback ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:11 2024/6/26 +func DefaultPanicCallback(err any, eventData *EventData, handleResult map[string]any) {} diff --git a/redis_pub_sub.go b/redis_pub_sub.go index e6eefd2..ccb3466 100644 --- a/redis_pub_sub.go +++ b/redis_pub_sub.go @@ -9,6 +9,8 @@ package event import ( "context" + "errors" + "fmt" "git.zhangdeman.cn/zhangdeman/consts" "git.zhangdeman.cn/zhangdeman/event/abstract" "git.zhangdeman.cn/zhangdeman/event/define" @@ -40,45 +42,94 @@ func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisE // // Date : 16:07 2024/6/25 type RedisEventPubSub struct { - redisClient *redis.Client // redis客户端 - pubSubConfig *define.RedisEventPubSubConfig // 事件配置 - messageChan chan *define.EventData // 消息队列 - stopConsumer chan bool // 停止消费者 + redisClient *redis.Client // redis客户端 + pubSubConfig *define.RedisEventPubSubConfig // 事件配置 + messageChan chan *define.EventData // 消息队列 + stopConsumer chan bool // 停止消费者 + isStop bool // 是否已停止 + panicCallback abstract.PanicCallback // panic回调 } -func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) { +func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) (*define.SendResult, error) { //TODO implement me panic("implement me") } -func (r *RedisEventPubSub) SendEventAsync(ctx context.Context, eventData *define.EventData) { +func (r *RedisEventPubSub) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) { //TODO implement me panic("implement me") } -func (r *RedisEventPubSub) SendFailCallback(ctx context.Context, eventResult *define.SendResult) { - //TODO implement me - panic("implement me") +// GetConsumeEventChan 获取消息channel, 自行实现消费 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:13 2024/6/26 +func (r *RedisEventPubSub) GetConsumeEventChan() (<-chan *define.EventData, error) { + if r.isStop { + return nil, errors.New("event instance has stop") + } + return r.messageChan, nil } -func (r *RedisEventPubSub) SendSuccessCallback(ctx context.Context, eventResult *define.SendResult, err error) { - //TODO implement me - panic("implement me") +// SetPanicCallback 出现任何panic的回调 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:02 2024/6/26 +func (r *RedisEventPubSub) SetPanicCallback(panicCallback abstract.PanicCallback) { + if nil == panicCallback { + panicCallback = define.DefaultPanicCallback + } + r.panicCallback = panicCallback } -func (r *RedisEventPubSub) ConsumeEvent() (<-chan *define.EventData, error) { - //TODO implement me - panic("implement me") -} +// ConsumeEvent 获取数据消费实例 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:06 2024/6/26 +func (r *RedisEventPubSub) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error { + if nil == handler { + return errors.New("handler is nil") + } + var ( + messageChan <-chan *define.EventData + err error + ) -func (r *RedisEventPubSub) ConsumeFailCallback(eventData *define.EventData, err error) { - //TODO implement me - panic("implement me") -} + if messageChan, err = r.GetConsumeEventChan(); nil != err { + return err + } + if nil == failureCallback { + failureCallback = define.DefaultFailCallbackHandler + } + if nil == successCallback { + successCallback = define.DefaultSuccessCallbackHandler + } + go func() { + defer func() { + if panicErr := recover(); nil != panicErr { + fmt.Println(r) + } + }() + for !r.isStop || (r.isStop && len(messageChan) == 0) { + select { + case eventData := <-messageChan: + handlerResult, handlerErr := handler(eventData) + if nil != handlerErr { + // 失败回调 + failureCallback(eventData, handlerResult, handlerErr) + break + } else { + // 成功回调 + successCallback(eventData, handlerResult) + } + } + } + }() -func (r *RedisEventPubSub) ConsumeSuccessCallback(eventData *define.EventData) { - //TODO implement me - panic("implement me") + return nil } // Destroy 销毁事件实例 @@ -87,6 +138,10 @@ func (r *RedisEventPubSub) ConsumeSuccessCallback(eventData *define.EventData) { // // Date : 15:42 2024/6/26 func (r *RedisEventPubSub) Destroy() { + if r.isStop { + // 已停止 + return + } r.stopConsumer <- true // 停止消费者 messageChan := make(chan bool, 1) go func() { -- 2.36.6 From 414ad47e7234d2bfaf22c44156b431fd5266d460 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Thu, 27 Jun 2024 11:50:13 +0800 Subject: [PATCH 09/17] =?UTF-8?q?redis=E9=A9=B1=E5=8A=A8=E5=8F=91=E9=80=81?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- abstract/IEvent.go | 6 ++--- define/data.go | 12 +++++----- define/handler.go | 17 ++++++++++++++ go.mod | 9 +++++--- go.sum | 15 +++++++++++-- redis_pub_sub.go | 55 +++++++++++++++++++++++++++++++++++++++++----- 6 files changed, 95 insertions(+), 19 deletions(-) diff --git a/abstract/IEvent.go b/abstract/IEvent.go index 3e7997f..f368009 100644 --- a/abstract/IEvent.go +++ b/abstract/IEvent.go @@ -24,14 +24,14 @@ type EventHandler func(eventData *define.EventData) (map[string]any, error) // Author : go_developer@163.com<白茶清欢> // // Date : 17:31 2024/6/26 -type SendFailCallback func(ctx context.Context, eventResult *define.SendResult) +type SendFailCallback func(ctx context.Context, eventData *define.EventData, eventResult *define.SendResult, err error) // SendSuccessCallback 发送事件成功的回调 // // Author : go_developer@163.com<白茶清欢> // // Date : 17:32 2024/6/26 -type SendSuccessCallback func(ctx context.Context, eventResult *define.SendResult, err error) +type SendSuccessCallback func(ctx context.Context, eventResult *define.SendResult) // ConsumeFailCallbackHandler 时间处理成功回调 // @@ -65,7 +65,7 @@ type IEvent interface { // Author : go_developer@163.com<白茶清欢> // // Date : 12:04 2024/3/11 - SendEvent(ctx context.Context, eventData *define.EventData, sendSuccessCallback SendSuccessCallback, sendFailCallback SendFailCallback) (*define.SendResult, error) + SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) // SendEventAsync 发送事件(异步) // diff --git a/define/data.go b/define/data.go index baeb823..09d2427 100644 --- a/define/data.go +++ b/define/data.go @@ -28,10 +28,10 @@ type EventData struct { // // Date : 15:56 2024/6/25 type SendResult struct { - Data *EventData `json:"data"` // 发送的数据 - PartitionNum int `json:"partition_num"` // 分区索引编号 - Topic string `json:"topic"` // 使用的真实topic - IsSuccess bool `json:"is_success"` // 是否发送成功 - FailReason string `json:"fail_reason"` // 失败原因 - Extension map[string]any `json:"extension"` // 扩展数据 + Data *EventData `json:"data"` // 发送的数据 + Partition int `json:"partition_num"` // 分区索引编号 + Topic string `json:"topic"` // 使用的真实topic + IsSuccess bool `json:"is_success"` // 是否发送成功 + FailReason string `json:"fail_reason"` // 失败原因 + Extension map[string]any `json:"extension"` // 扩展数据 } diff --git a/define/handler.go b/define/handler.go index c6722ca..897c346 100644 --- a/define/handler.go +++ b/define/handler.go @@ -7,6 +7,8 @@ // Date : 2024-06-26 17:21 package define +import "context" + // DefaultSuccessCallbackHandler ... // // Author : go_developer@163.com<白茶清欢> @@ -31,3 +33,18 @@ func DefaultFailCallbackHandler(eventData *EventData, handleResult map[string]an // // Date : 18:11 2024/6/26 func DefaultPanicCallback(err any, eventData *EventData, handleResult map[string]any) {} + +// DefaultSendFailCallback ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:45 2024/6/27 +func DefaultSendFailCallback(ctx context.Context, eventData *EventData, eventResult *SendResult, err error) { +} + +// DefaultSendSuccessCallback ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:46 2024/6/27 +func DefaultSendSuccessCallback(ctx context.Context, eventResult *SendResult) {} diff --git a/go.mod b/go.mod index 6e362bf..5f5e214 100644 --- a/go.mod +++ b/go.mod @@ -5,12 +5,16 @@ go 1.21 toolchain go1.22.1 require ( - git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38 // indirect + git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38 + git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240627031706-9ff1c213bb50 + github.com/redis/go-redis/v9 v9.5.3 +) + +require ( git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211 // indirect git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0 // indirect git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd // indirect git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e // indirect - git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240612083858-8d056baada2e // indirect github.com/BurntSushi/toml v1.4.0 // indirect github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -18,7 +22,6 @@ require ( github.com/go-ini/ini v1.67.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mozillazg/go-pinyin v0.20.0 // indirect - github.com/redis/go-redis/v9 v9.5.3 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/tidwall/gjson v1.17.1 // indirect github.com/tidwall/match v1.1.1 // indirect diff --git a/go.sum b/go.sum index 0a006c1..5f45336 100644 --- a/go.sum +++ b/go.sum @@ -8,14 +8,20 @@ git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd h1:2Y3 git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd/go.mod h1:6+7whkCmb4sJDIfH3HxNuXRveaM0gCCNWd2uXZqNtIE= git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e h1:Q973S6CcWr1ICZhFI1STFOJ+KUImCl2BaIXm6YppBqI= git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e/go.mod h1:VpPjBlwz8U+OxZuxzHQBv1aEEZ3pStH6bZvT21ADEbI= -git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240612083858-8d056baada2e h1:+PeWa2QdYBWnL32CfAAgy0dlaRCVNmYZDH4q+9w7Gfg= -git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240612083858-8d056baada2e/go.mod h1:US/pcq2vstE3iyxIHf53w8IeXKkZys7bj/ozLWkRYeE= +git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240627031706-9ff1c213bb50 h1:olo34i2Gq5gX7bYPv5TR4X5l5CrYFtu9UCElkYlmL2c= +git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240627031706-9ff1c213bb50/go.mod h1:US/pcq2vstE3iyxIHf53w8IeXKkZys7bj/ozLWkRYeE= github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ= github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= @@ -24,10 +30,14 @@ github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mozillazg/go-pinyin v0.20.0 h1:BtR3DsxpApHfKReaPO1fCqF4pThRwH9uwvXzm+GnMFQ= github.com/mozillazg/go-pinyin v0.20.0/go.mod h1:iR4EnMMRXkfpFVV5FMi4FNB6wGq9NV6uDWbUuPhP4Yc= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU= github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U= github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -35,6 +45,7 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/redis_pub_sub.go b/redis_pub_sub.go index ccb3466..6289618 100644 --- a/redis_pub_sub.go +++ b/redis_pub_sub.go @@ -14,6 +14,7 @@ import ( "git.zhangdeman.cn/zhangdeman/consts" "git.zhangdeman.cn/zhangdeman/event/abstract" "git.zhangdeman.cn/zhangdeman/event/define" + "git.zhangdeman.cn/zhangdeman/wrapper" "github.com/redis/go-redis/v9" "time" ) @@ -50,14 +51,58 @@ type RedisEventPubSub struct { panicCallback abstract.PanicCallback // panic回调 } -func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) (*define.SendResult, error) { - //TODO implement me - panic("implement me") +// SendEvent 发布时间 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 10:58 2024/6/27 +func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) { + partition := int(wrapper.String(eventData.Key).HashNumber().Value % uint64(r.pubSubConfig.PartitionNum)) + realTopic := fmt.Sprintf("%v_%v", r.pubSubConfig.Topic, partition) + res := r.redisClient.Publish(ctx, realTopic, eventData) + if nil == res { + return nil, errors.New("can not get send event result") + } + return &define.SendResult{ + Data: eventData, + Partition: partition, + Topic: realTopic, + IsSuccess: res.Err() == nil, + FailReason: wrapper.TernaryOperator.String(res.Err() == nil, "success", wrapper.String(res.Err().Error())).Value(), + Extension: nil, + }, res.Err() } +// SendEventAsync 异步发送事件 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:29 2024/6/27 func (r *RedisEventPubSub) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) { - //TODO implement me - panic("implement me") + go func() { + if nil == sendFailCallback { + sendFailCallback = define.DefaultSendFailCallback + } + if nil == sendSuccessCallback { + sendSuccessCallback = define.DefaultSendSuccessCallback + } + var ( + sendResult *define.SendResult + handlerErr error + ) + defer func() { + if panicErr := recover(); nil != panicErr { + r.panicCallback(panicErr, eventData, map[string]any{ + "send_result": sendResult, + }) + } + }() + if sendResult, handlerErr = r.SendEvent(ctx, eventData); nil != handlerErr { + sendFailCallback(ctx, eventData, sendResult, handlerErr) + } else { + sendSuccessCallback(ctx, sendResult) + } + }() } // GetConsumeEventChan 获取消息channel, 自行实现消费 -- 2.36.6 From 7fe6003db876e09d2a5f3791b8c442bffe8caa19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Thu, 27 Jun 2024 12:00:15 +0800 Subject: [PATCH 10/17] =?UTF-8?q?redis=E4=BA=8B=E4=BB=B6=E5=A4=9A=E5=88=86?= =?UTF-8?q?=E5=8C=BA=E6=B6=88=E8=B4=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- redis_pub_sub.go | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/redis_pub_sub.go b/redis_pub_sub.go index 6289618..51792cb 100644 --- a/redis_pub_sub.go +++ b/redis_pub_sub.go @@ -152,27 +152,28 @@ func (r *RedisEventPubSub) ConsumeEvent(handler abstract.EventHandler, successCa if nil == successCallback { successCallback = define.DefaultSuccessCallbackHandler } - go func() { - defer func() { - if panicErr := recover(); nil != panicErr { - fmt.Println(r) - } - }() - for !r.isStop || (r.isStop && len(messageChan) == 0) { - select { - case eventData := <-messageChan: - handlerResult, handlerErr := handler(eventData) - if nil != handlerErr { - // 失败回调 - failureCallback(eventData, handlerResult, handlerErr) - break - } else { - // 成功回调 - successCallback(eventData, handlerResult) + for partition := 0; partition < r.pubSubConfig.PartitionNum; partition++ { + go func(realPartition int) { + defer func() { + if panicErr := recover(); nil != panicErr { + r.panicCallback(panicErr, nil, nil) + } + }() + for !r.isStop || (r.isStop && len(messageChan) == 0) { + select { + case eventData := <-messageChan: + handlerResult, handlerErr := handler(eventData) + if nil != handlerErr { + // 失败回调 + failureCallback(eventData, handlerResult, handlerErr) + } else { + // 成功回调 + successCallback(eventData, handlerResult) + } } } - } - }() + }(partition) + } return nil } -- 2.36.6 From da01f284b658abfd18902172bcaba53a4a7bd1d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Thu, 27 Jun 2024 18:37:22 +0800 Subject: [PATCH 11/17] =?UTF-8?q?=E5=AE=8C=E6=88=90v1=E7=89=88=E6=9C=ACred?= =?UTF-8?q?is=E6=97=B6=E9=97=B4=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- redis_pub_sub.go | 47 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/redis_pub_sub.go b/redis_pub_sub.go index 51792cb..8b36554 100644 --- a/redis_pub_sub.go +++ b/redis_pub_sub.go @@ -14,6 +14,7 @@ import ( "git.zhangdeman.cn/zhangdeman/consts" "git.zhangdeman.cn/zhangdeman/event/abstract" "git.zhangdeman.cn/zhangdeman/event/define" + "git.zhangdeman.cn/zhangdeman/serialize" "git.zhangdeman.cn/zhangdeman/wrapper" "github.com/redis/go-redis/v9" "time" @@ -34,6 +35,7 @@ func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisE pubSubConfig: pubSubConfig, } instance.SetRedisClient(redisClient, pubSubConfig) + instance.StartConsumer() // 启动消费者 RedisEventPubSubClient = instance } @@ -152,6 +154,25 @@ func (r *RedisEventPubSub) ConsumeEvent(handler abstract.EventHandler, successCa if nil == successCallback { successCallback = define.DefaultSuccessCallbackHandler } + + for eventData := range messageChan { + handlerResult, handleErr := handler(eventData) + if nil != handleErr { + failureCallback(eventData, handlerResult, err) + } else { + successCallback(eventData, handlerResult) + } + } + + return nil +} + +// StartConsumer 启动消费者 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:36 2024/6/27 +func (r *RedisEventPubSub) StartConsumer() { for partition := 0; partition < r.pubSubConfig.PartitionNum; partition++ { go func(realPartition int) { defer func() { @@ -159,23 +180,23 @@ func (r *RedisEventPubSub) ConsumeEvent(handler abstract.EventHandler, successCa r.panicCallback(panicErr, nil, nil) } }() - for !r.isStop || (r.isStop && len(messageChan) == 0) { - select { - case eventData := <-messageChan: - handlerResult, handlerErr := handler(eventData) - if nil != handlerErr { - // 失败回调 - failureCallback(eventData, handlerResult, handlerErr) - } else { - // 成功回调 - successCallback(eventData, handlerResult) - } + // 启动消费者 + realTopic := fmt.Sprintf("%v_%v", r.pubSubConfig.Topic, realPartition) + messageChannel := r.redisClient.Subscribe(context.Background(), realTopic).Channel() + for message := range messageChannel { + var ( + eventData define.EventData + err error + ) + + if err = serialize.JSON.UnmarshalWithNumber([]byte(message.Payload), &eventData); nil != err { + // TODO : 事件数据解析失败的回调 + continue } + r.messageChan <- &eventData } }(partition) } - - return nil } // Destroy 销毁事件实例 -- 2.36.6 From 2f6062a645ac23ef6599083590a60335b1ec4c67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Thu, 27 Jun 2024 18:39:51 +0800 Subject: [PATCH 12/17] =?UTF-8?q?=E5=8D=87=E7=BA=A7=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E5=A4=84=E7=90=86=E7=9A=84=E6=8E=A5=E5=8F=A3=E7=BA=A6=E6=9D=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- abstract/IEvent.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/abstract/IEvent.go b/abstract/IEvent.go index f368009..0f57d22 100644 --- a/abstract/IEvent.go +++ b/abstract/IEvent.go @@ -97,4 +97,10 @@ type IEvent interface { // // Date : 12:06 2024/3/11 DriverType() string + // SetPanicCallback 设置失败回调的处理函数 + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 18:39 2024/6/27 + SetPanicCallback(panicCallback PanicCallback) } -- 2.36.6 From d9244e0f0c1ed4bc5ce49256a733218e58c8a60f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Thu, 27 Jun 2024 18:52:20 +0800 Subject: [PATCH 13/17] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E8=A7=A3=E6=9E=90=E5=A4=B1=E8=B4=A5=E7=9A=84=E5=9B=9E=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- abstract/IEvent.go | 13 +++++++++++++ define/handler.go | 9 +++++++++ redis_pub_sub.go | 25 +++++++++++++++++++------ 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/abstract/IEvent.go b/abstract/IEvent.go index 0f57d22..12cd6a6 100644 --- a/abstract/IEvent.go +++ b/abstract/IEvent.go @@ -54,6 +54,13 @@ type ConsumeSuccessCallback func(eventData *define.EventData, handleResult map[s // Date : 18:07 2024/6/26 type PanicCallback func(err any, eventData *define.EventData, handleResult map[string]any) +// EventParseFailCallback ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:48 2024/6/27 +type EventParseFailCallback func(err error, eventData string) + // IEvent 事件接口定义 // // Author : go_developer@163.com<白茶清欢> @@ -103,4 +110,10 @@ type IEvent interface { // // Date : 18:39 2024/6/27 SetPanicCallback(panicCallback PanicCallback) + // SetEventParseFailCallback 设置数据解析失败的处理函数 + // + // Author : go_developer@163.com<白茶清欢> + // + // Date : 18:51 2024/6/27 + SetEventParseFailCallback(parseFailCallbackCallback EventParseFailCallback) } diff --git a/define/handler.go b/define/handler.go index 897c346..305660f 100644 --- a/define/handler.go +++ b/define/handler.go @@ -48,3 +48,12 @@ func DefaultSendFailCallback(ctx context.Context, eventData *EventData, eventRes // // Date : 11:46 2024/6/27 func DefaultSendSuccessCallback(ctx context.Context, eventResult *SendResult) {} + +// DefaultParseFailCallbackFunc ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:49 2024/6/27 +func DefaultParseFailCallbackFunc(err error, eventData string) { + +} diff --git a/redis_pub_sub.go b/redis_pub_sub.go index 8b36554..7eb406a 100644 --- a/redis_pub_sub.go +++ b/redis_pub_sub.go @@ -45,12 +45,13 @@ func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisE // // Date : 16:07 2024/6/25 type RedisEventPubSub struct { - redisClient *redis.Client // redis客户端 - pubSubConfig *define.RedisEventPubSubConfig // 事件配置 - messageChan chan *define.EventData // 消息队列 - stopConsumer chan bool // 停止消费者 - isStop bool // 是否已停止 - panicCallback abstract.PanicCallback // panic回调 + redisClient *redis.Client // redis客户端 + pubSubConfig *define.RedisEventPubSubConfig // 事件配置 + messageChan chan *define.EventData // 消息队列 + stopConsumer chan bool // 停止消费者 + isStop bool // 是否已停止 + panicCallback abstract.PanicCallback // panic回调 + parseFailCallback abstract.EventParseFailCallback // 数据解析失败回调 } // SendEvent 发布时间 @@ -131,6 +132,18 @@ func (r *RedisEventPubSub) SetPanicCallback(panicCallback abstract.PanicCallback r.panicCallback = panicCallback } +// SetEventParseFailCallback 设置事件解析失败回回调函数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:42 2024/6/27 +func (r *RedisEventPubSub) SetEventParseFailCallback(parseFailCallbackCallback abstract.EventParseFailCallback) { + if nil == parseFailCallbackCallback { + parseFailCallbackCallback = define.DefaultParseFailCallbackFunc + } + r.parseFailCallback = parseFailCallbackCallback +} + // ConsumeEvent 获取数据消费实例 // // Author : go_developer@163.com<白茶清欢> -- 2.36.6 From b5ea528036ab944c8d8f1cdf043af9d4bd969a66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Thu, 11 Jul 2024 19:05:45 +0800 Subject: [PATCH 14/17] update go mod --- go.mod | 4 ++-- go.sum | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 5f5e214..ac2d75d 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,8 @@ go 1.21 toolchain go1.22.1 require ( - git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38 + git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240709134122-e1e2a2e421de + git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20240627031706-9ff1c213bb50 github.com/redis/go-redis/v9 v9.5.3 ) @@ -13,7 +14,6 @@ require ( require ( git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211 // indirect git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0 // indirect - git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20240618035451-8d48a6bd39dd // indirect git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e // indirect github.com/BurntSushi/toml v1.4.0 // indirect github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect diff --git a/go.sum b/go.sum index 5f45336..ceba8db 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38 h1:+o+BI5GGlwJelPLWL8ciDJqxw/G8dv+FU6OztGSnEjo= git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240625075535-c3417b35fe38/go.mod h1:IXXaZkb7vGzGnGM5RRWrASAuwrVSNxuoe0DmeXx5g6k= +git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240709134122-e1e2a2e421de h1:ksjcMHupU0Bw0BJxJp3dajmWqGdqV7k2eVohN5O3S9Q= +git.zhangdeman.cn/zhangdeman/consts v0.0.0-20240709134122-e1e2a2e421de/go.mod h1:IXXaZkb7vGzGnGM5RRWrASAuwrVSNxuoe0DmeXx5g6k= git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211 h1:I/wOsRpCSRkU9vo1u703slQsmK0wnNeZzsWQOGtIAG0= git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20240311030808-e2a2e6a3c211/go.mod h1:SrtvrQRdzt+8KfYzvosH++gWxo2ShPTzR1m3VQ6uX7U= git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0 h1:gUDlQMuJ4xNfP2Abl1Msmpa3fASLWYkNlqDFF/6GN0Y= -- 2.36.6 From 10f0eb484f51f3a0cae735c5c8aba1de70885023 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Wed, 17 Jul 2024 12:47:09 +0800 Subject: [PATCH 15/17] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8E?= =?UTF-8?q?=E5=86=85=E5=AD=98=E7=9A=84=E4=BA=8B=E4=BB=B6=E5=8F=91=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- base.go | 42 +++++++++++++ define/memory.go | 25 ++++++++ memory.go | 154 +++++++++++++++++++++++++++++++++++++++++++++++ redis_pub_sub.go | 30 ++------- 4 files changed, 226 insertions(+), 25 deletions(-) create mode 100644 base.go create mode 100644 define/memory.go create mode 100644 memory.go diff --git a/base.go b/base.go new file mode 100644 index 0000000..99e5470 --- /dev/null +++ b/base.go @@ -0,0 +1,42 @@ +// Package event ... +// +// Description : event ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-07-17 12:31 +package event + +import ( + "git.zhangdeman.cn/zhangdeman/event/abstract" + "git.zhangdeman.cn/zhangdeman/event/define" +) + +type base struct { + panicCallback abstract.PanicCallback + parseFailCallback abstract.EventParseFailCallback +} + +// SetPanicCallback 出现任何panic的回调 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:02 2024/6/26 +func (b *base) SetPanicCallback(panicCallback abstract.PanicCallback) { + if nil == panicCallback { + panicCallback = define.DefaultPanicCallback + } + b.panicCallback = panicCallback +} + +// SetEventParseFailCallback 设置事件解析失败回回调函数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:42 2024/6/27 +func (b *base) SetEventParseFailCallback(parseFailCallbackCallback abstract.EventParseFailCallback) { + if nil == parseFailCallbackCallback { + parseFailCallbackCallback = define.DefaultParseFailCallbackFunc + } + b.parseFailCallback = parseFailCallbackCallback +} diff --git a/define/memory.go b/define/memory.go new file mode 100644 index 0000000..106a12b --- /dev/null +++ b/define/memory.go @@ -0,0 +1,25 @@ +// Package define ... +// +// Description : define ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-07-17 11:25 +package define + +const ( + DefaultMemoryPartitionNum = 1 // 默认的分区数量 + DefaultMemoryChannelSize = 1024 // 默认的消息channel大小 + DefaultMemoryCloseMaxWaitTime = 5000 // 默认最大等待 : 5s +) + +// MemoryEventConfig 内存事件的配置 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:26 2024/7/17 +type MemoryEventConfig struct { + PartitionNum int `json:"partition_num"` // 多少个分区, 默认值 : 1 + MessageBufferSize int `json:"message_buffer_size"` // 消息缓冲区大小 + CloseMaxWaitTime int `json:"close_max_wait_time"` // 关闭消息实例, 最大等待时长, 单位 : ms +} diff --git a/memory.go b/memory.go new file mode 100644 index 0000000..4d210d9 --- /dev/null +++ b/memory.go @@ -0,0 +1,154 @@ +// Package event ... +// +// Description : event ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-07-17 11:21 +package event + +import ( + "context" + "errors" + "git.zhangdeman.cn/zhangdeman/consts" + "git.zhangdeman.cn/zhangdeman/event/abstract" + "git.zhangdeman.cn/zhangdeman/event/define" + "git.zhangdeman.cn/zhangdeman/wrapper" +) + +var ( + MemoryEventClient abstract.IEvent +) + +// InitMemoryEvent 初始化事件实例 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:24 2024/7/17 +func InitMemoryEvent(cfg *define.MemoryEventConfig) { + instance := &MemoryEvent{ + base: &base{ + panicCallback: define.DefaultPanicCallback, + parseFailCallback: define.DefaultParseFailCallbackFunc, + }, + } + instance.Init(cfg) + MemoryEventClient = instance +} + +// MemoryEvent 基于内存实现的消息队列(仅适用于单机) +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:22 2024/7/17 +type MemoryEvent struct { + *base + cfg *define.MemoryEventConfig + messageChannel map[int]chan *define.EventData +} + +// SendEvent 发送事件 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 12:28 2024/7/17 +func (m *MemoryEvent) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) { + if nil == eventData { + return nil, errors.New("event data is nil") + } + partition := int(wrapper.String(eventData.Key).HashNumber().Value % uint64(m.cfg.PartitionNum)) + m.messageChannel[partition] <- eventData + return &define.SendResult{ + Data: eventData, + Partition: partition, + Topic: "", + IsSuccess: true, + FailReason: "success", + Extension: nil, + }, nil +} + +// SendEventAsync 异步发送事件 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 12:29 2024/7/17 +func (m *MemoryEvent) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) { + if nil == sendFailCallback { + sendFailCallback = define.DefaultSendFailCallback + } + if nil == sendSuccessCallback { + sendSuccessCallback = define.DefaultSendSuccessCallback + } + go func() { + var ( + res *define.SendResult + err error + ) + defer func() { + if r := recover(); nil != r { + m.panicCallback(err, eventData, map[string]any{ + "send_result": res, + }) + } + }() + res, err = m.SendEvent(ctx, eventData) + + if nil != err { + sendFailCallback(ctx, eventData, res, err) + return + } else { + sendSuccessCallback(ctx, res) + } + }() +} + +// GetConsumeEventChan 消费的消息chan +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 12:41 2024/7/17 +func (m *MemoryEvent) GetConsumeEventChan() (<-chan *define.EventData, error) { + //TODO implement me + panic("implement me") +} + +func (m *MemoryEvent) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error { + //TODO implement me + panic("implement me") +} + +func (m *MemoryEvent) Destroy() { + //TODO implement me + panic("implement me") +} + +func (m *MemoryEvent) DriverType() string { + return consts.EventDriverMemory +} + +// Init 初始化 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 12:08 2024/7/17 +func (m *MemoryEvent) Init(cfg *define.MemoryEventConfig) { + if nil == cfg { + cfg = &define.MemoryEventConfig{} + } + if cfg.PartitionNum <= 0 { + cfg.PartitionNum = define.DefaultMemoryPartitionNum + } + if cfg.MessageBufferSize <= 0 { + cfg.MessageBufferSize = define.DefaultMemoryChannelSize + } + if cfg.CloseMaxWaitTime <= 0 { + cfg.CloseMaxWaitTime = define.DefaultMemoryCloseMaxWaitTime + } + m.cfg = cfg + // 初始化内存 channel + m.messageChannel = make(map[int]chan *define.EventData) + for num := 0; num < cfg.PartitionNum; num++ { + m.messageChannel[num] = make(chan *define.EventData, cfg.MessageBufferSize) + } +} diff --git a/redis_pub_sub.go b/redis_pub_sub.go index 7eb406a..b6d2c39 100644 --- a/redis_pub_sub.go +++ b/redis_pub_sub.go @@ -31,6 +31,10 @@ var ( // Date : 16:24 2024/6/25 func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) { instance := &RedisEventPubSub{ + base: &base{ + panicCallback: define.DefaultPanicCallback, + parseFailCallback: define.DefaultParseFailCallbackFunc, + }, redisClient: redisClient, pubSubConfig: pubSubConfig, } @@ -45,12 +49,12 @@ func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisE // // Date : 16:07 2024/6/25 type RedisEventPubSub struct { + *base redisClient *redis.Client // redis客户端 pubSubConfig *define.RedisEventPubSubConfig // 事件配置 messageChan chan *define.EventData // 消息队列 stopConsumer chan bool // 停止消费者 isStop bool // 是否已停止 - panicCallback abstract.PanicCallback // panic回调 parseFailCallback abstract.EventParseFailCallback // 数据解析失败回调 } @@ -120,30 +124,6 @@ func (r *RedisEventPubSub) GetConsumeEventChan() (<-chan *define.EventData, erro return r.messageChan, nil } -// SetPanicCallback 出现任何panic的回调 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 18:02 2024/6/26 -func (r *RedisEventPubSub) SetPanicCallback(panicCallback abstract.PanicCallback) { - if nil == panicCallback { - panicCallback = define.DefaultPanicCallback - } - r.panicCallback = panicCallback -} - -// SetEventParseFailCallback 设置事件解析失败回回调函数 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 18:42 2024/6/27 -func (r *RedisEventPubSub) SetEventParseFailCallback(parseFailCallbackCallback abstract.EventParseFailCallback) { - if nil == parseFailCallbackCallback { - parseFailCallbackCallback = define.DefaultParseFailCallbackFunc - } - r.parseFailCallback = parseFailCallbackCallback -} - // ConsumeEvent 获取数据消费实例 // // Author : go_developer@163.com<白茶清欢> -- 2.36.6 From 5fc71de49a999d0238a6794320fe17aac3e8e698 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Wed, 17 Jul 2024 12:55:51 +0800 Subject: [PATCH 16/17] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E6=B6=88=E6=81=AFchannel=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- define/memory.go | 2 -- memory.go | 19 +++++-------------- 2 files changed, 5 insertions(+), 16 deletions(-) diff --git a/define/memory.go b/define/memory.go index 106a12b..a5a5d04 100644 --- a/define/memory.go +++ b/define/memory.go @@ -8,7 +8,6 @@ package define const ( - DefaultMemoryPartitionNum = 1 // 默认的分区数量 DefaultMemoryChannelSize = 1024 // 默认的消息channel大小 DefaultMemoryCloseMaxWaitTime = 5000 // 默认最大等待 : 5s ) @@ -19,7 +18,6 @@ const ( // // Date : 11:26 2024/7/17 type MemoryEventConfig struct { - PartitionNum int `json:"partition_num"` // 多少个分区, 默认值 : 1 MessageBufferSize int `json:"message_buffer_size"` // 消息缓冲区大小 CloseMaxWaitTime int `json:"close_max_wait_time"` // 关闭消息实例, 最大等待时长, 单位 : ms } diff --git a/memory.go b/memory.go index 4d210d9..8491a25 100644 --- a/memory.go +++ b/memory.go @@ -13,7 +13,6 @@ import ( "git.zhangdeman.cn/zhangdeman/consts" "git.zhangdeman.cn/zhangdeman/event/abstract" "git.zhangdeman.cn/zhangdeman/event/define" - "git.zhangdeman.cn/zhangdeman/wrapper" ) var ( @@ -44,7 +43,7 @@ func InitMemoryEvent(cfg *define.MemoryEventConfig) { type MemoryEvent struct { *base cfg *define.MemoryEventConfig - messageChannel map[int]chan *define.EventData + messageChannel chan *define.EventData } // SendEvent 发送事件 @@ -56,11 +55,10 @@ func (m *MemoryEvent) SendEvent(ctx context.Context, eventData *define.EventData if nil == eventData { return nil, errors.New("event data is nil") } - partition := int(wrapper.String(eventData.Key).HashNumber().Value % uint64(m.cfg.PartitionNum)) - m.messageChannel[partition] <- eventData + m.messageChannel <- eventData return &define.SendResult{ Data: eventData, - Partition: partition, + Partition: 0, Topic: "", IsSuccess: true, FailReason: "success", @@ -109,8 +107,7 @@ func (m *MemoryEvent) SendEventAsync(ctx context.Context, eventData *define.Even // // Date : 12:41 2024/7/17 func (m *MemoryEvent) GetConsumeEventChan() (<-chan *define.EventData, error) { - //TODO implement me - panic("implement me") + return m.messageChannel, nil } func (m *MemoryEvent) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error { @@ -136,9 +133,6 @@ func (m *MemoryEvent) Init(cfg *define.MemoryEventConfig) { if nil == cfg { cfg = &define.MemoryEventConfig{} } - if cfg.PartitionNum <= 0 { - cfg.PartitionNum = define.DefaultMemoryPartitionNum - } if cfg.MessageBufferSize <= 0 { cfg.MessageBufferSize = define.DefaultMemoryChannelSize } @@ -147,8 +141,5 @@ func (m *MemoryEvent) Init(cfg *define.MemoryEventConfig) { } m.cfg = cfg // 初始化内存 channel - m.messageChannel = make(map[int]chan *define.EventData) - for num := 0; num < cfg.PartitionNum; num++ { - m.messageChannel[num] = make(chan *define.EventData, cfg.MessageBufferSize) - } + m.messageChannel = make(chan *define.EventData, cfg.MessageBufferSize) } -- 2.36.6 From f7c435709753fc59537494bca3f51275ba3adf23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Wed, 17 Jul 2024 15:44:26 +0800 Subject: [PATCH 17/17] =?UTF-8?q?=E5=9F=BA=E4=BA=8E=E5=86=85=E5=AD=98?= =?UTF-8?q?=E7=9A=84=E6=B6=88=E6=81=AF=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- memory.go | 43 +++++++++++++++++++++++++++++++++++++++---- memory_test.go | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 4 deletions(-) create mode 100644 memory_test.go diff --git a/memory.go b/memory.go index 8491a25..5cf03e7 100644 --- a/memory.go +++ b/memory.go @@ -10,6 +10,7 @@ package event import ( "context" "errors" + "fmt" "git.zhangdeman.cn/zhangdeman/consts" "git.zhangdeman.cn/zhangdeman/event/abstract" "git.zhangdeman.cn/zhangdeman/event/define" @@ -110,14 +111,48 @@ func (m *MemoryEvent) GetConsumeEventChan() (<-chan *define.EventData, error) { return m.messageChannel, nil } +// ConsumeEvent 消费事件 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 14:14 2024/7/17 func (m *MemoryEvent) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error { - //TODO implement me - panic("implement me") + messageChan, _ := m.GetConsumeEventChan() + if nil == successCallback { + successCallback = define.DefaultSuccessCallbackHandler + } + if nil == failureCallback { + failureCallback = define.DefaultFailCallbackHandler + } + go func() { + defer func() { + if r := recover(); nil != r { + fmt.Println("出现异常", r) + } + }() + for eventData := range messageChan { + var ( + err error + handleResult map[string]any + ) + if handleResult, err = handler(eventData); nil != err { + failureCallback(eventData, handleResult, err) + } else { + successCallback(eventData, handleResult) + } + } + }() + + return nil } +// Destroy 实例销毁 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 14:17 2024/7/17 func (m *MemoryEvent) Destroy() { - //TODO implement me - panic("implement me") + return } func (m *MemoryEvent) DriverType() string { diff --git a/memory_test.go b/memory_test.go new file mode 100644 index 0000000..6754657 --- /dev/null +++ b/memory_test.go @@ -0,0 +1,50 @@ +// Package event ... +// +// Description : event ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-07-17 14:17 +package event + +import ( + "context" + "encoding/json" + "fmt" + "git.zhangdeman.cn/zhangdeman/event/define" + "testing" + "time" +) + +func TestInitMemoryEvent(t *testing.T) { + InitMemoryEvent(&define.MemoryEventConfig{ + MessageBufferSize: 1024, + CloseMaxWaitTime: 5000, + }) + go func() { + for { + time.Sleep(time.Second) + MemoryEventClient.SendEventAsync(context.Background(), &define.EventData{ + EventType: "TEST", + TraceID: time.Now().Format("2006-01-02 15:04:05"), + Host: "", + Timestamp: time.Now().Unix(), + SystemTimestamp: 0, + Key: "", + Data: nil, + }, func(ctx context.Context, eventResult *define.SendResult) { + fmt.Println("消息发送成功") + }, func(ctx context.Context, eventData *define.EventData, eventResult *define.SendResult, err error) { + fmt.Println("消息发送失败") + }) + } + }() + MemoryEventClient.ConsumeEvent(func(eventData *define.EventData) (map[string]any, error) { + byteData, _ := json.Marshal(eventData) + fmt.Println(string(byteData)) + return map[string]any{}, nil + }, nil, nil) + for { + + } +} -- 2.36.6