feat: implement application cache synchronization in appset controller (#26578)

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>
This commit is contained in:
Alexander Matyushentsev 2026-02-25 01:08:53 -08:00 committed by GitHub
parent 5e6449fbba
commit c0a2a579c5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 255 additions and 6 deletions

View file

@ -0,0 +1,137 @@
package utils
import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"unsafe"
log "github.com/sirupsen/logrus"
k8scache "k8s.io/client-go/tools/cache"
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
application "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
)
// NewCacheSyncingClient returns a client that wraps the given client and syncs the cache after each Create, Update, Patch, or Delete operation on Application objects.
func NewCacheSyncingClient(c client.Client, cache ctrlcache.Cache) client.Client {
res := &cacheSyncingClient{Client: c, storesByNs: make(map[string]k8scache.Store)}
// The k8s controller runtime's cache does not expose a way to get the underlying store, so we have to use reflection to access it.
// This is necessary to keep the cache in sync with the client operations.
field := reflect.ValueOf(cache).Elem().FieldByName("namespaceToCache")
if field.Kind() == reflect.Map {
namespaceToCache := *(*map[string]ctrlcache.Cache)(unsafe.Pointer(field.UnsafeAddr()))
res.getNSCache = func(_ context.Context, obj client.Object) (ctrlcache.Cache, error) {
res, ok := namespaceToCache[obj.GetNamespace()]
if !ok {
return nil, fmt.Errorf("cache for namespace %s not found", obj.GetNamespace())
}
return res, nil
}
} else {
res.getNSCache = func(_ context.Context, _ client.Object) (ctrlcache.Cache, error) {
return cache, nil
}
}
return res
}
type cacheSyncingClient struct {
client.Client
getNSCache func(ctx context.Context, obj client.Object) (ctrlcache.Cache, error)
storesByNs map[string]k8scache.Store
storesByNsLock sync.RWMutex
}
func (c *cacheSyncingClient) getStore(ctx context.Context, obj client.Object) (k8scache.Store, error) {
c.storesByNsLock.RLock()
store, ok := c.storesByNs[obj.GetNamespace()]
c.storesByNsLock.RUnlock()
if ok {
return store, nil
}
store, err := c.retrieveStore(ctx, obj)
if err != nil {
return nil, err
}
c.storesByNsLock.Lock()
c.storesByNs[obj.GetNamespace()] = store
c.storesByNsLock.Unlock()
return store, nil
}
func (c *cacheSyncingClient) retrieveStore(ctx context.Context, obj client.Object) (k8scache.Store, error) {
nsCache, err := c.getNSCache(ctx, obj)
if err != nil {
return nil, fmt.Errorf("failed to get namespace cache: %w", err)
}
informer, err := nsCache.GetInformerForKind(ctx, application.ApplicationSchemaGroupVersionKind)
if err != nil {
return nil, fmt.Errorf("failed to get informer: %w", err)
}
indexInformer, ok := informer.(k8scache.SharedIndexInformer)
if !ok {
return nil, errors.New("informer is not a SharedIndexInformer")
}
return indexInformer.GetStore(), nil
}
func (c *cacheSyncingClient) execAndSyncCache(ctx context.Context, op func() error, obj client.Object, deleteObj bool) error {
// execute the operation first and only sync cache if it succeeds
if err := op(); err != nil {
return err
}
// sync cache for applications only
if _, ok := obj.(*application.Application); !ok {
return nil
}
logger := log.WithField("namespace", obj.GetNamespace()).WithField("name", obj.GetName())
store, err := c.getStore(ctx, obj)
if err != nil {
logger.Errorf("failed to get cache store: %v", err)
} else {
if deleteObj {
err = store.Delete(obj)
} else {
err = store.Update(obj)
}
}
if err != nil {
logger.Errorf("failed to sync cache for object: %v", err)
}
return nil
}
func (c *cacheSyncingClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
return c.execAndSyncCache(ctx, func() error {
return c.Client.Create(ctx, obj, opts...)
}, obj, false)
}
func (c *cacheSyncingClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
return c.execAndSyncCache(ctx, func() error {
return c.Client.Update(ctx, obj, opts...)
}, obj, false)
}
func (c *cacheSyncingClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
return c.execAndSyncCache(ctx, func() error {
return c.Client.Patch(ctx, obj, patch, opts...)
}, obj, false)
}
func (c *cacheSyncingClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
return c.execAndSyncCache(ctx, func() error {
return c.Client.Delete(ctx, obj, opts...)
}, obj, true)
}

View file

@ -0,0 +1,111 @@
package utils
import (
"context"
"testing"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
k8scache "k8s.io/client-go/tools/cache"
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
application "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
)
type fakeMultiNamespaceCache struct {
//nolint:unused
namespaceToCache map[string]ctrlcache.Cache
ctrlcache.Cache
k8scache.SharedIndexInformer
Store k8scache.Store
}
func (f *fakeMultiNamespaceCache) GetInformerForKind(_ context.Context, _ schema.GroupVersionKind, _ ...ctrlcache.InformerGetOption) (ctrlcache.Informer, error) {
return f, nil
}
func (f *fakeMultiNamespaceCache) GetStore() k8scache.Store {
return f.Store
}
func newClient(objs ...client.Object) (*cacheSyncingClient, k8scache.Store, error) {
scheme := runtime.NewScheme()
if err := application.AddToScheme(scheme); err != nil {
return nil, nil, err
}
store := k8scache.NewStore(func(obj any) (string, error) {
return obj.(client.Object).GetName(), nil
})
for _, obj := range objs {
if err := store.Add(obj); err != nil {
return nil, nil, err
}
}
c := &cacheSyncingClient{
Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build(),
storesByNs: map[string]k8scache.Store{},
getNSCache: func(_ context.Context, _ client.Object) (ctrlcache.Cache, error) {
return &fakeMultiNamespaceCache{Store: store}, nil
},
}
return c, store, nil
}
func TestCreateSyncsCache(t *testing.T) {
c, store, err := newClient()
require.NoError(t, err)
app := &application.Application{
ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "argocd"},
}
require.NoError(t, c.Create(context.Background(), app))
require.Contains(t, store.List(), app)
}
func TestUpdateSyncsCache(t *testing.T) {
app := &application.Application{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "argocd",
Labels: map[string]string{"foo": "bar"},
},
}
c, store, err := newClient(app)
require.NoError(t, err)
updatedApp := app.DeepCopy()
updatedApp.Labels["foo"] = "bar-UPDATED"
require.NoError(t, c.Update(context.Background(), updatedApp))
updated, _, err := store.GetByKey("test")
require.NoError(t, err)
require.Equal(t, "bar-UPDATED", updated.(*application.Application).Labels["foo"])
}
func TestDeleteSyncsCache(t *testing.T) {
app := &application.Application{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "argocd",
Labels: map[string]string{"foo": "bar"},
},
}
c, store, err := newClient(app)
require.NoError(t, err)
require.NoError(t, c.Delete(context.Background(), app))
require.Empty(t, store.List())
}
func TestNewClientDoesNotCrashWithMultiNamespaceCache(_ *testing.T) {
_ = NewCacheSyncingClient(nil, &fakeMultiNamespaceCache{})
}

View file

@ -78,6 +78,7 @@ func NewCommand() *cobra.Command {
webhookParallelism int
tokenRefStrictMode bool
maxResourcesStatusCount int
cacheSyncPeriod time.Duration
)
scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
@ -139,13 +140,11 @@ func NewCommand() *cobra.Command {
os.Exit(1)
}
var cacheOpt ctrlcache.Options
cacheOpt := ctrlcache.Options{SyncPeriod: &cacheSyncPeriod}
if watchedNamespace != "" {
cacheOpt = ctrlcache.Options{
DefaultNamespaces: map[string]ctrlcache.Config{
watchedNamespace: {},
},
cacheOpt.DefaultNamespaces = map[string]ctrlcache.Config{
watchedNamespace: {},
}
}
@ -241,7 +240,7 @@ func NewCommand() *cobra.Command {
if err = (&controllers.ApplicationSetReconciler{
Generators: topLevelGenerators,
Client: mgr.GetClient(),
Client: utils.NewCacheSyncingClient(mgr.GetClient(), mgr.GetCache()),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("applicationset-controller"),
Renderer: &utils.Render{},
@ -303,6 +302,7 @@ func NewCommand() *cobra.Command {
command.Flags().StringSliceVar(&metricsAplicationsetLabels, "metrics-applicationset-labels", []string{}, "List of Application labels that will be added to the argocd_applicationset_labels metric")
command.Flags().BoolVar(&enableGitHubAPIMetrics, "enable-github-api-metrics", env.ParseBoolFromEnv("ARGOCD_APPLICATIONSET_CONTROLLER_ENABLE_GITHUB_API_METRICS", false), "Enable GitHub API metrics for generators that use the GitHub API")
command.Flags().IntVar(&maxResourcesStatusCount, "max-resources-status-count", env.ParseNumFromEnv("ARGOCD_APPLICATIONSET_CONTROLLER_MAX_RESOURCES_STATUS_COUNT", 5000, 0, math.MaxInt), "Max number of resources stored in appset status.")
command.Flags().DurationVar(&cacheSyncPeriod, "cache-sync-period", env.ParseDurationFromEnv("ARGOCD_APPLICATIONSET_CONTROLLER_CACHE_SYNC_PERIOD", time.Hour*10, 0, time.Hour*24), "Period at which the manager client cache is forcefully resynced with the Kubernetes API server. 0 disables periodic resync.")
return &command
}

View file

@ -17,6 +17,7 @@ argocd-applicationset-controller [flags]
--as string Username to impersonate for the operation
--as-group stringArray Group to impersonate for the operation, this flag can be repeated to specify multiple groups.
--as-uid string UID to impersonate for the operation
--cache-sync-period duration Period at which the manager client cache is forcefully resynced with the Kubernetes API server. 0 disables periodic resync. (default 10h0m0s)
--certificate-authority string Path to a cert file for the certificate authority
--client-certificate string Path to a client certificate file for TLS
--client-key string Path to a client key file for TLS