Skip to content

Commit

Permalink
fix: stop overwhelming indexers on status updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Aug 11, 2023
1 parent 006fb43 commit 3a2c785
Showing 1 changed file with 24 additions and 24 deletions.
48 changes: 24 additions & 24 deletions graph-gateway/src/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use prelude::{epoch_cache::EpochCache, graphql, url::url::Host, *};

use crate::geoip::GeoIP;
use crate::subgraph_client::graphql_query;
use crate::topology::{Deployment, Indexer};
use crate::topology::Deployment;

pub struct IndexingStatus {
pub chain: String,
Expand Down Expand Up @@ -70,15 +70,17 @@ async fn update_statuses(
client: reqwest::Client,
deployments: &HashMap<DeploymentId, Arc<Deployment>>,
) {
let indexers: Vec<&Indexer> = deployments
// There can only be one URL per indexer entity in the network subgraph
let indexers: HashMap<Address, Url> = deployments
.values()
.flat_map(|deployment| &deployment.indexers)
.map(|indexer| (indexer.id, indexer.url.clone()))
.collect();

let statuses = join_all(indexers.iter().map(move |indexer| {
let statuses = join_all(indexers.into_iter().map(move |(indexer, url)| {
let client = client.clone();
async move {
match update_indexer(actor, client, indexer).await {
match update_indexer(actor, client, indexer, url).await {
Ok(indexings) => indexings,
Err(indexer_status_err) => {
tracing::warn!(indexer_status_err);
Expand All @@ -101,10 +103,10 @@ async fn update_statuses(
async fn update_indexer(
actor: &'static Mutex<Actor>,
client: reqwest::Client,
indexer: &Indexer,
indexer: Address,
url: Url,
) -> Result<Vec<(Indexing, IndexingStatus)>, String> {
let version_url = indexer
.url
let version_url = url
.join("version")
.map_err(|err| format!("IndexerVersionError({err})"))?;
let version = client
Expand All @@ -123,30 +125,29 @@ async fn update_indexer(
if version < locked_actor.min_version {
return Err(format!("IndexerVersionBelowMinimum({version})"));
}
apply_geoblocking(&mut locked_actor, indexer).await?;
apply_geoblocking(&mut locked_actor, &url).await?;
drop(locked_actor);

query_status(actor, &client, indexer)
query_status(actor, &client, indexer, url)
.await
.map_err(|err| format!("IndexerStatusError({err})"))
}

async fn apply_geoblocking(actor: &mut Actor, indexer: &Indexer) -> Result<(), String> {
async fn apply_geoblocking(actor: &mut Actor, url: &Url) -> Result<(), String> {
let geoip = match &actor.geoip {
Some(geoip) => geoip,
None => return Ok(()),
};
let key = indexer.url.as_str();
if let Some(result) = actor.geoblocking_cache.get(key) {
let key = url.to_string();
if let Some(result) = actor.geoblocking_cache.get(&key) {
return result.clone();
}
async fn apply_geoblocking_inner(
dns_resolver: &DNSResolver,
geoip: &GeoIP,
indexer: &Indexer,
url: &Url,
) -> Result<(), String> {
let host = indexer
.url
let host = url
.host()
.ok_or_else(|| "host missing in URL".to_string())?;
let ips = match host {
Expand All @@ -166,19 +167,18 @@ async fn apply_geoblocking(actor: &mut Actor, indexer: &Indexer) -> Result<(), S
}
Ok(())
}
let result = apply_geoblocking_inner(&actor.dns_resolver, geoip, indexer).await;
actor
.geoblocking_cache
.insert(key.to_string(), result.clone());
let result = apply_geoblocking_inner(&actor.dns_resolver, geoip, url).await;
actor.geoblocking_cache.insert(key, result.clone());
result
}

async fn query_status(
actor: &'static Mutex<Actor>,
client: &reqwest::Client,
indexer: &Indexer,
indexer: Address,
url: Url,
) -> Result<Vec<(Indexing, IndexingStatus)>, String> {
let status_url = indexer.url.join("status").map_err(|err| err.to_string())?;
let status_url = url.join("status").map_err(|err| err.to_string())?;
let status_query = json!({ "query": r#"{
indexingStatuses(subgraphs: []) {
subgraph
Expand All @@ -195,7 +195,7 @@ async fn query_status(
.unpack()?
.indexing_statuses;

let cost_url = indexer.url.join("cost").map_err(|err| err.to_string())?;
let cost_url = url.join("cost").map_err(|err| err.to_string())?;
let deployments = statuses
.iter()
.map(|stat| stat.subgraph.to_string())
Expand Down Expand Up @@ -227,7 +227,7 @@ async fn query_status(
Err(cost_model_compile_err) => {
tracing::debug!(
%cost_model_compile_err,
indexer = %indexer.id,
%indexer,
deployment = %src.deployment,
);
return None;
Expand All @@ -242,7 +242,7 @@ async fn query_status(
.into_iter()
.filter_map(|status| {
let indexing = Indexing {
indexer: indexer.id,
indexer,
deployment: status.subgraph,
};
let chain = &status.chains.get(0)?;
Expand Down

0 comments on commit 3a2c785

Please sign in to comment.