diff --git a/controller/cache/cluster.go b/controller/cache/cluster.go index 297521be28..19dc178913 100644 --- a/controller/cache/cluster.go +++ b/controller/cache/cluster.go @@ -191,11 +191,14 @@ func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*uns defer c.lock.Unlock() managedObjs := make(map[kube.ResourceKey]*unstructured.Unstructured) + // iterate all objects in live state cache to find ones associated with app for key, o := range c.nodes { if o.appName == a.Name && o.resource != nil && len(o.ownerRefs) == 0 { managedObjs[key] = o.resource } } + // iterate target objects and identify ones that already exist in the cluster,\ + // but are simply missing our label lock := &sync.Mutex{} err := util.RunAllAsync(len(targetObjs), func(i int) error { targetObj := targetObjs[i] @@ -211,12 +214,13 @@ func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*uns } else { var err error managedObj, err = c.kubectl.GetResource(c.cluster.RESTConfig(), targetObj.GroupVersionKind(), existingObj.ref.Name, existingObj.ref.Namespace) - err = c.handleError(targetObj.GroupVersionKind(), existingObj.ref.Namespace, existingObj.ref.Name, err) - if err != nil && !errors.IsNotFound(err) { + if err != nil { + if errors.IsNotFound(err) { + c.checkAndInvalidateStaleCache(targetObj.GroupVersionKind(), existingObj.ref.Namespace, existingObj.ref.Name) + return nil + } return err } - // TODO: may need to add following line due to k8s behavior of returning an object even when err is NotFound - // managedObj = nil } } } @@ -241,25 +245,26 @@ func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*uns func (c *clusterInfo) delete(obj *unstructured.Unstructured) error { err := c.kubectl.DeleteResource(c.cluster.RESTConfig(), obj.GroupVersionKind(), obj.GetName(), obj.GetNamespace(), false) - err = c.handleError(obj.GroupVersionKind(), obj.GetNamespace(), obj.GetName(), err) if err != nil && errors.IsNotFound(err) { - err = nil + // a delete request came in for an object which does not exist. it's possible that our cache + // is stale. Check and invalidate if it is + c.lock.Lock() + c.checkAndInvalidateStaleCache(obj.GroupVersionKind(), obj.GetNamespace(), obj.GetName()) + c.lock.Unlock() + return nil } return err } -func (c *clusterInfo) handleError(gvk schema.GroupVersionKind, namespace string, name string, err error) error { - if err != nil && errors.IsNotFound(err) { - c.lock.Lock() - defer c.lock.Unlock() - if _, ok := c.nodes[kube.NewResourceKey(gvk.Group, gvk.Kind, namespace, name)]; ok { - if c.syncTime != nil { - c.log.Warn("Dropped stale cache") - c.syncTime = nil - } +// checkAndInvalidateStaleCache checks if our cache is stale and invalidate it based on error +// should be called whenever we suspect our cache is stale +func (c *clusterInfo) checkAndInvalidateStaleCache(gvk schema.GroupVersionKind, namespace string, name string) { + if _, ok := c.nodes[kube.NewResourceKey(gvk.Group, gvk.Kind, namespace, name)]; ok { + if c.syncTime != nil { + c.log.Warnf("invalidated stale cache due to mismatch of %s, %s/%s", gvk, namespace, name) + c.invalidate() } } - return err } func (c *clusterInfo) processEvent(event watch.EventType, un *unstructured.Unstructured) error {