From e1bbcfcfdadc5ec6cbdc8b906776a762ef67d1e6 Mon Sep 17 00:00:00 2001 From: Lucas Manuel Rodriguez Date: Fri, 23 Dec 2022 19:04:13 -0300 Subject: [PATCH] Generate audit logs for activities (#9001) * Generate audit logs for activities * Fix config tests * Fix TestGetConfig/IncludeServerConfig * Fix use of AddAttributes in results only * Stream activities asynchronously * Fix index and add logging * Revert change * Documentation fixes --- changes/issue-8416-stream-activities | 1 + cmd/fleet/cron.go | 99 ++++++++ cmd/fleet/serve.go | 96 +++++++- cmd/fleet/serve_test.go | 147 ++++++++++++ cmd/fleetctl/get_test.go | 26 ++- .../Testing-and-local-development.md | 48 ++-- docs/Deploying/Configuration.md | 217 +++++++++++++++--- docs/Using-Fleet/Log-destinations.md | 39 ++-- ee/server/service/service.go | 2 +- ee/server/service/teams.go | 11 +- ee/server/service/users.go | 8 +- server/config/config.go | 88 +++++-- server/datastore/mysql/activities.go | 42 +++- server/datastore/mysql/activities_test.go | 93 ++++++-- ...220195935_AddStreamedColumnToActivities.go | 29 +++ server/datastore/mysql/schema.sql | 6 +- server/fleet/activities.go | 1 + server/fleet/app.go | 11 + server/fleet/cron_schedules.go | 1 + server/fleet/datastore.go | 3 +- server/fleet/service.go | 2 +- server/logging/filesystem.go | 11 +- server/logging/logging.go | 211 ++++++++--------- server/mock/datastore_mock.go | 14 +- server/service/activities.go | 6 +- server/service/activities_test.go | 10 +- server/service/appconfig.go | 2 +- server/service/campaigns.go | 2 +- server/service/global_policies.go | 11 +- server/service/integration_core_test.go | 2 +- server/service/osquery_test.go | 5 +- server/service/packs.go | 25 +- server/service/queries.go | 33 +-- server/service/service.go | 17 +- server/service/service_appconfig.go | 185 +++++++-------- server/service/service_appconfig_test.go | 33 ++- server/service/team_policies.go | 8 +- server/service/testing_utils.go | 18 +- 38 files changed, 1175 insertions(+), 388 deletions(-) create mode 100644 changes/issue-8416-stream-activities create mode 100644 server/datastore/mysql/migrations/tables/20221220195935_AddStreamedColumnToActivities.go diff --git a/changes/issue-8416-stream-activities b/changes/issue-8416-stream-activities new file mode 100644 index 0000000000..a955f816dc --- /dev/null +++ b/changes/issue-8416-stream-activities @@ -0,0 +1 @@ +* Generate audit log for activities (supported log plugins are: `filesystem`, `firehose`, `kinesis`, `lambda`, `pubsub`, `kafkarest`, and `stdout`). diff --git a/cmd/fleet/cron.go b/cmd/fleet/cron.go index a57c1b2215..7582012628 100644 --- a/cmd/fleet/cron.go +++ b/cmd/fleet/cron.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "fmt" "net/url" "os" @@ -19,6 +20,7 @@ import ( "github.com/fleetdm/fleet/v4/server/fleet" apple_mdm "github.com/fleetdm/fleet/v4/server/mdm/apple" "github.com/fleetdm/fleet/v4/server/policies" + "github.com/fleetdm/fleet/v4/server/ptr" "github.com/fleetdm/fleet/v4/server/service/externalsvc" "github.com/fleetdm/fleet/v4/server/service/schedule" "github.com/fleetdm/fleet/v4/server/vulnerabilities/msrc" @@ -30,6 +32,7 @@ import ( "github.com/getsentry/sentry-go" kitlog "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/hashicorp/go-multierror" "github.com/micromdm/nanodep/godep" nanodep_log "github.com/micromdm/nanodep/log" depsync "github.com/micromdm/nanodep/sync" @@ -899,3 +902,99 @@ func cleanupCronStatsOnShutdown(ctx context.Context, ds fleet.Datastore, logger logger.Log("err", "cancel pending cron stats for instance", "details", err) } } + +func newActivitiesStreamingSchedule( + ctx context.Context, + instanceID string, + ds fleet.Datastore, + logger kitlog.Logger, + auditLogger fleet.JSONLogger, +) (*schedule.Schedule, error) { + const ( + name = string(fleet.CronActivitiesStreaming) + interval = 5 * time.Minute + ) + logger = kitlog.With(logger, "cron", name) + s := schedule.New( + ctx, name, instanceID, interval, ds, ds, + schedule.WithLogger(logger), + schedule.WithJob( + "cron_activities_streaming", + func(ctx context.Context) error { + return cronActivitiesStreaming(ctx, ds, logger, auditLogger) + }, + ), + ) + return s, nil +} + +var ActivitiesToStreamBatchCount uint = 500 + +func cronActivitiesStreaming( + ctx context.Context, + ds fleet.Datastore, + logger kitlog.Logger, + auditLogger fleet.JSONLogger, +) error { + page := uint(0) + for { + // (1) Get batch of activities that haven't been streamed. + activitiesToStream, err := ds.ListActivities(ctx, fleet.ListActivitiesOptions{ + ListOptions: fleet.ListOptions{ + OrderKey: "id", + OrderDirection: fleet.OrderAscending, + PerPage: ActivitiesToStreamBatchCount, + Page: page, + }, + Streamed: ptr.Bool(false), + }) + if err != nil { + return ctxerr.Wrap(ctx, err, "list activities") + } + if len(activitiesToStream) == 0 { + return nil + } + + // (2) Stream the activities. + var ( + streamedIDs []uint + multiErr error + ) + // We stream one activity at a time (instead of writing them all with + // one auditLogger.Write call) to know which ones succeeded/failed, + // and also because this method happens asynchronously, + // so we don't need real-time performance. + for _, activity := range activitiesToStream { + b, err := json.Marshal(activity) + if err != nil { + return ctxerr.Wrap(ctx, err, "marshal activity") + } + if err := auditLogger.Write(ctx, []json.RawMessage{json.RawMessage(b)}); err != nil { + if len(streamedIDs) == 0 { + return ctxerr.Wrapf(ctx, err, "stream first activity: %d", activity.ID) + } + multiErr = multierror.Append(multiErr, ctxerr.Wrapf(ctx, err, "stream activity: %d", activity.ID)) + // We stop streaming upon the first error (will retry on next cron iteration) + break + } + streamedIDs = append(streamedIDs, activity.ID) + } + + logger.Log("streamed-events", len(streamedIDs)) + + // (3) Mark the streamed activities as streamed. + if err := ds.MarkActivitiesAsStreamed(ctx, streamedIDs); err != nil { + multiErr = multierror.Append(multiErr, ctxerr.Wrap(ctx, err, "mark activities as streamed")) + } + + // If there was an error while streaming or updating activities, return. + if multiErr != nil { + return multiErr + } + + if len(activitiesToStream) < int(ActivitiesToStreamBatchCount) { + return nil + } + page += 1 + } +} diff --git a/cmd/fleet/serve.go b/cmd/fleet/serve.go index 6164574d8a..c25cf0d308 100644 --- a/cmd/fleet/serve.go +++ b/cmd/fleet/serve.go @@ -353,9 +353,88 @@ the way that the Fleet server works. liveQueryStore := live_query.NewRedisLiveQuery(redisPool) ssoSessionStore := sso.NewSessionStore(redisPool) - osqueryLogger, err := logging.New(config, logger) + // Set common configuration for all logging. + loggingConfig := logging.Config{ + Filesystem: logging.FilesystemConfig{ + EnableLogRotation: config.Filesystem.EnableLogRotation, + EnableLogCompression: config.Filesystem.EnableLogCompression, + }, + Firehose: logging.FirehoseConfig{ + Region: config.Firehose.Region, + EndpointURL: config.Firehose.EndpointURL, + AccessKeyID: config.Firehose.AccessKeyID, + SecretAccessKey: config.Firehose.SecretAccessKey, + StsAssumeRoleArn: config.Firehose.StsAssumeRoleArn, + }, + Kinesis: logging.KinesisConfig{ + Region: config.Kinesis.Region, + EndpointURL: config.Kinesis.EndpointURL, + AccessKeyID: config.Kinesis.AccessKeyID, + SecretAccessKey: config.Kinesis.SecretAccessKey, + StsAssumeRoleArn: config.Kinesis.StsAssumeRoleArn, + }, + Lambda: logging.LambdaConfig{ + Region: config.Lambda.Region, + AccessKeyID: config.Lambda.AccessKeyID, + SecretAccessKey: config.Lambda.SecretAccessKey, + StsAssumeRoleArn: config.Lambda.StsAssumeRoleArn, + }, + PubSub: logging.PubSubConfig{ + Project: config.PubSub.Project, + }, + KafkaREST: logging.KafkaRESTConfig{ + ProxyHost: config.KafkaREST.ProxyHost, + ContentTypeValue: config.KafkaREST.ContentTypeValue, + Timeout: config.KafkaREST.Timeout, + }, + } + + // Set specific configuration to osqueryd status logs. + loggingConfig.Plugin = config.Osquery.StatusLogPlugin + loggingConfig.Filesystem.LogFile = config.Filesystem.StatusLogFile + loggingConfig.Firehose.StreamName = config.Firehose.StatusStream + loggingConfig.Kinesis.StreamName = config.Kinesis.StatusStream + loggingConfig.Lambda.Function = config.Lambda.StatusFunction + loggingConfig.PubSub.Topic = config.PubSub.StatusTopic + loggingConfig.PubSub.AddAttributes = false // only used by result logs + loggingConfig.KafkaREST.Topic = config.KafkaREST.StatusTopic + + osquerydStatusLogger, err := logging.NewJSONLogger("status", loggingConfig, logger) if err != nil { - initFatal(err, "initializing osquery logging") + initFatal(err, "initializing osqueryd status logging") + } + + // Set specific configuration to osqueryd result logs. + loggingConfig.Plugin = config.Osquery.ResultLogPlugin + loggingConfig.Filesystem.LogFile = config.Filesystem.ResultLogFile + loggingConfig.Firehose.StreamName = config.Firehose.ResultStream + loggingConfig.Kinesis.StreamName = config.Kinesis.ResultStream + loggingConfig.Lambda.Function = config.Lambda.ResultFunction + loggingConfig.PubSub.Topic = config.PubSub.ResultTopic + loggingConfig.PubSub.AddAttributes = config.PubSub.AddAttributes + loggingConfig.KafkaREST.Topic = config.KafkaREST.ResultTopic + + osquerydResultLogger, err := logging.NewJSONLogger("result", loggingConfig, logger) + if err != nil { + initFatal(err, "initializing osqueryd result logging") + } + + var auditLogger fleet.JSONLogger + if license.IsPremium() && config.Activity.EnableAuditLog { + // Set specific configuration to audit logs. + loggingConfig.Plugin = config.Activity.AuditLogPlugin + loggingConfig.Filesystem.LogFile = config.Filesystem.AuditLogFile + loggingConfig.Firehose.StreamName = config.Firehose.AuditStream + loggingConfig.Kinesis.StreamName = config.Kinesis.AuditStream + loggingConfig.Lambda.Function = config.Lambda.AuditFunction + loggingConfig.PubSub.Topic = config.PubSub.AuditTopic + loggingConfig.PubSub.AddAttributes = false // only used by result logs + loggingConfig.KafkaREST.Topic = config.KafkaREST.AuditTopic + + auditLogger, err = logging.NewJSONLogger("audit", loggingConfig, logger) + if err != nil { + initFatal(err, "initializing audit logging") + } } failingPolicySet := redis_policy_set.NewFailing(redisPool) @@ -492,7 +571,10 @@ the way that the Fleet server works. task, resultStore, logger, - osqueryLogger, + &service.OsqueryLogger{ + Status: osquerydStatusLogger, + Result: osquerydResultLogger, + }, config, mailService, clock.C, @@ -563,6 +645,14 @@ the way that the Fleet server works. } } + if license.IsPremium() && config.Activity.EnableAuditLog { + if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) { + return newActivitiesStreamingSchedule(ctx, instanceID, ds, logger, auditLogger) + }); err != nil { + initFatal(err, "failed to register activities streaming schedule") + } + } + level.Info(logger).Log("msg", fmt.Sprintf("started cron schedules: %s", strings.Join(cronSchedules.ScheduleNames(), ", "))) // StartCollectors starts a goroutine per collector, using ctx to cancel. diff --git a/cmd/fleet/serve_test.go b/cmd/fleet/serve_test.go index 0ef30c1bba..0799b85abd 100644 --- a/cmd/fleet/serve_test.go +++ b/cmd/fleet/serve_test.go @@ -25,6 +25,7 @@ import ( kitlog "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/go-kit/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -825,3 +826,149 @@ func TestDebugMux(t *testing.T) { }) } } + +func TestCronActivitiesStreaming(t *testing.T) { + ds := new(mock.Store) + + newActivity := func( + id uint, + actorName string, + actorID uint, + actorGravatar, actorEmail, actType string, + details string, + ) *fleet.Activity { + jsonRawMessage := json.RawMessage(details) + return &fleet.Activity{ + ID: id, + ActorFullName: actorName, + ActorID: &actorID, + ActorGravatar: &actorGravatar, + ActorEmail: &actorEmail, + Type: actType, + Details: &jsonRawMessage, + } + } + + a1 := newActivity(1, "foo1", 7, "foo1_gravatar", "foo1_email", "foobar1", `{"foo1":"bar1"}`) + a2 := newActivity(2, "foo2", 8, "foo2_gravatar", "foo2_email", "foobar2", `{"foo2":"bar2"}`) + a3 := newActivity(3, "foo3", 9, "foo3_gravatar", "foo3_email", "foobar3", `{"foo3":"bar3"}`) + + t.Run("basic", func(t *testing.T) { + as := []*fleet.Activity{a1, a2, a3} + + ds.ListActivitiesFunc = func(ctx context.Context, opt fleet.ListActivitiesOptions) ([]*fleet.Activity, error) { + return as, nil + } + + ds.MarkActivitiesAsStreamedFunc = func(ctx context.Context, activityIDs []uint) error { + require.Equal(t, []uint{1, 2, 3}, activityIDs) + return nil + } + + var auditLogger jsonLogger + err := cronActivitiesStreaming(context.Background(), ds, log.NewNopLogger(), &auditLogger) + require.NoError(t, err) + require.Len(t, auditLogger.logs, 3) + for i, m := range auditLogger.logs { + var a *fleet.Activity + err := json.Unmarshal([]byte(m), &a) + require.NoError(t, err) + require.Equal(t, as[i], a) + } + }) + + t.Run("fail_to_stream_an_activity", func(t *testing.T) { + as := []*fleet.Activity{a1, a2, a3} + + ds.ListActivitiesFunc = func(ctx context.Context, opt fleet.ListActivitiesOptions) ([]*fleet.Activity, error) { + return as, nil + } + + ds.MarkActivitiesAsStreamedFunc = func(ctx context.Context, activityIDs []uint) error { + require.Equal(t, []uint{1}, activityIDs) + return nil + } + + auditLogger := jsonLogger{failAfter: 1} + err := cronActivitiesStreaming(context.Background(), ds, log.NewNopLogger(), &auditLogger) + require.Error(t, err) + require.ErrorIs(t, err, errStreamFailed) + require.Len(t, auditLogger.logs, 1) + var a *fleet.Activity + err = json.Unmarshal([]byte(auditLogger.logs[0]), &a) + require.NoError(t, err) + require.Equal(t, a1, a) + }) + + t.Run("bigger_than_batch", func(t *testing.T) { + // Make slice that will require three iterations (3 pages, + // two pages of ActivitiesToStreamBatchCount and one extra page of one item. + as := make([]*fleet.Activity, ActivitiesToStreamBatchCount*2+1) + for i := range as { + as[i] = newActivity(uint(i), "foo", uint(i), "foog", "fooe", "bar", `{"bar": "foo"}`) + } + + ds.ListActivitiesFunc = func(ctx context.Context, opt fleet.ListActivitiesOptions) ([]*fleet.Activity, error) { + require.Equal(t, opt.PerPage, ActivitiesToStreamBatchCount) + switch opt.Page { + case 0: + return as[:ActivitiesToStreamBatchCount], nil + case 1: + return as[ActivitiesToStreamBatchCount : ActivitiesToStreamBatchCount*2], nil + case 2: + return as[ActivitiesToStreamBatchCount*2:], nil + default: + t.Fatalf("invalid page requested: %d", opt.Page) + return nil, nil + } + } + + call := 0 + firstBatch := make([]uint, ActivitiesToStreamBatchCount) + secondBatch := make([]uint, ActivitiesToStreamBatchCount) + for i := range as[:ActivitiesToStreamBatchCount] { + firstBatch[i] = as[i].ID + } + for i := range as[ActivitiesToStreamBatchCount : ActivitiesToStreamBatchCount*2] { + secondBatch[i] = as[int(ActivitiesToStreamBatchCount)+i].ID + } + thirdBatch := []uint{as[len(as)-1].ID} + ds.MarkActivitiesAsStreamedFunc = func(ctx context.Context, activityIDs []uint) error { + switch call { + case 0: + require.Equal(t, firstBatch, activityIDs) + case 1: + require.Equal(t, secondBatch, activityIDs) + case 2: + require.Equal(t, thirdBatch, activityIDs) + default: + t.Fatalf("invalid number of calls: %d", call) + } + call += 1 + return nil + } + + var auditLogger jsonLogger + err := cronActivitiesStreaming(context.Background(), ds, log.NewNopLogger(), &auditLogger) + require.NoError(t, err) + require.Len(t, auditLogger.logs, int(ActivitiesToStreamBatchCount)*2+1) + require.Equal(t, 3, call) + }) +} + +var errStreamFailed = errors.New("streaming failed") + +type jsonLogger struct { + logs []string + failAfter int +} + +func (j *jsonLogger) Write(ctx context.Context, logs []json.RawMessage) error { + for _, log := range logs { + if j.failAfter > 0 && len(j.logs) == j.failAfter { + return errStreamFailed + } + j.logs = append(j.logs, string(log)) + } + return nil +} diff --git a/cmd/fleetctl/get_test.go b/cmd/fleetctl/get_test.go index 6f63dd93f6..8a34f05383 100644 --- a/cmd/fleetctl/get_test.go +++ b/cmd/fleetctl/get_test.go @@ -630,6 +630,7 @@ spec: enable_log_rotation: false result_log_file: /dev/null status_log_file: /dev/null + audit_log_file: /dev/null plugin: filesystem status: config: @@ -637,6 +638,15 @@ spec: enable_log_rotation: false result_log_file: /dev/null status_log_file: /dev/null + audit_log_file: /dev/null + plugin: filesystem + audit: + config: + enable_log_compression: false + enable_log_rotation: false + result_log_file: /dev/null + status_log_file: /dev/null + audit_log_file: /dev/null plugin: filesystem org_info: org_logo_url: "" @@ -809,7 +819,8 @@ spec: "enable_log_compression": false, "enable_log_rotation": false, "result_log_file": "/dev/null", - "status_log_file": "/dev/null" + "status_log_file": "/dev/null", + "audit_log_file": "/dev/null" } }, "status": { @@ -818,7 +829,18 @@ spec: "enable_log_compression": false, "enable_log_rotation": false, "result_log_file": "/dev/null", - "status_log_file": "/dev/null" + "status_log_file": "/dev/null", + "audit_log_file": "/dev/null" + } + }, + "audit": { + "plugin": "filesystem", + "config": { + "enable_log_compression": false, + "enable_log_rotation": false, + "result_log_file": "/dev/null", + "status_log_file": "/dev/null", + "audit_log_file": "/dev/null" } } } diff --git a/docs/Contributing/Testing-and-local-development.md b/docs/Contributing/Testing-and-local-development.md index 0856538fd1..3b5c7bba5c 100644 --- a/docs/Contributing/Testing-and-local-development.md +++ b/docs/Contributing/Testing-and-local-development.md @@ -1,19 +1,33 @@ # Testing and local development -- [License key](#license-key) -- [Simulated hosts](#hosts) -- [Test suite](#test-suite) -- [End-to-end tests](#end-to-end-tests) -- [Test hosts](#test-hosts) -- [Email](#email) -- [Database backup/restore](#database-backuprestore) -- [Seeding Data](https://fleetdm.com/docs/contributing/seeding-data) -- [MySQL shell](#mysql-shell) -- [Redis REPL](#redis-repl) -- [Testing SSO](#testing-sso) -- [Testing Kinesis Logging](#testing-kinesis-logging) -- [Testing pre-built installers](#testing-pre-built-installers) -- [Telemetry](#telemetry) +- [Testing and local development](#testing-and-local-development) + - [License key](#license-key) + - [Simulated hosts](#simulated-hosts) + - [Test suite](#test-suite) + - [Go unit tests](#go-unit-tests) + - [Go linters](#go-linters) + - [Javascript unit and integration tests](#javascript-unit-and-integration-tests) + - [Javascript linters](#javascript-linters) + - [MySQL tests](#mysql-tests) + - [Email tests](#email-tests) + - [Network tests](#network-tests) + - [Viewing test coverage](#viewing-test-coverage) + - [End-to-end tests](#end-to-end-tests) + - [Preparation](#preparation) + - [Run tests](#run-tests) + - [Interactive](#interactive) + - [Command line](#command-line) + - [Test hosts](#test-hosts) + - [Email](#email) + - [Manually testing email with MailHog](#manually-testing-email-with-mailhog) + - [Development database management](#development-database-management) + - [MySQL shell](#mysql-shell) + - [Redis REPL](#redis-repl) + - [Testing SSO](#testing-sso) + - [Configuration](#configuration) + - [Testing Kinesis Logging](#testing-kinesis-logging) + - [Testing pre-built installers](#testing-pre-built-installers) + - [Telemetry](#telemetry) ## License key @@ -404,6 +418,12 @@ awslocal kinesis get-records --shard-iterator AAAAAAAAAAERtiUrWGI0sq99TtpKnmDu6h [...] ``` +The `Data` field is base64 encoded. You can use the following command to decode: +``` +echo eyJob3N0SWRlbnRpZmllciI6Ijg3OGE2ZWRmLTcxMzEtNGUyOC05NWEyLWQzNDQ5MDVjYWNhYiIsImNhbGVuZGFyVGltZSI6IldlZCBNYXIgIDIgMjI6MDI6NTQgMjAyMiBVVEMiLCJ1bml4VGltZSI6IjE2NDYyNTg1NzQiLCJzZXZlcml0eSI6IjAiLCJmaWxlbmFtZSI6Imdsb2dfbG9nZ2VyLmNwcCIsImxpbmUiOiI0OSIsIm1lc3NhZ2UiOiJDb3VsZCBub3QgZ2V0IFJQTSBoZWFkZXIgZmxhZy4iLCJ2ZXJzaW9uIjoiNC45LjAiLCJkZWNvcmF0aW9ucyI6eyJob3N0X3V1aWQiOiJlYjM5NDZiMi0wMDAwLTAwMDAtYjg4OC0yNTkxYTFiNjY2ZTkiLCJob3N0bmFtZSI6ImUwMDg4ZDI4YTYzZiJ9fQo= | base64 -d +{"hostIdentifier":"878a6edf-7131-4e28-95a2-d344905cacab","calendarTime":"Wed Mar 2 22:02:54 2022 UTC","unixTime":"1646258574","severity":"0","filename":"glog_logger.cpp","line":"49","message":"Could not get RPM header flag.","version":"4.9.0","decorations":{"host_uuid":"eb3946b2-0000-0000-b888-2591a1b666e9","hostname":"e0088d28a63f"}} +``` + ## Testing pre-built installers Pre-built installers are kept in a blob storage like AWS S3. As part of your your local development there's a [MinIO](https://min.io/) instance running on http://localhost:9000. To test the pre-built installers functionality locally: diff --git a/docs/Deploying/Configuration.md b/docs/Deploying/Configuration.md index 941c55a26d..40bb458e4d 100644 --- a/docs/Deploying/Configuration.md +++ b/docs/Deploying/Configuration.md @@ -1179,6 +1179,36 @@ osquery: result_log_plugin: firehose ``` +#### activity_enable_audit_log + +This enables/disables the log output for audit events. +See the `activity_audit_log_plugin` option below that specifies the logging destination. + +The audit events are logged in an asynchronous fashion. It can take up to 5 minutes for an event to be logged. + +- Default value: `false` +- Environment variable: `FLEET_ACTIVITY_ENABLE_AUDIT_LOG` +- Config file format: + ```yaml + activity: + enable_audit_log: true + ``` + +#### activity_audit_log_plugin + +This is the log output plugin that should be used for audit logs. +This flag only has effect if `activity_enable_audit_log` is set to `true`. + +Options are `filesystem`, `firehose`, `kinesis`, `lambda`, `pubsub`, `kafkarest`, and `stdout`. + +- Default value: `filesystem` +- Environment variable: `FLEET_ACTIVITY_AUDIT_LOG_PLUGIN` +- Config file format: + ```yaml + activity: + audit_log_plugin: firehose + ``` + #### Logging (Fleet server logging) ##### logging_debug @@ -1269,9 +1299,25 @@ The path which osquery result logs will be logged to. result_log_file: /var/log/osquery/result.log ``` +##### filesystem_audit_log_file + +This flag only has effect if `activity_audit_log_plugin` is set to `filesystem` (the default value) and if `activity_enable_audit_log` is set to `true`. + +The path which audit logs will be logged to. + +- Default value: `/tmp/audit` +- Environment variable: `FLEET_FILESYSTEM_AUDIT_LOG_FILE` +- Config file format: + ```yaml + filesystem: + audit_log_file: /var/log/fleet/audit.log + ``` + ##### filesystem_enable_log_rotation -This flag only has effect if `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `filesystem` (the default value). +This flag only has effect if one of the following is true: +- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `filesystem` (the default value). +- `activity_audit_log_plugin` is set to `filesystem` and `activity_enable_audit_log` is set to `true`. This flag will cause the osquery result and status log files to be automatically rotated when files reach a size of 500 Mb or an age of 28 days. @@ -1314,9 +1360,11 @@ filesystem: ##### firehose_region -This flag only has effect if `osquery_status_log_plugin` is set to `firehose`. +This flag only has effect if one of the following is true: +- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `firehose`. +- `activity_audit_log_plugin` is set to `firehose` and `activity_enable_audit_log` is set to `true`. -AWS region to use for Firehose connection +AWS region to use for Firehose connection. - Default value: none - Environment variable: `FLEET_FIREHOSE_REGION` @@ -1328,7 +1376,9 @@ AWS region to use for Firehose connection ##### firehose_access_key_id -This flag only has effect if `osquery_status_log_plugin` or `osquery_result_log_plugin` are set to `firehose`. +This flag only has effect if one of the following is true: +- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `firehose`. +- `activity_audit_log_plugin` is set to `firehose` and `activity_enable_audit_log` is set to `true`. If `firehose_access_key_id` and `firehose_secret_access_key` are omitted, Fleet will try to use [AWS STS](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html) credentials. @@ -1344,7 +1394,9 @@ AWS access key ID to use for Firehose authentication. ##### firehose_secret_access_key -This flag only has effect if `osquery_status_log_plugin` or `osquery_result_log_plugin` are set to `firehose`. +This flag only has effect if one of the following is true: +- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `firehose`. +- `activity_audit_log_plugin` is set to `firehose` and `activity_enable_audit_log` is set to `true`. AWS secret access key to use for Firehose authentication. @@ -1358,8 +1410,9 @@ AWS secret access key to use for Firehose authentication. ##### firehose_sts_assume_role_arn -This flag only has effect if `osquery_status_log_plugin` or -`osquery_result_log_plugin` are set to `firehose`. +This flag only has effect if one of the following is true: +- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `firehose`. +- `activity_audit_log_plugin` is set to `firehose` and `activity_enable_audit_log` is set to `true`. AWS STS role ARN to use for Firehose authentication. @@ -1411,6 +1464,27 @@ the stream listed: - `firehose:DescribeDeliveryStream` - `firehose:PutRecordBatch` +##### firehose_audit_stream + +This flag only has effect if `activity_audit_log_plugin` is set to `firehose`. + +Name of the Firehose stream to audit logs. + +- Default value: none +- Environment variable: `FLEET_FIREHOSE_AUDIT_STREAM` +- Config file format: + ```yaml + firehose: + audit_stream: fleet_audit + ``` + +The IAM role used to send to Firehose must allow the following permissions on +the stream listed: + +- `firehose:DescribeDeliveryStream` +- `firehose:PutRecordBatch` + + ##### Example YAML ```yaml @@ -1430,7 +1504,9 @@ firehose: ##### kinesis_region -This flag only has effect if `osquery_status_log_plugin` is set to `kinesis`. +This flag only has effect if one of the following is true: +- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `kinesis`. +- `activity_audit_log_plugin` is set to `kinesis` and `activity_enable_audit_log` is set to `true`. AWS region to use for Kinesis connection @@ -1444,8 +1520,9 @@ AWS region to use for Kinesis connection ##### kinesis_access_key_id -This flag only has effect if `osquery_status_log_plugin` or -`osquery_result_log_plugin` are set to `kinesis`. +This flag only has effect if one of the following is true: +- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `kinesis`. +- `activity_audit_log_plugin` is set to `kinesis` and `activity_enable_audit_log` is set to `true`. If `kinesis_access_key_id` and `kinesis_secret_access_key` are omitted, Fleet will try to use @@ -1464,8 +1541,9 @@ AWS access key ID to use for Kinesis authentication. ##### kinesis_secret_access_key -This flag only has effect if `osquery_status_log_plugin` or -`osquery_result_log_plugin` are set to `kinesis`. +This flag only has effect if one of the following is true: +- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `kinesis`. +- `activity_audit_log_plugin` is set to `kinesis` and `activity_enable_audit_log` is set to `true`. AWS secret access key to use for Kinesis authentication. @@ -1479,8 +1557,9 @@ AWS secret access key to use for Kinesis authentication. ##### kinesis_sts_assume_role_arn -This flag only has effect if `osquery_status_log_plugin` or -`osquery_result_log_plugin` are set to `kinesis`. +This flag only has effect if one of the following is true: +- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `kinesis`. +- `activity_audit_log_plugin` is set to `kinesis` and `activity_enable_audit_log` is set to `true`. AWS STS role ARN to use for Kinesis authentication. @@ -1532,6 +1611,26 @@ the stream listed: - `kinesis:DescribeStream` - `kinesis:PutRecords` +##### kinesis_audit_stream + +This flag only has effect if `activity_audit_log_plugin` is set to `kinesis`. + +Name of the Kinesis stream to write audit logs. + +- Default value: none +- Environment variable: `FLEET_KINESIS_AUDIT_STREAM` +- Config file format: + ```yaml + kinesis: + audit_stream: fleet_audit + ``` + +The IAM role used to send to Kinesis must allow the following permissions on +the stream listed: + +- `kinesis:DescribeStream` +- `kinesis:PutRecords` + ##### Example YAML ```yaml @@ -1540,7 +1639,6 @@ osquery: osquery_result_log_plugin: kinesis kinesis: region: ca-central-1 - result_log_file: /var/log/osquery/result.log access_key_id: AKIAIOSFODNN7EXAMPLE secret_access_key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY sts_assume_role_arn: arn:aws:iam::1234567890:role/firehose-role @@ -1548,14 +1646,15 @@ kinesis: result_stream: osquery_result ``` - #### Lambda ##### lambda_region -This flag only has effect if `osquery_status_log_plugin` is set to `lambda`. +This flag only has effect if one of the following is true: +- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `lambda`. +- `activity_audit_log_plugin` is set to `lambda` and `activity_enable_audit_log` is set to `true`. -AWS region to use for Lambda connection +AWS region to use for Lambda connection. - Default value: none - Environment variable: `FLEET_LAMBDA_REGION` @@ -1567,8 +1666,9 @@ AWS region to use for Lambda connection ##### lambda_access_key_id -This flag only has effect if `osquery_status_log_plugin` or -`osquery_result_log_plugin` are set to `lambda`. +This flag only has effect if one of the following is true: +- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `lambda`. +- `activity_audit_log_plugin` is set to `lambda` and `activity_enable_audit_log` is set to `true`. If `lambda_access_key_id` and `lambda_secret_access_key` are omitted, Fleet will try to use @@ -1587,8 +1687,9 @@ AWS access key ID to use for Lambda authentication. ##### lambda_secret_access_key -This flag only has effect if `osquery_status_log_plugin` or -`osquery_result_log_plugin` are set to `lambda`. +This flag only has effect if one of the following is true: +- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `lambda`. +- `activity_audit_log_plugin` is set to `lambda` and `activity_enable_audit_log` is set to `true`. AWS secret access key to use for Lambda authentication. @@ -1602,8 +1703,9 @@ AWS secret access key to use for Lambda authentication. ##### lambda_sts_assume_role_arn -This flag only has effect if `osquery_status_log_plugin` or -`osquery_result_log_plugin` are set to `lambda`. +This flag only has effect if one of the following is true: +- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `lambda`. +- `activity_audit_log_plugin` is set to `lambda` and `activity_enable_audit_log` is set to `true`. AWS STS role ARN to use for Lambda authentication. @@ -1653,6 +1755,25 @@ the function listed: - `lambda:InvokeFunction` +##### lambda_audit_function + +This flag only has effect if `activity_audit_log_plugin` is set to `lambda`. + +Name of the Lambda function to write audit logs. + +- Default value: none +- Environment variable: `FLEET_LAMBDA_AUDIT_FUNCTION` +- Config file format: + ```yaml + lambda: + audit_function: auditFunction + ``` + +The IAM role used to send to Lambda must allow the following permissions on +the function listed: + +- `lambda:InvokeFunction` + ##### Example YAML ```yaml @@ -1672,7 +1793,9 @@ lambda: ##### pubsub_project -This flag only has effect if `osquery_status_log_plugin` is set to `pubsub`. +This flag only has effect if one of the following is true: +- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `pubsub`. +- `activity_audit_log_plugin` is set to `pubsub` and `activity_enable_audit_log` is set to `true`. The identifier of the Google Cloud project containing the pubsub topics to publish logs to. @@ -1690,7 +1813,7 @@ for authentication with the service. ##### pubsub_result_topic -This flag only has effect if `osquery_status_log_plugin` is set to `pubsub`. +This flag only has effect if `osquery_result_log_plugin` is set to `pubsub`. The identifier of the pubsub topic that client results will be published to. @@ -1716,6 +1839,20 @@ The identifier of the pubsub topic that osquery status logs will be published to status_topic: osquery_status ``` +##### pubsub_audit_topic + +This flag only has effect if `osquery_audit_log_plugin` is set to `pubsub`. + +The identifier of the pubsub topic that client results will be published to. + +- Default value: none +- Environment variable: `FLEET_PUBSUB_AUDIT_TOPIC` +- Config file format: + ```yaml + pubsub: + audit_topic: fleet_audit + ``` + ##### pubsub_add_attributes This flag only has effect if `osquery_status_log_plugin` is set to `pubsub`. @@ -1756,7 +1893,9 @@ pubsub: ##### kafkarest_proxyhost -This flag only has effect if `osquery_status_log_plugin` or `osquery_result_log_plugin` is set to `kafkarest`. +This flag only has effect if one of the following is true: +- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `kafkarest`. +- `activity_audit_log_plugin` is set to `kafkarest` and `activity_enable_audit_log` is set to `true`. The URL of the host which to check for the topic existence and post messages to the specified topic. @@ -1793,12 +1932,28 @@ The identifier of the kafka topic that osquery result logs will be published to. - Config file format: ```yaml kafkarest: - status_topic: osquery_result + result_topic: osquery_result + ``` + +##### kafkarest_audit_topic + +This flag only has effect if `osquery_audit_log_plugin` is set to `kafkarest`. + +The identifier of the kafka topic that audit logs will be published to. + +- Default value: none +- Environment variable: `FLEET_KAFKAREST_AUDIT_TOPIC` +- Config file format: + ```yaml + kafkarest: + audit_topic: fleet_audit ``` ##### kafkarest_timeout -This flag only has effect if `osquery_status_log_plugin` or `osquery_result_log_plugin` is set to `kafkarest`. +This flag only has effect if one of the following is true: +- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `kafkarest`. +- `activity_audit_log_plugin` is set to `kafkarest` and `activity_enable_audit_log` is set to `true`. The timeout value for the http post attempt. Value is in units of seconds. @@ -1812,7 +1967,9 @@ The timeout value for the http post attempt. Value is in units of seconds. ##### kafkarest_content_type_value -This flag only has effect if `osquery_status_log_plugin` is set to `kafkarest`. +This flag only has effect if one of the following is true: +- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `kafkarest`. +- `activity_audit_log_plugin` is set to `kafkarest` and `activity_enable_audit_log` is set to `true`. The value of the Content-Type header to use in Kafka REST Proxy API calls. More information about available versions can be found [here](https://docs.confluent.io/platform/current/kafka-rest/api.html#content-types). _Note: only JSON format is supported_ diff --git a/docs/Using-Fleet/Log-destinations.md b/docs/Using-Fleet/Log-destinations.md index 70873f2c8c..b11c752a12 100644 --- a/docs/Using-Fleet/Log-destinations.md +++ b/docs/Using-Fleet/Log-destinations.md @@ -1,18 +1,29 @@ # Log destinations -- [Amazon Kinesis Data Firehose](#amazon-kinesis-data-firehose) -- [Snowflake](#snowflake) -- [Splunk](#splunk) -- [Amazon Kinesis Data Streams](#amazon-kinesis-data-streams) -- [AWS Lambda](#aws-lambda) -- [Google Cloud Pub/Sub](#google-cloud-pubsub) -- [Apache Kafka](#apache-kafka) -- [Stdout](#stdout) -- [Filesystem](#filesystem) +- [Log destinations](#log-destinations) + - [Amazon Kinesis Data Firehose](#amazon-kinesis-data-firehose) + - [Snowflake](#snowflake) + - [Splunk](#splunk) + - [Amazon Kinesis Data Streams](#amazon-kinesis-data-streams) + - [AWS Lambda](#aws-lambda) + - [Google Cloud Pub/Sub](#google-cloud-pubsub) + - [Apache Kafka](#apache-kafka) + - [Stdout](#stdout) + - [Filesystem](#filesystem) + - [Sending logs outside of Fleet](#sending-logs-outside-of-fleet) This document provides a list of the supported log destinations in Fleet. -To configure each log destination, you must set the correct osquery logging configuration options in Fleet. Check out the reference documentation for [osquery logging configuration options](https://fleetdm.com/docs/deploying/configuration#osquery-status-log-plugin). +Log destinations can be used in Fleet to log: +- Osquery [status logs](https://osquery.readthedocs.io/en/stable/deployment/logging/#status-logs). +- Osquery [schedule query result logs](https://osquery.readthedocs.io/en/stable/deployment/logging/#results-logs). +- Fleet audit logs. + +To configure each log destination, you must set the correct logging configuration options in Fleet. +Check out the reference documentation for: + - [Osquery status logging configuration options](https://fleetdm.com/docs/deploying/configuration#osquery-status-log-plugin). + - [Osquery result logging configuration options](https://fleetdm.com/docs/deploying/configuration#osquery-result-log-plugin). + - [Activity audit logging configuration options](https://fleetdm.com/docs/deploying/configuration#activity_audit_log_plugin). ## Amazon Kinesis Data Firehose @@ -54,8 +65,8 @@ Logs are written to [Amazon Kinesis Data Streams (Kinesis)](https://aws.amazon.c Note that Kinesis logging has limits [discussed in the documentation](https://docs.aws.amazon.com/kinesis/latest/dev/limits.html). -When Fleet encounters osquery logs that are too big for Kinesis, notifications appear -in the Fleet server logs. Those osquery logs **will not** be sent to Kinesis. +When Fleet encounters logs that are too big for Kinesis, notifications appear +in the Fleet server logs. Those logs **will not** be sent to Kinesis. ## AWS Lambda @@ -102,7 +113,7 @@ Logs are written to stdout. - Plugin name: `stdout` - Flag namespace: [stdout](https://fleetdm.com/docs/deploying/configuration#stdout) -With the stdout plugin, osquery result and/or status logs are written to stdout +With the stdout plugin, logs are written to stdout on the Fleet server. This is typically used for debugging or with a log forwarding setup that will capture and forward stdout logs into a logging pipeline. @@ -119,7 +130,7 @@ The default log destination. - Plugin name: `filesystem` - Flag namespace: [filesystem](https://fleetdm.com/docs/deploying/configuration#filesystem) -With the filesystem plugin, osquery result and/or status logs are written to the local filesystem on the Fleet server. This is typically used with a log forwarding agent on the Fleet server that will push the logs into a logging pipeline. +With the filesystem plugin, logs are written to the local filesystem on the Fleet server. This is typically used with a log forwarding agent on the Fleet server that will push the logs into a logging pipeline. Note that if multiple load-balanced Fleet servers are used, the logs will be load-balanced across those servers (not duplicated). diff --git a/ee/server/service/service.go b/ee/server/service/service.go index f0403e1b90..2355dafffb 100644 --- a/ee/server/service/service.go +++ b/ee/server/service/service.go @@ -11,6 +11,7 @@ import ( "github.com/micromdm/nanodep/storage" ) +// Service wraps a free Service and implements additional premium functionality on top of it. type Service struct { fleet.Service @@ -31,7 +32,6 @@ func NewService( c clock.Clock, depStorage storage.AllStorage, ) (*Service, error) { - authorizer, err := authz.NewAuthorizer() if err != nil { return nil, fmt.Errorf("new authorizer: %w", err) diff --git a/ee/server/service/teams.go b/ee/server/service/teams.go index c5019208fd..0b3e918139 100644 --- a/ee/server/service/teams.go +++ b/ee/server/service/teams.go @@ -74,7 +74,7 @@ func (svc *Service) NewTeam(ctx context.Context, p fleet.TeamPayload) (*fleet.Te Name: team.Name, }, ); err != nil { - return nil, err + return nil, ctxerr.Wrap(ctx, err, "create activity for team creation") } return team, nil @@ -332,14 +332,17 @@ func (svc *Service) DeleteTeam(ctx context.Context, teamID uint) error { logging.WithExtras(ctx, "id", teamID) - return svc.ds.NewActivity( + if err := svc.ds.NewActivity( ctx, authz.UserFromContext(ctx), fleet.ActivityTypeDeletedTeam{ ID: teamID, Name: name, }, - ) + ); err != nil { + return ctxerr.Wrap(ctx, err, "create activity for team deletion") + } + return nil } func (svc *Service) GetTeam(ctx context.Context, teamID uint) (*fleet.Team, error) { @@ -490,7 +493,7 @@ func (svc *Service) ApplyTeamSpecs(ctx context.Context, specs []*fleet.TeamSpec, Teams: details, }, ); err != nil { - return ctxerr.Wrap(ctx, err, "create applied team spec activity") + return ctxerr.Wrap(ctx, err, "create activity for team spec") } } return nil diff --git a/ee/server/service/users.go b/ee/server/service/users.go index 36259a900d..c6daecb96e 100644 --- a/ee/server/service/users.go +++ b/ee/server/service/users.go @@ -52,14 +52,12 @@ func (svc *Service) GetSSOUser(ctx context.Context, auth fleet.Auth) (*fleet.Use if err != nil { return nil, ctxerr.Wrap(ctx, err, "creating new SSO user") } - err = svc.ds.NewActivity( + if err := svc.ds.NewActivity( ctx, user, fleet.ActivityTypeUserAddedBySSO{}, - ) - if err != nil { - return nil, err + ); err != nil { + return nil, ctxerr.Wrap(ctx, err, "create activity for SSO user creation") } - return user, nil } diff --git a/server/config/config.go b/server/config/config.go index 4db41575a0..7ae9a8ec75 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -130,16 +130,24 @@ type SessionConfig struct { // OsqueryConfig defines configs related to osquery type OsqueryConfig struct { - NodeKeySize int `yaml:"node_key_size"` - HostIdentifier string `yaml:"host_identifier"` - EnrollCooldown time.Duration `yaml:"enroll_cooldown"` - StatusLogPlugin string `yaml:"status_log_plugin"` - ResultLogPlugin string `yaml:"result_log_plugin"` - LabelUpdateInterval time.Duration `yaml:"label_update_interval"` - PolicyUpdateInterval time.Duration `yaml:"policy_update_interval"` - DetailUpdateInterval time.Duration `yaml:"detail_update_interval"` - StatusLogFile string `yaml:"status_log_file"` - ResultLogFile string `yaml:"result_log_file"` + NodeKeySize int `yaml:"node_key_size"` + HostIdentifier string `yaml:"host_identifier"` + EnrollCooldown time.Duration `yaml:"enroll_cooldown"` + StatusLogPlugin string `yaml:"status_log_plugin"` + ResultLogPlugin string `yaml:"result_log_plugin"` + LabelUpdateInterval time.Duration `yaml:"label_update_interval"` + PolicyUpdateInterval time.Duration `yaml:"policy_update_interval"` + DetailUpdateInterval time.Duration `yaml:"detail_update_interval"` + + // StatusLogFile is deprecated. It was replaced by FilesystemConfig.StatusLogFile. + // + // TODO(lucas): We should at least add a warning if this field is populated. + StatusLogFile string `yaml:"status_log_file"` + // ResultLogFile is deprecated. It was replaced by FilesystemConfig.ResultLogFile. + // + // TODO(lucas): We should at least add a warning if this field is populated. + ResultLogFile string `yaml:"result_log_file"` + EnableLogRotation bool `yaml:"enable_log_rotation"` MaxJitterPercent int `yaml:"max_jitter_percent"` EnableAsyncHostProcessing string `yaml:"enable_async_host_processing"` // true/false or per-task @@ -217,6 +225,14 @@ type LoggingConfig struct { TracingType string `yaml:"tracing_type"` } +// ActivityConfig defines configs related to activities. +type ActivityConfig struct { + // EnableAuditLog enables logging for audit activities. + EnableAuditLog bool `yaml:"enable_audit_log"` + // AuditLogPlugin sets the plugin to use to log activities. + AuditLogPlugin string `yaml:"audit_log_plugin"` +} + // FirehoseConfig defines configs for the AWS Firehose logging plugin type FirehoseConfig struct { Region string @@ -226,6 +242,7 @@ type FirehoseConfig struct { StsAssumeRoleArn string `yaml:"sts_assume_role_arn"` StatusStream string `yaml:"status_stream"` ResultStream string `yaml:"result_stream"` + AuditStream string `yaml:"audit_stream"` } // KinesisConfig defines configs for the AWS Kinesis logging plugin @@ -237,6 +254,7 @@ type KinesisConfig struct { StsAssumeRoleArn string `yaml:"sts_assume_role_arn"` StatusStream string `yaml:"status_stream"` ResultStream string `yaml:"result_stream"` + AuditStream string `yaml:"audit_stream"` } // LambdaConfig defines configs for the AWS Lambda logging plugin @@ -247,6 +265,7 @@ type LambdaConfig struct { StsAssumeRoleArn string `yaml:"sts_assume_role_arn"` StatusFunction string `yaml:"status_function"` ResultFunction string `yaml:"result_function"` + AuditFunction string `yaml:"audit_function"` } // S3Config defines config to enable file carving storage to an S3 bucket @@ -267,6 +286,7 @@ type PubSubConfig struct { Project string `json:"project"` StatusTopic string `json:"status_topic" yaml:"status_topic"` ResultTopic string `json:"result_topic" yaml:"result_topic"` + AuditTopic string `json:"audit_topic" yaml:"audit_topic"` AddAttributes bool `json:"add_attributes" yaml:"add_attributes"` } @@ -274,6 +294,7 @@ type PubSubConfig struct { type FilesystemConfig struct { StatusLogFile string `json:"status_log_file" yaml:"status_log_file"` ResultLogFile string `json:"result_log_file" yaml:"result_log_file"` + AuditLogFile string `json:"audit_log_file" yaml:"audit_log_file"` EnableLogRotation bool `json:"enable_log_rotation" yaml:"enable_log_rotation"` EnableLogCompression bool `json:"enable_log_compression" yaml:"enable_log_compression"` } @@ -282,6 +303,7 @@ type FilesystemConfig struct { type KafkaRESTConfig struct { StatusTopic string `json:"status_topic" yaml:"status_topic"` ResultTopic string `json:"result_topic" yaml:"result_topic"` + AuditTopic string `json:"audit_topic" yaml:"audit_topic"` ProxyHost string `json:"proxyhost" yaml:"proxyhost"` ContentTypeValue string `json:"content_type_value" yaml:"content_type_value"` Timeout int `json:"timeout" yaml:"timeout"` @@ -389,6 +411,7 @@ type FleetConfig struct { App AppConfig Session SessionConfig Osquery OsqueryConfig + Activity ActivityConfig Logging LoggingConfig Firehose FirehoseConfig Kinesis KinesisConfig @@ -826,6 +849,12 @@ func (man Manager) addConfigs() { man.addConfigDuration("osquery.min_software_last_opened_at_diff", 1*time.Hour, "Minimum time difference of the software's last opened timestamp (compared to the last one saved) to trigger an update to the database") + // Activities + man.addConfigBool("activity.enable_audit_log", false, + "Enable audit logs") + man.addConfigString("activity.audit_log_plugin", "filesystem", + "Log plugin to use for audit logs") + // Logging man.addConfigBool("logging.debug", false, "Enable debug logging") @@ -852,6 +881,8 @@ func (man Manager) addConfigs() { "Firehose stream name for status logs") man.addConfigString("firehose.result_stream", "", "Firehose stream name for result logs") + man.addConfigString("firehose.audit_stream", "", + "Firehose stream name for audit logs") // Kinesis man.addConfigString("kinesis.region", "", "AWS Region to use") @@ -865,6 +896,8 @@ func (man Manager) addConfigs() { "Kinesis stream name for status logs") man.addConfigString("kinesis.result_stream", "", "Kinesis stream name for result logs") + man.addConfigString("kinesis.audit_stream", "", + "Kinesis stream name for audit logs") // Lambda man.addConfigString("lambda.region", "", "AWS Region to use") @@ -876,6 +909,8 @@ func (man Manager) addConfigs() { "Lambda function name for status logs") man.addConfigString("lambda.result_function", "", "Lambda function name for result logs") + man.addConfigString("lambda.audit_function", "", + "Lambda function name for audit logs") // S3 for file carving man.addConfigString("s3.bucket", "", "Bucket where to store file carves") @@ -892,6 +927,7 @@ func (man Manager) addConfigs() { man.addConfigString("pubsub.project", "", "Google Cloud Project to use") man.addConfigString("pubsub.status_topic", "", "PubSub topic for status logs") man.addConfigString("pubsub.result_topic", "", "PubSub topic for result logs") + man.addConfigString("pubsub.audit_topic", "", "PubSub topic for audit logs") man.addConfigBool("pubsub.add_attributes", false, "Add PubSub attributes in addition to the message body") // Filesystem @@ -899,6 +935,8 @@ func (man Manager) addConfigs() { "Log file path to use for status logs") man.addConfigString("filesystem.result_log_file", filepath.Join(os.TempDir(), "osquery_result"), "Log file path to use for result logs") + man.addConfigString("filesystem.audit_log_file", filepath.Join(os.TempDir(), "audit"), + "Log file path to use for audit logs") man.addConfigBool("filesystem.enable_log_rotation", false, "Enable automatic rotation for osquery log files") man.addConfigBool("filesystem.enable_log_compression", false, @@ -907,6 +945,7 @@ func (man Manager) addConfigs() { // KafkaREST man.addConfigString("kafkarest.status_topic", "", "Kafka REST topic for status logs") man.addConfigString("kafkarest.result_topic", "", "Kafka REST topic for result logs") + man.addConfigString("kafkarest.audit_topic", "", "Kafka REST topic for audit logs") man.addConfigString("kafkarest.proxyhost", "", "Kafka REST proxy host url") man.addConfigString("kafkarest.content_type_value", "application/vnd.kafka.json.v1+json", "Kafka REST proxy content type header (defaults to \"application/vnd.kafka.json.v1+json\"") @@ -1089,12 +1128,14 @@ func (man Manager) LoadConfig() FleetConfig { Duration: man.getConfigDuration("session.duration"), }, Osquery: OsqueryConfig{ - NodeKeySize: man.getConfigInt("osquery.node_key_size"), - HostIdentifier: man.getConfigString("osquery.host_identifier"), - EnrollCooldown: man.getConfigDuration("osquery.enroll_cooldown"), - StatusLogPlugin: man.getConfigString("osquery.status_log_plugin"), - ResultLogPlugin: man.getConfigString("osquery.result_log_plugin"), - StatusLogFile: man.getConfigString("osquery.status_log_file"), + NodeKeySize: man.getConfigInt("osquery.node_key_size"), + HostIdentifier: man.getConfigString("osquery.host_identifier"), + EnrollCooldown: man.getConfigDuration("osquery.enroll_cooldown"), + StatusLogPlugin: man.getConfigString("osquery.status_log_plugin"), + ResultLogPlugin: man.getConfigString("osquery.result_log_plugin"), + // StatusLogFile is deprecated. FilesystemConfig.StatusLogFile is used instead. + StatusLogFile: man.getConfigString("osquery.status_log_file"), + // ResultLogFile is deprecated. FilesystemConfig.ResultLogFile is used instead. ResultLogFile: man.getConfigString("osquery.result_log_file"), LabelUpdateInterval: man.getConfigDuration("osquery.label_update_interval"), PolicyUpdateInterval: man.getConfigDuration("osquery.policy_update_interval"), @@ -1113,6 +1154,10 @@ func (man Manager) LoadConfig() FleetConfig { AsyncHostRedisScanKeysCount: man.getConfigInt("osquery.async_host_redis_scan_keys_count"), MinSoftwareLastOpenedAtDiff: man.getConfigDuration("osquery.min_software_last_opened_at_diff"), }, + Activity: ActivityConfig{ + EnableAuditLog: man.getConfigBool("activity.enable_audit_log"), + AuditLogPlugin: man.getConfigString("activity.audit_log_plugin"), + }, Logging: LoggingConfig{ Debug: man.getConfigBool("logging.debug"), JSON: man.getConfigBool("logging.json"), @@ -1129,6 +1174,7 @@ func (man Manager) LoadConfig() FleetConfig { StsAssumeRoleArn: man.getConfigString("firehose.sts_assume_role_arn"), StatusStream: man.getConfigString("firehose.status_stream"), ResultStream: man.getConfigString("firehose.result_stream"), + AuditStream: man.getConfigString("firehose.audit_stream"), }, Kinesis: KinesisConfig{ Region: man.getConfigString("kinesis.region"), @@ -1137,6 +1183,7 @@ func (man Manager) LoadConfig() FleetConfig { SecretAccessKey: man.getConfigString("kinesis.secret_access_key"), StatusStream: man.getConfigString("kinesis.status_stream"), ResultStream: man.getConfigString("kinesis.result_stream"), + AuditStream: man.getConfigString("kinesis.audit_stream"), StsAssumeRoleArn: man.getConfigString("kinesis.sts_assume_role_arn"), }, Lambda: LambdaConfig{ @@ -1145,6 +1192,7 @@ func (man Manager) LoadConfig() FleetConfig { SecretAccessKey: man.getConfigString("lambda.secret_access_key"), StatusFunction: man.getConfigString("lambda.status_function"), ResultFunction: man.getConfigString("lambda.result_function"), + AuditFunction: man.getConfigString("lambda.audit_function"), StsAssumeRoleArn: man.getConfigString("lambda.sts_assume_role_arn"), }, S3: S3Config{ @@ -1162,17 +1210,20 @@ func (man Manager) LoadConfig() FleetConfig { Project: man.getConfigString("pubsub.project"), StatusTopic: man.getConfigString("pubsub.status_topic"), ResultTopic: man.getConfigString("pubsub.result_topic"), + AuditTopic: man.getConfigString("pubsub.audit_topic"), AddAttributes: man.getConfigBool("pubsub.add_attributes"), }, Filesystem: FilesystemConfig{ StatusLogFile: man.getConfigString("filesystem.status_log_file"), ResultLogFile: man.getConfigString("filesystem.result_log_file"), + AuditLogFile: man.getConfigString("filesystem.audit_log_file"), EnableLogRotation: man.getConfigBool("filesystem.enable_log_rotation"), EnableLogCompression: man.getConfigBool("filesystem.enable_log_compression"), }, KafkaREST: KafkaRESTConfig{ StatusTopic: man.getConfigString("kafkarest.status_topic"), ResultTopic: man.getConfigString("kafkarest.result_topic"), + AuditTopic: man.getConfigString("kafkarest.audit_topic"), ProxyHost: man.getConfigString("kafkarest.proxyhost"), ContentTypeValue: man.getConfigString("kafkarest.content_type_value"), Timeout: man.getConfigInt("kafkarest.timeout"), @@ -1551,6 +1602,10 @@ func TestConfig() FleetConfig { DetailUpdateInterval: 1 * time.Hour, MaxJitterPercent: 0, }, + Activity: ActivityConfig{ + EnableAuditLog: true, + AuditLogPlugin: "filesystem", + }, Logging: LoggingConfig{ Debug: true, DisableBanner: true, @@ -1558,6 +1613,7 @@ func TestConfig() FleetConfig { Filesystem: FilesystemConfig{ StatusLogFile: testLogFile, ResultLogFile: testLogFile, + AuditLogFile: testLogFile, }, } } diff --git a/server/datastore/mysql/activities.go b/server/datastore/mysql/activities.go index f7a9549fdc..ada191eb46 100644 --- a/server/datastore/mysql/activities.go +++ b/server/datastore/mysql/activities.go @@ -30,14 +30,32 @@ func (ds *Datastore) NewActivity(ctx context.Context, user *fleet.User, activity } // ListActivities returns a slice of activities performed across the organization -func (ds *Datastore) ListActivities(ctx context.Context, opt fleet.ListOptions) ([]*fleet.Activity, error) { +func (ds *Datastore) ListActivities(ctx context.Context, opt fleet.ListActivitiesOptions) ([]*fleet.Activity, error) { activities := []*fleet.Activity{} - query := `SELECT a.id, a.user_id, a.created_at, a.activity_type, a.details, coalesce(u.name, a.user_name) as name, u.gravatar_url, u.email - FROM activities a LEFT JOIN users u ON (a.user_id=u.id) - WHERE true` - query = appendListOptionsToSQL(query, opt) + query := ` +SELECT + a.id, + a.user_id, + a.created_at, + a.activity_type, + a.details, + coalesce(u.name, a.user_name) as name, + u.gravatar_url, + u.email, + a.streamed +FROM activities a +LEFT JOIN users u ON (a.user_id=u.id) +WHERE true` - err := sqlx.SelectContext(ctx, ds.reader, &activities, query) + var args []interface{} + if opt.Streamed != nil { + query += " AND a.streamed = ?" + args = append(args, *opt.Streamed) + } + + query = appendListOptionsToSQL(query, opt.ListOptions) + + err := sqlx.SelectContext(ctx, ds.reader, &activities, query, args...) if err == sql.ErrNoRows { return nil, ctxerr.Wrap(ctx, notFound("Activity")) } else if err != nil { @@ -46,3 +64,15 @@ func (ds *Datastore) ListActivities(ctx context.Context, opt fleet.ListOptions) return activities, nil } + +func (ds *Datastore) MarkActivitiesAsStreamed(ctx context.Context, activityIDs []uint) error { + stmt := `UPDATE activities SET streamed = true WHERE id IN (?);` + query, args, err := sqlx.In(stmt, activityIDs) + if err != nil { + return ctxerr.Wrap(ctx, err, "sqlx.In mark activities as streamed") + } + if _, err := ds.writer.ExecContext(ctx, query, args...); err != nil { + return ctxerr.Wrap(ctx, err, "exec mark activities as streamed") + } + return nil +} diff --git a/server/datastore/mysql/activities_test.go b/server/datastore/mysql/activities_test.go index 42899789b3..8a5ffb9213 100644 --- a/server/datastore/mysql/activities_test.go +++ b/server/datastore/mysql/activities_test.go @@ -3,6 +3,7 @@ package mysql import ( "context" "encoding/json" + "sort" "testing" "github.com/fleetdm/fleet/v4/server/fleet" @@ -20,6 +21,7 @@ func TestActivity(t *testing.T) { }{ {"UsernameChange", testActivityUsernameChange}, {"New", testActivityNew}, + {"ListActivitiesStreamed", testListActivitiesStreamed}, } for _, c := range cases { t.Run(c.name, func(t *testing.T) { @@ -59,7 +61,8 @@ func testActivityUsernameChange(t *testing.T, ds *Datastore) { GlobalRole: ptr.String(fleet.RoleObserver), } _, err := ds.NewUser(context.Background(), u) - require.Nil(t, err) + require.NoError(t, err) + require.NoError(t, ds.NewActivity(context.Background(), u, dummyActivity{ name: "test1", details: map[string]interface{}{"detail": 1, "sometext": "aaa"}, @@ -69,7 +72,7 @@ func testActivityUsernameChange(t *testing.T, ds *Datastore) { details: map[string]interface{}{"detail": 2}, })) - activities, err := ds.ListActivities(context.Background(), fleet.ListOptions{}) + activities, err := ds.ListActivities(context.Background(), fleet.ListActivitiesOptions{}) require.NoError(t, err) assert.Len(t, activities, 2) assert.Equal(t, "fullname", activities[0].ActorFullName) @@ -78,7 +81,7 @@ func testActivityUsernameChange(t *testing.T, ds *Datastore) { err = ds.SaveUser(context.Background(), u) require.NoError(t, err) - activities, err = ds.ListActivities(context.Background(), fleet.ListOptions{}) + activities, err = ds.ListActivities(context.Background(), fleet.ListActivitiesOptions{}) require.NoError(t, err) assert.Len(t, activities, 2) assert.Equal(t, "newname", activities[0].ActorFullName) @@ -88,7 +91,7 @@ func testActivityUsernameChange(t *testing.T, ds *Datastore) { err = ds.DeleteUser(context.Background(), u.ID) require.NoError(t, err) - activities, err = ds.ListActivities(context.Background(), fleet.ListOptions{}) + activities, err = ds.ListActivities(context.Background(), fleet.ListActivitiesOptions{}) require.NoError(t, err) assert.Len(t, activities, 2) assert.Equal(t, "fullname", activities[0].ActorFullName) @@ -113,9 +116,11 @@ func testActivityNew(t *testing.T, ds *Datastore) { details: map[string]interface{}{"detail": 2}, })) - opt := fleet.ListOptions{ - Page: 0, - PerPage: 1, + opt := fleet.ListActivitiesOptions{ + ListOptions: fleet.ListOptions{ + Page: 0, + PerPage: 1, + }, } activities, err := ds.ListActivities(context.Background(), opt) require.NoError(t, err) @@ -123,9 +128,11 @@ func testActivityNew(t *testing.T, ds *Datastore) { assert.Equal(t, "fullname", activities[0].ActorFullName) assert.Equal(t, "test1", activities[0].Type) - opt = fleet.ListOptions{ - Page: 1, - PerPage: 1, + opt = fleet.ListActivitiesOptions{ + ListOptions: fleet.ListOptions{ + Page: 1, + PerPage: 1, + }, } activities, err = ds.ListActivities(context.Background(), opt) require.NoError(t, err) @@ -133,11 +140,71 @@ func testActivityNew(t *testing.T, ds *Datastore) { assert.Equal(t, "fullname", activities[0].ActorFullName) assert.Equal(t, "test2", activities[0].Type) - opt = fleet.ListOptions{ - Page: 0, - PerPage: 10, + opt = fleet.ListActivitiesOptions{ + ListOptions: fleet.ListOptions{ + Page: 0, + PerPage: 10, + }, } activities, err = ds.ListActivities(context.Background(), opt) require.NoError(t, err) assert.Len(t, activities, 2) } + +func testListActivitiesStreamed(t *testing.T, ds *Datastore) { + u := &fleet.User{ + Password: []byte("asd"), + Name: "fullname", + Email: "email@asd.com", + GlobalRole: ptr.String(fleet.RoleObserver), + } + _, err := ds.NewUser(context.Background(), u) + require.Nil(t, err) + + require.NoError(t, ds.NewActivity(context.Background(), u, dummyActivity{ + name: "test1", + details: map[string]interface{}{"detail": 1, "sometext": "aaa"}, + })) + require.NoError(t, ds.NewActivity(context.Background(), u, dummyActivity{ + name: "test2", + details: map[string]interface{}{"detail": 2}, + })) + require.NoError(t, ds.NewActivity(context.Background(), u, dummyActivity{ + name: "test3", + details: map[string]interface{}{"detail": 3}, + })) + + activities, err := ds.ListActivities(context.Background(), fleet.ListActivitiesOptions{}) + require.NoError(t, err) + assert.Len(t, activities, 3) + + sort.Slice(activities, func(i, j int) bool { + return activities[i].ID < activities[j].ID + }) + + err = ds.MarkActivitiesAsStreamed(context.Background(), []uint{activities[0].ID}) + require.NoError(t, err) + + // Reload activities (with streamed field updated). + activities, err = ds.ListActivities(context.Background(), fleet.ListActivitiesOptions{}) + require.NoError(t, err) + assert.Len(t, activities, 3) + sort.Slice(activities, func(i, j int) bool { + return activities[i].ID < activities[j].ID + }) + + nonStreamed, err := ds.ListActivities(context.Background(), fleet.ListActivitiesOptions{ + Streamed: ptr.Bool(false), + }) + require.NoError(t, err) + assert.Len(t, nonStreamed, 2) + require.Equal(t, nonStreamed[0], activities[1]) + require.Equal(t, nonStreamed[1], activities[2]) + + streamed, err := ds.ListActivities(context.Background(), fleet.ListActivitiesOptions{ + Streamed: ptr.Bool(true), + }) + require.NoError(t, err) + assert.Len(t, streamed, 1) + require.Equal(t, streamed[0], activities[0]) +} diff --git a/server/datastore/mysql/migrations/tables/20221220195935_AddStreamedColumnToActivities.go b/server/datastore/mysql/migrations/tables/20221220195935_AddStreamedColumnToActivities.go new file mode 100644 index 0000000000..42bdcf522b --- /dev/null +++ b/server/datastore/mysql/migrations/tables/20221220195935_AddStreamedColumnToActivities.go @@ -0,0 +1,29 @@ +package tables + +import ( + "database/sql" + + "github.com/pkg/errors" +) + +func init() { + MigrationClient.AddMigration(Up_20221220195935, Down_20221220195935) +} + +func Up_20221220195935(tx *sql.Tx) error { + if _, err := tx.Exec( + "ALTER TABLE `activities` ADD COLUMN `streamed` TINYINT(1) NOT NULL DEFAULT FALSE;", + ); err != nil { + return errors.Wrap(err, "adding streamed column to activities") + } + if _, err := tx.Exec( + "CREATE INDEX activities_streamed_idx ON activities (streamed);", + ); err != nil { + return errors.Wrap(err, "create activities_streamed_idx") + } + return nil +} + +func Down_20221220195935(tx *sql.Tx) error { + return nil +} diff --git a/server/datastore/mysql/schema.sql b/server/datastore/mysql/schema.sql index 94bc91987b..78eea2acc2 100644 --- a/server/datastore/mysql/schema.sql +++ b/server/datastore/mysql/schema.sql @@ -7,8 +7,10 @@ CREATE TABLE `activities` ( `user_name` varchar(255) DEFAULT NULL, `activity_type` varchar(255) NOT NULL, `details` json DEFAULT NULL, + `streamed` tinyint(1) NOT NULL DEFAULT '0', PRIMARY KEY (`id`), KEY `fk_activities_user_id` (`user_id`), + KEY `activities_streamed_idx` (`streamed`), CONSTRAINT `activities_ibfk_1` FOREIGN KEY (`user_id`) REFERENCES `users` (`id`) ON DELETE SET NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; /*!40101 SET character_set_client = @saved_cs_client */; @@ -466,9 +468,9 @@ CREATE TABLE `migration_status_tables` ( `tstamp` timestamp NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `id` (`id`) -) ENGINE=InnoDB AUTO_INCREMENT=163 DEFAULT CHARSET=utf8mb4; +) ENGINE=InnoDB AUTO_INCREMENT=164 DEFAULT CHARSET=utf8mb4; /*!40101 SET character_set_client = @saved_cs_client */; -INSERT INTO `migration_status_tables` VALUES (1,0,1,'2020-01-01 01:01:01'),(2,20161118193812,1,'2020-01-01 01:01:01'),(3,20161118211713,1,'2020-01-01 01:01:01'),(4,20161118212436,1,'2020-01-01 01:01:01'),(5,20161118212515,1,'2020-01-01 01:01:01'),(6,20161118212528,1,'2020-01-01 01:01:01'),(7,20161118212538,1,'2020-01-01 01:01:01'),(8,20161118212549,1,'2020-01-01 01:01:01'),(9,20161118212557,1,'2020-01-01 01:01:01'),(10,20161118212604,1,'2020-01-01 01:01:01'),(11,20161118212613,1,'2020-01-01 01:01:01'),(12,20161118212621,1,'2020-01-01 01:01:01'),(13,20161118212630,1,'2020-01-01 01:01:01'),(14,20161118212641,1,'2020-01-01 01:01:01'),(15,20161118212649,1,'2020-01-01 01:01:01'),(16,20161118212656,1,'2020-01-01 01:01:01'),(17,20161118212758,1,'2020-01-01 01:01:01'),(18,20161128234849,1,'2020-01-01 01:01:01'),(19,20161230162221,1,'2020-01-01 01:01:01'),(20,20170104113816,1,'2020-01-01 01:01:01'),(21,20170105151732,1,'2020-01-01 01:01:01'),(22,20170108191242,1,'2020-01-01 01:01:01'),(23,20170109094020,1,'2020-01-01 01:01:01'),(24,20170109130438,1,'2020-01-01 01:01:01'),(25,20170110202752,1,'2020-01-01 01:01:01'),(26,20170111133013,1,'2020-01-01 01:01:01'),(27,20170117025759,1,'2020-01-01 01:01:01'),(28,20170118191001,1,'2020-01-01 01:01:01'),(29,20170119234632,1,'2020-01-01 01:01:01'),(30,20170124230432,1,'2020-01-01 01:01:01'),(31,20170127014618,1,'2020-01-01 01:01:01'),(32,20170131232841,1,'2020-01-01 01:01:01'),(33,20170223094154,1,'2020-01-01 01:01:01'),(34,20170306075207,1,'2020-01-01 01:01:01'),(35,20170309100733,1,'2020-01-01 01:01:01'),(36,20170331111922,1,'2020-01-01 01:01:01'),(37,20170502143928,1,'2020-01-01 01:01:01'),(38,20170504130602,1,'2020-01-01 01:01:01'),(39,20170509132100,1,'2020-01-01 01:01:01'),(40,20170519105647,1,'2020-01-01 01:01:01'),(41,20170519105648,1,'2020-01-01 01:01:01'),(42,20170831234300,1,'2020-01-01 01:01:01'),(43,20170831234301,1,'2020-01-01 01:01:01'),(44,20170831234303,1,'2020-01-01 01:01:01'),(45,20171116163618,1,'2020-01-01 01:01:01'),(46,20171219164727,1,'2020-01-01 01:01:01'),(47,20180620164811,1,'2020-01-01 01:01:01'),(48,20180620175054,1,'2020-01-01 01:01:01'),(49,20180620175055,1,'2020-01-01 01:01:01'),(50,20191010101639,1,'2020-01-01 01:01:01'),(51,20191010155147,1,'2020-01-01 01:01:01'),(52,20191220130734,1,'2020-01-01 01:01:01'),(53,20200311140000,1,'2020-01-01 01:01:01'),(54,20200405120000,1,'2020-01-01 01:01:01'),(55,20200407120000,1,'2020-01-01 01:01:01'),(56,20200420120000,1,'2020-01-01 01:01:01'),(57,20200504120000,1,'2020-01-01 01:01:01'),(58,20200512120000,1,'2020-01-01 01:01:01'),(59,20200707120000,1,'2020-01-01 01:01:01'),(60,20201011162341,1,'2020-01-01 01:01:01'),(61,20201021104586,1,'2020-01-01 01:01:01'),(62,20201102112520,1,'2020-01-01 01:01:01'),(63,20201208121729,1,'2020-01-01 01:01:01'),(64,20201215091637,1,'2020-01-01 01:01:01'),(65,20210119174155,1,'2020-01-01 01:01:01'),(66,20210326182902,1,'2020-01-01 01:01:01'),(67,20210421112652,1,'2020-01-01 01:01:01'),(68,20210506095025,1,'2020-01-01 01:01:01'),(69,20210513115729,1,'2020-01-01 01:01:01'),(70,20210526113559,1,'2020-01-01 01:01:01'),(71,20210601000001,1,'2020-01-01 01:01:01'),(72,20210601000002,1,'2020-01-01 01:01:01'),(73,20210601000003,1,'2020-01-01 01:01:01'),(74,20210601000004,1,'2020-01-01 01:01:01'),(75,20210601000005,1,'2020-01-01 01:01:01'),(76,20210601000006,1,'2020-01-01 01:01:01'),(77,20210601000007,1,'2020-01-01 01:01:01'),(78,20210601000008,1,'2020-01-01 01:01:01'),(79,20210606151329,1,'2020-01-01 01:01:01'),(80,20210616163757,1,'2020-01-01 01:01:01'),(81,20210617174723,1,'2020-01-01 01:01:01'),(82,20210622160235,1,'2020-01-01 01:01:01'),(83,20210623100031,1,'2020-01-01 01:01:01'),(84,20210623133615,1,'2020-01-01 01:01:01'),(85,20210708143152,1,'2020-01-01 01:01:01'),(86,20210709124443,1,'2020-01-01 01:01:01'),(87,20210712155608,1,'2020-01-01 01:01:01'),(88,20210714102108,1,'2020-01-01 01:01:01'),(89,20210719153709,1,'2020-01-01 01:01:01'),(90,20210721171531,1,'2020-01-01 01:01:01'),(91,20210723135713,1,'2020-01-01 01:01:01'),(92,20210802135933,1,'2020-01-01 01:01:01'),(93,20210806112844,1,'2020-01-01 01:01:01'),(94,20210810095603,1,'2020-01-01 01:01:01'),(95,20210811150223,1,'2020-01-01 01:01:01'),(96,20210818151827,1,'2020-01-01 01:01:01'),(97,20210818151828,1,'2020-01-01 01:01:01'),(98,20210818182258,1,'2020-01-01 01:01:01'),(99,20210819131107,1,'2020-01-01 01:01:01'),(100,20210819143446,1,'2020-01-01 01:01:01'),(101,20210903132338,1,'2020-01-01 01:01:01'),(102,20210915144307,1,'2020-01-01 01:01:01'),(103,20210920155130,1,'2020-01-01 01:01:01'),(104,20210927143115,1,'2020-01-01 01:01:01'),(105,20210927143116,1,'2020-01-01 01:01:01'),(106,20211013133706,1,'2020-01-01 01:01:01'),(107,20211013133707,1,'2020-01-01 01:01:01'),(108,20211102135149,1,'2020-01-01 01:01:01'),(109,20211109121546,1,'2020-01-01 01:01:01'),(110,20211110163320,1,'2020-01-01 01:01:01'),(111,20211116184029,1,'2020-01-01 01:01:01'),(112,20211116184030,1,'2020-01-01 01:01:01'),(113,20211202092042,1,'2020-01-01 01:01:01'),(114,20211202181033,1,'2020-01-01 01:01:01'),(115,20211207161856,1,'2020-01-01 01:01:01'),(116,20211216131203,1,'2020-01-01 01:01:01'),(117,20211221110132,1,'2020-01-01 01:01:01'),(118,20220107155700,1,'2020-01-01 01:01:01'),(119,20220125105650,1,'2020-01-01 01:01:01'),(120,20220201084510,1,'2020-01-01 01:01:01'),(121,20220208144830,1,'2020-01-01 01:01:01'),(122,20220208144831,1,'2020-01-01 01:01:01'),(123,20220215152203,1,'2020-01-01 01:01:01'),(124,20220223113157,1,'2020-01-01 01:01:01'),(125,20220307104655,1,'2020-01-01 01:01:01'),(126,20220309133956,1,'2020-01-01 01:01:01'),(127,20220316155700,1,'2020-01-01 01:01:01'),(128,20220323152301,1,'2020-01-01 01:01:01'),(129,20220330100659,1,'2020-01-01 01:01:01'),(130,20220404091216,1,'2020-01-01 01:01:01'),(131,20220419140750,1,'2020-01-01 01:01:01'),(132,20220428140039,1,'2020-01-01 01:01:01'),(133,20220503134048,1,'2020-01-01 01:01:01'),(134,20220524102918,1,'2020-01-01 01:01:01'),(135,20220526123327,1,'2020-01-01 01:01:01'),(136,20220526123328,1,'2020-01-01 01:01:01'),(137,20220526123329,1,'2020-01-01 01:01:01'),(138,20220608113128,1,'2020-01-01 01:01:01'),(139,20220627104817,1,'2020-01-01 01:01:01'),(140,20220704101843,1,'2020-01-01 01:01:01'),(141,20220708095046,1,'2020-01-01 01:01:01'),(142,20220713091130,1,'2020-01-01 01:01:01'),(143,20220802135510,1,'2020-01-01 01:01:01'),(144,20220818101352,1,'2020-01-01 01:01:01'),(145,20220822161445,1,'2020-01-01 01:01:01'),(146,20220831100036,1,'2020-01-01 01:01:01'),(147,20220831100151,1,'2020-01-01 01:01:01'),(148,20220908181826,1,'2020-01-01 01:01:01'),(149,20220914154915,1,'2020-01-01 01:01:01'),(150,20220915165115,1,'2020-01-01 01:01:01'),(151,20220915165116,1,'2020-01-01 01:01:01'),(152,20220928100158,1,'2020-01-01 01:01:01'),(153,20221014084130,1,'2020-01-01 01:01:01'),(154,20221027085019,1,'2020-01-01 01:01:01'),(155,20221101103952,1,'2020-01-01 01:01:01'),(156,20221104144401,1,'2020-01-01 01:01:01'),(157,20221109100749,1,'2020-01-01 01:01:01'),(158,20221115104546,1,'2020-01-01 01:01:01'),(159,20221130114928,1,'2020-01-01 01:01:01'),(160,20221205112142,1,'2020-01-01 01:01:01'),(161,20221216115820,1,'2020-01-01 01:01:01'),(162,20221220195934,1,'2020-01-01 01:01:01'); +INSERT INTO `migration_status_tables` VALUES (1,0,1,'2020-01-01 01:01:01'),(2,20161118193812,1,'2020-01-01 01:01:01'),(3,20161118211713,1,'2020-01-01 01:01:01'),(4,20161118212436,1,'2020-01-01 01:01:01'),(5,20161118212515,1,'2020-01-01 01:01:01'),(6,20161118212528,1,'2020-01-01 01:01:01'),(7,20161118212538,1,'2020-01-01 01:01:01'),(8,20161118212549,1,'2020-01-01 01:01:01'),(9,20161118212557,1,'2020-01-01 01:01:01'),(10,20161118212604,1,'2020-01-01 01:01:01'),(11,20161118212613,1,'2020-01-01 01:01:01'),(12,20161118212621,1,'2020-01-01 01:01:01'),(13,20161118212630,1,'2020-01-01 01:01:01'),(14,20161118212641,1,'2020-01-01 01:01:01'),(15,20161118212649,1,'2020-01-01 01:01:01'),(16,20161118212656,1,'2020-01-01 01:01:01'),(17,20161118212758,1,'2020-01-01 01:01:01'),(18,20161128234849,1,'2020-01-01 01:01:01'),(19,20161230162221,1,'2020-01-01 01:01:01'),(20,20170104113816,1,'2020-01-01 01:01:01'),(21,20170105151732,1,'2020-01-01 01:01:01'),(22,20170108191242,1,'2020-01-01 01:01:01'),(23,20170109094020,1,'2020-01-01 01:01:01'),(24,20170109130438,1,'2020-01-01 01:01:01'),(25,20170110202752,1,'2020-01-01 01:01:01'),(26,20170111133013,1,'2020-01-01 01:01:01'),(27,20170117025759,1,'2020-01-01 01:01:01'),(28,20170118191001,1,'2020-01-01 01:01:01'),(29,20170119234632,1,'2020-01-01 01:01:01'),(30,20170124230432,1,'2020-01-01 01:01:01'),(31,20170127014618,1,'2020-01-01 01:01:01'),(32,20170131232841,1,'2020-01-01 01:01:01'),(33,20170223094154,1,'2020-01-01 01:01:01'),(34,20170306075207,1,'2020-01-01 01:01:01'),(35,20170309100733,1,'2020-01-01 01:01:01'),(36,20170331111922,1,'2020-01-01 01:01:01'),(37,20170502143928,1,'2020-01-01 01:01:01'),(38,20170504130602,1,'2020-01-01 01:01:01'),(39,20170509132100,1,'2020-01-01 01:01:01'),(40,20170519105647,1,'2020-01-01 01:01:01'),(41,20170519105648,1,'2020-01-01 01:01:01'),(42,20170831234300,1,'2020-01-01 01:01:01'),(43,20170831234301,1,'2020-01-01 01:01:01'),(44,20170831234303,1,'2020-01-01 01:01:01'),(45,20171116163618,1,'2020-01-01 01:01:01'),(46,20171219164727,1,'2020-01-01 01:01:01'),(47,20180620164811,1,'2020-01-01 01:01:01'),(48,20180620175054,1,'2020-01-01 01:01:01'),(49,20180620175055,1,'2020-01-01 01:01:01'),(50,20191010101639,1,'2020-01-01 01:01:01'),(51,20191010155147,1,'2020-01-01 01:01:01'),(52,20191220130734,1,'2020-01-01 01:01:01'),(53,20200311140000,1,'2020-01-01 01:01:01'),(54,20200405120000,1,'2020-01-01 01:01:01'),(55,20200407120000,1,'2020-01-01 01:01:01'),(56,20200420120000,1,'2020-01-01 01:01:01'),(57,20200504120000,1,'2020-01-01 01:01:01'),(58,20200512120000,1,'2020-01-01 01:01:01'),(59,20200707120000,1,'2020-01-01 01:01:01'),(60,20201011162341,1,'2020-01-01 01:01:01'),(61,20201021104586,1,'2020-01-01 01:01:01'),(62,20201102112520,1,'2020-01-01 01:01:01'),(63,20201208121729,1,'2020-01-01 01:01:01'),(64,20201215091637,1,'2020-01-01 01:01:01'),(65,20210119174155,1,'2020-01-01 01:01:01'),(66,20210326182902,1,'2020-01-01 01:01:01'),(67,20210421112652,1,'2020-01-01 01:01:01'),(68,20210506095025,1,'2020-01-01 01:01:01'),(69,20210513115729,1,'2020-01-01 01:01:01'),(70,20210526113559,1,'2020-01-01 01:01:01'),(71,20210601000001,1,'2020-01-01 01:01:01'),(72,20210601000002,1,'2020-01-01 01:01:01'),(73,20210601000003,1,'2020-01-01 01:01:01'),(74,20210601000004,1,'2020-01-01 01:01:01'),(75,20210601000005,1,'2020-01-01 01:01:01'),(76,20210601000006,1,'2020-01-01 01:01:01'),(77,20210601000007,1,'2020-01-01 01:01:01'),(78,20210601000008,1,'2020-01-01 01:01:01'),(79,20210606151329,1,'2020-01-01 01:01:01'),(80,20210616163757,1,'2020-01-01 01:01:01'),(81,20210617174723,1,'2020-01-01 01:01:01'),(82,20210622160235,1,'2020-01-01 01:01:01'),(83,20210623100031,1,'2020-01-01 01:01:01'),(84,20210623133615,1,'2020-01-01 01:01:01'),(85,20210708143152,1,'2020-01-01 01:01:01'),(86,20210709124443,1,'2020-01-01 01:01:01'),(87,20210712155608,1,'2020-01-01 01:01:01'),(88,20210714102108,1,'2020-01-01 01:01:01'),(89,20210719153709,1,'2020-01-01 01:01:01'),(90,20210721171531,1,'2020-01-01 01:01:01'),(91,20210723135713,1,'2020-01-01 01:01:01'),(92,20210802135933,1,'2020-01-01 01:01:01'),(93,20210806112844,1,'2020-01-01 01:01:01'),(94,20210810095603,1,'2020-01-01 01:01:01'),(95,20210811150223,1,'2020-01-01 01:01:01'),(96,20210818151827,1,'2020-01-01 01:01:01'),(97,20210818151828,1,'2020-01-01 01:01:01'),(98,20210818182258,1,'2020-01-01 01:01:01'),(99,20210819131107,1,'2020-01-01 01:01:01'),(100,20210819143446,1,'2020-01-01 01:01:01'),(101,20210903132338,1,'2020-01-01 01:01:01'),(102,20210915144307,1,'2020-01-01 01:01:01'),(103,20210920155130,1,'2020-01-01 01:01:01'),(104,20210927143115,1,'2020-01-01 01:01:01'),(105,20210927143116,1,'2020-01-01 01:01:01'),(106,20211013133706,1,'2020-01-01 01:01:01'),(107,20211013133707,1,'2020-01-01 01:01:01'),(108,20211102135149,1,'2020-01-01 01:01:01'),(109,20211109121546,1,'2020-01-01 01:01:01'),(110,20211110163320,1,'2020-01-01 01:01:01'),(111,20211116184029,1,'2020-01-01 01:01:01'),(112,20211116184030,1,'2020-01-01 01:01:01'),(113,20211202092042,1,'2020-01-01 01:01:01'),(114,20211202181033,1,'2020-01-01 01:01:01'),(115,20211207161856,1,'2020-01-01 01:01:01'),(116,20211216131203,1,'2020-01-01 01:01:01'),(117,20211221110132,1,'2020-01-01 01:01:01'),(118,20220107155700,1,'2020-01-01 01:01:01'),(119,20220125105650,1,'2020-01-01 01:01:01'),(120,20220201084510,1,'2020-01-01 01:01:01'),(121,20220208144830,1,'2020-01-01 01:01:01'),(122,20220208144831,1,'2020-01-01 01:01:01'),(123,20220215152203,1,'2020-01-01 01:01:01'),(124,20220223113157,1,'2020-01-01 01:01:01'),(125,20220307104655,1,'2020-01-01 01:01:01'),(126,20220309133956,1,'2020-01-01 01:01:01'),(127,20220316155700,1,'2020-01-01 01:01:01'),(128,20220323152301,1,'2020-01-01 01:01:01'),(129,20220330100659,1,'2020-01-01 01:01:01'),(130,20220404091216,1,'2020-01-01 01:01:01'),(131,20220419140750,1,'2020-01-01 01:01:01'),(132,20220428140039,1,'2020-01-01 01:01:01'),(133,20220503134048,1,'2020-01-01 01:01:01'),(134,20220524102918,1,'2020-01-01 01:01:01'),(135,20220526123327,1,'2020-01-01 01:01:01'),(136,20220526123328,1,'2020-01-01 01:01:01'),(137,20220526123329,1,'2020-01-01 01:01:01'),(138,20220608113128,1,'2020-01-01 01:01:01'),(139,20220627104817,1,'2020-01-01 01:01:01'),(140,20220704101843,1,'2020-01-01 01:01:01'),(141,20220708095046,1,'2020-01-01 01:01:01'),(142,20220713091130,1,'2020-01-01 01:01:01'),(143,20220802135510,1,'2020-01-01 01:01:01'),(144,20220818101352,1,'2020-01-01 01:01:01'),(145,20220822161445,1,'2020-01-01 01:01:01'),(146,20220831100036,1,'2020-01-01 01:01:01'),(147,20220831100151,1,'2020-01-01 01:01:01'),(148,20220908181826,1,'2020-01-01 01:01:01'),(149,20220914154915,1,'2020-01-01 01:01:01'),(150,20220915165115,1,'2020-01-01 01:01:01'),(151,20220915165116,1,'2020-01-01 01:01:01'),(152,20220928100158,1,'2020-01-01 01:01:01'),(153,20221014084130,1,'2020-01-01 01:01:01'),(154,20221027085019,1,'2020-01-01 01:01:01'),(155,20221101103952,1,'2020-01-01 01:01:01'),(156,20221104144401,1,'2020-01-01 01:01:01'),(157,20221109100749,1,'2020-01-01 01:01:01'),(158,20221115104546,1,'2020-01-01 01:01:01'),(159,20221130114928,1,'2020-01-01 01:01:01'),(160,20221205112142,1,'2020-01-01 01:01:01'),(161,20221216115820,1,'2020-01-01 01:01:01'),(162,20221220195934,1,'2020-01-01 01:01:01'),(163,20221220195935,1,'2020-01-01 01:01:01'); /*!40101 SET @saved_cs_client = @@character_set_client */; /*!40101 SET character_set_client = utf8 */; CREATE TABLE `mobile_device_management_solutions` ( diff --git a/server/fleet/activities.go b/server/fleet/activities.go index d4c3ec8ead..51bc831e7e 100644 --- a/server/fleet/activities.go +++ b/server/fleet/activities.go @@ -435,6 +435,7 @@ type Activity struct { ActorEmail *string `json:"actor_email" db:"email"` Type string `json:"type" db:"activity_type"` Details *json.RawMessage `json:"details" db:"details"` + Streamed *bool `json:"-" db:"streamed"` } type ActivityTypeUserLoggedIn struct { diff --git a/server/fleet/app.go b/server/fleet/app.go index 5d8402b0ef..c9016d9e18 100644 --- a/server/fleet/app.go +++ b/server/fleet/app.go @@ -469,6 +469,12 @@ type ListQueryOptions struct { OnlyObserverCanRun bool } +type ListActivitiesOptions struct { + ListOptions + + Streamed *bool +} + // ApplySpecOptions are the options available when applying a YAML or JSON spec. type ApplySpecOptions struct { // Force indicates that any validation error in the incoming payload should @@ -570,6 +576,7 @@ type Logging struct { Json bool `json:"json"` Result LoggingPlugin `json:"result"` Status LoggingPlugin `json:"status"` + Audit LoggingPlugin `json:"audit"` } type UpdateIntervalConfig struct { @@ -611,6 +618,7 @@ type FirehoseConfig struct { Region string `json:"region"` StatusStream string `json:"status_stream"` ResultStream string `json:"result_stream"` + AuditStream string `json:"audit_stream"` } // KinesisConfig shadows config.KinesisConfig only exposing a subset of fields @@ -618,6 +626,7 @@ type KinesisConfig struct { Region string `json:"region"` StatusStream string `json:"status_stream"` ResultStream string `json:"result_stream"` + AuditStream string `json:"audit_stream"` } // LambdaConfig shadows config.LambdaConfig only exposing a subset of fields @@ -625,11 +634,13 @@ type LambdaConfig struct { Region string `json:"region"` StatusFunction string `json:"status_function"` ResultFunction string `json:"result_function"` + AuditFunction string `json:"audit_function"` } // KafkaRESTConfig shadows config.KafkaRESTConfig type KafkaRESTConfig struct { StatusTopic string `json:"status_topic"` ResultTopic string `json:"result_topic"` + AuditTopic string `json:"audit_topic"` ProxyHost string `json:"proxyhost"` } diff --git a/server/fleet/cron_schedules.go b/server/fleet/cron_schedules.go index 4805d3ff58..75cdc55eb0 100644 --- a/server/fleet/cron_schedules.go +++ b/server/fleet/cron_schedules.go @@ -18,6 +18,7 @@ const ( CronVulnerabilities CronScheduleName = "vulnerabilities" CronAutomations CronScheduleName = "automations" CronIntegrations CronScheduleName = "integrations" + CronActivitiesStreaming CronScheduleName = "activities_streaming" ) type CronSchedulesService interface { diff --git a/server/fleet/datastore.go b/server/fleet/datastore.go index 55f6746635..ead80019fa 100644 --- a/server/fleet/datastore.go +++ b/server/fleet/datastore.go @@ -438,7 +438,8 @@ type Datastore interface { // ActivitiesStore NewActivity(ctx context.Context, user *User, activity ActivityDetails) error - ListActivities(ctx context.Context, opt ListOptions) ([]*Activity, error) + ListActivities(ctx context.Context, opt ListActivitiesOptions) ([]*Activity, error) + MarkActivitiesAsStreamed(ctx context.Context, activityIDs []uint) error /////////////////////////////////////////////////////////////////////////////// // StatisticsStore diff --git a/server/fleet/service.go b/server/fleet/service.go index c209d51eda..55940be098 100644 --- a/server/fleet/service.go +++ b/server/fleet/service.go @@ -457,7 +457,7 @@ type Service interface { /////////////////////////////////////////////////////////////////////////////// // ActivitiesService - ListActivities(ctx context.Context, opt ListOptions) ([]*Activity, error) + ListActivities(ctx context.Context, opt ListActivitiesOptions) ([]*Activity, error) /////////////////////////////////////////////////////////////////////////////// // UserRolesService diff --git a/server/logging/filesystem.go b/server/logging/filesystem.go index b952384f14..3d4016d345 100644 --- a/server/logging/filesystem.go +++ b/server/logging/filesystem.go @@ -23,7 +23,8 @@ type filesystemLogWriter struct { writer io.WriteCloser } -// NewFilesystemLogWriter creates a log file for osquery status/result logs. +// NewFilesystemLogWriter creates a logger that writes to a file. +// // The logFile can be rotated by sending a `SIGHUP` signal to Fleet if // enableRotation is true // @@ -43,25 +44,25 @@ func NewFilesystemLogWriter(path string, appLogger log.Logger, enableRotation bo } // Use lumberjack logger that supports rotation file.Close() - osquerydLogger := &lumberjack.Logger{ + fsLogger := &lumberjack.Logger{ Filename: path, MaxSize: 500, // megabytes MaxBackups: 3, MaxAge: 28, // days Compress: enableCompression, } - appLogger = log.With(appLogger, "component", "osqueryd-logger") + appLogger = log.With(appLogger, "component", "filesystem-logger") sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGHUP) go func() { for { <-sig // block on signal - if err := osquerydLogger.Rotate(); err != nil { + if err := fsLogger.Rotate(); err != nil { appLogger.Log("err", err) } } }() - return &filesystemLogWriter{osquerydLogger}, nil + return &filesystemLogWriter{fsLogger}, nil } // If writer is based on bufio we want to flush after a batch of diff --git a/server/logging/logging.go b/server/logging/logging.go index e3c29bfafd..19c72f5041 100644 --- a/server/logging/logging.go +++ b/server/logging/logging.go @@ -1,191 +1,168 @@ -// Package logging provides logger "plugins" for writing osquery status and -// result logs to various destinations. +// Package logging provides logger "plugins" for various destinations. package logging import ( "fmt" - "github.com/fleetdm/fleet/v4/server/config" "github.com/fleetdm/fleet/v4/server/fleet" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" ) -type OsqueryLogger struct { - Status fleet.JSONLogger - Result fleet.JSONLogger +type FilesystemConfig struct { + LogFile string + + EnableLogRotation bool + EnableLogCompression bool } -func New(config config.FleetConfig, logger log.Logger) (*OsqueryLogger, error) { - var status, result fleet.JSONLogger - var err error +type FirehoseConfig struct { + StreamName string - switch config.Osquery.StatusLogPlugin { + Region string + EndpointURL string + AccessKeyID string + SecretAccessKey string + StsAssumeRoleArn string +} + +type KinesisConfig struct { + StreamName string + + Region string + EndpointURL string + AccessKeyID string + SecretAccessKey string + StsAssumeRoleArn string +} + +type LambdaConfig struct { + Function string + + Region string + AccessKeyID string + SecretAccessKey string + StsAssumeRoleArn string +} + +type PubSubConfig struct { + Topic string + + Project string + AddAttributes bool +} + +type KafkaRESTConfig struct { + Topic string + + ProxyHost string + ContentTypeValue string + Timeout int +} + +type Config struct { + Plugin string + + Filesystem FilesystemConfig + Firehose FirehoseConfig + Kinesis KinesisConfig + Lambda LambdaConfig + PubSub PubSubConfig + KafkaREST KafkaRESTConfig +} + +func NewJSONLogger(name string, config Config, logger log.Logger) (fleet.JSONLogger, error) { + switch config.Plugin { case "": // Allow "" to mean filesystem for backwards compatibility - level.Info(logger).Log("msg", "osquery_status_log_plugin not explicitly specified. Assuming 'filesystem'") + level.Info(logger).Log( + "msg", + fmt.Sprintf("plugin for %s not explicitly specified. Assuming 'filesystem'", name), + ) fallthrough case "filesystem": - status, err = NewFilesystemLogWriter( - config.Filesystem.StatusLogFile, + writer, err := NewFilesystemLogWriter( + config.Filesystem.LogFile, logger, config.Filesystem.EnableLogRotation, config.Filesystem.EnableLogCompression, ) if err != nil { - return nil, fmt.Errorf("create filesystem status logger: %w", err) + return nil, fmt.Errorf("create filesystem %s logger: %w", name, err) } + return fleet.JSONLogger(writer), nil case "firehose": - status, err = NewFirehoseLogWriter( + writer, err := NewFirehoseLogWriter( config.Firehose.Region, config.Firehose.EndpointURL, config.Firehose.AccessKeyID, config.Firehose.SecretAccessKey, config.Firehose.StsAssumeRoleArn, - config.Firehose.StatusStream, + config.Firehose.StreamName, logger, ) if err != nil { - return nil, fmt.Errorf("create firehose status logger: %w", err) + return nil, fmt.Errorf("create firehose %s logger: %w", name, err) } + return fleet.JSONLogger(writer), nil case "kinesis": - status, err = NewKinesisLogWriter( + writer, err := NewKinesisLogWriter( config.Kinesis.Region, config.Kinesis.EndpointURL, config.Kinesis.AccessKeyID, config.Kinesis.SecretAccessKey, config.Kinesis.StsAssumeRoleArn, - config.Kinesis.StatusStream, + config.Kinesis.StreamName, logger, ) if err != nil { - return nil, fmt.Errorf("create kinesis status logger: %w", err) + return nil, fmt.Errorf("create kinesis %s logger: %w", name, err) } + return fleet.JSONLogger(writer), nil case "lambda": - status, err = NewLambdaLogWriter( + writer, err := NewLambdaLogWriter( config.Lambda.Region, config.Lambda.AccessKeyID, config.Lambda.SecretAccessKey, config.Lambda.StsAssumeRoleArn, - config.Lambda.StatusFunction, + config.Lambda.Function, logger, ) if err != nil { - return nil, fmt.Errorf("create lambda status logger: %w", err) + return nil, fmt.Errorf("create lambda %s logger: %w", name, err) } + return fleet.JSONLogger(writer), nil case "pubsub": - status, err = NewPubSubLogWriter( + writer, err := NewPubSubLogWriter( config.PubSub.Project, - config.PubSub.StatusTopic, - false, - logger, - ) - if err != nil { - return nil, fmt.Errorf("create pubsub status logger: %w", err) - } - case "stdout": - status, err = NewStdoutLogWriter() - if err != nil { - return nil, fmt.Errorf("create stdout status logger: %w", err) - } - case "kafkarest": - status, err = NewKafkaRESTWriter(&KafkaRESTParams{ - KafkaProxyHost: config.KafkaREST.ProxyHost, - KafkaTopic: config.KafkaREST.StatusTopic, - KafkaContentTypeValue: config.KafkaREST.ContentTypeValue, - KafkaTimeout: config.KafkaREST.Timeout, - }) - if err != nil { - return nil, fmt.Errorf("create kafka rest status logger: %w", err) - } - default: - return nil, fmt.Errorf( - "unknown status log plugin: %s", config.Osquery.StatusLogPlugin, - ) - } - - switch config.Osquery.ResultLogPlugin { - case "": - // Allow "" to mean filesystem for backwards compatibility - level.Info(logger).Log("msg", "osquery_result_log_plugin not explicitly specified. Assuming 'filesystem'") - fallthrough - case "filesystem": - result, err = NewFilesystemLogWriter( - config.Filesystem.ResultLogFile, - logger, - config.Filesystem.EnableLogRotation, - config.Filesystem.EnableLogCompression, - ) - if err != nil { - return nil, fmt.Errorf("create filesystem result logger: %w", err) - } - case "firehose": - result, err = NewFirehoseLogWriter( - config.Firehose.Region, - config.Firehose.EndpointURL, - config.Firehose.AccessKeyID, - config.Firehose.SecretAccessKey, - config.Firehose.StsAssumeRoleArn, - config.Firehose.ResultStream, - logger, - ) - if err != nil { - return nil, fmt.Errorf("create firehose result logger: %w", err) - } - case "kinesis": - result, err = NewKinesisLogWriter( - config.Kinesis.Region, - config.Kinesis.EndpointURL, - config.Kinesis.AccessKeyID, - config.Kinesis.SecretAccessKey, - config.Kinesis.StsAssumeRoleArn, - config.Kinesis.ResultStream, - logger, - ) - if err != nil { - return nil, fmt.Errorf("create kinesis result logger: %w", err) - } - case "lambda": - result, err = NewLambdaLogWriter( - config.Lambda.Region, - config.Lambda.AccessKeyID, - config.Lambda.SecretAccessKey, - config.Lambda.StsAssumeRoleArn, - config.Lambda.ResultFunction, - logger, - ) - if err != nil { - return nil, fmt.Errorf("create lambda result logger: %w", err) - } - case "pubsub": - result, err = NewPubSubLogWriter( - config.PubSub.Project, - config.PubSub.ResultTopic, + config.PubSub.Topic, config.PubSub.AddAttributes, logger, ) if err != nil { - return nil, fmt.Errorf("create pubsub result logger: %w", err) + return nil, fmt.Errorf("create pubsub %s logger: %w", name, err) } + return fleet.JSONLogger(writer), nil case "stdout": - result, err = NewStdoutLogWriter() + writer, err := NewStdoutLogWriter() if err != nil { - return nil, fmt.Errorf("create stdout result logger: %w", err) + return nil, fmt.Errorf("create stdout %s logger: %w", name, err) } + return fleet.JSONLogger(writer), nil case "kafkarest": - result, err = NewKafkaRESTWriter(&KafkaRESTParams{ + writer, err := NewKafkaRESTWriter(&KafkaRESTParams{ KafkaProxyHost: config.KafkaREST.ProxyHost, - KafkaTopic: config.KafkaREST.ResultTopic, + KafkaTopic: config.KafkaREST.Topic, KafkaContentTypeValue: config.KafkaREST.ContentTypeValue, KafkaTimeout: config.KafkaREST.Timeout, }) if err != nil { - return nil, fmt.Errorf("create kafka rest result logger: %w", err) + return nil, fmt.Errorf("create kafka rest %s logger: %w", name, err) } + return fleet.JSONLogger(writer), nil default: return nil, fmt.Errorf( - "unknown result log plugin: %s", config.Osquery.ResultLogPlugin, + "unknown %s log plugin: %s", name, config.Plugin, ) } - return &OsqueryLogger{Status: status, Result: result}, nil } diff --git a/server/mock/datastore_mock.go b/server/mock/datastore_mock.go index 1a1f463480..31f0103edd 100644 --- a/server/mock/datastore_mock.go +++ b/server/mock/datastore_mock.go @@ -337,7 +337,9 @@ type CleanupHostOperatingSystemsFunc func(ctx context.Context) error type NewActivityFunc func(ctx context.Context, user *fleet.User, activity fleet.ActivityDetails) error -type ListActivitiesFunc func(ctx context.Context, opt fleet.ListOptions) ([]*fleet.Activity, error) +type ListActivitiesFunc func(ctx context.Context, opt fleet.ListActivitiesOptions) ([]*fleet.Activity, error) + +type MarkActivitiesAsStreamedFunc func(ctx context.Context, activityIDs []uint) error type ShouldSendStatisticsFunc func(ctx context.Context, frequency time.Duration, config config.FleetConfig) (fleet.StatisticsPayload, bool, error) @@ -1003,6 +1005,9 @@ type DataStore struct { ListActivitiesFunc ListActivitiesFunc ListActivitiesFuncInvoked bool + MarkActivitiesAsStreamedFunc MarkActivitiesAsStreamedFunc + MarkActivitiesAsStreamedFuncInvoked bool + ShouldSendStatisticsFunc ShouldSendStatisticsFunc ShouldSendStatisticsFuncInvoked bool @@ -2075,11 +2080,16 @@ func (s *DataStore) NewActivity(ctx context.Context, user *fleet.User, activity return s.NewActivityFunc(ctx, user, activity) } -func (s *DataStore) ListActivities(ctx context.Context, opt fleet.ListOptions) ([]*fleet.Activity, error) { +func (s *DataStore) ListActivities(ctx context.Context, opt fleet.ListActivitiesOptions) ([]*fleet.Activity, error) { s.ListActivitiesFuncInvoked = true return s.ListActivitiesFunc(ctx, opt) } +func (s *DataStore) MarkActivitiesAsStreamed(ctx context.Context, activityIDs []uint) error { + s.MarkActivitiesAsStreamedFuncInvoked = true + return s.MarkActivitiesAsStreamedFunc(ctx, activityIDs) +} + func (s *DataStore) ShouldSendStatistics(ctx context.Context, frequency time.Duration, config config.FleetConfig) (fleet.StatisticsPayload, bool, error) { s.ShouldSendStatisticsFuncInvoked = true return s.ShouldSendStatisticsFunc(ctx, frequency, config) diff --git a/server/service/activities.go b/server/service/activities.go index 6fed0479b8..f85c5768c7 100644 --- a/server/service/activities.go +++ b/server/service/activities.go @@ -23,7 +23,9 @@ func (r listActivitiesResponse) error() error { return r.Err } func listActivitiesEndpoint(ctx context.Context, request interface{}, svc fleet.Service) (interface{}, error) { req := request.(*listActivitiesRequest) - activities, err := svc.ListActivities(ctx, req.ListOptions) + activities, err := svc.ListActivities(ctx, fleet.ListActivitiesOptions{ + ListOptions: req.ListOptions, + }) if err != nil { return listActivitiesResponse{Err: err}, nil } @@ -32,7 +34,7 @@ func listActivitiesEndpoint(ctx context.Context, request interface{}, svc fleet. } // ListActivities returns a slice of activities for the whole organization -func (svc *Service) ListActivities(ctx context.Context, opt fleet.ListOptions) ([]*fleet.Activity, error) { +func (svc *Service) ListActivities(ctx context.Context, opt fleet.ListActivitiesOptions) ([]*fleet.Activity, error) { if err := svc.authz.Authorize(ctx, &fleet.Activity{}, fleet.ActionRead); err != nil { return nil, err } diff --git a/server/service/activities_test.go b/server/service/activities_test.go index 98f8ee8137..51b39afe39 100644 --- a/server/service/activities_test.go +++ b/server/service/activities_test.go @@ -19,7 +19,7 @@ func TestListActivities(t *testing.T) { globalUsers := []*fleet.User{test.UserAdmin, test.UserMaintainer, test.UserObserver} teamUsers := []*fleet.User{test.UserTeamAdminTeam1, test.UserTeamMaintainerTeam1, test.UserTeamObserverTeam1} - ds.ListActivitiesFunc = func(ctx context.Context, opts fleet.ListOptions) ([]*fleet.Activity, error) { + ds.ListActivitiesFunc = func(ctx context.Context, opts fleet.ListActivitiesOptions) ([]*fleet.Activity, error) { return []*fleet.Activity{ {ID: 1}, {ID: 2}, @@ -28,25 +28,25 @@ func TestListActivities(t *testing.T) { // any global user can read activities for _, u := range globalUsers { - activities, err := svc.ListActivities(test.UserContext(ctx, u), fleet.ListOptions{}) + activities, err := svc.ListActivities(test.UserContext(ctx, u), fleet.ListActivitiesOptions{}) require.NoError(t, err) require.Len(t, activities, 2) } // team users cannot read activities for _, u := range teamUsers { - _, err := svc.ListActivities(test.UserContext(ctx, u), fleet.ListOptions{}) + _, err := svc.ListActivities(test.UserContext(ctx, u), fleet.ListActivitiesOptions{}) require.Error(t, err) require.Contains(t, err.Error(), authz.ForbiddenErrorMessage) } // user with no roles cannot read activities - _, err := svc.ListActivities(test.UserContext(ctx, test.UserNoRoles), fleet.ListOptions{}) + _, err := svc.ListActivities(test.UserContext(ctx, test.UserNoRoles), fleet.ListActivitiesOptions{}) require.Error(t, err) require.Contains(t, err.Error(), authz.ForbiddenErrorMessage) // no user in context - _, err = svc.ListActivities(ctx, fleet.ListOptions{}) + _, err = svc.ListActivities(ctx, fleet.ListActivitiesOptions{}) require.Error(t, err) require.Contains(t, err.Error(), authz.ForbiddenErrorMessage) } diff --git a/server/service/appconfig.go b/server/service/appconfig.go index 2ef82ca62c..7e4aab4d2f 100644 --- a/server/service/appconfig.go +++ b/server/service/appconfig.go @@ -416,7 +416,7 @@ func (svc *Service) ModifyAppConfig(ctx context.Context, p []byte, applyOpts fle Global: true, }, ); err != nil { - return nil, err + return nil, ctxerr.Wrap(ctx, err, "create activity for app config modification") } } diff --git a/server/service/campaigns.go b/server/service/campaigns.go index c8441b18da..8f96b920a9 100644 --- a/server/service/campaigns.go +++ b/server/service/campaigns.go @@ -174,7 +174,7 @@ func (svc *Service) NewDistributedQueryCampaign(ctx context.Context, queryString authz.UserFromContext(ctx), activityData, ); err != nil { - return nil, err + return nil, ctxerr.Wrap(ctx, err, "create activity for campaign creation") } return campaign, nil } diff --git a/server/service/global_policies.go b/server/service/global_policies.go index 99db247cdf..cb304d7b09 100644 --- a/server/service/global_policies.go +++ b/server/service/global_policies.go @@ -78,7 +78,7 @@ func (svc Service) NewGlobalPolicy(ctx context.Context, p fleet.PolicyPayload) ( Name: policy.Name, }, ); err != nil { - return nil, err + return nil, ctxerr.Wrap(ctx, err, "create activity for global policy creation") } return policy, nil } @@ -218,7 +218,7 @@ func (svc Service) DeleteGlobalPolicies(ctx context.Context, ids []uint) ([]uint Name: policiesByID[id].Name, }, ); err != nil { - return nil, ctxerr.Wrap(ctx, err, "adding new activity for deleted policy") + return nil, ctxerr.Wrap(ctx, err, "create activity for policy deletion") } } return ids, nil @@ -480,11 +480,14 @@ func (svc *Service) ApplyPolicySpecs(ctx context.Context, policies []*fleet.Poli } // Note: Issue #4191 proposes that we move to SQL transactions for actions so that we can // rollback an action in the event of an error writing the associated activity - return svc.ds.NewActivity( + if err := svc.ds.NewActivity( ctx, authz.UserFromContext(ctx), fleet.ActivityTypeAppliedSpecPolicy{ Policies: policies, }, - ) + ); err != nil { + return ctxerr.Wrap(ctx, err, "create activity for policy spec") + } + return nil } diff --git a/server/service/integration_core_test.go b/server/service/integration_core_test.go index a6c0570293..b4c1096d8f 100644 --- a/server/service/integration_core_test.go +++ b/server/service/integration_core_test.go @@ -2048,7 +2048,7 @@ func (s *integrationTestSuite) TestListActivities() { ctx := context.Background() u := s.users["admin1@example.com"] - prevActivities, err := s.ds.ListActivities(ctx, fleet.ListOptions{}) + prevActivities, err := s.ds.ListActivities(ctx, fleet.ListActivitiesOptions{}) require.NoError(t, err) err = s.ds.NewActivity(ctx, &u, fleet.ActivityTypeAppliedSpecPack{}) diff --git a/server/service/osquery_test.go b/server/service/osquery_test.go index 3b44bc7618..e4fe675164 100644 --- a/server/service/osquery_test.go +++ b/server/service/osquery_test.go @@ -25,7 +25,6 @@ import ( "github.com/fleetdm/fleet/v4/server/datastore/redis/redistest" "github.com/fleetdm/fleet/v4/server/fleet" "github.com/fleetdm/fleet/v4/server/live_query/live_query_mock" - "github.com/fleetdm/fleet/v4/server/logging" "github.com/fleetdm/fleet/v4/server/mock" mockresult "github.com/fleetdm/fleet/v4/server/mock/mockresult" "github.com/fleetdm/fleet/v4/server/ptr" @@ -441,7 +440,7 @@ func TestSubmitStatusLogs(t *testing.T) { serv := ((svc.(validationMiddleware)).Service).(*Service) testLogger := &testJSONLogger{} - serv.osqueryLogWriter = &logging.OsqueryLogger{Status: testLogger} + serv.osqueryLogWriter = &OsqueryLogger{Status: testLogger} logs := []string{ `{"severity":"0","filename":"tls.cpp","line":"216","message":"some message","version":"1.8.2","decorations":{"host_uuid":"uuid_foobar","username":"zwass"}}`, @@ -469,7 +468,7 @@ func TestSubmitResultLogs(t *testing.T) { serv := ((svc.(validationMiddleware)).Service).(*Service) testLogger := &testJSONLogger{} - serv.osqueryLogWriter = &logging.OsqueryLogger{Result: testLogger} + serv.osqueryLogWriter = &OsqueryLogger{Result: testLogger} logs := []string{ `{"name":"system_info","hostIdentifier":"some_uuid","calendarTime":"Fri Sep 30 17:55:15 2016 UTC","unixTime":"1475258115","decorations":{"host_uuid":"some_uuid","username":"zwass"},"columns":{"cpu_brand":"Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz","hostname":"hostimus","physical_memory":"17179869184"},"action":"added"}`, diff --git a/server/service/packs.go b/server/service/packs.go index a61840eca8..8015f6b44b 100644 --- a/server/service/packs.go +++ b/server/service/packs.go @@ -175,7 +175,7 @@ func (svc *Service) NewPack(ctx context.Context, p fleet.PackPayload) (*fleet.Pa Name: pack.Name, }, ); err != nil { - return nil, err + return nil, ctxerr.Wrap(ctx, err, "create activity for pack creation") } return &pack, nil @@ -271,7 +271,7 @@ func (svc *Service) ModifyPack(ctx context.Context, id uint, p fleet.PackPayload Name: pack.Name, }, ); err != nil { - return nil, err + return nil, ctxerr.Wrap(ctx, err, "create activity for pack modification") } return pack, err @@ -359,13 +359,16 @@ func (svc *Service) DeletePack(ctx context.Context, name string) error { return err } - return svc.ds.NewActivity( + if err := svc.ds.NewActivity( ctx, authz.UserFromContext(ctx), fleet.ActivityTypeDeletedPack{ Name: pack.Name, }, - ) + ); err != nil { + return ctxerr.Wrap(ctx, err, "create activity for pack deletion") + } + return nil } //////////////////////////////////////////////////////////////////////////////// @@ -407,13 +410,16 @@ func (svc *Service) DeletePackByID(ctx context.Context, id uint) error { return err } - return svc.ds.NewActivity( + if err := svc.ds.NewActivity( ctx, authz.UserFromContext(ctx), fleet.ActivityTypeDeletedPack{ Name: pack.Name, }, - ) + ); err != nil { + return ctxerr.Wrap(ctx, err, "create activity for pack deletion by id") + } + return nil } //////////////////////////////////////////////////////////////////////////////// @@ -483,11 +489,14 @@ func (svc *Service) ApplyPackSpecs(ctx context.Context, specs []*fleet.PackSpec) return nil, err } - return result, svc.ds.NewActivity( + if err := svc.ds.NewActivity( ctx, authz.UserFromContext(ctx), fleet.ActivityTypeAppliedSpecPack{}, - ) + ); err != nil { + return nil, ctxerr.Wrap(ctx, err, "create activity for pack spec") + } + return result, nil } //////////////////////////////////////////////////////////////////////////////// diff --git a/server/service/queries.go b/server/service/queries.go index 4506497647..31655e887c 100644 --- a/server/service/queries.go +++ b/server/service/queries.go @@ -190,7 +190,7 @@ func (svc *Service) NewQuery(ctx context.Context, p fleet.QueryPayload) (*fleet. Name: query.Name, }, ); err != nil { - return nil, err + return nil, ctxerr.Wrap(ctx, err, "create activity for query creation") } return query, nil @@ -273,7 +273,7 @@ func (svc *Service) ModifyQuery(ctx context.Context, id uint, p fleet.QueryPaylo Name: query.Name, }, ); err != nil { - return nil, err + return nil, ctxerr.Wrap(ctx, err, "create activity for query modification") } return query, nil @@ -322,13 +322,16 @@ func (svc *Service) DeleteQuery(ctx context.Context, name string) error { return err } - return svc.ds.NewActivity( + if err := svc.ds.NewActivity( ctx, authz.UserFromContext(ctx), fleet.ActivityTypeDeletedSavedQuery{ Name: name, }, - ) + ); err != nil { + return ctxerr.Wrap(ctx, err, "create activity for query deletion") + } + return nil } //////////////////////////////////////////////////////////////////////////////// @@ -374,13 +377,16 @@ func (svc *Service) DeleteQueryByID(ctx context.Context, id uint) error { return ctxerr.Wrap(ctx, err, "delete query") } - return svc.ds.NewActivity( + if err := svc.ds.NewActivity( ctx, authz.UserFromContext(ctx), fleet.ActivityTypeDeletedSavedQuery{ Name: query.Name, }, - ) + ); err != nil { + return ctxerr.Wrap(ctx, err, "create activity for query deletion by id") + } + return nil } //////////////////////////////////////////////////////////////////////////////// @@ -430,17 +436,15 @@ func (svc *Service) DeleteQueries(ctx context.Context, ids []uint) (uint, error) return n, err } - err = svc.ds.NewActivity( + if err := svc.ds.NewActivity( ctx, authz.UserFromContext(ctx), fleet.ActivityTypeDeletedMultipleSavedQuery{ IDs: ids, }, - ) - if err != nil { - return n, err + ); err != nil { + return 0, ctxerr.Wrap(ctx, err, "create activity for query deletions") } - return n, nil } @@ -506,13 +510,16 @@ func (svc *Service) ApplyQuerySpecs(ctx context.Context, specs []*fleet.QuerySpe return ctxerr.Wrap(ctx, err, "applying queries") } - return svc.ds.NewActivity( + if err := svc.ds.NewActivity( ctx, authz.UserFromContext(ctx), fleet.ActivityTypeAppliedSpecSavedQuery{ Specs: specs, }, - ) + ); err != nil { + return ctxerr.Wrap(ctx, err, "create activity for query spec") + } + return nil } func queryFromSpec(spec *fleet.QuerySpec) *fleet.Query { diff --git a/server/service/service.go b/server/service/service.go index b13ed25e63..f856669974 100644 --- a/server/service/service.go +++ b/server/service/service.go @@ -13,7 +13,6 @@ import ( "github.com/fleetdm/fleet/v4/server/authz" "github.com/fleetdm/fleet/v4/server/config" "github.com/fleetdm/fleet/v4/server/fleet" - "github.com/fleetdm/fleet/v4/server/logging" "github.com/fleetdm/fleet/v4/server/service/async" "github.com/fleetdm/fleet/v4/server/sso" kitlog "github.com/go-kit/kit/log" @@ -36,7 +35,7 @@ type Service struct { config config.FleetConfig clock clock.Clock - osqueryLogWriter *logging.OsqueryLogger + osqueryLogWriter *OsqueryLogger mailService fleet.MailService ssoSessionStore sso.SessionStore @@ -69,6 +68,18 @@ func (s *Service) SetEnterpriseOverrides(overrides fleet.EnterpriseOverrides) { s.EnterpriseOverrides = &overrides } +// OsqueryLogger holds osqueryd's status and result loggers. +type OsqueryLogger struct { + // Status holds the osqueryd's status logger. + // + // See https://osquery.readthedocs.io/en/stable/deployment/logging/#status-logs + Status fleet.JSONLogger + // Result holds the osqueryd's result logger. + // + // See https://osquery.readthedocs.io/en/stable/deployment/logging/#results-logs + Result fleet.JSONLogger +} + // NewService creates a new service from the config struct func NewService( ctx context.Context, @@ -76,7 +87,7 @@ func NewService( task *async.Task, resultStore fleet.QueryResultStore, logger kitlog.Logger, - osqueryLogger *logging.OsqueryLogger, + osqueryLogger *OsqueryLogger, config config.FleetConfig, mailService fleet.MailService, c clock.Clock, diff --git a/server/service/service_appconfig.go b/server/service/service_appconfig.go index 9895ad9e70..979e5d75a2 100644 --- a/server/service/service_appconfig.go +++ b/server/service/service_appconfig.go @@ -136,117 +136,98 @@ func (svc *Service) LoggingConfig(ctx context.Context) (*fleet.Logging, error) { if err := svc.authz.Authorize(ctx, &fleet.AppConfig{}, fleet.ActionRead); err != nil { return nil, err } + conf := svc.config logging := &fleet.Logging{ Debug: conf.Logging.Debug, Json: conf.Logging.JSON, } - switch conf.Osquery.StatusLogPlugin { - case "", "filesystem": - logging.Status = fleet.LoggingPlugin{ - Plugin: "filesystem", - Config: fleet.FilesystemConfig{FilesystemConfig: conf.Filesystem}, - } - case "kinesis": - logging.Status = fleet.LoggingPlugin{ - Plugin: "kinesis", - Config: fleet.KinesisConfig{ - Region: conf.Kinesis.Region, - StatusStream: conf.Kinesis.StatusStream, - ResultStream: conf.Kinesis.ResultStream, - }, - } - case "firehose": - logging.Status = fleet.LoggingPlugin{ - Plugin: "firehose", - Config: fleet.FirehoseConfig{ - Region: conf.Firehose.Region, - StatusStream: conf.Firehose.StatusStream, - ResultStream: conf.Firehose.ResultStream, - }, - } - case "lambda": - logging.Status = fleet.LoggingPlugin{ - Plugin: "lambda", - Config: fleet.LambdaConfig{ - Region: conf.Lambda.Region, - StatusFunction: conf.Lambda.StatusFunction, - ResultFunction: conf.Lambda.ResultFunction, - }, - } - case "pubsub": - logging.Status = fleet.LoggingPlugin{ - Plugin: "pubsub", - Config: fleet.PubSubConfig{PubSubConfig: conf.PubSub}, - } - case "stdout": - logging.Status = fleet.LoggingPlugin{Plugin: "stdout"} - case "kafkarest": - logging.Status = fleet.LoggingPlugin{ - Plugin: "kafkarest", - Config: fleet.KafkaRESTConfig{ - StatusTopic: conf.KafkaREST.StatusTopic, - ProxyHost: conf.KafkaREST.ProxyHost, - }, - } - default: - return nil, ctxerr.Errorf(ctx, "unrecognized logging plugin: %s", conf.Osquery.StatusLogPlugin) + loggings := []struct { + plugin string + target *fleet.LoggingPlugin + }{ + { + plugin: conf.Osquery.StatusLogPlugin, + target: &logging.Status, + }, + { + plugin: conf.Osquery.ResultLogPlugin, + target: &logging.Result, + }, } - switch conf.Osquery.ResultLogPlugin { - case "", "filesystem": - logging.Result = fleet.LoggingPlugin{ - Plugin: "filesystem", - Config: fleet.FilesystemConfig{FilesystemConfig: conf.Filesystem}, - } - case "kinesis": - logging.Result = fleet.LoggingPlugin{ - Plugin: "kinesis", - Config: fleet.KinesisConfig{ - Region: conf.Kinesis.Region, - StatusStream: conf.Kinesis.StatusStream, - ResultStream: conf.Kinesis.ResultStream, - }, - } - case "firehose": - logging.Result = fleet.LoggingPlugin{ - Plugin: "firehose", - Config: fleet.FirehoseConfig{ - Region: conf.Firehose.Region, - StatusStream: conf.Firehose.StatusStream, - ResultStream: conf.Firehose.ResultStream, - }, - } - case "lambda": - logging.Result = fleet.LoggingPlugin{ - Plugin: "lambda", - Config: fleet.LambdaConfig{ - Region: conf.Lambda.Region, - StatusFunction: conf.Lambda.StatusFunction, - ResultFunction: conf.Lambda.ResultFunction, - }, - } - case "pubsub": - logging.Result = fleet.LoggingPlugin{ - Plugin: "pubsub", - Config: fleet.PubSubConfig{PubSubConfig: conf.PubSub}, - } - case "stdout": - logging.Result = fleet.LoggingPlugin{ - Plugin: "stdout", - } - case "kafkarest": - logging.Result = fleet.LoggingPlugin{ - Plugin: "kafkarest", - Config: fleet.KafkaRESTConfig{ - ResultTopic: conf.KafkaREST.ResultTopic, - ProxyHost: conf.KafkaREST.ProxyHost, - }, - } - default: - return nil, ctxerr.Errorf(ctx, "unrecognized logging plugin: %s", conf.Osquery.ResultLogPlugin) + if conf.Activity.EnableAuditLog { + loggings = append(loggings, struct { + plugin string + target *fleet.LoggingPlugin + }{ + plugin: conf.Activity.AuditLogPlugin, + target: &logging.Audit, + }) + } + for _, lp := range loggings { + switch lp.plugin { + case "", "filesystem": + *lp.target = fleet.LoggingPlugin{ + Plugin: "filesystem", + Config: fleet.FilesystemConfig{ + FilesystemConfig: conf.Filesystem, + }, + } + case "kinesis": + *lp.target = fleet.LoggingPlugin{ + Plugin: "kinesis", + Config: fleet.KinesisConfig{ + Region: conf.Kinesis.Region, + StatusStream: conf.Kinesis.StatusStream, + ResultStream: conf.Kinesis.ResultStream, + AuditStream: conf.Kinesis.AuditStream, + }, + } + case "firehose": + *lp.target = fleet.LoggingPlugin{ + Plugin: "firehose", + Config: fleet.FirehoseConfig{ + Region: conf.Firehose.Region, + StatusStream: conf.Firehose.StatusStream, + ResultStream: conf.Firehose.ResultStream, + AuditStream: conf.Firehose.AuditStream, + }, + } + case "lambda": + *lp.target = fleet.LoggingPlugin{ + Plugin: "lambda", + Config: fleet.LambdaConfig{ + Region: conf.Lambda.Region, + StatusFunction: conf.Lambda.StatusFunction, + ResultFunction: conf.Lambda.ResultFunction, + AuditFunction: conf.Lambda.AuditFunction, + }, + } + case "pubsub": + *lp.target = fleet.LoggingPlugin{ + Plugin: "pubsub", + Config: fleet.PubSubConfig{ + PubSubConfig: conf.PubSub, + }, + } + case "stdout": + *lp.target = fleet.LoggingPlugin{Plugin: "stdout"} + case "kafkarest": + *lp.target = fleet.LoggingPlugin{ + Plugin: "kafkarest", + Config: fleet.KafkaRESTConfig{ + StatusTopic: conf.KafkaREST.StatusTopic, + ResultTopic: conf.KafkaREST.ResultTopic, + AuditTopic: conf.KafkaREST.AuditTopic, + ProxyHost: conf.KafkaREST.ProxyHost, + }, + } + default: + return nil, ctxerr.Errorf(ctx, "unrecognized logging plugin: %s", lp.plugin) + } } return logging, nil } diff --git a/server/service/service_appconfig_test.go b/server/service/service_appconfig_test.go index 920c2f5695..7645e23a70 100644 --- a/server/service/service_appconfig_test.go +++ b/server/service/service_appconfig_test.go @@ -31,7 +31,6 @@ func TestCleanupURL(t *testing.T) { assert.Equal(tt, test.expected, actual) }) } - } func TestCreateAppConfig(t *testing.T) { @@ -42,7 +41,7 @@ func TestCreateAppConfig(t *testing.T) { return &fleet.AppConfig{}, nil } - var appConfigTests = []struct { + appConfigTests := []struct { configPayload fleet.AppConfig }{ { @@ -156,6 +155,7 @@ func TestService_LoggingConfig(t *testing.T) { fileSystemConfig := fleet.FilesystemConfig{FilesystemConfig: config.FilesystemConfig{ StatusLogFile: logFile, ResultLogFile: logFile, + AuditLogFile: logFile, EnableLogRotation: false, EnableLogCompression: false, }} @@ -164,18 +164,21 @@ func TestService_LoggingConfig(t *testing.T) { Region: testFirehosePluginConfig().Firehose.Region, StatusStream: testFirehosePluginConfig().Firehose.StatusStream, ResultStream: testFirehosePluginConfig().Firehose.ResultStream, + AuditStream: testFirehosePluginConfig().Firehose.AuditStream, } kinesisConfig := fleet.KinesisConfig{ Region: testKinesisPluginConfig().Kinesis.Region, StatusStream: testKinesisPluginConfig().Kinesis.StatusStream, ResultStream: testKinesisPluginConfig().Kinesis.ResultStream, + AuditStream: testKinesisPluginConfig().Kinesis.AuditStream, } lambdaConfig := fleet.LambdaConfig{ Region: testLambdaPluginConfig().Lambda.Region, StatusFunction: testLambdaPluginConfig().Lambda.StatusFunction, ResultFunction: testLambdaPluginConfig().Lambda.ResultFunction, + AuditFunction: testLambdaPluginConfig().Lambda.AuditFunction, } pubsubConfig := fleet.PubSubConfig{ @@ -183,6 +186,7 @@ func TestService_LoggingConfig(t *testing.T) { Project: testPubSubPluginConfig().PubSub.Project, StatusTopic: testPubSubPluginConfig().PubSub.StatusTopic, ResultTopic: testPubSubPluginConfig().PubSub.ResultTopic, + AuditTopic: testPubSubPluginConfig().PubSub.AuditTopic, AddAttributes: false, }, } @@ -215,6 +219,10 @@ func TestService_LoggingConfig(t *testing.T) { Plugin: "filesystem", Config: fileSystemConfig, }, + Audit: fleet.LoggingPlugin{ + Plugin: "filesystem", + Config: fileSystemConfig, + }, }, }, { @@ -232,6 +240,10 @@ func TestService_LoggingConfig(t *testing.T) { Plugin: "firehose", Config: firehoseConfig, }, + Audit: fleet.LoggingPlugin{ + Plugin: "firehose", + Config: firehoseConfig, + }, }, }, { @@ -249,6 +261,10 @@ func TestService_LoggingConfig(t *testing.T) { Plugin: "kinesis", Config: kinesisConfig, }, + Audit: fleet.LoggingPlugin{ + Plugin: "kinesis", + Config: kinesisConfig, + }, }, }, { @@ -266,6 +282,10 @@ func TestService_LoggingConfig(t *testing.T) { Plugin: "lambda", Config: lambdaConfig, }, + Audit: fleet.LoggingPlugin{ + Plugin: "lambda", + Config: lambdaConfig, + }, }, }, { @@ -283,6 +303,10 @@ func TestService_LoggingConfig(t *testing.T) { Plugin: "pubsub", Config: pubsubConfig, }, + Audit: fleet.LoggingPlugin{ + Plugin: "pubsub", + Config: pubsubConfig, + }, }, }, { @@ -300,6 +324,10 @@ func TestService_LoggingConfig(t *testing.T) { Plugin: "stdout", Config: nil, }, + Audit: fleet.LoggingPlugin{ + Plugin: "stdout", + Config: nil, + }, }, }, { @@ -310,7 +338,6 @@ func TestService_LoggingConfig(t *testing.T) { want: nil, }, } - t.Parallel() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ds := new(mock.Store) diff --git a/server/service/team_policies.go b/server/service/team_policies.go index fd8be45c01..e0ccad980f 100644 --- a/server/service/team_policies.go +++ b/server/service/team_policies.go @@ -76,6 +76,7 @@ func (svc Service) NewTeamPolicy(ctx context.Context, teamID uint, p fleet.Polic if err != nil { return nil, ctxerr.Wrap(ctx, err, "creating policy") } + // Note: Issue #4191 proposes that we move to SQL transactions for actions so that we can // rollback an action in the event of an error writing the associated activity if err := svc.ds.NewActivity( @@ -86,7 +87,7 @@ func (svc Service) NewTeamPolicy(ctx context.Context, teamID uint, p fleet.Polic Name: policy.Name, }, ); err != nil { - return nil, err + return nil, ctxerr.Wrap(ctx, err, "create activity for team policy creation") } return policy, nil } @@ -251,7 +252,7 @@ func (svc Service) DeleteTeamPolicies(ctx context.Context, teamID uint, ids []ui Name: policiesByID[id].Name, }, ); err != nil { - return nil, ctxerr.Wrap(ctx, err, "adding new activity for deleted policy") + return nil, ctxerr.Wrap(ctx, err, "create activity for policy deletion") } } @@ -336,6 +337,7 @@ func (svc *Service) modifyPolicy(ctx context.Context, teamID *uint, id uint, p f if err != nil { return nil, ctxerr.Wrap(ctx, err, "saving policy") } + // Note: Issue #4191 proposes that we move to SQL transactions for actions so that we can // rollback an action in the event of an error writing the associated activity if err := svc.ds.NewActivity( @@ -346,7 +348,7 @@ func (svc *Service) modifyPolicy(ctx context.Context, teamID *uint, id uint, p f Name: policy.Name, }, ); err != nil { - return nil, err + return nil, ctxerr.Wrap(ctx, err, "create activity for policy modification") } return policy, nil diff --git a/server/service/testing_utils.go b/server/service/testing_utils.go index 48135bafd5..e4cd3c4874 100644 --- a/server/service/testing_utils.go +++ b/server/service/testing_utils.go @@ -45,10 +45,9 @@ func newTestServiceWithConfig(t *testing.T, ds fleet.Datastore, fleetConfig conf fleetConfig.Filesystem.EnableLogRotation, fleetConfig.Filesystem.EnableLogCompression, ) - require.NoError(t, err) - osqlogger := &logging.OsqueryLogger{Status: writer, Result: writer} + osqlogger := &OsqueryLogger{Status: writer, Result: writer} logger := kitlog.NewNopLogger() var ssoStore sso.SessionStore @@ -285,6 +284,7 @@ func testKinesisPluginConfig() config.FleetConfig { c := config.TestConfig() c.Osquery.ResultLogPlugin = "kinesis" c.Osquery.StatusLogPlugin = "kinesis" + c.Activity.AuditLogPlugin = "kinesis" c.Kinesis = config.KinesisConfig{ Region: "us-east-1", AccessKeyID: "foo", @@ -292,6 +292,7 @@ func testKinesisPluginConfig() config.FleetConfig { StsAssumeRoleArn: "baz", StatusStream: "test-status-stream", ResultStream: "test-result-stream", + AuditStream: "test-audit-stream", } return c } @@ -300,6 +301,7 @@ func testFirehosePluginConfig() config.FleetConfig { c := config.TestConfig() c.Osquery.ResultLogPlugin = "firehose" c.Osquery.StatusLogPlugin = "firehose" + c.Activity.AuditLogPlugin = "firehose" c.Firehose = config.FirehoseConfig{ Region: "us-east-1", AccessKeyID: "foo", @@ -307,6 +309,7 @@ func testFirehosePluginConfig() config.FleetConfig { StsAssumeRoleArn: "baz", StatusStream: "test-status-firehose", ResultStream: "test-result-firehose", + AuditStream: "test-audit-firehose", } return c } @@ -315,6 +318,7 @@ func testLambdaPluginConfig() config.FleetConfig { c := config.TestConfig() c.Osquery.ResultLogPlugin = "lambda" c.Osquery.StatusLogPlugin = "lambda" + c.Activity.AuditLogPlugin = "lambda" c.Lambda = config.LambdaConfig{ Region: "us-east-1", AccessKeyID: "foo", @@ -322,6 +326,7 @@ func testLambdaPluginConfig() config.FleetConfig { StsAssumeRoleArn: "baz", ResultFunction: "result-func", StatusFunction: "status-func", + AuditFunction: "audit-func", } return c } @@ -330,10 +335,12 @@ func testPubSubPluginConfig() config.FleetConfig { c := config.TestConfig() c.Osquery.ResultLogPlugin = "pubsub" c.Osquery.StatusLogPlugin = "pubsub" + c.Activity.AuditLogPlugin = "pubsub" c.PubSub = config.PubSubConfig{ Project: "test", StatusTopic: "status-topic", ResultTopic: "result-topic", + AuditTopic: "audit-topic", AddAttributes: false, } return c @@ -343,12 +350,17 @@ func testStdoutPluginConfig() config.FleetConfig { c := config.TestConfig() c.Osquery.ResultLogPlugin = "stdout" c.Osquery.StatusLogPlugin = "stdout" + c.Activity.AuditLogPlugin = "stdout" return c } func testUnrecognizedPluginConfig() config.FleetConfig { c := config.TestConfig() - c.Osquery = config.OsqueryConfig{ResultLogPlugin: "bar", StatusLogPlugin: "bar"} + c.Osquery = config.OsqueryConfig{ + ResultLogPlugin: "bar", + StatusLogPlugin: "bar", + } + c.Activity.AuditLogPlugin = "bar" return c }