Merge pull request #28 from Jurshsmith/resiliently-interact-with-json-rpc

Resiliently interact with JSON RPC
Jurshsmith authored Oct 17, 2023
2 parents ea39400 + 5df2615 commit d69bdef
Showing 2 changed files with 47 additions and 24 deletions.
6 changes: 3 additions & 3 deletions chaindexing/src/config.rs
@@ -19,9 +19,9 @@ impl Config {
             chains,
             contracts: vec![],
             min_confirmation_count: MinConfirmationCount::new(40),
-            blocks_per_batch: 20,
-            handler_interval_ms: 10000,
-            ingestion_interval_ms: 10000,
+            blocks_per_batch: 10000,
+            handler_interval_ms: 4000,
+            ingestion_interval_ms: 4000,
             reset_count: 0,
         }
     }
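
The new defaults raise blocks_per_batch from 20 to 10,000 and shorten both polling intervals from 10 seconds to 4 seconds. As a minimal sketch (not from this commit), overriding these defaults might look like the following, assuming the Config fields shown above are publicly settable and a constructed config value is already in scope; this diff alone does not confirm either assumption.

// Hypothetical override of the new defaults using struct-update syntax.
// Assumes `config: Config` is already built and these fields are public.
let config = Config {
    blocks_per_batch: 2_000,       // smaller batches for providers with strict log-range limits
    handler_interval_ms: 8_000,    // run handlers less often
    ingestion_interval_ms: 8_000,  // poll the JSON-RPC node less often
    ..config
};
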
65 changes: 44 additions & 21 deletions chaindexing/src/events_ingester.rs
@@ -11,7 +11,7 @@ use futures_util::FutureExt;
 use futures_util::StreamExt;
 use std::cmp::min;
 use tokio::sync::Mutex;
-use tokio::time::interval;
+use tokio::time::{interval, sleep};
 
 use crate::chain_reorg::{Execution, UnsavedReorgedBlock};
 use crate::contracts::Contract;
@@ -41,20 +41,10 @@ impl EventsIngesterJsonRpc for Provider<Http> {
 
 #[derive(Debug)]
 pub enum EventsIngesterError {
-    HTTPError(String),
     RepoConnectionError,
     GenericError(String),
 }
 
-impl From<ProviderError> for EventsIngesterError {
-    fn from(value: ProviderError) -> Self {
-        match value {
-            ProviderError::HTTPError(error) => EventsIngesterError::HTTPError(error.to_string()),
-            other_error => EventsIngesterError::GenericError(other_error.to_string()),
-        }
-    }
-}
-
 impl From<RepoError> for EventsIngesterError {
     fn from(value: RepoError) -> Self {
         match value {
@@ -113,8 +103,7 @@ impl EventsIngester {
         min_confirmation_count: &MinConfirmationCount,
         run_confirmation_execution: bool,
     ) -> Result<(), EventsIngesterError> {
-        let current_block_number = (json_rpc.get_block_number().await?).as_u64();
-
+        let current_block_number = fetch_current_block_number(&json_rpc).await;
         let mut contract_addresses_stream =
             ChaindexingRepo::get_contract_addresses_stream(conn.clone());
 
@@ -186,7 +175,7 @@ impl MainExecution {
         );
 
         if !filters.is_empty() {
-            let logs = fetch_logs(&filters, json_rpc).await.unwrap();
+            let logs = fetch_logs(&filters, json_rpc).await;
             let events = Events::new(&logs, &contracts);
 
             ChaindexingRepo::run_in_transaction(conn, move |conn| {
@@ -290,7 +279,7 @@ impl ConfirmationExecution {
         json_rpc: &Arc<impl EventsIngesterJsonRpc + 'static>,
         contracts: &Vec<Contract>,
     ) -> Vec<Event> {
-        let logs = fetch_logs(&filters, json_rpc).await.unwrap();
+        let logs = fetch_logs(&filters, json_rpc).await;
 
         Events::new(&logs, contracts)
     }
@@ -371,13 +360,47 @@ impl ConfirmationExecution {
     }
 }
 
-async fn fetch_logs(
-    filters: &Vec<Filter>,
-    json_rpc: &Arc<impl EventsIngesterJsonRpc>,
-) -> Result<Vec<Log>, EventsIngesterError> {
-    let logs_per_filter = try_join_all(filters.iter().map(|f| json_rpc.get_logs(&f.value))).await?;
+async fn fetch_current_block_number<'a>(json_rpc: &'a Arc<impl EventsIngesterJsonRpc>) -> u64 {
+    let mut maybe_current_block_number = None;
+
+    while maybe_current_block_number.is_none() {
+        match json_rpc.get_block_number().await {
+            Ok(current_block_number) => {
+                maybe_current_block_number = Some(current_block_number.as_u64())
+            }
+            Err(provider_error) => {
+                eprintln!("Provider Error: {}", provider_error);
+
+                backoff().await;
+            }
+        }
+    }
+
+    maybe_current_block_number.unwrap()
+}
+async fn fetch_logs(filters: &Vec<Filter>, json_rpc: &Arc<impl EventsIngesterJsonRpc>) -> Vec<Log> {
+    let mut maybe_logs = None;
+
+    while maybe_logs.is_none() {
+        match try_join_all(filters.iter().map(|f| json_rpc.get_logs(&f.value))).await {
+            Ok(logs_per_filter) => {
+                let logs = logs_per_filter.into_iter().flatten().collect();
+
+                maybe_logs = Some(logs)
+            }
+            Err(provider_error) => {
+                eprintln!("Provider Error: {}", provider_error);
+
+                backoff().await;
+            }
+        }
+    }
+
+    maybe_logs.unwrap()
+}
 
-    Ok(logs_per_filter.into_iter().flatten().collect())
+async fn backoff() {
+    sleep(Duration::from_secs(2u64.pow(2))).await;
 }
 
 struct Filters;
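
Both new helpers follow the same shape: loop until the JSON-RPC call succeeds, print the provider error on failure, and back off for a fixed four seconds (2u64.pow(2)) before retrying, so transient provider errors no longer bubble up through unwrap(). Below is a standalone sketch of that retry-and-backoff pattern; it is not part of this commit, and the retry_until_ok name and generic signature are illustrative only.

use std::future::Future;
use std::time::Duration;

use tokio::time::sleep;

// Illustrative only: retry an async operation until it succeeds, mirroring the
// loops in fetch_current_block_number and fetch_logs above.
async fn retry_until_ok<T, E, Fut>(mut operation: impl FnMut() -> Fut) -> T
where
    Fut: Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    loop {
        match operation().await {
            Ok(value) => return value,
            Err(error) => {
                eprintln!("Provider Error: {}", error);
                sleep(Duration::from_secs(4)).await; // same fixed 4s backoff as backoff() above
            }
        }
    }
}

// e.g. let block_number = retry_until_ok(|| json_rpc.get_block_number()).await;

The commit keeps the loop inlined in each helper instead of factoring it out like this, which keeps the error message and the backoff call explicit at each call site.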
