mirror of
https://github.com/fleetdm/fleet
synced 2026-04-21 13:37:30 +00:00
Reduced redundant database calls in the osquery distributed query results hot path (#42157)
<!-- Add the related story/sub-task/bug number, like Resolves #123, or remove if NA --> **Related issue:** Resolves #42156 The core change: instead of loading AppConfig, HostFeatures, TeamMDMConfig, and rebuilding the detail query map **independently inside each call** to `directIngestDetailQuery` and `ingestDetailQuery` (so ~2N times per check-in with N detail results), we load everything **once** into a `hostDetailQueryConfig` struct and pass it through. ## Before ``` SubmitDistributedQueryResults loop: for each query result: → ingestQueryResults → directIngestDetailQuery: loads AppConfig, HostFeatures, TeamMDMConfig, builds detail query map → ingestDetailQuery: loads AppConfig, HostFeatures, TeamMDMConfig, builds detail query map after loop: loads AppConfig for labels/policies loads AppConfig AGAIN for deferred host save ``` ## After ``` SubmitDistributedQueryResults loop: on first detail query result: → loadHostDetailQueryConfig: loads AppConfig, HostFeatures, TeamMDMConfig, builds detail query map ONCE for each query result: → ingestQueryResults (receives pre-loaded config) → directIngestDetailQuery: just looks up the query in the cached map → ingestDetailQuery: just looks up the query in the cached map after loop: loads AppConfig once for labels/policies/deferred host save ``` The detail config is **lazy-loaded** — if a check-in only has label/policy results and no detail queries, the HostFeatures/TeamMDMConfig calls are skipped entirely. ## Other changes bundled in 1. **`serialUpdateHost`** now receives the request context and uses `context.WithoutCancel(ctx)` instead of `context.Background()`, so the background goroutine preserves OTEL traces and logging context without being subject to request cancellation. 2. **Deferred save host** at the end of `SubmitDistributedQueryResults` reuses the already-loaded AppConfig instead of loading it a third time. The old code silently skipped the host save if that third load failed. # Checklist for submitter If some of the following don't apply, delete the relevant line. - [x] Changes file added for user-visible changes in `changes/`, `orbit/changes/` or `ee/fleetd-chrome/changes`. ## Testing - [x] QA'd all new/changed functionality manually - Ran a local load test with osquery perf <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **Performance** * Optimized distributed query result processing by preloading configuration once per request instead of repeatedly per query result, reducing redundant database calls and improving overall query performance. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
parent
648a75c429
commit
7a6a95703f
2 changed files with 97 additions and 104 deletions
1
changes/35467-detail-query-config-preload
Normal file
1
changes/35467-detail-query-config-preload
Normal file
|
|
@ -0,0 +1 @@
|
|||
* Reduced redundant database calls in the osquery distributed query results hot path by pre-loading configuration (AppConfig, HostFeatures, TeamMDMConfig, conditional access) once per request instead of once per detail query result.
|
||||
|
|
@ -229,7 +229,7 @@ func (svc *Service) EnrollOsquery(ctx context.Context, enrollSecret, hostIdentif
|
|||
|
||||
if save {
|
||||
if appConfig.ServerSettings.DeferredSaveHost {
|
||||
go svc.serialUpdateHost(host)
|
||||
go svc.serialUpdateHost(ctx, host)
|
||||
} else {
|
||||
if err := svc.ds.UpdateHost(ctx, host); err != nil {
|
||||
return "", ctxerr.Wrap(ctx, err, "save host in enroll agent")
|
||||
|
|
@ -242,12 +242,14 @@ func (svc *Service) EnrollOsquery(ctx context.Context, enrollSecret, hostIdentif
|
|||
|
||||
var counter = int64(0)
|
||||
|
||||
func (svc *Service) serialUpdateHost(host *fleet.Host) {
|
||||
func (svc *Service) serialUpdateHost(ctx context.Context, host *fleet.Host) {
|
||||
newVal := atomic.AddInt64(&counter, 1)
|
||||
defer func() {
|
||||
atomic.AddInt64(&counter, -1)
|
||||
}()
|
||||
ctx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
// Detach from request cancellation but preserve context values (e.g. OTEL trace),
|
||||
// then apply a timeout for this background operation.
|
||||
ctx, cancelFunc := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
|
||||
defer cancelFunc()
|
||||
svc.logger.DebugContext(ctx, "serial update host background", "background", newVal)
|
||||
err := svc.ds.SerialUpdateHost(ctx, host)
|
||||
|
|
@ -715,6 +717,53 @@ var criticalDetailQueries = map[string]bool{
|
|||
"mdm_windows": true,
|
||||
}
|
||||
|
||||
// hostDetailQueryConfig holds pre-loaded configuration data needed for building and ingesting
|
||||
// detail queries. Loading this once and passing it through avoids redundant database calls
|
||||
// (AppConfig, HostFeatures, TeamMDMConfig, conditional access) on every detail query result,
|
||||
// and also caches the resolved detail query map so it is built only once per request.
|
||||
type hostDetailQueryConfig struct {
|
||||
appConfig *fleet.AppConfig
|
||||
features *fleet.Features
|
||||
detailQueries map[string]osquery_utils.DetailQuery
|
||||
}
|
||||
|
||||
func (svc *Service) loadHostDetailQueryConfig(ctx context.Context, host *fleet.Host) (*hostDetailQueryConfig, error) {
|
||||
appConfig, err := svc.ds.AppConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, ctxerr.Wrap(ctx, err, "read app config")
|
||||
}
|
||||
|
||||
features, err := svc.HostFeatures(ctx, host)
|
||||
if err != nil {
|
||||
return nil, ctxerr.Wrap(ctx, err, "read host features")
|
||||
}
|
||||
|
||||
var mdmTeamConfig *fleet.TeamMDM
|
||||
if appConfig != nil && appConfig.MDM.EnabledAndConfigured && host.TeamID != nil {
|
||||
mdmTeamConfig, err = svc.ds.TeamMDMConfig(ctx, *host.TeamID)
|
||||
if err != nil {
|
||||
return nil, ctxerr.Wrap(ctx, err, "reading MDM Team Config")
|
||||
}
|
||||
}
|
||||
|
||||
detailQueries := osquery_utils.GetDetailQueries(
|
||||
ctx,
|
||||
svc.config,
|
||||
appConfig,
|
||||
features,
|
||||
osquery_utils.Integrations{
|
||||
ConditionalAccessMicrosoft: svc.hostRequiresConditionalAccessMicrosoftIngestion(ctx, host),
|
||||
},
|
||||
mdmTeamConfig,
|
||||
)
|
||||
|
||||
return &hostDetailQueryConfig{
|
||||
appConfig: appConfig,
|
||||
features: features,
|
||||
detailQueries: detailQueries,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// detailQueriesForHost returns the map of detail+additional queries that should be executed by
|
||||
// osqueryd to fill in the host details.
|
||||
func (svc *Service) detailQueriesForHost(ctx context.Context, host *fleet.Host) (queries map[string]string, discovery map[string]string, err error) {
|
||||
|
|
@ -729,36 +778,15 @@ func (svc *Service) detailQueriesForHost(ctx context.Context, host *fleet.Host)
|
|||
}
|
||||
}
|
||||
|
||||
appConfig, err := svc.ds.AppConfig(ctx)
|
||||
cfg, err := svc.loadHostDetailQueryConfig(ctx, host)
|
||||
if err != nil {
|
||||
return nil, nil, ctxerr.Wrap(ctx, err, "read app config")
|
||||
}
|
||||
|
||||
features, err := svc.HostFeatures(ctx, host)
|
||||
if err != nil {
|
||||
return nil, nil, ctxerr.Wrap(ctx, err, "read host features")
|
||||
}
|
||||
|
||||
var mdmTeamConfig *fleet.TeamMDM
|
||||
if appConfig != nil && appConfig.MDM.EnabledAndConfigured && host.TeamID != nil {
|
||||
mdmTeamConfig, err = svc.ds.TeamMDMConfig(ctx, *host.TeamID)
|
||||
if err != nil {
|
||||
return nil, nil, ctxerr.Wrap(ctx, err, "reading MDM Team Config")
|
||||
}
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
queries = make(map[string]string)
|
||||
discovery = make(map[string]string)
|
||||
|
||||
detailQueries := osquery_utils.GetDetailQueries(
|
||||
ctx,
|
||||
svc.config,
|
||||
appConfig,
|
||||
features,
|
||||
osquery_utils.Integrations{
|
||||
ConditionalAccessMicrosoft: svc.hostRequiresConditionalAccessMicrosoftIngestion(ctx, host),
|
||||
}, mdmTeamConfig)
|
||||
for name, query := range detailQueries {
|
||||
for name, query := range cfg.detailQueries {
|
||||
if criticalQueriesOnly && !criticalDetailQueries[name] {
|
||||
continue
|
||||
}
|
||||
|
|
@ -784,13 +812,13 @@ func (svc *Service) detailQueriesForHost(ctx context.Context, host *fleet.Host)
|
|||
}
|
||||
}
|
||||
|
||||
if features.AdditionalQueries == nil || criticalQueriesOnly {
|
||||
if cfg.features.AdditionalQueries == nil || criticalQueriesOnly {
|
||||
// No additional queries set
|
||||
return queries, discovery, nil
|
||||
}
|
||||
|
||||
var additionalQueries map[string]string
|
||||
if err := json.Unmarshal(*features.AdditionalQueries, &additionalQueries); err != nil {
|
||||
if err := json.Unmarshal(*cfg.features.AdditionalQueries, &additionalQueries); err != nil {
|
||||
return nil, nil, ctxerr.Wrap(ctx, err, "unmarshal additional queries")
|
||||
}
|
||||
|
||||
|
|
@ -1086,6 +1114,12 @@ func (svc *Service) SubmitDistributedQueryResults(
|
|||
|
||||
preProcessSoftwareResults(ctx, host, results, statuses, messages, osquery_utils.SoftwareOverrideQueries, svc.logger)
|
||||
|
||||
// Lazy-load detail query config only when a detail result is present, to avoid
|
||||
// unnecessary HostFeatures/TeamMDMConfig/conditional access DB calls for payloads
|
||||
// that only contain label, policy, or live-query results.
|
||||
var detailConfig *hostDetailQueryConfig
|
||||
var detailConfigFailed bool
|
||||
|
||||
var hostWithoutPolicies bool
|
||||
for query, rows := range results {
|
||||
// When receiving this query in the results, we will update the host's
|
||||
|
|
@ -1109,8 +1143,23 @@ func (svc *Service) SubmitDistributedQueryResults(
|
|||
}
|
||||
queryStats := stats[query]
|
||||
|
||||
// Lazy-load detail config on first detail query result.
|
||||
if detailConfig == nil && strings.HasPrefix(query, hostDetailQueryPrefix) {
|
||||
if detailConfigFailed {
|
||||
// Already failed to load detail config, skip all detail queries.
|
||||
continue
|
||||
}
|
||||
var err error
|
||||
detailConfig, err = svc.loadHostDetailQueryConfig(ctx, host)
|
||||
if err != nil {
|
||||
detailConfigFailed = true
|
||||
logging.WithErr(ctx, ctxerr.Wrap(ctx, err, "loading host detail query config"))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
ingestedDetailUpdated, ingestedAdditionalUpdated, err := svc.ingestQueryResults(
|
||||
ctx, query, host, rows, failed, messages, policyResults, labelResults, additionalResults, queryStats,
|
||||
ctx, query, host, rows, failed, messages, policyResults, labelResults, additionalResults, queryStats, detailConfig,
|
||||
)
|
||||
if err != nil {
|
||||
logging.WithErr(ctx, ctxerr.New(ctx, "error in query ingestion"))
|
||||
|
|
@ -1121,6 +1170,8 @@ func (svc *Service) SubmitDistributedQueryResults(
|
|||
additionalUpdated = additionalUpdated || ingestedAdditionalUpdated
|
||||
}
|
||||
|
||||
// Load AppConfig separately for label/policy processing. detailConfig may be nil
|
||||
// (no detail queries in this check-in) or may have failed to load (soft failure).
|
||||
ac, err := svc.ds.AppConfig(ctx)
|
||||
if err != nil {
|
||||
return ctxerr.Wrap(ctx, err, "getting app config")
|
||||
|
|
@ -1253,16 +1304,11 @@ func (svc *Service) SubmitDistributedQueryResults(
|
|||
}
|
||||
|
||||
if refetchRequested || detailUpdated || refetchCriticalCleared {
|
||||
appConfig, err := svc.ds.AppConfig(ctx)
|
||||
if err != nil {
|
||||
logging.WithErr(ctx, err)
|
||||
if ac.ServerSettings.DeferredSaveHost {
|
||||
go svc.serialUpdateHost(ctx, host)
|
||||
} else {
|
||||
if appConfig.ServerSettings.DeferredSaveHost {
|
||||
go svc.serialUpdateHost(host)
|
||||
} else {
|
||||
if err := svc.ds.UpdateHost(ctx, host); err != nil {
|
||||
logging.WithErr(ctx, err)
|
||||
}
|
||||
if err := svc.ds.UpdateHost(ctx, host); err != nil {
|
||||
logging.WithErr(ctx, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1681,6 +1727,7 @@ func (svc *Service) ingestQueryResults(
|
|||
labelResults map[uint]*bool,
|
||||
additionalResults fleet.OsqueryDistributedQueryResults,
|
||||
stats *fleet.Stats,
|
||||
detailConfig *hostDetailQueryConfig,
|
||||
) (bool, bool, error) {
|
||||
var detailUpdated, additionalUpdated bool
|
||||
|
||||
|
|
@ -1706,11 +1753,14 @@ func (svc *Service) ingestQueryResults(
|
|||
|
||||
switch {
|
||||
case strings.HasPrefix(query, hostDetailQueryPrefix):
|
||||
if detailConfig == nil { // safety net for NilAway linter
|
||||
return false, false, newOsqueryError("detail query config not loaded for query " + query)
|
||||
}
|
||||
trimmedQuery := strings.TrimPrefix(query, hostDetailQueryPrefix)
|
||||
var ingested bool
|
||||
ingested, err = svc.directIngestDetailQuery(ctx, host, trimmedQuery, rows)
|
||||
ingested, err = svc.directIngestDetailQuery(ctx, host, trimmedQuery, rows, detailConfig)
|
||||
if !ingested && err == nil {
|
||||
err = svc.ingestDetailQuery(ctx, host, trimmedQuery, rows)
|
||||
err = svc.ingestDetailQuery(ctx, host, trimmedQuery, rows, detailConfig)
|
||||
// No err != nil check here because ingestDetailQuery could have updated
|
||||
// successfully some values of host.
|
||||
detailUpdated = true
|
||||
|
|
@ -1726,36 +1776,8 @@ func (svc *Service) ingestQueryResults(
|
|||
|
||||
var noSuchTableRegexp = regexp.MustCompile(`^no such table: \S+$`)
|
||||
|
||||
func (svc *Service) directIngestDetailQuery(ctx context.Context, host *fleet.Host, name string, rows []map[string]string) (ingested bool, err error) {
|
||||
features, err := svc.HostFeatures(ctx, host)
|
||||
if err != nil {
|
||||
return false, newOsqueryError("ingest detail query: " + err.Error())
|
||||
}
|
||||
|
||||
appConfig, err := svc.ds.AppConfig(ctx)
|
||||
if err != nil {
|
||||
return false, newOsqueryError("ingest detail query: " + err.Error())
|
||||
}
|
||||
|
||||
var mdmTeamConfig *fleet.TeamMDM
|
||||
if appConfig != nil && appConfig.MDM.EnabledAndConfigured && host.TeamID != nil {
|
||||
mdmTeamConfig, err = svc.ds.TeamMDMConfig(ctx, *host.TeamID)
|
||||
if err != nil {
|
||||
return false, newOsqueryError("ingest detail query: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
detailQueries := osquery_utils.GetDetailQueries(
|
||||
ctx,
|
||||
svc.config,
|
||||
appConfig,
|
||||
features,
|
||||
osquery_utils.Integrations{
|
||||
ConditionalAccessMicrosoft: svc.hostRequiresConditionalAccessMicrosoftIngestion(ctx, host),
|
||||
},
|
||||
mdmTeamConfig,
|
||||
)
|
||||
query, ok := detailQueries[name]
|
||||
func (svc *Service) directIngestDetailQuery(ctx context.Context, host *fleet.Host, name string, rows []map[string]string, cfg *hostDetailQueryConfig) (ingested bool, err error) {
|
||||
query, ok := cfg.detailQueries[name]
|
||||
if !ok {
|
||||
return false, newOsqueryError("unknown detail query " + name)
|
||||
}
|
||||
|
|
@ -1885,44 +1907,14 @@ func ingestMembershipQuery(
|
|||
|
||||
// ingestDetailQuery takes the results of a detail query and modifies the
|
||||
// provided fleet.Host appropriately.
|
||||
func (svc *Service) ingestDetailQuery(ctx context.Context, host *fleet.Host, name string, rows []map[string]string) error {
|
||||
features, err := svc.HostFeatures(ctx, host)
|
||||
if err != nil {
|
||||
return newOsqueryError("ingest detail query: " + err.Error())
|
||||
}
|
||||
|
||||
appConfig, err := svc.ds.AppConfig(ctx)
|
||||
if err != nil {
|
||||
return newOsqueryError("ingest detail query: " + err.Error())
|
||||
}
|
||||
|
||||
var mdmTeamConfig *fleet.TeamMDM
|
||||
if appConfig != nil && appConfig.MDM.EnabledAndConfigured && host.TeamID != nil {
|
||||
mdmTeamConfig, err = svc.ds.TeamMDMConfig(ctx, *host.TeamID)
|
||||
if err != nil {
|
||||
return newOsqueryError("ingest detail query: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
detailQueries := osquery_utils.GetDetailQueries(
|
||||
ctx,
|
||||
svc.config,
|
||||
appConfig,
|
||||
features,
|
||||
osquery_utils.Integrations{
|
||||
ConditionalAccessMicrosoft: svc.hostRequiresConditionalAccessMicrosoftIngestion(ctx, host),
|
||||
},
|
||||
mdmTeamConfig,
|
||||
)
|
||||
|
||||
query, ok := detailQueries[name]
|
||||
func (svc *Service) ingestDetailQuery(ctx context.Context, host *fleet.Host, name string, rows []map[string]string, cfg *hostDetailQueryConfig) error {
|
||||
query, ok := cfg.detailQueries[name]
|
||||
if !ok {
|
||||
return newOsqueryError("unknown detail query " + name)
|
||||
}
|
||||
|
||||
if query.IngestFunc != nil {
|
||||
err = query.IngestFunc(ctx, svc.logger, host, rows)
|
||||
if err != nil {
|
||||
if err := query.IngestFunc(ctx, svc.logger, host, rows); err != nil {
|
||||
return newOsqueryError(fmt.Sprintf("ingesting query %s: %s", name, err.Error()))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue