gopkg/middleware/kafka/producer.go

155 lines
3.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package kafka...
//
// Description : 生产者
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2021-09-21 3:46 下午
package kafka
import (
"encoding/json"
"errors"
"time"
"github.com/Shopify/sarama"
)
// InitProducer 初始化生产者
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 3:52 下午 2021/9/21
func InitProducer(hostList []string, topic string, cfg *sarama.Config) (*Producer, error) {
if cfg == nil {
cfg = sarama.NewConfig()
// 默认同步模式,等待消息发送成功
cfg.Producer.Return.Successes = true
// 按照指定的key进行分区选择
cfg.Producer.Partitioner = sarama.NewHashPartitioner
}
p := &Producer{
cfg: cfg,
hostList: hostList,
topic: topic,
}
if err := p.init(); nil != err {
return nil, err
}
return p, nil
}
// Producer 生产者
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 3:57 下午 2021/9/21
type Producer struct {
cfg *sarama.Config
hostList []string
topic string
client sarama.Client
syncProducer sarama.SyncProducer
asyncProducer sarama.AsyncProducer
}
// init kafka初始化
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 3:58 下午 2021/9/21
func (p *Producer) init() error {
var (
err error
)
// client 初始化
if p.client, err = sarama.NewClient(p.hostList, p.cfg); nil != err {
return errors.New("init kafka client fail : " + err.Error())
}
// 同步生产者初始化
if p.syncProducer, err = sarama.NewSyncProducerFromClient(p.client); nil != err {
return errors.New("sync producer init fail : " + err.Error())
}
// 异步生产者初始化
if p.asyncProducer, err = sarama.NewAsyncProducerFromClient(p.client); nil != err {
return errors.New("async producer init fail : " + err.Error())
}
return nil
}
// Destroy 断开生产者连接时,触发的逻辑
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 3:58 下午 2021/9/21
func (p *Producer) Destroy() {
// 断开客户端链接
_ = p.client.Close()
}
// Sync 同步生产数据
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 4:08 下午 2021/9/21
func (p *Producer) Sync(data ProducerData) ProducerResult {
mes := p.buildMessage(data)
result := ProducerResult{}
result.Partition, result.Offset, result.Err = p.syncProducer.SendMessage(mes)
return result
}
// Async 异步生产数据
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 4:09 下午 2021/9/21
func (p *Producer) Async(data ProducerData) ProducerResult {
result := ProducerResult{}
p.asyncProducer.Input() <- p.buildMessage(data)
// 如果打开了Return.Successes配置则等同于同步方式
select {
case msg := <-p.asyncProducer.Successes():
result.Partition = msg.Partition
result.Offset = msg.Offset
case err := <-p.asyncProducer.Errors():
result.Err = err
default:
// 为什么要有default ?
// 则这段代码会挂住因为设置没有要求返回成功config.Producer.Return.Successes = false
// 那么在select等待的时候producer.Successes()不会返回producer.Errors()也不会返回(假设没有错误发生),就挂在这儿。
// 当然可以加一个default分支绕过去就不会挂住了
}
return result
}
// buildMessage 构建消息
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 7:32 下午 2021/9/21
func (p *Producer) buildMessage(data ProducerData) *sarama.ProducerMessage {
mes := &sarama.ProducerMessage{
Topic: p.topic,
Key: sarama.StringEncoder(data.Key),
Timestamp: time.Now(),
}
switch v := data.Data.(type) {
case string:
mes.Value = sarama.StringEncoder(v)
case []byte:
mes.Value = sarama.ByteEncoder(v)
default:
byteData, _ := json.Marshal(data.Data)
mes.Value = sarama.ByteEncoder(byteData)
}
return mes
}