Skip to content

Commit

Permalink
feat: private area intersection
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Jun 27, 2024
1 parent 4ab46d6 commit b5a2ddf
Show file tree
Hide file tree
Showing 14 changed files with 914 additions and 105 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions iroh-willow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ tokio-util = { version = "0.7", features = ["io-util", "io"] }
tracing = "0.1"
zerocopy = { version = "0.8.0-alpha.9", features = ["derive"] }
hex = "0.4.3"
curve25519-dalek = { version = "4.1.3", features = ["digest", "rand_core", "serde"] }
sha2 = "0.10.8"

[dev-dependencies]
iroh-test = { path = "../iroh-test" }
Expand Down
10 changes: 8 additions & 2 deletions iroh-willow/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,13 +389,18 @@ impl<S: Storage> Actor<S> {
} => {
let Channels { send, recv } = channels;
let id = self.next_session_id();
let session = Session::new(id, init.mode, our_role, send, initial_transmission);
let session =
Session::new(&self.store, id, our_role, send, init, initial_transmission);
let session = match session {
Ok(session) => session,
Err(err) => return send_reply(reply, Err(err.into())),
};

let store = self.store.clone();
let cancel_token = CancellationToken::new();

let future = session
.run(store, recv, init, cancel_token.clone())
.run(store, recv, cancel_token.clone())
.instrument(error_span!("session", peer = %peer.fmt_short()));
let task_key = self.session_tasks.spawn_local(id, future);

Expand Down Expand Up @@ -481,6 +486,7 @@ impl<S: Storage> Actor<S> {
fn complete_session(&mut self, session_id: &SessionId, result: Result<(), Error>) {
let session = self.sessions.remove(session_id);
if let Some(session) = session {
debug!(?session, ?result, "complete session");
session.on_finish.send(result).ok();
self.session_tasks.remove(&session.task_key);
} else {
Expand Down
11 changes: 7 additions & 4 deletions iroh-willow/src/auth.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::{BTreeSet, HashMap},
collections::{BTreeMap, BTreeSet, HashMap},
sync::{Arc, RwLock},
};

Expand All @@ -19,6 +19,8 @@ use crate::{
store::traits::{SecretStorage, SecretStoreError, Storage},
};

pub type InterestMap = BTreeMap<ReadAuthorisation, BTreeSet<AreaOfInterest>>;

#[derive(Debug, Clone)]
pub struct DelegateTo {
pub user: UserId,
Expand Down Expand Up @@ -220,7 +222,7 @@ impl<S: Storage> Auth<S> {
pub fn find_read_caps_for_interests(
&self,
interests: Interests,
) -> Result<HashMap<ReadAuthorisation, BTreeSet<AreaOfInterest>>, AuthError> {
) -> Result<InterestMap, AuthError> {
match interests {
Interests::All => {
let out = self
Expand All @@ -230,11 +232,12 @@ impl<S: Storage> Auth<S> {
let aoi = AreaOfInterest::new(area);
(auth, BTreeSet::from_iter([aoi]))
})
.collect::<HashMap<_, _>>();
.collect::<BTreeMap<_, _>>();
Ok(out)
}
Interests::Some(interests) => {
let mut out: HashMap<ReadAuthorisation, BTreeSet<AreaOfInterest>> = HashMap::new();
let mut out: BTreeMap<ReadAuthorisation, BTreeSet<AreaOfInterest>> =
BTreeMap::new();
for (cap_selector, aoi_selector) in interests {
let cap = self.get_read_cap(&cap_selector)?;
if let Some(cap) = cap {
Expand Down
13 changes: 8 additions & 5 deletions iroh-willow/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ impl SessionHandle {
pub async fn join(&mut self) -> anyhow::Result<()> {
let session_res = self.handle.on_finish().await;
let net_tasks_res = join_all(&mut self.tasks).await;
session_res.or(net_tasks_res)
match session_res {
Err(err) => Err(err.into()),
Ok(()) => net_tasks_res,
}
}
}

Expand Down Expand Up @@ -232,7 +235,7 @@ async fn recv_loop(mut recv_stream: RecvStream, mut channel_writer: Writer) -> a
let max_buffer_size = channel_writer.max_buffer_size();
while let Some(buf) = recv_stream.read_chunk(max_buffer_size, true).await? {
channel_writer.write_all(&buf.bytes[..]).await?;
trace!(len = buf.bytes.len(), "recv");
// trace!(len = buf.bytes.len(), "recv");
}
channel_writer.close();
trace!("close");
Expand All @@ -241,9 +244,9 @@ async fn recv_loop(mut recv_stream: RecvStream, mut channel_writer: Writer) -> a

async fn send_loop(mut send_stream: SendStream, channel_reader: Reader) -> anyhow::Result<()> {
while let Some(data) = channel_reader.read_bytes().await {
let len = data.len();
// let len = data.len();
send_stream.write_chunk(data).await?;
trace!(len, "sent");
// trace!(len, "sent");
}
send_stream.finish().await?;
trace!("close");
Expand Down Expand Up @@ -279,7 +282,7 @@ async fn join_all(join_set: &mut JoinSet<anyhow::Result<()>>) -> anyhow::Result<
let mut joined = 0;
while let Some(res) = join_set.join_next().await {
joined += 1;
tracing::trace!("joined {joined} tasks, remaining {}", join_set.len());
trace!("joined {joined} tasks, remaining {}", join_set.len());
let res = match res {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Expand Down
29 changes: 17 additions & 12 deletions iroh-willow/src/proto/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::util::codec::{DecodeOutcome, Decoder, Encoder};
use super::{
grouping::{Area, AreaOfInterest, ThreeDRange},
meadowcap,
willow::{Entry, DIGEST_LENGTH},
willow::{Entry, NamespaceId, DIGEST_LENGTH},
};

pub const MAX_PAYLOAD_SIZE_POWER: u8 = 12;
Expand Down Expand Up @@ -55,7 +55,7 @@ pub type SyncSignature = meadowcap::UserSignature;
pub type Receiver = meadowcap::UserPublicKey;

/// Represents an authorisation to read an area of data in a Namespace.
#[derive(Debug, Clone, Serialize, Deserialize, Hash, Eq, PartialEq)]
#[derive(Debug, Clone, Serialize, Deserialize, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct ReadAuthorisation(pub ReadCapability, pub Option<SubspaceCapability>);

impl From<ReadCapability> for ReadAuthorisation {
Expand All @@ -76,6 +76,10 @@ impl ReadAuthorisation {
pub fn subspace_cap(&self) -> Option<&SubspaceCapability> {
self.1.as_ref()
}

pub fn namespace(&self) -> NamespaceId {
self.0.granted_namespace().id()
}
}

/// The different resource handles employed by the WGPS.
Expand Down Expand Up @@ -304,7 +308,7 @@ pub enum Message {
#[debug("{:?}", _0)]
PaiRequestSubspaceCapability(PaiRequestSubspaceCapability),
#[debug("{:?}", _0)]
PaiReplySubspaceCapability(PaiReplySubspaceCapability),
PaiReplySubspaceCapability(Box<PaiReplySubspaceCapability>),
#[debug("{:?}", _0)]
SetupBindStaticToken(SetupBindStaticToken),
#[debug("{:?}", _0)]
Expand Down Expand Up @@ -873,39 +877,40 @@ pub struct ControlFreeHandle {
handle_type: HandleType,
}

type PsiGroup = ();
pub type PsiGroupBytes = [u8; 32];

/// Bind data to an IntersectionHandle for performing private area intersection.
#[derive(Debug, Serialize, Deserialize)]
pub struct PaiBindFragment {
/// The result of first applying hash_into_group to some fragment for private area intersection and then performing scalar multiplication with scalar.
group_member: PsiGroup,
pub group_member: PsiGroupBytes,
/// Set to true if the private set intersection item is a secondary fragment.
is_secondary: bool,
pub is_secondary: bool,
}

/// Finalise private set intersection for a single item.
#[derive(Debug, Serialize, Deserialize)]
pub struct PaiReplyFragment {
/// The IntersectionHandle of the PaiBindFragment message which this finalises.
handle: IntersectionHandle,
pub handle: IntersectionHandle,
/// The result of performing scalar multiplication between the group_member of the message that this is replying to and scalar.
group_member: PsiGroup,
pub group_member: PsiGroupBytes,
}

/// Ask the receiver to send a SubspaceCapability.
#[derive(Debug, Serialize, Deserialize)]
pub struct PaiRequestSubspaceCapability {
/// The IntersectionHandle bound by the sender for the least-specific secondary fragment for whose NamespaceId to request the SubspaceCapability.
handle: IntersectionHandle,
pub handle: IntersectionHandle,
}

/// Send a previously requested SubspaceCapability.
#[derive(Debug, Serialize, Deserialize)]
pub struct PaiReplySubspaceCapability {
/// The handle of the PaiRequestSubspaceCapability message that this answers (hence, an IntersectionHandle bound by the receiver of this message).
handle: IntersectionHandle,
pub handle: IntersectionHandle,
/// A SubspaceCapability whose granted namespace corresponds to the request this answers.
capability: SubspaceCapability,
pub capability: SubspaceCapability,
/// The SyncSubspaceSignature issued by the receiver of the capability over the sender’s challenge.
signature: SyncSignature,
pub signature: SyncSignature,
}
13 changes: 13 additions & 0 deletions iroh-willow/src/proto/willow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ impl Path {
Path(path)
}

pub fn from_components(components: &[Component]) -> Self {
let path: Arc<[Component]> = components.to_vec().into();
Self(path)
}

pub fn validate(components: &[&[u8]]) -> Result<(), InvalidPath> {
if components.len() > MAX_COMPONENT_COUNT {
return Err(InvalidPath::TooManyComponents);
Expand Down Expand Up @@ -146,6 +151,14 @@ impl Path {
let start = count.min(self.len());
Self::new_unchecked(self[start..].to_vec())
}

pub fn component_count(&self) -> usize {
self.0.len()
}

pub fn components(&self) -> &[Component] {
&self.0
}
}

impl std::ops::Deref for Path {
Expand Down
3 changes: 2 additions & 1 deletion iroh-willow/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
pub mod channels;
mod data;
mod error;
mod pai;
mod payload;
mod reconciler;
mod resource;
Expand Down Expand Up @@ -113,7 +114,7 @@ pub enum Scope {

/// Intersection between two areas of interest.
#[derive(Debug, Clone)]
pub struct AreaOfInterestIntersection {
pub struct AoiIntersection {
pub our_handle: AreaOfInterestHandle,
pub their_handle: AreaOfInterestHandle,
pub intersection: Area,
Expand Down
11 changes: 11 additions & 0 deletions iroh-willow/src/session/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
sync::ResourceHandle,
willow::Unauthorised,
},
session::{pai::PaiError, resource::MissingResource},
store::traits::SecretStoreError,
util::channel::{ReadError, WriteError},
};
Expand Down Expand Up @@ -64,6 +65,10 @@ pub enum Error {
MissingUserKey(UserId),
#[error("a task failed to join")]
TaskFailed(#[from] tokio::task::JoinError),
#[error("no known interests for given capability")]
NoKnownInterestsForCapability,
#[error("private area intersection error: {0}")]
Pai(#[from] PaiError),
}

impl From<Unauthorised> for Error {
Expand All @@ -88,3 +93,9 @@ impl From<meadowcap::InvalidParams> for Error {
Self::InvalidParameters("")
}
}

impl From<MissingResource> for Error {
fn from(value: MissingResource) -> Self {
Self::MissingResource(value.0)
}
}
Loading

0 comments on commit b5a2ddf

Please sign in to comment.