From 3a2c7851ffac5ffc82b531afd4a417578f617539 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Fri, 11 Aug 2023 13:48:45 -0400 Subject: [PATCH] fix: stop overwhelming indexers on status updates --- graph-gateway/src/indexing.rs | 48 +++++++++++++++++------------------ 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/graph-gateway/src/indexing.rs b/graph-gateway/src/indexing.rs index cf59a433..9c3dc25c 100644 --- a/graph-gateway/src/indexing.rs +++ b/graph-gateway/src/indexing.rs @@ -14,7 +14,7 @@ use prelude::{epoch_cache::EpochCache, graphql, url::url::Host, *}; use crate::geoip::GeoIP; use crate::subgraph_client::graphql_query; -use crate::topology::{Deployment, Indexer}; +use crate::topology::Deployment; pub struct IndexingStatus { pub chain: String, @@ -70,15 +70,17 @@ async fn update_statuses( client: reqwest::Client, deployments: &HashMap>, ) { - let indexers: Vec<&Indexer> = deployments + // There can only be one URL per indexer entity in the network subgraph + let indexers: HashMap = deployments .values() .flat_map(|deployment| &deployment.indexers) + .map(|indexer| (indexer.id, indexer.url.clone())) .collect(); - let statuses = join_all(indexers.iter().map(move |indexer| { + let statuses = join_all(indexers.into_iter().map(move |(indexer, url)| { let client = client.clone(); async move { - match update_indexer(actor, client, indexer).await { + match update_indexer(actor, client, indexer, url).await { Ok(indexings) => indexings, Err(indexer_status_err) => { tracing::warn!(indexer_status_err); @@ -101,10 +103,10 @@ async fn update_statuses( async fn update_indexer( actor: &'static Mutex, client: reqwest::Client, - indexer: &Indexer, + indexer: Address, + url: Url, ) -> Result, String> { - let version_url = indexer - .url + let version_url = url .join("version") .map_err(|err| format!("IndexerVersionError({err})"))?; let version = client @@ -123,30 +125,29 @@ async fn update_indexer( if version < locked_actor.min_version { return Err(format!("IndexerVersionBelowMinimum({version})")); } - apply_geoblocking(&mut locked_actor, indexer).await?; + apply_geoblocking(&mut locked_actor, &url).await?; drop(locked_actor); - query_status(actor, &client, indexer) + query_status(actor, &client, indexer, url) .await .map_err(|err| format!("IndexerStatusError({err})")) } -async fn apply_geoblocking(actor: &mut Actor, indexer: &Indexer) -> Result<(), String> { +async fn apply_geoblocking(actor: &mut Actor, url: &Url) -> Result<(), String> { let geoip = match &actor.geoip { Some(geoip) => geoip, None => return Ok(()), }; - let key = indexer.url.as_str(); - if let Some(result) = actor.geoblocking_cache.get(key) { + let key = url.to_string(); + if let Some(result) = actor.geoblocking_cache.get(&key) { return result.clone(); } async fn apply_geoblocking_inner( dns_resolver: &DNSResolver, geoip: &GeoIP, - indexer: &Indexer, + url: &Url, ) -> Result<(), String> { - let host = indexer - .url + let host = url .host() .ok_or_else(|| "host missing in URL".to_string())?; let ips = match host { @@ -166,19 +167,18 @@ async fn apply_geoblocking(actor: &mut Actor, indexer: &Indexer) -> Result<(), S } Ok(()) } - let result = apply_geoblocking_inner(&actor.dns_resolver, geoip, indexer).await; - actor - .geoblocking_cache - .insert(key.to_string(), result.clone()); + let result = apply_geoblocking_inner(&actor.dns_resolver, geoip, url).await; + actor.geoblocking_cache.insert(key, result.clone()); result } async fn query_status( actor: &'static Mutex, client: &reqwest::Client, - indexer: &Indexer, + indexer: Address, + url: Url, ) -> Result, String> { - let status_url = indexer.url.join("status").map_err(|err| err.to_string())?; + let status_url = url.join("status").map_err(|err| err.to_string())?; let status_query = json!({ "query": r#"{ indexingStatuses(subgraphs: []) { subgraph @@ -195,7 +195,7 @@ async fn query_status( .unpack()? .indexing_statuses; - let cost_url = indexer.url.join("cost").map_err(|err| err.to_string())?; + let cost_url = url.join("cost").map_err(|err| err.to_string())?; let deployments = statuses .iter() .map(|stat| stat.subgraph.to_string()) @@ -227,7 +227,7 @@ async fn query_status( Err(cost_model_compile_err) => { tracing::debug!( %cost_model_compile_err, - indexer = %indexer.id, + %indexer, deployment = %src.deployment, ); return None; @@ -242,7 +242,7 @@ async fn query_status( .into_iter() .filter_map(|status| { let indexing = Indexing { - indexer: indexer.id, + indexer, deployment: status.subgraph, }; let chain = &status.chains.get(0)?;