chore: migrate to cluster informer (#27206)

Signed-off-by: Blake Pettersson <blake.pettersson@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Blake Pettersson 2026-04-08 15:26:21 +02:00 committed by GitHub
parent 212f51d851
commit f1388674cc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 283 additions and 98 deletions

View file

@ -63,8 +63,6 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
@ -465,38 +463,28 @@ func (l *Listeners) Close() error {
// logInClusterWarnings checks the in-cluster configuration and prints out any warnings.
func (server *ArgoCDServer) logInClusterWarnings() error {
labelSelector := labels.NewSelector()
req, err := labels.NewRequirement(common.LabelKeySecretType, selection.Equals, []string{common.LabelValueSecretTypeCluster})
informer, err := server.settingsMgr.GetClusterInformer()
if err != nil {
return fmt.Errorf("failed to construct cluster-type label selector: %w", err)
return fmt.Errorf("failed to get cluster informer: %w", err)
}
labelSelector = labelSelector.Add(*req)
secretsLister, err := server.settingsMgr.GetSecretsLister()
clusters, err := informer.ListAvailableClusters()
if err != nil {
return fmt.Errorf("failed to get secrets lister: %w", err)
return fmt.Errorf("failed to list clusters: %w", err)
}
clusterSecrets, err := secretsLister.Secrets(server.ArgoCDServerOpts.Namespace).List(labelSelector)
if err != nil {
return fmt.Errorf("failed to list cluster secrets: %w", err)
}
var inClusterSecrets []string
for _, clusterSecret := range clusterSecrets {
cluster, err := db.SecretToCluster(clusterSecret)
if err != nil {
return fmt.Errorf("could not unmarshal cluster secret %q: %w", clusterSecret.Name, err)
}
var inClusterNames []string
for _, cluster := range clusters {
if cluster.Server == v1alpha1.KubernetesInternalAPIServerAddr {
inClusterSecrets = append(inClusterSecrets, clusterSecret.Name)
inClusterNames = append(inClusterNames, cluster.ObjectMeta.Name)
}
}
if len(inClusterSecrets) > 0 {
if len(inClusterNames) > 0 {
// Don't make this call unless we actually have in-cluster secrets, to save time.
inClusterEnabled, err := server.settingsMgr.IsInClusterEnabled()
if err != nil {
return fmt.Errorf("could not check if in-cluster is enabled: %w", err)
}
if !inClusterEnabled {
for _, clusterName := range inClusterSecrets {
for _, clusterName := range inClusterNames {
log.Warnf("cluster %q uses in-cluster server address but it's disabled in Argo CD settings", clusterName)
}
}

View file

@ -22,7 +22,6 @@ import (
"github.com/argoproj/argo-cd/v3/common"
appv1 "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/v3/util/settings"
)
const (
@ -66,7 +65,11 @@ func (db *db) getLocalCluster() *appv1.Cluster {
// ListClusters returns list of clusters
func (db *db) ListClusters(_ context.Context) (*appv1.ClusterList, error) {
clusterSecrets, err := db.listSecretsByType(common.LabelValueSecretTypeCluster)
informer, err := db.settingsMgr.GetClusterInformer()
if err != nil {
return nil, fmt.Errorf("failed to get cluster informer: %w", err)
}
clusters, err := informer.ListAvailableClusters()
if err != nil {
return nil, err
}
@ -78,12 +81,7 @@ func (db *db) ListClusters(_ context.Context) (*appv1.ClusterList, error) {
log.Warnf(errCheckingInClusterEnabled, "ListClusters", err)
}
hasInClusterCredentials := false
for _, clusterSecret := range clusterSecrets {
cluster, err := SecretToCluster(clusterSecret)
if err != nil {
log.Errorf("could not unmarshal cluster secret %s", clusterSecret.Name)
continue
}
for _, cluster := range clusters {
if cluster.Server == appv1.KubernetesInternalAPIServerAddr {
if inClusterEnabled {
hasInClusterCredentials = true
@ -218,18 +216,27 @@ func (db *db) WatchClusters(ctx context.Context,
return err
}
func (db *db) getClusterSecret(server string) (*corev1.Secret, error) {
clusterSecrets, err := db.listSecretsByType(common.LabelValueSecretTypeCluster)
func (db *db) getClusterSecret(ctx context.Context, server string) (*corev1.Secret, error) {
informer, err := db.settingsMgr.GetClusterInformer()
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get cluster informer: %w", err)
}
srv := strings.TrimRight(server, "/")
for _, clusterSecret := range clusterSecrets {
if strings.TrimRight(string(clusterSecret.Data["server"]), "/") == srv {
return clusterSecret, nil
cluster, err := informer.GetClusterByURL(server)
if err != nil {
if apierrors.IsNotFound(err) {
return nil, status.Errorf(codes.NotFound, "cluster %q not found", server)
}
return nil, status.Errorf(codes.Internal, "failed to get cluster %q from informer: %v", server, err)
}
return nil, status.Errorf(codes.NotFound, "cluster %q not found", server)
secretName := cluster.ObjectMeta.Name
secret, err := db.kubeclientset.CoreV1().Secrets(db.ns).Get(ctx, secretName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return nil, status.Errorf(codes.NotFound, "cluster %q not found", server)
}
return nil, status.Errorf(codes.Internal, "failed to get cluster secret %s: %v", secretName, err)
}
return secret, nil
}
// GetCluster returns a cluster from a query
@ -253,6 +260,9 @@ func (db *db) GetCluster(_ context.Context, server string) (*appv1.Cluster, erro
if err == nil {
return cluster, nil
}
if !apierrors.IsNotFound(err) {
return nil, status.Errorf(codes.Internal, "failed to get cluster %q: %v", server, err)
}
// Fall back to the hardcoded local cluster if no secret is configured
return db.getLocalCluster(), nil
@ -260,7 +270,10 @@ func (db *db) GetCluster(_ context.Context, server string) (*appv1.Cluster, erro
cluster, err := informer.GetClusterByURL(server)
if err != nil {
return nil, status.Errorf(codes.NotFound, "cluster %q not found", server)
if apierrors.IsNotFound(err) {
return nil, status.Errorf(codes.NotFound, "cluster %q not found", server)
}
return nil, status.Errorf(codes.Internal, "failed to get cluster %q: %v", server, err)
}
return cluster, nil
@ -268,23 +281,15 @@ func (db *db) GetCluster(_ context.Context, server string) (*appv1.Cluster, erro
// GetProjectClusters return project scoped clusters by given project name
func (db *db) GetProjectClusters(_ context.Context, project string) ([]*appv1.Cluster, error) {
informer, err := db.settingsMgr.GetSecretsInformer()
informer, err := db.settingsMgr.GetClusterInformer()
if err != nil {
return nil, fmt.Errorf("failed to get secrets informer: %w", err)
return nil, fmt.Errorf("failed to get cluster informer: %w", err)
}
secrets, err := informer.GetIndexer().ByIndex(settings.ByProjectClusterIndexer, project)
clusters, err := informer.GetAvailableProjectClusters(project)
if err != nil {
return nil, fmt.Errorf("failed to get index by project cluster indexer for project %q: %w", project, err)
return nil, fmt.Errorf("failed to get index by project clusters for project %q: %w", project, err)
}
var res []*appv1.Cluster
for i := range secrets {
cluster, err := SecretToCluster(secrets[i].(*corev1.Secret))
if err != nil {
return nil, fmt.Errorf("failed to convert secret to cluster: %w", err)
}
res = append(res, cluster)
}
return res, nil
return clusters, nil
}
func (db *db) GetClusterServersByName(_ context.Context, name string) ([]string, error) {
@ -328,7 +333,7 @@ func (db *db) GetClusterServersByName(_ context.Context, name string) ([]string,
// UpdateCluster updates a cluster
func (db *db) UpdateCluster(ctx context.Context, c *appv1.Cluster) (*appv1.Cluster, error) {
clusterSecret, err := db.getClusterSecret(c.Server)
clusterSecret, err := db.getClusterSecret(ctx, c.Server)
if err != nil {
if status.Code(err) == codes.NotFound {
return db.CreateCluster(ctx, c)
@ -353,7 +358,7 @@ func (db *db) UpdateCluster(ctx context.Context, c *appv1.Cluster) (*appv1.Clust
// DeleteCluster deletes the cluster registered under the given server URL
func (db *db) DeleteCluster(ctx context.Context, server string) error {
secret, err := db.getClusterSecret(server)
secret, err := db.getClusterSecret(ctx, server)
if err != nil {
return err
}

View file

@ -10,7 +10,9 @@ import (
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
informersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
@ -24,6 +26,8 @@ const (
ClusterCacheByURLIndexer = "byClusterURL"
// ClusterCacheByNameIndexer indexes clusters by name
ClusterCacheByNameIndexer = "byClusterName"
// ClusterCacheByProjectIndexer indexes clusters by project
ClusterCacheByProjectIndexer = "byProjectCluster"
)
// ClusterInformer provides a cached view of cluster secrets as Cluster objects.
@ -64,6 +68,16 @@ func NewClusterInformer(clientset kubernetes.Interface, namespace string) (*Clus
}
return nil, nil
},
ClusterCacheByProjectIndexer: func(obj any) ([]string, error) {
cluster, ok := obj.(*appv1.Cluster)
if !ok {
return nil, nil
}
if cluster.Project != "" {
return []string{cluster.Project}, nil
}
return nil, nil
},
}, func(options *metav1.ListOptions) {
// Only watch secrets with the cluster label
options.LabelSelector = fmt.Sprintf("%s=%s", common.LabelKeySecretType, common.LabelValueSecretTypeCluster)
@ -113,7 +127,7 @@ func (cc *ClusterInformer) GetClusterByURL(url string) (*appv1.Cluster, error) {
}
if len(items) == 0 {
return nil, fmt.Errorf("cluster %q not found in cache", url)
return nil, apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "secrets"}, url)
}
cluster, ok := items[0].(*appv1.Cluster)
@ -125,6 +139,24 @@ func (cc *ClusterInformer) GetClusterByURL(url string) (*appv1.Cluster, error) {
return cluster.DeepCopy(), nil
}
// GetProjectClusters returns the cached clusters indexed under the given project.
// The lookup is strict: if any matching cache entry is not a *Cluster, an error
// is returned instead of a partial list.
func (cc *ClusterInformer) GetProjectClusters(project string) ([]*appv1.Cluster, error) {
	// Strict mode: a malformed cache entry aborts the whole lookup.
	const strict = true

	indexer := cc.GetIndexer()
	matches, err := indexer.ByIndex(ClusterCacheByProjectIndexer, project)
	if err != nil {
		return nil, fmt.Errorf("failed to query cluster cache by project %q: %w", project, err)
	}
	return collectClusters(matches, strict)
}
// GetAvailableProjectClusters returns the cached clusters indexed under the given
// project, tolerating malformed cache entries: an entry that is not a *Cluster is
// skipped rather than failing the call, so callers may receive partial results.
// An error is still returned when the index lookup itself fails.
func (cc *ClusterInformer) GetAvailableProjectClusters(project string) ([]*appv1.Cluster, error) {
	// Lenient mode: malformed cache entries are skipped, not fatal.
	const strict = false

	indexer := cc.GetIndexer()
	matches, err := indexer.ByIndex(ClusterCacheByProjectIndexer, project)
	if err != nil {
		return nil, fmt.Errorf("failed to query cluster cache by project %q: %w", project, err)
	}
	return collectClusters(matches, strict)
}
// GetClusterServersByName retrieves all server URLs for clusters with the given name.
func (cc *ClusterInformer) GetClusterServersByName(name string) ([]string, error) {
items, err := cc.GetIndexer().ByIndex(ClusterCacheByNameIndexer, name)
@ -148,14 +180,27 @@ func (cc *ClusterInformer) GetClusterServersByName(name string) ([]string, error
// ListClusters returns all clusters in the cache.
// Returns an error if any item in the cache is not a *Cluster (indicates transform failure).
func (cc *ClusterInformer) ListClusters() ([]*appv1.Cluster, error) {
items := cc.GetIndexer().List()
clusters := make([]*appv1.Cluster, 0, len(items))
return collectClusters(cc.GetIndexer().List(), true)
}
// ListAvailableClusters returns all clusters in the cache, tolerating malformed
// entries: an item that is not a *Cluster is skipped rather than failing the
// call, so callers may receive partial results.
func (cc *ClusterInformer) ListAvailableClusters() ([]*appv1.Cluster, error) {
	// Lenient mode: malformed cache entries are skipped, not fatal.
	const strict = false

	cached := cc.GetIndexer().List()
	return collectClusters(cached, strict)
}
// collectClusters extracts Cluster objects from cache items. When strict is true,
// a non-Cluster item causes an error. When false, it is skipped with a warning.
func collectClusters(items []any, strict bool) ([]*appv1.Cluster, error) {
clusters := make([]*appv1.Cluster, 0, len(items))
for _, item := range items {
cluster, ok := item.(*appv1.Cluster)
if !ok {
// Return an error to prevent partial data from causing incorrect applicationset deletions
return nil, fmt.Errorf("cluster cache contains unexpected type %T instead of *Cluster, secret conversion failure", item)
if strict {
return nil, fmt.Errorf("cluster cache contains unexpected type %T instead of *Cluster, secret conversion failure", item)
}
log.Warnf("Expected *appv1.Cluster in cache, got %T (skipping)", item)
continue
}
// Return copies to prevent modification of cached objects
clusters = append(clusters, cluster.DeepCopy())

View file

@ -150,7 +150,7 @@ func TestClusterInformer_TransformErrors_MixedSecrets(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, "good-cluster", cluster.Name)
// But ListClusters should fail because there's a bad secret in the cache
// ListClusters should fail because there's a bad secret in the cache
_, err = informer.ListClusters()
require.Error(t, err)
assert.Contains(t, err.Error(), "cluster cache contains unexpected type")
@ -877,3 +877,99 @@ func BenchmarkClusterInformer_GetClusterByURL(b *testing.B) {
}
})
}
func TestClusterInformer_GetProjectClusters(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
secrets := []runtime.Object{
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-proj-a-1",
Namespace: "argocd",
Labels: map[string]string{common.LabelKeySecretType: common.LabelValueSecretTypeCluster},
},
Data: map[string][]byte{
"server": []byte("https://a1.example.com"),
"name": []byte("a1"),
"config": []byte("{}"),
"project": []byte("project-a"),
},
},
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-proj-a-2",
Namespace: "argocd",
Labels: map[string]string{common.LabelKeySecretType: common.LabelValueSecretTypeCluster},
},
Data: map[string][]byte{
"server": []byte("https://a2.example.com"),
"name": []byte("a2"),
"config": []byte("{}"),
"project": []byte("project-a"),
},
},
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-proj-b",
Namespace: "argocd",
Labels: map[string]string{common.LabelKeySecretType: common.LabelValueSecretTypeCluster},
},
Data: map[string][]byte{
"server": []byte("https://b1.example.com"),
"name": []byte("b1"),
"config": []byte("{}"),
"project": []byte("project-b"),
},
},
}
clientset := fake.NewClientset(secrets...)
informer, err := NewClusterInformer(clientset, "argocd")
require.NoError(t, err)
go informer.Run(ctx.Done())
cache.WaitForCacheSync(ctx.Done(), informer.HasSynced)
t.Run("returns clusters for matching project", func(t *testing.T) {
clusters, err := informer.GetProjectClusters("project-a")
require.NoError(t, err)
assert.Len(t, clusters, 2)
servers := make([]string, 0, len(clusters))
for _, c := range clusters {
servers = append(servers, c.Server)
}
assert.Contains(t, servers, "https://a1.example.com")
assert.Contains(t, servers, "https://a2.example.com")
})
t.Run("does not return clusters from other projects", func(t *testing.T) {
clusters, err := informer.GetProjectClusters("project-b")
require.NoError(t, err)
assert.Len(t, clusters, 1)
assert.Equal(t, "https://b1.example.com", clusters[0].Server)
})
t.Run("returns empty for non-existent project", func(t *testing.T) {
clusters, err := informer.GetProjectClusters("no-such-project")
require.NoError(t, err)
assert.Empty(t, clusters)
})
t.Run("returned clusters are isolated from cache", func(t *testing.T) {
clusters, err := informer.GetProjectClusters("project-a")
require.NoError(t, err)
require.Len(t, clusters, 2)
// Mutate the returned cluster
clusters[0].Name = "mutated"
// Fetch again and verify cache is unaffected
fresh, err := informer.GetProjectClusters("project-a")
require.NoError(t, err)
for _, c := range fresh {
assert.NotEqual(t, "mutated", c.Name, "cache should not be affected by caller mutation")
}
})
}

View file

@ -225,41 +225,6 @@ type AzureOIDCConfig struct {
}
var (
ByClusterURLIndexer = "byClusterURL"
byClusterURLIndexerFunc = func(obj any) ([]string, error) {
s, ok := obj.(*corev1.Secret)
if !ok {
return nil, nil
}
if s.Labels == nil || s.Labels[common.LabelKeySecretType] != common.LabelValueSecretTypeCluster {
return nil, nil
}
if s.Data == nil {
return nil, nil
}
if url, ok := s.Data["server"]; ok {
return []string{strings.TrimRight(string(url), "/")}, nil
}
return nil, nil
}
ByClusterNameIndexer = "byClusterName"
byClusterNameIndexerFunc = func(obj any) ([]string, error) {
s, ok := obj.(*corev1.Secret)
if !ok {
return nil, nil
}
if s.Labels == nil || s.Labels[common.LabelKeySecretType] != common.LabelValueSecretTypeCluster {
return nil, nil
}
if s.Data == nil {
return nil, nil
}
if name, ok := s.Data["name"]; ok {
return []string{string(name)}, nil
}
return nil, nil
}
ByProjectClusterIndexer = "byProjectCluster"
ByProjectRepoIndexer = "byProjectRepo"
ByProjectRepoWriteIndexer = "byProjectRepoWrite"
byProjectIndexerFunc = func(secretType string) func(obj any) ([]string, error) {
@ -1365,14 +1330,13 @@ func (mgr *SettingsManager) initialize(ctx context.Context) error {
}
indexers := cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
ByClusterURLIndexer: byClusterURLIndexerFunc,
ByClusterNameIndexer: byClusterNameIndexerFunc,
ByProjectClusterIndexer: byProjectIndexerFunc(common.LabelValueSecretTypeCluster),
ByProjectRepoIndexer: byProjectIndexerFunc(common.LabelValueSecretTypeRepository),
ByProjectRepoWriteIndexer: byProjectIndexerFunc(common.LabelValueSecretTypeRepositoryWrite),
}
cmInformer := informersv1.NewFilteredConfigMapInformer(mgr.clientset, mgr.namespace, 3*time.Minute, indexers, tweakConfigMap)
secretsInformer := informersv1.NewSecretInformer(mgr.clientset, mgr.namespace, 3*time.Minute, indexers)
secretsInformer := informersv1.NewFilteredSecretInformer(mgr.clientset, mgr.namespace, 3*time.Minute, indexers, func(options *metav1.ListOptions) {
options.LabelSelector = common.LabelKeySecretType + "!=" + common.LabelValueSecretTypeCluster
})
clusterInformer, err := NewClusterInformer(mgr.clientset, mgr.namespace)
if err != nil {
log.Error(err)

View file

@ -17,6 +17,7 @@ import (
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/kubernetes/fake"
@ -2348,3 +2349,89 @@ func TestSettingsManager_GetAllowedNodeLabels(t *testing.T) {
})
}
}
// TestSecretsInformerExcludesClusterSecrets seeds a fake clientset with the Argo CD
// config map, the Argo CD secret, one repository secret and one cluster secret, then
// checks how the settings manager splits them: the secrets lister must expose the
// repository secret but not the cluster secret, while the cluster informer must
// resolve the cluster by URL and must not list the repository secret.
func TestSecretsInformerExcludesClusterSecrets(t *testing.T) {
	const namespace = "default"

	configMap := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      common.ArgoCDConfigMapName,
			Namespace: namespace,
			Labels: map[string]string{
				"app.kubernetes.io/part-of": "argocd",
			},
		},
	}
	settingsSecret := &corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      common.ArgoCDSecretName,
			Namespace: namespace,
			Labels: map[string]string{
				"app.kubernetes.io/part-of": "argocd",
			},
		},
		Data: map[string][]byte{},
	}
	repository := &corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "repo-secret",
			Namespace: namespace,
			Labels: map[string]string{
				common.LabelKeySecretType: common.LabelValueSecretTypeRepository,
			},
		},
		Data: map[string][]byte{
			"url": []byte("https://github.com/example/repo"),
		},
	}
	cluster := &corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "cluster-secret",
			Namespace: namespace,
			Labels: map[string]string{
				common.LabelKeySecretType: common.LabelValueSecretTypeCluster,
			},
		},
		Data: map[string][]byte{
			"server": []byte("https://cluster.example.com"),
			"name":   []byte("test-cluster"),
			"config": []byte("{}"),
		},
	}

	clientset := fake.NewClientset(configMap, settingsSecret, repository, cluster)
	mgr := NewSettingsManager(t.Context(), clientset, namespace)

	t.Run("secrets lister excludes cluster secrets", func(t *testing.T) {
		lister, err := mgr.GetSecretsLister()
		require.NoError(t, err)

		listed, err := lister.Secrets(namespace).List(labels.Everything())
		require.NoError(t, err)

		// Collect just the names so membership can be asserted directly.
		names := make([]string, len(listed))
		for i := range listed {
			names[i] = listed[i].Name
		}

		assert.Contains(t, names, "repo-secret", "repository secret should be in secrets informer")
		assert.NotContains(t, names, "cluster-secret", "cluster secret should be excluded from secrets informer")
	})

	t.Run("cluster informer includes cluster secrets", func(t *testing.T) {
		clusterInformer, err := mgr.GetClusterInformer()
		require.NoError(t, err)

		found, err := clusterInformer.GetClusterByURL("https://cluster.example.com")
		require.NoError(t, err)
		assert.Equal(t, "test-cluster", found.Name)
	})

	t.Run("cluster informer excludes non-cluster secrets", func(t *testing.T) {
		clusterInformer, err := mgr.GetClusterInformer()
		require.NoError(t, err)

		cached, err := clusterInformer.ListClusters()
		require.NoError(t, err)

		for i := range cached {
			assert.NotEqual(t, "repo-secret", cached[i].ObjectMeta.Name, "repository secret should not appear in cluster informer")
		}
	})
}