gopkg/middleware/kafka/consumer.go

116 lines
2.8 KiB
Go

// Package kafka...
//
// Description : consumer
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2021-09-21 3:46 PM
package kafka
import (
"context"
"errors"
"github.com/Shopify/sarama"
)
// InitConsumer builds a Consumer for the given topic and consumer group and
// initialises its sarama client and consumer group against hostList.
//
// When cfg is nil a default sarama configuration is created with
// Consumer.Return.Errors disabled and Consumer.Offsets.Initial set to
// sarama.OffsetNewest. A non-nil cfg is used unchanged.
//
// On failure the returned *Consumer is nil, so a half-initialised consumer
// can never be used by mistake; the error describes which step failed.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 8:45 PM 2021/9/22
func InitConsumer(topic string, consumerGroup string, hostList []string, cfg *sarama.Config) (*Consumer, error) {
	if cfg == nil {
		cfg = sarama.NewConfig()
		cfg.Consumer.Return.Errors = false
		cfg.Consumer.Offsets.Initial = sarama.OffsetNewest // by default start from the newest message
	}

	c := &Consumer{
		topic:         topic,
		host:          hostList,
		cfg:           cfg,
		consumerGroup: consumerGroup,
	}
	if err := c.init(); err != nil {
		return nil, err
	}
	return c, nil
}
// Consumer holds the caller-supplied settings together with the sarama
// handles created from them. Instances are built by InitConsumer.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 8:52 PM 2021/9/22
type Consumer struct {
	// Connection settings as passed to InitConsumer.
	host []string
	cfg  *sarama.Config

	// Topic and consumer group names as passed to InitConsumer.
	topic         string
	consumerGroup string

	// sarama handles populated by init.
	client                sarama.Client
	consumerGroupInstance sarama.ConsumerGroup
}
// init creates the sarama client from c.host and c.cfg and then a consumer
// group on top of that client, storing both on the receiver.
//
// The handles are only stored once both steps have succeeded. If the consumer
// group cannot be created, the freshly created client is closed before
// returning so the connection is not leaked.
//
// Returns nil on success, otherwise an error naming the step that failed.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 8:55 PM 2021/9/22
func (c *Consumer) init() error {
	client, err := sarama.NewClient(c.host, c.cfg)
	if err != nil {
		return errors.New("init kafka client fail : " + err.Error())
	}

	group, err := sarama.NewConsumerGroupFromClient(c.consumerGroup, client)
	if err != nil {
		// A group built from an existing client does not own it, so the
		// client must be released here. The close error is dropped on
		// purpose: the group-creation failure is the one worth reporting.
		_ = client.Close()
		return errors.New("consumer group init fail : " + err.Error())
	}

	c.client = client
	c.consumerGroupInstance = group
	return nil
}
// Consume joins the consumer group and feeds messages from the configured
// topic to dataHandler. It blocks the calling goroutine.
//
// The underlying ConsumerGroup.Consume call is repeated in a loop so that
// consumption resumes after each session ends. Every error it returns is
// passed to dataHandler.Exception. When the error is
// sarama.ErrClosedConsumerGroup (the group was closed, e.g. via Destroy) the
// error is reported once and Consume returns instead of spinning forever.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 9:04 PM 2021/9/22
func (c *Consumer) Consume(dataHandler IConsumeDataHandler) {
	ctx := context.Background()
	// Subscribe to the topic, not to the consumer group name.
	topics := []string{c.topic}
	handler := consumerGroupHandler{name: c.consumerGroup, handler: dataHandler}

	for {
		err := c.consumerGroupInstance.Consume(ctx, topics, handler)
		if err == nil {
			continue
		}
		dataHandler.Exception(err)
		if errors.Is(err, sarama.ErrClosedConsumerGroup) {
			// A closed group fails every subsequent call immediately;
			// retrying would only busy-loop.
			return
		}
	}
}
// consumerGroupHandler is the handler value Consume passes to the sarama
// consumer group; it forwards each message to the wrapped IConsumeDataHandler.
type consumerGroupHandler struct {
	// handler receives the Execute / Success / Fail callbacks for each message.
	handler IConsumeDataHandler
	// name is filled by Consume with the consumer group name.
	name string
}
// Setup is a no-op session hook; it ignores the session and always returns nil.
func (consumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup is a no-op session hook; it ignores the session and always returns nil.
func (consumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim drains claim.Messages() and hands every message to the wrapped
// handler's Execute method. Depending on the outcome, Fail (with the error) or
// Success is invoked next, and the message is then marked on the session in
// either case, so a failed message is still acknowledged.
//
// It returns nil once the message channel has been closed.
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		if execErr := h.handler.Execute(message); execErr != nil {
			h.handler.Fail(message, execErr)
		} else {
			h.handler.Success(message)
		}
		// Manually acknowledge the message regardless of the outcome.
		sess.MarkMessage(message, "")
	}
	return nil
}
// Destroy releases the resources held by the consumer: it closes the consumer
// group first and then the underlying sarama client.
//
// The client has to be closed explicitly because the group was created with
// NewConsumerGroupFromClient, which leaves ownership of the client with the
// caller. Destroy is safe to call on a nil receiver or on a consumer whose
// initialisation failed; handles that were never created are skipped.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 8:59 PM 2021/9/22
func (c *Consumer) Destroy() {
	if c == nil {
		return
	}
	// Close errors are dropped on purpose: Destroy has no error return and
	// there is nothing the caller could do about a failed shutdown.
	if c.consumerGroupInstance != nil {
		_ = c.consumerGroupInstance.Close()
	}
	if c.client != nil {
		_ = c.client.Close()
	}
}