fix(hydrator): race when pushing notes (#25700)

Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com>
This commit is contained in:
Michael Crenshaw 2025-12-17 11:17:55 -05:00 committed by GitHub
parent 67d425f237
commit 0a2ae95be8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 270 additions and 11 deletions

View file

@ -0,0 +1,209 @@
package commit
import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/argoproj/argo-cd/v3/util/git"
)
// TestAddNoteConcurrentStaggered tests that when multiple AddNote operations run
// with slightly staggered timing, all notes persist correctly.
// Each operation gets its own git clone, simulating multiple concurrent hydration requests.
func TestAddNoteConcurrentStaggered(t *testing.T) {
t.Parallel()
remotePath, localPath := setupRepoWithRemote(t)
// Create 3 branches with commits (simulating different hydration targets)
branches := []string{"env/dev", "env/staging", "env/prod"}
commitSHAs := make([]string, 3)
for i, branch := range branches {
commitSHAs[i] = commitAndPushBranch(t, localPath, branch)
}
// Create separate clones for concurrent operations
cloneClients := make([]git.Client, 3)
for i := 0; i < 3; i++ {
cloneClients[i] = getClientForClone(t, remotePath)
}
// Add notes concurrently with slight stagger
var wg sync.WaitGroup
errors := make([]error, 3)
for i := 0; i < 3; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
time.Sleep(time.Duration(idx*50) * time.Millisecond)
errors[idx] = AddNote(cloneClients[idx], fmt.Sprintf("dry-sha-%d", idx), commitSHAs[idx])
}(i)
}
wg.Wait()
// Verify all notes persisted
verifyClient := getClientForClone(t, remotePath)
for i, commitSHA := range commitSHAs {
note, err := verifyClient.GetCommitNote(commitSHA, NoteNamespace)
require.NoError(t, err, "Note should exist for commit %d", i)
assert.Contains(t, note, fmt.Sprintf("dry-sha-%d", i))
}
}
// TestAddNoteConcurrentSimultaneous tests that when multiple AddNote operations run
// simultaneously (without delays), all notes persist correctly.
// Each operation gets its own git clone, simulating multiple concurrent hydration requests.
func TestAddNoteConcurrentSimultaneous(t *testing.T) {
t.Parallel()
remotePath, localPath := setupRepoWithRemote(t)
// Create 3 branches with commits (simulating different hydration targets)
branches := []string{"env/dev", "env/staging", "env/prod"}
commitSHAs := make([]string, 3)
for i, branch := range branches {
commitSHAs[i] = commitAndPushBranch(t, localPath, branch)
}
// Create separate clones for concurrent operations
cloneClients := make([]git.Client, 3)
for i := 0; i < 3; i++ {
cloneClients[i] = getClientForClone(t, remotePath)
}
// Add notes concurrently without delays
var wg sync.WaitGroup
startChan := make(chan struct{})
for i := 0; i < 3; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
<-startChan
_ = AddNote(cloneClients[idx], fmt.Sprintf("dry-sha-%d", idx), commitSHAs[idx])
}(i)
}
close(startChan)
wg.Wait()
// Verify all notes persisted
verifyClient := getClientForClone(t, remotePath)
for i, commitSHA := range commitSHAs {
note, err := verifyClient.GetCommitNote(commitSHA, NoteNamespace)
require.NoError(t, err, "Note should exist for commit %d", i)
assert.Contains(t, note, fmt.Sprintf("dry-sha-%d", i))
}
}
// setupRepoWithRemote creates a bare remote repo and a local repo configured to push to it.
// Returns the remote path and local path.
func setupRepoWithRemote(t *testing.T) (remotePath, localPath string) {
t.Helper()
ctx := t.Context()
// Create bare remote repository
remoteDir := t.TempDir()
remotePath = filepath.Join(remoteDir, "remote.git")
err := os.MkdirAll(remotePath, 0o755)
require.NoError(t, err)
_, err = runGitCmd(ctx, remotePath, "init", "--bare")
require.NoError(t, err)
// Create local repository
localDir := t.TempDir()
localPath = filepath.Join(localDir, "local")
err = os.MkdirAll(localPath, 0o755)
require.NoError(t, err)
_, err = runGitCmd(ctx, localPath, "init")
require.NoError(t, err)
_, err = runGitCmd(ctx, localPath, "config", "user.name", "Test User")
require.NoError(t, err)
_, err = runGitCmd(ctx, localPath, "config", "user.email", "test@example.com")
require.NoError(t, err)
_, err = runGitCmd(ctx, localPath, "remote", "add", "origin", remotePath)
require.NoError(t, err)
return remotePath, localPath
}
// commitAndPushBranch writes a file, commits it, creates a branch, and pushes to remote.
// Returns the commit SHA.
func commitAndPushBranch(t *testing.T, localPath, branch string) string {
t.Helper()
ctx := t.Context()
testFile := filepath.Join(localPath, "test.txt")
err := os.WriteFile(testFile, []byte("content for "+branch), 0o644)
require.NoError(t, err)
_, err = runGitCmd(ctx, localPath, "add", ".")
require.NoError(t, err)
_, err = runGitCmd(ctx, localPath, "commit", "-m", "commit "+branch)
require.NoError(t, err)
sha, err := runGitCmd(ctx, localPath, "rev-parse", "HEAD")
require.NoError(t, err)
_, err = runGitCmd(ctx, localPath, "branch", branch)
require.NoError(t, err)
_, err = runGitCmd(ctx, localPath, "push", "origin", branch)
require.NoError(t, err)
return sha
}
// getClientForClone creates a git client with a fresh clone of the remote repo.
func getClientForClone(t *testing.T, remotePath string) git.Client {
t.Helper()
ctx := t.Context()
workDir := t.TempDir()
client, err := git.NewClientExt(remotePath, workDir, &git.NopCreds{}, false, false, "", "")
require.NoError(t, err)
err = client.Init()
require.NoError(t, err)
_, err = runGitCmd(ctx, workDir, "config", "user.name", "Test User")
require.NoError(t, err)
_, err = runGitCmd(ctx, workDir, "config", "user.email", "test@example.com")
require.NoError(t, err)
err = client.Fetch("", 0)
require.NoError(t, err)
return client
}
// runGitCmd is a helper function to run git commands
func runGitCmd(ctx context.Context, dir string, args ...string) (string, error) {
cmd := exec.CommandContext(ctx, "git", args...)
cmd.Dir = dir
output, err := cmd.CombinedOutput()
return strings.TrimSpace(string(output)), err
}

2
go.mod
View file

@ -22,6 +22,7 @@ require (
github.com/bradleyfalzon/ghinstallation/v2 v2.17.0
github.com/casbin/casbin/v2 v2.135.0
github.com/casbin/govaluate v1.10.0
github.com/cenkalti/backoff/v5 v5.0.3
github.com/cespare/xxhash/v2 v2.3.0
github.com/chainguard-dev/git-urls v1.0.2
github.com/coreos/go-oidc/v3 v3.14.1
@ -160,7 +161,6 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/chai2010/gettext-go v1.0.3 // indirect
github.com/clipperhouse/displaywidth v0.6.0 // indirect
github.com/clipperhouse/stringish v0.1.1 // indirect

View file

@ -23,6 +23,7 @@ import (
"unicode/utf8"
"github.com/bmatcuk/doublestar/v4"
"github.com/cenkalti/backoff/v5"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/config"
"github.com/go-git/go-git/v5/plumbing"
@ -1142,26 +1143,75 @@ func (m *nativeGitClient) GetCommitNote(sha string, namespace string) (string, e
}
// AddAndPushNote adds a note to a DRY sha and then pushes it.
// It uses a retry mechanism to handle concurrent note updates from multiple clients.
func (m *nativeGitClient) AddAndPushNote(sha string, namespace string, note string) error {
if namespace == "" {
namespace = "commit"
}
ctx := context.Background()
ref := "--ref=" + namespace
_, err := m.runCmd(ctx, "notes", ref, "add", "-f", "-m", note, sha)
if err != nil {
return fmt.Errorf("failed to push: %w", err)
}
if m.OnPush != nil {
done := m.OnPush(m.repoURL)
defer done()
notesRef := "refs/notes/" + namespace
// Configure exponential backoff with jitter to handle concurrent note updates
b := backoff.NewExponentialBackOff()
b.InitialInterval = 50 * time.Millisecond
b.MaxInterval = 1 * time.Second
attempt := 0
operation := func() (struct{}, error) {
attempt++
// Fetch the latest notes BEFORE adding to merge concurrent updates
// Use + prefix to force update local ref (safe because we want latest remote notes)
_, fetchErr := m.runCmd(ctx, "fetch", "origin", fmt.Sprintf("+%s:%s", notesRef, notesRef))
// Ignore "couldn't find remote ref" errors (notes don't exist yet - first time)
if fetchErr != nil && !strings.Contains(fetchErr.Error(), "couldn't find remote ref") {
log.Debugf("Failed to fetch notes (will continue): %v", fetchErr)
}
// Add note locally (use -f to overwrite if this specific commit already has a note locally)
_, err := m.runCmd(ctx, "notes", ref, "add", "-f", "-m", note, sha)
if err != nil {
return struct{}{}, backoff.Permanent(fmt.Errorf("failed to add note: %w", err))
}
if m.OnPush != nil {
done := m.OnPush(m.repoURL)
defer done()
}
// Push WITHOUT -f flag to avoid overwriting other notes
err = m.runCredentialedCmd(ctx, "push", "origin", notesRef)
if err == nil {
if attempt > 1 {
log.Debugf("AddAndPushNote succeeded after %d retries for commit %s", attempt-1, sha)
}
return struct{}{}, nil
}
log.Debugf("AddAndPushNote push failed (attempt %d): %v", attempt, err)
// Check if this is a retryable error
errStr := err.Error()
isRetryable := strings.Contains(errStr, "fetch first") || // Remote updated after our fetch (concurrent push completed between our fetch and push)
strings.Contains(errStr, "reference already exists") || // Concurrent push is holding the lock (git server-side lock)
strings.Contains(errStr, "incorrect old value") || // Git detected our local ref is stale (concurrent update)
strings.Contains(errStr, "failed to update ref") // Generic ref update failure that may include transient issues
if !isRetryable {
return struct{}{}, backoff.Permanent(fmt.Errorf("failed to push note: %w", err))
}
return struct{}{}, err
}
err = m.runCredentialedCmd(ctx, "push", "-f", "origin", "refs/notes/"+namespace)
_, err := backoff.Retry(ctx, operation,
backoff.WithBackOff(b),
backoff.WithMaxElapsedTime(5*time.Second),
)
if err != nil {
return fmt.Errorf("failed to push: %w", err)
return fmt.Errorf("failed to push note after retries: %w", err)
}
return nil
}