diff --git a/cmd/fleet/cron.go b/cmd/fleet/cron.go index eb095141c9..a2679062eb 100644 --- a/cmd/fleet/cron.go +++ b/cmd/fleet/cron.go @@ -365,22 +365,24 @@ func startAutomationsSchedule( intervalReload time.Duration, failingPoliciesSet fleet.FailingPolicySet, ) (*schedule.Schedule, error) { - const defaultAutomationsInterval = 24 * time.Hour - + const ( + name = "automations" + defaultInterval = 24 * time.Hour + ) appConfig, err := ds.AppConfig(ctx) if err != nil { return nil, fmt.Errorf("getting app config: %w", err) } s := schedule.New( // TODO(sarah): Reconfigure settings so automations interval doesn't reside under webhook settings - ctx, "automations", instanceID, appConfig.WebhookSettings.Interval.ValueOr(defaultAutomationsInterval), ds, - schedule.WithLogger(kitlog.With(logger, "cron", "automations")), + ctx, name, instanceID, appConfig.WebhookSettings.Interval.ValueOr(defaultInterval), ds, + schedule.WithLogger(kitlog.With(logger, "cron", name)), schedule.WithConfigReloadInterval(intervalReload, func(ctx context.Context) (time.Duration, error) { appConfig, err := ds.AppConfig(ctx) if err != nil { return 0, err } - newInterval := appConfig.WebhookSettings.Interval.ValueOr(defaultAutomationsInterval) + newInterval := appConfig.WebhookSettings.Interval.ValueOr(defaultInterval) return newInterval, nil }), schedule.WithJob( @@ -456,18 +458,18 @@ func triggerFailingPoliciesAutomation( return nil } -func cronWorker( +func startIntegrationsSchedule( ctx context.Context, + instanceID string, ds fleet.Datastore, logger kitlog.Logger, - identifier string, -) { +) (*schedule.Schedule, error) { const ( - lockDuration = 10 * time.Minute - lockAttemptInterval = 10 * time.Minute + name = "integrations" + defaultInterval = 10 * time.Minute ) - logger = kitlog.With(logger, "cron", lockKeyWorker) + logger = kitlog.With(logger, "cron", name) // create the worker and register the Jira and Zendesk jobs even if no // integration is enabled, as that config can change live (and if it's not @@ -494,8 +496,9 @@ func cronWorker( // is not a possible scenario. appConfig, err := ds.AppConfig(ctx) if err != nil { - errHandler(ctx, logger, "couldn't read app config", err) + return nil, fmt.Errorf("getting app config: %w", err) } + // we clear it even if we fail to load the app config, not a likely scenario // in our test environments for the needs of forced failures. if !strings.Contains(appConfig.ServerSettings.ServerURL, "fleetdm") { @@ -503,42 +506,34 @@ func cronWorker( os.Unsetenv("FLEET_ZENDESK_CLIENT_FORCED_FAILURES") } - ticker := time.NewTicker(10 * time.Second) - for { - select { - case <-ticker.C: - level.Debug(logger).Log("waiting", "done") - ticker.Reset(lockAttemptInterval) - case <-ctx.Done(): - level.Debug(logger).Log("exit", "done with cron.") - return - } + s := schedule.New( + ctx, name, instanceID, defaultInterval, ds, + schedule.WithAltLockID("worker"), + schedule.WithLogger(logger), + schedule.WithJob("integrations_worker", func(ctx context.Context) error { + // Read app config to be able to use the latest configuration for integrations. + appConfig, err := ds.AppConfig(ctx) + if err != nil { + return fmt.Errorf("getting app config: %w", err) + } - if locked, err := ds.Lock(ctx, lockKeyWorker, identifier, lockDuration); err != nil { - level.Error(logger).Log("msg", "Error acquiring lock", "err", err) - continue - } else if !locked { - level.Debug(logger).Log("msg", "Not the leader. Skipping...") - continue - } + jira.FleetURL = appConfig.ServerSettings.ServerURL + zendesk.FleetURL = appConfig.ServerSettings.ServerURL - // Read app config to be able to use the latest configuration for the Jira - // integration. - appConfig, err := ds.AppConfig(ctx) - if err != nil { - errHandler(ctx, logger, "couldn't read app config", err) - continue - } + workCtx, cancel := context.WithTimeout(ctx, defaultInterval) + if err := w.ProcessJobs(workCtx); err != nil { + cancel() // don't use defer inside loop + return fmt.Errorf("processing integrations jobs: %w", err) + } - jira.FleetURL = appConfig.ServerSettings.ServerURL - zendesk.FleetURL = appConfig.ServerSettings.ServerURL + cancel() // don't use defer inside loop + return nil + }), + ) - workCtx, cancel := context.WithTimeout(ctx, lockDuration) - if err := w.ProcessJobs(workCtx); err != nil { - errHandler(ctx, logger, "Error processing jobs", err) - } - cancel() // don't use defer inside loop - } + s.Start() + + return s, nil } func newJiraClient(opts *externalsvc.JiraOptions) (worker.JiraClient, error) { diff --git a/cmd/fleet/serve.go b/cmd/fleet/serve.go index 28ccea4d59..86de4c538d 100644 --- a/cmd/fleet/serve.go +++ b/cmd/fleet/serve.go @@ -400,11 +400,13 @@ the way that the Fleet server works. if err != nil { initFatal(errors.New("Error generating random instance identifier"), "") } - runCrons(ctx, ds, task, kitlog.With(logger, "component", "crons"), config, license, instanceID) if err := startSchedules(ctx, ds, logger, config, license, redisWrapperDS, failingPolicySet, instanceID); err != nil { initFatal(err, "failed to register schedules") } + // StartCollectors starts a goroutine per collector, using ctx to cancel. + task.StartCollectors(ctx, kitlog.With(logger, "cron", "async_task")) + // Flush seen hosts every second hostsAsyncCfg := config.Osquery.AsyncConfigForTask(configpkg.AsyncTaskHostLastSeen) if !hostsAsyncCfg.Enabled { @@ -652,26 +654,6 @@ func basicAuthHandler(username, password string, next http.Handler) http.Handler } } -const ( - lockKeyWorker = "worker" -) - -// runCrons runs cron jobs not yet ported to use the schedule package (startSchedules) -func runCrons( - ctx context.Context, - ds fleet.Datastore, - task *async.Task, - logger kitlog.Logger, - config configpkg.FleetConfig, - license *fleet.LicenseInfo, - ourIdentifier string, -) { - // StartCollectors starts a goroutine per collector, using ctx to cancel. - task.StartCollectors(ctx, kitlog.With(logger, "cron", "async_task")) - - go cronWorker(ctx, ds, kitlog.With(logger, "cron", "worker"), ourIdentifier) -} - func startSchedules( ctx context.Context, ds fleet.Datastore, @@ -688,6 +670,9 @@ func startSchedules( if _, err := startAutomationsSchedule(ctx, instanceID, ds, logger, 5*time.Minute, failingPoliciesSet); err != nil { return err } + if _, err := startIntegrationsSchedule(ctx, instanceID, ds, logger); err != nil { + return err + } return nil }