diff --git a/abstract/IEvent.go b/abstract/IEvent.go
index 54029fe..3e7997f 100644
--- a/abstract/IEvent.go
+++ b/abstract/IEvent.go
@@ -12,6 +12,48 @@ import (
 	"git.zhangdeman.cn/zhangdeman/event/define"
 )
 
+// EventHandler 事件数据处理函数
+//
+// Author : go_developer@163.com<白茶清欢>
+//
+// Date : 17:08 2024/6/26
+type EventHandler func(eventData *define.EventData) (map[string]any, error)
+
+// SendFailCallback 发送事件失败的回调
+//
+// Author : go_developer@163.com<白茶清欢>
+//
+// Date : 17:31 2024/6/26
+type SendFailCallback func(ctx context.Context, eventResult *define.SendResult)
+
+// SendSuccessCallback 发送事件成功的回调
+//
+// Author : go_developer@163.com<白茶清欢>
+//
+// Date : 17:32 2024/6/26
+type SendSuccessCallback func(ctx context.Context, eventResult *define.SendResult, err error)
+
+// ConsumeFailCallbackHandler 时间处理成功回调
+//
+// Author : go_developer@163.com<白茶清欢>
+//
+// Date : 17:18 2024/6/26
+type ConsumeFailCallbackHandler func(eventData *define.EventData, handleResult map[string]any, err error)
+
+// ConsumeSuccessCallback 时间处理失败回调
+//
+// Author : go_developer@163.com<白茶清欢>
+//
+// Date : 17:20 2024/6/26
+type ConsumeSuccessCallback func(eventData *define.EventData, handleResult map[string]any)
+
+// PanicCallback panic回调, 根据不同异常类型, eventData / handleResult 均可能为nil
+//
+// Author : go_developer@163.com<白茶清欢>
+//
+// Date : 18:07 2024/6/26
+type PanicCallback func(err any, eventData *define.EventData, handleResult map[string]any)
+
 // IEvent 事件接口定义
 //
 // Author : go_developer@163.com<白茶清欢>
@@ -23,44 +65,26 @@ type IEvent interface {
 	// Author : go_developer@163.com<白茶清欢>
 	//
 	// Date : 12:04 2024/3/11
-	SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error)
+	SendEvent(ctx context.Context, eventData *define.EventData, sendSuccessCallback SendSuccessCallback, sendFailCallback SendFailCallback) (*define.SendResult, error)
 
 	// SendEventAsync 发送事件(异步)
 	//
 	// Author : go_developer@163.com<白茶清欢>
 	//
 	// Date : 15:58 2024/6/25
-	SendEventAsync(ctx context.Context, eventData *define.EventData)
-	// SendFailCallback 发送失败的回调方法
+	SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback SendSuccessCallback, sendFailCallback SendFailCallback)
+	// GetConsumeEventChan 或去消息消费的channel, 自行实现消费
 	//
 	// Author : go_developer@163.com<白茶清欢>
 	//
-	// Date : 12:08 2024/3/11
-	SendFailCallback(ctx context.Context, eventResult *define.SendResult)
-	// SendSuccessCallback 发送成功的回调
-	//
-	// Author : go_developer@163.com<白茶清欢>
-	//
-	// Date : 12:08 2024/3/11
-	SendSuccessCallback(ctx context.Context, eventResult *define.SendResult, err error)
+	// Date : 17:11 2024/6/26
+	GetConsumeEventChan() (<-chan *define.EventData, error)
 	// ConsumeEvent 消费事件
 	//
 	// Author : go_developer@163.com<白茶清欢>
 	//
 	// Date : 12:05 2024/3/11
-	ConsumeEvent() (<-chan *define.EventData, error)
-	// ConsumeFailCallback 消费失败的回调, eventData 可能为 NIL
-	//
-	// Author : go_developer@163.com<白茶清欢>
-	//
-	// Date : 12:09 2024/3/11
-	ConsumeFailCallback(eventData *define.EventData, err error)
-	// ConsumeSuccessCallback 消费成功的回调
-	//
-	// Author : go_developer@163.com<白茶清欢>
-	//
-	// Date : 12:10 2024/3/11
-	ConsumeSuccessCallback(eventData *define.EventData)
+	ConsumeEvent(handler EventHandler, successCallback ConsumeSuccessCallback, failureCallback ConsumeFailCallbackHandler) error
 	// Destroy 事件实例销毁时, 执行的方法
 	//
 	// Author : go_developer@163.com<白茶清欢>
diff --git a/define/handler.go b/define/handler.go
new file mode 100644
index 0000000..c6722ca
--- /dev/null
+++ b/define/handler.go
@@ -0,0 +1,33 @@
+// Package define ...
+//
+// Description : define ...
+//
+// Author : go_developer@163.com<白茶清欢>
+//
+// Date : 2024-06-26 17:21
+package define
+
+// DefaultSuccessCallbackHandler ...
+//
+// Author : go_developer@163.com<白茶清欢>
+//
+// Date : 17:21 2024/6/26
+func DefaultSuccessCallbackHandler(eventData *EventData, handleResult map[string]any) {
+
+}
+
+// DefaultFailCallbackHandler ...
+//
+// Author : go_developer@163.com<白茶清欢>
+//
+// Date : 18:11 2024/6/26
+func DefaultFailCallbackHandler(eventData *EventData, handleResult map[string]any, err error) {
+
+}
+
+// DefaultPanicCallback ...
+//
+// Author : go_developer@163.com<白茶清欢>
+//
+// Date : 18:11 2024/6/26
+func DefaultPanicCallback(err any, eventData *EventData, handleResult map[string]any) {}
diff --git a/redis_pub_sub.go b/redis_pub_sub.go
index e6eefd2..ccb3466 100644
--- a/redis_pub_sub.go
+++ b/redis_pub_sub.go
@@ -9,6 +9,8 @@ package event
 
 import (
 	"context"
+	"errors"
+	"fmt"
 	"git.zhangdeman.cn/zhangdeman/consts"
 	"git.zhangdeman.cn/zhangdeman/event/abstract"
 	"git.zhangdeman.cn/zhangdeman/event/define"
@@ -40,45 +42,94 @@ func InitRedisPubSubEvent(redisClient *redis.Client, pubSubConfig *define.RedisE
 //
 // Date : 16:07 2024/6/25
 type RedisEventPubSub struct {
-	redisClient  *redis.Client                  // redis客户端
-	pubSubConfig *define.RedisEventPubSubConfig // 事件配置
-	messageChan  chan *define.EventData         // 消息队列
-	stopConsumer chan bool                      // 停止消费者
+	redisClient   *redis.Client                  // redis客户端
+	pubSubConfig  *define.RedisEventPubSubConfig // 事件配置
+	messageChan   chan *define.EventData         // 消息队列
+	stopConsumer  chan bool                      // 停止消费者
+	isStop        bool                           // 是否已停止
+	panicCallback abstract.PanicCallback         // panic回调
 }
 
-func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData) (*define.SendResult, error) {
+func (r *RedisEventPubSub) SendEvent(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) (*define.SendResult, error) {
 	//TODO implement me
 	panic("implement me")
 }
 
-func (r *RedisEventPubSub) SendEventAsync(ctx context.Context, eventData *define.EventData) {
+func (r *RedisEventPubSub) SendEventAsync(ctx context.Context, eventData *define.EventData, sendSuccessCallback abstract.SendSuccessCallback, sendFailCallback abstract.SendFailCallback) {
 	//TODO implement me
 	panic("implement me")
 }
 
-func (r *RedisEventPubSub) SendFailCallback(ctx context.Context, eventResult *define.SendResult) {
-	//TODO implement me
-	panic("implement me")
+// GetConsumeEventChan 获取消息channel, 自行实现消费
+//
+// Author : go_developer@163.com<白茶清欢>
+//
+// Date : 17:13 2024/6/26
+func (r *RedisEventPubSub) GetConsumeEventChan() (<-chan *define.EventData, error) {
+	if r.isStop {
+		return nil, errors.New("event instance has stop")
+	}
+	return r.messageChan, nil
 }
 
-func (r *RedisEventPubSub) SendSuccessCallback(ctx context.Context, eventResult *define.SendResult, err error) {
-	//TODO implement me
-	panic("implement me")
+// SetPanicCallback 出现任何panic的回调
+//
+// Author : go_developer@163.com<白茶清欢>
+//
+// Date : 18:02 2024/6/26
+func (r *RedisEventPubSub) SetPanicCallback(panicCallback abstract.PanicCallback) {
+	if nil == panicCallback {
+		panicCallback = define.DefaultPanicCallback
+	}
+	r.panicCallback = panicCallback
 }
 
-func (r *RedisEventPubSub) ConsumeEvent() (<-chan *define.EventData, error) {
-	//TODO implement me
-	panic("implement me")
-}
+// ConsumeEvent 获取数据消费实例
+//
+// Author : go_developer@163.com<白茶清欢>
+//
+// Date : 17:06 2024/6/26
+func (r *RedisEventPubSub) ConsumeEvent(handler abstract.EventHandler, successCallback abstract.ConsumeSuccessCallback, failureCallback abstract.ConsumeFailCallbackHandler) error {
+	if nil == handler {
+		return errors.New("handler is nil")
+	}
+	var (
+		messageChan <-chan *define.EventData
+		err         error
+	)
 
-func (r *RedisEventPubSub) ConsumeFailCallback(eventData *define.EventData, err error) {
-	//TODO implement me
-	panic("implement me")
-}
+	if messageChan, err = r.GetConsumeEventChan(); nil != err {
+		return err
+	}
+	if nil == failureCallback {
+		failureCallback = define.DefaultFailCallbackHandler
+	}
+	if nil == successCallback {
+		successCallback = define.DefaultSuccessCallbackHandler
+	}
+	go func() {
+		defer func() {
+			if panicErr := recover(); nil != panicErr {
+				fmt.Println(r)
+			}
+		}()
+		for !r.isStop || (r.isStop && len(messageChan) == 0) {
+			select {
+			case eventData := <-messageChan:
+				handlerResult, handlerErr := handler(eventData)
+				if nil != handlerErr {
+					// 失败回调
+					failureCallback(eventData, handlerResult, handlerErr)
+					break
+				} else {
+					// 成功回调
+					successCallback(eventData, handlerResult)
+				}
+			}
+		}
+	}()
 
-func (r *RedisEventPubSub) ConsumeSuccessCallback(eventData *define.EventData) {
-	//TODO implement me
-	panic("implement me")
+	return nil
 }
 
 // Destroy 销毁事件实例
@@ -87,6 +138,10 @@ func (r *RedisEventPubSub) ConsumeSuccessCallback(eventData *define.EventData) {
 //
 // Date : 15:42 2024/6/26
 func (r *RedisEventPubSub) Destroy() {
+	if r.isStop {
+		// 已停止
+		return
+	}
 	r.stopConsumer <- true // 停止消费者
 	messageChan := make(chan bool, 1)
 	go func() {