Fix queries stats ingestion (Performance impact) (#13432)

#13318

- [X] Changes file added for user-visible changes in `changes/` or
`orbit/changes/`.
See [Changes
files](https://fleetdm.com/docs/contributing/committing-changes#changes-files)
for more information.
- ~[ ] Documented any API changes (docs/Using-Fleet/REST-API.md or
docs/Contributing/API-for-contributors.md)~
- ~[ ] Documented any permissions changes~
- ~[ ] Input data is properly validated, `SELECT *` is avoided, SQL
injection is prevented (using placeholders for values in statements)~
- ~[ ] Added support on fleet's osquery simulator `cmd/osquery-perf` for
new osquery data ingestion features.~
- [x] Added/updated tests
- [X] Manual QA for all new/changed functionality
  - ~For Orbit and Fleet Desktop changes:~
- ~[ ] Manual QA must be performed in the three main OSs, macOS, Windows
and Linux.~
- ~[ ] Auto-update manual QA, from released version of component to new
version (see [tools/tuf/test](../tools/tuf/test/README.md)).~
This commit is contained in:
Lucas Manuel Rodriguez 2023-09-01 15:14:49 -03:00 committed by GitHub
parent 7d6264b46b
commit 03caba2030
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 391 additions and 7 deletions

View file

@ -0,0 +1 @@
* Fixed a bug where query stats (aka `Performance impact`) were not being populated in Fleet.

View file

@ -164,7 +164,7 @@ func saveHostPackStatsDB(ctx context.Context, db *sqlx.DB, teamID *uint, hostID
}
scheduledQueriesArgs = append(scheduledQueriesArgs,
teamIDArg,
query.QueryName,
query.ScheduledQueryName,
hostID,
query.AverageMemory,

View file

@ -9,8 +9,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"net/url"
@ -21,14 +19,19 @@ import (
"testing"
"time"
"github.com/WatchBeam/clock"
"github.com/fleetdm/fleet/v4/pkg/fleethttp"
"github.com/fleetdm/fleet/v4/server"
"github.com/fleetdm/fleet/v4/server/config"
"github.com/fleetdm/fleet/v4/server/datastore/mysql"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/fleetdm/fleet/v4/server/live_query/live_query_mock"
"github.com/fleetdm/fleet/v4/server/ptr"
"github.com/fleetdm/fleet/v4/server/service/async"
"github.com/fleetdm/fleet/v4/server/service/osquery_utils"
"github.com/fleetdm/fleet/v4/server/test"
"github.com/ghodss/yaml"
"github.com/go-kit/kit/log"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/assert"
@ -670,7 +673,7 @@ func (s *integrationTestSuite) TestVulnerableSoftware() {
require.True(t, inserted)
resp := s.Do("GET", fmt.Sprintf("/api/latest/fleet/hosts/%d", host.ID), nil, http.StatusOK)
bodyBytes, err := ioutil.ReadAll(resp.Body)
bodyBytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)
expectedJSONSoft2 := `"name": "bar",
@ -696,7 +699,7 @@ func (s *integrationTestSuite) TestVulnerableSoftware() {
// no software host counts have been calculated yet, so this returns nothing
var lsResp listSoftwareResponse
resp = s.Do("GET", "/api/latest/fleet/software", nil, http.StatusOK, "vulnerable", "true", "order_key", "generated_cpe", "order_direction", "desc")
bodyBytes, err = ioutil.ReadAll(resp.Body)
bodyBytes, err = io.ReadAll(resp.Body)
require.NoError(t, err)
assert.Contains(t, string(bodyBytes), `"counts_updated_at": null`)
@ -2173,7 +2176,7 @@ func (s *integrationTestSuite) TestHostDetailsPolicies() {
require.NoError(t, err)
resp := s.Do("GET", fmt.Sprintf("/api/latest/fleet/hosts/%d", host1.ID), nil, http.StatusOK)
b, err := ioutil.ReadAll(resp.Body)
b, err := io.ReadAll(resp.Body)
require.NoError(t, err)
var r struct {
Host *HostDetailResponse `json:"host"`
@ -6393,7 +6396,6 @@ func (s *integrationTestSuite) TestAppleMDMNotConfigured() {
for _, route := range mdmAppleConfigurationRequiredEndpoints() {
which := fmt.Sprintf("%s %s", route.method, route.path)
log.Print(which)
var expectedErr fleet.ErrWithStatusCode = fleet.ErrMDMNotConfigured
if route.premiumOnly && route.deviceAuthenticated {
// user-authenticated premium-only routes will never see the ErrMissingLicense error
@ -6889,3 +6891,369 @@ const (
}
}`
)
func (s *integrationTestSuite) TestDirectIngestScheduledQueryStats() {
t := s.T()
team1, err := s.ds.NewTeam(context.Background(), &fleet.Team{
Name: "Foobar",
})
require.NoError(t, err)
team2, err := s.ds.NewTeam(context.Background(), &fleet.Team{
Name: "Zoo",
})
require.NoError(t, err)
globalHost, err := s.ds.NewHost(context.Background(), &fleet.Host{
DetailUpdatedAt: time.Now(),
LabelUpdatedAt: time.Now(),
PolicyUpdatedAt: time.Now(),
SeenTime: time.Now().Add(-1 * time.Minute),
OsqueryHostID: ptr.String(uuid.New().String()),
NodeKey: ptr.String(uuid.New().String()),
UUID: uuid.New().String(),
Hostname: fmt.Sprintf("%sfoo.global", t.Name()),
Platform: "darwin",
})
require.NoError(t, err)
team1Host, err := s.ds.NewHost(context.Background(), &fleet.Host{
DetailUpdatedAt: time.Now(),
LabelUpdatedAt: time.Now(),
PolicyUpdatedAt: time.Now(),
SeenTime: time.Now().Add(-1 * time.Minute),
OsqueryHostID: ptr.String(uuid.New().String()),
NodeKey: ptr.String(uuid.New().String()),
UUID: uuid.New().String(),
Hostname: fmt.Sprintf("%sfoo.team", t.Name()),
Platform: "darwin",
TeamID: &team1.ID,
})
require.NoError(t, err)
scheduledGlobalQuery, err := s.ds.NewQuery(context.Background(), &fleet.Query{
Name: "scheduled-global-query",
TeamID: nil,
Interval: 10,
Platform: "darwin",
AutomationsEnabled: true,
Logging: "snapshot",
Description: "foobar",
Query: "SELECT * from time;",
Saved: true,
})
require.NoError(t, err)
nonScheduledGlobalQuery, err := s.ds.NewQuery(context.Background(), &fleet.Query{
Name: "non-scheduled-global-query",
TeamID: nil,
Interval: 0,
Platform: "darwin",
AutomationsEnabled: false,
Logging: "snapshot",
Description: "foobar",
Query: "SELECT * from osquery_info;",
Saved: true,
})
require.NoError(t, err)
scheduledTeam1Query1, err := s.ds.NewQuery(context.Background(), &fleet.Query{
Name: "scheduled-team1-query1",
TeamID: &team1.ID,
Interval: 20,
Platform: "",
AutomationsEnabled: true,
Logging: "snapshot",
Description: "foobar",
Query: "SELECT * from other;",
Saved: true,
})
require.NoError(t, err)
scheduledTeam1Query2, err := s.ds.NewQuery(context.Background(), &fleet.Query{
Name: "scheduled-team1-query2",
TeamID: &team1.ID,
Interval: 90,
Platform: "",
AutomationsEnabled: true,
Logging: "snapshot",
Description: "foobar",
Query: "SELECT * from other;",
Saved: true,
})
require.NoError(t, err)
// Create a non-scheduled query to test that we filter it out when providing
// the queries in the osquery/config endpoint.
_, err = s.ds.NewQuery(context.Background(), &fleet.Query{
Name: "non-scheduled-team1-query",
TeamID: &team1.ID,
Interval: 0,
Platform: "",
AutomationsEnabled: false,
Logging: "snapshot",
Description: "foobar",
Query: "SELECT * from foobar;",
Saved: true,
})
require.NoError(t, err)
// Create a scheduled query but on another team to test that we filter it
// out when providing the queries in the osquery/config endpoint.
_, err = s.ds.NewQuery(context.Background(), &fleet.Query{
Name: "scheduled-team2-query",
TeamID: &team2.ID,
Interval: 40,
Platform: "",
AutomationsEnabled: true,
Logging: "snapshot",
Description: "foobar",
Query: "SELECT * from other;",
Saved: true,
})
require.NoError(t, err)
// Create a legacy 2017 user pack with one query.
userPack1TargetTeam1, err := s.ds.NewPack(context.Background(), &fleet.Pack{
Name: "2017 Pack",
Type: nil,
Teams: []fleet.Target{{TargetID: team1.ID, Type: fleet.TargetTeam}},
TeamIDs: []uint{team1.ID},
})
require.NoError(t, err)
scheduledQueryOnPack1, err := s.ds.NewScheduledQuery(context.Background(), &fleet.ScheduledQuery{
Name: "scheduled-query-pack1",
PackID: userPack1TargetTeam1.ID,
QueryID: nonScheduledGlobalQuery.ID,
Interval: 60,
Snapshot: ptr.Bool(true),
Removed: ptr.Bool(true),
})
require.NoError(t, err)
// Simulate the osquery instance of the global host calling the osquery/config endpoint
// and test the returned scheduled queries.
req := getClientConfigRequest{NodeKey: *globalHost.NodeKey}
var resp getClientConfigResponse
s.DoJSON("POST", "/api/osquery/config", req, http.StatusOK, &resp)
packs := resp.Config["packs"].(map[string]interface{})
require.Len(t, packs, 1)
globalQueries := packs["Global"].(map[string]interface{})["queries"].(map[string]interface{})
require.Len(t, globalQueries, 1)
require.Contains(t, globalQueries, scheduledGlobalQuery.Name)
// Simulate the osquery instance of the team host calling the osquery/config endpoint
// and test the returned scheduled queries.
req = getClientConfigRequest{NodeKey: *team1Host.NodeKey}
resp = getClientConfigResponse{}
s.DoJSON("POST", "/api/osquery/config", req, http.StatusOK, &resp)
packs = resp.Config["packs"].(map[string]interface{})
require.Len(t, packs, 3)
globalQueries = packs["Global"].(map[string]interface{})["queries"].(map[string]interface{})
require.Len(t, globalQueries, 1)
require.Contains(t, globalQueries, scheduledGlobalQuery.Name)
team1Queries := packs[fmt.Sprintf("team-%d", team1.ID)].(map[string]interface{})["queries"].(map[string]interface{})
require.Len(t, team1Queries, 2)
require.Contains(t, team1Queries, scheduledTeam1Query1.Name)
require.Contains(t, team1Queries, scheduledTeam1Query2.Name)
userPack1Queries := packs[userPack1TargetTeam1.Name].(map[string]interface{})["queries"].(map[string]interface{})
require.Len(t, userPack1Queries, 1)
require.Contains(t, userPack1Queries, scheduledQueryOnPack1.Name)
// Now let's simulate a osquery instance running in the team host returning the
// stats in the distributed/write (osquery_schedule table)
rows := []map[string]string{
{
"name": "pack/Global/scheduled-global-query",
"query": "SELECT * FROM time;",
"interval": "10",
"executions": "2",
"last_executed": "1693476753",
"denylisted": "0",
"output_size": "576",
"wall_time": "1",
"wall_time_ms": "2",
"last_wall_time_ms": "3",
"user_time": "4",
"last_user_time": "5",
"system_time": "6",
"last_system_time": "7",
"average_memory": "8",
"last_memory": "9",
"delimiter": "/",
},
{
"name": "pack/2017 Pack/scheduled-query-pack1",
"query": "SELECT * FROM osquery_info;",
"interval": "60",
"executions": "20",
"last_executed": "1693476842",
"denylisted": "0",
"output_size": "9620",
"wall_time": "9",
"wall_time_ms": "8",
"last_wall_time_ms": "7",
"user_time": "6",
"last_user_time": "5",
"system_time": "4",
"last_system_time": "3",
"average_memory": "2",
"last_memory": "1",
"delimiter": "/",
},
{
"name": fmt.Sprintf("pack/team-%d/scheduled-team1-query1", team1.ID),
"query": "SELECT * FROM other;",
"interval": "20",
"executions": "1",
"last_executed": "1693476561",
"denylisted": "0",
"output_size": "10",
"wall_time": "11",
"wall_time_ms": "12",
"last_wall_time_ms": "13",
"user_time": "14",
"last_user_time": "15",
"system_time": "16",
"last_system_time": "17",
"average_memory": "18",
"last_memory": "19",
"delimiter": "/",
},
{
"name": fmt.Sprintf("pack/team-%d/scheduled-team1-query2", team1.ID),
"query": "SELECT * FROM other;",
"interval": "90",
"executions": "5",
"last_executed": "1693476666",
"denylisted": "0",
"output_size": "20",
"wall_time": "21",
"wall_time_ms": "22",
"last_wall_time_ms": "23",
"user_time": "24",
"last_user_time": "25",
"system_time": "26",
"last_system_time": "27",
"average_memory": "28",
"last_memory": "29",
"delimiter": "/",
},
}
appConfig, err := s.ds.AppConfig(context.Background())
require.NoError(t, err)
detailQueries := osquery_utils.GetDetailQueries(context.Background(), config.FleetConfig{
App: config.AppConfig{
EnableScheduledQueryStats: true,
},
}, appConfig, &appConfig.Features)
task := async.NewTask(s.ds, nil, clock.C, config.OsqueryConfig{})
err = detailQueries["scheduled_query_stats"].DirectTaskIngestFunc(
context.Background(),
log.NewNopLogger(),
team1Host,
task,
rows,
)
require.NoError(t, err)
// Check that the received stats were stored in the DB as expected.
var scheduledQueriesStats []fleet.ScheduledQueryStats
mysql.ExecAdhocSQL(t, s.ds, func(q sqlx.ExtContext) error {
return sqlx.SelectContext(context.Background(), q, &scheduledQueriesStats,
`SELECT
scheduled_query_id, q.name AS scheduled_query_name, average_memory, denylisted,
executions, q.schedule_interval, last_executed,
output_size, system_time, user_time, wall_time
FROM scheduled_query_stats sqs
JOIN queries q ON sqs.scheduled_query_id = q.id
WHERE host_id = ?;`,
team1Host.ID,
)
})
require.Len(t, scheduledQueriesStats, 4)
rowsMap := make(map[string]map[string]string)
for _, row := range rows {
parts := strings.Split(row["name"], "/")
queryName := parts[len(parts)-1]
// we need to map this because 2017 packs send the name of the schedule and not
// the name of the query.
if queryName == "scheduled-query-pack1" {
queryName = "non-scheduled-global-query"
}
rowsMap[queryName] = row
}
for _, sqs := range scheduledQueriesStats {
row := rowsMap[sqs.ScheduledQueryName]
require.Equal(t, strconv.FormatInt(int64(sqs.AverageMemory), 10), row["average_memory"])
require.Equal(t, strconv.FormatInt(int64(sqs.Executions), 10), row["executions"])
interval := row["interval"]
if sqs.ScheduledQueryName == "non-scheduled-global-query" {
interval = "0" // this query has metrics because it runs on a pack.
}
require.Equal(t, strconv.FormatInt(int64(sqs.Interval), 10), interval)
lastExecuted, err := strconv.ParseInt(row["last_executed"], 10, 64)
require.NoError(t, err)
require.WithinDuration(t, sqs.LastExecuted, time.Unix(lastExecuted, 0), 1*time.Second)
require.Equal(t, strconv.FormatInt(int64(sqs.OutputSize), 10), row["output_size"])
require.Equal(t, strconv.FormatInt(int64(sqs.SystemTime), 10), row["system_time"])
require.Equal(t, strconv.FormatInt(int64(sqs.UserTime), 10), row["user_time"])
require.Equal(t, strconv.FormatInt(int64(sqs.WallTime), 10), row["wall_time"])
}
// Now let's simulate a osquery instance running in the global host returning the
// stats in the distributed/write (osquery_schedule table)
rows = []map[string]string{
{
"name": "pack/Global/scheduled-global-query",
"query": "SELECT * FROM time;",
"interval": "10",
"executions": "2",
"last_executed": "1693476753",
"denylisted": "0",
"output_size": "576",
"wall_time": "1",
"wall_time_ms": "2",
"last_wall_time_ms": "3",
"user_time": "4",
"last_user_time": "5",
"system_time": "6",
"last_system_time": "7",
"average_memory": "8",
"last_memory": "9",
"delimiter": "/",
},
}
err = detailQueries["scheduled_query_stats"].DirectTaskIngestFunc(
context.Background(),
log.NewNopLogger(),
globalHost,
task,
rows,
)
require.NoError(t, err)
// Check that the received stats were stored in the DB as expected.
scheduledQueriesStats = []fleet.ScheduledQueryStats{}
mysql.ExecAdhocSQL(t, s.ds, func(q sqlx.ExtContext) error {
return sqlx.SelectContext(context.Background(), q, &scheduledQueriesStats,
`SELECT
scheduled_query_id, q.name AS scheduled_query_name, average_memory, denylisted,
executions, q.schedule_interval, last_executed,
output_size, system_time, user_time, wall_time
FROM scheduled_query_stats sqs
JOIN queries q ON sqs.scheduled_query_id = q.id
WHERE host_id = ?;`,
globalHost.ID,
)
})
require.Len(t, scheduledQueriesStats, 1)
row := rows[0]
parts := strings.Split(row["name"], "/")
queryName := parts[len(parts)-1]
sqs := scheduledQueriesStats[0]
require.Equal(t, scheduledQueriesStats[0].ScheduledQueryName, queryName)
require.Equal(t, strconv.FormatInt(int64(sqs.AverageMemory), 10), row["average_memory"])
require.Equal(t, strconv.FormatInt(int64(sqs.Executions), 10), row["executions"])
require.Equal(t, strconv.FormatInt(int64(sqs.Interval), 10), row["interval"])
lastExecuted, err := strconv.ParseInt(row["last_executed"], 10, 64)
require.NoError(t, err)
require.WithinDuration(t, sqs.LastExecuted, time.Unix(lastExecuted, 0), 1*time.Second)
require.Equal(t, strconv.FormatInt(int64(sqs.OutputSize), 10), row["output_size"])
require.Equal(t, strconv.FormatInt(int64(sqs.SystemTime), 10), row["system_time"])
require.Equal(t, strconv.FormatInt(int64(sqs.UserTime), 10), row["user_time"])
require.Equal(t, strconv.FormatInt(int64(sqs.WallTime), 10), row["wall_time"])
}

View file

@ -335,6 +335,14 @@ func (r getClientConfigResponse) MarshalJSON() ([]byte, error) {
return json.Marshal(r.Config)
}
// UnmarshalJSON implements json.Unmarshaler.
//
// Osquery expects the response for configs to be at the
// top-level of the JSON response.
func (r *getClientConfigResponse) UnmarshalJSON(data []byte) error {
return json.Unmarshal(data, &r.Config)
}
func getClientConfigEndpoint(ctx context.Context, request interface{}, svc fleet.Service) (errorer, error) {
config, err := svc.GetClientConfig(ctx)
if err != nil {

View file

@ -152,6 +152,13 @@ func (ts *withServer) commonTearDownTest(t *testing.T) {
require.NoError(t, err)
}
packs, err := ts.ds.ListPacks(ctx, fleet.PackListOptions{})
require.NoError(t, err)
for _, pack := range packs {
err := ts.ds.DeletePack(ctx, pack.Name)
require.NoError(t, err)
}
// SyncHostsSoftware performs a cleanup.
err = ts.ds.SyncHostsSoftware(ctx, time.Now())
require.NoError(t, err)