Run logger, distributed and config in separate goroutines (#14584)

This commit is contained in:
Lucas Manuel Rodriguez 2023-10-17 11:30:59 -03:00 committed by GitHub
parent 82534168d9
commit 22bba274b7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -300,6 +300,7 @@ type agent struct {
UUID string
SerialNumber string
ConfigInterval time.Duration
LogInterval time.Duration
QueryInterval time.Duration
MDMCheckInInterval time.Duration
DiskEncryptionEnabled bool
@ -326,7 +327,7 @@ func newAgent(
agentIndex int,
serverAddress, enrollSecret string,
templates *template.Template,
configInterval, queryInterval, mdmCheckInInterval time.Duration,
configInterval, logInterval, queryInterval, mdmCheckInInterval time.Duration,
softwareCount softwareEntityCount,
userCount entityCount,
policyPassProb float64,
@ -383,6 +384,7 @@ func newAgent(
EnrollSecret: enrollSecret,
ConfigInterval: configInterval,
LogInterval: logInterval,
QueryInterval: queryInterval,
MDMCheckInInterval: mdmCheckInInterval,
UUID: uuid,
@ -453,31 +455,58 @@ func (a *agent) runLoop(i int, onlyAlreadyEnrolled bool) {
go a.runMDMLoop()
}
configTicker := time.Tick(a.ConfigInterval)
liveQueryTicker := time.Tick(a.QueryInterval)
// Since this is an internal timer, we don't need to configure it.
logTicker := time.Tick(1 * time.Second)
for {
select {
case <-configTicker:
a.config()
case <-liveQueryTicker:
resp, err := a.DistributedRead()
if err != nil {
log.Println(err)
} else if len(resp.Queries) > 0 {
a.DistributedWrite(resp.Queries)
//
// osquery runs three separate independent threads,
// - a thread for getting, running and submitting results for distributed queries (distributed).
// - a thread for getting configuration from a remote server (config).
// - a thread for submitting log results (logger).
//
// Thus we try to simulate that as much as we can.
// distributed thread:
go func() {
liveQueryTicker := time.Tick(a.QueryInterval)
for {
select {
case <-liveQueryTicker:
resp, err := a.DistributedRead()
if err != nil {
log.Println(err)
} else if len(resp.Queries) > 0 {
a.DistributedWrite(resp.Queries)
}
}
}
}()
// config thread:
go func() {
for {
configTicker := time.Tick(a.ConfigInterval)
select {
case <-configTicker:
a.config()
}
}
}()
// logger thread:
for {
logTicker := time.Tick(a.LogInterval)
select {
case <-logTicker:
// check if we have any scheduled queries
// check if we have any scheduled queries that should be returning results
var results []json.RawMessage
now := time.Now().Unix()
for i, query := range a.scheduledQueryData {
time.Sleep(500 * time.Millisecond)
now := time.Now().Unix()
if query.NextRun == 0 || now >= int64(query.NextRun) {
a.SubmitLogs(query.Name, a.CachedString("id"), a.CachedString("hostname"), query.PackName, query.NumResults)
results = append(results, a.scheduledQueryResults(query.PackName, query.Name, int(query.NumResults)))
a.scheduledQueryData[i].NextRun = float64(now + int64(query.ScheduleInterval))
}
}
if len(results) > 0 {
a.SubmitLogs(results)
}
}
}
}
@ -1419,17 +1448,9 @@ func (a *agent) DistributedWrite(queries map[string]string) {
// No need to read the distributed write body
}
func (a *agent) SubmitLogs(queryName, hostID, hostName, packName string, numResults uint) {
type submitLogsRequest struct {
NodeKey string `json:"node_key"`
LogType string `json:"log_type"`
Data json.RawMessage `json:"data"`
}
r := submitLogsRequest{
NodeKey: a.nodeKey,
LogType: "result",
Data: json.RawMessage(`[{
"snapshot": [` + results(int(numResults), a.UUID) + `
func (a *agent) scheduledQueryResults(packName, queryName string, numResults int) json.RawMessage {
return json.RawMessage(`{
"snapshot": [` + results(numResults, a.UUID) + `
],
"action": "snapshot",
"name": "pack/` + packName + `/` + queryName + `",
@ -1443,7 +1464,23 @@ func (a *agent) SubmitLogs(queryName, hostID, hostName, packName string, numResu
"host_uuid": "187c4d56-8e45-1a9d-8513-ac17efd2f0fd",
"hostname": "` + a.CachedString("hostname") + `"
}
}]`),
}`)
}
func (a *agent) SubmitLogs(results []json.RawMessage) {
jsonResults, err := json.Marshal(results)
if err != nil {
panic(err)
}
type submitLogsRequest struct {
NodeKey string `json:"node_key"`
LogType string `json:"log_type"`
Data json.RawMessage `json:"data"`
}
r := submitLogsRequest{
NodeKey: a.nodeKey,
LogType: "result",
Data: jsonResults,
}
body, err := json.Marshal(r)
@ -1466,7 +1503,7 @@ func (a *agent) SubmitLogs(queryName, hostID, hostName, packName string, numResu
}
// Creates a set of results for use in tests for Query Results.
func results(num int, hostID string) string {
func results(num int, hostUUID string) string {
b := strings.Builder{}
for i := 0; i < num; i++ {
b.WriteString(` {
@ -1479,7 +1516,7 @@ func results(num int, hostID string) string {
"pid": "3574",
"platform_mask": "9",
"start_time": "1696502961",
"uuid": "` + hostID + `",
"uuid": "` + hostUUID + `",
"version": "5.9.2",
"watcher": "3570"
}`)
@ -1503,12 +1540,15 @@ func main() {
}
var (
serverURL = flag.String("server_url", "https://localhost:8080", "URL (with protocol and port of osquery server)")
enrollSecret = flag.String("enroll_secret", "", "Enroll secret to authenticate enrollment")
hostCount = flag.Int("host_count", 10, "Number of hosts to start (default 10)")
randSeed = flag.Int64("seed", time.Now().UnixNano(), "Seed for random generator (default current time)")
startPeriod = flag.Duration("start_period", 10*time.Second, "Duration to spread start of hosts over")
configInterval = flag.Duration("config_interval", 1*time.Minute, "Interval for config requests")
serverURL = flag.String("server_url", "https://localhost:8080", "URL (with protocol and port of osquery server)")
enrollSecret = flag.String("enroll_secret", "", "Enroll secret to authenticate enrollment")
hostCount = flag.Int("host_count", 10, "Number of hosts to start (default 10)")
randSeed = flag.Int64("seed", time.Now().UnixNano(), "Seed for random generator (default current time)")
startPeriod = flag.Duration("start_period", 10*time.Second, "Duration to spread start of hosts over")
configInterval = flag.Duration("config_interval", 1*time.Minute, "Interval for config requests")
// Flag logger_tls_period defines how often to check for sending scheduled query results.
// osquery-perf will send log requests with results only if there are scheduled queries configured AND it's their time to run.
logInterval = flag.Duration("logger_tls_period", 10*time.Second, "Interval for scheduled queries log requests")
queryInterval = flag.Duration("query_interval", 10*time.Second, "Interval for live query requests")
mdmCheckInInterval = flag.Duration("mdm_check_in_interval", 10*time.Second, "Interval for performing MDM check ins")
onlyAlreadyEnrolled = flag.Bool("only_already_enrolled", false, "Only start agents that are already enrolled")
@ -1632,6 +1672,7 @@ func main() {
*enrollSecret,
tmpl,
*configInterval,
*logInterval,
*queryInterval,
*mdmCheckInInterval,
softwareEntityCount{