// Package rpc ... // // Description : rpc ... // // Author : go_developer@163.com<白茶清欢> // // Date : 2022-06-29 15:21 package rpc import ( "bytes" "compress/gzip" "encoding/json" "errors" "fmt" "io" "net/http" "path/filepath" "strings" "sync" "time" "github.com/tidwall/gjson" "git.zhangdeman.cn/zhangdeman/util" "github.com/ddliu/go-httpclient" "github.com/gin-gonic/gin" "go.uber.org/zap" ) var ( // Request 请求实例 Request *request ) // InitRPC 初始化RPC服务 // // Author : go_developer@163.com<白茶清欢> // // Date : 15:23 2022/6/29 func InitRPC(serviceTable map[string]*Service, loggerInstance *zap.Logger) error { if nil == serviceTable { serviceTable = make(map[string]*Service) } for _, item := range serviceTable { if item.ApiTable == nil { item.ApiTable = make(map[string]*Api) } } Request = &request{ logger: loggerInstance, serviceTable: serviceTable, lock: &sync.RWMutex{}, } return nil } // InitRPCFromCfgDir 使用RPC配置文件路径初始化RPC // // Author : go_developer@163.com<白茶清欢> // // Date : 17:41 2022/7/1 func InitRPCFromCfgDir(cfgDir string, logger *zap.Logger) error { serviceTable := make(map[string]*Service) filepathNames, _ := filepath.Glob(filepath.Join(cfgDir, "*")) for i := range filepathNames { var ( serviceInfo Service err error ) if err = util.File.ReadAnyFileContent(filepathNames[i], &serviceInfo); nil != err { return err } serviceTable[serviceInfo.Flag] = &serviceInfo } return InitRPC(serviceTable, logger) } // InitRPCFromCfgFile 从配置文件初始化RPC服务 // // Author : go_developer@163.com<白茶清欢> // // Date : 17:42 2022/7/1 func InitRPCFromCfgFile(cfgFile string, logger *zap.Logger) error { var ( serviceTable map[string]*Service err error ) if err = util.File.ReadAnyFileContent(cfgFile, &serviceTable); nil != err { return err } return InitRPC(serviceTable, logger) } type request struct { logger *zap.Logger serviceTable map[string]*Service lock *sync.RWMutex } // AddService 新增一个服务 // // Author : go_developer@163.com<白茶清欢> // // Date : 17:13 2022/6/29 func (r *request) AddService(serviceInfo *Service) error { if nil == serviceInfo { return errors.New("service info is nil") } if nil == serviceInfo.SuccessCodeList { serviceInfo.SuccessCodeList = []string{"0"} } if nil == serviceInfo.SuccessHttpCodeList { serviceInfo.SuccessHttpCodeList = []int{http.StatusOK} } if len(serviceInfo.CodeField) == 0 { serviceInfo.CodeField = DefaultCodeField } if len(serviceInfo.DataField) == 0 { serviceInfo.DataField = DefaultDataField } if len(serviceInfo.MessageField) == 0 { serviceInfo.MessageField = DefaultMessageField } r.lock.Lock() defer r.lock.Unlock() if _, exist := r.serviceTable[serviceInfo.Flag]; exist { return errors.New(serviceInfo.Flag + " already exist") } r.serviceTable[serviceInfo.Flag] = serviceInfo return nil } // GetServiceInfo ... // // Author : go_developer@163.com<白茶清欢> // // Date : 11:32 2022/6/30 func (r *request) GetServiceInfo(serviceFlag string) (*Service, error) { var ( serviceInfo *Service exist bool ) r.lock.RLock() defer r.lock.RUnlock() if serviceInfo, exist = r.serviceTable[serviceFlag]; !exist { return nil, errors.New(serviceFlag + " -> 服务不存在") } return serviceInfo, nil } // RemoveService 删除一个service // // Author : go_developer@163.com<白茶清欢> // // Date : 11:46 2022/6/30 func (r *request) RemoveService(serviceFlag string) { r.lock.Lock() defer r.lock.Unlock() delete(r.serviceTable, serviceFlag) } // AddServiceApi 向一个service中增加Api // // Author : go_developer@163.com<白茶清欢> // // Date : 11:26 2022/6/30 func (r *request) AddServiceApi(serviceFlag string, apiConfig *Api) error { var ( serviceInfo *Service err error ) if serviceInfo, err = r.GetServiceInfo(serviceFlag); nil != err { return err } if len(apiConfig.SuccessCodeList) == 0 { apiConfig.SuccessCodeList = serviceInfo.SuccessCodeList } if len(apiConfig.SuccessHttpCodeList) == 0 { apiConfig.SuccessHttpCodeList = serviceInfo.SuccessHttpCodeList } if len(apiConfig.CodeField) == 0 { apiConfig.CodeField = serviceInfo.CodeField } if len(apiConfig.DataField) == 0 { apiConfig.DataField = serviceInfo.DataField } if len(apiConfig.MessageField) == 0 { apiConfig.MessageField = serviceInfo.MessageField } r.lock.Lock() defer r.lock.Unlock() if nil == serviceInfo.ApiTable { serviceInfo.ApiTable = make(map[string]*Api) } serviceInfo.ApiTable[apiConfig.Flag] = apiConfig return nil } // GetServiceApi 获取服务api配置 // // Author : go_developer@163.com<白茶清欢> // // Date : 11:53 2022/6/30 func (r *request) GetServiceApi(serviceFlag string, apiFlag string) (*Service, *Api, error) { var ( serviceInfo *Service err error exist bool apiInfo *Api ) if serviceInfo, err = r.GetServiceInfo(serviceFlag); nil != err { return nil, nil, err } r.lock.RLock() defer r.lock.RUnlock() if apiInfo, exist = serviceInfo.ApiTable[apiFlag]; !exist { return nil, nil, errors.New(serviceFlag + " : " + apiFlag + " -> api") } return serviceInfo, apiInfo, nil } // RemoveServiceApi 删除服务下的一个api // // Author : go_developer@163.com<白茶清欢> // // Date : 12:02 2022/6/30 func (r *request) RemoveServiceApi(serviceFlag string, apiFlag string) { var ( serviceInfo *Service err error ) if serviceInfo, _, err = r.GetServiceApi(serviceFlag, apiFlag); nil != err { // 不存在无需处理 return } r.lock.Lock() defer r.lock.Unlock() delete(serviceInfo.ApiTable, apiFlag) } // Get ... // // Author : go_developer@163.com<白茶清欢> // // Date : 14:25 2022/6/30 func (r *request) Get() error { return nil } // Send 统一的发送请求方法 // // Author : go_developer@163.com<白茶清欢> // // Date : 14:24 2022/6/30 func (r *request) Send(ctx *gin.Context, serviceFlag string, apiFlag string, parameter map[string]interface{}, header map[string]string, receiver interface{}) error { var ( serviceConfig *Service apiConfig *Api err error fullURL string client *httpclient.HttpClient body []byte response *httpclient.Response responseBody []byte code, message, data string ) // 日志数据 logDataList := []zap.Field{ zap.String("service_flag", serviceFlag), zap.String("service_api_flag", apiFlag), zap.Any("input_param", parameter), } defer func() { // 记录请求日志 r.loggerRequest(ctx, logDataList...) }() if serviceConfig, apiConfig, err = r.GetServiceApi(serviceFlag, apiFlag); nil != err { logDataList = append(logDataList, zap.Any("read_api_config_fail", err.Error())) return err } logDataList = append(logDataList, zap.Any("api_config", apiConfig)) // 完整的请求地址 fullURL, body = r.getFullURLAndBody(serviceConfig, apiConfig, parameter) logDataList = append(logDataList, zap.String("full_utl", fullURL)) fullHeader := make(map[string]string) for k, v := range apiConfig.Header { fullHeader[k] = v } for k, v := range header { fullHeader[k] = v } // 获取客户端 client = r.GetHttpClient(fullHeader, apiConfig.Timeout) var bodyReader io.Reader if nil != body { logDataList = append(logDataList, zap.String("request_body", string(body))) bodyReader = bytes.NewReader(body) } requestStartTime := time.Now().UnixNano() / 1e6 requestFinishTime := int64(0) logDataList = append(logDataList, zap.Int64("start_time", requestStartTime)) if response, err = client.Do(apiConfig.Method, fullURL, apiConfig.Header, bodyReader); nil != err { requestFinishTime = time.Now().UnixNano() / 1e6 logDataList = append(logDataList, zap.Int64("finish_time", requestFinishTime)) logDataList = append(logDataList, zap.Int64("used_time", requestFinishTime-requestStartTime)) logDataList = append(logDataList, zap.String("request_fail_reason", err.Error())) return err } requestFinishTime = time.Now().UnixNano() / 1e6 logDataList = append(logDataList, zap.Int64("finish_time", requestFinishTime)) logDataList = append(logDataList, zap.Int64("used_time", requestFinishTime-requestStartTime)) logDataList = append(logDataList, zap.Any("response_header", response.Header)) logDataList = append(logDataList, zap.Any("response_http_code", response.StatusCode)) if responseBody, err = r.getResponseBody(response); nil != err { logDataList = append(logDataList, zap.String("read_body_fail_reason", err.Error())) return err } logDataList = append(logDataList, zap.Any("response_body", string(responseBody))) responseFieldCfg := r.getCodeAndMessageAndDataField(serviceConfig, apiConfig) successHttpCodeList, successBusinessCodeList := r.getSuccessHttpCodeAndSuccessBusinessCode(serviceConfig, apiConfig) if !r.httpCodeIsSuccess(response.StatusCode, successHttpCodeList) { return fmt.Errorf("HTTP状态码异常 : %v -> %v", response.StatusCode, response.Status) } // 解析响应的业务数据 code, message, data = r.getCodeAndMessageAndData(responseBody, responseFieldCfg) if !r.codeIsSuccess(code, successBusinessCodeList) { return fmt.Errorf("业务状态码异常 : %v -> %v", code, message) } if nil == receiver { // 数据接收指针为 nil , 则认为状态码为成功既是成功 return nil } if err = parseResponseBody(response.Header.Get("Content-Type"), []byte(data), receiver); nil != err { logDataList = append(logDataList, zap.Any("response_body_parse_fail_reason", err.Error())) } return err } // getResponseBody 获取响应体 // // Author : go_developer@163.com<白茶清欢> // // Date : 3:10 2022/10/4 func (r *request) getResponseBody(response *httpclient.Response) ([]byte, error) { var ( responseBody []byte err error gzipInstance *gzip.Reader buf bytes.Buffer ) if responseBody, err = io.ReadAll(response.Body); nil != err { return nil, err } // 判断返回值是否经过gzip压缩 responseGzip := response.Header.Get("content-encoding") if strings.ToLower(responseGzip) == "gzip" { // 压缩过的数据,在处理response body if gzipInstance, err = gzip.NewReader(bytes.NewReader(responseBody)); nil != err { return nil, err } defer func() { _ = gzipInstance.Close() }() if _, err = io.Copy(&buf, gzipInstance); nil != err { return nil, err } responseBody = buf.Bytes() } // 判断是否为gbk编码,若是,转为utf8 isGBK := strings.Contains(strings.ToLower(response.Header.Get("content-type")), "gbk") if isGBK { return []byte(util.String.Convert(string(responseBody), "gbk", "utf-8")), nil } // 判断是否为gb2312,若是,转为utf8 isGB2312 := strings.Contains(strings.ToLower(response.Header.Get("content-type")), "gb2312") if isGB2312 { return []byte(util.String.Convert(string(responseBody), "gb2312", "utf-8")), nil } return responseBody, nil } // GetHttpClient 获取client实例 // // Author : go_developer@163.com<白茶清欢> // // Date : 17:00 2022/6/30 func (r *request) GetHttpClient(header map[string]string, timeout ApiTimeout) *httpclient.HttpClient { client := httpclient.NewHttpClient() if timeout.Connect <= 0 { timeout.Connect = DefaultConnectTimeout } if timeout.Read <= 0 { timeout.Read = DefaultReadTimeout } client.WithHeaders(header) client.WithOption(httpclient.OPT_CONNECTTIMEOUT_MS, timeout.Connect) client.WithOption(httpclient.OPT_TIMEOUT_MS, timeout.Read) return client } // getFullURL ... // // Author : go_developer@163.com<白茶清欢> // // Date : 17:23 2022/6/30 func (r *request) getFullURLAndBody(serviceConfig *Service, apiConfig *Api, parameter map[string]interface{}) (string, []byte) { fullURL := strings.TrimRight(serviceConfig.Domain, "/") + "/" + strings.TrimLeft(apiConfig.URI, "/") for name, val := range parameter { paramTpl := "{{" + name + "}}" if !strings.Contains(fullURL, paramTpl) { continue } fullURL = strings.ReplaceAll(fullURL, paramTpl, fmt.Sprintf("%v", val)) // 替换到URL的参数, 从body删除 delete(parameter, name) } parameterPair := make([]string, 0) var body []byte switch strings.ToUpper(apiConfig.Method) { case http.MethodGet: fallthrough case http.MethodHead: fallthrough case http.MethodOptions: fallthrough case http.MethodConnect: fallthrough case http.MethodPatch: fallthrough case http.MethodTrace: for name, val := range parameter { var valStr string _ = util.ConvertAssign(&valStr, val) parameterPair = append(parameterPair, fmt.Sprintf("%v=%v", name, valStr)) } case http.MethodPost: fallthrough case http.MethodPut: fallthrough case http.MethodDelete: body, _ = json.Marshal(parameter) } query := strings.Join(parameterPair, "&") if len(query) == 0 { return fullURL, body } return fullURL + "?" + query, body } // validateResponseHttpCode 验证http状态码 // // Author : go_developer@163.com<白茶清欢> // // Date : 18:18 2022/6/30 func (r *request) validateResponseHttpCode(apiConfig *Api, response *httpclient.Response) error { // 判断状态码 isHttpSuccess := false for _, successCode := range apiConfig.SuccessHttpCodeList { if successCode == response.StatusCode { isHttpSuccess = true break } } if !isHttpSuccess { return fmt.Errorf("http响应状态码异常 -> %v", response.StatusCode) } return nil } // getCodeAndMessageAndData 读取业务状态码 + 文案 + 数据 // // Author : go_developer@163.com<白茶清欢> // // Date : 18:20 2022/6/30 func (r *request) getCodeAndMessageAndData(responseBody []byte, responseFieldCfg responseDaFieldConfig) (string, string, string) { var ( code = SpecialFiledVal message = SpecialFiledVal data string ) if responseFieldCfg.Code != SpecialFiledVal { code = gjson.Get(string(responseBody), responseFieldCfg.Code).String() } if responseFieldCfg.Message != SpecialFiledVal { message = gjson.Get(string(responseBody), responseFieldCfg.Message).String() } if responseFieldCfg.Data == SpecialFiledVal { data = string(responseBody) } else { data = gjson.Get(string(responseBody), responseFieldCfg.Data).String() } return code, message, data } // codeIsSuccess 判断业务状态码是否为成功 // // Author : go_developer@163.com<白茶清欢> // // Date : 18:27 2022/6/30 func (r *request) codeIsSuccess(input string, successCodeList []string) bool { if input == SpecialFiledVal { // 不需要验证状态码,直接返回成功 return true } for _, item := range successCodeList { if item == input { return true } } return false } // httpCodeIsSuccess http状态码是否为成功 // // Author : go_developer@163.com<白茶清欢> // // Date : 18:31 2022/6/30 func (r *request) httpCodeIsSuccess(input int, successCodeList []int) bool { for _, item := range successCodeList { if item == input { return true } } return false } // getSuccessHttpCodeAndSuccessBusinessCode 获取成功的http 和 业务状态码列表 // // Author : go_developer@163.com<白茶清欢> // // Date : 16:48 2022/7/1 func (r *request) getSuccessHttpCodeAndSuccessBusinessCode(serviceInfo *Service, apiInfo *Api) ([]int, []string) { var ( successHttpCodeList = apiInfo.SuccessHttpCodeList successBusinessCodeList = apiInfo.SuccessCodeList ) if len(apiInfo.SuccessHttpCodeList) == 0 { successHttpCodeList = serviceInfo.SuccessHttpCodeList } if len(apiInfo.SuccessHttpCodeList) == 0 { successBusinessCodeList = serviceInfo.SuccessCodeList } if len(successHttpCodeList) == 0 { successHttpCodeList = []int{http.StatusOK} } if len(successBusinessCodeList) == 0 { successBusinessCodeList = []string{"0"} } return successHttpCodeList, successBusinessCodeList } // getCodeAndMessageAndDataField 获取三个字段信息 // // Author : go_developer@163.com<白茶清欢> // // Date : 16:54 2022/7/1 func (r *request) getCodeAndMessageAndDataField(serviceInfo *Service, apiInfo *Api) responseDaFieldConfig { cfg := responseDaFieldConfig{ Code: apiInfo.CodeField, Message: apiInfo.MessageField, Data: apiInfo.DataField, } if len(cfg.Code) == 0 { cfg.Code = serviceInfo.CodeField } if len(cfg.Message) == 0 { cfg.Message = serviceInfo.MessageField } if len(cfg.Data) == 0 { cfg.Data = serviceInfo.DataField } return cfg } // loggerRequest 记录请求日志 // // Author : go_developer@163.com<白茶清欢> // // Date : 17:54 2022/7/1 func (r *request) loggerRequest(ctx *gin.Context, dataList ...zap.Field) { if nil == r.logger { // 未设置日志实例 return } r.logger.Info("API接口请求日志记录", dataList...) }