Replace fasthttp with Go's stdlib (reduces memory footprint) (#16922)

#16423

- To reduce memory footprint I'm replacing fasthttp with Go's stdlib
http package:
- To reduce CPU usage while Fleet is down am adding a connection check
before sending logs (otherwise osquery-perf spends a lot of CPU
marshaling JSON).

With fasthttp (memory start growing when I bring Fleet down, because it
starts to buffer logs):

![output_fasthttp_cut](https://github.com/fleetdm/fleet/assets/2073526/05161817-581c-4518-9751-7c014160e139)
With golang (memory start growing when I bring Fleet down, because it
starts to buffer logs):

![output_go_cut](https://github.com/fleetdm/fleet/assets/2073526/abdf899b-a812-478b-b638-28336513456a)
This commit is contained in:
Lucas Manuel Rodriguez 2024-02-19 11:33:19 -03:00 committed by GitHub
parent d60463dc1b
commit 6ea8f49f22
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 81 additions and 121 deletions

View file

@ -14,6 +14,7 @@ import (
"io"
"log"
"math/rand"
"net"
"net/http"
_ "net/http/pprof"
"os"
@ -30,7 +31,6 @@ import (
"github.com/fleetdm/fleet/v4/server/ptr"
"github.com/fleetdm/fleet/v4/server/service"
"github.com/google/uuid"
"github.com/valyala/fasthttp"
)
var (
@ -334,7 +334,6 @@ type agent struct {
liveQueryNoResultsProb float64
strings map[string]string
serverAddress string
fastClient fasthttp.Client
stats *Stats
nodeKeyManager *nodeKeyManager
nodeKey string
@ -420,10 +419,6 @@ func newAgent(
if rand.Float64() <= orbitProb {
deviceAuthToken = ptr.String(uuid.NewString())
}
// #nosec (osquery-perf is only used for testing)
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
serialNumber := mdmtest.RandSerialNumber()
if rand.Float64() <= emptySerialProb {
serialNumber = ""
@ -451,12 +446,9 @@ func newAgent(
munkiIssueCount: munkiIssueCount,
liveQueryFailProb: liveQueryFailProb,
liveQueryNoResultsProb: liveQueryNoResultsProb,
fastClient: fasthttp.Client{
TLSConfig: tlsConfig,
},
templates: templates,
deviceAuthToken: deviceAuthToken,
os: strings.TrimRight(templates.Name(), ".tmpl"),
templates: templates,
deviceAuthToken: deviceAuthToken,
os: strings.TrimRight(templates.Name(), ".tmpl"),
EnrollSecret: enrollSecret,
ConfigInterval: configInterval,
@ -840,18 +832,19 @@ func (a *agent) execScripts(execIDs []string, orbitClient *service.OrbitClient)
}
}
func (a *agent) waitingDo(req *fasthttp.Request, res *fasthttp.Response) {
err := a.fastClient.Do(req, res)
for err != nil || res.StatusCode() != http.StatusOK {
func (a *agent) waitingDo(request *http.Request) *http.Response {
response, err := http.DefaultClient.Do(request)
for err != nil || response.StatusCode != http.StatusOK {
if err != nil {
log.Printf("failed to run request: %s", err)
} else { // res.StatusCode() != http.StatusOK
log.Printf("request failed: %d", res.StatusCode())
log.Printf("request failed: %d", response.StatusCode)
}
a.stats.IncrementErrors(1)
<-time.Tick(time.Duration(rand.Intn(120)+1) * time.Second)
err = a.fastClient.Do(req, res)
response, err = http.DefaultClient.Do(request)
}
return response
}
// TODO: add support to `alreadyEnrolled` akin to the `enroll` function. for
@ -863,32 +856,23 @@ func (a *agent) orbitEnroll() error {
HardwareUUID: a.UUID,
HardwareSerial: a.SerialNumber,
}
jsonBytes, err := json.Marshal(params)
if err != nil {
log.Println("orbit json marshall:", err)
return err
}
req := fasthttp.AcquireRequest()
req.SetBody(jsonBytes)
req.Header.SetMethod("POST")
req.Header.SetContentType("application/json")
req.Header.SetRequestURI(a.serverAddress + "/api/fleet/orbit/enroll")
resp := fasthttp.AcquireResponse()
a.waitingDo(req, resp)
fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(resp)
if resp.StatusCode() != http.StatusOK {
log.Println("orbit enroll status:", resp.StatusCode())
return fmt.Errorf("status code: %d", resp.StatusCode())
request, err := http.NewRequest("POST", a.serverAddress+"/api/fleet/orbit/enroll", bytes.NewReader(jsonBytes))
if err != nil {
return err
}
request.Header.Add("Content-type", "application/json")
response := a.waitingDo(request)
defer response.Body.Close()
var parsedResp service.EnrollOrbitResponse
if err := json.Unmarshal(resp.Body(), &parsedResp); err != nil {
if err := json.NewDecoder(response.Body).Decode(&parsedResp); err != nil {
log.Println("orbit json parse:", err)
return err
}
@ -915,26 +899,22 @@ func (a *agent) enroll(i int, onlyAlreadyEnrolled bool) error {
return err
}
req := fasthttp.AcquireRequest()
req.SetBody(body.Bytes())
req.Header.SetMethod("POST")
req.Header.SetContentType("application/json")
req.Header.Add("User-Agent", "osquery/4.6.0")
req.SetRequestURI(a.serverAddress + "/api/osquery/enroll")
res := fasthttp.AcquireResponse()
request, err := http.NewRequest("POST", a.serverAddress+"/api/osquery/enroll", &body)
if err != nil {
return err
}
request.Header.Add("Content-type", "application/json")
a.waitingDo(req, res)
response := a.waitingDo(request)
defer response.Body.Close()
fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(res)
if res.StatusCode() != http.StatusOK {
log.Println("enroll status:", res.StatusCode())
return fmt.Errorf("status code: %d", res.StatusCode())
if response.StatusCode != http.StatusOK {
log.Println("enroll status:", response.StatusCode)
return fmt.Errorf("status code: %d", response.StatusCode)
}
var parsedResp enrollResponse
if err := json.Unmarshal(res.Body(), &parsedResp); err != nil {
if err := json.NewDecoder(response.Body).Decode(&parsedResp); err != nil {
log.Println("json parse:", err)
return err
}
@ -948,28 +928,21 @@ func (a *agent) enroll(i int, onlyAlreadyEnrolled bool) error {
}
func (a *agent) config() error {
body := bytes.NewBufferString(`{"node_key": "` + a.nodeKey + `"}`)
req := fasthttp.AcquireRequest()
req.SetBody(body.Bytes())
req.Header.SetMethod("POST")
req.Header.SetContentType("application/json")
req.Header.Add("User-Agent", "osquery/4.6.0")
req.SetRequestURI(a.serverAddress + "/api/osquery/config")
res := fasthttp.AcquireResponse()
err := a.fastClient.Do(req, res)
fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(res)
request, err := http.NewRequest("POST", a.serverAddress+"/api/osquery/config", bytes.NewReader([]byte(`{"node_key": "`+a.nodeKey+`"}`)))
if err != nil {
return err
}
request.Header.Add("Content-type", "application/json")
response, err := http.DefaultClient.Do(request)
if err != nil {
return fmt.Errorf("config request failed to run: %w", err)
}
defer response.Body.Close()
a.stats.IncrementConfigRequests()
statusCode := res.StatusCode()
statusCode := response.StatusCode
if statusCode != http.StatusOK {
a.stats.IncrementConfigErrors()
return fmt.Errorf("config request failed: %d", statusCode)
@ -980,7 +953,7 @@ func (a *agent) config() error {
Queries map[string]interface{} `json:"queries"`
} `json:"packs"`
}{}
if err := json.Unmarshal(res.Body(), &parsedResp); err != nil {
if err := json.NewDecoder(response.Body).Decode(&parsedResp); err != nil {
return fmt.Errorf("json parse at config: %w", err)
}
@ -1139,33 +1112,28 @@ func (a *agent) softwareMacOS() []map[string]string {
}
func (a *agent) DistributedRead() (*distributedReadResponse, error) {
req := fasthttp.AcquireRequest()
req.SetBody([]byte(`{"node_key": "` + a.nodeKey + `"}`))
req.Header.SetMethod("POST")
req.Header.SetContentType("application/json")
req.Header.Add("User-Agent", "osquery/4.6.0")
req.SetRequestURI(a.serverAddress + "/api/osquery/distributed/read")
res := fasthttp.AcquireResponse()
err := a.fastClient.Do(req, res)
fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(res)
request, err := http.NewRequest("POST", a.serverAddress+"/api/osquery/distributed/read", bytes.NewReader([]byte(`{"node_key": "`+a.nodeKey+`"}`)))
if err != nil {
return nil, err
}
request.Header.Add("Content-type", "application/json")
response, err := http.DefaultClient.Do(request)
if err != nil {
return nil, fmt.Errorf("distributed/read request failed to run: %w", err)
}
defer response.Body.Close()
a.stats.IncrementDistributedReads()
statusCode := res.StatusCode()
statusCode := response.StatusCode
if statusCode != http.StatusOK {
a.stats.IncrementDistributedReadErrors()
return nil, fmt.Errorf("distributed/read request failed: %d", statusCode)
}
var parsedResp distributedReadResponse
if err := json.Unmarshal(res.Body(), &parsedResp); err != nil {
if err := json.NewDecoder(response.Body).Decode(&parsedResp); err != nil {
log.Printf("json parse: %s", err)
return nil, err
}
@ -1596,26 +1564,21 @@ func (a *agent) DistributedWrite(queries map[string]string) error {
panic(err)
}
req := fasthttp.AcquireRequest()
req.SetBody(body)
req.Header.SetMethod("POST")
req.Header.SetContentType("application/json")
req.Header.Add("User-Agent", "osquery/5.0.1")
req.SetRequestURI(a.serverAddress + "/api/osquery/distributed/write")
res := fasthttp.AcquireResponse()
err = a.fastClient.Do(req, res)
fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(res)
request, err := http.NewRequest("POST", a.serverAddress+"/api/osquery/distributed/write", bytes.NewReader(body))
if err != nil {
return err
}
request.Header.Add("Content-type", "application/json")
response, err := http.DefaultClient.Do(request)
if err != nil {
return fmt.Errorf("distributed/write request failed to run: %w", err)
}
defer response.Body.Close()
a.stats.IncrementDistributedWrites()
statusCode := res.StatusCode()
statusCode := response.StatusCode
if statusCode != http.StatusOK {
a.stats.IncrementDistributedWriteErrors()
return fmt.Errorf("distributed/write request failed: %d", statusCode)
@ -1645,6 +1608,13 @@ func scheduledQueryResults(packName, queryName string, numResults int) json.RawM
}
func (a *agent) submitLogs(results []json.RawMessage) error {
// Connection check to prevent unnecessary JSON marshaling when the server is down.
conn, err := net.Dial("tcp", strings.TrimPrefix(a.serverAddress, "https://"))
if err != nil {
return err
}
conn.Close()
jsonResults, err := json.Marshal(results)
if err != nil {
panic(err)
@ -1654,36 +1624,31 @@ func (a *agent) submitLogs(results []json.RawMessage) error {
LogType string `json:"log_type"`
Data json.RawMessage `json:"data"`
}
r := submitLogsRequest{
slr := submitLogsRequest{
NodeKey: a.nodeKey,
LogType: "result",
Data: jsonResults,
}
body, err := json.Marshal(r)
body, err := json.Marshal(slr)
if err != nil {
panic(err)
}
req := fasthttp.AcquireRequest()
req.SetBody(body)
req.Header.SetMethod("POST")
req.Header.SetContentType("application/json")
req.Header.Add("User-Agent", "osquery/5.0.1")
req.SetRequestURI(a.serverAddress + "/api/osquery/log")
res := fasthttp.AcquireResponse()
err = a.fastClient.Do(req, res)
fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(res)
request, err := http.NewRequest("POST", a.serverAddress+"/api/osquery/log", bytes.NewReader(body))
if err != nil {
return err
}
request.Header.Add("Content-type", "application/json")
response, err := http.DefaultClient.Do(request)
if err != nil {
return fmt.Errorf("log request failed to run: %w", err)
}
defer response.Body.Close()
a.stats.IncrementResultLogRequests()
statusCode := res.StatusCode()
statusCode := response.StatusCode
if statusCode != http.StatusOK {
a.stats.IncrementResultLogErrors()
return fmt.Errorf("log request failed: %d", statusCode)
@ -1724,6 +1689,14 @@ func main() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
// #nosec (osquery-perf is only used for testing)
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
tr := http.DefaultTransport.(*http.Transport).Clone()
tr.TLSClientConfig = tlsConfig
http.DefaultClient.Transport = tr
validTemplateNames := map[string]bool{
"macos_13.6.2.tmpl": true,
"macos_14.1.2.tmpl": true,

5
go.mod
View file

@ -34,6 +34,7 @@ require (
github.com/fatih/color v1.15.0
github.com/getsentry/sentry-go v0.18.0
github.com/ghodss/yaml v1.0.0
github.com/go-git/go-git/v5 v5.11.0
github.com/go-ini/ini v1.67.0
github.com/go-kit/kit v0.12.0
github.com/go-kit/log v0.2.1
@ -98,7 +99,6 @@ require (
github.com/tj/assert v0.0.3
github.com/ulikunitz/xz v0.5.10
github.com/urfave/cli/v2 v2.23.5
github.com/valyala/fasthttp v1.40.0
github.com/ziutek/mymysql v1.5.4
go.elastic.co/apm/module/apmgorilla/v2 v2.3.0
go.elastic.co/apm/module/apmsql/v2 v2.4.3
@ -164,7 +164,6 @@ require (
github.com/agnivade/levenshtein v1.1.1 // indirect
github.com/akavel/rsrc v0.10.2 // indirect
github.com/alecthomas/jsonschema v0.0.0-20211022214203-8b29eab41725 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/antchfx/xpath v1.2.2 // indirect
github.com/apache/thrift v0.18.1 // indirect
github.com/apex/log v1.9.0 // indirect
@ -210,7 +209,6 @@ require (
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect
github.com/go-git/go-billy/v5 v5.5.0 // indirect
github.com/go-git/go-git/v5 v5.11.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
@ -292,7 +290,6 @@ require (
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/trivago/tgo v1.0.7 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/vartanbeno/go-reddit/v2 v2.0.0 // indirect
github.com/xanzy/go-gitlab v0.50.3 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect

10
go.sum
View file

@ -210,8 +210,6 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/andybalholm/cascadia v1.2.0/go.mod h1:YCyR8vOZT9aZ1CHEd8ap0gMVm2aFgxBp0T0eFw1RUQY=
github.com/andygrunwald/go-jira v1.16.0 h1:PU7C7Fkk5L96JvPc6vDVIrd99vdPnYudHu4ju2c2ikQ=
github.com/andygrunwald/go-jira v1.16.0/go.mod h1:UQH4IBVxIYWbgagc0LF/k9FRs9xjIiQ8hIcC6HfLwFU=
@ -835,7 +833,6 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kolide/kit v0.0.0-20221107170827-fb85e3d59eab h1:KVR7cs+oPyy85i+8t1ZaNSy1bymCy5FuWyt51pdrXu4=
@ -1198,11 +1195,6 @@ github.com/ulikunitz/xz v0.5.10 h1:t92gobL9l3HE202wg3rlk19F6X+JOxl9BBrCCMYEYd8=
github.com/ulikunitz/xz v0.5.10/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/urfave/cli/v2 v2.23.5 h1:xbrU7tAYviSpqeR3X4nEFWUdB/uDZ6DE+HxmRU7Xtyw=
github.com/urfave/cli/v2 v2.23.5/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.40.0 h1:CRq/00MfruPGFLTQKY8b+8SfdK60TxNztjRMnH0t1Yc=
github.com/valyala/fasthttp v1.40.0/go.mod h1:t/G+3rLek+CyY9bnIE+YlMRddxVAAGjhxndDB4i4C0I=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/vartanbeno/go-reddit/v2 v2.0.0 h1:fxYMqx5lhbmJ3yYRN1nnQC/gecRB3xpUS2BbG7GLpsk=
github.com/vartanbeno/go-reddit/v2 v2.0.0/go.mod h1:758/S10hwZSLm43NPtwoNQdZFSg3sjB5745Mwjb0ANI=
github.com/vmihailenco/msgpack v3.3.3+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
@ -1325,7 +1317,6 @@ golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4=
@ -1582,7 +1573,6 @@ golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=