Cleanup Query Results Cron (#15917)

This commit is contained in:
Tim Lee 2024-01-05 06:05:01 -07:00 committed by GitHub
parent 2eb29bd56b
commit d690867a6a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 96 additions and 0 deletions

View file

@ -0,0 +1 @@
- Resolved an issue where some query results were still being reported after Discard Data was enabled on a query.

View file

@ -834,6 +834,10 @@ func newCleanupsAndAggregationSchedule(
}
}
if err = ds.CleanupDiscardedQueryResults(ctx); err != nil {
return err
}
return nil
}),
)

View file

@ -146,3 +146,16 @@ func (ds *Datastore) QueryResultRowsForHost(ctx context.Context, queryID, hostID
return results, nil
}
// CleanupDiscardedQueryResults deletes every row in query_results whose
// query_id refers to a query with discard_data set to true. The statement is
// executed through ds.writer; any execution error is wrapped with ctxerr
// before being returned.
func (ds *Datastore) CleanupDiscardedQueryResults(ctx context.Context) error {
	// The subquery selects the IDs of all queries that have data discarding
	// enabled; only their stored results are removed.
	const purgeSQL = `
DELETE FROM query_results
WHERE query_id IN
(SELECT id FROM queries WHERE discard_data = true)
`
	if _, execErr := ds.writer(ctx).ExecContext(ctx, purgeSQL); execErr != nil {
		return ctxerr.Wrap(ctx, execErr, "cleaning up discarded query results")
	}
	return nil
}

View file

@ -27,6 +27,7 @@ func TestQueryResults(t *testing.T) {
{"MaxRows", testQueryResultRowsDoNotExceedMaxRows},
{"QueryResultRows", testQueryResultRows},
{"QueryResultRowsFilter", testQueryResultRowsTeamFilter},
{"CleanupQueryResultRows", testCleanupQueryResultRows},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
@ -628,3 +629,64 @@ func testQueryResultRows(t *testing.T, ds *Datastore) {
require.NoError(t, err)
require.Len(t, results, 1)
}
// testCleanupQueryResultRows verifies that CleanupDiscardedQueryResults only
// removes stored result rows belonging to queries with DiscardData enabled,
// and leaves the rows of other queries untouched.
func testCleanupQueryResultRows(t *testing.T, ds *Datastore) {
	ctx := context.Background()

	owner := test.NewUser(t, ds, "Test User", "test@example.com", true)
	keptQuery := test.NewQuery(t, ds, nil, "Query No Discard", "SELECT 1", owner.ID, true)
	discardedQuery := test.NewQuery(t, ds, nil, "Query Discard True", "SELECT 1", owner.ID, true)

	// Enable DiscardData on the second query and persist the change.
	discardedQuery.DiscardData = true
	require.NoError(t, ds.SaveQuery(ctx, discardedQuery, false, false))

	fetchedAt := time.Now().UTC().Truncate(time.Second)

	// Store two result rows for the query that keeps its data.
	keptRows := []*fleet.ScheduledQueryResultRow{
		{
			QueryID:     keptQuery.ID,
			HostID:      1,
			LastFetched: fetchedAt,
			Data:        ptr.RawMessage([]byte(`{"model": "USB Mouse", "vendor": "Logitech"}`)),
		},
		{
			QueryID:     keptQuery.ID,
			HostID:      1,
			LastFetched: fetchedAt,
			Data:        ptr.RawMessage([]byte(`{"model": "Keyboard", "vendor": "Microsoft"}`)),
		},
	}
	require.NoError(t, ds.OverwriteQueryResultRows(ctx, keptRows))

	// Store two result rows for the query that already has DiscardData
	// enabled; these are the rows the cleanup is expected to delete.
	discardedRows := []*fleet.ScheduledQueryResultRow{
		{
			QueryID:     discardedQuery.ID,
			HostID:      1,
			LastFetched: fetchedAt,
			Data:        ptr.RawMessage([]byte(`{"model": "Headphones", "vendor": "Sony"}`)),
		},
		{
			QueryID:     discardedQuery.ID,
			HostID:      1,
			LastFetched: fetchedAt,
			Data:        ptr.RawMessage([]byte(`{"model": "Speakers", "vendor": "Bose"}`)),
		},
	}
	require.NoError(t, ds.OverwriteQueryResultRows(ctx, discardedRows))

	// Run the cleanup under test.
	require.NoError(t, ds.CleanupDiscardedQueryResults(ctx))

	viewer := fleet.TeamFilter{User: owner}

	// Rows of the query without DiscardData must still be present.
	remaining, err := ds.QueryResultRows(ctx, keptQuery.ID, viewer)
	require.NoError(t, err)
	require.Len(t, remaining, 2)

	// Rows of the query with DiscardData must be gone.
	remaining, err = ds.QueryResultRows(ctx, discardedQuery.ID, viewer)
	require.NoError(t, err)
	require.Len(t, remaining, 0)
}

View file

@ -416,6 +416,10 @@ type Datastore interface {
ResultCountForQuery(ctx context.Context, queryID uint) (int, error)
ResultCountForQueryAndHost(ctx context.Context, queryID, hostID uint) (int, error)
OverwriteQueryResultRows(ctx context.Context, rows []*ScheduledQueryResultRow) error
// CleanupDiscardedQueryResults deletes all query results for queries with DiscardData enabled.
// Used in cleanups_then_aggregation cron to clean up rows that were inserted immediately
// after DiscardData was set to true due to query caching.
CleanupDiscardedQueryResults(ctx context.Context) error
///////////////////////////////////////////////////////////////////////////////
// TeamStore

View file

@ -318,6 +318,8 @@ type ResultCountForQueryAndHostFunc func(ctx context.Context, queryID uint, host
// OverwriteQueryResultRowsFunc is the function type held by the mock
// DataStore's OverwriteQueryResultRowsFunc field and called by
// DataStore.OverwriteQueryResultRows.
type OverwriteQueryResultRowsFunc func(ctx context.Context, rows []*fleet.ScheduledQueryResultRow) error
// CleanupDiscardedQueryResultsFunc is the function type held by the mock
// DataStore's CleanupDiscardedQueryResultsFunc field and called by
// DataStore.CleanupDiscardedQueryResults.
type CleanupDiscardedQueryResultsFunc func(ctx context.Context) error
// NewTeamFunc is the function type held by the mock DataStore's NewTeamFunc
// field; presumably invoked by DataStore.NewTeam (body not fully visible here).
type NewTeamFunc func(ctx context.Context, team *fleet.Team) (*fleet.Team, error)
// SaveTeamFunc is a function type taking a context and a team and returning a
// team and an error. NOTE(review): presumably backs a mock SaveTeam method in
// the same way as the types above; confirm against the rest of the file.
type SaveTeamFunc func(ctx context.Context, team *fleet.Team) (*fleet.Team, error)
@ -1237,6 +1239,9 @@ type DataStore struct {
OverwriteQueryResultRowsFunc OverwriteQueryResultRowsFunc
OverwriteQueryResultRowsFuncInvoked bool
CleanupDiscardedQueryResultsFunc CleanupDiscardedQueryResultsFunc
CleanupDiscardedQueryResultsFuncInvoked bool
NewTeamFunc NewTeamFunc
NewTeamFuncInvoked bool
@ -2992,6 +2997,13 @@ func (s *DataStore) OverwriteQueryResultRows(ctx context.Context, rows []*fleet.
return s.OverwriteQueryResultRowsFunc(ctx, rows)
}
// CleanupDiscardedQueryResults is the mock implementation: it records that the
// method was called by setting CleanupDiscardedQueryResultsFuncInvoked, then
// delegates to CleanupDiscardedQueryResultsFunc and returns its result.
// CleanupDiscardedQueryResultsFunc must be set beforehand; calling a nil
// function value panics.
func (s *DataStore) CleanupDiscardedQueryResults(ctx context.Context) error {
// Guard only the flag write with s.mu; the lock is released before the
// delegate function runs.
s.mu.Lock()
s.CleanupDiscardedQueryResultsFuncInvoked = true
s.mu.Unlock()
return s.CleanupDiscardedQueryResultsFunc(ctx)
}
func (s *DataStore) NewTeam(ctx context.Context, team *fleet.Team) (*fleet.Team, error) {
s.mu.Lock()
s.NewTeamFuncInvoked = true