Better jitter (#3716)

* Better jitter

* Fix lint

* Use milliseconds

* Make duration milliseconds

* Update based on Lucas' suggestion

* Add changes file

* Panic on error

* Fix compilation error
This commit is contained in:
Tomas Touceda 2022-01-18 11:29:57 -03:00 committed by GitHub
parent dea23356de
commit b47cf3d2d4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 53 additions and 21 deletions

View file

@ -0,0 +1 @@
* Improve how Fleet distributes host check-ins to balance the load on the server.

View file

@ -280,7 +280,11 @@ the way that the Fleet server works.
RedisScanKeysCount: config.Osquery.AsyncHostRedisScanKeysCount,
CollectorInterval: config.Osquery.AsyncHostCollectInterval,
}
svc, err := service.NewService(ds, task, resultStore, logger, osqueryLogger, config, mailService, clock.C, ssoSessionStore, liveQueryStore, carveStore, *license, failingPolicySet)
// TODO: gather all the different contexts and use just one
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
svc, err := service.NewService(ctx, ds, task, resultStore, logger, osqueryLogger, config, mailService, clock.C, ssoSessionStore, liveQueryStore, carveStore, *license, failingPolicySet)
if err != nil {
initFatal(err, "initializing service")
}
@ -370,9 +374,6 @@ the way that the Fleet server works.
// Instantiate a gRPC service to handle launcher requests.
launcher := launcher.New(svc, logger, grpc.NewServer(), healthCheckers)
// TODO: gather all the different contexts and use just one
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
eh := errorstore.NewHandler(ctx, redisPool, logger, config.Logging.ErrorRetentionPeriod)
rootMux := http.NewServeMux()

View file

@ -3,9 +3,15 @@
package service
import (
"context"
"crypto/rand"
"fmt"
"html/template"
"math"
"math/big"
"sync"
"sync/atomic"
"time"
"github.com/WatchBeam/clock"
"github.com/fleetdm/fleet/v4/server/authz"
@ -39,10 +45,13 @@ type Service struct {
failingPolicySet fleet.FailingPolicySet
authz *authz.Authorizer
jitterSeed int64
}
// NewService creates a new service from the config struct
func NewService(
ctx context.Context,
ds fleet.Datastore,
task *async.Task,
resultStore fleet.QueryResultStore,
@ -57,14 +66,12 @@ func NewService(
license fleet.LicenseInfo,
failingPolicySet fleet.FailingPolicySet,
) (fleet.Service, error) {
var svc fleet.Service
authorizer, err := authz.NewAuthorizer()
if err != nil {
return nil, fmt.Errorf("new authorizer: %w", err)
}
svc = &Service{
svc := &Service{
ds: ds,
task: task,
carveStore: carveStore,
@ -81,8 +88,36 @@ func NewService(
failingPolicySet: failingPolicySet,
authz: authorizer,
}
svc = validationMiddleware{svc, ds, sso}
return svc, nil
// Try setting a first seed
svc.updateJitterSeedRand()
go svc.updateJitterSeed(ctx)
return validationMiddleware{svc, ds, sso}, nil
}
// updateJitterSeedRand draws a new random, non-negative value below
// math.MaxInt from crypto/rand and atomically stores it as the jitter seed.
//
// It panics if the secure random source returns an error.
func (s *Service) updateJitterSeedRand() {
	upperBound := big.NewInt(math.MaxInt)
	seed, err := rand.Int(rand.Reader, upperBound)
	if err != nil {
		panic(err)
	}
	// Stored atomically so it can be read concurrently via getJitterSeed.
	atomic.StoreInt64(&s.jitterSeed, seed.Int64())
}
// updateJitterSeed replaces the jitter seed with a fresh random value once
// per hour until ctx is cancelled. It blocks until then, so it is meant to be
// run in its own goroutine.
func (s *Service) updateJitterSeed(ctx context.Context) {
	// Create a single ticker and release it on exit. Calling time.Tick inside
	// the loop would allocate a brand-new ticker on every iteration, and
	// tickers created by time.Tick cannot be stopped.
	ticker := time.NewTicker(1 * time.Hour)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			s.updateJitterSeedRand()
		case <-ctx.Done():
			return
		}
	}
}
// getJitterSeed returns the current jitter seed. The value is read with an
// atomic load, matching the atomic store done in updateJitterSeedRand.
func (s *Service) getJitterSeed() int64 {
	seed := atomic.LoadInt64(&s.jitterSeed)
	return seed
}
func (s Service) SendEmail(mail fleet.Email) error {

View file

@ -2,11 +2,9 @@ package service
import (
"context"
"crypto/rand"
"encoding/json"
"errors"
"fmt"
"math/big"
"strconv"
"strings"
"sync/atomic"
@ -413,7 +411,7 @@ const hostDistributedQueryPrefix = "fleet_distributed_query_"
// detailQueriesForHost returns the map of detail+additional queries that should be executed by
// osqueryd to fill in the host details.
func (svc *Service) detailQueriesForHost(ctx context.Context, host *fleet.Host) (map[string]string, error) {
if !svc.shouldUpdate(host.DetailUpdatedAt, svc.config.Osquery.DetailUpdateInterval) && !host.RefetchRequested {
if !svc.shouldUpdate(host.DetailUpdatedAt, svc.config.Osquery.DetailUpdateInterval, host.ID) && !host.RefetchRequested {
return nil, nil
}
@ -447,14 +445,11 @@ func (svc *Service) detailQueriesForHost(ctx context.Context, host *fleet.Host)
return queries, nil
}
func (svc *Service) shouldUpdate(lastUpdated time.Time, interval time.Duration) bool {
func (svc *Service) shouldUpdate(lastUpdated time.Time, interval time.Duration, hostID uint) bool {
var jitter time.Duration
if svc.config.Osquery.MaxJitterPercent > 0 {
maxJitter := time.Duration(svc.config.Osquery.MaxJitterPercent) * interval / time.Duration(100.0)
randDuration, err := rand.Int(rand.Reader, big.NewInt(int64(maxJitter)))
if err == nil {
jitter = time.Duration(randDuration.Int64())
}
maxJitter := int64(svc.config.Osquery.MaxJitterPercent) * int64(interval) / 100.0
jitter = time.Duration((int64(hostID) + svc.getJitterSeed()) % maxJitter)
}
cutoff := svc.clock.Now().Add(-(interval + jitter))
return lastUpdated.Before(cutoff)
@ -462,7 +457,7 @@ func (svc *Service) shouldUpdate(lastUpdated time.Time, interval time.Duration)
func (svc *Service) labelQueriesForHost(ctx context.Context, host *fleet.Host) (map[string]string, error) {
labelReportedAt := svc.task.GetHostLabelReportedAt(ctx, host)
if !svc.shouldUpdate(labelReportedAt, svc.config.Osquery.LabelUpdateInterval) && !host.RefetchRequested {
if !svc.shouldUpdate(labelReportedAt, svc.config.Osquery.LabelUpdateInterval, host.ID) && !host.RefetchRequested {
return nil, nil
}
labelQueries, err := svc.ds.LabelQueriesForHost(ctx, host)
@ -473,7 +468,7 @@ func (svc *Service) labelQueriesForHost(ctx context.Context, host *fleet.Host) (
}
func (svc *Service) policyQueriesForHost(ctx context.Context, host *fleet.Host) (map[string]string, error) {
if !svc.shouldUpdate(host.PolicyUpdatedAt, svc.config.Osquery.PolicyUpdateInterval) && !host.RefetchRequested {
if !svc.shouldUpdate(host.PolicyUpdatedAt, svc.config.Osquery.PolicyUpdateInterval, host.ID) && !host.RefetchRequested {
return nil, nil
}
policyQueries, err := svc.ds.PolicyQueriesForHost(ctx, host)

View file

@ -70,7 +70,7 @@ func newTestServiceWithConfig(ds fleet.Datastore, fleetConfig config.FleetConfig
Datastore: ds,
AsyncEnabled: false,
}
svc, err := NewService(ds, task, rs, logger, osqlogger, fleetConfig, mailer, c, ssoStore, lq, ds, *license, failingPolicySet)
svc, err := NewService(context.Background(), ds, task, rs, logger, osqlogger, fleetConfig, mailer, c, ssoStore, lq, ds, *license, failingPolicySet)
if err != nil {
panic(err)
}