增加基于kafka的redis队列

This commit is contained in:
白茶清欢 2021-09-24 21:25:08 +08:00
parent 04d19d9088
commit 3aa883d4be
2 changed files with 64 additions and 8 deletions

View File

@ -68,11 +68,3 @@ func (q *QueueWithKafka) Publish(data interface{}) error {
func (q *QueueWithKafka) Subscribe() { func (q *QueueWithKafka) Subscribe() {
q.consumer.Consume(q.dataHandler) q.consumer.Consume(q.dataHandler)
} }
func (q *QueueWithKafka) Success(data []byte) {
return
}
func (q *QueueWithKafka) Fail(data []byte, err error) {
return
}

View File

@ -0,0 +1,64 @@
// Package message ...
//
// Description : 基于Redis实现的消息队列
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2021-09-24 8:56 下午
package message
import (
"context"
"git.zhangdeman.cn/zhangdeman/gopkg/middleware/redis"
)
// NewQueueWithRedis ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 8:58 下午 2021/9/24
func NewQueueWithRedis(redisClient redis.RealClient, topic string, dataHandler IRedisMessageHandler) IMessageQueue {
return &QueueWithRedis{
topic: topic,
handler: dataHandler,
client: redisClient,
}
}
// QueueWithRedis ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 8:58 下午 2021/9/24
type QueueWithRedis struct {
topic string
handler IRedisMessageHandler
client redis.RealClient
}
// Publish ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 9:00 下午 2021/9/24
func (q *QueueWithRedis) Publish(data interface{}) error {
return q.client.Instance.Publish(context.Background(), q.topic, data).Err()
}
// Subscribe ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 9:17 下午 2021/9/24
func (q *QueueWithRedis) Subscribe() {
mesChannel := q.client.Instance.Subscribe(context.Background(), q.topic).Channel()
for mes := range mesChannel {
err := q.handler.Execute([]byte(mes.Payload))
if nil != err {
q.handler.Fail([]byte(mes.Payload), err)
} else {
q.handler.Success([]byte(mes.Payload))
}
}
}