Skip to content

Commit

Permalink
runner dev & prover refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Okm165 committed Apr 18, 2024
1 parent 69a2e4b commit a2908c0
Show file tree
Hide file tree
Showing 21 changed files with 351 additions and 58 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ members = [
"crates/delegator",
"crates/executor",
"crates/peer",
"crates/prover",
"crates/prover", "crates/runner",
]
exclude = []

Expand All @@ -28,17 +28,20 @@ futures-core = "0.3.30"
futures-util = "0.3.30"
hex = "0.4.3"
itertools = "0.12.1"
libp2p = { version = "0.53.2", features = ["tokio","gossipsub","kad","mdns","noise","macros","tcp","yamux","quic"]}
libp2p = { version = "0.53.2", features = ["secp256k1", "tokio","gossipsub","kad","mdns","noise","macros","tcp","yamux","quic"]}
libsecp256k1 = "0.7.1"
num-bigint = "0.4.4"
serde = "1.0.197"
serde_json = "1.0.115"
starknet = "0.9.0"
strum = { version = "0.26", features = ["derive"] }
tempfile = "3.10.1"
thiserror = "1.0.58"
tokio = { version = "1.36", features = ["full"] }
tokio-util = "0.7.10"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
zip-extensions = "0.6.2"

sharp-p2p-common = { path = "crates/common" }
sharp-p2p-delegator = { path = "crates/delegator" }
Expand Down
2 changes: 2 additions & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ license-file.workspace = true
cairo-felt.workspace = true
hex.workspace = true
libp2p.workspace = true
libsecp256k1.workspace = true
num-bigint.workspace = true
serde_json.workspace = true
serde.workspace = true
tempfile.workspace = true
thiserror.workspace = true
8 changes: 8 additions & 0 deletions crates/common/src/hash_macro.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#[macro_export]
macro_rules! hash {
($value:expr) => {{
let mut hasher = DefaultHasher::new();
$value.hash(&mut hasher);
hasher.finish()
}};
}
29 changes: 21 additions & 8 deletions crates/common/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,35 @@
use libsecp256k1::{PublicKey, Signature};
use std::{
fmt::Display,
hash::{DefaultHasher, Hash, Hasher},
};

#[derive(Debug, PartialEq, Eq, Hash, Clone)]
use crate::hash;

#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Job {
pub reward: u32,
pub num_of_steps: u32,
pub private_input: Vec<u8>,
pub public_input: Vec<u8>,
pub cpu_air_prover_config: Vec<u8>,
pub cpu_air_params: Vec<u8>,
pub cairo_pie: Vec<u8>,
pub public_key: PublicKey,
pub signature: Signature,
// below fields not bounded by signature
pub cpu_air_params: Vec<u8>, // needed for proving
pub cpu_air_prover_config: Vec<u8>, // needed for proving
}

impl Hash for Job {
fn hash<H: Hasher>(&self, state: &mut H) {
self.reward.hash(state);
self.num_of_steps.hash(state);
self.cairo_pie.hash(state);
self.cpu_air_prover_config.hash(state);
self.cpu_air_params.hash(state);
}
}

impl Display for Job {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut hasher = DefaultHasher::new();
self.hash(&mut hasher);
write!(f, "{}", hex::encode(hasher.finish().to_be_bytes()))
write!(f, "{}", hex::encode(hash!(self).to_be_bytes()))
}
}
33 changes: 33 additions & 0 deletions crates/common/src/job_trace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use crate::hash;
use std::{
fmt::Display,
hash::{DefaultHasher, Hash, Hasher},
};
use tempfile::NamedTempFile;

#[derive(Debug)]
pub struct JobTrace {
pub air_public_input: NamedTempFile,
pub air_private_input: NamedTempFile,
pub memory: NamedTempFile, // this is not used directly but needs to live for air_private_input to be valid
pub trace: NamedTempFile, // this is not used directly but needs to live for air_private_input to be valid
pub cpu_air_prover_config: NamedTempFile,
pub cpu_air_params: NamedTempFile,
}

impl Hash for JobTrace {
fn hash<H: Hasher>(&self, state: &mut H) {
self.air_public_input.path().hash(state);
self.air_private_input.path().hash(state);
self.memory.path().hash(state);
self.trace.path().hash(state);
self.cpu_air_prover_config.path().hash(state);
self.cpu_air_params.path().hash(state);
}
}

impl Display for JobTrace {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", hex::encode(hash!(self).to_be_bytes()))
}
}
8 changes: 3 additions & 5 deletions crates/common/src/job_witness.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use crate::hash;
use cairo_felt::Felt252;
use std::{
fmt::Display,
hash::{DefaultHasher, Hash, Hasher},
};

use cairo_felt::Felt252;

#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct JobWitness {
pub data: Vec<Felt252>,
}

impl Display for JobWitness {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut hasher = DefaultHasher::new();
self.hash(&mut hasher);
write!(f, "{}", hex::encode(hasher.finish().to_be_bytes()))
write!(f, "{}", hex::encode(hash!(self).to_be_bytes()))
}
}
2 changes: 2 additions & 0 deletions crates/common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub mod hash_macro;
pub mod job;
pub mod job_trace;
pub mod job_witness;
pub mod network;
pub mod topic;
Expand Down
1 change: 1 addition & 0 deletions crates/delegator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ license-file.workspace = true
[dependencies]
futures-util.workspace = true
libp2p.workspace = true
libsecp256k1.workspace = true
sharp-p2p-common.workspace = true
sharp-p2p-peer.workspace = true
tokio.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion crates/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ license-file.workspace = true
[dependencies]
futures-util.workspace = true
libp2p.workspace = true
libsecp256k1.workspace = true
sharp-p2p-common.workspace = true
sharp-p2p-peer.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
tracing.workspace = true
2 changes: 1 addition & 1 deletion crates/executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init();

// 1. Generate keypair for the node
let p2p_local_keypair = libp2p::identity::Keypair::generate_ed25519();
let p2p_local_keypair = libp2p::identity::Keypair::generate_secp256k1();

// 2. Generate topic
let new_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::NewJob);
Expand Down
3 changes: 1 addition & 2 deletions crates/prover/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub mod errors;
pub mod stone_prover;
#[allow(async_fn_in_trait)]
pub mod traits;

pub mod stone_prover;
84 changes: 49 additions & 35 deletions crates/prover/src/stone_prover/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
use std::{collections::HashMap, io::Read};

use crate::{
errors::ProverControllerError,
traits::{Prover, ProverController},
};
use itertools::{chain, Itertools};
use sharp_p2p_common::{job::Job, job_witness::JobWitness, vec252::VecFelt252};
use std::io::Write;
use sharp_p2p_common::{hash, job_trace::JobTrace, job_witness::JobWitness, vec252::VecFelt252};
use std::{
collections::HashMap,
hash::{DefaultHasher, Hash, Hasher},
io::Read,
};
use tempfile::NamedTempFile;
use tokio::process::{Child, Command};
use tracing::{debug, trace};

pub struct StoneProver {
tasks: HashMap<Job, Child>,
tasks: HashMap<u64, Child>,
}

impl Prover for StoneProver {
Expand All @@ -22,47 +24,52 @@ impl Prover for StoneProver {
}

impl ProverController for StoneProver {
async fn prove(&mut self, job: Job) -> Result<JobWitness, ProverControllerError> {
async fn prove(&mut self, job_trace: JobTrace) -> Result<JobWitness, ProverControllerError> {
let mut out_file = NamedTempFile::new()?;
let mut private_input_file = NamedTempFile::new()?;
let mut public_input_file = NamedTempFile::new()?;
let mut prover_config_file = NamedTempFile::new()?;
let mut parameter_file = NamedTempFile::new()?;

private_input_file.write_all(&job.private_input)?;
public_input_file.write_all(&job.public_input)?;
prover_config_file.write_all(&job.cpu_air_prover_config)?;
parameter_file.write_all(&job.cpu_air_params)?;
trace!("task {} environment prepared", job);

let task = Command::new("cpu_air_prover")
.args(["out_file", out_file.path().to_string_lossy().as_ref()])
.args(["private_input_file", private_input_file.path().to_string_lossy().as_ref()])
.args(["public_input_file", public_input_file.path().to_string_lossy().as_ref()])
.args(["prover_config_file", prover_config_file.path().to_string_lossy().as_ref()])
.args(["parameter_file", parameter_file.path().to_string_lossy().as_ref()])
.args(["--out_file", out_file.path().to_string_lossy().as_ref()])
.args([
"--air_private_input",
job_trace.air_private_input.path().to_string_lossy().as_ref(),
])
.args([
"--air_public_input",
job_trace.air_public_input.path().to_string_lossy().as_ref(),
])
.args([
"--cpu_air_prover_config",
job_trace.cpu_air_prover_config.path().to_string_lossy().as_ref(),
])
.args(["--cpu_air_params", job_trace.cpu_air_params.path().to_string_lossy().as_ref()])
.arg("--generate_annotations")
.spawn()?;

debug!("task {} spawned", job);
self.tasks.insert(job.to_owned(), task);
let job_trace_hash = hash!(job_trace);

debug!("task {} spawned", job_trace_hash);
self.tasks.insert(job_trace_hash.to_owned(), task);

let task_status =
self.tasks.get_mut(&job).ok_or(ProverControllerError::TaskNotFound)?.wait().await?;
let task_status = self
.tasks
.get_mut(&job_trace_hash)
.ok_or(ProverControllerError::TaskNotFound)?
.wait()
.await?;

trace!("task {} woke up", job);
trace!("task {} woke up", job_trace_hash);
if !task_status.success() {
debug!("task terminated {}", job);
debug!("task terminated {}", job_trace_hash);
return Err(ProverControllerError::TaskTerminated);
}

let task_output = self
.tasks
.remove(&job)
.remove(&job_trace_hash)
.ok_or(ProverControllerError::TaskNotFound)?
.wait_with_output()
.await?;
trace!("task {} output {:?}", job, task_output);
trace!("task {} output {:?}", job_trace_hash, task_output);

let mut input = String::new();
out_file.read_to_string(&mut input)?;
Expand All @@ -88,16 +95,23 @@ impl ProverController for StoneProver {
Ok(JobWitness { data })
}

async fn terminate(&mut self, job: &Job) -> Result<(), ProverControllerError> {
self.tasks.get_mut(job).ok_or(ProverControllerError::TaskNotFound)?.start_kill()?;
trace!("task scheduled for termination {}", job);
async fn terminate(&mut self, job_trace_hash: u64) -> Result<(), ProverControllerError> {
self.tasks
.get_mut(&job_trace_hash)
.ok_or(ProverControllerError::TaskNotFound)?
.start_kill()?;
trace!("task scheduled for termination {}", job_trace_hash);
Ok(())
}

async fn drop(mut self) -> Result<(), ProverControllerError> {
let keys: Vec<Job> = self.tasks.keys().cloned().collect();
for job in keys.iter() {
self.terminate(job).await?;
let keys: Vec<u64> = self.tasks.keys().cloned().collect();
for job_trace_hash in keys.iter() {
self.tasks
.get_mut(job_trace_hash)
.ok_or(ProverControllerError::TaskNotFound)?
.start_kill()?;
trace!("task scheduled for termination {}", job_trace_hash);
}
Ok(())
}
Expand Down
7 changes: 3 additions & 4 deletions crates/prover/src/traits.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use sharp_p2p_common::{job::Job, job_witness::JobWitness};

use crate::errors::ProverControllerError;
use sharp_p2p_common::{job_trace::JobTrace, job_witness::JobWitness};

pub trait Prover {
fn init() -> impl ProverController;
}

pub trait ProverController {
async fn prove(&mut self, job: Job) -> Result<JobWitness, ProverControllerError>;
async fn terminate(&mut self, job: &Job) -> Result<(), ProverControllerError>;
async fn prove(&mut self, job_trace: JobTrace) -> Result<JobWitness, ProverControllerError>;
async fn terminate(&mut self, job_trace_hash: u64) -> Result<(), ProverControllerError>;
async fn drop(self) -> Result<(), ProverControllerError>;
}
23 changes: 23 additions & 0 deletions crates/runner/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "sharp-p2p-runner"
version.workspace = true
edition.workspace = true
repository.workspace = true
license-file.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-process.workspace = true
cairo-proof-parser.workspace = true
futures.workspace= true
itertools.workspace = true
serde_json.workspace = true
serde.workspace = true
sharp-p2p-common.workspace = true
strum.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
zip-extensions.workspace = true
Loading

0 comments on commit a2908c0

Please sign in to comment.