Skip to content

Commit

Permalink
feat(graph-gateway): added indexers status public proof of indexing c…
Browse files Browse the repository at this point in the history
…lient (#372)
  • Loading branch information
LNSD authored Aug 11, 2023
1 parent 3a2c785 commit 3843d2a
Show file tree
Hide file tree
Showing 10 changed files with 523 additions and 0 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions graph-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ tracing-subscriber = { version = "0.3.16", features = [
"json",
"parking_lot",
] }
indoc = "2.0.3"

[dev-dependencies]
assert_matches = "1.5.0"
hyper = "*"
regex = "1.5"
tokio = { version = "1.28.2", features = ["macros"] }
2 changes: 2 additions & 0 deletions graph-gateway/src/indexers_status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mod graphql;
pub mod public_poi;
42 changes: 42 additions & 0 deletions graph-gateway/src/indexers_status/graphql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use serde::de::DeserializeOwned;
use serde_json::json;

use prelude::graphql::http::Response;
use prelude::{anyhow, reqwest, Url};

/// Trait for types that can be converted into a GraphQL query.
pub trait IntoGraphqlQuery {
fn to_query(&self) -> String;
}

/// Send a GraphQL query to a given URL.
///
/// This function is a wrapper around `reqwest::Client` that:
/// - Sets the `Content-Type` header to `application/json`.
/// - Sets the request body to the given query.
/// - Deserializes the response body into the given type.
// TODO: Improve error handling. Define custom error enum.
pub async fn send_graphql_query<T>(
client: &reqwest::Client,
url: Url,
query: impl IntoGraphqlQuery,
) -> anyhow::Result<T>
where
T: DeserializeOwned,
{
let query = query.to_query();
let body = &json!({ "query": query });

let response = client.post(url.0).json(body).send().await?;

let status = response.status();
let body = response
.json::<Response<T>>()
.await
.map_err(|err| anyhow::anyhow!("Response body deserialization failed: {}", err))?;

// The GraphQL server returns a 400 if the query is invalid together with a JSON object
// containing the error message.
body.unpack()
.map_err(|err| anyhow::anyhow!("GraphQL query failed with status {}: {}", status, err))
}
4 changes: 4 additions & 0 deletions graph-gateway/src/indexers_status/public_poi.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub use query::*;

pub mod client;
mod query;
65 changes: 65 additions & 0 deletions graph-gateway/src/indexers_status/public_poi/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::collections::HashMap;

use itertools::Itertools;

use prelude::{anyhow, reqwest, DeploymentId, Url};

use crate::indexers_status::graphql;
use crate::indexers_status::public_poi::query;
use crate::poi::{BlockNumber, ProofOfIndexing};

// To convert from query OI type into `poi` module POI types.
impl From<query::ProofOfIndexing> for ProofOfIndexing {
fn from(value: query::ProofOfIndexing) -> Self {
Self(value.0)
}
}

pub async fn send_public_poi_query(
client: reqwest::Client,
status_url: Url,
query: query::PublicProofOfIndexingQuery,
) -> anyhow::Result<query::PublicProofOfIndexingResponse> {
graphql::send_graphql_query(&client, status_url, query).await
}

pub async fn send_public_poi_queries_and_merge_results(
client: reqwest::Client,
status_url: Url,
requests: impl IntoIterator<Item = (DeploymentId, BlockNumber)>,
batch_size: usize,
) -> HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> {
// Build the query batches and create the futures
let queries = requests
.into_iter()
.map(
|(deployment, block_number)| query::PublicProofOfIndexingRequest {
deployment,
block_number,
},
)
.chunks(batch_size)
.into_iter()
.map(|requests| query::PublicProofOfIndexingQuery {
requests: requests.collect(),
})
.map(|query| send_public_poi_query(client.clone(), status_url.clone(), query))
.collect::<Vec<_>>();

// Send all queries concurrently
let responses = futures::future::join_all(queries).await;

let response_map: HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> = responses
.into_iter()
// TODO: Handle errors (e.g., log them with trace level).
.filter_map(|response| response.ok())
.flat_map(|response| response.public_proofs_of_indexing)
.filter_map(|response| {
// If the response is missing the POI field, skip it.
let poi = response.proof_of_indexing?.into();
Some(((response.deployment, response.block.number), poi))
})
.collect::<HashMap<_, _>>();

response_map
}
206 changes: 206 additions & 0 deletions graph-gateway/src/indexers_status/public_poi/query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
use indoc::indoc;
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

use prelude::{anyhow, bytes_wrapper, faster_hex, DeploymentId};

use crate::indexers_status::graphql::IntoGraphqlQuery;

pub const MAX_REQUESTS_PER_QUERY: usize = 10;

pub type BlockNumber = u64;

bytes_wrapper!(pub, ProofOfIndexing, 32, "HexStr");

#[derive(Clone, Debug)]
pub struct PublicProofOfIndexingQuery {
pub requests: Vec<PublicProofOfIndexingRequest>,
}

#[derive(Clone, Debug)]
pub struct PublicProofOfIndexingRequest {
pub deployment: DeploymentId,
pub block_number: BlockNumber,
}

impl PublicProofOfIndexingRequest {
fn to_query_params(&self) -> String {
format!(
r#"{{ deployment: "{}", blockNumber: "{}" }}"#,
self.deployment, self.block_number
)
}
}

impl IntoGraphqlQuery for PublicProofOfIndexingQuery {
fn to_query(&self) -> String {
debug_assert!(!self.requests.is_empty(), "Must have at least one request");

let requests = self
.requests
.iter()
.map(|request| request.to_query_params())
.collect::<Vec<_>>()
.join(", ");

format!(
indoc! {
r#"{{
publicProofsOfIndexing(requests: [{}]) {{
deployment
proofOfIndexing
block {{ number }}
}}
}}"#
},
requests
)
}
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PublicProofOfIndexingResponse {
pub public_proofs_of_indexing: Vec<PublicProofOfIndexingResult>,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PublicProofOfIndexingResult {
pub deployment: DeploymentId,
pub block: PartialBlockPtr,
pub proof_of_indexing: Option<ProofOfIndexing>,
}

#[serde_as]
#[derive(Debug, Deserialize)]
pub struct PartialBlockPtr {
#[serde_as(as = "DisplayFromStr")]
pub number: BlockNumber,
}

#[cfg(test)]
mod tests {
use super::*;

mod query {
use super::*;

#[test]
fn public_proof_of_indexing_request_to_query_params_format() {
//// Given
let request = PublicProofOfIndexingRequest {
deployment: "QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw"
.parse()
.expect("Failed to parse deployment ID"),
block_number: 123,
};

//// When
let query = request.to_query_params();

//// Then
assert_eq!(
query.as_str(),
"{ deployment: \"QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw\", blockNumber: \"123\" }"
);
}

#[test]
fn create_status_public_pois_query() {
//// Given
let query = PublicProofOfIndexingQuery {
requests: vec![
PublicProofOfIndexingRequest {
deployment: "QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw"
.parse()
.expect("Failed to parse deployment ID"),
block_number: 123,
},
PublicProofOfIndexingRequest {
deployment: "QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH"
.parse()
.expect("Failed to parse deployment ID"),
block_number: 456,
},
],
};

//// When
let query = query.to_query();

//// Then
assert_eq!(
query.as_str(),
indoc! { r#"{
publicProofsOfIndexing(requests: [{ deployment: "QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw", blockNumber: "123" }, { deployment: "QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH", blockNumber: "456" }]) {
deployment
proofOfIndexing
block { number }
}
}"# }
);
}
}

mod response {
use super::*;

#[test]
fn deserialize_public_pois_response() {
//// Given
let response = indoc! {
r#"{
"publicProofsOfIndexing": [
{
"deployment": "QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH",
"proofOfIndexing": "0xba8a057796a81e013789789996551bb5b2920fb9947334db956992f7098bd287",
"block": {
"number": "123"
}
},
{
"deployment": "QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw",
"block": {
"number": "456"
}
}
]
}"#
};

//// When
let response: PublicProofOfIndexingResponse =
serde_json::from_str(response).expect("Failed to deserialize response");

//// Then
assert_eq!(response.public_proofs_of_indexing.len(), 2);
assert_eq!(
response.public_proofs_of_indexing[0].deployment,
"QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH"
.parse()
.unwrap()
);
assert_eq!(
response.public_proofs_of_indexing[0].proof_of_indexing,
Some(
"0xba8a057796a81e013789789996551bb5b2920fb9947334db956992f7098bd287"
.parse()
.unwrap()
)
);
assert_eq!(response.public_proofs_of_indexing[0].block.number, 123);
assert_eq!(
response.public_proofs_of_indexing[1].deployment,
"QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw"
.parse()
.unwrap()
);
assert_eq!(
response.public_proofs_of_indexing[1].proof_of_indexing,
None
);
assert_eq!(response.public_proofs_of_indexing[1].block.number, 456);
}
}
}
2 changes: 2 additions & 0 deletions graph-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ pub mod config;
pub mod fisherman_client;
pub mod geoip;
pub mod indexer_client;
pub mod indexers_status;
pub mod indexing;
pub mod ipfs;
pub mod metrics;
pub mod network_subgraph;
pub mod poi;
pub mod price_automation;
pub mod receipts;
pub mod reports;
Expand Down
Loading

0 comments on commit 3843d2a

Please sign in to comment.