From d1ea36f5b5ab66ef4c94ada4483190415b82a89a Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Wed, 29 May 2024 19:25:37 +0200 Subject: [PATCH 1/4] feat(congestion): reject new transactions on RPC level Summary: In this PR, we introduce a new failure mode on the RPC level when a transaction is submitted under congestion. The error is a variant of `InvalidTxError` called `ShardCongested`, with a single field `shard_id` referencing the congested shard. ## Details With [cross-shard congestion control](https://github.com/near/NEPs/pull/539) being stabilized soon, we want to reject new transactions as early as possible when the receiver shard is already overloaded with traffic. On the chunk producer level, all transactions going to a congested shard will be dropped. This keeps the memory requirements of chunk producers bounded. Further, we decided to go for a relatively low threshold in order to keep the latency of accepted transactions low, rejecting new transactions as soon as we hit 25% congestion on a specific shard. Consequently, when shards are congested, it will not be long before transactions are rejected. This has consequences for the users. On the positive side, they will no longer have to wait for a long time without knowing whether their transaction will be accepted. Either it is executed within a bounded time (at most 20 blocks after inclusion), or it is rejected immediately. But on the negative side, when a shard is congested, they will have to actively retry sending the transaction until it gets accepted. We hope that this can be automated by wallets, which can also provide useful live updates to the user about what is happening. But for this, they will need to understand the new error `ShardCongested` and handle it differently from existing errors. 
--- chain/chain/src/runtime/mod.rs | 16 ++++++++++++++ chain/chain/src/test_utils/kv_runtime.rs | 3 ++- chain/chain/src/types.rs | 1 + chain/client/src/client.rs | 28 ++++++++++++++++++++---- chain/jsonrpc/res/rpc_errors_schema.json | 10 ++++++++- core/primitives/src/errors.rs | 6 +++++ 6 files changed, 58 insertions(+), 6 deletions(-) diff --git a/chain/chain/src/runtime/mod.rs b/chain/chain/src/runtime/mod.rs index 1d292538219..8c668059c82 100644 --- a/chain/chain/src/runtime/mod.rs +++ b/chain/chain/src/runtime/mod.rs @@ -643,9 +643,25 @@ impl RuntimeAdapter for NightshadeRuntime { verify_signature: bool, epoch_id: &EpochId, current_protocol_version: ProtocolVersion, + receiver_congestion_info: Option<ExtendedCongestionInfo>, ) -> Result<Option<InvalidTxError>, Error> { let runtime_config = self.runtime_config_store.get_config(current_protocol_version); + if let Some(congestion_info) = receiver_congestion_info { + let congestion_control = CongestionControl::new( + runtime_config.congestion_control_config, + congestion_info.congestion_info, + congestion_info.missed_chunks_count, + ); + if !congestion_control.shard_accepts_transactions() { + let receiver_shard = + self.account_id_to_shard_uid(transaction.transaction.receiver_id(), epoch_id)?; + return Ok(Some(InvalidTxError::ShardCongested { + shard_id: receiver_shard.shard_id, + })); + } + } + if let Some(state_root) = state_root { let shard_uid = self.account_id_to_shard_uid(transaction.transaction.signer_id(), epoch_id)?; diff --git a/chain/chain/src/test_utils/kv_runtime.rs b/chain/chain/src/test_utils/kv_runtime.rs index 6b267550e81..7458b669a00 100644 --- a/chain/chain/src/test_utils/kv_runtime.rs +++ b/chain/chain/src/test_utils/kv_runtime.rs @@ -18,7 +18,7 @@ use near_primitives::account::{AccessKey, Account}; use near_primitives::apply::ApplyChunkReason; use near_primitives::block::Tip; use near_primitives::block_header::{Approval, ApprovalInner}; -use near_primitives::congestion_info::CongestionInfo; +use 
near_primitives::congestion_info::{CongestionInfo, ExtendedCongestionInfo}; use near_primitives::epoch_manager::block_info::BlockInfo; use near_primitives::epoch_manager::epoch_info::EpochInfo; use near_primitives::epoch_manager::EpochConfig; @@ -1089,6 +1089,7 @@ impl RuntimeAdapter for KeyValueRuntime { _verify_signature: bool, _epoch_id: &EpochId, _current_protocol_version: ProtocolVersion, + _receiver_congestion_info: Option<ExtendedCongestionInfo>, ) -> Result<Option<InvalidTxError>, Error> { Ok(None) } diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs index 736b3279e33..2b7a3f4a7ff 100644 --- a/chain/chain/src/types.rs +++ b/chain/chain/src/types.rs @@ -417,6 +417,7 @@ pub trait RuntimeAdapter: Send + Sync { verify_signature: bool, epoch_id: &EpochId, current_protocol_version: ProtocolVersion, + receiver_congestion_info: Option<ExtendedCongestionInfo>, ) -> Result<Option<InvalidTxError>, Error>; /// Returns an ordered list of valid transactions from the pool up the given limits. diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 9bf4d31735e..77d3e1ee227 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -2259,7 +2259,8 @@ impl Client { ) -> Result<ProcessTxResponse, Error> { let head = self.chain.head()?; let me = self.validator_signer.as_ref().map(|vs| vs.validator_id()); - let cur_block_header = self.chain.head_header()?; + let cur_block = self.chain.get_head_block()?; + let cur_block_header = cur_block.header(); let transaction_validity_period = self.chain.transaction_validity_period; // here it is fine to use `cur_block_header` as it is a best effort estimate. 
If the transaction // were to be included, the block that the chunk points to will have height >= height of @@ -2274,12 +2275,23 @@ impl Client { } let gas_price = cur_block_header.next_gas_price(); let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(&head.last_block_hash)?; - + let receiver_shard = + self.epoch_manager.account_id_to_shard_id(tx.transaction.receiver_id(), &epoch_id)?; + let receiver_congestion_info = + cur_block.shards_congestion_info().get(&receiver_shard).copied(); let protocol_version = self.epoch_manager.get_epoch_protocol_version(&epoch_id)?; if let Some(err) = self .runtime_adapter - .validate_tx(gas_price, None, tx, true, &epoch_id, protocol_version) + .validate_tx( + gas_price, + None, + tx, + true, + &epoch_id, + protocol_version, + receiver_congestion_info, + ) .expect("no storage errors") { debug!(target: "client", tx_hash = ?tx.get_hash(), ?err, "Invalid tx during basic validation"); @@ -2311,7 +2323,15 @@ impl Client { }; if let Some(err) = self .runtime_adapter - .validate_tx(gas_price, Some(state_root), tx, false, &epoch_id, protocol_version) + .validate_tx( + gas_price, + Some(state_root), + tx, + false, + &epoch_id, + protocol_version, + receiver_congestion_info, + ) .expect("no storage errors") { debug!(target: "client", ?err, "Invalid tx"); diff --git a/chain/jsonrpc/res/rpc_errors_schema.json b/chain/jsonrpc/res/rpc_errors_schema.json index a09dfebc2fe..f622fe3c7fb 100644 --- a/chain/jsonrpc/res/rpc_errors_schema.json +++ b/chain/jsonrpc/res/rpc_errors_schema.json @@ -569,7 +569,8 @@ "Expired", "ActionsValidation", "TransactionSizeExceeded", - "InvalidTransactionVersion" + "InvalidTransactionVersion", + "ShardCongested" ], "props": {} }, @@ -772,6 +773,13 @@ "subtypes": [], "props": {} }, + "ShardCongested": { + "name": "ShardCongested", + "subtypes": [], + "props": { + "shard_id": "" + } + }, "SignerDoesNotExist": { "name": "SignerDoesNotExist", "subtypes": [], diff --git a/core/primitives/src/errors.rs 
b/core/primitives/src/errors.rs index 8cd728a6f8d..cf50d5c6c70 100644 --- a/core/primitives/src/errors.rs +++ b/core/primitives/src/errors.rs @@ -178,6 +178,9 @@ pub enum InvalidTxError { TransactionSizeExceeded { size: u64, limit: u64 }, /// Transaction version is invalid. InvalidTransactionVersion, + /// The receiver shard of the transaction is too congestion to accept new + /// transactions at the moment. + ShardCongested { shard_id: u32 }, } impl std::error::Error for InvalidTxError {} @@ -576,6 +579,9 @@ impl Display for InvalidTxError { InvalidTxError::InvalidTransactionVersion => { write!(f, "Transaction version is invalid") } + InvalidTxError::ShardCongested { shard_id } => { + write!(f, "Shard {shard_id} is currently congested and rejects new transactions.") + } } } } From f161e5440b284f0c22a0ebab298b45218ee4f51a Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Thu, 30 May 2024 10:11:23 +0200 Subject: [PATCH 2/4] fix existing tests Some tests should run without congestion control so that they function the same way they did before. And some congestion control tests will observe the new error, which is expected. 
--- .../src/tests/client/features/congestion_control.rs | 10 ++++++++-- integration-tests/src/tests/client/resharding.rs | 2 +- integration-tests/src/tests/client/sync_state_nodes.rs | 2 +- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/integration-tests/src/tests/client/features/congestion_control.rs b/integration-tests/src/tests/client/features/congestion_control.rs index 65a29fa3670..088b6c81cd3 100644 --- a/integration-tests/src/tests/client/features/congestion_control.rs +++ b/integration-tests/src/tests/client/features/congestion_control.rs @@ -6,7 +6,9 @@ use near_o11y::testonly::init_test_logger; use near_parameters::{RuntimeConfig, RuntimeConfigStore}; use near_primitives::account::id::AccountId; use near_primitives::congestion_info::{CongestionControl, CongestionInfo}; -use near_primitives::errors::{ActionErrorKind, FunctionCallError, TxExecutionError}; +use near_primitives::errors::{ + ActionErrorKind, FunctionCallError, InvalidTxError, TxExecutionError, +}; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardLayout; use near_primitives::sharding::{ShardChunk, ShardChunkHeader}; @@ -292,7 +294,11 @@ fn submit_n_100tgas_fns(env: &mut TestEnv, n: u32, nonce: &mut u64, signer: &InM let fn_tx = new_fn_call_100tgas(nonce, signer, *block.hash()); // this only adds the tx to the pool, no chain progress is made let response = env.clients[0].process_tx(fn_tx, false, false); - assert_eq!(response, ProcessTxResponse::ValidTx); + match response { + ProcessTxResponse::ValidTx + | ProcessTxResponse::InvalidTx(InvalidTxError::ShardCongested { .. 
}) => (), + other => panic!("unexpected result from submitting tx: {other:?}"), + } } } diff --git a/integration-tests/src/tests/client/resharding.rs b/integration-tests/src/tests/client/resharding.rs index 9bdf874c8ca..bffff1f1124 100644 --- a/integration-tests/src/tests/client/resharding.rs +++ b/integration-tests/src/tests/client/resharding.rs @@ -239,7 +239,7 @@ impl TestReshardingEnv { .validator_seats(num_validators) .real_stores() .epoch_managers_with_test_overrides(epoch_config_test_overrides) - .nightshade_runtimes(&genesis) + .nightshade_runtimes_congestion_control_disabled(&genesis) .track_all_shards() .build(); assert_eq!(env.validators.len(), num_validators); diff --git a/integration-tests/src/tests/client/sync_state_nodes.rs b/integration-tests/src/tests/client/sync_state_nodes.rs index 9b60c2098fb..d42b028d7ed 100644 --- a/integration-tests/src/tests/client/sync_state_nodes.rs +++ b/integration-tests/src/tests/client/sync_state_nodes.rs @@ -570,7 +570,7 @@ fn test_dump_epoch_missing_chunk_in_last_block() { .clients_count(2) .use_state_snapshots() .real_stores() - .nightshade_runtimes(&genesis) + .nightshade_runtimes_congestion_control_disabled(&genesis) .build(); let genesis_block = env.clients[0].chain.get_block_by_height(0).unwrap(); From 68918915e9f37a2dd321db12c37d04a2e7779084 Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Mon, 3 Jun 2024 11:28:40 +0200 Subject: [PATCH 3/4] add a test for RPCs rejecting new transactions --- .../client/features/congestion_control.rs | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/integration-tests/src/tests/client/features/congestion_control.rs b/integration-tests/src/tests/client/features/congestion_control.rs index 088b6c81cd3..5afa18fed25 100644 --- a/integration-tests/src/tests/client/features/congestion_control.rs +++ b/integration-tests/src/tests/client/features/congestion_control.rs @@ -1,3 +1,4 @@ +use assert_matches::assert_matches; use near_chain_configs::Genesis; use 
near_client::test_utils::TestEnv; use near_client::ProcessTxResponse; @@ -571,3 +572,53 @@ fn measure_tx_limit( local_tx_included_with_congestion, ) } + +/// Test that RPC clients stop accepting transactions when the receiver is +/// congested. +#[test] +fn test_rpc_client_rejection() { + let sender_id: AccountId = "test0".parse().unwrap(); + let mut env = setup_runtime(sender_id.clone(), PROTOCOL_VERSION); + + // prepare a contract to call + setup_contract(&mut env); + + let signer = InMemorySigner::from_seed(sender_id.clone(), KeyType::ED25519, sender_id.as_str()); + let mut nonce = 10; + + // Check we can send transactions at the start. + let fn_tx = new_fn_call_100tgas( + &mut nonce, + &signer, + *env.clients[0].chain.head_header().unwrap().hash(), + ); + let response = env.clients[0].process_tx(fn_tx, false, false); + assert_eq!(response, ProcessTxResponse::ValidTx); + + // Congest the network with a burst of 100 PGas. + submit_n_100tgas_fns(&mut env, 1_000, &mut nonce, &signer); + + // Allow transactions to enter the chain and enough receipts to arrive at + // the receiver shard for it to become congested. + let tip = env.clients[0].chain.head().unwrap(); + for i in 1..10 { + env.produce_block(0, tip.height + i); + } + + // Check that congestion control rejects new transactions. + let fn_tx = new_fn_call_100tgas( + &mut nonce, + &signer, + *env.clients[0].chain.head_header().unwrap().hash(), + ); + let response = env.clients[0].process_tx(fn_tx, false, false); + + if ProtocolFeature::CongestionControl.enabled(PROTOCOL_VERSION) { + assert_matches!( + response, + ProcessTxResponse::InvalidTx(InvalidTxError::ShardCongested { .. 
}) + ); + } else { + assert_eq!(response, ProcessTxResponse::ValidTx); + } +} From 5b25b6cb430a77575efd1a86c879be44101f0d75 Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Mon, 3 Jun 2024 17:01:38 +0200 Subject: [PATCH 4/4] fix typo --- core/primitives/src/errors.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/primitives/src/errors.rs b/core/primitives/src/errors.rs index cf50d5c6c70..1cedc3638ed 100644 --- a/core/primitives/src/errors.rs +++ b/core/primitives/src/errors.rs @@ -178,7 +178,7 @@ pub enum InvalidTxError { TransactionSizeExceeded { size: u64, limit: u64 }, /// Transaction version is invalid. InvalidTransactionVersion, - /// The receiver shard of the transaction is too congestion to accept new + /// The receiver shard of the transaction is too congested to accept new /// transactions at the moment. ShardCongested { shard_id: u32 }, }