diff --git a/base.go b/base.go new file mode 100644 index 0000000..99e5470 --- /dev/null +++ b/base.go @@ -0,0 +1,42 @@ +// Package event ... +// +// Description : event ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-07-17 12:31 +package event + +import ( + "git.zhangdeman.cn/zhangdeman/event/abstract" + "git.zhangdeman.cn/zhangdeman/event/define" +) + +type base struct { + panicCallback abstract.PanicCallback + parseFailCallback abstract.EventParseFailCallback +} + +// SetPanicCallback 出现任何panic的回调 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:02 2024/6/26 +func (b *base) SetPanicCallback(panicCallback abstract.PanicCallback) { + if nil == panicCallback { + panicCallback = define.DefaultPanicCallback + } + b.panicCallback = panicCallback +} + +// SetEventParseFailCallback 设置事件解析失败回回调函数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 18:42 2024/6/27 +func (b *base) SetEventParseFailCallback(parseFailCallbackCallback abstract.EventParseFailCallback) { + if nil == parseFailCallbackCallback { + parseFailCallbackCallback = define.DefaultParseFailCallbackFunc + } + b.parseFailCallback = parseFailCallbackCallback +} diff --git a/define/memory.go b/define/memory.go new file mode 100644 index 0000000..106a12b --- /dev/null +++ b/define/memory.go @@ -0,0 +1,25 @@ +// Package define ... +// +// Description : define ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-07-17 11:25 +package define + +const ( + DefaultMemoryPartitionNum = 1 // 默认的分区数量 + DefaultMemoryChannelSize = 1024 // 默认的消息channel大小 + DefaultMemoryCloseMaxWaitTime = 5000 // 默认最大等待 : 5s +) + +// MemoryEventConfig 内存事件的配置 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:26 2024/7/17 +type MemoryEventConfig struct { + PartitionNum int `json:"partition_num"` // 多少个分区, 默认值 : 1 + MessageBufferSize int `json:"message_buffer_size"` // 消息缓冲区大小 + CloseMaxWaitTime int `json:"close_max_wait_time"` // 关闭消息实例, 最大等待时长, 单位 : ms +} diff --git a/memory.go b/memory.go new file mode 100644 index 0000000..4d210d9 --- /dev/null +++ b/memory.go @@ -0,0 +1,154 @@ +// Package event ... +// +// Description : event ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-07-17 11:21 +package event + +import ( + "context" + "errors" + "git.zhangdeman.cn/zhangdeman/consts" + "git.zhangdeman.cn/zhangdeman/event/abstract" + "git.zhangdeman.cn/zhangdeman/event/define" + "git.zhangdeman.cn/zhangdeman/wrapper" +) + +var ( + MemoryEventClient abstract.IEvent +) + +// InitMemoryEvent 初始化事件实例 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:24 2024/7/17 +func InitMemoryEvent(cfg *define.MemoryEventConfig) { + instance := &MemoryEvent{ + base: &base{ + panicCallback: define.DefaultPanicCallback, + parseFailCallback: define.DefaultParseFailCallbackFunc, + }, + } + instance.Init(cfg) + MemoryEventClient = instance +} + +// MemoryEvent 基于内存实现的消息队列(仅适用于单机) +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:22 2024/7/17 +type MemoryEvent struct { + *base + cfg *define.MemoryEventConfig + messageChannel map[int]chan *define.EventData +} + +// SendEvent 发送事件 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 12:28 2024/7/17 +func (m *MemoryEvent) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) { + if nil == eventData { + return nil, errors.New("event data is nil") + } + partition := int(wrapper.String(eventData.Key).HashNumber().Value % uint64(m.cfg.PartitionNum)) + m.messageChannel[partition] <- eventData + return &define.SendResult{ + Data: eventData, + Partition: partition, + Topic: "", + IsSuccess: true, + FailReason: "success", + Extension: nil, + }, nil +} + +// SendEventAsync 异步发送事件 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 12:29 2024/7/17 +func (m *MemoryEvent) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) { + if nil == sendFailCallback { + sendFailCallback = define.DefaultSendFailCallback + } + if nil == sendSuccessCallback { + sendSuccessCallback = define.DefaultSendSuccessCallback + } + go func() { + var ( + res *define.SendResult + err error + ) + defer func() { + if r := recover(); nil != r { + m.panicCallback(err, eventData, map[string]any{ + "send_result": res, + }) + } + }() + res, err = m.SendEvent(ctx, eventData) + + if nil != err { + sendFailCallback(ctx, eventData, res, err) + return + } else { + sendSuccessCallback(ctx, res) + } + }() +} + +// GetConsumeEventChan 消费的消息chan +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 12:41 2024/7/17 +func (m *MemoryEvent) GetConsumeEventChan() (<-chan *define.EventData, error) { + //TODO implement me + panic("implement me") +} + +func (m *MemoryEvent) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error { + //TODO implement me + panic("implement me") +} + +func (m *MemoryEvent) Destroy() { + //TODO implement me + panic("implement me") +} + +func (m *MemoryEvent) DriverType() string { + return consts.EventDriverMemory +} + +// Init 初始化 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 12:08 2024/7/17 +func (m *MemoryEvent) Init(cfg *define.MemoryEventConfig) { + if nil == cfg { + cfg = &define.MemoryEventConfig{} + } + if cfg.PartitionNum <= 0 { + cfg.PartitionNum = define.DefaultMemoryPartitionNum + } + if cfg.MessageBufferSize <= 0 { + cfg.MessageBufferSize = define.DefaultMemoryChannelSize + } + if cfg.CloseMaxWaitTime <= 0 { + cfg.CloseMaxWaitTime = define.DefaultMemoryCloseMaxWaitTime + } + m.cfg = cfg + // 初始化内存 channel + m.messageChannel = make(map[int]chan *define.EventData) + for num := 0; num < cfg.PartitionNum; num++ { + m.messageChannel[num] = make(chan *define.EventData, cfg.MessageBufferSize) + } +} diff --git a/redis_pub_sub.go b/redis_pub_sub.go index 7eb406a..b6d2c39 100644 --- a/redis_pub_sub.go +++ b/redis_pub_sub.go @@ -31,6 +31,10 @@ var ( // Date : 16:24 2024/6/25 func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) { instance := &RedisEventPubSub{ + base: &base{ + panicCallback: define.DefaultPanicCallback, + parseFailCallback: define.DefaultParseFailCallbackFunc, + }, redisClient: redisClient, pubSubConfig: pubSubConfig, } @@ -45,12 +49,12 @@ func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisE // // Date : 16:07 2024/6/25 type RedisEventPubSub struct { + *base redisClient *redis.Client // redis客户端 pubSubConfig *define.RedisEventPubSubConfig // 事件配置 messageChan chan *define.EventData // 消息队列 stopConsumer chan bool // 停止消费者 isStop bool // 是否已停止 - panicCallback abstract.PanicCallback // panic回调 parseFailCallback abstract.EventParseFailCallback // 数据解析失败回调 } @@ -120,30 +124,6 @@ func (r *RedisEventPubSub) GetConsumeEventChan() (<-chan *define.EventData, erro return r.messageChan, nil } -// SetPanicCallback 出现任何panic的回调 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 18:02 2024/6/26 -func (r *RedisEventPubSub) SetPanicCallback(panicCallback abstract.PanicCallback) { - if nil == panicCallback { - panicCallback = define.DefaultPanicCallback - } - r.panicCallback = panicCallback -} - -// SetEventParseFailCallback 设置事件解析失败回回调函数 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 18:42 2024/6/27 -func (r *RedisEventPubSub) SetEventParseFailCallback(parseFailCallbackCallback abstract.EventParseFailCallback) { - if nil == parseFailCallbackCallback { - parseFailCallbackCallback = define.DefaultParseFailCallbackFunc - } - r.parseFailCallback = parseFailCallbackCallback -} - // ConsumeEvent 获取数据消费实例 // // Author : go_developer@163.com<白茶清欢>