Issue #19 - Move Kubernetes manifest generation into separate service (#20)

This commit is contained in:
Alexander Matyushentsev 2018-03-06 22:05:07 -08:00 committed by GitHub
parent cc232a4dc6
commit 405b47ffe6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
32 changed files with 681 additions and 357 deletions

View file

@ -23,6 +23,7 @@ spec:
withItems:
- make controller-image
- make server-image
- make repo-server-image
- name: test
template: ci-builder
arguments:

View file

@ -39,7 +39,7 @@ IMAGE_PREFIX=${IMAGE_NAMESPACE}/
endif
.PHONY: all
all: cli server-image controller-image
all: cli server-image controller-image repo-server-image
.PHONY: protogen
protogen:
@ -67,6 +67,15 @@ server-image:
docker build --build-arg BINARY=argocd-server --build-arg MAKE_TARGET=server -t $(IMAGE_PREFIX)argocd-server:$(IMAGE_TAG) -f Dockerfile-argocd .
@if [ "$(DOCKER_PUSH)" = "true" ] ; then docker push $(IMAGE_PREFIX)argocd-server:$(IMAGE_TAG) ; fi
.PHONY: repo-server
repo-server:
CGO_ENABLED=0 go build -v -i -ldflags '${LDFLAGS}' -o ${DIST_DIR}/argocd-repo-server ./cmd/argocd-repo-server
.PHONY: repo-server-image
repo-server-image:
docker build --build-arg BINARY=argocd-repo-server --build-arg MAKE_TARGET=repo-server -t $(IMAGE_PREFIX)argocd-repo-server:$(IMAGE_TAG) -f Dockerfile-argocd .
@if [ "$(DOCKER_PUSH)" = "true" ] ; then docker push $(IMAGE_PREFIX)argocd-server:$(IMAGE_TAG) ; fi
.PHONY: controller
controller:
CGO_ENABLED=0 go build -v -i -ldflags '${LDFLAGS}' -o ${DIST_DIR}/argocd-application-controller ./cmd/argocd-application-controller

View file

@ -1,99 +0,0 @@
package application
import (
"context"
"fmt"
"os"
"path"
"strings"
"time"
"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/server/cluster"
"github.com/argoproj/argo-cd/server/repository"
"github.com/argoproj/argo-cd/util"
"github.com/argoproj/argo-cd/util/git"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Manager is responsible to retrieve application spec and compare it to actual application state.
type Manager struct {
gitClient git.Client
repoService repository.RepositoryServiceServer
clusterService cluster.ClusterServiceServer
statusRefreshTimeout time.Duration
appComparator AppComparator
repoLock *util.KeyLock
}
// NeedRefreshAppStatus answers if application status needs to be refreshed. Returns true if application never been compared, has changed or comparison result has expired.
func (m *Manager) NeedRefreshAppStatus(app *v1alpha1.Application) bool {
return app.Status.ComparisonResult.Status == v1alpha1.ComparisonStatusUnknown ||
!app.Spec.Source.Equals(app.Status.ComparisonResult.ComparedTo) ||
app.Status.ComparisonResult.ComparedAt.Add(m.statusRefreshTimeout).Before(time.Now())
}
// RefreshAppStatus compares application against actual state in target cluster and returns updated status.
func (m *Manager) RefreshAppStatus(app *v1alpha1.Application) *v1alpha1.ApplicationStatus {
status, err := m.tryRefreshAppStatus(app)
if err != nil {
log.Errorf("App %s comparison failed: %+v", app.Name, err)
status = &v1alpha1.ApplicationStatus{
ComparisonResult: v1alpha1.ComparisonResult{
Status: v1alpha1.ComparisonStatusError,
Error: fmt.Sprintf("Failed to get application status for application '%s': %v", app.Name, err),
ComparedTo: app.Spec.Source,
ComparedAt: metav1.Time{Time: time.Now().UTC()},
},
}
}
return status
}
func (m *Manager) tryRefreshAppStatus(app *v1alpha1.Application) (*v1alpha1.ApplicationStatus, error) {
repo, err := m.repoService.Get(context.Background(), &repository.RepoQuery{Repo: app.Spec.Source.RepoURL})
if err != nil {
return nil, err
}
appRepoPath := path.Join(os.TempDir(), strings.Replace(repo.Repo, "/", "_", -1))
m.repoLock.Lock(appRepoPath)
defer m.repoLock.Unlock(appRepoPath)
err = m.gitClient.CloneOrFetch(repo.Repo, repo.Username, repo.Password, appRepoPath)
if err != nil {
return nil, err
}
err = m.gitClient.Checkout(appRepoPath, app.Spec.Source.TargetRevision)
if err != nil {
return nil, err
}
comparisonResult, err := m.appComparator.CompareAppState(appRepoPath, app)
if err != nil {
return nil, err
}
log.Infof("App %s comparison result: prev: %s. current: %s", app.Name, app.Status.ComparisonResult.Status, comparisonResult.Status)
return &v1alpha1.ApplicationStatus{
ComparisonResult: *comparisonResult,
}, nil
}
// NewAppManager creates new instance of app manager.
func NewAppManager(
gitClient git.Client,
repoService repository.RepositoryServiceServer,
clusterService cluster.ClusterServiceServer,
appComparator AppComparator,
statusRefreshTimeout time.Duration,
) *Manager {
return &Manager{
gitClient: gitClient,
repoService: repoService,
clusterService: clusterService,
statusRefreshTimeout: statusRefreshTimeout,
appComparator: appComparator,
repoLock: util.NewKeyLock(),
}
}

View file

@ -1,161 +0,0 @@
package application_test
import (
"context"
"sync"
"testing"
"time"
"github.com/argoproj/argo-cd/application"
appMocks "github.com/argoproj/argo-cd/application/mocks"
"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
clusterMocks "github.com/argoproj/argo-cd/server/cluster/mocks"
"github.com/argoproj/argo-cd/server/repository"
repoMocks "github.com/argoproj/argo-cd/server/repository/mocks"
gitMocks "github.com/argoproj/argo-cd/util/git/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type appComparatorStub struct {
compareAppState func(appRepoPath string, app *v1alpha1.Application) (*v1alpha1.ComparisonResult, error)
}
func (stub *appComparatorStub) CompareAppState(appRepoPath string, app *v1alpha1.Application) (*v1alpha1.ComparisonResult, error) {
return stub.compareAppState(appRepoPath, app)
}
func TestManager(t *testing.T) {
refreshTimeout := time.Second * 10
appSource := v1alpha1.ApplicationSource{
Environment: "prod/us-west-2",
Path: "apps/elk",
TargetRevision: "master",
RepoURL: "http://my-git-repo.git",
}
gitClientMock := gitMocks.Client{}
appComparatorMock := appMocks.AppComparator{}
repoServiceMock := repoMocks.RepositoryServiceServer{}
clusterServiceMock := clusterMocks.ClusterServiceServer{}
manager := application.NewAppManager(&gitClientMock, &repoServiceMock, &clusterServiceMock, &appComparatorMock, refreshTimeout)
t.Run("NeedRefreshAppStatus", func(t *testing.T) {
t.Run("TestReturnsTrueIfAppWasNotCompared", func(t *testing.T) {
needRefresh := manager.NeedRefreshAppStatus(&v1alpha1.Application{
Spec: v1alpha1.ApplicationSpec{Source: appSource},
Status: v1alpha1.ApplicationStatus{
ComparisonResult: v1alpha1.ComparisonResult{Status: v1alpha1.ComparisonStatusUnknown},
},
})
assert.True(t, needRefresh)
})
t.Run("TestReturnsFalseIfAppWasComparedBeforeRefreshTimeoutExpires", func(t *testing.T) {
needRefresh := manager.NeedRefreshAppStatus(&v1alpha1.Application{
Spec: v1alpha1.ApplicationSpec{Source: appSource},
Status: v1alpha1.ApplicationStatus{
ComparisonResult: v1alpha1.ComparisonResult{Status: v1alpha1.ComparisonStatusSynced, ComparedAt: metav1.Time{Time: time.Now()}, ComparedTo: appSource},
},
})
assert.False(t, needRefresh)
})
t.Run("TestReturnsTrueIfAppWasComparedAfterRefreshTimeoutExpires", func(t *testing.T) {
needRefresh := manager.NeedRefreshAppStatus(&v1alpha1.Application{
Spec: v1alpha1.ApplicationSpec{Source: appSource},
Status: v1alpha1.ApplicationStatus{
ComparisonResult: v1alpha1.ComparisonResult{
Status: v1alpha1.ComparisonStatusSynced,
ComparedAt: metav1.Time{Time: time.Now().Add(-(refreshTimeout + time.Second))},
ComparedTo: appSource,
},
},
})
assert.True(t, needRefresh)
})
t.Run("TestReturnsTrueApplicationSourceHasChanged", func(t *testing.T) {
updatedSource := *appSource.DeepCopy()
updatedSource.TargetRevision = "abc"
needRefresh := manager.NeedRefreshAppStatus(&v1alpha1.Application{
Spec: v1alpha1.ApplicationSpec{Source: appSource},
Status: v1alpha1.ApplicationStatus{
ComparisonResult: v1alpha1.ComparisonResult{Status: v1alpha1.ComparisonStatusSynced, ComparedAt: metav1.Time{Time: time.Now()}, ComparedTo: updatedSource},
},
})
assert.True(t, needRefresh)
})
})
t.Run("RefreshAppStatus", func(t *testing.T) {
repo := v1alpha1.Repository{
Repo: "https://testRepo/repo.git",
Username: "user",
Password: "test",
}
app := v1alpha1.Application{
Spec: v1alpha1.ApplicationSpec{Source: appSource},
Status: v1alpha1.ApplicationStatus{},
}
repoServiceMock.On("Get", context.Background(), &repository.RepoQuery{
Repo: appSource.RepoURL,
}).Return(&repo, nil)
var repoPath string
gitClientMock.On("CloneOrFetch", repo.Repo, repo.Username, repo.Password, mock.MatchedBy(func(receivedRepoPath string) bool {
repoPath = receivedRepoPath
return true
})).Return(nil)
gitClientMock.On("Checkout", mock.MatchedBy(func(receivedRepoPath string) bool {
return repoPath == receivedRepoPath
}), appSource.TargetRevision).Return(nil)
t.Run("TestCheckoutRepoAndCompareStart", func(t *testing.T) {
appComparatorMock.On("CompareAppState", mock.MatchedBy(func(receivedRepoPath string) bool {
return repoPath == receivedRepoPath
}), &app).Return(&v1alpha1.ComparisonResult{
Status: v1alpha1.ComparisonStatusSynced,
}, nil)
updatedAppStatus := manager.RefreshAppStatus(&app)
assert.Equal(t, updatedAppStatus.ComparisonResult.Status, v1alpha1.ComparisonStatusSynced)
})
t.Run("TestDoesNotProcessSameRepoSimultaneously", func(t *testing.T) {
cnt := 3
processingCnt := 0
completeProcessing := make(chan bool)
comparatorStub := appComparatorStub{
compareAppState: func(appRepoPath string, app *v1alpha1.Application) (*v1alpha1.ComparisonResult, error) {
processingCnt++
assert.Equal(t, 1, processingCnt)
<-completeProcessing
processingCnt--
return &v1alpha1.ComparisonResult{
Status: v1alpha1.ComparisonStatusSynced,
}, nil
},
}
manager := application.NewAppManager(&gitClientMock, &repoServiceMock, &clusterServiceMock, &comparatorStub, refreshTimeout)
var wg sync.WaitGroup
wg.Add(cnt)
for i := 0; i < cnt; i++ {
go func() {
defer wg.Done()
manager.RefreshAppStatus(&app)
}()
}
for i := 1; i <= cnt; i++ {
time.Sleep(10 * time.Millisecond)
completeProcessing <- true
}
wg.Wait()
})
})
}

View file

@ -7,14 +7,12 @@ import (
"time"
argocd "github.com/argoproj/argo-cd"
"github.com/argoproj/argo-cd/application"
"github.com/argoproj/argo-cd/controller"
"github.com/argoproj/argo-cd/errors"
appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned"
"github.com/argoproj/argo-cd/server/cluster"
"github.com/argoproj/argo-cd/server/repository"
apirepository "github.com/argoproj/argo-cd/server/repository"
"github.com/argoproj/argo-cd/util/cli"
"github.com/argoproj/argo-cd/util/git"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
@ -23,20 +21,22 @@ import (
// load the gcp plugin (required to authenticate against GKE clusters).
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
// load the oidc plugin (required to authenticate with OpenID Connect).
"github.com/argoproj/argo-cd/reposerver"
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
)
const (
// CLIName is the name of the CLI
cliName = "application-controller"
cliName = "argocd-application-controller"
// Default time in seconds for application resync period
defaultAppResyncPeriod = 600
)
func newCommand() *cobra.Command {
var (
clientConfig clientcmd.ClientConfig
appResyncPeriod int64
clientConfig clientcmd.ClientConfig
appResyncPeriod int64
repoServerAddress string
)
var command = cobra.Command{
Use: cliName,
@ -45,10 +45,6 @@ func newCommand() *cobra.Command {
config, err := clientConfig.ClientConfig()
errors.CheckError(err)
nativeGitClient, err := git.NewNativeGitClient()
if err != nil {
return err
}
kubeClient := kubernetes.NewForConfigOrDie(config)
appClient := appclientset.NewForConfigOrDie(config)
@ -60,16 +56,19 @@ func newCommand() *cobra.Command {
Namespace: namespace,
InstanceID: "",
}
clusterService := cluster.NewServer(namespace, kubeClient, appClient)
resyncDuration := time.Duration(appResyncPeriod) * time.Second
appManager := application.NewAppManager(
nativeGitClient,
repository.NewServer(namespace, kubeClient, appClient),
clusterService,
application.NewKsonnetAppComparator(clusterService),
apiRepoServer := apirepository.NewServer(namespace, kubeClient, appClient)
clusterService := cluster.NewServer(namespace, kubeClient, appClient)
appComparator := controller.NewKsonnetAppComparator(clusterService)
appController := controller.NewApplicationController(
kubeClient,
appClient,
reposerver.NewRepositoryServerClientset(repoServerAddress),
apiRepoServer,
appComparator,
resyncDuration,
)
appController := controller.NewApplicationController(kubeClient, appClient, appManager, resyncDuration, &controllerConfig)
&controllerConfig)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -83,6 +82,7 @@ func newCommand() *cobra.Command {
clientConfig = cli.AddKubectlFlagsToCmd(&command)
command.Flags().Int64Var(&appResyncPeriod, "app-resync", defaultAppResyncPeriod, "Time period in seconds for application resync.")
command.Flags().StringVar(&repoServerAddress, "reposerveraddr", "localhost:8081", "Repo server address.")
return &command
}

View file

@ -0,0 +1,75 @@
package main
import (
"fmt"
"net"
"os"
"github.com/argoproj/argo-cd/errors"
"github.com/argoproj/argo-cd/util/cli"
"github.com/argoproj/argo-cd/util/git"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
// load the gcp plugin (required to authenticate against GKE clusters).
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
// load the oidc plugin (required to authenticate with OpenID Connect).
"github.com/argoproj/argo-cd"
"github.com/argoproj/argo-cd/reposerver"
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
)
const (
// CLIName is the name of the CLI
cliName = "argocd-manifest-server"
port = 8081
)
func newCommand() *cobra.Command {
var (
clientConfig clientcmd.ClientConfig
logLevel string
)
var command = cobra.Command{
Use: cliName,
Short: "Run argocd-repo-server",
RunE: func(c *cobra.Command, args []string) error {
level, err := log.ParseLevel(logLevel)
errors.CheckError(err)
log.SetLevel(level)
config, err := clientConfig.ClientConfig()
errors.CheckError(err)
namespace, _, err := clientConfig.Namespace()
errors.CheckError(err)
kubeClientset := kubernetes.NewForConfigOrDie(config)
server := reposerver.NewServer(kubeClientset, namespace)
nativeGitClient, err := git.NewNativeGitClient()
errors.CheckError(err)
grpc := server.CreateGRPC(nativeGitClient)
listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
errors.CheckError(err)
log.Infof("argocd-repo-server %s serving on port %d (namespace: %s)", argocd.GetVersion(), port, namespace)
err = grpc.Serve(listener)
errors.CheckError(err)
return nil
},
}
clientConfig = cli.AddKubectlFlagsToCmd(&command)
command.Flags().StringVar(&logLevel, "loglevel", "info", "Set the logging level. One of: debug|info|warn|error")
return &command
}
func main() {
if err := newCommand().Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}

View file

@ -6,8 +6,8 @@ import (
"os"
"text/tabwriter"
argocdclient "github.com/argoproj/argo-cd/client"
"github.com/argoproj/argo-cd/errors"
argocdclient "github.com/argoproj/argo-cd/pkg/apiclient"
argoappv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/server/application"
"github.com/argoproj/argo-cd/util"

View file

@ -9,9 +9,9 @@ import (
"strings"
"text/tabwriter"
argocdclient "github.com/argoproj/argo-cd/client"
"github.com/argoproj/argo-cd/common"
"github.com/argoproj/argo-cd/errors"
argocdclient "github.com/argoproj/argo-cd/pkg/apiclient"
argoappv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/server/cluster"
"github.com/argoproj/argo-cd/util"

View file

@ -8,8 +8,8 @@ import (
"syscall"
"text/tabwriter"
argocdclient "github.com/argoproj/argo-cd/client"
"github.com/argoproj/argo-cd/errors"
argocdclient "github.com/argoproj/argo-cd/pkg/apiclient"
appsv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/server/repository"
"github.com/argoproj/argo-cd/util"

View file

@ -1,7 +1,7 @@
package commands
import (
argocdclient "github.com/argoproj/argo-cd/client"
argocdclient "github.com/argoproj/argo-cd/pkg/apiclient"
"github.com/argoproj/argo-cd/util/cli"
"github.com/spf13/cobra"
"k8s.io/client-go/tools/clientcmd"

View file

@ -1,24 +1,22 @@
package application
package controller
import (
"context"
"encoding/json"
"fmt"
"path"
"time"
"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/server/cluster"
"github.com/argoproj/argo-cd/util/diff"
ksutil "github.com/argoproj/argo-cd/util/ksonnet"
kubeutil "github.com/argoproj/argo-cd/util/kube"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
// AppComparator defines methods which allow to compare application spec and actual application state.
type AppComparator interface {
CompareAppState(appRepoPath string, app *v1alpha1.Application) (*v1alpha1.ComparisonResult, error)
CompareAppState(server string, namespace string, targetObjs []*unstructured.Unstructured, app *v1alpha1.Application) (*v1alpha1.ComparisonResult, error)
}
// KsonnetAppComparator allows to compare application using KSonnet CLI
@ -27,33 +25,21 @@ type KsonnetAppComparator struct {
}
// CompareAppState compares application spec and real app state using KSonnet
func (ks *KsonnetAppComparator) CompareAppState(appRepoPath string, app *v1alpha1.Application) (*v1alpha1.ComparisonResult, error) {
func (ks *KsonnetAppComparator) CompareAppState(
server string,
namespace string,
targetObjs []*unstructured.Unstructured,
app *v1alpha1.Application) (*v1alpha1.ComparisonResult, error) {
log.Infof("Comparing app %s state", app.ObjectMeta.Name)
appPath := path.Join(appRepoPath, app.Spec.Source.Path)
ksApp, err := ksutil.NewKsonnetApp(appPath)
if err != nil {
return nil, err
}
appSpec := ksApp.AppSpec()
env, ok := appSpec.GetEnvironmentSpec(app.Spec.Source.Environment)
if !ok {
return nil, fmt.Errorf("environment '%s' does not exist in ksonnet app '%s'", app.Spec.Source.Environment, appSpec.Name)
}
// Get the REST config for the cluster corresponding to the environment
clst, err := ks.clusterService.Get(context.Background(), &cluster.ClusterQuery{Server: env.Destination.Server})
if err != nil {
return nil, err
}
// Generate the manifests for the environment
targetObjs, err := ksApp.Show(app.Spec.Source.Environment)
clst, err := ks.clusterService.Get(context.Background(), &cluster.ClusterQuery{Server: server})
if err != nil {
return nil, err
}
// Retrieve the live versions of the objects
liveObjs, err := kubeutil.GetLiveResources(clst.RESTConfig(), targetObjs, env.Destination.Namespace)
liveObjs, err := kubeutil.GetLiveResources(clst.RESTConfig(), targetObjs, namespace)
if err != nil {
return nil, err
}
@ -93,7 +79,7 @@ func (ks *KsonnetAppComparator) CompareAppState(appRepoPath string, app *v1alpha
ComparedTo: app.Spec.Source,
ComparedAt: metav1.Time{Time: time.Now().UTC()},
Server: clst.Server,
Namespace: env.Destination.Namespace,
Namespace: namespace,
Resources: resources,
}
if diffResults.Modified {

View file

@ -7,12 +7,20 @@ import (
appv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned"
appinformers "github.com/argoproj/argo-cd/pkg/client/informers/externalversions"
"github.com/argoproj/argo-cd/util"
log "github.com/sirupsen/logrus"
"time"
"github.com/argoproj/argo-cd/application"
"fmt"
"encoding/json"
"github.com/argoproj/argo-cd/reposerver"
"github.com/argoproj/argo-cd/reposerver/repository"
apireposerver "github.com/argoproj/argo-cd/server/repository"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
@ -25,12 +33,14 @@ import (
// ApplicationController is the controller for application resources.
type ApplicationController struct {
appManager *application.Manager
repoClientset reposerver.Clientset
kubeClientset kubernetes.Interface
applicationClientset appclientset.Interface
appQueue workqueue.RateLimitingInterface
appInformer cache.SharedIndexInformer
appComparator AppComparator
statusRefreshTimeout time.Duration
apiRepoService apireposerver.RepositoryServiceServer
}
type ApplicationControllerConfig struct {
@ -42,16 +52,22 @@ type ApplicationControllerConfig struct {
func NewApplicationController(
kubeClientset kubernetes.Interface,
applicationClientset appclientset.Interface,
appManager *application.Manager,
repoClientset reposerver.Clientset,
apiRepoService apireposerver.RepositoryServiceServer,
appComparator AppComparator,
appResyncPeriod time.Duration,
config *ApplicationControllerConfig) *ApplicationController {
config *ApplicationControllerConfig,
) *ApplicationController {
appQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
return &ApplicationController{
appManager: appManager,
kubeClientset: kubeClientset,
applicationClientset: applicationClientset,
repoClientset: repoClientset,
appQueue: appQueue,
apiRepoService: apiRepoService,
appComparator: appComparator,
appInformer: newApplicationInformer(applicationClientset, appQueue, appResyncPeriod, config),
statusRefreshTimeout: appResyncPeriod,
}
}
@ -97,15 +113,66 @@ func (ctrl *ApplicationController) processNextItem() bool {
return true
}
updatedApp := app.DeepCopy()
if ctrl.appManager.NeedRefreshAppStatus(updatedApp) {
updatedApp.Status = *ctrl.appManager.RefreshAppStatus(updatedApp)
if app.NeedRefreshAppStatus(ctrl.statusRefreshTimeout) {
updatedApp := app.DeepCopy()
status, err := ctrl.tryRefreshAppStatus(updatedApp)
if err != nil {
status = &appv1.ApplicationStatus{
ComparisonResult: appv1.ComparisonResult{
Status: appv1.ComparisonStatusError,
Error: fmt.Sprintf("Failed to get application status for application '%s': %v", app.Name, err),
ComparedTo: app.Spec.Source,
ComparedAt: metav1.Time{Time: time.Now().UTC()},
},
}
}
updatedApp.Status = *status
ctrl.persistApp(updatedApp)
}
return true
}
func (ctrl *ApplicationController) tryRefreshAppStatus(app *appv1.Application) (*appv1.ApplicationStatus, error) {
conn, client, err := ctrl.repoClientset.NewRepositoryClient()
if err != nil {
return nil, err
}
defer util.Close(conn)
repo, err := ctrl.apiRepoService.Get(context.Background(), &apireposerver.RepoQuery{Repo: app.Spec.Source.RepoURL})
if err != nil {
return nil, err
}
revision := app.Spec.Source.TargetRevision
manifestInfo, err := client.GenerateManifest(context.Background(), &repository.ManifestRequest{
Repo: repo,
Revision: revision,
Path: app.Spec.Source.Path,
Environment: app.Spec.Source.Environment,
})
if err != nil {
return nil, err
}
targetObjs := make([]*unstructured.Unstructured, len(manifestInfo.Manifests))
for i, manifestStr := range manifestInfo.Manifests {
var obj map[string]interface{}
if err := json.Unmarshal([]byte(manifestStr), &obj); err != nil {
if err != nil {
return nil, err
}
}
targetObjs[i] = &unstructured.Unstructured{Object: obj}
}
comparisonResult, err := ctrl.appComparator.CompareAppState(manifestInfo.Server, manifestInfo.Namespace, targetObjs, app)
if err != nil {
return nil, err
}
log.Infof("App %s comparison result: prev: %s. current: %s", app.Name, app.Status.ComparisonResult.Status, comparisonResult.Status)
return &appv1.ApplicationStatus{
ComparisonResult: *comparisonResult,
}, nil
}
func (ctrl *ApplicationController) runWorker() {
for ctrl.processNextItem() {
}

View file

@ -48,8 +48,9 @@ go build -i -o dist/protoc-gen-gofast ./vendor/github.com/gogo/protobuf/protoc-g
go build -i -o dist/protoc-gen-grpc-gateway ./vendor/github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
# Generate server/<service>/(<service>.pb.go|<service>.pb.gw.go)
PROTO_FILES=$(find ${PROJECT_ROOT}/server -name "*.proto" -not -path "${PROJECT_ROOT}/vendor/*")
PROTO_FILES=$(find $PROJECT_ROOT \( -name "*.proto" -and -path '*/server/*' -or -path '*/reposerver/*' -and -name "*.proto" \))
for i in ${PROTO_FILES}; do
# Path to the google API gateway annotations.proto will be different depending if we are
# building natively (e.g. from workspace) vs. part of a docker build.
if [ -f /.dockerenv ]; then

View file

@ -1,4 +1,4 @@
package client
package apiclient
import (
"errors"
@ -16,7 +16,7 @@ const (
EnvArgoCDServer = "ARGOCD_SERVER"
)
type Client interface {
type ServerClient interface {
NewConn() (*grpc.ClientConn, error)
NewRepoClient() (*grpc.ClientConn, repository.RepositoryServiceClient, error)
NewRepoClientOrDie() (*grpc.ClientConn, repository.RepositoryServiceClient)
@ -35,7 +35,7 @@ type client struct {
ClientOptions
}
func NewClient(opts *ClientOptions) (Client, error) {
func NewClient(opts *ClientOptions) (ServerClient, error) {
clientOpts := *opts
if clientOpts.ServerAddr == "" {
clientOpts.ServerAddr = os.Getenv(EnvArgoCDServer)
@ -48,7 +48,7 @@ func NewClient(opts *ClientOptions) (Client, error) {
}, nil
}
func NewClientOrDie(opts *ClientOptions) Client {
func NewClientOrDie(opts *ClientOptions) ServerClient {
client, err := NewClient(opts)
if err != nil {
log.Fatal(err)

View file

@ -3,6 +3,8 @@ package v1alpha1
import (
"encoding/json"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
@ -157,6 +159,13 @@ type RepositoryList struct {
Items []Repository `json:"items" protobuf:"bytes,2,rep,name=items"`
}
// NeedRefreshAppStatus answers if application status needs to be refreshed. Returns true if application never been compared, has changed or comparison result has expired.
func (app *Application) NeedRefreshAppStatus(statusRefreshTimeout time.Duration) bool {
return app.Status.ComparisonResult.Status == ComparisonStatusUnknown ||
!app.Spec.Source.Equals(app.Status.ComparisonResult.ComparedTo) ||
app.Status.ComparisonResult.ComparedAt.Add(statusRefreshTimeout).Before(time.Now())
}
// Equals compares two instances of ApplicationSource and return true if instances are equal.
func (source ApplicationSource) Equals(other ApplicationSource) bool {
return source.TargetRevision == other.TargetRevision &&

29
reposerver/clientset.go Normal file
View file

@ -0,0 +1,29 @@
package reposerver
import (
"github.com/argoproj/argo-cd/reposerver/repository"
"github.com/argoproj/argo-cd/util"
"google.golang.org/grpc"
)
// Clientset represets repository server api clients
type Clientset interface {
NewRepositoryClient() (util.Closer, repository.RepositoryServiceClient, error)
}
type clientSet struct {
address string
}
func (c *clientSet) NewRepositoryClient() (util.Closer, repository.RepositoryServiceClient, error) {
conn, err := grpc.Dial(c.address, grpc.WithInsecure())
if err != nil {
return nil, nil, err
}
return conn, repository.NewRepositoryServiceClient(conn), nil
}
// NewRepositoryServerClientset creates new instance of repo server Clientset
func NewRepositoryServerClientset(address string) Clientset {
return &clientSet{address: address}
}

View file

@ -0,0 +1,78 @@
package repository
import (
"os"
"path"
"strings"
"fmt"
"encoding/json"
"github.com/argoproj/argo-cd/util"
"github.com/argoproj/argo-cd/util/git"
ksutil "github.com/argoproj/argo-cd/util/ksonnet"
"golang.org/x/net/context"
"k8s.io/client-go/kubernetes"
)
// Service implements ManifestService interface
type Service struct {
ns string
kubeClient kubernetes.Interface
gitClient git.Client
repoLock *util.KeyLock
}
// NewService returns a new instance of the Manifest service
func NewService(namespace string, kubeClient kubernetes.Interface, gitClient git.Client) *Service {
return &Service{
ns: namespace,
kubeClient: kubeClient,
gitClient: gitClient,
repoLock: util.NewKeyLock(),
}
}
func (s *Service) GenerateManifest(c context.Context, q *ManifestRequest) (*ManifestResponse, error) {
appRepoPath := path.Join(os.TempDir(), strings.Replace(q.Repo.Repo, "/", "_", -1))
s.repoLock.Lock(appRepoPath)
defer s.repoLock.Unlock(appRepoPath)
err := s.gitClient.CloneOrFetch(q.Repo.Repo, q.Repo.Username, q.Repo.Password, appRepoPath)
if err != nil {
return nil, err
}
err = s.gitClient.Checkout(appRepoPath, q.Revision)
if err != nil {
return nil, err
}
appPath := path.Join(appRepoPath, q.Path)
ksApp, err := ksutil.NewKsonnetApp(appPath)
if err != nil {
return nil, err
}
appSpec := ksApp.AppSpec()
env, ok := appSpec.GetEnvironmentSpec(q.Environment)
targetObjs, err := ksApp.Show(q.Environment)
if err != nil {
return nil, err
}
manifests := make([]string, len(targetObjs))
for i, target := range targetObjs {
manifestStr, err := json.Marshal(target.Object)
if err != nil {
return nil, err
}
manifests[i] = string(manifestStr)
}
if !ok {
return nil, fmt.Errorf("environment '%s' does not exist in ksonnet app '%s'", q.Environment, appSpec.Name)
}
return &ManifestResponse{
Manifests: manifests,
Namespace: env.Destination.Namespace,
Server: env.Destination.Server,
}, nil
}

View file

@ -0,0 +1,219 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: reposerver/repository/repository.proto
/*
Package repository is a generated protocol buffer package.
It is generated from these files:
reposerver/repository/repository.proto
It has these top-level messages:
ManifestRequest
ManifestResponse
*/
package repository
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import _ "github.com/gogo/protobuf/gogoproto"
import _ "google.golang.org/genproto/googleapis/api/annotations"
import _ "k8s.io/api/core/v1"
import github_com_argoproj_argo_cd_pkg_apis_application_v1alpha1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
// ManifestRequest is a query for manifest generation.
type ManifestRequest struct {
Repo *github_com_argoproj_argo_cd_pkg_apis_application_v1alpha1.Repository `protobuf:"bytes,1,opt,name=repo" json:"repo,omitempty"`
Revision string `protobuf:"bytes,2,opt,name=revision" json:"revision,omitempty"`
Path string `protobuf:"bytes,3,opt,name=path" json:"path,omitempty"`
Environment string `protobuf:"bytes,4,opt,name=environment" json:"environment,omitempty"`
}
func (m *ManifestRequest) Reset() { *m = ManifestRequest{} }
func (m *ManifestRequest) String() string { return proto.CompactTextString(m) }
func (*ManifestRequest) ProtoMessage() {}
func (*ManifestRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *ManifestRequest) GetRepo() *github_com_argoproj_argo_cd_pkg_apis_application_v1alpha1.Repository {
if m != nil {
return m.Repo
}
return nil
}
func (m *ManifestRequest) GetRevision() string {
if m != nil {
return m.Revision
}
return ""
}
func (m *ManifestRequest) GetPath() string {
if m != nil {
return m.Path
}
return ""
}
func (m *ManifestRequest) GetEnvironment() string {
if m != nil {
return m.Environment
}
return ""
}
type ManifestResponse struct {
Manifests []string `protobuf:"bytes,1,rep,name=manifests" json:"manifests,omitempty"`
Namespace string `protobuf:"bytes,2,opt,name=namespace" json:"namespace,omitempty"`
Server string `protobuf:"bytes,3,opt,name=server" json:"server,omitempty"`
}
func (m *ManifestResponse) Reset() { *m = ManifestResponse{} }
func (m *ManifestResponse) String() string { return proto.CompactTextString(m) }
func (*ManifestResponse) ProtoMessage() {}
func (*ManifestResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *ManifestResponse) GetManifests() []string {
if m != nil {
return m.Manifests
}
return nil
}
func (m *ManifestResponse) GetNamespace() string {
if m != nil {
return m.Namespace
}
return ""
}
func (m *ManifestResponse) GetServer() string {
if m != nil {
return m.Server
}
return ""
}
func init() {
proto.RegisterType((*ManifestRequest)(nil), "repository.ManifestRequest")
proto.RegisterType((*ManifestResponse)(nil), "repository.ManifestResponse")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// Client API for RepositoryService service
type RepositoryServiceClient interface {
// Generate manifest for specified repo name and sha
GenerateManifest(ctx context.Context, in *ManifestRequest, opts ...grpc.CallOption) (*ManifestResponse, error)
}
type repositoryServiceClient struct {
cc *grpc.ClientConn
}
func NewRepositoryServiceClient(cc *grpc.ClientConn) RepositoryServiceClient {
return &repositoryServiceClient{cc}
}
func (c *repositoryServiceClient) GenerateManifest(ctx context.Context, in *ManifestRequest, opts ...grpc.CallOption) (*ManifestResponse, error) {
out := new(ManifestResponse)
err := grpc.Invoke(ctx, "/repository.RepositoryService/GenerateManifest", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for RepositoryService service
type RepositoryServiceServer interface {
// Generate manifest for specified repo name and sha
GenerateManifest(context.Context, *ManifestRequest) (*ManifestResponse, error)
}
func RegisterRepositoryServiceServer(s *grpc.Server, srv RepositoryServiceServer) {
s.RegisterService(&_RepositoryService_serviceDesc, srv)
}
func _RepositoryService_GenerateManifest_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ManifestRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RepositoryServiceServer).GenerateManifest(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/repository.RepositoryService/GenerateManifest",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RepositoryServiceServer).GenerateManifest(ctx, req.(*ManifestRequest))
}
return interceptor(ctx, in, info, handler)
}
var _RepositoryService_serviceDesc = grpc.ServiceDesc{
ServiceName: "repository.RepositoryService",
HandlerType: (*RepositoryServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "GenerateManifest",
Handler: _RepositoryService_GenerateManifest_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "reposerver/repository/repository.proto",
}
func init() { proto.RegisterFile("reposerver/repository/repository.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 368 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x52, 0xb1, 0x6e, 0xdb, 0x30,
0x10, 0xad, 0x6a, 0xc3, 0xa8, 0xe9, 0xa1, 0x2e, 0x51, 0x14, 0x82, 0xea, 0x41, 0xd0, 0x50, 0x78,
0x29, 0x09, 0xdb, 0x4b, 0xe7, 0x02, 0x45, 0x91, 0x21, 0x08, 0xa0, 0x4c, 0xc9, 0x12, 0xd0, 0xd2,
0x99, 0x66, 0x6c, 0xf1, 0x18, 0x92, 0x16, 0x90, 0x9f, 0xcb, 0xb7, 0x05, 0xa2, 0x65, 0x4b, 0x48,
0x8c, 0x6c, 0x8f, 0xef, 0x91, 0xf7, 0x1e, 0xef, 0x8e, 0xfc, 0xb2, 0x60, 0xd0, 0x81, 0xad, 0xc1,
0xf2, 0x00, 0x95, 0x47, 0xfb, 0xdc, 0x83, 0xcc, 0x58, 0xf4, 0x48, 0x49, 0xc7, 0x24, 0xdf, 0x25,
0x4a, 0x0c, 0x34, 0x6f, 0xd0, 0xf1, 0x46, 0x32, 0x93, 0x88, 0x72, 0x0f, 0x5c, 0x18, 0xc5, 0x85,
0xd6, 0xe8, 0x85, 0x57, 0xa8, 0x5d, 0xab, 0x66, 0xbb, 0x3f, 0x8e, 0x29, 0x0c, 0x6a, 0x81, 0x16,
0x78, 0xbd, 0xe0, 0x12, 0x34, 0x58, 0xe1, 0xa1, 0x6c, 0xef, 0x5c, 0x49, 0xe5, 0xb7, 0x87, 0x35,
0x2b, 0xb0, 0xe2, 0xc2, 0x06, 0x8b, 0xc7, 0x00, 0x7e, 0x17, 0x25, 0x37, 0x3b, 0xd9, 0x3c, 0x76,
0x5c, 0x18, 0xb3, 0x57, 0x45, 0x28, 0xce, 0xeb, 0x85, 0xd8, 0x9b, 0xad, 0x78, 0x57, 0x2a, 0x7b,
0x89, 0xc8, 0xd7, 0x6b, 0xa1, 0xd5, 0x06, 0x9c, 0xcf, 0xe1, 0xe9, 0x00, 0xce, 0xd3, 0x3b, 0x32,
0x6c, 0x3e, 0x11, 0x47, 0x69, 0x34, 0x9f, 0x2c, 0xff, 0xb1, 0xce, 0x8d, 0x9d, 0xdc, 0x02, 0x78,
0x28, 0x4a, 0x66, 0x76, 0x92, 0x35, 0x6e, 0xac, 0xe7, 0xc6, 0x4e, 0x6e, 0x2c, 0x3f, 0xf7, 0x22,
0x0f, 0x25, 0x69, 0x42, 0xbe, 0x58, 0xa8, 0x95, 0x53, 0xa8, 0xe3, 0xcf, 0x69, 0x34, 0x1f, 0xe7,
0xe7, 0x33, 0xa5, 0x64, 0x68, 0x84, 0xdf, 0xc6, 0x83, 0xc0, 0x07, 0x4c, 0x53, 0x32, 0x01, 0x5d,
0x2b, 0x8b, 0xba, 0x02, 0xed, 0xe3, 0x61, 0x90, 0xfa, 0x54, 0xb6, 0x21, 0xd3, 0x2e, 0xbf, 0x33,
0xa8, 0x1d, 0xd0, 0x19, 0x19, 0x57, 0x2d, 0xe7, 0xe2, 0x28, 0x1d, 0xcc, 0xc7, 0x79, 0x47, 0x34,
0xaa, 0x16, 0x15, 0x38, 0x23, 0x0a, 0x68, 0x43, 0x74, 0x04, 0xfd, 0x41, 0x46, 0xc7, 0x29, 0xb7,
0x39, 0xda, 0xd3, 0xb2, 0x24, 0xdf, 0xba, 0xdf, 0xdc, 0x82, 0xad, 0x55, 0x01, 0xf4, 0x86, 0x4c,
0xff, 0xb7, 0x0d, 0x3d, 0x85, 0xa0, 0x3f, 0x59, 0x6f, 0x27, 0xde, 0xb4, 0x36, 0x99, 0x5d, 0x16,
0x8f, 0xb9, 0xb3, 0x4f, 0x7f, 0x57, 0xf7, 0x8b, 0x8f, 0x66, 0x7b, 0x71, 0x07, 0xd7, 0xa3, 0x30,
0xca, 0xd5, 0x6b, 0x00, 0x00, 0x00, 0xff, 0xff, 0xb9, 0x44, 0x27, 0xd1, 0xa3, 0x02, 0x00, 0x00,
}

View file

@ -0,0 +1,32 @@
syntax = "proto3";
option go_package = "github.com/argoproj/argo-cd/reposerver/repository";
package repository;
import "gogoproto/gogo.proto";
import "google/api/annotations.proto";
import "k8s.io/api/core/v1/generated.proto";
import "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1/generated.proto";
// ManifestRequest is a query for manifest generation.
message ManifestRequest {
github.com.argoproj.argo_cd.pkg.apis.application.v1alpha1.Repository repo = 1;
string revision = 2;
string path = 3;
string environment = 4;
}
message ManifestResponse {
repeated string manifests = 1;
string namespace = 2;
string server = 3;
}
// ManifestService
service RepositoryService {
// Generate manifest for application in specified repo name and revision
rpc GenerateManifest(ManifestRequest) returns (ManifestResponse) {
}
}

48
reposerver/server.go Normal file
View file

@ -0,0 +1,48 @@
package reposerver
import (
"github.com/argoproj/argo-cd/reposerver/repository"
"github.com/argoproj/argo-cd/server/version"
"github.com/argoproj/argo-cd/util/git"
grpc_util "github.com/argoproj/argo-cd/util/grpc"
"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"k8s.io/client-go/kubernetes"
)
// ArgoCDRepoServer is the repo server implementation
type ArgoCDRepoServer struct {
ns string
kubeclientset kubernetes.Interface
log *log.Entry
}
// NewServer returns a new instance of the ArgoCD Repo server
func NewServer(kubeclientset kubernetes.Interface, namespace string) *ArgoCDRepoServer {
return &ArgoCDRepoServer{
ns: namespace,
kubeclientset: kubeclientset,
log: log.NewEntry(log.New()),
}
}
// CreateGRPC creates new configured grpc server
func (a *ArgoCDRepoServer) CreateGRPC(gitClient git.Client) *grpc.Server {
server := grpc.NewServer(
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_logrus.StreamServerInterceptor(a.log),
grpc_util.PanicLoggerStreamServerInterceptor(a.log),
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_logrus.UnaryServerInterceptor(a.log),
grpc_util.PanicLoggerUnaryServerInterceptor(a.log),
)),
)
version.RegisterVersionServiceServer(server, &version.Server{})
manifestService := repository.NewService(a.ns, a.kubeclientset, gitClient)
repository.RegisterRepositoryServiceServer(server, manifestService)
return server
}

View file

@ -43,12 +43,12 @@ func TestController(t *testing.T) {
assert.Equal(t, app.Status.ComparisonResult.Status, v1alpha1.ComparisonStatusError)
})
t.Run("TestComparisonSuccessful", func(t *testing.T) {
t.Run("TestComparisonFailsIfClusterNotAdded", func(t *testing.T) {
ctrl := fixture.CreateController()
ctx, cancel := context.WithCancel(context.Background())
go ctrl.Run(ctx, 1)
defer cancel()
_, err := fixture.RepoService.Create(context.Background(), &v1alpha1.Repository{Repo: testApp.Spec.Source.RepoURL, Username: "", Password: ""})
_, err := fixture.ApiRepoService.Create(context.Background(), &v1alpha1.Repository{Repo: testApp.Spec.Source.RepoURL, Username: "", Password: ""})
if err != nil {
t.Fatal(fmt.Sprintf("Unable to create repo %v", err))
}
@ -64,7 +64,7 @@ func TestController(t *testing.T) {
t.Fatal(fmt.Sprintf("Unable to get app %v", err))
}
assert.NotEqual(t, app.Status.ComparisonResult.Status, v1alpha1.ComparisonStatusError)
assert.Equal(t, app.Status.ComparisonResult.Status, v1alpha1.ComparisonStatusError)
})
}

View file

@ -3,19 +3,21 @@ package e2e
import (
"fmt"
"log"
"net"
"os"
"os/exec"
"testing"
"time"
"github.com/argoproj/argo-cd/application"
"github.com/argoproj/argo-cd/common"
"github.com/argoproj/argo-cd/controller"
"github.com/argoproj/argo-cd/install"
"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned"
"github.com/argoproj/argo-cd/reposerver"
"github.com/argoproj/argo-cd/server/cluster"
"github.com/argoproj/argo-cd/server/repository"
apirepository "github.com/argoproj/argo-cd/server/repository"
"google.golang.org/grpc"
"k8s.io/api/core/v1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -26,19 +28,22 @@ import (
)
const (
TestTimeout = time.Second * 3
TestTimeout = time.Minute * 3
)
// Fixture represents e2e tests fixture.
type Fixture struct {
Config *rest.Config
AppManager *application.Manager
KubeClient kubernetes.Interface
ExtensionsClient apiextensionsclient.Interface
AppClient appclientset.Interface
RepoService repository.RepositoryServiceServer
Namespace string
InstanceID string
Config *rest.Config
KubeClient kubernetes.Interface
ExtensionsClient apiextensionsclient.Interface
AppClient appclientset.Interface
ApiRepoService apirepository.RepositoryServiceServer
RepoClientset reposerver.Clientset
AppComparator controller.AppComparator
Namespace string
InstanceID string
repoServerGRPC *grpc.Server
repoServerListener net.Listener
}
func createNamespace(kubeClient *kubernetes.Clientset) (string, error) {
@ -59,13 +64,24 @@ func (f *Fixture) setup() error {
if err != nil {
return err
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return err
}
f.repoServerListener = listener
go func() {
err = f.repoServerGRPC.Serve(listener)
}()
installer.InstallApplicationCRD()
return nil
return err
}
// TearDown deletes fixture resources.
func (f *Fixture) TearDown() {
err := f.KubeClient.CoreV1().Namespaces().Delete(f.Namespace, &metav1.DeleteOptions{})
if err != nil {
f.repoServerGRPC.Stop()
}
if err != nil {
println("Unable to tear down fixture")
}
@ -93,15 +109,11 @@ func NewFixture() (*Fixture, error) {
kubeClient := kubernetes.NewForConfigOrDie(config)
namespace, err := createNamespace(kubeClient)
clusterService := cluster.NewServer(namespace, kubeClient, appClient)
appManager := application.NewAppManager(
&FakeGitClient{},
repository.NewServer(namespace, kubeClient, appClient),
clusterService,
application.NewKsonnetAppComparator(clusterService),
time.Second)
repoServerGRPC := reposerver.NewServer(kubeClient, namespace).CreateGRPC(&FakeGitClient{})
if err != nil {
return nil, err
}
appComparator := controller.NewKsonnetAppComparator(clusterService)
fixture := &Fixture{
Config: config,
ExtensionsClient: extensionsClient,
@ -109,8 +121,9 @@ func NewFixture() (*Fixture, error) {
KubeClient: kubeClient,
Namespace: namespace,
InstanceID: namespace,
RepoService: repository.NewServer(namespace, kubeClient, appClient),
AppManager: appManager,
ApiRepoService: apirepository.NewServer(namespace, kubeClient, appClient),
AppComparator: appComparator,
repoServerGRPC: repoServerGRPC,
}
err = fixture.setup()
if err != nil {
@ -137,10 +150,14 @@ func (f *Fixture) CreateApp(t *testing.T, application *v1alpha1.Application) *v1
// CreateController creates new controller instance
func (f *Fixture) CreateController() *controller.ApplicationController {
return controller.NewApplicationController(f.KubeClient, f.AppClient, f.AppManager, time.Second, &controller.ApplicationControllerConfig{
Namespace: f.Namespace,
InstanceID: f.InstanceID,
})
return controller.NewApplicationController(
f.KubeClient,
f.AppClient,
reposerver.NewRepositoryServerClientset(f.repoServerListener.Addr().String()),
f.ApiRepoService,
f.AppComparator,
time.Second,
&controller.ApplicationControllerConfig{Namespace: f.Namespace, InstanceID: f.InstanceID})
}
// PollUntil periodically executes specified condition until it returns true.
@ -169,7 +186,11 @@ type FakeGitClient struct {
}
func (c *FakeGitClient) CloneOrFetch(repo string, username string, password string, repoPath string) error {
_, err := exec.Command("cp", "-r", "functional/ks-example", repoPath).Output()
_, err := exec.Command("rm", "-rf", repoPath).Output()
if err != nil {
return err
}
_, err = exec.Command("cp", "-r", "functional/ks-example", repoPath).Output()
return err
}

View file

@ -1,4 +1,4 @@
apiVersion: "0.1"
apiVersion: 0.1.0
gitVersion:
commitSha: 422d521c05aa905df949868143b26445f5e4eda5
refSpec: master

View file

@ -1,4 +1,11 @@
apiVersion: 0.0.1
environments:
default:
destination:
namespace: default
server: https://localhost:6443
k8sVersion: v1.7.0
path: default
kind: ksonnet.io/app
name: ks-example
registries:

View file

@ -1,3 +1,4 @@
local env = std.extVar("__ksonnet/environments");
local params = std.extVar("__ksonnet/params").components["guestbook-ui"];
local k = import "k.libsonnet";
local deployment = k.apps.v1beta1.deployment;

View file

@ -1,4 +1,4 @@
local base = import "../base.libsonnet";
local base = import "base.libsonnet";
local k = import "k.libsonnet";
base + {

View file

@ -1,4 +0,0 @@
{
"server": "https://localhost:6443",
"namespace": ""
}

View file

@ -33,6 +33,11 @@ func (m *NativeGitClient) CloneOrFetch(repo string, username string, password st
needClone = err != nil
}
if needClone {
_, err := exec.Command("rm", "-rf", repoPath).Output()
if err != nil {
return fmt.Errorf("unable to clean repo cache at %s: %v", repoPath, err)
}
repoURL, err := url.ParseRequestURI(repo)
if err != nil {
return err