Skip to content

Commit

Permalink
refactor(server/v2): eager config loading (#22267)
Browse files Browse the repository at this point in the history
  • Loading branch information
kocubinski authored Oct 29, 2024
1 parent 7b9a89b commit 31f97e9
Show file tree
Hide file tree
Showing 39 changed files with 1,045 additions and 676 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/runtime/v2/ @julienrbrt @hieuvubk @cosmos/sdk-core-dev
/schema/ @aaronc @testinginprod @cosmos/sdk-core-dev
/server/ @cosmos/sdk-core-dev
/server/v2/ @julienrbrt @hieuvubk @cosmos/sdk-core-dev
/server/v2/ @julienrbrt @hieuvubk @kocubinski @cosmos/sdk-core-dev
/server/v2/stf/ @testinginprod @kocubinski @cosmos/sdk-core-dev
/server/v2/appmanager/ @testinginprod @facundomedica @cosmos/sdk-core-dev
/server/v2/cometbft/ @facundomedica @sontrinh16 @cosmos/sdk-core-dev
Expand Down
46 changes: 46 additions & 0 deletions runtime/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package runtime

import (
"cosmossdk.io/core/server"
"cosmossdk.io/depinject"
)

// ModuleConfigMaps is a map module scoped ConfigMaps
type ModuleConfigMaps map[string]server.ConfigMap

type ModuleConfigMapsInput struct {
depinject.In

ModuleConfigs []server.ModuleConfigMap
DynamicConfig server.DynamicConfig `optional:"true"`
}

// ProvideModuleConfigMaps returns a map of module name to module config map.
// The module config map is a map of flag to value.
func ProvideModuleConfigMaps(in ModuleConfigMapsInput) ModuleConfigMaps {
moduleConfigMaps := make(ModuleConfigMaps)
if in.DynamicConfig == nil {
return moduleConfigMaps
}
for _, moduleConfig := range in.ModuleConfigs {
cfg := moduleConfig.Config
name := moduleConfig.Module
moduleConfigMaps[name] = make(server.ConfigMap)
for flag, df := range cfg {
val := in.DynamicConfig.Get(flag)
if val != nil {
moduleConfigMaps[name][flag] = val
} else {
moduleConfigMaps[name][flag] = df
}
}
}
return moduleConfigMaps
}

func ProvideModuleScopedConfigMap(
key depinject.ModuleKey,
moduleConfigs ModuleConfigMaps,
) server.ConfigMap {
return moduleConfigs[key.Name()]
}
2 changes: 2 additions & 0 deletions runtime/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ func init() {
ProvideTransientStoreService,
ProvideModuleManager,
ProvideCometService,
ProvideModuleConfigMaps,
ProvideModuleScopedConfigMap,
),
appconfig.Invoke(SetupAppBuilder),
)
Expand Down
2 changes: 1 addition & 1 deletion runtime/v2/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (a *App[T]) LoadLatestHeight() (uint64, error) {
return a.db.GetLatestVersion()
}

// GetQueryHandlers returns the query handlers.
// QueryHandlers returns the query handlers.
func (a *App[T]) QueryHandlers() map[string]appmodulev2.Handler {
return a.queryHandlers
}
Expand Down
10 changes: 6 additions & 4 deletions runtime/v2/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
type AppBuilder[T transaction.Tx] struct {
app *App[T]
storeBuilder root.Builder
storeConfig *root.Config

// the following fields are used to overwrite the default
branch func(state store.ReaderMap) store.WriterMap
Expand Down Expand Up @@ -82,12 +83,13 @@ func (a *AppBuilder[T]) Build(opts ...AppBuilderOption[T]) (*App[T], error) {
}
}

a.app.db = a.storeBuilder.Get()
if a.app.db == nil {
return nil, fmt.Errorf("storeBuilder did not return a db")
var err error
a.app.db, err = a.storeBuilder.Build(a.app.logger, a.storeConfig)
if err != nil {
return nil, err
}

if err := a.app.moduleManager.RegisterServices(a.app); err != nil {
if err = a.app.moduleManager.RegisterServices(a.app); err != nil {
return nil, err
}

Expand Down
67 changes: 67 additions & 0 deletions runtime/v2/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package runtime

import (
"strings"

"cosmossdk.io/core/server"
"cosmossdk.io/depinject"
)

// GlobalConfig is a recursive configuration map containing configuration
// key-value pairs parsed from the configuration file, flags, or other
// input sources.
//
// It is aliased to server.ConfigMap so that DI can distinguish between
// module-scoped and global configuration maps. In the DI container `server.ConfigMap`
// objects are module-scoped and `GlobalConfig` is global-scoped.
type GlobalConfig server.ConfigMap

// ModuleConfigMaps is a map module scoped ConfigMaps
type ModuleConfigMaps map[string]server.ConfigMap

// ProvideModuleConfigMaps returns a map of module name to module config map.
// The module config map is a map of flag to value.
func ProvideModuleConfigMaps(
moduleConfigs []server.ModuleConfigMap,
globalConfig GlobalConfig,
) ModuleConfigMaps {
moduleConfigMaps := make(ModuleConfigMaps)
for _, moduleConfig := range moduleConfigs {
cfg := moduleConfig.Config
name := moduleConfig.Module
moduleConfigMaps[name] = make(server.ConfigMap)
for flag, df := range cfg {
m := globalConfig
fetchFlag := flag
// splitting on "." is required to handle nested flags which are defined
// in other modules that are not the current module
// for example: "server.minimum-gas-prices" is defined in the server module
// but required by x/validate
for _, part := range strings.Split(flag, ".") {
if maybeMap, ok := m[part]; ok {
innerMap, ok := maybeMap.(map[string]any)
if !ok {
fetchFlag = part
break
}
m = innerMap
} else {
break
}
}
if val, ok := m[fetchFlag]; ok {
moduleConfigMaps[name][flag] = val
} else {
moduleConfigMaps[name][flag] = df
}
}
}
return moduleConfigMaps
}

func ProvideModuleScopedConfigMap(
key depinject.ModuleKey,
moduleConfigs ModuleConfigMaps,
) server.ConfigMap {
return moduleConfigs[key.Name()]
}
6 changes: 5 additions & 1 deletion runtime/v2/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ func init() {
ProvideModuleManager[transaction.Tx],
ProvideEnvironment,
ProvideKVService,
ProvideModuleConfigMaps,
ProvideModuleScopedConfigMap,
),
appconfig.Invoke(SetupAppBuilder),
)
Expand All @@ -108,6 +110,7 @@ func ProvideAppBuilder[T transaction.Tx](
interfaceRegistrar registry.InterfaceRegistrar,
amino registry.AminoRegistrar,
storeBuilder root.Builder,
storeConfig *root.Config,
) (
*AppBuilder[T],
*stf.MsgRouterBuilder,
Expand All @@ -134,14 +137,15 @@ func ProvideAppBuilder[T transaction.Tx](
queryHandlers: map[string]appmodulev2.Handler{},
storeLoader: DefaultStoreLoader,
}
appBuilder := &AppBuilder[T]{app: app, storeBuilder: storeBuilder}
appBuilder := &AppBuilder[T]{app: app, storeBuilder: storeBuilder, storeConfig: storeConfig}

return appBuilder, msgRouterBuilder, appModule[T]{app}, protoFiles, protoTypes
}

type AppInputs struct {
depinject.In

StoreConfig *root.Config
Config *runtimev2.Module
AppBuilder *AppBuilder[transaction.Tx]
ModuleManager *MM[transaction.Tx]
Expand Down
53 changes: 32 additions & 21 deletions server/v2/api/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"google.golang.org/protobuf/reflect/protoreflect"

appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/server"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
serverv2 "cosmossdk.io/server/v2"
Expand All @@ -42,40 +43,50 @@ type Server[T transaction.Tx] struct {
}

// New creates a new grpc server.
func New[T transaction.Tx](cfgOptions ...CfgOption) *Server[T] {
return &Server[T]{
func New[T transaction.Tx](
logger log.Logger,
interfaceRegistry server.InterfaceRegistry,
queryHandlers map[string]appmodulev2.Handler,
queryable interface {
Query(ctx context.Context, version uint64, msg transaction.Msg) (transaction.Msg, error)
},
cfg server.ConfigMap,
cfgOptions ...CfgOption,
) (*Server[T], error) {
srv := &Server[T]{
cfgOptions: cfgOptions,
}
}

// Init returns a correctly configured and initialized gRPC server.
// Note, the caller is responsible for starting the server.
func (s *Server[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logger log.Logger) error {
serverCfg := s.Config().(*Config)
serverCfg := srv.Config().(*Config)
if len(cfg) > 0 {
if err := serverv2.UnmarshalSubConfig(cfg, s.Name(), &serverCfg); err != nil {
return fmt.Errorf("failed to unmarshal config: %w", err)
if err := serverv2.UnmarshalSubConfig(cfg, srv.Name(), &serverCfg); err != nil {
return nil, fmt.Errorf("failed to unmarshal config: %w", err)
}
}
methodsMap := appI.QueryHandlers()

grpcSrv := grpc.NewServer(
grpc.ForceServerCodec(newProtoCodec(appI.InterfaceRegistry()).GRPCCodec()),
grpc.ForceServerCodec(newProtoCodec(interfaceRegistry).GRPCCodec()),
grpc.MaxSendMsgSize(serverCfg.MaxSendMsgSize),
grpc.MaxRecvMsgSize(serverCfg.MaxRecvMsgSize),
grpc.UnknownServiceHandler(
makeUnknownServiceHandler(methodsMap, appI),
),
grpc.UnknownServiceHandler(makeUnknownServiceHandler(queryHandlers, queryable)),
)

// Reflection allows external clients to see what services and methods the gRPC server exposes.
gogoreflection.Register(grpcSrv, slices.Collect(maps.Keys(methodsMap)), logger.With("sub-module", "grpc-reflection"))
gogoreflection.Register(grpcSrv, slices.Collect(maps.Keys(queryHandlers)), logger.With("sub-module", "grpc-reflection"))

s.grpcSrv = grpcSrv
s.config = serverCfg
s.logger = logger.With(log.ModuleKey, s.Name())
srv.grpcSrv = grpcSrv
srv.config = serverCfg
srv.logger = logger.With(log.ModuleKey, srv.Name())

return nil
return srv, nil
}

// NewWithConfigOptions creates a new GRPC server with the provided config options.
// It is *not* a fully functional server (since it has been created without dependencies)
// The returned server should only be used to get and set configuration.
func NewWithConfigOptions[T transaction.Tx](opts ...CfgOption) *Server[T] {
return &Server[T]{
cfgOptions: opts,
}
}

func (s *Server[T]) StartCmdFlags() *pflag.FlagSet {
Expand Down Expand Up @@ -186,7 +197,7 @@ func (s *Server[T]) Config() any {
return s.config
}

func (s *Server[T]) Start(ctx context.Context) error {
func (s *Server[T]) Start(context.Context) error {
if !s.config.Enable {
s.logger.Info(fmt.Sprintf("%s server is disabled via config", s.Name()))
return nil
Expand Down
41 changes: 23 additions & 18 deletions server/v2/api/grpcgateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"

"cosmossdk.io/core/server"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
serverv2 "cosmossdk.io/server/v2"
Expand All @@ -34,7 +35,13 @@ type Server[T transaction.Tx] struct {
}

// New creates a new gRPC-gateway server.
func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptions ...CfgOption) *Server[T] {
func New[T transaction.Tx](
logger log.Logger,
config server.ConfigMap,
grpcSrv *grpc.Server,
ir jsonpb.AnyResolver,
cfgOptions ...CfgOption,
) (*Server[T], error) {
// The default JSON marshaller used by the gRPC-Gateway is unable to marshal non-nullable non-scalar fields.
// Using the gogo/gateway package with the gRPC-Gateway WithMarshaler option fixes the scalar field marshaling issue.
marshalerOption := &gateway.JSONPb{
Expand All @@ -44,7 +51,7 @@ func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptio
AnyResolver: ir,
}

return &Server[T]{
s := &Server[T]{
gRPCSrv: grpcSrv,
gRPCGatewayRouter: runtime.NewServeMux(
// Custom marshaler option is required for gogo proto
Expand All @@ -60,6 +67,20 @@ func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptio
),
cfgOptions: cfgOptions,
}

serverCfg := s.Config().(*Config)
if len(config) > 0 {
if err := serverv2.UnmarshalSubConfig(config, s.Name(), &serverCfg); err != nil {
return s, fmt.Errorf("failed to unmarshal config: %w", err)
}
}

// TODO: register the gRPC-Gateway routes

s.logger = logger.With(log.ModuleKey, s.Name())
s.config = serverCfg

return s, nil
}

func (s *Server[T]) Name() string {
Expand All @@ -80,22 +101,6 @@ func (s *Server[T]) Config() any {
return s.config
}

func (s *Server[T]) Init(appI serverv2.AppI[transaction.Tx], cfg map[string]any, logger log.Logger) error {
serverCfg := s.Config().(*Config)
if len(cfg) > 0 {
if err := serverv2.UnmarshalSubConfig(cfg, s.Name(), &serverCfg); err != nil {
return fmt.Errorf("failed to unmarshal config: %w", err)
}
}

// TODO: register the gRPC-Gateway routes

s.logger = logger.With(log.ModuleKey, s.Name())
s.config = serverCfg

return nil
}

func (s *Server[T]) Start(ctx context.Context) error {
if !s.config.Enable {
s.logger.Info(fmt.Sprintf("%s server is disabled via config", s.Name()))
Expand Down
Loading

0 comments on commit 31f97e9

Please sign in to comment.