websocket/construct.go

250 lines
7.7 KiB
Go

// Package websocket...
//
// Description : websocket...
//
// Author : go_developer@163.com<张德满>
//
// Date : 2021-03-27 6:49 下午
package websocket
import (
"fmt"
"sync"
"github.com/go-developer/gopkg/logger"
"go.uber.org/zap"
"github.com/go-developer/websocket/message"
"github.com/go-developer/websocket/config"
"github.com/tidwall/gjson"
"github.com/go-developer/websocket/storage"
"github.com/go-developer/websocket/context"
"gopkg.in/olahol/melody.v1"
"github.com/go-developer/gopkg/easylock"
"github.com/gin-gonic/gin"
"github.com/go-developer/websocket/abstract"
"github.com/pkg/errors"
)
// Server ...
//
// Author : go_developer@163.com<张德满>
//
// Date : 8:04 下午 2021/3/27
type Server struct {
ginRouter *gin.Engine // GIN引擎
wsServer *melody.Melody // websocket引擎
conf *config.WSServerConfig // 配置
loggerInstance *zap.Logger // 日志实例
}
var (
// ginRouterTable 表
ginRouterTable map[int]*Server
// commandTable 指令表
commandTable map[string]map[string]abstract.ICommand
// 服务启停的信号
sigChan = make(chan int, 0)
)
// NewWebsocketServe 启动websocket服务
//
// Author : go_developer@163.com<张德满>
//
// Date : 6:49 下午 2021/3/27
func NewWebsocketServe(wsInstanceList ...abstract.IWebsocket) error {
if len(wsInstanceList) == 0 {
return errors.WithStack(errors.New("register websocket server list is empty"))
}
lock := easylock.NewLock()
ginRouterTable = make(map[int]*Server)
commandTable = make(map[string]map[string]abstract.ICommand)
wg := &sync.WaitGroup{}
wg.Add(len(wsInstanceList))
for _, wsInstance := range wsInstanceList {
go func(wsInstance abstract.IWebsocket) {
defer wg.Done()
// 初始化ws server
_ = lock.Lock()
if _, exist := ginRouterTable[wsInstance.GetServerPort()]; !exist {
wsSetConfigList := wsInstance.GetWSServerConfig()
if nil == wsSetConfigList {
wsSetConfigList = make([]config.SetWSServerConfig, 0)
}
s := &Server{
ginRouter: gin.Default(),
wsServer: melody.New(),
conf: config.NewWSServerConfig(wsSetConfigList...),
}
if s.conf.LogEnable {
// 开启了日志,初始化日志
if len(s.conf.LogPath) == 0 {
panic(wsInstance.GetModuleFlag() + " 模块开启了日志记录,但是没有配置日志路径")
}
var (
err error
logConf *logger.RotateLogConfig
)
optionList := make([]logger.SetLoggerOptionFunc, 0)
if s.conf.LogConsole {
optionList = append(optionList, logger.WithConsoleOutput())
}
if s.conf.LogConsole {
optionList = append(optionList, logger.WithConsoleOutput())
}
if logConf, err = logger.NewRotateLogConfig(s.conf.LogPath, s.conf.LogFile, logger.WithTimeIntervalType(s.conf.LogSplitInterval)); nil != err {
panic(wsInstance.GetModuleFlag() + " 模块开启了日志记录,日志初始化失败, 失败原因 : " + err.Error())
}
if s.loggerInstance, err = logger.NewLogger(s.conf.LogLevel, logConf, optionList...); nil != err {
panic(wsInstance.GetModuleFlag() + " 模块开启了日志记录,日志初始化失败, 失败原因 : " + err.Error())
}
}
if nil == s.loggerInstance && s.conf.LogConsole {
var err error
// 没有配置文件日志, 但是配置了控制台输出
if s.loggerInstance, err = logger.NewConsoleLogger(s.conf.LogLevel); nil != err {
panic(wsInstance.GetModuleFlag() + " 模块开启了控制台日志记录,日志初始化失败, 失败原因 : " + err.Error())
}
}
ginRouterTable[wsInstance.GetServerPort()] = s
}
if _, exist := commandTable[wsInstance.GetModuleFlag()]; !exist {
commandTable[wsInstance.GetModuleFlag()] = make(map[string]abstract.ICommand)
}
_ = lock.Unlock()
routerGroup := ginRouterTable[wsInstance.GetServerPort()].ginRouter.Group(wsInstance.GetModuleFlag())
// 注册路由
for _, path := range wsInstance.HandshakeURL() {
routerGroup.GET(path, func(ctx *gin.Context) {
parameter := map[string]interface{}{
"ws_context": context.NewContext(ctx, wsInstance.GetModuleFlag(), nil),
}
_ = ginRouterTable[wsInstance.GetServerPort()].wsServer.HandleRequestWithKeys(ctx.Writer, ctx.Request, parameter)
})
}
// 注册回调函数
// 1. 建立连接的函数
ginRouterTable[wsInstance.GetServerPort()].wsServer.HandleConnect(func(session *melody.Session) {
ctxInterface, _ := session.Get("ws_context")
ctx := ctxInterface.(*context.WSContext)
ctx.Session = session
if err := wsInstance.Connect(ctx); nil == err {
if nil != storage.Connection {
storage.Connection.Store(ctx)
}
}
})
// 2. 指令处理的函数
ginRouterTable[wsInstance.GetServerPort()].wsServer.HandleMessage(func(session *melody.Session, bytes []byte) {
// TODO : 增加指令回调失败的callback
ctxInterface, _ := session.Get("ws_context")
ctx := ctxInterface.(*context.WSContext)
_ = dispatchCommand(ctx, bytes)
})
// 3, 关闭连接的处理函数
ginRouterTable[wsInstance.GetServerPort()].wsServer.HandleClose(func(session *melody.Session, i int, s string) error {
ctxInterface, _ := session.Get("ws_context")
ctx := ctxInterface.(*context.WSContext)
defer func() {
if nil == storage.Connection {
return
}
storage.Connection.Del(ctx)
}()
return wsInstance.Close(ctx, i, s)
})
// 4. 断开连接的处理函数
ginRouterTable[wsInstance.GetServerPort()].wsServer.HandleDisconnect(func(session *melody.Session) {
ctxInterface, _ := session.Get("ws_context")
ctx := ctxInterface.(*context.WSContext)
defer func() {
if nil == storage.Connection {
return
}
storage.Connection.Del(ctx)
}()
wsInstance.Disconnect(ctx)
})
// 注册指令
for _, cmd := range wsInstance.GetCommandList() {
if nil != ginRouterTable[wsInstance.GetServerPort()].loggerInstance {
ginRouterTable[wsInstance.GetServerPort()].loggerInstance.Debug(
"长连接指令注册成功",
zap.String("module", wsInstance.GetModuleFlag()),
zap.String("command", cmd.GetCommand()),
)
}
commandTable[wsInstance.GetModuleFlag()][cmd.GetCommand()] = cmd
}
go func() {
if err := ginRouterTable[wsInstance.GetServerPort()].ginRouter.Run(fmt.Sprintf(":%d", wsInstance.GetServerPort())); nil != err {
panic(err)
}
}()
}(wsInstance)
}
wg.Wait()
run()
return nil
}
// dispatchCommand 调度command ...
//
// Author : go_developer@163.com<张德满>
//
// Date : 3:36 下午 2021/3/28
func dispatchCommand(ctx *context.WSContext, data []byte) error {
if _, exist := commandTable[ctx.Flag]; !exist {
return errors.WithStack(errors.New("未注册【" + ctx.Flag + "】长连接模块"))
}
cmd := gjson.Get(string(data), "command").String()
var (
exist bool
cmdInstance abstract.ICommand
cmdConfig *config.CommandConfig
err error
)
if cmdInstance, exist = commandTable[ctx.Flag][cmd]; !exist {
return errors.WithStack(errors.New("【" + ctx.Flag + "】长连接模块未注册【" + cmd + "】指令"))
}
optionList := cmdInstance.GetConfigOption()
if nil == optionList {
optionList = make([]config.SetCommandConfig, 0)
}
cmdConfig = config.NewCommandConfig(optionList...)
if err = cmdInstance.Execute(ctx, data); nil != err {
if cmdConfig.PushMessageWithError {
_ = message.Response(ctx, map[string]interface{}{
"command": cmd,
"message": err.Error(),
"success": false,
})
}
return err
}
return nil
}
func run() {
<-sigChan
// TODO : 增加后置hook
}
// Stop 停止服务
//
// Author : go_developer@163.com<张德满>
//
// Date : 3:55 下午 2021/3/28
func Stop() {
sigChan <- 1
}