diff --git a/consumer.go b/consumer.go index c2c1c47..61ee624 100644 --- a/consumer.go +++ b/consumer.go @@ -58,9 +58,9 @@ func (c *Consumer) init() error { // Consume 消费消息 func (c *Consumer) Consume(dataHandler IConsumeDataHandler) { ctx := context.Background() + handler := consumerGroupHandler{name: c.consumerGroup, handler: dataHandler} for { topics := []string{c.consumerGroup} - handler := consumerGroupHandler{name: c.consumerGroup, handler: dataHandler} if err := c.consumerGroupInstance.Consume(ctx, topics, handler); nil != err { handler.handler.Exception(err) } diff --git a/consumer_manager.go b/consumer_manager.go new file mode 100644 index 0000000..ac2de4f --- /dev/null +++ b/consumer_manager.go @@ -0,0 +1,30 @@ +// Package kafka ... +// +// Description : kafka ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2025-12-31 21:54 +package kafka + +import "sync" + +var ( + ConsumerManager = &consumerManager{ + l: &sync.RWMutex{}, + instanceTable: map[string]IConsumeDataHandler{}, + } +) + +type consumerManager struct { + l *sync.RWMutex + instanceTable map[string]IConsumeDataHandler +} + +func (cm *consumerManager) AddConsumer() error { + return nil +} + +func (cm *consumerManager) Instance(flag string) IConsumeDataHandler { + return nil +} diff --git a/define.go b/define.go index 21d3863..6760842 100644 --- a/define.go +++ b/define.go @@ -8,20 +8,12 @@ package kafka // ProducerData 生产数据 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 4:13 下午 2021/9/21 type ProducerData struct { - Data interface{} // 发送的数据 - Key string // 分区key + Data any // 发送的数据 + Key string // 分区key } // ProducerResult 数据发送后的结果 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 4:15 下午 2021/9/21 type ProducerResult struct { Partition int32 // 数据落入的分区 Offset int64 // 数据对应的Offset