From 56dfebb975f8ed3962a5e84234b32a270a965db4 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Mon, 12 Aug 2024 12:25:05 +0100 Subject: [PATCH 1/6] adding batching options for RPC client Signed-off-by: Chengxuan Xing --- internal/rpcserver/server.go | 4 +- internal/signermsgs/en_error_messges.go | 1 + pkg/rpcbackend/backend.go | 309 +++++++++++--- pkg/rpcbackend/backend_test.go | 516 +++++++++++++++++++++++- 4 files changed, 757 insertions(+), 73 deletions(-) diff --git a/internal/rpcserver/server.go b/internal/rpcserver/server.go index 31a6cc6f..48f5a05e 100644 --- a/internal/rpcserver/server.go +++ b/internal/rpcserver/server.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -45,7 +45,7 @@ func NewServer(ctx context.Context, wallet ethsigner.Wallet) (ss Server, err err return nil, err } s := &rpcServer{ - backend: rpcbackend.NewRPCClient(httpClient), + backend: rpcbackend.NewRPCClient(ctx, httpClient), apiServerDone: make(chan error), wallet: wallet, chainID: config.GetInt64(signerconfig.BackendChainID), diff --git a/internal/signermsgs/en_error_messges.go b/internal/signermsgs/en_error_messges.go index 4528d4e4..a9b28468 100644 --- a/internal/signermsgs/en_error_messges.go +++ b/internal/signermsgs/en_error_messges.go @@ -103,4 +103,5 @@ var ( MsgInvalidEIP1559Transaction = ffe("FF22084", "Transaction payload invalid (EIP-1559): %v") MsgInvalidEIP155TransactionV = ffe("FF22085", "Invalid V value from EIP-155 transaction (chainId=%d)") MsgInvalidChainID = ffe("FF22086", "Invalid chainId expected=%d actual=%d") + MsgRPCRequestBatchFailed = ffe("FF22087", "Received response doesn't match the number of batched requests.") ) diff --git a/pkg/rpcbackend/backend.go b/pkg/rpcbackend/backend.go index 36fd3a5d..a0b4bdd2 100644 --- a/pkg/rpcbackend/backend.go +++ b/pkg/rpcbackend/backend.go @@ -50,12 +50,12 @@ type Backend interface { } // NewRPCClient Constructor -func NewRPCClient(client *resty.Client) Backend { - return NewRPCClientWithOption(client, RPCClientOptions{}) +func NewRPCClient(ctx context.Context, client *resty.Client) Backend { + return NewRPCClientWithOption(ctx, client, RPCClientOptions{}) } // NewRPCClientWithOption Constructor -func NewRPCClientWithOption(client *resty.Client, options RPCClientOptions) Backend { +func NewRPCClientWithOption(ctx context.Context, client *resty.Client, options RPCClientOptions) Backend { rpcClient := &RPCClient{ client: client, } @@ -64,17 +64,44 @@ func NewRPCClientWithOption(client *resty.Client, options RPCClientOptions) Back rpcClient.concurrencySlots = make(chan bool, options.MaxConcurrentRequest) } + if options.BatchOptions != nil { + batchDelay := 50 * time.Millisecond + batchSize := 500 + batchWorkerCount := 50 + if options.BatchOptions.BatchDelay != nil { + batchDelay = *options.BatchOptions.BatchDelay + } + + if options.BatchOptions.BatchSize != 0 { + batchSize = options.BatchOptions.BatchSize + } + if options.BatchOptions.BatchWorkerCount != 0 { + batchWorkerCount = options.BatchOptions.BatchWorkerCount + } + rpcClient.requestBatchWorkerSlots = make(chan bool, batchWorkerCount) + rpcClient.startBatcher(ctx, batchDelay, batchSize) + } + return rpcClient } type RPCClient struct { - client *resty.Client - concurrencySlots chan bool - requestCounter int64 + client *resty.Client + concurrencySlots chan bool + requestCounter int64 + requestBatchQueue chan *batchRequest + requestBatchWorkerSlots chan bool +} + +type RPCClientBatchOptions struct { + BatchDelay *time.Duration + BatchSize int + BatchWorkerCount int } type RPCClientOptions struct { MaxConcurrentRequest int64 + BatchOptions *RPCClientBatchOptions } type RPCRequest struct { @@ -141,11 +168,137 @@ func (rc *RPCClient) CallRPC(ctx context.Context, result interface{}, method str return nil } -// SyncRequest sends an individual RPC request to the backend (always over HTTP currently), -// and waits synchronously for the response, or an error. -// -// In all return paths *including error paths* the RPCResponse is populated -// so the caller has an RPC structure to send back to the front-end caller. +type batchRequest struct { + rpcReq *RPCRequest + rpcRes chan *RPCResponse + rpcErr chan error +} + +func (rc *RPCClient) startBatcher(ctx context.Context, batchDelay time.Duration, batchSize int) { + requestQueue := make(chan *batchRequest) + + go func() { + ticker := time.NewTicker(batchDelay) + defer ticker.Stop() + + var batch []*batchRequest + + for { + select { + case req := <-requestQueue: + batch = append(batch, req) + if len(batch) >= batchSize { + rc.sendBatch(ctx, batch) + batch = nil + } + case <-ticker.C: + if len(batch) > 0 { + rc.sendBatch(ctx, batch) + batch = nil + } + case <-ctx.Done(): + return + } + } + }() + + rc.requestBatchQueue = requestQueue +} + +func (rc *RPCClient) sendBatch(ctx context.Context, batch []*batchRequest) { + select { + case rc.requestBatchWorkerSlots <- true: + // wait for the worker slot and continue + case <-ctx.Done(): + for _, req := range batch { + err := i18n.NewError(ctx, signermsgs.MsgRPCRequestBatchFailed) + req.rpcErr <- err + } + return + } + go func() { + defer func() { + <-rc.requestBatchWorkerSlots + }() + + batchRPCTraceID := fmt.Sprintf("batch-%d", time.Now().UnixNano()) + traceIDs := make([]string, len(batch)) + + var rpcReqs []*RPCRequest + for i, req := range batch { + // We always set the back-end request ID - as we need to support requests coming in from + // multiple concurrent clients on our front-end that might use clashing IDs. + var beReq = *req.rpcReq + beReq.JSONRpc = "2.0" + rpcTraceID := rc.allocateRequestID(&beReq) + if req.rpcReq.ID != nil { + // We're proxying a request with front-end RPC ID - log that as well + rpcTraceID = fmt.Sprintf("%s->%s/%s", req.rpcReq.ID, batchRPCTraceID, rpcTraceID) + } + traceIDs[i] = rpcTraceID + rpcReqs = append(rpcReqs, &beReq) + } + log.L(ctx).Debugf("RPC[%s] --> BATCH %d requests", batchRPCTraceID, len(rpcReqs)) + + responses := make([]*RPCResponse, len(batch)) + res, err := rc.client.R(). + SetContext(ctx). + SetBody(rpcReqs). + SetResult(&responses). + SetError(&responses). + Post("") + + if err != nil { + log.L(ctx).Errorf("RPC[%s] <-- ERROR: %s", batchRPCTraceID, err) + for _, req := range batch { + req.rpcErr <- err + } + return + } + + if len(responses) != len(batch) { + err := i18n.NewError(ctx, signermsgs.MsgRPCRequestBatchFailed) + for _, req := range batch { + req.rpcErr <- err + } + return + } + + for i, resp := range responses { + if logrus.IsLevelEnabled(logrus.TraceLevel) { + jsonOutput, _ := json.Marshal(resp) + log.L(ctx).Tracef("RPC[%s] OUTPUT: %s", batchRPCTraceID, jsonOutput) + } + + // JSON/RPC allows errors to be returned with a 200 status code, as well as other status codes + if res.IsError() || (resp != nil && resp.Error != nil && resp.Error.Code != 0) { + rpcMsg := "" + errLog := "" + if resp != nil { + rpcMsg = resp.Message() + errLog = rpcMsg + } + if rpcMsg == "" { + // Log the raw result in the case of JSON parse error etc. (note that Resty no longer + // returns this as an error - rather the body comes back raw) + errLog = string(res.Body()) + rpcMsg = i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, res.Status()).Error() + } + traceID := traceIDs[i] + log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", traceID, res.StatusCode(), errLog) + batch[i].rpcErr <- fmt.Errorf(rpcMsg) + } else { + if resp.Result == nil { + // We don't want a result for errors, but a null success response needs to go in there + resp.Result = fftypes.JSONAnyPtr(fftypes.NullString) + } + batch[i].rpcRes <- resp + + } + } + }() +} + func (rc *RPCClient) SyncRequest(ctx context.Context, rpcReq *RPCRequest) (rpcRes *RPCResponse, err error) { if rc.concurrencySlots != nil { select { @@ -160,63 +313,89 @@ func (rc *RPCClient) SyncRequest(ctx context.Context, rpcReq *RPCRequest) (rpcRe }() } - // We always set the back-end request ID - as we need to support requests coming in from - // multiple concurrent clients on our front-end that might use clashing IDs. - var beReq = *rpcReq - beReq.JSONRpc = "2.0" - rpcTraceID := rc.allocateRequestID(&beReq) - if rpcReq.ID != nil { - // We're proxying a request with front-end RPC ID - log that as well - rpcTraceID = fmt.Sprintf("%s->%s", rpcReq.ID, rpcTraceID) - } + if rc.requestBatchQueue != nil { + req := &batchRequest{ + rpcReq: rpcReq, + rpcRes: make(chan *RPCResponse, 1), + rpcErr: make(chan error, 1), + } - rpcRes = new(RPCResponse) + select { + case rc.requestBatchQueue <- req: + case <-ctx.Done(): + err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID) + return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err + } - log.L(ctx).Debugf("RPC[%s] --> %s", rpcTraceID, rpcReq.Method) - if logrus.IsLevelEnabled(logrus.TraceLevel) { - jsonInput, _ := json.Marshal(rpcReq) - log.L(ctx).Tracef("RPC[%s] INPUT: %s", rpcTraceID, jsonInput) - } - rpcStartTime := time.Now() - res, err := rc.client.R(). - SetContext(ctx). - SetBody(beReq). - SetResult(&rpcRes). - SetError(rpcRes). - Post("") - - // Restore the original ID - rpcRes.ID = rpcReq.ID - if err != nil { - err := i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, err) - log.L(ctx).Errorf("RPC[%s] <-- ERROR: %s", rpcTraceID, err) - rpcRes = RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError) - return rpcRes, err - } - if logrus.IsLevelEnabled(logrus.TraceLevel) { - jsonOutput, _ := json.Marshal(rpcRes) - log.L(ctx).Tracef("RPC[%s] OUTPUT: %s", rpcTraceID, jsonOutput) - } - // JSON/RPC allows errors to be returned with a 200 status code, as well as other status codes - if res.IsError() || rpcRes.Error != nil && rpcRes.Error.Code != 0 { - rpcMsg := rpcRes.Message() - errLog := rpcMsg - if rpcMsg == "" { - // Log the raw result in the case of JSON parse error etc. (note that Resty no longer - // returns this as an error - rather the body comes back raw) - errLog = string(res.Body()) - rpcMsg = i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, res.Status()).Error() - } - log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", rpcTraceID, res.StatusCode(), errLog) - err := fmt.Errorf(rpcMsg) - return rpcRes, err - } - log.L(ctx).Infof("RPC[%s] <-- %s [%d] OK (%.2fms)", rpcTraceID, rpcReq.Method, res.StatusCode(), float64(time.Since(rpcStartTime))/float64(time.Millisecond)) - if rpcRes.Result == nil { - // We don't want a result for errors, but a null success response needs to go in there - rpcRes.Result = fftypes.JSONAnyPtr(fftypes.NullString) + select { + case rpcRes := <-req.rpcRes: + return rpcRes, nil + case err := <-req.rpcErr: + return nil, err + case <-ctx.Done(): + err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID) + return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err + } + } else { + // We always set the back-end request ID - as we need to support requests coming in from + // multiple concurrent clients on our front-end that might use clashing IDs. + var beReq = *rpcReq + beReq.JSONRpc = "2.0" + rpcTraceID := rc.allocateRequestID(&beReq) + if rpcReq.ID != nil { + // We're proxying a request with front-end RPC ID - log that as well + rpcTraceID = fmt.Sprintf("%s->%s", rpcReq.ID, rpcTraceID) + } + + rpcRes = new(RPCResponse) + + log.L(ctx).Debugf("RPC[%s] --> %s", rpcTraceID, rpcReq.Method) + if logrus.IsLevelEnabled(logrus.TraceLevel) { + jsonInput, _ := json.Marshal(rpcReq) + log.L(ctx).Tracef("RPC[%s] INPUT: %s", rpcTraceID, jsonInput) + } + rpcStartTime := time.Now() + res, err := rc.client.R(). + SetContext(ctx). + SetBody(beReq). + SetResult(&rpcRes). + SetError(&rpcRes). + Post("") + + // Restore the original ID + rpcRes.ID = rpcReq.ID + if err != nil { + err := i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, err) + log.L(ctx).Errorf("RPC[%s] <-- ERROR: %s", rpcTraceID, err) + rpcRes = RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError) + return rpcRes, err + } + if logrus.IsLevelEnabled(logrus.TraceLevel) { + jsonOutput, _ := json.Marshal(rpcRes) + log.L(ctx).Tracef("RPC[%s] OUTPUT: %s", rpcTraceID, jsonOutput) + } + // JSON/RPC allows errors to be returned with a 200 status code, as well as other status codes + if res.IsError() || rpcRes.Error != nil && rpcRes.Error.Code != 0 { + rpcMsg := rpcRes.Message() + errLog := rpcMsg + if rpcMsg == "" { + // Log the raw result in the case of JSON parse error etc. (note that Resty no longer + // returns this as an error - rather the body comes back raw) + errLog = string(res.Body()) + rpcMsg = i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, res.Status()).Error() + } + log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", rpcTraceID, res.StatusCode(), errLog) + err := fmt.Errorf(rpcMsg) + return rpcRes, err + } + log.L(ctx).Infof("RPC[%s] <-- %s [%d] OK (%.2fms)", rpcTraceID, rpcReq.Method, res.StatusCode(), float64(time.Since(rpcStartTime))/float64(time.Millisecond)) + if rpcRes.Result == nil { + // We don't want a result for errors, but a null success response needs to go in there + rpcRes.Result = fftypes.JSONAnyPtr(fftypes.NullString) + } + return rpcRes, nil } - return rpcRes, nil + } func RPCErrorResponse(err error, id *fftypes.JSONAny, code RPCCode) *RPCResponse { diff --git a/pkg/rpcbackend/backend_test.go b/pkg/rpcbackend/backend_test.go index d8b49736..3bc7444f 100644 --- a/pkg/rpcbackend/backend_test.go +++ b/pkg/rpcbackend/backend_test.go @@ -24,6 +24,7 @@ import ( "net/http/httptest" "strconv" "testing" + "time" "github.com/hyperledger/firefly-common/pkg/ffresty" "github.com/hyperledger/firefly-common/pkg/fftypes" @@ -33,9 +34,10 @@ import ( "github.com/stretchr/testify/assert" ) -type testRPCHander func(rpcReq *RPCRequest) (int, *RPCResponse) +type testRPCHandler func(rpcReq *RPCRequest) (int, *RPCResponse) +type testBatchRPCHandler func(rpcReq []*RPCRequest) (int, []*RPCResponse) -func newTestServer(t *testing.T, rpcHandler testRPCHander, options ...RPCClientOptions) (context.Context, *RPCClient, func()) { +func newTestServer(t *testing.T, rpcHandler testRPCHandler, options ...RPCClientOptions) (context.Context, *RPCClient, func()) { ctx, cancelCtx := context.WithCancel(context.Background()) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -64,7 +66,56 @@ func newTestServer(t *testing.T, rpcHandler testRPCHander, options ...RPCClientO c, err := ffresty.New(ctx, signerconfig.BackendConfig) assert.NoError(t, err) - rb := NewRPCClient(c).(*RPCClient) + var rb *RPCClient + + if len(options) == 1 { + rb = NewRPCClientWithOption(ctx, c, options[0]).(*RPCClient) + } else { + rb = NewRPCClient(ctx, c).(*RPCClient) + } + + return ctx, rb, func() { + cancelCtx() + server.Close() + } +} + +func newBatchTestServer(t *testing.T, rpcHandler testBatchRPCHandler, options ...RPCClientOptions) (context.Context, *RPCClient, func()) { + + ctx, cancelCtx := context.WithCancel(context.Background()) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + var rpcReqs []*RPCRequest + err := json.NewDecoder(r.Body).Decode(&rpcReqs) + assert.NoError(t, err) + + status, rpcRes := rpcHandler(rpcReqs) + b := []byte(`[]`) + if rpcRes != nil { + b, err = json.Marshal(rpcRes) + assert.NoError(t, err) + } + w.Header().Add("Content-Type", "application/json") + w.Header().Add("Content-Length", strconv.Itoa(len(b))) + w.WriteHeader(status) + w.Write(b) + + })) + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, fmt.Sprintf("http://%s", server.Listener.Addr())) + + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + + var rb *RPCClient + + if len(options) == 1 { + rb = NewRPCClientWithOption(ctx, c, options[0]).(*RPCClient) + } else { + rb = NewRPCClientWithOption(ctx, c, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{}}).(*RPCClient) + } return ctx, rb, func() { cancelCtx() @@ -218,7 +269,7 @@ func TestSyncRPCCallBadJSONResponse(t *testing.T) { prefix.Set(ffresty.HTTPConfigURL, server.URL) c, err := ffresty.New(context.Background(), signerconfig.BackendConfig) assert.NoError(t, err) - rb := NewRPCClient(c).(*RPCClient) + rb := NewRPCClient(context.Background(), c).(*RPCClient) var txCount ethtypes.HexInteger rpcErr := rb.CallRPC(context.Background(), &txCount, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending") @@ -239,7 +290,7 @@ func TestSyncRPCCallFailParseJSONResponse(t *testing.T) { prefix.Set(ffresty.HTTPConfigURL, server.URL) c, err := ffresty.New(context.Background(), signerconfig.BackendConfig) assert.NoError(t, err) - rb := NewRPCClient(c).(*RPCClient) + rb := NewRPCClient(context.Background(), c).(*RPCClient) var mapResult map[string]interface{} rpcErr := rb.CallRPC(context.Background(), &mapResult, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending") @@ -290,7 +341,7 @@ func TestSyncRequestConcurrency(t *testing.T) { prefix.Set(ffresty.HTTPConfigURL, server.URL) c, err := ffresty.New(ctx, signerconfig.BackendConfig) assert.NoError(t, err) - rb := NewRPCClientWithOption(c, RPCClientOptions{ + rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{ MaxConcurrentRequest: 1, }).(*RPCClient) @@ -308,5 +359,458 @@ func TestSyncRequestConcurrency(t *testing.T) { close(blocked) <-bgDone +} + +func TestBatchSyncRPCCallNullResponse(t *testing.T) { + + ctx, rb, done := newBatchTestServer(t, func(rpcReqs []*RPCRequest) (status int, rpcRes []*RPCResponse) { + rpcReq := rpcReqs[0] + assert.Equal(t, "2.0", rpcReq.JSONRpc) + assert.Equal(t, "eth_getTransactionReceipt", rpcReq.Method) + assert.Equal(t, `"000012346"`, rpcReq.ID.String()) + assert.Equal(t, `"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`, rpcReq.Params[0].String()) + return 200, []*RPCResponse{{ + JSONRpc: "2.0", + ID: rpcReq.ID, + Result: nil}} + }) + rb.requestCounter = 12345 + defer done() + + rpcRes, err := rb.SyncRequest(ctx, &RPCRequest{ + ID: fftypes.JSONAnyPtr("1"), + Method: "eth_getTransactionReceipt", + Params: []*fftypes.JSONAny{ + fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`), + }, + }) + assert.NoError(t, err) + assert.Equal(t, `null`, rpcRes.Result.String()) +} + +func TestBatchSyncRequestCanceledContext(t *testing.T) { + + blocked := make(chan struct{}) + ctx, cancelCtx := context.WithCancel(context.Background()) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-blocked + cancelCtx() + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(500) + w.Write([]byte(`[{}]`)) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{ + BatchOptions: &RPCClientBatchOptions{ + BatchSize: 1, + BatchWorkerCount: 1, + }, + }).(*RPCClient) + + checkDone := make(chan bool) + go func() { + _, err = rb.SyncRequest(ctx, &RPCRequest{}) + assert.Regexp(t, "FF22063", err) // this checks the response hit cancel context + close(checkDone) + }() + close(blocked) + <-checkDone + + // this checks request hit cancel context + _, err = rb.SyncRequest(ctx, &RPCRequest{}) + assert.Regexp(t, "FF22063", err) + +} +func TestBatchSyncRequestCanceledContextWhenQueueingABatch(t *testing.T) { + + blocked := make(chan struct{}) + ctx, cancelCtx := context.WithCancel(context.Background()) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-blocked + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(500) + w.Write([]byte(`[{}]`)) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{ + BatchOptions: &RPCClientBatchOptions{ + BatchSize: 1, + BatchWorkerCount: 1, + }, + }).(*RPCClient) + + rb.requestBatchWorkerSlots <- true // fill the worker slot so all batch will be queueing + + checkDone := make(chan bool) + go func() { + _, err = rb.SyncRequest(ctx, &RPCRequest{}) + assert.Regexp(t, "FF22063", err) // this checks the response hit cancel context + close(checkDone) + }() + time.Sleep(50 * time.Millisecond) // wait for the quest to be queued in the other go routine + cancelCtx() + <-checkDone + +} +func TestBatchSyncRPCCallErrorResponse(t *testing.T) { + + ctx, rb, done := newBatchTestServer(t, func(rpcReqs []*RPCRequest) (status int, rpcRes []*RPCResponse) { + assert.Equal(t, 1, len(rpcReqs)) + return 500, []*RPCResponse{{ + JSONRpc: "2.0", + ID: rpcReqs[0].ID, + Error: &RPCError{ + Message: "pop", + }, + }} + }, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ /*use default configuration for batching*/ }}) + rb.requestCounter = 12345 + defer done() + + var txCount ethtypes.HexInteger + err := rb.CallRPC(ctx, &txCount, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending") + assert.Regexp(t, "pop", err) +} + +func TestBatchSyncRPCCallErrorCountMismatch(t *testing.T) { + + ctx, rb, done := newBatchTestServer(t, func(rpcReqs []*RPCRequest) (status int, rpcRes []*RPCResponse) { + assert.Equal(t, 1, len(rpcReqs)) + return 500, []*RPCResponse{} + }, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ /*use default configuration for batching*/ }}) + rb.requestCounter = 12345 + defer done() + + var txCount ethtypes.HexInteger + err := rb.CallRPC(ctx, &txCount, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending") + assert.Regexp(t, "FF22087", err) +} + +func TestBatchSyncRPCCallBadJSONResponse(t *testing.T) { + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(500) + w.Write([]byte(`{!!!!`)) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(context.Background(), signerconfig.BackendConfig) + assert.NoError(t, err) + rb := NewRPCClientWithOption(context.Background(), c, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ /*use default configuration for batching*/ }}).(*RPCClient) + + var txCount ethtypes.HexInteger + rpcErr := rb.CallRPC(context.Background(), &txCount, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending") + assert.Regexp(t, "FF22012", rpcErr.Error()) +} + +func TestBatchSyncRPCCallFailParseJSONResponse(t *testing.T) { + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(200) + w.Write([]byte(`[{"result":"not an object"}]`)) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(context.Background(), signerconfig.BackendConfig) + assert.NoError(t, err) + rb := NewRPCClientWithOption(context.Background(), c, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ /*use default configuration for batching*/ }}).(*RPCClient) + + var mapResult map[string]interface{} + rpcErr := rb.CallRPC(context.Background(), &mapResult, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending") + assert.Regexp(t, "FF22065", rpcErr.Error()) +} + +func TestBatchSyncRPCCallErrorBadInput(t *testing.T) { + + ctx, rb, done := newBatchTestServer(t, func(rpcReqs []*RPCRequest) (status int, rpcRes []*RPCResponse) { return 500, nil }) + defer done() + + var txCount ethtypes.HexInteger + err := rb.CallRPC(ctx, &txCount, "test-bad-params", map[bool]bool{false: true}) + assert.Regexp(t, "FF22011", err) +} + +func TestBatchRequestsOKWithBatchSize(t *testing.T) { + // Define the expected server response to the batch + rpcServerResponseBatchBytes := []byte(`[ + { + "jsonrpc": "2.0", + "id": 1, + "result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x24" + } + }, + { + "jsonrpc": "2.0", + "id": 2, + "result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x10" + } + } + ]`) + + ctx := context.Background() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rpcReqs []*RPCRequest + err := json.NewDecoder(r.Body).Decode(&rpcReqs) + assert.NoError(t, err) + + w.Header().Add("Content-Type", "application/json") + w.Header().Add("Content-Length", strconv.Itoa(len(rpcServerResponseBatchBytes))) + w.WriteHeader(200) + w.Write(rpcServerResponseBatchBytes) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + tH := 2 * time.Hour + rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{ + MaxConcurrentRequest: 10, + BatchOptions: &RPCClientBatchOptions{ + BatchDelay: &tH, // very long delay, so need to rely on batch size to be hit for sending a batch + BatchSize: 2, + }, + }).(*RPCClient) + + round := 400 + + reqNumbers := 2*round - 1 + + requestCount := make(chan bool, reqNumbers) + + for i := 0; i < reqNumbers; i++ { + go func() { + _, err := rb.SyncRequest(ctx, &RPCRequest{ + Method: "eth_getTransactionByHash", + Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, + }) + assert.Nil(t, err) + requestCount <- true + + }() + } + + for i := 0; i < reqNumbers-1; i++ { + <-requestCount + } + + _, err = rb.SyncRequest(ctx, &RPCRequest{ + Method: "eth_getTransactionByHash", + Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, + }) + assert.Nil(t, err) + + <-requestCount +} + +func TestBatchRequestsTestWorkerCounts(t *testing.T) { + // Define the expected server response to the batch + rpcServerResponseBatchBytes := []byte(`[ + { + "jsonrpc": "2.0", + "id": 1, + "result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x24" + } + }, + { + "jsonrpc": "2.0", + "id": 2, + "result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x10" + } + } + ]`) + + ctx := context.Background() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rpcReqs []*RPCRequest + err := json.NewDecoder(r.Body).Decode(&rpcReqs) + assert.NoError(t, err) + time.Sleep(200 * time.Millisecond) // set 200s delay for each quest + w.Header().Add("Content-Type", "application/json") + w.Header().Add("Content-Length", strconv.Itoa(len(rpcServerResponseBatchBytes))) + w.WriteHeader(200) + w.Write(rpcServerResponseBatchBytes) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + + // only a single worker + tH := 2 * time.Hour + rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{ + MaxConcurrentRequest: 10, + BatchOptions: &RPCClientBatchOptions{ + BatchDelay: &tH, // very long delay, so need to rely on batch size to be hit for sending a batch + BatchSize: 2, + BatchWorkerCount: 1, + }, + }).(*RPCClient) + + round := 5 // doing first round, each round will have at least 200ms delay, so the whole flow will guaranteed to be more than 1s + + reqNumbers := 2 * round + + requestCount := make(chan bool, reqNumbers) + + requestStart := time.Now() + + for i := 0; i < reqNumbers; i++ { + go func() { + _, err := rb.SyncRequest(ctx, &RPCRequest{ + ID: fftypes.JSONAnyPtr("testId"), + Method: "eth_getTransactionByHash", + Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, + }) + assert.Nil(t, err) + requestCount <- true + + }() + } + + for i := 0; i < reqNumbers; i++ { + <-requestCount + } + + assert.Greater(t, time.Since(requestStart), 1*time.Second) + + // number of worker equal to the number of rounds + // so the delay should be slightly greater than per request delay (200ms), but hopefully less than 300ms (with 100ms overhead) + rb = NewRPCClientWithOption(ctx, c, RPCClientOptions{ + MaxConcurrentRequest: 10, + BatchOptions: &RPCClientBatchOptions{ + BatchDelay: &tH, // very long delay, so need to rely on batch size to be hit for sending a batch + BatchSize: 2, + BatchWorkerCount: round, + }, + }).(*RPCClient) + + requestStart = time.Now() + + for i := 0; i < reqNumbers; i++ { + go func() { + _, err := rb.SyncRequest(ctx, &RPCRequest{ + Method: "eth_getTransactionByHash", + Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, + }) + assert.Nil(t, err) + requestCount <- true + + }() + } + + for i := 0; i < reqNumbers; i++ { + <-requestCount + } + + assert.Greater(t, time.Since(requestStart), 200*time.Millisecond) + assert.Less(t, time.Since(requestStart), 300*time.Millisecond) + +} + +func TestBatchRequestsOKWithBatchDelay(t *testing.T) { + // Define the expected server response to the batch + rpcServerResponseBatchBytes := []byte(`[ + { + "jsonrpc": "2.0", + "id": 1, + "result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x24" + } + }, + { + "jsonrpc": "2.0", + "id": 2, + "result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x10" + } + } + ]`) + + ctx := context.Background() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rpcReqs []*RPCRequest + err := json.NewDecoder(r.Body).Decode(&rpcReqs) + assert.NoError(t, err) + + w.Header().Add("Content-Type", "application/json") + w.Header().Add("Content-Length", strconv.Itoa(len(rpcServerResponseBatchBytes))) + w.WriteHeader(200) + w.Write(rpcServerResponseBatchBytes) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + hundredMs := 100 * time.Millisecond + rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{ + MaxConcurrentRequest: 10, + BatchOptions: &RPCClientBatchOptions{ + BatchDelay: &hundredMs, + BatchSize: 2000, // very big batch size, so need to rely on batch delay to be hit for sending a batch + }, + }).(*RPCClient) + + round := 5 + + reqPerRound := 2 + + for i := 0; i < round; i++ { + requestCount := make(chan bool, reqPerRound) + for i := 0; i < reqPerRound; i++ { + go func() { + _, err := rb.SyncRequest(ctx, &RPCRequest{ + Method: "eth_getTransactionByHash", + Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, + }) + assert.Nil(t, err) + requestCount <- true + + }() + } + + for i := 0; i < reqPerRound; i++ { + <-requestCount + } + + } } From 02ec6cc8d34b533bc0873ad55709803277bdfbd8 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Mon, 12 Aug 2024 16:40:24 +0100 Subject: [PATCH 2/6] add configuration parsing logic Signed-off-by: Chengxuan Xing --- internal/rpcserver/server.go | 2 +- internal/signermsgs/en_config_descriptions.go | 8 +- pkg/rpcbackend/backend.go | 322 ++++++++++-------- pkg/rpcbackend/backend_test.go | 195 ++++++++--- pkg/rpcbackend/config.go | 77 +++++ 5 files changed, 408 insertions(+), 196 deletions(-) create mode 100644 pkg/rpcbackend/config.go diff --git a/internal/rpcserver/server.go b/internal/rpcserver/server.go index 48f5a05e..15c0edad 100644 --- a/internal/rpcserver/server.go +++ b/internal/rpcserver/server.go @@ -45,7 +45,7 @@ func NewServer(ctx context.Context, wallet ethsigner.Wallet) (ss Server, err err return nil, err } s := &rpcServer{ - backend: rpcbackend.NewRPCClient(ctx, httpClient), + backend: rpcbackend.NewRPCClient(httpClient), apiServerDone: make(chan error), wallet: wallet, chainID: config.GetInt64(signerconfig.BackendChainID), diff --git a/internal/signermsgs/en_config_descriptions.go b/internal/signermsgs/en_config_descriptions.go index 41d456b0..04e01704 100644 --- a/internal/signermsgs/en_config_descriptions.go +++ b/internal/signermsgs/en_config_descriptions.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -53,4 +53,10 @@ var ( ConfigBackendChainID = ffc("config.backend.chainId", "Optionally set the Chain ID of the blockchain. Otherwise the Network ID will be queried, and used as the Chain ID in signing", "number") ConfigBackendURL = ffc("config.backend.url", "URL for the backend JSON/RPC server / blockchain node", "url") ConfigBackendProxyURL = ffc("config.backend.proxy.url", "Optional HTTP proxy URL", "url") + + ConfigRPCBatchMaxConcurrentRequest = ffc("config.maxConcurrentRequest", "The maximum number of concurrent JSON-RPC requests get processed at a time", i18n.IntType) + ConfigRPCBatchEnabled = ffc("config.batch.enabled", "Whether to enable batching JSON-RPC requests", i18n.BooleanType) + ConfigRPCBatchSize = ffc("config.batch.size", "When the amount of queued requests reaches this number, they will be batched and dispatched", i18n.IntType) + ConfigRPCBatchTimeout = ffc("config.batch.timeout", "When the time since the first request was queued reaches this timeout, all requests in the queue will be batched and dispatched", i18n.TimeDurationType) + ConfigRPCBatchDispatchConcurrency = ffc("config.batch.dispatchConcurrency", "The maximum number of concurrent batch dispatching process", i18n.IntType) ) diff --git a/pkg/rpcbackend/backend.go b/pkg/rpcbackend/backend.go index a0b4bdd2..93e3891b 100644 --- a/pkg/rpcbackend/backend.go +++ b/pkg/rpcbackend/backend.go @@ -50,12 +50,12 @@ type Backend interface { } // NewRPCClient Constructor -func NewRPCClient(ctx context.Context, client *resty.Client) Backend { - return NewRPCClientWithOption(ctx, client, RPCClientOptions{}) +func NewRPCClient(client *resty.Client) Backend { + return NewRPCClientWithOption(client, RPCClientOptions{}) } // NewRPCClientWithOption Constructor -func NewRPCClientWithOption(ctx context.Context, client *resty.Client, options RPCClientOptions) Backend { +func NewRPCClientWithOption(client *resty.Client, options RPCClientOptions) Backend { rpcClient := &RPCClient{ client: client, } @@ -64,44 +64,36 @@ func NewRPCClientWithOption(ctx context.Context, client *resty.Client, options R rpcClient.concurrencySlots = make(chan bool, options.MaxConcurrentRequest) } - if options.BatchOptions != nil { - batchDelay := 50 * time.Millisecond + if options.BatchOptions != nil && options.BatchOptions.Enabled { + if options.BatchOptions.BatchDispatcherContext == nil { + panic("must provide a batch dispatcher context when batch is enabled") + } + batchTimeout := 50 * time.Millisecond batchSize := 500 batchWorkerCount := 50 - if options.BatchOptions.BatchDelay != nil { - batchDelay = *options.BatchOptions.BatchDelay + if options.BatchOptions.BatchTimeout > 0 { + batchTimeout = options.BatchOptions.BatchTimeout } - if options.BatchOptions.BatchSize != 0 { + if options.BatchOptions.BatchSize > 0 { batchSize = options.BatchOptions.BatchSize } - if options.BatchOptions.BatchWorkerCount != 0 { - batchWorkerCount = options.BatchOptions.BatchWorkerCount + if options.BatchOptions.BatchMaxDispatchConcurrency > 0 { + batchWorkerCount = options.BatchOptions.BatchMaxDispatchConcurrency } - rpcClient.requestBatchWorkerSlots = make(chan bool, batchWorkerCount) - rpcClient.startBatcher(ctx, batchDelay, batchSize) + rpcClient.requestBatchConcurrencySlots = make(chan bool, batchWorkerCount) + rpcClient.startBatchDispatcher(options.BatchOptions.BatchDispatcherContext, batchTimeout, batchSize) } return rpcClient } type RPCClient struct { - client *resty.Client - concurrencySlots chan bool - requestCounter int64 - requestBatchQueue chan *batchRequest - requestBatchWorkerSlots chan bool -} - -type RPCClientBatchOptions struct { - BatchDelay *time.Duration - BatchSize int - BatchWorkerCount int -} - -type RPCClientOptions struct { - MaxConcurrentRequest int64 - BatchOptions *RPCClientBatchOptions + client *resty.Client + concurrencySlots chan bool + requestCounter int64 + requestBatchQueue chan *batchRequest + requestBatchConcurrencySlots chan bool } type RPCRequest struct { @@ -174,40 +166,68 @@ type batchRequest struct { rpcErr chan error } -func (rc *RPCClient) startBatcher(ctx context.Context, batchDelay time.Duration, batchSize int) { - requestQueue := make(chan *batchRequest) - - go func() { - ticker := time.NewTicker(batchDelay) - defer ticker.Stop() - - var batch []*batchRequest - - for { - select { - case req := <-requestQueue: - batch = append(batch, req) - if len(batch) >= batchSize { - rc.sendBatch(ctx, batch) - batch = nil - } - case <-ticker.C: - if len(batch) > 0 { - rc.sendBatch(ctx, batch) - batch = nil +func (rc *RPCClient) startBatchDispatcher(dispatcherRootContext context.Context, batchTimeout time.Duration, batchSize int) { + if rc.requestBatchQueue == nil { // avoid orphaned dispatcher + requestQueue := make(chan *batchRequest) + go func() { + var batch []*batchRequest + var ticker *time.Ticker + var tickerChannel <-chan time.Time + for { + select { + case req := <-requestQueue: + batch = append(batch, req) + if ticker == nil { + // first request received start a ticker + ticker = time.NewTicker(batchTimeout) + defer ticker.Stop() + tickerChannel = ticker.C + } + + if len(batch) >= batchSize { + rc.dispatchBatch(dispatcherRootContext, batch) + batch = nil + ticker.Stop() // batch dispatched, stop the ticker, the next request will start a new ticker if needed + ticker = nil + tickerChannel = nil + } + + case <-tickerChannel: + if len(batch) > 0 { + rc.dispatchBatch(dispatcherRootContext, batch) + batch = nil + + ticker.Stop() // batch dispatched, stop the ticker, the next request will start a new ticker if needed + ticker = nil + tickerChannel = nil + } + + case <-dispatcherRootContext.Done(): + if ticker != nil { + ticker.Stop() // clean up the ticker + } + select { // drain the queue + case req := <-requestQueue: + batch = append(batch, req) + default: + } + for i, req := range batch { + // mark all queueing requests as failed + cancelCtxErr := i18n.NewError(dispatcherRootContext, signermsgs.MsgRequestCanceledContext, req.rpcReq.ID) + batch[i].rpcErr <- cancelCtxErr + } + + return } - case <-ctx.Done(): - return } - } - }() - - rc.requestBatchQueue = requestQueue + }() + rc.requestBatchQueue = requestQueue + } } -func (rc *RPCClient) sendBatch(ctx context.Context, batch []*batchRequest) { +func (rc *RPCClient) dispatchBatch(ctx context.Context, batch []*batchRequest) { select { - case rc.requestBatchWorkerSlots <- true: + case rc.requestBatchConcurrencySlots <- true: // use a buffered channel to control the number of concurrent thread for batch dispatcher // wait for the worker slot and continue case <-ctx.Done(): for _, req := range batch { @@ -218,85 +238,91 @@ func (rc *RPCClient) sendBatch(ctx context.Context, batch []*batchRequest) { } go func() { defer func() { - <-rc.requestBatchWorkerSlots + <-rc.requestBatchConcurrencySlots }() - batchRPCTraceID := fmt.Sprintf("batch-%d", time.Now().UnixNano()) - traceIDs := make([]string, len(batch)) - - var rpcReqs []*RPCRequest - for i, req := range batch { - // We always set the back-end request ID - as we need to support requests coming in from - // multiple concurrent clients on our front-end that might use clashing IDs. - var beReq = *req.rpcReq - beReq.JSONRpc = "2.0" - rpcTraceID := rc.allocateRequestID(&beReq) - if req.rpcReq.ID != nil { - // We're proxying a request with front-end RPC ID - log that as well - rpcTraceID = fmt.Sprintf("%s->%s/%s", req.rpcReq.ID, batchRPCTraceID, rpcTraceID) - } - traceIDs[i] = rpcTraceID - rpcReqs = append(rpcReqs, &beReq) - } - log.L(ctx).Debugf("RPC[%s] --> BATCH %d requests", batchRPCTraceID, len(rpcReqs)) + // once a concurrency slot is obtained, dispatch the batch + rc.dispatch(ctx, batch) + }() +} - responses := make([]*RPCResponse, len(batch)) - res, err := rc.client.R(). - SetContext(ctx). - SetBody(rpcReqs). - SetResult(&responses). - SetError(&responses). - Post("") +func (rc *RPCClient) dispatch(ctx context.Context, batch []*batchRequest) { - if err != nil { - log.L(ctx).Errorf("RPC[%s] <-- ERROR: %s", batchRPCTraceID, err) - for _, req := range batch { - req.rpcErr <- err - } - return + batchRPCTraceID := fmt.Sprintf("batch-%d", time.Now().UnixNano()) + traceIDs := make([]string, len(batch)) + + var rpcReqs []*RPCRequest + for i, req := range batch { + // We always set the back-end request ID - as we need to support requests coming in from + // multiple concurrent clients on our front-end that might use clashing IDs. + var beReq = *req.rpcReq + beReq.JSONRpc = "2.0" + rpcTraceID := rc.allocateRequestID(&beReq) + if req.rpcReq.ID != nil { + // We're proxying a request with front-end RPC ID - log that as well + rpcTraceID = fmt.Sprintf("%s->%s/%s", req.rpcReq.ID, batchRPCTraceID, rpcTraceID) } + traceIDs[i] = rpcTraceID + rpcReqs = append(rpcReqs, &beReq) + } + log.L(ctx).Debugf("RPC[%s] --> BATCH %d requests", batchRPCTraceID, len(rpcReqs)) - if len(responses) != len(batch) { - err := i18n.NewError(ctx, signermsgs.MsgRPCRequestBatchFailed) - for _, req := range batch { - req.rpcErr <- err - } - return + responses := make([]*RPCResponse, len(batch)) + res, err := rc.client.R(). + SetContext(ctx). + SetBody(rpcReqs). + SetResult(&responses). + SetError(&responses). + Post("") + + if err != nil { + log.L(ctx).Errorf("RPC[%s] <-- ERROR: %s", batchRPCTraceID, err) + for _, req := range batch { + req.rpcErr <- err } + return + } - for i, resp := range responses { - if logrus.IsLevelEnabled(logrus.TraceLevel) { - jsonOutput, _ := json.Marshal(resp) - log.L(ctx).Tracef("RPC[%s] OUTPUT: %s", batchRPCTraceID, jsonOutput) - } + if len(responses) != len(batch) { + err := i18n.NewError(ctx, signermsgs.MsgRPCRequestBatchFailed) + for _, req := range batch { + req.rpcErr <- err + } + return + } - // JSON/RPC allows errors to be returned with a 200 status code, as well as other status codes - if res.IsError() || (resp != nil && resp.Error != nil && resp.Error.Code != 0) { - rpcMsg := "" - errLog := "" - if resp != nil { - rpcMsg = resp.Message() - errLog = rpcMsg - } - if rpcMsg == "" { - // Log the raw result in the case of JSON parse error etc. (note that Resty no longer - // returns this as an error - rather the body comes back raw) - errLog = string(res.Body()) - rpcMsg = i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, res.Status()).Error() - } - traceID := traceIDs[i] - log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", traceID, res.StatusCode(), errLog) - batch[i].rpcErr <- fmt.Errorf(rpcMsg) - } else { - if resp.Result == nil { - // We don't want a result for errors, but a null success response needs to go in there - resp.Result = fftypes.JSONAnyPtr(fftypes.NullString) - } - batch[i].rpcRes <- resp + for i, resp := range responses { + if logrus.IsLevelEnabled(logrus.TraceLevel) { + jsonOutput, _ := json.Marshal(resp) + log.L(ctx).Tracef("RPC[%s] OUTPUT: %s", batchRPCTraceID, jsonOutput) + } + // JSON/RPC allows errors to be returned with a 200 status code, as well as other status codes + if res.IsError() || (resp != nil && resp.Error != nil && resp.Error.Code != 0) { + rpcMsg := "" + errLog := "" + if resp != nil { + rpcMsg = resp.Message() + errLog = rpcMsg } + if rpcMsg == "" { + // Log the raw result in the case of JSON parse error etc. (note that Resty no longer + // returns this as an error - rather the body comes back raw) + errLog = string(res.Body()) + rpcMsg = i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, res.Status()).Error() + } + traceID := traceIDs[i] + log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", traceID, res.StatusCode(), errLog) + batch[i].rpcErr <- fmt.Errorf(rpcMsg) + } else { + if resp.Result == nil { + // We don't want a result for errors, but a null success response needs to go in there + resp.Result = fftypes.JSONAnyPtr(fftypes.NullString) + } + batch[i].rpcRes <- resp + } - }() + } } func (rc *RPCClient) SyncRequest(ctx context.Context, rpcReq *RPCRequest) (rpcRes *RPCResponse, err error) { @@ -314,28 +340,7 @@ func (rc *RPCClient) SyncRequest(ctx context.Context, rpcReq *RPCRequest) (rpcRe } if rc.requestBatchQueue != nil { - req := &batchRequest{ - rpcReq: rpcReq, - rpcRes: make(chan *RPCResponse, 1), - rpcErr: make(chan error, 1), - } - - select { - case rc.requestBatchQueue <- req: - case <-ctx.Done(): - err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID) - return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err - } - - select { - case rpcRes := <-req.rpcRes: - return rpcRes, nil - case err := <-req.rpcErr: - return nil, err - case <-ctx.Done(): - err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID) - return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err - } + return rc.batchSyncRequest(ctx, rpcReq) } else { // We always set the back-end request ID - as we need to support requests coming in from // multiple concurrent clients on our front-end that might use clashing IDs. @@ -398,6 +403,31 @@ func (rc *RPCClient) SyncRequest(ctx context.Context, rpcReq *RPCRequest) (rpcRe } +func (rc *RPCClient) batchSyncRequest(ctx context.Context, rpcReq *RPCRequest) (rpcRes *RPCResponse, err error) { + req := &batchRequest{ + rpcReq: rpcReq, + rpcRes: make(chan *RPCResponse, 1), + rpcErr: make(chan error, 1), + } + + select { + case rc.requestBatchQueue <- req: + case <-ctx.Done(): + err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID) + return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err + } + + select { + case rpcRes := <-req.rpcRes: + return rpcRes, nil + case err := <-req.rpcErr: + return nil, err + case <-ctx.Done(): + err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID) + return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err + } +} + func RPCErrorResponse(err error, id *fftypes.JSONAny, code RPCCode) *RPCResponse { return &RPCResponse{ JSONRpc: "2.0", diff --git a/pkg/rpcbackend/backend_test.go b/pkg/rpcbackend/backend_test.go index 3bc7444f..34a42285 100644 --- a/pkg/rpcbackend/backend_test.go +++ b/pkg/rpcbackend/backend_test.go @@ -26,6 +26,7 @@ import ( "testing" "time" + "github.com/hyperledger/firefly-common/pkg/config" "github.com/hyperledger/firefly-common/pkg/ffresty" "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-signer/internal/signerconfig" @@ -69,9 +70,9 @@ func newTestServer(t *testing.T, rpcHandler testRPCHandler, options ...RPCClient var rb *RPCClient if len(options) == 1 { - rb = NewRPCClientWithOption(ctx, c, options[0]).(*RPCClient) + rb = NewRPCClientWithOption(c, options[0]).(*RPCClient) } else { - rb = NewRPCClient(ctx, c).(*RPCClient) + rb = NewRPCClient(c).(*RPCClient) } return ctx, rb, func() { @@ -112,9 +113,13 @@ func newBatchTestServer(t *testing.T, rpcHandler testBatchRPCHandler, options .. var rb *RPCClient if len(options) == 1 { - rb = NewRPCClientWithOption(ctx, c, options[0]).(*RPCClient) + rb = NewRPCClientWithOption(c, options[0]).(*RPCClient) } else { - rb = NewRPCClientWithOption(ctx, c, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{}}).(*RPCClient) + rpcConfig := config.RootSection("unittest") + InitConfig(rpcConfig) + rpcConfig.Set(ConfigBatchEnabled, true) + rpcOptions := ReadConfig(ctx, rpcConfig) + rb = NewRPCClientWithOption(c, rpcOptions).(*RPCClient) } return ctx, rb, func() { @@ -123,6 +128,17 @@ func newBatchTestServer(t *testing.T, rpcHandler testBatchRPCHandler, options .. } } +func TestNewRPCClientFailDueToBatchContextMissing(t *testing.T) { + + rpcConfig := config.RootSection("unittest") + InitConfig(rpcConfig) + rpcConfig.Set(ConfigBatchEnabled, true) + + assert.Panics(t, func() { + NewRPCClientWithOption(nil, ReadConfig(nil, rpcConfig)) + }) +} + func TestSyncRequestOK(t *testing.T) { rpcRequestBytes := []byte(`{ @@ -269,7 +285,7 @@ func TestSyncRPCCallBadJSONResponse(t *testing.T) { prefix.Set(ffresty.HTTPConfigURL, server.URL) c, err := ffresty.New(context.Background(), signerconfig.BackendConfig) assert.NoError(t, err) - rb := NewRPCClient(context.Background(), c).(*RPCClient) + rb := NewRPCClient(c).(*RPCClient) var txCount ethtypes.HexInteger rpcErr := rb.CallRPC(context.Background(), &txCount, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending") @@ -290,7 +306,7 @@ func TestSyncRPCCallFailParseJSONResponse(t *testing.T) { prefix.Set(ffresty.HTTPConfigURL, server.URL) c, err := ffresty.New(context.Background(), signerconfig.BackendConfig) assert.NoError(t, err) - rb := NewRPCClient(context.Background(), c).(*RPCClient) + rb := NewRPCClient(c).(*RPCClient) var mapResult map[string]interface{} rpcErr := rb.CallRPC(context.Background(), &mapResult, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending") @@ -341,7 +357,7 @@ func TestSyncRequestConcurrency(t *testing.T) { prefix.Set(ffresty.HTTPConfigURL, server.URL) c, err := ffresty.New(ctx, signerconfig.BackendConfig) assert.NoError(t, err) - rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{ + rb := NewRPCClientWithOption(c, RPCClientOptions{ MaxConcurrentRequest: 1, }).(*RPCClient) @@ -406,10 +422,12 @@ func TestBatchSyncRequestCanceledContext(t *testing.T) { prefix.Set(ffresty.HTTPConfigURL, server.URL) c, err := ffresty.New(ctx, signerconfig.BackendConfig) assert.NoError(t, err) - rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{ + rb := NewRPCClientWithOption(c, RPCClientOptions{ BatchOptions: &RPCClientBatchOptions{ - BatchSize: 1, - BatchWorkerCount: 1, + BatchDispatcherContext: ctx, + Enabled: true, + BatchSize: 1, + BatchMaxDispatchConcurrency: 1, }, }).(*RPCClient) @@ -425,9 +443,60 @@ func TestBatchSyncRequestCanceledContext(t *testing.T) { // this checks request hit cancel context _, err = rb.SyncRequest(ctx, &RPCRequest{}) assert.Regexp(t, "FF22063", err) +} + +func TestBatchSyncRequestCanceledContextWhenQueueing(t *testing.T) { + + ctx, cancelCtx := context.WithCancel(context.Background()) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.FailNow(t, "Not expecting JSON-RPC endpoint to get called") + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + + rpcConfig := config.RootSection("unittest") + InitConfig(rpcConfig) + rpcConfig.Set(ConfigBatchEnabled, true) + rpcConfig.Set(ConfigBatchTimeout, "2h") // very long delay + rpcConfig.Set(ConfigBatchSize, 2000) // very big batch size + + rpcOptions := ReadConfig(ctx, rpcConfig) + + rb := NewRPCClientWithOption(c, rpcOptions).(*RPCClient) + + checkDone := make(chan bool) + go func() { + reqContext := context.Background() + _, err = rb.SyncRequest(reqContext, &RPCRequest{}) + assert.Regexp(t, "FF22063", err) // this checks the response hit cancel context + close(checkDone) + }() + time.Sleep(50 * time.Millisecond) // wait for the request to be queued and start the ticker + cancelCtx() + <-checkDone + ctx2, cancelCtx2 := context.WithCancel(context.Background()) + + rpcOptions = ReadConfig(ctx2, rpcConfig) + rb = NewRPCClientWithOption(c, rpcOptions).(*RPCClient) + + checkDone = make(chan bool) + go func() { + reqContext := context.Background() + _, err = rb.SyncRequest(reqContext, &RPCRequest{}) + assert.Regexp(t, "FF22063", err) // this checks the response hit cancel context + close(checkDone) + }() + cancelCtx2() // cancel context straight away to check the pending request are drained correctly + <-checkDone } -func TestBatchSyncRequestCanceledContextWhenQueueingABatch(t *testing.T) { + +func TestBatchSyncRequestCanceledContextWhenDispatchingABatch(t *testing.T) { blocked := make(chan struct{}) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -444,14 +513,16 @@ func TestBatchSyncRequestCanceledContextWhenQueueingABatch(t *testing.T) { prefix.Set(ffresty.HTTPConfigURL, server.URL) c, err := ffresty.New(ctx, signerconfig.BackendConfig) assert.NoError(t, err) - rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{ + rb := NewRPCClientWithOption(c, RPCClientOptions{ BatchOptions: &RPCClientBatchOptions{ - BatchSize: 1, - BatchWorkerCount: 1, + BatchDispatcherContext: ctx, + Enabled: true, + BatchSize: 1, + BatchMaxDispatchConcurrency: 1, }, }).(*RPCClient) - rb.requestBatchWorkerSlots <- true // fill the worker slot so all batch will be queueing + rb.requestBatchConcurrencySlots <- true // fill the worker slot so all batch will be queueing checkDone := make(chan bool) go func() { @@ -466,6 +537,8 @@ func TestBatchSyncRequestCanceledContextWhenQueueingABatch(t *testing.T) { } func TestBatchSyncRPCCallErrorResponse(t *testing.T) { + bgCtx := context.Background() + ctx, rb, done := newBatchTestServer(t, func(rpcReqs []*RPCRequest) (status int, rpcRes []*RPCResponse) { assert.Equal(t, 1, len(rpcReqs)) return 500, []*RPCResponse{{ @@ -475,7 +548,10 @@ func TestBatchSyncRPCCallErrorResponse(t *testing.T) { Message: "pop", }, }} - }, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ /*use default configuration for batching*/ }}) + }, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ + BatchDispatcherContext: bgCtx, + Enabled: true, + }}) rb.requestCounter = 12345 defer done() @@ -485,11 +561,15 @@ func TestBatchSyncRPCCallErrorResponse(t *testing.T) { } func TestBatchSyncRPCCallErrorCountMismatch(t *testing.T) { + bgCtx := context.Background() ctx, rb, done := newBatchTestServer(t, func(rpcReqs []*RPCRequest) (status int, rpcRes []*RPCResponse) { assert.Equal(t, 1, len(rpcReqs)) return 500, []*RPCResponse{} - }, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ /*use default configuration for batching*/ }}) + }, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ + BatchDispatcherContext: bgCtx, + Enabled: true, + }}) rb.requestCounter = 12345 defer done() @@ -499,6 +579,7 @@ func TestBatchSyncRPCCallErrorCountMismatch(t *testing.T) { } func TestBatchSyncRPCCallBadJSONResponse(t *testing.T) { + bgCtx := context.Background() server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") @@ -512,7 +593,10 @@ func TestBatchSyncRPCCallBadJSONResponse(t *testing.T) { prefix.Set(ffresty.HTTPConfigURL, server.URL) c, err := ffresty.New(context.Background(), signerconfig.BackendConfig) assert.NoError(t, err) - rb := NewRPCClientWithOption(context.Background(), c, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ /*use default configuration for batching*/ }}).(*RPCClient) + rb := NewRPCClientWithOption(c, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ + BatchDispatcherContext: bgCtx, + Enabled: true, + }}).(*RPCClient) var txCount ethtypes.HexInteger rpcErr := rb.CallRPC(context.Background(), &txCount, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending") @@ -520,6 +604,7 @@ func TestBatchSyncRPCCallBadJSONResponse(t *testing.T) { } func TestBatchSyncRPCCallFailParseJSONResponse(t *testing.T) { + bgCtx := context.Background() server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") @@ -533,7 +618,10 @@ func TestBatchSyncRPCCallFailParseJSONResponse(t *testing.T) { prefix.Set(ffresty.HTTPConfigURL, server.URL) c, err := ffresty.New(context.Background(), signerconfig.BackendConfig) assert.NoError(t, err) - rb := NewRPCClientWithOption(context.Background(), c, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ /*use default configuration for batching*/ }}).(*RPCClient) + rb := NewRPCClientWithOption(c, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ + BatchDispatcherContext: bgCtx, + Enabled: true, + }}).(*RPCClient) var mapResult map[string]interface{} rpcErr := rb.CallRPC(context.Background(), &mapResult, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending") @@ -589,14 +677,16 @@ func TestBatchRequestsOKWithBatchSize(t *testing.T) { prefix.Set(ffresty.HTTPConfigURL, server.URL) c, err := ffresty.New(ctx, signerconfig.BackendConfig) assert.NoError(t, err) - tH := 2 * time.Hour - rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{ - MaxConcurrentRequest: 10, - BatchOptions: &RPCClientBatchOptions{ - BatchDelay: &tH, // very long delay, so need to rely on batch size to be hit for sending a batch - BatchSize: 2, - }, - }).(*RPCClient) + rpcConfig := config.RootSection("unittest") + InitConfig(rpcConfig) + rpcConfig.Set(ConfigBatchEnabled, true) + rpcConfig.Set(ConfigMaxConcurrentRequest, 10) + rpcConfig.Set(ConfigBatchTimeout, "2h") // very long delay, so need to rely on batch size to be hit for sending a batch + rpcConfig.Set(ConfigBatchSize, 2) + + rpcOptions := ReadConfig(ctx, rpcConfig) + + rb := NewRPCClientWithOption(c, rpcOptions).(*RPCClient) round := 400 @@ -670,15 +760,18 @@ func TestBatchRequestsTestWorkerCounts(t *testing.T) { assert.NoError(t, err) // only a single worker - tH := 2 * time.Hour - rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{ - MaxConcurrentRequest: 10, - BatchOptions: &RPCClientBatchOptions{ - BatchDelay: &tH, // very long delay, so need to rely on batch size to be hit for sending a batch - BatchSize: 2, - BatchWorkerCount: 1, - }, - }).(*RPCClient) + + rpcConfig := config.RootSection("unittest") + InitConfig(rpcConfig) + rpcConfig.Set(ConfigBatchEnabled, true) + rpcConfig.Set(ConfigMaxConcurrentRequest, 10) + rpcConfig.Set(ConfigBatchTimeout, "2h") // very long delay, so need to rely on batch size to be hit for sending a batch + rpcConfig.Set(ConfigBatchSize, 2) + rpcConfig.Set(ConfigBatchMaxDispatchConcurrency, 1) + + rpcOptions := ReadConfig(ctx, rpcConfig) + + rb := NewRPCClientWithOption(c, rpcOptions).(*RPCClient) round := 5 // doing first round, each round will have at least 200ms delay, so the whole flow will guaranteed to be more than 1s @@ -709,12 +802,14 @@ func TestBatchRequestsTestWorkerCounts(t *testing.T) { // number of worker equal to the number of rounds // so the delay should be slightly greater than per request delay (200ms), but hopefully less than 300ms (with 100ms overhead) - rb = NewRPCClientWithOption(ctx, c, RPCClientOptions{ + rb = NewRPCClientWithOption(c, RPCClientOptions{ MaxConcurrentRequest: 10, BatchOptions: &RPCClientBatchOptions{ - BatchDelay: &tH, // very long delay, so need to rely on batch size to be hit for sending a batch - BatchSize: 2, - BatchWorkerCount: round, + BatchDispatcherContext: ctx, + Enabled: true, + BatchTimeout: 2 * time.Hour, // very long delay, so need to rely on batch size to be hit for sending a batch + BatchSize: 2, + BatchMaxDispatchConcurrency: round, }, }).(*RPCClient) @@ -780,14 +875,18 @@ func TestBatchRequestsOKWithBatchDelay(t *testing.T) { prefix.Set(ffresty.HTTPConfigURL, server.URL) c, err := ffresty.New(ctx, signerconfig.BackendConfig) assert.NoError(t, err) - hundredMs := 100 * time.Millisecond - rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{ - MaxConcurrentRequest: 10, - BatchOptions: &RPCClientBatchOptions{ - BatchDelay: &hundredMs, - BatchSize: 2000, // very big batch size, so need to rely on batch delay to be hit for sending a batch - }, - }).(*RPCClient) + + rpcConfig := config.RootSection("ut_fs_config") + InitConfig(rpcConfig) + rpcConfig.Set(ConfigBatchEnabled, true) + rpcConfig.Set(ConfigMaxConcurrentRequest, 10) + rpcConfig.Set(ConfigBatchTimeout, "100ms") // very long delay, so need to rely on batch size to be hit for sending a batch + rpcConfig.Set(ConfigBatchSize, 2000) + rpcConfig.Set(ConfigBatchMaxDispatchConcurrency, 1) + + rpcOptions := ReadConfig(ctx, rpcConfig) + + rb := NewRPCClientWithOption(c, rpcOptions).(*RPCClient) round := 5 diff --git a/pkg/rpcbackend/config.go b/pkg/rpcbackend/config.go new file mode 100644 index 00000000..59eeda0d --- /dev/null +++ b/pkg/rpcbackend/config.go @@ -0,0 +1,77 @@ +// Copyright © 2024 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rpcbackend + +import ( + "context" + "time" + + "github.com/hyperledger/firefly-common/pkg/config" +) + +const ( + // ConfigMaxRequestConcurrency the maximum number of concurrent JSON-RPC requests get processed at a time + ConfigMaxConcurrentRequest = "maxConcurrentRequest" + // ConfigBatchEnabled whether to enable batching JSON-RPC requests: https://www.jsonrpc.org/specification#batch + ConfigBatchEnabled = "batch.enabled" + // ConfigBatchSize when the amount of queued requests reaches this number, they will be batched and dispatched + ConfigBatchSize = "batch.size" + // ConfigBatchTimeout when the time since the first request was queued reaches this timeout, all requests in the queue will be batched and dispatched + ConfigBatchTimeout = "batch.timeout" + // ConfigBatchTimeout the maximum number of concurrent batch dispatching process + ConfigBatchMaxDispatchConcurrency = "batch.dispatchConcurrency" +) + +const ( + DefaultConfigBatchSize = 500 + DefaultConfigTimeout = "50ms" + DefaultConfigDispatchConcurrency = 50 +) + +type RPCClientBatchOptions struct { + Enabled bool + BatchDispatcherContext context.Context + BatchTimeout time.Duration + BatchSize int + BatchMaxDispatchConcurrency int +} + +type RPCClientOptions struct { + MaxConcurrentRequest int64 + BatchOptions *RPCClientBatchOptions +} + +func InitConfig(section config.Section) { + section.AddKnownKey(ConfigBatchEnabled, false) + section.AddKnownKey(ConfigMaxConcurrentRequest, 0) + section.AddKnownKey(ConfigBatchSize, DefaultConfigBatchSize) + section.AddKnownKey(ConfigBatchTimeout, DefaultConfigTimeout) + section.AddKnownKey(ConfigBatchMaxDispatchConcurrency, DefaultConfigDispatchConcurrency) +} + +func ReadConfig(batchDispatcherContext context.Context, section config.Section) RPCClientOptions { + return RPCClientOptions{ + MaxConcurrentRequest: section.GetInt64(ConfigMaxConcurrentRequest), + BatchOptions: &RPCClientBatchOptions{ + Enabled: section.GetBool(ConfigBatchEnabled), + BatchTimeout: section.GetDuration(ConfigBatchTimeout), + BatchSize: section.GetInt(ConfigBatchSize), + BatchMaxDispatchConcurrency: section.GetInt(ConfigBatchMaxDispatchConcurrency), + BatchDispatcherContext: batchDispatcherContext, + }, + } +} From 8d5a8ed50f1a7b85eb1adeb6a645b1113a5e0845 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Tue, 13 Aug 2024 06:46:19 +0100 Subject: [PATCH 3/6] fixing config name Signed-off-by: Chengxuan Xing --- internal/signermsgs/en_config_descriptions.go | 10 +++++----- pkg/rpcbackend/backend_test.go | 6 +++--- pkg/rpcbackend/config.go | 6 +++--- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/internal/signermsgs/en_config_descriptions.go b/internal/signermsgs/en_config_descriptions.go index 04e01704..39d18b97 100644 --- a/internal/signermsgs/en_config_descriptions.go +++ b/internal/signermsgs/en_config_descriptions.go @@ -54,9 +54,9 @@ var ( ConfigBackendURL = ffc("config.backend.url", "URL for the backend JSON/RPC server / blockchain node", "url") ConfigBackendProxyURL = ffc("config.backend.proxy.url", "Optional HTTP proxy URL", "url") - ConfigRPCBatchMaxConcurrentRequest = ffc("config.maxConcurrentRequest", "The maximum number of concurrent JSON-RPC requests get processed at a time", i18n.IntType) - ConfigRPCBatchEnabled = ffc("config.batch.enabled", "Whether to enable batching JSON-RPC requests", i18n.BooleanType) - ConfigRPCBatchSize = ffc("config.batch.size", "When the amount of queued requests reaches this number, they will be batched and dispatched", i18n.IntType) - ConfigRPCBatchTimeout = ffc("config.batch.timeout", "When the time since the first request was queued reaches this timeout, all requests in the queue will be batched and dispatched", i18n.TimeDurationType) - ConfigRPCBatchDispatchConcurrency = ffc("config.batch.dispatchConcurrency", "The maximum number of concurrent batch dispatching process", i18n.IntType) + ConfigRPCBatchMaxConcurrentRequests = ffc("config.maxConcurrentRequests", "The maximum number of concurrent JSON-RPC requests get processed at a time", i18n.IntType) + ConfigRPCBatchEnabled = ffc("config.batch.enabled", "Whether to enable batching JSON-RPC requests", i18n.BooleanType) + ConfigRPCBatchSize = ffc("config.batch.size", "When the amount of queued requests reaches this number, they will be batched and dispatched", i18n.IntType) + ConfigRPCBatchTimeout = ffc("config.batch.timeout", "When the time since the first request was queued reaches this timeout, all requests in the queue will be batched and dispatched", i18n.TimeDurationType) + ConfigRPCBatchDispatchConcurrency = ffc("config.batch.dispatchConcurrency", "The maximum number of concurrent batch dispatching process", i18n.IntType) ) diff --git a/pkg/rpcbackend/backend_test.go b/pkg/rpcbackend/backend_test.go index 34a42285..a3d20de2 100644 --- a/pkg/rpcbackend/backend_test.go +++ b/pkg/rpcbackend/backend_test.go @@ -680,7 +680,7 @@ func TestBatchRequestsOKWithBatchSize(t *testing.T) { rpcConfig := config.RootSection("unittest") InitConfig(rpcConfig) rpcConfig.Set(ConfigBatchEnabled, true) - rpcConfig.Set(ConfigMaxConcurrentRequest, 10) + rpcConfig.Set(ConfigMaxConcurrentRequests, 10) rpcConfig.Set(ConfigBatchTimeout, "2h") // very long delay, so need to rely on batch size to be hit for sending a batch rpcConfig.Set(ConfigBatchSize, 2) @@ -764,7 +764,7 @@ func TestBatchRequestsTestWorkerCounts(t *testing.T) { rpcConfig := config.RootSection("unittest") InitConfig(rpcConfig) rpcConfig.Set(ConfigBatchEnabled, true) - rpcConfig.Set(ConfigMaxConcurrentRequest, 10) + rpcConfig.Set(ConfigMaxConcurrentRequests, 10) rpcConfig.Set(ConfigBatchTimeout, "2h") // very long delay, so need to rely on batch size to be hit for sending a batch rpcConfig.Set(ConfigBatchSize, 2) rpcConfig.Set(ConfigBatchMaxDispatchConcurrency, 1) @@ -879,7 +879,7 @@ func TestBatchRequestsOKWithBatchDelay(t *testing.T) { rpcConfig := config.RootSection("ut_fs_config") InitConfig(rpcConfig) rpcConfig.Set(ConfigBatchEnabled, true) - rpcConfig.Set(ConfigMaxConcurrentRequest, 10) + rpcConfig.Set(ConfigMaxConcurrentRequests, 10) rpcConfig.Set(ConfigBatchTimeout, "100ms") // very long delay, so need to rely on batch size to be hit for sending a batch rpcConfig.Set(ConfigBatchSize, 2000) rpcConfig.Set(ConfigBatchMaxDispatchConcurrency, 1) diff --git a/pkg/rpcbackend/config.go b/pkg/rpcbackend/config.go index 59eeda0d..56ac098a 100644 --- a/pkg/rpcbackend/config.go +++ b/pkg/rpcbackend/config.go @@ -25,7 +25,7 @@ import ( const ( // ConfigMaxRequestConcurrency the maximum number of concurrent JSON-RPC requests get processed at a time - ConfigMaxConcurrentRequest = "maxConcurrentRequest" + ConfigMaxConcurrentRequests = "maxConcurrentRequests" // ConfigBatchEnabled whether to enable batching JSON-RPC requests: https://www.jsonrpc.org/specification#batch ConfigBatchEnabled = "batch.enabled" // ConfigBatchSize when the amount of queued requests reaches this number, they will be batched and dispatched @@ -57,7 +57,7 @@ type RPCClientOptions struct { func InitConfig(section config.Section) { section.AddKnownKey(ConfigBatchEnabled, false) - section.AddKnownKey(ConfigMaxConcurrentRequest, 0) + section.AddKnownKey(ConfigMaxConcurrentRequests, 0) section.AddKnownKey(ConfigBatchSize, DefaultConfigBatchSize) section.AddKnownKey(ConfigBatchTimeout, DefaultConfigTimeout) section.AddKnownKey(ConfigBatchMaxDispatchConcurrency, DefaultConfigDispatchConcurrency) @@ -65,7 +65,7 @@ func InitConfig(section config.Section) { func ReadConfig(batchDispatcherContext context.Context, section config.Section) RPCClientOptions { return RPCClientOptions{ - MaxConcurrentRequest: section.GetInt64(ConfigMaxConcurrentRequest), + MaxConcurrentRequest: section.GetInt64(ConfigMaxConcurrentRequests), BatchOptions: &RPCClientBatchOptions{ Enabled: section.GetBool(ConfigBatchEnabled), BatchTimeout: section.GetDuration(ConfigBatchTimeout), From 2e424e9fc180f056ef1916eda1a63fb5e1b8e81f Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Tue, 13 Aug 2024 07:35:58 +0100 Subject: [PATCH 4/6] check both context for request Signed-off-by: Chengxuan Xing --- pkg/rpcbackend/backend.go | 42 ++++++++---------- pkg/rpcbackend/backend_test.go | 81 +++++++++++++++++++++++++++------- pkg/rpcbackend/config.go | 3 +- 3 files changed, 87 insertions(+), 39 deletions(-) diff --git a/pkg/rpcbackend/backend.go b/pkg/rpcbackend/backend.go index 93e3891b..f3a8378d 100644 --- a/pkg/rpcbackend/backend.go +++ b/pkg/rpcbackend/backend.go @@ -90,6 +90,7 @@ func NewRPCClientWithOption(client *resty.Client, options RPCClientOptions) Back type RPCClient struct { client *resty.Client + batchDispatcherContext context.Context concurrencySlots chan bool requestCounter int64 requestBatchQueue chan *batchRequest @@ -167,45 +168,34 @@ type batchRequest struct { } func (rc *RPCClient) startBatchDispatcher(dispatcherRootContext context.Context, batchTimeout time.Duration, batchSize int) { + rc.batchDispatcherContext = dispatcherRootContext if rc.requestBatchQueue == nil { // avoid orphaned dispatcher requestQueue := make(chan *batchRequest) go func() { var batch []*batchRequest - var ticker *time.Ticker - var tickerChannel <-chan time.Time + var timeoutChannel <-chan time.Time for { select { case req := <-requestQueue: batch = append(batch, req) - if ticker == nil { - // first request received start a ticker - ticker = time.NewTicker(batchTimeout) - defer ticker.Stop() - tickerChannel = ticker.C + if timeoutChannel == nil { + // first request received, start a batch timeout + timeoutChannel = time.After(batchTimeout) } if len(batch) >= batchSize { - rc.dispatchBatch(dispatcherRootContext, batch) + rc.dispatchBatch(rc.batchDispatcherContext, batch) batch = nil - ticker.Stop() // batch dispatched, stop the ticker, the next request will start a new ticker if needed - ticker = nil - tickerChannel = nil + timeoutChannel = nil // stop the timeout and let it get reset by the next request } - case <-tickerChannel: + case <-timeoutChannel: if len(batch) > 0 { - rc.dispatchBatch(dispatcherRootContext, batch) + rc.dispatchBatch(rc.batchDispatcherContext, batch) batch = nil - - ticker.Stop() // batch dispatched, stop the ticker, the next request will start a new ticker if needed - ticker = nil - tickerChannel = nil - } - - case <-dispatcherRootContext.Done(): - if ticker != nil { - ticker.Stop() // clean up the ticker + timeoutChannel = nil // stop the timeout and let it get reset by the next request } + case <-rc.batchDispatcherContext.Done(): select { // drain the queue case req := <-requestQueue: batch = append(batch, req) @@ -213,7 +203,7 @@ func (rc *RPCClient) startBatchDispatcher(dispatcherRootContext context.Context, } for i, req := range batch { // mark all queueing requests as failed - cancelCtxErr := i18n.NewError(dispatcherRootContext, signermsgs.MsgRequestCanceledContext, req.rpcReq.ID) + cancelCtxErr := i18n.NewError(rc.batchDispatcherContext, signermsgs.MsgRequestCanceledContext, req.rpcReq.ID) batch[i].rpcErr <- cancelCtxErr } @@ -412,6 +402,9 @@ func (rc *RPCClient) batchSyncRequest(ctx context.Context, rpcReq *RPCRequest) ( select { case rc.requestBatchQueue <- req: + case <-rc.batchDispatcherContext.Done(): + err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID) + return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err case <-ctx.Done(): err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID) return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err @@ -422,6 +415,9 @@ func (rc *RPCClient) batchSyncRequest(ctx context.Context, rpcReq *RPCRequest) ( return rpcRes, nil case err := <-req.rpcErr: return nil, err + case <-rc.batchDispatcherContext.Done(): + err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID) + return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err case <-ctx.Done(): err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID) return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err diff --git a/pkg/rpcbackend/backend_test.go b/pkg/rpcbackend/backend_test.go index a3d20de2..b4f79057 100644 --- a/pkg/rpcbackend/backend_test.go +++ b/pkg/rpcbackend/backend_test.go @@ -479,23 +479,74 @@ func TestBatchSyncRequestCanceledContextWhenQueueing(t *testing.T) { time.Sleep(50 * time.Millisecond) // wait for the request to be queued and start the ticker cancelCtx() <-checkDone - - ctx2, cancelCtx2 := context.WithCancel(context.Background()) - - rpcOptions = ReadConfig(ctx2, rpcConfig) - rb = NewRPCClientWithOption(c, rpcOptions).(*RPCClient) - - checkDone = make(chan bool) - go func() { - reqContext := context.Background() - _, err = rb.SyncRequest(reqContext, &RPCRequest{}) - assert.Regexp(t, "FF22063", err) // this checks the response hit cancel context - close(checkDone) - }() - cancelCtx2() // cancel context straight away to check the pending request are drained correctly - <-checkDone } +// func TestBatchSyncRequestCanceledContextFlushTheQueueWhenRootContextIsCancelled(t *testing.T) { + +// ctx, cancelCtx := context.WithCancel(context.Background()) + +// // Define the expected server response to the batch +// rpcServerResponseBatchBytes := []byte(`[ +// { +// "jsonrpc": "2.0", +// "id": 1, +// "result": { +// "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", +// "nonce": "0x24" +// } +// } +// ]`) + +// server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +// w.Header().Add("Content-Type", "application/json") +// w.Header().Add("Content-Length", strconv.Itoa(len(rpcServerResponseBatchBytes))) +// w.WriteHeader(200) +// w.Write(rpcServerResponseBatchBytes) +// })) +// defer server.Close() + +// signerconfig.Reset() +// prefix := signerconfig.BackendConfig +// prefix.Set(ffresty.HTTPConfigURL, server.URL) +// c, err := ffresty.New(ctx, signerconfig.BackendConfig) +// assert.NoError(t, err) + +// rpcConfig := config.RootSection("unittest") +// InitConfig(rpcConfig) +// rpcConfig.Set(ConfigBatchEnabled, true) +// rpcConfig.Set(ConfigBatchTimeout, "2h") // very long delay +// rpcConfig.Set(ConfigBatchSize, 1) +// rpcConfig.Set(ConfigBatchMaxDispatchConcurrency, 1) // set max concurrency to 1 so it can be jammed easily + +// rpcOptions := ReadConfig(ctx, rpcConfig) + +// rb := NewRPCClientWithOption(c, rpcOptions).(*RPCClient) + +// checkDone := make(chan bool) +// // occupy the concurrent slot, so the first request is queued at batching +// rb.requestBatchConcurrencySlots <- true +// // emit the first request +// go func() { +// reqContext := context.Background() +// _, err = rb.SyncRequest(reqContext, &RPCRequest{ +// Method: "eth_getTransactionByHash", +// Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, +// }) +// assert.NoError(t, err) +// }() + +// // emit the second request, which is now blocked on joining the batch queue +// go func() { +// defer close(checkDone) +// reqContext := context.Background() +// _, err = rb.SyncRequest(reqContext, &RPCRequest{}) +// assert.Regexp(t, "FF22063", err) // this checks the response hit cancel context +// }() +// cancelCtx() // cancel the context +// <-rb.requestBatchConcurrencySlots // unblock the concurrency slot +// <-checkDone +// } + func TestBatchSyncRequestCanceledContextWhenDispatchingABatch(t *testing.T) { blocked := make(chan struct{}) diff --git a/pkg/rpcbackend/config.go b/pkg/rpcbackend/config.go index 56ac098a..61d84a98 100644 --- a/pkg/rpcbackend/config.go +++ b/pkg/rpcbackend/config.go @@ -37,6 +37,7 @@ const ( ) const ( + DefaultMaxConcurrentRequests = 50 DefaultConfigBatchSize = 500 DefaultConfigTimeout = "50ms" DefaultConfigDispatchConcurrency = 50 @@ -57,7 +58,7 @@ type RPCClientOptions struct { func InitConfig(section config.Section) { section.AddKnownKey(ConfigBatchEnabled, false) - section.AddKnownKey(ConfigMaxConcurrentRequests, 0) + section.AddKnownKey(ConfigMaxConcurrentRequests, DefaultMaxConcurrentRequests) section.AddKnownKey(ConfigBatchSize, DefaultConfigBatchSize) section.AddKnownKey(ConfigBatchTimeout, DefaultConfigTimeout) section.AddKnownKey(ConfigBatchMaxDispatchConcurrency, DefaultConfigDispatchConcurrency) From da6709406ef3b8f69c76d870cf98be076f019c90 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Tue, 13 Aug 2024 16:58:06 +0100 Subject: [PATCH 5/6] adding ability to opt out batching for specific methods Signed-off-by: Chengxuan Xing --- internal/signermsgs/en_config_descriptions.go | 1 + pkg/rpcbackend/backend.go | 13 +++- pkg/rpcbackend/backend_test.go | 68 ++++++++++++++++++- pkg/rpcbackend/config.go | 11 ++- 4 files changed, 86 insertions(+), 7 deletions(-) diff --git a/internal/signermsgs/en_config_descriptions.go b/internal/signermsgs/en_config_descriptions.go index 39d18b97..b5384c62 100644 --- a/internal/signermsgs/en_config_descriptions.go +++ b/internal/signermsgs/en_config_descriptions.go @@ -58,5 +58,6 @@ var ( ConfigRPCBatchEnabled = ffc("config.batch.enabled", "Whether to enable batching JSON-RPC requests", i18n.BooleanType) ConfigRPCBatchSize = ffc("config.batch.size", "When the amount of queued requests reaches this number, they will be batched and dispatched", i18n.IntType) ConfigRPCBatchTimeout = ffc("config.batch.timeout", "When the time since the first request was queued reaches this timeout, all requests in the queue will be batched and dispatched", i18n.TimeDurationType) + ConfigRPCBatchExcludeMethodsRegex = ffc("config.batch.excludeMethodsRegex", "A Regex string to disable batch for the matching JSON-RPC methods in the requests", i18n.StringType) ConfigRPCBatchDispatchConcurrency = ffc("config.batch.dispatchConcurrency", "The maximum number of concurrent batch dispatching process", i18n.IntType) ) diff --git a/pkg/rpcbackend/backend.go b/pkg/rpcbackend/backend.go index f3a8378d..f2ffc316 100644 --- a/pkg/rpcbackend/backend.go +++ b/pkg/rpcbackend/backend.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "regexp" "sync/atomic" "time" @@ -81,6 +82,15 @@ func NewRPCClientWithOption(client *resty.Client, options RPCClientOptions) Back if options.BatchOptions.BatchMaxDispatchConcurrency > 0 { batchWorkerCount = options.BatchOptions.BatchMaxDispatchConcurrency } + + if options.BatchOptions.BatchExcludeMethodsRegex != "" { + excludeRegex, err := regexp.Compile(options.BatchOptions.BatchExcludeMethodsRegex) + if err != nil { + panic(err) + } + rpcClient.batchExcludeMethodsMatcher = excludeRegex + } + rpcClient.requestBatchConcurrencySlots = make(chan bool, batchWorkerCount) rpcClient.startBatchDispatcher(options.BatchOptions.BatchDispatcherContext, batchTimeout, batchSize) } @@ -95,6 +105,7 @@ type RPCClient struct { requestCounter int64 requestBatchQueue chan *batchRequest requestBatchConcurrencySlots chan bool + batchExcludeMethodsMatcher *regexp.Regexp } type RPCRequest struct { @@ -329,7 +340,7 @@ func (rc *RPCClient) SyncRequest(ctx context.Context, rpcReq *RPCRequest) (rpcRe }() } - if rc.requestBatchQueue != nil { + if rc.requestBatchQueue != nil && (rc.batchExcludeMethodsMatcher == nil || !rc.batchExcludeMethodsMatcher.MatchString(rpcReq.Method)) { return rc.batchSyncRequest(ctx, rpcReq) } else { // We always set the back-end request ID - as we need to support requests coming in from diff --git a/pkg/rpcbackend/backend_test.go b/pkg/rpcbackend/backend_test.go index b4f79057..03e6a96a 100644 --- a/pkg/rpcbackend/backend_test.go +++ b/pkg/rpcbackend/backend_test.go @@ -139,6 +139,18 @@ func TestNewRPCClientFailDueToBatchContextMissing(t *testing.T) { }) } +func TestNewRPCClientFailDueToInvalidExcludeMethodRegexMissing(t *testing.T) { + + rpcConfig := config.RootSection("unittest") + InitConfig(rpcConfig) + rpcConfig.Set(ConfigBatchEnabled, true) + rpcConfig.Set(ConfigBatchExcludeMethodsRegex, "([A-Z") + + assert.Panics(t, func() { + NewRPCClientWithOption(nil, ReadConfig(context.Background(), rpcConfig)) + }) +} + func TestSyncRequestOK(t *testing.T) { rpcRequestBytes := []byte(`{ @@ -404,6 +416,55 @@ func TestBatchSyncRPCCallNullResponse(t *testing.T) { assert.Equal(t, `null`, rpcRes.Result.String()) } +func TestBatchSyncRPCCallDisableMethods(t *testing.T) { + rpcRequestBytes := []byte(`{ + "id": 2, + "method": "eth_getTransactionByHash", + "params": [ + "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1" + ] + }`) + + rpcServerResponseBytes := []byte(`{ + "jsonrpc": "2.0", + "id": "1", + "result": { + "blockHash": "0x471a236bac44222faf63e3d7808a2a68a704a75ca2f0774f072764867f458268", + "nonce": "0x24" + } + }`) + + var rpcRequest RPCRequest + err := json.Unmarshal(rpcRequestBytes, &rpcRequest) + assert.NoError(t, err) + + var rpcServerResponse RPCResponse + err = json.Unmarshal(rpcServerResponseBytes, &rpcServerResponse) + assert.NoError(t, err) + bgCtx := context.Background() + + rpcConfig := config.RootSection("unittest") + InitConfig(rpcConfig) + rpcConfig.Set(ConfigBatchEnabled, true) + rpcConfig.Set(ConfigBatchExcludeMethodsRegex, "^eth_getTransactionByHash$") + + ctx, rb, done := newTestServer(t, func(rpcReq *RPCRequest) (status int, rpcRes *RPCResponse) { + assert.Equal(t, "2.0", rpcReq.JSONRpc) + assert.Equal(t, "eth_getTransactionByHash", rpcReq.Method) + assert.Equal(t, `"000012346"`, rpcReq.ID.String()) + assert.Equal(t, `"0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1"`, rpcReq.Params[0].String()) + rpcServerResponse.ID = rpcReq.ID + return 200, &rpcServerResponse + }, ReadConfig(bgCtx, rpcConfig)) + rb.requestCounter = 12345 + defer done() + + rpcRes, err := rb.SyncRequest(ctx, &rpcRequest) + assert.NoError(t, err) + assert.Equal(t, `2`, rpcRes.ID.String()) + assert.Equal(t, `0x24`, rpcRes.Result.JSONObject().GetString(`nonce`)) +} + func TestBatchSyncRequestCanceledContext(t *testing.T) { blocked := make(chan struct{}) @@ -818,7 +879,7 @@ func TestBatchRequestsTestWorkerCounts(t *testing.T) { rpcConfig.Set(ConfigMaxConcurrentRequests, 10) rpcConfig.Set(ConfigBatchTimeout, "2h") // very long delay, so need to rely on batch size to be hit for sending a batch rpcConfig.Set(ConfigBatchSize, 2) - rpcConfig.Set(ConfigBatchMaxDispatchConcurrency, 1) + rpcConfig.Set(ConfigBatchDispatchConcurrency, 1) rpcOptions := ReadConfig(ctx, rpcConfig) @@ -933,7 +994,7 @@ func TestBatchRequestsOKWithBatchDelay(t *testing.T) { rpcConfig.Set(ConfigMaxConcurrentRequests, 10) rpcConfig.Set(ConfigBatchTimeout, "100ms") // very long delay, so need to rely on batch size to be hit for sending a batch rpcConfig.Set(ConfigBatchSize, 2000) - rpcConfig.Set(ConfigBatchMaxDispatchConcurrency, 1) + rpcConfig.Set(ConfigBatchDispatchConcurrency, 1) rpcOptions := ReadConfig(ctx, rpcConfig) @@ -947,11 +1008,12 @@ func TestBatchRequestsOKWithBatchDelay(t *testing.T) { requestCount := make(chan bool, reqPerRound) for i := 0; i < reqPerRound; i++ { go func() { - _, err := rb.SyncRequest(ctx, &RPCRequest{ + res, err := rb.SyncRequest(ctx, &RPCRequest{ Method: "eth_getTransactionByHash", Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, }) assert.Nil(t, err) + assert.Equal(t, `0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1`, res.Result.JSONObject().GetString(`hash`)) requestCount <- true }() diff --git a/pkg/rpcbackend/config.go b/pkg/rpcbackend/config.go index 61d84a98..a3371d56 100644 --- a/pkg/rpcbackend/config.go +++ b/pkg/rpcbackend/config.go @@ -33,7 +33,9 @@ const ( // ConfigBatchTimeout when the time since the first request was queued reaches this timeout, all requests in the queue will be batched and dispatched ConfigBatchTimeout = "batch.timeout" // ConfigBatchTimeout the maximum number of concurrent batch dispatching process - ConfigBatchMaxDispatchConcurrency = "batch.dispatchConcurrency" + ConfigBatchDispatchConcurrency = "batch.dispatchConcurrency" + // ConfigBatchExcludeMethodsRegex A Regex string to disable batch for the matching JSON-RPC methods in the requests + ConfigBatchExcludeMethodsRegex = "batch.excludeMethodsRegex" ) const ( @@ -49,6 +51,7 @@ type RPCClientBatchOptions struct { BatchTimeout time.Duration BatchSize int BatchMaxDispatchConcurrency int + BatchExcludeMethodsRegex string } type RPCClientOptions struct { @@ -61,7 +64,8 @@ func InitConfig(section config.Section) { section.AddKnownKey(ConfigMaxConcurrentRequests, DefaultMaxConcurrentRequests) section.AddKnownKey(ConfigBatchSize, DefaultConfigBatchSize) section.AddKnownKey(ConfigBatchTimeout, DefaultConfigTimeout) - section.AddKnownKey(ConfigBatchMaxDispatchConcurrency, DefaultConfigDispatchConcurrency) + section.AddKnownKey(ConfigBatchDispatchConcurrency, DefaultConfigDispatchConcurrency) + section.AddKnownKey(ConfigBatchExcludeMethodsRegex) } func ReadConfig(batchDispatcherContext context.Context, section config.Section) RPCClientOptions { @@ -71,8 +75,9 @@ func ReadConfig(batchDispatcherContext context.Context, section config.Section) Enabled: section.GetBool(ConfigBatchEnabled), BatchTimeout: section.GetDuration(ConfigBatchTimeout), BatchSize: section.GetInt(ConfigBatchSize), - BatchMaxDispatchConcurrency: section.GetInt(ConfigBatchMaxDispatchConcurrency), + BatchMaxDispatchConcurrency: section.GetInt(ConfigBatchDispatchConcurrency), BatchDispatcherContext: batchDispatcherContext, + BatchExcludeMethodsRegex: section.GetString(ConfigBatchExcludeMethodsRegex), }, } } From 448f74980271278319a9612ff3aa079f960c8dd7 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Tue, 13 Aug 2024 17:15:55 +0100 Subject: [PATCH 6/6] avoid nil pointer exeption Signed-off-by: Chengxuan Xing --- pkg/rpcbackend/backend.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/rpcbackend/backend.go b/pkg/rpcbackend/backend.go index f2ffc316..41869763 100644 --- a/pkg/rpcbackend/backend.go +++ b/pkg/rpcbackend/backend.go @@ -316,6 +316,10 @@ func (rc *RPCClient) dispatch(ctx context.Context, batch []*batchRequest) { log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", traceID, res.StatusCode(), errLog) batch[i].rpcErr <- fmt.Errorf(rpcMsg) } else { + if resp == nil { + // .... sometimes the JSON-RPC endpoint could return null... + resp = new(RPCResponse) + } if resp.Result == nil { // We don't want a result for errors, but a null success response needs to go in there resp.Result = fftypes.JSONAnyPtr(fftypes.NullString)