// Package mesh ... // // Description : mesh ... // // Author : go_developer@163.com<白茶清欢> // // Date : 2025-03-28 14:11 package mesh import ( "context" "fmt" "strings" "sync" "git.zhangdeman.cn/gateway/validate" "git.zhangdeman.cn/zhangdeman/consts" "git.zhangdeman.cn/zhangdeman/network/httpclient" "git.zhangdeman.cn/zhangdeman/network/httpclient/define" "git.zhangdeman.cn/zhangdeman/serialize" ) func Request(req *RequestConfig, receiver any) *Response { if nil == req.Ctx { req.Ctx = context.Background() } c := &client{ resp: &Response{ IsSuccess: false, ErrorCode: "", ErrorMessage: "", Raw: nil, DataMap: nil, AliasResultTable: make(map[string]*define.Response), Lock: &sync.RWMutex{}, }, reqCfg: req, receiver: receiver, } c.resp.AliasResultTable[CommonServiceAlias] = &define.Response{ Query: c.reqCfg.CommonParam[consts.RequestDataLocationQuery.String()], Header: c.reqCfg.CommonParam[consts.RequestDataLocationHeader.String()], Cookie: c.reqCfg.CommonParam[consts.RequestDataLocationCookie.String()], Data: "", Code: "", Message: "", Body: c.reqCfg.CommonParam[consts.RequestDataLocationBody.String()], ExtendData: nil, HttpCode: 0, HttpCodeStatus: "", ResponseDataRule: nil, Seq: 0, RequestStartTime: 0, RequestFinishTime: 0, UsedTime: 0, RestyResponse: nil, IsSuccess: false, RequestCount: 0, FailInfo: nil, CacheInfo: nil, } return c.Request() } type client struct { ctx context.Context resp *Response reqCfg *RequestConfig receiver any } func (c *client) Request() *Response { for _, itemGroup := range c.reqCfg.Group { if !c.doRequest(itemGroup) { break } } if c.resp.FinalFailure { // 请求失败 return c.resp } // 请求成功, 构建返回结果 respByte := serialize.JSON.MarshalForByteIgnoreError(c.resp.AliasResultTable) fieldList := make([]validate.StructField, 0) for _, item := range c.reqCfg.ResultRule { pathArr := strings.Split(item.DataPath, ".") fieldList = append(fieldList, validate.StructField{ JsonTag: pathArr[len(pathArr)-1], Type: item.DataType, Required: false, RuleList: nil, DefaultValue: "", SourcePath: fmt.Sprintf("%v.%v.%v", item.RequestAlias, item.RequestResultLocation.String(), item.RequestResultPath), TargetPath: fmt.Sprintf("%v.%v", item.Location, item.DataPath), Errmsg: "", }) } validateRes, err := validate.Run(respByte, fieldList) if nil != err { c.resp.ErrorCode = "-500" c.resp.ErrorMessage = err.Error() return c.resp } c.resp.Raw = respByte // 原始返回数据 c.resp.DataMap, _ = validateRes.Map("json") // serialize.JSON.UnmarshalWithNumberIgnoreError(resByte, &c.resp.DataMap) // map结果 if nil != c.receiver { // 解析到receiver if err = validateRes.Transform("json", c.receiver); nil != err { c.resp.ErrorCode = "-500" c.resp.ErrorMessage = err.Error() return c.resp } } c.resp.IsSuccess = true return c.resp } // doRequest 返回是否继续 // // Author : go_developer@163.com<白茶清欢> // // Date : 15:30 2025/3/28 func (c *client) doRequest(apiList []*RequestConfigGroupItem) bool { if len(apiList) == 0 { return true } // 初始化请求配置, 并获取 http client 实例 httpClientList := make([]*httpclient.HttpClient, 0) for _, apiCfg := range apiList { var ( err error httpClient *httpclient.HttpClient param map[string]map[string]any ) // 初始化一下请求 c.initApiCfg(apiCfg) // 构造生成请求参数 if param, err = c.buildRequestParams(apiCfg); nil != err { // 构造请求参数失败, 直接返回 c.resp.ErrorCode = "-500" c.resp.ErrorMessage = err.Error() return false } apiCfg.RequestCfg.Body = param[strings.ToLower(consts.RequestDataLocationBody.String())] // body apiCfg.RequestCfg.Header = param[strings.ToLower(consts.RequestDataLocationHeader.String())] // header apiCfg.RequestCfg.Cookie = param[strings.ToLower(consts.RequestDataLocationCookie.String())] // cookie apiCfg.RequestCfg.Query = param[strings.ToLower(consts.RequestDataLocationQuery.String())] // query if httpClient, err = httpclient.NewHttpClient(apiCfg.RequestCfg, apiCfg.CacheInstance); nil != err { // 此处获取客户端实例即发生异常, 忽略一切配置, 直接作为全局失败, 后续也不请求了 c.resp.ErrorCode = "-500" c.resp.ErrorMessage = err.Error() return false } httpClientList = append(httpClientList, httpClient) } isContinue := true wg := &sync.WaitGroup{} for idx, apiCfg := range apiList { wg.Add(1) go func(clientIdx int, apiCfg *RequestConfigGroupItem) { defer func() { if err := recover(); nil != err { } wg.Done() }() if c.resp.FinalFailure && !apiCfg.FinalFailureAllow { // 已经最终失败, 并且最终失败后, 当前接口已经不允许请求, 不在进行请求 return } resp := httpClientList[clientIdx].Request() c.resp.Lock.Lock() defer c.resp.Lock.Unlock() if !resp.IsSuccess { // 哪些接口请求失败: 注意,接口请求失败不代表整体失败,接口可能针对失败配置规则 continue c.resp.FailApiAlias = append(c.resp.FailApiAlias, apiCfg.Alias) // 判断是否已经是最终失败 if apiCfg.FailBehavior.Action == FailBehaviorError { c.resp.FinalFailure = true c.resp.FailureApiAlias = apiCfg.Alias // 导致最终请求失败的接口 // 判断是否继续, 只能阻断后续分组请求,无法阻断当前租的请求 isContinue = FailBehaviorContinue == apiCfg.FailBehavior.Action } } // 记录请求的信息 c.resp.AliasResultTable[apiCfg.Alias] = resp }(idx, apiCfg) } wg.Wait() return isContinue } func (c *client) initApiCfg(apiCfg *RequestConfigGroupItem) { if apiCfg.FailBehavior == nil { apiCfg.FailBehavior = &RequestConfigGroupItemFailBehavior{ Action: FailBehaviorError, // 默认失败终止请求 } } // 每一个请求有独立的context apiCfg.RequestCfg.Ctx = context.WithValue(c.reqCfg.Ctx, "alias", apiCfg.Alias) } // buildRequestParams 构建请求参数 location => path: value func (c *client) buildRequestParams(apiCfg *RequestConfigGroupItem) (map[string]map[string]any, error) { sourceData := map[string]any{} for aliasName, itemRes := range c.resp.AliasResultTable { sourceData[aliasName] = map[string]any{ strings.ToLower(consts.ResponseDataLocationBody.String()): itemRes.Body[apiCfg.RequestCfg.DataField], strings.ToLower(consts.ResponseDataLocationHeader.String()): itemRes.Header, strings.ToLower(consts.ResponseDataLocationCookie.String()): itemRes.Cookie, strings.ToLower(consts.RequestDataLocationQuery.String()): itemRes.Query, } } sourceDataByte := serialize.JSON.MarshalForByteIgnoreError(sourceData) validateRuleList := make([]validate.StructField, 0) for _, itemParam := range apiCfg.ParamRuleList { arr := strings.Split(itemParam.Path, ".") sourcePath := itemParam.SourceResultPath sourcePath = fmt.Sprintf("%v.%v.%v", itemParam.SourceAlias, strings.ToLower(itemParam.SourceResultLocation), itemParam.SourceResultPath) targetPath := fmt.Sprintf("%v.%v", strings.ToLower(itemParam.Location.String()), itemParam.Path) validateRuleList = append(validateRuleList, validate.StructField{ JsonTag: arr[len(arr)-1], Type: itemParam.Type, Required: false, RuleList: nil, DefaultValue: itemParam.DefaultValue, SourcePath: sourcePath, TargetPath: targetPath, Errmsg: "", }) } buildRes, err := validate.Run(sourceDataByte, validateRuleList) if nil != err { return nil, err } var d map[string]map[string]any if err = buildRes.Transform("json", &d); nil != err { return nil, err } return d, err }