From 4177c0193960bd244fc887bb9b5f5bc02cd0a834 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=BE=B7=E6=BB=A1?= Date: Sun, 18 Apr 2021 00:43:19 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8D=87=E7=BA=A7server=E9=80=BB=E8=BE=91,=20?= =?UTF-8?q?=E5=A4=9A=E6=A8=A1=E5=9D=97=E7=9B=91=E5=90=AC=E5=90=8C=E4=B8=80?= =?UTF-8?q?=E4=B8=AA=E7=AB=AF=E5=8F=A3,=E9=99=A4=E4=BA=86=E5=85=B1?= =?UTF-8?q?=E4=BA=ABGIN=E5=AE=9E=E4=BE=8B=E5=A4=96,=E5=85=B6=E4=BB=96?= =?UTF-8?q?=E5=9D=87=E6=8C=89=E7=85=A7=E6=A8=A1=E5=9D=97=E8=BF=9B=E8=A1=8C?= =?UTF-8?q?=E9=9A=94=E7=A6=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- construct.go | 271 ++++++++++++++++++++++++++------------------------- 1 file changed, 139 insertions(+), 132 deletions(-) diff --git a/construct.go b/construct.go index 498da0b..3d927e7 100644 --- a/construct.go +++ b/construct.go @@ -9,7 +9,6 @@ package websocket import ( "fmt" - "sync" "github.com/go-developer/gopkg/logger" @@ -25,8 +24,6 @@ import ( "gopkg.in/olahol/melody.v1" - "github.com/go-developer/gopkg/easylock" - "github.com/gin-gonic/gin" "github.com/go-developer/websocket/abstract" "github.com/pkg/errors" @@ -46,7 +43,9 @@ type Server struct { var ( // ginRouterTable 表 - ginRouterTable map[int]*Server + ginRouterTable map[int]*gin.Engine + // wsServerTable 表 + wsServerTable map[int]map[string]*Server // commandTable 指令表 commandTable map[string]map[string]abstract.ICommand // 服务启停的信号 @@ -62,141 +61,149 @@ func NewWebsocketServe(wsInstanceList ...abstract.IWebsocket) error { if len(wsInstanceList) == 0 { return errors.WithStack(errors.New("register websocket server list is empty")) } - lock := easylock.NewLock() - ginRouterTable = make(map[int]*Server) + ginRouterTable = make(map[int]*gin.Engine) + wsServerTable = make(map[int]map[string]*Server) commandTable = make(map[string]map[string]abstract.ICommand) - wg := &sync.WaitGroup{} - wg.Add(len(wsInstanceList)) for _, wsInstance := range wsInstanceList { - go func(wsInstance abstract.IWebsocket) { - defer wg.Done() - // 初始化ws server - _ = lock.Lock() - if _, exist := ginRouterTable[wsInstance.GetServerPort()]; !exist { - wsSetConfigList := wsInstance.GetWSServerConfig() - if nil == wsSetConfigList { - wsSetConfigList = make([]config.SetWSServerConfig, 0) - } - s := &Server{ - ginRouter: gin.Default(), - wsServer: melody.New(), - conf: config.NewWSServerConfig(wsSetConfigList...), - } - if s.conf.LogEnable { - // 开启了日志,初始化日志 - if len(s.conf.LogPath) == 0 { - panic(wsInstance.GetModuleFlag() + " 模块开启了日志记录,但是没有配置日志路径") - } - var ( - err error - logConf *logger.RotateLogConfig - ) - optionList := make([]logger.SetLoggerOptionFunc, 0) - if s.conf.LogConsole { - optionList = append(optionList, logger.WithConsoleOutput()) - } - if s.conf.LogConsole { - optionList = append(optionList, logger.WithConsoleOutput()) - } - if len(s.conf.LogFile) == 0 { - s.conf.LogFile = wsInstance.GetModuleFlag() + ".log" - } - if logConf, err = logger.NewRotateLogConfig(s.conf.LogPath, s.conf.LogFile, logger.WithTimeIntervalType(s.conf.LogSplitInterval)); nil != err { - panic(wsInstance.GetModuleFlag() + " 模块开启了日志记录,日志初始化失败, 失败原因 : " + err.Error()) - } - if s.loggerInstance, err = logger.NewLogger(s.conf.LogLevel, logConf, optionList...); nil != err { - panic(wsInstance.GetModuleFlag() + " 模块开启了日志记录,日志初始化失败, 失败原因 : " + err.Error()) - } - } - if nil == s.loggerInstance && s.conf.LogConsole { - var err error - // 没有配置文件日志, 但是配置了控制台输出 - if s.loggerInstance, err = logger.NewConsoleLogger(s.conf.LogLevel); nil != err { - panic(wsInstance.GetModuleFlag() + " 模块开启了控制台日志记录,日志初始化失败, 失败原因 : " + err.Error()) - } - } - ginRouterTable[wsInstance.GetServerPort()] = s - } - if _, exist := commandTable[wsInstance.GetModuleFlag()]; !exist { - commandTable[wsInstance.GetModuleFlag()] = make(map[string]abstract.ICommand) - } - _ = lock.Unlock() - routerGroup := ginRouterTable[wsInstance.GetServerPort()].ginRouter.Group(wsInstance.GetModuleFlag()) - // 注册路由 - for _, path := range wsInstance.HandshakeURL() { - routerGroup.GET(path, func(ctx *gin.Context) { - parameter := map[string]interface{}{ - "ws_context": context.NewContext(ctx, wsInstance.GetModuleFlag(), nil), - } - _ = ginRouterTable[wsInstance.GetServerPort()].wsServer.HandleRequestWithKeys(ctx.Writer, ctx.Request, parameter) - }) - } - // 注册回调函数 - // 1. 建立连接的函数 - ginRouterTable[wsInstance.GetServerPort()].wsServer.HandleConnect(func(session *melody.Session) { - ctxInterface, _ := session.Get("ws_context") - ctx := ctxInterface.(*context.WSContext) - ctx.Session = session - if err := wsInstance.Connect(ctx); nil == err { - if ginRouterTable[wsInstance.GetServerPort()].conf.StoreConnection && nil != ginRouterTable[wsInstance.GetServerPort()].conf.ConnectionManager { - ginRouterTable[wsInstance.GetServerPort()].conf.ConnectionManager.Store(ctx) - } - } - }) - // 2. 指令处理的函数 - ginRouterTable[wsInstance.GetServerPort()].wsServer.HandleMessage(func(session *melody.Session, bytes []byte) { - // TODO : 增加指令回调失败的callback - ctxInterface, _ := session.Get("ws_context") - ctx := ctxInterface.(*context.WSContext) - _ = dispatchCommand(ctx, bytes) - }) - // 3, 关闭连接的处理函数 - ginRouterTable[wsInstance.GetServerPort()].wsServer.HandleClose(func(session *melody.Session, i int, s string) error { - ctxInterface, _ := session.Get("ws_context") - ctx := ctxInterface.(*context.WSContext) - defer func() { - if !ginRouterTable[wsInstance.GetServerPort()].conf.StoreConnection || nil == ginRouterTable[wsInstance.GetServerPort()].conf.ConnectionManager { - return - } - ginRouterTable[wsInstance.GetServerPort()].conf.ConnectionManager.Del(ctx) - }() - return wsInstance.Close(ctx, i, s) - }) - // 4. 断开连接的处理函数 - ginRouterTable[wsInstance.GetServerPort()].wsServer.HandleDisconnect(func(session *melody.Session) { - ctxInterface, _ := session.Get("ws_context") - ctx := ctxInterface.(*context.WSContext) - defer func() { - if nil == ginRouterTable[wsInstance.GetServerPort()].conf.ConnectionManager { - return - } - ginRouterTable[wsInstance.GetServerPort()].conf.ConnectionManager.Del(ctx) - }() - wsInstance.Disconnect(ctx) - }) - // 注册指令 - for _, cmd := range wsInstance.GetCommandList() { - if nil != ginRouterTable[wsInstance.GetServerPort()].loggerInstance { - ginRouterTable[wsInstance.GetServerPort()].loggerInstance.Debug( - "长连接指令注册成功", - zap.String("module", wsInstance.GetModuleFlag()), - zap.String("command", cmd.GetCommand()), - ) - } - commandTable[wsInstance.GetModuleFlag()][cmd.GetCommand()] = cmd - } - go func() { - if err := ginRouterTable[wsInstance.GetServerPort()].ginRouter.Run(fmt.Sprintf(":%d", wsInstance.GetServerPort())); nil != err { - panic(err) - } - }() - }(wsInstance) + initServer(wsInstance) } - wg.Wait() run() return nil } +// 初始化server +func initServer(wsInstance abstract.IWebsocket) { + // 初始化ws server + // 初始化 gin 路由表 + if _, exist := ginRouterTable[wsInstance.GetServerPort()]; !exist { + ginRouterTable[wsInstance.GetServerPort()] = gin.Default() + } + // 初始化WS-Server表 + if _, exist := wsServerTable[wsInstance.GetServerPort()]; !exist { + wsServerTable[wsInstance.GetServerPort()] = make(map[string]*Server) + } + if _, exist := wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()]; !exist { + // 生成并存储 WS-Server + wsSetConfigList := wsInstance.GetWSServerConfig() + if nil == wsSetConfigList { + wsSetConfigList = make([]config.SetWSServerConfig, 0) + } + s := &Server{ + ginRouter: gin.Default(), + wsServer: melody.New(), + conf: config.NewWSServerConfig(wsSetConfigList...), + } + if s.conf.LogEnable { + // 开启了日志,初始化日志 + if len(s.conf.LogPath) == 0 { + panic(wsInstance.GetModuleFlag() + " 模块开启了日志记录,但是没有配置日志路径") + } + var ( + err error + logConf *logger.RotateLogConfig + ) + optionList := make([]logger.SetLoggerOptionFunc, 0) + if s.conf.LogConsole { + optionList = append(optionList, logger.WithConsoleOutput()) + } + if s.conf.LogConsole { + optionList = append(optionList, logger.WithConsoleOutput()) + } + if len(s.conf.LogFile) == 0 { + s.conf.LogFile = wsInstance.GetModuleFlag() + ".log" + } + if logConf, err = logger.NewRotateLogConfig(s.conf.LogPath, s.conf.LogFile, logger.WithTimeIntervalType(s.conf.LogSplitInterval)); nil != err { + panic(wsInstance.GetModuleFlag() + " 模块开启了日志记录,日志初始化失败, 失败原因 : " + err.Error()) + } + if s.loggerInstance, err = logger.NewLogger(s.conf.LogLevel, logConf, optionList...); nil != err { + panic(wsInstance.GetModuleFlag() + " 模块开启了日志记录,日志初始化失败, 失败原因 : " + err.Error()) + } + } + if nil == s.loggerInstance && s.conf.LogConsole { + var err error + // 没有配置文件日志, 但是配置了控制台输出 + if s.loggerInstance, err = logger.NewConsoleLogger(s.conf.LogLevel); nil != err { + panic(wsInstance.GetModuleFlag() + " 模块开启了控制台日志记录,日志初始化失败, 失败原因 : " + err.Error()) + } + } + wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()] = s + } + + // 初始化指令存储表 + if _, exist := commandTable[wsInstance.GetModuleFlag()]; !exist { + commandTable[wsInstance.GetModuleFlag()] = make(map[string]abstract.ICommand) + } + routerGroup := ginRouterTable[wsInstance.GetServerPort()].Group(wsInstance.GetModuleFlag()) + // 注册路由 + for _, path := range wsInstance.HandshakeURL() { + routerGroup.GET(path, func(ctx *gin.Context) { + parameter := map[string]interface{}{ + "ws_context": context.NewContext(ctx, wsInstance.GetModuleFlag(), nil), + } + _ = wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleRequestWithKeys(ctx.Writer, ctx.Request, parameter) + }) + } + // 注册回调函数 + // 1. 建立连接的函数 + wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleConnect(func(session *melody.Session) { + ctxInterface, _ := session.Get("ws_context") + ctx := ctxInterface.(*context.WSContext) + ctx.Session = session + if err := wsInstance.Connect(ctx); nil == err { + if wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.StoreConnection && nil != wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager { + wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager.Store(ctx) + } + } + }) + // 2. 指令处理的函数 + wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleMessage(func(session *melody.Session, bytes []byte) { + // TODO : 增加指令回调失败的callback + ctxInterface, _ := session.Get("ws_context") + ctx := ctxInterface.(*context.WSContext) + _ = dispatchCommand(ctx, bytes) + }) + // 3, 关闭连接的处理函数 + wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleClose(func(session *melody.Session, i int, s string) error { + ctxInterface, _ := session.Get("ws_context") + ctx := ctxInterface.(*context.WSContext) + defer func() { + if !wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.StoreConnection || nil == wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager { + return + } + wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager.Del(ctx, "") + }() + return wsInstance.Close(ctx, i, s) + }) + // 4. 断开连接的处理函数 + wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleDisconnect(func(session *melody.Session) { + ctxInterface, _ := session.Get("ws_context") + ctx := ctxInterface.(*context.WSContext) + defer func() { + if nil == wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager { + return + } + wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager.Del(ctx, "") + }() + wsInstance.Disconnect(ctx) + }) + // 注册指令 + for _, cmd := range wsInstance.GetCommandList() { + if nil != wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].loggerInstance { + wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].loggerInstance.Debug( + "长连接指令注册成功", + zap.String("module", wsInstance.GetModuleFlag()), + zap.String("command", cmd.GetCommand()), + ) + } + commandTable[wsInstance.GetModuleFlag()][cmd.GetCommand()] = cmd + } + go func() { + if err := ginRouterTable[wsInstance.GetServerPort()].Run(fmt.Sprintf(":%d", wsInstance.GetServerPort())); nil != err { + panic(err) + } + }() +} + // dispatchCommand 调度command ... // // Author : go_developer@163.com<张德满>