Skip to content

Commit

Permalink
refactor(indexer): use transactions to insert to the database
Browse files Browse the repository at this point in the history
  • Loading branch information
gregfromstl committed Feb 26, 2024
1 parent bcb0fc0 commit e9c2c80
Show file tree
Hide file tree
Showing 4 changed files with 375 additions and 22 deletions.
69 changes: 61 additions & 8 deletions lib/eth/src/id_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use ethers::{
providers::{JsonRpcClient, Provider},
types::{Address, Filter, Log, U256},
};
use sqlx::Acquire;
use std::error::Error;
use std::sync::Arc;
use teleport_common::protobufs::generated::{
Expand Down Expand Up @@ -166,8 +167,21 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
event_rows.push(event_row);
}

db::ChainEventRow::bulk_insert(store, &event_rows).await?;
db::FidRow::bulk_insert(store, &fid_rows).await?;
let mut connection = store.conn.acquire().await?;
let mut transaction = connection.begin().await?;

let queries = db::ChainEventRow::generate_bulk_insert_queries(&event_rows)?;
for query in queries {
let query = sqlx::query(&query);
query.execute(&mut *transaction).await?;
}

let queries = db::FidRow::generate_bulk_insert_queries(&fid_rows)?;
for query in queries {
let query = sqlx::query(&query);
query.execute(&mut *transaction).await?;
}
transaction.commit().await?;

Ok(())
}
Expand Down Expand Up @@ -257,8 +271,20 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
event_rows.push(event_row);
}

db::ChainEventRow::bulk_insert(store, &event_rows).await?;
db::FidRow::bulk_transfer(store, &fid_transfers).await?;
let mut conn = store.conn.acquire().await?;
let mut transaction = conn.begin().await?;

let insert_queries = db::ChainEventRow::generate_bulk_insert_queries(&event_rows)?;
for query in insert_queries {
sqlx::query(&query).execute(&mut *transaction).await?;
}

let transfer_queries = db::FidRow::generate_bulk_transfer_queries(&fid_transfers)?;
for query in transfer_queries {
sqlx::query(&query).execute(&mut *transaction).await?;
}

transaction.commit().await?;

Ok(())
}
Expand Down Expand Up @@ -351,8 +377,22 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
event_rows.push(event_row);
}

db::ChainEventRow::bulk_insert(&store, &event_rows).await?;
db::FidRow::bulk_transfer(&store, &recoveries).await?;
let mut connection = store.conn.acquire().await?;
let mut transaction = connection.begin().await?;

let insert_queries = db::ChainEventRow::generate_bulk_insert_queries(&event_rows)?;
for query in insert_queries {
let query = sqlx::query(&query);
query.execute(&mut *transaction).await?;
}

let transfer_queries = db::FidRow::generate_bulk_transfer_queries(&recoveries)?;
for query in transfer_queries {
let query = sqlx::query(&query);
query.execute(&mut *transaction).await?;
}

transaction.commit().await?;

Ok(())
}
Expand Down Expand Up @@ -447,8 +487,21 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
event_rows.push(event_row);
}

db::FidRow::bulk_update_recovery_address(&store, &recovery_address_updates).await?;
db::ChainEventRow::bulk_insert(&store, &event_rows).await?;
let mut conn = store.conn.acquire().await?;
let mut transaction = conn.begin().await?;

let insert_queries = db::ChainEventRow::generate_bulk_insert_queries(&event_rows)?;
for query in insert_queries {
sqlx::query(&query).execute(&mut *transaction).await?;
}

let update_queries =
db::FidRow::generate_bulk_update_recovery_address_queries(&recovery_address_updates)?;
for query in update_queries {
sqlx::query(&query).execute(&mut *transaction).await?;
}

transaction.commit().await?;

Ok(())
}
Expand Down
64 changes: 55 additions & 9 deletions lib/eth/src/key_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use crate::utils::{get_logs, get_signature_topic, read_abi};
use alloy_dyn_abi::DynSolType;
use ethers::{
contract::{parse_log, Contract as EthContract, ContractInstance, EthEvent},
core::utils::keccak256,
providers::{JsonRpcClient, Middleware, Provider},
providers::{JsonRpcClient, Provider},
types::{Address, Bytes, Filter, Log, H256, U256},
};
use log;
use serde::{Deserialize, Serialize};
use serde_json::{self};
use sqlx::Acquire;
use std::error::Error;
use std::sync::Arc;
use teleport_common::protobufs::generated::{
Expand Down Expand Up @@ -224,8 +224,22 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
event_rows.push(event_row);
}

db::ChainEventRow::bulk_insert(store, &event_rows).await?;
db::SignerRow::bulk_insert(store, &signer_rows).await?;
let mut connection = store.conn.acquire().await?;
let mut transaction = connection.begin().await?;

let event_queries = db::ChainEventRow::generate_bulk_insert_queries(&event_rows)?;
for query in event_queries {
let query = sqlx::query(&query);
query.execute(&mut *transaction).await?;
}

let signer_queries = db::SignerRow::generate_bulk_insert_queries(&signer_rows)?;
for query in signer_queries {
let query = sqlx::query(&query);
query.execute(&mut *transaction).await?;
}

transaction.commit().await?;

Ok(())
}
Expand Down Expand Up @@ -309,7 +323,7 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
.await?;

event_row.insert(store).await?;
db::SignerRow::update_remove_chain_event(&store, key_bytes.to_vec(), event_row.id).await?;
db::SignerRow::update_remove_event(&store, key_bytes.to_vec(), event_row.id).await?;

Ok(())
}
Expand All @@ -332,8 +346,22 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
event_rows.push(event_row);
}

db::ChainEventRow::bulk_insert(store, &event_rows).await?;
db::SignerRow::bulk_update_remove_chain_event(store, &updates).await?;
let mut connection = store.conn.acquire().await?;
let mut transaction = connection.begin().await?;

let insert_queries = db::ChainEventRow::generate_bulk_insert_queries(&event_rows)?;
for query in insert_queries {
let query = sqlx::query(&query);
query.execute(&mut *transaction).await?;
}

let update_queries = db::SignerRow::generate_bulk_remove_update_queries(&updates)?;
for query in update_queries {
let query = sqlx::query(&query);
query.execute(&mut *transaction).await?;
}

transaction.commit().await?;

Ok(())
}
Expand Down Expand Up @@ -443,10 +471,19 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
event_rows.push(event_row);
}

db::ChainEventRow::bulk_insert(store, &event_rows).await?;
let mut connection = store.conn.acquire().await?;
let mut transaction = connection.begin().await?;

let event_queries = db::ChainEventRow::generate_bulk_insert_queries(&event_rows)?;
for query in event_queries {
let query = sqlx::query(&query);
query.execute(&mut *transaction).await?;
}

// TODO: invalidate keyBytes and messages signed by these keyBytes

transaction.commit().await?;

Ok(())
}

Expand Down Expand Up @@ -537,7 +574,14 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
event_rows.push(event_row);
}

db::ChainEventRow::bulk_insert(store, &event_rows).await?;
let mut connection = store.conn.acquire().await?;
let mut transaction = connection.begin().await?;

let event_queries = db::ChainEventRow::generate_bulk_insert_queries(&event_rows)?;
for query in event_queries {
let query = sqlx::query(&query);
query.execute(&mut *transaction).await?;
}

/*
TODO
Expand All @@ -546,6 +590,8 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
3. Drop any messages created by off-chain Farcaster Signers whose pub key was not emitted as an Add event.
*/

transaction.commit().await?;

Ok(())
}
}
20 changes: 18 additions & 2 deletions lib/eth/src/storage_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use ethers::{
providers::{JsonRpcClient, Provider},
types::{Address, Filter, Log, U256},
};
use sqlx::Acquire;
use std::error::Error;
use std::sync::Arc;
use teleport_common::protobufs::generated::{
Expand Down Expand Up @@ -148,8 +149,23 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
event_rows.push(event_row);
}

db::ChainEventRow::bulk_insert(store, &event_rows).await?;
db::StorageAllocationRow::bulk_insert(store, &storage_allocations).await?;
let mut connection = store.conn.acquire().await?;
let mut transaction = connection.begin().await?;

let event_queries = db::ChainEventRow::generate_bulk_insert_queries(&event_rows)?;
for query in event_queries {
let query = sqlx::query(&query);
query.execute(&mut *transaction).await?;
}

let allocation_queries =
db::StorageAllocationRow::generate_bulk_insert_queries(&storage_allocations)?;
for query in allocation_queries {
let query = sqlx::query(&query);
query.execute(&mut *transaction).await?;
}

transaction.commit().await?;

Ok(())
}
Expand Down
Loading

0 comments on commit e9c2c80

Please sign in to comment.