logger/wrapper/zinc_search.go

108 lines
3.1 KiB
Go

// Package wrapper ...
//
// Description : 基于zinc实现日志收集
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2025-04-26 21:24
package wrapper
import (
"fmt"
"git.zhangdeman.cn/zhangdeman/consts"
"git.zhangdeman.cn/zhangdeman/serialize"
"go.uber.org/zap/zapcore"
"io"
"strings"
"sync"
"time"
)
// ZincConfig zinc服务配置
type ZincConfig struct {
Authorization string `json:"authorization" dc:"授权secret,生成方式base64(user:password)"`
Domain string `json:"domain" dc:"zinc服务域名"`
Timeout int `json:"timeout" dc:"超时时间,单位毫秒,默认5000"`
Async bool `json:"async" dc:"数据异步写入"`
Index string `json:"index" dc:"日志使用的索引"`
LogLevel consts.LogLevel `json:"log_level" dc:"记录的日志等级"`
CreateType string `json:"create_type" dc:"日志同步的类型: single - 单个同步 batch - 批量创建"`
BufferSize int `json:"buffer_size" dc:"批量创建时, 数据缓存buffer大小, 默认1000"`
ForceSyncTime int `json:"force_sync_time" dc:"批量同步日志时,强制同步的时间间隔,buffer没满也会强制同步, 单位: 秒"`
}
const (
DefaultTimeout = 5000 // 默认超时时间
DefaultBufferSize = 1000 // 默认buffer大小
)
const (
CreateTypeSingle = "single" // 逐条日志同步
CreateTypeBatch = "batch" // 批量日志同步
)
func NewZincLogConnect(cfg *ZincConfig) io.Writer {
return &zincLogConnect{
config: cfg,
lock: &sync.RWMutex{},
buffer: nil,
}
}
// zincLogConnect zinc日志收集器
type zincLogConnect struct {
config *ZincConfig // zinc配置
lock *sync.RWMutex // 操作锁
buffer []map[string]any // 数据缓冲buffer
}
// getPutLogFullUrl 获取请求地址,使用批量创建
func (zlc *zincLogConnect) getPutLogFullUrl() string {
if zlc.config.CreateType == CreateTypeBatch {
// 批量创建
return fmt.Sprintf("%v/api/_bulkv2", strings.TrimRight(zlc.config.Domain, "/"))
}
// 单条创建
return fmt.Sprintf("%v/api/%v/_doc", strings.TrimRight(zlc.config.Domain, "/"), zlc.config.Index)
}
// Write 日志写入, 要求字节数据必须是个合法的json序列化之后的结果
func (zlc *zincLogConnect) Write(logData []byte) (int, error) {
var (
res map[string]any
err error
)
if err = serialize.JSON.UnmarshalWithNumber(logData, &res); nil != err {
return 0, err
}
if zlc.config.CreateType == CreateTypeBatch {
// 批量
zlc.lock.Lock()
// 数据蟹醋buffer
zlc.buffer = append(zlc.buffer, res)
if len(zlc.buffer) >= zlc.config.BufferSize {
// buffer已满, 数据写入zinc
zlc.flush()
}
}
// 单个
return 0, nil
}
// flush 日志刷入zinc服务
func (zlc *zincLogConnect) flush() {
// TODO: 数据同步
// 日志刷入完成后, 清空buffer
zlc.buffer = []map[string]any{}
}
// flushTask 批量同步日志, 强制同步的定时任务
func (zlc *zincLogConnect) flushTask() {
for {
time.Sleep(time.Second * time.Duration(zlc.config.ForceSyncTime))
zlc.lock.Lock()
zlc.flush()
zlc.lock.Lock()
}
}