diff --git a/changes/15709-live-query-stats-cleared b/changes/15709-live-query-stats-cleared new file mode 100644 index 0000000000..5a29a843bb --- /dev/null +++ b/changes/15709-live-query-stats-cleared @@ -0,0 +1 @@ +Live query stats are cleared when query SQL is modified. \ No newline at end of file diff --git a/server/datastore/mysql/queries.go b/server/datastore/mysql/queries.go index e4a279cd82..2a26884985 100644 --- a/server/datastore/mysql/queries.go +++ b/server/datastore/mysql/queries.go @@ -245,7 +245,7 @@ func (ds *Datastore) NewQuery( return query, nil } -func (ds *Datastore) SaveQuery(ctx context.Context, q *fleet.Query, shouldDiscardResults bool) (err error) { +func (ds *Datastore) SaveQuery(ctx context.Context, q *fleet.Query, shouldDiscardResults bool, shouldDeleteStats bool) (err error) { if err := q.Verify(); err != nil { return ctxerr.Wrap(ctx, err) } @@ -297,6 +297,11 @@ func (ds *Datastore) SaveQuery(ctx context.Context, q *fleet.Query, shouldDiscar return ctxerr.Wrap(ctx, notFound("Query").WithID(q.ID)) } + if shouldDeleteStats { + // Delete any associated stats asynchronously. + go ds.deleteQueryStats(context.WithoutCancel(ctx), []uint{q.ID}) + } + // Opportunistically delete associated query_results. // // TODO(lucas): We should run this on a transaction but we found @@ -349,14 +354,7 @@ func (ds *Datastore) DeleteQuery(ctx context.Context, teamID *uint, name string) } // Delete any associated stats asynchronously. - ctxWithoutCancel := context.WithoutCancel(ctx) - go func() { - stmt := "DELETE FROM scheduled_query_stats WHERE scheduled_query_id = ?" - _, err := ds.writer(ctxWithoutCancel).ExecContext(ctxWithoutCancel, stmt, queryID) - if err != nil { - level.Error(ds.logger).Log("msg", "error deleting query stats", "err", err) - } - }() + go ds.deleteQueryStats(context.WithoutCancel(ctx), []uint{queryID}) // Opportunistically delete associated query_results. // @@ -378,19 +376,7 @@ func (ds *Datastore) DeleteQueries(ctx context.Context, ids []uint) (uint, error } // Delete any associated stats asynchronously. - ctxWithoutCancel := context.WithoutCancel(ctx) - go func() { - stmt := "DELETE FROM scheduled_query_stats WHERE scheduled_query_id IN (?)" - stmt, args, err := sqlx.In(stmt, ids) - if err != nil { - level.Error(ds.logger).Log("msg", "error creating delete query statement", "err", err) - return - } - _, err = ds.writer(ctxWithoutCancel).ExecContext(ctxWithoutCancel, stmt, args...) - if err != nil { - level.Error(ds.logger).Log("msg", "error deleting multiple query stats", "err", err) - } - }() + go ds.deleteQueryStats(context.WithoutCancel(ctx), ids) // Opportunistically delete associated query_results. // @@ -402,6 +388,35 @@ func (ds *Datastore) DeleteQueries(ctx context.Context, ids []uint) (uint, error return deleted, nil } +// deleteQueryStats deletes query stats and aggregated stats for saved queries. +// Errors are logged and not returned. +func (ds *Datastore) deleteQueryStats(ctx context.Context, queryIDs []uint) { + // Delete stats for each host. + stmt := "DELETE FROM scheduled_query_stats WHERE scheduled_query_id IN (?)" + stmt, args, err := sqlx.In(stmt, queryIDs) + if err != nil { + level.Error(ds.logger).Log("msg", "error creating delete query stats statement", "err", err) + } else { + _, err = ds.writer(ctx).ExecContext(ctx, stmt, args...) + if err != nil { + level.Error(ds.logger).Log("msg", "error deleting query stats", "err", err) + } + } + + // Delete aggregated stats + stmt = fmt.Sprintf("DELETE FROM aggregated_stats WHERE type = '%s' AND id IN (?)", fleet.AggregatedStatsTypeScheduledQuery) + stmt, args, err = sqlx.In(stmt, queryIDs) + if err != nil { + level.Error(ds.logger).Log("msg", "error creating delete aggregated stats statement", "err", err) + } else { + _, err = ds.writer(ctx).ExecContext(ctx, stmt, args...) + if err != nil { + level.Error(ds.logger).Log("msg", "error deleting aggregated stats", "err", err) + } + } + +} + // Query returns a single Query identified by id, if such exists. func (ds *Datastore) Query(ctx context.Context, id uint) (*fleet.Query, error) { sqlQuery := ` diff --git a/server/datastore/mysql/queries_test.go b/server/datastore/mysql/queries_test.go index 383752b87d..098d3d5925 100644 --- a/server/datastore/mysql/queries_test.go +++ b/server/datastore/mysql/queries_test.go @@ -2,6 +2,7 @@ package mysql import ( "context" + "database/sql" "fmt" "math" "math/rand" @@ -179,11 +180,14 @@ func testQueriesDelete(t *testing.T, ds *Datastore) { err = ds.UpdateLiveQueryStats( context.Background(), query.ID, []*fleet.LiveQueryStats{ { - HostID: hostID, + HostID: hostID, + Executions: 1, }, }, ) require.NoError(t, err) + err = ds.CalculateAggregatedPerfStatsPercentiles(context.Background(), fleet.AggregatedStatsTypeScheduledQuery, query.ID) + require.NoError(t, err) err = ds.DeleteQuery(context.Background(), query.TeamID, query.Name) require.NoError(t, err) @@ -200,6 +204,8 @@ func testQueriesDelete(t *testing.T, ds *Datastore) { stats, err := ds.GetLiveQueryStats(context.Background(), query.ID, []uint{hostID}) require.NoError(t, err) require.Equal(t, 0, len(stats)) + _, err = GetAggregatedStats(context.Background(), ds, fleet.AggregatedStatsTypeScheduledQuery, query.ID) + require.ErrorIs(t, err, sql.ErrNoRows) } func testQueriesGetByName(t *testing.T, ds *Datastore) { @@ -255,10 +261,12 @@ func testQueriesDeleteMany(t *testing.T, ds *Datastore) { err = ds.UpdateLiveQueryStats( context.Background(), q1.ID, []*fleet.LiveQueryStats{ { - HostID: hostIDs[0], + HostID: hostIDs[0], + Executions: 1, }, { - HostID: hostIDs[1], + HostID: hostIDs[1], + Executions: 1, }, }, ) @@ -266,11 +274,16 @@ func testQueriesDeleteMany(t *testing.T, ds *Datastore) { err = ds.UpdateLiveQueryStats( context.Background(), q3.ID, []*fleet.LiveQueryStats{ { - HostID: hostIDs[0], + HostID: hostIDs[0], + Executions: 1, }, }, ) require.NoError(t, err) + err = ds.CalculateAggregatedPerfStatsPercentiles(context.Background(), fleet.AggregatedStatsTypeScheduledQuery, q1.ID) + require.NoError(t, err) + err = ds.CalculateAggregatedPerfStatsPercentiles(context.Background(), fleet.AggregatedStatsTypeScheduledQuery, q3.ID) + require.NoError(t, err) deleted, err := ds.DeleteQueries(context.Background(), []uint{q1.ID, q3.ID}) require.Nil(t, err) @@ -289,6 +302,10 @@ func testQueriesDeleteMany(t *testing.T, ds *Datastore) { stats, err = ds.GetLiveQueryStats(context.Background(), q3.ID, hostIDs) require.NoError(t, err) require.Equal(t, 0, len(stats)) + _, err = GetAggregatedStats(context.Background(), ds, fleet.AggregatedStatsTypeScheduledQuery, q1.ID) + require.ErrorIs(t, err, sql.ErrNoRows) + _, err = GetAggregatedStats(context.Background(), ds, fleet.AggregatedStatsTypeScheduledQuery, q3.ID) + require.ErrorIs(t, err, sql.ErrNoRows) deleted, err = ds.DeleteQueries(context.Background(), []uint{q2.ID}) require.Nil(t, err) @@ -337,7 +354,7 @@ func testQueriesSave(t *testing.T, ds *Datastore) { query.Logging = fleet.LoggingDifferential query.DiscardData = true - err = ds.SaveQuery(context.Background(), query, true) + err = ds.SaveQuery(context.Background(), query, true, false) require.NoError(t, err) actual, err := ds.Query(context.Background(), query.ID) @@ -349,6 +366,47 @@ func testQueriesSave(t *testing.T, ds *Datastore) { require.Equal(t, "baz", actual.Query) require.Equal(t, "Zach", actual.AuthorName) require.Equal(t, "zwass@fleet.co", actual.AuthorEmail) + + // Now save again and delete stats. + // First we create stats which will be deleted. + const hostID = 1 + err = ds.UpdateLiveQueryStats( + context.Background(), query.ID, []*fleet.LiveQueryStats{ + { + HostID: hostID, + Executions: 1, + }, + }, + ) + require.NoError(t, err) + err = ds.CalculateAggregatedPerfStatsPercentiles(context.Background(), fleet.AggregatedStatsTypeScheduledQuery, query.ID) + require.NoError(t, err) + // Update/save query. + query.Query = "baz2" + err = ds.SaveQuery(context.Background(), query, true, true) + require.NoError(t, err) + actual, err = ds.Query(context.Background(), query.ID) + require.NoError(t, err) + require.NotNil(t, actual) + // The query now comes with stats, so we need to fill them in for comparison + query.AggregatedStats = fleet.AggregatedStats{ + SystemTimeP50: ptr.Float64(0), + SystemTimeP95: ptr.Float64(0), + UserTimeP50: ptr.Float64(0), + UserTimeP95: ptr.Float64(0), + TotalExecutions: ptr.Float64(1), + } + test.QueriesMatch(t, query, actual) + + // Ensure stats were deleted. + // The actual delete occurs asynchronously, so enough time should have passed + // to ensure the original query completed. + time.Sleep(10 * time.Millisecond) + stats, err := ds.GetLiveQueryStats(context.Background(), query.ID, []uint{hostID}) + require.NoError(t, err) + require.Equal(t, 0, len(stats)) + _, err = GetAggregatedStats(context.Background(), ds, fleet.AggregatedStatsTypeScheduledQuery, query.ID) + require.ErrorIs(t, err, sql.ErrNoRows) } func testQueriesList(t *testing.T, ds *Datastore) { diff --git a/server/fleet/datastore.go b/server/fleet/datastore.go index c00a646707..66d81d5a5b 100644 --- a/server/fleet/datastore.go +++ b/server/fleet/datastore.go @@ -70,7 +70,7 @@ type Datastore interface { // NewQuery creates a new query object in thie datastore. The returned query should have the ID updated. NewQuery(ctx context.Context, query *Query, opts ...OptionalArg) (*Query, error) // SaveQuery saves changes to an existing query object. - SaveQuery(ctx context.Context, query *Query, shouldDiscardResults bool) error + SaveQuery(ctx context.Context, query *Query, shouldDiscardResults bool, shouldDeleteStats bool) error // DeleteQuery deletes an existing query object on a team. If teamID is nil, then the query is // looked up in the 'global' team. DeleteQuery(ctx context.Context, teamID *uint, name string) error diff --git a/server/mock/datastore_mock.go b/server/mock/datastore_mock.go index 17d19fcc4a..d28e4c1716 100644 --- a/server/mock/datastore_mock.go +++ b/server/mock/datastore_mock.go @@ -60,7 +60,7 @@ type ApplyQueriesFunc func(ctx context.Context, authorID uint, queries []*fleet. type NewQueryFunc func(ctx context.Context, query *fleet.Query, opts ...fleet.OptionalArg) (*fleet.Query, error) -type SaveQueryFunc func(ctx context.Context, query *fleet.Query, shouldDiscardResults bool) error +type SaveQueryFunc func(ctx context.Context, query *fleet.Query, shouldDiscardResults bool, shouldDeleteStats bool) error type DeleteQueryFunc func(ctx context.Context, teamID *uint, name string) error @@ -2084,11 +2084,11 @@ func (s *DataStore) NewQuery(ctx context.Context, query *fleet.Query, opts ...fl return s.NewQueryFunc(ctx, query, opts...) } -func (s *DataStore) SaveQuery(ctx context.Context, query *fleet.Query, shouldDiscardResults bool) error { +func (s *DataStore) SaveQuery(ctx context.Context, query *fleet.Query, shouldDiscardResults bool, shouldDeleteStats bool) error { s.mu.Lock() s.SaveQueryFuncInvoked = true s.mu.Unlock() - return s.SaveQueryFunc(ctx, query, shouldDiscardResults) + return s.SaveQueryFunc(ctx, query, shouldDiscardResults, shouldDeleteStats) } func (s *DataStore) DeleteQuery(ctx context.Context, teamID *uint, name string) error { diff --git a/server/service/global_schedule_test.go b/server/service/global_schedule_test.go index d29e0f3f40..d34707d46a 100644 --- a/server/service/global_schedule_test.go +++ b/server/service/global_schedule_test.go @@ -24,7 +24,7 @@ func TestGlobalScheduleAuth(t *testing.T) { Query: "SELECT 1;", }, nil } - ds.SaveQueryFunc = func(ctx context.Context, query *fleet.Query, shouldDiscardResults bool) error { + ds.SaveQueryFunc = func(ctx context.Context, query *fleet.Query, shouldDiscardResults bool, shouldDeleteStats bool) error { return nil } ds.NewActivityFunc = func(ctx context.Context, user *fleet.User, activity fleet.ActivityDetails) error { diff --git a/server/service/queries.go b/server/service/queries.go index 3730d96487..415364f734 100644 --- a/server/service/queries.go +++ b/server/service/queries.go @@ -321,7 +321,7 @@ func modifyQueryEndpoint(ctx context.Context, request interface{}, svc fleet.Ser func (svc *Service) ModifyQuery(ctx context.Context, id uint, p fleet.QueryPayload) (*fleet.Query, error) { // Load query first to determine if the user can modify it. query, err := svc.ds.Query(ctx, id) - shouldDiscardQueryResults := false + shouldDiscardQueryResults, shouldDeleteStats := false, false if err != nil { setAuthCheckedOnPreAuthErr(ctx) return nil, err @@ -349,6 +349,7 @@ func (svc *Service) ModifyQuery(ctx context.Context, id uint, p fleet.QueryPaylo if p.Query != nil { if query.Query != *p.Query { shouldDiscardQueryResults = true + shouldDeleteStats = true } query.Query = *p.Query } @@ -382,7 +383,7 @@ func (svc *Service) ModifyQuery(ctx context.Context, id uint, p fleet.QueryPaylo logging.WithExtras(ctx, "name", query.Name, "sql", query.Query) - if err := svc.ds.SaveQuery(ctx, query, shouldDiscardQueryResults); err != nil { + if err := svc.ds.SaveQuery(ctx, query, shouldDiscardQueryResults, shouldDeleteStats); err != nil { return nil, err } diff --git a/server/service/queries_test.go b/server/service/queries_test.go index 944a2fe146..95ed67d006 100644 --- a/server/service/queries_test.go +++ b/server/service/queries_test.go @@ -137,7 +137,7 @@ func TestQueryPayloadValidationModify(t *testing.T) { ObserverCanRun: false, }, nil } - ds.SaveQueryFunc = func(ctx context.Context, query *fleet.Query, shouldDiscardResults bool) error { + ds.SaveQueryFunc = func(ctx context.Context, query *fleet.Query, shouldDiscardResults bool, shouldDeleteStats bool) error { assert.NotEmpty(t, query) return nil } @@ -374,7 +374,7 @@ func TestQueryAuth(t *testing.T) { return 0, nil } - ds.SaveQueryFunc = func(ctx context.Context, query *fleet.Query, shouldDiscardResults bool) error { + ds.SaveQueryFunc = func(ctx context.Context, query *fleet.Query, shouldDiscardResults bool, shouldDeleteStats bool) error { return nil } ds.DeleteQueryFunc = func(ctx context.Context, teamID *uint, name string) error { diff --git a/server/service/team_schedule_test.go b/server/service/team_schedule_test.go index 41fa628ae4..16f74ab1c4 100644 --- a/server/service/team_schedule_test.go +++ b/server/service/team_schedule_test.go @@ -33,7 +33,7 @@ func TestTeamScheduleAuth(t *testing.T) { TeamID: nil, }, nil } - ds.SaveQueryFunc = func(ctx context.Context, query *fleet.Query, shouldDiscardResults bool) error { + ds.SaveQueryFunc = func(ctx context.Context, query *fleet.Query, shouldDiscardResults bool, shouldDeleteStats bool) error { return nil } ds.NewActivityFunc = func(ctx context.Context, user *fleet.User, activity fleet.ActivityDetails) error {