diff --git a/define/memory.go b/define/memory.go index 106a12b..a5a5d04 100644 --- a/define/memory.go +++ b/define/memory.go @@ -8,7 +8,6 @@ package define const ( - DefaultMemoryPartitionNum = 1 // 默认的分区数量 DefaultMemoryChannelSize = 1024 // 默认的消息channel大小 DefaultMemoryCloseMaxWaitTime = 5000 // 默认最大等待 : 5s ) @@ -19,7 +18,6 @@ const ( // // Date : 11:26 2024/7/17 type MemoryEventConfig struct { - PartitionNum int `json:"partition_num"` // 多少个分区, 默认值 : 1 MessageBufferSize int `json:"message_buffer_size"` // 消息缓冲区大小 CloseMaxWaitTime int `json:"close_max_wait_time"` // 关闭消息实例, 最大等待时长, 单位 : ms } diff --git a/memory.go b/memory.go index 4d210d9..8491a25 100644 --- a/memory.go +++ b/memory.go @@ -13,7 +13,6 @@ import ( "git.zhangdeman.cn/zhangdeman/consts" "git.zhangdeman.cn/zhangdeman/event/abstract" "git.zhangdeman.cn/zhangdeman/event/define" - "git.zhangdeman.cn/zhangdeman/wrapper" ) var ( @@ -44,7 +43,7 @@ func InitMemoryEvent(cfg *define.MemoryEventConfig) { type MemoryEvent struct { *base cfg *define.MemoryEventConfig - messageChannel map[int]chan *define.EventData + messageChannel chan *define.EventData } // SendEvent 发送事件 @@ -56,11 +55,10 @@ func (m *MemoryEvent) SendEvent(ctx context.Context, eventData *define.EventData if nil == eventData { return nil, errors.New("event data is nil") } - partition := int(wrapper.String(eventData.Key).HashNumber().Value % uint64(m.cfg.PartitionNum)) - m.messageChannel[partition] <- eventData + m.messageChannel <- eventData return &define.SendResult{ Data: eventData, - Partition: partition, + Partition: 0, Topic: "", IsSuccess: true, FailReason: "success", @@ -109,8 +107,7 @@ func (m *MemoryEvent) SendEventAsync(ctx context.Context, eventData *define.Even // // Date : 12:41 2024/7/17 func (m *MemoryEvent) GetConsumeEventChan() (<-chan *define.EventData, error) { - //TODO implement me - panic("implement me") + return m.messageChannel, nil } func (m *MemoryEvent) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error { @@ -136,9 +133,6 @@ func (m *MemoryEvent) Init(cfg *define.MemoryEventConfig) { if nil == cfg { cfg = &define.MemoryEventConfig{} } - if cfg.PartitionNum <= 0 { - cfg.PartitionNum = define.DefaultMemoryPartitionNum - } if cfg.MessageBufferSize <= 0 { cfg.MessageBufferSize = define.DefaultMemoryChannelSize } @@ -147,8 +141,5 @@ func (m *MemoryEvent) Init(cfg *define.MemoryEventConfig) { } m.cfg = cfg // 初始化内存 channel - m.messageChannel = make(map[int]chan *define.EventData) - for num := 0; num < cfg.PartitionNum; num++ { - m.messageChannel[num] = make(chan *define.EventData, cfg.MessageBufferSize) - } + m.messageChannel = make(chan *define.EventData, cfg.MessageBufferSize) }