From 8d020fa6945c4ea45cae52e640e65aa19f3d2f75 Mon Sep 17 00:00:00 2001 From: Alexander Matyushentsev Date: Wed, 26 Dec 2018 13:38:35 -0800 Subject: [PATCH] Issue #956 - Slow comparison if cluster is down (#957) --- controller/cache/cache.go | 48 ++++++++++----------------- controller/cache/cluster.go | 65 +++++++++++++++++++++++++++++-------- util/kube/ctl.go | 9 +++++ util/kube/kubetest/mock.go | 13 ++++++-- 4 files changed, 88 insertions(+), 47 deletions(-) diff --git a/controller/cache/cache.go b/controller/cache/cache.go index 2e3e16113a..5cc2799059 100644 --- a/controller/cache/cache.go +++ b/controller/cache/cache.go @@ -11,7 +11,6 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/discovery" "k8s.io/client-go/tools/cache" appv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1" @@ -21,10 +20,6 @@ import ( "github.com/argoproj/argo-cd/util/settings" ) -const ( - watchResourcesRetryTimeout = 10 * time.Second -) - type LiveStateCache interface { IsNamespaced(server string, gvk schema.GroupVersionKind) (bool, error) // Returns child nodes for a given k8s resource @@ -73,7 +68,7 @@ type liveStateCache struct { } func (c *liveStateCache) processEvent(event watch.EventType, obj *unstructured.Unstructured, url string) error { - info, err := c.getCluster(url) + info, err := c.getSyncedCluster(url) if err != nil { return err } @@ -89,6 +84,7 @@ func (c *liveStateCache) removeCluster(server string) { func (c *liveStateCache) getCluster(server string) (*clusterInfo, error) { c.lock.Lock() + defer c.lock.Unlock() info, ok := c.clusters[server] if !ok { cluster, err := c.db.GetCluster(context.Background(), server) @@ -110,26 +106,16 @@ func (c *liveStateCache) getCluster(server string) (*clusterInfo, error) { } c.clusters[cluster.Server] = info - disco, err := discovery.NewDiscoveryClientForConfig(cluster.RESTConfig()) - if err != nil { - return nil, err - } - resources, err := disco.ServerResources() - if err != nil { - return nil, err - } - for _, r := range resources { - gv, err := schema.ParseGroupVersion(r.GroupVersion) - if err != nil { - gv = schema.GroupVersion{} - } - for i := range r.APIResources { - info.apis[gv.WithKind(r.APIResources[i].Kind)] = r.APIResources[i] - } - } } - c.lock.Unlock() - err := info.ensureSynced() + return info, nil +} + +func (c *liveStateCache) getSyncedCluster(server string) (*clusterInfo, error) { + info, err := c.getCluster(server) + if err != nil { + return nil, err + } + err = info.ensureSynced() if err != nil { return nil, err } @@ -149,7 +135,7 @@ func (c *liveStateCache) Invalidate() { } func (c *liveStateCache) Delete(server string, obj *unstructured.Unstructured) error { - clusterInfo, err := c.getCluster(server) + clusterInfo, err := c.getSyncedCluster(server) if err != nil { return err } @@ -157,7 +143,7 @@ func (c *liveStateCache) Delete(server string, obj *unstructured.Unstructured) e } func (c *liveStateCache) IsNamespaced(server string, gvk schema.GroupVersionKind) (bool, error) { - clusterInfo, err := c.getCluster(server) + clusterInfo, err := c.getSyncedCluster(server) if err != nil { return false, err } @@ -165,7 +151,7 @@ func (c *liveStateCache) IsNamespaced(server string, gvk schema.GroupVersionKind } func (c *liveStateCache) GetChildren(server string, obj *unstructured.Unstructured) ([]appv1.ResourceNode, error) { - clusterInfo, err := c.getCluster(server) + clusterInfo, err := c.getSyncedCluster(server) if err != nil { return nil, err } @@ -173,7 +159,7 @@ func (c *liveStateCache) GetChildren(server string, obj *unstructured.Unstructur } func (c *liveStateCache) GetManagedLiveObjs(a *appv1.Application, targetObjs []*unstructured.Unstructured) (map[kube.ResourceKey]*unstructured.Unstructured, error) { - clusterInfo, err := c.getCluster(a.Spec.Destination.Server) + clusterInfo, err := c.getSyncedCluster(a.Spec.Destination.Server) if err != nil { return nil, err } @@ -254,7 +240,7 @@ func (c *liveStateCache) Run(ctx context.Context) { return c.db.WatchClusters(ctx, clusterEventCallback) - }, "watch clusters", ctx, watchResourcesRetryTimeout) + }, "watch clusters", ctx, clusterRetryTimeout) <-ctx.Done() } @@ -294,5 +280,5 @@ func (c *liveStateCache) watchClusterResources(ctx context.Context, item appv1.C } } return fmt.Errorf("resource updates channel has closed") - }, fmt.Sprintf("watch app resources on %s", item.Server), ctx, watchResourcesRetryTimeout) + }, fmt.Sprintf("watch app resources on %s", item.Server), ctx, clusterRetryTimeout) } diff --git a/controller/cache/cluster.go b/controller/cache/cluster.go index 1f24de1d68..760603a81e 100644 --- a/controller/cache/cluster.go +++ b/controller/cache/cluster.go @@ -1,6 +1,8 @@ package cache import ( + "fmt" + "runtime/debug" "sync" "time" @@ -19,7 +21,8 @@ import ( ) const ( - clusterSyncTimeout = 1 * time.Hour + clusterSyncTimeout = 1 * time.Hour + clusterRetryTimeout = 10 * time.Second ) type clusterInfo struct { @@ -32,6 +35,7 @@ type clusterInfo struct { cluster *appv1.Cluster syncLock *sync.Mutex syncTime *time.Time + syncError error log *log.Entry settings *settings.ArgoCDSettings } @@ -91,19 +95,39 @@ func (c *clusterInfo) invalidate() { } func (c *clusterInfo) synced() bool { - return c.syncTime != nil && time.Now().Before(c.syncTime.Add(clusterSyncTimeout)) + if c.syncTime == nil { + return false + } + if c.syncError != nil { + return time.Now().Before(c.syncTime.Add(clusterRetryTimeout)) + } + return time.Now().Before(c.syncTime.Add(clusterSyncTimeout)) } -func (c *clusterInfo) ensureSynced() error { - if c.synced() { - return nil - } - c.syncLock.Lock() - defer c.syncLock.Unlock() - if c.synced() { - return nil - } +func (c *clusterInfo) sync() (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack()) + } + }() + c.log.Info("Start syncing cluster") + + clusterResources, err := c.kubectl.GetAPIResources(c.cluster.RESTConfig()) + if err != nil { + return err + } + c.apis = make(map[schema.GroupVersionKind]metav1.APIResource) + for _, r := range clusterResources { + gv, err := schema.ParseGroupVersion(r.GroupVersion) + if err != nil { + gv = schema.GroupVersion{} + } + for i := range r.APIResources { + c.apis[gv.WithKind(r.APIResources[i].Kind)] = r.APIResources[i] + } + } + c.nodes = make(map[kube.ResourceKey]*node) resources, err := c.kubectl.GetResources(c.cluster.RESTConfig(), "") if err != nil { @@ -116,12 +140,27 @@ func (c *clusterInfo) ensureSynced() error { c.setNode(createObjInfo(resources[i], appLabelKey)) } - resyncTime := time.Now() - c.syncTime = &resyncTime c.log.Info("Cluster successfully synced") return nil } +func (c *clusterInfo) ensureSynced() error { + if c.synced() { + return c.syncError + } + c.syncLock.Lock() + defer c.syncLock.Unlock() + if c.synced() { + return c.syncError + } + + err := c.sync() + syncTime := time.Now() + c.syncTime = &syncTime + c.syncError = err + return c.syncError +} + func (c *clusterInfo) getChildren(obj *unstructured.Unstructured) []appv1.ResourceNode { c.lock.Lock() defer c.lock.Unlock() diff --git a/util/kube/ctl.go b/util/kube/ctl.go index 7b56715dc9..ea563fbd5a 100644 --- a/util/kube/ctl.go +++ b/util/kube/ctl.go @@ -34,10 +34,19 @@ type Kubectl interface { GetResource(config *rest.Config, gvk schema.GroupVersionKind, name string, namespace string) (*unstructured.Unstructured, error) WatchResources(ctx context.Context, config *rest.Config, namespace string) (chan watch.Event, error) GetResources(config *rest.Config, namespace string) ([]*unstructured.Unstructured, error) + GetAPIResources(config *rest.Config) ([]*metav1.APIResourceList, error) } type KubectlCmd struct{} +func (k KubectlCmd) GetAPIResources(config *rest.Config) ([]*metav1.APIResourceList, error) { + disco, err := discovery.NewDiscoveryClientForConfig(config) + if err != nil { + return nil, err + } + return disco.ServerResources() +} + // GetResources returns all kubernetes resources func (k KubectlCmd) GetResources(config *rest.Config, namespace string) ([]*unstructured.Unstructured, error) { diff --git a/util/kube/kubetest/mock.go b/util/kube/kubetest/mock.go index 2ad919ac6c..9683e6cdd7 100644 --- a/util/kube/kubetest/mock.go +++ b/util/kube/kubetest/mock.go @@ -3,6 +3,8 @@ package kubetest import ( "context" + "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" @@ -15,9 +17,14 @@ type KubectlOutput struct { } type MockKubectlCmd struct { - Resources []*unstructured.Unstructured - Commands map[string]KubectlOutput - Events chan watch.Event + APIResources []*v1.APIResourceList + Resources []*unstructured.Unstructured + Commands map[string]KubectlOutput + Events chan watch.Event +} + +func (k MockKubectlCmd) GetAPIResources(config *rest.Config) ([]*v1.APIResourceList, error) { + return k.APIResources, nil } func (k MockKubectlCmd) GetResources(config *rest.Config, namespace string) ([]*unstructured.Unstructured, error) {