diff --git a/controller/cache/cluster.go b/controller/cache/cluster.go index 234eebcfcd..e8f062b1ce 100644 --- a/controller/cache/cluster.go +++ b/controller/cache/cluster.go @@ -26,8 +26,8 @@ const ( ) type gkInfo struct { - resource metav1.APIResource - listVersion string + resource metav1.APIResource + resourceVersion string } type clusterInfo struct { @@ -50,7 +50,7 @@ func (c *clusterInfo) getResourceVersion(gk schema.GroupKind) string { defer c.lock.Unlock() info, ok := c.apis[gk] if ok { - return info.listVersion + return info.resourceVersion } return "" } @@ -81,7 +81,7 @@ func (c *clusterInfo) updateCache(gk schema.GroupKind, resourceVersion string, o c.onNodeRemoved(key, existingNode) } } - info.listVersion = resourceVersion + info.resourceVersion = resourceVersion } } @@ -174,8 +174,8 @@ func (c *clusterInfo) sync() (err error) { } if _, ok := c.apis[res.GVK.GroupKind()]; !ok { c.apis[res.GVK.GroupKind()] = &gkInfo{ - listVersion: res.ListResourceVersion, - resource: res.ResourceInfo, + resourceVersion: res.ListResourceVersion, + resource: res.ResourceInfo, } } for i := range res.Objects { @@ -311,6 +311,9 @@ func (c *clusterInfo) processEvent(event watch.EventType, un *unstructured.Unstr c.lock.Lock() defer c.lock.Unlock() key := kube.GetResourceKey(un) + if info, ok := c.apis[schema.GroupKind{Group: key.Group, Kind: key.Kind}]; ok { + info.resourceVersion = un.GetResourceVersion() + } existingNode, exists := c.nodes[key] if event == watch.Deleted { if exists { diff --git a/util/kube/ctl.go b/util/kube/ctl.go index 770b482d10..5b55e223a3 100644 --- a/util/kube/ctl.go +++ b/util/kube/ctl.go @@ -115,7 +115,8 @@ func (k KubectlCmd) GetResources(config *rest.Config, resourceFilter ResourceFil const watchResourcesRetryTimeout = 1 * time.Second -// WatchResources Watches all the existing resources with the provided label name in the provided namespace in the cluster provided by the config +// WatchResources watches all the existing resources in the cluster provided by the config. Method retries watch with the most recent resource version stored in cache. +// The WatchResources returns channel which container either watch event with updated resource info or new list of resources if cached resource version had expired. func (k KubectlCmd) WatchResources( ctx context.Context, config *rest.Config, @@ -156,7 +157,7 @@ func (k KubectlCmd) WatchResources( ResourceVersion: resourceVersion, }) if apierr.IsGone(err) { - log.Infof("List resource version of %s has expired at cluster %s, reloading list", gvk, config.Host) + log.Infof("Resource version of %s has expired at cluster %s, reloading list", gvk, config.Host) list, err := apiResIf.resourceIf.List(metav1.ListOptions{}) if err != nil { return nil, err