Add enterprise integration test for calendar events (#17900)

Integration tests for the calendar feature: #17441.

Adding coverage screenshots for the calendar cron and the osquery
distributed/write coverage:

![Screenshot 2024-03-27 at 14 20
44](https://github.com/fleetdm/fleet/assets/2073526/40d394ab-2208-4bec-981b-fe22fae8b5c1)
![Screenshot 2024-03-27 at 14 21
20](https://github.com/fleetdm/fleet/assets/2073526/1e4c8611-21ba-48a6-82f8-a163594f7f01)
This commit is contained in:
Lucas Manuel Rodriguez 2024-04-04 14:58:31 -03:00 committed by GitHub
parent 77c8adf4bb
commit e8ca959888
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 763 additions and 72 deletions

View file

@ -28,6 +28,7 @@ import (
configpkg "github.com/fleetdm/fleet/v4/server/config"
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
licensectx "github.com/fleetdm/fleet/v4/server/contexts/license"
"github.com/fleetdm/fleet/v4/server/cron"
"github.com/fleetdm/fleet/v4/server/datastore/cached_mysql"
"github.com/fleetdm/fleet/v4/server/datastore/mysql"
"github.com/fleetdm/fleet/v4/server/datastore/mysqlredis"
@ -773,9 +774,7 @@ the way that the Fleet server works.
if license.IsPremium() {
if err := cronSchedules.StartCronSchedule(
func() (fleet.CronSchedule, error) {
return newCalendarSchedule(
ctx, instanceID, ds, logger,
)
return cron.NewCalendarSchedule(ctx, instanceID, ds, 5*time.Minute, logger)
},
); err != nil {
initFatal(err, "failed to register calendar schedule")

View file

@ -93,3 +93,14 @@ func ClearMockEvents() {
defer mu.Unlock()
mockEvents = make(map[string]*calendar.Event)
}
func SetMockEventsToNow() {
mu.Lock()
defer mu.Unlock()
now := time.Now()
for _, mockEvent := range mockEvents {
mockEvent.Start = &calendar.EventDateTime{DateTime: now.Format(time.RFC3339)}
mockEvent.End = &calendar.EventDateTime{DateTime: now.Add(30 * time.Minute).Format(time.RFC3339)}
}
}

View file

@ -1,4 +1,4 @@
package main
package cron
import (
"context"
@ -19,19 +19,19 @@ import (
const calendarConsumers = 18
func newCalendarSchedule(
func NewCalendarSchedule(
ctx context.Context,
instanceID string,
ds fleet.Datastore,
interval time.Duration,
logger kitlog.Logger,
) (*schedule.Schedule, error) {
const (
name = string(fleet.CronCalendar)
defaultInterval = 5 * time.Minute
name = string(fleet.CronCalendar)
)
logger = kitlog.With(logger, "cron", name)
s := schedule.New(
ctx, name, instanceID, defaultInterval, ds, ds,
ctx, name, instanceID, interval, ds, ds,
schedule.WithAltLockID("calendar"),
schedule.WithLogger(logger),
schedule.WithJob(
@ -318,9 +318,6 @@ func processFailingHostExistingCalendarEvent(
}
// Even if fields haven't changed we want to update the calendar_events.updated_at below.
updated = true
//
// TODO(lucas): Check changing updatedEvent to UTC before consuming.
//
}
if updated {
@ -367,8 +364,6 @@ func processFailingHostExistingCalendarEvent(
return fmt.Errorf("update host calendar webhook status: %w", err)
}
// TODO(lucas): If this doesn't work at scale, then implement a special refetch
// for policies only.
if err := ds.UpdateHostRefetchRequested(ctx, host.HostID, true); err != nil {
return fmt.Errorf("refetch host: %w", err)
}

View file

@ -1,11 +1,8 @@
package main
package cron
import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
"strconv"
"strings"
@ -17,7 +14,6 @@ import (
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/fleetdm/fleet/v4/server/mock"
kitlog "github.com/go-kit/log"
"github.com/stretchr/testify/require"
)
@ -207,16 +203,19 @@ func TestCalendarEventsMultipleHosts(t *testing.T) {
calendar.ClearMockEvents()
})
// TODO(lucas): Test!
webhookServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "POST", r.Method)
requestBodyBytes, err := io.ReadAll(r.Body)
require.NoError(t, err)
t.Logf("webhook request: %s\n", requestBodyBytes)
}))
t.Cleanup(func() {
webhookServer.Close()
})
//
// Test setup
//
// team1:
//
// policyID1 (calendar)
// policyID2 (calendar)
//
// hostID1 has user1@example.com not passing policies.
// hostID2 has user2@example.com passing policies.
// hostID3 does not have example.com email and is not passing policies.
// hostID4 does not have example.com email and is passing policies.
//
ds.AppConfigFunc = func(ctx context.Context) (*fleet.AppConfig, error) {
return &fleet.AppConfig{
@ -242,7 +241,7 @@ func TestCalendarEventsMultipleHosts(t *testing.T) {
Integrations: fleet.TeamIntegrations{
GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{
Enable: true,
WebhookURL: webhookServer.URL,
WebhookURL: "https://foo.example.com",
},
},
},
@ -268,12 +267,13 @@ func TestCalendarEventsMultipleHosts(t *testing.T) {
hostID1, userEmail1 := uint(100), "user1@example.com"
hostID2, userEmail2 := uint(101), "user2@example.com"
hostID3, userEmail3 := uint(102), "user3@other.com"
hostID4, userEmail4 := uint(103), "user4@other.com"
hostID3 := uint(102)
hostID4 := uint(103)
ds.GetTeamHostsPolicyMembershipsFunc = func(
ctx context.Context, domain string, teamID uint, policyIDs []uint,
) ([]fleet.HostPolicyMembershipData, error) {
require.Equal(t, "example.com", domain)
require.Equal(t, teamID1, teamID)
require.Equal(t, []uint{policyID1, policyID2}, policyIDs)
return []fleet.HostPolicyMembershipData{
@ -289,12 +289,12 @@ func TestCalendarEventsMultipleHosts(t *testing.T) {
},
{
HostID: hostID3,
Email: userEmail3,
Email: "", // because it does not belong to example.com
Passing: false,
},
{
HostID: hostID4,
Email: userEmail4,
Email: "", // because it does not belong to example.com
Passing: true,
},
}, nil
@ -304,6 +304,10 @@ func TestCalendarEventsMultipleHosts(t *testing.T) {
return nil, nil, notFoundErr{}
}
var eventsMu sync.Mutex
calendarEvents := make(map[string]*fleet.CalendarEvent)
hostCalendarEvents := make(map[uint]*fleet.HostCalendarEvent)
ds.CreateOrUpdateCalendarEventFunc = func(ctx context.Context,
email string,
startTime, endTime time.Time,
@ -311,26 +315,43 @@ func TestCalendarEventsMultipleHosts(t *testing.T) {
hostID uint,
webhookStatus fleet.CalendarWebhookStatus,
) (*fleet.CalendarEvent, error) {
switch email {
case userEmail1:
require.Equal(t, hostID1, hostID)
case userEmail2:
require.Equal(t, hostID2, hostID)
case userEmail3:
require.Equal(t, hostID3, hostID)
case userEmail4:
require.Equal(t, hostID4, hostID)
}
require.Equal(t, hostID1, hostID)
require.Equal(t, userEmail1, email)
require.Equal(t, fleet.CalendarWebhookStatusNone, webhookStatus)
require.NotEmpty(t, data)
require.NotZero(t, startTime)
require.NotZero(t, endTime)
// Currently, the returned calendar event is unused.
eventsMu.Lock()
calendarEventID := uint(len(calendarEvents) + 1)
calendarEvents[email] = &fleet.CalendarEvent{
ID: calendarEventID,
Email: email,
StartTime: startTime,
EndTime: endTime,
Data: data,
}
hostCalendarEventID := uint(len(hostCalendarEvents) + 1)
hostCalendarEvents[hostID] = &fleet.HostCalendarEvent{
ID: hostCalendarEventID,
HostID: hostID,
CalendarEventID: calendarEventID,
WebhookStatus: webhookStatus,
}
eventsMu.Unlock()
return nil, nil
}
err := cronCalendarEvents(ctx, ds, logger)
require.NoError(t, err)
eventsMu.Lock()
require.Len(t, calendarEvents, 1)
require.Len(t, hostCalendarEvents, 1)
eventsMu.Unlock()
createdCalendarEvents := calendar.ListGoogleMockEvents()
require.Len(t, createdCalendarEvents, 1)
}
type notFoundErr struct{}
@ -356,17 +377,6 @@ func TestCalendarEvents1KHosts(t *testing.T) {
calendar.ClearMockEvents()
})
// TODO(lucas): Use for the test.
webhookServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "POST", r.Method)
requestBodyBytes, err := io.ReadAll(r.Body)
require.NoError(t, err)
t.Logf("webhook request: %s\n", requestBodyBytes)
}))
t.Cleanup(func() {
webhookServer.Close()
})
ds.AppConfigFunc = func(ctx context.Context) (*fleet.AppConfig, error) {
return &fleet.AppConfig{
Integrations: fleet.Integrations{
@ -395,7 +405,7 @@ func TestCalendarEvents1KHosts(t *testing.T) {
Integrations: fleet.TeamIntegrations{
GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{
Enable: true,
WebhookURL: webhookServer.URL,
WebhookURL: "https://foo.example.com",
},
},
},
@ -406,7 +416,7 @@ func TestCalendarEvents1KHosts(t *testing.T) {
Integrations: fleet.TeamIntegrations{
GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{
Enable: true,
WebhookURL: webhookServer.URL,
WebhookURL: "https://foo.example.com",
},
},
},
@ -417,7 +427,7 @@ func TestCalendarEvents1KHosts(t *testing.T) {
Integrations: fleet.TeamIntegrations{
GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{
Enable: true,
WebhookURL: webhookServer.URL,
WebhookURL: "https://foo.example.com",
},
},
},
@ -428,7 +438,7 @@ func TestCalendarEvents1KHosts(t *testing.T) {
Integrations: fleet.TeamIntegrations{
GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{
Enable: true,
WebhookURL: webhookServer.URL,
WebhookURL: "https://foo.example.com",
},
},
},
@ -439,7 +449,7 @@ func TestCalendarEvents1KHosts(t *testing.T) {
Integrations: fleet.TeamIntegrations{
GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{
Enable: true,
WebhookURL: webhookServer.URL,
WebhookURL: "https://foo.example.com",
},
},
},

View file

@ -52,7 +52,7 @@ func (ds *Datastore) CreateOrUpdateCalendarEvent(
} else {
stmt := `SELECT id FROM calendar_events WHERE email = ?`
if err := sqlx.GetContext(ctx, tx, &id, stmt, email); err != nil {
return ctxerr.Wrap(ctx, err, "query mdm solution id")
return ctxerr.Wrap(ctx, err, "calendar event id")
}
}

View file

@ -1171,7 +1171,6 @@ func (ds *Datastore) GetCalendarPolicies(ctx context.Context, teamID uint) ([]fl
return policies, nil
}
// TODO(lucas): Must be tested at scale.
func (ds *Datastore) GetTeamHostsPolicyMemberships(
ctx context.Context,
domain string,

View file

@ -594,6 +594,11 @@ type Datastore interface {
PolicyQueriesForHost(ctx context.Context, host *Host) (map[string]string, error)
// GetTeamHostsPolicyMembmerships returns the hosts that belong to the given team and their pass/fail statuses
// around the provided policyIDs.
// - Returns hosts of the team that are failing one or more of the provided policies.
// - Returns hosts of the team that are passing all the policies (or are not running any of the provided policies)
// and have a calendar event scheduled.
GetTeamHostsPolicyMemberships(ctx context.Context, domain string, teamID uint, policyIDs []uint) ([]HostPolicyMembershipData, error)
GetCalendarPolicies(ctx context.Context, teamID uint) ([]PolicyCalendarData, error)

View file

@ -16,18 +16,21 @@ import (
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/fleetdm/fleet/v4/server/pubsub"
"github.com/fleetdm/fleet/v4/ee/server/calendar"
"github.com/fleetdm/fleet/v4/pkg/optjson"
"github.com/fleetdm/fleet/v4/server/cron"
"github.com/fleetdm/fleet/v4/server/datastore/mysql"
"github.com/fleetdm/fleet/v4/server/datastore/redis/redistest"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/fleetdm/fleet/v4/server/live_query/live_query_mock"
"github.com/fleetdm/fleet/v4/server/mdm"
"github.com/fleetdm/fleet/v4/server/ptr"
"github.com/fleetdm/fleet/v4/server/pubsub"
"github.com/fleetdm/fleet/v4/server/service/schedule"
"github.com/fleetdm/fleet/v4/server/test"
kitlog "github.com/go-kit/kit/log"
"github.com/go-kit/log"
@ -48,7 +51,8 @@ func TestIntegrationsEnterprise(t *testing.T) {
type integrationEnterpriseTestSuite struct {
withServer
suite.Suite
redisPool fleet.RedisPool
redisPool fleet.RedisPool
calendarSchedule *schedule.Schedule
lq *live_query_mock.MockLiveQuery
}
@ -58,6 +62,7 @@ func (s *integrationEnterpriseTestSuite) SetupSuite() {
s.redisPool = redistest.SetupRedis(s.T(), "integration_enterprise", false, false, false)
s.lq = live_query_mock.New(s.T())
var calendarSchedule *schedule.Schedule
config := TestServerOpts{
License: &fleet.LicenseInfo{
Tier: fleet.TierPremium,
@ -67,6 +72,16 @@ func (s *integrationEnterpriseTestSuite) SetupSuite() {
Lq: s.lq,
Logger: log.NewLogfmtLogger(os.Stdout),
EnableCachedDS: true,
StartCronSchedules: []TestNewScheduleFunc{
func(ctx context.Context, ds fleet.Datastore) fleet.NewCronScheduleFunc {
return func() (fleet.CronSchedule, error) {
// We set 24-hour interval so that it only runs when triggered.
var err error
calendarSchedule, err = cron.NewCalendarSchedule(ctx, s.T().Name(), s.ds, 24*time.Hour, log.NewJSONLogger(os.Stdout))
return calendarSchedule, err
}
},
},
}
if os.Getenv("FLEET_INTEGRATION_TESTS_DISABLE_LOG") != "" {
config.Logger = kitlog.NewNopLogger()
@ -76,6 +91,7 @@ func (s *integrationEnterpriseTestSuite) SetupSuite() {
s.users = users
s.token = s.getTestAdminToken()
s.cachedTokens = make(map[string]string)
s.calendarSchedule = calendarSchedule
}
func (s *integrationEnterpriseTestSuite) TearDownTest() {
@ -3605,7 +3621,6 @@ func (s *integrationEnterpriseTestSuite) TestOSVersions() {
"GET", fmt.Sprintf("/api/latest/fleet/os_versions/%d", osinfo.OSVersionID), nil, http.StatusForbidden, &osVersionResp, "team_id",
"99999",
)
}
func (s *integrationEnterpriseTestSuite) TestMDMNotConfiguredEndpoints() {
@ -7336,7 +7351,8 @@ func (s *integrationEnterpriseTestSuite) TestSoftwareAuth() {
Description: "desc team1",
})
require.NoError(t, err)
require.NoError(t, s.ds.AddHostsToTeam(ctx, &team1.ID, []uint{tmHost.ID}))
err = s.ds.AddHostsToTeam(ctx, &team1.ID, []uint{tmHost.ID})
require.NoError(t, err)
team2, err := s.ds.NewTeam(ctx, &fleet.Team{
ID: 43,
Name: "team2",
@ -7653,3 +7669,653 @@ func (s *integrationEnterpriseTestSuite) TestSoftwareAuth() {
// set the admin token again to avoid breaking other tests
s.token = s.getTestAdminToken()
}
func (s *integrationEnterpriseTestSuite) TestCalendarEvents() {
ctx := context.Background()
t := s.T()
t.Cleanup(func() {
calendar.ClearMockEvents()
})
currentAppCfg, err := s.ds.AppConfig(ctx)
require.NoError(t, err)
t.Cleanup(func() {
err = s.ds.SaveAppConfig(ctx, currentAppCfg)
require.NoError(t, err)
})
team1, err := s.ds.NewTeam(ctx, &fleet.Team{
Name: "team1",
})
require.NoError(t, err)
team2, err := s.ds.NewTeam(ctx, &fleet.Team{
Name: "team2",
})
require.NoError(t, err)
newHost := func(name string, teamID *uint) *fleet.Host {
h, err := s.ds.NewHost(ctx, &fleet.Host{
DetailUpdatedAt: time.Now(),
LabelUpdatedAt: time.Now(),
PolicyUpdatedAt: time.Now(),
SeenTime: time.Now().Add(-1 * time.Minute),
OsqueryHostID: ptr.String(t.Name() + name),
NodeKey: ptr.String(t.Name() + name),
UUID: uuid.New().String(),
Hostname: fmt.Sprintf("%s.%s.local", name, t.Name()),
Platform: "darwin",
TeamID: teamID,
})
require.NoError(t, err)
return h
}
host1Team1 := newHost("host1", &team1.ID)
host2Team1 := newHost("host2", &team1.ID)
host3Team2 := newHost("host3", &team2.ID)
host4Team2 := newHost("host4", &team2.ID)
_ = newHost("host5", nil) // global host
team1Policy1Calendar, err := s.ds.NewTeamPolicy(
ctx, team1.ID, nil, fleet.PolicyPayload{
Name: "team1Policy1Calendar",
Query: "SELECT 1;",
CalendarEventsEnabled: true,
},
)
require.NoError(t, err)
team1Policy2, err := s.ds.NewTeamPolicy(
ctx, team1.ID, nil, fleet.PolicyPayload{
Name: "team1Policy2",
Query: "SELECT 2;",
CalendarEventsEnabled: true,
},
)
require.NoError(t, err)
team2Policy1Calendar, err := s.ds.NewTeamPolicy(
ctx, team1.ID, nil, fleet.PolicyPayload{
Name: "team2Policy1Calendar",
Query: "SELECT 3;",
CalendarEventsEnabled: true,
},
)
require.NoError(t, err)
team2Policy2, err := s.ds.NewTeamPolicy(
ctx, team1.ID, nil, fleet.PolicyPayload{
Name: "team2Policy2",
Query: "SELECT 4;",
CalendarEventsEnabled: false,
},
)
require.NoError(t, err)
globalPolicy, err := s.ds.NewGlobalPolicy(
ctx, nil, fleet.PolicyPayload{
Name: "globalPolicy",
Query: "SELECT 5;",
CalendarEventsEnabled: false,
},
)
require.NoError(t, err)
genDistributedReqWithPolicyResults := func(host *fleet.Host, policyResults map[uint]*bool) submitDistributedQueryResultsRequestShim {
var (
results = make(map[string]json.RawMessage)
statuses = make(map[string]interface{})
messages = make(map[string]string)
)
for policyID, policyResult := range policyResults {
distributedQueryName := hostPolicyQueryPrefix + fmt.Sprint(policyID)
switch {
case policyResult == nil:
results[distributedQueryName] = json.RawMessage(`[]`)
statuses[distributedQueryName] = 1
messages[distributedQueryName] = "policy failed execution"
case *policyResult:
results[distributedQueryName] = json.RawMessage(`[{"1": "1"}]`)
statuses[distributedQueryName] = 0
case !*policyResult:
results[distributedQueryName] = json.RawMessage(`[]`)
statuses[distributedQueryName] = 0
}
}
return submitDistributedQueryResultsRequestShim{
NodeKey: *host.NodeKey,
Results: results,
Statuses: statuses,
Messages: messages,
Stats: map[string]*fleet.Stats{},
}
}
// host1Team1 is failing a calendar policy and not a non-calendar policy (no results for global).
distributedResp := submitDistributedQueryResultsResponse{}
s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults(
host1Team1,
map[uint]*bool{
team1Policy1Calendar.ID: ptr.Bool(false),
team1Policy2.ID: ptr.Bool(true),
globalPolicy.ID: nil,
},
), http.StatusOK, &distributedResp)
// host2Team1 is passing the calendar policy but not the non-calendar policy (no results for global).
s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults(
host2Team1,
map[uint]*bool{
team2Policy1Calendar.ID: ptr.Bool(true),
team2Policy2.ID: ptr.Bool(false),
globalPolicy.ID: nil,
},
), http.StatusOK, &distributedResp)
// host3Team2 is passing team2Policy1Calendar and failing the global policy
// (not results for team2Policy2).
s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults(
host3Team2,
map[uint]*bool{
team2Policy1Calendar.ID: ptr.Bool(true),
team2Policy2.ID: nil,
globalPolicy.ID: ptr.Bool(false),
},
), http.StatusOK, &distributedResp)
// host4Team2 is not returning results for the calendar policy, failing the non-calendar
// policy and passing the global policy.
s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults(
host4Team2,
map[uint]*bool{
team2Policy1Calendar.ID: nil,
team2Policy2.ID: ptr.Bool(false),
globalPolicy.ID: ptr.Bool(true),
},
), http.StatusOK, &distributedResp)
// Trigger the calendar cron with the global feature is disabled.
triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second)
// No calendar events were created.
allCalendarEvents, err := s.ds.ListCalendarEvents(ctx, nil)
require.NoError(t, err)
require.Empty(t, allCalendarEvents)
// Set global configuration for the calendar feature.
appCfg, err := s.ds.AppConfig(ctx)
require.NoError(t, err)
appCfg.Integrations.GoogleCalendar = []*fleet.GoogleCalendarIntegration{
{
Domain: "example.com",
ApiKey: map[string]string{
fleet.GoogleCalendarEmail: "calendar-mock@example.com",
},
},
}
err = s.ds.SaveAppConfig(ctx, appCfg)
require.NoError(t, err)
time.Sleep(2 * time.Second) // Wait 2 seconds for the app config cache to clear.
triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second)
// No calendar events were created because we are missing enabling it on the teams.
allCalendarEvents, err = s.ds.ListCalendarEvents(ctx, nil)
require.NoError(t, err)
require.Empty(t, allCalendarEvents)
// Run distributed/write for host4Team2 again, it should not attempt to trigger the webhook because
// it's disabled for the teams.
s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults(
host4Team2,
map[uint]*bool{
team2Policy1Calendar.ID: nil,
team2Policy2.ID: ptr.Bool(false),
globalPolicy.ID: ptr.Bool(true),
},
), http.StatusOK, &distributedResp)
var (
team1Fired int
team1FiredMu sync.Mutex
)
team1WebhookFired := make(chan struct{})
team1WebhookServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "POST", r.Method)
requestBodyBytes, err := io.ReadAll(r.Body)
require.NoError(t, err)
t.Logf("team1 webhook request: %s\n", requestBodyBytes)
team1FiredMu.Lock()
team1Fired++
team1WebhookFired <- struct{}{}
team1FiredMu.Unlock()
}))
t.Cleanup(func() {
team1WebhookServer.Close()
})
team1.Config.Integrations.GoogleCalendar = &fleet.TeamGoogleCalendarIntegration{
Enable: true,
WebhookURL: team1WebhookServer.URL,
}
team1, err = s.ds.SaveTeam(ctx, team1)
require.NoError(t, err)
var (
team2Fired int
team2FiredMu sync.Mutex
)
team2WebhookServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "POST", r.Method)
requestBodyBytes, err := io.ReadAll(r.Body)
require.NoError(t, err)
t.Logf("team2 webhook request: %s\n", requestBodyBytes)
team2FiredMu.Lock()
team2Fired++
team2FiredMu.Unlock()
}))
t.Cleanup(func() {
team2WebhookServer.Close()
})
team2.Config.Integrations.GoogleCalendar = &fleet.TeamGoogleCalendarIntegration{
Enable: true,
WebhookURL: team2WebhookServer.URL,
}
team2, err = s.ds.SaveTeam(ctx, team2)
require.NoError(t, err)
//
// Same distributed/write as before but they should not fire yet.
//
// host1Team1 is failing a calendar policy and not a non-calendar policy (no results for global).
s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults(
host1Team1,
map[uint]*bool{
team1Policy1Calendar.ID: ptr.Bool(false),
team1Policy2.ID: ptr.Bool(true),
globalPolicy.ID: nil,
},
), http.StatusOK, &distributedResp)
// host2Team1 is passing the calendar policy but not the non-calendar policy (no results for global).
s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults(
host2Team1,
map[uint]*bool{
team2Policy1Calendar.ID: ptr.Bool(true),
team2Policy2.ID: ptr.Bool(false),
globalPolicy.ID: nil,
},
), http.StatusOK, &distributedResp)
// host3Team2 is passing team2Policy1Calendar and failing the global policy
// (not results for team2Policy2).
s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults(
host3Team2,
map[uint]*bool{
team2Policy1Calendar.ID: ptr.Bool(true),
team2Policy2.ID: nil,
globalPolicy.ID: ptr.Bool(false),
},
), http.StatusOK, &distributedResp)
// host4Team2 is not returning results for the calendar policy, failing the non-calendar
// policy and passing the global policy.
s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults(
host4Team2,
map[uint]*bool{
team2Policy1Calendar.ID: nil,
team2Policy2.ID: ptr.Bool(false),
globalPolicy.ID: ptr.Bool(true),
},
), http.StatusOK, &distributedResp)
team1FiredMu.Lock()
require.Zero(t, team1Fired)
team1FiredMu.Unlock()
team2FiredMu.Lock()
require.Zero(t, team2Fired)
team2FiredMu.Unlock()
// Trigger the calendar cron, global feature enabled, team1 enabled, team2 not yet enabled
// and hosts do not have an associated email yet.
triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second)
team1CalendarEvents, err := s.ds.ListCalendarEvents(ctx, &team1.ID)
require.NoError(t, err)
require.Empty(t, team1CalendarEvents)
// Add an email but of another domain.
err = s.ds.ReplaceHostDeviceMapping(ctx, host1Team1.ID, []*fleet.HostDeviceMapping{
{
HostID: host1Team1.ID,
Email: "user@other.com",
Source: "google_chrome_profiles",
},
}, "google_chrome_profiles")
require.NoError(t, err)
// Trigger the calendar cron, global feature enabled, team1 enabled, team2 not yet enabled
// and hosts do not have an associated email for the domain yet.
triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second)
team1CalendarEvents, err = s.ds.ListCalendarEvents(ctx, &team1.ID)
require.NoError(t, err)
require.Empty(t, team1CalendarEvents)
err = s.ds.ReplaceHostDeviceMapping(ctx, host1Team1.ID, []*fleet.HostDeviceMapping{
{
HostID: host1Team1.ID,
Email: "user1@example.com",
Source: "google_chrome_profiles",
},
}, "google_chrome_profiles")
require.NoError(t, err)
// Trigger the calendar cron, global feature enabled, team1 enabled, team2 not yet enabled
// and host1Team1 has a domain email associated.
triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second)
// An event should be generated for host1Team1
team1CalendarEvents, err = s.ds.ListCalendarEvents(ctx, &team1.ID)
require.NoError(t, err)
require.Len(t, team1CalendarEvents, 1)
require.NotZero(t, team1CalendarEvents[0].ID)
require.Equal(t, "user1@example.com", team1CalendarEvents[0].Email)
require.NotZero(t, team1CalendarEvents[0].StartTime)
require.NotZero(t, team1CalendarEvents[0].EndTime)
calendar.SetMockEventsToNow()
mysql.ExecAdhocSQL(t, s.ds, func(db sqlx.ExtContext) error {
// Update updated_at so the event gets updated (the event is updated every 30 minutes)
_, err := db.ExecContext(ctx,
`UPDATE calendar_events SET updated_at = DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 1 HOUR) WHERE id = ?`, team1CalendarEvents[0].ID)
if err != nil {
return err
}
// Set host1Team1 as online.
if _, err := db.ExecContext(ctx,
`UPDATE host_seen_times SET seen_time = CURRENT_TIMESTAMP WHERE host_id = ?`, host1Team1.ID); err != nil {
return err
}
return nil
})
// Trigger the calendar cron, global feature enabled, team1 enabled, team2 not yet enabled
// and host1Team1 has a domain email associated.
triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second)
// Check that refetch on the host was set.
host, err := s.ds.Host(ctx, host1Team1.ID)
require.NoError(t, err)
require.True(t, host.RefetchRequested)
// host1Team1 is failing a calendar policy and not a non-calendar policy (no results for global).
s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults(
host1Team1,
map[uint]*bool{
team1Policy1Calendar.ID: ptr.Bool(false),
team1Policy2.ID: ptr.Bool(true),
globalPolicy.ID: nil,
},
), http.StatusOK, &distributedResp)
// host2Team1 is passing the calendar policy but not the non-calendar policy (no results for global).
s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults(
host2Team1,
map[uint]*bool{
team2Policy1Calendar.ID: ptr.Bool(true),
team2Policy2.ID: ptr.Bool(false),
globalPolicy.ID: nil,
},
), http.StatusOK, &distributedResp)
select {
case <-team1WebhookFired:
case <-time.After(5 * time.Second):
t.Error("timeout waiting for team1 webhook to fire")
}
// Trigger again, nothing should fire as webhook has already fired.
triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second)
team1FiredMu.Lock()
require.Equal(t, 1, team1Fired)
team1FiredMu.Unlock()
team2FiredMu.Lock()
require.Equal(t, 0, team2Fired)
team2FiredMu.Unlock()
// Make host1Team1 pass all policies.
s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults(
host1Team1,
map[uint]*bool{
team1Policy1Calendar.ID: ptr.Bool(true),
team1Policy2.ID: ptr.Bool(true),
globalPolicy.ID: nil,
},
), http.StatusOK, &distributedResp)
// Trigger calendar should cleanup the events.
triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second)
// Events in the user calendar should not be cleaned up because they are not in the future.
mockEvents := calendar.ListGoogleMockEvents()
require.NotEmpty(t, mockEvents)
// Event should be cleaned up from our database.
team1CalendarEvents, err = s.ds.ListCalendarEvents(ctx, &team1.ID)
require.NoError(t, err)
require.Empty(t, team1CalendarEvents)
}
func (s *integrationEnterpriseTestSuite) TestCalendarEventsTransferringHosts() {
ctx := context.Background()
t := s.T()
t.Cleanup(func() {
calendar.ClearMockEvents()
})
currentAppCfg, err := s.ds.AppConfig(ctx)
require.NoError(t, err)
t.Cleanup(func() {
err = s.ds.SaveAppConfig(ctx, currentAppCfg)
require.NoError(t, err)
})
// Set global configuration for the calendar feature.
appCfg, err := s.ds.AppConfig(ctx)
require.NoError(t, err)
appCfg.Integrations.GoogleCalendar = []*fleet.GoogleCalendarIntegration{
{
Domain: "example.com",
ApiKey: map[string]string{
fleet.GoogleCalendarEmail: "calendar-mock@example.com",
},
},
}
err = s.ds.SaveAppConfig(ctx, appCfg)
require.NoError(t, err)
time.Sleep(2 * time.Second) // Wait 2 seconds for the app config cache to clear.
team1, err := s.ds.NewTeam(ctx, &fleet.Team{
Name: "team1",
})
require.NoError(t, err)
team2, err := s.ds.NewTeam(ctx, &fleet.Team{
Name: "team2",
})
require.NoError(t, err)
team1.Config.Integrations.GoogleCalendar = &fleet.TeamGoogleCalendarIntegration{
Enable: true,
WebhookURL: "https://foo.example.com",
}
team1, err = s.ds.SaveTeam(ctx, team1)
require.NoError(t, err)
team2.Config.Integrations.GoogleCalendar = &fleet.TeamGoogleCalendarIntegration{
Enable: true,
WebhookURL: "https://foo.example.com",
}
team2, err = s.ds.SaveTeam(ctx, team2)
require.NoError(t, err)
newHost := func(name string, teamID *uint) *fleet.Host {
h, err := s.ds.NewHost(ctx, &fleet.Host{
DetailUpdatedAt: time.Now(),
LabelUpdatedAt: time.Now(),
PolicyUpdatedAt: time.Now(),
SeenTime: time.Now().Add(-1 * time.Minute),
OsqueryHostID: ptr.String(t.Name() + name),
NodeKey: ptr.String(t.Name() + name),
UUID: uuid.New().String(),
Hostname: fmt.Sprintf("%s.%s.local", name, t.Name()),
Platform: "darwin",
TeamID: teamID,
})
require.NoError(t, err)
return h
}
host1 := newHost("host1", &team1.ID)
err = s.ds.ReplaceHostDeviceMapping(ctx, host1.ID, []*fleet.HostDeviceMapping{
{
HostID: host1.ID,
Email: "user1@example.com",
Source: "google_chrome_profiles",
},
}, "google_chrome_profiles")
require.NoError(t, err)
team1Policy1, err := s.ds.NewTeamPolicy(
ctx, team1.ID, nil, fleet.PolicyPayload{
Name: "team1Policy1",
Query: "SELECT 1;",
CalendarEventsEnabled: true,
},
)
require.NoError(t, err)
team2Policy1, err := s.ds.NewTeamPolicy(
ctx, team2.ID, nil, fleet.PolicyPayload{
Name: "team2Policy1",
Query: "SELECT 2;",
CalendarEventsEnabled: true,
},
)
require.NoError(t, err)
distributedResp := submitDistributedQueryResultsResponse{}
s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults(
host1,
map[uint]*bool{
team1Policy1.ID: ptr.Bool(false),
},
), http.StatusOK, &distributedResp)
triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second)
team1CalendarEvents, err := s.ds.ListCalendarEvents(ctx, &team1.ID)
require.NoError(t, err)
require.Len(t, team1CalendarEvents, 1)
// Check the calendar was created on the DB.
hostCalendarEvent, calendarEvent, err := s.ds.GetHostCalendarEventByEmail(ctx, "user1@example.com")
require.NoError(t, err)
// Transfer host to team2.
err = s.ds.AddHostsToTeam(ctx, &team2.ID, []uint{host1.ID})
require.NoError(t, err)
// host1 is failing team2's policy too.
s.DoJSON("POST", "/api/osquery/distributed/write", genDistributedReqWithPolicyResults(
host1,
map[uint]*bool{
team2Policy1.ID: ptr.Bool(false),
},
), http.StatusOK, &distributedResp)
triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second)
// Check the calendar event entry was reused.
hostCalendarEvent2, calendarEvent2, err := s.ds.GetHostCalendarEventByEmail(ctx, "user1@example.com")
require.NoError(t, err)
require.Equal(t, calendarEvent2.ID, calendarEvent.ID)
require.Equal(t, hostCalendarEvent2.CalendarEventID, hostCalendarEvent.CalendarEventID)
// Transfer host to global.
err = s.ds.AddHostsToTeam(ctx, nil, []uint{host1.ID})
require.NoError(t, err)
// Move event to two days ago (to clean up the calendar event)
mysql.ExecAdhocSQL(t, s.ds, func(db sqlx.ExtContext) error {
_, err := db.ExecContext(ctx,
`UPDATE calendar_events SET updated_at = DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 49 HOUR) WHERE id = ?`, team1CalendarEvents[0].ID)
if err != nil {
return err
}
return nil
})
triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second)
// Calendar event is cleaned up.
_, _, err = s.ds.GetHostCalendarEventByEmail(ctx, "user1@example.com")
require.True(t, fleet.IsNotFound(err))
}
func genDistributedReqWithPolicyResults(host *fleet.Host, policyResults map[uint]*bool) submitDistributedQueryResultsRequestShim {
var (
results = make(map[string]json.RawMessage)
statuses = make(map[string]interface{})
messages = make(map[string]string)
)
for policyID, policyResult := range policyResults {
distributedQueryName := hostPolicyQueryPrefix + fmt.Sprint(policyID)
switch {
case policyResult == nil:
results[distributedQueryName] = json.RawMessage(`[]`)
statuses[distributedQueryName] = 1
messages[distributedQueryName] = "policy failed execution"
case *policyResult:
results[distributedQueryName] = json.RawMessage(`[{"1": "1"}]`)
statuses[distributedQueryName] = 0
case !*policyResult:
results[distributedQueryName] = json.RawMessage(`[]`)
statuses[distributedQueryName] = 0
}
}
return submitDistributedQueryResultsRequestShim{
NodeKey: *host.NodeKey,
Results: results,
Statuses: statuses,
Messages: messages,
Stats: map[string]*fleet.Stats{},
}
}
func triggerAndWait(ctx context.Context, t *testing.T, ds fleet.Datastore, s *schedule.Schedule, timeout time.Duration) {
// Following code assumes (for simplicity) only triggered runs.
stats, err := ds.GetLatestCronStats(ctx, s.Name())
require.NoError(t, err)
var previousRunID int
if len(stats) > 0 {
previousRunID = stats[0].ID
}
_, err = s.Trigger()
require.NoError(t, err)
timeoutCh := time.After(timeout)
for {
stats, err := ds.GetLatestCronStats(ctx, s.Name())
require.NoError(t, err)
if len(stats) > 0 && stats[0].ID > previousRunID && stats[0].Status == fleet.CronStatsStatusCompleted {
t.Logf("cron %s:%d done", s.Name(), stats[0].ID)
return
}
select {
case <-timeoutCh:
t.Fatalf("timeout waiting for schedule %s to complete", s.Name())
case <-time.After(250 * time.Millisecond):
}
}
}

View file

@ -162,7 +162,7 @@ func New(
//
// All jobs must be added before calling Start.
func (s *Schedule) Start() {
prevScheduledRun, _, err := s.getLatestStats()
prevScheduledRun, _, err := s.GetLatestStats()
if err != nil {
level.Error(s.logger).Log("err", "start schedule", "details", err)
ctxerr.Handle(s.ctx, err)
@ -203,7 +203,7 @@ func (s *Schedule) Start() {
s.runWithStats(fleet.CronStatsTypeTriggered)
prevScheduledRun, _, err := s.getLatestStats()
prevScheduledRun, _, err := s.GetLatestStats()
if err != nil {
level.Error(s.logger).Log("err", "trigger get cron stats", "details", err)
ctxerr.Handle(s.ctx, err)
@ -235,7 +235,7 @@ func (s *Schedule) Start() {
schedInterval := s.getSchedInterval()
prevScheduledRun, prevTriggeredRun, err := s.getLatestStats()
prevScheduledRun, prevTriggeredRun, err := s.GetLatestStats()
if err != nil {
level.Error(s.logger).Log("err", "get cron stats", "details", err)
ctxerr.Handle(s.ctx, err)
@ -374,7 +374,7 @@ func (s *Schedule) Start() {
// is blocked or otherwise unavailable to publish the signal. From the caller's perspective, both
// cases are deemed to be equivalent.
func (s *Schedule) Trigger() (*fleet.CronStats, error) {
sched, trig, err := s.getLatestStats()
sched, trig, err := s.GetLatestStats()
switch {
case err != nil:
return nil, err
@ -549,7 +549,7 @@ func (s *Schedule) holdLock() (bool, context.CancelFunc) {
return true, cancelFn
}
func (s *Schedule) getLatestStats() (fleet.CronStats, fleet.CronStats, error) {
func (s *Schedule) GetLatestStats() (fleet.CronStats, fleet.CronStats, error) {
var scheduled, triggered fleet.CronStats
cs, err := s.statsStore.GetLatestCronStats(s.ctx, s.name)

View file

@ -95,6 +95,12 @@ func (ts *withServer) TearDownSuite() {
}
func (ts *withServer) commonTearDownTest(t *testing.T) {
// By setting DISABLE_TABLES_CLEANUP a developer can troubleshoot tests
// by inspecting mysql tables.
if os.Getenv("DISABLE_CLEANUP_TABLES") != "" {
return
}
ctx := context.Background()
u := ts.users["admin1@example.com"]