Skip to content

Commit

Permalink
feat(graph-gateway): added indexers lookup table to graph network (#371)
Browse files Browse the repository at this point in the history
  • Loading branch information
LNSD authored Aug 11, 2023
1 parent 3843d2a commit 9fcf45e
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 25 deletions.
10 changes: 2 additions & 8 deletions graph-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use graph_gateway::{
reports,
reports::KafkaClient,
subgraph_client, subgraph_studio, subscriptions_subgraph,
topology::{Deployment, GraphNetwork, Indexer},
topology::{Deployment, GraphNetwork},
vouchers, JsonResponse,
};
use indexer_selection::{
Expand Down Expand Up @@ -428,13 +428,7 @@ async fn write_indexer_inputs(
.into_iter()
.flat_map(|deployment| &deployment.indexers)
.filter(|indexer| indexer.id == indexing.indexer)
.map(
|Indexer {
largest_allocation,
allocated_tokens,
..
}| (*largest_allocation, *allocated_tokens),
)
.map(|indexer| (indexer.largest_allocation, indexer.allocated_tokens))
.collect();

receipt_pools
Expand Down
75 changes: 58 additions & 17 deletions graph-gateway/src/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{

use chrono::Utc;
use futures_util::future::join_all;
use itertools::Itertools;
use serde::Deserialize;

use prelude::{anyhow::anyhow, eventuals::EventualExt as _, tokio::sync::RwLock, *};
Expand All @@ -16,6 +17,7 @@ use crate::{ipfs, network_subgraph};
pub struct GraphNetwork {
pub subgraphs: Eventual<Ptr<HashMap<SubgraphId, Subgraph>>>,
pub deployments: Eventual<Ptr<HashMap<DeploymentId, Arc<Deployment>>>>,
pub indexers: Eventual<Ptr<HashMap<Address, Arc<Indexer>>>>,
}

/// In an effort to keep the ownership structure a simple tree, this only contains the info required
Expand All @@ -36,14 +38,20 @@ pub struct Deployment {
pub version: Option<Arc<semver::Version>>,
/// An indexer may have multiple active allocations on a deployment. We collapse them into a single logical
/// allocation using the largest allocation ID and sum of the allocated tokens.
pub indexers: Vec<Indexer>,
pub indexers: Vec<Arc<Indexer>>,
/// A deployment may be associated with multiple subgraphs.
pub subgraphs: BTreeSet<SubgraphId>,
/// Indicates that the deployment should not be served directly by this gateway. This will
/// always be false when `allocations > 0`.
pub transferred_to_l2: bool,
}

pub struct Allocation {
pub id: Address,
pub allocated_tokens: GRT,
pub indexer: Arc<Indexer>,
}

pub struct Indexer {
pub id: Address,
pub url: Url,
Expand All @@ -52,6 +60,20 @@ pub struct Indexer {
pub allocated_tokens: GRT,
}

impl Indexer {
pub fn cost_url(&self) -> Url {
// Indexer URLs are validated when they are added to the network, so this should never fail.
// 7f2f89aa-24c9-460b-ab1e-fc94697c4f4
self.url.join("cost").unwrap().into()
}

pub fn status_url(&self) -> Url {
// Indexer URLs are validated when they are added to the network, so this should never fail.
// 7f2f89aa-24c9-460b-ab1e-fc94697c4f4
self.url.join("status").unwrap().into()
}
}

pub struct Manifest {
pub network: String,
pub features: Vec<String>,
Expand All @@ -70,9 +92,13 @@ impl GraphNetwork {
metadata: HashMap::new(),
})));

// Create a lookup table for subgraphs, keyed by their ID.
// Invalid URL indexers are filtered out. See: 7f2f89aa-24c9-460b-ab1e-fc94697c4f4
let subgraphs = subgraphs.map(move |subgraphs| async move {
Ptr::new(Self::subgraphs(&subgraphs, cache, l2_transfer_delay).await)
});

// Create a lookup table for deployments, keyed by their ID (which is also their IPFS hash).
let deployments = subgraphs.clone().map(|subgraphs| async move {
subgraphs
.values()
Expand All @@ -82,14 +108,26 @@ impl GraphNetwork {
.into()
});

// Create a lookup table for indexers, keyed by their ID (which is also their address).
let indexers = subgraphs.clone().map(|subgraphs| async move {
subgraphs
.values()
.flat_map(|subgraph| &subgraph.deployments)
.flat_map(|deployment| &deployment.indexers)
.map(|indexer| (indexer.id, indexer.clone()))
.collect::<HashMap<Address, Arc<Indexer>>>()
.into()
});

// Return only after eventuals have values, to avoid serving client queries prematurely.
if deployments.value().await.is_err() {
if deployments.value().await.is_err() || indexers.value().await.is_err() {
panic!("Failed to await Graph network topology");
}

Self {
subgraphs,
deployments,
indexers,
}
}

Expand Down Expand Up @@ -152,26 +190,28 @@ impl GraphNetwork {
.collect();

// extract indexer info from each allocation
let allocations: Vec<Indexer> = version
let indexers = version
.subgraph_deployment
.allocations
.iter()
.filter_map(|allocation| {
Some(Indexer {
id: allocation.indexer.id,
url: allocation.indexer.url.as_ref()?.parse().ok()?,
staked_tokens: allocation.indexer.staked_tokens.change_precision(),
largest_allocation: allocation.id,
allocated_tokens: allocation.allocated_tokens.change_precision(),
})
// If indexer URL parsing fails, the allocation is ignored (filtered out).
// 7f2f89aa-24c9-460b-ab1e-fc94697c4f4
let url = allocation.indexer.url.as_ref()?.parse().ok()?;

let id = allocation.indexer.id;
Some((
id,
Indexer {
id,
url,
staked_tokens: allocation.indexer.staked_tokens.change_precision(),
largest_allocation: allocation.id,
allocated_tokens: allocation.allocated_tokens.change_precision(),
},
))
})
.collect();
// TODO: remove need for itertools here: https://github.com/rust-lang/rust/issues/80552
use itertools::Itertools as _;
let indexers: Vec<Indexer> = allocations
.into_iter()
.map(|indexer| (*indexer.id, indexer))
.into_group_map()
.into_group_map() // TODO: remove need for itertools here: https://github.com/rust-lang/rust/issues/80552
.into_iter()
.filter_map(|(_, mut allocations)| {
let total_allocation: GRT = allocations.iter().map(|a| a.allocated_tokens).sum();
Expand All @@ -180,6 +220,7 @@ impl GraphNetwork {
indexer.allocated_tokens = total_allocation;
Some(indexer)
})
.map(Arc::new)
.collect();

let transferred_to_l2 = version.subgraph_deployment.transferred_to_l2
Expand Down

0 comments on commit 9fcf45e

Please sign in to comment.