fleet/server/datastore/mysql/campaigns_test.go
Victor Lyuboslavsky b3216a1727
Add CleanupCompletedCampaignTargets to cleanup old campaign targets. (#32385)
Fixes #31432 

- Added campaign target cleanup: Deletes targets from campaigns
completed >24h ago. Uses 10% or 50k min per run, processes in 10k
batches. Added DB index, integrated into hourly cron, includes tests.

# Checklist for submitter

If some of the following don't apply, delete the relevant line.

- [x] Changes file added for user-visible changes in `changes/`,
`orbit/changes/` or `ee/fleetd-chrome/changes`.
See [Changes
files](https://github.com/fleetdm/fleet/blob/main/docs/Contributing/guides/committing-changes.md#changes-files)
for more information.

## Testing

- [x] Added/updated automated tests
- [x] Where appropriate, [automated tests simulate multiple hosts and
test for host
isolation](https://github.com/fleetdm/fleet/blob/main/docs/Contributing/reference/patterns-backend.md#unit-testing)
(updates to one host's records do not affect another)

- [x] QA'd all new/changed functionality manually


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- New Features
- Automatic cleanup of live query campaign targets 24 hours after
campaign completion to reduce clutter and storage usage.

- Chores
- Added a database index to speed up live query target operations for
improved performance at scale.
- Enhanced scheduled maintenance to log cleanup counts and execution
time for better observability.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-08-28 11:04:05 -05:00

380 lines
14 KiB
Go

package mysql
import (
"context"
"fmt"
"math/rand"
"sort"
"testing"
"time"
"github.com/WatchBeam/clock"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/fleetdm/fleet/v4/server/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestCampaigns is the entry point for the campaign datastore tests. It builds
// one datastore with CreateMySQLDS and runs every subtest against it, calling
// TruncateTables when each subtest finishes.
func TestCampaigns(t *testing.T) {
	ds := CreateMySQLDS(t)

	type subtest struct {
		name string
		fn   func(t *testing.T, ds *Datastore)
	}
	subtests := []subtest{
		{"DistributedQuery", testCampaignsDistributedQuery},
		{"CleanupDistributedQuery", testCampaignsCleanupDistributedQuery},
		{"SaveDistributedQuery", testCampaignsSaveDistributedQuery},
		{"CompletedCampaigns", testCompletedCampaigns},
		{"CleanupCompletedCampaignTargets", testCleanupCompletedCampaignTargets},
		{"CleanupCompletedCampaignTargetsLargeBatch", testCleanupCompletedCampaignTargetsLargeBatch},
	}

	for _, st := range subtests {
		t.Run(st.name, func(t *testing.T) {
			// Deferred so that it also runs when the subtest fails or is skipped.
			defer TruncateTables(t, ds)
			st.fn(t, ds)
		})
	}
}
// testCampaignsDistributedQuery creates a running campaign, checks that it can
// be read back with the same query ID and status, and then verifies that
// DistributedQueryCampaignTargetIDs reflects host and label targets as they
// are added to the campaign.
func testCampaignsDistributedQuery(t *testing.T, ds *Datastore) {
	ctx := context.Background()
	user := test.NewUser(t, ds, "Zach", "zwass@fleet.co", true)
	mockClock := clock.NewMockClock()
	query := test.NewQuery(t, ds, nil, "test", "select * from time", user.ID, false)
	campaign := test.NewCampaign(t, ds, query.ID, fleet.QueryRunning, mockClock.Now())

	{
		retrieved, err := ds.DistributedQueryCampaign(ctx, campaign.ID)
		require.NoError(t, err)
		assert.Equal(t, campaign.QueryID, retrieved.QueryID)
		assert.Equal(t, campaign.Status, retrieved.Status)
	}

	h1 := test.NewHost(t, ds, "foo.local", "192.168.1.10", "1", "1", mockClock.Now())
	h2 := test.NewHost(t, ds, "bar.local", "192.168.1.11", "2", "2", mockClock.Now().Add(-1*time.Hour))
	h3 := test.NewHost(t, ds, "baz.local", "192.168.1.12", "3", "3", mockClock.Now().Add(-13*time.Minute))

	l1 := fleet.LabelSpec{
		ID:    1,
		Name:  "label foo",
		Query: "query foo",
	}
	l2 := fleet.LabelSpec{
		ID:    2,
		Name:  "label bar",
		Query: "query bar",
	}
	require.NoError(t, ds.ApplyLabelSpecs(ctx, []*fleet.LabelSpec{&l1, &l2}))

	// A freshly created campaign has no targets at all.
	checkTargets(t, ds, campaign.ID, fleet.HostTargets{})

	// Targets are added step by step; after each step the full expected target
	// set is compared against what the datastore returns.
	test.AddHostToCampaign(t, ds, campaign.ID, h1.ID)
	checkTargets(t, ds, campaign.ID, fleet.HostTargets{HostIDs: []uint{h1.ID}})

	test.AddLabelToCampaign(t, ds, campaign.ID, l1.ID)
	checkTargets(t, ds, campaign.ID, fleet.HostTargets{HostIDs: []uint{h1.ID}, LabelIDs: []uint{l1.ID}})

	test.AddLabelToCampaign(t, ds, campaign.ID, l2.ID)
	checkTargets(t, ds, campaign.ID, fleet.HostTargets{HostIDs: []uint{h1.ID}, LabelIDs: []uint{l1.ID, l2.ID}})

	test.AddHostToCampaign(t, ds, campaign.ID, h2.ID)
	test.AddHostToCampaign(t, ds, campaign.ID, h3.ID)
	checkTargets(t, ds, campaign.ID, fleet.HostTargets{HostIDs: []uint{h1.ID, h2.ID, h3.ID}, LabelIDs: []uint{l1.ID, l2.ID}})
}
// testCampaignsCleanupDistributedQuery drives CleanupDistributedQueryCampaigns
// with a mock clock. After each time jump it checks how many campaigns the
// call reports as expired and which status each campaign has afterwards.
func testCampaignsCleanupDistributedQuery(t *testing.T, ds *Datastore) {
	ctx := context.Background()
	user := test.NewUser(t, ds, "Zach", "zwass@fleet.co", true)
	mockClock := clock.NewMockClock()
	query := test.NewQuery(t, ds, nil, "test", "select * from time", user.ID, false)

	c1 := test.NewCampaign(t, ds, query.ID, fleet.QueryWaiting, mockClock.Now())
	c2 := test.NewCampaign(t, ds, query.ID, fleet.QueryRunning, mockClock.Now())

	// Cleanup and verify that nothing changed (because time has not
	// advanced).
	expired, err := ds.CleanupDistributedQueryCampaigns(ctx, mockClock.Now())
	require.NoError(t, err)
	assert.Equal(t, uint(0), expired)

	{
		retrieved, err := ds.DistributedQueryCampaign(ctx, c1.ID)
		require.NoError(t, err)
		assert.Equal(t, c1.QueryID, retrieved.QueryID)
		assert.Equal(t, c1.Status, retrieved.Status)
	}
	{
		retrieved, err := ds.DistributedQueryCampaign(ctx, c2.ID)
		require.NoError(t, err)
		assert.Equal(t, c2.QueryID, retrieved.QueryID)
		assert.Equal(t, c2.Status, retrieved.Status)
	}

	// Advance the clock just past one minute.
	mockClock.AddTime(1*time.Minute + 1*time.Second)

	// Cleanup and verify that exactly one campaign was expired: the waiting
	// campaign c1 becomes complete while the running campaign c2 is untouched.
	expired, err = ds.CleanupDistributedQueryCampaigns(ctx, mockClock.Now())
	require.NoError(t, err)
	assert.Equal(t, uint(1), expired)

	{
		// c1 should now be complete
		retrieved, err := ds.DistributedQueryCampaign(ctx, c1.ID)
		require.NoError(t, err)
		assert.Equal(t, c1.QueryID, retrieved.QueryID)
		assert.Equal(t, fleet.QueryComplete, retrieved.Status)
	}
	{
		// c2 keeps the status it was created with
		retrieved, err := ds.DistributedQueryCampaign(ctx, c2.ID)
		require.NoError(t, err)
		assert.Equal(t, c2.QueryID, retrieved.QueryID)
		assert.Equal(t, c2.Status, retrieved.Status)
	}

	// Advance the clock by a little more than a day.
	mockClock.AddTime(24*time.Hour + 1*time.Second)

	// Cleanup and verify that the running campaign c2 is now expired as well.
	expired, err = ds.CleanupDistributedQueryCampaigns(ctx, mockClock.Now())
	require.NoError(t, err)
	assert.Equal(t, uint(1), expired)

	{
		retrieved, err := ds.DistributedQueryCampaign(ctx, c1.ID)
		require.NoError(t, err)
		assert.Equal(t, c1.QueryID, retrieved.QueryID)
		assert.Equal(t, fleet.QueryComplete, retrieved.Status)
	}
	{
		// c2 should now be complete
		retrieved, err := ds.DistributedQueryCampaign(ctx, c2.ID)
		require.NoError(t, err)
		assert.Equal(t, c2.QueryID, retrieved.QueryID)
		assert.Equal(t, fleet.QueryComplete, retrieved.Status)
	}

	// Simulate another, much older waiting campaign: created 8 days before the
	// current mock time.
	c3 := test.NewCampaign(t, ds, query.ID, fleet.QueryWaiting, mockClock.Now().AddDate(0, 0, -8))
	{
		retrieved, err := ds.DistributedQueryCampaign(ctx, c3.ID)
		require.NoError(t, err)
		assert.Equal(t, c3.QueryID, retrieved.QueryID)
		assert.Equal(t, fleet.QueryWaiting, retrieved.Status)
	}

	// c3 has been waiting for well over a minute, so this cleanup is expected
	// to expire exactly one more campaign.
	expired, err = ds.CleanupDistributedQueryCampaigns(ctx, mockClock.Now())
	require.NoError(t, err)
	assert.Equal(t, uint(1), expired)

	// Running cleanup again at the same time expires nothing new.
	expired, err = ds.CleanupDistributedQueryCampaigns(ctx, mockClock.Now())
	require.NoError(t, err)
	assert.Equal(t, uint(0), expired)

	// Move time forward 7 days and cleanup again; there is still nothing left
	// to expire.
	mockClock.AddTime(7*24*time.Hour + 1*time.Second)
	expired, err = ds.CleanupDistributedQueryCampaigns(ctx, mockClock.Now())
	require.NoError(t, err)
	assert.Equal(t, uint(0), expired)
}
// testCampaignsSaveDistributedQuery checks that a status change written with
// SaveDistributedQueryCampaign is visible when the campaign is read back.
func testCampaignsSaveDistributedQuery(t *testing.T, ds *Datastore) {
	ctx := context.Background()

	// The subtest name is used to build the user and query identifiers.
	owner := test.NewUser(t, ds, t.Name(), t.Name()+"zwass@fleet.co", true)
	clk := clock.NewMockClock()
	q := test.NewQuery(t, ds, nil, t.Name()+"test", "select * from time", owner.ID, false)
	campaign := test.NewCampaign(t, ds, q.ID, fleet.QueryWaiting, clk.Now())

	// Before saving, the stored campaign is still waiting.
	stored, err := ds.DistributedQueryCampaign(ctx, campaign.ID)
	require.NoError(t, err)
	require.Equal(t, fleet.QueryWaiting, stored.Status)

	// Flip the status in memory and persist it.
	campaign.Status = fleet.QueryComplete
	err = ds.SaveDistributedQueryCampaign(ctx, campaign)
	require.NoError(t, err)

	// The stored campaign now reports the new status.
	stored, err = ds.DistributedQueryCampaign(ctx, campaign.ID)
	require.NoError(t, err)
	require.Equal(t, fleet.QueryComplete, stored.Status)
}
// testCleanupCompletedCampaignTargets verifies that
// CleanupCompletedCampaignTargets, called with a cutoff 24 hours in the past,
// removes the targets of an old completed campaign while leaving the targets
// of a recently completed campaign and of a running campaign in place.
func testCleanupCompletedCampaignTargets(t *testing.T, ds *Datastore) {
	ctx := t.Context()
	mockClock := clock.NewMockClock() // not actually necessary here since we're not testing time progression, but using for consistency
	user := test.NewUser(t, ds, "Test User", "test@example.com", true)
	query := test.NewQuery(t, ds, nil, "Test Query", "SELECT 1", user.ID, false)

	host1 := test.NewHost(t, ds, "host1", "192.168.1.1", "host1_key", "host1_uuid", mockClock.Now())
	host2 := test.NewHost(t, ds, "host2", "192.168.1.2", "host2_key", "host2_uuid", mockClock.Now())

	// The mock clock never advances in this test, so both instants are fixed.
	twoDaysAgo := mockClock.Now().Add(-48 * time.Hour)
	cutoff := mockClock.Now().Add(-24 * time.Hour)

	// backdate overwrites a campaign's updated_at directly so that it predates
	// the cleanup cutoff.
	backdate := func(campaignID uint) {
		t.Helper()
		_, err := ds.writer(ctx).ExecContext(ctx,
			"UPDATE distributed_query_campaigns SET updated_at = ? WHERE id = ?",
			twoDaysAgo, campaignID)
		require.NoError(t, err)
	}

	// Create an old completed campaign (should be cleaned up)
	oldCampaign := test.NewCampaign(t, ds, query.ID, fleet.QueryComplete, twoDaysAgo)
	backdate(oldCampaign.ID)
	test.AddHostToCampaign(t, ds, oldCampaign.ID, host1.ID)
	test.AddHostToCampaign(t, ds, oldCampaign.ID, host2.ID)

	// Create a recent completed campaign (should NOT be cleaned up)
	recentCampaign := test.NewCampaign(t, ds, query.ID, fleet.QueryComplete, mockClock.Now().Add(-1*time.Hour))
	test.AddHostToCampaign(t, ds, recentCampaign.ID, host1.ID)

	// Create a running campaign (should NOT be cleaned up regardless of age).
	// Its updated_at is backdated as well; otherwise it would differ from the
	// old completed campaign in both status and age, and its survival would
	// not show that the status is what protects it.
	runningCampaign := test.NewCampaign(t, ds, query.ID, fleet.QueryRunning, twoDaysAgo)
	backdate(runningCampaign.ID)
	test.AddHostToCampaign(t, ds, runningCampaign.ID, host1.ID)

	// Verify initial state - all targets exist
	oldTargets, err := ds.DistributedQueryCampaignTargetIDs(ctx, oldCampaign.ID)
	require.NoError(t, err)
	assert.Len(t, oldTargets.HostIDs, 2)

	recentTargets, err := ds.DistributedQueryCampaignTargetIDs(ctx, recentCampaign.ID)
	require.NoError(t, err)
	assert.Len(t, recentTargets.HostIDs, 1)

	runningTargets, err := ds.DistributedQueryCampaignTargetIDs(ctx, runningCampaign.ID)
	require.NoError(t, err)
	assert.Len(t, runningTargets.HostIDs, 1)

	// Run cleanup for campaigns older than 24 hours
	deleted, err := ds.CleanupCompletedCampaignTargets(ctx, cutoff)
	require.NoError(t, err)
	assert.Equal(t, uint(2), deleted) // Should delete 2 targets from old campaign

	// Verify old campaign targets are deleted
	oldTargets, err = ds.DistributedQueryCampaignTargetIDs(ctx, oldCampaign.ID)
	require.NoError(t, err)
	assert.Empty(t, oldTargets.HostIDs)

	// Verify recent campaign targets are NOT deleted
	recentTargets, err = ds.DistributedQueryCampaignTargetIDs(ctx, recentCampaign.ID)
	require.NoError(t, err)
	assert.Len(t, recentTargets.HostIDs, 1)

	// Verify running campaign targets are NOT deleted
	runningTargets, err = ds.DistributedQueryCampaignTargetIDs(ctx, runningCampaign.ID)
	require.NoError(t, err)
	assert.Len(t, runningTargets.HostIDs, 1)

	// Run cleanup again - should delete nothing
	deleted, err = ds.CleanupCompletedCampaignTargets(ctx, cutoff)
	require.NoError(t, err)
	assert.Equal(t, uint(0), deleted)
}
// testCleanupCompletedCampaignTargetsLargeBatch attaches 10002 host targets to
// a single old completed campaign and verifies that one call to
// CleanupCompletedCampaignTargets reports and removes all of them. The test is
// skipped unconditionally because of its runtime; remove the t.Skip call to
// run it manually.
func testCleanupCompletedCampaignTargetsLargeBatch(t *testing.T, ds *Datastore) {
	t.Skip("Skipping large batch test due to long test runtime (~1 minute). Only run manually.")

	ctx := t.Context()
	mockClock := clock.NewMockClock()
	user := test.NewUser(t, ds, "Test User", "test@example.com", true)
	query := test.NewQuery(t, ds, nil, "Test Query", "SELECT 1", user.ID, false)

	// Create an old completed campaign
	oldCampaign := test.NewCampaign(t, ds, query.ID, fleet.QueryComplete, mockClock.Now().Add(-48*time.Hour))

	// Manually update the updated_at timestamp for testing
	_, err := ds.writer(ctx).ExecContext(ctx,
		"UPDATE distributed_query_campaigns SET updated_at = ? WHERE id = ?",
		mockClock.Now().Add(-48*time.Hour), oldCampaign.ID)
	require.NoError(t, err)

	// Create many hosts and targets (more than batch size)
	const numTargets = 10002
	for i := 0; i < numTargets; i++ {
		host := test.NewHost(t, ds,
			fmt.Sprintf("host%d", i),
			// Spread the index over two octets so every generated address is
			// a valid, unique IPv4 address (i/256 stays below 256 here).
			fmt.Sprintf("10.0.%d.%d", i/256, i%256),
			fmt.Sprintf("host%d_key", i),
			fmt.Sprintf("host%d_uuid", i),
			mockClock.Now())
		test.AddHostToCampaign(t, ds, oldCampaign.ID, host.ID)
	}

	// Verify initial state
	targets, err := ds.DistributedQueryCampaignTargetIDs(ctx, oldCampaign.ID)
	require.NoError(t, err)
	assert.Len(t, targets.HostIDs, numTargets)

	// Run cleanup
	deleted, err := ds.CleanupCompletedCampaignTargets(ctx, mockClock.Now().Add(-24*time.Hour))
	require.NoError(t, err)
	assert.Equal(t, uint(numTargets), deleted)

	// Verify all targets are deleted
	targets, err = ds.DistributedQueryCampaignTargetIDs(ctx, oldCampaign.ID)
	require.NoError(t, err)
	assert.Empty(t, targets.HostIDs)
}
// checkTargets loads the target IDs stored for campaignID and asserts that the
// host, label and team ID lists contain the same elements as expectedTargets,
// ignoring order. The test stops immediately if the targets cannot be loaded.
func checkTargets(t *testing.T, ds fleet.Datastore, campaignID uint, expectedTargets fleet.HostTargets) {
	// Report failures at the caller's line instead of inside this helper.
	t.Helper()

	targets, err := ds.DistributedQueryCampaignTargetIDs(context.Background(), campaignID)
	require.NoError(t, err)
	assert.ElementsMatch(t, expectedTargets.HostIDs, targets.HostIDs)
	assert.ElementsMatch(t, expectedTargets.LabelIDs, targets.LabelIDs)
	assert.ElementsMatch(t, expectedTargets.TeamIDs, targets.TeamIDs)
}
// testCompletedCampaigns exercises GetCompletedCampaigns with a nil filter,
// with a filter of IDs for which no campaign has been created, and with a
// large shuffled filter that contains the IDs of five campaigns (some of them
// marked complete) mixed with many other IDs, including duplicates.
func testCompletedCampaigns(t *testing.T, ds *Datastore) {
	ctx := context.Background()

	// Nil filter: no error and nothing returned.
	got, err := ds.GetCompletedCampaigns(ctx, nil)
	assert.NoError(t, err)
	assert.Len(t, got, 0)

	// No campaigns have been created yet, so these IDs yield nothing either.
	got, err = ds.GetCompletedCampaigns(ctx, []uint{234, 1, 1, 34455455453})
	assert.NoError(t, err)
	assert.Len(t, got, 0)

	// Now test reasonable results
	owner := test.NewUser(t, ds, t.Name(), t.Name()+"zwass@fleet.co", true)
	clk := clock.NewMockClock()
	q := test.NewQuery(t, ds, nil, t.Name()+"test", "select * from time", owner.ID, false)

	campaignCount := 5
	filterLimit := 100000
	ids := make([]uint, 0, filterLimit)
	wantComplete := make([]uint, 0, campaignCount)

	for n := 0; n < campaignCount; n++ {
		campaign := test.NewCampaign(t, ds, q.ID, fleet.QueryWaiting, clk.Now())

		stored, err := ds.DistributedQueryCampaign(ctx, campaign.ID)
		require.NoError(t, err)
		require.Equal(t, fleet.QueryWaiting, stored.Status)

		// Mark the campaign complete in 7 out of 10 random outcomes.
		if rand.Intn(10) < 7 { //nolint:gosec
			campaign.Status = fleet.QueryComplete
			require.NoError(t, ds.SaveDistributedQueryCampaign(ctx, campaign))
			wantComplete = append(wantComplete, campaign.ID)
		}

		ids = append(ids, campaign.ID)
	}

	// Pad the filter with every ID from half of the last campaign ID up to the
	// filter limit.
	lastID := ids[len(ids)-1]
	for id := lastID / 2; id < uint(filterLimit); id++ { //nolint:gosec // dismiss G115
		// some IDs are duplicated
		ids = append(ids, id)
	}
	rand.Shuffle(len(ids), func(a, b int) {
		ids[a], ids[b] = ids[b], ids[a]
	})

	got, err = ds.GetCompletedCampaigns(ctx, ids)
	assert.NoError(t, err)

	// Sort the result ascending before comparing it with wantComplete, which
	// was built in campaign creation order.
	sort.Slice(got, func(a, b int) bool {
		return got[a] < got[b]
	})
	assert.Equal(t, wantComplete, got)
}