mirror of
https://github.com/fleetdm/fleet
synced 2026-05-24 09:28:54 +00:00
Refactor cron worker to schedule package (#7886)
This commit is contained in:
parent
fca5ad3158
commit
639b85e47b
2 changed files with 45 additions and 65 deletions
|
|
@ -365,22 +365,24 @@ func startAutomationsSchedule(
|
|||
intervalReload time.Duration,
|
||||
failingPoliciesSet fleet.FailingPolicySet,
|
||||
) (*schedule.Schedule, error) {
|
||||
const defaultAutomationsInterval = 24 * time.Hour
|
||||
|
||||
const (
|
||||
name = "automations"
|
||||
defaultInterval = 24 * time.Hour
|
||||
)
|
||||
appConfig, err := ds.AppConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting app config: %w", err)
|
||||
}
|
||||
s := schedule.New(
|
||||
// TODO(sarah): Reconfigure settings so automations interval doesn't reside under webhook settings
|
||||
ctx, "automations", instanceID, appConfig.WebhookSettings.Interval.ValueOr(defaultAutomationsInterval), ds,
|
||||
schedule.WithLogger(kitlog.With(logger, "cron", "automations")),
|
||||
ctx, name, instanceID, appConfig.WebhookSettings.Interval.ValueOr(defaultInterval), ds,
|
||||
schedule.WithLogger(kitlog.With(logger, "cron", name)),
|
||||
schedule.WithConfigReloadInterval(intervalReload, func(ctx context.Context) (time.Duration, error) {
|
||||
appConfig, err := ds.AppConfig(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
newInterval := appConfig.WebhookSettings.Interval.ValueOr(defaultAutomationsInterval)
|
||||
newInterval := appConfig.WebhookSettings.Interval.ValueOr(defaultInterval)
|
||||
return newInterval, nil
|
||||
}),
|
||||
schedule.WithJob(
|
||||
|
|
@ -456,18 +458,18 @@ func triggerFailingPoliciesAutomation(
|
|||
return nil
|
||||
}
|
||||
|
||||
func cronWorker(
|
||||
func startIntegrationsSchedule(
|
||||
ctx context.Context,
|
||||
instanceID string,
|
||||
ds fleet.Datastore,
|
||||
logger kitlog.Logger,
|
||||
identifier string,
|
||||
) {
|
||||
) (*schedule.Schedule, error) {
|
||||
const (
|
||||
lockDuration = 10 * time.Minute
|
||||
lockAttemptInterval = 10 * time.Minute
|
||||
name = "integrations"
|
||||
defaultInterval = 10 * time.Minute
|
||||
)
|
||||
|
||||
logger = kitlog.With(logger, "cron", lockKeyWorker)
|
||||
logger = kitlog.With(logger, "cron", name)
|
||||
|
||||
// create the worker and register the Jira and Zendesk jobs even if no
|
||||
// integration is enabled, as that config can change live (and if it's not
|
||||
|
|
@ -494,8 +496,9 @@ func cronWorker(
|
|||
// is not a possible scenario.
|
||||
appConfig, err := ds.AppConfig(ctx)
|
||||
if err != nil {
|
||||
errHandler(ctx, logger, "couldn't read app config", err)
|
||||
return nil, fmt.Errorf("getting app config: %w", err)
|
||||
}
|
||||
|
||||
// we clear it even if we fail to load the app config, not a likely scenario
|
||||
// in our test environments for the needs of forced failures.
|
||||
if !strings.Contains(appConfig.ServerSettings.ServerURL, "fleetdm") {
|
||||
|
|
@ -503,42 +506,34 @@ func cronWorker(
|
|||
os.Unsetenv("FLEET_ZENDESK_CLIENT_FORCED_FAILURES")
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
level.Debug(logger).Log("waiting", "done")
|
||||
ticker.Reset(lockAttemptInterval)
|
||||
case <-ctx.Done():
|
||||
level.Debug(logger).Log("exit", "done with cron.")
|
||||
return
|
||||
}
|
||||
s := schedule.New(
|
||||
ctx, name, instanceID, defaultInterval, ds,
|
||||
schedule.WithAltLockID("worker"),
|
||||
schedule.WithLogger(logger),
|
||||
schedule.WithJob("integrations_worker", func(ctx context.Context) error {
|
||||
// Read app config to be able to use the latest configuration for integrations.
|
||||
appConfig, err := ds.AppConfig(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting app config: %w", err)
|
||||
}
|
||||
|
||||
if locked, err := ds.Lock(ctx, lockKeyWorker, identifier, lockDuration); err != nil {
|
||||
level.Error(logger).Log("msg", "Error acquiring lock", "err", err)
|
||||
continue
|
||||
} else if !locked {
|
||||
level.Debug(logger).Log("msg", "Not the leader. Skipping...")
|
||||
continue
|
||||
}
|
||||
jira.FleetURL = appConfig.ServerSettings.ServerURL
|
||||
zendesk.FleetURL = appConfig.ServerSettings.ServerURL
|
||||
|
||||
// Read app config to be able to use the latest configuration for the Jira
|
||||
// integration.
|
||||
appConfig, err := ds.AppConfig(ctx)
|
||||
if err != nil {
|
||||
errHandler(ctx, logger, "couldn't read app config", err)
|
||||
continue
|
||||
}
|
||||
workCtx, cancel := context.WithTimeout(ctx, defaultInterval)
|
||||
if err := w.ProcessJobs(workCtx); err != nil {
|
||||
cancel() // don't use defer inside loop
|
||||
return fmt.Errorf("processing integrations jobs: %w", err)
|
||||
}
|
||||
|
||||
jira.FleetURL = appConfig.ServerSettings.ServerURL
|
||||
zendesk.FleetURL = appConfig.ServerSettings.ServerURL
|
||||
cancel() // don't use defer inside loop
|
||||
return nil
|
||||
}),
|
||||
)
|
||||
|
||||
workCtx, cancel := context.WithTimeout(ctx, lockDuration)
|
||||
if err := w.ProcessJobs(workCtx); err != nil {
|
||||
errHandler(ctx, logger, "Error processing jobs", err)
|
||||
}
|
||||
cancel() // don't use defer inside loop
|
||||
}
|
||||
s.Start()
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func newJiraClient(opts *externalsvc.JiraOptions) (worker.JiraClient, error) {
|
||||
|
|
|
|||
|
|
@ -400,11 +400,13 @@ the way that the Fleet server works.
|
|||
if err != nil {
|
||||
initFatal(errors.New("Error generating random instance identifier"), "")
|
||||
}
|
||||
runCrons(ctx, ds, task, kitlog.With(logger, "component", "crons"), config, license, instanceID)
|
||||
if err := startSchedules(ctx, ds, logger, config, license, redisWrapperDS, failingPolicySet, instanceID); err != nil {
|
||||
initFatal(err, "failed to register schedules")
|
||||
}
|
||||
|
||||
// StartCollectors starts a goroutine per collector, using ctx to cancel.
|
||||
task.StartCollectors(ctx, kitlog.With(logger, "cron", "async_task"))
|
||||
|
||||
// Flush seen hosts every second
|
||||
hostsAsyncCfg := config.Osquery.AsyncConfigForTask(configpkg.AsyncTaskHostLastSeen)
|
||||
if !hostsAsyncCfg.Enabled {
|
||||
|
|
@ -652,26 +654,6 @@ func basicAuthHandler(username, password string, next http.Handler) http.Handler
|
|||
}
|
||||
}
|
||||
|
||||
const (
|
||||
lockKeyWorker = "worker"
|
||||
)
|
||||
|
||||
// runCrons runs cron jobs not yet ported to use the schedule package (startSchedules)
|
||||
func runCrons(
|
||||
ctx context.Context,
|
||||
ds fleet.Datastore,
|
||||
task *async.Task,
|
||||
logger kitlog.Logger,
|
||||
config configpkg.FleetConfig,
|
||||
license *fleet.LicenseInfo,
|
||||
ourIdentifier string,
|
||||
) {
|
||||
// StartCollectors starts a goroutine per collector, using ctx to cancel.
|
||||
task.StartCollectors(ctx, kitlog.With(logger, "cron", "async_task"))
|
||||
|
||||
go cronWorker(ctx, ds, kitlog.With(logger, "cron", "worker"), ourIdentifier)
|
||||
}
|
||||
|
||||
func startSchedules(
|
||||
ctx context.Context,
|
||||
ds fleet.Datastore,
|
||||
|
|
@ -688,6 +670,9 @@ func startSchedules(
|
|||
if _, err := startAutomationsSchedule(ctx, instanceID, ds, logger, 5*time.Minute, failingPoliciesSet); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := startIntegrationsSchedule(ctx, instanceID, ds, logger); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue