From 76e8ad67084f5978db506e95bcf9389f7c57a93d Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Fri, 15 Nov 2024 11:20:04 -0500 Subject: [PATCH] feat: update chains without rewrites --- src/chain.rs | 73 ++++++++++++++++++++------------- src/chains.rs | 91 +++++------------------------------------ src/client_query.rs | 99 +++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 147 insertions(+), 116 deletions(-) diff --git a/src/chain.rs b/src/chain.rs index 8e77c805..bf328352 100644 --- a/src/chain.rs +++ b/src/chain.rs @@ -1,19 +1,34 @@ use std::{ collections::{BTreeMap, BTreeSet}, iter, + time::Duration, }; use thegraph_core::{BlockHash, IndexerId}; +use tokio::time::Instant; use crate::blocks::Block; #[derive(Default)] -pub struct Chain(BTreeMap>); +pub struct Chain { + blocks: BTreeMap>, + update: parking_lot::Mutex>, +} const MAX_LEN: usize = 512; const DEFAULT_BLOCKS_PER_MINUTE: u64 = 6; +const UPDATE_INTERVAL: Duration = Duration::from_secs(1); impl Chain { + pub fn update_ticket(&self) -> Option<()> { + let mut update = self.update.try_lock()?; + if matches!(*update, Some(t) if t.elapsed() < UPDATE_INTERVAL) { + return None; + } + *update = Some(Instant::now()); + Some(()) + } + pub fn latest(&self) -> Option<&Block> { self.consensus_blocks().next() } @@ -38,33 +53,39 @@ impl Chain { (bps * 60.0) as u64 } - pub fn should_insert(&self, block: &Block, indexer: &IndexerId) -> bool { + pub fn insert(&mut self, block: Block, indexer: IndexerId) { + tracing::trace!(%indexer, ?block); + if !self.should_insert(&block, &indexer) { + return; + } + if self.blocks.len() >= MAX_LEN { + self.evict(); + } + self.blocks.entry(block).or_default().insert(indexer); + } + + fn should_insert(&self, block: &Block, indexer: &IndexerId) -> bool { let redundant = self - .0 + .blocks .get(block) .map(|indexers| indexers.contains(indexer)) .unwrap_or(false); - let lowest_block = self.0.first_key_value().map(|(b, _)| b.number).unwrap_or(0); - let has_space = (self.0.len() < MAX_LEN) || (block.number > lowest_block); + let lowest_block = self + .blocks + .first_key_value() + .map(|(b, _)| b.number) + .unwrap_or(0); + let has_space = (self.blocks.len() < MAX_LEN) || (block.number > lowest_block); !redundant && has_space } - pub fn insert(&mut self, block: Block, indexer: IndexerId) { - tracing::trace!(%indexer, ?block); - debug_assert!(self.should_insert(&block, &indexer)); - if self.0.len() >= MAX_LEN { - self.evict(); - } - self.0.entry(block).or_default().insert(indexer); - } - /// Remove all entries associated with the lowest block number. fn evict(&mut self) { - let min_block = match self.0.pop_first() { + let min_block = match self.blocks.pop_first() { Some((min_block, _)) => min_block, None => return, }; - while let Some(entry) = self.0.first_entry() { + while let Some(entry) = self.blocks.first_entry() { debug_assert!(entry.key().number >= min_block.number); if entry.key().number > min_block.number { break; @@ -100,7 +121,7 @@ impl Chain { } } ConsensusBlocks { - blocks: self.0.iter().rev().peekable(), + blocks: self.blocks.iter().rev().peekable(), } } } @@ -108,7 +129,7 @@ impl Chain { #[cfg(test)] mod tests { use alloy_primitives::U256; - use itertools::Itertools; + use itertools::Itertools as _; use rand::{ rngs::SmallRng, seq::SliceRandom as _, thread_rng, Rng as _, RngCore as _, SeedableRng, }; @@ -137,32 +158,30 @@ mod tests { timestamp, }; let indexer = *indexers.choose(&mut rng).unwrap(); - if chain.should_insert(&block, &indexer) { - chain.insert(block, indexer); - } + chain.insert(block, indexer); } - // println!("{:#?}", chain.0); + // println!("{:#?}", chain.blocks); // println!("{:#?}", chain.consensus_blocks().collect::>()); - assert!(chain.0.len() <= MAX_LEN, "chain len above max"); - assert!(chain.consensus_blocks().count() <= chain.0.len()); + assert!(chain.blocks.len() <= MAX_LEN, "chain len above max"); + assert!(chain.consensus_blocks().count() <= chain.blocks.len()); assert!(chain.blocks_per_minute() > 0); - let blocks = || chain.0.keys(); + let blocks = || chain.blocks.keys(); assert!( blocks().tuple_windows().all(|(a, b)| a.number <= b.number), "chain block numbers not monotonic, check ord impl" ); for block in chain.consensus_blocks() { let max_fork_indexers = chain - .0 + .blocks .iter() .filter(|(block, _)| (block != block) && (block.number == block.number)) .map(|(_, indexers)| indexers.len()) .max() .unwrap_or(0); assert!( - chain.0.get(block).unwrap().len() > max_fork_indexers, + chain.blocks.get(block).unwrap().len() > max_fork_indexers, "consensus block without majority consensus" ); } diff --git a/src/chains.rs b/src/chains.rs index 096e9eb3..636602e0 100644 --- a/src/chains.rs +++ b/src/chains.rs @@ -1,36 +1,11 @@ -use std::{ - collections::{BTreeMap, HashMap}, - time::Duration, -}; +use std::collections::{BTreeMap, HashMap}; -use parking_lot::{RwLock, RwLockReadGuard}; -use thegraph_core::IndexerId; -use tokio::{ - select, spawn, - sync::mpsc, - time::{interval, MissedTickBehavior}, -}; +use parking_lot::RwLock; -use crate::{blocks::Block, chain::Chain, metrics::METRICS}; - -#[derive(Clone)] -pub struct ChainReader { - tx: mpsc::UnboundedSender, - chain: &'static RwLock, -} - -impl ChainReader { - pub fn read(&self) -> RwLockReadGuard<'_, Chain> { - self.chain.read() - } - - pub fn notify(&self, block: Block, indexer: IndexerId) { - let _ = self.tx.send(Msg { block, indexer }); - } -} +use crate::chain::Chain; pub struct Chains { - data: RwLock>, + data: RwLock>>, aliases: BTreeMap, } @@ -42,68 +17,20 @@ impl Chains { } } - pub fn chain(&self, name: &str) -> ChainReader { + pub fn chain(&self, name: &str) -> &'static RwLock { let name = self.aliases.get(name).map(|a| a.as_str()).unwrap_or(name); { let reader = self.data.read(); if let Some(chain) = reader.get(name) { - return chain.clone(); + return chain; } } { let mut writer = self.data.write(); - writer - .entry(name.to_string()) - .or_insert_with(|| Actor::spawn(name.to_string())) - .clone() - } - } -} - -struct Msg { - block: Block, - indexer: IndexerId, -} - -struct Actor; - -impl Actor { - pub fn spawn(chain_name: String) -> ChainReader { - let chain: &'static RwLock = Box::leak(Box::default()); - let (tx, mut rx) = mpsc::unbounded_channel(); - spawn(async move { - let mut msgs: Vec = Default::default(); - let mut timer = interval(Duration::from_secs(1)); - timer.set_missed_tick_behavior(MissedTickBehavior::Delay); - loop { - select! { - _ = rx.recv_many(&mut msgs, 32) => Self::handle_msgs(chain, &mut msgs), - _ = timer.tick() => { - let blocks_per_minute = chain.read().blocks_per_minute(); - METRICS - .blocks_per_minute - .with_label_values(&[&chain_name]) - .set(blocks_per_minute as i64); - }, - } - } - }); - ChainReader { tx, chain } - } - - fn handle_msgs(chain: &RwLock, msgs: &mut Vec) { - { - let reader = chain.read(); - msgs.retain(|Msg { block, indexer }| reader.should_insert(block, indexer)); - } - { - let mut writer = chain.write(); - for Msg { block, indexer } in msgs.drain(..) { - if writer.should_insert(&block, &indexer) { - writer.insert(block, indexer); - } + if !writer.contains_key(name) { + writer.insert(name.to_string(), Box::leak(Default::default())); } } - debug_assert!(msgs.is_empty()); + self.data.read().get(name).unwrap() } } diff --git a/src/client_query.rs b/src/client_query.rs index 1a4c1bb3..2b8e8982 100644 --- a/src/client_query.rs +++ b/src/client_query.rs @@ -4,6 +4,7 @@ use std::{ time::{Duration, Instant}, }; +use alloy_sol_types::Eip712Domain; use anyhow::anyhow; use axum::{ extract::{Path, State}, @@ -15,6 +16,7 @@ use headers::ContentType; use indexer_selection::{ArrayVec, Candidate, Normalized}; use ordered_float::NotNan; use rand::{thread_rng, Rng as _}; +use serde::Deserialize; use thegraph_core::{AllocationId, BlockNumber, DeploymentId, IndexerId}; use tokio::sync::mpsc; use tracing::{info_span, Instrument as _}; @@ -24,15 +26,16 @@ use self::{attestation_header::GraphAttestation, context::Context, query_selecto use crate::{ auth::AuthSettings, block_constraints::{resolve_block_requirements, BlockRequirements}, + blocks::Block, budgets::USD, errors::{Error, IndexerError, IndexerErrors, MissingBlockError, UnavailableReason}, http_ext::HttpBuilderExt as _, - indexer_client::{IndexerAuth, IndexerResponse}, + indexer_client::{IndexerAuth, IndexerClient, IndexerResponse}, indexing_performance, metrics::{with_metric, METRICS}, middleware::RequestId, network::{self, DeploymentError, Indexing, IndexingId, ResolvedSubgraphInfo, SubgraphError}, - receipts::ReceiptStatus, + receipts::{Receipt, ReceiptStatus}, reports, }; @@ -171,8 +174,8 @@ async fn run_indexer_queries( let grt_per_usd = *ctx.grt_per_usd.borrow(); // Get the chain information for the resolved subgraph - let chain = ctx.chains.chain(&subgraph.chain); - let (chain_head, blocks_per_minute, block_requirements) = { + let (chain_head, blocks_per_minute, block_requirements, chain_update) = { + let chain = ctx.chains.chain(&subgraph.chain); let chain_reader = chain.read(); // Get the chain head block number. Try to get it from the chain head tracker service, if it @@ -201,7 +204,12 @@ async fn run_indexer_queries( } }; - (chain_head, blocks_per_minute, block_requirements) + ( + chain_head, + blocks_per_minute, + block_requirements, + chain_reader.update_ticket().is_some(), + ) }; tracing::debug!(chain_head, blocks_per_minute, ?block_requirements); @@ -364,7 +372,12 @@ async fn run_indexer_queries( .map(|i| i.receipt.grt_value() as f64 * 1e-18) .sum(); let total_fees_usd = USD(NotNan::new(total_fees_grt / *grt_per_usd).unwrap()); - let _ = ctx.budgeter.feedback.send(total_fees_usd); + + if chain_update { + let _ = ctx.budgeter.feedback.send(USD(total_fees_usd.0 * 2.0)); + } else { + let _ = ctx.budgeter.feedback.send(total_fees_usd); + } for indexer_request in &indexer_requests { let latest_block = match &indexer_request.result { @@ -379,7 +392,9 @@ async fn run_indexer_queries( latest_block, ); - // TODO: chain.notify(...); + if chain_update && indexer_request.result.is_ok() { + update_chain(&ctx, indexer_request); + } let deployment = indexer_request.deployment.to_string(); let indexer = format!("{:?}", indexer_request.indexer); @@ -799,6 +814,76 @@ pub async fn handle_indexer_query( ) } +fn update_chain(ctx: &Context, indexer_request: &reports::IndexerRequest) { + let allocation = indexer_request.receipt.allocation().into(); + let receipt = match &indexer_request.receipt { + Receipt::Legacy(fee, _) => ctx.receipt_signer.create_legacy_receipt(allocation, *fee), + Receipt::Tap(r) => ctx + .receipt_signer + .create_receipt(allocation, r.message.value), + }; + let attestation_domain = ctx.attestation_domain.clone(); + let indexer_client = ctx.indexer_client.clone(); + let deployment_url = indexer_request + .url + .parse::() + .unwrap() + .join(&format!("subgraphs/id/{}", indexer_request.deployment)) + .unwrap(); + let subgraph_chain = indexer_request.subgraph_chain.clone(); + let chains = ctx.chains; + let indexer = indexer_request.indexer; + tokio::spawn(async move { + match update_chain_inner(indexer_client, deployment_url, receipt, &attestation_domain).await + { + Ok(block) => { + { + let mut chain = chains.chain(&subgraph_chain).write(); + chain.insert(block, indexer); + } + let blocks_per_minute = chains.chain(&subgraph_chain).read().blocks_per_minute(); + METRICS + .blocks_per_minute + .with_label_values(&[&subgraph_chain]) + .set(blocks_per_minute as i64); + } + Err(err) => { + tracing::debug!(chain_update_err = format!("{err:#}")); + } + }; + }); +} +async fn update_chain_inner( + indexer_client: IndexerClient, + deployment_url: Url, + receipt: anyhow::Result, + attestation_domain: &Eip712Domain, +) -> anyhow::Result { + let query = r#"{"query":"{meta:_meta{block{number,hash,timestamp}}}"}"#; + let receipt = receipt?; + let auth = IndexerAuth::Paid(&receipt, attestation_domain); + let response = indexer_client + .query_indexer(deployment_url, auth, query) + .await?; + if !response.errors.is_empty() { + anyhow::bail!(anyhow!("indexer errors: {:?}", response.errors)); + } + #[derive(Debug, Deserialize)] + pub struct QueryResponse { + data: QueryData, + } + #[derive(Debug, Deserialize)] + pub struct QueryData { + meta: Meta, + } + #[derive(Debug, Deserialize)] + pub struct Meta { + block: Block, + } + let response: QueryResponse = serde_json::from_str(&response.response)?; + Ok(response.data.meta.block) +} + #[cfg(test)] mod tests { mod require_req_auth {