Skip to content

Commit

Permalink
Merge pull request #669 from blockscout/lymarenkolev/bens/fix-blockscout
Browse files Browse the repository at this point in the history
Fix blockscout client and add tests
  • Loading branch information
sevenzing authored Nov 8, 2023
2 parents 2e02f53 + 1655da8 commit a7c664e
Show file tree
Hide file tree
Showing 21 changed files with 578 additions and 118 deletions.
16 changes: 8 additions & 8 deletions .github/workflows/bens.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ jobs:
DATABASE_URL: postgres://graph-node:admin@localhost:5432/
SQLX_OFFLINE: true

# - name: Integration tests
# run: cargo test --locked --workspace --test '*' -- --nocapture --include-ignored
# if: success() || failure()
# env:
# RUST_BACKTRACE: 1
# RUST_LOG: info
# DATABASE_URL: postgres://graph-node:admin@localhost:5432/
# SQLX_OFFLINE: true
- name: Integration tests
run: cargo test --locked --workspace --test '*' -- --nocapture --include-ignored
if: success() || failure()
env:
RUST_BACKTRACE: 1
RUST_LOG: info
DATABASE_URL: postgres://graph-node:admin@localhost:5432/
SQLX_OFFLINE: true

lint:
name: Linting
Expand Down
84 changes: 80 additions & 4 deletions blockscout-ens/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions blockscout-ens/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[workspace]
resolver = "2"

members = [
"bens-logic",
Expand Down
7 changes: 6 additions & 1 deletion blockscout-ens/bens-logic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ reqwest-retry = "0.3"
lazy_static = "1.4"
tracing = "0.1"
futures = "0.3"
cached = { version = "0.46.1", features = ["proc_macro", "tokio", "async", "async_tokio_rt_multi_thread"] }
wiremock = {version = "0.5", optional = true }

[dependencies.sqlx]
version = "0.7"
Expand All @@ -36,8 +38,11 @@ features = [

[dev-dependencies]
pretty_assertions = "1.4.0"
wiremock = "0.5"
tracing-subscriber = {version = "0.3", features = ["env-filter"]}

[[example]]
name = "resolve_benchmark"

[features]
default = []
test-utils = ["dep:wiremock"]
4 changes: 2 additions & 2 deletions blockscout-ens/bens-logic/examples/resolve_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ async fn main() -> Result<(), anyhow::Error> {
.connect(&url)
.await?,
);
let eth_client = BlockscoutClient::new("https://eth.blockscout.com".parse().unwrap());
let eth_client = BlockscoutClient::new("https://eth.blockscout.com".parse().unwrap(), 5, 30);
let rootstock_client =
BlockscoutClient::new("https://rootstock.blockscout.com".parse().unwrap());
BlockscoutClient::new("https://rootstock.blockscout.com".parse().unwrap(), 5, 30);
let clients: HashMap<i64, BlockscoutClient> =
HashMap::from_iter([(1, eth_client), (30, rootstock_client)]);
let reader = SubgraphReader::initialize(pool.clone(), clients).await?;
Expand Down
2 changes: 2 additions & 0 deletions blockscout-ens/bens-logic/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod entity;
pub mod hash_name;
pub mod subgraphs_reader;
#[cfg(feature = "test-utils")]
pub mod test_utils;
91 changes: 61 additions & 30 deletions blockscout-ens/bens-logic/src/subgraphs_reader/blockscout.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,49 @@
use cached::proc_macro::cached;
use ethers::types::TxHash;
use futures::StreamExt;
use reqwest::StatusCode;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use serde::{de::DeserializeOwned, Deserialize};
use std::sync::Arc;
use std::{collections::HashMap, fmt::Debug, sync::Arc};
use tracing::instrument;

const MAX_REQUESTS_BATCH: usize = 5;

#[derive(Debug, Clone)]
pub struct BlockscoutClient {
url: url::Url,
inner: ClientWithMiddleware,
max_concurrent_requests: usize,
}

impl BlockscoutClient {
pub fn new(url: url::Url) -> Self {
pub fn new(url: url::Url, max_concurrent_requests: usize, timeout_seconds: u64) -> Self {
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
let client = ClientBuilder::new(reqwest::Client::new())
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build();
Self { url, inner: client }
let client = ClientBuilder::new(
reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(timeout_seconds))
.build()
.expect("valid client"),
)
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build();
Self {
url,
inner: client,
max_concurrent_requests,
}
}

pub fn url(&self) -> &url::Url {
&self.url
}
}

#[derive(Debug, Deserialize)]
use reqwest::StatusCode;
use serde::{de::DeserializeOwned, Deserialize};
#[derive(Debug, Clone, Deserialize)]
pub struct TransactionFrom {
pub hash: ethers::types::Address,
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
pub struct Transaction {
pub timestamp: String,
pub method: Option<String>,
Expand All @@ -43,12 +52,12 @@ pub struct Transaction {
pub block: i64,
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
pub struct Message {
pub message: String,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum Response<T> {
Ok(T),
NotFound(String),
Expand All @@ -71,38 +80,60 @@ where
}
}

#[cached(
key = "String",
convert = r#"{
let url = client.url();
format!("{url}/tx/{transaction_hash:#}")
}"#,
result = true,
time = 86_400, // 24 * 60 * 60 seconds
size = 50_000,
sync_writes = true,
)]
pub async fn cached_transaction(
client: &BlockscoutClient,
transaction_hash: &ethers::types::TxHash,
) -> reqwest_middleware::Result<Response<Transaction>> {
let response = client
.inner
.get(
client
.url
.join(&format!("/api/v2/transactions/{transaction_hash:#x}"))
.unwrap(),
)
.send()
.await?;
Response::try_from_reqwest_response(response).await
}

impl BlockscoutClient {
#[instrument(name = "blockscout_api:transaction", skip_all, err, level = "debug")]
#[instrument(name = "blockscout_api:transaction", skip(self), err, level = "debug")]
pub async fn transaction(
&self,
transaction_hash: &ethers::types::TxHash,
) -> reqwest_middleware::Result<Response<Transaction>> {
let response = self
.inner
.get(
self.url
.join(&format!("/api/v2/transactions/{transaction_hash:#x}"))
.unwrap(),
)
.send()
.await?;
Response::try_from_reqwest_response(response).await
cached_transaction(self, transaction_hash).await
}

pub async fn transactions_batch(
self: Arc<Self>,
transaction_hashes: Vec<&ethers::types::TxHash>,
) -> reqwest_middleware::Result<Vec<(TxHash, Response<Transaction>)>> {
let fetches = futures::stream::iter(transaction_hashes.into_iter().cloned().map(|hash| {
transaction_hashes: impl IntoIterator<Item = ethers::types::TxHash>,
) -> reqwest_middleware::Result<HashMap<TxHash, Response<Transaction>>> {
let fetches = futures::stream::iter(transaction_hashes.into_iter().map(|hash| {
let client = self.clone();
async move {
let result = client.transaction(&hash).await;
result.map(|r| (TxHash::clone(&hash), r))
}
}))
.buffer_unordered(MAX_REQUESTS_BATCH)
.buffer_unordered(self.max_concurrent_requests)
.collect::<Vec<_>>();
let result = fetches.await.into_iter().collect::<Result<Vec<_>, _>>()?;
let result = fetches
.await
.into_iter()
.collect::<Result<HashMap<_, _>, _>>()?;
Ok(result)
}
}
2 changes: 0 additions & 2 deletions blockscout-ens/bens-logic/src/subgraphs_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ pub mod blockscout;
mod reader;
mod schema_selector;
mod sql;
#[cfg(test)]
mod test_helpers;
mod types;

pub use reader::{SubgraphReadError, SubgraphReader};
Expand Down
Loading

0 comments on commit a7c664e

Please sign in to comment.