From 03caba2030875741a7ccc6942bc040485c4972ec Mon Sep 17 00:00:00 2001 From: Lucas Manuel Rodriguez Date: Fri, 1 Sep 2023 15:14:49 -0300 Subject: [PATCH] Fix queries stats ingestion (`Performance impact`) (#13432) #13318 - [X] Changes file added for user-visible changes in `changes/` or `orbit/changes/`. See [Changes files](https://fleetdm.com/docs/contributing/committing-changes#changes-files) for more information. - ~[ ] Documented any API changes (docs/Using-Fleet/REST-API.md or docs/Contributing/API-for-contributors.md)~ - ~[ ] Documented any permissions changes~ - ~[ ] Input data is properly validated, `SELECT *` is avoided, SQL injection is prevented (using placeholders for values in statements)~ - ~[ ] Added support on fleet's osquery simulator `cmd/osquery-perf` for new osquery data ingestion features.~ - [x] Added/updated tests - [X] Manual QA for all new/changed functionality - ~For Orbit and Fleet Desktop changes:~ - ~[ ] Manual QA must be performed in the three main OSs, macOS, Windows and Linux.~ - ~[ ] Auto-update manual QA, from released version of component to new version (see [tools/tuf/test](../tools/tuf/test/README.md)).~ --- changes/13318-fix-query-stats | 1 + server/datastore/mysql/hosts.go | 2 +- server/service/integration_core_test.go | 380 +++++++++++++++++++++++- server/service/osquery.go | 8 + server/service/testing_client.go | 7 + 5 files changed, 391 insertions(+), 7 deletions(-) create mode 100644 changes/13318-fix-query-stats diff --git a/changes/13318-fix-query-stats b/changes/13318-fix-query-stats new file mode 100644 index 0000000000..ec7c227eb3 --- /dev/null +++ b/changes/13318-fix-query-stats @@ -0,0 +1 @@ +* Fixed a bug where query stats (aka `Performance impact`) were not being populated in Fleet. diff --git a/server/datastore/mysql/hosts.go b/server/datastore/mysql/hosts.go index c16988e1fc..0533b5fcac 100644 --- a/server/datastore/mysql/hosts.go +++ b/server/datastore/mysql/hosts.go @@ -164,7 +164,7 @@ func saveHostPackStatsDB(ctx context.Context, db *sqlx.DB, teamID *uint, hostID } scheduledQueriesArgs = append(scheduledQueriesArgs, teamIDArg, - query.QueryName, + query.ScheduledQueryName, hostID, query.AverageMemory, diff --git a/server/service/integration_core_test.go b/server/service/integration_core_test.go index c56df78d39..5b1b1440c2 100644 --- a/server/service/integration_core_test.go +++ b/server/service/integration_core_test.go @@ -9,8 +9,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" - "log" "net/http" "net/http/httptest" "net/url" @@ -21,14 +19,19 @@ import ( "testing" "time" + "github.com/WatchBeam/clock" "github.com/fleetdm/fleet/v4/pkg/fleethttp" "github.com/fleetdm/fleet/v4/server" + "github.com/fleetdm/fleet/v4/server/config" "github.com/fleetdm/fleet/v4/server/datastore/mysql" "github.com/fleetdm/fleet/v4/server/fleet" "github.com/fleetdm/fleet/v4/server/live_query/live_query_mock" "github.com/fleetdm/fleet/v4/server/ptr" + "github.com/fleetdm/fleet/v4/server/service/async" + "github.com/fleetdm/fleet/v4/server/service/osquery_utils" "github.com/fleetdm/fleet/v4/server/test" "github.com/ghodss/yaml" + "github.com/go-kit/kit/log" "github.com/google/uuid" "github.com/jmoiron/sqlx" "github.com/stretchr/testify/assert" @@ -670,7 +673,7 @@ func (s *integrationTestSuite) TestVulnerableSoftware() { require.True(t, inserted) resp := s.Do("GET", fmt.Sprintf("/api/latest/fleet/hosts/%d", host.ID), nil, http.StatusOK) - bodyBytes, err := ioutil.ReadAll(resp.Body) + bodyBytes, err := io.ReadAll(resp.Body) require.NoError(t, err) expectedJSONSoft2 := `"name": "bar", @@ -696,7 +699,7 @@ func (s *integrationTestSuite) TestVulnerableSoftware() { // no software host counts have been calculated yet, so this returns nothing var lsResp listSoftwareResponse resp = s.Do("GET", "/api/latest/fleet/software", nil, http.StatusOK, "vulnerable", "true", "order_key", "generated_cpe", "order_direction", "desc") - bodyBytes, err = ioutil.ReadAll(resp.Body) + bodyBytes, err = io.ReadAll(resp.Body) require.NoError(t, err) assert.Contains(t, string(bodyBytes), `"counts_updated_at": null`) @@ -2173,7 +2176,7 @@ func (s *integrationTestSuite) TestHostDetailsPolicies() { require.NoError(t, err) resp := s.Do("GET", fmt.Sprintf("/api/latest/fleet/hosts/%d", host1.ID), nil, http.StatusOK) - b, err := ioutil.ReadAll(resp.Body) + b, err := io.ReadAll(resp.Body) require.NoError(t, err) var r struct { Host *HostDetailResponse `json:"host"` @@ -6393,7 +6396,6 @@ func (s *integrationTestSuite) TestAppleMDMNotConfigured() { for _, route := range mdmAppleConfigurationRequiredEndpoints() { which := fmt.Sprintf("%s %s", route.method, route.path) - log.Print(which) var expectedErr fleet.ErrWithStatusCode = fleet.ErrMDMNotConfigured if route.premiumOnly && route.deviceAuthenticated { // user-authenticated premium-only routes will never see the ErrMissingLicense error @@ -6889,3 +6891,369 @@ const ( } }` ) + +func (s *integrationTestSuite) TestDirectIngestScheduledQueryStats() { + t := s.T() + + team1, err := s.ds.NewTeam(context.Background(), &fleet.Team{ + Name: "Foobar", + }) + require.NoError(t, err) + team2, err := s.ds.NewTeam(context.Background(), &fleet.Team{ + Name: "Zoo", + }) + require.NoError(t, err) + globalHost, err := s.ds.NewHost(context.Background(), &fleet.Host{ + DetailUpdatedAt: time.Now(), + LabelUpdatedAt: time.Now(), + PolicyUpdatedAt: time.Now(), + SeenTime: time.Now().Add(-1 * time.Minute), + OsqueryHostID: ptr.String(uuid.New().String()), + NodeKey: ptr.String(uuid.New().String()), + UUID: uuid.New().String(), + Hostname: fmt.Sprintf("%sfoo.global", t.Name()), + Platform: "darwin", + }) + require.NoError(t, err) + team1Host, err := s.ds.NewHost(context.Background(), &fleet.Host{ + DetailUpdatedAt: time.Now(), + LabelUpdatedAt: time.Now(), + PolicyUpdatedAt: time.Now(), + SeenTime: time.Now().Add(-1 * time.Minute), + OsqueryHostID: ptr.String(uuid.New().String()), + NodeKey: ptr.String(uuid.New().String()), + UUID: uuid.New().String(), + Hostname: fmt.Sprintf("%sfoo.team", t.Name()), + Platform: "darwin", + TeamID: &team1.ID, + }) + require.NoError(t, err) + scheduledGlobalQuery, err := s.ds.NewQuery(context.Background(), &fleet.Query{ + Name: "scheduled-global-query", + TeamID: nil, + Interval: 10, + Platform: "darwin", + AutomationsEnabled: true, + Logging: "snapshot", + Description: "foobar", + Query: "SELECT * from time;", + Saved: true, + }) + require.NoError(t, err) + nonScheduledGlobalQuery, err := s.ds.NewQuery(context.Background(), &fleet.Query{ + Name: "non-scheduled-global-query", + TeamID: nil, + Interval: 0, + Platform: "darwin", + AutomationsEnabled: false, + Logging: "snapshot", + Description: "foobar", + Query: "SELECT * from osquery_info;", + Saved: true, + }) + require.NoError(t, err) + scheduledTeam1Query1, err := s.ds.NewQuery(context.Background(), &fleet.Query{ + Name: "scheduled-team1-query1", + TeamID: &team1.ID, + Interval: 20, + Platform: "", + AutomationsEnabled: true, + Logging: "snapshot", + Description: "foobar", + Query: "SELECT * from other;", + Saved: true, + }) + require.NoError(t, err) + scheduledTeam1Query2, err := s.ds.NewQuery(context.Background(), &fleet.Query{ + Name: "scheduled-team1-query2", + TeamID: &team1.ID, + Interval: 90, + Platform: "", + AutomationsEnabled: true, + Logging: "snapshot", + Description: "foobar", + Query: "SELECT * from other;", + Saved: true, + }) + require.NoError(t, err) + // Create a non-scheduled query to test that we filter it out when providing + // the queries in the osquery/config endpoint. + _, err = s.ds.NewQuery(context.Background(), &fleet.Query{ + Name: "non-scheduled-team1-query", + TeamID: &team1.ID, + Interval: 0, + Platform: "", + AutomationsEnabled: false, + Logging: "snapshot", + Description: "foobar", + Query: "SELECT * from foobar;", + Saved: true, + }) + require.NoError(t, err) + // Create a scheduled query but on another team to test that we filter it + // out when providing the queries in the osquery/config endpoint. + _, err = s.ds.NewQuery(context.Background(), &fleet.Query{ + Name: "scheduled-team2-query", + TeamID: &team2.ID, + Interval: 40, + Platform: "", + AutomationsEnabled: true, + Logging: "snapshot", + Description: "foobar", + Query: "SELECT * from other;", + Saved: true, + }) + require.NoError(t, err) + // Create a legacy 2017 user pack with one query. + userPack1TargetTeam1, err := s.ds.NewPack(context.Background(), &fleet.Pack{ + Name: "2017 Pack", + Type: nil, + Teams: []fleet.Target{{TargetID: team1.ID, Type: fleet.TargetTeam}}, + TeamIDs: []uint{team1.ID}, + }) + require.NoError(t, err) + scheduledQueryOnPack1, err := s.ds.NewScheduledQuery(context.Background(), &fleet.ScheduledQuery{ + Name: "scheduled-query-pack1", + PackID: userPack1TargetTeam1.ID, + QueryID: nonScheduledGlobalQuery.ID, + Interval: 60, + Snapshot: ptr.Bool(true), + Removed: ptr.Bool(true), + }) + require.NoError(t, err) + + // Simulate the osquery instance of the global host calling the osquery/config endpoint + // and test the returned scheduled queries. + req := getClientConfigRequest{NodeKey: *globalHost.NodeKey} + var resp getClientConfigResponse + s.DoJSON("POST", "/api/osquery/config", req, http.StatusOK, &resp) + packs := resp.Config["packs"].(map[string]interface{}) + require.Len(t, packs, 1) + globalQueries := packs["Global"].(map[string]interface{})["queries"].(map[string]interface{}) + require.Len(t, globalQueries, 1) + require.Contains(t, globalQueries, scheduledGlobalQuery.Name) + + // Simulate the osquery instance of the team host calling the osquery/config endpoint + // and test the returned scheduled queries. + req = getClientConfigRequest{NodeKey: *team1Host.NodeKey} + resp = getClientConfigResponse{} + s.DoJSON("POST", "/api/osquery/config", req, http.StatusOK, &resp) + packs = resp.Config["packs"].(map[string]interface{}) + require.Len(t, packs, 3) + globalQueries = packs["Global"].(map[string]interface{})["queries"].(map[string]interface{}) + require.Len(t, globalQueries, 1) + require.Contains(t, globalQueries, scheduledGlobalQuery.Name) + team1Queries := packs[fmt.Sprintf("team-%d", team1.ID)].(map[string]interface{})["queries"].(map[string]interface{}) + require.Len(t, team1Queries, 2) + require.Contains(t, team1Queries, scheduledTeam1Query1.Name) + require.Contains(t, team1Queries, scheduledTeam1Query2.Name) + userPack1Queries := packs[userPack1TargetTeam1.Name].(map[string]interface{})["queries"].(map[string]interface{}) + require.Len(t, userPack1Queries, 1) + require.Contains(t, userPack1Queries, scheduledQueryOnPack1.Name) + + // Now let's simulate a osquery instance running in the team host returning the + // stats in the distributed/write (osquery_schedule table) + rows := []map[string]string{ + { + "name": "pack/Global/scheduled-global-query", + "query": "SELECT * FROM time;", + "interval": "10", + "executions": "2", + "last_executed": "1693476753", + "denylisted": "0", + "output_size": "576", + "wall_time": "1", + "wall_time_ms": "2", + "last_wall_time_ms": "3", + "user_time": "4", + "last_user_time": "5", + "system_time": "6", + "last_system_time": "7", + "average_memory": "8", + "last_memory": "9", + "delimiter": "/", + }, + { + "name": "pack/2017 Pack/scheduled-query-pack1", + "query": "SELECT * FROM osquery_info;", + "interval": "60", + "executions": "20", + "last_executed": "1693476842", + "denylisted": "0", + "output_size": "9620", + "wall_time": "9", + "wall_time_ms": "8", + "last_wall_time_ms": "7", + "user_time": "6", + "last_user_time": "5", + "system_time": "4", + "last_system_time": "3", + "average_memory": "2", + "last_memory": "1", + "delimiter": "/", + }, + { + "name": fmt.Sprintf("pack/team-%d/scheduled-team1-query1", team1.ID), + "query": "SELECT * FROM other;", + "interval": "20", + "executions": "1", + "last_executed": "1693476561", + "denylisted": "0", + "output_size": "10", + "wall_time": "11", + "wall_time_ms": "12", + "last_wall_time_ms": "13", + "user_time": "14", + "last_user_time": "15", + "system_time": "16", + "last_system_time": "17", + "average_memory": "18", + "last_memory": "19", + "delimiter": "/", + }, + { + "name": fmt.Sprintf("pack/team-%d/scheduled-team1-query2", team1.ID), + "query": "SELECT * FROM other;", + "interval": "90", + "executions": "5", + "last_executed": "1693476666", + "denylisted": "0", + "output_size": "20", + "wall_time": "21", + "wall_time_ms": "22", + "last_wall_time_ms": "23", + "user_time": "24", + "last_user_time": "25", + "system_time": "26", + "last_system_time": "27", + "average_memory": "28", + "last_memory": "29", + "delimiter": "/", + }, + } + + appConfig, err := s.ds.AppConfig(context.Background()) + require.NoError(t, err) + detailQueries := osquery_utils.GetDetailQueries(context.Background(), config.FleetConfig{ + App: config.AppConfig{ + EnableScheduledQueryStats: true, + }, + }, appConfig, &appConfig.Features) + task := async.NewTask(s.ds, nil, clock.C, config.OsqueryConfig{}) + err = detailQueries["scheduled_query_stats"].DirectTaskIngestFunc( + context.Background(), + log.NewNopLogger(), + team1Host, + task, + rows, + ) + require.NoError(t, err) + + // Check that the received stats were stored in the DB as expected. + var scheduledQueriesStats []fleet.ScheduledQueryStats + mysql.ExecAdhocSQL(t, s.ds, func(q sqlx.ExtContext) error { + return sqlx.SelectContext(context.Background(), q, &scheduledQueriesStats, + `SELECT + scheduled_query_id, q.name AS scheduled_query_name, average_memory, denylisted, + executions, q.schedule_interval, last_executed, + output_size, system_time, user_time, wall_time + FROM scheduled_query_stats sqs + JOIN queries q ON sqs.scheduled_query_id = q.id + WHERE host_id = ?;`, + team1Host.ID, + ) + }) + require.Len(t, scheduledQueriesStats, 4) + rowsMap := make(map[string]map[string]string) + for _, row := range rows { + parts := strings.Split(row["name"], "/") + queryName := parts[len(parts)-1] + // we need to map this because 2017 packs send the name of the schedule and not + // the name of the query. + if queryName == "scheduled-query-pack1" { + queryName = "non-scheduled-global-query" + } + rowsMap[queryName] = row + } + for _, sqs := range scheduledQueriesStats { + row := rowsMap[sqs.ScheduledQueryName] + require.Equal(t, strconv.FormatInt(int64(sqs.AverageMemory), 10), row["average_memory"]) + require.Equal(t, strconv.FormatInt(int64(sqs.Executions), 10), row["executions"]) + interval := row["interval"] + if sqs.ScheduledQueryName == "non-scheduled-global-query" { + interval = "0" // this query has metrics because it runs on a pack. + } + require.Equal(t, strconv.FormatInt(int64(sqs.Interval), 10), interval) + lastExecuted, err := strconv.ParseInt(row["last_executed"], 10, 64) + require.NoError(t, err) + require.WithinDuration(t, sqs.LastExecuted, time.Unix(lastExecuted, 0), 1*time.Second) + require.Equal(t, strconv.FormatInt(int64(sqs.OutputSize), 10), row["output_size"]) + require.Equal(t, strconv.FormatInt(int64(sqs.SystemTime), 10), row["system_time"]) + require.Equal(t, strconv.FormatInt(int64(sqs.UserTime), 10), row["user_time"]) + require.Equal(t, strconv.FormatInt(int64(sqs.WallTime), 10), row["wall_time"]) + } + + // Now let's simulate a osquery instance running in the global host returning the + // stats in the distributed/write (osquery_schedule table) + rows = []map[string]string{ + { + "name": "pack/Global/scheduled-global-query", + "query": "SELECT * FROM time;", + "interval": "10", + "executions": "2", + "last_executed": "1693476753", + "denylisted": "0", + "output_size": "576", + "wall_time": "1", + "wall_time_ms": "2", + "last_wall_time_ms": "3", + "user_time": "4", + "last_user_time": "5", + "system_time": "6", + "last_system_time": "7", + "average_memory": "8", + "last_memory": "9", + "delimiter": "/", + }, + } + + err = detailQueries["scheduled_query_stats"].DirectTaskIngestFunc( + context.Background(), + log.NewNopLogger(), + globalHost, + task, + rows, + ) + require.NoError(t, err) + + // Check that the received stats were stored in the DB as expected. + scheduledQueriesStats = []fleet.ScheduledQueryStats{} + mysql.ExecAdhocSQL(t, s.ds, func(q sqlx.ExtContext) error { + return sqlx.SelectContext(context.Background(), q, &scheduledQueriesStats, + `SELECT + scheduled_query_id, q.name AS scheduled_query_name, average_memory, denylisted, + executions, q.schedule_interval, last_executed, + output_size, system_time, user_time, wall_time + FROM scheduled_query_stats sqs + JOIN queries q ON sqs.scheduled_query_id = q.id + WHERE host_id = ?;`, + globalHost.ID, + ) + }) + require.Len(t, scheduledQueriesStats, 1) + row := rows[0] + parts := strings.Split(row["name"], "/") + queryName := parts[len(parts)-1] + sqs := scheduledQueriesStats[0] + require.Equal(t, scheduledQueriesStats[0].ScheduledQueryName, queryName) + require.Equal(t, strconv.FormatInt(int64(sqs.AverageMemory), 10), row["average_memory"]) + require.Equal(t, strconv.FormatInt(int64(sqs.Executions), 10), row["executions"]) + require.Equal(t, strconv.FormatInt(int64(sqs.Interval), 10), row["interval"]) + lastExecuted, err := strconv.ParseInt(row["last_executed"], 10, 64) + require.NoError(t, err) + require.WithinDuration(t, sqs.LastExecuted, time.Unix(lastExecuted, 0), 1*time.Second) + require.Equal(t, strconv.FormatInt(int64(sqs.OutputSize), 10), row["output_size"]) + require.Equal(t, strconv.FormatInt(int64(sqs.SystemTime), 10), row["system_time"]) + require.Equal(t, strconv.FormatInt(int64(sqs.UserTime), 10), row["user_time"]) + require.Equal(t, strconv.FormatInt(int64(sqs.WallTime), 10), row["wall_time"]) +} diff --git a/server/service/osquery.go b/server/service/osquery.go index cc47a39cd4..12c054ac5e 100644 --- a/server/service/osquery.go +++ b/server/service/osquery.go @@ -335,6 +335,14 @@ func (r getClientConfigResponse) MarshalJSON() ([]byte, error) { return json.Marshal(r.Config) } +// UnmarshalJSON implements json.Unmarshaler. +// +// Osquery expects the response for configs to be at the +// top-level of the JSON response. +func (r *getClientConfigResponse) UnmarshalJSON(data []byte) error { + return json.Unmarshal(data, &r.Config) +} + func getClientConfigEndpoint(ctx context.Context, request interface{}, svc fleet.Service) (errorer, error) { config, err := svc.GetClientConfig(ctx) if err != nil { diff --git a/server/service/testing_client.go b/server/service/testing_client.go index 3fccd3f2a6..f51c97f446 100644 --- a/server/service/testing_client.go +++ b/server/service/testing_client.go @@ -152,6 +152,13 @@ func (ts *withServer) commonTearDownTest(t *testing.T) { require.NoError(t, err) } + packs, err := ts.ds.ListPacks(ctx, fleet.PackListOptions{}) + require.NoError(t, err) + for _, pack := range packs { + err := ts.ds.DeletePack(ctx, pack.Name) + require.NoError(t, err) + } + // SyncHostsSoftware performs a cleanup. err = ts.ds.SyncHostsSoftware(ctx, time.Now()) require.NoError(t, err)