Merge pull request #27 from iosis-tech/distributed_hash_table
Distributed hash table
Okm165 authored Aug 12, 2024
2 parents df2a6a1 + 6459cdd commit 17dddea
Showing 25 changed files with 709 additions and 138 deletions.
13 changes: 3 additions & 10 deletions crates/common/src/job.rs
@@ -1,6 +1,6 @@
use crate::hash;
use cairo_vm::vm::runners::cairo_pie::CairoPie;
use libp2p::PeerId;
use libp2p::{kad, PeerId};
use serde::{Deserialize, Serialize};
use starknet::signers::{SigningKey, VerifyingKey};
use starknet_crypto::{poseidon_hash_many, FieldElement, Signature};
@@ -101,17 +101,10 @@ impl Display for Job {
}
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobBid {
pub identity: PeerId,
pub job_hash: u64,
pub price: u64,
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct JobDelegation {
pub identity: PeerId,
pub job: Job,
pub job_key: kad::RecordKey,
pub price: u64,
}

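With this change, JobBid identifies work by its Kademlia record key rather than a bare u64 hash. A minimal sketch of how such a key is derived from a job's 64-bit hash, mirroring the kad::RecordKey::new(&hash!(job).to_be_bytes()) pattern used later in this diff; hash_of is a hypothetical stand-in for the crate's hash! macro:

use libp2p::kad;
use std::hash::{DefaultHasher, Hash, Hasher};

// Hypothetical stand-in for zetina_common's hash! macro.
fn hash_of<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let job_data = "example job payload";
    // Big-endian bytes of the u64 hash become the DHT record key.
    let job_key = kad::RecordKey::new(&hash_of(&job_data).to_be_bytes());
    println!("job_key = {}", hex::encode(&job_key));
}
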
35 changes: 32 additions & 3 deletions crates/common/src/job_trace.rs
@@ -1,7 +1,9 @@
use crate::hash;
use libp2p::kad;
use std::{
fmt::Display,
hash::{DefaultHasher, Hash, Hasher},
mem::ManuallyDrop,
};
use tempfile::NamedTempFile;

@@ -13,11 +15,38 @@ use tempfile::NamedTempFile;

#[derive(Debug)]
pub struct JobTrace {
pub job_hash: u64,
pub job_key: kad::RecordKey,
pub air_public_input: NamedTempFile, // Temporary file containing the public input
pub air_private_input: NamedTempFile, // Temporary file containing the private input; memory and trace files must exist for this to be valid
pub memory: NamedTempFile, // Temporary file containing memory data (required for air_private_input validity)
pub trace: NamedTempFile, // Temporary file containing trace data (required for air_private_input validity)
pub memory: ManuallyDrop<NamedTempFile>, // Temporary file containing memory data (required for air_private_input validity)
pub trace: ManuallyDrop<NamedTempFile>, // Temporary file containing trace data (required for air_private_input validity)
}

impl JobTrace {
pub fn new(
job_key: kad::RecordKey,
air_public_input: NamedTempFile,
air_private_input: NamedTempFile,
memory: NamedTempFile,
trace: NamedTempFile,
) -> Self {
Self {
job_key,
air_public_input,
air_private_input,
memory: ManuallyDrop::new(memory),
trace: ManuallyDrop::new(trace),
}
}
}

impl Drop for JobTrace {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.memory);
ManuallyDrop::drop(&mut self.trace);
}
}
}

impl Hash for JobTrace {
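
The ManuallyDrop wrappers change when the memory and trace temp files are deleted: Rust runs a struct's Drop::drop body before dropping the remaining fields in declaration order, so the explicit ManuallyDrop::drop calls release those two files ahead of the automatic drops of air_public_input and air_private_input. A minimal, self-contained sketch of the mechanism (Noisy and Ordered are illustrative types, not part of this crate):

use std::mem::ManuallyDrop;

struct Noisy(&'static str);
impl Drop for Noisy {
    fn drop(&mut self) {
        println!("dropping {}", self.0);
    }
}

struct Ordered {
    first: Noisy,
    late: ManuallyDrop<Noisy>,
}

impl Drop for Ordered {
    fn drop(&mut self) {
        // Drop::drop runs before the automatic field drops, so `late`
        // is destroyed here, ahead of `first` despite declaration order.
        unsafe { ManuallyDrop::drop(&mut self.late) };
    }
}

fn main() {
    let _o = Ordered { first: Noisy("first"), late: ManuallyDrop::new(Noisy("late")) };
    // Prints "dropping late", then "dropping first".
}
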
3 changes: 2 additions & 1 deletion crates/common/src/job_witness.rs
@@ -1,4 +1,5 @@
use crate::hash;
use libp2p::kad;
use serde::{Deserialize, Serialize};
use std::{
fmt::Display,
@@ -14,7 +15,7 @@ use std::{

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct JobWitness {
pub job_hash: u64,
pub job_key: kad::RecordKey,
pub proof: Vec<u8>,
}

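JobWitness round-trips through serde_json when it is stored as a DHT record value and later decoded by the delegator with serde_json::from_slice(&value). A small sketch of that round trip, re-declaring the struct so the snippet stands alone, and assuming libp2p's serde feature is enabled for kad types (which the derives in this crate imply):

use libp2p::kad;
use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct JobWitness {
    pub job_key: kad::RecordKey,
    pub proof: Vec<u8>,
}

fn main() -> Result<(), serde_json::Error> {
    let witness = JobWitness {
        job_key: kad::RecordKey::new(&42u64.to_be_bytes()),
        proof: vec![1, 2, 3],
    };
    // Serialized form is what gets written into the DHT record...
    let bytes = serde_json::to_vec(&witness)?;
    // ...and this is how the delegator reads it back.
    let decoded: JobWitness = serde_json::from_slice(&bytes)?;
    assert_eq!(witness, decoded);
    Ok(())
}
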
30 changes: 16 additions & 14 deletions crates/delegator/src/api.rs
@@ -6,7 +6,7 @@ use axum::{
};
use futures::StreamExt;
use hyper::StatusCode;
use libp2p::PeerId;
use libp2p::kad;
use serde::{Deserialize, Serialize};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::{io, time::Duration};
@@ -19,7 +19,7 @@ use crate::delegator::DelegatorEvent;
#[derive(Debug)]
pub struct ServerState {
pub delegate_tx: mpsc::Sender<JobData>,
pub events_rx: broadcast::Receiver<(u64, DelegatorEvent)>,
pub events_rx: broadcast::Receiver<(kad::RecordKey, DelegatorEvent)>,
}

impl Clone for ServerState {
@@ -39,29 +39,29 @@ pub struct DelegateRequest {

#[derive(Debug, Serialize)]
pub struct DelegateResponse {
job_hash: String,
job_key: String,
}

pub async fn deletage_handler(
State(state): State<ServerState>,
Json(input): Json<DelegateRequest>,
) -> Result<Json<DelegateResponse>, StatusCode> {
let job_data = JobData::new(input.pie);
let job_data_hash = hash!(&job_data);
let job_data_hash = kad::RecordKey::new(&hash!(job_data).to_be_bytes());
state.delegate_tx.send(job_data).await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(DelegateResponse { job_hash: job_data_hash.to_string() }))
Ok(Json(DelegateResponse { job_key: hex::encode(job_data_hash) }))
}

#[derive(Debug, Deserialize)]
pub struct JobEventsRequest {
job_hash: String,
job_key: String,
}

#[derive(Debug, Serialize)]
#[serde(tag = "type", content = "data")]
pub enum JobEventsResponse {
BidReceived(PeerId),
Delegated(PeerId),
BidReceived(String),
Delegated(String),
Finished(Vec<u8>),
}

@@ -70,17 +70,19 @@ pub async fn job_events_handler(
Query(input): Query<JobEventsRequest>,
) -> Sse<impl Stream<Item = Result<Event, io::Error>>> {
let stream = stream! {
let job_hash = input.job_hash.parse::<u64>()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
let job_key = kad::RecordKey::new(
&hex::decode(input.job_key)
.map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()))?
);
loop {
tokio::select! {
Ok((hash, event)) = state.events_rx.recv() => {
if hash == job_hash {
Ok((key, event)) = state.events_rx.recv() => {
if key == job_key {
yield Event::default()
.json_data(
match event {
DelegatorEvent::BidReceived(peer_id) => { JobEventsResponse::BidReceived(peer_id) },
DelegatorEvent::Delegated(peer_id) => { JobEventsResponse::Delegated(peer_id) },
DelegatorEvent::BidReceived(peer_id) => { JobEventsResponse::BidReceived(peer_id.to_base58()) },
DelegatorEvent::Delegated(peer_id) => { JobEventsResponse::Delegated(peer_id.to_base58()) },
DelegatorEvent::Finished(data) => { JobEventsResponse::Finished(data) },
}
)
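
The API now hands the client a hex-encoded record key in DelegateResponse, and job_events_handler decodes that same string back into a kad::RecordKey, so the key from the delegate response can be passed back verbatim as the events query parameter. A minimal sketch of that round trip:

use libp2p::kad;

fn main() -> Result<(), hex::FromHexError> {
    // What deletage_handler returns to the client:
    let job_key = kad::RecordKey::new(&0xDEADBEEFu64.to_be_bytes());
    let as_hex = hex::encode(&job_key);

    // What job_events_handler does with the incoming query string:
    let decoded = kad::RecordKey::new(&hex::decode(&as_hex)?);
    assert_eq!(job_key, decoded);
    Ok(())
}
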
18 changes: 11 additions & 7 deletions crates/delegator/src/bid_queue.rs
@@ -1,7 +1,7 @@
use libp2p::PeerId;
use libp2p::{kad, PeerId};
use std::{collections::BTreeMap, future::Future, pin::Pin, time::Duration};
use tokio::{sync::mpsc, time::sleep};
use zetina_common::{job::Job, process::Process};
use zetina_common::process::Process;

pub struct BidQueue {}

@@ -19,17 +19,21 @@ impl Default for BidQueue {

impl BidQueue {
pub fn run<'future>(
job: Job,
job_hash: kad::RecordKey,
) -> (
Process<'future, Result<(Job, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>,
Process<'future, Result<(kad::RecordKey, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>,
mpsc::Sender<(u64, PeerId)>,
) {
let (terminate_tx, mut terminate_rx) = mpsc::channel::<()>(10);
let (bid_tx, mut bid_rx) = mpsc::channel::<(u64, PeerId)>(10);
let future: Pin<
Box<
dyn Future<Output = Result<(Job, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>
+ Send
dyn Future<
Output = Result<
(kad::RecordKey, BTreeMap<u64, Vec<PeerId>>),
BidControllerError,
>,
> + Send
+ '_,
>,
> = Box::pin(async move {
@@ -53,7 +57,7 @@ impl BidQueue {
}
}
_ = sleep(duration) => {
break Ok((job, bids.take().ok_or(BidControllerError::BidsTerminated)?))
break Ok((job_hash, bids.take().ok_or(BidControllerError::BidsTerminated)?))
}
_ = terminate_rx.recv() => {
break Err(BidControllerError::TaskTerminated);
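
BidQueue accumulates bids into a BTreeMap<u64, Vec<PeerId>> keyed by price, so the map's ordering does the auction work: the delegator's bids.first_key_value() call picks the lowest price tier, and the first peer in that tier wins. A sketch of the selection logic, with &str standing in for PeerId to keep it dependency-free:

use std::collections::BTreeMap;

fn main() {
    // Bids arrive as (price, bidder) pairs and are grouped by price.
    let mut bids: BTreeMap<u64, Vec<&str>> = BTreeMap::new();
    for (price, peer) in [(300, "peer-a"), (100, "peer-b"), (100, "peer-c")] {
        bids.entry(price).or_default().push(peer);
    }
    // BTreeMap iterates in ascending key order, so first_key_value()
    // is the cheapest tier; the first peer that bid at that price wins.
    let (price, peers) = bids.first_key_value().unwrap();
    assert_eq!((*price, peers[0]), (100, "peer-b"));
}
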
89 changes: 63 additions & 26 deletions crates/delegator/src/delegator.rs
@@ -1,7 +1,7 @@
use crate::bid_queue::{BidControllerError, BidQueue};
use futures::stream::FuturesUnordered;
use futures::Stream;
use libp2p::{gossipsub, PeerId};
use libp2p::{gossipsub, kad, PeerId};
use starknet::signers::SigningKey;
use std::collections::{BTreeMap, HashMap};
use std::hash::{DefaultHasher, Hash, Hasher};
@@ -13,10 +13,11 @@ use tokio_stream::StreamExt;
use tracing::{error, info};
use zetina_common::graceful_shutdown::shutdown_signal;
use zetina_common::hash;
use zetina_common::job::{Job, JobData, JobDelegation};
use zetina_common::job::{Job, JobBid, JobData};
use zetina_common::job_witness::JobWitness;
use zetina_common::process::Process;
use zetina_peer::swarm::{
DelegationMessage, GossipsubMessage, MarketMessage, PeerBehaviourEvent, Topic,
DelegationMessage, GossipsubMessage, KademliaMessage, MarketMessage, PeerBehaviourEvent, Topic,
};

pub struct Delegator {
@@ -27,68 +28,99 @@ impl Delegator {
pub fn new(
mut swarm_events: Pin<Box<dyn Stream<Item = PeerBehaviourEvent> + Send>>,
gossipsub_tx: Sender<GossipsubMessage>,
kademlia_tx: Sender<KademliaMessage>,
mut delegate_rx: mpsc::Receiver<JobData>,
events_tx: broadcast::Sender<(u64, DelegatorEvent)>,
events_tx: broadcast::Sender<(kad::RecordKey, DelegatorEvent)>,
signing_key: SigningKey,
) -> Self {
Self {
handle: Some(tokio::spawn(async move {
let mut job_bid_scheduler = FuturesUnordered::<
Process<Result<(Job, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>,
Process<
Result<(kad::RecordKey, BTreeMap<u64, Vec<PeerId>>), BidControllerError>,
>,
>::new();
let mut job_hash_store = HashMap::<u64, mpsc::Sender<(u64, PeerId)>>::new();
let mut job_hash_store =
HashMap::<kad::RecordKey, mpsc::Sender<(u64, PeerId)>>::new();
let mut proof_hash_store = HashMap::<kad::RecordKey, kad::RecordKey>::new();

loop {
tokio::select! {
Some(job_data) = delegate_rx.recv() => {
let job = Job::try_from_job_data(job_data, &signing_key);
gossipsub_tx.send(GossipsubMessage {
topic: Topic::Market.into(),
data: serde_json::to_vec(&MarketMessage::Job(job.to_owned()))?
}).await?;
info!("Propagated job: {} for bidding", hash!(job));
let (process, bid_tx) = BidQueue::run(job.to_owned());
job_bid_scheduler.push(process);
job_hash_store.insert(hash!(job), bid_tx);
let job_key = kad::RecordKey::new(&hash!(job).to_be_bytes());
kademlia_tx.send(KademliaMessage::PUT(
(job_key, serde_json::to_vec(&job)?)
)).await?;
},
Some(event) = swarm_events.next() => {
match event {
PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { message, .. }) => {
if message.topic == Topic::Market.into() {
match serde_json::from_slice::<MarketMessage>(&message.data)? {
MarketMessage::JobBid(job_bid) => {
if let Some(bid_tx) = job_hash_store.get_mut(&job_bid.job_hash) {
info!("Received job bid: {} price: {} from: {}", job_bid.job_hash, job_bid.price, job_bid.identity);
if let Some(bid_tx) = job_hash_store.get_mut(&job_bid.job_key) {
info!("Received job bid: {} price: {} from: {}", hex::encode(&job_bid.job_key), job_bid.price, job_bid.identity);
bid_tx.send((job_bid.price, job_bid.identity)).await?;
events_tx.send((job_bid.job_hash, DelegatorEvent::BidReceived(job_bid.identity)))?;
events_tx.send((job_bid.job_key, DelegatorEvent::BidReceived(job_bid.identity)))?;
}
}
_ => {}
}
}
if message.topic == Topic::Delegation.into() {
match serde_json::from_slice::<DelegationMessage>(&message.data)? {
DelegationMessage::Finished(job_witness) => {
info!("Received finished job: {}", job_witness.job_hash);
events_tx.send((job_witness.job_hash, DelegatorEvent::Finished(job_witness.proof)))?;
DelegationMessage::Finished(proof_key, job_key) => {
if job_hash_store.remove(&job_key).is_some() {
info!("Received finished job: {} proof key: {}", hex::encode(&job_key), hex::encode(&proof_key));
proof_hash_store.insert(proof_key.to_owned(), job_key);
kademlia_tx.send(KademliaMessage::GET(proof_key)).await?;
}
}
_ => {}
}
}
},
PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { result, ..}) => {
match result {
kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => {
gossipsub_tx.send(GossipsubMessage {
topic: Topic::Market.into(),
data: serde_json::to_vec(&MarketMessage::JobBidPropagation(key.to_owned()))?
}).await?;
info!("Propagated job: {} for bidding", hex::encode(&key));
let (process, bid_tx) = BidQueue::run(key.to_owned());
job_bid_scheduler.push(process);
job_hash_store.insert(key, bid_tx);
},
kad::QueryResult::GetRecord(Ok(
kad::GetRecordOk::FoundRecord(kad::PeerRecord {
record: kad::Record { key, value, .. },
..
})
)) => {
if let Some ((proof_key, job_key)) = proof_hash_store.remove_entry(&key) {
info!("job {} proof with key: {} returned in DHT", hex::encode(&job_key), hex::encode(&proof_key));
let job_witness: JobWitness = serde_json::from_slice(&value)?;
events_tx.send((job_key, DelegatorEvent::Finished(job_witness.proof)))?;
}
},
_ => {}
}
}
_ => {}
}
}
Some(Ok((job, bids))) = job_bid_scheduler.next() => {
job_hash_store.remove(&hash!(job));
Some(Ok((job_key, bids))) = job_bid_scheduler.next() => {
let bid = bids.first_key_value().unwrap();
let price = *bid.0;
let identity = *bid.1.first().unwrap();
info!("Job {} delegated to best bidder: {}", hash!(job), identity);
info!("Job {} delegated to best bidder: {}", hex::encode(&job_key), identity);
gossipsub_tx.send(GossipsubMessage {
topic: Topic::Delegation.into(),
data: serde_json::to_vec(&DelegationMessage::Delegate(JobDelegation{identity, job: job.to_owned(), price}))?
data: serde_json::to_vec(&DelegationMessage::Delegate(JobBid{identity, job_key: job_key.to_owned(), price}))?
}).await?;
events_tx.send((hash!(job), DelegatorEvent::Delegated(identity)))?;
events_tx.send((job_key, DelegatorEvent::Delegated(identity)))?;
}
_ = shutdown_signal() => {
break
@@ -125,8 +157,13 @@ pub enum Error {
#[error("mpsc_send_error GossipsubMessage")]
MpscSendErrorGossipsubMessage(#[from] mpsc::error::SendError<GossipsubMessage>),

#[error("mpsc_send_error KademliaMessage")]
MpscSendErrorKademliaMessage(#[from] mpsc::error::SendError<KademliaMessage>),

#[error("mpsc_send_error DelegatorEvent")]
BreadcastSendErrorDelegatorEvent(#[from] broadcast::error::SendError<(u64, DelegatorEvent)>),
BreadcastSendErrorDelegatorEvent(
#[from] broadcast::error::SendError<(kad::RecordKey, DelegatorEvent)>,
),

#[error("mpsc_send_error JobBid")]
MpscSendErrorJobBid(#[from] mpsc::error::SendError<(u64, PeerId)>),
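
The delegator's event loop now pivots on two Kademlia query completions: a successful PUT of the job record opens the bid window (via a gossiped JobBidPropagation), and a successful GET of the proof record yields the serialized JobWitness. A library-style sketch of just those two match arms, using the same libp2p kad types as the diff above; exercising it would require a live Swarm:

use libp2p::kad;

fn on_query_result(result: kad::QueryResult) {
    match result {
        // PUT of the job record succeeded: the job is now discoverable,
        // so the delegator starts collecting bids for this key.
        kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => {
            println!("job stored, open bid window for {}", hex::encode(&key));
        }
        // GET of the proof record succeeded: the value holds a serialized
        // JobWitness, decoded and forwarded as DelegatorEvent::Finished.
        kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(peer_record))) => {
            let kad::Record { key, value, .. } = peer_record.record;
            println!("proof for {} is {} bytes", hex::encode(&key), value.len());
        }
        _ => {}
    }
}
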