401 lines
12 KiB
Go
401 lines
12 KiB
Go
// Package websocket builds websocket servers on top of gin and melody.
//
// Description : registers websocket modules on gin routers and dispatches incoming commands to them.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2021-03-27 6:49 PM
|
|
package websocket
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/gin-contrib/pprof"
|
|
|
|
"git.zhangdeman.cn/zhangdeman/logger"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"git.zhangdeman.cn/zhangdeman/websocket/message"
|
|
|
|
"git.zhangdeman.cn/zhangdeman/websocket/config"
|
|
|
|
"github.com/tidwall/gjson"
|
|
|
|
"git.zhangdeman.cn/zhangdeman/websocket/context"
|
|
|
|
"gopkg.in/olahol/melody.v1"
|
|
|
|
"git.zhangdeman.cn/zhangdeman/websocket/abstract"
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// Server ...
|
|
//
|
|
// Author : go_developer@163.com<白茶清欢>
|
|
//
|
|
// Date : 8:04 下午 2021/3/27
|
|
type Server struct {
|
|
ginRouter *gin.Engine // GIN引擎
|
|
wsServer *melody.Melody // websocket引擎
|
|
conf *config.WSServerConfig // 配置
|
|
}
|
|
|
|
// GetConfig 获取service实例配置
|
|
//
|
|
// Author : go_developer@163.com<白茶清欢>
|
|
//
|
|
// Date : 18:36 2024/7/22
|
|
func (s *Server) GetConfig() *config.WSServerConfig {
|
|
return s.conf
|
|
}
|
|
|
|
// GetMelody 获取websocket实例
|
|
//
|
|
// Author : go_developer@163.com<白茶清欢>
|
|
//
|
|
// Date : 18:37 2024/7/22
|
|
func (s *Server) GetMelody() *melody.Melody {
|
|
return s.wsServer
|
|
}
|
|
|
|
var (
|
|
// ginRouterTable 表
|
|
ginRouterTable map[int]*gin.Engine
|
|
// wsServerTable 表
|
|
wsServerTable map[int]map[string]*Server
|
|
// commandTable 指令表
|
|
commandTable map[string]map[string]abstract.ICommand
|
|
// 日志实例表
|
|
loggerInstanceTable map[string]*zap.Logger
|
|
// 服务启停的信号
|
|
sigChan = make(chan int, 1)
|
|
)
|
|
|
|
// NewWebsocketServeWithGinRouter ...
|
|
//
|
|
// Author : go_developer@163.com<白茶清欢>
|
|
//
|
|
// Date : 17:41 2024/7/22
|
|
func NewWebsocketServeWithGinRouter(ginPort int, ginRouter *gin.Engine, wsInstanceList ...abstract.IWebsocket) error {
|
|
if len(wsInstanceList) == 0 {
|
|
return errors.WithStack(errors.New("register websocket server list is empty"))
|
|
}
|
|
ginRouterTable = map[int]*gin.Engine{
|
|
ginPort: ginRouter,
|
|
}
|
|
wsServerTable = make(map[int]map[string]*Server)
|
|
commandTable = make(map[string]map[string]abstract.ICommand)
|
|
for _, wsInstance := range wsInstanceList {
|
|
initServer(wsInstance)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NewWebsocketServe 启动websocket服务
|
|
//
|
|
// Author : go_developer@163.com<白茶清欢>
|
|
//
|
|
// Date : 6:49 下午 2021/3/27
|
|
func NewWebsocketServe(finishHook func(), wsInstanceList ...abstract.IWebsocket) error {
|
|
if len(wsInstanceList) == 0 {
|
|
return errors.WithStack(errors.New("register websocket server list is empty"))
|
|
}
|
|
ginRouterTable = make(map[int]*gin.Engine)
|
|
wsServerTable = make(map[int]map[string]*Server)
|
|
commandTable = make(map[string]map[string]abstract.ICommand)
|
|
for _, wsInstance := range wsInstanceList {
|
|
initServer(wsInstance)
|
|
}
|
|
run(finishHook)
|
|
return nil
|
|
}
|
|
|
|
// GetWsServer 获取ws Server
|
|
//
|
|
// Author : go_developer@163.com<白茶清欢>
|
|
//
|
|
// Date : 18:36 2024/7/22
|
|
func GetWsServer(port int, serverFlag string) (*Server, error) {
|
|
if _, exist := wsServerTable[port]; !exist {
|
|
return nil, errors.New(fmt.Sprintf("%v : port is not listen", port))
|
|
}
|
|
if _, exist := wsServerTable[port][serverFlag]; !exist {
|
|
return nil, errors.New(fmt.Sprintf("server flag %v on port %v is not found", serverFlag, port))
|
|
}
|
|
return wsServerTable[port][serverFlag], nil
|
|
}
|
|
|
|
// 初始化server
|
|
func initServer(wsInstance abstract.IWebsocket) {
|
|
// 初始化ws server
|
|
// 初始化 gin 路由表
|
|
if _, exist := ginRouterTable[wsInstance.GetServerPort()]; !exist {
|
|
ginRouterTable[wsInstance.GetServerPort()] = gin.Default()
|
|
}
|
|
// 初始化WS-Server表
|
|
if _, exist := wsServerTable[wsInstance.GetServerPort()]; !exist {
|
|
wsServerTable[wsInstance.GetServerPort()] = make(map[string]*Server)
|
|
}
|
|
// 初始化日志实例表
|
|
loggerInstanceTable = make(map[string]*zap.Logger)
|
|
if _, exist := wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()]; !exist {
|
|
// 生成并存储 WS-Server
|
|
wsSetConfigList := wsInstance.GetWSServerConfig()
|
|
if nil == wsSetConfigList {
|
|
wsSetConfigList = make([]config.SetWSServerConfig, 0)
|
|
}
|
|
s := &Server{
|
|
ginRouter: gin.Default(),
|
|
wsServer: melody.New(),
|
|
conf: config.NewWSServerConfig(wsSetConfigList...),
|
|
}
|
|
if s.conf.LogEnable {
|
|
// 开启了日志,初始化日志
|
|
if len(s.conf.LogPath) == 0 {
|
|
panic(wsInstance.GetModuleFlag() + " 模块开启了日志记录,但是没有配置日志路径")
|
|
}
|
|
var (
|
|
// err error
|
|
// logConf *logger.RotateLogConfig
|
|
)
|
|
optionList := make([]logger.SetLoggerOptionFunc, 0)
|
|
if s.conf.LogConsole {
|
|
optionList = append(optionList, logger.WithConsoleOutput())
|
|
}
|
|
if s.conf.LogConsole {
|
|
optionList = append(optionList, logger.WithConsoleOutput())
|
|
}
|
|
if len(s.conf.LogFile) == 0 {
|
|
s.conf.LogFile = wsInstance.GetModuleFlag() + ".log"
|
|
}
|
|
/*if logConf, err = logger.NewRotateLogConfig(s.conf.LogPath, s.conf.LogFile, logger.WithTimeIntervalType(s.conf.LogSplitInterval)); nil != err {
|
|
panic(wsInstance.GetModuleFlag() + " 模块开启了日志记录,日志初始化失败, 失败原因 : " + err.Error())
|
|
}*/
|
|
/*if loggerInstance, err := logger.NewLogger(s.conf.LogLevel, logConf, optionList...); nil != err {
|
|
panic(wsInstance.GetModuleFlag() + " 模块开启了日志记录,日志初始化失败, 失败原因 : " + err.Error())
|
|
} else {
|
|
loggerInstanceTable[wsInstance.GetModuleFlag()] = loggerInstance
|
|
}*/
|
|
}
|
|
if nil == loggerInstanceTable[wsInstance.GetModuleFlag()] && s.conf.LogConsole {
|
|
// 没有配置文件日志, 但是配置了控制台输出
|
|
if loggerInstance, err := logger.NewConsoleLogger(s.conf.LogLevel); nil != err {
|
|
panic(wsInstance.GetModuleFlag() + " 模块开启了控制台日志记录,日志初始化失败, 失败原因 : " + err.Error())
|
|
} else {
|
|
loggerInstanceTable[wsInstance.GetModuleFlag()] = loggerInstance
|
|
}
|
|
}
|
|
// 对长连接进行配置
|
|
s.wsServer.Config.MaxMessageSize = s.conf.MaxMessageSize
|
|
s.wsServer.Config.MessageBufferSize = s.conf.MessageBufferSize
|
|
s.wsServer.Config.WriteWait = s.conf.WriteWait
|
|
s.wsServer.Config.PongWait = s.conf.PongWait
|
|
s.wsServer.Config.PingPeriod = s.conf.PingPeriod
|
|
|
|
wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()] = s
|
|
}
|
|
|
|
// 初始化指令存储表
|
|
if _, exist := commandTable[wsInstance.GetModuleFlag()]; !exist {
|
|
commandTable[wsInstance.GetModuleFlag()] = make(map[string]abstract.ICommand)
|
|
}
|
|
routerGroup := ginRouterTable[wsInstance.GetServerPort()].Group(wsInstance.GetModuleFlag())
|
|
// 注册路由
|
|
for _, path := range wsInstance.HandshakeURL() {
|
|
routerGroup.GET(path, func(ctx *gin.Context) {
|
|
wsCtx := context.NewContext(ctx, wsInstance.GetModuleFlag(), nil)
|
|
parameter := map[string]interface{}{
|
|
"ws_context": wsCtx,
|
|
}
|
|
if err := wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].wsServer.HandleRequestWithKeys(ctx.Writer, ctx.Request, parameter); nil != err {
|
|
log(
|
|
loggerInstanceTable[wsInstance.GetModuleFlag()],
|
|
logFuncPanic,
|
|
"模块启动,注册路由,绑定数据失败",
|
|
getLoadDataList(wsCtx, zap.Error(err)),
|
|
)
|
|
}
|
|
})
|
|
}
|
|
currentWSServer := getWsServer(wsInstance.GetServerPort(), wsInstance.GetModuleFlag())
|
|
// 注册pprof
|
|
if currentWSServer.conf.EnablePprof {
|
|
pprofGinRouter, exist := ginRouterTable[currentWSServer.conf.PprofPort]
|
|
if !exist {
|
|
pprofGinRouter = gin.Default()
|
|
ginRouterTable[currentWSServer.conf.PprofPort] = pprofGinRouter
|
|
}
|
|
pprof.Register(pprofGinRouter)
|
|
}
|
|
// 注册回调函数
|
|
// 1. 建立连接的函数注册回调函数
|
|
// //
|
|
currentWSServer.wsServer.HandleConnect(func(session *melody.Session) {
|
|
ctxInterface, _ := session.Get("ws_context")
|
|
ctx := ctxInterface.(*context.WSContext)
|
|
ctx.Session = session
|
|
if err := wsInstance.Connect(ctx); nil == err {
|
|
if currentWSServer.conf.StoreConnection && nil != currentWSServer.conf.ConnectionManager {
|
|
currentWSServer.conf.ConnectionManager.Store(ctx)
|
|
}
|
|
}
|
|
})
|
|
// 2. 指令处理的函数
|
|
currentWSServer.wsServer.HandleMessage(func(session *melody.Session, bytes []byte) {
|
|
// TODO : 增加指令回调失败的callback
|
|
ctxInterface, _ := session.Get("ws_context")
|
|
ctx := ctxInterface.(*context.WSContext)
|
|
_ = dispatchCommand(context.CloneContext(ctx), bytes)
|
|
})
|
|
// 3, 关闭连接的处理函数
|
|
currentWSServer.wsServer.HandleClose(func(session *melody.Session, i int, s string) error {
|
|
ctxInterface, _ := session.Get("ws_context")
|
|
ctx := ctxInterface.(*context.WSContext)
|
|
defer func() {
|
|
if !wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.StoreConnection || nil == wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager {
|
|
return
|
|
}
|
|
wsServerTable[wsInstance.GetServerPort()][wsInstance.GetModuleFlag()].conf.ConnectionManager.Del(ctx, "")
|
|
}()
|
|
return wsInstance.Close(ctx, i, s)
|
|
})
|
|
// 4. 断开连接的处理函数
|
|
currentWSServer.wsServer.HandleDisconnect(func(session *melody.Session) {
|
|
ctxInterface, _ := session.Get("ws_context")
|
|
ctx := ctxInterface.(*context.WSContext)
|
|
defer func() {
|
|
if nil == currentWSServer.conf.ConnectionManager {
|
|
return
|
|
}
|
|
currentWSServer.conf.ConnectionManager.Del(ctx, "")
|
|
}()
|
|
wsInstance.Disconnect(ctx)
|
|
})
|
|
// 注册指令
|
|
for _, cmd := range wsInstance.GetCommandList() {
|
|
log(
|
|
getLoggerInstance(wsInstance.GetModuleFlag(), nil),
|
|
logFuncInfo,
|
|
"长连接指令注册成功",
|
|
getLoadDataList(nil,
|
|
zap.String("module", wsInstance.GetModuleFlag()),
|
|
zap.String("command", cmd.GetCommand()),
|
|
),
|
|
)
|
|
commandTable[wsInstance.GetModuleFlag()][cmd.GetCommand()] = cmd
|
|
}
|
|
}
|
|
|
|
// dispatchCommand 调度command ...
|
|
//
|
|
// Author : go_developer@163.com<白茶清欢>
|
|
//
|
|
// Date : 3:36 下午 2021/3/28
|
|
func dispatchCommand(ctx *context.WSContext, data []byte) error {
|
|
if _, exist := commandTable[ctx.Flag]; !exist {
|
|
log(
|
|
getLoggerInstance(ctx.Flag, nil),
|
|
logFuncFatal,
|
|
"长连接模块不存在",
|
|
getLoadDataList(ctx),
|
|
)
|
|
return errors.WithStack(errors.New("未注册【" + ctx.Flag + "】长连接模块"))
|
|
}
|
|
var (
|
|
exist bool
|
|
cmdInstance abstract.ICommand
|
|
cmdConfig *config.CommandConfig
|
|
err error
|
|
result interface{}
|
|
optionList []config.SetCommandConfig
|
|
)
|
|
cmd := gjson.Get(string(data), "command").String()
|
|
|
|
if cmdInstance, exist = commandTable[ctx.Flag][cmd]; !exist {
|
|
log(
|
|
getLoggerInstance(ctx.Flag, nil),
|
|
logFuncFatal,
|
|
"指令未注册",
|
|
getLoadDataList(ctx),
|
|
)
|
|
return errors.WithStack(errors.New("【" + ctx.Flag + "】长连接模块未注册【" + cmd + "】指令"))
|
|
}
|
|
|
|
if optionList = cmdInstance.GetConfigOption(); nil == optionList {
|
|
optionList = make([]config.SetCommandConfig, 0)
|
|
}
|
|
cmdConfig = config.NewCommandConfig(optionList...)
|
|
log(
|
|
getLoggerInstance(ctx.Flag, &cmdConfig.LogUpData),
|
|
logFuncInfo,
|
|
"上行原始数据记录",
|
|
getLoadDataList(ctx, zap.String("up_data", string(data))),
|
|
)
|
|
|
|
if result, err = cmdInstance.Execute(ctx, data); nil != err {
|
|
if cmdConfig.PushMessageWithError {
|
|
if err := message.Response(ctx, map[string]interface{}{
|
|
"command": cmd,
|
|
"message": err.Error(),
|
|
"success": false,
|
|
}); nil != err {
|
|
log(
|
|
getLoggerInstance(ctx.Flag, nil),
|
|
logFuncWarn,
|
|
"指令执行失败",
|
|
getLoadDataList(ctx, zap.Error(err)),
|
|
)
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
if cmdConfig.ResponseData {
|
|
responseData := buildResponseData(ctx, cmd, result)
|
|
if err := ctx.Session.Write(responseData); nil != err {
|
|
log(
|
|
getLoggerInstance(ctx.Flag, nil),
|
|
logFuncWarn,
|
|
"指令响应结果失败",
|
|
getLoadDataList(ctx, zap.Error(err), zap.String("expect_response", string(responseData))),
|
|
)
|
|
return err
|
|
}
|
|
log(
|
|
getLoggerInstance(ctx.Flag, &cmdConfig.LogDownData),
|
|
logFuncInfo,
|
|
"指令响应结果记录",
|
|
getLoadDataList(ctx, zap.String("down_data", string(responseData))),
|
|
)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// 启动所有端口的监听
|
|
func run(finishHook func()) {
|
|
for port, ginInstance := range ginRouterTable {
|
|
go func(ginInstance *gin.Engine, port int) {
|
|
if err := ginInstance.Run(fmt.Sprintf(":%d", port)); nil != err {
|
|
panic(fmt.Sprintf("%d 启动端口监听失败, 失败原因 : %s", port, err.Error()))
|
|
}
|
|
}(ginInstance, port)
|
|
}
|
|
|
|
<-sigChan
|
|
// 增加后置hook
|
|
if nil != finishHook {
|
|
finishHook()
|
|
}
|
|
}
|
|
|
|
// Stop 停止服务
|
|
//
|
|
// Author : go_developer@163.com<白茶清欢>
|
|
//
|
|
// Date : 3:55 下午 2021/3/28
|
|
func Stop() {
|
|
sigChan <- 1
|
|
}
|