// Package message... // // Description : 基于kafka实现消息队列 // // Author : go_developer@163.com<白茶清欢> // // Date : 2021-09-23 11:09 下午 package message import ( "git.zhangdeman.cn/zhangdeman/gopkg/middleware/kafka" "github.com/pkg/errors" ) // NewMessageQueueWithKafka 获取基于kafka消息队列的实例 // // Author : go_developer@163.com<白茶清欢> // // Date : 11:11 下午 2021/9/23 func NewMessageQueueWithKafka(producer kafka.Producer, consumer kafka.Consumer, sync bool, dataHandler kafka.IConsumeDataHandler) IMessageQueue { return &QueueWithKafka{ producer: producer, consumer: consumer, sync: sync, dataHandler: dataHandler, } } // QueueWithKafka 基于kafka的消息队列 // // Author : zhangdeman001@ke.com<张德满> // // Date : 11:10 下午 2021/9/23 type QueueWithKafka struct { producer kafka.Producer // 生产者 consumer kafka.Consumer // 消费者 sync bool // 同步发消息 or 异步 dataHandler kafka.IConsumeDataHandler // 订阅到的数据处理方法 } // Publish 发布消息 // // Author : go_developer@163.com<白茶清欢> // // Date : 11:17 下午 2021/9/23 func (q *QueueWithKafka) Publish(data interface{}) error { var ( ok bool publishData kafka.ProducerData publishResult kafka.ProducerResult ) if publishData, ok = data.(kafka.ProducerData); !ok { return errors.New("data format is not kafka.ProducerData") } if q.sync { publishResult = q.producer.Sync(publishData) } else { publishResult = q.producer.Async(publishData) } return publishResult.Err } // Subscribe 订阅消息 // // Author : go_developer@163.com<白茶清欢> // // Date : 11:17 下午 2021/9/23 func (q *QueueWithKafka) Subscribe() { q.consumer.Consume(q.dataHandler) } func (q *QueueWithKafka) Success(data []byte) { return } func (q *QueueWithKafka) Fail(data []byte, err error) { return }