From f3172d67276dc7cf19957275702003a998f21688 Mon Sep 17 00:00:00 2001 From: Alexander Matyushentsev Date: Tue, 26 Feb 2019 23:24:30 -0800 Subject: [PATCH] Issue #1161 - Update resource version on every watch event (#1192) --- controller/cache/cluster.go | 15 +++++++++------ util/kube/ctl.go | 5 +++-- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/controller/cache/cluster.go b/controller/cache/cluster.go index 234eebcfcd..e8f062b1ce 100644 --- a/controller/cache/cluster.go +++ b/controller/cache/cluster.go @@ -26,8 +26,8 @@ const ( ) type gkInfo struct { - resource metav1.APIResource - listVersion string + resource metav1.APIResource + resourceVersion string } type clusterInfo struct { @@ -50,7 +50,7 @@ func (c *clusterInfo) getResourceVersion(gk schema.GroupKind) string { defer c.lock.Unlock() info, ok := c.apis[gk] if ok { - return info.listVersion + return info.resourceVersion } return "" } @@ -81,7 +81,7 @@ func (c *clusterInfo) updateCache(gk schema.GroupKind, resourceVersion string, o c.onNodeRemoved(key, existingNode) } } - info.listVersion = resourceVersion + info.resourceVersion = resourceVersion } } @@ -174,8 +174,8 @@ func (c *clusterInfo) sync() (err error) { } if _, ok := c.apis[res.GVK.GroupKind()]; !ok { c.apis[res.GVK.GroupKind()] = &gkInfo{ - listVersion: res.ListResourceVersion, - resource: res.ResourceInfo, + resourceVersion: res.ListResourceVersion, + resource: res.ResourceInfo, } } for i := range res.Objects { @@ -311,6 +311,9 @@ func (c *clusterInfo) processEvent(event watch.EventType, un *unstructured.Unstr c.lock.Lock() defer c.lock.Unlock() key := kube.GetResourceKey(un) + if info, ok := c.apis[schema.GroupKind{Group: key.Group, Kind: key.Kind}]; ok { + info.resourceVersion = un.GetResourceVersion() + } existingNode, exists := c.nodes[key] if event == watch.Deleted { if exists { diff --git a/util/kube/ctl.go b/util/kube/ctl.go index 770b482d10..5b55e223a3 100644 --- a/util/kube/ctl.go +++ b/util/kube/ctl.go @@ -115,7 +115,8 @@ func (k KubectlCmd) GetResources(config *rest.Config, resourceFilter ResourceFil const watchResourcesRetryTimeout = 1 * time.Second -// WatchResources Watches all the existing resources with the provided label name in the provided namespace in the cluster provided by the config +// WatchResources watches all the existing resources in the cluster provided by the config. Method retries watch with the most recent resource version stored in cache. +// The WatchResources returns channel which container either watch event with updated resource info or new list of resources if cached resource version had expired. func (k KubectlCmd) WatchResources( ctx context.Context, config *rest.Config, @@ -156,7 +157,7 @@ func (k KubectlCmd) WatchResources( ResourceVersion: resourceVersion, }) if apierr.IsGone(err) { - log.Infof("List resource version of %s has expired at cluster %s, reloading list", gvk, config.Host) + log.Infof("Resource version of %s has expired at cluster %s, reloading list", gvk, config.Host) list, err := apiResIf.resourceIf.List(metav1.ListOptions{}) if err != nil { return nil, err