Skip to content

Commit

Permalink
wip: on_message
Browse files Browse the repository at this point in the history
  • Loading branch information
octol committed Oct 27, 2023
1 parent 46e2a74 commit a955ef8
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 14 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions common/wireguard/src/tun_task_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub struct TunTaskTx(mpsc::Sender<TunTaskPayload>);
pub(crate) struct TunTaskRx(mpsc::Receiver<TunTaskPayload>);

impl TunTaskTx {
pub(crate) async fn send(
pub async fn send(
&self,
data: TunTaskPayload,
) -> Result<(), tokio::sync::mpsc::error::SendError<TunTaskPayload>> {
Expand Down Expand Up @@ -40,7 +40,7 @@ impl TunTaskResponseTx {
}

impl TunTaskResponseRx {
pub(crate) async fn recv(&mut self) -> Option<TunTaskPayload> {
pub async fn recv(&mut self) -> Option<TunTaskPayload> {
self.0.recv().await
}
}
Expand Down
2 changes: 2 additions & 0 deletions service-providers/ip-forwarder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition.workspace = true
license.workspace = true

[dependencies]
etherparse = "0.13.0"
futures = { workspace = true }
log = { workspace = true }
nym-bin-common = { path = "../../common/bin-common" }
Expand All @@ -22,5 +23,6 @@ nym-wireguard = { path = "../../common/wireguard" }
nym-wireguard-types = { path = "../../common/wireguard-types" }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tap.workspace = true
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] }
10 changes: 9 additions & 1 deletion service-providers/ip-forwarder/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub enum IpForwarderError {
FailedToLoadConfig(String),

// TODO: add more details here
#[error("Failed to validate the loaded config")]
#[error("failed to validate the loaded config")]
ConfigValidationFailure,

#[error("failed local version check, client and config mismatch")]
Expand All @@ -26,4 +26,12 @@ pub enum IpForwarderError {

#[error("the entity wrapping the network requester has disconnected")]
DisconnectedParent,

#[error("failed to parse incoming packet: {source}")]
PacketParseFailed {
source: etherparse::ReadError,
},

#[error("parsed packet is missing IP header")]
PacketMissingHeader,
}
80 changes: 69 additions & 11 deletions service-providers/ip-forwarder/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use std::path::Path;
use std::{net::IpAddr, path::Path};

use error::IpForwarderError;
use futures::{channel::oneshot, StreamExt};
use nym_client_core::{
client::mix_traffic::transceiver::GatewayTransceiver,
config::disk_persistence::CommonClientPaths, HardcodedTopologyProvider, TopologyProvider,
};
use nym_sdk::{mixnet::Recipient, NymNetworkDetails};
use nym_sdk::{
mixnet::{InputMessage, MixnetMessageSender, Recipient},
NymNetworkDetails,
};
use nym_sphinx::receiver::ReconstructedMessage;
use nym_task::{TaskClient, TaskHandle};
use tap::TapFallible;

use crate::config::BaseClientConfig;

Expand Down Expand Up @@ -115,7 +119,7 @@ impl IpForwarderBuilder {
let (tun, tun_task_tx, tun_task_response_rx) =
nym_wireguard::tun_device::TunDevice::new(None);

let ip_forwarder_service = IpForwarderService {
let ip_forwarder_service = IpForwarder {
config: self.config,
tun,
tun_task_tx,
Expand All @@ -138,7 +142,7 @@ impl IpForwarderBuilder {
}
}

struct IpForwarderService {
struct IpForwarder {
config: Config,
tun: nym_wireguard::tun_device::TunDevice,
tun_task_tx: nym_wireguard::tun_task_channel::TunTaskTx,
Expand All @@ -147,29 +151,83 @@ struct IpForwarderService {
task_handle: TaskHandle,
}

impl IpForwarderService {
impl IpForwarder {
async fn run(mut self) -> Result<(), IpForwarderError> {
let mut task_client = self.task_handle.fork("main_loop");
while !task_client.is_shutdown() {
tokio::select! {
_ = task_client.recv() => {
log::debug!("IpForwarderService [main loop]: received shutdown");
},
msg = self.mixnet_client.next() => match msg {
Some(msg) => self.on_message(msg).await,
None => {
msg = self.mixnet_client.next() => {
if let Some(msg) = msg {
self.on_message(msg).await.ok();
} else {
log::trace!("IpForwarderService [main loop]: stopping since channel closed");
break;
};
},
packet = self.tun_task_response_rx.recv() => {
if let Some((tag, packet)) = msg {
let input_message = InputMessage {
};
self.mixnet_client
.send(input_message)
.await
.tap_err(|err| {
log::error!("IpForwarderService [main loop]: failed to send packet to mixnet: {err}");
})
.ok();
} else {
log::trace!("IpForwarderService [main loop]: stopping since channel closed");
break;
},
}
}

}
}
log::info!("IpForwarderService: stopping");
Ok(())
}

async fn on_message(&mut self, reconstructed: ReconstructedMessage) {
todo!();
async fn on_message(
&mut self,
reconstructed: ReconstructedMessage,
) -> Result<(), IpForwarderError> {
log::info!("Received message: {:?}", reconstructed.sender_tag);

let headers = etherparse::SlicedPacket::from_ip(&reconstructed.message).map_err(|err| {
log::warn!("Received non-IP packet: {err}");
IpForwarderError::PacketParseFailed { source: err }
})?;

let (src_addr, dst_addr): (IpAddr, IpAddr) = match headers.ip {
Some(etherparse::InternetSlice::Ipv4(ipv4_header, _)) => (
ipv4_header.source_addr().into(),
ipv4_header.destination_addr().into(),
),
Some(etherparse::InternetSlice::Ipv6(ipv6_header, _)) => (
ipv6_header.source_addr().into(),
ipv6_header.destination_addr().into(),
),
None => {
log::warn!("Received non-IP packet");
return Err(IpForwarderError::PacketMissingHeader);
}
};
log::info!("Received packet: {src_addr} -> {dst_addr}");

// TODO: set the tag correctly. Can we just reuse sender_tag?
let tag = 0;
self.tun_task_tx
.send((tag, reconstructed.message))
.await
.tap_err(|err| {
log::error!("Failed to send packet to tun device: {err}");
})
.ok();

Ok(())
}
}

Expand Down

0 comments on commit a955ef8

Please sign in to comment.