This commit is contained in:
boostrack 2026-04-21 13:47:41 +07:00 committed by GitHub
commit 436ac26b42
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 790 additions and 5 deletions

View file

@ -94,6 +94,15 @@ const (
ComparisonWithNothing CompareWith = 0
)
type hydrationSyncGateAction string
const (
hydrationSyncGateActionNone hydrationSyncGateAction = "none"
hydrationSyncGateActionWait hydrationSyncGateAction = "wait"
hydrationSyncGateActionHydrate hydrationSyncGateAction = "hydrate"
hydrationSyncGateActionRefresh hydrationSyncGateAction = "refresh"
)
func (a CompareWith) Max(b CompareWith) CompareWith {
return CompareWith(math.Max(float64(a), float64(b)))
}
@ -1532,6 +1541,13 @@ func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Appli
}
ts.AddCheckpoint("initial_operation_stage_ms")
// Sync is gated by hydration for sourceHydrator apps.
// gateSyncOnHydration may enqueue hydrate/refresh requests and returns true when
// the current operation should pause until hydration state catches up.
if ctrl.gateSyncOnHydration(app, state, logCtx) {
return
}
terminating := state.Phase == synccommon.OperationTerminating
project, err := ctrl.getAppProj(app)
if err == nil {
@ -1606,6 +1622,72 @@ func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Appli
ts.AddCheckpoint("request_app_refresh_ms")
}
func hydrationSyncGateDecision(app *appv1.Application, state *appv1.OperationState) hydrationSyncGateAction {
if app.Spec.SourceHydrator == nil || state.Operation.Sync == nil {
return hydrationSyncGateActionNone
}
// Decision order matters:
// 1) Wait while hydration is still running.
// 2) Request hydration if current sync started after the last successful hydration.
// 3) Request a non-hydrate refresh if hydrated SHA differs from sync revision.
// If the user requested a specific revision (e.g. rollback), do not block sync on hydration.
if state.Operation.Sync.Revision != "" || len(state.Operation.Sync.Revisions) > 0 {
return hydrationSyncGateActionNone
}
// If hydrateTo is configured, hydration and sync are intentionally decoupled.
if app.Spec.SourceHydrator.HydrateTo != nil {
return hydrationSyncGateActionNone
}
op := app.Status.SourceHydrator.CurrentOperation
if op != nil && op.Phase == appv1.HydrateOperationPhaseHydrating {
return hydrationSyncGateActionWait
}
var lastHydrationTime time.Time
if op != nil && op.Phase == appv1.HydrateOperationPhaseHydrated && op.FinishedAt != nil {
lastHydrationTime = op.FinishedAt.Time
}
if lastHydrationTime.Before(state.StartedAt.Time) {
return hydrationSyncGateActionHydrate
}
if op != nil && op.HydratedSHA != "" && app.Status.Sync.Revision != op.HydratedSHA {
return hydrationSyncGateActionRefresh
}
return hydrationSyncGateActionNone
}
func (ctrl *ApplicationController) gateSyncOnHydration(app *appv1.Application, state *appv1.OperationState, logCtx *log.Entry) bool {
action := hydrationSyncGateDecision(app, state)
logCtx = logCtx.WithField("hydrationSyncGateAction", action)
switch action {
case hydrationSyncGateActionWait:
logCtx.Debug("Sync operation is waiting for source hydration to complete")
return true
case hydrationSyncGateActionHydrate:
logCtx.Info("Requesting source hydration before sync operation")
_, err := argo.RefreshApp(ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace), app.Name, appv1.RefreshTypeNormal, true)
if err != nil {
logCtx.WithError(err).Error("Failed to request hydration before sync")
}
return true
case hydrationSyncGateActionRefresh:
logCtx.Info("Requesting app refresh to pick up hydrated commit before sync operation")
_, err := argo.RefreshApp(ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace), app.Name, appv1.RefreshTypeNormal, false)
if err != nil {
logCtx.WithError(err).Error("Failed to request refresh before sync")
}
return true
default:
return false
}
}
func (ctrl *ApplicationController) setOperationState(app *appv1.Application, state *appv1.OperationState) {
logCtx := log.WithFields(applog.GetAppLogFields(app))
if state.Phase == "" {

View file

@ -102,6 +102,14 @@ func newFakeController(ctx context.Context, data *fakeData, repoErr error) *Appl
}
func newFakeControllerWithResync(ctx context.Context, data *fakeData, appResyncPeriod time.Duration, repoErr, revisionPathsErr error) *ApplicationController {
return newFakeControllerWithResyncAndHydrator(ctx, data, appResyncPeriod, repoErr, revisionPathsErr, false)
}
func newFakeHydratorControllerWithResync(ctx context.Context, data *fakeData, appResyncPeriod time.Duration, repoErr, revisionPathsErr error) *ApplicationController {
return newFakeControllerWithResyncAndHydrator(ctx, data, appResyncPeriod, repoErr, revisionPathsErr, true)
}
func newFakeControllerWithResyncAndHydrator(ctx context.Context, data *fakeData, appResyncPeriod time.Duration, repoErr, revisionPathsErr error, hydratorEnabled bool) *ApplicationController {
var clust corev1.Secret
err := yaml.Unmarshal([]byte(fakeCluster), &clust)
if err != nil {
@ -207,7 +215,7 @@ func newFakeControllerWithResync(ctx context.Context, data *fakeData, appResyncP
false,
normalizers.IgnoreNormalizerOpts{},
testEnableEventList,
false,
hydratorEnabled,
)
db := &dbmocks.ArgoDB{}
db.EXPECT().GetApplicationControllerReplicas().Return(1).Maybe()
@ -2584,6 +2592,173 @@ func TestProjectErrorToCondition(t *testing.T) {
assert.Equal(t, v1alpha1.ApplicationConditionInvalidSpecError, updatedApp.Status.Conditions[0].Type)
}
func TestHydrationSyncGateDecision(t *testing.T) {
now := time.Now().UTC()
newState := func() *v1alpha1.OperationState {
return &v1alpha1.OperationState{
Operation: v1alpha1.Operation{
Sync: &v1alpha1.SyncOperation{},
},
StartedAt: metav1.NewTime(now),
}
}
newApp := func() *v1alpha1.Application {
app := newFakeApp()
app.Spec.SourceHydrator = &v1alpha1.SourceHydrator{
DrySource: v1alpha1.DrySource{
RepoURL: app.Spec.Source.RepoURL,
Path: app.Spec.Source.Path,
TargetRevision: app.Spec.Source.TargetRevision,
},
SyncSource: v1alpha1.SyncSource{
TargetBranch: "main",
Path: "hydrated",
},
}
return app
}
testCases := []struct {
name string
app *v1alpha1.Application
state *v1alpha1.OperationState
expected hydrationSyncGateAction
}{
{
name: "no source hydrator",
app: newFakeApp(),
state: &v1alpha1.OperationState{
Operation: v1alpha1.Operation{Sync: &v1alpha1.SyncOperation{}},
StartedAt: metav1.NewTime(now),
},
expected: hydrationSyncGateActionNone,
},
{
name: "hydration in progress waits sync",
app: func() *v1alpha1.Application {
app := newApp()
app.Status.SourceHydrator.CurrentOperation = &v1alpha1.HydrateOperation{
Phase: v1alpha1.HydrateOperationPhaseHydrating,
}
return app
}(),
state: newState(),
expected: hydrationSyncGateActionWait,
},
{
name: "stale hydration requests hydrate",
app: func() *v1alpha1.Application {
app := newApp()
finishedAt := metav1.NewTime(now.Add(-2 * time.Minute))
app.Status.SourceHydrator.CurrentOperation = &v1alpha1.HydrateOperation{
Phase: v1alpha1.HydrateOperationPhaseHydrated,
FinishedAt: &finishedAt,
}
return app
}(),
state: newState(),
expected: hydrationSyncGateActionHydrate,
},
{
name: "new hydration but sync revision stale requests refresh",
app: func() *v1alpha1.Application {
app := newApp()
finishedAt := metav1.NewTime(now.Add(1 * time.Minute))
app.Status.SourceHydrator.CurrentOperation = &v1alpha1.HydrateOperation{
Phase: v1alpha1.HydrateOperationPhaseHydrated,
FinishedAt: &finishedAt,
HydratedSHA: "new-hydrated-sha",
}
app.Status.Sync.Revision = "old-hydrated-sha"
return app
}(),
state: newState(),
expected: hydrationSyncGateActionRefresh,
},
{
name: "hydrated at same time as sync start does not request hydrate",
app: func() *v1alpha1.Application {
app := newApp()
finishedAt := metav1.NewTime(now)
app.Status.SourceHydrator.CurrentOperation = &v1alpha1.HydrateOperation{
Phase: v1alpha1.HydrateOperationPhaseHydrated,
FinishedAt: &finishedAt,
}
return app
}(),
state: newState(),
expected: hydrationSyncGateActionNone,
},
{
name: "hydrated operation with empty hydrated sha does not request refresh",
app: func() *v1alpha1.Application {
app := newApp()
finishedAt := metav1.NewTime(now.Add(1 * time.Minute))
app.Status.SourceHydrator.CurrentOperation = &v1alpha1.HydrateOperation{
Phase: v1alpha1.HydrateOperationPhaseHydrated,
FinishedAt: &finishedAt,
}
app.Status.Sync.Revision = "old-hydrated-sha"
return app
}(),
state: newState(),
expected: hydrationSyncGateActionNone,
},
{
name: "explicit sync revision bypasses gating",
app: func() *v1alpha1.Application {
app := newApp()
app.Status.SourceHydrator.CurrentOperation = &v1alpha1.HydrateOperation{
Phase: v1alpha1.HydrateOperationPhaseHydrating,
}
return app
}(),
state: func() *v1alpha1.OperationState {
st := newState()
st.Operation.Sync.Revision = "12345"
return st
}(),
expected: hydrationSyncGateActionNone,
},
{
name: "explicit sync revisions bypass gating",
app: func() *v1alpha1.Application {
app := newApp()
app.Status.SourceHydrator.CurrentOperation = &v1alpha1.HydrateOperation{
Phase: v1alpha1.HydrateOperationPhaseHydrating,
}
return app
}(),
state: func() *v1alpha1.OperationState {
st := newState()
st.Operation.Sync.Revisions = []string{"12345"}
return st
}(),
expected: hydrationSyncGateActionNone,
},
{
name: "hydrateTo bypasses gating",
app: func() *v1alpha1.Application {
app := newApp()
app.Spec.SourceHydrator.HydrateTo = &v1alpha1.HydrateTo{TargetBranch: "staging"}
app.Status.SourceHydrator.CurrentOperation = &v1alpha1.HydrateOperation{
Phase: v1alpha1.HydrateOperationPhaseHydrating,
}
return app
}(),
state: newState(),
expected: hydrationSyncGateActionNone,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
action := hydrationSyncGateDecision(tc.app, tc.state)
assert.Equal(t, tc.expected, action)
})
}
}
func TestFinalizeProjectDeletion_HasApplications(t *testing.T) {
app := newFakeApp()
proj := &v1alpha1.AppProject{ObjectMeta: metav1.ObjectMeta{Name: "default", Namespace: test.FakeArgoCDNamespace}}
@ -2954,6 +3129,215 @@ func TestProcessRequestedAppOperation_Successful(t *testing.T) {
assert.Equal(t, CompareWithLatestForceResolve, level)
}
func TestProcessRequestedAppOperation_HydrationGate_WaitsWhenHydrating(t *testing.T) {
app := newFakeApp()
app.Spec.Project = "default"
app.Spec.SourceHydrator = &v1alpha1.SourceHydrator{
DrySource: v1alpha1.DrySource{
RepoURL: app.Spec.Source.RepoURL,
Path: app.Spec.Source.Path,
TargetRevision: app.Spec.Source.TargetRevision,
},
SyncSource: v1alpha1.SyncSource{
TargetBranch: "main",
Path: "hydrated",
},
}
app.Status.SourceHydrator.CurrentOperation = &v1alpha1.HydrateOperation{
Phase: v1alpha1.HydrateOperationPhaseHydrating,
}
app.Operation = &v1alpha1.Operation{
Sync: &v1alpha1.SyncOperation{},
}
ctrl := newFakeController(t.Context(), &fakeData{
apps: []runtime.Object{app, &defaultProj},
manifestResponses: []*apiclient.ManifestResponse{{
Manifests: []string{},
}},
}, nil)
ctrl.processRequestedAppOperation(app)
updatedApp, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace).Get(t.Context(), app.Name, metav1.GetOptions{})
require.NoError(t, err)
require.NotNil(t, updatedApp.Status.OperationState)
assert.Equal(t, synccommon.OperationRunning, updatedApp.Status.OperationState.Phase)
assert.Empty(t, updatedApp.Annotations[v1alpha1.AnnotationKeyRefresh])
assert.Empty(t, updatedApp.Annotations[v1alpha1.AnnotationKeyHydrate])
}
func TestProcessRequestedAppOperation_HydrationGate_RequestsHydrate(t *testing.T) {
app := newFakeApp()
app.Spec.Project = "default"
app.Spec.SourceHydrator = &v1alpha1.SourceHydrator{
DrySource: v1alpha1.DrySource{
RepoURL: app.Spec.Source.RepoURL,
Path: app.Spec.Source.Path,
TargetRevision: app.Spec.Source.TargetRevision,
},
SyncSource: v1alpha1.SyncSource{
TargetBranch: "main",
Path: "hydrated",
},
}
finishedAt := metav1.Now()
app.Status.SourceHydrator.CurrentOperation = &v1alpha1.HydrateOperation{
Phase: v1alpha1.HydrateOperationPhaseHydrated,
FinishedAt: &finishedAt,
HydratedSHA: "abc123",
}
app.Operation = &v1alpha1.Operation{
Sync: &v1alpha1.SyncOperation{},
}
ctrl := newFakeController(t.Context(), &fakeData{
apps: []runtime.Object{app, &defaultProj},
manifestResponses: []*apiclient.ManifestResponse{{
Manifests: []string{},
}},
}, nil)
ctrl.processRequestedAppOperation(app)
updatedApp, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace).Get(t.Context(), app.Name, metav1.GetOptions{})
require.NoError(t, err)
require.NotNil(t, updatedApp.Status.OperationState)
assert.Equal(t, synccommon.OperationRunning, updatedApp.Status.OperationState.Phase)
assert.Equal(t, string(v1alpha1.RefreshTypeNormal), updatedApp.Annotations[v1alpha1.AnnotationKeyRefresh])
assert.Equal(t, string(v1alpha1.HydrateTypeNormal), updatedApp.Annotations[v1alpha1.AnnotationKeyHydrate])
}
func TestProcessRequestedAppOperation_HydrationGate_RequestsRefreshForHydratedSHA(t *testing.T) {
app := newFakeApp()
app.Spec.Project = "default"
app.Spec.SourceHydrator = &v1alpha1.SourceHydrator{
DrySource: v1alpha1.DrySource{
RepoURL: app.Spec.Source.RepoURL,
Path: app.Spec.Source.Path,
TargetRevision: app.Spec.Source.TargetRevision,
},
SyncSource: v1alpha1.SyncSource{
TargetBranch: "main",
Path: "hydrated",
},
}
finishedAt := metav1.NewTime(time.Now().Add(2 * time.Minute))
app.Status.SourceHydrator.CurrentOperation = &v1alpha1.HydrateOperation{
Phase: v1alpha1.HydrateOperationPhaseHydrated,
FinishedAt: &finishedAt,
HydratedSHA: "new-hydrated-sha",
}
app.Status.Sync.Revision = "old-hydrated-sha"
app.Operation = &v1alpha1.Operation{
Sync: &v1alpha1.SyncOperation{},
}
ctrl := newFakeController(t.Context(), &fakeData{
apps: []runtime.Object{app, &defaultProj},
manifestResponses: []*apiclient.ManifestResponse{{
Manifests: []string{},
}},
}, nil)
ctrl.processRequestedAppOperation(app)
updatedApp, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace).Get(t.Context(), app.Name, metav1.GetOptions{})
require.NoError(t, err)
require.NotNil(t, updatedApp.Status.OperationState)
assert.Equal(t, synccommon.OperationRunning, updatedApp.Status.OperationState.Phase)
assert.Equal(t, string(v1alpha1.RefreshTypeNormal), updatedApp.Annotations[v1alpha1.AnnotationKeyRefresh])
assert.Empty(t, updatedApp.Annotations[v1alpha1.AnnotationKeyHydrate])
}
func TestGateSyncOnHydration_RequestHydrateError(t *testing.T) {
app := newFakeApp()
app.Spec.SourceHydrator = &v1alpha1.SourceHydrator{
DrySource: v1alpha1.DrySource{
RepoURL: app.Spec.Source.RepoURL,
Path: app.Spec.Source.Path,
TargetRevision: app.Spec.Source.TargetRevision,
},
SyncSource: v1alpha1.SyncSource{
TargetBranch: "main",
Path: "hydrated",
},
}
finishedAt := metav1.Now()
app.Status.SourceHydrator.CurrentOperation = &v1alpha1.HydrateOperation{
Phase: v1alpha1.HydrateOperationPhaseHydrated,
FinishedAt: &finishedAt,
}
ctrl := newFakeController(t.Context(), &fakeData{apps: []runtime.Object{app}}, nil)
fakeAppCs := ctrl.applicationClientset.(*appclientset.Clientset)
fakeAppCs.PrependReactor("patch", "applications", func(_ kubetesting.Action) (bool, runtime.Object, error) {
return true, nil, errors.New("hydrate refresh failed")
})
state := &v1alpha1.OperationState{
Operation: v1alpha1.Operation{
Sync: &v1alpha1.SyncOperation{},
},
StartedAt: metav1.NewTime(time.Now().Add(1 * time.Minute)),
}
blocked := ctrl.gateSyncOnHydration(app, state, logrus.WithField("test", "hydrate-error"))
assert.True(t, blocked)
}
func TestGateSyncOnHydration_RequestRefreshError(t *testing.T) {
app := newFakeApp()
app.Spec.SourceHydrator = &v1alpha1.SourceHydrator{
DrySource: v1alpha1.DrySource{
RepoURL: app.Spec.Source.RepoURL,
Path: app.Spec.Source.Path,
TargetRevision: app.Spec.Source.TargetRevision,
},
SyncSource: v1alpha1.SyncSource{
TargetBranch: "main",
Path: "hydrated",
},
}
finishedAt := metav1.NewTime(time.Now().Add(2 * time.Minute))
app.Status.SourceHydrator.CurrentOperation = &v1alpha1.HydrateOperation{
Phase: v1alpha1.HydrateOperationPhaseHydrated,
FinishedAt: &finishedAt,
HydratedSHA: "new-hydrated-sha",
}
app.Status.Sync.Revision = "old-hydrated-sha"
ctrl := newFakeController(t.Context(), &fakeData{apps: []runtime.Object{app}}, nil)
fakeAppCs := ctrl.applicationClientset.(*appclientset.Clientset)
fakeAppCs.PrependReactor("patch", "applications", func(_ kubetesting.Action) (bool, runtime.Object, error) {
return true, nil, errors.New("refresh failed")
})
state := &v1alpha1.OperationState{
Operation: v1alpha1.Operation{
Sync: &v1alpha1.SyncOperation{},
},
StartedAt: metav1.NewTime(time.Now()),
}
blocked := ctrl.gateSyncOnHydration(app, state, logrus.WithField("test", "refresh-error"))
assert.True(t, blocked)
}
func TestGateSyncOnHydration_NoAction(t *testing.T) {
app := newFakeApp()
ctrl := newFakeController(t.Context(), &fakeData{apps: []runtime.Object{app}}, nil)
state := &v1alpha1.OperationState{
Operation: v1alpha1.Operation{
Sync: &v1alpha1.SyncOperation{},
},
StartedAt: metav1.NewTime(time.Now()),
}
blocked := ctrl.gateSyncOnHydration(app, state, logrus.WithField("test", "no-action"))
assert.False(t, blocked)
}
func TestProcessRequestedAppAutomatedOperation_Successful(t *testing.T) {
app := newFakeApp()
app.Spec.Project = "default"

View file

@ -116,6 +116,15 @@ func (h *Hydrator) ProcessAppHydrateQueueItem(origApp *appv1.Application) {
logCtx.Debug("Processing app hydrate queue item")
needsHydration, reason := appNeedsHydration(app)
if !needsHydration && h.shouldCheckDrySourceRevision(app) {
latestDrySHA, err := h.getLatestDrySourceRevision(app)
if err != nil {
logCtx.WithError(err).Warn("Failed to resolve latest dry source revision")
} else if latestDrySHA != app.Status.SourceHydrator.CurrentOperation.DrySHA {
needsHydration = true
reason = "dry source revision changed"
}
}
if needsHydration {
app.Status.SourceHydrator.CurrentOperation = &appv1.HydrateOperation{
StartedAt: metav1.Now(),
@ -138,6 +147,31 @@ func (h *Hydrator) ProcessAppHydrateQueueItem(origApp *appv1.Application) {
logCtx.Debug("Successfully processed app hydrate queue item")
}
func (h *Hydrator) shouldCheckDrySourceRevision(app *appv1.Application) bool {
if app.Status.SourceHydrator.CurrentOperation == nil || app.Status.SourceHydrator.CurrentOperation.Phase == appv1.HydrateOperationPhaseHydrating {
return false
}
if h.statusRefreshTimeout <= 0 {
return false
}
if app.Status.ReconciledAt == nil {
return true
}
return app.Status.ReconciledAt.Add(h.statusRefreshTimeout).Before(time.Now().UTC())
}
func (h *Hydrator) getLatestDrySourceRevision(app *appv1.Application) (string, error) {
project, err := h.dependencies.GetProcessableAppProj(app)
if err != nil {
return "", fmt.Errorf("failed to get app project %q: %w", app.Spec.Project, err)
}
revision, _, err := h.getManifests(context.Background(), app, "", project)
if err != nil {
return "", fmt.Errorf("failed to resolve latest dry source revision: %w", err)
}
return revision, nil
}
func getHydrationQueueKey(app *appv1.Application) types.HydrationQueueKey {
key := types.HydrationQueueKey{
SourceRepoURL: git.NormalizeGitURLAllowInvalid(app.Spec.SourceHydrator.DrySource.RepoURL),
@ -255,10 +289,13 @@ func (h *Hydrator) ProcessHydrationQueueItem(hydrationKey types.HydrationQueueKe
}
h.dependencies.PersistHydrationStatus(origApp, &app.Status.SourceHydrator)
// Request a refresh since we pushed a new commit.
err := h.dependencies.RequestAppRefresh(app.Name, app.Namespace)
if err != nil {
logCtx.WithFields(applog.GetAppLogFields(app)).WithError(err).Error("Failed to request app refresh after hydration")
// If a sync operation is in progress, the operation worker will coordinate refresh/hydration ordering.
if app.Operation == nil || app.Operation.Sync == nil {
// Request a refresh since we pushed a new commit.
err := h.dependencies.RequestAppRefresh(app.Name, app.Namespace)
if err != nil {
logCtx.WithFields(applog.GetAppLogFields(app)).WithError(err).Error("Failed to request app refresh after hydration")
}
}
}
}

View file

@ -446,6 +446,97 @@ func TestProcessAppHydrateQueueItem_HydrationPassedTimeout(t *testing.T) {
d.AssertCalled(t, "PersistHydrationStatus", mock.Anything, mock.Anything)
}
func TestProcessAppHydrateQueueItem_ReconciledTimeout_DrySourceRevisionChanged(t *testing.T) {
t.Parallel()
d := mocks.NewDependencies(t)
app := setTestAppPhase(newTestApp("test-app"), v1alpha1.HydrateOperationPhaseHydrated)
reconciledAt := metav1.NewTime(time.Now().Add(-2 * time.Minute))
app.Status.ReconciledAt = &reconciledAt
app.Status.SourceHydrator.CurrentOperation.DrySHA = "old-sha"
d.EXPECT().GetProcessableAppProj(app).Return(newTestProject(), nil).Once()
d.EXPECT().GetRepoObjs(mock.Anything, app, app.Spec.SourceHydrator.GetDrySource(), "main", mock.Anything).Return(nil, &repoclient.ManifestResponse{
Revision: "new-sha",
}, nil).Once()
d.EXPECT().PersistAppHydratorStatus(mock.Anything, mock.Anything).Return().Once()
d.EXPECT().AddHydrationQueueItem(mock.Anything).Return().Once()
h := &Hydrator{
dependencies: d,
statusRefreshTimeout: time.Minute,
}
h.ProcessAppHydrateQueueItem(app)
d.AssertCalled(t, "PersistAppHydratorStatus", mock.Anything, mock.Anything)
d.AssertCalled(t, "AddHydrationQueueItem", mock.Anything)
}
func TestProcessAppHydrateQueueItem_ReconciledTimeout_DrySourceRevisionUnchanged(t *testing.T) {
t.Parallel()
d := mocks.NewDependencies(t)
app := setTestAppPhase(newTestApp("test-app"), v1alpha1.HydrateOperationPhaseHydrated)
reconciledAt := metav1.NewTime(time.Now().Add(-2 * time.Minute))
app.Status.ReconciledAt = &reconciledAt
app.Status.SourceHydrator.CurrentOperation.DrySHA = "same-sha"
d.EXPECT().GetProcessableAppProj(app).Return(newTestProject(), nil).Once()
d.EXPECT().GetRepoObjs(mock.Anything, app, app.Spec.SourceHydrator.GetDrySource(), "main", mock.Anything).Return(nil, &repoclient.ManifestResponse{
Revision: "same-sha",
}, nil).Once()
h := &Hydrator{
dependencies: d,
statusRefreshTimeout: time.Minute,
}
h.ProcessAppHydrateQueueItem(app)
d.AssertNotCalled(t, "PersistAppHydratorStatus", mock.Anything, mock.Anything)
d.AssertNotCalled(t, "AddHydrationQueueItem", mock.Anything)
}
func TestProcessAppHydrateQueueItem_ReconciledTimeout_DrySourceRevisionLookupFails(t *testing.T) {
t.Parallel()
d := mocks.NewDependencies(t)
app := setTestAppPhase(newTestApp("test-app"), v1alpha1.HydrateOperationPhaseHydrated)
reconciledAt := metav1.NewTime(time.Now().Add(-2 * time.Minute))
app.Status.ReconciledAt = &reconciledAt
d.EXPECT().GetProcessableAppProj(app).Return(nil, errors.New("project lookup failed")).Once()
h := &Hydrator{
dependencies: d,
statusRefreshTimeout: time.Minute,
}
h.ProcessAppHydrateQueueItem(app)
d.AssertNotCalled(t, "PersistAppHydratorStatus", mock.Anything, mock.Anything)
d.AssertNotCalled(t, "AddHydrationQueueItem", mock.Anything)
}
func TestProcessAppHydrateQueueItem_ReconciledTimeout_DrySourceManifestResolutionFails(t *testing.T) {
t.Parallel()
d := mocks.NewDependencies(t)
app := setTestAppPhase(newTestApp("test-app"), v1alpha1.HydrateOperationPhaseHydrated)
reconciledAt := metav1.NewTime(time.Now().Add(-2 * time.Minute))
app.Status.ReconciledAt = &reconciledAt
d.EXPECT().GetProcessableAppProj(app).Return(newTestProject(), nil).Once()
d.EXPECT().GetRepoObjs(mock.Anything, app, app.Spec.SourceHydrator.GetDrySource(), "main", mock.Anything).Return(nil, nil, errors.New("manifest resolution failed")).Once()
h := &Hydrator{
dependencies: d,
statusRefreshTimeout: time.Minute,
}
h.ProcessAppHydrateQueueItem(app)
d.AssertNotCalled(t, "PersistAppHydratorStatus", mock.Anything, mock.Anything)
d.AssertNotCalled(t, "AddHydrationQueueItem", mock.Anything)
}
func TestProcessAppHydrateQueueItem_NoSourceHydrator(t *testing.T) {
t.Parallel()
d := mocks.NewDependencies(t)
@ -489,6 +580,106 @@ func TestProcessAppHydrateQueueItem_HydrationNotNeeded(t *testing.T) {
d.AssertNotCalled(t, "AddHydrationQueueItem", mock.Anything)
}
func TestHydrator_shouldCheckDrySourceRevision(t *testing.T) {
t.Parallel()
now := time.Now().UTC()
newHydratedApp := func() *v1alpha1.Application {
app := setTestAppPhase(newTestApp("test-app"), v1alpha1.HydrateOperationPhaseHydrated)
return app
}
testCases := []struct {
name string
app *v1alpha1.Application
h *Hydrator
want bool
}{
{
name: "returns false when there is no current operation",
app: newTestApp("test-app"),
h: &Hydrator{statusRefreshTimeout: time.Minute},
want: false,
},
{
name: "returns false while hydration is in progress",
app: setTestAppPhase(newTestApp("test-app"), v1alpha1.HydrateOperationPhaseHydrating),
h: &Hydrator{statusRefreshTimeout: time.Minute},
want: false,
},
{
name: "returns false when refresh timeout is disabled",
app: newHydratedApp(),
h: &Hydrator{statusRefreshTimeout: 0},
want: false,
},
{
name: "returns true when app was never reconciled",
app: newHydratedApp(),
h: &Hydrator{statusRefreshTimeout: time.Minute},
want: true,
},
{
name: "returns false when reconciled recently",
app: func() *v1alpha1.Application {
app := newHydratedApp()
reconciledAt := metav1.NewTime(now.Add(-30 * time.Second))
app.Status.ReconciledAt = &reconciledAt
return app
}(),
h: &Hydrator{statusRefreshTimeout: time.Minute},
want: false,
},
{
name: "returns true when refresh timeout elapsed",
app: func() *v1alpha1.Application {
app := newHydratedApp()
reconciledAt := metav1.NewTime(now.Add(-2 * time.Minute))
app.Status.ReconciledAt = &reconciledAt
return app
}(),
h: &Hydrator{statusRefreshTimeout: time.Minute},
want: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
assert.Equal(t, tc.want, tc.h.shouldCheckDrySourceRevision(tc.app))
})
}
}
func TestHydrator_getLatestDrySourceRevision_ProjectError(t *testing.T) {
t.Parallel()
d := mocks.NewDependencies(t)
app := newTestApp("test-app")
h := &Hydrator{dependencies: d}
d.EXPECT().GetProcessableAppProj(app).Return(nil, errors.New("project error")).Once()
revision, err := h.getLatestDrySourceRevision(app)
require.Error(t, err)
assert.Empty(t, revision)
assert.ErrorContains(t, err, "failed to get app project")
}
func TestHydrator_getLatestDrySourceRevision_ManifestError(t *testing.T) {
t.Parallel()
d := mocks.NewDependencies(t)
app := newTestApp("test-app")
h := &Hydrator{dependencies: d}
d.EXPECT().GetProcessableAppProj(app).Return(newTestProject(), nil).Once()
d.EXPECT().GetRepoObjs(mock.Anything, app, app.Spec.SourceHydrator.GetDrySource(), "main", mock.Anything).Return(nil, nil, errors.New("manifest resolution failed")).Once()
revision, err := h.getLatestDrySourceRevision(app)
require.Error(t, err)
assert.Empty(t, revision)
assert.ErrorContains(t, err, "failed to resolve latest dry source revision")
}
func TestProcessHydrationQueueItem_ValidationFails(t *testing.T) {
t.Parallel()
d := mocks.NewDependencies(t)
@ -668,6 +859,50 @@ func TestProcessHydrationQueueItem_SuccessfulHydration(t *testing.T) {
assert.Equal(t, app.Status.SourceHydrator.CurrentOperation.SourceHydrator, persistedStatus.LastSuccessfulOperation.SourceHydrator)
}
func TestProcessHydrationQueueItem_SuccessfulHydration_SkipsRefreshDuringSync(t *testing.T) {
t.Parallel()
d := mocks.NewDependencies(t)
r := mocks.NewRepoGetter(t)
rc := reposervermocks.NewRepoServerServiceClient(t)
cc := commitservermocks.NewCommitServiceClient(t)
app := setTestAppPhase(newTestApp("test-app"), v1alpha1.HydrateOperationPhaseHydrating)
app.Operation = &v1alpha1.Operation{
Sync: &v1alpha1.SyncOperation{},
}
hydrationKey := getHydrationQueueKey(app)
d.EXPECT().GetProcessableApps().Return(&v1alpha1.ApplicationList{Items: []v1alpha1.Application{*app}}, nil)
d.EXPECT().GetProcessableAppProj(mock.Anything).Return(newTestProject(), nil)
h := &Hydrator{dependencies: d, repoGetter: r, commitClientset: &commitservermocks.Clientset{CommitServiceClient: cc}, repoClientset: &reposervermocks.Clientset{RepoServerServiceClient: rc}}
var persistedStatus *v1alpha1.SourceHydratorStatus
d.EXPECT().PersistAppHydratorStatus(mock.Anything, mock.Anything).Run(func(_ *v1alpha1.Application, newStatus *v1alpha1.SourceHydratorStatus) {
persistedStatus = newStatus
}).Return().Once()
d.EXPECT().GetRepoObjs(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, &repoclient.ManifestResponse{
Revision: "abc123",
}, nil).Once()
r.EXPECT().GetRepository(mock.Anything, "https://example.com/repo", "test-project").Return(nil, nil).Once()
rc.EXPECT().GetRevisionMetadata(mock.Anything, mock.Anything).Return(nil, nil).Once()
d.EXPECT().GetWriteCredentials(mock.Anything, "https://example.com/repo", "test-project").Return(nil, nil).Once()
d.EXPECT().GetHydratorCommitMessageTemplate().Return("commit message", nil).Once()
d.EXPECT().GetCommitAuthorName().Return("", nil).Once()
d.EXPECT().GetCommitAuthorEmail().Return("", nil).Once()
cc.EXPECT().CommitHydratedManifests(mock.Anything, mock.Anything).Return(&commitclient.CommitHydratedManifestsResponse{HydratedSha: "def456"}, nil).Once()
h.ProcessHydrationQueueItem(hydrationKey)
d.AssertCalled(t, "PersistAppHydratorStatus", mock.Anything, mock.Anything)
d.AssertNotCalled(t, "RequestAppRefresh", app.Name, app.Namespace)
assert.NotNil(t, persistedStatus)
assert.NotNil(t, persistedStatus.CurrentOperation.FinishedAt)
assert.Equal(t, v1alpha1.HydrateOperationPhaseHydrated, persistedStatus.CurrentOperation.Phase)
assert.Equal(t, "abc123", persistedStatus.CurrentOperation.DrySHA)
assert.Equal(t, "def456", persistedStatus.CurrentOperation.HydratedSHA)
assert.NotNil(t, persistedStatus.LastSuccessfulOperation)
assert.Equal(t, "abc123", persistedStatus.LastSuccessfulOperation.DrySHA)
assert.Equal(t, "def456", persistedStatus.LastSuccessfulOperation.HydratedSHA)
}
func TestValidateApplications_ProjectError(t *testing.T) {
t.Parallel()
d := mocks.NewDependencies(t)

View file

@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"github.com/argoproj/argo-cd/v3/common"
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
@ -121,3 +122,49 @@ func TestGetHydratorCommitMessageTemplate(t *testing.T) {
require.NoError(t, err)
assert.NotEmpty(t, tmpl)
}
func TestProcessAppHydrateQueueItem_ReconcileTimeout_EnqueuesHydrationOnDryRevisionChange(t *testing.T) {
app := newFakeApp()
app.Spec.SourceHydrator = &v1alpha1.SourceHydrator{
DrySource: v1alpha1.DrySource{
RepoURL: app.Spec.Source.RepoURL,
TargetRevision: app.Spec.Source.TargetRevision,
Path: app.Spec.Source.Path,
},
SyncSource: v1alpha1.SyncSource{
TargetBranch: "hydrated",
Path: "hydrated/path",
},
}
app.Status.SourceHydrator.CurrentOperation = &v1alpha1.HydrateOperation{
Phase: v1alpha1.HydrateOperationPhaseHydrated,
DrySHA: "old-sha",
SourceHydrator: *app.Spec.SourceHydrator,
}
reconciledAt := metav1.NewTime(time.Now().Add(-2 * time.Minute))
app.Status.ReconciledAt = &reconciledAt
data := fakeData{
apps: []runtime.Object{app, &defaultProj},
manifestResponse: &apiclient.ManifestResponse{
Manifests: []string{},
Namespace: test.FakeDestNamespace,
Server: test.FakeClusterURL,
Revision: "new-sha",
},
}
ctrl := newFakeHydratorControllerWithResync(t.Context(), &data, time.Minute, nil, nil)
ctrl.appHydrateQueue.Add(app.QualifiedName())
processed := ctrl.processAppHydrateQueueItem()
require.True(t, processed)
require.Eventually(t, func() bool {
updatedApp, err := ctrl.appLister.Applications(app.Namespace).Get(app.Name)
if err != nil {
return false
}
op := updatedApp.Status.SourceHydrator.CurrentOperation
return op != nil && op.Phase == v1alpha1.HydrateOperationPhaseHydrating
}, 2*time.Second, 25*time.Millisecond)
}