From d5c207fa6fd4429dc7ef641669d5a6893cb9d8d5 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 30 Apr 2024 00:54:43 -0500 Subject: [PATCH 01/12] Add event interface for upgrade --- Cargo.toml | 1 + src/core.rs | 31 +++++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 4c4cdad..a9c9168 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ futures = "0.3" crc32fast = "1" intmap = "2" moka = { version = "0.12", optional = true, features = ["sync"] } +tokio = { version = "1.27.0", features = ["sync"] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] random-access-disk = { version = "3", default-features = false } diff --git a/src/core.rs b/src/core.rs index 886ff98..3608f57 100644 --- a/src/core.rs +++ b/src/core.rs @@ -3,8 +3,10 @@ use ed25519_dalek::Signature; use futures::future::Either; use std::convert::TryFrom; use std::fmt::Debug; +use tokio::sync::broadcast::{self, Receiver, Sender}; use tracing::instrument; +static MAX_EVENT_QUEUE_CAPACITY: usize = 32; #[cfg(feature = "cache")] use crate::common::cache::CacheOptions; use crate::{ @@ -37,6 +39,19 @@ impl HypercoreOptions { } } +#[derive(Debug)] +struct Events { + onupgrade: Sender<()>, +} + +impl Events { + fn new() -> Self { + Self { + onupgrade: broadcast::channel(MAX_EVENT_QUEUE_CAPACITY).0, + } + } +} + /// Hypercore is an append-only log structure. #[derive(Debug)] pub struct Hypercore { @@ -48,6 +63,7 @@ pub struct Hypercore { pub(crate) bitfield: Bitfield, skip_flush_count: u8, // autoFlush in Javascript header: Header, + events: Events, } /// Response from append, matches that of the Javascript result @@ -247,6 +263,7 @@ impl Hypercore { bitfield, header, skip_flush_count: 0, + events: Events::new(), }) } @@ -323,6 +340,15 @@ impl Hypercore { } } + // if we drop the receiver and there are no listeners, this will error, but we don't care + match self.events.onupgrade.send(()) { + Err(e) => { + dbg!(e); + () + } + Ok(_) => (), + } + // Return the new value Ok(AppendOutcome { length: self.tree.length, @@ -330,6 +356,11 @@ impl Hypercore { }) } + /// Subscribe to upgrade events + pub fn onupgrade(&self) -> Receiver<()> { + self.events.onupgrade.subscribe() + } + /// Read value at given index, if any. #[instrument(err, skip(self))] pub async fn get(&mut self, index: u64) -> Result>, HypercoreError> { From c7507df8abdfc6060b401466bb11bd92fa8b057b Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 1 May 2024 13:26:10 -0500 Subject: [PATCH 02/12] rm dbg --- src/core.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/core.rs b/src/core.rs index 3608f57..8fe821a 100644 --- a/src/core.rs +++ b/src/core.rs @@ -340,14 +340,9 @@ impl Hypercore { } } - // if we drop the receiver and there are no listeners, this will error, but we don't care - match self.events.onupgrade.send(()) { - Err(e) => { - dbg!(e); - () - } - Ok(_) => (), - } + // NB: send() returns an error when there are no receivers. Which is the case when there is + // no replication. We ignore the error. No recievers is ok. + let _ = self.events.onupgrade.send(()); // Return the new value Ok(AppendOutcome { From 5528469ce3b617ef1b157d85e77a9ed1939c9c0b Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Fri, 3 May 2024 23:26:56 -0500 Subject: [PATCH 03/12] feature-ify events with tokio --- Cargo.toml | 4 ++-- src/core.rs | 7 +++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a9c9168..055a265 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ futures = "0.3" crc32fast = "1" intmap = "2" moka = { version = "0.12", optional = true, features = ["sync"] } -tokio = { version = "1.27.0", features = ["sync"] } +tokio = { version = "1.27.0", features = ["sync"], optional = true} [target.'cfg(not(target_arch = "wasm32"))'.dependencies] random-access-disk = { version = "3", default-features = false } @@ -62,7 +62,7 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] } [features] default = ["async-std", "sparse"] sparse = ["random-access-disk/sparse"] -tokio = ["random-access-disk/tokio"] +tokio = ["dep:tokio", "random-access-disk/tokio"] async-std = ["random-access-disk/async-std"] cache = ["moka"] # Used only in interoperability tests under tests/js-interop which use the javascript version of hypercore diff --git a/src/core.rs b/src/core.rs index 8fe821a..e242352 100644 --- a/src/core.rs +++ b/src/core.rs @@ -3,6 +3,7 @@ use ed25519_dalek::Signature; use futures::future::Either; use std::convert::TryFrom; use std::fmt::Debug; +#[cfg(feature = "tokio")] use tokio::sync::broadcast::{self, Receiver, Sender}; use tracing::instrument; @@ -39,11 +40,13 @@ impl HypercoreOptions { } } +#[cfg(feature = "tokio")] #[derive(Debug)] struct Events { onupgrade: Sender<()>, } +#[cfg(feature = "tokio")] impl Events { fn new() -> Self { Self { @@ -63,6 +66,7 @@ pub struct Hypercore { pub(crate) bitfield: Bitfield, skip_flush_count: u8, // autoFlush in Javascript header: Header, + #[cfg(feature = "tokio")] events: Events, } @@ -263,6 +267,7 @@ impl Hypercore { bitfield, header, skip_flush_count: 0, + #[cfg(feature = "tokio")] events: Events::new(), }) } @@ -342,6 +347,7 @@ impl Hypercore { // NB: send() returns an error when there are no receivers. Which is the case when there is // no replication. We ignore the error. No recievers is ok. + #[cfg(feature = "tokio")] let _ = self.events.onupgrade.send(()); // Return the new value @@ -351,6 +357,7 @@ impl Hypercore { }) } + #[cfg(feature = "tokio")] /// Subscribe to upgrade events pub fn onupgrade(&self) -> Receiver<()> { self.events.onupgrade.subscribe() From 905ace8eb5a5e1f12403aa8b2ec43f2fc929c2e1 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Sun, 12 May 2024 17:11:44 -0500 Subject: [PATCH 04/12] s/onupgrade/on_upgrade/g --- src/core.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/core.rs b/src/core.rs index e242352..7559dbb 100644 --- a/src/core.rs +++ b/src/core.rs @@ -43,15 +43,14 @@ impl HypercoreOptions { #[cfg(feature = "tokio")] #[derive(Debug)] struct Events { - onupgrade: Sender<()>, + on_upgrade: Sender<()>, } #[cfg(feature = "tokio")] impl Events { fn new() -> Self { - Self { - onupgrade: broadcast::channel(MAX_EVENT_QUEUE_CAPACITY).0, - } + let (sender, _) = broadcast::channel(MAX_EVENT_QUEUE_CAPACITY); + Self { on_upgrade: sender } } } @@ -348,7 +347,7 @@ impl Hypercore { // NB: send() returns an error when there are no receivers. Which is the case when there is // no replication. We ignore the error. No recievers is ok. #[cfg(feature = "tokio")] - let _ = self.events.onupgrade.send(()); + let _ = self.events.on_upgrade.send(()); // Return the new value Ok(AppendOutcome { @@ -359,8 +358,8 @@ impl Hypercore { #[cfg(feature = "tokio")] /// Subscribe to upgrade events - pub fn onupgrade(&self) -> Receiver<()> { - self.events.onupgrade.subscribe() + pub fn on_upgrade(&self) -> Receiver<()> { + self.events.on_upgrade.subscribe() } /// Read value at given index, if any. From 30aec63e59b7fd6e0cd7e95124975a96ba69da14 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Thu, 27 Jun 2024 16:40:21 -0700 Subject: [PATCH 05/12] fix unused warning --- src/core.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core.rs b/src/core.rs index 7559dbb..a90dd3c 100644 --- a/src/core.rs +++ b/src/core.rs @@ -7,6 +7,7 @@ use std::fmt::Debug; use tokio::sync::broadcast::{self, Receiver, Sender}; use tracing::instrument; +#[cfg(feature = "tokio")] static MAX_EVENT_QUEUE_CAPACITY: usize = 32; #[cfg(feature = "cache")] use crate::common::cache::CacheOptions; From 7d3d83ebb7196e8ff5d130ea2d81c84e0993afbb Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Sat, 10 Aug 2024 17:22:28 -0400 Subject: [PATCH 06/12] start documenting Node struct --- src/common/node.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/common/node.rs b/src/common/node.rs index 7e339d3..c0a9dfc 100644 --- a/src/common/node.rs +++ b/src/common/node.rs @@ -20,10 +20,15 @@ pub(crate) struct NodeByteRange { // disk. #[derive(Debug, Clone, PartialEq, Eq)] pub struct Node { + /// TODO document me pub(crate) index: u64, + /// hash of the data in this node pub(crate) hash: Vec, + /// number of bytes of the data pub(crate) length: u64, pub(crate) parent: u64, + /// The data. Other metadata in this struct is provided before the actual data. + /// so it is optional pub(crate) data: Option>, pub(crate) blank: bool, } From 1aadca965c491e1a97795ad88c931688655b56f5 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Sun, 11 Aug 2024 15:51:48 -0400 Subject: [PATCH 07/12] tokio rt needed --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 055a265..32faed3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ futures = "0.3" crc32fast = "1" intmap = "2" moka = { version = "0.12", optional = true, features = ["sync"] } -tokio = { version = "1.27.0", features = ["sync"], optional = true} +tokio = { version = "1.27.0", features = ["rt", "sync"], optional = true} [target.'cfg(not(target_arch = "wasm32"))'.dependencies] random-access-disk = { version = "3", default-features = false } From 85af76cfc7ff5d0c7b1a447f5c1de48b3c755ac2 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Sun, 11 Aug 2024 16:02:40 -0400 Subject: [PATCH 08/12] docs --- src/common/peer.rs | 6 +++--- src/core.rs | 4 ++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/common/peer.rs b/src/common/peer.rs index c71b981..899de14 100644 --- a/src/common/peer.rs +++ b/src/common/peer.rs @@ -104,11 +104,11 @@ pub struct DataSeek { #[derive(Debug, Clone, PartialEq)] /// TODO: Document pub struct DataUpgrade { - /// TODO: Document + /// starting block of this upgrade response pub start: u64, - /// TODO: Document + /// number of blocks in this upgrade response pub length: u64, - /// TODO: Document + /// the metadata nodes? pub nodes: Vec, /// TODO: Document pub additional_nodes: Vec, diff --git a/src/core.rs b/src/core.rs index a90dd3c..881c6e1 100644 --- a/src/core.rs +++ b/src/core.rs @@ -44,7 +44,11 @@ impl HypercoreOptions { #[cfg(feature = "tokio")] #[derive(Debug)] struct Events { + /// Sends a notification to the replicator that core is upgraded on_upgrade: Sender<()>, + /// Notify receiver to get block over the network. + /// maybe should not return the block itself + on_get: Sender<(u64, Sender<()>)>, } #[cfg(feature = "tokio")] From 7d84e1f1a2bef9c8a360b14eb448dd99c6cb44a5 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Sun, 11 Aug 2024 16:03:01 -0400 Subject: [PATCH 09/12] Add on_get events --- src/core.rs | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/src/core.rs b/src/core.rs index 881c6e1..8d289c0 100644 --- a/src/core.rs +++ b/src/core.rs @@ -41,21 +41,21 @@ impl HypercoreOptions { } } -#[cfg(feature = "tokio")] #[derive(Debug)] +#[cfg(feature = "tokio")] struct Events { /// Sends a notification to the replicator that core is upgraded on_upgrade: Sender<()>, /// Notify receiver to get block over the network. - /// maybe should not return the block itself on_get: Sender<(u64, Sender<()>)>, } #[cfg(feature = "tokio")] impl Events { fn new() -> Self { - let (sender, _) = broadcast::channel(MAX_EVENT_QUEUE_CAPACITY); - Self { on_upgrade: sender } + let (on_upgrade, _) = broadcast::channel(MAX_EVENT_QUEUE_CAPACITY); + let (on_get, _) = broadcast::channel(MAX_EVENT_QUEUE_CAPACITY); + Self { on_upgrade, on_get } } } @@ -367,10 +367,34 @@ impl Hypercore { self.events.on_upgrade.subscribe() } + #[cfg(feature = "tokio")] + /// Notify listeners of a get request + pub fn on_get(&self, index: u64) -> Receiver<()> { + let (tx, rx) = broadcast::channel(1); + let _ = self.events.on_get.send((index, tx)); + rx + } + + #[cfg(feature = "tokio")] + /// Subscribe to `.get` requests + pub fn on_get_subscribe(&self) -> Receiver<(u64, Sender<()>)> { + self.events.on_get.subscribe() + } + /// Read value at given index, if any. #[instrument(err, skip(self))] pub async fn get(&mut self, index: u64) -> Result>, HypercoreError> { if !self.bitfield.get(index) { + // not in this core + // try getting it over the network + #[cfg(feature = "tokio")] + { + let mut rx = self.on_get(index); + //let res = rx.recv().await.unwrap(); + tokio::spawn(async move { + rx.recv().await; + }); + } return Ok(None); } From 111d994d93c23036f9ddc022166c3dc5b3874d01 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Sun, 11 Aug 2024 16:08:16 -0400 Subject: [PATCH 10/12] Add Display helper for DataUpgrade --- src/common/peer.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/common/peer.rs b/src/common/peer.rs index 899de14..11de57d 100644 --- a/src/common/peer.rs +++ b/src/common/peer.rs @@ -1,6 +1,8 @@ //! Types needed for passing information with with peers. //! hypercore-protocol-rs uses these types and wraps them //! into wire messages. +use std::fmt::Display; + use crate::Node; #[derive(Debug, Clone, PartialEq)] @@ -115,3 +117,16 @@ pub struct DataUpgrade { /// TODO: Document pub signature: Vec, } + +impl Display for DataUpgrade { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "DataUpgrade(start: {}, length: {}, nodes: {:?}, .., signature: {})", + self.start, + self.length, + self.nodes, + self.signature.len() + ) + } +} From 89bdc161fa73810af55f746720c2c29a7a3721a3 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Sun, 11 Aug 2024 16:22:14 -0400 Subject: [PATCH 11/12] fix warnings --- src/core.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core.rs b/src/core.rs index 8d289c0..a950dc9 100644 --- a/src/core.rs +++ b/src/core.rs @@ -392,7 +392,7 @@ impl Hypercore { let mut rx = self.on_get(index); //let res = rx.recv().await.unwrap(); tokio::spawn(async move { - rx.recv().await; + let _ = rx.recv().await; }); } return Ok(None); From 5036133c848947c160438c2a35d32c6e3583fc99 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Sat, 17 Aug 2024 23:47:56 -0400 Subject: [PATCH 12/12] use tokio instead of async-std by default --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 32faed3..a2f4c44 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,7 +60,7 @@ test-log = { version = "0.2.11", default-features = false, features = ["trace"] tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] } [features] -default = ["async-std", "sparse"] +default = ["tokio", "sparse"] sparse = ["random-access-disk/sparse"] tokio = ["dep:tokio", "random-access-disk/tokio"] async-std = ["random-access-disk/async-std"]