Skip to content

Commit

Permalink
feat: indexer management client basics
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Aug 9, 2023
1 parent 1997e9d commit a6e7f8d
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 9 deletions.
2 changes: 2 additions & 0 deletions subgraph-radio/benches/gossips.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ fn gossip_poi_bench(c: &mut Criterion) {
network_subgraph: String::from(
"https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-goerli",
),
indexer_management_server_endpoint: None,
},
waku: Waku {
waku_host: None,
Expand Down Expand Up @@ -65,6 +66,7 @@ fn gossip_poi_bench(c: &mut Criterion) {
topic_update_interval: 600,
log_format: LogFormat::Pretty,
graphcast_network: GraphcastNetworkName::Testnet,
auto_upgrade: CoverageLevel::Comprehensive,
},
config_file: None,
});
Expand Down
24 changes: 22 additions & 2 deletions subgraph-radio/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ use graphcast_sdk::{
message_typing::IdentityValidation, GraphcastAgentConfig, GraphcastAgentError,
},
graphql::{client_network::query_network_subgraph, QueryError},
init_tracing, wallet_address,
init_tracing, wallet_address, GraphcastNetworkName, LogFormat,
};
use graphcast_sdk::{GraphcastNetworkName, LogFormat};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use tracing::{debug, info, trace};
Expand All @@ -22,6 +21,7 @@ use crate::{active_allocation_hashes, syncing_deployment_hashes};

#[derive(clap::ValueEnum, Clone, Debug, Serialize, Deserialize, Default)]
pub enum CoverageLevel {
None,
Minimal,
#[default]
OnChain,
Expand Down Expand Up @@ -160,6 +160,7 @@ impl Config {
pub async fn generate_topics(&self, indexer_address: String) -> Vec<String> {
let static_topics = HashSet::from_iter(self.radio_infrastructure().topics.to_vec());
let topics = match self.radio_infrastructure().coverage {
CoverageLevel::None => HashSet::new(),
CoverageLevel::Minimal => static_topics,
CoverageLevel::OnChain => {
let mut topics: HashSet<String> =
Expand Down Expand Up @@ -244,6 +245,13 @@ pub struct GraphStack {
help = "Mnemonic to the Graphcast ID wallet (first address of the wallet is used; Only one of private key or mnemonic is needed)",
)]
pub mnemonic: Option<String>,
#[clap(
long,
value_name = "ENDPOINT",
env = "INDEXER_MANAGEMENT_SERVER_ENDPOINT",
help = "API endpoint to the Indexer management server endpoint"
)]
pub indexer_management_server_endpoint: Option<String>,
}

#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)]
Expand Down Expand Up @@ -277,6 +285,18 @@ pub struct RadioInfrastructure {
Default: comprehensive"
)]
pub coverage: CoverageLevel,
#[clap(
long,
value_name = "COVERAGE",
value_enum,
default_value = "comprehensive",
env = "AUTO_UPGRADE",
help = "Toggle for the types of subgraph the radio send offchain syncing commands to indexer management server. Default to upgrade all syncing deployments",
long_help = "Topic upgrade coverage level\ncomprehensive: on-chain allocations, user defined static topics, and additional topics\n
on-chain: Subscribe to on-chain topics and user defined static topics\nminimal: Only subscribe to user defined static topics.\n
none: no automatic upgrade, only notifications.\nDefault: comprehensive"
)]
pub auto_upgrade: CoverageLevel,
#[clap(
long,
value_parser = value_parser!(i64).range(1..),
Expand Down
170 changes: 170 additions & 0 deletions subgraph-radio/src/operator/indexer_management.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::json;

use crate::OperationError;

#[derive(Debug, Serialize, Deserialize)]
struct IndexingRuleAttributes {
id: i32,
identifier: String,
identifier_type: String,
allocation_amount: Option<String>,
allocation_lifetime: Option<i32>,
auto_renewal: bool,
parallel_allocations: Option<i32>,
max_allocation_percentage: Option<i32>,
min_signal: Option<String>,
max_signal: Option<String>,
min_stake: Option<String>,
min_average_query_fees: Option<String>,
custom: Option<String>,
decision_basis: String,
require_supported: bool,
safety: bool,
}

pub async fn health_query(url: &str) -> Result<String, OperationError> {
let client = Client::new();
let response = client.get(url).send().await.unwrap();
response
.text()
.await
.map_err(|e| OperationError::Query(graphcast_sdk::graphql::QueryError::Transport(e)))
}

pub async fn indexing_rules(url: &str) -> Result<serde_json::Value, OperationError> {
let graphql_query = json!({
"query": r#"query indexingRules {
indexingRules {
identifier
identifierType
allocationAmount
allocationLifetime
autoRenewal
parallelAllocations
maxAllocationPercentage
minSignal
maxSignal
minStake
minAverageQueryFees
custom
decisionBasis
requireSupported
safety
}
}"#
});

let client = Client::new();
let response = client.post(url).json(&graphql_query).send().await.unwrap();

response
.json::<serde_json::Value>()
.await
.map_err(|e| OperationError::Query(graphcast_sdk::graphql::QueryError::Transport(e)))
}

pub async fn offchain_sync_indexing_rules(
url: &str,
deployment: &str,
) -> Result<serde_json::Value, OperationError> {
let graphql_mutation = json!({
"query": r#"mutation updateIndexingRule($rule: IndexingRuleInput!) {
setIndexingRule(rule: $rule) {
identifier
identifierType
allocationAmount
allocationLifetime
autoRenewal
parallelAllocations
maxAllocationPercentage
minSignal
maxSignal
minStake
minAverageQueryFees
custom
decisionBasis
requireSupported
safety
}
}"#,
"variables": {
"rule": {
"identifier": deployment,
"decisionBasis": "offchain",
"identifierType": "deployment"
}
}
});

let client = Client::new();
let response = client
.post(url)
.json(&graphql_mutation)
.send()
.await
.map_err(|e| OperationError::Query(graphcast_sdk::graphql::QueryError::Transport(e)))?;

response
.json::<serde_json::Value>()
.await
.map_err(|e| OperationError::Query(graphcast_sdk::graphql::QueryError::Transport(e)))
}

// // NOTE: this set of tests can only run in context of running indexer_management server
// #[cfg(test)]
// mod tests {

// use super::*;

// // TODO: add setup and teardown functions

// #[tokio::test]
// async fn test_basic_request() {
// let res = health_query("http://127.0.0.1:18000").await.unwrap();

// assert_eq!(res, "Ready to roll!".to_string());
// }

// #[tokio::test]
// async fn test_query_indexing_rule() {
// let res_json = indexing_rules("http://127.0.0.1:18000").await;

// assert!(res_json.is_ok())
// }

// #[tokio::test]
// async fn test_set_offchain_sync() {
// let res_json = offchain_sync_indexing_rules(
// "http://127.0.0.1:18000",
// "Qmb5Ysp5oCUXhLA8NmxmYKDAX2nCMnh7Vvb5uffb9n5vss",
// )
// .await;
// assert!(res_json.is_ok());

// let check_setting = indexing_rules("http://127.0.0.1:18000").await.unwrap();

// assert!(check_setting
// .as_object()
// .unwrap()
// .get("data")
// .unwrap()
// .as_object()
// .unwrap()
// .get("iiterles")
// .unwrap()
// .as_array()
// .unwrap()
// .into_iter()
// .any(|o| o
// .as_object()
// .unwrap()
// .get("identifier")
// .unwrap()
// .as_str()
// .unwrap()
// == "Qmb5Ysp5oCUXhLA8NmxmYKDAX2nCMnh7Vvb5uffb9n5vss")
// );
// }
// }
12 changes: 11 additions & 1 deletion subgraph-radio/src/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use std::time::Duration;
use tokio::time::{interval, sleep, timeout};
use tracing::{debug, error, info, trace, warn};

use crate::{chainhead_block_str, messages::poi::process_valid_message};
use crate::{
chainhead_block_str, messages::poi::process_valid_message,
operator::indexer_management::health_query,
};
use crate::{messages::poi::PublicPoiMessage, metrics::VALIDATED_MESSAGES};
use graphcast_sdk::{
graphcast_agent::{
Expand All @@ -29,6 +32,7 @@ use self::notifier::Notifier;

pub mod attestation;
pub mod callbook;
pub mod indexer_management;
pub mod notifier;
pub mod operation;

Expand Down Expand Up @@ -104,6 +108,12 @@ impl RadioOperator {
debug!("Set global static instance of graphcast_agent");
_ = GRAPHCAST_AGENT.set(graphcast_agent.clone());

//TODO: Refactor indexer management server validation to SDK, similar to graph node status endpoint
if let Some(url) = &config.graph_stack.indexer_management_server_endpoint {
_ = health_query(url)
.await
.expect("Failed to validate the provided indexer management server endpoint");
};
let notifier = Notifier::from_config(config);

let state_ref = persisted_state.clone();
Expand Down
13 changes: 7 additions & 6 deletions subgraph-radio/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ use tracing::{info, trace, warn};

use graphcast_sdk::graphcast_agent::message_typing::GraphcastMessage;

use crate::operator::attestation::{
clear_local_attestation, ComparisonResult, ComparisonResultType,
use crate::{
messages::poi::PublicPoiMessage,
operator::attestation::{
clear_local_attestation, Attestation, ComparisonResult, ComparisonResultType,
},
operator::notifier::Notifier,
RADIO_OPERATOR,
};
use crate::operator::notifier::Notifier;
use crate::RADIO_OPERATOR;

use crate::{messages::poi::PublicPoiMessage, operator::attestation::Attestation};

type Local = Arc<SyncMutex<HashMap<String, HashMap<u64, Attestation>>>>;
type Remote = Arc<SyncMutex<Vec<GraphcastMessage<PublicPoiMessage>>>>;
Expand Down
2 changes: 2 additions & 0 deletions test-utils/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub fn test_config() -> Config {
mnemonic: None,
registry_subgraph: String::new(),
network_subgraph: String::new(),
indexer_management_server_endpoint: None,
}
},
waku: {
Expand Down Expand Up @@ -74,6 +75,7 @@ pub fn test_config() -> Config {
telegram_token: None,
id_validation: IdentityValidation::ValidAddress,
topic_update_interval: 600,
auto_upgrade: CoverageLevel::OnChain,
}
},
config_file: None,
Expand Down
1 change: 1 addition & 0 deletions test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ pub fn start_radio(config: &Config) -> Child {
.arg(config.radio_infrastructure().topics.join(","))
.arg("--coverage")
.arg(match config.radio_infrastructure().coverage {
CoverageLevel::None => "none",
CoverageLevel::Minimal => "minimal",
CoverageLevel::OnChain => "on-chain",
CoverageLevel::Comprehensive => "comprehensive",
Expand Down

0 comments on commit a6e7f8d

Please sign in to comment.