From f7c435709753fc59537494bca3f51275ba3adf23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Wed, 17 Jul 2024 15:44:26 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9F=BA=E4=BA=8E=E5=86=85=E5=AD=98=E7=9A=84?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- memory.go | 43 +++++++++++++++++++++++++++++++++++++++---- memory_test.go | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 4 deletions(-) create mode 100644 memory_test.go diff --git a/memory.go b/memory.go index 8491a25..5cf03e7 100644 --- a/memory.go +++ b/memory.go @@ -10,6 +10,7 @@ package event import ( "context" "errors" + "fmt" "git.zhangdeman.cn/zhangdeman/consts" "git.zhangdeman.cn/zhangdeman/event/abstract" "git.zhangdeman.cn/zhangdeman/event/define" @@ -110,14 +111,48 @@ func (m *MemoryEvent) GetConsumeEventChan() (<-chan *define.EventData, error) { return m.messageChannel, nil } +// ConsumeEvent 消费事件 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 14:14 2024/7/17 func (m *MemoryEvent) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error { - //TODO implement me - panic("implement me") + messageChan, _ := m.GetConsumeEventChan() + if nil == successCallback { + successCallback = define.DefaultSuccessCallbackHandler + } + if nil == failureCallback { + failureCallback = define.DefaultFailCallbackHandler + } + go func() { + defer func() { + if r := recover(); nil != r { + fmt.Println("出现异常", r) + } + }() + for eventData := range messageChan { + var ( + err error + handleResult map[string]any + ) + if handleResult, err = handler(eventData); nil != err { + failureCallback(eventData, handleResult, err) + } else { + successCallback(eventData, handleResult) + } + } + }() + + return nil } +// Destroy 实例销毁 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 14:17 2024/7/17 func (m *MemoryEvent) Destroy() { - //TODO implement me - panic("implement me") + return } func (m *MemoryEvent) DriverType() string { diff --git a/memory_test.go b/memory_test.go new file mode 100644 index 0000000..6754657 --- /dev/null +++ b/memory_test.go @@ -0,0 +1,50 @@ +// Package event ... +// +// Description : event ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2024-07-17 14:17 +package event + +import ( + "context" + "encoding/json" + "fmt" + "git.zhangdeman.cn/zhangdeman/event/define" + "testing" + "time" +) + +func TestInitMemoryEvent(t *testing.T) { + InitMemoryEvent(&define.MemoryEventConfig{ + MessageBufferSize: 1024, + CloseMaxWaitTime: 5000, + }) + go func() { + for { + time.Sleep(time.Second) + MemoryEventClient.SendEventAsync(context.Background(), &define.EventData{ + EventType: "TEST", + TraceID: time.Now().Format("2006-01-02 15:04:05"), + Host: "", + Timestamp: time.Now().Unix(), + SystemTimestamp: 0, + Key: "", + Data: nil, + }, func(ctx context.Context, eventResult *define.SendResult) { + fmt.Println("消息发送成功") + }, func(ctx context.Context, eventData *define.EventData, eventResult *define.SendResult, err error) { + fmt.Println("消息发送失败") + }) + } + }() + MemoryEventClient.ConsumeEvent(func(eventData *define.EventData) (map[string]any, error) { + byteData, _ := json.Marshal(eventData) + fmt.Println(string(byteData)) + return map[string]any{}, nil + }, nil, nil) + for { + + } +}