Replace grpc repo-server parallelism limit interceptor with semaphore (#1029)

This commit is contained in:
Alexander Matyushentsev 2019-01-16 15:56:26 -08:00 committed by GitHub
parent 2988cebaa9
commit 4140c9867d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 41 additions and 65 deletions

15
Gopkg.lock generated
View file

@ -759,14 +759,6 @@
revision = "640f0ab560aeb89d523bb6ac322b1244d5c3796c"
version = "v0.2.0"
[[projects]]
branch = "master"
digest = "1:413f9e6cd1e2fe339c1d4a5e36de4e287d9edd575dd63e89b728351b385aadfa"
name = "github.com/yaronsumel/grpc-throttle"
packages = ["."]
pruneopts = ""
revision = "cd1107b3c0b1e9589c5837a847f6cd51188ef11e"
[[projects]]
branch = "master"
digest = "1:3cf699a0df65293cc8fd2339606950d3e2f6d02a435703951d1da411a23f7cef"
@ -847,7 +839,10 @@
branch = "master"
digest = "1:b2ea75de0ccb2db2ac79356407f8a4cd8f798fe15d41b381c00abf3ae8e55ed1"
name = "golang.org/x/sync"
packages = ["errgroup"]
packages = [
"errgroup",
"semaphore",
]
pruneopts = ""
revision = "1d60e4601c6fd243af51cc01ddf169918a5407ca"
@ -1442,7 +1437,6 @@
"github.com/stretchr/testify/assert",
"github.com/stretchr/testify/mock",
"github.com/vmihailenco/msgpack",
"github.com/yaronsumel/grpc-throttle",
"github.com/yudai/gojsondiff",
"github.com/yudai/gojsondiff/formatter",
"golang.org/x/crypto/bcrypt",
@ -1451,6 +1445,7 @@
"golang.org/x/net/context",
"golang.org/x/oauth2",
"golang.org/x/sync/errgroup",
"golang.org/x/sync/semaphore",
"google.golang.org/genproto/googleapis/api/annotations",
"google.golang.org/grpc",
"google.golang.org/grpc/codes",

View file

@ -4,8 +4,6 @@ import (
"fmt"
"net"
"os"
"strconv"
"strings"
"time"
"github.com/go-redis/redis"
@ -36,7 +34,7 @@ func newCommand() *cobra.Command {
redisAddress string
sentinelAddresses []string
sentinelMaster string
parallelismLimit []string
parallelismLimit int64
tlsConfigCustomizerSrc func() (tls.ConfigCustomizer, error)
)
var command = cobra.Command{
@ -48,9 +46,7 @@ func newCommand() *cobra.Command {
tlsConfigCustomizer, err := tlsConfigCustomizerSrc()
errors.CheckError(err)
parallelism, err := parseParallelismLimit(parallelismLimit)
errors.CheckError(err)
server, err := reposerver.NewServer(git.NewFactory(), newCache(redisAddress, sentinelAddresses, sentinelMaster), tlsConfigCustomizer, parallelism)
server, err := reposerver.NewServer(git.NewFactory(), newCache(redisAddress, sentinelAddresses, sentinelMaster), tlsConfigCustomizer, parallelismLimit)
errors.CheckError(err)
grpc := server.CreateGRPC()
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
@ -74,28 +70,11 @@ func newCommand() *cobra.Command {
command.Flags().StringVar(&redisAddress, "redis", "", "Redis server hostname and port (e.g. argocd-redis:6379). ")
command.Flags().StringArrayVar(&sentinelAddresses, "sentinel", []string{}, "Redis sentinel hostname and port (e.g. argocd-redis-ha-announce-0:6379). ")
command.Flags().StringVar(&sentinelMaster, "sentinelmaster", "master", "Redis sentinel master group name.")
command.Flags().StringArrayVar(&parallelismLimit,
"parallelism-limit", []string{}, "Sets parallelism limit for grpc method (e.g. /repository.RepositoryService/GenerateManifest=10). ")
command.Flags().Int64Var(&parallelismLimit, "parallelismlimit", 0, "Limit on number of concurrent manifests generate requests. Any value less the 1 means no limit.")
tlsConfigCustomizerSrc = tls.AddTLSFlagsToCmd(&command)
return &command
}
func parseParallelismLimit(parallelismLimit []string) (map[string]int, error) {
parallelism := make(map[string]int)
for _, limit := range parallelismLimit {
parts := strings.Split(limit, "=")
if len(parts) != 2 {
return nil, fmt.Errorf("Expected parallelism-limit form is: grpc-method-name=number. Received: %s.", limit)
}
limitNum, err := strconv.Atoi(parts[1])
if err != nil {
return nil, fmt.Errorf("Unable to convert limit specified in parallelism-limit=%s to number: %v", limit, err)
}
parallelism[parts[0]] = limitNum
}
return parallelism, nil
}
func newCache(redisAddress string, sentinelAddresses []string, sentinelMaster string) cache.Cache {
if redisAddress != "" {
client := redis.NewClient(&redis.Options{

View file

@ -28,6 +28,8 @@ import (
"github.com/argoproj/argo-cd/util/ksonnet"
"github.com/argoproj/argo-cd/util/kube"
"github.com/argoproj/argo-cd/util/kustomize"
"golang.org/x/sync/semaphore"
)
const (
@ -37,14 +39,21 @@ const (
// Service implements ManifestService interface
type Service struct {
repoLock *util.KeyLock
gitFactory git.ClientFactory
cache cache.Cache
repoLock *util.KeyLock
gitFactory git.ClientFactory
cache cache.Cache
parallelismLimitSemaphore *semaphore.Weighted
}
// NewService returns a new instance of the Manifest service
func NewService(gitFactory git.ClientFactory, cache cache.Cache) *Service {
func NewService(gitFactory git.ClientFactory, cache cache.Cache, parallelismLimit int64) *Service {
var parallelismLimitSemaphore *semaphore.Weighted
if parallelismLimit > 0 {
parallelismLimitSemaphore = semaphore.NewWeighted(parallelismLimit)
}
return &Service{
parallelismLimitSemaphore: parallelismLimitSemaphore,
repoLock: util.NewKeyLock(),
gitFactory: gitFactory,
cache: cache,
@ -165,6 +174,14 @@ func (s *Service) GenerateManifest(c context.Context, q *ManifestRequest) (*Mani
return cached, nil
}
if s.parallelismLimitSemaphore != nil {
err = s.parallelismLimitSemaphore.Acquire(c, 1)
if err != nil {
return nil, err
}
defer s.parallelismLimitSemaphore.Release(1)
}
commitSHA, err = checkoutRevision(gitClient, commitSHA)
if err != nil {
return nil, err

View file

@ -1,7 +1,6 @@
package reposerver
import (
"context"
"crypto/tls"
"github.com/argoproj/argo-cd/reposerver/repository"
@ -14,8 +13,6 @@ import (
"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
log "github.com/sirupsen/logrus"
"github.com/yaronsumel/grpc-throttle"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/reflection"
@ -23,14 +20,15 @@ import (
// ArgoCDRepoServer is the repo server implementation
type ArgoCDRepoServer struct {
log *log.Entry
gitFactory git.ClientFactory
cache cache.Cache
opts []grpc.ServerOption
log *log.Entry
gitFactory git.ClientFactory
cache cache.Cache
opts []grpc.ServerOption
parallelismLimit int64
}
// NewServer returns a new instance of the Argo CD Repo server
func NewServer(gitFactory git.ClientFactory, cache cache.Cache, tlsConfCustomizer tlsutil.ConfigCustomizer, parallelismLimit map[string]int) (*ArgoCDRepoServer, error) {
func NewServer(gitFactory git.ClientFactory, cache cache.Cache, tlsConfCustomizer tlsutil.ConfigCustomizer, parallelismLimit int64) (*ArgoCDRepoServer, error) {
// generate TLS cert
hosts := []string{
"localhost",
@ -52,25 +50,12 @@ func NewServer(gitFactory git.ClientFactory, cache cache.Cache, tlsConfCustomize
serverLog := log.NewEntry(log.New())
streamInterceptors := []grpc.StreamServerInterceptor{grpc_logrus.StreamServerInterceptor(serverLog), grpc_util.PanicLoggerStreamServerInterceptor(serverLog)}
unaryInterceptors := []grpc.UnaryServerInterceptor{grpc_logrus.UnaryServerInterceptor(serverLog), grpc_util.PanicLoggerUnaryServerInterceptor(serverLog)}
if len(parallelismLimit) > 0 {
var semaphores = throttle.SemaphoreMap{}
for m, l := range parallelismLimit {
semaphores[m] = make(throttle.Semaphore, l)
}
throttler := func(ctx context.Context, fullMethod string) (throttle.Semaphore, bool) {
if s, ok := semaphores[fullMethod]; ok {
return s, true
}
return nil, false
}
streamInterceptors = append(streamInterceptors, throttle.StreamServerInterceptor(throttler))
unaryInterceptors = append(unaryInterceptors, throttle.UnaryServerInterceptor(throttler))
}
return &ArgoCDRepoServer{
log: serverLog,
gitFactory: gitFactory,
cache: cache,
log: serverLog,
gitFactory: gitFactory,
cache: cache,
parallelismLimit: parallelismLimit,
opts: []grpc.ServerOption{
grpc.Creds(credentials.NewTLS(tlsConfig)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
@ -83,7 +68,7 @@ func NewServer(gitFactory git.ClientFactory, cache cache.Cache, tlsConfCustomize
func (a *ArgoCDRepoServer) CreateGRPC() *grpc.Server {
server := grpc.NewServer(a.opts...)
version.RegisterVersionServiceServer(server, &version.Server{})
manifestService := repository.NewService(a.gitFactory, a.cache)
manifestService := repository.NewService(a.gitFactory, a.cache, a.parallelismLimit)
repository.RegisterRepositoryServiceServer(server, manifestService)
// Register reflection service on gRPC server.

View file

@ -123,7 +123,7 @@ func (f *Fixture) setup() error {
}
memCache := cache.NewInMemoryCache(repository.DefaultRepoCacheExpiration)
repoSrv, err := reposerver.NewServer(&FakeGitClientFactory{}, memCache, func(config *tls.Config) {}, make(map[string]int))
repoSrv, err := reposerver.NewServer(&FakeGitClientFactory{}, memCache, func(config *tls.Config) {}, 0)
if err != nil {
return err
}