Skip to content

Commit

Permalink
refactor: create sync car crate (#485)
Browse files Browse the repository at this point in the history
Replaces iroh-car with ceramic-car that has both async and sync APIs.
Additionally changes the Event::encode_car and Event::decode_car methods
to be synchronous as we nearly always have the bytes in memory already
and it's cheap to copy them into memory when we do not.
  • Loading branch information
nathanielc authored Aug 19, 2024
1 parent 92b9c44 commit 01ec7c9
Show file tree
Hide file tree
Showing 35 changed files with 495 additions and 164 deletions.
40 changes: 20 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ resolver = "2"
members = [
"api",
"api-server",
"car",
"core",
"event",
"kubo-rpc",
Expand All @@ -14,7 +15,6 @@ members = [
"recon",
"store",
"beetle/iroh-bitswap",
"beetle/iroh-car",
"beetle/iroh-rpc-client",
"beetle/iroh-rpc-types",
"beetle/iroh-util",
Expand Down Expand Up @@ -51,6 +51,7 @@ bytes = "1.1"
bytesize = "1.1"
ceramic-api = { path = "./api" }
ceramic-api-server = { path = "./api-server" }
ceramic-car = { path = "./car" }
ceramic-core = { path = "./core" }
ceramic-event = { path = "./event" }
ceramic-kubo-rpc-server = { path = "./kubo-rpc-server" }
Expand Down Expand Up @@ -97,7 +98,6 @@ integer-encoding = "3.0"
ipld-core = "0.4"
ipld-dagpb = "0.2"
iroh-bitswap = { path = "./beetle/iroh-bitswap" }
iroh-car = { path = "./beetle/iroh-car" }
iroh-p2p = { version = "0.2.0", path = "./beetle/iroh-p2p" }
iroh-rpc-client = { path = "./beetle/iroh-rpc-client" }
iroh-rpc-types = { path = "./beetle/iroh-rpc-types" }
Expand Down
2 changes: 1 addition & 1 deletion api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ ceramic-event.workspace = true
ceramic-metadata.workspace = true
futures.workspace = true
ipld-core.workspace = true
iroh-car.workspace = true
ceramic-car.workspace = true
multibase.workspace = true
recon.workspace = true
serde.workspace = true
Expand Down
12 changes: 7 additions & 5 deletions api/src/server/event.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
use std::collections::HashMap;
use std::{collections::HashMap, io::Cursor};

use anyhow::{anyhow, bail, Context, Result};
use ceramic_car::CarReader;
use ceramic_core::{Cid, EventId, Network};
use ceramic_event::unvalidated;
use ipld_core::ipld::Ipld;
use iroh_car::CarReader;
use tokio::io::AsyncRead;
use tokio::io::{AsyncRead, AsyncReadExt as _};
use tracing::debug;

use crate::EventStore;

// Helper function to construct an event ID from CAR data of an event coming in via the HTTP api.
pub async fn event_id_from_car<R, S>(network: Network, reader: R, store: &S) -> Result<EventId>
pub async fn event_id_from_car<R, S>(network: Network, mut reader: R, store: &S) -> Result<EventId>
where
R: AsyncRead + Send + Unpin,
S: EventStore,
{
let (event_cid, event) = unvalidated::Event::decode_car(reader, true).await?;
let mut car_bytes = Vec::new();
reader.read_to_end(&mut car_bytes).await?;
let (event_cid, event) = unvalidated::Event::decode_car(Cursor::new(&car_bytes), true)?;
event_id_for_event(event_cid, event, network, store).await
}

Expand Down
15 changes: 8 additions & 7 deletions beetle/iroh-car/Cargo.toml → car/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
[package]
name = "iroh-car"
authors = ["dignifiedquire <[email protected]>"]
description = "Implementation the car files for iroh"
name = "ceramic-car"
authors = [
"dignifiedquire <[email protected]>",
"Nathaniel Cook <[email protected]>",
]
description = "Car V1 reading and writing API"
version.workspace = true
edition.workspace = true
license.workspace = true
Expand All @@ -10,8 +13,8 @@ publish = false

[dependencies]
cid.workspace = true
futures.workspace = true
integer-encoding = { workspace = true, features = ["tokio_async"] }
futures.workspace = true
serde_ipld_dagcbor.workspace = true
serde.workspace = true
thiserror.workspace = true
Expand All @@ -20,13 +23,11 @@ tokio = { workspace = true, features = ["io-util"] }
[dev-dependencies]
multihash.workspace = true
multihash-codetable.workspace = true
ipld-core.workspace = true
tokio = { workspace = true, features = [
"macros",
"sync",
"rt",
"fs",
"io-util",
] }
ipld-core.workspace = true

[features]
File renamed without changes.
File renamed without changes.
File renamed without changes.
4 changes: 4 additions & 0 deletions beetle/iroh-car/src/lib.rs → car/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
mod error;
mod header;
mod reader;
/// Synchronous version of the same API.
/// Useful if all data already exists in memory.
pub mod sync;
mod util;
mod writer;

pub use crate::error::Error;
pub use crate::header::{CarHeader, CarHeaderV1};
pub use crate::reader::CarReader;
pub use crate::writer::CarWriter;
File renamed without changes.
10 changes: 10 additions & 0 deletions car/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
//! Implementation of the [car](https://ipld.io/specs/transport/car/) format.

mod reader;
mod util;
mod writer;

pub use crate::error::Error;
pub use crate::header::{CarHeader, CarHeaderV1};
pub use reader::CarReader;
pub use writer::CarWriter;
102 changes: 102 additions & 0 deletions car/src/sync/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use std::io::Read;

use cid::Cid;

use super::util::{ld_read, read_node};
use crate::{header::CarHeader, Error};

/// Synchronous reader for CAR data pulled from an underlying reader `R`.
///
/// Holds the reader itself, the header decoded from it, and a byte buffer
/// that is handed to the frame-reading helpers on each read.
#[derive(Debug)]
pub struct CarReader<R> {
    /// Underlying source of CAR bytes.
    reader: R,
    /// Header of the CAR data, exposed through `header()`.
    header: CarHeader,
    /// Byte buffer passed to the read helpers alongside `reader`.
    buffer: Vec<u8>,
}

impl<R> CarReader<R>
where
R: Read + Send + Unpin,
{
/// Creates a new CarReader and parses the CarHeader
pub fn new(mut reader: R) -> Result<Self, Error> {
let mut buffer = Vec::new();

match ld_read(&mut reader, &mut buffer)? {
Some(buf) => {
let header = CarHeader::decode(buf)?;

Ok(CarReader {
reader,
header,
buffer,
})
}
None => Err(Error::Parsing(
"failed to parse uvarint for header".to_string(),
)),
}
}

/// Returns the header of this car file.
pub fn header(&self) -> &CarHeader {
&self.header
}

/// Returns the next IPLD Block in the buffer
pub fn next_block(&mut self) -> Result<Option<(Cid, Vec<u8>)>, Error> {
read_node(&mut self.reader, &mut self.buffer)
}
}

impl<R> Iterator for CarReader<R>
where
R: Read + Send + Unpin,
{
type Item = Result<(Cid, Vec<u8>), Error>;

fn next(&mut self) -> Option<Self::Item> {
read_node(&mut self.reader, &mut self.buffer).transpose()
}
}

#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use cid::Cid;
    use ipld_core::{codec::Codec, ipld::Ipld};
    use multihash_codetable::{Code, MultihashDigest};
    use serde_ipld_dagcbor::codec::DagCborCodec;

    use crate::sync::*;

    /// Writes two blocks with the synchronous writer, reads them back with the
    /// synchronous reader, and checks they come back in write order.
    #[test]
    fn car_write_read() {
        /// CIDv1 tagged with the DAG-CBOR codec over the SHA2-256 digest of `payload`.
        fn dag_cbor_cid(payload: &[u8]) -> Cid {
            Cid::new_v1(
                <DagCborCodec as Codec<Ipld>>::CODE,
                Code::Sha2_256.digest(payload),
            )
        }

        let cid_test = dag_cbor_cid(b"test");
        let cid_foo = dag_cbor_cid(b"foo");

        // Serialize: header listing `cid_foo` as the single root, then both blocks.
        let mut buffer = Vec::new();
        let mut writer = CarWriter::new(
            CarHeader::V1(CarHeaderV1::from(vec![cid_foo])),
            &mut buffer,
        );
        writer.write(cid_test, b"test").unwrap();
        writer.write(cid_foo, b"foo").unwrap();
        writer.finish().unwrap();

        // Deserialize: drain the reader through its `Iterator` impl.
        let car_reader = CarReader::new(Cursor::new(&buffer)).unwrap();
        let files: Vec<_> = car_reader.collect::<Result<_, Error>>().unwrap();

        assert_eq!(files.len(), 2);
        assert_eq!(files[0].0, cid_test);
        assert_eq!(files[0].1, b"test");
        assert_eq!(files[1].0, cid_foo);
        assert_eq!(files[1].1, b"foo");
    }
}
Loading

0 comments on commit 01ec7c9

Please sign in to comment.