diff --git a/queue/message/message_queue_with_kafka.go b/queue/message/message_queue_with_kafka.go index f77deb7..ed4e966 100644 --- a/queue/message/message_queue_with_kafka.go +++ b/queue/message/message_queue_with_kafka.go @@ -68,11 +68,3 @@ func (q *QueueWithKafka) Publish(data interface{}) error { func (q *QueueWithKafka) Subscribe() { q.consumer.Consume(q.dataHandler) } - -func (q *QueueWithKafka) Success(data []byte) { - return -} - -func (q *QueueWithKafka) Fail(data []byte, err error) { - return -} diff --git a/queue/message/message_queue_with_message.go b/queue/message/message_queue_with_message.go new file mode 100644 index 0000000..e64bd61 --- /dev/null +++ b/queue/message/message_queue_with_message.go @@ -0,0 +1,64 @@ +// Package message ... +// +// Description : 基于Redis实现的消息队列 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2021-09-24 8:56 下午 +package message + +import ( + "context" + + "git.zhangdeman.cn/zhangdeman/gopkg/middleware/redis" +) + +// NewQueueWithRedis ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 8:58 下午 2021/9/24 +func NewQueueWithRedis(redisClient redis.RealClient, topic string, dataHandler IRedisMessageHandler) IMessageQueue { + return &QueueWithRedis{ + topic: topic, + handler: dataHandler, + client: redisClient, + } +} + +// QueueWithRedis ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 8:58 下午 2021/9/24 +type QueueWithRedis struct { + topic string + handler IRedisMessageHandler + client redis.RealClient +} + +// Publish ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 9:00 下午 2021/9/24 +func (q *QueueWithRedis) Publish(data interface{}) error { + return q.client.Instance.Publish(context.Background(), q.topic, data).Err() +} + +// Subscribe ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 9:17 下午 2021/9/24 +func (q *QueueWithRedis) Subscribe() { + mesChannel := q.client.Instance.Subscribe(context.Background(), q.topic).Channel() + for mes := range mesChannel { + err := q.handler.Execute([]byte(mes.Payload)) + if nil != err { + q.handler.Fail([]byte(mes.Payload), err) + } else { + q.handler.Success([]byte(mes.Payload)) + } + } +}