From fe5660e00647660c74b3cdb93e3543ab4acd161d Mon Sep 17 00:00:00 2001 From: Tomas Touceda Date: Tue, 19 Oct 2021 17:47:37 -0300 Subject: [PATCH] Reimplement host expiration to not need mysql events (#2552) * Reimplement host expiration to not need mysql events * Update mocks --- .../issue-2544-reimplement-host-expiration | 1 + cmd/fleet/serve.go | 4 + server/datastore/mysql/app_configs.go | 75 ++----------------- server/datastore/mysql/hosts.go | 15 ++++ server/datastore/mysql/hosts_test.go | 61 +++++++++++++++ server/fleet/datastore.go | 1 + server/mock/datastore_mock.go | 18 ++++- server/service/global_policies_test.go | 2 +- server/service/team_policies_test.go | 2 +- 9 files changed, 103 insertions(+), 76 deletions(-) create mode 100644 changes/issue-2544-reimplement-host-expiration diff --git a/changes/issue-2544-reimplement-host-expiration b/changes/issue-2544-reimplement-host-expiration new file mode 100644 index 0000000000..43c92985dc --- /dev/null +++ b/changes/issue-2544-reimplement-host-expiration @@ -0,0 +1 @@ +* Reimplement host expiration to not depend on mysql events. diff --git a/cmd/fleet/serve.go b/cmd/fleet/serve.go index 7d56eb18b3..d81defd226 100644 --- a/cmd/fleet/serve.go +++ b/cmd/fleet/serve.go @@ -498,6 +498,10 @@ func cronCleanups(ctx context.Context, ds fleet.Datastore, logger kitlog.Logger, if err != nil { level.Error(logger).Log("err", "cleaning label_membership", "details", err) } + err = ds.CleanupExpiredHosts(ctx) + if err != nil { + level.Error(logger).Log("err", "cleaning expired hosts", "details", err) + } err = trySendStatistics(ctx, ds, fleet.StatisticsFrequency, "https://fleetdm.com/api/v1/webhooks/receive-usage-analytics") if err != nil { diff --git a/server/datastore/mysql/app_configs.go b/server/datastore/mysql/app_configs.go index df63a4d5dc..9097461a45 100644 --- a/server/datastore/mysql/app_configs.go +++ b/server/datastore/mysql/app_configs.go @@ -4,11 +4,8 @@ import ( "context" "database/sql" "encoding/json" - "fmt" - "github.com/VividCortex/mysqlerr" "github.com/fleetdm/fleet/v4/server/fleet" - "github.com/go-sql-driver/mysql" "github.com/jmoiron/sqlx" "github.com/pkg/errors" ) @@ -24,9 +21,13 @@ func (d *Datastore) NewAppConfig(ctx context.Context, info *fleet.AppConfig) (*f } func (d *Datastore) AppConfig(ctx context.Context) (*fleet.AppConfig, error) { + return appConfigDB(ctx, d.reader) +} + +func appConfigDB(ctx context.Context, q sqlx.QueryerContext) (*fleet.AppConfig, error) { info := &fleet.AppConfig{} var bytes []byte - err := sqlx.GetContext(ctx, d.reader, &bytes, `SELECT json_value FROM app_config_json LIMIT 1`) + err := sqlx.GetContext(ctx, q, &bytes, `SELECT json_value FROM app_config_json LIMIT 1`) if err != nil && err != sql.ErrNoRows { return nil, errors.Wrap(err, "selecting app config") } @@ -43,79 +44,13 @@ func (d *Datastore) AppConfig(ctx context.Context) (*fleet.AppConfig, error) { return info, nil } -func (d *Datastore) isEventSchedulerEnabled() (bool, error) { - rows, err := d.writer.Query("SELECT @@event_scheduler") - if err != nil { - return false, err - } - defer rows.Close() - - if !rows.Next() { - err := errors.New("Error detecting MySQL event scheduler status.") - if rerr := rows.Err(); rerr != nil { - err = rerr - } - return false, err - } - var value string - if err := rows.Scan(&value); err != nil { - return false, err - } - - return value == "ON", nil -} - -func manageHostExpiryEventDB(ctx context.Context, tx sqlx.ExtContext, hostExpiryEnabled bool, hostExpiryWindow int) error { - var err error - hostExpiryConfig := struct { - Window int `db:"host_expiry_window"` - }{} - if err = sqlx.GetContext(ctx, tx, &hostExpiryConfig, "SELECT host_expiry_window from app_configs LIMIT 1"); err != nil { - return errors.Wrap(err, "get expiry window setting") - } - - shouldUpdateWindow := hostExpiryEnabled && hostExpiryConfig.Window != hostExpiryWindow - - if !hostExpiryEnabled || shouldUpdateWindow { - if _, err := tx.ExecContext(ctx, "DROP EVENT IF EXISTS host_expiry"); err != nil { - if driverErr, ok := err.(*mysql.MySQLError); !ok || driverErr.Number != mysqlerr.ER_DBACCESS_DENIED_ERROR { - return errors.Wrap(err, "drop existing host_expiry event") - } - } - } - - if shouldUpdateWindow { - sql := fmt.Sprintf("CREATE EVENT IF NOT EXISTS host_expiry ON SCHEDULE EVERY 1 HOUR ON COMPLETION PRESERVE DO DELETE FROM hosts WHERE seen_time < DATE_SUB(NOW(), INTERVAL %d DAY)", hostExpiryWindow) - if _, err := tx.ExecContext(ctx, sql); err != nil { - return errors.Wrap(err, "create new host_expiry event") - } - } - return nil -} - func (d *Datastore) SaveAppConfig(ctx context.Context, info *fleet.AppConfig) error { - eventSchedulerEnabled, err := d.isEventSchedulerEnabled() - if err != nil { - return err - } - - expiryEnabled := info.HostExpirySettings.HostExpiryEnabled - expiryWindow := info.HostExpirySettings.HostExpiryWindow - - if !eventSchedulerEnabled && expiryEnabled { - return errors.New("MySQL event scheduler must be enabled to configure host expiry.") - } - configBytes, err := json.Marshal(info) if err != nil { return errors.Wrap(err, "marshaling config") } return d.withTx(ctx, func(tx sqlx.ExtContext) error { - if err := manageHostExpiryEventDB(ctx, tx, expiryEnabled, expiryWindow); err != nil { - return err - } - _, err := tx.ExecContext(ctx, `INSERT INTO app_config_json(json_value) VALUES(?) ON DUPLICATE KEY UPDATE json_value = VALUES(json_value)`, configBytes, diff --git a/server/datastore/mysql/hosts.go b/server/datastore/mysql/hosts.go index a402a30656..2d5dadea71 100644 --- a/server/datastore/mysql/hosts.go +++ b/server/datastore/mysql/hosts.go @@ -951,3 +951,18 @@ func (d *Datastore) ListPoliciesForHost(ctx context.Context, hid uint) (packs [] } return policies, nil } + +func (d *Datastore) CleanupExpiredHosts(ctx context.Context) error { + ac, err := appConfigDB(ctx, d.reader) + if err != nil { + return errors.Wrap(err, "getting app config") + } + if !ac.HostExpirySettings.HostExpiryEnabled { + return nil + } + _, err = d.writer.ExecContext(ctx, `DELETE FROM hosts WHERE seen_time < DATE_SUB(NOW(), INTERVAL ? DAY)`, ac.HostExpirySettings.HostExpiryWindow) + if err != nil { + return errors.Wrap(err, "deleting expired hosts") + } + return nil +} diff --git a/server/datastore/mysql/hosts_test.go b/server/datastore/mysql/hosts_test.go index c0c2294734..65c16be215 100644 --- a/server/datastore/mysql/hosts_test.go +++ b/server/datastore/mysql/hosts_test.go @@ -84,6 +84,7 @@ func TestHosts(t *testing.T) { {"AuthenticateHostLoadsDisk", testAuthenticateHostLoadsDisk}, {"HostsListBySoftware", testHostsListBySoftware}, {"HostsListFailingPolicies", testHostsListFailingPolicies}, + {"HostsExpiration", testHostsExpiration}, } for _, c := range cases { t.Run(c.name, func(t *testing.T) { @@ -1972,3 +1973,63 @@ func testHostsSavePackStatsConcurrent(t *testing.T, ds *Datastore) { require.Fail(t, "timed out") } } + +func testHostsExpiration(t *testing.T, ds *Datastore) { + hostExpiryWindow := 70 + + ac, err := ds.AppConfig(context.Background()) + require.NoError(t, err) + + ac.HostExpirySettings.HostExpiryWindow = hostExpiryWindow + + err = ds.SaveAppConfig(context.Background(), ac) + require.NoError(t, err) + + for i := 0; i < 10; i++ { + seenTime := time.Now() + if i >= 5 { + seenTime = seenTime.Add(time.Duration(-1*(hostExpiryWindow+1)*24) * time.Hour) + } + _, err := ds.NewHost(context.Background(), &fleet.Host{ + DetailUpdatedAt: time.Now(), + LabelUpdatedAt: time.Now(), + PolicyUpdatedAt: time.Now(), + SeenTime: seenTime, + OsqueryHostID: strconv.Itoa(i), + NodeKey: fmt.Sprintf("%d", i), + UUID: fmt.Sprintf("%d", i), + Hostname: fmt.Sprintf("foo.local%d", i), + }) + require.NoError(t, err) + } + + filter := fleet.TeamFilter{User: test.UserAdmin} + + hosts := listHostsCheckCount(t, ds, filter, fleet.HostListOptions{}, 10) + require.Len(t, hosts, 10) + + err = ds.CleanupExpiredHosts(context.Background()) + require.NoError(t, err) + + // host expiration is still disabled + hosts = listHostsCheckCount(t, ds, filter, fleet.HostListOptions{}, 10) + require.Len(t, hosts, 10) + + // once enabled, it works + ac.HostExpirySettings.HostExpiryEnabled = true + err = ds.SaveAppConfig(context.Background(), ac) + require.NoError(t, err) + + err = ds.CleanupExpiredHosts(context.Background()) + require.NoError(t, err) + + hosts = listHostsCheckCount(t, ds, filter, fleet.HostListOptions{}, 5) + require.Len(t, hosts, 5) + + // And it doesn't remove more than it should + err = ds.CleanupExpiredHosts(context.Background()) + require.NoError(t, err) + + hosts = listHostsCheckCount(t, ds, filter, fleet.HostListOptions{}, 5) + require.Len(t, hosts, 5) +} diff --git a/server/fleet/datastore.go b/server/fleet/datastore.go index 6df9ba8d3a..00fdd9d597 100644 --- a/server/fleet/datastore.go +++ b/server/fleet/datastore.go @@ -295,6 +295,7 @@ type Datastore interface { ScheduledQuery(ctx context.Context, id uint) (*ScheduledQuery, error) CleanupOrphanScheduledQueryStats(ctx context.Context) error CleanupOrphanLabelMembership(ctx context.Context) error + CleanupExpiredHosts(ctx context.Context) error /////////////////////////////////////////////////////////////////////////////// // TeamStore diff --git a/server/mock/datastore_mock.go b/server/mock/datastore_mock.go index f9b4f01c02..e9732210f3 100644 --- a/server/mock/datastore_mock.go +++ b/server/mock/datastore_mock.go @@ -231,6 +231,8 @@ type CleanupOrphanScheduledQueryStatsFunc func(ctx context.Context) error type CleanupOrphanLabelMembershipFunc func(ctx context.Context) error +type CleanupExpiredHostsFunc func(ctx context.Context) error + type NewTeamFunc func(ctx context.Context, team *fleet.Team) (*fleet.Team, error) type SaveTeamFunc func(ctx context.Context, team *fleet.Team) (*fleet.Team, error) @@ -269,7 +271,7 @@ type ShouldSendStatisticsFunc func(ctx context.Context, frequency time.Duration) type RecordStatisticsSentFunc func(ctx context.Context) error -type NewGlobalPolicyFunc func(ctx context.Context, queryID uint) (*fleet.Policy, error) +type NewGlobalPolicyFunc func(ctx context.Context, queryID uint, resolution string) (*fleet.Policy, error) type PolicyFunc func(ctx context.Context, id uint) (*fleet.Policy, error) @@ -291,7 +293,7 @@ type MigrationStatusFunc func(ctx context.Context) (fleet.MigrationStatus, error type ListSoftwareFunc func(ctx context.Context, teamId *uint, opt fleet.ListOptions) ([]fleet.Software, error) -type NewTeamPolicyFunc func(ctx context.Context, teamID uint, queryID uint) (*fleet.Policy, error) +type NewTeamPolicyFunc func(ctx context.Context, teamID uint, queryID uint, resolution string) (*fleet.Policy, error) type ListTeamPoliciesFunc func(ctx context.Context, teamID uint) ([]*fleet.Policy, error) @@ -634,6 +636,9 @@ type DataStore struct { CleanupOrphanLabelMembershipFunc CleanupOrphanLabelMembershipFunc CleanupOrphanLabelMembershipFuncInvoked bool + CleanupExpiredHostsFunc CleanupExpiredHostsFunc + CleanupExpiredHostsFuncInvoked bool + NewTeamFunc NewTeamFunc NewTeamFuncInvoked bool @@ -1293,6 +1298,11 @@ func (s *DataStore) CleanupOrphanLabelMembership(ctx context.Context) error { return s.CleanupOrphanLabelMembershipFunc(ctx) } +func (s *DataStore) CleanupExpiredHosts(ctx context.Context) error { + s.CleanupExpiredHostsFuncInvoked = true + return s.CleanupExpiredHostsFunc(ctx) +} + func (s *DataStore) NewTeam(ctx context.Context, team *fleet.Team) (*fleet.Team, error) { s.NewTeamFuncInvoked = true return s.NewTeamFunc(ctx, team) @@ -1390,7 +1400,7 @@ func (s *DataStore) RecordStatisticsSent(ctx context.Context) error { func (s *DataStore) NewGlobalPolicy(ctx context.Context, queryID uint, resolution string) (*fleet.Policy, error) { s.NewGlobalPolicyFuncInvoked = true - return s.NewGlobalPolicyFunc(ctx, queryID) + return s.NewGlobalPolicyFunc(ctx, queryID, resolution) } func (s *DataStore) Policy(ctx context.Context, id uint) (*fleet.Policy, error) { @@ -1445,7 +1455,7 @@ func (s *DataStore) ListSoftware(ctx context.Context, teamId *uint, opt fleet.Li func (s *DataStore) NewTeamPolicy(ctx context.Context, teamID uint, queryID uint, resolution string) (*fleet.Policy, error) { s.NewTeamPolicyFuncInvoked = true - return s.NewTeamPolicyFunc(ctx, teamID, queryID) + return s.NewTeamPolicyFunc(ctx, teamID, queryID, resolution) } func (s *DataStore) ListTeamPolicies(ctx context.Context, teamID uint) ([]*fleet.Policy, error) { diff --git a/server/service/global_policies_test.go b/server/service/global_policies_test.go index 135a1f8312..a5523507fc 100644 --- a/server/service/global_policies_test.go +++ b/server/service/global_policies_test.go @@ -14,7 +14,7 @@ func TestGlobalPoliciesAuth(t *testing.T) { ds := new(mock.Store) svc := newTestService(ds, nil, nil) - ds.NewGlobalPolicyFunc = func(ctx context.Context, queryID uint) (*fleet.Policy, error) { + ds.NewGlobalPolicyFunc = func(ctx context.Context, queryID uint, resolution string) (*fleet.Policy, error) { return nil, nil } ds.ListGlobalPoliciesFunc = func(ctx context.Context) ([]*fleet.Policy, error) { diff --git a/server/service/team_policies_test.go b/server/service/team_policies_test.go index 26714eb0fb..e7555b6d59 100644 --- a/server/service/team_policies_test.go +++ b/server/service/team_policies_test.go @@ -16,7 +16,7 @@ func TestTeamPoliciesAuth(t *testing.T) { ds := new(mock.Store) svc := newTestService(ds, nil, nil) - ds.NewTeamPolicyFunc = func(ctx context.Context, teamID uint, queryID uint) (*fleet.Policy, error) { + ds.NewTeamPolicyFunc = func(ctx context.Context, teamID uint, queryID uint, resolution string) (*fleet.Policy, error) { return &fleet.Policy{}, nil } ds.ListTeamPoliciesFunc = func(ctx context.Context, teamID uint) ([]*fleet.Policy, error) {