diff --git a/subgraph-radio/benches/gossips.rs b/subgraph-radio/benches/gossips.rs index e66d93a..0d7a6c2 100644 --- a/subgraph-radio/benches/gossips.rs +++ b/subgraph-radio/benches/gossips.rs @@ -31,6 +31,7 @@ fn gossip_poi_bench(c: &mut Criterion) { network_subgraph: String::from( "https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-goerli", ), + indexer_management_server_endpoint: None, }, waku: Waku { waku_host: None, @@ -65,6 +66,7 @@ fn gossip_poi_bench(c: &mut Criterion) { topic_update_interval: 600, log_format: LogFormat::Pretty, graphcast_network: GraphcastNetworkName::Testnet, + auto_upgrade: CoverageLevel::Comprehensive, }, config_file: None, }); diff --git a/subgraph-radio/src/config.rs b/subgraph-radio/src/config.rs index 78a4fe2..c9e92c7 100644 --- a/subgraph-radio/src/config.rs +++ b/subgraph-radio/src/config.rs @@ -14,7 +14,6 @@ use graphcast_sdk::{ }; use serde::{Deserialize, Serialize}; use std::collections::HashSet; -use std::fmt; use tracing::{debug, info, trace}; use crate::state::{panic_hook, PersistedState}; @@ -22,6 +21,7 @@ use crate::{active_allocation_hashes, syncing_deployment_hashes}; #[derive(clap::ValueEnum, Clone, Debug, Serialize, Deserialize, Default)] pub enum CoverageLevel { + None, Minimal, #[default] OnChain, @@ -160,6 +160,7 @@ impl Config { pub async fn generate_topics(&self, indexer_address: String) -> Vec { let static_topics = HashSet::from_iter(self.radio_infrastructure().topics.to_vec()); let topics = match self.radio_infrastructure().coverage { + CoverageLevel::None => HashSet::new(), CoverageLevel::Minimal => static_topics, CoverageLevel::OnChain => { let mut topics: HashSet = @@ -244,6 +245,13 @@ pub struct GraphStack { help = "Mnemonic to the Graphcast ID wallet (first address of the wallet is used; Only one of private key or mnemonic is needed)", )] pub mnemonic: Option, + #[clap( + long, + value_name = "ENDPOINT", + env = "INDEXER_MANAGEMENT_SERVER_ENDPOINT", + help = "API endpoint to the Indexer management server endpoint" + )] + pub indexer_management_server_endpoint: Option, } #[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] @@ -277,6 +285,18 @@ pub struct RadioInfrastructure { Default: comprehensive" )] pub coverage: CoverageLevel, + #[clap( + long, + value_name = "COVERAGE", + value_enum, + default_value = "comprehensive", + env = "AUTO_UPGRADE", + help = "Toggle for the types of subgraph the radio send offchain syncing commands to indexer management server. Default to upgrade all syncing deployments", + long_help = "Topic upgrade coverage level\ncomprehensive: on-chain allocations, user defined static topics, and additional topics\n + on-chain: Subscribe to on-chain topics and user defined static topics\nminimal: Only subscribe to user defined static topics.\n + none: no automatic upgrade, only notifications.\nDefault: comprehensive" + )] + pub auto_upgrade: CoverageLevel, #[clap( long, value_parser = value_parser!(i64).range(1..), diff --git a/subgraph-radio/src/operator/indexer_management.rs b/subgraph-radio/src/operator/indexer_management.rs new file mode 100644 index 0000000..d383fa7 --- /dev/null +++ b/subgraph-radio/src/operator/indexer_management.rs @@ -0,0 +1,170 @@ +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +use crate::OperationError; + +#[derive(Debug, Serialize, Deserialize)] +struct IndexingRuleAttributes { + id: i32, + identifier: String, + identifier_type: String, + allocation_amount: Option, + allocation_lifetime: Option, + auto_renewal: bool, + parallel_allocations: Option, + max_allocation_percentage: Option, + min_signal: Option, + max_signal: Option, + min_stake: Option, + min_average_query_fees: Option, + custom: Option, + decision_basis: String, + require_supported: bool, + safety: bool, +} + +pub async fn health_query(url: &str) -> Result { + let client = Client::new(); + let response = client.get(url).send().await.unwrap(); + response + .text() + .await + .map_err(|e| OperationError::Query(graphcast_sdk::graphql::QueryError::Transport(e))) +} + +pub async fn indexing_rules(url: &str) -> Result { + let graphql_query = json!({ + "query": r#"query indexingRules { + indexingRules { + identifier + identifierType + allocationAmount + allocationLifetime + autoRenewal + parallelAllocations + maxAllocationPercentage + minSignal + maxSignal + minStake + minAverageQueryFees + custom + decisionBasis + requireSupported + safety + } + }"# + }); + + let client = Client::new(); + let response = client.post(url).json(&graphql_query).send().await.unwrap(); + + response + .json::() + .await + .map_err(|e| OperationError::Query(graphcast_sdk::graphql::QueryError::Transport(e))) +} + +pub async fn offchain_sync_indexing_rules( + url: &str, + deployment: &str, +) -> Result { + let graphql_mutation = json!({ + "query": r#"mutation updateIndexingRule($rule: IndexingRuleInput!) { + setIndexingRule(rule: $rule) { + identifier + identifierType + allocationAmount + allocationLifetime + autoRenewal + parallelAllocations + maxAllocationPercentage + minSignal + maxSignal + minStake + minAverageQueryFees + custom + decisionBasis + requireSupported + safety + } + }"#, + "variables": { + "rule": { + "identifier": deployment, + "decisionBasis": "offchain", + "identifierType": "deployment" + } + } + }); + + let client = Client::new(); + let response = client + .post(url) + .json(&graphql_mutation) + .send() + .await + .map_err(|e| OperationError::Query(graphcast_sdk::graphql::QueryError::Transport(e)))?; + + response + .json::() + .await + .map_err(|e| OperationError::Query(graphcast_sdk::graphql::QueryError::Transport(e))) +} + +// // NOTE: this set of tests can only run in context of running indexer_management server +// #[cfg(test)] +// mod tests { + +// use super::*; + +// // TODO: add setup and teardown functions + +// #[tokio::test] +// async fn test_basic_request() { +// let res = health_query("http://127.0.0.1:18000").await.unwrap(); + +// assert_eq!(res, "Ready to roll!".to_string()); +// } + +// #[tokio::test] +// async fn test_query_indexing_rule() { +// let res_json = indexing_rules("http://127.0.0.1:18000").await; + +// assert!(res_json.is_ok()) +// } + +// #[tokio::test] +// async fn test_set_offchain_sync() { +// let res_json = offchain_sync_indexing_rules( +// "http://127.0.0.1:18000", +// "Qmb5Ysp5oCUXhLA8NmxmYKDAX2nCMnh7Vvb5uffb9n5vss", +// ) +// .await; +// assert!(res_json.is_ok()); + +// let check_setting = indexing_rules("http://127.0.0.1:18000").await.unwrap(); + +// assert!(check_setting +// .as_object() +// .unwrap() +// .get("data") +// .unwrap() +// .as_object() +// .unwrap() +// .get("iiterles") +// .unwrap() +// .as_array() +// .unwrap() +// .into_iter() +// .any(|o| o +// .as_object() +// .unwrap() +// .get("identifier") +// .unwrap() +// .as_str() +// .unwrap() +// == "Qmb5Ysp5oCUXhLA8NmxmYKDAX2nCMnh7Vvb5uffb9n5vss") +// ); +// } +// } diff --git a/subgraph-radio/src/operator/mod.rs b/subgraph-radio/src/operator/mod.rs index 9870c1b..5c4a614 100644 --- a/subgraph-radio/src/operator/mod.rs +++ b/subgraph-radio/src/operator/mod.rs @@ -7,7 +7,10 @@ use std::time::Duration; use tokio::time::{interval, sleep, timeout}; use tracing::{debug, error, info, trace, warn}; -use crate::{chainhead_block_str, messages::poi::process_valid_message}; +use crate::{ + chainhead_block_str, messages::poi::process_valid_message, + operator::indexer_management::health_query, +}; use crate::{messages::poi::PublicPoiMessage, metrics::VALIDATED_MESSAGES}; use graphcast_sdk::{ graphcast_agent::{ @@ -29,6 +32,7 @@ use self::notifier::Notifier; pub mod attestation; pub mod callbook; +pub mod indexer_management; pub mod notifier; pub mod operation; @@ -104,6 +108,12 @@ impl RadioOperator { debug!("Set global static instance of graphcast_agent"); _ = GRAPHCAST_AGENT.set(graphcast_agent.clone()); + //TODO: Refactor indexer management server validation to SDK, similar to graph node status endpoint + if let Some(url) = &config.graph_stack.indexer_management_server_endpoint { + _ = health_query(url) + .await + .expect("Failed to validate the provided indexer management server endpoint"); + }; let notifier = Notifier::from_config(config); let state_ref = persisted_state.clone(); diff --git a/subgraph-radio/src/state.rs b/subgraph-radio/src/state.rs index 7e45828..9ce7f7b 100644 --- a/subgraph-radio/src/state.rs +++ b/subgraph-radio/src/state.rs @@ -15,13 +15,14 @@ use tracing::{info, trace, warn}; use graphcast_sdk::graphcast_agent::message_typing::GraphcastMessage; -use crate::operator::attestation::{ - clear_local_attestation, ComparisonResult, ComparisonResultType, +use crate::{ + messages::poi::PublicPoiMessage, + operator::attestation::{ + clear_local_attestation, Attestation, ComparisonResult, ComparisonResultType, + }, + operator::notifier::Notifier, + RADIO_OPERATOR, }; -use crate::operator::notifier::Notifier; -use crate::RADIO_OPERATOR; - -use crate::{messages::poi::PublicPoiMessage, operator::attestation::Attestation}; type Local = Arc>>>; type Remote = Arc>>>; diff --git a/test-utils/src/config.rs b/test-utils/src/config.rs index 7b26235..b87e069 100644 --- a/test-utils/src/config.rs +++ b/test-utils/src/config.rs @@ -1,7 +1,5 @@ use clap::{ArgSettings, Parser}; -use graphcast_sdk::{ - graphcast_agent::message_typing::IdentityValidation, GraphcastNetworkName, LogFormat, -}; +use graphcast_sdk::{graphcast_agent::message_typing::IdentityValidation, GraphcastNetworkName, LogFormat}; use serde::{Deserialize, Serialize}; use subgraph_radio::config::{Config, CoverageLevel, GraphStack, RadioInfrastructure, Waku}; @@ -36,6 +34,7 @@ pub fn test_config() -> Config { mnemonic: None, registry_subgraph: String::new(), network_subgraph: String::new(), + indexer_management_server_endpoint: None, } }, waku: { @@ -74,6 +73,7 @@ pub fn test_config() -> Config { telegram_token: None, id_validation: IdentityValidation::ValidAddress, topic_update_interval: 600, + auto_upgrade: CoverageLevel::OnChain, } }, config_file: None, diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 240925d..ef9a7e5 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -185,6 +185,7 @@ pub fn start_radio(config: &Config) -> Child { .arg(config.radio_infrastructure().topics.join(",")) .arg("--coverage") .arg(match config.radio_infrastructure().coverage { + CoverageLevel::None => "none", CoverageLevel::Minimal => "minimal", CoverageLevel::OnChain => "on-chain", CoverageLevel::Comprehensive => "comprehensive",