// Package redis ... // // Description : redis 客户端 // // Author : go_developer@163.com<白茶清欢> // // Date : 2021-02-27 4:49 下午 package redis import ( "context" "git.zhangdeman.cn/zhangdeman/consts" "strings" "sync" "time" "git.zhangdeman.cn/zhangdeman/redis/abstract" "git.zhangdeman.cn/zhangdeman/redis/define" wrapperOperate "git.zhangdeman.cn/zhangdeman/wrapper" "github.com/pkg/errors" redisClient "github.com/redis/go-redis/v9" "go.uber.org/zap" ) var ( Client abstract.IRedisClient ) func init() { Client = &OwnClient{ lock: &sync.RWMutex{}, instanceTable: make(map[string]*define.ClientInfo), whiteCommandTable: make(map[string]bool), } } type OwnClient struct { lock *sync.RWMutex instanceTable map[string]*define.ClientInfo whiteCommandTable map[string]bool logger *zap.Logger extraLogFieldList []string } func (o *OwnClient) isAllowCommand(command string) bool { if len(o.whiteCommandTable) == 0 { // 未配置, 视为全部允许执行 return true } if o.whiteCommandTable["*"] { // 配置了 * 视为全部允许执行 return true } command = strings.ToLower(strings.TrimSpace(command)) o.lock.RLock() defer o.lock.RUnlock() return o.whiteCommandTable[command] } // Exec 执行命令 // // Author : go_developer@163.com<白茶清欢> // // Date : 11:05 2024/6/19 func (o *OwnClient) Exec(ctx context.Context, instanceFlag string, command string, args ...any) *define.RedisResult { if nil == ctx { ctx = context.Background() } var ( instance *define.ClientInfo ) cmdParamList := []any{ command, } argStrList := make([]string, 0) for _, itemArg := range args { argStrList = append(argStrList, wrapperOperate.AnyDataType(itemArg).ToString().Value()) cmdParamList = append(cmdParamList, itemArg) } res := &define.RedisResult{ StartTime: time.Now().UnixMilli(), FinishTime: 0, UsedTime: 0, Result: "", Command: command, ArgList: argStrList, Err: nil, InstanceFlag: instanceFlag, } defer func() { res.FinishTime = time.Now().UnixMilli() res.UsedTime = res.FinishTime - res.StartTime if nil == o.logger { // 未注入日志实例 return } logDataList := []zap.Field{ zap.Int64("start_time", res.StartTime), zap.Int64("finish_time", res.FinishTime), zap.Int64("used_time", res.UsedTime), zap.String("command", res.Command), zap.String("arg_list", strings.Join(res.ArgList, " ")), zap.String("execute_result", res.Result), zap.Error(res.Err), } for _, item := range o.extraLogFieldList { logDataList = append(logDataList, zap.Any(item, ctx.Value(item))) } o.logger.Info( "Redis命令执行记录", logDataList..., ) }() if instance, res.Err = o.GetRealClientWithError(instanceFlag); nil != res.Err { return res } if nil == ctx { ctx = context.Background() } if instance.ReadOnly && o.isWriteCommand(command) { // 只读实例, 尝试执行写命令 res.Err = errors.New(instanceFlag + " : instance is read only") return res } if instance.ReadOnly && o.isWriteCommand(command) && instance.MasterClient() == nil { // 写命令, 没有主库连接 res.Err = errors.New(instanceFlag + " : instance master client is nil") return res } redisRealClient := instance.MasterClient() if !o.isWriteCommand(command) { redisRealClient = instance.SlaveClient() } cmdRes := redisRealClient.Do(ctx, cmdParamList...) if res.Err = cmdRes.Err(); nil != res.Err { return res } res.Result = wrapperOperate.AnyDataType(cmdRes.Val()).ToString().Value() return res } // SetCommandWhiteList 设置命令白名单, 空 或者 包含 * 则认为所有命令均允许执行 // // Author : go_developer@163.com<白茶清欢> // // Date : 11:02 2024/6/19 func (o *OwnClient) SetCommandWhiteList(commandList []string) { o.lock.Lock() defer o.lock.Unlock() for _, itemCommand := range commandList { o.whiteCommandTable[strings.ToLower(strings.TrimSpace(itemCommand))] = true } } func (o *OwnClient) GetRealClient(instanceFlag string) *define.ClientInfo { o.lock.RLock() defer o.lock.RUnlock() return o.instanceTable[instanceFlag] } func (o *OwnClient) GetRealClientWithError(instanceFlag string) (*define.ClientInfo, error) { o.lock.RLock() defer o.lock.RUnlock() instance, exist := o.instanceTable[instanceFlag] if !exist { return nil, errors.New(instanceFlag + " : redis instance is not found") } return instance, nil } func (o *OwnClient) AddClient(instanceFlag string, instanceConfig *define.Config) error { if nil == instanceConfig.Master && !instanceConfig.ReadOnly { // 不是只读, 则要求 主库配置 和 从库配置都要存在 return errors.New(instanceFlag + " : master config is nil") } clientInfo := &define.ClientInfo{ ReadOnly: instanceConfig.ReadOnly, Master: nil, Slave: nil, } if nil != instanceConfig.Master { clientInfo.Master = o.newClient(instanceConfig.Master) } if nil != instanceConfig.Slave { clientInfo.Master = o.newClient(instanceConfig.Slave) } o.lock.Lock() defer o.lock.Unlock() o.instanceTable[instanceFlag] = clientInfo return nil } func (o *OwnClient) RemoveClient(instanceFlag string) { o.lock.Lock() defer o.lock.Unlock() delete(o.instanceTable, instanceFlag) } func (o *OwnClient) SetLogger(loggerInstance *zap.Logger, extraLogFieldList []string) { o.logger = loggerInstance o.extraLogFieldList = extraLogFieldList } // isWriteCommand 判断是否写命令 // // Author : go_developer@163.com<白茶清欢> // // Date : 16:22 2024/10/8 func (o *OwnClient) isWriteCommand(command string) bool { return wrapperOperate.ArrayType([]string{ consts.RedisCommandDel, consts.RedisCommandSet, consts.RedisCommandLpush, consts.RedisCommandRpush, consts.RedisCommandMSet, consts.RedisCommandPublish, consts.RedisCommandPsubScribe, }).Has(strings.ToUpper(command)) >= 0 } // newClient 获取客户端连接 // // Author : go_developer@163.com<白茶清欢> // // Date : 16:12 2024/10/8 func (o *OwnClient) newClient(instanceConfig *define.Options) *redisClient.Client { return redisClient.NewClient(&redisClient.Options{ DB: instanceConfig.DB, Addr: instanceConfig.Addr, ClientName: instanceConfig.ClientName, Dialer: nil, OnConnect: func(ctx context.Context, cn *redisClient.Conn) error { return nil }, // Protocol: 0, Username: instanceConfig.Username, Password: instanceConfig.Password, // CredentialsProvider: nil, // CredentialsProviderContext: nil, MaxRetries: instanceConfig.MaxRetries, MinRetryBackoff: time.Duration(instanceConfig.MinRetryBackoff) * time.Millisecond, MaxRetryBackoff: time.Duration(instanceConfig.MaxRetryBackoff) * time.Millisecond, DialTimeout: time.Duration(instanceConfig.DialTimeout) * time.Millisecond, ReadTimeout: time.Duration(instanceConfig.ReadTimeout) * time.Millisecond, WriteTimeout: time.Duration(instanceConfig.WriteTimeout) * time.Millisecond, // ContextTimeoutEnabled: instanceConfig.ContextTimeoutEnabled, PoolFIFO: instanceConfig.PoolFIFO, PoolSize: instanceConfig.PoolSize, PoolTimeout: time.Duration(instanceConfig.PoolTimeout) * time.Millisecond, MinIdleConns: instanceConfig.MinIdleConn, // MaxIdleConns: instanceConfig.MaxIdleConn, // MaxActiveConns: instanceConfig.MaxActiveConn, // ConnMaxIdleTime: time.Duration(instanceConfig.ConnMaxIdleTime) * time.Second, // ConnMaxLifetime: time.Duration(instanceConfig.ConnMaxIdleTime) * time.Second, TLSConfig: nil, // DisableIndentity: instanceConfig.DisableIdentity, // IdentitySuffix: instanceConfig.IdentitySuffix, }) }