Skip to content

Commit

Permalink
Added QPS rate limiting mechanism to the binary port. The rate limiti…
Browse files Browse the repository at this point in the history
…ng mechanism uses sliding window algorithm. Removed "client_request_buffer_size" and "client_request_limit" properties of the "binary_port_server" section. Added "qps_limit" property to said section.
  • Loading branch information
Jakub Zajkowski committed Oct 15, 2024
1 parent 4b0a4fe commit 265b2c6
Show file tree
Hide file tree
Showing 11 changed files with 496 additions and 67 deletions.
5 changes: 5 additions & 0 deletions binary_port/src/error_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ pub enum ErrorCode {
/// Purse was not found for given identifier.
#[error("purse was not found for given identifier")]
PurseNotFound = 87,
/// Too many requests per second.
#[error("request was throttled")]
TooManyRequests = 88,
}

impl TryFrom<u16> for ErrorCode {
Expand Down Expand Up @@ -375,6 +378,7 @@ impl TryFrom<u16> for ErrorCode {
85 => Ok(ErrorCode::GasPriceToleranceTooLow),
86 => Ok(ErrorCode::ReceivedV1Transaction),
87 => Ok(ErrorCode::PurseNotFound),
88 => Ok(ErrorCode::TooManyRequests),
_ => Err(UnknownErrorCode),
}
}
Expand Down Expand Up @@ -536,6 +540,7 @@ mod tests {
"variant {} not covered by TryFrom<u16> implementation",
as_int
);
assert_eq!(decoded.unwrap(), variant);
}
}
}
92 changes: 80 additions & 12 deletions node/src/components/binary_port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod config;
mod error;
mod event;
mod metrics;
mod rate_limiter;
#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -40,16 +41,18 @@ use casper_types::{
ContractWasmHash, Digest, EntityAddr, GlobalStateIdentifier, Key, Package, PackageAddr, Peers,
ProtocolVersion, Rewards, SignedBlock, StoredValue, TimeDiff, Transaction, URef,
};
use thiserror::Error as ThisError;

use datasize::DataSize;
use either::Either;
use futures::{SinkExt, StreamExt};
use once_cell::sync::OnceCell;
use prometheus::Registry;
use rate_limiter::{LimiterResponse, RateLimiter, RateLimiterError};
use tokio::{
join,
net::{TcpListener, TcpStream},
sync::{Notify, OwnedSemaphorePermit, Semaphore},
sync::{Mutex, Notify, OwnedSemaphorePermit, Semaphore},
};
use tokio_util::codec::Framed;
use tracing::{debug, error, info, warn};
Expand Down Expand Up @@ -81,6 +84,26 @@ pub(crate) use event::Event;

const COMPONENT_NAME: &str = "binary_port";

#[derive(Debug, ThisError)]
pub(crate) enum BinaryPortInitializationError {
#[error("could not initialize rate limiter: {0}")]
CannotInitializeRateLimiter(String),
#[error("could not initialize metrics: {0}")]
CannotInitializeMetrics(prometheus::Error),
}

impl From<RateLimiterError> for BinaryPortInitializationError {
fn from(value: RateLimiterError) -> Self {
BinaryPortInitializationError::CannotInitializeRateLimiter(value.to_string())
}
}

impl From<prometheus::Error> for BinaryPortInitializationError {
fn from(value: prometheus::Error) -> Self {
BinaryPortInitializationError::CannotInitializeMetrics(value)
}
}

#[derive(Debug, DataSize)]
pub(crate) struct BinaryPort {
#[data_size(skip)]
Expand All @@ -90,6 +113,8 @@ pub(crate) struct BinaryPort {
#[data_size(skip)]
chainspec: Arc<Chainspec>,
#[data_size(skip)]
protocol_version: ProtocolVersion,
#[data_size(skip)]
connection_limit: Arc<Semaphore>,
#[data_size(skip)]
metrics: Arc<Metrics>,
Expand All @@ -99,23 +124,32 @@ pub(crate) struct BinaryPort {
shutdown_trigger: Arc<Notify>,
#[data_size(skip)]
server_join_handle: OnceCell<tokio::task::JoinHandle<()>>,
#[data_size(skip)]
rate_limiter: Arc<Mutex<RateLimiter>>,
}

impl BinaryPort {
pub(crate) fn new(
config: Config,
chainspec: Arc<Chainspec>,
registry: &Registry,
) -> Result<Self, prometheus::Error> {
) -> Result<Self, BinaryPortInitializationError> {
let rate_limiter = Arc::new(Mutex::new(
RateLimiter::new(config.qps_limit, TimeDiff::from_seconds(1))
.map_err(BinaryPortInitializationError::from)?,
));
let protocol_version = chainspec.protocol_version();
Ok(Self {
state: ComponentState::Uninitialized,
connection_limit: Arc::new(Semaphore::new(config.max_connections)),
config: Arc::new(config),
chainspec,
metrics: Arc::new(Metrics::new(registry)?),
protocol_version,
metrics: Arc::new(Metrics::new(registry).map_err(BinaryPortInitializationError::from)?),
local_addr: Arc::new(OnceCell::new()),
shutdown_trigger: Arc::new(Notify::new()),
server_join_handle: OnceCell::new(),
rate_limiter,
})
}

Expand All @@ -134,6 +168,7 @@ async fn handle_request<REv>(
config: &Config,
chainspec: &Chainspec,
metrics: &Metrics,
protocol_version: ProtocolVersion,
) -> BinaryResponse
where
REv: From<Event>
Expand All @@ -148,7 +183,6 @@ where
+ From<ChainspecRawBytesRequest>
+ Send,
{
let protocol_version = effect_builder.get_protocol_version().await;
match req {
BinaryRequest::TryAcceptTransaction { transaction } => {
metrics.binary_port_try_accept_transaction_count.inc();
Expand Down Expand Up @@ -1396,6 +1430,8 @@ async fn handle_client_loop<REv>(
stream: TcpStream,
effect_builder: EffectBuilder<REv>,
max_message_size_bytes: u32,
rate_limiter: Arc<Mutex<RateLimiter>>,
version: ProtocolVersion,
) -> Result<(), Error>
where
REv: From<Event>
Expand Down Expand Up @@ -1423,8 +1459,8 @@ where
return Err(Error::NoPayload);
};

let version = effect_builder.get_protocol_version().await;
let (response, id) = handle_payload(effect_builder, payload, version).await;
let (response, id) =
handle_payload(effect_builder, payload, version, Arc::clone(&rate_limiter)).await;
framed
.send(BinaryMessage::new(
BinaryResponseAndRequest::new(response, payload, id).to_bytes()?,
Expand Down Expand Up @@ -1462,6 +1498,7 @@ async fn handle_payload<REv>(
effect_builder: EffectBuilder<REv>,
payload: &[u8],
protocol_version: ProtocolVersion,
rate_limiter: Arc<Mutex<RateLimiter>>,
) -> (BinaryResponse, u16)
where
REv: From<Event>,
Expand All @@ -1473,6 +1510,13 @@ where

let request_id = header.id();

if let LimiterResponse::Throttled = rate_limiter.lock().await.throttle() {
return (
BinaryResponse::new_error(ErrorCode::TooManyRequests, protocol_version),
request_id,
);
}

if !header
.protocol_version()
.is_compatible_with(&protocol_version)
Expand Down Expand Up @@ -1519,6 +1563,8 @@ async fn handle_client<REv>(
effect_builder: EffectBuilder<REv>,
config: Arc<Config>,
_permit: OwnedSemaphorePermit,
rate_limiter: Arc<Mutex<RateLimiter>>,
protocol_version: ProtocolVersion,
) where
REv: From<Event>
+ From<StorageRequest>
Expand All @@ -1532,8 +1578,14 @@ async fn handle_client<REv>(
+ From<ChainspecRawBytesRequest>
+ Send,
{
if let Err(err) =
handle_client_loop(stream, effect_builder, config.max_message_size_bytes).await
if let Err(err) = handle_client_loop(
stream,
effect_builder,
config.max_message_size_bytes,
rate_limiter,
protocol_version,
)
.await
{
// Low severity is used to prevent malicious clients from causing log floods.
info!(%addr, err=display_error(&err), "binary port client handler error");
Expand Down Expand Up @@ -1783,7 +1835,16 @@ where
if let Ok(permit) = Arc::clone(&self.connection_limit).try_acquire_owned() {
self.metrics.binary_port_connections_count.inc();
let config = Arc::clone(&self.config);
tokio::spawn(handle_client(peer, stream, effect_builder, config, permit));
let rate_limiter = Arc::clone(&self.rate_limiter);
tokio::spawn(handle_client(
peer,
stream,
effect_builder,
config,
permit,
rate_limiter,
self.protocol_version,
));
} else {
warn!(
"connection limit reached, dropping connection from {}",
Expand All @@ -1796,10 +1857,17 @@ where
let config = Arc::clone(&self.config);
let chainspec = Arc::clone(&self.chainspec);
let metrics = Arc::clone(&self.metrics);
let protocol_version = self.protocol_version;
async move {
let response =
handle_request(request, effect_builder, &config, &chainspec, &metrics)
.await;
let response = handle_request(
request,
effect_builder,
&config,
&chainspec,
&metrics,
protocol_version,
)
.await;
responder.respond(response).await;
}
.ignore()
Expand Down
15 changes: 5 additions & 10 deletions node/src/components/binary_port/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ use serde::{Deserialize, Serialize};
const DEFAULT_ADDRESS: &str = "0.0.0.0:0";
/// Default maximum message size.
const DEFAULT_MAX_MESSAGE_SIZE: u32 = 4 * 1024 * 1024;
/// Default request limit.
const DEFAULT_CLIENT_REQUEST_LIMIT: u16 = 3;
/// Default request buffer size.
const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 16;
/// Default maximum number of connections.
const DEFAULT_MAX_CONNECTIONS: usize = 16;
/// Default maximum number of requests per second.
const DEFAULT_MAX_QPS: usize = 100;

/// Binary port server configuration.
#[derive(Clone, DataSize, Debug, Deserialize, Serialize)]
Expand All @@ -31,12 +29,10 @@ pub struct Config {
pub allow_request_speculative_exec: bool,
/// Maximum size of the binary port message.
pub max_message_size_bytes: u32,
/// Maximum number of in-flight requests per client.
pub client_request_limit: u16,
/// Number of requests that can be buffered per client.
pub client_request_buffer_size: usize,
/// Maximum number of connections to the server.
pub max_connections: usize,
/// Maximum number of requests per second.
pub qps_limit: usize,
}

impl Config {
Expand All @@ -48,10 +44,9 @@ impl Config {
allow_request_get_all_values: false,
allow_request_get_trie: false,
allow_request_speculative_exec: false,
client_request_limit: DEFAULT_CLIENT_REQUEST_LIMIT,
max_message_size_bytes: DEFAULT_MAX_MESSAGE_SIZE,
client_request_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
max_connections: DEFAULT_MAX_CONNECTIONS,
qps_limit: DEFAULT_MAX_QPS,
}
}
}
Expand Down
Loading

0 comments on commit 265b2c6

Please sign in to comment.