diff --git a/redis_pub_sub.go b/redis_pub_sub.go index 6289618..51792cb 100644 --- a/redis_pub_sub.go +++ b/redis_pub_sub.go @@ -152,27 +152,28 @@ func (r *RedisEventPubSub) ConsumeEvent(handler abstract.EventHandler, successCa if nil == successCallback { successCallback = define.DefaultSuccessCallbackHandler } - go func() { - defer func() { - if panicErr := recover(); nil != panicErr { - fmt.Println(r) - } - }() - for !r.isStop || (r.isStop && len(messageChan) == 0) { - select { - case eventData := <-messageChan: - handlerResult, handlerErr := handler(eventData) - if nil != handlerErr { - // 失败回调 - failureCallback(eventData, handlerResult, handlerErr) - break - } else { - // 成功回调 - successCallback(eventData, handlerResult) + for partition := 0; partition < r.pubSubConfig.PartitionNum; partition++ { + go func(realPartition int) { + defer func() { + if panicErr := recover(); nil != panicErr { + r.panicCallback(panicErr, nil, nil) + } + }() + for !r.isStop || (r.isStop && len(messageChan) == 0) { + select { + case eventData := <-messageChan: + handlerResult, handlerErr := handler(eventData) + if nil != handlerErr { + // 失败回调 + failureCallback(eventData, handlerResult, handlerErr) + } else { + // 成功回调 + successCallback(eventData, handlerResult) + } } } - } - }() + }(partition) + } return nil }