diff --git a/cmd/fleet/serve.go b/cmd/fleet/serve.go index 802dd07694..3c931cf5d6 100644 --- a/cmd/fleet/serve.go +++ b/cmd/fleet/serve.go @@ -235,6 +235,20 @@ the way that the Fleet server works. } }() + // Flush seen hosts every second + go func() { + ticker := time.NewTicker(1 * time.Second) + for { + if err := svc.FlushSeenHosts(context.Background()); err != nil { + level.Info(logger).Log( + "err", err, + "msg", "failed to update host seen times", + ) + } + <-ticker.C + } + }() + fieldKeys := []string{"method", "error"} requestCount := kitprometheus.NewCounterFrom(prometheus.CounterOpts{ Namespace: "api", diff --git a/docker-compose.yml b/docker-compose.yml index b3fe56ed2a..06ef252afe 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,7 +5,7 @@ services: image: mysql:5.7 volumes: - mysql-persistent-volume:/tmp - command: mysqld --datadir=/tmp/mysqldata --slow_query_log=1 --log_output=TABLE --log-queries-not-using-indexes --event-scheduler=ON + command: mysqld --datadir=/tmp/mysqldata --event-scheduler=ON environment: &mysql-default-environment MYSQL_ROOT_PASSWORD: toor MYSQL_DATABASE: fleet diff --git a/server/datastore/datastore.go b/server/datastore/datastore.go index f8f7b70ec2..0702b4ea50 100644 --- a/server/datastore/datastore.go +++ b/server/datastore/datastore.go @@ -68,6 +68,7 @@ var TestFunctions = [...]func(*testing.T, kolide.Datastore){ testAddLabelToPackTwice, testGenerateHostStatusStatistics, testMarkHostSeen, + testMarkHostsSeen, testCleanupIncomingHosts, testDuplicateNewQuery, testChangeEmail, diff --git a/server/datastore/datastore_hosts.go b/server/datastore/datastore_hosts.go index c2415b8863..7d3c8ff5b8 100644 --- a/server/datastore/datastore_hosts.go +++ b/server/datastore/datastore_hosts.go @@ -564,6 +564,67 @@ func testMarkHostSeen(t *testing.T, ds kolide.Datastore) { } } +func testMarkHostsSeen(t *testing.T, ds kolide.Datastore) { + mockClock := clock.NewMockClock() + + aSecondAgo := mockClock.Now().Add(-1 * time.Second).UTC() + anHourAgo := mockClock.Now().Add(-1 * time.Hour).UTC() + aDayAgo := mockClock.Now().Add(-24 * time.Hour).UTC() + + h1, err := ds.NewHost(&kolide.Host{ + ID: 1, + OsqueryHostID: "1", + UUID: "1", + NodeKey: "1", + DetailUpdateTime: aDayAgo, + LabelUpdateTime: aDayAgo, + SeenTime: aDayAgo, + }) + require.Nil(t, err) + + h2, err := ds.NewHost(&kolide.Host{ + ID: 2, + OsqueryHostID: "2", + UUID: "2", + NodeKey: "2", + DetailUpdateTime: aDayAgo, + LabelUpdateTime: aDayAgo, + SeenTime: aDayAgo, + }) + require.Nil(t, err) + + err = ds.MarkHostsSeen([]uint{h1.ID}, anHourAgo) + assert.Nil(t, err) + + { + h1Verify, err := ds.Host(h1.ID) + assert.Nil(t, err) + require.NotNil(t, h1Verify) + assert.WithinDuration(t, anHourAgo, h1Verify.SeenTime, time.Second) + + h2Verify, err := ds.Host(h2.ID) + assert.Nil(t, err) + require.NotNil(t, h2Verify) + assert.WithinDuration(t, aDayAgo, h2Verify.SeenTime, time.Second) + } + + err = ds.MarkHostsSeen([]uint{h1.ID, h2.ID}, aSecondAgo) + assert.Nil(t, err) + + { + h1Verify, err := ds.Host(h1.ID) + assert.Nil(t, err) + require.NotNil(t, h1Verify) + assert.WithinDuration(t, aSecondAgo, h1Verify.SeenTime, time.Second) + + h2Verify, err := ds.Host(h2.ID) + assert.Nil(t, err) + require.NotNil(t, h2Verify) + assert.WithinDuration(t, aSecondAgo, h2Verify.SeenTime, time.Second) + } + +} + func testCleanupIncomingHosts(t *testing.T, ds kolide.Datastore) { mockClock := clock.NewMockClock() diff --git a/server/datastore/mysql/hosts.go b/server/datastore/mysql/hosts.go index 9a4b877896..e23ee4cb79 100644 --- a/server/datastore/mysql/hosts.go +++ b/server/datastore/mysql/hosts.go @@ -442,6 +442,34 @@ func (d *Datastore) MarkHostSeen(host *kolide.Host, t time.Time) error { return nil } +func (d *Datastore) MarkHostsSeen(hostIDs []uint, t time.Time) error { + if len(hostIDs) == 0 { + return nil + } + + if err := d.withRetryTxx(func(tx *sqlx.Tx) error { + query := ` + UPDATE hosts SET + seen_time = ? + WHERE id IN (?) + ` + query, args, err := sqlx.In(query, t, hostIDs) + if err != nil { + return errors.Wrap(err, "sqlx in") + } + query = d.db.Rebind(query) + if _, err := d.db.Exec(query, args...); err != nil { + return errors.Wrap(err, "exec update") + } + + return nil + }); err != nil { + return errors.Wrap(err, "MarkHostsSeen transaction") + } + + return nil +} + func (d *Datastore) searchHostsWithOmits(query string, omit ...uint) ([]*kolide.Host, error) { hostQuery := transformQuery(query) ipQuery := `"` + query + `"` diff --git a/server/kolide/hosts.go b/server/kolide/hosts.go index 1934fa504a..7bdc2ebad2 100644 --- a/server/kolide/hosts.go +++ b/server/kolide/hosts.go @@ -54,6 +54,7 @@ type HostStore interface { // endpoints. AuthenticateHost(nodeKey string) (*Host, error) MarkHostSeen(host *Host, t time.Time) error + MarkHostsSeen(hostIDs []uint, t time.Time) error SearchHosts(query string, omit ...uint) ([]*Host, error) // CleanupIncomingHosts deletes hosts that have enrolled but never // updated their status details. This clears dead "incoming hosts" that @@ -83,6 +84,8 @@ type HostService interface { // Possible matches can be on osquery_host_identifier, node_key, UUID, or // hostname. HostByIdentifier(ctx context.Context, identifier string) (*HostDetail, error) + + FlushSeenHosts(ctx context.Context) error } type HostListOptions struct { diff --git a/server/mock/datastore_hosts.go b/server/mock/datastore_hosts.go index 8b20c5687a..3c76777d15 100644 --- a/server/mock/datastore_hosts.go +++ b/server/mock/datastore_hosts.go @@ -28,6 +28,8 @@ type AuthenticateHostFunc func(nodeKey string) (*kolide.Host, error) type MarkHostSeenFunc func(host *kolide.Host, t time.Time) error +type MarkHostsSeenFunc func(hostIDs []uint, t time.Time) error + type CleanupIncomingHostsFunc func(t time.Time) error type SearchHostsFunc func(query string, omit ...uint) ([]*kolide.Host, error) @@ -66,6 +68,9 @@ type HostStore struct { MarkHostSeenFunc MarkHostSeenFunc MarkHostSeenFuncInvoked bool + MarkHostsSeenFunc MarkHostsSeenFunc + MarkHostsSeenFuncInvoked bool + CleanupIncomingHostsFunc CleanupIncomingHostsFunc CleanupIncomingHostsFuncInvoked bool @@ -127,6 +132,11 @@ func (s *HostStore) MarkHostSeen(host *kolide.Host, t time.Time) error { return s.MarkHostSeenFunc(host, t) } +func (s *HostStore) MarkHostsSeen(hostIDs []uint, t time.Time) error { + s.MarkHostsSeenFuncInvoked = true + return s.MarkHostsSeenFunc(hostIDs, t) +} + func (s *HostStore) CleanupIncomingHosts(t time.Time) error { s.CleanupIncomingHostsFuncInvoked = true return s.CleanupIncomingHostsFunc(t) diff --git a/server/service/service.go b/server/service/service.go index eff796debf..c6fcf01fea 100644 --- a/server/service/service.go +++ b/server/service/service.go @@ -6,6 +6,7 @@ import ( "html/template" "net/http" "strings" + "sync" "time" "github.com/WatchBeam/clock" @@ -29,7 +30,7 @@ func NewService(ds kolide.Datastore, resultStore kolide.QueryResultStore, return nil, errors.Wrap(err, "initializing osquery logging") } - svc = service{ + svc = &service{ ds: ds, carveStore: carveStore, resultStore: resultStore, @@ -40,6 +41,7 @@ func NewService(ds kolide.Datastore, resultStore kolide.QueryResultStore, osqueryLogWriter: osqueryLogger, mailService: mailService, ssoSessionStore: sso, + seenHostSet: newSeenHostSet(), metaDataClient: &http.Client{ Timeout: 5 * time.Second, }, @@ -62,6 +64,8 @@ type service struct { mailService kolide.MailService ssoSessionStore sso.SessionStore metaDataClient *http.Client + + seenHostSet *seenHostSet } func (s service) SendEmail(mail kolide.Email) error { @@ -90,3 +94,36 @@ func getAssetURL() template.URL { return template.URL("https://github.com/fleetdm/fleet/blob/" + tag) } + +// seenHostSet implements synchronized storage for the set of seen hosts. +type seenHostSet struct { + mutex sync.Mutex + hostIDs map[uint]bool +} + +func newSeenHostSet() *seenHostSet { + return &seenHostSet{ + mutex: sync.Mutex{}, + hostIDs: make(map[uint]bool), + } +} + +// addHostID adds the host identified by ID to the set +func (m *seenHostSet) addHostID(id uint) { + m.mutex.Lock() + defer m.mutex.Unlock() + m.hostIDs[id] = true +} + +// getAndClearHostIDs gets the list of unique host IDs from the set and empties +// the set. +func (m *seenHostSet) getAndClearHostIDs() []uint { + m.mutex.Lock() + defer m.mutex.Unlock() + var ids []uint + for id, _ := range m.hostIDs { + ids = append(ids, id) + } + m.hostIDs = make(map[uint]bool) + return ids +} diff --git a/server/service/service_hosts.go b/server/service/service_hosts.go index e0ecc8dd39..57b9658cd0 100644 --- a/server/service/service_hosts.go +++ b/server/service/service_hosts.go @@ -66,3 +66,8 @@ func (svc service) GetHostSummary(ctx context.Context) (*kolide.HostSummary, err func (svc service) DeleteHost(ctx context.Context, id uint) error { return svc.ds.DeleteHost(id) } + +func (svc *service) FlushSeenHosts(ctx context.Context) error { + hostIDs := svc.seenHostSet.getAndClearHostIDs() + return svc.ds.MarkHostsSeen(hostIDs, svc.clock.Now()) +} diff --git a/server/service/service_invites_test.go b/server/service/service_invites_test.go index 087438321f..66c9689268 100644 --- a/server/service/service_invites_test.go +++ b/server/service/service_invites_test.go @@ -89,7 +89,7 @@ func setupInviteTest(t *testing.T) (kolide.Service, *mock.Store, *mockMailServic KolideServerURL: "https://acme.co", }) mailer := &mockMailService{SendEmailFn: func(e kolide.Email) error { return nil }} - svc := validationMiddleware{service{ + svc := validationMiddleware{&service{ ds: ms, config: config.TestConfig(), mailService: mailer, diff --git a/server/service/service_osquery.go b/server/service/service_osquery.go index 57d7f19683..c9cd4e4163 100644 --- a/server/service/service_osquery.go +++ b/server/service/service_osquery.go @@ -64,11 +64,14 @@ func (svc service) AuthenticateHost(ctx context.Context, nodeKey string) (*kolid } } - // Update the "seen" time used to calculate online status - err = svc.ds.MarkHostSeen(host, svc.clock.Now()) - if err != nil { - return nil, osqueryError{message: "failed to mark host seen: " + err.Error()} - } + // Update the "seen" time used to calculate online status. These updates are + // batched for MySQL performance reasons. Because this is done + // asynchronously, it is possible for the server to shut down before + // updating the seen time for these hosts. This seems to be an acceptable + // tradeoff as an online host will continue to check in and quickly be + // marked online again. + svc.seenHostSet.addHostID(host.ID) + host.SeenTime = svc.clock.Now() return host, nil } diff --git a/server/service/service_osquery_test.go b/server/service/service_osquery_test.go index e3323fe253..185afc05e4 100644 --- a/server/service/service_osquery_test.go +++ b/server/service/service_osquery_test.go @@ -114,23 +114,46 @@ func TestEnrollAgentDetails(t *testing.T) { func TestAuthenticateHost(t *testing.T) { ds := new(mock.Store) svc, err := newTestService(ds, nil, nil) - require.Nil(t, err) + require.NoError(t, err) var gotKey string - host := kolide.Host{HostName: "foobar"} + host := kolide.Host{ID: 1, HostName: "foobar"} ds.AuthenticateHostFunc = func(key string) (*kolide.Host, error) { gotKey = key return &host, nil } - ds.MarkHostSeenFunc = func(host *kolide.Host, t time.Time) error { + var gotHostIDs []uint + ds.MarkHostsSeenFunc = func(hostIDs []uint, t time.Time) error { + gotHostIDs = hostIDs return nil } - h, err := svc.AuthenticateHost(context.Background(), "test") + _, err = svc.AuthenticateHost(context.Background(), "test") require.Nil(t, err) assert.Equal(t, "test", gotKey) - assert.True(t, ds.MarkHostSeenFuncInvoked) - assert.Equal(t, host, *h) + assert.False(t, ds.MarkHostsSeenFuncInvoked) + + host = kolide.Host{ID: 7, HostName: "foobar"} + _, err = svc.AuthenticateHost(context.Background(), "floobar") + require.Nil(t, err) + assert.Equal(t, "floobar", gotKey) + assert.False(t, ds.MarkHostsSeenFuncInvoked) + // Host checks in twice + host = kolide.Host{ID: 7, HostName: "foobar"} + _, err = svc.AuthenticateHost(context.Background(), "floobar") + require.Nil(t, err) + assert.Equal(t, "floobar", gotKey) + assert.False(t, ds.MarkHostsSeenFuncInvoked) + + err = svc.FlushSeenHosts(context.Background()) + require.NoError(t, err) + assert.True(t, ds.MarkHostsSeenFuncInvoked) + assert.ElementsMatch(t, []uint{1, 7}, gotHostIDs) + + err = svc.FlushSeenHosts(context.Background()) + require.NoError(t, err) + assert.True(t, ds.MarkHostsSeenFuncInvoked) + assert.Len(t, gotHostIDs, 0) } func TestAuthenticateHostFailure(t *testing.T) { @@ -161,7 +184,7 @@ func TestSubmitStatusLogs(t *testing.T) { require.Nil(t, err) // Hack to get at the service internals and modify the writer - serv := ((svc.(validationMiddleware)).Service).(service) + serv := ((svc.(validationMiddleware)).Service).(*service) testLogger := &testJSONLogger{} serv.osqueryLogWriter = &logging.OsqueryLogger{Status: testLogger} @@ -190,7 +213,7 @@ func TestSubmitResultLogs(t *testing.T) { require.Nil(t, err) // Hack to get at the service internals and modify the writer - serv := ((svc.(validationMiddleware)).Service).(service) + serv := ((svc.(validationMiddleware)).Service).(*service) testLogger := &testJSONLogger{} serv.osqueryLogWriter = &logging.OsqueryLogger{Result: testLogger} @@ -1527,15 +1550,18 @@ func TestAuthenticationErrors(t *testing.T) { } svc, err := newTestService(ms, nil, nil) - require.Nil(t, err) + require.NoError(t, err) ctx := context.Background() _, err = svc.AuthenticateHost(ctx, "") - require.NotNil(t, err) + require.Error(t, err) require.True(t, err.(osqueryError).NodeInvalid()) + ms.AuthenticateHostFunc = func(nodeKey string) (*kolide.Host, error) { + return &kolide.Host{ID: 1}, nil + } _, err = svc.AuthenticateHost(ctx, "foo") - require.Nil(t, err) + require.NoError(t, err) // return not found error ms.AuthenticateHostFunc = func(nodeKey string) (*kolide.Host, error) { @@ -1543,7 +1569,7 @@ func TestAuthenticationErrors(t *testing.T) { } _, err = svc.AuthenticateHost(ctx, "foo") - require.NotNil(t, err) + require.Error(t, err) require.True(t, err.(osqueryError).NodeInvalid()) // return other error