Skip to content

Commit

Permalink
Update 'streamer' semantics to 'stream'
Browse files Browse the repository at this point in the history
  • Loading branch information
Jurshsmith committed Sep 17, 2023
1 parent ee8ae39 commit 9e30701
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 12 deletions.
10 changes: 5 additions & 5 deletions chaindexing/src/event_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ impl EventHandlers {
conn: Arc<Mutex<ChaindexingRepoConn<'a>>>,
event_handlers_by_event_abi: &HashMap<&str, Arc<dyn EventHandler>>,
) {
let mut contract_addresses_streamer =
ChaindexingRepo::get_contract_addresses_streamer(conn.clone()).await;
let mut contract_addresses_stream =
ChaindexingRepo::get_contract_addresses_stream(conn.clone()).await;

while let Some(contract_addresses) = contract_addresses_streamer.next().await {
while let Some(contract_addresses) = contract_addresses_stream.next().await {
stream::iter(contract_addresses)
.for_each(|contract_address| {
let conn = conn.clone();
Expand All @@ -66,13 +66,13 @@ impl EventHandlers {
contract_address: &ContractAddress,
event_handlers_by_event_abi: &HashMap<&str, Arc<dyn EventHandler>>,
) {
let mut events_streamer = ChaindexingRepo::get_events_streamer(
let mut events_stream = ChaindexingRepo::get_events_stream(
conn.clone(),
contract_address.last_handled_block_number,
)
.await;

while let Some(mut events) = events_streamer.next().await {
while let Some(mut events) = events_stream.next().await {
events.sort_by_key(|e| (e.block_number, e.log_index));

join_all(events.iter().map(|event| {
Expand Down
6 changes: 3 additions & 3 deletions chaindexing/src/events_ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ impl EventsIngester {
) -> Result<(), EventsIngesterError> {
let current_block_number = json_rpc.get_block_number().await?;

let mut contract_addresses_streamer =
ChaindexingRepo::get_contract_addresses_streamer(conn.clone()).await;
let mut contract_addresses_stream =
ChaindexingRepo::get_contract_addresses_stream(conn.clone()).await;

while let Some(contract_addresses) = contract_addresses_streamer.next().await {
while let Some(contract_addresses) = contract_addresses_stream.next().await {
let mut conn = conn.lock().await;
let filters = build_filters(
&contract_addresses,
Expand Down
4 changes: 2 additions & 2 deletions chaindexing/src/repos/postgres_repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl Repo for PostgresRepo {
chaindexing_contract_addresses.load(conn).await.unwrap()
}

async fn get_contract_addresses_streamer<'a>(
async fn get_contract_addresses_stream<'a>(
conn: Arc<Mutex<Conn<'a>>>,
) -> Box<dyn Stream<Item = Vec<ContractAddress>> + Send + Unpin + 'a> {
use crate::diesel::schema::chaindexing_contract_addresses::dsl::*;
Expand All @@ -117,7 +117,7 @@ impl Repo for PostgresRepo {
)
}

async fn get_events_streamer<'a>(
async fn get_events_stream<'a>(
conn: Arc<Mutex<Conn<'a>>>,
from: i64,
) -> Box<dyn Stream<Item = Vec<Event>> + Send + Unpin + 'a> {
Expand Down
4 changes: 2 additions & 2 deletions chaindexing/src/repos/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ pub trait Repo: Sync + Send + Migratable + Clone {
contract_addresses: &Vec<UnsavedContractAddress>,
);
async fn get_all_contract_addresses<'a>(conn: &mut Self::Conn<'a>) -> Vec<ContractAddress>;
async fn get_contract_addresses_streamer<'a>(
async fn get_contract_addresses_stream<'a>(
conn: Arc<Mutex<Self::Conn<'a>>>,
) -> Box<dyn Stream<Item = Vec<ContractAddress>> + Send + Unpin + 'a>;

async fn create_events<'a>(conn: &mut Self::Conn<'a>, events: &Vec<Event>);
async fn get_all_events<'a>(conn: &mut Self::Conn<'a>) -> Vec<Event>;
async fn get_events_streamer<'a>(
async fn get_events_stream<'a>(
conn: Arc<Mutex<Self::Conn<'a>>>,
from: i64,
) -> Box<dyn Stream<Item = Vec<Event>> + Send + Unpin + 'a>;
Expand Down

0 comments on commit 9e30701

Please sign in to comment.