Skip to content

Commit

Permalink
feat: use data container wrapper for mutable metadata
Browse files Browse the repository at this point in the history
Existing MID events have a header on data payloads. This data is mutable
metadata. We needed a mechanism to represent this kind of data within
the conclusion events and doc states. We added a simple wrapper type
with metadata and data fields.
  • Loading branch information
nathanielc committed Oct 9, 2024
1 parent 32eee5a commit e4bcc38
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 100 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 44 additions & 12 deletions event-svc/src/event/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::HashMap,
collections::{BTreeMap, HashMap},
sync::{Arc, Mutex, MutexGuard},
};

Expand All @@ -11,6 +11,7 @@ use super::{
};
use async_trait::async_trait;
use ceramic_core::{EventId, Network, NodeId, SerializeExt};
use ceramic_event::unvalidated::data::SHOULD_INDEX_DEFAULT;
use ceramic_flight::{ConclusionData, ConclusionEvent, ConclusionInit, ConclusionTime};
use ceramic_sql::sqlite::SqlitePool;
use cid::Cid;
Expand Down Expand Up @@ -142,7 +143,7 @@ impl EventService {
}

/// Given the incoming events, see if any of them are the init event that events were
/// 'pending on' and return all previously pending events that can now be validated.
/// 'pending on' and return all previously pending events that can now be validated.
fn remove_unblocked_from_pending_q(&self, new: &[UnvalidatedEvent]) -> Vec<UnvalidatedEvent> {
let new_init_cids = new
.iter()
Expand Down Expand Up @@ -343,13 +344,31 @@ impl EventService {
} = event;
let stream_cid = event.stream_cid();
let init_event = self.get_event_by_cid(stream_cid).await?;
let init = ConclusionInit::try_from(init_event).map_err(|e| {
let init = ConclusionInit::try_from(&init_event).map_err(|e| {
Error::new_app(anyhow::anyhow!(
"Malformed event found in the database: {}",
e
))
})?;

// Small wrapper container around the data field to hold other mutable metadata for the
// event.
#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
struct DataContainer<'a> {
metadata: BTreeMap<String, Ipld>,
data: Option<&'a Ipld>,
}

impl<'a> DataContainer<'a> {
fn new_with_should_index(should_index: bool, data: Option<&'a Ipld>) -> Self {
Self {
metadata: BTreeMap::from([("shouldIndex".to_string(), should_index.into())]),
data,
}
}
}

match event {
ceramic_event::unvalidated::Event::Time(time_event) => {
Ok(ConclusionEvent::Time(ConclusionTime {
Expand All @@ -366,7 +385,14 @@ impl EventService {
event_cid,
init,
previous: vec![*data.prev()],
data: data.data().to_json_bytes().map_err(|e| {
data: DataContainer::new_with_should_index(
data.header()
.map(|header| header.should_index())
.unwrap_or(SHOULD_INDEX_DEFAULT),
Some(data.data()),
)
.to_json_bytes()
.map_err(|e| {
Error::new_app(anyhow::anyhow!(
"Failed to serialize IPLD data: {}",
e
Expand All @@ -380,7 +406,12 @@ impl EventService {
event_cid,
init,
previous: vec![],
data: init_event.data().to_json_bytes().map_err(|e| {
data: DataContainer::new_with_should_index(
init_event.header().should_index(),
init_event.data(),
)
.to_json_bytes()
.map_err(|e| {
Error::new_app(anyhow::anyhow!(
"Failed to serialize IPLD data: {}",
e
Expand All @@ -396,13 +427,14 @@ impl EventService {
event_cid,
init,
previous: vec![],
data: unsigned_event
.payload()
.data()
.to_json_bytes()
.map_err(|e| {
Error::new_app(anyhow::anyhow!("Failed to serialize IPLD data: {}", e))
})?,
data: DataContainer::new_with_should_index(
unsigned_event.payload().header().should_index(),
unsigned_event.payload().data(),
)
.to_json_bytes()
.map_err(|e| {
Error::new_app(anyhow::anyhow!("Failed to serialize IPLD data: {}", e))
})?,
index: 0,
}))
}
Expand Down
Loading

0 comments on commit e4bcc38

Please sign in to comment.