// Package redis ... // // Description : redis 客户端 // // Author : go_developer@163.com<白茶清欢> // // Date : 2021-02-27 4:49 下午 package redis import ( "context" "strings" "sync" "time" "git.zhangdeman.cn/zhangdeman/redis/abstract" "git.zhangdeman.cn/zhangdeman/redis/define" wrapperOperate "git.zhangdeman.cn/zhangdeman/wrapper" "github.com/pkg/errors" redisClient "github.com/redis/go-redis/v9" "go.uber.org/zap" ) var ( Client abstract.IRedisClient ) func init() { Client = &OwnClient{ lock: &sync.RWMutex{}, instanceTable: make(map[string]*redisClient.ClusterClient), whiteCommandTable: make(map[string]bool), } } type OwnClient struct { lock *sync.RWMutex instanceTable map[string]*redisClient.ClusterClient whiteCommandTable map[string]bool logger *zap.Logger } func (o *OwnClient) isAllowCommand(command string) bool { if len(o.whiteCommandTable) == 0 { // 未配置, 视为全部允许执行 return true } if o.whiteCommandTable["*"] { // 配置了 * 视为全部允许执行 return true } command = strings.ToLower(strings.TrimSpace(command)) o.lock.RLock() defer o.lock.RUnlock() return o.whiteCommandTable[command] } // Exec 执行命令 // // Author : go_developer@163.com<白茶清欢> // // Date : 11:05 2024/6/19 func (o *OwnClient) Exec(ctx context.Context, instanceFlag string, command string, args ...any) *define.RedisResult { var ( instance *redisClient.ClusterClient ) cmdParamList := []any{ command, } argStrList := make([]string, 0) for _, itemArg := range args { argStrList = append(argStrList, wrapperOperate.AnyDataType(itemArg).ToString().Value()) cmdParamList = append(cmdParamList, itemArg) } res := &define.RedisResult{ StartTime: time.Now().UnixMilli(), FinishTime: 0, UsedTime: 0, Result: "", Command: command, ArgList: argStrList, Err: nil, } defer func() { res.FinishTime = time.Now().UnixMilli() res.UsedTime = res.FinishTime - res.StartTime if nil == o.logger { // 未注入日志实例 return } o.logger.Info( "Redis命令执行记录", zap.Int64("start_time", res.StartTime), zap.Int64("finish_time", res.FinishTime), zap.Int64("used_time", res.UsedTime), zap.String("command", res.Command), zap.String("arg_list", strings.Join(res.ArgList, " ")), zap.String("execute_result", res.Result), zap.Error(res.Err), ) }() if instance, res.Err = o.GetRealClientWithError(instanceFlag); nil != res.Err { return res } if nil == ctx { ctx = context.Background() } cmdRes := instance.Do(ctx, cmdParamList...) if res.Err = cmdRes.Err(); nil != res.Err { return res } res.Result = cmdRes.String() return res } // SetCommandWhiteList 设置命令白名单, 空 或者 包含 * 则认为所有命令均允许执行 // // Author : go_developer@163.com<白茶清欢> // // Date : 11:02 2024/6/19 func (o *OwnClient) SetCommandWhiteList(commandList []string) { o.lock.Lock() defer o.lock.Unlock() for _, itemCommand := range commandList { o.whiteCommandTable[strings.ToLower(strings.TrimSpace(itemCommand))] = true } } func (o *OwnClient) GetRealClient(instanceFlag string) *redisClient.ClusterClient { o.lock.RLock() defer o.lock.RUnlock() return o.instanceTable[instanceFlag] } func (o *OwnClient) GetRealClientWithError(instanceFlag string) (*redisClient.ClusterClient, error) { o.lock.RLock() defer o.lock.RUnlock() instance, exist := o.instanceTable[instanceFlag] if !exist { return nil, errors.New(instanceFlag + " : redis instance is not found") } return instance, nil } func (o *OwnClient) AddClient(instanceFlag string, instanceConfig *define.ClusterOptions) error { instance := redisClient.NewClusterClient(&redisClient.ClusterOptions{ Addrs: instanceConfig.Addrs, // ClientName: instanceConfig.ClientName, NewClient: nil, MaxRedirects: instanceConfig.MaxRedirects, ReadOnly: instanceConfig.ReadOnly, RouteByLatency: instanceConfig.RouteByLatency, RouteRandomly: instanceConfig.RouteRandomly, ClusterSlots: nil, Dialer: nil, OnConnect: func(ctx context.Context, cn *redisClient.Conn) error { return nil }, // Protocol: 0, Username: instanceConfig.Username, Password: instanceConfig.Password, // CredentialsProvider: nil, // CredentialsProviderContext: nil, MaxRetries: instanceConfig.MaxRetries, MinRetryBackoff: time.Duration(instanceConfig.MinRetryBackoff) * time.Millisecond, MaxRetryBackoff: time.Duration(instanceConfig.MaxRetryBackoff) * time.Millisecond, DialTimeout: time.Duration(instanceConfig.DialTimeout) * time.Millisecond, ReadTimeout: time.Duration(instanceConfig.ReadTimeout) * time.Millisecond, WriteTimeout: time.Duration(instanceConfig.WriteTimeout) * time.Millisecond, // ContextTimeoutEnabled: instanceConfig.ContextTimeoutEnabled, PoolFIFO: instanceConfig.PoolFIFO, PoolSize: instanceConfig.PoolSize, PoolTimeout: time.Duration(instanceConfig.PoolTimeout) * time.Millisecond, MinIdleConns: instanceConfig.MinIdleConn, // MaxIdleConns: instanceConfig.MaxIdleConn, // MaxActiveConns: instanceConfig.MaxActiveConn, // ConnMaxIdleTime: time.Duration(instanceConfig.ConnMaxIdleTime) * time.Second, // ConnMaxLifetime: time.Duration(instanceConfig.ConnMaxIdleTime) * time.Second, TLSConfig: nil, // DisableIndentity: instanceConfig.DisableIdentity, // IdentitySuffix: instanceConfig.IdentitySuffix, }) o.lock.Lock() defer o.lock.Unlock() o.instanceTable[instanceFlag] = instance return nil } func (o *OwnClient) RemoveClient(instanceFlag string) { o.lock.Lock() defer o.lock.Unlock() delete(o.instanceTable, instanceFlag) } func (o *OwnClient) SetLogger(loggerInstance *zap.Logger) { o.logger = loggerInstance }