Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce task struct boilerplate code in events processing #42

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions chaindexing/src/event_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use tokio::{sync::Mutex, time::interval};
use crate::{contracts::Contracts, events::Event, ChaindexingRepo, Config, Repo};
use crate::{ChaindexingRepoRawQueryTxnClient, HasRawQueryClient};

use handle_events::HandleEvents;

#[derive(Clone)]
pub struct EventHandlerContext<'a> {
pub event: Event,
Expand Down Expand Up @@ -54,7 +52,7 @@ impl EventHandlers {
loop {
interval.tick().await;

HandleEvents::run(
handle_events::run(
conn.clone(),
&event_handlers_by_event_abi,
&mut raw_query_client,
Expand Down
112 changes: 54 additions & 58 deletions chaindexing/src/event_handlers/handle_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,73 +11,69 @@ use crate::{

use super::{EventHandler, EventHandlerContext};

pub struct HandleEvents;
pub async fn run<'a>(
conn: Arc<Mutex<ChaindexingRepoConn<'a>>>,
event_handlers_by_event_abi: &HashMap<&str, Arc<dyn EventHandler>>,
raw_query_client: &mut ChaindexingRepoRawQueryClient,
) {
let mut contract_addresses_stream =
ChaindexingRepo::get_contract_addresses_stream(conn.clone());

impl HandleEvents {
pub async fn run<'a>(
conn: Arc<Mutex<ChaindexingRepoConn<'a>>>,
event_handlers_by_event_abi: &HashMap<&str, Arc<dyn EventHandler>>,
raw_query_client: &mut ChaindexingRepoRawQueryClient,
) {
let mut contract_addresses_stream =
ChaindexingRepo::get_contract_addresses_stream(conn.clone());

while let Some(contract_addresses) = contract_addresses_stream.next().await {
for contract_address in contract_addresses {
Self::handle_events_for_contract_address(
conn.clone(),
&contract_address,
event_handlers_by_event_abi,
raw_query_client,
)
.await
}
while let Some(contract_addresses) = contract_addresses_stream.next().await {
for contract_address in contract_addresses {
handle_events_for_contract_address(
conn.clone(),
&contract_address,
event_handlers_by_event_abi,
raw_query_client,
)
.await
}
}
}

async fn handle_events_for_contract_address<'a>(
conn: Arc<Mutex<ChaindexingRepoConn<'a>>>,
contract_address: &ContractAddress,
event_handlers_by_event_abi: &HashMap<&str, Arc<dyn EventHandler>>,
raw_query_client: &mut ChaindexingRepoRawQueryClient,
) {
let mut events_stream = ChaindexingRepo::get_events_stream(
conn.clone(),
contract_address.next_block_number_to_handle_from,
);

while let Some(events) = events_stream.next().await {
// TODO: Move this filter to the stream query level
let mut events: Vec<Event> = events
.into_iter()
.filter(|event| {
event.match_contract_address(&contract_address.address) && event.not_removed()
})
.collect();
events.sort_by_key(|e| (e.block_number, e.log_index));
async fn handle_events_for_contract_address<'a>(
conn: Arc<Mutex<ChaindexingRepoConn<'a>>>,
contract_address: &ContractAddress,
event_handlers_by_event_abi: &HashMap<&str, Arc<dyn EventHandler>>,
raw_query_client: &mut ChaindexingRepoRawQueryClient,
) {
let mut events_stream = ChaindexingRepo::get_events_stream(
conn.clone(),
contract_address.next_block_number_to_handle_from,
);

let raw_query_txn_client =
ChaindexingRepo::get_raw_query_txn_client(raw_query_client).await;
while let Some(events) = events_stream.next().await {
// TODO: Move this filter to the stream query level
let mut events: Vec<Event> = events
.into_iter()
.filter(|event| {
event.match_contract_address(&contract_address.address) && event.not_removed()
})
.collect();
events.sort_by_key(|e| (e.block_number, e.log_index));

for event in events.clone() {
let event_handler = event_handlers_by_event_abi.get(event.abi.as_str()).unwrap();
let event_handler_context =
EventHandlerContext::new(event.clone(), &raw_query_txn_client);
let raw_query_txn_client =
ChaindexingRepo::get_raw_query_txn_client(raw_query_client).await;

event_handler.handle_event(event_handler_context).await;
}
for event in events.clone() {
let event_handler = event_handlers_by_event_abi.get(event.abi.as_str()).unwrap();
let event_handler_context =
EventHandlerContext::new(event.clone(), &raw_query_txn_client);

if let Some(Event { block_number, .. }) = events.last() {
let next_block_number_to_handle_from = block_number + 1;
ChaindexingRepo::update_next_block_number_to_handle_from_in_txn(
&raw_query_txn_client,
contract_address.id(),
next_block_number_to_handle_from,
)
.await;
}
event_handler.handle_event(event_handler_context).await;
}

ChaindexingRepo::commit_raw_query_txns(raw_query_txn_client).await;
if let Some(Event { block_number, .. }) = events.last() {
let next_block_number_to_handle_from = block_number + 1;
ChaindexingRepo::update_next_block_number_to_handle_from_in_txn(
&raw_query_txn_client,
contract_address.id(),
next_block_number_to_handle_from,
)
.await;
}

ChaindexingRepo::commit_raw_query_txns(raw_query_txn_client).await;
}
}
4 changes: 1 addition & 3 deletions chaindexing/src/events_ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ use std::cmp::min;
use tokio::sync::Mutex;
use tokio::time::{interval, sleep};

use ingest_events::IngestEvents;

use crate::chain_reorg::Execution;
use crate::contracts::Contract;
use crate::contracts::{ContractEventTopic, Contracts};
Expand Down Expand Up @@ -151,7 +149,7 @@ impl EventsIngester {

let mut conn = conn.lock().await;

IngestEvents::run(
ingest_events::run(
&mut conn,
raw_query_client,
contract_addresses.clone(),
Expand Down
211 changes: 100 additions & 111 deletions chaindexing/src/events_ingester/ingest_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,126 +13,115 @@ use crate::{

use super::{fetch_blocks_by_number, fetch_logs, EventsIngesterError, Filter, Filters};

pub struct IngestEvents;

impl IngestEvents {
pub async fn run<'a>(
conn: &mut ChaindexingRepoConn<'a>,
raw_query_client: &ChaindexingRepoRawQueryClient,
contract_addresses: Vec<ContractAddress>,
contracts: &Vec<Contract>,
json_rpc: &Arc<impl EventsIngesterJsonRpc + 'static>,
current_block_number: u64,
blocks_per_batch: u64,
) -> Result<(), EventsIngesterError> {
let filters = Filters::new(
&contract_addresses,
&contracts,
current_block_number,
blocks_per_batch,
&Execution::Main,
);

let filters =
Self::remove_already_ingested_filters(&filters, &contract_addresses, raw_query_client)
.await;

if !filters.is_empty() {
let logs = fetch_logs(&filters, json_rpc).await;
let blocks_by_tx_hash = fetch_blocks_by_number(&logs, json_rpc).await;
let events = Events::new(&logs, &contracts, &blocks_by_tx_hash);

ChaindexingRepo::run_in_transaction(conn, move |conn| {
async move {
ChaindexingRepo::create_events(conn, &events.clone()).await;

Self::update_next_block_numbers_to_ingest_from(
conn,
&contract_addresses,
&filters,
)
.await;

Ok(())
}
.boxed()
})
.await?;
}

Ok(())
pub async fn run<'a>(
conn: &mut ChaindexingRepoConn<'a>,
raw_query_client: &ChaindexingRepoRawQueryClient,
contract_addresses: Vec<ContractAddress>,
contracts: &Vec<Contract>,
json_rpc: &Arc<impl EventsIngesterJsonRpc + 'static>,
current_block_number: u64,
blocks_per_batch: u64,
) -> Result<(), EventsIngesterError> {
let filters = Filters::new(
&contract_addresses,
&contracts,
current_block_number,
blocks_per_batch,
&Execution::Main,
);

let filters =
remove_already_ingested_filters(&filters, &contract_addresses, raw_query_client).await;

if !filters.is_empty() {
let logs = fetch_logs(&filters, json_rpc).await;
let blocks_by_tx_hash = fetch_blocks_by_number(&logs, json_rpc).await;
let events = Events::new(&logs, &contracts, &blocks_by_tx_hash);

ChaindexingRepo::run_in_transaction(conn, move |conn| {
async move {
ChaindexingRepo::create_events(conn, &events.clone()).await;

update_next_block_numbers_to_ingest_from(conn, &contract_addresses, &filters).await;

Ok(())
}
.boxed()
})
.await?;
}

async fn remove_already_ingested_filters(
filters: &Vec<Filter>,
contract_addresses: &Vec<ContractAddress>,
raw_query_client: &ChaindexingRepoRawQueryClient,
) -> Vec<Filter> {
let current_block_filters: Vec<_> = filters
.iter()
.filter(|f| f.value.get_from_block() == f.value.get_to_block())
.collect();

if current_block_filters.is_empty() {
filters.clone()
} else {
let addresses = contract_addresses.iter().map(|c| c.address.clone()).collect();

let latest_ingested_events =
ChaindexingRepo::load_latest_events(raw_query_client, &addresses).await;
let latest_ingested_events = latest_ingested_events.iter().fold(
HashMap::new(),
|mut events_by_address, event| {
Ok(())
}

async fn remove_already_ingested_filters(
filters: &Vec<Filter>,
contract_addresses: &Vec<ContractAddress>,
raw_query_client: &ChaindexingRepoRawQueryClient,
) -> Vec<Filter> {
let current_block_filters: Vec<_> = filters
.iter()
.filter(|f| f.value.get_from_block() == f.value.get_to_block())
.collect();

if current_block_filters.is_empty() {
filters.clone()
} else {
let addresses = contract_addresses.iter().map(|c| c.address.clone()).collect();

let latest_ingested_events =
ChaindexingRepo::load_latest_events(raw_query_client, &addresses).await;
let latest_ingested_events =
latest_ingested_events
.iter()
.fold(HashMap::new(), |mut events_by_address, event| {
events_by_address.insert(&event.contract_address, event);

events_by_address
},
);

let already_ingested_filters = current_block_filters
.iter()
.filter(|filter| match latest_ingested_events.get(&filter.address) {
Some(latest_event) => {
latest_event.block_number as u64
== filter.value.get_to_block().unwrap().as_u64()
}
None => false,
})
.fold(HashMap::new(), |mut stale_current_block_filters, filter| {
stale_current_block_filters.insert(filter.contract_address_id, filter);

stale_current_block_filters
});

filters
.iter()
.filter(|f| !already_ingested_filters.contains_key(&f.contract_address_id))
.cloned()
.collect::<Vec<_>>()
}
let already_ingested_filters = current_block_filters
.iter()
.filter(|filter| match latest_ingested_events.get(&filter.address) {
Some(latest_event) => {
latest_event.block_number as u64
== filter.value.get_to_block().unwrap().as_u64()
}
None => false,
})
.fold(HashMap::new(), |mut stale_current_block_filters, filter| {
stale_current_block_filters.insert(filter.contract_address_id, filter);

stale_current_block_filters
});

filters
.iter()
.filter(|f| !already_ingested_filters.contains_key(&f.contract_address_id))
.cloned()
.collect::<Vec<_>>()
}
}

async fn update_next_block_numbers_to_ingest_from<'a>(
conn: &mut ChaindexingRepoConn<'a>,
contract_addresses: &Vec<ContractAddress>,
filters: &Vec<Filter>,
) {
let filters_by_contract_address_id = Filters::group_by_contract_address_id(filters);

for contract_address in contract_addresses {
let filters = filters_by_contract_address_id.get(&contract_address.id).unwrap();

if let Some(latest_filter) = Filters::get_latest(filters) {
let next_block_number_to_ingest_from =
latest_filter.value.get_to_block().unwrap() + 1;

ChaindexingRepo::update_next_block_number_to_ingest_from(
conn,
&contract_address,
next_block_number_to_ingest_from.as_u64() as i64,
)
.await
}
async fn update_next_block_numbers_to_ingest_from<'a>(
conn: &mut ChaindexingRepoConn<'a>,
contract_addresses: &Vec<ContractAddress>,
filters: &Vec<Filter>,
) {
let filters_by_contract_address_id = Filters::group_by_contract_address_id(filters);

for contract_address in contract_addresses {
let filters = filters_by_contract_address_id.get(&contract_address.id).unwrap();

if let Some(latest_filter) = Filters::get_latest(filters) {
let next_block_number_to_ingest_from = latest_filter.value.get_to_block().unwrap() + 1;

ChaindexingRepo::update_next_block_number_to_ingest_from(
conn,
&contract_address,
next_block_number_to_ingest_from.as_u64() as i64,
)
.await
}
}
}