event/redis_pub_sub.go

266 lines
7.2 KiB
Go

// Package event ...
//
// Description : event ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2024-06-25 16:06
package event
import (
"context"
"errors"
"fmt"
"git.zhangdeman.cn/zhangdeman/consts"
"git.zhangdeman.cn/zhangdeman/event/abstract"
"git.zhangdeman.cn/zhangdeman/event/define"
"git.zhangdeman.cn/zhangdeman/serialize"
"git.zhangdeman.cn/zhangdeman/wrapper"
"github.com/redis/go-redis/v9"
"time"
)
// RedisEventPubSubClient is the package-level Redis pub/sub event driver.
// It is assigned by InitRedisPubSubEvent.
var RedisEventPubSubClient abstract.IEvent
// InitRedisPubSubEvent initialises the Redis event driver built on the
// PUBLISH / SUBSCRIBE commands and stores it in RedisEventPubSubClient.
//
// A nil pubSubConfig is replaced by an empty configuration so that
// SetRedisClient can fill in the defaults. The default panic callback is
// installed before the consumers start, so a panic recovered in a consumer
// or in SendEventAsync never ends up calling a nil function.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 16:24 2024/6/25
func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) {
	if nil == pubSubConfig {
		pubSubConfig = &define.RedisEventPubSubConfig{}
	}
	instance := &RedisEventPubSub{
		redisClient:  redisClient,
		pubSubConfig: pubSubConfig,
	}
	// Passing nil selects define.DefaultPanicCallback.
	instance.SetPanicCallback(nil)
	instance.SetRedisClient(redisClient, pubSubConfig)
	instance.StartConsumer() // start the per-partition consumers
	RedisEventPubSubClient = instance
}
// RedisEventPubSub is the event driver backed by Redis pub/sub.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 16:07 2024/6/25
type RedisEventPubSub struct {
redisClient *redis.Client // Redis client used to publish and subscribe
pubSubConfig *define.RedisEventPubSubConfig // event configuration (topic, partition count, buffer size, close wait time)
messageChan chan *define.EventData // buffered queue of decoded events awaiting consumption
stopConsumer chan bool // receives the stop signal sent by Destroy
isStop bool // whether the instance has been stopped
panicCallback abstract.PanicCallback // callback invoked when a panic is recovered
}
// SendEvent publishes an event synchronously.
//
// The partition is chosen by hashing eventData.Key modulo the configured
// PartitionNum, and the event is published to the channel
// "<Topic>_<partition>".
//
// The returned SendResult is populated even when publishing fails (IsSuccess
// is false and FailReason carries the error text); the publish error is
// returned alongside it. A nil eventData yields an error and a nil result.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 10:58 2024/6/27
func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) {
	if nil == eventData {
		return nil, errors.New("event data is nil")
	}
	partition := int(wrapper.String(eventData.Key).HashNumber().Value % uint64(r.pubSubConfig.PartitionNum))
	realTopic := fmt.Sprintf("%v_%v", r.pubSubConfig.Topic, partition)
	res := r.redisClient.Publish(ctx, realTopic, eventData)
	if nil == res {
		return nil, errors.New("can not get send event result")
	}
	sendErr := res.Err()
	// Only call Error() when there is an error: function arguments are
	// evaluated eagerly, so building the failure text unconditionally would
	// dereference a nil error on every successful publish.
	failReason := "success"
	if nil != sendErr {
		failReason = sendErr.Error()
	}
	return &define.SendResult{
		Data:       eventData,
		Partition:  partition,
		Topic:      realTopic,
		IsSuccess:  nil == sendErr,
		FailReason: failReason,
		Extension:  nil,
	}, sendErr
}
// SendEventAsync publishes an event in a new goroutine.
//
// sendSuccessCallback is invoked with the send result when SendEvent
// succeeds; sendFailCallback is invoked with the event, the result and the
// error when it fails. A nil callback is replaced by the corresponding
// default from the define package.
//
// A panic raised while sending or inside a callback is recovered and handed
// to the panic callback (define.DefaultPanicCallback when none has been set)
// together with the send result obtained so far.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:29 2024/6/27
func (r *RedisEventPubSub) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) {
	if nil == sendFailCallback {
		sendFailCallback = define.DefaultSendFailCallback
	}
	if nil == sendSuccessCallback {
		sendSuccessCallback = define.DefaultSendSuccessCallback
	}
	go func() {
		var sendResult *define.SendResult
		defer func() {
			panicErr := recover()
			if nil == panicErr {
				return
			}
			// Guard against an unset callback; calling a nil function here
			// would panic inside the recover handler and crash the process.
			panicCallback := r.panicCallback
			if nil == panicCallback {
				panicCallback = define.DefaultPanicCallback
			}
			panicCallback(panicErr, eventData, map[string]any{
				"send_result": sendResult,
			})
		}()
		var handlerErr error
		if sendResult, handlerErr = r.SendEvent(ctx, eventData); nil != handlerErr {
			sendFailCallback(ctx, eventData, sendResult, handlerErr)
			return
		}
		sendSuccessCallback(ctx, sendResult)
	}()
}
// GetConsumeEventChan returns the receive-only event channel so that callers
// can implement their own consumption loop. It returns an error when the
// instance is marked as stopped.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 17:13 2024/6/26
func (r *RedisEventPubSub) GetConsumeEventChan() (<-chan *define.EventData, error) {
	if !r.isStop {
		return r.messageChan, nil
	}
	return nil, errors.New("event instance has stop")
}
// SetPanicCallback registers the callback invoked whenever a panic is
// recovered. Passing nil selects define.DefaultPanicCallback.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 18:02 2024/6/26
func (r *RedisEventPubSub) SetPanicCallback(panicCallback abstract.PanicCallback) {
	if nil != panicCallback {
		r.panicCallback = panicCallback
		return
	}
	r.panicCallback = define.DefaultPanicCallback
}
// ConsumeEvent consumes events from the internal channel with handler.
//
// For every event, handler is called; on error failureCallback receives the
// event, the handler result and the handler's error, otherwise
// successCallback receives the event and the handler result. Nil callbacks
// are replaced by the defaults from the define package.
//
// The call blocks while ranging over the event channel and only returns nil
// once that channel is closed. It returns an error immediately when handler
// is nil or the instance is marked as stopped.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 17:06 2024/6/26
func (r *RedisEventPubSub) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error {
	if nil == handler {
		return errors.New("handler is nil")
	}
	messageChan, err := r.GetConsumeEventChan()
	if nil != err {
		return err
	}
	if nil == failureCallback {
		failureCallback = define.DefaultFailCallbackHandler
	}
	if nil == successCallback {
		successCallback = define.DefaultSuccessCallbackHandler
	}
	for eventData := range messageChan {
		handlerResult, handleErr := handler(eventData)
		if nil != handleErr {
			// Report the handler's own error, not the (always nil) error
			// left over from fetching the channel.
			failureCallback(eventData, handlerResult, handleErr)
			continue
		}
		successCallback(eventData, handlerResult)
	}
	return nil
}
// StartConsumer starts one subscriber goroutine per configured partition.
// Each goroutine subscribes to "<Topic>_<partition>", decodes every payload
// as JSON into a define.EventData and pushes it onto the message channel.
//
// The goroutines exit when a stop signal arrives on stopConsumer or when the
// subscription channel is closed; the subscription is closed on exit.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 18:36 2024/6/27
func (r *RedisEventPubSub) StartConsumer() {
	for partition := 0; partition < r.pubSubConfig.PartitionNum; partition++ {
		go r.consumePartition(partition)
	}
}

// consumePartition runs the subscribe/decode/enqueue loop for one partition.
func (r *RedisEventPubSub) consumePartition(partition int) {
	defer func() {
		if panicErr := recover(); nil != panicErr {
			// Fall back to the default callback so that an unset callback
			// cannot cause a second panic inside the recover handler.
			panicCallback := r.panicCallback
			if nil == panicCallback {
				panicCallback = define.DefaultPanicCallback
			}
			panicCallback(panicErr, nil, nil)
		}
	}()
	realTopic := fmt.Sprintf("%v_%v", r.pubSubConfig.Topic, partition)
	pubSub := r.redisClient.Subscribe(context.Background(), realTopic)
	// Release the subscription when the consumer exits; the close error is
	// ignored because nothing can be done with it at this point.
	defer func() { _ = pubSub.Close() }()
	messageChannel := pubSub.Channel()
	for {
		select {
		case <-r.stopConsumer:
			r.relayStop()
			return
		case message, ok := <-messageChannel:
			if !ok {
				// Subscription channel closed.
				return
			}
			var eventData define.EventData
			if err := serialize.JSON.UnmarshalWithNumber([]byte(message.Payload), &eventData); nil != err {
				// TODO : callback for event payloads that fail to decode
				continue
			}
			// Keep watching for the stop signal while the queue is full so
			// a blocked enqueue cannot keep the consumer alive.
			select {
			case r.messageChan <- &eventData:
			case <-r.stopConsumer:
				r.relayStop()
				return
			}
		}
	}
}

// relayStop puts the stop signal back without blocking so that the consumers
// of the other partitions also observe it.
func (r *RedisEventPubSub) relayStop() {
	select {
	case r.stopConsumer <- true:
	default:
	}
}
// Destroy stops the event instance.
//
// It marks the instance as stopped, signals the consumers to stop and then
// waits until the message queue is empty or CloseMaxWaitTime milliseconds
// have elapsed, whichever comes first. Calling it again after the instance
// has been stopped is a no-op. The message channel itself is not closed.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 15:42 2024/6/26
func (r *RedisEventPubSub) Destroy() {
	if r.isStop {
		// already stopped
		return
	}
	r.isStop = true
	// Non-blocking send: the channel holds a single signal, and a full (or
	// unset) channel must not hang the caller.
	select {
	case r.stopConsumer <- true:
	default:
	}

	deadline := time.NewTimer(time.Millisecond * time.Duration(r.pubSubConfig.CloseMaxWaitTime))
	defer deadline.Stop()
	// Poll the queue length at a short interval instead of spinning on it.
	poll := time.NewTicker(10 * time.Millisecond)
	defer poll.Stop()
	for len(r.messageChan) > 0 {
		select {
		case <-deadline.C:
			// maximum wait time reached
			return
		case <-poll.C:
		}
	}
	// no pending events left
}
// DriverType returns the driver identifier of this implementation,
// consts.EventDriverRedis.
func (r *RedisEventPubSub) DriverType() string {
	driverType := consts.EventDriverRedis
	return driverType
}
// SetRedisClient sets the Redis client and the pub/sub configuration, then
// (re)creates the stop-signal and message channels.
//
// A nil pubSubConfig is replaced by an empty configuration. Empty or
// non-positive Topic, PartitionNum, MessageBufferSize and CloseMaxWaitTime
// values are replaced, in the passed configuration itself, by the defaults
// from the define package.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 16:30 2024/6/25
func (r *RedisEventPubSub) SetRedisClient(redisClient *redis.Client, pubSubConfig *define.RedisEventPubSubConfig) {
	if nil == pubSubConfig {
		pubSubConfig = &define.RedisEventPubSubConfig{}
	}
	if len(pubSubConfig.Topic) == 0 {
		pubSubConfig.Topic = define.DefaultRedisTopic
	}
	if pubSubConfig.PartitionNum <= 0 {
		pubSubConfig.PartitionNum = define.DefaultRedisPartitionNum
	}
	// Apply the defaults to the incoming configuration, not to the one
	// currently stored on the receiver (which may be nil or a different
	// object that is about to be replaced).
	if pubSubConfig.MessageBufferSize <= 0 {
		pubSubConfig.MessageBufferSize = define.DefaultRedisMessageBufferSize
	}
	if pubSubConfig.CloseMaxWaitTime <= 0 {
		pubSubConfig.CloseMaxWaitTime = define.DefaultRedisCloseMaxWaitTime
	}
	r.redisClient = redisClient
	r.pubSubConfig = pubSubConfig
	r.stopConsumer = make(chan bool, 1)
	r.messageChan = make(chan *define.EventData, pubSubConfig.MessageBufferSize)
}