From 7fe6003db876e09d2a5f3791b8c442bffe8caa19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Thu, 27 Jun 2024 12:00:15 +0800 Subject: [PATCH] =?UTF-8?q?redis=E4=BA=8B=E4=BB=B6=E5=A4=9A=E5=88=86?= =?UTF-8?q?=E5=8C=BA=E6=B6=88=E8=B4=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- redis_pub_sub.go | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/redis_pub_sub.go b/redis_pub_sub.go index 6289618..51792cb 100644 --- a/redis_pub_sub.go +++ b/redis_pub_sub.go @@ -152,27 +152,28 @@ func (r *RedisEventPubSub) ConsumeEvent(handler abstract.EventHandler, successCa if nil == successCallback { successCallback = define.DefaultSuccessCallbackHandler } - go func() { - defer func() { - if panicErr := recover(); nil != panicErr { - fmt.Println(r) - } - }() - for !r.isStop || (r.isStop && len(messageChan) == 0) { - select { - case eventData := <-messageChan: - handlerResult, handlerErr := handler(eventData) - if nil != handlerErr { - // 失败回调 - failureCallback(eventData, handlerResult, handlerErr) - break - } else { - // 成功回调 - successCallback(eventData, handlerResult) + for partition := 0; partition < r.pubSubConfig.PartitionNum; partition++ { + go func(realPartition int) { + defer func() { + if panicErr := recover(); nil != panicErr { + r.panicCallback(panicErr, nil, nil) + } + }() + for !r.isStop || (r.isStop && len(messageChan) == 0) { + select { + case eventData := <-messageChan: + handlerResult, handlerErr := handler(eventData) + if nil != handlerErr { + // 失败回调 + failureCallback(eventData, handlerResult, handlerErr) + } else { + // 成功回调 + successCallback(eventData, handlerResult) + } } } - } - }() + }(partition) + } return nil }