From 407f4f7a67885561c9466540d8f401c433eac642 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Mon, 31 Jul 2023 15:10:46 -0500 Subject: [PATCH 1/2] chore: split off one-shot-cli package --- Cargo.lock | 23 -- Cargo.toml | 2 +- one-shot/Cargo.toml | 40 ---- one-shot/src/config.rs | 358 ----------------------------- one-shot/src/lib.rs | 17 -- one-shot/src/main.rs | 19 -- one-shot/src/operator/mod.rs | 63 ----- one-shot/src/operator/operation.rs | 43 ---- 8 files changed, 1 insertion(+), 564 deletions(-) delete mode 100644 one-shot/Cargo.toml delete mode 100644 one-shot/src/config.rs delete mode 100644 one-shot/src/lib.rs delete mode 100644 one-shot/src/main.rs delete mode 100644 one-shot/src/operator/mod.rs delete mode 100644 one-shot/src/operator/operation.rs diff --git a/Cargo.lock b/Cargo.lock index bf1fa27..398b9fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3232,29 +3232,6 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" -[[package]] -name = "one-shot" -version = "0.1.0" -dependencies = [ - "anyhow", - "cargo-husky", - "chrono", - "clap", - "derive-getters", - "dotenv", - "ethers", - "graphcast-sdk", - "once_cell", - "partial_application", - "serde", - "serde_json", - "subgraph-radio", - "thiserror", - "tokio", - "tracing", - "tracing-subscriber", -] - [[package]] name = "oorandom" version = "11.1.3" diff --git a/Cargo.toml b/Cargo.toml index 9d13d76..e2ae1e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] -members = ["subgraph-radio", "test-sender", "test-utils", "test-runner", "one-shot"] +members = ["subgraph-radio", "test-sender", "test-utils", "test-runner"] resolver = "2" [profile.dev] diff --git a/one-shot/Cargo.toml b/one-shot/Cargo.toml deleted file mode 100644 index e18181d..0000000 --- a/one-shot/Cargo.toml +++ /dev/null @@ -1,40 +0,0 @@ -[package] -name = "one-shot" -version = "0.1.0" -edition = "2021" -authors = ["GraphOps (axiomatic-aardvark, hopeyen)"] -description = "One-shot messaging using Graphcast SDK (can be separated to a different repo)" -license = "Apache-2.0" -repository = "https://github.com/graphops/subgraph-radio" -keywords = ["graphprotocol", "data-integrity", "Indexer", "waku", "p2p"] -categories = ["network-programming", "web-programming::http-client"] - -[dependencies] -graphcast-sdk = "0.4.0" -subgraph-radio = { path = "../subgraph-radio" } -once_cell = "1.17" -chrono = "0.4" -serde = { version = "1.0.163", features = ["rc"] } -serde_json = "1.0.96" -derive-getters = "0.2.1" -tokio = { version = "1.28.1", features = ["full", "rt"] } -anyhow = "1.0" -thiserror = "1.0.40" -ethers = "2.0.4" -partial_application = "0.2.1" -dotenv = "0.15" -tracing = "0.1" -tracing-subscriber = { version = "0.3", features = [ - "env-filter", - "ansi", - "fmt", - "std", - "json", -] } -clap = { version = "3.2.25", features = ["derive", "env"] } - -[dev-dependencies.cargo-husky] -version = "1" -default-features = false -# Disable features which are enabled by default -features = ["precommit-hook", "run-cargo-fmt", "run-cargo-clippy"] diff --git a/one-shot/src/config.rs b/one-shot/src/config.rs deleted file mode 100644 index 92f7157..0000000 --- a/one-shot/src/config.rs +++ /dev/null @@ -1,358 +0,0 @@ -use clap::Parser; -use derive_getters::Getters; -use ethers::signers::WalletError; -use serde::{Deserialize, Serialize}; -use tracing::info; - -use graphcast_sdk::{ - build_wallet, - callbook::CallBook, - graphcast_agent::{ - message_typing::IdentityValidation, GraphcastAgentConfig, GraphcastAgentError, - }, - graphql::{ - client_network::query_network_subgraph, client_registry::query_registry, QueryError, - }, - init_tracing, wallet_address, -}; - -#[derive(Clone, Debug, Parser, Serialize, Deserialize, Getters, Default)] -#[clap( - name = "one-shot-messenger", - about = "Send a message to Graphcast network", - author = "GraphOps" -)] -pub struct Config { - #[clap( - long, - value_name = "KEY", - value_parser = Config::parse_key, - env = "PRIVATE_KEY", - hide_env_values = true, - help = "Private key to the Graphcast ID wallet (Precendence over mnemonics)", - )] - // should keep this value private, this is current public due to the constructing a Config in test-utils - // We can get around this by making an explicit function to make config instead of direct build in {} - pub private_key: Option, - #[clap( - long, - value_name = "KEY", - value_parser = Config::parse_key, - env = "MNEMONIC", - hide_env_values = true, - help = "Mnemonic to the Graphcast ID wallet (first address of the wallet is used; Only one of private key or mnemonic is needed)", - )] - pub mnemonic: Option, - #[clap( - long, - value_name = "GRAPH_ACCOUNT", - env = "GRAPH_ACCOUNT", - help = "Graph account corresponding to Graphcast operator" - )] - pub graph_account: String, - #[clap( - long, - value_name = "SUBGRAPH", - env = "REGISTRY_SUBGRAPH", - help = "Subgraph endpoint to the Graphcast Registry", - default_value = "https://api.thegraph.com/subgraphs/name/hopeyen/graphcast-registry-goerli" - )] - pub registry_subgraph: String, - #[clap( - long, - value_name = "SUBGRAPH", - env = "NETWORK_SUBGRAPH", - help = "Subgraph endpoint to The Graph network subgraph", - default_value = "https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-goerli" - )] - pub network_subgraph: String, - #[clap( - long, - default_value = "testnet", - value_name = "NAME", - env = "GRAPHCAST_NETWORK", - help = "Supported Graphcast networks: mainnet, testnet", - possible_values = ["testnet", "mainnet"], - )] - pub graphcast_network: String, - #[clap( - long, - value_name = "[TOPIC]", - value_delimiter = ',', - env = "TOPICS", - help = "Comma separated static list of content topics to subscribe to (Static list to include) (right now just send message to the first element of the vec?" - )] - pub topics: Vec, - #[clap( - long, - value_name = "IDENTIFIER", - env = "IDENTIFIER", - help = "Subgraph hash is used to be the message content identifier" - )] - pub identifier: String, - #[clap( - long, - value_name = "NEW_HASH", - env = "NEW_HASH", - help = "Subgraph hash for the upgrade version of the subgraph" - )] - pub new_hash: String, - #[clap( - long, - value_name = "SUBGRAPH_ID", - env = "SUBGRAPH_ID", - help = "Subgraph id shared by the old and new deployment" - )] - pub subgraph_id: String, - #[clap( - long, - value_name = "INDEX_NETWORK", - env = "INDEX_NETWORK", - help = "Subgraph id shared by the old and new deployment" - )] - pub index_network: String, - #[clap( - long, - value_name = "MIGRATION_TIME", - env = "MIGRATION_TIME", - help = "UNIX timestamp that the developer plan on migrating the usage" - )] - pub migration_time: i64, - #[clap( - long, - value_name = "WAKU_HOST", - help = "Host for the Waku gossip client", - env = "WAKU_HOST" - )] - pub waku_host: Option, - #[clap( - long, - value_name = "WAKU_PORT", - help = "Port for the Waku gossip client", - env = "WAKU_PORT" - )] - pub waku_port: Option, - #[clap( - long, - value_name = "KEY", - env = "WAKU_NODE_KEY", - hide_env_values = true, - help = "Private key to the Waku node id" - )] - pub waku_node_key: Option, - #[clap( - long, - value_name = "KEY", - env = "WAKU_ADDRESS", - hide_env_values = true, - help = "Advertised address to be connected among the Waku peers" - )] - pub waku_addr: Option, - #[clap( - long, - value_name = "NODE_ADDRESSES", - help = "Comma separated static list of waku boot nodes to connect to", - env = "BOOT_NODE_ADDRESSES" - )] - pub boot_node_addresses: Vec, - #[clap( - long, - value_name = "WAKU_LOG_LEVEL", - help = "Waku node logging configuration", - env = "WAKU_LOG_LEVEL" - )] - pub waku_log_level: Option, - #[clap( - long, - value_name = "DISCV5_ENRS", - help = "Comma separated ENRs for Waku discv5 bootstrapping", - env = "DISCV5_ENRS" - )] - pub discv5_enrs: Option>, - #[clap( - long, - value_name = "DISCV5_PORT", - help = "Waku node to expose discoverable udp port", - env = "DISCV5_PORT" - )] - pub discv5_port: Option, - #[clap( - long, - value_name = "LOG_LEVEL", - default_value = "info", - help = "logging configurationt to set as RUST_LOG", - env = "RUST_LOG" - )] - pub log_level: String, - #[clap( - long, - value_name = "SLACK_TOKEN", - help = "Slack bot API token", - env = "SLACK_TOKEN" - )] - pub slack_token: Option, - #[clap( - long, - value_name = "SLACK_CHANNEL", - help = "Name of Slack channel to send messages to (has to be a public channel)", - env = "SLACK_CHANNEL" - )] - pub slack_channel: Option, - #[clap( - long, - value_name = "DISCORD_WEBHOOK", - help = "Discord webhook URL to send messages to", - env = "DISCORD_WEBHOOK" - )] - pub discord_webhook: Option, - #[clap( - long, - value_name = "TELEGRAM_TOKEN", - help = "Telegram Bot API Token", - env = "TELEGRAM_TOKEN" - )] - pub telegram_token: Option, - #[clap( - long, - value_name = "TELEGRAM_CHAT_ID", - help = "Id of Telegram chat (DM or group) to send messages to", - env = "TELEGRAM_CHAT_ID" - )] - pub telegram_chat_id: Option, - #[clap( - long, - value_name = "LOG_FORMAT", - env = "LOG_FORMAT", - help = "Support logging formats: pretty, json, full, compact", - long_help = "pretty: verbose and human readable; json: not verbose and parsable; compact: not verbose and not parsable; full: verbose and not parsible", - possible_values = ["pretty", "json", "full", "compact"], - default_value = "pretty" - )] - pub log_format: String, - #[clap( - long, - value_name = "RADIO_NAME", - env = "RADIO_NAME", - default_value = "subgraph-radio" - )] - pub radio_name: String, - #[clap( - long, - value_name = "ID_VALIDATION", - value_enum, - default_value = "subgraph-staker", - env = "ID_VALIDATION", - help = "Identity validaiton mechanism for message signers", - long_help = "Identity validaiton mechanism for message signers\n - no-check: all messages signer is valid, \n - valid-address: signer needs to be an valid Eth address, \n - graphcast-registered: must be registered at Graphcast Registry, \n - graph-network-account: must be a Graph account, \n - registered-indexer: must be registered at Graphcast Registry, correspond to and Indexer statisfying indexer minimum stake requirement, \n - indexer: must be registered at Graphcast Registry or is a Graph Account, correspond to and Indexer statisfying indexer minimum stake requirement" - )] - pub id_validation: IdentityValidation, -} - -impl Config { - /// Parse config arguments - pub fn args() -> Self { - // TODO: load config file before parse (maybe add new level of subcommands) - let config = Config::parse(); - std::env::set_var("RUST_LOG", config.log_level.clone()); - // Enables tracing under RUST_LOG variable - init_tracing(config.log_format.clone()).expect("Could not set up global default subscriber for logger, check environmental variable `RUST_LOG` or the CLI input `log-level`"); - config - } - - /// Validate that private key as an Eth wallet - fn parse_key(value: &str) -> Result { - // The wallet can be stored instead of the original private key - let wallet = build_wallet(value)?; - let address = wallet_address(&wallet); - info!(address, "Resolved Graphcast id"); - Ok(String::from(value)) - } - - /// Private key takes precedence over mnemonic - pub fn wallet_input(&self) -> Result<&String, ConfigError> { - match (&self.private_key, &self.mnemonic) { - (Some(p), _) => Ok(p), - (_, Some(m)) => Ok(m), - _ => Err(ConfigError::ValidateInput( - "Must provide either private key or mnemonic".to_string(), - )), - } - } - - pub async fn to_graphcast_agent_config( - &self, - ) -> Result { - let wallet_key = self.wallet_input().unwrap().to_string(); - let topics = self.topics.clone(); - - GraphcastAgentConfig::new( - wallet_key, - self.graph_account.clone(), - self.radio_name.clone(), - self.registry_subgraph.clone(), - self.network_subgraph.clone(), - self.id_validation.clone(), - None, - Some(self.boot_node_addresses.clone()), - Some(self.graphcast_network.to_owned()), - Some(topics), - self.waku_node_key.clone(), - self.waku_host.clone(), - self.waku_port.clone(), - self.waku_addr.clone(), - Some(true), - self.discv5_enrs.clone(), - self.discv5_port, - ) - .await - } - - pub async fn basic_info(&self) -> Result<(String, f32), QueryError> { - // Using unwrap directly as the query has been ran in the set-up validation - let wallet = build_wallet( - self.wallet_input() - .map_err(|e| QueryError::Other(e.into()))?, - ) - .map_err(|e| QueryError::Other(e.into()))?; - // The query here must be Ok but so it is okay to panic here - // Alternatively, make validate_set_up return wallet, address, and stake - let my_address = query_registry(self.registry_subgraph(), &wallet_address(&wallet)).await?; - let my_stake = query_network_subgraph(self.network_subgraph(), &my_address) - .await - .unwrap() - .indexer_stake(); - info!( - my_address, - my_stake, "Initializing radio operator for indexer identity", - ); - Ok((my_address, my_stake)) - } - - pub fn callbook(&self) -> CallBook { - CallBook::new( - self.registry_subgraph.clone(), - self.network_subgraph.clone(), - None, - ) - } -} - -#[derive(Debug, thiserror::Error)] -pub enum ConfigError { - #[error("Validate the input: {0}")] - ValidateInput(String), - #[error("Generate JSON representation of the config file: {0}")] - GenerateJson(serde_json::Error), - #[error("QueryError: {0}")] - QueryError(QueryError), - #[error("Toml file error: {0}")] - ReadStr(std::io::Error), - #[error("Unknown error: {0}")] - Other(anyhow::Error), -} diff --git a/one-shot/src/lib.rs b/one-shot/src/lib.rs deleted file mode 100644 index 0011b77..0000000 --- a/one-shot/src/lib.rs +++ /dev/null @@ -1,17 +0,0 @@ -use once_cell::sync::OnceCell; -use std::sync::Arc; - -use graphcast_sdk::graphcast_agent::GraphcastAgent; - -use crate::operator::RadioOperator; - -pub mod config; -pub mod operator; - -/// A global static (singleton) instance of GraphcastAgent. It is useful to ensure that we have only one GraphcastAgent -/// per Radio instance, so that we can keep track of state and more easily test our Radio application. -pub static RADIO_OPERATOR: OnceCell = OnceCell::new(); - -/// A global static (singleton) instance of GraphcastAgent. It is useful to ensure that we have only one GraphcastAgent -/// per Radio instance, so that we can keep track of state and more easily test our Radio application. -pub static GRAPHCAST_AGENT: OnceCell> = OnceCell::new(); diff --git a/one-shot/src/main.rs b/one-shot/src/main.rs deleted file mode 100644 index e26d2f3..0000000 --- a/one-shot/src/main.rs +++ /dev/null @@ -1,19 +0,0 @@ -use dotenv::dotenv; -use one_shot::{config::Config, operator::RadioOperator, RADIO_OPERATOR}; - -extern crate partial_application; - -#[tokio::main] -async fn main() { - dotenv().ok(); - // Parse basic configurations - let radio_config = Config::args(); - - // Initialization and pass in for static lifetime throughout the program - let radio_operator = RadioOperator::new(&radio_config).await; - - _ = RADIO_OPERATOR.set(radio_operator); - - // Start radio operations - RADIO_OPERATOR.get().unwrap().run().await; -} diff --git a/one-shot/src/operator/mod.rs b/one-shot/src/operator/mod.rs deleted file mode 100644 index 961cb5e..0000000 --- a/one-shot/src/operator/mod.rs +++ /dev/null @@ -1,63 +0,0 @@ -use std::sync::Arc; -use tracing::{debug, warn}; - -use graphcast_sdk::{build_wallet, graphcast_agent::GraphcastAgent}; - -use crate::config::Config; -use crate::GRAPHCAST_AGENT; -pub mod operation; - -/// Radio operator contains all states needed for radio operations -#[allow(unused)] -pub struct RadioOperator { - config: Config, - graphcast_agent: Arc, -} - -impl RadioOperator { - /// Create a radio operator with radio configurations, persisted data, - /// graphcast agent, and control flow - pub async fn new(config: &Config) -> RadioOperator { - debug!("Initializing Radio operator"); - let _wallet = build_wallet( - config - .wallet_input() - .expect("Operator wallet input invalid"), - ) - .expect("Radio operator cannot build wallet"); - - debug!("Initializing Graphcast Agent"); - let (agent, _receiver) = - GraphcastAgent::new(config.to_graphcast_agent_config().await.unwrap()) - .await - .expect("Initialize Graphcast agent"); - let graphcast_agent = Arc::new(agent); - // Provide topics to Graphcast agent - let topics = vec![config.identifier.clone()]; - debug!( - topics = tracing::field::debug(&topics), - "Found content topics for subscription", - ); - graphcast_agent.update_content_topics(topics.clone()).await; - debug!("Set global static instance of graphcast_agent"); - _ = GRAPHCAST_AGENT.set(graphcast_agent.clone()); - - RadioOperator { - config: config.clone(), - graphcast_agent, - } - } - - pub fn graphcast_agent(&self) -> &GraphcastAgent { - &self.graphcast_agent - } - - /// radio continuously attempt to send message until success - pub async fn run(&'static self) { - let mut res = self.gossip_one_shot().await; - while let Err(e) = res { - warn!(err = tracing::field::debug(&e), "Failed to gossip, repeat"); - res = self.gossip_one_shot().await; - } - } -} diff --git a/one-shot/src/operator/operation.rs b/one-shot/src/operator/operation.rs deleted file mode 100644 index a2061ff..0000000 --- a/one-shot/src/operator/operation.rs +++ /dev/null @@ -1,43 +0,0 @@ -use chrono::Utc; -use tracing::{error, info}; - -use graphcast_sdk::networks::NetworkName; - -use crate::operator::RadioOperator; -use subgraph_radio::{messages::upgrade::VersionUpgradeMessage, OperationError}; - -impl RadioOperator { - pub async fn gossip_one_shot(&self) -> Result { - // configure radio config to parse in a subcommand for the radio payload message? - let identifier = self.config.identifier.clone(); - let new_hash = self.config.new_hash.clone(); - let subgraph_id = self.config.subgraph_id.clone(); - let time = Utc::now().timestamp(); - let network = self.config.index_network(); - let migrate_time = self.config.migration_time; - let graph_account = self.config.graph_account.clone(); - let radio_message = VersionUpgradeMessage::build( - identifier.clone(), - new_hash.clone(), - time, - subgraph_id, - NetworkName::from_string(network), - migrate_time, - graph_account, - ); - match self - .graphcast_agent - .send_message(&identifier, radio_message, time) - .await - { - Ok(msg_id) => { - info!(msg_id, "Sent message"); - Ok(msg_id) - } - Err(e) => { - error!(err = tracing::field::debug(&e), "Failed to send message"); - Err(OperationError::Agent(e)) - } - } - } -} From f11da13db7d54899b61ccf0bfe8eba3961d635b5 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Tue, 1 Aug 2023 12:22:47 -0500 Subject: [PATCH 2/2] fix: effective ctrl+c shutdown --- subgraph-radio/src/metrics/mod.rs | 7 +++++-- subgraph-radio/src/operator/mod.rs | 6 +++++- subgraph-radio/src/server/mod.rs | 9 ++++----- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/subgraph-radio/src/metrics/mod.rs b/subgraph-radio/src/metrics/mod.rs index fe5cc43..ee33a14 100644 --- a/subgraph-radio/src/metrics/mod.rs +++ b/subgraph-radio/src/metrics/mod.rs @@ -5,6 +5,8 @@ use axum::Router; use once_cell::sync::Lazy; use prometheus::{core::Collector, Registry}; use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts}; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; use std::{net::SocketAddr, str::FromStr}; use tracing::{debug, info}; @@ -124,7 +126,7 @@ pub async fn get_metrics() -> (StatusCode, String) { /// Run the API server as well as Prometheus and a traffic generator #[allow(dead_code)] -pub async fn handle_serve_metrics(host: String, port: u16) { +pub async fn handle_serve_metrics(host: String, port: u16, _running_program: Arc) { // Set up the exporter to collect metrics let _exporter = global_metrics_exporter(); @@ -139,6 +141,7 @@ pub async fn handle_serve_metrics(host: String, port: u16) { server .serve(app.into_make_service()) + // .with_graceful_shutdown(shutdown_signal(running_program)) .await - .expect("Error starting example API server"); + .expect("Error starting Prometheus metrics service"); } diff --git a/subgraph-radio/src/operator/mod.rs b/subgraph-radio/src/operator/mod.rs index 55553ef..dbb6193 100644 --- a/subgraph-radio/src/operator/mod.rs +++ b/subgraph-radio/src/operator/mod.rs @@ -214,7 +214,11 @@ impl RadioOperator { // Set up Prometheus metrics url if configured if let Some(port) = self.config.metrics_port { debug!("Initializing metrics port"); - tokio::spawn(handle_serve_metrics(self.config.metrics_host.clone(), port)); + tokio::spawn(handle_serve_metrics( + self.config.metrics_host.clone(), + port, + self.control_flow.running.clone(), + )); } // Provide generated topics to Graphcast agent diff --git a/subgraph-radio/src/server/mod.rs b/subgraph-radio/src/server/mod.rs index b61ab29..d8b0138 100644 --- a/subgraph-radio/src/server/mod.rs +++ b/subgraph-radio/src/server/mod.rs @@ -11,7 +11,6 @@ use crate::{ model::{build_schema, SubgraphRadioContext}, routes::{graphql_handler, graphql_playground, health}, }, - shutdown_signal, state::PersistedState, }; @@ -25,7 +24,7 @@ pub mod routes; pub async fn run_server( config: Config, persisted_state: &'static PersistedState, - running_program: Arc, + _running_program: Arc, ) { if config.server_port().is_none() { return; @@ -50,11 +49,11 @@ pub async fn run_server( info!( host = tracing::field::debug(config.server_host()), - port, "Bind and serve" + port, "Bind port to service" ); Server::bind(&addr) .serve(app.into_make_service()) - .with_graceful_shutdown(shutdown_signal(running_program)) + // .with_graceful_shutdown(shutdown_signal(running_program)) .await - .unwrap(); + .expect("Error starting API service"); }