升级server逻辑, 多模块监听同一个端口,除了共享GIN实例外,其他均按照模块进行隔离

This commit is contained in:
白茶清欢 2021-04-18 00:43:19 +08:00
parent 63be630c68
commit 4177c01939

View File

@ -9,7 +9,6 @@ package websocket
import (
"fmt"
"sync"
"github.com/go-developer/gopkg/logger"
@ -25,8 +24,6 @@ import (
"gopkg.in/olahol/melody.v1"
"github.com/go-developer/gopkg/easylock"
"github.com/gin-gonic/gin"
"github.com/go-developer/websocket/abstract"
"github.com/pkg/errors"
@ -46,7 +43,9 @@ type Server struct {
var (
// ginRouterTable 表
ginRouterTable map[int]*Server
ginRouterTable map[int]*gin.Engine
// wsServerTable 表
wsServerTable map[int]map[string]*Server
// commandTable 指令表
commandTable map[string]map[string]abstract.ICommand
// 服务启停的信号
@ -62,17 +61,29 @@ func NewWebsocketServe(wsInstanceList ...abstract.IWebsocket) error {
if len(wsInstanceList) == 0 {
return errors.WithStack(errors.New("register websocket server list is empty"))
}
lock := easylock.NewLock()
ginRouterTable = make(map[int]*Server)
ginRouterTable = make(map[int]*gin.Engine)
wsServerTable = make(map[int]map[string]*Server)
commandTable = make(map[string]map[string]abstract.ICommand)
wg := &sync.WaitGroup{}
wg.Add(len(wsInstanceList))
for _, wsInstance := range wsInstanceList {
go func(wsInstance abstract.IWebsocket) {
defer wg.Done()
initServer(wsInstance)
}
run()
return nil
}
// 初始化server
func initServer(wsInstance abstract.IWebsocket) {
// 初始化ws server
_ = lock.Lock()
// 初始化 gin 路由表
if _, exist := ginRouterTable[wsInstance.GetServerPort()]; !exist {
ginRouterTable[wsInstance.GetServerPort()] = gin.Default()
}
// 初始化WS-Server表
if _, exist := wsServerTable[wsInstance.GetServerPort()]; !exist {
wsServerTable[wsInstance.GetServerPort()] = make(map[string]*Server)
}
if _, exist := wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()]; !exist {
// 生成并存储 WS-Server
wsSetConfigList := wsInstance.GetWSServerConfig()
if nil == wsSetConfigList {
wsSetConfigList = make([]config.SetWSServerConfig, 0)
@ -115,69 +126,70 @@ func NewWebsocketServe(wsInstanceList ...abstract.IWebsocket) error {
panic(wsInstance.GetModuleFlag() + " 模块开启了控制台日志记录,日志初始化失败, 失败原因 : " + err.Error())
}
}
ginRouterTable[wsInstance.GetServerPort()] = s
wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()] = s
}
// 初始化指令存储表
if _, exist := commandTable[wsInstance.GetModuleFlag()]; !exist {
commandTable[wsInstance.GetModuleFlag()] = make(map[string]abstract.ICommand)
}
_ = lock.Unlock()
routerGroup := ginRouterTable[wsInstance.GetServerPort()].ginRouter.Group(wsInstance.GetModuleFlag())
routerGroup := ginRouterTable[wsInstance.GetServerPort()].Group(wsInstance.GetModuleFlag())
// 注册路由
for _, path := range wsInstance.HandshakeURL() {
routerGroup.GET(path, func(ctx *gin.Context) {
parameter := map[string]interface{}{
"ws_context": context.NewContext(ctx, wsInstance.GetModuleFlag(), nil),
}
_ = ginRouterTable[wsInstance.GetServerPort()].wsServer.HandleRequestWithKeys(ctx.Writer, ctx.Request, parameter)
_ = wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleRequestWithKeys(ctx.Writer, ctx.Request, parameter)
})
}
// 注册回调函数
// 1. 建立连接的函数
ginRouterTable[wsInstance.GetServerPort()].wsServer.HandleConnect(func(session *melody.Session) {
wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleConnect(func(session *melody.Session) {
ctxInterface, _ := session.Get("ws_context")
ctx := ctxInterface.(*context.WSContext)
ctx.Session = session
if err := wsInstance.Connect(ctx); nil == err {
if ginRouterTable[wsInstance.GetServerPort()].conf.StoreConnection && nil != ginRouterTable[wsInstance.GetServerPort()].conf.ConnectionManager {
ginRouterTable[wsInstance.GetServerPort()].conf.ConnectionManager.Store(ctx)
if wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.StoreConnection && nil != wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager {
wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager.Store(ctx)
}
}
})
// 2. 指令处理的函数
ginRouterTable[wsInstance.GetServerPort()].wsServer.HandleMessage(func(session *melody.Session, bytes []byte) {
wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleMessage(func(session *melody.Session, bytes []byte) {
// TODO : 增加指令回调失败的callback
ctxInterface, _ := session.Get("ws_context")
ctx := ctxInterface.(*context.WSContext)
_ = dispatchCommand(ctx, bytes)
})
// 3, 关闭连接的处理函数
ginRouterTable[wsInstance.GetServerPort()].wsServer.HandleClose(func(session *melody.Session, i int, s string) error {
wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleClose(func(session *melody.Session, i int, s string) error {
ctxInterface, _ := session.Get("ws_context")
ctx := ctxInterface.(*context.WSContext)
defer func() {
if !ginRouterTable[wsInstance.GetServerPort()].conf.StoreConnection || nil == ginRouterTable[wsInstance.GetServerPort()].conf.ConnectionManager {
if !wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.StoreConnection || nil == wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager {
return
}
ginRouterTable[wsInstance.GetServerPort()].conf.ConnectionManager.Del(ctx)
wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager.Del(ctx, "")
}()
return wsInstance.Close(ctx, i, s)
})
// 4. 断开连接的处理函数
ginRouterTable[wsInstance.GetServerPort()].wsServer.HandleDisconnect(func(session *melody.Session) {
wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleDisconnect(func(session *melody.Session) {
ctxInterface, _ := session.Get("ws_context")
ctx := ctxInterface.(*context.WSContext)
defer func() {
if nil == ginRouterTable[wsInstance.GetServerPort()].conf.ConnectionManager {
if nil == wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager {
return
}
ginRouterTable[wsInstance.GetServerPort()].conf.ConnectionManager.Del(ctx)
wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager.Del(ctx, "")
}()
wsInstance.Disconnect(ctx)
})
// 注册指令
for _, cmd := range wsInstance.GetCommandList() {
if nil != ginRouterTable[wsInstance.GetServerPort()].loggerInstance {
ginRouterTable[wsInstance.GetServerPort()].loggerInstance.Debug(
if nil != wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].loggerInstance {
wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].loggerInstance.Debug(
"长连接指令注册成功",
zap.String("module", wsInstance.GetModuleFlag()),
zap.String("command", cmd.GetCommand()),
@ -186,15 +198,10 @@ func NewWebsocketServe(wsInstanceList ...abstract.IWebsocket) error {
commandTable[wsInstance.GetModuleFlag()][cmd.GetCommand()] = cmd
}
go func() {
if err := ginRouterTable[wsInstance.GetServerPort()].ginRouter.Run(fmt.Sprintf(":%d", wsInstance.GetServerPort())); nil != err {
if err := ginRouterTable[wsInstance.GetServerPort()].Run(fmt.Sprintf(":%d", wsInstance.GetServerPort())); nil != err {
panic(err)
}
}()
}(wsInstance)
}
wg.Wait()
run()
return nil
}
// dispatchCommand 调度command ...