Issue #760 - Properly read watch events to avoid nil pointer errors (#890)

This commit is contained in:
Alexander Matyushentsev 2018-12-06 15:20:37 -08:00 committed by GitHub
parent 5cedfb8ead
commit 7eb211eb94
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 99 additions and 75 deletions

View file

@ -207,7 +207,7 @@ func (s *Server) Get(ctx context.Context, q *ApplicationQuery) (*appv1.Applicati
if err != nil {
return nil, err
}
a, err = argoutil.WaitForRefresh(appIf, *q.Name, nil)
a, err = argoutil.WaitForRefresh(ctx, appIf, *q.Name, nil)
if err != nil {
return nil, err
}

View file

@ -3,13 +3,14 @@ package argo
import (
"context"
"encoding/json"
"errors"
"fmt"
"path"
"path/filepath"
"strings"
"time"
"github.com/argoproj/argo-cd/util/kube"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -100,58 +101,39 @@ func RefreshApp(appIf v1alpha1.ApplicationInterface, name string) (*argoappv1.Ap
// WaitForRefresh watches an application until its comparison timestamp is after the refresh timestamp
// If refresh timestamp is not present, will use current timestamp at time of call
func WaitForRefresh(appIf v1alpha1.ApplicationInterface, name string, timeout *time.Duration) (*argoappv1.Application, error) {
ctx := context.Background()
func WaitForRefresh(ctx context.Context, appIf v1alpha1.ApplicationInterface, name string, timeout *time.Duration) (*argoappv1.Application, error) {
var cancel context.CancelFunc
if timeout != nil {
ctx, cancel = context.WithTimeout(ctx, *timeout)
defer cancel()
}
fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", name))
listOpts := metav1.ListOptions{FieldSelector: fieldSelector.String()}
watchIf, err := appIf.Watch(listOpts)
if err != nil {
return nil, err
}
defer watchIf.Stop()
now := time.Now().UTC()
for {
select {
case <-ctx.Done():
err := ctx.Err()
if err != nil {
if err == context.DeadlineExceeded {
return nil, fmt.Errorf("Timed out (%v) waiting for application to refresh", timeout)
}
return nil, fmt.Errorf("Error waiting for refresh: %v", err)
}
return nil, fmt.Errorf("Application watch on %s closed", name)
case next := <-watchIf.ResultChan():
if next.Type == watch.Error {
errMsg := "Application watch completed with error"
if status, ok := next.Object.(*metav1.Status); ok {
errMsg = fmt.Sprintf("%s: %v", errMsg, status)
}
return nil, errors.New(errMsg)
}
app, ok := next.Object.(*argoappv1.Application)
if !ok {
return nil, fmt.Errorf("Application event object failed conversion: %v", next)
}
refreshTimestampStr := app.ObjectMeta.Annotations[common.AnnotationKeyRefresh]
if refreshTimestampStr == "" {
refreshTimestampStr = now.String()
}
refreshTimestamp, err := time.Parse(time.RFC3339, refreshTimestampStr)
if err != nil {
return nil, fmt.Errorf("Unable to parse '%s': %v", common.AnnotationKeyRefresh, err)
}
if app.Status.ObservedAt.After(refreshTimestamp) || app.Status.ObservedAt.Time.Equal(refreshTimestamp) {
return app, nil
}
ch := kube.WatchWithRetry(ctx, func() (i watch.Interface, e error) {
fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", name))
listOpts := metav1.ListOptions{FieldSelector: fieldSelector.String()}
return appIf.Watch(listOpts)
})
for next := range ch {
if next.Error != nil {
return nil, next.Error
}
app, ok := next.Object.(*argoappv1.Application)
if !ok {
return nil, fmt.Errorf("Application event object failed conversion: %v", next)
}
refreshTimestampStr := app.ObjectMeta.Annotations[common.AnnotationKeyRefresh]
if refreshTimestampStr == "" {
now := time.Now().UTC()
refreshTimestampStr = now.String()
}
refreshTimestamp, err := time.Parse(time.RFC3339, refreshTimestampStr)
if err != nil {
return nil, fmt.Errorf("Unable to parse '%s': %v", common.AnnotationKeyRefresh, err)
}
if app.Status.ObservedAt.After(refreshTimestamp) || app.Status.ObservedAt.Time.Equal(refreshTimestamp) {
return app, nil
}
}
return nil, fmt.Errorf("application refresh deadline exceeded")
}
// GetSpecErrors returns list of conditions which indicates that app spec is invalid. Following is checked:

View file

@ -1,6 +1,7 @@
package argo
import (
"context"
"strings"
"testing"
"time"
@ -53,10 +54,10 @@ func TestWaitForRefresh(t *testing.T) {
// Verify timeout
appIf := appClientset.ArgoprojV1alpha1().Applications("default")
oneHundredMs := 100 * time.Millisecond
app, err := WaitForRefresh(appIf, "test-app", &oneHundredMs)
app, err := WaitForRefresh(context.Background(), appIf, "test-app", &oneHundredMs)
assert.NotNil(t, err)
assert.Nil(t, app)
assert.Contains(t, strings.ToLower(err.Error()), "timed out")
assert.Contains(t, strings.ToLower(err.Error()), "deadline exceeded")
// Verify success
var testApp argoappv1.Application
@ -73,7 +74,7 @@ func TestWaitForRefresh(t *testing.T) {
appClientset.PrependWatchReactor("applications", testcore.DefaultWatchReactor(watcher, nil))
// simulate add/update/delete watch events
go watcher.Add(&testApp)
app, err = WaitForRefresh(appIf, "test-app", &oneHundredMs)
app, err = WaitForRefresh(context.Background(), appIf, "test-app", &oneHundredMs)
assert.Nil(t, err)
assert.NotNil(t, app)
}

View file

@ -13,7 +13,6 @@ import (
"time"
"github.com/argoproj/argo-cd/util"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
@ -109,37 +108,25 @@ func (k KubectlCmd) WatchResources(
err = errors.New(message)
}
}()
var w watch.Interface
w, err = apiResIf.resourceIf.Watch(metav1.ListOptions{})
if err != nil {
return err
}
defer w.Stop()
stopped := false
for !stopped {
select {
case event := <-w.ResultChan():
if event.Object != nil {
ch <- event
} else if !stopped {
// Workaround for https://github.com/kubernetes/client-go/issues/334. Issue is closed but still not resolved.
return fmt.Errorf("got empty event object. restarting watch")
}
case <-ctx.Done():
stopped = true
watchCh := WatchWithRetry(ctx, func() (i watch.Interface, e error) {
return apiResIf.resourceIf.Watch(metav1.ListOptions{})
})
for next := range watchCh {
if next.Error != nil {
return next.Error
}
ch <- watch.Event{
Object: next.Object,
Type: next.Type,
}
}
if !stopped {
return fmt.Errorf("channel got closed. restarting watch")
}
return nil
}, fmt.Sprintf("watch resources %s %s/%s", config.ServerName, apiResIf.groupVersion, apiResIf.apiResource.Kind), ctx, watchResourcesRetryTimeout)
}, fmt.Sprintf("watch resources %s %s/%s", config.Host, apiResIf.groupVersion, apiResIf.apiResource.Kind), ctx, watchResourcesRetryTimeout)
}(a)
}
wg.Wait()
close(ch)
log.Infof("Stop watching for resources changes with in cluster %s", config.ServerName)
log.Infof("Stop watching for resources changes with in cluster %s", config.Host)
}()
return ch, nil
}

View file

@ -2,12 +2,14 @@
package kube
import (
"context"
"encoding/json"
"fmt"
"os"
"reflect"
"regexp"
"strings"
"time"
"github.com/ghodss/yaml"
log "github.com/sirupsen/logrus"
@ -17,6 +19,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
@ -489,3 +492,53 @@ func Remarshal(obj *unstructured.Unstructured) (*unstructured.Unstructured, erro
unstrBody = jsonutil.RemoveMapFields(obj.Object, unstrBody)
return &unstructured.Unstructured{Object: unstrBody}, nil
}
// WatchWithRetry returns channel of watch events or errors of failed to call watch API.
func WatchWithRetry(ctx context.Context, getWatch func() (watch.Interface, error)) chan struct {
*watch.Event
Error error
} {
ch := make(chan struct {
*watch.Event
Error error
})
execute := func() (bool, error) {
w, err := getWatch()
if err != nil {
return false, err
}
for {
select {
case event, ok := <-w.ResultChan():
if ok {
ch <- struct {
*watch.Event
Error error
}{Event: &event, Error: nil}
} else {
return true, nil
}
case <-ctx.Done():
return false, nil
}
}
}
go func() {
defer close(ch)
for {
retry, err := execute()
if err != nil {
ch <- struct {
*watch.Event
Error error
}{Error: err}
}
if !retry {
return
}
time.Sleep(time.Second)
}
}()
return ch
}

View file

@ -93,6 +93,7 @@ func RetryUntilSucceed(action func() error, desc string, ctx context.Context, ti
log.Infof("Start %s", desc)
err := action()
if err == nil {
log.Infof("Completed %s", desc)
return
}
if err != nil {