Issue #1161 - Update resource version on every watch event (#1192)

This commit is contained in:
Alexander Matyushentsev 2019-02-26 23:24:30 -08:00 committed by GitHub
parent af0f6e578b
commit f3172d6727
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 8 deletions

View file

@ -26,8 +26,8 @@ const (
)
type gkInfo struct {
resource metav1.APIResource
listVersion string
resource metav1.APIResource
resourceVersion string
}
type clusterInfo struct {
@ -50,7 +50,7 @@ func (c *clusterInfo) getResourceVersion(gk schema.GroupKind) string {
defer c.lock.Unlock()
info, ok := c.apis[gk]
if ok {
return info.listVersion
return info.resourceVersion
}
return ""
}
@ -81,7 +81,7 @@ func (c *clusterInfo) updateCache(gk schema.GroupKind, resourceVersion string, o
c.onNodeRemoved(key, existingNode)
}
}
info.listVersion = resourceVersion
info.resourceVersion = resourceVersion
}
}
@ -174,8 +174,8 @@ func (c *clusterInfo) sync() (err error) {
}
if _, ok := c.apis[res.GVK.GroupKind()]; !ok {
c.apis[res.GVK.GroupKind()] = &gkInfo{
listVersion: res.ListResourceVersion,
resource: res.ResourceInfo,
resourceVersion: res.ListResourceVersion,
resource: res.ResourceInfo,
}
}
for i := range res.Objects {
@ -311,6 +311,9 @@ func (c *clusterInfo) processEvent(event watch.EventType, un *unstructured.Unstr
c.lock.Lock()
defer c.lock.Unlock()
key := kube.GetResourceKey(un)
if info, ok := c.apis[schema.GroupKind{Group: key.Group, Kind: key.Kind}]; ok {
info.resourceVersion = un.GetResourceVersion()
}
existingNode, exists := c.nodes[key]
if event == watch.Deleted {
if exists {

View file

@ -115,7 +115,8 @@ func (k KubectlCmd) GetResources(config *rest.Config, resourceFilter ResourceFil
const watchResourcesRetryTimeout = 1 * time.Second
// WatchResources Watches all the existing resources with the provided label name in the provided namespace in the cluster provided by the config
// WatchResources watches all the existing resources in the cluster provided by the config. Method retries watch with the most recent resource version stored in cache.
// The WatchResources returns channel which container either watch event with updated resource info or new list of resources if cached resource version had expired.
func (k KubectlCmd) WatchResources(
ctx context.Context,
config *rest.Config,
@ -156,7 +157,7 @@ func (k KubectlCmd) WatchResources(
ResourceVersion: resourceVersion,
})
if apierr.IsGone(err) {
log.Infof("List resource version of %s has expired at cluster %s, reloading list", gvk, config.Host)
log.Infof("Resource version of %s has expired at cluster %s, reloading list", gvk, config.Host)
list, err := apiResIf.resourceIf.List(metav1.ListOptions{})
if err != nil {
return nil, err