Schedule queries on hosts even if automations off (given we now store results) (#14411)

This is ready to go.

---------

Co-authored-by: Tim Lee <timlee@fleetdm.com>
This commit is contained in:
Lucas Manuel Rodriguez 2023-10-11 15:20:06 -03:00 committed by GitHub
parent 5aa9b80bf7
commit 9facf144dc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 398 additions and 136 deletions

View file

@ -20,8 +20,7 @@ services:
# Required for storage of Apple MDM installers.
"--max_allowed_packet=536870912"
]
environment:
&mysql-default-environment
environment: &mysql-default-environment
MYSQL_ROOT_PASSWORD: toor
MYSQL_DATABASE: fleet
MYSQL_USER: fleet
@ -41,7 +40,7 @@ services:
"--log_output=TABLE",
"--log-queries-not-using-indexes",
"--innodb-file-per-table=OFF",
"--table-definition-cache=2048",
"--table-definition-cache=8192",
# These 3 keys run MySQL with GTID consistency enforced to avoid issues with production deployments that use it.
"--enforce-gtid-consistency=ON",
"--log-bin=bin.log",
@ -142,3 +141,4 @@ volumes:
mysql-persistent-volume:
data-minio:

View file

@ -535,8 +535,8 @@ func (ds *Datastore) ObserverCanRunQuery(ctx context.Context, queryID uint) (boo
return observerCanRun, nil
}
func (ds *Datastore) ListScheduledQueriesForAgents(ctx context.Context, teamID *uint) ([]*fleet.Query, error) {
sql := `
func (ds *Datastore) ListScheduledQueriesForAgents(ctx context.Context, teamID *uint, queryReportsDisabled bool) ([]*fleet.Query, error) {
sqlStmt := `
SELECT
q.name,
q.query,
@ -545,22 +545,24 @@ func (ds *Datastore) ListScheduledQueriesForAgents(ctx context.Context, teamID *
q.platform,
q.min_osquery_version,
q.automations_enabled,
q.logging_type
q.logging_type,
q.discard_data
FROM queries q
WHERE q.saved = true
AND (q.schedule_interval > 0 AND q.automations_enabled = 1)
AND (q.schedule_interval > 0 AND %s AND (q.automations_enabled OR (NOT q.discard_data AND NOT ?)))
`
args := []interface{}{}
teamSQL := " team_id IS NULL"
if teamID != nil {
args = append(args, *teamID)
sql += " AND team_id = ?"
} else {
sql += " AND team_id IS NULL"
teamSQL = " team_id = ?"
}
sqlStmt = fmt.Sprintf(sqlStmt, teamSQL)
args = append(args, queryReportsDisabled)
results := []*fleet.Query{}
if err := sqlx.SelectContext(ctx, ds.reader(ctx), &results, sql, args...); err != nil {
if err := sqlx.SelectContext(ctx, ds.reader(ctx), &results, sqlStmt, args...); err != nil {
return nil, ctxerr.Wrap(ctx, err, "list scheduled queries for agents")
}

View file

@ -708,32 +708,44 @@ func testListScheduledQueriesForAgents(t *testing.T, ds *Datastore) {
if teamID != nil {
teamIDStr = fmt.Sprintf("%d", *teamID)
}
_, err := ds.NewQuery(context.Background(), &fleet.Query{
Name: fmt.Sprintf("%s query1", teamIDStr),
Query: "select 1;",
Saved: true,
Interval: 0,
TeamID: teamID,
})
require.NoError(t, err)
// Non saved queries should not be returned here.
_, err = ds.NewQuery(context.Background(), &fleet.Query{
Name: fmt.Sprintf("%s query2", teamIDStr),
Name: fmt.Sprintf("%s query1", teamIDStr),
Query: "select 1;",
Saved: false,
Interval: 10,
AutomationsEnabled: false,
TeamID: teamID,
DiscardData: true,
})
require.NoError(t, err)
q3, err := ds.NewQuery(context.Background(), &fleet.Query{
// Interval=0, AutomationsEnabled=0, DiscardData=0
_, err := ds.NewQuery(context.Background(), &fleet.Query{
Name: fmt.Sprintf("%s query2", teamIDStr),
Query: "select 1;",
Saved: true,
Interval: 0,
TeamID: teamID,
AutomationsEnabled: false,
DiscardData: false,
})
require.NoError(t, err)
// Interval=0, AutomationsEnabled=0, DiscardData=1
_, err = ds.NewQuery(context.Background(), &fleet.Query{
Name: fmt.Sprintf("%s query3", teamIDStr),
Query: "select 1;",
Saved: true,
Interval: 20,
AutomationsEnabled: true,
Interval: 0,
AutomationsEnabled: false,
TeamID: teamID,
DiscardData: true,
})
require.NoError(t, err)
// Interval=0, AutomationsEnabled=1, DiscardData=0
_, err = ds.NewQuery(context.Background(), &fleet.Query{
Name: fmt.Sprintf("%s query4", teamIDStr),
Query: "select 1;",
@ -741,11 +753,84 @@ func testListScheduledQueriesForAgents(t *testing.T, ds *Datastore) {
Interval: 0,
AutomationsEnabled: true,
TeamID: teamID,
DiscardData: false,
})
require.NoError(t, err)
result, err := ds.ListScheduledQueriesForAgents(ctx, teamID)
// Interval=0, AutomationsEnabled=1, DiscardData=1
_, err = ds.NewQuery(context.Background(), &fleet.Query{
Name: fmt.Sprintf("%s query5", teamIDStr),
Query: "select 1;",
Saved: true,
Interval: 0,
AutomationsEnabled: true,
TeamID: teamID,
DiscardData: true,
})
require.NoError(t, err)
test.QueryElementsMatch(t, result, []*fleet.Query{q3}, i)
// Interval=1, AutomationsEnabled=0, DiscardData=0
q6, err := ds.NewQuery(context.Background(), &fleet.Query{
Name: fmt.Sprintf("%s query6", teamIDStr),
Query: "select 1;",
Saved: true,
Interval: 10,
AutomationsEnabled: false,
TeamID: teamID,
DiscardData: false,
})
require.NoError(t, err)
// Interval=1, AutomationsEnabled=0, DiscardData=1
_, err = ds.NewQuery(context.Background(), &fleet.Query{
Name: fmt.Sprintf("%s query7", teamIDStr),
Query: "select 1;",
Saved: true,
Interval: 10,
AutomationsEnabled: false,
TeamID: teamID,
DiscardData: true,
})
require.NoError(t, err)
// Interval=1, AutomationsEnabled=1, DiscardData=0
q8, err := ds.NewQuery(context.Background(), &fleet.Query{
Name: fmt.Sprintf("%s query8", teamIDStr),
Query: "select 1;",
Saved: true,
Interval: 10,
AutomationsEnabled: true,
TeamID: teamID,
DiscardData: false,
})
require.NoError(t, err)
// Interval=1, AutomationsEnabled=1, DiscardData=1
q9, err := ds.NewQuery(context.Background(), &fleet.Query{
Name: fmt.Sprintf("%s query9", teamIDStr),
Query: "select 1;",
Saved: true,
Interval: 10,
AutomationsEnabled: true,
TeamID: teamID,
DiscardData: true,
})
require.NoError(t, err)
queryReportsDisabled := false
result, err := ds.ListScheduledQueriesForAgents(ctx, teamID, queryReportsDisabled)
require.NoError(t, err)
sort.Slice(result, func(i, j int) bool {
return result[i].ID < result[j].ID
})
test.QueryElementsMatch(t, result, []*fleet.Query{q6, q8, q9}, i)
queryReportsDisabled = true
result, err = ds.ListScheduledQueriesForAgents(ctx, teamID, queryReportsDisabled)
require.NoError(t, err)
sort.Slice(result, func(i, j int) bool {
return result[i].ID < result[j].ID
})
test.QueryElementsMatch(t, result, []*fleet.Query{q8, q9}, i)
}
}

View file

@ -32,7 +32,7 @@ func (ds *Datastore) OverwriteQueryResultRows(ctx context.Context, rows []*fleet
return ctxerr.Wrap(ctx, err, "counting existing query results")
}
if countExisting == fleet.MaxQueryReportRows {
if countExisting >= fleet.MaxQueryReportRows {
// do not delete any rows if we are already at the limit
return nil
}

View file

@ -84,7 +84,7 @@ type Datastore interface {
ListQueries(ctx context.Context, opt ListQueryOptions) ([]*Query, error)
// ListScheduledQueriesForAgents returns a list of scheduled queries (without stats) for the
// given teamID. If teamID is nil, then all scheduled queries for the 'global' team are returned.
ListScheduledQueriesForAgents(ctx context.Context, teamID *uint) ([]*Query, error)
ListScheduledQueriesForAgents(ctx context.Context, teamID *uint, queryReportsDisabled bool) ([]*Query, error)
// QueryByName looks up a query by name on a team. If teamID is nil, then the query is looked up in
// the 'global' team.
QueryByName(ctx context.Context, teamID *uint, name string, opts ...OptionalArg) (*Query, error)

View file

@ -453,7 +453,6 @@ type Service interface {
ScheduleQuery(ctx context.Context, sq *ScheduledQuery) (query *ScheduledQuery, err error)
DeleteScheduledQuery(ctx context.Context, id uint) (err error)
ModifyScheduledQuery(ctx context.Context, id uint, p ScheduledQueryPayload) (query *ScheduledQuery, err error)
SaveResultLogsToQueryReports(ctx context.Context, results []json.RawMessage)
// /////////////////////////////////////////////////////////////////////////////
// StatusService

View file

@ -70,7 +70,7 @@ type QueryFunc func(ctx context.Context, id uint) (*fleet.Query, error)
type ListQueriesFunc func(ctx context.Context, opt fleet.ListQueryOptions) ([]*fleet.Query, error)
type ListScheduledQueriesForAgentsFunc func(ctx context.Context, teamID *uint) ([]*fleet.Query, error)
type ListScheduledQueriesForAgentsFunc func(ctx context.Context, teamID *uint, queryReportsDisabled bool) ([]*fleet.Query, error)
type QueryByNameFunc func(ctx context.Context, teamID *uint, name string, opts ...fleet.OptionalArg) (*fleet.Query, error)
@ -1909,11 +1909,11 @@ func (s *DataStore) ListQueries(ctx context.Context, opt fleet.ListQueryOptions)
return s.ListQueriesFunc(ctx, opt)
}
func (s *DataStore) ListScheduledQueriesForAgents(ctx context.Context, teamID *uint) ([]*fleet.Query, error) {
func (s *DataStore) ListScheduledQueriesForAgents(ctx context.Context, teamID *uint, queryReportsDisabled bool) ([]*fleet.Query, error) {
s.mu.Lock()
s.ListScheduledQueriesForAgentsFuncInvoked = true
s.mu.Unlock()
return s.ListScheduledQueriesForAgentsFunc(ctx, teamID)
return s.ListScheduledQueriesForAgentsFunc(ctx, teamID, queryReportsDisabled)
}
func (s *DataStore) QueryByName(ctx context.Context, teamID *uint, name string, opts ...fleet.OptionalArg) (*fleet.Query, error) {

View file

@ -79,7 +79,6 @@ func (s *integrationTestSuite) TestSlowOsqueryHost() {
SkipCreateTestUsers: true,
//nolint:gosec // G112: server is just run for testing this explicit config.
HTTPServerConfig: &http.Server{ReadTimeout: 2 * time.Second},
EnableCachedDS: true,
},
)
defer func() {

View file

@ -54,10 +54,9 @@ func (s *integrationEnterpriseTestSuite) SetupSuite() {
License: &fleet.LicenseInfo{
Tier: fleet.TierPremium,
},
Pool: s.redisPool,
Lq: s.lq,
Logger: log.NewLogfmtLogger(os.Stdout),
EnableCachedDS: true,
Pool: s.redisPool,
Lq: s.lq,
Logger: log.NewLogfmtLogger(os.Stdout),
}
users, server := RunServerForTestsWithDS(s.T(), s.ds, &config)
s.server = server

View file

@ -355,7 +355,12 @@ func getClientConfigEndpoint(ctx context.Context, request interface{}, svc fleet
}
func (svc *Service) getScheduledQueries(ctx context.Context, teamID *uint) (fleet.Queries, error) {
queries, err := svc.ds.ListScheduledQueriesForAgents(ctx, teamID)
appConfig, err := svc.ds.AppConfig(ctx)
if err != nil {
return nil, ctxerr.Wrap(ctx, err, "load app config")
}
queries, err := svc.ds.ListScheduledQueriesForAgents(ctx, teamID, appConfig.ServerSettings.QueryReportsDisabled)
if err != nil {
return nil, err
}
@ -536,7 +541,7 @@ func (svc *Service) AgentOptionsForHost(ctx context.Context, hostTeamID *uint, h
// Otherwise return the appropriate override for global options.
appConfig, err := svc.ds.AppConfig(ctx)
if err != nil {
return nil, ctxerr.Wrap(ctx, err, "load global agent options")
return nil, ctxerr.Wrap(ctx, err, "load app config")
}
var options fleet.AgentOptions
if appConfig.AgentOptions != nil {
@ -1381,9 +1386,8 @@ func submitLogsEndpoint(ctx context.Context, request interface{}, svc fleet.Serv
break
}
// Not returning errors as it will trigger osqueryd to retry the request
svc.SaveResultLogsToQueryReports(ctx, results)
// We currently return errors to osqueryd if there are any issues submitting results
// to the configured external destinations.
if err = svc.SubmitResultLogs(ctx, results); err != nil {
break
}
@ -1395,6 +1399,44 @@ func submitLogsEndpoint(ctx context.Context, request interface{}, svc fleet.Serv
return submitLogsResponse{Err: err}, nil
}
func (svc *Service) preProcessOsqueryResults(ctx context.Context, osqueryResults []json.RawMessage) (unmarshaledResults []*fleet.ScheduledQueryResult, queriesDBData map[string]*fleet.Query) {
// skipauth: Authorization is currently for user endpoints only.
svc.authz.SkipAuthorization(ctx)
for _, raw := range osqueryResults {
var result *fleet.ScheduledQueryResult
if err := json.Unmarshal(raw, &result); err != nil {
level.Error(svc.logger).Log("msg", "unmarshalling result", "err", err)
// Note we store a nil item if the result could not be unmarshaled.
}
unmarshaledResults = append(unmarshaledResults, result)
}
queriesDBData = make(map[string]*fleet.Query)
for _, queryResult := range unmarshaledResults {
if queryResult == nil {
// These are results that could not be unmarshaled.
continue
}
teamID, queryName, err := getQueryNameAndTeamIDFromResult(queryResult.QueryName)
if err != nil {
level.Error(svc.logger).Log("msg", "querying name and team ID from result", "err", err)
continue
}
if _, ok := queriesDBData[queryResult.QueryName]; ok {
// Already loaded.
continue
}
query, err := svc.ds.QueryByName(ctx, teamID, queryName)
if err != nil {
level.Debug(svc.logger).Log("msg", "loading query by name", "err", err, "team", teamID, "name", queryName)
continue
}
queriesDBData[queryResult.QueryName] = query
}
return unmarshaledResults, queriesDBData
}
func (svc *Service) SubmitStatusLogs(ctx context.Context, logs []json.RawMessage) error {
// skipauth: Authorization is currently for user endpoints only.
svc.authz.SkipAuthorization(ctx)
@ -1409,7 +1451,32 @@ func (svc *Service) SubmitResultLogs(ctx context.Context, logs []json.RawMessage
// skipauth: Authorization is currently for user endpoints only.
svc.authz.SkipAuthorization(ctx)
if err := svc.osqueryLogWriter.Result.Write(ctx, logs); err != nil {
//
// We do not return errors to osqueryd when processing results because
// otherwise the results will never clear from its local DB and
// will keep retrying forever.
//
unmarshaledResults, queriesDBData := svc.preProcessOsqueryResults(ctx, logs)
svc.saveResultLogsToQueryReports(ctx, unmarshaledResults, queriesDBData)
var filteredLogs []json.RawMessage
for i, unmarshaledResult := range unmarshaledResults {
if unmarshaledResult == nil {
// Ignore results that could not be unmarshaled.
continue
}
dbQuery, ok := queriesDBData[unmarshaledResult.QueryName]
if !ok {
// Ignore results for unknown queries.
continue
}
if !dbQuery.AutomationsEnabled {
// Ignore results for queries that have automations disabled.
continue
}
filteredLogs = append(filteredLogs, logs[i])
}
if err := svc.osqueryLogWriter.Result.Write(ctx, filteredLogs); err != nil {
return newOsqueryError("error writing result logs: " + err.Error())
}
return nil
@ -1419,76 +1486,57 @@ func (svc *Service) SubmitResultLogs(ctx context.Context, logs []json.RawMessage
// Query Reports
////////////////////////////////////////////////////////////////////////////////
func (svc *Service) SaveResultLogsToQueryReports(ctx context.Context, results []json.RawMessage) {
func (svc *Service) saveResultLogsToQueryReports(ctx context.Context, unmarshaledResults []*fleet.ScheduledQueryResult, queriesDBData map[string]*fleet.Query) {
// skipauth: Authorization is currently for user endpoints only.
svc.authz.SkipAuthorization(ctx)
host, ok := hostctx.FromContext(ctx)
if !ok {
level.Error(svc.logger).Log("err", "getting host from context")
return
}
// Do not insert results if query reports are disabled globally
appConfig, err := svc.ds.AppConfig(ctx)
if err != nil {
level.Error(svc.logger).Log("err", "getting app config", "err", err)
level.Error(svc.logger).Log("msg", "getting app config", "err", err)
return
}
if appConfig.ServerSettings.QueryReportsDisabled {
return
}
var queryResults []fleet.ScheduledQueryResult
for _, raw := range results {
var result fleet.ScheduledQueryResult
if err := json.Unmarshal(raw, &result); err != nil {
level.Error(svc.logger).Log("err", "unmarshalling result", "err", err)
continue
}
queryResults = append(queryResults, result)
}
// Filter results to only the most recent for each query
filtered := getMostRecentResults(queryResults)
// Filter results to only the most recent for each query.
filtered := getMostRecentResults(unmarshaledResults)
for _, result := range filtered {
if err := svc.processResults(ctx, result); err != nil {
level.Error(svc.logger).Log("err", "processing result", "err", err)
// Discard result if there is no snapshot
if len(result.Snapshot) == 0 {
continue
}
dbQuery, ok := queriesDBData[result.QueryName]
if !ok {
// Means the query does not exist with such name anymore. Thus we ignore its result.
continue
}
if dbQuery.DiscardData || dbQuery.Logging != fleet.LoggingSnapshot {
// Ignore result if query is marked as discard data or if logging is not snapshot
continue
}
if err := svc.overwriteResultRows(ctx, result, dbQuery.ID, host.ID); err != nil {
level.Error(svc.logger).Log("msg", "overwrite results", "err", err, "query_id", dbQuery.ID, "host_id", host.ID)
continue
}
}
return
}
func (svc *Service) processResults(ctx context.Context, result fleet.ScheduledQueryResult) error {
// Discard result if there is no snapshot
if len(result.Snapshot) == 0 {
return nil
}
teamID, queryName, err := getQueryNameAndTeamIDFromResult(result.QueryName)
if err != nil {
return ctxerr.Wrap(ctx, err, "querying name and team ID from result")
}
query, err := svc.ds.QueryByName(ctx, teamID, queryName)
if err != nil {
return nil // not logging here due to a known issue when renaming queries
}
// Discard Result if query is marked as discard data or if logging is not snapshot
if query.DiscardData || query.Logging != fleet.LoggingSnapshot {
return nil
}
host, ok := hostctx.FromContext(ctx)
if !ok {
return ctxerr.Wrap(ctx, err, "getting host from context")
}
return svc.overwriteResultRows(ctx, result, query.ID, host.ID)
}
// The "snapshot" array in a ScheduledQueryResult can contain multiple rows. Each
// row is saved as a separate ScheduledQueryResultRow. ie. a result could contain
// many USB Devices or a result could contain all User Accounts on a host.
func (svc *Service) overwriteResultRows(ctx context.Context, result fleet.ScheduledQueryResult, queryID, hostID uint) error {
func (svc *Service) overwriteResultRows(ctx context.Context, result *fleet.ScheduledQueryResult, queryID, hostID uint) error {
fetchTime := time.Now()
rows := make([]*fleet.ScheduledQueryResultRow, 0, len(result.Snapshot))
@ -1517,11 +1565,15 @@ func (svc *Service) overwriteResultRows(ctx context.Context, result fleet.Schedu
// Osquery can send multiple results for the same query (ie. if an agent loses
// network connectivity it will cache multiple results). Query Reports only
// save the most recent result for a given query.
func getMostRecentResults(results []fleet.ScheduledQueryResult) []fleet.ScheduledQueryResult {
func getMostRecentResults(results []*fleet.ScheduledQueryResult) []*fleet.ScheduledQueryResult {
// Use a map to track the most recent entry for each unique QueryName
latestResults := make(map[string]fleet.ScheduledQueryResult)
latestResults := make(map[string]*fleet.ScheduledQueryResult)
for _, result := range results {
if result == nil {
// This is a result that failed to unmarshal.
continue
}
if existing, ok := latestResults[result.QueryName]; ok {
// Compare the UnixTime time and update the map if the current result is more recent
if result.UnixTime > existing.UnixTime {
@ -1533,7 +1585,7 @@ func getMostRecentResults(results []fleet.ScheduledQueryResult) []fleet.Schedule
}
// Convert the map back to a slice
var filteredResults []fleet.ScheduledQueryResult
var filteredResults []*fleet.ScheduledQueryResult
for _, v := range latestResults {
filteredResults = append(filteredResults, v)
}

View file

@ -65,7 +65,7 @@ func TestGetClientConfig(t *testing.T) {
return []*fleet.ScheduledQuery{}, nil
}
}
ds.ListScheduledQueriesForAgentsFunc = func(ctx context.Context, teamID *uint) ([]*fleet.Query, error) {
ds.ListScheduledQueriesForAgentsFunc = func(ctx context.Context, teamID *uint, queryReportsDisabled bool) ([]*fleet.Query, error) {
if teamID == nil {
return nil, nil
}
@ -535,29 +535,133 @@ func TestSubmitResultLogs(t *testing.T) {
ds := new(mock.Store)
svc, ctx := newTestService(t, ds, nil, nil)
ds.AppConfigFunc = func(ctx context.Context) (*fleet.AppConfig, error) {
return &fleet.AppConfig{}, nil
}
ds.QueryByNameFunc = func(ctx context.Context, teamID *uint, name string, opts ...fleet.OptionalArg) (*fleet.Query, error) {
switch {
case teamID == nil && (name == "time" || name == "system_info" || name == "encrypted" || name == "hosts"):
return &fleet.Query{
Name: name,
AutomationsEnabled: true,
}, nil
case teamID != nil && *teamID == 1 && name == "hosts":
return &fleet.Query{
Name: name,
AutomationsEnabled: true,
TeamID: teamID,
}, nil
case teamID == nil && name == "query_not_automated":
return &fleet.Query{
Name: name,
AutomationsEnabled: false,
}, nil
case teamID == nil && name == "query_should_be_saved_and_submitted":
return &fleet.Query{
ID: 123,
Name: name,
AutomationsEnabled: true,
Logging: fleet.LoggingSnapshot,
}, nil
case teamID == nil && name == "query_should_be_saved_but_not_submitted":
return &fleet.Query{
ID: 444,
Name: name,
AutomationsEnabled: false,
Logging: fleet.LoggingSnapshot,
}, nil
case teamID == nil && name == "query_no_rows":
return &fleet.Query{
ID: 555,
Name: name,
AutomationsEnabled: true,
Logging: fleet.LoggingSnapshot,
}, nil
default:
return nil, newNotFoundError()
}
}
ds.OverwriteQueryResultRowsFunc = func(ctx context.Context, rows []*fleet.ScheduledQueryResultRow) error {
if len(rows) == 0 {
return nil
}
switch {
case rows[0].QueryID == 123:
require.Len(t, rows, 1)
require.Equal(t, uint(999), rows[0].HostID)
require.NotZero(t, rows[0].LastFetched)
require.JSONEq(t, `{"hour":"20","minutes":"8"}`, string(rows[0].Data))
case rows[0].QueryID == 444:
require.Len(t, rows, 2)
require.Equal(t, uint(999), rows[0].HostID)
require.NotZero(t, rows[0].LastFetched)
require.JSONEq(t, `{"hour":"20","minutes":"8"}`, string(rows[0].Data))
require.Equal(t, uint(999), rows[1].HostID)
require.Equal(t, uint(444), rows[1].QueryID)
require.NotZero(t, rows[1].LastFetched)
require.JSONEq(t, `{"hour":"21","minutes":"9"}`, string(rows[1].Data))
}
return nil
}
// Hack to get at the service internals and modify the writer
serv := ((svc.(validationMiddleware)).Service).(*Service)
testLogger := &testJSONLogger{}
serv.osqueryLogWriter = &OsqueryLogger{Result: testLogger}
logs := []string{
`{"name":"system_info","hostIdentifier":"some_uuid","calendarTime":"Fri Sep 30 17:55:15 2016 UTC","unixTime":"1475258115","decorations":{"host_uuid":"some_uuid","username":"zwass"},"columns":{"cpu_brand":"Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz","hostname":"hostimus","physical_memory":"17179869184"},"action":"added"}`,
`{"name":"encrypted","hostIdentifier":"some_uuid","calendarTime":"Fri Sep 30 21:19:15 2016 UTC","unixTime":"1475270355","decorations":{"host_uuid":"4740D59F-699E-5B29-960B-979AAF9BBEEB","username":"zwass"},"columns":{"encrypted":"1","name":"\/dev\/disk1","type":"AES-XTS","uid":"","user_uuid":"","uuid":"some_uuid"},"action":"added"}`,
`{"snapshot":[{"hour":"20","minutes":"8"}],"action":"snapshot","name":"time","hostIdentifier":"1379f59d98f4","calendarTime":"Tue Jan 10 20:08:51 2017 UTC","unixTime":"1484078931","decorations":{"host_uuid":"EB714C9D-C1F8-A436-B6DA-3F853C5502EA"}}`,
`{"diffResults":{"removed":[{"address":"127.0.0.1","hostnames":"kl.groob.io"}],"added":""},"name":"pack\/test\/hosts","hostIdentifier":"FA01680E-98CA-5557-8F59-7716ECFEE964","calendarTime":"Sun Nov 19 00:02:08 2017 UTC","unixTime":"1511049728","epoch":"0","counter":"10","decorations":{"host_uuid":"FA01680E-98CA-5557-8F59-7716ECFEE964","hostname":"kl.groob.io"}}`,
// fleet will accept anything in the "data" field of a log request.
`{"unknown":{"foo": [] }}`,
validLogResults := []string{
`{"name":"pack/Global/system_info","hostIdentifier":"some_uuid","calendarTime":"Fri Sep 30 17:55:15 2016 UTC","unixTime":"1475258115","decorations":{"host_uuid":"some_uuid","username":"zwass"},"columns":{"cpu_brand":"Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz","hostname":"hostimus","physical_memory":"17179869184"},"action":"added"}`,
`{"name":"pack/SomePack/encrypted","hostIdentifier":"some_uuid","calendarTime":"Fri Sep 30 21:19:15 2016 UTC","unixTime":"1475270355","decorations":{"host_uuid":"4740D59F-699E-5B29-960B-979AAF9BBEEB","username":"zwass"},"columns":{"encrypted":"1","name":"\/dev\/disk1","type":"AES-XTS","uid":"","user_uuid":"","uuid":"some_uuid"},"action":"added"}`,
`{"name":"pack/SomePack/encrypted","hostIdentifier":"some_uuid","calendarTime":"Fri Sep 30 21:19:14 2016 UTC","unixTime":"1475270354","decorations":{"host_uuid":"4740D59F-699E-5B29-960B-979AAF9BBEEB","username":"zwass"},"columns":{"encrypted":"1","name":"\/dev\/disk1","type":"AES-XTS","uid":"","user_uuid":"","uuid":"some_uuid"},"action":"added"}`,
// These results belong to the same query but have 1 second difference.
`{"snapshot":[{"hour":"20","minutes":"8"}],"action":"snapshot","name":"pack/Global/time","hostIdentifier":"1379f59d98f4","calendarTime":"Tue Jan 10 20:08:51 2017 UTC","unixTime":1484078931,"decorations":{"host_uuid":"EB714C9D-C1F8-A436-B6DA-3F853C5502EA"}}`,
`{"snapshot":[{"hour":"20","minutes":"8"}],"action":"snapshot","name":"pack/Global/time","hostIdentifier":"1379f59d98f4","calendarTime":"Tue Jan 10 20:08:50 2017 UTC","unixTime":1484078930,"decorations":{"host_uuid":"EB714C9D-C1F8-A436-B6DA-3F853C5502EA"}}`,
`{"snapshot":[{"hour":"20","minutes":"8"}],"action":"snapshot","name":"pack/Global/time","hostIdentifier":"1379f59d98f4","calendarTime":"Tue Jan 10 20:08:52 2017 UTC","unixTime":1484078932,"decorations":{"host_uuid":"EB714C9D-C1F8-A436-B6DA-3F853C5502EA"}}`,
`{"diffResults":{"removed":[{"address":"127.0.0.1","hostnames":"kl.groob.io"}],"added":""},"name":"pack\/team-1/hosts","hostIdentifier":"FA01680E-98CA-5557-8F59-7716ECFEE964","calendarTime":"Sun Nov 19 00:02:08 2017 UTC","unixTime":"1511049728","epoch":"0","counter":"10","decorations":{"host_uuid":"FA01680E-98CA-5557-8F59-7716ECFEE964","hostname":"kl.groob.io"}}`,
`{"snapshot":[{"hour":"20","minutes":"8"}],"action":"snapshot","name":"pack/Global/query_should_be_saved_and_submitted","hostIdentifier":"1379f59d98f4","calendarTime":"Tue Jan 10 20:08:51 2017 UTC","unixTime":1484078931,"decorations":{"host_uuid":"EB714C9D-C1F8-A436-B6DA-3F853C5502EA"}}`,
//`{"snapshot":[],"action":"snapshot","name":"pack/Global/query_no_rows","hostIdentifier":"1379f59d98f4","calendarTime":"Tue Jan 10 20:08:51 2017 UTC","unixTime":1484078931,"decorations":{"host_uuid":"EB714C9D-C1F8-A436-B6DA-3F853C5502EA"}}`,
}
logJSON := fmt.Sprintf("[%s]", strings.Join(logs, ","))
logJSON := fmt.Sprintf("[%s]", strings.Join(validLogResults, ","))
resultWithInvalidJSON := []byte("foobar:\n\t123")
// The "name" field will be empty, so this result will be ignored.
resultWithoutName := []byte(`{"unknown":{"foo": [] }}`)
// The "name" field has invalid format, so this result will be ignored.
resultWithInvalidNameFmt1 := []byte(`{"snapshot":[{"hour":"20","minutes":"8"}],"action":"snapshot","name":"pack/team-foo/bar","hostIdentifier":"1379f59d98f4","calendarTime":"Tue Jan 10 20:08:51 2017 UTC","unixTime":"1484078931","decorations":{"host_uuid":"EB714C9D-C1F8-A436-B6DA-3F853C5502EA"}}`)
resultWithInvalidNameFmt2 := []byte(`{"snapshot":[{"hour":"20","minutes":"8"}],"action":"snapshot","name":"pack/team-","hostIdentifier":"1379f59d98f4","calendarTime":"Tue Jan 10 20:08:51 2017 UTC","unixTime":"1484078931","decorations":{"host_uuid":"EB714C9D-C1F8-A436-B6DA-3F853C5502EA"}}`)
resultWithInvalidNameFmt3 := []byte(`{"snapshot":[{"hour":"20","minutes":"8"}],"action":"snapshot","name":"pack/PackName","hostIdentifier":"1379f59d98f4","calendarTime":"Tue Jan 10 20:08:51 2017 UTC","unixTime":"1484078931","decorations":{"host_uuid":"EB714C9D-C1F8-A436-B6DA-3F853C5502EA"}}`)
// The query doesn't exist, so this result will be ignored.
resultWithQueryDoesNotExist := []byte(`{"snapshot":[{"hour":"20","minutes":"8"}],"action":"snapshot","name":"pack/Global/doesntexist","hostIdentifier":"1379f59d98f4","calendarTime":"Tue Jan 10 20:08:51 2017 UTC","unixTime":"1484078931","decorations":{"host_uuid":"EB714C9D-C1F8-A436-B6DA-3F853C5502EA"}}`)
// The query was configured with automations disabled, so this result will be ignored.
resultWithQueryNotAutomated := []byte(`{"snapshot":[{"hour":"20","minutes":"8"}],"action":"snapshot","name":"pack/Global/query_not_automated","hostIdentifier":"1379f59d98f4","calendarTime":"Tue Jan 10 20:08:51 2017 UTC","unixTime":"1484078931","decorations":{"host_uuid":"EB714C9D-C1F8-A436-B6DA-3F853C5502EA"}}`)
// The query is supposed to be saved but with automations disabled (and has two columns).
resultWithQuerySavedNotAutomated := []byte(`{"snapshot":[{"hour":"20","minutes":"8"},{"hour":"21","minutes":"9"}],"action":"snapshot","name":"pack/Global/query_should_be_saved_but_not_submitted","hostIdentifier":"1379f59d98f4","calendarTime":"Tue Jan 10 20:08:51 2017 UTC","unixTime":1484078931,"decorations":{"host_uuid":"EB714C9D-C1F8-A436-B6DA-3F853C5502EA"}}`)
var results []json.RawMessage
err := json.Unmarshal([]byte(logJSON), &results)
require.NoError(t, err)
host := fleet.Host{}
host := fleet.Host{
ID: 999,
}
ctx = hostctx.NewContext(ctx, &host)
err = serv.SubmitResultLogs(ctx, results)
// Submit valid and invalid log results mixed.
err = serv.SubmitResultLogs(ctx, append(append(results[:3],
resultWithInvalidJSON,
resultWithoutName,
resultWithInvalidNameFmt1,
resultWithInvalidNameFmt2,
resultWithInvalidNameFmt3,
resultWithQueryDoesNotExist,
resultWithQueryNotAutomated,
resultWithQuerySavedNotAutomated,
), results[3:]...))
require.NoError(t, err)
assert.Equal(t, results, testLogger.logs)
@ -567,49 +671,76 @@ func TestSaveResultLogsToQueryReports(t *testing.T) {
ds := new(mock.Store)
svc, ctx := newTestService(t, ds, nil, nil)
logRawMessages := []json.RawMessage{
json.RawMessage(`{"snapshot":[{"hour":"20","minutes":"8"}],"action":"snapshot","name":"pack/Global/Uptime","hostIdentifier":"1379f59d98f4","calendarTime":"Tue Jan 10 20:08:51 2017 UTC","unixTime":1484078931,"decorations":{"host_uuid":"EB714C9D-C1F8-A436-B6DA-3F853C5502EA"}}`),
}
// Hack to get at the private methods
serv := ((svc.(validationMiddleware)).Service).(*Service)
host := fleet.Host{}
ctx = hostctx.NewContext(ctx, &host)
results := []*fleet.ScheduledQueryResult{
{
QueryName: "pack/Global/Uptime",
OsqueryHostID: "1379f59d98f4",
Snapshot: []json.RawMessage{
json.RawMessage(`{"hour":"20","minutes":"8"}`),
},
UnixTime: 1484078931,
},
}
queriesDBData := map[string]*fleet.Query{
"pack/Global/Uptime": {
ID: 1,
DiscardData: false,
Logging: fleet.LoggingSnapshot,
},
}
// Results not saved if query reports disabled globally
ds.AppConfigFunc = func(ctx context.Context) (*fleet.AppConfig, error) {
return &fleet.AppConfig{ServerSettings: fleet.ServerSettings{QueryReportsDisabled: true}}, nil
}
svc.SaveResultLogsToQueryReports(ctx, logRawMessages)
serv.saveResultLogsToQueryReports(ctx, results, queriesDBData)
assert.False(t, ds.OverwriteQueryResultRowsFuncInvoked)
// Result not saved if result is not a snapshot
logRawMessages = []json.RawMessage{
json.RawMessage(`{"name":"pack/Global/Uptime","hostIdentifier":"2e23c347-da72-4e72-b6a8-a6b8a9a46ab7","calendarTime":"Fri Oct 6 14:19:15 2023 UTC","unixTime":1696601955,"epoch":0,"counter":10,"numerics":false,"decorations":{"host_uuid":"550eb898-c522-410b-8855-d74d94fdfcd2","hostname":"0025ad6e71fb"},"columns":{"days":"0","hours":"4","minutes":"52","seconds":"25","total_seconds":"17545"},"action":"removed"}`),
notSnapshotResult := []*fleet.ScheduledQueryResult{
{
QueryName: "pack/Global/Uptime",
OsqueryHostID: "1379f59d98f4",
Snapshot: []json.RawMessage{},
UnixTime: 1484078931,
},
}
svc.SaveResultLogsToQueryReports(ctx, logRawMessages)
// Results not saved if Logging is not snapshot in the query config
logRawMessages = []json.RawMessage{
json.RawMessage(`{"snapshot":[{"hour":"20","minutes":"8"}],"action":"snapshot","name":"pack/Global/Uptime","hostIdentifier":"1379f59d98f4","calendarTime":"Tue Jan 10 20:08:51 2017 UTC","unixTime":1484078931,"decorations":{"host_uuid":"EB714C9D-C1F8-A436-B6DA-3F853C5502EA"}}`),
}
ds.AppConfigFunc = func(ctx context.Context) (*fleet.AppConfig, error) {
return &fleet.AppConfig{ServerSettings: fleet.ServerSettings{QueryReportsDisabled: false}}, nil
}
serv.saveResultLogsToQueryReports(ctx, notSnapshotResult, queriesDBData)
assert.False(t, ds.OverwriteQueryResultRowsFuncInvoked)
// Results not saved if DiscardData is true in Query
ds.QueryByNameFunc = func(ctx context.Context, teamID *uint, name string, opts ...fleet.OptionalArg) (*fleet.Query, error) {
return &fleet.Query{ID: 1, DiscardData: true, Logging: fleet.LoggingSnapshot}, nil
discardDataFalse := map[string]*fleet.Query{
"pack/Global/Uptime": {
ID: 1,
DiscardData: true,
Logging: fleet.LoggingSnapshot,
},
}
svc.SaveResultLogsToQueryReports(ctx, logRawMessages)
serv.saveResultLogsToQueryReports(ctx, results, discardDataFalse)
assert.False(t, ds.OverwriteQueryResultRowsFuncInvoked)
// Happy Path: Results saved
ds.QueryByNameFunc = func(ctx context.Context, teamID *uint, name string, opts ...fleet.OptionalArg) (*fleet.Query, error) {
return &fleet.Query{ID: 1, DiscardData: false, Logging: fleet.LoggingSnapshot}, nil
discardDataTrue := map[string]*fleet.Query{
"pack/Global/Uptime": {
ID: 1,
DiscardData: false,
Logging: fleet.LoggingSnapshot,
},
}
ds.OverwriteQueryResultRowsFunc = func(ctx context.Context, rows []*fleet.ScheduledQueryResultRow) error {
return nil
}
svc.SaveResultLogsToQueryReports(ctx, logRawMessages)
serv.saveResultLogsToQueryReports(ctx, results, discardDataTrue)
require.True(t, ds.OverwriteQueryResultRowsFuncInvoked)
}
@ -649,12 +780,12 @@ func TestGetQueryNameAndTeamIDFromResult(t *testing.T) {
func TestGetMostRecentResults(t *testing.T) {
tests := []struct {
name string
input []fleet.ScheduledQueryResult
expected []fleet.ScheduledQueryResult
input []*fleet.ScheduledQueryResult
expected []*fleet.ScheduledQueryResult
}{
{
name: "basic test",
input: []fleet.ScheduledQueryResult{
input: []*fleet.ScheduledQueryResult{
{QueryName: "test1", UnixTime: 1},
{QueryName: "test1", UnixTime: 2},
{QueryName: "test1", UnixTime: 3},
@ -662,14 +793,14 @@ func TestGetMostRecentResults(t *testing.T) {
{QueryName: "test2", UnixTime: 2},
{QueryName: "test2", UnixTime: 3},
},
expected: []fleet.ScheduledQueryResult{
expected: []*fleet.ScheduledQueryResult{
{QueryName: "test1", UnixTime: 3},
{QueryName: "test2", UnixTime: 3},
},
},
{
name: "out of order test",
input: []fleet.ScheduledQueryResult{
input: []*fleet.ScheduledQueryResult{
{QueryName: "test1", UnixTime: 2},
{QueryName: "test1", UnixTime: 3},
{QueryName: "test1", UnixTime: 1},
@ -677,7 +808,7 @@ func TestGetMostRecentResults(t *testing.T) {
{QueryName: "test2", UnixTime: 2},
{QueryName: "test2", UnixTime: 1},
},
expected: []fleet.ScheduledQueryResult{
expected: []*fleet.ScheduledQueryResult{
{QueryName: "test1", UnixTime: 3},
{QueryName: "test2", UnixTime: 3},
},
@ -2132,7 +2263,7 @@ func TestUpdateHostIntervals(t *testing.T) {
svc, ctx := newTestService(t, ds, nil, nil)
ds.ListScheduledQueriesForAgentsFunc = func(ctx context.Context, teamID *uint) ([]*fleet.Query, error) {
ds.ListScheduledQueriesForAgentsFunc = func(ctx context.Context, teamID *uint, queryReportsDisabled bool) ([]*fleet.Query, error) {
return nil, nil
}

View file

@ -17,7 +17,6 @@ import (
eeservice "github.com/fleetdm/fleet/v4/ee/server/service"
"github.com/fleetdm/fleet/v4/server/config"
"github.com/fleetdm/fleet/v4/server/contexts/license"
"github.com/fleetdm/fleet/v4/server/datastore/cached_mysql"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/fleetdm/fleet/v4/server/logging"
"github.com/fleetdm/fleet/v4/server/mail"
@ -289,13 +288,9 @@ type TestServerOpts struct {
UseMailService bool
APNSTopic string
ProfileMatcher fleet.ProfileMatcher
EnableCachedDS bool
}
func RunServerForTestsWithDS(t *testing.T, ds fleet.Datastore, opts ...*TestServerOpts) (map[string]fleet.User, *httptest.Server) {
if len(opts) > 0 && opts[0].EnableCachedDS {
ds = cached_mysql.New(ds)
}
var rs fleet.QueryResultStore
if len(opts) > 0 && opts[0].Rs != nil {
rs = opts[0].Rs