diff --git a/Procfile b/Procfile index 4f28cb66e3..7b3c90b8f2 100644 --- a/Procfile +++ b/Procfile @@ -2,7 +2,7 @@ controller: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run api-server: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "GOCOVERDIR=${ARGOCD_COVERAGE_DIR:-/tmp/coverage/api-server} FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-server $COMMAND --loglevel debug --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --disable-auth=${ARGOCD_E2E_DISABLE_AUTH:-'true'} --insecure --dex-server http://localhost:${ARGOCD_E2E_DEX_PORT:-5556} --repo-server localhost:${ARGOCD_E2E_REPOSERVER_PORT:-8081} --port ${ARGOCD_E2E_APISERVER_PORT:-8080} --otlp-address=${ARGOCD_OTLP_ADDRESS} --application-namespaces=${ARGOCD_APPLICATION_NAMESPACES:-''} --hydrator-enabled=${ARGOCD_HYDRATOR_ENABLED:='false'}" dex: sh -c "ARGOCD_BINARY_NAME=argocd-dex go run github.com/argoproj/argo-cd/v3/cmd gendexcfg -o `pwd`/dist/dex.yaml && (test -f dist/dex.yaml || { echo 'Failed to generate dex configuration'; exit 1; }) && docker run --rm -p ${ARGOCD_E2E_DEX_PORT:-5556}:${ARGOCD_E2E_DEX_PORT:-5556} -v `pwd`/dist/dex.yaml:/dex.yaml ghcr.io/dexidp/dex:$(grep "image: ghcr.io/dexidp/dex:v2.45.0" manifests/base/dex/argocd-dex-server-deployment.yaml | cut -d':' -f3) dex serve /dex.yaml" redis: hack/start-redis-with-password.sh -repo-server: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "export PATH=./dist:\$PATH && [ -n \"\$ARGOCD_GIT_CONFIG\" ] && export GIT_CONFIG_GLOBAL=\$ARGOCD_GIT_CONFIG && export GIT_CONFIG_NOSYSTEM=1; GOCOVERDIR=${ARGOCD_COVERAGE_DIR:-/tmp/coverage/repo-server} FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_GNUPGHOME=${ARGOCD_GNUPGHOME:-/tmp/argocd-local/gpg/keys} ARGOCD_PLUGINSOCKFILEPATH=${ARGOCD_PLUGINSOCKFILEPATH:-./test/cmp} ARGOCD_GPG_DATA_PATH=${ARGOCD_GPG_DATA_PATH:-/tmp/argocd-local/gpg/source} ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-repo-server ARGOCD_GPG_ENABLED=${ARGOCD_GPG_ENABLED:-false} $COMMAND --loglevel debug --port ${ARGOCD_E2E_REPOSERVER_PORT:-8081} --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --otlp-address=${ARGOCD_OTLP_ADDRESS}" +repo-server: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "export PATH=\$(pwd)/dist:\$PATH && [ -n \"\$ARGOCD_GIT_CONFIG\" ] && export GIT_CONFIG_GLOBAL=\$ARGOCD_GIT_CONFIG && export GIT_CONFIG_NOSYSTEM=1; GOCOVERDIR=${ARGOCD_COVERAGE_DIR:-/tmp/coverage/repo-server} FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_GNUPGHOME=${ARGOCD_GNUPGHOME:-/tmp/argocd-local/gpg/keys} ARGOCD_PLUGINSOCKFILEPATH=${ARGOCD_PLUGINSOCKFILEPATH:-./test/cmp} ARGOCD_GPG_DATA_PATH=${ARGOCD_GPG_DATA_PATH:-/tmp/argocd-local/gpg/source} ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-repo-server ARGOCD_GPG_ENABLED=${ARGOCD_GPG_ENABLED:-false} $COMMAND --loglevel debug --port ${ARGOCD_E2E_REPOSERVER_PORT:-8081} --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --otlp-address=${ARGOCD_OTLP_ADDRESS}" cmp-server: [ "$ARGOCD_E2E_TEST" = 'true' ] && exit 0 || [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_BINARY_NAME=argocd-cmp-server ARGOCD_PLUGINSOCKFILEPATH=${ARGOCD_PLUGINSOCKFILEPATH:-./test/cmp} $COMMAND --config-dir-path ./test/cmp --loglevel debug --otlp-address=${ARGOCD_OTLP_ADDRESS}" commit-server: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "GOCOVERDIR=${ARGOCD_COVERAGE_DIR:-/tmp/coverage/commit-server} FORCE_LOG_COLORS=1 ARGOCD_BINARY_NAME=argocd-commit-server $COMMAND --loglevel debug --port ${ARGOCD_E2E_COMMITSERVER_PORT:-8086}" ui: sh -c 'cd ui && ${ARGOCD_E2E_YARN_CMD:-yarn} start' diff --git a/applicationset/controllers/applicationset_controller.go b/applicationset/controllers/applicationset_controller.go index ff9d985e6b..c8cd6db17b 100644 --- a/applicationset/controllers/applicationset_controller.go +++ b/applicationset/controllers/applicationset_controller.go @@ -24,11 +24,13 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" log "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -103,15 +105,16 @@ type ApplicationSetReconciler struct { Policy argov1alpha1.ApplicationsSyncPolicy EnablePolicyOverride bool utils.Renderer - ArgoCDNamespace string - ApplicationSetNamespaces []string - EnableProgressiveSyncs bool - SCMRootCAPath string - GlobalPreservedAnnotations []string - GlobalPreservedLabels []string - Metrics *metrics.ApplicationsetMetrics - MaxResourcesStatusCount int - ClusterInformer *settings.ClusterInformer + ArgoCDNamespace string + ApplicationSetNamespaces []string + EnableProgressiveSyncs bool + SCMRootCAPath string + GlobalPreservedAnnotations []string + GlobalPreservedLabels []string + Metrics *metrics.ApplicationsetMetrics + MaxResourcesStatusCount int + ClusterInformer *settings.ClusterInformer + ConcurrentApplicationUpdates int } // +kubebuilder:rbac:groups=argoproj.io,resources=applicationsets,verbs=get;list;watch;create;update;patch;delete @@ -688,108 +691,133 @@ func (r *ApplicationSetReconciler) SetupWithManager(mgr ctrl.Manager, enableProg // - For existing application, it will call update // The function also adds owner reference to all applications, and uses it to delete them. func (r *ApplicationSetReconciler) createOrUpdateInCluster(ctx context.Context, logCtx *log.Entry, applicationSet argov1alpha1.ApplicationSet, desiredApplications []argov1alpha1.Application) error { - var firstError error - // Creates or updates the application in appList - for _, generatedApp := range desiredApplications { - appLog := logCtx.WithFields(applog.GetAppLogFields(&generatedApp)) + // Build the diff config once per reconcile. + // Diff config is per applicationset, so generate it once for all applications + diffConfig, err := utils.BuildIgnoreDiffConfig(applicationSet.Spec.IgnoreApplicationDifferences, normalizers.IgnoreNormalizerOpts{}) + if err != nil { + return fmt.Errorf("failed to build ignore diff config: %w", err) + } + g, ctx := errgroup.WithContext(ctx) + concurrency := r.concurrency() + g.SetLimit(concurrency) + + var appErrorsMu sync.Mutex + appErrors := map[string]error{} + + for _, generatedApp := range desiredApplications { // Normalize to avoid fighting with the application controller. generatedApp.Spec = *argoutil.NormalizeApplicationSpec(&generatedApp.Spec) + g.Go(func() error { + appLog := logCtx.WithFields(applog.GetAppLogFields(&generatedApp)) - found := &argov1alpha1.Application{ - ObjectMeta: metav1.ObjectMeta{ - Name: generatedApp.Name, - Namespace: generatedApp.Namespace, - }, - TypeMeta: metav1.TypeMeta{ - Kind: application.ApplicationKind, - APIVersion: "argoproj.io/v1alpha1", - }, - } - - action, err := utils.CreateOrUpdate(ctx, appLog, r.Client, applicationSet.Spec.IgnoreApplicationDifferences, normalizers.IgnoreNormalizerOpts{}, found, func() error { - // Copy only the Application/ObjectMeta fields that are significant, from the generatedApp - found.Spec = generatedApp.Spec - - // allow setting the Operation field to trigger a sync operation on an Application - if generatedApp.Operation != nil { - found.Operation = generatedApp.Operation + found := &argov1alpha1.Application{ + ObjectMeta: metav1.ObjectMeta{ + Name: generatedApp.Name, + Namespace: generatedApp.Namespace, + }, + TypeMeta: metav1.TypeMeta{ + Kind: application.ApplicationKind, + APIVersion: "argoproj.io/v1alpha1", + }, } - preservedAnnotations := make([]string, 0) - preservedLabels := make([]string, 0) + action, err := utils.CreateOrUpdate(ctx, appLog, r.Client, diffConfig, found, func() error { + // Copy only the Application/ObjectMeta fields that are significant, from the generatedApp + found.Spec = generatedApp.Spec - if applicationSet.Spec.PreservedFields != nil { - preservedAnnotations = append(preservedAnnotations, applicationSet.Spec.PreservedFields.Annotations...) - preservedLabels = append(preservedLabels, applicationSet.Spec.PreservedFields.Labels...) - } - - if len(r.GlobalPreservedAnnotations) > 0 { - preservedAnnotations = append(preservedAnnotations, r.GlobalPreservedAnnotations...) - } - - if len(r.GlobalPreservedLabels) > 0 { - preservedLabels = append(preservedLabels, r.GlobalPreservedLabels...) - } - - // Preserve specially treated argo cd annotations: - // * https://github.com/argoproj/applicationset/issues/180 - // * https://github.com/argoproj/argo-cd/issues/10500 - preservedAnnotations = append(preservedAnnotations, defaultPreservedAnnotations...) - - for _, key := range preservedAnnotations { - if state, exists := found.Annotations[key]; exists { - if generatedApp.Annotations == nil { - generatedApp.Annotations = map[string]string{} - } - generatedApp.Annotations[key] = state + // allow setting the Operation field to trigger a sync operation on an Application + if generatedApp.Operation != nil { + found.Operation = generatedApp.Operation } - } - for _, key := range preservedLabels { - if state, exists := found.Labels[key]; exists { - if generatedApp.Labels == nil { - generatedApp.Labels = map[string]string{} - } - generatedApp.Labels[key] = state + preservedAnnotations := make([]string, 0) + preservedLabels := make([]string, 0) + + if applicationSet.Spec.PreservedFields != nil { + preservedAnnotations = append(preservedAnnotations, applicationSet.Spec.PreservedFields.Annotations...) + preservedLabels = append(preservedLabels, applicationSet.Spec.PreservedFields.Labels...) } - } - // Preserve deleting finalizers and avoid diff conflicts - for _, finalizer := range defaultPreservedFinalizers { - for _, f := range found.Finalizers { - // For finalizers, use prefix matching in case it contains "/" stages - if strings.HasPrefix(f, finalizer) { - generatedApp.Finalizers = append(generatedApp.Finalizers, f) + if len(r.GlobalPreservedAnnotations) > 0 { + preservedAnnotations = append(preservedAnnotations, r.GlobalPreservedAnnotations...) + } + + if len(r.GlobalPreservedLabels) > 0 { + preservedLabels = append(preservedLabels, r.GlobalPreservedLabels...) + } + + // Preserve specially treated argo cd annotations: + // * https://github.com/argoproj/applicationset/issues/180 + // * https://github.com/argoproj/argo-cd/issues/10500 + preservedAnnotations = append(preservedAnnotations, defaultPreservedAnnotations...) + + for _, key := range preservedAnnotations { + if state, exists := found.Annotations[key]; exists { + if generatedApp.Annotations == nil { + generatedApp.Annotations = map[string]string{} + } + generatedApp.Annotations[key] = state } } + + for _, key := range preservedLabels { + if state, exists := found.Labels[key]; exists { + if generatedApp.Labels == nil { + generatedApp.Labels = map[string]string{} + } + generatedApp.Labels[key] = state + } + } + + // Preserve deleting finalizers and avoid diff conflicts + for _, finalizer := range defaultPreservedFinalizers { + for _, f := range found.Finalizers { + // For finalizers, use prefix matching in case it contains "/" stages + if strings.HasPrefix(f, finalizer) { + generatedApp.Finalizers = append(generatedApp.Finalizers, f) + } + } + } + + found.Annotations = generatedApp.Annotations + found.Labels = generatedApp.Labels + found.Finalizers = generatedApp.Finalizers + + return controllerutil.SetControllerReference(&applicationSet, found, r.Scheme) + }) + if err != nil { + appLog.WithError(err).WithField("action", action).Errorf("failed to %s Application", action) + // If the context was canceled or its deadline exceeded, return the error so it propagates through g.Wait(). + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return err + } + // For backwards compatibility with sequential behavior: continue processing other applications + // but record the error keyed by app name so we can deterministically return the error from + // the lexicographically first failing app, regardless of goroutine scheduling order. + appErrorsMu.Lock() + appErrors[generatedApp.Name] = err + appErrorsMu.Unlock() + return nil } - found.Annotations = generatedApp.Annotations - found.Labels = generatedApp.Labels - found.Finalizers = generatedApp.Finalizers - - return controllerutil.SetControllerReference(&applicationSet, found, r.Scheme) + if action != controllerutil.OperationResultNone { + // Don't pollute etcd with "unchanged Application" events + r.Recorder.Eventf(&applicationSet, corev1.EventTypeNormal, fmt.Sprint(action), "%s Application %q", action, generatedApp.Name) + appLog.Logf(log.InfoLevel, "%s Application", action) + } else { + // "unchanged Application" can be inferred by Reconcile Complete with no action being listed + // Or enable debug logging + appLog.Logf(log.DebugLevel, "%s Application", action) + } + return nil }) - if err != nil { - appLog.WithError(err).WithField("action", action).Errorf("failed to %s Application", action) - if firstError == nil { - firstError = err - } - continue - } - - if action != controllerutil.OperationResultNone { - // Don't pollute etcd with "unchanged Application" events - r.Recorder.Eventf(&applicationSet, corev1.EventTypeNormal, fmt.Sprint(action), "%s Application %q", action, generatedApp.Name) - appLog.Logf(log.InfoLevel, "%s Application", action) - } else { - // "unchanged Application" can be inferred by Reconcile Complete with no action being listed - // Or enable debug logging - appLog.Logf(log.DebugLevel, "%s Application", action) - } } - return firstError + + if err := g.Wait(); errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return err + } + return firstAppError(appErrors) } // createInCluster will filter from the desiredApplications only the application that needs to be created @@ -849,36 +877,84 @@ func (r *ApplicationSetReconciler) deleteInCluster(ctx context.Context, logCtx * m[app.Name] = true } - // Delete apps that are not in m[string]bool - var firstError error - for _, app := range current { - logCtx = logCtx.WithFields(applog.GetAppLogFields(&app)) - _, exists := m[app.Name] + g, ctx := errgroup.WithContext(ctx) + concurrency := r.concurrency() + g.SetLimit(concurrency) - if !exists { + var appErrorsMu sync.Mutex + appErrors := map[string]error{} + + // Delete apps that are not in m[string]bool + for _, app := range current { + _, exists := m[app.Name] + if exists { + continue + } + appLogCtx := logCtx.WithFields(applog.GetAppLogFields(&app)) + g.Go(func() error { // Removes the Argo CD resources finalizer if the application contains an invalid target (eg missing cluster) - err := r.removeFinalizerOnInvalidDestination(ctx, applicationSet, &app, clusterList, logCtx) + err := r.removeFinalizerOnInvalidDestination(ctx, applicationSet, &app, clusterList, appLogCtx) if err != nil { - logCtx.WithError(err).Error("failed to update Application") - if firstError != nil { - firstError = err + appLogCtx.WithError(err).Error("failed to update Application") + // If the context was canceled or its deadline exceeded, return the error so it propagates through g.Wait(). + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return err } - continue + // For backwards compatibility with sequential behavior: continue processing other applications + // but record the error keyed by app name so we can deterministically return the error from + // the lexicographically first failing app, regardless of goroutine scheduling order. + appErrorsMu.Lock() + appErrors[app.Name] = err + appErrorsMu.Unlock() + return nil } err = r.Delete(ctx, &app) if err != nil { - logCtx.WithError(err).Error("failed to delete Application") - if firstError != nil { - firstError = err + appLogCtx.WithError(err).Error("failed to delete Application") + // If the context was canceled or its deadline exceeded, return the error so it propagates through g.Wait(). + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return err } - continue + appErrorsMu.Lock() + appErrors[app.Name] = err + appErrorsMu.Unlock() + return nil } r.Recorder.Eventf(&applicationSet, corev1.EventTypeNormal, "Deleted", "Deleted Application %q", app.Name) - logCtx.Log(log.InfoLevel, "Deleted application") - } + appLogCtx.Log(log.InfoLevel, "Deleted application") + return nil + }) } - return firstError + + if err := g.Wait(); errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return err + } + return firstAppError(appErrors) +} + +// concurrency returns the configured number of concurrent application updates, defaulting to 1. +func (r *ApplicationSetReconciler) concurrency() int { + if r.ConcurrentApplicationUpdates <= 0 { + return 1 + } + return r.ConcurrentApplicationUpdates +} + +// firstAppError returns the error associated with the lexicographically smallest application name +// in the provided map. This gives a deterministic result when multiple goroutines may have +// recorded errors concurrently, matching the behavior of the original sequential loop where the +// first application in iteration order would determine the returned error. +func firstAppError(appErrors map[string]error) error { + if len(appErrors) == 0 { + return nil + } + names := make([]string, 0, len(appErrors)) + for name := range appErrors { + names = append(names, name) + } + sort.Strings(names) + return appErrors[names[0]] } // removeFinalizerOnInvalidDestination removes the Argo CD resources finalizer if the application contains an invalid target (eg missing cluster) diff --git a/applicationset/controllers/applicationset_controller_test.go b/applicationset/controllers/applicationset_controller_test.go index 4f4b558020..34b8c884cb 100644 --- a/applicationset/controllers/applicationset_controller_test.go +++ b/applicationset/controllers/applicationset_controller_test.go @@ -25,6 +25,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" crtclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" @@ -1077,6 +1078,70 @@ func TestCreateOrUpdateInCluster(t *testing.T) { }, }, }, + { + name: "Ensure that unnormalized live spec does not cause a spurious patch", + appSet: v1alpha1.ApplicationSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + Spec: v1alpha1.ApplicationSetSpec{ + Template: v1alpha1.ApplicationSetTemplate{ + Spec: v1alpha1.ApplicationSpec{ + Project: "project", + }, + }, + }, + }, + existingApps: []v1alpha1.Application{ + { + TypeMeta: metav1.TypeMeta{ + Kind: application.ApplicationKind, + APIVersion: "argoproj.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "app1", + Namespace: "namespace", + ResourceVersion: "2", + }, + Spec: v1alpha1.ApplicationSpec{ + Project: "project", + // Without normalizing the live object, the equality check + // sees &SyncPolicy{} vs nil and issues an unnecessary patch. + SyncPolicy: &v1alpha1.SyncPolicy{}, + }, + }, + }, + desiredApps: []v1alpha1.Application{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "app1", + Namespace: "namespace", + }, + Spec: v1alpha1.ApplicationSpec{ + Project: "project", + SyncPolicy: nil, + }, + }, + }, + expected: []v1alpha1.Application{ + { + TypeMeta: metav1.TypeMeta{ + Kind: application.ApplicationKind, + APIVersion: "argoproj.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "app1", + Namespace: "namespace", + ResourceVersion: "2", + }, + Spec: v1alpha1.ApplicationSpec{ + Project: "project", + SyncPolicy: &v1alpha1.SyncPolicy{}, + }, + }, + }, + }, { name: "Ensure that argocd pre-delete and post-delete finalizers are preserved from an existing app", appSet: v1alpha1.ApplicationSet{ @@ -1186,6 +1251,374 @@ func TestCreateOrUpdateInCluster(t *testing.T) { } } +func TestCreateOrUpdateInCluster_Concurrent(t *testing.T) { + scheme := runtime.NewScheme() + err := v1alpha1.AddToScheme(scheme) + require.NoError(t, err) + + appSet := v1alpha1.ApplicationSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + } + + t.Run("all apps are created correctly with concurrency > 1", func(t *testing.T) { + desiredApps := make([]v1alpha1.Application, 5) + for i := range desiredApps { + desiredApps[i] = v1alpha1.Application{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("app%d", i), + Namespace: "namespace", + }, + Spec: v1alpha1.ApplicationSpec{Project: "project"}, + } + } + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(&appSet). + WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer). + Build() + metrics := appsetmetrics.NewFakeAppsetMetrics() + + r := ApplicationSetReconciler{ + Client: fakeClient, + Scheme: scheme, + Recorder: record.NewFakeRecorder(10), + Metrics: metrics, + ConcurrentApplicationUpdates: 5, + } + + err = r.createOrUpdateInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, desiredApps) + require.NoError(t, err) + + for _, desired := range desiredApps { + got := &v1alpha1.Application{} + require.NoError(t, fakeClient.Get(t.Context(), crtclient.ObjectKey{Namespace: desired.Namespace, Name: desired.Name}, got)) + assert.Equal(t, desired.Spec.Project, got.Spec.Project) + } + }) + + t.Run("non-context errors from concurrent goroutines are collected and one is returned", func(t *testing.T) { + existingApps := make([]v1alpha1.Application, 5) + initObjs := []crtclient.Object{&appSet} + for i := range existingApps { + existingApps[i] = v1alpha1.Application{ + TypeMeta: metav1.TypeMeta{ + Kind: application.ApplicationKind, + APIVersion: "argoproj.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("app%d", i), + Namespace: "namespace", + ResourceVersion: "1", + }, + Spec: v1alpha1.ApplicationSpec{Project: "old"}, + } + app := existingApps[i].DeepCopy() + require.NoError(t, controllerutil.SetControllerReference(&appSet, app, scheme)) + initObjs = append(initObjs, app) + } + + desiredApps := make([]v1alpha1.Application, 5) + for i := range desiredApps { + desiredApps[i] = v1alpha1.Application{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("app%d", i), + Namespace: "namespace", + }, + Spec: v1alpha1.ApplicationSpec{Project: "new"}, + } + } + + patchErr := errors.New("some patch error") + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(initObjs...). + WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer). + WithInterceptorFuncs(interceptor.Funcs{ + Patch: func(_ context.Context, _ crtclient.WithWatch, _ crtclient.Object, _ crtclient.Patch, _ ...crtclient.PatchOption) error { + return patchErr + }, + }). + Build() + metrics := appsetmetrics.NewFakeAppsetMetrics() + + r := ApplicationSetReconciler{ + Client: fakeClient, + Scheme: scheme, + Recorder: record.NewFakeRecorder(10), + Metrics: metrics, + ConcurrentApplicationUpdates: 5, + } + + err = r.createOrUpdateInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, desiredApps) + require.ErrorIs(t, err, patchErr) + }) +} + +func TestCreateOrUpdateInCluster_ContextCancellation(t *testing.T) { + scheme := runtime.NewScheme() + err := v1alpha1.AddToScheme(scheme) + require.NoError(t, err) + + appSet := v1alpha1.ApplicationSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + } + existingApp := v1alpha1.Application{ + TypeMeta: metav1.TypeMeta{ + Kind: application.ApplicationKind, + APIVersion: "argoproj.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "app1", + Namespace: "namespace", + ResourceVersion: "1", + }, + Spec: v1alpha1.ApplicationSpec{Project: "old"}, + } + desiredApp := v1alpha1.Application{ + ObjectMeta: metav1.ObjectMeta{ + Name: "app1", + Namespace: "namespace", + }, + Spec: v1alpha1.ApplicationSpec{Project: "new"}, + } + + t.Run("context canceled on patch is returned directly", func(t *testing.T) { + initObjs := []crtclient.Object{&appSet} + app := existingApp.DeepCopy() + err = controllerutil.SetControllerReference(&appSet, app, scheme) + require.NoError(t, err) + initObjs = append(initObjs, app) + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(initObjs...). + WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer). + WithInterceptorFuncs(interceptor.Funcs{ + Patch: func(_ context.Context, _ crtclient.WithWatch, _ crtclient.Object, _ crtclient.Patch, _ ...crtclient.PatchOption) error { + return context.Canceled + }, + }). + Build() + metrics := appsetmetrics.NewFakeAppsetMetrics() + + r := ApplicationSetReconciler{ + Client: fakeClient, + Scheme: scheme, + Recorder: record.NewFakeRecorder(10), + Metrics: metrics, + } + + err = r.createOrUpdateInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, []v1alpha1.Application{desiredApp}) + require.ErrorIs(t, err, context.Canceled) + }) + + t.Run("context deadline exceeded on patch is returned directly", func(t *testing.T) { + initObjs := []crtclient.Object{&appSet} + app := existingApp.DeepCopy() + err = controllerutil.SetControllerReference(&appSet, app, scheme) + require.NoError(t, err) + initObjs = append(initObjs, app) + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(initObjs...). + WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer). + WithInterceptorFuncs(interceptor.Funcs{ + Patch: func(_ context.Context, _ crtclient.WithWatch, _ crtclient.Object, _ crtclient.Patch, _ ...crtclient.PatchOption) error { + return context.DeadlineExceeded + }, + }). + Build() + metrics := appsetmetrics.NewFakeAppsetMetrics() + + r := ApplicationSetReconciler{ + Client: fakeClient, + Scheme: scheme, + Recorder: record.NewFakeRecorder(10), + Metrics: metrics, + } + + err = r.createOrUpdateInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, []v1alpha1.Application{desiredApp}) + require.ErrorIs(t, err, context.DeadlineExceeded) + }) + + t.Run("non-context error is collected and returned after all goroutines finish", func(t *testing.T) { + initObjs := []crtclient.Object{&appSet} + app := existingApp.DeepCopy() + err = controllerutil.SetControllerReference(&appSet, app, scheme) + require.NoError(t, err) + initObjs = append(initObjs, app) + + patchErr := errors.New("some patch error") + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(initObjs...). + WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer). + WithInterceptorFuncs(interceptor.Funcs{ + Patch: func(_ context.Context, _ crtclient.WithWatch, _ crtclient.Object, _ crtclient.Patch, _ ...crtclient.PatchOption) error { + return patchErr + }, + }). + Build() + metrics := appsetmetrics.NewFakeAppsetMetrics() + + r := ApplicationSetReconciler{ + Client: fakeClient, + Scheme: scheme, + Recorder: record.NewFakeRecorder(10), + Metrics: metrics, + } + + err = r.createOrUpdateInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, []v1alpha1.Application{desiredApp}) + require.ErrorIs(t, err, patchErr) + }) + + t.Run("context canceled on create is returned directly", func(t *testing.T) { + initObjs := []crtclient.Object{&appSet} + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(initObjs...). + WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer). + WithInterceptorFuncs(interceptor.Funcs{ + Create: func(_ context.Context, _ crtclient.WithWatch, _ crtclient.Object, _ ...crtclient.CreateOption) error { + return context.Canceled + }, + }). + Build() + metrics := appsetmetrics.NewFakeAppsetMetrics() + + r := ApplicationSetReconciler{ + Client: fakeClient, + Scheme: scheme, + Recorder: record.NewFakeRecorder(10), + Metrics: metrics, + } + + newApp := v1alpha1.Application{ + ObjectMeta: metav1.ObjectMeta{Name: "newapp", Namespace: "namespace"}, + Spec: v1alpha1.ApplicationSpec{Project: "default"}, + } + err = r.createOrUpdateInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, []v1alpha1.Application{newApp}) + require.ErrorIs(t, err, context.Canceled) + }) +} + +func TestDeleteInCluster_ContextCancellation(t *testing.T) { + scheme := runtime.NewScheme() + err := v1alpha1.AddToScheme(scheme) + require.NoError(t, err) + err = corev1.AddToScheme(scheme) + require.NoError(t, err) + + appSet := v1alpha1.ApplicationSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + } + existingApp := v1alpha1.Application{ + TypeMeta: metav1.TypeMeta{ + Kind: application.ApplicationKind, + APIVersion: "argoproj.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "delete-me", + Namespace: "namespace", + ResourceVersion: "1", + }, + Spec: v1alpha1.ApplicationSpec{Project: "project"}, + } + + makeReconciler := func(t *testing.T, fakeClient crtclient.Client) ApplicationSetReconciler { + t.Helper() + kubeclientset := kubefake.NewClientset() + clusterInformer, err := settings.NewClusterInformer(kubeclientset, "namespace") + require.NoError(t, err) + cancel := startAndSyncInformer(t, clusterInformer) + t.Cleanup(cancel) + return ApplicationSetReconciler{ + Client: fakeClient, + Scheme: scheme, + Recorder: record.NewFakeRecorder(10), + KubeClientset: kubeclientset, + Metrics: appsetmetrics.NewFakeAppsetMetrics(), + ClusterInformer: clusterInformer, + } + } + + t.Run("context canceled on delete is returned directly", func(t *testing.T) { + app := existingApp.DeepCopy() + err = controllerutil.SetControllerReference(&appSet, app, scheme) + require.NoError(t, err) + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(&appSet, app). + WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer). + WithInterceptorFuncs(interceptor.Funcs{ + Delete: func(_ context.Context, _ crtclient.WithWatch, _ crtclient.Object, _ ...crtclient.DeleteOption) error { + return context.Canceled + }, + }). + Build() + + r := makeReconciler(t, fakeClient) + err = r.deleteInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, []v1alpha1.Application{}) + require.ErrorIs(t, err, context.Canceled) + }) + + t.Run("context deadline exceeded on delete is returned directly", func(t *testing.T) { + app := existingApp.DeepCopy() + err = controllerutil.SetControllerReference(&appSet, app, scheme) + require.NoError(t, err) + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(&appSet, app). + WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer). + WithInterceptorFuncs(interceptor.Funcs{ + Delete: func(_ context.Context, _ crtclient.WithWatch, _ crtclient.Object, _ ...crtclient.DeleteOption) error { + return context.DeadlineExceeded + }, + }). + Build() + + r := makeReconciler(t, fakeClient) + err = r.deleteInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, []v1alpha1.Application{}) + require.ErrorIs(t, err, context.DeadlineExceeded) + }) + + t.Run("non-context delete error is collected and returned", func(t *testing.T) { + app := existingApp.DeepCopy() + err = controllerutil.SetControllerReference(&appSet, app, scheme) + require.NoError(t, err) + + deleteErr := errors.New("delete failed") + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(&appSet, app). + WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer). + WithInterceptorFuncs(interceptor.Funcs{ + Delete: func(_ context.Context, _ crtclient.WithWatch, _ crtclient.Object, _ ...crtclient.DeleteOption) error { + return deleteErr + }, + }). + Build() + + r := makeReconciler(t, fakeClient) + err = r.deleteInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, []v1alpha1.Application{}) + require.ErrorIs(t, err, deleteErr) + }) +} + func TestRemoveFinalizerOnInvalidDestination_FinalizerTypes(t *testing.T) { scheme := runtime.NewScheme() err := v1alpha1.AddToScheme(scheme) @@ -7321,6 +7754,40 @@ func TestIsRollingSyncStrategy(t *testing.T) { } } +func TestFirstAppError(t *testing.T) { + errA := errors.New("error from app-a") + errB := errors.New("error from app-b") + errC := errors.New("error from app-c") + + t.Run("returns nil for empty map", func(t *testing.T) { + assert.NoError(t, firstAppError(map[string]error{})) + }) + + t.Run("returns the single error", func(t *testing.T) { + assert.ErrorIs(t, firstAppError(map[string]error{"app-a": errA}), errA) + }) + + t.Run("returns error from lexicographically first app name", func(t *testing.T) { + appErrors := map[string]error{ + "app-c": errC, + "app-a": errA, + "app-b": errB, + } + assert.ErrorIs(t, firstAppError(appErrors), errA) + }) + + t.Run("result is stable across multiple calls with same input", func(t *testing.T) { + appErrors := map[string]error{ + "app-c": errC, + "app-a": errA, + "app-b": errB, + } + for range 10 { + assert.ErrorIs(t, firstAppError(appErrors), errA, "firstAppError must return the same error on every call") + } + }) +} + func TestSyncApplication(t *testing.T) { tests := []struct { name string diff --git a/applicationset/utils/createOrUpdate.go b/applicationset/utils/createOrUpdate.go index 1367e11cde..d06cb50acf 100644 --- a/applicationset/utils/createOrUpdate.go +++ b/applicationset/utils/createOrUpdate.go @@ -24,6 +24,43 @@ import ( "github.com/argoproj/argo-cd/v3/util/argo/normalizers" ) +var appEquality = conversion.EqualitiesOrDie( + func(a, b resource.Quantity) bool { + // Ignore formatting, only care that numeric value stayed the same. + // TODO: if we decide it's important, it should be safe to start comparing the format. + // + // Uninitialized quantities are equivalent to 0 quantities. + return a.Cmp(b) == 0 + }, + func(a, b metav1.MicroTime) bool { + return a.UTC().Equal(b.UTC()) + }, + func(a, b metav1.Time) bool { + return a.UTC().Equal(b.UTC()) + }, + func(a, b labels.Selector) bool { + return a.String() == b.String() + }, + func(a, b fields.Selector) bool { + return a.String() == b.String() + }, + func(a, b argov1alpha1.ApplicationDestination) bool { + return a.Namespace == b.Namespace && a.Name == b.Name && a.Server == b.Server + }, +) + +// BuildIgnoreDiffConfig constructs a DiffConfig from the ApplicationSet's ignoreDifferences rules. +// Returns nil when ignoreDifferences is empty. +func BuildIgnoreDiffConfig(ignoreDifferences argov1alpha1.ApplicationSetIgnoreDifferences, ignoreNormalizerOpts normalizers.IgnoreNormalizerOpts) (argodiff.DiffConfig, error) { + if len(ignoreDifferences) == 0 { + return nil, nil + } + return argodiff.NewDiffConfigBuilder(). + WithDiffSettings(ignoreDifferences.ToApplicationIgnoreDifferences(), nil, false, ignoreNormalizerOpts). + WithNoCache(). + Build() +} + // CreateOrUpdate overrides "sigs.k8s.io/controller-runtime" function // in sigs.k8s.io/controller-runtime/pkg/controller/controllerutil/controllerutil.go // to add equality for argov1alpha1.ApplicationDestination @@ -34,10 +71,15 @@ import ( // cluster. The object's desired state must be reconciled with the existing // state inside the passed in callback MutateFn. // +// diffConfig must be built once per reconcile cycle via BuildIgnoreDiffConfig and may be nil +// when there are no ignoreDifferences rules. obj.Spec must already be normalized by the caller +// via NormalizeApplicationSpec before this function is called; the live object fetched from the +// cluster is normalized internally. +// // The MutateFn is called regardless of creating or updating an object. // // It returns the executed operation and an error. -func CreateOrUpdate(ctx context.Context, logCtx *log.Entry, c client.Client, ignoreAppDifferences argov1alpha1.ApplicationSetIgnoreDifferences, ignoreNormalizerOpts normalizers.IgnoreNormalizerOpts, obj *argov1alpha1.Application, f controllerutil.MutateFn) (controllerutil.OperationResult, error) { +func CreateOrUpdate(ctx context.Context, logCtx *log.Entry, c client.Client, diffConfig argodiff.DiffConfig, obj *argov1alpha1.Application, f controllerutil.MutateFn) (controllerutil.OperationResult, error) { key := client.ObjectKeyFromObject(obj) if err := c.Get(ctx, key, obj); err != nil { if !errors.IsNotFound(err) { @@ -59,43 +101,18 @@ func CreateOrUpdate(ctx context.Context, logCtx *log.Entry, c client.Client, ign return controllerutil.OperationResultNone, err } + // Normalize the live spec to avoid spurious diffs from unimportant differences (e.g. nil vs + // empty SyncPolicy). obj.Spec is already normalized by the caller; only the live side needs it. + normalizedLive.Spec = *argo.NormalizeApplicationSpec(&normalizedLive.Spec) + // Apply ignoreApplicationDifferences rules to remove ignored fields from both the live and the desired state. This // prevents those differences from appearing in the diff and therefore in the patch. - err := applyIgnoreDifferences(ignoreAppDifferences, normalizedLive, obj, ignoreNormalizerOpts) + err := applyIgnoreDifferences(diffConfig, normalizedLive, obj) if err != nil { return controllerutil.OperationResultNone, fmt.Errorf("failed to apply ignore differences: %w", err) } - // Normalize to avoid diffing on unimportant differences. - normalizedLive.Spec = *argo.NormalizeApplicationSpec(&normalizedLive.Spec) - obj.Spec = *argo.NormalizeApplicationSpec(&obj.Spec) - - equality := conversion.EqualitiesOrDie( - func(a, b resource.Quantity) bool { - // Ignore formatting, only care that numeric value stayed the same. - // TODO: if we decide it's important, it should be safe to start comparing the format. - // - // Uninitialized quantities are equivalent to 0 quantities. - return a.Cmp(b) == 0 - }, - func(a, b metav1.MicroTime) bool { - return a.UTC().Equal(b.UTC()) - }, - func(a, b metav1.Time) bool { - return a.UTC().Equal(b.UTC()) - }, - func(a, b labels.Selector) bool { - return a.String() == b.String() - }, - func(a, b fields.Selector) bool { - return a.String() == b.String() - }, - func(a, b argov1alpha1.ApplicationDestination) bool { - return a.Namespace == b.Namespace && a.Name == b.Name && a.Server == b.Server - }, - ) - - if equality.DeepEqual(normalizedLive, obj) { + if appEquality.DeepEqual(normalizedLive, obj) { return controllerutil.OperationResultNone, nil } @@ -135,19 +152,13 @@ func mutate(f controllerutil.MutateFn, key client.ObjectKey, obj client.Object) } // applyIgnoreDifferences applies the ignore differences rules to the found application. It modifies the applications in place. -func applyIgnoreDifferences(applicationSetIgnoreDifferences argov1alpha1.ApplicationSetIgnoreDifferences, found *argov1alpha1.Application, generatedApp *argov1alpha1.Application, ignoreNormalizerOpts normalizers.IgnoreNormalizerOpts) error { - if len(applicationSetIgnoreDifferences) == 0 { +// diffConfig may be nil, in which case this is a no-op. +func applyIgnoreDifferences(diffConfig argodiff.DiffConfig, found *argov1alpha1.Application, generatedApp *argov1alpha1.Application) error { + if diffConfig == nil { return nil } generatedAppCopy := generatedApp.DeepCopy() - diffConfig, err := argodiff.NewDiffConfigBuilder(). - WithDiffSettings(applicationSetIgnoreDifferences.ToApplicationIgnoreDifferences(), nil, false, ignoreNormalizerOpts). - WithNoCache(). - Build() - if err != nil { - return fmt.Errorf("failed to build diff config: %w", err) - } unstructuredFound, err := appToUnstructured(found) if err != nil { return fmt.Errorf("failed to convert found application to unstructured: %w", err) diff --git a/applicationset/utils/createOrUpdate_test.go b/applicationset/utils/createOrUpdate_test.go index 374eb826ea..bc06f5e1dd 100644 --- a/applicationset/utils/createOrUpdate_test.go +++ b/applicationset/utils/createOrUpdate_test.go @@ -224,7 +224,9 @@ spec: generatedApp := v1alpha1.Application{TypeMeta: appMeta} err = yaml.Unmarshal([]byte(tc.generatedApp), &generatedApp) require.NoError(t, err, tc.generatedApp) - err = applyIgnoreDifferences(tc.ignoreDifferences, &foundApp, &generatedApp, normalizers.IgnoreNormalizerOpts{}) + diffConfig, err := BuildIgnoreDiffConfig(tc.ignoreDifferences, normalizers.IgnoreNormalizerOpts{}) + require.NoError(t, err) + err = applyIgnoreDifferences(diffConfig, &foundApp, &generatedApp) require.NoError(t, err) yamlFound, err := yaml.Marshal(tc.foundApp) require.NoError(t, err) diff --git a/cmd/argocd-applicationset-controller/commands/applicationset_controller.go b/cmd/argocd-applicationset-controller/commands/applicationset_controller.go index 088f8c7326..3f20c27214 100644 --- a/cmd/argocd-applicationset-controller/commands/applicationset_controller.go +++ b/cmd/argocd-applicationset-controller/commands/applicationset_controller.go @@ -79,6 +79,7 @@ func NewCommand() *cobra.Command { tokenRefStrictMode bool maxResourcesStatusCount int cacheSyncPeriod time.Duration + concurrentApplicationUpdates int ) scheme := runtime.NewScheme() _ = clientgoscheme.AddToScheme(scheme) @@ -239,24 +240,25 @@ func NewCommand() *cobra.Command { }) if err = (&controllers.ApplicationSetReconciler{ - Generators: topLevelGenerators, - Client: utils.NewCacheSyncingClient(mgr.GetClient(), mgr.GetCache()), - Scheme: mgr.GetScheme(), - Recorder: mgr.GetEventRecorderFor("applicationset-controller"), - Renderer: &utils.Render{}, - Policy: policyObj, - EnablePolicyOverride: enablePolicyOverride, - KubeClientset: k8sClient, - ArgoDB: argoCDDB, - ArgoCDNamespace: namespace, - ApplicationSetNamespaces: applicationSetNamespaces, - EnableProgressiveSyncs: enableProgressiveSyncs, - SCMRootCAPath: scmRootCAPath, - GlobalPreservedAnnotations: globalPreservedAnnotations, - GlobalPreservedLabels: globalPreservedLabels, - Metrics: &metrics, - MaxResourcesStatusCount: maxResourcesStatusCount, - ClusterInformer: clusterInformer, + Generators: topLevelGenerators, + Client: utils.NewCacheSyncingClient(mgr.GetClient(), mgr.GetCache()), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor("applicationset-controller"), + Renderer: &utils.Render{}, + Policy: policyObj, + EnablePolicyOverride: enablePolicyOverride, + KubeClientset: k8sClient, + ArgoDB: argoCDDB, + ArgoCDNamespace: namespace, + ApplicationSetNamespaces: applicationSetNamespaces, + EnableProgressiveSyncs: enableProgressiveSyncs, + SCMRootCAPath: scmRootCAPath, + GlobalPreservedAnnotations: globalPreservedAnnotations, + GlobalPreservedLabels: globalPreservedLabels, + Metrics: &metrics, + MaxResourcesStatusCount: maxResourcesStatusCount, + ClusterInformer: clusterInformer, + ConcurrentApplicationUpdates: concurrentApplicationUpdates, }).SetupWithManager(mgr, enableProgressiveSyncs, maxConcurrentReconciliations); err != nil { log.Error(err, "unable to create controller", "controller", "ApplicationSet") os.Exit(1) @@ -303,6 +305,7 @@ func NewCommand() *cobra.Command { command.Flags().BoolVar(&enableGitHubAPIMetrics, "enable-github-api-metrics", env.ParseBoolFromEnv("ARGOCD_APPLICATIONSET_CONTROLLER_ENABLE_GITHUB_API_METRICS", false), "Enable GitHub API metrics for generators that use the GitHub API") command.Flags().IntVar(&maxResourcesStatusCount, "max-resources-status-count", env.ParseNumFromEnv("ARGOCD_APPLICATIONSET_CONTROLLER_MAX_RESOURCES_STATUS_COUNT", 5000, 0, math.MaxInt), "Max number of resources stored in appset status.") command.Flags().DurationVar(&cacheSyncPeriod, "cache-sync-period", env.ParseDurationFromEnv("ARGOCD_APPLICATIONSET_CONTROLLER_CACHE_SYNC_PERIOD", time.Hour*10, 0, time.Hour*24), "Period at which the manager client cache is forcefully resynced with the Kubernetes API server. 0 disables periodic resync.") + command.Flags().IntVar(&concurrentApplicationUpdates, "concurrent-application-updates", env.ParseNumFromEnv("ARGOCD_APPLICATIONSET_CONTROLLER_CONCURRENT_APPLICATION_UPDATES", 1, 1, 200), "Number of concurrent Application create/update/delete operations per ApplicationSet reconcile.") return &command } diff --git a/cmd/argocd-applicationset-controller/commands/applicationset_controller_test.go b/cmd/argocd-applicationset-controller/commands/applicationset_controller_test.go new file mode 100644 index 0000000000..346502958f --- /dev/null +++ b/cmd/argocd-applicationset-controller/commands/applicationset_controller_test.go @@ -0,0 +1,28 @@ +package command + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewCommand_ConcurrentApplicationUpdatesFlag(t *testing.T) { + cmd := NewCommand() + + flag := cmd.Flags().Lookup("concurrent-application-updates") + require.NotNil(t, flag, "expected --concurrent-application-updates flag to be registered") + assert.Equal(t, "int", flag.Value.Type()) + assert.Equal(t, "1", flag.DefValue, "default should be 1") +} + +func TestNewCommand_ConcurrentApplicationUpdatesFlagValue(t *testing.T) { + cmd := NewCommand() + + err := cmd.Flags().Set("concurrent-application-updates", "5") + require.NoError(t, err) + + val, err := cmd.Flags().GetInt("concurrent-application-updates") + require.NoError(t, err) + assert.Equal(t, 5, val) +} diff --git a/docs/operator-manual/server-commands/argocd-applicationset-controller.md b/docs/operator-manual/server-commands/argocd-applicationset-controller.md index 836598c7de..ee720b49b7 100644 --- a/docs/operator-manual/server-commands/argocd-applicationset-controller.md +++ b/docs/operator-manual/server-commands/argocd-applicationset-controller.md @@ -22,6 +22,7 @@ argocd-applicationset-controller [flags] --client-certificate string Path to a client certificate file for TLS --client-key string Path to a client key file for TLS --cluster string The name of the kubeconfig cluster to use + --concurrent-application-updates int Number of concurrent Application create/update/delete operations per ApplicationSet reconcile. (default 1) --concurrent-reconciliations int Max concurrent reconciliations limit for the controller (default 10) --context string The name of the kubeconfig context to use --debug Print debug logs. Takes precedence over loglevel diff --git a/gitops-engine/pkg/cache/mocks/ClusterCache.go b/gitops-engine/pkg/cache/mocks/ClusterCache.go index dbf8241760..ba5c0b4754 100644 --- a/gitops-engine/pkg/cache/mocks/ClusterCache.go +++ b/gitops-engine/pkg/cache/mocks/ClusterCache.go @@ -563,7 +563,7 @@ func (_c *ClusterCache_IsNamespaced_Call) RunAndReturn(run func(gk schema.GroupK return _c } -// IterateHierarchyV2 provides a mock function with given fields: keys, action, orphanedResourceNamespace +// IterateHierarchyV2 provides a mock function for the type ClusterCache func (_mock *ClusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *cache.Resource, namespaceResources map[kube.ResourceKey]*cache.Resource) bool) { _mock.Called(keys, action) return diff --git a/util/settings/cluster_informer_test.go b/util/settings/cluster_informer_test.go index dfe62cf617..fab27ec4f2 100644 --- a/util/settings/cluster_informer_test.go +++ b/util/settings/cluster_informer_test.go @@ -734,10 +734,13 @@ func TestClusterInformer_SecretDeletion(t *testing.T) { err = clientset.CoreV1().Secrets("argocd").Delete(t.Context(), "cluster1", metav1.DeleteOptions{}) require.NoError(t, err) - time.Sleep(100 * time.Millisecond) + require.Eventually(t, func() bool { + _, err := informer.GetClusterByURL("https://cluster1.example.com") + return err != nil + }, 5*time.Second, 10*time.Millisecond, "expected cluster1 to be removed from cache after secret deletion") _, err = informer.GetClusterByURL("https://cluster1.example.com") - assert.Error(t, err) + require.Error(t, err) assert.Contains(t, err.Error(), "not found") cluster2, err := informer.GetClusterByURL("https://cluster2.example.com")