mirror of
https://github.com/fleetdm/fleet
synced 2026-05-24 09:28:54 +00:00
Generate audit logs for activities (#9001)
* Generate audit logs for activities * Fix config tests * Fix TestGetConfig/IncludeServerConfig * Fix use of AddAttributes in results only * Stream activities asynchronously * Fix index and add logging * Revert change * Documentation fixes
This commit is contained in:
parent
dd29a4e718
commit
e1bbcfcfda
38 changed files with 1175 additions and 388 deletions
1
changes/issue-8416-stream-activities
Normal file
1
changes/issue-8416-stream-activities
Normal file
|
|
@ -0,0 +1 @@
|
|||
* Generate audit log for activities (supported log plugins are: `filesystem`, `firehose`, `kinesis`, `lambda`, `pubsub`, `kafkarest`, and `stdout`).
|
||||
|
|
@ -2,6 +2,7 @@ package main
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
|
|
@ -19,6 +20,7 @@ import (
|
|||
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||
apple_mdm "github.com/fleetdm/fleet/v4/server/mdm/apple"
|
||||
"github.com/fleetdm/fleet/v4/server/policies"
|
||||
"github.com/fleetdm/fleet/v4/server/ptr"
|
||||
"github.com/fleetdm/fleet/v4/server/service/externalsvc"
|
||||
"github.com/fleetdm/fleet/v4/server/service/schedule"
|
||||
"github.com/fleetdm/fleet/v4/server/vulnerabilities/msrc"
|
||||
|
|
@ -30,6 +32,7 @@ import (
|
|||
"github.com/getsentry/sentry-go"
|
||||
kitlog "github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/kit/log/level"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/micromdm/nanodep/godep"
|
||||
nanodep_log "github.com/micromdm/nanodep/log"
|
||||
depsync "github.com/micromdm/nanodep/sync"
|
||||
|
|
@ -899,3 +902,99 @@ func cleanupCronStatsOnShutdown(ctx context.Context, ds fleet.Datastore, logger
|
|||
logger.Log("err", "cancel pending cron stats for instance", "details", err)
|
||||
}
|
||||
}
|
||||
|
||||
func newActivitiesStreamingSchedule(
|
||||
ctx context.Context,
|
||||
instanceID string,
|
||||
ds fleet.Datastore,
|
||||
logger kitlog.Logger,
|
||||
auditLogger fleet.JSONLogger,
|
||||
) (*schedule.Schedule, error) {
|
||||
const (
|
||||
name = string(fleet.CronActivitiesStreaming)
|
||||
interval = 5 * time.Minute
|
||||
)
|
||||
logger = kitlog.With(logger, "cron", name)
|
||||
s := schedule.New(
|
||||
ctx, name, instanceID, interval, ds, ds,
|
||||
schedule.WithLogger(logger),
|
||||
schedule.WithJob(
|
||||
"cron_activities_streaming",
|
||||
func(ctx context.Context) error {
|
||||
return cronActivitiesStreaming(ctx, ds, logger, auditLogger)
|
||||
},
|
||||
),
|
||||
)
|
||||
return s, nil
|
||||
}
|
||||
|
||||
var ActivitiesToStreamBatchCount uint = 500
|
||||
|
||||
func cronActivitiesStreaming(
|
||||
ctx context.Context,
|
||||
ds fleet.Datastore,
|
||||
logger kitlog.Logger,
|
||||
auditLogger fleet.JSONLogger,
|
||||
) error {
|
||||
page := uint(0)
|
||||
for {
|
||||
// (1) Get batch of activities that haven't been streamed.
|
||||
activitiesToStream, err := ds.ListActivities(ctx, fleet.ListActivitiesOptions{
|
||||
ListOptions: fleet.ListOptions{
|
||||
OrderKey: "id",
|
||||
OrderDirection: fleet.OrderAscending,
|
||||
PerPage: ActivitiesToStreamBatchCount,
|
||||
Page: page,
|
||||
},
|
||||
Streamed: ptr.Bool(false),
|
||||
})
|
||||
if err != nil {
|
||||
return ctxerr.Wrap(ctx, err, "list activities")
|
||||
}
|
||||
if len(activitiesToStream) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// (2) Stream the activities.
|
||||
var (
|
||||
streamedIDs []uint
|
||||
multiErr error
|
||||
)
|
||||
// We stream one activity at a time (instead of writing them all with
|
||||
// one auditLogger.Write call) to know which ones succeeded/failed,
|
||||
// and also because this method happens asynchronously,
|
||||
// so we don't need real-time performance.
|
||||
for _, activity := range activitiesToStream {
|
||||
b, err := json.Marshal(activity)
|
||||
if err != nil {
|
||||
return ctxerr.Wrap(ctx, err, "marshal activity")
|
||||
}
|
||||
if err := auditLogger.Write(ctx, []json.RawMessage{json.RawMessage(b)}); err != nil {
|
||||
if len(streamedIDs) == 0 {
|
||||
return ctxerr.Wrapf(ctx, err, "stream first activity: %d", activity.ID)
|
||||
}
|
||||
multiErr = multierror.Append(multiErr, ctxerr.Wrapf(ctx, err, "stream activity: %d", activity.ID))
|
||||
// We stop streaming upon the first error (will retry on next cron iteration)
|
||||
break
|
||||
}
|
||||
streamedIDs = append(streamedIDs, activity.ID)
|
||||
}
|
||||
|
||||
logger.Log("streamed-events", len(streamedIDs))
|
||||
|
||||
// (3) Mark the streamed activities as streamed.
|
||||
if err := ds.MarkActivitiesAsStreamed(ctx, streamedIDs); err != nil {
|
||||
multiErr = multierror.Append(multiErr, ctxerr.Wrap(ctx, err, "mark activities as streamed"))
|
||||
}
|
||||
|
||||
// If there was an error while streaming or updating activities, return.
|
||||
if multiErr != nil {
|
||||
return multiErr
|
||||
}
|
||||
|
||||
if len(activitiesToStream) < int(ActivitiesToStreamBatchCount) {
|
||||
return nil
|
||||
}
|
||||
page += 1
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -353,9 +353,88 @@ the way that the Fleet server works.
|
|||
liveQueryStore := live_query.NewRedisLiveQuery(redisPool)
|
||||
ssoSessionStore := sso.NewSessionStore(redisPool)
|
||||
|
||||
osqueryLogger, err := logging.New(config, logger)
|
||||
// Set common configuration for all logging.
|
||||
loggingConfig := logging.Config{
|
||||
Filesystem: logging.FilesystemConfig{
|
||||
EnableLogRotation: config.Filesystem.EnableLogRotation,
|
||||
EnableLogCompression: config.Filesystem.EnableLogCompression,
|
||||
},
|
||||
Firehose: logging.FirehoseConfig{
|
||||
Region: config.Firehose.Region,
|
||||
EndpointURL: config.Firehose.EndpointURL,
|
||||
AccessKeyID: config.Firehose.AccessKeyID,
|
||||
SecretAccessKey: config.Firehose.SecretAccessKey,
|
||||
StsAssumeRoleArn: config.Firehose.StsAssumeRoleArn,
|
||||
},
|
||||
Kinesis: logging.KinesisConfig{
|
||||
Region: config.Kinesis.Region,
|
||||
EndpointURL: config.Kinesis.EndpointURL,
|
||||
AccessKeyID: config.Kinesis.AccessKeyID,
|
||||
SecretAccessKey: config.Kinesis.SecretAccessKey,
|
||||
StsAssumeRoleArn: config.Kinesis.StsAssumeRoleArn,
|
||||
},
|
||||
Lambda: logging.LambdaConfig{
|
||||
Region: config.Lambda.Region,
|
||||
AccessKeyID: config.Lambda.AccessKeyID,
|
||||
SecretAccessKey: config.Lambda.SecretAccessKey,
|
||||
StsAssumeRoleArn: config.Lambda.StsAssumeRoleArn,
|
||||
},
|
||||
PubSub: logging.PubSubConfig{
|
||||
Project: config.PubSub.Project,
|
||||
},
|
||||
KafkaREST: logging.KafkaRESTConfig{
|
||||
ProxyHost: config.KafkaREST.ProxyHost,
|
||||
ContentTypeValue: config.KafkaREST.ContentTypeValue,
|
||||
Timeout: config.KafkaREST.Timeout,
|
||||
},
|
||||
}
|
||||
|
||||
// Set specific configuration to osqueryd status logs.
|
||||
loggingConfig.Plugin = config.Osquery.StatusLogPlugin
|
||||
loggingConfig.Filesystem.LogFile = config.Filesystem.StatusLogFile
|
||||
loggingConfig.Firehose.StreamName = config.Firehose.StatusStream
|
||||
loggingConfig.Kinesis.StreamName = config.Kinesis.StatusStream
|
||||
loggingConfig.Lambda.Function = config.Lambda.StatusFunction
|
||||
loggingConfig.PubSub.Topic = config.PubSub.StatusTopic
|
||||
loggingConfig.PubSub.AddAttributes = false // only used by result logs
|
||||
loggingConfig.KafkaREST.Topic = config.KafkaREST.StatusTopic
|
||||
|
||||
osquerydStatusLogger, err := logging.NewJSONLogger("status", loggingConfig, logger)
|
||||
if err != nil {
|
||||
initFatal(err, "initializing osquery logging")
|
||||
initFatal(err, "initializing osqueryd status logging")
|
||||
}
|
||||
|
||||
// Set specific configuration to osqueryd result logs.
|
||||
loggingConfig.Plugin = config.Osquery.ResultLogPlugin
|
||||
loggingConfig.Filesystem.LogFile = config.Filesystem.ResultLogFile
|
||||
loggingConfig.Firehose.StreamName = config.Firehose.ResultStream
|
||||
loggingConfig.Kinesis.StreamName = config.Kinesis.ResultStream
|
||||
loggingConfig.Lambda.Function = config.Lambda.ResultFunction
|
||||
loggingConfig.PubSub.Topic = config.PubSub.ResultTopic
|
||||
loggingConfig.PubSub.AddAttributes = config.PubSub.AddAttributes
|
||||
loggingConfig.KafkaREST.Topic = config.KafkaREST.ResultTopic
|
||||
|
||||
osquerydResultLogger, err := logging.NewJSONLogger("result", loggingConfig, logger)
|
||||
if err != nil {
|
||||
initFatal(err, "initializing osqueryd result logging")
|
||||
}
|
||||
|
||||
var auditLogger fleet.JSONLogger
|
||||
if license.IsPremium() && config.Activity.EnableAuditLog {
|
||||
// Set specific configuration to audit logs.
|
||||
loggingConfig.Plugin = config.Activity.AuditLogPlugin
|
||||
loggingConfig.Filesystem.LogFile = config.Filesystem.AuditLogFile
|
||||
loggingConfig.Firehose.StreamName = config.Firehose.AuditStream
|
||||
loggingConfig.Kinesis.StreamName = config.Kinesis.AuditStream
|
||||
loggingConfig.Lambda.Function = config.Lambda.AuditFunction
|
||||
loggingConfig.PubSub.Topic = config.PubSub.AuditTopic
|
||||
loggingConfig.PubSub.AddAttributes = false // only used by result logs
|
||||
loggingConfig.KafkaREST.Topic = config.KafkaREST.AuditTopic
|
||||
|
||||
auditLogger, err = logging.NewJSONLogger("audit", loggingConfig, logger)
|
||||
if err != nil {
|
||||
initFatal(err, "initializing audit logging")
|
||||
}
|
||||
}
|
||||
|
||||
failingPolicySet := redis_policy_set.NewFailing(redisPool)
|
||||
|
|
@ -492,7 +571,10 @@ the way that the Fleet server works.
|
|||
task,
|
||||
resultStore,
|
||||
logger,
|
||||
osqueryLogger,
|
||||
&service.OsqueryLogger{
|
||||
Status: osquerydStatusLogger,
|
||||
Result: osquerydResultLogger,
|
||||
},
|
||||
config,
|
||||
mailService,
|
||||
clock.C,
|
||||
|
|
@ -563,6 +645,14 @@ the way that the Fleet server works.
|
|||
}
|
||||
}
|
||||
|
||||
if license.IsPremium() && config.Activity.EnableAuditLog {
|
||||
if err := cronSchedules.StartCronSchedule(func() (fleet.CronSchedule, error) {
|
||||
return newActivitiesStreamingSchedule(ctx, instanceID, ds, logger, auditLogger)
|
||||
}); err != nil {
|
||||
initFatal(err, "failed to register activities streaming schedule")
|
||||
}
|
||||
}
|
||||
|
||||
level.Info(logger).Log("msg", fmt.Sprintf("started cron schedules: %s", strings.Join(cronSchedules.ScheduleNames(), ", ")))
|
||||
|
||||
// StartCollectors starts a goroutine per collector, using ctx to cancel.
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import (
|
|||
|
||||
kitlog "github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/kit/log/level"
|
||||
"github.com/go-kit/log"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
|
@ -825,3 +826,149 @@ func TestDebugMux(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCronActivitiesStreaming(t *testing.T) {
|
||||
ds := new(mock.Store)
|
||||
|
||||
newActivity := func(
|
||||
id uint,
|
||||
actorName string,
|
||||
actorID uint,
|
||||
actorGravatar, actorEmail, actType string,
|
||||
details string,
|
||||
) *fleet.Activity {
|
||||
jsonRawMessage := json.RawMessage(details)
|
||||
return &fleet.Activity{
|
||||
ID: id,
|
||||
ActorFullName: actorName,
|
||||
ActorID: &actorID,
|
||||
ActorGravatar: &actorGravatar,
|
||||
ActorEmail: &actorEmail,
|
||||
Type: actType,
|
||||
Details: &jsonRawMessage,
|
||||
}
|
||||
}
|
||||
|
||||
a1 := newActivity(1, "foo1", 7, "foo1_gravatar", "foo1_email", "foobar1", `{"foo1":"bar1"}`)
|
||||
a2 := newActivity(2, "foo2", 8, "foo2_gravatar", "foo2_email", "foobar2", `{"foo2":"bar2"}`)
|
||||
a3 := newActivity(3, "foo3", 9, "foo3_gravatar", "foo3_email", "foobar3", `{"foo3":"bar3"}`)
|
||||
|
||||
t.Run("basic", func(t *testing.T) {
|
||||
as := []*fleet.Activity{a1, a2, a3}
|
||||
|
||||
ds.ListActivitiesFunc = func(ctx context.Context, opt fleet.ListActivitiesOptions) ([]*fleet.Activity, error) {
|
||||
return as, nil
|
||||
}
|
||||
|
||||
ds.MarkActivitiesAsStreamedFunc = func(ctx context.Context, activityIDs []uint) error {
|
||||
require.Equal(t, []uint{1, 2, 3}, activityIDs)
|
||||
return nil
|
||||
}
|
||||
|
||||
var auditLogger jsonLogger
|
||||
err := cronActivitiesStreaming(context.Background(), ds, log.NewNopLogger(), &auditLogger)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, auditLogger.logs, 3)
|
||||
for i, m := range auditLogger.logs {
|
||||
var a *fleet.Activity
|
||||
err := json.Unmarshal([]byte(m), &a)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, as[i], a)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("fail_to_stream_an_activity", func(t *testing.T) {
|
||||
as := []*fleet.Activity{a1, a2, a3}
|
||||
|
||||
ds.ListActivitiesFunc = func(ctx context.Context, opt fleet.ListActivitiesOptions) ([]*fleet.Activity, error) {
|
||||
return as, nil
|
||||
}
|
||||
|
||||
ds.MarkActivitiesAsStreamedFunc = func(ctx context.Context, activityIDs []uint) error {
|
||||
require.Equal(t, []uint{1}, activityIDs)
|
||||
return nil
|
||||
}
|
||||
|
||||
auditLogger := jsonLogger{failAfter: 1}
|
||||
err := cronActivitiesStreaming(context.Background(), ds, log.NewNopLogger(), &auditLogger)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, err, errStreamFailed)
|
||||
require.Len(t, auditLogger.logs, 1)
|
||||
var a *fleet.Activity
|
||||
err = json.Unmarshal([]byte(auditLogger.logs[0]), &a)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a1, a)
|
||||
})
|
||||
|
||||
t.Run("bigger_than_batch", func(t *testing.T) {
|
||||
// Make slice that will require three iterations (3 pages,
|
||||
// two pages of ActivitiesToStreamBatchCount and one extra page of one item.
|
||||
as := make([]*fleet.Activity, ActivitiesToStreamBatchCount*2+1)
|
||||
for i := range as {
|
||||
as[i] = newActivity(uint(i), "foo", uint(i), "foog", "fooe", "bar", `{"bar": "foo"}`)
|
||||
}
|
||||
|
||||
ds.ListActivitiesFunc = func(ctx context.Context, opt fleet.ListActivitiesOptions) ([]*fleet.Activity, error) {
|
||||
require.Equal(t, opt.PerPage, ActivitiesToStreamBatchCount)
|
||||
switch opt.Page {
|
||||
case 0:
|
||||
return as[:ActivitiesToStreamBatchCount], nil
|
||||
case 1:
|
||||
return as[ActivitiesToStreamBatchCount : ActivitiesToStreamBatchCount*2], nil
|
||||
case 2:
|
||||
return as[ActivitiesToStreamBatchCount*2:], nil
|
||||
default:
|
||||
t.Fatalf("invalid page requested: %d", opt.Page)
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
call := 0
|
||||
firstBatch := make([]uint, ActivitiesToStreamBatchCount)
|
||||
secondBatch := make([]uint, ActivitiesToStreamBatchCount)
|
||||
for i := range as[:ActivitiesToStreamBatchCount] {
|
||||
firstBatch[i] = as[i].ID
|
||||
}
|
||||
for i := range as[ActivitiesToStreamBatchCount : ActivitiesToStreamBatchCount*2] {
|
||||
secondBatch[i] = as[int(ActivitiesToStreamBatchCount)+i].ID
|
||||
}
|
||||
thirdBatch := []uint{as[len(as)-1].ID}
|
||||
ds.MarkActivitiesAsStreamedFunc = func(ctx context.Context, activityIDs []uint) error {
|
||||
switch call {
|
||||
case 0:
|
||||
require.Equal(t, firstBatch, activityIDs)
|
||||
case 1:
|
||||
require.Equal(t, secondBatch, activityIDs)
|
||||
case 2:
|
||||
require.Equal(t, thirdBatch, activityIDs)
|
||||
default:
|
||||
t.Fatalf("invalid number of calls: %d", call)
|
||||
}
|
||||
call += 1
|
||||
return nil
|
||||
}
|
||||
|
||||
var auditLogger jsonLogger
|
||||
err := cronActivitiesStreaming(context.Background(), ds, log.NewNopLogger(), &auditLogger)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, auditLogger.logs, int(ActivitiesToStreamBatchCount)*2+1)
|
||||
require.Equal(t, 3, call)
|
||||
})
|
||||
}
|
||||
|
||||
var errStreamFailed = errors.New("streaming failed")
|
||||
|
||||
type jsonLogger struct {
|
||||
logs []string
|
||||
failAfter int
|
||||
}
|
||||
|
||||
func (j *jsonLogger) Write(ctx context.Context, logs []json.RawMessage) error {
|
||||
for _, log := range logs {
|
||||
if j.failAfter > 0 && len(j.logs) == j.failAfter {
|
||||
return errStreamFailed
|
||||
}
|
||||
j.logs = append(j.logs, string(log))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -630,6 +630,7 @@ spec:
|
|||
enable_log_rotation: false
|
||||
result_log_file: /dev/null
|
||||
status_log_file: /dev/null
|
||||
audit_log_file: /dev/null
|
||||
plugin: filesystem
|
||||
status:
|
||||
config:
|
||||
|
|
@ -637,6 +638,15 @@ spec:
|
|||
enable_log_rotation: false
|
||||
result_log_file: /dev/null
|
||||
status_log_file: /dev/null
|
||||
audit_log_file: /dev/null
|
||||
plugin: filesystem
|
||||
audit:
|
||||
config:
|
||||
enable_log_compression: false
|
||||
enable_log_rotation: false
|
||||
result_log_file: /dev/null
|
||||
status_log_file: /dev/null
|
||||
audit_log_file: /dev/null
|
||||
plugin: filesystem
|
||||
org_info:
|
||||
org_logo_url: ""
|
||||
|
|
@ -809,7 +819,8 @@ spec:
|
|||
"enable_log_compression": false,
|
||||
"enable_log_rotation": false,
|
||||
"result_log_file": "/dev/null",
|
||||
"status_log_file": "/dev/null"
|
||||
"status_log_file": "/dev/null",
|
||||
"audit_log_file": "/dev/null"
|
||||
}
|
||||
},
|
||||
"status": {
|
||||
|
|
@ -818,7 +829,18 @@ spec:
|
|||
"enable_log_compression": false,
|
||||
"enable_log_rotation": false,
|
||||
"result_log_file": "/dev/null",
|
||||
"status_log_file": "/dev/null"
|
||||
"status_log_file": "/dev/null",
|
||||
"audit_log_file": "/dev/null"
|
||||
}
|
||||
},
|
||||
"audit": {
|
||||
"plugin": "filesystem",
|
||||
"config": {
|
||||
"enable_log_compression": false,
|
||||
"enable_log_rotation": false,
|
||||
"result_log_file": "/dev/null",
|
||||
"status_log_file": "/dev/null",
|
||||
"audit_log_file": "/dev/null"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,19 +1,33 @@
|
|||
# Testing and local development
|
||||
|
||||
- [License key](#license-key)
|
||||
- [Simulated hosts](#hosts)
|
||||
- [Test suite](#test-suite)
|
||||
- [End-to-end tests](#end-to-end-tests)
|
||||
- [Test hosts](#test-hosts)
|
||||
- [Email](#email)
|
||||
- [Database backup/restore](#database-backuprestore)
|
||||
- [Seeding Data](https://fleetdm.com/docs/contributing/seeding-data)
|
||||
- [MySQL shell](#mysql-shell)
|
||||
- [Redis REPL](#redis-repl)
|
||||
- [Testing SSO](#testing-sso)
|
||||
- [Testing Kinesis Logging](#testing-kinesis-logging)
|
||||
- [Testing pre-built installers](#testing-pre-built-installers)
|
||||
- [Telemetry](#telemetry)
|
||||
- [Testing and local development](#testing-and-local-development)
|
||||
- [License key](#license-key)
|
||||
- [Simulated hosts](#simulated-hosts)
|
||||
- [Test suite](#test-suite)
|
||||
- [Go unit tests](#go-unit-tests)
|
||||
- [Go linters](#go-linters)
|
||||
- [Javascript unit and integration tests](#javascript-unit-and-integration-tests)
|
||||
- [Javascript linters](#javascript-linters)
|
||||
- [MySQL tests](#mysql-tests)
|
||||
- [Email tests](#email-tests)
|
||||
- [Network tests](#network-tests)
|
||||
- [Viewing test coverage](#viewing-test-coverage)
|
||||
- [End-to-end tests](#end-to-end-tests)
|
||||
- [Preparation](#preparation)
|
||||
- [Run tests](#run-tests)
|
||||
- [Interactive](#interactive)
|
||||
- [Command line](#command-line)
|
||||
- [Test hosts](#test-hosts)
|
||||
- [Email](#email)
|
||||
- [Manually testing email with MailHog](#manually-testing-email-with-mailhog)
|
||||
- [Development database management](#development-database-management)
|
||||
- [MySQL shell](#mysql-shell)
|
||||
- [Redis REPL](#redis-repl)
|
||||
- [Testing SSO](#testing-sso)
|
||||
- [Configuration](#configuration)
|
||||
- [Testing Kinesis Logging](#testing-kinesis-logging)
|
||||
- [Testing pre-built installers](#testing-pre-built-installers)
|
||||
- [Telemetry](#telemetry)
|
||||
|
||||
## License key
|
||||
|
||||
|
|
@ -404,6 +418,12 @@ awslocal kinesis get-records --shard-iterator AAAAAAAAAAERtiUrWGI0sq99TtpKnmDu6h
|
|||
[...]
|
||||
```
|
||||
|
||||
The `Data` field is base64 encoded. You can use the following command to decode:
|
||||
```
|
||||
echo eyJob3N0SWRlbnRpZmllciI6Ijg3OGE2ZWRmLTcxMzEtNGUyOC05NWEyLWQzNDQ5MDVjYWNhYiIsImNhbGVuZGFyVGltZSI6IldlZCBNYXIgIDIgMjI6MDI6NTQgMjAyMiBVVEMiLCJ1bml4VGltZSI6IjE2NDYyNTg1NzQiLCJzZXZlcml0eSI6IjAiLCJmaWxlbmFtZSI6Imdsb2dfbG9nZ2VyLmNwcCIsImxpbmUiOiI0OSIsIm1lc3NhZ2UiOiJDb3VsZCBub3QgZ2V0IFJQTSBoZWFkZXIgZmxhZy4iLCJ2ZXJzaW9uIjoiNC45LjAiLCJkZWNvcmF0aW9ucyI6eyJob3N0X3V1aWQiOiJlYjM5NDZiMi0wMDAwLTAwMDAtYjg4OC0yNTkxYTFiNjY2ZTkiLCJob3N0bmFtZSI6ImUwMDg4ZDI4YTYzZiJ9fQo= | base64 -d
|
||||
{"hostIdentifier":"878a6edf-7131-4e28-95a2-d344905cacab","calendarTime":"Wed Mar 2 22:02:54 2022 UTC","unixTime":"1646258574","severity":"0","filename":"glog_logger.cpp","line":"49","message":"Could not get RPM header flag.","version":"4.9.0","decorations":{"host_uuid":"eb3946b2-0000-0000-b888-2591a1b666e9","hostname":"e0088d28a63f"}}
|
||||
```
|
||||
|
||||
## Testing pre-built installers
|
||||
|
||||
Pre-built installers are kept in a blob storage like AWS S3. As part of your your local development there's a [MinIO](https://min.io/) instance running on http://localhost:9000. To test the pre-built installers functionality locally:
|
||||
|
|
|
|||
|
|
@ -1179,6 +1179,36 @@ osquery:
|
|||
result_log_plugin: firehose
|
||||
```
|
||||
|
||||
#### activity_enable_audit_log
|
||||
|
||||
This enables/disables the log output for audit events.
|
||||
See the `activity_audit_log_plugin` option below that specifies the logging destination.
|
||||
|
||||
The audit events are logged in an asynchronous fashion. It can take up to 5 minutes for an event to be logged.
|
||||
|
||||
- Default value: `false`
|
||||
- Environment variable: `FLEET_ACTIVITY_ENABLE_AUDIT_LOG`
|
||||
- Config file format:
|
||||
```yaml
|
||||
activity:
|
||||
enable_audit_log: true
|
||||
```
|
||||
|
||||
#### activity_audit_log_plugin
|
||||
|
||||
This is the log output plugin that should be used for audit logs.
|
||||
This flag only has effect if `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
Options are `filesystem`, `firehose`, `kinesis`, `lambda`, `pubsub`, `kafkarest`, and `stdout`.
|
||||
|
||||
- Default value: `filesystem`
|
||||
- Environment variable: `FLEET_ACTIVITY_AUDIT_LOG_PLUGIN`
|
||||
- Config file format:
|
||||
```yaml
|
||||
activity:
|
||||
audit_log_plugin: firehose
|
||||
```
|
||||
|
||||
#### Logging (Fleet server logging)
|
||||
|
||||
##### logging_debug
|
||||
|
|
@ -1269,9 +1299,25 @@ The path which osquery result logs will be logged to.
|
|||
result_log_file: /var/log/osquery/result.log
|
||||
```
|
||||
|
||||
##### filesystem_audit_log_file
|
||||
|
||||
This flag only has effect if `activity_audit_log_plugin` is set to `filesystem` (the default value) and if `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
The path which audit logs will be logged to.
|
||||
|
||||
- Default value: `/tmp/audit`
|
||||
- Environment variable: `FLEET_FILESYSTEM_AUDIT_LOG_FILE`
|
||||
- Config file format:
|
||||
```yaml
|
||||
filesystem:
|
||||
audit_log_file: /var/log/fleet/audit.log
|
||||
```
|
||||
|
||||
##### filesystem_enable_log_rotation
|
||||
|
||||
This flag only has effect if `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `filesystem` (the default value).
|
||||
This flag only has effect if one of the following is true:
|
||||
- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `filesystem` (the default value).
|
||||
- `activity_audit_log_plugin` is set to `filesystem` and `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
This flag will cause the osquery result and status log files to be automatically
|
||||
rotated when files reach a size of 500 Mb or an age of 28 days.
|
||||
|
|
@ -1314,9 +1360,11 @@ filesystem:
|
|||
|
||||
##### firehose_region
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` is set to `firehose`.
|
||||
This flag only has effect if one of the following is true:
|
||||
- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `firehose`.
|
||||
- `activity_audit_log_plugin` is set to `firehose` and `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
AWS region to use for Firehose connection
|
||||
AWS region to use for Firehose connection.
|
||||
|
||||
- Default value: none
|
||||
- Environment variable: `FLEET_FIREHOSE_REGION`
|
||||
|
|
@ -1328,7 +1376,9 @@ AWS region to use for Firehose connection
|
|||
|
||||
##### firehose_access_key_id
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` or `osquery_result_log_plugin` are set to `firehose`.
|
||||
This flag only has effect if one of the following is true:
|
||||
- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `firehose`.
|
||||
- `activity_audit_log_plugin` is set to `firehose` and `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
If `firehose_access_key_id` and `firehose_secret_access_key` are omitted, Fleet will try to use [AWS STS](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html) credentials.
|
||||
|
||||
|
|
@ -1344,7 +1394,9 @@ AWS access key ID to use for Firehose authentication.
|
|||
|
||||
##### firehose_secret_access_key
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` or `osquery_result_log_plugin` are set to `firehose`.
|
||||
This flag only has effect if one of the following is true:
|
||||
- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `firehose`.
|
||||
- `activity_audit_log_plugin` is set to `firehose` and `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
AWS secret access key to use for Firehose authentication.
|
||||
|
||||
|
|
@ -1358,8 +1410,9 @@ AWS secret access key to use for Firehose authentication.
|
|||
|
||||
##### firehose_sts_assume_role_arn
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` or
|
||||
`osquery_result_log_plugin` are set to `firehose`.
|
||||
This flag only has effect if one of the following is true:
|
||||
- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `firehose`.
|
||||
- `activity_audit_log_plugin` is set to `firehose` and `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
AWS STS role ARN to use for Firehose authentication.
|
||||
|
||||
|
|
@ -1411,6 +1464,27 @@ the stream listed:
|
|||
- `firehose:DescribeDeliveryStream`
|
||||
- `firehose:PutRecordBatch`
|
||||
|
||||
##### firehose_audit_stream
|
||||
|
||||
This flag only has effect if `activity_audit_log_plugin` is set to `firehose`.
|
||||
|
||||
Name of the Firehose stream to audit logs.
|
||||
|
||||
- Default value: none
|
||||
- Environment variable: `FLEET_FIREHOSE_AUDIT_STREAM`
|
||||
- Config file format:
|
||||
```yaml
|
||||
firehose:
|
||||
audit_stream: fleet_audit
|
||||
```
|
||||
|
||||
The IAM role used to send to Firehose must allow the following permissions on
|
||||
the stream listed:
|
||||
|
||||
- `firehose:DescribeDeliveryStream`
|
||||
- `firehose:PutRecordBatch`
|
||||
|
||||
|
||||
##### Example YAML
|
||||
|
||||
```yaml
|
||||
|
|
@ -1430,7 +1504,9 @@ firehose:
|
|||
|
||||
##### kinesis_region
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` is set to `kinesis`.
|
||||
This flag only has effect if one of the following is true:
|
||||
- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `kinesis`.
|
||||
- `activity_audit_log_plugin` is set to `kinesis` and `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
AWS region to use for Kinesis connection
|
||||
|
||||
|
|
@ -1444,8 +1520,9 @@ AWS region to use for Kinesis connection
|
|||
|
||||
##### kinesis_access_key_id
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` or
|
||||
`osquery_result_log_plugin` are set to `kinesis`.
|
||||
This flag only has effect if one of the following is true:
|
||||
- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `kinesis`.
|
||||
- `activity_audit_log_plugin` is set to `kinesis` and `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
If `kinesis_access_key_id` and `kinesis_secret_access_key` are omitted, Fleet
|
||||
will try to use
|
||||
|
|
@ -1464,8 +1541,9 @@ AWS access key ID to use for Kinesis authentication.
|
|||
|
||||
##### kinesis_secret_access_key
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` or
|
||||
`osquery_result_log_plugin` are set to `kinesis`.
|
||||
This flag only has effect if one of the following is true:
|
||||
- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `kinesis`.
|
||||
- `activity_audit_log_plugin` is set to `kinesis` and `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
AWS secret access key to use for Kinesis authentication.
|
||||
|
||||
|
|
@ -1479,8 +1557,9 @@ AWS secret access key to use for Kinesis authentication.
|
|||
|
||||
##### kinesis_sts_assume_role_arn
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` or
|
||||
`osquery_result_log_plugin` are set to `kinesis`.
|
||||
This flag only has effect if one of the following is true:
|
||||
- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `kinesis`.
|
||||
- `activity_audit_log_plugin` is set to `kinesis` and `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
AWS STS role ARN to use for Kinesis authentication.
|
||||
|
||||
|
|
@ -1532,6 +1611,26 @@ the stream listed:
|
|||
- `kinesis:DescribeStream`
|
||||
- `kinesis:PutRecords`
|
||||
|
||||
##### kinesis_audit_stream
|
||||
|
||||
This flag only has effect if `activity_audit_log_plugin` is set to `kinesis`.
|
||||
|
||||
Name of the Kinesis stream to write audit logs.
|
||||
|
||||
- Default value: none
|
||||
- Environment variable: `FLEET_KINESIS_AUDIT_STREAM`
|
||||
- Config file format:
|
||||
```yaml
|
||||
kinesis:
|
||||
audit_stream: fleet_audit
|
||||
```
|
||||
|
||||
The IAM role used to send to Kinesis must allow the following permissions on
|
||||
the stream listed:
|
||||
|
||||
- `kinesis:DescribeStream`
|
||||
- `kinesis:PutRecords`
|
||||
|
||||
##### Example YAML
|
||||
|
||||
```yaml
|
||||
|
|
@ -1540,7 +1639,6 @@ osquery:
|
|||
osquery_result_log_plugin: kinesis
|
||||
kinesis:
|
||||
region: ca-central-1
|
||||
result_log_file: /var/log/osquery/result.log
|
||||
access_key_id: AKIAIOSFODNN7EXAMPLE
|
||||
secret_access_key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
|
||||
sts_assume_role_arn: arn:aws:iam::1234567890:role/firehose-role
|
||||
|
|
@ -1548,14 +1646,15 @@ kinesis:
|
|||
result_stream: osquery_result
|
||||
```
|
||||
|
||||
|
||||
#### Lambda
|
||||
|
||||
##### lambda_region
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` is set to `lambda`.
|
||||
This flag only has effect if one of the following is true:
|
||||
- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `lambda`.
|
||||
- `activity_audit_log_plugin` is set to `lambda` and `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
AWS region to use for Lambda connection
|
||||
AWS region to use for Lambda connection.
|
||||
|
||||
- Default value: none
|
||||
- Environment variable: `FLEET_LAMBDA_REGION`
|
||||
|
|
@ -1567,8 +1666,9 @@ AWS region to use for Lambda connection
|
|||
|
||||
##### lambda_access_key_id
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` or
|
||||
`osquery_result_log_plugin` are set to `lambda`.
|
||||
This flag only has effect if one of the following is true:
|
||||
- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `lambda`.
|
||||
- `activity_audit_log_plugin` is set to `lambda` and `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
If `lambda_access_key_id` and `lambda_secret_access_key` are omitted, Fleet
|
||||
will try to use
|
||||
|
|
@ -1587,8 +1687,9 @@ AWS access key ID to use for Lambda authentication.
|
|||
|
||||
##### lambda_secret_access_key
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` or
|
||||
`osquery_result_log_plugin` are set to `lambda`.
|
||||
This flag only has effect if one of the following is true:
|
||||
- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `lambda`.
|
||||
- `activity_audit_log_plugin` is set to `lambda` and `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
AWS secret access key to use for Lambda authentication.
|
||||
|
||||
|
|
@ -1602,8 +1703,9 @@ AWS secret access key to use for Lambda authentication.
|
|||
|
||||
##### lambda_sts_assume_role_arn
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` or
|
||||
`osquery_result_log_plugin` are set to `lambda`.
|
||||
This flag only has effect if one of the following is true:
|
||||
- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `lambda`.
|
||||
- `activity_audit_log_plugin` is set to `lambda` and `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
AWS STS role ARN to use for Lambda authentication.
|
||||
|
||||
|
|
@ -1653,6 +1755,25 @@ the function listed:
|
|||
|
||||
- `lambda:InvokeFunction`
|
||||
|
||||
##### lambda_audit_function
|
||||
|
||||
This flag only has effect if `activity_audit_log_plugin` is set to `lambda`.
|
||||
|
||||
Name of the Lambda function to write audit logs.
|
||||
|
||||
- Default value: none
|
||||
- Environment variable: `FLEET_LAMBDA_AUDIT_FUNCTION`
|
||||
- Config file format:
|
||||
```yaml
|
||||
lambda:
|
||||
audit_function: auditFunction
|
||||
```
|
||||
|
||||
The IAM role used to send to Lambda must allow the following permissions on
|
||||
the function listed:
|
||||
|
||||
- `lambda:InvokeFunction`
|
||||
|
||||
##### Example YAML
|
||||
|
||||
```yaml
|
||||
|
|
@ -1672,7 +1793,9 @@ lambda:
|
|||
|
||||
##### pubsub_project
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` is set to `pubsub`.
|
||||
This flag only has effect if one of the following is true:
|
||||
- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `pubsub`.
|
||||
- `activity_audit_log_plugin` is set to `pubsub` and `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
The identifier of the Google Cloud project containing the pubsub topics to
|
||||
publish logs to.
|
||||
|
|
@ -1690,7 +1813,7 @@ for authentication with the service.
|
|||
|
||||
##### pubsub_result_topic
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` is set to `pubsub`.
|
||||
This flag only has effect if `osquery_result_log_plugin` is set to `pubsub`.
|
||||
|
||||
The identifier of the pubsub topic that client results will be published to.
|
||||
|
||||
|
|
@ -1716,6 +1839,20 @@ The identifier of the pubsub topic that osquery status logs will be published to
|
|||
status_topic: osquery_status
|
||||
```
|
||||
|
||||
##### pubsub_audit_topic
|
||||
|
||||
This flag only has effect if `osquery_audit_log_plugin` is set to `pubsub`.
|
||||
|
||||
The identifier of the pubsub topic that client results will be published to.
|
||||
|
||||
- Default value: none
|
||||
- Environment variable: `FLEET_PUBSUB_AUDIT_TOPIC`
|
||||
- Config file format:
|
||||
```yaml
|
||||
pubsub:
|
||||
audit_topic: fleet_audit
|
||||
```
|
||||
|
||||
##### pubsub_add_attributes
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` is set to `pubsub`.
|
||||
|
|
@ -1756,7 +1893,9 @@ pubsub:
|
|||
|
||||
##### kafkarest_proxyhost
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` or `osquery_result_log_plugin` is set to `kafkarest`.
|
||||
This flag only has effect if one of the following is true:
|
||||
- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `kafkarest`.
|
||||
- `activity_audit_log_plugin` is set to `kafkarest` and `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
The URL of the host which to check for the topic existence and post messages to the specified topic.
|
||||
|
||||
|
|
@ -1793,12 +1932,28 @@ The identifier of the kafka topic that osquery result logs will be published to.
|
|||
- Config file format:
|
||||
```yaml
|
||||
kafkarest:
|
||||
status_topic: osquery_result
|
||||
result_topic: osquery_result
|
||||
```
|
||||
|
||||
##### kafkarest_audit_topic
|
||||
|
||||
This flag only has effect if `osquery_audit_log_plugin` is set to `kafkarest`.
|
||||
|
||||
The identifier of the kafka topic that audit logs will be published to.
|
||||
|
||||
- Default value: none
|
||||
- Environment variable: `FLEET_KAFKAREST_AUDIT_TOPIC`
|
||||
- Config file format:
|
||||
```yaml
|
||||
kafkarest:
|
||||
audit_topic: fleet_audit
|
||||
```
|
||||
|
||||
##### kafkarest_timeout
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` or `osquery_result_log_plugin` is set to `kafkarest`.
|
||||
This flag only has effect if one of the following is true:
|
||||
- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `kafkarest`.
|
||||
- `activity_audit_log_plugin` is set to `kafkarest` and `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
The timeout value for the http post attempt. Value is in units of seconds.
|
||||
|
||||
|
|
@ -1812,7 +1967,9 @@ The timeout value for the http post attempt. Value is in units of seconds.
|
|||
|
||||
##### kafkarest_content_type_value
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` is set to `kafkarest`.
|
||||
This flag only has effect if one of the following is true:
|
||||
- `osquery_result_log_plugin` or `osquery_status_log_plugin` are set to `kafkarest`.
|
||||
- `activity_audit_log_plugin` is set to `kafkarest` and `activity_enable_audit_log` is set to `true`.
|
||||
|
||||
The value of the Content-Type header to use in Kafka REST Proxy API calls. More information about available versions
|
||||
can be found [here](https://docs.confluent.io/platform/current/kafka-rest/api.html#content-types). _Note: only JSON format is supported_
|
||||
|
|
|
|||
|
|
@ -1,18 +1,29 @@
|
|||
# Log destinations
|
||||
|
||||
- [Amazon Kinesis Data Firehose](#amazon-kinesis-data-firehose)
|
||||
- [Snowflake](#snowflake)
|
||||
- [Splunk](#splunk)
|
||||
- [Amazon Kinesis Data Streams](#amazon-kinesis-data-streams)
|
||||
- [AWS Lambda](#aws-lambda)
|
||||
- [Google Cloud Pub/Sub](#google-cloud-pubsub)
|
||||
- [Apache Kafka](#apache-kafka)
|
||||
- [Stdout](#stdout)
|
||||
- [Filesystem](#filesystem)
|
||||
- [Log destinations](#log-destinations)
|
||||
- [Amazon Kinesis Data Firehose](#amazon-kinesis-data-firehose)
|
||||
- [Snowflake](#snowflake)
|
||||
- [Splunk](#splunk)
|
||||
- [Amazon Kinesis Data Streams](#amazon-kinesis-data-streams)
|
||||
- [AWS Lambda](#aws-lambda)
|
||||
- [Google Cloud Pub/Sub](#google-cloud-pubsub)
|
||||
- [Apache Kafka](#apache-kafka)
|
||||
- [Stdout](#stdout)
|
||||
- [Filesystem](#filesystem)
|
||||
- [Sending logs outside of Fleet](#sending-logs-outside-of-fleet)
|
||||
|
||||
This document provides a list of the supported log destinations in Fleet.
|
||||
|
||||
To configure each log destination, you must set the correct osquery logging configuration options in Fleet. Check out the reference documentation for [osquery logging configuration options](https://fleetdm.com/docs/deploying/configuration#osquery-status-log-plugin).
|
||||
Log destinations can be used in Fleet to log:
|
||||
- Osquery [status logs](https://osquery.readthedocs.io/en/stable/deployment/logging/#status-logs).
|
||||
- Osquery [schedule query result logs](https://osquery.readthedocs.io/en/stable/deployment/logging/#results-logs).
|
||||
- Fleet audit logs.
|
||||
|
||||
To configure each log destination, you must set the correct logging configuration options in Fleet.
|
||||
Check out the reference documentation for:
|
||||
- [Osquery status logging configuration options](https://fleetdm.com/docs/deploying/configuration#osquery-status-log-plugin).
|
||||
- [Osquery result logging configuration options](https://fleetdm.com/docs/deploying/configuration#osquery-result-log-plugin).
|
||||
- [Activity audit logging configuration options](https://fleetdm.com/docs/deploying/configuration#activity_audit_log_plugin).
|
||||
|
||||
## Amazon Kinesis Data Firehose
|
||||
|
||||
|
|
@ -54,8 +65,8 @@ Logs are written to [Amazon Kinesis Data Streams (Kinesis)](https://aws.amazon.c
|
|||
|
||||
Note that Kinesis logging has limits [discussed in the
|
||||
documentation](https://docs.aws.amazon.com/kinesis/latest/dev/limits.html).
|
||||
When Fleet encounters osquery logs that are too big for Kinesis, notifications appear
|
||||
in the Fleet server logs. Those osquery logs **will not** be sent to Kinesis.
|
||||
When Fleet encounters logs that are too big for Kinesis, notifications appear
|
||||
in the Fleet server logs. Those logs **will not** be sent to Kinesis.
|
||||
|
||||
## AWS Lambda
|
||||
|
||||
|
|
@ -102,7 +113,7 @@ Logs are written to stdout.
|
|||
- Plugin name: `stdout`
|
||||
- Flag namespace: [stdout](https://fleetdm.com/docs/deploying/configuration#stdout)
|
||||
|
||||
With the stdout plugin, osquery result and/or status logs are written to stdout
|
||||
With the stdout plugin, logs are written to stdout
|
||||
on the Fleet server. This is typically used for debugging or with a log
|
||||
forwarding setup that will capture and forward stdout logs into a logging
|
||||
pipeline.
|
||||
|
|
@ -119,7 +130,7 @@ The default log destination.
|
|||
- Plugin name: `filesystem`
|
||||
- Flag namespace: [filesystem](https://fleetdm.com/docs/deploying/configuration#filesystem)
|
||||
|
||||
With the filesystem plugin, osquery result and/or status logs are written to the local filesystem on the Fleet server. This is typically used with a log forwarding agent on the Fleet server that will push the logs into a logging pipeline.
|
||||
With the filesystem plugin, logs are written to the local filesystem on the Fleet server. This is typically used with a log forwarding agent on the Fleet server that will push the logs into a logging pipeline.
|
||||
|
||||
Note that if multiple load-balanced Fleet servers are used, the logs will be load-balanced across those servers (not duplicated).
|
||||
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/micromdm/nanodep/storage"
|
||||
)
|
||||
|
||||
// Service wraps a free Service and implements additional premium functionality on top of it.
|
||||
type Service struct {
|
||||
fleet.Service
|
||||
|
||||
|
|
@ -31,7 +32,6 @@ func NewService(
|
|||
c clock.Clock,
|
||||
depStorage storage.AllStorage,
|
||||
) (*Service, error) {
|
||||
|
||||
authorizer, err := authz.NewAuthorizer()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("new authorizer: %w", err)
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ func (svc *Service) NewTeam(ctx context.Context, p fleet.TeamPayload) (*fleet.Te
|
|||
Name: team.Name,
|
||||
},
|
||||
); err != nil {
|
||||
return nil, err
|
||||
return nil, ctxerr.Wrap(ctx, err, "create activity for team creation")
|
||||
}
|
||||
|
||||
return team, nil
|
||||
|
|
@ -332,14 +332,17 @@ func (svc *Service) DeleteTeam(ctx context.Context, teamID uint) error {
|
|||
|
||||
logging.WithExtras(ctx, "id", teamID)
|
||||
|
||||
return svc.ds.NewActivity(
|
||||
if err := svc.ds.NewActivity(
|
||||
ctx,
|
||||
authz.UserFromContext(ctx),
|
||||
fleet.ActivityTypeDeletedTeam{
|
||||
ID: teamID,
|
||||
Name: name,
|
||||
},
|
||||
)
|
||||
); err != nil {
|
||||
return ctxerr.Wrap(ctx, err, "create activity for team deletion")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *Service) GetTeam(ctx context.Context, teamID uint) (*fleet.Team, error) {
|
||||
|
|
@ -490,7 +493,7 @@ func (svc *Service) ApplyTeamSpecs(ctx context.Context, specs []*fleet.TeamSpec,
|
|||
Teams: details,
|
||||
},
|
||||
); err != nil {
|
||||
return ctxerr.Wrap(ctx, err, "create applied team spec activity")
|
||||
return ctxerr.Wrap(ctx, err, "create activity for team spec")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -52,14 +52,12 @@ func (svc *Service) GetSSOUser(ctx context.Context, auth fleet.Auth) (*fleet.Use
|
|||
if err != nil {
|
||||
return nil, ctxerr.Wrap(ctx, err, "creating new SSO user")
|
||||
}
|
||||
err = svc.ds.NewActivity(
|
||||
if err := svc.ds.NewActivity(
|
||||
ctx,
|
||||
user,
|
||||
fleet.ActivityTypeUserAddedBySSO{},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
); err != nil {
|
||||
return nil, ctxerr.Wrap(ctx, err, "create activity for SSO user creation")
|
||||
}
|
||||
|
||||
return user, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -130,16 +130,24 @@ type SessionConfig struct {
|
|||
|
||||
// OsqueryConfig defines configs related to osquery
|
||||
type OsqueryConfig struct {
|
||||
NodeKeySize int `yaml:"node_key_size"`
|
||||
HostIdentifier string `yaml:"host_identifier"`
|
||||
EnrollCooldown time.Duration `yaml:"enroll_cooldown"`
|
||||
StatusLogPlugin string `yaml:"status_log_plugin"`
|
||||
ResultLogPlugin string `yaml:"result_log_plugin"`
|
||||
LabelUpdateInterval time.Duration `yaml:"label_update_interval"`
|
||||
PolicyUpdateInterval time.Duration `yaml:"policy_update_interval"`
|
||||
DetailUpdateInterval time.Duration `yaml:"detail_update_interval"`
|
||||
StatusLogFile string `yaml:"status_log_file"`
|
||||
ResultLogFile string `yaml:"result_log_file"`
|
||||
NodeKeySize int `yaml:"node_key_size"`
|
||||
HostIdentifier string `yaml:"host_identifier"`
|
||||
EnrollCooldown time.Duration `yaml:"enroll_cooldown"`
|
||||
StatusLogPlugin string `yaml:"status_log_plugin"`
|
||||
ResultLogPlugin string `yaml:"result_log_plugin"`
|
||||
LabelUpdateInterval time.Duration `yaml:"label_update_interval"`
|
||||
PolicyUpdateInterval time.Duration `yaml:"policy_update_interval"`
|
||||
DetailUpdateInterval time.Duration `yaml:"detail_update_interval"`
|
||||
|
||||
// StatusLogFile is deprecated. It was replaced by FilesystemConfig.StatusLogFile.
|
||||
//
|
||||
// TODO(lucas): We should at least add a warning if this field is populated.
|
||||
StatusLogFile string `yaml:"status_log_file"`
|
||||
// ResultLogFile is deprecated. It was replaced by FilesystemConfig.ResultLogFile.
|
||||
//
|
||||
// TODO(lucas): We should at least add a warning if this field is populated.
|
||||
ResultLogFile string `yaml:"result_log_file"`
|
||||
|
||||
EnableLogRotation bool `yaml:"enable_log_rotation"`
|
||||
MaxJitterPercent int `yaml:"max_jitter_percent"`
|
||||
EnableAsyncHostProcessing string `yaml:"enable_async_host_processing"` // true/false or per-task
|
||||
|
|
@ -217,6 +225,14 @@ type LoggingConfig struct {
|
|||
TracingType string `yaml:"tracing_type"`
|
||||
}
|
||||
|
||||
// ActivityConfig defines configs related to activities.
|
||||
type ActivityConfig struct {
|
||||
// EnableAuditLog enables logging for audit activities.
|
||||
EnableAuditLog bool `yaml:"enable_audit_log"`
|
||||
// AuditLogPlugin sets the plugin to use to log activities.
|
||||
AuditLogPlugin string `yaml:"audit_log_plugin"`
|
||||
}
|
||||
|
||||
// FirehoseConfig defines configs for the AWS Firehose logging plugin
|
||||
type FirehoseConfig struct {
|
||||
Region string
|
||||
|
|
@ -226,6 +242,7 @@ type FirehoseConfig struct {
|
|||
StsAssumeRoleArn string `yaml:"sts_assume_role_arn"`
|
||||
StatusStream string `yaml:"status_stream"`
|
||||
ResultStream string `yaml:"result_stream"`
|
||||
AuditStream string `yaml:"audit_stream"`
|
||||
}
|
||||
|
||||
// KinesisConfig defines configs for the AWS Kinesis logging plugin
|
||||
|
|
@ -237,6 +254,7 @@ type KinesisConfig struct {
|
|||
StsAssumeRoleArn string `yaml:"sts_assume_role_arn"`
|
||||
StatusStream string `yaml:"status_stream"`
|
||||
ResultStream string `yaml:"result_stream"`
|
||||
AuditStream string `yaml:"audit_stream"`
|
||||
}
|
||||
|
||||
// LambdaConfig defines configs for the AWS Lambda logging plugin
|
||||
|
|
@ -247,6 +265,7 @@ type LambdaConfig struct {
|
|||
StsAssumeRoleArn string `yaml:"sts_assume_role_arn"`
|
||||
StatusFunction string `yaml:"status_function"`
|
||||
ResultFunction string `yaml:"result_function"`
|
||||
AuditFunction string `yaml:"audit_function"`
|
||||
}
|
||||
|
||||
// S3Config defines config to enable file carving storage to an S3 bucket
|
||||
|
|
@ -267,6 +286,7 @@ type PubSubConfig struct {
|
|||
Project string `json:"project"`
|
||||
StatusTopic string `json:"status_topic" yaml:"status_topic"`
|
||||
ResultTopic string `json:"result_topic" yaml:"result_topic"`
|
||||
AuditTopic string `json:"audit_topic" yaml:"audit_topic"`
|
||||
AddAttributes bool `json:"add_attributes" yaml:"add_attributes"`
|
||||
}
|
||||
|
||||
|
|
@ -274,6 +294,7 @@ type PubSubConfig struct {
|
|||
type FilesystemConfig struct {
|
||||
StatusLogFile string `json:"status_log_file" yaml:"status_log_file"`
|
||||
ResultLogFile string `json:"result_log_file" yaml:"result_log_file"`
|
||||
AuditLogFile string `json:"audit_log_file" yaml:"audit_log_file"`
|
||||
EnableLogRotation bool `json:"enable_log_rotation" yaml:"enable_log_rotation"`
|
||||
EnableLogCompression bool `json:"enable_log_compression" yaml:"enable_log_compression"`
|
||||
}
|
||||
|
|
@ -282,6 +303,7 @@ type FilesystemConfig struct {
|
|||
type KafkaRESTConfig struct {
|
||||
StatusTopic string `json:"status_topic" yaml:"status_topic"`
|
||||
ResultTopic string `json:"result_topic" yaml:"result_topic"`
|
||||
AuditTopic string `json:"audit_topic" yaml:"audit_topic"`
|
||||
ProxyHost string `json:"proxyhost" yaml:"proxyhost"`
|
||||
ContentTypeValue string `json:"content_type_value" yaml:"content_type_value"`
|
||||
Timeout int `json:"timeout" yaml:"timeout"`
|
||||
|
|
@ -389,6 +411,7 @@ type FleetConfig struct {
|
|||
App AppConfig
|
||||
Session SessionConfig
|
||||
Osquery OsqueryConfig
|
||||
Activity ActivityConfig
|
||||
Logging LoggingConfig
|
||||
Firehose FirehoseConfig
|
||||
Kinesis KinesisConfig
|
||||
|
|
@ -826,6 +849,12 @@ func (man Manager) addConfigs() {
|
|||
man.addConfigDuration("osquery.min_software_last_opened_at_diff", 1*time.Hour,
|
||||
"Minimum time difference of the software's last opened timestamp (compared to the last one saved) to trigger an update to the database")
|
||||
|
||||
// Activities
|
||||
man.addConfigBool("activity.enable_audit_log", false,
|
||||
"Enable audit logs")
|
||||
man.addConfigString("activity.audit_log_plugin", "filesystem",
|
||||
"Log plugin to use for audit logs")
|
||||
|
||||
// Logging
|
||||
man.addConfigBool("logging.debug", false,
|
||||
"Enable debug logging")
|
||||
|
|
@ -852,6 +881,8 @@ func (man Manager) addConfigs() {
|
|||
"Firehose stream name for status logs")
|
||||
man.addConfigString("firehose.result_stream", "",
|
||||
"Firehose stream name for result logs")
|
||||
man.addConfigString("firehose.audit_stream", "",
|
||||
"Firehose stream name for audit logs")
|
||||
|
||||
// Kinesis
|
||||
man.addConfigString("kinesis.region", "", "AWS Region to use")
|
||||
|
|
@ -865,6 +896,8 @@ func (man Manager) addConfigs() {
|
|||
"Kinesis stream name for status logs")
|
||||
man.addConfigString("kinesis.result_stream", "",
|
||||
"Kinesis stream name for result logs")
|
||||
man.addConfigString("kinesis.audit_stream", "",
|
||||
"Kinesis stream name for audit logs")
|
||||
|
||||
// Lambda
|
||||
man.addConfigString("lambda.region", "", "AWS Region to use")
|
||||
|
|
@ -876,6 +909,8 @@ func (man Manager) addConfigs() {
|
|||
"Lambda function name for status logs")
|
||||
man.addConfigString("lambda.result_function", "",
|
||||
"Lambda function name for result logs")
|
||||
man.addConfigString("lambda.audit_function", "",
|
||||
"Lambda function name for audit logs")
|
||||
|
||||
// S3 for file carving
|
||||
man.addConfigString("s3.bucket", "", "Bucket where to store file carves")
|
||||
|
|
@ -892,6 +927,7 @@ func (man Manager) addConfigs() {
|
|||
man.addConfigString("pubsub.project", "", "Google Cloud Project to use")
|
||||
man.addConfigString("pubsub.status_topic", "", "PubSub topic for status logs")
|
||||
man.addConfigString("pubsub.result_topic", "", "PubSub topic for result logs")
|
||||
man.addConfigString("pubsub.audit_topic", "", "PubSub topic for audit logs")
|
||||
man.addConfigBool("pubsub.add_attributes", false, "Add PubSub attributes in addition to the message body")
|
||||
|
||||
// Filesystem
|
||||
|
|
@ -899,6 +935,8 @@ func (man Manager) addConfigs() {
|
|||
"Log file path to use for status logs")
|
||||
man.addConfigString("filesystem.result_log_file", filepath.Join(os.TempDir(), "osquery_result"),
|
||||
"Log file path to use for result logs")
|
||||
man.addConfigString("filesystem.audit_log_file", filepath.Join(os.TempDir(), "audit"),
|
||||
"Log file path to use for audit logs")
|
||||
man.addConfigBool("filesystem.enable_log_rotation", false,
|
||||
"Enable automatic rotation for osquery log files")
|
||||
man.addConfigBool("filesystem.enable_log_compression", false,
|
||||
|
|
@ -907,6 +945,7 @@ func (man Manager) addConfigs() {
|
|||
// KafkaREST
|
||||
man.addConfigString("kafkarest.status_topic", "", "Kafka REST topic for status logs")
|
||||
man.addConfigString("kafkarest.result_topic", "", "Kafka REST topic for result logs")
|
||||
man.addConfigString("kafkarest.audit_topic", "", "Kafka REST topic for audit logs")
|
||||
man.addConfigString("kafkarest.proxyhost", "", "Kafka REST proxy host url")
|
||||
man.addConfigString("kafkarest.content_type_value", "application/vnd.kafka.json.v1+json",
|
||||
"Kafka REST proxy content type header (defaults to \"application/vnd.kafka.json.v1+json\"")
|
||||
|
|
@ -1089,12 +1128,14 @@ func (man Manager) LoadConfig() FleetConfig {
|
|||
Duration: man.getConfigDuration("session.duration"),
|
||||
},
|
||||
Osquery: OsqueryConfig{
|
||||
NodeKeySize: man.getConfigInt("osquery.node_key_size"),
|
||||
HostIdentifier: man.getConfigString("osquery.host_identifier"),
|
||||
EnrollCooldown: man.getConfigDuration("osquery.enroll_cooldown"),
|
||||
StatusLogPlugin: man.getConfigString("osquery.status_log_plugin"),
|
||||
ResultLogPlugin: man.getConfigString("osquery.result_log_plugin"),
|
||||
StatusLogFile: man.getConfigString("osquery.status_log_file"),
|
||||
NodeKeySize: man.getConfigInt("osquery.node_key_size"),
|
||||
HostIdentifier: man.getConfigString("osquery.host_identifier"),
|
||||
EnrollCooldown: man.getConfigDuration("osquery.enroll_cooldown"),
|
||||
StatusLogPlugin: man.getConfigString("osquery.status_log_plugin"),
|
||||
ResultLogPlugin: man.getConfigString("osquery.result_log_plugin"),
|
||||
// StatusLogFile is deprecated. FilesystemConfig.StatusLogFile is used instead.
|
||||
StatusLogFile: man.getConfigString("osquery.status_log_file"),
|
||||
// ResultLogFile is deprecated. FilesystemConfig.ResultLogFile is used instead.
|
||||
ResultLogFile: man.getConfigString("osquery.result_log_file"),
|
||||
LabelUpdateInterval: man.getConfigDuration("osquery.label_update_interval"),
|
||||
PolicyUpdateInterval: man.getConfigDuration("osquery.policy_update_interval"),
|
||||
|
|
@ -1113,6 +1154,10 @@ func (man Manager) LoadConfig() FleetConfig {
|
|||
AsyncHostRedisScanKeysCount: man.getConfigInt("osquery.async_host_redis_scan_keys_count"),
|
||||
MinSoftwareLastOpenedAtDiff: man.getConfigDuration("osquery.min_software_last_opened_at_diff"),
|
||||
},
|
||||
Activity: ActivityConfig{
|
||||
EnableAuditLog: man.getConfigBool("activity.enable_audit_log"),
|
||||
AuditLogPlugin: man.getConfigString("activity.audit_log_plugin"),
|
||||
},
|
||||
Logging: LoggingConfig{
|
||||
Debug: man.getConfigBool("logging.debug"),
|
||||
JSON: man.getConfigBool("logging.json"),
|
||||
|
|
@ -1129,6 +1174,7 @@ func (man Manager) LoadConfig() FleetConfig {
|
|||
StsAssumeRoleArn: man.getConfigString("firehose.sts_assume_role_arn"),
|
||||
StatusStream: man.getConfigString("firehose.status_stream"),
|
||||
ResultStream: man.getConfigString("firehose.result_stream"),
|
||||
AuditStream: man.getConfigString("firehose.audit_stream"),
|
||||
},
|
||||
Kinesis: KinesisConfig{
|
||||
Region: man.getConfigString("kinesis.region"),
|
||||
|
|
@ -1137,6 +1183,7 @@ func (man Manager) LoadConfig() FleetConfig {
|
|||
SecretAccessKey: man.getConfigString("kinesis.secret_access_key"),
|
||||
StatusStream: man.getConfigString("kinesis.status_stream"),
|
||||
ResultStream: man.getConfigString("kinesis.result_stream"),
|
||||
AuditStream: man.getConfigString("kinesis.audit_stream"),
|
||||
StsAssumeRoleArn: man.getConfigString("kinesis.sts_assume_role_arn"),
|
||||
},
|
||||
Lambda: LambdaConfig{
|
||||
|
|
@ -1145,6 +1192,7 @@ func (man Manager) LoadConfig() FleetConfig {
|
|||
SecretAccessKey: man.getConfigString("lambda.secret_access_key"),
|
||||
StatusFunction: man.getConfigString("lambda.status_function"),
|
||||
ResultFunction: man.getConfigString("lambda.result_function"),
|
||||
AuditFunction: man.getConfigString("lambda.audit_function"),
|
||||
StsAssumeRoleArn: man.getConfigString("lambda.sts_assume_role_arn"),
|
||||
},
|
||||
S3: S3Config{
|
||||
|
|
@ -1162,17 +1210,20 @@ func (man Manager) LoadConfig() FleetConfig {
|
|||
Project: man.getConfigString("pubsub.project"),
|
||||
StatusTopic: man.getConfigString("pubsub.status_topic"),
|
||||
ResultTopic: man.getConfigString("pubsub.result_topic"),
|
||||
AuditTopic: man.getConfigString("pubsub.audit_topic"),
|
||||
AddAttributes: man.getConfigBool("pubsub.add_attributes"),
|
||||
},
|
||||
Filesystem: FilesystemConfig{
|
||||
StatusLogFile: man.getConfigString("filesystem.status_log_file"),
|
||||
ResultLogFile: man.getConfigString("filesystem.result_log_file"),
|
||||
AuditLogFile: man.getConfigString("filesystem.audit_log_file"),
|
||||
EnableLogRotation: man.getConfigBool("filesystem.enable_log_rotation"),
|
||||
EnableLogCompression: man.getConfigBool("filesystem.enable_log_compression"),
|
||||
},
|
||||
KafkaREST: KafkaRESTConfig{
|
||||
StatusTopic: man.getConfigString("kafkarest.status_topic"),
|
||||
ResultTopic: man.getConfigString("kafkarest.result_topic"),
|
||||
AuditTopic: man.getConfigString("kafkarest.audit_topic"),
|
||||
ProxyHost: man.getConfigString("kafkarest.proxyhost"),
|
||||
ContentTypeValue: man.getConfigString("kafkarest.content_type_value"),
|
||||
Timeout: man.getConfigInt("kafkarest.timeout"),
|
||||
|
|
@ -1551,6 +1602,10 @@ func TestConfig() FleetConfig {
|
|||
DetailUpdateInterval: 1 * time.Hour,
|
||||
MaxJitterPercent: 0,
|
||||
},
|
||||
Activity: ActivityConfig{
|
||||
EnableAuditLog: true,
|
||||
AuditLogPlugin: "filesystem",
|
||||
},
|
||||
Logging: LoggingConfig{
|
||||
Debug: true,
|
||||
DisableBanner: true,
|
||||
|
|
@ -1558,6 +1613,7 @@ func TestConfig() FleetConfig {
|
|||
Filesystem: FilesystemConfig{
|
||||
StatusLogFile: testLogFile,
|
||||
ResultLogFile: testLogFile,
|
||||
AuditLogFile: testLogFile,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,14 +30,32 @@ func (ds *Datastore) NewActivity(ctx context.Context, user *fleet.User, activity
|
|||
}
|
||||
|
||||
// ListActivities returns a slice of activities performed across the organization
|
||||
func (ds *Datastore) ListActivities(ctx context.Context, opt fleet.ListOptions) ([]*fleet.Activity, error) {
|
||||
func (ds *Datastore) ListActivities(ctx context.Context, opt fleet.ListActivitiesOptions) ([]*fleet.Activity, error) {
|
||||
activities := []*fleet.Activity{}
|
||||
query := `SELECT a.id, a.user_id, a.created_at, a.activity_type, a.details, coalesce(u.name, a.user_name) as name, u.gravatar_url, u.email
|
||||
FROM activities a LEFT JOIN users u ON (a.user_id=u.id)
|
||||
WHERE true`
|
||||
query = appendListOptionsToSQL(query, opt)
|
||||
query := `
|
||||
SELECT
|
||||
a.id,
|
||||
a.user_id,
|
||||
a.created_at,
|
||||
a.activity_type,
|
||||
a.details,
|
||||
coalesce(u.name, a.user_name) as name,
|
||||
u.gravatar_url,
|
||||
u.email,
|
||||
a.streamed
|
||||
FROM activities a
|
||||
LEFT JOIN users u ON (a.user_id=u.id)
|
||||
WHERE true`
|
||||
|
||||
err := sqlx.SelectContext(ctx, ds.reader, &activities, query)
|
||||
var args []interface{}
|
||||
if opt.Streamed != nil {
|
||||
query += " AND a.streamed = ?"
|
||||
args = append(args, *opt.Streamed)
|
||||
}
|
||||
|
||||
query = appendListOptionsToSQL(query, opt.ListOptions)
|
||||
|
||||
err := sqlx.SelectContext(ctx, ds.reader, &activities, query, args...)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, ctxerr.Wrap(ctx, notFound("Activity"))
|
||||
} else if err != nil {
|
||||
|
|
@ -46,3 +64,15 @@ func (ds *Datastore) ListActivities(ctx context.Context, opt fleet.ListOptions)
|
|||
|
||||
return activities, nil
|
||||
}
|
||||
|
||||
func (ds *Datastore) MarkActivitiesAsStreamed(ctx context.Context, activityIDs []uint) error {
|
||||
stmt := `UPDATE activities SET streamed = true WHERE id IN (?);`
|
||||
query, args, err := sqlx.In(stmt, activityIDs)
|
||||
if err != nil {
|
||||
return ctxerr.Wrap(ctx, err, "sqlx.In mark activities as streamed")
|
||||
}
|
||||
if _, err := ds.writer.ExecContext(ctx, query, args...); err != nil {
|
||||
return ctxerr.Wrap(ctx, err, "exec mark activities as streamed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package mysql
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||
|
|
@ -20,6 +21,7 @@ func TestActivity(t *testing.T) {
|
|||
}{
|
||||
{"UsernameChange", testActivityUsernameChange},
|
||||
{"New", testActivityNew},
|
||||
{"ListActivitiesStreamed", testListActivitiesStreamed},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
|
|
@ -59,7 +61,8 @@ func testActivityUsernameChange(t *testing.T, ds *Datastore) {
|
|||
GlobalRole: ptr.String(fleet.RoleObserver),
|
||||
}
|
||||
_, err := ds.NewUser(context.Background(), u)
|
||||
require.Nil(t, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, ds.NewActivity(context.Background(), u, dummyActivity{
|
||||
name: "test1",
|
||||
details: map[string]interface{}{"detail": 1, "sometext": "aaa"},
|
||||
|
|
@ -69,7 +72,7 @@ func testActivityUsernameChange(t *testing.T, ds *Datastore) {
|
|||
details: map[string]interface{}{"detail": 2},
|
||||
}))
|
||||
|
||||
activities, err := ds.ListActivities(context.Background(), fleet.ListOptions{})
|
||||
activities, err := ds.ListActivities(context.Background(), fleet.ListActivitiesOptions{})
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, activities, 2)
|
||||
assert.Equal(t, "fullname", activities[0].ActorFullName)
|
||||
|
|
@ -78,7 +81,7 @@ func testActivityUsernameChange(t *testing.T, ds *Datastore) {
|
|||
err = ds.SaveUser(context.Background(), u)
|
||||
require.NoError(t, err)
|
||||
|
||||
activities, err = ds.ListActivities(context.Background(), fleet.ListOptions{})
|
||||
activities, err = ds.ListActivities(context.Background(), fleet.ListActivitiesOptions{})
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, activities, 2)
|
||||
assert.Equal(t, "newname", activities[0].ActorFullName)
|
||||
|
|
@ -88,7 +91,7 @@ func testActivityUsernameChange(t *testing.T, ds *Datastore) {
|
|||
err = ds.DeleteUser(context.Background(), u.ID)
|
||||
require.NoError(t, err)
|
||||
|
||||
activities, err = ds.ListActivities(context.Background(), fleet.ListOptions{})
|
||||
activities, err = ds.ListActivities(context.Background(), fleet.ListActivitiesOptions{})
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, activities, 2)
|
||||
assert.Equal(t, "fullname", activities[0].ActorFullName)
|
||||
|
|
@ -113,9 +116,11 @@ func testActivityNew(t *testing.T, ds *Datastore) {
|
|||
details: map[string]interface{}{"detail": 2},
|
||||
}))
|
||||
|
||||
opt := fleet.ListOptions{
|
||||
Page: 0,
|
||||
PerPage: 1,
|
||||
opt := fleet.ListActivitiesOptions{
|
||||
ListOptions: fleet.ListOptions{
|
||||
Page: 0,
|
||||
PerPage: 1,
|
||||
},
|
||||
}
|
||||
activities, err := ds.ListActivities(context.Background(), opt)
|
||||
require.NoError(t, err)
|
||||
|
|
@ -123,9 +128,11 @@ func testActivityNew(t *testing.T, ds *Datastore) {
|
|||
assert.Equal(t, "fullname", activities[0].ActorFullName)
|
||||
assert.Equal(t, "test1", activities[0].Type)
|
||||
|
||||
opt = fleet.ListOptions{
|
||||
Page: 1,
|
||||
PerPage: 1,
|
||||
opt = fleet.ListActivitiesOptions{
|
||||
ListOptions: fleet.ListOptions{
|
||||
Page: 1,
|
||||
PerPage: 1,
|
||||
},
|
||||
}
|
||||
activities, err = ds.ListActivities(context.Background(), opt)
|
||||
require.NoError(t, err)
|
||||
|
|
@ -133,11 +140,71 @@ func testActivityNew(t *testing.T, ds *Datastore) {
|
|||
assert.Equal(t, "fullname", activities[0].ActorFullName)
|
||||
assert.Equal(t, "test2", activities[0].Type)
|
||||
|
||||
opt = fleet.ListOptions{
|
||||
Page: 0,
|
||||
PerPage: 10,
|
||||
opt = fleet.ListActivitiesOptions{
|
||||
ListOptions: fleet.ListOptions{
|
||||
Page: 0,
|
||||
PerPage: 10,
|
||||
},
|
||||
}
|
||||
activities, err = ds.ListActivities(context.Background(), opt)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, activities, 2)
|
||||
}
|
||||
|
||||
func testListActivitiesStreamed(t *testing.T, ds *Datastore) {
|
||||
u := &fleet.User{
|
||||
Password: []byte("asd"),
|
||||
Name: "fullname",
|
||||
Email: "email@asd.com",
|
||||
GlobalRole: ptr.String(fleet.RoleObserver),
|
||||
}
|
||||
_, err := ds.NewUser(context.Background(), u)
|
||||
require.Nil(t, err)
|
||||
|
||||
require.NoError(t, ds.NewActivity(context.Background(), u, dummyActivity{
|
||||
name: "test1",
|
||||
details: map[string]interface{}{"detail": 1, "sometext": "aaa"},
|
||||
}))
|
||||
require.NoError(t, ds.NewActivity(context.Background(), u, dummyActivity{
|
||||
name: "test2",
|
||||
details: map[string]interface{}{"detail": 2},
|
||||
}))
|
||||
require.NoError(t, ds.NewActivity(context.Background(), u, dummyActivity{
|
||||
name: "test3",
|
||||
details: map[string]interface{}{"detail": 3},
|
||||
}))
|
||||
|
||||
activities, err := ds.ListActivities(context.Background(), fleet.ListActivitiesOptions{})
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, activities, 3)
|
||||
|
||||
sort.Slice(activities, func(i, j int) bool {
|
||||
return activities[i].ID < activities[j].ID
|
||||
})
|
||||
|
||||
err = ds.MarkActivitiesAsStreamed(context.Background(), []uint{activities[0].ID})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Reload activities (with streamed field updated).
|
||||
activities, err = ds.ListActivities(context.Background(), fleet.ListActivitiesOptions{})
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, activities, 3)
|
||||
sort.Slice(activities, func(i, j int) bool {
|
||||
return activities[i].ID < activities[j].ID
|
||||
})
|
||||
|
||||
nonStreamed, err := ds.ListActivities(context.Background(), fleet.ListActivitiesOptions{
|
||||
Streamed: ptr.Bool(false),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, nonStreamed, 2)
|
||||
require.Equal(t, nonStreamed[0], activities[1])
|
||||
require.Equal(t, nonStreamed[1], activities[2])
|
||||
|
||||
streamed, err := ds.ListActivities(context.Background(), fleet.ListActivitiesOptions{
|
||||
Streamed: ptr.Bool(true),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, streamed, 1)
|
||||
require.Equal(t, streamed[0], activities[0])
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,29 @@
|
|||
package tables
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func init() {
|
||||
MigrationClient.AddMigration(Up_20221220195935, Down_20221220195935)
|
||||
}
|
||||
|
||||
func Up_20221220195935(tx *sql.Tx) error {
|
||||
if _, err := tx.Exec(
|
||||
"ALTER TABLE `activities` ADD COLUMN `streamed` TINYINT(1) NOT NULL DEFAULT FALSE;",
|
||||
); err != nil {
|
||||
return errors.Wrap(err, "adding streamed column to activities")
|
||||
}
|
||||
if _, err := tx.Exec(
|
||||
"CREATE INDEX activities_streamed_idx ON activities (streamed);",
|
||||
); err != nil {
|
||||
return errors.Wrap(err, "create activities_streamed_idx")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Down_20221220195935(tx *sql.Tx) error {
|
||||
return nil
|
||||
}
|
||||
File diff suppressed because one or more lines are too long
|
|
@ -435,6 +435,7 @@ type Activity struct {
|
|||
ActorEmail *string `json:"actor_email" db:"email"`
|
||||
Type string `json:"type" db:"activity_type"`
|
||||
Details *json.RawMessage `json:"details" db:"details"`
|
||||
Streamed *bool `json:"-" db:"streamed"`
|
||||
}
|
||||
|
||||
type ActivityTypeUserLoggedIn struct {
|
||||
|
|
|
|||
|
|
@ -469,6 +469,12 @@ type ListQueryOptions struct {
|
|||
OnlyObserverCanRun bool
|
||||
}
|
||||
|
||||
type ListActivitiesOptions struct {
|
||||
ListOptions
|
||||
|
||||
Streamed *bool
|
||||
}
|
||||
|
||||
// ApplySpecOptions are the options available when applying a YAML or JSON spec.
|
||||
type ApplySpecOptions struct {
|
||||
// Force indicates that any validation error in the incoming payload should
|
||||
|
|
@ -570,6 +576,7 @@ type Logging struct {
|
|||
Json bool `json:"json"`
|
||||
Result LoggingPlugin `json:"result"`
|
||||
Status LoggingPlugin `json:"status"`
|
||||
Audit LoggingPlugin `json:"audit"`
|
||||
}
|
||||
|
||||
type UpdateIntervalConfig struct {
|
||||
|
|
@ -611,6 +618,7 @@ type FirehoseConfig struct {
|
|||
Region string `json:"region"`
|
||||
StatusStream string `json:"status_stream"`
|
||||
ResultStream string `json:"result_stream"`
|
||||
AuditStream string `json:"audit_stream"`
|
||||
}
|
||||
|
||||
// KinesisConfig shadows config.KinesisConfig only exposing a subset of fields
|
||||
|
|
@ -618,6 +626,7 @@ type KinesisConfig struct {
|
|||
Region string `json:"region"`
|
||||
StatusStream string `json:"status_stream"`
|
||||
ResultStream string `json:"result_stream"`
|
||||
AuditStream string `json:"audit_stream"`
|
||||
}
|
||||
|
||||
// LambdaConfig shadows config.LambdaConfig only exposing a subset of fields
|
||||
|
|
@ -625,11 +634,13 @@ type LambdaConfig struct {
|
|||
Region string `json:"region"`
|
||||
StatusFunction string `json:"status_function"`
|
||||
ResultFunction string `json:"result_function"`
|
||||
AuditFunction string `json:"audit_function"`
|
||||
}
|
||||
|
||||
// KafkaRESTConfig shadows config.KafkaRESTConfig
|
||||
type KafkaRESTConfig struct {
|
||||
StatusTopic string `json:"status_topic"`
|
||||
ResultTopic string `json:"result_topic"`
|
||||
AuditTopic string `json:"audit_topic"`
|
||||
ProxyHost string `json:"proxyhost"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ const (
|
|||
CronVulnerabilities CronScheduleName = "vulnerabilities"
|
||||
CronAutomations CronScheduleName = "automations"
|
||||
CronIntegrations CronScheduleName = "integrations"
|
||||
CronActivitiesStreaming CronScheduleName = "activities_streaming"
|
||||
)
|
||||
|
||||
type CronSchedulesService interface {
|
||||
|
|
|
|||
|
|
@ -438,7 +438,8 @@ type Datastore interface {
|
|||
// ActivitiesStore
|
||||
|
||||
NewActivity(ctx context.Context, user *User, activity ActivityDetails) error
|
||||
ListActivities(ctx context.Context, opt ListOptions) ([]*Activity, error)
|
||||
ListActivities(ctx context.Context, opt ListActivitiesOptions) ([]*Activity, error)
|
||||
MarkActivitiesAsStreamed(ctx context.Context, activityIDs []uint) error
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// StatisticsStore
|
||||
|
|
|
|||
|
|
@ -457,7 +457,7 @@ type Service interface {
|
|||
///////////////////////////////////////////////////////////////////////////////
|
||||
// ActivitiesService
|
||||
|
||||
ListActivities(ctx context.Context, opt ListOptions) ([]*Activity, error)
|
||||
ListActivities(ctx context.Context, opt ListActivitiesOptions) ([]*Activity, error)
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// UserRolesService
|
||||
|
|
|
|||
|
|
@ -23,7 +23,8 @@ type filesystemLogWriter struct {
|
|||
writer io.WriteCloser
|
||||
}
|
||||
|
||||
// NewFilesystemLogWriter creates a log file for osquery status/result logs.
|
||||
// NewFilesystemLogWriter creates a logger that writes to a file.
|
||||
//
|
||||
// The logFile can be rotated by sending a `SIGHUP` signal to Fleet if
|
||||
// enableRotation is true
|
||||
//
|
||||
|
|
@ -43,25 +44,25 @@ func NewFilesystemLogWriter(path string, appLogger log.Logger, enableRotation bo
|
|||
}
|
||||
// Use lumberjack logger that supports rotation
|
||||
file.Close()
|
||||
osquerydLogger := &lumberjack.Logger{
|
||||
fsLogger := &lumberjack.Logger{
|
||||
Filename: path,
|
||||
MaxSize: 500, // megabytes
|
||||
MaxBackups: 3,
|
||||
MaxAge: 28, // days
|
||||
Compress: enableCompression,
|
||||
}
|
||||
appLogger = log.With(appLogger, "component", "osqueryd-logger")
|
||||
appLogger = log.With(appLogger, "component", "filesystem-logger")
|
||||
sig := make(chan os.Signal, 1)
|
||||
signal.Notify(sig, syscall.SIGHUP)
|
||||
go func() {
|
||||
for {
|
||||
<-sig // block on signal
|
||||
if err := osquerydLogger.Rotate(); err != nil {
|
||||
if err := fsLogger.Rotate(); err != nil {
|
||||
appLogger.Log("err", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
return &filesystemLogWriter{osquerydLogger}, nil
|
||||
return &filesystemLogWriter{fsLogger}, nil
|
||||
}
|
||||
|
||||
// If writer is based on bufio we want to flush after a batch of
|
||||
|
|
|
|||
|
|
@ -1,191 +1,168 @@
|
|||
// Package logging provides logger "plugins" for writing osquery status and
|
||||
// result logs to various destinations.
|
||||
// Package logging provides logger "plugins" for various destinations.
|
||||
package logging
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/fleetdm/fleet/v4/server/config"
|
||||
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/kit/log/level"
|
||||
)
|
||||
|
||||
type OsqueryLogger struct {
|
||||
Status fleet.JSONLogger
|
||||
Result fleet.JSONLogger
|
||||
type FilesystemConfig struct {
|
||||
LogFile string
|
||||
|
||||
EnableLogRotation bool
|
||||
EnableLogCompression bool
|
||||
}
|
||||
|
||||
func New(config config.FleetConfig, logger log.Logger) (*OsqueryLogger, error) {
|
||||
var status, result fleet.JSONLogger
|
||||
var err error
|
||||
type FirehoseConfig struct {
|
||||
StreamName string
|
||||
|
||||
switch config.Osquery.StatusLogPlugin {
|
||||
Region string
|
||||
EndpointURL string
|
||||
AccessKeyID string
|
||||
SecretAccessKey string
|
||||
StsAssumeRoleArn string
|
||||
}
|
||||
|
||||
type KinesisConfig struct {
|
||||
StreamName string
|
||||
|
||||
Region string
|
||||
EndpointURL string
|
||||
AccessKeyID string
|
||||
SecretAccessKey string
|
||||
StsAssumeRoleArn string
|
||||
}
|
||||
|
||||
type LambdaConfig struct {
|
||||
Function string
|
||||
|
||||
Region string
|
||||
AccessKeyID string
|
||||
SecretAccessKey string
|
||||
StsAssumeRoleArn string
|
||||
}
|
||||
|
||||
type PubSubConfig struct {
|
||||
Topic string
|
||||
|
||||
Project string
|
||||
AddAttributes bool
|
||||
}
|
||||
|
||||
type KafkaRESTConfig struct {
|
||||
Topic string
|
||||
|
||||
ProxyHost string
|
||||
ContentTypeValue string
|
||||
Timeout int
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Plugin string
|
||||
|
||||
Filesystem FilesystemConfig
|
||||
Firehose FirehoseConfig
|
||||
Kinesis KinesisConfig
|
||||
Lambda LambdaConfig
|
||||
PubSub PubSubConfig
|
||||
KafkaREST KafkaRESTConfig
|
||||
}
|
||||
|
||||
func NewJSONLogger(name string, config Config, logger log.Logger) (fleet.JSONLogger, error) {
|
||||
switch config.Plugin {
|
||||
case "":
|
||||
// Allow "" to mean filesystem for backwards compatibility
|
||||
level.Info(logger).Log("msg", "osquery_status_log_plugin not explicitly specified. Assuming 'filesystem'")
|
||||
level.Info(logger).Log(
|
||||
"msg",
|
||||
fmt.Sprintf("plugin for %s not explicitly specified. Assuming 'filesystem'", name),
|
||||
)
|
||||
fallthrough
|
||||
case "filesystem":
|
||||
status, err = NewFilesystemLogWriter(
|
||||
config.Filesystem.StatusLogFile,
|
||||
writer, err := NewFilesystemLogWriter(
|
||||
config.Filesystem.LogFile,
|
||||
logger,
|
||||
config.Filesystem.EnableLogRotation,
|
||||
config.Filesystem.EnableLogCompression,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create filesystem status logger: %w", err)
|
||||
return nil, fmt.Errorf("create filesystem %s logger: %w", name, err)
|
||||
}
|
||||
return fleet.JSONLogger(writer), nil
|
||||
case "firehose":
|
||||
status, err = NewFirehoseLogWriter(
|
||||
writer, err := NewFirehoseLogWriter(
|
||||
config.Firehose.Region,
|
||||
config.Firehose.EndpointURL,
|
||||
config.Firehose.AccessKeyID,
|
||||
config.Firehose.SecretAccessKey,
|
||||
config.Firehose.StsAssumeRoleArn,
|
||||
config.Firehose.StatusStream,
|
||||
config.Firehose.StreamName,
|
||||
logger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create firehose status logger: %w", err)
|
||||
return nil, fmt.Errorf("create firehose %s logger: %w", name, err)
|
||||
}
|
||||
return fleet.JSONLogger(writer), nil
|
||||
case "kinesis":
|
||||
status, err = NewKinesisLogWriter(
|
||||
writer, err := NewKinesisLogWriter(
|
||||
config.Kinesis.Region,
|
||||
config.Kinesis.EndpointURL,
|
||||
config.Kinesis.AccessKeyID,
|
||||
config.Kinesis.SecretAccessKey,
|
||||
config.Kinesis.StsAssumeRoleArn,
|
||||
config.Kinesis.StatusStream,
|
||||
config.Kinesis.StreamName,
|
||||
logger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create kinesis status logger: %w", err)
|
||||
return nil, fmt.Errorf("create kinesis %s logger: %w", name, err)
|
||||
}
|
||||
return fleet.JSONLogger(writer), nil
|
||||
case "lambda":
|
||||
status, err = NewLambdaLogWriter(
|
||||
writer, err := NewLambdaLogWriter(
|
||||
config.Lambda.Region,
|
||||
config.Lambda.AccessKeyID,
|
||||
config.Lambda.SecretAccessKey,
|
||||
config.Lambda.StsAssumeRoleArn,
|
||||
config.Lambda.StatusFunction,
|
||||
config.Lambda.Function,
|
||||
logger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create lambda status logger: %w", err)
|
||||
return nil, fmt.Errorf("create lambda %s logger: %w", name, err)
|
||||
}
|
||||
return fleet.JSONLogger(writer), nil
|
||||
case "pubsub":
|
||||
status, err = NewPubSubLogWriter(
|
||||
writer, err := NewPubSubLogWriter(
|
||||
config.PubSub.Project,
|
||||
config.PubSub.StatusTopic,
|
||||
false,
|
||||
logger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create pubsub status logger: %w", err)
|
||||
}
|
||||
case "stdout":
|
||||
status, err = NewStdoutLogWriter()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create stdout status logger: %w", err)
|
||||
}
|
||||
case "kafkarest":
|
||||
status, err = NewKafkaRESTWriter(&KafkaRESTParams{
|
||||
KafkaProxyHost: config.KafkaREST.ProxyHost,
|
||||
KafkaTopic: config.KafkaREST.StatusTopic,
|
||||
KafkaContentTypeValue: config.KafkaREST.ContentTypeValue,
|
||||
KafkaTimeout: config.KafkaREST.Timeout,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create kafka rest status logger: %w", err)
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf(
|
||||
"unknown status log plugin: %s", config.Osquery.StatusLogPlugin,
|
||||
)
|
||||
}
|
||||
|
||||
switch config.Osquery.ResultLogPlugin {
|
||||
case "":
|
||||
// Allow "" to mean filesystem for backwards compatibility
|
||||
level.Info(logger).Log("msg", "osquery_result_log_plugin not explicitly specified. Assuming 'filesystem'")
|
||||
fallthrough
|
||||
case "filesystem":
|
||||
result, err = NewFilesystemLogWriter(
|
||||
config.Filesystem.ResultLogFile,
|
||||
logger,
|
||||
config.Filesystem.EnableLogRotation,
|
||||
config.Filesystem.EnableLogCompression,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create filesystem result logger: %w", err)
|
||||
}
|
||||
case "firehose":
|
||||
result, err = NewFirehoseLogWriter(
|
||||
config.Firehose.Region,
|
||||
config.Firehose.EndpointURL,
|
||||
config.Firehose.AccessKeyID,
|
||||
config.Firehose.SecretAccessKey,
|
||||
config.Firehose.StsAssumeRoleArn,
|
||||
config.Firehose.ResultStream,
|
||||
logger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create firehose result logger: %w", err)
|
||||
}
|
||||
case "kinesis":
|
||||
result, err = NewKinesisLogWriter(
|
||||
config.Kinesis.Region,
|
||||
config.Kinesis.EndpointURL,
|
||||
config.Kinesis.AccessKeyID,
|
||||
config.Kinesis.SecretAccessKey,
|
||||
config.Kinesis.StsAssumeRoleArn,
|
||||
config.Kinesis.ResultStream,
|
||||
logger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create kinesis result logger: %w", err)
|
||||
}
|
||||
case "lambda":
|
||||
result, err = NewLambdaLogWriter(
|
||||
config.Lambda.Region,
|
||||
config.Lambda.AccessKeyID,
|
||||
config.Lambda.SecretAccessKey,
|
||||
config.Lambda.StsAssumeRoleArn,
|
||||
config.Lambda.ResultFunction,
|
||||
logger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create lambda result logger: %w", err)
|
||||
}
|
||||
case "pubsub":
|
||||
result, err = NewPubSubLogWriter(
|
||||
config.PubSub.Project,
|
||||
config.PubSub.ResultTopic,
|
||||
config.PubSub.Topic,
|
||||
config.PubSub.AddAttributes,
|
||||
logger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create pubsub result logger: %w", err)
|
||||
return nil, fmt.Errorf("create pubsub %s logger: %w", name, err)
|
||||
}
|
||||
return fleet.JSONLogger(writer), nil
|
||||
case "stdout":
|
||||
result, err = NewStdoutLogWriter()
|
||||
writer, err := NewStdoutLogWriter()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create stdout result logger: %w", err)
|
||||
return nil, fmt.Errorf("create stdout %s logger: %w", name, err)
|
||||
}
|
||||
return fleet.JSONLogger(writer), nil
|
||||
case "kafkarest":
|
||||
result, err = NewKafkaRESTWriter(&KafkaRESTParams{
|
||||
writer, err := NewKafkaRESTWriter(&KafkaRESTParams{
|
||||
KafkaProxyHost: config.KafkaREST.ProxyHost,
|
||||
KafkaTopic: config.KafkaREST.ResultTopic,
|
||||
KafkaTopic: config.KafkaREST.Topic,
|
||||
KafkaContentTypeValue: config.KafkaREST.ContentTypeValue,
|
||||
KafkaTimeout: config.KafkaREST.Timeout,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create kafka rest result logger: %w", err)
|
||||
return nil, fmt.Errorf("create kafka rest %s logger: %w", name, err)
|
||||
}
|
||||
return fleet.JSONLogger(writer), nil
|
||||
default:
|
||||
return nil, fmt.Errorf(
|
||||
"unknown result log plugin: %s", config.Osquery.ResultLogPlugin,
|
||||
"unknown %s log plugin: %s", name, config.Plugin,
|
||||
)
|
||||
}
|
||||
return &OsqueryLogger{Status: status, Result: result}, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -337,7 +337,9 @@ type CleanupHostOperatingSystemsFunc func(ctx context.Context) error
|
|||
|
||||
type NewActivityFunc func(ctx context.Context, user *fleet.User, activity fleet.ActivityDetails) error
|
||||
|
||||
type ListActivitiesFunc func(ctx context.Context, opt fleet.ListOptions) ([]*fleet.Activity, error)
|
||||
type ListActivitiesFunc func(ctx context.Context, opt fleet.ListActivitiesOptions) ([]*fleet.Activity, error)
|
||||
|
||||
type MarkActivitiesAsStreamedFunc func(ctx context.Context, activityIDs []uint) error
|
||||
|
||||
type ShouldSendStatisticsFunc func(ctx context.Context, frequency time.Duration, config config.FleetConfig) (fleet.StatisticsPayload, bool, error)
|
||||
|
||||
|
|
@ -1003,6 +1005,9 @@ type DataStore struct {
|
|||
ListActivitiesFunc ListActivitiesFunc
|
||||
ListActivitiesFuncInvoked bool
|
||||
|
||||
MarkActivitiesAsStreamedFunc MarkActivitiesAsStreamedFunc
|
||||
MarkActivitiesAsStreamedFuncInvoked bool
|
||||
|
||||
ShouldSendStatisticsFunc ShouldSendStatisticsFunc
|
||||
ShouldSendStatisticsFuncInvoked bool
|
||||
|
||||
|
|
@ -2075,11 +2080,16 @@ func (s *DataStore) NewActivity(ctx context.Context, user *fleet.User, activity
|
|||
return s.NewActivityFunc(ctx, user, activity)
|
||||
}
|
||||
|
||||
func (s *DataStore) ListActivities(ctx context.Context, opt fleet.ListOptions) ([]*fleet.Activity, error) {
|
||||
func (s *DataStore) ListActivities(ctx context.Context, opt fleet.ListActivitiesOptions) ([]*fleet.Activity, error) {
|
||||
s.ListActivitiesFuncInvoked = true
|
||||
return s.ListActivitiesFunc(ctx, opt)
|
||||
}
|
||||
|
||||
func (s *DataStore) MarkActivitiesAsStreamed(ctx context.Context, activityIDs []uint) error {
|
||||
s.MarkActivitiesAsStreamedFuncInvoked = true
|
||||
return s.MarkActivitiesAsStreamedFunc(ctx, activityIDs)
|
||||
}
|
||||
|
||||
func (s *DataStore) ShouldSendStatistics(ctx context.Context, frequency time.Duration, config config.FleetConfig) (fleet.StatisticsPayload, bool, error) {
|
||||
s.ShouldSendStatisticsFuncInvoked = true
|
||||
return s.ShouldSendStatisticsFunc(ctx, frequency, config)
|
||||
|
|
|
|||
|
|
@ -23,7 +23,9 @@ func (r listActivitiesResponse) error() error { return r.Err }
|
|||
|
||||
func listActivitiesEndpoint(ctx context.Context, request interface{}, svc fleet.Service) (interface{}, error) {
|
||||
req := request.(*listActivitiesRequest)
|
||||
activities, err := svc.ListActivities(ctx, req.ListOptions)
|
||||
activities, err := svc.ListActivities(ctx, fleet.ListActivitiesOptions{
|
||||
ListOptions: req.ListOptions,
|
||||
})
|
||||
if err != nil {
|
||||
return listActivitiesResponse{Err: err}, nil
|
||||
}
|
||||
|
|
@ -32,7 +34,7 @@ func listActivitiesEndpoint(ctx context.Context, request interface{}, svc fleet.
|
|||
}
|
||||
|
||||
// ListActivities returns a slice of activities for the whole organization
|
||||
func (svc *Service) ListActivities(ctx context.Context, opt fleet.ListOptions) ([]*fleet.Activity, error) {
|
||||
func (svc *Service) ListActivities(ctx context.Context, opt fleet.ListActivitiesOptions) ([]*fleet.Activity, error) {
|
||||
if err := svc.authz.Authorize(ctx, &fleet.Activity{}, fleet.ActionRead); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ func TestListActivities(t *testing.T) {
|
|||
globalUsers := []*fleet.User{test.UserAdmin, test.UserMaintainer, test.UserObserver}
|
||||
teamUsers := []*fleet.User{test.UserTeamAdminTeam1, test.UserTeamMaintainerTeam1, test.UserTeamObserverTeam1}
|
||||
|
||||
ds.ListActivitiesFunc = func(ctx context.Context, opts fleet.ListOptions) ([]*fleet.Activity, error) {
|
||||
ds.ListActivitiesFunc = func(ctx context.Context, opts fleet.ListActivitiesOptions) ([]*fleet.Activity, error) {
|
||||
return []*fleet.Activity{
|
||||
{ID: 1},
|
||||
{ID: 2},
|
||||
|
|
@ -28,25 +28,25 @@ func TestListActivities(t *testing.T) {
|
|||
|
||||
// any global user can read activities
|
||||
for _, u := range globalUsers {
|
||||
activities, err := svc.ListActivities(test.UserContext(ctx, u), fleet.ListOptions{})
|
||||
activities, err := svc.ListActivities(test.UserContext(ctx, u), fleet.ListActivitiesOptions{})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, activities, 2)
|
||||
}
|
||||
|
||||
// team users cannot read activities
|
||||
for _, u := range teamUsers {
|
||||
_, err := svc.ListActivities(test.UserContext(ctx, u), fleet.ListOptions{})
|
||||
_, err := svc.ListActivities(test.UserContext(ctx, u), fleet.ListActivitiesOptions{})
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), authz.ForbiddenErrorMessage)
|
||||
}
|
||||
|
||||
// user with no roles cannot read activities
|
||||
_, err := svc.ListActivities(test.UserContext(ctx, test.UserNoRoles), fleet.ListOptions{})
|
||||
_, err := svc.ListActivities(test.UserContext(ctx, test.UserNoRoles), fleet.ListActivitiesOptions{})
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), authz.ForbiddenErrorMessage)
|
||||
|
||||
// no user in context
|
||||
_, err = svc.ListActivities(ctx, fleet.ListOptions{})
|
||||
_, err = svc.ListActivities(ctx, fleet.ListActivitiesOptions{})
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), authz.ForbiddenErrorMessage)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -416,7 +416,7 @@ func (svc *Service) ModifyAppConfig(ctx context.Context, p []byte, applyOpts fle
|
|||
Global: true,
|
||||
},
|
||||
); err != nil {
|
||||
return nil, err
|
||||
return nil, ctxerr.Wrap(ctx, err, "create activity for app config modification")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -174,7 +174,7 @@ func (svc *Service) NewDistributedQueryCampaign(ctx context.Context, queryString
|
|||
authz.UserFromContext(ctx),
|
||||
activityData,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
return nil, ctxerr.Wrap(ctx, err, "create activity for campaign creation")
|
||||
}
|
||||
return campaign, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ func (svc Service) NewGlobalPolicy(ctx context.Context, p fleet.PolicyPayload) (
|
|||
Name: policy.Name,
|
||||
},
|
||||
); err != nil {
|
||||
return nil, err
|
||||
return nil, ctxerr.Wrap(ctx, err, "create activity for global policy creation")
|
||||
}
|
||||
return policy, nil
|
||||
}
|
||||
|
|
@ -218,7 +218,7 @@ func (svc Service) DeleteGlobalPolicies(ctx context.Context, ids []uint) ([]uint
|
|||
Name: policiesByID[id].Name,
|
||||
},
|
||||
); err != nil {
|
||||
return nil, ctxerr.Wrap(ctx, err, "adding new activity for deleted policy")
|
||||
return nil, ctxerr.Wrap(ctx, err, "create activity for policy deletion")
|
||||
}
|
||||
}
|
||||
return ids, nil
|
||||
|
|
@ -480,11 +480,14 @@ func (svc *Service) ApplyPolicySpecs(ctx context.Context, policies []*fleet.Poli
|
|||
}
|
||||
// Note: Issue #4191 proposes that we move to SQL transactions for actions so that we can
|
||||
// rollback an action in the event of an error writing the associated activity
|
||||
return svc.ds.NewActivity(
|
||||
if err := svc.ds.NewActivity(
|
||||
ctx,
|
||||
authz.UserFromContext(ctx),
|
||||
fleet.ActivityTypeAppliedSpecPolicy{
|
||||
Policies: policies,
|
||||
},
|
||||
)
|
||||
); err != nil {
|
||||
return ctxerr.Wrap(ctx, err, "create activity for policy spec")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2048,7 +2048,7 @@ func (s *integrationTestSuite) TestListActivities() {
|
|||
ctx := context.Background()
|
||||
u := s.users["admin1@example.com"]
|
||||
|
||||
prevActivities, err := s.ds.ListActivities(ctx, fleet.ListOptions{})
|
||||
prevActivities, err := s.ds.ListActivities(ctx, fleet.ListActivitiesOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.ds.NewActivity(ctx, &u, fleet.ActivityTypeAppliedSpecPack{})
|
||||
|
|
|
|||
|
|
@ -25,7 +25,6 @@ import (
|
|||
"github.com/fleetdm/fleet/v4/server/datastore/redis/redistest"
|
||||
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||
"github.com/fleetdm/fleet/v4/server/live_query/live_query_mock"
|
||||
"github.com/fleetdm/fleet/v4/server/logging"
|
||||
"github.com/fleetdm/fleet/v4/server/mock"
|
||||
mockresult "github.com/fleetdm/fleet/v4/server/mock/mockresult"
|
||||
"github.com/fleetdm/fleet/v4/server/ptr"
|
||||
|
|
@ -441,7 +440,7 @@ func TestSubmitStatusLogs(t *testing.T) {
|
|||
serv := ((svc.(validationMiddleware)).Service).(*Service)
|
||||
|
||||
testLogger := &testJSONLogger{}
|
||||
serv.osqueryLogWriter = &logging.OsqueryLogger{Status: testLogger}
|
||||
serv.osqueryLogWriter = &OsqueryLogger{Status: testLogger}
|
||||
|
||||
logs := []string{
|
||||
`{"severity":"0","filename":"tls.cpp","line":"216","message":"some message","version":"1.8.2","decorations":{"host_uuid":"uuid_foobar","username":"zwass"}}`,
|
||||
|
|
@ -469,7 +468,7 @@ func TestSubmitResultLogs(t *testing.T) {
|
|||
serv := ((svc.(validationMiddleware)).Service).(*Service)
|
||||
|
||||
testLogger := &testJSONLogger{}
|
||||
serv.osqueryLogWriter = &logging.OsqueryLogger{Result: testLogger}
|
||||
serv.osqueryLogWriter = &OsqueryLogger{Result: testLogger}
|
||||
|
||||
logs := []string{
|
||||
`{"name":"system_info","hostIdentifier":"some_uuid","calendarTime":"Fri Sep 30 17:55:15 2016 UTC","unixTime":"1475258115","decorations":{"host_uuid":"some_uuid","username":"zwass"},"columns":{"cpu_brand":"Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz","hostname":"hostimus","physical_memory":"17179869184"},"action":"added"}`,
|
||||
|
|
|
|||
|
|
@ -175,7 +175,7 @@ func (svc *Service) NewPack(ctx context.Context, p fleet.PackPayload) (*fleet.Pa
|
|||
Name: pack.Name,
|
||||
},
|
||||
); err != nil {
|
||||
return nil, err
|
||||
return nil, ctxerr.Wrap(ctx, err, "create activity for pack creation")
|
||||
}
|
||||
|
||||
return &pack, nil
|
||||
|
|
@ -271,7 +271,7 @@ func (svc *Service) ModifyPack(ctx context.Context, id uint, p fleet.PackPayload
|
|||
Name: pack.Name,
|
||||
},
|
||||
); err != nil {
|
||||
return nil, err
|
||||
return nil, ctxerr.Wrap(ctx, err, "create activity for pack modification")
|
||||
}
|
||||
|
||||
return pack, err
|
||||
|
|
@ -359,13 +359,16 @@ func (svc *Service) DeletePack(ctx context.Context, name string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return svc.ds.NewActivity(
|
||||
if err := svc.ds.NewActivity(
|
||||
ctx,
|
||||
authz.UserFromContext(ctx),
|
||||
fleet.ActivityTypeDeletedPack{
|
||||
Name: pack.Name,
|
||||
},
|
||||
)
|
||||
); err != nil {
|
||||
return ctxerr.Wrap(ctx, err, "create activity for pack deletion")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
@ -407,13 +410,16 @@ func (svc *Service) DeletePackByID(ctx context.Context, id uint) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return svc.ds.NewActivity(
|
||||
if err := svc.ds.NewActivity(
|
||||
ctx,
|
||||
authz.UserFromContext(ctx),
|
||||
fleet.ActivityTypeDeletedPack{
|
||||
Name: pack.Name,
|
||||
},
|
||||
)
|
||||
); err != nil {
|
||||
return ctxerr.Wrap(ctx, err, "create activity for pack deletion by id")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
@ -483,11 +489,14 @@ func (svc *Service) ApplyPackSpecs(ctx context.Context, specs []*fleet.PackSpec)
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return result, svc.ds.NewActivity(
|
||||
if err := svc.ds.NewActivity(
|
||||
ctx,
|
||||
authz.UserFromContext(ctx),
|
||||
fleet.ActivityTypeAppliedSpecPack{},
|
||||
)
|
||||
); err != nil {
|
||||
return nil, ctxerr.Wrap(ctx, err, "create activity for pack spec")
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
|||
|
|
@ -190,7 +190,7 @@ func (svc *Service) NewQuery(ctx context.Context, p fleet.QueryPayload) (*fleet.
|
|||
Name: query.Name,
|
||||
},
|
||||
); err != nil {
|
||||
return nil, err
|
||||
return nil, ctxerr.Wrap(ctx, err, "create activity for query creation")
|
||||
}
|
||||
|
||||
return query, nil
|
||||
|
|
@ -273,7 +273,7 @@ func (svc *Service) ModifyQuery(ctx context.Context, id uint, p fleet.QueryPaylo
|
|||
Name: query.Name,
|
||||
},
|
||||
); err != nil {
|
||||
return nil, err
|
||||
return nil, ctxerr.Wrap(ctx, err, "create activity for query modification")
|
||||
}
|
||||
|
||||
return query, nil
|
||||
|
|
@ -322,13 +322,16 @@ func (svc *Service) DeleteQuery(ctx context.Context, name string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return svc.ds.NewActivity(
|
||||
if err := svc.ds.NewActivity(
|
||||
ctx,
|
||||
authz.UserFromContext(ctx),
|
||||
fleet.ActivityTypeDeletedSavedQuery{
|
||||
Name: name,
|
||||
},
|
||||
)
|
||||
); err != nil {
|
||||
return ctxerr.Wrap(ctx, err, "create activity for query deletion")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
@ -374,13 +377,16 @@ func (svc *Service) DeleteQueryByID(ctx context.Context, id uint) error {
|
|||
return ctxerr.Wrap(ctx, err, "delete query")
|
||||
}
|
||||
|
||||
return svc.ds.NewActivity(
|
||||
if err := svc.ds.NewActivity(
|
||||
ctx,
|
||||
authz.UserFromContext(ctx),
|
||||
fleet.ActivityTypeDeletedSavedQuery{
|
||||
Name: query.Name,
|
||||
},
|
||||
)
|
||||
); err != nil {
|
||||
return ctxerr.Wrap(ctx, err, "create activity for query deletion by id")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
@ -430,17 +436,15 @@ func (svc *Service) DeleteQueries(ctx context.Context, ids []uint) (uint, error)
|
|||
return n, err
|
||||
}
|
||||
|
||||
err = svc.ds.NewActivity(
|
||||
if err := svc.ds.NewActivity(
|
||||
ctx,
|
||||
authz.UserFromContext(ctx),
|
||||
fleet.ActivityTypeDeletedMultipleSavedQuery{
|
||||
IDs: ids,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return n, err
|
||||
); err != nil {
|
||||
return 0, ctxerr.Wrap(ctx, err, "create activity for query deletions")
|
||||
}
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
|
|
@ -506,13 +510,16 @@ func (svc *Service) ApplyQuerySpecs(ctx context.Context, specs []*fleet.QuerySpe
|
|||
return ctxerr.Wrap(ctx, err, "applying queries")
|
||||
}
|
||||
|
||||
return svc.ds.NewActivity(
|
||||
if err := svc.ds.NewActivity(
|
||||
ctx,
|
||||
authz.UserFromContext(ctx),
|
||||
fleet.ActivityTypeAppliedSpecSavedQuery{
|
||||
Specs: specs,
|
||||
},
|
||||
)
|
||||
); err != nil {
|
||||
return ctxerr.Wrap(ctx, err, "create activity for query spec")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func queryFromSpec(spec *fleet.QuerySpec) *fleet.Query {
|
||||
|
|
|
|||
|
|
@ -13,7 +13,6 @@ import (
|
|||
"github.com/fleetdm/fleet/v4/server/authz"
|
||||
"github.com/fleetdm/fleet/v4/server/config"
|
||||
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||
"github.com/fleetdm/fleet/v4/server/logging"
|
||||
"github.com/fleetdm/fleet/v4/server/service/async"
|
||||
"github.com/fleetdm/fleet/v4/server/sso"
|
||||
kitlog "github.com/go-kit/kit/log"
|
||||
|
|
@ -36,7 +35,7 @@ type Service struct {
|
|||
config config.FleetConfig
|
||||
clock clock.Clock
|
||||
|
||||
osqueryLogWriter *logging.OsqueryLogger
|
||||
osqueryLogWriter *OsqueryLogger
|
||||
|
||||
mailService fleet.MailService
|
||||
ssoSessionStore sso.SessionStore
|
||||
|
|
@ -69,6 +68,18 @@ func (s *Service) SetEnterpriseOverrides(overrides fleet.EnterpriseOverrides) {
|
|||
s.EnterpriseOverrides = &overrides
|
||||
}
|
||||
|
||||
// OsqueryLogger holds osqueryd's status and result loggers.
|
||||
type OsqueryLogger struct {
|
||||
// Status holds the osqueryd's status logger.
|
||||
//
|
||||
// See https://osquery.readthedocs.io/en/stable/deployment/logging/#status-logs
|
||||
Status fleet.JSONLogger
|
||||
// Result holds the osqueryd's result logger.
|
||||
//
|
||||
// See https://osquery.readthedocs.io/en/stable/deployment/logging/#results-logs
|
||||
Result fleet.JSONLogger
|
||||
}
|
||||
|
||||
// NewService creates a new service from the config struct
|
||||
func NewService(
|
||||
ctx context.Context,
|
||||
|
|
@ -76,7 +87,7 @@ func NewService(
|
|||
task *async.Task,
|
||||
resultStore fleet.QueryResultStore,
|
||||
logger kitlog.Logger,
|
||||
osqueryLogger *logging.OsqueryLogger,
|
||||
osqueryLogger *OsqueryLogger,
|
||||
config config.FleetConfig,
|
||||
mailService fleet.MailService,
|
||||
c clock.Clock,
|
||||
|
|
|
|||
|
|
@ -136,117 +136,98 @@ func (svc *Service) LoggingConfig(ctx context.Context) (*fleet.Logging, error) {
|
|||
if err := svc.authz.Authorize(ctx, &fleet.AppConfig{}, fleet.ActionRead); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conf := svc.config
|
||||
logging := &fleet.Logging{
|
||||
Debug: conf.Logging.Debug,
|
||||
Json: conf.Logging.JSON,
|
||||
}
|
||||
|
||||
switch conf.Osquery.StatusLogPlugin {
|
||||
case "", "filesystem":
|
||||
logging.Status = fleet.LoggingPlugin{
|
||||
Plugin: "filesystem",
|
||||
Config: fleet.FilesystemConfig{FilesystemConfig: conf.Filesystem},
|
||||
}
|
||||
case "kinesis":
|
||||
logging.Status = fleet.LoggingPlugin{
|
||||
Plugin: "kinesis",
|
||||
Config: fleet.KinesisConfig{
|
||||
Region: conf.Kinesis.Region,
|
||||
StatusStream: conf.Kinesis.StatusStream,
|
||||
ResultStream: conf.Kinesis.ResultStream,
|
||||
},
|
||||
}
|
||||
case "firehose":
|
||||
logging.Status = fleet.LoggingPlugin{
|
||||
Plugin: "firehose",
|
||||
Config: fleet.FirehoseConfig{
|
||||
Region: conf.Firehose.Region,
|
||||
StatusStream: conf.Firehose.StatusStream,
|
||||
ResultStream: conf.Firehose.ResultStream,
|
||||
},
|
||||
}
|
||||
case "lambda":
|
||||
logging.Status = fleet.LoggingPlugin{
|
||||
Plugin: "lambda",
|
||||
Config: fleet.LambdaConfig{
|
||||
Region: conf.Lambda.Region,
|
||||
StatusFunction: conf.Lambda.StatusFunction,
|
||||
ResultFunction: conf.Lambda.ResultFunction,
|
||||
},
|
||||
}
|
||||
case "pubsub":
|
||||
logging.Status = fleet.LoggingPlugin{
|
||||
Plugin: "pubsub",
|
||||
Config: fleet.PubSubConfig{PubSubConfig: conf.PubSub},
|
||||
}
|
||||
case "stdout":
|
||||
logging.Status = fleet.LoggingPlugin{Plugin: "stdout"}
|
||||
case "kafkarest":
|
||||
logging.Status = fleet.LoggingPlugin{
|
||||
Plugin: "kafkarest",
|
||||
Config: fleet.KafkaRESTConfig{
|
||||
StatusTopic: conf.KafkaREST.StatusTopic,
|
||||
ProxyHost: conf.KafkaREST.ProxyHost,
|
||||
},
|
||||
}
|
||||
default:
|
||||
return nil, ctxerr.Errorf(ctx, "unrecognized logging plugin: %s", conf.Osquery.StatusLogPlugin)
|
||||
loggings := []struct {
|
||||
plugin string
|
||||
target *fleet.LoggingPlugin
|
||||
}{
|
||||
{
|
||||
plugin: conf.Osquery.StatusLogPlugin,
|
||||
target: &logging.Status,
|
||||
},
|
||||
{
|
||||
plugin: conf.Osquery.ResultLogPlugin,
|
||||
target: &logging.Result,
|
||||
},
|
||||
}
|
||||
|
||||
switch conf.Osquery.ResultLogPlugin {
|
||||
case "", "filesystem":
|
||||
logging.Result = fleet.LoggingPlugin{
|
||||
Plugin: "filesystem",
|
||||
Config: fleet.FilesystemConfig{FilesystemConfig: conf.Filesystem},
|
||||
}
|
||||
case "kinesis":
|
||||
logging.Result = fleet.LoggingPlugin{
|
||||
Plugin: "kinesis",
|
||||
Config: fleet.KinesisConfig{
|
||||
Region: conf.Kinesis.Region,
|
||||
StatusStream: conf.Kinesis.StatusStream,
|
||||
ResultStream: conf.Kinesis.ResultStream,
|
||||
},
|
||||
}
|
||||
case "firehose":
|
||||
logging.Result = fleet.LoggingPlugin{
|
||||
Plugin: "firehose",
|
||||
Config: fleet.FirehoseConfig{
|
||||
Region: conf.Firehose.Region,
|
||||
StatusStream: conf.Firehose.StatusStream,
|
||||
ResultStream: conf.Firehose.ResultStream,
|
||||
},
|
||||
}
|
||||
case "lambda":
|
||||
logging.Result = fleet.LoggingPlugin{
|
||||
Plugin: "lambda",
|
||||
Config: fleet.LambdaConfig{
|
||||
Region: conf.Lambda.Region,
|
||||
StatusFunction: conf.Lambda.StatusFunction,
|
||||
ResultFunction: conf.Lambda.ResultFunction,
|
||||
},
|
||||
}
|
||||
case "pubsub":
|
||||
logging.Result = fleet.LoggingPlugin{
|
||||
Plugin: "pubsub",
|
||||
Config: fleet.PubSubConfig{PubSubConfig: conf.PubSub},
|
||||
}
|
||||
case "stdout":
|
||||
logging.Result = fleet.LoggingPlugin{
|
||||
Plugin: "stdout",
|
||||
}
|
||||
case "kafkarest":
|
||||
logging.Result = fleet.LoggingPlugin{
|
||||
Plugin: "kafkarest",
|
||||
Config: fleet.KafkaRESTConfig{
|
||||
ResultTopic: conf.KafkaREST.ResultTopic,
|
||||
ProxyHost: conf.KafkaREST.ProxyHost,
|
||||
},
|
||||
}
|
||||
default:
|
||||
return nil, ctxerr.Errorf(ctx, "unrecognized logging plugin: %s", conf.Osquery.ResultLogPlugin)
|
||||
if conf.Activity.EnableAuditLog {
|
||||
loggings = append(loggings, struct {
|
||||
plugin string
|
||||
target *fleet.LoggingPlugin
|
||||
}{
|
||||
plugin: conf.Activity.AuditLogPlugin,
|
||||
target: &logging.Audit,
|
||||
})
|
||||
}
|
||||
|
||||
for _, lp := range loggings {
|
||||
switch lp.plugin {
|
||||
case "", "filesystem":
|
||||
*lp.target = fleet.LoggingPlugin{
|
||||
Plugin: "filesystem",
|
||||
Config: fleet.FilesystemConfig{
|
||||
FilesystemConfig: conf.Filesystem,
|
||||
},
|
||||
}
|
||||
case "kinesis":
|
||||
*lp.target = fleet.LoggingPlugin{
|
||||
Plugin: "kinesis",
|
||||
Config: fleet.KinesisConfig{
|
||||
Region: conf.Kinesis.Region,
|
||||
StatusStream: conf.Kinesis.StatusStream,
|
||||
ResultStream: conf.Kinesis.ResultStream,
|
||||
AuditStream: conf.Kinesis.AuditStream,
|
||||
},
|
||||
}
|
||||
case "firehose":
|
||||
*lp.target = fleet.LoggingPlugin{
|
||||
Plugin: "firehose",
|
||||
Config: fleet.FirehoseConfig{
|
||||
Region: conf.Firehose.Region,
|
||||
StatusStream: conf.Firehose.StatusStream,
|
||||
ResultStream: conf.Firehose.ResultStream,
|
||||
AuditStream: conf.Firehose.AuditStream,
|
||||
},
|
||||
}
|
||||
case "lambda":
|
||||
*lp.target = fleet.LoggingPlugin{
|
||||
Plugin: "lambda",
|
||||
Config: fleet.LambdaConfig{
|
||||
Region: conf.Lambda.Region,
|
||||
StatusFunction: conf.Lambda.StatusFunction,
|
||||
ResultFunction: conf.Lambda.ResultFunction,
|
||||
AuditFunction: conf.Lambda.AuditFunction,
|
||||
},
|
||||
}
|
||||
case "pubsub":
|
||||
*lp.target = fleet.LoggingPlugin{
|
||||
Plugin: "pubsub",
|
||||
Config: fleet.PubSubConfig{
|
||||
PubSubConfig: conf.PubSub,
|
||||
},
|
||||
}
|
||||
case "stdout":
|
||||
*lp.target = fleet.LoggingPlugin{Plugin: "stdout"}
|
||||
case "kafkarest":
|
||||
*lp.target = fleet.LoggingPlugin{
|
||||
Plugin: "kafkarest",
|
||||
Config: fleet.KafkaRESTConfig{
|
||||
StatusTopic: conf.KafkaREST.StatusTopic,
|
||||
ResultTopic: conf.KafkaREST.ResultTopic,
|
||||
AuditTopic: conf.KafkaREST.AuditTopic,
|
||||
ProxyHost: conf.KafkaREST.ProxyHost,
|
||||
},
|
||||
}
|
||||
default:
|
||||
return nil, ctxerr.Errorf(ctx, "unrecognized logging plugin: %s", lp.plugin)
|
||||
}
|
||||
}
|
||||
return logging, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,7 +31,6 @@ func TestCleanupURL(t *testing.T) {
|
|||
assert.Equal(tt, test.expected, actual)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestCreateAppConfig(t *testing.T) {
|
||||
|
|
@ -42,7 +41,7 @@ func TestCreateAppConfig(t *testing.T) {
|
|||
return &fleet.AppConfig{}, nil
|
||||
}
|
||||
|
||||
var appConfigTests = []struct {
|
||||
appConfigTests := []struct {
|
||||
configPayload fleet.AppConfig
|
||||
}{
|
||||
{
|
||||
|
|
@ -156,6 +155,7 @@ func TestService_LoggingConfig(t *testing.T) {
|
|||
fileSystemConfig := fleet.FilesystemConfig{FilesystemConfig: config.FilesystemConfig{
|
||||
StatusLogFile: logFile,
|
||||
ResultLogFile: logFile,
|
||||
AuditLogFile: logFile,
|
||||
EnableLogRotation: false,
|
||||
EnableLogCompression: false,
|
||||
}}
|
||||
|
|
@ -164,18 +164,21 @@ func TestService_LoggingConfig(t *testing.T) {
|
|||
Region: testFirehosePluginConfig().Firehose.Region,
|
||||
StatusStream: testFirehosePluginConfig().Firehose.StatusStream,
|
||||
ResultStream: testFirehosePluginConfig().Firehose.ResultStream,
|
||||
AuditStream: testFirehosePluginConfig().Firehose.AuditStream,
|
||||
}
|
||||
|
||||
kinesisConfig := fleet.KinesisConfig{
|
||||
Region: testKinesisPluginConfig().Kinesis.Region,
|
||||
StatusStream: testKinesisPluginConfig().Kinesis.StatusStream,
|
||||
ResultStream: testKinesisPluginConfig().Kinesis.ResultStream,
|
||||
AuditStream: testKinesisPluginConfig().Kinesis.AuditStream,
|
||||
}
|
||||
|
||||
lambdaConfig := fleet.LambdaConfig{
|
||||
Region: testLambdaPluginConfig().Lambda.Region,
|
||||
StatusFunction: testLambdaPluginConfig().Lambda.StatusFunction,
|
||||
ResultFunction: testLambdaPluginConfig().Lambda.ResultFunction,
|
||||
AuditFunction: testLambdaPluginConfig().Lambda.AuditFunction,
|
||||
}
|
||||
|
||||
pubsubConfig := fleet.PubSubConfig{
|
||||
|
|
@ -183,6 +186,7 @@ func TestService_LoggingConfig(t *testing.T) {
|
|||
Project: testPubSubPluginConfig().PubSub.Project,
|
||||
StatusTopic: testPubSubPluginConfig().PubSub.StatusTopic,
|
||||
ResultTopic: testPubSubPluginConfig().PubSub.ResultTopic,
|
||||
AuditTopic: testPubSubPluginConfig().PubSub.AuditTopic,
|
||||
AddAttributes: false,
|
||||
},
|
||||
}
|
||||
|
|
@ -215,6 +219,10 @@ func TestService_LoggingConfig(t *testing.T) {
|
|||
Plugin: "filesystem",
|
||||
Config: fileSystemConfig,
|
||||
},
|
||||
Audit: fleet.LoggingPlugin{
|
||||
Plugin: "filesystem",
|
||||
Config: fileSystemConfig,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
@ -232,6 +240,10 @@ func TestService_LoggingConfig(t *testing.T) {
|
|||
Plugin: "firehose",
|
||||
Config: firehoseConfig,
|
||||
},
|
||||
Audit: fleet.LoggingPlugin{
|
||||
Plugin: "firehose",
|
||||
Config: firehoseConfig,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
@ -249,6 +261,10 @@ func TestService_LoggingConfig(t *testing.T) {
|
|||
Plugin: "kinesis",
|
||||
Config: kinesisConfig,
|
||||
},
|
||||
Audit: fleet.LoggingPlugin{
|
||||
Plugin: "kinesis",
|
||||
Config: kinesisConfig,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
@ -266,6 +282,10 @@ func TestService_LoggingConfig(t *testing.T) {
|
|||
Plugin: "lambda",
|
||||
Config: lambdaConfig,
|
||||
},
|
||||
Audit: fleet.LoggingPlugin{
|
||||
Plugin: "lambda",
|
||||
Config: lambdaConfig,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
@ -283,6 +303,10 @@ func TestService_LoggingConfig(t *testing.T) {
|
|||
Plugin: "pubsub",
|
||||
Config: pubsubConfig,
|
||||
},
|
||||
Audit: fleet.LoggingPlugin{
|
||||
Plugin: "pubsub",
|
||||
Config: pubsubConfig,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
@ -300,6 +324,10 @@ func TestService_LoggingConfig(t *testing.T) {
|
|||
Plugin: "stdout",
|
||||
Config: nil,
|
||||
},
|
||||
Audit: fleet.LoggingPlugin{
|
||||
Plugin: "stdout",
|
||||
Config: nil,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
@ -310,7 +338,6 @@ func TestService_LoggingConfig(t *testing.T) {
|
|||
want: nil,
|
||||
},
|
||||
}
|
||||
t.Parallel()
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ds := new(mock.Store)
|
||||
|
|
|
|||
|
|
@ -76,6 +76,7 @@ func (svc Service) NewTeamPolicy(ctx context.Context, teamID uint, p fleet.Polic
|
|||
if err != nil {
|
||||
return nil, ctxerr.Wrap(ctx, err, "creating policy")
|
||||
}
|
||||
|
||||
// Note: Issue #4191 proposes that we move to SQL transactions for actions so that we can
|
||||
// rollback an action in the event of an error writing the associated activity
|
||||
if err := svc.ds.NewActivity(
|
||||
|
|
@ -86,7 +87,7 @@ func (svc Service) NewTeamPolicy(ctx context.Context, teamID uint, p fleet.Polic
|
|||
Name: policy.Name,
|
||||
},
|
||||
); err != nil {
|
||||
return nil, err
|
||||
return nil, ctxerr.Wrap(ctx, err, "create activity for team policy creation")
|
||||
}
|
||||
return policy, nil
|
||||
}
|
||||
|
|
@ -251,7 +252,7 @@ func (svc Service) DeleteTeamPolicies(ctx context.Context, teamID uint, ids []ui
|
|||
Name: policiesByID[id].Name,
|
||||
},
|
||||
); err != nil {
|
||||
return nil, ctxerr.Wrap(ctx, err, "adding new activity for deleted policy")
|
||||
return nil, ctxerr.Wrap(ctx, err, "create activity for policy deletion")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -336,6 +337,7 @@ func (svc *Service) modifyPolicy(ctx context.Context, teamID *uint, id uint, p f
|
|||
if err != nil {
|
||||
return nil, ctxerr.Wrap(ctx, err, "saving policy")
|
||||
}
|
||||
|
||||
// Note: Issue #4191 proposes that we move to SQL transactions for actions so that we can
|
||||
// rollback an action in the event of an error writing the associated activity
|
||||
if err := svc.ds.NewActivity(
|
||||
|
|
@ -346,7 +348,7 @@ func (svc *Service) modifyPolicy(ctx context.Context, teamID *uint, id uint, p f
|
|||
Name: policy.Name,
|
||||
},
|
||||
); err != nil {
|
||||
return nil, err
|
||||
return nil, ctxerr.Wrap(ctx, err, "create activity for policy modification")
|
||||
}
|
||||
|
||||
return policy, nil
|
||||
|
|
|
|||
|
|
@ -45,10 +45,9 @@ func newTestServiceWithConfig(t *testing.T, ds fleet.Datastore, fleetConfig conf
|
|||
fleetConfig.Filesystem.EnableLogRotation,
|
||||
fleetConfig.Filesystem.EnableLogCompression,
|
||||
)
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
osqlogger := &logging.OsqueryLogger{Status: writer, Result: writer}
|
||||
osqlogger := &OsqueryLogger{Status: writer, Result: writer}
|
||||
logger := kitlog.NewNopLogger()
|
||||
|
||||
var ssoStore sso.SessionStore
|
||||
|
|
@ -285,6 +284,7 @@ func testKinesisPluginConfig() config.FleetConfig {
|
|||
c := config.TestConfig()
|
||||
c.Osquery.ResultLogPlugin = "kinesis"
|
||||
c.Osquery.StatusLogPlugin = "kinesis"
|
||||
c.Activity.AuditLogPlugin = "kinesis"
|
||||
c.Kinesis = config.KinesisConfig{
|
||||
Region: "us-east-1",
|
||||
AccessKeyID: "foo",
|
||||
|
|
@ -292,6 +292,7 @@ func testKinesisPluginConfig() config.FleetConfig {
|
|||
StsAssumeRoleArn: "baz",
|
||||
StatusStream: "test-status-stream",
|
||||
ResultStream: "test-result-stream",
|
||||
AuditStream: "test-audit-stream",
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
|
@ -300,6 +301,7 @@ func testFirehosePluginConfig() config.FleetConfig {
|
|||
c := config.TestConfig()
|
||||
c.Osquery.ResultLogPlugin = "firehose"
|
||||
c.Osquery.StatusLogPlugin = "firehose"
|
||||
c.Activity.AuditLogPlugin = "firehose"
|
||||
c.Firehose = config.FirehoseConfig{
|
||||
Region: "us-east-1",
|
||||
AccessKeyID: "foo",
|
||||
|
|
@ -307,6 +309,7 @@ func testFirehosePluginConfig() config.FleetConfig {
|
|||
StsAssumeRoleArn: "baz",
|
||||
StatusStream: "test-status-firehose",
|
||||
ResultStream: "test-result-firehose",
|
||||
AuditStream: "test-audit-firehose",
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
|
@ -315,6 +318,7 @@ func testLambdaPluginConfig() config.FleetConfig {
|
|||
c := config.TestConfig()
|
||||
c.Osquery.ResultLogPlugin = "lambda"
|
||||
c.Osquery.StatusLogPlugin = "lambda"
|
||||
c.Activity.AuditLogPlugin = "lambda"
|
||||
c.Lambda = config.LambdaConfig{
|
||||
Region: "us-east-1",
|
||||
AccessKeyID: "foo",
|
||||
|
|
@ -322,6 +326,7 @@ func testLambdaPluginConfig() config.FleetConfig {
|
|||
StsAssumeRoleArn: "baz",
|
||||
ResultFunction: "result-func",
|
||||
StatusFunction: "status-func",
|
||||
AuditFunction: "audit-func",
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
|
@ -330,10 +335,12 @@ func testPubSubPluginConfig() config.FleetConfig {
|
|||
c := config.TestConfig()
|
||||
c.Osquery.ResultLogPlugin = "pubsub"
|
||||
c.Osquery.StatusLogPlugin = "pubsub"
|
||||
c.Activity.AuditLogPlugin = "pubsub"
|
||||
c.PubSub = config.PubSubConfig{
|
||||
Project: "test",
|
||||
StatusTopic: "status-topic",
|
||||
ResultTopic: "result-topic",
|
||||
AuditTopic: "audit-topic",
|
||||
AddAttributes: false,
|
||||
}
|
||||
return c
|
||||
|
|
@ -343,12 +350,17 @@ func testStdoutPluginConfig() config.FleetConfig {
|
|||
c := config.TestConfig()
|
||||
c.Osquery.ResultLogPlugin = "stdout"
|
||||
c.Osquery.StatusLogPlugin = "stdout"
|
||||
c.Activity.AuditLogPlugin = "stdout"
|
||||
return c
|
||||
}
|
||||
|
||||
func testUnrecognizedPluginConfig() config.FleetConfig {
|
||||
c := config.TestConfig()
|
||||
c.Osquery = config.OsqueryConfig{ResultLogPlugin: "bar", StatusLogPlugin: "bar"}
|
||||
c.Osquery = config.OsqueryConfig{
|
||||
ResultLogPlugin: "bar",
|
||||
StatusLogPlugin: "bar",
|
||||
}
|
||||
c.Activity.AuditLogPlugin = "bar"
|
||||
return c
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue