diff --git a/Cargo.lock b/Cargo.lock index 938ab2606d..8e8c4d2078 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6657,9 +6657,12 @@ dependencies = [ "nym-service-providers-common", "nym-sphinx", "nym-task", + "nym-wireguard", + "nym-wireguard-types", "serde", "serde_json", "thiserror", + "tokio", ] [[package]] diff --git a/common/wireguard/src/lib.rs b/common/wireguard/src/lib.rs index dc226c56fd..bd5aa47b4f 100644 --- a/common/wireguard/src/lib.rs +++ b/common/wireguard/src/lib.rs @@ -11,7 +11,7 @@ mod packet_relayer; mod platform; mod registered_peers; mod setup; -mod tun_task_channel; +pub mod tun_task_channel; mod udp_listener; mod wg_tunnel; @@ -20,7 +20,7 @@ use std::sync::Arc; // Currently the module related to setting up the virtual network device is platform specific. #[cfg(target_os = "linux")] -use platform::linux::tun_device; +pub use platform::linux::tun_device; /// Start wireguard UDP listener and TUN device /// @@ -39,7 +39,8 @@ pub async fn start_wireguard( let peers_by_tag = Arc::new(tokio::sync::Mutex::new(wg_tunnel::PeersByTag::new())); // Start the tun device that is used to relay traffic outbound - let (tun, tun_task_tx, tun_task_response_rx) = tun_device::TunDevice::new(peers_by_ip.clone()); + let (tun, tun_task_tx, tun_task_response_rx) = + tun_device::TunDevice::new(Some(peers_by_ip.clone())); tun.start(); // If we want to have the tun device on a separate host, it's the tun_task and diff --git a/common/wireguard/src/platform/linux/mod.rs b/common/wireguard/src/platform/linux/mod.rs index ebe0ba212c..fdf1de229d 100644 --- a/common/wireguard/src/platform/linux/mod.rs +++ b/common/wireguard/src/platform/linux/mod.rs @@ -1 +1 @@ -pub(crate) mod tun_device; +pub mod tun_device; diff --git a/common/wireguard/src/platform/linux/tun_device.rs b/common/wireguard/src/platform/linux/tun_device.rs index 348abdb609..1bf84eb52d 100644 --- a/common/wireguard/src/platform/linux/tun_device.rs +++ b/common/wireguard/src/platform/linux/tun_device.rs @@ -43,7 +43,7 @@ pub struct TunDevice { tun_task_response_tx: TunTaskResponseTx, // The routing table, as how wireguard does it - peers_by_ip: Arc>, + peers_by_ip: Option>>, // This is an alternative to the routing table, where we just match outgoing source IP with // incoming destination IP. @@ -52,7 +52,7 @@ pub struct TunDevice { impl TunDevice { pub fn new( - peers_by_ip: Arc>, + peers_by_ip: Option>>, ) -> (Self, TunTaskTx, TunTaskResponseRx) { let tun = setup_tokio_tun_device( format!("{TUN_BASE_NAME}%d").as_str(), @@ -123,7 +123,7 @@ impl TunDevice { // This is how wireguard does it, by consulting the AllowedIPs table. if false { - let peers = self.peers_by_ip.lock().await; + let peers = self.peers_by_ip.as_ref().unwrap().lock().await; if let Some(peer_tx) = peers.longest_match(dst_addr).map(|(_, tx)| tx) { log::info!("Forward packet to wg tunnel"); peer_tx diff --git a/service-providers/ip-forwarder/Cargo.toml b/service-providers/ip-forwarder/Cargo.toml index bb88978f18..e5cecd07ee 100644 --- a/service-providers/ip-forwarder/Cargo.toml +++ b/service-providers/ip-forwarder/Cargo.toml @@ -18,6 +18,9 @@ nym-sdk = { path = "../../sdk/rust/nym-sdk" } nym-service-providers-common = { path = "../common" } nym-sphinx = { path = "../../common/nymsphinx" } nym-task = { path = "../../common/task" } +nym-wireguard = { path = "../../common/wireguard" } +nym-wireguard-types = { path = "../../common/wireguard-types" } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] } diff --git a/service-providers/ip-forwarder/src/lib.rs b/service-providers/ip-forwarder/src/lib.rs index 9aadf5f50a..ca4f0481c1 100644 --- a/service-providers/ip-forwarder/src/lib.rs +++ b/service-providers/ip-forwarder/src/lib.rs @@ -1,12 +1,13 @@ use std::path::Path; use error::IpForwarderError; -use futures::channel::oneshot; +use futures::{channel::oneshot, StreamExt}; use nym_client_core::{ client::mix_traffic::transceiver::GatewayTransceiver, config::disk_persistence::CommonClientPaths, HardcodedTopologyProvider, TopologyProvider, }; use nym_sdk::{mixnet::Recipient, NymNetworkDetails}; +use nym_sphinx::receiver::ReconstructedMessage; use nym_task::{TaskClient, TaskHandle}; use crate::config::BaseClientConfig; @@ -95,12 +96,12 @@ impl IpForwarderBuilder { pub async fn run_service_provider(self) -> Result<(), IpForwarderError> { // Used to notify tasks to shutdown. Not all tasks fully supports this (yet). - let shutdown: TaskHandle = self.shutdown.map(Into::into).unwrap_or_default(); + let task_handle: TaskHandle = self.shutdown.map(Into::into).unwrap_or_default(); // Connect to the mixnet let mixnet_client = create_mixnet_client( &self.config.base, - shutdown.get_handle().named("nym_sdk::MixnetClient"), + task_handle.get_handle().named("nym_sdk::MixnetClient"), self.custom_gateway_transceiver, self.custom_topology_provider, self.wait_for_gateway, @@ -110,6 +111,19 @@ impl IpForwarderBuilder { let self_address = *mixnet_client.nym_address(); + // Create the TUN device that we interact with the rest of the world with + let (tun, tun_task_tx, tun_task_response_rx) = + nym_wireguard::tun_device::TunDevice::new(None); + + let ip_forwarder_service = IpForwarderService { + config: self.config, + tun, + tun_task_tx, + tun_task_response_rx, + mixnet_client, + task_handle, + }; + log::info!("The address of this client is: {self_address}"); log::info!("All systems go. Press CTRL-C to stop the server."); @@ -120,6 +134,41 @@ impl IpForwarderBuilder { } } + ip_forwarder_service.run().await + } +} + +struct IpForwarderService { + config: Config, + tun: nym_wireguard::tun_device::TunDevice, + tun_task_tx: nym_wireguard::tun_task_channel::TunTaskTx, + tun_task_response_rx: nym_wireguard::tun_task_channel::TunTaskResponseRx, + mixnet_client: nym_sdk::mixnet::MixnetClient, + task_handle: TaskHandle, +} + +impl IpForwarderService { + async fn run(mut self) -> Result<(), IpForwarderError> { + let mut task_client = self.task_handle.fork("main_loop"); + while !task_client.is_shutdown() { + tokio::select! { + _ = task_client.recv() => { + log::debug!("IpForwarderService [main loop]: received shutdown"); + }, + msg = self.mixnet_client.next() => match msg { + Some(msg) => self.on_message(msg).await, + None => { + log::trace!("IpForwarderService [main loop]: stopping since channel closed"); + break; + }, + } + } + } + log::info!("IpForwarderService: stopping"); + Ok(()) + } + + async fn on_message(&mut self, reconstructed: ReconstructedMessage) { todo!(); } }