diff --git a/Cargo.lock b/Cargo.lock index 3c9b24851..abac55ec0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1534,6 +1534,47 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "ceramic-event-svc" +version = "0.34.0" +dependencies = [ + "anyhow", + "async-trait", + "bytes 1.7.1", + "ceramic-api", + "ceramic-car", + "ceramic-core", + "ceramic-event", + "ceramic-metrics", + "ceramic-sql", + "cid 0.11.1", + "criterion2", + "expect-test", + "futures", + "hex", + "ipld-core", + "iroh-bitswap", + "itertools 0.12.1", + "multibase 0.9.1", + "multihash 0.19.1", + "multihash-codetable", + "multihash-derive 0.9.0", + "paste", + "prometheus-client", + "rand 0.8.5", + "recon", + "serde", + "serde_ipld_dagcbor", + "sqlx", + "test-log", + "thiserror", + "tmpdir", + "tokio", + "tracing", + "tracing-subscriber", + "uuid 1.10.0", +] + [[package]] name = "ceramic-flight" version = "0.34.0" @@ -1550,6 +1591,38 @@ dependencies = [ "tokio", ] +[[package]] +name = "ceramic-interest-svc" +version = "0.34.0" +dependencies = [ + "anyhow", + "async-trait", + "ceramic-api", + "ceramic-core", + "ceramic-event", + "ceramic-metrics", + "ceramic-sql", + "criterion2", + "expect-test", + "futures", + "ipld-core", + "multibase 0.9.1", + "paste", + "prometheus-client", + "rand 0.8.5", + "recon", + "serde", + "serde_ipld_dagcbor", + "sqlx", + "test-log", + "thiserror", + "tmpdir", + "tokio", + "tracing", + "tracing-subscriber", + "uuid 1.10.0", +] + [[package]] name = "ceramic-kubo-rpc" version = "0.34.0" @@ -1678,12 +1751,13 @@ dependencies = [ "ceramic-api-server", "ceramic-core", "ceramic-event", + "ceramic-event-svc", + "ceramic-interest-svc", "ceramic-kubo-rpc", "ceramic-kubo-rpc-server", "ceramic-metrics", "ceramic-p2p", - "ceramic-service", - "ceramic-store", + "ceramic-sql", "cid 0.11.1", "clap 4.5.16", "expect-test", @@ -1725,9 +1799,8 @@ dependencies = [ "async-trait", "backoff", "ceramic-core", + "ceramic-event-svc", "ceramic-metrics", - "ceramic-service", - "ceramic-store", 
"cid 0.11.1", "criterion2", "futures", @@ -1756,74 +1829,12 @@ dependencies = [ ] [[package]] -name = "ceramic-service" -version = "0.34.0" -dependencies = [ - "anyhow", - "async-trait", - "bytes 1.7.1", - "ceramic-api", - "ceramic-car", - "ceramic-core", - "ceramic-event", - "ceramic-store", - "cid 0.11.1", - "expect-test", - "futures", - "hex", - "ipld-core", - "iroh-bitswap", - "multibase 0.9.1", - "multihash-codetable", - "multihash-derive 0.9.0", - "paste", - "rand 0.8.5", - "recon", - "serde", - "serde_ipld_dagcbor", - "test-log", - "thiserror", - "tmpdir", - "tokio", - "tracing", - "tracing-subscriber", - "uuid 1.10.0", -] - -[[package]] -name = "ceramic-store" +name = "ceramic-sql" version = "0.34.0" dependencies = [ "anyhow", - "async-trait", - "ceramic-api", - "ceramic-car", - "ceramic-core", - "ceramic-event", - "ceramic-metrics", - "cid 0.11.1", - "criterion2", - "expect-test", - "futures", - "hex", - "ipld-core", - "iroh-bitswap", - "itertools 0.12.1", - "multibase 0.9.1", - "multihash 0.19.1", - "multihash-codetable", - "paste", - "prometheus-client", - "rand 0.8.5", - "recon", - "serde_ipld_dagcbor", "sqlx", - "test-log", "thiserror", - "tmpdir", - "tokio", - "tracing-subscriber", - "uuid 1.10.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 23ec757f7..6386e4f24 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,22 +6,23 @@ members = [ "car", "core", "event", + "event-svc", + "flight", + "interest-svc", "kubo-rpc", "kubo-rpc-server", "metadata", "metrics", + "olap", "one", "p2p", "recon", - "store", + "sql", + "validation", "beetle/iroh-bitswap", "beetle/iroh-rpc-client", "beetle/iroh-rpc-types", "beetle/iroh-util", - "service", - "olap", - "validation", - "flight", ] [workspace.dependencies] @@ -57,14 +58,15 @@ ceramic-api-server = { path = "./api-server" } ceramic-car = { path = "./car" } ceramic-core = { path = "./core" } ceramic-event = { path = "./event" } +ceramic-event-svc = { path = "./event-svc" } ceramic-flight = { path = "./flight" 
} +ceramic-interest-svc = { path = "./interest-svc" } ceramic-kubo-rpc-server = { path = "./kubo-rpc-server" } ceramic-metadata = { path = "./metadata" } ceramic-metrics = { path = "./metrics" } ceramic-one = { path = "./one" } ceramic-p2p = { path = "./p2p" } -ceramic-service = { path = "./service" } -ceramic-store = { path = "./store" } +ceramic-sql = { path = "./sql" } ceramic-validation = { path = "./validation" } chrono = "0.4.31" cid = { version = "0.11", features = ["serde-codec"] } @@ -108,6 +110,7 @@ iroh-p2p = { version = "0.2.0", path = "./beetle/iroh-p2p" } iroh-rpc-client = { path = "./beetle/iroh-rpc-client" } iroh-rpc-types = { path = "./beetle/iroh-rpc-types" } iroh-util = { path = "./beetle/iroh-util" } +itertools = "0.12.0" k256 = "0.13" keyed_priority_queue = "0.4.1" lazy_static = "1.4" diff --git a/api/src/lib.rs b/api/src/lib.rs index 90f43a64e..3be76df8a 100644 --- a/api/src/lib.rs +++ b/api/src/lib.rs @@ -4,7 +4,7 @@ mod server; pub use resume_token::ResumeToken; pub use server::{ - ApiItem, EventDataResult, EventInsertResult, EventStore, IncludeEventData, InterestStore, + ApiItem, EventDataResult, EventInsertResult, EventService, IncludeEventData, InterestService, Server, }; diff --git a/api/src/server.rs b/api/src/server.rs index 908ac354c..23bcd550f 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -134,7 +134,7 @@ impl TryFrom for ValidatedInterest { /// Trait for accessing persistent storage of Interests #[async_trait] -pub trait InterestStore: Send + Sync { +pub trait InterestService: Send + Sync { /// Returns true if the key was newly inserted, false if it already existed. 
async fn insert(&self, key: Interest) -> Result; async fn range( @@ -147,7 +147,7 @@ pub trait InterestStore: Send + Sync { } #[async_trait] -impl InterestStore for Arc { +impl InterestService for Arc { async fn insert(&self, key: Interest) -> Result { self.as_ref().insert(key).await } @@ -243,7 +243,7 @@ impl ApiItem { /// Trait for accessing persistent storage of Events #[async_trait] -pub trait EventStore: Send + Sync { +pub trait EventService: Send + Sync { /// Returns (new_key, new_value) where true if was newly inserted, false if it already existed. async fn insert_many(&self, items: Vec) -> Result>; async fn range_with_values( @@ -279,7 +279,7 @@ pub trait EventStore: Send + Sync { } #[async_trait::async_trait] -impl EventStore for Arc { +impl EventService for Arc { async fn insert_many(&self, items: Vec) -> Result> { self.as_ref().insert_many(items).await } @@ -346,8 +346,8 @@ pub struct Server { impl Server where - I: InterestStore, - M: EventStore + 'static, + I: InterestService, + M: EventService + 'static, { pub fn new(peer_id: PeerId, network: Network, interest: I, model: Arc) -> Self { let (tx, event_rx) = tokio::sync::mpsc::channel::(1024); @@ -807,8 +807,8 @@ pub(crate) fn decode_multibase_data(value: &str) -> Result, BadRequestRe impl Api for Server where C: Send + Sync, - I: InterestStore + Sync, - M: EventStore + Sync + 'static, + I: InterestService + Sync, + M: EventService + Sync + 'static, { #[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))] async fn liveness_get( diff --git a/api/src/server/event.rs b/api/src/server/event.rs index 7ac66d9b9..6d785a766 100644 --- a/api/src/server/event.rs +++ b/api/src/server/event.rs @@ -8,13 +8,13 @@ use ipld_core::ipld::Ipld; use tokio::io::{AsyncRead, AsyncReadExt as _}; use tracing::debug; -use crate::EventStore; +use crate::EventService; // Helper function to construct an event ID from CAR data of an event coming in via the HTTP api. 
pub async fn event_id_from_car(network: Network, mut reader: R, store: &S) -> Result where R: AsyncRead + Send + Unpin, - S: EventStore, + S: EventService, { let mut car_bytes = Vec::new(); reader.read_to_end(&mut car_bytes).await?; @@ -29,7 +29,7 @@ async fn event_id_for_event( store: &S, ) -> Result where - S: EventStore, + S: EventService, { match event { unvalidated::Event::Time(time_event) => { @@ -83,7 +83,7 @@ fn event_id_from_init_payload( async fn get_init_event_payload_from_store( init_cid: &Cid, - store: &impl EventStore, + store: &impl EventService, ) -> Result> { let init_bytes = store .get_block(init_cid) diff --git a/api/src/tests.rs b/api/src/tests.rs index dc7be7d81..2ede64b9f 100644 --- a/api/src/tests.rs +++ b/api/src/tests.rs @@ -4,7 +4,7 @@ use std::{ops::Range, str::FromStr, sync::Arc}; use crate::server::{decode_multibase_data, BuildResponse, Server}; use crate::{ - ApiItem, EventDataResult, EventInsertResult, EventStore, IncludeEventData, InterestStore, + ApiItem, EventDataResult, EventInsertResult, EventService, IncludeEventData, InterestService, }; use anyhow::Result; @@ -95,7 +95,7 @@ pub fn decode_multibase_str(encoded: &str) -> Vec { mock! { pub AccessInterestStoreTest {} #[async_trait] - impl InterestStore for AccessInterestStoreTest { + impl InterestService for AccessInterestStoreTest { async fn insert(&self, key: Interest) -> Result; async fn range( &self, @@ -110,7 +110,7 @@ mock! { mock! 
{ pub EventStoreTest {} #[async_trait] - impl EventStore for EventStoreTest { + impl EventService for EventStoreTest { async fn insert_many(&self, items: Vec) -> Result>; async fn range_with_values( &self, diff --git a/store/Cargo.toml b/event-svc/Cargo.toml similarity index 80% rename from store/Cargo.toml rename to event-svc/Cargo.toml index c5f03ec2a..260c27e83 100644 --- a/store/Cargo.toml +++ b/event-svc/Cargo.toml @@ -1,34 +1,39 @@ [package] -name = "ceramic-store" -description = "A storage implementation for Ceramic" +name = "ceramic-event-svc" version.workspace = true edition.workspace = true authors.workspace = true license.workspace = true repository.workspace = true -publish = false [dependencies] anyhow.workspace = true async-trait.workspace = true +bytes.workspace = true ceramic-api.workspace = true ceramic-car.workspace = true ceramic-core.workspace = true ceramic-event.workspace = true ceramic-metrics.workspace = true +ceramic-sql.workspace = true cid.workspace = true futures.workspace = true hex.workspace = true ipld-core.workspace = true iroh-bitswap.workspace = true -itertools = "0.12.0" +itertools.workspace = true +multibase.workspace = true multihash-codetable.workspace = true +multihash-derive.workspace = true multihash.workspace = true prometheus-client.workspace = true recon.workspace = true +serde.workspace = true +serde_ipld_dagcbor.workspace = true sqlx.workspace = true thiserror.workspace = true tokio.workspace = true +tracing.workspace = true [dev-dependencies] ceramic-event.workspace = true @@ -38,6 +43,7 @@ ipld-core.workspace = true multibase.workspace = true paste = "1.0" rand.workspace = true +serde.workspace = true serde_ipld_dagcbor.workspace = true test-log.workspace = true tmpdir.workspace = true @@ -45,7 +51,6 @@ tokio.workspace = true tracing-subscriber.workspace = true uuid.workspace = true - [[bench]] name = "sqlite_store" harness = false diff --git a/store/benches/sqlite_store.rs b/event-svc/benches/sqlite_store.rs 
similarity index 94% rename from store/benches/sqlite_store.rs rename to event-svc/benches/sqlite_store.rs index f526303c2..8dcac0665 100644 --- a/store/benches/sqlite_store.rs +++ b/event-svc/benches/sqlite_store.rs @@ -6,12 +6,13 @@ use ceramic_event::unvalidated::{ signed::{self, Signer}, Builder, }; -use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool}; use criterion2::{criterion_group, criterion_main, BatchSize, Criterion}; use ipld_core::ipld::Ipld; use itertools::Itertools; use rand::RngCore; +use crate::store::{CeramicOneEvent, EventInsertable, SqlitePool}; + const INSERT_BATCH_SIZE: usize = 10; struct ModelSetup { @@ -95,7 +96,8 @@ async fn model_routine(input: ModelSetup) { fn small_model_inserts(c: &mut Criterion) { let exec = tokio::runtime::Runtime::new().unwrap(); - let dir = exec.block_on(async move { tmpdir::TmpDir::new("ceramic_store").await.unwrap() }); + let dir = + exec.block_on(async move { tmpdir::TmpDir::new("event_service_store").await.unwrap() }); let mut group = c.benchmark_group("small model inserts"); group.bench_function("sqlite store", move |b| { b.to_async(&exec).iter_batched_async_setup( @@ -117,7 +119,9 @@ fn small_model_inserts(c: &mut Criterion) { fn large_model_inserts(c: &mut Criterion) { let exec = tokio::runtime::Runtime::new().unwrap(); - let dir = exec.block_on(tmpdir::TmpDir::new("ceramic_store")).unwrap(); + let dir = exec + .block_on(tmpdir::TmpDir::new("event_service_store")) + .unwrap(); let mut group = c.benchmark_group("large model inserts"); group.bench_function("sqlite store", |b| { b.to_async(&exec).iter_batched_async_setup( diff --git a/service/src/error.rs b/event-svc/src/error.rs similarity index 87% rename from service/src/error.rs rename to event-svc/src/error.rs index f75c33d0a..93c4acb1a 100644 --- a/service/src/error.rs +++ b/event-svc/src/error.rs @@ -89,13 +89,13 @@ impl From for recon::Error { } } -impl From for Error { - fn from(value: ceramic_store::Error) -> Self { +impl From for Error { 
+ fn from(value: crate::store::Error) -> Self { match value { - ceramic_store::Error::Application { error } => Error::Application { error }, - ceramic_store::Error::Fatal { error } => Error::Fatal { error }, - ceramic_store::Error::Transient { error } => Error::Transient { error }, - ceramic_store::Error::InvalidArgument { error } => Error::InvalidArgument { error }, + crate::store::Error::Application { error } => Error::Application { error }, + crate::store::Error::Fatal { error } => Error::Fatal { error }, + crate::store::Error::Transient { error } => Error::Transient { error }, + crate::store::Error::InvalidArgument { error } => Error::InvalidArgument { error }, } } } diff --git a/service/src/event/migration.rs b/event-svc/src/event/migration.rs similarity index 99% rename from service/src/event/migration.rs rename to event-svc/src/event/migration.rs index 148ab8b0d..370412b31 100644 --- a/service/src/event/migration.rs +++ b/event-svc/src/event/migration.rs @@ -13,11 +13,11 @@ use tracing::{debug, error, info, instrument, Level}; use crate::{ event::{BlockStore, DeliverableRequirement}, - CeramicEventService, + EventService, }; pub struct Migrator<'a, S> { - service: &'a CeramicEventService, + service: &'a EventService, network: Network, blocks: S, batch: Vec>, @@ -39,11 +39,11 @@ pub struct Migrator<'a, S> { impl<'a, S: BlockStore> Migrator<'a, S> { pub async fn new( - service: &'a CeramicEventService, + service: &'a EventService, network: Network, blocks: S, log_tile_docs: bool, - ) -> anyhow::Result { + ) -> Result { Ok(Self { network, service, @@ -369,7 +369,7 @@ impl<'a, S: BlockStore> Migrator<'a, S> { type Result = std::result::Result; #[derive(Error, Debug)] -enum Error { +pub enum Error { #[error("missing linked block from event {0}")] MissingBlock(Cid), #[error("block is an init tile document: {0}")] diff --git a/service/src/event/mod.rs b/event-svc/src/event/mod.rs similarity index 50% rename from service/src/event/mod.rs rename to 
event-svc/src/event/mod.rs index ac371a076..5a7e4dd38 100644 --- a/service/src/event/mod.rs +++ b/event-svc/src/event/mod.rs @@ -4,4 +4,4 @@ mod ordering_task; mod service; mod store; -pub use service::{BlockStore, CeramicEventService, DeliverableRequirement}; +pub use service::{BlockStore, DeliverableRequirement, EventService}; diff --git a/service/src/event/order_events.rs b/event-svc/src/event/order_events.rs similarity index 97% rename from service/src/event/order_events.rs rename to event-svc/src/event/order_events.rs index 237d9d31e..56664e9a6 100644 --- a/service/src/event/order_events.rs +++ b/event-svc/src/event/order_events.rs @@ -1,8 +1,9 @@ use std::collections::{HashMap, VecDeque}; use ceramic_core::Cid; -use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool}; +use ceramic_sql::sqlite::SqlitePool; +use crate::store::{CeramicOneEvent, EventInsertable}; use crate::Result; pub(crate) struct OrderEvents { @@ -141,7 +142,7 @@ mod test { use super::*; - use crate::{tests::get_n_events, CeramicEventService}; + use crate::{tests::get_n_events, EventService}; async fn get_2_streams() -> ( Vec>, @@ -152,9 +153,7 @@ mod test { let stream_1 = get_n_events(10).await; let mut to_insert = Vec::with_capacity(10); for event in stream_1.iter().chain(stream_2.iter()) { - let insertable = CeramicEventService::parse_discovered_event(event) - .await - .unwrap(); + let insertable = EventService::parse_discovered_event(event).await.unwrap(); to_insert.push(insertable); } (stream_1, stream_2, to_insert) @@ -188,9 +187,7 @@ mod test { let mut insertable = Vec::with_capacity(first_vec_count); let mut remaining = Vec::with_capacity(events.len() - first_vec_count); for (i, event) in events.iter().enumerate() { - let new = CeramicEventService::parse_discovered_event(event) - .await - .unwrap(); + let new = EventService::parse_discovered_event(event).await.unwrap(); if i < first_vec_count { insertable.push(new); } else { diff --git a/service/src/event/ordering_task.rs 
b/event-svc/src/event/ordering_task.rs similarity index 99% rename from service/src/event/ordering_task.rs rename to event-svc/src/event/ordering_task.rs index b59c8ec77..de040507b 100644 --- a/service/src/event/ordering_task.rs +++ b/event-svc/src/event/ordering_task.rs @@ -2,11 +2,12 @@ use std::collections::{HashMap, VecDeque}; use anyhow::anyhow; use ceramic_event::unvalidated; -use ceramic_store::{CeramicOneEvent, SqlitePool}; +use ceramic_sql::sqlite::SqlitePool; use cid::Cid; use ipld_core::ipld::Ipld; use tracing::{debug, error, info, trace, warn}; +use crate::store::CeramicOneEvent; use crate::{Error, Result}; use super::service::DiscoveredEvent; @@ -597,10 +598,10 @@ impl OrderingState { #[cfg(test)] mod test { - use ceramic_store::EventInsertable; + use crate::store::EventInsertable; use test_log::test; - use crate::{tests::get_n_events, CeramicEventService}; + use crate::{tests::get_n_events, EventService}; use super::*; @@ -608,9 +609,7 @@ mod test { let mut res = Vec::with_capacity(n); let events = get_n_events(n).await; for event in events { - let event = CeramicEventService::parse_discovered_event(&event) - .await - .unwrap(); + let event = EventService::parse_discovered_event(&event).await.unwrap(); res.push(event); } res diff --git a/service/src/event/service.rs b/event-svc/src/event/service.rs similarity index 96% rename from service/src/event/service.rs rename to event-svc/src/event/service.rs index fdb8cfaca..ddfeeff75 100644 --- a/service/src/event/service.rs +++ b/event-svc/src/event/service.rs @@ -9,7 +9,7 @@ use async_trait::async_trait; use ceramic_core::{EventId, Network}; use ceramic_event::unvalidated; use ceramic_event::unvalidated::Event; -use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool}; +use ceramic_sql::sqlite::SqlitePool; use cid::Cid; use futures::stream::BoxStream; use ipld_core::ipld::Ipld; @@ -17,6 +17,7 @@ use recon::ReconItem; use tokio::try_join; use tracing::{trace, warn}; +use 
crate::store::{CeramicOneEvent, EventInsertable}; use crate::{Error, Result}; /// How many events to select at once to see if they've become deliverable when we have downtime @@ -35,7 +36,7 @@ const PENDING_EVENTS_CHANNEL_DEPTH: usize = 1_000_000; #[derive(Debug)] /// A database store that verifies the bytes it stores are valid Ceramic events. /// Implements the [`recon::Store`], [`iroh_bitswap::Store`], and [`ceramic_api::EventStore`] traits for [`ceramic_core::EventId`]. -pub struct CeramicEventService { +pub struct EventService { pub(crate) pool: SqlitePool, _validate_events: bool, delivery_task: DeliverableTask, @@ -65,9 +66,9 @@ pub enum DeliverableRequirement { Lazy, } -impl CeramicEventService { +impl EventService { /// Create a new CeramicEventStore - pub async fn new(pool: SqlitePool, _validate_events: bool) -> Result { + pub async fn try_new(pool: SqlitePool, _validate_events: bool) -> Result { CeramicOneEvent::init_delivered_order(&pool).await?; let delivery_task = OrderingTask::run(pool.clone(), PENDING_EVENTS_CHANNEL_DEPTH).await; @@ -84,7 +85,7 @@ impl CeramicEventService { /// in the next pass.. but it's basically same same but different. #[allow(dead_code)] pub(crate) async fn new_with_event_validation(pool: SqlitePool) -> Result { - Self::new(pool, true).await + Self::try_new(pool, true).await } /// Returns the number of undelivered events that were updated @@ -97,6 +98,7 @@ impl CeramicEventService { .await } + /// Migrate a collection of blocks into the event service. 
pub async fn migrate_from_ipfs( &self, network: Network, @@ -255,7 +257,7 @@ impl CeramicEventService { async fn notify_ordering_task( &self, ordered: &OrderEvents, - store_result: &ceramic_store::InsertResult, + store_result: &crate::store::InsertResult, ) -> Result<()> { let new = store_result .inserted @@ -313,7 +315,7 @@ pub enum InvalidItem { #[derive(Debug, PartialEq, Eq, Default)] pub struct InsertResult { pub rejected: Vec, - pub(crate) store_result: ceramic_store::InsertResult, + pub(crate) store_result: crate::store::InsertResult, } #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/service/src/event/store.rs b/event-svc/src/event/store.rs similarity index 92% rename from service/src/event/store.rs rename to event-svc/src/event/store.rs index 51346ccad..796353d2d 100644 --- a/service/src/event/store.rs +++ b/event-svc/src/event/store.rs @@ -2,12 +2,13 @@ use std::ops::Range; use anyhow::anyhow; use ceramic_core::EventId; -use ceramic_store::{CeramicOneBlock, CeramicOneEvent}; use cid::Cid; use iroh_bitswap::Block; use recon::{HashCount, ReconItem, Result as ReconResult, Sha256a}; -use crate::event::{CeramicEventService, DeliverableRequirement}; +use crate::event::{DeliverableRequirement, EventService}; +use crate::store::{CeramicOneBlock, CeramicOneEvent}; +use crate::Error; use super::service::{InsertResult, InvalidItem}; @@ -33,7 +34,7 @@ impl From for recon::InsertResult { } #[async_trait::async_trait] -impl recon::Store for CeramicEventService { +impl recon::Store for EventService { type Key = EventId; type Hash = Sha256a; @@ -55,7 +56,9 @@ impl recon::Store for CeramicEventService { /// Both range bounds are exclusive. 
/// Returns ReconResult<(Hash, count), Err> async fn hash_range(&self, range: Range<&Self::Key>) -> ReconResult> { - let res = CeramicOneEvent::hash_range(&self.pool, range).await?; + let res = CeramicOneEvent::hash_range(&self.pool, range) + .await + .map_err(Error::from)?; Ok(res) } @@ -71,7 +74,8 @@ impl recon::Store for CeramicEventService { ) -> ReconResult + Send + 'static>> { Ok(Box::new( CeramicOneEvent::range(&self.pool, range, offset, limit) - .await? + .await + .map_err(Error::from)? .into_iter(), )) } @@ -88,13 +92,16 @@ impl recon::Store for CeramicEventService { ) -> ReconResult)> + Send + 'static>> { Ok(Box::new( CeramicOneEvent::range_with_values(&self.pool, range, offset, limit) - .await? + .await + .map_err(Error::from)? .into_iter(), )) } /// Return the number of keys within the range. async fn count(&self, range: Range<&Self::Key>) -> ReconResult { - Ok(CeramicOneEvent::count(&self.pool, range).await?) + Ok(CeramicOneEvent::count(&self.pool, range) + .await + .map_err(Error::from)?) } /// value_for_key returns @@ -102,12 +109,14 @@ impl recon::Store for CeramicEventService { /// Ok(None) if not stored, and /// Err(e) if retrieving failed. async fn value_for_key(&self, key: &Self::Key) -> ReconResult>> { - Ok(CeramicOneEvent::value_by_order_key(&self.pool, key).await?) + Ok(CeramicOneEvent::value_by_order_key(&self.pool, key) + .await + .map_err(Error::from)?) } } #[async_trait::async_trait] -impl iroh_bitswap::Store for CeramicEventService { +impl iroh_bitswap::Store for EventService { async fn get_size(&self, cid: &Cid) -> anyhow::Result { Ok(CeramicOneBlock::get_size(&self.pool, cid).await?) 
} @@ -150,7 +159,7 @@ impl From for Vec { } #[async_trait::async_trait] -impl ceramic_api::EventStore for CeramicEventService { +impl ceramic_api::EventService for EventService { async fn insert_many( &self, items: Vec, diff --git a/event-svc/src/lib.rs b/event-svc/src/lib.rs new file mode 100644 index 000000000..fe8620005 --- /dev/null +++ b/event-svc/src/lib.rs @@ -0,0 +1,13 @@ +//! The Event Service provides an API for ingesting and querying Ceramic Events. +#![warn(missing_docs)] + +mod error; +mod event; +pub mod store; +#[cfg(test)] +mod tests; + +pub use error::Error; +pub use event::{BlockStore, EventService}; + +pub(crate) type Result = std::result::Result; diff --git a/store/src/metrics.rs b/event-svc/src/store/metrics.rs similarity index 88% rename from store/src/metrics.rs rename to event-svc/src/store/metrics.rs index 03cd2bec0..4a89996f2 100644 --- a/store/src/metrics.rs +++ b/event-svc/src/store/metrics.rs @@ -1,7 +1,7 @@ use std::{ops::Range, time::Duration}; use async_trait::async_trait; -use ceramic_core::{Cid, EventId, Interest}; +use ceramic_core::{Cid, EventId}; use ceramic_metrics::{register, Recorder}; use futures::Future; use prometheus_client::{ @@ -114,49 +114,12 @@ impl StoreMetricsMiddleware { metrics.record(&event); ret } - - fn record_key_insert(&self, new_key: bool) { - if new_key { - self.metrics.record(&InsertEvent { cnt: 1 }); - } - } -} - -#[async_trait] -impl ceramic_api::InterestStore for StoreMetricsMiddleware -where - S: ceramic_api::InterestStore, -{ - async fn insert(&self, key: Interest) -> anyhow::Result { - let new = StoreMetricsMiddleware::::record( - &self.metrics, - "api_interest_insert", - self.store.insert(key), - ) - .await?; - self.record_key_insert(new); - Ok(new) - } - async fn range( - &self, - start: &Interest, - end: &Interest, - offset: usize, - limit: usize, - ) -> anyhow::Result> { - StoreMetricsMiddleware::::record( - &self.metrics, - "api_interest_range", - self.store.range(start, end, offset, limit), - ) 
- .await - } } #[async_trait] -impl ceramic_api::EventStore for StoreMetricsMiddleware +impl ceramic_api::EventService for StoreMetricsMiddleware where - S: ceramic_api::EventStore, + S: ceramic_api::EventService, { async fn insert_many( &self, diff --git a/event-svc/src/store/mod.rs b/event-svc/src/store/mod.rs new file mode 100644 index 000000000..8606c828c --- /dev/null +++ b/event-svc/src/store/mod.rs @@ -0,0 +1,11 @@ +//! An implementation of store for event. + +mod metrics; +mod sql; + +pub use metrics::{Metrics, StoreMetricsMiddleware}; +pub use sql::{ + entities::{BlockHash, EventBlockRaw, EventInsertable}, + CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneVersion, Error, InsertResult, + InsertedEvent, Result, SqlitePool, SqliteRootStore, +}; diff --git a/store/src/sql/access/block.rs b/event-svc/src/store/sql/access/block.rs similarity index 96% rename from store/src/sql/access/block.rs rename to event-svc/src/store/sql/access/block.rs index 4fa75d625..3e2027fdc 100644 --- a/store/src/sql/access/block.rs +++ b/event-svc/src/store/sql/access/block.rs @@ -1,13 +1,13 @@ +use ceramic_sql::sqlite::{SqlitePool, SqliteTransaction}; use cid::Cid; use multihash_codetable::Multihash; -use crate::{ +use crate::store::{ sql::{ entities::{BlockBytes, CountRow}, query::BlockQuery, - sqlite::SqliteTransaction, }, - Result, SqlitePool, + Result, }; /// Access to the block table and related logic diff --git a/store/src/sql/access/event.rs b/event-svc/src/store/sql/access/event.rs similarity index 94% rename from store/src/sql/access/event.rs rename to event-svc/src/store/sql/access/event.rs index 5c88c1f74..67ff91dc4 100644 --- a/store/src/sql/access/event.rs +++ b/event-svc/src/store/sql/access/event.rs @@ -7,19 +7,19 @@ use std::{ use anyhow::anyhow; use ceramic_core::{event_id::InvalidEventId, Cid, EventId}; use ceramic_event::unvalidated; +use ceramic_sql::sqlite::{SqlitePool, SqliteTransaction}; use ipld_core::ipld::Ipld; -use 
recon::{AssociativeHash, HashCount, Key, Result as ReconResult, Sha256a}; +use recon::{AssociativeHash, HashCount, Key, Sha256a}; -use crate::{ +use crate::store::{ sql::{ entities::{ rebuild_car, BlockRow, CountRow, EventInsertable, OrderKey, ReconEventBlockRaw, ReconHash, }, - query::{EventQuery, ReconQuery, ReconType, SqlBackend}, - sqlite::SqliteTransaction, + query::{EventQuery, ReconQuery, SqlBackend}, }, - CeramicOneBlock, CeramicOneEventBlock, Error, Result, SqlitePool, + CeramicOneBlock, CeramicOneEventBlock, Error, Result, }; static GLOBAL_COUNTER: AtomicI64 = AtomicI64::new(0); @@ -196,14 +196,12 @@ impl CeramicOneEvent { pub async fn hash_range( pool: &SqlitePool, range: Range<&EventId>, - ) -> ReconResult> { - let row: ReconHash = - sqlx::query_as(ReconQuery::hash_range(ReconType::Event, SqlBackend::Sqlite)) - .bind(range.start.as_bytes()) - .bind(range.end.as_bytes()) - .fetch_one(pool.reader()) - .await - .map_err(Error::from)?; + ) -> Result> { + let row: ReconHash = sqlx::query_as(ReconQuery::hash_range(SqlBackend::Sqlite)) + .bind(range.start.as_bytes()) + .bind(range.end.as_bytes()) + .fetch_one(pool.reader()) + .await?; Ok(HashCount::new(Sha256a::from(row.hash()), row.count())) } @@ -218,7 +216,7 @@ impl CeramicOneEvent { Error::new_app(anyhow!("Offset too large to fit into i64")) })?; let limit = limit.try_into().unwrap_or(100000); // 100k is still a huge limit - let rows: Vec = sqlx::query_as(ReconQuery::range(ReconType::Event)) + let rows: Vec = sqlx::query_as(ReconQuery::range()) .bind(range.start.as_bytes()) .bind(range.end.as_bytes()) .bind(limit) @@ -257,13 +255,12 @@ impl CeramicOneEvent { } /// Count the number of events in a range - pub async fn count(pool: &SqlitePool, range: Range<&EventId>) -> ReconResult { - let row: CountRow = sqlx::query_as(ReconQuery::count(ReconType::Event, SqlBackend::Sqlite)) + pub async fn count(pool: &SqlitePool, range: Range<&EventId>) -> Result { + let row: CountRow = 
sqlx::query_as(ReconQuery::count(SqlBackend::Sqlite)) .bind(range.start.as_bytes()) .bind(range.end.as_bytes()) .fetch_one(pool.reader()) - .await - .map_err(Error::from)?; + .await?; Ok(row.res as usize) } diff --git a/store/src/sql/access/event_block.rs b/event-svc/src/store/sql/access/event_block.rs similarity index 84% rename from store/src/sql/access/event_block.rs rename to event-svc/src/store/sql/access/event_block.rs index eab0e8a89..bd8cf7ec8 100644 --- a/store/src/sql/access/event_block.rs +++ b/event-svc/src/store/sql/access/event_block.rs @@ -1,5 +1,7 @@ -use crate::{ - sql::{entities::EventBlockRaw, query::EventBlockQuery, sqlite::SqliteTransaction}, +use ceramic_sql::sqlite::SqliteTransaction; + +use crate::store::{ + sql::{entities::EventBlockRaw, query::EventBlockQuery}, Result, }; diff --git a/store/src/sql/access/mod.rs b/event-svc/src/store/sql/access/mod.rs similarity index 81% rename from store/src/sql/access/mod.rs rename to event-svc/src/store/sql/access/mod.rs index f5c2ebd7d..9fef76613 100644 --- a/store/src/sql/access/mod.rs +++ b/event-svc/src/store/sql/access/mod.rs @@ -1,11 +1,9 @@ mod block; mod event; mod event_block; -mod interest; mod version; pub use block::CeramicOneBlock; pub use event::{CeramicOneEvent, InsertResult, InsertedEvent}; pub use event_block::CeramicOneEventBlock; -pub use interest::CeramicOneInterest; pub use version::CeramicOneVersion; diff --git a/event-svc/src/store/sql/access/version.rs b/event-svc/src/store/sql/access/version.rs new file mode 100644 index 000000000..2f57f37de --- /dev/null +++ b/event-svc/src/store/sql/access/version.rs @@ -0,0 +1,95 @@ +use std::str::FromStr; + +use anyhow::anyhow; + +use crate::store::sql::{entities::VersionRow, Error, Result, SqlitePool}; + +#[derive(Debug, Clone, PartialEq, Eq)] +/// It's kind of pointless to roundtrip CARGO_PKG_VERSION through this struct, +/// but it makes it clear how we expect to format our versions in the database. 
+struct SemVer { + major: u64, + minor: u64, + patch: u64, +} + +impl std::fmt::Display for SemVer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}.{}.{}", self.major, self.minor, self.patch) + } +} + +impl std::str::FromStr for SemVer { + type Err = Error; + + fn from_str(s: &str) -> std::result::Result { + let parts: Vec<&str> = s.split('.').collect(); + if parts.len() != 3 { + Err(Error::new_invalid_arg(anyhow!( + "Invalid version. Must have 3 parts: {}", + s.to_string() + ))) + } else { + let major = parts[0].parse().map_err(|_| { + Error::new_invalid_arg(anyhow!( + "Invalid version. Major did not parse: {}", + s.to_string() + )) + })?; + let minor = parts[1].parse().map_err(|_| { + Error::new_invalid_arg(anyhow!( + "Invalid version. Minor did not parse: {}", + s.to_string() + )) + })?; + let patch = parts[2].parse().map_err(|_| { + Error::new_invalid_arg(anyhow!( + "Invalid version. Patch did not parse: {}", + s.to_string() + )) + })?; + Ok(Self { + major, + minor, + patch, + }) + } + } +} + +#[derive(Debug, Clone)] +/// Access to ceramic version information +pub struct CeramicOneVersion {} + +impl CeramicOneVersion { + /// Fetch the previous version from the database. May be None if no previous version exists. 
+ pub async fn fetch_previous(pool: &SqlitePool) -> Result<Option<VersionRow>> { + let current = SemVer::from_str(env!("CARGO_PKG_VERSION"))?; + VersionRow::_fetch_previous(pool, &current.to_string()).await + } + + /// Insert the current version into the database + pub async fn insert_current(pool: &SqlitePool) -> Result<()> { + let current = SemVer::from_str(env!("CARGO_PKG_VERSION"))?; + VersionRow::insert_current(pool, &current.to_string()).await + } +} + +#[cfg(test)] +mod test { + use ceramic_sql::sqlite::SqlitePool; + + use super::*; + + #[tokio::test] + async fn insert_version() { + let mem = SqlitePool::connect_in_memory().await.unwrap(); + CeramicOneVersion::insert_current(&mem).await.unwrap(); + } + + #[tokio::test] + async fn prev_version() { + let mem = SqlitePool::connect_in_memory().await.unwrap(); + CeramicOneVersion::fetch_previous(&mem).await.unwrap(); + } +} diff --git a/store/src/sql/entities/block.rs b/event-svc/src/store/sql/entities/block.rs similarity index 98% rename from store/src/sql/entities/block.rs rename to event-svc/src/store/sql/entities/block.rs index 98b1058eb..210ea86e3 100644 --- a/store/src/sql/entities/block.rs +++ b/event-svc/src/store/sql/entities/block.rs @@ -5,7 +5,7 @@ use cid::Cid; use multihash_codetable::Multihash; use sqlx::{sqlite::SqliteRow, FromRow, Row}; -use crate::Error; +use crate::store::Error; use super::EventBlockRaw; diff --git a/store/src/sql/entities/event.rs b/event-svc/src/store/sql/entities/event.rs similarity index 97% rename from store/src/sql/entities/event.rs rename to event-svc/src/store/sql/entities/event.rs index 5e85c5ab8..a41e6a003 100644 --- a/store/src/sql/entities/event.rs +++ b/event-svc/src/store/sql/entities/event.rs @@ -7,9 +7,8 @@ use ceramic_event::unvalidated; use cid::Cid; use ipld_core::ipld::Ipld; -pub use crate::sql::entities::EventBlockRaw; - -use crate::{sql::entities::BlockRow, Error, Result}; +use super::{BlockRow, EventBlockRaw}; +use crate::store::{Error, Result}; pub async fn rebuild_car(blocks: Vec<BlockRow>) -> 
Result<Option<Vec<u8>>> { if blocks.is_empty() { diff --git a/store/src/sql/entities/event_block.rs b/event-svc/src/store/sql/entities/event_block.rs similarity index 99% rename from store/src/sql/entities/event_block.rs rename to event-svc/src/store/sql/entities/event_block.rs index 4d0e036c6..dd53759e7 100644 --- a/store/src/sql/entities/event_block.rs +++ b/event-svc/src/store/sql/entities/event_block.rs @@ -9,7 +9,7 @@ use itertools::{process_results, Itertools}; use multihash_codetable::{Code, MultihashDigest}; use sqlx::{sqlite::SqliteRow, Row as _}; -use crate::{ +use crate::store::{ sql::entities::{rebuild_car, BlockHash, BlockRow}, Error, Result, }; diff --git a/store/src/sql/entities/hash.rs b/event-svc/src/store/sql/entities/hash.rs similarity index 98% rename from store/src/sql/entities/hash.rs rename to event-svc/src/store/sql/entities/hash.rs index f94d6cd0c..9f3feb1e2 100644 --- a/store/src/sql/entities/hash.rs +++ b/event-svc/src/store/sql/entities/hash.rs @@ -2,7 +2,7 @@ use multihash::Multihash; use sqlx::{sqlite::SqliteRow, Row as _}; -use crate::{Error, Result}; +use crate::store::{Error, Result}; // TODO: make type private #[allow(missing_docs)] diff --git a/store/src/sql/entities/mod.rs b/event-svc/src/store/sql/entities/mod.rs similarity index 100% rename from store/src/sql/entities/mod.rs rename to event-svc/src/store/sql/entities/mod.rs diff --git a/store/src/sql/entities/utils.rs b/event-svc/src/store/sql/entities/utils.rs similarity index 100% rename from store/src/sql/entities/utils.rs rename to event-svc/src/store/sql/entities/utils.rs diff --git a/store/src/sql/entities/version.rs b/event-svc/src/store/sql/entities/version.rs similarity index 96% rename from store/src/sql/entities/version.rs rename to event-svc/src/store/sql/entities/version.rs index 35aeaf712..b30ebf908 100644 --- a/store/src/sql/entities/version.rs +++ b/event-svc/src/store/sql/entities/version.rs @@ -1,6 +1,6 @@ use sqlx::types::chrono; -use crate::{Result, SqlitePool}; +use 
crate::store::{Result, SqlitePool}; #[derive(Debug, Clone, sqlx::FromRow)] // We want to retrieve these fields for logging but we don't refer to them directly diff --git a/event-svc/src/store/sql/mod.rs b/event-svc/src/store/sql/mod.rs new file mode 100644 index 000000000..3bff5829f --- /dev/null +++ b/event-svc/src/store/sql/mod.rs @@ -0,0 +1,13 @@ +mod access; +pub mod entities; +mod query; +mod root; +#[cfg(test)] +mod test; + +pub use access::{ + CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneVersion, InsertResult, + InsertedEvent, +}; +pub use ceramic_sql::{sqlite::SqlitePool, Error, Result}; +pub use root::SqliteRootStore; diff --git a/store/src/sql/query.rs b/event-svc/src/store/sql/query.rs similarity index 73% rename from store/src/sql/query.rs rename to event-svc/src/store/sql/query.rs index 6bb45c6d8..c993a43b2 100644 --- a/store/src/sql/query.rs +++ b/event-svc/src/store/sql/query.rs @@ -162,34 +162,15 @@ impl EventBlockQuery { } } -/// Holds the SQL queries than can be shared between interests, events, and across DB types +/// Holds the SQL queries for accessing events across DB types pub struct ReconQuery {} -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub enum ReconType { - Event, - Interest, -} - #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum SqlBackend { Sqlite, } impl ReconQuery { - /// Requires 9 parameters: the order_key and the 8 hash values - pub fn insert_interest() -> &'static str { - "INSERT INTO ceramic_one_interest ( - order_key, - ahash_0, ahash_1, ahash_2, ahash_3, - ahash_4, ahash_5, ahash_6, ahash_7 - ) VALUES ( - $1, - $2, $3, $4, $5, - $6, $7, $8, $9 - );" - } - /// Requires 10 parameters: the order_key, cid and the 8 hash values pub fn insert_event() -> &'static str { "INSERT INTO ceramic_one_event ( @@ -206,9 +187,9 @@ impl ReconQuery { } /// Requires binding 2 parameters. 
Returned as `ReconHash` struct - pub fn hash_range(key_type: ReconType, db: SqlBackend) -> &'static str { - match (key_type, db) { - (ReconType::Event, SqlBackend::Sqlite) => { + pub fn hash_range(db: SqlBackend) -> &'static str { + match db { + SqlBackend::Sqlite => { r#"SELECT TOTAL(ahash_0) & 0xFFFFFFFF as ahash_0, TOTAL(ahash_1) & 0xFFFFFFFF as ahash_1, TOTAL(ahash_2) & 0xFFFFFFFF as ahash_2, TOTAL(ahash_3) & 0xFFFFFFFF as ahash_3, @@ -218,55 +199,27 @@ impl ReconQuery { FROM ceramic_one_event WHERE order_key >= $1 AND order_key < $2;"# } - (ReconType::Interest, SqlBackend::Sqlite) => { - r#"SELECT - TOTAL(ahash_0) & 0xFFFFFFFF as ahash_0, TOTAL(ahash_1) & 0xFFFFFFFF as ahash_1, - TOTAL(ahash_2) & 0xFFFFFFFF as ahash_2, TOTAL(ahash_3) & 0xFFFFFFFF as ahash_3, - TOTAL(ahash_4) & 0xFFFFFFFF as ahash_4, TOTAL(ahash_5) & 0xFFFFFFFF as ahash_5, - TOTAL(ahash_6) & 0xFFFFFFFF as ahash_6, TOTAL(ahash_7) & 0xFFFFFFFF as ahash_7, - COUNT(1) as count - FROM ceramic_one_interest - WHERE order_key >= $1 AND order_key < $2;"# - } } } /// Requires binding 2 parameters - pub fn range(key_type: ReconType) -> &'static str { - match key_type { - ReconType::Event => { - r#"SELECT - order_key - FROM - ceramic_one_event - WHERE - order_key >= $1 AND order_key < $2 - ORDER BY - order_key ASC - LIMIT - $3 - OFFSET - $4;"# - } - ReconType::Interest => { - r#"SELECT - order_key - FROM - ceramic_one_interest - WHERE - order_key >= $1 AND order_key < $2 - ORDER BY - order_key ASC - LIMIT - $3 - OFFSET - $4;"# - } - } - } - - pub fn count(key_type: ReconType, db: SqlBackend) -> &'static str { - match (key_type, db) { - (ReconType::Event, SqlBackend::Sqlite) => { + pub fn range() -> &'static str { + r#"SELECT + order_key + FROM + ceramic_one_event + WHERE + order_key >= $1 AND order_key < $2 + ORDER BY + order_key ASC + LIMIT + $3 + OFFSET + $4;"# + } + + pub fn count(db: SqlBackend) -> &'static str { + match db { + SqlBackend::Sqlite => { r#"SELECT count(order_key) as res FROM @@ -274,14 
+227,6 @@ impl ReconQuery { WHERE order_key >= $1 AND order_key < $2"# } - (ReconType::Interest, SqlBackend::Sqlite) => { - r#"SELECT - count(order_key) as res - FROM - ceramic_one_interest - WHERE - order_key >= $1 AND order_key < $2"# - } } } } diff --git a/store/src/sql/root.rs b/event-svc/src/store/sql/root.rs similarity index 97% rename from store/src/sql/root.rs rename to event-svc/src/store/sql/root.rs index 2680825da..08731e701 100644 --- a/store/src/sql/root.rs +++ b/event-svc/src/store/sql/root.rs @@ -1,8 +1,7 @@ use anyhow::Result; +use ceramic_sql::sqlite::SqlitePool; use sqlx::Row; -use crate::SqlitePool; - /// We use the RootStore as a local cache of the EthereumRootStore /// /// The EthereumRootStore is the authoritative root store but it is also immutable diff --git a/store/src/sql/test.rs b/event-svc/src/store/sql/test.rs similarity index 98% rename from store/src/sql/test.rs rename to event-svc/src/store/sql/test.rs index 360d0c5a6..0eb7a1501 100644 --- a/store/src/sql/test.rs +++ b/event-svc/src/store/sql/test.rs @@ -1,6 +1,5 @@ use std::str::FromStr; -use crate::{CeramicOneEvent, EventInsertable, SqlitePool}; use ceramic_core::{ event_id::{Builder, WithInit}, EventId, Network, @@ -14,6 +13,8 @@ use multihash_codetable::{Code, MultihashDigest}; use serde_ipld_dagcbor::codec::DagCborCodec; use test_log::test; +use crate::store::{CeramicOneEvent, EventInsertable, SqlitePool}; + const MODEL_ID: &str = "k2t6wz4yhfp1r5pwi52gw89nzjbu53qk7m32o5iguw42c6knsaj0feuf927agb"; const CONTROLLER: &str = "did:key:z6Mkqtw7Pj5Lv9xc4PgUYAnwfaVoMC6FRneGWVr5ekTEfKVL"; const INIT_ID: &str = "baeabeiajn5ypv2gllvkk4muvzujvcnoen2orknxix7qtil2daqn6vu6khq"; diff --git a/service/src/tests/event.rs b/event-svc/src/tests/event.rs similarity index 96% rename from service/src/tests/event.rs rename to event-svc/src/tests/event.rs index 4def195ff..6ec7ec676 100644 --- a/service/src/tests/event.rs +++ b/event-svc/src/tests/event.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use 
anyhow::Error; use bytes::Bytes; -use ceramic_api::{ApiItem, EventStore}; +use ceramic_api::{ApiItem, EventService as ApiEventService}; use cid::{Cid, CidGeneric}; use expect_test::expect; use iroh_bitswap::Store; @@ -16,8 +16,8 @@ macro_rules! test_with_sqlite { #[test_log::test(tokio::test)] async fn [<$test_name _sqlite>]() { - let conn = ceramic_store::SqlitePool::connect_in_memory().await.unwrap(); - let store = $crate::CeramicEventService::new_with_event_validation(conn).await.unwrap(); + let conn = $crate::store::SqlitePool::connect_in_memory().await.unwrap(); + let store = $crate::EventService::new_with_event_validation(conn).await.unwrap(); store.process_all_undelivered_events().await.unwrap(); $( for stmt in $sql_stmts { @@ -227,7 +227,7 @@ where // stores 3 keys with 3,5,10 block long CAR files // each one takes n+1 blocks as it needs to store the root and all blocks so we expect 3+5+10+3=21 blocks // but we use a delivered integer per event, so we expect it to increment by 1 for each event -async fn prep_highwater_tests(store: &dyn EventStore) -> (Cid, Cid, Cid) { +async fn prep_highwater_tests(store: &dyn ApiEventService) -> (Cid, Cid, Cid) { let mut keys = Vec::with_capacity(3); for _ in 0..3 { let TestEventInfo { @@ -257,7 +257,7 @@ test_with_dbs!( ); async fn events_since_highwater_mark_all_global_counter_with_data(store: S) where - S: EventStore, + S: ApiEventService, { events_since_highwater_mark_all_global_counter(store, true).await; } @@ -273,13 +273,13 @@ test_with_dbs!( ); async fn events_since_highwater_mark_all_global_counter_no_data(store: S) where - S: EventStore, + S: ApiEventService, { events_since_highwater_mark_all_global_counter(store, false).await; } async fn events_since_highwater_mark_all_global_counter( - store: impl EventStore, + store: impl ApiEventService, include_data: bool, ) { let include_data = if include_data { @@ -311,7 +311,7 @@ test_with_dbs!( ); async fn events_since_highwater_mark_limit_1_with_data(store: S) where - 
S: EventStore, + S: ApiEventService, { events_since_highwater_mark_limit_1(store, true).await; } @@ -327,14 +327,14 @@ test_with_dbs!( ); async fn events_since_highwater_mark_limit_1_no_data(store: S) where - S: EventStore, + S: ApiEventService, { events_since_highwater_mark_limit_1(store, false).await; } async fn events_since_highwater_mark_limit_1(store: S, include_data: bool) where - S: EventStore, + S: ApiEventService, { let include_data = if include_data { ceramic_api::IncludeEventData::Full @@ -367,7 +367,7 @@ test_with_dbs!( "delete from ceramic_one_block", ] ); -async fn events_since_highwater_mark_middle_start_with_data(store: impl EventStore) { +async fn events_since_highwater_mark_middle_start_with_data(store: impl ApiEventService) { events_since_highwater_mark_middle_start(store, true).await; } @@ -380,11 +380,11 @@ test_with_dbs!( "delete from ceramic_one_block", ] ); -async fn events_since_highwater_mark_middle_start_no_data(store: impl EventStore) { +async fn events_since_highwater_mark_middle_start_no_data(store: impl ApiEventService) { events_since_highwater_mark_middle_start(store, false).await; } -async fn events_since_highwater_mark_middle_start(store: impl EventStore, include_data: bool) { +async fn events_since_highwater_mark_middle_start(store: impl ApiEventService, include_data: bool) { let include_data = if include_data { ceramic_api::IncludeEventData::Full } else { @@ -431,7 +431,7 @@ test_with_dbs!( async fn get_event_by_event_id(store: S) where - S: EventStore, + S: ApiEventService, { let TestEventInfo { event_id: key, @@ -457,7 +457,7 @@ test_with_dbs!( async fn get_event_by_cid(store: S) where - S: EventStore, + S: ApiEventService, { let TestEventInfo { event_id: key, diff --git a/service/src/tests/migration.rs b/event-svc/src/tests/migration.rs similarity index 97% rename from service/src/tests/migration.rs rename to event-svc/src/tests/migration.rs index 4ea82c40c..08922d16d 100644 --- a/service/src/tests/migration.rs +++ 
b/event-svc/src/tests/migration.rs @@ -17,7 +17,7 @@ use ceramic_car::CarReader; use ceramic_core::{DidDocument, EventId, Network, SerializeExt, StreamId}; use ceramic_event::unvalidated; -use crate::{event::BlockStore, CeramicEventService}; +use crate::{event::BlockStore, EventService}; struct InMemBlockStore { blocks: BTreeMap>, @@ -53,12 +53,8 @@ async fn test_migration(cars: Vec>) { .map(|car| multibase::encode(multibase::Base::Base64Url, car)) .collect(); let blocks = blocks_from_cars(cars).await; - let conn = ceramic_store::SqlitePool::connect_in_memory() - .await - .unwrap(); - let service = CeramicEventService::new_with_event_validation(conn) - .await - .unwrap(); + let conn = crate::store::SqlitePool::connect_in_memory().await.unwrap(); + let service = EventService::new_with_event_validation(conn).await.unwrap(); service .migrate_from_ipfs(Network::Local(42), blocks, false) .await diff --git a/service/src/tests/mod.rs b/event-svc/src/tests/mod.rs similarity index 97% rename from service/src/tests/mod.rs rename to event-svc/src/tests/mod.rs index 915894dd8..83afdd1e1 100644 --- a/service/src/tests/mod.rs +++ b/event-svc/src/tests/mod.rs @@ -1,5 +1,4 @@ mod event; -mod interest; mod migration; mod ordering; @@ -117,11 +116,11 @@ fn gen_rand_bytes() -> [u8; SIZE] { } pub(crate) async fn check_deliverable( - pool: &ceramic_store::SqlitePool, + pool: &crate::store::SqlitePool, cid: &Cid, deliverable: bool, ) { - let (exists, delivered) = ceramic_store::CeramicOneEvent::deliverable_by_cid(pool, cid) + let (exists, delivered) = crate::store::CeramicOneEvent::deliverable_by_cid(pool, cid) .await .unwrap(); assert!(exists); diff --git a/service/src/tests/ordering.rs b/event-svc/src/tests/ordering.rs similarity index 93% rename from service/src/tests/ordering.rs rename to event-svc/src/tests/ordering.rs index 5aad68474..cfb85091f 100644 --- a/service/src/tests/ordering.rs +++ b/event-svc/src/tests/ordering.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use 
ceramic_api::{EventDataResult, EventStore, IncludeEventData}; +use ceramic_api::{EventDataResult, EventService as ApiEventService, IncludeEventData}; use ceramic_core::EventId; use rand::seq::SliceRandom; use rand::thread_rng; @@ -10,26 +10,22 @@ use test_log::test; use crate::{ event::DeliverableRequirement, tests::{check_deliverable, get_events}, - CeramicEventService, + EventService, }; -async fn setup_service() -> CeramicEventService { - let conn = ceramic_store::SqlitePool::connect_in_memory() - .await - .unwrap(); +async fn setup_service() -> EventService { + let conn = crate::store::SqlitePool::connect_in_memory().await.unwrap(); - CeramicEventService::new_with_event_validation(conn) - .await - .unwrap() + EventService::new_with_event_validation(conn).await.unwrap() } -async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: ReconItem) { +async fn add_and_assert_new_recon_event(store: &EventService, item: ReconItem) { tracing::trace!("inserted event: {}", item.key.cid().unwrap()); let new = recon::Store::insert_many(store, &[item]).await.unwrap(); assert!(new.included_new_key()); } -async fn add_and_assert_new_local_event(store: &CeramicEventService, item: ReconItem) { +async fn add_and_assert_new_local_event(store: &EventService, item: ReconItem) { let new = store .insert_events(&[item], DeliverableRequirement::Immediate) .await @@ -38,7 +34,7 @@ async fn add_and_assert_new_local_event(store: &CeramicEventService, item: Recon assert_eq!(1, new); } -async fn get_delivered_cids(store: &CeramicEventService) -> Vec { +async fn get_delivered_cids(store: &EventService) -> Vec { let (_, delivered) = store .events_since_highwater_mark(0, i64::MAX, IncludeEventData::Full) .await @@ -241,7 +237,7 @@ async fn multiple_streams_missing_prev_recon_should_deliver_without_stream_updat assert_eq!(expected, delivered); } -async fn validate_all_delivered(store: &CeramicEventService, expected_delivered: usize) { +async fn validate_all_delivered(store: 
&EventService, expected_delivered: usize) { loop { let (_, delivered) = store .events_since_highwater_mark(0, i64::MAX, IncludeEventData::None) @@ -317,7 +313,7 @@ async fn recon_lots_of_streams() { // first just make sure they were all inserted (not delivered yet) for (i, cid) in all_cids.iter().enumerate() { let (exists, _delivered) = - ceramic_store::CeramicOneEvent::deliverable_by_cid(&store.pool, cid) + crate::store::CeramicOneEvent::deliverable_by_cid(&store.pool, cid) .await .unwrap(); assert!(exists, "idx: {}. missing cid: {}", i, cid); diff --git a/service/Cargo.toml b/interest-svc/Cargo.toml similarity index 67% rename from service/Cargo.toml rename to interest-svc/Cargo.toml index f5c11cfb7..c559d1274 100644 --- a/service/Cargo.toml +++ b/interest-svc/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "ceramic-service" +name = "ceramic-interest-svc" version.workspace = true edition.workspace = true authors.workspace = true @@ -9,28 +9,21 @@ repository.workspace = true [dependencies] anyhow.workspace = true async-trait.workspace = true -bytes.workspace = true ceramic-api.workspace = true -ceramic-car.workspace = true ceramic-core.workspace = true -ceramic-event.workspace = true -ceramic-store.workspace = true -cid.workspace = true +ceramic-metrics.workspace = true +ceramic-sql.workspace = true futures.workspace = true -hex.workspace = true -ipld-core.workspace = true -iroh-bitswap.workspace = true -multibase.workspace = true -multihash-codetable.workspace = true -multihash-derive.workspace = true +prometheus-client.workspace = true recon.workspace = true -serde.workspace = true -serde_ipld_dagcbor.workspace = true +sqlx.workspace = true thiserror.workspace = true tokio.workspace = true tracing.workspace = true [dev-dependencies] +ceramic-event.workspace = true +criterion2 = { workspace = true, features = ["async", "async_tokio"] } expect-test.workspace = true ipld-core.workspace = true multibase.workspace = true diff --git a/interest-svc/src/error.rs 
b/interest-svc/src/error.rs new file mode 100644 index 000000000..93c4acb1a --- /dev/null +++ b/interest-svc/src/error.rs @@ -0,0 +1,101 @@ +#[derive(Debug, thiserror::Error)] +/// The Errors that can be raised by store operations +pub enum Error { + #[error("Application error encountered: {error}")] + /// An internal application error that is not fatal to the process e.g. a 500/unhandled error + Application { + /// The error details that may include context and other information + error: anyhow::Error, + }, + #[error("InvalidArgument: {error}")] + /// Invalid client input + InvalidArgument { + /// The error details that may include context and other information + error: anyhow::Error, + }, + #[error("Fatal error encountered: {error}")] + /// A fatal error that is unlikely to be recoverable, and may require terminating the process completely + Fatal { + /// The error details that may include context and other information + error: anyhow::Error, + }, + #[error("Transient error encountered: {error}")] + /// An error that can be retried, and may resolve itself. If an error is transient repeatedly, it should be + /// considered an "application" level error and propagated upward. + Transient { + /// The error details that may include context and other information + error: anyhow::Error, + }, +} + +impl Error { + /// Create a transient error + pub fn new_transient(error: impl Into) -> Self { + Self::Transient { + error: error.into(), + } + } + /// Create a fatal error + pub fn new_fatal(error: impl Into) -> Self { + Self::Fatal { + error: error.into(), + } + } + + /// Create an application error + pub fn new_app(error: impl Into) -> Self { + Self::Application { + error: error.into(), + } + } + + /// Crate an InvalidArgument error + pub fn new_invalid_arg(error: impl Into) -> Self { + Self::InvalidArgument { + error: error.into(), + } + } + + /// Add context to the internal error. 
Works identically to `anyhow::context` + pub fn context<C>(self, context: C) -> Self + where + C: std::fmt::Display + Send + Sync + 'static, + { + match self { + Error::Application { error } => Self::Application { + error: error.context(context), + }, + Error::Fatal { error } => Self::Fatal { + error: error.context(context), + }, + Error::Transient { error } => Self::Transient { + error: error.context(context), + }, + Error::InvalidArgument { error } => Self::InvalidArgument { + error: error.context(context), + }, + } + } +} + +impl From<Error> for recon::Error { + fn from(value: Error) -> Self { + match value { + Error::Application { error } => recon::Error::Application { error }, + Error::Fatal { error } => recon::Error::Fatal { error }, + Error::Transient { error } => recon::Error::Transient { error }, + Error::InvalidArgument { error } => recon::Error::Application { error }, + } + } +} + +impl From<crate::store::Error> for Error { + fn from(value: crate::store::Error) -> Self { + match value { + crate::store::Error::Application { error } => Error::Application { error }, + crate::store::Error::Fatal { error } => Error::Fatal { error }, + crate::store::Error::Transient { error } => Error::Transient { error }, + crate::store::Error::InvalidArgument { error } => Error::InvalidArgument { error }, + } + } +} diff --git a/interest-svc/src/interest/mod.rs b/interest-svc/src/interest/mod.rs new file mode 100644 index 000000000..f558e0f11 --- /dev/null +++ b/interest-svc/src/interest/mod.rs @@ -0,0 +1,4 @@ +mod service; +mod store; + +pub use service::InterestService; diff --git a/service/src/interest/service.rs b/interest-svc/src/interest/service.rs similarity index 69% rename from service/src/interest/service.rs rename to interest-svc/src/interest/service.rs index 4b28f42bd..67b28042a 100644 --- a/service/src/interest/service.rs +++ b/interest-svc/src/interest/service.rs @@ -1,12 +1,13 @@ -use ceramic_store::SqlitePool; +use crate::store::SqlitePool; /// A Service that understands how to process and 
store Ceramic Interests. /// Implements the [`recon::Store`], [`iroh_bitswap::Store`], and [`ceramic_api::EventStore`] traits for [`ceramic_core::Interest`]. #[derive(Debug)] -pub struct CeramicInterestService { +pub struct InterestService { pub(crate) pool: SqlitePool, } -impl CeramicInterestService { +impl InterestService { + /// Construct a new interest service from a [`SqlitePool`]. pub fn new(pool: SqlitePool) -> Self { Self { pool } } diff --git a/service/src/interest/store.rs b/interest-svc/src/interest/store.rs similarity index 82% rename from service/src/interest/store.rs rename to interest-svc/src/interest/store.rs index 792b0e77d..92b2540ff 100644 --- a/service/src/interest/store.rs +++ b/interest-svc/src/interest/store.rs @@ -1,14 +1,15 @@ use std::ops::Range; use ceramic_core::Interest; -use ceramic_store::CeramicOneInterest; use recon::{HashCount, InsertResult, ReconItem, Result as ReconResult, Sha256a}; use tracing::instrument; -use crate::CeramicInterestService; +use crate::store::CeramicOneInterest; +use crate::Error; +use crate::InterestService; #[async_trait::async_trait] -impl recon::Store for CeramicInterestService { +impl recon::Store for InterestService { type Key = Interest; type Hash = Sha256a; @@ -21,7 +22,9 @@ impl recon::Store for CeramicInterestService { items: &[ReconItem], ) -> ReconResult> { let keys = items.iter().map(|item| &item.key).collect::>(); - Ok(CeramicOneInterest::insert_many(&self.pool, &keys).await?) + Ok(CeramicOneInterest::insert_many(&self.pool, &keys) + .await + .map_err(Error::from)?) } /// Return the hash of all keys in the range between left_fencepost and right_fencepost. @@ -29,7 +32,9 @@ impl recon::Store for CeramicInterestService { /// Returns ReconResult<(Hash, count), Err> #[instrument(skip(self))] async fn hash_range(&self, range: Range<&Self::Key>) -> ReconResult> { - Ok(CeramicOneInterest::hash_range(&self.pool, range).await?) 
+ Ok(CeramicOneInterest::hash_range(&self.pool, range) + .await + .map_err(Error::from)?) } /// Return all keys in the range between left_fencepost and right_fencepost. @@ -46,7 +51,8 @@ impl recon::Store for CeramicInterestService { ) -> ReconResult + Send + 'static>> { Ok(Box::new( CeramicOneInterest::range(&self.pool, range, offset, limit) - .await? + .await + .map_err(Error::from)? .into_iter(), )) } @@ -62,13 +68,17 @@ impl recon::Store for CeramicInterestService { offset: usize, limit: usize, ) -> ReconResult)> + Send + 'static>> { - let res = CeramicOneInterest::range(&self.pool, range, offset, limit).await?; + let res = CeramicOneInterest::range(&self.pool, range, offset, limit) + .await + .map_err(Error::from)?; Ok(Box::new(res.into_iter().map(|key| (key, vec![])))) } /// Return the number of keys within the range. #[instrument(skip(self))] async fn count(&self, range: Range<&Self::Key>) -> ReconResult { - Ok(CeramicOneInterest::count(&self.pool, range).await?) + Ok(CeramicOneInterest::count(&self.pool, range) + .await + .map_err(Error::from)?) } /// value_for_key returns @@ -82,7 +92,7 @@ impl recon::Store for CeramicInterestService { } #[async_trait::async_trait] -impl ceramic_api::InterestStore for CeramicInterestService { +impl ceramic_api::InterestService for InterestService { async fn insert(&self, key: Interest) -> anyhow::Result { Ok(CeramicOneInterest::insert(&self.pool, &key).await?) } diff --git a/interest-svc/src/lib.rs b/interest-svc/src/lib.rs new file mode 100644 index 000000000..01dbbf6a2 --- /dev/null +++ b/interest-svc/src/lib.rs @@ -0,0 +1,11 @@ +//! The Event Service provides an API for ingesting and querying Ceramic Events. 
+#![warn(missing_docs)] + +mod error; +mod interest; +pub mod store; +#[cfg(test)] +mod tests; + +pub use error::Error; +pub use interest::InterestService; diff --git a/interest-svc/src/store/metrics.rs b/interest-svc/src/store/metrics.rs new file mode 100644 index 000000000..2f7043d3a --- /dev/null +++ b/interest-svc/src/store/metrics.rs @@ -0,0 +1,249 @@ +use std::{ops::Range, time::Duration}; + +use async_trait::async_trait; +use ceramic_core::Interest; +use ceramic_metrics::{register, Recorder}; +use futures::Future; +use prometheus_client::{ + encoding::EncodeLabelSet, + metrics::{ + counter::Counter, + family::Family, + histogram::{exponential_buckets, Histogram}, + }, + registry::Registry, +}; +use recon::{AssociativeHash, HashCount, ReconItem, Result as ReconResult}; +use tokio::time::Instant; + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct StorageQuery { + pub name: &'static str, + pub duration: Duration, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct InsertEvent { + pub cnt: u64, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct QueryLabels { + name: &'static str, +} + +impl From<&StorageQuery> for QueryLabels { + fn from(value: &StorageQuery) -> Self { + Self { name: value.name } + } +} + +#[derive(Clone, Debug)] +/// Storage system metrics +pub struct Metrics { + key_value_insert_count: Counter, + + query_durations: Family, +} + +impl Metrics { + /// Register and construct Metrics + pub fn register(registry: &mut Registry) -> Self { + let sub_registry = registry.sub_registry_with_prefix("store"); + + register!( + key_value_insert_count, + "Number times a new key/value pair is inserted into the datastore", + Counter::default(), + sub_registry + ); + + register!( + query_durations, + "Durations of store queries in seconds", + Family::::new_with_constructor(|| { + Histogram::new(exponential_buckets(0.005, 2.0, 20)) + }), + sub_registry + ); + + Self { + key_value_insert_count, + query_durations, + } 
+ } +} + +impl Recorder for Metrics { + fn record(&self, event: &InsertEvent) { + self.key_value_insert_count.inc_by(event.cnt); + } +} + +impl Recorder for Metrics { + fn record(&self, event: &StorageQuery) { + let labels: QueryLabels = event.into(); + self.query_durations + .get_or_create(&labels) + .observe(event.duration.as_secs_f64()); + } +} + +/// Implement the Store and record metrics +#[derive(Debug, Clone)] +pub struct StoreMetricsMiddleware +where + S: Send + Sync, +{ + store: S, + metrics: Metrics, +} + +impl StoreMetricsMiddleware { + /// Construct a new StoreMetricsMiddleware. + /// The metrics should have already be registered. + pub fn new(store: S, metrics: Metrics) -> Self { + Self { store, metrics } + } + // Record metrics for a given API endpoint + async fn record(metrics: &Metrics, name: &'static str, fut: impl Future) -> T { + let start = Instant::now(); + let ret = fut.await; + let duration = start.elapsed(); + let event = StorageQuery { name, duration }; + metrics.record(&event); + ret + } + + fn record_key_insert(&self, new_key: bool) { + if new_key { + self.metrics.record(&InsertEvent { cnt: 1 }); + } + } +} + +#[async_trait] +impl ceramic_api::InterestService for StoreMetricsMiddleware +where + S: ceramic_api::InterestService, +{ + async fn insert(&self, key: Interest) -> anyhow::Result { + let new = StoreMetricsMiddleware::::record( + &self.metrics, + "api_interest_insert", + self.store.insert(key), + ) + .await?; + self.record_key_insert(new); + Ok(new) + } + async fn range( + &self, + start: &Interest, + end: &Interest, + offset: usize, + limit: usize, + ) -> anyhow::Result> { + StoreMetricsMiddleware::::record( + &self.metrics, + "api_interest_range", + self.store.range(start, end, offset, limit), + ) + .await + } +} + +#[async_trait] +impl recon::Store for StoreMetricsMiddleware +where + S: recon::Store + Send + Sync, + K: recon::Key, + H: AssociativeHash, +{ + type Key = K; + type Hash = H; + + async fn insert_many( + &self, + 
items: &[ReconItem], + ) -> ReconResult> { + let res = StoreMetricsMiddleware::::record( + &self.metrics, + "insert_many", + self.store.insert_many(items), + ) + .await?; + + self.metrics.record(&InsertEvent { + cnt: res.count_inserted() as u64, + }); + + Ok(res) + } + + async fn hash_range(&self, range: Range<&Self::Key>) -> ReconResult> { + StoreMetricsMiddleware::::record( + &self.metrics, + "hash_range", + self.store.hash_range(range), + ) + .await + } + + async fn range( + &self, + range: Range<&Self::Key>, + offset: usize, + limit: usize, + ) -> ReconResult + Send + 'static>> { + StoreMetricsMiddleware::::record( + &self.metrics, + "range", + self.store.range(range, offset, limit), + ) + .await + } + async fn range_with_values( + &self, + range: Range<&Self::Key>, + offset: usize, + limit: usize, + ) -> ReconResult)> + Send + 'static>> { + StoreMetricsMiddleware::::record( + &self.metrics, + "range_with_values", + self.store.range_with_values(range, offset, limit), + ) + .await + } + + async fn full_range( + &self, + ) -> ReconResult + Send + 'static>> { + StoreMetricsMiddleware::::record(&self.metrics, "full_range", self.store.full_range()) + .await + } + + async fn middle(&self, range: Range<&Self::Key>) -> ReconResult> { + StoreMetricsMiddleware::::record(&self.metrics, "middle", self.store.middle(range)).await + } + async fn count(&self, range: Range<&Self::Key>) -> ReconResult { + StoreMetricsMiddleware::::record(&self.metrics, "count", self.store.count(range)).await + } + async fn len(&self) -> ReconResult { + StoreMetricsMiddleware::::record(&self.metrics, "len", self.store.len()).await + } + + async fn is_empty(&self) -> ReconResult { + StoreMetricsMiddleware::::record(&self.metrics, "is_empty", self.store.is_empty()).await + } + + async fn value_for_key(&self, key: &Self::Key) -> ReconResult>> { + StoreMetricsMiddleware::::record( + &self.metrics, + "value_for_key", + self.store.value_for_key(key), + ) + .await + } +} diff --git 
a/interest-svc/src/store/mod.rs b/interest-svc/src/store/mod.rs new file mode 100644 index 000000000..ebcb0cf2a --- /dev/null +++ b/interest-svc/src/store/mod.rs @@ -0,0 +1,9 @@ +//! An implementation of store for event. + +mod metrics; +mod sql; + +pub use metrics::{Metrics, StoreMetricsMiddleware}; +pub use sql::{ + CeramicOneInterest, CeramicOneVersion, Error, Result, SqlitePool, SqliteTransaction, +}; diff --git a/store/src/sql/access/interest.rs b/interest-svc/src/store/sql/access/interest.rs similarity index 89% rename from store/src/sql/access/interest.rs rename to interest-svc/src/store/sql/access/interest.rs index 174eefb7f..07a848337 100644 --- a/store/src/sql/access/interest.rs +++ b/interest-svc/src/store/sql/access/interest.rs @@ -8,11 +8,11 @@ use ceramic_core::Interest; use recon::{AssociativeHash, HashCount, InsertResult, Key, Sha256a}; use sqlx::Row; -use crate::{ +use crate::store::{ sql::{ entities::ReconHash, - query::{ReconQuery, ReconType, SqlBackend}, - sqlite::SqliteTransaction, + query::{ReconQuery, SqlBackend}, + SqliteTransaction, }, Error, Result, SqlitePool, }; @@ -94,15 +94,12 @@ impl CeramicOneInterest { return Ok(HashCount::new(Sha256a::identity(), 0)); } - let res: ReconHash = sqlx::query_as(ReconQuery::hash_range( - ReconType::Interest, - SqlBackend::Sqlite, - )) - .bind(range.start.as_bytes()) - .bind(range.end.as_bytes()) - .fetch_one(pool.reader()) - .await - .map_err(Error::from)?; + let res: ReconHash = sqlx::query_as(ReconQuery::hash_range(SqlBackend::Sqlite)) + .bind(range.start.as_bytes()) + .bind(range.end.as_bytes()) + .fetch_one(pool.reader()) + .await + .map_err(Error::from)?; let bytes = res.hash(); Ok(HashCount::new(Sha256a::from(bytes), res.count())) } @@ -114,7 +111,7 @@ impl CeramicOneInterest { offset: usize, limit: usize, ) -> Result> { - let query = sqlx::query(ReconQuery::range(ReconType::Interest)); + let query = sqlx::query(ReconQuery::range()); let rows = query .bind(range.start.as_bytes()) 
.bind(range.end.as_bytes()) @@ -135,7 +132,7 @@ impl CeramicOneInterest { /// Count the number of keys in a given range pub async fn count(pool: &SqlitePool, range: Range<&Interest>) -> Result { - let row = sqlx::query(ReconQuery::count(ReconType::Interest, SqlBackend::Sqlite)) + let row = sqlx::query(ReconQuery::count(SqlBackend::Sqlite)) .bind(range.start.as_bytes()) .bind(range.end.as_bytes()) .fetch_one(pool.reader()) diff --git a/interest-svc/src/store/sql/access/mod.rs b/interest-svc/src/store/sql/access/mod.rs new file mode 100644 index 000000000..c12668ae2 --- /dev/null +++ b/interest-svc/src/store/sql/access/mod.rs @@ -0,0 +1,5 @@ +mod interest; +mod version; + +pub use interest::CeramicOneInterest; +pub use version::CeramicOneVersion; diff --git a/store/src/sql/access/version.rs b/interest-svc/src/store/sql/access/version.rs similarity index 98% rename from store/src/sql/access/version.rs rename to interest-svc/src/store/sql/access/version.rs index 8d5867462..fd4202a66 100644 --- a/store/src/sql/access/version.rs +++ b/interest-svc/src/store/sql/access/version.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use anyhow::anyhow; -use crate::{ +use crate::store::{ sql::{entities::VersionRow, SqlitePool}, Error, Result, }; @@ -82,7 +82,7 @@ impl CeramicOneVersion { mod test { use super::*; - use crate::SqlitePool; + use crate::store::SqlitePool; #[tokio::test] async fn insert_version() { diff --git a/interest-svc/src/store/sql/entities/hash.rs b/interest-svc/src/store/sql/entities/hash.rs new file mode 100644 index 000000000..250fd68b2 --- /dev/null +++ b/interest-svc/src/store/sql/entities/hash.rs @@ -0,0 +1,48 @@ +use sqlx::{sqlite::SqliteRow, Row as _}; + +#[derive(Debug, Clone)] +pub struct ReconHash { + pub count: i64, + pub ahash_0: u32, + pub ahash_1: u32, + pub ahash_2: u32, + pub ahash_3: u32, + pub ahash_4: u32, + pub ahash_5: u32, + pub ahash_6: u32, + pub ahash_7: u32, +} + +impl sqlx::FromRow<'_, SqliteRow> for ReconHash { + fn from_row(row: 
&SqliteRow) -> std::result::Result { + Ok(Self { + count: row.try_get("count")?, + ahash_0: row.try_get("ahash_0")?, + ahash_1: row.try_get("ahash_1")?, + ahash_2: row.try_get("ahash_2")?, + ahash_3: row.try_get("ahash_3")?, + ahash_4: row.try_get("ahash_4")?, + ahash_5: row.try_get("ahash_5")?, + ahash_6: row.try_get("ahash_6")?, + ahash_7: row.try_get("ahash_7")?, + }) + } +} + +impl ReconHash { + pub fn count(&self) -> u64 { + self.count as u64 + } + pub fn hash(&self) -> [u32; 8] { + [ + self.ahash_0, + self.ahash_1, + self.ahash_2, + self.ahash_3, + self.ahash_4, + self.ahash_5, + self.ahash_6, + self.ahash_7, + ] + } +} diff --git a/interest-svc/src/store/sql/entities/mod.rs b/interest-svc/src/store/sql/entities/mod.rs new file mode 100644 index 000000000..4860a3909 --- /dev/null +++ b/interest-svc/src/store/sql/entities/mod.rs @@ -0,0 +1,5 @@ +mod hash; +mod version; + +pub use hash::ReconHash; +pub use version::VersionRow; diff --git a/interest-svc/src/store/sql/entities/version.rs b/interest-svc/src/store/sql/entities/version.rs new file mode 100644 index 000000000..b30ebf908 --- /dev/null +++ b/interest-svc/src/store/sql/entities/version.rs @@ -0,0 +1,39 @@ +use sqlx::types::chrono; + +use crate::store::{Result, SqlitePool}; + +#[derive(Debug, Clone, sqlx::FromRow)] +// We want to retrieve these fields for logging but we don't refer to them directly +#[allow(dead_code)] +pub struct VersionRow { + id: i64, + pub version: String, + pub installed_at: chrono::NaiveDateTime, + pub last_started_at: chrono::NaiveDateTime, +} + +impl VersionRow { + /// Return the version installed before the current version + pub async fn _fetch_previous(pool: &SqlitePool, current_version: &str) -> Result> { + Ok(sqlx::query_as( + "SELECT id, version, installed_at + FROM ceramic_one_version + WHERE version <> $1 + ORDER BY installed_at DESC limit 1;", + ) + .bind(current_version) + .fetch_optional(pool.reader()) + .await?) 
+ } + + /// Add the current version to the database, updating the last_started_at field if the version already exists + pub async fn insert_current(pool: &SqlitePool, current_version: &str) -> Result<()> { + sqlx::query( + "INSERT INTO ceramic_one_version (version) VALUES ($1) ON CONFLICT (version) DO UPDATE set last_started_at = CURRENT_TIMESTAMP;", + ) + .bind(current_version) + .execute(pool.writer()) + .await?; + Ok(()) + } +} diff --git a/interest-svc/src/store/sql/mod.rs b/interest-svc/src/store/sql/mod.rs new file mode 100644 index 000000000..0a753ed85 --- /dev/null +++ b/interest-svc/src/store/sql/mod.rs @@ -0,0 +1,9 @@ +mod access; +pub mod entities; +mod query; + +pub use access::{CeramicOneInterest, CeramicOneVersion}; +pub use ceramic_sql::{ + sqlite::{SqlitePool, SqliteTransaction}, + Error, Result, +}; diff --git a/interest-svc/src/store/sql/query.rs b/interest-svc/src/store/sql/query.rs new file mode 100644 index 000000000..98c319db5 --- /dev/null +++ b/interest-svc/src/store/sql/query.rs @@ -0,0 +1,65 @@ +/// Holds the SQL queries for accessing interests across DB types +pub struct ReconQuery {} +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum SqlBackend { + Sqlite, +} + +impl ReconQuery { + /// Requires 9 parameters: the order_key and the 8 hash values + pub fn insert_interest() -> &'static str { + "INSERT INTO ceramic_one_interest ( + order_key, + ahash_0, ahash_1, ahash_2, ahash_3, + ahash_4, ahash_5, ahash_6, ahash_7 + ) VALUES ( + $1, + $2, $3, $4, $5, + $6, $7, $8, $9 + );" + } + + /// Requires binding 2 parameters. 
Returned as `ReconHash` struct + pub fn hash_range(db: SqlBackend) -> &'static str { + match db { + SqlBackend::Sqlite => { + r#"SELECT + TOTAL(ahash_0) & 0xFFFFFFFF as ahash_0, TOTAL(ahash_1) & 0xFFFFFFFF as ahash_1, + TOTAL(ahash_2) & 0xFFFFFFFF as ahash_2, TOTAL(ahash_3) & 0xFFFFFFFF as ahash_3, + TOTAL(ahash_4) & 0xFFFFFFFF as ahash_4, TOTAL(ahash_5) & 0xFFFFFFFF as ahash_5, + TOTAL(ahash_6) & 0xFFFFFFFF as ahash_6, TOTAL(ahash_7) & 0xFFFFFFFF as ahash_7, + COUNT(1) as count + FROM ceramic_one_interest + WHERE order_key >= $1 AND order_key < $2;"# + } + } + } + /// Requires binding 2 parameters + pub fn range() -> &'static str { + r#"SELECT + order_key + FROM + ceramic_one_interest + WHERE + order_key >= $1 AND order_key < $2 + ORDER BY + order_key ASC + LIMIT + $3 + OFFSET + $4;"# + } + + pub fn count(db: SqlBackend) -> &'static str { + match db { + SqlBackend::Sqlite => { + r#"SELECT + count(order_key) as res + FROM + ceramic_one_interest + WHERE + order_key >= $1 AND order_key < $2"# + } + } + } +} diff --git a/service/src/tests/interest.rs b/interest-svc/src/tests/interest.rs similarity index 94% rename from service/src/tests/interest.rs rename to interest-svc/src/tests/interest.rs index 2cd129243..9681d3a10 100644 --- a/service/src/tests/interest.rs +++ b/interest-svc/src/tests/interest.rs @@ -1,6 +1,6 @@ use std::{collections::BTreeSet, str::FromStr}; -use ceramic_api::InterestStore; +use ceramic_api::InterestService; use ceramic_core::{ interest::{Builder, WithPeerId}, Interest, PeerId, @@ -19,8 +19,8 @@ macro_rules! 
test_with_sqlite { #[test(tokio::test)] async fn [<$test_name _sqlite>]() { - let conn = ceramic_store::SqlitePool::connect_in_memory().await.unwrap(); - let store = $crate::CeramicInterestService::new(conn); + let conn = $crate::store::SqlitePool::connect_in_memory().await.unwrap(); + let store = $crate::InterestService::new(conn); $( for stmt in $sql_stmts { store.pool.run_statement(stmt).await.unwrap(); @@ -76,16 +76,16 @@ test_with_dbs!( ); // This is the same as the recon::Store range test, but with the interest store (hits all its methods) -async fn access_interest_model(store: impl InterestStore) { +async fn access_interest_model(store: impl InterestService) { let interest_0 = random_interest(None, None); let interest_1 = random_interest(None, None); - InterestStore::insert(&store, interest_0.clone()) + InterestService::insert(&store, interest_0.clone()) .await .unwrap(); - InterestStore::insert(&store, interest_1.clone()) + InterestService::insert(&store, interest_1.clone()) .await .unwrap(); - let interests = InterestStore::range( + let interests = InterestService::range( &store, &random_interest_min(), &random_interest_max(), diff --git a/interest-svc/src/tests/mod.rs b/interest-svc/src/tests/mod.rs new file mode 100644 index 000000000..6cbe25d75 --- /dev/null +++ b/interest-svc/src/tests/mod.rs @@ -0,0 +1 @@ +mod interest; diff --git a/one/Cargo.toml b/one/Cargo.toml index d0402ace3..edbe735aa 100644 --- a/one/Cargo.toml +++ b/one/Cargo.toml @@ -20,8 +20,9 @@ ceramic-kubo-rpc = { path = "../kubo-rpc", features = ["http"] } ceramic-kubo-rpc-server.workspace = true ceramic-metrics.workspace = true ceramic-p2p.workspace = true -ceramic-service.workspace = true -ceramic-store.workspace = true +ceramic-event-svc.workspace = true +ceramic-interest-svc.workspace = true +ceramic-sql.workspace = true cid.workspace = true clap.workspace = true futures.workspace = true diff --git a/one/src/lib.rs b/one/src/lib.rs index d8880d0b1..6a28130da 100644 --- 
a/one/src/lib.rs +++ b/one/src/lib.rs @@ -11,12 +11,13 @@ mod network; use std::{env, path::PathBuf, time::Duration}; use anyhow::{anyhow, Context, Result}; -use ceramic_api::{EventStore, InterestStore}; +use ceramic_api::{EventService as ApiEventService, InterestService as ApiInterestService}; use ceramic_core::{EventId, Interest}; +use ceramic_event_svc::EventService; +use ceramic_interest_svc::InterestService; use ceramic_kubo_rpc::Multiaddr; use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle}; use ceramic_p2p::{load_identity, DiskStorage, Keychain, Libp2pConfig}; -use ceramic_service::{CeramicEventService, CeramicInterestService, CeramicService}; use clap::{Args, Parser, Subcommand, ValueEnum}; use feature_flags::*; use futures::StreamExt; @@ -330,18 +331,18 @@ impl DBOpts { validate_events: bool, ) -> Result { let sql_pool = - ceramic_store::SqlitePool::connect(path, ceramic_store::Migrations::Apply).await?; - let ceramic_service = CeramicService::try_new(sql_pool, validate_events).await?; - let interest_store = ceramic_service.interest_service().to_owned(); - let event_store = ceramic_service.event_service().to_owned(); + ceramic_sql::sqlite::SqlitePool::connect(path, ceramic_sql::sqlite::Migrations::Apply) + .await?; + let interest_svc = InterestService::new(sql_pool.clone()); + let event_svc = EventService::try_new(sql_pool, validate_events).await?; if process_undelivered { - event_store.process_all_undelivered_events().await?; + event_svc.process_all_undelivered_events().await?; } info!(path, "connected to sqlite db"); Ok(Databases::Sqlite(SqliteBackend { - event_store, - interest_store, + event_svc: event_svc.into(), + interest_svc: interest_svc.into(), })) } } @@ -350,8 +351,8 @@ enum Databases { Sqlite(SqliteBackend), } struct SqliteBackend { - interest_store: Arc, - event_store: Arc, + interest_svc: Arc, + event_svc: Arc, } struct Daemon; @@ -405,11 +406,11 @@ impl Daemon { Databases::Sqlite(db) => { Daemon::run_internal( opts, - 
db.interest_store.clone(), - db.interest_store, - db.event_store.clone(), - db.event_store.clone(), - db.event_store, + db.interest_svc.clone(), + db.interest_svc, + db.event_svc.clone(), + db.event_svc.clone(), + db.event_svc, metrics_handle, ) .await @@ -427,9 +428,9 @@ impl Daemon { metrics_handle: MetricsHandle, ) -> Result<()> where - I1: InterestStore + Send + Sync + 'static, + I1: ApiInterestService + Send + Sync + 'static, I2: recon::Store + Send + Sync + 'static, - E1: EventStore + Send + Sync + 'static, + E1: ApiEventService + Send + Sync + 'static, E2: recon::Store + Send + Sync + 'static, E3: iroh_bitswap::Store + Send + Sync + 'static, { @@ -501,28 +502,35 @@ impl Daemon { // Register metrics for all components let recon_metrics = MetricsHandle::register(recon::Metrics::register); - let store_metrics = MetricsHandle::register(ceramic_store::Metrics::register); + let interest_svc_store_metrics = + MetricsHandle::register(ceramic_interest_svc::store::Metrics::register); + let event_svc_store_metrics = + MetricsHandle::register(ceramic_event_svc::store::Metrics::register); let http_metrics = Arc::new(ceramic_metrics::MetricsHandle::register( http_metrics::Metrics::register, )); // Create recon store for interests. - let interest_store = ceramic_store::StoreMetricsMiddleware::new( + let interest_store = ceramic_interest_svc::store::StoreMetricsMiddleware::new( interest_recon_store.clone(), - store_metrics.clone(), + interest_svc_store_metrics.clone(), ); - let interest_api_store = - ceramic_store::StoreMetricsMiddleware::new(interest_api_store, store_metrics.clone()); + let interest_api_store = ceramic_interest_svc::store::StoreMetricsMiddleware::new( + interest_api_store, + interest_svc_store_metrics.clone(), + ); // Create second recon store for models. 
- let model_store = ceramic_store::StoreMetricsMiddleware::new( + let model_store = ceramic_event_svc::store::StoreMetricsMiddleware::new( model_recon_store.clone(), - store_metrics.clone(), + event_svc_store_metrics.clone(), ); - let model_api_store = - ceramic_store::StoreMetricsMiddleware::new(model_api_store, store_metrics); + let model_api_store = ceramic_event_svc::store::StoreMetricsMiddleware::new( + model_api_store, + event_svc_store_metrics, + ); // Construct a recon implementation for interests. let recon_interest_svr = Recon::new( diff --git a/one/src/migrations.rs b/one/src/migrations.rs index 252fca3df..decf7974a 100644 --- a/one/src/migrations.rs +++ b/one/src/migrations.rs @@ -4,8 +4,8 @@ use anyhow::{anyhow, Result}; use async_stream::try_stream; use async_trait::async_trait; use ceramic_event::unvalidated; +use ceramic_event_svc::BlockStore; use ceramic_metrics::config::Config as MetricsConfig; -use ceramic_service::BlockStore; use cid::Cid; use clap::{Args, Subcommand}; use futures::{stream::BoxStream, StreamExt}; @@ -97,7 +97,7 @@ async fn from_ipfs(opts: FromIpfsOpts) -> Result<()> { let blocks = FSBlockStore { input_ipfs_path: opts.input_ipfs_path, }; - db.event_store + db.event_svc .migrate_from_ipfs(network, blocks, opts.log_tile_docs) .await?; Ok(()) diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index e0a61ec13..2248f0917 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -16,14 +16,14 @@ async-trait.workspace = true backoff.workspace = true ceramic-core.workspace = true ceramic-metrics.workspace = true -ceramic-service.workspace = true -ceramic-store.workspace = true cid.workspace = true futures-util.workspace = true futures.workspace = true # temporary to address lack of error for invalid resolv.conf (missing nameservers) # and libp2p not exposing enough types to actually build a dns resolver -hickory-resolver = { version = "0.24.1", default-features = false, features = ["system-config"] } +hickory-resolver = { version = "0.24.1", 
default-features = false, features = [ + "system-config", +] } iroh-bitswap.workspace = true iroh-rpc-client.workspace = true iroh-rpc-types.workspace = true @@ -69,10 +69,10 @@ features = [ ] [dev-dependencies] +ceramic-event-svc.workspace = true criterion2.workspace = true rand_chacha.workspace = true test-log.workspace = true -ceramic-store.workspace = true [[bench]] name = "lru_cache" diff --git a/p2p/src/node.rs b/p2p/src/node.rs index 049fdbb7c..3c13827bb 100644 --- a/p2p/src/node.rs +++ b/p2p/src/node.rs @@ -1169,8 +1169,7 @@ mod tests { use async_trait::async_trait; use ceramic_core::RangeOpen; - use ceramic_service::CeramicEventService; - use ceramic_store::SqlitePool; + use ceramic_event_svc::{store::SqlitePool, EventService}; use futures::TryStreamExt; use rand::prelude::*; use rand_chacha::ChaCha8Rng; @@ -1387,7 +1386,7 @@ mod tests { let sql_pool = SqlitePool::connect_in_memory().await.unwrap(); let metrics = Metrics::register(&mut prometheus_client::registry::Registry::default()); - let store = Arc::new(CeramicEventService::new(sql_pool, true).await?); + let store = Arc::new(EventService::try_new(sql_pool, true).await?); store.process_all_undelivered_events().await?; let mut p2p = Node::new( network_config, diff --git a/service/src/interest/mod.rs b/service/src/interest/mod.rs deleted file mode 100644 index db5920c7d..000000000 --- a/service/src/interest/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod service; -mod store; - -pub use service::CeramicInterestService; diff --git a/service/src/lib.rs b/service/src/lib.rs deleted file mode 100644 index 52cf0ab66..000000000 --- a/service/src/lib.rs +++ /dev/null @@ -1,44 +0,0 @@ -mod error; -mod event; -mod interest; - -#[cfg(test)] -mod tests; - -use std::sync::Arc; - -use ceramic_store::{CeramicOneVersion, SqlitePool}; -pub use error::Error; -pub use event::{BlockStore, CeramicEventService}; -pub use interest::CeramicInterestService; - -pub(crate) type Result = std::result::Result; - -/// The ceramic service 
holds the logic needed by the other components (e.g. api, recon) to access the store and process events -/// in a way that makes sense to the ceramic protocol, and not just as raw bytes. -#[derive(Debug)] -pub struct CeramicService { - pub(crate) interest: Arc, - pub(crate) event: Arc, -} - -impl CeramicService { - /// Create a new CeramicService and process undelivered events if requested - pub async fn try_new(pool: SqlitePool, enable_event_validation: bool) -> Result { - // In the future, we may need to check the previous version to make sure we're not downgrading and risking data loss - CeramicOneVersion::insert_current(&pool).await?; - let interest = Arc::new(CeramicInterestService::new(pool.clone())); - let event = Arc::new(CeramicEventService::new(pool, enable_event_validation).await?); - Ok(Self { interest, event }) - } - - /// Get the interest service - pub fn interest_service(&self) -> &Arc { - &self.interest - } - - /// Get the event service - pub fn event_service(&self) -> &Arc { - &self.event - } -} diff --git a/sql/Cargo.toml b/sql/Cargo.toml new file mode 100644 index 000000000..8c8f33902 --- /dev/null +++ b/sql/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "ceramic-sql" +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +anyhow.workspace = true +sqlx.workspace = true +thiserror.workspace = true diff --git a/store/src/error.rs b/sql/src/error.rs similarity index 92% rename from store/src/error.rs rename to sql/src/error.rs index afd574126..8df014d98 100644 --- a/store/src/error.rs +++ b/sql/src/error.rs @@ -124,14 +124,3 @@ impl From for Error { } } } - -impl From for recon::Error { - fn from(value: Error) -> Self { - match value { - Error::Application { error } => recon::Error::Application { error }, - Error::Fatal { error } => recon::Error::Fatal { error }, - Error::Transient { error } => recon::Error::Transient { error }, - Error::InvalidArgument { 
error } => recon::Error::Application { error }, - } - } -} diff --git a/sql/src/lib.rs b/sql/src/lib.rs new file mode 100644 index 000000000..fb0a25654 --- /dev/null +++ b/sql/src/lib.rs @@ -0,0 +1,6 @@ +mod error; +pub mod sqlite; + +pub use error::Error; + +pub type Result = std::result::Result; diff --git a/store/src/sql/sqlite.rs b/sql/src/sqlite.rs similarity index 89% rename from store/src/sql/sqlite.rs rename to sql/src/sqlite.rs index b6fb05fa5..3e5c0db10 100644 --- a/store/src/sql/sqlite.rs +++ b/sql/src/sqlite.rs @@ -5,7 +5,17 @@ use sqlx::{ Sqlite, Transaction, }; -use crate::{Migrations, Result}; +use crate::Result; + +/// How to handle outstanding database migrations. +/// Intend to add a `Check` variant to verify the database is up to date and return an error if it is not. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum Migrations { + /// Apply migrations after opening connection + Apply, + /// Do nothing + Skip, +} #[derive(Clone, Debug)] /// The sqlite pool is split into a writer and a reader pool. @@ -91,12 +101,12 @@ impl SqlitePool { /// Get a reference to the writer database pool. The writer pool has only one connection. /// If you are going to do multiple writes in a row, instead use `tx` and `commit`. - pub(crate) fn writer(&self) -> &sqlx::SqlitePool { + pub fn writer(&self) -> &sqlx::SqlitePool { &self.writer } /// Get a reference to the reader database pool. The reader pool has many connections. - pub(crate) fn reader(&self) -> &sqlx::SqlitePool { + pub fn reader(&self) -> &sqlx::SqlitePool { &self.reader } @@ -126,7 +136,7 @@ impl<'a> SqliteTransaction<'a> { Ok(()) } - pub(crate) fn inner(&mut self) -> &mut Transaction<'a, Sqlite> { + pub fn inner(&mut self) -> &mut Transaction<'a, Sqlite> { &mut self.tx } } diff --git a/store/src/lib.rs b/store/src/lib.rs deleted file mode 100644 index 93536b8bb..000000000 --- a/store/src/lib.rs +++ /dev/null @@ -1,17 +0,0 @@ -//! 
A unified implementation of both the [`recon::Store`] and [`iroh_bitswap::Store`] traits. -//! This unified implementation allows for exposing Recon values as IPFS blocks -#![warn(missing_docs)] - -mod error; -mod metrics; -mod sql; - -pub use error::Error; -pub use metrics::{Metrics, StoreMetricsMiddleware}; -pub use sql::{ - entities::{BlockHash, EventBlockRaw, EventInsertable}, - CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, CeramicOneVersion, - InsertResult, InsertedEvent, Migrations, SqlitePool, SqliteRootStore, SqliteTransaction, -}; - -pub(crate) type Result = std::result::Result; diff --git a/store/src/sql/mod.rs b/store/src/sql/mod.rs deleted file mode 100644 index 2a9997927..000000000 --- a/store/src/sql/mod.rs +++ /dev/null @@ -1,24 +0,0 @@ -mod access; -pub mod entities; -mod query; -mod root; -mod sqlite; -#[cfg(test)] -mod test; - -pub use access::{ - CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, CeramicOneVersion, - InsertResult, InsertedEvent, -}; -pub use root::SqliteRootStore; -pub use sqlite::{SqlitePool, SqliteTransaction}; - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -/// Now to handle outstanding database migrations. -/// Intend to add a `Check` variant to verify the database is up to date and return an error if it is not. -pub enum Migrations { - /// Apply migrations after opening connection - Apply, - /// Do nothing - Skip, -}