From 6a3d9959fcc5b78bbd6500a1e317cdd2f6b87906 Mon Sep 17 00:00:00 2001 From: gillespi314 <73313222+gillespi314@users.noreply.github.com> Date: Fri, 16 Sep 2022 10:08:51 -0500 Subject: [PATCH] Refactor vulnerabilities cron to scheduler package (#7650) --- cmd/fleet/cron.go | 142 +++++++++++++--------------- cmd/fleet/serve.go | 5 +- cmd/fleet/serve_test.go | 10 +- server/service/schedule/schedule.go | 4 + 4 files changed, 74 insertions(+), 87 deletions(-) diff --git a/cmd/fleet/cron.go b/cmd/fleet/cron.go index 7cc21021b9..2c46347f9b 100644 --- a/cmd/fleet/cron.go +++ b/cmd/fleet/cron.go @@ -33,96 +33,84 @@ func errHandler(ctx context.Context, logger kitlog.Logger, msg string, err error ctxerr.Handle(ctx, err) } +func startVulnerabilitiesSchedule( + ctx context.Context, + instanceID string, + ds fleet.Datastore, + logger kitlog.Logger, + config *config.VulnerabilitiesConfig, + license *fleet.LicenseInfo, +) *schedule.Schedule { + interval := config.Periodicity + vulnerabilitiesLogger := kitlog.With(logger, "cron", "vulnerabilities") + s := schedule.New( + ctx, "vulnerabilities", instanceID, interval, ds, + schedule.WithLogger(vulnerabilitiesLogger), + schedule.WithJob( + "cron_vulnerabilities", + func(ctx context.Context) error { + // TODO(lucas): Decouple cronVulnerabilities into multiple jobs. + return cronVulnerabilities(ctx, ds, vulnerabilitiesLogger, config, license) + }, + ), + schedule.WithJob( + "cron_sync_host_software", + func(ctx context.Context) error { + return ds.SyncHostsSoftware(ctx, time.Now()) + }, + ), + ) + s.Start() + return s +} + func cronVulnerabilities( ctx context.Context, ds fleet.Datastore, logger kitlog.Logger, - identifier string, config *config.VulnerabilitiesConfig, license *fleet.LicenseInfo, -) { - logger = kitlog.With(logger, "cron", lockKeyVulnerabilities) - +) error { if config.CurrentInstanceChecks == "no" || config.CurrentInstanceChecks == "0" { level.Info(logger).Log("msg", "host not configured to check for vulnerabilities") - return + return nil } - // release the lock when this function exits - defer func() { - // use a different context that won't be cancelled when shutting down - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - err := ds.Unlock(ctx, lockKeyVulnerabilities, identifier) - if err != nil { - errHandler(ctx, logger, "error releasing lock", err) - } - }() - level.Info(logger).Log("periodicity", config.Periodicity) - ticker := time.NewTicker(10 * time.Second) - for { - level.Debug(logger).Log("waiting", "on ticker") - select { - case <-ticker.C: - level.Debug(logger).Log("waiting", "done") - ticker.Reset(config.Periodicity) - case <-ctx.Done(): - level.Debug(logger).Log("exit", "done with cron.") - return - } - - if config.CurrentInstanceChecks == "auto" { - if locked, err := ds.Lock(ctx, lockKeyVulnerabilities, identifier, 1*time.Hour); err != nil { - errHandler(ctx, logger, "error acquiring lock", err) - continue - } else if !locked { - level.Debug(logger).Log("msg", "Not the leader. Skipping...") - continue - } - } - - appConfig, err := ds.AppConfig(ctx) - if err != nil { - errHandler(ctx, logger, "couldn't read app config", err) - continue - } - - if !appConfig.Features.EnableSoftwareInventory { - level.Info(logger).Log("msg", "software inventory not configured") - continue - } - - var vulnPath string - switch { - case config.DatabasesPath != "" && appConfig.VulnerabilitySettings.DatabasesPath != "": - vulnPath = config.DatabasesPath - level.Info(logger).Log( - "msg", "fleet config takes precedence over app config when both are configured", - "databases_path", vulnPath, - ) - case config.DatabasesPath != "": - vulnPath = config.DatabasesPath - case appConfig.VulnerabilitySettings.DatabasesPath != "": - vulnPath = appConfig.VulnerabilitySettings.DatabasesPath - default: - level.Info(logger).Log("msg", "vulnerability scanning not configured, vulnerabilities databases path is empty") - } - if vulnPath != "" { - level.Info(logger).Log("msg", "scanning vulnerabilities") - if err := scanVulnerabilities(ctx, ds, logger, config, appConfig, vulnPath, license); err != nil { - errHandler(ctx, logger, "scanning vulnerabilities", err) - } - } - - if err := ds.SyncHostsSoftware(ctx, time.Now()); err != nil { - errHandler(ctx, logger, "calculating hosts count per software", err) - } - - level.Debug(logger).Log("loop", "done") + appConfig, err := ds.AppConfig(ctx) + if err != nil { + return fmt.Errorf("reading app config: %w", err) } + + if !appConfig.Features.EnableSoftwareInventory { + level.Info(logger).Log("msg", "software inventory not configured") + return nil + } + + var vulnPath string + switch { + case config.DatabasesPath != "" && appConfig.VulnerabilitySettings.DatabasesPath != "": + vulnPath = config.DatabasesPath + level.Info(logger).Log( + "msg", "fleet config takes precedence over app config when both are configured", + "databases_path", vulnPath, + ) + case config.DatabasesPath != "": + vulnPath = config.DatabasesPath + case appConfig.VulnerabilitySettings.DatabasesPath != "": + vulnPath = appConfig.VulnerabilitySettings.DatabasesPath + default: + level.Info(logger).Log("msg", "vulnerability scanning not configured, vulnerabilities databases path is empty") + } + if vulnPath != "" { + level.Info(logger).Log("msg", "scanning vulnerabilities") + if err := scanVulnerabilities(ctx, ds, logger, config, appConfig, vulnPath, license); err != nil { + return fmt.Errorf("scanning vulnerabilities: %w", err) + } + } + + return nil } func scanVulnerabilities( diff --git a/cmd/fleet/serve.go b/cmd/fleet/serve.go index eadde0d5d2..c36876aa2a 100644 --- a/cmd/fleet/serve.go +++ b/cmd/fleet/serve.go @@ -653,7 +653,6 @@ func basicAuthHandler(username, password string, next http.Handler) http.Handler } const ( - lockKeyVulnerabilities = "vulnerabilities" lockKeyWebhooksHostStatus = "webhooks" // keeping this name for backwards compatibility. lockKeyWebhooksFailingPolicies = "webhooks:global_failing_policies" lockKeyWorker = "worker" @@ -673,9 +672,6 @@ func runCrons( // StartCollectors starts a goroutine per collector, using ctx to cancel. task.StartCollectors(ctx, kitlog.With(logger, "cron", "async_task")) - go cronVulnerabilities( - ctx, ds, kitlog.With(logger, "cron", "vulnerabilities"), ourIdentifier, &config.Vulnerabilities, license) - go cronWebhooks(ctx, ds, kitlog.With(logger, "cron", "webhooks"), ourIdentifier, failingPoliciesSet, 1*time.Hour) go cronWorker(ctx, ds, kitlog.With(logger, "cron", "worker"), ourIdentifier) } @@ -691,6 +687,7 @@ func startSchedules( ) error { startCleanupsAndAggregationSchedule(ctx, instanceID, ds, logger, enrollHostLimiter) startSendStatsSchedule(ctx, instanceID, ds, config, license, logger) + startVulnerabilitiesSchedule(ctx, instanceID, ds, logger, &config.Vulnerabilities, license) return nil } diff --git a/cmd/fleet/serve_test.go b/cmd/fleet/serve_test.go index 91935f2103..a9515d4f0d 100644 --- a/cmd/fleet/serve_test.go +++ b/cmd/fleet/serve_test.go @@ -228,8 +228,8 @@ func TestCronVulnerabilitiesCreatesDatabasesPath(t *testing.T) { Periodicity: 10 * time.Second, CurrentInstanceChecks: "auto", } - - go cronVulnerabilities(ctx, ds, kitlog.NewNopLogger(), "AAA", &config, &fleet.LicenseInfo{Tier: "premium"}) + // Use schedule to test that the schedule does indeed call cronVulnerabilities. + startVulnerabilitiesSchedule(ctx, "test_instance", ds, kitlog.NewNopLogger(), &config, &fleet.LicenseInfo{Tier: "premium"}) require.Eventually(t, func() bool { info, err := os.Lstat(vulnPath) @@ -271,9 +271,6 @@ func TestScanVulnerabilitiesMkdirFailsIfVulnPathIsFile(t *testing.T) { } func TestCronVulnerabilitiesSkipMkdirIfDisabled(t *testing.T) { - logger := kitlog.NewNopLogger() - logger = level.NewFilter(logger, level.AllowDebug()) - ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() @@ -301,7 +298,8 @@ func TestCronVulnerabilitiesSkipMkdirIfDisabled(t *testing.T) { CurrentInstanceChecks: "1", } - go cronVulnerabilities(ctx, ds, logger, "AAA", &config, &fleet.LicenseInfo{Tier: "premium"}) + // Use schedule to test that the schedule does indeed call cronVulnerabilities. + startVulnerabilitiesSchedule(ctx, "test_instance", ds, kitlog.NewNopLogger(), &config, &fleet.LicenseInfo{Tier: "premium"}) // Every cron tick is 10 seconds ... here we just wait for a loop interation and assert the vuln // dir. was not created. diff --git a/server/service/schedule/schedule.go b/server/service/schedule/schedule.go index 91e98b9b32..ad09fb5fc4 100644 --- a/server/service/schedule/schedule.go +++ b/server/service/schedule/schedule.go @@ -231,6 +231,10 @@ func (s *Schedule) Start() { sentry.CaptureException(err) continue } + if newInterval <= 0 { + level.Debug(s.logger).Log("msg", "config reload interval method returned invalid interval") + continue + } if schedInterval == newInterval { level.Debug(s.logger).Log("msg", "schedule interval unchanged") continue