mirror of
https://github.com/fleetdm/fleet
synced 2026-05-23 00:49:03 +00:00
Add osquery-perf changes to buffer log results (#16659)
These are the osquery-perf changes for the load test to be performed in #16423. - Adding buffering of results when they fail to be sent to the server (when Fleet is offline/down). - I'm changing many `a.waitingDo(req, res)` to just do one request and if it fails it will retry on the next interval, which is what osquery does on `/api/osquery/config`/`/api/osquery/distributed/read`/`/api/osquery/distributed/write`/`/api/osquery/log`. - I'm reducing the logs as they will accumulate considerably while Fleet is down for 30m with 140k hosts. - Changing from `fmt.Printfs` to `log.Printfs` for consistency. - Adding more fine grained stats (to know which endpoints are failing) - Adding a mode to disable Fleet Desktop to simulate what the customer is running. - Adding `logger_tls_max_lines` flag that simulates the osquery setting.
This commit is contained in:
parent
e7d2aee8a1
commit
ab55bd7bfd
1 changed files with 219 additions and 102 deletions
|
|
@ -58,7 +58,7 @@ func loadMacOSVulnerableSoftware() {
|
|||
for _, line := range lines {
|
||||
parts := bytes.Split(line, []byte("##"))
|
||||
if len(parts) < 2 {
|
||||
fmt.Println("skipping", string(line))
|
||||
log.Println("skipping", string(line))
|
||||
continue
|
||||
}
|
||||
macosVulnerableSoftware = append(macosVulnerableSoftware, fleet.Software{
|
||||
|
|
@ -67,7 +67,7 @@ func loadMacOSVulnerableSoftware() {
|
|||
Source: "apps",
|
||||
})
|
||||
}
|
||||
log.Printf("Loaded %d vulnerable macOS software\n", len(macosVulnerableSoftware))
|
||||
log.Printf("Loaded %d vulnerable macOS software", len(macosVulnerableSoftware))
|
||||
}
|
||||
|
||||
func loadSoftwareItems(fs embed.FS, path string) []map[string]string {
|
||||
|
|
@ -106,19 +106,24 @@ func init() {
|
|||
}
|
||||
|
||||
type Stats struct {
|
||||
startTime time.Time
|
||||
errors int
|
||||
osqueryEnrollments int
|
||||
orbitEnrollments int
|
||||
mdmEnrollments int
|
||||
distributedWrites int
|
||||
mdmCommandsReceived int
|
||||
distributedReads int
|
||||
configRequests int
|
||||
resultLogRequests int
|
||||
orbitErrors int
|
||||
mdmErrors int
|
||||
desktopErrors int
|
||||
startTime time.Time
|
||||
errors int
|
||||
osqueryEnrollments int
|
||||
orbitEnrollments int
|
||||
mdmEnrollments int
|
||||
distributedWrites int
|
||||
mdmCommandsReceived int
|
||||
distributedReads int
|
||||
configRequests int
|
||||
configErrors int
|
||||
resultLogRequests int
|
||||
orbitErrors int
|
||||
mdmErrors int
|
||||
desktopErrors int
|
||||
distributedReadErrors int
|
||||
distributedWriteErrors int
|
||||
resultLogErrors int
|
||||
bufferedLogs int
|
||||
|
||||
l sync.Mutex
|
||||
}
|
||||
|
|
@ -171,6 +176,12 @@ func (s *Stats) IncrementConfigRequests() {
|
|||
s.configRequests++
|
||||
}
|
||||
|
||||
func (s *Stats) IncrementConfigErrors() {
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
s.configErrors++
|
||||
}
|
||||
|
||||
func (s *Stats) IncrementResultLogRequests() {
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
|
|
@ -195,13 +206,39 @@ func (s *Stats) IncrementDesktopErrors() {
|
|||
s.desktopErrors++
|
||||
}
|
||||
|
||||
func (s *Stats) IncrementDistributedReadErrors() {
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
s.distributedReadErrors++
|
||||
}
|
||||
|
||||
func (s *Stats) IncrementDistributedWriteErrors() {
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
s.distributedWriteErrors++
|
||||
}
|
||||
|
||||
func (s *Stats) IncrementResultLogErrors() {
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
s.resultLogErrors++
|
||||
}
|
||||
|
||||
func (s *Stats) UpdateBufferedLogs(v int) {
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
s.bufferedLogs += v
|
||||
if s.bufferedLogs < 0 {
|
||||
s.bufferedLogs = 0
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Stats) Log() {
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
|
||||
fmt.Printf(
|
||||
"%s :: uptime: %s, error rate: %.2f, osquery enrolls: %d, orbit enrolls: %d, mdm enrolls: %d, distributed/reads: %d, distributed/writes: %d, config requests: %d, result log requests: %d, mdm commands received: %d, orbit errors: %d, desktop errors: %d, mdm errors: %d\n",
|
||||
time.Now().Format("2006-01-02T15:04:05Z"),
|
||||
log.Printf(
|
||||
"uptime: %s, error rate: %.2f, osquery enrolls: %d, orbit enrolls: %d, mdm enrolls: %d, distributed/reads: %d, distributed/writes: %d, config requests: %d, result log requests: %d, mdm commands received: %d, config errors: %d, distributed/read errors: %d, distributed/write errors: %d, log result errors: %d, orbit errors: %d, desktop errors: %d, mdm errors: %d, buffered logs: %d",
|
||||
time.Since(s.startTime).Round(time.Second),
|
||||
float64(s.errors)/float64(s.osqueryEnrollments),
|
||||
s.osqueryEnrollments,
|
||||
|
|
@ -212,9 +249,14 @@ func (s *Stats) Log() {
|
|||
s.configRequests,
|
||||
s.resultLogRequests,
|
||||
s.mdmCommandsReceived,
|
||||
s.configErrors,
|
||||
s.distributedReadErrors,
|
||||
s.distributedWriteErrors,
|
||||
s.resultLogErrors,
|
||||
s.orbitErrors,
|
||||
s.desktopErrors,
|
||||
s.mdmErrors,
|
||||
s.bufferedLogs,
|
||||
)
|
||||
}
|
||||
|
||||
|
|
@ -242,12 +284,12 @@ func (n *nodeKeyManager) LoadKeys() {
|
|||
|
||||
data, err := os.ReadFile(n.filepath)
|
||||
if err != nil {
|
||||
fmt.Println("WARNING (ignore if creating a new node key file): error loading nodekey file:", err)
|
||||
log.Println("WARNING (ignore if creating a new node key file): error loading nodekey file:", err)
|
||||
return
|
||||
}
|
||||
n.nodekeys = strings.Split(string(data), "\n")
|
||||
n.nodekeys = n.nodekeys[:len(n.nodekeys)-1] // remove last empty node key due to new line.
|
||||
fmt.Printf("loaded %d node keys\n", len(n.nodekeys))
|
||||
log.Printf("loaded %d node keys", len(n.nodekeys))
|
||||
}
|
||||
|
||||
func (n *nodeKeyManager) Get(i int) string {
|
||||
|
|
@ -271,12 +313,12 @@ func (n *nodeKeyManager) Add(nodekey string) {
|
|||
|
||||
f, err := os.OpenFile(n.filepath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
|
||||
if err != nil {
|
||||
fmt.Println("error opening nodekey file:", err.Error())
|
||||
log.Printf("error opening nodekey file: %s", err.Error())
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
if _, err := f.WriteString(nodekey + "\n"); err != nil {
|
||||
fmt.Println("error writing nodekey file:", err)
|
||||
log.Printf("error writing nodekey file: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -311,7 +353,10 @@ type agent struct {
|
|||
// isEnrolledToMDMMu protects isEnrolledToMDM.
|
||||
isEnrolledToMDMMu sync.Mutex
|
||||
|
||||
disableScriptExec bool
|
||||
disableScriptExec bool
|
||||
disableFleetDesktop bool
|
||||
loggerTLSMaxLines int
|
||||
|
||||
// atomic boolean is set to true when executing scripts, so that only a
|
||||
// single goroutine at a time can execute scripts.
|
||||
scriptExecRunning atomic.Bool
|
||||
|
|
@ -329,9 +374,10 @@ type agent struct {
|
|||
MDMCheckInInterval time.Duration
|
||||
DiskEncryptionEnabled bool
|
||||
|
||||
scheduledQueriesMu sync.Mutex
|
||||
scheduledQueriesMu sync.Mutex // protects the below members
|
||||
scheduledQueries []string
|
||||
scheduledQueryData []scheduledQuery
|
||||
bufferedResults []json.RawMessage
|
||||
}
|
||||
|
||||
type entityCount struct {
|
||||
|
|
@ -366,6 +412,8 @@ func newAgent(
|
|||
liveQueryFailProb float64,
|
||||
liveQueryNoResultsProb float64,
|
||||
disableScriptExec bool,
|
||||
disableFleetDesktop bool,
|
||||
loggerTLSMaxLines int,
|
||||
) *agent {
|
||||
var deviceAuthToken *string
|
||||
if rand.Float64() <= orbitProb {
|
||||
|
|
@ -417,8 +465,10 @@ func newAgent(
|
|||
UUID: uuid,
|
||||
SerialNumber: serialNumber,
|
||||
|
||||
mdmClient: mdmClient,
|
||||
disableScriptExec: disableScriptExec,
|
||||
mdmClient: mdmClient,
|
||||
disableScriptExec: disableScriptExec,
|
||||
disableFleetDesktop: disableFleetDesktop,
|
||||
loggerTLSMaxLines: loggerTLSMaxLines,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -437,9 +487,9 @@ type scheduledQuery struct {
|
|||
Platform string `json:"platform"`
|
||||
Version string `json:"version"`
|
||||
Snapshot bool `json:"snapshot"`
|
||||
NextRun float64
|
||||
NumResults uint
|
||||
PackName string
|
||||
nextRun float64
|
||||
numRows uint
|
||||
packName string
|
||||
}
|
||||
|
||||
func (a *agent) isOrbit() bool {
|
||||
|
|
@ -457,13 +507,12 @@ func (a *agent) runLoop(i int, onlyAlreadyEnrolled bool) {
|
|||
return
|
||||
}
|
||||
|
||||
a.config()
|
||||
_ = a.config()
|
||||
|
||||
resp, err := a.DistributedRead()
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
} else {
|
||||
if err == nil {
|
||||
if len(resp.Queries) > 0 {
|
||||
a.DistributedWrite(resp.Queries)
|
||||
_ = a.DistributedWrite(resp.Queries)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -473,7 +522,7 @@ func (a *agent) runLoop(i int, onlyAlreadyEnrolled bool) {
|
|||
|
||||
if a.mdmClient != nil {
|
||||
if err := a.mdmClient.Enroll(); err != nil {
|
||||
log.Printf("MDM enroll failed: %s\n", err)
|
||||
log.Printf("MDM enroll failed: %s", err)
|
||||
a.stats.IncrementMDMErrors()
|
||||
return
|
||||
}
|
||||
|
|
@ -496,11 +545,8 @@ func (a *agent) runLoop(i int, onlyAlreadyEnrolled bool) {
|
|||
defer liveQueryTicker.Stop()
|
||||
|
||||
for range liveQueryTicker.C {
|
||||
resp, err := a.DistributedRead()
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
} else if len(resp.Queries) > 0 {
|
||||
a.DistributedWrite(resp.Queries)
|
||||
if resp, err := a.DistributedRead(); err == nil && len(resp.Queries) > 0 {
|
||||
_ = a.DistributedWrite(resp.Queries)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
|
@ -511,7 +557,7 @@ func (a *agent) runLoop(i int, onlyAlreadyEnrolled bool) {
|
|||
defer configTicker.Stop()
|
||||
|
||||
for range configTicker.C {
|
||||
a.config()
|
||||
_ = a.config()
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
@ -523,19 +569,42 @@ func (a *agent) runLoop(i int, onlyAlreadyEnrolled bool) {
|
|||
var results []json.RawMessage
|
||||
now := time.Now().Unix()
|
||||
a.scheduledQueriesMu.Lock()
|
||||
prevCount := len(a.bufferedResults)
|
||||
for i, query := range a.scheduledQueryData {
|
||||
if query.NextRun == 0 || now >= int64(query.NextRun) {
|
||||
results = append(results, a.scheduledQueryResults(query.PackName, query.Name, int(query.NumResults)))
|
||||
a.scheduledQueryData[i].NextRun = float64(now + int64(query.ScheduleInterval))
|
||||
if query.nextRun == 0 || now >= int64(query.nextRun) {
|
||||
results = append(results, a.scheduledQueryResults(query.packName, query.Name, int(query.numRows)))
|
||||
a.scheduledQueryData[i].nextRun = float64(now + int64(query.ScheduleInterval))
|
||||
}
|
||||
}
|
||||
a.scheduledQueriesMu.Unlock()
|
||||
if len(results) > 0 {
|
||||
a.SubmitLogs(results)
|
||||
a.bufferedResults = append(a.bufferedResults, results...)
|
||||
if len(a.bufferedResults) > 1_000_000 { // osquery buffered_log_max is 1M
|
||||
extra := len(a.bufferedResults) - 1_000_000
|
||||
a.bufferedResults = a.bufferedResults[extra:]
|
||||
}
|
||||
a.sendLogsBatch()
|
||||
newBufferedCount := len(a.bufferedResults) - prevCount
|
||||
a.stats.UpdateBufferedLogs(newBufferedCount)
|
||||
a.scheduledQueriesMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// sendLogsBatch sends up to loggerTLSMaxLines logs and updates the buffer.
|
||||
func (a *agent) sendLogsBatch() {
|
||||
if len(a.bufferedResults) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
batchSize := a.loggerTLSMaxLines
|
||||
if len(a.bufferedResults) < batchSize {
|
||||
batchSize = len(a.bufferedResults)
|
||||
}
|
||||
batch := a.bufferedResults[:batchSize]
|
||||
if err := a.submitLogs(batch); err != nil {
|
||||
return
|
||||
}
|
||||
a.bufferedResults = a.bufferedResults[batchSize:]
|
||||
}
|
||||
|
||||
func (a *agent) runOrbitLoop() {
|
||||
orbitClient, err := service.NewOrbitClient(
|
||||
"",
|
||||
|
|
@ -558,28 +627,30 @@ func (a *agent) runOrbitLoop() {
|
|||
|
||||
deviceClient, err := service.NewDeviceClient(a.serverAddress, true, "", nil, "")
|
||||
if err != nil {
|
||||
log.Println("creating device client: ", err)
|
||||
log.Fatal("creating device client: ", err)
|
||||
}
|
||||
|
||||
// orbit does a config check when it starts
|
||||
if _, err := orbitClient.GetConfig(); err != nil {
|
||||
a.stats.IncrementOrbitErrors()
|
||||
log.Println("orbitClient.GetConfig: ", err)
|
||||
}
|
||||
|
||||
tokenRotationEnabled := orbitClient.GetServerCapabilities().Has(fleet.CapabilityOrbitEndpoints) &&
|
||||
orbitClient.GetServerCapabilities().Has(fleet.CapabilityTokenRotation)
|
||||
tokenRotationEnabled := false
|
||||
if !a.disableFleetDesktop {
|
||||
tokenRotationEnabled = orbitClient.GetServerCapabilities().Has(fleet.CapabilityOrbitEndpoints) &&
|
||||
orbitClient.GetServerCapabilities().Has(fleet.CapabilityTokenRotation)
|
||||
|
||||
// it also writes and checks the device token
|
||||
if tokenRotationEnabled {
|
||||
if err := orbitClient.SetOrUpdateDeviceToken(*a.deviceAuthToken); err != nil {
|
||||
a.stats.IncrementOrbitErrors()
|
||||
log.Println("orbitClient.SetOrUpdateDeviceToken: ", err)
|
||||
}
|
||||
// it also writes and checks the device token
|
||||
if tokenRotationEnabled {
|
||||
if err := orbitClient.SetOrUpdateDeviceToken(*a.deviceAuthToken); err != nil {
|
||||
a.stats.IncrementOrbitErrors()
|
||||
log.Println("orbitClient.SetOrUpdateDeviceToken: ", err)
|
||||
}
|
||||
|
||||
if err := deviceClient.CheckToken(*a.deviceAuthToken); err != nil {
|
||||
a.stats.IncrementOrbitErrors()
|
||||
log.Println("deviceClient.CheckToken: ", err)
|
||||
if err := deviceClient.CheckToken(*a.deviceAuthToken); err != nil {
|
||||
a.stats.IncrementOrbitErrors()
|
||||
log.Println("deviceClient.CheckToken: ", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -606,8 +677,10 @@ func (a *agent) runOrbitLoop() {
|
|||
}
|
||||
}
|
||||
|
||||
// fleet desktop performs a burst of check token requests when it's initialized
|
||||
checkToken()
|
||||
// Fleet Desktop performs a burst of check token requests when it's initialized
|
||||
if !a.disableFleetDesktop {
|
||||
checkToken()
|
||||
}
|
||||
|
||||
// orbit makes a call to check the config and update the CLI flags every 30
|
||||
// seconds
|
||||
|
|
@ -629,7 +702,6 @@ func (a *agent) runOrbitLoop() {
|
|||
cfg, err := orbitClient.GetConfig()
|
||||
if err != nil {
|
||||
a.stats.IncrementOrbitErrors()
|
||||
log.Println("orbitClient.GetConfig: ", err)
|
||||
continue
|
||||
}
|
||||
if len(cfg.Notifications.PendingScriptExecutionIDs) > 0 {
|
||||
|
|
@ -638,7 +710,7 @@ func (a *agent) runOrbitLoop() {
|
|||
go a.execScripts(cfg.Notifications.PendingScriptExecutionIDs, orbitClient)
|
||||
}
|
||||
case <-orbitTokenRemoteCheckTicker:
|
||||
if tokenRotationEnabled {
|
||||
if !a.disableFleetDesktop && tokenRotationEnabled {
|
||||
if err := deviceClient.CheckToken(*a.deviceAuthToken); err != nil {
|
||||
a.stats.IncrementOrbitErrors()
|
||||
log.Println("deviceClient.CheckToken: ", err)
|
||||
|
|
@ -646,7 +718,7 @@ func (a *agent) runOrbitLoop() {
|
|||
}
|
||||
}
|
||||
case <-orbitTokenRotationTicker:
|
||||
if tokenRotationEnabled {
|
||||
if !a.disableFleetDesktop && tokenRotationEnabled {
|
||||
newToken := ptr.String(uuid.NewString())
|
||||
if err := orbitClient.SetOrUpdateDeviceToken(*newToken); err != nil {
|
||||
a.stats.IncrementOrbitErrors()
|
||||
|
|
@ -660,14 +732,15 @@ func (a *agent) runOrbitLoop() {
|
|||
case <-capabilitiesCheckerTicker:
|
||||
if err := orbitClient.Ping(); err != nil {
|
||||
a.stats.IncrementOrbitErrors()
|
||||
log.Println("orbitClient.Ping: ", err)
|
||||
continue
|
||||
}
|
||||
case <-fleetDesktopPolicyTicker:
|
||||
if _, err := deviceClient.DesktopSummary(*a.deviceAuthToken); err != nil {
|
||||
a.stats.IncrementDesktopErrors()
|
||||
log.Println("deviceClient.NumberOfFailingPolicies: ", err)
|
||||
continue
|
||||
if !a.disableFleetDesktop {
|
||||
if _, err := deviceClient.DesktopSummary(*a.deviceAuthToken); err != nil {
|
||||
a.stats.IncrementDesktopErrors()
|
||||
log.Println("deviceClient.NumberOfFailingPolicies: ", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -679,7 +752,7 @@ func (a *agent) runMDMLoop() {
|
|||
for range mdmCheckInTicker {
|
||||
mdmCommandPayload, err := a.mdmClient.Idle()
|
||||
if err != nil {
|
||||
log.Printf("MDM Idle request failed: %s\n", err)
|
||||
log.Printf("MDM Idle request failed: %s", err)
|
||||
a.stats.IncrementMDMErrors()
|
||||
continue
|
||||
}
|
||||
|
|
@ -688,7 +761,7 @@ func (a *agent) runMDMLoop() {
|
|||
a.stats.IncrementMDMCommandsReceived()
|
||||
mdmCommandPayload, err = a.mdmClient.Acknowledge(mdmCommandPayload.CommandUUID)
|
||||
if err != nil {
|
||||
log.Printf("MDM Acknowledge request failed: %s\n", err)
|
||||
log.Printf("MDM Acknowledge request failed: %s", err)
|
||||
a.stats.IncrementMDMErrors()
|
||||
break INNER_FOR_LOOP
|
||||
}
|
||||
|
|
@ -703,7 +776,7 @@ func (a *agent) execScripts(execIDs []string, orbitClient *service.OrbitClient)
|
|||
}
|
||||
defer a.scriptExecRunning.Store(false)
|
||||
|
||||
log.Printf("running scripts: %v\n", execIDs)
|
||||
log.Printf("running scripts: %v", execIDs)
|
||||
for _, execID := range execIDs {
|
||||
if a.disableScriptExec {
|
||||
// send a no-op result without executing if script exec is disabled
|
||||
|
|
@ -716,7 +789,7 @@ func (a *agent) execScripts(execIDs []string, orbitClient *service.OrbitClient)
|
|||
log.Println("save disabled host script result:", err)
|
||||
return
|
||||
}
|
||||
log.Printf("did save disabled host script result: id=%s\n", execID)
|
||||
log.Printf("did save disabled host script result: id=%s", execID)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -744,14 +817,18 @@ func (a *agent) execScripts(execIDs []string, orbitClient *service.OrbitClient)
|
|||
log.Println("save host script result:", err)
|
||||
return
|
||||
}
|
||||
log.Printf("did exec and save host script result: id=%s, output size=%d, runtime=%d, exit code=%d\n", execID, base64.StdEncoding.EncodedLen(n), runtime, exitCode)
|
||||
log.Printf("did exec and save host script result: id=%s, output size=%d, runtime=%d, exit code=%d", execID, base64.StdEncoding.EncodedLen(n), runtime, exitCode)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *agent) waitingDo(req *fasthttp.Request, res *fasthttp.Response) {
|
||||
err := a.fastClient.Do(req, res)
|
||||
for err != nil || res.StatusCode() != http.StatusOK {
|
||||
fmt.Println(err, res.StatusCode())
|
||||
if err != nil {
|
||||
log.Printf("failed to run request: %s", err)
|
||||
} else { // res.StatusCode() != http.StatusOK
|
||||
log.Printf("request failed: %d", res.StatusCode())
|
||||
}
|
||||
a.stats.IncrementErrors(1)
|
||||
<-time.Tick(time.Duration(rand.Intn(120)+1) * time.Second)
|
||||
err = a.fastClient.Do(req, res)
|
||||
|
|
@ -851,7 +928,7 @@ func (a *agent) enroll(i int, onlyAlreadyEnrolled bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (a *agent) config() {
|
||||
func (a *agent) config() error {
|
||||
body := bytes.NewBufferString(`{"node_key": "` + a.nodeKey + `"}`)
|
||||
|
||||
req := fasthttp.AcquireRequest()
|
||||
|
|
@ -862,16 +939,20 @@ func (a *agent) config() {
|
|||
req.SetRequestURI(a.serverAddress + "/api/osquery/config")
|
||||
res := fasthttp.AcquireResponse()
|
||||
|
||||
a.waitingDo(req, res)
|
||||
err := a.fastClient.Do(req, res)
|
||||
if err != nil {
|
||||
return fmt.Errorf("config request failed to run: %w", err)
|
||||
}
|
||||
|
||||
fasthttp.ReleaseRequest(req)
|
||||
defer fasthttp.ReleaseResponse(res)
|
||||
|
||||
a.stats.IncrementConfigRequests()
|
||||
|
||||
if res.StatusCode() != http.StatusOK {
|
||||
log.Println("config status:", res.StatusCode())
|
||||
return
|
||||
statusCode := res.StatusCode()
|
||||
if statusCode != http.StatusOK {
|
||||
a.stats.IncrementConfigErrors()
|
||||
return fmt.Errorf("config request failed: %d", statusCode)
|
||||
}
|
||||
|
||||
parsedResp := struct {
|
||||
|
|
@ -880,8 +961,7 @@ func (a *agent) config() {
|
|||
} `json:"packs"`
|
||||
}{}
|
||||
if err := json.Unmarshal(res.Body(), &parsedResp); err != nil {
|
||||
log.Println("json parse at config:", err)
|
||||
return
|
||||
return fmt.Errorf("json parse at config: %w", err)
|
||||
}
|
||||
|
||||
var scheduledQueries []string
|
||||
|
|
@ -891,25 +971,24 @@ func (a *agent) config() {
|
|||
scheduledQueries = append(scheduledQueries, packName+"_"+queryName)
|
||||
m, ok := query.(map[string]interface{})
|
||||
if !ok {
|
||||
log.Fatalf("processing scheduled query failed: %v\n", query)
|
||||
return fmt.Errorf("processing scheduled query failed: %v", query)
|
||||
}
|
||||
|
||||
q := scheduledQuery{}
|
||||
q.PackName = packName
|
||||
q.packName = packName
|
||||
q.Name = queryName
|
||||
q.NumResults = 1
|
||||
q.numRows = 1
|
||||
parts := strings.Split(q.Name, "_")
|
||||
if len(parts) == 2 {
|
||||
num, err := strconv.ParseInt(parts[1], 10, 32)
|
||||
if err != nil {
|
||||
num = 1
|
||||
}
|
||||
q.NumResults = uint(num)
|
||||
q.numRows = uint(num)
|
||||
}
|
||||
q.ScheduleInterval = m["interval"].(float64)
|
||||
q.Query = m["query"].(string)
|
||||
scheduledQueryData = append(scheduledQueryData, q)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -917,6 +996,8 @@ func (a *agent) config() {
|
|||
a.scheduledQueries = scheduledQueries
|
||||
a.scheduledQueryData = scheduledQueryData
|
||||
a.scheduledQueriesMu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
const stringVals = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_."
|
||||
|
|
@ -1046,16 +1127,25 @@ func (a *agent) DistributedRead() (*distributedReadResponse, error) {
|
|||
req.SetRequestURI(a.serverAddress + "/api/osquery/distributed/read")
|
||||
res := fasthttp.AcquireResponse()
|
||||
|
||||
a.waitingDo(req, res)
|
||||
err := a.fastClient.Do(req, res)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("distributed/read request failed to run: %w", err)
|
||||
}
|
||||
|
||||
fasthttp.ReleaseRequest(req)
|
||||
defer fasthttp.ReleaseResponse(res)
|
||||
|
||||
a.stats.IncrementDistributedReads()
|
||||
|
||||
statusCode := res.StatusCode()
|
||||
if statusCode != http.StatusOK {
|
||||
a.stats.IncrementDistributedReadErrors()
|
||||
return nil, fmt.Errorf("distributed/read request failed: %d", statusCode)
|
||||
}
|
||||
|
||||
var parsedResp distributedReadResponse
|
||||
if err := json.Unmarshal(res.Body(), &parsedResp); err != nil {
|
||||
log.Println("json parse:", err)
|
||||
log.Printf("json parse: %s", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
@ -1449,7 +1539,7 @@ func (a *agent) processQuery(name, query string) (
|
|||
}
|
||||
}
|
||||
|
||||
func (a *agent) DistributedWrite(queries map[string]string) {
|
||||
func (a *agent) DistributedWrite(queries map[string]string) error {
|
||||
r := service.SubmitDistributedQueryResultsRequest{
|
||||
Results: make(fleet.OsqueryDistributedQueryResults),
|
||||
Statuses: make(map[string]fleet.OsqueryStatus),
|
||||
|
|
@ -1492,18 +1582,29 @@ func (a *agent) DistributedWrite(queries map[string]string) {
|
|||
req.SetRequestURI(a.serverAddress + "/api/osquery/distributed/write")
|
||||
res := fasthttp.AcquireResponse()
|
||||
|
||||
a.waitingDo(req, res)
|
||||
err = a.fastClient.Do(req, res)
|
||||
if err != nil {
|
||||
return fmt.Errorf("distributed/write request failed to run: %w", err)
|
||||
}
|
||||
|
||||
fasthttp.ReleaseRequest(req)
|
||||
defer fasthttp.ReleaseResponse(res)
|
||||
|
||||
a.stats.IncrementDistributedWrites()
|
||||
|
||||
statusCode := res.StatusCode()
|
||||
if statusCode != http.StatusOK {
|
||||
a.stats.IncrementDistributedWriteErrors()
|
||||
return fmt.Errorf("distributed/write request failed: %d", statusCode)
|
||||
}
|
||||
|
||||
// No need to read the distributed write body
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *agent) scheduledQueryResults(packName, queryName string, numResults int) json.RawMessage {
|
||||
return json.RawMessage(`{
|
||||
"snapshot": [` + results(numResults, a.UUID) + `
|
||||
"snapshot": [` + rows(numResults, a.UUID) + `
|
||||
],
|
||||
"action": "snapshot",
|
||||
"name": "pack/` + packName + `/` + queryName + `",
|
||||
|
|
@ -1520,7 +1621,7 @@ func (a *agent) scheduledQueryResults(packName, queryName string, numResults int
|
|||
}`)
|
||||
}
|
||||
|
||||
func (a *agent) SubmitLogs(results []json.RawMessage) {
|
||||
func (a *agent) submitLogs(results []json.RawMessage) error {
|
||||
jsonResults, err := json.Marshal(results)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
|
@ -1535,7 +1636,6 @@ func (a *agent) SubmitLogs(results []json.RawMessage) {
|
|||
LogType: "result",
|
||||
Data: jsonResults,
|
||||
}
|
||||
|
||||
body, err := json.Marshal(r)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
|
@ -1549,16 +1649,27 @@ func (a *agent) SubmitLogs(results []json.RawMessage) {
|
|||
req.SetRequestURI(a.serverAddress + "/api/osquery/log")
|
||||
res := fasthttp.AcquireResponse()
|
||||
|
||||
a.waitingDo(req, res)
|
||||
err = a.fastClient.Do(req, res)
|
||||
if err != nil {
|
||||
return fmt.Errorf("log request failed to run: %w", err)
|
||||
}
|
||||
|
||||
fasthttp.ReleaseRequest(req)
|
||||
defer fasthttp.ReleaseResponse(res)
|
||||
|
||||
a.stats.IncrementResultLogRequests()
|
||||
|
||||
statusCode := res.StatusCode()
|
||||
if statusCode != http.StatusOK {
|
||||
a.stats.IncrementResultLogErrors()
|
||||
return fmt.Errorf("log request failed: %d", statusCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Creates a set of results for use in tests for Query Results.
|
||||
func results(num int, hostUUID string) string {
|
||||
// rows returns a set of rows for use in tests for query results.
|
||||
func rows(num int, hostUUID string) string {
|
||||
b := strings.Builder{}
|
||||
for i := 0; i < num; i++ {
|
||||
b.WriteString(` {
|
||||
|
|
@ -1616,7 +1727,7 @@ func main() {
|
|||
commonSoftwareUninstallCount = flag.Int("common_software_uninstall_count", 1, "Number of common software to uninstall")
|
||||
commonSoftwareUninstallProb = flag.Float64("common_software_uninstall_prob", 0.1, "Probability of uninstalling common_software_uninstall_count unique software/s")
|
||||
|
||||
uniqueSoftwareCount = flag.Int("unique_software_count", 10, "Number of uninstalls ")
|
||||
uniqueSoftwareCount = flag.Int("unique_software_count", 1, "Number of unique software installed on each host")
|
||||
uniqueSoftwareUninstallCount = flag.Int("unique_software_uninstall_count", 1, "Number of unique software to uninstall")
|
||||
uniqueSoftwareUninstallProb = flag.Float64("unique_software_uninstall_prob", 0.1, "Probability of uninstalling unique_software_uninstall_count common software/s")
|
||||
|
||||
|
|
@ -1641,6 +1752,10 @@ func main() {
|
|||
liveQueryNoResultsProb = flag.Float64("live_query_no_results_prob", 0.2, "Probability of a live query returning no results")
|
||||
|
||||
disableScriptExec = flag.Bool("disable_script_exec", false, "Disable script execution support")
|
||||
|
||||
disableFleetDesktop = flag.Bool("disable_fleet_desktop", false, "Disable Fleet Desktop")
|
||||
// logger_tls_max_lines is simulating the osquery setting with the same name.
|
||||
loggerTLSMaxLines = flag.Int("", 1024, "Maximum number of buffered result log lines to send on every log request")
|
||||
)
|
||||
|
||||
flag.Parse()
|
||||
|
|
@ -1652,10 +1767,10 @@ func main() {
|
|||
*orbitProb = 0
|
||||
}
|
||||
|
||||
if *commonSoftwareUninstallCount >= *commonSoftwareCount {
|
||||
if *commonSoftwareUninstallCount > *commonSoftwareCount {
|
||||
log.Fatalf("Argument common_software_uninstall_count cannot be bigger than common_software_count")
|
||||
}
|
||||
if *uniqueSoftwareUninstallCount >= *uniqueSoftwareCount {
|
||||
if *uniqueSoftwareUninstallCount > *uniqueSoftwareCount {
|
||||
log.Fatalf("Argument unique_software_uninstall_count cannot be bigger than unique_software_count")
|
||||
}
|
||||
|
||||
|
|
@ -1761,6 +1876,8 @@ func main() {
|
|||
*liveQueryFailProb,
|
||||
*liveQueryNoResultsProb,
|
||||
*disableScriptExec,
|
||||
*disableFleetDesktop,
|
||||
*loggerTLSMaxLines,
|
||||
)
|
||||
a.stats = stats
|
||||
a.nodeKeyManager = nodeKeyManager
|
||||
|
|
@ -1768,6 +1885,6 @@ func main() {
|
|||
time.Sleep(sleepTime)
|
||||
}
|
||||
|
||||
fmt.Println("Agents running. Kill with C-c.")
|
||||
log.Println("Agents running. Kill with C-c.")
|
||||
<-make(chan struct{})
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue