Skip to content

Commit

Permalink
Merge pull request #2 from iosis-tech/streams
Browse files Browse the repository at this point in the history
streams&refactor
  • Loading branch information
rkdud007 authored Apr 15, 2024
2 parents 03e4dfd + acb9ac1 commit 578577f
Show file tree
Hide file tree
Showing 17 changed files with 235 additions and 290 deletions.
2 changes: 2 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[env]
RUST_LOG = "info"
18 changes: 6 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,28 @@ license-file = "LICENSE"

[workspace.dependencies]
async-process = "2.2.0"
async-stream = "0.3.5"
cairo-felt = "0.9.1"
cairo-proof-parser = { git = "https://github.com/Okm165/cairo-proof-parser", rev = "97a04bbee07330311b38d6f4cecfed3acb237626" }
futures = "0.3.30"
futures-core = "0.3.30"
futures-util = "0.3.30"
hex = "0.4.3"
itertools = "0.12.1"
libp2p = { version = "0.53.2", features = [
"tokio",
"gossipsub",
"kad",
"mdns",
"noise",
"macros",
"tcp",
"yamux",
"quic",
] }
libp2p = { version = "0.53.2", features = ["tokio","gossipsub","kad","mdns","noise","macros","tcp","yamux","quic"]}
num-bigint = "0.4.4"
serde = "1.0.197"
serde_json = "1.0.115"
starknet = "0.9.0"
tempfile = "3.10.1"
thiserror = "1.0.58"
tokio = { version = "1.36", features = ["full"] }
tokio-util = "0.7.10"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

sharp-p2p-common = { path = "crates/common" }
sharp-p2p-delegator = { path = "crates/delegator" }
sharp-p2p-executor = { path = "crates/executor" }
sharp-p2p-prover = { path = "crates/prover" }
sharp-p2p-peer = { path = "crates/peer" }
sharp-p2p-prover = { path = "crates/prover" }
1 change: 1 addition & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ license-file.workspace = true
[dependencies]
cairo-felt.workspace = true
hex.workspace = true
libp2p.workspace = true
num-bigint.workspace = true
serde_json.workspace = true
serde.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions crates/common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod job;
pub mod job_witness;
pub mod network;
pub mod topic;
pub mod vec252;
2 changes: 0 additions & 2 deletions crates/peer/src/network.rs → crates/common/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Network {
/// Starknet mainnet.
Mainnet,
/// Sepolia testnet.
Sepolia,
}

Expand Down
25 changes: 25 additions & 0 deletions crates/common/src/topic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use libp2p::gossipsub::IdentTopic;

use crate::network::Network;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Topic {
NewJob,
PickedJob,
}

impl Topic {
pub fn as_str(&self) -> &'static str {
match self {
Topic::NewJob => "new-job",
Topic::PickedJob => "picked-job",
}
}
}

pub fn gossipsub_ident_topic(network: Network, topic: Topic) -> IdentTopic {
let network = network.as_str();
let topic = topic.as_str();
let s = format!("/{network}/{topic}");
IdentTopic::new(s)
}
4 changes: 3 additions & 1 deletion crates/delegator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ license-file.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures-util.workspace = true
libp2p.workspace = true
sharp-p2p-common.workspace = true
sharp-p2p-peer.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
sharp-p2p-peer.workspace = true
54 changes: 38 additions & 16 deletions crates/delegator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use sharp_p2p_peer::network::Network;
use sharp_p2p_peer::node::{Node, NodeConfig, NodeType};
use sharp_p2p_peer::store::Store;
use futures_util::StreamExt;
use sharp_p2p_common::network::Network;
use sharp_p2p_common::topic::{gossipsub_ident_topic, Topic};
use sharp_p2p_peer::registry::RegistryHandler;
use sharp_p2p_peer::swarm::SwarmRunner;
use std::error::Error;
use std::time::Duration;
use tokio::time::sleep;
use tokio::io::{stdin, AsyncBufReadExt, BufReader};
use tokio::sync::mpsc;
use tracing::info;
use tracing_subscriber::EnvFilter;

#[tokio::main]
Expand All @@ -13,19 +16,38 @@ async fn main() -> Result<(), Box<dyn Error>> {
// 1. Generate keypair for the node
let p2p_local_keypair = libp2p::identity::Keypair::generate_ed25519();

// 2. Initiate a new node to sync with other peers
let store = Store::new();
let node_config = NodeConfig::new(
NodeType::Delegator,
Network::Sepolia,
p2p_local_keypair,
Vec::new(),
store,
// 2. Generate topic
let new_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::NewJob);
let picked_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob);

let mut swarm_runner =
SwarmRunner::new(&p2p_local_keypair, &[new_job_topic.to_owned(), picked_job_topic])?;
let mut registry_handler = RegistryHandler::new(
"https://starknet-sepolia.public.blastapi.io",
"0xcdd51fbc4e008f4ef807eaf26f5043521ef5931bbb1e04032a25bd845d286b",
);
let node = Node::new(node_config).await.unwrap();
println!("node: {:?}", node);

let (send_topic_tx, send_topic_rx) = mpsc::channel::<Vec<u8>>(1000);
let mut message_stream = swarm_runner.run(new_job_topic, send_topic_rx);
let mut event_stream = registry_handler.subscribe_events(vec!["0x0".to_string()]);

// Read full lines from stdin
let mut stdin = BufReader::new(stdin()).lines();

loop {
sleep(Duration::from_secs(1)).await;
tokio::select! {
Ok(Some(line)) = stdin.next_line() => {
send_topic_tx.send(line.as_bytes().to_vec()).await?;
},
Some(event) = message_stream.next() => {
info!("{:?}", event);
},
Some(Ok(event_vec)) = event_stream.next() => {
info!("{:?}", event_vec);
},
else => break
}
}

Ok(())
}
4 changes: 3 additions & 1 deletion crates/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ license-file.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures-util.workspace = true
libp2p.workspace = true
sharp-p2p-common.workspace = true
sharp-p2p-peer.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
sharp-p2p-peer.workspace = true
1 change: 0 additions & 1 deletion crates/executor/readme.md

This file was deleted.

53 changes: 39 additions & 14 deletions crates/executor/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use sharp_p2p_peer::network::Network;
use sharp_p2p_peer::node::{Node, NodeConfig, NodeType};
use sharp_p2p_peer::store::Store;
use futures_util::StreamExt;
use sharp_p2p_common::network::Network;
use sharp_p2p_common::topic::{gossipsub_ident_topic, Topic};
use sharp_p2p_peer::registry::RegistryHandler;
use sharp_p2p_peer::swarm::SwarmRunner;
use std::error::Error;

use std::time::Duration;
use tokio::time::sleep;

use tokio::io::{stdin, AsyncBufReadExt, BufReader};
use tokio::sync::mpsc;
use tracing::info;
use tracing_subscriber::EnvFilter;

#[tokio::main]
Expand All @@ -15,14 +16,38 @@ async fn main() -> Result<(), Box<dyn Error>> {
// 1. Generate keypair for the node
let p2p_local_keypair = libp2p::identity::Keypair::generate_ed25519();

// 2. Initiate a new node to sync with other peers
let store = Store::new();
let node_config =
NodeConfig::new(NodeType::Executor, Network::Sepolia, p2p_local_keypair, Vec::new(), store);
let node = Node::new(node_config).await.unwrap();
println!("node: {:?}", node);
// 2. Generate topic
let new_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::NewJob);
let picked_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob);

let mut swarm_runner =
SwarmRunner::new(&p2p_local_keypair, &[new_job_topic, picked_job_topic.to_owned()])?;
let mut registry_handler = RegistryHandler::new(
"https://starknet-sepolia.public.blastapi.io",
"0xcdd51fbc4e008f4ef807eaf26f5043521ef5931bbb1e04032a25bd845d286b",
);

let (send_topic_tx, send_topic_rx) = mpsc::channel::<Vec<u8>>(1000);
let mut message_stream = swarm_runner.run(picked_job_topic, send_topic_rx);
let mut event_stream = registry_handler.subscribe_events(vec!["0x0".to_string()]);

// Read full lines from stdin
let mut stdin = BufReader::new(stdin()).lines();

loop {
sleep(Duration::from_secs(1)).await;
tokio::select! {
Ok(Some(line)) = stdin.next_line() => {
send_topic_tx.send(line.as_bytes().to_vec()).await?;
},
Some(event) = message_stream.next() => {
info!("{:?}", event);
},
Some(Ok(event_vec)) = event_stream.next() => {
info!("{:?}", event_vec);
},
else => break
}
}

Ok(())
}
11 changes: 8 additions & 3 deletions crates/peer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ license-file.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-stream.workspace = true
futures-core.workspace = true
futures-util.workspace = true
futures.workspace = true
libp2p.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
sharp-p2p-common.workspace = true
starknet.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
3 changes: 0 additions & 3 deletions crates/peer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,2 @@
pub mod network;
pub mod node;
pub mod registry;
pub mod store;
pub mod swarm;
64 changes: 0 additions & 64 deletions crates/peer/src/node.rs

This file was deleted.

Loading

0 comments on commit 578577f

Please sign in to comment.