Skip to content

Commit

Permalink
Create IpForwarderService
Browse files Browse the repository at this point in the history
  • Loading branch information
octol committed Oct 27, 2023
1 parent 6daca7f commit 46e2a74
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 10 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions common/wireguard/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod packet_relayer;
mod platform;
mod registered_peers;
mod setup;
mod tun_task_channel;
pub mod tun_task_channel;
mod udp_listener;
mod wg_tunnel;

Expand All @@ -20,7 +20,7 @@ use std::sync::Arc;

// Currently the module related to setting up the virtual network device is platform specific.
#[cfg(target_os = "linux")]
use platform::linux::tun_device;
pub use platform::linux::tun_device;

/// Start wireguard UDP listener and TUN device
///
Expand All @@ -39,7 +39,8 @@ pub async fn start_wireguard(
let peers_by_tag = Arc::new(tokio::sync::Mutex::new(wg_tunnel::PeersByTag::new()));

// Start the tun device that is used to relay traffic outbound
let (tun, tun_task_tx, tun_task_response_rx) = tun_device::TunDevice::new(peers_by_ip.clone());
let (tun, tun_task_tx, tun_task_response_rx) =
tun_device::TunDevice::new(Some(peers_by_ip.clone()));
tun.start();

// If we want to have the tun device on a separate host, it's the tun_task and
Expand Down
2 changes: 1 addition & 1 deletion common/wireguard/src/platform/linux/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pub(crate) mod tun_device;
pub mod tun_device;
6 changes: 3 additions & 3 deletions common/wireguard/src/platform/linux/tun_device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct TunDevice {
tun_task_response_tx: TunTaskResponseTx,

// The routing table, as how wireguard does it
peers_by_ip: Arc<tokio::sync::Mutex<PeersByIp>>,
peers_by_ip: Option<Arc<tokio::sync::Mutex<PeersByIp>>>,

// This is an alternative to the routing table, where we just match outgoing source IP with
// incoming destination IP.
Expand All @@ -52,7 +52,7 @@ pub struct TunDevice {

impl TunDevice {
pub fn new(
peers_by_ip: Arc<tokio::sync::Mutex<PeersByIp>>,
peers_by_ip: Option<Arc<tokio::sync::Mutex<PeersByIp>>>,
) -> (Self, TunTaskTx, TunTaskResponseRx) {
let tun = setup_tokio_tun_device(
format!("{TUN_BASE_NAME}%d").as_str(),
Expand Down Expand Up @@ -123,7 +123,7 @@ impl TunDevice {

// This is how wireguard does it, by consulting the AllowedIPs table.
if false {
let peers = self.peers_by_ip.lock().await;
let peers = self.peers_by_ip.as_ref().unwrap().lock().await;
if let Some(peer_tx) = peers.longest_match(dst_addr).map(|(_, tx)| tx) {
log::info!("Forward packet to wg tunnel");
peer_tx
Expand Down
3 changes: 3 additions & 0 deletions service-providers/ip-forwarder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ nym-sdk = { path = "../../sdk/rust/nym-sdk" }
nym-service-providers-common = { path = "../common" }
nym-sphinx = { path = "../../common/nymsphinx" }
nym-task = { path = "../../common/task" }
nym-wireguard = { path = "../../common/wireguard" }
nym-wireguard-types = { path = "../../common/wireguard-types" }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] }
55 changes: 52 additions & 3 deletions service-providers/ip-forwarder/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::path::Path;

use error::IpForwarderError;
use futures::channel::oneshot;
use futures::{channel::oneshot, StreamExt};
use nym_client_core::{
client::mix_traffic::transceiver::GatewayTransceiver,
config::disk_persistence::CommonClientPaths, HardcodedTopologyProvider, TopologyProvider,
};
use nym_sdk::{mixnet::Recipient, NymNetworkDetails};
use nym_sphinx::receiver::ReconstructedMessage;
use nym_task::{TaskClient, TaskHandle};

use crate::config::BaseClientConfig;
Expand Down Expand Up @@ -95,12 +96,12 @@ impl IpForwarderBuilder {

pub async fn run_service_provider(self) -> Result<(), IpForwarderError> {
// Used to notify tasks to shutdown. Not all tasks fully supports this (yet).
let shutdown: TaskHandle = self.shutdown.map(Into::into).unwrap_or_default();
let task_handle: TaskHandle = self.shutdown.map(Into::into).unwrap_or_default();

// Connect to the mixnet
let mixnet_client = create_mixnet_client(
&self.config.base,
shutdown.get_handle().named("nym_sdk::MixnetClient"),
task_handle.get_handle().named("nym_sdk::MixnetClient"),
self.custom_gateway_transceiver,
self.custom_topology_provider,
self.wait_for_gateway,
Expand All @@ -110,6 +111,19 @@ impl IpForwarderBuilder {

let self_address = *mixnet_client.nym_address();

// Create the TUN device that we interact with the rest of the world with
let (tun, tun_task_tx, tun_task_response_rx) =
nym_wireguard::tun_device::TunDevice::new(None);

let ip_forwarder_service = IpForwarderService {
config: self.config,
tun,
tun_task_tx,
tun_task_response_rx,
mixnet_client,
task_handle,
};

log::info!("The address of this client is: {self_address}");
log::info!("All systems go. Press CTRL-C to stop the server.");

Expand All @@ -120,6 +134,41 @@ impl IpForwarderBuilder {
}
}

ip_forwarder_service.run().await
}
}

struct IpForwarderService {
config: Config,

Check warning on line 142 in service-providers/ip-forwarder/src/lib.rs

View workflow job for this annotation

GitHub Actions / clippy

fields `config`, `tun`, `tun_task_tx`, and `tun_task_response_rx` are never read

warning: fields `config`, `tun`, `tun_task_tx`, and `tun_task_response_rx` are never read --> service-providers/ip-forwarder/src/lib.rs:142:5 | 141 | struct IpForwarderService { | ------------------ fields in this struct 142 | config: Config, | ^^^^^^ 143 | tun: nym_wireguard::tun_device::TunDevice, | ^^^ 144 | tun_task_tx: nym_wireguard::tun_task_channel::TunTaskTx, | ^^^^^^^^^^^ 145 | tun_task_response_rx: nym_wireguard::tun_task_channel::TunTaskResponseRx, | ^^^^^^^^^^^^^^^^^^^^ | = note: `#[warn(dead_code)]` on by default
tun: nym_wireguard::tun_device::TunDevice,
tun_task_tx: nym_wireguard::tun_task_channel::TunTaskTx,
tun_task_response_rx: nym_wireguard::tun_task_channel::TunTaskResponseRx,
mixnet_client: nym_sdk::mixnet::MixnetClient,
task_handle: TaskHandle,
}

impl IpForwarderService {
async fn run(mut self) -> Result<(), IpForwarderError> {
let mut task_client = self.task_handle.fork("main_loop");
while !task_client.is_shutdown() {
tokio::select! {
_ = task_client.recv() => {
log::debug!("IpForwarderService [main loop]: received shutdown");
},
msg = self.mixnet_client.next() => match msg {
Some(msg) => self.on_message(msg).await,
None => {
log::trace!("IpForwarderService [main loop]: stopping since channel closed");
break;
},
}
}
}
log::info!("IpForwarderService: stopping");
Ok(())
}

async fn on_message(&mut self, reconstructed: ReconstructedMessage) {

Check warning on line 171 in service-providers/ip-forwarder/src/lib.rs

View workflow job for this annotation

GitHub Actions / clippy

unused variable: `reconstructed`

warning: unused variable: `reconstructed` --> service-providers/ip-forwarder/src/lib.rs:171:36 | 171 | async fn on_message(&mut self, reconstructed: ReconstructedMessage) { | ^^^^^^^^^^^^^ help: if this is intentional, prefix it with an underscore: `_reconstructed` | = note: `#[warn(unused_variables)]` on by default
todo!();
}
}
Expand Down

0 comments on commit 46e2a74

Please sign in to comment.