Skip to content

Commit

Permalink
JobBidQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
Okm165 committed Aug 7, 2024
1 parent c2c98a4 commit ab981ba
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 66 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ libp2p = { version = "0.53.2", features = [
"tcp",
"yamux",
"quic",
"serde",
] }
num-bigint = "0.4.4"
proptest = "1.4.0"
Expand Down
8 changes: 8 additions & 0 deletions crates/common/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::hash;
use cairo_vm::vm::runners::cairo_pie::CairoPie;
use libp2p::PeerId;
use serde::{Deserialize, Serialize};
use starknet::signers::{SigningKey, VerifyingKey};
use starknet_crypto::{poseidon_hash_many, FieldElement, Signature};
Expand Down Expand Up @@ -106,6 +107,13 @@ pub struct JobBid {
pub price: u64,
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct JobDelegation {
pub identity: PeerId,
pub job: Job,
pub price: u64,
}

mod chunk_felt_array {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use starknet_crypto::FieldElement;
Expand Down
106 changes: 106 additions & 0 deletions crates/delegator/src/delegator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use futures::executor::block_on;
use futures::Stream;
use libp2p::gossipsub;
use starknet::signers::SigningKey;
use std::pin::Pin;
use thiserror::Error;
use tokio::sync::mpsc::{self, Receiver};
use tokio::{sync::mpsc::Sender, task::JoinHandle};
use tokio_stream::StreamExt;
use tracing::{error, info};
use zetina_common::graceful_shutdown::shutdown_signal;
use zetina_common::job::{Job, JobData, JobDelegation};
use zetina_peer::swarm::{
DelegationMessage, GossipsubMessage, MarketMessage, PeerBehaviourEvent, Topic,
};

use crate::job_bid_queue::JobBidQueue;

pub struct Delegator {
handle: Option<JoinHandle<Result<(), Error>>>,
}

impl Delegator {
pub fn new(
mut swarm_events: Pin<Box<dyn Stream<Item = PeerBehaviourEvent> + Send>>,
gossipsub_tx: Sender<GossipsubMessage>,
mut delegate_rx: Receiver<JobData>,
signing_key: SigningKey,
) -> Self {
Self {
handle: Some(tokio::spawn(async move {
let mut job_bid_queue = JobBidQueue::new();
loop {
tokio::select! {
Some(job_data) = delegate_rx.recv() => {
let job = Job::try_from_job_data(job_data, &signing_key);
gossipsub_tx.send(GossipsubMessage {
topic: Topic::Market.into(),
data: serde_json::to_vec(&MarketMessage::Job(job.to_owned()))?
}).await?;
job_bid_queue.insert_job(job);
},
Some(event) = swarm_events.next() => {
match event {
PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { message, propagation_source, .. }) => {
if message.topic == Topic::Market.into() {
match serde_json::from_slice::<MarketMessage>(&message.data)? {
MarketMessage::JobBid(job_bid) => {
job_bid_queue.insert_bid(job_bid.to_owned(), propagation_source);
if let Some((job, identity, price)) = job_bid_queue.get_best(job_bid.job_hash) {
gossipsub_tx.send(GossipsubMessage {
topic: Topic::Delegation.into(),
data: serde_json::to_vec(&DelegationMessage::Delegate(JobDelegation{
identity,
job,
price
}))?
}).await?;
}
}
_ => {}
}
}
if message.topic == Topic::Delegation.into() {
match serde_json::from_slice::<DelegationMessage>(&message.data)? {
DelegationMessage::Finished(job_witness) => {
info!("Received finished job: {}", job_witness.job_hash);
todo!()
}
_ => {}
}
}
}
_ => {}
}
}
_ = shutdown_signal() => {
break
}
else => break
};
}
Ok(())
})),
}
}
}

impl Drop for Delegator {
fn drop(&mut self) {
let handle = self.handle.take();
block_on(async move { handle.unwrap().await.unwrap().unwrap() });
}
}

#[derive(Error, Debug)]
pub enum Error {
#[error("mpsc_send_error")]
MpscSendError(#[from] mpsc::error::SendError<GossipsubMessage>),

#[error("io")]
Io(#[from] std::io::Error),

#[error("serde")]
Serde(#[from] serde_json::Error),
}
41 changes: 41 additions & 0 deletions crates/delegator/src/job_bid_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use libp2p::PeerId;
use std::collections::HashMap;
use std::hash::{DefaultHasher, Hash, Hasher};
use zetina_common::{
hash,
job::{Job, JobBid},
};

pub struct JobBidQueue {
map: HashMap<u64, (Job, HashMap<PeerId, u64>)>,
}

impl Default for JobBidQueue {
fn default() -> Self {
Self::new()
}
}

impl JobBidQueue {
pub fn new() -> Self {
Self { map: HashMap::new() }
}

pub fn insert_job(&mut self, job: Job) -> Option<(Job, HashMap<PeerId, u64>)> {
self.map.insert(hash!(job), (job, HashMap::new()))
}

pub fn get_best(&self, job_hash: u64) -> Option<(Job, PeerId, u64)> {
self.map.get(&job_hash).and_then(|(job, bids)| {
bids.iter()
.max_by_key(|&(_, price)| price)
.map(|(peer_id, &price)| (job.clone(), *peer_id, price))
})
}

pub fn insert_bid(&mut self, job_bid: JobBid, identity: PeerId) {
if let Some((_, map)) = self.map.get_mut(&job_bid.job_hash) {
map.insert(identity, job_bid.price);
}
}
}
24 changes: 14 additions & 10 deletions crates/delegator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
pub mod delegator;
pub mod job_bid_queue;

use axum::Router;
use clap::Parser;
use delegator::Delegator;
use libp2p::Multiaddr;
use starknet::{core::types::FieldElement, signers::SigningKey};
use std::{str::FromStr, time::Duration};
use tokio::{net::TcpListener, sync::mpsc};
use tokio_stream::StreamExt;
use tower_http::{timeout::TimeoutLayer, trace::TraceLayer};
use tracing::debug;
use tracing_subscriber::EnvFilter;
use zetina_common::graceful_shutdown::shutdown_signal;
use zetina_common::{graceful_shutdown::shutdown_signal, job::JobData};
use zetina_peer::swarm::{GossipsubMessage, SwarmRunner};

#[derive(Parser)]
Expand Down Expand Up @@ -36,6 +39,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let p2p_keypair =
libp2p::identity::Keypair::from(libp2p::identity::ecdsa::Keypair::from(secret_key));

let signing_key = SigningKey::from_secret_scalar(
FieldElement::from_byte_slice_be(private_key.as_slice()).unwrap(),
);

let mut swarm_runner =
SwarmRunner::new(p2p_keypair, Multiaddr::from_str(&cli.address).unwrap())?;

Expand All @@ -44,14 +51,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.try_for_each(|addr| swarm_runner.swarm.dial(Multiaddr::from_str(&addr).unwrap()))
.unwrap();

let (_gossipsub_tx, gossipsub_rx) = mpsc::channel::<GossipsubMessage>(100);
let mut swarm_events = swarm_runner.run(gossipsub_rx);
let (gossipsub_tx, gossipsub_rx) = mpsc::channel::<GossipsubMessage>(100);
let (delegate_tx, delegate_rx) = mpsc::channel::<JobData>(100);
let swarm_events = swarm_runner.run(gossipsub_rx);

tokio::spawn(async move {
while let Some(_event) = swarm_events.next().await {
debug!("");
}
});
Delegator::new(swarm_events, gossipsub_tx, delegate_rx, signing_key);

// Create a `TcpListener` using tokio.
let listener = TcpListener::bind("0.0.0.0:3010").await.unwrap();
Expand Down
127 changes: 127 additions & 0 deletions crates/executor/src/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use futures::executor::block_on;
use futures::{stream::FuturesUnordered, Stream};
use libp2p::{gossipsub, PeerId};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::pin::Pin;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio::{sync::mpsc::Sender, task::JoinHandle};
use tokio_stream::StreamExt;
use tracing::{error, info};
use zetina_common::{
graceful_shutdown::shutdown_signal, hash, job::JobBid, job_trace::JobTrace,
job_witness::JobWitness, process::Process,
};
use zetina_peer::swarm::{
DelegationMessage, GossipsubMessage, MarketMessage, PeerBehaviourEvent, Topic,
};
use zetina_prover::{
errors::ProverControllerError, stone_prover::StoneProver, traits::ProverController,
};
use zetina_runner::{
cairo_runner::CairoRunner, errors::RunnerControllerError, traits::RunnerController,
};

pub struct Executor {
handle: Option<JoinHandle<Result<(), Error>>>,
}

impl Executor {
pub fn new(
identity: PeerId,
mut swarm_events: Pin<Box<dyn Stream<Item = PeerBehaviourEvent> + Send>>,
gossipsub_tx: Sender<GossipsubMessage>,
runner: CairoRunner,
prover: StoneProver,
) -> Self {
Self {
handle: Some(tokio::spawn(async move {
let mut runner_scheduler =
FuturesUnordered::<Process<'_, Result<JobTrace, RunnerControllerError>>>::new();
let mut prover_scheduler = FuturesUnordered::<
Process<'_, Result<JobWitness, ProverControllerError>>,
>::new();

loop {
tokio::select! {
Some(event) = swarm_events.next() => {
match event {
PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { message, .. }) => {
if message.topic == Topic::Market.into() {
match serde_json::from_slice::<MarketMessage>(&message.data)? {
MarketMessage::Job(job) => {
gossipsub_tx
.send(GossipsubMessage {
topic: Topic::Market.into(),
data: serde_json::to_vec(&MarketMessage::JobBid(JobBid {
job_hash: hash!(job),
price: (runner_scheduler.len() * prover_scheduler.len()) as u64,
}))?
})
.await?
}
_ => {}
}
}
if message.topic == Topic::Delegation.into() {
match serde_json::from_slice::<DelegationMessage>(&message.data)? {
DelegationMessage::Delegate(job_delegation) => {
if job_delegation.identity == identity {
info!("Scheduled running of job: {}", hash!(job_delegation.job));
runner_scheduler.push(runner.run(job_delegation.job)?);
}
}
_ => {}
}
}
}
_ => {}
}
}
Some(Ok(job_trace)) = runner_scheduler.next() => {
info!("Scheduled proving of job_trace: {}", &job_trace.job_hash);
prover_scheduler.push(prover.run(job_trace)?);
},
Some(Ok(job_witness)) = prover_scheduler.next() => {
info!("Finished proving: {}", &job_witness.job_hash);
gossipsub_tx.send(GossipsubMessage {
topic: Topic::Delegation.into(),
data: serde_json::to_vec(&DelegationMessage::Finished(job_witness))?
}).await?;
},
_ = shutdown_signal() => {
break
}
else => break
};
}
Ok(())
})),
}
}
}

impl Drop for Executor {
fn drop(&mut self) {
let handle = self.handle.take();
block_on(async move { handle.unwrap().await.unwrap().unwrap() });
}
}

#[derive(Error, Debug)]
pub enum Error {
#[error("prover_controller_error")]
ProverControllerError(#[from] ProverControllerError),

#[error("runner_controller_error")]
RunnerControllerError(#[from] RunnerControllerError),

#[error("mpsc_send_error")]
MpscSendError(#[from] mpsc::error::SendError<GossipsubMessage>),

#[error("io")]
Io(#[from] std::io::Error),

#[error("serde")]
Serde(#[from] serde_json::Error),
}
Loading

0 comments on commit ab981ba

Please sign in to comment.