From 9a8ac02bc13d83ed93fb37ea2fffdc0f39289a77 Mon Sep 17 00:00:00 2001 From: Lucas Manuel Rodriguez Date: Tue, 19 Mar 2024 13:05:48 -0300 Subject: [PATCH] Happy path implementation of the calendar cron job (#17713) Happy path for #17441. --- cmd/fleet/calendar_cron.go | 454 ++++++++++++++++++ cmd/fleet/calendar_cron_test.go | 57 +++ cmd/fleet/serve.go | 12 + server/datastore/mysql/calendar_events.go | 150 ++++++ .../datastore/mysql/calendar_events_test.go | 6 + server/datastore/mysql/policies.go | 52 +- server/datastore/mysql/policies_test.go | 56 ++- server/fleet/app.go | 7 + server/fleet/calendar.go | 38 +- server/fleet/calendar_events.go | 9 + server/fleet/cron_schedules.go | 1 + server/fleet/datastore.go | 13 + server/fleet/policies.go | 5 + server/mock/datastore_mock.go | 96 ++++ server/service/osquery.go | 97 ++++ 15 files changed, 1050 insertions(+), 3 deletions(-) create mode 100644 cmd/fleet/calendar_cron.go create mode 100644 cmd/fleet/calendar_cron_test.go create mode 100644 server/datastore/mysql/calendar_events.go create mode 100644 server/datastore/mysql/calendar_events_test.go diff --git a/cmd/fleet/calendar_cron.go b/cmd/fleet/calendar_cron.go new file mode 100644 index 0000000000..099a938b2c --- /dev/null +++ b/cmd/fleet/calendar_cron.go @@ -0,0 +1,454 @@ +package main + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/fleetdm/fleet/v4/ee/server/calendar" + "github.com/fleetdm/fleet/v4/server/fleet" + "github.com/fleetdm/fleet/v4/server/ptr" + "github.com/fleetdm/fleet/v4/server/service/schedule" + "github.com/go-kit/log" + kitlog "github.com/go-kit/log" + "github.com/go-kit/log/level" +) + +func newCalendarSchedule( + ctx context.Context, + instanceID string, + ds fleet.Datastore, + logger kitlog.Logger, +) (*schedule.Schedule, error) { + const ( + name = string(fleet.CronCalendar) + defaultInterval = 5 * time.Minute + ) + logger = kitlog.With(logger, "cron", name) + s := schedule.New( + ctx, name, instanceID, defaultInterval, ds, ds, + schedule.WithAltLockID("calendar"), + schedule.WithLogger(logger), + schedule.WithJob( + "calendar_events", + func(ctx context.Context) error { + return cronCalendarEvents(ctx, ds, logger) + }, + ), + ) + + return s, nil +} + +func cronCalendarEvents(ctx context.Context, ds fleet.Datastore, logger kitlog.Logger) error { + appConfig, err := ds.AppConfig(ctx) + if err != nil { + return fmt.Errorf("load app config: %w", err) + } + + if len(appConfig.Integrations.GoogleCalendar) == 0 { + return nil + } + googleCalendarIntegrationConfig := appConfig.Integrations.GoogleCalendar[0] + googleCalendarConfig := calendar.GoogleCalendarConfig{ + Context: ctx, + IntegrationConfig: googleCalendarIntegrationConfig, + Logger: log.With(logger, "component", "google_calendar"), + } + calendar := calendar.NewGoogleCalendar(&googleCalendarConfig) + domain := googleCalendarIntegrationConfig.Domain + + teams, err := ds.ListTeams(ctx, fleet.TeamFilter{ + User: &fleet.User{ + GlobalRole: ptr.String(fleet.RoleAdmin), + }, + }, fleet.ListOptions{}) + if err != nil { + return fmt.Errorf("list teams: %w", err) + } + + for _, team := range teams { + if err := cronCalendarEventsForTeam( + ctx, ds, calendar, *team, appConfig.OrgInfo.OrgName, domain, logger, + ); err != nil { + level.Info(logger).Log("msg", "events calendar cron", "team_id", team.ID, "err", err) + } + } + + return nil +} + +func cronCalendarEventsForTeam( + ctx context.Context, + ds fleet.Datastore, + calendar fleet.UserCalendar, + team fleet.Team, + orgName string, + domain string, + logger kitlog.Logger, +) error { + if team.Config.Integrations.GoogleCalendar == nil || + !team.Config.Integrations.GoogleCalendar.Enable { + return nil + } + + policies, err := ds.GetCalendarPolicies(ctx, team.ID) + if err != nil { + return fmt.Errorf("get calendar policy ids: %w", err) + } + + if len(policies) == 0 { + return nil + } + + logger = kitlog.With(logger, "team_id", team.ID) + + // + // NOTEs: + // - We ignore hosts that are passing all policies and do not have an associated email. + // - We get only one host per email that's failing policies (the one with lower host id). + // - On every host, we get only the first email that matches the domain (sorted lexicographically). + // + // TODOs(lucas): + // - We need to rate limit calendar requests. + // + + policyIDs := make([]uint, 0, len(policies)) + for _, policy := range policies { + policyIDs = append(policyIDs, policy.ID) + } + hosts, err := ds.GetHostsPolicyMemberships(ctx, domain, policyIDs) + if err != nil { + return fmt.Errorf("get team hosts failing policies: %w", err) + } + + var ( + passingHosts []fleet.HostPolicyMembershipData + failingHosts []fleet.HostPolicyMembershipData + failingHostsWithoutAssociatedEmail []fleet.HostPolicyMembershipData + ) + for _, host := range hosts { + if host.Passing { // host is passing all configured policies + if host.Email != "" { + passingHosts = append(passingHosts, host) + } + } else { // host is failing some of the configured policies + if host.Email == "" { + failingHostsWithoutAssociatedEmail = append(failingHostsWithoutAssociatedEmail, host) + } else { + failingHosts = append(failingHosts, host) + } + } + } + level.Debug(logger).Log( + "msg", "summary", + "passing_hosts", len(passingHosts), + "failing_hosts", len(failingHosts), + "failing_hosts_without_associated_email", len(failingHostsWithoutAssociatedEmail), + ) + + if err := processCalendarFailingHosts( + ctx, ds, calendar, orgName, failingHosts, logger, + ); err != nil { + level.Info(logger).Log("msg", "processing failing hosts", "err", err) + } + + // Remove calendar events from hosts that are passing the policies. + if err := removeCalendarEventsFromPassingHosts(ctx, ds, calendar, passingHosts); err != nil { + level.Info(logger).Log("msg", "removing calendar events from passing hosts", "err", err) + } + + // At last we want to notify the hosts that are failing and don't have an associated email. + if err := fireWebhookForHostsWithoutAssociatedEmail( + team.Config.Integrations.GoogleCalendar.WebhookURL, + domain, + failingHostsWithoutAssociatedEmail, + logger, + ); err != nil { + level.Info(logger).Log("msg", "webhook for hosts without associated email", "err", err) + } + + return nil +} + +func processCalendarFailingHosts( + ctx context.Context, + ds fleet.Datastore, + userCalendar fleet.UserCalendar, + orgName string, + hosts []fleet.HostPolicyMembershipData, + logger kitlog.Logger, +) error { + for _, host := range hosts { + logger := log.With(logger, "host_id", host.HostID) + if err := userCalendar.Configure(host.Email); err != nil { + return fmt.Errorf("configure user calendar: %w", err) + } + + hostCalendarEvent, calendarEvent, err := ds.GetHostCalendarEvent(ctx, host.HostID) + + deletedExpiredEvent := false + if err == nil { + if calendarEvent.EndTime.Before(time.Now()) { + if err := ds.DeleteCalendarEvent(ctx, calendarEvent.ID); err != nil { + level.Info(logger).Log("msg", "deleting existing expired calendar event", "err", err) + continue // continue with next host + } + deletedExpiredEvent = true + } + } + + switch { + case err == nil && !deletedExpiredEvent: + if err := processFailingHostExistingCalendarEvent( + ctx, ds, userCalendar, orgName, hostCalendarEvent, calendarEvent, host, + ); err != nil { + level.Info(logger).Log("msg", "process failing host existing calendar event", "err", err) + continue // continue with next host + } + case fleet.IsNotFound(err) || deletedExpiredEvent: + if err := processFailingHostCreateCalendarEvent( + ctx, ds, userCalendar, orgName, host, + ); err != nil { + level.Info(logger).Log("msg", "process failing host create calendar event", "err", err) + continue // continue with next host + } + default: + return fmt.Errorf("get calendar event: %w", err) + } + } + + return nil +} + +func processFailingHostExistingCalendarEvent( + ctx context.Context, + ds fleet.Datastore, + calendar fleet.UserCalendar, + orgName string, + hostCalendarEvent *fleet.HostCalendarEvent, + calendarEvent *fleet.CalendarEvent, + host fleet.HostPolicyMembershipData, +) error { + updatedEvent, updated, err := calendar.GetAndUpdateEvent(calendarEvent, func() string { + return generateCalendarEventBody(orgName, host.HostDisplayName) + }) + if err != nil { + return fmt.Errorf("get event calendar on db: %w", err) + } + if updated { + if err := ds.UpdateCalendarEvent(ctx, + calendarEvent.ID, + updatedEvent.StartTime, + updatedEvent.EndTime, + updatedEvent.Data, + ); err != nil { + return fmt.Errorf("updating event calendar on db: %w", err) + } + } + now := time.Now() + eventInFuture := now.Before(updatedEvent.StartTime) + if eventInFuture { + // If the webhook status was sent and event was moved to the future we set the status to pending. + // This can happen if the admin wants to retry a remediation. + if hostCalendarEvent.WebhookStatus == fleet.CalendarWebhookStatusSent { + if err := ds.UpdateHostCalendarWebhookStatus(ctx, host.HostID, fleet.CalendarWebhookStatusPending); err != nil { + return fmt.Errorf("update host calendar webhook status: %w", err) + } + } + // Nothing else to do as event is in the future. + return nil + } + if now.After(updatedEvent.EndTime) { + return fmt.Errorf( + "unexpected event in the past: now=%s, start_time=%s, end_time=%s", + now, updatedEvent.StartTime, updatedEvent.EndTime, + ) + } + + // + // Event happening now. + // + + if hostCalendarEvent.WebhookStatus == fleet.CalendarWebhookStatusSent { + return nil + } + + online, err := isHostOnline(ctx, ds, host.HostID) + if err != nil { + return fmt.Errorf("host online check: %w", err) + } + if !online { + // If host is offline then there's nothing to do. + return nil + } + + if err := ds.UpdateHostCalendarWebhookStatus(ctx, host.HostID, fleet.CalendarWebhookStatusPending); err != nil { + return fmt.Errorf("update host calendar webhook status: %w", err) + } + + // TODO(lucas): If this doesn't work at scale, then implement a special refetch + // for policies only. + if err := ds.UpdateHostRefetchRequested(ctx, host.HostID, true); err != nil { + return fmt.Errorf("refetch host: %w", err) + } + return nil +} + +func processFailingHostCreateCalendarEvent( + ctx context.Context, + ds fleet.Datastore, + userCalendar fleet.UserCalendar, + orgName string, + host fleet.HostPolicyMembershipData, +) error { + calendarEvent, err := attemptCreatingEventOnUserCalendar(orgName, host, userCalendar) + if err != nil { + return fmt.Errorf("create event on user calendar: %w", err) + } + if _, err := ds.NewCalendarEvent(ctx, host.Email, calendarEvent.StartTime, calendarEvent.EndTime, calendarEvent.Data, host.HostID); err != nil { + return fmt.Errorf("create calendar event on db: %w", err) + } + return nil +} + +func attemptCreatingEventOnUserCalendar( + orgName string, + host fleet.HostPolicyMembershipData, + userCalendar fleet.UserCalendar, +) (*fleet.CalendarEvent, error) { + // TODO(lucas): Where do we handle the following case (it seems CreateEvent needs to return no slot available for the requested day if there are none or too late): + // + // - If it’s the 3rd Tuesday of the month, create an event in the upcoming slot (if available). + // For example, if it’s the 3rd Tuesday of the month at 10:07a, Fleet will look for an open slot starting at 10:30a. + // - If it’s the 3rd Tuesday, Weds, Thurs, etc. of the month and it’s past the last slot, schedule the call for the next business day. + year, month, today := time.Now().Date() + preferredDate := getPreferredCalendarEventDate(year, month, today) + body := generateCalendarEventBody(orgName, host.HostDisplayName) + for { + calendarEvent, err := userCalendar.CreateEvent(preferredDate, body) + var dee fleet.DayEndedError + switch { + case err == nil: + return calendarEvent, nil + case errors.As(err, &dee): + preferredDate = addBusinessDay(preferredDate) + continue + default: + return nil, fmt.Errorf("create event on user calendar: %w", err) + } + } +} + +func getPreferredCalendarEventDate(year int, month time.Month, today int) time.Time { + const ( + // 3rd Tuesday of Month + preferredWeekDay = time.Tuesday + preferredOrdinal = 3 + ) + + firstDayOfMonth := time.Date(year, month, 1, 0, 0, 0, 0, time.UTC) + offset := int(preferredWeekDay - firstDayOfMonth.Weekday()) + if offset < 0 { + offset += 7 + } + preferredDate := firstDayOfMonth.AddDate(0, 0, offset+(7*(preferredOrdinal-1))) + if today > preferredDate.Day() { + today_ := time.Date(year, month, today, 0, 0, 0, 0, time.UTC) + preferredDate = addBusinessDay(today_) + } + return preferredDate +} + +func addBusinessDay(date time.Time) time.Time { + nextBusinessDay := 1 + switch weekday := date.Weekday(); weekday { + case time.Friday: + nextBusinessDay += 2 + case time.Saturday: + nextBusinessDay += 1 + } + return date.AddDate(0, 0, nextBusinessDay) +} + +func removeCalendarEventsFromPassingHosts( + ctx context.Context, + ds fleet.Datastore, + calendar fleet.UserCalendar, + hosts []fleet.HostPolicyMembershipData, +) error { + for _, host := range hosts { + calendarEvent, err := ds.GetCalendarEvent(ctx, host.Email) + switch { + case err == nil: + // OK + case fleet.IsNotFound(err): + continue + default: + return fmt.Errorf("get calendar event from DB: %w", err) + } + + if err := ds.DeleteCalendarEvent(ctx, calendarEvent.ID); err != nil { + return fmt.Errorf("delete db calendar event: %w", err) + } + if err := calendar.Configure(host.Email); err != nil { + return fmt.Errorf("connect to user calendar: %w", err) + } + if err := calendar.DeleteEvent(calendarEvent); err != nil { + return fmt.Errorf("delete calendar event: %w", err) + } + } + return nil +} + +func fireWebhookForHostsWithoutAssociatedEmail( + webhookURL string, + domain string, + hosts []fleet.HostPolicyMembershipData, + logger kitlog.Logger, +) error { + // TODO(lucas): We are firing these every 5 minutes... + for _, host := range hosts { + if err := fleet.FireCalendarWebhook( + webhookURL, + host.HostID, host.HostHardwareSerial, host.HostDisplayName, nil, + fmt.Sprintf("No %s Google account associated with this host.", domain), + ); err != nil { + level.Error(logger).Log( + "msg", "fire webhook for hosts without associated email", "err", err, + ) + } + } + return nil +} + +func generateCalendarEventBody(orgName, hostDisplayName string) string { + return fmt.Sprintf(`Please leave your computer on and connected to power. + +Expect an automated restart. + +%s reserved this time to fix %s.`, orgName, hostDisplayName, + ) +} + +func isHostOnline(ctx context.Context, ds fleet.Datastore, hostID uint) (bool, error) { + hostLite, err := ds.HostLiteByID(ctx, hostID) + if err != nil { + return false, fmt.Errorf("get host lite: %w", err) + } + status := (&fleet.Host{ + DistributedInterval: hostLite.DistributedInterval, + ConfigTLSRefresh: hostLite.ConfigTLSRefresh, + SeenTime: hostLite.SeenTime, + }).Status(time.Now()) + + switch status { + case fleet.StatusOnline, fleet.StatusNew: + return true, nil + case fleet.StatusOffline, fleet.StatusMIA, fleet.StatusMissing: + return false, nil + default: + return false, fmt.Errorf("unknown host status: %s", status) + } +} diff --git a/cmd/fleet/calendar_cron_test.go b/cmd/fleet/calendar_cron_test.go new file mode 100644 index 0000000000..680cf50d95 --- /dev/null +++ b/cmd/fleet/calendar_cron_test.go @@ -0,0 +1,57 @@ +package main + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestGetPreferredCalendarEventDate(t *testing.T) { + date := func(year int, month time.Month, day int) time.Time { + return time.Date(year, month, day, 0, 0, 0, 0, time.UTC) + } + for _, tc := range []struct { + name string + year int + month time.Month + days int + + expected time.Time + }{ + { + year: 2024, + month: 3, + days: 31, + name: "March 2024", + expected: date(2024, 3, 19), + }, + { + year: 2024, + month: 4, + days: 30, + name: "April 2024", + expected: date(2024, 4, 16), + }, + } { + t.Run(tc.name, func(t *testing.T) { + for day := 1; day <= tc.days; day++ { + actual := getPreferredCalendarEventDate(tc.year, tc.month, day) + require.NotEqual(t, actual.Weekday(), time.Saturday) + require.NotEqual(t, actual.Weekday(), time.Sunday) + if day <= tc.expected.Day() { + require.Equal(t, tc.expected, actual) + } else { + today := date(tc.year, tc.month, day) + if weekday := today.Weekday(); weekday == time.Friday { + require.Equal(t, today.AddDate(0, 0, +3), actual) + } else if weekday == time.Saturday { + require.Equal(t, today.AddDate(0, 0, +2), actual) + } else { + require.Equal(t, today.AddDate(0, 0, +1), actual) + } + } + } + }) + } +} diff --git a/cmd/fleet/serve.go b/cmd/fleet/serve.go index 729cc31801..0971c3e39a 100644 --- a/cmd/fleet/serve.go +++ b/cmd/fleet/serve.go @@ -768,6 +768,18 @@ the way that the Fleet server works. } } + if license.IsPremium() { + if err := cronSchedules.StartCronSchedule( + func() (fleet.CronSchedule, error) { + return newCalendarSchedule( + ctx, instanceID, ds, logger, + ) + }, + ); err != nil { + initFatal(err, "failed to register calendar schedule") + } + } + level.Info(logger).Log("msg", fmt.Sprintf("started cron schedules: %s", strings.Join(cronSchedules.ScheduleNames(), ", "))) // StartCollectors starts a goroutine per collector, using ctx to cancel. diff --git a/server/datastore/mysql/calendar_events.go b/server/datastore/mysql/calendar_events.go new file mode 100644 index 0000000000..3990915976 --- /dev/null +++ b/server/datastore/mysql/calendar_events.go @@ -0,0 +1,150 @@ +package mysql + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/fleetdm/fleet/v4/server/contexts/ctxerr" + "github.com/fleetdm/fleet/v4/server/fleet" + "github.com/jmoiron/sqlx" +) + +func (ds *Datastore) NewCalendarEvent( + ctx context.Context, + email string, + startTime time.Time, + endTime time.Time, + data []byte, + hostID uint, +) (*fleet.CalendarEvent, error) { + var calendarEvent *fleet.CalendarEvent + if err := ds.withRetryTxx(ctx, func(tx sqlx.ExtContext) error { + const calendarEventsQuery = ` + INSERT INTO calendar_events ( + email, + start_time, + end_time, + event + ) VALUES (?, ?, ?, ?); + ` + result, err := tx.ExecContext( + ctx, + calendarEventsQuery, + email, + startTime, + endTime, + data, + ) + if err != nil { + return ctxerr.Wrap(ctx, err, "insert calendar event") + } + + id, _ := result.LastInsertId() + calendarEvent = &fleet.CalendarEvent{ + ID: uint(id), + Email: email, + StartTime: startTime, + EndTime: endTime, + Data: data, + } + + const hostCalendarEventsQuery = ` + INSERT INTO host_calendar_events ( + host_id, + calendar_event_id, + webhook_status + ) VALUES (?, ?, ?); + ` + result, err = tx.ExecContext( + ctx, + hostCalendarEventsQuery, + hostID, + calendarEvent.ID, + fleet.CalendarWebhookStatusPending, + ) + if err != nil { + return ctxerr.Wrap(ctx, err, "insert host calendar event") + } + return nil + }); err != nil { + return nil, ctxerr.Wrap(ctx, err) + } + return calendarEvent, nil +} + +func (ds *Datastore) GetCalendarEvent(ctx context.Context, email string) (*fleet.CalendarEvent, error) { + const calendarEventsQuery = ` + SELECT * FROM calendar_events WHERE email = ?; + ` + var calendarEvent fleet.CalendarEvent + err := sqlx.GetContext(ctx, ds.reader(ctx), &calendarEvent, calendarEventsQuery, email) + if err != nil { + if err == sql.ErrNoRows { + return nil, ctxerr.Wrap(ctx, notFound("CalendarEvent").WithMessage(fmt.Sprintf("email: %s", email))) + } + return nil, ctxerr.Wrap(ctx, err, "get calendar event") + } + return &calendarEvent, nil +} + +func (ds *Datastore) UpdateCalendarEvent(ctx context.Context, calendarEventID uint, startTime time.Time, endTime time.Time, data []byte) error { + const calendarEventsQuery = ` + UPDATE calendar_events SET + start_time = ?, + end_time = ?, + event = ? + WHERE id = ?; + ` + if _, err := ds.writer(ctx).ExecContext(ctx, calendarEventsQuery, startTime, endTime, data, calendarEventID); err != nil { + return ctxerr.Wrap(ctx, err, "update calendar event") + } + return nil +} + +func (ds *Datastore) DeleteCalendarEvent(ctx context.Context, calendarEventID uint) error { + const calendarEventsQuery = ` + DELETE FROM calendar_events WHERE id = ?; + ` + if _, err := ds.writer(ctx).ExecContext(ctx, calendarEventsQuery, calendarEventID); err != nil { + return ctxerr.Wrap(ctx, err, "delete calendar event") + } + return nil +} + +func (ds *Datastore) GetHostCalendarEvent(ctx context.Context, hostID uint) (*fleet.HostCalendarEvent, *fleet.CalendarEvent, error) { + const hostCalendarEventsQuery = ` + SELECT * FROM host_calendar_events WHERE host_id = ? + ` + var hostCalendarEvent fleet.HostCalendarEvent + if err := sqlx.GetContext(ctx, ds.reader(ctx), &hostCalendarEvent, hostCalendarEventsQuery, hostID); err != nil { + if err == sql.ErrNoRows { + return nil, nil, ctxerr.Wrap(ctx, notFound("HostCalendarEvent").WithMessage(fmt.Sprintf("host_id: %d", hostID))) + } + return nil, nil, ctxerr.Wrap(ctx, err, "get host calendar event") + } + const calendarEventsQuery = ` + SELECT * FROM calendar_events WHERE id = ? + ` + var calendarEvent fleet.CalendarEvent + if err := sqlx.GetContext(ctx, ds.reader(ctx), &calendarEvent, calendarEventsQuery, hostCalendarEvent.CalendarEventID); err != nil { + if err == sql.ErrNoRows { + return nil, nil, ctxerr.Wrap(ctx, notFound("CalendarEvent").WithID(hostCalendarEvent.CalendarEventID)) + } + return nil, nil, ctxerr.Wrap(ctx, err, "get calendar event") + } + return &hostCalendarEvent, &calendarEvent, nil +} + +func (ds *Datastore) UpdateHostCalendarWebhookStatus(ctx context.Context, hostID uint, status fleet.CalendarWebhookStatus) error { + const calendarEventsQuery = ` + UPDATE host_calendar_events SET + webhook_status = ? + WHERE host_id = ?; + ` + if _, err := ds.writer(ctx).ExecContext(ctx, calendarEventsQuery, status, hostID); err != nil { + return ctxerr.Wrap(ctx, err, "update host calendar event webhook status") + } + return nil +} diff --git a/server/datastore/mysql/calendar_events_test.go b/server/datastore/mysql/calendar_events_test.go new file mode 100644 index 0000000000..ccf07b3c7b --- /dev/null +++ b/server/datastore/mysql/calendar_events_test.go @@ -0,0 +1,6 @@ +package mysql + +import "testing" + +func TestCalendarEvents(t *testing.T) { +} diff --git a/server/datastore/mysql/policies.go b/server/datastore/mysql/policies.go index d2f2424072..0b3498319d 100644 --- a/server/datastore/mysql/policies.go +++ b/server/datastore/mysql/policies.go @@ -5,11 +5,12 @@ import ( "database/sql" "encoding/json" "fmt" - "golang.org/x/text/unicode/norm" "sort" "strings" "time" + "golang.org/x/text/unicode/norm" + "github.com/doug-martin/goqu/v9" "github.com/fleetdm/fleet/v4/server/contexts/ctxerr" "github.com/fleetdm/fleet/v4/server/fleet" @@ -1159,3 +1160,52 @@ func (ds *Datastore) UpdateHostPolicyCounts(ctx context.Context) error { return nil } + +func (ds *Datastore) GetCalendarPolicies(ctx context.Context, teamID uint) ([]fleet.PolicyCalendarData, error) { + query := `SELECT id, name FROM policies WHERE team_id = ? AND calendar_events_enabled;` + var policies []fleet.PolicyCalendarData + err := sqlx.SelectContext(ctx, ds.reader(ctx), &policies, query, teamID) + if err != nil { + return nil, ctxerr.Wrap(ctx, err, "get calendar policies") + } + return policies, nil +} + +// TODO(lucas): Must be tested at scale. +func (ds *Datastore) GetHostsPolicyMemberships(ctx context.Context, domain string, policyIDs []uint) ([]fleet.HostPolicyMembershipData, error) { + query := ` + SELECT + COALESCE(sh.email, '') AS email, + pm.passing AS passing, + h.id AS host_id, + hdn.display_name AS host_display_name, + h.hardware_serial AS host_hardware_serial + FROM ( + SELECT host_id, BIT_AND(COALESCE(passes, 0)) AS passing + FROM policy_membership + WHERE policy_id IN (?) + GROUP BY host_id + ) pm + LEFT JOIN ( + SELECT MIN(h.host_id) as host_id, h.email as email + FROM ( + SELECT host_id, MIN(email) AS email + FROM host_emails WHERE email LIKE CONCAT('%@', ?) + GROUP BY host_id + ) h GROUP BY h.email + ) sh ON sh.host_id = pm.host_id + JOIN hosts h ON h.id = pm.host_id + LEFT JOIN host_display_names hdn ON hdn.host_id = pm.host_id; +` + + query, args, err := sqlx.In(query, policyIDs, domain) + if err != nil { + return nil, ctxerr.Wrapf(ctx, err, "build select get team hosts policy memberships query") + } + var hosts []fleet.HostPolicyMembershipData + if err := sqlx.SelectContext(ctx, ds.reader(ctx), &hosts, query, args...); err != nil { + return nil, ctxerr.Wrap(ctx, err, "listing policies") + } + + return hosts, nil +} diff --git a/server/datastore/mysql/policies_test.go b/server/datastore/mysql/policies_test.go index b0ef3b1bcc..514de6dd38 100644 --- a/server/datastore/mysql/policies_test.go +++ b/server/datastore/mysql/policies_test.go @@ -59,6 +59,7 @@ func TestPolicies(t *testing.T) { {"TestPoliciesNameUnicode", testPoliciesNameUnicode}, {"TestPoliciesNameEmoji", testPoliciesNameEmoji}, {"TestPoliciesNameSort", testPoliciesNameSort}, + {"TestGetCalendarPolicies", testGetCalendarPolicies}, } for _, c := range cases { t.Run(c.name, func(t *testing.T) { @@ -2784,7 +2785,6 @@ func testPoliciesNameEmoji(t *testing.T, ds *Datastore) { assert.NoError(t, err) require.Len(t, policies, 1) assert.Equal(t, emoji1, policies[0].Name) - } // Ensure case-insensitive sort order for policy names @@ -2806,3 +2806,57 @@ func testPoliciesNameSort(t *testing.T, ds *Datastore) { assert.Equal(t, policy.Name, policiesResult[i].Name) } } + +func testGetCalendarPolicies(t *testing.T, ds *Datastore) { + ctx := context.Background() + + // Test with non-existent team. + _, err := ds.GetCalendarPolicies(ctx, 999) + require.NoError(t, err) + + team, err := ds.NewTeam(ctx, &fleet.Team{ + Name: "Foobar", + }) + require.NoError(t, err) + + // Test when the team has no policies. + _, err = ds.GetCalendarPolicies(ctx, team.ID) + require.NoError(t, err) + + // Create a global query to test that only team policies are returned. + _, err = ds.NewGlobalPolicy(ctx, nil, fleet.PolicyPayload{ + Name: "Global Policy", + Query: "SELECT * FROM time;", + }) + require.NoError(t, err) + + _, err = ds.NewTeamPolicy(ctx, team.ID, nil, fleet.PolicyPayload{ + Name: "Team Policy 1", + Query: "SELECT * FROM system_info;", + CalendarEventsEnabled: false, + }) + require.NoError(t, err) + + // Test when the team has policies, but none is configured for calendar. + _, err = ds.GetCalendarPolicies(ctx, team.ID) + require.NoError(t, err) + + teamPolicy2, err := ds.NewTeamPolicy(ctx, team.ID, nil, fleet.PolicyPayload{ + Name: "Team Policy 2", + Query: "SELECT * FROM osquery_info;", + CalendarEventsEnabled: true, + }) + require.NoError(t, err) + teamPolicy3, err := ds.NewTeamPolicy(ctx, team.ID, nil, fleet.PolicyPayload{ + Name: "Team Policy 3", + Query: "SELECT * FROM os_version;", + CalendarEventsEnabled: true, + }) + require.NoError(t, err) + + calendarPolicies, err := ds.GetCalendarPolicies(ctx, team.ID) + require.NoError(t, err) + require.Len(t, calendarPolicies, 2) + require.Equal(t, calendarPolicies[0].ID, teamPolicy2.ID) + require.Equal(t, calendarPolicies[1].ID, teamPolicy3.ID) +} diff --git a/server/fleet/app.go b/server/fleet/app.go index 4b936063a0..560b8bb345 100644 --- a/server/fleet/app.go +++ b/server/fleet/app.go @@ -571,6 +571,13 @@ func (c *AppConfig) Copy() *AppConfig { clone.Integrations.Zendesk[i] = &zd } } + if len(c.Integrations.GoogleCalendar) > 0 { + clone.Integrations.GoogleCalendar = make([]*GoogleCalendarIntegration, len(c.Integrations.GoogleCalendar)) + for i, g := range c.Integrations.GoogleCalendar { + gc := *g + clone.Integrations.GoogleCalendar[i] = &gc + } + } if c.MDM.MacOSSettings.CustomSettings != nil { clone.MDM.MacOSSettings.CustomSettings = make([]MDMProfileSpec, len(c.MDM.MacOSSettings.CustomSettings)) diff --git a/server/fleet/calendar.go b/server/fleet/calendar.go index db2bbbc45e..9b45c2c8a5 100644 --- a/server/fleet/calendar.go +++ b/server/fleet/calendar.go @@ -1,6 +1,12 @@ package fleet -import "time" +import ( + "context" + "fmt" + "time" + + "github.com/fleetdm/fleet/v4/server" +) type DayEndedError struct { Msg string @@ -23,3 +29,33 @@ type UserCalendar interface { // DeleteEvent deletes the event with the given ID. DeleteEvent(event *CalendarEvent) error } + +type CalendarWebhookPayload struct { + Timestamp time.Time `json:"timestamp"` + HostID uint `json:"host_id"` + HostDisplayName string `json:"host_display_name"` + HostSerialNumber string `json:"host_serial_number"` + FailingPolicies []PolicyCalendarData `json:"failing_policies,omitempty"` + Error string `json:"error,omitempty"` +} + +func FireCalendarWebhook( + webhookURL string, + hostID uint, + hostHardwareSerial string, + hostDisplayName string, + failingCalendarPolicies []PolicyCalendarData, + err string, +) error { + if err := server.PostJSONWithTimeout(context.Background(), webhookURL, &CalendarWebhookPayload{ + Timestamp: time.Now(), + HostID: hostID, + HostDisplayName: hostDisplayName, + HostSerialNumber: hostHardwareSerial, + FailingPolicies: failingCalendarPolicies, + Error: err, + }); err != nil { + return fmt.Errorf("POST to %q: %w", server.MaskSecretURLParams(webhookURL), server.MaskURLError(err)) + } + return nil +} diff --git a/server/fleet/calendar_events.go b/server/fleet/calendar_events.go index 7671b4aba5..348cb074a4 100644 --- a/server/fleet/calendar_events.go +++ b/server/fleet/calendar_events.go @@ -27,3 +27,12 @@ type HostCalendarEvent struct { UpdateCreateTimestamps } + +type HostPolicyMembershipData struct { + Email string `db:"email"` + Passing bool `db:"passing"` + + HostID uint `db:"host_id"` + HostDisplayName string `db:"host_display_name"` + HostHardwareSerial string `db:"host_hardware_serial"` +} diff --git a/server/fleet/cron_schedules.go b/server/fleet/cron_schedules.go index 607f15f85c..6b16734fd4 100644 --- a/server/fleet/cron_schedules.go +++ b/server/fleet/cron_schedules.go @@ -21,6 +21,7 @@ const ( CronWorkerIntegrations CronScheduleName = "integrations" CronActivitiesStreaming CronScheduleName = "activities_streaming" CronMDMAppleProfileManager CronScheduleName = "mdm_apple_profile_manager" + CronCalendar CronScheduleName = "calendar" ) type CronSchedulesService interface { diff --git a/server/fleet/datastore.go b/server/fleet/datastore.go index 4081db8af0..f2178bf32b 100644 --- a/server/fleet/datastore.go +++ b/server/fleet/datastore.go @@ -594,6 +594,9 @@ type Datastore interface { PolicyQueriesForHost(ctx context.Context, host *Host) (map[string]string, error) + GetHostsPolicyMemberships(ctx context.Context, domain string, policyIDs []uint) ([]HostPolicyMembershipData, error) + GetCalendarPolicies(ctx context.Context, teamID uint) ([]PolicyCalendarData, error) + // Methods used for async processing of host policy query results. AsyncBatchInsertPolicyMembership(ctx context.Context, batch []PolicyMembershipResult) error AsyncBatchUpdatePolicyTimestamp(ctx context.Context, ids []uint, ts time.Time) error @@ -613,6 +616,16 @@ type Datastore interface { // the updated_at timestamp is older than the provided duration DeleteOutOfDateVulnerabilities(ctx context.Context, source VulnerabilitySource, duration time.Duration) error + /////////////////////////////////////////////////////////////////////////////// + // Calendar events + + NewCalendarEvent(ctx context.Context, email string, startTime time.Time, endTime time.Time, data []byte, hostID uint) (*CalendarEvent, error) + GetCalendarEvent(ctx context.Context, email string) (*CalendarEvent, error) + DeleteCalendarEvent(ctx context.Context, calendarEventID uint) error + UpdateCalendarEvent(ctx context.Context, calendarEventID uint, startTime time.Time, endTime time.Time, data []byte) error + GetHostCalendarEvent(ctx context.Context, hostID uint) (*HostCalendarEvent, *CalendarEvent, error) + UpdateHostCalendarWebhookStatus(ctx context.Context, hostID uint, status CalendarWebhookStatus) error + /////////////////////////////////////////////////////////////////////////////// // Team Policies diff --git a/server/fleet/policies.go b/server/fleet/policies.go index 52a6109b2a..dda2ec047d 100644 --- a/server/fleet/policies.go +++ b/server/fleet/policies.go @@ -179,6 +179,11 @@ type Policy struct { HostCountUpdatedAt *time.Time `json:"host_count_updated_at" db:"host_count_updated_at"` } +type PolicyCalendarData struct { + ID uint `db:"id" json:"id"` + Name string `db:"name" json:"name"` +} + func (p Policy) AuthzType() string { return "policy" } diff --git a/server/mock/datastore_mock.go b/server/mock/datastore_mock.go index 1469826979..4e35d1eef3 100644 --- a/server/mock/datastore_mock.go +++ b/server/mock/datastore_mock.go @@ -440,6 +440,10 @@ type UpdateHostPolicyCountsFunc func(ctx context.Context) error type PolicyQueriesForHostFunc func(ctx context.Context, host *fleet.Host) (map[string]string, error) +type GetHostsPolicyMembershipsFunc func(ctx context.Context, domain string, policyIDs []uint) ([]fleet.HostPolicyMembershipData, error) + +type GetCalendarPoliciesFunc func(ctx context.Context, teamID uint) ([]fleet.PolicyCalendarData, error) + type AsyncBatchInsertPolicyMembershipFunc func(ctx context.Context, batch []fleet.PolicyMembershipResult) error type AsyncBatchUpdatePolicyTimestampFunc func(ctx context.Context, ids []uint, ts time.Time) error @@ -458,6 +462,18 @@ type DeleteSoftwareVulnerabilitiesFunc func(ctx context.Context, vulnerabilities type DeleteOutOfDateVulnerabilitiesFunc func(ctx context.Context, source fleet.VulnerabilitySource, duration time.Duration) error +type NewCalendarEventFunc func(ctx context.Context, email string, startTime time.Time, endTime time.Time, data []byte, hostID uint) (*fleet.CalendarEvent, error) + +type GetCalendarEventFunc func(ctx context.Context, email string) (*fleet.CalendarEvent, error) + +type DeleteCalendarEventFunc func(ctx context.Context, calendarEventID uint) error + +type UpdateCalendarEventFunc func(ctx context.Context, calendarEventID uint, startTime time.Time, endTime time.Time, data []byte) error + +type GetHostCalendarEventFunc func(ctx context.Context, hostID uint) (*fleet.HostCalendarEvent, *fleet.CalendarEvent, error) + +type UpdateHostCalendarWebhookStatusFunc func(ctx context.Context, hostID uint, status fleet.CalendarWebhookStatus) error + type NewTeamPolicyFunc func(ctx context.Context, teamID uint, authorID *uint, args fleet.PolicyPayload) (*fleet.Policy, error) type ListTeamPoliciesFunc func(ctx context.Context, teamID uint, opts fleet.ListOptions, iopts fleet.ListOptions) (teamPolicies []*fleet.Policy, inheritedPolicies []*fleet.Policy, err error) @@ -1492,6 +1508,12 @@ type DataStore struct { PolicyQueriesForHostFunc PolicyQueriesForHostFunc PolicyQueriesForHostFuncInvoked bool + GetHostsPolicyMembershipsFunc GetHostsPolicyMembershipsFunc + GetHostsPolicyMembershipsFuncInvoked bool + + GetCalendarPoliciesFunc GetCalendarPoliciesFunc + GetCalendarPoliciesFuncInvoked bool + AsyncBatchInsertPolicyMembershipFunc AsyncBatchInsertPolicyMembershipFunc AsyncBatchInsertPolicyMembershipFuncInvoked bool @@ -1519,6 +1541,24 @@ type DataStore struct { DeleteOutOfDateVulnerabilitiesFunc DeleteOutOfDateVulnerabilitiesFunc DeleteOutOfDateVulnerabilitiesFuncInvoked bool + NewCalendarEventFunc NewCalendarEventFunc + NewCalendarEventFuncInvoked bool + + GetCalendarEventFunc GetCalendarEventFunc + GetCalendarEventFuncInvoked bool + + DeleteCalendarEventFunc DeleteCalendarEventFunc + DeleteCalendarEventFuncInvoked bool + + UpdateCalendarEventFunc UpdateCalendarEventFunc + UpdateCalendarEventFuncInvoked bool + + GetHostCalendarEventFunc GetHostCalendarEventFunc + GetHostCalendarEventFuncInvoked bool + + UpdateHostCalendarWebhookStatusFunc UpdateHostCalendarWebhookStatusFunc + UpdateHostCalendarWebhookStatusFuncInvoked bool + NewTeamPolicyFunc NewTeamPolicyFunc NewTeamPolicyFuncInvoked bool @@ -3599,6 +3639,20 @@ func (s *DataStore) PolicyQueriesForHost(ctx context.Context, host *fleet.Host) return s.PolicyQueriesForHostFunc(ctx, host) } +func (s *DataStore) GetHostsPolicyMemberships(ctx context.Context, domain string, policyIDs []uint) ([]fleet.HostPolicyMembershipData, error) { + s.mu.Lock() + s.GetHostsPolicyMembershipsFuncInvoked = true + s.mu.Unlock() + return s.GetHostsPolicyMembershipsFunc(ctx, domain, policyIDs) +} + +func (s *DataStore) GetCalendarPolicies(ctx context.Context, teamID uint) ([]fleet.PolicyCalendarData, error) { + s.mu.Lock() + s.GetCalendarPoliciesFuncInvoked = true + s.mu.Unlock() + return s.GetCalendarPoliciesFunc(ctx, teamID) +} + func (s *DataStore) AsyncBatchInsertPolicyMembership(ctx context.Context, batch []fleet.PolicyMembershipResult) error { s.mu.Lock() s.AsyncBatchInsertPolicyMembershipFuncInvoked = true @@ -3662,6 +3716,48 @@ func (s *DataStore) DeleteOutOfDateVulnerabilities(ctx context.Context, source f return s.DeleteOutOfDateVulnerabilitiesFunc(ctx, source, duration) } +func (s *DataStore) NewCalendarEvent(ctx context.Context, email string, startTime time.Time, endTime time.Time, data []byte, hostID uint) (*fleet.CalendarEvent, error) { + s.mu.Lock() + s.NewCalendarEventFuncInvoked = true + s.mu.Unlock() + return s.NewCalendarEventFunc(ctx, email, startTime, endTime, data, hostID) +} + +func (s *DataStore) GetCalendarEvent(ctx context.Context, email string) (*fleet.CalendarEvent, error) { + s.mu.Lock() + s.GetCalendarEventFuncInvoked = true + s.mu.Unlock() + return s.GetCalendarEventFunc(ctx, email) +} + +func (s *DataStore) DeleteCalendarEvent(ctx context.Context, calendarEventID uint) error { + s.mu.Lock() + s.DeleteCalendarEventFuncInvoked = true + s.mu.Unlock() + return s.DeleteCalendarEventFunc(ctx, calendarEventID) +} + +func (s *DataStore) UpdateCalendarEvent(ctx context.Context, calendarEventID uint, startTime time.Time, endTime time.Time, data []byte) error { + s.mu.Lock() + s.UpdateCalendarEventFuncInvoked = true + s.mu.Unlock() + return s.UpdateCalendarEventFunc(ctx, calendarEventID, startTime, endTime, data) +} + +func (s *DataStore) GetHostCalendarEvent(ctx context.Context, hostID uint) (*fleet.HostCalendarEvent, *fleet.CalendarEvent, error) { + s.mu.Lock() + s.GetHostCalendarEventFuncInvoked = true + s.mu.Unlock() + return s.GetHostCalendarEventFunc(ctx, hostID) +} + +func (s *DataStore) UpdateHostCalendarWebhookStatus(ctx context.Context, hostID uint, status fleet.CalendarWebhookStatus) error { + s.mu.Lock() + s.UpdateHostCalendarWebhookStatusFuncInvoked = true + s.mu.Unlock() + return s.UpdateHostCalendarWebhookStatusFunc(ctx, hostID, status) +} + func (s *DataStore) NewTeamPolicy(ctx context.Context, teamID uint, authorID *uint, args fleet.PolicyPayload) (*fleet.Policy, error) { s.mu.Lock() s.NewTeamPolicyFuncInvoked = true diff --git a/server/service/osquery.go b/server/service/osquery.go index 8a77903a88..379afafd6a 100644 --- a/server/service/osquery.go +++ b/server/service/osquery.go @@ -1001,6 +1001,10 @@ func (svc *Service) SubmitDistributedQueryResults( if len(policyResults) > 0 { + if err := processCalendarPolicies(ctx, svc.ds, ac, host, policyResults, svc.logger); err != nil { + logging.WithErr(ctx, err) + } + // filter policy results for webhooks var policyIDs []uint if globalPolicyAutomationsEnabled(ac.WebhookSettings, ac.Integrations) { @@ -1093,6 +1097,99 @@ func (svc *Service) SubmitDistributedQueryResults( return nil } +func processCalendarPolicies( + ctx context.Context, + ds fleet.Datastore, + appConfig *fleet.AppConfig, + host *fleet.Host, + policyResults map[uint]*bool, + logger log.Logger, +) error { + if len(appConfig.Integrations.GoogleCalendar) == 0 || host.TeamID == nil { + return nil + } + + team, err := ds.Team(ctx, *host.TeamID) + if err != nil { + return ctxerr.Wrap(ctx, err, "load host team") + } + + if team.Config.Integrations.GoogleCalendar == nil || !team.Config.Integrations.GoogleCalendar.Enable { + return nil + } + + hostCalendarEvent, calendarEvent, err := ds.GetHostCalendarEvent(ctx, host.ID) + switch { + case err == nil: + if hostCalendarEvent.WebhookStatus != fleet.CalendarWebhookStatusPending { + return nil + } + case fleet.IsNotFound(err): + return nil + default: + return ctxerr.Wrap(ctx, err, "get host calendar event") + } + + now := time.Now() + if now.Before(calendarEvent.StartTime) { + level.Warn(logger).Log("msg", "results came too early", "now", now, "start_time", calendarEvent.StartTime) + return nil + } + + // + // TODO(lucas): Discuss. + // + const allowedTimeBeforeEndTime = 5 * time.Minute // up to 5 minutes before the end_time + + if now.After(calendarEvent.EndTime.Add(-allowedTimeBeforeEndTime)) { + level.Warn(logger).Log("msg", "results came too late", "now", now, "end_time", calendarEvent.EndTime) + return nil + } + + calendarPolicies, err := ds.GetCalendarPolicies(ctx, *host.TeamID) + if err != nil { + return ctxerr.Wrap(ctx, err, "get calendar policy ids") + } + if len(calendarPolicies) == 0 { + return nil + } + + failingCalendarPolicies := getFailingCalendarPolicies(policyResults, calendarPolicies) + if len(failingCalendarPolicies) == 0 { + return nil + } + + go func() { + if err := fleet.FireCalendarWebhook( + team.Config.Integrations.GoogleCalendar.WebhookURL, + host.ID, host.HardwareSerial, host.DisplayName(), failingCalendarPolicies, "", + ); err != nil { + level.Error(logger).Log("msg", "fire webhook", "err", err) + return + } + if err := ds.UpdateHostCalendarWebhookStatus(context.Background(), host.ID, fleet.CalendarWebhookStatusSent); err != nil { + level.Error(logger).Log("msg", "mark fired webhook as sent", "err", err) + } + }() + + return nil +} + +func getFailingCalendarPolicies(policyResults map[uint]*bool, calendarPolicies []fleet.PolicyCalendarData) []fleet.PolicyCalendarData { + var failingPolicies []fleet.PolicyCalendarData + for _, calendarPolicy := range calendarPolicies { + result, ok := policyResults[calendarPolicy.ID] + if !ok || // ignore result of a policy that's not configured for calendar. + result == nil { // ignore policies that failed to execute. + continue + } + if !*result { + failingPolicies = append(failingPolicies, calendarPolicy) + } + } + return failingPolicies +} + // preProcessSoftwareResults will run pre-processing on the responses of the software queries. // It will move the results from the software extra queries (e.g. software_vscode_extensions) // into the main software query results (software_{macos|linux|windows}).