diff --git a/Cargo.lock b/Cargo.lock index 530c347b85..94c67f1bd1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1020,7 +1020,9 @@ dependencies = [ "curve25519-dalek-derive", "digest", "fiat-crypto", + "rand_core", "rustc_version", + "serde", "subtle", "zeroize", ] @@ -2927,6 +2929,7 @@ version = "0.18.0" dependencies = [ "anyhow", "bytes", + "curve25519-dalek", "derive_more", "ed25519-dalek", "flume", @@ -2947,6 +2950,7 @@ dependencies = [ "rand_core", "redb 2.1.0", "serde", + "sha2", "strum 0.26.2", "tempfile", "test-strategy", diff --git a/iroh-willow/Cargo.toml b/iroh-willow/Cargo.toml index 83c42ce85c..1be238597f 100644 --- a/iroh-willow/Cargo.toml +++ b/iroh-willow/Cargo.toml @@ -40,6 +40,8 @@ tokio-util = { version = "0.7", features = ["io-util", "io"] } tracing = "0.1" zerocopy = { version = "0.8.0-alpha.9", features = ["derive"] } hex = "0.4.3" +curve25519-dalek = { version = "4.1.3", features = ["digest", "rand_core", "serde"] } +sha2 = "0.10.8" [dev-dependencies] iroh-test = { path = "../iroh-test" } diff --git a/iroh-willow/src/actor.rs b/iroh-willow/src/actor.rs index e258e05d07..a5d41b0354 100644 --- a/iroh-willow/src/actor.rs +++ b/iroh-willow/src/actor.rs @@ -389,13 +389,18 @@ impl Actor { } => { let Channels { send, recv } = channels; let id = self.next_session_id(); - let session = Session::new(id, init.mode, our_role, send, initial_transmission); + let session = + Session::new(&self.store, id, our_role, send, init, initial_transmission); + let session = match session { + Ok(session) => session, + Err(err) => return send_reply(reply, Err(err.into())), + }; let store = self.store.clone(); let cancel_token = CancellationToken::new(); let future = session - .run(store, recv, init, cancel_token.clone()) + .run(store, recv, cancel_token.clone()) .instrument(error_span!("session", peer = %peer.fmt_short())); let task_key = self.session_tasks.spawn_local(id, future); @@ -481,6 +486,7 @@ impl Actor { fn complete_session(&mut self, session_id: &SessionId, result: Result<(), Error>) { let session = self.sessions.remove(session_id); if let Some(session) = session { + debug!(?session, ?result, "complete session"); session.on_finish.send(result).ok(); self.session_tasks.remove(&session.task_key); } else { diff --git a/iroh-willow/src/auth.rs b/iroh-willow/src/auth.rs index 23d71a631f..d069dbbc81 100644 --- a/iroh-willow/src/auth.rs +++ b/iroh-willow/src/auth.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeSet, HashMap}, + collections::{BTreeMap, BTreeSet, HashMap}, sync::{Arc, RwLock}, }; @@ -19,6 +19,8 @@ use crate::{ store::traits::{SecretStorage, SecretStoreError, Storage}, }; +pub type InterestMap = BTreeMap>; + #[derive(Debug, Clone)] pub struct DelegateTo { pub user: UserId, @@ -220,7 +222,7 @@ impl Auth { pub fn find_read_caps_for_interests( &self, interests: Interests, - ) -> Result>, AuthError> { + ) -> Result { match interests { Interests::All => { let out = self @@ -230,11 +232,12 @@ impl Auth { let aoi = AreaOfInterest::new(area); (auth, BTreeSet::from_iter([aoi])) }) - .collect::>(); + .collect::>(); Ok(out) } Interests::Some(interests) => { - let mut out: HashMap> = HashMap::new(); + let mut out: BTreeMap> = + BTreeMap::new(); for (cap_selector, aoi_selector) in interests { let cap = self.get_read_cap(&cap_selector)?; if let Some(cap) = cap { diff --git a/iroh-willow/src/net.rs b/iroh-willow/src/net.rs index 1d6420a137..ae3995fce3 100644 --- a/iroh-willow/src/net.rs +++ b/iroh-willow/src/net.rs @@ -105,7 +105,10 @@ impl SessionHandle { pub async fn join(&mut self) -> anyhow::Result<()> { let session_res = self.handle.on_finish().await; let net_tasks_res = join_all(&mut self.tasks).await; - session_res.or(net_tasks_res) + match session_res { + Err(err) => Err(err.into()), + Ok(()) => net_tasks_res, + } } } @@ -232,7 +235,7 @@ async fn recv_loop(mut recv_stream: RecvStream, mut channel_writer: Writer) -> a let max_buffer_size = channel_writer.max_buffer_size(); while let Some(buf) = recv_stream.read_chunk(max_buffer_size, true).await? { channel_writer.write_all(&buf.bytes[..]).await?; - trace!(len = buf.bytes.len(), "recv"); + // trace!(len = buf.bytes.len(), "recv"); } channel_writer.close(); trace!("close"); @@ -241,9 +244,9 @@ async fn recv_loop(mut recv_stream: RecvStream, mut channel_writer: Writer) -> a async fn send_loop(mut send_stream: SendStream, channel_reader: Reader) -> anyhow::Result<()> { while let Some(data) = channel_reader.read_bytes().await { - let len = data.len(); + // let len = data.len(); send_stream.write_chunk(data).await?; - trace!(len, "sent"); + // trace!(len, "sent"); } send_stream.finish().await?; trace!("close"); @@ -279,7 +282,7 @@ async fn join_all(join_set: &mut JoinSet>) -> anyhow::Result< let mut joined = 0; while let Some(res) = join_set.join_next().await { joined += 1; - tracing::trace!("joined {joined} tasks, remaining {}", join_set.len()); + trace!("joined {joined} tasks, remaining {}", join_set.len()); let res = match res { Ok(Ok(())) => Ok(()), Ok(Err(err)) => Err(err), diff --git a/iroh-willow/src/proto/sync.rs b/iroh-willow/src/proto/sync.rs index 06418ad5eb..b887105d47 100644 --- a/iroh-willow/src/proto/sync.rs +++ b/iroh-willow/src/proto/sync.rs @@ -10,7 +10,7 @@ use crate::util::codec::{DecodeOutcome, Decoder, Encoder}; use super::{ grouping::{Area, AreaOfInterest, ThreeDRange}, meadowcap, - willow::{Entry, DIGEST_LENGTH}, + willow::{Entry, NamespaceId, DIGEST_LENGTH}, }; pub const MAX_PAYLOAD_SIZE_POWER: u8 = 12; @@ -55,7 +55,7 @@ pub type SyncSignature = meadowcap::UserSignature; pub type Receiver = meadowcap::UserPublicKey; /// Represents an authorisation to read an area of data in a Namespace. -#[derive(Debug, Clone, Serialize, Deserialize, Hash, Eq, PartialEq)] +#[derive(Debug, Clone, Serialize, Deserialize, Hash, Eq, PartialEq, Ord, PartialOrd)] pub struct ReadAuthorisation(pub ReadCapability, pub Option); impl From for ReadAuthorisation { @@ -76,6 +76,10 @@ impl ReadAuthorisation { pub fn subspace_cap(&self) -> Option<&SubspaceCapability> { self.1.as_ref() } + + pub fn namespace(&self) -> NamespaceId { + self.0.granted_namespace().id() + } } /// The different resource handles employed by the WGPS. @@ -304,7 +308,7 @@ pub enum Message { #[debug("{:?}", _0)] PaiRequestSubspaceCapability(PaiRequestSubspaceCapability), #[debug("{:?}", _0)] - PaiReplySubspaceCapability(PaiReplySubspaceCapability), + PaiReplySubspaceCapability(Box), #[debug("{:?}", _0)] SetupBindStaticToken(SetupBindStaticToken), #[debug("{:?}", _0)] @@ -873,39 +877,40 @@ pub struct ControlFreeHandle { handle_type: HandleType, } -type PsiGroup = (); +pub type PsiGroupBytes = [u8; 32]; + /// Bind data to an IntersectionHandle for performing private area intersection. #[derive(Debug, Serialize, Deserialize)] pub struct PaiBindFragment { /// The result of first applying hash_into_group to some fragment for private area intersection and then performing scalar multiplication with scalar. - group_member: PsiGroup, + pub group_member: PsiGroupBytes, /// Set to true if the private set intersection item is a secondary fragment. - is_secondary: bool, + pub is_secondary: bool, } /// Finalise private set intersection for a single item. #[derive(Debug, Serialize, Deserialize)] pub struct PaiReplyFragment { /// The IntersectionHandle of the PaiBindFragment message which this finalises. - handle: IntersectionHandle, + pub handle: IntersectionHandle, /// The result of performing scalar multiplication between the group_member of the message that this is replying to and scalar. - group_member: PsiGroup, + pub group_member: PsiGroupBytes, } /// Ask the receiver to send a SubspaceCapability. #[derive(Debug, Serialize, Deserialize)] pub struct PaiRequestSubspaceCapability { /// The IntersectionHandle bound by the sender for the least-specific secondary fragment for whose NamespaceId to request the SubspaceCapability. - handle: IntersectionHandle, + pub handle: IntersectionHandle, } /// Send a previously requested SubspaceCapability. #[derive(Debug, Serialize, Deserialize)] pub struct PaiReplySubspaceCapability { /// The handle of the PaiRequestSubspaceCapability message that this answers (hence, an IntersectionHandle bound by the receiver of this message). - handle: IntersectionHandle, + pub handle: IntersectionHandle, /// A SubspaceCapability whose granted namespace corresponds to the request this answers. - capability: SubspaceCapability, + pub capability: SubspaceCapability, /// The SyncSubspaceSignature issued by the receiver of the capability over the sender’s challenge. - signature: SyncSignature, + pub signature: SyncSignature, } diff --git a/iroh-willow/src/proto/willow.rs b/iroh-willow/src/proto/willow.rs index a53b4a9153..e4bc2d3390 100644 --- a/iroh-willow/src/proto/willow.rs +++ b/iroh-willow/src/proto/willow.rs @@ -93,6 +93,11 @@ impl Path { Path(path) } + pub fn from_components(components: &[Component]) -> Self { + let path: Arc<[Component]> = components.to_vec().into(); + Self(path) + } + pub fn validate(components: &[&[u8]]) -> Result<(), InvalidPath> { if components.len() > MAX_COMPONENT_COUNT { return Err(InvalidPath::TooManyComponents); @@ -146,6 +151,14 @@ impl Path { let start = count.min(self.len()); Self::new_unchecked(self[start..].to_vec()) } + + pub fn component_count(&self) -> usize { + self.0.len() + } + + pub fn components(&self) -> &[Component] { + &self.0 + } } impl std::ops::Deref for Path { diff --git a/iroh-willow/src/session.rs b/iroh-willow/src/session.rs index 7db3a95db3..b7457b0e82 100644 --- a/iroh-willow/src/session.rs +++ b/iroh-willow/src/session.rs @@ -12,6 +12,7 @@ use crate::{ pub mod channels; mod data; mod error; +mod pai; mod payload; mod reconciler; mod resource; @@ -113,7 +114,7 @@ pub enum Scope { /// Intersection between two areas of interest. #[derive(Debug, Clone)] -pub struct AreaOfInterestIntersection { +pub struct AoiIntersection { pub our_handle: AreaOfInterestHandle, pub their_handle: AreaOfInterestHandle, pub intersection: Area, diff --git a/iroh-willow/src/session/error.rs b/iroh-willow/src/session/error.rs index 48ff2b9785..c5478fb5bb 100644 --- a/iroh-willow/src/session/error.rs +++ b/iroh-willow/src/session/error.rs @@ -6,6 +6,7 @@ use crate::{ sync::ResourceHandle, willow::Unauthorised, }, + session::{pai::PaiError, resource::MissingResource}, store::traits::SecretStoreError, util::channel::{ReadError, WriteError}, }; @@ -64,6 +65,10 @@ pub enum Error { MissingUserKey(UserId), #[error("a task failed to join")] TaskFailed(#[from] tokio::task::JoinError), + #[error("no known interests for given capability")] + NoKnownInterestsForCapability, + #[error("private area intersection error: {0}")] + Pai(#[from] PaiError), } impl From for Error { @@ -88,3 +93,9 @@ impl From for Error { Self::InvalidParameters("") } } + +impl From for Error { + fn from(value: MissingResource) -> Self { + Self::MissingResource(value.0) + } +} diff --git a/iroh-willow/src/session/pai.rs b/iroh-willow/src/session/pai.rs new file mode 100644 index 0000000000..d4f12c0da5 --- /dev/null +++ b/iroh-willow/src/session/pai.rs @@ -0,0 +1,602 @@ +//! Private Area Intersection finder +//! +//! As defined by the willow spec: [Private Area Intersection](https://willowprotocol.org/specs/pai/index.html) +//! +//! Partly ported from the implementation in earthstar and willow: +//! * https://github.com/earthstar-project/willow-js/blob/0db4b9ec7710fb992ab75a17bd8557040d9a1062/src/wgps/pai/pai_finder.ts +//! * https://github.com/earthstar-project/earthstar/blob/16d6d4028c22fdbb72f7395013b29be7dcd9217a/src/schemes/schemes.ts#L662 +//! Licensed under LGPL and ported into this MIT/Apache codebase with explicit permission +//! from the original author (gwil). + +use std::collections::{HashMap, HashSet}; + +use anyhow::Result; +use curve25519_dalek::{ristretto::CompressedRistretto, RistrettoPoint, Scalar}; +use futures_lite::StreamExt; +use tracing::debug; + +use crate::{ + proto::{ + grouping::SubspaceArea, + sync::{ + IntersectionHandle, IntersectionMessage, PaiBindFragment, PaiReplyFragment, + PaiRequestSubspaceCapability, ReadAuthorisation, ReadCapability, + }, + willow::{NamespaceId, Path, SubspaceId}, + }, + session::{ + channels::MessageReceiver, + resource::{MissingResource, ResourceMap}, + Error, Scope, Session, + }, + store::{traits::Storage, Store}, + util::{codec::Encoder, stream::Cancelable}, +}; + +#[derive(Debug, thiserror::Error)] +pub enum PaiError { + #[error("Partner replied with subspace cap for handle which we never sent a request for")] + SubspaceCapRequestForInvalidHandle, + #[error("Partner replied with subspace capability for the wrong namespace")] + SubspaceCapRequestForWrongNamespace, + #[error("Missing resource {:?}", _0.0)] + MissingResource(#[from] MissingResource), +} + +#[derive(Debug)] +pub enum ToPai { + SubmitAuthorisation(ReadAuthorisation), + ReceivedSubspaceCapRequest(IntersectionHandle), + ReceivedVerifiedSubspaceCapReply(IntersectionHandle, NamespaceId), + ReceivedReadCapForIntersection(IntersectionHandle), +} + +#[derive(Debug)] +pub struct PaiFinder { + session: Session, + store: Store, + scalar: PsiScalar, + fragments_info: HashMap, + our_intersection_handles: ResourceMap, + their_intersection_handles: ResourceMap, + requested_subspace_cap_handles: HashSet, +} + +impl PaiFinder { + pub fn new(session: Session, store: Store) -> Self { + Self { + session, + store, + scalar: PaiScheme::get_scalar(), + our_intersection_handles: Default::default(), + their_intersection_handles: Default::default(), + fragments_info: Default::default(), + requested_subspace_cap_handles: Default::default(), + } + } + + pub async fn run( + mut self, + to_pai: flume::Receiver, + mut recv: Cancelable>, + ) -> Result<(), Error> { + loop { + tokio::select! { + action = to_pai.recv_async() => { + match action { + Err(_) => break, + Ok(action) => self.on_action(action).await? + } + } + message = recv.next() => { + match message { + None => break, + Some(message) => self.on_message(message?).await? + } + } + } + } + Ok(()) + } + + async fn on_message(&mut self, message: IntersectionMessage) -> Result<(), Error> { + debug!("on_message {message:?}"); + match message { + IntersectionMessage::BindFragment(message) => self.receive_bind(message).await?, + IntersectionMessage::ReplyFragment(message) => self.receive_reply(message).await?, + } + Ok(()) + } + + async fn on_action(&mut self, action: ToPai) -> Result<(), Error> { + debug!("on_action {action:?}"); + match action { + ToPai::SubmitAuthorisation(auth) => self.submit_autorisation(auth).await?, + ToPai::ReceivedSubspaceCapRequest(handle) => { + self.received_subspace_cap_request(handle).await? + } + ToPai::ReceivedVerifiedSubspaceCapReply(handle, namespace) => { + self.received_verified_subspace_cap_reply(handle, namespace)? + } + ToPai::ReceivedReadCapForIntersection(handle) => { + self.received_read_cap_for_intersection(handle)? + } + } + Ok(()) + } + + async fn submit_autorisation(&mut self, authorisation: ReadAuthorisation) -> Result<(), Error> { + let read_cap = authorisation.read_cap(); + let fragment_kit = PaiScheme::get_fragment_kit(read_cap); + let fragment_set = fragment_kit.into_fragment_set(); + match fragment_set { + FragmentSet::Complete(pairs) => { + let last = pairs.len().wrapping_sub(1); + for (i, pair) in pairs.into_iter().enumerate() { + let is_most_specific = i == last; + let (namespace_id, path) = pair.clone(); + let (handle, message) = self.submit_fragment(Fragment::Pair(pair), false)?; + let info = LocalFragmentInfo { + on_intersection: IntersectionAction::new_primary(is_most_specific), + authorisation: authorisation.clone(), + namespace_id, + path, + subspace: SubspaceArea::Any, + }; + self.fragments_info.insert(handle, info); + self.session.send(message).await?; + } + } + FragmentSet::Selective { primary, secondary } => { + let last = primary.len().wrapping_sub(1); + for (i, triple) in primary.into_iter().enumerate() { + let is_most_specific = i == last; + let (namespace_id, subspace_id, path) = triple.clone(); + let (handle, message) = + self.submit_fragment(Fragment::Triple(triple), false)?; + let info = LocalFragmentInfo { + on_intersection: IntersectionAction::new_primary(is_most_specific), + authorisation: authorisation.clone(), + namespace_id, + path, + subspace: SubspaceArea::Id(subspace_id), + }; + self.fragments_info.insert(handle, info); + self.session.send(message).await?; + } + let last = secondary.len().wrapping_sub(1); + for (i, pair) in secondary.into_iter().enumerate() { + let is_most_specific = i == last; + let (namespace_id, path) = pair.clone(); + let (handle, message) = self.submit_fragment(Fragment::Pair(pair), true)?; + let info = LocalFragmentInfo { + on_intersection: IntersectionAction::new_secondary(is_most_specific), + authorisation: authorisation.clone(), + namespace_id, + path, + subspace: SubspaceArea::Any, + }; + self.fragments_info.insert(handle, info); + self.session.send(message).await?; + } + } + } + Ok(()) + } + + fn submit_fragment( + &mut self, + fragment: Fragment, + is_secondary: bool, + ) -> Result<(IntersectionHandle, PaiBindFragment)> { + let unmixed = PaiScheme::fragment_to_group(fragment); + let multiplied = PaiScheme::scalar_mult(unmixed, self.scalar); + let info = FragmentInfo { + group: multiplied, + state: FragmentState::Pending, + is_secondary, + }; + let message = info.to_message(); + let handle = self.our_intersection_handles.bind(info); + Ok((handle, message)) + } + + async fn receive_bind(&mut self, message: PaiBindFragment) -> Result<()> { + let PaiBindFragment { + group_member, + is_secondary, + } = message; + let unmixed = PsiGroup::from_bytes(group_member)?; + let multiplied = PaiScheme::scalar_mult(unmixed, self.scalar); + let fragment = FragmentInfo { + group: multiplied, + is_secondary, + state: FragmentState::Pending, + }; + let handle = self.their_intersection_handles.bind(fragment); + let reply = PaiReplyFragment { + handle, + group_member, + }; + self.session.send(reply).await?; + self.check_for_intersection(handle, Scope::Theirs).await?; + Ok(()) + } + + async fn receive_reply(&mut self, message: PaiReplyFragment) -> Result<()> { + let PaiReplyFragment { + handle, + group_member, + } = message; + let group_member = PsiGroup::from_bytes(group_member)?; + let intersection = self.our_intersection_handles.try_get(&handle)?; + let fragment = FragmentInfo { + group: group_member, + is_secondary: intersection.is_secondary, + state: FragmentState::Complete, + }; + self.our_intersection_handles.update(handle, fragment)?; + self.check_for_intersection(handle, Scope::Ours).await?; + Ok(()) + } + + async fn check_for_intersection( + &mut self, + handle: IntersectionHandle, + scope: Scope, + ) -> Result<(), Error> { + let store_to_check = match scope { + Scope::Ours => &self.our_intersection_handles, + Scope::Theirs => &self.their_intersection_handles, + }; + let intersection = store_to_check.try_get(&handle)?; + + if !intersection.is_complete() { + return Ok(()); + } + + // Here we are looping through the whole contents of the handle store because... + // otherwise we need to build a special handle store just for intersections. + // Which we might do one day, but I'm not convinced it's worth it yet. + for (other_handle, other_intersection) in store_to_check.iter() { + if !other_intersection.completes_with(intersection) { + continue; + } + + // If there is an intersection, check what we have to do! + let our_handle = match scope { + Scope::Ours => handle, + Scope::Theirs => *other_handle, + }; + + let fragment_info = self + .fragments_info + .get(&our_handle) + .ok_or(Error::MissingResource(our_handle.into()))?; + + match fragment_info.on_intersection { + IntersectionAction::BindReadCap => { + let intersection = fragment_info.to_pai_intersection(our_handle); + self.session.push_pai_intersection(intersection); + } + IntersectionAction::RequestSubspaceCap => { + self.requested_subspace_cap_handles.insert(our_handle); + let message = PaiRequestSubspaceCapability { handle }; + self.session.send(message).await?; + } + IntersectionAction::ReplyReadCap | IntersectionAction::DoNothing => {} + } + } + + Ok(()) + } + + fn received_read_cap_for_intersection( + &mut self, + their_handle: IntersectionHandle, + ) -> Result<()> { + let their_intersection = self.their_intersection_handles.try_get(&their_handle)?; + for (our_handle, our_intersection) in self.our_intersection_handles.iter() { + if !our_intersection.completes_with(their_intersection) { + continue; + } + let fragment_info = self + .fragments_info + .get(our_handle) + .ok_or(Error::MissingResource((*our_handle).into()))?; + if let IntersectionAction::ReplyReadCap = fragment_info.on_intersection { + let intersection = fragment_info.to_pai_intersection(*our_handle); + self.session.push_pai_intersection(intersection); + } + } + Ok(()) + } + + fn received_verified_subspace_cap_reply( + &mut self, + handle: IntersectionHandle, + namespace_id: NamespaceId, + ) -> Result<(), PaiError> { + if !self.requested_subspace_cap_handles.remove(&handle) { + return Err(PaiError::SubspaceCapRequestForInvalidHandle); + } + let _ = self.our_intersection_handles.try_get(&handle)?; + let fragment_info = self + .fragments_info + .get(&handle) + .ok_or(PaiError::SubspaceCapRequestForInvalidHandle)?; + + if fragment_info.namespace_id != namespace_id { + return Err(PaiError::SubspaceCapRequestForWrongNamespace); + } + let intersection = fragment_info.to_pai_intersection(handle); + self.session.push_pai_intersection(intersection); + Ok(()) + } + + pub async fn received_subspace_cap_request( + &mut self, + handle: IntersectionHandle, + ) -> Result<(), Error> { + let result = self.their_intersection_handles.try_get(&handle)?; + for (our_handle, intersection) in self.our_intersection_handles.iter() { + if !intersection.is_complete() { + continue; + } + if !PaiScheme::is_group_equal(&result.group, &intersection.group) { + continue; + } + let fragment_info = self + .fragments_info + .get(our_handle) + .ok_or(PaiError::SubspaceCapRequestForInvalidHandle)?; + if let Some(cap) = fragment_info.authorisation.subspace_cap() { + let message = + self.session + .sign_subspace_capabiltiy(self.store.secrets(), cap, handle)?; + self.session.send(Box::new(message)).await?; + } + } + Ok(()) + } +} + +#[derive(Debug)] +pub struct LocalFragmentInfo { + on_intersection: IntersectionAction, + authorisation: ReadAuthorisation, + namespace_id: NamespaceId, + // will be needed for spec-compliant encodings of read capabilities + #[allow(dead_code)] + path: Path, + // will be needed for spec-compliant encodings of read capabilities + #[allow(dead_code)] + subspace: SubspaceArea, +} + +impl LocalFragmentInfo { + fn to_pai_intersection(&self, handle: IntersectionHandle) -> PaiIntersection { + PaiIntersection { + authorisation: self.authorisation.clone(), + handle, + } + } +} + +#[derive(Debug, Clone)] +pub enum Fragment { + Pair(FragmentPair), + Triple(FragmentTriple), +} + +impl Encoder for Fragment { + fn encoded_len(&self) -> usize { + match self { + Fragment::Pair((_, path)) => NamespaceId::LENGTH + path.encoded_len(), + Fragment::Triple((_, _, path)) => { + NamespaceId::LENGTH + SubspaceId::LENGTH + path.encoded_len() + } + } + } + fn encode_into(&self, out: &mut W) -> Result<()> { + match self { + Fragment::Pair((namespace_id, path)) => { + out.write_all(namespace_id.as_bytes())?; + path.encode_into(out)?; + } + Fragment::Triple((namespace_id, subspace_id, path)) => { + out.write_all(namespace_id.as_bytes())?; + out.write_all(subspace_id.as_bytes())?; + path.encode_into(out)?; + } + } + Ok(()) + } +} + +pub type FragmentTriple = (NamespaceId, SubspaceId, Path); + +pub type FragmentPair = (NamespaceId, Path); + +#[derive(Debug, Clone)] +pub enum FragmentSet { + Complete(Vec), + Selective { + primary: Vec, + secondary: Vec, + }, +} + +#[derive(Debug)] +pub enum FragmentKit { + Complete(NamespaceId, Path), + Selective(NamespaceId, SubspaceId, Path), +} + +impl FragmentKit { + fn into_fragment_set(self) -> FragmentSet { + match self { + FragmentKit::Complete(namespace_id, path) => { + let mut pairs = vec![]; + for prefix in prefixes_of(&path) { + pairs.push((namespace_id, prefix)); + } + FragmentSet::Complete(pairs) + } + FragmentKit::Selective(namespace_id, subspace_id, path) => { + let mut primary = vec![]; + let mut secondary = vec![]; + for prefix in prefixes_of(&path) { + primary.push((namespace_id, subspace_id, prefix.clone())); + secondary.push((namespace_id, prefix.clone())); + } + FragmentSet::Selective { primary, secondary } + } + } + } +} + +fn prefixes_of(path: &Path) -> Vec { + let mut out = vec![Path::empty()]; + let components = path.components(); + if components.is_empty() { + return out; + } + for i in 1..=components.len() { + let prefix = Path::from_components(&components[..i]); + out.push(prefix); + } + out +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub struct PsiGroup(RistrettoPoint); + +#[derive(Debug, thiserror::Error)] +#[error("Invalid Psi Group")] +pub struct InvalidPsiGroup; + +impl PsiGroup { + pub fn from_bytes(bytes: [u8; 32]) -> Result { + let compressed = CompressedRistretto(bytes); + let uncompressed = compressed.decompress().ok_or(InvalidPsiGroup)?; + Ok(Self(uncompressed)) + } + + pub fn to_bytes(self) -> [u8; 32] { + self.0.compress().0 + } +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub struct PsiScalar(Scalar); + +pub struct PaiScheme; + +impl PaiScheme { + fn fragment_to_group(fragment: Fragment) -> PsiGroup { + let encoded = fragment.encode().expect("encoding not to fail"); + let point = RistrettoPoint::hash_from_bytes::(&encoded); + PsiGroup(point) + } + + fn get_scalar() -> PsiScalar { + PsiScalar(Scalar::random(&mut rand::thread_rng())) + } + + fn scalar_mult(group: PsiGroup, scalar: PsiScalar) -> PsiGroup { + PsiGroup(group.0 * scalar.0) + } + + fn is_group_equal(a: &PsiGroup, b: &PsiGroup) -> bool { + a == b + } + + fn get_fragment_kit(cap: &ReadCapability) -> FragmentKit { + let granted_area = cap.granted_area(); + let granted_namespace = cap.granted_namespace().id(); + let granted_path = granted_area.path.clone(); + + match granted_area.subspace { + SubspaceArea::Any => FragmentKit::Complete(granted_namespace, granted_path), + SubspaceArea::Id(granted_subspace) => { + FragmentKit::Selective(granted_namespace, granted_subspace, granted_path) + } + } + } +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum FragmentState { + Pending, + Complete, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct FragmentInfo { + group: PsiGroup, + state: FragmentState, + is_secondary: bool, +} + +#[derive(Debug)] +pub struct PaiIntersection { + pub authorisation: ReadAuthorisation, + pub handle: IntersectionHandle, +} + +impl FragmentInfo { + fn to_message(&self) -> PaiBindFragment { + PaiBindFragment { + group_member: self.group.to_bytes(), + is_secondary: self.is_secondary, + } + } + + fn is_complete(&self) -> bool { + matches!(self.state, FragmentState::Complete) + } + + fn is_secondary(&self) -> bool { + self.is_secondary + } + + fn completes_with(&self, other: &Self) -> bool { + if !self.is_complete() || !other.is_complete() { + return false; + } + if self.is_secondary() && other.is_secondary() { + return false; + } + if !PaiScheme::is_group_equal(&self.group, &other.group) { + return false; + } + true + } +} + +#[derive(Debug, Clone, Copy)] +pub enum IntersectionAction { + DoNothing, + BindReadCap, + RequestSubspaceCap, + ReplyReadCap, +} + +impl IntersectionAction { + pub fn new_primary(is_most_specific: bool) -> Self { + if is_most_specific { + IntersectionAction::BindReadCap + } else { + IntersectionAction::ReplyReadCap + } + } + + pub fn new_secondary(is_most_specific: bool) -> Self { + if is_most_specific { + IntersectionAction::RequestSubspaceCap + } else { + IntersectionAction::DoNothing + } + } +} diff --git a/iroh-willow/src/session/reconciler.rs b/iroh-willow/src/session/reconciler.rs index 5f05c6f179..1e4a2ea9f3 100644 --- a/iroh-willow/src/session/reconciler.rs +++ b/iroh-willow/src/session/reconciler.rs @@ -15,7 +15,7 @@ use crate::{ session::{ channels::MessageReceiver, payload::{send_payload_chunked, CurrentPayload}, - AreaOfInterestIntersection, Error, Session, + AoiIntersection, Error, Session, }, store::{ traits::{EntryReader, EntryStorage, SplitAction, SplitOpts, Storage}, @@ -93,8 +93,8 @@ impl Reconciler { Ok(()) } - async fn initiate(&mut self, intersection: AreaOfInterestIntersection) -> Result<(), Error> { - let AreaOfInterestIntersection { + async fn initiate(&mut self, intersection: AoiIntersection) -> Result<(), Error> { + let AoiIntersection { our_handle, their_handle, intersection, diff --git a/iroh-willow/src/session/resource.rs b/iroh-willow/src/session/resource.rs index 63e027b6f7..d0a0150b9e 100644 --- a/iroh-willow/src/session/resource.rs +++ b/iroh-willow/src/session/resource.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashMap, VecDeque}, + collections::{hash_map, HashMap, VecDeque}, task::{Context, Poll, Waker}, }; @@ -33,8 +33,9 @@ impl ResourceMaps { F: for<'a> Fn(&'a Self) -> &'a ResourceMap, R: Eq + PartialEq + Clone, { - let res = selector(self); - res.try_get(&handle).cloned() + let store = selector(self); + let res = store.try_get(&handle).cloned()?; + Ok(res) } pub fn poll_get_eventually( @@ -111,12 +112,12 @@ where } } - pub fn try_get(&self, handle: &H) -> Result<&R, Error> { + pub fn try_get(&self, handle: &H) -> Result<&R, MissingResource> { self.map .get(handle) .as_ref() .map(|r| &r.value) - .ok_or_else(|| Error::MissingResource((*handle).into())) + .ok_or_else(|| MissingResource((*handle).into())) } pub fn get(&self, handle: &H) -> Option<&R> { @@ -151,8 +152,22 @@ where Poll::Pending } } + + pub fn update(&mut self, handle: H, resource: R) -> Result<(), Error> { + match self.map.entry(handle) { + hash_map::Entry::Vacant(_) => Err(Error::MissingResource(handle.into())), + hash_map::Entry::Occupied(mut entry) => { + entry.get_mut().value = resource; + Ok(()) + } + } + } } +#[derive(Debug, thiserror::Error)] +#[error("missing resource {0:?}")] +pub struct MissingResource(pub ResourceHandle); + // #[derive(Debug)] // enum ResourceState { // Active, diff --git a/iroh-willow/src/session/run.rs b/iroh-willow/src/session/run.rs index 3f2dad41ca..c235a06ec5 100644 --- a/iroh-willow/src/session/run.rs +++ b/iroh-willow/src/session/run.rs @@ -4,8 +4,12 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error_span, trace, warn}; use crate::{ - proto::sync::{ControlIssueGuarantee, LogicalChannel, Message, SetupBindAreaOfInterest}, - session::{channels::LogicalChannelReceivers, Error, Scope, Session, SessionInit}, + proto::sync::{ControlIssueGuarantee, LogicalChannel, Message}, + session::{ + channels::LogicalChannelReceivers, + pai::{PaiFinder, ToPai}, + Error, Session, + }, store::{traits::Storage, Store}, util::{channel::Receiver, stream::Cancelable}, }; @@ -24,7 +28,6 @@ impl Session { self, store: Store, recv: ChannelReceivers, - init: SessionInit, cancel_token: CancellationToken, ) -> Result<(), Error> { let ChannelReceivers { @@ -43,11 +46,19 @@ impl Session { // Make all our receivers close once the cancel_token is triggered. let control_recv = Cancelable::new(control_recv, cancel_token.clone()); let reconciliation_recv = Cancelable::new(reconciliation_recv, cancel_token.clone()); + let intersection_recv = Cancelable::new(intersection_recv, cancel_token.clone()); let mut static_tokens_recv = Cancelable::new(static_tokens_recv, cancel_token.clone()); let mut capability_recv = Cancelable::new(capability_recv, cancel_token.clone()); let mut aoi_recv = Cancelable::new(aoi_recv, cancel_token.clone()); let mut data_recv = Cancelable::new(data_recv, cancel_token.clone()); + // Setup the private area intersection finder. + let pai_finder = PaiFinder::new(self.clone(), store.clone()); + let (to_pai_tx, to_pai_rx) = flume::bounded(128); + self.spawn(error_span!("pai"), { + move |_session| async move { pai_finder.run(to_pai_rx, intersection_recv).await } + }); + // Spawn a task to handle incoming static tokens. self.spawn(error_span!("stt"), move |session| async move { while let Some(message) = static_tokens_recv.try_next().await? { @@ -57,7 +68,7 @@ impl Session { }); // Only setup data receiver if session is configured in live mode. - if init.mode == SessionMode::Live { + if self.mode() == SessionMode::Live { self.spawn(error_span!("dat:r"), { let store = store.clone(); move |session| async move { @@ -78,11 +89,19 @@ impl Session { } // Spawn a task to handle incoming capabilities. - self.spawn(error_span!("cap"), move |session| async move { - while let Some(message) = capability_recv.try_next().await? { - session.on_setup_bind_read_capability(message)?; + self.spawn(error_span!("cap"), { + let to_pai = to_pai_tx.clone(); + move |session| async move { + while let Some(message) = capability_recv.try_next().await? { + let handle = message.handle; + session.on_setup_bind_read_capability(message)?; + to_pai + .send_async(ToPai::ReceivedReadCapForIntersection(handle)) + .await + .map_err(|_| Error::InvalidState("PAI actor dead"))?; + } + Ok(()) } - Ok(()) }); // Spawn a task to handle incoming areas of interest. @@ -109,12 +128,24 @@ impl Session { } }); + // Spawn a task to react to found PAI intersections. + let pai_intersections = self.pai_intersection_stream(); + let mut pai_intersections = Cancelable::new(pai_intersections, cancel_token.clone()); + self.spawn(error_span!("pai:intersections"), { + let store = store.clone(); + move |session| async move { + while let Some(intersection) = pai_intersections.next().await { + session.on_pai_intersection(&store, intersection).await?; + } + Ok(()) + } + }); + // Spawn a task to handle control messages self.spawn(error_span!("ctl"), { - let store = store.clone(); let cancel_token = cancel_token.clone(); move |session| async move { - let res = control_loop(session, store, control_recv, init).await; + let res = control_loop(session, control_recv, to_pai_tx).await; cancel_token.cancel(); res } @@ -152,7 +183,10 @@ impl Session { let _guard = span.enter(); trace!(?result, remaining = self.remaining_tasks(), "task complete"); if let Err(err) = result { - warn!("task failed: {err:?}"); + match err { + Error::TaskFailed(err) if err.is_cancelled() => {} + err => warn!("task failed: {err:?}"), + } } } @@ -165,14 +199,13 @@ impl Session { } } -async fn control_loop( +async fn control_loop( session: Session, - store: Store, mut control_recv: Cancelable>, - init: SessionInit, + to_pai: flume::Sender, ) -> Result<(), Error> { debug!(role = ?session.our_role(), "start session"); - let mut init = Some(init); + let mut commitment_revealed = false; // Reveal our nonce. let reveal_message = session.reveal_commitment()?; @@ -191,18 +224,36 @@ async fn control_loop( match message { Message::CommitmentReveal(msg) => { session.on_commitment_reveal(msg)?; - let init = init.take().ok_or(Error::InvalidMessageInCurrentState)?; - // send setup messages, but in a separate task to not block incoming guarantees - let store = store.clone(); - session.spawn(error_span!("setup"), move |session| { - setup(store, session, init) + if commitment_revealed { + return Err(Error::InvalidMessageInCurrentState)?; + } + commitment_revealed = true; + let to_pai = to_pai.clone(); + session.spawn(error_span!("setup-pai"), move |session| { + setup_pai(session, to_pai) }); } Message::ControlIssueGuarantee(msg) => { let ControlIssueGuarantee { amount, channel } = msg; - trace!(?channel, %amount, "add guarantees"); + // trace!(?channel, %amount, "add guarantees"); session.add_guarantees(channel, amount); } + Message::PaiRequestSubspaceCapability(msg) => { + to_pai + .send_async(ToPai::ReceivedSubspaceCapRequest(msg.handle)) + .await + .map_err(|_| Error::InvalidState("PAI actor dead"))?; + } + Message::PaiReplySubspaceCapability(msg) => { + session.verify_subspace_capability(&msg)?; + to_pai + .send_async(ToPai::ReceivedVerifiedSubspaceCapReply( + msg.handle, + msg.capability.granted_namespace().id(), + )) + .await + .map_err(|_| Error::InvalidState("PAI actor dead"))?; + } _ => return Err(Error::UnsupportedMessage), } } @@ -210,38 +261,12 @@ async fn control_loop( Ok(()) } -async fn setup( - store: Store, - session: Session, - init: SessionInit, -) -> Result<(), Error> { - // debug!(interests = init.interests.len(), "start setup"); - debug!(?init, "start setup"); - let interests = store.auth().find_read_caps_for_interests(init.interests)?; - debug!(?interests, "found interests"); - for (authorisation, aois) in interests { - // TODO: implement private area intersection - let intersection_handle = 0.into(); - let read_cap = authorisation.read_cap(); - let (our_capability_handle, message) = session.bind_and_sign_capability( - store.secrets(), - intersection_handle, - read_cap.clone(), - )?; - if let Some(message) = message { - session.send(message).await?; - } - - for area_of_interest in aois { - let msg = SetupBindAreaOfInterest { - area_of_interest, - authorisation: our_capability_handle, - }; - // TODO: We could skip the clone if we re-enabled sending by reference. - session.bind_area_of_interest(Scope::Ours, msg.clone(), read_cap)?; - session.send(msg).await?; - } +async fn setup_pai(session: Session, to_pai: flume::Sender) -> Result<(), Error> { + for authorisation in session.interests().keys() { + to_pai + .send_async(ToPai::SubmitAuthorisation(authorisation.clone())) + .await + .map_err(|_| Error::InvalidState("PAI actor dead"))?; } - debug!("setup done"); Ok(()) } diff --git a/iroh-willow/src/session/state.rs b/iroh-willow/src/session/state.rs index 2323ade36d..1ef26e9d31 100644 --- a/iroh-willow/src/session/state.rs +++ b/iroh-willow/src/session/state.rs @@ -11,27 +11,32 @@ use futures_lite::Stream; use tracing::{debug, trace, Instrument, Span}; use crate::{ + auth::InterestMap, proto::{ challenge::ChallengeState, grouping::ThreeDRange, keys::NamespaceId, sync::{ AreaOfInterestHandle, CapabilityHandle, Channel, CommitmentReveal, DynamicToken, - IntersectionHandle, IsHandle, LogicalChannel, Message, ReadCapability, - ReconciliationAnnounceEntries, ReconciliationSendFingerprint, SetupBindAreaOfInterest, - SetupBindReadCapability, SetupBindStaticToken, StaticToken, StaticTokenHandle, + IntersectionHandle, IsHandle, LogicalChannel, Message, PaiReplySubspaceCapability, + ReadCapability, ReconciliationAnnounceEntries, ReconciliationSendFingerprint, + SetupBindAreaOfInterest, SetupBindReadCapability, SetupBindStaticToken, StaticToken, + StaticTokenHandle, SubspaceCapability, }, willow::{AuthorisedEntry, Entry}, }, - session::InitialTransmission, - store::traits::SecretStorage, + session::{pai::PaiIntersection, InitialTransmission, SessionInit}, + store::{ + traits::{SecretStorage, Storage}, + Store, + }, util::{channel::WriteError, queue::Queue, task::JoinMap}, }; use super::{ channels::ChannelSenders, resource::{ResourceMap, ResourceMaps}, - AreaOfInterestIntersection, Error, Role, Scope, SessionId, SessionMode, + AoiIntersection, Error, Role, Scope, SessionId, SessionMode, }; #[derive(Debug, Clone)] @@ -42,36 +47,44 @@ struct SessionInner { id: SessionId, our_role: Role, mode: SessionMode, + interests: InterestMap, state: RefCell, send: ChannelSenders, tasks: RefCell>>, } impl Session { - pub fn new( + pub fn new( + store: &Store, id: SessionId, - mode: SessionMode, our_role: Role, send: ChannelSenders, + init: SessionInit, initial_transmission: InitialTransmission, - ) -> Self { + ) -> Result { let state = SessionState::new(initial_transmission); - Self(Rc::new(SessionInner { - mode, + let interests = store.auth().find_read_caps_for_interests(init.interests)?; + Ok(Self(Rc::new(SessionInner { + mode: init.mode, id, our_role, + interests, state: RefCell::new(state), send, tasks: Default::default(), - })) + }))) } pub fn id(&self) -> &SessionId { &self.0.id } - pub fn mode(&self) -> &SessionMode { - &self.0.mode + pub fn mode(&self) -> SessionMode { + self.0.mode + } + + pub fn interests(&self) -> &InterestMap { + &self.0.interests } pub fn spawn(&self, span: Span, f: F) @@ -157,9 +170,9 @@ impl Session { self.0.our_role } - pub async fn next_aoi_intersection(&self) -> Option { + pub async fn next_aoi_intersection(&self) -> Option { poll_fn(|cx| { - let mut queue = &mut self.0.state.borrow_mut().intersection_queue; + let mut queue = &mut self.0.state.borrow_mut().aoi_intersection_queue; Pin::new(&mut queue).poll_next(cx) }) .await @@ -195,6 +208,23 @@ impl Session { .await } + pub fn sign_subspace_capabiltiy( + &self, + key_store: &K, + cap: &SubspaceCapability, + handle: IntersectionHandle, + ) -> Result { + let inner = self.state(); + let signable = inner.challenge.signable()?; + let signature = key_store.sign_user(&cap.receiver().id(), &signable)?; + let message = PaiReplySubspaceCapability { + handle, + capability: cap.clone(), + signature, + }; + Ok(message) + } + pub fn bind_and_sign_capability( &self, key_store: &K, @@ -323,6 +353,17 @@ impl Session { Ok(()) } + pub fn verify_subspace_capability( + &self, + msg: &PaiReplySubspaceCapability, + ) -> Result<(), Error> { + msg.capability.validate()?; + self.state() + .challenge + .verify(msg.capability.receiver(), &msg.signature)?; + Ok(()) + } + pub fn reconciliation_is_complete(&self) -> bool { let state = self.state(); // tracing::debug!( @@ -434,6 +475,64 @@ impl Session { (handle, msg) } + pub fn push_pai_intersection(&self, intersection: PaiIntersection) { + self.state_mut() + .pai_intersection_queue + .push_back(intersection) + } + + pub async fn next_pai_intersection(&self) -> Option { + poll_fn(|cx| { + let mut queue = &mut self.0.state.borrow_mut().pai_intersection_queue; + Pin::new(&mut queue).poll_next(cx) + }) + .await + } + + pub fn pai_intersection_stream(&self) -> PaiIntersectionStream { + PaiIntersectionStream { + session: self.clone(), + } + } + + pub async fn on_pai_intersection( + &self, + store: &Store, + intersection: PaiIntersection, + ) -> Result<(), Error> { + // TODO: Somehow getting from the BTreeMap is not working, even though the equality check + // below works as exepcted. + // let aois = self + // .0 + // .interests + // .get(&intersection.authorisation) + // .ok_or(Error::NoKnownInterestsForCapability)?; + for (authorisation, aois) in self.0.interests.iter() { + if *authorisation != intersection.authorisation { + continue; + } + let read_cap = authorisation.read_cap(); + let (our_capability_handle, message) = self.bind_and_sign_capability( + store.secrets(), + intersection.handle, + read_cap.clone(), + )?; + if let Some(message) = message { + self.send(message).await?; + } + + for area_of_interest in aois.iter().cloned() { + let msg = SetupBindAreaOfInterest { + area_of_interest, + authorisation: our_capability_handle, + }; + self.bind_area_of_interest(Scope::Ours, msg.clone(), read_cap)?; + self.send(msg).await?; + } + } + Ok(()) + } + async fn their_aoi_to_namespace_eventually( &self, handle: AreaOfInterestHandle, @@ -479,7 +578,8 @@ struct SessionState { our_uncovered_ranges: HashSet<(AreaOfInterestHandle, u64)>, their_uncovered_ranges: HashSet<(AreaOfInterestHandle, u64)>, pending_announced_entries: Option, - intersection_queue: Queue, + aoi_intersection_queue: Queue, + pai_intersection_queue: Queue, } impl SessionState { @@ -499,7 +599,8 @@ impl SessionState { our_uncovered_ranges: Default::default(), their_uncovered_ranges: Default::default(), pending_announced_entries: Default::default(), - intersection_queue: Default::default(), + aoi_intersection_queue: Default::default(), + pai_intersection_queue: Default::default(), } } @@ -523,6 +624,7 @@ impl SessionState { Scope::Theirs => &self.our_resources, }; + // TODO: If we stored the AoIs by namespace we would need to iterate less. for (candidate_handle, candidate) in other_resources.areas_of_interest.iter() { let candidate_handle = *candidate_handle; // Ignore areas without a capability. @@ -540,13 +642,13 @@ impl SessionState { Scope::Ours => (handle, candidate_handle), Scope::Theirs => (candidate_handle, handle), }; - let info = AreaOfInterestIntersection { + let info = AoiIntersection { our_handle, their_handle, intersection, namespace: namespace.into(), }; - self.intersection_queue.push_back(info); + self.aoi_intersection_queue.push_back(info); } } Ok(()) @@ -601,3 +703,20 @@ impl SessionState { range_count } } + +#[derive(Debug)] +pub struct PaiIntersectionStream { + session: Session, +} + +impl Stream for PaiIntersectionStream { + type Item = PaiIntersection; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let mut queue = &mut self.session.0.state.borrow_mut().pai_intersection_queue; + Pin::new(&mut queue).poll_next(cx) + } +}