2018-07-07 07:54:06 +00:00
package controller
import (
"context"
"fmt"
2019-06-05 01:17:41 +00:00
"reflect"
2018-09-10 17:14:14 +00:00
"sort"
2019-06-05 01:17:41 +00:00
"strings"
2018-07-07 07:54:06 +00:00
"sync"
2019-07-24 22:00:40 +00:00
"time"
2018-07-07 07:54:06 +00:00
log "github.com/sirupsen/logrus"
2019-07-24 22:00:40 +00:00
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
2018-07-07 07:54:06 +00:00
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2019-07-24 22:00:40 +00:00
"k8s.io/apimachinery/pkg/util/wait"
2018-07-07 07:54:06 +00:00
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
2019-06-07 15:46:11 +00:00
"github.com/argoproj/argo-cd/common"
2019-04-29 19:42:59 +00:00
"github.com/argoproj/argo-cd/controller/metrics"
2019-06-05 01:17:41 +00:00
"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
listersv1alpha1 "github.com/argoproj/argo-cd/pkg/client/listers/application/v1alpha1"
2018-07-10 21:45:18 +00:00
"github.com/argoproj/argo-cd/util/argo"
2019-06-05 01:17:41 +00:00
"github.com/argoproj/argo-cd/util/health"
"github.com/argoproj/argo-cd/util/hook"
2018-07-07 07:54:06 +00:00
"github.com/argoproj/argo-cd/util/kube"
2019-06-07 15:46:11 +00:00
"github.com/argoproj/argo-cd/util/resource"
2018-07-07 07:54:06 +00:00
)
2019-07-24 22:00:40 +00:00
// crdReadinessTimeout is the upper bound on how long ensureCRDReady polls a
// freshly applied CRD for its Established condition.
const crdReadinessTimeout = 3 * time.Second
2018-07-07 07:54:06 +00:00
type syncContext struct {
2019-07-24 22:00:40 +00:00
resourceOverrides map [ string ] v1alpha1 . ResourceOverride
appName string
proj * v1alpha1 . AppProject
compareResult * comparisonResult
config * rest . Config
dynamicIf dynamic . Interface
disco discovery . DiscoveryInterface
extensionsclientset * clientset . Clientset
kubectl kube . Kubectl
namespace string
server string
syncOp * v1alpha1 . SyncOperation
syncRes * v1alpha1 . SyncOperationResult
syncResources [ ] v1alpha1 . SyncOperationResource
opState * v1alpha1 . OperationState
log * log . Entry
2018-07-07 07:54:06 +00:00
// lock to protect concurrent updates of the result list
lock sync . Mutex
}
2019-06-05 01:17:41 +00:00
func ( m * appStateManager ) SyncAppState ( app * v1alpha1 . Application , state * v1alpha1 . OperationState ) {
2018-09-24 15:52:43 +00:00
// Sync requests might be requested with ambiguous revisions (e.g. master, HEAD, v1.2.3).
2018-07-07 07:54:06 +00:00
// This can change meaning when resuming operations (e.g a hook sync). After calculating a
2018-12-04 10:52:57 +00:00
// concrete git commit SHA, the SHA is remembered in the status.operationState.syncResult field.
// This ensures that when resuming an operation, we sync to the same revision that we initially
// started with.
2018-07-07 07:54:06 +00:00
var revision string
2019-06-05 01:17:41 +00:00
var syncOp v1alpha1 . SyncOperation
var syncRes * v1alpha1 . SyncOperationResult
var syncResources [ ] v1alpha1 . SyncOperationResource
var source v1alpha1 . ApplicationSource
2019-03-04 08:56:36 +00:00
if state . Operation . Sync == nil {
2019-06-05 01:17:41 +00:00
state . Phase = v1alpha1 . OperationFailed
2018-07-07 07:54:06 +00:00
state . Message = "Invalid operation request: no operation specified"
return
}
2019-03-04 08:56:36 +00:00
syncOp = * state . Operation . Sync
if syncOp . Source == nil {
// normal sync case (where source is taken from app.spec.source)
source = app . Spec . Source
} else {
// rollback case
source = * state . Operation . Sync . Source
}
syncResources = syncOp . Resources
if state . SyncResult != nil {
syncRes = state . SyncResult
revision = state . SyncResult . Revision
} else {
2019-06-05 01:17:41 +00:00
syncRes = & v1alpha1 . SyncOperationResult { }
2019-03-04 08:56:36 +00:00
// status.operationState.syncResult.source. must be set properly since auto-sync relies
// on this information to decide if it should sync (if source is different than the last
// sync attempt)
syncRes . Source = source
state . SyncResult = syncRes
}
2018-07-07 07:54:06 +00:00
if revision == "" {
// if we get here, it means we did not remember a commit SHA which we should be syncing to.
// This typically indicates we are just about to begin a brand new sync/rollback operation.
// Take the value in the requested operation. We will resolve this to a SHA later.
revision = syncOp . Revision
}
2018-10-10 17:12:20 +00:00
2019-06-18 02:09:43 +00:00
compareResult , err := m . CompareAppState ( app , revision , source , false , syncOp . Manifests )
2018-07-07 07:54:06 +00:00
if err != nil {
2019-06-05 01:17:41 +00:00
state . Phase = v1alpha1 . OperationError
2018-07-07 07:54:06 +00:00
state . Message = err . Error ( )
return
}
2018-12-04 10:52:57 +00:00
// If there are any error conditions, do not perform the operation
2019-06-05 01:17:41 +00:00
errConditions := make ( [ ] v1alpha1 . ApplicationCondition , 0 )
2018-12-04 10:52:57 +00:00
for i := range compareResult . conditions {
if compareResult . conditions [ i ] . IsError ( ) {
errConditions = append ( errConditions , compareResult . conditions [ i ] )
2018-07-11 20:00:48 +00:00
}
}
2018-07-10 21:45:18 +00:00
if len ( errConditions ) > 0 {
2019-06-05 01:17:41 +00:00
state . Phase = v1alpha1 . OperationError
2018-07-10 21:45:18 +00:00
state . Message = argo . FormatAppConditions ( errConditions )
return
}
2018-12-04 10:52:57 +00:00
2019-03-04 08:56:36 +00:00
// We now have a concrete commit SHA. Save this in the sync result revision so that we remember
2018-07-07 07:54:06 +00:00
// what we should be syncing to when resuming operations.
2018-12-04 10:52:57 +00:00
syncRes . Revision = compareResult . syncStatus . Revision
2018-07-07 07:54:06 +00:00
2018-11-28 21:38:02 +00:00
clst , err := m . db . GetCluster ( context . Background ( ) , app . Spec . Destination . Server )
2018-07-07 07:54:06 +00:00
if err != nil {
2019-06-05 01:17:41 +00:00
state . Phase = v1alpha1 . OperationError
2018-07-07 07:54:06 +00:00
state . Message = err . Error ( )
return
}
2019-04-29 19:42:59 +00:00
restConfig := metrics . AddMetricsTransportWrapper ( m . metricsServer , app , clst . RESTConfig ( ) )
2018-10-29 05:46:13 +00:00
dynamicIf , err := dynamic . NewForConfig ( restConfig )
2018-07-07 07:54:06 +00:00
if err != nil {
2019-06-05 01:17:41 +00:00
state . Phase = v1alpha1 . OperationError
2018-07-07 07:54:06 +00:00
state . Message = fmt . Sprintf ( "Failed to initialize dynamic client: %v" , err )
return
}
2018-10-29 05:46:13 +00:00
disco , err := discovery . NewDiscoveryClientForConfig ( restConfig )
if err != nil {
2019-06-05 01:17:41 +00:00
state . Phase = v1alpha1 . OperationError
2018-10-29 05:46:13 +00:00
state . Message = fmt . Sprintf ( "Failed to initialize discovery client: %v" , err )
return
}
2018-07-07 07:54:06 +00:00
2019-07-24 22:00:40 +00:00
extensionsclientset , err := clientset . NewForConfig ( restConfig )
if err != nil {
state . Phase = v1alpha1 . OperationError
state . Message = fmt . Sprintf ( "Failed to initialize extensions client: %v" , err )
return
}
2019-06-05 01:17:41 +00:00
proj , err := argo . GetAppProject ( & app . Spec , listersv1alpha1 . NewAppProjectLister ( m . projInformer . GetIndexer ( ) ) , m . namespace )
2018-09-20 16:48:54 +00:00
if err != nil {
2019-06-05 01:17:41 +00:00
state . Phase = v1alpha1 . OperationError
2018-09-20 16:48:54 +00:00
state . Message = fmt . Sprintf ( "Failed to load application project: %v" , err )
return
}
2019-06-21 22:59:05 +00:00
resourceOverrides , err := m . settingsMgr . GetResourceOverrides ( )
if err != nil {
state . Phase = v1alpha1 . OperationError
state . Message = fmt . Sprintf ( "Failed to load resource overrides: %v" , err )
return
}
2018-07-07 07:54:06 +00:00
syncCtx := syncContext {
2019-07-24 22:00:40 +00:00
resourceOverrides : resourceOverrides ,
appName : app . Name ,
proj : proj ,
compareResult : compareResult ,
config : restConfig ,
dynamicIf : dynamicIf ,
disco : disco ,
extensionsclientset : extensionsclientset ,
kubectl : m . kubectl ,
namespace : app . Spec . Destination . Namespace ,
server : app . Spec . Destination . Server ,
syncOp : & syncOp ,
syncRes : syncRes ,
syncResources : syncResources ,
opState : state ,
log : log . WithFields ( log . Fields { "application" : app . Name } ) ,
2019-06-05 01:17:41 +00:00
}
if state . Phase == v1alpha1 . OperationTerminating {
2018-07-14 00:13:31 +00:00
syncCtx . terminate ( )
} else {
syncCtx . sync ( )
}
2019-06-05 01:17:41 +00:00
syncCtx . log . Info ( "sync/terminate complete" )
if ! syncOp . DryRun && ! syncCtx . isSelectiveSync ( ) && syncCtx . opState . Phase . Successful ( ) {
2019-03-04 08:56:36 +00:00
err := m . persistRevisionHistory ( app , compareResult . syncStatus . Revision , source )
2018-07-07 07:54:06 +00:00
if err != nil {
2019-06-05 01:17:41 +00:00
syncCtx . setOperationPhase ( v1alpha1 . OperationError , fmt . Sprintf ( "failed to record sync to history: %v" , err ) )
2018-07-07 07:54:06 +00:00
}
}
}
// sync has performs the actual apply or hook based sync
func ( sc * syncContext ) sync ( ) {
2019-06-05 01:17:41 +00:00
sc . log . WithFields ( log . Fields { "isSelectiveSync" : sc . isSelectiveSync ( ) , "skipHooks" : sc . skipHooks ( ) , "started" : sc . started ( ) } ) . Info ( "syncing" )
tasks , successful := sc . getSyncTasks ( )
2018-07-07 07:54:06 +00:00
if ! successful {
2019-06-05 01:17:41 +00:00
sc . setOperationPhase ( v1alpha1 . OperationFailed , "one or more synchronization tasks are not valid" )
2018-07-07 07:54:06 +00:00
return
}
2018-11-27 10:52:46 +00:00
2019-06-05 01:17:41 +00:00
sc . log . WithFields ( log . Fields { "tasks" : tasks , "isSelectiveSync" : sc . isSelectiveSync ( ) } ) . Info ( "tasks" )
2018-11-27 10:52:46 +00:00
2018-07-07 07:54:06 +00:00
// Perform a `kubectl apply --dry-run` against all the manifests. This will detect most (but
// not all) validation issues with the user's manifests (e.g. will detect syntax issues, but
// will not not detect if they are mutating immutable fields). If anything fails, we will refuse
2019-06-05 01:17:41 +00:00
// to perform the sync. we only wish to do this once per operation, performing additional dry-runs
// is harmless, but redundant. The indicator we use to detect if we have already performed
// the dry-run for this operation, is if the resource or hook list is empty.
if ! sc . started ( ) {
sc . log . Debug ( "dry-run" )
if ! sc . runTasks ( tasks , true ) {
sc . setOperationPhase ( v1alpha1 . OperationFailed , "one or more objects failed to apply (dry run)" )
2018-07-07 07:54:06 +00:00
return
}
}
2019-06-05 01:17:41 +00:00
// update status of any tasks that are running, note that this must exclude pruning tasks
for _ , task := range tasks . Filter ( func ( t * syncTask ) bool {
// just occasionally, you can be running yet not have a live resource
return t . running ( ) && t . liveObj != nil
} ) {
if task . isHook ( ) {
// update the hook's result
operationState , message := getOperationPhase ( task . liveObj )
sc . setResourceResult ( task , "" , operationState , message )
// maybe delete the hook
if enforceHookDeletePolicy ( task . liveObj , task . operationState ) {
err := sc . deleteResource ( task )
if err != nil {
sc . setResourceResult ( task , "" , v1alpha1 . OperationError , fmt . Sprintf ( "failed to delete resource: %v" , err ) )
}
}
} else {
// this must be calculated on the live object
healthStatus , err := health . GetResourceHealth ( task . liveObj , sc . resourceOverrides )
if err == nil {
log . WithFields ( log . Fields { "task" : task , "healthStatus" : healthStatus } ) . Debug ( "attempting to update health of running task" )
if healthStatus == nil {
// some objects (e.g. secret) do not have health, and they automatically success
sc . setResourceResult ( task , task . syncStatus , v1alpha1 . OperationSucceeded , task . message )
} else {
switch healthStatus . Status {
case v1alpha1 . HealthStatusHealthy :
sc . setResourceResult ( task , task . syncStatus , v1alpha1 . OperationSucceeded , healthStatus . Message )
case v1alpha1 . HealthStatusDegraded :
sc . setResourceResult ( task , task . syncStatus , v1alpha1 . OperationFailed , healthStatus . Message )
}
}
2018-07-12 02:12:30 +00:00
}
2018-07-07 07:54:06 +00:00
}
2019-06-05 01:17:41 +00:00
}
// any running tasks, lets wait...
2019-06-28 22:50:02 +00:00
if tasks . Any ( func ( t * syncTask ) bool { return t . running ( ) } ) {
2019-06-05 01:17:41 +00:00
sc . setOperationPhase ( v1alpha1 . OperationRunning , "one or more tasks are running" )
return
}
2019-06-28 22:50:02 +00:00
// syncFailTasks only run during failure, so separate them from regular tasks
syncFailTasks , tasks := tasks . Split ( func ( t * syncTask ) bool { return t . phase == v1alpha1 . SyncPhaseSyncFail } )
2019-06-05 01:17:41 +00:00
// if there are any completed but unsuccessful tasks, sync is a failure.
2019-06-28 22:50:02 +00:00
if tasks . Any ( func ( t * syncTask ) bool { return t . completed ( ) && ! t . successful ( ) } ) {
sc . setOperationFailed ( syncFailTasks , "one or more synchronization tasks completed unsuccessfully" )
2018-07-07 07:54:06 +00:00
return
}
2019-06-05 01:17:41 +00:00
sc . log . WithFields ( log . Fields { "tasks" : tasks } ) . Debug ( "filtering out completed tasks" )
// remove tasks that are completed, we can assume that there are no running tasks
tasks = tasks . Filter ( func ( t * syncTask ) bool { return ! t . completed ( ) } )
// If no sync tasks were generated (e.g., in case all application manifests have been removed),
// the sync operation is successful.
if len ( tasks ) == 0 {
2019-06-11 22:47:19 +00:00
sc . setOperationPhase ( v1alpha1 . OperationSucceeded , "successfully synced (no more tasks)" )
2019-06-05 01:17:41 +00:00
return
}
// remove any tasks not in this wave
phase := tasks . phase ( )
wave := tasks . wave ( )
2019-06-11 22:47:19 +00:00
// if it is the last phase/wave and the only remaining tasks are non-hooks, the we are successful
// EVEN if those objects subsequently degraded
// This handles the common case where neither hooks or waves are used and a sync equates to simply an (asynchronous) kubectl apply of manifests, which succeeds immediately.
2019-06-28 22:50:02 +00:00
complete := ! tasks . Any ( func ( t * syncTask ) bool { return t . phase != phase || wave != t . wave ( ) || t . isHook ( ) } )
2019-06-11 22:47:19 +00:00
2019-06-28 22:50:02 +00:00
sc . log . WithFields ( log . Fields { "phase" : phase , "wave" : wave , "tasks" : tasks , "syncFailTasks" : syncFailTasks } ) . Debug ( "filtering tasks in correct phase and wave" )
2019-06-05 01:17:41 +00:00
tasks = tasks . Filter ( func ( t * syncTask ) bool { return t . phase == phase && t . wave ( ) == wave } )
sc . setOperationPhase ( v1alpha1 . OperationRunning , "one or more tasks are running" )
sc . log . WithFields ( log . Fields { "tasks" : tasks } ) . Debug ( "wet-run" )
if ! sc . runTasks ( tasks , false ) {
2019-06-28 22:50:02 +00:00
sc . setOperationFailed ( syncFailTasks , "one or more objects failed to apply" )
2019-06-11 22:47:19 +00:00
} else {
if complete {
sc . setOperationPhase ( v1alpha1 . OperationSucceeded , "successfully synced (all tasks run)" )
}
2019-06-05 01:17:41 +00:00
}
2018-07-07 07:54:06 +00:00
}
2019-06-28 22:50:02 +00:00
// setOperationFailed marks the operation as failed with message, first giving
// any outstanding sync-fail tasks a chance to run.
//
// When there are no sync-fail tasks, or all of them have already completed, the
// phase is set to Failed immediately. Otherwise the sync-fail tasks are started
// and the phase is deliberately left untouched so that at least one more sync
// pass happens; only if starting them fails is the phase set to Failed here.
func (sc *syncContext) setOperationFailed(syncFailTasks syncTasks, message string) {
	isCompleted := func(task *syncTask) bool { return task.completed() }
	if len(syncFailTasks) == 0 || syncFailTasks.All(isCompleted) {
		sc.setOperationPhase(v1alpha1.OperationFailed, message)
		return
	}

	sc.log.WithFields(log.Fields{"syncFailTasks": syncFailTasks}).Debug("running sync fail tasks")
	if !sc.runTasks(syncFailTasks, false) {
		sc.setOperationPhase(v1alpha1.OperationFailed, message)
	}
}
2019-06-05 01:17:41 +00:00
// started reports whether the operation has already recorded at least one
// resource result.
func (sc *syncContext) started() bool {
	return len(sc.syncRes.Resources) != 0
}
// isSelectiveSync reports whether the operation targets only a subset of the
// application's resources. It returns false when no resources were selected, or
// when the selection names exactly the application's non-hook resources
// (compared by group, kind and name).
func (sc *syncContext) isSelectiveSync() bool {
	// no explicit selection at all
	if sc.syncResources == nil {
		return false
	}

	// render both lists as sorted "group:kind:name" keys so they can be compared
	var appKeys []string
	for _, res := range sc.compareResult.resources {
		if res.Hook {
			continue
		}
		appKeys = append(appKeys, fmt.Sprintf("%s:%s:%s", res.Group, res.Kind, res.Name))
	}
	sort.Strings(appKeys)

	var selectedKeys []string
	for _, res := range sc.syncResources {
		selectedKeys = append(selectedKeys, fmt.Sprintf("%s:%s:%s", res.Group, res.Kind, res.Name))
	}
	sort.Strings(selectedKeys)

	return !reflect.DeepEqual(appKeys, selectedKeys)
}
// skipHooks reports whether hook tasks are left out of this sync, which
// essentially enforces the old "apply" behaviour. Hooks are skipped when the
// operation uses the apply strategy or when it is a selective sync.
func (sc *syncContext) skipHooks() bool {
	if sc.syncOp.IsApplyStrategy() {
		return true
	}
	return sc.isSelectiveSync()
}
// containsResource reports whether resourceState takes part in this sync. Every
// resource does unless the sync is selective, in which case either its live or
// its target object must be among the selected sync resources.
func (sc *syncContext) containsResource(resourceState managedResource) bool {
	if !sc.isSelectiveSync() {
		return true
	}
	if live := resourceState.Live; live != nil &&
		argo.ContainsSyncResource(live.GetName(), live.GroupVersionKind(), sc.syncResources) {
		return true
	}
	target := resourceState.Target
	return target != nil &&
		argo.ContainsSyncResource(target.GetName(), target.GroupVersionKind(), sc.syncResources)
}
// generates the list of sync tasks we will be performing during this sync.
func ( sc * syncContext ) getSyncTasks ( ) ( _ syncTasks , successful bool ) {
resourceTasks := syncTasks { }
successful = true
for _ , resource := range sc . compareResult . managedResources {
if ! sc . containsResource ( resource ) {
log . WithFields ( log . Fields { "group" : resource . Group , "kind" : resource . Kind , "name" : resource . Name } ) .
Debug ( "skipping" )
2018-12-03 18:27:43 +00:00
continue
}
2018-12-19 19:43:21 +00:00
2019-06-05 01:17:41 +00:00
obj := obj ( resource . Target , resource . Live )
2018-11-29 23:34:46 +00:00
2019-06-05 01:17:41 +00:00
// this creates garbage tasks
if hook . IsHook ( obj ) {
log . WithFields ( log . Fields { "group" : obj . GroupVersionKind ( ) . Group , "kind" : obj . GetKind ( ) , "namespace" : obj . GetNamespace ( ) , "name" : obj . GetName ( ) } ) .
Debug ( "skipping hook" )
continue
}
for _ , phase := range syncPhases ( obj ) {
resourceTasks = append ( resourceTasks , & syncTask { phase : phase , targetObj : resource . Target , liveObj : resource . Live } )
}
}
sc . log . WithFields ( log . Fields { "resourceTasks" : resourceTasks } ) . Debug ( "tasks from managed resources" )
hookTasks := syncTasks { }
if ! sc . skipHooks ( ) {
for _ , obj := range sc . compareResult . hooks {
for _ , phase := range syncPhases ( obj ) {
// Hook resources names are deterministic, whether they are defined by the user (metadata.name),
// or formulated at the time of the operation (metadata.generateName). If user specifies
// metadata.generateName, then we will generate a formulated metadata.name before submission.
targetObj := obj . DeepCopy ( )
if targetObj . GetName ( ) == "" {
postfix := strings . ToLower ( fmt . Sprintf ( "%s-%s-%d" , sc . syncRes . Revision [ 0 : 7 ] , phase , sc . opState . StartedAt . UTC ( ) . Unix ( ) ) )
generateName := obj . GetGenerateName ( )
targetObj . SetName ( fmt . Sprintf ( "%s%s" , generateName , postfix ) )
2018-11-29 23:34:46 +00:00
}
2019-06-05 01:17:41 +00:00
hookTasks = append ( hookTasks , & syncTask { phase : phase , targetObj : targetObj } )
2018-11-29 23:34:46 +00:00
}
2018-07-07 07:54:06 +00:00
}
}
2018-11-27 10:52:46 +00:00
2019-06-05 01:17:41 +00:00
sc . log . WithFields ( log . Fields { "hookTasks" : hookTasks } ) . Debug ( "tasks from hooks" )
2018-07-07 07:54:06 +00:00
2019-06-05 01:17:41 +00:00
tasks := resourceTasks
tasks = append ( tasks , hookTasks ... )
2018-07-12 02:12:30 +00:00
2019-06-05 01:17:41 +00:00
// enrich target objects with the namespace
for _ , task := range tasks {
if task . targetObj == nil {
continue
2018-12-04 01:39:55 +00:00
}
2019-06-05 01:17:41 +00:00
if task . targetObj . GetNamespace ( ) == "" {
// If target object's namespace is empty, we set namespace in the object. We do
// this even though it might be a cluster-scoped resource. This prevents any
// possibility of the resource from unintentionally becoming created in the
// namespace during the `kubectl apply`
task . targetObj = task . targetObj . DeepCopy ( )
task . targetObj . SetNamespace ( sc . namespace )
2018-07-12 02:12:30 +00:00
}
}
2019-06-05 01:17:41 +00:00
// enrich task with live obj
for _ , task := range tasks {
if task . targetObj == nil || task . liveObj != nil {
continue
}
task . liveObj = sc . liveObj ( task . targetObj )
}
// enrich tasks with the result
for _ , task := range tasks {
_ , result := sc . syncRes . Resources . Find ( task . group ( ) , task . kind ( ) , task . namespace ( ) , task . name ( ) , task . phase )
if result != nil {
task . syncStatus = result . Status
task . operationState = result . HookPhase
task . message = result . Message
}
}
// check permissions
for _ , task := range tasks {
serverRes , err := kube . ServerResourceForGroupVersionKind ( sc . disco , task . groupVersionKind ( ) )
if err != nil {
// Special case for custom resources: if CRD is not yet known by the K8s API server,
// skip verification during `kubectl apply --dry-run` since we expect the CRD
// to be created during app synchronization.
if apierr . IsNotFound ( err ) && sc . hasCRDOfGroupKind ( task . group ( ) , task . kind ( ) ) {
sc . log . WithFields ( log . Fields { "task" : task } ) . Debug ( "skip dry-run for custom resource" )
task . skipDryRun = true
} else {
sc . setResourceResult ( task , v1alpha1 . ResultCodeSyncFailed , "" , err . Error ( ) )
successful = false
}
} else {
if ! sc . proj . IsResourcePermitted ( metav1 . GroupKind { Group : task . group ( ) , Kind : task . kind ( ) } , serverRes . Namespaced ) {
sc . setResourceResult ( task , v1alpha1 . ResultCodeSyncFailed , "" , fmt . Sprintf ( "Resource %s:%s is not permitted in project %s." , task . group ( ) , task . kind ( ) , sc . proj . Name ) )
successful = false
}
if serverRes . Namespaced && ! sc . proj . IsDestinationPermitted ( v1alpha1 . ApplicationDestination { Namespace : task . namespace ( ) , Server : sc . server } ) {
sc . setResourceResult ( task , v1alpha1 . ResultCodeSyncFailed , "" , fmt . Sprintf ( "namespace %v is not permitted in project '%s'" , task . namespace ( ) , sc . proj . Name ) )
successful = false
}
}
}
sort . Sort ( tasks )
return tasks , successful
2018-07-12 02:12:30 +00:00
}
2019-06-05 01:17:41 +00:00
// obj returns a when it is non-nil and falls back to b otherwise.
func obj(a, b *unstructured.Unstructured) *unstructured.Unstructured {
	if a == nil {
		return b
	}
	return a
}
func ( sc * syncContext ) liveObj ( obj * unstructured . Unstructured ) * unstructured . Unstructured {
for _ , resource := range sc . compareResult . managedResources {
if resource . Group == obj . GroupVersionKind ( ) . Group &&
resource . Kind == obj . GetKind ( ) &&
2019-06-21 04:26:19 +00:00
// cluster scoped objects will not have a namespace, even if the user has defined it
( resource . Namespace == "" || resource . Namespace == obj . GetNamespace ( ) ) &&
2019-06-05 01:17:41 +00:00
resource . Name == obj . GetName ( ) {
return resource . Live
2018-07-12 02:12:30 +00:00
}
}
2019-06-05 01:17:41 +00:00
return nil
2018-07-12 02:12:30 +00:00
}
2019-06-05 01:17:41 +00:00
func ( sc * syncContext ) setOperationPhase ( phase v1alpha1 . OperationPhase , message string ) {
2018-07-12 02:12:30 +00:00
if sc . opState . Phase != phase || sc . opState . Message != message {
sc . log . Infof ( "Updating operation state. phase: %s -> %s, message: '%s' -> '%s'" , sc . opState . Phase , phase , sc . opState . Message , message )
}
2018-07-07 07:54:06 +00:00
sc . opState . Phase = phase
sc . opState . Message = message
}
2019-07-24 22:00:40 +00:00
// ensureCRDReady waits until the named CRD is ready, i.e. its Established
// condition is true. The method is best effort: it polls every 100ms for at most
// crdReadinessTimeout and does not fail even if the CRD is not ready within the
// timeout or cannot be fetched.
func (sc *syncContext) ensureCRDReady(name string) {
	const pollInterval = 100 * time.Millisecond

	// established reports whether the CRD currently carries a true Established condition;
	// a failed lookup ends the polling.
	established := func() (bool, error) {
		crd, err := sc.extensionsclientset.ApiextensionsV1beta1().CustomResourceDefinitions().Get(name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		for _, condition := range crd.Status.Conditions {
			if condition.Type != v1beta1.Established {
				continue
			}
			return condition.Status == v1beta1.ConditionTrue, nil
		}
		return false, nil
	}

	// the poll result is deliberately ignored (best effort)
	_ = wait.PollImmediate(pollInterval, crdReadinessTimeout, established)
}
2018-07-07 07:54:06 +00:00
// applyObject performs a `kubectl apply` of a single resource
2019-06-05 01:17:41 +00:00
func ( sc * syncContext ) applyObject ( targetObj * unstructured . Unstructured , dryRun bool , force bool ) ( v1alpha1 . ResultCode , string ) {
2019-06-18 21:07:26 +00:00
validate := ! resource . HasAnnotationOption ( targetObj , common . AnnotationSyncOptions , "Validate=false" )
message , err := sc . kubectl . ApplyResource ( sc . config , targetObj , targetObj . GetNamespace ( ) , dryRun , force , validate )
2018-07-07 07:54:06 +00:00
if err != nil {
2019-06-05 01:17:41 +00:00
return v1alpha1 . ResultCodeSyncFailed , err . Error ( )
2018-07-07 07:54:06 +00:00
}
2019-07-24 22:00:40 +00:00
if kube . IsCRD ( targetObj ) && ! dryRun {
sc . ensureCRDReady ( targetObj . GetName ( ) )
}
2019-06-05 01:17:41 +00:00
return v1alpha1 . ResultCodeSynced , message
2018-07-07 07:54:06 +00:00
}
// pruneObject deletes the object if both prune is true and dryRun is false. Otherwise appropriate message
2019-06-05 01:17:41 +00:00
func ( sc * syncContext ) pruneObject ( liveObj * unstructured . Unstructured , prune , dryRun bool ) ( v1alpha1 . ResultCode , string ) {
2019-06-07 15:46:11 +00:00
if ! prune {
return v1alpha1 . ResultCodePruneSkipped , "ignored (requires pruning)"
} else if resource . HasAnnotationOption ( liveObj , common . AnnotationSyncOptions , "Prune=false" ) {
return v1alpha1 . ResultCodePruneSkipped , "ignored (no prune)"
} else {
2018-07-07 07:54:06 +00:00
if dryRun {
2019-06-05 01:17:41 +00:00
return v1alpha1 . ResultCodePruned , "pruned (dry run)"
2018-07-07 07:54:06 +00:00
} else {
2019-02-13 18:11:01 +00:00
// Skip deletion if object is already marked for deletion, so we don't cause a resource update hotloop
deletionTimestamp := liveObj . GetDeletionTimestamp ( )
if deletionTimestamp == nil || deletionTimestamp . IsZero ( ) {
err := sc . kubectl . DeleteResource ( sc . config , liveObj . GroupVersionKind ( ) , liveObj . GetName ( ) , liveObj . GetNamespace ( ) , false )
if err != nil {
2019-06-05 01:17:41 +00:00
return v1alpha1 . ResultCodeSyncFailed , err . Error ( )
2019-02-13 18:11:01 +00:00
}
2018-07-07 07:54:06 +00:00
}
2019-06-05 01:17:41 +00:00
return v1alpha1 . ResultCodePruned , "pruned"
2018-07-07 07:54:06 +00:00
}
}
}
2019-06-05 01:17:41 +00:00
func ( sc * syncContext ) hasCRDOfGroupKind ( group string , kind string ) bool {
for _ , res := range sc . compareResult . managedResources {
2018-11-29 23:34:46 +00:00
if res . Target != nil && kube . IsCRD ( res . Target ) {
crdGroup , ok , err := unstructured . NestedString ( res . Target . Object , "spec" , "group" )
2018-09-20 16:48:54 +00:00
if err != nil || ! ok {
continue
}
2018-11-29 23:34:46 +00:00
crdKind , ok , err := unstructured . NestedString ( res . Target . Object , "spec" , "names" , "kind" )
2018-09-20 16:48:54 +00:00
if err != nil || ! ok {
continue
}
if group == crdGroup && crdKind == kind {
return true
}
}
}
return false
}
2019-06-05 01:17:41 +00:00
// terminate looks for any running jobs/workflow hooks and deletes the resource.
//
// Only hook tasks that are currently running and whose kind is runnable are
// deleted; the outcome of each deletion is recorded as that task's result. The
// operation ends in the Failed phase ("Operation terminated") when every
// deletion succeeded, and in the Error phase when at least one did not.
func (sc *syncContext) terminate() {
	terminateSuccessful := true
	sc.log.Debug("terminating")
	tasks, _ := sc.getSyncTasks()
	for _, task := range tasks {
		// Hooks that already completed have nothing left to stop, and hooks that have not
		// been started yet have nothing to delete; only running hooks are terminated.
		if !task.isHook() || !task.running() {
			continue
		}
		if !isRunnable(task.groupVersionKind()) {
			continue
		}
		if err := sc.deleteResource(task); err != nil {
			sc.setResourceResult(task, "", v1alpha1.OperationFailed, fmt.Sprintf("Failed to delete: %v", err))
			terminateSuccessful = false
			continue
		}
		sc.setResourceResult(task, "", v1alpha1.OperationSucceeded, "Deleted")
	}
	if terminateSuccessful {
		sc.setOperationPhase(v1alpha1.OperationFailed, "Operation terminated")
	} else {
		sc.setOperationPhase(v1alpha1.OperationError, "Operation termination had errors")
	}
}
// deleteResource deletes the task's resource through the dynamic client, using
// foreground deletion propagation. It returns the error of the API resource
// lookup or of the delete call, if any.
func (sc *syncContext) deleteResource(task *syncTask) error {
	sc.log.WithFields(log.Fields{"task": task}).Debug("deleting task")

	gvk := task.groupVersionKind()
	apiResource, err := kube.ServerResourceForGroupVersionKind(sc.disco, gvk)
	if err != nil {
		return err
	}

	gvr := kube.ToGroupVersionResource(gvk.GroupVersion().String(), apiResource)
	resIf := kube.ToResourceInterface(sc.dynamicIf, apiResource, gvr, task.namespace())

	foreground := metav1.DeletePropagationForeground
	return resIf.Delete(task.name(), &metav1.DeleteOptions{PropagationPolicy: &foreground})
}
// operationPhases maps the result code of an apply or prune attempt to the
// operation phase that runTasks records for the resource. A synced resource is
// recorded as Running, a failed one as Failed, and pruned or prune-skipped
// resources as Succeeded.
var operationPhases = map[v1alpha1.ResultCode]v1alpha1.OperationPhase{
	// apply outcomes
	v1alpha1.ResultCodeSynced:     v1alpha1.OperationRunning,
	v1alpha1.ResultCodeSyncFailed: v1alpha1.OperationFailed,

	// prune outcomes
	v1alpha1.ResultCodePruned:       v1alpha1.OperationSucceeded,
	v1alpha1.ResultCodePruneSkipped: v1alpha1.OperationSucceeded,
}
func ( sc * syncContext ) runTasks ( tasks syncTasks , dryRun bool ) bool {
dryRun = dryRun || sc . syncOp . DryRun
sc . log . WithFields ( log . Fields { "numTasks" : len ( tasks ) , "dryRun" : dryRun } ) . Debug ( "running tasks" )
successful := true
var createTasks syncTasks
var pruneTasks syncTasks
for _ , task := range tasks {
if task . isPrune ( ) {
pruneTasks = append ( pruneTasks , task )
2018-09-10 17:14:14 +00:00
} else {
2019-06-05 01:17:41 +00:00
createTasks = append ( createTasks , task )
2018-09-10 17:14:14 +00:00
}
}
2018-07-07 07:54:06 +00:00
var wg sync . WaitGroup
2018-09-10 17:14:14 +00:00
for _ , task := range pruneTasks {
2018-07-13 17:03:56 +00:00
wg . Add ( 1 )
2019-06-05 01:17:41 +00:00
go func ( t * syncTask ) {
2018-07-07 07:54:06 +00:00
defer wg . Done ( )
2019-06-05 01:17:41 +00:00
sc . log . WithFields ( log . Fields { "dryRun" : dryRun , "task" : t } ) . Debug ( "pruning" )
result , message := sc . pruneObject ( t . liveObj , sc . syncOp . Prune , dryRun )
if result == v1alpha1 . ResultCodeSyncFailed {
successful = false
2018-07-07 07:54:06 +00:00
}
2019-06-05 01:17:41 +00:00
if ! dryRun || result == v1alpha1 . ResultCodeSyncFailed {
sc . setResourceResult ( t , result , operationPhases [ result ] , message )
2018-09-10 17:14:14 +00:00
}
} ( task )
}
2018-09-20 16:48:54 +00:00
wg . Wait ( )
2019-06-05 01:17:41 +00:00
processCreateTasks := func ( tasks syncTasks ) {
2018-09-20 16:48:54 +00:00
var createWg sync . WaitGroup
2019-06-05 01:17:41 +00:00
for _ , task := range tasks {
if dryRun && task . skipDryRun {
2018-11-29 23:34:46 +00:00
continue
}
2018-09-20 16:48:54 +00:00
createWg . Add ( 1 )
2019-06-05 01:17:41 +00:00
go func ( t * syncTask ) {
2018-09-20 16:48:54 +00:00
defer createWg . Done ( )
2019-06-05 01:17:41 +00:00
sc . log . WithFields ( log . Fields { "dryRun" : dryRun , "task" : t } ) . Debug ( "applying" )
result , message := sc . applyObject ( t . targetObj , dryRun , sc . syncOp . SyncStrategy . Force ( ) )
if result == v1alpha1 . ResultCodeSyncFailed {
successful = false
2018-09-20 16:48:54 +00:00
}
2019-06-05 01:17:41 +00:00
if ! dryRun || result == v1alpha1 . ResultCodeSyncFailed {
sc . setResourceResult ( t , result , operationPhases [ result ] , message )
2018-09-20 16:48:54 +00:00
}
2019-06-05 01:17:41 +00:00
} ( task )
2018-09-20 16:48:54 +00:00
}
createWg . Wait ( )
}
2019-06-05 01:17:41 +00:00
var tasksGroup syncTasks
2018-09-20 16:48:54 +00:00
for _ , task := range createTasks {
//Only wait if the type of the next task is different than the previous type
2019-06-05 01:17:41 +00:00
if len ( tasksGroup ) > 0 && tasksGroup [ 0 ] . targetObj . GetKind ( ) != task . kind ( ) {
2018-11-29 23:34:46 +00:00
processCreateTasks ( tasksGroup )
2019-06-05 01:17:41 +00:00
tasksGroup = syncTasks { task }
2018-09-20 16:48:54 +00:00
} else {
tasksGroup = append ( tasksGroup , task )
}
}
if len ( tasksGroup ) > 0 {
2018-11-29 23:34:46 +00:00
processCreateTasks ( tasksGroup )
2018-07-07 07:54:06 +00:00
}
2019-06-05 01:17:41 +00:00
return successful
2018-09-10 17:14:14 +00:00
}
2019-06-05 01:17:41 +00:00
// setResourceResult sets a resource details in the SyncResult.Resources list
func ( sc * syncContext ) setResourceResult ( task * syncTask , syncStatus v1alpha1 . ResultCode , operationState v1alpha1 . OperationPhase , message string ) {
2018-09-10 17:14:14 +00:00
2019-06-05 01:17:41 +00:00
task . syncStatus = syncStatus
task . operationState = operationState
// we always want to keep the latest message
if message != "" {
task . message = message
2018-09-10 17:14:14 +00:00
}
2019-01-18 15:32:50 +00:00
2019-06-05 01:17:41 +00:00
sc . lock . Lock ( )
defer sc . lock . Unlock ( )
i , existing := sc . syncRes . Resources . Find ( task . group ( ) , task . kind ( ) , task . namespace ( ) , task . name ( ) , task . phase )
res := v1alpha1 . ResourceResult {
Group : task . group ( ) ,
Version : task . version ( ) ,
Kind : task . kind ( ) ,
Namespace : task . namespace ( ) ,
Name : task . name ( ) ,
Status : task . syncStatus ,
Message : task . message ,
HookType : task . hookType ( ) ,
HookPhase : task . operationState ,
SyncPhase : task . phase ,
}
logCtx := sc . log . WithFields ( log . Fields { "namespace" : task . namespace ( ) , "kind" : task . kind ( ) , "name" : task . name ( ) , "phase" : task . phase } )
if existing != nil {
// update existing value
if res . Status != existing . Status || res . HookPhase != existing . HookPhase || res . Message != existing . Message {
logCtx . Infof ( "updating resource result, status: '%s' -> '%s', phase '%s' -> '%s', message '%s' -> '%s'" ,
existing . Status , res . Status ,
existing . HookPhase , res . HookPhase ,
existing . Message , res . Message )
}
sc . syncRes . Resources [ i ] = & res
} else {
logCtx . Infof ( "adding resource result, status: '%s', phase: '%s', message: '%s'" , res . Status , res . HookPhase , res . Message )
sc . syncRes . Resources = append ( sc . syncRes . Resources , & res )
2019-01-18 15:32:50 +00:00
}
2018-09-10 17:14:14 +00:00
}