diff --git a/cmd/fleet/calendar_cron.go b/cmd/fleet/calendar_cron.go index 238260ea3a..a909add0b6 100644 --- a/cmd/fleet/calendar_cron.go +++ b/cmd/fleet/calendar_cron.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "slices" "time" "github.com/fleetdm/fleet/v4/ee/server/calendar" @@ -13,6 +14,7 @@ import ( "github.com/go-kit/log" kitlog "github.com/go-kit/log" "github.com/go-kit/log/level" + "golang.org/x/sync/errgroup" ) func newCalendarSchedule( @@ -57,7 +59,6 @@ func cronCalendarEvents(ctx context.Context, ds fleet.Datastore, logger kitlog.L return nil } googleCalendarIntegrationConfig := appConfig.Integrations.GoogleCalendar[0] - calendar := createUserCalendarFromConfig(ctx, googleCalendarIntegrationConfig, logger) domain := googleCalendarIntegrationConfig.Domain teams, err := ds.ListTeams(ctx, fleet.TeamFilter{ @@ -71,7 +72,7 @@ func cronCalendarEvents(ctx context.Context, ds fleet.Datastore, logger kitlog.L for _, team := range teams { if err := cronCalendarEventsForTeam( - ctx, ds, calendar, *team, appConfig.OrgInfo.OrgName, domain, logger, + ctx, ds, googleCalendarIntegrationConfig, *team, appConfig.OrgInfo.OrgName, domain, logger, ); err != nil { level.Info(logger).Log("msg", "events calendar cron", "team_id", team.ID, "err", err) } @@ -92,7 +93,7 @@ func createUserCalendarFromConfig(ctx context.Context, config *fleet.GoogleCalen func cronCalendarEventsForTeam( ctx context.Context, ds fleet.Datastore, - calendar fleet.UserCalendar, + calendarConfig *fleet.GoogleCalendarIntegration, team fleet.Team, orgName string, domain string, @@ -161,13 +162,13 @@ func cronCalendarEventsForTeam( // We execute this first to remove any calendar events for a user that is now passing // policies on one of its hosts, and possibly create a new calendar event if they have // another failing host on the same team. - if err := removeCalendarEventsFromPassingHosts(ctx, ds, calendar, passingHosts); err != nil { + if err := removeCalendarEventsFromPassingHosts(ctx, ds, calendarConfig, passingHosts, logger); err != nil { level.Info(logger).Log("msg", "removing calendar events from passing hosts", "err", err) } // Process hosts that are failing calendar policies. if err := processCalendarFailingHosts( - ctx, ds, calendar, orgName, failingHosts, logger, + ctx, ds, calendarConfig, orgName, failingHosts, logger, ); err != nil { level.Info(logger).Log("msg", "processing failing hosts", "err", err) } @@ -185,67 +186,82 @@ func cronCalendarEventsForTeam( func processCalendarFailingHosts( ctx context.Context, ds fleet.Datastore, - userCalendar fleet.UserCalendar, + calendarConfig *fleet.GoogleCalendarIntegration, orgName string, hosts []fleet.HostPolicyMembershipData, logger kitlog.Logger, ) error { hosts = filterHostsWithSameEmail(hosts) - for _, host := range hosts { - logger := log.With(logger, "host_id", host.HostID) + const consumers = 100 + hostsCh := make(chan fleet.HostPolicyMembershipData) + g, ctx := errgroup.WithContext(ctx) - hostCalendarEvent, calendarEvent, err := ds.GetHostCalendarEventByEmail(ctx, host.Email) + for i := 0; i < consumers; i++ { + g.Go(func() error { + for host := range hostsCh { + logger := log.With(logger, "host_id", host.HostID) - expiredEvent := false - if err == nil { - if hostCalendarEvent.HostID != host.HostID { - // This calendar event belongs to another host with this associated email, - // thus we skip this entry. - continue // continue with next host - } - if hostCalendarEvent.WebhookStatus == fleet.CalendarWebhookStatusPending { - // This can happen if the host went offline (and never returned results) - // after setting the webhook as pending. - continue // continue with next host - } - now := time.Now() - webhookAlreadyFired := hostCalendarEvent.WebhookStatus == fleet.CalendarWebhookStatusSent - if webhookAlreadyFired && sameDate(now, calendarEvent.StartTime) { - // If the webhook already fired today and the policies are still failing - // we give a grace period of one day for the host before we schedule a new event. - continue // continue with next host - } - if calendarEvent.EndTime.Before(now) { - expiredEvent = true - } - } + hostCalendarEvent, calendarEvent, err := ds.GetHostCalendarEventByEmail(ctx, host.Email) - if err := userCalendar.Configure(host.Email); err != nil { - return fmt.Errorf("configure user calendar: %w", err) - } + expiredEvent := false + if err == nil { + if hostCalendarEvent.HostID != host.HostID { + // This calendar event belongs to another host with this associated email, + // thus we skip this entry. + continue // continue with next host + } + if hostCalendarEvent.WebhookStatus == fleet.CalendarWebhookStatusPending { + // This can happen if the host went offline (and never returned results) + // after setting the webhook as pending. + continue // continue with next host + } + now := time.Now() + webhookAlreadyFired := hostCalendarEvent.WebhookStatus == fleet.CalendarWebhookStatusSent + if webhookAlreadyFired && sameDate(now, calendarEvent.StartTime) { + // If the webhook already fired today and the policies are still failing + // we give a grace period of one day for the host before we schedule a new event. + continue // continue with next host + } + if calendarEvent.EndTime.Before(now) { + expiredEvent = true + } + } - switch { - case err == nil && !expiredEvent: - if err := processFailingHostExistingCalendarEvent( - ctx, ds, userCalendar, orgName, hostCalendarEvent, calendarEvent, host, - ); err != nil { - level.Info(logger).Log("msg", "process failing host existing calendar event", "err", err) - continue // continue with next host + userCalendar := createUserCalendarFromConfig(ctx, calendarConfig, logger) + if err := userCalendar.Configure(host.Email); err != nil { + return fmt.Errorf("configure user calendar: %w", err) + } + + switch { + case err == nil && !expiredEvent: + if err := processFailingHostExistingCalendarEvent( + ctx, ds, userCalendar, orgName, hostCalendarEvent, calendarEvent, host, + ); err != nil { + level.Info(logger).Log("msg", "process failing host existing calendar event", "err", err) + continue // continue with next host + } + case fleet.IsNotFound(err) || expiredEvent: + if err := processFailingHostCreateCalendarEvent( + ctx, ds, userCalendar, orgName, host, + ); err != nil { + level.Info(logger).Log("msg", "process failing host create calendar event", "err", err) + continue // continue with next host + } + default: + return fmt.Errorf("get calendar event: %w", err) + } } - case fleet.IsNotFound(err) || expiredEvent: - if err := processFailingHostCreateCalendarEvent( - ctx, ds, userCalendar, orgName, host, - ); err != nil { - level.Info(logger).Log("msg", "process failing host create calendar event", "err", err) - continue // continue with next host - } - default: - return fmt.Errorf("get calendar event: %w", err) - } + return nil + }) } - return nil + for _, host := range hosts { + hostsCh <- host + } + close(hostsCh) + + return g.Wait() } func filterHostsWithSameEmail(hosts []fleet.HostPolicyMembershipData) []fleet.HostPolicyMembershipData { @@ -437,27 +453,61 @@ func addBusinessDay(date time.Time) time.Time { func removeCalendarEventsFromPassingHosts( ctx context.Context, ds fleet.Datastore, - userCalendar fleet.UserCalendar, + calendarConfig *fleet.GoogleCalendarIntegration, hosts []fleet.HostPolicyMembershipData, + logger kitlog.Logger, ) error { + hostIDsByEmail := make(map[string][]uint) for _, host := range hosts { - hostCalendarEvent, calendarEvent, err := ds.GetHostCalendarEventByEmail(ctx, host.Email) - switch { - case err == nil: - if hostCalendarEvent.HostID != host.HostID { - // This calendar event belongs to another host, thus we skip this entry. - continue - } - case fleet.IsNotFound(err): - continue - default: - return fmt.Errorf("get calendar event from DB: %w", err) - } - if err := deleteCalendarEvent(ctx, ds, userCalendar, calendarEvent); err != nil { - return fmt.Errorf("delete user calendar event: %w", err) - } + hostIDsByEmail[host.Email] = append(hostIDsByEmail[host.Email], host.HostID) } - return nil + type emailWithHosts struct { + email string + hostIDs []uint + } + emails := make([]emailWithHosts, 0, len(hostIDsByEmail)) + for email, hostIDs := range hostIDsByEmail { + emails = append(emails, emailWithHosts{ + email: email, + hostIDs: hostIDs, + }) + } + + const consumers = 100 + emailsCh := make(chan emailWithHosts) + g, ctx := errgroup.WithContext(ctx) + + for i := 0; i < consumers; i++ { + g.Go(func() error { + for email := range emailsCh { + + hostCalendarEvent, calendarEvent, err := ds.GetHostCalendarEventByEmail(ctx, email.email) + switch { + case err == nil: + if ok := slices.Contains(email.hostIDs, hostCalendarEvent.HostID); !ok { + // None of the hosts belong to this calendar event. + continue + } + case fleet.IsNotFound(err): + continue + default: + return fmt.Errorf("get calendar event from DB: %w", err) + } + userCalendar := createUserCalendarFromConfig(ctx, calendarConfig, logger) + if err := deleteCalendarEvent(ctx, ds, userCalendar, calendarEvent); err != nil { + return fmt.Errorf("delete user calendar event: %w", err) + } + } + return nil + }) + } + + for _, emailWithHostIDs := range emails { + emailsCh <- emailWithHostIDs + } + close(emailsCh) + + return g.Wait() } func logHostsWithoutAssociatedEmail(