From c137a9dfca471d5e698ec1c697d066fd798222c9 Mon Sep 17 00:00:00 2001 From: Bartosz Nowak Date: Sun, 14 Apr 2024 13:22:42 +0200 Subject: [PATCH 1/4] streams&refactor --- Cargo.toml | 17 +-- crates/common/Cargo.toml | 1 + crates/common/src/lib.rs | 2 + crates/{peer => common}/src/network.rs | 2 - crates/common/src/topic.rs | 25 ++++ crates/delegator/Cargo.toml | 4 +- crates/delegator/src/main.rs | 42 +++--- crates/executor/Cargo.toml | 4 +- crates/executor/readme.md | 1 - crates/executor/src/main.rs | 41 ++++-- crates/peer/Cargo.toml | 10 +- crates/peer/src/lib.rs | 3 - crates/peer/src/node.rs | 64 --------- crates/peer/src/registry.rs | 65 ++++----- crates/peer/src/store.rs | 22 --- crates/peer/src/swarm.rs | 191 ++++++++++--------------- 16 files changed, 201 insertions(+), 293 deletions(-) rename crates/{peer => common}/src/network.rs (84%) create mode 100644 crates/common/src/topic.rs delete mode 100644 crates/executor/readme.md delete mode 100644 crates/peer/src/node.rs delete mode 100644 crates/peer/src/store.rs diff --git a/Cargo.toml b/Cargo.toml index cccf005..083d97e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,22 +20,15 @@ license-file = "LICENSE" [workspace.dependencies] async-process = "2.2.0" +async-stream = "0.3.5" cairo-felt = "0.9.1" cairo-proof-parser = { git = "https://github.com/Okm165/cairo-proof-parser", rev = "97a04bbee07330311b38d6f4cecfed3acb237626" } futures = "0.3.30" +futures-core = "0.3.30" +futures-util = "0.3.30" hex = "0.4.3" itertools = "0.12.1" -libp2p = { version = "0.53.2", features = [ - "tokio", - "gossipsub", - "kad", - "mdns", - "noise", - "macros", - "tcp", - "yamux", - "quic", -] } +libp2p = { version = "0.53.2", features = ["tokio","gossipsub","kad","mdns","noise","macros","tcp","yamux","quic"]} num-bigint = "0.4.4" serde = "1.0.197" serde_json = "1.0.115" @@ -49,5 +42,5 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } sharp-p2p-common = { path = "crates/common" } sharp-p2p-delegator = { path = "crates/delegator" } sharp-p2p-executor = { path = "crates/executor" } -sharp-p2p-prover = { path = "crates/prover" } sharp-p2p-peer = { path = "crates/peer" } +sharp-p2p-prover = { path = "crates/prover" } diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 7869fea..6d964e3 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -10,6 +10,7 @@ license-file.workspace = true [dependencies] cairo-felt.workspace = true hex.workspace = true +libp2p.workspace = true num-bigint.workspace = true serde_json.workspace = true serde.workspace = true diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index a914345..e83a2cf 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -1,3 +1,5 @@ pub mod job; pub mod job_witness; +pub mod network; +pub mod topic; pub mod vec252; diff --git a/crates/peer/src/network.rs b/crates/common/src/network.rs similarity index 84% rename from crates/peer/src/network.rs rename to crates/common/src/network.rs index 262b072..159dca5 100644 --- a/crates/peer/src/network.rs +++ b/crates/common/src/network.rs @@ -1,8 +1,6 @@ #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Network { - /// Starknet mainnet. Mainnet, - /// Sepolia testnet. Sepolia, } diff --git a/crates/common/src/topic.rs b/crates/common/src/topic.rs new file mode 100644 index 0000000..90d347a --- /dev/null +++ b/crates/common/src/topic.rs @@ -0,0 +1,25 @@ +use libp2p::gossipsub::IdentTopic; + +use crate::network::Network; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Topic { + NewJob, + PickedJob, +} + +impl Topic { + pub fn as_str(&self) -> &'static str { + match self { + Topic::NewJob => "new-job", + Topic::PickedJob => "picked-job", + } + } +} + +pub fn gossipsub_ident_topic(network: Network, topic: Topic) -> IdentTopic { + let network = network.as_str(); + let topic = topic.as_str(); + let s = format!("/{network}/{topic}"); + IdentTopic::new(s) +} diff --git a/crates/delegator/Cargo.toml b/crates/delegator/Cargo.toml index 5d1eaf8..889f138 100644 --- a/crates/delegator/Cargo.toml +++ b/crates/delegator/Cargo.toml @@ -8,8 +8,10 @@ license-file.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +futures-util.workspace = true libp2p.workspace = true +sharp-p2p-common.workspace = true +sharp-p2p-peer.workspace = true tokio.workspace = true tracing-subscriber.workspace = true tracing.workspace = true -sharp-p2p-peer.workspace = true diff --git a/crates/delegator/src/main.rs b/crates/delegator/src/main.rs index 313a737..d3618a8 100644 --- a/crates/delegator/src/main.rs +++ b/crates/delegator/src/main.rs @@ -1,9 +1,11 @@ -use sharp_p2p_peer::network::Network; -use sharp_p2p_peer::node::{Node, NodeConfig, NodeType}; -use sharp_p2p_peer::store::Store; +use futures_util::StreamExt; +use sharp_p2p_common::network::Network; +use sharp_p2p_common::topic::{gossipsub_ident_topic, Topic}; +use sharp_p2p_peer::registry::RegistryHandler; +use sharp_p2p_peer::swarm::SwarmRunner; use std::error::Error; -use std::time::Duration; -use tokio::time::sleep; +use tokio::sync::mpsc; +use tracing::debug; use tracing_subscriber::EnvFilter; #[tokio::main] @@ -13,19 +15,27 @@ async fn main() -> Result<(), Box> { // 1. Generate keypair for the node let p2p_local_keypair = libp2p::identity::Keypair::generate_ed25519(); - // 2. Initiate a new node to sync with other peers - let store = Store::new(); - let node_config = NodeConfig::new( - NodeType::Delegator, - Network::Sepolia, - p2p_local_keypair, - Vec::new(), - store, + // 2. Generate topic + let topic = gossipsub_ident_topic(Network::Sepolia, Topic::NewJob); + + let swarm_runner = SwarmRunner::new(&p2p_local_keypair, &topic)?; + let mut registry_handler = RegistryHandler::new( + "https://starknet-sepolia.public.blastapi.io", + "0xcdd51fbc4e008f4ef807eaf26f5043521ef5931bbb1e04032a25bd845d286b", ); - let node = Node::new(node_config).await.unwrap(); - println!("node: {:?}", node); + + let (_send_topic_tx, send_topic_rx) = mpsc::channel::>(1000); + let mut message_stream = swarm_runner.run(topic, send_topic_rx); + let mut event_stream = registry_handler.subscribe_events(vec!["0x0".to_string()]); loop { - sleep(Duration::from_secs(1)).await; + tokio::select! { + Some(event) = message_stream.next() => { + debug!("{:?}", event); + }, + Some(Ok(event_vec)) = event_stream.next() => { + debug!("{:?}", event_vec); + }, + } } } diff --git a/crates/executor/Cargo.toml b/crates/executor/Cargo.toml index 7aabc73..152c39a 100644 --- a/crates/executor/Cargo.toml +++ b/crates/executor/Cargo.toml @@ -8,8 +8,10 @@ license-file.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +futures-util.workspace = true libp2p.workspace = true +sharp-p2p-common.workspace = true +sharp-p2p-peer.workspace = true tokio.workspace = true tracing-subscriber.workspace = true tracing.workspace = true -sharp-p2p-peer.workspace = true diff --git a/crates/executor/readme.md b/crates/executor/readme.md deleted file mode 100644 index d7f3de2..0000000 --- a/crates/executor/readme.md +++ /dev/null @@ -1 +0,0 @@ -## Executor diff --git a/crates/executor/src/main.rs b/crates/executor/src/main.rs index 7fd571d..54d20df 100644 --- a/crates/executor/src/main.rs +++ b/crates/executor/src/main.rs @@ -1,11 +1,11 @@ -use sharp_p2p_peer::network::Network; -use sharp_p2p_peer::node::{Node, NodeConfig, NodeType}; -use sharp_p2p_peer::store::Store; +use futures_util::StreamExt; +use sharp_p2p_common::network::Network; +use sharp_p2p_common::topic::{gossipsub_ident_topic, Topic}; +use sharp_p2p_peer::registry::RegistryHandler; +use sharp_p2p_peer::swarm::SwarmRunner; use std::error::Error; - -use std::time::Duration; -use tokio::time::sleep; - +use tokio::sync::mpsc; +use tracing::debug; use tracing_subscriber::EnvFilter; #[tokio::main] @@ -15,14 +15,27 @@ async fn main() -> Result<(), Box> { // 1. Generate keypair for the node let p2p_local_keypair = libp2p::identity::Keypair::generate_ed25519(); - // 2. Initiate a new node to sync with other peers - let store = Store::new(); - let node_config = - NodeConfig::new(NodeType::Executor, Network::Sepolia, p2p_local_keypair, Vec::new(), store); - let node = Node::new(node_config).await.unwrap(); - println!("node: {:?}", node); + // 2. Generate topic + let topic = gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob); + + let swarm_runner = SwarmRunner::new(&p2p_local_keypair, &topic)?; + let mut registry_handler = RegistryHandler::new( + "https://starknet-sepolia.public.blastapi.io", + "0xcdd51fbc4e008f4ef807eaf26f5043521ef5931bbb1e04032a25bd845d286b", + ); + + let (_send_topic_tx, send_topic_rx) = mpsc::channel::>(1000); + let mut message_stream = swarm_runner.run(topic, send_topic_rx); + let mut event_stream = registry_handler.subscribe_events(vec!["0x0".to_string()]); loop { - sleep(Duration::from_secs(1)).await; + tokio::select! { + Some(event) = message_stream.next() => { + debug!("{:?}", event); + }, + Some(Ok(event_vec)) = event_stream.next() => { + debug!("{:?}", event_vec); + }, + } } } diff --git a/crates/peer/Cargo.toml b/crates/peer/Cargo.toml index d3a73de..d0f47bf 100644 --- a/crates/peer/Cargo.toml +++ b/crates/peer/Cargo.toml @@ -8,9 +8,13 @@ license-file.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-stream.workspace = true +futures-core.workspace = true +futures-util.workspace = true +futures.workspace = true libp2p.workspace = true -tokio.workspace = true -tracing-subscriber.workspace = true -tracing.workspace = true sharp-p2p-common.workspace = true starknet.workspace = true +tokio.workspace = true +tracing-subscriber.workspace = true +tracing.workspace = true \ No newline at end of file diff --git a/crates/peer/src/lib.rs b/crates/peer/src/lib.rs index 31c83b7..eb9839a 100644 --- a/crates/peer/src/lib.rs +++ b/crates/peer/src/lib.rs @@ -1,5 +1,2 @@ -pub mod network; -pub mod node; pub mod registry; -pub mod store; pub mod swarm; diff --git a/crates/peer/src/node.rs b/crates/peer/src/node.rs deleted file mode 100644 index a40272c..0000000 --- a/crates/peer/src/node.rs +++ /dev/null @@ -1,64 +0,0 @@ -use libp2p::identity::Keypair; -use libp2p::Multiaddr; -use std::error::Error; -use std::sync::Arc; - -use crate::network::Network; -use crate::registry::RegistryHandler; -use crate::store::Store; -use crate::swarm::SwarmRunner; - -pub enum NodeType { - Delegator, - Executor, -} - -pub struct NodeConfig { - pub node_type: NodeType, - /// An id of the network to connect to. - pub network: Network, - /// The keypair to be used as [`Node`]s identity. - pub p2p_local_keypair: Keypair, - /// List of the addresses where [`Node`] will listen for incoming connections. - pub p2p_listen_on: Vec, - /// The store for job record. - pub store: Store, -} - -impl NodeConfig { - pub fn new( - node_type: NodeType, - network: Network, - p2p_local_keypair: Keypair, - p2p_listen_on: Vec, - store: Store, - ) -> Self { - Self { node_type, network, p2p_local_keypair, p2p_listen_on, store } - } -} - -#[derive(Debug)] -pub struct Node { - pub store: Arc, -} - -impl Node { - pub async fn new(node_config: NodeConfig) -> Result> { - let mut swarm_runner = SwarmRunner::new(&node_config)?; - let registry_handler = RegistryHandler::new( - "https://starknet-sepolia.public.blastapi.io", - "0xcdd51fbc4e008f4ef807eaf26f5043521ef5931bbb1e04032a25bd845d286b", - ); - // Node should run swarm runner and registry handler concurrently. - tokio::spawn(async move { - swarm_runner.run(node_config.node_type).await; - }); - tokio::spawn(async move { - registry_handler.run().await; - }); - - let store = Arc::new(node_config.store); - - Ok(Self { store }) - } -} diff --git a/crates/peer/src/registry.rs b/crates/peer/src/registry.rs index ff12311..fb959e5 100644 --- a/crates/peer/src/registry.rs +++ b/crates/peer/src/registry.rs @@ -1,26 +1,30 @@ -use std::error::Error; - +use async_stream::try_stream; +use futures_core::stream::Stream; use starknet::{ core::types::{BlockId, EmittedEvent, EventFilter, FieldElement}, providers::{jsonrpc::HttpTransport, JsonRpcClient, Provider, Url}, }; +use std::{error::Error, pin::Pin}; +use tracing::trace; + +const EVENT_SCRAPE_INTERVAL: u64 = 2; pub struct RegistryHandler { pub provider: JsonRpcClient, - address: FieldElement, + contract_address: FieldElement, + last_block_number: u64, } impl RegistryHandler { - pub fn new(url: &str, address: &str) -> Self { + pub fn new(url: &str, contract_address: &str) -> Self { let provider = JsonRpcClient::new(HttpTransport::new(Url::parse(url).unwrap())); - let address = FieldElement::from_hex_be(address).unwrap(); - Self { provider, address } + let contract_address = FieldElement::from_hex_be(contract_address).unwrap(); + Self { provider, contract_address, last_block_number: 0 } } async fn scrape_event( - &self, + &mut self, event_keys: Vec, - from_block: u64, ) -> Result, Box> { let keys = event_keys .iter() @@ -30,43 +34,32 @@ impl RegistryHandler { let latest_block_number = self.provider.block_number().await?; let filter = EventFilter { - from_block: Some(BlockId::Number(from_block)), + from_block: Some(BlockId::Number(self.last_block_number)), to_block: Some(BlockId::Number(latest_block_number)), - address: Some(self.address), + address: Some(self.contract_address), keys: Some(vec![keys.clone()]), }; let events = self.provider.get_events(filter, None, 1000).await?.events; + self.last_block_number = latest_block_number; Ok(events) } - pub async fn run(&self) { - // Create an interval of every 5 seconds - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5)); - - loop { - interval.tick().await; - - println!("Scraping events..."); - - // Scrape the event - let result = self - .scrape_event( - vec!["0x17ef19eae2188756c1689ef60586c692a3aee6fecc18ee1b21f3028f75b9988" - .to_string()], - 0, - ) - .await; - - // Handle the result - match result { - Ok(events) => { - println!("{} Events Found", events.len()); - } - Err(e) => { - eprintln!("Error scraping events: {:?}", e); + pub fn subscribe_events( + &mut self, + event_keys: Vec, + ) -> Pin, Box>>>> { + let stream = try_stream! { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(EVENT_SCRAPE_INTERVAL)); + loop { + interval.tick().await; + trace!("Scraping events..."); + let events = self.scrape_event(event_keys.clone()).await?; + if !events.is_empty() { + yield events } } - } + }; + Box::pin(stream) } } diff --git a/crates/peer/src/store.rs b/crates/peer/src/store.rs deleted file mode 100644 index 7dcfb18..0000000 --- a/crates/peer/src/store.rs +++ /dev/null @@ -1,22 +0,0 @@ -use sharp_p2p_common::job::Job; - -use std::collections::VecDeque; - -#[derive(Debug)] -pub struct Store { - /// For delegator, FIFO queue to publish message - /// For executor, FIFO queue to prove job - pub job_queue: VecDeque, -} - -impl Store { - pub fn new() -> Self { - Self { job_queue: VecDeque::new() } - } -} - -impl Default for Store { - fn default() -> Self { - Self::new() - } -} diff --git a/crates/peer/src/swarm.rs b/crates/peer/src/swarm.rs index 2730842..6947872 100644 --- a/crates/peer/src/swarm.rs +++ b/crates/peer/src/swarm.rs @@ -1,14 +1,14 @@ -use std::error::Error; - +use async_stream::stream; +use futures_core::stream::Stream; use libp2p::futures::StreamExt; use libp2p::gossipsub::{self, IdentTopic}; -use libp2p::multiaddr::Protocol; +use libp2p::identity::Keypair; use libp2p::swarm::{NetworkBehaviour, SwarmEvent}; -use libp2p::{mdns, noise, tcp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder}; -use tokio::io::{self, AsyncBufReadExt}; - -use crate::network::Network; -use crate::node::{NodeConfig, NodeType}; +use libp2p::{mdns, noise, tcp, yamux, Swarm, SwarmBuilder}; +use std::error::Error; +use std::pin::Pin; +use tokio::sync::mpsc; +use tracing::{debug, error}; #[derive(NetworkBehaviour)] struct PeerBehaviour { @@ -16,41 +16,22 @@ struct PeerBehaviour { mdns: mdns::tokio::Behaviour, } -pub enum Topic { - NewJob, - PickedJob, -} - -impl Topic { - pub fn as_str(&self) -> &'static str { - match self { - Topic::NewJob => "new-job", - Topic::PickedJob => "picked-job", - } - } -} - -pub(crate) fn gossipsub_ident_topic(network: Network, topic: Topic) -> IdentTopic { - let network = network.as_str(); - let topic = topic.as_str(); - let s = format!("/{network}/{topic}"); - IdentTopic::new(s) -} - pub struct SwarmRunner { swarm: Swarm, - network: Network, } impl SwarmRunner { - pub fn new(node_config: &NodeConfig) -> Result> { + pub fn new( + p2p_local_keypair: &Keypair, + subscribe_topic: &IdentTopic, + ) -> Result> { let mdns = mdns::tokio::Behaviour::new( mdns::Config::default(), - node_config.p2p_local_keypair.public().to_peer_id(), + p2p_local_keypair.public().to_peer_id(), )?; - let gossipsub = init_gossip(node_config)?; + let gossipsub = Self::init_gossip(p2p_local_keypair, subscribe_topic)?; let behaviour = PeerBehaviour { gossipsub, mdns }; - let local_keypair = node_config.p2p_local_keypair.clone(); + let local_keypair = p2p_local_keypair.clone(); let mut swarm = SwarmBuilder::with_existing_identity(local_keypair) .with_tokio() .with_tcp(tcp::Config::default(), noise::Config::new, yamux::Config::default)? @@ -62,98 +43,72 @@ impl SwarmRunner { swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?; swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; - Ok(SwarmRunner { swarm, network: node_config.network }) + Ok(SwarmRunner { swarm }) } - pub async fn run(&mut self, node_type: NodeType) { - // Read full lines from stdin - let mut stdin = io::BufReader::new(io::stdin()).lines(); - - let publish_topic = match node_type { - NodeType::Delegator => gossipsub_ident_topic(self.network, Topic::NewJob), - NodeType::Executor => gossipsub_ident_topic(self.network, Topic::PickedJob), - }; + fn init_gossip( + p2p_local_keypair: &Keypair, + subscribe_topic: &IdentTopic, + ) -> Result> { + let message_authenticity = + gossipsub::MessageAuthenticity::Signed(p2p_local_keypair.clone()); + let config = gossipsub::ConfigBuilder::default() + .validation_mode(gossipsub::ValidationMode::Strict) + .validate_messages() + .build() + .unwrap(); + let mut gossipsub: gossipsub::Behaviour = + gossipsub::Behaviour::new(message_authenticity, config).unwrap(); + + gossipsub.subscribe(subscribe_topic)?; + + Ok(gossipsub) + } - loop { - tokio::select! { - Ok(Some(line)) = stdin.next_line() => { - println!("Publishing to topic: {:?}", publish_topic); - if let Err(e) = self.swarm - .behaviour_mut().gossipsub - .publish(publish_topic.clone(), line.as_bytes()) { - println!("Publish error: {e:?}"); - } - }, - event = self.swarm.select_next_some() => match event { - SwarmEvent::Behaviour(PeerBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => { - for (peer_id, _multiaddr) in list { - println!("mDNS discovered a new peer: {peer_id}"); - self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id); + pub fn run( + mut self, + send_topic: IdentTopic, + mut send_topic_rx: mpsc::Receiver>, + ) -> Pin>> { + let stream = stream! { + loop { + tokio::select! { + Some(data) = send_topic_rx.recv() => { + debug!("Publishing to topic: {:?}", send_topic); + if let Err(e) = self.swarm + .behaviour_mut().gossipsub + .publish(send_topic.clone(), data) { + error!("Publish error: {e:?}"); } }, - SwarmEvent::Behaviour(PeerBehaviourEvent::Mdns(mdns::Event::Expired(list))) => { - for (peer_id, _multiaddr) in list { - println!("mDNS discover peer has expired: {peer_id}"); - self.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id); + event = self.swarm.select_next_some() => match event { + SwarmEvent::Behaviour(PeerBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => { + for (peer_id, _multiaddr) in list { + debug!("mDNS discovered a new peer: {peer_id}"); + self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id); + } + }, + SwarmEvent::Behaviour(PeerBehaviourEvent::Mdns(mdns::Event::Expired(list))) => { + for (peer_id, _multiaddr) in list { + debug!("mDNS discover peer has expired: {peer_id}"); + self.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id); + } + }, + SwarmEvent::Behaviour(PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { + propagation_source, + message_id, + message, + })) => { + yield gossipsub::Event::Message { propagation_source, message_id, message }; + }, + SwarmEvent::NewListenAddr { address, .. } => { + debug!("Local node is listening on {address}"); } - }, - SwarmEvent::Behaviour(PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { - propagation_source: peer_id, - message_id: id, - message, - })) => println!( - "Got message: '{}' with id: {id} from peer: {peer_id}", - String::from_utf8_lossy(&message.data), - ), - SwarmEvent::NewListenAddr { address, .. } => { - println!("Local node is listening on {address}"); + _ => {} } - _ => {} } } - } - } -} - -fn init_gossip(node_config: &NodeConfig) -> Result> { - let message_authenticity = - gossipsub::MessageAuthenticity::Signed(node_config.p2p_local_keypair.clone()); - let config = gossipsub::ConfigBuilder::default() - .validation_mode(gossipsub::ValidationMode::Strict) - .validate_messages() - .build() - .unwrap(); - let mut gossipsub: gossipsub::Behaviour = - gossipsub::Behaviour::new(message_authenticity, config).unwrap(); - - // `new-job` is the topic about new job to be proven - let new_job_topic = gossipsub_ident_topic(node_config.network, Topic::NewJob); - // `picked-job` is the topic about picked job to processing prover - let picked_job_topic = gossipsub_ident_topic(node_config.network, Topic::PickedJob); - - match node_config.node_type { - NodeType::Delegator => { - println!("Delegator: Subscribed no topic"); - } - NodeType::Executor => { - gossipsub.subscribe(&picked_job_topic)?; - gossipsub.subscribe(&new_job_topic)?; - println!("Executor: Subscribed to topic: {:?}, {:?}", new_job_topic, picked_job_topic); - } - } - - Ok(gossipsub) -} - -pub(crate) trait MultiaddrExt { - fn peer_id(&self) -> Option; -} - -impl MultiaddrExt for Multiaddr { - fn peer_id(&self) -> Option { - self.iter().find_map(|proto| match proto { - Protocol::P2p(peer_id) => Some(peer_id), - _ => None, - }) + }; + Box::pin(stream) } } From 9d7f3cb80855374b5bee1b1172eb8d039e76bba2 Mon Sep 17 00:00:00 2001 From: Bartosz Nowak Date: Sun, 14 Apr 2024 14:00:44 +0200 Subject: [PATCH 2/4] handling termination --- crates/delegator/src/main.rs | 5 ++++- crates/executor/src/main.rs | 5 ++++- crates/peer/src/swarm.rs | 7 ++++--- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/crates/delegator/src/main.rs b/crates/delegator/src/main.rs index d3618a8..dc14300 100644 --- a/crates/delegator/src/main.rs +++ b/crates/delegator/src/main.rs @@ -18,7 +18,7 @@ async fn main() -> Result<(), Box> { // 2. Generate topic let topic = gossipsub_ident_topic(Network::Sepolia, Topic::NewJob); - let swarm_runner = SwarmRunner::new(&p2p_local_keypair, &topic)?; + let mut swarm_runner = SwarmRunner::new(&p2p_local_keypair, &topic)?; let mut registry_handler = RegistryHandler::new( "https://starknet-sepolia.public.blastapi.io", "0xcdd51fbc4e008f4ef807eaf26f5043521ef5931bbb1e04032a25bd845d286b", @@ -36,6 +36,9 @@ async fn main() -> Result<(), Box> { Some(Ok(event_vec)) = event_stream.next() => { debug!("{:?}", event_vec); }, + else => break } } + + Ok(()) } diff --git a/crates/executor/src/main.rs b/crates/executor/src/main.rs index 54d20df..2709a79 100644 --- a/crates/executor/src/main.rs +++ b/crates/executor/src/main.rs @@ -18,7 +18,7 @@ async fn main() -> Result<(), Box> { // 2. Generate topic let topic = gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob); - let swarm_runner = SwarmRunner::new(&p2p_local_keypair, &topic)?; + let mut swarm_runner = SwarmRunner::new(&p2p_local_keypair, &topic)?; let mut registry_handler = RegistryHandler::new( "https://starknet-sepolia.public.blastapi.io", "0xcdd51fbc4e008f4ef807eaf26f5043521ef5931bbb1e04032a25bd845d286b", @@ -36,6 +36,9 @@ async fn main() -> Result<(), Box> { Some(Ok(event_vec)) = event_stream.next() => { debug!("{:?}", event_vec); }, + else => break } } + + Ok(()) } diff --git a/crates/peer/src/swarm.rs b/crates/peer/src/swarm.rs index 6947872..78d9c3e 100644 --- a/crates/peer/src/swarm.rs +++ b/crates/peer/src/swarm.rs @@ -66,10 +66,10 @@ impl SwarmRunner { } pub fn run( - mut self, + &mut self, send_topic: IdentTopic, mut send_topic_rx: mpsc::Receiver>, - ) -> Pin>> { + ) -> Pin + '_>> { let stream = stream! { loop { tokio::select! { @@ -105,7 +105,8 @@ impl SwarmRunner { debug!("Local node is listening on {address}"); } _ => {} - } + }, + else => break } } }; From d2633d5868f2d63aa58d9ff7322a3716883cefbd Mon Sep 17 00:00:00 2001 From: Bartosz Nowak Date: Sun, 14 Apr 2024 14:26:32 +0200 Subject: [PATCH 3/4] gracefull drop --- Cargo.toml | 1 + crates/peer/Cargo.toml | 1 + crates/peer/src/swarm.rs | 14 ++++++++++++-- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 083d97e..23d6cb4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ starknet = "0.9.0" tempfile = "3.10.1" thiserror = "1.0.58" tokio = { version = "1.36", features = ["full"] } +tokio-util = "0.7.10" tracing = "0.1.37" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/crates/peer/Cargo.toml b/crates/peer/Cargo.toml index d0f47bf..88dbf99 100644 --- a/crates/peer/Cargo.toml +++ b/crates/peer/Cargo.toml @@ -15,6 +15,7 @@ futures.workspace = true libp2p.workspace = true sharp-p2p-common.workspace = true starknet.workspace = true +tokio-util.workspace = true tokio.workspace = true tracing-subscriber.workspace = true tracing.workspace = true \ No newline at end of file diff --git a/crates/peer/src/swarm.rs b/crates/peer/src/swarm.rs index 78d9c3e..aea6100 100644 --- a/crates/peer/src/swarm.rs +++ b/crates/peer/src/swarm.rs @@ -8,6 +8,7 @@ use libp2p::{mdns, noise, tcp, yamux, Swarm, SwarmBuilder}; use std::error::Error; use std::pin::Pin; use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; use tracing::{debug, error}; #[derive(NetworkBehaviour)] @@ -18,6 +19,7 @@ struct PeerBehaviour { pub struct SwarmRunner { swarm: Swarm, + cancellation_token: CancellationToken, } impl SwarmRunner { @@ -43,7 +45,7 @@ impl SwarmRunner { swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?; swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; - Ok(SwarmRunner { swarm }) + Ok(SwarmRunner { swarm, cancellation_token: CancellationToken::new() }) } fn init_gossip( @@ -106,10 +108,18 @@ impl SwarmRunner { } _ => {} }, - else => break + _ = self.cancellation_token.cancelled() => { + break + } } } }; Box::pin(stream) } } + +impl Drop for SwarmRunner { + fn drop(&mut self) { + self.cancellation_token.cancel(); + } +} From acb9ac172e8b7b433db93ed97a5536aa0816e79a Mon Sep 17 00:00:00 2001 From: Bartosz Nowak Date: Sun, 14 Apr 2024 15:42:55 +0200 Subject: [PATCH 4/4] gossip fix stdin testing --- .cargo/config.toml | 2 ++ crates/delegator/src/main.rs | 23 ++++++++++++++++------- crates/executor/src/main.rs | 23 ++++++++++++++++------- crates/peer/src/swarm.rs | 30 ++++++++++++++---------------- 4 files changed, 48 insertions(+), 30 deletions(-) create mode 100644 .cargo/config.toml diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..f4ab6e5 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[env] +RUST_LOG = "info" \ No newline at end of file diff --git a/crates/delegator/src/main.rs b/crates/delegator/src/main.rs index dc14300..08723aa 100644 --- a/crates/delegator/src/main.rs +++ b/crates/delegator/src/main.rs @@ -4,8 +4,9 @@ use sharp_p2p_common::topic::{gossipsub_ident_topic, Topic}; use sharp_p2p_peer::registry::RegistryHandler; use sharp_p2p_peer::swarm::SwarmRunner; use std::error::Error; +use tokio::io::{stdin, AsyncBufReadExt, BufReader}; use tokio::sync::mpsc; -use tracing::debug; +use tracing::info; use tracing_subscriber::EnvFilter; #[tokio::main] @@ -16,25 +17,33 @@ async fn main() -> Result<(), Box> { let p2p_local_keypair = libp2p::identity::Keypair::generate_ed25519(); // 2. Generate topic - let topic = gossipsub_ident_topic(Network::Sepolia, Topic::NewJob); + let new_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::NewJob); + let picked_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob); - let mut swarm_runner = SwarmRunner::new(&p2p_local_keypair, &topic)?; + let mut swarm_runner = + SwarmRunner::new(&p2p_local_keypair, &[new_job_topic.to_owned(), picked_job_topic])?; let mut registry_handler = RegistryHandler::new( "https://starknet-sepolia.public.blastapi.io", "0xcdd51fbc4e008f4ef807eaf26f5043521ef5931bbb1e04032a25bd845d286b", ); - let (_send_topic_tx, send_topic_rx) = mpsc::channel::>(1000); - let mut message_stream = swarm_runner.run(topic, send_topic_rx); + let (send_topic_tx, send_topic_rx) = mpsc::channel::>(1000); + let mut message_stream = swarm_runner.run(new_job_topic, send_topic_rx); let mut event_stream = registry_handler.subscribe_events(vec!["0x0".to_string()]); + // Read full lines from stdin + let mut stdin = BufReader::new(stdin()).lines(); + loop { tokio::select! { + Ok(Some(line)) = stdin.next_line() => { + send_topic_tx.send(line.as_bytes().to_vec()).await?; + }, Some(event) = message_stream.next() => { - debug!("{:?}", event); + info!("{:?}", event); }, Some(Ok(event_vec)) = event_stream.next() => { - debug!("{:?}", event_vec); + info!("{:?}", event_vec); }, else => break } diff --git a/crates/executor/src/main.rs b/crates/executor/src/main.rs index 2709a79..8660dc8 100644 --- a/crates/executor/src/main.rs +++ b/crates/executor/src/main.rs @@ -4,8 +4,9 @@ use sharp_p2p_common::topic::{gossipsub_ident_topic, Topic}; use sharp_p2p_peer::registry::RegistryHandler; use sharp_p2p_peer::swarm::SwarmRunner; use std::error::Error; +use tokio::io::{stdin, AsyncBufReadExt, BufReader}; use tokio::sync::mpsc; -use tracing::debug; +use tracing::info; use tracing_subscriber::EnvFilter; #[tokio::main] @@ -16,25 +17,33 @@ async fn main() -> Result<(), Box> { let p2p_local_keypair = libp2p::identity::Keypair::generate_ed25519(); // 2. Generate topic - let topic = gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob); + let new_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::NewJob); + let picked_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob); - let mut swarm_runner = SwarmRunner::new(&p2p_local_keypair, &topic)?; + let mut swarm_runner = + SwarmRunner::new(&p2p_local_keypair, &[new_job_topic, picked_job_topic.to_owned()])?; let mut registry_handler = RegistryHandler::new( "https://starknet-sepolia.public.blastapi.io", "0xcdd51fbc4e008f4ef807eaf26f5043521ef5931bbb1e04032a25bd845d286b", ); - let (_send_topic_tx, send_topic_rx) = mpsc::channel::>(1000); - let mut message_stream = swarm_runner.run(topic, send_topic_rx); + let (send_topic_tx, send_topic_rx) = mpsc::channel::>(1000); + let mut message_stream = swarm_runner.run(picked_job_topic, send_topic_rx); let mut event_stream = registry_handler.subscribe_events(vec!["0x0".to_string()]); + // Read full lines from stdin + let mut stdin = BufReader::new(stdin()).lines(); + loop { tokio::select! { + Ok(Some(line)) = stdin.next_line() => { + send_topic_tx.send(line.as_bytes().to_vec()).await?; + }, Some(event) = message_stream.next() => { - debug!("{:?}", event); + info!("{:?}", event); }, Some(Ok(event_vec)) = event_stream.next() => { - debug!("{:?}", event_vec); + info!("{:?}", event_vec); }, else => break } diff --git a/crates/peer/src/swarm.rs b/crates/peer/src/swarm.rs index aea6100..52ee51a 100644 --- a/crates/peer/src/swarm.rs +++ b/crates/peer/src/swarm.rs @@ -7,6 +7,7 @@ use libp2p::swarm::{NetworkBehaviour, SwarmEvent}; use libp2p::{mdns, noise, tcp, yamux, Swarm, SwarmBuilder}; use std::error::Error; use std::pin::Pin; +use std::time::Duration; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::{debug, error}; @@ -25,46 +26,43 @@ pub struct SwarmRunner { impl SwarmRunner { pub fn new( p2p_local_keypair: &Keypair, - subscribe_topic: &IdentTopic, + subscribe_topics: &[IdentTopic], ) -> Result> { let mdns = mdns::tokio::Behaviour::new( mdns::Config::default(), p2p_local_keypair.public().to_peer_id(), )?; - let gossipsub = Self::init_gossip(p2p_local_keypair, subscribe_topic)?; + let gossipsub = Self::init_gossip(p2p_local_keypair)?; let behaviour = PeerBehaviour { gossipsub, mdns }; let local_keypair = p2p_local_keypair.clone(); let mut swarm = SwarmBuilder::with_existing_identity(local_keypair) .with_tokio() .with_tcp(tcp::Config::default(), noise::Config::new, yamux::Config::default)? .with_quic() - .with_behaviour(|_| behaviour) - .expect("Moving behaviour doesn't fail") + .with_behaviour(|_| behaviour)? + .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60))) .build(); + for topic in subscribe_topics { + swarm.behaviour_mut().gossipsub.subscribe(topic)?; + } + swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?; swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; Ok(SwarmRunner { swarm, cancellation_token: CancellationToken::new() }) } - fn init_gossip( - p2p_local_keypair: &Keypair, - subscribe_topic: &IdentTopic, - ) -> Result> { + fn init_gossip(p2p_local_keypair: &Keypair) -> Result> { let message_authenticity = gossipsub::MessageAuthenticity::Signed(p2p_local_keypair.clone()); + let config = gossipsub::ConfigBuilder::default() + .heartbeat_interval(Duration::from_secs(10)) .validation_mode(gossipsub::ValidationMode::Strict) - .validate_messages() - .build() - .unwrap(); - let mut gossipsub: gossipsub::Behaviour = - gossipsub::Behaviour::new(message_authenticity, config).unwrap(); - - gossipsub.subscribe(subscribe_topic)?; + .build()?; - Ok(gossipsub) + Ok(gossipsub::Behaviour::new(message_authenticity, config)?) } pub fn run(