Skip to content

Commit

Permalink
Problem: node can't quit by signal (#543)
Browse files Browse the repository at this point in the history
* Problem: node can't quit by signal

Solution:
- cleanup graceful shutdown procedure

* Update CHANGELOG.md

Signed-off-by: yihuang <[email protected]>

* fix lint

* cleanup

---------

Signed-off-by: yihuang <[email protected]>
Co-authored-by: mmsqe <[email protected]>
  • Loading branch information
yihuang and mmsqe authored Oct 17, 2024
1 parent a2ad87c commit 816389c
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 69 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (rpc) [#534](https://github.com/crypto-org-chain/ethermint/pull/534), [#540](https://github.com/crypto-org-chain/ethermint/pull/540) Fix opBlockhash when no block header in abci request.
* (rpc) [#536](https://github.com/crypto-org-chain/ethermint/pull/536) Fix validate basic after transaction conversion with raw field.
* (cli) [#537](https://github.com/crypto-org-chain/ethermint/pull/537) Fix unsuppored sign mode SIGN_MODE_TEXTUAL for bank transfer.
* (cli) [#543](https://github.com/crypto-org-chain/ethermint/pull/543) Fix graceful shutdown.

### Improvements

Expand Down
37 changes: 28 additions & 9 deletions server/json_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package server

import (
"context"
"fmt"
"net/http"
"time"
Expand Down Expand Up @@ -47,18 +48,20 @@ type AppWithPendingTxStream interface {
}

// StartJSONRPC starts the JSON-RPC server
func StartJSONRPC(srvCtx *server.Context,
func StartJSONRPC(
ctx context.Context,
srvCtx *server.Context,
clientCtx client.Context,
g *errgroup.Group,
config *config.Config,
indexer ethermint.EVMTxIndexer,
app AppWithPendingTxStream,
) (*http.Server, chan struct{}, error) {
) (*http.Server, error) {
logger := srvCtx.Logger.With("module", "geth")

evtClient, ok := clientCtx.Client.(rpcclient.EventsClient)
if !ok {
return nil, nil, fmt.Errorf("client %T does not implement EventsClient", clientCtx.Client)
return nil, fmt.Errorf("client %T does not implement EventsClient", clientCtx.Client)
}

var rpcStream *stream.RPCStream
Expand All @@ -73,7 +76,7 @@ func StartJSONRPC(srvCtx *server.Context,
}

if err != nil {
return nil, nil, fmt.Errorf("failed to create rpc streams after %d attempts: %w", MaxRetry, err)
return nil, fmt.Errorf("failed to create rpc streams after %d attempts: %w", MaxRetry, err)
}

app.RegisterPendingTxListener(rpcStream.ListenPendingTx)
Expand Down Expand Up @@ -104,7 +107,7 @@ func StartJSONRPC(srvCtx *server.Context,
"namespace", api.Namespace,
"service", api.Service,
)
return nil, nil, err
return nil, err
}
}

Expand All @@ -128,25 +131,41 @@ func StartJSONRPC(srvCtx *server.Context,

ln, err := Listen(httpSrv.Addr, config)
if err != nil {
return nil, nil, err
return nil, err
}

g.Go(func() error {
srvCtx.Logger.Info("Starting JSON-RPC server", "address", config.JSONRPC.Address)
if err := httpSrv.Serve(ln); err != nil {
errCh := make(chan error)
go func() {
errCh <- httpSrv.Serve(ln)
}()

// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
select {
case <-ctx.Done():
// The calling process canceled or closed the provided context, so we must
// gracefully stop the JSON-RPC server.
logger.Info("stopping JSON-RPC server...", "address", config.JSONRPC.Address)
if err := httpSrv.Shutdown(context.Background()); err != nil {
logger.Error("failed to shutdown JSON-RPC server", "error", err.Error())
}
return ln.Close()

case err := <-errCh:
if err == http.ErrServerClosed {
close(httpSrvDone)
}

srvCtx.Logger.Error("failed to start JSON-RPC server", "error", err.Error())
return err
}
return nil
})

srvCtx.Logger.Info("Starting JSON WebSocket server", "address", config.JSONRPC.WsAddress)

wsSrv := rpc.NewWebsocketsServer(clientCtx, srvCtx.Logger, rpcStream, config)
wsSrv.Start()
return httpSrv, httpSrvDone, nil
return httpSrv, nil
}
54 changes: 16 additions & 38 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ import (
"fmt"
"io"
"net"
"net/http"
"os"
"path/filepath"
"runtime/pprof"
"time"

"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
Expand Down Expand Up @@ -419,10 +417,11 @@ func startInProcess(svrCtx *server.Context, clientCtx client.Context, opts Start
idxer = indexer.NewKVIndexer(idxDB, idxLogger, clientCtx)
indexerService := NewEVMIndexerService(idxer, clientCtx.Client.(rpcclient.Client), config.JSONRPC.AllowIndexerGap)
indexerService.SetLogger(servercmtlog.CometLoggerWrapper{Logger: idxLogger})

g.Go(func() error {
return indexerService.Start()
})
go func() {
if err := indexerService.Start(); err != nil {
logger.Error("failed to start evm indexer service", "error", err.Error())
}
}()
}

if config.API.Enable || config.JSONRPC.Enable {
Expand All @@ -443,30 +442,12 @@ func startInProcess(svrCtx *server.Context, clientCtx client.Context, opts Start
if err != nil {
return err
}
if grpcSrv != nil {
defer grpcSrv.GracefulStop()
}

apiSrv := startAPIServer(ctx, svrCtx, clientCtx, g, config.Config, app, grpcSrv, metrics)
if apiSrv != nil {
defer apiSrv.Close()
}
startAPIServer(ctx, svrCtx, clientCtx, g, config.Config, app, grpcSrv, metrics)

clientCtx, httpSrv, httpSrvDone, err := startJSONRPCServer(svrCtx, clientCtx, g, config, genDocProvider, idxer, app)
if httpSrv != nil {
defer func() {
shutdownCtx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFn()
if err := httpSrv.Shutdown(shutdownCtx); err != nil {
logger.Error("HTTP server shutdown produced a warning", "error", err.Error())
} else {
logger.Info("HTTP server shut down, waiting 5 sec")
select {
case <-time.Tick(5 * time.Second):
case <-httpSrvDone:
}
}
}()
clientCtx, err = startJSONRPCServer(ctx, svrCtx, clientCtx, g, config, genDocProvider, idxer, app)
if err != nil {
return err
}

// At this point it is safe to block the process if we're in query only mode as
Expand Down Expand Up @@ -619,9 +600,9 @@ func startAPIServer(
app types.Application,
grpcSrv *grpc.Server,
metrics *telemetry.Metrics,
) *api.Server {
) {
if !svrCfg.API.Enable {
return nil
return
}

apiSrv := api.New(clientCtx, svrCtx.Logger.With("server", "api"), grpcSrv)
Expand All @@ -634,38 +615,35 @@ func startAPIServer(
g.Go(func() error {
return apiSrv.Start(ctx, svrCfg)
})
return apiSrv
}

func startJSONRPCServer(
stdCtx context.Context,
svrCtx *server.Context,
clientCtx client.Context,
g *errgroup.Group,
config config.Config,
genDocProvider node.GenesisDocProvider,
idxer ethermint.EVMTxIndexer,
app types.Application,
) (ctx client.Context, httpSrv *http.Server, httpSrvDone chan struct{}, err error) {
) (ctx client.Context, err error) {
ctx = clientCtx
if !config.JSONRPC.Enable {
return
}

txApp, ok := app.(AppWithPendingTxStream)
if !ok {
return ctx, httpSrv, httpSrvDone, fmt.Errorf("json-rpc server requires AppWithPendingTxStream")
return ctx, fmt.Errorf("json-rpc server requires AppWithPendingTxStream")
}

genDoc, err := genDocProvider()
if err != nil {
return ctx, httpSrv, httpSrvDone, err
return ctx, err
}

ctx = clientCtx.WithChainID(genDoc.ChainID)
g.Go(func() error {
httpSrv, httpSrvDone, err = StartJSONRPC(svrCtx, clientCtx, g, &config, idxer, txApp)
return err
})
_, err = StartJSONRPC(stdCtx, svrCtx, clientCtx, g, &config, idxer, txApp)
return
}

Expand Down
28 changes: 8 additions & 20 deletions testutil/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,13 @@ type (
RPCClient tmrpcclient.Client
JSONRPCClient *ethclient.Client

tmNode *node.Node
api *api.Server
grpc *grpc.Server
grpcWeb *http.Server
jsonrpc *http.Server
jsonrpcDone chan struct{}
errGroup *errgroup.Group
cancelFn context.CancelFunc
tmNode *node.Node
api *api.Server
grpc *grpc.Server
grpcWeb *http.Server
jsonrpc *http.Server
errGroup *errgroup.Group
cancelFn context.CancelFunc
}
)

Expand Down Expand Up @@ -648,18 +647,7 @@ func (n *Network) Cleanup() {
}

if v.jsonrpc != nil {
shutdownCtx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFn()

if err := v.jsonrpc.Shutdown(shutdownCtx); err != nil {
v.tmNode.Logger.Error("HTTP server shutdown produced a warning", "error", err.Error())
} else {
v.tmNode.Logger.Info("HTTP server shut down, waiting 5 sec")
select {
case <-time.Tick(5 * time.Second):
case <-v.jsonrpcDone:
}
}
_ = v.jsonrpc.Close()
}
}

Expand Down
4 changes: 2 additions & 2 deletions testutil/network/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func startInProcess(cfg Config, val *Validator) error {
return fmt.Errorf("validator %s context is nil", val.Moniker)
}

val.jsonrpc, val.jsonrpcDone, err = server.StartJSONRPC(
val.Ctx, val.ClientCtx, val.errGroup, val.AppConfig,
val.jsonrpc, err = server.StartJSONRPC(
ctx, val.Ctx, val.ClientCtx, val.errGroup, val.AppConfig,
nil, app.(server.AppWithPendingTxStream),
)
if err != nil {
Expand Down

0 comments on commit 816389c

Please sign in to comment.