mirror of
https://github.com/argoproj/argo-cd
synced 2026-04-21 08:57:17 +00:00
feat(appset): add concurrency when managing applications (#26642)
Signed-off-by: rumstead <37445536+rumstead@users.noreply.github.com> Signed-off-by: Alexandre Gaudreault <alexandre_gaudreault@intuit.com> Co-authored-by: Alexandre Gaudreault <alexandre_gaudreault@intuit.com>
This commit is contained in:
parent
5ceb8354e6
commit
5b3073986f
10 changed files with 767 additions and 176 deletions
2
Procfile
2
Procfile
|
|
@ -2,7 +2,7 @@ controller: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run
|
|||
api-server: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "GOCOVERDIR=${ARGOCD_COVERAGE_DIR:-/tmp/coverage/api-server} FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-server $COMMAND --loglevel debug --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --disable-auth=${ARGOCD_E2E_DISABLE_AUTH:-'true'} --insecure --dex-server http://localhost:${ARGOCD_E2E_DEX_PORT:-5556} --repo-server localhost:${ARGOCD_E2E_REPOSERVER_PORT:-8081} --port ${ARGOCD_E2E_APISERVER_PORT:-8080} --otlp-address=${ARGOCD_OTLP_ADDRESS} --application-namespaces=${ARGOCD_APPLICATION_NAMESPACES:-''} --hydrator-enabled=${ARGOCD_HYDRATOR_ENABLED:='false'}"
|
||||
dex: sh -c "ARGOCD_BINARY_NAME=argocd-dex go run github.com/argoproj/argo-cd/v3/cmd gendexcfg -o `pwd`/dist/dex.yaml && (test -f dist/dex.yaml || { echo 'Failed to generate dex configuration'; exit 1; }) && docker run --rm -p ${ARGOCD_E2E_DEX_PORT:-5556}:${ARGOCD_E2E_DEX_PORT:-5556} -v `pwd`/dist/dex.yaml:/dex.yaml ghcr.io/dexidp/dex:$(grep "image: ghcr.io/dexidp/dex:v2.45.0" manifests/base/dex/argocd-dex-server-deployment.yaml | cut -d':' -f3) dex serve /dex.yaml"
|
||||
redis: hack/start-redis-with-password.sh
|
||||
repo-server: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "export PATH=./dist:\$PATH && [ -n \"\$ARGOCD_GIT_CONFIG\" ] && export GIT_CONFIG_GLOBAL=\$ARGOCD_GIT_CONFIG && export GIT_CONFIG_NOSYSTEM=1; GOCOVERDIR=${ARGOCD_COVERAGE_DIR:-/tmp/coverage/repo-server} FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_GNUPGHOME=${ARGOCD_GNUPGHOME:-/tmp/argocd-local/gpg/keys} ARGOCD_PLUGINSOCKFILEPATH=${ARGOCD_PLUGINSOCKFILEPATH:-./test/cmp} ARGOCD_GPG_DATA_PATH=${ARGOCD_GPG_DATA_PATH:-/tmp/argocd-local/gpg/source} ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-repo-server ARGOCD_GPG_ENABLED=${ARGOCD_GPG_ENABLED:-false} $COMMAND --loglevel debug --port ${ARGOCD_E2E_REPOSERVER_PORT:-8081} --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --otlp-address=${ARGOCD_OTLP_ADDRESS}"
|
||||
repo-server: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "export PATH=\$(pwd)/dist:\$PATH && [ -n \"\$ARGOCD_GIT_CONFIG\" ] && export GIT_CONFIG_GLOBAL=\$ARGOCD_GIT_CONFIG && export GIT_CONFIG_NOSYSTEM=1; GOCOVERDIR=${ARGOCD_COVERAGE_DIR:-/tmp/coverage/repo-server} FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_GNUPGHOME=${ARGOCD_GNUPGHOME:-/tmp/argocd-local/gpg/keys} ARGOCD_PLUGINSOCKFILEPATH=${ARGOCD_PLUGINSOCKFILEPATH:-./test/cmp} ARGOCD_GPG_DATA_PATH=${ARGOCD_GPG_DATA_PATH:-/tmp/argocd-local/gpg/source} ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-repo-server ARGOCD_GPG_ENABLED=${ARGOCD_GPG_ENABLED:-false} $COMMAND --loglevel debug --port ${ARGOCD_E2E_REPOSERVER_PORT:-8081} --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --otlp-address=${ARGOCD_OTLP_ADDRESS}"
|
||||
cmp-server: [ "$ARGOCD_E2E_TEST" = 'true' ] && exit 0 || [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_BINARY_NAME=argocd-cmp-server ARGOCD_PLUGINSOCKFILEPATH=${ARGOCD_PLUGINSOCKFILEPATH:-./test/cmp} $COMMAND --config-dir-path ./test/cmp --loglevel debug --otlp-address=${ARGOCD_OTLP_ADDRESS}"
|
||||
commit-server: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "GOCOVERDIR=${ARGOCD_COVERAGE_DIR:-/tmp/coverage/commit-server} FORCE_LOG_COLORS=1 ARGOCD_BINARY_NAME=argocd-commit-server $COMMAND --loglevel debug --port ${ARGOCD_E2E_COMMITSERVER_PORT:-8086}"
|
||||
ui: sh -c 'cd ui && ${ARGOCD_E2E_YARN_CMD:-yarn} start'
|
||||
|
|
|
|||
|
|
@ -24,11 +24,13 @@ import (
|
|||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/sync/errgroup"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
|
@ -103,15 +105,16 @@ type ApplicationSetReconciler struct {
|
|||
Policy argov1alpha1.ApplicationsSyncPolicy
|
||||
EnablePolicyOverride bool
|
||||
utils.Renderer
|
||||
ArgoCDNamespace string
|
||||
ApplicationSetNamespaces []string
|
||||
EnableProgressiveSyncs bool
|
||||
SCMRootCAPath string
|
||||
GlobalPreservedAnnotations []string
|
||||
GlobalPreservedLabels []string
|
||||
Metrics *metrics.ApplicationsetMetrics
|
||||
MaxResourcesStatusCount int
|
||||
ClusterInformer *settings.ClusterInformer
|
||||
ArgoCDNamespace string
|
||||
ApplicationSetNamespaces []string
|
||||
EnableProgressiveSyncs bool
|
||||
SCMRootCAPath string
|
||||
GlobalPreservedAnnotations []string
|
||||
GlobalPreservedLabels []string
|
||||
Metrics *metrics.ApplicationsetMetrics
|
||||
MaxResourcesStatusCount int
|
||||
ClusterInformer *settings.ClusterInformer
|
||||
ConcurrentApplicationUpdates int
|
||||
}
|
||||
|
||||
// +kubebuilder:rbac:groups=argoproj.io,resources=applicationsets,verbs=get;list;watch;create;update;patch;delete
|
||||
|
|
@ -688,108 +691,133 @@ func (r *ApplicationSetReconciler) SetupWithManager(mgr ctrl.Manager, enableProg
|
|||
// - For existing application, it will call update
|
||||
// The function also adds owner reference to all applications, and uses it to delete them.
|
||||
func (r *ApplicationSetReconciler) createOrUpdateInCluster(ctx context.Context, logCtx *log.Entry, applicationSet argov1alpha1.ApplicationSet, desiredApplications []argov1alpha1.Application) error {
|
||||
var firstError error
|
||||
// Creates or updates the application in appList
|
||||
for _, generatedApp := range desiredApplications {
|
||||
appLog := logCtx.WithFields(applog.GetAppLogFields(&generatedApp))
|
||||
// Build the diff config once per reconcile.
|
||||
// Diff config is per applicationset, so generate it once for all applications
|
||||
diffConfig, err := utils.BuildIgnoreDiffConfig(applicationSet.Spec.IgnoreApplicationDifferences, normalizers.IgnoreNormalizerOpts{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to build ignore diff config: %w", err)
|
||||
}
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
concurrency := r.concurrency()
|
||||
g.SetLimit(concurrency)
|
||||
|
||||
var appErrorsMu sync.Mutex
|
||||
appErrors := map[string]error{}
|
||||
|
||||
for _, generatedApp := range desiredApplications {
|
||||
// Normalize to avoid fighting with the application controller.
|
||||
generatedApp.Spec = *argoutil.NormalizeApplicationSpec(&generatedApp.Spec)
|
||||
g.Go(func() error {
|
||||
appLog := logCtx.WithFields(applog.GetAppLogFields(&generatedApp))
|
||||
|
||||
found := &argov1alpha1.Application{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: generatedApp.Name,
|
||||
Namespace: generatedApp.Namespace,
|
||||
},
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: application.ApplicationKind,
|
||||
APIVersion: "argoproj.io/v1alpha1",
|
||||
},
|
||||
}
|
||||
|
||||
action, err := utils.CreateOrUpdate(ctx, appLog, r.Client, applicationSet.Spec.IgnoreApplicationDifferences, normalizers.IgnoreNormalizerOpts{}, found, func() error {
|
||||
// Copy only the Application/ObjectMeta fields that are significant, from the generatedApp
|
||||
found.Spec = generatedApp.Spec
|
||||
|
||||
// allow setting the Operation field to trigger a sync operation on an Application
|
||||
if generatedApp.Operation != nil {
|
||||
found.Operation = generatedApp.Operation
|
||||
found := &argov1alpha1.Application{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: generatedApp.Name,
|
||||
Namespace: generatedApp.Namespace,
|
||||
},
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: application.ApplicationKind,
|
||||
APIVersion: "argoproj.io/v1alpha1",
|
||||
},
|
||||
}
|
||||
|
||||
preservedAnnotations := make([]string, 0)
|
||||
preservedLabels := make([]string, 0)
|
||||
action, err := utils.CreateOrUpdate(ctx, appLog, r.Client, diffConfig, found, func() error {
|
||||
// Copy only the Application/ObjectMeta fields that are significant, from the generatedApp
|
||||
found.Spec = generatedApp.Spec
|
||||
|
||||
if applicationSet.Spec.PreservedFields != nil {
|
||||
preservedAnnotations = append(preservedAnnotations, applicationSet.Spec.PreservedFields.Annotations...)
|
||||
preservedLabels = append(preservedLabels, applicationSet.Spec.PreservedFields.Labels...)
|
||||
}
|
||||
|
||||
if len(r.GlobalPreservedAnnotations) > 0 {
|
||||
preservedAnnotations = append(preservedAnnotations, r.GlobalPreservedAnnotations...)
|
||||
}
|
||||
|
||||
if len(r.GlobalPreservedLabels) > 0 {
|
||||
preservedLabels = append(preservedLabels, r.GlobalPreservedLabels...)
|
||||
}
|
||||
|
||||
// Preserve specially treated argo cd annotations:
|
||||
// * https://github.com/argoproj/applicationset/issues/180
|
||||
// * https://github.com/argoproj/argo-cd/issues/10500
|
||||
preservedAnnotations = append(preservedAnnotations, defaultPreservedAnnotations...)
|
||||
|
||||
for _, key := range preservedAnnotations {
|
||||
if state, exists := found.Annotations[key]; exists {
|
||||
if generatedApp.Annotations == nil {
|
||||
generatedApp.Annotations = map[string]string{}
|
||||
}
|
||||
generatedApp.Annotations[key] = state
|
||||
// allow setting the Operation field to trigger a sync operation on an Application
|
||||
if generatedApp.Operation != nil {
|
||||
found.Operation = generatedApp.Operation
|
||||
}
|
||||
}
|
||||
|
||||
for _, key := range preservedLabels {
|
||||
if state, exists := found.Labels[key]; exists {
|
||||
if generatedApp.Labels == nil {
|
||||
generatedApp.Labels = map[string]string{}
|
||||
}
|
||||
generatedApp.Labels[key] = state
|
||||
preservedAnnotations := make([]string, 0)
|
||||
preservedLabels := make([]string, 0)
|
||||
|
||||
if applicationSet.Spec.PreservedFields != nil {
|
||||
preservedAnnotations = append(preservedAnnotations, applicationSet.Spec.PreservedFields.Annotations...)
|
||||
preservedLabels = append(preservedLabels, applicationSet.Spec.PreservedFields.Labels...)
|
||||
}
|
||||
}
|
||||
|
||||
// Preserve deleting finalizers and avoid diff conflicts
|
||||
for _, finalizer := range defaultPreservedFinalizers {
|
||||
for _, f := range found.Finalizers {
|
||||
// For finalizers, use prefix matching in case it contains "/" stages
|
||||
if strings.HasPrefix(f, finalizer) {
|
||||
generatedApp.Finalizers = append(generatedApp.Finalizers, f)
|
||||
if len(r.GlobalPreservedAnnotations) > 0 {
|
||||
preservedAnnotations = append(preservedAnnotations, r.GlobalPreservedAnnotations...)
|
||||
}
|
||||
|
||||
if len(r.GlobalPreservedLabels) > 0 {
|
||||
preservedLabels = append(preservedLabels, r.GlobalPreservedLabels...)
|
||||
}
|
||||
|
||||
// Preserve specially treated argo cd annotations:
|
||||
// * https://github.com/argoproj/applicationset/issues/180
|
||||
// * https://github.com/argoproj/argo-cd/issues/10500
|
||||
preservedAnnotations = append(preservedAnnotations, defaultPreservedAnnotations...)
|
||||
|
||||
for _, key := range preservedAnnotations {
|
||||
if state, exists := found.Annotations[key]; exists {
|
||||
if generatedApp.Annotations == nil {
|
||||
generatedApp.Annotations = map[string]string{}
|
||||
}
|
||||
generatedApp.Annotations[key] = state
|
||||
}
|
||||
}
|
||||
|
||||
for _, key := range preservedLabels {
|
||||
if state, exists := found.Labels[key]; exists {
|
||||
if generatedApp.Labels == nil {
|
||||
generatedApp.Labels = map[string]string{}
|
||||
}
|
||||
generatedApp.Labels[key] = state
|
||||
}
|
||||
}
|
||||
|
||||
// Preserve deleting finalizers and avoid diff conflicts
|
||||
for _, finalizer := range defaultPreservedFinalizers {
|
||||
for _, f := range found.Finalizers {
|
||||
// For finalizers, use prefix matching in case it contains "/" stages
|
||||
if strings.HasPrefix(f, finalizer) {
|
||||
generatedApp.Finalizers = append(generatedApp.Finalizers, f)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
found.Annotations = generatedApp.Annotations
|
||||
found.Labels = generatedApp.Labels
|
||||
found.Finalizers = generatedApp.Finalizers
|
||||
|
||||
return controllerutil.SetControllerReference(&applicationSet, found, r.Scheme)
|
||||
})
|
||||
if err != nil {
|
||||
appLog.WithError(err).WithField("action", action).Errorf("failed to %s Application", action)
|
||||
// If the context was canceled or its deadline exceeded, return the error so it propagates through g.Wait().
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return err
|
||||
}
|
||||
// For backwards compatibility with sequential behavior: continue processing other applications
|
||||
// but record the error keyed by app name so we can deterministically return the error from
|
||||
// the lexicographically first failing app, regardless of goroutine scheduling order.
|
||||
appErrorsMu.Lock()
|
||||
appErrors[generatedApp.Name] = err
|
||||
appErrorsMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
found.Annotations = generatedApp.Annotations
|
||||
found.Labels = generatedApp.Labels
|
||||
found.Finalizers = generatedApp.Finalizers
|
||||
|
||||
return controllerutil.SetControllerReference(&applicationSet, found, r.Scheme)
|
||||
if action != controllerutil.OperationResultNone {
|
||||
// Don't pollute etcd with "unchanged Application" events
|
||||
r.Recorder.Eventf(&applicationSet, corev1.EventTypeNormal, fmt.Sprint(action), "%s Application %q", action, generatedApp.Name)
|
||||
appLog.Logf(log.InfoLevel, "%s Application", action)
|
||||
} else {
|
||||
// "unchanged Application" can be inferred by Reconcile Complete with no action being listed
|
||||
// Or enable debug logging
|
||||
appLog.Logf(log.DebugLevel, "%s Application", action)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
appLog.WithError(err).WithField("action", action).Errorf("failed to %s Application", action)
|
||||
if firstError == nil {
|
||||
firstError = err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if action != controllerutil.OperationResultNone {
|
||||
// Don't pollute etcd with "unchanged Application" events
|
||||
r.Recorder.Eventf(&applicationSet, corev1.EventTypeNormal, fmt.Sprint(action), "%s Application %q", action, generatedApp.Name)
|
||||
appLog.Logf(log.InfoLevel, "%s Application", action)
|
||||
} else {
|
||||
// "unchanged Application" can be inferred by Reconcile Complete with no action being listed
|
||||
// Or enable debug logging
|
||||
appLog.Logf(log.DebugLevel, "%s Application", action)
|
||||
}
|
||||
}
|
||||
return firstError
|
||||
|
||||
if err := g.Wait(); errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return err
|
||||
}
|
||||
return firstAppError(appErrors)
|
||||
}
|
||||
|
||||
// createInCluster will filter from the desiredApplications only the application that needs to be created
|
||||
|
|
@ -849,36 +877,84 @@ func (r *ApplicationSetReconciler) deleteInCluster(ctx context.Context, logCtx *
|
|||
m[app.Name] = true
|
||||
}
|
||||
|
||||
// Delete apps that are not in m[string]bool
|
||||
var firstError error
|
||||
for _, app := range current {
|
||||
logCtx = logCtx.WithFields(applog.GetAppLogFields(&app))
|
||||
_, exists := m[app.Name]
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
concurrency := r.concurrency()
|
||||
g.SetLimit(concurrency)
|
||||
|
||||
if !exists {
|
||||
var appErrorsMu sync.Mutex
|
||||
appErrors := map[string]error{}
|
||||
|
||||
// Delete apps that are not in m[string]bool
|
||||
for _, app := range current {
|
||||
_, exists := m[app.Name]
|
||||
if exists {
|
||||
continue
|
||||
}
|
||||
appLogCtx := logCtx.WithFields(applog.GetAppLogFields(&app))
|
||||
g.Go(func() error {
|
||||
// Removes the Argo CD resources finalizer if the application contains an invalid target (eg missing cluster)
|
||||
err := r.removeFinalizerOnInvalidDestination(ctx, applicationSet, &app, clusterList, logCtx)
|
||||
err := r.removeFinalizerOnInvalidDestination(ctx, applicationSet, &app, clusterList, appLogCtx)
|
||||
if err != nil {
|
||||
logCtx.WithError(err).Error("failed to update Application")
|
||||
if firstError != nil {
|
||||
firstError = err
|
||||
appLogCtx.WithError(err).Error("failed to update Application")
|
||||
// If the context was canceled or its deadline exceeded, return the error so it propagates through g.Wait().
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
// For backwards compatibility with sequential behavior: continue processing other applications
|
||||
// but record the error keyed by app name so we can deterministically return the error from
|
||||
// the lexicographically first failing app, regardless of goroutine scheduling order.
|
||||
appErrorsMu.Lock()
|
||||
appErrors[app.Name] = err
|
||||
appErrorsMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
err = r.Delete(ctx, &app)
|
||||
if err != nil {
|
||||
logCtx.WithError(err).Error("failed to delete Application")
|
||||
if firstError != nil {
|
||||
firstError = err
|
||||
appLogCtx.WithError(err).Error("failed to delete Application")
|
||||
// If the context was canceled or its deadline exceeded, return the error so it propagates through g.Wait().
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
appErrorsMu.Lock()
|
||||
appErrors[app.Name] = err
|
||||
appErrorsMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
r.Recorder.Eventf(&applicationSet, corev1.EventTypeNormal, "Deleted", "Deleted Application %q", app.Name)
|
||||
logCtx.Log(log.InfoLevel, "Deleted application")
|
||||
}
|
||||
appLogCtx.Log(log.InfoLevel, "Deleted application")
|
||||
return nil
|
||||
})
|
||||
}
|
||||
return firstError
|
||||
|
||||
if err := g.Wait(); errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return err
|
||||
}
|
||||
return firstAppError(appErrors)
|
||||
}
|
||||
|
||||
// concurrency returns the configured number of concurrent application updates, defaulting to 1.
|
||||
func (r *ApplicationSetReconciler) concurrency() int {
|
||||
if r.ConcurrentApplicationUpdates <= 0 {
|
||||
return 1
|
||||
}
|
||||
return r.ConcurrentApplicationUpdates
|
||||
}
|
||||
|
||||
// firstAppError returns the error associated with the lexicographically smallest application name
|
||||
// in the provided map. This gives a deterministic result when multiple goroutines may have
|
||||
// recorded errors concurrently, matching the behavior of the original sequential loop where the
|
||||
// first application in iteration order would determine the returned error.
|
||||
func firstAppError(appErrors map[string]error) error {
|
||||
if len(appErrors) == 0 {
|
||||
return nil
|
||||
}
|
||||
names := make([]string, 0, len(appErrors))
|
||||
for name := range appErrors {
|
||||
names = append(names, name)
|
||||
}
|
||||
sort.Strings(names)
|
||||
return appErrors[names[0]]
|
||||
}
|
||||
|
||||
// removeFinalizerOnInvalidDestination removes the Argo CD resources finalizer if the application contains an invalid target (eg missing cluster)
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import (
|
|||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
crtclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||||
"sigs.k8s.io/controller-runtime/pkg/event"
|
||||
|
||||
|
|
@ -1077,6 +1078,70 @@ func TestCreateOrUpdateInCluster(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Ensure that unnormalized live spec does not cause a spurious patch",
|
||||
appSet: v1alpha1.ApplicationSet{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "name",
|
||||
Namespace: "namespace",
|
||||
},
|
||||
Spec: v1alpha1.ApplicationSetSpec{
|
||||
Template: v1alpha1.ApplicationSetTemplate{
|
||||
Spec: v1alpha1.ApplicationSpec{
|
||||
Project: "project",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
existingApps: []v1alpha1.Application{
|
||||
{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: application.ApplicationKind,
|
||||
APIVersion: "argoproj.io/v1alpha1",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "app1",
|
||||
Namespace: "namespace",
|
||||
ResourceVersion: "2",
|
||||
},
|
||||
Spec: v1alpha1.ApplicationSpec{
|
||||
Project: "project",
|
||||
// Without normalizing the live object, the equality check
|
||||
// sees &SyncPolicy{} vs nil and issues an unnecessary patch.
|
||||
SyncPolicy: &v1alpha1.SyncPolicy{},
|
||||
},
|
||||
},
|
||||
},
|
||||
desiredApps: []v1alpha1.Application{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "app1",
|
||||
Namespace: "namespace",
|
||||
},
|
||||
Spec: v1alpha1.ApplicationSpec{
|
||||
Project: "project",
|
||||
SyncPolicy: nil,
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: []v1alpha1.Application{
|
||||
{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: application.ApplicationKind,
|
||||
APIVersion: "argoproj.io/v1alpha1",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "app1",
|
||||
Namespace: "namespace",
|
||||
ResourceVersion: "2",
|
||||
},
|
||||
Spec: v1alpha1.ApplicationSpec{
|
||||
Project: "project",
|
||||
SyncPolicy: &v1alpha1.SyncPolicy{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Ensure that argocd pre-delete and post-delete finalizers are preserved from an existing app",
|
||||
appSet: v1alpha1.ApplicationSet{
|
||||
|
|
@ -1186,6 +1251,374 @@ func TestCreateOrUpdateInCluster(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCreateOrUpdateInCluster_Concurrent(t *testing.T) {
|
||||
scheme := runtime.NewScheme()
|
||||
err := v1alpha1.AddToScheme(scheme)
|
||||
require.NoError(t, err)
|
||||
|
||||
appSet := v1alpha1.ApplicationSet{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "name",
|
||||
Namespace: "namespace",
|
||||
},
|
||||
}
|
||||
|
||||
t.Run("all apps are created correctly with concurrency > 1", func(t *testing.T) {
|
||||
desiredApps := make([]v1alpha1.Application, 5)
|
||||
for i := range desiredApps {
|
||||
desiredApps[i] = v1alpha1.Application{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("app%d", i),
|
||||
Namespace: "namespace",
|
||||
},
|
||||
Spec: v1alpha1.ApplicationSpec{Project: "project"},
|
||||
}
|
||||
}
|
||||
|
||||
fakeClient := fake.NewClientBuilder().
|
||||
WithScheme(scheme).
|
||||
WithObjects(&appSet).
|
||||
WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer).
|
||||
Build()
|
||||
metrics := appsetmetrics.NewFakeAppsetMetrics()
|
||||
|
||||
r := ApplicationSetReconciler{
|
||||
Client: fakeClient,
|
||||
Scheme: scheme,
|
||||
Recorder: record.NewFakeRecorder(10),
|
||||
Metrics: metrics,
|
||||
ConcurrentApplicationUpdates: 5,
|
||||
}
|
||||
|
||||
err = r.createOrUpdateInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, desiredApps)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, desired := range desiredApps {
|
||||
got := &v1alpha1.Application{}
|
||||
require.NoError(t, fakeClient.Get(t.Context(), crtclient.ObjectKey{Namespace: desired.Namespace, Name: desired.Name}, got))
|
||||
assert.Equal(t, desired.Spec.Project, got.Spec.Project)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("non-context errors from concurrent goroutines are collected and one is returned", func(t *testing.T) {
|
||||
existingApps := make([]v1alpha1.Application, 5)
|
||||
initObjs := []crtclient.Object{&appSet}
|
||||
for i := range existingApps {
|
||||
existingApps[i] = v1alpha1.Application{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: application.ApplicationKind,
|
||||
APIVersion: "argoproj.io/v1alpha1",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("app%d", i),
|
||||
Namespace: "namespace",
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Spec: v1alpha1.ApplicationSpec{Project: "old"},
|
||||
}
|
||||
app := existingApps[i].DeepCopy()
|
||||
require.NoError(t, controllerutil.SetControllerReference(&appSet, app, scheme))
|
||||
initObjs = append(initObjs, app)
|
||||
}
|
||||
|
||||
desiredApps := make([]v1alpha1.Application, 5)
|
||||
for i := range desiredApps {
|
||||
desiredApps[i] = v1alpha1.Application{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("app%d", i),
|
||||
Namespace: "namespace",
|
||||
},
|
||||
Spec: v1alpha1.ApplicationSpec{Project: "new"},
|
||||
}
|
||||
}
|
||||
|
||||
patchErr := errors.New("some patch error")
|
||||
fakeClient := fake.NewClientBuilder().
|
||||
WithScheme(scheme).
|
||||
WithObjects(initObjs...).
|
||||
WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer).
|
||||
WithInterceptorFuncs(interceptor.Funcs{
|
||||
Patch: func(_ context.Context, _ crtclient.WithWatch, _ crtclient.Object, _ crtclient.Patch, _ ...crtclient.PatchOption) error {
|
||||
return patchErr
|
||||
},
|
||||
}).
|
||||
Build()
|
||||
metrics := appsetmetrics.NewFakeAppsetMetrics()
|
||||
|
||||
r := ApplicationSetReconciler{
|
||||
Client: fakeClient,
|
||||
Scheme: scheme,
|
||||
Recorder: record.NewFakeRecorder(10),
|
||||
Metrics: metrics,
|
||||
ConcurrentApplicationUpdates: 5,
|
||||
}
|
||||
|
||||
err = r.createOrUpdateInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, desiredApps)
|
||||
require.ErrorIs(t, err, patchErr)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCreateOrUpdateInCluster_ContextCancellation(t *testing.T) {
|
||||
scheme := runtime.NewScheme()
|
||||
err := v1alpha1.AddToScheme(scheme)
|
||||
require.NoError(t, err)
|
||||
|
||||
appSet := v1alpha1.ApplicationSet{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "name",
|
||||
Namespace: "namespace",
|
||||
},
|
||||
}
|
||||
existingApp := v1alpha1.Application{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: application.ApplicationKind,
|
||||
APIVersion: "argoproj.io/v1alpha1",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "app1",
|
||||
Namespace: "namespace",
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Spec: v1alpha1.ApplicationSpec{Project: "old"},
|
||||
}
|
||||
desiredApp := v1alpha1.Application{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "app1",
|
||||
Namespace: "namespace",
|
||||
},
|
||||
Spec: v1alpha1.ApplicationSpec{Project: "new"},
|
||||
}
|
||||
|
||||
t.Run("context canceled on patch is returned directly", func(t *testing.T) {
|
||||
initObjs := []crtclient.Object{&appSet}
|
||||
app := existingApp.DeepCopy()
|
||||
err = controllerutil.SetControllerReference(&appSet, app, scheme)
|
||||
require.NoError(t, err)
|
||||
initObjs = append(initObjs, app)
|
||||
|
||||
fakeClient := fake.NewClientBuilder().
|
||||
WithScheme(scheme).
|
||||
WithObjects(initObjs...).
|
||||
WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer).
|
||||
WithInterceptorFuncs(interceptor.Funcs{
|
||||
Patch: func(_ context.Context, _ crtclient.WithWatch, _ crtclient.Object, _ crtclient.Patch, _ ...crtclient.PatchOption) error {
|
||||
return context.Canceled
|
||||
},
|
||||
}).
|
||||
Build()
|
||||
metrics := appsetmetrics.NewFakeAppsetMetrics()
|
||||
|
||||
r := ApplicationSetReconciler{
|
||||
Client: fakeClient,
|
||||
Scheme: scheme,
|
||||
Recorder: record.NewFakeRecorder(10),
|
||||
Metrics: metrics,
|
||||
}
|
||||
|
||||
err = r.createOrUpdateInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, []v1alpha1.Application{desiredApp})
|
||||
require.ErrorIs(t, err, context.Canceled)
|
||||
})
|
||||
|
||||
t.Run("context deadline exceeded on patch is returned directly", func(t *testing.T) {
|
||||
initObjs := []crtclient.Object{&appSet}
|
||||
app := existingApp.DeepCopy()
|
||||
err = controllerutil.SetControllerReference(&appSet, app, scheme)
|
||||
require.NoError(t, err)
|
||||
initObjs = append(initObjs, app)
|
||||
|
||||
fakeClient := fake.NewClientBuilder().
|
||||
WithScheme(scheme).
|
||||
WithObjects(initObjs...).
|
||||
WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer).
|
||||
WithInterceptorFuncs(interceptor.Funcs{
|
||||
Patch: func(_ context.Context, _ crtclient.WithWatch, _ crtclient.Object, _ crtclient.Patch, _ ...crtclient.PatchOption) error {
|
||||
return context.DeadlineExceeded
|
||||
},
|
||||
}).
|
||||
Build()
|
||||
metrics := appsetmetrics.NewFakeAppsetMetrics()
|
||||
|
||||
r := ApplicationSetReconciler{
|
||||
Client: fakeClient,
|
||||
Scheme: scheme,
|
||||
Recorder: record.NewFakeRecorder(10),
|
||||
Metrics: metrics,
|
||||
}
|
||||
|
||||
err = r.createOrUpdateInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, []v1alpha1.Application{desiredApp})
|
||||
require.ErrorIs(t, err, context.DeadlineExceeded)
|
||||
})
|
||||
|
||||
t.Run("non-context error is collected and returned after all goroutines finish", func(t *testing.T) {
|
||||
initObjs := []crtclient.Object{&appSet}
|
||||
app := existingApp.DeepCopy()
|
||||
err = controllerutil.SetControllerReference(&appSet, app, scheme)
|
||||
require.NoError(t, err)
|
||||
initObjs = append(initObjs, app)
|
||||
|
||||
patchErr := errors.New("some patch error")
|
||||
fakeClient := fake.NewClientBuilder().
|
||||
WithScheme(scheme).
|
||||
WithObjects(initObjs...).
|
||||
WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer).
|
||||
WithInterceptorFuncs(interceptor.Funcs{
|
||||
Patch: func(_ context.Context, _ crtclient.WithWatch, _ crtclient.Object, _ crtclient.Patch, _ ...crtclient.PatchOption) error {
|
||||
return patchErr
|
||||
},
|
||||
}).
|
||||
Build()
|
||||
metrics := appsetmetrics.NewFakeAppsetMetrics()
|
||||
|
||||
r := ApplicationSetReconciler{
|
||||
Client: fakeClient,
|
||||
Scheme: scheme,
|
||||
Recorder: record.NewFakeRecorder(10),
|
||||
Metrics: metrics,
|
||||
}
|
||||
|
||||
err = r.createOrUpdateInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, []v1alpha1.Application{desiredApp})
|
||||
require.ErrorIs(t, err, patchErr)
|
||||
})
|
||||
|
||||
t.Run("context canceled on create is returned directly", func(t *testing.T) {
|
||||
initObjs := []crtclient.Object{&appSet}
|
||||
|
||||
fakeClient := fake.NewClientBuilder().
|
||||
WithScheme(scheme).
|
||||
WithObjects(initObjs...).
|
||||
WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer).
|
||||
WithInterceptorFuncs(interceptor.Funcs{
|
||||
Create: func(_ context.Context, _ crtclient.WithWatch, _ crtclient.Object, _ ...crtclient.CreateOption) error {
|
||||
return context.Canceled
|
||||
},
|
||||
}).
|
||||
Build()
|
||||
metrics := appsetmetrics.NewFakeAppsetMetrics()
|
||||
|
||||
r := ApplicationSetReconciler{
|
||||
Client: fakeClient,
|
||||
Scheme: scheme,
|
||||
Recorder: record.NewFakeRecorder(10),
|
||||
Metrics: metrics,
|
||||
}
|
||||
|
||||
newApp := v1alpha1.Application{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "newapp", Namespace: "namespace"},
|
||||
Spec: v1alpha1.ApplicationSpec{Project: "default"},
|
||||
}
|
||||
err = r.createOrUpdateInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, []v1alpha1.Application{newApp})
|
||||
require.ErrorIs(t, err, context.Canceled)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDeleteInCluster_ContextCancellation(t *testing.T) {
|
||||
scheme := runtime.NewScheme()
|
||||
err := v1alpha1.AddToScheme(scheme)
|
||||
require.NoError(t, err)
|
||||
err = corev1.AddToScheme(scheme)
|
||||
require.NoError(t, err)
|
||||
|
||||
appSet := v1alpha1.ApplicationSet{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "name",
|
||||
Namespace: "namespace",
|
||||
},
|
||||
}
|
||||
existingApp := v1alpha1.Application{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: application.ApplicationKind,
|
||||
APIVersion: "argoproj.io/v1alpha1",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "delete-me",
|
||||
Namespace: "namespace",
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Spec: v1alpha1.ApplicationSpec{Project: "project"},
|
||||
}
|
||||
|
||||
makeReconciler := func(t *testing.T, fakeClient crtclient.Client) ApplicationSetReconciler {
|
||||
t.Helper()
|
||||
kubeclientset := kubefake.NewClientset()
|
||||
clusterInformer, err := settings.NewClusterInformer(kubeclientset, "namespace")
|
||||
require.NoError(t, err)
|
||||
cancel := startAndSyncInformer(t, clusterInformer)
|
||||
t.Cleanup(cancel)
|
||||
return ApplicationSetReconciler{
|
||||
Client: fakeClient,
|
||||
Scheme: scheme,
|
||||
Recorder: record.NewFakeRecorder(10),
|
||||
KubeClientset: kubeclientset,
|
||||
Metrics: appsetmetrics.NewFakeAppsetMetrics(),
|
||||
ClusterInformer: clusterInformer,
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("context canceled on delete is returned directly", func(t *testing.T) {
|
||||
app := existingApp.DeepCopy()
|
||||
err = controllerutil.SetControllerReference(&appSet, app, scheme)
|
||||
require.NoError(t, err)
|
||||
|
||||
fakeClient := fake.NewClientBuilder().
|
||||
WithScheme(scheme).
|
||||
WithObjects(&appSet, app).
|
||||
WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer).
|
||||
WithInterceptorFuncs(interceptor.Funcs{
|
||||
Delete: func(_ context.Context, _ crtclient.WithWatch, _ crtclient.Object, _ ...crtclient.DeleteOption) error {
|
||||
return context.Canceled
|
||||
},
|
||||
}).
|
||||
Build()
|
||||
|
||||
r := makeReconciler(t, fakeClient)
|
||||
err = r.deleteInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, []v1alpha1.Application{})
|
||||
require.ErrorIs(t, err, context.Canceled)
|
||||
})
|
||||
|
||||
t.Run("context deadline exceeded on delete is returned directly", func(t *testing.T) {
|
||||
app := existingApp.DeepCopy()
|
||||
err = controllerutil.SetControllerReference(&appSet, app, scheme)
|
||||
require.NoError(t, err)
|
||||
|
||||
fakeClient := fake.NewClientBuilder().
|
||||
WithScheme(scheme).
|
||||
WithObjects(&appSet, app).
|
||||
WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer).
|
||||
WithInterceptorFuncs(interceptor.Funcs{
|
||||
Delete: func(_ context.Context, _ crtclient.WithWatch, _ crtclient.Object, _ ...crtclient.DeleteOption) error {
|
||||
return context.DeadlineExceeded
|
||||
},
|
||||
}).
|
||||
Build()
|
||||
|
||||
r := makeReconciler(t, fakeClient)
|
||||
err = r.deleteInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, []v1alpha1.Application{})
|
||||
require.ErrorIs(t, err, context.DeadlineExceeded)
|
||||
})
|
||||
|
||||
t.Run("non-context delete error is collected and returned", func(t *testing.T) {
|
||||
app := existingApp.DeepCopy()
|
||||
err = controllerutil.SetControllerReference(&appSet, app, scheme)
|
||||
require.NoError(t, err)
|
||||
|
||||
deleteErr := errors.New("delete failed")
|
||||
fakeClient := fake.NewClientBuilder().
|
||||
WithScheme(scheme).
|
||||
WithObjects(&appSet, app).
|
||||
WithIndex(&v1alpha1.Application{}, ".metadata.controller", appControllerIndexer).
|
||||
WithInterceptorFuncs(interceptor.Funcs{
|
||||
Delete: func(_ context.Context, _ crtclient.WithWatch, _ crtclient.Object, _ ...crtclient.DeleteOption) error {
|
||||
return deleteErr
|
||||
},
|
||||
}).
|
||||
Build()
|
||||
|
||||
r := makeReconciler(t, fakeClient)
|
||||
err = r.deleteInCluster(t.Context(), log.NewEntry(log.StandardLogger()), appSet, []v1alpha1.Application{})
|
||||
require.ErrorIs(t, err, deleteErr)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRemoveFinalizerOnInvalidDestination_FinalizerTypes(t *testing.T) {
|
||||
scheme := runtime.NewScheme()
|
||||
err := v1alpha1.AddToScheme(scheme)
|
||||
|
|
@ -7321,6 +7754,40 @@ func TestIsRollingSyncStrategy(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestFirstAppError(t *testing.T) {
|
||||
errA := errors.New("error from app-a")
|
||||
errB := errors.New("error from app-b")
|
||||
errC := errors.New("error from app-c")
|
||||
|
||||
t.Run("returns nil for empty map", func(t *testing.T) {
|
||||
assert.NoError(t, firstAppError(map[string]error{}))
|
||||
})
|
||||
|
||||
t.Run("returns the single error", func(t *testing.T) {
|
||||
assert.ErrorIs(t, firstAppError(map[string]error{"app-a": errA}), errA)
|
||||
})
|
||||
|
||||
t.Run("returns error from lexicographically first app name", func(t *testing.T) {
|
||||
appErrors := map[string]error{
|
||||
"app-c": errC,
|
||||
"app-a": errA,
|
||||
"app-b": errB,
|
||||
}
|
||||
assert.ErrorIs(t, firstAppError(appErrors), errA)
|
||||
})
|
||||
|
||||
t.Run("result is stable across multiple calls with same input", func(t *testing.T) {
|
||||
appErrors := map[string]error{
|
||||
"app-c": errC,
|
||||
"app-a": errA,
|
||||
"app-b": errB,
|
||||
}
|
||||
for range 10 {
|
||||
assert.ErrorIs(t, firstAppError(appErrors), errA, "firstAppError must return the same error on every call")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestSyncApplication(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
|
|
|||
|
|
@ -24,6 +24,43 @@ import (
|
|||
"github.com/argoproj/argo-cd/v3/util/argo/normalizers"
|
||||
)
|
||||
|
||||
var appEquality = conversion.EqualitiesOrDie(
|
||||
func(a, b resource.Quantity) bool {
|
||||
// Ignore formatting, only care that numeric value stayed the same.
|
||||
// TODO: if we decide it's important, it should be safe to start comparing the format.
|
||||
//
|
||||
// Uninitialized quantities are equivalent to 0 quantities.
|
||||
return a.Cmp(b) == 0
|
||||
},
|
||||
func(a, b metav1.MicroTime) bool {
|
||||
return a.UTC().Equal(b.UTC())
|
||||
},
|
||||
func(a, b metav1.Time) bool {
|
||||
return a.UTC().Equal(b.UTC())
|
||||
},
|
||||
func(a, b labels.Selector) bool {
|
||||
return a.String() == b.String()
|
||||
},
|
||||
func(a, b fields.Selector) bool {
|
||||
return a.String() == b.String()
|
||||
},
|
||||
func(a, b argov1alpha1.ApplicationDestination) bool {
|
||||
return a.Namespace == b.Namespace && a.Name == b.Name && a.Server == b.Server
|
||||
},
|
||||
)
|
||||
|
||||
// BuildIgnoreDiffConfig constructs a DiffConfig from the ApplicationSet's ignoreDifferences rules.
|
||||
// Returns nil when ignoreDifferences is empty.
|
||||
func BuildIgnoreDiffConfig(ignoreDifferences argov1alpha1.ApplicationSetIgnoreDifferences, ignoreNormalizerOpts normalizers.IgnoreNormalizerOpts) (argodiff.DiffConfig, error) {
|
||||
if len(ignoreDifferences) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return argodiff.NewDiffConfigBuilder().
|
||||
WithDiffSettings(ignoreDifferences.ToApplicationIgnoreDifferences(), nil, false, ignoreNormalizerOpts).
|
||||
WithNoCache().
|
||||
Build()
|
||||
}
|
||||
|
||||
// CreateOrUpdate overrides "sigs.k8s.io/controller-runtime" function
|
||||
// in sigs.k8s.io/controller-runtime/pkg/controller/controllerutil/controllerutil.go
|
||||
// to add equality for argov1alpha1.ApplicationDestination
|
||||
|
|
@ -34,10 +71,15 @@ import (
|
|||
// cluster. The object's desired state must be reconciled with the existing
|
||||
// state inside the passed in callback MutateFn.
|
||||
//
|
||||
// diffConfig must be built once per reconcile cycle via BuildIgnoreDiffConfig and may be nil
|
||||
// when there are no ignoreDifferences rules. obj.Spec must already be normalized by the caller
|
||||
// via NormalizeApplicationSpec before this function is called; the live object fetched from the
|
||||
// cluster is normalized internally.
|
||||
//
|
||||
// The MutateFn is called regardless of creating or updating an object.
|
||||
//
|
||||
// It returns the executed operation and an error.
|
||||
func CreateOrUpdate(ctx context.Context, logCtx *log.Entry, c client.Client, ignoreAppDifferences argov1alpha1.ApplicationSetIgnoreDifferences, ignoreNormalizerOpts normalizers.IgnoreNormalizerOpts, obj *argov1alpha1.Application, f controllerutil.MutateFn) (controllerutil.OperationResult, error) {
|
||||
func CreateOrUpdate(ctx context.Context, logCtx *log.Entry, c client.Client, diffConfig argodiff.DiffConfig, obj *argov1alpha1.Application, f controllerutil.MutateFn) (controllerutil.OperationResult, error) {
|
||||
key := client.ObjectKeyFromObject(obj)
|
||||
if err := c.Get(ctx, key, obj); err != nil {
|
||||
if !errors.IsNotFound(err) {
|
||||
|
|
@ -59,43 +101,18 @@ func CreateOrUpdate(ctx context.Context, logCtx *log.Entry, c client.Client, ign
|
|||
return controllerutil.OperationResultNone, err
|
||||
}
|
||||
|
||||
// Normalize the live spec to avoid spurious diffs from unimportant differences (e.g. nil vs
|
||||
// empty SyncPolicy). obj.Spec is already normalized by the caller; only the live side needs it.
|
||||
normalizedLive.Spec = *argo.NormalizeApplicationSpec(&normalizedLive.Spec)
|
||||
|
||||
// Apply ignoreApplicationDifferences rules to remove ignored fields from both the live and the desired state. This
|
||||
// prevents those differences from appearing in the diff and therefore in the patch.
|
||||
err := applyIgnoreDifferences(ignoreAppDifferences, normalizedLive, obj, ignoreNormalizerOpts)
|
||||
err := applyIgnoreDifferences(diffConfig, normalizedLive, obj)
|
||||
if err != nil {
|
||||
return controllerutil.OperationResultNone, fmt.Errorf("failed to apply ignore differences: %w", err)
|
||||
}
|
||||
|
||||
// Normalize to avoid diffing on unimportant differences.
|
||||
normalizedLive.Spec = *argo.NormalizeApplicationSpec(&normalizedLive.Spec)
|
||||
obj.Spec = *argo.NormalizeApplicationSpec(&obj.Spec)
|
||||
|
||||
equality := conversion.EqualitiesOrDie(
|
||||
func(a, b resource.Quantity) bool {
|
||||
// Ignore formatting, only care that numeric value stayed the same.
|
||||
// TODO: if we decide it's important, it should be safe to start comparing the format.
|
||||
//
|
||||
// Uninitialized quantities are equivalent to 0 quantities.
|
||||
return a.Cmp(b) == 0
|
||||
},
|
||||
func(a, b metav1.MicroTime) bool {
|
||||
return a.UTC().Equal(b.UTC())
|
||||
},
|
||||
func(a, b metav1.Time) bool {
|
||||
return a.UTC().Equal(b.UTC())
|
||||
},
|
||||
func(a, b labels.Selector) bool {
|
||||
return a.String() == b.String()
|
||||
},
|
||||
func(a, b fields.Selector) bool {
|
||||
return a.String() == b.String()
|
||||
},
|
||||
func(a, b argov1alpha1.ApplicationDestination) bool {
|
||||
return a.Namespace == b.Namespace && a.Name == b.Name && a.Server == b.Server
|
||||
},
|
||||
)
|
||||
|
||||
if equality.DeepEqual(normalizedLive, obj) {
|
||||
if appEquality.DeepEqual(normalizedLive, obj) {
|
||||
return controllerutil.OperationResultNone, nil
|
||||
}
|
||||
|
||||
|
|
@ -135,19 +152,13 @@ func mutate(f controllerutil.MutateFn, key client.ObjectKey, obj client.Object)
|
|||
}
|
||||
|
||||
// applyIgnoreDifferences applies the ignore differences rules to the found application. It modifies the applications in place.
|
||||
func applyIgnoreDifferences(applicationSetIgnoreDifferences argov1alpha1.ApplicationSetIgnoreDifferences, found *argov1alpha1.Application, generatedApp *argov1alpha1.Application, ignoreNormalizerOpts normalizers.IgnoreNormalizerOpts) error {
|
||||
if len(applicationSetIgnoreDifferences) == 0 {
|
||||
// diffConfig may be nil, in which case this is a no-op.
|
||||
func applyIgnoreDifferences(diffConfig argodiff.DiffConfig, found *argov1alpha1.Application, generatedApp *argov1alpha1.Application) error {
|
||||
if diffConfig == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
generatedAppCopy := generatedApp.DeepCopy()
|
||||
diffConfig, err := argodiff.NewDiffConfigBuilder().
|
||||
WithDiffSettings(applicationSetIgnoreDifferences.ToApplicationIgnoreDifferences(), nil, false, ignoreNormalizerOpts).
|
||||
WithNoCache().
|
||||
Build()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to build diff config: %w", err)
|
||||
}
|
||||
unstructuredFound, err := appToUnstructured(found)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to convert found application to unstructured: %w", err)
|
||||
|
|
|
|||
|
|
@ -224,7 +224,9 @@ spec:
|
|||
generatedApp := v1alpha1.Application{TypeMeta: appMeta}
|
||||
err = yaml.Unmarshal([]byte(tc.generatedApp), &generatedApp)
|
||||
require.NoError(t, err, tc.generatedApp)
|
||||
err = applyIgnoreDifferences(tc.ignoreDifferences, &foundApp, &generatedApp, normalizers.IgnoreNormalizerOpts{})
|
||||
diffConfig, err := BuildIgnoreDiffConfig(tc.ignoreDifferences, normalizers.IgnoreNormalizerOpts{})
|
||||
require.NoError(t, err)
|
||||
err = applyIgnoreDifferences(diffConfig, &foundApp, &generatedApp)
|
||||
require.NoError(t, err)
|
||||
yamlFound, err := yaml.Marshal(tc.foundApp)
|
||||
require.NoError(t, err)
|
||||
|
|
|
|||
|
|
@ -79,6 +79,7 @@ func NewCommand() *cobra.Command {
|
|||
tokenRefStrictMode bool
|
||||
maxResourcesStatusCount int
|
||||
cacheSyncPeriod time.Duration
|
||||
concurrentApplicationUpdates int
|
||||
)
|
||||
scheme := runtime.NewScheme()
|
||||
_ = clientgoscheme.AddToScheme(scheme)
|
||||
|
|
@ -239,24 +240,25 @@ func NewCommand() *cobra.Command {
|
|||
})
|
||||
|
||||
if err = (&controllers.ApplicationSetReconciler{
|
||||
Generators: topLevelGenerators,
|
||||
Client: utils.NewCacheSyncingClient(mgr.GetClient(), mgr.GetCache()),
|
||||
Scheme: mgr.GetScheme(),
|
||||
Recorder: mgr.GetEventRecorderFor("applicationset-controller"),
|
||||
Renderer: &utils.Render{},
|
||||
Policy: policyObj,
|
||||
EnablePolicyOverride: enablePolicyOverride,
|
||||
KubeClientset: k8sClient,
|
||||
ArgoDB: argoCDDB,
|
||||
ArgoCDNamespace: namespace,
|
||||
ApplicationSetNamespaces: applicationSetNamespaces,
|
||||
EnableProgressiveSyncs: enableProgressiveSyncs,
|
||||
SCMRootCAPath: scmRootCAPath,
|
||||
GlobalPreservedAnnotations: globalPreservedAnnotations,
|
||||
GlobalPreservedLabels: globalPreservedLabels,
|
||||
Metrics: &metrics,
|
||||
MaxResourcesStatusCount: maxResourcesStatusCount,
|
||||
ClusterInformer: clusterInformer,
|
||||
Generators: topLevelGenerators,
|
||||
Client: utils.NewCacheSyncingClient(mgr.GetClient(), mgr.GetCache()),
|
||||
Scheme: mgr.GetScheme(),
|
||||
Recorder: mgr.GetEventRecorderFor("applicationset-controller"),
|
||||
Renderer: &utils.Render{},
|
||||
Policy: policyObj,
|
||||
EnablePolicyOverride: enablePolicyOverride,
|
||||
KubeClientset: k8sClient,
|
||||
ArgoDB: argoCDDB,
|
||||
ArgoCDNamespace: namespace,
|
||||
ApplicationSetNamespaces: applicationSetNamespaces,
|
||||
EnableProgressiveSyncs: enableProgressiveSyncs,
|
||||
SCMRootCAPath: scmRootCAPath,
|
||||
GlobalPreservedAnnotations: globalPreservedAnnotations,
|
||||
GlobalPreservedLabels: globalPreservedLabels,
|
||||
Metrics: &metrics,
|
||||
MaxResourcesStatusCount: maxResourcesStatusCount,
|
||||
ClusterInformer: clusterInformer,
|
||||
ConcurrentApplicationUpdates: concurrentApplicationUpdates,
|
||||
}).SetupWithManager(mgr, enableProgressiveSyncs, maxConcurrentReconciliations); err != nil {
|
||||
log.Error(err, "unable to create controller", "controller", "ApplicationSet")
|
||||
os.Exit(1)
|
||||
|
|
@ -303,6 +305,7 @@ func NewCommand() *cobra.Command {
|
|||
command.Flags().BoolVar(&enableGitHubAPIMetrics, "enable-github-api-metrics", env.ParseBoolFromEnv("ARGOCD_APPLICATIONSET_CONTROLLER_ENABLE_GITHUB_API_METRICS", false), "Enable GitHub API metrics for generators that use the GitHub API")
|
||||
command.Flags().IntVar(&maxResourcesStatusCount, "max-resources-status-count", env.ParseNumFromEnv("ARGOCD_APPLICATIONSET_CONTROLLER_MAX_RESOURCES_STATUS_COUNT", 5000, 0, math.MaxInt), "Max number of resources stored in appset status.")
|
||||
command.Flags().DurationVar(&cacheSyncPeriod, "cache-sync-period", env.ParseDurationFromEnv("ARGOCD_APPLICATIONSET_CONTROLLER_CACHE_SYNC_PERIOD", time.Hour*10, 0, time.Hour*24), "Period at which the manager client cache is forcefully resynced with the Kubernetes API server. 0 disables periodic resync.")
|
||||
command.Flags().IntVar(&concurrentApplicationUpdates, "concurrent-application-updates", env.ParseNumFromEnv("ARGOCD_APPLICATIONSET_CONTROLLER_CONCURRENT_APPLICATION_UPDATES", 1, 1, 200), "Number of concurrent Application create/update/delete operations per ApplicationSet reconcile.")
|
||||
|
||||
return &command
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,28 @@
|
|||
package command
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewCommand_ConcurrentApplicationUpdatesFlag(t *testing.T) {
|
||||
cmd := NewCommand()
|
||||
|
||||
flag := cmd.Flags().Lookup("concurrent-application-updates")
|
||||
require.NotNil(t, flag, "expected --concurrent-application-updates flag to be registered")
|
||||
assert.Equal(t, "int", flag.Value.Type())
|
||||
assert.Equal(t, "1", flag.DefValue, "default should be 1")
|
||||
}
|
||||
|
||||
func TestNewCommand_ConcurrentApplicationUpdatesFlagValue(t *testing.T) {
|
||||
cmd := NewCommand()
|
||||
|
||||
err := cmd.Flags().Set("concurrent-application-updates", "5")
|
||||
require.NoError(t, err)
|
||||
|
||||
val, err := cmd.Flags().GetInt("concurrent-application-updates")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 5, val)
|
||||
}
|
||||
|
|
@ -22,6 +22,7 @@ argocd-applicationset-controller [flags]
|
|||
--client-certificate string Path to a client certificate file for TLS
|
||||
--client-key string Path to a client key file for TLS
|
||||
--cluster string The name of the kubeconfig cluster to use
|
||||
--concurrent-application-updates int Number of concurrent Application create/update/delete operations per ApplicationSet reconcile. (default 1)
|
||||
--concurrent-reconciliations int Max concurrent reconciliations limit for the controller (default 10)
|
||||
--context string The name of the kubeconfig context to use
|
||||
--debug Print debug logs. Takes precedence over loglevel
|
||||
|
|
|
|||
2
gitops-engine/pkg/cache/mocks/ClusterCache.go
generated
vendored
2
gitops-engine/pkg/cache/mocks/ClusterCache.go
generated
vendored
|
|
@ -563,7 +563,7 @@ func (_c *ClusterCache_IsNamespaced_Call) RunAndReturn(run func(gk schema.GroupK
|
|||
return _c
|
||||
}
|
||||
|
||||
// IterateHierarchyV2 provides a mock function with given fields: keys, action, orphanedResourceNamespace
|
||||
// IterateHierarchyV2 provides a mock function for the type ClusterCache
|
||||
func (_mock *ClusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *cache.Resource, namespaceResources map[kube.ResourceKey]*cache.Resource) bool) {
|
||||
_mock.Called(keys, action)
|
||||
return
|
||||
|
|
|
|||
|
|
@ -734,10 +734,13 @@ func TestClusterInformer_SecretDeletion(t *testing.T) {
|
|||
err = clientset.CoreV1().Secrets("argocd").Delete(t.Context(), "cluster1", metav1.DeleteOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
require.Eventually(t, func() bool {
|
||||
_, err := informer.GetClusterByURL("https://cluster1.example.com")
|
||||
return err != nil
|
||||
}, 5*time.Second, 10*time.Millisecond, "expected cluster1 to be removed from cache after secret deletion")
|
||||
|
||||
_, err = informer.GetClusterByURL("https://cluster1.example.com")
|
||||
assert.Error(t, err)
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "not found")
|
||||
|
||||
cluster2, err := informer.GetClusterByURL("https://cluster2.example.com")
|
||||
|
|
|
|||
Loading…
Reference in a new issue