From c91dd8b4b4396e8b2e45796dfe751c35d6618380 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Fri, 14 Jul 2023 13:04:41 -0500 Subject: [PATCH] feat: rescope poi-radio --- .github/workflows/docker.yml | 2 +- .github/workflows/gen-binaries.yml | 8 +- Cargo.lock | 106 ++--- Cargo.toml | 2 +- Dockerfile | 14 +- README.md | 18 +- contributing.md | 4 +- one-shot/Cargo.toml | 4 +- one-shot/src/config.rs | 2 +- one-shot/src/operator/operation.rs | 2 +- poi-radio/src/messages/poi.rs | 148 ------- poi-radio/src/operator/operation.rs | 394 ------------------ scripts/release.sh | 4 +- {poi-radio => subgraph-radio}/Cargo.toml | 8 +- .../benches/attestations.rs | 20 +- .../benches/gossips.rs | 6 +- {poi-radio => subgraph-radio}/src/config.rs | 8 +- .../src/graphql/mod.rs | 0 .../query_block_hash_from_number.graphql | 0 .../graphql/query_indexing_statuses.graphql | 0 .../graphql/query_proof_of_indexing.graphql | 0 .../src/graphql/schema_graph_node.graphql | 0 {poi-radio => subgraph-radio}/src/lib.rs | 2 +- {poi-radio => subgraph-radio}/src/main.rs | 2 +- .../src/messages/mod.rs | 0 subgraph-radio/src/messages/poi.rs | 382 +++++++++++++++++ .../src/messages/upgrade.rs | 14 + .../src/metrics/mod.rs | 36 +- .../src/operator/attestation.rs | 132 +++--- .../src/operator/callbook.rs | 0 .../src/operator/mod.rs | 34 +- .../src/operator/notifier.rs | 0 subgraph-radio/src/operator/operation.rs | 192 +++++++++ .../src/server/mod.rs | 4 +- .../src/server/model/mod.rs | 52 +-- .../src/server/routes/mod.rs | 8 +- {poi-radio => subgraph-radio}/src/state.rs | 68 +-- test-runner/Cargo.toml | 6 +- test-runner/src/invalid_block_hash.rs | 6 +- test-runner/src/invalid_nonce.rs | 6 +- test-runner/src/invalid_payload.rs | 6 +- test-runner/src/invalid_sender.rs | 6 +- test-runner/src/main.rs | 2 +- test-runner/src/message_handling.rs | 16 +- test-runner/src/poi_divergent.rs | 2 +- test-runner/src/poi_match.rs | 2 +- test-runner/src/topics.rs | 16 +- test-sender/Cargo.toml | 4 +- test-sender/src/main.rs | 4 +- test-utils/Cargo.toml | 6 +- test-utils/src/config.rs | 4 +- test-utils/src/dummy_msg.rs | 2 +- test-utils/src/lib.rs | 8 +- 53 files changed, 905 insertions(+), 867 deletions(-) delete mode 100644 poi-radio/src/messages/poi.rs delete mode 100644 poi-radio/src/operator/operation.rs rename {poi-radio => subgraph-radio}/Cargo.toml (89%) rename {poi-radio => subgraph-radio}/benches/attestations.rs (93%) rename {poi-radio => subgraph-radio}/benches/gossips.rs (95%) rename {poi-radio => subgraph-radio}/src/config.rs (98%) rename {poi-radio => subgraph-radio}/src/graphql/mod.rs (100%) rename {poi-radio => subgraph-radio}/src/graphql/query_block_hash_from_number.graphql (100%) rename {poi-radio => subgraph-radio}/src/graphql/query_indexing_statuses.graphql (100%) rename {poi-radio => subgraph-radio}/src/graphql/query_proof_of_indexing.graphql (100%) rename {poi-radio => subgraph-radio}/src/graphql/schema_graph_node.graphql (100%) rename {poi-radio => subgraph-radio}/src/lib.rs (99%) rename {poi-radio => subgraph-radio}/src/main.rs (86%) rename {poi-radio => subgraph-radio}/src/messages/mod.rs (100%) create mode 100644 subgraph-radio/src/messages/poi.rs rename {poi-radio => subgraph-radio}/src/messages/upgrade.rs (87%) rename {poi-radio => subgraph-radio}/src/metrics/mod.rs (84%) rename {poi-radio => subgraph-radio}/src/operator/attestation.rs (91%) rename {poi-radio => subgraph-radio}/src/operator/callbook.rs (100%) rename {poi-radio => subgraph-radio}/src/operator/mod.rs (92%) rename {poi-radio => subgraph-radio}/src/operator/notifier.rs (100%) create mode 100644 subgraph-radio/src/operator/operation.rs rename {poi-radio => subgraph-radio}/src/server/mod.rs (92%) rename {poi-radio => subgraph-radio}/src/server/model/mod.rs (88%) rename {poi-radio => subgraph-radio}/src/server/routes/mod.rs (87%) rename {poi-radio => subgraph-radio}/src/state.rs (89%) diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index f2a21c8..7d60962 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -9,7 +9,7 @@ on: env: REGISTRY: ghcr.io - BASE_NAME: graphops/poi-radio + BASE_NAME: graphops/subgraph-radio jobs: build-linux: diff --git a/.github/workflows/gen-binaries.yml b/.github/workflows/gen-binaries.yml index ad41ce8..ae58573 100644 --- a/.github/workflows/gen-binaries.yml +++ b/.github/workflows/gen-binaries.yml @@ -33,8 +33,8 @@ jobs: GITHUB_TOKEN: ${{ secrets.RELEASE_TOKEN }} with: upload_url: ${{ github.event.release.upload_url }} - asset_path: ./target/release/poi-radio - asset_name: poi-radio-${{ github.event.release.tag_name }}-ubuntu + asset_path: ./target/release/subgraph-radio + asset_name: subgraph-radio-${{ github.event.release.tag_name }}-ubuntu asset_content_type: binary/octet-stream build-macos: runs-on: macos-latest @@ -56,6 +56,6 @@ jobs: GITHUB_TOKEN: ${{ secrets.RELEASE_TOKEN }} with: upload_url: ${{ github.event.release.upload_url }} - asset_path: ./target/release/poi-radio - asset_name: poi-radio-${{ github.event.release.tag_name }}-macos + asset_path: ./target/release/subgraph-radio + asset_name: subgraph-radio-${{ github.event.release.tag_name }}-macos asset_content_type: binary/octet-stream diff --git a/Cargo.lock b/Cargo.lock index af6e98a..aeb1db5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3428,7 +3428,6 @@ dependencies = [ "once_cell", "opentelemetry", "partial_application", - "poi-radio", "prometheus", "prost", "rand 0.8.5", @@ -3439,6 +3438,7 @@ dependencies = [ "serde_derive", "serde_json", "sha3", + "subgraph-radio", "thiserror", "tokio", "tower-http 0.4.1", @@ -3879,55 +3879,6 @@ dependencies = [ "plotters-backend", ] -[[package]] -name = "poi-radio" -version = "0.3.6" -dependencies = [ - "anyhow", - "async-graphql", - "async-graphql-axum", - "async-trait", - "autometrics", - "axum 0.5.17", - "cargo-husky", - "chrono", - "clap", - "criterion", - "derive-getters", - "dotenv", - "ethers", - "ethers-contract", - "ethers-core 2.0.7", - "ethers-derive-eip712", - "graphcast-sdk", - "graphql_client 0.9.0", - "hex", - "metrics", - "metrics-exporter-prometheus", - "num-bigint", - "num-traits", - "once_cell", - "opentelemetry", - "partial_application", - "prometheus", - "prost", - "rand 0.8.5", - "regex", - "reqwest", - "secp256k1 0.25.0", - "serde", - "serde_derive", - "serde_json", - "sha3", - "thiserror", - "tokio", - "tower-http 0.4.1", - "tracing", - "tracing-opentelemetry", - "tracing-subscriber", - "wiremock", -] - [[package]] name = "polyval" version = "0.6.1" @@ -5223,6 +5174,55 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "subgraph-radio" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-graphql", + "async-graphql-axum", + "async-trait", + "autometrics", + "axum 0.5.17", + "cargo-husky", + "chrono", + "clap", + "criterion", + "derive-getters", + "dotenv", + "ethers", + "ethers-contract", + "ethers-core 2.0.7", + "ethers-derive-eip712", + "graphcast-sdk", + "graphql_client 0.9.0", + "hex", + "metrics", + "metrics-exporter-prometheus", + "num-bigint", + "num-traits", + "once_cell", + "opentelemetry", + "partial_application", + "prometheus", + "prost", + "rand 0.8.5", + "regex", + "reqwest", + "secp256k1 0.25.0", + "serde", + "serde_derive", + "serde_json", + "sha3", + "thiserror", + "tokio", + "tower-http 0.4.1", + "tracing", + "tracing-opentelemetry", + "tracing-subscriber", + "wiremock", +] + [[package]] name = "subtle" version = "2.5.0" @@ -5404,10 +5404,10 @@ dependencies = [ "axum 0.5.17", "chrono", "graphcast-sdk", - "poi-radio", "rand 0.8.5", "serde", "serde_json", + "subgraph-radio", "test-utils", "tokio", "tower", @@ -5426,11 +5426,11 @@ dependencies = [ "clap", "ethers", "graphcast-sdk", - "poi-radio", "prost", "serde", "serde_derive", "serde_json", + "subgraph-radio", "test-utils", "tokio", "tower", @@ -5453,12 +5453,12 @@ dependencies = [ "ethers-core 2.0.7", "ethers-derive-eip712", "graphcast-sdk", - "poi-radio", "prost", "rand 0.8.5", "serde", "serde_derive", "serde_json", + "subgraph-radio", "tokio", "tower", "tower-http 0.4.1", diff --git a/Cargo.toml b/Cargo.toml index ba8ea16..9d13d76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] -members = ["poi-radio", "test-sender", "test-utils", "test-runner", "one-shot"] +members = ["subgraph-radio", "test-sender", "test-utils", "test-runner", "one-shot"] resolver = "2" [profile.dev] diff --git a/Dockerfile b/Dockerfile index ccef349..f9f1eb8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,20 +14,20 @@ RUN apt-get update \ RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates -COPY . /poi-radio -WORKDIR /poi-radio +COPY . /subgraph-radio +WORKDIR /subgraph-radio RUN sh install-golang.sh ENV PATH=$PATH:/usr/local/go/bin -RUN cargo build --release -p poi-radio +RUN cargo build --release -p subgraph-radio FROM alpine:3.17.3 as alpine RUN set -x \ && apk update \ && apk add --no-cache upx dumb-init -COPY --from=build-image /poi-radio/target/release/poi-radio /poi-radio/target/release/poi-radio -RUN upx --overlay=strip --best /poi-radio/target/release/poi-radio +COPY --from=build-image /subgraph-radio/target/release/subgraph-radio /subgraph-radio/target/release/subgraph-radio +RUN upx --overlay=strip --best /subgraph-radio/target/release/subgraph-radio FROM gcr.io/distroless/cc AS runtime COPY --from=build-image /usr/share/zoneinfo /usr/share/zoneinfo @@ -35,5 +35,5 @@ COPY --from=build-image /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ COPY --from=build-image /etc/passwd /etc/passwd COPY --from=build-image /etc/group /etc/group COPY --from=alpine /usr/bin/dumb-init /usr/bin/dumb-init -COPY --from=alpine "/poi-radio/target/release/poi-radio" "/usr/local/bin/poi-radio" -ENTRYPOINT [ "/usr/bin/dumb-init", "--", "/usr/local/bin/poi-radio" ] +COPY --from=alpine "/subgraph-radio/target/release/subgraph-radio" "/usr/local/bin/subgraph-radio" +ENTRYPOINT [ "/usr/bin/dumb-init", "--", "/usr/local/bin/subgraph-radio" ] diff --git a/README.md b/README.md index 0395889..88ce371 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,23 @@ -# POI Radio +# Subgraph Radio -[![Docs](https://img.shields.io/badge/docs-latest-brightgreen.svg)](https://docs.graphops.xyz/graphcast/radios/poi-radio) -[![crates.io](https://img.shields.io/crates/v/poi-radio.svg)](https://crates.io/crates/poi-radio) +[![Docs](https://img.shields.io/badge/docs-latest-brightgreen.svg)](https://docs.graphops.xyz/graphcast/radios/subgraph-radio) +[![crates.io](https://img.shields.io/crates/v/subgraph-radio.svg)](https://crates.io/crates/subgraph-radio) ## Introduction +This is a Graphcast Radio focused on sending gossips about particular subgraphs on a P2P network. The available message types are Public Proof of Indexing (PoI) messages from an indexer, or a version upgrade annoucement message from a subgraph owner. + +Reaching Public PoI consensus and ensuring data availability during subgraph upgrades is critical to the indexing service. Both messages should find value from indexers, subgraph owners, and ultimately data consumers. + +[Documentation](https://docs.graphops.xyz/graphcast/radios/subgraph-radio) | [Packages](https://github.com/graphops/subgraph-radio/pkgs/container/subgraph-radio) | [Chat](https://discord.com/channels/438038660412342282/1087503343410225152) + +### Public PoI message + The key requirement for an Indexer to earn indexing rewards is to submit a valid Proof of Indexing (POI) promptly. The importance of valid POIs causes many Indexers to alert each other on subgraph health in community discussions. To alleviate the Indexer workload, this Radio utilized Graphcast SDK to exchange and aggregate POI along with a list of Indexer on-chain identities that can be used to trace reputations. With the pubsub pattern, the Indexer can get notified as soon as other indexers (with majority of stake) publish a POI different from the local POI. -[Documentation](https://docs.graphops.xyz/graphcast/radios/poi-radio) | [Packages](https://github.com/graphops/poi-radio/pkgs/container/poi-radio) | [Chat](https://discord.com/channels/438038660412342282/1087503343410225152) +### Version Upgrade message + +When developers publish a new version (subgraph deployment) to their subgraph, data service instability may occur while their API queries the pre-existing version. Indexers may require some time to sync a subgraph to the chainhead after they have stopped syncing the previous deployment. To decrease the upgrade friction, developers can send a message before publishing the subgraph, including the old deployment hash, new deployment hash, matching subgraph id, the time they would like to publish the version. Indexers who is running the subgraph radio with notification methods configured should get notified; later on this radio can optionally automate the deployment process on graph node, but it is still at the subgraph developers' discretion to await for the indexers to sync upto chainhead, in which point they can publish the staged version without disrupting API usage. ## 🧪 Testing diff --git a/contributing.md b/contributing.md index b703d1a..d8a835b 100644 --- a/contributing.md +++ b/contributing.md @@ -1,7 +1,7 @@ -# Contributing to the POI-Crosschecker Radio +# Contributing to the Subgraph Radio -Welcome to Graphcast POI crosschecker Radio! Thanks a ton for your interest in contributing. +Welcome to Graphcast Subgraph Radio! Thanks a ton for your interest in contributing. If you run into any problems feel free to create an issue. PRs are much appreciated for simple things. If it's something more complex we'd appreciate having a quick chat in GitHub Issues or the Graph Discord server. diff --git a/one-shot/Cargo.toml b/one-shot/Cargo.toml index 9fae187..2c96239 100644 --- a/one-shot/Cargo.toml +++ b/one-shot/Cargo.toml @@ -5,13 +5,13 @@ edition = "2021" authors = ["GraphOps (axiomatic-aardvark, hopeyen)"] description = "One-shot messaging using Graphcast SDK (can be separated to a different repo)" license = "Apache-2.0" -repository = "https://github.com/graphops/poi-radio" +repository = "https://github.com/graphops/subgraph-radio" keywords = ["graphprotocol", "data-integrity", "Indexer", "waku", "p2p"] categories = ["network-programming", "web-programming::http-client"] [dependencies] graphcast-sdk = "0.4.0" -poi-radio = { path = "../poi-radio" } +subgraph-radio = { path = "../subgraph-radio" } prost = "0.11" once_cell = "1.17" chrono = "0.4" diff --git a/one-shot/src/config.rs b/one-shot/src/config.rs index 520db12..cc3740d 100644 --- a/one-shot/src/config.rs +++ b/one-shot/src/config.rs @@ -270,7 +270,7 @@ pub struct Config { long, value_name = "RADIO_NAME", env = "RADIO_NAME", - default_value = "poi-radio" + default_value = "subgraph-radio" )] pub radio_name: String, #[clap(long, value_name = "FILTER_PROTOCOL", env = "ENABLE_FILTER_PROTOCOL")] diff --git a/one-shot/src/operator/operation.rs b/one-shot/src/operator/operation.rs index a88de85..a2061ff 100644 --- a/one-shot/src/operator/operation.rs +++ b/one-shot/src/operator/operation.rs @@ -4,7 +4,7 @@ use tracing::{error, info}; use graphcast_sdk::networks::NetworkName; use crate::operator::RadioOperator; -use poi_radio::{messages::upgrade::VersionUpgradeMessage, OperationError}; +use subgraph_radio::{messages::upgrade::VersionUpgradeMessage, OperationError}; impl RadioOperator { pub async fn gossip_one_shot(&self) -> Result { diff --git a/poi-radio/src/messages/poi.rs b/poi-radio/src/messages/poi.rs deleted file mode 100644 index 9b77548..0000000 --- a/poi-radio/src/messages/poi.rs +++ /dev/null @@ -1,148 +0,0 @@ -use async_graphql::SimpleObject; -use ethers_contract::EthAbiType; -use ethers_core::types::transaction::eip712::Eip712; -use ethers_derive_eip712::*; -use graphcast_sdk::{ - graphcast_agent::message_typing::{BuildMessageError, GraphcastMessage}, - graphql::client_graph_node::query_graph_node_network_block_hash, - networks::NetworkName, -}; -use prost::Message; -use serde::{Deserialize, Serialize}; -use tracing::trace; - -#[derive(Eip712, EthAbiType, Clone, Message, Serialize, Deserialize, PartialEq, SimpleObject)] -#[eip712( - name = "PublicPoiMessage", - version = "0", - chain_id = 1, - verifying_contract = "0xc944e90c64b2c07662a292be6244bdf05cda44a7" -)] -pub struct PublicPoiMessage { - #[prost(string, tag = "1")] - pub identifier: String, - #[prost(string, tag = "2")] - pub content: String, - //TODO: see if timestamp that comes with waku message can be used - /// nonce cached to check against the next incoming message - #[prost(int64, tag = "3")] - pub nonce: i64, - /// blockchain relevant to the message - #[prost(string, tag = "4")] - pub network: String, - /// block relevant to the message - #[prost(uint64, tag = "5")] - pub block_number: u64, - /// block hash generated from the block number - #[prost(string, tag = "6")] - pub block_hash: String, - /// Graph account sender - #[prost(string, tag = "7")] - pub graph_account: String, -} - -impl PublicPoiMessage { - pub fn new( - identifier: String, - content: String, - nonce: i64, - network: String, - block_number: u64, - block_hash: String, - graph_account: String, - ) -> Self { - PublicPoiMessage { - identifier, - content, - nonce, - network, - block_number, - block_hash, - graph_account, - } - } - - pub fn build( - identifier: String, - content: String, - nonce: i64, - network: NetworkName, - block_number: u64, - block_hash: String, - graph_account: String, - ) -> Self { - PublicPoiMessage::new( - identifier, - content, - nonce, - network.to_string(), - block_number, - block_hash, - graph_account, - ) - } - - pub fn payload_content(&self) -> String { - self.content.clone() - } - - // Check for the valid hash between local graph node and gossip - pub async fn valid_hash(&self, graph_node_endpoint: &str) -> Result<&Self, BuildMessageError> { - let block_hash: String = query_graph_node_network_block_hash( - graph_node_endpoint, - &self.network, - self.block_number, - ) - .await - .map_err(BuildMessageError::FieldDerivations)?; - - trace!( - network = tracing::field::debug(self.network.clone()), - block_number = self.block_number, - block_hash = block_hash, - "Queried block hash from graph node", - ); - - if self.block_hash == block_hash { - Ok(self) - } else { - Err(BuildMessageError::InvalidFields(anyhow::anyhow!( - "Message hash ({}) differ from trusted provider response ({}), drop message", - self.block_hash, - block_hash - ))) - } - } - - /// Check duplicated fields: payload message has duplicated fields with GraphcastMessage, the values must be the same - pub fn valid_outer(&self, outer: &GraphcastMessage) -> Result<&Self, BuildMessageError> { - if self.nonce == outer.nonce - && self.graph_account == outer.graph_account - && self.identifier == outer.identifier - { - Ok(self) - } else { - Err(BuildMessageError::InvalidFields(anyhow::anyhow!( - "Radio message wrapped by inconsistent GraphcastMessage: {:#?} <- {:#?}\nnonce check: {:#?}\naccount check: {:#?}\nidentifier check: {:#?}", - &self, - &outer, - self.nonce == outer.nonce, - self.graph_account == outer.graph_account, - self.identifier == outer.identifier, - ))) - } - } - - /// Make sure all messages stored are valid - pub async fn validity_check( - &self, - gc_msg: &GraphcastMessage, - graph_node_endpoint: &str, - ) -> Result<&Self, BuildMessageError> { - let _ = self - .valid_hash(graph_node_endpoint) - .await - .map(|radio_msg| radio_msg.valid_outer(gc_msg))??; - Ok(self) - } -} diff --git a/poi-radio/src/operator/operation.rs b/poi-radio/src/operator/operation.rs deleted file mode 100644 index a9d9bf1..0000000 --- a/poi-radio/src/operator/operation.rs +++ /dev/null @@ -1,394 +0,0 @@ -use autometrics::autometrics; -use chrono::Utc; -use graphcast_sdk::callbook::CallBook; -use std::cmp::max; -use std::collections::HashMap; -use std::sync::{Arc, Mutex as SyncMutex}; -use tracing::{debug, error, trace, warn}; - -use graphcast_sdk::{ - determine_message_block, - graphcast_agent::{ - message_typing::{BuildMessageError, GraphcastMessage}, - GraphcastAgent, GraphcastAgentError, - }, - networks::NetworkName, - BlockPointer, NetworkBlockError, NetworkPointer, -}; - -use crate::messages::poi::PublicPoiMessage; -use crate::operator::attestation::process_ppoi_message; -use crate::{ - metrics::CACHED_MESSAGES, - operator::{ - attestation::{ - compare_attestations, local_comparison_point, save_local_attestation, Attestation, - ComparisonResult, - }, - callbook::CallBookRadioExtensions, - RadioOperator, - }, - OperationError, GRAPHCAST_AGENT, -}; - -/// Determine the parameters for messages to send and compare -#[autometrics(track_concurrency)] -pub async fn gossip_set_up( - id: String, - network_chainhead_blocks: &HashMap, - subgraph_network_latest_blocks: &HashMap, -) -> Result<(NetworkName, BlockPointer, u64), BuildMessageError> { - // Get the indexing network of the deployment - // and update the NETWORK message block - let (network_name, latest_block) = match subgraph_network_latest_blocks.get(&id.clone()) { - Some(network_block) => ( - NetworkName::from_string(&network_block.network.clone()), - network_block.block.clone(), - ), - None => { - let err_msg = format!("Could not query the subgraph's indexing network, check Graph node's indexing statuses of subgraph deployment {}", id.clone()); - warn!( - err = tracing::field::debug(&err_msg), - "Failed to build message" - ); - return Err(BuildMessageError::Network(NetworkBlockError::FailedStatus( - err_msg, - ))); - } - }; - - let message_block = match determine_message_block(network_chainhead_blocks, network_name) { - Ok(block) => block, - Err(e) => return Err(BuildMessageError::Network(e)), - }; - - debug!( - deployment_hash = tracing::field::debug(&id), - network = tracing::field::debug(&network_name), - message_block = message_block, - latest_block = latest_block.number, - message_countdown_blocks = max(0, message_block as i64 - latest_block.number as i64), - "Deployment status", - ); - - Ok((network_name, latest_block, message_block)) -} - -/// Construct the message and send it to Graphcast network -#[autometrics(track_concurrency)] -pub async fn message_send( - id: String, - callbook: CallBook, - message_block: u64, - latest_block: BlockPointer, - network_name: NetworkName, - local_attestations: Arc>>>, - graphcast_agent: &GraphcastAgent, -) -> Result { - trace!( - message_block = message_block, - latest_block = latest_block.number, - "Check message send requirement", - ); - - // Deployment did not sync to message_block - if latest_block.number < message_block { - //TODO: fill in variant in SDK - let err_msg = format!( - "Did not send message for deployment {}: latest_block ({}) syncing status must catch up to the message block ({})", - id.clone(), - latest_block.number, message_block, - ); - trace!(err = err_msg, "Skip send",); - return Err(OperationError::SendTrigger(err_msg)); - }; - - // Message has already been sent - if local_attestations - .lock() - .unwrap() - .get(&id.clone()) - .and_then(|blocks| blocks.get(&message_block)) - .is_some() - { - let err_msg = format!( - "Repeated message for deployment {}, skip sending message for block: {}", - id.clone(), - message_block - ); - trace!(err = err_msg, "Skip send"); - return Err(OperationError::SkipDuplicate(err_msg)); - } - - let block_hash = match graphcast_agent - .callbook - .block_hash(&network_name.to_string(), message_block) - .await - { - Ok(hash) => hash, - Err(e) => { - let err_msg = format!("Failed to query graph node for the block hash: {e}"); - warn!(err = err_msg, "Failed to send message"); - return Err(OperationError::Query(e)); - } - }; - - match callbook - .query_poi( - id.clone(), - block_hash.clone(), - message_block.try_into().unwrap(), - ) - .await - { - Ok(content) => { - let nonce = Utc::now().timestamp(); - let block_hash = callbook - .block_hash(&network_name.to_string(), message_block) - .await - .map_err(OperationError::Query)?; - let radio_message = PublicPoiMessage::build( - id.clone(), - content.clone(), - nonce, - network_name, - message_block, - block_hash, - graphcast_agent.graphcast_identity.graph_account.clone(), - ); - match graphcast_agent - .send_message(&id, radio_message, nonce) - .await - { - Ok(msg_id) => { - save_local_attestation( - local_attestations.clone(), - content.clone(), - id.clone(), - message_block, - ); - trace!("save local attestations: {:#?}", local_attestations); - Ok(msg_id) - } - Err(e) => { - error!(err = tracing::field::debug(&e), "Failed to send message"); - Err(OperationError::Agent(e)) - } - } - } - Err(e) => { - error!( - err = tracing::field::debug(&e), - "Failed to query message content" - ); - Err(OperationError::Agent( - GraphcastAgentError::QueryResponseError(e), - )) - } - } -} - -/// Compare validated messages -#[allow(clippy::too_many_arguments)] -#[autometrics(track_concurrency)] -pub async fn message_comparison( - id: String, - collect_window_duration: i64, - callbook: CallBook, - messages: Vec>, - local_attestations: HashMap>, -) -> Result { - let time = Utc::now().timestamp(); - - let (compare_block, collect_window_end) = match local_comparison_point( - &local_attestations, - &messages, - id.clone(), - collect_window_duration, - ) { - Some((block, window)) if time >= window => (block, window), - Some((compare_block, window)) => { - let err_msg = format!("Deployment {} comparison not triggered: collecting messages until time {}; currently {time}", id.clone(), window); - debug!(err = err_msg, "Collecting messages",); - return Err(OperationError::CompareTrigger( - id.clone(), - compare_block, - err_msg, - )); - } - _ => { - let err_msg = format!( - "Deployment {} comparison not triggered: no matching attestation to compare", - id.clone() - ); - debug!(err = err_msg, "No matching attestations",); - return Err(OperationError::CompareTrigger(id.clone(), 0, err_msg)); - } - }; - - let filter_msg: Vec> = messages - .iter() - .filter(|&m| m.payload.block_number == compare_block && m.nonce <= collect_window_end) - .cloned() - .collect(); - debug!( - deployment_hash = id, - time, - comparison_time = collect_window_end, - compare_block, - comparison_countdown_seconds = max(0, time - collect_window_end), - number_of_messages_matched_to_compare = filter_msg.len(), - "Comparison state", - ); - let remote_attestations_result = process_ppoi_message(filter_msg, &callbook).await; - let remote_attestations = match remote_attestations_result { - Ok(remote) => { - debug!(unique_remote_nPOIs = remote.len(), "Processed messages",); - remote - } - Err(err) => { - trace!( - err = tracing::field::debug(&err), - "An error occured while processing the messages", - ); - return Err(OperationError::Attestation(err)); - } - }; - let comparison_result = - compare_attestations(compare_block, remote_attestations, &local_attestations, &id); - - Ok(comparison_result) -} - -impl RadioOperator { - pub async fn gossip_poi( - &self, - identifiers: Vec, - network_chainhead_blocks: &HashMap, - subgraph_network_latest_blocks: &HashMap, - ) -> Vec> { - let mut send_handles = vec![]; - for id in identifiers.clone() { - /* Set up */ - let (network_name, latest_block, message_block) = if let Ok(params) = gossip_set_up( - id.clone(), - network_chainhead_blocks, - subgraph_network_latest_blocks, - ) - .await - { - params - } else { - let err_msg = "Failed to set up message parameters".to_string(); - warn!(id, err_msg, "Gossip POI failed"); - continue; - }; - - /* Send message */ - let id_cloned = id.clone(); - - let callbook = self.config.callbook(); - let local_attestations = self.persisted_state.local_attestations.clone(); - let send_handle = tokio::spawn(async move { - message_send( - id_cloned, - callbook, - message_block, - latest_block, - network_name, - Arc::clone(&local_attestations), - GRAPHCAST_AGENT.get().unwrap(), - ) - .await - }); - - send_handles.push(send_handle); - } - - let mut send_ops = vec![]; - for handle in send_handles { - if let Ok(s) = handle.await { - send_ops.push(s); - } - } - send_ops - } - - pub async fn compare_poi( - &self, - identifiers: Vec, - ) -> Vec> { - let mut compare_handles = vec![]; - - // Additional radio message check happens here since messages are synchronously stored to state cache in msg handler - let remote_messages = self - .state() - .valid_ppoi_messages(&self.config.graph_node_endpoint) - .await; - - for id in identifiers.clone() { - /* Set up */ - let collect_duration: i64 = self.config.collect_message_duration().to_owned(); - let id_cloned = id.clone(); - let callbook = self.config.callbook(); - let local_attestations = self.state().local_attestations(); - let filtered_msg = remote_messages - .iter() - .filter(|&m| m.identifier == id.clone()) - .cloned() - .collect(); - - let compare_handle = tokio::spawn(async move { - message_comparison( - id_cloned, - collect_duration, - callbook.clone(), - filtered_msg, - local_attestations, - ) - .await - }); - compare_handles.push(compare_handle); - } - - let mut compare_ops = vec![]; - for handle in compare_handles { - let res = handle.await; - if let Ok(s) = res { - // Skip clean up for comparisonResult for Error and buildFailed - match s { - Ok(r) => { - compare_ops.push(Ok(r.clone())); - - /* Clean up cache */ - // Only clear the ones matching identifier and block number equal or less - // Retain the msgs with a different identifier, or if their block number is greater - // clear_local_attestation(&mut local_attestations, r.deployment_hash(), r.block()); - self.persisted_state - .clean_local_attestations(r.block(), r.deployment_hash()); - self.persisted_state - .clean_remote_messages(r.block(), r.deployment_hash()); - CACHED_MESSAGES - .with_label_values(&[&r.deployment_hash()]) - .set(self.state().remote_messages().len().try_into().unwrap()); - } - // Err(OperationError::CompareTrigger(d, b, m)) => { - // trace!(m, "Compare handles"); - // self.persisted_state - // .clean_local_attestations(b, d.clone()); - // self.persisted_state - // .clean_remote_messages(b, d.clone()); - - // compare_ops.push(Err(OperationError::CompareTrigger(d, b, m).clone_with_inner())); - // } - Err(e) => { - trace!(err = tracing::field::debug(&e), "Compare handles"); - - compare_ops.push(Err(e.clone_with_inner())); - } - } - } - } - compare_ops - } -} diff --git a/scripts/release.sh b/scripts/release.sh index 5eb178f..9e3d009 100755 --- a/scripts/release.sh +++ b/scripts/release.sh @@ -2,7 +2,7 @@ set -e set -x -VERSION="$(cargo metadata --quiet --format-version 1 | jq -r '.packages[] | select(.name == "poi-radio") | .version')" +VERSION="$(cargo metadata --quiet --format-version 1 | jq -r '.packages[] | select(.name == "subgraph-radio") | .version')" if [[ -z "$VERSION" ]]; then echo "Usage: $0 " @@ -17,4 +17,4 @@ git-cliff -o CHANGELOG.md ) || true # Publish to crates.io -cargo publish -p poi-radio +cargo publish -p subgraph-radio diff --git a/poi-radio/Cargo.toml b/subgraph-radio/Cargo.toml similarity index 89% rename from poi-radio/Cargo.toml rename to subgraph-radio/Cargo.toml index f8a027d..f0e98e9 100644 --- a/poi-radio/Cargo.toml +++ b/subgraph-radio/Cargo.toml @@ -1,11 +1,11 @@ [package] -name = "poi-radio" -version = "0.3.6" +name = "subgraph-radio" +version = "0.1.0" edition = "2021" authors = ["GraphOps (axiomatic-aardvark, hopeyen)"] -description = "POI Radio monitors subgraph data integrity in real time using Graphcast SDK" +description = "Subgraph Radio monitors subgraph Public PoI and version upgrades messages in real time using Graphcast SDK" license = "Apache-2.0" -repository = "https://github.com/graphops/poi-radio" +repository = "https://github.com/graphops/subgraph-radio" keywords = ["graphprotocol", "data-integrity", "Indexer", "waku", "p2p"] categories = ["network-programming", "web-programming::http-client"] diff --git a/poi-radio/benches/attestations.rs b/subgraph-radio/benches/attestations.rs similarity index 93% rename from poi-radio/benches/attestations.rs rename to subgraph-radio/benches/attestations.rs index 99dd347..7541a77 100644 --- a/poi-radio/benches/attestations.rs +++ b/subgraph-radio/benches/attestations.rs @@ -5,13 +5,13 @@ extern crate criterion; mod attestation { use criterion::{black_box, criterion_group, Criterion}; use graphcast_sdk::graphcast_agent::message_typing::GraphcastMessage; - use poi_radio::{ + use std::collections::HashMap; + use subgraph_radio::{ messages::poi::PublicPoiMessage, operator::attestation::{ compare_attestations, local_comparison_point, update_blocks, Attestation, }, }; - use std::collections::HashMap; criterion_group!( benches, @@ -38,7 +38,7 @@ mod attestation { update_blocks( 42, &blocks, - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 0.0, "0xadd3".to_string(), 1, @@ -49,7 +49,7 @@ mod attestation { fn update_attestations_bench(c: &mut Criterion) { let attestation = black_box(Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 0.0, vec!["0xa1".to_string()], vec![2], @@ -65,7 +65,7 @@ mod attestation { let mut local_blocks: HashMap = black_box(HashMap::new()); let remote = black_box(Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 0.0, vec!["0xa1".to_string()], vec![0], @@ -73,7 +73,7 @@ mod attestation { black_box(remote_blocks.insert(42, vec![remote])); let local = black_box(Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 0.0, Vec::new(), vec![0], @@ -103,21 +103,21 @@ mod attestation { fn comparison_point_bench(c: &mut Criterion) { let mut local_blocks: HashMap = black_box(HashMap::new()); let attestation1 = black_box(Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 0.0, vec!["0xa1".to_string()], vec![2], )); let attestation2 = black_box(Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 0.0, vec!["0xa2".to_string()], vec![4], )); let attestation3 = black_box(Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 1.0, vec!["0xa3".to_string()], vec![6], @@ -147,7 +147,7 @@ mod attestation { graph_account: String::from("0x7e6528e4ce3055e829a32b5dc4450072bac28bc6"), payload: PublicPoiMessage { identifier: String::from("hash"), - content: String::from("awesome-npoi"), + content: String::from("awesome-ppoi"), nonce: 2, network: String::from("goerli"), block_number: 42, diff --git a/poi-radio/benches/gossips.rs b/subgraph-radio/benches/gossips.rs similarity index 95% rename from poi-radio/benches/gossips.rs rename to subgraph-radio/benches/gossips.rs index 50f454e..3f2b1d8 100644 --- a/poi-radio/benches/gossips.rs +++ b/subgraph-radio/benches/gossips.rs @@ -1,7 +1,7 @@ use criterion::async_executor::FuturesExecutor; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use graphcast_sdk::graphcast_agent::message_typing::IdentityValidation; -use poi_radio::operator::RadioOperator; +use subgraph_radio::operator::RadioOperator; use rand::{thread_rng, Rng}; use secp256k1::SecretKey; @@ -9,7 +9,7 @@ use std::collections::HashMap; use graphcast_sdk::networks::NetworkName; use graphcast_sdk::{BlockPointer, NetworkPointer}; -use poi_radio::config::Config; +use subgraph_radio::config::Config; fn gossip_poi_bench(c: &mut Criterion) { let identifiers = black_box(vec!["identifier1".to_string(), "identifier2".to_string()]); @@ -35,7 +35,7 @@ fn gossip_poi_bench(c: &mut Criterion) { topics: vec![String::from( "QmbaLc7fEfLGUioKWehRhq838rRzeR8cBoapNJWNSAZE8u", )], - coverage: poi_radio::config::CoverageLevel::Comprehensive, + coverage: subgraph_radio::config::CoverageLevel::Comprehensive, collect_message_duration: 10, waku_host: None, waku_port: None, diff --git a/poi-radio/src/config.rs b/subgraph-radio/src/config.rs similarity index 98% rename from poi-radio/src/config.rs rename to subgraph-radio/src/config.rs index 0a79c63..f94e527 100644 --- a/poi-radio/src/config.rs +++ b/subgraph-radio/src/config.rs @@ -31,8 +31,8 @@ pub enum CoverageLevel { #[derive(Clone, Debug, Parser, Serialize, Deserialize, Getters, Default)] #[clap( - name = "poi-radio", - about = "Cross-check POIs with other Indexer in real time", + name = "subgraph-radio", + about = "Cross-check POIs with other Indexer and received subgraph owner notifications in real time", author = "GraphOps" )] pub struct Config { @@ -276,7 +276,7 @@ pub struct Config { long, value_name = "RADIO_NAME", env = "RADIO_NAME", - default_value = "poi-radio" + default_value = "subgraph-radio" )] pub radio_name: String, #[clap(long, value_name = "FILTER_PROTOCOL", env = "ENABLE_FILTER_PROTOCOL")] @@ -395,7 +395,7 @@ impl Config { let state = PersistedState::load_cache(path); trace!( local_attestations = tracing::field::debug(&state.local_attestations()), - remote_messages = tracing::field::debug(&state.remote_messages()), + remote_ppoi_messages = tracing::field::debug(&state.remote_ppoi_messages()), state = tracing::field::debug(&state), "Loaded Persisted state cache" ); diff --git a/poi-radio/src/graphql/mod.rs b/subgraph-radio/src/graphql/mod.rs similarity index 100% rename from poi-radio/src/graphql/mod.rs rename to subgraph-radio/src/graphql/mod.rs diff --git a/poi-radio/src/graphql/query_block_hash_from_number.graphql b/subgraph-radio/src/graphql/query_block_hash_from_number.graphql similarity index 100% rename from poi-radio/src/graphql/query_block_hash_from_number.graphql rename to subgraph-radio/src/graphql/query_block_hash_from_number.graphql diff --git a/poi-radio/src/graphql/query_indexing_statuses.graphql b/subgraph-radio/src/graphql/query_indexing_statuses.graphql similarity index 100% rename from poi-radio/src/graphql/query_indexing_statuses.graphql rename to subgraph-radio/src/graphql/query_indexing_statuses.graphql diff --git a/poi-radio/src/graphql/query_proof_of_indexing.graphql b/subgraph-radio/src/graphql/query_proof_of_indexing.graphql similarity index 100% rename from poi-radio/src/graphql/query_proof_of_indexing.graphql rename to subgraph-radio/src/graphql/query_proof_of_indexing.graphql diff --git a/poi-radio/src/graphql/schema_graph_node.graphql b/subgraph-radio/src/graphql/schema_graph_node.graphql similarity index 100% rename from poi-radio/src/graphql/schema_graph_node.graphql rename to subgraph-radio/src/graphql/schema_graph_node.graphql diff --git a/poi-radio/src/lib.rs b/subgraph-radio/src/lib.rs similarity index 99% rename from poi-radio/src/lib.rs rename to subgraph-radio/src/lib.rs index 61d29a2..b8be013 100644 --- a/poi-radio/src/lib.rs +++ b/subgraph-radio/src/lib.rs @@ -40,7 +40,7 @@ pub static RADIO_OPERATOR: OnceCell = OnceCell::new(); pub static GRAPHCAST_AGENT: OnceCell> = OnceCell::new(); pub fn radio_name() -> &'static str { - "poi-radio" + "subgraph-radio" } /// Generate default topics that is operator address resolved to indexer address diff --git a/poi-radio/src/main.rs b/subgraph-radio/src/main.rs similarity index 86% rename from poi-radio/src/main.rs rename to subgraph-radio/src/main.rs index 0078b70..f367aa4 100644 --- a/poi-radio/src/main.rs +++ b/subgraph-radio/src/main.rs @@ -1,6 +1,6 @@ use dotenv::dotenv; -use poi_radio::{config::Config, operator::RadioOperator, RADIO_OPERATOR}; +use subgraph_radio::{config::Config, operator::RadioOperator, RADIO_OPERATOR}; extern crate partial_application; diff --git a/poi-radio/src/messages/mod.rs b/subgraph-radio/src/messages/mod.rs similarity index 100% rename from poi-radio/src/messages/mod.rs rename to subgraph-radio/src/messages/mod.rs diff --git a/subgraph-radio/src/messages/poi.rs b/subgraph-radio/src/messages/poi.rs new file mode 100644 index 0000000..88746e4 --- /dev/null +++ b/subgraph-radio/src/messages/poi.rs @@ -0,0 +1,382 @@ +use async_graphql::SimpleObject; +use autometrics::autometrics; +use chrono::Utc; +use ethers_contract::EthAbiType; +use ethers_core::types::transaction::eip712::Eip712; +use ethers_derive_eip712::*; +use graphcast_sdk::callbook::CallBook; +use graphcast_sdk::{ + graphcast_agent::message_typing::{BuildMessageError, GraphcastMessage}, + graphql::client_graph_node::query_graph_node_network_block_hash, + networks::NetworkName, +}; +use prost::Message; +use serde::{Deserialize, Serialize}; + +use std::cmp::max; +use std::collections::HashMap; +use std::sync::{Arc, Mutex as SyncMutex}; +use tracing::{debug, error, trace, warn}; + +use graphcast_sdk::{ + graphcast_agent::{GraphcastAgent, GraphcastAgentError}, + BlockPointer, +}; + +use crate::{ + metrics::CACHED_MESSAGES, + operator::{ + attestation::{ + compare_attestations, local_comparison_point, save_local_attestation, Attestation, + ComparisonResult, + }, + callbook::CallBookRadioExtensions, + }, + OperationError, +}; +use crate::{operator::attestation::process_ppoi_message, state::PersistedState}; + +#[derive(Eip712, EthAbiType, Clone, Message, Serialize, Deserialize, PartialEq, SimpleObject)] +#[eip712( + name = "PublicPoiMessage", + version = "0", + chain_id = 1, + verifying_contract = "0xc944e90c64b2c07662a292be6244bdf05cda44a7" +)] +pub struct PublicPoiMessage { + #[prost(string, tag = "1")] + pub identifier: String, + #[prost(string, tag = "2")] + pub content: String, + //TODO: see if timestamp that comes with waku message can be used + /// nonce cached to check against the next incoming message + #[prost(int64, tag = "3")] + pub nonce: i64, + /// blockchain relevant to the message + #[prost(string, tag = "4")] + pub network: String, + /// block relevant to the message + #[prost(uint64, tag = "5")] + pub block_number: u64, + /// block hash generated from the block number + #[prost(string, tag = "6")] + pub block_hash: String, + /// Graph account sender + #[prost(string, tag = "7")] + pub graph_account: String, +} + +impl PublicPoiMessage { + pub fn new( + identifier: String, + content: String, + nonce: i64, + network: String, + block_number: u64, + block_hash: String, + graph_account: String, + ) -> Self { + PublicPoiMessage { + identifier, + content, + nonce, + network, + block_number, + block_hash, + graph_account, + } + } + + pub fn build( + identifier: String, + content: String, + nonce: i64, + network: NetworkName, + block_number: u64, + block_hash: String, + graph_account: String, + ) -> Self { + PublicPoiMessage::new( + identifier, + content, + nonce, + network.to_string(), + block_number, + block_hash, + graph_account, + ) + } + + pub fn payload_content(&self) -> String { + self.content.clone() + } + + // Check for the valid hash between local graph node and gossip + pub async fn valid_hash(&self, graph_node_endpoint: &str) -> Result<&Self, BuildMessageError> { + let block_hash: String = query_graph_node_network_block_hash( + graph_node_endpoint, + &self.network, + self.block_number, + ) + .await + .map_err(BuildMessageError::FieldDerivations)?; + + trace!( + network = tracing::field::debug(self.network.clone()), + block_number = self.block_number, + block_hash = block_hash, + "Queried block hash from graph node", + ); + + if self.block_hash == block_hash { + Ok(self) + } else { + Err(BuildMessageError::InvalidFields(anyhow::anyhow!( + "Message hash ({}) differ from trusted provider response ({}), drop message", + self.block_hash, + block_hash + ))) + } + } + + /// Check duplicated fields: payload message has duplicated fields with GraphcastMessage, the values must be the same + pub fn valid_outer(&self, outer: &GraphcastMessage) -> Result<&Self, BuildMessageError> { + if self.nonce == outer.nonce + && self.graph_account == outer.graph_account + && self.identifier == outer.identifier + { + Ok(self) + } else { + Err(BuildMessageError::InvalidFields(anyhow::anyhow!( + "Radio message wrapped by inconsistent GraphcastMessage: {:#?} <- {:#?}\nnonce check: {:#?}\naccount check: {:#?}\nidentifier check: {:#?}", + &self, + &outer, + self.nonce == outer.nonce, + self.graph_account == outer.graph_account, + self.identifier == outer.identifier, + ))) + } + } + + /// Make sure all messages stored are valid + pub async fn validity_check( + &self, + gc_msg: &GraphcastMessage, + graph_node_endpoint: &str, + ) -> Result<&Self, BuildMessageError> { + let _ = self + .valid_hash(graph_node_endpoint) + .await + .map(|radio_msg| radio_msg.valid_outer(gc_msg))??; + Ok(self) + } +} + +/// Construct the message and send it to Graphcast network +#[autometrics(track_concurrency)] +pub async fn send_poi_message( + id: String, + callbook: CallBook, + message_block: u64, + latest_block: BlockPointer, + network_name: NetworkName, + local_attestations: Arc>>>, + graphcast_agent: &GraphcastAgent, +) -> Result { + trace!( + message_block = message_block, + latest_block = latest_block.number, + "Check message send requirement", + ); + + // Deployment did not sync to message_block + if latest_block.number < message_block { + //TODO: fill in variant in SDK + let err_msg = format!( + "Did not send message for deployment {}: latest_block ({}) syncing status must catch up to the message block ({})", + id.clone(), + latest_block.number, message_block, + ); + trace!(err = err_msg, "Skip send",); + return Err(OperationError::SendTrigger(err_msg)); + }; + + // Message has already been sent + if local_attestations + .lock() + .unwrap() + .get(&id.clone()) + .and_then(|blocks| blocks.get(&message_block)) + .is_some() + { + let err_msg = format!( + "Repeated message for deployment {}, skip sending message for block: {}", + id.clone(), + message_block + ); + trace!(err = err_msg, "Skip send"); + return Err(OperationError::SkipDuplicate(err_msg)); + } + + let block_hash = match graphcast_agent + .callbook + .block_hash(&network_name.to_string(), message_block) + .await + { + Ok(hash) => hash, + Err(e) => { + let err_msg = format!("Failed to query graph node for the block hash: {e}"); + warn!(err = err_msg, "Failed to send message"); + return Err(OperationError::Query(e)); + } + }; + + match callbook + .query_poi( + id.clone(), + block_hash.clone(), + message_block.try_into().unwrap(), + ) + .await + { + Ok(content) => { + let nonce = Utc::now().timestamp(); + let block_hash = callbook + .block_hash(&network_name.to_string(), message_block) + .await + .map_err(OperationError::Query)?; + let radio_message = PublicPoiMessage::build( + id.clone(), + content.clone(), + nonce, + network_name, + message_block, + block_hash, + graphcast_agent.graphcast_identity.graph_account.clone(), + ); + match graphcast_agent + .send_message(&id, radio_message, nonce) + .await + { + Ok(msg_id) => { + save_local_attestation( + local_attestations.clone(), + content.clone(), + id.clone(), + message_block, + ); + trace!("save local attestations: {:#?}", local_attestations); + Ok(msg_id) + } + Err(e) => { + error!(err = tracing::field::debug(&e), "Failed to send message"); + Err(OperationError::Agent(e)) + } + } + } + Err(e) => { + error!( + err = tracing::field::debug(&e), + "Failed to query message content" + ); + Err(OperationError::Agent( + GraphcastAgentError::QueryResponseError(e), + )) + } + } +} + +/// If we want to have process_valid_message fn in the struct, then +/// we should update PersistedState::remote_ppoi_message standalone +/// from GraphcastMessage field such as nonce +#[autometrics(track_concurrency)] +pub async fn process_valid_message( + msg: GraphcastMessage, + state: &PersistedState, +) { + let identifier = msg.identifier.clone(); + + state.add_remote_ppoi_message(msg.clone()); + CACHED_MESSAGES.with_label_values(&[&identifier]).set( + state + .remote_ppoi_messages() + .iter() + .filter(|m: &&GraphcastMessage| m.identifier == identifier) + .collect::>>() + .len() + .try_into() + .unwrap(), + ); +} + +/// Compare validated messages +#[allow(clippy::too_many_arguments)] +#[autometrics(track_concurrency)] +pub async fn poi_message_comparison( + id: String, + collect_window_duration: i64, + callbook: CallBook, + messages: Vec>, + local_attestations: HashMap>, +) -> Result { + let time = Utc::now().timestamp(); + + let (compare_block, collect_window_end) = match local_comparison_point( + &local_attestations, + &messages, + id.clone(), + collect_window_duration, + ) { + Some((block, window)) if time >= window => (block, window), + Some((compare_block, window)) => { + let err_msg = format!("Deployment {} comparison not triggered: collecting messages until time {}; currently {time}", id.clone(), window); + debug!(err = err_msg, "Collecting messages",); + return Err(OperationError::CompareTrigger( + id.clone(), + compare_block, + err_msg, + )); + } + _ => { + let err_msg = format!( + "Deployment {} comparison not triggered: no matching attestation to compare", + id.clone() + ); + debug!(err = err_msg, "No matching attestations",); + return Err(OperationError::CompareTrigger(id.clone(), 0, err_msg)); + } + }; + + let filter_msg: Vec> = messages + .iter() + .filter(|&m| m.payload.block_number == compare_block && m.nonce <= collect_window_end) + .cloned() + .collect(); + debug!( + deployment_hash = id, + time, + comparison_time = collect_window_end, + compare_block, + comparison_countdown_seconds = max(0, time - collect_window_end), + number_of_messages_matched_to_compare = filter_msg.len(), + "Comparison state", + ); + let remote_attestations_result = process_ppoi_message(filter_msg, &callbook).await; + let remote_attestations = match remote_attestations_result { + Ok(remote) => { + debug!(unique_remote_pPOIs = remote.len(), "Processed messages",); + remote + } + Err(err) => { + trace!( + err = tracing::field::debug(&err), + "An error occured while processing the messages", + ); + return Err(OperationError::Attestation(err)); + } + }; + let comparison_result = + compare_attestations(compare_block, remote_attestations, &local_attestations, &id); + + Ok(comparison_result) +} diff --git a/poi-radio/src/messages/upgrade.rs b/subgraph-radio/src/messages/upgrade.rs similarity index 87% rename from poi-radio/src/messages/upgrade.rs rename to subgraph-radio/src/messages/upgrade.rs index 1983fef..92e8258 100644 --- a/poi-radio/src/messages/upgrade.rs +++ b/subgraph-radio/src/messages/upgrade.rs @@ -11,6 +11,8 @@ use graphcast_sdk::{ use prost::Message; use serde::{Deserialize, Serialize}; +use crate::operator::notifier::Notifier; + #[derive(Eip712, EthAbiType, Clone, Message, Serialize, Deserialize, PartialEq, SimpleObject)] #[eip712( name = "VersionUpgradeMessage", @@ -125,4 +127,16 @@ impl VersionUpgradeMessage { .map(|radio_msg| radio_msg.valid_outer(gc_msg))??; Ok(self) } + + /// process the validated version upgrade messages, currently just notify + pub async fn process_valid_message(&self, notifier: &Notifier) { + // send notifications, later can optionally automate deployment + notifier.notify(format!( + "Subgraph owner for a deployment has shared version upgrade info:\nold deployment: {}\nnew deployment: {}\nplanned migrate time: {}\nnetwork: {}", + self.identifier, + self.new_hash, + self.migrate_time, + self.network + )).await; + } } diff --git a/poi-radio/src/metrics/mod.rs b/subgraph-radio/src/metrics/mod.rs similarity index 84% rename from poi-radio/src/metrics/mod.rs rename to subgraph-radio/src/metrics/mod.rs index eb54c6f..9896094 100644 --- a/poi-radio/src/metrics/mod.rs +++ b/subgraph-radio/src/metrics/mod.rs @@ -16,7 +16,7 @@ pub static VALIDATED_MESSAGES: Lazy = Lazy::new(|| { let m = IntCounterVec::new( Opts::new("validated_messages", "Number of validated messages") .namespace("graphcast") - .subsystem("poi_radio"), + .subsystem("subgraph_radio"), &["deployment"], ) .expect("Failed to create validated_messages counters"); @@ -31,7 +31,7 @@ pub static CACHED_MESSAGES: Lazy = Lazy::new(|| { let m = IntGaugeVec::new( Opts::new("cached_messages", "Number of messages in cache") .namespace("graphcast") - .subsystem("poi_radio"), + .subsystem("subgraph_radio"), &["deployment"], ) .expect("Failed to create cached_messages gauges"); @@ -48,7 +48,7 @@ pub static ACTIVE_INDEXERS: Lazy = Lazy::new(|| { "Number of indexers actively crosschecking on the deployment (self excluded)", ) .namespace("graphcast") - .subsystem("poi_radio"), + .subsystem("subgraph_radio"), &["deployment"], ) .expect("Failed to create ACTIVE_INDEXERS gauges"); @@ -64,7 +64,7 @@ pub static DIVERGING_SUBGRAPHS: Lazy = Lazy::new(|| { "Number of diverging subgraphs with non-consensus POIs from cross-checking", ) .namespace("graphcast") - .subsystem("poi_radio"), + .subsystem("subgraph_radio"), ) .expect("Failed to create diverging_subgraphs gauge"); prometheus::register(Box::new(m.clone())) @@ -73,31 +73,31 @@ pub static DIVERGING_SUBGRAPHS: Lazy = Lazy::new(|| { }); #[allow(dead_code)] -pub static LOCAL_NPOIS_TO_COMPARE: Lazy = Lazy::new(|| { +pub static LOCAL_PPOIS_TO_COMPARE: Lazy = Lazy::new(|| { let m = IntGaugeVec::new( Opts::new( - "local_npois_to_compare", - "Number of nPOIs stored locally for each subgraph", + "local_ppois_to_compare", + "Number of pPOIs stored locally for each subgraph", ) .namespace("graphcast") - .subsystem("poi_radio"), + .subsystem("subgraph_radio"), &["deployment"], ) - .expect("Failed to create LOCAL_NPOIS_TO_COMPARE gauges"); + .expect("Failed to create LOCAL_PPOIS_TO_COMPARE gauges"); prometheus::register(Box::new(m.clone())) - .expect("Failed to register local_npois_to_compare gauge"); + .expect("Failed to register local_ppois_to_compare gauge"); m }); #[allow(dead_code)] -pub static INDEXER_COUNT_BY_NPOI: Lazy = Lazy::new(|| { +pub static INDEXER_COUNT_BY_PPOI: Lazy = Lazy::new(|| { let m = HistogramVec::new( HistogramOpts::new( - "indexer_count_by_npoi", - "Count of indexers attesting for a nPOI", + "indexer_count_by_ppoi", + "Count of indexers attesting for a pPOI", ) .namespace("graphcast") - .subsystem("poi_radio") + .subsystem("subgraph_radio") .buckets(linear_buckets(0.0, 1.0, 20).unwrap()), // Q: if we add indexer group hash here // then new metric is created for changes in indexers. I imagine this not so important @@ -105,9 +105,9 @@ pub static INDEXER_COUNT_BY_NPOI: Lazy = Lazy::new(|| { // description of metrics cannot be updated after initialization &["deployment"], ) - .expect("Failed to create indexer_count_by_npoi histograms"); + .expect("Failed to create indexer_count_by_ppoi histograms"); prometheus::register(Box::new(m.clone())) - .expect("Failed to register indexer_count_by_npoi counter"); + .expect("Failed to register indexer_count_by_ppoi counter"); m }); @@ -131,8 +131,8 @@ pub fn start_metrics() { Box::new(CACHED_MESSAGES.clone()), Box::new(ACTIVE_INDEXERS.clone()), Box::new(DIVERGING_SUBGRAPHS.clone()), - Box::new(LOCAL_NPOIS_TO_COMPARE.clone()), - Box::new(INDEXER_COUNT_BY_NPOI.clone()), + Box::new(LOCAL_PPOIS_TO_COMPARE.clone()), + Box::new(INDEXER_COUNT_BY_PPOI.clone()), ], ); } diff --git a/poi-radio/src/operator/attestation.rs b/subgraph-radio/src/operator/attestation.rs similarity index 91% rename from poi-radio/src/operator/attestation.rs rename to subgraph-radio/src/operator/attestation.rs index 28b4c42..e06199d 100644 --- a/poi-radio/src/operator/attestation.rs +++ b/subgraph-radio/src/operator/attestation.rs @@ -20,7 +20,7 @@ use graphcast_sdk::{ use crate::{ messages::poi::PublicPoiMessage, metrics::{ - ACTIVE_INDEXERS, DIVERGING_SUBGRAPHS, INDEXER_COUNT_BY_NPOI, LOCAL_NPOIS_TO_COMPARE, + ACTIVE_INDEXERS, DIVERGING_SUBGRAPHS, INDEXER_COUNT_BY_PPOI, LOCAL_PPOIS_TO_COMPARE, }, state::PersistedState, OperationError, @@ -28,10 +28,10 @@ use crate::{ use super::Notifier; -/// A wrapper around an attested NPOI, tracks Indexers that have sent it plus their accumulated stake +/// A wrapper around an attested public POI, tracks Indexers that have sent it plus their accumulated stake #[derive(Clone, Debug, PartialEq, Eq, Hash, SimpleObject, Serialize, Deserialize)] pub struct Attestation { - pub npoi: String, + pub ppoi: String, pub stake_weight: i64, pub senders: Vec, pub sender_group_hash: String, @@ -40,12 +40,12 @@ pub struct Attestation { #[autometrics] impl Attestation { - pub fn new(npoi: String, stake_weight: f32, senders: Vec, timestamp: Vec) -> Self { + pub fn new(ppoi: String, stake_weight: f32, senders: Vec, timestamp: Vec) -> Self { let addresses = &mut senders.clone(); sort_addresses(addresses); let sender_group_hash = hash_addresses(addresses); Attestation { - npoi, + ppoi, stake_weight: stake_weight as i64, senders, sender_group_hash, @@ -53,7 +53,7 @@ impl Attestation { } } - /// Used whenever we receive a new attestation for an NPOI that already exists in the store + /// Used whenever we receive a new attestation for an PPOI that already exists in the store pub fn update( base: &Self, address: String, @@ -66,7 +66,7 @@ impl Attestation { )) } else { Ok(Self::new( - base.npoi.clone(), + base.ppoi.clone(), (base.stake_weight as f32) + stake, [base.senders.clone(), vec![address]].concat(), [base.timestamp.clone(), vec![timestamp]].concat(), @@ -79,8 +79,8 @@ impl fmt::Display for Attestation { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "NPOI: {}\nsender addresses: {:#?}\nstake weight: {}", - self.npoi, self.senders, self.stake_weight + "PPOI: {}\nsender addresses: {:#?}\nstake weight: {}", + self.ppoi, self.senders, self.stake_weight ) } } @@ -98,9 +98,9 @@ pub struct AttestationEntry { pub fn attestations_to_vec(attestations: &LocalAttestationsMap) -> Vec { attestations .iter() - .flat_map(|(npoi, inner_map)| { + .flat_map(|(ppoi, inner_map)| { inner_map.iter().map(move |(blk, att)| AttestationEntry { - deployment: npoi.clone(), + deployment: ppoi.clone(), block_number: *blk, attestation: att.clone(), }) @@ -125,7 +125,7 @@ pub async fn process_ppoi_message( for msg in messages.iter() { let radio_msg = &msg.payload.clone(); // Message has passed GraphcastMessage validation, now check for radio validation - let npoi = radio_msg.payload_content().to_string(); + let ppoi = radio_msg.payload_content().to_string(); let sender_stake = get_indexer_stake(&radio_msg.graph_account.clone(), callbook.graph_network()) .await @@ -139,7 +139,7 @@ pub async fn process_ppoi_message( .or_default(); let attestations = blocks.entry(radio_msg.block_number).or_default(); - let existing_attestation = attestations.iter_mut().find(|a| a.npoi == npoi); + let existing_attestation = attestations.iter_mut().find(|a| a.ppoi == ppoi); if let Some(existing_attestation) = existing_attestation { if let Ok(updated_attestation) = Attestation::update( @@ -169,16 +169,16 @@ pub async fn process_ppoi_message( num_attestation = remote_attestations.len(), "Process message into attestations", ); - // npoi_hist by attestation - don't care for attestation but should be grouped together + // ppoi_hist by attestation - don't care for attestation but should be grouped together // so the summed up metrics should be ACTIVE_INDEXERS - let npoi_hist = INDEXER_COUNT_BY_NPOI.with_label_values(&[&first_msg.identifier.to_string()]); + let ppoi_hist = INDEXER_COUNT_BY_PPOI.with_label_values(&[&first_msg.identifier.to_string()]); let blocks = remote_attestations .entry(first_msg.identifier.to_string()) .or_default(); for a in blocks.entry(first_msg.payload.block_number).or_default() { // this can probably sum up to active peers) - // Update INDEXER_COUNT_BY_NPOI metric - npoi_hist.observe(a.senders.len() as f64); + // Update INDEXER_COUNT_BY_PPOI metric + ppoi_hist.observe(a.senders.len() as f64); } let active_indexers = ACTIVE_INDEXERS.with_label_values(&[&first_msg.identifier.to_string()]); @@ -199,13 +199,13 @@ pub fn combine_senders(attestations: &[Attestation]) -> Vec { /// If they don't exist, then return default value that shall never be validated to trigger pub fn local_comparison_point( local_attestations: &LocalAttestationsMap, - remote_messages: &[GraphcastMessage], + remote_ppoi_messages: &[GraphcastMessage], id: String, collect_window_duration: i64, ) -> Option<(u64, i64)> { if let Some(blocks_map) = local_attestations.get(&id) { // Find the attestaion by the smallest block - let remote_blocks = remote_messages + let remote_blocks = remote_ppoi_messages .iter() .filter(|m| m.identifier == id.clone()) .map(|m| m.payload.block_number) @@ -232,7 +232,7 @@ pub fn local_comparison_point( pub fn update_blocks( block_number: u64, blocks: &HashMap>, - npoi: String, + ppoi: String, stake: f32, address: String, timestamp: i64, @@ -242,7 +242,7 @@ pub fn update_blocks( blocks_clone.insert( block_number, vec![Attestation::new( - npoi, + ppoi, stake, vec![address], vec![timestamp], @@ -251,7 +251,7 @@ pub fn update_blocks( blocks_clone } -/// Saves NPOIs that we've generated locally, in order to compare them with remote ones later +/// Saves PPOIs that we've generated locally, in order to compare them with remote ones later pub fn save_local_attestation( local_attestations: Arc>, content: String, @@ -269,10 +269,10 @@ pub fn save_local_attestation( .and_modify(|existing_attestation| *existing_attestation = attestation.clone()) .or_insert(attestation); - let npoi_gauge = LOCAL_NPOIS_TO_COMPARE.with_label_values(&[&ipfs_hash]); + let ppoi_gauge = LOCAL_PPOIS_TO_COMPARE.with_label_values(&[&ipfs_hash]); // The value is the total number of senders that are attesting for that subgraph - npoi_gauge.set(local_attestations.len().try_into().unwrap()); + ppoi_gauge.set(local_attestations.len().try_into().unwrap()); } /// Clear the expired local attestations after comparing with remote results @@ -288,9 +288,9 @@ pub fn clear_local_attestation( let mut blocks_clone: HashMap = HashMap::new(); blocks_clone.extend(blocks.clone()); blocks_clone.remove(&block_number); - let npoi_gauge = LOCAL_NPOIS_TO_COMPARE.with_label_values(&[&ipfs_hash]); + let ppoi_gauge = LOCAL_PPOIS_TO_COMPARE.with_label_values(&[&ipfs_hash]); // The value is the total number of senders that are attesting for that subgraph - npoi_gauge.set(blocks_clone.len().try_into().unwrap()); + ppoi_gauge.set(blocks_clone.len().try_into().unwrap()); local_attestations.insert(ipfs_hash, blocks_clone); }; } @@ -406,9 +406,9 @@ impl Clone for ComparisonResult { } /// Compares local attestations against remote ones using the attestation stores we populated while processing saved GraphcastMessage messages. -/// It takes our attestation (NPOI) for a given subgraph on a given block and compares it to the top-attested one from the remote attestations. +/// It takes our attestation (PPOI) for a given subgraph on a given block and compares it to the top-attested one from the remote attestations. /// The top remote attestation is found by grouping attestations together and increasing their total stake-weight every time we see a new message -/// with the same NPOI from an Indexer (NOTE: one Indexer can only send 1 attestation per subgraph per block). The attestations are then sorted +/// with the same PPOI from an Indexer (NOTE: one Indexer can only send 1 attestation per subgraph per block). The attestations are then sorted /// and we take the one with the highest total stake-weight. pub fn compare_attestations( attestation_block: u64, @@ -490,17 +490,17 @@ pub fn compare_attestations( ipfs_hash, attestation_block, sorted_attestations = tracing::field::debug(&remote_attestations), - "More than 1 nPOI found", + "More than 1 pPOI found", ); } - let most_attested_npoi = &remote_attestations.last().unwrap().npoi; - if most_attested_npoi == &local_attestation.npoi { + let most_attested_ppoi = &remote_attestations.last().unwrap().ppoi; + if most_attested_ppoi == &local_attestation.ppoi { trace!( ipfs_hash, attestation_block, - num_unique_npois = remote_attestations.len(), - "nPOI matched", + num_unique_ppois = remote_attestations.len(), + "pPOI matched", ); ComparisonResult { deployment: ipfs_hash.to_string(), @@ -514,7 +514,7 @@ pub fn compare_attestations( attestation_block, remote_attestations = tracing::field::debug(&remote_attestations), local_attestation = tracing::field::debug(&local_attestation), - "Number of nPOI submitted", + "Number of pPOI submitted", ); ComparisonResult { deployment: ipfs_hash.to_string(), @@ -535,13 +535,13 @@ pub fn compare_attestation( let mut remote_attestations = remote_attestations; remote_attestations.sort_by(|a, b| a.stake_weight.partial_cmp(&b.stake_weight).unwrap()); - let most_attested_npoi = &remote_attestations.last().unwrap().npoi; - if most_attested_npoi == &local_attestation.npoi { + let most_attested_ppoi = &remote_attestations.last().unwrap().ppoi; + if most_attested_ppoi == &local_attestation.ppoi { trace!( local.block_number, remote_attestations = tracing::field::debug(&remote_attestations), local_attestation = tracing::field::debug(&local_attestation), - "nPOI matched", + "pPOI matched", ); ComparisonResult { deployment: local.deployment.to_string(), @@ -576,7 +576,7 @@ fn sort_addresses(addresses: &mut [String]) { }); } -/// Deterministically ordering the indexer addresses attesting to a nPOI, and then hashing that list +/// Deterministically ordering the indexer addresses attesting to a pPOI, and then hashing that list fn hash_addresses(addresses: &[String]) -> String { // create a SHA3-256 object let mut hasher = Sha3_256::new(); @@ -718,28 +718,28 @@ mod tests { let block_clone = update_blocks( 42, &blocks, - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 0.0, "0xadd3".to_string(), 1, ); assert_eq!( - block_clone.get(&42).unwrap().first().unwrap().npoi, - "awesome-npoi".to_string() + block_clone.get(&42).unwrap().first().unwrap().ppoi, + "awesome-ppoi".to_string() ); } #[test] fn test_sort_sender_addresses_unique() { let attestation = Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 1.0, vec!["0xaac5349585cbbf924026d25a520ffa9e8b51a39b".to_string()], vec![1], ); let attestation2 = Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 1.0, vec!["0xbbc5349585cbbf924026d25a520ffa9e8b51a39b".to_string()], vec![1], @@ -753,7 +753,7 @@ mod tests { #[test] fn test_sort_sender_addresses() { let attestation = Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 1.0, vec![ "0xaac5349585cbbf924026d25a520ffa9e8b51a39b".to_string(), @@ -762,7 +762,7 @@ mod tests { vec![1, 2], ); let attestation2 = Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 1.0, vec![ "0xbbc5349585cbbf924026d25a520ffa9e8b51a39b".to_string(), @@ -779,21 +779,21 @@ mod tests { #[test] fn test_attestation_sorting() { let attestation1 = Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 0.0, vec!["0xa1".to_string()], vec![0], ); let attestation2 = Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 0.0, vec!["0xa2".to_string()], vec![1], ); let attestation3 = Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 1.0, vec!["0xa3".to_string()], vec![2], @@ -814,7 +814,7 @@ mod tests { #[test] fn test_attestation_update_success() { let attestation = Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 0.0, vec!["0xa1".to_string()], vec![2], @@ -830,7 +830,7 @@ mod tests { #[test] fn test_attestation_update_fail() { let attestation = Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 0.0, vec!["0xa1".to_string()], vec![0], @@ -869,7 +869,7 @@ mod tests { remote_blocks.insert( 42, vec![Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 0.0, vec!["0xa1".to_string()], vec![1], @@ -878,7 +878,7 @@ mod tests { local_blocks.insert( 42, - Attestation::new("awesome-npoi".to_string(), 0.0, Vec::new(), vec![0]), + Attestation::new("awesome-ppoi".to_string(), 0.0, Vec::new(), vec![0]), ); let mut remote_attestations: HashMap>> = @@ -933,14 +933,14 @@ mod tests { let mut local_blocks: HashMap = HashMap::new(); let remote = Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 0.0, vec!["0xa1".to_string()], vec![0], ); remote_blocks.insert(42, vec![remote.clone()]); - let local = Attestation::new("awesome-npoi".to_string(), 0.0, Vec::new(), vec![0]); + let local = Attestation::new("awesome-ppoi".to_string(), 0.0, Vec::new(), vec![0]); local_blocks.insert(42, local.clone()); let mut remote_attestations: HashMap>> = @@ -973,21 +973,21 @@ mod tests { async fn clear_local_attestation_success() { let mut local_blocks: HashMap = HashMap::new(); let attestation1 = Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 0.0, vec!["0xa1".to_string()], vec![0], ); let attestation2 = Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 0.0, vec!["0xa2".to_string()], vec![1], ); let attestation3 = Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 1.0, vec!["0xa3".to_string()], vec![2], @@ -1022,7 +1022,7 @@ mod tests { graph_account: String::from("0x7e6528e4ce3055e829a32b5dc4450072bac28bc6"), payload: PublicPoiMessage { identifier: String::from("hash"), - content: String::from("awesome-npoi"), + content: String::from("awesome-ppoi"), nonce: 2, network: String::from("goerli"), block_number: 42, @@ -1037,21 +1037,21 @@ mod tests { async fn local_attestation_pointer_success() { let mut local_blocks: HashMap = HashMap::new(); let attestation1 = Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 0.0, vec!["0xa1".to_string()], vec![2], ); let attestation2 = Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 0.0, vec!["0xa2".to_string()], vec![4], ); let attestation3 = Attestation::new( - "awesome-npoi".to_string(), + "awesome-ppoi".to_string(), 1.0, vec!["0xa3".to_string()], vec![6], @@ -1086,21 +1086,21 @@ mod tests { let local_attestations = Arc::new(SyncMutex::new(HashMap::new())); save_local_attestation( local_attestations.clone(), - "npoi-x".to_string(), + "ppoi-x".to_string(), "0xa1".to_string(), 0, ); save_local_attestation( local_attestations.clone(), - "npoi-y".to_string(), + "ppoi-y".to_string(), "0xa1".to_string(), 1, ); save_local_attestation( local_attestations.clone(), - "npoi-z".to_string(), + "ppoi-z".to_string(), "0xa2".to_string(), 2, ); @@ -1133,8 +1133,8 @@ mod tests { .unwrap() .get(&0) .unwrap() - .npoi - == *"npoi-x" + .ppoi + == *"ppoi-x" ); } } diff --git a/poi-radio/src/operator/callbook.rs b/subgraph-radio/src/operator/callbook.rs similarity index 100% rename from poi-radio/src/operator/callbook.rs rename to subgraph-radio/src/operator/callbook.rs diff --git a/poi-radio/src/operator/mod.rs b/subgraph-radio/src/operator/mod.rs similarity index 92% rename from poi-radio/src/operator/mod.rs rename to subgraph-radio/src/operator/mod.rs index 4ff0b70..84ef47f 100644 --- a/poi-radio/src/operator/mod.rs +++ b/subgraph-radio/src/operator/mod.rs @@ -10,16 +10,15 @@ use tracing::{debug, error, info, trace, warn}; use graphcast_sdk::{ build_wallet, graphcast_agent::{ - message_typing::{check_message_validity, GraphcastMessage}, - waku_handling::WakuHandlingError, - GraphcastAgent, + message_typing::check_message_validity, waku_handling::WakuHandlingError, GraphcastAgent, }, graphql::client_graph_node::{subgraph_network_blocks, update_network_chainheads}, }; -use crate::chainhead_block_str; use crate::messages::poi::PublicPoiMessage; +use crate::{chainhead_block_str, messages::poi::process_valid_message}; +use crate::config::Config; use crate::messages::upgrade::VersionUpgradeMessage; use crate::metrics::handle_serve_metrics; use crate::operator::attestation::log_gossip_summary; @@ -27,7 +26,6 @@ use crate::operator::attestation::process_comparison_results; use crate::server::run_server; use crate::state::PersistedState; use crate::GRAPHCAST_AGENT; -use crate::{config::Config, metrics::CACHED_MESSAGES}; use self::notifier::Notifier; @@ -158,22 +156,10 @@ impl RadioOperator { } }; - let identifier = msg.identifier.clone(); - let is_valid = msg.payload.validity_check(&msg, &graph_node).await; if is_valid.is_ok() { - state_ref.add_remote_message(msg.clone()); - CACHED_MESSAGES.with_label_values(&[&identifier]).set( - state_ref - .remote_messages() - .iter() - .filter(|m| m.identifier == identifier) - .collect::>>() - .len() - .try_into() - .unwrap(), - ); + process_valid_message(msg, &state_ref).await; }; } else if let Ok(msg) = agent.decode::(msg.payload()).await { trace!( @@ -199,17 +185,11 @@ impl RadioOperator { continue; } }; + let is_valid = msg.payload.validity_check(&msg, &graph_node).await; - if let Ok(payload) = is_valid { - // send notifications to the indexer? - upgrade_notifier.notify(format!( - "Subgraph owner for a deployment has shared version upgrade info:\nold deployment: {}\nnew deployment: {}\nplanned migrate time: {}\nnetwork: {}", - payload.identifier, - payload.new_hash, - payload.migrate_time, - payload.network - )).await; + if let Ok(radio_msg) = is_valid { + radio_msg.process_valid_message(&upgrade_notifier).await; }; } else { trace!("Waku message not decoded or validated, skipped message",); diff --git a/poi-radio/src/operator/notifier.rs b/subgraph-radio/src/operator/notifier.rs similarity index 100% rename from poi-radio/src/operator/notifier.rs rename to subgraph-radio/src/operator/notifier.rs diff --git a/subgraph-radio/src/operator/operation.rs b/subgraph-radio/src/operator/operation.rs new file mode 100644 index 0000000..186d4df --- /dev/null +++ b/subgraph-radio/src/operator/operation.rs @@ -0,0 +1,192 @@ +use autometrics::autometrics; + +use std::cmp::max; +use std::collections::HashMap; +use std::sync::Arc; +use tracing::{debug, trace, warn}; + +use graphcast_sdk::{ + determine_message_block, graphcast_agent::message_typing::BuildMessageError, + networks::NetworkName, BlockPointer, NetworkBlockError, NetworkPointer, +}; + +use crate::messages::poi::{poi_message_comparison, send_poi_message}; + +use crate::{ + metrics::CACHED_MESSAGES, + operator::{attestation::ComparisonResult, RadioOperator}, + OperationError, GRAPHCAST_AGENT, +}; + +/// Determine the parameters for messages to send and compare +#[autometrics(track_concurrency)] +pub async fn gossip_set_up( + id: String, + network_chainhead_blocks: &HashMap, + subgraph_network_latest_blocks: &HashMap, +) -> Result<(NetworkName, BlockPointer, u64), BuildMessageError> { + // Get the indexing network of the deployment + // and update the NETWORK message block + let (network_name, latest_block) = match subgraph_network_latest_blocks.get(&id.clone()) { + Some(network_block) => ( + NetworkName::from_string(&network_block.network.clone()), + network_block.block.clone(), + ), + None => { + let err_msg = format!("Could not query the subgraph's indexing network, check Graph node's indexing statuses of subgraph deployment {}", id.clone()); + warn!( + err = tracing::field::debug(&err_msg), + "Failed to build message" + ); + return Err(BuildMessageError::Network(NetworkBlockError::FailedStatus( + err_msg, + ))); + } + }; + + let message_block = match determine_message_block(network_chainhead_blocks, network_name) { + Ok(block) => block, + Err(e) => return Err(BuildMessageError::Network(e)), + }; + + debug!( + deployment_hash = tracing::field::debug(&id), + network = tracing::field::debug(&network_name), + message_block = message_block, + latest_block = latest_block.number, + message_countdown_blocks = max(0, message_block as i64 - latest_block.number as i64), + "Deployment status", + ); + + Ok((network_name, latest_block, message_block)) +} + +impl RadioOperator { + pub async fn gossip_poi( + &self, + identifiers: Vec, + network_chainhead_blocks: &HashMap, + subgraph_network_latest_blocks: &HashMap, + ) -> Vec> { + let mut send_handles = vec![]; + for id in identifiers.clone() { + /* Set up */ + let (network_name, latest_block, message_block) = if let Ok(params) = gossip_set_up( + id.clone(), + network_chainhead_blocks, + subgraph_network_latest_blocks, + ) + .await + { + params + } else { + let err_msg = "Failed to set up message parameters".to_string(); + warn!(id, err_msg, "Gossip POI failed"); + continue; + }; + + /* Send message */ + let id_cloned = id.clone(); + + let callbook = self.config.callbook(); + let local_attestations = self.persisted_state.local_attestations.clone(); + let send_handle = tokio::spawn(async move { + send_poi_message( + id_cloned, + callbook, + message_block, + latest_block, + network_name, + Arc::clone(&local_attestations), + GRAPHCAST_AGENT.get().unwrap(), + ) + .await + }); + + send_handles.push(send_handle); + } + + let mut send_ops = vec![]; + for handle in send_handles { + if let Ok(s) = handle.await { + send_ops.push(s); + } + } + send_ops + } + + pub async fn compare_poi( + &self, + identifiers: Vec, + ) -> Vec> { + let mut compare_handles = vec![]; + + // Additional radio message check happens here since messages are synchronously stored to state cache in msg handler + let remote_ppoi_messages = self + .state() + .valid_ppoi_messages(&self.config.graph_node_endpoint) + .await; + + for id in identifiers.clone() { + /* Set up */ + let collect_duration: i64 = self.config.collect_message_duration().to_owned(); + let id_cloned = id.clone(); + let callbook = self.config.callbook(); + let local_attestations = self.state().local_attestations(); + let filtered_msg = remote_ppoi_messages + .iter() + .filter(|&m| m.identifier == id.clone()) + .cloned() + .collect(); + + let compare_handle = tokio::spawn(async move { + poi_message_comparison( + id_cloned, + collect_duration, + callbook.clone(), + filtered_msg, + local_attestations, + ) + .await + }); + compare_handles.push(compare_handle); + } + + let mut compare_ops = vec![]; + for handle in compare_handles { + let res = handle.await; + if let Ok(s) = res { + // Skip clean up for comparisonResult for Error and buildFailed + match s { + Ok(r) => { + compare_ops.push(Ok(r.clone())); + + /* Clean up cache */ + // Only clear the ones matching identifier and block number equal or less + // Retain the msgs with a different identifier, or if their block number is greater + // clear_local_attestation(&mut local_attestations, r.deployment_hash(), r.block()); + self.persisted_state + .clean_local_attestations(r.block(), r.deployment_hash()); + self.persisted_state + .clean_remote_ppoi_messages(r.block(), r.deployment_hash()); + CACHED_MESSAGES + .with_label_values(&[&r.deployment_hash()]) + .set( + self.state() + .remote_ppoi_messages() + .len() + .try_into() + .unwrap(), + ); + } + Err(e) => { + trace!(err = tracing::field::debug(&e), "Compare handles"); + + compare_ops.push(Err(e.clone_with_inner())); + } + } + } + } + compare_ops + } +} diff --git a/poi-radio/src/server/mod.rs b/subgraph-radio/src/server/mod.rs similarity index 92% rename from poi-radio/src/server/mod.rs rename to subgraph-radio/src/server/mod.rs index d126829..b61ab29 100644 --- a/poi-radio/src/server/mod.rs +++ b/subgraph-radio/src/server/mod.rs @@ -8,7 +8,7 @@ use tracing::{debug, info}; use crate::{ config::Config, server::{ - model::{build_schema, POIRadioContext}, + model::{build_schema, SubgraphRadioContext}, routes::{graphql_handler, graphql_playground, health}, }, shutdown_signal, @@ -31,7 +31,7 @@ pub async fn run_server( return; } let port = config.server_port().unwrap(); - let context = Arc::new(POIRadioContext::init(config.clone(), persisted_state)); + let context = Arc::new(SubgraphRadioContext::init(config.clone(), persisted_state)); let schema = build_schema(Arc::clone(&context)).await; diff --git a/poi-radio/src/server/model/mod.rs b/subgraph-radio/src/server/model/mod.rs similarity index 88% rename from poi-radio/src/server/model/mod.rs rename to subgraph-radio/src/server/model/mod.rs index a43699e..f26cd9f 100644 --- a/poi-radio/src/server/model/mod.rs +++ b/subgraph-radio/src/server/model/mod.rs @@ -15,7 +15,7 @@ use crate::{ }; use graphcast_sdk::{graphcast_agent::message_typing::GraphcastMessage, graphql::QueryError}; -pub(crate) type POIRadioSchema = Schema; +pub(crate) type SubgraphRadioSchema = Schema; // Unified query object for resolvers #[derive(Default)] @@ -30,8 +30,8 @@ impl QueryRoot { block: Option, ) -> Result>, HttpServiceError> { let msgs = ctx - .data_unchecked::>() - .remote_messages_filtered(&identifier, &block); + .data_unchecked::>() + .remote_ppoi_messages_filtered(&identifier, &block); Ok(msgs) } @@ -42,7 +42,7 @@ impl QueryRoot { block: Option, ) -> Result, HttpServiceError> { let attestations = ctx - .data_unchecked::>() + .data_unchecked::>() .local_attestations(identifier, block); let filtered = attestations_to_vec(&attestations); @@ -58,7 +58,7 @@ impl QueryRoot { result_type: Option, ) -> Result, HttpServiceError> { let res = &ctx - .data_unchecked::>() + .data_unchecked::>() .comparison_results(identifier, block, result_type) .await; @@ -72,7 +72,7 @@ impl QueryRoot { identifier: String, ) -> Result, HttpServiceError> { let res = &ctx - .data_unchecked::>() + .data_unchecked::>() .comparison_result(identifier); Ok(res.clone()) } @@ -98,12 +98,12 @@ impl QueryRoot { } else { continue; }; - let local_npoi = local_attestation.npoi.clone(); + let local_ppoi = local_attestation.ppoi.clone(); // Aggregate remote attestations with the local attestations let mut aggregated_attestations: Vec = vec![]; for a in r.attestations { - if a.npoi == local_attestation.npoi { + if a.ppoi == local_attestation.ppoi { let updateed_attestation = attestation::Attestation::update( &a, local_info.address.clone(), @@ -119,8 +119,8 @@ impl QueryRoot { aggregated_attestations.push(a) } } - let sender_ratio = sender_count_str(&aggregated_attestations, local_npoi.clone()); - let stake_ratio = stake_weight_str(&aggregated_attestations, local_npoi); + let sender_ratio = sender_count_str(&aggregated_attestations, local_ppoi.clone()); + let stake_ratio = stake_weight_str(&aggregated_attestations, local_ppoi); ratios.push(CompareRatio::new( r.deployment, r.block_number, @@ -133,7 +133,9 @@ impl QueryRoot { /// Return indexer info async fn indexer_info(&self, ctx: &Context<'_>) -> Result { - let config = ctx.data_unchecked::>().radio_config(); + let config = ctx + .data_unchecked::>() + .radio_config(); let basic_info = config .basic_info() .await @@ -146,7 +148,7 @@ impl QueryRoot { } /// Helper function to order attestations by stake weight and then find the number of unique senders -pub fn sender_count_str(attestations: &[Attestation], local_npoi: String) -> String { +pub fn sender_count_str(attestations: &[Attestation], local_ppoi: String) -> String { // Create a HashMap to store the attestation and senders let mut temp_attestations = attestations.to_owned(); let mut output = String::new(); @@ -156,7 +158,7 @@ pub fn sender_count_str(attestations: &[Attestation], local_npoi: String) -> Str // Iterate through the attestations and populate the maps // No set is needed since uniqueness is garuanteeded by validation for att in attestations.iter() { - let separator = if att.npoi == local_npoi { "*:" } else { ":" }; + let separator = if att.ppoi == local_ppoi { "*:" } else { ":" }; output.push_str(&format!("{}{}", att.senders.len(), separator)); } @@ -167,7 +169,7 @@ pub fn sender_count_str(attestations: &[Attestation], local_npoi: String) -> Str } /// Helper function to order attestations by stake weight and then find the number of unique senders -pub fn stake_weight_str(attestations: &[Attestation], local_npoi: String) -> String { +pub fn stake_weight_str(attestations: &[Attestation], local_ppoi: String) -> String { // Create a HashMap to store the attestation and senders let mut temp_attestations = attestations.to_owned(); let mut output = String::new(); @@ -177,7 +179,7 @@ pub fn stake_weight_str(attestations: &[Attestation], local_npoi: String) -> Str // Iterate through the attestations and populate the maps // No set is needed since uniqueness is garuanteeded by validation for att in attestations.iter() { - let separator = if att.npoi == local_npoi { "*:" } else { ":" }; + let separator = if att.ppoi == local_ppoi { "*:" } else { ":" }; output.push_str(&format!("{}{}", att.stake_weight, separator)); } @@ -185,18 +187,18 @@ pub fn stake_weight_str(attestations: &[Attestation], local_npoi: String) -> Str output } -pub async fn build_schema(ctx: Arc) -> POIRadioSchema { +pub async fn build_schema(ctx: Arc) -> SubgraphRadioSchema { Schema::build(QueryRoot, EmptyMutation, EmptySubscription) .data(ctx.persisted_state) .finish() } -pub struct POIRadioContext { +pub struct SubgraphRadioContext { pub radio_config: Config, pub persisted_state: &'static PersistedState, } -impl POIRadioContext { +impl SubgraphRadioContext { pub fn init(radio_config: Config, persisted_state: &'static PersistedState) -> Self { Self { radio_config, @@ -237,20 +239,20 @@ impl POIRadioContext { } } - pub fn remote_messages(&self) -> Vec> { - self.persisted_state.remote_messages() + pub fn remote_ppoi_messages(&self) -> Vec> { + self.persisted_state.remote_ppoi_messages() } - pub fn remote_messages_filtered( + pub fn remote_ppoi_messages_filtered( &self, identifier: &Option, block: &Option, ) -> Vec> { - let msgs = self.remote_messages(); + let msgs = self.remote_ppoi_messages(); let filtered = msgs .iter() .cloned() - .filter(|message| filter_remote_messages(message, identifier, block)) + .filter(|message| filter_remote_ppoi_messages(message, identifier, block)) .collect::>(); filtered } @@ -287,7 +289,7 @@ impl POIRadioContext { let mut res = vec![]; for entry in locals { let deployment_identifier = entry.deployment.clone(); - let msgs = self.remote_messages_filtered(&identifier, &block); + let msgs = self.remote_ppoi_messages_filtered(&identifier, &block); let remote_attestations = process_ppoi_message(msgs, &config.callbook()) .await .ok() @@ -315,7 +317,7 @@ impl POIRadioContext { } /// Filter funciton for Attestations on deployment and block -fn filter_remote_messages( +fn filter_remote_ppoi_messages( entry: &GraphcastMessage, identifier: &Option, block: &Option, diff --git a/poi-radio/src/server/routes/mod.rs b/subgraph-radio/src/server/routes/mod.rs similarity index 87% rename from poi-radio/src/server/routes/mod.rs rename to subgraph-radio/src/server/routes/mod.rs index d784954..f553b9a 100644 --- a/poi-radio/src/server/routes/mod.rs +++ b/subgraph-radio/src/server/routes/mod.rs @@ -12,8 +12,8 @@ use std::sync::Arc; use tracing::{span, trace, Instrument, Level}; use tracing_opentelemetry::OpenTelemetrySpanExt; -use super::model::POIRadioContext; -use crate::server::model::POIRadioSchema; +use super::model::SubgraphRadioContext; +use crate::server::model::SubgraphRadioSchema; #[derive(Serialize)] struct Health { @@ -34,8 +34,8 @@ pub(crate) async fn graphql_playground() -> impl IntoResponse { pub(crate) async fn graphql_handler( req: GraphQLRequest, - Extension(schema): Extension, - Extension(context): Extension>, + Extension(schema): Extension, + Extension(context): Extension>, ) -> GraphQLResponse { let span = span!(Level::TRACE, "graphql_execution"); diff --git a/poi-radio/src/state.rs b/subgraph-radio/src/state.rs similarity index 89% rename from poi-radio/src/state.rs rename to subgraph-radio/src/state.rs index 0a8b953..7e45828 100644 --- a/poi-radio/src/state.rs +++ b/subgraph-radio/src/state.rs @@ -30,7 +30,7 @@ type ComparisonResults = Arc>>; #[derive(Serialize, Deserialize, Clone, Debug)] pub struct PersistedState { pub local_attestations: Local, - pub remote_messages: Remote, + pub remote_ppoi_messages: Remote, pub comparison_results: ComparisonResults, } @@ -41,30 +41,30 @@ impl PersistedState { comparison_results: Option, ) -> PersistedState { let local_attestations = local.unwrap_or(Arc::new(SyncMutex::new(HashMap::new()))); - let remote_messages = remote.unwrap_or(Arc::new(SyncMutex::new(vec![]))); + let remote_ppoi_messages = remote.unwrap_or(Arc::new(SyncMutex::new(vec![]))); let comparison_results = comparison_results.unwrap_or(Arc::new(SyncMutex::new(HashMap::new()))); PersistedState { local_attestations, - remote_messages, + remote_ppoi_messages, comparison_results, } } - /// Optional updates for either local_attestations, remote_messages or comparison_results without requiring either to be in-scope + /// Optional updates for either local_attestations, remote_ppoi_messages or comparison_results without requiring either to be in-scope pub async fn update( &mut self, local_attestations: Option, - remote_messages: Option, + remote_ppoi_messages: Option, comparison_results: Option, ) -> PersistedState { let local_attestations = match local_attestations { None => self.local_attestations.clone(), Some(l) => l, }; - let remote_messages = match remote_messages { - None => self.remote_messages.clone(), + let remote_ppoi_messages = match remote_ppoi_messages { + None => self.remote_ppoi_messages.clone(), Some(r) => r, }; let comparison_results = match comparison_results { @@ -73,7 +73,7 @@ impl PersistedState { }; PersistedState { local_attestations, - remote_messages, + remote_ppoi_messages, comparison_results, } } @@ -92,8 +92,8 @@ impl PersistedState { } /// Getter for remote_messages - pub fn remote_messages(&self) -> Vec> { - self.remote_messages.lock().unwrap().clone() + pub fn remote_ppoi_messages(&self) -> Vec> { + self.remote_ppoi_messages.lock().unwrap().clone() } /// Getter for comparison_results @@ -115,20 +115,20 @@ impl PersistedState { self.local_attestations = local_attestations; } - /// Update remote_messages + /// Update remote_ppoi_messages pub async fn update_remote( &mut self, - remote_messages: Vec>, + remote_ppoi_messages: Vec>, ) -> Vec> { - self.remote_messages = Arc::new(SyncMutex::new(remote_messages)); - self.remote_messages() + self.remote_ppoi_messages = Arc::new(SyncMutex::new(remote_ppoi_messages)); + self.remote_ppoi_messages() } - /// Add message to remote_messages + /// Add message to remote_ppoi_messages /// Generalize PublicPoiMessage - pub fn add_remote_message(&self, msg: GraphcastMessage) { + pub fn add_remote_ppoi_message(&self, msg: GraphcastMessage) { trace!(msg = tracing::field::debug(&msg), "adding remote message"); - self.remote_messages.lock().unwrap().push(msg) + self.remote_ppoi_messages.lock().unwrap().push(msg) } /// Add entry to comparison_results @@ -145,10 +145,10 @@ impl PersistedState { &mut self, graph_node_endpoint: &str, ) -> Vec> { - let remote_messages = self.remote_messages(); + let remote_ppoi_messages = self.remote_ppoi_messages(); let mut valid_messages = vec![]; - for message in remote_messages { + for message in remote_ppoi_messages { let is_valid = message .payload .validity_check(&message, graph_node_endpoint) @@ -213,13 +213,13 @@ impl PersistedState { result_type } - /// Clean remote_messages - pub fn clean_remote_messages(&self, block_number: u64, deployment: String) { + /// Clean remote_ppoi_messages + pub fn clean_remote_ppoi_messages(&self, block_number: u64, deployment: String) { trace!( - msgs = tracing::field::debug(&self.remote_messages.lock().unwrap()), + msgs = tracing::field::debug(&self.remote_ppoi_messages.lock().unwrap()), "cleaning these messages" ); - self.remote_messages + self.remote_ppoi_messages .lock() .unwrap() .retain(|msg| msg.payload.block_number >= block_number || msg.identifier != deployment) @@ -317,7 +317,7 @@ mod tests { let mut state = PersistedState::load_cache(path); assert!(state.local_attestations().is_empty()); - assert!(state.remote_messages().is_empty()); + assert!(state.remote_ppoi_messages().is_empty()); assert!(state.comparison_results().is_empty()); let local_attestations = Arc::new(SyncMutex::new(HashMap::new())); @@ -326,21 +326,21 @@ mod tests { save_local_attestation( local_attestations.clone(), - "npoi-x".to_string(), + "ppoi-x".to_string(), "0xa1".to_string(), 0, ); save_local_attestation( local_attestations.clone(), - "npoi-y".to_string(), + "ppoi-y".to_string(), "0xa1".to_string(), 1, ); save_local_attestation( local_attestations.clone(), - "npoi-z".to_string(), + "ppoi-z".to_string(), "0xa2".to_string(), 2, ); @@ -393,7 +393,7 @@ mod tests { state.update_cache(path); let state = PersistedState::load_cache(path); - assert_eq!(state.remote_messages.lock().unwrap().len(), 1); + assert_eq!(state.remote_ppoi_messages.lock().unwrap().len(), 1); assert!(!state.local_attestations.lock().unwrap().is_empty()); assert!(state.local_attestations.lock().unwrap().len() == 2); assert!( @@ -425,8 +425,8 @@ mod tests { .unwrap() .get(&0) .unwrap() - .npoi - == *"npoi-x" + .ppoi + == *"ppoi-x" ); assert_eq!(state.comparison_results.lock().unwrap().len(), 1); @@ -458,11 +458,11 @@ mod tests { async fn handle_comparison_result_new_deployment() { let notifier = Notifier::new("not-a-real-radio".to_string(), None, None, None, None, None); let local_attestations = Arc::new(SyncMutex::new(HashMap::new())); - let remote_messages = Arc::new(SyncMutex::new(Vec::new())); + let remote_ppoi_messages = Arc::new(SyncMutex::new(Vec::new())); let comparison_results = Arc::new(SyncMutex::new(HashMap::new())); let state = PersistedState { local_attestations, - remote_messages, + remote_ppoi_messages, comparison_results, }; @@ -484,11 +484,11 @@ mod tests { async fn handle_comparison_result_change_result_type() { let notifier = Notifier::new("not-a-real-radio".to_string(), None, None, None, None, None); let local_attestations = Arc::new(SyncMutex::new(HashMap::new())); - let remote_messages = Arc::new(SyncMutex::new(Vec::new())); + let remote_ppoi_messages = Arc::new(SyncMutex::new(Vec::new())); let comparison_results = Arc::new(SyncMutex::new(HashMap::new())); let state = PersistedState { local_attestations, - remote_messages, + remote_ppoi_messages, comparison_results, }; diff --git a/test-runner/Cargo.toml b/test-runner/Cargo.toml index 5bd23d7..528c91f 100644 --- a/test-runner/Cargo.toml +++ b/test-runner/Cargo.toml @@ -3,9 +3,9 @@ name = "test-runner" version = "0.0.1" edition = "2021" authors = ["GraphOps (axiomatic-aardvark, hopeyen)"] -description = "POI Radio e2e tests" +description = "Subgraph Radio e2e tests" license = "Apache-2.0" -repository = "https://github.com/graphops/poi-radio" +repository = "https://github.com/graphops/subgraph-radio" keywords = [ "graphprotocol", "data-integrity", @@ -24,7 +24,7 @@ categories = [ waku = { version = "0.1.1", package = "waku-bindings" } test-utils = { path = "../test-utils" } graphcast-sdk = "0.4.0" -poi-radio = { path = "../poi-radio" } +subgraph-radio = { path = "../subgraph-radio" } tokio = { version = "1.1.1", features = ["full", "rt"] } tracing = "0.1" tracing-subscriber = "0.3" diff --git a/test-runner/src/invalid_block_hash.rs b/test-runner/src/invalid_block_hash.rs index eaa04df..0112dea 100644 --- a/test-runner/src/invalid_block_hash.rs +++ b/test-runner/src/invalid_block_hash.rs @@ -1,4 +1,4 @@ -use poi_radio::state::PersistedState; +use subgraph_radio::state::PersistedState; use test_utils::{ config::{test_config, TestSenderConfig}, setup, teardown, @@ -40,9 +40,9 @@ pub async fn invalid_block_hash_test() { teardown(process_manager, &store_path); - let remote_messages = persisted_state.remote_messages(); + let remote_ppoi_messages = persisted_state.remote_ppoi_messages(); assert!( - remote_messages.is_empty(), + remote_ppoi_messages.is_empty(), "Remote messages should be empty" ); } diff --git a/test-runner/src/invalid_nonce.rs b/test-runner/src/invalid_nonce.rs index 4c7eff2..fbf3910 100644 --- a/test-runner/src/invalid_nonce.rs +++ b/test-runner/src/invalid_nonce.rs @@ -1,4 +1,4 @@ -use poi_radio::state::PersistedState; +use subgraph_radio::state::PersistedState; use test_utils::{ config::{test_config, TestSenderConfig}, setup, teardown, @@ -38,9 +38,9 @@ pub async fn invalid_nonce_test() { teardown(process_manager, &store_path); - let remote_messages = persisted_state.remote_messages(); + let remote_ppoi_messages = persisted_state.remote_ppoi_messages(); assert!( - remote_messages.is_empty(), + remote_ppoi_messages.is_empty(), "Remote messages should be empty" ); } diff --git a/test-runner/src/invalid_payload.rs b/test-runner/src/invalid_payload.rs index c4276c9..cd35215 100644 --- a/test-runner/src/invalid_payload.rs +++ b/test-runner/src/invalid_payload.rs @@ -1,4 +1,4 @@ -use poi_radio::state::PersistedState; +use subgraph_radio::state::PersistedState; use test_utils::{ config::{test_config, TestSenderConfig}, dummy_msg::DummyMsg, @@ -43,9 +43,9 @@ pub async fn invalid_payload_test() { teardown(process_manager, &store_path); - let remote_messages = persisted_state.remote_messages(); + let remote_ppoi_messages = persisted_state.remote_ppoi_messages(); assert!( - remote_messages.is_empty(), + remote_ppoi_messages.is_empty(), "Remote messages should be empty" ); } diff --git a/test-runner/src/invalid_sender.rs b/test-runner/src/invalid_sender.rs index 27c4d33..ff1fa5c 100644 --- a/test-runner/src/invalid_sender.rs +++ b/test-runner/src/invalid_sender.rs @@ -1,5 +1,5 @@ use graphcast_sdk::graphcast_agent::message_typing::IdentityValidation; -use poi_radio::state::PersistedState; +use subgraph_radio::state::PersistedState; use test_utils::{ config::{test_config, TestSenderConfig}, setup, teardown, @@ -40,9 +40,9 @@ pub async fn invalid_sender_test() { teardown(process_manager, &store_path); - let remote_messages = persisted_state.remote_messages(); + let remote_ppoi_messages = persisted_state.remote_ppoi_messages(); assert!( - remote_messages.is_empty(), + remote_ppoi_messages.is_empty(), "Remote messages should be empty" ); } diff --git a/test-runner/src/main.rs b/test-runner/src/main.rs index a20defd..99c8701 100644 --- a/test-runner/src/main.rs +++ b/test-runner/src/main.rs @@ -39,7 +39,7 @@ pub async fn main() { std::env::set_var( "RUST_LOG", - "off,hyper=off,graphcast_sdk=trace,poi_radio=trace,test_runner=trace,test_sender=trace,test_utils=trace", + "off,hyper=off,graphcast_sdk=trace,subgraph_radio=trace,test_runner=trace,test_sender=trace,test_utils=trace", ); init_tracing(config.log_format).expect("Could not set up global default subscriber for logger, check environmental variable `RUST_LOG` or the CLI input `log-level"); diff --git a/test-runner/src/message_handling.rs b/test-runner/src/message_handling.rs index 8ea95f4..60c5138 100644 --- a/test-runner/src/message_handling.rs +++ b/test-runner/src/message_handling.rs @@ -1,4 +1,4 @@ -use poi_radio::state::PersistedState; +use subgraph_radio::state::PersistedState; use test_utils::{ config::{test_config, TestSenderConfig}, messages_are_equal, payloads_are_equal, setup, teardown, @@ -48,7 +48,7 @@ pub async fn send_and_receive_test() { local_attestations, !local_attestations.is_empty() ); - let remote_messages = persisted_state.remote_messages(); + let remote_ppoi_messages = persisted_state.remote_ppoi_messages(); assert!( !local_attestations.is_empty(), @@ -66,7 +66,7 @@ pub async fn send_and_receive_test() { let test_hashes_remote = vec!["Qmdefault1AbcDEFghijKLmnoPQRstUVwxYzABCDEFghijklmnopq"]; for target_id in test_hashes_remote { - let has_target_id = remote_messages + let has_target_id = remote_ppoi_messages .iter() .any(|msg| msg.identifier == *target_id); assert!( @@ -76,16 +76,16 @@ pub async fn send_and_receive_test() { ); } - trace!("Num of remote messages {}", remote_messages.len()); + trace!("Num of remote messages {}", remote_ppoi_messages.len()); assert!( - remote_messages.len() >= 5, + remote_ppoi_messages.len() >= 5, "The number of remote messages should at least 5. Actual: {}", - remote_messages.len() + remote_ppoi_messages.len() ); - for (index, message1) in remote_messages.iter().enumerate() { - for message2 in remote_messages.iter().skip(index + 1) { + for (index, message1) in remote_ppoi_messages.iter().enumerate() { + for message2 in remote_ppoi_messages.iter().skip(index + 1) { if messages_are_equal(message1, message2) && payloads_are_equal(&message1.payload, &message2.payload) { diff --git a/test-runner/src/poi_divergent.rs b/test-runner/src/poi_divergent.rs index 647dd1f..4f463a2 100644 --- a/test-runner/src/poi_divergent.rs +++ b/test-runner/src/poi_divergent.rs @@ -1,4 +1,4 @@ -use poi_radio::{operator::attestation::ComparisonResultType, state::PersistedState}; +use subgraph_radio::{operator::attestation::ComparisonResultType, state::PersistedState}; use test_utils::{ config::{test_config, TestSenderConfig}, setup, teardown, diff --git a/test-runner/src/poi_match.rs b/test-runner/src/poi_match.rs index b2a314a..c6bd966 100644 --- a/test-runner/src/poi_match.rs +++ b/test-runner/src/poi_match.rs @@ -1,4 +1,4 @@ -use poi_radio::{operator::attestation::ComparisonResultType, state::PersistedState}; +use subgraph_radio::{operator::attestation::ComparisonResultType, state::PersistedState}; use test_utils::{ config::{test_config, TestSenderConfig}, setup, teardown, diff --git a/test-runner/src/topics.rs b/test-runner/src/topics.rs index df4fc14..b658124 100644 --- a/test-runner/src/topics.rs +++ b/test-runner/src/topics.rs @@ -1,4 +1,4 @@ -use poi_radio::state::PersistedState; +use subgraph_radio::state::PersistedState; use test_utils::{ config::{test_config, TestSenderConfig}, setup, teardown, @@ -45,17 +45,17 @@ pub async fn topics_test() { let persisted_state = PersistedState::load_cache(&config.persistence_file_path.unwrap()); debug!( local_attestations = tracing::field::debug(&persisted_state.local_attestations()), - remote_messages = tracing::field::debug(&persisted_state.remote_messages()), + remote_ppoi_messages = tracing::field::debug(&persisted_state.remote_ppoi_messages()), persisted_state = tracing::field::debug(&persisted_state), "loaded persisted state" ); let local_attestations = persisted_state.local_attestations(); - let remote_messages = persisted_state.remote_messages(); + let remote_ppoi_messages = persisted_state.remote_ppoi_messages(); debug!( local_attestations = tracing::field::debug(&local_attestations), - remote_messages = tracing::field::debug(&remote_messages), + remote_ppoi_messages = tracing::field::debug(&remote_ppoi_messages), "Starting topics_test" ); @@ -78,7 +78,7 @@ pub async fn topics_test() { ]; for target_id in test_hashes_remote { - let has_target_id = remote_messages + let has_target_id = remote_ppoi_messages .iter() .any(|msg| msg.identifier == *target_id); assert!( @@ -90,7 +90,7 @@ pub async fn topics_test() { let non_existent_test_hash = "QmonlyintestsenderXyZABCdeFgHIjklMNOpqrstuvWXYZabcdEFG"; - let has_non_existent_test_hash = remote_messages + let has_non_existent_test_hash = remote_ppoi_messages .iter() .any(|msg| msg.identifier == non_existent_test_hash); @@ -111,10 +111,10 @@ pub async fn topics_test() { let persisted_state = PersistedState::load_cache(&store_path); debug!("persisted state {:?}", persisted_state); - let remote_messages = persisted_state.remote_messages(); + let remote_ppoi_messages = persisted_state.remote_ppoi_messages(); let test_hash = "QmonlyintestsenderXyZABCdeFgHIjklMNOpqrstuvWXYZabcdEFG"; - let has_test_hash = remote_messages + let has_test_hash = remote_ppoi_messages .iter() .any(|msg| msg.identifier == test_hash); diff --git a/test-sender/Cargo.toml b/test-sender/Cargo.toml index 3968026..ca251a1 100644 --- a/test-sender/Cargo.toml +++ b/test-sender/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" authors = ["GraphOps (axiomatic-aardvark, hopeyen)"] description = "POI Radio e2e tests" license = "Apache-2.0" -repository = "https://github.com/graphops/poi-radio" +repository = "https://github.com/graphops/subgraph-radio" keywords = [ "graphprotocol", "data-integrity", @@ -24,7 +24,7 @@ categories = [ waku = { version = "0.1.1", package = "waku-bindings" } graphcast-sdk = "0.4.0" test-utils = { path = "../test-utils" } -poi-radio = { path = "../poi-radio" } +subgraph-radio = { path = "../subgraph-radio" } tokio = { version = "1.1.1", features = ["full", "rt"] } tracing = "0.1" tracing-subscriber = "0.3" diff --git a/test-sender/src/main.rs b/test-sender/src/main.rs index ff41c43..e1b6305 100644 --- a/test-sender/src/main.rs +++ b/test-sender/src/main.rs @@ -10,8 +10,8 @@ use graphcast_sdk::{ networks::NetworkName, wallet_address, }; -use poi_radio::messages::poi::PublicPoiMessage; use std::{net::IpAddr, str::FromStr, thread::sleep, time::Duration}; +use subgraph_radio::messages::poi::PublicPoiMessage; use test_utils::{config::TestSenderConfig, dummy_msg::DummyMsg, find_random_udp_port}; use tracing::{error, info}; use waku::{ @@ -21,7 +21,7 @@ use waku::{ async fn start_sender(config: TestSenderConfig) { std::env::set_var( "RUST_LOG", - "off,hyper=off,graphcast_sdk=trace,poi_radio=trace,test_sender=trace", + "off,hyper=off,graphcast_sdk=trace,subgraph_radio=trace,test_sender=trace", ); init_tracing("pretty".to_string()).expect("Could not set up global default subscriber for logger, check environmental variable `RUST_LOG` or the CLI input `log-level"); diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index 2fdd740..19e467f 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -3,9 +3,9 @@ name = "test-utils" version = "0.0.1" edition = "2021" authors = ["GraphOps (axiomatic-aardvark, hopeyen)"] -description = "POI Radio e2e tests" +description = "Subgraph Radio e2e tests" license = "Apache-2.0" -repository = "https://github.com/graphops/poi-radio" +repository = "https://github.com/graphops/subgraph-radio" keywords = [ "graphprotocol", "data-integrity", @@ -23,7 +23,7 @@ categories = [ [dependencies] waku = { version = "0.1.1", package = "waku-bindings" } graphcast-sdk = "0.4.0" -poi-radio = { path = "../poi-radio" } +subgraph-radio = { path = "../subgraph-radio" } tokio = { version = "1.1.1", features = ["full", "rt"] } tracing = "0.1" tracing-subscriber = "0.3" diff --git a/test-utils/src/config.rs b/test-utils/src/config.rs index 5bec52f..39fc6b9 100644 --- a/test-utils/src/config.rs +++ b/test-utils/src/config.rs @@ -1,7 +1,7 @@ use clap::{ArgSettings, Parser}; use graphcast_sdk::graphcast_agent::message_typing::IdentityValidation; -use poi_radio::config::{Config, CoverageLevel}; use serde::{Deserialize, Serialize}; +use subgraph_radio::config::{Config, CoverageLevel}; #[derive(Clone, Debug, Parser, Serialize, Deserialize)] #[clap(name = "test-sender", about = "Mock message sender")] @@ -42,7 +42,7 @@ pub fn test_config() -> Config { waku_addr: None, boot_node_addresses: vec![], waku_log_level: None, - log_level: "off,hyper=off,graphcast_sdk=trace,poi_radio=trace,test_runner=trace" + log_level: "off,hyper=off,graphcast_sdk=trace,subgraph_radio=trace,test_runner=trace" .to_string(), slack_token: None, slack_channel: None, diff --git a/test-utils/src/dummy_msg.rs b/test-utils/src/dummy_msg.rs index 537e599..6665832 100644 --- a/test-utils/src/dummy_msg.rs +++ b/test-utils/src/dummy_msg.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; #[derive(Eip712, EthAbiType, Clone, Message, Serialize, Deserialize, SimpleObject)] #[eip712( - name = "Graphcast POI Radio Dummy Msg", + name = "Graphcast Subgraph Radio Dummy Msg", version = "0", chain_id = 1, verifying_contract = "0xc944e90c64b2c07662a292be6244bdf05cda44a7" diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 9338b57..1ec7eee 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -12,12 +12,12 @@ use config::TestSenderConfig; use graphcast_sdk::graphcast_agent::message_typing::GraphcastMessage; use graphcast_sdk::graphcast_agent::message_typing::IdentityValidation; use mock_server::{start_mock_server, ServerState}; -use poi_radio::{ +use prost::Message; +use rand::Rng; +use subgraph_radio::{ config::{Config, CoverageLevel}, messages::poi::PublicPoiMessage, }; -use prost::Message; -use rand::Rng; use tracing::info; pub mod config; @@ -163,7 +163,7 @@ pub fn start_radio(config: &Config) -> Child { Command::new("cargo") .arg("run") .arg("-p") - .arg("poi-radio") + .arg("subgraph-radio") .arg("--") .arg("--graph-node-endpoint") .arg(&config.graph_node_endpoint)