增加kafka生产者
This commit is contained in:
29
middleware/kafka/config.go
Normal file
29
middleware/kafka/config.go
Normal file
@ -0,0 +1,29 @@
|
||||
// Package kafka...
|
||||
//
|
||||
// Description : 操作kafka的配置
|
||||
//
|
||||
// Author : go_developer@163.com<白茶清欢>
|
||||
//
|
||||
// Date : 2021-09-21 3:48 下午
|
||||
package kafka
|
||||
|
||||
// ProducerData 生产数据
|
||||
//
|
||||
// Author : go_developer@163.com<白茶清欢>
|
||||
//
|
||||
// Date : 4:13 下午 2021/9/21
|
||||
type ProducerData struct {
|
||||
Data interface{} // 发送的数据
|
||||
Key string // 分区key
|
||||
}
|
||||
|
||||
// ProducerResult 数据发送后的结果
|
||||
//
|
||||
// Author : go_developer@163.com<白茶清欢>
|
||||
//
|
||||
// Date : 4:15 下午 2021/9/21
|
||||
type ProducerResult struct {
|
||||
Partition int32 // 数据落入的分区
|
||||
Offset int64 // 数据对应的Offset
|
||||
Err error // 数据发送是否成功
|
||||
}
|
8
middleware/kafka/consumer.go
Normal file
8
middleware/kafka/consumer.go
Normal file
@ -0,0 +1,8 @@
|
||||
// Package kafka...
|
||||
//
|
||||
// Description : 消费者
|
||||
//
|
||||
// Author : go_developer@163.com<白茶清欢>
|
||||
//
|
||||
// Date : 2021-09-21 3:46 下午
|
||||
package kafka
|
154
middleware/kafka/producer.go
Normal file
154
middleware/kafka/producer.go
Normal file
@ -0,0 +1,154 @@
|
||||
// Package kafka...
|
||||
//
|
||||
// Description : 生产者
|
||||
//
|
||||
// Author : go_developer@163.com<白茶清欢>
|
||||
//
|
||||
// Date : 2021-09-21 3:46 下午
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
// InitProducer 初始化生产者
|
||||
//
|
||||
// Author : go_developer@163.com<白茶清欢>
|
||||
//
|
||||
// Date : 3:52 下午 2021/9/21
|
||||
func InitProducer(hostList []string, topic string, cfg *sarama.Config) (*Producer, error) {
|
||||
if cfg == nil {
|
||||
cfg = sarama.NewConfig()
|
||||
// 默认同步模式,等待消息发送成功
|
||||
cfg.Producer.Return.Successes = true
|
||||
// 按照指定的key进行分区选择
|
||||
cfg.Producer.Partitioner = sarama.NewHashPartitioner
|
||||
}
|
||||
|
||||
p := &Producer{
|
||||
cfg: cfg,
|
||||
hostList: hostList,
|
||||
topic: topic,
|
||||
}
|
||||
|
||||
if err := p.init(); nil != err {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// Producer 生产者
|
||||
//
|
||||
// Author : go_developer@163.com<白茶清欢>
|
||||
//
|
||||
// Date : 3:57 下午 2021/9/21
|
||||
type Producer struct {
|
||||
cfg *sarama.Config
|
||||
hostList []string
|
||||
topic string
|
||||
client sarama.Client
|
||||
syncProducer sarama.SyncProducer
|
||||
asyncProducer sarama.AsyncProducer
|
||||
}
|
||||
|
||||
// init kafka初始化
|
||||
//
|
||||
// Author : go_developer@163.com<白茶清欢>
|
||||
//
|
||||
// Date : 3:58 下午 2021/9/21
|
||||
func (p *Producer) init() error {
|
||||
var (
|
||||
err error
|
||||
)
|
||||
|
||||
// client 初始化
|
||||
if p.client, err = sarama.NewClient(p.hostList, p.cfg); nil != err {
|
||||
return errors.New("init kafka client fail : " + err.Error())
|
||||
}
|
||||
|
||||
// 同步生产者初始化
|
||||
if p.syncProducer, err = sarama.NewSyncProducerFromClient(p.client); nil != err {
|
||||
return errors.New("sync producer init fail : " + err.Error())
|
||||
}
|
||||
|
||||
// 异步生产者初始化
|
||||
if p.asyncProducer, err = sarama.NewAsyncProducerFromClient(p.client); nil != err {
|
||||
return errors.New("async producer init fail : " + err.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Destroy 断开生产者连接时,触发的逻辑
|
||||
//
|
||||
// Author : go_developer@163.com<白茶清欢>
|
||||
//
|
||||
// Date : 3:58 下午 2021/9/21
|
||||
func (p *Producer) Destroy() {
|
||||
// 断开客户端链接
|
||||
_ = p.client.Close()
|
||||
}
|
||||
|
||||
// Sync 同步生产数据
|
||||
//
|
||||
// Author : go_developer@163.com<白茶清欢>
|
||||
//
|
||||
// Date : 4:08 下午 2021/9/21
|
||||
func (p *Producer) Sync(data ProducerData) ProducerResult {
|
||||
mes := p.buildMessage(data)
|
||||
result := ProducerResult{}
|
||||
result.Partition, result.Offset, result.Err = p.syncProducer.SendMessage(mes)
|
||||
return result
|
||||
}
|
||||
|
||||
// Async 异步生产数据
|
||||
//
|
||||
// Author : go_developer@163.com<白茶清欢>
|
||||
//
|
||||
// Date : 4:09 下午 2021/9/21
|
||||
func (p *Producer) Async(data ProducerData) ProducerResult {
|
||||
result := ProducerResult{}
|
||||
p.asyncProducer.Input() <- p.buildMessage(data)
|
||||
// 如果打开了Return.Successes配置,则等同于同步方式
|
||||
select {
|
||||
case msg := <-p.asyncProducer.Successes():
|
||||
result.Partition = msg.Partition
|
||||
result.Offset = msg.Offset
|
||||
case err := <-p.asyncProducer.Errors():
|
||||
result.Err = err
|
||||
default:
|
||||
// 为什么要有default ?
|
||||
// 则这段代码会挂住,因为设置没有要求返回成功config.Producer.Return.Successes = false,
|
||||
// 那么在select等待的时候producer.Successes()不会返回,producer.Errors()也不会返回(假设没有错误发生),就挂在这儿。
|
||||
// 当然可以加一个default分支绕过去,就不会挂住了:
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// buildMessage 构建消息
|
||||
//
|
||||
// Author : go_developer@163.com<白茶清欢>
|
||||
//
|
||||
// Date : 7:32 下午 2021/9/21
|
||||
func (p *Producer) buildMessage(data ProducerData) *sarama.ProducerMessage {
|
||||
mes := &sarama.ProducerMessage{
|
||||
Topic: p.topic,
|
||||
Key: sarama.StringEncoder(data.Key),
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
switch v := data.Data.(type) {
|
||||
case string:
|
||||
mes.Value = sarama.StringEncoder(v)
|
||||
case []byte:
|
||||
mes.Value = sarama.ByteEncoder(v)
|
||||
default:
|
||||
byteData, _ := json.Marshal(data.Data)
|
||||
mes.Value = sarama.ByteEncoder(byteData)
|
||||
}
|
||||
return mes
|
||||
}
|
Reference in New Issue
Block a user