fleet/cmd/fleet/serve.go
Juan Fernandez 3df6449426
API endpoints initial models (#42881)
**Related issue:** Resolves #42881

- Added user_api_endpoints table to track per user API endpoint
permissions.
- Added service/api_endpoints, used to handle service/api_endpoints.yml
artifact.
- Added check on server start that makes sure that
service/api_endpoints.yml is a subset of router routes.
2026-04-07 10:40:39 -04:00

2188 lines
83 KiB
Go

package main
import (
"context"
"crypto"
"crypto/sha256"
"crypto/subtle"
"crypto/tls"
"crypto/x509"
"database/sql/driver"
"errors"
"fmt"
"log/slog"
"math/rand"
"net/http"
"net/url"
"os"
"os/signal"
"regexp"
"strings"
"syscall"
"time"
"github.com/WatchBeam/clock"
"github.com/e-dard/netbug"
"github.com/fleetdm/fleet/v4/cmd/fleetctl/fleetctl"
"github.com/fleetdm/fleet/v4/ee/server/licensing"
"github.com/fleetdm/fleet/v4/ee/server/scim"
eeservice "github.com/fleetdm/fleet/v4/ee/server/service"
"github.com/fleetdm/fleet/v4/ee/server/service/condaccess"
"github.com/fleetdm/fleet/v4/ee/server/service/digicert"
"github.com/fleetdm/fleet/v4/ee/server/service/est"
"github.com/fleetdm/fleet/v4/ee/server/service/hostidentity"
"github.com/fleetdm/fleet/v4/ee/server/service/hostidentity/httpsig"
"github.com/fleetdm/fleet/v4/ee/server/service/scep"
"github.com/fleetdm/fleet/v4/pkg/fleethttp"
"github.com/fleetdm/fleet/v4/pkg/scripts"
"github.com/fleetdm/fleet/v4/pkg/str"
"github.com/fleetdm/fleet/v4/server"
"github.com/fleetdm/fleet/v4/server/acl/acmeacl"
"github.com/fleetdm/fleet/v4/server/acl/activityacl"
activity_api "github.com/fleetdm/fleet/v4/server/activity/api"
activity_bootstrap "github.com/fleetdm/fleet/v4/server/activity/bootstrap"
"github.com/fleetdm/fleet/v4/server/authz"
configpkg "github.com/fleetdm/fleet/v4/server/config"
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
"github.com/fleetdm/fleet/v4/server/contexts/installersize"
licensectx "github.com/fleetdm/fleet/v4/server/contexts/license"
"github.com/fleetdm/fleet/v4/server/cron"
"github.com/fleetdm/fleet/v4/server/datastore/cached_mysql"
"github.com/fleetdm/fleet/v4/server/datastore/failing"
"github.com/fleetdm/fleet/v4/server/datastore/filesystem"
"github.com/fleetdm/fleet/v4/server/datastore/mysql"
"github.com/fleetdm/fleet/v4/server/datastore/mysqlredis"
"github.com/fleetdm/fleet/v4/server/datastore/redis"
"github.com/fleetdm/fleet/v4/server/datastore/s3"
"github.com/fleetdm/fleet/v4/server/dev_mode"
"github.com/fleetdm/fleet/v4/server/errorstore"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/fleetdm/fleet/v4/server/health"
"github.com/fleetdm/fleet/v4/server/launcher"
"github.com/fleetdm/fleet/v4/server/live_query"
"github.com/fleetdm/fleet/v4/server/logging"
"github.com/fleetdm/fleet/v4/server/mail"
"github.com/fleetdm/fleet/v4/server/mdm/acme"
acme_api "github.com/fleetdm/fleet/v4/server/mdm/acme/api"
acme_bootstrap "github.com/fleetdm/fleet/v4/server/mdm/acme/bootstrap"
android_service "github.com/fleetdm/fleet/v4/server/mdm/android/service"
apple_mdm "github.com/fleetdm/fleet/v4/server/mdm/apple"
"github.com/fleetdm/fleet/v4/server/mdm/apple/apple_apps"
"github.com/fleetdm/fleet/v4/server/mdm/cryptoutil"
microsoft_mdm "github.com/fleetdm/fleet/v4/server/mdm/microsoft"
"github.com/fleetdm/fleet/v4/server/mdm/nanomdm/push"
"github.com/fleetdm/fleet/v4/server/mdm/nanomdm/push/buford"
nanomdm_pushsvc "github.com/fleetdm/fleet/v4/server/mdm/nanomdm/push/service"
scepdepot "github.com/fleetdm/fleet/v4/server/mdm/scep/depot"
"github.com/fleetdm/fleet/v4/server/platform/endpointer"
platform_http "github.com/fleetdm/fleet/v4/server/platform/http"
platform_logging "github.com/fleetdm/fleet/v4/server/platform/logging"
common_mysql "github.com/fleetdm/fleet/v4/server/platform/mysql"
"github.com/fleetdm/fleet/v4/server/pubsub"
"github.com/fleetdm/fleet/v4/server/service"
"github.com/fleetdm/fleet/v4/server/service/async"
"github.com/fleetdm/fleet/v4/server/service/conditional_access_microsoft_proxy"
"github.com/fleetdm/fleet/v4/server/service/middleware/auth"
"github.com/fleetdm/fleet/v4/server/service/middleware/log"
otelmw "github.com/fleetdm/fleet/v4/server/service/middleware/otel"
"github.com/fleetdm/fleet/v4/server/service/redis_key_value"
"github.com/fleetdm/fleet/v4/server/service/redis_lock"
"github.com/fleetdm/fleet/v4/server/service/redis_policy_set"
"github.com/fleetdm/fleet/v4/server/service/schedule"
"github.com/fleetdm/fleet/v4/server/sso"
"github.com/fleetdm/fleet/v4/server/version"
"github.com/getsentry/sentry-go"
"github.com/go-kit/kit/endpoint"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/google/uuid"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"github.com/ngrok/sqlmw"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"github.com/throttled/throttled/v2"
"go.elastic.co/apm/module/apmhttp/v2"
_ "go.elastic.co/apm/module/apmsql/v2"
_ "go.elastic.co/apm/module/apmsql/v2/mysql"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
otelsdklog "go.opentelemetry.io/otel/sdk/log"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.39.0"
"google.golang.org/grpc"
_ "google.golang.org/grpc/encoding/gzip" // Because we use gzip compression for OTLP
)
// allowedURLPrefixRegexp validates a configured server URL prefix:
// one or more "/segment" components of URL-safe characters
// (alphanumerics plus "_", ".", "~", "-"), with no trailing slash.
var allowedURLPrefixRegexp = regexp.MustCompile("^(?:/[a-zA-Z0-9_.~-]+)+$")

const (
	// liveQueryMemCacheDuration is passed to the Redis-backed live
	// query store as its in-memory cache duration.
	liveQueryMemCacheDuration = 1 * time.Second
)
// initializer is implemented by datastores that can seed themselves with
// built-in data; it is invoked once at server startup after migration
// status has been checked.
type initializer interface {
	// Initialize is used to populate a datastore with
	// preloaded data.
	Initialize() error
}
// createServeCmd builds the "fleet serve" cobra command. The command's
// persistent flags are bound to local option variables (and, for --dev,
// directly to dev_mode.IsEnabled) and forwarded to runServeCmd when the
// command runs.
func createServeCmd(configManager configpkg.Manager) *cobra.Command {
	var (
		// enableDebug toggles the debug endpoints.
		enableDebug bool
		// premiumDevLicense enables a development Fleet Premium license.
		premiumDevLicense bool
		// expiredDevLicense enables an expired development Fleet Premium license.
		expiredDevLicense bool
	)
	cmd := &cobra.Command{
		Use:   "serve",
		Short: "Launch the Fleet server",
		Long: `
Launch the Fleet server
Use fleet serve to run the main HTTPS server. The Fleet server bundles
together all static assets and dependent libraries into a statically linked go
binary (which you're executing right now). Use the options below to customize
the way that the Fleet server works.
`,
		// runServeCmd is a named function so that NilAway can analyze it for nil-safety.
		Run: func(c *cobra.Command, _ []string) {
			runServeCmd(c, configManager, enableDebug, premiumDevLicense, expiredDevLicense)
		},
	}
	flags := cmd.PersistentFlags()
	flags.BoolVar(&enableDebug, "debug", false, "Enable debug endpoints")
	flags.BoolVar(&dev_mode.IsEnabled, "dev", false, "Enable developer options")
	flags.BoolVar(&premiumDevLicense, "dev_license", false, "Enable development license")
	flags.BoolVar(&expiredDevLicense, "dev_expired_license", false, "Enable expired development license")
	return cmd
}
// runServeCmd is a named function so that NilAway can analyze it for nil-safety.
func runServeCmd(cmd *cobra.Command, configManager configpkg.Manager, debug, devLicense, devExpiredLicense bool) {
config := configManager.LoadConfig()
if dev_mode.IsEnabled {
applyDevFlags(&config)
}
license, err := initLicense(&config, devLicense, devExpiredLicense)
if err != nil {
initFatal(
err,
"failed to load license - for help use https://fleetdm.com/contact",
)
}
if license != nil && license.IsPremium() && license.IsExpired() {
fleet.WriteExpiredLicenseBanner(os.Stderr)
}
// Validate OTEL server options
if config.Logging.OtelLogsEnabled && !config.Logging.TracingEnabled {
initFatal(
errors.New("logging.otel_logs_enabled requires logging.tracing_enabled to be true"),
"OTEL logs require tracing for trace correlation",
)
}
// Init OTEL providers (traces, metrics, logs)
var loggerProvider *otelsdklog.LoggerProvider
var tracerProvider *sdktrace.TracerProvider
var meterProvider *sdkmetric.MeterProvider
if config.OTELEnabled() {
// Create shared resource with service identification attributes.
// OTEL_SERVICE_NAME and OTEL_RESOURCE_ATTRIBUTES env vars can override
// the defaults below.
res, err := resource.New(context.Background(),
resource.WithSchemaURL(semconv.SchemaURL),
resource.WithAttributes(
semconv.ServiceName("fleet"),
semconv.ServiceVersion(version.Version().Version),
),
resource.WithFromEnv(),
resource.WithTelemetrySDK(),
)
if err != nil {
initFatal(err, "Failed to create OTEL resource")
}
// Initialize OTEL traces
otlpTraceExporter, err := otlptrace.New(context.Background(), otlptracegrpc.NewClient(
otlptracegrpc.WithCompressor("gzip"),
))
if err != nil {
initFatal(err, "Failed to initialize OTEL trace exporter")
}
// Configure batch span processor with smaller batch size to avoid exceeding message size limits (4MB default limit)
batchSpanProcessor := sdktrace.NewBatchSpanProcessor(otlpTraceExporter,
sdktrace.WithMaxExportBatchSize(256), // Reduce from default 512 to 256
)
tracerProvider = sdktrace.NewTracerProvider(
sdktrace.WithResource(res),
sdktrace.WithSpanProcessor(batchSpanProcessor),
)
otel.SetTracerProvider(tracerProvider)
// Initialize OTEL metrics
metricExporter, err := otlpmetricgrpc.New(context.Background(),
otlpmetricgrpc.WithCompressor("gzip"),
)
if err != nil {
initFatal(err, "Failed to initialize OTEL metrics exporter")
}
// Create views to rename otelsql metrics to match what OpenTelemetry Signoz expects
// Reference: https://opentelemetry.io/docs/specs/semconv/db/database-metrics/
dbMetricViews := []sdkmetric.View{
sdkmetric.NewView(
sdkmetric.Instrument{Name: "db.sql.connection.open"},
sdkmetric.Stream{Name: "db.client.connection.count"},
),
sdkmetric.NewView(
sdkmetric.Instrument{Name: "db.sql.connection.max_open"},
sdkmetric.Stream{Name: "db.client.connection.max"},
),
sdkmetric.NewView(
sdkmetric.Instrument{Name: "db.sql.connection.wait"},
sdkmetric.Stream{Name: "db.client.connection.wait_count"},
),
sdkmetric.NewView(
sdkmetric.Instrument{Name: "db.sql.connection.wait_duration"},
sdkmetric.Stream{Name: "db.client.connection.wait_time"},
),
sdkmetric.NewView(
sdkmetric.Instrument{Name: "db.sql.connection.closed_max_idle"},
sdkmetric.Stream{Name: "db.client.connection.closed.max_idle"},
),
sdkmetric.NewView(
sdkmetric.Instrument{Name: "db.sql.connection.closed_max_idle_time"},
sdkmetric.Stream{Name: "db.client.connection.closed.max_idle_time"},
),
}
meterProvider = sdkmetric.NewMeterProvider(
sdkmetric.WithResource(res),
sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter)),
sdkmetric.WithView(dbMetricViews...),
)
otel.SetMeterProvider(meterProvider)
// Initialize OTEL logs
if config.Logging.OtelLogsEnabled {
logExporter, err := otlploggrpc.New(context.Background(),
otlploggrpc.WithCompressor("gzip"),
)
if err != nil {
initFatal(err, "Failed to initialize OTEL log exporter")
}
loggerProvider = otelsdklog.NewLoggerProvider(
otelsdklog.WithResource(res),
otelsdklog.WithProcessor(otelsdklog.NewBatchProcessor(logExporter)),
)
}
}
logger := initLogger(config, loggerProvider)
// If you want to disable any logs by default, this is where to do it.
//
// For example:
// platform_logging.DisableTopic("deprecated-api-keys")
// Apply log topic overrides from config. Enables run first, then
// disables, so disable wins on conflict.
// Note that any topic not included in these lists will be considered
// enabled if it's encountered in a log.
for _, topic := range str.SplitAndTrim(config.Logging.EnableLogTopics, ",", true) {
platform_logging.EnableTopic(topic)
}
for _, topic := range str.SplitAndTrim(config.Logging.DisableLogTopics, ",", true) {
platform_logging.DisableTopic(topic)
}
if dev_mode.IsEnabled {
createTestBuckets(cmd.Context(), &config, logger)
}
allowedHostIdentifiers := map[string]bool{
"provided": true,
"instance": true,
"uuid": true,
"hostname": true,
}
if !allowedHostIdentifiers[config.Osquery.HostIdentifier] {
initFatal(fmt.Errorf("%s is not a valid value for osquery_host_identifier", config.Osquery.HostIdentifier), "set host identifier")
}
config.ConditionalAccess.Validate(initFatal)
if len(config.Server.URLPrefix) > 0 {
// Massage provided prefix to match expected format
config.Server.URLPrefix = strings.TrimSuffix(config.Server.URLPrefix, "/")
if len(config.Server.URLPrefix) > 0 && !strings.HasPrefix(config.Server.URLPrefix, "/") {
config.Server.URLPrefix = "/" + config.Server.URLPrefix
}
if !allowedURLPrefixRegexp.MatchString(config.Server.URLPrefix) {
initFatal(
fmt.Errorf("prefix must match regexp \"%s\"", allowedURLPrefixRegexp.String()),
"setting server URL prefix",
)
}
}
// Handle server private key configuration - either direct or via AWS Secrets Manager
if config.Server.PrivateKey != "" && config.Server.PrivateKeySecretArn != "" {
initFatal(errors.New("cannot specify both private_key and private_key_secret_arn"), "validate private key configuration")
}
// Retrieve private key from AWS Secrets Manager if specified
if config.Server.PrivateKeySecretArn != "" {
privateKey, err := configpkg.RetrieveSecretsManagerSecret(
context.Background(),
config.Server.PrivateKeySecretArn,
config.Server.PrivateKeySecretRegion,
config.Server.PrivateKeySecretSTSAssumeRoleArn,
config.Server.PrivateKeySecretSTSExternalID,
)
if err != nil {
initFatal(err, "retrieve private key from secrets manager")
}
config.Server.PrivateKey = privateKey
}
if len(config.Server.PrivateKey) > 0 {
if len(config.Server.PrivateKey) < 32 {
initFatal(errors.New("private key must be at least 32 bytes long"), "validate private key")
}
// We truncate to 32 bytes because AES-256 requires a 32 byte (256 bit) PK, but some
// infra setups generate keys that are longer than 32 bytes.
config.Server.PrivateKey = config.Server.PrivateKey[:32]
}
if config.MDM.CertificateProfilesLimit < 0 {
config.MDM.CertificateProfilesLimit = 0
}
var ds fleet.Datastore
var carveStore fleet.CarveStore
opts := []mysql.DBOption{mysql.Logger(logger), mysql.WithFleetConfig(&config)}
if config.MysqlReadReplica.Address != "" {
opts = append(opts, mysql.Replica(&config.MysqlReadReplica))
}
// NOTE this will disable OTEL/APM interceptor
if dev_mode.Env("FLEET_DEV_ENABLE_SQL_INTERCEPTOR") != "" {
opts = append(opts, mysql.WithInterceptor(&devSQLInterceptor{
logger: logger.With("component", "sql-interceptor"),
}))
}
if config.Logging.TracingEnabled {
opts = append(opts, mysql.TracingEnabled(&config.Logging))
}
// Configure default max request body size based on config
platform_http.MaxRequestBodySize = config.Server.DefaultMaxRequestBodySize
// Create database connections that can be shared across datastores
dbConns, err := mysql.NewDBConnections(config.Mysql, opts...)
if err != nil {
initFatal(err, "initializing database connections")
}
mds, err := mysql.NewDatastore(dbConns, config.Mysql, clock.C)
if err != nil {
initFatal(err, "initializing datastore")
}
ds = mds
if config.S3.CarvesBucket != "" || config.S3.Bucket != "" {
carveStore, err = s3.NewCarveStore(config.S3, ds)
if err != nil {
initFatal(err, "initializing S3 carvestore")
}
} else {
carveStore = ds
}
migrationStatus, err := ds.MigrationStatus(cmd.Context())
if err != nil {
initFatal(err, "retrieving migration status")
}
switch migrationStatus.StatusCode {
case fleet.AllMigrationsCompleted:
// OK
case fleet.UnknownMigrations:
printUnknownMigrationsMessage(migrationStatus.UnknownTable, migrationStatus.UnknownData)
if dev_mode.IsEnabled {
os.Exit(1)
}
case fleet.NeedsFleetv4732Fix:
printFleetv4732FixNeededMessage()
if !config.Upgrades.AllowMissingMigrations {
os.Exit(1)
}
case fleet.UnknownFleetv4732State:
printFleetv4732UnknownStateMessage(migrationStatus.StatusCode)
if !config.Upgrades.AllowMissingMigrations {
os.Exit(1)
}
case fleet.SomeMigrationsCompleted:
tables, data := migrationStatus.MissingTable, migrationStatus.MissingData
printMissingMigrationsWarning(tables, data)
if !config.Upgrades.AllowMissingMigrations {
os.Exit(1)
}
case fleet.NoMigrationsCompleted:
printDatabaseNotInitializedError()
os.Exit(1)
}
if initializingDS, ok := ds.(initializer); ok {
if err := initializingDS.Initialize(); err != nil {
initFatal(err, "loading built in data")
}
}
// Strip the Redis URI scheme if it's present. Scheme docs are at: https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml
// This allows us to use Render's Redis service in render.yaml, including the free tier.
// In the future, we could support the full Redis URI if needed (including username, password, database, etc.)
redisAddress := strings.TrimPrefix(config.Redis.Address, "redis://")
redisPool, err := redis.NewPool(redis.PoolConfig{
Server: redisAddress,
Username: config.Redis.Username,
Password: config.Redis.Password,
Database: config.Redis.Database,
UseTLS: config.Redis.UseTLS,
Region: config.Redis.Region,
CacheName: config.Redis.CacheName,
StsAssumeRoleArn: config.Redis.StsAssumeRoleArn,
StsExternalID: config.Redis.StsExternalID,
ConnTimeout: config.Redis.ConnectTimeout,
KeepAlive: config.Redis.KeepAlive,
ConnectRetryAttempts: config.Redis.ConnectRetryAttempts,
ClusterFollowRedirections: config.Redis.ClusterFollowRedirections,
ClusterReadFromReplica: config.Redis.ClusterReadFromReplica,
TLSCert: config.Redis.TLSCert,
TLSKey: config.Redis.TLSKey,
TLSCA: config.Redis.TLSCA,
TLSServerName: config.Redis.TLSServerName,
TLSHandshakeTimeout: config.Redis.TLSHandshakeTimeout,
MaxIdleConns: config.Redis.MaxIdleConns,
MaxOpenConns: config.Redis.MaxOpenConns,
ConnMaxLifetime: config.Redis.ConnMaxLifetime,
IdleTimeout: config.Redis.IdleTimeout,
ConnWaitTimeout: config.Redis.ConnWaitTimeout,
WriteTimeout: config.Redis.WriteTimeout,
ReadTimeout: config.Redis.ReadTimeout,
})
if err != nil {
initFatal(err, "initialize Redis")
}
logger.InfoContext(cmd.Context(), "redis initialized", "component", "redis", "mode", redisPool.Mode())
ds = cached_mysql.New(ds)
var dsOpts []mysqlredis.Option
if license.DeviceCount > 0 && config.License.EnforceHostLimit {
dsOpts = append(dsOpts, mysqlredis.WithEnforcedHostLimit(license.DeviceCount))
}
redisWrapperDS := mysqlredis.New(ds, redisPool, dsOpts...)
ds = redisWrapperDS
resultStore := pubsub.NewRedisQueryResults(redisPool, config.Redis.DuplicateResults,
logger.With("component", "query-results"),
)
liveQueryStore := live_query.NewRedisLiveQuery(redisPool, logger, liveQueryMemCacheDuration)
ssoSessionStore := sso.NewSessionStore(redisPool)
// Set common configuration for all logging.
loggingConfig := logging.Config{
Filesystem: logging.FilesystemConfig{
EnableLogRotation: config.Filesystem.EnableLogRotation,
EnableLogCompression: config.Filesystem.EnableLogCompression,
MaxSize: config.Filesystem.MaxSize,
MaxAge: config.Filesystem.MaxAge,
MaxBackups: config.Filesystem.MaxBackups,
},
Webhook: logging.WebhookConfig{},
Firehose: logging.FirehoseConfig{
Region: config.Firehose.Region,
EndpointURL: config.Firehose.EndpointURL,
AccessKeyID: config.Firehose.AccessKeyID,
SecretAccessKey: config.Firehose.SecretAccessKey,
StsAssumeRoleArn: config.Firehose.StsAssumeRoleArn,
StsExternalID: config.Firehose.StsExternalID,
},
Kinesis: logging.KinesisConfig{
Region: config.Kinesis.Region,
EndpointURL: config.Kinesis.EndpointURL,
AccessKeyID: config.Kinesis.AccessKeyID,
SecretAccessKey: config.Kinesis.SecretAccessKey,
StsAssumeRoleArn: config.Kinesis.StsAssumeRoleArn,
StsExternalID: config.Kinesis.StsExternalID,
},
Lambda: logging.LambdaConfig{
Region: config.Lambda.Region,
AccessKeyID: config.Lambda.AccessKeyID,
SecretAccessKey: config.Lambda.SecretAccessKey,
StsAssumeRoleArn: config.Lambda.StsAssumeRoleArn,
StsExternalID: config.Lambda.StsExternalID,
},
PubSub: logging.PubSubConfig{
Project: config.PubSub.Project,
},
KafkaREST: logging.KafkaRESTConfig{
ProxyHost: config.KafkaREST.ProxyHost,
ContentTypeValue: config.KafkaREST.ContentTypeValue,
Timeout: config.KafkaREST.Timeout,
},
Nats: logging.NatsConfig{
Server: config.Nats.Server,
CredFile: config.Nats.CredFile,
NKeyFile: config.Nats.NKeyFile,
TLSClientCertFile: config.Nats.TLSClientCrtFile,
TLSClientKeyFile: config.Nats.TLSClientKeyFile,
CACertFile: config.Nats.CACrtFile,
Compression: config.Nats.Compression,
JetStream: config.Nats.JetStream,
Timeout: config.Nats.Timeout,
},
}
// Set specific configuration to osqueryd status logs.
loggingConfig.Plugin = config.Osquery.StatusLogPlugin
loggingConfig.Filesystem.LogFile = config.Filesystem.StatusLogFile
loggingConfig.Webhook.URL = config.Webhook.StatusURL
loggingConfig.Firehose.StreamName = config.Firehose.StatusStream
loggingConfig.Kinesis.StreamName = config.Kinesis.StatusStream
loggingConfig.Lambda.Function = config.Lambda.StatusFunction
loggingConfig.PubSub.Topic = config.PubSub.StatusTopic
loggingConfig.PubSub.AddAttributes = false // only used by result logs
loggingConfig.KafkaREST.Topic = config.KafkaREST.StatusTopic
loggingConfig.Nats.Subject = config.Nats.StatusSubject
osquerydStatusLogger, err := logging.NewJSONLogger(cmd.Context(), "status", loggingConfig, logger)
if err != nil {
initFatal(err, "initializing osqueryd status logging")
}
// Set specific configuration to osqueryd result logs.
loggingConfig.Plugin = config.Osquery.ResultLogPlugin
loggingConfig.Filesystem.LogFile = config.Filesystem.ResultLogFile
loggingConfig.Webhook.URL = config.Webhook.ResultURL
loggingConfig.Firehose.StreamName = config.Firehose.ResultStream
loggingConfig.Kinesis.StreamName = config.Kinesis.ResultStream
loggingConfig.Lambda.Function = config.Lambda.ResultFunction
loggingConfig.PubSub.Topic = config.PubSub.ResultTopic
loggingConfig.PubSub.AddAttributes = config.PubSub.AddAttributes
loggingConfig.KafkaREST.Topic = config.KafkaREST.ResultTopic
loggingConfig.Nats.Subject = config.Nats.ResultSubject
osquerydResultLogger, err := logging.NewJSONLogger(cmd.Context(), "result", loggingConfig, logger)
if err != nil {
initFatal(err, "initializing osqueryd result logging")
}
var auditLogger fleet.JSONLogger
if license.IsPremium() && config.Activity.EnableAuditLog {
// Set specific configuration to audit logs.
loggingConfig.Plugin = config.Activity.AuditLogPlugin
loggingConfig.Filesystem.LogFile = config.Filesystem.AuditLogFile
loggingConfig.Firehose.StreamName = config.Firehose.AuditStream
loggingConfig.Kinesis.StreamName = config.Kinesis.AuditStream
loggingConfig.Lambda.Function = config.Lambda.AuditFunction
loggingConfig.PubSub.Topic = config.PubSub.AuditTopic
loggingConfig.PubSub.AddAttributes = false // only used by result logs
loggingConfig.KafkaREST.Topic = config.KafkaREST.AuditTopic
loggingConfig.Nats.Subject = config.Nats.AuditSubject
auditLogger, err = logging.NewJSONLogger(cmd.Context(), "audit", loggingConfig, logger)
if err != nil {
initFatal(err, "initializing audit logging")
}
}
failingPolicySet := redis_policy_set.NewFailing(redisPool)
task := async.NewTask(ds, redisPool, clock.C, &config)
if config.Sentry.Dsn != "" {
v := version.Version()
err = sentry.Init(sentry.ClientOptions{
Dsn: config.Sentry.Dsn,
Release: fmt.Sprintf("%s_%s_%s", v.Version, v.Branch, v.Revision),
})
if err != nil {
initFatal(err, "initializing sentry")
}
logger.InfoContext(cmd.Context(), "sentry initialized", "dsn", config.Sentry.Dsn)
defer sentry.Recover()
defer sentry.Flush(2 * time.Second)
}
var geoIP fleet.GeoIP
geoIP = &fleet.NoOpGeoIP{}
if config.GeoIP.DatabasePath != "" {
maxmind, err := fleet.NewMaxMindGeoIP(logger, config.GeoIP.DatabasePath)
if err != nil {
logger.ErrorContext(cmd.Context(), "failed to initialize maxmind geoip, check database path", "database_path",
config.GeoIP.DatabasePath, "error", err)
} else {
geoIP = maxmind
}
}
if config.MDM.EnableCustomOSUpdatesAndFileVault && !license.IsPremium() {
config.MDM.EnableCustomOSUpdatesAndFileVault = false
logger.WarnContext(cmd.Context(), "Disabling custom OS updates and FileVault management because Fleet Premium license is not present")
}
mdmStorage, err := mds.NewMDMAppleMDMStorage()
if err != nil {
initFatal(err, "initialize mdm apple MySQL storage")
}
depStorage, err := mds.NewMDMAppleDEPStorage()
if err != nil {
initFatal(err, "initialize Apple BM DEP storage")
}
scepStorage, err := mds.NewSCEPDepot()
if err != nil {
initFatal(err, "initialize mdm apple scep storage")
}
var mdmPushService push.Pusher
nanoMDMLogger := service.NewNanoMDMLogger(logger.With("component", "apple-mdm-push"))
pushProviderFactory := buford.NewPushProviderFactory(buford.WithNewClient(func(cert *tls.Certificate) (*http.Client, error) {
return fleethttp.NewClient(fleethttp.WithTLSClientConfig(&tls.Config{
Certificates: []tls.Certificate{*cert},
})), nil
}))
if dev_mode.Env("FLEET_DEV_MDM_APPLE_DISABLE_PUSH") == "1" {
mdmPushService = nopPusher{}
} else {
mdmPushService = nanomdm_pushsvc.New(mdmStorage, mdmStorage, pushProviderFactory, nanoMDMLogger)
}
mds.WithPusher(mdmPushService)
checkMDMAssets := func(names []fleet.MDMAssetName) (bool, error) {
_, err = ds.GetAllMDMConfigAssetsByName(context.Background(), names, nil)
if err != nil {
if fleet.IsNotFound(err) || errors.Is(err, mysql.ErrPartialResult) {
return false, nil
}
return false, err
}
return true, nil
}
// reconcile Apple Business Manager configuration environment variables with the database
if config.MDM.IsAppleAPNsSet() || config.MDM.IsAppleSCEPSet() {
if len(config.Server.PrivateKey) == 0 {
initFatal(errors.New("inserting MDM APNs and SCEP assets"),
"missing required private key. Learn how to configure the private key here: https://fleetdm.com/learn-more-about/fleet-server-private-key")
}
// first we'll check if the APNs and SCEP assets are already in the database and
// only insert config values if they're not already present in the database
toInsert := make(map[fleet.MDMAssetName]struct{}, 4)
// check DB for APNs assets
found, err := checkMDMAssets([]fleet.MDMAssetName{fleet.MDMAssetAPNSCert, fleet.MDMAssetAPNSKey})
switch {
case err != nil:
initFatal(err, "reading APNs assets from database")
case !found:
toInsert[fleet.MDMAssetAPNSCert] = struct{}{}
toInsert[fleet.MDMAssetAPNSKey] = struct{}{}
default:
logger.WarnContext(cmd.Context(),
"Your server already has stored APNs certificates. Fleet will ignore any certificates provided via environment variables when this happens.")
}
// check DB for SCEP assets
found, err = checkMDMAssets([]fleet.MDMAssetName{fleet.MDMAssetCACert, fleet.MDMAssetCAKey})
switch {
case err != nil:
initFatal(err, "reading SCEP assets from database")
case !found:
toInsert[fleet.MDMAssetCACert] = struct{}{}
toInsert[fleet.MDMAssetCAKey] = struct{}{}
default:
logger.WarnContext(cmd.Context(),
"Your server already has stored SCEP certificates. Fleet will ignore any certificates provided via environment variables when this happens.")
}
if len(toInsert) > 0 {
if !config.MDM.IsAppleAPNsSet() {
initFatal(errors.New("Apple APNs MDM configuration must be provided when Apple SCEP is provided"),
"validate Apple MDM")
} else if !config.MDM.IsAppleSCEPSet() {
initFatal(errors.New("Apple SCEP MDM configuration must be provided when Apple APNs is provided"),
"validate Apple MDM")
}
// parse the APNs and SCEP assets from the config
_, apnsCertPEM, apnsKeyPEM, err := config.MDM.AppleAPNs()
if err != nil {
initFatal(err, "parse Apple APNs certificate and key from config")
}
_, appleSCEPCertPEM, appleSCEPKeyPEM, err := config.MDM.AppleSCEP()
if err != nil {
initFatal(err, "load Apple SCEP certificate and key from config")
}
var args []fleet.MDMConfigAsset
for name := range toInsert {
switch name {
case fleet.MDMAssetAPNSCert:
args = append(args, fleet.MDMConfigAsset{Name: name, Value: apnsCertPEM})
case fleet.MDMAssetAPNSKey:
args = append(args, fleet.MDMConfigAsset{Name: name, Value: apnsKeyPEM})
case fleet.MDMAssetCACert:
args = append(args, fleet.MDMConfigAsset{Name: name, Value: appleSCEPCertPEM})
case fleet.MDMAssetCAKey:
args = append(args, fleet.MDMConfigAsset{Name: name, Value: appleSCEPKeyPEM})
}
}
if err := ds.InsertMDMConfigAssets(context.Background(), args, nil); err != nil {
if mysql.IsDuplicate(err) {
// we already checked for existing assets so we should never have a duplicate key error here; we'll add a debug log just in case
logger.DebugContext(cmd.Context(), "unexpected duplicate key error inserting MDM APNs and SCEP assets")
} else {
initFatal(err, "inserting MDM APNs and SCEP assets")
}
}
}
}
// reconcile Apple Business Manager configuration environment variables with the database
if config.MDM.IsAppleBMSet() {
if len(config.Server.PrivateKey) == 0 {
initFatal(errors.New("inserting MDM ABM assets"),
"missing required private key. Learn how to configure the private key here: https://fleetdm.com/learn-more-about/fleet-server-private-key")
}
appleBM, err := config.MDM.AppleBM()
if err != nil {
initFatal(err, "parse Apple BM token, certificate and key from config")
}
toInsert := make([]fleet.MDMConfigAsset, 0, 2)
found, err := checkMDMAssets([]fleet.MDMAssetName{fleet.MDMAssetABMKey, fleet.MDMAssetABMCert})
switch {
case err != nil:
initFatal(err, "reading ABM assets from database")
case !found:
toInsert = append(toInsert, fleet.MDMConfigAsset{Name: fleet.MDMAssetABMKey, Value: appleBM.KeyPEM},
fleet.MDMConfigAsset{Name: fleet.MDMAssetABMCert, Value: appleBM.CertPEM})
default:
logger.WarnContext(cmd.Context(),
"Your server already has stored ABM certificates and token. Fleet will ignore any certificates provided via environment variables when this happens.")
}
if len(toInsert) > 0 {
err := ds.InsertMDMConfigAssets(context.Background(), toInsert, nil)
switch {
case err != nil && mysql.IsDuplicate(err):
// we already checked for existing assets so we should never have a duplicate key error here; we'll add a debug log just in case
logger.DebugContext(cmd.Context(), "unexpected duplicate key error inserting ABM assets")
case err != nil:
initFatal(err, "inserting ABM assets")
default:
// insert the ABM token without any metdata; it'll be picked by the
// apple_mdm_dep_profile_assigner cron and backfilled
if _, err := ds.InsertABMToken(context.Background(), &fleet.ABMToken{
EncryptedToken: appleBM.EncryptedToken,
RenewAt: time.Date(2000, time.January, 1, 0, 0, 0, 0,
time.UTC), // 2000-01-01 is our "zero value" for time
}); err != nil {
initFatal(err, "save ABM token")
}
}
}
}
appCfg, err := ds.AppConfig(context.Background())
if err != nil {
initFatal(err, "loading app config")
}
appCfg.MDM.EnabledAndConfigured = false
appCfg.MDM.AppleBMEnabledAndConfigured = false
if len(config.Server.PrivateKey) > 0 {
appCfg.MDM.EnabledAndConfigured, err = checkMDMAssets([]fleet.MDMAssetName{
fleet.MDMAssetCACert,
fleet.MDMAssetCAKey,
fleet.MDMAssetAPNSKey,
fleet.MDMAssetAPNSCert,
})
if err != nil {
initFatal(err, "loading MDM assets from database")
}
var appleBMCerts bool
appleBMCerts, err = checkMDMAssets([]fleet.MDMAssetName{
fleet.MDMAssetABMCert,
fleet.MDMAssetABMKey,
})
if err != nil {
initFatal(err, "loading MDM ABM assets from database")
}
if appleBMCerts {
// the ABM certs are there, check if a token exists and if so, apple
// BM is enabled and configured.
count, err := ds.GetABMTokenCount(context.Background())
if err != nil {
initFatal(err, "loading MDM ABM token from database")
}
appCfg.MDM.AppleBMEnabledAndConfigured = count > 0
}
}
if appCfg.MDM.EnabledAndConfigured {
logger.InfoContext(cmd.Context(), "Apple MDM enabled")
}
if appCfg.MDM.AppleBMEnabledAndConfigured {
logger.InfoContext(cmd.Context(), "Apple Business Manager enabled")
}
// register the Microsoft MDM services
var (
wstepCertManager microsoft_mdm.CertManager
)
// Configuring WSTEP certs
if config.MDM.IsMicrosoftWSTEPSet() {
_, crtPEM, keyPEM, err := config.MDM.MicrosoftWSTEP()
if err != nil {
initFatal(err, "validate Microsoft WSTEP certificate and key")
}
wstepCertManager, err = microsoft_mdm.NewCertManager(ds, crtPEM, keyPEM)
if err != nil {
initFatal(err, "initialize mdm microsoft wstep depot")
}
}
// save the app config with the updated MDM.Enabled value
if err := ds.SaveAppConfig(context.Background(), appCfg); err != nil {
initFatal(err, "saving app config")
}
// setup mail service
if appCfg.SMTPSettings != nil && appCfg.SMTPSettings.SMTPEnabled {
// if SMTP is already enabled then default the backend to empty string, which will force load the SMTP implementation
if config.Email.EmailBackend != "" {
config.Email.EmailBackend = ""
logger.WarnContext(cmd.Context(), "SMTP is already enabled, first disable SMTP to utilize a different email backend")
}
}
// A mail service failure is deliberately non-fatal: the server can run without email.
mailService, err := mail.NewService(config)
if err != nil {
logger.ErrorContext(cmd.Context(), "failed to configure mailing service", "err", err)
}
cronSchedules := fleet.NewCronSchedules()
baseCtx := licensectx.NewContext(context.Background(), license)
ctx, cancelFunc := context.WithCancel(baseCtx)
defer cancelFunc()
// Channel used to trigger graceful shutdown on fatal DB errors (e.g. Aurora failover).
dbFatalCh := make(chan error, 1)
common_mysql.SetFatalErrorHandler(func(ctx context.Context, err error) {
logger.ErrorContext(ctx, "fatal database error detected, initiating graceful shutdown", "err", err)
select {
case dbFatalCh <- err:
default:
}
})
var conditionalAccessMicrosoftProxy *conditional_access_microsoft_proxy.Proxy
if config.MicrosoftCompliancePartner.IsSet() {
var err error
conditionalAccessMicrosoftProxy, err = conditional_access_microsoft_proxy.New(
config.MicrosoftCompliancePartner.ProxyURI,
config.MicrosoftCompliancePartner.ProxyAPIKey,
func() (string, error) {
appCfg, err := ds.AppConfig(ctx)
if err != nil {
return "", fmt.Errorf("failed to load appconfig: %w", err)
}
return appCfg.ServerSettings.ServerURL, nil
},
)
if err != nil {
initFatal(err, "new microsoft compliance proxy")
}
}
eh := errorstore.NewHandler(ctx, redisPool, logger, config.Logging.ErrorRetentionPeriod)
scepConfigMgr := scep.NewSCEPConfigService(logger, nil)
digiCertService := digicert.NewService(digicert.WithLogger(logger))
ctx = ctxerr.NewContext(ctx, eh)
// Declare svc early so the closure below can capture it.
var svc fleet.Service
config.MDM.AndroidAgent.Validate(initFatal)
androidSvc, err := android_service.NewService(
ctx,
logger,
ds,
config.License.Key,
config.Server.PrivateKey,
ds,
func(ctx context.Context, user *fleet.User, activity fleet.ActivityDetails) error {
return svc.NewActivity(ctx, user, activity)
},
config.MDM.AndroidAgent,
)
if err != nil {
initFatal(err, "initializing android service")
}
svc, err = service.NewService(
ctx,
ds,
task,
resultStore,
logger,
&service.OsqueryLogger{
Status: osquerydStatusLogger,
Result: osquerydResultLogger,
},
config,
mailService,
clock.C,
ssoSessionStore,
liveQueryStore,
carveStore,
failingPolicySet,
geoIP,
redisWrapperDS,
depStorage,
mdmStorage,
mdmPushService,
cronSchedules,
wstepCertManager,
scepConfigMgr,
digiCertService,
conditionalAccessMicrosoftProxy,
redis_key_value.New(redisPool),
androidSvc,
)
if err != nil {
initFatal(err, "initializing service")
}
var softwareInstallStore fleet.SoftwareInstallerStore
var bootstrapPackageStore fleet.MDMBootstrapPackageStore
var softwareTitleIconStore fleet.SoftwareTitleIconStore
var distributedLock fleet.Lock
if license.IsPremium() {
hydrantService := est.NewService(est.WithLogger(logger))
profileMatcher := apple_mdm.NewProfileMatcher(redisPool)
if config.S3.SoftwareInstallersBucket != "" {
if config.S3.BucketsAndPrefixesMatch() {
logger.WarnContext(ctx,
"the S3 buckets and prefixes for carves and software installers appear to be identical, this can cause issues")
}
// Extract the CloudFront URL signer before creating the S3 stores.
config.S3.ValidateCloudFrontURL(initFatal)
if config.S3.SoftwareInstallersCloudFrontURLSigningPrivateKey != "" {
// Strip newlines from private key
signingPrivateKey := strings.ReplaceAll(config.S3.SoftwareInstallersCloudFrontURLSigningPrivateKey, "\\n", "\n")
privateKey, err := cryptoutil.ParsePrivateKey([]byte(signingPrivateKey),
"CloudFront URL signing private key")
if err != nil {
initFatal(err, "parsing CloudFront URL signing private key")
}
var ok bool
config.S3.SoftwareInstallersCloudFrontSigner, ok = privateKey.(crypto.Signer)
if !ok {
initFatal(errors.New("CloudFront URL signing private key is not a crypto.Signer"),
"parsing CloudFront URL signing private key")
}
}
store, err := s3.NewSoftwareInstallerStore(config.S3)
if err != nil {
initFatal(err, "initializing S3 software installer store")
}
softwareInstallStore = store
logger.InfoContext(ctx, "using S3 software installer store", "bucket", config.S3.SoftwareInstallersBucket)
bstore, err := s3.NewBootstrapPackageStore(config.S3)
if err != nil {
initFatal(err, "initializing S3 bootstrap package store")
}
bootstrapPackageStore = bstore
logger.InfoContext(ctx, "using S3 bootstrap package store", "bucket", config.S3.SoftwareInstallersBucket)
softwareTitleIconStore, err = s3.NewSoftwareTitleIconStore(config.S3)
if err != nil {
initFatal(err, "initializing S3 software title icon store")
}
logger.InfoContext(ctx, "using S3 software title icon store", "bucket", config.S3.SoftwareInstallersBucket)
} else {
installerDir := os.TempDir()
if dir := os.Getenv("FLEET_SOFTWARE_INSTALLER_STORE_DIR"); dir != "" {
installerDir = dir
}
store, err := filesystem.NewSoftwareInstallerStore(installerDir)
if err != nil {
logger.ErrorContext(ctx, "failed to configure local filesystem software installer store", "err", err)
softwareInstallStore = failing.NewFailingSoftwareInstallerStore()
} else {
softwareInstallStore = store
logger.InfoContext(ctx,
"using local filesystem software installer store, this is not suitable for production use", "directory",
installerDir)
}
iconDir := os.TempDir()
if dir := os.Getenv("FLEET_SOFTWARE_TITLE_ICON_STORE_DIR"); dir != "" {
iconDir = dir
}
iconStore, err := filesystem.NewSoftwareTitleIconStore(iconDir)
if err != nil {
logger.ErrorContext(ctx, "failed to configure local filesystem software title icon store", "err", err)
softwareTitleIconStore = failing.NewFailingSoftwareTitleIconStore()
} else {
softwareTitleIconStore = iconStore
logger.WarnContext(ctx,
"using local filesystem software title icon store, this is not suitable for production use", "directory",
iconDir)
}
}
distributedLock = redis_lock.NewLock(redisPool)
svc, err = eeservice.NewService(
svc,
ds,
logger,
config,
mailService,
clock.C,
depStorage,
apple_mdm.NewMDMAppleCommander(mdmStorage, mdmPushService),
ssoSessionStore,
profileMatcher,
softwareInstallStore,
bootstrapPackageStore,
softwareTitleIconStore,
distributedLock,
redis_key_value.New(redisPool),
scepConfigMgr,
digiCertService,
androidSvc,
hydrantService,
)
if err != nil {
initFatal(err, "initial Fleet Premium service")
}
}
// Generate a random identifier for this server instance, used by the cron
// schedule package to attribute runs to an instance.
instanceID, err := server.GenerateRandomText(64)
if err != nil {
// NOTE(review): the underlying err is discarded and the message is capitalized,
// unlike other initFatal call sites — consider initFatal(err, "generate instance ID").
initFatal(errors.New("Error generating random instance identifier"), "")
}
logger.InfoContext(ctx, "instance info", "instanceID", instanceID)
// Bootstrap activity bounded context (needed for cron schedules and HTTP routes)
activitySvc, activityRoutes := createActivityBoundedContext(svc, dbConns, logger)
// Inject the activity bounded context into the main service
svc.SetActivityService(activitySvc)
// Bootstrap ACME service module
acmeSigner := &acmeCSRSigner{signer: scepdepot.NewSigner(scepStorage, scepdepot.WithValidityDays(config.MDM.AppleSCEPSignerValidityDays), scepdepot.WithAllowRenewalDays(14))}
acmeSvc, acmeRoutes := createACMEServiceModule(ds, dbConns, redisPool, logger, acmeSigner)
// Inject the ACME service module into the main service
svc.SetACMEService(acmeSvc)
// Perform a cleanup of cron_stats outside of the cronSchedules because the
// schedule package uses cron_stats entries to decide whether a schedule will
// run or not (see https://github.com/fleetdm/fleet/issues/9486).
go func() {
cleanupCronStats := func() {
logger.DebugContext(ctx, "cleaning up cron_stats")
// Datastore.CleanupCronStats should be safe to run by multiple fleet
// instances at the same time and it should not be an expensive operation.
if err := ds.CleanupCronStats(ctx); err != nil {
logger.InfoContext(ctx, "failed to clean up cron_stats", "err", err)
}
}
cleanupCronStats()
cleanUpCronStatsTick := time.NewTicker(1 * time.Hour)
defer cleanUpCronStatsTick.Stop()
for {
select {
case <-ctx.Done():
return
case <-cleanUpCronStatsTick.C:
cleanupCronStats()
}
}
}()
if softwareInstallStore != nil {
if err := cronSchedules.StartCronSchedule(
func() (fleet.CronSchedule, error) {
return cronUninstallSoftwareMigration(ctx, instanceID, ds, softwareInstallStore, logger)
},
); err != nil {
initFatal(err, fmt.Sprintf("failed to register %s", fleet.CronUninstallSoftwareMigration))
}
if err := cronSchedules.StartCronSchedule(
func() (fleet.CronSchedule, error) {
return cronUpgradeCodeSoftwareMigration(ctx, instanceID, ds, softwareInstallStore, logger)
},
); err != nil {
initFatal(err, fmt.Sprintf("failed to register %s", fleet.CronUpgradeCodeSoftwareMigration))
}
}
if config.Server.FrequentCleanupsEnabled {
if err := cronSchedules.StartCronSchedule(
func() (fleet.CronSchedule, error) {
return newFrequentCleanupsSchedule(ctx, instanceID, ds, liveQueryStore, logger)
},
); err != nil {
initFatal(err, "failed to register frequent_cleanups schedule")
}
}
if err := cronSchedules.StartCronSchedule(
func() (fleet.CronSchedule, error) {
commander := apple_mdm.NewMDMAppleCommander(mdmStorage, mdmPushService)
return newCleanupsAndAggregationSchedule(
ctx, instanceID, ds, svc, logger, redisWrapperDS, &config, commander, softwareInstallStore, bootstrapPackageStore, softwareTitleIconStore, androidSvc, activitySvc, acmeSvc,
)
},
); err != nil {
initFatal(err, "failed to register cleanups_then_aggregations schedule")
}
if err := cronSchedules.StartCronSchedule(
func() (fleet.CronSchedule, error) {
return newQueryResultsCleanupSchedule(ctx, instanceID, ds, liveQueryStore, logger)
},
); err != nil {
initFatal(err, "failed to register query_results_cleanup schedule")
}
if err := cronSchedules.StartCronSchedule(
func() (fleet.CronSchedule, error) {
return newUpcomingActivitiesSchedule(ctx, instanceID, ds, logger)
},
); err != nil {
initFatal(err, "failed to register upcoming_activities_maintenance schedule")
}
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return newUsageStatisticsSchedule(ctx, instanceID, ds, config, logger)
}); err != nil {
initFatal(err, "failed to register stats schedule")
}
if err := cronSchedules.StartCronSchedule(
func() (fleet.CronSchedule, error) {
return newBatchActivitiesSchedule(ctx, instanceID, ds, logger)
}); err != nil {
initFatal(err, "failed to register batch activities schedule")
}
vulnerabilityScheduleDisabled := false
if config.Vulnerabilities.DisableSchedule {
vulnerabilityScheduleDisabled = true
logger.InfoContext(ctx, "vulnerabilities schedule disabled via vulnerabilities.disable_schedule")
}
if config.Vulnerabilities.CurrentInstanceChecks == "no" || config.Vulnerabilities.CurrentInstanceChecks == "0" {
logger.InfoContext(ctx, "vulnerabilities schedule disabled via vulnerabilities.current_instance_checks")
vulnerabilityScheduleDisabled = true
}
if !vulnerabilityScheduleDisabled {
// vuln processing by default is run by internal cron mechanism
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return newVulnerabilitiesSchedule(ctx, instanceID, ds, logger, &config.Vulnerabilities)
}); err != nil {
initFatal(err, "failed to register vulnerabilities schedule")
}
} else {
// Register a remote trigger proxy so triggering still works
// when the vulnerability schedule runs on a separate server.
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return schedule.NewRemoteTriggerSchedule(string(fleet.CronVulnerabilities), ds), nil
}); err != nil {
initFatal(err, "failed to register remote vulnerability trigger")
}
}
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return newAutomationsSchedule(ctx, instanceID, ds, logger, 5*time.Minute, failingPolicySet)
}); err != nil {
initFatal(err, "failed to register automations schedule")
}
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
commander := apple_mdm.NewMDMAppleCommander(mdmStorage, mdmPushService)
return newWorkerIntegrationsSchedule(ctx, instanceID, ds, logger, depStorage, commander, androidSvc)
}); err != nil {
initFatal(err, "failed to register worker integrations schedule")
}
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
commander := apple_mdm.NewMDMAppleCommander(mdmStorage, mdmPushService)
vppInstaller := svc.(fleet.AppleMDMVPPInstaller)
return newAppleMDMWorkerSchedule(ctx, instanceID, ds, logger, commander, bootstrapPackageStore, vppInstaller)
}); err != nil {
initFatal(err, "failed to register apple_mdm_worker schedule")
}
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return newAppleMDMDEPProfileAssigner(ctx, instanceID, config.MDM.AppleDEPSyncPeriodicity, ds, depStorage, logger)
}); err != nil {
initFatal(err, "failed to register apple_mdm_dep_profile_assigner schedule")
}
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return newMDMAppleServiceDiscoverySchedule(ctx, instanceID, ds, depStorage, logger, config.Server.URLPrefix)
}); err != nil {
initFatal(err, "failed to register mdm_apple_service_discovery schedule")
}
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return newAppleMDMProfileManagerSchedule(
ctx,
instanceID,
ds,
apple_mdm.NewMDMAppleCommander(mdmStorage, mdmPushService),
redis_key_value.New(redisPool),
logger,
config.MDM.CertificateProfilesLimit,
)
}); err != nil {
initFatal(err, "failed to register mdm_apple_profile_manager schedule")
}
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return newWindowsMDMProfileManagerSchedule(
ctx,
instanceID,
ds,
logger,
)
}); err != nil {
initFatal(err, "failed to register mdm_windows_profile_manager schedule")
}
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return newAndroidMDMProfileManagerSchedule(
ctx,
instanceID,
ds,
logger,
config.License.Key, // NOTE: this requires the license key, not the parsed *LicenseInfo available in the ctx
config.MDM.AndroidAgent,
)
}); err != nil {
initFatal(err, "failed to register mdm_android_profile_manager schedule")
}
// Register Android MDM Device Reconciler schedule (same interval as Android profile manager)
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return newAndroidMDMDeviceReconcilerSchedule(
ctx,
instanceID,
ds,
logger,
config.License.Key,
svc.NewActivity,
)
}); err != nil {
initFatal(err, "failed to register mdm_android_device_reconciler schedule")
}
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return cronEnableAndroidAppReportsOnDefaultPolicy(ctx, instanceID, ds, logger, androidSvc)
}); err != nil {
initFatal(err, "failed to register enable_android_app_reports_on_default_policy cron")
}
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return cronMigrateToPerHostPolicy(ctx, instanceID, ds, logger, androidSvc)
}); err != nil {
initFatal(err, "failed to register migrate_to_per_host_policy cron")
}
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return newMDMAPNsPusher(
ctx,
instanceID,
ds,
apple_mdm.NewMDMAppleCommander(mdmStorage, mdmPushService),
logger,
)
}); err != nil {
initFatal(err, "failed to register APNs pusher schedule")
}
if license.IsPremium() {
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
commander := apple_mdm.NewMDMAppleCommander(mdmStorage, mdmPushService)
return newIPhoneIPadRefetcher(ctx, instanceID, 10*time.Minute, ds, commander, logger, svc.NewActivity)
}); err != nil {
initFatal(err, "failed to register apple_mdm_iphone_ipad_refetcher schedule")
}
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
commander := apple_mdm.NewMDMAppleCommander(mdmStorage, mdmPushService)
return newIPhoneIPadReviver(ctx, instanceID, ds, commander, logger)
}); err != nil {
initFatal(err, "failed to register apple_mdm_iphone_ipad_reviver schedule")
}
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return newMaintainedAppSchedule(ctx, instanceID, ds, logger)
}); err != nil {
initFatal(err, "failed to register maintained apps schedule")
}
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return newRefreshVPPAppVersionsSchedule(ctx, instanceID, ds, logger, apple_apps.Configure(ctx, ds, config.License.Key, config.MDM.AppleConnectJWT))
}); err != nil {
initFatal(err, "failed to register refresh vpp app versions schedule")
}
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
commander := apple_mdm.NewMDMAppleCommander(mdmStorage, mdmPushService)
return newRecoveryLockPasswordSchedule(ctx, instanceID, ds, commander, logger, svc.NewActivity)
}); err != nil {
initFatal(err, "failed to register recovery lock password schedule")
}
}
if license.IsPremium() && config.Activity.EnableAuditLog {
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return newActivitiesStreamingSchedule(ctx, instanceID, activitySvc, ds, logger, auditLogger)
}); err != nil {
initFatal(err, "failed to register activities streaming schedule")
}
}
if license.IsPremium() {
if err := cronSchedules.StartCronSchedule(
func() (fleet.CronSchedule, error) {
if config.Calendar.Periodicity > 0 {
config.Calendar.SetAlwaysReloadEvent(true)
} else {
config.Calendar.Periodicity = 5 * time.Minute
}
return cron.NewCalendarSchedule(ctx, instanceID, ds, distributedLock, config.Calendar, logger)
},
); err != nil {
initFatal(err, "failed to register calendar schedule")
}
}
// Start the service that calculates and updates host vitals label membership.
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return newHostVitalsLabelMembershipSchedule(ctx, instanceID, ds, logger)
}); err != nil {
initFatal(err, "failed to register host vitals label membership schedule")
}
// Start the service that marks activities as completed.
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
return newBatchActivityCompletionCheckerSchedule(ctx, instanceID, ds, logger)
}); err != nil {
initFatal(err, "failed to register batch activity completion checker schedule")
}
logger.InfoContext(ctx, fmt.Sprintf("started cron schedules: %s", strings.Join(cronSchedules.ScheduleNames(), ", ")))
// StartCollectors starts a goroutine per collector, using ctx to cancel.
task.StartCollectors(ctx, logger.With("cron", "async_task"))
// Flush seen hosts every second
hostsAsyncCfg := config.Osquery.AsyncConfigForTask(configpkg.AsyncTaskHostLastSeen)
if !hostsAsyncCfg.Enabled {
go func() {
for range time.Tick(time.Duration(rand.Intn(10)+1) * time.Second) {
if err := task.FlushHostsLastSeen(baseCtx, clock.C.Now()); err != nil {
logger.InfoContext(ctx, "failed to update host seen times", "err", err)
}
}
}()
}
fieldKeys := []string{"method", "error"}
requestCount := kitprometheus.NewCounterFrom(prometheus.CounterOpts{
Namespace: "api",
Subsystem: "service",
Name: "request_count",
Help: "Number of requests received.",
}, fieldKeys)
requestLatency := kitprometheus.NewSummaryFrom(prometheus.SummaryOpts{
Namespace: "api",
Subsystem: "service",
Name: "request_latency_microseconds",
Help: "Total duration of requests in microseconds.",
}, fieldKeys)
svc = service.NewMetricsService(svc, requestCount, requestLatency)
httpLogger := logger.With("component", "http")
limiterStore := &redis.ThrottledStore{
Pool: redisPool,
KeyPrefix: "ratelimit::",
}
var httpSigVerifier func(http.Handler) http.Handler
if license.IsPremium() {
httpSigVerifier, err = httpsig.Middleware(ds, config.Auth.RequireHTTPMessageSignature, logger.With("component", "http-sig-verifier"))
if err != nil {
initFatal(err, "initializing HTTP signature verifier")
}
}
// This is off by default for testing and development uses only.
cspEV := os.Getenv("FLEET_SERVER_ENABLE_CSP")
serveCSP := cspEV == "1" || cspEV == "true"
var apiHandler, frontendHandler, endUserEnrollOTAHandler http.Handler
{
frontendHandler = service.PrometheusMetricsHandler(
"get_frontend",
service.ServeFrontend(config.Server.URLPrefix, config.Server.SandboxEnabled, httpLogger, serveCSP),
)
frontendHandler = service.WithMDMEnrollmentMiddleware(svc, httpLogger, frontendHandler)
var extra []service.ExtraHandlerOption
if config.MDM.SSORateLimitPerMinute > 0 {
extra = append(extra, service.WithMdmSsoRateLimit(throttled.PerMin(config.MDM.SSORateLimitPerMinute)))
}
extra = append(extra, service.WithHTTPSigVerifier(httpSigVerifier))
apiHandler = service.MakeHandler(svc, config, httpLogger, limiterStore, redisPool, carveStore,
[]endpointer.HandlerRoutesFunc{android_service.GetRoutes(svc, androidSvc), activityRoutes, acmeRoutes}, extra...)
// Verify at startup that service/api_endpoints.yml is a subset of the registered routes.
// NOTE(review): this uses panic while the rest of startup uses initFatal — confirm intentional.
if err := service.ValidateAPIEndpoints(apiHandler); err != nil {
panic(fmt.Sprintf("invalid api_endpoints.yml: %v", err))
}
if serveCSP {
// Only injecting this if CSP is turned on since the default security headers add some overhead to each request
apiHandler = endpointer.BrowserSecurityHeadersHandler(serveCSP, apiHandler)
}
setupRequired, err := svc.SetupRequired(baseCtx)
if err != nil {
initFatal(err, "fetching setup requirement")
}
// WithSetup will check if first time setup is required
// By performing the same check inside main, we can make server startups
// more efficient after the first startup.
if setupRequired {
// Pass in a closure to run the fleetctl command, so that the service layer
// doesn't need to import the CLI package.
// When Primo mode is enabled, skip the starter library.
var applyStarterLibrary func(ctx context.Context, serverURL, token string) error
if config.Partnerships.EnablePrimo {
applyStarterLibrary = func(ctx context.Context, _, _ string) error {
logger.DebugContext(ctx, "Skipping starter library application in Primo mode")
return nil
}
} else {
applyStarterLibrary = func(ctx context.Context, serverURL, token string) error {
return service.ApplyStarterLibrary(ctx, serverURL, token, logger, func(args []string) error {
_, err := fleetctl.RunApp(args)
return err
})
}
}
apiHandler = service.WithSetup(svc, logger, applyStarterLibrary, apiHandler)
frontendHandler = service.RedirectLoginToSetup(svc, logger, frontendHandler, config.Server.URLPrefix)
} else {
frontendHandler = service.RedirectSetupToLogin(svc, logger, frontendHandler, config.Server.URLPrefix)
}
endUserEnrollOTAHandler = service.ServeEndUserEnrollOTA(
svc,
config.Server.URLPrefix,
ds,
logger,
serveCSP,
)
}
healthCheckers := make(map[string]health.Checker)
{
// a list of dependencies which could affect the status of the app if unavailable.
deps := map[string]any{
"mysql": ds,
"redis": resultStore,
}
// convert all dependencies to health.Checker if they implement the healthz methods.
for name, dep := range deps {
if hc, ok := dep.(health.Checker); ok {
healthCheckers[name] = hc
} else {
initFatal(errors.New(name+" should be a health.Checker"), "initializing health checks")
}
}
}
// Instantiate a gRPC service to handle launcher requests.
launcher := launcher.New(svc, logger, grpc.NewServer(
grpc.ChainUnaryInterceptor(
grpc_recovery.UnaryServerInterceptor(),
),
grpc.ChainStreamInterceptor(
grpc_recovery.StreamServerInterceptor(),
),
), healthCheckers)
rootMux := http.NewServeMux()
rootMux.Handle("/healthz", service.PrometheusMetricsHandler("healthz", otelmw.WrapHandler(health.Handler(httpLogger, healthCheckers), "/healthz", config)))
rootMux.Handle("/version", service.PrometheusMetricsHandler("version", otelmw.WrapHandler(version.Handler(), "/version", config)))
rootMux.Handle("/assets/", service.PrometheusMetricsHandler("static_assets", otelmw.WrapHandlerDynamic(service.ServeStaticAssets("/assets/", serveCSP), config)))
if len(config.Server.PrivateKey) > 0 {
commander := apple_mdm.NewMDMAppleCommander(mdmStorage, mdmPushService)
ddmService := service.NewMDMAppleDDMService(ds, logger)
vppInstaller := svc.(fleet.AppleMDMVPPInstaller)
mdmCheckinAndCommandService := service.NewMDMAppleCheckinAndCommandService(
ds,
commander,
vppInstaller,
license.IsPremium(),
logger,
redis_key_value.New(redisPool),
svc.NewActivity,
)
mdmCheckinAndCommandService.RegisterResultsHandler("InstalledApplicationList", service.NewInstalledApplicationListResultsHandler(ds, commander, logger, config.Server.VPPVerifyTimeout, config.Server.VPPVerifyRequestDelay, svc.NewActivity))
mdmCheckinAndCommandService.RegisterResultsHandler(fleet.DeviceLocationCmdName, service.NewDeviceLocationResultsHandler(ds, commander, logger))
mdmCheckinAndCommandService.RegisterResultsHandler(fleet.SetRecoveryLockCmdName, service.NewSetRecoveryLockResultsHandler(ds, logger, svc.NewActivity))
hasSCEPChallenge, err := checkMDMAssets([]fleet.MDMAssetName{fleet.MDMAssetSCEPChallenge})
if err != nil {
initFatal(err, "checking SCEP challenge in database")
}
if !hasSCEPChallenge {
scepChallenge := config.MDM.AppleSCEPChallenge
if scepChallenge == "" {
// no challenge configured via env/config: generate a random one
scepChallenge = uuid.NewString()
}
err = ds.InsertMDMConfigAssets(context.Background(), []fleet.MDMConfigAsset{
{Name: fleet.MDMAssetSCEPChallenge, Value: []byte(scepChallenge)},
}, nil)
if err != nil {
// duplicate key errors mean that we already
// have a value for those keys in the
// database, fail to initialize on other
// cases.
if !mysql.IsDuplicate(err) {
initFatal(err, "inserting SCEP challenge")
}
logger.WarnContext(ctx,
"Your server already has stored a SCEP challenge. Fleet will ignore this value provided via environment variables when this happens.")
}
}
if err := service.RegisterAppleMDMProtocolServices(
rootMux,
config.MDM,
mdmStorage,
scepStorage,
logger,
mdmCheckinAndCommandService,
ddmService,
commander,
appCfg.ServerSettings.ServerURL,
config,
); err != nil {
initFatal(err, "setup mdm apple services")
}
}
if license.IsPremium() {
// SCEP proxy (for NDES, etc.)
if err = service.RegisterSCEPProxy(rootMux, ds, logger, nil, &config); err != nil {
initFatal(err, "setup SCEP proxy")
}
if err = scim.RegisterSCIM(rootMux, ds, svc, logger, &config); err != nil {
initFatal(err, "setup SCIM")
}
// Host identify and conditional access SCEP feature only works if a private key has been set up
if len(config.Server.PrivateKey) > 0 {
hostIdentitySCEPDepot, err := mds.NewHostIdentitySCEPDepot(logger.With("component", "host-id-scep-depot"), &config)
if err != nil {
initFatal(err, "setup host identity SCEP depot")
}
if err = hostidentity.RegisterSCEP(rootMux, hostIdentitySCEPDepot, ds, logger, &config); err != nil {
initFatal(err, "setup host identity SCEP")
}
// Conditional Access SCEP
condAccessSCEPDepot, err := mds.NewConditionalAccessSCEPDepot(logger.With("component", "conditional-access-scep-depot"), &config)
if err != nil {
initFatal(err, "setup conditional access SCEP depot")
}
if err = condaccess.RegisterSCEP(ctx, rootMux, condAccessSCEPDepot, ds, logger, &config); err != nil {
initFatal(err, "setup conditional access SCEP")
}
// Conditional Access IdP (Okta)
if err = condaccess.RegisterIdP(rootMux, ds, logger, &config, limiterStore); err != nil {
initFatal(err, "setup conditional access IdP")
}
} else {
logger.WarnContext(ctx,
"Host identity and conditional access SCEP is not available because no server private key has been set up.")
}
}
if config.Prometheus.BasicAuth.Username != "" && config.Prometheus.BasicAuth.Password != "" {
rootMux.Handle("/metrics", basicAuthHandler(
config.Prometheus.BasicAuth.Username,
config.Prometheus.BasicAuth.Password,
service.PrometheusMetricsHandler("metrics", otelmw.WrapHandler(promhttp.Handler(), "/metrics", config)),
))
} else {
if config.Prometheus.BasicAuth.Disable {
logger.InfoContext(ctx, "metrics endpoint enabled with http basic auth disabled")
rootMux.Handle("/metrics", service.PrometheusMetricsHandler("metrics", otelmw.WrapHandler(promhttp.Handler(), "/metrics", config)))
} else {
logger.InfoContext(ctx, "metrics endpoint disabled (http basic auth credentials not set)")
}
}
// We must wrap the Handler here to set special per-endpoint Read/Write
// timeouts, so that we have access to the raw http.ResponseWriter.
// Otherwise, the handler is wrapped by the promhttp response delegator,
// which does not support the Unwrap call needed to work with
// ResponseController.
//
// See https://pkg.go.dev/net/http#NewResponseController which explains
// the Unwrap method that the prometheus wrapper of http.ResponseWriter
// does not implement.
rootMux.HandleFunc("/api/", func(rw http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodPost && strings.HasSuffix(req.URL.Path, "/fleet/scripts/run/sync") {
// when running a script synchronously, we wait a while for a script
// execution result, so the write timeout (to write the response)
// must be extended.
rc := http.NewResponseController(rw)
// add an additional 30 seconds to prevent race conditions where the
// request is terminated early.
if err := rc.SetWriteDeadline(time.Now().Add(scripts.MaxServerWaitTime + (30 * time.Second))); err != nil {
logger.ErrorContext(req.Context(),
"http middleware failed to override endpoint write timeout for script sync run",
"response_writer_type", fmt.Sprintf("%T", rw),
"response_writer", fmt.Sprintf("%+v", rw),
"err", err,
)
}
}
if (req.Method == http.MethodPost && strings.HasSuffix(req.URL.Path, "/fleet/software/package")) ||
(req.Method == http.MethodPatch && strings.HasSuffix(req.URL.Path, "/package") && strings.Contains(req.URL.Path,
"/fleet/software/titles/")) ||
(req.Method == http.MethodPost && strings.HasSuffix(req.URL.Path, "/bootstrap")) ||
(req.Method == http.MethodPost && strings.HasSuffix(req.URL.Path, "/fleet_maintained_apps")) ||
(req.Method == http.MethodGet && strings.Contains(req.URL.Path, "/package/token")) ||
(req.Method == http.MethodPost && strings.Contains(req.URL.Path, "orbit/software_install/package")) {
var zeroTime time.Time
rc := http.NewResponseController(rw)
// For large software installers and bootstrap packages, the server needs time to read the full
// request body so we use the zero value to remove the deadline and override the
// default read timeout.
// TODO: Is this really how we want to handle this? Or would an arbitrarily long
// timeout be better?
if err := rc.SetReadDeadline(zeroTime); err != nil {
logger.ErrorContext(req.Context(),
"http middleware failed to override endpoint read timeout for software package upload",
"response_writer_type", fmt.Sprintf("%T", rw),
"response_writer", fmt.Sprintf("%+v", rw),
"err", err,
)
}
// For large software installers, the server needs time to store the
// installer to S3 (or the configured storage location) and write the response
// body so we use the zero value to remove the deadline and override the
// default write timeout.
// TODO: Is this really how we want to handle this? Or would an arbitrarily long
// timeout be better?
if err := rc.SetWriteDeadline(zeroTime); err != nil {
logger.ErrorContext(req.Context(),
"http middleware failed to override endpoint write timeout for software package upload",
"response_writer_type", fmt.Sprintf("%T", rw),
"response_writer", fmt.Sprintf("%+v", rw),
"err", err,
)
}
// We need to add the context value here because we need the installer max size when doing request
// parsing, which happens somewhere where we're only passed the request (and not the service object)
req.Body = http.MaxBytesReader(rw, req.Body, config.Server.MaxInstallerSizeBytes)
req = req.WithContext(installersize.NewContext(req.Context(), config.Server.MaxInstallerSizeBytes))
}
if req.Method == http.MethodGet && strings.HasSuffix(req.URL.Path, "/fleet/android_enterprise/signup_sse") {
// When enabling Android MDM, frontend UI will wait for the admin to finish the setup in Google.
// NOTE(review): the log message below misspells "enterprise setup" as "enterpriset setup";
// it is a runtime string, so left unchanged here — fix in a code change.
rc := http.NewResponseController(rw)
if err := rc.SetWriteDeadline(time.Now().Add(30 * time.Minute)); err != nil {
logger.ErrorContext(req.Context(),
"http middleware failed to override endpoint write timeout for android enterpriset setup",
"response_writer_type", fmt.Sprintf("%T", rw),
"response_writer", fmt.Sprintf("%+v", rw),
"err", err,
)
}
}
if req.Method == http.MethodPost && strings.HasSuffix(req.URL.Path, "/fleet/mdm/profiles/batch") ||
(req.Method == http.MethodPost && strings.HasSuffix(req.URL.Path, "/fleet/configuration_profiles/batch")) {
// For customers using large profiles and/or large numbers of profiles, the
// server needs time to completely read the request body and also to process
// all the side effects of a potentially large number of profiles being changed
// across a large number of hosts, so set the timeouts a bit higher than default
rc := http.NewResponseController(rw)
if err := rc.SetWriteDeadline(time.Now().Add(5 * time.Minute)); err != nil {
logger.ErrorContext(req.Context(),
"http middleware failed to override endpoint write timeout for MDM profiles batch endpoint",
"response_writer_type", fmt.Sprintf("%T", rw),
"response_writer", fmt.Sprintf("%+v", rw),
"err", err,
)
}
if err := rc.SetReadDeadline(time.Now().Add(5 * time.Minute)); err != nil {
logger.ErrorContext(req.Context(),
"http middleware failed to override endpoint read timeout for MDM profiles batch endpoint",
"response_writer_type", fmt.Sprintf("%T", rw),
"response_writer", fmt.Sprintf("%+v", rw),
"err", err,
)
}
}
apiHandler.ServeHTTP(rw, req)
})
// The `/api/{version}/fleet/scim` base path is used by the SCIM handler. In order to route the `details` route to the apiHandler,
// we have to explicitly handle that path at the root, since the Go router gives precedence to the more specific pattern. The
// explicit v1/latest versions are used in these paths to make them more specific. The Fleet API was designed this way for end-user simplicity.
rootMux.Handle("/api/v1/fleet/scim/details", apiHandler)
rootMux.Handle("/api/latest/fleet/scim/details", apiHandler)
rootMux.Handle("/enroll", otelmw.WrapHandler(endUserEnrollOTAHandler, "/enroll", config))
rootMux.Handle("/", otelmw.WrapHandler(frontendHandler, "/", config))
debugHandler := &debugMux{
fleetAuthenticatedHandler: service.MakeDebugHandler(svc, config, logger, eh, ds),
}
rootMux.Handle("/debug/", otelmw.WrapHandlerDynamic(debugHandler, config))
if debug {
// Add debug endpoints with a random
// authorization token
debugToken, err := server.GenerateRandomText(24)
if err != nil {
initFatal(err, "generating debug token")
}
debugHandler.tokenAuthenticatedHandler = http.StripPrefix("/debug/", netbug.AuthHandler(debugToken))
fmt.Printf("*** Debug mode enabled ***\nAccess the debug endpoints at /debug/?token=%s\n", url.QueryEscape(debugToken))
}
if len(config.Server.URLPrefix) > 0 {
prefixMux := http.NewServeMux()
prefixMux.Handle(config.Server.URLPrefix+"/", http.StripPrefix(config.Server.URLPrefix, rootMux))
rootMux = prefixMux
}
// NOTE(lucas): It seems we missed updating this value from 90s (see #1798) to 25s after we
// decided to make the synchronous live query API to take up to 25 seconds.
// Not changing this to not break any long running requests (like when uploading software
// packages via GitOps).
liveQueryRestPeriod := 90 * time.Second
if v := os.Getenv("FLEET_LIVE_QUERY_REST_PERIOD"); v != "" {
duration, err := time.ParseDuration(v)
if err != nil {
logger.ErrorContext(ctx, "failed to parse live query rest period", "err", err)
} else {
liveQueryRestPeriod = duration
}
}
// The "GET /api/latest/fleet/queries/run" API requires
// WriteTimeout to be higher than the live query rest period
// (otherwise the response is not sent back to the client).
//
// We add 10s to the live query rest period to allow the writing
// of the response.
liveQueryRestPeriod += 10 * time.Second
// Create the handler based on whether tracing should be there
var handler http.Handler
if config.Logging.TracingEnabled && config.Logging.TracingType == "elasticapm" {
handler = launcher.Handler(apmhttp.Wrap(rootMux))
} else {
handler = launcher.Handler(rootMux)
}
srv := config.Server.DefaultHTTPServer(ctx, handler)
if liveQueryRestPeriod > srv.WriteTimeout {
srv.WriteTimeout = liveQueryRestPeriod
}
srv.SetKeepAlivesEnabled(config.Server.Keepalive)
errs := make(chan error, 2)
go func() {
if !config.Server.TLS {
logger.InfoContext(ctx, "listening", "transport", "http", "address", config.Server.Address)
errs <- srv.ListenAndServe()
} else {
logger.InfoContext(ctx, "listening", "transport", "https", "address", config.Server.Address)
srv.TLSConfig = getTLSConfig(config.Server.TLSProfile)
errs <- srv.ListenAndServeTLS(
config.Server.Cert,
config.Server.Key,
)
}
}()
go func() {
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sig:
case <-dbFatalCh:
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
errs <- func() error {
cancelFunc()
cleanupCronStatsOnShutdown(ctx, ds, logger, instanceID)
launcher.GracefulStop()
// Flush any pending OTEL data before shutting down
if tracerProvider != nil {
if err := tracerProvider.Shutdown(ctx); err != nil {
logger.ErrorContext(ctx, "failed to shutdown OTEL tracer provider", "err", err)
}
}
if meterProvider != nil {
if err := meterProvider.Shutdown(ctx); err != nil {
logger.ErrorContext(ctx, "failed to shutdown OTEL meter provider", "err", err)
}
}
if loggerProvider != nil {
if err := loggerProvider.Shutdown(ctx); err != nil {
logger.ErrorContext(ctx, "failed to shutdown OTEL logger provider", "err", err)
}
}
return srv.Shutdown(ctx)
}()
}()
// block on errs signal
logger.InfoContext(ctx, "terminated", "err", <-errs)
}
// acmeCSRSigner bridges a SCEP depot signer into the acme.CSRSigner interface.
type acmeCSRSigner struct {
	signer *scepdepot.Signer
}

// SignCSR satisfies acme.CSRSigner by delegating certificate issuance to the
// wrapped SCEP depot signer. The context is accepted for interface
// compatibility but is not used by the underlying signer.
func (s *acmeCSRSigner) SignCSR(_ context.Context, csr *x509.CertificateRequest) (*x509.Certificate, error) {
	return s.signer.Signx509CSR(csr)
}
// createACMEServiceModule wires up the ACME bounded context: it adapts the
// Fleet datastore for ACME ACL lookups, bootstraps the ACME service against
// the DB connections and Redis, and materializes the module's HTTP routes
// with request logging applied.
func createACMEServiceModule(ds fleet.Datastore, dbConns *common_mysql.DBConnections, redisPool fleet.RedisPool, logger *slog.Logger, csrSigner acme.CSRSigner) (acme_api.Service, endpointer.HandlerRoutesFunc) {
	aclProviders := acmeacl.NewFleetDatastoreAdapter(ds, csrSigner)
	svc, routesFn := acme_bootstrap.New(dbConns, redisPool, aclProviders, logger)
	return svc, routesFn(log.Logged)
}
// createActivityBoundedContext wires up the activity bounded context. It
// builds the legacy authorizer (fatal on failure, since the server cannot run
// without authorization), adapts it and the Fleet service for the activity
// module, and returns the service plus its routes wrapped in
// authenticated-user middleware.
func createActivityBoundedContext(svc fleet.Service, dbConns *common_mysql.DBConnections, logger *slog.Logger) (activity_api.Service, endpointer.HandlerRoutesFunc) {
	authorizer, err := authz.NewAuthorizer()
	if err != nil {
		initFatal(err, "initializing activity authorizer")
	}
	aclAdapter := activityacl.NewFleetServiceAdapter(svc)
	activitySvc, routesFn := activity_bootstrap.New(
		dbConns,
		authz.NewAuthorizerAdapter(authorizer),
		aclAdapter,
		logger,
	)
	// Every activity route requires an authenticated Fleet user.
	authMiddleware := func(next endpoint.Endpoint) endpoint.Endpoint {
		return auth.AuthenticatedUser(svc, next)
	}
	return activitySvc, routesFn(authMiddleware)
}
// printDatabaseNotInitializedError prints a banner telling the operator that
// the Fleet database has no schema yet and how to initialize it.
func printDatabaseNotInitializedError() {
	const banner = "################################################################################\n" +
		"# ERROR:\n" +
		"#   Your Fleet database is not initialized. Fleet cannot start up.\n" +
		"#\n" +
		"#   Run `%s prepare db` to initialize the database.\n" +
		"################################################################################\n"
	fmt.Printf(banner, os.Args[0])
}
// printMissingMigrationsWarning prints a banner listing database migrations
// (table and data) that have not yet been applied, with instructions either
// to run them or to explicitly allow the server to start without them.
func printMissingMigrationsWarning(tables []int64, data []int64) {
	const banner = "################################################################################\n" +
		"# WARNING:\n" +
		"#   Your Fleet database is missing required migrations. This is likely to cause\n" +
		"#   errors in Fleet.\n" +
		"#\n" +
		"#   Missing migrations: %s.\n" +
		"#\n" +
		"#   Run `%s prepare db` to perform migrations.\n" +
		"#\n" +
		"#   To run the server without performing migrations:\n" +
		"#     - Set environment variable FLEET_UPGRADES_ALLOW_MISSING_MIGRATIONS=1, or,\n" +
		"#     - Set config updates.allow_missing_migrations to true, or,\n" +
		"#     - Use command line argument --upgrades_allow_missing_migrations=true\n" +
		"################################################################################\n"
	fmt.Printf(banner, tablesAndDataToString(tables, data), os.Args[0])
}
// printFleetv4732FixNeededMessage prints a banner warning that the database
// contains misnumbered migrations shipped in some v4.73.2 artifacts, which
// Fleet fixes automatically before running migrations; it advises a backup
// first and explains how to skip migrations on startup.
func printFleetv4732FixNeededMessage() {
	const banner = "################################################################################\n" +
		"# WARNING:\n" +
		"#   Your Fleet database has misnumbered migrations introduced in some released\n" +
		"#   v4.73.2 artifacts. Fleet will automatically perform this fix prior to database\n" +
		"#   migrations. Please back up your data before continuing.\n" +
		"#\n" +
		"#   Run `%s prepare db` to perform migrations.\n" +
		"#\n" +
		"#   To run the server without performing migrations:\n" +
		"#     - Set environment variable FLEET_UPGRADES_ALLOW_MISSING_MIGRATIONS=1, or,\n" +
		"#     - Set config updates.allow_missing_migrations to true, or,\n" +
		"#     - Use command line argument --upgrades_allow_missing_migrations=true\n" +
		"################################################################################\n"
	fmt.Printf(banner, os.Args[0])
}
// initLicense loads and validates the Fleet license from config. The
// devLicense and devExpiredLicense flags override the configured key with
// built-in development-only keys (a valid premium key and an expired basic
// key, respectively) before parsing.
func initLicense(config *configpkg.FleetConfig, devLicense, devExpiredLicense bool) (*fleet.LicenseInfo, error) {
	switch {
	case devLicense:
		// This license key is valid for development only
		config.License.Key = "eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJGbGVldCBEZXZpY2UgTWFuYWdlbWVudCBJbmMuIiwiZXhwIjoxNzgyNzc3NjAwLCJzdWIiOiJGbGVldCBEZXZpY2UgTWFuYWdlbWVudCwgSW5jLiBEZXZlbG9wZXIiLCJkZXZpY2VzIjoxMDAwLCJub3RlIjoiQ3JlYXRlZCB3aXRoIEZsZWV0IExpY2Vuc2Uga2V5IGRpc3BlbnNlciIsInRpZXIiOiJwcmVtaXVtIiwiaWF0IjoxNzY3MjAzODg2fQ.X9O3CXJOzIfgkzlXgL45iBaSvAbZyQn4UjcvH_gEXJGIQw0xMW4r3tJBSEuUqQXoaQnADVR1Oocfp6j_hMZX0A"
	case devExpiredLicense:
		// An expired license key
		config.License.Key = "eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJGbGVldCBEZXZpY2UgTWFuYWdlbWVudCBJbmMuIiwiZXhwIjoxNjI5NzYzMjAwLCJzdWIiOiJEZXYgbGljZW5zZSAoZXhwaXJlZCkiLCJkZXZpY2VzIjo1MDAwMDAsIm5vdGUiOiJUaGlzIGxpY2Vuc2UgaXMgdXNlZCB0byBmb3IgZGV2ZWxvcG1lbnQgcHVycG9zZXMuIiwidGllciI6ImJhc2ljIiwiaWF0IjoxNjI5OTA0NzMyfQ.AOppRkl1Mlc_dYKH9zwRqaTcL0_bQzs7RM3WSmxd3PeCH9CxJREfXma8gm0Iand6uIWw8gHq5Dn0Ivtv80xKvQ"
	}
	return licensing.LoadLicense(config.License.Key)
}
// basicAuthHandler wraps the given handler behind HTTP Basic Auth.
func basicAuthHandler(username, password string, next http.Handler) http.HandlerFunc {
hashFn := func(s string) []byte {
h := sha256.Sum256([]byte(s))
return h[:]
}
expectedUsernameHash := hashFn(username)
expectedPasswordHash := hashFn(password)
return func(w http.ResponseWriter, r *http.Request) {
recvUsername, recvPassword, ok := r.BasicAuth()
if ok {
usernameMatch := subtle.ConstantTimeCompare(hashFn(recvUsername), expectedUsernameHash) == 1
passwordMatch := subtle.ConstantTimeCompare(hashFn(recvPassword), expectedPasswordHash) == 1
if usernameMatch && passwordMatch {
next.ServeHTTP(w, r)
return
}
}
w.Header().Set("WWW-Authenticate", `Basic realm="restricted", charset="UTF-8"`)
http.Error(w, "Unauthorized", http.StatusUnauthorized)
}
}
// getTLSConfig returns the tls.Config for the given Mozilla server-side TLS
// compatibility profile ("modern" or "intermediate", as supplied via the
// server_tls_compatibility flag); any other value is fatal.
// See https://wiki.mozilla.org/index.php?title=Security/Server_Side_TLS&oldid=1229478
func getTLSConfig(profile string) *tls.Config {
	cfg := tls.Config{
		PreferServerCipherSuites: true,
	}
	switch profile {
	case configpkg.TLSProfileModern:
		cfg.MinVersion = tls.VersionTLS13
		cfg.CurvePreferences = []tls.CurveID{
			tls.X25519,
			tls.CurveP256,
			tls.CurveP384,
		}
		cfg.CipherSuites = []uint16{
			tls.TLS_AES_128_GCM_SHA256,
			tls.TLS_AES_256_GCM_SHA384,
			tls.TLS_CHACHA20_POLY1305_SHA256,
			// These cipher suites not explicitly listed by Mozilla, but
			// required by Go's HTTP/2 implementation
			// See: https://go-review.googlesource.com/c/net/+/200317/
			tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
			tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
		}
	case configpkg.TLSProfileIntermediate:
		cfg.MinVersion = tls.VersionTLS12
		cfg.CurvePreferences = []tls.CurveID{
			tls.X25519,
			tls.CurveP256,
			tls.CurveP384,
		}
		cfg.CipherSuites = []uint16{
			tls.TLS_AES_128_GCM_SHA256,
			tls.TLS_AES_256_GCM_SHA384,
			tls.TLS_CHACHA20_POLY1305_SHA256,
			tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
			tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
			tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
			tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
			tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
			tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
		}
	default:
		initFatal(
			fmt.Errorf("%s is invalid", profile),
			"set TLS profile",
		)
	}
	return &cfg
}
// devSQLInterceptor is a sql interceptor to be used for development purposes.
// It logs every intercepted query/exec statement (with its arguments and
// duration) through the embedded slog logger; hooks not overridden here fall
// through to the no-op NullInterceptor.
type devSQLInterceptor struct {
	sqlmw.NullInterceptor
	// logger receives one entry per statement: debug level on success, error
	// level on failure (see logQuery).
	logger *slog.Logger
}
// ConnQueryContext intercepts a query executed directly on the connection,
// timing it and logging query, args, and outcome before returning the
// driver's result unchanged.
func (in *devSQLInterceptor) ConnQueryContext(ctx context.Context, conn driver.QueryerContext, query string, args []driver.NamedValue) (driver.Rows, error) {
	began := time.Now()
	rows, queryErr := conn.QueryContext(ctx, query, args)
	in.logQuery(ctx, began, query, args, queryErr)
	return rows, queryErr
}
// StmtQueryContext intercepts a query executed through a prepared statement,
// timing it and logging query, args, and outcome before returning the
// driver's result unchanged.
func (in *devSQLInterceptor) StmtQueryContext(ctx context.Context, stmt driver.StmtQueryContext, query string, args []driver.NamedValue) (driver.Rows, error) {
	began := time.Now()
	rows, queryErr := stmt.QueryContext(ctx, args)
	in.logQuery(ctx, began, query, args, queryErr)
	return rows, queryErr
}
// StmtExecContext intercepts an exec run through a prepared statement, timing
// it and logging query, args, and outcome before returning the driver's
// result unchanged.
func (in *devSQLInterceptor) StmtExecContext(ctx context.Context, stmt driver.StmtExecContext, query string, args []driver.NamedValue) (driver.Result, error) {
	began := time.Now()
	res, execErr := stmt.ExecContext(ctx, args)
	in.logQuery(ctx, began, query, args, execErr)
	return res, execErr
}
// spaceRegex matches any run of whitespace; logQuery uses it to collapse
// multi-line SQL into a single-line log entry.
var spaceRegex = regexp.MustCompile(`\s+`)

// logQuery emits one log entry for a completed statement: the
// whitespace-normalized query text, its arguments, and its duration — at
// debug level on success and error level (with the error attached) on
// failure.
func (in *devSQLInterceptor) logQuery(ctx context.Context, start time.Time, query string, args []driver.NamedValue, err error) {
	normalized := strings.TrimSpace(spaceRegex.ReplaceAllString(query, " "))
	if err == nil {
		in.logger.DebugContext(ctx, "sql query", "duration", time.Since(start), "query", normalized, "args", argsToString(args))
		return
	}
	in.logger.ErrorContext(ctx, "sql query", "duration", time.Since(start), "query", normalized, "args", argsToString(args), "err", err)
}
func argsToString(args []driver.NamedValue) string {
var allArgs strings.Builder
allArgs.WriteString("{")
for i, arg := range args {
if i > 0 {
allArgs.WriteString(", ")
}
if arg.Name != "" {
allArgs.WriteString(fmt.Sprintf("%s=", arg.Name))
}
allArgs.WriteString(fmt.Sprintf("%v", arg.Value))
}
allArgs.WriteString("}")
return allArgs.String()
}
// The debugMux directs the request to either the fleet-authenticated handler,
// which is the standard handler for debug endpoints (using a Fleet
// authorization bearer token), or to the token-authenticated handler if a
// query-string token is provided and such a handler is set. The only wayt to
// set this handler is if the --debug flag was provided to the fleet serve
// command.
type debugMux struct {
fleetAuthenticatedHandler http.Handler
tokenAuthenticatedHandler http.Handler
}
func (m *debugMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.URL.Query().Has("token") && m.tokenAuthenticatedHandler != nil {
m.tokenAuthenticatedHandler.ServeHTTP(w, r)
return
}
m.fleetAuthenticatedHandler.ServeHTTP(w, r)
}
// nopPusher is a no-op push.Pusher.
type nopPusher struct{}

// Compile-time check that nopPusher satisfies push.Pusher.
var _ push.Pusher = nopPusher{}

// Push implements push.Pusher. It never contacts APNs and reports no
// responses and no error.
func (nopPusher) Push(context.Context, []string) (map[string]*push.Response, error) {
	return nil, nil
}
// createTestBuckets provisions the local/dev S3 buckets for software
// installers and carves. A failure to construct a store is fatal (bad
// configuration), but a failure to create a bucket is only logged so
// developers can run Fleet without an S3 dependency.
func createTestBuckets(ctx context.Context, config *configpkg.FleetConfig, logger *slog.Logger) {
	// ensureBucket logs (rather than fails) when bucket creation doesn't
	// work; Fleet must remain usable without S3 in development.
	ensureBucket := func(store interface {
		CreateTestBucket(ctx context.Context, name string) error
	}, name, msg string) {
		if err := store.CreateTestBucket(ctx, name); err != nil {
			logger.InfoContext(ctx, msg,
				"err", err,
				"name", name,
			)
		}
	}

	installerStore, err := s3.NewSoftwareInstallerStore(config.S3)
	if err != nil {
		initFatal(err, "initializing S3 software installer store")
	}
	ensureBucket(installerStore, config.S3.SoftwareInstallersBucket, "failed to create test software installer bucket")

	carveStore, err := s3.NewCarveStore(config.S3, nil)
	if err != nil {
		initFatal(err, "initializing S3 carve store")
	}
	ensureBucket(carveStore, config.S3.CarvesBucket, "failed to create test carve bucket")
}