Files
kafka/producer.go
2025-12-31 21:47:14 +08:00

121 lines
2.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package kafka contains a Kafka message producer built on sarama.
//
// Description : producer
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2021-09-21 3:46 PM
package kafka
import (
"encoding/json"
"errors"
"time"
"github.com/IBM/sarama"
)
// InitProducer builds a Producer for the given broker list and topic and
// connects it immediately.
//
// When cfg is nil a fresh sarama config is created with
// Producer.Return.Successes enabled and the hash partitioner selected, so
// messages are routed to partitions by their key. A non-nil cfg is used as
// supplied, without modification.
//
// It returns the initialised Producer, or a nil Producer together with the
// error reported by the connection step.
func InitProducer(hostList []string, topic string, cfg *sarama.Config) (*Producer, error) {
	conf := cfg
	if conf == nil {
		conf = sarama.NewConfig()
		// Route messages by key.
		conf.Producer.Partitioner = sarama.NewHashPartitioner
		// Report successful deliveries back to the caller.
		conf.Producer.Return.Successes = true
	}

	producer := &Producer{topic: topic, hostList: hostList, cfg: conf}
	err := producer.init()
	if err != nil {
		return nil, err
	}
	return producer, nil
}
// Producer bundles a sarama client with a synchronous and an asynchronous
// producer that all publish to a single topic.
type Producer struct {
// cfg is the sarama configuration handed to sarama.NewClient in init.
cfg *sarama.Config
// hostList holds the broker addresses handed to sarama.NewClient in init.
hostList []string
// topic is the topic set on every message created by buildMessage.
topic string
// client is the shared sarama client created in init and closed by Destroy.
client sarama.Client
// syncProducer is created from client in init and used by Sync.
syncProducer sarama.SyncProducer
// asyncProducer is created from client in init and used by Async.
asyncProducer sarama.AsyncProducer
}
// init connects to Kafka: it creates the sarama client from p.hostList and
// p.cfg, then a sync producer and an async producer that share that client.
//
// The receiver's client/syncProducer/asyncProducer fields are assigned only
// when all three steps succeed. If a later step fails, everything created by
// the earlier steps is closed before returning, so a failed init does not
// leave open connections or producers behind.
//
// It returns nil on success, or an error whose message names the failing step
// followed by the underlying sarama error text.
func (p *Producer) init() error {
	// Client first: both producers are built on top of it.
	client, err := sarama.NewClient(p.hostList, p.cfg)
	if err != nil {
		return errors.New("init kafka client fail : " + err.Error())
	}

	// Synchronous producer.
	syncProducer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		// Best-effort cleanup; the creation error is the one worth reporting.
		_ = client.Close()
		return errors.New("sync producer init fail : " + err.Error())
	}

	// Asynchronous producer.
	asyncProducer, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		// Best-effort cleanup, producer before the client it depends on.
		_ = syncProducer.Close()
		_ = client.Close()
		return errors.New("async producer init fail : " + err.Error())
	}

	p.client = client
	p.syncProducer = syncProducer
	p.asyncProducer = asyncProducer
	return nil
}
// Destroy releases everything held by the producer. It closes the async
// producer, then the sync producer, and finally the shared client, because
// producers must be closed before the client they were created from.
//
// Fields that are nil are skipped. Close errors are discarded on purpose:
// Destroy has no return value and shutdown is best-effort.
func (p *Producer) Destroy() {
	if p.asyncProducer != nil {
		_ = p.asyncProducer.Close()
	}
	if p.syncProducer != nil {
		_ = p.syncProducer.Close()
	}
	// Drop the client connection last.
	if p.client != nil {
		_ = p.client.Close()
	}
}
// Sync publishes data through the synchronous producer.
//
// The message is built with buildMessage and handed to SendMessage; the
// partition, offset and error returned by that call are copied into the
// returned ProducerResult.
func (p *Producer) Sync(data ProducerData) ProducerResult {
	partition, offset, err := p.syncProducer.SendMessage(p.buildMessage(data))
	return ProducerResult{
		Partition: partition,
		Offset:    offset,
		Err:       err,
	}
}
// Async publishes data through the asynchronous producer and then waits up to
// one second for a delivery event.
//
// The message built by buildMessage is pushed onto the producer's input
// channel. The method then takes whichever comes first: an event on the
// Successes channel (Partition and Offset are copied into the result), an
// event on the Errors channel (stored in Err), or the one-second timeout
// (Err is set to a timeout error). With Return.Successes enabled this makes
// the call behave like the synchronous path.
//
// NOTE(review): the Successes/Errors channels belong to the shared async
// producer, so the event read here is simply the next one to arrive. Under
// concurrent calls, or after an earlier call timed out, it may belong to a
// different message — confirm that callers serialise their use of Async.
func (p *Producer) Async(data ProducerData) ProducerResult {
	result := ProducerResult{}
	p.asyncProducer.Input() <- p.buildMessage(data)

	// A one-shot timer, stopped on return, replaces the previous ticker that
	// was never stopped.
	timeout := time.NewTimer(time.Second)
	defer timeout.Stop()

	select {
	case msg, ok := <-p.asyncProducer.Successes():
		// A receive from a closed channel yields nil; dereferencing it would panic.
		if !ok || msg == nil {
			result.Err = errors.New("async producer closed before a delivery result was received")
			break
		}
		result.Partition = msg.Partition
		result.Offset = msg.Offset
	case produceErr, ok := <-p.asyncProducer.Errors():
		// Avoid storing a nil pointer in Err, which would read as a non-nil error.
		if !ok || produceErr == nil {
			result.Err = errors.New("async producer closed before a delivery result was received")
			break
		}
		result.Err = produceErr
	case <-timeout.C:
		result.Err = errors.New("produce data wait result timeout")
	}
	return result
}
// buildMessage converts data into a sarama message for p.topic.
//
// The message key is data.Key, the timestamp is the current time, and the
// value is the JSON encoding of data.Data. When data.Data is nil the value is
// the empty JSON object "{}". The previous fallback substituted a
// map[interface{}]interface{}, which encoding/json cannot marshal, so the
// value silently ended up empty.
func (p *Producer) buildMessage(data ProducerData) *sarama.ProducerMessage {
	payload := []byte("{}")
	if nil != data.Data {
		// The marshal error is discarded because this method has no error
		// return; on failure payload is nil and the message value is empty.
		payload, _ = json.Marshal(data.Data)
	}

	return &sarama.ProducerMessage{
		Topic:     p.topic,
		Key:       sarama.StringEncoder(data.Key),
		Value:     sarama.ByteEncoder(payload),
		Timestamp: time.Now(),
	}
}