diff --git a/cmd/osquery-perf/agent.go b/cmd/osquery-perf/agent.go index ed01e46c75..dbb87751ba 100644 --- a/cmd/osquery-perf/agent.go +++ b/cmd/osquery-perf/agent.go @@ -15,6 +15,7 @@ import ( "log" "math/rand" "net/http" + _ "net/http/pprof" "os" "strconv" "strings" @@ -377,7 +378,7 @@ type agent struct { scheduledQueriesMu sync.Mutex // protects the below members scheduledQueries []string scheduledQueryData []scheduledQuery - bufferedResults []json.RawMessage + bufferedResults []resultLog } type entityCount struct { @@ -566,13 +567,17 @@ func (a *agent) runLoop(i int, onlyAlreadyEnrolled bool) { defer logTicker.Stop() for range logTicker.C { // check if we have any scheduled queries that should be returning results - var results []json.RawMessage + var results []resultLog now := time.Now().Unix() a.scheduledQueriesMu.Lock() prevCount := len(a.bufferedResults) for i, query := range a.scheduledQueryData { if query.nextRun == 0 || now >= int64(query.nextRun) { - results = append(results, a.scheduledQueryResults(query.packName, query.Name, int(query.numRows))) + results = append(results, resultLog{ + packName: query.packName, + queryName: query.Name, + numRows: int(query.numRows), + }) a.scheduledQueryData[i].nextRun = float64(now + int64(query.ScheduleInterval)) } } @@ -588,6 +593,16 @@ func (a *agent) runLoop(i int, onlyAlreadyEnrolled bool) { } } +type resultLog struct { + packName string + queryName string + numRows int +} + +func (r resultLog) emit() json.RawMessage { + return scheduledQueryResults(r.packName, r.queryName, r.numRows) +} + // sendLogsBatch sends up to loggerTLSMaxLines logs and updates the buffer. func (a *agent) sendLogsBatch() { if len(a.bufferedResults) == 0 { @@ -599,7 +614,11 @@ func (a *agent) sendLogsBatch() { batchSize = len(a.bufferedResults) } batch := a.bufferedResults[:batchSize] - if err := a.submitLogs(batch); err != nil { + batchLogs := make([]json.RawMessage, 0, len(batch)) + for _, result := range batch { + batchLogs = append(batchLogs, result.emit()) + } + if err := a.submitLogs(batchLogs); err != nil { return } a.bufferedResults = a.bufferedResults[batchSize:] @@ -940,13 +959,14 @@ func (a *agent) config() error { res := fasthttp.AcquireResponse() err := a.fastClient.Do(req, res) - if err != nil { - return fmt.Errorf("config request failed to run: %w", err) - } fasthttp.ReleaseRequest(req) defer fasthttp.ReleaseResponse(res) + if err != nil { + return fmt.Errorf("config request failed to run: %w", err) + } + a.stats.IncrementConfigRequests() statusCode := res.StatusCode() @@ -1128,13 +1148,14 @@ func (a *agent) DistributedRead() (*distributedReadResponse, error) { res := fasthttp.AcquireResponse() err := a.fastClient.Do(req, res) - if err != nil { - return nil, fmt.Errorf("distributed/read request failed to run: %w", err) - } fasthttp.ReleaseRequest(req) defer fasthttp.ReleaseResponse(res) + if err != nil { + return nil, fmt.Errorf("distributed/read request failed to run: %w", err) + } + a.stats.IncrementDistributedReads() statusCode := res.StatusCode() @@ -1584,13 +1605,14 @@ func (a *agent) DistributedWrite(queries map[string]string) error { res := fasthttp.AcquireResponse() err = a.fastClient.Do(req, res) - if err != nil { - return fmt.Errorf("distributed/write request failed to run: %w", err) - } fasthttp.ReleaseRequest(req) defer fasthttp.ReleaseResponse(res) + if err != nil { + return fmt.Errorf("distributed/write request failed to run: %w", err) + } + a.stats.IncrementDistributedWrites() statusCode := res.StatusCode() @@ -1603,13 +1625,13 @@ func (a *agent) DistributedWrite(queries map[string]string) error { return nil } -func (a *agent) scheduledQueryResults(packName, queryName string, numResults int) json.RawMessage { +func scheduledQueryResults(packName, queryName string, numResults int) json.RawMessage { return json.RawMessage(`{ - "snapshot": [` + rows(numResults, a.UUID) + ` + "snapshot": [` + rows(numResults) + ` ], "action": "snapshot", "name": "pack/` + packName + `/` + queryName + `", - "hostIdentifier": "` + a.UUID + `", + "hostIdentifier": "EF9595F0-CE81-493A-9B06-D8A9D2CCB952", "calendarTime": "Fri Oct 6 18:13:04 2023 UTC", "unixTime": 1696615984, "epoch": 0, @@ -1617,7 +1639,7 @@ func (a *agent) scheduledQueryResults(packName, queryName string, numResults int "numerics": false, "decorations": { "host_uuid": "187c4d56-8e45-1a9d-8513-ac17efd2f0fd", - "hostname": "` + a.CachedString("hostname") + `" + "hostname": "osquery-perf" } }`) } @@ -1651,13 +1673,14 @@ func (a *agent) submitLogs(results []json.RawMessage) error { res := fasthttp.AcquireResponse() err = a.fastClient.Do(req, res) - if err != nil { - return fmt.Errorf("log request failed to run: %w", err) - } fasthttp.ReleaseRequest(req) defer fasthttp.ReleaseResponse(res) + if err != nil { + return fmt.Errorf("log request failed to run: %w", err) + } + a.stats.IncrementResultLogRequests() statusCode := res.StatusCode() @@ -1670,7 +1693,7 @@ func (a *agent) submitLogs(results []json.RawMessage) error { } // rows returns a set of rows for use in tests for query results. -func rows(num int, hostUUID string) string { +func rows(num int) string { b := strings.Builder{} for i := 0; i < num; i++ { b.WriteString(` { @@ -1683,7 +1706,7 @@ func rows(num int, hostUUID string) string { "pid": "3574", "platform_mask": "9", "start_time": "1696502961", - "uuid": "` + hostUUID + `", + "uuid": "EF9595F0-CE81-493A-9B06-D8A9D2CCB95", "version": "5.9.2", "watcher": "3570" }`) @@ -1696,6 +1719,11 @@ func rows(num int, hostUUID string) string { } func main() { + // Start HTTP server for pprof. See https://pkg.go.dev/net/http/pprof. + go func() { + log.Println(http.ListenAndServe("localhost:6060", nil)) + }() + validTemplateNames := map[string]bool{ "macos_13.6.2.tmpl": true, "macos_14.1.2.tmpl": true,