Revert "fix: prevent automatic refreshes from informer resync and status updates (cherry-pick #25290 for 3.2)" (#27399)

This commit is contained in:
Michael Crenshaw 2026-04-17 04:42:52 -04:00 committed by GitHub
parent 3f789ce02d
commit 91da6a0b4c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 37 additions and 448 deletions

View file

@ -6,6 +6,7 @@ import (
stderrors "errors"
"fmt"
"math"
"math/rand"
"net/http"
"reflect"
"runtime/debug"
@ -25,7 +26,6 @@ import (
log "github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -124,6 +124,7 @@ type ApplicationController struct {
stateCache statecache.LiveStateCache
statusRefreshTimeout time.Duration
statusHardRefreshTimeout time.Duration
statusRefreshJitter time.Duration
selfHealTimeout time.Duration
selfHealBackoff *wait.Backoff
selfHealBackoffCooldown time.Duration
@ -202,6 +203,7 @@ func NewApplicationController(
db: db,
statusRefreshTimeout: appResyncPeriod,
statusHardRefreshTimeout: appHardResyncPeriod,
statusRefreshJitter: appResyncJitter,
refreshRequestedApps: make(map[string]CompareWith),
refreshRequestedAppsMutex: &sync.Mutex{},
auditLogger: argo.NewAuditLogger(kubeClientset, common.ApplicationController, enableK8sEvent),
@ -1013,54 +1015,17 @@ func (ctrl *ApplicationController) processAppOperationQueueItem() (processNext b
log.Errorf("Failed to get application '%s' from informer index: %+v", appKey, err)
return
}
var app *appv1.Application
var logCtx *log.Entry
if !exists {
parts := strings.Split(appKey, "/")
if len(parts) != 2 {
log.WithField("appkey", appKey).Warn("Unexpected appKey format, expected namespace/name")
return processNext
}
appNamespace, appName := parts[0], parts[1]
freshApp, apiErr := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(appNamespace).Get(context.Background(), appName, metav1.GetOptions{})
if apiErr != nil {
if apierrors.IsNotFound(apiErr) {
return processNext
}
log.WithField("appkey", appKey).WithError(apiErr).Error("Failed to retrieve application from API server")
return processNext
}
if freshApp.Operation == nil {
return processNext
}
app = freshApp
logCtx = log.WithFields(applog.GetAppLogFields(app))
} else {
origApp, ok := obj.(*appv1.Application)
if !ok {
log.WithField("appkey", appKey).Warn("Key in index is not an application")
return processNext
}
app = origApp.DeepCopy()
logCtx = log.WithFields(applog.GetAppLogFields(app))
if app.Operation != nil {
freshApp, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.ObjectMeta.Namespace).Get(context.Background(), app.Name, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
logCtx.WithError(err).Error("Failed to retrieve latest application state")
}
return processNext
}
if freshApp.Operation == nil {
return processNext
}
app = freshApp
}
// This happens after app was deleted, but the work queue still had an entry for it.
return
}
origApp, ok := obj.(*appv1.Application)
if !ok {
log.Warnf("Key '%s' in index is not an application", appKey)
return
}
app := origApp.DeepCopy()
logCtx := log.WithFields(applog.GetAppLogFields(app))
ts := stats.NewTimingStats()
defer func() {
for k, v := range ts.Timings() {
@ -1069,6 +1034,18 @@ func (ctrl *ApplicationController) processAppOperationQueueItem() (processNext b
logCtx = logCtx.WithField("time_ms", time.Since(ts.StartTime).Milliseconds())
logCtx.Debug("Finished processing app operation queue item")
}()
if app.Operation != nil {
// If we get here, we are about to process an operation, but we cannot rely on informer since it might have stale data.
// So always retrieve the latest version to ensure it is not stale to avoid unnecessary syncing.
// We cannot rely on informer since applications might be updated by both application controller and api server.
freshApp, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.ObjectMeta.Namespace).Get(context.Background(), app.Name, metav1.GetOptions{})
if err != nil {
logCtx.Errorf("Failed to retrieve latest application state: %v", err)
return
}
app = freshApp
}
ts.AddCheckpoint("get_fresh_app_ms")
if app.Operation != nil {
@ -2406,29 +2383,6 @@ func (ctrl *ApplicationController) canProcessApp(obj any) bool {
return ctrl.clusterSharding.IsManagedCluster(destCluster)
}
func operationChanged(oldApp, newApp *appv1.Application) bool {
return (oldApp.Operation == nil && newApp.Operation != nil) ||
(oldApp.Operation != nil && newApp.Operation != nil && !equality.Semantic.DeepEqual(oldApp.Operation, newApp.Operation))
}
func deletionTimestampChanged(oldApp, newApp *appv1.Application) bool {
return (oldApp.DeletionTimestamp == nil && newApp.DeletionTimestamp != nil) ||
(oldApp.DeletionTimestamp != nil && newApp.DeletionTimestamp != nil && !oldApp.DeletionTimestamp.Equal(newApp.DeletionTimestamp))
}
func isStatusOnlyUpdate(oldApp, newApp *appv1.Application) bool {
if !equality.Semantic.DeepEqual(oldApp.Spec, newApp.Spec) {
return false
}
if operationChanged(oldApp, newApp) {
return false
}
if deletionTimestampChanged(oldApp, newApp) || newApp.DeletionTimestamp != nil {
return false
}
return true
}
func (ctrl *ApplicationController) newApplicationInformerAndLister() (cache.SharedIndexInformer, applisters.ApplicationLister) {
watchNamespace := ctrl.namespace
// If we have at least one additional namespace configured, we need to
@ -2521,59 +2475,34 @@ func (ctrl *ApplicationController) newApplicationInformerAndLister() (cache.Shar
}
},
UpdateFunc: func(old, new any) {
if !ctrl.canProcessApp(new) {
return
}
key, err := cache.MetaNamespaceKeyFunc(new)
if err != nil {
return
}
oldApp, oldOK := old.(*appv1.Application)
newApp, newOK := new.(*appv1.Application)
if !ctrl.canProcessApp(new) {
return
}
if newOK && newApp.Operation != nil {
ctrl.appOperationQueue.AddRateLimited(key)
}
var compareWith *CompareWith
var delay *time.Duration
oldApp, oldOK := old.(*appv1.Application)
newApp, newOK := new.(*appv1.Application)
if oldOK && newOK {
if oldApp.ResourceVersion == newApp.ResourceVersion {
if ctrl.hydrator != nil {
ctrl.appHydrateQueue.AddRateLimited(newApp.QualifiedName())
}
ctrl.clusterSharding.UpdateApp(newApp)
return
}
if isStatusOnlyUpdate(oldApp, newApp) {
oldAnnotations := oldApp.GetAnnotations()
newAnnotations := newApp.GetAnnotations()
refreshAdded := (oldAnnotations == nil || oldAnnotations[appv1.AnnotationKeyRefresh] == "") &&
(newAnnotations != nil && newAnnotations[appv1.AnnotationKeyRefresh] != "")
hydrateAdded := (oldAnnotations == nil || oldAnnotations[appv1.AnnotationKeyHydrate] == "") &&
(newAnnotations != nil && newAnnotations[appv1.AnnotationKeyHydrate] != "")
if !refreshAdded && !hydrateAdded {
if ctrl.hydrator != nil {
ctrl.appHydrateQueue.AddRateLimited(newApp.QualifiedName())
}
ctrl.clusterSharding.UpdateApp(newApp)
return
}
}
if automatedSyncEnabled(oldApp, newApp) {
log.WithFields(applog.GetAppLogFields(newApp)).Info("Enabled automated sync")
compareWith = CompareWithLatest.Pointer()
}
if ctrl.statusRefreshJitter != 0 && oldApp.ResourceVersion == newApp.ResourceVersion {
// Handler is refreshing the apps, add a random jitter to spread the load and avoid spikes
jitter := time.Duration(float64(ctrl.statusRefreshJitter) * rand.Float64())
delay = &jitter
}
}
ctrl.requestAppRefresh(newApp.QualifiedName(), compareWith, delay)
if !newOK {
if !newOK || (delay != nil && *delay != time.Duration(0)) {
ctrl.appOperationQueue.AddRateLimited(key)
}
if ctrl.hydrator != nil {
@ -2586,7 +2515,7 @@ func (ctrl *ApplicationController) newApplicationInformerAndLister() (cache.Shar
return
}
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// Key function.
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
// for deletes, we immediately add to the refresh queue

View file

@ -13,7 +13,6 @@ import (
"github.com/argoproj/gitops-engine/pkg/utils/kube/kubetest"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
@ -1694,252 +1693,6 @@ func TestUnchangedManagedNamespaceMetadata(t *testing.T) {
assert.Equal(t, CompareWithLatest, compareWith)
}
func TestApplicationInformerUpdateFunc(t *testing.T) {
// Test that UpdateFunc correctly handles:
// 1. Status-only updates (no annotation) - should NOT trigger refresh
// 2. Status-only updates WITH refresh annotation - should trigger refresh
// 3. Spec changes - should trigger refresh
// 4. Informer resync (same ResourceVersion) - should NOT trigger refresh
app := newFakeApp()
app.Spec.Destination.Namespace = test.FakeArgoCDNamespace
app.Spec.Destination.Server = v1alpha1.KubernetesInternalAPIServerAddr
proj := defaultProj.DeepCopy()
proj.Spec.SourceNamespaces = []string{test.FakeArgoCDNamespace}
ctrl := newFakeController(&fakeData{apps: []runtime.Object{app, proj}}, nil)
simulateUpdateFunc := func(oldApp, newApp *v1alpha1.Application) {
if !ctrl.canProcessApp(newApp) {
return
}
key, err := cache.MetaNamespaceKeyFunc(newApp)
if err != nil {
return
}
var compareWith *CompareWith
var delay *time.Duration
oldOK := oldApp != nil
newOK := newApp != nil
if oldOK && newOK {
if oldApp.ResourceVersion == newApp.ResourceVersion {
if ctrl.hydrator != nil {
ctrl.appHydrateQueue.AddRateLimited(newApp.QualifiedName())
}
ctrl.clusterSharding.UpdateApp(newApp)
return
}
// Check if operation was added or changed - always process operations
operationChanged := (oldApp.Operation == nil && newApp.Operation != nil) ||
(oldApp.Operation != nil && newApp.Operation != nil && !equality.Semantic.DeepEqual(oldApp.Operation, newApp.Operation))
deletionTimestampChanged := (oldApp.DeletionTimestamp == nil && newApp.DeletionTimestamp != nil) ||
(oldApp.DeletionTimestamp != nil && newApp.DeletionTimestamp != nil && !oldApp.DeletionTimestamp.Equal(newApp.DeletionTimestamp))
appBeingDeleted := newApp.DeletionTimestamp != nil
if equality.Semantic.DeepEqual(oldApp.Spec, newApp.Spec) && !operationChanged && !deletionTimestampChanged && !appBeingDeleted {
oldAnnotations := oldApp.GetAnnotations()
newAnnotations := newApp.GetAnnotations()
refreshAdded := (oldAnnotations == nil || oldAnnotations[v1alpha1.AnnotationKeyRefresh] == "") &&
(newAnnotations != nil && newAnnotations[v1alpha1.AnnotationKeyRefresh] != "")
hydrateAdded := (oldAnnotations == nil || oldAnnotations[v1alpha1.AnnotationKeyHydrate] == "") &&
(newAnnotations != nil && newAnnotations[v1alpha1.AnnotationKeyHydrate] != "")
if !refreshAdded && !hydrateAdded {
if ctrl.hydrator != nil {
ctrl.appHydrateQueue.AddRateLimited(newApp.QualifiedName())
}
ctrl.clusterSharding.UpdateApp(newApp)
return
}
}
if automatedSyncEnabled(oldApp, newApp) {
compareWith = CompareWithLatest.Pointer()
}
if compareWith == nil {
compareWith = CompareWithRecent.Pointer()
}
}
ctrl.requestAppRefresh(newApp.QualifiedName(), compareWith, delay)
if !newOK {
ctrl.appOperationQueue.AddRateLimited(key)
}
if ctrl.hydrator != nil {
ctrl.appHydrateQueue.AddRateLimited(newApp.QualifiedName())
}
ctrl.clusterSharding.UpdateApp(newApp)
}
checkRefreshRequested := func(appName string, shouldBeRequested bool, msg string) {
key := ctrl.toAppKey(appName)
ctrl.refreshRequestedAppsMutex.Lock()
_, isRequested := ctrl.refreshRequestedApps[key]
ctrl.refreshRequestedAppsMutex.Unlock()
assert.Equal(t, shouldBeRequested, isRequested, "%s: Refresh request state mismatch for app %s (key: %s)", msg, appName, key)
}
t.Run("Status-only update without annotation should NOT trigger refresh", func(_ *testing.T) {
ctrl.refreshRequestedAppsMutex.Lock()
ctrl.refreshRequestedApps = make(map[string]CompareWith)
ctrl.refreshRequestedAppsMutex.Unlock()
oldApp := app.DeepCopy()
oldApp.ResourceVersion = "1"
oldApp.Status.ReconciledAt = &metav1.Time{Time: time.Now().Add(-1 * time.Hour)}
newApp := oldApp.DeepCopy()
newApp.ResourceVersion = "2"
newApp.Status.ReconciledAt = &metav1.Time{Time: time.Now()}
simulateUpdateFunc(oldApp, newApp)
checkRefreshRequested(app.QualifiedName(), false, "Status-only update without annotation")
})
t.Run("Status-only update WITH refresh annotation SHOULD trigger refresh", func(_ *testing.T) {
ctrl.refreshRequestedAppsMutex.Lock()
ctrl.refreshRequestedApps = make(map[string]CompareWith)
ctrl.refreshRequestedAppsMutex.Unlock()
oldApp := app.DeepCopy()
oldApp.ResourceVersion = "3"
oldApp.Status.ReconciledAt = &metav1.Time{Time: time.Now().Add(-1 * time.Hour)}
newApp := oldApp.DeepCopy()
newApp.ResourceVersion = "4"
newApp.Status.ReconciledAt = &metav1.Time{Time: time.Now()}
if newApp.Annotations == nil {
newApp.Annotations = make(map[string]string)
}
newApp.Annotations[v1alpha1.AnnotationKeyRefresh] = string(v1alpha1.RefreshTypeNormal)
simulateUpdateFunc(oldApp, newApp)
checkRefreshRequested(app.QualifiedName(), true, "Status-only update WITH refresh annotation")
})
t.Run("Status-only update WITH hydrate annotation SHOULD trigger refresh", func(_ *testing.T) {
ctrl.refreshRequestedAppsMutex.Lock()
ctrl.refreshRequestedApps = make(map[string]CompareWith)
ctrl.refreshRequestedAppsMutex.Unlock()
oldApp := app.DeepCopy()
oldApp.ResourceVersion = "5"
oldApp.Status.ReconciledAt = &metav1.Time{Time: time.Now().Add(-1 * time.Hour)}
newApp := oldApp.DeepCopy()
newApp.ResourceVersion = "6"
newApp.Status.ReconciledAt = &metav1.Time{Time: time.Now()}
if newApp.Annotations == nil {
newApp.Annotations = make(map[string]string)
}
newApp.Annotations[v1alpha1.AnnotationKeyHydrate] = "true"
simulateUpdateFunc(oldApp, newApp)
checkRefreshRequested(app.QualifiedName(), true, "Status-only update WITH hydrate annotation")
})
t.Run("Status-only update WITH both refresh and hydrate annotations SHOULD trigger refresh", func(_ *testing.T) {
ctrl.refreshRequestedAppsMutex.Lock()
ctrl.refreshRequestedApps = make(map[string]CompareWith)
ctrl.refreshRequestedAppsMutex.Unlock()
oldApp := app.DeepCopy()
oldApp.ResourceVersion = "7"
oldApp.Status.ReconciledAt = &metav1.Time{Time: time.Now().Add(-1 * time.Hour)}
newApp := oldApp.DeepCopy()
newApp.ResourceVersion = "8"
newApp.Status.ReconciledAt = &metav1.Time{Time: time.Now()}
if newApp.Annotations == nil {
newApp.Annotations = make(map[string]string)
}
newApp.Annotations[v1alpha1.AnnotationKeyRefresh] = string(v1alpha1.RefreshTypeNormal)
newApp.Annotations[v1alpha1.AnnotationKeyHydrate] = "true"
simulateUpdateFunc(oldApp, newApp)
checkRefreshRequested(app.QualifiedName(), true, "Status-only update WITH both refresh and hydrate annotations")
})
t.Run("Status-only update with annotation REMOVAL should NOT trigger refresh", func(_ *testing.T) {
ctrl.refreshRequestedAppsMutex.Lock()
ctrl.refreshRequestedApps = make(map[string]CompareWith)
ctrl.refreshRequestedAppsMutex.Unlock()
oldApp := app.DeepCopy()
oldApp.ResourceVersion = "9"
oldApp.Status.ReconciledAt = &metav1.Time{Time: time.Now().Add(-1 * time.Hour)}
if oldApp.Annotations == nil {
oldApp.Annotations = make(map[string]string)
}
oldApp.Annotations[v1alpha1.AnnotationKeyRefresh] = string(v1alpha1.RefreshTypeNormal)
newApp := oldApp.DeepCopy()
newApp.ResourceVersion = "10"
newApp.Status.ReconciledAt = &metav1.Time{Time: time.Now()}
delete(newApp.Annotations, v1alpha1.AnnotationKeyRefresh)
simulateUpdateFunc(oldApp, newApp)
checkRefreshRequested(app.QualifiedName(), false, "Status-only update with annotation REMOVAL")
})
t.Run("Spec change SHOULD trigger refresh", func(_ *testing.T) {
ctrl.refreshRequestedAppsMutex.Lock()
ctrl.refreshRequestedApps = make(map[string]CompareWith)
ctrl.refreshRequestedAppsMutex.Unlock()
oldApp := app.DeepCopy()
oldApp.ResourceVersion = "11"
newApp := oldApp.DeepCopy()
newApp.ResourceVersion = "12"
newApp.Spec.Destination.Namespace = "different-namespace"
simulateUpdateFunc(oldApp, newApp)
checkRefreshRequested(app.QualifiedName(), true, "Spec change")
})
t.Run("Informer resync (same ResourceVersion) should NOT trigger refresh", func(_ *testing.T) {
ctrl.refreshRequestedAppsMutex.Lock()
ctrl.refreshRequestedApps = make(map[string]CompareWith)
ctrl.refreshRequestedAppsMutex.Unlock()
oldApp := app.DeepCopy()
oldApp.ResourceVersion = "13"
newApp := oldApp.DeepCopy()
newApp.ResourceVersion = "13"
newApp.Status.ReconciledAt = &metav1.Time{Time: time.Now()}
simulateUpdateFunc(oldApp, newApp)
checkRefreshRequested(app.QualifiedName(), false, "Informer resync")
})
t.Run("DeletionTimestamp added SHOULD trigger refresh", func(_ *testing.T) {
// Reset refresh state
ctrl.refreshRequestedAppsMutex.Lock()
ctrl.refreshRequestedApps = make(map[string]CompareWith)
ctrl.refreshRequestedAppsMutex.Unlock()
oldApp := app.DeepCopy()
oldApp.ResourceVersion = "14"
oldApp.DeletionTimestamp = nil
newApp := oldApp.DeepCopy()
newApp.ResourceVersion = "15"
newApp.DeletionTimestamp = &metav1.Time{Time: time.Now()}
newApp.Status.ReconciledAt = &metav1.Time{Time: time.Now()}
simulateUpdateFunc(oldApp, newApp)
checkRefreshRequested(app.QualifiedName(), true, "DeletionTimestamp added")
})
}
func TestRefreshAppConditions(t *testing.T) {
defaultProj := v1alpha1.AppProject{
ObjectMeta: metav1.ObjectMeta{

View file

@ -2412,96 +2412,3 @@ func TestCreateAppInNotAllowedNamespace(t *testing.T) {
Expect(DoesNotExist()).
Expect(Error("", "namespace 'default' is not permitted"))
}
// TestZeroReconciliationTimeoutNoExcessiveRefreshes verifies that when timeout.reconciliation=0s,
// applications do not trigger excessive automatic refreshes from status-only updates.
func TestZeroReconciliationTimeoutNoExcessiveRefreshes(t *testing.T) {
ctx := t.Context()
namespace := fixture.TestNamespace()
require.NoError(t, fixture.SetParamInSettingConfigMap("timeout.reconciliation", "0s"))
require.NoError(t, fixture.SetParamInSettingConfigMap("timeout.reconciliation.jitter", "0s"))
defer func() {
_ = fixture.SetParamInSettingConfigMap("timeout.reconciliation", "")
_ = fixture.SetParamInSettingConfigMap("timeout.reconciliation.jitter", "")
}()
configMap, err := fixture.KubeClientset.CoreV1().ConfigMaps(namespace).Get(ctx, common.ArgoCDConfigMapName, metav1.GetOptions{})
require.NoError(t, err)
require.Equal(t, "0s", configMap.Data["timeout.reconciliation"])
require.Equal(t, "0s", configMap.Data["timeout.reconciliation.jitter"])
configMapResourceVersion := configMap.ResourceVersion
configMapUpdateTime := time.Now()
require.Eventually(t, func() bool {
currentConfigMap, err := fixture.KubeClientset.CoreV1().ConfigMaps(namespace).Get(ctx, common.ArgoCDConfigMapName, metav1.GetOptions{})
if err != nil {
return false
}
if currentConfigMap.ResourceVersion != configMapResourceVersion {
configMapResourceVersion = currentConfigMap.ResourceVersion
configMapUpdateTime = time.Now()
return false
}
timeSinceUpdate := time.Since(configMapUpdateTime)
if timeSinceUpdate < 5*time.Second {
return false
}
apps, err := fixture.AppClientset.ArgoprojV1alpha1().Applications(fixture.AppNamespace()).List(ctx, metav1.ListOptions{})
if err != nil {
return false
}
now := time.Now()
for _, app := range apps.Items {
if app.Status.ReconciledAt != nil {
reconciledTime := app.Status.ReconciledAt.Time
if now.Sub(reconciledTime) < 30*time.Second {
return true
}
}
}
return true
}, 30*time.Second, 1*time.Second, "controller did not sync ConfigMap in time")
Given(t).
Path(guestbookPath).
SetTrackingMethod("annotation").
SetAppNamespace(fixture.AppNamespace()).
When().
CreateApp().
Sync().
Then().
Expect(OperationPhaseIs(OperationSucceeded)).
Expect(SyncStatusIs(SyncStatusCodeSynced)).
And(func(a *Application) {
time.Sleep(5 * time.Second)
app, err := fixture.AppClientset.ArgoprojV1alpha1().Applications(fixture.AppNamespace()).Get(context.Background(), a.Name, metav1.GetOptions{})
require.NoError(t, err)
initialReconciledAt := app.Status.ReconciledAt
require.NotNil(t, initialReconciledAt)
ctx, cancel := context.WithTimeout(t.Context(), 4*time.Minute)
defer cancel()
refreshCount := 0
lastReconciledAt := initialReconciledAt.DeepCopy()
for event := range fixture.ArgoCDClientset.WatchApplicationWithRetry(ctx, a.QualifiedName(), app.ResourceVersion) {
reconciledAt := event.Application.Status.ReconciledAt
if reconciledAt == nil {
continue
}
if !lastReconciledAt.Equal(reconciledAt) {
refreshCount++
lastReconciledAt = reconciledAt.DeepCopy()
}
}
assert.LessOrEqual(t, refreshCount, 1, "application refreshed %d times (expected ≤1) with timeout.reconciliation=0s", refreshCount)
})
}