mirror of
https://github.com/fleetdm/fleet
synced 2026-05-05 22:39:17 +00:00
Cache team agent options (#4193)
* use raw literal for json * wrap cache to clone all values
This commit is contained in:
parent
9009857022
commit
0be26613b3
4 changed files with 302 additions and 46 deletions
|
|
@ -167,11 +167,11 @@ spec:
|
|||
- secret: AAA
|
||||
`)
|
||||
|
||||
newAgentOpts := json.RawMessage("{\"config\":{\"something\":\"else\"}}")
|
||||
newAgentOpts := json.RawMessage(`{"config":{"something":"else"}}`)
|
||||
|
||||
require.Equal(t, "[+] applied 2 teams\n", runAppForTest(t, []string{"apply", "-f", tmpFile.Name()}))
|
||||
assert.Equal(t, &agentOpts, teamsByName["team2"].AgentOptions)
|
||||
assert.Equal(t, &newAgentOpts, teamsByName["team1"].AgentOptions)
|
||||
assert.JSONEq(t, string(agentOpts), string(*teamsByName["team2"].AgentOptions))
|
||||
assert.JSONEq(t, string(newAgentOpts), string(*teamsByName["team1"].AgentOptions))
|
||||
assert.Equal(t, []*fleet.EnrollSecret{{Secret: "AAA"}}, enrolledSecretsCalled[uint(42)])
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2,51 +2,129 @@ package cached_mysql
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||
"github.com/jinzhu/copier"
|
||||
"github.com/patrickmn/go-cache"
|
||||
)
|
||||
|
||||
const (
|
||||
appConfigKey = "AppConfig:%s"
|
||||
defaultAppConfigExpiration = 1 * time.Second
|
||||
packsHostKey = "Packs:host:%d"
|
||||
defaultPacksExpiration = 1 * time.Minute
|
||||
scheduledQueriesKey = "ScheduledQueries:pack:%d"
|
||||
defaultScheduledQueriesExpiration = 1 * time.Minute
|
||||
teamAgentOptionsKey = "TeamAgentOptions:team:%d"
|
||||
defaultTeamAgentOptionsExpiration = 1 * time.Minute
|
||||
)
|
||||
|
||||
// cloner represents any type that can clone itself. Used by types to provide a more efficient clone method.
|
||||
type cloner interface {
|
||||
Clone() (interface{}, error)
|
||||
}
|
||||
|
||||
func clone(v interface{}) (interface{}, error) {
|
||||
if cloner, ok := v.(cloner); ok {
|
||||
return cloner.Clone()
|
||||
}
|
||||
|
||||
// Use reflection to initialize a clone of v of the same type.
|
||||
vv := reflect.ValueOf(v)
|
||||
|
||||
// If the value is a pointer, then calling reflect.New on it will result in a double pointer.
|
||||
// Instead, dereference the pointer first.
|
||||
isPtr := false
|
||||
if vv.Kind() == reflect.Ptr {
|
||||
isPtr = true
|
||||
vv = vv.Elem()
|
||||
}
|
||||
|
||||
clone := reflect.New(vv.Type())
|
||||
|
||||
err := copier.Copy(clone.Interface(), v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if isPtr {
|
||||
return clone.Interface(), nil
|
||||
}
|
||||
|
||||
// The value was not a pointer. Need to dereference it before returning.
|
||||
return clone.Elem().Interface(), nil
|
||||
}
|
||||
|
||||
// cloneCache wraps the in memory cache with one that clones items before returning them.
|
||||
type cloneCache struct {
|
||||
*cache.Cache
|
||||
}
|
||||
|
||||
func (c *cloneCache) Get(k string) (interface{}, bool) {
|
||||
x, found := c.Cache.Get(k)
|
||||
if !found {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
clone, err := clone(x)
|
||||
if err != nil {
|
||||
// Unfortunely, we can't return an error here. Return a cache miss instead of panic'ing.
|
||||
return nil, false
|
||||
}
|
||||
return clone, true
|
||||
}
|
||||
|
||||
func (c *cloneCache) Set(k string, x interface{}, d time.Duration) {
|
||||
clone, err := clone(x)
|
||||
if err != nil {
|
||||
// Unfortunately, we can't return an error here. Skip caching it if clone fails.
|
||||
return
|
||||
}
|
||||
|
||||
c.Cache.Set(k, clone, d)
|
||||
}
|
||||
|
||||
type cachedMysql struct {
|
||||
fleet.Datastore
|
||||
|
||||
c *cache.Cache
|
||||
c *cloneCache
|
||||
|
||||
packsExp time.Duration
|
||||
scheduledQueriesExp time.Duration
|
||||
teamAgentOptionsExp time.Duration
|
||||
}
|
||||
|
||||
const (
|
||||
appConfigKey = "AppConfig"
|
||||
packsKey = "Packs"
|
||||
scheduledQueriesKey = "ScheduledQueries"
|
||||
defaultAppConfigExpiration = 1 * time.Second
|
||||
defaultPacksExpiration = 1 * time.Minute
|
||||
defaultScheduledQueriesExpiration = 1 * time.Minute
|
||||
)
|
||||
|
||||
type Option func(*cachedMysql)
|
||||
|
||||
func WithPacksExpiration(t time.Duration) Option {
|
||||
func WithPacksExpiration(d time.Duration) Option {
|
||||
return func(o *cachedMysql) {
|
||||
o.packsExp = t
|
||||
o.packsExp = d
|
||||
}
|
||||
}
|
||||
|
||||
func WithScheduledQueriesExpiration(t time.Duration) Option {
|
||||
func WithScheduledQueriesExpiration(d time.Duration) Option {
|
||||
return func(o *cachedMysql) {
|
||||
o.scheduledQueriesExp = t
|
||||
o.scheduledQueriesExp = d
|
||||
}
|
||||
}
|
||||
|
||||
func WithTeamAgentOptionsExpiration(d time.Duration) Option {
|
||||
return func(o *cachedMysql) {
|
||||
o.teamAgentOptionsExp = d
|
||||
}
|
||||
}
|
||||
|
||||
func New(ds fleet.Datastore, opts ...Option) fleet.Datastore {
|
||||
c := &cachedMysql{
|
||||
Datastore: ds,
|
||||
c: cache.New(5*time.Minute, 10*time.Minute),
|
||||
c: &cloneCache{cache.New(5*time.Minute, 10*time.Minute)},
|
||||
packsExp: defaultPacksExpiration,
|
||||
scheduledQueriesExp: defaultScheduledQueriesExpiration,
|
||||
teamAgentOptionsExp: defaultTeamAgentOptionsExpiration,
|
||||
}
|
||||
for _, fn := range opts {
|
||||
fn(c)
|
||||
|
|
@ -62,13 +140,15 @@ func (ds *cachedMysql) NewAppConfig(ctx context.Context, info *fleet.AppConfig)
|
|||
|
||||
ds.c.Set(appConfigKey, ac, defaultAppConfigExpiration)
|
||||
|
||||
return ac.Clone()
|
||||
return ac, nil
|
||||
}
|
||||
|
||||
func (ds *cachedMysql) AppConfig(ctx context.Context) (*fleet.AppConfig, error) {
|
||||
cachedAc, found := ds.c.Get(appConfigKey)
|
||||
if found {
|
||||
return cachedAc.(*fleet.AppConfig).Clone()
|
||||
if x, found := ds.c.Get(appConfigKey); found {
|
||||
ac, ok := x.(*fleet.AppConfig)
|
||||
if ok {
|
||||
return ac, nil
|
||||
}
|
||||
}
|
||||
|
||||
ac, err := ds.Datastore.AppConfig(ctx)
|
||||
|
|
@ -78,7 +158,7 @@ func (ds *cachedMysql) AppConfig(ctx context.Context) (*fleet.AppConfig, error)
|
|||
|
||||
ds.c.Set(appConfigKey, ac, defaultAppConfigExpiration)
|
||||
|
||||
return ac.Clone()
|
||||
return ac, nil
|
||||
}
|
||||
|
||||
func (ds *cachedMysql) SaveAppConfig(ctx context.Context, info *fleet.AppConfig) error {
|
||||
|
|
@ -93,12 +173,11 @@ func (ds *cachedMysql) SaveAppConfig(ctx context.Context, info *fleet.AppConfig)
|
|||
}
|
||||
|
||||
func (ds *cachedMysql) ListPacksForHost(ctx context.Context, hid uint) ([]*fleet.Pack, error) {
|
||||
key := fmt.Sprintf("%s_%d", packsKey, hid)
|
||||
cachedPacks, found := ds.c.Get(key)
|
||||
if found && cachedPacks != nil {
|
||||
casted, ok := cachedPacks.([]*fleet.Pack)
|
||||
key := fmt.Sprintf(packsHostKey, hid)
|
||||
if x, found := ds.c.Get(key); found {
|
||||
cachedPacks, ok := x.([]*fleet.Pack)
|
||||
if ok {
|
||||
return casted, nil
|
||||
return cachedPacks, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -112,17 +191,16 @@ func (ds *cachedMysql) ListPacksForHost(ctx context.Context, hid uint) ([]*fleet
|
|||
return packs, nil
|
||||
}
|
||||
|
||||
func (ds *cachedMysql) ListScheduledQueriesInPack(ctx context.Context, id uint) ([]*fleet.ScheduledQuery, error) {
|
||||
key := fmt.Sprintf("%s_%d", scheduledQueriesKey, id)
|
||||
cachedScheduledQueries, found := ds.c.Get(key)
|
||||
if found && cachedScheduledQueries != nil {
|
||||
casted, ok := cachedScheduledQueries.([]*fleet.ScheduledQuery)
|
||||
func (ds *cachedMysql) ListScheduledQueriesInPack(ctx context.Context, packID uint) ([]*fleet.ScheduledQuery, error) {
|
||||
key := fmt.Sprintf(scheduledQueriesKey, packID)
|
||||
if x, found := ds.c.Get(key); found {
|
||||
scheduledQueries, ok := x.([]*fleet.ScheduledQuery)
|
||||
if ok {
|
||||
return casted, nil
|
||||
return scheduledQueries, nil
|
||||
}
|
||||
}
|
||||
|
||||
scheduledQueries, err := ds.Datastore.ListScheduledQueriesInPack(ctx, id)
|
||||
scheduledQueries, err := ds.Datastore.ListScheduledQueriesInPack(ctx, packID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -131,3 +209,47 @@ func (ds *cachedMysql) ListScheduledQueriesInPack(ctx context.Context, id uint)
|
|||
|
||||
return scheduledQueries, nil
|
||||
}
|
||||
|
||||
func (ds *cachedMysql) TeamAgentOptions(ctx context.Context, teamID uint) (*json.RawMessage, error) {
|
||||
key := fmt.Sprintf(teamAgentOptionsKey, teamID)
|
||||
if x, found := ds.c.Get(key); found {
|
||||
if agentOptions, ok := x.(*json.RawMessage); ok {
|
||||
return agentOptions, nil
|
||||
}
|
||||
}
|
||||
|
||||
agentOptions, err := ds.Datastore.TeamAgentOptions(ctx, teamID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ds.c.Set(key, agentOptions, ds.teamAgentOptionsExp)
|
||||
|
||||
return agentOptions, nil
|
||||
}
|
||||
|
||||
func (ds *cachedMysql) SaveTeam(ctx context.Context, team *fleet.Team) (*fleet.Team, error) {
|
||||
team, err := ds.Datastore.SaveTeam(ctx, team)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
key := fmt.Sprintf(teamAgentOptionsKey, team.ID)
|
||||
|
||||
ds.c.Set(key, team.AgentOptions, ds.teamAgentOptionsExp)
|
||||
|
||||
return team, nil
|
||||
}
|
||||
|
||||
func (ds *cachedMysql) DeleteTeam(ctx context.Context, teamID uint) error {
|
||||
err := ds.Datastore.DeleteTeam(ctx, teamID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
key := fmt.Sprintf(teamAgentOptionsKey, teamID)
|
||||
|
||||
ds.c.Delete(key)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package cached_mysql
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
|
@ -13,6 +14,64 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestClone(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
src interface{}
|
||||
want interface{}
|
||||
}{
|
||||
{
|
||||
name: "string",
|
||||
src: "foo",
|
||||
want: "foo",
|
||||
},
|
||||
{
|
||||
name: "struct",
|
||||
src: fleet.AppConfig{
|
||||
ServerSettings: fleet.ServerSettings{
|
||||
EnableAnalytics: true,
|
||||
},
|
||||
},
|
||||
want: fleet.AppConfig{
|
||||
ServerSettings: fleet.ServerSettings{
|
||||
EnableAnalytics: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "pointer to struct",
|
||||
src: &fleet.AppConfig{
|
||||
ServerSettings: fleet.ServerSettings{
|
||||
EnableAnalytics: true,
|
||||
},
|
||||
},
|
||||
want: &fleet.AppConfig{
|
||||
ServerSettings: fleet.ServerSettings{
|
||||
EnableAnalytics: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "slice",
|
||||
src: []string{"foo", "bar"},
|
||||
want: []string{"foo", "bar"},
|
||||
},
|
||||
{
|
||||
name: "pointer to slice",
|
||||
src: &[]string{"foo", "bar"},
|
||||
want: &[]string{"foo", "bar"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
clone, err := clone(tc.src)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tc.want, clone)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCachedAppConfig(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
|
@ -92,7 +151,7 @@ func TestCachedPacksforHost(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
mockedDS := new(mock.Store)
|
||||
ds := New(mockedDS, WithPacksExpiration(1*time.Second))
|
||||
ds := New(mockedDS, WithPacksExpiration(100*time.Millisecond))
|
||||
|
||||
dbPacks := []*fleet.Pack{
|
||||
{
|
||||
|
|
@ -131,7 +190,7 @@ func TestCachedPacksforHost(t *testing.T) {
|
|||
require.Equal(t, packs, packs2) // returns the old cached value
|
||||
require.Equal(t, 1, called)
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
packs3, err := ds.ListPacksForHost(context.Background(), 1)
|
||||
require.NoError(t, err)
|
||||
|
|
@ -143,7 +202,7 @@ func TestCachedListScheduledQueriesInPack(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
mockedDS := new(mock.Store)
|
||||
ds := New(mockedDS, WithScheduledQueriesExpiration(1*time.Second))
|
||||
ds := New(mockedDS, WithScheduledQueriesExpiration(100*time.Millisecond))
|
||||
|
||||
dbScheduledQueries := []*fleet.ScheduledQuery{
|
||||
{
|
||||
|
|
@ -178,10 +237,92 @@ func TestCachedListScheduledQueriesInPack(t *testing.T) {
|
|||
require.Equal(t, scheduledQueries, scheduledQueries2) // returns the new db entry
|
||||
require.Equal(t, 1, called)
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
scheduledQueries3, err := ds.ListScheduledQueriesInPack(context.Background(), 1)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, dbScheduledQueries, scheduledQueries3) // returns the new db entry
|
||||
require.Equal(t, 2, called)
|
||||
}
|
||||
|
||||
func TestCachedTeamAgentOptions(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
mockedDS := new(mock.Store)
|
||||
ds := New(mockedDS, WithTeamAgentOptionsExpiration(100*time.Millisecond))
|
||||
|
||||
testOptions := json.RawMessage(`
|
||||
{
|
||||
"config": {
|
||||
"options": {
|
||||
"logger_plugin": "tls",
|
||||
"pack_delimiter": "/",
|
||||
"logger_tls_period": 10,
|
||||
"distributed_plugin": "tls",
|
||||
"disable_distributed": false,
|
||||
"logger_tls_endpoint": "/api/v1/osquery/log",
|
||||
"distributed_interval": 10,
|
||||
"distributed_tls_max_attempts": 3
|
||||
},
|
||||
"decorators": {
|
||||
"load": [
|
||||
"SELECT uuid AS host_uuid FROM system_info;",
|
||||
"SELECT hostname AS hostname FROM system_info;"
|
||||
]
|
||||
}
|
||||
},
|
||||
"overrides": {}
|
||||
}
|
||||
`)
|
||||
|
||||
testTeam := &fleet.Team{
|
||||
ID: 1,
|
||||
CreatedAt: time.Now(),
|
||||
Name: "test",
|
||||
AgentOptions: &testOptions,
|
||||
}
|
||||
|
||||
deleted := false
|
||||
mockedDS.TeamAgentOptionsFunc = func(ctx context.Context, teamID uint) (*json.RawMessage, error) {
|
||||
if deleted {
|
||||
return nil, errors.New("not found")
|
||||
}
|
||||
return &testOptions, nil
|
||||
}
|
||||
mockedDS.SaveTeamFunc = func(ctx context.Context, team *fleet.Team) (*fleet.Team, error) {
|
||||
return team, nil
|
||||
}
|
||||
mockedDS.DeleteTeamFunc = func(ctx context.Context, teamID uint) error {
|
||||
deleted = true
|
||||
return nil
|
||||
}
|
||||
|
||||
options, err := ds.TeamAgentOptions(context.Background(), 1)
|
||||
require.NoError(t, err)
|
||||
require.JSONEq(t, string(testOptions), string(*options))
|
||||
|
||||
// saving a team updates agent options in cache
|
||||
updateOptions := json.RawMessage(`
|
||||
{}
|
||||
`)
|
||||
updateTeam := &fleet.Team{
|
||||
ID: testTeam.ID,
|
||||
CreatedAt: testTeam.CreatedAt,
|
||||
Name: testTeam.Name,
|
||||
AgentOptions: &updateOptions,
|
||||
}
|
||||
|
||||
_, err = ds.SaveTeam(context.Background(), updateTeam)
|
||||
require.NoError(t, err)
|
||||
|
||||
options, err = ds.TeamAgentOptions(context.Background(), testTeam.ID)
|
||||
require.NoError(t, err)
|
||||
require.JSONEq(t, string(updateOptions), string(*options))
|
||||
|
||||
// deleting a team removes the agent options from the cache
|
||||
err = ds.DeleteTeam(context.Background(), testTeam.ID)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = ds.TeamAgentOptions(context.Background(), testTeam.ID)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/fleetdm/fleet/v4/server/config"
|
||||
"github.com/jinzhu/copier"
|
||||
)
|
||||
|
||||
// SMTP settings names returned from API, these map to SMTPAuthType and
|
||||
|
|
@ -429,9 +428,3 @@ type KafkaRESTConfig struct {
|
|||
ResultTopic string `json:"result_topic"`
|
||||
ProxyHost string `json:"proxyhost"`
|
||||
}
|
||||
|
||||
func (c *AppConfig) Clone() (*AppConfig, error) {
|
||||
newAc := AppConfig{}
|
||||
err := copier.Copy(&newAc, c)
|
||||
return &newAc, err
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue