mirror of
https://github.com/argoproj/argo-cd
synced 2026-05-24 01:38:43 +00:00
API client watch helper to retry disconnections from API server (#896)
This commit is contained in:
parent
8ea474a3c9
commit
51638bc6f5
5 changed files with 71 additions and 65 deletions
|
|
@ -19,13 +19,12 @@ import (
|
|||
"github.com/spf13/pflag"
|
||||
"github.com/yudai/gojsondiff/formatter"
|
||||
"golang.org/x/crypto/ssh/terminal"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
|
||||
"github.com/argoproj/argo-cd/controller/services"
|
||||
"github.com/argoproj/argo-cd/errors"
|
||||
"github.com/argoproj/argo-cd/pkg/apiclient"
|
||||
argocdclient "github.com/argoproj/argo-cd/pkg/apiclient"
|
||||
argoappv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
|
||||
"github.com/argoproj/argo-cd/reposerver/repository"
|
||||
|
|
@ -811,10 +810,7 @@ func NewApplicationWaitCommand(clientOpts *argocdclient.ClientOptions) *cobra.Co
|
|||
}
|
||||
appName := args[0]
|
||||
acdClient := argocdclient.NewClientOrDie(clientOpts)
|
||||
conn, appIf := acdClient.NewApplicationClientOrDie()
|
||||
defer util.Close(conn)
|
||||
|
||||
_, err := waitOnApplicationStatus(appIf, appName, appURL(acdClient, appName), timeout, watchSync, watchHealth, watchOperations, nil)
|
||||
_, err := waitOnApplicationStatus(acdClient, appName, timeout, watchSync, watchHealth, watchOperations, nil)
|
||||
errors.CheckError(err)
|
||||
},
|
||||
}
|
||||
|
|
@ -825,59 +821,6 @@ func NewApplicationWaitCommand(clientOpts *argocdclient.ClientOptions) *cobra.Co
|
|||
return command
|
||||
}
|
||||
|
||||
func isCanceledContextErr(err error) bool {
|
||||
if err == context.Canceled {
|
||||
return true
|
||||
}
|
||||
if stat, ok := status.FromError(err); ok {
|
||||
if stat.Code() == codes.Canceled {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// watchApp returns a channel of watch events for an app, retrying the watch upon errors. Closes
|
||||
// the returned channel when the context is discovered to be canceled.
|
||||
func watchApp(ctx context.Context, appIf application.ApplicationServiceClient, appName string) chan *argoappv1.ApplicationWatchEvent {
|
||||
appEventsCh := make(chan *argoappv1.ApplicationWatchEvent)
|
||||
go func() {
|
||||
defer close(appEventsCh)
|
||||
for {
|
||||
wc, err := appIf.Watch(ctx, &application.ApplicationQuery{
|
||||
Name: &appName,
|
||||
})
|
||||
if err != nil {
|
||||
if isCanceledContextErr(err) {
|
||||
return
|
||||
}
|
||||
if err != io.EOF {
|
||||
log.Warnf("watch err: %v", err)
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
for {
|
||||
appEvent, err := wc.Recv()
|
||||
if err != nil {
|
||||
if isCanceledContextErr(err) {
|
||||
return
|
||||
}
|
||||
if err != io.EOF {
|
||||
log.Warnf("recv err: %v", err)
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
break
|
||||
} else {
|
||||
appEventsCh <- appEvent
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}()
|
||||
return appEventsCh
|
||||
}
|
||||
|
||||
// printAppResources prints the resources of an application in a tabwriter table
|
||||
// Optionally prints the message from the operation state
|
||||
func printAppResources(w io.Writer, app *argoappv1.Application, showOperation bool) {
|
||||
|
|
@ -987,8 +930,7 @@ func NewApplicationSyncCommand(clientOpts *argocdclient.ClientOptions) *cobra.Co
|
|||
_, err := appIf.Sync(ctx, &syncReq)
|
||||
errors.CheckError(err)
|
||||
|
||||
aURL := appURL(acdClient, appName)
|
||||
app, err := waitOnApplicationStatus(appIf, appName, aURL, timeout, false, false, true, syncResources)
|
||||
app, err := waitOnApplicationStatus(acdClient, appName, timeout, false, false, true, syncResources)
|
||||
errors.CheckError(err)
|
||||
|
||||
pruningRequired := 0
|
||||
|
|
@ -1119,7 +1061,7 @@ func calculateResourceStates(app *argoappv1.Application, syncResources []argoapp
|
|||
|
||||
const waitFormatString = "%s\t%5s\t%10s\t%10s\t%20s\t%8s\t%7s\t%10s\t%s\n"
|
||||
|
||||
func waitOnApplicationStatus(appClient application.ApplicationServiceClient, appName, appURL string, timeout uint, watchSync bool, watchHealth bool, watchOperation bool, syncResources []argoappv1.SyncOperationResource) (*argoappv1.Application, error) {
|
||||
func waitOnApplicationStatus(acdClient apiclient.Client, appName string, timeout uint, watchSync bool, watchHealth bool, watchOperation bool, syncResources []argoappv1.SyncOperationResource) (*argoappv1.Application, error) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
|
|
@ -1131,12 +1073,14 @@ func waitOnApplicationStatus(appClient application.ApplicationServiceClient, app
|
|||
printFinalStatus := func(app *argoappv1.Application) {
|
||||
var err error
|
||||
if refresh {
|
||||
conn, appClient := acdClient.NewApplicationClientOrDie()
|
||||
app, err = appClient.Get(context.Background(), &application.ApplicationQuery{Name: &appName, Refresh: true})
|
||||
errors.CheckError(err)
|
||||
_ = conn.Close()
|
||||
}
|
||||
|
||||
fmt.Println()
|
||||
printAppSummaryTable(app, appURL)
|
||||
printAppSummaryTable(app, appURL(acdClient, appName))
|
||||
fmt.Println()
|
||||
if watchOperation {
|
||||
printOperationResult(app.Status.OperationState)
|
||||
|
|
@ -1160,7 +1104,7 @@ func waitOnApplicationStatus(appClient application.ApplicationServiceClient, app
|
|||
fmt.Fprintf(w, waitFormatString, "TIMESTAMP", "GROUP", "KIND", "NAMESPACE", "NAME", "STATUS", "HEALTH", "HOOK", "MESSAGE")
|
||||
|
||||
prevStates := make(map[string]*resourceState)
|
||||
appEventCh := watchApp(ctx, appClient, appName)
|
||||
appEventCh := acdClient.WatchApplicationWithRetry(ctx, appName)
|
||||
var app *argoappv1.Application
|
||||
|
||||
for appEvent := range appEventCh {
|
||||
|
|
@ -1345,7 +1289,7 @@ func NewApplicationRollbackCommand(clientOpts *argocdclient.ClientOptions) *cobr
|
|||
})
|
||||
errors.CheckError(err)
|
||||
|
||||
_, err = waitOnApplicationStatus(appIf, appName, appURL(acdClient, appName), timeout, false, false, true, nil)
|
||||
_, err = waitOnApplicationStatus(acdClient, appName, timeout, false, false, true, nil)
|
||||
errors.CheckError(err)
|
||||
},
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ spec:
|
|||
initContainers:
|
||||
- name: ui
|
||||
image: argoproj/argocd-ui:latest
|
||||
imagePullPolicy: Always
|
||||
command: [cp, -r, /app, /shared]
|
||||
volumeMounts:
|
||||
- mountPath: /shared
|
||||
|
|
|
|||
|
|
@ -427,6 +427,7 @@ spec:
|
|||
- /app
|
||||
- /shared
|
||||
image: argoproj/argocd-ui:latest
|
||||
imagePullPolicy: Always
|
||||
name: ui
|
||||
volumeMounts:
|
||||
- mountPath: /shared
|
||||
|
|
|
|||
|
|
@ -360,6 +360,7 @@ spec:
|
|||
- /app
|
||||
- /shared
|
||||
image: argoproj/argocd-ui:latest
|
||||
imagePullPolicy: Always
|
||||
name: ui
|
||||
volumeMounts:
|
||||
- mountPath: /shared
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import (
|
|||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
|
|
@ -19,10 +20,14 @@ import (
|
|||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/oauth2"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/argoproj/argo-cd"
|
||||
"github.com/argoproj/argo-cd/common"
|
||||
"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
|
||||
argoappv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
|
||||
"github.com/argoproj/argo-cd/server/account"
|
||||
"github.com/argoproj/argo-cd/server/application"
|
||||
"github.com/argoproj/argo-cd/server/cluster"
|
||||
|
|
@ -71,6 +76,7 @@ type Client interface {
|
|||
NewProjectClientOrDie() (*grpc.ClientConn, project.ProjectServiceClient)
|
||||
NewAccountClient() (*grpc.ClientConn, account.AccountServiceClient, error)
|
||||
NewAccountClientOrDie() (*grpc.ClientConn, account.AccountServiceClient)
|
||||
WatchApplicationWithRetry(ctx context.Context, appName string) chan *argoappv1.ApplicationWatchEvent
|
||||
}
|
||||
|
||||
// ClientOptions hold address, security, and other settings for the API client.
|
||||
|
|
@ -502,3 +508,56 @@ func (c *client) NewAccountClientOrDie() (*grpc.ClientConn, account.AccountServi
|
|||
}
|
||||
return conn, usrIf
|
||||
}
|
||||
|
||||
// WatchApplicationWithRetry returns a channel of watch events for an application, retrying the
|
||||
// watch upon errors. Closes the returned channel when the context is cancelled.
|
||||
func (c *client) WatchApplicationWithRetry(ctx context.Context, appName string) chan *argoappv1.ApplicationWatchEvent {
|
||||
appEventsCh := make(chan *argoappv1.ApplicationWatchEvent)
|
||||
cancelled := false
|
||||
go func() {
|
||||
defer close(appEventsCh)
|
||||
for !cancelled {
|
||||
conn, appIf, err := c.NewApplicationClient()
|
||||
if err == nil {
|
||||
var wc application.ApplicationService_WatchClient
|
||||
wc, err = appIf.Watch(ctx, &application.ApplicationQuery{Name: &appName})
|
||||
if err == nil {
|
||||
for {
|
||||
var appEvent *v1alpha1.ApplicationWatchEvent
|
||||
appEvent, err = wc.Recv()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
appEventsCh <- appEvent
|
||||
}
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if isCanceledContextErr(err) {
|
||||
cancelled = true
|
||||
} else {
|
||||
if err != io.EOF {
|
||||
log.Warnf("watch err: %v", err)
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
if conn != nil {
|
||||
_ = conn.Close()
|
||||
}
|
||||
}
|
||||
}()
|
||||
return appEventsCh
|
||||
}
|
||||
|
||||
func isCanceledContextErr(err error) bool {
|
||||
if err == context.Canceled {
|
||||
return true
|
||||
}
|
||||
if stat, ok := status.FromError(err); ok {
|
||||
if stat.Code() == codes.Canceled {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue