Skip to content

Commit

Permalink
feat: anchor service (part 2) (#476)
Browse files Browse the repository at this point in the history
* feat: anchor service (part 2)

- Build Merkle Tree
- Build Time Events

* fix: comments

* chore: update deps

---------

Co-authored-by: Aaron D Goldman <[email protected]>
  • Loading branch information
smrz2001 and AaronGoldman authored Sep 6, 2024
1 parent b4ab3b6 commit c447c19
Show file tree
Hide file tree
Showing 15 changed files with 532 additions and 4 deletions.
1 change: 1 addition & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# Pick two reviewers for each top level directory:
# the owner and someone from the eng-reviewers team.
/.github/ @smrz2001 @ceramicnetwork/eng-reviewers
/anchor-service/ @AaronGoldman @ceramicnetwork/eng-reviewers
/api-server/ @dav1do @ceramicnetwork/eng-reviewers
/api/ @dav1do @ceramicnetwork/eng-reviewers
/beetle/ @nathanielc @ceramicnetwork/eng-reviewers
Expand Down
18 changes: 17 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ resolver = "2"
members = [
"api",
"api-server",
"anchor-service",
"car",
"core",
"event",
Expand Down Expand Up @@ -53,6 +54,7 @@ bs58 = "0.4"
bytecheck = "0.6.7"
bytes = "1.1"
bytesize = "1.1"
ceramic-anchor-service = { path = "./anchor-service" }
ceramic-api = { path = "./api" }
ceramic-api-server = { path = "./api-server" }
ceramic-car = { path = "./car" }
Expand Down Expand Up @@ -101,6 +103,7 @@ http-serde = "1.1"
humansize = "2"
hyper = { version = "0.14", features = ["full"] }
ignore = "0.4.18"
indexmap = "2.3.0"
indicatif = "0.17.1"
integer-encoding = "3.0"
ipld-core = "0.4"
Expand All @@ -110,7 +113,7 @@ iroh-p2p = { version = "0.2.0", path = "./beetle/iroh-p2p" }
iroh-rpc-client = { path = "./beetle/iroh-rpc-client" }
iroh-rpc-types = { path = "./beetle/iroh-rpc-types" }
iroh-util = { path = "./beetle/iroh-util" }
itertools = "0.12.0"
itertools = "0.13.0"
k256 = "0.13"
keyed_priority_queue = "0.4.1"
lazy_static = "1.4"
Expand Down Expand Up @@ -226,6 +229,9 @@ authors = [
"Danny Browning <[email protected]>",
"Nathaniel Cook <[email protected]>",
"Aaron D Goldman <[email protected]>",
"Mohsin Zaidi <@smrz2001>",
"David Estes <@dav1do>",
"Spencer T Brody <@stbrody>"
]
license = "Apache-2.0/MIT"
repository = "https://github.com/3box/rust-ceramic"
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ MANUAL_DEPLOY ?= false
TEST_SELECTOR ?= .

.PHONY: all
all: build check-fmt check-clippy check-deps test
all: check-deps check-fmt check-clippy build test

.PHONY: build
build:
Expand Down
23 changes: 23 additions & 0 deletions anchor-service/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "ceramic-anchor-service"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true
publish = false

[dependencies]
anyhow.workspace = true
async-trait.workspace = true
ceramic-core.workspace = true
ceramic-event.workspace = true
cid.workspace = true
expect-test.workspace = true
indexmap.workspace = true
serde.workspace = true
serde_ipld_dagjson.workspace = true
tokio.workspace = true

[features]
test-network = []
90 changes: 90 additions & 0 deletions anchor-service/src/anchor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use cid::Cid;
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};

use ceramic_event::unvalidated::{Proof, RawTimeEvent};

/// AnchorRequest for a Data Event on a Stream
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AnchorRequest {
/// The CID of the stream
pub id: Cid,
/// The CID of the Event to be anchored
pub prev: Cid,
}

/// Merkle tree node
pub type MerkleNode = Vec<Option<Cid>>;

/// A collection of Merkle tree nodes.
#[derive(Default)]
pub struct MerkleNodes {
/// This is a map from CIDs to Merkle Tree nodes that have those CIDs.
/// We are using an IndexMap to keep the block in insert order.
/// This keeps the remote and local block together for easier debugging.
nodes: IndexMap<Cid, MerkleNode>,
}

impl MerkleNodes {
/// Extend one map of MerkleNodes with another
pub fn extend(&mut self, other: MerkleNodes) {
self.nodes.extend(other.nodes);
}

/// Insert a new MerkleNode into the map
pub fn insert(&mut self, key: Cid, value: MerkleNode) {
self.nodes.insert(key, value);
}

/// Return an iterator over the MerkleNodes
pub fn iter(&self) -> indexmap::map::Iter<Cid, MerkleNode> {
self.nodes.iter()
}
}

impl std::fmt::Debug for MerkleNodes {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_map()
.entries(self.nodes.iter().map(|(k, v)| {
(
format!("{:?}", k),
v.iter().map(|x| format!("{:?}", x)).collect::<Vec<_>>(),
)
}))
.finish()
}
}

/// A list of Time Events
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TimeEvents {
/// The list of Time Events
pub events: Vec<RawTimeEvent>,
}

impl std::fmt::Debug for TimeEvents {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_list().entries(self.events.iter()).finish()
}
}

/// TimeEvents, MerkleNodes, and Proof emitted from anchoring
pub struct TimeEventBatch {
/// The intermediate Merkle Tree Nodes
pub merkle_nodes: MerkleNodes,
/// The anchor proof
pub proof: Proof,
/// The Time Events
pub time_events: TimeEvents,
}

impl std::fmt::Debug for TimeEventBatch {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TimeEventBatch")
.field("merkle_nodes", &self.merkle_nodes)
.field("proof", &self.proof)
.field("time_events", &self.time_events)
.finish()
}
}
59 changes: 59 additions & 0 deletions anchor-service/src/anchor_batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;

use crate::{
anchor::{AnchorRequest, TimeEventBatch},
merkle_tree::{build_merkle_tree, MerkleTree},
time_event::build_time_events,
transaction_manager::{Receipt, TransactionManager},
};

/// ceramic_anchor_service::Store is responsible for fetching AnchorRequests and storing TimeEvents.
#[async_trait]
pub trait Store: Send + Sync {
/// Get a batch of AnchorRequests.
async fn get_anchor_requests(&self) -> Result<Vec<AnchorRequest>>;
/// Store a batch of TimeEvents.
async fn put_time_events(&self, batch: TimeEventBatch) -> Result<()>;
}

/// An AnchorService is responsible for anchoring batches of AnchorRequests and storing TimeEvents generated based on
/// the requests and the anchor proof.
pub struct AnchorService {
tx_manager: Arc<dyn TransactionManager>,
}

impl AnchorService {
/// Create a new AnchorService.
pub fn new(tx_manager: Arc<dyn TransactionManager>) -> Self {
Self { tx_manager }
}

/// Anchor a batch of requests using a Transaction Manager:
/// - Build a MerkleTree from the anchor requests
/// - Anchor the root of the tree and obtain a proof from the Transaction Manager
/// - Build TimeEvents from the anchor requests and the proof
///
/// This function will block until the proof is obtained from the Transaction Manager.
pub async fn anchor_batch(&self, anchor_requests: &[AnchorRequest]) -> Result<TimeEventBatch> {
let MerkleTree {
root_cid,
nodes,
count,
} = build_merkle_tree(anchor_requests)?;
let Receipt {
proof,
detached_time_event,
mut remote_merkle_nodes,
} = self.tx_manager.make_proof(root_cid).await?;
let time_events = build_time_events(anchor_requests, &detached_time_event, count)?;
remote_merkle_nodes.extend(nodes);
Ok(TimeEventBatch {
merkle_nodes: remote_merkle_nodes,
proof,
time_events,
})
}
}
11 changes: 11 additions & 0 deletions anchor-service/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//! This crate include all the machinery needed for building Merkle Trees from Anchor Requests and then the Time Events
//! corresponding to those requests once the root of the tree has been anchored.
#![warn(missing_docs)]
mod anchor;
mod anchor_batch;
mod merkle_tree;
mod time_event;
mod transaction_manager;

pub use anchor_batch::{AnchorService, Store};
pub use transaction_manager::{DetachedTimeEvent, Receipt, TransactionManager};
83 changes: 83 additions & 0 deletions anchor-service/src/merkle_tree.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use crate::anchor::{AnchorRequest, MerkleNode, MerkleNodes};
use anyhow::{anyhow, Result};
use ceramic_core::SerializeExt;
use cid::Cid;

pub struct MerkleTree {
pub root_cid: Cid,
pub nodes: MerkleNodes,
pub count: u64,
}

/// Make a tree using Merkle mountain range
/// Ref: https://eprint.iacr.org/2021/038.pdf
pub fn build_merkle_tree(anchor_requests: &[AnchorRequest]) -> Result<MerkleTree> {
// For size zero trees return an error
if anchor_requests.is_empty() {
return Err(anyhow!("no requests to anchor"));
}
// The roots of the sub-trees with full power of 2 trees.
// They are in the places in the array where the 1s are in the count u64.
// e.g. 13 = 0b1110
// root
// / \
// / \ / \
// / \ / \ / \ / \
// /\ /\ / \ / \ / \ / \ 12 13
// 0 1 2 3 4 5 6 7 8 9 10 11
//
// here the peeks are [none, 12..13, 8..11, 0..7]
// place values 1's, 2's, 4's, 8's
let mut peaks: Vec<Option<Cid>> = vec![None; 64];

// The nodes in the Merkle Map[node_cid, [left_cid, right_cid]]
let mut nodes = MerkleNodes::default();

// insert all the `anchor_request.prev`s into the peaks.
for anchor_request in anchor_requests {
let mut new_node_cid: Cid = anchor_request.prev;
for peek in peaks.iter_mut() {
// walk the place values
match peek {
None => {
// if the place values peek is empty put the cid there
*peek = Some(new_node_cid);
// we found a place to put it. we are done
break;
}
Some(old_node_cid) => {
// if the place values peek is occupied add the old cid to the new cid and carry to the next place values peek
let merged_node: MerkleNode;
(new_node_cid, merged_node) = merge_nodes(*old_node_cid, new_node_cid)?;
// remember the generated nodes
nodes.insert(new_node_cid, merged_node);
// clear the place value peek we took old node from
*peek = None;
}
}
}
}

// Roll up the peaks into a root.
// Since each tree is larger then all the preceding trees combined
// we just walk the non empty peeks putting the last peek on the left.
let mut peaks_iter = peaks.into_iter().flatten();
let mut right_cid = peaks_iter.next().expect("should never be empty");
for left_cid in peaks_iter {
let merged_node: MerkleNode;
(right_cid, merged_node) = merge_nodes(left_cid, right_cid)?;
nodes.insert(right_cid, merged_node);
}

Ok(MerkleTree {
root_cid: right_cid,
count: anchor_requests.len() as u64,
nodes,
})
}

/// Accepts the CIDs of two blocks and returns the CID of the CBOR list that includes both CIDs.
pub(crate) fn merge_nodes(left: Cid, right: Cid) -> Result<(Cid, MerkleNode)> {
let merkle_node = vec![Some(left), Some(right)];
Ok((merkle_node.to_cid()?, merkle_node))
}
Loading

0 comments on commit c447c19

Please sign in to comment.