From a2908c09bada4b1561854db49633dc43d5958851 Mon Sep 17 00:00:00 2001 From: Bartosz Nowak Date: Thu, 18 Apr 2024 20:23:38 +0200 Subject: [PATCH] runner dev & prover refactor --- Cargo.toml | 7 +- crates/common/Cargo.toml | 2 + crates/common/src/hash_macro.rs | 8 ++ crates/common/src/job.rs | 29 ++++-- crates/common/src/job_trace.rs | 33 +++++++ crates/common/src/job_witness.rs | 8 +- crates/common/src/lib.rs | 2 + crates/delegator/Cargo.toml | 1 + crates/executor/Cargo.toml | 3 +- crates/executor/src/main.rs | 2 +- crates/prover/src/lib.rs | 3 +- crates/prover/src/stone_prover/mod.rs | 84 ++++++++++-------- crates/prover/src/traits.rs | 7 +- crates/runner/Cargo.toml | 23 +++++ crates/runner/src/cairo_runner/mod.rs | 121 ++++++++++++++++++++++++++ crates/runner/src/errors.rs | 19 ++++ crates/runner/src/lib.rs | 5 ++ crates/runner/src/traits.rs | 13 +++ crates/runner/src/types/input.rs | 30 +++++++ crates/runner/src/types/layout.rs | 7 ++ crates/runner/src/types/mod.rs | 2 + 21 files changed, 351 insertions(+), 58 deletions(-) create mode 100644 crates/common/src/hash_macro.rs create mode 100644 crates/common/src/job_trace.rs create mode 100644 crates/runner/Cargo.toml create mode 100644 crates/runner/src/cairo_runner/mod.rs create mode 100644 crates/runner/src/errors.rs create mode 100644 crates/runner/src/lib.rs create mode 100644 crates/runner/src/traits.rs create mode 100644 crates/runner/src/types/input.rs create mode 100644 crates/runner/src/types/layout.rs create mode 100644 crates/runner/src/types/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 23d6cb4..1545272 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ "crates/delegator", "crates/executor", "crates/peer", - "crates/prover", + "crates/prover", "crates/runner", ] exclude = [] @@ -28,17 +28,20 @@ futures-core = "0.3.30" futures-util = "0.3.30" hex = "0.4.3" itertools = "0.12.1" -libp2p = { version = "0.53.2", features = ["tokio","gossipsub","kad","mdns","noise","macros","tcp","yamux","quic"]} +libp2p = { version = "0.53.2", features = ["secp256k1", "tokio","gossipsub","kad","mdns","noise","macros","tcp","yamux","quic"]} +libsecp256k1 = "0.7.1" num-bigint = "0.4.4" serde = "1.0.197" serde_json = "1.0.115" starknet = "0.9.0" +strum = { version = "0.26", features = ["derive"] } tempfile = "3.10.1" thiserror = "1.0.58" tokio = { version = "1.36", features = ["full"] } tokio-util = "0.7.10" tracing = "0.1.37" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +zip-extensions = "0.6.2" sharp-p2p-common = { path = "crates/common" } sharp-p2p-delegator = { path = "crates/delegator" } diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 6d964e3..6eeebdc 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -11,7 +11,9 @@ license-file.workspace = true cairo-felt.workspace = true hex.workspace = true libp2p.workspace = true +libsecp256k1.workspace = true num-bigint.workspace = true serde_json.workspace = true serde.workspace = true +tempfile.workspace = true thiserror.workspace = true \ No newline at end of file diff --git a/crates/common/src/hash_macro.rs b/crates/common/src/hash_macro.rs new file mode 100644 index 0000000..32dc4dd --- /dev/null +++ b/crates/common/src/hash_macro.rs @@ -0,0 +1,8 @@ +#[macro_export] +macro_rules! hash { + ($value:expr) => {{ + let mut hasher = DefaultHasher::new(); + $value.hash(&mut hasher); + hasher.finish() + }}; +} diff --git a/crates/common/src/job.rs b/crates/common/src/job.rs index 90c6c99..3f6e61b 100644 --- a/crates/common/src/job.rs +++ b/crates/common/src/job.rs @@ -1,22 +1,35 @@ +use libsecp256k1::{PublicKey, Signature}; use std::{ fmt::Display, hash::{DefaultHasher, Hash, Hasher}, }; -#[derive(Debug, PartialEq, Eq, Hash, Clone)] +use crate::hash; + +#[derive(Debug, PartialEq, Eq, Clone)] pub struct Job { pub reward: u32, pub num_of_steps: u32, - pub private_input: Vec, - pub public_input: Vec, - pub cpu_air_prover_config: Vec, - pub cpu_air_params: Vec, + pub cairo_pie: Vec, + pub public_key: PublicKey, + pub signature: Signature, + // below fields not bounded by signature + pub cpu_air_params: Vec, // needed for proving + pub cpu_air_prover_config: Vec, // needed for proving +} + +impl Hash for Job { + fn hash(&self, state: &mut H) { + self.reward.hash(state); + self.num_of_steps.hash(state); + self.cairo_pie.hash(state); + self.cpu_air_prover_config.hash(state); + self.cpu_air_params.hash(state); + } } impl Display for Job { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut hasher = DefaultHasher::new(); - self.hash(&mut hasher); - write!(f, "{}", hex::encode(hasher.finish().to_be_bytes())) + write!(f, "{}", hex::encode(hash!(self).to_be_bytes())) } } diff --git a/crates/common/src/job_trace.rs b/crates/common/src/job_trace.rs new file mode 100644 index 0000000..bd7c627 --- /dev/null +++ b/crates/common/src/job_trace.rs @@ -0,0 +1,33 @@ +use crate::hash; +use std::{ + fmt::Display, + hash::{DefaultHasher, Hash, Hasher}, +}; +use tempfile::NamedTempFile; + +#[derive(Debug)] +pub struct JobTrace { + pub air_public_input: NamedTempFile, + pub air_private_input: NamedTempFile, + pub memory: NamedTempFile, // this is not used directly but needs to live for air_private_input to be valid + pub trace: NamedTempFile, // this is not used directly but needs to live for air_private_input to be valid + pub cpu_air_prover_config: NamedTempFile, + pub cpu_air_params: NamedTempFile, +} + +impl Hash for JobTrace { + fn hash(&self, state: &mut H) { + self.air_public_input.path().hash(state); + self.air_private_input.path().hash(state); + self.memory.path().hash(state); + self.trace.path().hash(state); + self.cpu_air_prover_config.path().hash(state); + self.cpu_air_params.path().hash(state); + } +} + +impl Display for JobTrace { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex::encode(hash!(self).to_be_bytes())) + } +} diff --git a/crates/common/src/job_witness.rs b/crates/common/src/job_witness.rs index de95469..a038193 100644 --- a/crates/common/src/job_witness.rs +++ b/crates/common/src/job_witness.rs @@ -1,10 +1,10 @@ +use crate::hash; +use cairo_felt::Felt252; use std::{ fmt::Display, hash::{DefaultHasher, Hash, Hasher}, }; -use cairo_felt::Felt252; - #[derive(Debug, PartialEq, Eq, Hash, Clone)] pub struct JobWitness { pub data: Vec, @@ -12,8 +12,6 @@ pub struct JobWitness { impl Display for JobWitness { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut hasher = DefaultHasher::new(); - self.hash(&mut hasher); - write!(f, "{}", hex::encode(hasher.finish().to_be_bytes())) + write!(f, "{}", hex::encode(hash!(self).to_be_bytes())) } } diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index e83a2cf..d40106a 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -1,4 +1,6 @@ +pub mod hash_macro; pub mod job; +pub mod job_trace; pub mod job_witness; pub mod network; pub mod topic; diff --git a/crates/delegator/Cargo.toml b/crates/delegator/Cargo.toml index 889f138..69504f7 100644 --- a/crates/delegator/Cargo.toml +++ b/crates/delegator/Cargo.toml @@ -10,6 +10,7 @@ license-file.workspace = true [dependencies] futures-util.workspace = true libp2p.workspace = true +libsecp256k1.workspace = true sharp-p2p-common.workspace = true sharp-p2p-peer.workspace = true tokio.workspace = true diff --git a/crates/executor/Cargo.toml b/crates/executor/Cargo.toml index 152c39a..c99cdd3 100644 --- a/crates/executor/Cargo.toml +++ b/crates/executor/Cargo.toml @@ -10,8 +10,9 @@ license-file.workspace = true [dependencies] futures-util.workspace = true libp2p.workspace = true +libsecp256k1.workspace = true sharp-p2p-common.workspace = true sharp-p2p-peer.workspace = true tokio.workspace = true tracing-subscriber.workspace = true -tracing.workspace = true +tracing.workspace = true \ No newline at end of file diff --git a/crates/executor/src/main.rs b/crates/executor/src/main.rs index 8660dc8..422d2c6 100644 --- a/crates/executor/src/main.rs +++ b/crates/executor/src/main.rs @@ -14,7 +14,7 @@ async fn main() -> Result<(), Box> { let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init(); // 1. Generate keypair for the node - let p2p_local_keypair = libp2p::identity::Keypair::generate_ed25519(); + let p2p_local_keypair = libp2p::identity::Keypair::generate_secp256k1(); // 2. Generate topic let new_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::NewJob); diff --git a/crates/prover/src/lib.rs b/crates/prover/src/lib.rs index b92842a..20652e8 100644 --- a/crates/prover/src/lib.rs +++ b/crates/prover/src/lib.rs @@ -1,5 +1,4 @@ pub mod errors; +pub mod stone_prover; #[allow(async_fn_in_trait)] pub mod traits; - -pub mod stone_prover; diff --git a/crates/prover/src/stone_prover/mod.rs b/crates/prover/src/stone_prover/mod.rs index b750727..efad759 100644 --- a/crates/prover/src/stone_prover/mod.rs +++ b/crates/prover/src/stone_prover/mod.rs @@ -1,18 +1,20 @@ -use std::{collections::HashMap, io::Read}; - use crate::{ errors::ProverControllerError, traits::{Prover, ProverController}, }; use itertools::{chain, Itertools}; -use sharp_p2p_common::{job::Job, job_witness::JobWitness, vec252::VecFelt252}; -use std::io::Write; +use sharp_p2p_common::{hash, job_trace::JobTrace, job_witness::JobWitness, vec252::VecFelt252}; +use std::{ + collections::HashMap, + hash::{DefaultHasher, Hash, Hasher}, + io::Read, +}; use tempfile::NamedTempFile; use tokio::process::{Child, Command}; use tracing::{debug, trace}; pub struct StoneProver { - tasks: HashMap, + tasks: HashMap, } impl Prover for StoneProver { @@ -22,47 +24,52 @@ impl Prover for StoneProver { } impl ProverController for StoneProver { - async fn prove(&mut self, job: Job) -> Result { + async fn prove(&mut self, job_trace: JobTrace) -> Result { let mut out_file = NamedTempFile::new()?; - let mut private_input_file = NamedTempFile::new()?; - let mut public_input_file = NamedTempFile::new()?; - let mut prover_config_file = NamedTempFile::new()?; - let mut parameter_file = NamedTempFile::new()?; - - private_input_file.write_all(&job.private_input)?; - public_input_file.write_all(&job.public_input)?; - prover_config_file.write_all(&job.cpu_air_prover_config)?; - parameter_file.write_all(&job.cpu_air_params)?; - trace!("task {} environment prepared", job); let task = Command::new("cpu_air_prover") - .args(["out_file", out_file.path().to_string_lossy().as_ref()]) - .args(["private_input_file", private_input_file.path().to_string_lossy().as_ref()]) - .args(["public_input_file", public_input_file.path().to_string_lossy().as_ref()]) - .args(["prover_config_file", prover_config_file.path().to_string_lossy().as_ref()]) - .args(["parameter_file", parameter_file.path().to_string_lossy().as_ref()]) + .args(["--out_file", out_file.path().to_string_lossy().as_ref()]) + .args([ + "--air_private_input", + job_trace.air_private_input.path().to_string_lossy().as_ref(), + ]) + .args([ + "--air_public_input", + job_trace.air_public_input.path().to_string_lossy().as_ref(), + ]) + .args([ + "--cpu_air_prover_config", + job_trace.cpu_air_prover_config.path().to_string_lossy().as_ref(), + ]) + .args(["--cpu_air_params", job_trace.cpu_air_params.path().to_string_lossy().as_ref()]) .arg("--generate_annotations") .spawn()?; - debug!("task {} spawned", job); - self.tasks.insert(job.to_owned(), task); + let job_trace_hash = hash!(job_trace); + + debug!("task {} spawned", job_trace_hash); + self.tasks.insert(job_trace_hash.to_owned(), task); - let task_status = - self.tasks.get_mut(&job).ok_or(ProverControllerError::TaskNotFound)?.wait().await?; + let task_status = self + .tasks + .get_mut(&job_trace_hash) + .ok_or(ProverControllerError::TaskNotFound)? + .wait() + .await?; - trace!("task {} woke up", job); + trace!("task {} woke up", job_trace_hash); if !task_status.success() { - debug!("task terminated {}", job); + debug!("task terminated {}", job_trace_hash); return Err(ProverControllerError::TaskTerminated); } let task_output = self .tasks - .remove(&job) + .remove(&job_trace_hash) .ok_or(ProverControllerError::TaskNotFound)? .wait_with_output() .await?; - trace!("task {} output {:?}", job, task_output); + trace!("task {} output {:?}", job_trace_hash, task_output); let mut input = String::new(); out_file.read_to_string(&mut input)?; @@ -88,16 +95,23 @@ impl ProverController for StoneProver { Ok(JobWitness { data }) } - async fn terminate(&mut self, job: &Job) -> Result<(), ProverControllerError> { - self.tasks.get_mut(job).ok_or(ProverControllerError::TaskNotFound)?.start_kill()?; - trace!("task scheduled for termination {}", job); + async fn terminate(&mut self, job_trace_hash: u64) -> Result<(), ProverControllerError> { + self.tasks + .get_mut(&job_trace_hash) + .ok_or(ProverControllerError::TaskNotFound)? + .start_kill()?; + trace!("task scheduled for termination {}", job_trace_hash); Ok(()) } async fn drop(mut self) -> Result<(), ProverControllerError> { - let keys: Vec = self.tasks.keys().cloned().collect(); - for job in keys.iter() { - self.terminate(job).await?; + let keys: Vec = self.tasks.keys().cloned().collect(); + for job_trace_hash in keys.iter() { + self.tasks + .get_mut(job_trace_hash) + .ok_or(ProverControllerError::TaskNotFound)? + .start_kill()?; + trace!("task scheduled for termination {}", job_trace_hash); } Ok(()) } diff --git a/crates/prover/src/traits.rs b/crates/prover/src/traits.rs index 9d75910..2d78f22 100644 --- a/crates/prover/src/traits.rs +++ b/crates/prover/src/traits.rs @@ -1,13 +1,12 @@ -use sharp_p2p_common::{job::Job, job_witness::JobWitness}; - use crate::errors::ProverControllerError; +use sharp_p2p_common::{job_trace::JobTrace, job_witness::JobWitness}; pub trait Prover { fn init() -> impl ProverController; } pub trait ProverController { - async fn prove(&mut self, job: Job) -> Result; - async fn terminate(&mut self, job: &Job) -> Result<(), ProverControllerError>; + async fn prove(&mut self, job_trace: JobTrace) -> Result; + async fn terminate(&mut self, job_trace_hash: u64) -> Result<(), ProverControllerError>; async fn drop(self) -> Result<(), ProverControllerError>; } diff --git a/crates/runner/Cargo.toml b/crates/runner/Cargo.toml new file mode 100644 index 0000000..17b2e24 --- /dev/null +++ b/crates/runner/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "sharp-p2p-runner" +version.workspace = true +edition.workspace = true +repository.workspace = true +license-file.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-process.workspace = true +cairo-proof-parser.workspace = true +futures.workspace= true +itertools.workspace = true +serde_json.workspace = true +serde.workspace = true +sharp-p2p-common.workspace = true +strum.workspace = true +tempfile.workspace = true +thiserror.workspace = true +tokio.workspace = true +tracing.workspace = true +zip-extensions.workspace = true \ No newline at end of file diff --git a/crates/runner/src/cairo_runner/mod.rs b/crates/runner/src/cairo_runner/mod.rs new file mode 100644 index 0000000..0702b1d --- /dev/null +++ b/crates/runner/src/cairo_runner/mod.rs @@ -0,0 +1,121 @@ +use crate::{ + errors::RunnerControllerError, + traits::{Runner, RunnerController}, + types::{ + input::{BootloaderInput, Task}, + layout::Layout, + }, +}; +use sharp_p2p_common::{hash, job::Job, job_trace::JobTrace}; +use std::io::Write; +use std::{ + collections::HashMap, + hash::{DefaultHasher, Hash, Hasher}, +}; +use tempfile::NamedTempFile; +use tokio::process::{Child, Command}; +use tracing::{debug, trace}; + +pub struct CairoRunner { + tasks: HashMap, +} + +impl Runner for CairoRunner { + fn init() -> impl RunnerController { + Self { tasks: HashMap::new() } + } +} + +impl RunnerController for CairoRunner { + async fn run(&mut self, job: Job) -> Result { + let program = NamedTempFile::new()?; + let layout: &str = Layout::Recursive.into(); + + let mut cairo_pie = NamedTempFile::new()?; + cairo_pie.write_all(&job.cairo_pie)?; + + let input = BootloaderInput { + tasks: vec![Task { path: cairo_pie.path().to_path_buf(), ..Default::default() }], + ..Default::default() + }; + + let mut program_input = NamedTempFile::new()?; + program_input.write_all(&serde_json::to_string(&input)?.into_bytes())?; + + // outputs + let air_public_input = NamedTempFile::new()?; + let air_private_input = NamedTempFile::new()?; + let trace = NamedTempFile::new()?; + let memory = NamedTempFile::new()?; + + let task = Command::new("cairo-run") + .args(["--program", program.path().to_string_lossy().as_ref()]) + .args(["--layout", layout]) + .args(["--program_input", program_input.path().to_string_lossy().as_ref()]) + .args(["--air_public_input", air_public_input.path().to_string_lossy().as_ref()]) + .args(["--air_private_input", air_private_input.path().to_string_lossy().as_ref()]) + .args(["--trace_file", trace.path().to_string_lossy().as_ref()]) + .args(["--memory_file", memory.path().to_string_lossy().as_ref()]) + .arg("--proof_mode") + .arg("--print_output") + .spawn()?; + + let job_hash = hash!(job); + + debug!("task {} spawned", job_hash); + self.tasks.insert(job_hash.to_owned(), task); + + let task_status = self + .tasks + .get_mut(&job_hash) + .ok_or(RunnerControllerError::TaskNotFound)? + .wait() + .await?; + + trace!("task {} woke up", job_hash); + if !task_status.success() { + debug!("task terminated {}", job_hash); + return Err(RunnerControllerError::TaskTerminated); + } + + let task_output = self + .tasks + .remove(&job_hash) + .ok_or(RunnerControllerError::TaskNotFound)? + .wait_with_output() + .await?; + trace!("task {} output {:?}", job_hash, task_output); + + let mut cpu_air_params = NamedTempFile::new()?; + let mut cpu_air_prover_config = NamedTempFile::new()?; + cpu_air_params.write_all(&job.cpu_air_params)?; + cpu_air_prover_config.write_all(&job.cpu_air_prover_config)?; + + Ok(JobTrace { + air_public_input, + air_private_input, + memory, + trace, + cpu_air_prover_config, + cpu_air_params, + }) + } + + async fn terminate(&mut self, job_hash: u64) -> Result<(), RunnerControllerError> { + self.tasks.get_mut(&job_hash).ok_or(RunnerControllerError::TaskNotFound)?.start_kill()?; + trace!("task scheduled for termination {}", job_hash); + Ok(()) + } + + async fn drop(mut self) -> Result<(), RunnerControllerError> { + let keys: Vec = self.tasks.keys().cloned().collect(); + for job_hash in keys.iter() { + self.tasks + .get_mut(job_hash) + .ok_or(RunnerControllerError::TaskNotFound)? + .start_kill()?; + trace!("task scheduled for termination {}", job_hash); + } + Ok(()) + } +} diff --git a/crates/runner/src/errors.rs b/crates/runner/src/errors.rs new file mode 100644 index 0000000..e115f1b --- /dev/null +++ b/crates/runner/src/errors.rs @@ -0,0 +1,19 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum RunnerControllerError { + #[error("task not found")] + TaskNotFound, + + #[error("task not found")] + TaskTerminated, + + #[error("io")] + Io(#[from] std::io::Error), + + #[error("serde")] + Serde(#[from] serde_json::Error), + + #[error("proof parsing error")] + ProofParseError(String), +} diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs new file mode 100644 index 0000000..26e9b70 --- /dev/null +++ b/crates/runner/src/lib.rs @@ -0,0 +1,5 @@ +pub mod cairo_runner; +pub mod errors; +#[allow(async_fn_in_trait)] +pub mod traits; +pub mod types; diff --git a/crates/runner/src/traits.rs b/crates/runner/src/traits.rs new file mode 100644 index 0000000..fe748e4 --- /dev/null +++ b/crates/runner/src/traits.rs @@ -0,0 +1,13 @@ +use sharp_p2p_common::{job::Job, job_trace::JobTrace}; + +use crate::errors::RunnerControllerError; + +pub trait Runner { + fn init() -> impl RunnerController; +} + +pub trait RunnerController { + async fn run(&mut self, job: Job) -> Result; + async fn terminate(&mut self, job_hash: u64) -> Result<(), RunnerControllerError>; + async fn drop(self) -> Result<(), RunnerControllerError>; +} diff --git a/crates/runner/src/types/input.rs b/crates/runner/src/types/input.rs new file mode 100644 index 0000000..af01ee3 --- /dev/null +++ b/crates/runner/src/types/input.rs @@ -0,0 +1,30 @@ +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; + +#[derive(Serialize, Deserialize)] +pub struct Task { + #[serde(rename = "type")] + pub type_: String, + pub path: PathBuf, + pub use_poseidon: bool, +} + +#[derive(Serialize, Deserialize)] +pub struct BootloaderInput { + pub tasks: Vec, + pub single_page: bool, +} + +impl Default for Task { + fn default() -> Self { + Self { type_: "CairoPiePath".to_string(), path: PathBuf::default(), use_poseidon: false } + } +} + +impl Default for BootloaderInput { + fn default() -> Self { + Self { tasks: Vec::default(), single_page: true } + } +} + +pub fn write_cairo_pie_zip() {} diff --git a/crates/runner/src/types/layout.rs b/crates/runner/src/types/layout.rs new file mode 100644 index 0000000..d7edd3f --- /dev/null +++ b/crates/runner/src/types/layout.rs @@ -0,0 +1,7 @@ +use strum::IntoStaticStr; + +#[derive(Debug, PartialEq, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] +pub enum Layout { + Recursive, +} diff --git a/crates/runner/src/types/mod.rs b/crates/runner/src/types/mod.rs new file mode 100644 index 0000000..1d2c2cd --- /dev/null +++ b/crates/runner/src/types/mod.rs @@ -0,0 +1,2 @@ +pub mod input; +pub mod layout;