Fix controller deadlock when checking for stale cache (#1046)

* Controller cache was susceptible to clock skew in managed cluster

* Fix controller deadlock when checking for stale cache
This commit is contained in:
Jesse Suen 2019-01-18 10:38:51 -08:00 committed by GitHub
parent 130a5ee0d9
commit d40bbb23cb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -191,11 +191,14 @@ func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*uns
defer c.lock.Unlock()
managedObjs := make(map[kube.ResourceKey]*unstructured.Unstructured)
// iterate all objects in live state cache to find ones associated with app
for key, o := range c.nodes {
if o.appName == a.Name && o.resource != nil && len(o.ownerRefs) == 0 {
managedObjs[key] = o.resource
}
}
// iterate target objects and identify ones that already exist in the cluster,
// but are simply missing our label
lock := &sync.Mutex{}
err := util.RunAllAsync(len(targetObjs), func(i int) error {
targetObj := targetObjs[i]
@ -211,12 +214,13 @@ func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*uns
} else {
var err error
managedObj, err = c.kubectl.GetResource(c.cluster.RESTConfig(), targetObj.GroupVersionKind(), existingObj.ref.Name, existingObj.ref.Namespace)
err = c.handleError(targetObj.GroupVersionKind(), existingObj.ref.Namespace, existingObj.ref.Name, err)
if err != nil && !errors.IsNotFound(err) {
if err != nil {
if errors.IsNotFound(err) {
c.checkAndInvalidateStaleCache(targetObj.GroupVersionKind(), existingObj.ref.Namespace, existingObj.ref.Name)
return nil
}
return err
}
// TODO: may need to add following line due to k8s behavior of returning an object even when err is NotFound
// managedObj = nil
}
}
}
@ -241,25 +245,26 @@ func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*uns
// delete asks c.kubectl to delete obj, identified by its GroupVersionKind, name
// and namespace, using the REST config of c.cluster.
//
// A NotFound result is not reported to the caller: it is treated as a hint that
// the cache may be stale, so the cache is checked while holding c.lock and nil
// is returned. Any other value (including nil on success) is returned unchanged.
//
// NOTE(review): this text appears to come from a rendered diff whose +/- markers
// were stripped, so the c.handleError call and the `err = nil` assignment may be
// lines the commit removed — confirm against the real file.
func (c *clusterInfo) delete(obj *unstructured.Unstructured) error {
err := c.kubectl.DeleteResource(c.cluster.RESTConfig(), obj.GroupVersionKind(), obj.GetName(), obj.GetNamespace(), false)
err = c.handleError(obj.GroupVersionKind(), obj.GetNamespace(), obj.GetName(), err)
if err != nil && errors.IsNotFound(err) {
err = nil
// A delete request came in for an object which does not exist, so it is
// possible that our cache is stale. Check and invalidate it if it is.
// checkAndInvalidateStaleCache is called with c.lock held; the lock is
// released again before returning.
c.lock.Lock()
c.checkAndInvalidateStaleCache(obj.GroupVersionKind(), obj.GetNamespace(), obj.GetName())
c.lock.Unlock()
return nil
}
return err
}
func (c *clusterInfo) handleError(gvk schema.GroupVersionKind, namespace string, name string, err error) error {
if err != nil && errors.IsNotFound(err) {
c.lock.Lock()
defer c.lock.Unlock()
if _, ok := c.nodes[kube.NewResourceKey(gvk.Group, gvk.Kind, namespace, name)]; ok {
if c.syncTime != nil {
c.log.Warn("Dropped stale cache")
c.syncTime = nil
}
// checkAndInvalidateStaleCache checks whether our cache is stale and, if so, invalidates it.
// It should be called whenever we suspect our cache is stale. It does not take c.lock itself,
// so the caller is expected to hold it.
func (c *clusterInfo) checkAndInvalidateStaleCache(gvk schema.GroupVersionKind, namespace string, name string) {
if _, ok := c.nodes[kube.NewResourceKey(gvk.Group, gvk.Kind, namespace, name)]; ok {
if c.syncTime != nil {
c.log.Warnf("invalidated stale cache due to mismatch of %s, %s/%s", gvk, namespace, name)
c.invalidate()
}
}
return err
}
func (c *clusterInfo) processEvent(event watch.EventType, un *unstructured.Unstructured) error {