Batch updates to host seen time (#633)

Instead of synchronously updating the seen_time column each time a host authenticates, collect these updates in memory and write them together in a single batch once per second.

This results in a ~33% reduction in MySQL CPU usage in a local test with 4,000 simulated hosts and MySQL running in Docker.
This commit is contained in:
Zach Wasserman 2021-04-12 16:22:22 -07:00 committed by GitHub
parent 1d1b26ee89
commit e961cfe0c3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 208 additions and 20 deletions

View file

@ -235,6 +235,20 @@ the way that the Fleet server works.
}
}()
// Persist the batched host "seen" times in the background. The first flush
// runs immediately; every later flush waits for the next one-second tick.
// Failures are logged and the loop keeps running.
go func() {
	flushTick := time.NewTicker(time.Second)
	for ; ; <-flushTick.C {
		err := svc.FlushSeenHosts(context.Background())
		if err == nil {
			continue
		}
		level.Info(logger).Log(
			"err", err,
			"msg", "failed to update host seen times",
		)
	}
}()
fieldKeys := []string{"method", "error"}
requestCount := kitprometheus.NewCounterFrom(prometheus.CounterOpts{
Namespace: "api",

View file

@ -5,7 +5,7 @@ services:
image: mysql:5.7
volumes:
- mysql-persistent-volume:/tmp
command: mysqld --datadir=/tmp/mysqldata --slow_query_log=1 --log_output=TABLE --log-queries-not-using-indexes --event-scheduler=ON
command: mysqld --datadir=/tmp/mysqldata --event-scheduler=ON
environment: &mysql-default-environment
MYSQL_ROOT_PASSWORD: toor
MYSQL_DATABASE: fleet

View file

@ -68,6 +68,7 @@ var TestFunctions = [...]func(*testing.T, kolide.Datastore){
testAddLabelToPackTwice,
testGenerateHostStatusStatistics,
testMarkHostSeen,
testMarkHostsSeen,
testCleanupIncomingHosts,
testDuplicateNewQuery,
testChangeEmail,

View file

@ -564,6 +564,67 @@ func testMarkHostSeen(t *testing.T, ds kolide.Datastore) {
}
}
// testMarkHostsSeen checks that MarkHostsSeen updates the seen time of exactly
// the hosts whose IDs are passed in and leaves every other host untouched.
func testMarkHostsSeen(t *testing.T, ds kolide.Datastore) {
	mockClock := clock.NewMockClock()

	// ago returns the mock clock's current time shifted back by d, in UTC.
	ago := func(d time.Duration) time.Time {
		return mockClock.Now().Add(-d).UTC()
	}
	aSecondAgo := ago(time.Second)
	anHourAgo := ago(time.Hour)
	aDayAgo := ago(24 * time.Hour)

	// createHost inserts a host whose timestamps all start out a day old.
	createHost := func(id uint, key string) *kolide.Host {
		created, err := ds.NewHost(&kolide.Host{
			ID:               id,
			OsqueryHostID:    key,
			UUID:             key,
			NodeKey:          key,
			DetailUpdateTime: aDayAgo,
			LabelUpdateTime:  aDayAgo,
			SeenTime:         aDayAgo,
		})
		require.Nil(t, err)
		return created
	}

	// expectSeen reloads the host and compares its stored seen time to want,
	// allowing one second of slack.
	expectSeen := func(id uint, want time.Time) {
		loaded, err := ds.Host(id)
		assert.Nil(t, err)
		require.NotNil(t, loaded)
		assert.WithinDuration(t, want, loaded.SeenTime, time.Second)
	}

	first := createHost(1, "1")
	second := createHost(2, "2")

	// Marking only the first host must not affect the second.
	assert.Nil(t, ds.MarkHostsSeen([]uint{first.ID}, anHourAgo))
	expectSeen(first.ID, anHourAgo)
	expectSeen(second.ID, aDayAgo)

	// Marking both hosts moves both seen times forward.
	assert.Nil(t, ds.MarkHostsSeen([]uint{first.ID, second.ID}, aSecondAgo))
	expectSeen(first.ID, aSecondAgo)
	expectSeen(second.ID, aSecondAgo)
}
func testCleanupIncomingHosts(t *testing.T, ds kolide.Datastore) {
mockClock := clock.NewMockClock()

View file

@ -442,6 +442,34 @@ func (d *Datastore) MarkHostSeen(host *kolide.Host, t time.Time) error {
return nil
}
// MarkHostsSeen sets seen_time to t for every host whose ID appears in
// hostIDs, using a single UPDATE statement executed inside a retried
// transaction. It returns nil without touching the database when hostIDs is
// empty.
func (d *Datastore) MarkHostsSeen(hostIDs []uint, t time.Time) error {
	if len(hostIDs) == 0 {
		return nil
	}

	if err := d.withRetryTxx(func(tx *sqlx.Tx) error {
		query := `
			UPDATE hosts SET
				seen_time = ?
			WHERE id IN (?)
		`
		// Expand the single IN (?) placeholder to one bindvar per host ID.
		query, args, err := sqlx.In(query, t, hostIDs)
		if err != nil {
			return errors.Wrap(err, "sqlx in")
		}
		// Run the statement on tx rather than on d.db so that it is part of
		// the transaction that withRetryTxx hands to this callback.
		query = tx.Rebind(query)
		if _, err := tx.Exec(query, args...); err != nil {
			return errors.Wrap(err, "exec update")
		}

		return nil
	}); err != nil {
		return errors.Wrap(err, "MarkHostsSeen transaction")
	}

	return nil
}
func (d *Datastore) searchHostsWithOmits(query string, omit ...uint) ([]*kolide.Host, error) {
hostQuery := transformQuery(query)
ipQuery := `"` + query + `"`

View file

@ -54,6 +54,7 @@ type HostStore interface {
// endpoints.
AuthenticateHost(nodeKey string) (*Host, error)
MarkHostSeen(host *Host, t time.Time) error
MarkHostsSeen(hostIDs []uint, t time.Time) error
SearchHosts(query string, omit ...uint) ([]*Host, error)
// CleanupIncomingHosts deletes hosts that have enrolled but never
// updated their status details. This clears dead "incoming hosts" that
@ -83,6 +84,8 @@ type HostService interface {
// Possible matches can be on osquery_host_identifier, node_key, UUID, or
// hostname.
HostByIdentifier(ctx context.Context, identifier string) (*HostDetail, error)
FlushSeenHosts(ctx context.Context) error
}
type HostListOptions struct {

View file

@ -28,6 +28,8 @@ type AuthenticateHostFunc func(nodeKey string) (*kolide.Host, error)
// MarkHostSeenFunc is the function type of the stub that the mock
// HostStore.MarkHostSeen method delegates to.
type MarkHostSeenFunc func(host *kolide.Host, t time.Time) error

// MarkHostsSeenFunc is the function type of the stub that the mock
// HostStore.MarkHostsSeen method delegates to.
type MarkHostsSeenFunc func(hostIDs []uint, t time.Time) error

// CleanupIncomingHostsFunc is the function type of the stub that the mock
// HostStore.CleanupIncomingHosts method delegates to.
type CleanupIncomingHostsFunc func(t time.Time) error

// SearchHostsFunc is the function type for a SearchHosts stub.
// NOTE(review): presumably backs a HostStore.SearchHosts mock method; that
// method is not visible in this view.
type SearchHostsFunc func(query string, omit ...uint) ([]*kolide.Host, error)
@ -66,6 +68,9 @@ type HostStore struct {
MarkHostSeenFunc MarkHostSeenFunc
MarkHostSeenFuncInvoked bool
MarkHostsSeenFunc MarkHostsSeenFunc
MarkHostsSeenFuncInvoked bool
CleanupIncomingHostsFunc CleanupIncomingHostsFunc
CleanupIncomingHostsFuncInvoked bool
@ -127,6 +132,11 @@ func (s *HostStore) MarkHostSeen(host *kolide.Host, t time.Time) error {
return s.MarkHostSeenFunc(host, t)
}
// MarkHostsSeen records that it was called and then forwards the arguments to
// the configured MarkHostsSeenFunc stub, returning whatever the stub returns.
func (s *HostStore) MarkHostsSeen(hostIDs []uint, t time.Time) error {
	s.MarkHostsSeenFuncInvoked = true
	stub := s.MarkHostsSeenFunc
	return stub(hostIDs, t)
}
func (s *HostStore) CleanupIncomingHosts(t time.Time) error {
s.CleanupIncomingHostsFuncInvoked = true
return s.CleanupIncomingHostsFunc(t)

View file

@ -6,6 +6,7 @@ import (
"html/template"
"net/http"
"strings"
"sync"
"time"
"github.com/WatchBeam/clock"
@ -29,7 +30,7 @@ func NewService(ds kolide.Datastore, resultStore kolide.QueryResultStore,
return nil, errors.Wrap(err, "initializing osquery logging")
}
svc = service{
svc = &service{
ds: ds,
carveStore: carveStore,
resultStore: resultStore,
@ -40,6 +41,7 @@ func NewService(ds kolide.Datastore, resultStore kolide.QueryResultStore,
osqueryLogWriter: osqueryLogger,
mailService: mailService,
ssoSessionStore: sso,
seenHostSet: newSeenHostSet(),
metaDataClient: &http.Client{
Timeout: 5 * time.Second,
},
@ -62,6 +64,8 @@ type service struct {
mailService kolide.MailService
ssoSessionStore sso.SessionStore
metaDataClient *http.Client
seenHostSet *seenHostSet
}
func (s service) SendEmail(mail kolide.Email) error {
@ -90,3 +94,36 @@ func getAssetURL() template.URL {
return template.URL("https://github.com/fleetdm/fleet/blob/" + tag)
}
// seenHostSet implements synchronized storage for the set of seen hosts.
// All access to hostIDs goes through the methods below, which hold mutex for
// their whole duration. The zero value is an empty set that is ready to use.
type seenHostSet struct {
	mutex   sync.Mutex
	hostIDs map[uint]bool
}

// newSeenHostSet returns an empty seenHostSet.
func newSeenHostSet() *seenHostSet {
	return &seenHostSet{
		hostIDs: make(map[uint]bool),
	}
}

// addHostID adds the host identified by ID to the set. Adding an ID that is
// already present has no effect.
func (m *seenHostSet) addHostID(id uint) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	// Allocate lazily so a zero-value set does not panic on its first write.
	if m.hostIDs == nil {
		m.hostIDs = make(map[uint]bool)
	}
	m.hostIDs[id] = true
}

// getAndClearHostIDs gets the list of unique host IDs from the set and empties
// the set. The IDs are returned in no particular order. It returns nil when
// the set is empty.
func (m *seenHostSet) getAndClearHostIDs() []uint {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	// Nothing to hand out: keep the existing map instead of reallocating it.
	if len(m.hostIDs) == 0 {
		return nil
	}

	ids := make([]uint, 0, len(m.hostIDs))
	for id := range m.hostIDs {
		ids = append(ids, id)
	}
	m.hostIDs = make(map[uint]bool)
	return ids
}

View file

@ -66,3 +66,8 @@ func (svc service) GetHostSummary(ctx context.Context) (*kolide.HostSummary, err
// DeleteHost removes the host with the given ID by delegating to the
// datastore and returns the datastore's error unchanged. ctx is not used.
func (svc service) DeleteHost(ctx context.Context, id uint) error {
	err := svc.ds.DeleteHost(id)
	return err
}
// FlushSeenHosts drains the in-memory set of seen host IDs and passes them to
// the datastore in one MarkHostsSeen call, stamped with the service clock's
// current time. The IDs are removed from the set before the write is
// attempted, so a failed write does not put them back. ctx is not used.
func (svc *service) FlushSeenHosts(ctx context.Context) error {
	pending := svc.seenHostSet.getAndClearHostIDs()
	flushedAt := svc.clock.Now()
	return svc.ds.MarkHostsSeen(pending, flushedAt)
}

View file

@ -89,7 +89,7 @@ func setupInviteTest(t *testing.T) (kolide.Service, *mock.Store, *mockMailServic
KolideServerURL: "https://acme.co",
})
mailer := &mockMailService{SendEmailFn: func(e kolide.Email) error { return nil }}
svc := validationMiddleware{service{
svc := validationMiddleware{&service{
ds: ms,
config: config.TestConfig(),
mailService: mailer,

View file

@ -64,11 +64,14 @@ func (svc service) AuthenticateHost(ctx context.Context, nodeKey string) (*kolid
}
}
// Update the "seen" time used to calculate online status
err = svc.ds.MarkHostSeen(host, svc.clock.Now())
if err != nil {
return nil, osqueryError{message: "failed to mark host seen: " + err.Error()}
}
// Update the "seen" time used to calculate online status. These updates are
// batched for MySQL performance reasons. Because this is done
// asynchronously, it is possible for the server to shut down before
// updating the seen time for these hosts. This seems to be an acceptable
// tradeoff as an online host will continue to check in and quickly be
// marked online again.
svc.seenHostSet.addHostID(host.ID)
host.SeenTime = svc.clock.Now()
return host, nil
}

View file

@ -114,23 +114,46 @@ func TestEnrollAgentDetails(t *testing.T) {
func TestAuthenticateHost(t *testing.T) {
ds := new(mock.Store)
svc, err := newTestService(ds, nil, nil)
require.Nil(t, err)
require.NoError(t, err)
var gotKey string
host := kolide.Host{HostName: "foobar"}
host := kolide.Host{ID: 1, HostName: "foobar"}
ds.AuthenticateHostFunc = func(key string) (*kolide.Host, error) {
gotKey = key
return &host, nil
}
ds.MarkHostSeenFunc = func(host *kolide.Host, t time.Time) error {
var gotHostIDs []uint
ds.MarkHostsSeenFunc = func(hostIDs []uint, t time.Time) error {
gotHostIDs = hostIDs
return nil
}
h, err := svc.AuthenticateHost(context.Background(), "test")
_, err = svc.AuthenticateHost(context.Background(), "test")
require.Nil(t, err)
assert.Equal(t, "test", gotKey)
assert.True(t, ds.MarkHostSeenFuncInvoked)
assert.Equal(t, host, *h)
assert.False(t, ds.MarkHostsSeenFuncInvoked)
host = kolide.Host{ID: 7, HostName: "foobar"}
_, err = svc.AuthenticateHost(context.Background(), "floobar")
require.Nil(t, err)
assert.Equal(t, "floobar", gotKey)
assert.False(t, ds.MarkHostsSeenFuncInvoked)
// Host checks in twice
host = kolide.Host{ID: 7, HostName: "foobar"}
_, err = svc.AuthenticateHost(context.Background(), "floobar")
require.Nil(t, err)
assert.Equal(t, "floobar", gotKey)
assert.False(t, ds.MarkHostsSeenFuncInvoked)
err = svc.FlushSeenHosts(context.Background())
require.NoError(t, err)
assert.True(t, ds.MarkHostsSeenFuncInvoked)
assert.ElementsMatch(t, []uint{1, 7}, gotHostIDs)
err = svc.FlushSeenHosts(context.Background())
require.NoError(t, err)
assert.True(t, ds.MarkHostsSeenFuncInvoked)
assert.Len(t, gotHostIDs, 0)
}
func TestAuthenticateHostFailure(t *testing.T) {
@ -161,7 +184,7 @@ func TestSubmitStatusLogs(t *testing.T) {
require.Nil(t, err)
// Hack to get at the service internals and modify the writer
serv := ((svc.(validationMiddleware)).Service).(service)
serv := ((svc.(validationMiddleware)).Service).(*service)
testLogger := &testJSONLogger{}
serv.osqueryLogWriter = &logging.OsqueryLogger{Status: testLogger}
@ -190,7 +213,7 @@ func TestSubmitResultLogs(t *testing.T) {
require.Nil(t, err)
// Hack to get at the service internals and modify the writer
serv := ((svc.(validationMiddleware)).Service).(service)
serv := ((svc.(validationMiddleware)).Service).(*service)
testLogger := &testJSONLogger{}
serv.osqueryLogWriter = &logging.OsqueryLogger{Result: testLogger}
@ -1527,15 +1550,18 @@ func TestAuthenticationErrors(t *testing.T) {
}
svc, err := newTestService(ms, nil, nil)
require.Nil(t, err)
require.NoError(t, err)
ctx := context.Background()
_, err = svc.AuthenticateHost(ctx, "")
require.NotNil(t, err)
require.Error(t, err)
require.True(t, err.(osqueryError).NodeInvalid())
ms.AuthenticateHostFunc = func(nodeKey string) (*kolide.Host, error) {
return &kolide.Host{ID: 1}, nil
}
_, err = svc.AuthenticateHost(ctx, "foo")
require.Nil(t, err)
require.NoError(t, err)
// return not found error
ms.AuthenticateHostFunc = func(nodeKey string) (*kolide.Host, error) {
@ -1543,7 +1569,7 @@ func TestAuthenticationErrors(t *testing.T) {
}
_, err = svc.AuthenticateHost(ctx, "foo")
require.NotNil(t, err)
require.Error(t, err)
require.True(t, err.(osqueryError).NodeInvalid())
// return other error