From 6fb2fd1f0512c81578ea0d1984c48851e6c08b0d Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Fri, 11 Oct 2024 16:15:14 +0200 Subject: [PATCH] fix: fix the regression in XNET (#1992) We should be spawning and detaching tasks as soon as a new connection arrives and not only after the TLS handshake --- rs/http_endpoints/xnet/src/lib.rs | 120 +++++++++++++++++------------- 1 file changed, 67 insertions(+), 53 deletions(-) diff --git a/rs/http_endpoints/xnet/src/lib.rs b/rs/http_endpoints/xnet/src/lib.rs index 66e9acbd7b6..5c941b8be9f 100644 --- a/rs/http_endpoints/xnet/src/lib.rs +++ b/rs/http_endpoints/xnet/src/lib.rs @@ -14,7 +14,7 @@ use ic_metrics::{buckets::decimal_buckets, MetricsRegistry}; use ic_protobuf::messaging::xnet::v1 as pb; use ic_protobuf::proxy::ProtoProxy; use ic_types::{xnet::StreamIndex, PrincipalId, SubnetId}; -use prometheus::{Histogram, HistogramVec}; +use prometheus::{Histogram, HistogramVec, IntCounter}; use serde::Serialize; use std::convert::Infallible; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -36,11 +36,15 @@ pub struct XNetEndpointMetrics { pub slice_payload_size: Histogram, /// Status 200 response size in bytes, by resource. pub response_size: HistogramVec, + pub connections_total: IntCounter, + pub closed_connections_total: IntCounter, } const METRIC_REQUEST_DURATION: &str = "xnet_endpoint_request_duration_seconds"; const METRIC_SLICE_PAYLOAD_SIZE: &str = "xnet_endpoint_slice_payload_size_bytes"; const METRIC_RESPONSE_SIZE: &str = "xnet_endpoint_response_size_bytes"; +const METRIC_CONNECTIONS: &str = "xnet_endpoint_connections_total"; +const METRIC_CLOSED_CONNECTIONS: &str = "xnet_endpoint_closed_connections_total"; const RESOURCE_ERROR: &str = "error"; const RESOURCE_STREAM: &str = "stream"; @@ -72,6 +76,14 @@ impl XNetEndpointMetrics { decimal_buckets(1, 6), &["resource"], ), + connections_total: metrics_registry.int_counter( + METRIC_CONNECTIONS, + "Total number of accepted XNet TCP connections.", + ), + closed_connections_total: metrics_registry.int_counter( + METRIC_CLOSED_CONNECTIONS, + "Total number XNet closed connections.", + ), } } } @@ -198,8 +210,6 @@ fn start_server( let hyper_service = hyper::service::service_fn(move |request: Request| router.clone().call(request)); - let http = hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new()); - let graceful_shutdown = GracefulShutdown::new(); let logger = log.clone(); @@ -208,62 +218,66 @@ fn start_server( loop { select! { Ok((stream, _peer_addr)) = listener.accept() => { + let metrics = metrics.clone(); + metrics.connections_total.inc(); let logger = logger.clone(); let hyper_service = hyper_service.clone(); - - #[cfg(test)] - { - // TLS is not used in tests. - let _ = tls; - let _ = registry_client; - - let io = TokioIo::new(stream); - let conn = http.serve_connection_with_upgrades(io, hyper_service); - let wrapped = graceful_shutdown.watch(conn.into_owned()); - tokio::spawn(async move { - if let Err(err) = wrapped.await { + let registry_client = registry_client.clone(); + let tls = tls.clone(); + tokio::spawn(async move { + let http = hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new()); + + #[cfg(test)] + { + // TLS is not used in tests. + let _ = tls; + let _ = registry_client; + + let io = TokioIo::new(stream); + let conn = http.serve_connection(io, hyper_service); + if let Err(err) = conn.await { warn!(logger, "failed to serve connection: {err}"); } - }); - } - - #[cfg(not(test))] - { - // Creates a new TLS server config and uses it to accept the request. - let registry_version = registry_client.get_latest_version(); - let mut server_config = match tls.server_config( - ic_crypto_tls_interfaces::SomeOrAllNodes::All, - registry_version, - ) { - Ok(config) => config, - Err(err) => { - warn!(logger, "Failed to get server config from crypto {err}"); - return; - } - }; - - const ALPN_HTTP2: &[u8; 2] = b"h2"; - const ALPN_HTTP1_1: &[u8; 8] = b"http/1.1"; - server_config.alpn_protocols = vec![ALPN_HTTP2.to_vec(), ALPN_HTTP1_1.to_vec()]; - - let tls_acceptor = - tokio_rustls::TlsAcceptor::from(Arc::new(server_config)); - match tls_acceptor.accept(stream).await { - Ok(tls_stream) => { - let io = TokioIo::new(tls_stream); - let conn = http.serve_connection_with_upgrades(io, hyper_service); - let wrapped = graceful_shutdown.watch(conn.into_owned()); - tokio::spawn(async move { - if let Err(err) = wrapped.await { + } + + #[cfg(not(test))] + { + // Creates a new TLS server config and uses it to accept the request. + let registry_version = registry_client.get_latest_version(); + let mut server_config = match tls.server_config( + ic_crypto_tls_interfaces::SomeOrAllNodes::All, + registry_version, + ) { + Ok(config) => config, + Err(err) => { + warn!(logger, "Failed to get server config from crypto {err}"); + return; + } + }; + + const ALPN_HTTP2: &[u8; 2] = b"h2"; + const ALPN_HTTP1_1: &[u8; 8] = b"http/1.1"; + server_config.alpn_protocols = vec![ALPN_HTTP2.to_vec(), ALPN_HTTP1_1.to_vec()]; + + let tls_acceptor = + tokio_rustls::TlsAcceptor::from(Arc::new(server_config)); + match tls_acceptor.accept(stream).await { + Ok(tls_stream) => { + let io = TokioIo::new(tls_stream); + let conn = http.serve_connection(io, hyper_service); + if let Err(err) = conn.await { warn!(logger, "failed to serve connection: {err}"); + metrics.closed_connections_total.inc(); } - }); - } - Err(err) => { - warn!(logger, "Error setting up TLS stream: {err}"); - } - }; - } + } + Err(err) => { + warn!(logger, "Error setting up TLS stream: {err}"); + metrics.closed_connections_total.inc(); + + } + }; + } + }); } _ = shutdown_notify.notified() => { graceful_shutdown.shutdown().await;