From 0be26613b3e579f79ded9233ef6715c453b6a96c Mon Sep 17 00:00:00 2001 From: Michal Nicpon <39177923+michalnicp@users.noreply.github.com> Date: Tue, 15 Feb 2022 12:07:51 -0700 Subject: [PATCH] Cache team agent options (#4193) * use raw literal for json * wrap cache to clone all values --- cmd/fleetctl/apply_test.go | 6 +- server/datastore/cached_mysql/cached_mysql.go | 186 +++++++++++++++--- .../cached_mysql/cached_mysql_test.go | 149 +++++++++++++- server/fleet/app.go | 7 - 4 files changed, 302 insertions(+), 46 deletions(-) diff --git a/cmd/fleetctl/apply_test.go b/cmd/fleetctl/apply_test.go index 7ee249eb61..c8f1fd4241 100644 --- a/cmd/fleetctl/apply_test.go +++ b/cmd/fleetctl/apply_test.go @@ -167,11 +167,11 @@ spec: - secret: AAA `) - newAgentOpts := json.RawMessage("{\"config\":{\"something\":\"else\"}}") + newAgentOpts := json.RawMessage(`{"config":{"something":"else"}}`) require.Equal(t, "[+] applied 2 teams\n", runAppForTest(t, []string{"apply", "-f", tmpFile.Name()})) - assert.Equal(t, &agentOpts, teamsByName["team2"].AgentOptions) - assert.Equal(t, &newAgentOpts, teamsByName["team1"].AgentOptions) + assert.JSONEq(t, string(agentOpts), string(*teamsByName["team2"].AgentOptions)) + assert.JSONEq(t, string(newAgentOpts), string(*teamsByName["team1"].AgentOptions)) assert.Equal(t, []*fleet.EnrollSecret{{Secret: "AAA"}}, enrolledSecretsCalled[uint(42)]) } diff --git a/server/datastore/cached_mysql/cached_mysql.go b/server/datastore/cached_mysql/cached_mysql.go index 59fa05c49f..a8b583bef0 100644 --- a/server/datastore/cached_mysql/cached_mysql.go +++ b/server/datastore/cached_mysql/cached_mysql.go @@ -2,51 +2,129 @@ package cached_mysql import ( "context" + "encoding/json" "fmt" + "reflect" "time" "github.com/fleetdm/fleet/v4/server/fleet" + "github.com/jinzhu/copier" "github.com/patrickmn/go-cache" ) +const ( + appConfigKey = "AppConfig:%s" + defaultAppConfigExpiration = 1 * time.Second + packsHostKey = "Packs:host:%d" + defaultPacksExpiration = 1 * time.Minute + scheduledQueriesKey = "ScheduledQueries:pack:%d" + defaultScheduledQueriesExpiration = 1 * time.Minute + teamAgentOptionsKey = "TeamAgentOptions:team:%d" + defaultTeamAgentOptionsExpiration = 1 * time.Minute +) + +// cloner represents any type that can clone itself. Used by types to provide a more efficient clone method. +type cloner interface { + Clone() (interface{}, error) +} + +func clone(v interface{}) (interface{}, error) { + if cloner, ok := v.(cloner); ok { + return cloner.Clone() + } + + // Use reflection to initialize a clone of v of the same type. + vv := reflect.ValueOf(v) + + // If the value is a pointer, then calling reflect.New on it will result in a double pointer. + // Instead, dereference the pointer first. + isPtr := false + if vv.Kind() == reflect.Ptr { + isPtr = true + vv = vv.Elem() + } + + clone := reflect.New(vv.Type()) + + err := copier.Copy(clone.Interface(), v) + if err != nil { + return nil, err + } + + if isPtr { + return clone.Interface(), nil + } + + // The value was not a pointer. Need to dereference it before returning. + return clone.Elem().Interface(), nil +} + +// cloneCache wraps the in memory cache with one that clones items before returning them. +type cloneCache struct { + *cache.Cache +} + +func (c *cloneCache) Get(k string) (interface{}, bool) { + x, found := c.Cache.Get(k) + if !found { + return nil, false + } + + clone, err := clone(x) + if err != nil { + // Unfortunely, we can't return an error here. Return a cache miss instead of panic'ing. + return nil, false + } + return clone, true +} + +func (c *cloneCache) Set(k string, x interface{}, d time.Duration) { + clone, err := clone(x) + if err != nil { + // Unfortunately, we can't return an error here. Skip caching it if clone fails. + return + } + + c.Cache.Set(k, clone, d) +} + type cachedMysql struct { fleet.Datastore - c *cache.Cache + c *cloneCache packsExp time.Duration scheduledQueriesExp time.Duration + teamAgentOptionsExp time.Duration } -const ( - appConfigKey = "AppConfig" - packsKey = "Packs" - scheduledQueriesKey = "ScheduledQueries" - defaultAppConfigExpiration = 1 * time.Second - defaultPacksExpiration = 1 * time.Minute - defaultScheduledQueriesExpiration = 1 * time.Minute -) - type Option func(*cachedMysql) -func WithPacksExpiration(t time.Duration) Option { +func WithPacksExpiration(d time.Duration) Option { return func(o *cachedMysql) { - o.packsExp = t + o.packsExp = d } } -func WithScheduledQueriesExpiration(t time.Duration) Option { +func WithScheduledQueriesExpiration(d time.Duration) Option { return func(o *cachedMysql) { - o.scheduledQueriesExp = t + o.scheduledQueriesExp = d + } +} + +func WithTeamAgentOptionsExpiration(d time.Duration) Option { + return func(o *cachedMysql) { + o.teamAgentOptionsExp = d } } func New(ds fleet.Datastore, opts ...Option) fleet.Datastore { c := &cachedMysql{ Datastore: ds, - c: cache.New(5*time.Minute, 10*time.Minute), + c: &cloneCache{cache.New(5*time.Minute, 10*time.Minute)}, packsExp: defaultPacksExpiration, scheduledQueriesExp: defaultScheduledQueriesExpiration, + teamAgentOptionsExp: defaultTeamAgentOptionsExpiration, } for _, fn := range opts { fn(c) @@ -62,13 +140,15 @@ func (ds *cachedMysql) NewAppConfig(ctx context.Context, info *fleet.AppConfig) ds.c.Set(appConfigKey, ac, defaultAppConfigExpiration) - return ac.Clone() + return ac, nil } func (ds *cachedMysql) AppConfig(ctx context.Context) (*fleet.AppConfig, error) { - cachedAc, found := ds.c.Get(appConfigKey) - if found { - return cachedAc.(*fleet.AppConfig).Clone() + if x, found := ds.c.Get(appConfigKey); found { + ac, ok := x.(*fleet.AppConfig) + if ok { + return ac, nil + } } ac, err := ds.Datastore.AppConfig(ctx) @@ -78,7 +158,7 @@ func (ds *cachedMysql) AppConfig(ctx context.Context) (*fleet.AppConfig, error) ds.c.Set(appConfigKey, ac, defaultAppConfigExpiration) - return ac.Clone() + return ac, nil } func (ds *cachedMysql) SaveAppConfig(ctx context.Context, info *fleet.AppConfig) error { @@ -93,12 +173,11 @@ func (ds *cachedMysql) SaveAppConfig(ctx context.Context, info *fleet.AppConfig) } func (ds *cachedMysql) ListPacksForHost(ctx context.Context, hid uint) ([]*fleet.Pack, error) { - key := fmt.Sprintf("%s_%d", packsKey, hid) - cachedPacks, found := ds.c.Get(key) - if found && cachedPacks != nil { - casted, ok := cachedPacks.([]*fleet.Pack) + key := fmt.Sprintf(packsHostKey, hid) + if x, found := ds.c.Get(key); found { + cachedPacks, ok := x.([]*fleet.Pack) if ok { - return casted, nil + return cachedPacks, nil } } @@ -112,17 +191,16 @@ func (ds *cachedMysql) ListPacksForHost(ctx context.Context, hid uint) ([]*fleet return packs, nil } -func (ds *cachedMysql) ListScheduledQueriesInPack(ctx context.Context, id uint) ([]*fleet.ScheduledQuery, error) { - key := fmt.Sprintf("%s_%d", scheduledQueriesKey, id) - cachedScheduledQueries, found := ds.c.Get(key) - if found && cachedScheduledQueries != nil { - casted, ok := cachedScheduledQueries.([]*fleet.ScheduledQuery) +func (ds *cachedMysql) ListScheduledQueriesInPack(ctx context.Context, packID uint) ([]*fleet.ScheduledQuery, error) { + key := fmt.Sprintf(scheduledQueriesKey, packID) + if x, found := ds.c.Get(key); found { + scheduledQueries, ok := x.([]*fleet.ScheduledQuery) if ok { - return casted, nil + return scheduledQueries, nil } } - scheduledQueries, err := ds.Datastore.ListScheduledQueriesInPack(ctx, id) + scheduledQueries, err := ds.Datastore.ListScheduledQueriesInPack(ctx, packID) if err != nil { return nil, err } @@ -131,3 +209,47 @@ func (ds *cachedMysql) ListScheduledQueriesInPack(ctx context.Context, id uint) return scheduledQueries, nil } + +func (ds *cachedMysql) TeamAgentOptions(ctx context.Context, teamID uint) (*json.RawMessage, error) { + key := fmt.Sprintf(teamAgentOptionsKey, teamID) + if x, found := ds.c.Get(key); found { + if agentOptions, ok := x.(*json.RawMessage); ok { + return agentOptions, nil + } + } + + agentOptions, err := ds.Datastore.TeamAgentOptions(ctx, teamID) + if err != nil { + return nil, err + } + + ds.c.Set(key, agentOptions, ds.teamAgentOptionsExp) + + return agentOptions, nil +} + +func (ds *cachedMysql) SaveTeam(ctx context.Context, team *fleet.Team) (*fleet.Team, error) { + team, err := ds.Datastore.SaveTeam(ctx, team) + if err != nil { + return nil, err + } + + key := fmt.Sprintf(teamAgentOptionsKey, team.ID) + + ds.c.Set(key, team.AgentOptions, ds.teamAgentOptionsExp) + + return team, nil +} + +func (ds *cachedMysql) DeleteTeam(ctx context.Context, teamID uint) error { + err := ds.Datastore.DeleteTeam(ctx, teamID) + if err != nil { + return err + } + + key := fmt.Sprintf(teamAgentOptionsKey, teamID) + + ds.c.Delete(key) + + return nil +} diff --git a/server/datastore/cached_mysql/cached_mysql_test.go b/server/datastore/cached_mysql/cached_mysql_test.go index b9fd19f094..8b232697ff 100644 --- a/server/datastore/cached_mysql/cached_mysql_test.go +++ b/server/datastore/cached_mysql/cached_mysql_test.go @@ -3,6 +3,7 @@ package cached_mysql import ( "context" "encoding/json" + "errors" "testing" "time" @@ -13,6 +14,64 @@ import ( "github.com/stretchr/testify/require" ) +func TestClone(t *testing.T) { + tests := []struct { + name string + src interface{} + want interface{} + }{ + { + name: "string", + src: "foo", + want: "foo", + }, + { + name: "struct", + src: fleet.AppConfig{ + ServerSettings: fleet.ServerSettings{ + EnableAnalytics: true, + }, + }, + want: fleet.AppConfig{ + ServerSettings: fleet.ServerSettings{ + EnableAnalytics: true, + }, + }, + }, + { + name: "pointer to struct", + src: &fleet.AppConfig{ + ServerSettings: fleet.ServerSettings{ + EnableAnalytics: true, + }, + }, + want: &fleet.AppConfig{ + ServerSettings: fleet.ServerSettings{ + EnableAnalytics: true, + }, + }, + }, + { + name: "slice", + src: []string{"foo", "bar"}, + want: []string{"foo", "bar"}, + }, + { + name: "pointer to slice", + src: &[]string{"foo", "bar"}, + want: &[]string{"foo", "bar"}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + clone, err := clone(tc.src) + require.NoError(t, err) + assert.Equal(t, tc.want, clone) + }) + } +} + func TestCachedAppConfig(t *testing.T) { t.Parallel() @@ -92,7 +151,7 @@ func TestCachedPacksforHost(t *testing.T) { t.Parallel() mockedDS := new(mock.Store) - ds := New(mockedDS, WithPacksExpiration(1*time.Second)) + ds := New(mockedDS, WithPacksExpiration(100*time.Millisecond)) dbPacks := []*fleet.Pack{ { @@ -131,7 +190,7 @@ func TestCachedPacksforHost(t *testing.T) { require.Equal(t, packs, packs2) // returns the old cached value require.Equal(t, 1, called) - time.Sleep(2 * time.Second) + time.Sleep(200 * time.Millisecond) packs3, err := ds.ListPacksForHost(context.Background(), 1) require.NoError(t, err) @@ -143,7 +202,7 @@ func TestCachedListScheduledQueriesInPack(t *testing.T) { t.Parallel() mockedDS := new(mock.Store) - ds := New(mockedDS, WithScheduledQueriesExpiration(1*time.Second)) + ds := New(mockedDS, WithScheduledQueriesExpiration(100*time.Millisecond)) dbScheduledQueries := []*fleet.ScheduledQuery{ { @@ -178,10 +237,92 @@ func TestCachedListScheduledQueriesInPack(t *testing.T) { require.Equal(t, scheduledQueries, scheduledQueries2) // returns the new db entry require.Equal(t, 1, called) - time.Sleep(2 * time.Second) + time.Sleep(200 * time.Millisecond) scheduledQueries3, err := ds.ListScheduledQueriesInPack(context.Background(), 1) require.NoError(t, err) require.Equal(t, dbScheduledQueries, scheduledQueries3) // returns the new db entry require.Equal(t, 2, called) } + +func TestCachedTeamAgentOptions(t *testing.T) { + t.Parallel() + + mockedDS := new(mock.Store) + ds := New(mockedDS, WithTeamAgentOptionsExpiration(100*time.Millisecond)) + + testOptions := json.RawMessage(` +{ + "config": { + "options": { + "logger_plugin": "tls", + "pack_delimiter": "/", + "logger_tls_period": 10, + "distributed_plugin": "tls", + "disable_distributed": false, + "logger_tls_endpoint": "/api/v1/osquery/log", + "distributed_interval": 10, + "distributed_tls_max_attempts": 3 + }, + "decorators": { + "load": [ + "SELECT uuid AS host_uuid FROM system_info;", + "SELECT hostname AS hostname FROM system_info;" + ] + } + }, + "overrides": {} +} +`) + + testTeam := &fleet.Team{ + ID: 1, + CreatedAt: time.Now(), + Name: "test", + AgentOptions: &testOptions, + } + + deleted := false + mockedDS.TeamAgentOptionsFunc = func(ctx context.Context, teamID uint) (*json.RawMessage, error) { + if deleted { + return nil, errors.New("not found") + } + return &testOptions, nil + } + mockedDS.SaveTeamFunc = func(ctx context.Context, team *fleet.Team) (*fleet.Team, error) { + return team, nil + } + mockedDS.DeleteTeamFunc = func(ctx context.Context, teamID uint) error { + deleted = true + return nil + } + + options, err := ds.TeamAgentOptions(context.Background(), 1) + require.NoError(t, err) + require.JSONEq(t, string(testOptions), string(*options)) + + // saving a team updates agent options in cache + updateOptions := json.RawMessage(` +{} +`) + updateTeam := &fleet.Team{ + ID: testTeam.ID, + CreatedAt: testTeam.CreatedAt, + Name: testTeam.Name, + AgentOptions: &updateOptions, + } + + _, err = ds.SaveTeam(context.Background(), updateTeam) + require.NoError(t, err) + + options, err = ds.TeamAgentOptions(context.Background(), testTeam.ID) + require.NoError(t, err) + require.JSONEq(t, string(updateOptions), string(*options)) + + // deleting a team removes the agent options from the cache + err = ds.DeleteTeam(context.Background(), testTeam.ID) + require.NoError(t, err) + + _, err = ds.TeamAgentOptions(context.Background(), testTeam.ID) + require.Error(t, err) +} diff --git a/server/fleet/app.go b/server/fleet/app.go index 527c7213ce..89c7410a87 100644 --- a/server/fleet/app.go +++ b/server/fleet/app.go @@ -6,7 +6,6 @@ import ( "time" "github.com/fleetdm/fleet/v4/server/config" - "github.com/jinzhu/copier" ) // SMTP settings names returned from API, these map to SMTPAuthType and @@ -429,9 +428,3 @@ type KafkaRESTConfig struct { ResultTopic string `json:"result_topic"` ProxyHost string `json:"proxyhost"` } - -func (c *AppConfig) Clone() (*AppConfig, error) { - newAc := AppConfig{} - err := copier.Copy(&newAc, c) - return &newAc, err -}