// Package websocket... // // Description : websocket... // // Author : go_developer@163.com<张德满> // // Date : 2021-03-27 6:49 下午 package websocket import ( "fmt" "github.com/go-developer/gopkg/logger" "go.uber.org/zap" "github.com/go-developer/websocket/message" "github.com/go-developer/websocket/config" "github.com/tidwall/gjson" "github.com/go-developer/websocket/context" "gopkg.in/olahol/melody.v1" "github.com/gin-gonic/gin" "github.com/go-developer/websocket/abstract" "github.com/pkg/errors" ) // Server ... // // Author : go_developer@163.com<张德满> // // Date : 8:04 下午 2021/3/27 type Server struct { ginRouter *gin.Engine // GIN引擎 wsServer *melody.Melody // websocket引擎 conf *config.WSServerConfig // 配置 } var ( // ginRouterTable 表 ginRouterTable map[int]*gin.Engine // wsServerTable 表 wsServerTable map[int]map[string]*Server // commandTable 指令表 commandTable map[string]map[string]abstract.ICommand // 日志实例表 loggerInstanceTable map[string]*zap.Logger // 服务启停的信号 sigChan = make(chan int, 0) ) // NewWebsocketServe 启动websocket服务 // // Author : go_developer@163.com<张德满> // // Date : 6:49 下午 2021/3/27 func NewWebsocketServe(wsInstanceList ...abstract.IWebsocket) error { if len(wsInstanceList) == 0 { return errors.WithStack(errors.New("register websocket server list is empty")) } ginRouterTable = make(map[int]*gin.Engine) wsServerTable = make(map[int]map[string]*Server) commandTable = make(map[string]map[string]abstract.ICommand) for _, wsInstance := range wsInstanceList { initServer(wsInstance) } run() return nil } // 初始化server func initServer(wsInstance abstract.IWebsocket) { // 初始化ws server // 初始化 gin 路由表 if _, exist := ginRouterTable[wsInstance.GetServerPort()]; !exist { ginRouterTable[wsInstance.GetServerPort()] = gin.Default() } // 初始化WS-Server表 if _, exist := wsServerTable[wsInstance.GetServerPort()]; !exist { wsServerTable[wsInstance.GetServerPort()] = make(map[string]*Server) } // 初始化日志实例表 loggerInstanceTable = make(map[string]*zap.Logger) if _, exist := wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()]; !exist { // 生成并存储 WS-Server wsSetConfigList := wsInstance.GetWSServerConfig() if nil == wsSetConfigList { wsSetConfigList = make([]config.SetWSServerConfig, 0) } s := &Server{ ginRouter: gin.Default(), wsServer: melody.New(), conf: config.NewWSServerConfig(wsSetConfigList...), } if s.conf.LogEnable { // 开启了日志,初始化日志 if len(s.conf.LogPath) == 0 { panic(wsInstance.GetModuleFlag() + " 模块开启了日志记录,但是没有配置日志路径") } var ( err error logConf *logger.RotateLogConfig ) optionList := make([]logger.SetLoggerOptionFunc, 0) if s.conf.LogConsole { optionList = append(optionList, logger.WithConsoleOutput()) } if s.conf.LogConsole { optionList = append(optionList, logger.WithConsoleOutput()) } if len(s.conf.LogFile) == 0 { s.conf.LogFile = wsInstance.GetModuleFlag() + ".log" } if logConf, err = logger.NewRotateLogConfig(s.conf.LogPath, s.conf.LogFile, logger.WithTimeIntervalType(s.conf.LogSplitInterval)); nil != err { panic(wsInstance.GetModuleFlag() + " 模块开启了日志记录,日志初始化失败, 失败原因 : " + err.Error()) } if loggerInstance, err := logger.NewLogger(s.conf.LogLevel, logConf, optionList...); nil != err { panic(wsInstance.GetModuleFlag() + " 模块开启了日志记录,日志初始化失败, 失败原因 : " + err.Error()) } else { loggerInstanceTable[wsInstance.GetModuleFlag()] = loggerInstance } } if nil == loggerInstanceTable[wsInstance.GetModuleFlag()] && s.conf.LogConsole { // 没有配置文件日志, 但是配置了控制台输出 if loggerInstance, err := logger.NewConsoleLogger(s.conf.LogLevel); nil != err { panic(wsInstance.GetModuleFlag() + " 模块开启了控制台日志记录,日志初始化失败, 失败原因 : " + err.Error()) } else { loggerInstanceTable[wsInstance.GetModuleFlag()] = loggerInstance } } wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()] = s } // 初始化指令存储表 if _, exist := commandTable[wsInstance.GetModuleFlag()]; !exist { commandTable[wsInstance.GetModuleFlag()] = make(map[string]abstract.ICommand) } routerGroup := ginRouterTable[wsInstance.GetServerPort()].Group(wsInstance.GetModuleFlag()) // 注册路由 for _, path := range wsInstance.HandshakeURL() { routerGroup.GET(path, func(ctx *gin.Context) { wsCtx := context.NewContext(ctx, wsInstance.GetModuleFlag(), nil) parameter := map[string]interface{}{ "ws_context": wsCtx, } if err := wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleRequestWithKeys(ctx.Writer, ctx.Request, parameter); nil != err { log( loggerInstanceTable[wsInstance.GetModuleFlag()], logFuncPanic, "模块启动,注册路由,绑定数据失败", getLoadDataList(wsCtx, zap.Error(err)), ) } }) } currentWSServer := getWsServer(wsInstance.GetServerPort(), wsInstance.GetModuleFlag()) // 注册回调函数 // 1. 建立连接的函数注册回调函数 // // currentWSServer.wsServer.HandleConnect(func(session *melody.Session) { ctxInterface, _ := session.Get("ws_context") ctx := ctxInterface.(*context.WSContext) ctx.Session = session if err := wsInstance.Connect(ctx); nil == err { if currentWSServer.conf.StoreConnection && nil != currentWSServer.conf.ConnectionManager { currentWSServer.conf.ConnectionManager.Store(ctx) } } }) // 2. 指令处理的函数 currentWSServer.wsServer.HandleMessage(func(session *melody.Session, bytes []byte) { // TODO : 增加指令回调失败的callback ctxInterface, _ := session.Get("ws_context") ctx := ctxInterface.(*context.WSContext) _ = dispatchCommand(context.CloneContext(ctx), bytes) }) // 3, 关闭连接的处理函数 currentWSServer.wsServer.HandleClose(func(session *melody.Session, i int, s string) error { ctxInterface, _ := session.Get("ws_context") ctx := ctxInterface.(*context.WSContext) defer func() { if !wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.StoreConnection || nil == wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager { return } wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager.Del(ctx, "") }() return wsInstance.Close(ctx, i, s) }) // 4. 断开连接的处理函数 currentWSServer.wsServer.HandleDisconnect(func(session *melody.Session) { ctxInterface, _ := session.Get("ws_context") ctx := ctxInterface.(*context.WSContext) defer func() { if nil == currentWSServer.conf.ConnectionManager { return } currentWSServer.conf.ConnectionManager.Del(ctx, "") }() wsInstance.Disconnect(ctx) }) // 注册指令 for _, cmd := range wsInstance.GetCommandList() { log( getLoggerInstance(wsInstance.GetModuleFlag(), nil), logFuncInfo, "长连接指令注册成功", getLoadDataList(nil, zap.String("module", wsInstance.GetModuleFlag()), zap.String("command", cmd.GetCommand()), ), ) commandTable[wsInstance.GetModuleFlag()][cmd.GetCommand()] = cmd } go func() { if err := ginRouterTable[wsInstance.GetServerPort()].Run(fmt.Sprintf(":%d", wsInstance.GetServerPort())); nil != err { log( getLoggerInstance(wsInstance.GetModuleFlag(), nil), logFuncPanic, "模块启动端口监听失败", getLoadDataList(nil, zap.String("module_flag", wsInstance.GetModuleFlag()), zap.Error(err), ), ) } }() } // dispatchCommand 调度command ... // // Author : go_developer@163.com<张德满> // // Date : 3:36 下午 2021/3/28 func dispatchCommand(ctx *context.WSContext, data []byte) error { if _, exist := commandTable[ctx.Flag]; !exist { log( getLoggerInstance(ctx.Flag, nil), logFuncFatal, "长连接模块不存在", getLoadDataList(ctx), ) return errors.WithStack(errors.New("未注册【" + ctx.Flag + "】长连接模块")) } var ( exist bool cmdInstance abstract.ICommand cmdConfig *config.CommandConfig err error result interface{} optionList []config.SetCommandConfig ) cmd := gjson.Get(string(data), "command").String() if cmdInstance, exist = commandTable[ctx.Flag][cmd]; !exist { log( getLoggerInstance(ctx.Flag, nil), logFuncFatal, "指令未注册", getLoadDataList(ctx), ) return errors.WithStack(errors.New("【" + ctx.Flag + "】长连接模块未注册【" + cmd + "】指令")) } if optionList = cmdInstance.GetConfigOption(); nil == optionList { optionList = make([]config.SetCommandConfig, 0) } cmdConfig = config.NewCommandConfig(optionList...) log( getLoggerInstance(ctx.Flag, &cmdConfig.LogUpData), logFuncInfo, "上行原始数据记录", getLoadDataList(ctx, zap.String("up_data", string(data))), ) if result, err = cmdInstance.Execute(ctx, data); nil != err { if cmdConfig.PushMessageWithError { if err := message.Response(ctx, map[string]interface{}{ "command": cmd, "message": err.Error(), "success": false, }); nil != err { log( getLoggerInstance(ctx.Flag, nil), logFuncWarn, "指令执行失败", getLoadDataList(ctx, zap.Error(err)), ) } } return err } if cmdConfig.ResponseData { responseData := buildResponseData(ctx, cmd, result) if err := ctx.Session.Write(responseData); nil != err { log( getLoggerInstance(ctx.Flag, nil), logFuncWarn, "指令响应结果失败", getLoadDataList(ctx, zap.Error(err), zap.String("expect_response", string(responseData))), ) return err } log( getLoggerInstance(ctx.Flag, &cmdConfig.LogDownData), logFuncInfo, "指令响应结果记录", getLoadDataList(ctx, zap.String("down_data", string(responseData))), ) } return nil } func run() { <-sigChan // TODO : 增加后置hook } // Stop 停止服务 // // Author : go_developer@163.com<张德满> // // Date : 3:55 下午 2021/3/28 func Stop() { sigChan <- 1 }