Skip to content

Commit

Permalink
feat: update chains without rewrites
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Nov 15, 2024
1 parent 10bea3f commit 76e8ad6
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 116 deletions.
73 changes: 46 additions & 27 deletions src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,34 @@
use std::{
collections::{BTreeMap, BTreeSet},
iter,
time::Duration,
};

use thegraph_core::{BlockHash, IndexerId};
use tokio::time::Instant;

use crate::blocks::Block;

#[derive(Default)]
pub struct Chain(BTreeMap<Block, BTreeSet<IndexerId>>);
pub struct Chain {
blocks: BTreeMap<Block, BTreeSet<IndexerId>>,
update: parking_lot::Mutex<Option<Instant>>,
}

const MAX_LEN: usize = 512;
const DEFAULT_BLOCKS_PER_MINUTE: u64 = 6;
const UPDATE_INTERVAL: Duration = Duration::from_secs(1);

impl Chain {
pub fn update_ticket(&self) -> Option<()> {
let mut update = self.update.try_lock()?;
if matches!(*update, Some(t) if t.elapsed() < UPDATE_INTERVAL) {
return None;
}
*update = Some(Instant::now());
Some(())
}

pub fn latest(&self) -> Option<&Block> {
self.consensus_blocks().next()
}
Expand All @@ -38,33 +53,39 @@ impl Chain {
(bps * 60.0) as u64
}

pub fn should_insert(&self, block: &Block, indexer: &IndexerId) -> bool {
pub fn insert(&mut self, block: Block, indexer: IndexerId) {
tracing::trace!(%indexer, ?block);
if !self.should_insert(&block, &indexer) {
return;
}
if self.blocks.len() >= MAX_LEN {
self.evict();
}
self.blocks.entry(block).or_default().insert(indexer);
}

fn should_insert(&self, block: &Block, indexer: &IndexerId) -> bool {
let redundant = self
.0
.blocks
.get(block)
.map(|indexers| indexers.contains(indexer))
.unwrap_or(false);
let lowest_block = self.0.first_key_value().map(|(b, _)| b.number).unwrap_or(0);
let has_space = (self.0.len() < MAX_LEN) || (block.number > lowest_block);
let lowest_block = self
.blocks
.first_key_value()
.map(|(b, _)| b.number)
.unwrap_or(0);
let has_space = (self.blocks.len() < MAX_LEN) || (block.number > lowest_block);
!redundant && has_space
}

pub fn insert(&mut self, block: Block, indexer: IndexerId) {
tracing::trace!(%indexer, ?block);
debug_assert!(self.should_insert(&block, &indexer));
if self.0.len() >= MAX_LEN {
self.evict();
}
self.0.entry(block).or_default().insert(indexer);
}

/// Remove all entries associated with the lowest block number.
fn evict(&mut self) {
let min_block = match self.0.pop_first() {
let min_block = match self.blocks.pop_first() {
Some((min_block, _)) => min_block,
None => return,
};
while let Some(entry) = self.0.first_entry() {
while let Some(entry) = self.blocks.first_entry() {
debug_assert!(entry.key().number >= min_block.number);
if entry.key().number > min_block.number {
break;
Expand Down Expand Up @@ -100,15 +121,15 @@ impl Chain {
}
}
ConsensusBlocks {
blocks: self.0.iter().rev().peekable(),
blocks: self.blocks.iter().rev().peekable(),
}
}
}

#[cfg(test)]
mod tests {
use alloy_primitives::U256;
use itertools::Itertools;
use itertools::Itertools as _;
use rand::{
rngs::SmallRng, seq::SliceRandom as _, thread_rng, Rng as _, RngCore as _, SeedableRng,
};
Expand Down Expand Up @@ -137,32 +158,30 @@ mod tests {
timestamp,
};
let indexer = *indexers.choose(&mut rng).unwrap();
if chain.should_insert(&block, &indexer) {
chain.insert(block, indexer);
}
chain.insert(block, indexer);
}

// println!("{:#?}", chain.0);
// println!("{:#?}", chain.blocks);
// println!("{:#?}", chain.consensus_blocks().collect::<Vec<_>>());

assert!(chain.0.len() <= MAX_LEN, "chain len above max");
assert!(chain.consensus_blocks().count() <= chain.0.len());
assert!(chain.blocks.len() <= MAX_LEN, "chain len above max");
assert!(chain.consensus_blocks().count() <= chain.blocks.len());
assert!(chain.blocks_per_minute() > 0);
let blocks = || chain.0.keys();
let blocks = || chain.blocks.keys();
assert!(
blocks().tuple_windows().all(|(a, b)| a.number <= b.number),
"chain block numbers not monotonic, check ord impl"
);
for block in chain.consensus_blocks() {
let max_fork_indexers = chain
.0
.blocks
.iter()
.filter(|(block, _)| (block != block) && (block.number == block.number))
.map(|(_, indexers)| indexers.len())
.max()
.unwrap_or(0);
assert!(
chain.0.get(block).unwrap().len() > max_fork_indexers,
chain.blocks.get(block).unwrap().len() > max_fork_indexers,
"consensus block without majority consensus"
);
}
Expand Down
91 changes: 9 additions & 82 deletions src/chains.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,11 @@
use std::{
collections::{BTreeMap, HashMap},
time::Duration,
};
use std::collections::{BTreeMap, HashMap};

use parking_lot::{RwLock, RwLockReadGuard};
use thegraph_core::IndexerId;
use tokio::{
select, spawn,
sync::mpsc,
time::{interval, MissedTickBehavior},
};
use parking_lot::RwLock;

use crate::{blocks::Block, chain::Chain, metrics::METRICS};

#[derive(Clone)]
pub struct ChainReader {
tx: mpsc::UnboundedSender<Msg>,
chain: &'static RwLock<Chain>,
}

impl ChainReader {
pub fn read(&self) -> RwLockReadGuard<'_, Chain> {
self.chain.read()
}

pub fn notify(&self, block: Block, indexer: IndexerId) {
let _ = self.tx.send(Msg { block, indexer });
}
}
use crate::chain::Chain;

pub struct Chains {
data: RwLock<HashMap<String, ChainReader>>,
data: RwLock<HashMap<String, &'static RwLock<Chain>>>,
aliases: BTreeMap<String, String>,
}

Expand All @@ -42,68 +17,20 @@ impl Chains {
}
}

pub fn chain(&self, name: &str) -> ChainReader {
pub fn chain(&self, name: &str) -> &'static RwLock<Chain> {
let name = self.aliases.get(name).map(|a| a.as_str()).unwrap_or(name);
{
let reader = self.data.read();
if let Some(chain) = reader.get(name) {
return chain.clone();
return chain;
}
}
{
let mut writer = self.data.write();
writer
.entry(name.to_string())
.or_insert_with(|| Actor::spawn(name.to_string()))
.clone()
}
}
}

struct Msg {
block: Block,
indexer: IndexerId,
}

struct Actor;

impl Actor {
pub fn spawn(chain_name: String) -> ChainReader {
let chain: &'static RwLock<Chain> = Box::leak(Box::default());
let (tx, mut rx) = mpsc::unbounded_channel();
spawn(async move {
let mut msgs: Vec<Msg> = Default::default();
let mut timer = interval(Duration::from_secs(1));
timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
select! {
_ = rx.recv_many(&mut msgs, 32) => Self::handle_msgs(chain, &mut msgs),
_ = timer.tick() => {
let blocks_per_minute = chain.read().blocks_per_minute();
METRICS
.blocks_per_minute
.with_label_values(&[&chain_name])
.set(blocks_per_minute as i64);
},
}
}
});
ChainReader { tx, chain }
}

fn handle_msgs(chain: &RwLock<Chain>, msgs: &mut Vec<Msg>) {
{
let reader = chain.read();
msgs.retain(|Msg { block, indexer }| reader.should_insert(block, indexer));
}
{
let mut writer = chain.write();
for Msg { block, indexer } in msgs.drain(..) {
if writer.should_insert(&block, &indexer) {
writer.insert(block, indexer);
}
if !writer.contains_key(name) {
writer.insert(name.to_string(), Box::leak(Default::default()));
}
}
debug_assert!(msgs.is_empty());
self.data.read().get(name).unwrap()
}
}
Loading

0 comments on commit 76e8ad6

Please sign in to comment.