diff --git a/Cargo.toml b/Cargo.toml index 4eecb62..cccf005 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,13 @@ overflow-checks = true [workspace] resolver = "2" -members = [ "crates/common", "crates/delegator", "crates/executor", "crates/peer" , "crates/prover"] +members = [ + "crates/common", + "crates/delegator", + "crates/executor", + "crates/peer", + "crates/prover", +] exclude = [] [workspace.package] @@ -15,14 +21,25 @@ license-file = "LICENSE" [workspace.dependencies] async-process = "2.2.0" cairo-felt = "0.9.1" -cairo-proof-parser = { git = "https://github.com/Okm165/cairo-proof-parser", rev = "97a04bbee07330311b38d6f4cecfed3acb237626"} +cairo-proof-parser = { git = "https://github.com/Okm165/cairo-proof-parser", rev = "97a04bbee07330311b38d6f4cecfed3acb237626" } futures = "0.3.30" hex = "0.4.3" itertools = "0.12.1" -libp2p = { version = "0.53.2", features = [ "tokio", "gossipsub", "kad", "mdns", "noise", "macros", "tcp", "yamux", "quic"] } +libp2p = { version = "0.53.2", features = [ + "tokio", + "gossipsub", + "kad", + "mdns", + "noise", + "macros", + "tcp", + "yamux", + "quic", +] } num-bigint = "0.4.4" serde = "1.0.197" serde_json = "1.0.115" +starknet = "0.9.0" tempfile = "3.10.1" thiserror = "1.0.58" tokio = { version = "1.36", features = ["full"] } @@ -32,4 +49,5 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } sharp-p2p-common = { path = "crates/common" } sharp-p2p-delegator = { path = "crates/delegator" } sharp-p2p-executor = { path = "crates/executor" } -sharp-p2p-prover = { path = "crates/prover" } \ No newline at end of file +sharp-p2p-prover = { path = "crates/prover" } +sharp-p2p-peer = { path = "crates/peer" } diff --git a/crates/delegator/Cargo.toml b/crates/delegator/Cargo.toml index 7a5172c..5d1eaf8 100644 --- a/crates/delegator/Cargo.toml +++ b/crates/delegator/Cargo.toml @@ -8,6 +8,8 @@ license-file.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +libp2p.workspace = true tokio.workspace = true tracing-subscriber.workspace = true -tracing.workspace = true \ No newline at end of file +tracing.workspace = true +sharp-p2p-peer.workspace = true diff --git a/crates/delegator/src/main.rs b/crates/delegator/src/main.rs index e7a11a9..313a737 100644 --- a/crates/delegator/src/main.rs +++ b/crates/delegator/src/main.rs @@ -1,3 +1,31 @@ -fn main() { - println!("Hello, world!"); +use sharp_p2p_peer::network::Network; +use sharp_p2p_peer::node::{Node, NodeConfig, NodeType}; +use sharp_p2p_peer::store::Store; +use std::error::Error; +use std::time::Duration; +use tokio::time::sleep; +use tracing_subscriber::EnvFilter; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init(); + + // 1. Generate keypair for the node + let p2p_local_keypair = libp2p::identity::Keypair::generate_ed25519(); + + // 2. Initiate a new node to sync with other peers + let store = Store::new(); + let node_config = NodeConfig::new( + NodeType::Delegator, + Network::Sepolia, + p2p_local_keypair, + Vec::new(), + store, + ); + let node = Node::new(node_config).await.unwrap(); + println!("node: {:?}", node); + + loop { + sleep(Duration::from_secs(1)).await; + } } diff --git a/crates/executor/Cargo.toml b/crates/executor/Cargo.toml index 39ee5e3..7aabc73 100644 --- a/crates/executor/Cargo.toml +++ b/crates/executor/Cargo.toml @@ -8,6 +8,8 @@ license-file.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +libp2p.workspace = true tokio.workspace = true tracing-subscriber.workspace = true -tracing.workspace = true \ No newline at end of file +tracing.workspace = true +sharp-p2p-peer.workspace = true diff --git a/crates/executor/readme.md b/crates/executor/readme.md new file mode 100644 index 0000000..d7f3de2 --- /dev/null +++ b/crates/executor/readme.md @@ -0,0 +1 @@ +## Executor diff --git a/crates/executor/src/main.rs b/crates/executor/src/main.rs index e7a11a9..7fd571d 100644 --- a/crates/executor/src/main.rs +++ b/crates/executor/src/main.rs @@ -1,3 +1,28 @@ -fn main() { - println!("Hello, world!"); +use sharp_p2p_peer::network::Network; +use sharp_p2p_peer::node::{Node, NodeConfig, NodeType}; +use sharp_p2p_peer::store::Store; +use std::error::Error; + +use std::time::Duration; +use tokio::time::sleep; + +use tracing_subscriber::EnvFilter; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init(); + + // 1. Generate keypair for the node + let p2p_local_keypair = libp2p::identity::Keypair::generate_ed25519(); + + // 2. Initiate a new node to sync with other peers + let store = Store::new(); + let node_config = + NodeConfig::new(NodeType::Executor, Network::Sepolia, p2p_local_keypair, Vec::new(), store); + let node = Node::new(node_config).await.unwrap(); + println!("node: {:?}", node); + + loop { + sleep(Duration::from_secs(1)).await; + } } diff --git a/crates/peer/Cargo.toml b/crates/peer/Cargo.toml index df77a4f..d3a73de 100644 --- a/crates/peer/Cargo.toml +++ b/crates/peer/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "peer" +name = "sharp-p2p-peer" version.workspace = true edition.workspace = true repository.workspace = true @@ -11,4 +11,6 @@ license-file.workspace = true libp2p.workspace = true tokio.workspace = true tracing-subscriber.workspace = true -tracing.workspace = true \ No newline at end of file +tracing.workspace = true +sharp-p2p-common.workspace = true +starknet.workspace = true diff --git a/crates/peer/src/lib.rs b/crates/peer/src/lib.rs new file mode 100644 index 0000000..31c83b7 --- /dev/null +++ b/crates/peer/src/lib.rs @@ -0,0 +1,5 @@ +pub mod network; +pub mod node; +pub mod registry; +pub mod store; +pub mod swarm; diff --git a/crates/peer/src/main.rs b/crates/peer/src/main.rs deleted file mode 100644 index e7a11a9..0000000 --- a/crates/peer/src/main.rs +++ /dev/null @@ -1,3 +0,0 @@ -fn main() { - println!("Hello, world!"); -} diff --git a/crates/peer/src/network.rs b/crates/peer/src/network.rs new file mode 100644 index 0000000..262b072 --- /dev/null +++ b/crates/peer/src/network.rs @@ -0,0 +1,16 @@ +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Network { + /// Starknet mainnet. + Mainnet, + /// Sepolia testnet. + Sepolia, +} + +impl Network { + pub fn as_str(&self) -> &'static str { + match self { + Network::Mainnet => "mainnet", + Network::Sepolia => "sepolia", + } + } +} diff --git a/crates/peer/src/node.rs b/crates/peer/src/node.rs new file mode 100644 index 0000000..a40272c --- /dev/null +++ b/crates/peer/src/node.rs @@ -0,0 +1,64 @@ +use libp2p::identity::Keypair; +use libp2p::Multiaddr; +use std::error::Error; +use std::sync::Arc; + +use crate::network::Network; +use crate::registry::RegistryHandler; +use crate::store::Store; +use crate::swarm::SwarmRunner; + +pub enum NodeType { + Delegator, + Executor, +} + +pub struct NodeConfig { + pub node_type: NodeType, + /// An id of the network to connect to. + pub network: Network, + /// The keypair to be used as [`Node`]s identity. + pub p2p_local_keypair: Keypair, + /// List of the addresses where [`Node`] will listen for incoming connections. + pub p2p_listen_on: Vec, + /// The store for job record. + pub store: Store, +} + +impl NodeConfig { + pub fn new( + node_type: NodeType, + network: Network, + p2p_local_keypair: Keypair, + p2p_listen_on: Vec, + store: Store, + ) -> Self { + Self { node_type, network, p2p_local_keypair, p2p_listen_on, store } + } +} + +#[derive(Debug)] +pub struct Node { + pub store: Arc, +} + +impl Node { + pub async fn new(node_config: NodeConfig) -> Result> { + let mut swarm_runner = SwarmRunner::new(&node_config)?; + let registry_handler = RegistryHandler::new( + "https://starknet-sepolia.public.blastapi.io", + "0xcdd51fbc4e008f4ef807eaf26f5043521ef5931bbb1e04032a25bd845d286b", + ); + // Node should run swarm runner and registry handler concurrently. + tokio::spawn(async move { + swarm_runner.run(node_config.node_type).await; + }); + tokio::spawn(async move { + registry_handler.run().await; + }); + + let store = Arc::new(node_config.store); + + Ok(Self { store }) + } +} diff --git a/crates/peer/src/registry.rs b/crates/peer/src/registry.rs new file mode 100644 index 0000000..ff12311 --- /dev/null +++ b/crates/peer/src/registry.rs @@ -0,0 +1,72 @@ +use std::error::Error; + +use starknet::{ + core::types::{BlockId, EmittedEvent, EventFilter, FieldElement}, + providers::{jsonrpc::HttpTransport, JsonRpcClient, Provider, Url}, +}; + +pub struct RegistryHandler { + pub provider: JsonRpcClient, + address: FieldElement, +} + +impl RegistryHandler { + pub fn new(url: &str, address: &str) -> Self { + let provider = JsonRpcClient::new(HttpTransport::new(Url::parse(url).unwrap())); + let address = FieldElement::from_hex_be(address).unwrap(); + Self { provider, address } + } + + async fn scrape_event( + &self, + event_keys: Vec, + from_block: u64, + ) -> Result, Box> { + let keys = event_keys + .iter() + .map(|key| FieldElement::from_hex_be(key)) + .collect::, _>>()?; + + let latest_block_number = self.provider.block_number().await?; + + let filter = EventFilter { + from_block: Some(BlockId::Number(from_block)), + to_block: Some(BlockId::Number(latest_block_number)), + address: Some(self.address), + keys: Some(vec![keys.clone()]), + }; + + let events = self.provider.get_events(filter, None, 1000).await?.events; + Ok(events) + } + + pub async fn run(&self) { + // Create an interval of every 5 seconds + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5)); + + loop { + interval.tick().await; + + println!("Scraping events..."); + + // Scrape the event + let result = self + .scrape_event( + vec!["0x17ef19eae2188756c1689ef60586c692a3aee6fecc18ee1b21f3028f75b9988" + .to_string()], + 0, + ) + .await; + + // Handle the result + match result { + Ok(events) => { + println!("{} Events Found", events.len()); + } + Err(e) => { + eprintln!("Error scraping events: {:?}", e); + } + } + } + } +} diff --git a/crates/peer/src/store.rs b/crates/peer/src/store.rs new file mode 100644 index 0000000..7dcfb18 --- /dev/null +++ b/crates/peer/src/store.rs @@ -0,0 +1,22 @@ +use sharp_p2p_common::job::Job; + +use std::collections::VecDeque; + +#[derive(Debug)] +pub struct Store { + /// For delegator, FIFO queue to publish message + /// For executor, FIFO queue to prove job + pub job_queue: VecDeque, +} + +impl Store { + pub fn new() -> Self { + Self { job_queue: VecDeque::new() } + } +} + +impl Default for Store { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/peer/src/swarm.rs b/crates/peer/src/swarm.rs new file mode 100644 index 0000000..2730842 --- /dev/null +++ b/crates/peer/src/swarm.rs @@ -0,0 +1,159 @@ +use std::error::Error; + +use libp2p::futures::StreamExt; +use libp2p::gossipsub::{self, IdentTopic}; +use libp2p::multiaddr::Protocol; +use libp2p::swarm::{NetworkBehaviour, SwarmEvent}; +use libp2p::{mdns, noise, tcp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder}; +use tokio::io::{self, AsyncBufReadExt}; + +use crate::network::Network; +use crate::node::{NodeConfig, NodeType}; + +#[derive(NetworkBehaviour)] +struct PeerBehaviour { + gossipsub: gossipsub::Behaviour, + mdns: mdns::tokio::Behaviour, +} + +pub enum Topic { + NewJob, + PickedJob, +} + +impl Topic { + pub fn as_str(&self) -> &'static str { + match self { + Topic::NewJob => "new-job", + Topic::PickedJob => "picked-job", + } + } +} + +pub(crate) fn gossipsub_ident_topic(network: Network, topic: Topic) -> IdentTopic { + let network = network.as_str(); + let topic = topic.as_str(); + let s = format!("/{network}/{topic}"); + IdentTopic::new(s) +} + +pub struct SwarmRunner { + swarm: Swarm, + network: Network, +} + +impl SwarmRunner { + pub fn new(node_config: &NodeConfig) -> Result> { + let mdns = mdns::tokio::Behaviour::new( + mdns::Config::default(), + node_config.p2p_local_keypair.public().to_peer_id(), + )?; + let gossipsub = init_gossip(node_config)?; + let behaviour = PeerBehaviour { gossipsub, mdns }; + let local_keypair = node_config.p2p_local_keypair.clone(); + let mut swarm = SwarmBuilder::with_existing_identity(local_keypair) + .with_tokio() + .with_tcp(tcp::Config::default(), noise::Config::new, yamux::Config::default)? + .with_quic() + .with_behaviour(|_| behaviour) + .expect("Moving behaviour doesn't fail") + .build(); + + swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?; + swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; + + Ok(SwarmRunner { swarm, network: node_config.network }) + } + + pub async fn run(&mut self, node_type: NodeType) { + // Read full lines from stdin + let mut stdin = io::BufReader::new(io::stdin()).lines(); + + let publish_topic = match node_type { + NodeType::Delegator => gossipsub_ident_topic(self.network, Topic::NewJob), + NodeType::Executor => gossipsub_ident_topic(self.network, Topic::PickedJob), + }; + + loop { + tokio::select! { + Ok(Some(line)) = stdin.next_line() => { + println!("Publishing to topic: {:?}", publish_topic); + if let Err(e) = self.swarm + .behaviour_mut().gossipsub + .publish(publish_topic.clone(), line.as_bytes()) { + println!("Publish error: {e:?}"); + } + }, + event = self.swarm.select_next_some() => match event { + SwarmEvent::Behaviour(PeerBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => { + for (peer_id, _multiaddr) in list { + println!("mDNS discovered a new peer: {peer_id}"); + self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id); + } + }, + SwarmEvent::Behaviour(PeerBehaviourEvent::Mdns(mdns::Event::Expired(list))) => { + for (peer_id, _multiaddr) in list { + println!("mDNS discover peer has expired: {peer_id}"); + self.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id); + } + }, + SwarmEvent::Behaviour(PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { + propagation_source: peer_id, + message_id: id, + message, + })) => println!( + "Got message: '{}' with id: {id} from peer: {peer_id}", + String::from_utf8_lossy(&message.data), + ), + SwarmEvent::NewListenAddr { address, .. } => { + println!("Local node is listening on {address}"); + } + _ => {} + } + } + } + } +} + +fn init_gossip(node_config: &NodeConfig) -> Result> { + let message_authenticity = + gossipsub::MessageAuthenticity::Signed(node_config.p2p_local_keypair.clone()); + let config = gossipsub::ConfigBuilder::default() + .validation_mode(gossipsub::ValidationMode::Strict) + .validate_messages() + .build() + .unwrap(); + let mut gossipsub: gossipsub::Behaviour = + gossipsub::Behaviour::new(message_authenticity, config).unwrap(); + + // `new-job` is the topic about new job to be proven + let new_job_topic = gossipsub_ident_topic(node_config.network, Topic::NewJob); + // `picked-job` is the topic about picked job to processing prover + let picked_job_topic = gossipsub_ident_topic(node_config.network, Topic::PickedJob); + + match node_config.node_type { + NodeType::Delegator => { + println!("Delegator: Subscribed no topic"); + } + NodeType::Executor => { + gossipsub.subscribe(&picked_job_topic)?; + gossipsub.subscribe(&new_job_topic)?; + println!("Executor: Subscribed to topic: {:?}, {:?}", new_job_topic, picked_job_topic); + } + } + + Ok(gossipsub) +} + +pub(crate) trait MultiaddrExt { + fn peer_id(&self) -> Option; +} + +impl MultiaddrExt for Multiaddr { + fn peer_id(&self) -> Option { + self.iter().find_map(|proto| match proto { + Protocol::P2p(peer_id) => Some(peer_id), + _ => None, + }) + } +}