From 265b2c65f95ca7d29fa106beeb965e21378c6314 Mon Sep 17 00:00:00 2001 From: Jakub Zajkowski Date: Fri, 11 Oct 2024 00:01:49 +0200 Subject: [PATCH] Added QPS rate limiting mechanism to the binary port. The rate limiting mechanism uses sliding window algorithm. Removed "client_request_buffer_size" and "client_request_limit" properties of the "binary_port_server" section. Added "qps_limit" property to said section. --- binary_port/src/error_code.rs | 5 + node/src/components/binary_port.rs | 92 ++++- node/src/components/binary_port/config.rs | 15 +- .../components/binary_port/rate_limiter.rs | 381 ++++++++++++++++++ node/src/components/binary_port/tests.rs | 7 +- node/src/effect.rs | 15 +- node/src/effect/requests.rs | 28 +- node/src/reactor/main_reactor.rs | 3 - node/src/reactor/main_reactor/error.rs | 9 +- resources/local/config.toml | 4 + resources/production/config-example.toml | 4 + 11 files changed, 496 insertions(+), 67 deletions(-) create mode 100644 node/src/components/binary_port/rate_limiter.rs diff --git a/binary_port/src/error_code.rs b/binary_port/src/error_code.rs index 9b552d1f7b..d1cd026ae2 100644 --- a/binary_port/src/error_code.rs +++ b/binary_port/src/error_code.rs @@ -280,6 +280,9 @@ pub enum ErrorCode { /// Purse was not found for given identifier. #[error("purse was not found for given identifier")] PurseNotFound = 87, + /// Too many requests per second. + #[error("request was throttled")] + TooManyRequests = 88, } impl TryFrom for ErrorCode { @@ -375,6 +378,7 @@ impl TryFrom for ErrorCode { 85 => Ok(ErrorCode::GasPriceToleranceTooLow), 86 => Ok(ErrorCode::ReceivedV1Transaction), 87 => Ok(ErrorCode::PurseNotFound), + 88 => Ok(ErrorCode::TooManyRequests), _ => Err(UnknownErrorCode), } } @@ -536,6 +540,7 @@ mod tests { "variant {} not covered by TryFrom implementation", as_int ); + assert_eq!(decoded.unwrap(), variant); } } } diff --git a/node/src/components/binary_port.rs b/node/src/components/binary_port.rs index 1040f093f6..fa8fd09461 100644 --- a/node/src/components/binary_port.rs +++ b/node/src/components/binary_port.rs @@ -3,6 +3,7 @@ mod config; mod error; mod event; mod metrics; +mod rate_limiter; #[cfg(test)] mod tests; @@ -40,16 +41,18 @@ use casper_types::{ ContractWasmHash, Digest, EntityAddr, GlobalStateIdentifier, Key, Package, PackageAddr, Peers, ProtocolVersion, Rewards, SignedBlock, StoredValue, TimeDiff, Transaction, URef, }; +use thiserror::Error as ThisError; use datasize::DataSize; use either::Either; use futures::{SinkExt, StreamExt}; use once_cell::sync::OnceCell; use prometheus::Registry; +use rate_limiter::{LimiterResponse, RateLimiter, RateLimiterError}; use tokio::{ join, net::{TcpListener, TcpStream}, - sync::{Notify, OwnedSemaphorePermit, Semaphore}, + sync::{Mutex, Notify, OwnedSemaphorePermit, Semaphore}, }; use tokio_util::codec::Framed; use tracing::{debug, error, info, warn}; @@ -81,6 +84,26 @@ pub(crate) use event::Event; const COMPONENT_NAME: &str = "binary_port"; +#[derive(Debug, ThisError)] +pub(crate) enum BinaryPortInitializationError { + #[error("could not initialize rate limiter: {0}")] + CannotInitializeRateLimiter(String), + #[error("could not initialize metrics: {0}")] + CannotInitializeMetrics(prometheus::Error), +} + +impl From for BinaryPortInitializationError { + fn from(value: RateLimiterError) -> Self { + BinaryPortInitializationError::CannotInitializeRateLimiter(value.to_string()) + } +} + +impl From for BinaryPortInitializationError { + fn from(value: prometheus::Error) -> Self { + BinaryPortInitializationError::CannotInitializeMetrics(value) + } +} + #[derive(Debug, DataSize)] pub(crate) struct BinaryPort { #[data_size(skip)] @@ -90,6 +113,8 @@ pub(crate) struct BinaryPort { #[data_size(skip)] chainspec: Arc, #[data_size(skip)] + protocol_version: ProtocolVersion, + #[data_size(skip)] connection_limit: Arc, #[data_size(skip)] metrics: Arc, @@ -99,6 +124,8 @@ pub(crate) struct BinaryPort { shutdown_trigger: Arc, #[data_size(skip)] server_join_handle: OnceCell>, + #[data_size(skip)] + rate_limiter: Arc>, } impl BinaryPort { @@ -106,16 +133,23 @@ impl BinaryPort { config: Config, chainspec: Arc, registry: &Registry, - ) -> Result { + ) -> Result { + let rate_limiter = Arc::new(Mutex::new( + RateLimiter::new(config.qps_limit, TimeDiff::from_seconds(1)) + .map_err(BinaryPortInitializationError::from)?, + )); + let protocol_version = chainspec.protocol_version(); Ok(Self { state: ComponentState::Uninitialized, connection_limit: Arc::new(Semaphore::new(config.max_connections)), config: Arc::new(config), chainspec, - metrics: Arc::new(Metrics::new(registry)?), + protocol_version, + metrics: Arc::new(Metrics::new(registry).map_err(BinaryPortInitializationError::from)?), local_addr: Arc::new(OnceCell::new()), shutdown_trigger: Arc::new(Notify::new()), server_join_handle: OnceCell::new(), + rate_limiter, }) } @@ -134,6 +168,7 @@ async fn handle_request( config: &Config, chainspec: &Chainspec, metrics: &Metrics, + protocol_version: ProtocolVersion, ) -> BinaryResponse where REv: From @@ -148,7 +183,6 @@ where + From + Send, { - let protocol_version = effect_builder.get_protocol_version().await; match req { BinaryRequest::TryAcceptTransaction { transaction } => { metrics.binary_port_try_accept_transaction_count.inc(); @@ -1396,6 +1430,8 @@ async fn handle_client_loop( stream: TcpStream, effect_builder: EffectBuilder, max_message_size_bytes: u32, + rate_limiter: Arc>, + version: ProtocolVersion, ) -> Result<(), Error> where REv: From @@ -1423,8 +1459,8 @@ where return Err(Error::NoPayload); }; - let version = effect_builder.get_protocol_version().await; - let (response, id) = handle_payload(effect_builder, payload, version).await; + let (response, id) = + handle_payload(effect_builder, payload, version, Arc::clone(&rate_limiter)).await; framed .send(BinaryMessage::new( BinaryResponseAndRequest::new(response, payload, id).to_bytes()?, @@ -1462,6 +1498,7 @@ async fn handle_payload( effect_builder: EffectBuilder, payload: &[u8], protocol_version: ProtocolVersion, + rate_limiter: Arc>, ) -> (BinaryResponse, u16) where REv: From, @@ -1473,6 +1510,13 @@ where let request_id = header.id(); + if let LimiterResponse::Throttled = rate_limiter.lock().await.throttle() { + return ( + BinaryResponse::new_error(ErrorCode::TooManyRequests, protocol_version), + request_id, + ); + } + if !header .protocol_version() .is_compatible_with(&protocol_version) @@ -1519,6 +1563,8 @@ async fn handle_client( effect_builder: EffectBuilder, config: Arc, _permit: OwnedSemaphorePermit, + rate_limiter: Arc>, + protocol_version: ProtocolVersion, ) where REv: From + From @@ -1532,8 +1578,14 @@ async fn handle_client( + From + Send, { - if let Err(err) = - handle_client_loop(stream, effect_builder, config.max_message_size_bytes).await + if let Err(err) = handle_client_loop( + stream, + effect_builder, + config.max_message_size_bytes, + rate_limiter, + protocol_version, + ) + .await { // Low severity is used to prevent malicious clients from causing log floods. info!(%addr, err=display_error(&err), "binary port client handler error"); @@ -1783,7 +1835,16 @@ where if let Ok(permit) = Arc::clone(&self.connection_limit).try_acquire_owned() { self.metrics.binary_port_connections_count.inc(); let config = Arc::clone(&self.config); - tokio::spawn(handle_client(peer, stream, effect_builder, config, permit)); + let rate_limiter = Arc::clone(&self.rate_limiter); + tokio::spawn(handle_client( + peer, + stream, + effect_builder, + config, + permit, + rate_limiter, + self.protocol_version, + )); } else { warn!( "connection limit reached, dropping connection from {}", @@ -1796,10 +1857,17 @@ where let config = Arc::clone(&self.config); let chainspec = Arc::clone(&self.chainspec); let metrics = Arc::clone(&self.metrics); + let protocol_version = self.protocol_version; async move { - let response = - handle_request(request, effect_builder, &config, &chainspec, &metrics) - .await; + let response = handle_request( + request, + effect_builder, + &config, + &chainspec, + &metrics, + protocol_version, + ) + .await; responder.respond(response).await; } .ignore() diff --git a/node/src/components/binary_port/config.rs b/node/src/components/binary_port/config.rs index 828a3c1ddb..808df2c194 100644 --- a/node/src/components/binary_port/config.rs +++ b/node/src/components/binary_port/config.rs @@ -5,12 +5,10 @@ use serde::{Deserialize, Serialize}; const DEFAULT_ADDRESS: &str = "0.0.0.0:0"; /// Default maximum message size. const DEFAULT_MAX_MESSAGE_SIZE: u32 = 4 * 1024 * 1024; -/// Default request limit. -const DEFAULT_CLIENT_REQUEST_LIMIT: u16 = 3; -/// Default request buffer size. -const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 16; /// Default maximum number of connections. const DEFAULT_MAX_CONNECTIONS: usize = 16; +/// Default maximum number of requests per second. +const DEFAULT_MAX_QPS: usize = 100; /// Binary port server configuration. #[derive(Clone, DataSize, Debug, Deserialize, Serialize)] @@ -31,12 +29,10 @@ pub struct Config { pub allow_request_speculative_exec: bool, /// Maximum size of the binary port message. pub max_message_size_bytes: u32, - /// Maximum number of in-flight requests per client. - pub client_request_limit: u16, - /// Number of requests that can be buffered per client. - pub client_request_buffer_size: usize, /// Maximum number of connections to the server. pub max_connections: usize, + /// Maximum number of requests per second. + pub qps_limit: usize, } impl Config { @@ -48,10 +44,9 @@ impl Config { allow_request_get_all_values: false, allow_request_get_trie: false, allow_request_speculative_exec: false, - client_request_limit: DEFAULT_CLIENT_REQUEST_LIMIT, max_message_size_bytes: DEFAULT_MAX_MESSAGE_SIZE, - client_request_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE, max_connections: DEFAULT_MAX_CONNECTIONS, + qps_limit: DEFAULT_MAX_QPS, } } } diff --git a/node/src/components/binary_port/rate_limiter.rs b/node/src/components/binary_port/rate_limiter.rs new file mode 100644 index 0000000000..a4bc595d72 --- /dev/null +++ b/node/src/components/binary_port/rate_limiter.rs @@ -0,0 +1,381 @@ +use casper_types::{TimeDiff, Timestamp}; +use thiserror::Error as ThisError; + +#[derive(Debug, ThisError)] +pub(crate) enum RateLimiterError { + #[error("Cannot create Rate limiter with 0 max_requests")] + EmptyWindowNotAllowed, + #[error("Maximum window duration is too large")] + WindowDurationTooLarge, + #[error("Maximum window duration is too small")] + WindowDurationTooSmall, +} + +const MAX_WINDOW_DURATION_MS: u64 = 1000 * 60 * 60; // 1 hour + +#[derive(PartialEq, Eq, Debug)] +/// Response from the rate limiter. +pub(crate) enum LimiterResponse { + /// when limiter allowed the request + Allowed, + /// when limiter throttled the request + Throttled, +} + +/// A buffer to store timestamps of requests. The assumption is that the buffer will keep the +/// monotonical order of timestamps as they are pushed. +#[derive(Debug)] +struct Buffer { + buffer: Vec, + in_index: usize, + out_index: usize, + capacity: usize, +} + +impl Buffer { + fn new(size: usize) -> Self { + Buffer { + buffer: vec![0; size + 1], + in_index: 0, + out_index: 0, + capacity: size + 1, + } + } + + fn is_full(&self) -> bool { + self.in_index == (self.out_index + self.capacity - 1) % self.capacity + } + + fn is_empty(&self) -> bool { + self.in_index == self.out_index + } + + //This should only be used from `push` + fn push_and_slide(&mut self, value: u64) -> bool { + let out_index = self.out_index as i32; + let capacity = self.capacity as i32; + let mut to_index = self.in_index as i32; + let mut from_index = (self.in_index as i32 + capacity - 1) % capacity; + + while to_index != out_index && self.buffer[from_index as usize] > value { + self.buffer[to_index as usize] = self.buffer[from_index as usize]; + to_index = (to_index + capacity - 1) % capacity; + from_index = (from_index + capacity - 1) % capacity; + } + self.buffer[to_index as usize] = value; + self.in_index = (self.in_index + 1) % self.capacity; + true + } + + fn push(&mut self, value: u64) -> bool { + if self.is_full() { + return false; + } + if !self.is_empty() { + let last_stored_index = (self.in_index + self.capacity - 1) % self.capacity; + let last_stored = self.buffer[last_stored_index]; + // We are expecting values to be monotonically increasing. But there is a scenario in + // which the system time might be changed to a previous time. + // We handle that by wiggling it inside the buffer + if last_stored > value { + return self.push_and_slide(value); + } + } + self.buffer[self.in_index] = value; + self.in_index = (self.in_index + 1) % self.capacity; + true + } + + fn prune_lt(&mut self, value: u64) -> usize { + if self.is_empty() { + return 0; + } + let mut number_of_pruned = 0; + while self.in_index != self.out_index { + if self.buffer[self.out_index] >= value { + break; + } + self.out_index = (self.out_index + 1) % self.capacity; + number_of_pruned += 1; + } + number_of_pruned + } + + #[cfg(test)] + fn to_vec(&self) -> Vec { + let mut vec = Vec::new(); + let mut local_out = self.out_index; + while self.in_index != local_out { + vec.push(self.buffer[local_out]); + local_out = (local_out + 1) % self.capacity; + } + vec + } +} + +#[derive(Debug)] +pub(crate) struct RateLimiter { + /// window duration. + window_duration_in_ms: u64, + /// Log of unix epoch time in ms when requests were made. + recorder_timestamps: Buffer, +} + +impl RateLimiter { + //ctor + pub(crate) fn new( + max_requests: usize, + window_duration: TimeDiff, + ) -> Result { + if max_requests == 0 { + // We consider 0-max_requests as a misconfiguration + return Err(RateLimiterError::EmptyWindowNotAllowed); + } + let window_duration_in_ms = window_duration.millis(); + if window_duration_in_ms >= MAX_WINDOW_DURATION_MS { + return Err(RateLimiterError::WindowDurationTooLarge); + } + let window_duration_in_ms = window_duration.millis(); + if window_duration_in_ms == 0 { + return Err(RateLimiterError::WindowDurationTooSmall); + } + Ok(RateLimiter { + window_duration_in_ms, + recorder_timestamps: Buffer::new(max_requests), + }) + } + + pub(crate) fn throttle(&mut self) -> LimiterResponse { + self.internal_throttle(Timestamp::now().millis()) + } + + fn internal_throttle(&mut self, now: u64) -> LimiterResponse { + let is_full = self.recorder_timestamps.is_full(); + if !is_full { + self.recorder_timestamps.push(now); + return LimiterResponse::Allowed; + } else { + //The following subtraction could theoretically not fit in unsigned, but in real-life + // cases we limit the window duration to 1 hour (it's checked in ctor). So unless + // someone calls it from the perspective of 1970, it should be fine. + let no_of_pruned = self + .recorder_timestamps + .prune_lt(now - self.window_duration_in_ms); + if no_of_pruned == 0 { + //No pruning was done, so we are still at max_requests + return LimiterResponse::Throttled; + } + } + self.recorder_timestamps.push(now); + LimiterResponse::Allowed + } +} + +#[cfg(test)] +mod tests { + use casper_types::TimeDiff; + + use super::*; + + #[test] + fn sliding_window_should_validate_ctor_inputs() { + assert!(RateLimiter::new(0, TimeDiff::from_millis(1000)).is_err()); + assert!(RateLimiter::new(10, TimeDiff::from_millis(MAX_WINDOW_DURATION_MS + 1)).is_err()); + assert!(RateLimiter::new(10, TimeDiff::from_millis(0)).is_err()); + } + + #[test] + fn sliding_window_throttle_should_limit_requests() { + let mut rate_limiter = rate_limiter(); + let t_1 = 10000_u64; + let t_2 = 10002_u64; + let t_3 = 10003_u64; + + assert_eq!( + rate_limiter.internal_throttle(t_1), + LimiterResponse::Allowed + ); + assert_eq!( + rate_limiter.internal_throttle(t_2), + LimiterResponse::Allowed + ); + assert_eq!( + rate_limiter.internal_throttle(t_3), + LimiterResponse::Throttled + ); + } + + #[test] + fn sliding_window_throttle_should_limit_requests_on_burst() { + let mut rate_limiter = rate_limiter(); + let t_1 = 10000; + assert_eq!( + rate_limiter.internal_throttle(t_1), + LimiterResponse::Allowed + ); + assert_eq!( + rate_limiter.internal_throttle(t_1), + LimiterResponse::Allowed + ); + assert_eq!( + rate_limiter.internal_throttle(t_1), + LimiterResponse::Throttled + ); + } + + #[test] + fn sliding_window_should_slide_away_from_old_checks() { + let mut rate_limiter = rate_limiter(); + let t_1 = 10000_u64; + let t_2 = 10002_u64; + let t_3 = 11002_u64; + assert_eq!( + rate_limiter.internal_throttle(t_1), + LimiterResponse::Allowed + ); + assert_eq!( + rate_limiter.internal_throttle(t_2), + LimiterResponse::Allowed + ); + assert_eq!( + rate_limiter.internal_throttle(t_3), + LimiterResponse::Allowed + ); + assert_eq!( + rate_limiter.internal_throttle(t_3), + LimiterResponse::Throttled + ); + } + + #[test] + fn sliding_window_should_take_past_timestamp() { + let mut rate_limiter = rate_limiter(); + let t_1 = 10000_u64; + let t_2 = 9999_u64; + let t_3 = 10001_u64; + assert_eq!( + rate_limiter.internal_throttle(t_1), + LimiterResponse::Allowed + ); + assert_eq!( + rate_limiter.internal_throttle(t_2), + LimiterResponse::Allowed + ); + assert_eq!( + rate_limiter.internal_throttle(t_3), + LimiterResponse::Throttled + ); + } + + #[test] + fn sliding_window_should_anneal_timestamp_from_past_() { + let mut rate_limiter = rate_limiter(); + let t_1 = 10000_u64; + let t_2 = 9999_u64; + let t_3 = 12001_u64; + let t_4 = 12002_u64; + assert_eq!( + rate_limiter.internal_throttle(t_1), + LimiterResponse::Allowed + ); + assert_eq!( + rate_limiter.internal_throttle(t_2), + LimiterResponse::Allowed + ); + assert_eq!( + rate_limiter.internal_throttle(t_3), + LimiterResponse::Allowed + ); + assert_eq!( + rate_limiter.internal_throttle(t_4), + LimiterResponse::Allowed + ); + } + + #[test] + fn buffer_should_saturate_with_values() { + let mut buffer = Buffer::new(3); + assert!(buffer.push(1)); + assert!(buffer.push(2)); + assert!(buffer.push(3)); + assert!(!buffer.push(4)); + assert_eq!(buffer.to_vec(), vec![1_u64, 2_u64, 3_u64]); + } + + #[test] + fn buffer_should_prune() { + let mut buffer = Buffer::new(3); + assert!(buffer.push(1)); + assert!(buffer.push(2)); + assert!(buffer.push(3)); + assert_eq!(buffer.prune_lt(3), 2); + assert!(buffer.push(4)); + assert_eq!(buffer.to_vec(), vec![3_u64, 4_u64]); + assert_eq!(buffer.prune_lt(5), 2); + + assert!(buffer.push(1)); + assert!(buffer.push(2)); + assert!(buffer.push(3)); + assert_eq!(buffer.prune_lt(5), 3); + assert!(buffer.to_vec().is_empty()); + + assert!(buffer.push(5)); + assert!(buffer.push(6)); + assert!(buffer.push(7)); + assert_eq!(buffer.to_vec(), vec![5, 6, 7]); + } + + #[test] + fn push_and_slide_should_keep_order() { + let mut buffer = Buffer::new(5); + assert!(buffer.push(1)); + assert!(buffer.push(2)); + assert!(buffer.push(7)); + assert!(buffer.push(6)); + assert_eq!(buffer.to_vec(), vec![1, 2, 6, 7]); + assert_eq!(buffer.prune_lt(7), 3); + assert_eq!(buffer.to_vec(), vec![7]); + + let mut buffer = Buffer::new(4); + assert!(buffer.push(2)); + assert!(buffer.push(8)); + assert!(buffer.push(5)); + assert!(buffer.push(1)); + assert_eq!(buffer.to_vec(), vec![1, 2, 5, 8]); + assert_eq!(buffer.prune_lt(5), 2); + assert_eq!(buffer.to_vec(), vec![5, 8]); + + let mut buffer = Buffer::new(4); + assert!(buffer.push(2)); + assert!(buffer.push(8)); + assert!(buffer.push(2)); + assert!(buffer.push(1)); + assert_eq!(buffer.to_vec(), vec![1, 2, 2, 8]); + + let mut buffer = Buffer::new(4); + assert!(buffer.push(2)); + assert!(buffer.push(8)); + assert!(buffer.push(3)); + assert!(buffer.push(1)); + assert_eq!(buffer.prune_lt(2), 1); + assert!(buffer.push(0)); + assert_eq!(buffer.to_vec(), vec![0, 2, 3, 8]); + + let mut buffer = Buffer::new(4); + assert!(buffer.push(8)); + assert!(buffer.push(7)); + assert!(buffer.push(6)); + assert!(buffer.push(5)); + assert_eq!(buffer.prune_lt(7), 2); + assert!(buffer.push(9)); + assert!(buffer.push(10)); + assert_eq!(buffer.prune_lt(9), 2); + assert!(buffer.push(11)); + assert!(buffer.push(1)); + assert_eq!(buffer.to_vec(), vec![1, 9, 10, 11]); + } + + fn rate_limiter() -> RateLimiter { + RateLimiter::new(2, TimeDiff::from_millis(1000)).unwrap() + } +} diff --git a/node/src/components/binary_port/tests.rs b/node/src/components/binary_port/tests.rs index 25a76127c4..f55c45d2a1 100644 --- a/node/src/components/binary_port/tests.rs +++ b/node/src/components/binary_port/tests.rs @@ -30,7 +30,7 @@ use prometheus::Registry; use thiserror::Error as ThisError; use casper_binary_port::ErrorCode; -use casper_types::{testing::TestRng, Chainspec, ChainspecRawBytes, ProtocolVersion}; +use casper_types::{testing::TestRng, Chainspec, ChainspecRawBytes}; use crate::{ components::{ @@ -164,8 +164,6 @@ async fn run_test_case( allow_request_get_trie, allow_request_speculative_exec, max_message_size_bytes: 1024, - client_request_limit: 2, - client_request_buffer_size: 16, max_connections: 2, ..Default::default() }; @@ -251,9 +249,6 @@ impl Reactor for MockReactor { self.binary_port.handle_event(effect_builder, rng, event), ), Event::ControlAnnouncement(_) => panic!("unexpected control announcement"), - Event::ReactorInfoRequest(ReactorInfoRequest::ProtocolVersion { responder }) => { - responder.respond(ProtocolVersion::V1_0_0).ignore() - } Event::ContractRuntimeRequest(_) | Event::ReactorInfoRequest(_) => { // We're only interested if the binary port actually created a request to Contract // Runtime component, but we're not interested in the result. diff --git a/node/src/effect.rs b/node/src/effect.rs index 2017752aff..336c20d19f 100644 --- a/node/src/effect.rs +++ b/node/src/effect.rs @@ -134,8 +134,8 @@ use casper_types::{ Approval, AvailableBlockRange, Block, BlockHash, BlockHeader, BlockSignatures, BlockSynchronizerStatus, BlockV2, ChainspecRawBytes, DeployHash, Digest, EntityAddr, EraId, ExecutionInfo, FinalitySignature, FinalitySignatureId, FinalitySignatureV2, Key, NextUpgrade, - Package, ProtocolUpgradeConfig, ProtocolVersion, PublicKey, TimeDiff, Timestamp, Transaction, - TransactionHash, TransactionHeader, TransactionId, Transfer, U512, + Package, ProtocolUpgradeConfig, PublicKey, TimeDiff, Timestamp, Transaction, TransactionHash, + TransactionHeader, TransactionId, Transfer, U512, }; use crate::{ @@ -1450,17 +1450,6 @@ impl EffectBuilder { .await } - pub(crate) async fn get_protocol_version(self) -> ProtocolVersion - where - REv: From, - { - self.make_request( - |responder| ReactorInfoRequest::ProtocolVersion { responder }, - QueueKind::Regular, - ) - .await - } - #[allow(unused)] pub(crate) async fn get_balance_holds_interval(self) -> TimeDiff where diff --git a/node/src/effect/requests.rs b/node/src/effect/requests.rs index c5f89403b2..edc4523186 100644 --- a/node/src/effect/requests.rs +++ b/node/src/effect/requests.rs @@ -34,8 +34,8 @@ use casper_types::{ execution::ExecutionResult, Approval, AvailableBlockRange, Block, BlockHash, BlockHeader, BlockSignatures, BlockSynchronizerStatus, BlockV2, ChainspecRawBytes, DeployHash, Digest, DisplayIter, EntityAddr, EraId, ExecutionInfo, FinalitySignature, FinalitySignatureId, Key, - NextUpgrade, ProtocolUpgradeConfig, ProtocolVersion, PublicKey, TimeDiff, Timestamp, - Transaction, TransactionHash, TransactionHeader, TransactionId, Transfer, + NextUpgrade, ProtocolUpgradeConfig, PublicKey, TimeDiff, Timestamp, Transaction, + TransactionHash, TransactionHeader, TransactionId, Transfer, }; use super::{AutoClosingResponder, GossipTarget, Responder}; @@ -1112,24 +1112,11 @@ impl Display for UpgradeWatcherRequest { #[derive(Debug, Serialize)] pub(crate) enum ReactorInfoRequest { - ReactorState { - responder: Responder, - }, - LastProgress { - responder: Responder, - }, - Uptime { - responder: Responder, - }, - NetworkName { - responder: Responder, - }, - ProtocolVersion { - responder: Responder, - }, - BalanceHoldsInterval { - responder: Responder, - }, + ReactorState { responder: Responder }, + LastProgress { responder: Responder }, + Uptime { responder: Responder }, + NetworkName { responder: Responder }, + BalanceHoldsInterval { responder: Responder }, } impl Display for ReactorInfoRequest { @@ -1142,7 +1129,6 @@ impl Display for ReactorInfoRequest { ReactorInfoRequest::LastProgress { .. } => "LastProgress", ReactorInfoRequest::Uptime { .. } => "Uptime", ReactorInfoRequest::NetworkName { .. } => "NetworkName", - ReactorInfoRequest::ProtocolVersion { .. } => "ProtocolVersion", ReactorInfoRequest::BalanceHoldsInterval { .. } => "BalanceHoldsInterval", } ) diff --git a/node/src/reactor/main_reactor.rs b/node/src/reactor/main_reactor.rs index b18b6fb784..0cd799f49f 100644 --- a/node/src/reactor/main_reactor.rs +++ b/node/src/reactor/main_reactor.rs @@ -261,9 +261,6 @@ impl reactor::Reactor for MainReactor { ReactorInfoRequest::NetworkName { responder } => responder .respond(NetworkName::new(self.chainspec.network_config.name.clone())) .ignore(), - ReactorInfoRequest::ProtocolVersion { responder } => responder - .respond(self.chainspec.protocol_version()) - .ignore(), ReactorInfoRequest::BalanceHoldsInterval { responder } => responder .respond(self.chainspec.core_config.gas_hold_interval) .ignore(), diff --git a/node/src/reactor/main_reactor/error.rs b/node/src/reactor/main_reactor/error.rs index 449cc7a65f..86a58a1a5b 100644 --- a/node/src/reactor/main_reactor/error.rs +++ b/node/src/reactor/main_reactor/error.rs @@ -5,8 +5,9 @@ use casper_types::{bytesrepr, crypto::ErrorExt as CryptoError}; use crate::{ components::{ - contract_runtime, contract_runtime::BlockExecutionError, diagnostics_port, network, - storage, upgrade_watcher, + binary_port::BinaryPortInitializationError, + contract_runtime::{self, BlockExecutionError}, + diagnostics_port, network, storage, upgrade_watcher, }, utils::{ListeningError, LoadError}, }; @@ -61,6 +62,10 @@ pub(crate) enum Error { /// Error while loading the signing key pair. #[error("signing key pair load error: {0}")] LoadSigningKeyPair(#[from] LoadError), + + /// `BinaryPort` component error. + #[error("binary port: {0}")] + BinaryPort(#[from] BinaryPortInitializationError), } impl From for Error { diff --git a/resources/local/config.toml b/resources/local/config.toml index 7e6d933350..f5b231d193 100644 --- a/resources/local/config.toml +++ b/resources/local/config.toml @@ -331,6 +331,10 @@ client_request_buffer_size = 20 # Maximum number of connections to the server. max_connections = 16 +# The global max rate of requests (per second) before they are limited. +# The implementation uses a sliding window algorithm. +qps_limit = 100 + # ============================================== # Configuration options for the REST HTTP server # ============================================== diff --git a/resources/production/config-example.toml b/resources/production/config-example.toml index d18a6d43d4..61a4a854ea 100644 --- a/resources/production/config-example.toml +++ b/resources/production/config-example.toml @@ -329,6 +329,10 @@ client_request_buffer_size = 16 # Maximum number of connections to the server. max_connections = 16 +# The global max rate of requests (per second) before they are limited. +# The implementation uses a sliding window algorithm. +qps_limit = 50 + # ============================================== # Configuration options for the REST HTTP server # ==============================================