244 lines
7.8 KiB
Go
244 lines
7.8 KiB
Go
// Package mesh ...
|
|
//
|
|
// Description : mesh ...
|
|
//
|
|
// Author : go_developer@163.com<白茶清欢>
|
|
//
|
|
// Date : 2025-03-28 14:11
|
|
package mesh
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
"git.zhangdeman.cn/gateway/validate"
|
|
"git.zhangdeman.cn/zhangdeman/consts"
|
|
"git.zhangdeman.cn/zhangdeman/network/httpclient"
|
|
"git.zhangdeman.cn/zhangdeman/network/httpclient/define"
|
|
"git.zhangdeman.cn/zhangdeman/serialize"
|
|
)
|
|
|
|
func Request(req *RequestConfig, receiver any) *Response {
|
|
if nil == req.Ctx {
|
|
req.Ctx = context.Background()
|
|
}
|
|
c := &client{
|
|
resp: &Response{
|
|
IsSuccess: false,
|
|
ErrorCode: "",
|
|
ErrorMessage: "",
|
|
Raw: nil,
|
|
DataMap: nil,
|
|
AliasResultTable: make(map[string]*define.Response),
|
|
Lock: &sync.RWMutex{},
|
|
},
|
|
reqCfg: req,
|
|
receiver: receiver,
|
|
}
|
|
c.resp.AliasResultTable[CommonServiceAlias] = &define.Response{
|
|
Query: c.reqCfg.CommonParam[consts.RequestDataLocationQuery.String()],
|
|
Header: c.reqCfg.CommonParam[consts.RequestDataLocationHeader.String()],
|
|
Cookie: c.reqCfg.CommonParam[consts.RequestDataLocationCookie.String()],
|
|
Data: "",
|
|
Code: "",
|
|
Message: "",
|
|
Body: c.reqCfg.CommonParam[consts.RequestDataLocationBody.String()],
|
|
ExtendData: nil,
|
|
HttpCode: 0,
|
|
HttpCodeStatus: "",
|
|
ResponseDataRule: nil,
|
|
Seq: 0,
|
|
RequestStartTime: 0,
|
|
RequestFinishTime: 0,
|
|
UsedTime: 0,
|
|
RestyResponse: nil,
|
|
IsSuccess: false,
|
|
RequestCount: 0,
|
|
FailInfo: nil,
|
|
CacheInfo: nil,
|
|
}
|
|
return c.Request()
|
|
}
|
|
|
|
type client struct {
|
|
ctx context.Context
|
|
resp *Response
|
|
reqCfg *RequestConfig
|
|
receiver any
|
|
}
|
|
|
|
func (c *client) Request() *Response {
|
|
for _, itemGroup := range c.reqCfg.Group {
|
|
if !c.doRequest(itemGroup) {
|
|
break
|
|
}
|
|
}
|
|
if c.resp.FinalFailure {
|
|
// 请求失败
|
|
return c.resp
|
|
}
|
|
// 请求成功, 构建返回结果
|
|
respByte := serialize.JSON.MarshalForByteIgnoreError(c.resp.AliasResultTable)
|
|
fieldList := make([]validate.StructField, 0)
|
|
for _, item := range c.reqCfg.ResultRule {
|
|
fieldList = append(fieldList, validate.StructField{
|
|
JsonTag: "",
|
|
Type: item.DataType,
|
|
Required: false,
|
|
RuleList: nil,
|
|
DefaultValue: "",
|
|
SourcePath: fmt.Sprintf("%v.%v.%v", item.RequestAlias, item.RequestResultLocation.String(), item.RequestResultPath),
|
|
TargetPath: fmt.Sprintf("%v.%v", item.Location, item.DataPath),
|
|
Errmsg: "",
|
|
})
|
|
}
|
|
resByte, err := validate.Run(respByte, fieldList)
|
|
if nil != err {
|
|
c.resp.ErrorCode = "-500"
|
|
c.resp.ErrorMessage = err.Error()
|
|
return c.resp
|
|
}
|
|
c.resp.Raw = respByte // 原始返回数据
|
|
serialize.JSON.UnmarshalWithNumberIgnoreError(resByte, &c.resp.DataMap) // map结果
|
|
|
|
if nil != c.receiver {
|
|
// 解析到receiver
|
|
if err = serialize.JSON.UnmarshalWithNumber(resByte, c.receiver); nil != err {
|
|
c.resp.ErrorCode = "-500"
|
|
c.resp.ErrorMessage = err.Error()
|
|
return c.resp
|
|
}
|
|
}
|
|
return c.resp
|
|
}
|
|
|
|
// doRequest 返回是否继续
|
|
//
|
|
// Author : go_developer@163.com<白茶清欢>
|
|
//
|
|
// Date : 15:30 2025/3/28
|
|
func (c *client) doRequest(apiList []*RequestConfigGroupItem) bool {
|
|
if len(apiList) == 0 {
|
|
return true
|
|
}
|
|
// 初始化请求配置, 并获取 http client 实例
|
|
httpClientList := make([]*httpclient.HttpClient, 0)
|
|
for _, apiCfg := range apiList {
|
|
var (
|
|
err error
|
|
httpClient *httpclient.HttpClient
|
|
param map[string]map[string]any
|
|
)
|
|
// 初始化一下请求
|
|
c.initApiCfg(apiCfg)
|
|
// 构造生成请求参数
|
|
if param, err = c.buildRequestParams(apiCfg); nil != err {
|
|
// 构造请求参数失败, 直接返回
|
|
c.resp.ErrorCode = "-500"
|
|
c.resp.ErrorMessage = err.Error()
|
|
return false
|
|
}
|
|
apiCfg.RequestCfg.Body = param[strings.ToLower(consts.RequestDataLocationBody.String())] // body
|
|
apiCfg.RequestCfg.Header = param[strings.ToLower(consts.RequestDataLocationHeader.String())] // header
|
|
apiCfg.RequestCfg.Cookie = param[strings.ToLower(consts.RequestDataLocationCookie.String())] // cookie
|
|
apiCfg.RequestCfg.Query = param[strings.ToLower(consts.RequestDataLocationQuery.String())] // query
|
|
if httpClient, err = httpclient.NewHttpClient(apiCfg.RequestCfg, apiCfg.CacheInstance); nil != err {
|
|
// 此处获取客户端实例即发生异常, 忽略一切配置, 直接作为全局失败, 后续也不请求了
|
|
c.resp.ErrorCode = "-500"
|
|
c.resp.ErrorMessage = err.Error()
|
|
return false
|
|
}
|
|
httpClientList = append(httpClientList, httpClient)
|
|
}
|
|
isContinue := true
|
|
wg := &sync.WaitGroup{}
|
|
for idx, apiCfg := range apiList {
|
|
wg.Add(1)
|
|
go func(clientIdx int, apiCfg *RequestConfigGroupItem) {
|
|
defer func() {
|
|
if err := recover(); nil != err {
|
|
|
|
}
|
|
wg.Done()
|
|
}()
|
|
if c.resp.FinalFailure && !apiCfg.FinalFailureAllow {
|
|
// 已经最终失败, 并且最终失败后, 当前接口已经不允许请求, 不在进行请求
|
|
return
|
|
}
|
|
resp := httpClientList[clientIdx].Request()
|
|
c.resp.Lock.Lock()
|
|
defer c.resp.Lock.Unlock()
|
|
if !resp.IsSuccess {
|
|
// 判断是否已经是最终失败
|
|
if apiCfg.FailBehavior.FinalFailure || apiCfg.FailBehavior.Action == FailBehaviorError {
|
|
c.resp.FinalFailure = true
|
|
// 判断是否继续, 只能阻断后续分组请求,无法阻断当前租的请求
|
|
isContinue = FailBehaviorContinue == apiCfg.FailBehavior.Action
|
|
}
|
|
}
|
|
// 记录请求的信息
|
|
c.resp.AliasResultTable[apiCfg.Alias] = resp
|
|
}(idx, apiCfg)
|
|
}
|
|
wg.Wait()
|
|
return isContinue
|
|
}
|
|
|
|
func (c *client) initApiCfg(apiCfg *RequestConfigGroupItem) {
|
|
if apiCfg.FailBehavior == nil {
|
|
apiCfg.FailBehavior = &RequestConfigGroupItemFailBehavior{
|
|
Action: FailBehaviorError, // 默认失败终止请求
|
|
FinalFailure: true, // 默认一旦失败,则争个整个失败
|
|
}
|
|
}
|
|
if apiCfg.FailBehavior.Action == FailBehaviorError {
|
|
// 配置了请求失败则报错, 一定是导致结果最终失败
|
|
apiCfg.FailBehavior.FinalFailure = true
|
|
}
|
|
// 每一个请求有独立的context
|
|
apiCfg.RequestCfg.Ctx = context.WithValue(c.reqCfg.Ctx, "alias", apiCfg.Alias)
|
|
}
|
|
|
|
// buildRequestParams 构建请求参数 location => path: value
|
|
func (c *client) buildRequestParams(apiCfg *RequestConfigGroupItem) (map[string]map[string]any, error) {
|
|
sourceData := map[string]any{}
|
|
for aliasName, itemRes := range c.resp.AliasResultTable {
|
|
sourceData[aliasName] = map[string]any{
|
|
strings.ToLower(consts.ResponseDataLocationBody.String()): itemRes.Body[apiCfg.RequestCfg.DataField],
|
|
strings.ToLower(consts.ResponseDataLocationHeader.String()): itemRes.Header,
|
|
strings.ToLower(consts.ResponseDataLocationCookie.String()): itemRes.Cookie,
|
|
strings.ToLower(consts.RequestDataLocationQuery.String()): itemRes.Query,
|
|
}
|
|
}
|
|
sourceDataByte := serialize.JSON.MarshalForByteIgnoreError(sourceData)
|
|
validateRuleList := make([]validate.StructField, 0)
|
|
for _, itemParam := range apiCfg.ParamRuleList {
|
|
arr := strings.Split(itemParam.Path, ".")
|
|
sourcePath := itemParam.SourceResultPath
|
|
sourcePath = fmt.Sprintf("%v.%v.%v", itemParam.SourceAlias, strings.ToLower(itemParam.SourceResultLocation), itemParam.SourceResultPath)
|
|
targetPath := fmt.Sprintf("%v.%v", strings.ToLower(itemParam.Location.String()), itemParam.Path)
|
|
validateRuleList = append(validateRuleList, validate.StructField{
|
|
JsonTag: arr[len(arr)-1],
|
|
Type: itemParam.Type,
|
|
Required: false,
|
|
RuleList: nil,
|
|
DefaultValue: itemParam.DefaultValue,
|
|
SourcePath: sourcePath,
|
|
TargetPath: targetPath,
|
|
Errmsg: "",
|
|
})
|
|
}
|
|
|
|
buildRes, err := validate.Run(sourceDataByte, validateRuleList)
|
|
if nil != err {
|
|
return nil, err
|
|
}
|
|
var d map[string]map[string]any
|
|
if err = serialize.JSON.UnmarshalWithNumber(buildRes, &d); nil != err {
|
|
return nil, err
|
|
}
|
|
return d, err
|
|
}
|