2018-02-20 23:23:37 +00:00
package controller
import (
"context"
2018-03-09 10:01:15 +00:00
"encoding/json"
"fmt"
2019-07-15 20:34:26 +00:00
"math"
2018-07-07 07:54:06 +00:00
"reflect"
2018-05-15 07:36:11 +00:00
"runtime/debug"
2018-11-27 21:38:00 +00:00
"strings"
2018-04-11 19:53:33 +00:00
"sync"
2018-05-05 00:01:57 +00:00
"time"
2018-04-11 19:53:33 +00:00
2018-11-30 21:50:27 +00:00
log "github.com/sirupsen/logrus"
2019-09-10 16:56:48 +00:00
"golang.org/x/sync/semaphore"
2019-02-22 23:20:34 +00:00
v1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
2018-02-26 15:43:35 +00:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2018-03-07 06:05:07 +00:00
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2018-05-03 22:55:01 +00:00
"k8s.io/apimachinery/pkg/types"
2018-02-20 23:23:37 +00:00
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
2018-07-07 07:54:06 +00:00
2018-12-24 06:25:04 +00:00
"github.com/argoproj/argo-cd/common"
2018-11-28 21:38:02 +00:00
statecache "github.com/argoproj/argo-cd/controller/cache"
2019-02-22 23:20:34 +00:00
"github.com/argoproj/argo-cd/controller/metrics"
"github.com/argoproj/argo-cd/errors"
2019-06-05 01:17:41 +00:00
"github.com/argoproj/argo-cd/pkg/apis/application"
2018-07-07 07:54:06 +00:00
appv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned"
appinformers "github.com/argoproj/argo-cd/pkg/client/informers/externalversions"
2019-01-08 22:53:45 +00:00
"github.com/argoproj/argo-cd/pkg/client/informers/externalversions/application/v1alpha1"
applisters "github.com/argoproj/argo-cd/pkg/client/listers/application/v1alpha1"
2019-07-13 00:17:23 +00:00
"github.com/argoproj/argo-cd/reposerver/apiclient"
2018-11-28 21:38:02 +00:00
"github.com/argoproj/argo-cd/util"
2018-07-09 17:45:03 +00:00
"github.com/argoproj/argo-cd/util/argo"
2019-02-13 23:20:40 +00:00
argocache "github.com/argoproj/argo-cd/util/cache"
2018-07-07 07:54:06 +00:00
"github.com/argoproj/argo-cd/util/db"
2018-11-30 21:50:27 +00:00
"github.com/argoproj/argo-cd/util/diff"
2018-07-07 07:54:06 +00:00
"github.com/argoproj/argo-cd/util/kube"
2018-11-17 01:10:04 +00:00
settings_util "github.com/argoproj/argo-cd/util/settings"
2018-02-20 23:23:37 +00:00
)
2018-04-11 19:53:33 +00:00
// Controller-wide constants.
const (
	// updateOperationStateTimeout is a one second duration.
	// NOTE(review): its consumer is not visible in this part of the file.
	updateOperationStateTimeout = time.Second
	// orphanedIndex is the name of the informer index that contains applications
	// which monitor orphaned resources, keyed by namespace.
	orphanedIndex = "orphaned"
)
2019-07-15 20:34:26 +00:00
// CompareWith describes how an application refresh compares live state with
// the desired state. Levels are ordered numerically, so two levels can be
// combined with Max.
type CompareWith int

const (
	// ComparisonWithNothing skips comparison and only refreshes the application resources tree.
	ComparisonWithNothing CompareWith = 0
	// CompareWithRecent compares live application state against the state defined
	// using the revision of the most recent comparison.
	CompareWithRecent CompareWith = 1
	// CompareWithLatest compares live application state against the state defined
	// in the latest git revision.
	CompareWithLatest CompareWith = 2
)

// Max returns whichever of a and b has the greater numeric value.
func (a CompareWith) Max(b CompareWith) CompareWith {
	larger := math.Max(float64(a), float64(b))
	return CompareWith(larger)
}
2018-02-20 23:23:37 +00:00
// ApplicationController is the controller for application resources.
type ApplicationController struct {
2019-02-13 23:20:40 +00:00
cache * argocache . Cache
2018-12-18 02:23:35 +00:00
namespace string
kubeClientset kubernetes . Interface
kubectl kube . Kubectl
applicationClientset appclientset . Interface
auditLogger * argo . AuditLogger
appRefreshQueue workqueue . RateLimitingInterface
appOperationQueue workqueue . RateLimitingInterface
appInformer cache . SharedIndexInformer
2019-02-22 23:20:34 +00:00
appLister applisters . ApplicationLister
2019-01-08 22:53:45 +00:00
projInformer cache . SharedIndexInformer
2018-12-18 02:23:35 +00:00
appStateManager AppStateManager
stateCache statecache . LiveStateCache
statusRefreshTimeout time . Duration
2019-07-25 02:26:09 +00:00
selfHealTimeout time . Duration
2019-07-13 00:17:23 +00:00
repoClientset apiclient . Clientset
2018-12-18 02:23:35 +00:00
db db . ArgoDB
2018-12-24 06:25:04 +00:00
settingsMgr * settings_util . SettingsManager
2019-07-15 20:34:26 +00:00
refreshRequestedApps map [ string ] CompareWith
2018-12-18 02:23:35 +00:00
refreshRequestedAppsMutex * sync . Mutex
2019-02-22 23:20:34 +00:00
metricsServer * metrics . MetricsServer
2019-09-10 16:56:48 +00:00
kubectlSemaphore * semaphore . Weighted
2018-02-26 15:43:35 +00:00
}
// ApplicationControllerConfig groups configuration values for the controller.
// NOTE(review): no consumer of this type is visible in this part of the file.
type ApplicationControllerConfig struct {
	// InstanceID is presumably an identifier of the controller instance — confirm against callers.
	InstanceID string
	// Namespace is presumably the namespace the controller operates in — confirm against callers.
	Namespace string
}
// NewApplicationController creates new instance of ApplicationController.
2018-02-22 18:56:14 +00:00
func NewApplicationController (
2018-04-11 19:53:33 +00:00
namespace string ,
2019-01-08 22:53:45 +00:00
settingsMgr * settings_util . SettingsManager ,
2018-02-22 18:56:14 +00:00
kubeClientset kubernetes . Interface ,
applicationClientset appclientset . Interface ,
2019-07-13 00:17:23 +00:00
repoClientset apiclient . Clientset ,
2019-02-13 23:20:40 +00:00
argoCache * argocache . Cache ,
2019-09-11 23:37:00 +00:00
kubectl kube . Kubectl ,
2018-02-26 15:43:35 +00:00
appResyncPeriod time . Duration ,
2019-07-25 02:26:09 +00:00
selfHealTimeout time . Duration ,
2019-05-28 18:41:02 +00:00
metricsPort int ,
2019-09-10 16:56:48 +00:00
kubectlParallelismLimit int64 ,
2019-01-08 22:53:45 +00:00
) ( * ApplicationController , error ) {
2018-11-09 17:58:07 +00:00
db := db . NewDB ( namespace , settingsMgr , kubeClientset )
2018-09-11 21:28:53 +00:00
ctrl := ApplicationController {
2019-02-13 23:20:40 +00:00
cache : argoCache ,
2018-12-18 02:23:35 +00:00
namespace : namespace ,
kubeClientset : kubeClientset ,
2019-09-11 23:37:00 +00:00
kubectl : kubectl ,
2018-12-18 02:23:35 +00:00
applicationClientset : applicationClientset ,
repoClientset : repoClientset ,
appRefreshQueue : workqueue . NewRateLimitingQueue ( workqueue . DefaultControllerRateLimiter ( ) ) ,
appOperationQueue : workqueue . NewRateLimitingQueue ( workqueue . DefaultControllerRateLimiter ( ) ) ,
db : db ,
statusRefreshTimeout : appResyncPeriod ,
2019-07-15 20:34:26 +00:00
refreshRequestedApps : make ( map [ string ] CompareWith ) ,
2018-12-18 02:23:35 +00:00
refreshRequestedAppsMutex : & sync . Mutex { } ,
2018-12-20 21:16:01 +00:00
auditLogger : argo . NewAuditLogger ( namespace , kubeClientset , "argocd-application-controller" ) ,
2018-12-24 06:25:04 +00:00
settingsMgr : settingsMgr ,
2019-07-25 02:26:09 +00:00
selfHealTimeout : selfHealTimeout ,
2018-11-28 21:38:02 +00:00
}
2019-09-10 16:56:48 +00:00
if kubectlParallelismLimit > 0 {
ctrl . kubectlSemaphore = semaphore . NewWeighted ( kubectlParallelismLimit )
}
2019-09-11 23:37:00 +00:00
kubectl . SetOnKubectlRun ( ctrl . onKubectlRun )
2019-08-19 15:14:48 +00:00
appInformer , appLister , err := ctrl . newApplicationInformerAndLister ( )
if err != nil {
return nil , err
}
2019-01-08 22:53:45 +00:00
projInformer := v1alpha1 . NewAppProjectInformer ( applicationClientset , namespace , appResyncPeriod , cache . Indexers { } )
2019-05-28 18:41:02 +00:00
metricsAddr := fmt . Sprintf ( "0.0.0.0:%d" , metricsPort )
2019-06-19 19:00:01 +00:00
ctrl . metricsServer = metrics . NewMetricsServer ( metricsAddr , appLister , func ( ) error {
_ , err := kubeClientset . Discovery ( ) . ServerVersion ( )
return err
} )
2019-09-11 23:37:00 +00:00
stateCache := statecache . NewLiveStateCache ( db , appInformer , ctrl . settingsMgr , kubectl , ctrl . metricsServer , ctrl . handleObjectUpdated )
appStateManager := NewAppStateManager ( db , applicationClientset , repoClientset , namespace , kubectl , ctrl . settingsMgr , stateCache , projInformer , ctrl . metricsServer )
2018-11-28 21:38:02 +00:00
ctrl . appInformer = appInformer
2019-02-22 23:20:34 +00:00
ctrl . appLister = appLister
2019-01-08 22:53:45 +00:00
ctrl . projInformer = projInformer
2018-11-28 21:38:02 +00:00
ctrl . appStateManager = appStateManager
ctrl . stateCache = stateCache
2019-04-29 19:42:59 +00:00
2019-01-08 22:53:45 +00:00
return & ctrl , nil
2018-02-20 23:23:37 +00:00
}
2019-09-10 16:56:48 +00:00
// onKubectlRun is the hook registered with kubectl via SetOnKubectlRun. It
// counts the execution of command and, when a parallelism limit is configured,
// acquires one semaphore slot before returning. The returned closer releases
// that slot again and decrements the pending counter.
func (ctrl *ApplicationController) onKubectlRun(command string) (util.Closer, error) {
	ctrl.metricsServer.IncKubectlExec(command)

	if ctrl.kubectlSemaphore != nil {
		err := ctrl.kubectlSemaphore.Acquire(context.Background(), 1)
		if err != nil {
			return nil, err
		}
		ctrl.metricsServer.IncKubectlExecPending(command)
	}

	release := func() error {
		// Nothing was acquired when no limit is configured.
		if ctrl.kubectlSemaphore == nil {
			return nil
		}
		ctrl.kubectlSemaphore.Release(1)
		ctrl.metricsServer.DecKubectlExecPending(command)
		return nil
	}
	return util.NewCloser(release), nil
}
2019-05-13 18:17:32 +00:00
func isSelfReferencedApp ( app * appv1 . Application , ref v1 . ObjectReference ) bool {
gvk := ref . GroupVersionKind ( )
return ref . UID == app . UID &&
ref . Name == app . Name &&
ref . Namespace == app . Namespace &&
gvk . Group == application . Group &&
gvk . Kind == application . ApplicationKind
2019-05-02 21:38:37 +00:00
}
2019-08-19 15:14:48 +00:00
// getAppProj looks up the project of app through a lister backed by the
// project informer's index, within the controller namespace.
func (ctrl *ApplicationController) getAppProj(app *appv1.Application) (*appv1.AppProject, error) {
	projLister := applisters.NewAppProjectLister(ctrl.projInformer.GetIndexer())
	return argo.GetAppProject(&app.Spec, projLister, ctrl.namespace)
}
2019-05-13 18:17:32 +00:00
2019-08-19 15:14:48 +00:00
func ( ctrl * ApplicationController ) handleObjectUpdated ( managedByApp map [ string ] bool , ref v1 . ObjectReference ) {
// if namespaced resource is not managed by any app it might be orphaned resource of some other apps
if len ( managedByApp ) == 0 && ref . Namespace != "" {
// retrieve applications which monitor orphaned resources in the same namespace and refresh them unless resource is blacklisted in app project
if objs , err := ctrl . appInformer . GetIndexer ( ) . ByIndex ( orphanedIndex , ref . Namespace ) ; err == nil {
for i := range objs {
app , ok := objs [ i ] . ( * appv1 . Application )
if ! ok {
continue
}
2019-08-20 22:48:37 +00:00
// exclude resource unless it is permitted in the app project. If project is not permitted then it is not controlled by the user and there is no point showing the warning.
if proj , err := ctrl . getAppProj ( app ) ; err == nil && proj . IsResourcePermitted ( metav1 . GroupKind { Group : ref . GroupVersionKind ( ) . Group , Kind : ref . Kind } , true ) &&
! isKnownOrphanedResourceExclusion ( kube . NewResourceKey ( ref . GroupVersionKind ( ) . Group , ref . GroupVersionKind ( ) . Kind , ref . Namespace , ref . Name ) ) {
2019-08-19 15:14:48 +00:00
managedByApp [ app . Name ] = false
}
}
}
2019-05-13 18:17:32 +00:00
}
2019-08-19 15:14:48 +00:00
for appName , isManagedResource := range managedByApp {
skipForceRefresh := false
obj , exists , err := ctrl . appInformer . GetIndexer ( ) . GetByKey ( ctrl . namespace + "/" + appName )
if app , ok := obj . ( * appv1 . Application ) ; exists && err == nil && ok && isSelfReferencedApp ( app , ref ) {
// Don't force refresh app if related resource is application itself. This prevents infinite reconciliation loop.
skipForceRefresh = true
}
2019-05-13 18:17:32 +00:00
2019-08-19 15:14:48 +00:00
if ! skipForceRefresh {
level := ComparisonWithNothing
if isManagedResource {
level = CompareWithRecent
}
ctrl . requestAppRefresh ( appName , level )
2019-07-15 20:34:26 +00:00
}
2019-08-19 15:14:48 +00:00
ctrl . appRefreshQueue . Add ( fmt . Sprintf ( "%s/%s" , ctrl . namespace , appName ) )
2019-05-01 16:42:45 +00:00
}
}
2019-04-02 15:48:34 +00:00
func ( ctrl * ApplicationController ) setAppManagedResources ( a * appv1 . Application , comparisonResult * comparisonResult ) ( * appv1 . ApplicationTree , error ) {
2019-07-03 21:17:58 +00:00
managedResources , err := ctrl . managedResources ( comparisonResult )
2018-11-28 21:38:02 +00:00
if err != nil {
2019-04-02 15:48:34 +00:00
return nil , err
2019-02-13 23:20:40 +00:00
}
2019-04-02 15:48:34 +00:00
tree , err := ctrl . getResourceTree ( a , managedResources )
2019-02-13 23:20:40 +00:00
if err != nil {
2019-04-02 15:48:34 +00:00
return nil , err
2018-11-17 01:10:04 +00:00
}
2019-02-13 23:20:40 +00:00
err = ctrl . cache . SetAppResourcesTree ( a . Name , tree )
if err != nil {
2019-04-02 15:48:34 +00:00
return nil , err
2019-02-13 23:20:40 +00:00
}
2019-04-02 15:48:34 +00:00
return tree , ctrl . cache . SetAppManagedResources ( a . Name , managedResources )
2019-02-13 23:20:40 +00:00
}
2019-08-20 22:48:37 +00:00
// isKnownOrphanedResourceExclusion returns true if the given resource exists in
// a namespace by default and is not managed by the user: the "kubernetes"
// Service in the "default" namespace, or a "default" ServiceAccount in any
// namespace. Both belong to the core (empty) API group.
func isKnownOrphanedResourceExclusion(key kube.ResourceKey) bool {
	if key.Group != "" {
		return false
	}
	switch key.Kind {
	case kube.ServiceKind:
		return key.Namespace == "default" && key.Name == "kubernetes"
	case kube.ServiceAccountKind:
		return key.Name == "default"
	}
	return false
}
2019-04-02 15:48:34 +00:00
func ( ctrl * ApplicationController ) getResourceTree ( a * appv1 . Application , managedResources [ ] * appv1 . ResourceDiff ) ( * appv1 . ApplicationTree , error ) {
2019-03-30 03:59:25 +00:00
nodes := make ( [ ] appv1 . ResourceNode , 0 )
2019-08-19 15:14:48 +00:00
proj , err := argo . GetAppProject ( & a . Spec , applisters . NewAppProjectLister ( ctrl . projInformer . GetIndexer ( ) ) , ctrl . namespace )
if err != nil {
return nil , err
}
orphanedNodesMap := make ( map [ kube . ResourceKey ] appv1 . ResourceNode )
warnOrphaned := true
if proj . Spec . OrphanedResources != nil {
orphanedNodesMap , err = ctrl . stateCache . GetNamespaceTopLevelResources ( a . Spec . Destination . Server , a . Spec . Destination . Namespace )
if err != nil {
return nil , err
}
warnOrphaned = proj . Spec . OrphanedResources . IsWarn ( )
}
2019-03-14 21:54:34 +00:00
for i := range managedResources {
managedResource := managedResources [ i ]
2019-08-19 15:14:48 +00:00
delete ( orphanedNodesMap , kube . NewResourceKey ( managedResource . Group , managedResource . Kind , managedResource . Namespace , managedResource . Name ) )
2019-03-14 21:54:34 +00:00
var live = & unstructured . Unstructured { }
err := json . Unmarshal ( [ ] byte ( managedResource . LiveState ) , & live )
if err != nil {
return nil , err
}
var target = & unstructured . Unstructured { }
2019-03-14 22:52:50 +00:00
err = json . Unmarshal ( [ ] byte ( managedResource . TargetState ) , & target )
2019-03-14 21:54:34 +00:00
if err != nil {
return nil , err
}
2019-03-30 03:59:25 +00:00
if live == nil {
nodes = append ( nodes , appv1 . ResourceNode {
ResourceRef : appv1 . ResourceRef {
Version : target . GroupVersionKind ( ) . Version ,
Name : managedResource . Name ,
Kind : managedResource . Kind ,
Group : managedResource . Group ,
Namespace : managedResource . Namespace ,
} ,
} )
} else {
2019-08-19 15:14:48 +00:00
err := ctrl . stateCache . IterateHierarchy ( a . Spec . Destination . Server , kube . GetResourceKey ( live ) , func ( child appv1 . ResourceNode , appName string ) {
2019-03-30 03:59:25 +00:00
nodes = append ( nodes , child )
} )
2018-11-17 01:10:04 +00:00
if err != nil {
return nil , err
}
2018-11-28 21:38:02 +00:00
}
}
2019-08-19 15:14:48 +00:00
orphanedNodes := make ( [ ] appv1 . ResourceNode , 0 )
for k := range orphanedNodesMap {
2019-08-20 22:48:37 +00:00
if k . Namespace != "" && proj . IsResourcePermitted ( metav1 . GroupKind { Group : k . Group , Kind : k . Kind } , true ) && ! isKnownOrphanedResourceExclusion ( k ) {
2019-08-19 15:14:48 +00:00
err := ctrl . stateCache . IterateHierarchy ( a . Spec . Destination . Server , k , func ( child appv1 . ResourceNode , appName string ) {
belongToAnotherApp := false
if appName != "" {
if _ , exists , err := ctrl . appInformer . GetIndexer ( ) . GetByKey ( ctrl . namespace + "/" + appName ) ; exists && err == nil {
belongToAnotherApp = true
}
}
if ! belongToAnotherApp {
orphanedNodes = append ( orphanedNodes , child )
}
} )
if err != nil {
return nil , err
}
}
}
2019-08-19 18:12:07 +00:00
var conditions [ ] appv1 . ApplicationCondition
2019-08-19 15:14:48 +00:00
if len ( orphanedNodes ) > 0 && warnOrphaned {
2019-08-19 18:12:07 +00:00
conditions = [ ] appv1 . ApplicationCondition { {
2019-08-19 15:14:48 +00:00
Type : appv1 . ApplicationConditionOrphanedResourceWarning ,
Message : fmt . Sprintf ( "Application has %d orphaned resources" , len ( orphanedNodes ) ) ,
2019-08-19 18:12:07 +00:00
} }
2019-08-19 15:14:48 +00:00
}
2019-08-19 18:12:07 +00:00
a . Status . SetConditions ( conditions , map [ appv1 . ApplicationConditionType ] bool { appv1 . ApplicationConditionOrphanedResourceWarning : true } )
2019-08-19 15:14:48 +00:00
return & appv1 . ApplicationTree { Nodes : nodes , OrphanedNodes : orphanedNodes } , nil
2018-11-28 21:38:02 +00:00
}
2019-07-03 21:17:58 +00:00
func ( ctrl * ApplicationController ) managedResources ( comparisonResult * comparisonResult ) ( [ ] * appv1 . ResourceDiff , error ) {
2019-02-22 21:19:10 +00:00
items := make ( [ ] * appv1 . ResourceDiff , len ( comparisonResult . managedResources ) )
for i := range comparisonResult . managedResources {
res := comparisonResult . managedResources [ i ]
2018-12-04 01:39:55 +00:00
item := appv1 . ResourceDiff {
2018-11-28 21:38:02 +00:00
Namespace : res . Namespace ,
Name : res . Name ,
Group : res . Group ,
Kind : res . Kind ,
2019-08-26 20:50:19 +00:00
Hook : res . Hook ,
2018-11-28 21:38:02 +00:00
}
2018-12-07 23:40:55 +00:00
target := res . Target
live := res . Live
resDiff := res . Diff
if res . Kind == kube . SecretKind && res . Group == "" {
var err error
2019-02-12 01:12:00 +00:00
target , live , err = diff . HideSecretData ( res . Target , res . Live )
2018-12-07 23:40:55 +00:00
if err != nil {
return nil , err
}
2019-02-22 21:19:10 +00:00
resDiff = * diff . Diff ( target , live , comparisonResult . diffNormalizer )
2018-12-07 23:40:55 +00:00
}
2018-11-28 21:38:02 +00:00
if live != nil {
data , err := json . Marshal ( live )
if err != nil {
return nil , err
2018-11-17 01:10:04 +00:00
}
2018-11-28 21:38:02 +00:00
item . LiveState = string ( data )
} else {
item . LiveState = "null"
}
if target != nil {
data , err := json . Marshal ( target )
if err != nil {
return nil , err
2018-11-17 01:10:04 +00:00
}
2018-11-28 21:38:02 +00:00
item . TargetState = string ( data )
} else {
item . TargetState = "null"
2018-11-17 01:10:04 +00:00
}
2018-12-07 23:40:55 +00:00
jsonDiff , err := resDiff . JSONFormat ( )
2018-11-28 21:38:02 +00:00
if err != nil {
return nil , err
}
item . Diff = jsonDiff
items [ i ] = & item
2018-11-17 01:10:04 +00:00
}
2019-02-13 23:20:40 +00:00
return items , nil
2018-11-17 01:10:04 +00:00
}
2018-02-20 23:23:37 +00:00
// Run starts the Application CRD controller.
2018-05-11 18:50:32 +00:00
func ( ctrl * ApplicationController ) Run ( ctx context . Context , statusProcessors int , operationProcessors int ) {
2018-02-20 23:23:37 +00:00
defer runtime . HandleCrash ( )
2018-05-11 18:50:32 +00:00
defer ctrl . appRefreshQueue . ShutDown ( )
2018-02-20 23:23:37 +00:00
go ctrl . appInformer . Run ( ctx . Done ( ) )
2019-01-08 22:53:45 +00:00
go ctrl . projInformer . Run ( ctx . Done ( ) )
2018-02-20 23:23:37 +00:00
2019-01-08 22:53:45 +00:00
if ! cache . WaitForCacheSync ( ctx . Done ( ) , ctrl . appInformer . HasSynced , ctrl . projInformer . HasSynced ) {
2018-02-20 23:23:37 +00:00
log . Error ( "Timed out waiting for caches to sync" )
return
}
2019-06-21 22:59:05 +00:00
go func ( ) { errors . CheckError ( ctrl . stateCache . Run ( ctx ) ) } ( )
2019-02-22 23:20:34 +00:00
go func ( ) { errors . CheckError ( ctrl . metricsServer . ListenAndServe ( ) ) } ( )
2018-09-10 15:20:17 +00:00
2018-05-11 18:50:32 +00:00
for i := 0 ; i < statusProcessors ; i ++ {
go wait . Until ( func ( ) {
for ctrl . processAppRefreshQueueItem ( ) {
}
} , time . Second , ctx . Done ( ) )
}
for i := 0 ; i < operationProcessors ; i ++ {
go wait . Until ( func ( ) {
for ctrl . processAppOperationQueueItem ( ) {
}
} , time . Second , ctx . Done ( ) )
2018-02-20 23:23:37 +00:00
}
<- ctx . Done ( )
}
2019-07-15 20:34:26 +00:00
func ( ctrl * ApplicationController ) requestAppRefresh ( appName string , compareWith CompareWith ) {
2018-12-18 02:23:35 +00:00
ctrl . refreshRequestedAppsMutex . Lock ( )
defer ctrl . refreshRequestedAppsMutex . Unlock ( )
2019-07-15 20:34:26 +00:00
ctrl . refreshRequestedApps [ appName ] = compareWith . Max ( ctrl . refreshRequestedApps [ appName ] )
2018-04-11 19:53:33 +00:00
}
2019-07-15 20:34:26 +00:00
func ( ctrl * ApplicationController ) isRefreshRequested ( appName string ) ( bool , CompareWith ) {
2018-12-18 02:23:35 +00:00
ctrl . refreshRequestedAppsMutex . Lock ( )
defer ctrl . refreshRequestedAppsMutex . Unlock ( )
2019-07-15 20:34:26 +00:00
level , ok := ctrl . refreshRequestedApps [ appName ]
2018-04-11 19:53:33 +00:00
if ok {
2018-12-18 02:23:35 +00:00
delete ( ctrl . refreshRequestedApps , appName )
2018-04-11 19:53:33 +00:00
}
2019-07-15 20:34:26 +00:00
return ok , level
2018-04-11 19:53:33 +00:00
}
2018-05-16 23:30:28 +00:00
func ( ctrl * ApplicationController ) processAppOperationQueueItem ( ) ( processNext bool ) {
2018-05-11 18:50:32 +00:00
appKey , shutdown := ctrl . appOperationQueue . Get ( )
if shutdown {
2018-05-16 23:30:28 +00:00
processNext = false
return
2018-05-11 18:50:32 +00:00
}
2018-11-19 20:25:45 +00:00
processNext = true
2018-05-16 23:30:28 +00:00
defer func ( ) {
if r := recover ( ) ; r != nil {
log . Errorf ( "Recovered from panic: %+v\n%s" , r , debug . Stack ( ) )
}
ctrl . appOperationQueue . Done ( appKey )
} ( )
2018-05-11 18:50:32 +00:00
obj , exists , err := ctrl . appInformer . GetIndexer ( ) . GetByKey ( appKey . ( string ) )
if err != nil {
log . Errorf ( "Failed to get application '%s' from informer index: %+v" , appKey , err )
2018-05-16 23:30:28 +00:00
return
2018-05-11 18:50:32 +00:00
}
if ! exists {
// This happens after app was deleted, but the work queue still had an entry for it.
2018-05-16 23:30:28 +00:00
return
2018-05-11 18:50:32 +00:00
}
app , ok := obj . ( * appv1 . Application )
if ! ok {
log . Warnf ( "Key '%s' in index is not an application" , appKey )
2018-05-16 23:30:28 +00:00
return
}
if app . Operation != nil {
ctrl . processRequestedAppOperation ( app )
} else if app . DeletionTimestamp != nil && app . CascadedDeletion ( ) {
2018-11-19 20:25:45 +00:00
err = ctrl . finalizeApplicationDeletion ( app )
if err != nil {
ctrl . setAppCondition ( app , appv1 . ApplicationCondition {
Type : appv1 . ApplicationConditionDeletionError ,
Message : err . Error ( ) ,
} )
message := fmt . Sprintf ( "Unable to delete application resources: %v" , err . Error ( ) )
ctrl . auditLogger . LogAppEvent ( app , argo . EventInfo { Reason : argo . EventReasonStatusRefreshed , Type : v1 . EventTypeWarning } , message )
}
2018-05-11 18:50:32 +00:00
}
2018-05-16 23:30:28 +00:00
return
}
2019-05-06 19:49:29 +00:00
func shouldBeDeleted ( app * appv1 . Application , obj * unstructured . Unstructured ) bool {
2019-05-13 18:17:32 +00:00
return ! kube . IsCRD ( obj ) && ! isSelfReferencedApp ( app , kube . GetObjectRef ( obj ) )
2019-05-06 19:49:29 +00:00
}
2018-11-19 20:25:45 +00:00
func ( ctrl * ApplicationController ) finalizeApplicationDeletion ( app * appv1 . Application ) error {
2018-09-24 15:52:43 +00:00
logCtx := log . WithField ( "application" , app . Name )
logCtx . Infof ( "Deleting resources" )
2018-05-16 23:30:28 +00:00
// Get refreshed application info, since informer app copy might be stale
app , err := ctrl . applicationClientset . ArgoprojV1alpha1 ( ) . Applications ( app . Namespace ) . Get ( app . Name , metav1 . GetOptions { } )
if err != nil {
2019-02-22 23:20:34 +00:00
if ! apierr . IsNotFound ( err ) {
2018-09-24 15:52:43 +00:00
logCtx . Errorf ( "Unable to get refreshed application info prior deleting resources: %v" , err )
2018-05-16 23:30:28 +00:00
}
2018-11-19 20:25:45 +00:00
return nil
2018-05-15 07:36:11 +00:00
}
2018-11-30 19:29:12 +00:00
2018-11-30 18:32:31 +00:00
objsMap , err := ctrl . stateCache . GetManagedLiveObjs ( app , [ ] * unstructured . Unstructured { } )
2018-05-16 23:30:28 +00:00
if err != nil {
2018-11-19 20:25:45 +00:00
return err
}
2018-11-28 21:38:02 +00:00
objs := make ( [ ] * unstructured . Unstructured , 0 )
for k := range objsMap {
2019-05-06 19:49:29 +00:00
if objsMap [ k ] . GetDeletionTimestamp ( ) == nil && shouldBeDeleted ( app , objsMap [ k ] ) {
2019-04-08 22:08:48 +00:00
objs = append ( objs , objsMap [ k ] )
}
2018-11-28 21:38:02 +00:00
}
2019-04-29 19:42:59 +00:00
cluster , err := ctrl . db . GetCluster ( context . Background ( ) , app . Spec . Destination . Server )
if err != nil {
return err
}
config := metrics . AddMetricsTransportWrapper ( ctrl . metricsServer , app , cluster . RESTConfig ( ) )
2018-11-28 21:38:02 +00:00
err = util . RunAllAsync ( len ( objs ) , func ( i int ) error {
obj := objs [ i ]
2019-04-29 19:42:59 +00:00
return ctrl . kubectl . DeleteResource ( config , obj . GroupVersionKind ( ) , obj . GetName ( ) , obj . GetNamespace ( ) , false )
2018-11-28 21:38:02 +00:00
} )
2018-11-19 20:25:45 +00:00
if err != nil {
return err
}
2018-11-28 21:38:02 +00:00
2018-11-30 18:32:31 +00:00
objsMap , err = ctrl . stateCache . GetManagedLiveObjs ( app , [ ] * unstructured . Unstructured { } )
2018-11-28 21:38:02 +00:00
if err != nil {
return err
}
2019-05-06 19:49:29 +00:00
for k , obj := range objsMap {
if ! shouldBeDeleted ( app , obj ) {
delete ( objsMap , k )
}
}
2018-11-28 21:38:02 +00:00
if len ( objsMap ) > 0 {
logCtx . Infof ( "%d objects remaining for deletion" , len ( objsMap ) )
2018-11-19 20:25:45 +00:00
return nil
}
2019-02-13 23:20:40 +00:00
err = ctrl . cache . SetAppManagedResources ( app . Name , nil )
if err != nil {
return err
}
err = ctrl . cache . SetAppResourcesTree ( app . Name , nil )
if err != nil {
return err
}
2018-11-19 20:25:45 +00:00
app . SetCascadedDeletion ( false )
var patch [ ] byte
patch , _ = json . Marshal ( map [ string ] interface { } {
"metadata" : map [ string ] interface { } {
"finalizers" : app . Finalizers ,
} ,
} )
_ , err = ctrl . applicationClientset . ArgoprojV1alpha1 ( ) . Applications ( app . Namespace ) . Patch ( app . Name , types . MergePatchType , patch )
if err != nil {
return err
}
2018-11-28 21:38:02 +00:00
2018-11-19 20:25:45 +00:00
logCtx . Info ( "Successfully deleted resources" )
return nil
2018-05-16 23:30:28 +00:00
}
// setAppCondition stores condition on app, replacing the first existing
// condition of the same type or appending it when there is none, and patches
// the resulting condition list onto the application. The conditions of the
// passed app object are modified in place. Failures are logged, not returned.
func (ctrl *ApplicationController) setAppCondition(app *appv1.Application, condition appv1.ApplicationCondition) {
	replaced := false
	for i := range app.Status.Conditions {
		if app.Status.Conditions[i].Type == condition.Type {
			app.Status.Conditions[i] = condition
			replaced = true
			break
		}
	}
	if !replaced {
		app.Status.Conditions = append(app.Status.Conditions, condition)
	}

	patch, err := json.Marshal(map[string]interface{}{
		"status": map[string]interface{}{
			"conditions": app.Status.Conditions,
		},
	})
	if err == nil {
		_, err = ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace).Patch(app.Name, types.MergePatchType, patch)
	}
	if err != nil {
		log.Errorf("Unable to set application condition: %v", err)
	}
}
func ( ctrl * ApplicationController ) processRequestedAppOperation ( app * appv1 . Application ) {
2018-09-11 21:28:53 +00:00
logCtx := log . WithField ( "application" , app . Name )
2018-07-07 07:54:06 +00:00
var state * appv1 . OperationState
2018-05-15 07:36:11 +00:00
// Recover from any unexpected panics and automatically set the status to be failed
defer func ( ) {
if r := recover ( ) ; r != nil {
2018-09-11 21:28:53 +00:00
logCtx . Errorf ( "Recovered from panic: %+v\n%s" , r , debug . Stack ( ) )
2018-05-15 18:35:10 +00:00
state . Phase = appv1 . OperationError
2018-05-15 07:36:11 +00:00
if rerr , ok := r . ( error ) ; ok {
state . Message = rerr . Error ( )
2018-05-11 18:50:32 +00:00
} else {
2018-05-15 07:36:11 +00:00
state . Message = fmt . Sprintf ( "%v" , r )
2018-05-11 18:50:32 +00:00
}
2018-07-14 00:13:31 +00:00
ctrl . setOperationState ( app , state )
2018-05-15 07:36:11 +00:00
}
} ( )
2018-07-14 00:13:31 +00:00
if isOperationInProgress ( app ) {
// If we get here, we are about process an operation but we notice it is already in progress.
// We need to detect if the app object we pulled off the informer is stale and doesn't
// reflect the fact that the operation is completed. We don't want to perform the operation
// again. To detect this, always retrieve the latest version to ensure it is not stale.
2018-05-15 07:36:11 +00:00
freshApp , err := ctrl . applicationClientset . ArgoprojV1alpha1 ( ) . Applications ( ctrl . namespace ) . Get ( app . ObjectMeta . Name , metav1 . GetOptions { } )
if err != nil {
2018-09-11 21:28:53 +00:00
logCtx . Errorf ( "Failed to retrieve latest application state: %v" , err )
2018-05-16 23:30:28 +00:00
return
2018-05-15 07:36:11 +00:00
}
2018-07-14 00:13:31 +00:00
if ! isOperationInProgress ( freshApp ) {
2018-09-11 21:28:53 +00:00
logCtx . Infof ( "Skipping operation on stale application state" )
2018-05-16 23:30:28 +00:00
return
2018-05-15 07:36:11 +00:00
}
2018-07-07 07:54:06 +00:00
app = freshApp
state = app . Status . OperationState . DeepCopy ( )
2018-09-11 21:28:53 +00:00
logCtx . Infof ( "Resuming in-progress operation. phase: %s, message: %s" , state . Phase , state . Message )
2018-05-15 07:36:11 +00:00
} else {
2018-07-07 07:54:06 +00:00
state = & appv1 . OperationState { Phase : appv1 . OperationRunning , Operation : * app . Operation , StartedAt : metav1 . Now ( ) }
2018-07-14 00:13:31 +00:00
ctrl . setOperationState ( app , state )
2018-09-11 21:28:53 +00:00
logCtx . Infof ( "Initialized new operation: %v" , * app . Operation )
2018-05-15 07:36:11 +00:00
}
2019-06-18 02:09:43 +00:00
2018-07-07 07:54:06 +00:00
ctrl . appStateManager . SyncAppState ( app , state )
2018-07-14 00:13:31 +00:00
if state . Phase == appv1 . OperationRunning {
// It's possible for an app to be terminated while we were operating on it. We do not want
// to clobber the Terminated state with Running. Get the latest app state to check for this.
freshApp , err := ctrl . applicationClientset . ArgoprojV1alpha1 ( ) . Applications ( ctrl . namespace ) . Get ( app . ObjectMeta . Name , metav1 . GetOptions { } )
if err == nil {
if freshApp . Status . OperationState != nil && freshApp . Status . OperationState . Phase == appv1 . OperationTerminating {
state . Phase = appv1 . OperationTerminating
state . Message = "operation is terminating"
// after this, we will get requeued to the workqueue, but next time the
// SyncAppState will operate in a Terminating phase, allowing the worker to perform
// cleanup (e.g. delete jobs, workflows, etc...)
}
}
}
ctrl . setOperationState ( app , state )
2018-07-12 19:39:46 +00:00
if state . Phase . Completed ( ) {
// if we just completed an operation, force a refresh so that UI will report up-to-date
// sync/health information
2019-07-25 02:26:09 +00:00
if key , err := cache . MetaNamespaceKeyFunc ( app ) ; err == nil {
// force app refresh with using CompareWithLatest comparison type and trigger app reconciliation loop
ctrl . requestAppRefresh ( app . Name , CompareWithLatest )
ctrl . appRefreshQueue . Add ( key )
} else {
logCtx . Warnf ( "Fails to requeue application: %v" , err )
}
2018-07-12 19:39:46 +00:00
}
2018-05-11 18:50:32 +00:00
}
2018-07-14 00:13:31 +00:00
func ( ctrl * ApplicationController ) setOperationState ( app * appv1 . Application , state * appv1 . OperationState ) {
2018-11-28 21:38:02 +00:00
util . RetryUntilSucceed ( func ( ) error {
2018-05-15 07:36:11 +00:00
if state . Phase == "" {
// expose any bugs where we neglect to set phase
panic ( "no phase was set" )
}
2018-07-14 00:13:31 +00:00
if state . Phase . Completed ( ) {
now := metav1 . Now ( )
state . FinishedAt = & now
2018-05-15 07:36:11 +00:00
}
2018-07-14 00:13:31 +00:00
patch := map [ string ] interface { } {
2018-05-11 18:50:32 +00:00
"status" : map [ string ] interface { } {
"operationState" : state ,
} ,
2018-07-14 00:13:31 +00:00
}
if state . Phase . Completed ( ) {
// If operation is completed, clear the operation field to indicate no operation is
// in progress.
patch [ "operation" ] = nil
}
if reflect . DeepEqual ( app . Status . OperationState , state ) {
log . Infof ( "No operation updates necessary to '%s'. Skipping patch" , app . Name )
return nil
}
patchJSON , err := json . Marshal ( patch )
2018-05-15 07:36:11 +00:00
if err != nil {
return err
}
appClient := ctrl . applicationClientset . ArgoprojV1alpha1 ( ) . Applications ( ctrl . namespace )
2018-07-14 00:13:31 +00:00
_ , err = appClient . Patch ( app . Name , types . MergePatchType , patchJSON )
2018-05-15 07:36:11 +00:00
if err != nil {
2019-05-30 19:39:54 +00:00
// Stop retrying updating deleted application
if apierr . IsNotFound ( err ) {
return nil
}
2018-05-15 07:36:11 +00:00
return err
2018-05-11 18:50:32 +00:00
}
2018-07-07 07:54:06 +00:00
log . Infof ( "updated '%s' operation (phase: %s)" , app . Name , state . Phase )
2018-09-24 15:52:43 +00:00
if state . Phase . Completed ( ) {
eventInfo := argo . EventInfo { Reason : argo . EventReasonOperationCompleted }
2018-11-27 21:38:00 +00:00
var messages [ ] string
if state . Operation . Sync != nil && len ( state . Operation . Sync . Resources ) > 0 {
messages = [ ] string { "Partial sync operation" }
} else {
messages = [ ] string { "Sync operation" }
}
if state . SyncResult != nil {
messages = append ( messages , "to" , state . SyncResult . Revision )
}
2018-09-24 15:52:43 +00:00
if state . Phase . Successful ( ) {
eventInfo . Type = v1 . EventTypeNormal
2018-11-27 21:38:00 +00:00
messages = append ( messages , "succeeded" )
2018-09-24 15:52:43 +00:00
} else {
eventInfo . Type = v1 . EventTypeWarning
2018-11-27 21:38:00 +00:00
messages = append ( messages , "failed:" , state . Message )
2018-09-24 15:52:43 +00:00
}
2018-11-27 21:38:00 +00:00
ctrl . auditLogger . LogAppEvent ( app , eventInfo , strings . Join ( messages , " " ) )
2019-02-22 23:20:34 +00:00
ctrl . metricsServer . IncSync ( app , state )
2018-09-24 15:52:43 +00:00
}
2018-05-15 07:36:11 +00:00
return nil
2018-05-11 18:50:32 +00:00
} , "Update application operation state" , context . Background ( ) , updateOperationStateTimeout )
}
2018-05-16 23:30:28 +00:00
func ( ctrl * ApplicationController ) processAppRefreshQueueItem ( ) ( processNext bool ) {
2018-05-11 18:50:32 +00:00
appKey , shutdown := ctrl . appRefreshQueue . Get ( )
2018-02-20 23:23:37 +00:00
if shutdown {
2018-05-16 23:30:28 +00:00
processNext = false
return
2018-02-20 23:23:37 +00:00
}
2018-07-12 19:39:46 +00:00
processNext = true
2018-05-16 23:30:28 +00:00
defer func ( ) {
if r := recover ( ) ; r != nil {
log . Errorf ( "Recovered from panic: %+v\n%s" , r , debug . Stack ( ) )
}
ctrl . appRefreshQueue . Done ( appKey )
} ( )
2018-02-22 18:56:14 +00:00
obj , exists , err := ctrl . appInformer . GetIndexer ( ) . GetByKey ( appKey . ( string ) )
if err != nil {
log . Errorf ( "Failed to get application '%s' from informer index: %+v" , appKey , err )
2018-05-16 23:30:28 +00:00
return
2018-02-22 18:56:14 +00:00
}
if ! exists {
// This happens after app was deleted, but the work queue still had an entry for it.
2018-05-16 23:30:28 +00:00
return
2018-02-22 18:56:14 +00:00
}
2018-12-04 10:52:57 +00:00
origApp , ok := obj . ( * appv1 . Application )
2018-02-22 18:56:14 +00:00
if ! ok {
log . Warnf ( "Key '%s' in index is not an application" , appKey )
2018-05-16 23:30:28 +00:00
return
2018-02-22 18:56:14 +00:00
}
2019-07-15 20:34:26 +00:00
needRefresh , refreshType , comparisonLevel := ctrl . needRefreshAppStatus ( origApp , ctrl . statusRefreshTimeout )
2018-12-18 02:23:35 +00:00
if ! needRefresh {
2018-07-12 02:12:30 +00:00
return
}
2019-03-14 21:54:34 +00:00
2019-01-07 21:46:11 +00:00
startTime := time . Now ( )
defer func ( ) {
2019-03-28 20:37:53 +00:00
reconcileDuration := time . Since ( startTime )
2019-02-27 07:09:46 +00:00
ctrl . metricsServer . IncReconcile ( origApp , reconcileDuration )
2019-09-30 21:33:44 +00:00
logCtx := log . WithFields ( log . Fields {
"application" : origApp . Name ,
"time_ms" : reconcileDuration . Seconds ( ) * 1e3 ,
"level" : comparisonLevel ,
"dest-server" : origApp . Spec . Destination . Server ,
"dest-namespace" : origApp . Spec . Destination . Namespace ,
} )
2019-02-13 18:11:01 +00:00
logCtx . Info ( "Reconciliation completed" )
2019-01-07 21:46:11 +00:00
} ( )
2019-03-14 21:54:34 +00:00
2019-03-04 08:56:36 +00:00
app := origApp . DeepCopy ( )
2019-03-20 21:02:54 +00:00
logCtx := log . WithFields ( log . Fields { "application" : app . Name } )
2019-07-15 20:34:26 +00:00
if comparisonLevel == ComparisonWithNothing {
2019-03-14 21:54:34 +00:00
if managedResources , err := ctrl . cache . GetAppManagedResources ( app . Name ) ; err != nil {
logCtx . Warnf ( "Failed to get cached managed resources for tree reconciliation, fallback to full reconciliation" )
} else {
2019-04-02 15:48:34 +00:00
if tree , err := ctrl . getResourceTree ( app , managedResources ) ; err != nil {
2019-03-14 21:54:34 +00:00
app . Status . Conditions = [ ] appv1 . ApplicationCondition { { Type : appv1 . ApplicationConditionComparisonError , Message : err . Error ( ) } }
} else {
2019-05-08 16:00:45 +00:00
app . Status . Summary = tree . GetSummary ( )
2019-03-14 21:54:34 +00:00
if err = ctrl . cache . SetAppResourcesTree ( app . Name , tree ) ; err != nil {
logCtx . Errorf ( "Failed to cache resources tree: %v" , err )
return
}
}
2019-06-28 00:55:06 +00:00
now := metav1 . Now ( )
app . Status . ObservedAt = & now
2019-03-14 21:54:34 +00:00
ctrl . persistAppStatus ( origApp , & app . Status )
return
}
}
2018-07-10 21:45:18 +00:00
2019-08-22 16:36:27 +00:00
hasErrors := ctrl . refreshAppConditions ( app )
if hasErrors {
2018-12-04 10:52:57 +00:00
app . Status . Sync . Status = appv1 . SyncStatusCodeUnknown
app . Status . Health . Status = appv1 . HealthStatusUnknown
ctrl . persistAppStatus ( origApp , & app . Status )
2018-07-12 02:12:30 +00:00
return
}
2018-07-10 21:45:18 +00:00
2019-06-18 02:09:43 +00:00
var localManifests [ ] string
2019-09-11 20:53:31 +00:00
if opState := app . Status . OperationState ; opState != nil && opState . Operation . Sync != nil {
2019-06-18 02:09:43 +00:00
localManifests = opState . Operation . Sync . Manifests
}
2019-07-15 20:34:26 +00:00
revision := app . Spec . Source . TargetRevision
if comparisonLevel == CompareWithRecent {
revision = app . Status . Sync . Revision
}
2019-08-20 15:43:29 +00:00
compareResult := ctrl . appStateManager . CompareAppState ( app , revision , app . Spec . Source , refreshType == appv1 . RefreshTypeHard , localManifests )
2019-09-06 22:37:25 +00:00
ctrl . normalizeApplication ( origApp , app )
2019-08-20 15:43:29 +00:00
app . Status . Conditions = append ( app . Status . Conditions , compareResult . conditions ... )
2019-04-02 15:48:34 +00:00
tree , err := ctrl . setAppManagedResources ( app , compareResult )
2019-02-13 23:20:40 +00:00
if err != nil {
2019-03-20 21:02:54 +00:00
logCtx . Errorf ( "Failed to cache app resources: %v" , err )
2019-04-02 15:48:34 +00:00
} else {
2019-05-08 16:00:45 +00:00
app . Status . Summary = tree . GetSummary ( )
2019-02-13 23:20:40 +00:00
}
2018-09-11 21:28:53 +00:00
2019-10-01 22:23:09 +00:00
var sErr bool
project , err := ctrl . getAppProj ( app )
if err != nil {
logCtx . Infof ( "Could not lookup project for %s in order to check maintenance state" , app . Name )
2019-08-22 16:36:27 +00:00
} else {
2019-10-01 22:23:09 +00:00
active := project . Spec . Maintenance . ActiveWindows ( )
match , _ := active . Match ( app )
if match {
logCtx . Infof ( "Maintenance window active, skipping sync" )
} else {
syncErrCond := ctrl . autoSync ( app , compareResult . syncStatus , compareResult . resources )
if syncErrCond != nil {
sErr = true
app . Status . SetConditions ( [ ] appv1 . ApplicationCondition { * syncErrCond } , map [ appv1 . ApplicationConditionType ] bool { appv1 . ApplicationConditionSyncError : true } )
}
}
}
if ! sErr {
2019-08-22 16:36:27 +00:00
app . Status . SetConditions ( [ ] appv1 . ApplicationCondition { } , map [ appv1 . ApplicationConditionType ] bool { appv1 . ApplicationConditionSyncError : true } )
2018-09-11 21:28:53 +00:00
}
2019-06-28 00:55:06 +00:00
app . Status . ObservedAt = & compareResult . reconciledAt
app . Status . ReconciledAt = & compareResult . reconciledAt
2018-12-04 10:52:57 +00:00
app . Status . Sync = * compareResult . syncStatus
app . Status . Health = * compareResult . healthStatus
app . Status . Resources = compareResult . resources
2019-03-18 21:39:32 +00:00
app . Status . SourceType = compareResult . appSourceType
2018-12-04 10:52:57 +00:00
ctrl . persistAppStatus ( origApp , & app . Status )
2018-05-16 23:30:28 +00:00
return
2018-02-20 23:23:37 +00:00
}
2018-07-12 19:39:46 +00:00
// needRefreshAppStatus answers if application status needs to be refreshed.
// Returns true if application never been compared, has changed or comparison result has expired.
2019-03-14 21:54:34 +00:00
// Additionally returns whether full refresh was requested or not.
// If full refresh is requested then target and live state should be reconciled, else only live state tree should be updated.
2019-07-15 20:34:26 +00:00
func ( ctrl * ApplicationController ) needRefreshAppStatus ( app * appv1 . Application , statusRefreshTimeout time . Duration ) ( bool , appv1 . RefreshType , CompareWith ) {
2018-09-24 15:52:43 +00:00
logCtx := log . WithFields ( log . Fields { "application" : app . Name } )
2018-07-12 19:39:46 +00:00
var reason string
2019-07-15 20:34:26 +00:00
compareWith := CompareWithLatest
2018-12-18 02:23:35 +00:00
refreshType := appv1 . RefreshTypeNormal
2019-06-28 00:55:06 +00:00
expired := app . Status . ReconciledAt == nil || app . Status . ReconciledAt . Add ( statusRefreshTimeout ) . Before ( time . Now ( ) . UTC ( ) )
2019-09-23 17:31:59 +00:00
if requestedType , ok := app . IsRefreshRequested ( ) ; ok || expired {
if ok {
refreshType = requestedType
reason = fmt . Sprintf ( "%s refresh requested" , refreshType )
} else if expired {
reason = fmt . Sprintf ( "comparison expired. reconciledAt: %v, expiry: %v" , app . Status . ReconciledAt , statusRefreshTimeout )
}
2019-07-15 20:34:26 +00:00
} else if requested , level := ctrl . isRefreshRequested ( app . Name ) ; requested {
compareWith = level
2019-01-07 22:35:16 +00:00
reason = fmt . Sprintf ( "controller refresh requested" )
2018-12-04 10:52:57 +00:00
} else if app . Status . Sync . Status == appv1 . SyncStatusCodeUnknown && expired {
2018-07-12 19:39:46 +00:00
reason = "comparison status unknown"
2018-12-20 20:48:42 +00:00
} else if ! app . Spec . Source . Equals ( app . Status . Sync . ComparedTo . Source ) {
reason = "spec.source differs"
} else if ! app . Spec . Destination . Equals ( app . Status . Sync . ComparedTo . Destination ) {
2019-02-25 18:51:24 +00:00
reason = "spec.destination differs"
2018-07-12 19:39:46 +00:00
}
if reason != "" {
2019-07-15 20:34:26 +00:00
logCtx . Infof ( "Refreshing app status (%s), level (%d)" , reason , compareWith )
return true , refreshType , compareWith
2018-07-12 19:39:46 +00:00
}
2019-07-15 20:34:26 +00:00
return false , refreshType , compareWith
2018-07-12 19:39:46 +00:00
}
2019-08-22 16:36:27 +00:00
func ( ctrl * ApplicationController ) refreshAppConditions ( app * appv1 . Application ) bool {
errorConditions := make ( [ ] appv1 . ApplicationCondition , 0 )
2019-08-19 15:14:48 +00:00
proj , err := ctrl . getAppProj ( app )
2018-07-09 17:45:03 +00:00
if err != nil {
2019-02-22 23:20:34 +00:00
if apierr . IsNotFound ( err ) {
2019-08-22 16:36:27 +00:00
errorConditions = append ( errorConditions , appv1 . ApplicationCondition {
2018-07-10 21:45:18 +00:00
Type : appv1 . ApplicationConditionInvalidSpecError ,
Message : fmt . Sprintf ( "Application referencing project %s which does not exist" , app . Spec . Project ) ,
} )
} else {
2019-08-22 16:36:27 +00:00
errorConditions = append ( errorConditions , appv1 . ApplicationCondition {
2018-07-10 21:45:18 +00:00
Type : appv1 . ApplicationConditionUnknownError ,
Message : err . Error ( ) ,
} )
}
2018-07-09 17:45:03 +00:00
} else {
2019-04-29 22:04:25 +00:00
specConditions , err := argo . ValidatePermissions ( context . Background ( ) , & app . Spec , proj , ctrl . db )
2018-07-09 17:45:03 +00:00
if err != nil {
2019-08-22 16:36:27 +00:00
errorConditions = append ( errorConditions , appv1 . ApplicationCondition {
2018-07-10 21:45:18 +00:00
Type : appv1 . ApplicationConditionUnknownError ,
Message : err . Error ( ) ,
} )
2018-07-09 17:45:03 +00:00
} else {
2019-08-22 16:36:27 +00:00
errorConditions = append ( errorConditions , specConditions ... )
2018-07-09 17:45:03 +00:00
}
}
2019-08-22 16:36:27 +00:00
app . Status . SetConditions ( errorConditions , map [ appv1 . ApplicationConditionType ] bool {
2019-03-18 20:21:03 +00:00
appv1 . ApplicationConditionInvalidSpecError : true ,
appv1 . ApplicationConditionUnknownError : true ,
appv1 . ApplicationConditionComparisonError : true ,
appv1 . ApplicationConditionSharedResourceWarning : true ,
appv1 . ApplicationConditionRepeatedResourceWarning : true ,
2019-07-02 20:56:25 +00:00
appv1 . ApplicationConditionExcludedResourceWarning : true ,
2019-08-19 15:14:48 +00:00
} )
2019-08-22 16:36:27 +00:00
return len ( errorConditions ) > 0
2018-05-07 15:38:25 +00:00
}
2018-11-30 21:50:27 +00:00
// normalizeApplication normalizes an application.spec and additionally persists updates if it changed
2019-09-06 22:37:25 +00:00
func ( ctrl * ApplicationController ) normalizeApplication ( orig , app * appv1 . Application ) {
2018-11-30 21:50:27 +00:00
logCtx := log . WithFields ( log . Fields { "application" : app . Name } )
2019-09-06 22:37:25 +00:00
app . Spec = * argo . NormalizeApplicationSpec ( & app . Spec )
2019-03-04 08:56:36 +00:00
patch , modified , err := diff . CreateTwoWayMergePatch ( orig , app , appv1 . Application { } )
2018-11-30 21:50:27 +00:00
if err != nil {
logCtx . Errorf ( "error constructing app spec patch: %v" , err )
} else if modified {
2019-03-04 08:56:36 +00:00
appClient := ctrl . applicationClientset . ArgoprojV1alpha1 ( ) . Applications ( app . Namespace )
2018-11-30 21:50:27 +00:00
_ , err = appClient . Patch ( app . Name , types . MergePatchType , patch )
if err != nil {
logCtx . Errorf ( "Error persisting normalized application spec: %v" , err )
} else {
logCtx . Infof ( "Normalized app spec: %s" , string ( patch ) )
}
}
}
// persistAppStatus persists updates to application status. If no changes were made, it is a no-op
2018-12-04 10:52:57 +00:00
func ( ctrl * ApplicationController ) persistAppStatus ( orig * appv1 . Application , newStatus * appv1 . ApplicationStatus ) {
logCtx := log . WithFields ( log . Fields { "application" : orig . Name } )
if orig . Status . Sync . Status != newStatus . Sync . Status {
message := fmt . Sprintf ( "Updated sync status: %s -> %s" , orig . Status . Sync . Status , newStatus . Sync . Status )
ctrl . auditLogger . LogAppEvent ( orig , argo . EventInfo { Reason : argo . EventReasonResourceUpdated , Type : v1 . EventTypeNormal } , message )
2018-07-12 19:39:46 +00:00
}
2018-12-04 10:52:57 +00:00
if orig . Status . Health . Status != newStatus . Health . Status {
message := fmt . Sprintf ( "Updated health status: %s -> %s" , orig . Status . Health . Status , newStatus . Health . Status )
ctrl . auditLogger . LogAppEvent ( orig , argo . EventInfo { Reason : argo . EventReasonResourceUpdated , Type : v1 . EventTypeNormal } , message )
2018-07-12 19:39:46 +00:00
}
2018-12-18 02:23:35 +00:00
var newAnnotations map [ string ] string
if orig . GetAnnotations ( ) != nil {
newAnnotations = make ( map [ string ] string )
for k , v := range orig . GetAnnotations ( ) {
newAnnotations [ k ] = v
}
delete ( newAnnotations , common . AnnotationKeyRefresh )
}
patch , modified , err := diff . CreateTwoWayMergePatch (
& appv1 . Application { ObjectMeta : metav1 . ObjectMeta { Annotations : orig . GetAnnotations ( ) } , Status : orig . Status } ,
& appv1 . Application { ObjectMeta : metav1 . ObjectMeta { Annotations : newAnnotations } , Status : * newStatus } , appv1 . Application { } )
2018-07-12 19:39:46 +00:00
if err != nil {
2018-11-30 21:50:27 +00:00
logCtx . Errorf ( "Error constructing app status patch: %v" , err )
2018-07-12 19:39:46 +00:00
return
}
2018-11-30 21:50:27 +00:00
if ! modified {
2018-09-24 15:52:43 +00:00
logCtx . Infof ( "No status changes. Skipping patch" )
2018-07-12 19:39:46 +00:00
return
2018-05-03 22:55:01 +00:00
}
2019-02-25 18:51:24 +00:00
logCtx . Debugf ( "patch: %s" , string ( patch ) )
2018-12-04 10:52:57 +00:00
appClient := ctrl . applicationClientset . ArgoprojV1alpha1 ( ) . Applications ( orig . Namespace )
_ , err = appClient . Patch ( orig . Name , types . MergePatchType , patch )
2018-02-22 18:56:14 +00:00
if err != nil {
2018-09-24 15:52:43 +00:00
logCtx . Warnf ( "Error updating application: %v" , err )
2018-04-11 19:53:33 +00:00
} else {
2018-09-24 15:52:43 +00:00
logCtx . Infof ( "Update successful" )
2018-02-22 18:56:14 +00:00
}
}
2018-09-11 21:28:53 +00:00
// autoSync will initiate a sync operation for an application configured with automated sync
2019-07-25 02:26:09 +00:00
func ( ctrl * ApplicationController ) autoSync ( app * appv1 . Application , syncStatus * appv1 . SyncStatus , resources [ ] appv1 . ResourceStatus ) * appv1 . ApplicationCondition {
2018-09-11 21:28:53 +00:00
if app . Spec . SyncPolicy == nil || app . Spec . SyncPolicy . Automated == nil {
return nil
}
logCtx := log . WithFields ( log . Fields { "application" : app . Name } )
if app . Operation != nil {
logCtx . Infof ( "Skipping auto-sync: another operation is in progress" )
return nil
}
2019-03-04 08:56:36 +00:00
if app . DeletionTimestamp != nil && ! app . DeletionTimestamp . IsZero ( ) {
logCtx . Infof ( "Skipping auto-sync: deletion in progress" )
return nil
}
2019-10-01 22:23:09 +00:00
2018-09-11 21:28:53 +00:00
// Only perform auto-sync if we detect OutOfSync status. This is to prevent us from attempting
// a sync when application is already in a Synced or Unknown state
2018-12-04 10:52:57 +00:00
if syncStatus . Status != appv1 . SyncStatusCodeOutOfSync {
logCtx . Infof ( "Skipping auto-sync: application status is %s" , syncStatus . Status )
2018-09-11 21:28:53 +00:00
return nil
}
2018-09-24 15:52:43 +00:00
2019-07-25 02:26:09 +00:00
desiredCommitSHA := syncStatus . Revision
alreadyAttempted , attemptPhase := alreadyAttemptedSync ( app , desiredCommitSHA )
selfHeal := app . Spec . SyncPolicy . Automated . SelfHeal
op := appv1 . Operation {
Sync : & appv1 . SyncOperation {
Revision : desiredCommitSHA ,
Prune : app . Spec . SyncPolicy . Automated . Prune ,
} ,
}
2018-09-11 21:28:53 +00:00
// It is possible for manifests to remain OutOfSync even after a sync/kubectl apply (e.g.
// auto-sync with pruning disabled). We need to ensure that we do not keep Syncing an
// application in an infinite loop. To detect this, we only attempt the Sync if the revision
2018-09-24 15:52:43 +00:00
// and parameter overrides are different from our most recent sync operation.
2019-07-25 02:26:09 +00:00
if alreadyAttempted && ( ! selfHeal || ! attemptPhase . Successful ( ) ) {
if ! attemptPhase . Successful ( ) {
2018-09-11 21:28:53 +00:00
logCtx . Warnf ( "Skipping auto-sync: failed previous sync attempt to %s" , desiredCommitSHA )
message := fmt . Sprintf ( "Failed sync attempt to %s: %s" , desiredCommitSHA , app . Status . OperationState . Message )
return & appv1 . ApplicationCondition { Type : appv1 . ApplicationConditionSyncError , Message : message }
}
2018-09-24 15:52:43 +00:00
logCtx . Infof ( "Skipping auto-sync: most recent sync already to %s" , desiredCommitSHA )
return nil
2019-07-25 02:26:09 +00:00
} else if alreadyAttempted && selfHeal {
if shouldSelfHeal , retryAfter := ctrl . shouldSelfHeal ( app ) ; shouldSelfHeal {
for _ , resource := range resources {
if resource . Status != appv1 . SyncStatusCodeSynced {
op . Sync . Resources = append ( op . Sync . Resources , appv1 . SyncOperationResource {
Kind : resource . Kind ,
Group : resource . Group ,
Name : resource . Name ,
} )
}
}
} else {
logCtx . Infof ( "Skipping auto-sync: already attempted sync to %s with timeout %v (retrying in %v)" , desiredCommitSHA , ctrl . selfHealTimeout , retryAfter )
if key , err := cache . MetaNamespaceKeyFunc ( app ) ; err == nil {
ctrl . requestAppRefresh ( app . Name , CompareWithLatest )
ctrl . appRefreshQueue . AddAfter ( key , retryAfter )
} else {
logCtx . Warnf ( "Fails to requeue application: %v" , err )
}
return nil
}
2018-02-26 15:43:35 +00:00
2018-09-11 21:28:53 +00:00
}
2019-07-25 02:26:09 +00:00
2018-09-11 21:28:53 +00:00
appIf := ctrl . applicationClientset . ArgoprojV1alpha1 ( ) . Applications ( app . Namespace )
2018-11-17 01:10:04 +00:00
_ , err := argo . SetAppOperation ( appIf , app . Name , & op )
2018-09-11 21:28:53 +00:00
if err != nil {
logCtx . Errorf ( "Failed to initiate auto-sync to %s: %v" , desiredCommitSHA , err )
return & appv1 . ApplicationCondition { Type : appv1 . ApplicationConditionSyncError , Message : err . Error ( ) }
}
2018-09-24 15:52:43 +00:00
message := fmt . Sprintf ( "Initiated automated sync to '%s'" , desiredCommitSHA )
ctrl . auditLogger . LogAppEvent ( app , argo . EventInfo { Reason : argo . EventReasonOperationStarted , Type : v1 . EventTypeNormal } , message )
logCtx . Info ( message )
2018-09-11 21:28:53 +00:00
return nil
}
2018-09-24 15:52:43 +00:00
// alreadyAttemptedSync returns whether or not the most recent sync was performed against the
2019-03-04 08:56:36 +00:00
// commitSHA and with the same app source config which are currently set in the app
2019-07-25 02:26:09 +00:00
func alreadyAttemptedSync ( app * appv1 . Application , commitSHA string ) ( bool , appv1 . OperationPhase ) {
2018-09-24 15:52:43 +00:00
if app . Status . OperationState == nil || app . Status . OperationState . Operation . Sync == nil || app . Status . OperationState . SyncResult == nil {
2019-07-25 02:26:09 +00:00
return false , ""
2018-09-24 15:52:43 +00:00
}
if app . Status . OperationState . SyncResult . Revision != commitSHA {
2019-07-25 02:26:09 +00:00
return false , ""
2018-09-24 15:52:43 +00:00
}
2019-03-04 08:56:36 +00:00
// Ignore differences in target revision, since we already just verified commitSHAs are equal,
// and we do not want to trigger auto-sync due to things like HEAD != master
specSource := app . Spec . Source . DeepCopy ( )
specSource . TargetRevision = ""
syncResSource := app . Status . OperationState . SyncResult . Source . DeepCopy ( )
syncResSource . TargetRevision = ""
2019-07-25 02:26:09 +00:00
return reflect . DeepEqual ( app . Spec . Source , app . Status . OperationState . SyncResult . Source ) , app . Status . OperationState . Phase
}
func ( ctrl * ApplicationController ) shouldSelfHeal ( app * appv1 . Application ) ( bool , time . Duration ) {
if app . Status . OperationState == nil {
return true , time . Duration ( 0 )
}
var retryAfter time . Duration
if app . Status . OperationState . FinishedAt == nil {
retryAfter = ctrl . selfHealTimeout
} else {
retryAfter = ctrl . selfHealTimeout - time . Since ( app . Status . OperationState . FinishedAt . Time )
}
return retryAfter <= 0 , retryAfter
2018-09-24 15:52:43 +00:00
}
2019-08-19 15:14:48 +00:00
func ( ctrl * ApplicationController ) newApplicationInformerAndLister ( ) ( cache . SharedIndexInformer , applisters . ApplicationLister , error ) {
2018-09-11 21:28:53 +00:00
appInformerFactory := appinformers . NewFilteredSharedInformerFactory (
ctrl . applicationClientset ,
ctrl . statusRefreshTimeout ,
ctrl . namespace ,
func ( options * metav1 . ListOptions ) { } ,
2018-02-20 23:23:37 +00:00
)
informer := appInformerFactory . Argoproj ( ) . V1alpha1 ( ) . Applications ( ) . Informer ( )
2019-02-22 23:20:34 +00:00
lister := appInformerFactory . Argoproj ( ) . V1alpha1 ( ) . Applications ( ) . Lister ( )
2018-02-20 23:23:37 +00:00
informer . AddEventHandler (
cache . ResourceEventHandlerFuncs {
AddFunc : func ( obj interface { } ) {
key , err := cache . MetaNamespaceKeyFunc ( obj )
if err == nil {
2018-09-11 21:28:53 +00:00
ctrl . appRefreshQueue . Add ( key )
ctrl . appOperationQueue . Add ( key )
2018-02-20 23:23:37 +00:00
}
} ,
UpdateFunc : func ( old , new interface { } ) {
key , err := cache . MetaNamespaceKeyFunc ( new )
2018-09-11 21:28:53 +00:00
if err != nil {
return
}
oldApp , oldOK := old . ( * appv1 . Application )
newApp , newOK := new . ( * appv1 . Application )
if oldOK && newOK {
if toggledAutomatedSync ( oldApp , newApp ) {
log . WithField ( "application" , newApp . Name ) . Info ( "Enabled automated sync" )
2019-07-15 20:34:26 +00:00
ctrl . requestAppRefresh ( newApp . Name , CompareWithLatest )
2018-09-11 21:28:53 +00:00
}
2018-02-20 23:23:37 +00:00
}
2018-09-11 21:28:53 +00:00
ctrl . appRefreshQueue . Add ( key )
ctrl . appOperationQueue . Add ( key )
2018-02-20 23:23:37 +00:00
} ,
DeleteFunc : func ( obj interface { } ) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key , err := cache . DeletionHandlingMetaNamespaceKeyFunc ( obj )
if err == nil {
2018-09-11 21:28:53 +00:00
ctrl . appRefreshQueue . Add ( key )
2018-02-20 23:23:37 +00:00
}
} ,
} ,
)
2019-08-19 15:14:48 +00:00
err := informer . AddIndexers ( cache . Indexers {
orphanedIndex : func ( obj interface { } ) ( i [ ] string , e error ) {
app , ok := obj . ( * appv1 . Application )
if ! ok {
return nil , nil
}
proj , err := ctrl . getAppProj ( app )
if err != nil {
return nil , nil
}
if proj . Spec . OrphanedResources != nil {
return [ ] string { app . Spec . Destination . Namespace } , nil
}
return nil , nil
} ,
} )
return informer , lister , err
2018-02-20 23:23:37 +00:00
}
2018-07-07 07:54:06 +00:00
2018-07-14 00:13:31 +00:00
func isOperationInProgress ( app * appv1 . Application ) bool {
2018-07-07 07:54:06 +00:00
return app . Status . OperationState != nil && ! app . Status . OperationState . Phase . Completed ( )
}
2018-09-11 21:28:53 +00:00
// toggledAutomatedSync tests if an app went from auto-sync disabled to enabled.
// if it was toggled to be enabled, the informer handler will force a refresh
func toggledAutomatedSync(old *appv1.Application, new *appv1.Application) bool {
	autoSyncEnabled := func(app *appv1.Application) bool {
		return app.Spec.SyncPolicy != nil && app.Spec.SyncPolicy.Automated != nil
	}
	// True only for the disabled -> enabled transition.
	return autoSyncEnabled(new) && !autoSyncEnabled(old)
}