use a single context for background jobs and HTTP handlers (#6313)

This commit is contained in:
Roberto Dip 2022-06-21 15:09:00 -03:00 committed by GitHub
parent eb7dbec88f
commit 4a867d53dc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 54 additions and 92 deletions

View file

@ -2,8 +2,6 @@ package main
import (
"context"
"errors"
"fmt"
"net/url"
"os"
"strconv"
@ -25,6 +23,12 @@ import (
"github.com/go-kit/kit/log/level"
)
// errHandler is the shared error sink for the cron jobs in this file. It
// reports err through three channels, in order:
//
//   - the error-level logger, with msg under the "err" key and err under
//     the "details" key;
//   - Sentry, via sentry.CaptureException;
//   - ctxerr.Handle, using ctx (presumably this reaches whatever error
//     handler was attached to ctx with ctxerr.NewContext — confirm against
//     the ctxerr package).
//
// It returns nothing; callers decide on their own whether to continue or
// bail out after reporting.
func errHandler(ctx context.Context, logger kitlog.Logger, msg string, err error) {
	errLogger := level.Error(logger)
	errLogger.Log("err", msg, "details", err)

	sentry.CaptureException(err)
	ctxerr.Handle(ctx, err)
}
func cronDB(ctx context.Context, ds fleet.Datastore, logger kitlog.Logger, identifier string, license *fleet.LicenseInfo, enrollHostLimiter fleet.EnrollHostLimiter) {
logger = kitlog.With(logger, "cron", lockKeyLeader)
@ -50,61 +54,50 @@ func cronDB(ctx context.Context, ds fleet.Datastore, logger kitlog.Logger, ident
_, err := ds.CleanupDistributedQueryCampaigns(ctx, time.Now())
if err != nil {
level.Error(logger).Log("err", "cleaning distributed query campaigns", "details", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "cleaning distributed query campaigns", err)
}
_, err = ds.CleanupIncomingHosts(ctx, time.Now())
if err != nil {
level.Error(logger).Log("err", "cleaning incoming hosts", "details", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "cleaning incoming hosts", err)
}
_, err = ds.CleanupCarves(ctx, time.Now())
if err != nil {
level.Error(logger).Log("err", "cleaning carves", "details", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "cleaning carves", err)
}
err = ds.UpdateQueryAggregatedStats(ctx)
if err != nil {
level.Error(logger).Log("err", "aggregating query stats", "details", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "aggregating query stats", err)
}
err = ds.UpdateScheduledQueryAggregatedStats(ctx)
if err != nil {
level.Error(logger).Log("err", "aggregating scheduled query stats", "details", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "aggregating scheduled query stats", err)
}
_, err = ds.CleanupExpiredHosts(ctx)
if err != nil {
level.Error(logger).Log("err", "cleaning expired hosts", "details", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "cleaning expired hosts", err)
}
err = ds.GenerateAggregatedMunkiAndMDM(ctx)
if err != nil {
level.Error(logger).Log("err", "aggregating munki and mdm data", "details", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "aggregating munki and mdm data", err)
}
err = ds.CleanupPolicyMembership(ctx, time.Now())
if err != nil {
level.Error(logger).Log("err", "cleanup policy membership", "details", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "cleanup policy membership", err)
}
err = ds.UpdateOSVersions(ctx)
if err != nil {
level.Error(logger).Log("err", "update os versions", "details", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "update os versions", err)
}
err = enrollHostLimiter.SyncEnrolledHostIDs(ctx)
if err != nil {
level.Error(logger).Log("err", "sync enrolled host ids", "details", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "sync enrolled host ids", err)
}
// NOTE(mna): this is not a route from the fleet server (not in server/service/handler.go) so it
// will not automatically support the /latest/ versioning. Leaving it as /v1/ for that reason.
err = trySendStatistics(ctx, ds, fleet.StatisticsFrequency, "https://fleetdm.com/api/v1/webhooks/receive-usage-analytics", license)
if err != nil {
level.Error(logger).Log("err", "sending statistics", "details", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "sending statistics", err)
}
level.Debug(logger).Log("loop", "done")
@ -182,8 +175,7 @@ func cronVulnerabilities(
}
if config.Vulnerabilities.CurrentInstanceChecks == "auto" {
if locked, err := ds.Lock(ctx, lockKeyVulnerabilities, identifier, 1*time.Hour); err != nil {
level.Error(logger).Log("msg", "Error acquiring lock", "err", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "error acquiring lock", err)
continue
} else if !locked {
level.Debug(logger).Log("msg", "Not the leader. Skipping...")
@ -195,8 +187,7 @@ func cronVulnerabilities(
// refresh app config to check if webhook or any jira integration is
// enabled, as this can be changed dynamically.
if freshAppConfig, err := ds.AppConfig(ctx); err != nil {
level.Error(logger).Log("config", "couldn't refresh app config", "err", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "couldn't refresh app config", err)
// continue with stale app config
} else {
appConfig = freshAppConfig
@ -213,9 +204,8 @@ func cronVulnerabilities(
for _, j := range appConfig.Integrations.Jira {
if j.EnableSoftwareVulnerabilities {
if vulnAutomationEnabled != "" {
err := errors.New("more than one automation enabled: jira check")
level.Error(logger).Log("err", err)
sentry.CaptureException(err)
err := ctxerr.New(ctx, "jira check")
errHandler(ctx, logger, "more than one automation enabled", err)
}
vulnAutomationEnabled = "jira"
break
@ -225,9 +215,8 @@ func cronVulnerabilities(
for _, z := range appConfig.Integrations.Zendesk {
if z.EnableSoftwareVulnerabilities {
if vulnAutomationEnabled != "" {
err := errors.New("more than one automation enabled: Zendesk check")
level.Error(logger).Log("err", err)
sentry.CaptureException(err)
err := ctxerr.New(ctx, "zendesk check")
errHandler(ctx, logger, "more than one automation enabled", err)
}
vulnAutomationEnabled = "zendesk"
break
@ -252,8 +241,7 @@ func cronVulnerabilities(
appConfig,
time.Now()); err != nil {
level.Error(logger).Log("err", "triggering vulnerabilities webhook", "details", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "triggering vulnerabilities webhook", err)
}
case "jira":
@ -264,8 +252,7 @@ func cronVulnerabilities(
kitlog.With(logger, "jira", "vulnerabilities"),
recentVulns,
); err != nil {
level.Error(logger).Log("err", "queueing vulnerabilities to jira", "details", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "queueing vulnerabilities to jira", err)
}
case "zendesk":
@ -276,21 +263,18 @@ func cronVulnerabilities(
kitlog.With(logger, "zendesk", "vulnerabilities"),
recentVulns,
); err != nil {
level.Error(logger).Log("err", "queueing vulnerabilities to Zendesk", "details", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "queueing vulnerabilities to Zendesk", err)
}
default:
err = errors.New("no vuln automations enabled")
level.Error(logger).Log("err", "attempting to process vuln automations", err)
sentry.CaptureException(err)
err = ctxerr.New(ctx, "no vuln automations enabled")
errHandler(ctx, logger, "attempting to process vuln automations", err)
}
}
}
if err := ds.CalculateHostsPerSoftware(ctx, time.Now()); err != nil {
level.Error(logger).Log("msg", "calculating hosts count per software", "err", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "calculating hosts count per software", err)
}
// It's important vulnerabilities.PostProcess runs after ds.CalculateHostsPerSoftware
@ -298,8 +282,7 @@ func cronVulnerabilities(
// or software being uninstalled on hosts).
if !vulnDisabled {
if err := vulnerabilities.PostProcess(ctx, ds, vulnPath, logger, config); err != nil {
level.Error(logger).Log("msg", "post processing CVEs", "err", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "post processing CVEs", err)
}
}
@ -321,8 +304,7 @@ func filterRecentVulns(
recent, err := ds.ListCVEs(ctx, maxAge)
if err != nil {
level.Error(logger).Log("msg", "could not fetch recent CVEs", "err", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "could not fetch recent CVEs", err)
return nil
}
@ -368,8 +350,7 @@ func checkOvalVulnerabilities(
// Get Platforms
versions, err := ds.OSVersions(ctx, nil, nil)
if err != nil {
level.Error(logger).Log("msg", "updating oval definitions", "err", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "updating oval definitions", err)
return nil
}
@ -377,8 +358,7 @@ func checkOvalVulnerabilities(
client := fleethttp.NewClient()
downloaded, err := oval.Refresh(ctx, client, versions, vulnPath)
if err != nil {
level.Error(logger).Log("msg", "updating oval definitions", "err", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "updating oval definitions", err)
}
for _, d := range downloaded {
level.Debug(logger).Log("oval-sync-downloaded", d)
@ -396,8 +376,7 @@ func checkOvalVulnerabilities(
"found new", len(r))
results = append(results, r...)
if err != nil {
level.Error(logger).Log("msg", "analyzing oval definitions", "err", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "analyzing oval definitions", err)
}
}
@ -415,30 +394,25 @@ func checkNVDVulnerabilities(
if !config.Vulnerabilities.DisableDataSync {
err := vulnerabilities.Sync(vulnPath, config.Vulnerabilities.CPEDatabaseURL)
if err != nil {
level.Error(logger).Log("msg", "syncing vulnerability database", "err", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "syncing vulnerability database", err)
return nil
}
}
if err := vulnerabilities.LoadCVEMeta(logger, vulnPath, ds); err != nil {
err = fmt.Errorf("load cve meta: %w", err)
level.Error(logger).Log("err", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "load cve meta", err)
// don't return, continue on ...
}
err := vulnerabilities.TranslateSoftwareToCPE(ctx, ds, vulnPath, logger)
if err != nil {
level.Error(logger).Log("msg", "analyzing vulnerable software: Software->CPE", "err", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "analyzing vulnerable software: Software->CPE", err)
return nil
}
vulns, err := vulnerabilities.TranslateCPEToCVE(ctx, ds, vulnPath, logger, collectVulns)
if err != nil {
level.Error(logger).Log("msg", "analyzing vulnerable software: CPE->CVE", "err", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "analyzing vulnerable software: CPE->CVE", err)
return nil
}
@ -487,8 +461,7 @@ func cronWebhooks(
// and update the ticker for the next run.
appConfig, err = ds.AppConfig(ctx)
if err != nil {
level.Error(logger).Log("config", "couldn't read app config", "err", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "couldn't read app config", err)
} else {
ticker.Reset(appConfig.WebhookSettings.Interval.ValueOr(24 * time.Hour))
start = time.Now()
@ -523,8 +496,7 @@ func maybeTriggerHostStatus(
if err := webhooks.TriggerHostStatusWebhook(
ctx, ds, kitlog.With(logger, "webhook", "host_status"), appConfig,
); err != nil {
level.Error(logger).Log("err", "triggering host status webhook", "details", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "triggering host status webhook", err)
}
}
@ -549,8 +521,7 @@ func maybeTriggerFailingPoliciesAutomation(
serverURL, err := url.Parse(appConfig.ServerSettings.ServerURL)
if err != nil {
level.Error(logger).Log("err", "parsing appConfig.ServerSettings.ServerURL", "details", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "parsing appConfig.ServerSettings.ServerURL", err)
return
}
@ -588,8 +559,7 @@ func maybeTriggerFailingPoliciesAutomation(
return nil
})
if err != nil {
level.Error(logger).Log("err", "triggering failing policies automation", "details", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "triggering failing policies automation", err)
}
}
@ -631,8 +601,7 @@ func cronWorker(
// is not a possible scenario.
appConfig, err := ds.AppConfig(ctx)
if err != nil {
level.Error(logger).Log("config", "couldn't read app config", "err", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "couldn't read app config", err)
}
// we clear it even if we fail to load the app config, not a likely scenario
// in our test environments for the needs of forced failures.
@ -664,8 +633,7 @@ func cronWorker(
// integration.
appConfig, err := ds.AppConfig(ctx)
if err != nil {
level.Error(logger).Log("config", "couldn't read app config", "err", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "couldn't read app config", err)
continue
}
@ -674,8 +642,7 @@ func cronWorker(
workCtx, cancel := context.WithTimeout(ctx, lockDuration)
if err := w.ProcessJobs(workCtx); err != nil {
level.Error(logger).Log("msg", "Error processing jobs", "err", err)
sentry.CaptureException(err)
errHandler(ctx, logger, "Error processing jobs", err)
}
cancel() // don't use defer inside loop
}

View file

@ -6,7 +6,6 @@ import (
"crypto/subtle"
"crypto/tls"
"database/sql/driver"
"errors"
"fmt"
"io/ioutil"
"math/rand"
@ -338,9 +337,10 @@ the way that the Fleet server works.
}
}
// TODO: gather all the different contexts and use just one
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
eh := errorstore.NewHandler(ctx, redisPool, logger, config.Logging.ErrorRetentionPeriod)
ctx = ctxerr.NewContext(ctx, eh)
svc, err := service.NewService(ctx, ds, task, resultStore, logger, osqueryLogger, config, mailService, clock.C, ssoSessionStore, liveQueryStore, carveStore, *license, failingPolicySet, geoIP, redisWrapperDS)
if err != nil {
initFatal(err, "initializing service")
@ -353,7 +353,7 @@ the way that the Fleet server works.
}
}
cancelBackground := runCrons(ds, task, kitlog.With(logger, "component", "crons"), config, license, failingPolicySet, redisWrapperDS)
runCrons(ctx, ds, task, kitlog.With(logger, "component", "crons"), config, license, failingPolicySet, redisWrapperDS)
// Flush seen hosts every second
hostsAsyncCfg := config.Osquery.AsyncConfigForTask(configpkg.AsyncTaskHostLastSeen)
@ -434,8 +434,6 @@ the way that the Fleet server works.
// Instantiate a gRPC service to handle launcher requests.
launcher := launcher.New(svc, logger, grpc.NewServer(), healthCheckers)
eh := errorstore.NewHandler(ctx, redisPool, logger, config.Logging.ErrorRetentionPeriod)
rootMux := http.NewServeMux()
rootMux.Handle("/healthz", service.PrometheusMetricsHandler("healthz", health.Handler(httpLogger, healthCheckers)))
rootMux.Handle("/version", service.PrometheusMetricsHandler("version", version.Handler()))
@ -513,8 +511,6 @@ the way that the Fleet server works.
writeTimeout = liveQueryRestPeriod
}
httpSrvCtx := ctxerr.NewContext(ctx, eh)
// Create the handler based on whether tracing should be there
var handler http.Handler
if config.Logging.TracingEnabled && config.Logging.TracingType == "elasticapm" {
@ -532,7 +528,7 @@ the way that the Fleet server works.
IdleTimeout: 5 * time.Minute,
MaxHeaderBytes: 1 << 18, // 0.25 MB (262144 bytes)
BaseContext: func(l net.Listener) context.Context {
return httpSrvCtx
return ctx
},
}
srv.SetKeepAlivesEnabled(config.Server.Keepalive)
@ -557,7 +553,6 @@ the way that the Fleet server works.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
errs <- func() error {
cancelBackground()
cancelFunc()
launcher.GracefulStop()
return srv.Shutdown(ctx)
@ -635,6 +630,7 @@ func trySendStatistics(ctx context.Context, ds fleet.Datastore, frequency time.D
}
func runCrons(
ctx context.Context,
ds fleet.Datastore,
task *async.Task,
logger kitlog.Logger,
@ -642,12 +638,11 @@ func runCrons(
license *fleet.LicenseInfo,
failingPoliciesSet fleet.FailingPolicySet,
enrollHostLimiter fleet.EnrollHostLimiter,
) context.CancelFunc {
ctx, cancelBackground := context.WithCancel(context.Background())
) {
ourIdentifier, err := server.GenerateRandomText(64)
if err != nil {
initFatal(errors.New("Error generating random instance identifier"), "")
initFatal(ctxerr.New(ctx, "generating random instance identifier"), "")
}
// StartCollectors starts a goroutine per collector, using ctx to cancel.
@ -658,8 +653,6 @@ func runCrons(
ctx, ds, kitlog.With(logger, "cron", "vulnerabilities"), ourIdentifier, config)
go cronWebhooks(ctx, ds, kitlog.With(logger, "cron", "webhooks"), ourIdentifier, failingPoliciesSet, 1*time.Hour)
go cronWorker(ctx, ds, kitlog.With(logger, "cron", "worker"), ourIdentifier)
return cancelBackground
}
// Support for TLS security profiles, we set up the TLS configuration based on

View file

@ -7,6 +7,7 @@ import (
"github.com/WatchBeam/clock"
"github.com/fleetdm/fleet/v4/server/config"
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
"github.com/fleetdm/fleet/v4/server/datastore/redis"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/getsentry/sentry-go"
@ -46,6 +47,7 @@ func (t *Task) StartCollectors(ctx context.Context, logger kitlog.Logger) {
collectorErrHandler := func(name string, err error) {
level.Error(logger).Log("err", fmt.Sprintf("%s collector", name), "details", err)
sentry.CaptureException(err)
ctxerr.Handle(ctx, err)
}
handlers := map[config.AsyncTaskName]collectorHandlerFunc{