Skip to content

Commit

Permalink
refactor: create explicit event and interest services with separate s…
Browse files Browse the repository at this point in the history
…tores. (#513)

* refactor: move ceramic-store into ceramic-service

The ceramic-store crate was only consumed by the ceramic-service. In
order to establish a clear pattern that each service should have its own
access path into sqlite we move the ceramic-store crate to be a
submodule of ceramic-service.

* refactor: split ceramic-service into event-svc and interest-svc

* refactor: add ceramic_sql crate for shared sqlite pool

All services will need access to sqlite. This means they need to share a
sqlite pool type. This pulls out the sqlite pool from the services into
a shared type.

* refactor: use store and service names consistently

A store talks directly to a database. A service provides a public API
and consumes a store.

* refactor: fix bugs in rename

* refactor: fix clippy

* refactor: remove unused metrics middleware
  • Loading branch information
nathanielc authored Sep 6, 2024
1 parent c82a685 commit b4ab3b6
Show file tree
Hide file tree
Showing 69 changed files with 1,063 additions and 523 deletions.
145 changes: 78 additions & 67 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 10 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,23 @@ members = [
"car",
"core",
"event",
"event-svc",
"flight",
"interest-svc",
"kubo-rpc",
"kubo-rpc-server",
"metadata",
"metrics",
"olap",
"one",
"p2p",
"recon",
"store",
"sql",
"validation",
"beetle/iroh-bitswap",
"beetle/iroh-rpc-client",
"beetle/iroh-rpc-types",
"beetle/iroh-util",
"service",
"olap",
"validation",
"flight",
]

[workspace.dependencies]
Expand Down Expand Up @@ -57,14 +58,15 @@ ceramic-api-server = { path = "./api-server" }
ceramic-car = { path = "./car" }
ceramic-core = { path = "./core" }
ceramic-event = { path = "./event" }
ceramic-event-svc = { path = "./event-svc" }
ceramic-flight = { path = "./flight" }
ceramic-interest-svc = { path = "./interest-svc" }
ceramic-kubo-rpc-server = { path = "./kubo-rpc-server" }
ceramic-metadata = { path = "./metadata" }
ceramic-metrics = { path = "./metrics" }
ceramic-one = { path = "./one" }
ceramic-p2p = { path = "./p2p" }
ceramic-service = { path = "./service" }
ceramic-store = { path = "./store" }
ceramic-sql = { path = "./sql" }
ceramic-validation = { path = "./validation" }
chrono = "0.4.31"
cid = { version = "0.11", features = ["serde-codec"] }
Expand Down Expand Up @@ -108,6 +110,7 @@ iroh-p2p = { version = "0.2.0", path = "./beetle/iroh-p2p" }
iroh-rpc-client = { path = "./beetle/iroh-rpc-client" }
iroh-rpc-types = { path = "./beetle/iroh-rpc-types" }
iroh-util = { path = "./beetle/iroh-util" }
itertools = "0.12.0"
k256 = "0.13"
keyed_priority_queue = "0.4.1"
lazy_static = "1.4"
Expand Down
2 changes: 1 addition & 1 deletion api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod server;
pub use resume_token::ResumeToken;

pub use server::{
ApiItem, EventDataResult, EventInsertResult, EventStore, IncludeEventData, InterestStore,
ApiItem, EventDataResult, EventInsertResult, EventService, IncludeEventData, InterestService,
Server,
};

Expand Down
16 changes: 8 additions & 8 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl TryFrom<models::Interest> for ValidatedInterest {

/// Trait for accessing persistent storage of Interests
#[async_trait]
pub trait InterestStore: Send + Sync {
pub trait InterestService: Send + Sync {
/// Returns true if the key was newly inserted, false if it already existed.
async fn insert(&self, key: Interest) -> Result<bool>;
async fn range(
Expand All @@ -147,7 +147,7 @@ pub trait InterestStore: Send + Sync {
}

#[async_trait]
impl<S: InterestStore> InterestStore for Arc<S> {
impl<S: InterestService> InterestService for Arc<S> {
async fn insert(&self, key: Interest) -> Result<bool> {
self.as_ref().insert(key).await
}
Expand Down Expand Up @@ -243,7 +243,7 @@ impl ApiItem {

/// Trait for accessing persistent storage of Events
#[async_trait]
pub trait EventStore: Send + Sync {
pub trait EventService: Send + Sync {
/// Returns (new_key, new_value) where true if was newly inserted, false if it already existed.
async fn insert_many(&self, items: Vec<ApiItem>) -> Result<Vec<EventInsertResult>>;
async fn range_with_values(
Expand Down Expand Up @@ -279,7 +279,7 @@ pub trait EventStore: Send + Sync {
}

#[async_trait::async_trait]
impl<S: EventStore> EventStore for Arc<S> {
impl<S: EventService> EventService for Arc<S> {
async fn insert_many(&self, items: Vec<ApiItem>) -> Result<Vec<EventInsertResult>> {
self.as_ref().insert_many(items).await
}
Expand Down Expand Up @@ -346,8 +346,8 @@ pub struct Server<C, I, M> {

impl<C, I, M> Server<C, I, M>
where
I: InterestStore,
M: EventStore + 'static,
I: InterestService,
M: EventService + 'static,
{
pub fn new(peer_id: PeerId, network: Network, interest: I, model: Arc<M>) -> Self {
let (tx, event_rx) = tokio::sync::mpsc::channel::<EventInsert>(1024);
Expand Down Expand Up @@ -807,8 +807,8 @@ pub(crate) fn decode_multibase_data(value: &str) -> Result<Vec<u8>, BadRequestRe
impl<C, I, M> Api<C> for Server<C, I, M>
where
C: Send + Sync,
I: InterestStore + Sync,
M: EventStore + Sync + 'static,
I: InterestService + Sync,
M: EventService + Sync + 'static,
{
#[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))]
async fn liveness_get(
Expand Down
8 changes: 4 additions & 4 deletions api/src/server/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ use ipld_core::ipld::Ipld;
use tokio::io::{AsyncRead, AsyncReadExt as _};
use tracing::debug;

use crate::EventStore;
use crate::EventService;

// Helper function to construct an event ID from CAR data of an event coming in via the HTTP api.
pub async fn event_id_from_car<R, S>(network: Network, mut reader: R, store: &S) -> Result<EventId>
where
R: AsyncRead + Send + Unpin,
S: EventStore,
S: EventService,
{
let mut car_bytes = Vec::new();
reader.read_to_end(&mut car_bytes).await?;
Expand All @@ -29,7 +29,7 @@ async fn event_id_for_event<S>(
store: &S,
) -> Result<EventId>
where
S: EventStore,
S: EventService,
{
match event {
unvalidated::Event::Time(time_event) => {
Expand Down Expand Up @@ -83,7 +83,7 @@ fn event_id_from_init_payload(

async fn get_init_event_payload_from_store(
init_cid: &Cid,
store: &impl EventStore,
store: &impl EventService,
) -> Result<unvalidated::init::Payload<Ipld>> {
let init_bytes = store
.get_block(init_cid)
Expand Down
6 changes: 3 additions & 3 deletions api/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{ops::Range, str::FromStr, sync::Arc};

use crate::server::{decode_multibase_data, BuildResponse, Server};
use crate::{
ApiItem, EventDataResult, EventInsertResult, EventStore, IncludeEventData, InterestStore,
ApiItem, EventDataResult, EventInsertResult, EventService, IncludeEventData, InterestService,
};

use anyhow::Result;
Expand Down Expand Up @@ -95,7 +95,7 @@ pub fn decode_multibase_str(encoded: &str) -> Vec<u8> {
mock! {
pub AccessInterestStoreTest {}
#[async_trait]
impl InterestStore for AccessInterestStoreTest {
impl InterestService for AccessInterestStoreTest {
async fn insert(&self, key: Interest) -> Result<bool>;
async fn range(
&self,
Expand All @@ -110,7 +110,7 @@ mock! {
mock! {
pub EventStoreTest {}
#[async_trait]
impl EventStore for EventStoreTest {
impl EventService for EventStoreTest {
async fn insert_many(&self, items: Vec<ApiItem>) -> Result<Vec<EventInsertResult>>;
async fn range_with_values(
&self,
Expand Down
Loading

0 comments on commit b4ab3b6

Please sign in to comment.