212 lines
6.7 KiB
Go

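// Package mesh orchestrates grouped HTTP API requests: groups run one after another,
// the requests inside a group run concurrently, and each request can build its
// parameters from the results of requests that finished earlier.
//
// Description : request orchestration on top of the httpclient package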
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2025-03-28 14:11
package mesh
import (
"context"
"fmt"
"git.zhangdeman.cn/gateway/validate"
"git.zhangdeman.cn/zhangdeman/consts"
"git.zhangdeman.cn/zhangdeman/network/httpclient"
"git.zhangdeman.cn/zhangdeman/network/httpclient/define"
"git.zhangdeman.cn/zhangdeman/serialize"
"strings"
"sync"
)
// Request runs every request group described by req and returns the aggregated response.
//
// Parameters:
//   - req: the orchestration config. A nil req yields a failed Response (error code
//     "-500") instead of a nil-pointer panic. When req.Ctx is nil it is replaced with
//     context.Background(), i.e. req is mutated.
//   - receiver: stored on the internal client; it is not read by this function.
//
// Before any group runs, the values of req.CommonParam are registered in the alias
// result table under CommonServiceAlias.
func Request(req *RequestConfig, receiver any) *Response {
	resp := &Response{
		IsSuccess:        false,
		ErrorCode:        "",
		ErrorMessage:     "",
		Raw:              nil,
		DataMap:          nil,
		AliasResultTable: make(map[string]*define.Response),
		Lock:             &sync.RWMutex{},
	}
	if nil == req {
		// Without a config there is nothing to execute; report it instead of panicking.
		resp.ErrorCode = "-500"
		resp.ErrorMessage = "request config is nil"
		resp.FinalFailure = true
		return resp
	}
	if nil == req.Ctx {
		req.Ctx = context.Background()
	}
	c := &client{
		resp:     resp,
		reqCfg:   req,
		receiver: receiver,
	}
	// Expose the common parameters as a pseudo result keyed by CommonServiceAlias.
	c.resp.AliasResultTable[CommonServiceAlias] = &define.Response{
		Query:             c.reqCfg.CommonParam[consts.RequestDataLocationQuery.String()],
		Header:            c.reqCfg.CommonParam[consts.RequestDataLocationHeader.String()],
		Cookie:            c.reqCfg.CommonParam[consts.RequestDataLocationCookie.String()],
		Data:              "",
		Code:              "",
		Message:           "",
		Body:              c.reqCfg.CommonParam[consts.RequestDataLocationBody.String()],
		ExtendData:        nil,
		HttpCode:          0,
		HttpCodeStatus:    "",
		ResponseDataRule:  nil,
		Seq:               0,
		RequestStartTime:  0,
		RequestFinishTime: 0,
		UsedTime:          0,
		RestyResponse:     nil,
		IsSuccess:         false,
		RequestCount:      0,
		FailInfo:          nil,
		CacheInfo:         nil,
	}
	return c.Request()
}
// client carries the state of a single Request call.
type client struct {
// ctx is neither assigned nor read by the code visible in this file section.
ctx context.Context
// resp is the aggregated response; the request goroutines update it under resp.Lock.
resp *Response
// reqCfg is the caller-supplied orchestration config.
reqCfg *RequestConfig
// receiver is the caller-supplied receiver passed to Request.
// NOTE(review): it is not read by the code visible here — confirm intended use.
receiver any
}
// Request executes the configured request groups in order and returns the shared response.
//
// Groups run one after another; iteration stops as soon as doRequest reports that
// later groups must not run. Afterwards IsSuccess is derived from the collected
// state: the run counts as successful only when no request caused a final failure
// and no error code was recorded.
func (c *client) Request() *Response {
	for _, itemGroup := range c.reqCfg.Group {
		if !c.doRequest(itemGroup) {
			break
		}
	}
	// FinalFailure is set by a failed request whose fail behavior is fatal; a non-empty
	// ErrorCode is set when a request could not even be prepared.
	c.resp.IsSuccess = !c.resp.FinalFailure && c.resp.ErrorCode == ""
	return c.resp
}
// doRequest runs one request group and reports whether later groups may still run.
//
// All requests of the group are prepared first (config defaults, parameters, http
// client); a failure in that phase records error code "-500" on the response and
// returns false. The prepared requests are then sent concurrently and each result is
// stored in the alias result table under the item's alias.
//
// A failed request can only block the following groups, never the siblings of its
// own group. Once any request asks to stop, the decision is kept even if another
// failing request of the same group is configured to continue.
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 15:30 2025/3/28
func (c *client) doRequest(apiList []*RequestConfigGroupItem) bool {
	if len(apiList) == 0 {
		return true
	}
	// Initialise the request configs and obtain one http client per request.
	httpClientList := make([]*httpclient.HttpClient, 0, len(apiList))
	for _, apiCfg := range apiList {
		var (
			err        error
			httpClient *httpclient.HttpClient
			param      map[string]map[string]any
		)
		c.initApiCfg(apiCfg)
		// Build the request parameters from the results collected so far.
		if param, err = c.buildRequestParams(apiCfg); nil != err {
			c.resp.ErrorCode = "-500"
			c.resp.ErrorMessage = err.Error()
			return false
		}
		apiCfg.RequestCfg.Body = param[strings.ToLower(consts.RequestDataLocationBody.String())]
		apiCfg.RequestCfg.Header = param[strings.ToLower(consts.RequestDataLocationHeader.String())]
		apiCfg.RequestCfg.Cookie = param[strings.ToLower(consts.RequestDataLocationCookie.String())]
		apiCfg.RequestCfg.Query = param[strings.ToLower(consts.RequestDataLocationQuery.String())]
		if httpClient, err = httpclient.NewHttpClient(apiCfg.RequestCfg, apiCfg.CacheInstance); nil != err {
			// Failing to create the client is treated as a global failure regardless of
			// the item's fail behavior; nothing further is requested.
			c.resp.ErrorCode = "-500"
			c.resp.ErrorMessage = err.Error()
			return false
		}
		httpClientList = append(httpClientList, httpClient)
	}

	// isContinue is written by the goroutines only while holding c.resp.Lock and is
	// read here only after wg.Wait().
	isContinue := true
	wg := &sync.WaitGroup{}
	for idx, apiCfg := range apiList {
		wg.Add(1)
		go func(clientIdx int, apiCfg *RequestConfigGroupItem) {
			defer wg.Done()
			defer func() {
				r := recover()
				if nil == r {
					return
				}
				// Deferred calls run last-in-first-out, so a lock taken further down has
				// already been released here. Record the panic instead of dropping it.
				c.resp.Lock.Lock()
				defer c.resp.Lock.Unlock()
				c.resp.ErrorCode = "-500"
				c.resp.ErrorMessage = fmt.Sprintf("request %v panic: %v", apiCfg.Alias, r)
				isContinue = false
			}()

			// FinalFailure is written by sibling goroutines, so read it under the lock.
			c.resp.Lock.Lock()
			skip := c.resp.FinalFailure && !apiCfg.FinalFailureAllow
			c.resp.Lock.Unlock()
			if skip {
				// The run has already failed and this request must not be sent after a
				// final failure.
				return
			}

			resp := httpClientList[clientIdx].Request()

			c.resp.Lock.Lock()
			defer c.resp.Lock.Unlock()
			if !resp.IsSuccess {
				isFinal := apiCfg.FailBehavior.FinalFailure || apiCfg.FailBehavior.Action == FailBehaviorError
				if isFinal {
					c.resp.FinalFailure = true
					// Only ever switch to "stop": a later failure configured to continue
					// must not undo a stop requested by a sibling.
					if FailBehaviorContinue != apiCfg.FailBehavior.Action {
						isContinue = false
					}
				}
			}
			// Record the result of this request.
			c.resp.AliasResultTable[apiCfg.Alias] = resp
		}(idx, apiCfg)
	}
	wg.Wait()
	return isContinue
}
// initApiCfg normalises the fail behavior of apiCfg and gives its request a dedicated context.
func (c *client) initApiCfg(apiCfg *RequestConfigGroupItem) {
	switch {
	case apiCfg.FailBehavior == nil:
		// Nothing configured: a failure aborts with an error and fails the whole run.
		apiCfg.FailBehavior = &RequestConfigGroupItemFailBehavior{
			Action:       FailBehaviorError,
			FinalFailure: true,
		}
	case apiCfg.FailBehavior.Action == FailBehaviorError:
		// "Error on failure" always implies a final failure.
		apiCfg.FailBehavior.FinalFailure = true
	}
	// Every request gets its own context, derived from the caller's context and
	// carrying the alias under the "alias" key.
	requestCtx := context.WithValue(c.reqCfg.Ctx, "alias", apiCfg.Alias)
	apiCfg.RequestCfg.Ctx = requestCtx
}
// buildRequestParams builds the request parameters of apiCfg from the results collected so far.
//
// Every entry of the alias result table is exposed as alias => location => data, each
// rule in apiCfg.ParamRuleList is translated into a validate.StructField that maps a
// source path to a target path, and validate.Run produces the final document.
//
// Returns location => path => value, or the error reported by validate.Run or by
// decoding its output.
func (c *client) buildRequestParams(apiCfg *RequestConfigGroupItem) (map[string]map[string]any, error) {
	// The location keys do not depend on the loop variables; compute them once.
	bodyKey := strings.ToLower(consts.ResponseDataLocationBody.String())
	headerKey := strings.ToLower(consts.ResponseDataLocationHeader.String())
	cookieKey := strings.ToLower(consts.ResponseDataLocationCookie.String())
	queryKey := strings.ToLower(consts.RequestDataLocationQuery.String())

	sourceData := make(map[string]any, len(c.resp.AliasResultTable))
	for aliasName, itemRes := range c.resp.AliasResultTable {
		sourceData[aliasName] = map[string]any{
			// NOTE(review): the body of every earlier result is indexed with the data
			// field of the request being built — confirm this is intended.
			bodyKey:   itemRes.Body[apiCfg.RequestCfg.DataField],
			headerKey: itemRes.Header,
			cookieKey: itemRes.Cookie,
			queryKey:  itemRes.Query,
		}
	}
	sourceDataByte := serialize.JSON.MarshalForByteIgnoreError(sourceData)

	validateRuleList := make([]validate.StructField, 0, len(apiCfg.ParamRuleList))
	for _, itemParam := range apiCfg.ParamRuleList {
		// The JSON tag is the last dot-separated segment of the path (the whole path
		// when it contains no dot).
		jsonTag := itemParam.Path[strings.LastIndex(itemParam.Path, ".")+1:]
		sourcePath := fmt.Sprintf("%v.%v.%v", itemParam.SourceAlias, strings.ToLower(itemParam.SourceResultLocation), itemParam.SourceResultPath)
		targetPath := fmt.Sprintf("%v.%v", strings.ToLower(itemParam.Location.String()), itemParam.Path)
		validateRuleList = append(validateRuleList, validate.StructField{
			JsonTag:      jsonTag,
			Type:         itemParam.Type,
			Required:     false,
			RuleList:     nil,
			DefaultValue: itemParam.DefaultValue,
			SourcePath:   sourcePath,
			TargetPath:   targetPath,
			Errmsg:       "",
		})
	}

	buildRes, err := validate.Run(sourceDataByte, validateRuleList)
	if nil != err {
		return nil, err
	}
	var result map[string]map[string]any
	if err = serialize.JSON.UnmarshalWithNumber(buildRes, &result); nil != err {
		return nil, err
	}
	return result, nil
}