redis事件多分区消费
This commit is contained in:
parent
414ad47e72
commit
7fe6003db8
@ -152,27 +152,28 @@ func (r *RedisEventPubSub) ConsumeEvent(handler abstract.EventHandler, successCa
|
||||
if nil == successCallback {
|
||||
successCallback = define.DefaultSuccessCallbackHandler
|
||||
}
|
||||
go func() {
|
||||
defer func() {
|
||||
if panicErr := recover(); nil != panicErr {
|
||||
fmt.Println(r)
|
||||
}
|
||||
}()
|
||||
for !r.isStop || (r.isStop && len(messageChan) == 0) {
|
||||
select {
|
||||
case eventData := <-messageChan:
|
||||
handlerResult, handlerErr := handler(eventData)
|
||||
if nil != handlerErr {
|
||||
// 失败回调
|
||||
failureCallback(eventData, handlerResult, handlerErr)
|
||||
break
|
||||
} else {
|
||||
// 成功回调
|
||||
successCallback(eventData, handlerResult)
|
||||
for partition := 0; partition < r.pubSubConfig.PartitionNum; partition++ {
|
||||
go func(realPartition int) {
|
||||
defer func() {
|
||||
if panicErr := recover(); nil != panicErr {
|
||||
r.panicCallback(panicErr, nil, nil)
|
||||
}
|
||||
}()
|
||||
for !r.isStop || (r.isStop && len(messageChan) == 0) {
|
||||
select {
|
||||
case eventData := <-messageChan:
|
||||
handlerResult, handlerErr := handler(eventData)
|
||||
if nil != handlerErr {
|
||||
// 失败回调
|
||||
failureCallback(eventData, handlerResult, handlerErr)
|
||||
} else {
|
||||
// 成功回调
|
||||
successCallback(eventData, handlerResult)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}(partition)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user