// Package kafka... // // Description : 生产者 // // Author : go_developer@163.com<白茶清欢> // // Date : 2021-09-21 3:46 下午 package kafka import ( "encoding/json" "errors" "time" "github.com/Shopify/sarama" ) // InitProducer 初始化生产者 // // Author : go_developer@163.com<白茶清欢> // // Date : 3:52 下午 2021/9/21 func InitProducer(hostList []string, topic string, cfg *sarama.Config) (*Producer, error) { if cfg == nil { cfg = sarama.NewConfig() // 默认同步模式,等待消息发送成功 cfg.Producer.Return.Successes = true // 按照指定的key进行分区选择 cfg.Producer.Partitioner = sarama.NewHashPartitioner } p := &Producer{ cfg: cfg, hostList: hostList, topic: topic, } if err := p.init(); nil != err { return nil, err } return p, nil } // Producer 生产者 // // Author : go_developer@163.com<白茶清欢> // // Date : 3:57 下午 2021/9/21 type Producer struct { cfg *sarama.Config hostList []string topic string client sarama.Client syncProducer sarama.SyncProducer asyncProducer sarama.AsyncProducer } // init kafka初始化 // // Author : go_developer@163.com<白茶清欢> // // Date : 3:58 下午 2021/9/21 func (p *Producer) init() error { var ( err error ) // client 初始化 if p.client, err = sarama.NewClient(p.hostList, p.cfg); nil != err { return errors.New("init kafka client fail : " + err.Error()) } // 同步生产者初始化 if p.syncProducer, err = sarama.NewSyncProducerFromClient(p.client); nil != err { return errors.New("sync producer init fail : " + err.Error()) } // 异步生产者初始化 if p.asyncProducer, err = sarama.NewAsyncProducerFromClient(p.client); nil != err { return errors.New("async producer init fail : " + err.Error()) } return nil } // Destroy 断开生产者连接时,触发的逻辑 // // Author : go_developer@163.com<白茶清欢> // // Date : 3:58 下午 2021/9/21 func (p *Producer) Destroy() { // 断开客户端链接 _ = p.client.Close() } // Sync 同步生产数据 // // Author : go_developer@163.com<白茶清欢> // // Date : 4:08 下午 2021/9/21 func (p *Producer) Sync(data ProducerData) ProducerResult { mes := p.buildMessage(data) result := ProducerResult{} result.Partition, result.Offset, result.Err = p.syncProducer.SendMessage(mes) return result } // Async 异步生产数据 // // Author : go_developer@163.com<白茶清欢> // // Date : 4:09 下午 2021/9/21 func (p *Producer) Async(data ProducerData) ProducerResult { result := ProducerResult{} p.asyncProducer.Input() <- p.buildMessage(data) // 如果打开了Return.Successes配置,则等同于同步方式 select { case msg := <-p.asyncProducer.Successes(): result.Partition = msg.Partition result.Offset = msg.Offset case err := <-p.asyncProducer.Errors(): result.Err = err default: // 为什么要有default ? // 则这段代码会挂住,因为设置没有要求返回成功config.Producer.Return.Successes = false, // 那么在select等待的时候producer.Successes()不会返回,producer.Errors()也不会返回(假设没有错误发生),就挂在这儿。 // 当然可以加一个default分支绕过去,就不会挂住了: } return result } // buildMessage 构建消息 // // Author : go_developer@163.com<白茶清欢> // // Date : 7:32 下午 2021/9/21 func (p *Producer) buildMessage(data ProducerData) *sarama.ProducerMessage { mes := &sarama.ProducerMessage{ Topic: p.topic, Key: sarama.StringEncoder(data.Key), Timestamp: time.Now(), } switch v := data.Data.(type) { case string: mes.Value = sarama.StringEncoder(v) case []byte: mes.Value = sarama.ByteEncoder(v) default: byteData, _ := json.Marshal(data.Data) mes.Value = sarama.ByteEncoder(byteData) } return mes }