Skip to content

Commit

Permalink
feat: separate event production from listener multiplexing
Browse files Browse the repository at this point in the history
  • Loading branch information
SupernaviX committed Dec 30, 2024
1 parent 849ad1b commit 9df8a9a
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 302 deletions.
1 change: 1 addition & 0 deletions firefly-cardanoconnect/src/streams.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod blockchain;
mod events;
mod manager;
mod mux;
mod types;
Expand Down
110 changes: 41 additions & 69 deletions firefly-cardanoconnect/src/streams/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,19 @@ impl ChainListener {
Ok(self.get_impl().await?.get_tip())
}

pub async fn get_event(&mut self, block_ref: &BlockReference) -> Option<ListenerEvent> {
self.get_impl().await.unwrap().get_event(block_ref)
    /// Non-blocking: returns the next listener event after `block_ref`, if the
    /// underlying implementation is already initialized and has one ready.
    /// Returns `None` when the impl is still initializing, failed to
    /// initialize, or simply has no event available yet — the caller cannot
    /// distinguish these cases and is expected to retry or fall back to the
    /// blocking variant.
    pub fn try_get_next_event(&mut self, block_ref: &BlockReference) -> Option<ListenerEvent> {
        // `try_get()` yields None while initialization is in flight;
        // `.as_mut().ok()?` additionally swallows an initialization error.
        let l = self.0.try_get()?.as_mut().ok()?;
        l.try_get_next_event(block_ref)
    }

pub async fn get_next(&mut self, block_ref: &BlockReference) -> Option<BlockReference> {
self.get_impl().await.unwrap().get_next(block_ref).await
}

pub async fn get_next_rollback(&mut self, block_ref: &BlockReference) -> BlockReference {
self.get_impl().await.unwrap().get_next_rollback(block_ref)
    /// Blocking: returns the next listener event after `block_ref`, waiting
    /// for the chain to make progress whenever nothing is available yet.
    ///
    /// # Panics
    /// Panics if the listener implementation failed to initialize (the
    /// `unwrap` below) or if the upstream data source shuts down while
    /// waiting (inside `wait_for_more_events`).
    pub async fn wait_for_next_event(&mut self, block_ref: &BlockReference) -> ListenerEvent {
        let l = self.get_impl().await.unwrap();
        loop {
            if let Some(event) = l.try_get_next_event(block_ref) {
                return event;
            }
            // Nothing ready yet: block until more sync events have been
            // applied, then check again.
            l.wait_for_more_events().await;
        }
    }

async fn get_impl(&mut self) -> Result<&mut ChainListenerImpl> {
Expand Down Expand Up @@ -103,13 +106,13 @@ impl ChainListenerImpl {
self.history.back().unwrap().as_reference()
}

pub fn get_event(&mut self, block_ref: &BlockReference) -> Option<ListenerEvent> {
let (target_slot, target_hash) = match block_ref {
pub fn try_get_next_event(&mut self, block_ref: &BlockReference) -> Option<ListenerEvent> {
let (prev_slot, prev_hash) = match block_ref {
BlockReference::Origin => (None, self.genesis_hash.clone()),
BlockReference::Point(slot, hash) => (*slot, hash.clone()),
};

if let Some(slot) = target_slot {
if let Some(slot) = prev_slot {
// if we haven't seen enough blocks to be "sure" that this one is immutable, apply all pending updates synchronously
if self
.history
Expand All @@ -124,73 +127,42 @@ impl ChainListenerImpl {
}
}

// If we already know this block has been rolled back, just say so
if let Some(rollback) = self.rollbacks.get(block_ref) {
return Some(ListenerEvent::Rollback(rollback.clone()));
// Check if we've rolled back already
if let Some(rollback_to) = self.rollbacks.get(block_ref) {
return Some(ListenerEvent::Rollback(rollback_to.clone()));
}

// If we have it already, return it.
// If we don't, no big deal, some other consumer is running at a different point in history
self.history
.iter()
.rev()
.take_while(|b| {
b.block_slot
.is_none_or(|b| target_slot.is_none_or(|t| b >= t))
})
.find(|b| b.block_hash == target_hash)
.cloned()
.map(ListenerEvent::Process)
}

pub async fn get_next(&mut self, block_ref: &BlockReference) -> Option<BlockReference> {
let (prev_slot, prev_hash) = match block_ref {
BlockReference::Origin => (None, self.genesis_hash.clone()),
BlockReference::Point(slot, hash) => (*slot, hash.clone()),
};

loop {
// Have we rolled back to before this block? If so, don't wait for its successor.
// That successor will never come, and even if it did, we'd ignore it.
if self.rollbacks.contains_key(block_ref) {
return None;
}

for (index, block) in self.history.iter().enumerate().rev() {
if block.block_hash == prev_hash {
if let Some(next) = self.history.get(index + 1) {
// we already have the block which comes after this!
return Some(next.as_reference());
} else {
// we don't have that block yet, so process events until we do
break;
}
}
// If we can tell by the slots we've gone too far back, break early
if block
.block_slot
.is_some_and(|slot| prev_slot.is_some_and(|target| slot < target))
{
for (index, block) in self.history.iter().enumerate().rev() {
if block.block_hash == prev_hash {
if let Some(next) = self.history.get(index + 1) {
// we already have the block which comes after this!
return Some(ListenerEvent::Process(next.clone()));
} else {
// we don't have that block yet, so process events until we do
break;
}
}

// We don't have it, wait until the chain has progressed before checking again
let mut sync_events = vec![];
if self.sync_event_source.recv_many(&mut sync_events, 32).await == 0 {
panic!("data source has been shut down")
}
for sync_event in sync_events {
self.handle_sync_event(sync_event);
// If we can tell by the slots we've gone too far back, break early
if block
.block_slot
.is_some_and(|slot| prev_slot.is_some_and(|target| slot < target))
{
break;
}
}

// We don't have it, wait until the chain has progressed before checking again
None
}

pub fn get_next_rollback(&mut self, block_ref: &BlockReference) -> BlockReference {
let Some(rollback_to) = self.rollbacks.get(block_ref) else {
panic!("caller is trying to roll back when we didn't tell them to");
};
rollback_to.as_reference()
    /// Blocks until at least one new sync event arrives from the data source,
    /// then applies everything received (up to 32 events per batch) to local
    /// state via `handle_sync_event`.
    ///
    /// # Panics
    /// Panics if the sync event channel has been closed, i.e. the data source
    /// has shut down.
    pub async fn wait_for_more_events(&mut self) {
        let mut sync_events = vec![];
        // recv_many returns 0 only when the channel is closed.
        if self.sync_event_source.recv_many(&mut sync_events, 32).await == 0 {
            panic!("data source has been shut down")
        }
        for sync_event in sync_events {
            self.handle_sync_event(sync_event);
        }
    }

fn handle_sync_event(&mut self, sync_event: ChainSyncEvent) {
Expand Down
188 changes: 188 additions & 0 deletions firefly-cardanoconnect/src/streams/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
use std::{collections::HashMap, time::SystemTime};

use serde_json::json;

use super::{
blockchain::{ChainListener, ListenerEvent},
BlockInfo, BlockReference, Event, EventId, EventReference, ListenerFilter, ListenerId,
};

/// Turns the raw block stream of a [`ChainListener`] into a stream of
/// listener-scoped Firefly events, remembering per-block events so that
/// rollbacks can emit compensating events later.
#[derive(Debug)]
pub struct ChainEventStream {
    /// Listener these events are produced for (stamped into every `EventId`).
    id: ListenerId,
    /// Filters deciding which transactions generate events.
    filters: Vec<ListenerFilter>,
    /// Underlying source of block-level process/rollback notifications.
    sync: ChainListener,
    /// Events already computed for each block, keyed by its block reference.
    cache: HashMap<BlockReference, Vec<(EventReference, Event)>>,
}

impl ChainEventStream {
    /// Creates a stream producing events for listener `id`, filtered by
    /// `filters`, backed by the given chain listener.
    pub fn new(id: ListenerId, filters: Vec<ListenerFilter>, sync: ChainListener) -> Self {
        Self {
            id,
            filters,
            sync,
            cache: HashMap::new(),
        }
    }

    /// Non-blocking: returns the first event strictly after the high-water
    /// mark `hwm`, or `None` if the chain has not produced one yet.
    pub fn try_get_next_event(&mut self, hwm: &EventReference) -> Option<(EventReference, Event)> {
        let mut next_hwm = hwm.clone();
        loop {
            // First serve any event already cached for the hwm's block.
            if let Some(result) = self.next_event_in_memory(&next_hwm) {
                return Some(result);
            }
            // Otherwise pull the next block (plus any rollbacks leading up to
            // it); `?` bails out with None when nothing is available yet.
            let (rollbacks, block) = self.try_get_next_block(&next_hwm.block)?;
            let block_ref = block.as_reference();
            if let Some(event) = self.collect_events(rollbacks, block) {
                return Some(event);
            }
            // The new block produced no events; advance the high-water mark
            // past it and keep scanning.
            next_hwm = EventReference {
                block: block_ref,
                rollback: false,
                tx_index: None,
                log_index: None,
            }
        }
    }

    /// Blocking variant of [`Self::try_get_next_event`]: waits for the chain
    /// to progress until an event after `hwm` exists, then returns it.
    pub async fn wait_for_next_event(&mut self, hwm: &EventReference) -> (EventReference, Event) {
        let mut next_hwm = hwm.clone();
        loop {
            // Same scan as the non-blocking path…
            if let Some(result) = self.next_event_in_memory(&next_hwm) {
                return result;
            }
            // …except here we await the next block instead of giving up.
            let (rollbacks, block) = self.wait_for_next_block(&next_hwm.block).await;
            let block_ref = block.as_reference();
            if let Some(event) = self.collect_events(rollbacks, block) {
                return event;
            }
            next_hwm = EventReference {
                block: block_ref,
                rollback: false,
                tx_index: None,
                log_index: None,
            }
        }
    }

    /// Looks up the event that follows `hwm` among the events already cached
    /// for `hwm.block`. Returns `None` on a cache miss, when `hwm` itself is
    /// not found in the cached list, or when that block's events are
    /// exhausted.
    fn next_event_in_memory(&mut self, hwm: &EventReference) -> Option<(EventReference, Event)> {
        let cached = self.cache.get(&hwm.block)?;
        if hwm.tx_index.is_none() && hwm.log_index.is_none() {
            // We haven't processed any events from this block yet, so just process the first
            return cached.first().cloned();
        }
        let current_index = cached.iter().position(|(e_ref, _)| e_ref == hwm)?;
        cached.get(current_index + 1).cloned()
    }

    /// Non-blocking: walks forward from `block_ref`, accumulating any
    /// rollback notifications encountered, until the next block to process is
    /// available. Returns `None` if the chain listener has nothing ready yet.
    fn try_get_next_block(
        &mut self,
        block_ref: &BlockReference,
    ) -> Option<(Vec<BlockInfo>, BlockInfo)> {
        let mut rollbacks = vec![];
        let mut at = block_ref.clone();
        loop {
            match self.sync.try_get_next_event(&at)? {
                ListenerEvent::Rollback(block) => {
                    // A rollback rewinds our position; remember the block so
                    // collect_events can emit compensating events for it.
                    at = block.as_reference();
                    rollbacks.push(block);
                }
                ListenerEvent::Process(block) => {
                    return Some((rollbacks, block));
                }
            }
        }
    }

    /// Blocking variant of [`Self::try_get_next_block`]: awaits the chain
    /// listener until the next processable block (and any preceding
    /// rollbacks) is known.
    async fn wait_for_next_block(
        &mut self,
        block_ref: &BlockReference,
    ) -> (Vec<BlockInfo>, BlockInfo) {
        let mut rollbacks = vec![];
        let mut at = block_ref.clone();
        loop {
            match self.sync.wait_for_next_event(&at).await {
                ListenerEvent::Rollback(block) => {
                    at = block.as_reference();
                    rollbacks.push(block);
                }
                ListenerEvent::Process(block) => {
                    return (rollbacks, block);
                }
            }
        }
    }

    /// Computes and caches the events implied by undoing `rollbacks` and then
    /// processing `block`, returning the earliest event produced (if any).
    /// Rolled-back blocks contribute one "rollback" mirror event per forward
    /// event previously cached for them; the new block contributes its
    /// forward transaction events.
    fn collect_events(
        &mut self,
        rollbacks: Vec<BlockInfo>,
        block: BlockInfo,
    ) -> Option<(EventReference, Event)> {
        let mut result = None;
        for rollback in rollbacks {
            // If we never produced events for this block, there is nothing to undo.
            let Some(forwards) = self.cache.get(&rollback.as_reference()) else {
                continue;
            };
            let backwards: Vec<_> = forwards
                .iter()
                .map(|(_, e)| {
                    let event_ref = EventReference {
                        // NOTE(review): this anchors the rollback event to the
                        // NEWLY processed block, not the rolled-back one,
                        // while the list is cached under the rolled-back
                        // block's reference below — confirm this is the
                        // intended high-water-mark anchor.
                        block: block.as_reference(),
                        rollback: true,
                        tx_index: Some(e.id.transaction_index),
                        log_index: Some(e.id.log_index),
                    };
                    let event = e.clone().into_rollback();
                    (event_ref, event)
                })
                .collect();
            // `result` keeps the first event produced across all blocks here.
            result = result.or(backwards.first().cloned());
            self.cache.insert(rollback.as_reference(), backwards);
        }
        let events = self.collect_forward_tx_events(&block);
        result = result.or(events.first().cloned());
        self.cache.insert(block.as_reference(), events);
        result
    }

    /// Builds one synthetic "TransactionAccepted" event for every transaction
    /// in `block` whose hash matches this stream's filters.
    fn collect_forward_tx_events(&self, block: &BlockInfo) -> Vec<(EventReference, Event)> {
        let mut events = vec![];
        for (tx_idx, tx_hash) in block.transaction_hashes.iter().enumerate() {
            let tx_idx = tx_idx as u64;
            if self.matches_tx_filter(tx_hash) {
                let id = EventId {
                    listener_id: self.id.clone(),
                    signature: "TransactionAccepted(string, string, string)".into(),
                    block_hash: block.block_hash.clone(),
                    block_number: block.block_height,
                    transaction_hash: tx_hash.clone(),
                    transaction_index: tx_idx,
                    // Transactions produce a single event, so log_index is
                    // always 0 here.
                    log_index: 0,
                    timestamp: Some(SystemTime::now()),
                };
                let event = Event {
                    id,
                    // No structured payload yet; consumers only see id fields.
                    data: json!({}),
                };
                let event_ref = EventReference {
                    block: block.as_reference(),
                    rollback: false,
                    tx_index: Some(tx_idx),
                    log_index: Some(0),
                };
                events.push((event_ref, event));
            }
        }
        events
    }

    /// Returns true if a transaction with this hash should produce events.
    /// The literal filter value "any" matches every transaction.
    fn matches_tx_filter(&self, tx_hash: &str) -> bool {
        for filter in &self.filters {
            let ListenerFilter::TransactionId(id) = filter;
            if id == tx_hash || id == "any" {
                return true;
            }
        }
        false
    }
}
Loading

0 comments on commit 9df8a9a

Please sign in to comment.