From 9e30701c2bca390712b8d2cd5f0d7e5249cc7591 Mon Sep 17 00:00:00 2001 From: Joshua Oladele Date: Sun, 17 Sep 2023 17:27:50 +0100 Subject: [PATCH] Update 'streamer' semantics to 'stream' --- chaindexing/src/event_handlers.rs | 10 +++++----- chaindexing/src/events_ingester.rs | 6 +++--- chaindexing/src/repos/postgres_repo.rs | 4 ++-- chaindexing/src/repos/repo.rs | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/chaindexing/src/event_handlers.rs b/chaindexing/src/event_handlers.rs index e37dfd4..4164a78 100644 --- a/chaindexing/src/event_handlers.rs +++ b/chaindexing/src/event_handlers.rs @@ -40,10 +40,10 @@ impl EventHandlers { conn: Arc>>, event_handlers_by_event_abi: &HashMap<&str, Arc>, ) { - let mut contract_addresses_streamer = - ChaindexingRepo::get_contract_addresses_streamer(conn.clone()).await; + let mut contract_addresses_stream = + ChaindexingRepo::get_contract_addresses_stream(conn.clone()).await; - while let Some(contract_addresses) = contract_addresses_streamer.next().await { + while let Some(contract_addresses) = contract_addresses_stream.next().await { stream::iter(contract_addresses) .for_each(|contract_address| { let conn = conn.clone(); @@ -66,13 +66,13 @@ impl EventHandlers { contract_address: &ContractAddress, event_handlers_by_event_abi: &HashMap<&str, Arc>, ) { - let mut events_streamer = ChaindexingRepo::get_events_streamer( + let mut events_stream = ChaindexingRepo::get_events_stream( conn.clone(), contract_address.last_handled_block_number, ) .await; - while let Some(mut events) = events_streamer.next().await { + while let Some(mut events) = events_stream.next().await { events.sort_by_key(|e| (e.block_number, e.log_index)); join_all(events.iter().map(|event| { diff --git a/chaindexing/src/events_ingester.rs b/chaindexing/src/events_ingester.rs index c15b683..7aef979 100644 --- a/chaindexing/src/events_ingester.rs +++ b/chaindexing/src/events_ingester.rs @@ -96,10 +96,10 @@ impl EventsIngester { ) -> Result<(), EventsIngesterError> { let current_block_number = json_rpc.get_block_number().await?; - let mut contract_addresses_streamer = - ChaindexingRepo::get_contract_addresses_streamer(conn.clone()).await; + let mut contract_addresses_stream = + ChaindexingRepo::get_contract_addresses_stream(conn.clone()).await; - while let Some(contract_addresses) = contract_addresses_streamer.next().await { + while let Some(contract_addresses) = contract_addresses_stream.next().await { let mut conn = conn.lock().await; let filters = build_filters( &contract_addresses, diff --git a/chaindexing/src/repos/postgres_repo.rs b/chaindexing/src/repos/postgres_repo.rs index 98f6a26..cb92b10 100644 --- a/chaindexing/src/repos/postgres_repo.rs +++ b/chaindexing/src/repos/postgres_repo.rs @@ -102,7 +102,7 @@ impl Repo for PostgresRepo { chaindexing_contract_addresses.load(conn).await.unwrap() } - async fn get_contract_addresses_streamer<'a>( + async fn get_contract_addresses_stream<'a>( conn: Arc>>, ) -> Box> + Send + Unpin + 'a> { use crate::diesel::schema::chaindexing_contract_addresses::dsl::*; @@ -117,7 +117,7 @@ impl Repo for PostgresRepo { ) } - async fn get_events_streamer<'a>( + async fn get_events_stream<'a>( conn: Arc>>, from: i64, ) -> Box> + Send + Unpin + 'a> { diff --git a/chaindexing/src/repos/repo.rs b/chaindexing/src/repos/repo.rs index 65d2d15..7d277f5 100644 --- a/chaindexing/src/repos/repo.rs +++ b/chaindexing/src/repos/repo.rs @@ -35,13 +35,13 @@ pub trait Repo: Sync + Send + Migratable + Clone { contract_addresses: &Vec, ); async fn get_all_contract_addresses<'a>(conn: &mut Self::Conn<'a>) -> Vec; - async fn get_contract_addresses_streamer<'a>( + async fn get_contract_addresses_stream<'a>( conn: Arc>>, ) -> Box> + Send + Unpin + 'a>; async fn create_events<'a>(conn: &mut Self::Conn<'a>, events: &Vec); async fn get_all_events<'a>(conn: &mut Self::Conn<'a>) -> Vec; - async fn get_events_streamer<'a>( + async fn get_events_stream<'a>( conn: Arc>>, from: i64, ) -> Box> + Send + Unpin + 'a>;