228 lines
5.1 KiB
Go
228 lines
5.1 KiB
Go
package vlog
|
|
|
|
import (
|
|
// Standard
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/fs"
|
|
"log/slog"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"runtime/debug"
|
|
"time"
|
|
)
|
|
|
|
type VictoriaHandler struct {
|
|
*slog.JSONHandler
|
|
Application string
|
|
System string
|
|
Instance string
|
|
URL string
|
|
|
|
handler slog.Handler
|
|
filecounter int
|
|
logdir string
|
|
logQueue chan []byte
|
|
run string
|
|
log *slog.Logger
|
|
httpClient http.Client
|
|
}
|
|
|
|
type LogData struct {
|
|
Level string
|
|
Msg string
|
|
Args []any
|
|
}
|
|
|
|
func New(w io.Writer, opts slog.HandlerOptions, logdir, url, application, system, instance string) (vl VictoriaHandler) { // {{{
|
|
var err error
|
|
vl.JSONHandler = slog.NewJSONHandler(w, &opts)
|
|
|
|
vl.httpClient = http.Client{}
|
|
vl.logQueue = make(chan []byte, 8192)
|
|
vl.logdir = logdir
|
|
|
|
vl.handler = slog.NewJSONHandler(w, &opts)
|
|
|
|
// logdir is used to spool logfiles in case of
|
|
err = os.MkdirAll(logdir, 0700)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
vl.log = slog.New(slog.NewJSONHandler(w, &opts))
|
|
vl.log.Debug("VICTORIALOG", "tempory_storage", vl.logdir)
|
|
|
|
vl.URL = url
|
|
vl.Application = application
|
|
vl.System = system
|
|
vl.Instance = instance
|
|
vl.run = time.Now().Format("060102_150405")
|
|
|
|
go vl.queueHandler()
|
|
go vl.diskHandler()
|
|
return
|
|
} // }}}
|
|
|
|
// Handle is overridden to enable central logging.
|
|
func (vl VictoriaHandler) Handle(ctx context.Context, record slog.Record) error {// {{{
|
|
vl.queue(record)
|
|
return vl.handler.Handle(ctx, record)
|
|
}// }}}
|
|
|
|
func (vl *VictoriaHandler) diskHandler() { // {{{
|
|
for {
|
|
time.Sleep(time.Second * 5)
|
|
|
|
err := filepath.WalkDir(vl.logdir, func(fname string, d fs.DirEntry, err error) error {
|
|
if d.IsDir() {
|
|
return nil
|
|
}
|
|
|
|
data, err := os.ReadFile(fname)
|
|
if err != nil {
|
|
vl.log.Error("VICTORIALOG", "error", err)
|
|
return nil
|
|
}
|
|
|
|
err = vl.send(data)
|
|
if err != nil {
|
|
vl.log.Error("VICTORIALOG", "error", err)
|
|
return nil
|
|
}
|
|
|
|
err = os.Remove(fname)
|
|
if err != nil {
|
|
vl.log.Error("VICTORIALOG", "error", err)
|
|
return nil
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
vl.log.Error("VICTORIALOG", "error", err)
|
|
}
|
|
}
|
|
} // }}}
|
|
func (vl *VictoriaHandler) queueHandler() { // {{{
|
|
var err error
|
|
for {
|
|
msg := <-vl.logQueue
|
|
err = vl.send(msg)
|
|
if err != nil {
|
|
vl.logToFile(msg)
|
|
}
|
|
}
|
|
} // }}}
|
|
|
|
func (vl *VictoriaHandler) queue(rec slog.Record) { // {{{
|
|
/*
|
|
{
|
|
"_msg": "hello world"
|
|
"_time": "2026-02-12T09:39:45.374813969Z",
|
|
"log": {
|
|
"level": "info",
|
|
"application": "transcode",
|
|
"system": "server01.hum.ding",
|
|
"instance": "production",
|
|
"run": "260212_123000",
|
|
build: "",
|
|
trace: "",
|
|
},
|
|
"<app_attr>": <data>
|
|
}
|
|
*/
|
|
data := make(map[string]any)
|
|
dataLog := make(map[string]string)
|
|
|
|
if rec.Level == slog.LevelDebug || rec.Level == slog.LevelWarn || rec.Level == slog.LevelError {
|
|
trace := debug.Stack()
|
|
buildInfo, _ := debug.ReadBuildInfo()
|
|
buildBytes, _ := json.Marshal(buildInfo)
|
|
dataLog["trace"] = string(trace)
|
|
dataLog["build"] = string(buildBytes)
|
|
}
|
|
|
|
dataLog["level"] = rec.Level.String()
|
|
dataLog["application"] = vl.Application
|
|
dataLog["system"] = vl.System
|
|
dataLog["instance"] = vl.Instance
|
|
dataLog["run"] = vl.run
|
|
|
|
data["_msg"] = rec.Message
|
|
data["_time"] = rec.Time.Format(time.RFC3339Nano)
|
|
data["log"] = dataLog
|
|
|
|
rec.Attrs(func(attr slog.Attr) bool {
|
|
data[attr.Key] = attr.Value.Any()
|
|
return true
|
|
})
|
|
|
|
// Data is marshalled into its final form to be queued and sent.
|
|
j, _ := json.Marshal(data)
|
|
|
|
select {
|
|
case vl.logQueue <- j:
|
|
vl.log.Debug("VICTORIALOG", "op", "sent to channel")
|
|
default:
|
|
vl.logToFile(j)
|
|
vl.log.Debug("VICTORIALOG", "op", "sent to file")
|
|
}
|
|
} // }}}
|
|
func (vl *VictoriaHandler) send(data []byte) (err error) { // {{{
|
|
request := bytes.NewReader(data)
|
|
|
|
// URL is composed with path query cleanly built to
|
|
// ensure correct encoding of it.
|
|
var vlogURL *url.URL
|
|
vlogURL, err = url.Parse(vl.URL + "/insert/jsonline")
|
|
if err != nil {
|
|
vl.log.Error("VICTORIALOG", "error", err)
|
|
return
|
|
}
|
|
values := vlogURL.Query()
|
|
values.Add("_time_field", "_time")
|
|
values.Add("_stream_fields", "log.application,log.system,log.instance,log.run")
|
|
vlogURL.RawQuery = values.Encode()
|
|
|
|
// Request is sent to server in order to be centrally lagged.
|
|
var req *http.Request
|
|
req, err = http.NewRequest("POST", vlogURL.String(), request)
|
|
if err != nil {
|
|
vl.log.Error("VICTORIALOG", "error", err)
|
|
return
|
|
}
|
|
|
|
var resp *http.Response
|
|
resp, err = vl.httpClient.Do(req)
|
|
if err != nil {
|
|
vl.log.Error("VICTORIALOG", "error", err)
|
|
return
|
|
}
|
|
|
|
if resp.StatusCode != 200 {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
vl.log.Error("VICTORIALOG", "error", resp.Status, "body", string(body))
|
|
return fmt.Errorf("Unsuccesful status code, %d", resp.StatusCode)
|
|
}
|
|
|
|
vl.log.Debug("VICTORIALOG", "op", "sent to server")
|
|
return
|
|
} // }}}
|
|
func (vl *VictoriaHandler) logToFile(data []byte) { // {{{
|
|
vl.filecounter++
|
|
fname := path.Join(vl.logdir, fmt.Sprintf("%010X.log", vl.filecounter))
|
|
err := os.WriteFile(fname, data, 0600)
|
|
if err != nil {
|
|
vl.log.Error("VICTORIALOG", "error", err)
|
|
return
|
|
}
|
|
vl.log.Error("VICTORIALOG", "op", "sent to disk", "fname", fname)
|
|
} // }}}
|