From 3aa883d4bedbc739631b3fa691696f53a6d318b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Fri, 24 Sep 2021 21:25:08 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8Ekafka?= =?UTF-8?q?=E7=9A=84redis=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- queue/message/message_queue_with_kafka.go | 8 --- queue/message/message_queue_with_message.go | 64 +++++++++++++++++++++ 2 files changed, 64 insertions(+), 8 deletions(-) create mode 100644 queue/message/message_queue_with_message.go diff --git a/queue/message/message_queue_with_kafka.go b/queue/message/message_queue_with_kafka.go index f77deb7..ed4e966 100644 --- a/queue/message/message_queue_with_kafka.go +++ b/queue/message/message_queue_with_kafka.go @@ -68,11 +68,3 @@ func (q *QueueWithKafka) Publish(data interface{}) error { func (q *QueueWithKafka) Subscribe() { q.consumer.Consume(q.dataHandler) } - -func (q *QueueWithKafka) Success(data []byte) { - return -} - -func (q *QueueWithKafka) Fail(data []byte, err error) { - return -} diff --git a/queue/message/message_queue_with_message.go b/queue/message/message_queue_with_message.go new file mode 100644 index 0000000..e64bd61 --- /dev/null +++ b/queue/message/message_queue_with_message.go @@ -0,0 +1,64 @@ +// Package message ... +// +// Description : 基于Redis实现的消息队列 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2021-09-24 8:56 下午 +package message + +import ( + "context" + + "git.zhangdeman.cn/zhangdeman/gopkg/middleware/redis" +) + +// NewQueueWithRedis ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 8:58 下午 2021/9/24 +func NewQueueWithRedis(redisClient redis.RealClient, topic string, dataHandler IRedisMessageHandler) IMessageQueue { + return &QueueWithRedis{ + topic: topic, + handler: dataHandler, + client: redisClient, + } +} + +// QueueWithRedis ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 8:58 下午 2021/9/24 +type QueueWithRedis struct { + topic string + handler IRedisMessageHandler + client redis.RealClient +} + +// Publish ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 9:00 下午 2021/9/24 +func (q *QueueWithRedis) Publish(data interface{}) error { + return q.client.Instance.Publish(context.Background(), q.topic, data).Err() +} + +// Subscribe ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 9:17 下午 2021/9/24 +func (q *QueueWithRedis) Subscribe() { + mesChannel := q.client.Instance.Subscribe(context.Background(), q.topic).Channel() + for mes := range mesChannel { + err := q.handler.Execute([]byte(mes.Payload)) + if nil != err { + q.handler.Fail([]byte(mes.Payload), err) + } else { + q.handler.Success([]byte(mes.Payload)) + } + } +}