From d7a4ea2dc90c73d90e81fbcc24ee75a0477958de Mon Sep 17 00:00:00 2001 From: Maxim Sukharev Date: Wed, 25 Jul 2018 12:51:10 +0200 Subject: [PATCH 1/2] use helper for grpc.NewServer and grpc.Dial Signed-off-by: Maxim Sukharev --- cmd/dummy/main.go | 16 +++++++++------- cmd/lookout/event.go | 14 +++++++------- cmd/lookout/main.go | 2 -- cmd/lookout/serve.go | 13 +++++++------ dummy/dummy_test.go | 5 +++-- common.go => util/grpchelper/helper.go | 25 ++++++++++++++++++++++++- 6 files changed, 50 insertions(+), 25 deletions(-) rename common.go => util/grpchelper/helper.go (60%) diff --git a/cmd/dummy/main.go b/cmd/dummy/main.go index 6c622171..eb1460db 100644 --- a/cmd/dummy/main.go +++ b/cmd/dummy/main.go @@ -1,18 +1,18 @@ package main import ( + "context" "os" "github.com/src-d/lookout" "github.com/src-d/lookout/dummy" + "github.com/src-d/lookout/util/grpchelper" "github.com/jessevdk/go-flags" "google.golang.org/grpc" _ "google.golang.org/grpc/grpclog/glogger" ) -const maxMsgSize = 1024 * 1024 * 100 // 100mb - var ( version = "local_build_1" parser = flags.NewParser(nil, flags.Default) @@ -25,14 +25,16 @@ type ServeCommand struct { func (c *ServeCommand) Execute(args []string) error { var err error - c.DataServer, err = lookout.ToGoGrpcAddress(c.DataServer) + c.DataServer, err = grpchelper.ToGoGrpcAddress(c.DataServer) if err != nil { return err } - conn, err := grpc.Dial(c.DataServer, + conn, err := grpchelper.DialContext( + context.Background(), + c.DataServer, grpc.WithInsecure(), - grpc.WithDefaultCallOptions(grpc.FailFast(false), grpc.MaxCallRecvMsgSize(maxMsgSize)), + grpc.WithDefaultCallOptions(grpc.FailFast(false)), ) if err != nil { return err @@ -43,10 +45,10 @@ func (c *ServeCommand) Execute(args []string) error { DataClient: lookout.NewDataClient(conn), } - server := grpc.NewServer() + server := grpchelper.NewServer() lookout.RegisterAnalyzerServer(server, a) - lis, err := lookout.Listen(c.Analyzer) + lis, err := grpchelper.Listen(c.Analyzer) if err != nil { return err } diff --git a/cmd/lookout/event.go b/cmd/lookout/event.go index 0dcd381e..26db691f 100644 --- a/cmd/lookout/event.go +++ b/cmd/lookout/event.go @@ -9,6 +9,7 @@ import ( "github.com/src-d/lookout" "github.com/src-d/lookout/service/bblfsh" "github.com/src-d/lookout/service/git" + "github.com/src-d/lookout/util/grpchelper" "google.golang.org/grpc" gogit "gopkg.in/src-d/go-git.v4" @@ -84,13 +85,13 @@ func (c *EventCommand) makeDataServerHandler() (*lookout.DataServerHandler, erro loader := git.NewStorerCommitLoader(c.repo.Storer) dataService = git.NewService(loader) - c.Bblfshd, err = lookout.ToGoGrpcAddress(c.Bblfshd) + c.Bblfshd, err = grpchelper.ToGoGrpcAddress(c.Bblfshd) if err != nil { return nil, err } timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - bblfshConn, err := grpc.DialContext(timeoutCtx, c.Bblfshd, grpc.WithInsecure(), grpc.WithBlock()) + bblfshConn, err := grpchelper.DialContext(timeoutCtx, c.Bblfshd, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { log.Warningf("bblfsh service is unavailable. No UAST will be provided to analyzer. Error: %s", err) } else { @@ -110,10 +111,10 @@ func (c *EventCommand) bindDataServer(srv *lookout.DataServerHandler, serveResul setGrpcLogger() } - grpcSrv := grpc.NewServer() + grpcSrv := grpchelper.NewServer() lookout.RegisterDataServer(grpcSrv, srv) - lis, err := lookout.Listen(c.DataServer) + lis, err := grpchelper.Listen(c.DataServer) if err != nil { return nil, err } @@ -126,19 +127,18 @@ func (c *EventCommand) bindDataServer(srv *lookout.DataServerHandler, serveResul func (c *EventCommand) analyzerClient() (lookout.AnalyzerClient, error) { var err error - c.Args.Analyzer, err = lookout.ToGoGrpcAddress(c.Args.Analyzer) + c.Args.Analyzer, err = grpchelper.ToGoGrpcAddress(c.Args.Analyzer) if err != nil { return nil, err } timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - conn, err := grpc.DialContext( + conn, err := grpchelper.DialContext( timeoutCtx, c.Args.Analyzer, grpc.WithInsecure(), grpc.WithBlock(), - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)), ) if err != nil { return nil, err diff --git a/cmd/lookout/main.go b/cmd/lookout/main.go index 950702d6..6cfaaadc 100644 --- a/cmd/lookout/main.go +++ b/cmd/lookout/main.go @@ -9,8 +9,6 @@ import ( "gopkg.in/src-d/go-log.v1" ) -const maxMsgSize = 1024 * 1024 * 100 // 100mb - var ( name = "lookout" version = "undefined" diff --git a/cmd/lookout/serve.go b/cmd/lookout/serve.go index ebaefe1c..21045d3e 100644 --- a/cmd/lookout/serve.go +++ b/cmd/lookout/serve.go @@ -10,6 +10,7 @@ import ( "github.com/src-d/lookout/provider/github" "github.com/src-d/lookout/service/bblfsh" "github.com/src-d/lookout/service/git" + "github.com/src-d/lookout/util/grpchelper" "google.golang.org/grpc" "gopkg.in/src-d/go-billy.v4/osfs" @@ -106,12 +107,12 @@ func (c *ServeCommand) initPoster() (lookout.Poster, error) { } func (c *ServeCommand) startAnalyzer(conf lookout.AnalyzerConfig) (lookout.AnalyzerClient, error) { - addr, err := lookout.ToGoGrpcAddress(conf.Addr) + addr, err := grpchelper.ToGoGrpcAddress(conf.Addr) if err != nil { return nil, err } - conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock()) + conn, err := grpchelper.DialContext(context.Background(), addr, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { return nil, err } @@ -121,12 +122,12 @@ func (c *ServeCommand) startAnalyzer(conf lookout.AnalyzerConfig) (lookout.Analy func (c *ServeCommand) initDataHadler() (*lookout.DataServerHandler, error) { var err error - c.Bblfshd, err = lookout.ToGoGrpcAddress(c.Bblfshd) + c.Bblfshd, err = grpchelper.ToGoGrpcAddress(c.Bblfshd) if err != nil { return nil, err } - bblfshConn, err := grpc.Dial(c.Bblfshd, grpc.WithInsecure()) + bblfshConn, err := grpchelper.DialContext(context.Background(), c.Bblfshd, grpc.WithInsecure()) if err != nil { return nil, err } @@ -147,9 +148,9 @@ func (c *ServeCommand) initDataHadler() (*lookout.DataServerHandler, error) { } func (c *ServeCommand) startServer(srv *lookout.DataServerHandler) error { - grpcSrv := grpc.NewServer() + grpcSrv := grpchelper.NewServer() lookout.RegisterDataServer(grpcSrv, srv) - lis, err := lookout.Listen(c.DataServer) + lis, err := grpchelper.Listen(c.DataServer) if err != nil { return err } diff --git a/dummy/dummy_test.go b/dummy/dummy_test.go index 48082a6e..3bf4e14b 100644 --- a/dummy/dummy_test.go +++ b/dummy/dummy_test.go @@ -7,6 +7,7 @@ import ( "github.com/src-d/lookout" "github.com/src-d/lookout/service/git" + "github.com/src-d/lookout/util/grpchelper" "github.com/stretchr/testify/suite" "google.golang.org/grpc" @@ -45,7 +46,7 @@ func (s *DummySuite) SetupSuite() { } lookout.RegisterDataServer(s.apiServer, server) - lis, err := lookout.Listen("ipv4://0.0.0.0:9991") + lis, err := grpchelper.Listen("ipv4://0.0.0.0:9991") require.NoError(err) go s.apiServer.Serve(lis) @@ -86,7 +87,7 @@ func (s *DummySuite) Test() { s.analyzerServer = grpc.NewServer() lookout.RegisterAnalyzerServer(s.analyzerServer, a) - lis, err := lookout.Listen("ipv4://0.0.0.0:9995") + lis, err := grpchelper.Listen("ipv4://0.0.0.0:9995") require.NoError(err) done := make(chan error) diff --git a/common.go b/util/grpchelper/helper.go similarity index 60% rename from common.go rename to util/grpchelper/helper.go index dd34889f..3ba326a9 100644 --- a/common.go +++ b/util/grpchelper/helper.go @@ -1,11 +1,17 @@ -package lookout +package grpchelper import ( + "context" "fmt" "net" "net/url" + + "google.golang.org/grpc" ) +// MaxMessageSize overrides default grpc max. message size to send/receive to/from clients +var MaxMessageSize = 100 * 1024 * 1024 // 100mb + //TODO: https://github.com/grpc/grpc-go/issues/1911 // ToNetListenerAddress converts a gRPC URL to a network+address consumable by @@ -65,3 +71,20 @@ func Listen(address string) (net.Listener, error) { return net.Listen(n, a) } + +// NewGrpcServer creates new grpc.Server with custom message size +func NewServer(opts ...grpc.ServerOption) *grpc.Server { + opts = append(opts, grpc.MaxRecvMsgSize(MaxMessageSize), grpc.MaxSendMsgSize(MaxMessageSize)) + + return grpc.NewServer(opts...) +} + +// GrpcDialContext creates a client connection to the given target with custom message size +func DialContext(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + opts = append(opts, grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(MaxMessageSize), + grpc.MaxCallSendMsgSize(MaxMessageSize), + )) + + return grpc.DialContext(ctx, target, opts...) +} From 3e83361a12182eb9d9df4e91471d10e0eb8d5a8b Mon Sep 17 00:00:00 2001 From: Maxim Sukharev Date: Wed, 25 Jul 2018 14:26:17 +0200 Subject: [PATCH 2/2] Make grpc max msg size configurable Signed-off-by: Maxim Sukharev --- cmd/dummy/main.go | 18 ++++---------- cmd/lookout/event.go | 2 ++ cmd/lookout/main.go | 16 +++---------- cmd/lookout/serve.go | 2 ++ util/flags/flags.go | 50 +++++++++++++++++++++++++++++++++++++++ util/grpchelper/helper.go | 23 ++++++++++++++---- 6 files changed, 79 insertions(+), 32 deletions(-) create mode 100644 util/flags/flags.go diff --git a/cmd/dummy/main.go b/cmd/dummy/main.go index eb1460db..4942a4c7 100644 --- a/cmd/dummy/main.go +++ b/cmd/dummy/main.go @@ -2,23 +2,23 @@ package main import ( "context" - "os" "github.com/src-d/lookout" "github.com/src-d/lookout/dummy" + "github.com/src-d/lookout/util/flags" "github.com/src-d/lookout/util/grpchelper" - "github.com/jessevdk/go-flags" "google.golang.org/grpc" _ "google.golang.org/grpc/grpclog/glogger" ) var ( version = "local_build_1" - parser = flags.NewParser(nil, flags.Default) + parser = flags.NewParser() ) type ServeCommand struct { + flags.CommonOptions Analyzer string `long:"analyzer" default:"ipv4://localhost:10302" env:"LOOKOUT_ANALYZER" description:"gRPC URL to bind the analyzer to"` DataServer string `long:"data-server" default:"ipv4://localhost:10301" env:"LOOKOUT_DATA_SERVER" description:"grPC URL of the data server"` } @@ -62,15 +62,5 @@ func main() { panic(err) } - if _, err := parser.Parse(); err != nil { - if err, ok := err.(*flags.Error); ok { - if err.Type == flags.ErrHelp { - os.Exit(0) - } - - parser.WriteHelp(os.Stdout) - } - - os.Exit(1) - } + flags.RunMain(parser) } diff --git a/cmd/lookout/event.go b/cmd/lookout/event.go index 26db691f..6ad6ca55 100644 --- a/cmd/lookout/event.go +++ b/cmd/lookout/event.go @@ -9,6 +9,7 @@ import ( "github.com/src-d/lookout" "github.com/src-d/lookout/service/bblfsh" "github.com/src-d/lookout/service/git" + "github.com/src-d/lookout/util/flags" "github.com/src-d/lookout/util/grpchelper" "google.golang.org/grpc" @@ -18,6 +19,7 @@ import ( ) type EventCommand struct { + flags.CommonOptions DataServer string `long:"data-server" default:"ipv4://localhost:10301" env:"LOOKOUT_DATA_SERVER" description:"gRPC URL to bind the data server to"` Bblfshd string `long:"bblfshd" default:"ipv4://localhost:9432" env:"LOOKOUT_BBLFSHD" description:"gRPC URL of the Bblfshd server"` GitDir string `long:"git-dir" default:"." env:"GIT_DIR" description:"path to the .git directory to analyze"` diff --git a/cmd/lookout/main.go b/cmd/lookout/main.go index 6cfaaadc..eb4678d8 100644 --- a/cmd/lookout/main.go +++ b/cmd/lookout/main.go @@ -4,7 +4,7 @@ import ( stdlog "log" "os" - "github.com/jessevdk/go-flags" + "github.com/src-d/lookout/util/flags" "google.golang.org/grpc/grpclog" "gopkg.in/src-d/go-log.v1" ) @@ -15,24 +15,14 @@ var ( build = "undefined" ) -var parser = flags.NewParser(nil, flags.Default) +var parser = flags.NewParser() func init() { log.DefaultLogger = log.New(log.Fields{"app": name}) } func main() { - if _, err := parser.Parse(); err != nil { - if err, ok := err.(*flags.Error); ok { - if err.Type == flags.ErrHelp { - os.Exit(0) - } - - parser.WriteHelp(os.Stdout) - } - - os.Exit(1) - } + flags.RunMain(parser) } func setGrpcLogger() { diff --git a/cmd/lookout/serve.go b/cmd/lookout/serve.go index 21045d3e..9f026781 100644 --- a/cmd/lookout/serve.go +++ b/cmd/lookout/serve.go @@ -10,6 +10,7 @@ import ( "github.com/src-d/lookout/provider/github" "github.com/src-d/lookout/service/bblfsh" "github.com/src-d/lookout/service/git" + "github.com/src-d/lookout/util/flags" "github.com/src-d/lookout/util/grpchelper" "google.golang.org/grpc" @@ -26,6 +27,7 @@ func init() { } type ServeCommand struct { + flags.CommonOptions ConfigFile string `long:"config" short:"c" default:"config.yml" env:"LOOKOUT_CONFIG_FILE" description:"path to configuration file"` GithubUser string `long:"github-user" env:"GITHUB_USER" description:"user for the GitHub API"` GithubToken string `long:"github-token" env:"GITHUB_TOKEN" description:"access token for the GitHub API"` diff --git a/util/flags/flags.go b/util/flags/flags.go new file mode 100644 index 00000000..ddc1088b --- /dev/null +++ b/util/flags/flags.go @@ -0,0 +1,50 @@ +package flags + +import ( + "os" + + flags "github.com/jessevdk/go-flags" + "github.com/src-d/lookout/util/grpchelper" +) + +// CommonOptions contains common flags for all commands +type CommonOptions struct { + GrpcMaxMsgSize int `long:"grpc-max-message-size" default:"100" env:"LOOKOUT_GRPC_MAX_MSG_SIZE" description:"max. message size to send/receive to/from clients (in MB)"` +} + +// GetGrpcMaxMsgSize implements GrpcMaxMsgSizer interface +func (o *CommonOptions) GetGrpcMaxMsgSize() int { + return o.GrpcMaxMsgSize +} + +// GrpcMaxMsgSizer is used to get gRPC maximum message size +type GrpcMaxMsgSizer interface { + GetGrpcMaxMsgSize() int +} + +// NewParser returns new flags.Parser +func NewParser() *flags.Parser { + parser := flags.NewParser(nil, flags.Default) + parser.CommandHandler = func(command flags.Commander, args []string) error { + if s, ok := command.(GrpcMaxMsgSizer); ok { + grpchelper.SetMaxMessageSize(s.GetGrpcMaxMsgSize()) + } + return command.Execute(args) + } + return parser +} + +// RunMain parses arguments and runs commands +func RunMain(parser *flags.Parser) { + if _, err := parser.Parse(); err != nil { + if err, ok := err.(*flags.Error); ok { + if err.Type == flags.ErrHelp { + os.Exit(0) + } + + parser.WriteHelp(os.Stdout) + } + + os.Exit(1) + } +} diff --git a/util/grpchelper/helper.go b/util/grpchelper/helper.go index 3ba326a9..b84d233c 100644 --- a/util/grpchelper/helper.go +++ b/util/grpchelper/helper.go @@ -5,12 +5,25 @@ import ( "fmt" "net" "net/url" + "os" "google.golang.org/grpc" + log "gopkg.in/src-d/go-log.v1" ) -// MaxMessageSize overrides default grpc max. message size to send/receive to/from clients -var MaxMessageSize = 100 * 1024 * 1024 // 100mb +var maxMessageSize = 100 * 1024 * 1024 // 100mb + +// SetMaxMessageSize overrides default grpc max. message size to send/receive to/from clients +func SetMaxMessageSize(size int) { + if size >= 2048 { + // Setting the hard limit of message size to less than 2GB since + // it may overflow an int value, and it should be big enough + log.Errorf(fmt.Errorf("max-message-size too big (limit is 2047MB): %d", size), "SetMaxMessageSize") + os.Exit(1) + } + + maxMessageSize = size * 1024 * 1024 +} //TODO: https://github.com/grpc/grpc-go/issues/1911 @@ -74,7 +87,7 @@ func Listen(address string) (net.Listener, error) { // NewGrpcServer creates new grpc.Server with custom message size func NewServer(opts ...grpc.ServerOption) *grpc.Server { - opts = append(opts, grpc.MaxRecvMsgSize(MaxMessageSize), grpc.MaxSendMsgSize(MaxMessageSize)) + opts = append(opts, grpc.MaxRecvMsgSize(maxMessageSize), grpc.MaxSendMsgSize(maxMessageSize)) return grpc.NewServer(opts...) } @@ -82,8 +95,8 @@ func NewServer(opts ...grpc.ServerOption) *grpc.Server { // GrpcDialContext creates a client connection to the given target with custom message size func DialContext(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { opts = append(opts, grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(MaxMessageSize), - grpc.MaxCallSendMsgSize(MaxMessageSize), + grpc.MaxCallRecvMsgSize(maxMessageSize), + grpc.MaxCallSendMsgSize(maxMessageSize), )) return grpc.DialContext(ctx, target, opts...)