From 8a17a2f26c0d4aa6cd7124d6b439f92784211fea Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Wed, 29 May 2024 19:25:37 +0200 Subject: [PATCH] feat(congestion): reject new transactions on RPC level Summary: In this PR, we introduce a new failure mode on the RPC level when a transaction is submitted under congestion. The error is of type `InvalidTxError` and called `ShardCongested` with a single field `shard_id` referencing the congested shard. ## Details With [cross-shard congestion control](https://github.com/near/NEPs/pull/539) being stabilized soon, we want to reject new transactions as early as possible when the receiver shard is already overloaded with traffic. On the chunk producer level, all transactions going to a congested shard will be dropped. This keeps the memory requirements of chunk producers bounded. Further, we decided to go for a relatively low threshold in order to keep the latency of accepted transactions low, preventing new transactions as soon as we hit 25% congestion on a specific shard. Consequently, when shards are congested, it will not be long before transactions are rejected. This has consequences for the users. On the positive side, they will no longer have to wait for a long time not knowing if their transaction will be accepted or not. Either it is executed within a bounded time (at most 20 blocks after inclusion) or it will be rejected immediately. But on the negative side, when a shard is congested, they will have to actively retry sending the transaction until it gets accepted. We hope that this can be automated by wallets, which can also provide useful live updates to the user about what is happening. But for this, they will need to understand and handle the new error `ShardCongested` differently from existing errors. 
--- chain/chain/src/runtime/mod.rs | 16 ++++++++++++++ chain/chain/src/test_utils/kv_runtime.rs | 3 ++- chain/chain/src/types.rs | 1 + chain/client/src/client.rs | 28 ++++++++++++++++++++---- chain/jsonrpc/res/rpc_errors_schema.json | 10 ++++++++- core/primitives/src/errors.rs | 6 +++++ 6 files changed, 58 insertions(+), 6 deletions(-) diff --git a/chain/chain/src/runtime/mod.rs b/chain/chain/src/runtime/mod.rs index 32847c922f2..e87eee9926f 100644 --- a/chain/chain/src/runtime/mod.rs +++ b/chain/chain/src/runtime/mod.rs @@ -643,9 +643,25 @@ impl RuntimeAdapter for NightshadeRuntime { verify_signature: bool, epoch_id: &EpochId, current_protocol_version: ProtocolVersion, + receiver_congestion_info: Option, ) -> Result, Error> { let runtime_config = self.runtime_config_store.get_config(current_protocol_version); + if let Some(congestion_info) = receiver_congestion_info { + let congestion_control = CongestionControl::new( + runtime_config.congestion_control_config, + congestion_info.congestion_info, + congestion_info.missed_chunks_count, + ); + if !congestion_control.shard_accepts_transactions() { + let receiver_shard = + self.account_id_to_shard_uid(transaction.transaction.receiver_id(), epoch_id)?; + return Ok(Some(InvalidTxError::ShardCongested { + shard_id: receiver_shard.shard_id, + })); + } + } + if let Some(state_root) = state_root { let shard_uid = self.account_id_to_shard_uid(transaction.transaction.signer_id(), epoch_id)?; diff --git a/chain/chain/src/test_utils/kv_runtime.rs b/chain/chain/src/test_utils/kv_runtime.rs index 80ee7b2df74..b0f9132ae96 100644 --- a/chain/chain/src/test_utils/kv_runtime.rs +++ b/chain/chain/src/test_utils/kv_runtime.rs @@ -17,7 +17,7 @@ use near_primitives::account::{AccessKey, Account}; use near_primitives::apply::ApplyChunkReason; use near_primitives::block::Tip; use near_primitives::block_header::{Approval, ApprovalInner}; -use near_primitives::congestion_info::CongestionInfo; +use 
near_primitives::congestion_info::{CongestionInfo, ExtendedCongestionInfo}; use near_primitives::epoch_manager::block_info::BlockInfo; use near_primitives::epoch_manager::epoch_info::EpochInfo; use near_primitives::epoch_manager::EpochConfig; @@ -1077,6 +1077,7 @@ impl RuntimeAdapter for KeyValueRuntime { _verify_signature: bool, _epoch_id: &EpochId, _current_protocol_version: ProtocolVersion, + _receiver_congestion_info: Option, ) -> Result, Error> { Ok(None) } diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs index 518ab248503..1482e25529c 100644 --- a/chain/chain/src/types.rs +++ b/chain/chain/src/types.rs @@ -420,6 +420,7 @@ pub trait RuntimeAdapter: Send + Sync { verify_signature: bool, epoch_id: &EpochId, current_protocol_version: ProtocolVersion, + receiver_congestion_info: Option, ) -> Result, Error>; /// Returns an ordered list of valid transactions from the pool up the given limits. diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index c3f789251e4..d94fd9e7bb4 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -2225,7 +2225,8 @@ impl Client { ) -> Result { let head = self.chain.head()?; let me = self.validator_signer.as_ref().map(|vs| vs.validator_id()); - let cur_block_header = self.chain.head_header()?; + let cur_block = self.chain.get_head_block()?; + let cur_block_header = cur_block.header(); let transaction_validity_period = self.chain.transaction_validity_period; // here it is fine to use `cur_block_header` as it is a best effort estimate. 
If the transaction // were to be included, the block that the chunk points to will have height >= height of @@ -2240,12 +2241,23 @@ impl Client { } let gas_price = cur_block_header.next_gas_price(); let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(&head.last_block_hash)?; - + let receiver_shard = + self.epoch_manager.account_id_to_shard_id(tx.transaction.receiver_id(), &epoch_id)?; + let receiver_congestion_info = + cur_block.shards_congestion_info().get(&receiver_shard).copied(); let protocol_version = self.epoch_manager.get_epoch_protocol_version(&epoch_id)?; if let Some(err) = self .runtime_adapter - .validate_tx(gas_price, None, tx, true, &epoch_id, protocol_version) + .validate_tx( + gas_price, + None, + tx, + true, + &epoch_id, + protocol_version, + receiver_congestion_info, + ) .expect("no storage errors") { debug!(target: "client", tx_hash = ?tx.get_hash(), ?err, "Invalid tx during basic validation"); @@ -2277,7 +2289,15 @@ impl Client { }; if let Some(err) = self .runtime_adapter - .validate_tx(gas_price, Some(state_root), tx, false, &epoch_id, protocol_version) + .validate_tx( + gas_price, + Some(state_root), + tx, + false, + &epoch_id, + protocol_version, + receiver_congestion_info, + ) .expect("no storage errors") { debug!(target: "client", ?err, "Invalid tx"); diff --git a/chain/jsonrpc/res/rpc_errors_schema.json b/chain/jsonrpc/res/rpc_errors_schema.json index a09dfebc2fe..f622fe3c7fb 100644 --- a/chain/jsonrpc/res/rpc_errors_schema.json +++ b/chain/jsonrpc/res/rpc_errors_schema.json @@ -569,7 +569,8 @@ "Expired", "ActionsValidation", "TransactionSizeExceeded", - "InvalidTransactionVersion" + "InvalidTransactionVersion", + "ShardCongested" ], "props": {} }, @@ -772,6 +773,13 @@ "subtypes": [], "props": {} }, + "ShardCongested": { + "name": "ShardCongested", + "subtypes": [], + "props": { + "shard_id": "" + } + }, "SignerDoesNotExist": { "name": "SignerDoesNotExist", "subtypes": [], diff --git a/core/primitives/src/errors.rs 
b/core/primitives/src/errors.rs index 8cd728a6f8d..cf50d5c6c70 100644 --- a/core/primitives/src/errors.rs +++ b/core/primitives/src/errors.rs @@ -178,6 +178,9 @@ pub enum InvalidTxError { TransactionSizeExceeded { size: u64, limit: u64 }, /// Transaction version is invalid. InvalidTransactionVersion, + /// The receiver shard of the transaction is too congested to accept new + /// transactions at the moment. + ShardCongested { shard_id: u32 }, } impl std::error::Error for InvalidTxError {} @@ -576,6 +579,9 @@ impl Display for InvalidTxError { InvalidTxError::InvalidTransactionVersion => { write!(f, "Transaction version is invalid") } + InvalidTxError::ShardCongested { shard_id } => { + write!(f, "Shard {shard_id} is currently congested and rejects new transactions.") + } } } }