chore: use grpc-middleware interceptors (#22329)

Signed-off-by: Matthieu MOREL <matthieu.morel35@gmail.com>
This commit is contained in:
Matthieu MOREL 2025-03-13 16:25:38 +01:00 committed by GitHub
parent cae840bb13
commit 83257a9e73
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 17 additions and 39 deletions

View file

@ -9,6 +9,7 @@ import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
@ -52,13 +53,13 @@ func NewServer(initConstants plugin.CMPServerInitConstants) (*ArgoCDCMPServer, e
otelgrpc.StreamServerInterceptor(), //nolint:staticcheck // TODO: ignore SA1019 for depreciation: see https://github.com/argoproj/argo-cd/issues/18258
logging.StreamServerInterceptor(grpc_util.InterceptorLogger(serverLog)),
serverMetrics.StreamServerInterceptor(),
grpc_util.PanicLoggerStreamServerInterceptor(serverLog),
recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpc_util.LoggerRecoveryHandler(serverLog))),
}
unaryInterceptors := []grpc.UnaryServerInterceptor{
otelgrpc.UnaryServerInterceptor(), //nolint:staticcheck // TODO: ignore SA1019 for depreciation: see https://github.com/argoproj/argo-cd/issues/18258
logging.UnaryServerInterceptor(grpc_util.InterceptorLogger(serverLog)),
serverMetrics.UnaryServerInterceptor(),
grpc_util.PanicLoggerUnaryServerInterceptor(serverLog),
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpc_util.LoggerRecoveryHandler(serverLog))),
}
serverOpts := []grpc.ServerOption{

View file

@ -12,6 +12,7 @@ import (
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/timeout"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@ -60,7 +61,7 @@ func NewConnection(address string, timeoutSeconds int, tlsConfig *TLSConfigurati
}
unaryInterceptors := []grpc.UnaryClientInterceptor{grpc_retry.UnaryClientInterceptor(retryOpts...)}
if timeoutSeconds > 0 {
unaryInterceptors = append(unaryInterceptors, argogrpc.WithTimeout(time.Duration(timeoutSeconds)*time.Second))
unaryInterceptors = append(unaryInterceptors, timeout.UnaryClientInterceptor(time.Duration(timeoutSeconds)*time.Second))
}
opts := []grpc.DialOption{
grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(retryOpts...)),

View file

@ -8,6 +8,7 @@ import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
@ -76,13 +77,13 @@ func NewServer(metricsServer *metrics.MetricsServer, cache *reposervercache.Cach
otelgrpc.StreamServerInterceptor(), //nolint:staticcheck // TODO: ignore SA1019 for depreciation: see https://github.com/argoproj/argo-cd/issues/18258
logging.StreamServerInterceptor(grpc_util.InterceptorLogger(serverLog)),
serverMetrics.StreamServerInterceptor(),
grpc_util.PanicLoggerStreamServerInterceptor(serverLog),
recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpc_util.LoggerRecoveryHandler(serverLog))),
}
unaryInterceptors := []grpc.UnaryServerInterceptor{
otelgrpc.UnaryServerInterceptor(), //nolint:staticcheck // TODO: ignore SA1019 for depreciation: see https://github.com/argoproj/argo-cd/issues/18258
logging.UnaryServerInterceptor(grpc_util.InterceptorLogger(serverLog)),
serverMetrics.UnaryServerInterceptor(),
grpc_util.PanicLoggerUnaryServerInterceptor(serverLog),
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpc_util.LoggerRecoveryHandler(serverLog))),
grpc_util.ErrorSanitizerUnaryServerInterceptor(),
}

View file

@ -35,6 +35,7 @@ import (
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/improbable-eng/grpc-web/go/grpcweb"
"github.com/prometheus/client_golang/prometheus"
@ -943,7 +944,7 @@ func (server *ArgoCDServer) newGRPCServer() (*grpc.Server, application.AppResour
}),
grpc_util.ErrorCodeK8sStreamServerInterceptor(),
grpc_util.ErrorCodeGitStreamServerInterceptor(),
grpc_util.PanicLoggerStreamServerInterceptor(server.log),
recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpc_util.LoggerRecoveryHandler(server.log))),
))
sOpts = append(sOpts, grpc.ChainUnaryInterceptor(
bug21955WorkaroundInterceptor,
@ -957,7 +958,7 @@ func (server *ArgoCDServer) newGRPCServer() (*grpc.Server, application.AppResour
}),
grpc_util.ErrorCodeK8sUnaryServerInterceptor(),
grpc_util.ErrorCodeGitUnaryServerInterceptor(),
grpc_util.PanicLoggerUnaryServerInterceptor(server.log),
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpc_util.LoggerRecoveryHandler(server.log))),
))
grpcS := grpc.NewServer(sOpts...)

View file

@ -9,6 +9,7 @@ import (
"strings"
"time"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"github.com/sirupsen/logrus"
"golang.org/x/net/proxy"
"google.golang.org/grpc"
@ -21,29 +22,11 @@ import (
"github.com/argoproj/argo-cd/v3/common"
)
// PanicLoggerUnaryServerInterceptor returns a new unary server interceptor for recovering from panics and returning error
func PanicLoggerUnaryServerInterceptor(log *logrus.Entry) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ any, err error) {
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
err = status.Errorf(codes.Internal, "%s", r)
}
}()
return handler(ctx, req)
}
}
// PanicLoggerStreamServerInterceptor returns a new streaming server interceptor for recovering from panics and returning error
func PanicLoggerStreamServerInterceptor(log *logrus.Entry) grpc.StreamServerInterceptor {
return func(srv any, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
err = status.Errorf(codes.Internal, "%s", r)
}
}()
return handler(srv, stream)
// LoggerRecoveryHandler return a handler for recovering from panics and returning error
func LoggerRecoveryHandler(log *logrus.Entry) recovery.RecoveryHandlerFunc {
return func(p any) (err error) {
log.Errorf("Recovered from panic: %+v\n%s", p, debug.Stack())
return status.Errorf(codes.Internal, "%s", p)
}
}
@ -167,12 +150,3 @@ func TestTLS(address string, dialTime time.Duration) (*TLSTestResult, error) {
}
return nil, err
}
func WithTimeout(duration time.Duration) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
clientDeadline := time.Now().Add(duration)
ctx, cancel := context.WithDeadline(ctx, clientDeadline)
defer cancel()
return invoker(ctx, method, req, reply, cc, opts...)
}
}