From 74968de6e3ff6e3756d3cbd1a6f153898054eb8a Mon Sep 17 00:00:00 2001 From: Victor Lyuboslavsky Date: Mon, 1 Apr 2024 15:54:37 -0500 Subject: [PATCH] Enabling calendar clean up job to delete Google calendar events in parallel. --- cmd/fleet/calendar_cron.go | 48 +++++++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/cmd/fleet/calendar_cron.go b/cmd/fleet/calendar_cron.go index 953747c602..553fa62c65 100644 --- a/cmd/fleet/calendar_cron.go +++ b/cmd/fleet/calendar_cron.go @@ -609,7 +609,7 @@ func cronCalendarEventsCleanup(ctx context.Context, ds fleet.Datastore, logger k // If global setting is disabled, we remove all calendar events from the DB // (we cannot delete the events from the user calendar because there's no configuration anymore). if userCalendar == nil { - if err := deleteAllCalendarEvents(ctx, ds, nil, nil); err != nil { + if err := deleteAllCalendarEvents(ctx, ds, nil, nil, logger); err != nil { return fmt.Errorf("delete all calendar events: %w", err) } // We've deleted all calendar events, nothing else to do. @@ -630,7 +630,7 @@ func cronCalendarEventsCleanup(ctx context.Context, ds fleet.Datastore, logger k } for _, team := range teams { - if err := cleanupTeamCalendarEvents(ctx, ds, userCalendar, *team); err != nil { + if err := cleanupTeamCalendarEvents(ctx, ds, userCalendar, *team, logger); err != nil { level.Info(logger).Log("msg", "delete team calendar events", "team_id", team.ID, "err", err) } } @@ -644,12 +644,7 @@ func cronCalendarEventsCleanup(ctx context.Context, ds fleet.Datastore, logger k if err != nil { return fmt.Errorf("list out of date calendar events: %w", err) } - for _, outOfDateCalendarEvent := range outOfDateCalendarEvents { - if err := deleteCalendarEvent(ctx, ds, userCalendar, outOfDateCalendarEvent); err != nil { - return fmt.Errorf("delete user calendar event: %w", err) - } - } - + deleteCalendarEventsInParallel(ctx, ds, userCalendar, outOfDateCalendarEvents, logger) return nil } @@ -658,24 +653,49 @@ func deleteAllCalendarEvents( ds fleet.Datastore, userCalendar fleet.UserCalendar, teamID *uint, + logger kitlog.Logger, ) error { calendarEvents, err := ds.ListCalendarEvents(ctx, teamID) if err != nil { return fmt.Errorf("list calendar events: %w", err) } - for _, calendarEvent := range calendarEvents { - if err := deleteCalendarEvent(ctx, ds, userCalendar, calendarEvent); err != nil { - return fmt.Errorf("delete user calendar event: %w", err) - } - } + deleteCalendarEventsInParallel(ctx, ds, userCalendar, calendarEvents, logger) return nil } +func deleteCalendarEventsInParallel( + ctx context.Context, ds fleet.Datastore, userCalendar fleet.UserCalendar, calendarEvents []*fleet.CalendarEvent, logger kitlog.Logger, +) { + if len(calendarEvents) > 0 { + const consumers = 20 + calendarEventCh := make(chan *fleet.CalendarEvent) + var wg sync.WaitGroup + for i := 0; i < consumers; i++ { + wg.Add(+1) + go func() { + defer wg.Done() + for calEvent := range calendarEventCh { + if err := deleteCalendarEvent(ctx, ds, userCalendar, calEvent); err != nil { + level.Error(logger).Log("msg", "delete user calendar event", "err", err) + continue + } + } + }() + } + for _, outOfDateCalendarEvent := range calendarEvents { + calendarEventCh <- outOfDateCalendarEvent + } + close(calendarEventCh) + wg.Wait() + } +} + func cleanupTeamCalendarEvents( ctx context.Context, ds fleet.Datastore, userCalendar fleet.UserCalendar, team fleet.Team, + logger kitlog.Logger, ) error { teamFeatureEnabled := team.Config.Integrations.GoogleCalendar != nil && team.Config.Integrations.GoogleCalendar.Enable @@ -692,7 +712,7 @@ func cleanupTeamCalendarEvents( // so we want to cleanup all calendar events for the team. } - return deleteAllCalendarEvents(ctx, ds, userCalendar, &team.ID) + return deleteAllCalendarEvents(ctx, ds, userCalendar, &team.ID, logger) } func deleteCalendarEvent(ctx context.Context, ds fleet.Datastore, userCalendar fleet.UserCalendar, calendarEvent *fleet.CalendarEvent) error {