diff --git a/chaindexing-tests/src/factory/json_rpcs.rs b/chaindexing-tests/src/factory/json_rpcs.rs index a67f3a8..b8d68f9 100644 --- a/chaindexing-tests/src/factory/json_rpcs.rs +++ b/chaindexing-tests/src/factory/json_rpcs.rs @@ -24,7 +24,7 @@ macro_rules! json_rpc_with_logs { ($contract_address:expr) => {{ use crate::json_rpc_with_logs; - json_rpc_with_logs!($contract_address, 3) + json_rpc_with_logs!($contract_address, 17774490) }}; ($contract_address:expr, $current_block_number:expr) => {{ use chaindexing::EventsIngesterJsonRpc; diff --git a/chaindexing-tests/src/tests/events_ingester.rs b/chaindexing-tests/src/tests/events_ingester.rs index bbb1fd1..bc8682f 100644 --- a/chaindexing-tests/src/tests/events_ingester.rs +++ b/chaindexing-tests/src/tests/events_ingester.rs @@ -18,7 +18,11 @@ mod tests { test_runner::run_test(&pool, |mut conn| async move { let contracts = vec![bayc_contract()]; - let json_rpc = Arc::new(json_rpc_with_logs!(BAYC_CONTRACT_ADDRESS, 3)); + static CURRENT_BLOCK_NUMBER: u32 = BAYC_CONTRACT_START_BLOCK_NUMBER + 20; + let json_rpc = Arc::new(json_rpc_with_logs!( + BAYC_CONTRACT_ADDRESS, + CURRENT_BLOCK_NUMBER + )); assert!(PostgresRepo::get_all_events(&mut conn).await.is_empty()); Chaindexing::create_initial_contract_addresses(&mut conn, &contracts).await; @@ -72,7 +76,7 @@ mod tests { } #[tokio::test] - pub async fn checkpoints_last_ingested_block_to_current_block() { + pub async fn checkpoints_last_ingested_block_to_the_ingested_block_in_a_given_batch() { let pool = test_runner::get_pool().await; test_runner::run_test(&pool, |mut conn| async move { @@ -86,16 +90,19 @@ mod tests { Chaindexing::create_initial_contract_addresses(&mut conn, &contracts).await; let conn = Arc::new(Mutex::new(conn)); - EventsIngester::ingest(conn.clone(), &contracts, 10, json_rpc) + let blocks_per_batch = 10; + EventsIngester::ingest(conn.clone(), &contracts, blocks_per_batch, json_rpc) .await .unwrap(); let mut conn = conn.lock().await; let contract_addresses 
= PostgresRepo::get_all_contract_addresses(&mut conn).await; let bayc_contract_address = contract_addresses.first().unwrap(); + let last_ingested_block_number = + bayc_contract_address.last_ingested_block_number as u64; assert_eq!( - bayc_contract_address.last_ingested_block_number as u32, - CURRENT_BLOCK_NUMBER + last_ingested_block_number, + BAYC_CONTRACT_START_BLOCK_NUMBER as u64 + blocks_per_batch ); }) .await; diff --git a/chaindexing/src/contracts.rs b/chaindexing/src/contracts.rs index 260a49a..9fcadce 100644 --- a/chaindexing/src/contracts.rs +++ b/chaindexing/src/contracts.rs @@ -123,7 +123,6 @@ impl Contracts { contracts .iter() .fold(HashMap::new(), |mut topics_by_contract_name, contract| { - // embrace mutability because Rust helps avoid bugs topics_by_contract_name.insert(contract.name.clone(), contract.get_event_topics()); topics_by_contract_name @@ -178,7 +177,7 @@ impl ContractAddressID { #[diesel(table_name = chaindexing_contract_addresses)] #[diesel(primary_key(id))] pub struct ContractAddress { - id: i32, + pub id: i32, chain_id: i32, pub last_ingested_block_number: i64, pub last_handled_block_number: i64, @@ -199,11 +198,3 @@ impl ContractAddress { .to_string() } } - -pub struct ContractAddresses; - -impl ContractAddresses { - pub fn get_ids(contract_addresses: &Vec) -> Vec { - contract_addresses.iter().map(|c| c.id).collect() - } -} diff --git a/chaindexing/src/events_ingester.rs b/chaindexing/src/events_ingester.rs index 7aef979..19cefda 100644 --- a/chaindexing/src/events_ingester.rs +++ b/chaindexing/src/events_ingester.rs @@ -1,11 +1,12 @@ +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use ethers::prelude::Middleware; use ethers::prelude::*; use ethers::providers::{Http, Provider, ProviderError}; -use ethers::types::{Address, Filter, Log}; -use futures_util::future::try_join_all; +use ethers::types::{Address, Filter as EthersFilter, Log}; +use futures_util::future::{join_all, try_join_all}; use 
futures_util::FutureExt; use futures_util::StreamExt; use tokio::sync::Mutex; @@ -19,7 +20,7 @@ use crate::{ChaindexingRepo, ChaindexingRepoConn, Config, ContractAddress, Repo} #[async_trait::async_trait] pub trait EventsIngesterJsonRpc: Clone + Sync + Send { async fn get_block_number(&self) -> Result; - async fn get_logs(&self, filter: &Filter) -> Result, ProviderError>; + async fn get_logs(&self, filter: &EthersFilter) -> Result, ProviderError>; } #[async_trait::async_trait] @@ -28,7 +29,7 @@ impl EventsIngesterJsonRpc for Provider { Middleware::get_block_number(&self).await } - async fn get_logs(&self, filter: &Filter) -> Result, ProviderError> { + async fn get_logs(&self, filter: &EthersFilter) -> Result, ProviderError> { Middleware::get_logs(&self, filter).await } } @@ -94,32 +95,33 @@ impl EventsIngester { blocks_per_batch: u64, json_rpc: Arc, ) -> Result<(), EventsIngesterError> { - let current_block_number = json_rpc.get_block_number().await?; + let current_block_number = (json_rpc.get_block_number().await?).as_u64(); let mut contract_addresses_stream = ChaindexingRepo::get_contract_addresses_stream(conn.clone()).await; while let Some(contract_addresses) = contract_addresses_stream.next().await { + let contract_addresses = Self::get_only_uningested_contract_addresses( + &contract_addresses, + current_block_number, + ); + let mut conn = conn.lock().await; - let filters = build_filters( + let filters = Filters::build( &contract_addresses, &contracts, - current_block_number.as_u64(), + current_block_number, blocks_per_batch, ); - let logs = fetch_logs(&filters, &json_rpc).await.unwrap(); + let logs = Self::fetch_logs(&filters, &json_rpc).await.unwrap(); let events = Events::new(&logs, &contracts); ChaindexingRepo::run_in_transaction(&mut conn, move |conn| { async move { ChaindexingRepo::create_events(conn, &events.clone()).await; - ChaindexingRepo::update_last_ingested_block_number( - conn, - &contract_addresses.clone(), - current_block_number.as_u64() as i64, 
- ) - .await; + Self::update_last_ingested_block_numbers(conn, &contract_addresses, &filters) + .await; Ok(()) } @@ -130,57 +132,144 @@ impl EventsIngester { Ok(()) } + + async fn update_last_ingested_block_numbers<'a>( + conn: &mut ChaindexingRepoConn<'a>, + contract_addresses: &Vec, + filters: &Vec, + ) { + let filters_by_contract_address_id = Filters::group_by_contract_address_id(filters); + + let conn = Arc::new(Mutex::new(conn)); + join_all(contract_addresses.iter().map(|contract_address| { + let filters = filters_by_contract_address_id + .get(&contract_address.id) + .unwrap(); + + let conn = conn.clone(); + async move { + if let Some(latest_filter) = Filters::get_latest(filters) { + let last_ingested_block_number = latest_filter.value.get_to_block().unwrap(); + + let mut conn = conn.lock().await; + ChaindexingRepo::update_last_ingested_block_number( + &mut conn, + &contract_address, + last_ingested_block_number.as_u64() as i64, + ) + .await + } + } + })) + .await; + } + + fn get_only_uningested_contract_addresses( + contract_addresses: &Vec, + current_block_number: u64, + ) -> Vec { + contract_addresses + .to_vec() + .into_iter() + .filter(|ca| current_block_number > ca.last_ingested_block_number as u64) + .collect() + } + + async fn fetch_logs( + filters: &Vec, + json_rpc: &Arc, + ) -> Result, EventsIngesterError> { + let logs_per_filter = + try_join_all(filters.iter().map(|f| json_rpc.get_logs(&f.value))).await?; + + Ok(logs_per_filter.into_iter().flatten().collect()) + } +} + +#[derive(Clone, Debug)] +struct Filter { + contract_address_id: i32, + value: EthersFilter, } -async fn fetch_logs( - filters: &Vec, - json_rpc: &Arc, -) -> Result, EventsIngesterError> { - let logs_per_filter = try_join_all(filters.iter().map(|f| json_rpc.get_logs(&f))).await?; +impl Filter { + fn build( + contract_address: &ContractAddress, + topics: &Vec, + current_block_number: u64, + blocks_per_batch: u64, + ) -> Filter { + let last_ingested_block_number = 
contract_address.last_ingested_block_number as u64; -    Ok(logs_per_filter.into_iter().flatten().collect()) +    Filter { +        contract_address_id: contract_address.id, +        value: EthersFilter::new() +            // We could use multiple addresses here but +            // we'd rather not because it would affect the correctness of +            // last_ingested_block_number since we stream the contracts upstream. +            .address(contract_address.address.parse::
().unwrap()) + .topic0(topics.to_vec()) + .from_block(last_ingested_block_number) + .to_block(std::cmp::min( + last_ingested_block_number + blocks_per_batch, + current_block_number, + )), + } + } } -pub fn build_filters( - contract_addresses: &Vec, - contracts: &Vec, - current_block_number: u64, - blocks_per_batch: u64, -) -> Vec { - let topics_by_contract_name = Contracts::group_event_topics_by_names(contracts); - - contract_addresses - .iter() - .map(|contract_address| { - build_filter( - contract_address, - topics_by_contract_name +struct Filters; + +impl Filters { + fn build( + contract_addresses: &Vec, + contracts: &Vec, + current_block_number: u64, + blocks_per_batch: u64, + ) -> Vec { + let topics_by_contract_name = Contracts::group_event_topics_by_names(contracts); + + contract_addresses + .iter() + .map(|contract_address| { + let topics_by_contract_name = topics_by_contract_name .get(contract_address.contract_name.as_str()) - .unwrap(), - current_block_number, - blocks_per_batch, - ) - }) - .collect() -} + .unwrap(); -fn build_filter( - contract_address: &ContractAddress, - topics: &Vec, - current_block_number: u64, - blocks_per_batch: u64, -) -> Filter { - let last_ingested_block_number = contract_address.last_ingested_block_number as u64; - - Filter::new() - // We could use multiple adddresses here but - // we'll rather not because it would affect the correctness of - // last_ingested_block_number since we stream the contracts upstream. - .address(contract_address.address.parse::
().unwrap()) - .topic0(topics.to_vec()) - .from_block(last_ingested_block_number) - .to_block(std::cmp::min( - last_ingested_block_number + blocks_per_batch, - current_block_number, - )) + Filter::build( + contract_address, + topics_by_contract_name, + current_block_number, + blocks_per_batch, + ) + }) + .collect() + } + + fn group_by_contract_address_id(filters: &Vec) -> HashMap> { + let empty_filter_group = vec![]; + + filters.iter().fold( + HashMap::new(), + |mut filters_by_contract_address_id, filter| { + let mut filter_group = filters_by_contract_address_id + .get(&filter.contract_address_id) + .unwrap_or(&empty_filter_group) + .to_vec(); + + filter_group.push(filter.clone()); + + filters_by_contract_address_id.insert(filter.contract_address_id, filter_group); + + filters_by_contract_address_id + }, + ) + } + + fn get_latest(filters: &Vec) -> Option { + let mut filters = filters.clone(); + filters.sort_by_key(|f| f.value.get_to_block()); + + filters.last().cloned() + } } diff --git a/chaindexing/src/repos/postgres_repo.rs b/chaindexing/src/repos/postgres_repo.rs index cb92b10..5cfa717 100644 --- a/chaindexing/src/repos/postgres_repo.rs +++ b/chaindexing/src/repos/postgres_repo.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use crate::{ - contracts::{ContractAddress, ContractAddressID, ContractAddresses, UnsavedContractAddress}, + contracts::{ContractAddress, ContractAddressID, UnsavedContractAddress}, events::Event, }; use diesel_async::RunQueryDsl; @@ -154,16 +154,14 @@ impl Repo for PostgresRepo { } async fn update_last_ingested_block_number<'a>( - conn: &mut Conn<'a>, - contract_addresses: &Vec, + conn: &mut Self::Conn<'a>, + contract_address: &ContractAddress, block_number: i64, ) { use crate::diesel::schema::chaindexing_contract_addresses::dsl::*; - let ids = ContractAddresses::get_ids(contract_addresses); - diesel::update(chaindexing_contract_addresses) - .filter(id.eq_any(ids)) + .filter(id.eq(contract_address.id)) 
.set(last_ingested_block_number.eq(block_number)) .execute(conn) .await diff --git a/chaindexing/src/repos/repo.rs b/chaindexing/src/repos/repo.rs index 7d277f5..7b79271 100644 --- a/chaindexing/src/repos/repo.rs +++ b/chaindexing/src/repos/repo.rs @@ -48,7 +48,7 @@ pub trait Repo: Sync + Send + Migratable + Clone { async fn update_last_ingested_block_number<'a>( conn: &mut Self::Conn<'a>, - contract_addresses: &Vec, + contract_address: &ContractAddress, block_number: i64, ); async fn update_last_handled_block_number<'a>(