// Package message ... // // Description : 基于Redis实现的消息队列 // // Author : go_developer@163.com<白茶清欢> // // Date : 2021-09-24 8:56 下午 package message import ( "context" "git.zhangdeman.cn/zhangdeman/gopkg/middleware/redis" ) // NewQueueWithRedis ... // // Author : go_developer@163.com<白茶清欢> // // Date : 8:58 下午 2021/9/24 func NewQueueWithRedis(redisClient redis.RealClient, topic string, dataHandler IRedisMessageHandler) IMessageQueue { return &QueueWithRedis{ topic: topic, handler: dataHandler, client: redisClient, } } // QueueWithRedis ... // // Author : go_developer@163.com<白茶清欢> // // Date : 8:58 下午 2021/9/24 type QueueWithRedis struct { topic string handler IRedisMessageHandler client redis.RealClient } // Publish ... // // Author : go_developer@163.com<白茶清欢> // // Date : 9:00 下午 2021/9/24 func (q *QueueWithRedis) Publish(data interface{}) error { return q.client.Instance.Publish(context.Background(), q.topic, data).Err() } // Subscribe ... // // Author : go_developer@163.com<白茶清欢> // // Date : 9:17 下午 2021/9/24 func (q *QueueWithRedis) Subscribe() { mesChannel := q.client.Instance.Subscribe(context.Background(), q.topic).Channel() for mes := range mesChannel { err := q.handler.Execute([]byte(mes.Payload)) if nil != err { q.handler.Fail([]byte(mes.Payload), err) } else { q.handler.Success([]byte(mes.Payload)) } } }