Skip to content

Commit

Permalink
feat(indexer): optimized key registry logs
Browse files Browse the repository at this point in the history
  • Loading branch information
gregfromstl committed Feb 23, 2024
1 parent 66b1aa8 commit ff0cd13
Show file tree
Hide file tree
Showing 3 changed files with 393 additions and 107 deletions.
193 changes: 129 additions & 64 deletions lib/eth/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use crate::id_registry;
use crate::id_registry::{
CHANGE_RECOVERY_ADDRESS_SIGNATURE, RECOVERY_SIGNATURE, REGISTER_SIGNATURE, TRANSFER_SIGNATURE,
};
use crate::key_registry;
use crate::key_registry::{
self, ADD_SIGNER_SIGNATURE, ADMIN_RESET_SIGNATURE, MIGRATED_SIGNATURE, REMOVE_SIGNER_SIGNATURE,
};
use crate::storage_registry;
use crate::utils::{get_block_timestamp, get_signature_topic};
use ethers::{
Expand Down Expand Up @@ -109,12 +111,12 @@ impl<T: JsonRpcClient + Clone> Indexer<T> {
let change_recovery_address_future = self
.id_registry
.get_change_recovery_address_logs(start_block, end_block);
// let add_future = self.key_registry.get_add_logs(start_block, end_block);
// let remove_future = self.key_registry.get_remove_logs(start_block, end_block);
// let admin_reset_future = self
// .key_registry
// .get_admin_reset_logs(start_block, end_block);
// let migrated_future = self.key_registry.get_migrated_logs(start_block, end_block);
let add_future = self.key_registry.get_add_logs(start_block, end_block);
let remove_future = self.key_registry.get_remove_logs(start_block, end_block);
let admin_reset_future = self
.key_registry
.get_admin_reset_logs(start_block, end_block);
let migrated_future = self.key_registry.get_migrated_logs(start_block, end_block);
// let rent_future = self.storage_registry.get_rent_logs(start_block, end_block);
// let set_max_units_future = self
// .storage_registry
Expand All @@ -128,10 +130,10 @@ impl<T: JsonRpcClient + Clone> Indexer<T> {
transfer_logs,
recovery_logs,
change_recovery_address_logs,
// add_logs,
// remove_logs,
// admin_reset_logs,
// migrated_logs,
add_logs,
remove_logs,
admin_reset_logs,
migrated_logs,
// rent_logs,
// set_max_units_logs,
// deprecation_timestamp_logs,
Expand All @@ -140,10 +142,10 @@ impl<T: JsonRpcClient + Clone> Indexer<T> {
transfer_future,
recovery_future,
change_recovery_address_future,
// add_future,
// remove_future,
// admin_reset_future,
// migrated_future,
add_future,
remove_future,
admin_reset_future,
migrated_future,
// rent_future,
// set_max_units_future,
// deprecation_timestamp_future
Expand All @@ -155,10 +157,10 @@ impl<T: JsonRpcClient + Clone> Indexer<T> {
collected_logs.extend(transfer_logs);
collected_logs.extend(recovery_logs);
collected_logs.extend(change_recovery_address_logs);
// collected_logs.extend(add_logs);
// collected_logs.extend(remove_logs);
// collected_logs.extend(admin_reset_logs);
// collected_logs.extend(migrated_logs);
collected_logs.extend(add_logs);
collected_logs.extend(remove_logs);
collected_logs.extend(admin_reset_logs);
collected_logs.extend(migrated_logs);
// collected_logs.extend(rent_logs);
// collected_logs.extend(set_max_units_logs);
// collected_logs.extend(deprecation_timestamp_logs);
Expand Down Expand Up @@ -323,59 +325,128 @@ impl<T: JsonRpcClient + Clone> Indexer<T> {
Ok(())
}

async fn sync_add_logs(&mut self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
let add_logs = self.key_registry.get_add_logs(start, end).await?;
for log in add_logs {
let block_hash = log.block_hash.unwrap();
let timestamp = self.get_block_timestamp(block_hash).await?;
async fn sync_add_logs(
&mut self,
logs: &Vec<Log>,
timestamps_map: &HashMap<H256, u32>,
) -> Result<(), Box<dyn Error>> {
let add_logs: Vec<&Log> = logs
.iter()
.filter(|log| {
log.topics
.contains(&get_signature_topic(ADD_SIGNER_SIGNATURE))
})
.collect();

self.key_registry
.persist_add_log(&self.store, &log, self.chain_id, timestamp as i64)
.await?
let mut timestamps = Vec::new();
for log in &add_logs {
if let Some(block_hash) = log.block_hash {
if let Some(timestamp) = timestamps_map.get(&block_hash) {
timestamps.push(*timestamp);
}
}
}

self.key_registry
.persist_many_add_logs(&self.store, add_logs, self.chain_id, &timestamps)
.await
.unwrap();

Ok(())
}

async fn sync_remove_logs(&mut self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
let remove_logs = self.key_registry.get_remove_logs(start, end).await?;
for log in remove_logs {
let block_hash = log.block_hash.unwrap();
let timestamp = self.get_block_timestamp(block_hash).await?;
async fn sync_remove_logs(
&mut self,
logs: &Vec<Log>,
timestamps_map: &HashMap<H256, u32>,
) -> Result<(), Box<dyn Error>> {
let remove_logs: Vec<&Log> = logs
.iter()
.filter(|log| {
log.topics
.contains(&get_signature_topic(REMOVE_SIGNER_SIGNATURE))
})
.collect();

self.key_registry
.persist_remove_log(&self.store, &log, self.chain_id, timestamp as i64)
.await?
let mut timestamps = Vec::new();
for log in &remove_logs {
if let Some(block_hash) = log.block_hash {
if let Some(timestamp) = timestamps_map.get(&block_hash) {
timestamps.push(*timestamp);
}
}
}

self.key_registry
.persist_many_remove_logs(&self.store, remove_logs, self.chain_id, &timestamps)
.await
.unwrap();

Ok(())
}

async fn sync_admin_reset_logs(&mut self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
let admin_reset_logs = self.key_registry.get_admin_reset_logs(start, end).await?;
for log in admin_reset_logs {
let block_hash = log.block_hash.unwrap();
let timestamp = self.get_block_timestamp(block_hash).await?;
async fn sync_admin_reset_logs(
&mut self,
logs: &Vec<Log>,
timestamps_map: &HashMap<H256, u32>,
) -> Result<(), Box<dyn Error>> {
let admin_reset_logs: Vec<&Log> = logs
.iter()
.filter(|log| {
log.topics
.contains(&get_signature_topic(ADMIN_RESET_SIGNATURE))
})
.collect();

self.key_registry
.persist_admin_reset_log(&self.store, &log, self.chain_id, timestamp as i64)
.await?
let mut timestamps = Vec::new();
for log in &admin_reset_logs {
if let Some(block_hash) = log.block_hash {
if let Some(timestamp) = timestamps_map.get(&block_hash) {
timestamps.push(*timestamp);
}
}
}

self.key_registry
.persist_many_admin_reset_logs(
&self.store,
admin_reset_logs,
self.chain_id,
&timestamps,
)
.await
.unwrap();

Ok(())
}

async fn sync_migrated_logs(&mut self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
let migrated_logs = self.key_registry.get_migrated_logs(start, end).await?;
for log in migrated_logs {
let block_hash = log.block_hash.unwrap();
let timestamp = self.get_block_timestamp(block_hash).await?;
async fn sync_migrated_logs(
&mut self,
logs: &Vec<Log>,
timestamps_map: &HashMap<H256, u32>,
) -> Result<(), Box<dyn Error>> {
let migrated_logs: Vec<&Log> = logs
.iter()
.filter(|log| {
log.topics
.contains(&get_signature_topic(MIGRATED_SIGNATURE))
})
.collect();

self.key_registry
.persist_migrated_log(&self.store, &log, self.chain_id, timestamp as i64)
.await?
let mut timestamps = Vec::new();
for log in &migrated_logs {
if let Some(block_hash) = log.block_hash {
if let Some(timestamp) = timestamps_map.get(&block_hash) {
timestamps.push(*timestamp);
}
}
}

self.key_registry
.persist_many_migrated_logs(&self.store, migrated_logs, self.chain_id, &timestamps)
.await
.unwrap();

Ok(())
}

Expand Down Expand Up @@ -497,19 +568,13 @@ impl<T: JsonRpcClient + Clone> Indexer<T> {
self.sync_change_recovery_address_logs(&collected_logs, &timestamps)
.await?;

// tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
// log::info!("synced recovery logs");
// log::info!("synced change recovery address logs");

// // key registry logs
// self.sync_add_logs(start, end).await?;
// log::info!("synced add logs");
// self.sync_remove_logs(start, end).await?;
// log::info!("synced remove logs");
// self.sync_admin_reset_logs(start, end).await?;
// log::info!("synced admin reset logs");
// self.sync_migrated_logs(start, end).await?;
// log::info!("synced migrated logs");
// key registry logs
self.sync_add_logs(&collected_logs, &timestamps).await?;
self.sync_remove_logs(&collected_logs, &timestamps).await?;
self.sync_admin_reset_logs(&collected_logs, &timestamps)
.await?;
self.sync_migrated_logs(&collected_logs, &timestamps)
.await?;

// // storage registry logs
// self.sync_rent_logs(start, end).await?;
Expand Down
Loading

0 comments on commit ff0cd13

Please sign in to comment.