diff --git a/Cargo.toml b/Cargo.toml index 4c4cdad..a2f4c44 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ futures = "0.3" crc32fast = "1" intmap = "2" moka = { version = "0.12", optional = true, features = ["sync"] } +tokio = { version = "1.27.0", features = ["rt", "sync"], optional = true} [target.'cfg(not(target_arch = "wasm32"))'.dependencies] random-access-disk = { version = "3", default-features = false } @@ -59,9 +60,9 @@ test-log = { version = "0.2.11", default-features = false, features = ["trace"] tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] } [features] -default = ["async-std", "sparse"] +default = ["tokio", "sparse"] sparse = ["random-access-disk/sparse"] -tokio = ["random-access-disk/tokio"] +tokio = ["dep:tokio", "random-access-disk/tokio"] async-std = ["random-access-disk/async-std"] cache = ["moka"] # Used only in interoperability tests under tests/js-interop which use the javascript version of hypercore diff --git a/src/common/node.rs b/src/common/node.rs index 7e339d3..c0a9dfc 100644 --- a/src/common/node.rs +++ b/src/common/node.rs @@ -20,10 +20,15 @@ pub(crate) struct NodeByteRange { // disk. #[derive(Debug, Clone, PartialEq, Eq)] pub struct Node { + /// TODO document me pub(crate) index: u64, + /// hash of the data in this node pub(crate) hash: Vec, + /// number of bytes of the data pub(crate) length: u64, pub(crate) parent: u64, + /// The data. Other metadata in this struct is provided before the actual data. + /// so it is optional pub(crate) data: Option>, pub(crate) blank: bool, } diff --git a/src/common/peer.rs b/src/common/peer.rs index c71b981..11de57d 100644 --- a/src/common/peer.rs +++ b/src/common/peer.rs @@ -1,6 +1,8 @@ //! Types needed for passing information with with peers. //! hypercore-protocol-rs uses these types and wraps them //! into wire messages. +use std::fmt::Display; + use crate::Node; #[derive(Debug, Clone, PartialEq)] @@ -104,14 +106,27 @@ pub struct DataSeek { #[derive(Debug, Clone, PartialEq)] /// TODO: Document pub struct DataUpgrade { - /// TODO: Document + /// starting block of this upgrade response pub start: u64, - /// TODO: Document + /// number of blocks in this upgrade response pub length: u64, - /// TODO: Document + /// the metadata nodes? pub nodes: Vec, /// TODO: Document pub additional_nodes: Vec, /// TODO: Document pub signature: Vec, } + +impl Display for DataUpgrade { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "DataUpgrade(start: {}, length: {}, nodes: {:?}, .., signature: {})", + self.start, + self.length, + self.nodes, + self.signature.len() + ) + } +} diff --git a/src/core.rs b/src/core.rs index 886ff98..a950dc9 100644 --- a/src/core.rs +++ b/src/core.rs @@ -3,8 +3,12 @@ use ed25519_dalek::Signature; use futures::future::Either; use std::convert::TryFrom; use std::fmt::Debug; +#[cfg(feature = "tokio")] +use tokio::sync::broadcast::{self, Receiver, Sender}; use tracing::instrument; +#[cfg(feature = "tokio")] +static MAX_EVENT_QUEUE_CAPACITY: usize = 32; #[cfg(feature = "cache")] use crate::common::cache::CacheOptions; use crate::{ @@ -37,6 +41,24 @@ impl HypercoreOptions { } } +#[derive(Debug)] +#[cfg(feature = "tokio")] +struct Events { + /// Sends a notification to the replicator that core is upgraded + on_upgrade: Sender<()>, + /// Notify receiver to get block over the network. + on_get: Sender<(u64, Sender<()>)>, +} + +#[cfg(feature = "tokio")] +impl Events { + fn new() -> Self { + let (on_upgrade, _) = broadcast::channel(MAX_EVENT_QUEUE_CAPACITY); + let (on_get, _) = broadcast::channel(MAX_EVENT_QUEUE_CAPACITY); + Self { on_upgrade, on_get } + } +} + /// Hypercore is an append-only log structure. #[derive(Debug)] pub struct Hypercore { @@ -48,6 +70,8 @@ pub struct Hypercore { pub(crate) bitfield: Bitfield, skip_flush_count: u8, // autoFlush in Javascript header: Header, + #[cfg(feature = "tokio")] + events: Events, } /// Response from append, matches that of the Javascript result @@ -247,6 +271,8 @@ impl Hypercore { bitfield, header, skip_flush_count: 0, + #[cfg(feature = "tokio")] + events: Events::new(), }) } @@ -323,6 +349,11 @@ impl Hypercore { } } + // NB: send() returns an error when there are no receivers. Which is the case when there is + // no replication. We ignore the error. No recievers is ok. + #[cfg(feature = "tokio")] + let _ = self.events.on_upgrade.send(()); + // Return the new value Ok(AppendOutcome { length: self.tree.length, @@ -330,10 +361,40 @@ impl Hypercore { }) } + #[cfg(feature = "tokio")] + /// Subscribe to upgrade events + pub fn on_upgrade(&self) -> Receiver<()> { + self.events.on_upgrade.subscribe() + } + + #[cfg(feature = "tokio")] + /// Notify listeners of a get request + pub fn on_get(&self, index: u64) -> Receiver<()> { + let (tx, rx) = broadcast::channel(1); + let _ = self.events.on_get.send((index, tx)); + rx + } + + #[cfg(feature = "tokio")] + /// Subscribe to `.get` requests + pub fn on_get_subscribe(&self) -> Receiver<(u64, Sender<()>)> { + self.events.on_get.subscribe() + } + /// Read value at given index, if any. #[instrument(err, skip(self))] pub async fn get(&mut self, index: u64) -> Result>, HypercoreError> { if !self.bitfield.get(index) { + // not in this core + // try getting it over the network + #[cfg(feature = "tokio")] + { + let mut rx = self.on_get(index); + //let res = rx.recv().await.unwrap(); + tokio::spawn(async move { + let _ = rx.recv().await; + }); + } return Ok(None); }