Fleet calendar process 100 hosts at a time (#17806)

Add concurrency for #17441.
This commit is contained in:
Lucas Manuel Rodriguez 2024-03-22 16:26:11 -03:00 committed by Victor Lyuboslavsky
parent 62049b04bd
commit 355379aa0b
No known key found for this signature in database

View file

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"slices"
"time"
"github.com/fleetdm/fleet/v4/ee/server/calendar"
@ -13,6 +14,7 @@ import (
"github.com/go-kit/log"
kitlog "github.com/go-kit/log"
"github.com/go-kit/log/level"
"golang.org/x/sync/errgroup"
)
func newCalendarSchedule(
@ -57,7 +59,6 @@ func cronCalendarEvents(ctx context.Context, ds fleet.Datastore, logger kitlog.L
return nil
}
googleCalendarIntegrationConfig := appConfig.Integrations.GoogleCalendar[0]
calendar := createUserCalendarFromConfig(ctx, googleCalendarIntegrationConfig, logger)
domain := googleCalendarIntegrationConfig.Domain
teams, err := ds.ListTeams(ctx, fleet.TeamFilter{
@ -71,7 +72,7 @@ func cronCalendarEvents(ctx context.Context, ds fleet.Datastore, logger kitlog.L
for _, team := range teams {
if err := cronCalendarEventsForTeam(
ctx, ds, calendar, *team, appConfig.OrgInfo.OrgName, domain, logger,
ctx, ds, googleCalendarIntegrationConfig, *team, appConfig.OrgInfo.OrgName, domain, logger,
); err != nil {
level.Info(logger).Log("msg", "events calendar cron", "team_id", team.ID, "err", err)
}
@ -92,7 +93,7 @@ func createUserCalendarFromConfig(ctx context.Context, config *fleet.GoogleCalen
func cronCalendarEventsForTeam(
ctx context.Context,
ds fleet.Datastore,
calendar fleet.UserCalendar,
calendarConfig *fleet.GoogleCalendarIntegration,
team fleet.Team,
orgName string,
domain string,
@ -161,13 +162,13 @@ func cronCalendarEventsForTeam(
// We execute this first to remove any calendar events for a user that is now passing
// policies on one of its hosts, and possibly create a new calendar event if they have
// another failing host on the same team.
if err := removeCalendarEventsFromPassingHosts(ctx, ds, calendar, passingHosts); err != nil {
if err := removeCalendarEventsFromPassingHosts(ctx, ds, calendarConfig, passingHosts, logger); err != nil {
level.Info(logger).Log("msg", "removing calendar events from passing hosts", "err", err)
}
// Process hosts that are failing calendar policies.
if err := processCalendarFailingHosts(
ctx, ds, calendar, orgName, failingHosts, logger,
ctx, ds, calendarConfig, orgName, failingHosts, logger,
); err != nil {
level.Info(logger).Log("msg", "processing failing hosts", "err", err)
}
@ -185,67 +186,82 @@ func cronCalendarEventsForTeam(
// processCalendarFailingHosts processes hosts that are failing calendar policies:
// for each host it looks up the calendar event stored for the host's email and then
// either updates the existing event or creates a new one.
//
// NOTE(review): this span looks like a diff hunk whose +/- markers were lost, so the
// removed sequential loop and the added worker-pool implementation appear interleaved
// (for example, both the userCalendar and the calendarConfig parameters are listed).
// Confirm against the actual commit before treating it as one compilable function.
func processCalendarFailingHosts(
ctx context.Context,
ds fleet.Datastore,
userCalendar fleet.UserCalendar,
calendarConfig *fleet.GoogleCalendarIntegration,
orgName string,
hosts []fleet.HostPolicyMembershipData,
logger kitlog.Logger,
) error {
// filterHostsWithSameEmail is defined outside this view; presumably it keeps a
// single host per email — verify.
hosts = filterHostsWithSameEmail(hosts)
for _, host := range hosts {
logger := log.With(logger, "host_id", host.HostID)
// Number of worker goroutines started below; each one receives hosts from hostsCh.
const consumers = 100
// Unbuffered channel: every send blocks until some worker receives the value.
hostsCh := make(chan fleet.HostPolicyMembershipData)
// Per the errgroup documentation, the derived context is canceled the first time a
// function passed to g.Go returns a non-nil error; from here on ctx is that context.
g, ctx := errgroup.WithContext(ctx)
hostCalendarEvent, calendarEvent, err := ds.GetHostCalendarEventByEmail(ctx, host.Email)
for i := 0; i < consumers; i++ {
g.Go(func() error {
for host := range hostsCh {
// Per-host logger so every message below carries the host_id.
logger := log.With(logger, "host_id", host.HostID)
expiredEvent := false
if err == nil {
if hostCalendarEvent.HostID != host.HostID {
// This calendar event belongs to another host with this associated email,
// thus we skip this entry.
continue // continue with next host
}
if hostCalendarEvent.WebhookStatus == fleet.CalendarWebhookStatusPending {
// This can happen if the host went offline (and never returned results)
// after setting the webhook as pending.
continue // continue with next host
}
now := time.Now()
webhookAlreadyFired := hostCalendarEvent.WebhookStatus == fleet.CalendarWebhookStatusSent
if webhookAlreadyFired && sameDate(now, calendarEvent.StartTime) {
// If the webhook already fired today and the policies are still failing
// we give a grace period of one day for the host before we schedule a new event.
continue // continue with next host
}
if calendarEvent.EndTime.Before(now) {
expiredEvent = true
}
}
// Look up the calendar event currently stored for this host's email; err is
// inspected both right below and in the switch further down.
hostCalendarEvent, calendarEvent, err := ds.GetHostCalendarEventByEmail(ctx, host.Email)
if err := userCalendar.Configure(host.Email); err != nil {
return fmt.Errorf("configure user calendar: %w", err)
}
// expiredEvent is set when a stored event exists but its end time is already past;
// the switch below then takes the same branch as "no event found".
expiredEvent := false
if err == nil {
if hostCalendarEvent.HostID != host.HostID {
// This calendar event belongs to another host with this associated email,
// thus we skip this entry.
continue // continue with next host
}
if hostCalendarEvent.WebhookStatus == fleet.CalendarWebhookStatusPending {
// This can happen if the host went offline (and never returned results)
// after setting the webhook as pending.
continue // continue with next host
}
now := time.Now()
webhookAlreadyFired := hostCalendarEvent.WebhookStatus == fleet.CalendarWebhookStatusSent
if webhookAlreadyFired && sameDate(now, calendarEvent.StartTime) {
// If the webhook already fired today and the policies are still failing
// we give a grace period of one day for the host before we schedule a new event.
continue // continue with next host
}
if calendarEvent.EndTime.Before(now) {
expiredEvent = true
}
}
switch {
case err == nil && !expiredEvent:
if err := processFailingHostExistingCalendarEvent(
ctx, ds, userCalendar, orgName, hostCalendarEvent, calendarEvent, host,
); err != nil {
level.Info(logger).Log("msg", "process failing host existing calendar event", "err", err)
continue // continue with next host
// A calendar client is built from the config for each host handled here and then
// configured with that host's email.
userCalendar := createUserCalendarFromConfig(ctx, calendarConfig, logger)
if err := userCalendar.Configure(host.Email); err != nil {
// Returning here ends this worker; the error is reported through g.Wait().
return fmt.Errorf("configure user calendar: %w", err)
}
switch {
case err == nil && !expiredEvent:
// A live (non-expired) event exists for this host: hand it to the update path.
// Failures are logged and do not stop the worker.
if err := processFailingHostExistingCalendarEvent(
ctx, ds, userCalendar, orgName, hostCalendarEvent, calendarEvent, host,
); err != nil {
level.Info(logger).Log("msg", "process failing host existing calendar event", "err", err)
continue // continue with next host
}
case fleet.IsNotFound(err) || expiredEvent:
// No stored event, or the stored one already ended: create a new event.
// Failures are logged and do not stop the worker.
if err := processFailingHostCreateCalendarEvent(
ctx, ds, userCalendar, orgName, host,
); err != nil {
level.Info(logger).Log("msg", "process failing host create calendar event", "err", err)
continue // continue with next host
}
default:
// Any other lookup error ends this worker and is reported through g.Wait().
return fmt.Errorf("get calendar event: %w", err)
}
}
case fleet.IsNotFound(err) || expiredEvent:
if err := processFailingHostCreateCalendarEvent(
ctx, ds, userCalendar, orgName, host,
); err != nil {
level.Info(logger).Log("msg", "process failing host create calendar event", "err", err)
continue // continue with next host
}
default:
return fmt.Errorf("get calendar event: %w", err)
}
// hostsCh was closed and drained without this worker hitting a fatal error.
return nil
})
}
return nil
// Feed every host to the workers.
// NOTE(review): this send does not select on ctx.Done(). Workers return early on a
// Configure error or an unexpected lookup error; if all of them have returned, nothing
// receives from the unbuffered hostsCh and this send blocks forever — confirm and
// consider guarding the send with a select on ctx.Done().
for _, host := range hosts {
hostsCh <- host
}
// Closing the channel ends each worker's range loop once the remaining hosts are consumed.
close(hostsCh)
// Per the errgroup documentation, Wait blocks until all workers have returned and
// yields the first non-nil error from them, if any.
return g.Wait()
}
func filterHostsWithSameEmail(hosts []fleet.HostPolicyMembershipData) []fleet.HostPolicyMembershipData {
@ -437,27 +453,61 @@ func addBusinessDay(date time.Time) time.Time {
// removeCalendarEventsFromPassingHosts deletes the stored calendar event for users whose
// hosts are now passing: for each email it looks up the stored event and deletes it when
// the event belongs to one of the passing hosts collected for that email.
//
// NOTE(review): this span looks like a diff hunk whose +/- markers were lost, so the
// removed per-host loop and the added per-email worker-pool implementation appear
// interleaved (for example, both the userCalendar and the calendarConfig parameters are
// listed). Confirm against the actual commit before treating it as one compilable function.
func removeCalendarEventsFromPassingHosts(
ctx context.Context,
ds fleet.Datastore,
userCalendar fleet.UserCalendar,
calendarConfig *fleet.GoogleCalendarIntegration,
hosts []fleet.HostPolicyMembershipData,
logger kitlog.Logger,
) error {
// Group the passing host IDs by their associated email.
hostIDsByEmail := make(map[string][]uint)
for _, host := range hosts {
hostCalendarEvent, calendarEvent, err := ds.GetHostCalendarEventByEmail(ctx, host.Email)
switch {
case err == nil:
if hostCalendarEvent.HostID != host.HostID {
// This calendar event belongs to another host, thus we skip this entry.
continue
}
case fleet.IsNotFound(err):
continue
default:
return fmt.Errorf("get calendar event from DB: %w", err)
}
if err := deleteCalendarEvent(ctx, ds, userCalendar, calendarEvent); err != nil {
return fmt.Errorf("delete user calendar event: %w", err)
}
hostIDsByEmail[host.Email] = append(hostIDsByEmail[host.Email], host.HostID)
}
return nil
// emailWithHosts pairs one email with all the passing host IDs collected for it.
type emailWithHosts struct {
email string
hostIDs []uint
}
// Flatten the map into a slice of work items, one per email.
emails := make([]emailWithHosts, 0, len(hostIDsByEmail))
for email, hostIDs := range hostIDsByEmail {
emails = append(emails, emailWithHosts{
email: email,
hostIDs: hostIDs,
})
}
// Number of worker goroutines started below; each one receives work items from emailsCh.
const consumers = 100
// Unbuffered channel: every send blocks until some worker receives the value.
emailsCh := make(chan emailWithHosts)
// Per the errgroup documentation, the derived context is canceled the first time a
// function passed to g.Go returns a non-nil error; from here on ctx is that context.
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < consumers; i++ {
g.Go(func() error {
for email := range emailsCh {
hostCalendarEvent, calendarEvent, err := ds.GetHostCalendarEventByEmail(ctx, email.email)
switch {
case err == nil:
if ok := slices.Contains(email.hostIDs, hostCalendarEvent.HostID); !ok {
// None of the hosts belong to this calendar event.
continue
}
case fleet.IsNotFound(err):
// No stored event for this email: nothing to delete.
continue
default:
// Any other lookup error ends this worker and is reported through g.Wait().
return fmt.Errorf("get calendar event from DB: %w", err)
}
// A calendar client is built from the config for each event that gets deleted.
userCalendar := createUserCalendarFromConfig(ctx, calendarConfig, logger)
if err := deleteCalendarEvent(ctx, ds, userCalendar, calendarEvent); err != nil {
// A failed delete also ends this worker.
return fmt.Errorf("delete user calendar event: %w", err)
}
}
// emailsCh was closed and drained without this worker hitting an error.
return nil
})
}
// Feed every work item to the workers.
// NOTE(review): this send does not select on ctx.Done(). Workers return on a lookup or
// delete error; if all of them have returned, nothing receives from the unbuffered
// emailsCh and this send blocks forever — confirm and consider guarding the send with a
// select on ctx.Done().
for _, emailWithHostIDs := range emails {
emailsCh <- emailWithHostIDs
}
// Closing the channel ends each worker's range loop once the remaining items are consumed.
close(emailsCh)
// Per the errgroup documentation, Wait blocks until all workers have returned and
// yields the first non-nil error from them, if any.
return g.Wait()
}
func logHostsWithoutAssociatedEmail(