Enabling calendar clean up job to delete Google calendar events in parallel. (#17987)

#17230 

This fix addresses the unreleased bug where calendar cleanup job can
take too long, causing the subsequent job to miss a calendar event. The
event deletions now occur in parallel.

Also, reducing max bandwidth for accessing Google calendar by 10% to
prevent potential rate-limiting corner cases.

# Checklist for submitter

- [ ] Changes file added for user-visible changes in `changes/` or
`orbit/changes/`.
See [Changes
files](https://fleetdm.com/docs/contributing/committing-changes#changes-files)
for more information.
- [ ] Added/updated tests
- [x] Manual QA for all new/changed functionality
This commit is contained in:
Victor Lyuboslavsky 2024-04-01 17:03:00 -05:00 committed by GitHub
commit dbe8f95692
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -17,6 +17,8 @@ import (
"github.com/go-kit/log/level"
)
const calendarConsumers = 18
func newCalendarSchedule(
ctx context.Context,
instanceID string,
@ -200,11 +202,10 @@ func processCalendarFailingHosts(
) {
hosts = filterHostsWithSameEmail(hosts)
const consumers = 20
hostsCh := make(chan fleet.HostPolicyMembershipData)
var wg sync.WaitGroup
for i := 0; i < consumers; i++ {
for i := 0; i < calendarConsumers; i++ {
wg.Add(+1)
go func() {
defer wg.Done()
@ -500,11 +501,10 @@ func removeCalendarEventsFromPassingHosts(
})
}
const consumers = 20
emailsCh := make(chan emailWithHosts)
var wg sync.WaitGroup
for i := 0; i < consumers; i++ {
for i := 0; i < calendarConsumers; i++ {
wg.Add(+1)
go func() {
defer wg.Done()
@ -601,15 +601,16 @@ func cronCalendarEventsCleanup(ctx context.Context, ds fleet.Datastore, logger k
}
var userCalendar fleet.UserCalendar
var calendarConfig *fleet.GoogleCalendarIntegration
if len(appConfig.Integrations.GoogleCalendar) > 0 {
googleCalendarIntegrationConfig := appConfig.Integrations.GoogleCalendar[0]
userCalendar = createUserCalendarFromConfig(ctx, googleCalendarIntegrationConfig, logger)
calendarConfig = appConfig.Integrations.GoogleCalendar[0]
userCalendar = createUserCalendarFromConfig(ctx, calendarConfig, logger)
}
// If global setting is disabled, we remove all calendar events from the DB
// (we cannot delete the events from the user calendar because there's no configuration anymore).
if userCalendar == nil {
if err := deleteAllCalendarEvents(ctx, ds, nil, nil); err != nil {
if err := deleteAllCalendarEvents(ctx, ds, nil, nil, logger); err != nil {
return fmt.Errorf("delete all calendar events: %w", err)
}
// We've deleted all calendar events, nothing else to do.
@ -630,7 +631,7 @@ func cronCalendarEventsCleanup(ctx context.Context, ds fleet.Datastore, logger k
}
for _, team := range teams {
if err := cleanupTeamCalendarEvents(ctx, ds, userCalendar, *team); err != nil {
if err := cleanupTeamCalendarEvents(ctx, ds, calendarConfig, *team, logger); err != nil {
level.Info(logger).Log("msg", "delete team calendar events", "team_id", team.ID, "err", err)
}
}
@ -644,38 +645,59 @@ func cronCalendarEventsCleanup(ctx context.Context, ds fleet.Datastore, logger k
if err != nil {
return fmt.Errorf("list out of date calendar events: %w", err)
}
for _, outOfDateCalendarEvent := range outOfDateCalendarEvents {
if err := deleteCalendarEvent(ctx, ds, userCalendar, outOfDateCalendarEvent); err != nil {
return fmt.Errorf("delete user calendar event: %w", err)
}
}
deleteCalendarEventsInParallel(ctx, ds, calendarConfig, outOfDateCalendarEvents, logger)
return nil
}
func deleteAllCalendarEvents(
ctx context.Context,
ds fleet.Datastore,
userCalendar fleet.UserCalendar,
calendarConfig *fleet.GoogleCalendarIntegration,
teamID *uint,
logger kitlog.Logger,
) error {
calendarEvents, err := ds.ListCalendarEvents(ctx, teamID)
if err != nil {
return fmt.Errorf("list calendar events: %w", err)
}
for _, calendarEvent := range calendarEvents {
if err := deleteCalendarEvent(ctx, ds, userCalendar, calendarEvent); err != nil {
return fmt.Errorf("delete user calendar event: %w", err)
}
}
deleteCalendarEventsInParallel(ctx, ds, calendarConfig, calendarEvents, logger)
return nil
}
func deleteCalendarEventsInParallel(
ctx context.Context, ds fleet.Datastore, calendarConfig *fleet.GoogleCalendarIntegration, calendarEvents []*fleet.CalendarEvent,
logger kitlog.Logger,
) {
if len(calendarEvents) > 0 {
calendarEventCh := make(chan *fleet.CalendarEvent)
var wg sync.WaitGroup
for i := 0; i < calendarConsumers; i++ {
wg.Add(+1)
go func() {
defer wg.Done()
for calEvent := range calendarEventCh {
userCalendar := createUserCalendarFromConfig(ctx, calendarConfig, logger)
if err := deleteCalendarEvent(ctx, ds, userCalendar, calEvent); err != nil {
level.Error(logger).Log("msg", "delete user calendar event", "err", err)
continue
}
}
}()
}
for _, outOfDateCalendarEvent := range calendarEvents {
calendarEventCh <- outOfDateCalendarEvent
}
close(calendarEventCh)
wg.Wait()
}
}
func cleanupTeamCalendarEvents(
ctx context.Context,
ds fleet.Datastore,
userCalendar fleet.UserCalendar,
calendarConfig *fleet.GoogleCalendarIntegration,
team fleet.Team,
logger kitlog.Logger,
) error {
teamFeatureEnabled := team.Config.Integrations.GoogleCalendar != nil && team.Config.Integrations.GoogleCalendar.Enable
@ -692,7 +714,7 @@ func cleanupTeamCalendarEvents(
// so we want to cleanup all calendar events for the team.
}
return deleteAllCalendarEvents(ctx, ds, userCalendar, &team.ID)
return deleteAllCalendarEvents(ctx, ds, calendarConfig, &team.ID, logger)
}
func deleteCalendarEvent(ctx context.Context, ds fleet.Datastore, userCalendar fleet.UserCalendar, calendarEvent *fleet.CalendarEvent) error {