Skip to content

Commit

Permalink
Merge pull request #13 from Jurshsmith/dispatch-event-handlers-dynami…
Browse files Browse the repository at this point in the history
…cally

Dispatch event handlers dynamically
  • Loading branch information
Jurshsmith authored Sep 24, 2023
2 parents 13e8941 + b62037f commit bfa1352
Show file tree
Hide file tree
Showing 12 changed files with 128 additions and 63 deletions.
10 changes: 7 additions & 3 deletions chaindexing-tests/src/factory/contracts.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use chaindexing::{Chain, Contract};

use super::TestEventHandler;
use super::{ApprovalForAllTestEventHandler, TestContractState, TransferTestEventHandler};

pub const TRANSFER_EVENT_ABI: &str =
"event Transfer(address indexed from, address indexed to, uint256 indexed tokenId)";

pub const APPROCAL_EVENT_ABI: &str =
"event ApprovalForAll(address indexed owner, address indexed operator, bool approved)";

pub const BAYC_CONTRACT_ADDRESS: &str = "0xBC4CA0EdA7647A8aB7C2061c2E118A18a936f13D";
pub const BAYC_CONTRACT_START_BLOCK_NUMBER: u32 = 17773490;
pub fn bayc_contract() -> Contract {
pub fn bayc_contract() -> Contract<TestContractState> {
Contract::new("BoredApeYachtClub")
.add_event(TRANSFER_EVENT_ABI, TestEventHandler)
.add_event(TRANSFER_EVENT_ABI, TransferTestEventHandler)
.add_event(APPROCAL_EVENT_ABI, ApprovalForAllTestEventHandler)
.add_address(BAYC_CONTRACT_ADDRESS, &Chain::Mainnet, 17773490)
}
39 changes: 35 additions & 4 deletions chaindexing-tests/src/factory/event_handlers.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,39 @@
use chaindexing::{Event, EventHandler};
use chaindexing::{ContractState, Event, EventHandler};

pub struct TestEventHandler;
#[derive(Clone, Debug)]
pub enum TestContractState {
NftState(NftState),
NftOperatorState(NftOperatorState),
}

impl ContractState for TestContractState {}

#[derive(Clone, Debug)]
pub struct NftState;

impl ContractState for NftState {}

pub struct TransferTestEventHandler;

#[async_trait::async_trait]
impl EventHandler for TransferTestEventHandler {
type State = TestContractState;
async fn handle_event(&self, _event: Event) -> Option<Vec<Self::State>> {
None
}
}

#[derive(Clone, Debug)]
pub struct NftOperatorState;

impl ContractState for NftOperatorState {}

pub struct ApprovalForAllTestEventHandler;

#[async_trait::async_trait]
impl EventHandler for TestEventHandler {
async fn handle_event(&self, _event: Event) {}
impl EventHandler for ApprovalForAllTestEventHandler {
type State = TestContractState;
async fn handle_event(&self, event: Event) -> Option<Vec<Self::State>> {
None
}
}
7 changes: 4 additions & 3 deletions chaindexing-tests/src/tests/events_ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ mod tests {
use tokio::sync::Mutex;

use crate::factory::{
bayc_contract, empty_json_rpc, BAYC_CONTRACT_ADDRESS, BAYC_CONTRACT_START_BLOCK_NUMBER,
bayc_contract, empty_json_rpc, TestContractState, BAYC_CONTRACT_ADDRESS,
BAYC_CONTRACT_START_BLOCK_NUMBER,
};
use crate::{
json_rpc_with_empty_logs, json_rpc_with_filter_stubber, json_rpc_with_logs, test_runner,
};
use chaindexing::{Chaindexing, EventsIngester, PostgresRepo, Repo};
use chaindexing::{Chaindexing, Contract, EventsIngester, PostgresRepo, Repo};

#[tokio::test]
pub async fn creates_contract_events() {
Expand Down Expand Up @@ -117,7 +118,7 @@ mod tests {
let pool = test_runner::get_pool().await;

test_runner::run_test(&pool, |conn| async move {
let contracts = vec![];
let contracts: Vec<Contract<TestContractState>> = vec![];
let json_rpc = Arc::new(empty_json_rpc());
let blocks_per_batch = 10;
let conn = Arc::new(Mutex::new(conn));
Expand Down
14 changes: 7 additions & 7 deletions chaindexing/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
use crate::{ChaindexingRepo, Chains, Contract};
use crate::{ChaindexingRepo, Chains, Contract, ContractState};

#[derive(Clone)]
pub struct Config {
pub struct Config<State: ContractState> {
pub chains: Chains,
pub repo: ChaindexingRepo,
pub contracts: Vec<Contract>,
pub contracts: Vec<Contract<State>>,
pub blocks_per_batch: u64,
pub handler_interval_ms: u64,
pub ingestion_interval_ms: u64,
}

impl Config {
pub fn new(repo: ChaindexingRepo, chains: Chains, contracts: Vec<Contract>) -> Self {
impl<State: ContractState> Config<State> {
pub fn new(repo: ChaindexingRepo, chains: Chains) -> Self {
Self {
repo,
chains,
contracts,
contracts: vec![],
blocks_per_batch: 20,
handler_interval_ms: 10000,
ingestion_interval_ms: 10000,
}
}

pub fn add_contract(mut self, contract: Contract) -> Self {
pub fn add_contract(mut self, contract: Contract<State>) -> Self {
self.contracts.push(contract);

self
Expand Down
35 changes: 20 additions & 15 deletions chaindexing/src/contracts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ use ethers::{

pub type ContractEventTopic = H256;

use std::fmt::Debug;
pub trait ContractState: Debug + Sync + Send + Clone + 'static {}

#[derive(Debug, Clone)]
pub struct ContractEvent {
pub abi: String,
Expand All @@ -29,13 +32,13 @@ impl ContractEvent {
type EventAbi = &'static str;

#[derive(Clone)]
pub struct Contract {
pub struct Contract<State: ContractState> {
pub addresses: Vec<UnsavedContractAddress>,
pub name: String,
pub event_handlers: HashMap<EventAbi, Arc<dyn EventHandler>>,
pub event_handlers: HashMap<EventAbi, Arc<dyn EventHandler<State = State>>>,
}

impl Contract {
impl<State: ContractState> Contract<State> {
pub fn new(name: &str) -> Self {
Self {
addresses: vec![],
Expand Down Expand Up @@ -63,7 +66,7 @@ impl Contract {
pub fn add_event(
&self,
event_abi: EventAbi,
event_handler: impl EventHandler + 'static,
event_handler: impl EventHandler<State = State> + 'static,
) -> Self {
let mut event_handlers = self.event_handlers.clone();

Expand Down Expand Up @@ -97,9 +100,9 @@ impl Contract {
pub struct Contracts;

impl Contracts {
pub fn get_all_event_handlers_by_event_abi(
contracts: &Vec<Contract>,
) -> HashMap<EventAbi, Arc<dyn EventHandler>> {
pub fn get_all_event_handlers_by_event_abi<State: ContractState>(
contracts: &Vec<Contract<State>>,
) -> HashMap<EventAbi, Arc<dyn EventHandler<State = State>>> {
contracts.iter().fold(
HashMap::new(),
|mut event_handlers_by_event_abi, contract| {
Expand All @@ -115,8 +118,8 @@ impl Contracts {
)
}

pub fn group_event_topics_by_names(
contracts: &Vec<Contract>,
pub fn group_event_topics_by_names<State: ContractState>(
contracts: &Vec<Contract<State>>,
) -> HashMap<String, Vec<ContractEventTopic>> {
contracts
.iter()
Expand All @@ -127,8 +130,8 @@ impl Contracts {
})
}

pub fn group_events_by_topics(
contracts: &Vec<Contract>,
pub fn group_events_by_topics<State: ContractState>(
contracts: &Vec<Contract<State>>,
) -> HashMap<ContractEventTopic, ContractEvent> {
contracts
.iter()
Expand All @@ -137,7 +140,9 @@ impl Contracts {
.collect()
}

pub fn group_by_addresses<'a>(contracts: &'a Vec<Contract>) -> HashMap<Address, &'a Contract> {
pub fn group_by_addresses<'a, State: ContractState>(
contracts: &'a Vec<Contract<State>>,
) -> HashMap<Address, &'a Contract<State>> {
contracts
.iter()
.fold(HashMap::new(), |mut contracts_by_addresses, contract| {
Expand All @@ -162,7 +167,7 @@ pub struct UnsavedContractAddress {
chain_id: i32,
start_block_number: i64,
next_block_number_to_ingest_from: i64,
next_block_number_to_handle: i64,
next_block_number_to_handle_from: i64,
}

impl UnsavedContractAddress {
Expand All @@ -173,7 +178,7 @@ impl UnsavedContractAddress {
chain_id: *chain as i32,
start_block_number: start_block_number,
next_block_number_to_ingest_from: start_block_number,
next_block_number_to_handle: start_block_number,
next_block_number_to_handle_from: start_block_number,
}
}
}
Expand All @@ -194,7 +199,7 @@ pub struct ContractAddress {
pub id: i32,
chain_id: i32,
pub next_block_number_to_ingest_from: i64,
pub next_block_number_to_handle: i64,
pub next_block_number_to_handle_from: i64,
start_block_number: i64,
pub address: String,
pub contract_name: String,
Expand Down
2 changes: 1 addition & 1 deletion chaindexing/src/diesel/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ diesel::table! {
id -> Int4,
chain_id -> Int4,
next_block_number_to_ingest_from -> Int8,
next_block_number_to_handle -> Int8,
next_block_number_to_handle_from -> Int8,
start_block_number -> Int8,
address -> Text,
contract_name -> Text,
Expand Down
35 changes: 23 additions & 12 deletions chaindexing/src/event_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@ use futures_util::{stream, StreamExt};
use tokio::{sync::Mutex, time::interval};

use crate::{contracts::Contracts, events::Event, ChaindexingRepo, Config, Repo};
use crate::{ChaindexingRepoConn, ContractAddress, Streamable};
use crate::{ChaindexingRepoConn, ContractAddress, ContractState, Streamable};

#[async_trait::async_trait]
pub trait EventHandler: Send + Sync {
async fn handle_event(&self, event: Event);
type State: ContractState;
async fn handle_event(&self, event: Event) -> Option<Vec<Self::State>>;
}

pub trait AllEventHandlers {}

pub struct EventHandlers;

impl EventHandlers {
pub fn start(config: &Config) {
pub fn start<State: ContractState>(config: &Config<State>) {
let config = config.clone();
tokio::spawn(async move {
let pool = config.repo.get_pool(1).await;
Expand All @@ -33,14 +36,17 @@ impl EventHandlers {
});
}

pub async fn handle_events<'a>(
pub async fn handle_events<'a, State: ContractState>(
conn: Arc<Mutex<ChaindexingRepoConn<'a>>>,
event_handlers_by_event_abi: &HashMap<&str, Arc<dyn EventHandler>>,
event_handlers_by_event_abi: &HashMap<&str, Arc<dyn EventHandler<State = State>>>,
) {
dbg!("About to fetch Contract Addresses");
let mut contract_addresses_stream =
ChaindexingRepo::get_contract_addresses_stream(conn.clone());

while let Some(contract_addresses) = contract_addresses_stream.next().await {
dbg!("Streaming Contract Addresses");

stream::iter(contract_addresses)
.for_each(|contract_address| {
let conn = conn.clone();
Expand All @@ -58,17 +64,22 @@ impl EventHandlers {
}
}

pub async fn handle_event_for_contract_address<'a>(
pub async fn handle_event_for_contract_address<'a, State: ContractState>(
conn: Arc<Mutex<ChaindexingRepoConn<'a>>>,
contract_address: &ContractAddress,
event_handlers_by_event_abi: &HashMap<&str, Arc<dyn EventHandler>>,
event_handlers_by_event_abi: &HashMap<&str, Arc<dyn EventHandler<State = State>>>,
) {
let mut events_stream = ChaindexingRepo::get_events_stream(
conn.clone(),
contract_address.next_block_number_to_handle,
contract_address.next_block_number_to_handle_from,
);

while let Some(mut events) = events_stream.next().await {
while let Some(events) = events_stream.next().await {
// TODO: Move this filter to the stream query level
let mut events: Vec<Event> = events
.into_iter()
.filter(|event| event.match_contract_address(&contract_address.address))
.collect();
events.sort_by_key(|e| (e.block_number, e.log_index));

join_all(events.iter().map(|event| {
Expand All @@ -81,11 +92,11 @@ impl EventHandlers {
let mut conn = conn.lock().await;

if let Some(Event { block_number, .. }) = events.last() {
let next_block_number_to_handle = block_number + 1;
ChaindexingRepo::update_next_block_number_to_handle(
let next_block_number_to_handle_from = block_number + 1;
ChaindexingRepo::update_next_block_number_to_handle_from(
&mut conn,
contract_address.id(),
next_block_number_to_handle,
next_block_number_to_handle_from,
)
.await;
}
Expand Down
15 changes: 12 additions & 3 deletions chaindexing/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashMap;

use crate::{contracts::Contracts, diesel::schema::chaindexing_events, ContractAddress};
use crate::{
contracts::Contracts, diesel::schema::chaindexing_events, ContractAddress, ContractState,
};
use diesel::{Insertable, Queryable};
use ethers::{
abi::{LogParam, Token},
Expand All @@ -16,7 +18,7 @@ use uuid::Uuid;
pub struct Event {
pub id: Uuid,
pub contract_address: String,
contract_name: String,
pub contract_name: String,
pub abi: String,
log_params: serde_json::Value,
parameters: serde_json::Value,
Expand Down Expand Up @@ -53,6 +55,10 @@ impl Event {
}
}

pub fn match_contract_address(&self, contract_address: &String) -> bool {
self.contract_address.to_lowercase() == *contract_address.to_lowercase()
}

fn log_params_to_parameters(log_params: &Vec<LogParam>) -> HashMap<String, Token> {
log_params
.iter()
Expand All @@ -67,7 +73,10 @@ impl Event {
pub struct Events;

impl Events {
pub fn new(logs: &Vec<Log>, contracts: &Vec<Contract>) -> Vec<Event> {
pub fn new<State: ContractState>(
logs: &Vec<Log>,
contracts: &Vec<Contract<State>>,
) -> Vec<Event> {
let events_by_topics = Contracts::group_events_by_topics(contracts);
let contracts_by_addresses = Contracts::group_by_addresses(contracts);

Expand Down
Loading

0 comments on commit bfa1352

Please sign in to comment.