fix: use correct index for the conclusion feed. (#561)
* refactor: update conclusion feed test to highlight bugs

We found a few bugs in the conclusion feed implementation on the event
service. Fixes will follow; however, this change updates the test to make
the bugs obvious.

Bugs:
1. Indexes are duplicated
2. Unsigned event indexes are always zero
3. Off by one bug in the bounds of the highwater mark

* fix: use correct delivered index

The previous code would zip the parsed events with the all_blocks
iterator. However, the all_blocks iterator was longer than parsed, so
the delivered values got misaligned with the events, causing the index
of conclusion feed events to be incorrect.

Now we correctly filter down the all_blocks iterator so it has only a
single value per event.
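The misalignment is easy to reproduce with plain iterators. A minimal sketch with hypothetical data (not the actual service types): zip pairs items positionally and silently stops at the shorter side, so any extra per-block entries shift every later delivered value onto the wrong event.

```rust
fn main() {
    // Hypothetical data: two parsed events, but three block entries,
    // because the signed init event contributes an extra block.
    let parsed_events = vec!["init", "data-1"];
    let all_blocks = vec![("init", 1), ("init-sig", 1), ("data-1", 2)];

    // Buggy: zip pairs positionally, so "data-1" is paired with the
    // delivered value of the extra "init-sig" block entry.
    let buggy: Vec<_> = parsed_events
        .iter()
        .zip(all_blocks.iter().map(|(_, delivered)| delivered))
        .collect();
    assert_eq!(buggy[1], (&"data-1", &1)); // wrong delivered value

    // Fixed: filter all_blocks down to a single entry per event first.
    let fixed: Vec<_> = parsed_events
        .iter()
        .zip(
            all_blocks
                .iter()
                .filter(|(name, _)| parsed_events.contains(name))
                .map(|(_, delivered)| delivered),
        )
        .collect();
    assert_eq!(fixed[1], (&"data-1", &2)); // correct delivered value
}
```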

* fix: use correct delivered for unsigned init events

The previous code had a typo that always used 0 as the delivered/index for
unsigned init events. This is now fixed.
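In miniature, the bug looks like the following. This is a hypothetical illustration, not the real service types: a hard-coded 0 where the event's delivered value should have been used.

```rust
// Stand-in for the raw event row read from the db.
struct RawEvent {
    delivered: u64,
}

// Buggy version returned a literal 0 for unsigned init events:
//   fn index_for(_event: &RawEvent) -> u64 { 0 }
// Fixed version: use the event's actual delivered value.
fn index_for(event: &RawEvent) -> u64 {
    event.delivered
}

fn main() {
    let unsigned_init = RawEvent { delivered: 7 };
    assert_eq!(index_for(&unsigned_init), 7);
}
```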

* fix: use 1 based highwater_mark values

The sqlite db uses 1-based highwater_mark values, meaning it does
delivered >= highwater_mark rather than delivered > highwater_mark. The
conclusion feed doesn't interpret the highwater mark directly; it just
returns the index and expects clients to pass back the max index they
have seen, which means we want an exclusive comparison. We achieve this
by adding one to the highwater_mark.

* refactor: make event access stateful

Previously a global static delivered counter was used to assign and track
delivered values. However, this makes testing hard: when multiple tests
run, they affect each other's delivered values.

This change moves the counter state into EventAccess itself, and
everywhere a pool was used to access events it is replaced with an
Arc<EventAccess>. This means we can have truly isolated delivered
counters.
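The shape of the refactor can be sketched as follows. The struct here is hypothetical (the real EventAccess also wraps the sqlite pool and table logic); the point is that the counter lives inside an instance behind an Arc, so each test can build its own and stay isolated.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Counter state lives in the struct instead of a global static.
struct EventAccess {
    delivered_counter: AtomicU64,
}

impl EventAccess {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            delivered_counter: AtomicU64::new(0),
        })
    }

    // Atomically hand out the next delivered value (1-based).
    fn next_delivered(&self) -> u64 {
        self.delivered_counter.fetch_add(1, Ordering::SeqCst) + 1
    }
}

fn main() {
    // Two "tests" each get an isolated counter; neither affects the other.
    let a = EventAccess::new();
    let b = EventAccess::new();
    assert_eq!(a.next_delivered(), 1);
    assert_eq!(a.next_delivered(), 2);
    assert_eq!(b.next_delivered(), 1);
}
```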

* refactor: use consistent naming of access types

We have an access type pattern for structs that contain the logic to
read specific tables in the db. They were called CeramicOne{Table};
with this change they are renamed to {Table}Access. The CeramicOne
prefix was meaningless, and the table name alone was not enough to
distinguish the type from other types related to the same entities.
nathanielc authored Oct 11, 2024
1 parent a92a809 commit 052196f
Showing 21 changed files with 599 additions and 402 deletions.
4 changes: 2 additions & 2 deletions event-svc/benches/sqlite_store.rs
@@ -6,7 +6,7 @@ use ceramic_event::unvalidated::{
     signed::{self, Signer},
     Builder,
 };
-use ceramic_event_svc::store::{CeramicOneEvent, EventInsertable};
+use ceramic_event_svc::store::{EventAccess, EventInsertable};
 use ceramic_sql::sqlite::SqlitePool;
 use criterion2::{criterion_group, criterion_main, BatchSize, Criterion};
 use ipld_core::ipld::Ipld;
@@ -91,7 +91,7 @@ async fn model_routine(input: ModelSetup) {
     let futs = futs.into_iter().map(|batch| {
         let store = input.pool.clone();
         let set = batch.into_iter().collect::<Vec<_>>();
-        async move { CeramicOneEvent::insert_many(&store, set.iter()).await }
+        async move { EventAccess::insert_many(&store, set.iter()).await }
     });
     futures::future::join_all(futs).await;
 }
4 changes: 3 additions & 1 deletion event-svc/src/event/feed.rs
@@ -33,7 +33,9 @@ impl ConclusionFeed for EventService {
         limit: i64,
     ) -> anyhow::Result<Vec<ConclusionEvent>> {
         let raw_events = self
-            .fetch_events_since_highwater_mark(highwater_mark, limit)
+            // TODO: Can we make highwater_marks zero based?
+            // highwater marks are 1 based, add one
+            .fetch_events_since_highwater_mark(highwater_mark + 1, limit)
             .await?;

         let conclusion_events_futures = raw_events
37 changes: 20 additions & 17 deletions event-svc/src/event/order_events.rs
@@ -1,9 +1,9 @@
 use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;

-use crate::store::{CeramicOneEvent, EventInsertable};
+use crate::store::{EventAccess, EventInsertable};
 use crate::Result;
 use ceramic_core::Cid;
-use ceramic_sql::sqlite::SqlitePool;

 /// Groups the events into lists of those with a delivered prev and those without. This can be used to return an error if the event is required to have history.
 /// The events will be marked as deliverable so that they can be passed directly to the store to be persisted. It assumes init events have already been marked deliverable.
@@ -41,15 +41,15 @@ impl OrderEvents {

     /// Uses the in memory set and the database to try to follow prev chains and mark deliverable
     pub async fn find_currently_deliverable(
-        pool: &SqlitePool,
+        event_access: Arc<EventAccess>,
         candidate_events: Vec<EventInsertable>,
     ) -> Result<Self> {
-        Self::find_deliverable_internal(Some(pool), candidate_events).await
+        Self::find_deliverable_internal(Some(event_access), candidate_events).await
     }

     /// Builds deliverable events, using the db pool if provided
     async fn find_deliverable_internal(
-        pool: Option<&SqlitePool>,
+        event_access: Option<Arc<EventAccess>>,
         candidate_events: Vec<EventInsertable>,
     ) -> Result<Self> {
         let mut new_cids: HashMap<Cid, bool> = HashMap::with_capacity(candidate_events.len());
@@ -88,9 +88,9 @@ impl OrderEvents {
                 } else {
                     undelivered_prevs_in_memory.push_back(event);
                 }
-            } else if let Some(pool) = pool {
+            } else if let Some(event_access) = &event_access {
                 let (_exists, prev_deliverable) =
-                    CeramicOneEvent::deliverable_by_cid(pool, prev).await?;
+                    event_access.deliverable_by_cid(prev).await?;
                 if prev_deliverable {
                     event.set_deliverable(true);
                     *new_cids.get_mut(event.cid()).expect("CID must exist") = true;
@@ -145,7 +145,10 @@

 #[cfg(test)]
 mod test {
+    use std::sync::Arc;
+
     use ceramic_core::EventId;
+    use ceramic_sql::sqlite::SqlitePool;
     use rand::seq::SliceRandom;
     use rand::thread_rng;
     use recon::ReconItem;
@@ -225,11 +228,12 @@ mod test {
     #[test(tokio::test)]
     async fn out_of_order_streams_valid() {
         let pool = SqlitePool::connect_in_memory().await.unwrap();
+        let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());

         let (stream_1, stream_2, mut to_insert) = get_2_streams().await;
         to_insert.shuffle(&mut thread_rng());

-        let ordered = OrderEvents::find_currently_deliverable(&pool, to_insert)
+        let ordered = OrderEvents::find_currently_deliverable(event_access, to_insert)
             .await
             .unwrap();
         assert!(
@@ -254,13 +258,14 @@ mod test {
     #[test(tokio::test)]
     async fn missing_history_in_memory() {
         let pool = SqlitePool::connect_in_memory().await.unwrap();
+        let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());

         let (stream_1, stream_2, mut to_insert) = get_2_streams().await;
         // if event 2 is missing from stream_1, we will sort stream_2 but stream_1 will be "missing history" after the init event
         to_insert.remove(1);
         to_insert.shuffle(&mut thread_rng());

-        let ordered = OrderEvents::find_currently_deliverable(&pool, to_insert)
+        let ordered = OrderEvents::find_currently_deliverable(event_access, to_insert)
             .await
             .unwrap();
         assert_eq!(
@@ -285,16 +290,15 @@ mod test {
         // so that an API write that had never seen event 2, would not able to write event 3 or after
         // the recon ordering task would sort this and mark all deliverable
         let pool = SqlitePool::connect_in_memory().await.unwrap();
+        let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());

         let stream_1 = get_n_events(10).await;
         let (to_insert, mut remaining) = get_insertable_events(&stream_1, 3).await;
-        CeramicOneEvent::insert_many(&pool, to_insert.iter())
-            .await
-            .unwrap();
+        event_access.insert_many(to_insert.iter()).await.unwrap();

         remaining.shuffle(&mut thread_rng());

-        let ordered = OrderEvents::find_currently_deliverable(&pool, remaining)
+        let ordered = OrderEvents::find_currently_deliverable(event_access, remaining)
             .await
             .unwrap();
         assert_eq!(
@@ -310,24 +314,23 @@ mod test {
         // this test validates we can order in memory events with each other if one of them has a prev
         // in the database that is deliverable, in which case the entire chain is deliverable
         let pool = SqlitePool::connect_in_memory().await.unwrap();
+        let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());

         let stream_1 = get_n_events(10).await;
         let (mut to_insert, mut remaining) = get_insertable_events(&stream_1, 3).await;
         for item in to_insert.as_mut_slice() {
             item.set_deliverable(true)
         }

-        CeramicOneEvent::insert_many(&pool, to_insert.iter())
-            .await
-            .unwrap();
+        event_access.insert_many(to_insert.iter()).await.unwrap();

         let expected = remaining
             .iter()
             .map(|i| i.order_key().clone())
             .collect::<Vec<_>>();
         remaining.shuffle(&mut thread_rng());

-        let ordered = OrderEvents::find_currently_deliverable(&pool, remaining)
+        let ordered = OrderEvents::find_currently_deliverable(event_access, remaining)
             .await
             .unwrap();
         assert!(
