Skip to content

Commit

Permalink
Merge pull request #56 from smacker/grpc_max_msg_size
Browse files Browse the repository at this point in the history
Grpc max msg size
  • Loading branch information
smacker authored Jul 25, 2018
2 parents 0ac15b5 + 3e83361 commit e917f9a
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 52 deletions.
34 changes: 13 additions & 21 deletions cmd/dummy/main.go
Original file line number Diff line number Diff line change
@@ -1,38 +1,40 @@
package main

import (
"os"
"context"

"github.com/src-d/lookout"
"github.com/src-d/lookout/dummy"
"github.com/src-d/lookout/util/flags"
"github.com/src-d/lookout/util/grpchelper"

"github.com/jessevdk/go-flags"
"google.golang.org/grpc"
_ "google.golang.org/grpc/grpclog/glogger"
)

const maxMsgSize = 1024 * 1024 * 100 // 100mb

var (
version = "local_build_1"
parser = flags.NewParser(nil, flags.Default)
parser = flags.NewParser()
)

type ServeCommand struct {
flags.CommonOptions
Analyzer string `long:"analyzer" default:"ipv4://localhost:10302" env:"LOOKOUT_ANALYZER" description:"gRPC URL to bind the analyzer to"`
DataServer string `long:"data-server" default:"ipv4://localhost:10301" env:"LOOKOUT_DATA_SERVER" description:"grPC URL of the data server"`
}

func (c *ServeCommand) Execute(args []string) error {
var err error
c.DataServer, err = lookout.ToGoGrpcAddress(c.DataServer)
c.DataServer, err = grpchelper.ToGoGrpcAddress(c.DataServer)
if err != nil {
return err
}

conn, err := grpc.Dial(c.DataServer,
conn, err := grpchelper.DialContext(
context.Background(),
c.DataServer,
grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.FailFast(false), grpc.MaxCallRecvMsgSize(maxMsgSize)),
grpc.WithDefaultCallOptions(grpc.FailFast(false)),
)
if err != nil {
return err
Expand All @@ -43,10 +45,10 @@ func (c *ServeCommand) Execute(args []string) error {
DataClient: lookout.NewDataClient(conn),
}

server := grpc.NewServer()
server := grpchelper.NewServer()
lookout.RegisterAnalyzerServer(server, a)

lis, err := lookout.Listen(c.Analyzer)
lis, err := grpchelper.Listen(c.Analyzer)
if err != nil {
return err
}
Expand All @@ -60,15 +62,5 @@ func main() {
panic(err)
}

if _, err := parser.Parse(); err != nil {
if err, ok := err.(*flags.Error); ok {
if err.Type == flags.ErrHelp {
os.Exit(0)
}

parser.WriteHelp(os.Stdout)
}

os.Exit(1)
}
flags.RunMain(parser)
}
16 changes: 9 additions & 7 deletions cmd/lookout/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/src-d/lookout"
"github.com/src-d/lookout/service/bblfsh"
"github.com/src-d/lookout/service/git"
"github.com/src-d/lookout/util/flags"
"github.com/src-d/lookout/util/grpchelper"
"google.golang.org/grpc"

gogit "gopkg.in/src-d/go-git.v4"
Expand All @@ -17,6 +19,7 @@ import (
)

type EventCommand struct {
flags.CommonOptions
DataServer string `long:"data-server" default:"ipv4://localhost:10301" env:"LOOKOUT_DATA_SERVER" description:"gRPC URL to bind the data server to"`
Bblfshd string `long:"bblfshd" default:"ipv4://localhost:9432" env:"LOOKOUT_BBLFSHD" description:"gRPC URL of the Bblfshd server"`
GitDir string `long:"git-dir" default:"." env:"GIT_DIR" description:"path to the .git directory to analyze"`
Expand Down Expand Up @@ -84,13 +87,13 @@ func (c *EventCommand) makeDataServerHandler() (*lookout.DataServerHandler, erro
loader := git.NewStorerCommitLoader(c.repo.Storer)
dataService = git.NewService(loader)

c.Bblfshd, err = lookout.ToGoGrpcAddress(c.Bblfshd)
c.Bblfshd, err = grpchelper.ToGoGrpcAddress(c.Bblfshd)
if err != nil {
return nil, err
}
timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
bblfshConn, err := grpc.DialContext(timeoutCtx, c.Bblfshd, grpc.WithInsecure(), grpc.WithBlock())
bblfshConn, err := grpchelper.DialContext(timeoutCtx, c.Bblfshd, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Warningf("bblfsh service is unavailable. No UAST will be provided to analyzer. Error: %s", err)
} else {
Expand All @@ -110,10 +113,10 @@ func (c *EventCommand) bindDataServer(srv *lookout.DataServerHandler, serveResul
setGrpcLogger()
}

grpcSrv := grpc.NewServer()
grpcSrv := grpchelper.NewServer()
lookout.RegisterDataServer(grpcSrv, srv)

lis, err := lookout.Listen(c.DataServer)
lis, err := grpchelper.Listen(c.DataServer)
if err != nil {
return nil, err
}
Expand All @@ -126,19 +129,18 @@ func (c *EventCommand) bindDataServer(srv *lookout.DataServerHandler, serveResul
func (c *EventCommand) analyzerClient() (lookout.AnalyzerClient, error) {
var err error

c.Args.Analyzer, err = lookout.ToGoGrpcAddress(c.Args.Analyzer)
c.Args.Analyzer, err = grpchelper.ToGoGrpcAddress(c.Args.Analyzer)
if err != nil {
return nil, err
}

timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
conn, err := grpc.DialContext(
conn, err := grpchelper.DialContext(
timeoutCtx,
c.Args.Analyzer,
grpc.WithInsecure(),
grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
)
if err != nil {
return nil, err
Expand Down
18 changes: 3 additions & 15 deletions cmd/lookout/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,25 @@ import (
stdlog "log"
"os"

"github.com/jessevdk/go-flags"
"github.com/src-d/lookout/util/flags"
"google.golang.org/grpc/grpclog"
"gopkg.in/src-d/go-log.v1"
)

const maxMsgSize = 1024 * 1024 * 100 // 100mb

var (
name = "lookout"
version = "undefined"
build = "undefined"
)

var parser = flags.NewParser(nil, flags.Default)
var parser = flags.NewParser()

func init() {
log.DefaultLogger = log.New(log.Fields{"app": name})
}

func main() {
if _, err := parser.Parse(); err != nil {
if err, ok := err.(*flags.Error); ok {
if err.Type == flags.ErrHelp {
os.Exit(0)
}

parser.WriteHelp(os.Stdout)
}

os.Exit(1)
}
flags.RunMain(parser)
}

func setGrpcLogger() {
Expand Down
15 changes: 9 additions & 6 deletions cmd/lookout/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/src-d/lookout/provider/github"
"github.com/src-d/lookout/service/bblfsh"
"github.com/src-d/lookout/service/git"
"github.com/src-d/lookout/util/flags"
"github.com/src-d/lookout/util/grpchelper"

"google.golang.org/grpc"
"gopkg.in/src-d/go-billy.v4/osfs"
Expand All @@ -25,6 +27,7 @@ func init() {
}

type ServeCommand struct {
flags.CommonOptions
ConfigFile string `long:"config" short:"c" default:"config.yml" env:"LOOKOUT_CONFIG_FILE" description:"path to configuration file"`
GithubUser string `long:"github-user" env:"GITHUB_USER" description:"user for the GitHub API"`
GithubToken string `long:"github-token" env:"GITHUB_TOKEN" description:"access token for the GitHub API"`
Expand Down Expand Up @@ -106,12 +109,12 @@ func (c *ServeCommand) initPoster() (lookout.Poster, error) {
}

func (c *ServeCommand) startAnalyzer(conf lookout.AnalyzerConfig) (lookout.AnalyzerClient, error) {
addr, err := lookout.ToGoGrpcAddress(conf.Addr)
addr, err := grpchelper.ToGoGrpcAddress(conf.Addr)
if err != nil {
return nil, err
}

conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock())
conn, err := grpchelper.DialContext(context.Background(), addr, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return nil, err
}
Expand All @@ -121,12 +124,12 @@ func (c *ServeCommand) startAnalyzer(conf lookout.AnalyzerConfig) (lookout.Analy

func (c *ServeCommand) initDataHadler() (*lookout.DataServerHandler, error) {
var err error
c.Bblfshd, err = lookout.ToGoGrpcAddress(c.Bblfshd)
c.Bblfshd, err = grpchelper.ToGoGrpcAddress(c.Bblfshd)
if err != nil {
return nil, err
}

bblfshConn, err := grpc.Dial(c.Bblfshd, grpc.WithInsecure())
bblfshConn, err := grpchelper.DialContext(context.Background(), c.Bblfshd, grpc.WithInsecure())
if err != nil {
return nil, err
}
Expand All @@ -147,9 +150,9 @@ func (c *ServeCommand) initDataHadler() (*lookout.DataServerHandler, error) {
}

func (c *ServeCommand) startServer(srv *lookout.DataServerHandler) error {
grpcSrv := grpc.NewServer()
grpcSrv := grpchelper.NewServer()
lookout.RegisterDataServer(grpcSrv, srv)
lis, err := lookout.Listen(c.DataServer)
lis, err := grpchelper.Listen(c.DataServer)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions dummy/dummy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/src-d/lookout"
"github.com/src-d/lookout/service/git"
"github.com/src-d/lookout/util/grpchelper"

"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
Expand Down Expand Up @@ -45,7 +46,7 @@ func (s *DummySuite) SetupSuite() {
}
lookout.RegisterDataServer(s.apiServer, server)

lis, err := lookout.Listen("ipv4://0.0.0.0:9991")
lis, err := grpchelper.Listen("ipv4://0.0.0.0:9991")
require.NoError(err)

go s.apiServer.Serve(lis)
Expand Down Expand Up @@ -86,7 +87,7 @@ func (s *DummySuite) Test() {
s.analyzerServer = grpc.NewServer()
lookout.RegisterAnalyzerServer(s.analyzerServer, a)

lis, err := lookout.Listen("ipv4://0.0.0.0:9995")
lis, err := grpchelper.Listen("ipv4://0.0.0.0:9995")
require.NoError(err)

done := make(chan error)
Expand Down
50 changes: 50 additions & 0 deletions util/flags/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package flags

import (
"os"

flags "github.com/jessevdk/go-flags"
"github.com/src-d/lookout/util/grpchelper"
)

// CommonOptions contains common flags for all commands
type CommonOptions struct {
GrpcMaxMsgSize int `long:"grpc-max-message-size" default:"100" env:"LOOKOUT_GRPC_MAX_MSG_SIZE" description:"max. message size to send/receive to/from clients (in MB)"`
}

// GetGrpcMaxMsgSize implements GrpcMaxMsgSizer interface
func (o *CommonOptions) GetGrpcMaxMsgSize() int {
return o.GrpcMaxMsgSize
}

// GrpcMaxMsgSizer is used to get gRPC maximum message size
type GrpcMaxMsgSizer interface {
GetGrpcMaxMsgSize() int
}

// NewParser returns new flags.Parser
func NewParser() *flags.Parser {
parser := flags.NewParser(nil, flags.Default)
parser.CommandHandler = func(command flags.Commander, args []string) error {
if s, ok := command.(GrpcMaxMsgSizer); ok {
grpchelper.SetMaxMessageSize(s.GetGrpcMaxMsgSize())
}
return command.Execute(args)
}
return parser
}

// RunMain parses arguments and runs commands
func RunMain(parser *flags.Parser) {
if _, err := parser.Parse(); err != nil {
if err, ok := err.(*flags.Error); ok {
if err.Type == flags.ErrHelp {
os.Exit(0)
}

parser.WriteHelp(os.Stdout)
}

os.Exit(1)
}
}
38 changes: 37 additions & 1 deletion common.go → util/grpchelper/helper.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,30 @@
package lookout
package grpchelper

import (
"context"
"fmt"
"net"
"net/url"
"os"

"google.golang.org/grpc"
log "gopkg.in/src-d/go-log.v1"
)

var maxMessageSize = 100 * 1024 * 1024 // 100mb

// SetMaxMessageSize overrides default grpc max. message size to send/receive to/from clients
func SetMaxMessageSize(size int) {
if size >= 2048 {
// Setting the hard limit of message size to less than 2GB since
// it may overflow an int value, and it should be big enough
log.Errorf(fmt.Errorf("max-message-size too big (limit is 2047MB): %d", size), "SetMaxMessageSize")
os.Exit(1)
}

maxMessageSize = size * 1024 * 1024
}

//TODO: https://github.com/grpc/grpc-go/issues/1911

// ToNetListenerAddress converts a gRPC URL to a network+address consumable by
Expand Down Expand Up @@ -65,3 +84,20 @@ func Listen(address string) (net.Listener, error) {

return net.Listen(n, a)
}

// NewGrpcServer creates new grpc.Server with custom message size
func NewServer(opts ...grpc.ServerOption) *grpc.Server {
opts = append(opts, grpc.MaxRecvMsgSize(maxMessageSize), grpc.MaxSendMsgSize(maxMessageSize))

return grpc.NewServer(opts...)
}

// GrpcDialContext creates a client connection to the given target with custom message size
func DialContext(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts = append(opts, grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(maxMessageSize),
grpc.MaxCallSendMsgSize(maxMessageSize),
))

return grpc.DialContext(ctx, target, opts...)
}

0 comments on commit e917f9a

Please sign in to comment.