From 6ea8f49f22f707e760cbbcd9f21e1c3d9fa3ba7b Mon Sep 17 00:00:00 2001 From: Lucas Manuel Rodriguez Date: Mon, 19 Feb 2024 11:33:19 -0300 Subject: [PATCH] Replace fasthttp with Go's stdlib (reduces memory footprint) (#16922) #16423 - To reduce memory footprint I'm replacing fasthttp with Go's stdlib http package: - To reduce CPU usage while Fleet is down am adding a connection check before sending logs (otherwise osquery-perf spends a lot of CPU marshaling JSON). With fasthttp (memory start growing when I bring Fleet down, because it starts to buffer logs): ![output_fasthttp_cut](https://github.com/fleetdm/fleet/assets/2073526/05161817-581c-4518-9751-7c014160e139) With golang (memory start growing when I bring Fleet down, because it starts to buffer logs): ![output_go_cut](https://github.com/fleetdm/fleet/assets/2073526/abdf899b-a812-478b-b638-28336513456a) --- cmd/osquery-perf/agent.go | 187 ++++++++++++++++---------------------- go.mod | 5 +- go.sum | 10 -- 3 files changed, 81 insertions(+), 121 deletions(-) diff --git a/cmd/osquery-perf/agent.go b/cmd/osquery-perf/agent.go index dbb87751ba..3321fc6baa 100644 --- a/cmd/osquery-perf/agent.go +++ b/cmd/osquery-perf/agent.go @@ -14,6 +14,7 @@ import ( "io" "log" "math/rand" + "net" "net/http" _ "net/http/pprof" "os" @@ -30,7 +31,6 @@ import ( "github.com/fleetdm/fleet/v4/server/ptr" "github.com/fleetdm/fleet/v4/server/service" "github.com/google/uuid" - "github.com/valyala/fasthttp" ) var ( @@ -334,7 +334,6 @@ type agent struct { liveQueryNoResultsProb float64 strings map[string]string serverAddress string - fastClient fasthttp.Client stats *Stats nodeKeyManager *nodeKeyManager nodeKey string @@ -420,10 +419,6 @@ func newAgent( if rand.Float64() <= orbitProb { deviceAuthToken = ptr.String(uuid.NewString()) } - // #nosec (osquery-perf is only used for testing) - tlsConfig := &tls.Config{ - InsecureSkipVerify: true, - } serialNumber := mdmtest.RandSerialNumber() if rand.Float64() <= emptySerialProb { serialNumber = "" @@ -451,12 +446,9 @@ func newAgent( munkiIssueCount: munkiIssueCount, liveQueryFailProb: liveQueryFailProb, liveQueryNoResultsProb: liveQueryNoResultsProb, - fastClient: fasthttp.Client{ - TLSConfig: tlsConfig, - }, - templates: templates, - deviceAuthToken: deviceAuthToken, - os: strings.TrimRight(templates.Name(), ".tmpl"), + templates: templates, + deviceAuthToken: deviceAuthToken, + os: strings.TrimRight(templates.Name(), ".tmpl"), EnrollSecret: enrollSecret, ConfigInterval: configInterval, @@ -840,18 +832,19 @@ func (a *agent) execScripts(execIDs []string, orbitClient *service.OrbitClient) } } -func (a *agent) waitingDo(req *fasthttp.Request, res *fasthttp.Response) { - err := a.fastClient.Do(req, res) - for err != nil || res.StatusCode() != http.StatusOK { +func (a *agent) waitingDo(request *http.Request) *http.Response { + response, err := http.DefaultClient.Do(request) + for err != nil || response.StatusCode != http.StatusOK { if err != nil { log.Printf("failed to run request: %s", err) } else { // res.StatusCode() != http.StatusOK - log.Printf("request failed: %d", res.StatusCode()) + log.Printf("request failed: %d", response.StatusCode) } a.stats.IncrementErrors(1) <-time.Tick(time.Duration(rand.Intn(120)+1) * time.Second) - err = a.fastClient.Do(req, res) + response, err = http.DefaultClient.Do(request) } + return response } // TODO: add support to `alreadyEnrolled` akin to the `enroll` function. for @@ -863,32 +856,23 @@ func (a *agent) orbitEnroll() error { HardwareUUID: a.UUID, HardwareSerial: a.SerialNumber, } - jsonBytes, err := json.Marshal(params) if err != nil { log.Println("orbit json marshall:", err) return err } - req := fasthttp.AcquireRequest() - req.SetBody(jsonBytes) - req.Header.SetMethod("POST") - req.Header.SetContentType("application/json") - req.Header.SetRequestURI(a.serverAddress + "/api/fleet/orbit/enroll") - resp := fasthttp.AcquireResponse() - - a.waitingDo(req, resp) - - fasthttp.ReleaseRequest(req) - defer fasthttp.ReleaseResponse(resp) - - if resp.StatusCode() != http.StatusOK { - log.Println("orbit enroll status:", resp.StatusCode()) - return fmt.Errorf("status code: %d", resp.StatusCode()) + request, err := http.NewRequest("POST", a.serverAddress+"/api/fleet/orbit/enroll", bytes.NewReader(jsonBytes)) + if err != nil { + return err } + request.Header.Add("Content-type", "application/json") + + response := a.waitingDo(request) + defer response.Body.Close() var parsedResp service.EnrollOrbitResponse - if err := json.Unmarshal(resp.Body(), &parsedResp); err != nil { + if err := json.NewDecoder(response.Body).Decode(&parsedResp); err != nil { log.Println("orbit json parse:", err) return err } @@ -915,26 +899,22 @@ func (a *agent) enroll(i int, onlyAlreadyEnrolled bool) error { return err } - req := fasthttp.AcquireRequest() - req.SetBody(body.Bytes()) - req.Header.SetMethod("POST") - req.Header.SetContentType("application/json") - req.Header.Add("User-Agent", "osquery/4.6.0") - req.SetRequestURI(a.serverAddress + "/api/osquery/enroll") - res := fasthttp.AcquireResponse() + request, err := http.NewRequest("POST", a.serverAddress+"/api/osquery/enroll", &body) + if err != nil { + return err + } + request.Header.Add("Content-type", "application/json") - a.waitingDo(req, res) + response := a.waitingDo(request) + defer response.Body.Close() - fasthttp.ReleaseRequest(req) - defer fasthttp.ReleaseResponse(res) - - if res.StatusCode() != http.StatusOK { - log.Println("enroll status:", res.StatusCode()) - return fmt.Errorf("status code: %d", res.StatusCode()) + if response.StatusCode != http.StatusOK { + log.Println("enroll status:", response.StatusCode) + return fmt.Errorf("status code: %d", response.StatusCode) } var parsedResp enrollResponse - if err := json.Unmarshal(res.Body(), &parsedResp); err != nil { + if err := json.NewDecoder(response.Body).Decode(&parsedResp); err != nil { log.Println("json parse:", err) return err } @@ -948,28 +928,21 @@ func (a *agent) enroll(i int, onlyAlreadyEnrolled bool) error { } func (a *agent) config() error { - body := bytes.NewBufferString(`{"node_key": "` + a.nodeKey + `"}`) - - req := fasthttp.AcquireRequest() - req.SetBody(body.Bytes()) - req.Header.SetMethod("POST") - req.Header.SetContentType("application/json") - req.Header.Add("User-Agent", "osquery/4.6.0") - req.SetRequestURI(a.serverAddress + "/api/osquery/config") - res := fasthttp.AcquireResponse() - - err := a.fastClient.Do(req, res) - - fasthttp.ReleaseRequest(req) - defer fasthttp.ReleaseResponse(res) + request, err := http.NewRequest("POST", a.serverAddress+"/api/osquery/config", bytes.NewReader([]byte(`{"node_key": "`+a.nodeKey+`"}`))) + if err != nil { + return err + } + request.Header.Add("Content-type", "application/json") + response, err := http.DefaultClient.Do(request) if err != nil { return fmt.Errorf("config request failed to run: %w", err) } + defer response.Body.Close() a.stats.IncrementConfigRequests() - statusCode := res.StatusCode() + statusCode := response.StatusCode if statusCode != http.StatusOK { a.stats.IncrementConfigErrors() return fmt.Errorf("config request failed: %d", statusCode) @@ -980,7 +953,7 @@ func (a *agent) config() error { Queries map[string]interface{} `json:"queries"` } `json:"packs"` }{} - if err := json.Unmarshal(res.Body(), &parsedResp); err != nil { + if err := json.NewDecoder(response.Body).Decode(&parsedResp); err != nil { return fmt.Errorf("json parse at config: %w", err) } @@ -1139,33 +1112,28 @@ func (a *agent) softwareMacOS() []map[string]string { } func (a *agent) DistributedRead() (*distributedReadResponse, error) { - req := fasthttp.AcquireRequest() - req.SetBody([]byte(`{"node_key": "` + a.nodeKey + `"}`)) - req.Header.SetMethod("POST") - req.Header.SetContentType("application/json") - req.Header.Add("User-Agent", "osquery/4.6.0") - req.SetRequestURI(a.serverAddress + "/api/osquery/distributed/read") - res := fasthttp.AcquireResponse() - - err := a.fastClient.Do(req, res) - - fasthttp.ReleaseRequest(req) - defer fasthttp.ReleaseResponse(res) + request, err := http.NewRequest("POST", a.serverAddress+"/api/osquery/distributed/read", bytes.NewReader([]byte(`{"node_key": "`+a.nodeKey+`"}`))) + if err != nil { + return nil, err + } + request.Header.Add("Content-type", "application/json") + response, err := http.DefaultClient.Do(request) if err != nil { return nil, fmt.Errorf("distributed/read request failed to run: %w", err) } + defer response.Body.Close() a.stats.IncrementDistributedReads() - statusCode := res.StatusCode() + statusCode := response.StatusCode if statusCode != http.StatusOK { a.stats.IncrementDistributedReadErrors() return nil, fmt.Errorf("distributed/read request failed: %d", statusCode) } var parsedResp distributedReadResponse - if err := json.Unmarshal(res.Body(), &parsedResp); err != nil { + if err := json.NewDecoder(response.Body).Decode(&parsedResp); err != nil { log.Printf("json parse: %s", err) return nil, err } @@ -1596,26 +1564,21 @@ func (a *agent) DistributedWrite(queries map[string]string) error { panic(err) } - req := fasthttp.AcquireRequest() - req.SetBody(body) - req.Header.SetMethod("POST") - req.Header.SetContentType("application/json") - req.Header.Add("User-Agent", "osquery/5.0.1") - req.SetRequestURI(a.serverAddress + "/api/osquery/distributed/write") - res := fasthttp.AcquireResponse() - - err = a.fastClient.Do(req, res) - - fasthttp.ReleaseRequest(req) - defer fasthttp.ReleaseResponse(res) + request, err := http.NewRequest("POST", a.serverAddress+"/api/osquery/distributed/write", bytes.NewReader(body)) + if err != nil { + return err + } + request.Header.Add("Content-type", "application/json") + response, err := http.DefaultClient.Do(request) if err != nil { return fmt.Errorf("distributed/write request failed to run: %w", err) } + defer response.Body.Close() a.stats.IncrementDistributedWrites() - statusCode := res.StatusCode() + statusCode := response.StatusCode if statusCode != http.StatusOK { a.stats.IncrementDistributedWriteErrors() return fmt.Errorf("distributed/write request failed: %d", statusCode) @@ -1645,6 +1608,13 @@ func scheduledQueryResults(packName, queryName string, numResults int) json.RawM } func (a *agent) submitLogs(results []json.RawMessage) error { + // Connection check to prevent unnecessary JSON marshaling when the server is down. + conn, err := net.Dial("tcp", strings.TrimPrefix(a.serverAddress, "https://")) + if err != nil { + return err + } + conn.Close() + jsonResults, err := json.Marshal(results) if err != nil { panic(err) @@ -1654,36 +1624,31 @@ func (a *agent) submitLogs(results []json.RawMessage) error { LogType string `json:"log_type"` Data json.RawMessage `json:"data"` } - r := submitLogsRequest{ + slr := submitLogsRequest{ NodeKey: a.nodeKey, LogType: "result", Data: jsonResults, } - body, err := json.Marshal(r) + body, err := json.Marshal(slr) if err != nil { panic(err) } - req := fasthttp.AcquireRequest() - req.SetBody(body) - req.Header.SetMethod("POST") - req.Header.SetContentType("application/json") - req.Header.Add("User-Agent", "osquery/5.0.1") - req.SetRequestURI(a.serverAddress + "/api/osquery/log") - res := fasthttp.AcquireResponse() - - err = a.fastClient.Do(req, res) - - fasthttp.ReleaseRequest(req) - defer fasthttp.ReleaseResponse(res) + request, err := http.NewRequest("POST", a.serverAddress+"/api/osquery/log", bytes.NewReader(body)) + if err != nil { + return err + } + request.Header.Add("Content-type", "application/json") + response, err := http.DefaultClient.Do(request) if err != nil { return fmt.Errorf("log request failed to run: %w", err) } + defer response.Body.Close() a.stats.IncrementResultLogRequests() - statusCode := res.StatusCode() + statusCode := response.StatusCode if statusCode != http.StatusOK { a.stats.IncrementResultLogErrors() return fmt.Errorf("log request failed: %d", statusCode) @@ -1724,6 +1689,14 @@ func main() { log.Println(http.ListenAndServe("localhost:6060", nil)) }() + // #nosec (osquery-perf is only used for testing) + tlsConfig := &tls.Config{ + InsecureSkipVerify: true, + } + tr := http.DefaultTransport.(*http.Transport).Clone() + tr.TLSClientConfig = tlsConfig + http.DefaultClient.Transport = tr + validTemplateNames := map[string]bool{ "macos_13.6.2.tmpl": true, "macos_14.1.2.tmpl": true, diff --git a/go.mod b/go.mod index 343609d638..4ab0b66e50 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/fatih/color v1.15.0 github.com/getsentry/sentry-go v0.18.0 github.com/ghodss/yaml v1.0.0 + github.com/go-git/go-git/v5 v5.11.0 github.com/go-ini/ini v1.67.0 github.com/go-kit/kit v0.12.0 github.com/go-kit/log v0.2.1 @@ -98,7 +99,6 @@ require ( github.com/tj/assert v0.0.3 github.com/ulikunitz/xz v0.5.10 github.com/urfave/cli/v2 v2.23.5 - github.com/valyala/fasthttp v1.40.0 github.com/ziutek/mymysql v1.5.4 go.elastic.co/apm/module/apmgorilla/v2 v2.3.0 go.elastic.co/apm/module/apmsql/v2 v2.4.3 @@ -164,7 +164,6 @@ require ( github.com/agnivade/levenshtein v1.1.1 // indirect github.com/akavel/rsrc v0.10.2 // indirect github.com/alecthomas/jsonschema v0.0.0-20211022214203-8b29eab41725 // indirect - github.com/andybalholm/brotli v1.0.4 // indirect github.com/antchfx/xpath v1.2.2 // indirect github.com/apache/thrift v0.18.1 // indirect github.com/apex/log v1.9.0 // indirect @@ -210,7 +209,6 @@ require ( github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect github.com/go-git/go-billy/v5 v5.5.0 // indirect - github.com/go-git/go-git/v5 v5.11.0 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -292,7 +290,6 @@ require ( github.com/tklauser/go-sysconf v0.3.11 // indirect github.com/tklauser/numcpus v0.6.0 // indirect github.com/trivago/tgo v1.0.7 // indirect - github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/vartanbeno/go-reddit/v2 v2.0.0 // indirect github.com/xanzy/go-gitlab v0.50.3 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect diff --git a/go.sum b/go.sum index af6f40b4f2..a9cba664cf 100644 --- a/go.sum +++ b/go.sum @@ -210,8 +210,6 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= -github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= -github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/andybalholm/cascadia v1.2.0/go.mod h1:YCyR8vOZT9aZ1CHEd8ap0gMVm2aFgxBp0T0eFw1RUQY= github.com/andygrunwald/go-jira v1.16.0 h1:PU7C7Fkk5L96JvPc6vDVIrd99vdPnYudHu4ju2c2ikQ= github.com/andygrunwald/go-jira v1.16.0/go.mod h1:UQH4IBVxIYWbgagc0LF/k9FRs9xjIiQ8hIcC6HfLwFU= @@ -835,7 +833,6 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kolide/kit v0.0.0-20221107170827-fb85e3d59eab h1:KVR7cs+oPyy85i+8t1ZaNSy1bymCy5FuWyt51pdrXu4= @@ -1198,11 +1195,6 @@ github.com/ulikunitz/xz v0.5.10 h1:t92gobL9l3HE202wg3rlk19F6X+JOxl9BBrCCMYEYd8= github.com/ulikunitz/xz v0.5.10/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= github.com/urfave/cli/v2 v2.23.5 h1:xbrU7tAYviSpqeR3X4nEFWUdB/uDZ6DE+HxmRU7Xtyw= github.com/urfave/cli/v2 v2.23.5/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc= -github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= -github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.40.0 h1:CRq/00MfruPGFLTQKY8b+8SfdK60TxNztjRMnH0t1Yc= -github.com/valyala/fasthttp v1.40.0/go.mod h1:t/G+3rLek+CyY9bnIE+YlMRddxVAAGjhxndDB4i4C0I= -github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/vartanbeno/go-reddit/v2 v2.0.0 h1:fxYMqx5lhbmJ3yYRN1nnQC/gecRB3xpUS2BbG7GLpsk= github.com/vartanbeno/go-reddit/v2 v2.0.0/go.mod h1:758/S10hwZSLm43NPtwoNQdZFSg3sjB5745Mwjb0ANI= github.com/vmihailenco/msgpack v3.3.3+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk= @@ -1325,7 +1317,6 @@ golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5 golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= @@ -1582,7 +1573,6 @@ golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=