From 51638bc6f59b02de72b0aa6d5608f679b1fc9f45 Mon Sep 17 00:00:00 2001 From: Jesse Suen Date: Fri, 7 Dec 2018 11:33:58 -0800 Subject: [PATCH] API client watch helper to retry disconnections from API server (#896) --- cmd/argocd/commands/app.go | 74 +++----------------- manifests/base/argocd-server-deployment.yaml | 1 + manifests/install.yaml | 1 + manifests/namespace-install.yaml | 1 + pkg/apiclient/apiclient.go | 59 ++++++++++++++++ 5 files changed, 71 insertions(+), 65 deletions(-) diff --git a/cmd/argocd/commands/app.go b/cmd/argocd/commands/app.go index 90099c5878..3d31d296c4 100644 --- a/cmd/argocd/commands/app.go +++ b/cmd/argocd/commands/app.go @@ -19,13 +19,12 @@ import ( "github.com/spf13/pflag" "github.com/yudai/gojsondiff/formatter" "golang.org/x/crypto/ssh/terminal" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "github.com/argoproj/argo-cd/controller/services" "github.com/argoproj/argo-cd/errors" + "github.com/argoproj/argo-cd/pkg/apiclient" argocdclient "github.com/argoproj/argo-cd/pkg/apiclient" argoappv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1" "github.com/argoproj/argo-cd/reposerver/repository" @@ -811,10 +810,7 @@ func NewApplicationWaitCommand(clientOpts *argocdclient.ClientOptions) *cobra.Co } appName := args[0] acdClient := argocdclient.NewClientOrDie(clientOpts) - conn, appIf := acdClient.NewApplicationClientOrDie() - defer util.Close(conn) - - _, err := waitOnApplicationStatus(appIf, appName, appURL(acdClient, appName), timeout, watchSync, watchHealth, watchOperations, nil) + _, err := waitOnApplicationStatus(acdClient, appName, timeout, watchSync, watchHealth, watchOperations, nil) errors.CheckError(err) }, } @@ -825,59 +821,6 @@ func NewApplicationWaitCommand(clientOpts *argocdclient.ClientOptions) *cobra.Co return command } -func isCanceledContextErr(err error) bool { - if err == context.Canceled { - return true - } - if stat, ok := status.FromError(err); ok { - if stat.Code() == codes.Canceled { - return true - } - } - return false -} - -// watchApp returns a channel of watch events for an app, retrying the watch upon errors. Closes -// the returned channel when the context is discovered to be canceled. -func watchApp(ctx context.Context, appIf application.ApplicationServiceClient, appName string) chan *argoappv1.ApplicationWatchEvent { - appEventsCh := make(chan *argoappv1.ApplicationWatchEvent) - go func() { - defer close(appEventsCh) - for { - wc, err := appIf.Watch(ctx, &application.ApplicationQuery{ - Name: &appName, - }) - if err != nil { - if isCanceledContextErr(err) { - return - } - if err != io.EOF { - log.Warnf("watch err: %v", err) - } - time.Sleep(1 * time.Second) - continue - } - for { - appEvent, err := wc.Recv() - if err != nil { - if isCanceledContextErr(err) { - return - } - if err != io.EOF { - log.Warnf("recv err: %v", err) - } - time.Sleep(1 * time.Second) - break - } else { - appEventsCh <- appEvent - } - } - } - - }() - return appEventsCh -} - // printAppResources prints the resources of an application in a tabwriter table // Optionally prints the message from the operation state func printAppResources(w io.Writer, app *argoappv1.Application, showOperation bool) { @@ -987,8 +930,7 @@ func NewApplicationSyncCommand(clientOpts *argocdclient.ClientOptions) *cobra.Co _, err := appIf.Sync(ctx, &syncReq) errors.CheckError(err) - aURL := appURL(acdClient, appName) - app, err := waitOnApplicationStatus(appIf, appName, aURL, timeout, false, false, true, syncResources) + app, err := waitOnApplicationStatus(acdClient, appName, timeout, false, false, true, syncResources) errors.CheckError(err) pruningRequired := 0 @@ -1119,7 +1061,7 @@ func calculateResourceStates(app *argoappv1.Application, syncResources []argoapp const waitFormatString = "%s\t%5s\t%10s\t%10s\t%20s\t%8s\t%7s\t%10s\t%s\n" -func waitOnApplicationStatus(appClient application.ApplicationServiceClient, appName, appURL string, timeout uint, watchSync bool, watchHealth bool, watchOperation bool, syncResources []argoappv1.SyncOperationResource) (*argoappv1.Application, error) { +func waitOnApplicationStatus(acdClient apiclient.Client, appName string, timeout uint, watchSync bool, watchHealth bool, watchOperation bool, syncResources []argoappv1.SyncOperationResource) (*argoappv1.Application, error) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1131,12 +1073,14 @@ func waitOnApplicationStatus(appClient application.ApplicationServiceClient, app printFinalStatus := func(app *argoappv1.Application) { var err error if refresh { + conn, appClient := acdClient.NewApplicationClientOrDie() app, err = appClient.Get(context.Background(), &application.ApplicationQuery{Name: &appName, Refresh: true}) errors.CheckError(err) + _ = conn.Close() } fmt.Println() - printAppSummaryTable(app, appURL) + printAppSummaryTable(app, appURL(acdClient, appName)) fmt.Println() if watchOperation { printOperationResult(app.Status.OperationState) @@ -1160,7 +1104,7 @@ func waitOnApplicationStatus(appClient application.ApplicationServiceClient, app fmt.Fprintf(w, waitFormatString, "TIMESTAMP", "GROUP", "KIND", "NAMESPACE", "NAME", "STATUS", "HEALTH", "HOOK", "MESSAGE") prevStates := make(map[string]*resourceState) - appEventCh := watchApp(ctx, appClient, appName) + appEventCh := acdClient.WatchApplicationWithRetry(ctx, appName) var app *argoappv1.Application for appEvent := range appEventCh { @@ -1345,7 +1289,7 @@ func NewApplicationRollbackCommand(clientOpts *argocdclient.ClientOptions) *cobr }) errors.CheckError(err) - _, err = waitOnApplicationStatus(appIf, appName, appURL(acdClient, appName), timeout, false, false, true, nil) + _, err = waitOnApplicationStatus(acdClient, appName, timeout, false, false, true, nil) errors.CheckError(err) }, } diff --git a/manifests/base/argocd-server-deployment.yaml b/manifests/base/argocd-server-deployment.yaml index a2aec0f208..1d6ba5ebd0 100644 --- a/manifests/base/argocd-server-deployment.yaml +++ b/manifests/base/argocd-server-deployment.yaml @@ -15,6 +15,7 @@ spec: initContainers: - name: ui image: argoproj/argocd-ui:latest + imagePullPolicy: Always command: [cp, -r, /app, /shared] volumeMounts: - mountPath: /shared diff --git a/manifests/install.yaml b/manifests/install.yaml index 6beea8d496..9848263eb8 100644 --- a/manifests/install.yaml +++ b/manifests/install.yaml @@ -427,6 +427,7 @@ spec: - /app - /shared image: argoproj/argocd-ui:latest + imagePullPolicy: Always name: ui volumeMounts: - mountPath: /shared diff --git a/manifests/namespace-install.yaml b/manifests/namespace-install.yaml index ebfabaa0e4..57d38aa9cf 100644 --- a/manifests/namespace-install.yaml +++ b/manifests/namespace-install.yaml @@ -360,6 +360,7 @@ spec: - /app - /shared image: argoproj/argocd-ui:latest + imagePullPolicy: Always name: ui volumeMounts: - mountPath: /shared diff --git a/pkg/apiclient/apiclient.go b/pkg/apiclient/apiclient.go index 7925bb2628..bcb977f2a7 100644 --- a/pkg/apiclient/apiclient.go +++ b/pkg/apiclient/apiclient.go @@ -7,6 +7,7 @@ import ( "encoding/base64" "errors" "fmt" + "io" "io/ioutil" "net" "net/http" @@ -19,10 +20,14 @@ import ( log "github.com/sirupsen/logrus" "golang.org/x/oauth2" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/status" "github.com/argoproj/argo-cd" "github.com/argoproj/argo-cd/common" + "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1" + argoappv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1" "github.com/argoproj/argo-cd/server/account" "github.com/argoproj/argo-cd/server/application" "github.com/argoproj/argo-cd/server/cluster" @@ -71,6 +76,7 @@ type Client interface { NewProjectClientOrDie() (*grpc.ClientConn, project.ProjectServiceClient) NewAccountClient() (*grpc.ClientConn, account.AccountServiceClient, error) NewAccountClientOrDie() (*grpc.ClientConn, account.AccountServiceClient) + WatchApplicationWithRetry(ctx context.Context, appName string) chan *argoappv1.ApplicationWatchEvent } // ClientOptions hold address, security, and other settings for the API client. @@ -502,3 +508,56 @@ func (c *client) NewAccountClientOrDie() (*grpc.ClientConn, account.AccountServi } return conn, usrIf } + +// WatchApplicationWithRetry returns a channel of watch events for an application, retrying the +// watch upon errors. Closes the returned channel when the context is cancelled. +func (c *client) WatchApplicationWithRetry(ctx context.Context, appName string) chan *argoappv1.ApplicationWatchEvent { + appEventsCh := make(chan *argoappv1.ApplicationWatchEvent) + cancelled := false + go func() { + defer close(appEventsCh) + for !cancelled { + conn, appIf, err := c.NewApplicationClient() + if err == nil { + var wc application.ApplicationService_WatchClient + wc, err = appIf.Watch(ctx, &application.ApplicationQuery{Name: &appName}) + if err == nil { + for { + var appEvent *v1alpha1.ApplicationWatchEvent + appEvent, err = wc.Recv() + if err != nil { + break + } + appEventsCh <- appEvent + } + } + } + if err != nil { + if isCanceledContextErr(err) { + cancelled = true + } else { + if err != io.EOF { + log.Warnf("watch err: %v", err) + } + time.Sleep(1 * time.Second) + } + } + if conn != nil { + _ = conn.Close() + } + } + }() + return appEventsCh +} + +func isCanceledContextErr(err error) bool { + if err == context.Canceled { + return true + } + if stat, ok := status.FromError(err); ok { + if stat.Code() == codes.Canceled { + return true + } + } + return false +}