// Package kafka...
//
// Description : 生产者
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2021-09-21 3:46 下午
package kafka

import (
	"encoding/json"
	"errors"
	"time"

	"github.com/Shopify/sarama"
)

// InitProducer 初始化生产者
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 3:52 下午 2021/9/21
func InitProducer(hostList []string, topic string, cfg *sarama.Config) (*Producer, error) {
	if cfg == nil {
		cfg = sarama.NewConfig()
		// 默认同步模式,等待消息发送成功
		cfg.Producer.Return.Successes = true
		// 按照指定的key进行分区选择
		cfg.Producer.Partitioner = sarama.NewHashPartitioner
	}

	p := &Producer{
		cfg:      cfg,
		hostList: hostList,
		topic:    topic,
	}

	if err := p.init(); nil != err {
		return nil, err
	}

	return p, nil
}

// Producer 生产者
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 3:57 下午 2021/9/21
type Producer struct {
	cfg           *sarama.Config
	hostList      []string
	topic         string
	client        sarama.Client
	syncProducer  sarama.SyncProducer
	asyncProducer sarama.AsyncProducer
}

// init kafka初始化
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 3:58 下午 2021/9/21
func (p *Producer) init() error {
	var (
		err error
	)

	// client 初始化
	if p.client, err = sarama.NewClient(p.hostList, p.cfg); nil != err {
		return errors.New("init kafka client fail : " + err.Error())
	}

	// 同步生产者初始化
	if p.syncProducer, err = sarama.NewSyncProducerFromClient(p.client); nil != err {
		return errors.New("sync producer init fail : " + err.Error())
	}

	// 异步生产者初始化
	if p.asyncProducer, err = sarama.NewAsyncProducerFromClient(p.client); nil != err {
		return errors.New("async producer init fail : " + err.Error())
	}

	return nil
}

// Destroy 断开生产者连接时,触发的逻辑
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 3:58 下午 2021/9/21
func (p *Producer) Destroy() {
	// 断开客户端链接
	_ = p.client.Close()
}

// Sync 同步生产数据
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 4:08 下午 2021/9/21
func (p *Producer) Sync(data ProducerData) ProducerResult {
	mes := p.buildMessage(data)
	result := ProducerResult{}
	result.Partition, result.Offset, result.Err = p.syncProducer.SendMessage(mes)
	return result
}

// Async 异步生产数据
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 4:09 下午 2021/9/21
func (p *Producer) Async(data ProducerData) ProducerResult {
	result := ProducerResult{}
	p.asyncProducer.Input() <- p.buildMessage(data)
	// 如果打开了Return.Successes配置,则等同于同步方式
	select {
	case msg := <-p.asyncProducer.Successes():
		result.Partition = msg.Partition
		result.Offset = msg.Offset
	case err := <-p.asyncProducer.Errors():
		result.Err = err
	default:
		// 为什么要有default ?
		// 则这段代码会挂住,因为设置没有要求返回成功config.Producer.Return.Successes = false,
		// 那么在select等待的时候producer.Successes()不会返回,producer.Errors()也不会返回(假设没有错误发生),就挂在这儿。
		// 当然可以加一个default分支绕过去,就不会挂住了:
	}
	return result
}

// buildMessage 构建消息
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 7:32 下午 2021/9/21
func (p *Producer) buildMessage(data ProducerData) *sarama.ProducerMessage {
	mes := &sarama.ProducerMessage{
		Topic:     p.topic,
		Key:       sarama.StringEncoder(data.Key),
		Timestamp: time.Now(),
	}
	switch v := data.Data.(type) {
	case string:
		mes.Value = sarama.StringEncoder(v)
	case []byte:
		mes.Value = sarama.ByteEncoder(v)
	default:
		byteData, _ := json.Marshal(data.Data)
		mes.Value = sarama.ByteEncoder(byteData)
	}
	return mes
}