event/memory.go

181 lines
4.2 KiB
Go
Raw Normal View History

2024-07-17 12:47:09 +08:00
// Package event ...
//
// Description : event ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2024-07-17 11:21
package event
import (
"context"
"errors"
2024-07-17 15:44:26 +08:00
"fmt"
2024-07-17 12:47:09 +08:00
"git.zhangdeman.cn/zhangdeman/consts"
"git.zhangdeman.cn/zhangdeman/event/abstract"
"git.zhangdeman.cn/zhangdeman/event/define"
)
var (
MemoryEventClient abstract.IEvent
)
// InitMemoryEvent 初始化事件实例
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:24 2024/7/17
func InitMemoryEvent(cfg *define.MemoryEventConfig) {
instance := &MemoryEvent{
base: &base{
panicCallback: define.DefaultPanicCallback,
parseFailCallback: define.DefaultParseFailCallbackFunc,
},
}
instance.Init(cfg)
MemoryEventClient = instance
}
// MemoryEvent 基于内存实现的消息队列(仅适用于单机)
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:22 2024/7/17
type MemoryEvent struct {
*base
cfg *define.MemoryEventConfig
2024-07-17 12:55:51 +08:00
messageChannel chan *define.EventData
2024-07-17 12:47:09 +08:00
}
// SendEvent 发送事件
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 12:28 2024/7/17
func (m *MemoryEvent) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) {
if nil == eventData {
return nil, errors.New("event data is nil")
}
2024-07-17 12:55:51 +08:00
m.messageChannel <- eventData
2024-07-17 12:47:09 +08:00
return &define.SendResult{
Data: eventData,
2024-07-17 12:55:51 +08:00
Partition: 0,
2024-07-17 12:47:09 +08:00
Topic: "",
IsSuccess: true,
FailReason: "success",
Extension: nil,
}, nil
}
// SendEventAsync 异步发送事件
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 12:29 2024/7/17
func (m *MemoryEvent) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) {
if nil == sendFailCallback {
sendFailCallback = define.DefaultSendFailCallback
}
if nil == sendSuccessCallback {
sendSuccessCallback = define.DefaultSendSuccessCallback
}
go func() {
var (
res *define.SendResult
err error
)
defer func() {
if r := recover(); nil != r {
m.panicCallback(err, eventData, map[string]any{
"send_result": res,
})
}
}()
res, err = m.SendEvent(ctx, eventData)
if nil != err {
sendFailCallback(ctx, eventData, res, err)
return
} else {
sendSuccessCallback(ctx, res)
}
}()
}
// GetConsumeEventChan 消费的消息chan
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 12:41 2024/7/17
func (m *MemoryEvent) GetConsumeEventChan() (<-chan *define.EventData, error) {
2024-07-17 12:55:51 +08:00
return m.messageChannel, nil
2024-07-17 12:47:09 +08:00
}
2024-07-17 15:44:26 +08:00
// ConsumeEvent 消费事件
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:14 2024/7/17
2024-07-17 12:47:09 +08:00
func (m *MemoryEvent) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error {
2024-07-17 15:44:26 +08:00
messageChan, _ := m.GetConsumeEventChan()
if nil == successCallback {
successCallback = define.DefaultSuccessCallbackHandler
}
if nil == failureCallback {
failureCallback = define.DefaultFailCallbackHandler
}
go func() {
defer func() {
if r := recover(); nil != r {
fmt.Println("出现异常", r)
}
}()
for eventData := range messageChan {
var (
err error
handleResult map[string]any
)
if handleResult, err = handler(eventData); nil != err {
failureCallback(eventData, handleResult, err)
} else {
successCallback(eventData, handleResult)
}
}
}()
return nil
2024-07-17 12:47:09 +08:00
}
2024-07-17 15:44:26 +08:00
// Destroy 实例销毁
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:17 2024/7/17
2024-07-17 12:47:09 +08:00
func (m *MemoryEvent) Destroy() {
2024-07-17 15:44:26 +08:00
return
2024-07-17 12:47:09 +08:00
}
func (m *MemoryEvent) DriverType() string {
return consts.EventDriverMemory
}
// Init 初始化
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 12:08 2024/7/17
func (m *MemoryEvent) Init(cfg *define.MemoryEventConfig) {
if nil == cfg {
cfg = &define.MemoryEventConfig{}
}
if cfg.MessageBufferSize <= 0 {
cfg.MessageBufferSize = define.DefaultMemoryChannelSize
}
if cfg.CloseMaxWaitTime <= 0 {
cfg.CloseMaxWaitTime = define.DefaultMemoryCloseMaxWaitTime
}
m.cfg = cfg
// 初始化内存 channel
2024-07-17 12:55:51 +08:00
m.messageChannel = make(chan *define.EventData, cfg.MessageBufferSize)
2024-07-17 12:47:09 +08:00
}