Skip to content

Commit

Permalink
fix: fix the regression in XNET (#1992)
Browse files Browse the repository at this point in the history
We should be spawning and detaching tasks as soon as a new connection
arrives, not only after the TLS handshake.
  • Loading branch information
rumenov authored and DFINITYManu committed Oct 11, 2024
1 parent 5b82b0e commit 6fb2fd1
Showing 1 changed file with 67 additions and 53 deletions.
120 changes: 67 additions & 53 deletions rs/http_endpoints/xnet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use ic_metrics::{buckets::decimal_buckets, MetricsRegistry};
use ic_protobuf::messaging::xnet::v1 as pb;
use ic_protobuf::proxy::ProtoProxy;
use ic_types::{xnet::StreamIndex, PrincipalId, SubnetId};
use prometheus::{Histogram, HistogramVec};
use prometheus::{Histogram, HistogramVec, IntCounter};
use serde::Serialize;
use std::convert::Infallible;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
Expand All @@ -36,11 +36,15 @@ pub struct XNetEndpointMetrics {
pub slice_payload_size: Histogram,
/// Status 200 response size in bytes, by resource.
pub response_size: HistogramVec,
pub connections_total: IntCounter,
pub closed_connections_total: IntCounter,
}

// Metric names used by the XNet endpoint. Units are encoded in the name
// suffixes (`_seconds`, `_bytes`, `_total`).

/// Name of the request duration metric (seconds, per the name suffix).
const METRIC_REQUEST_DURATION: &str = "xnet_endpoint_request_duration_seconds";
/// Name of the slice payload size metric (bytes, per the name suffix).
const METRIC_SLICE_PAYLOAD_SIZE: &str = "xnet_endpoint_slice_payload_size_bytes";
/// Name of the response size metric (bytes, per the name suffix).
const METRIC_RESPONSE_SIZE: &str = "xnet_endpoint_response_size_bytes";
/// Name of the counter registered as "Total number of accepted XNet TCP connections."
const METRIC_CONNECTIONS: &str = "xnet_endpoint_connections_total";
/// Name of the counter of closed XNet connections.
/// NOTE(review): confirm which close paths (clean vs. error) are expected to bump it.
const METRIC_CLOSED_CONNECTIONS: &str = "xnet_endpoint_closed_connections_total";

const RESOURCE_ERROR: &str = "error";
const RESOURCE_STREAM: &str = "stream";
Expand Down Expand Up @@ -72,6 +76,14 @@ impl XNetEndpointMetrics {
decimal_buckets(1, 6),
&["resource"],
),
connections_total: metrics_registry.int_counter(
METRIC_CONNECTIONS,
"Total number of accepted XNet TCP connections.",
),
closed_connections_total: metrics_registry.int_counter(
METRIC_CLOSED_CONNECTIONS,
"Total number XNet closed connections.",
),
}
}
}
Expand Down Expand Up @@ -198,8 +210,6 @@ fn start_server(
let hyper_service =
hyper::service::service_fn(move |request: Request<Incoming>| router.clone().call(request));

let http = hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new());

let graceful_shutdown = GracefulShutdown::new();

let logger = log.clone();
Expand All @@ -208,62 +218,66 @@ fn start_server(
loop {
select! {
Ok((stream, _peer_addr)) = listener.accept() => {
let metrics = metrics.clone();
metrics.connections_total.inc();
let logger = logger.clone();
let hyper_service = hyper_service.clone();

#[cfg(test)]
{
// TLS is not used in tests.
let _ = tls;
let _ = registry_client;

let io = TokioIo::new(stream);
let conn = http.serve_connection_with_upgrades(io, hyper_service);
let wrapped = graceful_shutdown.watch(conn.into_owned());
tokio::spawn(async move {
if let Err(err) = wrapped.await {
let registry_client = registry_client.clone();
let tls = tls.clone();
tokio::spawn(async move {
let http = hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new());

#[cfg(test)]
{
// TLS is not used in tests.
let _ = tls;
let _ = registry_client;

let io = TokioIo::new(stream);
let conn = http.serve_connection(io, hyper_service);
if let Err(err) = conn.await {
warn!(logger, "failed to serve connection: {err}");
}
});
}

#[cfg(not(test))]
{
// Creates a new TLS server config and uses it to accept the request.
let registry_version = registry_client.get_latest_version();
let mut server_config = match tls.server_config(
ic_crypto_tls_interfaces::SomeOrAllNodes::All,
registry_version,
) {
Ok(config) => config,
Err(err) => {
warn!(logger, "Failed to get server config from crypto {err}");
return;
}
};

const ALPN_HTTP2: &[u8; 2] = b"h2";
const ALPN_HTTP1_1: &[u8; 8] = b"http/1.1";
server_config.alpn_protocols = vec![ALPN_HTTP2.to_vec(), ALPN_HTTP1_1.to_vec()];

let tls_acceptor =
tokio_rustls::TlsAcceptor::from(Arc::new(server_config));
match tls_acceptor.accept(stream).await {
Ok(tls_stream) => {
let io = TokioIo::new(tls_stream);
let conn = http.serve_connection_with_upgrades(io, hyper_service);
let wrapped = graceful_shutdown.watch(conn.into_owned());
tokio::spawn(async move {
if let Err(err) = wrapped.await {
}

#[cfg(not(test))]
{
// Creates a new TLS server config and uses it to accept the request.
let registry_version = registry_client.get_latest_version();
let mut server_config = match tls.server_config(
ic_crypto_tls_interfaces::SomeOrAllNodes::All,
registry_version,
) {
Ok(config) => config,
Err(err) => {
warn!(logger, "Failed to get server config from crypto {err}");
return;
}
};

const ALPN_HTTP2: &[u8; 2] = b"h2";
const ALPN_HTTP1_1: &[u8; 8] = b"http/1.1";
server_config.alpn_protocols = vec![ALPN_HTTP2.to_vec(), ALPN_HTTP1_1.to_vec()];

let tls_acceptor =
tokio_rustls::TlsAcceptor::from(Arc::new(server_config));
match tls_acceptor.accept(stream).await {
Ok(tls_stream) => {
let io = TokioIo::new(tls_stream);
let conn = http.serve_connection(io, hyper_service);
if let Err(err) = conn.await {
warn!(logger, "failed to serve connection: {err}");
metrics.closed_connections_total.inc();
}
});
}
Err(err) => {
warn!(logger, "Error setting up TLS stream: {err}");
}
};
}
}
Err(err) => {
warn!(logger, "Error setting up TLS stream: {err}");
metrics.closed_connections_total.inc();

}
};
}
});
}
_ = shutdown_notify.notified() => {
graceful_shutdown.shutdown().await;
Expand Down

0 comments on commit 6fb2fd1

Please sign in to comment.