diff --git a/abstract/ICommand.go b/abstract/ICommand.go index 5fe757e..eee04be 100644 --- a/abstract/ICommand.go +++ b/abstract/ICommand.go @@ -35,5 +35,5 @@ type ICommand interface { // Author : go_developer@163.com<张德满> // // Date : 7:21 下午 2021/3/27 - Execute(ctx *context.WSContext, data []byte) error + Execute(ctx *context.WSContext, data []byte) (interface{}, error) } diff --git a/construct.go b/construct.go index 458a67d..41f3327 100644 --- a/construct.go +++ b/construct.go @@ -35,10 +35,9 @@ import ( // // Date : 8:04 下午 2021/3/27 type Server struct { - ginRouter *gin.Engine // GIN引擎 - wsServer *melody.Melody // websocket引擎 - conf *config.WSServerConfig // 配置 - loggerInstance *zap.Logger // 日志实例 + ginRouter *gin.Engine // GIN引擎 + wsServer *melody.Melody // websocket引擎 + conf *config.WSServerConfig // 配置 } var ( @@ -48,6 +47,8 @@ var ( wsServerTable map[int]map[string]*Server // commandTable 指令表 commandTable map[string]map[string]abstract.ICommand + // 日志实例表 + loggerInstanceTable map[string]*zap.Logger // 服务启停的信号 sigChan = make(chan int, 0) ) @@ -82,6 +83,8 @@ func initServer(wsInstance abstract.IWebsocket) { if _, exist := wsServerTable[wsInstance.GetServerPort()]; !exist { wsServerTable[wsInstance.GetServerPort()] = make(map[string]*Server) } + // 初始化日志实例表 + loggerInstanceTable = make(map[string]*zap.Logger) if _, exist := wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()]; !exist { // 生成并存储 WS-Server wsSetConfigList := wsInstance.GetWSServerConfig() @@ -115,15 +118,18 @@ func initServer(wsInstance abstract.IWebsocket) { if logConf, err = logger.NewRotateLogConfig(s.conf.LogPath, s.conf.LogFile, logger.WithTimeIntervalType(s.conf.LogSplitInterval)); nil != err { panic(wsInstance.GetModuleFlag() + " 模块开启了日志记录,日志初始化失败, 失败原因 : " + err.Error()) } - if s.loggerInstance, err = logger.NewLogger(s.conf.LogLevel, logConf, optionList...); nil != err { + if loggerInstance, err := logger.NewLogger(s.conf.LogLevel, logConf, optionList...); nil != err { panic(wsInstance.GetModuleFlag() + " 模块开启了日志记录,日志初始化失败, 失败原因 : " + err.Error()) + } else { + loggerInstanceTable[wsInstance.GetModuleFlag()] = loggerInstance } } - if nil == s.loggerInstance && s.conf.LogConsole { - var err error + if nil == loggerInstanceTable[wsInstance.GetModuleFlag()] && s.conf.LogConsole { // 没有配置文件日志, 但是配置了控制台输出 - if s.loggerInstance, err = logger.NewConsoleLogger(s.conf.LogLevel); nil != err { + if loggerInstance, err := logger.NewConsoleLogger(s.conf.LogLevel); nil != err { panic(wsInstance.GetModuleFlag() + " 模块开启了控制台日志记录,日志初始化失败, 失败原因 : " + err.Error()) + } else { + loggerInstanceTable[wsInstance.GetModuleFlag()] = loggerInstance } } wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()] = s @@ -137,33 +143,43 @@ func initServer(wsInstance abstract.IWebsocket) { // 注册路由 for _, path := range wsInstance.HandshakeURL() { routerGroup.GET(path, func(ctx *gin.Context) { + wsCtx := context.NewContext(ctx, wsInstance.GetModuleFlag(), nil) parameter := map[string]interface{}{ - "ws_context": context.NewContext(ctx, wsInstance.GetModuleFlag(), nil), + "ws_context": wsCtx, + } + if err := wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleRequestWithKeys(ctx.Writer, ctx.Request, parameter); nil != err { + log( + loggerInstanceTable[wsInstance.GetModuleFlag()], + logFuncPanic, + "模块启动,注册路由,绑定数据失败", + getLoadDataList(wsCtx, zap.Error(err)), + ) } - _ = wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleRequestWithKeys(ctx.Writer, ctx.Request, parameter) }) } + currentWSServer := getWsServer(wsInstance.GetServerPort(), wsInstance.GetModuleFlag()) // 注册回调函数 - // 1. 建立连接的函数 - wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleConnect(func(session *melody.Session) { + // 1. 建立连接的函数注册回调函数 + // // + currentWSServer.wsServer.HandleConnect(func(session *melody.Session) { ctxInterface, _ := session.Get("ws_context") ctx := ctxInterface.(*context.WSContext) ctx.Session = session if err := wsInstance.Connect(ctx); nil == err { - if wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.StoreConnection && nil != wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager { - wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager.Store(ctx) + if currentWSServer.conf.StoreConnection && nil != currentWSServer.conf.ConnectionManager { + currentWSServer.conf.ConnectionManager.Store(ctx) } } }) // 2. 指令处理的函数 - wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleMessage(func(session *melody.Session, bytes []byte) { + currentWSServer.wsServer.HandleMessage(func(session *melody.Session, bytes []byte) { // TODO : 增加指令回调失败的callback ctxInterface, _ := session.Get("ws_context") ctx := ctxInterface.(*context.WSContext) _ = dispatchCommand(context.CloneContext(ctx), bytes) }) // 3, 关闭连接的处理函数 - wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleClose(func(session *melody.Session, i int, s string) error { + currentWSServer.wsServer.HandleClose(func(session *melody.Session, i int, s string) error { ctxInterface, _ := session.Get("ws_context") ctx := ctxInterface.(*context.WSContext) defer func() { @@ -175,31 +191,41 @@ func initServer(wsInstance abstract.IWebsocket) { return wsInstance.Close(ctx, i, s) }) // 4. 断开连接的处理函数 - wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleDisconnect(func(session *melody.Session) { + currentWSServer.wsServer.HandleDisconnect(func(session *melody.Session) { ctxInterface, _ := session.Get("ws_context") ctx := ctxInterface.(*context.WSContext) defer func() { - if nil == wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager { + if nil == currentWSServer.conf.ConnectionManager { return } - wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager.Del(ctx, "") + currentWSServer.conf.ConnectionManager.Del(ctx, "") }() wsInstance.Disconnect(ctx) }) // 注册指令 for _, cmd := range wsInstance.GetCommandList() { - if nil != wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].loggerInstance { - wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].loggerInstance.Debug( - "长连接指令注册成功", + log( + getLoggerInstance(wsInstance.GetModuleFlag(), nil), + logFuncInfo, + "长连接指令注册成功", + getLoadDataList(nil, zap.String("module", wsInstance.GetModuleFlag()), zap.String("command", cmd.GetCommand()), - ) - } + ), + ) commandTable[wsInstance.GetModuleFlag()][cmd.GetCommand()] = cmd } go func() { if err := ginRouterTable[wsInstance.GetServerPort()].Run(fmt.Sprintf(":%d", wsInstance.GetServerPort())); nil != err { - panic(err) + log( + getLoggerInstance(wsInstance.GetModuleFlag(), nil), + logFuncPanic, + "模块启动端口监听失败", + getLoadDataList(nil, + zap.String("module_flag", wsInstance.GetModuleFlag()), + zap.Error(err), + ), + ) } }() } @@ -211,34 +237,81 @@ func initServer(wsInstance abstract.IWebsocket) { // Date : 3:36 下午 2021/3/28 func dispatchCommand(ctx *context.WSContext, data []byte) error { if _, exist := commandTable[ctx.Flag]; !exist { + log( + getLoggerInstance(ctx.Flag, nil), + logFuncFatal, + "长连接模块不存在", + getLoadDataList(ctx), + ) return errors.WithStack(errors.New("未注册【" + ctx.Flag + "】长连接模块")) } - - cmd := gjson.Get(string(data), "command").String() var ( exist bool cmdInstance abstract.ICommand cmdConfig *config.CommandConfig err error + result interface{} + optionList []config.SetCommandConfig ) + cmd := gjson.Get(string(data), "command").String() + if cmdInstance, exist = commandTable[ctx.Flag][cmd]; !exist { + log( + getLoggerInstance(ctx.Flag, nil), + logFuncFatal, + "指令未注册", + getLoadDataList(ctx), + ) return errors.WithStack(errors.New("【" + ctx.Flag + "】长连接模块未注册【" + cmd + "】指令")) } - optionList := cmdInstance.GetConfigOption() - if nil == optionList { + + if optionList = cmdInstance.GetConfigOption(); nil == optionList { optionList = make([]config.SetCommandConfig, 0) } cmdConfig = config.NewCommandConfig(optionList...) - if err = cmdInstance.Execute(ctx, data); nil != err { + log( + getLoggerInstance(ctx.Flag, &cmdConfig.LogUpData), + logFuncInfo, + "上行原始数据记录", + getLoadDataList(ctx, zap.String("up_data", string(data))), + ) + + if result, err = cmdInstance.Execute(ctx, data); nil != err { if cmdConfig.PushMessageWithError { - _ = message.Response(ctx, map[string]interface{}{ + if err := message.Response(ctx, map[string]interface{}{ "command": cmd, "message": err.Error(), "success": false, - }) + }); nil != err { + log( + getLoggerInstance(ctx.Flag, nil), + logFuncWarn, + "指令执行失败", + getLoadDataList(ctx, zap.Error(err)), + ) + } } return err } + + if cmdConfig.ResponseData { + responseData := buildResponseData(ctx, cmd, result) + if err := ctx.Session.Write(responseData); nil != err { + log( + getLoggerInstance(ctx.Flag, nil), + logFuncWarn, + "指令响应结果失败", + getLoadDataList(ctx, zap.Error(err), zap.String("expect_response", string(responseData))), + ) + return err + } + log( + getLoggerInstance(ctx.Flag, &cmdConfig.LogDownData), + logFuncInfo, + "指令响应结果记录", + getLoadDataList(ctx, zap.String("down_data", string(responseData))), + ) + } return nil } diff --git a/define.go b/define.go new file mode 100644 index 0000000..c68b274 --- /dev/null +++ b/define.go @@ -0,0 +1,113 @@ +// Package websocket ... +// +// Description : 数据结构定义 +// +// Author : go_developer@163.com<张德满> +// +// Date : 2021-04-18 7:53 下午 +package websocket + +import ( + "encoding/json" + "time" + + "github.com/go-developer/gopkg/util" + "github.com/go-developer/websocket/context" + "go.uber.org/zap" +) + +// buildResponseData 构建响应数据 +// +// Author : go_developer@163.com<张德满> +// +// Date : 8:23 下午 2021/4/18 +func buildResponseData(wsCtx *context.WSContext, cmd string, data interface{}) []byte { + r := map[string]interface{}{ + "connection_id": wsCtx.ConnectionID, + "trace_id": wsCtx.TraceID, + "command": cmd, + "data": data, + } + byteData, _ := json.Marshal(r) + return byteData +} + +// getLoadDataList 获取需记录的数据列表 +// +// Author : go_developer@163.com<张德满> +// +// Date : 8:00 下午 2021/4/18 +func getLoadDataList(wsContext *context.WSContext, fieldList ...zap.Field) []zap.Field { + if nil == wsContext { + wsContext = &context.WSContext{} + } + list := []zap.Field{ + zap.String("connection_id", wsContext.ConnectionID), + zap.String("trace_id", wsContext.TraceID), + zap.String("module_flag", wsContext.Flag), + zap.String("server_ip", util.GetHostIP()), + zap.Int64("timestamp", time.Now().UnixNano()), + } + list = append(list, fieldList...) + return list +} + +const ( + logFuncDebug = "debug" + logFuncInfo = "info" + logFuncWarn = "warn" + logFuncError = "error" + logFuncDPanic = "dpanic" + logFuncPanic = "panic" + logFuncFatal = "fatal" +) + +// getWsServer 获取WS-Server实例 +// +// Author : go_developer@163.com<张德满> +// +// Date : 8:54 下午 2021/4/18 +func getWsServer(port int, flag string) *Server { + return wsServerTable[port][flag] +} + +// getLoggerInstance 获取日志实例 +// +// Author : go_developer@163.com<张德满> +// +// Date : 8:48 下午 2021/4/18 +func getLoggerInstance(moduleFlag string, cmdAllow *bool) *zap.Logger { + if nil != cmdAllow && !*cmdAllow { + return nil + } + return loggerInstanceTable[moduleFlag] +} + +// log 记录日志 +// +// Author : go_developer@163.com<张德满> +// +// Date : 8:29 下午 2021/4/18 +func log(loggerInstance *zap.Logger, f string, message string, fieldList []zap.Field) { + if nil == loggerInstance { + return + } + switch f { + case "info": + loggerInstance.Info(message, fieldList...) + case "warn": + loggerInstance.Warn(message, fieldList...) + case "error": + loggerInstance.Error(message, fieldList...) + case "dpanic": + loggerInstance.DPanic(message, fieldList...) + case "panic": + loggerInstance.Panic(message, fieldList...) + case "fatal": + loggerInstance.Fatal(message, fieldList...) + case "debug": + fallthrough + default: + loggerInstance.Debug(message, fieldList...) + } +} diff --git a/example/server.go b/example/server.go index 2972fc8..7bcca54 100644 --- a/example/server.go +++ b/example/server.go @@ -79,7 +79,6 @@ func (e exampleCommand) GetConfigOption() []config.SetCommandConfig { return []config.SetCommandConfig{config.ClosePushCommandErrorMessage()} } -func (e exampleCommand) Execute(ctx *context.WSContext, data []byte) error { - message.Response(ctx, map[string]interface{}{"ping": "pong"}) - return nil +func (e exampleCommand) Execute(ctx *context.WSContext, data []byte) (interface{}, error) { + return map[string]interface{}{"ping": "pong"}, nil }