// Package redis ... // // Description : redis 客户端 // // Author : go_developer@163.com<白茶清欢> // // Date : 2021-02-27 4:49 下午 package redis import ( "encoding/json" "fmt" "path/filepath" "strings" "sync" "time" "git.zhangdeman.cn/zhangdeman/util" "git.zhangdeman.cn/zhangdeman/logger" redisInstance "github.com/go-redis/redis/v8" "go.uber.org/zap" ) var ( // Client 连接实例 Client ClientInterface ) func init() { Client = &OwnClient{ lock: &sync.RWMutex{}, instanceTable: make(map[string]*RealClient), confTable: make(map[string]*FullConfig), parseErrorFunc: defaultParseError, } } // defaultParseError ... // // Author : go_developer@163.com<白茶清欢> // // Date : 10:59 下午 2021/2/27 func defaultParseError(err error) error { if nil == err { return nil } errMsg := err.Error() if errMsg == "nil" || errMsg == "" { return nil } strArr := strings.Split(errMsg, ":") if len(strArr) != 2 { return err } msg := strings.ToLower(strings.TrimSpace(strArr[1])) if msg == "nil" || msg == "" { return nil } return err } // RealClient 包装好的 redis client type RealClient struct { Flag string // redis 标识 Master *redisInstance.Client // redis 实例 Slave *redisInstance.Client // redis 实例 Logger *zap.Logger // 日志实例 } // NewClient 获取redis client实例 // // Author : go_developer@163.com<白茶清欢> // // Date : 5:05 下午 2021/2/27 func NewClient(config map[string]*FullConfig, parseErrorFunc func(err error) error) (ClientInterface, error) { c := &OwnClient{ instanceTable: make(map[string]*RealClient), loggerTable: make(map[string]*zap.Logger), confTable: config, parseErrorFunc: parseErrorFunc, } if nil == c.parseErrorFunc { c.parseErrorFunc = defaultParseError } return c, c.init() } // OwnClient 包装的redis client // // Author : go_developer@163.com<白茶清欢> // // Date : 4:52 下午 2021/2/27 type OwnClient struct { lock *sync.RWMutex // 锁 loggerTable map[string]*zap.Logger // 日志实例 instanceTable map[string]*RealClient // redis 实例 confTable map[string]*FullConfig // redis 配置 parseErrorFunc func(err error) error // 解析err的function,解析执行结果是否为失败,有的场景,执行成功,返回 redis:nil / redis: } // AddClientWithCfgFile 使用具体配置文件初始化 // // Author : go_developer@163.com<白茶清欢> // // Date : 20:57 2022/6/15 func (c *OwnClient) AddClientWithCfgFile(cfgPath string) error { var ( err error ) if err = c.loadConfig(cfgPath); nil != err { return err } return c.init() } // AddClientWithCfgDir 使用配置目录进行初始化 // // Author : go_developer@163.com<白茶清欢> // // Date : 20:58 2022/6/15 func (c *OwnClient) AddClientWithCfgDir(cfgDir string) error { var ( err error ) if err = c.batchLoadConfig(cfgDir); nil != err { return err } return c.init() } // loadConfig 载入配置文件 // // Author : go_developer@163.com<白茶清欢> // // Date : 14:31 2022/6/15 func (c *OwnClient) loadConfig(cfgPath string) error { var ( err error cfg FullConfig ) filePathArr := strings.Split(cfgPath, string(filepath.Separator)) if len(filePathArr) == 0 { return CfgFilePathError() } fileName := strings.ToLower(filePathArr[len(filePathArr)-1]) fileArr := strings.Split(fileName, ".") if len(filePathArr) < 2 { return CfgFileFormatErr("未知") } flag := strings.Trim( strings.Trim( strings.Trim(fileName, ".json"), ".yaml"), ".yml") switch strings.ToLower(fileArr[len(fileArr)-1]) { case "json": if err = util.File.ReadJSONContent(cfgPath, &cfg); nil != err { return err } flag = strings.Join(fileArr[0:len(fileArr)-1], ".") case "yml": fallthrough case "yaml": if err = util.File.ReadYmlContent(cfgPath, &cfg); nil != err { return err } flag = strings.Join(fileArr[0:len(fileArr)-1], ".") default: // 不支持的格式,跳过 } c.lock.Lock() c.confTable[flag] = &cfg c.lock.Unlock() return nil } // batchLoadConfig 批量载入配置 // // Author : go_developer@163.com<白茶清欢> // // Date : 16:44 2022/6/15 func (c *OwnClient) batchLoadConfig(cfgDir string) error { filepathNames, _ := filepath.Glob(filepath.Join(cfgDir, "*")) for i := range filepathNames { if err := c.loadConfig(filepathNames[i]); nil != err { return err } } return nil } // init 初始化redis连接 // // Author : go_developer@163.com<白茶清欢> // // Date : 5:31 下午 2021/2/27 func (c *OwnClient) init() error { var err error for flag, conf := range c.confTable { c.instanceTable[flag] = &RealClient{ Flag: flag, Master: redisInstance.NewClient(Config2Options(conf.Master)), Slave: redisInstance.NewClient(Config2Options(conf.Slave)), } if c.instanceTable[flag].Logger, err = logger.GetLogInstanceFromInputConfig(conf.Logger); nil != err { return InitLoggerErr(flag, err) } } return nil } // initLogger 初始化日志 // // Author : go_developer@163.com<白茶清欢> // // Date : 7:07 下午 2021/2/27 func (c *OwnClient) initLogger(flag string, conf *logger.InputLogConfig) error { var err error c.loggerTable[flag], err = logger.GetLogInstanceFromInputConfig(conf) return LoggerInitFail(flag, err) } // GetRedisClient 获取redis实例 // // Author : go_developer@163.com<白茶清欢> // // Date : 5:16 下午 2021/2/27 func (c *OwnClient) GetRedisClient(flag string) (*RealClient, error) { redisClient, exist := c.instanceTable[flag] if !exist { return nil, FlagNotFound(flag) } return redisClient, nil } // log 记录redis请求日志 // // Author : go_developer@163.com<白茶清欢> // // Date : 8:52 下午 2021/2/27 func (c *OwnClient) log(ctx *Context, realClient *RealClient, isMaster bool, cmdResult redisInstance.Cmder, startTime int64, finishTime int64) { if nil == realClient || nil == realClient.Logger { return } realClient.Logger.Info( "执行redis命令日志记录", zap.Any(ctx.RequestIDField, ctx.RequestID), // 上下文串联的requestID zap.String("exec_command", cmdResult.String()), // 执行的命令 zap.Bool("use_master", isMaster), // 是否使用主节点 zap.Float64("exec_used_time", float64(finishTime-startTime)/1e6), // 耗时,单位: ms zap.Error(cmdResult.Err()), // 异常信息 ) } // CommandProxy 执行命令的代理 // // Author : go_developer@163.com<白茶清欢> // // Date : 9:41 下午 2021/2/27 func (c *OwnClient) CommandProxy(ctx *Context, flag string, cmd string, param ...interface{}) (string, error) { var ( realClient *RealClient err error ) if len(cmd) == 0 { return "", EmptyCmd() } if nil == ctx { ctx = NewContext(flag) } if realClient, err = c.GetRedisClient(ctx.Flag); nil != err { return "", err } flagArr := strings.Split(flag, "#") if len(flagArr) == 1 { flagArr = append(flagArr, "r") } isMater := flagArr[1] == "w" redisClient := realClient.Slave if isMater { redisClient = realClient.Master } redisCmd := append([]interface{}{cmd}, param...) startTime := time.Now().Unix() cmdResult := redisClient.Do(ctx.Ctx, redisCmd...) go c.log(ctx, realClient, isMater, cmdResult, startTime, time.Now().UnixNano()) return fmt.Sprintf("%v", cmdResult.Val()), c.parseErrorFunc(cmdResult.Err()) } // CommandProxyWithReceiver 执行命令,并解析结果 // // Author : go_developer@163.com<白茶清欢> // // Date : 10:00 下午 2021/2/27 func (c *OwnClient) CommandProxyWithReceiver(ctx *Context, flag string, receiver interface{}, cmd string, param ...interface{}) error { if nil == receiver { return ReceiverISNIL() } var ( err error result string ) if result, err = c.CommandProxy(ctx, flag, cmd, param...); nil != err { return err } return ResultConvertFail(json.Unmarshal([]byte(result), receiver)) } // ClientInterface 定义redis client的接口实现,方便单元测试数据mock // // Author : go_developer@163.com<白茶清欢> // // Date : 10:49 下午 2021/2/27 type ClientInterface interface { GetRedisClient(flag string) (*RealClient, error) CommandProxy(ctx *Context, flag string, cmd string, param ...interface{}) (string, error) CommandProxyWithReceiver(ctx *Context, flag string, receiver interface{}, cmd string, param ...interface{}) error }