From 01ec7c9487d195d3cade8ee9c183ecbfbd4e6522 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Mon, 19 Aug 2024 10:06:57 -0600 Subject: [PATCH] refactor: create sync car crate (#485) Replaces iroh-car with ceramic-car that has both async and sync APIs. Additionally changes the Event::encode_car and Event::decode_car methods to be synchronous as we nearly always have the bytes in memory already and its cheap to copy them into memory when we do not. --- Cargo.lock | 40 +++---- Cargo.toml | 4 +- api/Cargo.toml | 2 +- api/src/server/event.rs | 12 ++- {beetle/iroh-car => car}/Cargo.toml | 15 +-- {beetle/iroh-car => car}/README.md | 0 {beetle/iroh-car => car}/src/error.rs | 0 {beetle/iroh-car => car}/src/header.rs | 0 {beetle/iroh-car => car}/src/lib.rs | 4 + {beetle/iroh-car => car}/src/reader.rs | 0 car/src/sync/mod.rs | 10 ++ car/src/sync/reader.rs | 102 ++++++++++++++++++ car/src/sync/util.rs | 96 +++++++++++++++++ car/src/sync/writer.rs | 72 +++++++++++++ {beetle/iroh-car => car}/src/util.rs | 0 {beetle/iroh-car => car}/src/writer.rs | 0 .../iroh-car => car}/tests/car_file_test.rs | 3 +- .../iroh-car => car}/tests/carv1_basic.car | Bin car/tests/sync_car_file_test.rs | 82 ++++++++++++++ {beetle/iroh-car => car}/tests/testv1.car | Bin event/Cargo.toml | 2 +- event/src/unvalidated/builder.rs | 8 +- event/src/unvalidated/event.rs | 100 ++++++++--------- event/src/unvalidated/payload/init.rs | 8 +- event/src/unvalidated/signed/mod.rs | 12 +-- service/Cargo.toml | 3 +- service/src/event/migration.rs | 2 +- service/src/event/ordering_task.rs | 1 - service/src/event/service.rs | 1 - service/src/event/store.rs | 2 +- service/src/tests/migration.rs | 62 ++++------- service/src/tests/mod.rs | 6 +- store/Cargo.toml | 2 +- store/src/sql/entities/event.rs | 7 +- store/src/sql/entities/event_block.rs | 1 - 35 files changed, 495 insertions(+), 164 deletions(-) rename {beetle/iroh-car => car}/Cargo.toml (78%) rename {beetle/iroh-car => car}/README.md (100%) rename {beetle/iroh-car => car}/src/error.rs (100%) rename {beetle/iroh-car => car}/src/header.rs (100%) rename {beetle/iroh-car => car}/src/lib.rs (65%) rename {beetle/iroh-car => car}/src/reader.rs (100%) create mode 100644 car/src/sync/mod.rs create mode 100644 car/src/sync/reader.rs create mode 100644 car/src/sync/util.rs create mode 100644 car/src/sync/writer.rs rename {beetle/iroh-car => car}/src/util.rs (100%) rename {beetle/iroh-car => car}/src/writer.rs (100%) rename {beetle/iroh-car => car}/tests/car_file_test.rs (99%) rename {beetle/iroh-car => car}/tests/carv1_basic.car (100%) create mode 100644 car/tests/sync_car_file_test.rs rename {beetle/iroh-car => car}/tests/testv1.car (100%) diff --git a/Cargo.lock b/Cargo.lock index 2c6af91ee..4b4f6c5ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1320,6 +1320,7 @@ dependencies = [ "anyhow", "async-trait", "ceramic-api-server", + "ceramic-car", "ceramic-core", "ceramic-event", "ceramic-metadata", @@ -1327,7 +1328,6 @@ dependencies = [ "futures", "hex", "ipld-core", - "iroh-car", "jemalloc_pprof", "mockall", "multibase 0.9.1", @@ -1369,6 +1369,22 @@ dependencies = [ "validator", ] +[[package]] +name = "ceramic-car" +version = "0.32.0" +dependencies = [ + "cid 0.11.1", + "futures", + "integer-encoding", + "ipld-core", + "multihash 0.19.1", + "multihash-codetable", + "serde", + "serde_ipld_dagcbor", + "thiserror", + "tokio", +] + [[package]] name = "ceramic-core" version = "0.32.0" @@ -1406,11 +1422,11 @@ version = "0.32.0" dependencies = [ "anyhow", "base64 0.21.7", + "ceramic-car", "ceramic-core", "cid 0.11.1", "expect-test", "ipld-core", - "iroh-car", "multibase 0.9.1", "multihash-codetable", "serde", @@ -1637,6 +1653,7 @@ dependencies = [ "async-trait", "bytes 1.6.0", "ceramic-api", + "ceramic-car", "ceramic-core", "ceramic-event", "ceramic-store", @@ -1646,7 +1663,6 @@ dependencies = [ "hex", "ipld-core", "iroh-bitswap", - "iroh-car", "multibase 0.9.1", "multihash-codetable", "multihash-derive 0.9.0", @@ -1671,6 +1687,7 @@ dependencies = [ "anyhow", "async-trait", "ceramic-api", + "ceramic-car", "ceramic-core", "ceramic-event", "ceramic-metrics", @@ -1681,7 +1698,6 @@ dependencies = [ "hex", "ipld-core", "iroh-bitswap", - "iroh-car", "itertools 0.12.1", "multibase 0.9.1", "multihash 0.19.1", @@ -4423,22 +4439,6 @@ dependencies = [ "unsigned-varint 0.8.0", ] -[[package]] -name = "iroh-car" -version = "0.32.0" -dependencies = [ - "cid 0.11.1", - "futures", - "integer-encoding", - "ipld-core", - "multihash 0.19.1", - "multihash-codetable", - "serde", - "serde_ipld_dagcbor", - "thiserror", - "tokio", -] - [[package]] name = "iroh-rpc-client" version = "0.32.0" diff --git a/Cargo.toml b/Cargo.toml index 3da8aad58..774052a37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ resolver = "2" members = [ "api", "api-server", + "car", "core", "event", "kubo-rpc", @@ -14,7 +15,6 @@ members = [ "recon", "store", "beetle/iroh-bitswap", - "beetle/iroh-car", "beetle/iroh-rpc-client", "beetle/iroh-rpc-types", "beetle/iroh-util", @@ -51,6 +51,7 @@ bytes = "1.1" bytesize = "1.1" ceramic-api = { path = "./api" } ceramic-api-server = { path = "./api-server" } +ceramic-car = { path = "./car" } ceramic-core = { path = "./core" } ceramic-event = { path = "./event" } ceramic-kubo-rpc-server = { path = "./kubo-rpc-server" } @@ -97,7 +98,6 @@ integer-encoding = "3.0" ipld-core = "0.4" ipld-dagpb = "0.2" iroh-bitswap = { path = "./beetle/iroh-bitswap" } -iroh-car = { path = "./beetle/iroh-car" } iroh-p2p = { version = "0.2.0", path = "./beetle/iroh-p2p" } iroh-rpc-client = { path = "./beetle/iroh-rpc-client" } iroh-rpc-types = { path = "./beetle/iroh-rpc-types" } diff --git a/api/Cargo.toml b/api/Cargo.toml index fa711c6cf..b4a396c27 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -17,7 +17,7 @@ ceramic-event.workspace = true ceramic-metadata.workspace = true futures.workspace = true ipld-core.workspace = true -iroh-car.workspace = true +ceramic-car.workspace = true multibase.workspace = true recon.workspace = true serde.workspace = true diff --git a/api/src/server/event.rs b/api/src/server/event.rs index 077515e01..725d70137 100644 --- a/api/src/server/event.rs +++ b/api/src/server/event.rs @@ -1,22 +1,24 @@ -use std::collections::HashMap; +use std::{collections::HashMap, io::Cursor}; use anyhow::{anyhow, bail, Context, Result}; +use ceramic_car::CarReader; use ceramic_core::{Cid, EventId, Network}; use ceramic_event::unvalidated; use ipld_core::ipld::Ipld; -use iroh_car::CarReader; -use tokio::io::AsyncRead; +use tokio::io::{AsyncRead, AsyncReadExt as _}; use tracing::debug; use crate::EventStore; // Helper function to construct an event ID from CAR data of an event coming in via the HTTP api. -pub async fn event_id_from_car(network: Network, reader: R, store: &S) -> Result +pub async fn event_id_from_car(network: Network, mut reader: R, store: &S) -> Result where R: AsyncRead + Send + Unpin, S: EventStore, { - let (event_cid, event) = unvalidated::Event::decode_car(reader, true).await?; + let mut car_bytes = Vec::new(); + reader.read_to_end(&mut car_bytes).await?; + let (event_cid, event) = unvalidated::Event::decode_car(Cursor::new(&car_bytes), true)?; event_id_for_event(event_cid, event, network, store).await } diff --git a/beetle/iroh-car/Cargo.toml b/car/Cargo.toml similarity index 78% rename from beetle/iroh-car/Cargo.toml rename to car/Cargo.toml index 93ab198d4..8d451989b 100644 --- a/beetle/iroh-car/Cargo.toml +++ b/car/Cargo.toml @@ -1,7 +1,10 @@ [package] -name = "iroh-car" -authors = ["dignifiedquire "] -description = "Implementation the car files for iroh" +name = "ceramic-car" +authors = [ + "dignifiedquire ", + "Nathaniel Cook ", +] +description = "Car V1 reading and writing API" version.workspace = true edition.workspace = true license.workspace = true @@ -10,8 +13,8 @@ publish = false [dependencies] cid.workspace = true -futures.workspace = true integer-encoding = { workspace = true, features = ["tokio_async"] } +futures.workspace = true serde_ipld_dagcbor.workspace = true serde.workspace = true thiserror.workspace = true @@ -20,6 +23,7 @@ tokio = { workspace = true, features = ["io-util"] } [dev-dependencies] multihash.workspace = true multihash-codetable.workspace = true +ipld-core.workspace = true tokio = { workspace = true, features = [ "macros", "sync", @@ -27,6 +31,3 @@ tokio = { workspace = true, features = [ "fs", "io-util", ] } -ipld-core.workspace = true - -[features] diff --git a/beetle/iroh-car/README.md b/car/README.md similarity index 100% rename from beetle/iroh-car/README.md rename to car/README.md diff --git a/beetle/iroh-car/src/error.rs b/car/src/error.rs similarity index 100% rename from beetle/iroh-car/src/error.rs rename to car/src/error.rs diff --git a/beetle/iroh-car/src/header.rs b/car/src/header.rs similarity index 100% rename from beetle/iroh-car/src/header.rs rename to car/src/header.rs diff --git a/beetle/iroh-car/src/lib.rs b/car/src/lib.rs similarity index 65% rename from beetle/iroh-car/src/lib.rs rename to car/src/lib.rs index 5bc5e5a6a..b25b3a72a 100644 --- a/beetle/iroh-car/src/lib.rs +++ b/car/src/lib.rs @@ -3,9 +3,13 @@ mod error; mod header; mod reader; +/// Synchronous version of the same API. +/// Useful if all data already exists in memory. +pub mod sync; mod util; mod writer; +pub use crate::error::Error; pub use crate::header::{CarHeader, CarHeaderV1}; pub use crate::reader::CarReader; pub use crate::writer::CarWriter; diff --git a/beetle/iroh-car/src/reader.rs b/car/src/reader.rs similarity index 100% rename from beetle/iroh-car/src/reader.rs rename to car/src/reader.rs diff --git a/car/src/sync/mod.rs b/car/src/sync/mod.rs new file mode 100644 index 000000000..53632c584 --- /dev/null +++ b/car/src/sync/mod.rs @@ -0,0 +1,10 @@ +//! Implementation of the [car](https://ipld.io/specs/transport/car/) format. + +mod reader; +mod util; +mod writer; + +pub use crate::error::Error; +pub use crate::header::{CarHeader, CarHeaderV1}; +pub use reader::CarReader; +pub use writer::CarWriter; diff --git a/car/src/sync/reader.rs b/car/src/sync/reader.rs new file mode 100644 index 000000000..84ba66f07 --- /dev/null +++ b/car/src/sync/reader.rs @@ -0,0 +1,102 @@ +use std::io::Read; + +use cid::Cid; + +use super::util::{ld_read, read_node}; +use crate::{header::CarHeader, Error}; + +/// Reads CAR files that are in a BufReader +#[derive(Debug)] +pub struct CarReader { + reader: R, + header: CarHeader, + buffer: Vec, +} + +impl CarReader +where + R: Read + Send + Unpin, +{ + /// Creates a new CarReader and parses the CarHeader + pub fn new(mut reader: R) -> Result { + let mut buffer = Vec::new(); + + match ld_read(&mut reader, &mut buffer)? { + Some(buf) => { + let header = CarHeader::decode(buf)?; + + Ok(CarReader { + reader, + header, + buffer, + }) + } + None => Err(Error::Parsing( + "failed to parse uvarint for header".to_string(), + )), + } + } + + /// Returns the header of this car file. + pub fn header(&self) -> &CarHeader { + &self.header + } + + /// Returns the next IPLD Block in the buffer + pub fn next_block(&mut self) -> Result)>, Error> { + read_node(&mut self.reader, &mut self.buffer) + } +} + +impl Iterator for CarReader +where + R: Read + Send + Unpin, +{ + type Item = Result<(Cid, Vec), Error>; + + fn next(&mut self) -> Option { + read_node(&mut self.reader, &mut self.buffer).transpose() + } +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use cid::Cid; + use ipld_core::{codec::Codec, ipld::Ipld}; + use multihash_codetable::{Code, MultihashDigest}; + use serde_ipld_dagcbor::codec::DagCborCodec; + + use crate::sync::*; + + #[test] + fn car_write_read() { + let digest_test = Code::Sha2_256.digest(b"test"); + let cid_test = Cid::new_v1(>::CODE, digest_test); + + let digest_foo = Code::Sha2_256.digest(b"foo"); + let cid_foo = Cid::new_v1(>::CODE, digest_foo); + + let header = CarHeader::V1(CarHeaderV1::from(vec![cid_foo])); + + let mut buffer = Vec::new(); + let mut writer = CarWriter::new(header, &mut buffer); + writer.write(cid_test, b"test").unwrap(); + writer.write(cid_foo, b"foo").unwrap(); + writer.finish().unwrap(); + + let reader = Cursor::new(&buffer); + let car_reader = CarReader::new(reader).unwrap(); + let files: Vec<_> = car_reader + .into_iter() + .collect::>() + .unwrap(); + + assert_eq!(files.len(), 2); + assert_eq!(files[0].0, cid_test); + assert_eq!(files[0].1, b"test"); + assert_eq!(files[1].0, cid_foo); + assert_eq!(files[1].1, b"foo"); + } +} diff --git a/car/src/sync/util.rs b/car/src/sync/util.rs new file mode 100644 index 000000000..460d07349 --- /dev/null +++ b/car/src/sync/util.rs @@ -0,0 +1,96 @@ +use std::io::Read; + +use cid::Cid; +use integer_encoding::VarIntReader; + +use crate::Error; + +/// Maximum size that is used for single node. +pub(crate) const MAX_ALLOC: usize = 4 * 1024 * 1024; + +pub(crate) fn ld_read(mut reader: R, buf: &mut Vec) -> Result, Error> +where + R: Read + Send + Unpin, +{ + let length: usize = match VarIntReader::read_varint(&mut reader) { + Ok(len) => len, + Err(e) => { + if e.kind() == std::io::ErrorKind::UnexpectedEof { + return Ok(None); + } + return Err(Error::Parsing(e.to_string())); + } + }; + + if length > MAX_ALLOC { + return Err(Error::LdReadTooLarge(length)); + } + if length > buf.len() { + buf.resize(length, 0); + } + + reader + .read_exact(&mut buf[..length]) + .map_err(|e| Error::Parsing(e.to_string()))?; + + Ok(Some(&buf[..length])) +} + +pub(crate) fn read_node( + buf_reader: &mut R, + buf: &mut Vec, +) -> Result)>, Error> +where + R: Read + Send + Unpin, +{ + if let Some(buf) = ld_read(buf_reader, buf)? { + let mut cursor = std::io::Cursor::new(buf); + let c = Cid::read_bytes(&mut cursor)?; + let pos = cursor.position() as usize; + + return Ok(Some((c, buf[pos..].to_vec()))); + } + Ok(None) +} + +#[cfg(test)] +mod tests { + use std::io::Write; + + use integer_encoding::VarIntWriter; + + use super::*; + + fn ld_write<'a, W>(writer: &mut W, bytes: &[u8]) -> Result<(), Error> + where + W: Write + Send + Unpin, + { + writer.write_varint(bytes.len())?; + writer.write_all(bytes)?; + writer.flush()?; + Ok(()) + } + + #[test] + fn ld_read_write_good() { + let mut buffer = Vec::::new(); + ld_write(&mut buffer, b"test bytes").unwrap(); + let reader = std::io::Cursor::new(buffer); + + let mut buffer = vec![1u8; 1024]; + let read = ld_read(reader, &mut buffer).unwrap().unwrap(); + assert_eq!(read, b"test bytes"); + } + + #[test] + fn ld_read_write_fail() { + let mut buffer = Vec::::new(); + let size = MAX_ALLOC + 1; + ld_write(&mut buffer, &vec![2u8; size]).unwrap(); + let reader = std::io::Cursor::new(buffer); + + let mut buffer = vec![1u8; 1024]; + let read = ld_read(reader, &mut buffer); + assert!(matches!(read, Err(Error::LdReadTooLarge(_)))); + } +} diff --git a/car/src/sync/writer.rs b/car/src/sync/writer.rs new file mode 100644 index 000000000..3a3b79fe7 --- /dev/null +++ b/car/src/sync/writer.rs @@ -0,0 +1,72 @@ +use std::io::Write; + +use cid::Cid; +use integer_encoding::VarIntWriter; + +use crate::{CarHeader, Error}; + +#[derive(Debug)] +pub struct CarWriter { + header: CarHeader, + writer: W, + cid_buffer: Vec, + is_header_written: bool, +} + +impl CarWriter +where + W: Write + Send + Unpin, +{ + pub fn new(header: CarHeader, writer: W) -> Self { + CarWriter { + header, + writer, + cid_buffer: Vec::new(), + is_header_written: false, + } + } + + /// Writes header and stream of data to writer in Car format. + pub fn write(&mut self, cid: Cid, data: T) -> Result<(), Error> + where + T: AsRef<[u8]>, + { + if !self.is_header_written { + // Write header bytes + let header_bytes = self.header.encode()?; + self.writer.write_varint(header_bytes.len())?; + self.writer.write_all(&header_bytes)?; + self.is_header_written = true; + } + + // Write the given block. + self.cid_buffer.clear(); + cid.write_bytes(&mut self.cid_buffer).expect("vec write"); + + let data = data.as_ref(); + let len = self.cid_buffer.len() + data.len(); + + self.writer.write_varint(len)?; + self.writer.write_all(&self.cid_buffer)?; + self.writer.write_all(data)?; + + Ok(()) + } + + /// Finishes writing, including flushing and returns the writer. + pub fn finish(mut self) -> Result { + self.flush()?; + Ok(self.writer) + } + + /// Flushes the underlying writer. + pub fn flush(&mut self) -> Result<(), Error> { + self.writer.flush()?; + Ok(()) + } + + /// Consumes the [`CarWriter`] and returns the underlying writer. + pub fn into_inner(self) -> W { + self.writer + } +} diff --git a/beetle/iroh-car/src/util.rs b/car/src/util.rs similarity index 100% rename from beetle/iroh-car/src/util.rs rename to car/src/util.rs diff --git a/beetle/iroh-car/src/writer.rs b/car/src/writer.rs similarity index 100% rename from beetle/iroh-car/src/writer.rs rename to car/src/writer.rs diff --git a/beetle/iroh-car/tests/car_file_test.rs b/car/tests/car_file_test.rs similarity index 99% rename from beetle/iroh-car/tests/car_file_test.rs rename to car/tests/car_file_test.rs index 28340d48e..8afe71dcd 100644 --- a/beetle/iroh-car/tests/car_file_test.rs +++ b/car/tests/car_file_test.rs @@ -1,8 +1,9 @@ use futures::TryStreamExt; -use iroh_car::*; use tokio::fs::{self, File}; use tokio::io::BufReader; +use ceramic_car::*; + #[tokio::test] async fn roundtrip_carv1_test_file() { let file = File::open("tests/testv1.car").await.unwrap(); diff --git a/beetle/iroh-car/tests/carv1_basic.car b/car/tests/carv1_basic.car similarity index 100% rename from beetle/iroh-car/tests/carv1_basic.car rename to car/tests/carv1_basic.car diff --git a/car/tests/sync_car_file_test.rs b/car/tests/sync_car_file_test.rs new file mode 100644 index 000000000..e4fa72d8e --- /dev/null +++ b/car/tests/sync_car_file_test.rs @@ -0,0 +1,82 @@ +use std::{ + fs::{self, File}, + io::BufReader, +}; + +use ceramic_car::sync::*; + +#[test] +fn roundtrip_carv1_test_file() { + let file = File::open("tests/testv1.car").unwrap(); + let buf_reader = BufReader::new(file); + + let car_reader = CarReader::new(buf_reader).unwrap(); + let header = car_reader.header().clone(); + let files: Vec<_> = car_reader + .into_iter() + .collect::>() + .unwrap(); + assert_eq!(files.len(), 35); + + let mut buffer = Vec::new(); + let mut writer = CarWriter::new(header, &mut buffer); + for (cid, data) in &files { + writer.write(*cid, data).unwrap(); + } + writer.finish().unwrap(); + + let file = fs::read("tests/testv1.car").unwrap(); + assert_eq!(file, buffer); +} + +#[test] +fn roundtrip_carv1_basic_fixtures_file() { + let file = File::open("tests/carv1_basic.car").unwrap(); + let buf_reader = BufReader::new(file); + + let car_reader = CarReader::new(buf_reader).unwrap(); + let header = car_reader.header().clone(); + + assert_eq!( + car_reader.header().roots(), + [ + "bafyreihyrpefhacm6kkp4ql6j6udakdit7g3dmkzfriqfykhjw6cad5lrm" + .parse() + .unwrap(), + "bafyreidj5idub6mapiupjwjsyyxhyhedxycv4vihfsicm2vt46o7morwlm" + .parse() + .unwrap() + ] + ); + + let files: Vec<_> = car_reader + .into_iter() + .collect::>() + .unwrap(); + assert_eq!(files.len(), 8); + + let cids = [ + "bafyreihyrpefhacm6kkp4ql6j6udakdit7g3dmkzfriqfykhjw6cad5lrm", + "QmNX6Tffavsya4xgBi2VJQnSuqy9GsxongxZZ9uZBqp16d", + "bafkreifw7plhl6mofk6sfvhnfh64qmkq73oeqwl6sloru6rehaoujituke", + "QmWXZxVQ9yZfhQxLD35eDR8LiMRsYtHxYqTFCBbJoiJVys", + "bafkreiebzrnroamgos2adnbpgw5apo3z4iishhbdx77gldnbk57d4zdio4", + "QmdwjhxpxzcMsR3qUuj7vUL8pbA7MgR3GAxWi2GLHjsKCT", + "bafkreidbxzk2ryxwwtqxem4l3xyyjvw35yu4tcct4cqeqxwo47zhxgxqwq", + "bafyreidj5idub6mapiupjwjsyyxhyhedxycv4vihfsicm2vt46o7morwlm", + ]; + + for (expected_cid, (cid, _)) in cids.iter().zip(&files) { + assert_eq!(*cid, expected_cid.parse().unwrap()); + } + + let mut buffer = Vec::new(); + let mut writer = CarWriter::new(header, &mut buffer); + for (cid, data) in &files { + writer.write(*cid, data).unwrap(); + } + writer.finish().unwrap(); + + let file = fs::read("tests/carv1_basic.car").unwrap(); + assert_eq!(file, buffer); +} diff --git a/beetle/iroh-car/tests/testv1.car b/car/tests/testv1.car similarity index 100% rename from beetle/iroh-car/tests/testv1.car rename to car/tests/testv1.car diff --git a/event/Cargo.toml b/event/Cargo.toml index e9c6bfab6..b687968a4 100644 --- a/event/Cargo.toml +++ b/event/Cargo.toml @@ -12,10 +12,10 @@ publish = false [dependencies] anyhow.workspace = true base64.workspace = true +ceramic-car.workspace = true ceramic-core.workspace = true cid.workspace = true ipld-core.workspace = true -iroh-car.workspace = true multihash-codetable.workspace = true multibase.workspace = true serde.workspace = true diff --git a/event/src/unvalidated/builder.rs b/event/src/unvalidated/builder.rs index db3aa1f5f..9e13e663b 100644 --- a/event/src/unvalidated/builder.rs +++ b/event/src/unvalidated/builder.rs @@ -503,7 +503,7 @@ mod tests { let event_car_str = multibase::encode( multibase::Base::Base64Url, - signed_event.encode_car().await.unwrap(), + signed_event.encode_car().unwrap(), ); assert_eq!(SIGNED_INIT_EVENT_CAR, event_car_str); } @@ -528,10 +528,8 @@ mod tests { .build() .unwrap(); - let event_car_str = multibase::encode( - multibase::Base::Base64Url, - event.encode_car().await.unwrap(), - ); + let event_car_str = + multibase::encode(multibase::Base::Base64Url, event.encode_car().unwrap()); assert_eq!(TIME_EVENT_CAR_SINGLE_EVENT_BATCH, event_car_str); } } diff --git a/event/src/unvalidated/event.rs b/event/src/unvalidated/event.rs index 315462aca..648199ff2 100644 --- a/event/src/unvalidated/event.rs +++ b/event/src/unvalidated/event.rs @@ -1,12 +1,11 @@ //! Types of raw unvalidated Ceramic Events use anyhow::{anyhow, bail, Context}; +use ceramic_car::sync::{CarHeader, CarReader, CarWriter}; use cid::Cid; use ipld_core::ipld::Ipld; -use iroh_car::{CarHeader, CarReader, CarWriter}; use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, fmt::Debug}; -use tokio::io::AsyncRead; +use std::{collections::HashMap, fmt::Debug, io::Read}; use tracing::debug; use super::{cid_from_dag_cbor, init, signed, Payload}; @@ -92,23 +91,20 @@ where } /// Encode the event into a CAR bytes containing all blocks of the event. - pub async fn encode_car(&self) -> anyhow::Result> { + pub fn encode_car(&self) -> anyhow::Result> { match self { - Event::Time(event) => event.encode_car().await, - Event::Signed(event) => event.encode_car().await, - Event::Unsigned(event) => event.encode_car().await, + Event::Time(event) => event.encode_car(), + Event::Signed(event) => event.encode_car(), + Event::Unsigned(event) => event.encode_car(), } } /// Decode bytes into a materialized event. - pub async fn decode_car( - reader: R, - deny_unexpected_fields: bool, - ) -> anyhow::Result<(Cid, Self)> + pub fn decode_car(reader: R, deny_unexpected_fields: bool) -> anyhow::Result<(Cid, Self)> where - R: AsyncRead + Send + Unpin, + R: Read + Send + Unpin, { - let mut car = CarReader::new(reader).await?; + let car = CarReader::new(reader)?; let event_cid = *car .header() .roots() @@ -118,7 +114,8 @@ where debug!(%event_cid, "first root cid"); let mut car_blocks = HashMap::new(); - while let Some((cid, bytes)) = car.next_block().await? { + for block in car { + let (cid, bytes) = block?; car_blocks.insert(cid, bytes); } let event_bytes = car_blocks @@ -299,7 +296,7 @@ impl TimeEvent { self.event.path.as_ref() } /// Encode the event into CAR bytes including all relevant blocks. - pub async fn encode_car(&self) -> anyhow::Result> { + pub fn encode_car(&self) -> anyhow::Result> { let event = serde_ipld_dagcbor::to_vec(&self.event)?; let cid = cid_from_dag_cbor(&event); @@ -308,14 +305,14 @@ impl TimeEvent { let mut car = Vec::new(); let roots: Vec = vec![cid]; let mut writer = CarWriter::new(CarHeader::V1(roots.into()), &mut car); - writer.write(cid, event).await?; - writer.write(self.event.proof, proof).await?; + writer.write(cid, event)?; + writer.write(self.event.proof, proof)?; for block in &self.blocks_in_path { let block_bytes = serde_ipld_dagcbor::to_vec(&block)?; let block_cid = cid_from_dag_cbor(&block_bytes); - writer.write(block_cid, block_bytes).await?; + writer.write(block_cid, block_bytes)?; } - writer.finish().await?; + writer.finish()?; Ok(car) } } @@ -446,51 +443,46 @@ mod tests { Builder, Event, }; - async fn round_trip(car: &str) { + fn round_trip(car: &str) { let (base, data) = multibase::decode(car).unwrap(); - let (_cid, event) = Event::::decode_car(data.as_slice(), true) - .await - .unwrap(); - assert_eq!( - car, - multibase::encode(base, event.encode_car().await.unwrap()) - ); + let (_cid, event) = Event::::decode_car(data.as_slice(), true).unwrap(); + assert_eq!(car, multibase::encode(base, event.encode_car().unwrap())); } - #[test(tokio::test)] - async fn round_trip_signed_init_event() { - round_trip(SIGNED_INIT_EVENT_CAR).await; + #[test] + fn round_trip_signed_init_event() { + round_trip(SIGNED_INIT_EVENT_CAR); } - #[test(tokio::test)] - async fn round_trip_unsigned_init_event() { - round_trip(UNSIGNED_INIT_EVENT_CAR).await; + #[test] + fn round_trip_unsigned_init_event() { + round_trip(UNSIGNED_INIT_EVENT_CAR); } - #[test(tokio::test)] - async fn round_trip_signed_data_event() { - round_trip(SIGNED_DATA_EVENT_CAR).await; + #[test] + fn round_trip_signed_data_event() { + round_trip(SIGNED_DATA_EVENT_CAR); } - #[test(tokio::test)] - async fn round_trip_cacao_signed_data_event() { - round_trip(CACAO_SIGNED_DATA_EVENT_CAR).await; + #[test] + fn round_trip_cacao_signed_data_event() { + round_trip(CACAO_SIGNED_DATA_EVENT_CAR); } - #[test(tokio::test)] - async fn round_trip_data_event_unsigned_init() { - round_trip(DATA_EVENT_CAR_UNSIGNED_INIT).await; + #[test] + fn round_trip_data_event_unsigned_init() { + round_trip(DATA_EVENT_CAR_UNSIGNED_INIT); } #[test(tokio::test)] async fn round_trip_time_event_single_event_batch() { - round_trip(TIME_EVENT_CAR_SINGLE_EVENT_BATCH).await; + round_trip(TIME_EVENT_CAR_SINGLE_EVENT_BATCH); } #[test(tokio::test)] async fn round_trip_time_event_multi_event_batch() { - round_trip(TIME_EVENT_CAR_MULTI_EVENT_BATCH).await; + round_trip(TIME_EVENT_CAR_MULTI_EVENT_BATCH); } - #[test(tokio::test)] - async fn round_trip_init_payload_with_no_sep() { - round_trip(UNSIGNED_INIT_NO_SEP_CAR).await; + #[test] + fn round_trip_init_payload_with_no_sep() { + round_trip(UNSIGNED_INIT_NO_SEP_CAR); } - #[test(tokio::test)] - async fn decode_time_event_with_no_tree() { + #[test] + fn decode_time_event_with_no_tree() { let id = Cid::from_str(SIGNED_INIT_EVENT_CID).unwrap(); let prev = Cid::from_str("bagcqcerae5oqoglzjjgz53enwsttl7mqglp5eoh2llzbbvfktmzxleeiffbq").unwrap(); @@ -508,10 +500,8 @@ mod tests { .build() .unwrap(); - let event_car = event.encode_car().await.unwrap(); - let (_cid, parsed_event) = Event::::decode_car(event_car.as_slice(), true) - .await - .unwrap(); + let event_car = event.encode_car().unwrap(); + let (_cid, parsed_event) = Event::::decode_car(event_car.as_slice(), true).unwrap(); let Event::Time(parsed_event) = parsed_event else { panic!("Event must be a time event") @@ -521,8 +511,8 @@ mod tests { assert_eq!(prev, parsed_event.proof.root); assert_eq!("", parsed_event.event.path); } - #[test(tokio::test)] - async fn decode_event_with_no_sep() { + #[test] + fn decode_event_with_no_sep() { // Tests that decoding an init payload that does not have the `sep` field defaults to // `model`. const INIT_PAYLOAD_NO_SEP:&str="uomRkYXRhpmRkYXRho2N1cmxgZWxhYmVsZ0Zhc3RpbmduY2hpbGRyZW5IaWRkZW70ZHR5cGVsUXVlc3Rpb25Ob2RlZ2NyZWF0ZWR4GDIwMjMtMDItMjBUMTU6MTk6MzYuMjc5Wmhwb3NpdGlvbqJhePtApYOSIAAAAGF5-0C2p4AAAAAAaWxhdGVyYWxJRHgkNjlhYWYzN2QtNTU5Yi00Yjk1LWExMDAtNWVlOTgxOGZjNWVkaXByb2plY3RJRHg_a2p6bDZrY3ltN3c4eTVkNjVmOW9rbjRyaXlkYXQ5MmgzczZ2dnpwd3d1NzU0NGk5MmZqeWdjNTY3bHpocnZjZmhlYWRlcqNlbW9kZWxYKM4BAgGFARIgluyz1feTN9qD54Xo4XHQoMg5Xo_kPE6L5xYBadM3kWNmdW5pcXVlTCrb84nFhVpKhrCYm2tjb250cm9sbGVyc4F4O2RpZDpwa2g6ZWlwMTU1OjE6MHhjYTVmZjRiMzQ0MmZjYWMyN2UxYWY0NDU3ZTAyZWI2MjljNzEyOTgz"; diff --git a/event/src/unvalidated/payload/init.rs b/event/src/unvalidated/payload/init.rs index a907a6186..b564e7680 100644 --- a/event/src/unvalidated/payload/init.rs +++ b/event/src/unvalidated/payload/init.rs @@ -1,6 +1,6 @@ use crate::{bytes::Bytes, unvalidated::cid_from_dag_cbor}; +use ceramic_car::sync::{CarHeader, CarWriter}; use cid::Cid; -use iroh_car::{CarHeader, CarWriter}; use serde::{Deserialize, Serialize}; /// Payload of an init event @@ -34,14 +34,14 @@ impl Payload { Ok(cid_from_dag_cbor(&event)) } /// Encode the unsigned init event into CAR bytes. - pub async fn encode_car(&self) -> Result, anyhow::Error> { + pub fn encode_car(&self) -> Result, anyhow::Error> { let event = serde_ipld_dagcbor::to_vec(self)?; let cid = cid_from_dag_cbor(&event); let mut car = Vec::new(); let roots: Vec = vec![cid]; let mut writer = CarWriter::new(CarHeader::V1(roots.into()), &mut car); - writer.write(cid, event).await?; - writer.finish().await?; + writer.write(cid, event)?; + writer.finish()?; Ok(car) } } diff --git a/event/src/unvalidated/signed/mod.rs b/event/src/unvalidated/signed/mod.rs index 785d4f71d..fc493b10d 100644 --- a/event/src/unvalidated/signed/mod.rs +++ b/event/src/unvalidated/signed/mod.rs @@ -6,10 +6,10 @@ use std::fmt::Debug; use crate::bytes::Bytes; use crate::unvalidated::Payload; use base64::Engine; +use ceramic_car::sync::{CarHeader, CarWriter}; use ceramic_core::{DidDocument, Jwk}; use cid::Cid; use ipld_core::ipld::Ipld; -use iroh_car::{CarHeader, CarWriter}; use serde::{Deserialize, Serialize}; use ssi::jwk::Algorithm; use std::{collections::BTreeMap, str::FromStr as _}; @@ -140,7 +140,7 @@ impl Event { } /// Encodes the full signed event into a CAR file. - pub async fn encode_car(&self) -> anyhow::Result> { + pub fn encode_car(&self) -> anyhow::Result> { let envelope_bytes = self.encode_envelope()?; let payload_bytes = self.encode_payload()?; let capability_bytes = self.encode_capability()?; @@ -149,11 +149,11 @@ impl Event { let roots: Vec = vec![self.envelope_cid]; let mut writer = CarWriter::new(CarHeader::V1(roots.into()), &mut car); if let Some((cid, bytes)) = capability_bytes { - writer.write(cid, &bytes).await?; + writer.write(cid, &bytes)?; } - writer.write(self.payload_cid, payload_bytes).await?; - writer.write(self.envelope_cid, envelope_bytes).await?; - writer.finish().await?; + writer.write(self.payload_cid, payload_bytes)?; + writer.write(self.envelope_cid, envelope_bytes)?; + writer.finish()?; Ok(car) } diff --git a/service/Cargo.toml b/service/Cargo.toml index d0bd41f5d..f5c11cfb7 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -11,6 +11,7 @@ anyhow.workspace = true async-trait.workspace = true bytes.workspace = true ceramic-api.workspace = true +ceramic-car.workspace = true ceramic-core.workspace = true ceramic-event.workspace = true ceramic-store.workspace = true @@ -19,7 +20,6 @@ futures.workspace = true hex.workspace = true ipld-core.workspace = true iroh-bitswap.workspace = true -iroh-car.workspace = true multibase.workspace = true multihash-codetable.workspace = true multihash-derive.workspace = true @@ -43,4 +43,3 @@ tmpdir.workspace = true tokio.workspace = true tracing-subscriber.workspace = true uuid.workspace = true -iroh-car.workspace = true diff --git a/service/src/event/migration.rs b/service/src/event/migration.rs index 9e0413ad8..33bc8fce4 100644 --- a/service/src/event/migration.rs +++ b/service/src/event/migration.rs @@ -360,7 +360,7 @@ impl EventBuilder { .with_event(&self.event_cid) .build(); - let body = event.encode_car().await?; + let body = event.encode_car()?; Ok(ReconItem::new(event_id, body)) } } diff --git a/service/src/event/ordering_task.rs b/service/src/event/ordering_task.rs index d377331d0..774a4bf61 100644 --- a/service/src/event/ordering_task.rs +++ b/service/src/event/ordering_task.rs @@ -128,7 +128,6 @@ impl StreamEvent { )) })?; let (_cid, parsed) = unvalidated::Event::::decode_car(data.as_slice(), false) - .await .map_err(Error::new_app)?; let metadata = EventMetadata::from(&parsed); diff --git a/service/src/event/service.rs b/service/src/event/service.rs index f664ebdfa..b0b435227 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -119,7 +119,6 @@ impl CeramicEventService { ) -> Result { let (cid, parsed_event) = unvalidated::Event::::decode_car(item.value.as_slice(), false) - .await .map_err(Error::new_app)?; Ok(EventInsertable::new( diff --git a/service/src/event/store.rs b/service/src/event/store.rs index 1c564b895..51346ccad 100644 --- a/service/src/event/store.rs +++ b/service/src/event/store.rs @@ -221,7 +221,7 @@ impl ceramic_api::EventStore for CeramicEventService { for (cid, value) in data { res.push(ceramic_api::EventDataResult::new( cid, - Some(value.encode_car().await?), + Some(value.encode_car()?), )); } (hw, res) diff --git a/service/src/tests/migration.rs b/service/src/tests/migration.rs index 91100ca2a..650947667 100644 --- a/service/src/tests/migration.rs +++ b/service/src/tests/migration.rs @@ -6,12 +6,12 @@ use std::{ use anyhow::Result; use async_trait::async_trait; +use ceramic_car::CarReader; use ceramic_core::{DidDocument, EventId, Network, StreamId}; use ceramic_event::unvalidated; use cid::Cid; use futures::{pin_mut, stream::BoxStream, StreamExt as _, TryStreamExt as _}; use ipld_core::{codec::Codec, ipld, ipld::Ipld}; -use iroh_car::CarReader; use multihash_codetable::{Code, MultihashDigest}; use rand::{thread_rng, Rng, RngCore}; use recon::Key; @@ -213,46 +213,28 @@ async fn unsigned_init_event() { test_migration(vec![random_unsigned_init_event() .await .encode_car() - .await .unwrap()]) .await; } #[test(tokio::test)] async fn many_unsigned_init_events() { test_migration(vec![ - random_unsigned_init_event() - .await - .encode_car() - .await - .unwrap(), - random_unsigned_init_event() - .await - .encode_car() - .await - .unwrap(), - random_unsigned_init_event() - .await - .encode_car() - .await - .unwrap(), + random_unsigned_init_event().await.encode_car().unwrap(), + random_unsigned_init_event().await.encode_car().unwrap(), + random_unsigned_init_event().await.encode_car().unwrap(), ]) .await; } #[test(tokio::test)] async fn signed_init_event() { - test_migration(vec![random_signed_init_event() - .await - .encode_car() - .await - .unwrap()]) - .await; + test_migration(vec![random_signed_init_event().await.encode_car().unwrap()]).await; } #[test(tokio::test)] async fn many_signed_init_events() { test_migration(vec![ - random_signed_init_event().await.encode_car().await.unwrap(), - random_signed_init_event().await.encode_car().await.unwrap(), - random_signed_init_event().await.encode_car().await.unwrap(), + random_signed_init_event().await.encode_car().unwrap(), + random_signed_init_event().await.encode_car().unwrap(), + random_signed_init_event().await.encode_car().unwrap(), ]) .await; } @@ -260,7 +242,7 @@ async fn many_signed_init_events() { async fn signed_data_event() { let mut cars = Vec::new(); for event in random_signed_data_event().await { - cars.push(event.encode_car().await.unwrap()); + cars.push(event.encode_car().unwrap()); } test_migration(cars).await; } @@ -269,7 +251,7 @@ async fn many_signed_data_events() { let mut cars = Vec::new(); for _ in 0..3 { for event in random_signed_data_event().await { - cars.push(event.encode_car().await.unwrap()); + cars.push(event.encode_car().unwrap()); } } test_migration(cars).await; @@ -282,7 +264,7 @@ async fn cacao_signed_data_event() { async fn unsigned_time_event() { let mut cars = Vec::new(); for event in random_unsigned_init_time_event().await { - cars.push(event.encode_car().await.unwrap()); + cars.push(event.encode_car().unwrap()); } test_migration(cars).await; } @@ -290,7 +272,7 @@ async fn unsigned_time_event() { async fn signed_init_time_event() { let mut cars = Vec::new(); for event in random_signed_init_time_event().await { - cars.push(event.encode_car().await.unwrap()); + cars.push(event.encode_car().unwrap()); } test_migration(cars).await; @@ -300,12 +282,12 @@ async fn many_time_events() { let mut cars = Vec::new(); for _ in 0..3 { for event in random_unsigned_init_time_event().await { - cars.push(event.encode_car().await.unwrap()); + cars.push(event.encode_car().unwrap()); } } for _ in 0..3 { for event in random_signed_init_time_event().await { - cars.push(event.encode_car().await.unwrap()); + cars.push(event.encode_car().unwrap()); } } test_migration(cars).await; @@ -317,30 +299,24 @@ async fn all_events() { cars.push(new_cacao_signed_data_event()); for _ in 0..3 { - cars.push( - random_unsigned_init_event() - .await - .encode_car() - .await - .unwrap(), - ); + cars.push(random_unsigned_init_event().await.encode_car().unwrap()); } for _ in 0..3 { - cars.push(random_signed_init_event().await.encode_car().await.unwrap()); + cars.push(random_signed_init_event().await.encode_car().unwrap()); } for _ in 0..3 { for event in random_signed_data_event().await { - cars.push(event.encode_car().await.unwrap()); + cars.push(event.encode_car().unwrap()); } } for _ in 0..3 { for event in random_unsigned_init_time_event().await { - cars.push(event.encode_car().await.unwrap()); + cars.push(event.encode_car().unwrap()); } } for _ in 0..3 { for event in random_signed_init_time_event().await { - cars.push(event.encode_car().await.unwrap()); + cars.push(event.encode_car().unwrap()); } } test_migration(cars).await; diff --git a/service/src/tests/mod.rs b/service/src/tests/mod.rs index bebbea78d..388fdb171 100644 --- a/service/src/tests/mod.rs +++ b/service/src/tests/mod.rs @@ -83,7 +83,7 @@ async fn build_event_fixed_model(model: StreamId) -> TestEventInfo { let init_cid = signed.envelope_cid(); let event_id = build_event_id(&init_cid, &init_cid, &model); - let car = signed.encode_car().await.unwrap(); + let car = signed.encode_car().unwrap(); TestEventInfo { event_id, blocks: vec![ @@ -175,7 +175,7 @@ async fn get_init_plus_n_events_with_model( let init_cid = init.envelope_cid(); let (event_id, car) = ( build_event_id(&init_cid, &init_cid, model), - init.encode_car().await.unwrap(), + init.encode_car().unwrap(), ); let init_cid = event_id.cid().unwrap(); @@ -196,7 +196,7 @@ async fn get_init_plus_n_events_with_model( let data = data_event(init_cid, prev, data, &signer).await; let (data_id, data_car) = ( build_event_id(&data.envelope_cid(), &init_cid, model), - data.encode_car().await.unwrap(), + data.encode_car().unwrap(), ); prev = data_id.cid().unwrap(); events.push(ReconItem::new(data_id, data_car)); diff --git a/store/Cargo.toml b/store/Cargo.toml index d9482a1a4..c5f03ec2a 100644 --- a/store/Cargo.toml +++ b/store/Cargo.toml @@ -12,6 +12,7 @@ publish = false anyhow.workspace = true async-trait.workspace = true ceramic-api.workspace = true +ceramic-car.workspace = true ceramic-core.workspace = true ceramic-event.workspace = true ceramic-metrics.workspace = true @@ -20,7 +21,6 @@ futures.workspace = true hex.workspace = true ipld-core.workspace = true iroh-bitswap.workspace = true -iroh-car.workspace = true itertools = "0.12.0" multihash-codetable.workspace = true multihash.workspace = true diff --git a/store/src/sql/entities/event.rs b/store/src/sql/entities/event.rs index 4d78ec93c..39c49b6ec 100644 --- a/store/src/sql/entities/event.rs +++ b/store/src/sql/entities/event.rs @@ -1,10 +1,11 @@ +use std::collections::BTreeSet; + use anyhow::anyhow; +use ceramic_car::{CarHeader, CarReader, CarWriter}; use ceramic_core::EventId; use ceramic_event::unvalidated; use cid::Cid; use ipld_core::ipld::Ipld; -use iroh_car::{CarHeader, CarReader, CarWriter}; -use std::collections::BTreeSet; pub use crate::sql::entities::EventBlockRaw; @@ -114,7 +115,7 @@ impl EventInsertable { pub async fn get_raw_blocks(&self) -> Result> { // TODO(AES-311): Turn the Event into its raw blocks in a single pass, instead of first // turning it into a CAR file and then turning the CAR file into the raw blocks. - let car = self.event.encode_car().await.map_err(Error::new_app)?; + let car = self.event.encode_car().map_err(Error::new_app)?; let mut reader = CarReader::new(car.as_slice()) .await diff --git a/store/src/sql/entities/event_block.rs b/store/src/sql/entities/event_block.rs index 078eb3a27..cc4fdc5c6 100644 --- a/store/src/sql/entities/event_block.rs +++ b/store/src/sql/entities/event_block.rs @@ -75,7 +75,6 @@ impl ReconEventBlockRaw { for (_, carfile) in parsed { let (cid, parsed_event) = unvalidated::Event::::decode_car(carfile.as_slice(), false) - .await .map_err(Error::new_app)?; res.push((cid, parsed_event)); }