Commit 5556bdb (branch: dev)
Okm165 committed Jun 29, 2024
1 parent 99482d1 commit 5556bdb
Showing 17 changed files with 505 additions and 145 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
@@ -64,6 +64,10 @@ tokio = { version = "1.36", features = ["full"] }
 tokio-util = "0.7.10"
 tracing = "0.1.37"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
+tonic = "0.11.0"
+prost = "0.12"
+prost-types = "0.12"
+tonic-build = "0.11.0"

 zetina-common = { path = "crates/common" }
 zetina-compiler = { path = "crates/compiler" }
@@ -72,4 +76,4 @@ zetina-executor = { path = "crates/executor" }
 zetina-peer = { path = "crates/peer" }
 zetina-prover = { path = "crates/prover" }
 zetina-runner = { path = "crates/runner" }
-zetina-tests = { path = "crates/tests" }
\ No newline at end of file
+zetina-tests = { path = "crates/tests" }
8 changes: 4 additions & 4 deletions crates/common/src/job_witness.rs
@@ -1,5 +1,5 @@
 use crate::hash;
-use starknet_crypto::FieldElement;
+use serde::{Deserialize, Serialize};
 use std::{
     fmt::Display,
     hash::{DefaultHasher, Hash, Hasher},
@@ -8,13 +8,13 @@ use std::{
 /*
     Job Witness Object
     This object represents the output from the proving process.
-    It holds a serialized proof as an array of FieldElement objects.
+    It holds a serialized proof as an array of bytes.
     This serialized proof can be deserialized into a StarkProof object by the verifier to proceed with the verification of the statement.
 */

-#[derive(Debug, PartialEq, Eq, Hash, Clone)]
+#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
 pub struct JobWitness {
-    pub proof: Vec<FieldElement>, // Serialized proof
+    pub proof: Vec<u8>, // Serialized proof
 }

 impl Display for JobWitness {
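Since JobWitness now derives Serialize and Deserialize and carries raw bytes, it can travel over the gossip network as JSON, which is how the delegator event loop further down decodes it with serde_json::from_slice. A minimal round-trip sketch (the proof bytes are illustrative):

use serde::{Deserialize, Serialize};

// Mirrors the struct above: `proof` holds the serialized STARK proof bytes.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
struct JobWitness {
    proof: Vec<u8>,
}

fn main() -> serde_json::Result<()> {
    let witness = JobWitness { proof: vec![0x01, 0x02, 0x03] }; // illustrative bytes
    let bytes = serde_json::to_vec(&witness)?; // what the prover publishes
    let decoded: JobWitness = serde_json::from_slice(&bytes)?; // what the delegator reads
    assert_eq!(witness, decoded);
    Ok(())
}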
2 changes: 2 additions & 0 deletions crates/common/src/topic.rs
@@ -14,13 +14,15 @@ use crate::network::Network;
 pub enum Topic {
     NewJob,
     PickedJob,
+    FinishedJob,
 }

 impl Topic {
     pub fn as_str(&self) -> &'static str {
         match self {
             Topic::NewJob => "new-job",
             Topic::PickedJob => "picked-job",
+            Topic::FinishedJob => "finished-job",
         }
     }
 }
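These strings become libp2p gossipsub topics via gossipsub_ident_topic, whose body is not part of this diff; a plausible sketch under stated assumptions (the "<network>/<topic>" format, the Network stand-in, and Network::as_str are all guesses, only the call sites appear in this commit):

use libp2p::gossipsub::IdentTopic;

// Hypothetical stand-ins for the zetina_common types.
enum Network { Sepolia }
impl Network {
    fn as_str(&self) -> &'static str { "sepolia" }
}
enum Topic { FinishedJob }
impl Topic {
    fn as_str(&self) -> &'static str { "finished-job" }
}

// Assumed composition: one ident topic per (network, topic) pair,
// e.g. "sepolia/finished-job".
fn gossipsub_ident_topic(network: Network, topic: Topic) -> IdentTopic {
    IdentTopic::new(format!("{}/{}", network.as_str(), topic.as_str()))
}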
20 changes: 2 additions & 18 deletions crates/compiler/src/cairo_compiler/mod.rs
@@ -1,10 +1,7 @@
 use crate::{errors::CompilerControllerError, traits::CompilerController};
 use async_process::Stdio;
 use futures::Future;
-use rand::{thread_rng, Rng};
-use serde_json::json;
 use starknet::signers::SigningKey;
-use std::io::Write;
 use std::path::PathBuf;
 use std::{io::Read, pin::Pin};
 use tempfile::NamedTempFile;
@@ -30,7 +27,7 @@ impl<'identity> CompilerController for CairoCompiler<'identity> {
     fn run(
         &self,
         program_path: PathBuf,
-        _program_input_path: PathBuf,
+        program_input_path: PathBuf,
     ) -> Result<Process<Result<Job, CompilerControllerError>>, CompilerControllerError> {
         let (terminate_tx, mut terminate_rx) = mpsc::channel::<()>(10);
         let future: Pin<Box<dyn Future<Output = Result<Job, CompilerControllerError>> + '_>> =
@@ -66,18 +63,6 @@ impl<'identity> CompilerController for CairoCompiler<'identity> {
                 }
             }

-            // TODO remove it is just to make every job a little diffirent for testing purposes
-            let mut random_input = NamedTempFile::new()?;
-            let mut rng = thread_rng();
-            random_input.write_all(
-                json!({
-                    "fibonacci_claim_index": rng.gen_range(10..10000)
-                })
-                .to_string()
-                .as_bytes(),
-            )?;
-
             // output
             let mut cairo_pie = NamedTempFile::new()?;

             let mut task = Command::new("cairo-run")
@@ -86,7 +71,7 @@ impl<'identity> CompilerController for CairoCompiler<'identity> {
                 .arg("--layout")
                 .arg(layout)
                 .arg("--program_input")
-                .arg(random_input.path())
+                .arg(program_input_path)
                 .arg("--cairo_pie_output")
                 .arg(cairo_pie.path())
                 .arg("--print_output")
@@ -112,7 +97,6 @@ impl<'identity> CompilerController for CairoCompiler<'identity> {
                 }
             }

-            // cairo run had finished
             let mut cairo_pie_compressed = Vec::new();
             cairo_pie.read_to_end(&mut cairo_pie_compressed)?;

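With the hard-coded random fibonacci_claim_index gone, callers now supply the program input themselves. A hedged sketch of the new call site (the file paths and input JSON are illustrative; that a Process can be awaited directly is inferred from the FuturesUnordered usage in the old main.rs):

use std::path::PathBuf;
use zetina_common::job::Job;
use zetina_compiler::{
    cairo_compiler::CairoCompiler, errors::CompilerControllerError, traits::CompilerController,
};

// Illustrative call site: the caller now owns the input file that the removed
// fixture used to fabricate.
async fn compile(compiler: &CairoCompiler<'_>) -> Result<Job, CompilerControllerError> {
    let program = PathBuf::from("fibonacci_program.json"); // assumed path
    let input = PathBuf::from("fibonacci_input.json"); // e.g. {"fibonacci_claim_index": 100}
    compiler.run(program, input)?.await
}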
7 changes: 7 additions & 0 deletions crates/delegator/Cargo.toml
@@ -22,3 +22,10 @@ tokio.workspace = true
 tracing-subscriber.workspace = true
 tracing.workspace = true
 starknet.workspace = true
+tonic.workspace = true
+prost.workspace = true
+async-stream.workspace = true
+tokio-util.workspace = true
+
+[build-dependencies]
+tonic-build.workspace = true
4 changes: 4 additions & 0 deletions crates/delegator/build.rs
@@ -0,0 +1,4 @@
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    tonic_build::compile_protos("proto/delegator.proto")?;
+    Ok(())
+}
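tonic_build writes the generated client and server code into OUT_DIR at build time, and the crate pulls it back in by proto package name. A minimal sketch of the consuming side (the module name is a free choice; the new pub mod tonic declared in main.rs presumably holds something like this):

// Brings the code generated from proto/delegator.proto into the crate,
// keyed by the `package delegator;` declaration below.
pub mod delegator_proto {
    tonic::include_proto!("delegator");
}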
10 changes: 10 additions & 0 deletions crates/delegator/proto/delegator.proto
@@ -0,0 +1,10 @@
+syntax = "proto3";
+
+package delegator;
+
+service DelegatorService {
+  rpc delegate(DelegateRequest) returns (DelegateResponse) {}
+}
+
+message DelegateRequest { string name = 1; }
+message DelegateResponse { string message = 1; }
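From this definition tonic generates a DelegatorService trait inside a delegator_service_server module (those names follow tonic's codegen conventions). The actual crates/delegator/src/tonic.rs is not shown in this commit, so the handler below is only an illustrative echo:

use tonic::{Request, Response, Status};

pub mod proto {
    tonic::include_proto!("delegator");
}
use proto::delegator_service_server::DelegatorService;
use proto::{DelegateRequest, DelegateResponse};

#[derive(Default)]
pub struct DelegatorGrpc;

#[tonic::async_trait]
impl DelegatorService for DelegatorGrpc {
    // Illustrative handler; the real one presumably forwards to Delegator::delegate.
    async fn delegate(
        &self,
        request: Request<DelegateRequest>,
    ) -> Result<Response<DelegateResponse>, Status> {
        let name = request.into_inner().name;
        Ok(Response::new(DelegateResponse { message: format!("delegated {name}") }))
    }
}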
60 changes: 60 additions & 0 deletions crates/delegator/src/delegator.rs
@@ -0,0 +1,60 @@
+pub mod event_loop;
+
+use event_loop::delegator_loop;
+use futures::executor::block_on;
+use futures::FutureExt;
+use libp2p::gossipsub::Event;
+use starknet::signers::SigningKey;
+use std::hash::{DefaultHasher, Hash, Hasher};
+use tokio::{sync::mpsc, task::JoinHandle};
+use tokio_util::sync::CancellationToken;
+use tracing::info;
+use zetina_common::{
+    hash,
+    job::{Job, JobData},
+    job_witness::JobWitness,
+};
+
+pub struct Delegator<'identity> {
+    signing_key: &'identity SigningKey,
+    job_topic_tx: mpsc::Sender<Vec<u8>>,
+    cancellation_token: CancellationToken,
+    handle: Option<JoinHandle<()>>,
+}
+
+impl<'identity> Delegator<'identity> {
+    pub fn new(
+        signing_key: &'identity SigningKey,
+        job_topic_tx: mpsc::Sender<Vec<u8>>,
+        job_witness_tx: mpsc::Sender<JobWitness>,
+        events_rx: mpsc::Receiver<Event>,
+    ) -> Self {
+        let cancellation_token = CancellationToken::new();
+
+        Self {
+            signing_key,
+            job_topic_tx,
+            cancellation_token: cancellation_token.to_owned(),
+            handle: Some(tokio::spawn(async move {
+                delegator_loop(events_rx, job_witness_tx, cancellation_token).boxed().await
+            })),
+        }
+    }
+
+    pub async fn delegate(self, job_data: JobData) -> Result<(), mpsc::error::SendError<Vec<u8>>> {
+        let job = Job::try_from_job_data(job_data, self.signing_key);
+        let serialized_job = serde_json::to_string(&job).unwrap();
+        self.job_topic_tx.send(serialized_job.into()).await?;
+        info!("Sent a new job: {}", hash!(&job));
+        Ok(())
+    }
+}
+
+impl<'identity> Drop for Delegator<'identity> {
+    fn drop(&mut self) {
+        self.cancellation_token.cancel();
+        block_on(async move {
+            self.handle.take().unwrap().await.unwrap();
+        })
+    }
+}
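A usage sketch of the new handle; the channel wiring mirrors the refactored main.rs below, while the JobData payload and the wrapper function itself are illustrative:

use libp2p::gossipsub;
use starknet::signers::SigningKey;
use tokio::sync::mpsc;
use zetina_common::{job::JobData, job_witness::JobWitness};

async fn submit(
    signing_key: &SigningKey,
    job_data: JobData, // illustrative payload
) -> Result<(), mpsc::error::SendError<Vec<u8>>> {
    // Channel wiring as in the refactored main.rs; the unused ends would be
    // handed to SwarmRunner::new in a real setup.
    let (new_job_topic_tx, _new_job_topic_rx) = mpsc::channel::<Vec<u8>>(100);
    let (job_witness_tx, _job_witness_rx) = mpsc::channel::<JobWitness>(100);
    let (_swarm_events_tx, swarm_events_rx) = mpsc::channel::<gossipsub::Event>(100);

    let delegator = Delegator::new(signing_key, new_job_topic_tx, job_witness_tx, swarm_events_rx);
    // `delegate` takes `self`, so the handle is consumed here; Drop then
    // cancels and joins the spawned event loop.
    delegator.delegate(job_data).await
}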
56 changes: 56 additions & 0 deletions crates/delegator/src/delegator/event_loop.rs
@@ -0,0 +1,56 @@
+use libp2p::gossipsub::Event;
+use std::hash::{DefaultHasher, Hash, Hasher};
+use tokio::sync::mpsc;
+use tokio_util::sync::CancellationToken;
+use tracing::info;
+use zetina_common::{
+    hash,
+    job::Job,
+    job_witness::JobWitness,
+    network::Network,
+    topic::{gossipsub_ident_topic, Topic},
+};
+
+pub async fn delegator_loop(
+    mut message_stream: mpsc::Receiver<Event>,
+    job_witness_tx: mpsc::Sender<JobWitness>,
+    cancellation_token: CancellationToken,
+) {
+    loop {
+        tokio::select! {
+            Some(event) = message_stream.recv() => {
+                match event {
+                    Event::Message { message, .. } => {
+                        // Received a new-job message from the network
+                        if message.topic == gossipsub_ident_topic(Network::Sepolia, Topic::NewJob).into() {
+                            let job: Job = serde_json::from_slice(&message.data).unwrap();
+                            info!("Received a new job event: {}", hash!(&job));
+                        }
+                        // Received a picked-job message from the network
+                        if message.topic == gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob).into() {
+                            let job: Job = serde_json::from_slice(&message.data).unwrap();
+                            info!("Received picked job event: {}", hash!(&job));
+                        }
+                        // Received a finished-job message from the network
+                        if message.topic == gossipsub_ident_topic(Network::Sepolia, Topic::FinishedJob).into() {
+                            let job_witness: JobWitness = serde_json::from_slice(&message.data).unwrap();
+                            info!("Received finished job event: {}", hash!(&job_witness));
+                            job_witness_tx.send(job_witness).await.unwrap();
+                        }
+                    },
+                    Event::Subscribed { peer_id, topic } => {
+                        info!("{} subscribed to the topic {}", peer_id.to_string(), topic.to_string());
+                    },
+                    Event::Unsubscribed { peer_id, topic } => {
+                        info!("{} unsubscribed to the topic {}", peer_id.to_string(), topic.to_string());
+                    },
+                    _ => {}
+                }
+            },
+            _ = cancellation_token.cancelled() => {
+                break
+            }
+            else => break
+        }
+    }
+}
100 changes: 24 additions & 76 deletions crates/delegator/src/main.rs
@@ -1,30 +1,19 @@
 #![deny(unused_crate_dependencies)]
+pub mod delegator;
+pub mod swarm;
+pub mod tonic;

-use futures::{stream::FuturesUnordered, StreamExt};
-use libp2p::gossipsub::Event;
+use delegator::Delegator;
+use libp2p::gossipsub;
 use starknet::providers::{jsonrpc::HttpTransport, JsonRpcClient, Url};
-use std::hash::{DefaultHasher, Hash, Hasher};
-use tokio::{
-    io::{stdin, AsyncBufReadExt, BufReader},
-    sync::mpsc,
-};
-use tracing::info;
+use swarm::SwarmRunner;
+use tokio::sync::mpsc;
 use tracing_subscriber::EnvFilter;
 use zetina_common::{
-    graceful_shutdown::shutdown_signal,
-    hash,
-    job::Job,
+    job_witness::JobWitness,
     network::Network,
     node_account::NodeAccount,
-    process::Process,
     topic::{gossipsub_ident_topic, Topic},
 };
-use zetina_compiler::{
-    cairo_compiler::{tests::models::fixture, CairoCompiler},
-    errors::CompilerControllerError,
-    traits::CompilerController,
-};
-use zetina_peer::swarm::SwarmRunner;

 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -48,67 +37,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     // Generate topic
     let new_job_topic = gossipsub_ident_topic(network, Topic::NewJob);
     let picked_job_topic = gossipsub_ident_topic(network, Topic::PickedJob);
+    let finished_job_topic = gossipsub_ident_topic(network, Topic::FinishedJob);

-    let mut swarm_runner = SwarmRunner::new(
-        node_account.get_keypair(),
-        &[new_job_topic.to_owned(), picked_job_topic],
-    )?;
-
-    let (send_topic_tx, send_topic_rx) = mpsc::channel::<Vec<u8>>(1000);
-    let mut message_stream = swarm_runner.run(new_job_topic, send_topic_rx);
-
-    let compiler = CairoCompiler::new(node_account.get_signing_key());
-
-    let mut compiler_scheduler =
-        FuturesUnordered::<Process<'_, Result<Job, CompilerControllerError>>>::new();
-
-    // Read cairo program path from stdin
-    let mut stdin = BufReader::new(stdin()).lines();
-
-    loop {
-        tokio::select! {
-            Ok(Some(_)) = stdin.next_line() => {
-                // TODO: handle fixture better way
-                let fixture = fixture();
-                compiler_scheduler.push(compiler.run(fixture.program_path.clone(), fixture.program_input_path)?);
-                info!("Scheduled compiling program at path: {:?}", fixture.program_path);
-            },
-            Some(event) = message_stream.next() => {
-                match event {
-                    Event::Message { message, .. } => {
-                        // Received a new-job message from the network
-                        if message.topic == gossipsub_ident_topic(Network::Sepolia, Topic::NewJob).into() {
-                            let job: Job = serde_json::from_slice(&message.data).unwrap();
-                            info!("Received a new job event: {}", hash!(&job));
-                        }
-                        // Received a picked-job message from the network
-                        if message.topic == gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob).into() {
-                            let job: Job = serde_json::from_slice(&message.data).unwrap();
-                            info!("Received picked job event: {}", hash!(&job));
-                        }
-                    },
-                    Event::Subscribed { peer_id, topic } => {
-                        info!("{} subscribed to the topic {}", peer_id.to_string(), topic.to_string());
-                    },
-                    Event::Unsubscribed { peer_id, topic } => {
-                        info!("{} unsubscribed to the topic {}", peer_id.to_string(), topic.to_string());
-                    },
-                    _ => {}
-                }
-            },
-            Some(Ok(job)) = compiler_scheduler.next() => {
-                let serialized_job = serde_json::to_string(&job).unwrap();
-                send_topic_tx.send(serialized_job.into()).await?;
-                info!("Sent a new job: {}", hash!(&job));
-            },
-            _ = shutdown_signal() => {
-                break
-            }
-            else => break
-        }
-    }
+    let (swarm_events_tx, swarm_events_rx) = mpsc::channel::<gossipsub::Event>(100);
+    let (job_witness_tx, job_witness_rx) = mpsc::channel::<JobWitness>(100);
+
+    let (new_job_topic_tx, new_job_topic_rx) = mpsc::channel::<Vec<u8>>(100);
+
+    SwarmRunner::new(
+        node_account.get_keypair(),
+        vec![new_job_topic.to_owned(), picked_job_topic, finished_job_topic],
+        vec![(new_job_topic.to_owned(), new_job_topic_rx)],
+        swarm_events_tx,
+    )?;
+
+    Delegator::new(
+        node_account.get_signing_key(),
+        new_job_topic_tx,
+        job_witness_tx,
+        swarm_events_rx,
+    );

     Ok(())
 }