116 lines
2.8 KiB
Go
116 lines
2.8 KiB
Go
// Package kafka...
|
|
//
|
|
// Description : 消费者
|
|
//
|
|
// Author : go_developer@163.com<白茶清欢>
|
|
//
|
|
// Date : 2021-09-21 3:46 下午
|
|
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
|
|
"github.com/IBM/sarama"
|
|
)
|
|
|
|
// InitConsumer 初始化消费者
|
|
//
|
|
// Author : go_developer@163.com<白茶清欢>
|
|
//
|
|
// Date : 8:45 下午 2021/9/22
|
|
func InitConsumer(topic string, consumerGroup string, hostList []string, cfg *sarama.Config) (*Consumer, error) {
|
|
if nil == cfg {
|
|
cfg = sarama.NewConfig()
|
|
cfg.Consumer.Return.Errors = false
|
|
cfg.Consumer.Offsets.Initial = sarama.OffsetNewest // 默认从最新初开始消费消息
|
|
}
|
|
c := &Consumer{
|
|
topic: topic,
|
|
host: hostList,
|
|
cfg: cfg,
|
|
consumerGroup: consumerGroup,
|
|
}
|
|
err := c.init()
|
|
return c, err
|
|
}
|
|
|
|
// Consumer ...
|
|
//
|
|
// Author : go_developer@163.com<白茶清欢>
|
|
//
|
|
// Date : 8:52 下午 2021/9/22
|
|
type Consumer struct {
|
|
topic string
|
|
host []string
|
|
cfg *sarama.Config
|
|
consumerGroup string
|
|
client sarama.Client
|
|
consumerGroupInstance sarama.ConsumerGroup
|
|
}
|
|
|
|
// init ...
|
|
//
|
|
// Author : go_developer@163.com<白茶清欢>
|
|
//
|
|
// Date : 8:55 下午 2021/9/22
|
|
func (c *Consumer) init() error {
|
|
var (
|
|
err error
|
|
)
|
|
if c.client, err = sarama.NewClient(c.host, c.cfg); nil != err {
|
|
return errors.New("init kafka client fail : " + err.Error())
|
|
}
|
|
if c.consumerGroupInstance, err = sarama.NewConsumerGroupFromClient(c.consumerGroup, c.client); nil != err {
|
|
return errors.New("consumer group init fail : " + err.Error())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Consume 消费消息
|
|
//
|
|
// Author : go_developer@163.com<白茶清欢>
|
|
//
|
|
// Date : 9:04 下午 2021/9/22
|
|
func (c *Consumer) Consume(dataHandler IConsumeDataHandler) {
|
|
ctx := context.Background()
|
|
for {
|
|
topics := []string{c.consumerGroup}
|
|
handler := consumerGroupHandler{name: c.consumerGroup, handler: dataHandler}
|
|
if err := c.consumerGroupInstance.Consume(ctx, topics, handler); nil != err {
|
|
handler.handler.Exception(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
type consumerGroupHandler struct {
|
|
name string
|
|
handler IConsumeDataHandler
|
|
}
|
|
|
|
func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
|
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
|
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
|
for msg := range claim.Messages() {
|
|
if err := h.handler.Execute(msg); nil != err {
|
|
h.handler.Fail(msg, err)
|
|
// 手动确认消息
|
|
sess.MarkMessage(msg, "")
|
|
continue
|
|
}
|
|
h.handler.Success(msg)
|
|
// 手动确认消息
|
|
sess.MarkMessage(msg, "")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Destroy ...
|
|
//
|
|
// Author : go_developer@163.com<白茶清欢>
|
|
//
|
|
// Date : 8:59 下午 2021/9/22
|
|
func (c *Consumer) Destroy() {
|
|
_ = c.consumerGroupInstance.Close()
|
|
}
|