From 51cd71f46423d1e9595ce940696e954dfbf4c268 Mon Sep 17 00:00:00 2001 From: Lucas Manuel Rodriguez Date: Mon, 25 Mar 2024 15:15:13 -0300 Subject: [PATCH] Fix concurrency bug in calendar cron (#17832) #17441 --- cmd/fleet/calendar_cron.go | 60 +-- cmd/fleet/calendar_cron_test.go | 453 ++++++++++++++++++++- ee/server/calendar/google_calendar_mock.go | 26 +- 3 files changed, 504 insertions(+), 35 deletions(-) diff --git a/cmd/fleet/calendar_cron.go b/cmd/fleet/calendar_cron.go index c6113d7181..e4b1927d55 100644 --- a/cmd/fleet/calendar_cron.go +++ b/cmd/fleet/calendar_cron.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "slices" + "sync" "time" "github.com/fleetdm/fleet/v4/ee/server/calendar" @@ -14,7 +15,6 @@ import ( "github.com/go-kit/log" kitlog "github.com/go-kit/log" "github.com/go-kit/log/level" - "golang.org/x/sync/errgroup" ) func newCalendarSchedule( @@ -162,16 +162,18 @@ func cronCalendarEventsForTeam( // We execute this first to remove any calendar events for a user that is now passing // policies on one of its hosts, and possibly create a new calendar event if they have // another failing host on the same team. - if err := removeCalendarEventsFromPassingHosts(ctx, ds, calendarConfig, passingHosts, logger); err != nil { - level.Info(logger).Log("msg", "removing calendar events from passing hosts", "err", err) - } + start := time.Now() + removeCalendarEventsFromPassingHosts(ctx, ds, calendarConfig, passingHosts, logger) + level.Debug(logger).Log( + "msg", "passing_hosts", "took", time.Since(start), + ) // Process hosts that are failing calendar policies. - if err := processCalendarFailingHosts( - ctx, ds, calendarConfig, orgName, failingHosts, logger, - ); err != nil { - level.Info(logger).Log("msg", "processing failing hosts", "err", err) - } + start = time.Now() + processCalendarFailingHosts(ctx, ds, calendarConfig, orgName, failingHosts, logger) + level.Debug(logger).Log( + "msg", "failing_hosts", "took", time.Since(start), + ) // At last we want to log the hosts that are failing and don't have an associated email. logHostsWithoutAssociatedEmail( @@ -190,15 +192,18 @@ func processCalendarFailingHosts( orgName string, hosts []fleet.HostPolicyMembershipData, logger kitlog.Logger, -) error { +) { hosts = filterHostsWithSameEmail(hosts) const consumers = 20 hostsCh := make(chan fleet.HostPolicyMembershipData) - g, ctx := errgroup.WithContext(ctx) + var wg sync.WaitGroup for i := 0; i < consumers; i++ { - g.Go(func() error { + wg.Add(+1) + go func() { + defer wg.Done() + for host := range hostsCh { logger := log.With(logger, "host_id", host.HostID) @@ -230,7 +235,8 @@ func processCalendarFailingHosts( userCalendar := createUserCalendarFromConfig(ctx, calendarConfig, logger) if err := userCalendar.Configure(host.Email); err != nil { - return fmt.Errorf("configure user calendar: %w", err) + level.Error(logger).Log("msg", "configure user calendar", "err", err) + continue // continue with next host } switch { @@ -249,11 +255,11 @@ func processCalendarFailingHosts( continue // continue with next host } default: - return fmt.Errorf("get calendar event: %w", err) + level.Error(logger).Log("msg", "get calendar event from db", "err", err) + continue // continue with next host } } - return nil - }) + }() } for _, host := range hosts { @@ -261,7 +267,7 @@ func processCalendarFailingHosts( } close(hostsCh) - return g.Wait() + wg.Wait() } func filterHostsWithSameEmail(hosts []fleet.HostPolicyMembershipData) []fleet.HostPolicyMembershipData { @@ -472,7 +478,7 @@ func removeCalendarEventsFromPassingHosts( calendarConfig *fleet.GoogleCalendarIntegration, hosts []fleet.HostPolicyMembershipData, logger kitlog.Logger, -) error { +) { hostIDsByEmail := make(map[string][]uint) for _, host := range hosts { hostIDsByEmail[host.Email] = append(hostIDsByEmail[host.Email], host.HostID) @@ -491,10 +497,13 @@ func removeCalendarEventsFromPassingHosts( const consumers = 20 emailsCh := make(chan emailWithHosts) - g, ctx := errgroup.WithContext(ctx) + var wg sync.WaitGroup for i := 0; i < consumers; i++ { - g.Go(func() error { + wg.Add(+1) + go func() { + defer wg.Done() + for email := range emailsCh { hostCalendarEvent, calendarEvent, err := ds.GetHostCalendarEventByEmail(ctx, email.email) @@ -507,15 +516,16 @@ func removeCalendarEventsFromPassingHosts( case fleet.IsNotFound(err): continue default: - return fmt.Errorf("get calendar event from DB: %w", err) + level.Error(logger).Log("msg", "get calendar event from DB", "err", err) + continue } userCalendar := createUserCalendarFromConfig(ctx, calendarConfig, logger) if err := deleteCalendarEvent(ctx, ds, userCalendar, calendarEvent); err != nil { - return fmt.Errorf("delete user calendar event: %w", err) + level.Error(logger).Log("msg", "delete user calendar event", "err", err) + continue } } - return nil - }) + }() } for _, emailWithHostIDs := range emails { @@ -523,7 +533,7 @@ func removeCalendarEventsFromPassingHosts( } close(emailsCh) - return g.Wait() + wg.Wait() } func logHostsWithoutAssociatedEmail( diff --git a/cmd/fleet/calendar_cron_test.go b/cmd/fleet/calendar_cron_test.go index c6db483c55..4d9133377c 100644 --- a/cmd/fleet/calendar_cron_test.go +++ b/cmd/fleet/calendar_cron_test.go @@ -2,12 +2,21 @@ package main import ( "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "os" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/fleetdm/fleet/v4/ee/server/calendar" "github.com/fleetdm/fleet/v4/server/fleet" "github.com/fleetdm/fleet/v4/server/mock" kitlog "github.com/go-kit/log" - "os" - "testing" - "time" "github.com/stretchr/testify/require" ) @@ -188,5 +197,441 @@ func TestEventForDifferentHost(t *testing.T) { err := cronCalendarEvents(ctx, ds, logger) require.NoError(t, err) - +} + +func TestCalendarEventsMultipleHosts(t *testing.T) { + ds := new(mock.Store) + ctx := context.Background() + logger := kitlog.With(kitlog.NewLogfmtLogger(os.Stdout)) + t.Cleanup(func() { + calendar.ClearMockEvents() + }) + + // TODO(lucas): Test! + webhookServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "POST", r.Method) + requestBodyBytes, err := io.ReadAll(r.Body) + require.NoError(t, err) + t.Logf("webhook request: %s\n", requestBodyBytes) + })) + t.Cleanup(func() { + webhookServer.Close() + }) + + ds.AppConfigFunc = func(ctx context.Context) (*fleet.AppConfig, error) { + return &fleet.AppConfig{ + Integrations: fleet.Integrations{ + GoogleCalendar: []*fleet.GoogleCalendarIntegration{ + { + Domain: "example.com", + ApiKey: map[string]string{ + fleet.GoogleCalendarEmail: "calendar-mock@example.com", + }, + }, + }, + }, + }, nil + } + + teamID1 := uint(1) + ds.ListTeamsFunc = func(ctx context.Context, filter fleet.TeamFilter, opt fleet.ListOptions) ([]*fleet.Team, error) { + return []*fleet.Team{ + { + ID: teamID1, + Config: fleet.TeamConfig{ + Integrations: fleet.TeamIntegrations{ + GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{ + Enable: true, + WebhookURL: webhookServer.URL, + }, + }, + }, + }, + }, nil + } + + policyID1 := uint(10) + policyID2 := uint(11) + ds.GetCalendarPoliciesFunc = func(ctx context.Context, teamID uint) ([]fleet.PolicyCalendarData, error) { + require.Equal(t, teamID1, teamID) + return []fleet.PolicyCalendarData{ + { + ID: policyID1, + Name: "Policy 1", + }, + { + ID: policyID2, + Name: "Policy 2", + }, + }, nil + } + + hostID1, userEmail1 := uint(100), "user1@example.com" + hostID2, userEmail2 := uint(101), "user2@example.com" + hostID3, userEmail3 := uint(102), "user3@other.com" + hostID4, userEmail4 := uint(103), "user4@other.com" + + ds.GetTeamHostsPolicyMembershipsFunc = func( + ctx context.Context, domain string, teamID uint, policyIDs []uint, + ) ([]fleet.HostPolicyMembershipData, error) { + require.Equal(t, teamID1, teamID) + require.Equal(t, []uint{policyID1, policyID2}, policyIDs) + return []fleet.HostPolicyMembershipData{ + { + HostID: hostID1, + Email: userEmail1, + Passing: false, + }, + { + HostID: hostID2, + Email: userEmail2, + Passing: true, + }, + { + HostID: hostID3, + Email: userEmail3, + Passing: false, + }, + { + HostID: hostID4, + Email: userEmail4, + Passing: true, + }, + }, nil + } + + ds.GetHostCalendarEventByEmailFunc = func(ctx context.Context, email string) (*fleet.HostCalendarEvent, *fleet.CalendarEvent, error) { + return nil, nil, notFoundErr{} + } + + ds.CreateOrUpdateCalendarEventFunc = func(ctx context.Context, + email string, + startTime, endTime time.Time, + data []byte, + hostID uint, + webhookStatus fleet.CalendarWebhookStatus, + ) (*fleet.CalendarEvent, error) { + switch email { + case userEmail1: + require.Equal(t, hostID1, hostID) + case userEmail2: + require.Equal(t, hostID2, hostID) + case userEmail3: + require.Equal(t, hostID3, hostID) + case userEmail4: + require.Equal(t, hostID4, hostID) + } + require.Equal(t, fleet.CalendarWebhookStatusNone, webhookStatus) + require.NotEmpty(t, data) + require.NotZero(t, startTime) + require.NotZero(t, endTime) + // Currently, the returned calendar event is unused. + return nil, nil + } + + err := cronCalendarEvents(ctx, ds, logger) + require.NoError(t, err) +} + +type notFoundErr struct{} + +func (n notFoundErr) IsNotFound() bool { + return true +} + +func (n notFoundErr) Error() string { + return "not found" +} + +func TestCalendarEvents1KHosts(t *testing.T) { + ds := new(mock.Store) + ctx := context.Background() + var logger kitlog.Logger + if os.Getenv("CALENDAR_TEST_LOGGING") != "" { + logger = kitlog.With(kitlog.NewLogfmtLogger(os.Stdout)) + } else { + logger = kitlog.NewNopLogger() + } + t.Cleanup(func() { + calendar.ClearMockEvents() + }) + + // TODO(lucas): Use for the test. + webhookServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "POST", r.Method) + requestBodyBytes, err := io.ReadAll(r.Body) + require.NoError(t, err) + t.Logf("webhook request: %s\n", requestBodyBytes) + })) + t.Cleanup(func() { + webhookServer.Close() + }) + + ds.AppConfigFunc = func(ctx context.Context) (*fleet.AppConfig, error) { + return &fleet.AppConfig{ + Integrations: fleet.Integrations{ + GoogleCalendar: []*fleet.GoogleCalendarIntegration{ + { + Domain: "example.com", + ApiKey: map[string]string{ + fleet.GoogleCalendarEmail: "calendar-mock@example.com", + }, + }, + }, + }, + }, nil + } + + teamID1 := uint(1) + teamID2 := uint(2) + teamID3 := uint(3) + teamID4 := uint(4) + teamID5 := uint(5) + ds.ListTeamsFunc = func(ctx context.Context, filter fleet.TeamFilter, opt fleet.ListOptions) ([]*fleet.Team, error) { + return []*fleet.Team{ + { + ID: teamID1, + Config: fleet.TeamConfig{ + Integrations: fleet.TeamIntegrations{ + GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{ + Enable: true, + WebhookURL: webhookServer.URL, + }, + }, + }, + }, + { + ID: teamID2, + Config: fleet.TeamConfig{ + Integrations: fleet.TeamIntegrations{ + GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{ + Enable: true, + WebhookURL: webhookServer.URL, + }, + }, + }, + }, + { + ID: teamID3, + Config: fleet.TeamConfig{ + Integrations: fleet.TeamIntegrations{ + GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{ + Enable: true, + WebhookURL: webhookServer.URL, + }, + }, + }, + }, + { + ID: teamID4, + Config: fleet.TeamConfig{ + Integrations: fleet.TeamIntegrations{ + GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{ + Enable: true, + WebhookURL: webhookServer.URL, + }, + }, + }, + }, + { + ID: teamID5, + Config: fleet.TeamConfig{ + Integrations: fleet.TeamIntegrations{ + GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{ + Enable: true, + WebhookURL: webhookServer.URL, + }, + }, + }, + }, + }, nil + } + + policyID1 := uint(10) + policyID2 := uint(11) + policyID3 := uint(12) + policyID4 := uint(13) + policyID5 := uint(14) + policyID6 := uint(15) + policyID7 := uint(16) + policyID8 := uint(17) + policyID9 := uint(18) + policyID10 := uint(19) + ds.GetCalendarPoliciesFunc = func(ctx context.Context, teamID uint) ([]fleet.PolicyCalendarData, error) { + switch teamID { + case teamID1: + return []fleet.PolicyCalendarData{ + { + ID: policyID1, + Name: "Policy 1", + }, + { + ID: policyID2, + Name: "Policy 2", + }, + }, nil + case teamID2: + return []fleet.PolicyCalendarData{ + { + ID: policyID3, + Name: "Policy 3", + }, + { + ID: policyID4, + Name: "Policy 4", + }, + }, nil + case teamID3: + return []fleet.PolicyCalendarData{ + { + ID: policyID5, + Name: "Policy 5", + }, + { + ID: policyID6, + Name: "Policy 6", + }, + }, nil + case teamID4: + return []fleet.PolicyCalendarData{ + { + ID: policyID7, + Name: "Policy 7", + }, + { + ID: policyID8, + Name: "Policy 8", + }, + }, nil + case teamID5: + return []fleet.PolicyCalendarData{ + { + ID: policyID9, + Name: "Policy 9", + }, + { + ID: policyID10, + Name: "Policy 10", + }, + }, nil + default: + return nil, notFoundErr{} + } + } + + hosts := make([]fleet.HostPolicyMembershipData, 0, 1000) + for i := 0; i < 1000; i++ { + hosts = append(hosts, fleet.HostPolicyMembershipData{ + Email: fmt.Sprintf("user%d@example.com", i), + Passing: i%2 == 0, + HostID: uint(i), + HostDisplayName: fmt.Sprintf("display_name%d", i), + HostHardwareSerial: fmt.Sprintf("serial%d", i), + }) + } + + ds.GetTeamHostsPolicyMembershipsFunc = func( + ctx context.Context, domain string, teamID uint, policyIDs []uint, + ) ([]fleet.HostPolicyMembershipData, error) { + var start, end int + switch teamID { + case teamID1: + start, end = 0, 200 + case teamID2: + start, end = 200, 400 + case teamID3: + start, end = 400, 600 + case teamID4: + start, end = 600, 800 + case teamID5: + start, end = 800, 1000 + } + return hosts[start:end], nil + } + + ds.GetHostCalendarEventByEmailFunc = func(ctx context.Context, email string) (*fleet.HostCalendarEvent, *fleet.CalendarEvent, error) { + return nil, nil, notFoundErr{} + } + + eventsCreated := 0 + var eventsCreatedMu sync.Mutex + + eventPerHost := make(map[uint]*fleet.CalendarEvent) + + ds.CreateOrUpdateCalendarEventFunc = func(ctx context.Context, + email string, + startTime, endTime time.Time, + data []byte, + hostID uint, + webhookStatus fleet.CalendarWebhookStatus, + ) (*fleet.CalendarEvent, error) { + require.Equal(t, fmt.Sprintf("user%d@example.com", hostID), email) + eventsCreatedMu.Lock() + eventsCreated += 1 + eventPerHost[hostID] = &fleet.CalendarEvent{ + ID: hostID, + Email: email, + StartTime: startTime, + EndTime: endTime, + Data: data, + UpdateCreateTimestamps: fleet.UpdateCreateTimestamps{ + CreateTimestamp: fleet.CreateTimestamp{ + CreatedAt: time.Now(), + }, + UpdateTimestamp: fleet.UpdateTimestamp{ + UpdatedAt: time.Now(), + }, + }, + } + eventsCreatedMu.Unlock() + require.Equal(t, fleet.CalendarWebhookStatusNone, webhookStatus) + require.NotEmpty(t, data) + require.NotZero(t, startTime) + require.NotZero(t, endTime) + // Currently, the returned calendar event is unused. + return nil, nil + } + + err := cronCalendarEvents(ctx, ds, logger) + require.NoError(t, err) + + createdCalendarEvents := calendar.ListGoogleMockEvents() + require.Equal(t, eventsCreated, 500) + require.Len(t, createdCalendarEvents, 500) + + hosts = make([]fleet.HostPolicyMembershipData, 0, 1000) + for i := 0; i < 1000; i++ { + hosts = append(hosts, fleet.HostPolicyMembershipData{ + Email: fmt.Sprintf("user%d@example.com", i), + Passing: true, + HostID: uint(i), + HostDisplayName: fmt.Sprintf("display_name%d", i), + HostHardwareSerial: fmt.Sprintf("serial%d", i), + }) + } + + ds.GetHostCalendarEventByEmailFunc = func(ctx context.Context, email string) (*fleet.HostCalendarEvent, *fleet.CalendarEvent, error) { + hostID, err := strconv.Atoi(strings.TrimSuffix(strings.TrimPrefix(email, "user"), "@example.com")) + require.NoError(t, err) + if hostID%2 == 0 { + return nil, nil, notFoundErr{} + } + require.Contains(t, eventPerHost, uint(hostID)) + return &fleet.HostCalendarEvent{ + ID: uint(hostID), + HostID: uint(hostID), + CalendarEventID: uint(hostID), + WebhookStatus: fleet.CalendarWebhookStatusNone, + }, eventPerHost[uint(hostID)], nil + } + + ds.DeleteCalendarEventFunc = func(ctx context.Context, calendarEventID uint) error { + return nil + } + + err = cronCalendarEvents(ctx, ds, logger) + require.NoError(t, err) + + createdCalendarEvents = calendar.ListGoogleMockEvents() + require.Len(t, createdCalendarEvents, 0) } diff --git a/ee/server/calendar/google_calendar_mock.go b/ee/server/calendar/google_calendar_mock.go index 255f8d87c7..08e3a72e20 100644 --- a/ee/server/calendar/google_calendar_mock.go +++ b/ee/server/calendar/google_calendar_mock.go @@ -3,23 +3,26 @@ package calendar import ( "context" "errors" - kitlog "github.com/go-kit/log" - "google.golang.org/api/calendar/v3" - "google.golang.org/api/googleapi" "net/http" "os" "strconv" "sync" "time" + + kitlog "github.com/go-kit/log" + "google.golang.org/api/calendar/v3" + "google.golang.org/api/googleapi" ) type GoogleCalendarMockAPI struct { logger kitlog.Logger } -var mockEvents = make(map[string]*calendar.Event) -var mu sync.Mutex -var id uint64 +var ( + mockEvents = make(map[string]*calendar.Event) + mu sync.Mutex + id uint64 +) const latency = 500 * time.Millisecond @@ -44,6 +47,7 @@ func (lowLevelAPI *GoogleCalendarMockAPI) GetSetting(name string) (*calendar.Set } func (lowLevelAPI *GoogleCalendarMockAPI) CreateEvent(event *calendar.Event) (*calendar.Event, error) { + time.Sleep(latency) mu.Lock() defer mu.Unlock() id += 1 @@ -79,3 +83,13 @@ func (lowLevelAPI *GoogleCalendarMockAPI) DeleteEvent(id string) error { delete(mockEvents, id) return nil } + +func ListGoogleMockEvents() map[string]*calendar.Event { + return mockEvents +} + +func ClearMockEvents() { + mu.Lock() + defer mu.Unlock() + mockEvents = make(map[string]*calendar.Event) +}