diff --git a/middleware/kafka/abstract.go b/middleware/kafka/abstract.go new file mode 100644 index 0000000..6b962ea --- /dev/null +++ b/middleware/kafka/abstract.go @@ -0,0 +1,26 @@ +// Package kafka... +// +// Description : kafka... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2021-09-22 9:07 下午 +package kafka + +import "github.com/Shopify/sarama" + +// IConsumeDataHandler ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 9:07 下午 2021/9/22 +type IConsumeDataHandler interface { + // Execute 处理数据 + Execute(data *sarama.ConsumerMessage) error + // Success 成功回调 + Success(data *sarama.ConsumerMessage) + // Fail 处理失败回调 + Fail(data *sarama.ConsumerMessage, err error) + // Exception 消费出现异常回调 + Exception(err error) +} diff --git a/middleware/kafka/consumer.go b/middleware/kafka/consumer.go index acf16e3..8ebc031 100644 --- a/middleware/kafka/consumer.go +++ b/middleware/kafka/consumer.go @@ -6,3 +6,110 @@ // // Date : 2021-09-21 3:46 下午 package kafka + +import ( + "context" + "errors" + + "github.com/Shopify/sarama" +) + +// InitConsumer 初始化消费者 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 8:45 下午 2021/9/22 +func InitConsumer(topic string, consumerGroup string, hostList []string, cfg *sarama.Config) (*Consumer, error) { + if nil == cfg { + cfg = sarama.NewConfig() + cfg.Consumer.Return.Errors = false + cfg.Consumer.Offsets.Initial = sarama.OffsetNewest // 默认从最新初开始消费消息 + } + c := &Consumer{ + topic: topic, + host: hostList, + cfg: cfg, + consumerGroup: consumerGroup, + } + err := c.init() + return c, err +} + +// Consumer ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 8:52 下午 2021/9/22 +type Consumer struct { + topic string + host []string + cfg *sarama.Config + consumerGroup string + client sarama.Client + consumerGroupInstance sarama.ConsumerGroup +} + +// init ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 8:55 下午 2021/9/22 +func (c *Consumer) init() error { + var ( + err error + ) + if c.client, err = sarama.NewClient(c.host, c.cfg); nil != err { + return errors.New("init kafka client fail : " + err.Error()) + } + if c.consumerGroupInstance, err = sarama.NewConsumerGroupFromClient(c.consumerGroup, c.client); nil != err { + return errors.New("consumer group init fail : " + err.Error()) + } + return nil +} + +// Consume 消费消息 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 9:04 下午 2021/9/22 +func (c *Consumer) Consume(dataHandler IConsumeDataHandler) { + ctx := context.Background() + for { + topics := []string{c.consumerGroup} + handler := consumerGroupHandler{name: c.consumerGroup, handler: dataHandler} + if err := c.consumerGroupInstance.Consume(ctx, topics, handler); nil != err { + handler.handler.Exception(err) + } + } +} + +type consumerGroupHandler struct { + name string + handler IConsumeDataHandler +} + +func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } +func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } +func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { + if err := h.handler.Execute(msg); nil != err { + h.handler.Fail(msg, err) + // 手动确认消息 + sess.MarkMessage(msg, "") + continue + } + h.handler.Success(msg) + // 手动确认消息 + sess.MarkMessage(msg, "") + } + return nil +} + +// Destroy ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 8:59 下午 2021/9/22 +func (c *Consumer) Destroy() { + _ = c.consumerGroupInstance.Close() +}