From a955ef84c4a249e746f766bbf952e4c4cb97de91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20H=C3=A4ggblad?= Date: Fri, 27 Oct 2023 17:32:25 +0200 Subject: [PATCH] wip: on_message --- Cargo.lock | 2 + common/wireguard/src/tun_task_channel.rs | 4 +- service-providers/ip-forwarder/Cargo.toml | 2 + service-providers/ip-forwarder/src/error.rs | 10 ++- service-providers/ip-forwarder/src/lib.rs | 80 ++++++++++++++++++--- 5 files changed, 84 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8e8c4d2078..f13682b709 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6648,6 +6648,7 @@ dependencies = [ name = "nym-ip-forwarder" version = "0.1.0" dependencies = [ + "etherparse", "futures", "log", "nym-bin-common", @@ -6661,6 +6662,7 @@ dependencies = [ "nym-wireguard-types", "serde", "serde_json", + "tap", "thiserror", "tokio", ] diff --git a/common/wireguard/src/tun_task_channel.rs b/common/wireguard/src/tun_task_channel.rs index 1cbd6985da..8928aa6049 100644 --- a/common/wireguard/src/tun_task_channel.rs +++ b/common/wireguard/src/tun_task_channel.rs @@ -7,7 +7,7 @@ pub struct TunTaskTx(mpsc::Sender); pub(crate) struct TunTaskRx(mpsc::Receiver); impl TunTaskTx { - pub(crate) async fn send( + pub async fn send( &self, data: TunTaskPayload, ) -> Result<(), tokio::sync::mpsc::error::SendError> { @@ -40,7 +40,7 @@ impl TunTaskResponseTx { } impl TunTaskResponseRx { - pub(crate) async fn recv(&mut self) -> Option { + pub async fn recv(&mut self) -> Option { self.0.recv().await } } diff --git a/service-providers/ip-forwarder/Cargo.toml b/service-providers/ip-forwarder/Cargo.toml index e5cecd07ee..2225ef49b5 100644 --- a/service-providers/ip-forwarder/Cargo.toml +++ b/service-providers/ip-forwarder/Cargo.toml @@ -9,6 +9,7 @@ edition.workspace = true license.workspace = true [dependencies] +etherparse = "0.13.0" futures = { workspace = true } log = { workspace = true } nym-bin-common = { path = "../../common/bin-common" } @@ -22,5 +23,6 @@ nym-wireguard = { path = "../../common/wireguard" } nym-wireguard-types = { path = "../../common/wireguard-types" } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +tap.workspace = true thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] } diff --git a/service-providers/ip-forwarder/src/error.rs b/service-providers/ip-forwarder/src/error.rs index 7a0268fb61..1fb4be6ad6 100644 --- a/service-providers/ip-forwarder/src/error.rs +++ b/service-providers/ip-forwarder/src/error.rs @@ -12,7 +12,7 @@ pub enum IpForwarderError { FailedToLoadConfig(String), // TODO: add more details here - #[error("Failed to validate the loaded config")] + #[error("failed to validate the loaded config")] ConfigValidationFailure, #[error("failed local version check, client and config mismatch")] @@ -26,4 +26,12 @@ pub enum IpForwarderError { #[error("the entity wrapping the network requester has disconnected")] DisconnectedParent, + + #[error("failed to parse incoming packet: {source}")] + PacketParseFailed { + source: etherparse::ReadError, + }, + + #[error("parsed packet is missing IP header")] + PacketMissingHeader, } diff --git a/service-providers/ip-forwarder/src/lib.rs b/service-providers/ip-forwarder/src/lib.rs index ca4f0481c1..0456d4004f 100644 --- a/service-providers/ip-forwarder/src/lib.rs +++ b/service-providers/ip-forwarder/src/lib.rs @@ -1,4 +1,4 @@ -use std::path::Path; +use std::{net::IpAddr, path::Path}; use error::IpForwarderError; use futures::{channel::oneshot, StreamExt}; @@ -6,9 +6,13 @@ use nym_client_core::{ client::mix_traffic::transceiver::GatewayTransceiver, config::disk_persistence::CommonClientPaths, HardcodedTopologyProvider, TopologyProvider, }; -use nym_sdk::{mixnet::Recipient, NymNetworkDetails}; +use nym_sdk::{ + mixnet::{InputMessage, MixnetMessageSender, Recipient}, + NymNetworkDetails, +}; use nym_sphinx::receiver::ReconstructedMessage; use nym_task::{TaskClient, TaskHandle}; +use tap::TapFallible; use crate::config::BaseClientConfig; @@ -115,7 +119,7 @@ impl IpForwarderBuilder { let (tun, tun_task_tx, tun_task_response_rx) = nym_wireguard::tun_device::TunDevice::new(None); - let ip_forwarder_service = IpForwarderService { + let ip_forwarder_service = IpForwarder { config: self.config, tun, tun_task_tx, @@ -138,7 +142,7 @@ impl IpForwarderBuilder { } } -struct IpForwarderService { +struct IpForwarder { config: Config, tun: nym_wireguard::tun_device::TunDevice, tun_task_tx: nym_wireguard::tun_task_channel::TunTaskTx, @@ -147,7 +151,7 @@ struct IpForwarderService { task_handle: TaskHandle, } -impl IpForwarderService { +impl IpForwarder { async fn run(mut self) -> Result<(), IpForwarderError> { let mut task_client = self.task_handle.fork("main_loop"); while !task_client.is_shutdown() { @@ -155,21 +159,75 @@ impl IpForwarderService { _ = task_client.recv() => { log::debug!("IpForwarderService [main loop]: received shutdown"); }, - msg = self.mixnet_client.next() => match msg { - Some(msg) => self.on_message(msg).await, - None => { + msg = self.mixnet_client.next() => { + if let Some(msg) = msg { + self.on_message(msg).await.ok(); + } else { + log::trace!("IpForwarderService [main loop]: stopping since channel closed"); + break; + }; + }, + packet = self.tun_task_response_rx.recv() => { + if let Some((tag, packet)) = msg { + let input_message = InputMessage { + }; + self.mixnet_client + .send(input_message) + .await + .tap_err(|err| { + log::error!("IpForwarderService [main loop]: failed to send packet to mixnet: {err}"); + }) + .ok(); + } else { log::trace!("IpForwarderService [main loop]: stopping since channel closed"); break; - }, + } } + } } log::info!("IpForwarderService: stopping"); Ok(()) } - async fn on_message(&mut self, reconstructed: ReconstructedMessage) { - todo!(); + async fn on_message( + &mut self, + reconstructed: ReconstructedMessage, + ) -> Result<(), IpForwarderError> { + log::info!("Received message: {:?}", reconstructed.sender_tag); + + let headers = etherparse::SlicedPacket::from_ip(&reconstructed.message).map_err(|err| { + log::warn!("Received non-IP packet: {err}"); + IpForwarderError::PacketParseFailed { source: err } + })?; + + let (src_addr, dst_addr): (IpAddr, IpAddr) = match headers.ip { + Some(etherparse::InternetSlice::Ipv4(ipv4_header, _)) => ( + ipv4_header.source_addr().into(), + ipv4_header.destination_addr().into(), + ), + Some(etherparse::InternetSlice::Ipv6(ipv6_header, _)) => ( + ipv6_header.source_addr().into(), + ipv6_header.destination_addr().into(), + ), + None => { + log::warn!("Received non-IP packet"); + return Err(IpForwarderError::PacketMissingHeader); + } + }; + log::info!("Received packet: {src_addr} -> {dst_addr}"); + + // TODO: set the tag correctly. Can we just reuse sender_tag? + let tag = 0; + self.tun_task_tx + .send((tag, reconstructed.message)) + .await + .tap_err(|err| { + log::error!("Failed to send packet to tun device: {err}"); + }) + .ok(); + + Ok(()) } }