diff --git a/changes/issue-3697-better-jitter b/changes/issue-3697-better-jitter new file mode 100644 index 0000000000..e866b42027 --- /dev/null +++ b/changes/issue-3697-better-jitter @@ -0,0 +1 @@ +* Improve how fleet distributes hosts checking in to balance the load on the server. diff --git a/cmd/fleet/serve.go b/cmd/fleet/serve.go index 9d5c9cc22e..0938e75750 100644 --- a/cmd/fleet/serve.go +++ b/cmd/fleet/serve.go @@ -280,7 +280,11 @@ the way that the Fleet server works. RedisScanKeysCount: config.Osquery.AsyncHostRedisScanKeysCount, CollectorInterval: config.Osquery.AsyncHostCollectInterval, } - svc, err := service.NewService(ds, task, resultStore, logger, osqueryLogger, config, mailService, clock.C, ssoSessionStore, liveQueryStore, carveStore, *license, failingPolicySet) + + // TODO: gather all the different contexts and use just one + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + svc, err := service.NewService(ctx, ds, task, resultStore, logger, osqueryLogger, config, mailService, clock.C, ssoSessionStore, liveQueryStore, carveStore, *license, failingPolicySet) if err != nil { initFatal(err, "initializing service") } @@ -370,9 +374,6 @@ the way that the Fleet server works. // Instantiate a gRPC service to handle launcher requests. 
launcher := launcher.New(svc, logger, grpc.NewServer(), healthCheckers) - // TODO: gather all the different contexts and use just one - ctx, cancelFunc := context.WithCancel(context.Background()) - defer cancelFunc() eh := errorstore.NewHandler(ctx, redisPool, logger, config.Logging.ErrorRetentionPeriod) rootMux := http.NewServeMux() diff --git a/server/service/service.go b/server/service/service.go index 5cecb5b776..5c2089821c 100644 --- a/server/service/service.go +++ b/server/service/service.go @@ -3,9 +3,15 @@ package service import ( + "context" + "crypto/rand" "fmt" "html/template" + "math" + "math/big" "sync" + "sync/atomic" + "time" "github.com/WatchBeam/clock" "github.com/fleetdm/fleet/v4/server/authz" @@ -39,10 +45,13 @@ type Service struct { failingPolicySet fleet.FailingPolicySet authz *authz.Authorizer + + jitterSeed int64 } // NewService creates a new service from the config struct func NewService( + ctx context.Context, ds fleet.Datastore, task *async.Task, resultStore fleet.QueryResultStore, @@ -57,14 +66,12 @@ func NewService( license fleet.LicenseInfo, failingPolicySet fleet.FailingPolicySet, ) (fleet.Service, error) { - var svc fleet.Service - authorizer, err := authz.NewAuthorizer() if err != nil { return nil, fmt.Errorf("new authorizer: %w", err) } - svc = &Service{ + svc := &Service{ ds: ds, task: task, carveStore: carveStore, @@ -81,8 +88,40 @@ func NewService( failingPolicySet: failingPolicySet, authz: authorizer, } - svc = validationMiddleware{svc, ds, sso} - return svc, nil + + // Try setting a first seed + svc.updateJitterSeedRand() + go svc.updateJitterSeed(ctx) + + return validationMiddleware{svc, ds, sso}, nil +} + +func (s *Service) updateJitterSeedRand() { + nBig, err := rand.Int(rand.Reader, big.NewInt(math.MaxInt)) + if err != nil { + panic(err) + } + n := nBig.Int64() + atomic.StoreInt64(&s.jitterSeed, n) +} + +// updateJitterSeed refreshes the jitter seed once per hour and returns when ctx is done. +func (s *Service) updateJitterSeed(ctx context.Context) { + // A single ticker, stopped on exit: calling time.Tick inside the loop would start a new, never-stopped ticker on every pass. + ticker := time.NewTicker(1 * time.Hour) + defer ticker.Stop() + for { + select { + case <-ticker.C: + 
s.updateJitterSeedRand() + case <-ctx.Done(): + return + } + } +} + +func (s *Service) getJitterSeed() int64 { + return atomic.LoadInt64(&s.jitterSeed) } func (s Service) SendEmail(mail fleet.Email) error { diff --git a/server/service/service_osquery.go b/server/service/service_osquery.go index d05ede3453..3ec8773674 100644 --- a/server/service/service_osquery.go +++ b/server/service/service_osquery.go @@ -2,11 +2,9 @@ package service import ( "context" - "crypto/rand" "encoding/json" "errors" "fmt" - "math/big" "strconv" "strings" "sync/atomic" @@ -413,7 +411,7 @@ const hostDistributedQueryPrefix = "fleet_distributed_query_" // detailQueriesForHost returns the map of detail+additional queries that should be executed by // osqueryd to fill in the host details. func (svc *Service) detailQueriesForHost(ctx context.Context, host *fleet.Host) (map[string]string, error) { - if !svc.shouldUpdate(host.DetailUpdatedAt, svc.config.Osquery.DetailUpdateInterval) && !host.RefetchRequested { + if !svc.shouldUpdate(host.DetailUpdatedAt, svc.config.Osquery.DetailUpdateInterval, host.ID) && !host.RefetchRequested { return nil, nil } @@ -447,14 +445,11 @@ func (svc *Service) detailQueriesForHost(ctx context.Context, host *fleet.Host) return queries, nil } -func (svc *Service) shouldUpdate(lastUpdated time.Time, interval time.Duration) bool { +func (svc *Service) shouldUpdate(lastUpdated time.Time, interval time.Duration, hostID uint) bool { var jitter time.Duration - if svc.config.Osquery.MaxJitterPercent > 0 { - maxJitter := time.Duration(svc.config.Osquery.MaxJitterPercent) * interval / time.Duration(100.0) - randDuration, err := rand.Int(rand.Reader, big.NewInt(int64(maxJitter))) - if err == nil { - jitter = time.Duration(randDuration.Int64()) - } + // Apply jitter only when the window is positive: a zero maxJitter (zero or very small interval) would make the modulo below panic. + if maxJitter := int64(svc.config.Osquery.MaxJitterPercent) * int64(interval) / 100; maxJitter > 0 { + jitter = time.Duration((int64(hostID) + svc.getJitterSeed()) % maxJitter) } cutoff := svc.clock.Now().Add(-(interval + jitter)) return 
lastUpdated.Before(cutoff) @@ -462,7 +457,7 @@ func (svc *Service) shouldUpdate(lastUpdated time.Time, interval time.Duration) func (svc *Service) labelQueriesForHost(ctx context.Context, host *fleet.Host) (map[string]string, error) { labelReportedAt := svc.task.GetHostLabelReportedAt(ctx, host) - if !svc.shouldUpdate(labelReportedAt, svc.config.Osquery.LabelUpdateInterval) && !host.RefetchRequested { + if !svc.shouldUpdate(labelReportedAt, svc.config.Osquery.LabelUpdateInterval, host.ID) && !host.RefetchRequested { return nil, nil } labelQueries, err := svc.ds.LabelQueriesForHost(ctx, host) @@ -473,7 +468,7 @@ func (svc *Service) labelQueriesForHost(ctx context.Context, host *fleet.Host) ( } func (svc *Service) policyQueriesForHost(ctx context.Context, host *fleet.Host) (map[string]string, error) { - if !svc.shouldUpdate(host.PolicyUpdatedAt, svc.config.Osquery.PolicyUpdateInterval) && !host.RefetchRequested { + if !svc.shouldUpdate(host.PolicyUpdatedAt, svc.config.Osquery.PolicyUpdateInterval, host.ID) && !host.RefetchRequested { return nil, nil } policyQueries, err := svc.ds.PolicyQueriesForHost(ctx, host) diff --git a/server/service/testing_utils.go b/server/service/testing_utils.go index 537b01105d..0b430327cf 100644 --- a/server/service/testing_utils.go +++ b/server/service/testing_utils.go @@ -70,7 +70,7 @@ func newTestServiceWithConfig(ds fleet.Datastore, fleetConfig config.FleetConfig Datastore: ds, AsyncEnabled: false, } - svc, err := NewService(ds, task, rs, logger, osqlogger, fleetConfig, mailer, c, ssoStore, lq, ds, *license, failingPolicySet) + svc, err := NewService(context.Background(), ds, task, rs, logger, osqlogger, fleetConfig, mailer, c, ssoStore, lq, ds, *license, failingPolicySet) if err != nil { panic(err) }