Skip to content

Commit

Permalink
Merge #4917
Browse files Browse the repository at this point in the history
4917: Added QPS rate limiting mechanism to the binary port. The rate limiti… r=zajko a=zajko

…ng mechanism uses sliding window algorithm. Removed "client_request_buffer_size" and "client_request_limit" properties of the "binary_port_server" section. Added "qps_limit" property to said section.


Co-authored-by: Jakub Zajkowski <[email protected]>
  • Loading branch information
casperlabs-bors-ng[bot] and Jakub Zajkowski authored Oct 16, 2024
2 parents 874eae2 + da340ba commit 211cc8d
Show file tree
Hide file tree
Showing 11 changed files with 494 additions and 81 deletions.
5 changes: 5 additions & 0 deletions binary_port/src/error_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ pub enum ErrorCode {
/// Purse was not found for given identifier.
#[error("purse was not found for given identifier")]
PurseNotFound = 87,
/// Too many requests per second.
#[error("request was throttled")]
RequestThrottled = 88,
}

impl TryFrom<u16> for ErrorCode {
Expand Down Expand Up @@ -375,6 +378,7 @@ impl TryFrom<u16> for ErrorCode {
85 => Ok(ErrorCode::GasPriceToleranceTooLow),
86 => Ok(ErrorCode::ReceivedV1Transaction),
87 => Ok(ErrorCode::PurseNotFound),
88 => Ok(ErrorCode::RequestThrottled),
_ => Err(UnknownErrorCode),
}
}
Expand Down Expand Up @@ -536,6 +540,7 @@ mod tests {
"variant {} not covered by TryFrom<u16> implementation",
as_int
);
assert_eq!(decoded.unwrap(), variant);
}
}
}
92 changes: 80 additions & 12 deletions node/src/components/binary_port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod config;
mod error;
mod event;
mod metrics;
mod rate_limiter;
#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -40,16 +41,18 @@ use casper_types::{
ContractWasmHash, Digest, EntityAddr, GlobalStateIdentifier, Key, Package, PackageAddr, Peers,
ProtocolVersion, Rewards, SignedBlock, StoredValue, TimeDiff, Transaction, URef,
};
use thiserror::Error as ThisError;

use datasize::DataSize;
use either::Either;
use futures::{SinkExt, StreamExt};
use once_cell::sync::OnceCell;
use prometheus::Registry;
use rate_limiter::{LimiterResponse, RateLimiter, RateLimiterError};
use tokio::{
join,
net::{TcpListener, TcpStream},
sync::{Notify, OwnedSemaphorePermit, Semaphore},
sync::{Mutex, Notify, OwnedSemaphorePermit, Semaphore},
};
use tokio_util::codec::Framed;
use tracing::{debug, error, info, warn};
Expand Down Expand Up @@ -81,6 +84,26 @@ pub(crate) use event::Event;

const COMPONENT_NAME: &str = "binary_port";

#[derive(Debug, ThisError)]
pub(crate) enum BinaryPortInitializationError {
#[error("could not initialize rate limiter: {0}")]
CannotInitializeRateLimiter(String),
#[error("could not initialize metrics: {0}")]
CannotInitializeMetrics(prometheus::Error),
}

impl From<RateLimiterError> for BinaryPortInitializationError {
fn from(value: RateLimiterError) -> Self {
BinaryPortInitializationError::CannotInitializeRateLimiter(value.to_string())
}
}

impl From<prometheus::Error> for BinaryPortInitializationError {
fn from(value: prometheus::Error) -> Self {
BinaryPortInitializationError::CannotInitializeMetrics(value)
}
}

#[derive(Debug, DataSize)]
pub(crate) struct BinaryPort {
#[data_size(skip)]
Expand All @@ -90,6 +113,8 @@ pub(crate) struct BinaryPort {
#[data_size(skip)]
chainspec: Arc<Chainspec>,
#[data_size(skip)]
protocol_version: ProtocolVersion,
#[data_size(skip)]
connection_limit: Arc<Semaphore>,
#[data_size(skip)]
metrics: Arc<Metrics>,
Expand All @@ -99,23 +124,32 @@ pub(crate) struct BinaryPort {
shutdown_trigger: Arc<Notify>,
#[data_size(skip)]
server_join_handle: OnceCell<tokio::task::JoinHandle<()>>,
#[data_size(skip)]
rate_limiter: Arc<Mutex<RateLimiter>>,
}

impl BinaryPort {
pub(crate) fn new(
config: Config,
chainspec: Arc<Chainspec>,
registry: &Registry,
) -> Result<Self, prometheus::Error> {
) -> Result<Self, BinaryPortInitializationError> {
let rate_limiter = Arc::new(Mutex::new(
RateLimiter::new(config.qps_limit, TimeDiff::from_seconds(1))
.map_err(BinaryPortInitializationError::from)?,
));
let protocol_version = chainspec.protocol_version();
Ok(Self {
state: ComponentState::Uninitialized,
connection_limit: Arc::new(Semaphore::new(config.max_connections)),
config: Arc::new(config),
chainspec,
metrics: Arc::new(Metrics::new(registry)?),
protocol_version,
metrics: Arc::new(Metrics::new(registry).map_err(BinaryPortInitializationError::from)?),
local_addr: Arc::new(OnceCell::new()),
shutdown_trigger: Arc::new(Notify::new()),
server_join_handle: OnceCell::new(),
rate_limiter,
})
}

Expand All @@ -134,6 +168,7 @@ async fn handle_request<REv>(
config: &Config,
chainspec: &Chainspec,
metrics: &Metrics,
protocol_version: ProtocolVersion,
) -> BinaryResponse
where
REv: From<Event>
Expand All @@ -148,7 +183,6 @@ where
+ From<ChainspecRawBytesRequest>
+ Send,
{
let protocol_version = effect_builder.get_protocol_version().await;
match req {
BinaryRequest::TryAcceptTransaction { transaction } => {
metrics.binary_port_try_accept_transaction_count.inc();
Expand Down Expand Up @@ -1396,6 +1430,8 @@ async fn handle_client_loop<REv>(
stream: TcpStream,
effect_builder: EffectBuilder<REv>,
max_message_size_bytes: u32,
rate_limiter: Arc<Mutex<RateLimiter>>,
version: ProtocolVersion,
) -> Result<(), Error>
where
REv: From<Event>
Expand Down Expand Up @@ -1423,8 +1459,8 @@ where
return Err(Error::NoPayload);
};

let version = effect_builder.get_protocol_version().await;
let (response, id) = handle_payload(effect_builder, payload, version).await;
let (response, id) =
handle_payload(effect_builder, payload, version, Arc::clone(&rate_limiter)).await;
framed
.send(BinaryMessage::new(
BinaryResponseAndRequest::new(response, payload, id).to_bytes()?,
Expand Down Expand Up @@ -1462,6 +1498,7 @@ async fn handle_payload<REv>(
effect_builder: EffectBuilder<REv>,
payload: &[u8],
protocol_version: ProtocolVersion,
rate_limiter: Arc<Mutex<RateLimiter>>,
) -> (BinaryResponse, u16)
where
REv: From<Event>,
Expand All @@ -1473,6 +1510,13 @@ where

let request_id = header.id();

if let LimiterResponse::Throttled = rate_limiter.lock().await.throttle() {
return (
BinaryResponse::new_error(ErrorCode::RequestThrottled, protocol_version),
request_id,
);
}

if !header
.protocol_version()
.is_compatible_with(&protocol_version)
Expand Down Expand Up @@ -1519,6 +1563,8 @@ async fn handle_client<REv>(
effect_builder: EffectBuilder<REv>,
config: Arc<Config>,
_permit: OwnedSemaphorePermit,
rate_limiter: Arc<Mutex<RateLimiter>>,
protocol_version: ProtocolVersion,
) where
REv: From<Event>
+ From<StorageRequest>
Expand All @@ -1532,8 +1578,14 @@ async fn handle_client<REv>(
+ From<ChainspecRawBytesRequest>
+ Send,
{
if let Err(err) =
handle_client_loop(stream, effect_builder, config.max_message_size_bytes).await
if let Err(err) = handle_client_loop(
stream,
effect_builder,
config.max_message_size_bytes,
rate_limiter,
protocol_version,
)
.await
{
// Low severity is used to prevent malicious clients from causing log floods.
info!(%addr, err=display_error(&err), "binary port client handler error");
Expand Down Expand Up @@ -1783,7 +1835,16 @@ where
if let Ok(permit) = Arc::clone(&self.connection_limit).try_acquire_owned() {
self.metrics.binary_port_connections_count.inc();
let config = Arc::clone(&self.config);
tokio::spawn(handle_client(peer, stream, effect_builder, config, permit));
let rate_limiter = Arc::clone(&self.rate_limiter);
tokio::spawn(handle_client(
peer,
stream,
effect_builder,
config,
permit,
rate_limiter,
self.protocol_version,
));
} else {
warn!(
"connection limit reached, dropping connection from {}",
Expand All @@ -1796,10 +1857,17 @@ where
let config = Arc::clone(&self.config);
let chainspec = Arc::clone(&self.chainspec);
let metrics = Arc::clone(&self.metrics);
let protocol_version = self.protocol_version;
async move {
let response =
handle_request(request, effect_builder, &config, &chainspec, &metrics)
.await;
let response = handle_request(
request,
effect_builder,
&config,
&chainspec,
&metrics,
protocol_version,
)
.await;
responder.respond(response).await;
}
.ignore()
Expand Down
15 changes: 5 additions & 10 deletions node/src/components/binary_port/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ use serde::{Deserialize, Serialize};
const DEFAULT_ADDRESS: &str = "0.0.0.0:0";
/// Default maximum message size.
const DEFAULT_MAX_MESSAGE_SIZE: u32 = 4 * 1024 * 1024;
/// Default request limit.
const DEFAULT_CLIENT_REQUEST_LIMIT: u16 = 3;
/// Default request buffer size.
const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 16;
/// Default maximum number of connections.
const DEFAULT_MAX_CONNECTIONS: usize = 16;
/// Default maximum number of requests per second.
const DEFAULT_MAX_QPS: usize = 100;

/// Binary port server configuration.
#[derive(Clone, DataSize, Debug, Deserialize, Serialize)]
Expand All @@ -31,12 +29,10 @@ pub struct Config {
pub allow_request_speculative_exec: bool,
/// Maximum size of the binary port message.
pub max_message_size_bytes: u32,
/// Maximum number of in-flight requests per client.
pub client_request_limit: u16,
/// Number of requests that can be buffered per client.
pub client_request_buffer_size: usize,
/// Maximum number of connections to the server.
pub max_connections: usize,
/// Maximum number of requests per second.
pub qps_limit: usize,
}

impl Config {
Expand All @@ -48,10 +44,9 @@ impl Config {
allow_request_get_all_values: false,
allow_request_get_trie: false,
allow_request_speculative_exec: false,
client_request_limit: DEFAULT_CLIENT_REQUEST_LIMIT,
max_message_size_bytes: DEFAULT_MAX_MESSAGE_SIZE,
client_request_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
max_connections: DEFAULT_MAX_CONNECTIONS,
qps_limit: DEFAULT_MAX_QPS,
}
}
}
Expand Down
Loading

0 comments on commit 211cc8d

Please sign in to comment.