// Package kafka... // // Description : 消费者 // // Author : go_developer@163.com<白茶清欢> // // Date : 2021-09-21 3:46 下午 package kafka import ( "context" "errors" "github.com/IBM/sarama" ) // InitConsumer 初始化消费者 // // Author : go_developer@163.com<白茶清欢> // // Date : 8:45 下午 2021/9/22 func InitConsumer(topic string, consumerGroup string, hostList []string, cfg *sarama.Config) (*Consumer, error) { if nil == cfg { cfg = sarama.NewConfig() cfg.Consumer.Return.Errors = false cfg.Consumer.Offsets.Initial = sarama.OffsetNewest // 默认从最新初开始消费消息 } c := &Consumer{ topic: topic, host: hostList, cfg: cfg, consumerGroup: consumerGroup, } err := c.init() return c, err } // Consumer ... // // Author : go_developer@163.com<白茶清欢> // // Date : 8:52 下午 2021/9/22 type Consumer struct { topic string host []string cfg *sarama.Config consumerGroup string client sarama.Client consumerGroupInstance sarama.ConsumerGroup } // init ... // // Author : go_developer@163.com<白茶清欢> // // Date : 8:55 下午 2021/9/22 func (c *Consumer) init() error { var ( err error ) if c.client, err = sarama.NewClient(c.host, c.cfg); nil != err { return errors.New("init kafka client fail : " + err.Error()) } if c.consumerGroupInstance, err = sarama.NewConsumerGroupFromClient(c.consumerGroup, c.client); nil != err { return errors.New("consumer group init fail : " + err.Error()) } return nil } // Consume 消费消息 // // Author : go_developer@163.com<白茶清欢> // // Date : 9:04 下午 2021/9/22 func (c *Consumer) Consume(dataHandler IConsumeDataHandler) { ctx := context.Background() for { topics := []string{c.consumerGroup} handler := consumerGroupHandler{name: c.consumerGroup, handler: dataHandler} if err := c.consumerGroupInstance.Consume(ctx, topics, handler); nil != err { handler.handler.Exception(err) } } } type consumerGroupHandler struct { name string handler IConsumeDataHandler } func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { if err := h.handler.Execute(msg); nil != err { h.handler.Fail(msg, err) // 手动确认消息 sess.MarkMessage(msg, "") continue } h.handler.Success(msg) // 手动确认消息 sess.MarkMessage(msg, "") } return nil } // Destroy ... // // Author : go_developer@163.com<白茶清欢> // // Date : 8:59 下午 2021/9/22 func (c *Consumer) Destroy() { _ = c.consumerGroupInstance.Close() }