diff --git a/define/response.go b/define/response.go index 43e824e..eea05b8 100644 --- a/define/response.go +++ b/define/response.go @@ -139,3 +139,13 @@ type ResponseOption struct { Extension map[string]any `json:"extension"` // 扩展数据 XmlName string `json:"xml_name"` // 以xml文件格式响应数据时, Xml文件名(根节点) } + +type SseData struct { + ID string `json:"id"` // 响应数据 ID + Object string `json:"object"` // 响应数据对象类型 + Created int64 `json:"created"` // 创建时间, 秒级时间戳 + Choices []map[string]any `json:"choices"` // choice 列表 + EventType string `json:"event_type"` // 事件类型 +} + +const SseMsgFormat = "id:%v\nevent:%v\ndata:%v\n\n" diff --git a/logger/instance.go b/logger/instance.go index 8491cb5..352a938 100644 --- a/logger/instance.go +++ b/logger/instance.go @@ -51,4 +51,6 @@ const ( CodeLogicHook = "logic-hook" CodeParamValidateFailure = "param-validate-failure" CodeLogicErrorWrapper = "logic-error-wrapper" + CodeLogicSseInitError = "sse-init-error" + CodeLogicSseClosedError = "sse-closed-error" ) diff --git a/router/handler.go b/router/handler.go index 7ba3b97..c120892 100644 --- a/router/handler.go +++ b/router/handler.go @@ -9,9 +9,11 @@ package router import ( "errors" + "fmt" "net/http" "reflect" "sync" + "time" "git.zhangdeman.cn/zhangdeman/consts" "git.zhangdeman.cn/zhangdeman/exception" @@ -21,6 +23,8 @@ import ( "git.zhangdeman.cn/zhangdeman/gin/response" "git.zhangdeman.cn/zhangdeman/gin/util" loggerPkg "git.zhangdeman.cn/zhangdeman/logger" + "git.zhangdeman.cn/zhangdeman/serialize" + "git.zhangdeman.cn/zhangdeman/wrapper/op_string" "github.com/gin-gonic/gin" "github.com/mcuadros/go-defaults" ) @@ -173,8 +177,38 @@ func (s *server) SseHandler(uriCfg UriConfig) gin.HandlerFunc { ctx.Writer.Header().Set(consts.HeaderKeyConnection.String(), "keep-alive") ctx.Writer.Header().Set(consts.HeaderKeyXAccelBuffering.String(), "no") flusher, _ := ctx.Writer.(http.Flusher) - // TODO: 发送连接就绪消息 + // 发送连接就绪消息 + if _, err = fmt.Fprintf(ctx.Writer, define.SseMsgFormat, -1, "system", serialize.JSON.MarshalForStringIgnoreError(define.SseData{ + ID: op_string.Random(8, ""), + Object: "system", + Created: time.Now().Unix(), + Choices: []map[string]any{}, + EventType: "connected", + })); nil != err { + // 无法推送数据, 等价于结束, 如有必要, 让客户端发起重连 + logger.Instance.Error("SSE 连接建立成功后, 发送链接成功消息出现异常", loggerPkg.NewLogData(util.GinCtxToContext(ctx), logger.RecordType, logger.CodeLogicSseInitError, map[string]any{ + "err_msg": err.Error(), + }).ToFieldList()...) + return + } flusher.Flush() + defer func() { + // 发送连接关闭消息 + if _, err = fmt.Fprintf(ctx.Writer, define.SseMsgFormat, -3, "system", serialize.JSON.MarshalForStringIgnoreError(define.SseData{ + ID: op_string.Random(8, ""), + Object: "system", + Created: time.Now().Unix(), + Choices: []map[string]any{}, + EventType: "closed", + })); nil != err { + // 无法推送数据, 等价于结束, 如有必要, 让客户端发起重连 + logger.Instance.Error("SSE 连接断开前, 发送链接断开消息出现异常", loggerPkg.NewLogData(util.GinCtxToContext(ctx), logger.RecordType, logger.CodeLogicSseClosedError, map[string]any{ + "err_msg": err.Error(), + }).ToFieldList()...) + return + } + flusher.Flush() + }() firstParam = reflect.ValueOf(ctx) resList := uriCfg.ApiLogicFunc.Func.Call([]reflect.Value{uriCfg.ApiStructValue, firstParam, inputValue}) if resList[1].IsNil() { @@ -196,8 +230,42 @@ func (s *server) SseHandler(uriCfg UriConfig) gin.HandlerFunc { "err": resList[1].Interface(), }) } - response.SendWithException(ctx, e, &define.ResponseOption{ - ContentType: consts.MimeTypeJson, - }) + if nil != e { + // 异常终止 + if _, err = fmt.Fprintf(ctx.Writer, define.SseMsgFormat, -3, "system", serialize.JSON.MarshalForStringIgnoreError(define.SseData{ + ID: op_string.Random(8, ""), + Object: "system", + Created: time.Now().Unix(), + Choices: []map[string]any{ + { + "err_msg": e.Message(), + "err_code": e.Code(), + "err_data": e.Data(), + }, + }, + EventType: "failure", + })); nil != err { + // 无法推送数据, 等价于结束, 如有必要, 让客户端发起重连 + logger.Instance.Error("SSE 业务处理完成, 发送业务处理失败消息出现异常", loggerPkg.NewLogData(util.GinCtxToContext(ctx), logger.RecordType, logger.CodeLogicSseClosedError, map[string]any{ + "err_msg": err.Error(), + }).ToFieldList()...) + return + } + } else { + // 正常终止 + if _, err = fmt.Fprintf(ctx.Writer, define.SseMsgFormat, -3, "system", serialize.JSON.MarshalForStringIgnoreError(define.SseData{ + ID: op_string.Random(8, ""), + Object: "system", + Created: time.Now().Unix(), + Choices: []map[string]any{}, + EventType: "success", + })); nil != err { + // 无法推送数据, 等价于结束, 如有必要, 让客户端发起重连 + logger.Instance.Error("SSE 业务处理完成, 发送业务处理成功消息出现异常", loggerPkg.NewLogData(util.GinCtxToContext(ctx), logger.RecordType, logger.CodeLogicSseClosedError, map[string]any{ + "err_msg": err.Error(), + }).ToFieldList()...) + return + } + } } }