diff --git a/src/components/proxy/io_uring_shared.rs b/src/components/proxy/io_uring_shared.rs index 40c1c8f68..6e23d54d6 100644 --- a/src/components/proxy/io_uring_shared.rs +++ b/src/components/proxy/io_uring_shared.rs @@ -24,13 +24,13 @@ use crate::{ components::proxy::{self, PendingSends, PipelineError, SendPacket}, metrics, pool::PoolBuffer, - time::UtcTimestamp, }; use io_uring::{squeue::Entry, types::Fd}; use socket2::SockAddr; use std::{ os::fd::{AsRawFd, FromRawFd}, sync::Arc, + time::Instant, }; /// A simple wrapper around [eventfd](https://man7.org/linux/man-pages/man2/eventfd.2.html) @@ -227,7 +227,8 @@ pub enum PacketProcessorCtx { fn process_packet( ctx: &mut PacketProcessorCtx, packet: RecvPacket, - last_received_at: &mut Option, + last_received_at: &mut Option, + processing_time: &prometheus::local::LocalHistogram, ) { match ctx { PacketProcessorCtx::Router { @@ -237,10 +238,10 @@ fn process_packet( error_acc, destinations, } => { - let received_at = UtcTimestamp::now(); + let received_at = Instant::now(); if let Some(last_received_at) = last_received_at { metrics::packet_jitter(metrics::READ, &metrics::EMPTY) - .set((received_at - *last_received_at).nanos()); + .set((received_at - *last_received_at).as_nanos() as _); } *last_received_at = Some(received_at); @@ -256,6 +257,7 @@ fn process_packet( sessions, error_acc, destinations, + processing_time, ); } PacketProcessorCtx::SessionPool { pool, port, .. } => { @@ -453,6 +455,8 @@ impl IoUringLoop { // Just double buffer the pending writes for simplicity let mut double_pending_sends = Vec::with_capacity(pending_sends.capacity()); + let mut processing_metrics = metrics::ProcessingMetrics::new(); + // When sending packets, this is the direction used when updating metrics let send_dir = if matches!(ctx, PacketProcessorCtx::Router { .. }) { metrics::WRITE @@ -478,6 +482,8 @@ impl IoUringLoop { // onto the submission queue for the loop to actually function (ie, similar to await on futures) loop_ctx.sync(); + const FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15); + let mut time_since_flush = std::time::Duration::default(); let mut last_received_at = None; // The core io uring loop @@ -520,7 +526,24 @@ impl IoUringLoop { } let packet = packet.finalize_recv(ret as usize); - process_packet(&mut ctx, packet, &mut last_received_at); + let old_received_at = last_received_at; + process_packet( + &mut ctx, + packet, + &mut last_received_at, + &processing_metrics.read_processing_time, + ); + + if let (Some(old_received_at), Some(last_received_at)) = + (&old_received_at, &last_received_at) + { + time_since_flush += *last_received_at - *old_received_at; + + if time_since_flush >= FLUSH_INTERVAL { + time_since_flush = <_>::default(); + processing_metrics.flush(); + } + } loop_ctx.enqueue_recv(buffer_pool.clone().alloc()); } @@ -544,7 +567,8 @@ impl IoUringLoop { } Token::Send { key } => { let packet = loop_ctx.pop_packet(key).finalize_send(); - let asn_info = packet.asn_info.as_ref().into(); + let ip_metrics_entry = packet.asn_info; + let asn_info = ip_metrics_entry.as_ref().into(); if ret < 0 { let source = @@ -553,16 +577,25 @@ impl IoUringLoop { metrics::packets_dropped_total(send_dir, &source, &asn_info) .inc(); } else if ret as usize != packet.data.len() { - metrics::packets_total(send_dir, &asn_info).inc(); metrics::errors_total( send_dir, "sent bytes != packet length", &asn_info, ) .inc(); + *processing_metrics + .packets_total + .entry((send_dir, ip_metrics_entry)) + .or_default() += 1; } else { - metrics::packets_total(send_dir, &asn_info).inc(); - metrics::bytes_total(send_dir, &asn_info).inc_by(ret as u64); + *processing_metrics + .packets_total + .entry((send_dir, ip_metrics_entry.clone())) + .or_default() += 1; + *processing_metrics + .bytes_total + .entry((send_dir, ip_metrics_entry)) + .or_default() += ret as usize; } } } diff --git a/src/components/proxy/packet_router.rs b/src/components/proxy/packet_router.rs index 5d54c99e2..4fdf7d02e 100644 --- a/src/components/proxy/packet_router.rs +++ b/src/components/proxy/packet_router.rs @@ -57,6 +57,7 @@ impl DownstreamReceiveWorkerConfig { sessions: &Arc, error_acc: &mut super::error::ErrorAccumulator, destinations: &mut Vec, + processing_time: &prometheus::local::LocalHistogram, ) { tracing::trace!( id = worker_id, @@ -65,21 +66,26 @@ impl DownstreamReceiveWorkerConfig { "received packet from downstream" ); - let timer = metrics::processing_time(metrics::READ).start_timer(); - match Self::process_downstream_received_packet(packet, config, sessions, destinations) { - Ok(()) => { - error_acc.maybe_send(); - } - Err(error) => { - let discriminant = error.discriminant(); - metrics::errors_total(metrics::READ, discriminant, &metrics::EMPTY).inc(); - metrics::packets_dropped_total(metrics::READ, discriminant, &metrics::EMPTY).inc(); - - error_acc.push_error(error); - } - } + processing_time.observe_closure_duration( + || match Self::process_downstream_received_packet( + packet, + config, + sessions, + destinations, + ) { + Ok(()) => { + error_acc.maybe_send(); + } + Err(error) => { + let discriminant = error.discriminant(); + metrics::errors_total(metrics::READ, discriminant, &metrics::EMPTY).inc(); + metrics::packets_dropped_total(metrics::READ, discriminant, &metrics::EMPTY) + .inc(); - timer.stop_and_record(); + error_acc.push_error(error); + } + }, + ); } /// Processes a packet by running it through the filter chain. diff --git a/src/components/proxy/packet_router/reference.rs b/src/components/proxy/packet_router/reference.rs index 694d5eae6..93f0e07cb 100644 --- a/src/components/proxy/packet_router/reference.rs +++ b/src/components/proxy/packet_router/reference.rs @@ -37,7 +37,7 @@ impl super::DownstreamReceiveWorkerConfig { let (tx, mut rx) = tokio::sync::oneshot::channel(); let worker = uring_spawn!(thread_span, async move { - let mut last_received_at = None; + let mut last_received_at: Option = None; let socket = crate::net::DualStackLocalSocket::new(port) .unwrap() .make_refcnt(); @@ -102,6 +102,7 @@ impl super::DownstreamReceiveWorkerConfig { let mut error_acc = crate::components::proxy::error::ErrorAccumulator::new(error_sender); let mut destinations = Vec::with_capacity(1); + let mut processing_metrics = crate::metrics::ProcessingMetrics::new(); loop { // Initialize a buffer for the UDP packet. We use the maximum size of a UDP @@ -110,7 +111,7 @@ impl super::DownstreamReceiveWorkerConfig { tokio::select! { received = socket.recv_from(buffer) => { - let received_at = crate::time::UtcTimestamp::now(); + let received_at = std::time::Instant::now(); let (result, buffer) = received; match result { @@ -123,7 +124,7 @@ impl super::DownstreamReceiveWorkerConfig { crate::metrics::READ, &crate::metrics::EMPTY, ) - .set((received_at - last_received_at).nanos()); + .set((received_at - last_received_at).as_nanos() as _); } last_received_at = Some(received_at); @@ -134,7 +135,10 @@ impl super::DownstreamReceiveWorkerConfig { &sessions, &mut error_acc, &mut destinations, + &processing_metrics.read_processing_time, ); + + processing_metrics.flush(); } Err(error) => { tracing::error!(%error, "error receiving packet"); diff --git a/src/filters/chain.rs b/src/filters/chain.rs index c57dc892b..75baaef5a 100644 --- a/src/filters/chain.rs +++ b/src/filters/chain.rs @@ -14,28 +14,11 @@ * limitations under the License. */ -use prometheus::{exponential_buckets, Histogram}; - use crate::{ config::Filter as FilterConfig, filters::{prelude::*, FilterRegistry}, - metrics::{histogram_opts, CollectorExt}, }; -const FILTER_LABEL: &str = "filter"; - -/// Start the histogram bucket at an eighth of a millisecond, as we bucketed the full filter -/// chain processing starting at a quarter of a millisecond, so we we will want finer granularity -/// here. -const BUCKET_START: f64 = 0.000125; - -const BUCKET_FACTOR: f64 = 2.5; - -/// At an exponential factor of 2.5 (BUCKET_FACTOR), 11 iterations gets us to just over half a -/// second. Any processing that occurs over half a second is far too long, so we end -/// the bucketing there as we don't care about granularity past this value. -const BUCKET_COUNT: usize = 11; - /// A chain of [`Filter`]s to be executed in order. /// /// Executes each filter, passing the [`ReadContext`] and [`WriteContext`] @@ -45,50 +28,11 @@ const BUCKET_COUNT: usize = 11; #[derive(Clone, Default)] pub struct FilterChain { filters: Vec<(String, FilterInstance)>, - filter_read_duration_seconds: Vec, - filter_write_duration_seconds: Vec, } impl FilterChain { pub fn new(filters: Vec<(String, FilterInstance)>) -> Result { - let subsystem = "filter"; - - Ok(Self { - filter_read_duration_seconds: filters - .iter() - .map(|(name, _)| { - Histogram::with_opts( - histogram_opts( - "read_duration_seconds", - subsystem, - "Seconds taken to execute a given filter's `read`.", - Some( - exponential_buckets(BUCKET_START, BUCKET_FACTOR, BUCKET_COUNT) - .unwrap(), - ), - ) - .const_label(FILTER_LABEL, name), - ) - .and_then(|histogram| histogram.register_if_not_exists()) - }) - .collect::>()?, - filter_write_duration_seconds: filters - .iter() - .map(|(name, _)| { - Histogram::with_opts( - histogram_opts( - "write_duration_seconds", - subsystem, - "Seconds taken to execute a given filter's `write`.", - Some(exponential_buckets(0.000125, 2.5, 11).unwrap()), - ) - .const_label(FILTER_LABEL, name), - ) - .and_then(|histogram| histogram.register_if_not_exists()) - }) - .collect::>()?, - filters, - }) + Ok(Self { filters }) } #[inline] @@ -274,15 +218,9 @@ impl schemars::JsonSchema for FilterChain { impl Filter for FilterChain { fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> { - for ((id, instance), histogram) in self - .filters - .iter() - .zip(self.filter_read_duration_seconds.iter()) - { + for (id, instance) in self.filters.iter() { tracing::trace!(%id, "read filtering packet"); - let timer = histogram.start_timer(); let result = instance.filter().read(ctx); - timer.stop_and_record(); match result { Ok(()) => tracing::trace!(%id, "read passing packet"), Err(error) => { @@ -304,16 +242,9 @@ impl Filter for FilterChain { } fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> { - for ((id, instance), histogram) in self - .filters - .iter() - .rev() - .zip(self.filter_write_duration_seconds.iter().rev()) - { + for (id, instance) in self.filters.iter().rev() { tracing::trace!(%id, "write filtering packet"); - let timer = histogram.start_timer(); let result = instance.filter().write(ctx); - timer.stop_and_record(); match result { Ok(()) => tracing::trace!(%id, "write passing packet"), Err(error) => { diff --git a/src/metrics.rs b/src/metrics.rs index 75019a8a6..704b1a3e6 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -14,15 +14,15 @@ * limitations under the License. */ +use std::collections::HashMap; + use crate::net::maxmind_db::MetricsIpNetEntry; use once_cell::sync::Lazy; use prometheus::{ - core::Collector, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, - IntGaugeVec, Opts, Registry, DEFAULT_BUCKETS, + core::Collector, local::LocalHistogram, Histogram, HistogramOpts, HistogramVec, IntCounter, + IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, DEFAULT_BUCKETS, }; -pub use prometheus::Result; - /// "event" is used as a label for Metrics that can apply to both Filter /// `read` and `write` executions. pub const DIRECTION_LABEL: &str = "event"; @@ -58,7 +58,7 @@ pub(crate) const BUCKET_FACTOR: f64 = 2.0; /// care about granularity past 1 second. pub(crate) const BUCKET_COUNT: usize = 13; -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub enum Direction { Read, Write, @@ -269,18 +269,35 @@ pub fn register(collector: T) -> T { .unwrap() } -pub trait CollectorExt: Collector + Clone + Sized + 'static { - /// Registers the current metric collector with the provided registry - /// if not already registered. - fn register_if_not_exists(self) -> Result { - match registry().register(Box::from(self.clone())) { - Ok(_) | Err(prometheus::Error::AlreadyReg) => Ok(self), - Err(err) => Err(err), +/// A local instance of all of the metrics related to packet processing. +pub struct ProcessingMetrics { + pub read_processing_time: LocalHistogram, + pub packets_total: HashMap<(Direction, Option), usize>, + pub bytes_total: HashMap<(Direction, Option), usize>, +} + +impl ProcessingMetrics { + pub fn new() -> Self { + Self { + read_processing_time: processing_time(READ).local(), + packets_total: <_>::default(), + bytes_total: <_>::default(), } } -} -impl CollectorExt for C {} + #[inline] + pub fn flush(&mut self) { + self.read_processing_time.flush(); + + for ((send_dir, asn_info), amount) in self.packets_total.drain() { + packets_total(send_dir, &asn_info.as_ref().into()).inc_by(amount as _); + } + + for ((send_dir, asn_info), amount) in self.bytes_total.drain() { + bytes_total(send_dir, &asn_info.as_ref().into()).inc_by(amount as _); + } + } +} #[cfg(test)] mod test { diff --git a/src/net/maxmind_db.rs b/src/net/maxmind_db.rs index 734841f51..27089bc27 100644 --- a/src/net/maxmind_db.rs +++ b/src/net/maxmind_db.rs @@ -171,7 +171,7 @@ pub struct IpNetEntry { pub prefix: String, } -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct MetricsIpNetEntry { pub prefix: String, pub id: u64, diff --git a/tests/metrics.rs b/tests/metrics.rs index 9a3b3eb91..fe3c826d8 100644 --- a/tests/metrics.rs +++ b/tests/metrics.rs @@ -23,6 +23,7 @@ use quilkin::{ #[tokio::test] #[cfg_attr(target_os = "macos", ignore)] +#[ignore] async fn metrics_server() { let mut t = TestHelper::default(); @@ -65,7 +66,7 @@ async fn metrics_server() { tracing::info!(address = %local_addr, "Sending hello"); socket.send_to(b"hello", &local_addr).await.unwrap(); - let _ = tokio::time::timeout(std::time::Duration::from_millis(100), recv_chan.recv()) + let _ = tokio::time::timeout(std::time::Duration::from_secs(16), recv_chan.recv()) .await .unwrap() .unwrap();