mirror of
https://github.com/argoproj/argo-cd
synced 2026-04-21 17:07:16 +00:00
205 lines
6.1 KiB
Go
205 lines
6.1 KiB
Go
package controller
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"runtime/debug"
|
|
|
|
"github.com/argoproj/argo-cd/common"
|
|
"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
|
|
"github.com/argoproj/argo-cd/reposerver"
|
|
"github.com/argoproj/argo-cd/reposerver/repository"
|
|
"github.com/argoproj/argo-cd/util"
|
|
"github.com/argoproj/argo-cd/util/db"
|
|
log "github.com/sirupsen/logrus"
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/fields"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
"k8s.io/apimachinery/pkg/selection"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/informers"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/util/workqueue"
|
|
)
|
|
|
|
type SecretController struct {
|
|
kubeClient kubernetes.Interface
|
|
secretQueue workqueue.RateLimitingInterface
|
|
secretInformer cache.SharedIndexInformer
|
|
repoClientset reposerver.Clientset
|
|
namespace string
|
|
}
|
|
|
|
func (ctrl *SecretController) Run(ctx context.Context) {
|
|
go ctrl.secretInformer.Run(ctx.Done())
|
|
if !cache.WaitForCacheSync(ctx.Done(), ctrl.secretInformer.HasSynced) {
|
|
log.Error("Timed out waiting for caches to sync")
|
|
return
|
|
}
|
|
go wait.Until(func() {
|
|
for ctrl.processSecret() {
|
|
}
|
|
}, time.Second, ctx.Done())
|
|
}
|
|
|
|
func (ctrl *SecretController) processSecret() (processNext bool) {
|
|
secretKey, shutdown := ctrl.secretQueue.Get()
|
|
if shutdown {
|
|
processNext = false
|
|
return
|
|
} else {
|
|
processNext = true
|
|
}
|
|
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
|
|
}
|
|
ctrl.secretQueue.Done(secretKey)
|
|
}()
|
|
obj, exists, err := ctrl.secretInformer.GetIndexer().GetByKey(secretKey.(string))
|
|
if err != nil {
|
|
log.Errorf("Failed to get secret '%s' from informer index: %+v", secretKey, err)
|
|
return
|
|
}
|
|
if !exists {
|
|
// This happens after secret was deleted, but the work queue still had an entry for it.
|
|
return
|
|
}
|
|
secret, ok := obj.(*corev1.Secret)
|
|
if !ok {
|
|
log.Warnf("Key '%s' in index is not an secret", secretKey)
|
|
return
|
|
}
|
|
|
|
if secret.Labels[common.LabelKeySecretType] == common.SecretTypeCluster {
|
|
cluster := db.SecretToCluster(secret)
|
|
ctrl.updateState(secret, ctrl.getClusterState(cluster))
|
|
} else if secret.Labels[common.LabelKeySecretType] == common.SecretTypeRepository {
|
|
repo := db.SecretToRepo(secret)
|
|
ctrl.updateState(secret, ctrl.getRepoConnectionState(repo))
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (ctrl *SecretController) getRepoConnectionState(repo *v1alpha1.Repository) v1alpha1.ConnectionState {
|
|
state := v1alpha1.ConnectionState{
|
|
ModifiedAt: repo.ConnectionState.ModifiedAt,
|
|
Status: v1alpha1.ConnectionStatusUnknown,
|
|
}
|
|
closer, client, err := ctrl.repoClientset.NewRepositoryClient()
|
|
if err != nil {
|
|
log.Errorf("Unable to create repository client: %v", err)
|
|
return state
|
|
}
|
|
|
|
defer util.Close(closer)
|
|
|
|
_, err = client.ListDir(context.Background(), &repository.ListDirRequest{Repo: repo, Path: ".gitignore"})
|
|
if err == nil {
|
|
state.Status = v1alpha1.ConnectionStatusSuccessful
|
|
} else {
|
|
state.Status = v1alpha1.ConnectionStatusFailed
|
|
state.Message = err.Error()
|
|
}
|
|
|
|
return state
|
|
}
|
|
|
|
func (ctrl *SecretController) getClusterState(cluster *v1alpha1.Cluster) v1alpha1.ConnectionState {
|
|
state := v1alpha1.ConnectionState{
|
|
ModifiedAt: cluster.ConnectionState.ModifiedAt,
|
|
Status: v1alpha1.ConnectionStatusUnknown,
|
|
}
|
|
kubeClientset, err := kubernetes.NewForConfig(cluster.RESTConfig())
|
|
if err == nil {
|
|
_, err = kubeClientset.Discovery().ServerVersion()
|
|
}
|
|
if err == nil {
|
|
state.Status = v1alpha1.ConnectionStatusSuccessful
|
|
} else {
|
|
state.Status = v1alpha1.ConnectionStatusFailed
|
|
state.Message = err.Error()
|
|
}
|
|
|
|
return state
|
|
}
|
|
|
|
func (ctrl *SecretController) updateState(secret *corev1.Secret, state v1alpha1.ConnectionState) {
|
|
annotationsPatch := make(map[string]string)
|
|
for key, value := range db.AnnotationsFromConnectionState(&state) {
|
|
if secret.Annotations[key] != value {
|
|
annotationsPatch[key] = value
|
|
}
|
|
}
|
|
if len(annotationsPatch) > 0 {
|
|
annotationsPatch[common.AnnotationConnectionModifiedAt] = metav1.Now().Format(time.RFC3339)
|
|
patchData, err := json.Marshal(map[string]interface{}{
|
|
"metadata": map[string]interface{}{
|
|
"annotations": annotationsPatch,
|
|
},
|
|
})
|
|
if err != nil {
|
|
log.Warnf("Unable to prepare secret state annotation patch: %v", err)
|
|
} else {
|
|
_, err := ctrl.kubeClient.CoreV1().Secrets(secret.Namespace).Patch(secret.Name, types.MergePatchType, patchData)
|
|
if err != nil {
|
|
log.Warnf("Unable to patch secret state annotation: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func newSecretInformer(client kubernetes.Interface, resyncPeriod time.Duration, namespace string, secretQueue workqueue.RateLimitingInterface) cache.SharedIndexInformer {
|
|
informerFactory := informers.NewFilteredSharedInformerFactory(
|
|
client,
|
|
resyncPeriod,
|
|
namespace,
|
|
func(options *metav1.ListOptions) {
|
|
var req *labels.Requirement
|
|
req, err := labels.NewRequirement(common.LabelKeySecretType, selection.In, []string{common.SecretTypeCluster, common.SecretTypeRepository})
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
options.FieldSelector = fields.Everything().String()
|
|
labelSelector := labels.NewSelector().Add(*req)
|
|
options.LabelSelector = labelSelector.String()
|
|
},
|
|
)
|
|
informer := informerFactory.Core().V1().Secrets().Informer()
|
|
informer.AddEventHandler(
|
|
cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(obj interface{}) {
|
|
key, err := cache.MetaNamespaceKeyFunc(obj)
|
|
if err == nil {
|
|
secretQueue.Add(key)
|
|
}
|
|
},
|
|
UpdateFunc: func(old, new interface{}) {
|
|
key, err := cache.MetaNamespaceKeyFunc(new)
|
|
if err == nil {
|
|
secretQueue.Add(key)
|
|
}
|
|
},
|
|
},
|
|
)
|
|
return informer
|
|
}
|
|
|
|
func NewSecretController(kubeClient kubernetes.Interface, repoClientset reposerver.Clientset, resyncPeriod time.Duration, namespace string) *SecretController {
|
|
secretQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
|
|
return &SecretController{
|
|
kubeClient: kubeClient,
|
|
secretQueue: secretQueue,
|
|
secretInformer: newSecretInformer(kubeClient, resyncPeriod, namespace, secretQueue),
|
|
namespace: namespace,
|
|
repoClientset: repoClientset,
|
|
}
|
|
}
|