Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create IpPacketRouter #4068

Merged
merged 11 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 14 additions & 6 deletions common/wireguard/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod packet_relayer;
mod platform;
mod registered_peers;
mod setup;
mod tun_task_channel;
pub mod tun_task_channel;
mod udp_listener;
mod wg_tunnel;

Expand All @@ -20,7 +20,7 @@ use std::sync::Arc;

// Currently the module related to setting up the virtual network device is platform specific.
#[cfg(target_os = "linux")]
use platform::linux::tun_device;
pub use platform::linux::tun_device;

/// Start wireguard UDP listener and TUN device
///
Expand All @@ -32,16 +32,24 @@ pub async fn start_wireguard(
task_client: nym_task::TaskClient,
gateway_client_registry: Arc<GatewayClientRegistry>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
// We can either index peers by their IP like standard wireguard
// TODO: make this configurable

// We can optionally index peers by their IP like standard wireguard. If we don't then we do
// plain NAT where we match incoming destination IP with outgoing source IP.
let peers_by_ip = Arc::new(tokio::sync::Mutex::new(network_table::NetworkTable::new()));

// ... or by their tunnel tag, which is a random number assigned to them
let peers_by_tag = Arc::new(tokio::sync::Mutex::new(wg_tunnel::PeersByTag::new()));
// Alternative 1:
let routing_mode = tun_device::RoutingMode::new_allowed_ips(peers_by_ip.clone());
// Alternative 2:
//let routing_mode = tun_device::RoutingMode::new_nat();

// Start the tun device that is used to relay traffic outbound
let (tun, tun_task_tx, tun_task_response_rx) = tun_device::TunDevice::new(peers_by_ip.clone());
let (tun, tun_task_tx, tun_task_response_rx) = tun_device::TunDevice::new(routing_mode);
tun.start();

// We also index peers by a tag
let peers_by_tag = Arc::new(tokio::sync::Mutex::new(wg_tunnel::PeersByTag::new()));

// If we want to have the tun device on a separate host, it's the tun_task and
// tun_task_response channels that needs to be sent over the network to the host where the tun
// device is running.
Expand Down
2 changes: 1 addition & 1 deletion common/wireguard/src/platform/linux/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pub(crate) mod tun_device;
pub mod tun_device;
82 changes: 55 additions & 27 deletions common/wireguard/src/platform/linux/tun_device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,42 @@ pub struct TunDevice {
// And when we get replies, this is where we should send it
tun_task_response_tx: TunTaskResponseTx,

routing_mode: RoutingMode,
}

pub enum RoutingMode {
// The routing table, as how wireguard does it
peers_by_ip: Arc<tokio::sync::Mutex<PeersByIp>>,
AllowedIps(AllowedIpsInner),

// This is an alternative to the routing table, where we just match outgoing source IP with
// incoming destination IP.
Nat(NatInner),
}

impl RoutingMode {
pub fn new_nat() -> Self {
RoutingMode::Nat(NatInner {
nat_table: HashMap::new(),
})
}

pub fn new_allowed_ips(peers_by_ip: Arc<tokio::sync::Mutex<PeersByIp>>) -> Self {
RoutingMode::AllowedIps(AllowedIpsInner { peers_by_ip })
}
}

pub struct AllowedIpsInner {
peers_by_ip: Arc<tokio::sync::Mutex<PeersByIp>>,
}

pub struct NatInner {
nat_table: HashMap<IpAddr, u64>,
}

impl TunDevice {
pub fn new(
peers_by_ip: Arc<tokio::sync::Mutex<PeersByIp>>,
routing_mode: RoutingMode,
// peers_by_ip: Option<Arc<tokio::sync::Mutex<PeersByIp>>>,
) -> (Self, TunTaskTx, TunTaskResponseRx) {
let tun = setup_tokio_tun_device(
format!("{TUN_BASE_NAME}%d").as_str(),
Expand All @@ -69,8 +94,7 @@ impl TunDevice {
tun_task_rx,
tun_task_response_tx,
tun,
peers_by_ip,
nat_table: HashMap::new(),
routing_mode,
};

(tun_device, tun_task_tx, tun_task_response_rx)
Expand All @@ -93,7 +117,9 @@ impl TunDevice {
);

// TODO: expire old entries
self.nat_table.insert(src_addr, tag);
if let RoutingMode::Nat(nat_table) = &mut self.routing_mode {
nat_table.nat_table.insert(src_addr, tag);
}

self.tun
.write_all(&packet)
Expand Down Expand Up @@ -121,30 +147,32 @@ impl TunDevice {

// Route packet to the correct peer.

// This is how wireguard does it, by consulting the AllowedIPs table.
if false {
let peers = self.peers_by_ip.lock().await;
if let Some(peer_tx) = peers.longest_match(dst_addr).map(|(_, tx)| tx) {
log::info!("Forward packet to wg tunnel");
peer_tx
.send(Event::Ip(packet.to_vec().into()))
.await
.tap_err(|err| log::error!("{err}"))
.ok();
return;
match self.routing_mode {
// This is how wireguard does it, by consulting the AllowedIPs table.
RoutingMode::AllowedIps(ref peers_by_ip) => {
let peers = peers_by_ip.peers_by_ip.as_ref().lock().await;
if let Some(peer_tx) = peers.longest_match(dst_addr).map(|(_, tx)| tx) {
log::info!("Forward packet to wg tunnel");
peer_tx
.send(Event::Ip(packet.to_vec().into()))
.await
.tap_err(|err| log::error!("{err}"))
.ok();
return;
}
}
}

// But we do it by consulting the NAT table.
{
if let Some(tag) = self.nat_table.get(&dst_addr) {
log::info!("Forward packet to wg tunnel with tag: {tag}");
self.tun_task_response_tx
.send((*tag, packet.to_vec()))
.await
.tap_err(|err| log::error!("{err}"))
.ok();
return;
// But we do it by consulting the NAT table.
RoutingMode::Nat(ref nat_table) => {
if let Some(tag) = nat_table.nat_table.get(&dst_addr) {
log::info!("Forward packet to wg tunnel with tag: {tag}");
self.tun_task_response_tx
.send((*tag, packet.to_vec()))
.await
.tap_err(|err| log::error!("{err}"))
.ok();
return;
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions common/wireguard/src/tun_task_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub struct TunTaskTx(mpsc::Sender<TunTaskPayload>);
pub(crate) struct TunTaskRx(mpsc::Receiver<TunTaskPayload>);

impl TunTaskTx {
pub(crate) async fn send(
pub async fn send(
&self,
data: TunTaskPayload,
) -> Result<(), tokio::sync::mpsc::error::SendError<TunTaskPayload>> {
Expand Down Expand Up @@ -40,7 +40,7 @@ impl TunTaskResponseTx {
}

impl TunTaskResponseRx {
pub(crate) async fn recv(&mut self) -> Option<TunTaskPayload> {
pub async fn recv(&mut self) -> Option<TunTaskPayload> {
self.0.recv().await
}
}
Expand Down
4 changes: 2 additions & 2 deletions gateway/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::node::storage::error::StorageError;
use nym_ip_packet_router::error::IpForwarderError;
use nym_ip_packet_router::error::IpPacketRouterError;
use nym_network_requester::error::{ClientCoreError, NetworkRequesterError};
use nym_validator_client::nyxd::error::NyxdError;
use nym_validator_client::nyxd::AccountId;
Expand Down Expand Up @@ -110,7 +110,7 @@ pub(crate) enum GatewayError {
#[error("there was an issue with the local ip packet router: {source}")]
IpPacketRouterFailure {
#[from]
source: IpForwarderError,
source: IpPacketRouterError,
},

#[error("failed to startup local network requester")]
Expand Down
11 changes: 6 additions & 5 deletions gateway/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,12 @@ impl<St> Gateway<St> {

// TODO: well, wire it up internally to gateway traffic, shutdowns, etc.
let (on_start_tx, on_start_rx) = oneshot::channel();
let mut ip_builder = nym_ip_packet_router::IpForwarderBuilder::new(ip_opts.config.clone())
.with_shutdown(shutdown)
.with_custom_gateway_transceiver(Box::new(transceiver))
.with_wait_for_gateway(true)
.with_on_start(on_start_tx);
let mut ip_builder =
nym_ip_packet_router::IpPacketRouterBuilder::new(ip_opts.config.clone())
.with_shutdown(shutdown)
.with_custom_gateway_transceiver(Box::new(transceiver))
.with_wait_for_gateway(true)
.with_on_start(on_start_tx);

if let Some(custom_mixnet) = &ip_opts.custom_mixnet_path {
ip_builder = ip_builder.with_stored_topology(custom_mixnet)?
Expand Down
5 changes: 5 additions & 0 deletions service-providers/ip-packet-router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition.workspace = true
license.workspace = true

[dependencies]
etherparse = "0.13.0"
futures = { workspace = true }
log = { workspace = true }
nym-bin-common = { path = "../../common/bin-common" }
Expand All @@ -18,6 +19,10 @@ nym-sdk = { path = "../../sdk/rust/nym-sdk" }
nym-service-providers-common = { path = "../common" }
nym-sphinx = { path = "../../common/nymsphinx" }
nym-task = { path = "../../common/task" }
nym-wireguard = { path = "../../common/wireguard" }
nym-wireguard-types = { path = "../../common/wireguard-types" }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tap.workspace = true
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] }
22 changes: 11 additions & 11 deletions service-providers/ip-packet-router/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,28 @@ use std::{
path::{Path, PathBuf},
};

use crate::config::persistence::IpForwarderPaths;
use crate::config::persistence::IpPacketRouterPaths;

use self::template::CONFIG_TEMPLATE;

mod persistence;
mod template;

const DEFAULT_IP_FORWARDERS_DIR: &str = "ip-forwarder";
const DEFAULT_IP_PACKET_ROUTER_DIR: &str = "ip-packet-router";

/// Derive default path to ip forwarder's config directory.
/// It should get resolved to `$HOME/.nym/service-providers/ip-forwareder/<id>/config`
/// Derive default path to ip packet routers' config directory.
/// It should get resolved to `$HOME/.nym/service-providers/ip-packet-router/<id>/config`
pub fn default_config_directory<P: AsRef<Path>>(id: P) -> PathBuf {
must_get_home()
.join(NYM_DIR)
.join(DEFAULT_SERVICE_PROVIDERS_DIR)
.join(DEFAULT_IP_FORWARDERS_DIR)
.join(DEFAULT_IP_PACKET_ROUTER_DIR)
.join(id)
.join(DEFAULT_CONFIG_DIR)
}

/// Derive default path to ip forwarder's config file.
/// It should get resolved to `$HOME/.nym/service-providers/ip-forwarder/<id>/config/config.toml`
/// Derive default path to ip packet routers' config file.
/// It should get resolved to `$HOME/.nym/service-providers/ip-packet-router/<id>/config/config.toml`
pub fn default_config_filepath<P: AsRef<Path>>(id: P) -> PathBuf {
default_config_directory(id).join(DEFAULT_CONFIG_FILENAME)
}
Expand All @@ -44,7 +44,7 @@ pub fn default_data_directory<P: AsRef<Path>>(id: P) -> PathBuf {
must_get_home()
.join(NYM_DIR)
.join(DEFAULT_SERVICE_PROVIDERS_DIR)
.join(DEFAULT_IP_FORWARDERS_DIR)
.join(DEFAULT_IP_PACKET_ROUTER_DIR)
.join(id)
.join(DEFAULT_DATA_DIR)
}
Expand All @@ -55,7 +55,7 @@ pub struct Config {
#[serde(flatten)]
pub base: BaseClientConfig,

pub storage_paths: IpForwarderPaths,
pub storage_paths: IpPacketRouterPaths,

pub logging: LoggingSettings,
}
Expand All @@ -70,13 +70,13 @@ impl Config {
pub fn new<S: AsRef<str>>(id: S) -> Self {
Config {
base: BaseClientConfig::new(id.as_ref(), env!("CARGO_PKG_VERSION")),
storage_paths: IpForwarderPaths::new_base(default_data_directory(id.as_ref())),
storage_paths: IpPacketRouterPaths::new_base(default_data_directory(id.as_ref())),
logging: Default::default(),
}
}

pub fn with_data_directory<P: AsRef<Path>>(mut self, data_directory: P) -> Self {
self.storage_paths = IpForwarderPaths::new_base(data_directory);
self.storage_paths = IpPacketRouterPaths::new_base(data_directory);
self
}

Expand Down
8 changes: 4 additions & 4 deletions service-providers/ip-packet-router/src/config/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@ use std::path::{Path, PathBuf};
pub const DEFAULT_DESCRIPTION_FILENAME: &str = "description.toml";

#[derive(Debug, Deserialize, PartialEq, Eq, Serialize, Clone)]
pub struct IpForwarderPaths {
pub struct IpPacketRouterPaths {
#[serde(flatten)]
pub common_paths: CommonClientPaths,

/// Location of the file containing our description
pub ip_forwarder_description: PathBuf,
pub ip_packet_router_description: PathBuf,
}

impl IpForwarderPaths {
impl IpPacketRouterPaths {
pub fn new_base<P: AsRef<Path>>(base_data_directory: P) -> Self {
let base_dir = base_data_directory.as_ref();

Self {
common_paths: CommonClientPaths::new_base(base_dir),
ip_forwarder_description: base_dir.join(DEFAULT_DESCRIPTION_FILENAME),
ip_packet_router_description: base_dir.join(DEFAULT_DESCRIPTION_FILENAME),
}
}
}
2 changes: 1 addition & 1 deletion service-providers/ip-packet-router/src/config/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ allowed_list_location = '{{ storage_paths.allowed_list_location }}'
unknown_list_location = '{{ storage_paths.unknown_list_location }}'

# Path to file containing description of this network-requester.
ip_forwarder_description = '{{ storage_paths.ip_forwarder_description }}'
ip_packet_router_description = '{{ storage_paths.ip_packet_router_description }}'


##### logging configuration options #####
Expand Down
10 changes: 8 additions & 2 deletions service-providers/ip-packet-router/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub use nym_client_core::error::ClientCoreError;

#[derive(thiserror::Error, Debug)]
pub enum IpForwarderError {
pub enum IpPacketRouterError {
#[error("I/O error: {0}")]
IoError(#[from] std::io::Error),

Expand All @@ -12,7 +12,7 @@ pub enum IpForwarderError {
FailedToLoadConfig(String),

// TODO: add more details here
#[error("Failed to validate the loaded config")]
#[error("failed to validate the loaded config")]
ConfigValidationFailure,

#[error("failed local version check, client and config mismatch")]
Expand All @@ -26,4 +26,10 @@ pub enum IpForwarderError {

#[error("the entity wrapping the network requester has disconnected")]
DisconnectedParent,

#[error("failed to parse incoming packet: {source}")]
PacketParseFailed { source: etherparse::ReadError },

#[error("parsed packet is missing IP header")]
PacketMissingHeader,
}
Loading
Loading