From 339e78bd1207eb5d518c3fc5188326c7bdf2af1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Fri, 24 Sep 2021 11:00:47 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96kafka=E7=9B=B8=E5=85=B3handle?= =?UTF-8?q?r=E5=AE=9A=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- queue/message/abstract.go | 5 +++++ queue/message/message_queue_with_kafka.go | 10 +++++----- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/queue/message/abstract.go b/queue/message/abstract.go index a68dcaa..c7989b4 100644 --- a/queue/message/abstract.go +++ b/queue/message/abstract.go @@ -7,6 +7,8 @@ // Date : 2021-09-23 10:58 下午 package message +import "git.zhangdeman.cn/zhangdeman/gopkg/middleware/kafka" + // IMessageQueue 消息队列的接口约束 // // Author : go_developer@163.com<白茶清欢> @@ -22,3 +24,6 @@ type IMessageQueue interface { // Fail 失败回调 Fail(data []byte, err error) } + +// IKafkaMessageHandler 卡夫卡消息处理 +type IKafkaMessageHandler kafka.IConsumeDataHandler diff --git a/queue/message/message_queue_with_kafka.go b/queue/message/message_queue_with_kafka.go index eb9ac58..f77deb7 100644 --- a/queue/message/message_queue_with_kafka.go +++ b/queue/message/message_queue_with_kafka.go @@ -17,7 +17,7 @@ import ( // Author : go_developer@163.com<白茶清欢> // // Date : 11:11 下午 2021/9/23 -func NewMessageQueueWithKafka(producer kafka.Producer, consumer kafka.Consumer, sync bool, dataHandler kafka.IConsumeDataHandler) IMessageQueue { +func NewMessageQueueWithKafka(producer kafka.Producer, consumer kafka.Consumer, sync bool, dataHandler IKafkaMessageHandler) IMessageQueue { return &QueueWithKafka{ producer: producer, consumer: consumer, @@ -32,10 +32,10 @@ func NewMessageQueueWithKafka(producer kafka.Producer, consumer kafka.Consumer, // // Date : 11:10 下午 2021/9/23 type QueueWithKafka struct { - producer kafka.Producer // 生产者 - consumer kafka.Consumer // 消费者 - sync bool // 同步发消息 or 异步 - dataHandler kafka.IConsumeDataHandler // 订阅到的数据处理方法 + producer kafka.Producer // 生产者 + consumer kafka.Consumer // 消费者 + sync bool // 同步发消息 or 异步 + dataHandler IKafkaMessageHandler // 订阅到的数据处理方法 } // Publish 发布消息