From 5fc71de49a999d0238a6794320fe17aac3e8e698 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Wed, 17 Jul 2024 12:55:51 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=8E=B7=E5=8F=96=E6=B6=88?= =?UTF-8?q?=E6=81=AFchannel=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- define/memory.go | 2 -- memory.go | 19 +++++-------------- 2 files changed, 5 insertions(+), 16 deletions(-) diff --git a/define/memory.go b/define/memory.go index 106a12b..a5a5d04 100644 --- a/define/memory.go +++ b/define/memory.go @@ -8,7 +8,6 @@ package define const ( - DefaultMemoryPartitionNum = 1 // 默认的分区数量 DefaultMemoryChannelSize = 1024 // 默认的消息channel大小 DefaultMemoryCloseMaxWaitTime = 5000 // 默认最大等待 : 5s ) @@ -19,7 +18,6 @@ const ( // // Date : 11:26 2024/7/17 type MemoryEventConfig struct { - PartitionNum int `json:"partition_num"` // 多少个分区, 默认值 : 1 MessageBufferSize int `json:"message_buffer_size"` // 消息缓冲区大小 CloseMaxWaitTime int `json:"close_max_wait_time"` // 关闭消息实例, 最大等待时长, 单位 : ms } diff --git a/memory.go b/memory.go index 4d210d9..8491a25 100644 --- a/memory.go +++ b/memory.go @@ -13,7 +13,6 @@ import ( "git.zhangdeman.cn/zhangdeman/consts" "git.zhangdeman.cn/zhangdeman/event/abstract" "git.zhangdeman.cn/zhangdeman/event/define" - "git.zhangdeman.cn/zhangdeman/wrapper" ) var ( @@ -44,7 +43,7 @@ func InitMemoryEvent(cfg *define.MemoryEventConfig) { type MemoryEvent struct { *base cfg *define.MemoryEventConfig - messageChannel map[int]chan *define.EventData + messageChannel chan *define.EventData } // SendEvent 发送事件 @@ -56,11 +55,10 @@ func (m *MemoryEvent) SendEvent(ctx context.Context, eventData *define.EventData if nil == eventData { return nil, errors.New("event data is nil") } - partition := int(wrapper.String(eventData.Key).HashNumber().Value % uint64(m.cfg.PartitionNum)) - m.messageChannel[partition] <- eventData + m.messageChannel <- eventData return &define.SendResult{ Data: eventData, - Partition: partition, + Partition: 0, Topic: "", IsSuccess: true, FailReason: "success", @@ -109,8 +107,7 @@ func (m *MemoryEvent) SendEventAsync(ctx context.Context, eventData *define.Even // // Date : 12:41 2024/7/17 func (m *MemoryEvent) GetConsumeEventChan() (<-chan *define.EventData, error) { - //TODO implement me - panic("implement me") + return m.messageChannel, nil } func (m *MemoryEvent) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error { @@ -136,9 +133,6 @@ func (m *MemoryEvent) Init(cfg *define.MemoryEventConfig) { if nil == cfg { cfg = &define.MemoryEventConfig{} } - if cfg.PartitionNum <= 0 { - cfg.PartitionNum = define.DefaultMemoryPartitionNum - } if cfg.MessageBufferSize <= 0 { cfg.MessageBufferSize = define.DefaultMemoryChannelSize } @@ -147,8 +141,5 @@ func (m *MemoryEvent) Init(cfg *define.MemoryEventConfig) { } m.cfg = cfg // 初始化内存 channel - m.messageChannel = make(map[int]chan *define.EventData) - for num := 0; num < cfg.PartitionNum; num++ { - m.messageChannel[num] = make(chan *define.EventData, cfg.MessageBufferSize) - } + m.messageChannel = make(chan *define.EventData, cfg.MessageBufferSize) }