Skip to content

Commit

Permalink
Merge pull request #680 from blockscout/lymarenkolev/bens/address-names-view
Browse files Browse the repository at this point in the history

Add materialized view for address name resolving
  • Loading branch information
sevenzing authored Nov 29, 2023
2 parents b015e4b + 67b7800 commit 03e837c
Show file tree
Hide file tree
Showing 11 changed files with 273 additions and 21 deletions.
39 changes: 39 additions & 0 deletions blockscout-ens/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions blockscout-ens/bens-logic/examples/resolve_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn main() -> Result<(), anyhow::Error> {
BlockscoutClient::new("https://rootstock.blockscout.com".parse().unwrap(), 5, 30);
let clients: HashMap<i64, BlockscoutClient> =
HashMap::from_iter([(1, eth_client), (30, rootstock_client)]);
let reader = SubgraphReader::initialize(pool.clone(), clients).await?;
let reader = SubgraphReader::initialize(pool.clone(), clients, true).await?;

let addresses = vec![
"0x0292f204513eeafe8c032ffc4cb4c7e10eca908c",
Expand Down Expand Up @@ -141,7 +141,7 @@ async fn main() -> Result<(), anyhow::Error> {
})
.await
.expect("failed to quick resolve");
// job size is 94. elapsed 1.1955539s. resolved as 13 domains
// job size is 94. elapsed 0.65092486s. resolved as 14 domains
println!(
"job size is {}. elapsed {:?}s. resolved as {} domains",
size,
Expand Down
53 changes: 43 additions & 10 deletions blockscout-ens/bens-logic/src/subgraphs_reader/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
},
hash_name::hex,
};
use anyhow::Context;
use ethers::types::TxHash;
use sqlx::postgres::PgPool;
use std::{
Expand All @@ -30,12 +31,14 @@ pub struct NetworkConfig {
pub struct SubgraphReader {
pool: Arc<PgPool>,
networks: HashMap<i64, NetworkConfig>,
use_cache: bool,
}

impl SubgraphReader {
pub async fn initialize(
pool: Arc<PgPool>,
mut blockscout_clients: HashMap<i64, BlockscoutClient>,
use_cache: bool,
) -> Result<Self, anyhow::Error> {
let schema_names = schema_names(&pool).await?;
tracing::info!(schema_names =? schema_names, "found subgraph schemas");
Expand All @@ -60,12 +63,37 @@ impl SubgraphReader {
for (id, client) in blockscout_clients {
tracing::warn!("no chain found for blockscout url with chain_id {id} and url {}, skip this network", client.url())
}

if use_cache {
for config in networks.values() {
let schema = &config.schema_name;
sql::create_address_names_view(pool.as_ref(), schema)
.await
.context(format!(
"failed to create address_names view for schema {schema}"
))?
}
}
tracing::info!(networks =? networks.keys().collect::<Vec<_>>(), "initialized subgraph reader");
Ok(Self::new(pool, networks))
Ok(Self::new(pool, networks, use_cache))
}

/// Constructs a reader over already-discovered per-network subgraph configs.
///
/// `use_cache` selects whether address lookups are served from the
/// per-schema `address_names` materialized view (cached path) or by
/// querying the `domain` table directly.
pub fn new(pool: Arc<PgPool>, networks: HashMap<i64, NetworkConfig>, use_cache: bool) -> Self {
    Self {
        pool,
        networks,
        use_cache,
    }
}

pub fn new(pool: Arc<PgPool>, networks: HashMap<i64, NetworkConfig>) -> Self {
Self { pool, networks }
/// Refreshes the `address_names` materialized view of every configured
/// network, one schema at a time.
///
/// Only meaningful when the reader was initialized with `use_cache = true`
/// (the views exist); intended to be driven by a periodic scheduler.
///
/// # Errors
/// Returns the first refresh failure, annotated with the schema it occurred in.
pub async fn refresh_cache(&self) -> Result<(), anyhow::Error> {
    for config in self.networks.values() {
        let schema = &config.schema_name;
        sql::refresh_address_names_view(self.pool.as_ref(), schema)
            .await
            // with_context: build the error message lazily, only on failure,
            // instead of allocating a String on every successful iteration
            .with_context(|| format!("failed to update {schema}_address_names"))?;
    }
    Ok(())
}
}

Expand Down Expand Up @@ -139,8 +167,13 @@ impl SubgraphReader {
// remove duplicates
let addresses: HashSet<String> = input.addresses.into_iter().map(hex).collect();
let addreses_str: Vec<&str> = addresses.iter().map(String::as_str).collect::<Vec<_>>();
let result =
sql::batch_search_addresses(&self.pool, &network.schema_name, &addreses_str).await?;
let result = if self.use_cache {
sql::batch_search_addresses_cached(&self.pool, &network.schema_name, &addreses_str)
.await?
} else {
sql::batch_search_addresses(&self.pool, &network.schema_name, &addreses_str).await?
};

let address_to_name = result
.into_iter()
.map(|d| (d.resolved_address, d.domain_name))
Expand Down Expand Up @@ -203,7 +236,7 @@ mod tests {
async fn get_domain_works(pool: PgPool) {
let pool = Arc::new(pool);
let clients = mocked_blockscout_clients().await;
let reader = SubgraphReader::initialize(pool.clone(), clients)
let reader = SubgraphReader::initialize(pool.clone(), clients, true)
.await
.expect("failed to init reader");

Expand Down Expand Up @@ -269,7 +302,7 @@ mod tests {
async fn lookup_domain_name_works(pool: PgPool) {
let pool = Arc::new(pool);
let clients = mocked_blockscout_clients().await;
let reader = SubgraphReader::initialize(pool.clone(), clients)
let reader = SubgraphReader::initialize(pool.clone(), clients, true)
.await
.expect("failed to init reader");

Expand All @@ -293,7 +326,7 @@ mod tests {
async fn lookup_addresses_works(pool: PgPool) {
let pool = Arc::new(pool);
let clients = mocked_blockscout_clients().await;
let reader = SubgraphReader::initialize(pool.clone(), clients)
let reader = SubgraphReader::initialize(pool.clone(), clients, true)
.await
.expect("failed to init reader");

Expand Down Expand Up @@ -373,7 +406,7 @@ mod tests {
async fn get_domain_history_works(pool: PgPool) {
let pool = Arc::new(pool);
let clients = mocked_blockscout_clients().await;
let reader = SubgraphReader::initialize(pool.clone(), clients)
let reader = SubgraphReader::initialize(pool.clone(), clients, true)
.await
.expect("failed to init reader");
let name = "vitalik.eth".to_string();
Expand Down Expand Up @@ -456,7 +489,7 @@ mod tests {
async fn batch_search_works(pool: PgPool) {
let pool = Arc::new(pool);
let clients = mocked_blockscout_clients().await;
let reader = SubgraphReader::initialize(pool.clone(), clients)
let reader = SubgraphReader::initialize(pool.clone(), clients, true)
.await
.expect("failed to init reader");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use crate::subgraphs_reader::SubgraphReadError;
use sqlx::{postgres::PgPool, Executor};
use tracing::instrument;

use super::{DOMAIN_DEFAULT_WHERE_CLAUSE, DOMAIN_NOT_EXPIRED_WHERE_CLAUSE};

/// Sets up the address-name cache for one subgraph `schema`.
///
/// Idempotently creates the `{schema}.address_names` materialized view —
/// one row per `resolved_address`, keeping the earliest-created,
/// non-expired domain whose name contains no `[` (filters out unresolved
/// label hashes) — plus the unique index that
/// `REFRESH MATERIALIZED VIEW CONCURRENTLY` requires and a plpgsql helper
/// function that performs that refresh.
#[instrument(
    name = "create_address_names_view",
    skip(pool),
    err(level = "error"),
    level = "info"
)]
pub async fn create_address_names_view(
    pool: &PgPool,
    schema: &str,
) -> Result<(), SubgraphReadError> {
    let view_ddl = format!(
        r#"
        CREATE MATERIALIZED VIEW IF NOT EXISTS {schema}.address_names AS
        SELECT DISTINCT ON (resolved_address)
            id,
            name AS domain_name,
            resolved_address
        from {schema}.domain
        where
            resolved_address IS NOT NULL
            AND name NOT LIKE '%[%'
            AND {DOMAIN_DEFAULT_WHERE_CLAUSE}
            AND {DOMAIN_NOT_EXPIRED_WHERE_CLAUSE}
        ORDER BY resolved_address, created_at
        "#,
    );
    let index_ddl = format!(
        r#"
        CREATE UNIQUE INDEX IF NOT EXISTS
        address_names_unique_resolved_address
        ON {schema}.address_names (resolved_address);
        "#
    );
    let function_name = refresh_function_name(schema);
    let function_ddl = format!(
        r#"
        CREATE OR REPLACE FUNCTION {function_name}
        RETURNS void AS
        $$
        BEGIN
        REFRESH MATERIALIZED VIEW CONCURRENTLY {schema}.address_names;
        END;
        $$
        LANGUAGE plpgsql;
        "#
    );

    // Execute all DDL atomically: a failure mid-way must not leave a view
    // without its index (that would break CONCURRENTLY refreshes) or a
    // dangling helper function.
    let mut tx = pool.begin().await?;
    for ddl in [&view_ddl, &index_ddl, &function_ddl] {
        tx.execute(sqlx::query(ddl)).await?;
    }
    tx.commit().await?;

    Ok(())
}

/// Refreshes the cached `{schema}.address_names` materialized view by
/// calling the per-schema plpgsql helper installed by
/// [`create_address_names_view`].
#[instrument(
    name = "refresh_address_names_view",
    skip(pool),
    err(level = "error"),
    level = "info"
)]
pub async fn refresh_address_names_view(
    pool: &PgPool,
    schema: &str,
) -> Result<(), SubgraphReadError> {
    // The helper name already carries the call parentheses, so a plain
    // SELECT invokes it.
    let function_name = refresh_function_name(schema);
    let statement = format!("SELECT {function_name};");
    pool.execute(sqlx::query(&statement)).await?;
    Ok(())
}

/// Builds the name — including the call parentheses — of the per-schema
/// refresh helper, e.g. `eth_refresh_address_names()` for schema `eth`.
fn refresh_function_name(schema: &str) -> String {
    const SUFFIX: &str = "_refresh_address_names()";
    let mut name = String::with_capacity(schema.len() + SUFFIX.len());
    name.push_str(schema);
    name.push_str(SUFFIX);
    name
}
31 changes: 29 additions & 2 deletions blockscout-ens/bens-logic/src/subgraphs_reader/sql/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ COALESCE(to_timestamp(expiry_date) < now(), false) AS is_expired
// `block_range @>` is special sql syntax for fast filtering int4range
// to access current version of domain.
// Source: https://github.com/graphprotocol/graph-node/blob/19fd41bb48511f889dc94f5d82e16cd492f29da1/store/postgres/src/block_range.rs#L26
const DOMAIN_DEFAULT_WHERE_CLAUSE: &str = r#"
pub const DOMAIN_DEFAULT_WHERE_CLAUSE: &str = r#"
label_name IS NOT NULL
AND block_range @> 2147483647
"#;

const DOMAIN_NOT_EXPIRED_WHERE_CLAUSE: &str = r#"
pub const DOMAIN_NOT_EXPIRED_WHERE_CLAUSE: &str = r#"
(
expiry_date is null
OR to_timestamp(expiry_date) > now()
Expand Down Expand Up @@ -211,3 +211,30 @@ pub async fn batch_search_addresses(

Ok(domains)
}

/// Resolves each of `addresses` to its primary domain name using the
/// precomputed `{schema}.address_names` materialized view (fast path),
/// instead of scanning `{schema}.domain` as `batch_search_addresses` does.
///
/// Addresses absent from the view are simply missing from the result;
/// the view guarantees at most one row per `resolved_address`.
#[instrument(
    name = "batch_search_addresses_cached",
    skip(pool, addresses),
    fields(job_size = addresses.len()),
    err(level = "error"),
    level = "info",
)]
pub async fn batch_search_addresses_cached(
    pool: &PgPool,
    schema: &str,
    addresses: &[&str],
) -> Result<Vec<DomainWithAddress>, SubgraphReadError> {
    let query = format!(
        r#"
        SELECT id, domain_name, resolved_address
        FROM {schema}.address_names
        where
            resolved_address = ANY($1)
        "#,
    );
    let domains: Vec<DomainWithAddress> = sqlx::query_as(&query)
        .bind(addresses)
        .fetch_all(pool)
        .await?;
    Ok(domains)
}
2 changes: 2 additions & 0 deletions blockscout-ens/bens-logic/src/subgraphs_reader/sql/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod address_names;
mod domain;
mod transaction_history;

pub use address_names::*;
pub use domain::*;
pub use transaction_history::*;
1 change: 1 addition & 0 deletions blockscout-ens/bens-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ url = { version = "2", features = ["serde"] }
hex = "0.4"
thiserror = "1"
chrono = "0.4"
tokio-cron-scheduler = "0.9.4"

[dependencies.sqlx]
version = "0.7"
Expand Down
Loading

0 comments on commit 03e837c

Please sign in to comment.