feat: 增加sse支持

This commit is contained in:
2025-12-29 10:27:17 +08:00
parent 8511f311f8
commit f424813a08
3 changed files with 84 additions and 4 deletions

View File

@@ -139,3 +139,13 @@ type ResponseOption struct {
Extension map[string]any `json:"extension"` // 扩展数据
XmlName string `json:"xml_name"` // 以xml文件格式响应数据时, Xml文件名(根节点)
}
type SseData struct {
ID string `json:"id"` // 响应数据 ID
Object string `json:"object"` // 响应数据对象类型
Created int64 `json:"created"` // 创建时间, 秒级时间戳
Choices []map[string]any `json:"choices"` // choice 列表
EventType string `json:"event_type"` // 事件类型
}
const SseMsgFormat = "id:%v\nevent:%v\ndata:%v\n\n"

View File

@@ -51,4 +51,6 @@ const (
CodeLogicHook = "logic-hook"
CodeParamValidateFailure = "param-validate-failure"
CodeLogicErrorWrapper = "logic-error-wrapper"
CodeLogicSseInitError = "sse-init-error"
CodeLogicSseClosedError = "sse-closed-error"
)

View File

@@ -9,9 +9,11 @@ package router
import (
"errors"
"fmt"
"net/http"
"reflect"
"sync"
"time"
"git.zhangdeman.cn/zhangdeman/consts"
"git.zhangdeman.cn/zhangdeman/exception"
@@ -21,6 +23,8 @@ import (
"git.zhangdeman.cn/zhangdeman/gin/response"
"git.zhangdeman.cn/zhangdeman/gin/util"
loggerPkg "git.zhangdeman.cn/zhangdeman/logger"
"git.zhangdeman.cn/zhangdeman/serialize"
"git.zhangdeman.cn/zhangdeman/wrapper/op_string"
"github.com/gin-gonic/gin"
"github.com/mcuadros/go-defaults"
)
@@ -173,8 +177,38 @@ func (s *server) SseHandler(uriCfg UriConfig) gin.HandlerFunc {
ctx.Writer.Header().Set(consts.HeaderKeyConnection.String(), "keep-alive")
ctx.Writer.Header().Set(consts.HeaderKeyXAccelBuffering.String(), "no")
flusher, _ := ctx.Writer.(http.Flusher)
// TODO: 发送连接就绪消息
// 发送连接就绪消息
if _, err = fmt.Fprintf(ctx.Writer, define.SseMsgFormat, -1, "system", serialize.JSON.MarshalForStringIgnoreError(define.SseData{
ID: op_string.Random(8, ""),
Object: "system",
Created: time.Now().Unix(),
Choices: []map[string]any{},
EventType: "connected",
})); nil != err {
// 无法推送数据, 等价于结束, 如有必要, 让客户端发起重连
logger.Instance.Error("SSE 连接建立成功后, 发送链接成功消息出现异常", loggerPkg.NewLogData(util.GinCtxToContext(ctx), logger.RecordType, logger.CodeLogicSseInitError, map[string]any{
"err_msg": err.Error(),
}).ToFieldList()...)
return
}
flusher.Flush()
defer func() {
// 发送连接关闭消息
if _, err = fmt.Fprintf(ctx.Writer, define.SseMsgFormat, -3, "system", serialize.JSON.MarshalForStringIgnoreError(define.SseData{
ID: op_string.Random(8, ""),
Object: "system",
Created: time.Now().Unix(),
Choices: []map[string]any{},
EventType: "closed",
})); nil != err {
// 无法推送数据, 等价于结束, 如有必要, 让客户端发起重连
logger.Instance.Error("SSE 连接断开前, 发送链接断开消息出现异常", loggerPkg.NewLogData(util.GinCtxToContext(ctx), logger.RecordType, logger.CodeLogicSseClosedError, map[string]any{
"err_msg": err.Error(),
}).ToFieldList()...)
return
}
flusher.Flush()
}()
firstParam = reflect.ValueOf(ctx)
resList := uriCfg.ApiLogicFunc.Func.Call([]reflect.Value{uriCfg.ApiStructValue, firstParam, inputValue})
if resList[1].IsNil() {
@@ -196,8 +230,42 @@ func (s *server) SseHandler(uriCfg UriConfig) gin.HandlerFunc {
"err": resList[1].Interface(),
})
}
response.SendWithException(ctx, e, &define.ResponseOption{
ContentType: consts.MimeTypeJson,
})
if nil != e {
// 异常终止
if _, err = fmt.Fprintf(ctx.Writer, define.SseMsgFormat, -3, "system", serialize.JSON.MarshalForStringIgnoreError(define.SseData{
ID: op_string.Random(8, ""),
Object: "system",
Created: time.Now().Unix(),
Choices: []map[string]any{
{
"err_msg": e.Message(),
"err_code": e.Code(),
"err_data": e.Data(),
},
},
EventType: "failure",
})); nil != err {
// 无法推送数据, 等价于结束, 如有必要, 让客户端发起重连
logger.Instance.Error("SSE 业务处理完成, 发送业务处理失败消息出现异常", loggerPkg.NewLogData(util.GinCtxToContext(ctx), logger.RecordType, logger.CodeLogicSseClosedError, map[string]any{
"err_msg": err.Error(),
}).ToFieldList()...)
return
}
} else {
// 正常终止
if _, err = fmt.Fprintf(ctx.Writer, define.SseMsgFormat, -3, "system", serialize.JSON.MarshalForStringIgnoreError(define.SseData{
ID: op_string.Random(8, ""),
Object: "system",
Created: time.Now().Unix(),
Choices: []map[string]any{},
EventType: "success",
})); nil != err {
// 无法推送数据, 等价于结束, 如有必要, 让客户端发起重连
logger.Instance.Error("SSE 业务处理完成, 发送业务处理成功消息出现异常", loggerPkg.NewLogData(util.GinCtxToContext(ctx), logger.RecordType, logger.CodeLogicSseClosedError, map[string]any{
"err_msg": err.Error(),
}).ToFieldList()...)
return
}
}
}
}