logger/zinc_search.go

141 lines
3.6 KiB
Go

// Package logger ...
//
// Description : 基于zinc实现日志收集
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2025-04-26 21:24
package logger
import (
"bytes"
"fmt"
"git.zhangdeman.cn/zhangdeman/consts"
"git.zhangdeman.cn/zhangdeman/serialize"
"io"
"log"
"net/http"
"strings"
"sync"
"time"
)
func NewZincLogConnect(cfg *ZincConfig) io.Writer {
zlc := &zincLogConnect{
config: cfg,
lock: &sync.RWMutex{},
buffer: nil,
}
// 批量写入强制同步任务
go zlc.flushTask()
return zlc
}
// zincLogConnect zinc日志收集器
type zincLogConnect struct {
config *ZincConfig // zinc配置
lock *sync.RWMutex // 操作锁
buffer []map[string]any // 数据缓冲buffer
}
// getPutLogFullUrl 获取请求地址,使用批量创建
func (zlc *zincLogConnect) getPutLogFullUrl() string {
if zlc.config.CreateType == CreateTypeBatch {
// 批量创建
return fmt.Sprintf("%v/api/_bulkv2", strings.TrimRight(zlc.config.Domain, "/"))
}
// 单条创建
return fmt.Sprintf("%v/api/%v/_doc", strings.TrimRight(zlc.config.Domain, "/"), zlc.config.Index)
}
// Write 日志写入, 要求字节数据必须是个合法的json序列化之后的结果
func (zlc *zincLogConnect) Write(logData []byte) (int, error) {
if zlc.config.CreateType == CreateTypeBatch {
var (
res map[string]any
err error
)
if err = serialize.JSON.UnmarshalWithNumber(logData, &res); nil != err {
return 0, err
}
// 批量
zlc.lock.Lock()
// 数据写入buffer
zlc.buffer = append(zlc.buffer, res)
if len(zlc.buffer) >= zlc.config.BufferSize {
// buffer已满, 数据写入zinc
zlc.flush()
}
zlc.lock.Unlock()
return 0, nil
}
// 单个直接写入
zlc.writeData(logData)
return 0, nil
}
// flush 日志刷入zinc服务(针对批量数据写入的)
func (zlc *zincLogConnect) flush() {
if zlc.config.CreateType != CreateTypeBatch {
return
}
if len(zlc.buffer) == 0 {
return
}
zlc.writeData(serialize.JSON.MarshalForByteIgnoreError(map[string]any{
"index": zlc.config.Index,
"records": zlc.buffer,
}))
// 清空buffer
zlc.buffer = []map[string]any{}
}
// flushTask 批量同步日志, 强制同步的定时任务(针对批量数据写入)
func (zlc *zincLogConnect) flushTask() {
if zlc.config.CreateType != CreateTypeBatch {
return
}
for {
time.Sleep(time.Millisecond * time.Duration(zlc.config.ForceSyncTime))
zlc.lock.Lock()
zlc.flush()
zlc.lock.Unlock()
}
}
// writeData 数据写入zinc
func (zlc *zincLogConnect) writeData(paramData []byte) {
req, _ := http.NewRequest(http.MethodPost, zlc.getPutLogFullUrl(), bytes.NewReader(paramData))
req.Header.Set(consts.HeaderKeyAuthorization.String(), "Basic "+zlc.config.Authorization) // 设置authorization
req.Header.Set(consts.HeaderKeyContentType.String(), consts.MimeTypeJson) // json请求
client := &http.Client{}
if zlc.config.Async {
// 异步请求
go func() {
resp, err := client.Do(req)
if err != nil {
log.Print("异步日志写入zinc失败(请求发送失败): " + err.Error())
return
}
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
// 状态码非2xx
log.Print("异步日志写入zinc失败(zinc服务响应状态码异常): " + resp.Status)
return
}
}()
} else {
// 同步请求
resp, err := client.Do(req)
if err != nil {
log.Print("异步日志写入zinc失败(请求发送失败): " + err.Error())
return
}
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
// 状态码非2xx
log.Print("异步日志写入zinc失败(zinc服务响应状态码异常): " + resp.Status)
return
}
}
}