argo-cd/cmd/argocd-application-controller/commands/argocd_application_controller.go
jannfis 068048cb80
feat: Applications in any namespace (#9755)
* feat: Applications in any namespace

Signed-off-by: jannfis <jann@mistrust.net>

* Fix typo in CI

Signed-off-by: jannfis <jann@mistrust.net>

* Create argocd-e2e-external namespace

Signed-off-by: jannfis <jann@mistrust.net>

* Update from codegen

Signed-off-by: jannfis <jann@mistrust.net>

* Remove debug code

Signed-off-by: jannfis <jann@mistrust.net>

* Update help text for -N option to app create

Signed-off-by: jannfis <jann@mistrust.net>

* Wrap error when retrieving AppProject from cache

Signed-off-by: jannfis <jann@mistrust.net>

* Check for controller namespace first before matching on additional ns

Signed-off-by: jannfis <jann@mistrust.net>

* Improve TestAppProjectIsSourceNamespacePermitted unit test

Signed-off-by: jannfis <jann@mistrust.net>

* Get rid of some debug leftovers

Signed-off-by: jannfis <jann@mistrust.net>

* Better error wrapping; return IsNotFound as-is

Signed-off-by: jannfis <jann@mistrust.net>

* Updates from codegen

Signed-off-by: jannfis <jann@mistrust.net>

* We don't need AppShortName() anymore

Signed-off-by: jannfis <jann@mistrust.net>

* Update end-to-end tests to use annotation methods

Signed-off-by: jannfis <jann@mistrust.net>

* Add e2e tests to test for app creation in not permitted ns

Signed-off-by: jannfis <jann@mistrust.net>

* Remove deprecated code

Signed-off-by: jannfis <jann@mistrust.net>

* Remove dead code

Signed-off-by: jannfis <jann@mistrust.net>

* Add RBACName() method to application type

Signed-off-by: jannfis <jann@mistrust.net>

* Update from codegen

Signed-off-by: jannfis <jann@mistrust.net>

* Fix e2e test

Signed-off-by: jannfis <jann@mistrust.net>

* Update codegen

Signed-off-by: jannfis <jann@mistrust.net>

* Move RBAC name generation to an application receiver

Signed-off-by: jannfis <jann@mistrust.net>

* Fix sync window status in UI

Signed-off-by: jannfis <jann@mistrust.net>

* Fix pod logs viewer

Signed-off-by: jannfis <jann@mistrust.net>

* Fix application events in UI

Signed-off-by: jannfis <jann@mistrust.net>

* Fix application search in UI

Signed-off-by: jannfis <jann@mistrust.net>

* Fix yarn lint

Signed-off-by: jannfis <jann@mistrust.net>

* Only set up cluster-wide application informer when additional namespaces are specified

Signed-off-by: jannfis <jann@mistrust.net>

* Adapt e2e test to a changed error message

Signed-off-by: jannfis <jann@mistrust.net>

* Application namespace should be taken into account for create

Signed-off-by: jannfis <jann@mistrust.net>

* Use non-qualified application name as Helm release name

Signed-off-by: jannfis <jann@mistrust.net>

* Support --app-namespace in e2e tests

Signed-off-by: jannfis <jann@mistrust.net>

* Enable more e2e tests

Signed-off-by: jannfis <jann@mistrust.net>

* Increase e2e timeout for newly added tests

Signed-off-by: jannfis <jann@mistrust.net>
2022-08-10 11:39:10 +02:00

217 lines
10 KiB
Go

package commands
import (
"context"
"fmt"
"math"
"time"
"github.com/argoproj/pkg/stats"
"github.com/go-redis/redis/v8"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
cmdutil "github.com/argoproj/argo-cd/v2/cmd/util"
"github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/controller"
"github.com/argoproj/argo-cd/v2/controller/sharding"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"
"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
cacheutil "github.com/argoproj/argo-cd/v2/util/cache"
appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
"github.com/argoproj/argo-cd/v2/util/cli"
"github.com/argoproj/argo-cd/v2/util/env"
"github.com/argoproj/argo-cd/v2/util/errors"
kubeutil "github.com/argoproj/argo-cd/v2/util/kube"
"github.com/argoproj/argo-cd/v2/util/settings"
"github.com/argoproj/argo-cd/v2/util/tls"
"github.com/argoproj/argo-cd/v2/util/trace"
)
const (
// CLIName is the name of the CLI
cliName = "argocd-application-controller"
// Default time in seconds for application resync period
defaultAppResyncPeriod = 180
// Default time in seconds for application hard resync period
defaultAppHardResyncPeriod = 0
)
func NewCommand() *cobra.Command {
var (
clientConfig clientcmd.ClientConfig
appResyncPeriod int64
appHardResyncPeriod int64
repoServerAddress string
repoServerTimeoutSeconds int
selfHealTimeoutSeconds int
statusProcessors int
operationProcessors int
glogLevel int
metricsPort int
metricsCacheExpiration time.Duration
metricsAplicationLabels []string
kubectlParallelismLimit int64
cacheSrc func() (*appstatecache.Cache, error)
redisClient *redis.Client
repoServerPlaintext bool
repoServerStrictTLS bool
otlpAddress string
applicationNamespaces []string
)
var command = cobra.Command{
Use: cliName,
Short: "Run ArgoCD Application Controller",
Long: "ArgoCD application controller is a Kubernetes controller that continuously monitors running applications and compares the current, live state against the desired target state (as specified in the repo). This command runs Application Controller in the foreground. It can be configured by following options.",
DisableAutoGenTag: true,
RunE: func(c *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(c.Context())
defer cancel()
vers := common.GetVersion()
namespace, _, err := clientConfig.Namespace()
errors.CheckError(err)
vers.LogStartupInfo(
"ArgoCD Application Controller",
map[string]any{
"namespace": namespace,
},
)
cli.SetLogFormat(cmdutil.LogFormat)
cli.SetLogLevel(cmdutil.LogLevel)
cli.SetGLogLevel(glogLevel)
config, err := clientConfig.ClientConfig()
errors.CheckError(err)
errors.CheckError(v1alpha1.SetK8SConfigDefaults(config))
config.UserAgent = fmt.Sprintf("argocd-application-controller/%s (%s)", vers.Version, vers.Platform)
kubeClient := kubernetes.NewForConfigOrDie(config)
appClient := appclientset.NewForConfigOrDie(config)
hardResyncDuration := time.Duration(appHardResyncPeriod) * time.Second
var resyncDuration time.Duration
if appResyncPeriod == 0 {
// Re-sync should be disabled if period is 0. Set duration to a very long duration
resyncDuration = time.Hour * 24 * 365 * 100
} else {
resyncDuration = time.Duration(appResyncPeriod) * time.Second
}
tlsConfig := apiclient.TLSConfiguration{
DisableTLS: repoServerPlaintext,
StrictValidation: repoServerStrictTLS,
}
// Load CA information to use for validating connections to the
// repository server, if strict TLS validation was requested.
if !repoServerPlaintext && repoServerStrictTLS {
pool, err := tls.LoadX509CertPool(
fmt.Sprintf("%s/controller/tls/tls.crt", env.StringFromEnv(common.EnvAppConfigPath, common.DefaultAppConfigPath)),
fmt.Sprintf("%s/controller/tls/ca.crt", env.StringFromEnv(common.EnvAppConfigPath, common.DefaultAppConfigPath)),
)
if err != nil {
log.Fatalf("%v", err)
}
tlsConfig.Certificates = pool
}
repoClientset := apiclient.NewRepoServerClientset(repoServerAddress, repoServerTimeoutSeconds, tlsConfig)
cache, err := cacheSrc()
errors.CheckError(err)
cache.Cache.SetClient(cacheutil.NewTwoLevelClient(cache.Cache.GetClient(), 10*time.Minute))
var appController *controller.ApplicationController
settingsMgr := settings.NewSettingsManager(ctx, kubeClient, namespace, settings.WithRepoOrClusterChangedHandler(func() {
appController.InvalidateProjectsCache()
}))
kubectl := kubeutil.NewKubectl()
clusterFilter := getClusterFilter()
appController, err = controller.NewApplicationController(
namespace,
settingsMgr,
kubeClient,
appClient,
repoClientset,
cache,
kubectl,
resyncDuration,
hardResyncDuration,
time.Duration(selfHealTimeoutSeconds)*time.Second,
metricsPort,
metricsCacheExpiration,
metricsAplicationLabels,
kubectlParallelismLimit,
clusterFilter,
applicationNamespaces)
errors.CheckError(err)
cacheutil.CollectMetrics(redisClient, appController.GetMetricsServer())
stats.RegisterStackDumper()
stats.StartStatsTicker(10 * time.Minute)
stats.RegisterHeapDumper("memprofile")
if otlpAddress != "" {
closeTracer, err := trace.InitTracer(ctx, "argocd-controller", otlpAddress)
if err != nil {
log.Fatalf("failed to initialize tracing: %v", err)
}
defer closeTracer()
}
go appController.Run(ctx, statusProcessors, operationProcessors)
// Wait forever
select {}
},
}
clientConfig = cli.AddKubectlFlagsToCmd(&command)
command.Flags().Int64Var(&appResyncPeriod, "app-resync", int64(env.ParseDurationFromEnv("ARGOCD_RECONCILIATION_TIMEOUT", defaultAppResyncPeriod*time.Second, 0, math.MaxInt64).Seconds()), "Time period in seconds for application resync.")
command.Flags().Int64Var(&appHardResyncPeriod, "app-hard-resync", int64(env.ParseDurationFromEnv("ARGOCD_HARD_RECONCILIATION_TIMEOUT", defaultAppHardResyncPeriod*time.Second, 0, math.MaxInt64).Seconds()), "Time period in seconds for application hard resync.")
command.Flags().StringVar(&repoServerAddress, "repo-server", env.StringFromEnv("ARGOCD_APPLICATION_CONTROLLER_REPO_SERVER", common.DefaultRepoServerAddr), "Repo server address.")
command.Flags().IntVar(&repoServerTimeoutSeconds, "repo-server-timeout-seconds", env.ParseNumFromEnv("ARGOCD_APPLICATION_CONTROLLER_REPO_SERVER_TIMEOUT_SECONDS", 60, 0, math.MaxInt64), "Repo server RPC call timeout seconds.")
command.Flags().IntVar(&statusProcessors, "status-processors", env.ParseNumFromEnv("ARGOCD_APPLICATION_CONTROLLER_STATUS_PROCESSORS", 20, 0, math.MaxInt32), "Number of application status processors")
command.Flags().IntVar(&operationProcessors, "operation-processors", env.ParseNumFromEnv("ARGOCD_APPLICATION_CONTROLLER_OPERATION_PROCESSORS", 10, 0, math.MaxInt32), "Number of application operation processors")
command.Flags().StringVar(&cmdutil.LogFormat, "logformat", env.StringFromEnv("ARGOCD_APPLICATION_CONTROLLER_LOGFORMAT", "text"), "Set the logging format. One of: text|json")
command.Flags().StringVar(&cmdutil.LogLevel, "loglevel", env.StringFromEnv("ARGOCD_APPLICATION_CONTROLLER_LOGLEVEL", "info"), "Set the logging level. One of: debug|info|warn|error")
command.Flags().IntVar(&glogLevel, "gloglevel", 0, "Set the glog logging level")
command.Flags().IntVar(&metricsPort, "metrics-port", common.DefaultPortArgoCDMetrics, "Start metrics server on given port")
command.Flags().DurationVar(&metricsCacheExpiration, "metrics-cache-expiration", env.ParseDurationFromEnv("ARGOCD_APPLICATION_CONTROLLER_METRICS_CACHE_EXPIRATION", 0*time.Second, 0, math.MaxInt64), "Prometheus metrics cache expiration (disabled by default. e.g. 24h0m0s)")
command.Flags().IntVar(&selfHealTimeoutSeconds, "self-heal-timeout-seconds", env.ParseNumFromEnv("ARGOCD_APPLICATION_CONTROLLER_SELF_HEAL_TIMEOUT_SECONDS", 5, 0, math.MaxInt32), "Specifies timeout between application self heal attempts")
command.Flags().Int64Var(&kubectlParallelismLimit, "kubectl-parallelism-limit", 20, "Number of allowed concurrent kubectl fork/execs. Any value less the 1 means no limit.")
command.Flags().BoolVar(&repoServerPlaintext, "repo-server-plaintext", env.ParseBoolFromEnv("ARGOCD_APPLICATION_CONTROLLER_REPO_SERVER_PLAINTEXT", false), "Disable TLS on connections to repo server")
command.Flags().BoolVar(&repoServerStrictTLS, "repo-server-strict-tls", env.ParseBoolFromEnv("ARGOCD_APPLICATION_CONTROLLER_REPO_SERVER_STRICT_TLS", false), "Whether to use strict validation of the TLS cert presented by the repo server")
command.Flags().StringSliceVar(&metricsAplicationLabels, "metrics-application-labels", []string{}, "List of Application labels that will be added to the argocd_application_labels metric")
command.Flags().StringVar(&otlpAddress, "otlp-address", env.StringFromEnv("ARGOCD_APPLICATION_CONTROLLER_OTLP_ADDRESS", ""), "OpenTelemetry collector address to send traces to")
command.Flags().StringSliceVar(&applicationNamespaces, "application-namespaces", []string{}, "List of additional namespaces that applications are allowed to be created in")
cacheSrc = appstatecache.AddCacheFlagsToCmd(&command, func(client *redis.Client) {
redisClient = client
})
return &command
}
func getClusterFilter() func(cluster *v1alpha1.Cluster) bool {
replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
var clusterFilter func(cluster *v1alpha1.Cluster) bool
if replicas > 1 {
if shard < 0 {
var err error
shard, err = sharding.InferShard()
errors.CheckError(err)
}
log.Infof("Processing clusters from shard %d", shard)
clusterFilter = sharding.GetClusterFilter(replicas, shard)
} else {
log.Info("Processing all cluster shards")
}
return clusterFilter
}