diff --git a/cmd/osquery-perf/agent.go b/cmd/osquery-perf/agent.go index 84e6085e2f..0ebbeee725 100644 --- a/cmd/osquery-perf/agent.go +++ b/cmd/osquery-perf/agent.go @@ -300,6 +300,7 @@ type agent struct { UUID string SerialNumber string ConfigInterval time.Duration + LogInterval time.Duration QueryInterval time.Duration MDMCheckInInterval time.Duration DiskEncryptionEnabled bool @@ -326,7 +327,7 @@ func newAgent( agentIndex int, serverAddress, enrollSecret string, templates *template.Template, - configInterval, queryInterval, mdmCheckInInterval time.Duration, + configInterval, logInterval, queryInterval, mdmCheckInInterval time.Duration, softwareCount softwareEntityCount, userCount entityCount, policyPassProb float64, @@ -383,6 +384,7 @@ func newAgent( EnrollSecret: enrollSecret, ConfigInterval: configInterval, + LogInterval: logInterval, QueryInterval: queryInterval, MDMCheckInInterval: mdmCheckInInterval, UUID: uuid, @@ -453,31 +455,58 @@ func (a *agent) runLoop(i int, onlyAlreadyEnrolled bool) { go a.runMDMLoop() } - configTicker := time.Tick(a.ConfigInterval) - liveQueryTicker := time.Tick(a.QueryInterval) - // Since this is an internal timer, we don't need to configure it. - logTicker := time.Tick(1 * time.Second) - for { - select { - case <-configTicker: - a.config() - case <-liveQueryTicker: - resp, err := a.DistributedRead() - if err != nil { - log.Println(err) - } else if len(resp.Queries) > 0 { - a.DistributedWrite(resp.Queries) + // + // osquery runs three separate independent threads, + // - a thread for getting, running and submitting results for distributed queries (distributed). + // - a thread for getting configuration from a remote server (config). + // - a thread for submitting log results (logger). + // + // Thus we try to simulate that as much as we can. + + // distributed thread: + go func() { + liveQueryTicker := time.Tick(a.QueryInterval) + for { + select { + case <-liveQueryTicker: + resp, err := a.DistributedRead() + if err != nil { + log.Println(err) + } else if len(resp.Queries) > 0 { + a.DistributedWrite(resp.Queries) + } } + } + }() + + // config thread: + go func() { + for { + configTicker := time.Tick(a.ConfigInterval) + select { + case <-configTicker: + a.config() + } + } + }() + + // logger thread: + for { + logTicker := time.Tick(a.LogInterval) + select { case <-logTicker: - // check if we have any scheduled queries + // check if we have any scheduled queries that should be returning results + var results []json.RawMessage + now := time.Now().Unix() for i, query := range a.scheduledQueryData { - time.Sleep(500 * time.Millisecond) - now := time.Now().Unix() if query.NextRun == 0 || now >= int64(query.NextRun) { - a.SubmitLogs(query.Name, a.CachedString("id"), a.CachedString("hostname"), query.PackName, query.NumResults) + results = append(results, a.scheduledQueryResults(query.PackName, query.Name, int(query.NumResults))) a.scheduledQueryData[i].NextRun = float64(now + int64(query.ScheduleInterval)) } } + if len(results) > 0 { + a.SubmitLogs(results) + } } } } @@ -1419,17 +1448,9 @@ func (a *agent) DistributedWrite(queries map[string]string) { // No need to read the distributed write body } -func (a *agent) SubmitLogs(queryName, hostID, hostName, packName string, numResults uint) { - type submitLogsRequest struct { - NodeKey string `json:"node_key"` - LogType string `json:"log_type"` - Data json.RawMessage `json:"data"` - } - r := submitLogsRequest{ - NodeKey: a.nodeKey, - LogType: "result", - Data: json.RawMessage(`[{ - "snapshot": [` + results(int(numResults), a.UUID) + ` +func (a *agent) scheduledQueryResults(packName, queryName string, numResults int) json.RawMessage { + return json.RawMessage(`{ + "snapshot": [` + results(numResults, a.UUID) + ` ], "action": "snapshot", "name": "pack/` + packName + `/` + queryName + `", @@ -1443,7 +1464,23 @@ func (a *agent) SubmitLogs(queryName, hostID, hostName, packName string, numResu "host_uuid": "187c4d56-8e45-1a9d-8513-ac17efd2f0fd", "hostname": "` + a.CachedString("hostname") + `" } -}]`), +}`) +} + +func (a *agent) SubmitLogs(results []json.RawMessage) { + jsonResults, err := json.Marshal(results) + if err != nil { + panic(err) + } + type submitLogsRequest struct { + NodeKey string `json:"node_key"` + LogType string `json:"log_type"` + Data json.RawMessage `json:"data"` + } + r := submitLogsRequest{ + NodeKey: a.nodeKey, + LogType: "result", + Data: jsonResults, } body, err := json.Marshal(r) @@ -1466,7 +1503,7 @@ func (a *agent) SubmitLogs(queryName, hostID, hostName, packName string, numResu } // Creates a set of results for use in tests for Query Results. -func results(num int, hostID string) string { +func results(num int, hostUUID string) string { b := strings.Builder{} for i := 0; i < num; i++ { b.WriteString(` { @@ -1479,7 +1516,7 @@ func results(num int, hostID string) string { "pid": "3574", "platform_mask": "9", "start_time": "1696502961", - "uuid": "` + hostID + `", + "uuid": "` + hostUUID + `", "version": "5.9.2", "watcher": "3570" }`) @@ -1503,12 +1540,15 @@ func main() { } var ( - serverURL = flag.String("server_url", "https://localhost:8080", "URL (with protocol and port of osquery server)") - enrollSecret = flag.String("enroll_secret", "", "Enroll secret to authenticate enrollment") - hostCount = flag.Int("host_count", 10, "Number of hosts to start (default 10)") - randSeed = flag.Int64("seed", time.Now().UnixNano(), "Seed for random generator (default current time)") - startPeriod = flag.Duration("start_period", 10*time.Second, "Duration to spread start of hosts over") - configInterval = flag.Duration("config_interval", 1*time.Minute, "Interval for config requests") + serverURL = flag.String("server_url", "https://localhost:8080", "URL (with protocol and port of osquery server)") + enrollSecret = flag.String("enroll_secret", "", "Enroll secret to authenticate enrollment") + hostCount = flag.Int("host_count", 10, "Number of hosts to start (default 10)") + randSeed = flag.Int64("seed", time.Now().UnixNano(), "Seed for random generator (default current time)") + startPeriod = flag.Duration("start_period", 10*time.Second, "Duration to spread start of hosts over") + configInterval = flag.Duration("config_interval", 1*time.Minute, "Interval for config requests") + // Flag logger_tls_period defines how often to check for sending scheduled query results. + // osquery-perf will send log requests with results only if there are scheduled queries configured AND it's their time to run. + logInterval = flag.Duration("logger_tls_period", 10*time.Second, "Interval for scheduled queries log requests") queryInterval = flag.Duration("query_interval", 10*time.Second, "Interval for live query requests") mdmCheckInInterval = flag.Duration("mdm_check_in_interval", 10*time.Second, "Interval for performing MDM check ins") onlyAlreadyEnrolled = flag.Bool("only_already_enrolled", false, "Only start agents that are already enrolled") @@ -1632,6 +1672,7 @@ func main() { *enrollSecret, tmpl, *configInterval, + *logInterval, *queryInterval, *mdmCheckInInterval, softwareEntityCount{