fleet/server/worker/worker.go
Victor Lyuboslavsky 763fbf318d
Migrating server/worker and related code to slog (#40205)
<!-- Add the related story/sub-task/bug number, like Resolves #123, or
remove if NA -->
**Related issue:** Resolves #40054

# Checklist for submitter

- [x] Changes file added for user-visible changes in `changes/`,
`orbit/changes/` or `ee/fleetd-chrome/changes`.

## Testing

- [x] Added/updated automated tests
- [x] QA'd all new/changed functionality manually

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

## Release Notes

* **Refactor**
* Updated logging infrastructure across background jobs and worker
services to use standardized structured logging, improving consistency
and log output formatting across the system.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2026-02-23 13:18:07 -06:00

265 lines
7.6 KiB
Go

package worker
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
"github.com/fleetdm/fleet/v4/server/fleet"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
const (
	// maxRetries is the number of times a failed job is retried before it is
	// marked as permanently failed (see ProcessJobs).
	maxRetries = 5

	// nvdCVEURL is the base link to a CVE on the NVD website, only the CVE code
	// needs to be appended to make it a valid link.
	nvdCVEURL = "https://nvd.nist.gov/vuln/detail/"
)
const (
	// types of integrations - jobs like Jira and Zendesk support different
	// integrations, this identifies the integration type of a message.
	intgTypeVuln          = "vuln"
	intgTypeFailingPolicy = "failingPolicy"
)
// Job defines an interface for jobs that can be run by the Worker.
type Job interface {
	// Name is the unique name of the job. It is used as the registry key in
	// Worker.Register, so two registered jobs must not share a name.
	Name() string

	// Run performs the actual work. argsJSON is the raw JSON payload that was
	// stored when the job was queued (may be nil if the job had no args).
	Run(ctx context.Context, argsJSON json.RawMessage) error
}
// failingPolicyArgs are the args common to all integrations that can process
// failing policies.
type failingPolicyArgs struct {
	PolicyID       uint                  `json:"policy_id"`
	PolicyName     string                `json:"policy_name"`
	PolicyCritical bool                  `json:"policy_critical"`
	Hosts          []fleet.PolicySetHost `json:"hosts"`
	// TeamID is nil for global policies.
	TeamID *uint `json:"team_id,omitempty"`
}
// vulnArgs are the args common to all integrations that can process
// vulnerabilities.
type vulnArgs struct {
	CVE                 string `json:"cve,omitempty"`
	AffectedSoftwareIDs []uint `json:"affected_software,omitempty"`
	// The following fields are pointers so that "not provided" (nil) can be
	// distinguished from a zero value; they are populated for Premium only.
	EPSSProbability  *float64   `json:"epss_probability,omitempty"`   // Premium feature only
	CVSSScore        *float64   `json:"cvss_score,omitempty"`         // Premium feature only
	CISAKnownExploit *bool      `json:"cisa_known_exploit,omitempty"` // Premium feature only
	CVEPublished     *time.Time `json:"cve_published,omitempty"`      // Premium feature only
}
// Worker runs jobs. NOT SAFE FOR CONCURRENT USE.
type Worker struct {
	// ds is used to fetch, create and update persisted jobs.
	ds fleet.Datastore
	// log is the structured logger used for all worker output.
	log *slog.Logger

	// For tests only, allows ignoring unknown jobs instead of failing them.
	TestIgnoreUnknownJobs bool

	// delayPerRetry defines the delays between retries. If nil, the default
	// delays are used.
	delayPerRetry []time.Duration

	// registry maps a job's Name() to its implementation.
	registry map[string]Job
}
// NewWorker returns a Worker backed by the given datastore and logger, with
// an empty job registry. Register must be called before ProcessJobs does any
// useful work.
func NewWorker(ds fleet.Datastore, log *slog.Logger) *Worker {
	w := &Worker{
		ds:  ds,
		log: log,
	}
	w.registry = make(map[string]Job)
	return w
}
// Register adds the given jobs to the worker's registry, keyed by Name().
// It panics if a job with the same name was already registered.
func (w *Worker) Register(jobs ...Job) {
	for _, job := range jobs {
		key := job.Name()
		if _, exists := w.registry[key]; exists {
			panic(fmt.Sprintf("job %s already registered", key))
		}
		w.registry[key] = job
	}
}
// QueueJob inserts a job to be processed by the worker for the job processor
// identified by the name (e.g. "jira"). The args value is marshaled as JSON
// and provided to the job processor when the job is executed.
func QueueJob(ctx context.Context, ds fleet.Datastore, name string, args interface{}) (*fleet.Job, error) {
	// A zero delay makes the job available immediately.
	const noDelay = 0
	return QueueJobWithDelay(ctx, ds, name, args, noDelay)
}
// QueueJobWithDelay is like QueueJob but does not make the job available
// before a specified delay (or no delay if delay is <= 0).
func QueueJobWithDelay(ctx context.Context, ds fleet.Datastore, name string, args interface{}, delay time.Duration) (*fleet.Job, error) {
	raw, err := json.Marshal(args)
	if err != nil {
		return nil, ctxerr.Wrap(ctx, err, "marshal args")
	}

	job := &fleet.Job{
		Name:  name,
		Args:  (*json.RawMessage)(&raw),
		State: fleet.JobStateQueued,
	}
	// Only set NotBefore for a positive delay; otherwise it stays at the zero
	// time, which means the job is immediately available.
	if delay > 0 {
		job.NotBefore = time.Now().UTC().Add(delay)
	}
	return ds.NewJob(ctx, job)
}
// defaultDelayPerRetry defines the delays to add between retries (i.e. how
// the "not_before" timestamp of a job will be set for the next run). Keep in
// mind that at a minimum, the job will not be retried before the next cron run
// of the worker, but we want to ensure a minimum delay before retries to give
// a chance to e.g. transient network issues to resolve themselves.
//
// Note: this uses sparse (indexed) slice initialization so the index matches
// the retry number; index 0 is unused and holds the zero value.
var defaultDelayPerRetry = []time.Duration{
	1: 0, // i.e. for the first retry, do it ASAP (on the next worker run)
	2: 5 * time.Minute,
	3: 10 * time.Minute,
	4: 1 * time.Hour,
	5: 2 * time.Hour,
}
// jobNames returns the names of all jobs currently in the registry, in no
// particular order (map iteration order is random).
func (w *Worker) jobNames() []string {
	names := make([]string, 0, len(w.registry))
	for jobName := range w.registry {
		names = append(names, jobName)
	}
	return names
}
// ProcessJobs processes all queued jobs.
func (w *Worker) ProcessJobs(ctx context.Context) error {
	// maxNumJobs is the page size for each fetch of queued jobs.
	const maxNumJobs = 100

	jobNames := w.jobNames()
	if len(jobNames) == 0 {
		w.log.InfoContext(ctx, "no jobs registered, nothing to process")
		return nil
	}

	// process jobs until there are none left or the context is cancelled
	//
	// seen tracks job IDs already handled in this invocation: encountering one
	// again means the queue has wrapped around to jobs that failed earlier, so
	// we stop and let the next cron run retry them.
	seen := make(map[uint]struct{})
	for {
		jobs, err := w.ds.GetFilteredQueuedJobs(ctx, maxNumJobs, time.Time{}, jobNames)
		if err != nil {
			return ctxerr.Wrap(ctx, err, "get queued jobs")
		}
		if len(jobs) == 0 {
			break
		}
		for _, job := range jobs {
			// bail out promptly if the context was cancelled between jobs
			select {
			case <-ctx.Done():
				return ctxerr.Wrap(ctx, ctx.Err(), "context done")
			default:
			}

			log := w.log.With("job_id", job.ID)
			if _, ok := seen[job.ID]; ok {
				log.DebugContext(ctx, "some jobs failed, retrying on next cron execution")
				return nil
			}
			seen[job.ID] = struct{}{}
			log.DebugContext(ctx, "processing job")
			if err := w.processJob(ctx, job); err != nil {
				log.ErrorContext(ctx, "process job", "err", err)
				job.Error = err.Error()
				if job.Retries < maxRetries {
					log.DebugContext(ctx, "will retry job")
					job.Retries += 1
					// schedule the retry with an increasing delay; w.delayPerRetry
					// overrides the defaults in tests.
					delays := w.delayPerRetry
					if delays == nil {
						delays = defaultDelayPerRetry
					}
					if job.Retries < len(delays) {
						job.NotBefore = time.Now().UTC().Add(delays[job.Retries])
					}
				} else {
					// retries exhausted: mark the job as permanently failed
					job.State = fleet.JobStateFailure
				}
			} else {
				job.State = fleet.JobStateSuccess
				job.Error = ""
			}
			// When we update the job, the updated_at timestamp gets updated and the job gets "pushed" to the back
			// of queue. GetFilteredQueuedJobs fetches jobs by updated_at, so it will not return the same job until
			// the queue has been processed once.
			if _, err := w.ds.UpdateJob(ctx, job.ID, job); err != nil {
				log.ErrorContext(ctx, "update job", "err", err)
			}
		}
	}
	return nil
}
// processJob looks up the job's handler in the registry and runs it with the
// job's stored JSON args, wrapping the execution in an OTEL span.
func (w *Worker) processJob(ctx context.Context, job *fleet.Job) error {
	// Create OTEL span for job processing (parent span should be: cron.scheduled_tick.integrations)
	ctx, span := otel.Tracer("github.com/fleetdm/fleet/v4/server/worker").Start(ctx, fmt.Sprintf("worker.process_job.%s", job.Name),
		trace.WithSpanKind(trace.SpanKindConsumer),
		trace.WithAttributes(
			attribute.Int64("job.id", int64(job.ID)), // nolint:gosec,G115
		),
	)
	defer span.End()

	handler, found := w.registry[job.Name]
	if !found {
		if w.TestIgnoreUnknownJobs {
			return nil
		}
		return ctxerr.Errorf(ctx, "unknown job: %s", job.Name)
	}

	var rawArgs json.RawMessage
	if job.Args != nil {
		rawArgs = *job.Args
	}
	if err := handler.Run(ctx, rawArgs); err != nil {
		span.RecordError(err)
		return err
	}
	return nil
}
// failingPoliciesTplArgs holds the values exposed to failing-policy message
// templates, combining the Fleet server URL with the policy details.
type failingPoliciesTplArgs struct {
	FleetURL       string
	PolicyID       uint
	PolicyName     string
	PolicyCritical bool
	// TeamID is nil for global policies.
	TeamID *uint
	Hosts  []fleet.PolicySetHost
}
// newFailingPoliciesTplArgs builds the template args for a failing-policy
// message from the Fleet server URL and the common failing-policy job args.
func newFailingPoliciesTplArgs(fleetURL string, args *failingPolicyArgs) *failingPoliciesTplArgs {
	tplArgs := &failingPoliciesTplArgs{
		FleetURL:       fleetURL,
		PolicyID:       args.PolicyID,
		PolicyName:     args.PolicyName,
		PolicyCritical: args.PolicyCritical,
		Hosts:          args.Hosts,
		TeamID:         args.TeamID,
	}
	return tplArgs
}