From 4a867d53dce724b041a325a57269ab0c5e59db8d Mon Sep 17 00:00:00 2001 From: Roberto Dip Date: Tue, 21 Jun 2022 15:09:00 -0300 Subject: [PATCH] use a single context for background jobs and HTTP handlers (#6313) --- cmd/fleet/cron.go | 123 +++++++++++++--------------------- cmd/fleet/serve.go | 21 ++---- server/service/async/async.go | 2 + 3 files changed, 54 insertions(+), 92 deletions(-) diff --git a/cmd/fleet/cron.go b/cmd/fleet/cron.go index 92549d5d6d..1b92ee44fc 100644 --- a/cmd/fleet/cron.go +++ b/cmd/fleet/cron.go @@ -2,8 +2,6 @@ package main import ( "context" - "errors" - "fmt" "net/url" "os" "strconv" @@ -25,6 +23,12 @@ import ( "github.com/go-kit/kit/log/level" ) +func errHandler(ctx context.Context, logger kitlog.Logger, msg string, err error) { + level.Error(logger).Log("err", msg, "details", err) + sentry.CaptureException(err) + ctxerr.Handle(ctx, err) +} + func cronDB(ctx context.Context, ds fleet.Datastore, logger kitlog.Logger, identifier string, license *fleet.LicenseInfo, enrollHostLimiter fleet.EnrollHostLimiter) { logger = kitlog.With(logger, "cron", lockKeyLeader) @@ -50,61 +54,50 @@ func cronDB(ctx context.Context, ds fleet.Datastore, logger kitlog.Logger, ident _, err := ds.CleanupDistributedQueryCampaigns(ctx, time.Now()) if err != nil { - level.Error(logger).Log("err", "cleaning distributed query campaigns", "details", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "cleaning distributed query campaigns", err) } _, err = ds.CleanupIncomingHosts(ctx, time.Now()) if err != nil { - level.Error(logger).Log("err", "cleaning incoming hosts", "details", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "cleaning incoming hosts", err) } _, err = ds.CleanupCarves(ctx, time.Now()) if err != nil { - level.Error(logger).Log("err", "cleaning carves", "details", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "cleaning carves", err) } err = ds.UpdateQueryAggregatedStats(ctx) if err != nil { - level.Error(logger).Log("err", "aggregating query stats", "details", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "aggregating query stats", err) } err = ds.UpdateScheduledQueryAggregatedStats(ctx) if err != nil { - level.Error(logger).Log("err", "aggregating scheduled query stats", "details", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "aggregating scheduled query stats", err) } _, err = ds.CleanupExpiredHosts(ctx) if err != nil { - level.Error(logger).Log("err", "cleaning expired hosts", "details", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "cleaning expired hosts", err) } err = ds.GenerateAggregatedMunkiAndMDM(ctx) if err != nil { - level.Error(logger).Log("err", "aggregating munki and mdm data", "details", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "aggregating munki and mdm data", err) } err = ds.CleanupPolicyMembership(ctx, time.Now()) if err != nil { - level.Error(logger).Log("err", "cleanup policy membership", "details", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "cleanup policy membership", err) } err = ds.UpdateOSVersions(ctx) if err != nil { - level.Error(logger).Log("err", "update os versions", "details", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "update os versions", err) } err = enrollHostLimiter.SyncEnrolledHostIDs(ctx) if err != nil { - level.Error(logger).Log("err", "sync enrolled host ids", "details", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "sync enrolled host ids", err) } // NOTE(mna): this is not a route from the fleet server (not in server/service/handler.go) so it // will not automatically support the /latest/ versioning. Leaving it as /v1/ for that reason. err = trySendStatistics(ctx, ds, fleet.StatisticsFrequency, "https://fleetdm.com/api/v1/webhooks/receive-usage-analytics", license) if err != nil { - level.Error(logger).Log("err", "sending statistics", "details", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "sending statistics", err) } level.Debug(logger).Log("loop", "done") @@ -182,8 +175,7 @@ func cronVulnerabilities( } if config.Vulnerabilities.CurrentInstanceChecks == "auto" { if locked, err := ds.Lock(ctx, lockKeyVulnerabilities, identifier, 1*time.Hour); err != nil { - level.Error(logger).Log("msg", "Error acquiring lock", "err", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "error acquiring lock", err) continue } else if !locked { level.Debug(logger).Log("msg", "Not the leader. Skipping...") @@ -195,8 +187,7 @@ func cronVulnerabilities( // refresh app config to check if webhook or any jira integration is // enabled, as this can be changed dynamically. if freshAppConfig, err := ds.AppConfig(ctx); err != nil { - level.Error(logger).Log("config", "couldn't refresh app config", "err", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "couldn't refresh app config", err) // continue with stale app config } else { appConfig = freshAppConfig @@ -213,9 +204,8 @@ func cronVulnerabilities( for _, j := range appConfig.Integrations.Jira { if j.EnableSoftwareVulnerabilities { if vulnAutomationEnabled != "" { - err := errors.New("more than one automation enabled: jira check") - level.Error(logger).Log("err", err) - sentry.CaptureException(err) + err := ctxerr.New(ctx, "jira check") + errHandler(ctx, logger, "more than one automation enabled", err) } vulnAutomationEnabled = "jira" break @@ -225,9 +215,8 @@ func cronVulnerabilities( for _, z := range appConfig.Integrations.Zendesk { if z.EnableSoftwareVulnerabilities { if vulnAutomationEnabled != "" { - err := errors.New("more than one automation enabled: Zendesk check") - level.Error(logger).Log("err", err) - sentry.CaptureException(err) + err := ctxerr.New(ctx, "zendesk check") + errHandler(ctx, logger, "more than one automation enabled", err) } vulnAutomationEnabled = "zendesk" break @@ -252,8 +241,7 @@ func cronVulnerabilities( appConfig, time.Now()); err != nil { - level.Error(logger).Log("err", "triggering vulnerabilities webhook", "details", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "triggering vulnerabilities webhook", err) } case "jira": @@ -264,8 +252,7 @@ func cronVulnerabilities( kitlog.With(logger, "jira", "vulnerabilities"), recentVulns, ); err != nil { - level.Error(logger).Log("err", "queueing vulnerabilities to jira", "details", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "queueing vulnerabilities to jira", err) } case "zendesk": @@ -276,21 +263,18 @@ func cronVulnerabilities( kitlog.With(logger, "zendesk", "vulnerabilities"), recentVulns, ); err != nil { - level.Error(logger).Log("err", "queueing vulnerabilities to Zendesk", "details", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "queueing vulnerabilities to Zendesk", err) } default: - err = errors.New("no vuln automations enabled") - level.Error(logger).Log("err", "attempting to process vuln automations", err) - sentry.CaptureException(err) + err = ctxerr.New(ctx, "no vuln automations enabled") + errHandler(ctx, logger, "attempting to process vuln automations", err) } } } if err := ds.CalculateHostsPerSoftware(ctx, time.Now()); err != nil { - level.Error(logger).Log("msg", "calculating hosts count per software", "err", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "calculating hosts count per software", err) } // It's important vulnerabilities.PostProcess runs after ds.CalculateHostsPerSoftware @@ -298,8 +282,7 @@ func cronVulnerabilities( // or software being uninstalled on hosts). if !vulnDisabled { if err := vulnerabilities.PostProcess(ctx, ds, vulnPath, logger, config); err != nil { - level.Error(logger).Log("msg", "post processing CVEs", "err", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "post processing CVEs", err) } } @@ -321,8 +304,7 @@ func filterRecentVulns( recent, err := ds.ListCVEs(ctx, maxAge) if err != nil { - level.Error(logger).Log("msg", "could not fetch recent CVEs", "err", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "could not fetch recent CVEs", err) return nil } @@ -368,8 +350,7 @@ func checkOvalVulnerabilities( // Get Platforms versions, err := ds.OSVersions(ctx, nil, nil) if err != nil { - level.Error(logger).Log("msg", "updating oval definitions", "err", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "updating oval definitions", err) return nil } @@ -377,8 +358,7 @@ func checkOvalVulnerabilities( client := fleethttp.NewClient() downloaded, err := oval.Refresh(ctx, client, versions, vulnPath) if err != nil { - level.Error(logger).Log("msg", "updating oval definitions", "err", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "updating oval definitions", err) } for _, d := range downloaded { level.Debug(logger).Log("oval-sync-downloaded", d) @@ -396,8 +376,7 @@ func checkOvalVulnerabilities( "found new", len(r)) results = append(results, r...) if err != nil { - level.Error(logger).Log("msg", "analyzing oval definitions", "err", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "analyzing oval definitions", err) } } @@ -415,30 +394,25 @@ func checkNVDVulnerabilities( if !config.Vulnerabilities.DisableDataSync { err := vulnerabilities.Sync(vulnPath, config.Vulnerabilities.CPEDatabaseURL) if err != nil { - level.Error(logger).Log("msg", "syncing vulnerability database", "err", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "syncing vulnerability database", err) return nil } } if err := vulnerabilities.LoadCVEMeta(logger, vulnPath, ds); err != nil { - err = fmt.Errorf("load cve meta: %w", err) - level.Error(logger).Log("err", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "load cve meta", err) // don't return, continue on ... } err := vulnerabilities.TranslateSoftwareToCPE(ctx, ds, vulnPath, logger) if err != nil { - level.Error(logger).Log("msg", "analyzing vulnerable software: Software->CPE", "err", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "analyzing vulnerable software: Software->CPE", err) return nil } vulns, err := vulnerabilities.TranslateCPEToCVE(ctx, ds, vulnPath, logger, collectVulns) if err != nil { - level.Error(logger).Log("msg", "analyzing vulnerable software: CPE->CVE", "err", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "analyzing vulnerable software: CPE->CVE", err) return nil } @@ -487,8 +461,7 @@ func cronWebhooks( // and update the ticker for the next run. appConfig, err = ds.AppConfig(ctx) if err != nil { - level.Error(logger).Log("config", "couldn't read app config", "err", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "couldn't read app config", err) } else { ticker.Reset(appConfig.WebhookSettings.Interval.ValueOr(24 * time.Hour)) start = time.Now() @@ -523,8 +496,7 @@ func maybeTriggerHostStatus( if err := webhooks.TriggerHostStatusWebhook( ctx, ds, kitlog.With(logger, "webhook", "host_status"), appConfig, ); err != nil { - level.Error(logger).Log("err", "triggering host status webhook", "details", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "triggering host status webhook", err) } } @@ -549,8 +521,7 @@ func maybeTriggerFailingPoliciesAutomation( serverURL, err := url.Parse(appConfig.ServerSettings.ServerURL) if err != nil { - level.Error(logger).Log("err", "parsing appConfig.ServerSettings.ServerURL", "details", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "parsing appConfig.ServerSettings.ServerURL", err) return } @@ -588,8 +559,7 @@ func maybeTriggerFailingPoliciesAutomation( return nil }) if err != nil { - level.Error(logger).Log("err", "triggering failing policies automation", "details", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "triggering failing policies automation", err) } } @@ -631,8 +601,7 @@ func cronWorker( // is not a possible scenario. appConfig, err := ds.AppConfig(ctx) if err != nil { - level.Error(logger).Log("config", "couldn't read app config", "err", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "couldn't read app config", err) } // we clear it even if we fail to load the app config, not a likely scenario // in our test environments for the needs of forced failures. @@ -664,8 +633,7 @@ func cronWorker( // integration. appConfig, err := ds.AppConfig(ctx) if err != nil { - level.Error(logger).Log("config", "couldn't read app config", "err", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "couldn't read app config", err) continue } @@ -674,8 +642,7 @@ func cronWorker( workCtx, cancel := context.WithTimeout(ctx, lockDuration) if err := w.ProcessJobs(workCtx); err != nil { - level.Error(logger).Log("msg", "Error processing jobs", "err", err) - sentry.CaptureException(err) + errHandler(ctx, logger, "Error processing jobs", err) } cancel() // don't use defer inside loop } diff --git a/cmd/fleet/serve.go b/cmd/fleet/serve.go index e938a6a978..781a0eb823 100644 --- a/cmd/fleet/serve.go +++ b/cmd/fleet/serve.go @@ -6,7 +6,6 @@ import ( "crypto/subtle" "crypto/tls" "database/sql/driver" - "errors" "fmt" "io/ioutil" "math/rand" @@ -338,9 +337,10 @@ the way that the Fleet server works. } } - // TODO: gather all the different contexts and use just one ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() + eh := errorstore.NewHandler(ctx, redisPool, logger, config.Logging.ErrorRetentionPeriod) + ctx = ctxerr.NewContext(ctx, eh) svc, err := service.NewService(ctx, ds, task, resultStore, logger, osqueryLogger, config, mailService, clock.C, ssoSessionStore, liveQueryStore, carveStore, *license, failingPolicySet, geoIP, redisWrapperDS) if err != nil { initFatal(err, "initializing service") @@ -353,7 +353,7 @@ the way that the Fleet server works. } } - cancelBackground := runCrons(ds, task, kitlog.With(logger, "component", "crons"), config, license, failingPolicySet, redisWrapperDS) + runCrons(ctx, ds, task, kitlog.With(logger, "component", "crons"), config, license, failingPolicySet, redisWrapperDS) // Flush seen hosts every second hostsAsyncCfg := config.Osquery.AsyncConfigForTask(configpkg.AsyncTaskHostLastSeen) @@ -434,8 +434,6 @@ the way that the Fleet server works. // Instantiate a gRPC service to handle launcher requests. launcher := launcher.New(svc, logger, grpc.NewServer(), healthCheckers) - eh := errorstore.NewHandler(ctx, redisPool, logger, config.Logging.ErrorRetentionPeriod) - rootMux := http.NewServeMux() rootMux.Handle("/healthz", service.PrometheusMetricsHandler("healthz", health.Handler(httpLogger, healthCheckers))) rootMux.Handle("/version", service.PrometheusMetricsHandler("version", version.Handler())) @@ -513,8 +511,6 @@ the way that the Fleet server works. writeTimeout = liveQueryRestPeriod } - httpSrvCtx := ctxerr.NewContext(ctx, eh) - // Create the handler based on whether tracing should be there var handler http.Handler if config.Logging.TracingEnabled && config.Logging.TracingType == "elasticapm" { @@ -532,7 +528,7 @@ the way that the Fleet server works. IdleTimeout: 5 * time.Minute, MaxHeaderBytes: 1 << 18, // 0.25 MB (262144 bytes) BaseContext: func(l net.Listener) context.Context { - return httpSrvCtx + return ctx }, } srv.SetKeepAlivesEnabled(config.Server.Keepalive) @@ -557,7 +553,6 @@ the way that the Fleet server works. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() errs <- func() error { - cancelBackground() cancelFunc() launcher.GracefulStop() return srv.Shutdown(ctx) @@ -635,6 +630,7 @@ func trySendStatistics(ctx context.Context, ds fleet.Datastore, frequency time.D } func runCrons( + ctx context.Context, ds fleet.Datastore, task *async.Task, logger kitlog.Logger, @@ -642,12 +638,11 @@ func runCrons( license *fleet.LicenseInfo, failingPoliciesSet fleet.FailingPolicySet, enrollHostLimiter fleet.EnrollHostLimiter, -) context.CancelFunc { - ctx, cancelBackground := context.WithCancel(context.Background()) +) { ourIdentifier, err := server.GenerateRandomText(64) if err != nil { - initFatal(errors.New("Error generating random instance identifier"), "") + initFatal(ctxerr.New(ctx, "generating random instance identifier"), "") } // StartCollectors starts a goroutine per collector, using ctx to cancel. @@ -658,8 +653,6 @@ func runCrons( ctx, ds, kitlog.With(logger, "cron", "vulnerabilities"), ourIdentifier, config) go cronWebhooks(ctx, ds, kitlog.With(logger, "cron", "webhooks"), ourIdentifier, failingPoliciesSet, 1*time.Hour) go cronWorker(ctx, ds, kitlog.With(logger, "cron", "worker"), ourIdentifier) - - return cancelBackground } // Support for TLS security profiles, we set up the TLS configuation based on diff --git a/server/service/async/async.go b/server/service/async/async.go index aefeb52cb9..bfde3adb20 100644 --- a/server/service/async/async.go +++ b/server/service/async/async.go @@ -7,6 +7,7 @@ import ( "github.com/WatchBeam/clock" "github.com/fleetdm/fleet/v4/server/config" + "github.com/fleetdm/fleet/v4/server/contexts/ctxerr" "github.com/fleetdm/fleet/v4/server/datastore/redis" "github.com/fleetdm/fleet/v4/server/fleet" "github.com/getsentry/sentry-go" @@ -46,6 +47,7 @@ func (t *Task) StartCollectors(ctx context.Context, logger kitlog.Logger) { collectorErrHandler := func(name string, err error) { level.Error(logger).Log("err", fmt.Sprintf("%s collector", name), "details", err) sentry.CaptureException(err) + ctxerr.Handle(ctx, err) } handlers := map[config.AsyncTaskName]collectorHandlerFunc{