vlog/lib.go
2026-02-14 10:02:32 +01:00

228 lines
5.1 KiB
Go

package vlog
import (
// Standard
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/fs"
"log/slog"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"runtime/debug"
"time"
)
// VictoriaHandler is a slog handler that writes every record locally as JSON
// and also forwards a copy to a VictoriaLogs server. Records that cannot be
// delivered are spooled as files in logdir and retried by diskHandler.
//
// Values are created with New, which also starts the background goroutines.
//
// NOTE(review): methods other than Handle are promoted from the embedded
// *slog.JSONHandler; presumably WithAttrs/WithGroup therefore return a plain
// JSON handler without central forwarding — confirm against log/slog.
type VictoriaHandler struct {
// Embedded JSON handler; supplies the handler methods not defined on
// VictoriaHandler itself.
*slog.JSONHandler
// Application, System and Instance are copied into the "log" object of
// every forwarded record.
Application string
System string
Instance string
// URL is the server base URL; send appends "/insert/jsonline" to it.
URL string
// handler receives the record for local output after Handle has queued it.
handler slog.Handler
// filecounter is incremented by logToFile and used to name spool files.
filecounter int
// logdir is the spool directory: created by New, written by logToFile and
// scanned by diskHandler.
logdir string
// logQueue carries marshalled records from queue to queueHandler.
logQueue chan []byte
// run is the time New was called, formatted as "060102_150405"; it is sent
// as the "run" field of every forwarded record.
run string
// log is used for the handler's own diagnostics ("VICTORIALOG" messages).
log *slog.Logger
// httpClient performs the POST requests issued by send.
httpClient http.Client
}
// LogData groups a level, a message and a list of arguments.
//
// NOTE(review): nothing in the visible part of this file references this
// type; presumably it is used by callers or is a leftover — confirm before
// relying on it.
type LogData struct {
Level string
Msg string
Args []any
}
// New returns a VictoriaHandler that writes records to w as JSON (honouring
// opts) and forwards a copy of each record to the VictoriaLogs server at url.
//
// logdir is the spool directory for records that could not be delivered; it
// is created with mode 0700 if missing, and New panics when that fails.
// application, system and instance are attached to every forwarded record.
//
// New starts two goroutines, queueHandler and diskHandler, which both loop
// forever; the visible code offers no way to stop them.
func New(w io.Writer, opts slog.HandlerOptions, logdir, url, application, system, instance string) (vl VictoriaHandler) { // {{{
	// Upper bound for one delivery attempt. Without it a server that accepts
	// the connection but never answers would block the sending goroutines
	// forever instead of letting them fall back to the disk spool.
	const sendTimeout = 30 * time.Second

	vl.JSONHandler = slog.NewJSONHandler(w, &opts)
	vl.handler = slog.NewJSONHandler(w, &opts)
	// The internal logger uses a plain JSON handler, so the handler's own
	// diagnostics are never fed back into the forwarding queue.
	vl.log = slog.New(slog.NewJSONHandler(w, &opts))

	vl.httpClient = http.Client{Timeout: sendTimeout}
	vl.logQueue = make(chan []byte, 8192)
	vl.logdir = logdir

	// logdir is used to spool log records to disk in case they cannot be
	// queued or sent to the server.
	if err := os.MkdirAll(logdir, 0700); err != nil {
		panic(err)
	}
	vl.log.Debug("VICTORIALOG", "tempory_storage", vl.logdir)

	vl.URL = url
	vl.Application = application
	vl.System = system
	vl.Instance = instance
	vl.run = time.Now().Format("060102_150405")

	go vl.queueHandler()
	go vl.diskHandler()

	return vl
} // }}}
// Handle replaces the method promoted from the embedded JSON handler so that
// central logging is enabled: the record is first passed to queue for
// delivery to the server and then written through the local handler, whose
// result is returned to the caller.
func (vl VictoriaHandler) Handle(ctx context.Context, record slog.Record) error { // {{{
	// queue reports nothing back; delivery problems never reach the caller.
	vl.queue(record)

	localErr := vl.handler.Handle(ctx, record)
	return localErr
} // }}}
// diskHandler retries delivery of spooled records. Every five seconds it
// walks logdir, sends the content of each file it finds and removes the file
// once the server has accepted it. Files that cannot be read, sent or removed
// are logged and left in place for the next pass. The loop never returns.
func (vl *VictoriaHandler) diskHandler() { // {{{
	for {
		time.Sleep(time.Second * 5)

		err := filepath.WalkDir(vl.logdir, func(fname string, d fs.DirEntry, err error) error {
			// WalkDir reports stat/read failures through err. When the root
			// itself cannot be stat'ed d is nil, so err must be checked
			// before d is touched.
			if err != nil {
				vl.log.Error("VICTORIALOG", "error", err)
				return nil
			}

			if d.IsDir() {
				return nil
			}

			data, err := os.ReadFile(fname)
			if err != nil {
				vl.log.Error("VICTORIALOG", "error", err)
				return nil
			}

			// Returning nil on failure keeps the walk going, so one bad file
			// does not block the ones after it.
			err = vl.send(data)
			if err != nil {
				vl.log.Error("VICTORIALOG", "error", err)
				return nil
			}

			err = os.Remove(fname)
			if err != nil {
				vl.log.Error("VICTORIALOG", "error", err)
				return nil
			}

			return nil
		})
		if err != nil {
			vl.log.Error("VICTORIALOG", "error", err)
		}
	}
} // }}}
// queueHandler drains logQueue forever. Each payload is sent to the server;
// when sending fails the payload is written to the spool directory instead.
func (vl *VictoriaHandler) queueHandler() { // {{{
	for {
		payload := <-vl.logQueue

		if sendErr := vl.send(payload); sendErr != nil {
			vl.logToFile(payload)
		}
	}
} // }}}
// attrJSONValue converts a slog value into something encoding/json renders
// usefully: LogValuers are resolved, groups become nested objects and errors
// that are not json.Marshalers become their message text (a bare error
// otherwise marshals as "{}").
func attrJSONValue(v slog.Value) any {
	v = v.Resolve()

	switch v.Kind() {
	case slog.KindGroup:
		members := v.Group()
		group := make(map[string]any, len(members))
		for _, member := range members {
			group[member.Key] = attrJSONValue(member.Value)
		}
		return group

	case slog.KindAny:
		if err, isErr := v.Any().(error); isErr {
			if _, marshals := err.(json.Marshaler); !marshals {
				return err.Error()
			}
		}
	}

	return v.Any()
}

// queue marshals rec into the JSON line expected by the server and hands it
// to queueHandler through logQueue. When the queue is full the line is
// written to the spool directory instead, so the caller never blocks. A
// record that cannot be marshalled is reported on the internal logger and
// dropped.
func (vl *VictoriaHandler) queue(rec slog.Record) { // {{{
	/*
		{
			"_msg": "hello world",
			"_time": "2026-02-12T09:39:45.374813969Z",
			"log": {
				"level": "info",
				"application": "transcode",
				"system": "server01.hum.ding",
				"instance": "production",
				"run": "260212_123000",
				"build": "",
				"trace": ""
			},
			"<app_attr>": <data>
		}
	*/
	data := make(map[string]any)
	dataLog := make(map[string]string)

	// Stack trace and build information are attached to debug, warning and
	// error records only.
	switch rec.Level {
	case slog.LevelDebug, slog.LevelWarn, slog.LevelError:
		trace := debug.Stack()
		// Best effort: when build information is unavailable buildInfo is
		// nil and "build" is sent as "null".
		buildInfo, _ := debug.ReadBuildInfo()
		buildBytes, _ := json.Marshal(buildInfo)
		dataLog["trace"] = string(trace)
		dataLog["build"] = string(buildBytes)
	}

	dataLog["level"] = rec.Level.String()
	dataLog["application"] = vl.Application
	dataLog["system"] = vl.System
	dataLog["instance"] = vl.Instance
	dataLog["run"] = vl.run

	data["_msg"] = rec.Message
	data["_time"] = rec.Time.Format(time.RFC3339Nano)
	data["log"] = dataLog

	// Application attributes are stored at the top level, after the fixed
	// fields, so an attribute with the same key replaces a fixed field.
	rec.Attrs(func(attr slog.Attr) bool {
		data[attr.Key] = attrJSONValue(attr.Value)
		return true
	})

	// Data is marshalled into its final form to be queued and sent.
	j, err := json.Marshal(data)
	if err != nil {
		// Queuing a nil payload would only produce an empty request or an
		// empty spool file, so the record is dropped here.
		vl.log.Error("VICTORIALOG", "error", err)
		return
	}

	select {
	case vl.logQueue <- j:
		vl.log.Debug("VICTORIALOG", "op", "sent to channel")
	default:
		vl.logToFile(j)
		vl.log.Debug("VICTORIALOG", "op", "sent to file")
	}
} // }}}
// send POSTs data, one marshalled record, to the server's
// "/insert/jsonline" endpoint and returns nil only when the server answers
// with status 200. Every failure is also reported on the internal logger.
func (vl *VictoriaHandler) send(data []byte) (err error) { // {{{
	request := bytes.NewReader(data)

	// URL is composed with the query built through url.Values to ensure it
	// is encoded correctly.
	var vlogURL *url.URL
	vlogURL, err = url.Parse(vl.URL + "/insert/jsonline")
	if err != nil {
		vl.log.Error("VICTORIALOG", "error", err)
		return err
	}
	values := vlogURL.Query()
	values.Add("_time_field", "_time")
	values.Add("_stream_fields", "log.application,log.system,log.instance,log.run")
	vlogURL.RawQuery = values.Encode()

	// Request is sent to the server in order to be centrally logged.
	var req *http.Request
	req, err = http.NewRequest("POST", vlogURL.String(), request)
	if err != nil {
		vl.log.Error("VICTORIALOG", "error", err)
		return err
	}

	var resp *http.Response
	resp, err = vl.httpClient.Do(req)
	if err != nil {
		vl.log.Error("VICTORIALOG", "error", err)
		return err
	}
	// The body must be closed on every path, otherwise each request leaks a
	// connection. Draining it first lets the transport reuse the connection.
	defer func() {
		_, _ = io.Copy(io.Discard, resp.Body) // best effort; the outcome is already decided
		resp.Body.Close()
	}()

	if resp.StatusCode != http.StatusOK {
		// The body is only used for diagnostics; a read error just shortens it.
		body, _ := io.ReadAll(resp.Body)
		vl.log.Error("VICTORIALOG", "error", resp.Status, "body", string(body))
		return fmt.Errorf("Unsuccesful status code, %d", resp.StatusCode)
	}

	vl.log.Debug("VICTORIALOG", "op", "sent to server")
	return nil
} // }}}
// logToFile spools data, one marshalled record, as a new file in logdir so
// that diskHandler can deliver it later. Failures are reported on the
// internal logger.
//
// The file name is "<unix-nanoseconds>_<counter>.log" and the file is created
// exclusively, so an existing spool file is never overwritten. The counter
// alone is not unique: Handle has a value receiver, so calls arriving through
// it increment a copy of the handler, and the counter starts from zero again
// in every process.
func (vl *VictoriaHandler) logToFile(data []byte) { // {{{
	// A name clash needs two writers on the same nanosecond and counter; a
	// few retries with a fresh timestamp are plenty.
	const maxAttempts = 8

	var (
		file  *os.File
		fname string
		err   error
	)
	for attempt := 0; attempt < maxAttempts; attempt++ {
		vl.filecounter++
		// Zero-padded timestamp first: lexical order is chronological order.
		name := fmt.Sprintf("%019d_%010X.log", time.Now().UnixNano(), vl.filecounter)
		fname = path.Join(vl.logdir, name)

		file, err = os.OpenFile(fname, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600)
		if !os.IsExist(err) {
			break
		}
	}
	if err != nil {
		vl.log.Error("VICTORIALOG", "error", err)
		return
	}

	_, err = file.Write(data)
	// Close can report a delayed write error; keep the first error seen.
	if closeErr := file.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		vl.log.Error("VICTORIALOG", "error", err)
		return
	}

	vl.log.Error("VICTORIALOG", "op", "sent to disk", "fname", fname)
} // }}}