基于内存的消息队列

This commit is contained in:
白茶清欢 2024-07-17 15:44:26 +08:00
parent 5fc71de49a
commit f7c4357097
2 changed files with 89 additions and 4 deletions

View File

@ -10,6 +10,7 @@ package event
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"git.zhangdeman.cn/zhangdeman/consts" "git.zhangdeman.cn/zhangdeman/consts"
"git.zhangdeman.cn/zhangdeman/event/abstract" "git.zhangdeman.cn/zhangdeman/event/abstract"
"git.zhangdeman.cn/zhangdeman/event/define" "git.zhangdeman.cn/zhangdeman/event/define"
@ -110,14 +111,48 @@ func (m *MemoryEvent) GetConsumeEventChan() (<-chan *define.EventData, error) {
return m.messageChannel, nil return m.messageChannel, nil
} }
// ConsumeEvent 消费事件
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:14 2024/7/17
func (m *MemoryEvent) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error { func (m *MemoryEvent) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error {
//TODO implement me messageChan, _ := m.GetConsumeEventChan()
panic("implement me") if nil == successCallback {
successCallback = define.DefaultSuccessCallbackHandler
}
if nil == failureCallback {
failureCallback = define.DefaultFailCallbackHandler
}
go func() {
defer func() {
if r := recover(); nil != r {
fmt.Println("出现异常", r)
}
}()
for eventData := range messageChan {
var (
err error
handleResult map[string]any
)
if handleResult, err = handler(eventData); nil != err {
failureCallback(eventData, handleResult, err)
} else {
successCallback(eventData, handleResult)
}
}
}()
return nil
} }
// Destroy 实例销毁
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 14:17 2024/7/17
func (m *MemoryEvent) Destroy() { func (m *MemoryEvent) Destroy() {
//TODO implement me return
panic("implement me")
} }
func (m *MemoryEvent) DriverType() string { func (m *MemoryEvent) DriverType() string {

50
memory_test.go Normal file
View File

@ -0,0 +1,50 @@
// Package event ...
//
// Description : event ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2024-07-17 14:17
package event
import (
"context"
"encoding/json"
"fmt"
"git.zhangdeman.cn/zhangdeman/event/define"
"testing"
"time"
)
func TestInitMemoryEvent(t *testing.T) {
InitMemoryEvent(&define.MemoryEventConfig{
MessageBufferSize: 1024,
CloseMaxWaitTime: 5000,
})
go func() {
for {
time.Sleep(time.Second)
MemoryEventClient.SendEventAsync(context.Background(), &define.EventData{
EventType: "TEST",
TraceID: time.Now().Format("2006-01-02 15:04:05"),
Host: "",
Timestamp: time.Now().Unix(),
SystemTimestamp: 0,
Key: "",
Data: nil,
}, func(ctx context.Context, eventResult *define.SendResult) {
fmt.Println("消息发送成功")
}, func(ctx context.Context, eventData *define.EventData, eventResult *define.SendResult, err error) {
fmt.Println("消息发送失败")
})
}
}()
MemoryEventClient.ConsumeEvent(func(eventData *define.EventData) (map[string]any, error) {
byteData, _ := json.Marshal(eventData)
fmt.Println(string(byteData))
return map[string]any{}, nil
}, nil, nil)
for {
}
}