perf: Use local histogram for processing time #1044

Open · wants to merge 4 commits into base: main
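This PR moves the hot-path `processing_time` measurement from a shared, registered Prometheus histogram to a thread-local `prometheus::local::LocalHistogram` that is flushed back to the shared collector periodically, so per-packet observations no longer go through shared atomics. Below is a minimal sketch of the pattern outside the quilkin codebase; the metric name and closure body are illustrative only:

```rust
use prometheus::{Histogram, HistogramOpts};

fn main() {
    // Shared, registered histogram: every observation is an atomic update
    // visible to all threads.
    let shared = Histogram::with_opts(HistogramOpts::new(
        "processing_time_seconds",
        "Seconds taken to process a packet.",
    ))
    .unwrap();

    // Thread-local view: observations accumulate locally and only touch
    // the shared histogram when `flush` is called.
    let local = shared.local();

    // Hot path: time a closure without synchronization overhead.
    local.observe_closure_duration(|| {
        // ... process a packet ...
    });

    // Cold path: publish the buffered samples in one step.
    local.flush();
}
```

The same idea is applied to the per-send packet and byte counters, which are accumulated in local maps and drained on flush.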
51 changes: 42 additions & 9 deletions src/components/proxy/io_uring_shared.rs
@@ -24,13 +24,13 @@ use crate::{
     components::proxy::{self, PendingSends, PipelineError, SendPacket},
     metrics,
     pool::PoolBuffer,
-    time::UtcTimestamp,
 };
 use io_uring::{squeue::Entry, types::Fd};
 use socket2::SockAddr;
 use std::{
     os::fd::{AsRawFd, FromRawFd},
     sync::Arc,
+    time::Instant,
 };

 /// A simple wrapper around [eventfd](https://man7.org/linux/man-pages/man2/eventfd.2.html)
@@ -227,7 +227,8 @@ pub enum PacketProcessorCtx {
 fn process_packet(
     ctx: &mut PacketProcessorCtx,
     packet: RecvPacket,
-    last_received_at: &mut Option<UtcTimestamp>,
+    last_received_at: &mut Option<Instant>,
+    processing_time: &prometheus::local::LocalHistogram,
 ) {
     match ctx {
         PacketProcessorCtx::Router {
@@ -237,10 +238,10 @@ fn process_packet(
             error_acc,
             destinations,
         } => {
-            let received_at = UtcTimestamp::now();
+            let received_at = Instant::now();
             if let Some(last_received_at) = last_received_at {
                 metrics::packet_jitter(metrics::READ, &metrics::EMPTY)
-                    .set((received_at - *last_received_at).nanos());
+                    .set((received_at - *last_received_at).as_nanos() as _);
             }
             *last_received_at = Some(received_at);
@@ -256,6 +257,7 @@ fn process_packet(
                 sessions,
                 error_acc,
                 destinations,
+                processing_time,
             );
         }
         PacketProcessorCtx::SessionPool { pool, port, .. } => {
@@ -453,6 +455,8 @@ impl IoUringLoop {
         // Just double buffer the pending writes for simplicity
         let mut double_pending_sends = Vec::with_capacity(pending_sends.capacity());

+        let mut processing_metrics = metrics::ProcessingMetrics::new();
+
         // When sending packets, this is the direction used when updating metrics
         let send_dir = if matches!(ctx, PacketProcessorCtx::Router { .. }) {
             metrics::WRITE
@@ -478,6 +482,8 @@ impl IoUringLoop {
             // onto the submission queue for the loop to actually function (ie, similar to await on futures)
             loop_ctx.sync();

+            const FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15);
+            let mut time_since_flush = std::time::Duration::default();
             let mut last_received_at = None;

             // The core io uring loop
@@ -520,7 +526,24 @@ impl IoUringLoop {
                         }

                         let packet = packet.finalize_recv(ret as usize);
-                        process_packet(&mut ctx, packet, &mut last_received_at);
+                        let old_received_at = last_received_at;
+                        process_packet(
+                            &mut ctx,
+                            packet,
+                            &mut last_received_at,
+                            &processing_metrics.read_processing_time,
+                        );
+
+                        if let (Some(old_received_at), Some(last_received_at)) =
+                            (&old_received_at, &last_received_at)
+                        {
+                            time_since_flush += *last_received_at - *old_received_at;
+
+                            if time_since_flush >= FLUSH_INTERVAL {
+                                time_since_flush = <_>::default();
+                                processing_metrics.flush();
+                            }
+                        }

                         loop_ctx.enqueue_recv(buffer_pool.clone().alloc());
                     }
@@ -544,7 +567,8 @@ impl IoUringLoop {
                     }
                     Token::Send { key } => {
                         let packet = loop_ctx.pop_packet(key).finalize_send();
-                        let asn_info = packet.asn_info.as_ref().into();
+                        let ip_metrics_entry = packet.asn_info;
+                        let asn_info = ip_metrics_entry.as_ref().into();

                         if ret < 0 {
                             let source =
@@ -553,16 +577,25 @@ impl IoUringLoop {
                             metrics::packets_dropped_total(send_dir, &source, &asn_info)
                                 .inc();
                         } else if ret as usize != packet.data.len() {
-                            metrics::packets_total(send_dir, &asn_info).inc();
                             metrics::errors_total(
                                 send_dir,
                                 "sent bytes != packet length",
                                 &asn_info,
                             )
                             .inc();
+                            *processing_metrics
+                                .packets_total
+                                .entry((send_dir, ip_metrics_entry))
+                                .or_default() += 1;
                         } else {
-                            metrics::packets_total(send_dir, &asn_info).inc();
-                            metrics::bytes_total(send_dir, &asn_info).inc_by(ret as u64);
+                            *processing_metrics
+                                .packets_total
+                                .entry((send_dir, ip_metrics_entry.clone()))
+                                .or_default() += 1;
+                            *processing_metrics
+                                .bytes_total
+                                .entry((send_dir, ip_metrics_entry))
+                                .or_default() += ret as usize;
                         }
                     }
                 }
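A note on the flush cadence above: rather than reading the clock again each iteration, the loop reuses the receive timestamps it already tracks for the jitter metric and accumulates the gap between consecutive packets. Here is a sketch of that logic with a hypothetical `maybe_flush` helper; the free-function shape and names are illustrative, not quilkin's API:

```rust
use std::time::{Duration, Instant};

const FLUSH_INTERVAL: Duration = Duration::from_secs(15);

/// Accumulates the gap between consecutive receive timestamps and fires
/// `flush` once at least `FLUSH_INTERVAL` of traffic time has elapsed.
fn maybe_flush(
    time_since_flush: &mut Duration,
    previous: Option<Instant>,
    current: Option<Instant>,
    flush: impl FnOnce(),
) {
    if let (Some(previous), Some(current)) = (previous, current) {
        *time_since_flush += current - previous;

        if *time_since_flush >= FLUSH_INTERVAL {
            *time_since_flush = Duration::default();
            flush();
        }
    }
}
```

One consequence of deriving the cadence from packet arrivals is that an idle loop never flushes; buffered samples simply wait until traffic resumes.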
34 changes: 20 additions & 14 deletions src/components/proxy/packet_router.rs
@@ -57,6 +57,7 @@ impl DownstreamReceiveWorkerConfig {
         sessions: &Arc<SessionPool>,
         error_acc: &mut super::error::ErrorAccumulator,
         destinations: &mut Vec<crate::net::EndpointAddress>,
+        processing_time: &prometheus::local::LocalHistogram,
     ) {
         tracing::trace!(
             id = worker_id,
@@ -65,21 +66,26 @@ impl DownstreamReceiveWorkerConfig {
             "received packet from downstream"
         );

-        let timer = metrics::processing_time(metrics::READ).start_timer();
-        match Self::process_downstream_received_packet(packet, config, sessions, destinations) {
-            Ok(()) => {
-                error_acc.maybe_send();
-            }
-            Err(error) => {
-                let discriminant = error.discriminant();
-                metrics::errors_total(metrics::READ, discriminant, &metrics::EMPTY).inc();
-                metrics::packets_dropped_total(metrics::READ, discriminant, &metrics::EMPTY).inc();
-
-                timer.stop_and_record();
-                error_acc.push_error(error);
-            }
-        }
+        processing_time.observe_closure_duration(
+            || match Self::process_downstream_received_packet(
+                packet,
+                config,
+                sessions,
+                destinations,
+            ) {
+                Ok(()) => {
+                    error_acc.maybe_send();
+                }
+                Err(error) => {
+                    let discriminant = error.discriminant();
+                    metrics::errors_total(metrics::READ, discriminant, &metrics::EMPTY).inc();
+                    metrics::packets_dropped_total(metrics::READ, discriminant, &metrics::EMPTY)
+                        .inc();
+
+                    error_acc.push_error(error);
+                }
+            },
+        );
     }

     /// Processes a packet by running it through the filter chain.
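The change above swaps a `HistogramTimer` against the shared histogram (which records via `stop_and_record`, or when the timer is dropped) for `observe_closure_duration` on the local histogram. A side-by-side sketch, with a hypothetical `time_both_ways` helper:

```rust
use prometheus::Histogram;

fn time_both_ways(shared: &Histogram) {
    // Before: a timer against the shared histogram; the sample is recorded
    // by `stop_and_record`, or when the timer is dropped.
    let timer = shared.start_timer();
    // ... process the packet ...
    timer.stop_and_record();

    // After: the local histogram times the closure and buffers the sample
    // until `flush`, keeping shared-state writes off the per-packet path.
    // (In the PR the local handle is long-lived; it is created here only
    // for the sketch.)
    let local = shared.local();
    local.observe_closure_duration(|| {
        // ... process the packet ...
    });
    local.flush();
}
```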
10 changes: 7 additions & 3 deletions src/components/proxy/packet_router/reference.rs
@@ -37,7 +37,7 @@ impl super::DownstreamReceiveWorkerConfig {
         let (tx, mut rx) = tokio::sync::oneshot::channel();

         let worker = uring_spawn!(thread_span, async move {
-            let mut last_received_at = None;
+            let mut last_received_at: Option<std::time::Instant> = None;
             let socket = crate::net::DualStackLocalSocket::new(port)
                 .unwrap()
                 .make_refcnt();
@@ -102,6 +102,7 @@ impl super::DownstreamReceiveWorkerConfig {
             let mut error_acc =
                 crate::components::proxy::error::ErrorAccumulator::new(error_sender);
             let mut destinations = Vec::with_capacity(1);
+            let mut processing_metrics = crate::metrics::ProcessingMetrics::new();

             loop {
                 // Initialize a buffer for the UDP packet. We use the maximum size of a UDP
@@ -110,7 +111,7 @@ impl super::DownstreamReceiveWorkerConfig {

                 tokio::select! {
                     received = socket.recv_from(buffer) => {
-                        let received_at = crate::time::UtcTimestamp::now();
+                        let received_at = std::time::Instant::now();
                         let (result, buffer) = received;

                         match result {
@@ -123,7 +124,7 @@ impl super::DownstreamReceiveWorkerConfig {
                                     crate::metrics::READ,
                                     &crate::metrics::EMPTY,
                                 )
-                                .set((received_at - last_received_at).nanos());
+                                .set((received_at - last_received_at).as_nanos() as _);
                             }
                             last_received_at = Some(received_at);

@@ -134,7 +135,10 @@ impl super::DownstreamReceiveWorkerConfig {
                                 &sessions,
                                 &mut error_acc,
                                 &mut destinations,
+                                &processing_metrics.read_processing_time,
                             );
+
+                            processing_metrics.flush();
                         }
                         Err(error) => {
                             tracing::error!(%error, "error receiving packet");
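The diff depends on a `crate::metrics::ProcessingMetrics` type whose definition is not part of this changeset. Inferred purely from its call sites (`new`, `read_processing_time`, `packets_total`, `bytes_total`, `flush`), it might look roughly like the sketch below; the key and value types here are assumptions, not quilkin's actual definitions:

```rust
use std::collections::HashMap;

// Stand-ins for the real types: the diff keys the maps by
// `(send_dir, ip_metrics_entry)`, whose concrete types aren't shown.
type Direction = &'static str;
type IpMetricsEntry = Option<String>;

pub struct ProcessingMetrics {
    pub read_processing_time: prometheus::local::LocalHistogram,
    pub packets_total: HashMap<(Direction, IpMetricsEntry), u64>,
    pub bytes_total: HashMap<(Direction, IpMetricsEntry), usize>,
}

impl ProcessingMetrics {
    pub fn flush(&mut self) {
        // Publish buffered histogram samples to the shared collector.
        self.read_processing_time.flush();

        // Drain the local counters into the shared metrics; the real
        // implementation presumably increments the registered
        // `packets_total` / `bytes_total` counters here.
        for ((_dir, _entry), _count) in self.packets_total.drain() {
            // e.g. metrics::packets_total(_dir, ...).inc_by(_count);
        }
        for ((_dir, _entry), _bytes) in self.bytes_total.drain() {
            // e.g. metrics::bytes_total(_dir, ...).inc_by(_bytes as u64);
        }
    }
}
```

Note the two call sites flush at different cadences: this reference implementation flushes after every received packet, while the io_uring loop batches flushes behind a 15-second interval.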
75 changes: 3 additions & 72 deletions src/filters/chain.rs
@@ -14,28 +14,11 @@
  * limitations under the License.
  */

-use prometheus::{exponential_buckets, Histogram};
-
 use crate::{
     config::Filter as FilterConfig,
     filters::{prelude::*, FilterRegistry},
-    metrics::{histogram_opts, CollectorExt},
 };

-const FILTER_LABEL: &str = "filter";
-
-/// Start the histogram bucket at an eighth of a millisecond, as we bucketed the full filter
-/// chain processing starting at a quarter of a millisecond, so we will want finer granularity
-/// here.
-const BUCKET_START: f64 = 0.000125;
-
-const BUCKET_FACTOR: f64 = 2.5;
-
-/// At an exponential factor of 2.5 (BUCKET_FACTOR), 11 iterations gets us to just over half a
-/// second. Any processing that occurs over half a second is far too long, so we end
-/// the bucketing there as we don't care about granularity past this value.
-const BUCKET_COUNT: usize = 11;
-
 /// A chain of [`Filter`]s to be executed in order.
 ///
 /// Executes each filter, passing the [`ReadContext`] and [`WriteContext`]
@@ -45,50 +28,11 @@ const BUCKET_COUNT: usize = 11;
 #[derive(Clone, Default)]
 pub struct FilterChain {
     filters: Vec<(String, FilterInstance)>,
-    filter_read_duration_seconds: Vec<Histogram>,
-    filter_write_duration_seconds: Vec<Histogram>,
 }

 impl FilterChain {
     pub fn new(filters: Vec<(String, FilterInstance)>) -> Result<Self, CreationError> {
-        let subsystem = "filter";
-
-        Ok(Self {
-            filter_read_duration_seconds: filters
-                .iter()
-                .map(|(name, _)| {
-                    Histogram::with_opts(
-                        histogram_opts(
-                            "read_duration_seconds",
-                            subsystem,
-                            "Seconds taken to execute a given filter's `read`.",
-                            Some(
-                                exponential_buckets(BUCKET_START, BUCKET_FACTOR, BUCKET_COUNT)
-                                    .unwrap(),
-                            ),
-                        )
-                        .const_label(FILTER_LABEL, name),
-                    )
-                    .and_then(|histogram| histogram.register_if_not_exists())
-                })
-                .collect::<Result<_, prometheus::Error>>()?,
-            filter_write_duration_seconds: filters
-                .iter()
-                .map(|(name, _)| {
-                    Histogram::with_opts(
-                        histogram_opts(
-                            "write_duration_seconds",
-                            subsystem,
-                            "Seconds taken to execute a given filter's `write`.",
-                            Some(exponential_buckets(0.000125, 2.5, 11).unwrap()),
-                        )
-                        .const_label(FILTER_LABEL, name),
-                    )
-                    .and_then(|histogram| histogram.register_if_not_exists())
-                })
-                .collect::<Result<_, prometheus::Error>>()?,
-            filters,
-        })
+        Ok(Self { filters })
     }

     #[inline]
@@ -274,15 +218,9 @@ impl schemars::JsonSchema for FilterChain {

 impl Filter for FilterChain {
     fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> {
-        for ((id, instance), histogram) in self
-            .filters
-            .iter()
-            .zip(self.filter_read_duration_seconds.iter())
-        {
+        for (id, instance) in self.filters.iter() {
             tracing::trace!(%id, "read filtering packet");
-            let timer = histogram.start_timer();
             let result = instance.filter().read(ctx);
-            timer.stop_and_record();
             match result {
                 Ok(()) => tracing::trace!(%id, "read passing packet"),
                 Err(error) => {
@@ -304,16 +242,9 @@ impl Filter for FilterChain {
     }

     fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> {
-        for ((id, instance), histogram) in self
-            .filters
-            .iter()
-            .rev()
-            .zip(self.filter_write_duration_seconds.iter().rev())
-        {
+        for (id, instance) in self.filters.iter().rev() {
             tracing::trace!(%id, "write filtering packet");
-            let timer = histogram.start_timer();
             let result = instance.filter().write(ctx);
-            timer.stop_and_record();
             match result {
                 Ok(()) => tracing::trace!(%id, "write passing packet"),
                 Err(error) => {