diff --git a/server/application/application.go b/server/application/application.go index 94e9f83f2a..10cf236f7d 100644 --- a/server/application/application.go +++ b/server/application/application.go @@ -207,7 +207,7 @@ func (s *Server) Get(ctx context.Context, q *ApplicationQuery) (*appv1.Applicati if err != nil { return nil, err } - a, err = argoutil.WaitForRefresh(appIf, *q.Name, nil) + a, err = argoutil.WaitForRefresh(ctx, appIf, *q.Name, nil) if err != nil { return nil, err } diff --git a/util/argo/argo.go b/util/argo/argo.go index af4e842faf..27fe0c224a 100644 --- a/util/argo/argo.go +++ b/util/argo/argo.go @@ -3,13 +3,14 @@ package argo import ( "context" "encoding/json" - "errors" "fmt" "path" "path/filepath" "strings" "time" + "github.com/argoproj/argo-cd/util/kube" + log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -100,58 +101,39 @@ func RefreshApp(appIf v1alpha1.ApplicationInterface, name string) (*argoappv1.Ap // WaitForRefresh watches an application until its comparison timestamp is after the refresh timestamp // If refresh timestamp is not present, will use current timestamp at time of call -func WaitForRefresh(appIf v1alpha1.ApplicationInterface, name string, timeout *time.Duration) (*argoappv1.Application, error) { - ctx := context.Background() +func WaitForRefresh(ctx context.Context, appIf v1alpha1.ApplicationInterface, name string, timeout *time.Duration) (*argoappv1.Application, error) { var cancel context.CancelFunc if timeout != nil { ctx, cancel = context.WithTimeout(ctx, *timeout) defer cancel() } - fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", name)) - listOpts := metav1.ListOptions{FieldSelector: fieldSelector.String()} - watchIf, err := appIf.Watch(listOpts) - if err != nil { - return nil, err - } - defer watchIf.Stop() - now := time.Now().UTC() - - for { - select { - case <-ctx.Done(): - err := ctx.Err() - if err != nil { - if err == context.DeadlineExceeded { - return nil, fmt.Errorf("Timed out (%v) waiting for application to refresh", timeout) - } - return nil, fmt.Errorf("Error waiting for refresh: %v", err) - } - return nil, fmt.Errorf("Application watch on %s closed", name) - case next := <-watchIf.ResultChan(): - if next.Type == watch.Error { - errMsg := "Application watch completed with error" - if status, ok := next.Object.(*metav1.Status); ok { - errMsg = fmt.Sprintf("%s: %v", errMsg, status) - } - return nil, errors.New(errMsg) - } - app, ok := next.Object.(*argoappv1.Application) - if !ok { - return nil, fmt.Errorf("Application event object failed conversion: %v", next) - } - refreshTimestampStr := app.ObjectMeta.Annotations[common.AnnotationKeyRefresh] - if refreshTimestampStr == "" { - refreshTimestampStr = now.String() - } - refreshTimestamp, err := time.Parse(time.RFC3339, refreshTimestampStr) - if err != nil { - return nil, fmt.Errorf("Unable to parse '%s': %v", common.AnnotationKeyRefresh, err) - } - if app.Status.ObservedAt.After(refreshTimestamp) || app.Status.ObservedAt.Time.Equal(refreshTimestamp) { - return app, nil - } + ch := kube.WatchWithRetry(ctx, func() (i watch.Interface, e error) { + fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", name)) + listOpts := metav1.ListOptions{FieldSelector: fieldSelector.String()} + return appIf.Watch(listOpts) + }) + for next := range ch { + if next.Error != nil { + return nil, next.Error + } + app, ok := next.Object.(*argoappv1.Application) + if !ok { + return nil, fmt.Errorf("Application event object failed conversion: %v", next) + } + refreshTimestampStr := app.ObjectMeta.Annotations[common.AnnotationKeyRefresh] + if refreshTimestampStr == "" { + now := time.Now().UTC() + refreshTimestampStr = now.String() + } + refreshTimestamp, err := time.Parse(time.RFC3339, refreshTimestampStr) + if err != nil { + return nil, fmt.Errorf("Unable to parse '%s': %v", common.AnnotationKeyRefresh, err) + } + if app.Status.ObservedAt.After(refreshTimestamp) || app.Status.ObservedAt.Time.Equal(refreshTimestamp) { + return app, nil } } + return nil, fmt.Errorf("application refresh deadline exceeded") } // GetSpecErrors returns list of conditions which indicates that app spec is invalid. Following is checked: diff --git a/util/argo/argo_test.go b/util/argo/argo_test.go index 58109d0c10..dc12c9a8ac 100644 --- a/util/argo/argo_test.go +++ b/util/argo/argo_test.go @@ -1,6 +1,7 @@ package argo import ( + "context" "strings" "testing" "time" @@ -53,10 +54,10 @@ func TestWaitForRefresh(t *testing.T) { // Verify timeout appIf := appClientset.ArgoprojV1alpha1().Applications("default") oneHundredMs := 100 * time.Millisecond - app, err := WaitForRefresh(appIf, "test-app", &oneHundredMs) + app, err := WaitForRefresh(context.Background(), appIf, "test-app", &oneHundredMs) assert.NotNil(t, err) assert.Nil(t, app) - assert.Contains(t, strings.ToLower(err.Error()), "timed out") + assert.Contains(t, strings.ToLower(err.Error()), "deadline exceeded") // Verify success var testApp argoappv1.Application @@ -73,7 +74,7 @@ func TestWaitForRefresh(t *testing.T) { appClientset.PrependWatchReactor("applications", testcore.DefaultWatchReactor(watcher, nil)) // simulate add/update/delete watch events go watcher.Add(&testApp) - app, err = WaitForRefresh(appIf, "test-app", &oneHundredMs) + app, err = WaitForRefresh(context.Background(), appIf, "test-app", &oneHundredMs) assert.Nil(t, err) assert.NotNil(t, app) } diff --git a/util/kube/ctl.go b/util/kube/ctl.go index 239d153f7f..582e24fb6c 100644 --- a/util/kube/ctl.go +++ b/util/kube/ctl.go @@ -13,7 +13,6 @@ import ( "time" "github.com/argoproj/argo-cd/util" - "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -109,37 +108,25 @@ func (k KubectlCmd) WatchResources( err = errors.New(message) } }() - var w watch.Interface - w, err = apiResIf.resourceIf.Watch(metav1.ListOptions{}) - if err != nil { - return err - } - defer w.Stop() - stopped := false - for !stopped { - select { - case event := <-w.ResultChan(): - if event.Object != nil { - ch <- event - } else if !stopped { - // Workaround for https://github.com/kubernetes/client-go/issues/334. Issue is closed but still not resolved. - return fmt.Errorf("got empty event object. restarting watch") - } - - case <-ctx.Done(): - stopped = true + watchCh := WatchWithRetry(ctx, func() (i watch.Interface, e error) { + return apiResIf.resourceIf.Watch(metav1.ListOptions{}) + }) + for next := range watchCh { + if next.Error != nil { + return next.Error + } + ch <- watch.Event{ + Object: next.Object, + Type: next.Type, } } - if !stopped { - return fmt.Errorf("channel got closed. restarting watch") - } return nil - }, fmt.Sprintf("watch resources %s %s/%s", config.ServerName, apiResIf.groupVersion, apiResIf.apiResource.Kind), ctx, watchResourcesRetryTimeout) + }, fmt.Sprintf("watch resources %s %s/%s", config.Host, apiResIf.groupVersion, apiResIf.apiResource.Kind), ctx, watchResourcesRetryTimeout) }(a) } wg.Wait() close(ch) - log.Infof("Stop watching for resources changes with in cluster %s", config.ServerName) + log.Infof("Stop watching for resources changes with in cluster %s", config.Host) }() return ch, nil } diff --git a/util/kube/kube.go b/util/kube/kube.go index d3545302c1..598e5e356e 100644 --- a/util/kube/kube.go +++ b/util/kube/kube.go @@ -2,12 +2,14 @@ package kube import ( + "context" "encoding/json" "fmt" "os" "reflect" "regexp" "strings" + "time" "github.com/ghodss/yaml" log "github.com/sirupsen/logrus" @@ -17,6 +19,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" @@ -489,3 +492,53 @@ func Remarshal(obj *unstructured.Unstructured) (*unstructured.Unstructured, erro unstrBody = jsonutil.RemoveMapFields(obj.Object, unstrBody) return &unstructured.Unstructured{Object: unstrBody}, nil } + +// WatchWithRetry returns channel of watch events or errors of failed to call watch API. +func WatchWithRetry(ctx context.Context, getWatch func() (watch.Interface, error)) chan struct { + *watch.Event + Error error +} { + ch := make(chan struct { + *watch.Event + Error error + }) + execute := func() (bool, error) { + w, err := getWatch() + if err != nil { + return false, err + } + + for { + select { + case event, ok := <-w.ResultChan(): + if ok { + ch <- struct { + *watch.Event + Error error + }{Event: &event, Error: nil} + } else { + return true, nil + } + case <-ctx.Done(): + return false, nil + } + } + } + go func() { + defer close(ch) + for { + retry, err := execute() + if err != nil { + ch <- struct { + *watch.Event + Error error + }{Error: err} + } + if !retry { + return + } + time.Sleep(time.Second) + } + }() + return ch +} diff --git a/util/util.go b/util/util.go index dac0c9080d..2fbcb63211 100644 --- a/util/util.go +++ b/util/util.go @@ -93,6 +93,7 @@ func RetryUntilSucceed(action func() error, desc string, ctx context.Context, ti log.Infof("Start %s", desc) err := action() if err == nil { + log.Infof("Completed %s", desc) return } if err != nil {