mirror of
https://github.com/fleetdm/fleet
synced 2026-04-21 13:37:30 +00:00
Remove valyala/fastjson and valyala/fasttemplate dependencies (#37914)
<!-- Add the related story/sub-task/bug number, like Resolves #123, or remove if NA --> Context: https://fleetdm.slack.com/archives/C019WG4GH0A/p1767713469571139 Replaced `valyala` dependencies and now relying on `json.Unmarshal` and manual traversal of `Template` subjects, such as [this one](https://github.com/fleetdm/fleet/blob/main/server/logging/nats_test.go#L113) # Checklist for submitter ## Testing - [x] ~~Added/updated automated tests~~ I'm relying on existing tests on `nats_test.go` which already cover using a `Template` subject, namely: https://github.com/fleetdm/fleet/blob/main/server/logging/nats_test.go#L112-L132 https://github.com/fleetdm/fleet/blob/main/server/logging/nats_test.go#L194-L245 https://github.com/fleetdm/fleet/blob/main/server/logging/nats_test.go#L301-L356 - [ ] Where appropriate, [automated tests simulate multiple hosts and test for host isolation](https://github.com/fleetdm/fleet/blob/main/docs/Contributing/reference/patterns-backend.md#unit-testing) (updates to one hosts's records do not affect another) - [x] QA'd all new/changed functionality manually Ran `nats-server`, subscribed to all subjects by running `nats --server=nats://localhost:4222 subscribe ">"` and got logs from this query: <img width="675" height="411" alt="Screenshot 2026-01-06 at 4 12 52 PM" src="https://github.com/user-attachments/assets/e4e6e5d0-53ac-4b09-9810-b6032794d5f3" /> <img width="773" height="165" alt="Screenshot 2026-01-06 at 4 11 16 PM" src="https://github.com/user-attachments/assets/6f58d1f1-272b-40b3-96f5-1659c0bbb918" /> <img width="2541" height="119" alt="Screenshot 2026-01-06 at 4 11 06 PM" src="https://github.com/user-attachments/assets/2e61acac-063c-4cdd-aeee-871031600125" />
This commit is contained in:
parent
18a5f24ad7
commit
116c8ddb4f
3 changed files with 104 additions and 70 deletions
3
go.mod
3
go.mod
|
|
@ -138,8 +138,6 @@ require (
|
|||
github.com/tj/assert v0.0.3
|
||||
github.com/ulikunitz/xz v0.5.15
|
||||
github.com/urfave/cli/v2 v2.27.7
|
||||
github.com/valyala/fastjson v1.6.4
|
||||
github.com/valyala/fasttemplate v1.2.2
|
||||
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8
|
||||
github.com/ziutek/mymysql v1.5.4
|
||||
go.elastic.co/apm/module/apmgorilla/v2 v2.6.2
|
||||
|
|
@ -333,7 +331,6 @@ require (
|
|||
github.com/tklauser/go-sysconf v0.3.12 // indirect
|
||||
github.com/tklauser/numcpus v0.6.1 // indirect
|
||||
github.com/trivago/tgo v1.0.7 // indirect
|
||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||
github.com/xanzy/ssh-agent v0.3.3 // indirect
|
||||
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
|
||||
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
|
||||
|
|
|
|||
6
go.sum
6
go.sum
|
|
@ -890,12 +890,6 @@ github.com/ulikunitz/xz v0.5.15 h1:9DNdB5s+SgV3bQ2ApL10xRc35ck0DuIX/isZvIk+ubY=
|
|||
github.com/ulikunitz/xz v0.5.15/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
|
||||
github.com/urfave/cli/v2 v2.27.7 h1:bH59vdhbjLv3LAvIu6gd0usJHgoTTPhCFib8qqOwXYU=
|
||||
github.com/urfave/cli/v2 v2.27.7/go.mod h1:CyNAG/xg+iAOg0N4MPGZqVmv2rCoP267496AOXUZjA4=
|
||||
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
|
||||
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
|
||||
github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ=
|
||||
github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
|
||||
github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
|
||||
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
|
||||
github.com/vmihailenco/msgpack v3.3.3+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
|
||||
github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM=
|
||||
github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw=
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ import (
|
|||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
|
@ -21,8 +20,6 @@ import (
|
|||
"github.com/golang/snappy"
|
||||
"github.com/klauspost/compress/zstd"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/valyala/fastjson"
|
||||
"github.com/valyala/fasttemplate"
|
||||
)
|
||||
|
||||
// natsPublisher sends logs to a NATS server.
|
||||
|
|
@ -353,33 +350,54 @@ func (r *natsConstantRouter) Route(_ json.RawMessage) (string, error) {
|
|||
|
||||
// natsTemplateEnv is the evaluation environment for the template expressions.
|
||||
type natsTemplateEnv struct {
|
||||
Log *fastjson.Value `expr:"log"`
|
||||
Log any `expr:"log"`
|
||||
}
|
||||
|
||||
// natsTemplateGet returns the value of a field from a fastjson value. If it is
|
||||
// natsTemplateGet returns the value of a field from a parsed JSON value. If it is
|
||||
// a terminal type, it returns the string representation. Otherwise, it returns
|
||||
// the fastjson value.
|
||||
func natsTemplateGet(val *fastjson.Value, path string) any {
|
||||
v := val.Get(path)
|
||||
|
||||
switch v.Type() {
|
||||
case fastjson.TypeFalse:
|
||||
return "false"
|
||||
|
||||
case fastjson.TypeNull:
|
||||
// the value for further traversal.
|
||||
func natsTemplateGet(val any, path string) any {
|
||||
// Handle nil values.
|
||||
if val == nil {
|
||||
return "null"
|
||||
|
||||
case fastjson.TypeNumber:
|
||||
return strconv.FormatFloat(v.GetFloat64(), 'f', -1, 64)
|
||||
|
||||
case fastjson.TypeString:
|
||||
return string(v.GetStringBytes())
|
||||
|
||||
case fastjson.TypeTrue:
|
||||
return "true"
|
||||
}
|
||||
|
||||
return v
|
||||
// If val is a map, look up the path key.
|
||||
if m, ok := val.(map[string]any); ok {
|
||||
v, exists := m[path]
|
||||
if !exists {
|
||||
return "null"
|
||||
}
|
||||
return natsTemplateToString(v)
|
||||
}
|
||||
|
||||
// For non-map types, return their string representation.
|
||||
return natsTemplateToString(val)
|
||||
}
|
||||
|
||||
// natsTemplateToString converts a JSON value to its string representation.
|
||||
func natsTemplateToString(val any) any {
|
||||
switch v := val.(type) {
|
||||
case nil:
|
||||
return "null"
|
||||
case bool:
|
||||
if v {
|
||||
return "true"
|
||||
}
|
||||
return "false"
|
||||
case float64:
|
||||
return strconv.FormatFloat(v, 'f', -1, 64)
|
||||
case string:
|
||||
return v
|
||||
case map[string]any:
|
||||
// Return the map for further traversal.
|
||||
return v
|
||||
case []any:
|
||||
// Return the slice for further traversal.
|
||||
return v
|
||||
default:
|
||||
return fmt.Sprintf("%v", v)
|
||||
}
|
||||
}
|
||||
|
||||
// natsTemplatePatcher patches the expression's AST so it uses natsTemplateGet
|
||||
|
|
@ -398,14 +416,14 @@ func (p *natsTemplatePatcher) Visit(node *ast.Node) {
|
|||
|
||||
// natsTemplateRouter uses the log contents to create a subject.
|
||||
type natsTemplateRouter struct {
|
||||
// The JSON parser pool.
|
||||
pp *fastjson.ParserPool
|
||||
|
||||
// The compiled programs for each template tag expression.
|
||||
pr map[string]*vm.Program
|
||||
|
||||
// The template for the subject.
|
||||
tp *fasttemplate.Template
|
||||
// The original subject template string.
|
||||
template string
|
||||
|
||||
// The ordered list of tags found in the template.
|
||||
tags []string
|
||||
}
|
||||
|
||||
// newNatsTemplateRouter creates a new template router.
|
||||
|
|
@ -413,11 +431,11 @@ func newNatsTemplateRouter(sub string) (*natsTemplateRouter, error) {
|
|||
// Initialize the programs map.
|
||||
pr := make(map[string]*vm.Program)
|
||||
|
||||
// Initialize the template.
|
||||
tp := fasttemplate.New(sub,
|
||||
natsSubjectTagStart,
|
||||
natsSubjectTagStop,
|
||||
)
|
||||
// Parse the template to extract tags.
|
||||
tags, err := parseTemplateTags(sub)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Define the expression compiler options.
|
||||
opts := []expr.Option{
|
||||
|
|
@ -428,7 +446,7 @@ func newNatsTemplateRouter(sub string) (*natsTemplateRouter, error) {
|
|||
"get",
|
||||
func(params ...any) (any, error) {
|
||||
return natsTemplateGet(
|
||||
params[0].(*fastjson.Value),
|
||||
params[0],
|
||||
params[1].(string),
|
||||
), nil
|
||||
},
|
||||
|
|
@ -436,49 +454,74 @@ func newNatsTemplateRouter(sub string) (*natsTemplateRouter, error) {
|
|||
),
|
||||
}
|
||||
|
||||
// Execute the template, compiling each tag expression.
|
||||
_, err := tp.ExecuteFuncStringWithErr(func(_ io.Writer, tag string) (int, error) {
|
||||
// Compile each tag expression.
|
||||
for _, tag := range tags {
|
||||
p, err := expr.Compile(tag, opts...)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pr[tag] = p
|
||||
|
||||
return 0, err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &natsTemplateRouter{pp: new(fastjson.ParserPool), pr: pr, tp: tp}, nil
|
||||
return &natsTemplateRouter{pr: pr, template: sub, tags: tags}, nil
|
||||
}
|
||||
|
||||
// parseTemplateTags extracts all tags from a template string.
|
||||
func parseTemplateTags(template string) ([]string, error) {
|
||||
var tags []string
|
||||
s := template
|
||||
|
||||
for {
|
||||
// Find the start of the next tag.
|
||||
startIdx := strings.Index(s, natsSubjectTagStart)
|
||||
if startIdx == -1 {
|
||||
break
|
||||
}
|
||||
|
||||
// Find the end of the tag.
|
||||
remaining := s[startIdx+len(natsSubjectTagStart):]
|
||||
endIdx := strings.Index(remaining, natsSubjectTagStop)
|
||||
if endIdx == -1 {
|
||||
return nil, fmt.Errorf("unclosed template tag in: %s", template)
|
||||
}
|
||||
|
||||
// Extract the tag content.
|
||||
tag := remaining[:endIdx]
|
||||
tags = append(tags, tag)
|
||||
|
||||
// Move past this tag.
|
||||
s = remaining[endIdx+len(natsSubjectTagStop):]
|
||||
}
|
||||
|
||||
return tags, nil
|
||||
}
|
||||
|
||||
// Route returns the subject for a log.
|
||||
func (r *natsTemplateRouter) Route(log json.RawMessage) (string, error) {
|
||||
// Get a JSON parser from the pool, and ensure it is released when done.
|
||||
p := r.pp.Get()
|
||||
defer r.pp.Put(p)
|
||||
|
||||
// Parse the log contents into a fastjson value.
|
||||
v, err := p.ParseBytes(log)
|
||||
if err != nil {
|
||||
// Parse the log contents into a map.
|
||||
var v map[string]any
|
||||
if err := json.Unmarshal(log, &v); err != nil {
|
||||
return "", fmt.Errorf("failed to parse log: %w", err)
|
||||
}
|
||||
|
||||
// Define the function to evaluate the template for a tag.
|
||||
fn := func(w io.Writer, tag string) (int, error) {
|
||||
// Build the result by replacing tags with their evaluated values.
|
||||
result := r.template
|
||||
for _, tag := range r.tags {
|
||||
// Evaluate the tag expression.
|
||||
r, err := expr.Run(r.pr[tag], &natsTemplateEnv{Log: v})
|
||||
val, err := expr.Run(r.pr[tag], &natsTemplateEnv{Log: v})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return "", err
|
||||
}
|
||||
|
||||
// If the returned value is a string, write it.
|
||||
if s, ok := r.(string); ok {
|
||||
return io.WriteString(w, s)
|
||||
// Ensure the result is a string.
|
||||
s, ok := val.(string)
|
||||
if !ok {
|
||||
return "", fmt.Errorf("expected string, got %T: %v", val, val)
|
||||
}
|
||||
|
||||
// A non-string value was returned.
|
||||
return 0, fmt.Errorf("expected string, got %T: %v", r, r)
|
||||
// Replace the tag in the template.
|
||||
result = strings.Replace(result, natsSubjectTagStart+tag+natsSubjectTagStop, s, 1)
|
||||
}
|
||||
|
||||
return r.tp.ExecuteFuncStringWithErr(fn)
|
||||
return result, nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue