Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit events to be used for replication #140

Closed
wants to merge 12 commits into from
Closed
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ futures = "0.3"
crc32fast = "1"
intmap = "2"
moka = { version = "0.12", optional = true, features = ["sync"] }
tokio = { version = "1.27.0", features = ["rt", "sync"], optional = true}

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
random-access-disk = { version = "3", default-features = false }
Expand All @@ -59,9 +60,9 @@ test-log = { version = "0.2.11", default-features = false, features = ["trace"]
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] }

[features]
default = ["async-std", "sparse"]
default = ["tokio", "sparse"]
sparse = ["random-access-disk/sparse"]
tokio = ["random-access-disk/tokio"]
tokio = ["dep:tokio", "random-access-disk/tokio"]
async-std = ["random-access-disk/async-std"]
cache = ["moka"]
# Used only in interoperability tests under tests/js-interop which use the javascript version of hypercore
Expand Down
5 changes: 5 additions & 0 deletions src/common/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@ pub(crate) struct NodeByteRange {
// disk.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Node {
/// TODO document me
pub(crate) index: u64,
/// hash of the data in this node
pub(crate) hash: Vec<u8>,
/// number of bytes of the data
pub(crate) length: u64,
pub(crate) parent: u64,
/// The data. Other metadata in this struct is provided before the actual data.
/// so it is optional
pub(crate) data: Option<Vec<u8>>,
pub(crate) blank: bool,
}
Expand Down
21 changes: 18 additions & 3 deletions src/common/peer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Types needed for passing information with with peers.
//! hypercore-protocol-rs uses these types and wraps them
//! into wire messages.
use std::fmt::Display;

use crate::Node;

#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -104,14 +106,27 @@ pub struct DataSeek {
#[derive(Debug, Clone, PartialEq)]
/// TODO: Document
pub struct DataUpgrade {
/// TODO: Document
/// starting block of this upgrade response
pub start: u64,
/// TODO: Document
/// number of blocks in this upgrade response
pub length: u64,
/// TODO: Document
/// the metadata nodes?
pub nodes: Vec<Node>,
/// TODO: Document
pub additional_nodes: Vec<Node>,
/// TODO: Document
pub signature: Vec<u8>,
}

impl Display for DataUpgrade {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"DataUpgrade(start: {}, length: {}, nodes: {:?}, .., signature: {})",
self.start,
self.length,
self.nodes,
self.signature.len()
)
}
}
61 changes: 61 additions & 0 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ use ed25519_dalek::Signature;
use futures::future::Either;
use std::convert::TryFrom;
use std::fmt::Debug;
#[cfg(feature = "tokio")]
use tokio::sync::broadcast::{self, Receiver, Sender};
use tracing::instrument;

#[cfg(feature = "tokio")]
static MAX_EVENT_QUEUE_CAPACITY: usize = 32;
#[cfg(feature = "cache")]
use crate::common::cache::CacheOptions;
use crate::{
Expand Down Expand Up @@ -37,6 +41,24 @@ impl HypercoreOptions {
}
}

#[derive(Debug)]
#[cfg(feature = "tokio")]
struct Events {
/// Sends a notification to the replicator that core is upgraded
on_upgrade: Sender<()>,
/// Notify receiver to get block over the network.
on_get: Sender<(u64, Sender<()>)>,
}

#[cfg(feature = "tokio")]
impl Events {
fn new() -> Self {
let (on_upgrade, _) = broadcast::channel(MAX_EVENT_QUEUE_CAPACITY);
let (on_get, _) = broadcast::channel(MAX_EVENT_QUEUE_CAPACITY);
Self { on_upgrade, on_get }
}
}

/// Hypercore is an append-only log structure.
#[derive(Debug)]
pub struct Hypercore {
Expand All @@ -48,6 +70,8 @@ pub struct Hypercore {
pub(crate) bitfield: Bitfield,
skip_flush_count: u8, // autoFlush in Javascript
header: Header,
#[cfg(feature = "tokio")]
events: Events,
}

/// Response from append, matches that of the Javascript result
Expand Down Expand Up @@ -247,6 +271,8 @@ impl Hypercore {
bitfield,
header,
skip_flush_count: 0,
#[cfg(feature = "tokio")]
events: Events::new(),
})
}

Expand Down Expand Up @@ -323,17 +349,52 @@ impl Hypercore {
}
}

// NB: send() returns an error when there are no receivers. Which is the case when there is
// no replication. We ignore the error. No recievers is ok.
#[cfg(feature = "tokio")]
let _ = self.events.on_upgrade.send(());

// Return the new value
Ok(AppendOutcome {
length: self.tree.length,
byte_length: self.tree.byte_length,
})
}

#[cfg(feature = "tokio")]
/// Subscribe to upgrade events
pub fn on_upgrade(&self) -> Receiver<()> {
self.events.on_upgrade.subscribe()
}

#[cfg(feature = "tokio")]
/// Notify listeners of a get request
pub fn on_get(&self, index: u64) -> Receiver<()> {
let (tx, rx) = broadcast::channel(1);
let _ = self.events.on_get.send((index, tx));
rx
}

#[cfg(feature = "tokio")]
/// Subscribe to `.get` requests
pub fn on_get_subscribe(&self) -> Receiver<(u64, Sender<()>)> {
self.events.on_get.subscribe()
}

/// Read value at given index, if any.
#[instrument(err, skip(self))]
pub async fn get(&mut self, index: u64) -> Result<Option<Vec<u8>>, HypercoreError> {
if !self.bitfield.get(index) {
// not in this core
// try getting it over the network
#[cfg(feature = "tokio")]
{
let mut rx = self.on_get(index);
//let res = rx.recv().await.unwrap();
tokio::spawn(async move {
let _ = rx.recv().await;
});
}
return Ok(None);
}

Expand Down
Loading