mirror of
https://github.com/fleetdm/fleet
synced 2026-05-23 17:08:53 +00:00
osquery-perf: can send scheduled query results (#14502)
This commit is contained in:
parent
e46e83baf8
commit
f92695fda5
1 changed files with 121 additions and 1 deletions
|
|
@ -303,6 +303,7 @@ type agent struct {
|
|||
QueryInterval time.Duration
|
||||
MDMCheckInInterval time.Duration
|
||||
DiskEncryptionEnabled bool
|
||||
scheduledQueryData []scheduledQuery
|
||||
}
|
||||
|
||||
type entityCount struct {
|
||||
|
|
@ -400,6 +401,18 @@ type distributedReadResponse struct {
|
|||
Queries map[string]string `json:"queries"`
|
||||
}
|
||||
|
||||
type scheduledQuery struct {
|
||||
Query string `json:"query"`
|
||||
Name string `json:"name"`
|
||||
ScheduleInterval float64 `json:"interval"`
|
||||
Platform string `json:"platform"`
|
||||
Version string `json:"version"`
|
||||
Snapshot bool `json:"snapshot"`
|
||||
NextRun float64
|
||||
NumResults uint
|
||||
PackName string
|
||||
}
|
||||
|
||||
func (a *agent) isOrbit() bool {
|
||||
return a.deviceAuthToken != nil
|
||||
}
|
||||
|
|
@ -442,6 +455,8 @@ func (a *agent) runLoop(i int, onlyAlreadyEnrolled bool) {
|
|||
|
||||
configTicker := time.Tick(a.ConfigInterval)
|
||||
liveQueryTicker := time.Tick(a.QueryInterval)
|
||||
// Since this is an internal timer, we don't need to configure it.
|
||||
logTicker := time.Tick(1 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-configTicker:
|
||||
|
|
@ -453,6 +468,16 @@ func (a *agent) runLoop(i int, onlyAlreadyEnrolled bool) {
|
|||
} else if len(resp.Queries) > 0 {
|
||||
a.DistributedWrite(resp.Queries)
|
||||
}
|
||||
case <-logTicker:
|
||||
// check if we have any scheduled queries
|
||||
for i, query := range a.scheduledQueryData {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
now := time.Now().Unix()
|
||||
if query.NextRun == 0 || now >= int64(query.NextRun) {
|
||||
a.SubmitLogs(query.Name, a.CachedString("id"), a.CachedString("hostname"), query.PackName, query.NumResults)
|
||||
a.scheduledQueryData[i].NextRun = float64(now + int64(query.ScheduleInterval))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -804,12 +829,35 @@ func (a *agent) config() {
|
|||
}
|
||||
|
||||
var scheduledQueries []string
|
||||
var scheduledQueryData []scheduledQuery
|
||||
for packName, pack := range parsedResp.Packs {
|
||||
for queryName := range pack.Queries {
|
||||
for queryName, query := range pack.Queries {
|
||||
scheduledQueries = append(scheduledQueries, packName+"_"+queryName)
|
||||
m, ok := query.(map[string]interface{})
|
||||
if !ok {
|
||||
log.Fatalf("processing scheduled query failed: %v\n", query)
|
||||
}
|
||||
|
||||
q := scheduledQuery{}
|
||||
q.PackName = packName
|
||||
q.Name = queryName
|
||||
q.NumResults = 1
|
||||
parts := strings.Split(q.Name, "_")
|
||||
if len(parts) > 0 {
|
||||
num, err := strconv.ParseInt(parts[1], 10, 32)
|
||||
if err != nil {
|
||||
num = 1
|
||||
}
|
||||
q.NumResults = uint(num)
|
||||
}
|
||||
q.ScheduleInterval = m["interval"].(float64)
|
||||
q.Query = m["query"].(string)
|
||||
scheduledQueryData = append(scheduledQueryData, q)
|
||||
|
||||
}
|
||||
}
|
||||
a.scheduledQueries = scheduledQueries
|
||||
a.scheduledQueryData = scheduledQueryData
|
||||
}
|
||||
|
||||
const stringVals = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_."
|
||||
|
|
@ -1371,6 +1419,78 @@ func (a *agent) DistributedWrite(queries map[string]string) {
|
|||
// No need to read the distributed write body
|
||||
}
|
||||
|
||||
func (a *agent) SubmitLogs(queryName, hostID, hostName, packName string, numResults uint) {
|
||||
type submitLogsRequest struct {
|
||||
NodeKey string `json:"node_key"`
|
||||
LogType string `json:"log_type"`
|
||||
Data json.RawMessage `json:"data"`
|
||||
}
|
||||
r := submitLogsRequest{
|
||||
NodeKey: a.nodeKey,
|
||||
LogType: "result",
|
||||
Data: json.RawMessage(`[{
|
||||
"snapshot": [` + results(int(numResults), a.UUID) + `
|
||||
],
|
||||
"action": "snapshot",
|
||||
"name": "pack/` + packName + `/` + queryName + `",
|
||||
"hostIdentifier": "` + a.UUID + `",
|
||||
"calendarTime": "Fri Oct 6 18:13:04 2023 UTC",
|
||||
"unixTime": 1696615984,
|
||||
"epoch": 0,
|
||||
"counter": 0,
|
||||
"numerics": false,
|
||||
"decorations": {
|
||||
"host_uuid": "187c4d56-8e45-1a9d-8513-ac17efd2f0fd",
|
||||
"hostname": "` + a.CachedString("hostname") + `"
|
||||
}
|
||||
}]`),
|
||||
}
|
||||
|
||||
body, err := json.Marshal(r)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
req := fasthttp.AcquireRequest()
|
||||
req.SetBody(body)
|
||||
req.Header.SetMethod("POST")
|
||||
req.Header.SetContentType("application/json")
|
||||
req.Header.Add("User-Agent", "osquery/5.0.1")
|
||||
req.SetRequestURI(a.serverAddress + "/api/osquery/log")
|
||||
res := fasthttp.AcquireResponse()
|
||||
|
||||
a.waitingDo(req, res)
|
||||
|
||||
fasthttp.ReleaseRequest(req)
|
||||
defer fasthttp.ReleaseResponse(res)
|
||||
}
|
||||
|
||||
// Creates a set of results for use in tests for Query Results.
|
||||
func results(num int, hostID string) string {
|
||||
b := strings.Builder{}
|
||||
for i := 0; i < num; i++ {
|
||||
b.WriteString(` {
|
||||
"build_distro": "centos7",
|
||||
"build_platform": "linux",
|
||||
"config_hash": "eed0d8296e5f90b790a23814a9db7a127b13498d",
|
||||
"config_valid": "1",
|
||||
"extensions": "active",
|
||||
"instance_id": "e5799132-85ab-4cfa-89f3-03e0dd3c509a",
|
||||
"pid": "3574",
|
||||
"platform_mask": "9",
|
||||
"start_time": "1696502961",
|
||||
"uuid": "` + hostID + `",
|
||||
"version": "5.9.2",
|
||||
"watcher": "3570"
|
||||
}`)
|
||||
if i != num-1 {
|
||||
b.WriteString(",")
|
||||
}
|
||||
}
|
||||
|
||||
return b.String()
|
||||
}
|
||||
|
||||
func main() {
|
||||
validTemplateNames := map[string]bool{
|
||||
"mac10.14.6.tmpl": true,
|
||||
|
|
|
|||
Loading…
Reference in a new issue