mirror of
https://github.com/fleetdm/fleet
synced 2026-05-24 09:28:54 +00:00
Refactor vulnerabilities cron to scheduler package (#7650)
This commit is contained in:
parent
de96086871
commit
6a3d9959fc
4 changed files with 74 additions and 87 deletions
|
|
@ -33,96 +33,84 @@ func errHandler(ctx context.Context, logger kitlog.Logger, msg string, err error
|
||||||
ctxerr.Handle(ctx, err)
|
ctxerr.Handle(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func startVulnerabilitiesSchedule(
|
||||||
|
ctx context.Context,
|
||||||
|
instanceID string,
|
||||||
|
ds fleet.Datastore,
|
||||||
|
logger kitlog.Logger,
|
||||||
|
config *config.VulnerabilitiesConfig,
|
||||||
|
license *fleet.LicenseInfo,
|
||||||
|
) *schedule.Schedule {
|
||||||
|
interval := config.Periodicity
|
||||||
|
vulnerabilitiesLogger := kitlog.With(logger, "cron", "vulnerabilities")
|
||||||
|
s := schedule.New(
|
||||||
|
ctx, "vulnerabilities", instanceID, interval, ds,
|
||||||
|
schedule.WithLogger(vulnerabilitiesLogger),
|
||||||
|
schedule.WithJob(
|
||||||
|
"cron_vulnerabilities",
|
||||||
|
func(ctx context.Context) error {
|
||||||
|
// TODO(lucas): Decouple cronVulnerabilities into multiple jobs.
|
||||||
|
return cronVulnerabilities(ctx, ds, vulnerabilitiesLogger, config, license)
|
||||||
|
},
|
||||||
|
),
|
||||||
|
schedule.WithJob(
|
||||||
|
"cron_sync_host_software",
|
||||||
|
func(ctx context.Context) error {
|
||||||
|
return ds.SyncHostsSoftware(ctx, time.Now())
|
||||||
|
},
|
||||||
|
),
|
||||||
|
)
|
||||||
|
s.Start()
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
func cronVulnerabilities(
|
func cronVulnerabilities(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
ds fleet.Datastore,
|
ds fleet.Datastore,
|
||||||
logger kitlog.Logger,
|
logger kitlog.Logger,
|
||||||
identifier string,
|
|
||||||
config *config.VulnerabilitiesConfig,
|
config *config.VulnerabilitiesConfig,
|
||||||
license *fleet.LicenseInfo,
|
license *fleet.LicenseInfo,
|
||||||
) {
|
) error {
|
||||||
logger = kitlog.With(logger, "cron", lockKeyVulnerabilities)
|
|
||||||
|
|
||||||
if config.CurrentInstanceChecks == "no" || config.CurrentInstanceChecks == "0" {
|
if config.CurrentInstanceChecks == "no" || config.CurrentInstanceChecks == "0" {
|
||||||
level.Info(logger).Log("msg", "host not configured to check for vulnerabilities")
|
level.Info(logger).Log("msg", "host not configured to check for vulnerabilities")
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// release the lock when this function exits
|
|
||||||
defer func() {
|
|
||||||
// use a different context that won't be cancelled when shutting down
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
err := ds.Unlock(ctx, lockKeyVulnerabilities, identifier)
|
|
||||||
if err != nil {
|
|
||||||
errHandler(ctx, logger, "error releasing lock", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
level.Info(logger).Log("periodicity", config.Periodicity)
|
level.Info(logger).Log("periodicity", config.Periodicity)
|
||||||
|
|
||||||
ticker := time.NewTicker(10 * time.Second)
|
appConfig, err := ds.AppConfig(ctx)
|
||||||
for {
|
if err != nil {
|
||||||
level.Debug(logger).Log("waiting", "on ticker")
|
return fmt.Errorf("reading app config: %w", err)
|
||||||
select {
|
|
||||||
case <-ticker.C:
|
|
||||||
level.Debug(logger).Log("waiting", "done")
|
|
||||||
ticker.Reset(config.Periodicity)
|
|
||||||
case <-ctx.Done():
|
|
||||||
level.Debug(logger).Log("exit", "done with cron.")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.CurrentInstanceChecks == "auto" {
|
|
||||||
if locked, err := ds.Lock(ctx, lockKeyVulnerabilities, identifier, 1*time.Hour); err != nil {
|
|
||||||
errHandler(ctx, logger, "error acquiring lock", err)
|
|
||||||
continue
|
|
||||||
} else if !locked {
|
|
||||||
level.Debug(logger).Log("msg", "Not the leader. Skipping...")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
appConfig, err := ds.AppConfig(ctx)
|
|
||||||
if err != nil {
|
|
||||||
errHandler(ctx, logger, "couldn't read app config", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if !appConfig.Features.EnableSoftwareInventory {
|
|
||||||
level.Info(logger).Log("msg", "software inventory not configured")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
var vulnPath string
|
|
||||||
switch {
|
|
||||||
case config.DatabasesPath != "" && appConfig.VulnerabilitySettings.DatabasesPath != "":
|
|
||||||
vulnPath = config.DatabasesPath
|
|
||||||
level.Info(logger).Log(
|
|
||||||
"msg", "fleet config takes precedence over app config when both are configured",
|
|
||||||
"databases_path", vulnPath,
|
|
||||||
)
|
|
||||||
case config.DatabasesPath != "":
|
|
||||||
vulnPath = config.DatabasesPath
|
|
||||||
case appConfig.VulnerabilitySettings.DatabasesPath != "":
|
|
||||||
vulnPath = appConfig.VulnerabilitySettings.DatabasesPath
|
|
||||||
default:
|
|
||||||
level.Info(logger).Log("msg", "vulnerability scanning not configured, vulnerabilities databases path is empty")
|
|
||||||
}
|
|
||||||
if vulnPath != "" {
|
|
||||||
level.Info(logger).Log("msg", "scanning vulnerabilities")
|
|
||||||
if err := scanVulnerabilities(ctx, ds, logger, config, appConfig, vulnPath, license); err != nil {
|
|
||||||
errHandler(ctx, logger, "scanning vulnerabilities", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := ds.SyncHostsSoftware(ctx, time.Now()); err != nil {
|
|
||||||
errHandler(ctx, logger, "calculating hosts count per software", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
level.Debug(logger).Log("loop", "done")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !appConfig.Features.EnableSoftwareInventory {
|
||||||
|
level.Info(logger).Log("msg", "software inventory not configured")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var vulnPath string
|
||||||
|
switch {
|
||||||
|
case config.DatabasesPath != "" && appConfig.VulnerabilitySettings.DatabasesPath != "":
|
||||||
|
vulnPath = config.DatabasesPath
|
||||||
|
level.Info(logger).Log(
|
||||||
|
"msg", "fleet config takes precedence over app config when both are configured",
|
||||||
|
"databases_path", vulnPath,
|
||||||
|
)
|
||||||
|
case config.DatabasesPath != "":
|
||||||
|
vulnPath = config.DatabasesPath
|
||||||
|
case appConfig.VulnerabilitySettings.DatabasesPath != "":
|
||||||
|
vulnPath = appConfig.VulnerabilitySettings.DatabasesPath
|
||||||
|
default:
|
||||||
|
level.Info(logger).Log("msg", "vulnerability scanning not configured, vulnerabilities databases path is empty")
|
||||||
|
}
|
||||||
|
if vulnPath != "" {
|
||||||
|
level.Info(logger).Log("msg", "scanning vulnerabilities")
|
||||||
|
if err := scanVulnerabilities(ctx, ds, logger, config, appConfig, vulnPath, license); err != nil {
|
||||||
|
return fmt.Errorf("scanning vulnerabilities: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func scanVulnerabilities(
|
func scanVulnerabilities(
|
||||||
|
|
|
||||||
|
|
@ -653,7 +653,6 @@ func basicAuthHandler(username, password string, next http.Handler) http.Handler
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
lockKeyVulnerabilities = "vulnerabilities"
|
|
||||||
lockKeyWebhooksHostStatus = "webhooks" // keeping this name for backwards compatibility.
|
lockKeyWebhooksHostStatus = "webhooks" // keeping this name for backwards compatibility.
|
||||||
lockKeyWebhooksFailingPolicies = "webhooks:global_failing_policies"
|
lockKeyWebhooksFailingPolicies = "webhooks:global_failing_policies"
|
||||||
lockKeyWorker = "worker"
|
lockKeyWorker = "worker"
|
||||||
|
|
@ -673,9 +672,6 @@ func runCrons(
|
||||||
// StartCollectors starts a goroutine per collector, using ctx to cancel.
|
// StartCollectors starts a goroutine per collector, using ctx to cancel.
|
||||||
task.StartCollectors(ctx, kitlog.With(logger, "cron", "async_task"))
|
task.StartCollectors(ctx, kitlog.With(logger, "cron", "async_task"))
|
||||||
|
|
||||||
go cronVulnerabilities(
|
|
||||||
ctx, ds, kitlog.With(logger, "cron", "vulnerabilities"), ourIdentifier, &config.Vulnerabilities, license)
|
|
||||||
|
|
||||||
go cronWebhooks(ctx, ds, kitlog.With(logger, "cron", "webhooks"), ourIdentifier, failingPoliciesSet, 1*time.Hour)
|
go cronWebhooks(ctx, ds, kitlog.With(logger, "cron", "webhooks"), ourIdentifier, failingPoliciesSet, 1*time.Hour)
|
||||||
go cronWorker(ctx, ds, kitlog.With(logger, "cron", "worker"), ourIdentifier)
|
go cronWorker(ctx, ds, kitlog.With(logger, "cron", "worker"), ourIdentifier)
|
||||||
}
|
}
|
||||||
|
|
@ -691,6 +687,7 @@ func startSchedules(
|
||||||
) error {
|
) error {
|
||||||
startCleanupsAndAggregationSchedule(ctx, instanceID, ds, logger, enrollHostLimiter)
|
startCleanupsAndAggregationSchedule(ctx, instanceID, ds, logger, enrollHostLimiter)
|
||||||
startSendStatsSchedule(ctx, instanceID, ds, config, license, logger)
|
startSendStatsSchedule(ctx, instanceID, ds, config, license, logger)
|
||||||
|
startVulnerabilitiesSchedule(ctx, instanceID, ds, logger, &config.Vulnerabilities, license)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -228,8 +228,8 @@ func TestCronVulnerabilitiesCreatesDatabasesPath(t *testing.T) {
|
||||||
Periodicity: 10 * time.Second,
|
Periodicity: 10 * time.Second,
|
||||||
CurrentInstanceChecks: "auto",
|
CurrentInstanceChecks: "auto",
|
||||||
}
|
}
|
||||||
|
// Use schedule to test that the schedule does indeed call cronVulnerabilities.
|
||||||
go cronVulnerabilities(ctx, ds, kitlog.NewNopLogger(), "AAA", &config, &fleet.LicenseInfo{Tier: "premium"})
|
startVulnerabilitiesSchedule(ctx, "test_instance", ds, kitlog.NewNopLogger(), &config, &fleet.LicenseInfo{Tier: "premium"})
|
||||||
|
|
||||||
require.Eventually(t, func() bool {
|
require.Eventually(t, func() bool {
|
||||||
info, err := os.Lstat(vulnPath)
|
info, err := os.Lstat(vulnPath)
|
||||||
|
|
@ -271,9 +271,6 @@ func TestScanVulnerabilitiesMkdirFailsIfVulnPathIsFile(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCronVulnerabilitiesSkipMkdirIfDisabled(t *testing.T) {
|
func TestCronVulnerabilitiesSkipMkdirIfDisabled(t *testing.T) {
|
||||||
logger := kitlog.NewNopLogger()
|
|
||||||
logger = level.NewFilter(logger, level.AllowDebug())
|
|
||||||
|
|
||||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||||
defer cancelFunc()
|
defer cancelFunc()
|
||||||
|
|
||||||
|
|
@ -301,7 +298,8 @@ func TestCronVulnerabilitiesSkipMkdirIfDisabled(t *testing.T) {
|
||||||
CurrentInstanceChecks: "1",
|
CurrentInstanceChecks: "1",
|
||||||
}
|
}
|
||||||
|
|
||||||
go cronVulnerabilities(ctx, ds, logger, "AAA", &config, &fleet.LicenseInfo{Tier: "premium"})
|
// Use schedule to test that the schedule does indeed call cronVulnerabilities.
|
||||||
|
startVulnerabilitiesSchedule(ctx, "test_instance", ds, kitlog.NewNopLogger(), &config, &fleet.LicenseInfo{Tier: "premium"})
|
||||||
|
|
||||||
// Every cron tick is 10 seconds ... here we just wait for a loop interation and assert the vuln
|
// Every cron tick is 10 seconds ... here we just wait for a loop interation and assert the vuln
|
||||||
// dir. was not created.
|
// dir. was not created.
|
||||||
|
|
|
||||||
|
|
@ -231,6 +231,10 @@ func (s *Schedule) Start() {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if newInterval <= 0 {
|
||||||
|
level.Debug(s.logger).Log("msg", "config reload interval method returned invalid interval")
|
||||||
|
continue
|
||||||
|
}
|
||||||
if schedInterval == newInterval {
|
if schedInterval == newInterval {
|
||||||
level.Debug(s.logger).Log("msg", "schedule interval unchanged")
|
level.Debug(s.logger).Log("msg", "schedule interval unchanged")
|
||||||
continue
|
continue
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue