From 2118c7ec5b4edf2abcb40db34c86c054b16e3e4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Wed, 31 Dec 2025 22:17:40 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=A7=84=E5=88=92=E6=B6=88=E8=B4=B9?= =?UTF-8?q?=E8=80=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumer.go | 2 +- consumer_manager.go | 30 ++++++++++++++++++++++++++++++ define.go | 12 ++---------- 3 files changed, 33 insertions(+), 11 deletions(-) create mode 100644 consumer_manager.go diff --git a/consumer.go b/consumer.go index c2c1c47..61ee624 100644 --- a/consumer.go +++ b/consumer.go @@ -58,9 +58,9 @@ func (c *Consumer) init() error { // Consume 消费消息 func (c *Consumer) Consume(dataHandler IConsumeDataHandler) { ctx := context.Background() + handler := consumerGroupHandler{name: c.consumerGroup, handler: dataHandler} for { topics := []string{c.consumerGroup} - handler := consumerGroupHandler{name: c.consumerGroup, handler: dataHandler} if err := c.consumerGroupInstance.Consume(ctx, topics, handler); nil != err { handler.handler.Exception(err) } diff --git a/consumer_manager.go b/consumer_manager.go new file mode 100644 index 0000000..ac2de4f --- /dev/null +++ b/consumer_manager.go @@ -0,0 +1,30 @@ +// Package kafka ... +// +// Description : kafka ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2025-12-31 21:54 +package kafka + +import "sync" + +var ( + ConsumerManager = &consumerManager{ + l: &sync.RWMutex{}, + instanceTable: map[string]IConsumeDataHandler{}, + } +) + +type consumerManager struct { + l *sync.RWMutex + instanceTable map[string]IConsumeDataHandler +} + +func (cm *consumerManager) AddConsumer() error { + return nil +} + +func (cm *consumerManager) Instance(flag string) IConsumeDataHandler { + return nil +} diff --git a/define.go b/define.go index 21d3863..6760842 100644 --- a/define.go +++ b/define.go @@ -8,20 +8,12 @@ package kafka // ProducerData 生产数据 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 4:13 下午 2021/9/21 type ProducerData struct { - Data interface{} // 发送的数据 - Key string // 分区key + Data any // 发送的数据 + Key string // 分区key } // ProducerResult 数据发送后的结果 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 4:15 下午 2021/9/21 type ProducerResult struct { Partition int32 // 数据落入的分区 Offset int64 // 数据对应的Offset