diff --git a/build/testing/integration/integration.go b/build/testing/integration/integration.go index a96e7175c..b8fac398d 100644 --- a/build/testing/integration/integration.go +++ b/build/testing/integration/integration.go @@ -25,7 +25,7 @@ import ( ) var ( - fliptAddr = flag.String("flipt-addr", "grpc://localhost:9000", "Address for running Flipt instance (gRPC only)") + fliptAddr = flag.String("flipt-addr", "grpc://localhost:9000", "Address for running Flipt instance") fliptToken = flag.String("flipt-token", "", "Full-Access authentication token to be used during test suite") fliptReferences = flag.Bool("flipt-supports-references", false, "Identifies the backend as supporting references") ) diff --git a/cmd/flipt/main.go b/cmd/flipt/main.go index a9f18182e..3469f4dc6 100644 --- a/cmd/flipt/main.go +++ b/cmd/flipt/main.go @@ -352,13 +352,20 @@ func run(ctx context.Context, logger *zap.Logger, cfg *config.Config) error { // acts as a registry for all grpc services so they can be shared between // the grpc server and the in-process client connection - ipch := &inprocgrpc.Channel{} + var ( + ipch = &inprocgrpc.Channel{} + handlers = &grpchan.HandlerMap{} + ) - grpcServer, err := cmd.NewGRPCServer(ctx, logger, cfg, ipch, info, forceMigrate) + handlers.ForEach(ipch.RegisterService) + + grpcServer, err := cmd.NewGRPCServer(ctx, logger, cfg, handlers, info, forceMigrate) if err != nil { return err } + handlers.ForEach(grpcServer.RegisterService) + // register all grpc services onto the grpc server // starts grpc server g.Go(grpcServer.Run) diff --git a/internal/cmd/authn.go b/internal/cmd/authn.go index b8617f75d..082a0ce97 100644 --- a/internal/cmd/authn.go +++ b/internal/cmd/authn.go @@ -9,7 +9,7 @@ import ( "regexp" "strings" - "github.com/fullstorydev/grpchan/inprocgrpc" + "github.com/fullstorydev/grpchan" "github.com/go-chi/chi/v5" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/selector" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" @@ -91,7 +91,7 @@ func authenticationGRPC( ctx context.Context, logger *zap.Logger, cfg *config.Config, - ipch *inprocgrpc.Channel, + handlers *grpchan.HandlerMap, forceMigrate bool, tokenDeletedEnabled bool, authOpts ...containers.Option[authmiddlewaregrpc.InterceptorOptions], @@ -108,8 +108,8 @@ func authenticationGRPC( // All that is required to establish a connection for authentication is to either make auth required // or configure at-least one authentication method (e.g. enable token method). if !authCfg.Enabled() && (cfg.Storage.Type != config.DatabaseStorageType) { - rpcauth.RegisterPublicAuthenticationServiceServer(ipch, public.NewServer(logger, authCfg)) - rpcauth.RegisterAuthenticationServiceServer(ipch, authn.NewServer(logger, storageauthmemory.NewStore())) + rpcauth.RegisterPublicAuthenticationServiceServer(handlers, public.NewServer(logger, authCfg)) + rpcauth.RegisterAuthenticationServiceServer(handlers, authn.NewServer(logger, storageauthmemory.NewStore())) return nil, shutdown, nil } @@ -128,8 +128,8 @@ func authenticationGRPC( var interceptors []grpc.UnaryServerInterceptor - rpcauth.RegisterPublicAuthenticationServiceServer(ipch, public.NewServer(logger, authCfg)) - rpcauth.RegisterAuthenticationServiceServer(ipch, authn.NewServer(logger, store, authn.WithAuditLoggingEnabled(tokenDeletedEnabled))) + rpcauth.RegisterPublicAuthenticationServiceServer(handlers, public.NewServer(logger, authCfg)) + rpcauth.RegisterAuthenticationServiceServer(handlers, authn.NewServer(logger, store, authn.WithAuditLoggingEnabled(tokenDeletedEnabled))) // register auth method token service if authCfg.Methods.Token.Enabled { @@ -160,20 +160,20 @@ func authenticationGRPC( logger.Info("access token created", zap.String("client_token", clientToken)) } - rpcauth.RegisterAuthenticationMethodTokenServiceServer(ipch, authtoken.NewServer(logger, store)) + rpcauth.RegisterAuthenticationMethodTokenServiceServer(handlers, authtoken.NewServer(logger, store)) logger.Debug("authentication method \"token\" server registered") } // register auth method oidc service if authCfg.Methods.OIDC.Enabled { - rpcauth.RegisterAuthenticationMethodOIDCServiceServer(ipch, authoidc.NewServer(logger, store, authCfg)) + rpcauth.RegisterAuthenticationMethodOIDCServiceServer(handlers, authoidc.NewServer(logger, store, authCfg)) logger.Debug("authentication method \"oidc\" server registered") } if authCfg.Methods.Github.Enabled { - rpcauth.RegisterAuthenticationMethodGithubServiceServer(ipch, authgithub.NewServer(logger, store, authCfg)) + rpcauth.RegisterAuthenticationMethodGithubServiceServer(handlers, authgithub.NewServer(logger, store, authCfg)) logger.Debug("authentication method \"github\" registered") } @@ -183,7 +183,7 @@ func authenticationGRPC( if err != nil { return nil, nil, fmt.Errorf("configuring kubernetes authentication: %w", err) } - rpcauth.RegisterAuthenticationMethodKubernetesServiceServer(ipch, kubernetesServer) + rpcauth.RegisterAuthenticationMethodKubernetesServiceServer(handlers, kubernetesServer) logger.Debug("authentication method \"kubernetes\" server registered") } diff --git a/internal/cmd/grpc.go b/internal/cmd/grpc.go index 6b413b5bf..c5d1d6164 100644 --- a/internal/cmd/grpc.go +++ b/internal/cmd/grpc.go @@ -16,7 +16,7 @@ import ( "go.opentelemetry.io/contrib/propagators/autoprop" sq "github.com/Masterminds/squirrel" - "github.com/fullstorydev/grpchan/inprocgrpc" + "github.com/fullstorydev/grpchan" "go.flipt.io/flipt/internal/cache" "go.flipt.io/flipt/internal/cache/memory" "go.flipt.io/flipt/internal/cache/redis" @@ -69,7 +69,6 @@ import ( "google.golang.org/grpc/reflection" "google.golang.org/grpc/status" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" @@ -99,7 +98,7 @@ func NewGRPCServer( ctx context.Context, logger *zap.Logger, cfg *config.Config, - ipch *inprocgrpc.Channel, + handlers *grpchan.HandlerMap, info info.Flipt, forceMigrate bool, ) (*GRPCServer, error) { @@ -292,7 +291,7 @@ func NewGRPCServer( ctx, logger, cfg, - ipch, + handlers, forceMigrate, tokenDeletedEnabled, authnOpts..., @@ -318,24 +317,24 @@ func NewGRPCServer( server.onShutdown(func(ctx context.Context) error { return analyticsExporter.Shutdown(ctx) }) - rpcanalytics.RegisterAnalyticsServiceServer(ipch, analytics.New(logger, client)) + rpcanalytics.RegisterAnalyticsServiceServer(handlers, analytics.New(logger, client)) logger.Debug("analytics enabled", zap.String("database", client.String()), zap.String("flush_period", cfg.Analytics.Buffer.FlushPeriod.String())) } else if cfg.Analytics.Storage.Prometheus.Enabled { client, err := prometheus.New(logger, cfg) if err != nil { return nil, err } - rpcanalytics.RegisterAnalyticsServiceServer(ipch, analytics.New(logger, client)) + rpcanalytics.RegisterAnalyticsServiceServer(handlers, analytics.New(logger, client)) logger.Debug("analytics enabled", zap.String("database", client.String())) } } // register servers - rpcflipt.RegisterFliptServer(ipch, fliptsrv) - rpcmeta.RegisterMetadataServiceServer(ipch, metasrv) - rpcevaluation.RegisterEvaluationServiceServer(ipch, evalsrv) - rpcevaluation.RegisterDataServiceServer(ipch, evaldatasrv) - rpcoffrep.RegisterOFREPServiceServer(ipch, ofrepsrv) + rpcflipt.RegisterFliptServer(handlers, fliptsrv) + rpcmeta.RegisterMetadataServiceServer(handlers, metasrv) + rpcevaluation.RegisterEvaluationServiceServer(handlers, evalsrv) + rpcevaluation.RegisterDataServiceServer(handlers, evaldatasrv) + rpcoffrep.RegisterOFREPServiceServer(handlers, ofrepsrv) // forward internal gRPC logging to zap grpcLogLevel, err := zapcore.ParseLevel(cfg.Log.GRPCLevel) @@ -488,8 +487,7 @@ func NewGRPCServer( // initialize grpc server server.Server = grpc.NewServer(grpcOpts...) - grpc_health.RegisterHealthServer(ipch, healthsrv) - ipch.WithServerUnaryInterceptor(grpc_middleware.ChainUnaryServer(interceptors...)) + grpc_health.RegisterHealthServer(handlers, healthsrv) // register grpcServer graceful stop on shutdown server.onShutdown(func(context.Context) error { diff --git a/internal/cmd/http.go b/internal/cmd/http.go index 8062f6dcb..7fe95c41d 100644 --- a/internal/cmd/http.go +++ b/internal/cmd/http.go @@ -185,9 +185,7 @@ func NewHTTPServer( // mount the metadata service to the chi router under /meta. r.Mount("/meta", runtime.NewServeMux( - func(mux *runtime.ServeMux) { - meta.RegisterMetadataServiceHandlerClient(ctx, mux, meta.NewMetadataServiceClient(conn)) - }, + register(ctx, meta.NewMetadataServiceClient(conn), meta.RegisterMetadataServiceHandlerClient), runtime.WithMetadata(method.ForwardPrefix), )) })