150 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			150 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Package logger implements log collection backed by zinc.
 | |
| //
 | |
| // Description : 基于zinc实现日志收集
 | |
| //
 | |
| // Author : go_developer@163.com<白茶清欢>
 | |
| //
 | |
| // Date : 2025-04-26 21:24
 | |
| package logger
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"fmt"
 | |
| 	"git.zhangdeman.cn/zhangdeman/consts"
 | |
| 	"git.zhangdeman.cn/zhangdeman/serialize"
 | |
| 	"io"
 | |
| 	"log"
 | |
| 	"net/http"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
| func NewZincLogConnect(cfg *ZincConfig) io.Writer {
 | |
| 	if cfg.Timeout <= 0 {
 | |
| 		cfg.Timeout = DefaultTimeout
 | |
| 	}
 | |
| 	if cfg.BufferSize <= 0 {
 | |
| 		cfg.BufferSize = DefaultBufferSize
 | |
| 	}
 | |
| 	if cfg.ForceSyncTime <= 0 {
 | |
| 		cfg.ForceSyncTime = DefaultForceFlushLogTime
 | |
| 	}
 | |
| 	zlc := &zincLogConnect{
 | |
| 		config: cfg,
 | |
| 		lock:   &sync.RWMutex{},
 | |
| 		buffer: nil,
 | |
| 	}
 | |
| 	// 批量写入强制同步任务
 | |
| 	go zlc.flushTask()
 | |
| 	return zlc
 | |
| }
 | |
| 
 | |
| // zincLogConnect zinc日志收集器
 | |
| type zincLogConnect struct {
 | |
| 	config *ZincConfig      // zinc配置
 | |
| 	lock   *sync.RWMutex    // 操作锁
 | |
| 	buffer []map[string]any // 数据缓冲buffer
 | |
| }
 | |
| 
 | |
| // getPutLogFullUrl 获取请求地址,使用批量创建
 | |
| func (zlc *zincLogConnect) getPutLogFullUrl() string {
 | |
| 	if zlc.config.CreateType == CreateTypeBatch {
 | |
| 		// 批量创建
 | |
| 		return fmt.Sprintf("%v/api/_bulkv2", strings.TrimRight(zlc.config.Domain, "/"))
 | |
| 	}
 | |
| 	// 单条创建
 | |
| 	return fmt.Sprintf("%v/api/%v/_doc", strings.TrimRight(zlc.config.Domain, "/"), zlc.config.Index)
 | |
| }
 | |
| 
 | |
| // Write 日志写入, 要求字节数据必须是个合法的json序列化之后的结果
 | |
| func (zlc *zincLogConnect) Write(logData []byte) (int, error) {
 | |
| 	if zlc.config.CreateType == CreateTypeBatch {
 | |
| 		var (
 | |
| 			res map[string]any
 | |
| 			err error
 | |
| 		)
 | |
| 		if err = serialize.JSON.UnmarshalWithNumber(logData, &res); nil != err {
 | |
| 			return 0, err
 | |
| 		}
 | |
| 		// 批量
 | |
| 		zlc.lock.Lock()
 | |
| 		// 数据写入buffer
 | |
| 		zlc.buffer = append(zlc.buffer, res)
 | |
| 		if len(zlc.buffer) >= zlc.config.BufferSize {
 | |
| 			// buffer已满, 数据写入zinc
 | |
| 			zlc.flush()
 | |
| 		}
 | |
| 		zlc.lock.Unlock()
 | |
| 		return 0, nil
 | |
| 	}
 | |
| 	// 单个直接写入
 | |
| 	zlc.writeData(logData)
 | |
| 	return 0, nil
 | |
| }
 | |
| 
 | |
| // flush 日志刷入zinc服务(针对批量数据写入的)
 | |
| func (zlc *zincLogConnect) flush() {
 | |
| 	if zlc.config.CreateType != CreateTypeBatch {
 | |
| 		return
 | |
| 	}
 | |
| 	if len(zlc.buffer) == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 	zlc.writeData(serialize.JSON.MarshalForByteIgnoreError(map[string]any{
 | |
| 		"index":   zlc.config.Index,
 | |
| 		"records": zlc.buffer,
 | |
| 	}))
 | |
| 	// 清空buffer
 | |
| 	zlc.buffer = []map[string]any{}
 | |
| }
 | |
| 
 | |
| // flushTask 批量同步日志, 强制同步的定时任务(针对批量数据写入)
 | |
| func (zlc *zincLogConnect) flushTask() {
 | |
| 	if zlc.config.CreateType != CreateTypeBatch {
 | |
| 		return
 | |
| 	}
 | |
| 	for {
 | |
| 		time.Sleep(time.Millisecond * time.Duration(zlc.config.ForceSyncTime))
 | |
| 		zlc.lock.Lock()
 | |
| 		zlc.flush()
 | |
| 		zlc.lock.Unlock()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // writeData 数据写入zinc
 | |
| func (zlc *zincLogConnect) writeData(paramData []byte) {
 | |
| 	req, _ := http.NewRequest(http.MethodPost, zlc.getPutLogFullUrl(), bytes.NewReader(paramData))
 | |
| 	req.Header.Set(consts.HeaderKeyAuthorization.String(), "Basic "+zlc.config.Authorization) // 设置authorization
 | |
| 	req.Header.Set(consts.HeaderKeyContentType.String(), consts.MimeTypeJson)                 // json请求
 | |
| 	client := &http.Client{}
 | |
| 
 | |
| 	if zlc.config.Async {
 | |
| 		// 异步请求
 | |
| 		go func() {
 | |
| 			resp, err := client.Do(req)
 | |
| 			if err != nil {
 | |
| 				log.Print("异步日志写入zinc失败(请求发送失败): " + err.Error())
 | |
| 				return
 | |
| 			}
 | |
| 			if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
 | |
| 				// 状态码非2xx
 | |
| 				log.Print("异步日志写入zinc失败(zinc服务响应状态码异常): " + resp.Status)
 | |
| 				return
 | |
| 			}
 | |
| 		}()
 | |
| 	} else {
 | |
| 		// 同步请求
 | |
| 		resp, err := client.Do(req)
 | |
| 		if err != nil {
 | |
| 			log.Print("异步日志写入zinc失败(请求发送失败): " + err.Error())
 | |
| 			return
 | |
| 		}
 | |
| 		if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
 | |
| 			// 状态码非2xx
 | |
| 			log.Print("异步日志写入zinc失败(zinc服务响应状态码异常): " + resp.Status)
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 |