Merge pull request #9 from Jurshsmith/checkpoint-last-ingested-block-at-latest-block-in-batch

Checkpoint last ingested block at latest block in batch
Jurshsmith authored Sep 20, 2023
2 parents ed36540 + ec06d96 commit 24f9427
Showing 6 changed files with 168 additions and 83 deletions.
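
In short: instead of checkpointing every streamed contract address at the chain's current block number, the ingester now checkpoints each contract address at the `to_block` of the latest filter actually ingested for it in a batch. A minimal sketch of that rule, using a simplified stand-in for the crate's `Filter` wrapper introduced in this commit (the real one carries an `ethers::types::Filter` and reads `get_to_block()`):

use std::collections::HashMap;

// Simplified stand-in for the crate's `Filter` wrapper; the real one
// holds an `ethers::types::Filter` as `value`.
#[derive(Clone, Debug)]
struct Filter {
    contract_address_id: i32,
    to_block: u64, // stands in for `value.get_to_block()`
}

// The checkpoint for each contract address is the latest `to_block` among
// the filters ingested for it in this batch -- not the chain's current
// block number, which the batch may not have reached.
fn checkpoints(filters: &[Filter]) -> HashMap<i32, u64> {
    let mut latest_by_address: HashMap<i32, u64> = HashMap::new();
    for filter in filters {
        latest_by_address
            .entry(filter.contract_address_id)
            .and_modify(|latest| *latest = (*latest).max(filter.to_block))
            .or_insert(filter.to_block);
    }
    latest_by_address
}

fn main() {
    let batch = vec![
        Filter { contract_address_id: 1, to_block: 17_774_480 },
        Filter { contract_address_id: 1, to_block: 17_774_490 },
    ];
    // Checkpointed at the latest block covered by the batch.
    assert_eq!(checkpoints(&batch)[&1], 17_774_490);
}

This matters whenever a batch stops short of the chain tip: the next run resumes from the last block actually ingested rather than skipping everything between the batch's end and the tip.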
2 changes: 1 addition & 1 deletion chaindexing-tests/src/factory/json_rpcs.rs

@@ -24,7 +24,7 @@ macro_rules! json_rpc_with_logs {
     ($contract_address:expr) => {{
         use crate::json_rpc_with_logs;
 
-        json_rpc_with_logs!($contract_address, 3)
+        json_rpc_with_logs!($contract_address, 17774490)
     }};
     ($contract_address:expr, $current_block_number:expr) => {{
         use chaindexing::EventsIngesterJsonRpc;
17 changes: 12 additions & 5 deletions chaindexing-tests/src/tests/events_ingester.rs

@@ -18,7 +18,11 @@ mod tests {
 
         test_runner::run_test(&pool, |mut conn| async move {
            let contracts = vec![bayc_contract()];
-            let json_rpc = Arc::new(json_rpc_with_logs!(BAYC_CONTRACT_ADDRESS, 3));
+            static CURRENT_BLOCK_NUMBER: u32 = BAYC_CONTRACT_START_BLOCK_NUMBER + 20;
+            let json_rpc = Arc::new(json_rpc_with_logs!(
+                BAYC_CONTRACT_ADDRESS,
+                CURRENT_BLOCK_NUMBER
+            ));
 
            assert!(PostgresRepo::get_all_events(&mut conn).await.is_empty());
            Chaindexing::create_initial_contract_addresses(&mut conn, &contracts).await;
@@ -72,7 +76,7 @@ mod tests {
     }
 
     #[tokio::test]
-    pub async fn checkpoints_last_ingested_block_to_current_block() {
+    pub async fn checkpoints_last_ingested_block_to_the_ingested_block_in_a_given_batch() {
        let pool = test_runner::get_pool().await;
 
        test_runner::run_test(&pool, |mut conn| async move {
@@ -86,16 +90,19 @@
            Chaindexing::create_initial_contract_addresses(&mut conn, &contracts).await;
 
            let conn = Arc::new(Mutex::new(conn));
-            EventsIngester::ingest(conn.clone(), &contracts, 10, json_rpc)
+            let blocks_per_batch = 10;
+            EventsIngester::ingest(conn.clone(), &contracts, blocks_per_batch, json_rpc)
                .await
                .unwrap();
 
            let mut conn = conn.lock().await;
            let contract_addresses = PostgresRepo::get_all_contract_addresses(&mut conn).await;
            let bayc_contract_address = contract_addresses.first().unwrap();
+            let last_ingested_block_number =
+                bayc_contract_address.last_ingested_block_number as u64;
            assert_eq!(
-                bayc_contract_address.last_ingested_block_number as u32,
-                CURRENT_BLOCK_NUMBER
+                last_ingested_block_number,
+                BAYC_CONTRACT_START_BLOCK_NUMBER as u64 + blocks_per_batch
            );
        })
        .await;
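
The expected value in the assertion above follows from how the new `Filter::build` clamps a batch (see the events_ingester.rs diff below): ingestion covers at most `blocks_per_batch` blocks past the checkpoint, never past the chain tip, and the checkpoint lands on that batch's `to_block`. A quick sketch of the arithmetic; the start block value is hypothetical, since the real `BAYC_CONTRACT_START_BLOCK_NUMBER` constant lives in the test factory and isn't shown in this diff:

// Mirrors the to_block clamp in Filter::build below.
fn batch_to_block(last_ingested: u64, blocks_per_batch: u64, current_block: u64) -> u64 {
    std::cmp::min(last_ingested + blocks_per_batch, current_block)
}

fn main() {
    let start_block = 17_774_470; // hypothetical stand-in for BAYC_CONTRACT_START_BLOCK_NUMBER
    let current_block = start_block + 20; // matches CURRENT_BLOCK_NUMBER in the test above
    // One 10-block batch checkpoints at start + 10, not at the chain tip.
    assert_eq!(batch_to_block(start_block, 10, current_block), start_block + 10);
}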
11 changes: 1 addition & 10 deletions chaindexing/src/contracts.rs

@@ -123,7 +123,6 @@ impl Contracts {
         contracts
             .iter()
             .fold(HashMap::new(), |mut topics_by_contract_name, contract| {
-                // embrace mutability because Rust helps avoid bugs
                 topics_by_contract_name.insert(contract.name.clone(), contract.get_event_topics());
 
                 topics_by_contract_name
@@ -178,7 +177,7 @@ impl ContractAddressID {
 #[diesel(table_name = chaindexing_contract_addresses)]
 #[diesel(primary_key(id))]
 pub struct ContractAddress {
-    id: i32,
+    pub id: i32,
     chain_id: i32,
     pub last_ingested_block_number: i64,
     pub last_handled_block_number: i64,
@@ -199,11 +198,3 @@ impl ContractAddress {
             .to_string()
     }
 }
-
-pub struct ContractAddresses;
-
-impl ContractAddresses {
-    pub fn get_ids(contract_addresses: &Vec<ContractAddress>) -> Vec<i32> {
-        contract_addresses.iter().map(|c| c.id).collect()
-    }
-}
209 changes: 149 additions & 60 deletions chaindexing/src/events_ingester.rs

@@ -1,11 +1,12 @@
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 
 use ethers::prelude::Middleware;
 use ethers::prelude::*;
 use ethers::providers::{Http, Provider, ProviderError};
-use ethers::types::{Address, Filter, Log};
-use futures_util::future::try_join_all;
+use ethers::types::{Address, Filter as EthersFilter, Log};
+use futures_util::future::{join_all, try_join_all};
 use futures_util::FutureExt;
 use futures_util::StreamExt;
 use tokio::sync::Mutex;
@@ -19,7 +20,7 @@ use crate::{ChaindexingRepo, ChaindexingRepoConn, Config, ContractAddress, Repo}
 #[async_trait::async_trait]
 pub trait EventsIngesterJsonRpc: Clone + Sync + Send {
     async fn get_block_number(&self) -> Result<U64, ProviderError>;
-    async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>, ProviderError>;
+    async fn get_logs(&self, filter: &EthersFilter) -> Result<Vec<Log>, ProviderError>;
 }
 
 #[async_trait::async_trait]
@@ -28,7 +29,7 @@ impl EventsIngesterJsonRpc for Provider<Http> {
         Middleware::get_block_number(&self).await
     }
 
-    async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>, ProviderError> {
+    async fn get_logs(&self, filter: &EthersFilter) -> Result<Vec<Log>, ProviderError> {
         Middleware::get_logs(&self, filter).await
     }
 }
@@ -94,32 +95,33 @@ impl EventsIngester {
         blocks_per_batch: u64,
         json_rpc: Arc<impl EventsIngesterJsonRpc + 'static>,
     ) -> Result<(), EventsIngesterError> {
-        let current_block_number = json_rpc.get_block_number().await?;
+        let current_block_number = (json_rpc.get_block_number().await?).as_u64();
 
         let mut contract_addresses_stream =
             ChaindexingRepo::get_contract_addresses_stream(conn.clone()).await;
 
         while let Some(contract_addresses) = contract_addresses_stream.next().await {
+            let contract_addresses = Self::get_only_uningested_contract_addresses(
+                &contract_addresses,
+                current_block_number,
+            );
+
             let mut conn = conn.lock().await;
-            let filters = build_filters(
+            let filters = Filters::build(
                 &contract_addresses,
                 &contracts,
-                current_block_number.as_u64(),
+                current_block_number,
                 blocks_per_batch,
             );
-            let logs = fetch_logs(&filters, &json_rpc).await.unwrap();
+            let logs = Self::fetch_logs(&filters, &json_rpc).await.unwrap();
             let events = Events::new(&logs, &contracts);
 
             ChaindexingRepo::run_in_transaction(&mut conn, move |conn| {
                 async move {
                     ChaindexingRepo::create_events(conn, &events.clone()).await;
 
-                    ChaindexingRepo::update_last_ingested_block_number(
-                        conn,
-                        &contract_addresses.clone(),
-                        current_block_number.as_u64() as i64,
-                    )
-                    .await;
+                    Self::update_last_ingested_block_numbers(conn, &contract_addresses, &filters)
+                        .await;
 
                     Ok(())
                 }
@@ -130,57 +132,144 @@ impl EventsIngester {
 
         Ok(())
     }
+
+    async fn update_last_ingested_block_numbers<'a>(
+        conn: &mut ChaindexingRepoConn<'a>,
+        contract_addresses: &Vec<ContractAddress>,
+        filters: &Vec<Filter>,
+    ) {
+        let filters_by_contract_address_id = Filters::group_by_contract_address_id(filters);
+
+        let conn = Arc::new(Mutex::new(conn));
+        join_all(contract_addresses.iter().map(|contract_address| {
+            let filters = filters_by_contract_address_id
+                .get(&contract_address.id)
+                .unwrap();
+
+            let conn = conn.clone();
+            async move {
+                if let Some(latest_filter) = Filters::get_latest(filters) {
+                    let last_ingested_block_number = latest_filter.value.get_to_block().unwrap();
+
+                    let mut conn = conn.lock().await;
+                    ChaindexingRepo::update_last_ingested_block_number(
+                        &mut conn,
+                        &contract_address,
+                        last_ingested_block_number.as_u64() as i64,
+                    )
+                    .await
+                }
+            }
+        }))
+        .await;
+    }
+
+    fn get_only_uningested_contract_addresses(
+        contract_addresses: &Vec<ContractAddress>,
+        current_block_number: u64,
+    ) -> Vec<ContractAddress> {
+        contract_addresses
+            .to_vec()
+            .into_iter()
+            .filter(|ca| current_block_number > ca.last_ingested_block_number as u64)
+            .collect()
+    }
+
+    async fn fetch_logs(
+        filters: &Vec<Filter>,
+        json_rpc: &Arc<impl EventsIngesterJsonRpc>,
+    ) -> Result<Vec<Log>, EventsIngesterError> {
+        let logs_per_filter =
+            try_join_all(filters.iter().map(|f| json_rpc.get_logs(&f.value))).await?;
+
+        Ok(logs_per_filter.into_iter().flatten().collect())
+    }
 }
 
-async fn fetch_logs(
-    filters: &Vec<Filter>,
-    json_rpc: &Arc<impl EventsIngesterJsonRpc>,
-) -> Result<Vec<Log>, EventsIngesterError> {
-    let logs_per_filter = try_join_all(filters.iter().map(|f| json_rpc.get_logs(&f))).await?;
-
-    Ok(logs_per_filter.into_iter().flatten().collect())
-}
-
-pub fn build_filters(
-    contract_addresses: &Vec<ContractAddress>,
-    contracts: &Vec<Contract>,
-    current_block_number: u64,
-    blocks_per_batch: u64,
-) -> Vec<Filter> {
-    let topics_by_contract_name = Contracts::group_event_topics_by_names(contracts);
-
-    contract_addresses
-        .iter()
-        .map(|contract_address| {
-            build_filter(
-                contract_address,
-                topics_by_contract_name
-                    .get(contract_address.contract_name.as_str())
-                    .unwrap(),
-                current_block_number,
-                blocks_per_batch,
-            )
-        })
-        .collect()
-}
-
-fn build_filter(
-    contract_address: &ContractAddress,
-    topics: &Vec<ContractEventTopic>,
-    current_block_number: u64,
-    blocks_per_batch: u64,
-) -> Filter {
-    let last_ingested_block_number = contract_address.last_ingested_block_number as u64;
-
-    Filter::new()
-        // We could use multiple addresses here but
-        // we'd rather not because it would affect the correctness of
-        // last_ingested_block_number since we stream the contracts upstream.
-        .address(contract_address.address.parse::<Address>().unwrap())
-        .topic0(topics.to_vec())
-        .from_block(last_ingested_block_number)
-        .to_block(std::cmp::min(
-            last_ingested_block_number + blocks_per_batch,
-            current_block_number,
-        ))
-}
+#[derive(Clone, Debug)]
+struct Filter {
+    contract_address_id: i32,
+    value: EthersFilter,
+}
+
+impl Filter {
+    fn build(
+        contract_address: &ContractAddress,
+        topics: &Vec<ContractEventTopic>,
+        current_block_number: u64,
+        blocks_per_batch: u64,
+    ) -> Filter {
+        let last_ingested_block_number = contract_address.last_ingested_block_number as u64;
+
+        Filter {
+            contract_address_id: contract_address.id,
+            value: EthersFilter::new()
+                // We could use multiple addresses here but
+                // we'd rather not because it would affect the correctness of
+                // last_ingested_block_number since we stream the contracts upstream.
+                .address(contract_address.address.parse::<Address>().unwrap())
+                .topic0(topics.to_vec())
+                .from_block(last_ingested_block_number)
+                .to_block(std::cmp::min(
+                    last_ingested_block_number + blocks_per_batch,
+                    current_block_number,
+                )),
+        }
+    }
+}
+
+struct Filters;
+
+impl Filters {
+    fn build(
+        contract_addresses: &Vec<ContractAddress>,
+        contracts: &Vec<Contract>,
+        current_block_number: u64,
+        blocks_per_batch: u64,
+    ) -> Vec<Filter> {
+        let topics_by_contract_name = Contracts::group_event_topics_by_names(contracts);
+
+        contract_addresses
+            .iter()
+            .map(|contract_address| {
+                let topics_by_contract_name = topics_by_contract_name
+                    .get(contract_address.contract_name.as_str())
+                    .unwrap();
+
+                Filter::build(
+                    contract_address,
+                    topics_by_contract_name,
+                    current_block_number,
+                    blocks_per_batch,
+                )
+            })
+            .collect()
+    }
+
+    fn group_by_contract_address_id(filters: &Vec<Filter>) -> HashMap<i32, Vec<Filter>> {
+        let empty_filter_group = vec![];
+
+        filters.iter().fold(
+            HashMap::new(),
+            |mut filters_by_contract_address_id, filter| {
+                let mut filter_group = filters_by_contract_address_id
+                    .get(&filter.contract_address_id)
+                    .unwrap_or(&empty_filter_group)
+                    .to_vec();
+
+                filter_group.push(filter.clone());
+
+                filters_by_contract_address_id.insert(filter.contract_address_id, filter_group);
+
+                filters_by_contract_address_id
+            },
+        )
+    }
+
+    fn get_latest(filters: &Vec<Filter>) -> Option<Filter> {
+        let mut filters = filters.clone();
+        filters.sort_by_key(|f| f.value.get_to_block());
+
+        filters.last().cloned()
+    }
+}
10 changes: 4 additions & 6 deletions chaindexing/src/repos/postgres_repo.rs

@@ -1,7 +1,7 @@
 use std::sync::Arc;
 
 use crate::{
-    contracts::{ContractAddress, ContractAddressID, ContractAddresses, UnsavedContractAddress},
+    contracts::{ContractAddress, ContractAddressID, UnsavedContractAddress},
     events::Event,
 };
 use diesel_async::RunQueryDsl;
@@ -154,16 +154,14 @@ impl Repo for PostgresRepo {
     }
 
     async fn update_last_ingested_block_number<'a>(
-        conn: &mut Conn<'a>,
-        contract_addresses: &Vec<ContractAddress>,
+        conn: &mut Self::Conn<'a>,
+        contract_address: &ContractAddress,
         block_number: i64,
     ) {
         use crate::diesel::schema::chaindexing_contract_addresses::dsl::*;
 
-        let ids = ContractAddresses::get_ids(contract_addresses);
-
         diesel::update(chaindexing_contract_addresses)
-            .filter(id.eq_any(ids))
+            .filter(id.eq(contract_address.id))
             .set(last_ingested_block_number.eq(block_number))
             .execute(conn)
             .await
2 changes: 1 addition & 1 deletion chaindexing/src/repos/repo.rs

@@ -48,7 +48,7 @@ pub trait Repo: Sync + Send + Migratable + Clone {
 
     async fn update_last_ingested_block_number<'a>(
         conn: &mut Self::Conn<'a>,
-        contract_addresses: &Vec<ContractAddress>,
+        contract_address: &ContractAddress,
         block_number: i64,
     );
     async fn update_last_handled_block_number<'a>(
