diff --git a/internal/rpcserver/server.go b/internal/rpcserver/server.go index 31a6cc6f..15c0edad 100644 --- a/internal/rpcserver/server.go +++ b/internal/rpcserver/server.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/internal/signermsgs/en_config_descriptions.go b/internal/signermsgs/en_config_descriptions.go index 41d456b0..b5384c62 100644 --- a/internal/signermsgs/en_config_descriptions.go +++ b/internal/signermsgs/en_config_descriptions.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -53,4 +53,11 @@ var ( ConfigBackendChainID = ffc("config.backend.chainId", "Optionally set the Chain ID of the blockchain. Otherwise the Network ID will be queried, and used as the Chain ID in signing", "number") ConfigBackendURL = ffc("config.backend.url", "URL for the backend JSON/RPC server / blockchain node", "url") ConfigBackendProxyURL = ffc("config.backend.proxy.url", "Optional HTTP proxy URL", "url") + + ConfigRPCBatchMaxConcurrentRequests = ffc("config.maxConcurrentRequests", "The maximum number of concurrent JSON-RPC requests get processed at a time", i18n.IntType) + ConfigRPCBatchEnabled = ffc("config.batch.enabled", "Whether to enable batching JSON-RPC requests", i18n.BooleanType) + ConfigRPCBatchSize = ffc("config.batch.size", "When the amount of queued requests reaches this number, they will be batched and dispatched", i18n.IntType) + ConfigRPCBatchTimeout = ffc("config.batch.timeout", "When the time since the first request was queued reaches this timeout, all requests in the queue will be batched and dispatched", i18n.TimeDurationType) + ConfigRPCBatchExcludeMethodsRegex = ffc("config.batch.excludeMethodsRegex", "A Regex string to disable batch for the matching JSON-RPC methods in the requests", i18n.StringType) + ConfigRPCBatchDispatchConcurrency = ffc("config.batch.dispatchConcurrency", "The maximum number of concurrent batch dispatching process", i18n.IntType) ) diff --git a/internal/signermsgs/en_error_messges.go b/internal/signermsgs/en_error_messges.go index 4528d4e4..a9b28468 100644 --- a/internal/signermsgs/en_error_messges.go +++ b/internal/signermsgs/en_error_messges.go @@ -103,4 +103,5 @@ var ( MsgInvalidEIP1559Transaction = ffe("FF22084", "Transaction payload invalid (EIP-1559): %v") MsgInvalidEIP155TransactionV = ffe("FF22085", "Invalid V value from EIP-155 transaction (chainId=%d)") MsgInvalidChainID = ffe("FF22086", "Invalid chainId expected=%d actual=%d") + MsgRPCRequestBatchFailed = ffe("FF22087", "Received response doesn't match the number of batched requests.") ) diff --git a/pkg/rpcbackend/backend.go b/pkg/rpcbackend/backend.go index 36fd3a5d..41869763 100644 --- a/pkg/rpcbackend/backend.go +++ b/pkg/rpcbackend/backend.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "regexp" "sync/atomic" "time" @@ -64,17 +65,47 @@ func NewRPCClientWithOption(client *resty.Client, options RPCClientOptions) Back rpcClient.concurrencySlots = make(chan bool, options.MaxConcurrentRequest) } + if options.BatchOptions != nil && options.BatchOptions.Enabled { + if options.BatchOptions.BatchDispatcherContext == nil { + panic("must provide a batch dispatcher context when batch is enabled") + } + batchTimeout := 50 * time.Millisecond + batchSize := 500 + batchWorkerCount := 50 + if options.BatchOptions.BatchTimeout > 0 { + batchTimeout = options.BatchOptions.BatchTimeout + } + + if options.BatchOptions.BatchSize > 0 { + batchSize = options.BatchOptions.BatchSize + } + if options.BatchOptions.BatchMaxDispatchConcurrency > 0 { + batchWorkerCount = options.BatchOptions.BatchMaxDispatchConcurrency + } + + if options.BatchOptions.BatchExcludeMethodsRegex != "" { + excludeRegex, err := regexp.Compile(options.BatchOptions.BatchExcludeMethodsRegex) + if err != nil { + panic(err) + } + rpcClient.batchExcludeMethodsMatcher = excludeRegex + } + + rpcClient.requestBatchConcurrencySlots = make(chan bool, batchWorkerCount) + rpcClient.startBatchDispatcher(options.BatchOptions.BatchDispatcherContext, batchTimeout, batchSize) + } + return rpcClient } type RPCClient struct { - client *resty.Client - concurrencySlots chan bool - requestCounter int64 -} - -type RPCClientOptions struct { - MaxConcurrentRequest int64 + client *resty.Client + batchDispatcherContext context.Context + concurrencySlots chan bool + requestCounter int64 + requestBatchQueue chan *batchRequest + requestBatchConcurrencySlots chan bool + batchExcludeMethodsMatcher *regexp.Regexp } type RPCRequest struct { @@ -141,11 +172,164 @@ func (rc *RPCClient) CallRPC(ctx context.Context, result interface{}, method str return nil } -// SyncRequest sends an individual RPC request to the backend (always over HTTP currently), -// and waits synchronously for the response, or an error. -// -// In all return paths *including error paths* the RPCResponse is populated -// so the caller has an RPC structure to send back to the front-end caller. +type batchRequest struct { + rpcReq *RPCRequest + rpcRes chan *RPCResponse + rpcErr chan error +} + +func (rc *RPCClient) startBatchDispatcher(dispatcherRootContext context.Context, batchTimeout time.Duration, batchSize int) { + rc.batchDispatcherContext = dispatcherRootContext + if rc.requestBatchQueue == nil { // avoid orphaned dispatcher + requestQueue := make(chan *batchRequest) + go func() { + var batch []*batchRequest + var timeoutChannel <-chan time.Time + for { + select { + case req := <-requestQueue: + batch = append(batch, req) + if timeoutChannel == nil { + // first request received, start a batch timeout + timeoutChannel = time.After(batchTimeout) + } + + if len(batch) >= batchSize { + rc.dispatchBatch(rc.batchDispatcherContext, batch) + batch = nil + timeoutChannel = nil // stop the timeout and let it get reset by the next request + } + + case <-timeoutChannel: + if len(batch) > 0 { + rc.dispatchBatch(rc.batchDispatcherContext, batch) + batch = nil + timeoutChannel = nil // stop the timeout and let it get reset by the next request + } + case <-rc.batchDispatcherContext.Done(): + select { // drain the queue + case req := <-requestQueue: + batch = append(batch, req) + default: + } + for i, req := range batch { + // mark all queueing requests as failed + cancelCtxErr := i18n.NewError(rc.batchDispatcherContext, signermsgs.MsgRequestCanceledContext, req.rpcReq.ID) + batch[i].rpcErr <- cancelCtxErr + } + + return + } + } + }() + rc.requestBatchQueue = requestQueue + } +} + +func (rc *RPCClient) dispatchBatch(ctx context.Context, batch []*batchRequest) { + select { + case rc.requestBatchConcurrencySlots <- true: // use a buffered channel to control the number of concurrent thread for batch dispatcher + // wait for the worker slot and continue + case <-ctx.Done(): + for _, req := range batch { + err := i18n.NewError(ctx, signermsgs.MsgRPCRequestBatchFailed) + req.rpcErr <- err + } + return + } + go func() { + defer func() { + <-rc.requestBatchConcurrencySlots + }() + + // once a concurrency slot is obtained, dispatch the batch + rc.dispatch(ctx, batch) + }() +} + +func (rc *RPCClient) dispatch(ctx context.Context, batch []*batchRequest) { + + batchRPCTraceID := fmt.Sprintf("batch-%d", time.Now().UnixNano()) + traceIDs := make([]string, len(batch)) + + var rpcReqs []*RPCRequest + for i, req := range batch { + // We always set the back-end request ID - as we need to support requests coming in from + // multiple concurrent clients on our front-end that might use clashing IDs. + var beReq = *req.rpcReq + beReq.JSONRpc = "2.0" + rpcTraceID := rc.allocateRequestID(&beReq) + if req.rpcReq.ID != nil { + // We're proxying a request with front-end RPC ID - log that as well + rpcTraceID = fmt.Sprintf("%s->%s/%s", req.rpcReq.ID, batchRPCTraceID, rpcTraceID) + } + traceIDs[i] = rpcTraceID + rpcReqs = append(rpcReqs, &beReq) + } + log.L(ctx).Debugf("RPC[%s] --> BATCH %d requests", batchRPCTraceID, len(rpcReqs)) + + responses := make([]*RPCResponse, len(batch)) + res, err := rc.client.R(). + SetContext(ctx). + SetBody(rpcReqs). + SetResult(&responses). + SetError(&responses). + Post("") + + if err != nil { + log.L(ctx).Errorf("RPC[%s] <-- ERROR: %s", batchRPCTraceID, err) + for _, req := range batch { + req.rpcErr <- err + } + return + } + + if len(responses) != len(batch) { + err := i18n.NewError(ctx, signermsgs.MsgRPCRequestBatchFailed) + for _, req := range batch { + req.rpcErr <- err + } + return + } + + for i, resp := range responses { + if logrus.IsLevelEnabled(logrus.TraceLevel) { + jsonOutput, _ := json.Marshal(resp) + log.L(ctx).Tracef("RPC[%s] OUTPUT: %s", batchRPCTraceID, jsonOutput) + } + + // JSON/RPC allows errors to be returned with a 200 status code, as well as other status codes + if res.IsError() || (resp != nil && resp.Error != nil && resp.Error.Code != 0) { + rpcMsg := "" + errLog := "" + if resp != nil { + rpcMsg = resp.Message() + errLog = rpcMsg + } + if rpcMsg == "" { + // Log the raw result in the case of JSON parse error etc. (note that Resty no longer + // returns this as an error - rather the body comes back raw) + errLog = string(res.Body()) + rpcMsg = i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, res.Status()).Error() + } + traceID := traceIDs[i] + log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", traceID, res.StatusCode(), errLog) + batch[i].rpcErr <- fmt.Errorf(rpcMsg) + } else { + if resp == nil { + // .... sometimes the JSON-RPC endpoint could return null... + resp = new(RPCResponse) + } + if resp.Result == nil { + // We don't want a result for errors, but a null success response needs to go in there + resp.Result = fftypes.JSONAnyPtr(fftypes.NullString) + } + batch[i].rpcRes <- resp + + } + } +} + func (rc *RPCClient) SyncRequest(ctx context.Context, rpcReq *RPCRequest) (rpcRes *RPCResponse, err error) { if rc.concurrencySlots != nil { select { @@ -160,63 +344,99 @@ func (rc *RPCClient) SyncRequest(ctx context.Context, rpcReq *RPCRequest) (rpcRe }() } - // We always set the back-end request ID - as we need to support requests coming in from - // multiple concurrent clients on our front-end that might use clashing IDs. - var beReq = *rpcReq - beReq.JSONRpc = "2.0" - rpcTraceID := rc.allocateRequestID(&beReq) - if rpcReq.ID != nil { - // We're proxying a request with front-end RPC ID - log that as well - rpcTraceID = fmt.Sprintf("%s->%s", rpcReq.ID, rpcTraceID) + if rc.requestBatchQueue != nil && (rc.batchExcludeMethodsMatcher == nil || !rc.batchExcludeMethodsMatcher.MatchString(rpcReq.Method)) { + return rc.batchSyncRequest(ctx, rpcReq) + } else { + // We always set the back-end request ID - as we need to support requests coming in from + // multiple concurrent clients on our front-end that might use clashing IDs. + var beReq = *rpcReq + beReq.JSONRpc = "2.0" + rpcTraceID := rc.allocateRequestID(&beReq) + if rpcReq.ID != nil { + // We're proxying a request with front-end RPC ID - log that as well + rpcTraceID = fmt.Sprintf("%s->%s", rpcReq.ID, rpcTraceID) + } + + rpcRes = new(RPCResponse) + + log.L(ctx).Debugf("RPC[%s] --> %s", rpcTraceID, rpcReq.Method) + if logrus.IsLevelEnabled(logrus.TraceLevel) { + jsonInput, _ := json.Marshal(rpcReq) + log.L(ctx).Tracef("RPC[%s] INPUT: %s", rpcTraceID, jsonInput) + } + rpcStartTime := time.Now() + res, err := rc.client.R(). + SetContext(ctx). + SetBody(beReq). + SetResult(&rpcRes). + SetError(&rpcRes). + Post("") + + // Restore the original ID + rpcRes.ID = rpcReq.ID + if err != nil { + err := i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, err) + log.L(ctx).Errorf("RPC[%s] <-- ERROR: %s", rpcTraceID, err) + rpcRes = RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError) + return rpcRes, err + } + if logrus.IsLevelEnabled(logrus.TraceLevel) { + jsonOutput, _ := json.Marshal(rpcRes) + log.L(ctx).Tracef("RPC[%s] OUTPUT: %s", rpcTraceID, jsonOutput) + } + // JSON/RPC allows errors to be returned with a 200 status code, as well as other status codes + if res.IsError() || rpcRes.Error != nil && rpcRes.Error.Code != 0 { + rpcMsg := rpcRes.Message() + errLog := rpcMsg + if rpcMsg == "" { + // Log the raw result in the case of JSON parse error etc. (note that Resty no longer + // returns this as an error - rather the body comes back raw) + errLog = string(res.Body()) + rpcMsg = i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, res.Status()).Error() + } + log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", rpcTraceID, res.StatusCode(), errLog) + err := fmt.Errorf(rpcMsg) + return rpcRes, err + } + log.L(ctx).Infof("RPC[%s] <-- %s [%d] OK (%.2fms)", rpcTraceID, rpcReq.Method, res.StatusCode(), float64(time.Since(rpcStartTime))/float64(time.Millisecond)) + if rpcRes.Result == nil { + // We don't want a result for errors, but a null success response needs to go in there + rpcRes.Result = fftypes.JSONAnyPtr(fftypes.NullString) + } + return rpcRes, nil } - rpcRes = new(RPCResponse) +} - log.L(ctx).Debugf("RPC[%s] --> %s", rpcTraceID, rpcReq.Method) - if logrus.IsLevelEnabled(logrus.TraceLevel) { - jsonInput, _ := json.Marshal(rpcReq) - log.L(ctx).Tracef("RPC[%s] INPUT: %s", rpcTraceID, jsonInput) +func (rc *RPCClient) batchSyncRequest(ctx context.Context, rpcReq *RPCRequest) (rpcRes *RPCResponse, err error) { + req := &batchRequest{ + rpcReq: rpcReq, + rpcRes: make(chan *RPCResponse, 1), + rpcErr: make(chan error, 1), } - rpcStartTime := time.Now() - res, err := rc.client.R(). - SetContext(ctx). - SetBody(beReq). - SetResult(&rpcRes). - SetError(rpcRes). - Post("") - // Restore the original ID - rpcRes.ID = rpcReq.ID - if err != nil { - err := i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, err) - log.L(ctx).Errorf("RPC[%s] <-- ERROR: %s", rpcTraceID, err) - rpcRes = RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError) - return rpcRes, err - } - if logrus.IsLevelEnabled(logrus.TraceLevel) { - jsonOutput, _ := json.Marshal(rpcRes) - log.L(ctx).Tracef("RPC[%s] OUTPUT: %s", rpcTraceID, jsonOutput) - } - // JSON/RPC allows errors to be returned with a 200 status code, as well as other status codes - if res.IsError() || rpcRes.Error != nil && rpcRes.Error.Code != 0 { - rpcMsg := rpcRes.Message() - errLog := rpcMsg - if rpcMsg == "" { - // Log the raw result in the case of JSON parse error etc. (note that Resty no longer - // returns this as an error - rather the body comes back raw) - errLog = string(res.Body()) - rpcMsg = i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, res.Status()).Error() - } - log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", rpcTraceID, res.StatusCode(), errLog) - err := fmt.Errorf(rpcMsg) - return rpcRes, err - } - log.L(ctx).Infof("RPC[%s] <-- %s [%d] OK (%.2fms)", rpcTraceID, rpcReq.Method, res.StatusCode(), float64(time.Since(rpcStartTime))/float64(time.Millisecond)) - if rpcRes.Result == nil { - // We don't want a result for errors, but a null success response needs to go in there - rpcRes.Result = fftypes.JSONAnyPtr(fftypes.NullString) - } - return rpcRes, nil + select { + case rc.requestBatchQueue <- req: + case <-rc.batchDispatcherContext.Done(): + err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID) + return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err + case <-ctx.Done(): + err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID) + return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err + } + + select { + case rpcRes := <-req.rpcRes: + return rpcRes, nil + case err := <-req.rpcErr: + return nil, err + case <-rc.batchDispatcherContext.Done(): + err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID) + return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err + case <-ctx.Done(): + err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID) + return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err + } } func RPCErrorResponse(err error, id *fftypes.JSONAny, code RPCCode) *RPCResponse { diff --git a/pkg/rpcbackend/backend_test.go b/pkg/rpcbackend/backend_test.go index d8b49736..03e6a96a 100644 --- a/pkg/rpcbackend/backend_test.go +++ b/pkg/rpcbackend/backend_test.go @@ -24,7 +24,9 @@ import ( "net/http/httptest" "strconv" "testing" + "time" + "github.com/hyperledger/firefly-common/pkg/config" "github.com/hyperledger/firefly-common/pkg/ffresty" "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-signer/internal/signerconfig" @@ -33,9 +35,10 @@ import ( "github.com/stretchr/testify/assert" ) -type testRPCHander func(rpcReq *RPCRequest) (int, *RPCResponse) +type testRPCHandler func(rpcReq *RPCRequest) (int, *RPCResponse) +type testBatchRPCHandler func(rpcReq []*RPCRequest) (int, []*RPCResponse) -func newTestServer(t *testing.T, rpcHandler testRPCHander, options ...RPCClientOptions) (context.Context, *RPCClient, func()) { +func newTestServer(t *testing.T, rpcHandler testRPCHandler, options ...RPCClientOptions) (context.Context, *RPCClient, func()) { ctx, cancelCtx := context.WithCancel(context.Background()) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -64,7 +67,60 @@ func newTestServer(t *testing.T, rpcHandler testRPCHander, options ...RPCClientO c, err := ffresty.New(ctx, signerconfig.BackendConfig) assert.NoError(t, err) - rb := NewRPCClient(c).(*RPCClient) + var rb *RPCClient + + if len(options) == 1 { + rb = NewRPCClientWithOption(c, options[0]).(*RPCClient) + } else { + rb = NewRPCClient(c).(*RPCClient) + } + + return ctx, rb, func() { + cancelCtx() + server.Close() + } +} + +func newBatchTestServer(t *testing.T, rpcHandler testBatchRPCHandler, options ...RPCClientOptions) (context.Context, *RPCClient, func()) { + + ctx, cancelCtx := context.WithCancel(context.Background()) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + var rpcReqs []*RPCRequest + err := json.NewDecoder(r.Body).Decode(&rpcReqs) + assert.NoError(t, err) + + status, rpcRes := rpcHandler(rpcReqs) + b := []byte(`[]`) + if rpcRes != nil { + b, err = json.Marshal(rpcRes) + assert.NoError(t, err) + } + w.Header().Add("Content-Type", "application/json") + w.Header().Add("Content-Length", strconv.Itoa(len(b))) + w.WriteHeader(status) + w.Write(b) + + })) + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, fmt.Sprintf("http://%s", server.Listener.Addr())) + + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + + var rb *RPCClient + + if len(options) == 1 { + rb = NewRPCClientWithOption(c, options[0]).(*RPCClient) + } else { + rpcConfig := config.RootSection("unittest") + InitConfig(rpcConfig) + rpcConfig.Set(ConfigBatchEnabled, true) + rpcOptions := ReadConfig(ctx, rpcConfig) + rb = NewRPCClientWithOption(c, rpcOptions).(*RPCClient) + } return ctx, rb, func() { cancelCtx() @@ -72,6 +128,29 @@ func newTestServer(t *testing.T, rpcHandler testRPCHander, options ...RPCClientO } } +func TestNewRPCClientFailDueToBatchContextMissing(t *testing.T) { + + rpcConfig := config.RootSection("unittest") + InitConfig(rpcConfig) + rpcConfig.Set(ConfigBatchEnabled, true) + + assert.Panics(t, func() { + NewRPCClientWithOption(nil, ReadConfig(nil, rpcConfig)) + }) +} + +func TestNewRPCClientFailDueToInvalidExcludeMethodRegexMissing(t *testing.T) { + + rpcConfig := config.RootSection("unittest") + InitConfig(rpcConfig) + rpcConfig.Set(ConfigBatchEnabled, true) + rpcConfig.Set(ConfigBatchExcludeMethodsRegex, "([A-Z") + + assert.Panics(t, func() { + NewRPCClientWithOption(nil, ReadConfig(context.Background(), rpcConfig)) + }) +} + func TestSyncRequestOK(t *testing.T) { rpcRequestBytes := []byte(`{ @@ -308,5 +387,642 @@ func TestSyncRequestConcurrency(t *testing.T) { close(blocked) <-bgDone +} + +func TestBatchSyncRPCCallNullResponse(t *testing.T) { + + ctx, rb, done := newBatchTestServer(t, func(rpcReqs []*RPCRequest) (status int, rpcRes []*RPCResponse) { + rpcReq := rpcReqs[0] + assert.Equal(t, "2.0", rpcReq.JSONRpc) + assert.Equal(t, "eth_getTransactionReceipt", rpcReq.Method) + assert.Equal(t, `"000012346"`, rpcReq.ID.String()) + assert.Equal(t, `"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`, rpcReq.Params[0].String()) + return 200, []*RPCResponse{{ + JSONRpc: "2.0", + ID: rpcReq.ID, + Result: nil}} + }) + rb.requestCounter = 12345 + defer done() + + rpcRes, err := rb.SyncRequest(ctx, &RPCRequest{ + ID: fftypes.JSONAnyPtr("1"), + Method: "eth_getTransactionReceipt", + Params: []*fftypes.JSONAny{ + fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`), + }, + }) + assert.NoError(t, err) + assert.Equal(t, `null`, rpcRes.Result.String()) +} + +func TestBatchSyncRPCCallDisableMethods(t *testing.T) { + rpcRequestBytes := []byte(`{ + "id": 2, + "method": "eth_getTransactionByHash", + "params": [ + "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1" + ] + }`) + + rpcServerResponseBytes := []byte(`{ + "jsonrpc": "2.0", + "id": "1", + "result": { + "blockHash": "0x471a236bac44222faf63e3d7808a2a68a704a75ca2f0774f072764867f458268", + "nonce": "0x24" + } + }`) + + var rpcRequest RPCRequest + err := json.Unmarshal(rpcRequestBytes, &rpcRequest) + assert.NoError(t, err) + + var rpcServerResponse RPCResponse + err = json.Unmarshal(rpcServerResponseBytes, &rpcServerResponse) + assert.NoError(t, err) + bgCtx := context.Background() + + rpcConfig := config.RootSection("unittest") + InitConfig(rpcConfig) + rpcConfig.Set(ConfigBatchEnabled, true) + rpcConfig.Set(ConfigBatchExcludeMethodsRegex, "^eth_getTransactionByHash$") + + ctx, rb, done := newTestServer(t, func(rpcReq *RPCRequest) (status int, rpcRes *RPCResponse) { + assert.Equal(t, "2.0", rpcReq.JSONRpc) + assert.Equal(t, "eth_getTransactionByHash", rpcReq.Method) + assert.Equal(t, `"000012346"`, rpcReq.ID.String()) + assert.Equal(t, `"0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1"`, rpcReq.Params[0].String()) + rpcServerResponse.ID = rpcReq.ID + return 200, &rpcServerResponse + }, ReadConfig(bgCtx, rpcConfig)) + rb.requestCounter = 12345 + defer done() + + rpcRes, err := rb.SyncRequest(ctx, &rpcRequest) + assert.NoError(t, err) + assert.Equal(t, `2`, rpcRes.ID.String()) + assert.Equal(t, `0x24`, rpcRes.Result.JSONObject().GetString(`nonce`)) +} + +func TestBatchSyncRequestCanceledContext(t *testing.T) { + + blocked := make(chan struct{}) + ctx, cancelCtx := context.WithCancel(context.Background()) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-blocked + cancelCtx() + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(500) + w.Write([]byte(`[{}]`)) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + rb := NewRPCClientWithOption(c, RPCClientOptions{ + BatchOptions: &RPCClientBatchOptions{ + BatchDispatcherContext: ctx, + Enabled: true, + BatchSize: 1, + BatchMaxDispatchConcurrency: 1, + }, + }).(*RPCClient) + + checkDone := make(chan bool) + go func() { + _, err = rb.SyncRequest(ctx, &RPCRequest{}) + assert.Regexp(t, "FF22063", err) // this checks the response hit cancel context + close(checkDone) + }() + close(blocked) + <-checkDone + + // this checks request hit cancel context + _, err = rb.SyncRequest(ctx, &RPCRequest{}) + assert.Regexp(t, "FF22063", err) +} + +func TestBatchSyncRequestCanceledContextWhenQueueing(t *testing.T) { + + ctx, cancelCtx := context.WithCancel(context.Background()) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.FailNow(t, "Not expecting JSON-RPC endpoint to get called") + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + + rpcConfig := config.RootSection("unittest") + InitConfig(rpcConfig) + rpcConfig.Set(ConfigBatchEnabled, true) + rpcConfig.Set(ConfigBatchTimeout, "2h") // very long delay + rpcConfig.Set(ConfigBatchSize, 2000) // very big batch size + + rpcOptions := ReadConfig(ctx, rpcConfig) + + rb := NewRPCClientWithOption(c, rpcOptions).(*RPCClient) + + checkDone := make(chan bool) + go func() { + reqContext := context.Background() + _, err = rb.SyncRequest(reqContext, &RPCRequest{}) + assert.Regexp(t, "FF22063", err) // this checks the response hit cancel context + close(checkDone) + }() + time.Sleep(50 * time.Millisecond) // wait for the request to be queued and start the ticker + cancelCtx() + <-checkDone +} + +// func TestBatchSyncRequestCanceledContextFlushTheQueueWhenRootContextIsCancelled(t *testing.T) { + +// ctx, cancelCtx := context.WithCancel(context.Background()) + +// // Define the expected server response to the batch +// rpcServerResponseBatchBytes := []byte(`[ +// { +// "jsonrpc": "2.0", +// "id": 1, +// "result": { +// "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", +// "nonce": "0x24" +// } +// } +// ]`) + +// server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +// w.Header().Add("Content-Type", "application/json") +// w.Header().Add("Content-Length", strconv.Itoa(len(rpcServerResponseBatchBytes))) +// w.WriteHeader(200) +// w.Write(rpcServerResponseBatchBytes) +// })) +// defer server.Close() + +// signerconfig.Reset() +// prefix := signerconfig.BackendConfig +// prefix.Set(ffresty.HTTPConfigURL, server.URL) +// c, err := ffresty.New(ctx, signerconfig.BackendConfig) +// assert.NoError(t, err) + +// rpcConfig := config.RootSection("unittest") +// InitConfig(rpcConfig) +// rpcConfig.Set(ConfigBatchEnabled, true) +// rpcConfig.Set(ConfigBatchTimeout, "2h") // very long delay +// rpcConfig.Set(ConfigBatchSize, 1) +// rpcConfig.Set(ConfigBatchMaxDispatchConcurrency, 1) // set max concurrency to 1 so it can be jammed easily + +// rpcOptions := ReadConfig(ctx, rpcConfig) + +// rb := NewRPCClientWithOption(c, rpcOptions).(*RPCClient) + +// checkDone := make(chan bool) +// // occupy the concurrent slot, so the first request is queued at batching +// rb.requestBatchConcurrencySlots <- true +// // emit the first request +// go func() { +// reqContext := context.Background() +// _, err = rb.SyncRequest(reqContext, &RPCRequest{ +// Method: "eth_getTransactionByHash", +// Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, +// }) +// assert.NoError(t, err) +// }() + +// // emit the second request, which is now blocked on joining the batch queue +// go func() { +// defer close(checkDone) +// reqContext := context.Background() +// _, err = rb.SyncRequest(reqContext, &RPCRequest{}) +// assert.Regexp(t, "FF22063", err) // this checks the response hit cancel context +// }() +// cancelCtx() // cancel the context +// <-rb.requestBatchConcurrencySlots // unblock the concurrency slot +// <-checkDone +// } + +func TestBatchSyncRequestCanceledContextWhenDispatchingABatch(t *testing.T) { + + blocked := make(chan struct{}) + ctx, cancelCtx := context.WithCancel(context.Background()) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-blocked + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(500) + w.Write([]byte(`[{}]`)) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + rb := NewRPCClientWithOption(c, RPCClientOptions{ + BatchOptions: &RPCClientBatchOptions{ + BatchDispatcherContext: ctx, + Enabled: true, + BatchSize: 1, + BatchMaxDispatchConcurrency: 1, + }, + }).(*RPCClient) + + rb.requestBatchConcurrencySlots <- true // fill the worker slot so all batch will be queueing + + checkDone := make(chan bool) + go func() { + _, err = rb.SyncRequest(ctx, &RPCRequest{}) + assert.Regexp(t, "FF22063", err) // this checks the response hit cancel context + close(checkDone) + }() + time.Sleep(50 * time.Millisecond) // wait for the quest to be queued in the other go routine + cancelCtx() + <-checkDone + +} +func TestBatchSyncRPCCallErrorResponse(t *testing.T) { + + bgCtx := context.Background() + + ctx, rb, done := newBatchTestServer(t, func(rpcReqs []*RPCRequest) (status int, rpcRes []*RPCResponse) { + assert.Equal(t, 1, len(rpcReqs)) + return 500, []*RPCResponse{{ + JSONRpc: "2.0", + ID: rpcReqs[0].ID, + Error: &RPCError{ + Message: "pop", + }, + }} + }, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ + BatchDispatcherContext: bgCtx, + Enabled: true, + }}) + rb.requestCounter = 12345 + defer done() + + var txCount ethtypes.HexInteger + err := rb.CallRPC(ctx, &txCount, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending") + assert.Regexp(t, "pop", err) +} + +func TestBatchSyncRPCCallErrorCountMismatch(t *testing.T) { + bgCtx := context.Background() + + ctx, rb, done := newBatchTestServer(t, func(rpcReqs []*RPCRequest) (status int, rpcRes []*RPCResponse) { + assert.Equal(t, 1, len(rpcReqs)) + return 500, []*RPCResponse{} + }, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ + BatchDispatcherContext: bgCtx, + Enabled: true, + }}) + rb.requestCounter = 12345 + defer done() + + var txCount ethtypes.HexInteger + err := rb.CallRPC(ctx, &txCount, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending") + assert.Regexp(t, "FF22087", err) +} + +func TestBatchSyncRPCCallBadJSONResponse(t *testing.T) { + bgCtx := context.Background() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(500) + w.Write([]byte(`{!!!!`)) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(context.Background(), signerconfig.BackendConfig) + assert.NoError(t, err) + rb := NewRPCClientWithOption(c, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ + BatchDispatcherContext: bgCtx, + Enabled: true, + }}).(*RPCClient) + + var txCount ethtypes.HexInteger + rpcErr := rb.CallRPC(context.Background(), &txCount, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending") + assert.Regexp(t, "FF22012", rpcErr.Error()) +} + +func TestBatchSyncRPCCallFailParseJSONResponse(t *testing.T) { + bgCtx := context.Background() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(200) + w.Write([]byte(`[{"result":"not an object"}]`)) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(context.Background(), signerconfig.BackendConfig) + assert.NoError(t, err) + rb := NewRPCClientWithOption(c, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ + BatchDispatcherContext: bgCtx, + Enabled: true, + }}).(*RPCClient) + + var mapResult map[string]interface{} + rpcErr := rb.CallRPC(context.Background(), &mapResult, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending") + assert.Regexp(t, "FF22065", rpcErr.Error()) +} + +func TestBatchSyncRPCCallErrorBadInput(t *testing.T) { + + ctx, rb, done := newBatchTestServer(t, func(rpcReqs []*RPCRequest) (status int, rpcRes []*RPCResponse) { return 500, nil }) + defer done() + + var txCount ethtypes.HexInteger + err := rb.CallRPC(ctx, &txCount, "test-bad-params", map[bool]bool{false: true}) + assert.Regexp(t, "FF22011", err) +} + +func TestBatchRequestsOKWithBatchSize(t *testing.T) { + // Define the expected server response to the batch + rpcServerResponseBatchBytes := []byte(`[ + { + "jsonrpc": "2.0", + "id": 1, + "result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x24" + } + }, + { + "jsonrpc": "2.0", + "id": 2, + "result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x10" + } + } + ]`) + + ctx := context.Background() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rpcReqs []*RPCRequest + err := json.NewDecoder(r.Body).Decode(&rpcReqs) + assert.NoError(t, err) + + w.Header().Add("Content-Type", "application/json") + w.Header().Add("Content-Length", strconv.Itoa(len(rpcServerResponseBatchBytes))) + w.WriteHeader(200) + w.Write(rpcServerResponseBatchBytes) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + rpcConfig := config.RootSection("unittest") + InitConfig(rpcConfig) + rpcConfig.Set(ConfigBatchEnabled, true) + rpcConfig.Set(ConfigMaxConcurrentRequests, 10) + rpcConfig.Set(ConfigBatchTimeout, "2h") // very long delay, so need to rely on batch size to be hit for sending a batch + rpcConfig.Set(ConfigBatchSize, 2) + + rpcOptions := ReadConfig(ctx, rpcConfig) + + rb := NewRPCClientWithOption(c, rpcOptions).(*RPCClient) + + round := 400 + + reqNumbers := 2*round - 1 + + requestCount := make(chan bool, reqNumbers) + + for i := 0; i < reqNumbers; i++ { + go func() { + _, err := rb.SyncRequest(ctx, &RPCRequest{ + Method: "eth_getTransactionByHash", + Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, + }) + assert.Nil(t, err) + requestCount <- true + + }() + } + + for i := 0; i < reqNumbers-1; i++ { + <-requestCount + } + + _, err = rb.SyncRequest(ctx, &RPCRequest{ + Method: "eth_getTransactionByHash", + Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, + }) + assert.Nil(t, err) + + <-requestCount +} + +func TestBatchRequestsTestWorkerCounts(t *testing.T) { + // Define the expected server response to the batch + rpcServerResponseBatchBytes := []byte(`[ + { + "jsonrpc": "2.0", + "id": 1, + "result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x24" + } + }, + { + "jsonrpc": "2.0", + "id": 2, + "result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x10" + } + } + ]`) + + ctx := context.Background() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rpcReqs []*RPCRequest + err := json.NewDecoder(r.Body).Decode(&rpcReqs) + assert.NoError(t, err) + time.Sleep(200 * time.Millisecond) // set 200s delay for each quest + w.Header().Add("Content-Type", "application/json") + w.Header().Add("Content-Length", strconv.Itoa(len(rpcServerResponseBatchBytes))) + w.WriteHeader(200) + w.Write(rpcServerResponseBatchBytes) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + + // only a single worker + + rpcConfig := config.RootSection("unittest") + InitConfig(rpcConfig) + rpcConfig.Set(ConfigBatchEnabled, true) + rpcConfig.Set(ConfigMaxConcurrentRequests, 10) + rpcConfig.Set(ConfigBatchTimeout, "2h") // very long delay, so need to rely on batch size to be hit for sending a batch + rpcConfig.Set(ConfigBatchSize, 2) + rpcConfig.Set(ConfigBatchDispatchConcurrency, 1) + + rpcOptions := ReadConfig(ctx, rpcConfig) + + rb := NewRPCClientWithOption(c, rpcOptions).(*RPCClient) + + round := 5 // doing first round, each round will have at least 200ms delay, so the whole flow will guaranteed to be more than 1s + + reqNumbers := 2 * round + + requestCount := make(chan bool, reqNumbers) + + requestStart := time.Now() + + for i := 0; i < reqNumbers; i++ { + go func() { + _, err := rb.SyncRequest(ctx, &RPCRequest{ + ID: fftypes.JSONAnyPtr("testId"), + Method: "eth_getTransactionByHash", + Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, + }) + assert.Nil(t, err) + requestCount <- true + + }() + } + + for i := 0; i < reqNumbers; i++ { + <-requestCount + } + + assert.Greater(t, time.Since(requestStart), 1*time.Second) + + // number of worker equal to the number of rounds + // so the delay should be slightly greater than per request delay (200ms), but hopefully less than 300ms (with 100ms overhead) + rb = NewRPCClientWithOption(c, RPCClientOptions{ + MaxConcurrentRequest: 10, + BatchOptions: &RPCClientBatchOptions{ + BatchDispatcherContext: ctx, + Enabled: true, + BatchTimeout: 2 * time.Hour, // very long delay, so need to rely on batch size to be hit for sending a batch + BatchSize: 2, + BatchMaxDispatchConcurrency: round, + }, + }).(*RPCClient) + + requestStart = time.Now() + + for i := 0; i < reqNumbers; i++ { + go func() { + _, err := rb.SyncRequest(ctx, &RPCRequest{ + Method: "eth_getTransactionByHash", + Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, + }) + assert.Nil(t, err) + requestCount <- true + + }() + } + + for i := 0; i < reqNumbers; i++ { + <-requestCount + } + + assert.Greater(t, time.Since(requestStart), 200*time.Millisecond) + assert.Less(t, time.Since(requestStart), 300*time.Millisecond) + +} + +func TestBatchRequestsOKWithBatchDelay(t *testing.T) { + // Define the expected server response to the batch + rpcServerResponseBatchBytes := []byte(`[ + { + "jsonrpc": "2.0", + "id": 1, + "result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x24" + } + }, + { + "jsonrpc": "2.0", + "id": 2, + "result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x10" + } + } + ]`) + + ctx := context.Background() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rpcReqs []*RPCRequest + err := json.NewDecoder(r.Body).Decode(&rpcReqs) + assert.NoError(t, err) + + w.Header().Add("Content-Type", "application/json") + w.Header().Add("Content-Length", strconv.Itoa(len(rpcServerResponseBatchBytes))) + w.WriteHeader(200) + w.Write(rpcServerResponseBatchBytes) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + + rpcConfig := config.RootSection("ut_fs_config") + InitConfig(rpcConfig) + rpcConfig.Set(ConfigBatchEnabled, true) + rpcConfig.Set(ConfigMaxConcurrentRequests, 10) + rpcConfig.Set(ConfigBatchTimeout, "100ms") // very long delay, so need to rely on batch size to be hit for sending a batch + rpcConfig.Set(ConfigBatchSize, 2000) + rpcConfig.Set(ConfigBatchDispatchConcurrency, 1) + + rpcOptions := ReadConfig(ctx, rpcConfig) + + rb := NewRPCClientWithOption(c, rpcOptions).(*RPCClient) + + round := 5 + + reqPerRound := 2 + + for i := 0; i < round; i++ { + requestCount := make(chan bool, reqPerRound) + for i := 0; i < reqPerRound; i++ { + go func() { + res, err := rb.SyncRequest(ctx, &RPCRequest{ + Method: "eth_getTransactionByHash", + Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, + }) + assert.Nil(t, err) + assert.Equal(t, `0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1`, res.Result.JSONObject().GetString(`hash`)) + requestCount <- true + + }() + } + + for i := 0; i < reqPerRound; i++ { + <-requestCount + } + + } } diff --git a/pkg/rpcbackend/config.go b/pkg/rpcbackend/config.go new file mode 100644 index 00000000..a3371d56 --- /dev/null +++ b/pkg/rpcbackend/config.go @@ -0,0 +1,83 @@ +// Copyright © 2024 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rpcbackend + +import ( + "context" + "time" + + "github.com/hyperledger/firefly-common/pkg/config" +) + +const ( + // ConfigMaxRequestConcurrency the maximum number of concurrent JSON-RPC requests get processed at a time + ConfigMaxConcurrentRequests = "maxConcurrentRequests" + // ConfigBatchEnabled whether to enable batching JSON-RPC requests: https://www.jsonrpc.org/specification#batch + ConfigBatchEnabled = "batch.enabled" + // ConfigBatchSize when the amount of queued requests reaches this number, they will be batched and dispatched + ConfigBatchSize = "batch.size" + // ConfigBatchTimeout when the time since the first request was queued reaches this timeout, all requests in the queue will be batched and dispatched + ConfigBatchTimeout = "batch.timeout" + // ConfigBatchTimeout the maximum number of concurrent batch dispatching process + ConfigBatchDispatchConcurrency = "batch.dispatchConcurrency" + // ConfigBatchExcludeMethodsRegex A Regex string to disable batch for the matching JSON-RPC methods in the requests + ConfigBatchExcludeMethodsRegex = "batch.excludeMethodsRegex" +) + +const ( + DefaultMaxConcurrentRequests = 50 + DefaultConfigBatchSize = 500 + DefaultConfigTimeout = "50ms" + DefaultConfigDispatchConcurrency = 50 +) + +type RPCClientBatchOptions struct { + Enabled bool + BatchDispatcherContext context.Context + BatchTimeout time.Duration + BatchSize int + BatchMaxDispatchConcurrency int + BatchExcludeMethodsRegex string +} + +type RPCClientOptions struct { + MaxConcurrentRequest int64 + BatchOptions *RPCClientBatchOptions +} + +func InitConfig(section config.Section) { + section.AddKnownKey(ConfigBatchEnabled, false) + section.AddKnownKey(ConfigMaxConcurrentRequests, DefaultMaxConcurrentRequests) + section.AddKnownKey(ConfigBatchSize, DefaultConfigBatchSize) + section.AddKnownKey(ConfigBatchTimeout, DefaultConfigTimeout) + section.AddKnownKey(ConfigBatchDispatchConcurrency, DefaultConfigDispatchConcurrency) + section.AddKnownKey(ConfigBatchExcludeMethodsRegex) +} + +func ReadConfig(batchDispatcherContext context.Context, section config.Section) RPCClientOptions { + return RPCClientOptions{ + MaxConcurrentRequest: section.GetInt64(ConfigMaxConcurrentRequests), + BatchOptions: &RPCClientBatchOptions{ + Enabled: section.GetBool(ConfigBatchEnabled), + BatchTimeout: section.GetDuration(ConfigBatchTimeout), + BatchSize: section.GetInt(ConfigBatchSize), + BatchMaxDispatchConcurrency: section.GetInt(ConfigBatchDispatchConcurrency), + BatchDispatcherContext: batchDispatcherContext, + BatchExcludeMethodsRegex: section.GetString(ConfigBatchExcludeMethodsRegex), + }, + } +}