Mirror of https://github.com/fleetdm/fleet, synced 2026-04-21 13:37:30 +00:00

Merge jira-integration branch to main (#4949)

This commit is contained in:
parent 536e828f43
commit f28dc10a51

14 changed files with 887 additions and 305 deletions
Makefile (1 addition)

@@ -178,6 +178,7 @@ deps-go:

migration:
	go run github.com/fleetdm/goose/cmd/goose -dir server/datastore/mysql/migrations/tables create $(name)
	gofmt -w server/datastore/mysql/migrations/tables/*_$(name)*.go

clean: clean-assets
	rm -rf build vendor
cmd/fleet/cron.go (new file, 369 lines)

@@ -0,0 +1,369 @@
package main

import (
    "context"
    "os"
    "time"

    "github.com/fleetdm/fleet/v4/server/config"
    "github.com/fleetdm/fleet/v4/server/fleet"
    "github.com/fleetdm/fleet/v4/server/vulnerabilities"
    "github.com/fleetdm/fleet/v4/server/webhooks"
    "github.com/fleetdm/fleet/v4/server/worker"
    "github.com/getsentry/sentry-go"
    kitlog "github.com/go-kit/kit/log"
    "github.com/go-kit/kit/log/level"
)

func cronDB(ctx context.Context, ds fleet.Datastore, logger kitlog.Logger, identifier string, license *fleet.LicenseInfo) {
    logger = kitlog.With(logger, "cron", lockKeyLeader)

    ticker := time.NewTicker(10 * time.Second)
    for {
        level.Debug(logger).Log("waiting", "on ticker")
        select {
        case <-ticker.C:
            level.Debug(logger).Log("waiting", "done")
            ticker.Reset(1 * time.Hour)
        case <-ctx.Done():
            level.Debug(logger).Log("exit", "done with cron.")
            return
        }

        if locked, err := ds.Lock(ctx, lockKeyLeader, identifier, 1*time.Hour); err != nil {
            level.Error(logger).Log("msg", "Error acquiring lock", "err", err)
            continue
        } else if !locked {
            level.Debug(logger).Log("msg", "Not the leader. Skipping...")
            continue
        }

        _, err := ds.CleanupDistributedQueryCampaigns(ctx, time.Now())
        if err != nil {
            level.Error(logger).Log("err", "cleaning distributed query campaigns", "details", err)
            sentry.CaptureException(err)
        }
        err = ds.CleanupIncomingHosts(ctx, time.Now())
        if err != nil {
            level.Error(logger).Log("err", "cleaning incoming hosts", "details", err)
            sentry.CaptureException(err)
        }
        _, err = ds.CleanupCarves(ctx, time.Now())
        if err != nil {
            level.Error(logger).Log("err", "cleaning carves", "details", err)
            sentry.CaptureException(err)
        }
        err = ds.UpdateQueryAggregatedStats(ctx)
        if err != nil {
            level.Error(logger).Log("err", "aggregating query stats", "details", err)
            sentry.CaptureException(err)
        }
        err = ds.UpdateScheduledQueryAggregatedStats(ctx)
        if err != nil {
            level.Error(logger).Log("err", "aggregating scheduled query stats", "details", err)
            sentry.CaptureException(err)
        }
        err = ds.CleanupExpiredHosts(ctx)
        if err != nil {
            level.Error(logger).Log("err", "cleaning expired hosts", "details", err)
            sentry.CaptureException(err)
        }
        err = ds.GenerateAggregatedMunkiAndMDM(ctx)
        if err != nil {
            level.Error(logger).Log("err", "aggregating munki and mdm data", "details", err)
            sentry.CaptureException(err)
        }
        err = ds.CleanupPolicyMembership(ctx, time.Now())
        if err != nil {
            level.Error(logger).Log("err", "cleanup policy membership", "details", err)
            sentry.CaptureException(err)
        }
        err = ds.UpdateOSVersions(ctx)
        if err != nil {
            level.Error(logger).Log("err", "update os versions", "details", err)
            sentry.CaptureException(err)
        }

        // NOTE(mna): this is not a route from the fleet server (not in server/service/handler.go) so it
        // will not automatically support the /latest/ versioning. Leaving it as /v1/ for that reason.
        err = trySendStatistics(ctx, ds, fleet.StatisticsFrequency, "https://fleetdm.com/api/v1/webhooks/receive-usage-analytics", license)
        if err != nil {
            level.Error(logger).Log("err", "sending statistics", "details", err)
            sentry.CaptureException(err)
        }

        level.Debug(logger).Log("loop", "done")
    }
}

func cronVulnerabilities(
    ctx context.Context,
    ds fleet.Datastore,
    logger kitlog.Logger,
    identifier string,
    config config.FleetConfig,
) {
    logger = kitlog.With(logger, "cron", lockKeyVulnerabilities)

    if config.Vulnerabilities.CurrentInstanceChecks == "no" || config.Vulnerabilities.CurrentInstanceChecks == "0" {
        level.Info(logger).Log("vulnerability scanning", "host not configured to check for vulnerabilities")
        return
    }

    appConfig, err := ds.AppConfig(ctx)
    if err != nil {
        level.Error(logger).Log("config", "couldn't read app config", "err", err)
        return
    }

    vulnDisabled := false
    if appConfig.VulnerabilitySettings.DatabasesPath == "" &&
        config.Vulnerabilities.DatabasesPath == "" {
        level.Info(logger).Log("vulnerability scanning", "not configured")
        vulnDisabled = true
    }
    if !appConfig.HostSettings.EnableSoftwareInventory {
        level.Info(logger).Log("software inventory", "not configured")
        return
    }

    vulnPath := appConfig.VulnerabilitySettings.DatabasesPath
    if vulnPath == "" {
        vulnPath = config.Vulnerabilities.DatabasesPath
    }
    if config.Vulnerabilities.DatabasesPath != "" && config.Vulnerabilities.DatabasesPath != vulnPath {
        vulnPath = config.Vulnerabilities.DatabasesPath
        level.Info(logger).Log(
            "databases_path", "fleet config takes precedence over app config when both are configured",
            "result", vulnPath)
    }

    if !vulnDisabled {
        level.Info(logger).Log("databases-path", vulnPath)
    }
    level.Info(logger).Log("periodicity", config.Vulnerabilities.Periodicity)

    if !vulnDisabled {
        if config.Vulnerabilities.CurrentInstanceChecks == "auto" {
            level.Debug(logger).Log("current instance checks", "auto", "trying to create databases-path", vulnPath)
            err := os.MkdirAll(vulnPath, 0o755)
            if err != nil {
                level.Error(logger).Log("databases-path", "creation failed, returning", "err", err)
                return
            }
        }
    }

    ticker := time.NewTicker(10 * time.Second)
    for {
        level.Debug(logger).Log("waiting", "on ticker")
        select {
        case <-ticker.C:
            level.Debug(logger).Log("waiting", "done")
            ticker.Reset(config.Vulnerabilities.Periodicity)
        case <-ctx.Done():
            level.Debug(logger).Log("exit", "done with cron.")
            return
        }
        if config.Vulnerabilities.CurrentInstanceChecks == "auto" {
            if locked, err := ds.Lock(ctx, lockKeyVulnerabilities, identifier, 1*time.Hour); err != nil {
                level.Error(logger).Log("msg", "Error acquiring lock", "err", err)
                continue
            } else if !locked {
                level.Debug(logger).Log("msg", "Not the leader. Skipping...")
                continue
            }
        }

        if !vulnDisabled {
            recentVulns := checkVulnerabilities(ctx, ds, logger, vulnPath, config, appConfig.WebhookSettings.VulnerabilitiesWebhook)
            if len(recentVulns) > 0 {
                if err := webhooks.TriggerVulnerabilitiesWebhook(ctx, ds, kitlog.With(logger, "webhook", "vulnerabilities"),
                    recentVulns, appConfig, time.Now()); err != nil {

                    level.Error(logger).Log("err", "triggering vulnerabilities webhook", "details", err)
                    sentry.CaptureException(err)
                }
            }
        }

        if err := ds.CalculateHostsPerSoftware(ctx, time.Now()); err != nil {
            level.Error(logger).Log("msg", "calculating hosts count per software", "err", err)
            sentry.CaptureException(err)
        }

        // It's important vulnerabilities.PostProcess runs after ds.CalculateHostsPerSoftware
        // because it cleans up any software that's not installed on the fleet (e.g. hosts removal,
        // or software being uninstalled on hosts).
        if !vulnDisabled {
            if err := vulnerabilities.PostProcess(ctx, ds, vulnPath, logger, config); err != nil {
                level.Error(logger).Log("msg", "post processing CVEs", "err", err)
                sentry.CaptureException(err)
            }
        }

        level.Debug(logger).Log("loop", "done")
    }
}

func checkVulnerabilities(ctx context.Context, ds fleet.Datastore, logger kitlog.Logger,
    vulnPath string, config config.FleetConfig, vulnWebhookCfg fleet.VulnerabilitiesWebhookSettings) map[string][]string {
    err := vulnerabilities.TranslateSoftwareToCPE(ctx, ds, vulnPath, logger, config)
    if err != nil {
        level.Error(logger).Log("msg", "analyzing vulnerable software: Software->CPE", "err", err)
        sentry.CaptureException(err)
        return nil
    }

    recentVulns, err := vulnerabilities.TranslateCPEToCVE(ctx, ds, vulnPath, logger, config, vulnWebhookCfg.Enable)
    if err != nil {
        level.Error(logger).Log("msg", "analyzing vulnerable software: CPE->CVE", "err", err)
        sentry.CaptureException(err)
        return nil
    }
    return recentVulns
}

func cronWebhooks(
    ctx context.Context,
    ds fleet.Datastore,
    logger kitlog.Logger,
    identifier string,
    failingPoliciesSet fleet.FailingPolicySet,
    intervalReload time.Duration,
) {
    appConfig, err := ds.AppConfig(ctx)
    if err != nil {
        level.Error(logger).Log("config", "couldn't read app config", "err", err)
        return
    }

    interval := appConfig.WebhookSettings.Interval.ValueOr(24 * time.Hour)
    level.Debug(logger).Log("interval", interval.String())
    ticker := time.NewTicker(interval)
    start := time.Now()
    for {
        level.Debug(logger).Log("waiting", "on ticker")
        select {
        case <-ticker.C:
            level.Debug(logger).Log("waiting", "done")
        case <-ctx.Done():
            level.Debug(logger).Log("exit", "done with cron.")
            return
        case <-time.After(intervalReload):
            // Reload interval and check if it has been reduced.
            appConfig, err := ds.AppConfig(ctx)
            if err != nil {
                level.Error(logger).Log("config", "couldn't read app config", "err", err)
                continue
            }
            if currInterval := appConfig.WebhookSettings.Interval.ValueOr(24 * time.Hour); time.Since(start) < currInterval {
                continue
            }
        }

        // Reread app config to be able to read latest data used by the webhook
        // and update the ticker for the next run.
        appConfig, err = ds.AppConfig(ctx)
        if err != nil {
            level.Error(logger).Log("config", "couldn't read app config", "err", err)
            sentry.CaptureException(err)
        } else {
            ticker.Reset(appConfig.WebhookSettings.Interval.ValueOr(24 * time.Hour))
            start = time.Now()
        }

        // We set the db lock durations to match the intervalReload.
        maybeTriggerHostStatus(ctx, ds, logger, identifier, appConfig, intervalReload)
        maybeTriggerFailingPoliciesWebhook(ctx, ds, logger, identifier, appConfig, intervalReload, failingPoliciesSet)

        level.Debug(logger).Log("loop", "done")
    }
}

func maybeTriggerHostStatus(
    ctx context.Context,
    ds fleet.Datastore,
    logger kitlog.Logger,
    identifier string,
    appConfig *fleet.AppConfig,
    lockDuration time.Duration,
) {
    logger = kitlog.With(logger, "cron", lockKeyWebhooksHostStatus)

    if locked, err := ds.Lock(ctx, lockKeyWebhooksHostStatus, identifier, lockDuration); err != nil {
        level.Error(logger).Log("msg", "Error acquiring lock", "err", err)
        return
    } else if !locked {
        level.Debug(logger).Log("msg", "Not the leader. Skipping...")
        return
    }

    if err := webhooks.TriggerHostStatusWebhook(
        ctx, ds, kitlog.With(logger, "webhook", "host_status"), appConfig,
    ); err != nil {
        level.Error(logger).Log("err", "triggering host status webhook", "details", err)
        sentry.CaptureException(err)
    }
}

func maybeTriggerFailingPoliciesWebhook(
    ctx context.Context,
    ds fleet.Datastore,
    logger kitlog.Logger,
    identifier string,
    appConfig *fleet.AppConfig,
    lockDuration time.Duration,
    failingPoliciesSet fleet.FailingPolicySet,
) {
    logger = kitlog.With(logger, "cron", lockKeyWebhooksFailingPolicies)

    if locked, err := ds.Lock(ctx, lockKeyWebhooksFailingPolicies, identifier, lockDuration); err != nil {
        level.Error(logger).Log("msg", "Error acquiring lock", "err", err)
        return
    } else if !locked {
        level.Debug(logger).Log("msg", "Not the leader. Skipping...")
        return
    }

    if err := webhooks.TriggerFailingPoliciesWebhook(
        ctx, ds, kitlog.With(logger, "webhook", "failing_policies"), appConfig, failingPoliciesSet, time.Now(),
    ); err != nil {
        level.Error(logger).Log("err", "triggering failing policies webhook", "details", err)
        sentry.CaptureException(err)
    }
}

func cronWorker(ctx context.Context, ds fleet.Datastore, logger kitlog.Logger, identifier string) {
    logger = kitlog.With(logger, "cron", lockKeyWorker)

    w := worker.NewWorker(ds, logger)

    jira := worker.NewJira(ds, logger)
    w.Register(jira)

    ticker := time.NewTicker(10 * time.Second)
    for {
        select {
        case <-ticker.C:
            level.Debug(logger).Log("waiting", "done")
            ticker.Reset(10 * time.Minute)
        case <-ctx.Done():
            level.Debug(logger).Log("exit", "done with cron.")
            return
        }

        if locked, err := ds.Lock(ctx, lockKeyWorker, identifier, 10*time.Minute); err != nil {
            level.Error(logger).Log("msg", "Error acquiring lock", "err", err)
            continue
        } else if !locked {
            level.Debug(logger).Log("msg", "Not the leader. Skipping...")
            continue
        }

        err := w.ProcessJobs(ctx)
        if err != nil {
            level.Error(logger).Log("msg", "Error processing jobs", "err", err)
        }
    }
}
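Every loop in this file follows the same shape: wake on a ticker, try to take a named lock through the datastore so that only one Fleet instance (the leader) does the work for that interval, and skip the iteration otherwise. The following stripped-down sketch is an illustration only, not part of the commit; the Locker interface is a stand-in for the small slice of fleet.Datastore these loops actually use.

package cronpattern

import (
    "context"
    "time"
)

// Locker is a stand-in for the part of fleet.Datastore used for leader election.
type Locker interface {
    Lock(ctx context.Context, name, owner string, expiry time.Duration) (bool, error)
}

// runLeaderLoop distills the pattern used by cronDB, cronVulnerabilities and
// cronWorker above: tick, try to become leader for the next interval, do the work.
func runLeaderLoop(ctx context.Context, ds Locker, name, owner string, interval time.Duration, work func(context.Context)) {
    ticker := time.NewTicker(10 * time.Second) // short first tick, as in the real loops
    for {
        select {
        case <-ticker.C:
            ticker.Reset(interval)
        case <-ctx.Done():
            return
        }
        if locked, err := ds.Lock(ctx, name, owner, interval); err != nil || !locked {
            continue // another instance holds the lock (or locking failed); skip this round
        }
        work(ctx)
    }
}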
@@ -41,8 +41,6 @@ import (

    "github.com/fleetdm/fleet/v4/server/service/async"
    "github.com/fleetdm/fleet/v4/server/service/redis_policy_set"
    "github.com/fleetdm/fleet/v4/server/sso"
    "github.com/fleetdm/fleet/v4/server/vulnerabilities"
    "github.com/fleetdm/fleet/v4/server/webhooks"
    "github.com/getsentry/sentry-go"
    kitlog "github.com/go-kit/kit/log"
    "github.com/go-kit/kit/log/level"
@@ -571,6 +569,7 @@ const (

    lockKeyVulnerabilities         = "vulnerabilities"
    lockKeyWebhooksHostStatus      = "webhooks" // keeping this name for backwards compatibility.
    lockKeyWebhooksFailingPolicies = "webhooks:global_failing_policies"
    lockKeyWorker                  = "worker"
)

func trySendStatistics(ctx context.Context, ds fleet.Datastore, frequency time.Duration, url string, license *fleet.LicenseInfo) error {
@@ -597,7 +596,14 @@ func trySendStatistics(ctx context.Context, ds fleet.Datastore, frequency time.D

    return ds.RecordStatisticsSent(ctx)
}

func runCrons(ds fleet.Datastore, task *async.Task, logger kitlog.Logger, config config.FleetConfig, license *fleet.LicenseInfo, failingPoliciesSet fleet.FailingPolicySet) context.CancelFunc {
func runCrons(
    ds fleet.Datastore,
    task *async.Task,
    logger kitlog.Logger,
    config config.FleetConfig,
    license *fleet.LicenseInfo,
    failingPoliciesSet fleet.FailingPolicySet,
) context.CancelFunc {
    ctx, cancelBackground := context.WithCancel(context.Background())

    ourIdentifier, err := server.GenerateRandomText(64)
@@ -612,309 +618,11 @@ func runCrons(ds fleet.Datastore, task *async.Task, logger kitlog.Logger, config

    go cronVulnerabilities(
        ctx, ds, kitlog.With(logger, "cron", "vulnerabilities"), ourIdentifier, config)
    go cronWebhooks(ctx, ds, kitlog.With(logger, "cron", "webhooks"), ourIdentifier, failingPoliciesSet, 1*time.Hour)
    go cronWorker(ctx, ds, kitlog.With(logger, "cron", "worker"), ourIdentifier)

    return cancelBackground
}

func cronDB(ctx context.Context, ds fleet.Datastore, logger kitlog.Logger, identifier string, license *fleet.LicenseInfo) {
    ticker := time.NewTicker(10 * time.Second)
    for {
        level.Debug(logger).Log("waiting", "on ticker")
        select {
        case <-ticker.C:
            level.Debug(logger).Log("waiting", "done")
            ticker.Reset(1 * time.Hour)
        case <-ctx.Done():
            level.Debug(logger).Log("exit", "done with cron.")
            return
        }

        if locked, err := ds.Lock(ctx, lockKeyLeader, identifier, time.Hour); err != nil || !locked {
            level.Debug(logger).Log("leader", "Not the leader. Skipping...")
            continue
        }

        _, err := ds.CleanupDistributedQueryCampaigns(ctx, time.Now())
        if err != nil {
            level.Error(logger).Log("err", "cleaning distributed query campaigns", "details", err)
            sentry.CaptureException(err)
        }
        err = ds.CleanupIncomingHosts(ctx, time.Now())
        if err != nil {
            level.Error(logger).Log("err", "cleaning incoming hosts", "details", err)
            sentry.CaptureException(err)
        }
        _, err = ds.CleanupCarves(ctx, time.Now())
        if err != nil {
            level.Error(logger).Log("err", "cleaning carves", "details", err)
            sentry.CaptureException(err)
        }
        err = ds.UpdateQueryAggregatedStats(ctx)
        if err != nil {
            level.Error(logger).Log("err", "aggregating query stats", "details", err)
            sentry.CaptureException(err)
        }
        err = ds.UpdateScheduledQueryAggregatedStats(ctx)
        if err != nil {
            level.Error(logger).Log("err", "aggregating scheduled query stats", "details", err)
            sentry.CaptureException(err)
        }
        err = ds.CleanupExpiredHosts(ctx)
        if err != nil {
            level.Error(logger).Log("err", "cleaning expired hosts", "details", err)
            sentry.CaptureException(err)
        }
        err = ds.GenerateAggregatedMunkiAndMDM(ctx)
        if err != nil {
            level.Error(logger).Log("err", "aggregating munki and mdm data", "details", err)
            sentry.CaptureException(err)
        }
        err = ds.CleanupPolicyMembership(ctx, time.Now())
        if err != nil {
            level.Error(logger).Log("err", "cleanup policy membership", "details", err)
            sentry.CaptureException(err)
        }
        err = ds.UpdateOSVersions(ctx)
        if err != nil {
            level.Error(logger).Log("err", "update os versions", "details", err)
            sentry.CaptureException(err)
        }

        // NOTE(mna): this is not a route from the fleet server (not in server/service/handler.go) so it
        // will not automatically support the /latest/ versioning. Leaving it as /v1/ for that reason.
        err = trySendStatistics(ctx, ds, fleet.StatisticsFrequency, "https://fleetdm.com/api/v1/webhooks/receive-usage-analytics", license)
        if err != nil {
            level.Error(logger).Log("err", "sending statistics", "details", err)
            sentry.CaptureException(err)
        }

        level.Debug(logger).Log("loop", "done")
    }
}

func cronVulnerabilities(
    ctx context.Context,
    ds fleet.Datastore,
    logger kitlog.Logger,
    identifier string,
    config config.FleetConfig,
) {
    if config.Vulnerabilities.CurrentInstanceChecks == "no" || config.Vulnerabilities.CurrentInstanceChecks == "0" {
        level.Info(logger).Log("vulnerability scanning", "host not configured to check for vulnerabilities")
        return
    }

    appConfig, err := ds.AppConfig(ctx)
    if err != nil {
        level.Error(logger).Log("config", "couldn't read app config", "err", err)
        return
    }

    vulnDisabled := false
    if appConfig.VulnerabilitySettings.DatabasesPath == "" &&
        config.Vulnerabilities.DatabasesPath == "" {
        level.Info(logger).Log("vulnerability scanning", "not configured")
        vulnDisabled = true
    }
    if !appConfig.HostSettings.EnableSoftwareInventory {
        level.Info(logger).Log("software inventory", "not configured")
        return
    }

    vulnPath := appConfig.VulnerabilitySettings.DatabasesPath
    if vulnPath == "" {
        vulnPath = config.Vulnerabilities.DatabasesPath
    }
    if config.Vulnerabilities.DatabasesPath != "" && config.Vulnerabilities.DatabasesPath != vulnPath {
        vulnPath = config.Vulnerabilities.DatabasesPath
        level.Info(logger).Log(
            "databases_path", "fleet config takes precedence over app config when both are configured",
            "result", vulnPath)
    }

    if !vulnDisabled {
        level.Info(logger).Log("databases-path", vulnPath)
    }
    level.Info(logger).Log("periodicity", config.Vulnerabilities.Periodicity)

    if !vulnDisabled {
        if config.Vulnerabilities.CurrentInstanceChecks == "auto" {
            level.Debug(logger).Log("current instance checks", "auto", "trying to create databases-path", vulnPath)
            err := os.MkdirAll(vulnPath, 0o755)
            if err != nil {
                level.Error(logger).Log("databases-path", "creation failed, returning", "err", err)
                return
            }
        }
    }

    ticker := time.NewTicker(10 * time.Second)
    for {
        level.Debug(logger).Log("waiting", "on ticker")
        select {
        case <-ticker.C:
            level.Debug(logger).Log("waiting", "done")
            ticker.Reset(config.Vulnerabilities.Periodicity)
        case <-ctx.Done():
            level.Debug(logger).Log("exit", "done with cron.")
            return
        }
        if config.Vulnerabilities.CurrentInstanceChecks == "auto" {
            if locked, err := ds.Lock(ctx, lockKeyVulnerabilities, identifier, time.Hour); err != nil || !locked {
                level.Debug(logger).Log("leader", "Not the leader. Skipping...")
                continue
            }
        }

        if !vulnDisabled {
            recentVulns := checkVulnerabilities(ctx, ds, logger, vulnPath, config, appConfig.WebhookSettings.VulnerabilitiesWebhook)
            if len(recentVulns) > 0 {
                if err := webhooks.TriggerVulnerabilitiesWebhook(ctx, ds, kitlog.With(logger, "webhook", "vulnerabilities"),
                    recentVulns, appConfig, time.Now()); err != nil {

                    level.Error(logger).Log("err", "triggering vulnerabilities webhook", "details", err)
                    sentry.CaptureException(err)
                }
            }
        }

        if err := ds.CalculateHostsPerSoftware(ctx, time.Now()); err != nil {
            level.Error(logger).Log("msg", "calculating hosts count per software", "err", err)
            sentry.CaptureException(err)
        }

        // It's important vulnerabilities.PostProcess runs after ds.CalculateHostsPerSoftware
        // because it cleans up any software that's not installed on the fleet (e.g. hosts removal,
        // or software being uninstalled on hosts).
        if !vulnDisabled {
            if err := vulnerabilities.PostProcess(ctx, ds, vulnPath, logger, config); err != nil {
                level.Error(logger).Log("msg", "post processing CVEs", "err", err)
                sentry.CaptureException(err)
            }
        }

        level.Debug(logger).Log("loop", "done")
    }
}

func checkVulnerabilities(ctx context.Context, ds fleet.Datastore, logger kitlog.Logger,
    vulnPath string, config config.FleetConfig, vulnWebhookCfg fleet.VulnerabilitiesWebhookSettings) map[string][]string {
    err := vulnerabilities.TranslateSoftwareToCPE(ctx, ds, vulnPath, logger, config)
    if err != nil {
        level.Error(logger).Log("msg", "analyzing vulnerable software: Software->CPE", "err", err)
        sentry.CaptureException(err)
        return nil
    }

    recentVulns, err := vulnerabilities.TranslateCPEToCVE(ctx, ds, vulnPath, logger, config, vulnWebhookCfg.Enable)
    if err != nil {
        level.Error(logger).Log("msg", "analyzing vulnerable software: CPE->CVE", "err", err)
        sentry.CaptureException(err)
        return nil
    }
    return recentVulns
}

func cronWebhooks(
    ctx context.Context,
    ds fleet.Datastore,
    logger kitlog.Logger,
    identifier string,
    failingPoliciesSet fleet.FailingPolicySet,
    intervalReload time.Duration,
) {
    appConfig, err := ds.AppConfig(ctx)
    if err != nil {
        level.Error(logger).Log("config", "couldn't read app config", "err", err)
        return
    }

    interval := appConfig.WebhookSettings.Interval.ValueOr(24 * time.Hour)
    level.Debug(logger).Log("interval", interval.String())
    ticker := time.NewTicker(interval)
    start := time.Now()
    for {
        level.Debug(logger).Log("waiting", "on ticker")
        select {
        case <-ticker.C:
            level.Debug(logger).Log("waiting", "done")
        case <-ctx.Done():
            level.Debug(logger).Log("exit", "done with cron.")
            return
        case <-time.After(intervalReload):
            // Reload interval and check if it has been reduced.
            appConfig, err := ds.AppConfig(ctx)
            if err != nil {
                level.Error(logger).Log("config", "couldn't read app config", "err", err)
                continue
            }
            if currInterval := appConfig.WebhookSettings.Interval.ValueOr(24 * time.Hour); time.Since(start) < currInterval {
                continue
            }
        }

        // Reread app config to be able to read latest data used by the webhook
        // and update the ticker for the next run.
        appConfig, err = ds.AppConfig(ctx)
        if err != nil {
            level.Error(logger).Log("config", "couldn't read app config", "err", err)
            sentry.CaptureException(err)
        } else {
            ticker.Reset(appConfig.WebhookSettings.Interval.ValueOr(24 * time.Hour))
            start = time.Now()
        }

        // We set the db lock durations to match the intervalReload.
        maybeTriggerHostStatus(ctx, ds, logger, identifier, appConfig, intervalReload)
        maybeTriggerFailingPoliciesWebhook(ctx, ds, logger, identifier, appConfig, intervalReload, failingPoliciesSet)

        level.Debug(logger).Log("loop", "done")
    }
}

func maybeTriggerHostStatus(
    ctx context.Context,
    ds fleet.Datastore,
    logger kitlog.Logger,
    identifier string,
    appConfig *fleet.AppConfig,
    lockDuration time.Duration,
) {
    if locked, err := ds.Lock(ctx, lockKeyWebhooksHostStatus, identifier, lockDuration); err != nil || !locked {
        level.Debug(logger).Log("leader-host-status", "Not the leader. Skipping...")
        return
    }

    if err := webhooks.TriggerHostStatusWebhook(
        ctx, ds, kitlog.With(logger, "webhook", "host_status"), appConfig,
    ); err != nil {
        level.Error(logger).Log("err", "triggering host status webhook", "details", err)
        sentry.CaptureException(err)
    }
}

func maybeTriggerFailingPoliciesWebhook(
    ctx context.Context,
    ds fleet.Datastore,
    logger kitlog.Logger,
    identifier string,
    appConfig *fleet.AppConfig,
    lockDuration time.Duration,
    failingPoliciesSet fleet.FailingPolicySet,
) {
    if locked, err := ds.Lock(ctx, lockKeyWebhooksFailingPolicies, identifier, lockDuration); err != nil || !locked {
        level.Debug(logger).Log("leader-failing-policies", "Not the leader. Skipping...")
        return
    }

    if err := webhooks.TriggerFailingPoliciesWebhook(
        ctx, ds, kitlog.With(logger, "webhook", "failing_policies"), appConfig, failingPoliciesSet, time.Now(),
    ); err != nil {
        level.Error(logger).Log("err", "triggering failing policies webhook", "details", err)
        sentry.CaptureException(err)
    }
}

// Support for TLS security profiles, we set up the TLS configuation based on
// value supplied to server_tls_compatibility command line flag. The default
// profile is 'modern'.
@@ -242,7 +242,7 @@ func TestCronVulnerabilitiesAcceptsExistingDbPath(t *testing.T) {

    cancelFunc()
    cronVulnerabilities(ctx, ds, logger, "AAA", fleetConfig)

    require.Contains(t, buf.String(), `{"level":"debug","waiting":"on ticker"}`)
    require.Contains(t, buf.String(), `"waiting":"on ticker"`)
}

func TestCronVulnerabilitiesQuitsIfErrorVulnPath(t *testing.T) {
server/datastore/mysql/jobs.go (new file, 70 lines)

@@ -0,0 +1,70 @@
package mysql

import (
    "context"

    "github.com/fleetdm/fleet/v4/server/fleet"
    "github.com/jmoiron/sqlx"
)

func (ds *Datastore) NewJob(ctx context.Context, job *fleet.Job) (*fleet.Job, error) {
    query := `
INSERT INTO jobs (
    name,
    args,
    state,
    retries,
    error
)
VALUES (?, ?, ?, ?, ?)
`
    result, err := ds.writer.ExecContext(ctx, query, job.Name, job.Args, job.State, job.Retries, job.Error)
    if err != nil {
        return nil, err
    }

    id, _ := result.LastInsertId()
    job.ID = uint(id)

    return job, nil
}

func (ds *Datastore) GetQueuedJobs(ctx context.Context, maxNumJobs int) ([]*fleet.Job, error) {
    query := `
SELECT
    id, created_at, updated_at, name, args, state, retries, error
FROM
    jobs
WHERE
    state = ?
ORDER BY
    created_at asc
LIMIT ?
`
    var jobs []*fleet.Job
    err := sqlx.SelectContext(ctx, ds.reader, &jobs, query, fleet.JobStateQueued, maxNumJobs)
    if err != nil {
        return nil, err
    }

    return jobs, nil
}

func (ds *Datastore) UpdateJob(ctx context.Context, id uint, job *fleet.Job) (*fleet.Job, error) {
    query := `
UPDATE jobs
SET
    state = ?,
    retries = ?,
    error = ?
WHERE
    id = ?
`
    _, err := ds.writer.ExecContext(ctx, query, job.State, job.Retries, job.Error, job.ID)
    if err != nil {
        return nil, err
    }

    return job, nil
}
@@ -0,0 +1,35 @@
package tables

import (
    "database/sql"

    "github.com/pkg/errors"
)

func init() {
    MigrationClient.AddMigration(Up_20220330100659, Down_20220330100659)
}

func Up_20220330100659(tx *sql.Tx) error {
    _, err := tx.Exec(`
CREATE TABLE jobs (
    id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    name VARCHAR(255) NOT NULL,
    args JSON,
    state VARCHAR(255) NOT NULL,
    retries INT NOT NULL DEFAULT 0,
    error TEXT
)
`)
    if err != nil {
        return errors.Wrapf(err, "create table")
    }

    return nil
}

func Down_20220330100659(tx *sql.Tx) error {
    return nil
}
@@ -0,0 +1,26 @@
package tables

import (
    "testing"

    "github.com/stretchr/testify/require"
)

func TestUp_20220330100659(t *testing.T) {
    db := applyUpToPrev(t)

    applyNext(t, db)

    query := `
INSERT INTO jobs (
    name,
    args,
    state,
    retries,
    error
)
VALUES (?, ?, ?, ?, ?)
`
    _, err := db.Exec(query, "test", nil, "queued", 0, "")
    require.NoError(t, err)
}
File diff suppressed because one or more lines are too long
@@ -519,6 +519,18 @@ type Datastore interface {

    SerialUpdateHost(ctx context.Context, host *Host) error

    ///////////////////////////////////////////////////////////////////////////////
    // JobStore

    // NewJob inserts a new job into the jobs table (queue).
    NewJob(ctx context.Context, job *Job) (*Job, error)

    // GetQueuedJobs gets queued jobs from the jobs table (queue).
    GetQueuedJobs(ctx context.Context, maxNumJobs int) ([]*Job, error)

    // UpdateJobs updates an existing job. Call this after processing a job.
    UpdateJob(ctx context.Context, id uint, job *Job) (*Job, error)

    ///////////////////////////////////////////////////////////////////////////////
    // Debug
server/fleet/jobs.go (new file, 33 lines)

@@ -0,0 +1,33 @@
package fleet

import (
    "encoding/json"
    "time"
)

type JobState string

// The possible states for a job
//
//  Queued───►Success
//    │
//    │
//    └──────►Failure
//
const (
    JobStateQueued  JobState = "queued"
    JobStateSuccess JobState = "success"
    JobStateFailure JobState = "failure"
)

// Job describes an asynchronous job started via the worker package.
type Job struct {
    ID        uint             `json:"id" db:"id"`
    CreatedAt time.Time        `json:"created_at" db:"created_at"`
    UpdatedAt *time.Time       `json:"updated_at" db:"updated_at"`
    Name      string           `json:"name" db:"name"`
    Args      *json.RawMessage `json:"args" db:"args"`
    State     JobState         `json:"state" db:"state"`
    Retries   int              `json:"retries" db:"retries"`
    Error     string           `json:"error" db:"error"`
}
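As a rough illustration of how a caller would put a Job like this on the queue (not part of the diff), the sketch below enqueues a "jira" job through the Worker added in server/worker. The function name and the args payload are made up for illustration; this commit still leaves the Jira job's Run method as a TODO.

package example

import (
    "context"
    "encoding/json"

    "github.com/fleetdm/fleet/v4/server/fleet"
    "github.com/fleetdm/fleet/v4/server/worker"
)

// enqueueJiraJob is a hypothetical helper: it queues a job named "jira",
// which must match the Name() of the job registered on the worker.
func enqueueJiraJob(ctx context.Context, w *worker.Worker) error {
    raw := json.RawMessage(`{"cve":"CVE-2022-0001"}`) // hypothetical args payload
    _, err := w.QueueJob(ctx, &fleet.Job{
        Name: "jira",
        Args: &raw,
    })
    return err
}

QueueJob marks the job as queued and persists it via Datastore.NewJob; the cronWorker loop above then picks it up on its next ProcessJobs pass.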
@@ -386,6 +386,12 @@ type EnrollHostFunc func(ctx context.Context, osqueryHostId string, nodeKey stri

type SerialUpdateHostFunc func(ctx context.Context, host *fleet.Host) error

type NewJobFunc func(ctx context.Context, job *fleet.Job) (*fleet.Job, error)

type GetQueuedJobsFunc func(ctx context.Context, maxNumJobs int) ([]*fleet.Job, error)

type UpdateJobFunc func(ctx context.Context, id uint, job *fleet.Job) (*fleet.Job, error)

type InnoDBStatusFunc func(ctx context.Context) (string, error)

type ProcessListFunc func(ctx context.Context) ([]fleet.MySQLProcess, error)

@@ -952,6 +958,15 @@ type DataStore struct {

    SerialUpdateHostFunc        SerialUpdateHostFunc
    SerialUpdateHostFuncInvoked bool

    NewJobFunc        NewJobFunc
    NewJobFuncInvoked bool

    GetQueuedJobsFunc        GetQueuedJobsFunc
    GetQueuedJobsFuncInvoked bool

    UpdateJobFunc        UpdateJobFunc
    UpdateJobFuncInvoked bool

    InnoDBStatusFunc        InnoDBStatusFunc
    InnoDBStatusFuncInvoked bool

@@ -1894,6 +1909,21 @@ func (s *DataStore) SerialUpdateHost(ctx context.Context, host *fleet.Host) erro

    return s.SerialUpdateHostFunc(ctx, host)
}

func (s *DataStore) NewJob(ctx context.Context, job *fleet.Job) (*fleet.Job, error) {
    s.NewJobFuncInvoked = true
    return s.NewJobFunc(ctx, job)
}

func (s *DataStore) GetQueuedJobs(ctx context.Context, maxNumJobs int) ([]*fleet.Job, error) {
    s.GetQueuedJobsFuncInvoked = true
    return s.GetQueuedJobsFunc(ctx, maxNumJobs)
}

func (s *DataStore) UpdateJob(ctx context.Context, id uint, job *fleet.Job) (*fleet.Job, error) {
    s.UpdateJobFuncInvoked = true
    return s.UpdateJobFunc(ctx, id, job)
}

func (s *DataStore) InnoDBStatus(ctx context.Context) (string, error) {
    s.InnoDBStatusFuncInvoked = true
    return s.InnoDBStatusFunc(ctx)
server/worker/jira.go (new file, 31 lines)

@@ -0,0 +1,31 @@
package worker

import (
    "context"
    "encoding/json"

    "github.com/fleetdm/fleet/v4/server/fleet"
    kitlog "github.com/go-kit/kit/log"
)

type Jira struct {
    ds  fleet.Datastore
    log kitlog.Logger
    // TODO: add jira client
}

func NewJira(ds fleet.Datastore, log kitlog.Logger) *Jira {
    return &Jira{
        ds:  ds,
        log: log,
    }
}

func (j *Jira) Name() string {
    return "jira"
}

func (j *Jira) Run(ctx context.Context, argsJSON json.RawMessage) error {
    // TODO: implement me
    return nil
}
server/worker/worker.go (new file, 112 lines)

@@ -0,0 +1,112 @@
package worker

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/fleetdm/fleet/v4/server/fleet"
    kitlog "github.com/go-kit/kit/log"
    "github.com/go-kit/kit/log/level"
)

const maxRetries = 5

// Job defines an interface for jobs that can be run by the Worker
type Job interface {
    // Name is the unique name of the job.
    Name() string

    // Run performs the actual work.
    Run(ctx context.Context, argsJSON json.RawMessage) error
}

// Worker runs jobs. NOT SAFE FOR CONCURRENT USE.
type Worker struct {
    ds  fleet.Datastore
    log kitlog.Logger

    registry map[string]Job
}

func NewWorker(ds fleet.Datastore, log kitlog.Logger) *Worker {
    return &Worker{
        ds:       ds,
        log:      log,
        registry: make(map[string]Job),
    }
}

func (w *Worker) Register(jobs ...Job) {
    for _, j := range jobs {
        name := j.Name()
        if _, ok := w.registry[name]; ok {
            panic(fmt.Sprintf("job %s already registered", name))
        }
        w.registry[name] = j
    }
}

func (w *Worker) QueueJob(ctx context.Context, job *fleet.Job) (*fleet.Job, error) {
    job.State = fleet.JobStateQueued
    return w.ds.NewJob(ctx, job)
}

// ProcessJobs processes all queued jobs.
func (w *Worker) ProcessJobs(ctx context.Context) error {
    // process jobs until there are none left
    maxNumJobs := 100
    for {
        jobs, err := w.ds.GetQueuedJobs(ctx, maxNumJobs)
        if err != nil {
            return fmt.Errorf("get jobs: %w", err)
        }

        if len(jobs) == 0 {
            break
        }

        for _, job := range jobs {
            log := kitlog.With(w.log, "job_id", job.ID)

            level.Debug(log).Log("msg", "processing job")

            err := w.processJob(ctx, job)
            if err != nil {
                level.Error(log).Log("msg", "job failed", "err", err)
                job.Error = err.Error()
                if job.Retries < maxRetries {
                    level.Debug(log).Log("msg", "retrying job")
                    job.Retries += 1
                } else {
                    job.State = fleet.JobStateFailure
                }
            } else {
                job.State = fleet.JobStateSuccess
                job.Error = ""
            }

            _, err = w.ds.UpdateJob(ctx, job.ID, job)
            if err != nil {
                level.Error(log).Log("update job", "err", err)
            }
        }
    }

    return nil
}

func (w *Worker) processJob(ctx context.Context, job *fleet.Job) error {
    j, ok := w.registry[job.Name]
    if !ok {
        return fmt.Errorf("unknown job: %s", job.Name)
    }

    var args json.RawMessage
    if job.Args != nil {
        args = *job.Args
    }

    return j.Run(ctx, args)
}
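To make the Job contract above concrete, here is a minimal sketch of a custom job implementation (invented for illustration, not part of the diff); it would live in the worker package and simply prints the raw JSON arguments it receives:

package worker

import (
    "context"
    "encoding/json"
    "fmt"
)

// echoJob is a hypothetical Job used only to illustrate the interface above.
type echoJob struct{}

// Name returns the unique name the Worker uses to look the job up in its registry.
func (echoJob) Name() string { return "echo" }

// Run just prints its arguments; a real job would do the actual work here.
func (echoJob) Run(ctx context.Context, argsJSON json.RawMessage) error {
    fmt.Printf("echo job ran with args: %s\n", argsJSON)
    return nil
}

A caller would wire it up the same way cronWorker wires the Jira job: w := NewWorker(ds, logger); w.Register(echoJob{}); and then call w.ProcessJobs(ctx) periodically, as the tests below also demonstrate.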
server/worker/worker_test.go (new file, 141 lines)

@@ -0,0 +1,141 @@
package worker

import (
    "context"
    "encoding/json"
    "errors"
    "testing"

    "github.com/fleetdm/fleet/v4/server/fleet"
    "github.com/fleetdm/fleet/v4/server/mock"
    kitlog "github.com/go-kit/kit/log"
    "github.com/stretchr/testify/require"
    "github.com/tj/assert"
)

type testJob struct {
    name string
    run  func(ctx context.Context, argsJSON json.RawMessage) error
}

func (t testJob) Name() string {
    return t.name
}

func (t testJob) Run(ctx context.Context, argsJSON json.RawMessage) error {
    return t.run(ctx, argsJSON)
}

func TestWorker(t *testing.T) {
    ds := new(mock.Store)

    // set up mocks
    getQueuedJobsCalled := 0
    ds.GetQueuedJobsFunc = func(ctx context.Context, maxNumJobs int) ([]*fleet.Job, error) {
        if getQueuedJobsCalled > 0 {
            return nil, nil
        }
        getQueuedJobsCalled++

        argsJSON := json.RawMessage(`{"arg1":"foo"}`)
        return []*fleet.Job{
            {
                ID:   1,
                Name: "test",
                Args: &argsJSON,
            },
        }, nil
    }
    ds.UpdateJobFunc = func(ctx context.Context, id uint, job *fleet.Job) (*fleet.Job, error) {
        assert.Equal(t, fleet.JobStateSuccess, job.State)
        return job, nil
    }

    logger := kitlog.NewNopLogger()
    w := NewWorker(ds, logger)

    // register a test job
    jobCalled := false
    j := testJob{
        name: "test",
        run: func(ctx context.Context, argsJSON json.RawMessage) error {
            jobCalled = true

            assert.Equal(t, json.RawMessage(`{"arg1":"foo"}`), argsJSON)
            return nil
        },
    }

    w.Register(j)

    err := w.ProcessJobs(context.Background())
    require.NoError(t, err)

    require.True(t, ds.GetQueuedJobsFuncInvoked)
    require.True(t, ds.UpdateJobFuncInvoked)

    require.True(t, jobCalled)
}

func TestWorkerRetries(t *testing.T) {
    ds := new(mock.Store)

    // set up mocks
    getQueuedJobsCalled := 0
    ds.GetQueuedJobsFunc = func(ctx context.Context, maxNumJobs int) ([]*fleet.Job, error) {
        // don't return any jobs once its been called 5 (maxRetries) times
        if getQueuedJobsCalled > maxRetries {
            return nil, nil
        }

        argsJSON := json.RawMessage(`{"arg1":"foo"}`)
        jobs := []*fleet.Job{
            {
                ID:      1,
                Name:    "test",
                Args:    &argsJSON,
                State:   fleet.JobStateQueued,
                Retries: getQueuedJobsCalled,
            },
        }

        getQueuedJobsCalled++

        return jobs, nil
    }

    jobFailed := false
    ds.UpdateJobFunc = func(ctx context.Context, id uint, job *fleet.Job) (*fleet.Job, error) {
        assert.Equal(t, "unknown error", job.Error)
        if job.State == fleet.JobStateFailure {
            jobFailed = true
            assert.Equal(t, maxRetries, job.Retries)
        }

        return job, nil
    }

    logger := kitlog.NewNopLogger()
    w := NewWorker(ds, logger)

    // register a test job
    jobCalled := 0
    j := testJob{
        name: "test",
        run: func(ctx context.Context, argsJSON json.RawMessage) error {
            jobCalled++
            return errors.New("unknown error")
        },
    }
    w.Register(j)

    err := w.ProcessJobs(context.Background())
    require.NoError(t, err)

    require.True(t, ds.GetQueuedJobsFuncInvoked)
    require.True(t, ds.UpdateJobFuncInvoked)

    require.Equal(t, maxRetries+1, jobCalled)
    require.True(t, jobFailed)
}