feat: 规划消费者

This commit is contained in:
2025-12-31 22:17:40 +08:00
parent 8f2edaa754
commit 2118c7ec5b
3 changed files with 33 additions and 11 deletions

View File

@@ -58,9 +58,9 @@ func (c *Consumer) init() error {
// Consume 消费消息 // Consume 消费消息
func (c *Consumer) Consume(dataHandler IConsumeDataHandler) { func (c *Consumer) Consume(dataHandler IConsumeDataHandler) {
ctx := context.Background() ctx := context.Background()
handler := consumerGroupHandler{name: c.consumerGroup, handler: dataHandler}
for { for {
topics := []string{c.consumerGroup} topics := []string{c.consumerGroup}
handler := consumerGroupHandler{name: c.consumerGroup, handler: dataHandler}
if err := c.consumerGroupInstance.Consume(ctx, topics, handler); nil != err { if err := c.consumerGroupInstance.Consume(ctx, topics, handler); nil != err {
handler.handler.Exception(err) handler.handler.Exception(err)
} }

30
consumer_manager.go Normal file
View File

@@ -0,0 +1,30 @@
// Package kafka ...
//
// Description : kafka ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2025-12-31 21:54
package kafka
import "sync"
var (
ConsumerManager = &consumerManager{
l: &sync.RWMutex{},
instanceTable: map[string]IConsumeDataHandler{},
}
)
type consumerManager struct {
l *sync.RWMutex
instanceTable map[string]IConsumeDataHandler
}
func (cm *consumerManager) AddConsumer() error {
return nil
}
func (cm *consumerManager) Instance(flag string) IConsumeDataHandler {
return nil
}

View File

@@ -8,20 +8,12 @@
package kafka package kafka
// ProducerData 生产数据 // ProducerData 生产数据
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 4:13 下午 2021/9/21
type ProducerData struct { type ProducerData struct {
Data interface{} // 发送的数据 Data any // 发送的数据
Key string // 分区key Key string // 分区key
} }
// ProducerResult 数据发送后的结果 // ProducerResult 数据发送后的结果
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 4:15 下午 2021/9/21
type ProducerResult struct { type ProducerResult struct {
Partition int32 // 数据落入的分区 Partition int32 // 数据落入的分区
Offset int64 // 数据对应的Offset Offset int64 // 数据对应的Offset