From 4140c9867dd84f6c53f0983fdc0f4ad2bb11c4bf Mon Sep 17 00:00:00 2001 From: Alexander Matyushentsev Date: Wed, 16 Jan 2019 15:56:26 -0800 Subject: [PATCH] Replace grpc repo-server parallelism limit interceptor with semaphore (#1029) --- Gopkg.lock | 15 ++++-------- cmd/argocd-repo-server/main.go | 27 +++------------------ reposerver/repository/repository.go | 25 +++++++++++++++---- reposerver/server.go | 37 +++++++++-------------------- test/e2e/fixture.go | 2 +- 5 files changed, 41 insertions(+), 65 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 5b8673db69..70cd319f1b 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -759,14 +759,6 @@ revision = "640f0ab560aeb89d523bb6ac322b1244d5c3796c" version = "v0.2.0" -[[projects]] - branch = "master" - digest = "1:413f9e6cd1e2fe339c1d4a5e36de4e287d9edd575dd63e89b728351b385aadfa" - name = "github.com/yaronsumel/grpc-throttle" - packages = ["."] - pruneopts = "" - revision = "cd1107b3c0b1e9589c5837a847f6cd51188ef11e" - [[projects]] branch = "master" digest = "1:3cf699a0df65293cc8fd2339606950d3e2f6d02a435703951d1da411a23f7cef" @@ -847,7 +839,10 @@ branch = "master" digest = "1:b2ea75de0ccb2db2ac79356407f8a4cd8f798fe15d41b381c00abf3ae8e55ed1" name = "golang.org/x/sync" - packages = ["errgroup"] + packages = [ + "errgroup", + "semaphore", + ] pruneopts = "" revision = "1d60e4601c6fd243af51cc01ddf169918a5407ca" @@ -1442,7 +1437,6 @@ "github.com/stretchr/testify/assert", "github.com/stretchr/testify/mock", "github.com/vmihailenco/msgpack", - "github.com/yaronsumel/grpc-throttle", "github.com/yudai/gojsondiff", "github.com/yudai/gojsondiff/formatter", "golang.org/x/crypto/bcrypt", @@ -1451,6 +1445,7 @@ "golang.org/x/net/context", "golang.org/x/oauth2", "golang.org/x/sync/errgroup", + "golang.org/x/sync/semaphore", "google.golang.org/genproto/googleapis/api/annotations", "google.golang.org/grpc", "google.golang.org/grpc/codes", diff --git a/cmd/argocd-repo-server/main.go b/cmd/argocd-repo-server/main.go index 5ce9e98059..cb8eccca9b 100644 --- a/cmd/argocd-repo-server/main.go +++ b/cmd/argocd-repo-server/main.go @@ -4,8 +4,6 @@ import ( "fmt" "net" "os" - "strconv" - "strings" "time" "github.com/go-redis/redis" @@ -36,7 +34,7 @@ func newCommand() *cobra.Command { redisAddress string sentinelAddresses []string sentinelMaster string - parallelismLimit []string + parallelismLimit int64 tlsConfigCustomizerSrc func() (tls.ConfigCustomizer, error) ) var command = cobra.Command{ @@ -48,9 +46,7 @@ func newCommand() *cobra.Command { tlsConfigCustomizer, err := tlsConfigCustomizerSrc() errors.CheckError(err) - parallelism, err := parseParallelismLimit(parallelismLimit) - errors.CheckError(err) - server, err := reposerver.NewServer(git.NewFactory(), newCache(redisAddress, sentinelAddresses, sentinelMaster), tlsConfigCustomizer, parallelism) + server, err := reposerver.NewServer(git.NewFactory(), newCache(redisAddress, sentinelAddresses, sentinelMaster), tlsConfigCustomizer, parallelismLimit) errors.CheckError(err) grpc := server.CreateGRPC() listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) @@ -74,28 +70,11 @@ func newCommand() *cobra.Command { command.Flags().StringVar(&redisAddress, "redis", "", "Redis server hostname and port (e.g. argocd-redis:6379). ") command.Flags().StringArrayVar(&sentinelAddresses, "sentinel", []string{}, "Redis sentinel hostname and port (e.g. argocd-redis-ha-announce-0:6379). ") command.Flags().StringVar(&sentinelMaster, "sentinelmaster", "master", "Redis sentinel master group name.") - command.Flags().StringArrayVar(¶llelismLimit, - "parallelism-limit", []string{}, "Sets parallelism limit for grpc method (e.g. /repository.RepositoryService/GenerateManifest=10). ") + command.Flags().Int64Var(¶llelismLimit, "parallelismlimit", 0, "Limit on number of concurrent manifests generate requests. Any value less the 1 means no limit.") tlsConfigCustomizerSrc = tls.AddTLSFlagsToCmd(&command) return &command } -func parseParallelismLimit(parallelismLimit []string) (map[string]int, error) { - parallelism := make(map[string]int) - for _, limit := range parallelismLimit { - parts := strings.Split(limit, "=") - if len(parts) != 2 { - return nil, fmt.Errorf("Expected parallelism-limit form is: grpc-method-name=number. Received: %s.", limit) - } - limitNum, err := strconv.Atoi(parts[1]) - if err != nil { - return nil, fmt.Errorf("Unable to convert limit specified in parallelism-limit=%s to number: %v", limit, err) - } - parallelism[parts[0]] = limitNum - } - return parallelism, nil -} - func newCache(redisAddress string, sentinelAddresses []string, sentinelMaster string) cache.Cache { if redisAddress != "" { client := redis.NewClient(&redis.Options{ diff --git a/reposerver/repository/repository.go b/reposerver/repository/repository.go index fc75c83724..74f0bc2174 100644 --- a/reposerver/repository/repository.go +++ b/reposerver/repository/repository.go @@ -28,6 +28,8 @@ import ( "github.com/argoproj/argo-cd/util/ksonnet" "github.com/argoproj/argo-cd/util/kube" "github.com/argoproj/argo-cd/util/kustomize" + + "golang.org/x/sync/semaphore" ) const ( @@ -37,14 +39,21 @@ const ( // Service implements ManifestService interface type Service struct { - repoLock *util.KeyLock - gitFactory git.ClientFactory - cache cache.Cache + repoLock *util.KeyLock + gitFactory git.ClientFactory + cache cache.Cache + parallelismLimitSemaphore *semaphore.Weighted } // NewService returns a new instance of the Manifest service -func NewService(gitFactory git.ClientFactory, cache cache.Cache) *Service { +func NewService(gitFactory git.ClientFactory, cache cache.Cache, parallelismLimit int64) *Service { + var parallelismLimitSemaphore *semaphore.Weighted + if parallelismLimit > 0 { + parallelismLimitSemaphore = semaphore.NewWeighted(parallelismLimit) + } return &Service{ + parallelismLimitSemaphore: parallelismLimitSemaphore, + repoLock: util.NewKeyLock(), gitFactory: gitFactory, cache: cache, @@ -165,6 +174,14 @@ func (s *Service) GenerateManifest(c context.Context, q *ManifestRequest) (*Mani return cached, nil } + if s.parallelismLimitSemaphore != nil { + err = s.parallelismLimitSemaphore.Acquire(c, 1) + if err != nil { + return nil, err + } + defer s.parallelismLimitSemaphore.Release(1) + } + commitSHA, err = checkoutRevision(gitClient, commitSHA) if err != nil { return nil, err diff --git a/reposerver/server.go b/reposerver/server.go index b8f3544fbb..7bba4557f6 100644 --- a/reposerver/server.go +++ b/reposerver/server.go @@ -1,7 +1,6 @@ package reposerver import ( - "context" "crypto/tls" "github.com/argoproj/argo-cd/reposerver/repository" @@ -14,8 +13,6 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" log "github.com/sirupsen/logrus" - "github.com/yaronsumel/grpc-throttle" - "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/reflection" @@ -23,14 +20,15 @@ import ( // ArgoCDRepoServer is the repo server implementation type ArgoCDRepoServer struct { - log *log.Entry - gitFactory git.ClientFactory - cache cache.Cache - opts []grpc.ServerOption + log *log.Entry + gitFactory git.ClientFactory + cache cache.Cache + opts []grpc.ServerOption + parallelismLimit int64 } // NewServer returns a new instance of the Argo CD Repo server -func NewServer(gitFactory git.ClientFactory, cache cache.Cache, tlsConfCustomizer tlsutil.ConfigCustomizer, parallelismLimit map[string]int) (*ArgoCDRepoServer, error) { +func NewServer(gitFactory git.ClientFactory, cache cache.Cache, tlsConfCustomizer tlsutil.ConfigCustomizer, parallelismLimit int64) (*ArgoCDRepoServer, error) { // generate TLS cert hosts := []string{ "localhost", @@ -52,25 +50,12 @@ func NewServer(gitFactory git.ClientFactory, cache cache.Cache, tlsConfCustomize serverLog := log.NewEntry(log.New()) streamInterceptors := []grpc.StreamServerInterceptor{grpc_logrus.StreamServerInterceptor(serverLog), grpc_util.PanicLoggerStreamServerInterceptor(serverLog)} unaryInterceptors := []grpc.UnaryServerInterceptor{grpc_logrus.UnaryServerInterceptor(serverLog), grpc_util.PanicLoggerUnaryServerInterceptor(serverLog)} - if len(parallelismLimit) > 0 { - var semaphores = throttle.SemaphoreMap{} - for m, l := range parallelismLimit { - semaphores[m] = make(throttle.Semaphore, l) - } - throttler := func(ctx context.Context, fullMethod string) (throttle.Semaphore, bool) { - if s, ok := semaphores[fullMethod]; ok { - return s, true - } - return nil, false - } - streamInterceptors = append(streamInterceptors, throttle.StreamServerInterceptor(throttler)) - unaryInterceptors = append(unaryInterceptors, throttle.UnaryServerInterceptor(throttler)) - } return &ArgoCDRepoServer{ - log: serverLog, - gitFactory: gitFactory, - cache: cache, + log: serverLog, + gitFactory: gitFactory, + cache: cache, + parallelismLimit: parallelismLimit, opts: []grpc.ServerOption{ grpc.Creds(credentials.NewTLS(tlsConfig)), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)), @@ -83,7 +68,7 @@ func NewServer(gitFactory git.ClientFactory, cache cache.Cache, tlsConfCustomize func (a *ArgoCDRepoServer) CreateGRPC() *grpc.Server { server := grpc.NewServer(a.opts...) version.RegisterVersionServiceServer(server, &version.Server{}) - manifestService := repository.NewService(a.gitFactory, a.cache) + manifestService := repository.NewService(a.gitFactory, a.cache, a.parallelismLimit) repository.RegisterRepositoryServiceServer(server, manifestService) // Register reflection service on gRPC server. diff --git a/test/e2e/fixture.go b/test/e2e/fixture.go index d2bed27413..b00bac2185 100644 --- a/test/e2e/fixture.go +++ b/test/e2e/fixture.go @@ -123,7 +123,7 @@ func (f *Fixture) setup() error { } memCache := cache.NewInMemoryCache(repository.DefaultRepoCacheExpiration) - repoSrv, err := reposerver.NewServer(&FakeGitClientFactory{}, memCache, func(config *tls.Config) {}, make(map[string]int)) + repoSrv, err := reposerver.NewServer(&FakeGitClientFactory{}, memCache, func(config *tls.Config) {}, 0) if err != nil { return err }