// Package kafka...
//
// Description : Kafka consumer
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2021-09-21 3:46 PM
package kafka

import (
	"context"
	"errors"

	"github.com/Shopify/sarama"
)

// InitConsumer builds a Consumer for the given topic, consumer group and broker list and
// initializes it by calling its init method.
//
// When cfg is nil, a fresh sarama configuration is created with Consumer.Return.Errors
// set to false and Consumer.Offsets.Initial set to sarama.OffsetNewest. A non-nil cfg is
// used as-is.
//
// The *Consumer is returned even when initialization fails; callers must check the error
// before using it.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 8:45 PM 2021/9/22
func InitConsumer(topic string, consumerGroup string, hostList []string, cfg *sarama.Config) (*Consumer, error) {
	if cfg == nil {
		defaultCfg := sarama.NewConfig()
		defaultCfg.Consumer.Return.Errors = false
		// By default, start consuming from the newest offset.
		defaultCfg.Consumer.Offsets.Initial = sarama.OffsetNewest
		cfg = defaultCfg
	}

	consumer := &Consumer{
		consumerGroup: consumerGroup,
		cfg:           cfg,
		host:          hostList,
		topic:         topic,
	}
	if initErr := consumer.init(); initErr != nil {
		return consumer, initErr
	}
	return consumer, nil
}

// Consumer holds the settings passed to InitConsumer together with the sarama client
// and consumer group created from them by init.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 8:52 PM 2021/9/22
type Consumer struct {
	topic                 string               // topic name supplied to InitConsumer
	host                  []string             // broker addresses handed to sarama.NewClient
	cfg                   *sarama.Config       // sarama configuration handed to sarama.NewClient
	consumerGroup         string               // consumer group ID used to create the group
	client                sarama.Client        // set by init via sarama.NewClient
	consumerGroupInstance sarama.ConsumerGroup // set by init via sarama.NewConsumerGroupFromClient
}

// init creates the sarama client from c.host and c.cfg and then builds the consumer
// group for c.consumerGroup on top of that client.
//
// The receiver's client and consumerGroupInstance fields are only assigned when both
// steps succeed. If the consumer group cannot be created, the freshly created client is
// closed before returning so that it is not leaked.
//
// Returns an error describing which step failed, or nil on success.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 8:55 PM 2021/9/22
func (c *Consumer) init() error {
	client, err := sarama.NewClient(c.host, c.cfg)
	if err != nil {
		return errors.New("init kafka client fail : " + err.Error())
	}

	group, err := sarama.NewConsumerGroupFromClient(c.consumerGroup, client)
	if err != nil {
		// Nothing else owns the client at this point, so release it here.
		// The close error is dropped because the group error is the one worth reporting.
		_ = client.Close()
		return errors.New("consumer group init fail : " + err.Error())
	}

	c.client = client
	c.consumerGroupInstance = group
	return nil
}

// Consume joins the consumer group and consumes messages from c.topic, dispatching them
// to dataHandler. It blocks the calling goroutine.
//
// sarama's ConsumerGroup.Consume returns whenever a session ends, so it is called in a
// loop to rejoin the group. Any error other than sarama.ErrClosedConsumerGroup is
// reported through dataHandler.Exception and the loop continues. When the group has been
// closed (for example by Destroy), Consume returns instead of spinning on the same
// error forever.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 9:04 PM 2021/9/22
func (c *Consumer) Consume(dataHandler IConsumeDataHandler) {
	ctx := context.Background()
	// Subscribe to the configured topic, not the consumer group name.
	topics := []string{c.topic}
	handler := consumerGroupHandler{name: c.consumerGroup, handler: dataHandler}

	for {
		err := c.consumerGroupInstance.Consume(ctx, topics, handler)
		if err == nil {
			continue
		}
		if errors.Is(err, sarama.ErrClosedConsumerGroup) {
			// The group was shut down; retrying can never succeed.
			return
		}
		dataHandler.Exception(err)
	}
}

// consumerGroupHandler is the handler value passed to sarama's ConsumerGroup.Consume.
// It forwards each claimed message to the wrapped IConsumeDataHandler.
type consumerGroupHandler struct {
	// name is filled with the consumer group name by Consumer.Consume.
	name    string
	// handler receives the Execute / Success / Fail callbacks for each message.
	handler IConsumeDataHandler
}

// Setup performs no work and always returns nil.
func (h consumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup performs no work and always returns nil.
func (h consumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim drains claim.Messages() until the channel is closed, then returns nil.
//
// Each message is passed to the wrapped handler's Execute. If Execute returns an error,
// Fail is called with the message and that error; otherwise Success is called. In both
// cases the message is then marked on the session with empty metadata, so a message
// whose processing failed is still acknowledged.
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	messages := claim.Messages()
	for {
		message, open := <-messages
		if !open {
			return nil
		}

		if execErr := h.handler.Execute(message); execErr != nil {
			h.handler.Fail(message, execErr)
		} else {
			h.handler.Success(message)
		}

		// Manually acknowledge the message regardless of the outcome.
		sess.MarkMessage(message, "")
	}
}

// Destroy releases the resources held by the Consumer: it closes the consumer group and
// then the underlying sarama client.
//
// The client is closed explicitly because the group was built on top of it with
// sarama.NewConsumerGroupFromClient, so closing the group alone would leave the client
// open. Both fields are nil-checked so that Destroy is safe to call on a Consumer whose
// initialization failed.
//
// Close errors are discarded because Destroy has no error return.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 8:59 PM 2021/9/22
func (c *Consumer) Destroy() {
	if c.consumerGroupInstance != nil {
		_ = c.consumerGroupInstance.Close()
	}
	if c.client != nil {
		_ = c.client.Close()
	}
}