mirror of
https://github.com/fleetdm/fleet
synced 2026-05-23 08:58:41 +00:00
Reimplement host expiration to not need mysql events (#2552)
* Reimplement host expiration to not need mysql events * Update mocks
This commit is contained in:
parent
a7420140ce
commit
fe5660e006
9 changed files with 103 additions and 76 deletions
1
changes/issue-2544-reimplement-host-expiration
Normal file
1
changes/issue-2544-reimplement-host-expiration
Normal file
|
|
@ -0,0 +1 @@
|
|||
* Reimplement host expiration to not depend on mysql events.
|
||||
|
|
@ -498,6 +498,10 @@ func cronCleanups(ctx context.Context, ds fleet.Datastore, logger kitlog.Logger,
|
|||
if err != nil {
|
||||
level.Error(logger).Log("err", "cleaning label_membership", "details", err)
|
||||
}
|
||||
err = ds.CleanupExpiredHosts(ctx)
|
||||
if err != nil {
|
||||
level.Error(logger).Log("err", "cleaning expired hosts", "details", err)
|
||||
}
|
||||
|
||||
err = trySendStatistics(ctx, ds, fleet.StatisticsFrequency, "https://fleetdm.com/api/v1/webhooks/receive-usage-analytics")
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -4,11 +4,8 @@ import (
|
|||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/VividCortex/mysqlerr"
|
||||
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
|
@ -24,9 +21,13 @@ func (d *Datastore) NewAppConfig(ctx context.Context, info *fleet.AppConfig) (*f
|
|||
}
|
||||
|
||||
func (d *Datastore) AppConfig(ctx context.Context) (*fleet.AppConfig, error) {
|
||||
return appConfigDB(ctx, d.reader)
|
||||
}
|
||||
|
||||
func appConfigDB(ctx context.Context, q sqlx.QueryerContext) (*fleet.AppConfig, error) {
|
||||
info := &fleet.AppConfig{}
|
||||
var bytes []byte
|
||||
err := sqlx.GetContext(ctx, d.reader, &bytes, `SELECT json_value FROM app_config_json LIMIT 1`)
|
||||
err := sqlx.GetContext(ctx, q, &bytes, `SELECT json_value FROM app_config_json LIMIT 1`)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return nil, errors.Wrap(err, "selecting app config")
|
||||
}
|
||||
|
|
@ -43,79 +44,13 @@ func (d *Datastore) AppConfig(ctx context.Context) (*fleet.AppConfig, error) {
|
|||
return info, nil
|
||||
}
|
||||
|
||||
func (d *Datastore) isEventSchedulerEnabled() (bool, error) {
|
||||
rows, err := d.writer.Query("SELECT @@event_scheduler")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
if !rows.Next() {
|
||||
err := errors.New("Error detecting MySQL event scheduler status.")
|
||||
if rerr := rows.Err(); rerr != nil {
|
||||
err = rerr
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
var value string
|
||||
if err := rows.Scan(&value); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return value == "ON", nil
|
||||
}
|
||||
|
||||
func manageHostExpiryEventDB(ctx context.Context, tx sqlx.ExtContext, hostExpiryEnabled bool, hostExpiryWindow int) error {
|
||||
var err error
|
||||
hostExpiryConfig := struct {
|
||||
Window int `db:"host_expiry_window"`
|
||||
}{}
|
||||
if err = sqlx.GetContext(ctx, tx, &hostExpiryConfig, "SELECT host_expiry_window from app_configs LIMIT 1"); err != nil {
|
||||
return errors.Wrap(err, "get expiry window setting")
|
||||
}
|
||||
|
||||
shouldUpdateWindow := hostExpiryEnabled && hostExpiryConfig.Window != hostExpiryWindow
|
||||
|
||||
if !hostExpiryEnabled || shouldUpdateWindow {
|
||||
if _, err := tx.ExecContext(ctx, "DROP EVENT IF EXISTS host_expiry"); err != nil {
|
||||
if driverErr, ok := err.(*mysql.MySQLError); !ok || driverErr.Number != mysqlerr.ER_DBACCESS_DENIED_ERROR {
|
||||
return errors.Wrap(err, "drop existing host_expiry event")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if shouldUpdateWindow {
|
||||
sql := fmt.Sprintf("CREATE EVENT IF NOT EXISTS host_expiry ON SCHEDULE EVERY 1 HOUR ON COMPLETION PRESERVE DO DELETE FROM hosts WHERE seen_time < DATE_SUB(NOW(), INTERVAL %d DAY)", hostExpiryWindow)
|
||||
if _, err := tx.ExecContext(ctx, sql); err != nil {
|
||||
return errors.Wrap(err, "create new host_expiry event")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Datastore) SaveAppConfig(ctx context.Context, info *fleet.AppConfig) error {
|
||||
eventSchedulerEnabled, err := d.isEventSchedulerEnabled()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
expiryEnabled := info.HostExpirySettings.HostExpiryEnabled
|
||||
expiryWindow := info.HostExpirySettings.HostExpiryWindow
|
||||
|
||||
if !eventSchedulerEnabled && expiryEnabled {
|
||||
return errors.New("MySQL event scheduler must be enabled to configure host expiry.")
|
||||
}
|
||||
|
||||
configBytes, err := json.Marshal(info)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "marshaling config")
|
||||
}
|
||||
|
||||
return d.withTx(ctx, func(tx sqlx.ExtContext) error {
|
||||
if err := manageHostExpiryEventDB(ctx, tx, expiryEnabled, expiryWindow); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err := tx.ExecContext(ctx,
|
||||
`INSERT INTO app_config_json(json_value) VALUES(?) ON DUPLICATE KEY UPDATE json_value = VALUES(json_value)`,
|
||||
configBytes,
|
||||
|
|
|
|||
|
|
@ -951,3 +951,18 @@ func (d *Datastore) ListPoliciesForHost(ctx context.Context, hid uint) (packs []
|
|||
}
|
||||
return policies, nil
|
||||
}
|
||||
|
||||
func (d *Datastore) CleanupExpiredHosts(ctx context.Context) error {
|
||||
ac, err := appConfigDB(ctx, d.reader)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "getting app config")
|
||||
}
|
||||
if !ac.HostExpirySettings.HostExpiryEnabled {
|
||||
return nil
|
||||
}
|
||||
_, err = d.writer.ExecContext(ctx, `DELETE FROM hosts WHERE seen_time < DATE_SUB(NOW(), INTERVAL ? DAY)`, ac.HostExpirySettings.HostExpiryWindow)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "deleting expired hosts")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -84,6 +84,7 @@ func TestHosts(t *testing.T) {
|
|||
{"AuthenticateHostLoadsDisk", testAuthenticateHostLoadsDisk},
|
||||
{"HostsListBySoftware", testHostsListBySoftware},
|
||||
{"HostsListFailingPolicies", testHostsListFailingPolicies},
|
||||
{"HostsExpiration", testHostsExpiration},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
|
|
@ -1972,3 +1973,63 @@ func testHostsSavePackStatsConcurrent(t *testing.T, ds *Datastore) {
|
|||
require.Fail(t, "timed out")
|
||||
}
|
||||
}
|
||||
|
||||
func testHostsExpiration(t *testing.T, ds *Datastore) {
|
||||
hostExpiryWindow := 70
|
||||
|
||||
ac, err := ds.AppConfig(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
ac.HostExpirySettings.HostExpiryWindow = hostExpiryWindow
|
||||
|
||||
err = ds.SaveAppConfig(context.Background(), ac)
|
||||
require.NoError(t, err)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
seenTime := time.Now()
|
||||
if i >= 5 {
|
||||
seenTime = seenTime.Add(time.Duration(-1*(hostExpiryWindow+1)*24) * time.Hour)
|
||||
}
|
||||
_, err := ds.NewHost(context.Background(), &fleet.Host{
|
||||
DetailUpdatedAt: time.Now(),
|
||||
LabelUpdatedAt: time.Now(),
|
||||
PolicyUpdatedAt: time.Now(),
|
||||
SeenTime: seenTime,
|
||||
OsqueryHostID: strconv.Itoa(i),
|
||||
NodeKey: fmt.Sprintf("%d", i),
|
||||
UUID: fmt.Sprintf("%d", i),
|
||||
Hostname: fmt.Sprintf("foo.local%d", i),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
filter := fleet.TeamFilter{User: test.UserAdmin}
|
||||
|
||||
hosts := listHostsCheckCount(t, ds, filter, fleet.HostListOptions{}, 10)
|
||||
require.Len(t, hosts, 10)
|
||||
|
||||
err = ds.CleanupExpiredHosts(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
// host expiration is still disabled
|
||||
hosts = listHostsCheckCount(t, ds, filter, fleet.HostListOptions{}, 10)
|
||||
require.Len(t, hosts, 10)
|
||||
|
||||
// once enabled, it works
|
||||
ac.HostExpirySettings.HostExpiryEnabled = true
|
||||
err = ds.SaveAppConfig(context.Background(), ac)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = ds.CleanupExpiredHosts(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
hosts = listHostsCheckCount(t, ds, filter, fleet.HostListOptions{}, 5)
|
||||
require.Len(t, hosts, 5)
|
||||
|
||||
// And it doesn't remove more than it should
|
||||
err = ds.CleanupExpiredHosts(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
hosts = listHostsCheckCount(t, ds, filter, fleet.HostListOptions{}, 5)
|
||||
require.Len(t, hosts, 5)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -295,6 +295,7 @@ type Datastore interface {
|
|||
ScheduledQuery(ctx context.Context, id uint) (*ScheduledQuery, error)
|
||||
CleanupOrphanScheduledQueryStats(ctx context.Context) error
|
||||
CleanupOrphanLabelMembership(ctx context.Context) error
|
||||
CleanupExpiredHosts(ctx context.Context) error
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// TeamStore
|
||||
|
|
|
|||
|
|
@ -231,6 +231,8 @@ type CleanupOrphanScheduledQueryStatsFunc func(ctx context.Context) error
|
|||
|
||||
type CleanupOrphanLabelMembershipFunc func(ctx context.Context) error
|
||||
|
||||
type CleanupExpiredHostsFunc func(ctx context.Context) error
|
||||
|
||||
type NewTeamFunc func(ctx context.Context, team *fleet.Team) (*fleet.Team, error)
|
||||
|
||||
type SaveTeamFunc func(ctx context.Context, team *fleet.Team) (*fleet.Team, error)
|
||||
|
|
@ -269,7 +271,7 @@ type ShouldSendStatisticsFunc func(ctx context.Context, frequency time.Duration)
|
|||
|
||||
type RecordStatisticsSentFunc func(ctx context.Context) error
|
||||
|
||||
type NewGlobalPolicyFunc func(ctx context.Context, queryID uint) (*fleet.Policy, error)
|
||||
type NewGlobalPolicyFunc func(ctx context.Context, queryID uint, resolution string) (*fleet.Policy, error)
|
||||
|
||||
type PolicyFunc func(ctx context.Context, id uint) (*fleet.Policy, error)
|
||||
|
||||
|
|
@ -291,7 +293,7 @@ type MigrationStatusFunc func(ctx context.Context) (fleet.MigrationStatus, error
|
|||
|
||||
type ListSoftwareFunc func(ctx context.Context, teamId *uint, opt fleet.ListOptions) ([]fleet.Software, error)
|
||||
|
||||
type NewTeamPolicyFunc func(ctx context.Context, teamID uint, queryID uint) (*fleet.Policy, error)
|
||||
type NewTeamPolicyFunc func(ctx context.Context, teamID uint, queryID uint, resolution string) (*fleet.Policy, error)
|
||||
|
||||
type ListTeamPoliciesFunc func(ctx context.Context, teamID uint) ([]*fleet.Policy, error)
|
||||
|
||||
|
|
@ -634,6 +636,9 @@ type DataStore struct {
|
|||
CleanupOrphanLabelMembershipFunc CleanupOrphanLabelMembershipFunc
|
||||
CleanupOrphanLabelMembershipFuncInvoked bool
|
||||
|
||||
CleanupExpiredHostsFunc CleanupExpiredHostsFunc
|
||||
CleanupExpiredHostsFuncInvoked bool
|
||||
|
||||
NewTeamFunc NewTeamFunc
|
||||
NewTeamFuncInvoked bool
|
||||
|
||||
|
|
@ -1293,6 +1298,11 @@ func (s *DataStore) CleanupOrphanLabelMembership(ctx context.Context) error {
|
|||
return s.CleanupOrphanLabelMembershipFunc(ctx)
|
||||
}
|
||||
|
||||
func (s *DataStore) CleanupExpiredHosts(ctx context.Context) error {
|
||||
s.CleanupExpiredHostsFuncInvoked = true
|
||||
return s.CleanupExpiredHostsFunc(ctx)
|
||||
}
|
||||
|
||||
func (s *DataStore) NewTeam(ctx context.Context, team *fleet.Team) (*fleet.Team, error) {
|
||||
s.NewTeamFuncInvoked = true
|
||||
return s.NewTeamFunc(ctx, team)
|
||||
|
|
@ -1390,7 +1400,7 @@ func (s *DataStore) RecordStatisticsSent(ctx context.Context) error {
|
|||
|
||||
func (s *DataStore) NewGlobalPolicy(ctx context.Context, queryID uint, resolution string) (*fleet.Policy, error) {
|
||||
s.NewGlobalPolicyFuncInvoked = true
|
||||
return s.NewGlobalPolicyFunc(ctx, queryID)
|
||||
return s.NewGlobalPolicyFunc(ctx, queryID, resolution)
|
||||
}
|
||||
|
||||
func (s *DataStore) Policy(ctx context.Context, id uint) (*fleet.Policy, error) {
|
||||
|
|
@ -1445,7 +1455,7 @@ func (s *DataStore) ListSoftware(ctx context.Context, teamId *uint, opt fleet.Li
|
|||
|
||||
func (s *DataStore) NewTeamPolicy(ctx context.Context, teamID uint, queryID uint, resolution string) (*fleet.Policy, error) {
|
||||
s.NewTeamPolicyFuncInvoked = true
|
||||
return s.NewTeamPolicyFunc(ctx, teamID, queryID)
|
||||
return s.NewTeamPolicyFunc(ctx, teamID, queryID, resolution)
|
||||
}
|
||||
|
||||
func (s *DataStore) ListTeamPolicies(ctx context.Context, teamID uint) ([]*fleet.Policy, error) {
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ func TestGlobalPoliciesAuth(t *testing.T) {
|
|||
ds := new(mock.Store)
|
||||
svc := newTestService(ds, nil, nil)
|
||||
|
||||
ds.NewGlobalPolicyFunc = func(ctx context.Context, queryID uint) (*fleet.Policy, error) {
|
||||
ds.NewGlobalPolicyFunc = func(ctx context.Context, queryID uint, resolution string) (*fleet.Policy, error) {
|
||||
return nil, nil
|
||||
}
|
||||
ds.ListGlobalPoliciesFunc = func(ctx context.Context) ([]*fleet.Policy, error) {
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ func TestTeamPoliciesAuth(t *testing.T) {
|
|||
ds := new(mock.Store)
|
||||
svc := newTestService(ds, nil, nil)
|
||||
|
||||
ds.NewTeamPolicyFunc = func(ctx context.Context, teamID uint, queryID uint) (*fleet.Policy, error) {
|
||||
ds.NewTeamPolicyFunc = func(ctx context.Context, teamID uint, queryID uint, resolution string) (*fleet.Policy, error) {
|
||||
return &fleet.Policy{}, nil
|
||||
}
|
||||
ds.ListTeamPoliciesFunc = func(ctx context.Context, teamID uint) ([]*fleet.Policy, error) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue