From e8ca9598882deaaceefa6740e109425d35e96aa1 Mon Sep 17 00:00:00 2001 From: Lucas Manuel Rodriguez Date: Thu, 4 Apr 2024 14:58:31 -0300 Subject: [PATCH] Add enterprise integration test for calendar events (#17900) Integration tests for the calendar feature: #17441. Adding coverage screenshots for the calendar cron and the osquery distributed/write coverage: ![Screenshot 2024-03-27 at 14 20 44](https://github.com/fleetdm/fleet/assets/2073526/40d394ab-2208-4bec-981b-fe22fae8b5c1) ![Screenshot 2024-03-27 at 14 21 20](https://github.com/fleetdm/fleet/assets/2073526/1e4c8611-21ba-48a6-82f8-a163594f7f01) --- cmd/fleet/serve.go | 5 +- ee/server/calendar/google_calendar_mock.go | 11 + {cmd/fleet => server/cron}/calendar_cron.go | 15 +- .../cron}/calendar_cron_test.go | 104 +-- server/datastore/mysql/calendar_events.go | 2 +- server/datastore/mysql/policies.go | 1 - server/fleet/datastore.go | 5 + server/service/integration_enterprise_test.go | 676 +++++++++++++++++- server/service/schedule/schedule.go | 10 +- server/service/testing_client.go | 6 + 10 files changed, 763 insertions(+), 72 deletions(-) rename {cmd/fleet => server/cron}/calendar_cron.go (98%) rename {cmd/fleet => server/cron}/calendar_cron_test.go (88%) diff --git a/cmd/fleet/serve.go b/cmd/fleet/serve.go index d0ca94a81c..1fdd588a9e 100644 --- a/cmd/fleet/serve.go +++ b/cmd/fleet/serve.go @@ -28,6 +28,7 @@ import ( configpkg "github.com/fleetdm/fleet/v4/server/config" "github.com/fleetdm/fleet/v4/server/contexts/ctxerr" licensectx "github.com/fleetdm/fleet/v4/server/contexts/license" + "github.com/fleetdm/fleet/v4/server/cron" "github.com/fleetdm/fleet/v4/server/datastore/cached_mysql" "github.com/fleetdm/fleet/v4/server/datastore/mysql" "github.com/fleetdm/fleet/v4/server/datastore/mysqlredis" @@ -773,9 +774,7 @@ the way that the Fleet server works. if license.IsPremium() { if err := cronSchedules.StartCronSchedule( func() (fleet.CronSchedule, error) { - return newCalendarSchedule( - ctx, instanceID, ds, logger, - ) + return cron.NewCalendarSchedule(ctx, instanceID, ds, 5*time.Minute, logger) }, ); err != nil { initFatal(err, "failed to register calendar schedule") diff --git a/ee/server/calendar/google_calendar_mock.go b/ee/server/calendar/google_calendar_mock.go index 08e3a72e20..1dd6f16bb4 100644 --- a/ee/server/calendar/google_calendar_mock.go +++ b/ee/server/calendar/google_calendar_mock.go @@ -93,3 +93,14 @@ func ClearMockEvents() { defer mu.Unlock() mockEvents = make(map[string]*calendar.Event) } + +func SetMockEventsToNow() { + mu.Lock() + defer mu.Unlock() + + now := time.Now() + for _, mockEvent := range mockEvents { + mockEvent.Start = &calendar.EventDateTime{DateTime: now.Format(time.RFC3339)} + mockEvent.End = &calendar.EventDateTime{DateTime: now.Add(30 * time.Minute).Format(time.RFC3339)} + } +} diff --git a/cmd/fleet/calendar_cron.go b/server/cron/calendar_cron.go similarity index 98% rename from cmd/fleet/calendar_cron.go rename to server/cron/calendar_cron.go index 54bd89f551..cad8ba8513 100644 --- a/cmd/fleet/calendar_cron.go +++ b/server/cron/calendar_cron.go @@ -1,4 +1,4 @@ -package main +package cron import ( "context" @@ -19,19 +19,19 @@ import ( const calendarConsumers = 18 -func newCalendarSchedule( +func NewCalendarSchedule( ctx context.Context, instanceID string, ds fleet.Datastore, + interval time.Duration, logger kitlog.Logger, ) (*schedule.Schedule, error) { const ( - name = string(fleet.CronCalendar) - defaultInterval = 5 * time.Minute + name = string(fleet.CronCalendar) ) logger = kitlog.With(logger, "cron", name) s := schedule.New( - ctx, name, instanceID, defaultInterval, ds, ds, + ctx, name, instanceID, interval, ds, ds, schedule.WithAltLockID("calendar"), schedule.WithLogger(logger), schedule.WithJob( @@ -318,9 +318,6 @@ func processFailingHostExistingCalendarEvent( } // Even if fields haven't changed we want to update the calendar_events.updated_at below. updated = true - // - // TODO(lucas): Check changing updatedEvent to UTC before consuming. - // } if updated { @@ -367,8 +364,6 @@ func processFailingHostExistingCalendarEvent( return fmt.Errorf("update host calendar webhook status: %w", err) } - // TODO(lucas): If this doesn't work at scale, then implement a special refetch - // for policies only. if err := ds.UpdateHostRefetchRequested(ctx, host.HostID, true); err != nil { return fmt.Errorf("refetch host: %w", err) } diff --git a/cmd/fleet/calendar_cron_test.go b/server/cron/calendar_cron_test.go similarity index 88% rename from cmd/fleet/calendar_cron_test.go rename to server/cron/calendar_cron_test.go index 4d9133377c..21e2a8fe1d 100644 --- a/cmd/fleet/calendar_cron_test.go +++ b/server/cron/calendar_cron_test.go @@ -1,11 +1,8 @@ -package main +package cron import ( "context" "fmt" - "io" - "net/http" - "net/http/httptest" "os" "strconv" "strings" @@ -17,7 +14,6 @@ import ( "github.com/fleetdm/fleet/v4/server/fleet" "github.com/fleetdm/fleet/v4/server/mock" kitlog "github.com/go-kit/log" - "github.com/stretchr/testify/require" ) @@ -207,16 +203,19 @@ func TestCalendarEventsMultipleHosts(t *testing.T) { calendar.ClearMockEvents() }) - // TODO(lucas): Test! - webhookServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - require.Equal(t, "POST", r.Method) - requestBodyBytes, err := io.ReadAll(r.Body) - require.NoError(t, err) - t.Logf("webhook request: %s\n", requestBodyBytes) - })) - t.Cleanup(func() { - webhookServer.Close() - }) + // + // Test setup + // + // team1: + // + // policyID1 (calendar) + // policyID2 (calendar) + // + // hostID1 has user1@example.com not passing policies. + // hostID2 has user2@example.com passing policies. + // hostID3 does not have example.com email and is not passing policies. + // hostID4 does not have example.com email and is passing policies. + // ds.AppConfigFunc = func(ctx context.Context) (*fleet.AppConfig, error) { return &fleet.AppConfig{ @@ -242,7 +241,7 @@ func TestCalendarEventsMultipleHosts(t *testing.T) { Integrations: fleet.TeamIntegrations{ GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{ Enable: true, - WebhookURL: webhookServer.URL, + WebhookURL: "https://foo.example.com", }, }, }, @@ -268,12 +267,13 @@ func TestCalendarEventsMultipleHosts(t *testing.T) { hostID1, userEmail1 := uint(100), "user1@example.com" hostID2, userEmail2 := uint(101), "user2@example.com" - hostID3, userEmail3 := uint(102), "user3@other.com" - hostID4, userEmail4 := uint(103), "user4@other.com" + hostID3 := uint(102) + hostID4 := uint(103) ds.GetTeamHostsPolicyMembershipsFunc = func( ctx context.Context, domain string, teamID uint, policyIDs []uint, ) ([]fleet.HostPolicyMembershipData, error) { + require.Equal(t, "example.com", domain) require.Equal(t, teamID1, teamID) require.Equal(t, []uint{policyID1, policyID2}, policyIDs) return []fleet.HostPolicyMembershipData{ @@ -289,12 +289,12 @@ func TestCalendarEventsMultipleHosts(t *testing.T) { }, { HostID: hostID3, - Email: userEmail3, + Email: "", // because it does not belong to example.com Passing: false, }, { HostID: hostID4, - Email: userEmail4, + Email: "", // because it does not belong to example.com Passing: true, }, }, nil @@ -304,6 +304,10 @@ func TestCalendarEventsMultipleHosts(t *testing.T) { return nil, nil, notFoundErr{} } + var eventsMu sync.Mutex + calendarEvents := make(map[string]*fleet.CalendarEvent) + hostCalendarEvents := make(map[uint]*fleet.HostCalendarEvent) + ds.CreateOrUpdateCalendarEventFunc = func(ctx context.Context, email string, startTime, endTime time.Time, @@ -311,26 +315,43 @@ func TestCalendarEventsMultipleHosts(t *testing.T) { hostID uint, webhookStatus fleet.CalendarWebhookStatus, ) (*fleet.CalendarEvent, error) { - switch email { - case userEmail1: - require.Equal(t, hostID1, hostID) - case userEmail2: - require.Equal(t, hostID2, hostID) - case userEmail3: - require.Equal(t, hostID3, hostID) - case userEmail4: - require.Equal(t, hostID4, hostID) - } + require.Equal(t, hostID1, hostID) + require.Equal(t, userEmail1, email) require.Equal(t, fleet.CalendarWebhookStatusNone, webhookStatus) require.NotEmpty(t, data) require.NotZero(t, startTime) require.NotZero(t, endTime) - // Currently, the returned calendar event is unused. + + eventsMu.Lock() + calendarEventID := uint(len(calendarEvents) + 1) + calendarEvents[email] = &fleet.CalendarEvent{ + ID: calendarEventID, + Email: email, + StartTime: startTime, + EndTime: endTime, + Data: data, + } + hostCalendarEventID := uint(len(hostCalendarEvents) + 1) + hostCalendarEvents[hostID] = &fleet.HostCalendarEvent{ + ID: hostCalendarEventID, + HostID: hostID, + CalendarEventID: calendarEventID, + WebhookStatus: webhookStatus, + } + eventsMu.Unlock() return nil, nil } err := cronCalendarEvents(ctx, ds, logger) require.NoError(t, err) + + eventsMu.Lock() + require.Len(t, calendarEvents, 1) + require.Len(t, hostCalendarEvents, 1) + eventsMu.Unlock() + + createdCalendarEvents := calendar.ListGoogleMockEvents() + require.Len(t, createdCalendarEvents, 1) } type notFoundErr struct{} @@ -356,17 +377,6 @@ func TestCalendarEvents1KHosts(t *testing.T) { calendar.ClearMockEvents() }) - // TODO(lucas): Use for the test. - webhookServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - require.Equal(t, "POST", r.Method) - requestBodyBytes, err := io.ReadAll(r.Body) - require.NoError(t, err) - t.Logf("webhook request: %s\n", requestBodyBytes) - })) - t.Cleanup(func() { - webhookServer.Close() - }) - ds.AppConfigFunc = func(ctx context.Context) (*fleet.AppConfig, error) { return &fleet.AppConfig{ Integrations: fleet.Integrations{ @@ -395,7 +405,7 @@ func TestCalendarEvents1KHosts(t *testing.T) { Integrations: fleet.TeamIntegrations{ GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{ Enable: true, - WebhookURL: webhookServer.URL, + WebhookURL: "https://foo.example.com", }, }, }, @@ -406,7 +416,7 @@ func TestCalendarEvents1KHosts(t *testing.T) { Integrations: fleet.TeamIntegrations{ GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{ Enable: true, - WebhookURL: webhookServer.URL, + WebhookURL: "https://foo.example.com", }, }, }, @@ -417,7 +427,7 @@ func TestCalendarEvents1KHosts(t *testing.T) { Integrations: fleet.TeamIntegrations{ GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{ Enable: true, - WebhookURL: webhookServer.URL, + WebhookURL: "https://foo.example.com", }, }, }, @@ -428,7 +438,7 @@ func TestCalendarEvents1KHosts(t *testing.T) { Integrations: fleet.TeamIntegrations{ GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{ Enable: true, - WebhookURL: webhookServer.URL, + WebhookURL: "https://foo.example.com", }, }, }, @@ -439,7 +449,7 @@ func TestCalendarEvents1KHosts(t *testing.T) { Integrations: fleet.TeamIntegrations{ GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{ Enable: true, - WebhookURL: webhookServer.URL, + WebhookURL: "https://foo.example.com", }, }, }, diff --git a/server/datastore/mysql/calendar_events.go b/server/datastore/mysql/calendar_events.go index ebd071d81a..91f508c169 100644 --- a/server/datastore/mysql/calendar_events.go +++ b/server/datastore/mysql/calendar_events.go @@ -52,7 +52,7 @@ func (ds *Datastore) CreateOrUpdateCalendarEvent( } else { stmt := `SELECT id FROM calendar_events WHERE email = ?` if err := sqlx.GetContext(ctx, tx, &id, stmt, email); err != nil { - return ctxerr.Wrap(ctx, err, "query mdm solution id") + return ctxerr.Wrap(ctx, err, "calendar event id") } } diff --git a/server/datastore/mysql/policies.go b/server/datastore/mysql/policies.go index 098014f699..0f593af8bc 100644 --- a/server/datastore/mysql/policies.go +++ b/server/datastore/mysql/policies.go @@ -1171,7 +1171,6 @@ func (ds *Datastore) GetCalendarPolicies(ctx context.Context, teamID uint) ([]fl return policies, nil } -// TODO(lucas): Must be tested at scale. func (ds *Datastore) GetTeamHostsPolicyMemberships( ctx context.Context, domain string, diff --git a/server/fleet/datastore.go b/server/fleet/datastore.go index 7d0b112a42..1c03aca415 100644 --- a/server/fleet/datastore.go +++ b/server/fleet/datastore.go @@ -594,6 +594,11 @@ type Datastore interface { PolicyQueriesForHost(ctx context.Context, host *Host) (map[string]string, error) + // GetTeamHostsPolicyMembmerships returns the hosts that belong to the given team and their pass/fail statuses + // around the provided policyIDs. + // - Returns hosts of the team that are failing one or more of the provided policies. + // - Returns hosts of the team that are passing all the policies (or are not running any of the provided policies) + // and have a calendar event scheduled. GetTeamHostsPolicyMemberships(ctx context.Context, domain string, teamID uint, policyIDs []uint) ([]HostPolicyMembershipData, error) GetCalendarPolicies(ctx context.Context, teamID uint) ([]PolicyCalendarData, error) diff --git a/server/service/integration_enterprise_test.go b/server/service/integration_enterprise_test.go index 70891e6a23..06d2e1e388 100644 --- a/server/service/integration_enterprise_test.go +++ b/server/service/integration_enterprise_test.go @@ -16,18 +16,21 @@ import ( "sort" "strconv" "strings" + "sync" "testing" "time" - "github.com/fleetdm/fleet/v4/server/pubsub" - + "github.com/fleetdm/fleet/v4/ee/server/calendar" "github.com/fleetdm/fleet/v4/pkg/optjson" + "github.com/fleetdm/fleet/v4/server/cron" "github.com/fleetdm/fleet/v4/server/datastore/mysql" "github.com/fleetdm/fleet/v4/server/datastore/redis/redistest" "github.com/fleetdm/fleet/v4/server/fleet" "github.com/fleetdm/fleet/v4/server/live_query/live_query_mock" "github.com/fleetdm/fleet/v4/server/mdm" "github.com/fleetdm/fleet/v4/server/ptr" + "github.com/fleetdm/fleet/v4/server/pubsub" + "github.com/fleetdm/fleet/v4/server/service/schedule" "github.com/fleetdm/fleet/v4/server/test" kitlog "github.com/go-kit/kit/log" "github.com/go-kit/log" @@ -48,7 +51,8 @@ func TestIntegrationsEnterprise(t *testing.T) { type integrationEnterpriseTestSuite struct { withServer suite.Suite - redisPool fleet.RedisPool + redisPool fleet.RedisPool + calendarSchedule *schedule.Schedule lq *live_query_mock.MockLiveQuery } @@ -58,6 +62,7 @@ func (s *integrationEnterpriseTestSuite) SetupSuite() { s.redisPool = redistest.SetupRedis(s.T(), "integration_enterprise", false, false, false) s.lq = live_query_mock.New(s.T()) + var calendarSchedule *schedule.Schedule config := TestServerOpts{ License: &fleet.LicenseInfo{ Tier: fleet.TierPremium, @@ -67,6 +72,16 @@ func (s *integrationEnterpriseTestSuite) SetupSuite() { Lq: s.lq, Logger: log.NewLogfmtLogger(os.Stdout), EnableCachedDS: true, + StartCronSchedules: []TestNewScheduleFunc{ + func(ctx context.Context, ds fleet.Datastore) fleet.NewCronScheduleFunc { + return func() (fleet.CronSchedule, error) { + // We set 24-hour interval so that it only runs when triggered. + var err error + calendarSchedule, err = cron.NewCalendarSchedule(ctx, s.T().Name(), s.ds, 24*time.Hour, log.NewJSONLogger(os.Stdout)) + return calendarSchedule, err + } + }, + }, } if os.Getenv("FLEET_INTEGRATION_TESTS_DISABLE_LOG") != "" { config.Logger = kitlog.NewNopLogger() @@ -76,6 +91,7 @@ func (s *integrationEnterpriseTestSuite) SetupSuite() { s.users = users s.token = s.getTestAdminToken() s.cachedTokens = make(map[string]string) + s.calendarSchedule = calendarSchedule } func (s *integrationEnterpriseTestSuite) TearDownTest() { @@ -3605,7 +3621,6 @@ func (s *integrationEnterpriseTestSuite) TestOSVersions() { "GET", fmt.Sprintf("/api/latest/fleet/os_versions/%d", osinfo.OSVersionID), nil, http.StatusForbidden, &osVersionResp, "team_id", "99999", ) - } func (s *integrationEnterpriseTestSuite) TestMDMNotConfiguredEndpoints() { @@ -7336,7 +7351,8 @@ func (s *integrationEnterpriseTestSuite) TestSoftwareAuth() { Description: "desc team1", }) require.NoError(t, err) - require.NoError(t, s.ds.AddHostsToTeam(ctx, &team1.ID, []uint{tmHost.ID})) + err = s.ds.AddHostsToTeam(ctx, &team1.ID, []uint{tmHost.ID}) + require.NoError(t, err) team2, err := s.ds.NewTeam(ctx, &fleet.Team{ ID: 43, Name: "team2", @@ -7653,3 +7669,653 @@ func (s *integrationEnterpriseTestSuite) TestSoftwareAuth() { // set the admin token again to avoid breaking other tests s.token = s.getTestAdminToken() } + +func (s *integrationEnterpriseTestSuite) TestCalendarEvents() { + ctx := context.Background() + t := s.T() + t.Cleanup(func() { + calendar.ClearMockEvents() + }) + currentAppCfg, err := s.ds.AppConfig(ctx) + require.NoError(t, err) + t.Cleanup(func() { + err = s.ds.SaveAppConfig(ctx, currentAppCfg) + require.NoError(t, err) + }) + + team1, err := s.ds.NewTeam(ctx, &fleet.Team{ + Name: "team1", + }) + require.NoError(t, err) + team2, err := s.ds.NewTeam(ctx, &fleet.Team{ + Name: "team2", + }) + require.NoError(t, err) + + newHost := func(name string, teamID *uint) *fleet.Host { + h, err := s.ds.NewHost(ctx, &fleet.Host{ + DetailUpdatedAt: time.Now(), + LabelUpdatedAt: time.Now(), + PolicyUpdatedAt: time.Now(), + SeenTime: time.Now().Add(-1 * time.Minute), + OsqueryHostID: ptr.String(t.Name() + name), + NodeKey: ptr.String(t.Name() + name), + UUID: uuid.New().String(), + Hostname: fmt.Sprintf("%s.%s.local", name, t.Name()), + Platform: "darwin", + TeamID: teamID, + }) + require.NoError(t, err) + return h + } + + host1Team1 := newHost("host1", &team1.ID) + host2Team1 := newHost("host2", &team1.ID) + host3Team2 := newHost("host3", &team2.ID) + host4Team2 := newHost("host4", &team2.ID) + _ = newHost("host5", nil) // global host + + team1Policy1Calendar, err := s.ds.NewTeamPolicy( + ctx, team1.ID, nil, fleet.PolicyPayload{ + Name: "team1Policy1Calendar", + Query: "SELECT 1;", + CalendarEventsEnabled: true, + }, + ) + require.NoError(t, err) + team1Policy2, err := s.ds.NewTeamPolicy( + ctx, team1.ID, nil, fleet.PolicyPayload{ + Name: "team1Policy2", + Query: "SELECT 2;", + CalendarEventsEnabled: true, + }, + ) + require.NoError(t, err) + team2Policy1Calendar, err := s.ds.NewTeamPolicy( + ctx, team1.ID, nil, fleet.PolicyPayload{ + Name: "team2Policy1Calendar", + Query: "SELECT 3;", + CalendarEventsEnabled: true, + }, + ) + require.NoError(t, err) + team2Policy2, err := s.ds.NewTeamPolicy( + ctx, team1.ID, nil, fleet.PolicyPayload{ + Name: "team2Policy2", + Query: "SELECT 4;", + CalendarEventsEnabled: false, + }, + ) + require.NoError(t, err) + globalPolicy, err := s.ds.NewGlobalPolicy( + ctx, nil, fleet.PolicyPayload{ + Name: "globalPolicy", + Query: "SELECT 5;", + CalendarEventsEnabled: false, + }, + ) + require.NoError(t, err) + + genDistributedReqWithPolicyResults := func(host *fleet.Host, policyResults map[uint]*bool) submitDistributedQueryResultsRequestShim { + var ( + results = make(map[string]json.RawMessage) + statuses = make(map[string]interface{}) + messages = make(map[string]string) + ) + for policyID, policyResult := range policyResults { + distributedQueryName := hostPolicyQueryPrefix + fmt.Sprint(policyID) + switch { + case policyResult == nil: + results[distributedQueryName] = json.RawMessage(`[]`) + statuses[distributedQueryName] = 1 + messages[distributedQueryName] = "policy failed execution" + case *policyResult: + results[distributedQueryName] = json.RawMessage(`[{"1": "1"}]`) + statuses[distributedQueryName] = 0 + case !*policyResult: + results[distributedQueryName] = json.RawMessage(`[]`) + statuses[distributedQueryName] = 0 + } + } + return submitDistributedQueryResultsRequestShim{ + NodeKey: *host.NodeKey, + Results: results, + Statuses: statuses, + Messages: messages, + Stats: map[string]*fleet.Stats{}, + } + } + + // host1Team1 is failing a calendar policy and not a non-calendar policy (no results for global). + distributedResp := submitDistributedQueryResultsResponse{} + s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults( + host1Team1, + map[uint]*bool{ + team1Policy1Calendar.ID: ptr.Bool(false), + team1Policy2.ID: ptr.Bool(true), + globalPolicy.ID: nil, + }, + ), http.StatusOK, &distributedResp) + + // host2Team1 is passing the calendar policy but not the non-calendar policy (no results for global). + s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults( + host2Team1, + map[uint]*bool{ + team2Policy1Calendar.ID: ptr.Bool(true), + team2Policy2.ID: ptr.Bool(false), + globalPolicy.ID: nil, + }, + ), http.StatusOK, &distributedResp) + + // host3Team2 is passing team2Policy1Calendar and failing the global policy + // (not results for team2Policy2). + s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults( + host3Team2, + map[uint]*bool{ + team2Policy1Calendar.ID: ptr.Bool(true), + team2Policy2.ID: nil, + globalPolicy.ID: ptr.Bool(false), + }, + ), http.StatusOK, &distributedResp) + + // host4Team2 is not returning results for the calendar policy, failing the non-calendar + // policy and passing the global policy. + s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults( + host4Team2, + map[uint]*bool{ + team2Policy1Calendar.ID: nil, + team2Policy2.ID: ptr.Bool(false), + globalPolicy.ID: ptr.Bool(true), + }, + ), http.StatusOK, &distributedResp) + + // Trigger the calendar cron with the global feature is disabled. + triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second) + + // No calendar events were created. + allCalendarEvents, err := s.ds.ListCalendarEvents(ctx, nil) + require.NoError(t, err) + require.Empty(t, allCalendarEvents) + + // Set global configuration for the calendar feature. + appCfg, err := s.ds.AppConfig(ctx) + require.NoError(t, err) + appCfg.Integrations.GoogleCalendar = []*fleet.GoogleCalendarIntegration{ + { + Domain: "example.com", + ApiKey: map[string]string{ + fleet.GoogleCalendarEmail: "calendar-mock@example.com", + }, + }, + } + err = s.ds.SaveAppConfig(ctx, appCfg) + require.NoError(t, err) + time.Sleep(2 * time.Second) // Wait 2 seconds for the app config cache to clear. + + triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second) + + // No calendar events were created because we are missing enabling it on the teams. + allCalendarEvents, err = s.ds.ListCalendarEvents(ctx, nil) + require.NoError(t, err) + require.Empty(t, allCalendarEvents) + + // Run distributed/write for host4Team2 again, it should not attempt to trigger the webhook because + // it's disabled for the teams. + s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults( + host4Team2, + map[uint]*bool{ + team2Policy1Calendar.ID: nil, + team2Policy2.ID: ptr.Bool(false), + globalPolicy.ID: ptr.Bool(true), + }, + ), http.StatusOK, &distributedResp) + + var ( + team1Fired int + team1FiredMu sync.Mutex + ) + + team1WebhookFired := make(chan struct{}) + team1WebhookServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "POST", r.Method) + requestBodyBytes, err := io.ReadAll(r.Body) + require.NoError(t, err) + t.Logf("team1 webhook request: %s\n", requestBodyBytes) + team1FiredMu.Lock() + team1Fired++ + team1WebhookFired <- struct{}{} + team1FiredMu.Unlock() + })) + t.Cleanup(func() { + team1WebhookServer.Close() + }) + + team1.Config.Integrations.GoogleCalendar = &fleet.TeamGoogleCalendarIntegration{ + Enable: true, + WebhookURL: team1WebhookServer.URL, + } + team1, err = s.ds.SaveTeam(ctx, team1) + require.NoError(t, err) + + var ( + team2Fired int + team2FiredMu sync.Mutex + ) + + team2WebhookServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "POST", r.Method) + requestBodyBytes, err := io.ReadAll(r.Body) + require.NoError(t, err) + t.Logf("team2 webhook request: %s\n", requestBodyBytes) + team2FiredMu.Lock() + team2Fired++ + team2FiredMu.Unlock() + })) + t.Cleanup(func() { + team2WebhookServer.Close() + }) + + team2.Config.Integrations.GoogleCalendar = &fleet.TeamGoogleCalendarIntegration{ + Enable: true, + WebhookURL: team2WebhookServer.URL, + } + team2, err = s.ds.SaveTeam(ctx, team2) + require.NoError(t, err) + + // + // Same distributed/write as before but they should not fire yet. + // + + // host1Team1 is failing a calendar policy and not a non-calendar policy (no results for global). + s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults( + host1Team1, + map[uint]*bool{ + team1Policy1Calendar.ID: ptr.Bool(false), + team1Policy2.ID: ptr.Bool(true), + globalPolicy.ID: nil, + }, + ), http.StatusOK, &distributedResp) + + // host2Team1 is passing the calendar policy but not the non-calendar policy (no results for global). + s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults( + host2Team1, + map[uint]*bool{ + team2Policy1Calendar.ID: ptr.Bool(true), + team2Policy2.ID: ptr.Bool(false), + globalPolicy.ID: nil, + }, + ), http.StatusOK, &distributedResp) + + // host3Team2 is passing team2Policy1Calendar and failing the global policy + // (not results for team2Policy2). + s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults( + host3Team2, + map[uint]*bool{ + team2Policy1Calendar.ID: ptr.Bool(true), + team2Policy2.ID: nil, + globalPolicy.ID: ptr.Bool(false), + }, + ), http.StatusOK, &distributedResp) + + // host4Team2 is not returning results for the calendar policy, failing the non-calendar + // policy and passing the global policy. + s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults( + host4Team2, + map[uint]*bool{ + team2Policy1Calendar.ID: nil, + team2Policy2.ID: ptr.Bool(false), + globalPolicy.ID: ptr.Bool(true), + }, + ), http.StatusOK, &distributedResp) + + team1FiredMu.Lock() + require.Zero(t, team1Fired) + team1FiredMu.Unlock() + + team2FiredMu.Lock() + require.Zero(t, team2Fired) + team2FiredMu.Unlock() + + // Trigger the calendar cron, global feature enabled, team1 enabled, team2 not yet enabled + // and hosts do not have an associated email yet. + triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second) + + team1CalendarEvents, err := s.ds.ListCalendarEvents(ctx, &team1.ID) + require.NoError(t, err) + require.Empty(t, team1CalendarEvents) + + // Add an email but of another domain. + err = s.ds.ReplaceHostDeviceMapping(ctx, host1Team1.ID, []*fleet.HostDeviceMapping{ + { + HostID: host1Team1.ID, + Email: "user@other.com", + Source: "google_chrome_profiles", + }, + }, "google_chrome_profiles") + require.NoError(t, err) + + // Trigger the calendar cron, global feature enabled, team1 enabled, team2 not yet enabled + // and hosts do not have an associated email for the domain yet. + triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second) + + team1CalendarEvents, err = s.ds.ListCalendarEvents(ctx, &team1.ID) + require.NoError(t, err) + require.Empty(t, team1CalendarEvents) + + err = s.ds.ReplaceHostDeviceMapping(ctx, host1Team1.ID, []*fleet.HostDeviceMapping{ + { + HostID: host1Team1.ID, + Email: "user1@example.com", + Source: "google_chrome_profiles", + }, + }, "google_chrome_profiles") + require.NoError(t, err) + + // Trigger the calendar cron, global feature enabled, team1 enabled, team2 not yet enabled + // and host1Team1 has a domain email associated. + triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second) + + // An event should be generated for host1Team1 + team1CalendarEvents, err = s.ds.ListCalendarEvents(ctx, &team1.ID) + require.NoError(t, err) + require.Len(t, team1CalendarEvents, 1) + require.NotZero(t, team1CalendarEvents[0].ID) + require.Equal(t, "user1@example.com", team1CalendarEvents[0].Email) + require.NotZero(t, team1CalendarEvents[0].StartTime) + require.NotZero(t, team1CalendarEvents[0].EndTime) + + calendar.SetMockEventsToNow() + + mysql.ExecAdhocSQL(t, s.ds, func(db sqlx.ExtContext) error { + // Update updated_at so the event gets updated (the event is updated every 30 minutes) + _, err := db.ExecContext(ctx, + `UPDATE calendar_events SET updated_at = DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 1 HOUR) WHERE id = ?`, team1CalendarEvents[0].ID) + if err != nil { + return err + } + // Set host1Team1 as online. + if _, err := db.ExecContext(ctx, + `UPDATE host_seen_times SET seen_time = CURRENT_TIMESTAMP WHERE host_id = ?`, host1Team1.ID); err != nil { + return err + } + return nil + }) + + // Trigger the calendar cron, global feature enabled, team1 enabled, team2 not yet enabled + // and host1Team1 has a domain email associated. + triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second) + + // Check that refetch on the host was set. + host, err := s.ds.Host(ctx, host1Team1.ID) + require.NoError(t, err) + require.True(t, host.RefetchRequested) + + // host1Team1 is failing a calendar policy and not a non-calendar policy (no results for global). + s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults( + host1Team1, + map[uint]*bool{ + team1Policy1Calendar.ID: ptr.Bool(false), + team1Policy2.ID: ptr.Bool(true), + globalPolicy.ID: nil, + }, + ), http.StatusOK, &distributedResp) + + // host2Team1 is passing the calendar policy but not the non-calendar policy (no results for global). + s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults( + host2Team1, + map[uint]*bool{ + team2Policy1Calendar.ID: ptr.Bool(true), + team2Policy2.ID: ptr.Bool(false), + globalPolicy.ID: nil, + }, + ), http.StatusOK, &distributedResp) + + select { + case <-team1WebhookFired: + case <-time.After(5 * time.Second): + t.Error("timeout waiting for team1 webhook to fire") + } + + // Trigger again, nothing should fire as webhook has already fired. + triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second) + + team1FiredMu.Lock() + require.Equal(t, 1, team1Fired) + team1FiredMu.Unlock() + team2FiredMu.Lock() + require.Equal(t, 0, team2Fired) + team2FiredMu.Unlock() + + // Make host1Team1 pass all policies. + s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults( + host1Team1, + map[uint]*bool{ + team1Policy1Calendar.ID: ptr.Bool(true), + team1Policy2.ID: ptr.Bool(true), + globalPolicy.ID: nil, + }, + ), http.StatusOK, &distributedResp) + + // Trigger calendar should cleanup the events. + triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second) + + // Events in the user calendar should not be cleaned up because they are not in the future. + mockEvents := calendar.ListGoogleMockEvents() + require.NotEmpty(t, mockEvents) + + // Event should be cleaned up from our database. + team1CalendarEvents, err = s.ds.ListCalendarEvents(ctx, &team1.ID) + require.NoError(t, err) + require.Empty(t, team1CalendarEvents) +} + +func (s *integrationEnterpriseTestSuite) TestCalendarEventsTransferringHosts() { + ctx := context.Background() + t := s.T() + t.Cleanup(func() { + calendar.ClearMockEvents() + }) + currentAppCfg, err := s.ds.AppConfig(ctx) + require.NoError(t, err) + t.Cleanup(func() { + err = s.ds.SaveAppConfig(ctx, currentAppCfg) + require.NoError(t, err) + }) + + // Set global configuration for the calendar feature. + appCfg, err := s.ds.AppConfig(ctx) + require.NoError(t, err) + appCfg.Integrations.GoogleCalendar = []*fleet.GoogleCalendarIntegration{ + { + Domain: "example.com", + ApiKey: map[string]string{ + fleet.GoogleCalendarEmail: "calendar-mock@example.com", + }, + }, + } + err = s.ds.SaveAppConfig(ctx, appCfg) + require.NoError(t, err) + time.Sleep(2 * time.Second) // Wait 2 seconds for the app config cache to clear. + + team1, err := s.ds.NewTeam(ctx, &fleet.Team{ + Name: "team1", + }) + require.NoError(t, err) + team2, err := s.ds.NewTeam(ctx, &fleet.Team{ + Name: "team2", + }) + require.NoError(t, err) + + team1.Config.Integrations.GoogleCalendar = &fleet.TeamGoogleCalendarIntegration{ + Enable: true, + WebhookURL: "https://foo.example.com", + } + team1, err = s.ds.SaveTeam(ctx, team1) + require.NoError(t, err) + team2.Config.Integrations.GoogleCalendar = &fleet.TeamGoogleCalendarIntegration{ + Enable: true, + WebhookURL: "https://foo.example.com", + } + team2, err = s.ds.SaveTeam(ctx, team2) + require.NoError(t, err) + + newHost := func(name string, teamID *uint) *fleet.Host { + h, err := s.ds.NewHost(ctx, &fleet.Host{ + DetailUpdatedAt: time.Now(), + LabelUpdatedAt: time.Now(), + PolicyUpdatedAt: time.Now(), + SeenTime: time.Now().Add(-1 * time.Minute), + OsqueryHostID: ptr.String(t.Name() + name), + NodeKey: ptr.String(t.Name() + name), + UUID: uuid.New().String(), + Hostname: fmt.Sprintf("%s.%s.local", name, t.Name()), + Platform: "darwin", + TeamID: teamID, + }) + require.NoError(t, err) + return h + } + + host1 := newHost("host1", &team1.ID) + err = s.ds.ReplaceHostDeviceMapping(ctx, host1.ID, []*fleet.HostDeviceMapping{ + { + HostID: host1.ID, + Email: "user1@example.com", + Source: "google_chrome_profiles", + }, + }, "google_chrome_profiles") + require.NoError(t, err) + + team1Policy1, err := s.ds.NewTeamPolicy( + ctx, team1.ID, nil, fleet.PolicyPayload{ + Name: "team1Policy1", + Query: "SELECT 1;", + CalendarEventsEnabled: true, + }, + ) + require.NoError(t, err) + team2Policy1, err := s.ds.NewTeamPolicy( + ctx, team2.ID, nil, fleet.PolicyPayload{ + Name: "team2Policy1", + Query: "SELECT 2;", + CalendarEventsEnabled: true, + }, + ) + require.NoError(t, err) + + distributedResp := submitDistributedQueryResultsResponse{} + s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults( + host1, + map[uint]*bool{ + team1Policy1.ID: ptr.Bool(false), + }, + ), http.StatusOK, &distributedResp) + + triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second) + + team1CalendarEvents, err := s.ds.ListCalendarEvents(ctx, &team1.ID) + require.NoError(t, err) + require.Len(t, team1CalendarEvents, 1) + + // Check the calendar was created on the DB. + hostCalendarEvent, calendarEvent, err := s.ds.GetHostCalendarEventByEmail(ctx, "user1@example.com") + require.NoError(t, err) + + // Transfer host to team2. + err = s.ds.AddHostsToTeam(ctx, &team2.ID, []uint{host1.ID}) + require.NoError(t, err) + + // host1 is failing team2's policy too. + s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults( + host1, + map[uint]*bool{ + team2Policy1.ID: ptr.Bool(false), + }, + ), http.StatusOK, &distributedResp) + + triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second) + + // Check the calendar event entry was reused. + hostCalendarEvent2, calendarEvent2, err := s.ds.GetHostCalendarEventByEmail(ctx, "user1@example.com") + require.NoError(t, err) + require.Equal(t, calendarEvent2.ID, calendarEvent.ID) + require.Equal(t, hostCalendarEvent2.CalendarEventID, hostCalendarEvent.CalendarEventID) + + // Transfer host to global. + err = s.ds.AddHostsToTeam(ctx, nil, []uint{host1.ID}) + require.NoError(t, err) + + // Move event to two days ago (to clean up the calendar event) + mysql.ExecAdhocSQL(t, s.ds, func(db sqlx.ExtContext) error { + _, err := db.ExecContext(ctx, + `UPDATE calendar_events SET updated_at = DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 49 HOUR) WHERE id = ?`, team1CalendarEvents[0].ID) + if err != nil { + return err + } + return nil + }) + + triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second) + + // Calendar event is cleaned up. + _, _, err = s.ds.GetHostCalendarEventByEmail(ctx, "user1@example.com") + require.True(t, fleet.IsNotFound(err)) +} + +func genDistributedReqWithPolicyResults(host *fleet.Host, policyResults map[uint]*bool) submitDistributedQueryResultsRequestShim { + var ( + results = make(map[string]json.RawMessage) + statuses = make(map[string]interface{}) + messages = make(map[string]string) + ) + for policyID, policyResult := range policyResults { + distributedQueryName := hostPolicyQueryPrefix + fmt.Sprint(policyID) + switch { + case policyResult == nil: + results[distributedQueryName] = json.RawMessage(`[]`) + statuses[distributedQueryName] = 1 + messages[distributedQueryName] = "policy failed execution" + case *policyResult: + results[distributedQueryName] = json.RawMessage(`[{"1": "1"}]`) + statuses[distributedQueryName] = 0 + case !*policyResult: + results[distributedQueryName] = json.RawMessage(`[]`) + statuses[distributedQueryName] = 0 + } + } + return submitDistributedQueryResultsRequestShim{ + NodeKey: *host.NodeKey, + Results: results, + Statuses: statuses, + Messages: messages, + Stats: map[string]*fleet.Stats{}, + } +} + +func triggerAndWait(ctx context.Context, t *testing.T, ds fleet.Datastore, s *schedule.Schedule, timeout time.Duration) { + // Following code assumes (for simplicity) only triggered runs. + stats, err := ds.GetLatestCronStats(ctx, s.Name()) + require.NoError(t, err) + var previousRunID int + if len(stats) > 0 { + previousRunID = stats[0].ID + } + + _, err = s.Trigger() + require.NoError(t, err) + + timeoutCh := time.After(timeout) + for { + stats, err := ds.GetLatestCronStats(ctx, s.Name()) + require.NoError(t, err) + if len(stats) > 0 && stats[0].ID > previousRunID && stats[0].Status == fleet.CronStatsStatusCompleted { + t.Logf("cron %s:%d done", s.Name(), stats[0].ID) + return + } + select { + case <-timeoutCh: + t.Fatalf("timeout waiting for schedule %s to complete", s.Name()) + case <-time.After(250 * time.Millisecond): + } + } +} diff --git a/server/service/schedule/schedule.go b/server/service/schedule/schedule.go index eabe66c394..2d0bacb645 100644 --- a/server/service/schedule/schedule.go +++ b/server/service/schedule/schedule.go @@ -162,7 +162,7 @@ func New( // // All jobs must be added before calling Start. func (s *Schedule) Start() { - prevScheduledRun, _, err := s.getLatestStats() + prevScheduledRun, _, err := s.GetLatestStats() if err != nil { level.Error(s.logger).Log("err", "start schedule", "details", err) ctxerr.Handle(s.ctx, err) @@ -203,7 +203,7 @@ func (s *Schedule) Start() { s.runWithStats(fleet.CronStatsTypeTriggered) - prevScheduledRun, _, err := s.getLatestStats() + prevScheduledRun, _, err := s.GetLatestStats() if err != nil { level.Error(s.logger).Log("err", "trigger get cron stats", "details", err) ctxerr.Handle(s.ctx, err) @@ -235,7 +235,7 @@ func (s *Schedule) Start() { schedInterval := s.getSchedInterval() - prevScheduledRun, prevTriggeredRun, err := s.getLatestStats() + prevScheduledRun, prevTriggeredRun, err := s.GetLatestStats() if err != nil { level.Error(s.logger).Log("err", "get cron stats", "details", err) ctxerr.Handle(s.ctx, err) @@ -374,7 +374,7 @@ func (s *Schedule) Start() { // is blocked or otherwise unavailable to publish the signal. From the caller's perspective, both // cases are deemed to be equivalent. func (s *Schedule) Trigger() (*fleet.CronStats, error) { - sched, trig, err := s.getLatestStats() + sched, trig, err := s.GetLatestStats() switch { case err != nil: return nil, err @@ -549,7 +549,7 @@ func (s *Schedule) holdLock() (bool, context.CancelFunc) { return true, cancelFn } -func (s *Schedule) getLatestStats() (fleet.CronStats, fleet.CronStats, error) { +func (s *Schedule) GetLatestStats() (fleet.CronStats, fleet.CronStats, error) { var scheduled, triggered fleet.CronStats cs, err := s.statsStore.GetLatestCronStats(s.ctx, s.name) diff --git a/server/service/testing_client.go b/server/service/testing_client.go index 6445b6dbee..5b67ada9ee 100644 --- a/server/service/testing_client.go +++ b/server/service/testing_client.go @@ -95,6 +95,12 @@ func (ts *withServer) TearDownSuite() { } func (ts *withServer) commonTearDownTest(t *testing.T) { + // By setting DISABLE_TABLES_CLEANUP a developer can troubleshoot tests + // by inspecting mysql tables. + if os.Getenv("DISABLE_CLEANUP_TABLES") != "" { + return + } + ctx := context.Background() u := ts.users["admin1@example.com"]