Skip to content

Commit

Permalink
Revert "refactor: Flatten allocations per indexing (#365)" (#368)
Browse files Browse the repository at this point in the history
This reverts commit 37b896a.
  • Loading branch information
Theodus authored Aug 9, 2023
1 parent f06c6ce commit 30e3108
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 51 deletions.
13 changes: 8 additions & 5 deletions graph-gateway/src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,10 +427,13 @@ async fn handle_client_query_inner(
.iter()
.flat_map(|deployment| {
let id = deployment.id;
deployment.indexers.iter().map(move |indexer| Indexing {
indexer: indexer.id,
deployment: id,
})
deployment
.allocations
.iter()
.map(move |allocation| Indexing {
indexer: allocation.indexer.id,
deployment: id,
})
})
.collect::<BTreeSet<Indexing>>()
.into_iter()
Expand Down Expand Up @@ -1029,7 +1032,7 @@ mod test {
min_block: 0,
}),
version: version.map(|v| Arc::new(v.parse().unwrap())),
indexers: vec![],
allocations: vec![],
subgraphs: BTreeSet::new(),
transferred_to_l2: false,
})
Expand Down
9 changes: 5 additions & 4 deletions graph-gateway/src/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use prelude::{epoch_cache::EpochCache, graphql, url::url::Host, *};

use crate::geoip::GeoIP;
use crate::subgraph_client::graphql_query;
use crate::topology::{Deployment, Indexer};
use crate::topology::{Allocation, Deployment, Indexer};

pub struct IndexingStatus {
pub chain: String,
Expand Down Expand Up @@ -70,12 +70,13 @@ async fn update_statuses(
client: reqwest::Client,
deployments: &HashMap<DeploymentId, Arc<Deployment>>,
) {
let indexers: Vec<&Indexer> = deployments
let indexers: HashMap<Address, &Indexer> = deployments
.values()
.flat_map(|deployment| &deployment.indexers)
.flat_map(|deployment| &deployment.allocations)
.map(|Allocation { indexer, .. }| (indexer.id, indexer))
.collect();

let statuses = join_all(indexers.iter().map(move |indexer| {
let statuses = join_all(indexers.values().map(move |indexer| {
let client = client.clone();
async move {
match update_indexer(actor, client, indexer).await {
Expand Down
20 changes: 10 additions & 10 deletions graph-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use graph_gateway::{
reports,
reports::KafkaClient,
subgraph_client, subgraph_studio, subscriptions_subgraph,
topology::{Deployment, GraphNetwork, Indexer},
topology::{Allocation, Deployment, GraphNetwork},
vouchers, JsonResponse,
};
use indexer_selection::{
Expand Down Expand Up @@ -386,17 +386,17 @@ async fn write_indexer_inputs(
) {
tracing::info!(
deployments = deployments.len(),
indexings = deployments
allocations = deployments
.values()
.map(|d| d.indexers.len())
.map(|d| d.allocations.len())
.sum::<usize>(),
indexing_statuses = indexing_statuses.len(),
);

let mut indexers: HashMap<Address, IndexerUpdate> = deployments
.values()
.flat_map(|deployment| &deployment.indexers)
.map(|indexer| {
.flat_map(|deployment| &deployment.allocations)
.map(|Allocation { indexer, .. }| {
let update = IndexerUpdate {
info: Arc::new(IndexerInfo {
stake: indexer.staked_tokens,
Expand Down Expand Up @@ -426,14 +426,14 @@ async fn write_indexer_inputs(
let allocations: HashMap<Address, GRT> = deployments
.get(&indexing.deployment)
.into_iter()
.flat_map(|deployment| &deployment.indexers)
.filter(|indexer| indexer.id == indexing.indexer)
.flat_map(|deployment| &deployment.allocations)
.filter(|Allocation { indexer, .. }| indexer.id == indexing.indexer)
.map(
|Indexer {
largest_allocation,
|Allocation {
id,
allocated_tokens,
..
}| (*largest_allocation, *allocated_tokens),
}| (*id, *allocated_tokens),
)
.collect();

Expand Down
48 changes: 16 additions & 32 deletions graph-gateway/src/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,24 @@ pub struct Deployment {
pub id: DeploymentId,
pub manifest: Arc<Manifest>,
pub version: Option<Arc<semver::Version>>,
/// An indexer may have multiple active allocations on a deployment. We collapse them into a single logical
/// allocation using the largest allocation ID and sum of the allocated tokens.
pub indexers: Vec<Indexer>,
pub allocations: Vec<Allocation>,
/// A deployment may be associated with multiple subgraphs.
pub subgraphs: BTreeSet<SubgraphId>,
/// Indicates that the deployment should not be served directly by this gateway. This will
/// always be false when `allocations > 0`.
pub transferred_to_l2: bool,
}

pub struct Allocation {
pub id: Address,
pub allocated_tokens: GRT,
pub indexer: Indexer,
}

pub struct Indexer {
pub id: Address,
pub url: Url,
pub staked_tokens: GRT,
pub largest_allocation: Address,
pub allocated_tokens: GRT,
}

pub struct Manifest {
Expand Down Expand Up @@ -150,40 +152,22 @@ impl GraphNetwork {
})
.map(|subgraph| subgraph.id)
.collect();

// extract indexer info from each allocation
let allocations: Vec<Indexer> = version
let allocations = version
.subgraph_deployment
.allocations
.iter()
.filter_map(|allocation| {
Some(Indexer {
id: allocation.indexer.id,
url: allocation.indexer.url.as_ref()?.parse().ok()?,
staked_tokens: allocation.indexer.staked_tokens.change_precision(),
largest_allocation: allocation.id,
Some(Allocation {
id: allocation.id,
allocated_tokens: allocation.allocated_tokens.change_precision(),
indexer: Indexer {
id: allocation.indexer.id,
url: allocation.indexer.url.as_ref()?.parse().ok()?,
staked_tokens: allocation.indexer.staked_tokens.change_precision(),
},
})
})
.collect();
// TODO: remove need for itertools here: https://github.com/rust-lang/rust/issues/80552
use itertools::Itertools as _;
let indexers: Vec<Indexer> = allocations
.into_iter()
.map(|indexer| (*indexer.id, indexer))
.into_group_map()
.into_iter()
.filter_map(|(_, allocations)| {
let total_allocation: GRT = allocations.iter().map(|a| a.allocated_tokens).sum();
let max_allocation = allocations.iter().map(|a| a.allocated_tokens).max()?;
let mut indexer = allocations
.into_iter()
.find(|a| a.allocated_tokens == max_allocation)?;
indexer.allocated_tokens = total_allocation;
Some(indexer)
})
.collect();

let transferred_to_l2 = version.subgraph_deployment.transferred_to_l2
&& version.subgraph_deployment.allocations.is_empty();

Expand All @@ -201,7 +185,7 @@ impl GraphNetwork {
manifest,
version,
subgraphs,
indexers,
allocations,
transferred_to_l2,
}))
}
Expand Down

0 comments on commit 30e3108

Please sign in to comment.