Skip to content

Commit

Permalink
refactor: Rename PendingSends to PacketQueue, move PacketQueue and io…
Browse files Browse the repository at this point in the history
…-uring to net module
  • Loading branch information
XAMPPRocky committed Jan 9, 2025
1 parent c81df40 commit b78fed8
Show file tree
Hide file tree
Showing 13 changed files with 195 additions and 162 deletions.
2 changes: 1 addition & 1 deletion src/cli/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl Proxy {
to: self.to,
to_tokens,
num_workers,
socket,
socket: Some(socket),
qcmp,
phoenix,
notifier: None,
Expand Down
166 changes: 50 additions & 116 deletions src/components/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,80 +18,6 @@ mod error;
pub mod packet_router;
mod sessions;

cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
pub(crate) mod io_uring_shared;
pub(crate) type PacketSendReceiver = io_uring_shared::EventFd;
pub(crate) type PacketSendSender = io_uring_shared::EventFdWriter;
} else {
pub(crate) type PacketSendReceiver = tokio::sync::watch::Receiver<bool>;
pub(crate) type PacketSendSender = tokio::sync::watch::Sender<bool>;
}
}

/// A simple packet queue that signals when a packet is pushed
///
/// For io_uring this notifies an eventfd that will be processed on the next
/// completion loop
#[derive(Clone)]
pub struct PendingSends {
packets: Arc<parking_lot::Mutex<Vec<SendPacket>>>,
notify: PacketSendSender,
}

impl PendingSends {
pub fn new(capacity: usize) -> std::io::Result<(Self, PacketSendReceiver)> {
#[cfg(target_os = "linux")]
let (notify, rx) = {
let rx = io_uring_shared::EventFd::new()?;
(rx.writer(), rx)
};
#[cfg(not(target_os = "linux"))]
let (notify, rx) = tokio::sync::watch::channel(true);

Ok((
Self {
packets: Arc::new(parking_lot::Mutex::new(Vec::with_capacity(capacity))),
notify,
},
rx,
))
}

#[inline]
pub(crate) fn capacity(&self) -> usize {
self.packets.lock().capacity()
}

/// Pushes a packet onto the queue to be sent, signalling a sender that
/// it's available
#[inline]
pub(crate) fn push(&self, packet: SendPacket) {
self.packets.lock().push(packet);
#[cfg(target_os = "linux")]
self.notify.write(1);
#[cfg(not(target_os = "linux"))]
let _ = self.notify.send(true);
}

/// Called to shutdown the consumer side of the sends (ie the io loop that is
/// actually dequing and sending packets)
#[inline]
pub(crate) fn shutdown_receiver(&self) {
#[cfg(target_os = "linux")]
self.notify.write(0xdeadbeef);
#[cfg(not(target_os = "linux"))]
let _ = self.notify.send(false);
}

/// Swaps the current queue with an empty one so we only lock for a pointer swap
#[inline]
pub fn swap(&self, mut swap: Vec<SendPacket>) -> Vec<SendPacket> {
swap.clear();
std::mem::replace(&mut self.packets.lock(), swap)
}
}

use super::RunArgs;
pub use error::{ErrorMap, PipelineError};
pub use sessions::SessionPool;
Expand All @@ -103,20 +29,6 @@ use std::{
},
};

pub struct SendPacket {
/// The destination address of the packet
pub destination: socket2::SockAddr,
/// The packet data being sent
pub data: crate::pool::FrozenPoolBuffer,
/// The asn info for the sender, used for metrics
pub asn_info: Option<crate::net::maxmind_db::MetricsIpNetEntry>,
}

pub struct RecvPacket {
pub source: SocketAddr,
pub data: crate::pool::PoolBuffer,
}

#[derive(Clone, Debug)]
pub struct Ready {
pub idle_request_interval: std::time::Duration,
Expand Down Expand Up @@ -156,7 +68,7 @@ pub struct Proxy {
pub management_servers: Vec<tonic::transport::Endpoint>,
pub to: Vec<SocketAddr>,
pub to_tokens: Option<ToTokens>,
pub socket: socket2::Socket,
pub socket: Option<socket2::Socket>,
pub qcmp: socket2::Socket,
pub phoenix: crate::net::TcpListener,
pub notifier: Option<tokio::sync::mpsc::UnboundedSender<String>>,
Expand All @@ -173,7 +85,7 @@ impl Default for Proxy {
management_servers: Vec::new(),
to: Vec::new(),
to_tokens: None,
socket: crate::net::raw_socket_with_reuse(0).unwrap(),
socket: Some(crate::net::raw_socket_with_reuse(0).unwrap()),
qcmp,
phoenix,
notifier: None,
Expand All @@ -183,15 +95,16 @@ impl Default for Proxy {

impl Proxy {
pub async fn run(
self,
mut self,
RunArgs {
config,
ready,
mut shutdown_rx,
}: RunArgs<Ready>,
initialized: Option<tokio::sync::oneshot::Sender<()>>,
) -> crate::Result<()> {
let _mmdb_task = self.mmdb.map(|source| {
let _mmdb_task = self.mmdb.as_ref().map(|source| {
let source = source.clone();
tokio::spawn(async move {
while let Err(error) =
tryhard::retry_fn(|| crate::MaxmindDb::update(source.clone()))
Expand All @@ -205,7 +118,7 @@ impl Proxy {
});

if !self.to.is_empty() {
let endpoints = if let Some(tt) = self.to_tokens {
let endpoints = if let Some(tt) = &self.to_tokens {
let (unique, overflow) = 256u64.overflowing_pow(tt.length as _);
if overflow {
panic!(
Expand Down Expand Up @@ -355,28 +268,7 @@ impl Proxy {
.expect("failed to spawn proxy-subscription thread");
}

let num_workers = self.num_workers.get();
let buffer_pool = Arc::new(crate::pool::BufferPool::new(num_workers, 2 * 1024));

let mut worker_sends = Vec::with_capacity(num_workers);
let mut session_sends = Vec::with_capacity(num_workers);
for _ in 0..num_workers {
let psends = PendingSends::new(15)?;
session_sends.push(psends.0.clone());
worker_sends.push(psends);
}

let sessions = SessionPool::new(config.clone(), session_sends, buffer_pool.clone());

packet_router::spawn_receivers(
config.clone(),
self.socket,
worker_sends,
&sessions,
buffer_pool,
)
.await?;

let router_shutdown = self.spawn_packet_router(config.clone()).await?;
crate::codec::qcmp::spawn(self.qcmp, shutdown_rx.clone())?;
crate::net::phoenix::spawn(
self.phoenix,
Expand All @@ -395,8 +287,50 @@ impl Proxy {
.await
.map_err(|error| eyre::eyre!(error))?;

sessions.shutdown(*shutdown_rx.borrow() == crate::ShutdownKind::Normal);
(router_shutdown)(shutdown_rx);

Ok(())
}

pub async fn spawn_packet_router(
&mut self,
config: Arc<crate::config::Config>,
) -> eyre::Result<impl FnOnce(crate::ShutdownRx)> {
self.spawn_user_space_router(config).await
}

/// Launches the user space implementation of the packet router using
/// sockets. This implementation uses a pool of buffers and sockets to
/// manage UDP sessions and sockets. On Linux this will use io-uring, where
/// as it will use epoll interfaces on non-Linux platforms.
pub async fn spawn_user_space_router(
&mut self,
config: Arc<crate::config::Config>,
) -> eyre::Result<impl FnOnce(crate::ShutdownRx)> {
let workers = self.num_workers.get();
let buffer_pool = Arc::new(crate::pool::BufferPool::new(workers, 2 * 1024));

let mut worker_sends = Vec::with_capacity(workers);
let mut session_sends = Vec::with_capacity(workers);
for _ in 0..workers {
let queue = crate::net::queue(15)?;
session_sends.push(queue.0.clone());
worker_sends.push(queue);
}

let sessions = SessionPool::new(config.clone(), session_sends, buffer_pool.clone());

packet_router::spawn_receivers(
config,
self.socket.take().unwrap(),
worker_sends,
&sessions,
buffer_pool,
)
.await?;

Ok(move |shutdown_rx: crate::ShutdownRx| {
sessions.shutdown(*shutdown_rx.borrow() == crate::ShutdownKind::Normal);
})
}
}
6 changes: 3 additions & 3 deletions src/components/proxy/packet_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl<P: PacketMut> DownstreamPacket<P> {

/// Represents the required arguments to run a worker task that
/// processes packets received downstream.
pub struct DownstreamReceiveWorkerConfig {
pub(crate) struct DownstreamReceiveWorkerConfig {
/// ID of the worker.
pub worker_id: usize,
pub port: u16,
Expand All @@ -158,10 +158,10 @@ pub struct DownstreamReceiveWorkerConfig {
/// This function also spawns the set of worker tasks responsible for consuming packets
/// off the aforementioned queue and processing them through the filter chain and session
/// pipeline.
pub async fn spawn_receivers(
pub(crate) async fn spawn_receivers(
config: Arc<Config>,
socket: socket2::Socket,
worker_sends: Vec<(super::PendingSends, super::PacketSendReceiver)>,
worker_sends: Vec<crate::net::PacketQueue>,
sessions: &Arc<SessionPool>,
buffer_pool: Arc<crate::pool::BufferPool>,
) -> crate::Result<()> {
Expand Down
11 changes: 4 additions & 7 deletions src/components/proxy/packet_router/io_uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@ use crate::components::proxy;
use eyre::Context as _;

impl super::DownstreamReceiveWorkerConfig {
pub async fn spawn(
self,
pending_sends: (proxy::PendingSends, proxy::PacketSendReceiver),
) -> eyre::Result<()> {
use crate::components::proxy::io_uring_shared;
pub async fn spawn(self, pending_sends: crate::net::PacketQueue) -> eyre::Result<()> {
use crate::net::io_uring;

let Self {
worker_id,
Expand All @@ -36,11 +33,11 @@ impl super::DownstreamReceiveWorkerConfig {
let socket =
crate::net::DualStackLocalSocket::new(port).context("failed to bind socket")?;

let io_loop = io_uring_shared::IoUringLoop::new(2000, socket)?;
let io_loop = io_uring::IoUringLoop::new(2000, socket)?;
io_loop
.spawn(
format!("packet-router-{worker_id}"),
io_uring_shared::PacketProcessorCtx::Router {
io_uring::PacketProcessorCtx::Router {
config,
sessions,
error_acc: super::super::error::ErrorAccumulator::new(error_sender),
Expand Down
13 changes: 4 additions & 9 deletions src/components/proxy/packet_router/reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,8 @@

//! The reference implementation is used for non-Linux targets
use crate::components::proxy;

impl super::DownstreamReceiveWorkerConfig {
pub async fn spawn(
self,
pending_sends: (proxy::PendingSends, proxy::PacketSendReceiver),
) -> eyre::Result<()> {
pub async fn spawn(self, packet_queue: crate::net::PacketQueue) -> eyre::Result<()> {
let Self {
worker_id,
port,
Expand All @@ -47,16 +42,16 @@ impl super::DownstreamReceiveWorkerConfig {
let send_socket = socket.clone();

let inner_task = async move {
let (pending_sends, mut sends_rx) = pending_sends;
let mut sends_double_buffer = Vec::with_capacity(pending_sends.capacity());
let (packet_queue, mut sends_rx) = packet_queue;
let mut sends_double_buffer = Vec::with_capacity(packet_queue.capacity());

while sends_rx.changed().await.is_ok() {
if !*sends_rx.borrow() {
tracing::trace!("io loop shutdown requested");
break;
}

sends_double_buffer = pending_sends.swap(sends_double_buffer);
sends_double_buffer = packet_queue.swap(sends_double_buffer);

for packet in sends_double_buffer.drain(..sends_double_buffer.len()) {
let (result, _) = send_socket
Expand Down
Loading

0 comments on commit b78fed8

Please sign in to comment.