Issue #910 - Reconstruct tree structure on the flight to avoid inconsistent state (#921)

This commit is contained in:
Alexander Matyushentsev 2018-12-10 14:48:31 -08:00 committed by GitHub
parent 84863acac9
commit 1deeada249
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 75 additions and 117 deletions

View file

@ -96,6 +96,7 @@ func (c *liveStateCache) getCluster(server string) (*clusterInfo, error) {
apis: make(map[schema.GroupVersionKind]v1.APIResource),
lock: &sync.Mutex{},
nodes: make(map[kube.ResourceKey]*node),
nsIndex: make(map[string]map[kube.ResourceKey]*node),
onAppUpdated: c.onAppUpdated,
kubectl: c.kubectl,
cluster: cluster,

View file

@ -24,6 +24,7 @@ const (
type clusterInfo struct {
apis map[schema.GroupVersionKind]metav1.APIResource
nodes map[kube.ResourceKey]*node
nsIndex map[string]map[kube.ResourceKey]*node
lock *sync.Mutex
onAppUpdated func(appName string)
kubectl kube.Kubectl
@ -52,8 +53,6 @@ func createObjInfo(un *unstructured.Unstructured) *node {
Namespace: un.GetNamespace(),
},
ownerRefs: ownerRefs,
parents: make(map[kube.ResourceKey]*node),
children: make(map[kube.ResourceKey]*node),
tags: getTags(un),
}
appName := kube.GetAppInstanceLabel(un)
@ -64,6 +63,27 @@ func createObjInfo(un *unstructured.Unstructured) *node {
return info
}
func (c *clusterInfo) setNode(n *node) {
key := n.resourceKey()
c.nodes[key] = n
ns, ok := c.nsIndex[key.Namespace]
if !ok {
ns = make(map[kube.ResourceKey]*node)
c.nsIndex[key.Namespace] = ns
}
ns[key] = n
}
func (c *clusterInfo) removeNode(key kube.ResourceKey) {
delete(c.nodes, key)
if ns, ok := c.nsIndex[key.Namespace]; ok {
delete(ns, key)
if len(ns) == 0 {
delete(c.nsIndex, key.Namespace)
}
}
}
func (c *clusterInfo) synced() bool {
return c.syncTime != nil && time.Now().Before(c.syncTime.Add(clusterSyncTimeout))
}
@ -86,18 +106,9 @@ func (c *clusterInfo) ensureSynced() error {
}
for i := range resources {
c.nodes[kube.GetResourceKey(resources[i])] = createObjInfo(resources[i])
c.setNode(createObjInfo(resources[i]))
}
nodes := make(map[kube.ResourceKey]*node)
for k, v := range c.nodes {
nodes[k] = v
}
for _, obj := range c.nodes {
if len(obj.ownerRefs) == 0 {
obj.fillChildren(nodes)
}
}
resyncTime := time.Now()
c.syncTime = &resyncTime
c.log.Info("Cluster successfully synced")
@ -109,8 +120,11 @@ func (c *clusterInfo) getChildren(obj *unstructured.Unstructured) []appv1.Resour
defer c.lock.Unlock()
children := make([]appv1.ResourceNode, 0)
if objInfo, ok := c.nodes[kube.GetResourceKey(obj)]; ok {
for _, child := range objInfo.children {
children = append(children, child.childResourceNodes())
nsNodes := c.nsIndex[obj.GetNamespace()]
for _, child := range nsNodes {
if objInfo.isParentOf(child) {
children = append(children, child.childResourceNodes(nsNodes))
}
}
}
return children
@ -129,7 +143,7 @@ func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*uns
managedObjs := make(map[kube.ResourceKey]*unstructured.Unstructured)
for key, o := range c.nodes {
if o.appName == a.Name && o.resource != nil && len(o.parents) == 0 {
if o.appName == a.Name && o.resource != nil && len(o.ownerRefs) == 0 {
managedObjs[key] = o.resource
}
}
@ -174,14 +188,6 @@ func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*uns
return managedObjs, nil
}
func ownerRefGV(ownerRef metav1.OwnerReference) schema.GroupVersion {
gv, err := schema.ParseGroupVersion(ownerRef.APIVersion)
if err != nil {
gv = schema.GroupVersion{}
}
return gv
}
func (c *clusterInfo) delete(obj *unstructured.Unstructured) error {
err := c.kubectl.DeleteResource(c.cluster.RESTConfig(), obj.GroupVersionKind(), obj.GetName(), obj.GetNamespace(), false)
err = c.handleError(obj.GroupVersionKind(), obj.GetNamespace(), obj.GetName(), err)
@ -208,62 +214,36 @@ func (c *clusterInfo) handleError(gvk schema.GroupVersionKind, namespace string,
func (c *clusterInfo) processEvent(event watch.EventType, un *unstructured.Unstructured) error {
c.lock.Lock()
defer c.lock.Unlock()
obj, exists := c.nodes[kube.GetResourceKey(un)]
if exists && event == watch.Deleted {
for i := range obj.parents {
delete(obj.parents[i].children, obj.resourceKey())
key := kube.GetResourceKey(un)
existingNode, exists := c.nodes[key]
if event == watch.Deleted {
if exists {
c.removeNode(key)
if existingNode.appName != "" {
c.onAppUpdated(existingNode.appName)
}
}
for i := range obj.children {
delete(obj.children[i].parents, obj.resourceKey())
} else if event != watch.Deleted {
nodes := make([]*node, 0)
if exists {
nodes = append(nodes, existingNode)
}
delete(c.nodes, kube.GetResourceKey(un))
if obj.appName != "" {
c.onAppUpdated(obj.appName)
}
} else if !exists && event != watch.Deleted {
newObj := createObjInfo(un)
c.nodes[newObj.resourceKey()] = newObj
if len(newObj.ownerRefs) > 0 {
sameNamespace := make(map[kube.ResourceKey]*node)
for k := range c.nodes {
if c.nodes[k].ref.Namespace == un.GetNamespace() {
sameNamespace[k] = c.nodes[k]
}
}
for _, ownerRef := range newObj.ownerRefs {
if owner, ok := sameNamespace[kube.NewResourceKey(ownerRefGV(ownerRef).Group, ownerRef.Kind, un.GetNamespace(), ownerRef.Name)]; ok {
owner.fillChildren(sameNamespace)
}
}
}
if newObj.appName != "" {
c.onAppUpdated(newObj.appName)
}
} else if exists {
obj.resourceVersion = un.GetResourceVersion()
toNotify := make([]string, 0)
if obj.appName != "" {
toNotify = append(toNotify, obj.appName)
}
c.setNode(newObj)
nodes = append(nodes, newObj)
if len(obj.ownerRefs) == 0 {
newAppName := kube.GetAppInstanceLabel(un)
if newAppName != obj.appName {
obj.setAppName(newAppName)
if newAppName != "" {
toNotify = append(toNotify, newAppName)
toNotify := make(map[string]bool)
for i := range nodes {
n := nodes[i]
if ns, ok := c.nsIndex[n.ref.Namespace]; ok {
app := n.getApp(ns)
if app != "" {
toNotify[app] = true
}
}
}
if len(obj.parents) == 0 && obj.appName != "" {
obj.resource = un
} else {
obj.resource = nil
}
obj.tags = getTags(un)
for _, name := range toNotify {
for name := range toNotify {
c.onAppUpdated(name)
}
}

View file

@ -15,7 +15,6 @@ import (
"github.com/ghodss/yaml"
"github.com/stretchr/testify/assert"
"github.com/argoproj/argo-cd/common"
"github.com/argoproj/argo-cd/errors"
appv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/util/kube"
@ -81,6 +80,7 @@ func newCluster(resources ...*unstructured.Unstructured) *clusterInfo {
kubectl: kubetest.MockKubectlCmd{
Resources: resources,
},
nsIndex: make(map[string]map[kube.ResourceKey]*node),
cluster: &appv1.Cluster{},
syncTime: nil,
syncLock: &sync.Mutex{},
@ -257,28 +257,3 @@ func TestUpdateAppResource(t *testing.T) {
assert.Equal(t, []string{"helm-guestbook"}, updatesReceived)
}
func TestUpdateRootAppResource(t *testing.T) {
updatesReceived := make([]string, 0)
cluster := newCluster(testPod, testRS, testDeploy)
cluster.onAppUpdated = func(appName string) {
updatesReceived = append(updatesReceived, appName)
}
err := cluster.ensureSynced()
assert.Nil(t, err)
for k := range cluster.nodes {
assert.Equal(t, "helm-guestbook", cluster.nodes[k].appName)
}
updatedDeploy := testDeploy.DeepCopy()
updatedDeploy.SetLabels(map[string]string{common.LabelKeyAppInstance: "helm-guestbook2"})
err = cluster.processEvent(watch.Modified, updatedDeploy)
assert.Nil(t, err)
assert.Equal(t, []string{"helm-guestbook", "helm-guestbook2"}, updatesReceived)
for k := range cluster.nodes {
assert.Equal(t, "helm-guestbook2", cluster.nodes[k].appName)
}
}

View file

@ -13,8 +13,6 @@ type node struct {
resourceVersion string
ref v1.ObjectReference
ownerRefs []metav1.OwnerReference
children map[kube.ResourceKey]*node
parents map[kube.ResourceKey]*node
tags []string
appName string
resource *unstructured.Unstructured
@ -25,9 +23,6 @@ func (n *node) resourceKey() kube.ResourceKey {
}
func (n *node) isParentOf(child *node) bool {
if n.ref.Namespace != child.ref.Namespace {
return false
}
ownerGvk := n.ref.GroupVersionKind()
for _, ownerRef := range child.ownerRefs {
if kube.NewResourceKey(ownerGvk.Group, ownerRef.Kind, n.ref.Namespace, ownerRef.Name) == n.resourceKey() {
@ -38,29 +33,36 @@ func (n *node) isParentOf(child *node) bool {
return false
}
func (n *node) setAppName(appName string) {
n.appName = appName
for i := range n.children {
n.children[i].setAppName(appName)
func ownerRefGV(ownerRef metav1.OwnerReference) schema.GroupVersion {
gv, err := schema.ParseGroupVersion(ownerRef.APIVersion)
if err != nil {
gv = schema.GroupVersion{}
}
return gv
}
func (n *node) fillChildren(nodes map[kube.ResourceKey]*node) {
for k, child := range nodes {
if n.isParentOf(child) {
delete(nodes, k)
child.appName = n.appName
child.parents[n.resourceKey()] = n
n.children[child.resourceKey()] = child
child.fillChildren(nodes)
func (n *node) getApp(ns map[kube.ResourceKey]*node) string {
if n.appName != "" {
return n.appName
}
for _, ownerRef := range n.ownerRefs {
gv := ownerRefGV(ownerRef)
if parent, ok := ns[kube.NewResourceKey(gv.Group, ownerRef.Kind, n.ref.Namespace, ownerRef.Name)]; ok {
app := parent.getApp(ns)
if app != "" {
return app
}
}
}
return ""
}
func (n *node) childResourceNodes() appv1.ResourceNode {
func (n *node) childResourceNodes(ns map[kube.ResourceKey]*node) appv1.ResourceNode {
children := make([]appv1.ResourceNode, 0)
for i := range n.children {
children = append(children, n.children[i].childResourceNodes())
for key := range ns {
if n.isParentOf(ns[key]) {
children = append(children, ns[key].childResourceNodes(ns))
}
}
gv, err := schema.ParseGroupVersion(n.ref.APIVersion)
if err != nil {