Skip to content

Commit

Permalink
Ensure both nodes have a filesystem backend when transfering files (#212
Browse files Browse the repository at this point in the history
)

* Ensure both nodes have a filesystem backend when transferring files
  • Loading branch information
tbraun96 authored Apr 6, 2024
1 parent 78df8a6 commit e3b9ee2
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 20 deletions.
1 change: 1 addition & 0 deletions .github/workflows/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ jobs:
- run: cargo clippy --features=webrtc,sql,redis,multi-threaded --release -- -D warnings
- run: cargo clippy --features=webrtc,sql,redis -- -D warnings
- run: cargo clippy --features=webrtc,sql,redis --release -- -D warnings
- run: cargo clippy --tests -- -D warnings
- run: cargo fmt --check
- run: RUSTDOCFLAGS="-D warnings" cargo make docs
- run: cargo test --package citadel_sdk --doc
Expand Down
11 changes: 10 additions & 1 deletion citadel_proto/src/proto/packet_crafter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,12 +448,14 @@ pub(crate) mod do_connect {
use citadel_types::crypto::SecurityLevel;
use citadel_types::user::MutualPeer;
use citadel_user::auth::proposed_credentials::ProposedCredentials;
use citadel_user::backend::BackendType;
use citadel_user::serialization::SyncIO;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
pub struct DoConnectStage0Packet {
pub proposed_credentials: ProposedCredentials,
pub uses_filesystem: bool,
}

/// Alice receives the nonce from Bob. She must now inscribe her username/password
Expand All @@ -463,6 +465,7 @@ pub(crate) mod do_connect {
proposed_credentials: ProposedCredentials,
timestamp: i64,
security_level: SecurityLevel,
backend_type: &BackendType,
) -> BytesMut {
let header = HdpHeader {
protocol_version: (*crate::constants::PROTOCOL_VERSION).into(),
Expand All @@ -480,8 +483,11 @@ pub(crate) mod do_connect {
target_cid: U64::new(0),
};

let uses_filesystem = matches!(backend_type, BackendType::Filesystem(..));

let payload = DoConnectStage0Packet {
proposed_credentials,
uses_filesystem,
};

let mut packet =
Expand Down Expand Up @@ -526,6 +532,7 @@ pub(crate) mod do_connect {
peers: Vec<MutualPeer>,
timestamp: i64,
security_level: SecurityLevel,
backend_type: &BackendType,
) -> BytesMut {
let payload = DoConnectFinalStatusPacket {
mailbox,
Expand All @@ -540,14 +547,16 @@ pub(crate) mod do_connect {
packet_flags::cmd::aux::do_connect::FAILURE
};

let is_filesystem = matches!(backend_type, BackendType::Filesystem(..));

let header = HdpHeader {
protocol_version: (*crate::constants::PROTOCOL_VERSION).into(),
cmd_primary: packet_flags::cmd::primary::DO_CONNECT,
cmd_aux,
algorithm: 0,
security_level: security_level.value(),
context_info: U128::new(0),
group: U64::new(0),
group: U64::new(is_filesystem as u64),
wave_id: U32::new(0),
session_cid: U64::new(hyper_ratchet.get_cid()),
drill_version: U32::new(hyper_ratchet.version()),
Expand Down
23 changes: 20 additions & 3 deletions citadel_proto/src/proto/packet_processor/connect_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::error::NetworkError;
use crate::proto::node_result::{ConnectFail, ConnectSuccess, MailboxDelivery};
use crate::proto::packet_processor::primary_group_packet::get_proper_hyper_ratchet;
use citadel_types::proto::ConnectMode;
use citadel_user::backend::BackendType;
use citadel_user::external_services::ServicesObject;
use std::sync::atomic::Ordering;

Expand Down Expand Up @@ -54,12 +55,18 @@ pub async fn process_connect(
match header.cmd_aux {
// Node is Bob. Bob gets the encrypted username and password (separately encrypted)
packet_flags::cmd::aux::do_connect::STAGE0 => {
log::trace!(target: "citadel", "STAGE 2 CONNECT PACKET");
log::trace!(target: "citadel", "STAGE 0 CONNECT PACKET");
let task = {
match validation::do_connect::validate_stage0_packet(&cnac, &payload).await {
Ok(_) => {
Ok(stage0_packet) => {
let mut state_container = inner_mut_state!(session.state_container);

let local_uses_file_system = matches!(
session.account_manager.get_backend_type(),
BackendType::Filesystem(..)
);
session
.file_transfer_compatible
.set_once(local_uses_file_system && stage0_packet.uses_filesystem);
let cid = hyper_ratchet.get_cid();
let success_time = session.time_tracker.get_global_time_ns();
let addr = session.remote_peer;
Expand Down Expand Up @@ -130,6 +137,7 @@ pub async fn process_connect(
peers,
success_time,
security_level,
session.account_manager.get_backend_type(),
);

session.implicated_cid.set(Some(cid));
Expand Down Expand Up @@ -176,6 +184,7 @@ pub async fn process_connect(
Vec::new(),
fail_time,
security_level,
session.account_manager.get_backend_type(),
);
return Ok(PrimaryProcessorResult::ReplyToSender(packet));
}
Expand Down Expand Up @@ -227,6 +236,14 @@ pub async fn process_connect(
let task = {
let mut state_container = inner_mut_state!(session.state_container);
let last_stage = state_container.connect_state.last_stage;
let remote_uses_filesystem = header.group.get() != 0;
let local_uses_file_system = matches!(
session.account_manager.get_backend_type(),
BackendType::Filesystem(..)
);
session
.file_transfer_compatible
.set_once(local_uses_file_system && remote_uses_filesystem);

if last_stage == packet_flags::cmd::aux::do_connect::STAGE1 {
if let Some(payload) =
Expand Down
41 changes: 37 additions & 4 deletions citadel_proto/src/proto/packet_processor/peer/peer_cmd_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use citadel_crypt::stacked_ratchet::constructor::{
use citadel_crypt::stacked_ratchet::StackedRatchet;
use citadel_crypt::toolset::Toolset;
use citadel_types::proto::UdpMode;
use citadel_user::backend::BackendType;
use citadel_user::serialization::SyncIO;
use netbeam::sync::RelativeNodeType;

Expand Down Expand Up @@ -376,9 +377,18 @@ pub async fn process_peer_cmd(
let bob_transfer =
return_if_none!(transfer.serialize_to_vector().ok());

let local_is_file_transfer_compat = matches!(
session.account_manager.get_backend_type(),
BackendType::Filesystem(..)
);

let signal = PeerSignal::Kex {
peer_conn_type: conn.reverse(),
kex_payload: KeyExchangeProcess::Stage1(bob_transfer, None),
kex_payload: KeyExchangeProcess::Stage1(
bob_transfer,
None,
local_is_file_transfer_compat,
),
};

let mut state_container_kem = PeerKemStateContainer::new(
Expand All @@ -401,7 +411,11 @@ pub async fn process_peer_cmd(
Ok(PrimaryProcessorResult::ReplyToSender(stage1_kem))
}

KeyExchangeProcess::Stage1(transfer, Some(bob_nat_info)) => {
KeyExchangeProcess::Stage1(
transfer,
Some(bob_nat_info),
peer_file_transfer_compat,
) => {
// Here, we finalize the creation of the pqc for alice, and then, generate the new toolset
// The toolset gets encrypted to ensure the central server doesn't see the toolset. This is
// to combat a "chinese communist hijack" scenario wherein a rogue government takes over our
Expand Down Expand Up @@ -461,6 +475,10 @@ pub async fn process_peer_cmd(
);
log::trace!(target: "citadel", "[STUN] Peer public addr: {:?} || needs TURN? {}", &bob_predicted_socket_addr, needs_turn);
let udp_rx_opt = kem_state.udp_channel_sender.rx.take();
let local_is_file_transfer_compat = matches!(
session.account_manager.get_backend_type(),
BackendType::Filesystem(..)
);

let channel = state_container
.insert_new_peer_virtual_connection_as_endpoint(
Expand All @@ -471,6 +489,8 @@ pub async fn process_peer_cmd(
vconn_type,
peer_crypto,
session,
local_is_file_transfer_compat
&& *peer_file_transfer_compat,
);
// load the channel now that the keys have been exchanged

Expand All @@ -487,6 +507,7 @@ pub async fn process_peer_cmd(
kex_payload: KeyExchangeProcess::Stage2(
sync_time_ns,
None,
local_is_file_transfer_compat,
),
};

Expand Down Expand Up @@ -580,7 +601,11 @@ pub async fn process_peer_cmd(
Ok(PrimaryProcessorResult::Void)
}

KeyExchangeProcess::Stage2(sync_time_ns, Some(alice_nat_info)) => {
KeyExchangeProcess::Stage2(
sync_time_ns,
Some(alice_nat_info),
peer_file_transfer_compat,
) => {
// NEW UPDATE: now that we know the other side successfully created its toolset,
// calculate sync time then begin the hole punch subroutine
log::trace!(target: "citadel", "RECV STAGE 2 PEER KEM");
Expand Down Expand Up @@ -625,7 +650,13 @@ pub async fn process_peer_cmd(
alice_nat_info.generate_proper_listener_connect_addr(
&session.local_nat_type,
);
let local_is_file_transfer_compat = matches!(
session.account_manager.get_backend_type(),
BackendType::Filesystem(..)
);

log::trace!(target: "citadel", "[STUN] Peer public addr: {:?} || needs TURN? {}", &alice_predicted_socket_addr, needs_turn);

let channel = state_container
.insert_new_peer_virtual_connection_as_endpoint(
alice_predicted_socket_addr,
Expand All @@ -635,6 +666,8 @@ pub async fn process_peer_cmd(
vconn_type,
peer_crypto,
session,
local_is_file_transfer_compat
&& *peer_file_transfer_compat,
);

log::trace!(target: "citadel", "Virtual connection forged on endpoint tuple {} -> {}", this_cid, peer_cid);
Expand Down Expand Up @@ -806,7 +839,7 @@ async fn process_signal_command_as_server(
};

match &mut kep {
KeyExchangeProcess::Stage1(_, val) | KeyExchangeProcess::Stage2(_, val) => {
KeyExchangeProcess::Stage1(_, val, _) | KeyExchangeProcess::Stage2(_, val, _) => {
*val = Some(peer_nat_info);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ fn begin_connect_process(
proposed_credentials,
timestamp,
security_level,
session.account_manager.get_backend_type(),
);
state_container.connect_state.last_stage = packet_flags::cmd::aux::do_connect::STAGE1;
// we now store the pqc temporarily in the state container
Expand Down
8 changes: 4 additions & 4 deletions citadel_proto/src/proto/peer/peer_crypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use std::net::SocketAddr;
pub enum KeyExchangeProcess {
// alice sends public key
Stage0(Vec<u8>, SessionSecuritySettings, UdpMode),
// Bob sends ciphertext, addr
Stage1(Vec<u8>, Option<PeerNatInfo>),
// Alice sends a sync time over. Server takes care of external addr
Stage2(i64, Option<PeerNatInfo>),
// Bob sends ciphertext, addr, file transfer compatibility
Stage1(Vec<u8>, Option<PeerNatInfo>, bool),
// Alice sends a sync time over. Server takes care of external addr. Includes file transfer compat
Stage2(i64, Option<PeerNatInfo>, bool),
// The hole-punch failed
HolePunchFailed,
}
Expand Down
14 changes: 12 additions & 2 deletions citadel_proto/src/proto/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ pub struct HdpSessionInner {
pub(super) hypernode_peer_layer: HyperNodePeerLayer,
pub(super) stun_servers: Option<Vec<String>>,
pub(super) init_time: Instant,
pub(super) file_transfer_compatible: DualLateInit<bool>,
on_drop: UnboundedSender<()>,
}

Expand Down Expand Up @@ -392,6 +393,7 @@ impl HdpSession {
client_config,
stun_servers,
init_time,
file_transfer_compatible: DualLateInit::default(),
};

if let Some(proposed_credentials) = session_init_params
Expand Down Expand Up @@ -1205,7 +1207,6 @@ impl HdpSession {
security_level: SecurityLevel,
) -> Result<(), NetworkError> {
self.ensure_connected(&ticket)?;

let mut state_container = inner_mut_state!(self.state_container);
let ts = self.time_tracker.get_global_time_ns();

Expand Down Expand Up @@ -1395,6 +1396,10 @@ impl HdpSession {
VirtualTargetType::LocalGroupServer { implicated_cid } => {
// if we are sending this just to the HyperLAN server (in the case of file uploads),
// then, we use this session's pqc, the cnac's latest drill, and 0 for target_cid
if !*self.file_transfer_compatible {
return Err(NetworkError::msg("File transfer is not enabled for this session. Both nodes must use a filesystem backend"));
}

let crypt_container = &mut state_container
.c2s_channel_container
.as_mut()
Expand Down Expand Up @@ -1474,7 +1479,8 @@ impl HdpSession {
peer_cid: target_cid,
} => {
log::trace!(target: "citadel", "Sending HyperLAN peer ({}) <-> HyperLAN Peer ({})", implicated_cid, target_cid);
// here, we don't use the base session's PQC. Instead, we use the vconn's pqc and
// here, we don't use the base session's PQC. Instead, we use the c2s vconn's pqc to ensure the peer can't access the contents
// of the file
let crypt_container_c2s = &state_container
.c2s_channel_container
.as_ref()
Expand All @@ -1488,6 +1494,10 @@ impl HdpSession {
let endpoint_container =
state_container.get_peer_endpoint_container_mut(target_cid)?;

if !endpoint_container.file_transfer_compatible {
return Err(NetworkError::msg("File transfer is not enabled for this p2p session. Both nodes must use a filesystem backend"));
}

let object_id = endpoint_container
.endpoint_crypto
.get_and_increment_object_id();
Expand Down
3 changes: 3 additions & 0 deletions citadel_proto/src/proto/state_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ pub struct EndpointChannelContainer<R: Ratchet = StackedRatchet> {
pub(crate) to_unordered_channel: Option<UnorderedChannelContainer>,
#[allow(dead_code)]
pub(crate) peer_socket_addr: SocketAddr,
pub(crate) file_transfer_compatible: bool,
}

pub struct C2SChannelContainer<R: Ratchet = StackedRatchet> {
Expand Down Expand Up @@ -788,6 +789,7 @@ impl StateContainerInner {
connection_type: VirtualConnectionType,
endpoint_crypto: PeerSessionCrypto,
sess: &HdpSession,
file_transfer_compatible: bool,
) -> PeerChannel {
let (channel_tx, channel_rx) = unbounded();
let (tx, rx) = crate::proto::outbound_sender::channel(MAX_OUTGOING_UNPROCESSED_REQUESTS);
Expand Down Expand Up @@ -817,6 +819,7 @@ impl StateContainerInner {
to_default_channel: to_channel,
to_unordered_channel: None,
peer_socket_addr,
file_transfer_compatible,
});

let vconn = VirtualConnection {
Expand Down
6 changes: 3 additions & 3 deletions citadel_proto/src/proto/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ pub(crate) mod do_connect {
pub(crate) async fn validate_stage0_packet(
cnac: &ClientNetworkAccount,
payload: &[u8],
) -> Result<(), NetworkError> {
) -> Result<DoConnectStage0Packet, NetworkError> {
// Now, validate the username and password. The payload is already decrypted
let payload = DoConnectStage0Packet::deserialize_from_vector(payload)
.map_err(|err| NetworkError::Generic(err.into_string()))?;
cnac.validate_credentials(payload.proposed_credentials)
cnac.validate_credentials(payload.proposed_credentials.clone())
.await
.map_err(|err| NetworkError::Generic(err.into_string()))?;
log::trace!(target: "citadel", "Success validating credentials!");
Ok(())
Ok(payload)
}

pub(crate) fn validate_final_status_packet(
Expand Down
2 changes: 1 addition & 1 deletion citadel_sdk/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ mod tests {
SigAlgorithm::Falcon1024
)]
#[tokio::test]
async fn test_c2s_file_transfer_revfs(
async fn test_c2s_file_transfer_revfsq(
#[case] enx: EncryptionAlgorithm,
#[case] kem: KemAlgorithm,
#[case] sig: SigAlgorithm,
Expand Down
3 changes: 1 addition & 2 deletions citadel_user/src/backend/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ impl DerefMut for FileTransferStarter {

impl Drop for FileTransferStarter {
fn drop(&mut self) {
// TODO: Remove this once debugging complete
if self.inner.is_some() {
log::error!(target: "citadel", "FileTransferStarter dropped without being used");
log::warn!(target: "citadel", "FileTransferStarter dropped without being used");
}
}
}
Expand Down

0 comments on commit e3b9ee2

Please sign in to comment.