diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs index fadaaf44..e809d81c 100644 --- a/graph-gateway/src/client_query.rs +++ b/graph-gateway/src/client_query.rs @@ -427,10 +427,13 @@ async fn handle_client_query_inner( .iter() .flat_map(|deployment| { let id = deployment.id; - deployment.indexers.iter().map(move |indexer| Indexing { - indexer: indexer.id, - deployment: id, - }) + deployment + .allocations + .iter() + .map(move |allocation| Indexing { + indexer: allocation.indexer.id, + deployment: id, + }) }) .collect::>() .into_iter() @@ -1029,7 +1032,7 @@ mod test { min_block: 0, }), version: version.map(|v| Arc::new(v.parse().unwrap())), - indexers: vec![], + allocations: vec![], subgraphs: BTreeSet::new(), transferred_to_l2: false, }) diff --git a/graph-gateway/src/indexing.rs b/graph-gateway/src/indexing.rs index cf59a433..953b13e8 100644 --- a/graph-gateway/src/indexing.rs +++ b/graph-gateway/src/indexing.rs @@ -14,7 +14,7 @@ use prelude::{epoch_cache::EpochCache, graphql, url::url::Host, *}; use crate::geoip::GeoIP; use crate::subgraph_client::graphql_query; -use crate::topology::{Deployment, Indexer}; +use crate::topology::{Allocation, Deployment, Indexer}; pub struct IndexingStatus { pub chain: String, @@ -70,12 +70,13 @@ async fn update_statuses( client: reqwest::Client, deployments: &HashMap>, ) { - let indexers: Vec<&Indexer> = deployments + let indexers: HashMap = deployments .values() - .flat_map(|deployment| &deployment.indexers) + .flat_map(|deployment| &deployment.allocations) + .map(|Allocation { indexer, .. }| (indexer.id, indexer)) .collect(); - let statuses = join_all(indexers.iter().map(move |indexer| { + let statuses = join_all(indexers.values().map(move |indexer| { let client = client.clone(); async move { match update_indexer(actor, client, indexer).await { diff --git a/graph-gateway/src/main.rs b/graph-gateway/src/main.rs index 23996c0e..36d1d57e 100644 --- a/graph-gateway/src/main.rs +++ b/graph-gateway/src/main.rs @@ -42,7 +42,7 @@ use graph_gateway::{ reports, reports::KafkaClient, subgraph_client, subgraph_studio, subscriptions_subgraph, - topology::{Deployment, GraphNetwork, Indexer}, + topology::{Allocation, Deployment, GraphNetwork}, vouchers, JsonResponse, }; use indexer_selection::{ @@ -386,17 +386,17 @@ async fn write_indexer_inputs( ) { tracing::info!( deployments = deployments.len(), - indexings = deployments + allocations = deployments .values() - .map(|d| d.indexers.len()) + .map(|d| d.allocations.len()) .sum::(), indexing_statuses = indexing_statuses.len(), ); let mut indexers: HashMap = deployments .values() - .flat_map(|deployment| &deployment.indexers) - .map(|indexer| { + .flat_map(|deployment| &deployment.allocations) + .map(|Allocation { indexer, .. }| { let update = IndexerUpdate { info: Arc::new(IndexerInfo { stake: indexer.staked_tokens, @@ -426,14 +426,14 @@ async fn write_indexer_inputs( let allocations: HashMap = deployments .get(&indexing.deployment) .into_iter() - .flat_map(|deployment| &deployment.indexers) - .filter(|indexer| indexer.id == indexing.indexer) + .flat_map(|deployment| &deployment.allocations) + .filter(|Allocation { indexer, .. }| indexer.id == indexing.indexer) .map( - |Indexer { - largest_allocation, + |Allocation { + id, allocated_tokens, .. - }| (*largest_allocation, *allocated_tokens), + }| (*id, *allocated_tokens), ) .collect(); diff --git a/graph-gateway/src/topology.rs b/graph-gateway/src/topology.rs index 7660d95e..9794ef6f 100644 --- a/graph-gateway/src/topology.rs +++ b/graph-gateway/src/topology.rs @@ -34,9 +34,7 @@ pub struct Deployment { pub id: DeploymentId, pub manifest: Arc, pub version: Option>, - /// An indexer may have multiple active allocations on a deployment. We collapse them into a single logical - /// allocation using the largest allocation ID and sum of the allocated tokens. - pub indexers: Vec, + pub allocations: Vec, /// A deployment may be associated with multiple subgraphs. pub subgraphs: BTreeSet, /// Indicates that the deployment should not be served directly by this gateway. This will @@ -44,12 +42,16 @@ pub struct Deployment { pub transferred_to_l2: bool, } +pub struct Allocation { + pub id: Address, + pub allocated_tokens: GRT, + pub indexer: Indexer, +} + pub struct Indexer { pub id: Address, pub url: Url, pub staked_tokens: GRT, - pub largest_allocation: Address, - pub allocated_tokens: GRT, } pub struct Manifest { @@ -150,40 +152,22 @@ impl GraphNetwork { }) .map(|subgraph| subgraph.id) .collect(); - - // extract indexer info from each allocation - let allocations: Vec = version + let allocations = version .subgraph_deployment .allocations .iter() .filter_map(|allocation| { - Some(Indexer { - id: allocation.indexer.id, - url: allocation.indexer.url.as_ref()?.parse().ok()?, - staked_tokens: allocation.indexer.staked_tokens.change_precision(), - largest_allocation: allocation.id, + Some(Allocation { + id: allocation.id, allocated_tokens: allocation.allocated_tokens.change_precision(), + indexer: Indexer { + id: allocation.indexer.id, + url: allocation.indexer.url.as_ref()?.parse().ok()?, + staked_tokens: allocation.indexer.staked_tokens.change_precision(), + }, }) }) .collect(); - // TODO: remove need for itertools here: https://github.com/rust-lang/rust/issues/80552 - use itertools::Itertools as _; - let indexers: Vec = allocations - .into_iter() - .map(|indexer| (*indexer.id, indexer)) - .into_group_map() - .into_iter() - .filter_map(|(_, allocations)| { - let total_allocation: GRT = allocations.iter().map(|a| a.allocated_tokens).sum(); - let max_allocation = allocations.iter().map(|a| a.allocated_tokens).max()?; - let mut indexer = allocations - .into_iter() - .find(|a| a.allocated_tokens == max_allocation)?; - indexer.allocated_tokens = total_allocation; - Some(indexer) - }) - .collect(); - let transferred_to_l2 = version.subgraph_deployment.transferred_to_l2 && version.subgraph_deployment.allocations.is_empty(); @@ -201,7 +185,7 @@ impl GraphNetwork { manifest, version, subgraphs, - indexers, + allocations, transferred_to_l2, })) }