Skip to content
This repository has been archived by the owner on Dec 26, 2024. It is now read-only.

Commit

Permalink
Merge main into dvir/random_table_test
Browse files Browse the repository at this point in the history
  • Loading branch information
DvirYo-starkware authored Apr 14, 2024
2 parents b1cc44b + fbb2c98 commit b529875
Show file tree
Hide file tree
Showing 6 changed files with 435 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/papyrus_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ license-file.workspace = true
[dependencies]
async-stream.workspace = true
bytes.workspace = true
chrono.workspace = true
clap = { workspace = true, features = ["derive"] }
defaultmap.workspace = true
derive_more.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod converters;
mod db_executor;
pub mod main_behaviour;
pub mod network_manager;
mod peer_manager;
pub mod protobuf_messages;
pub mod streamed_bytes;
#[cfg(test)]
Expand Down
123 changes: 123 additions & 0 deletions crates/papyrus_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use std::collections::HashMap;

use chrono::Duration;
use libp2p::{Multiaddr, PeerId};

use self::peer::PeerTrait;
use crate::db_executor::QueryId;

mod peer;
#[cfg(test)]
mod test;

#[cfg_attr(test, derive(Debug, PartialEq))]
#[allow(dead_code)]
pub enum ReputationModifier {
// TODO: Implement this enum
Bad,
}

struct PeerManager<P> {
peers: HashMap<PeerId, P>,
// TODO: consider implementing a cleanup mechanism to not store all queries forever
query_to_peer_map: HashMap<QueryId, PeerId>,
config: PeerManagerConfig,
last_peer_index: usize,
}

#[derive(Clone)]
struct PeerManagerConfig {
target_num_for_peers: usize,
blacklist_timeout: Duration,
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum PeerManagerError {
#[error("No such peer: {0}")]
NoSuchPeer(PeerId),
#[error("No such query: {0}")]
NoSuchQuery(QueryId),
}

impl Default for PeerManagerConfig {
fn default() -> Self {
Self { target_num_for_peers: 100, blacklist_timeout: Duration::max_value() }
}
}

#[allow(dead_code)]
impl<P> PeerManager<P>
where
P: PeerTrait,
{
pub fn new(config: PeerManagerConfig) -> Self {
let peers = HashMap::new();
Self { peers, query_to_peer_map: HashMap::new(), config, last_peer_index: 0 }
}

pub fn add_peer(&mut self, mut peer: P) {
peer.set_timeout_duration(self.config.blacklist_timeout);
self.peers.insert(peer.peer_id(), peer);
}

#[cfg(test)]
fn get_mut_peer(&mut self, peer_id: PeerId) -> Option<&mut P> {
self.peers.get_mut(&peer_id)
}

pub fn assign_peer_to_query(&mut self, query_id: QueryId) -> Option<(PeerId, Multiaddr)> {
if self.peers.is_empty() {
return None;
}
let peer = self
.peers
.iter()
.skip(self.last_peer_index)
.find(|(_, peer)| !peer.is_blocked())
.or_else(|| {
self.peers.iter().take(self.last_peer_index).find(|(_, peer)| !peer.is_blocked())
});
self.last_peer_index = (self.last_peer_index + 1) % self.peers.len();
peer.map(|(peer_id, peer)| {
// TODO: consider not allowing reassignment of the same query
self.query_to_peer_map.insert(query_id, *peer_id);
(*peer_id, peer.multiaddr())
})
}

pub fn report_peer(
&mut self,
peer_id: PeerId,
reason: ReputationModifier,
) -> Result<(), PeerManagerError> {
if let Some(peer) = self.peers.get_mut(&peer_id) {
peer.update_reputation(reason);
Ok(())
} else {
Err(PeerManagerError::NoSuchPeer(peer_id))
}
}

pub fn report_query(
&mut self,
query_id: QueryId,
reason: ReputationModifier,
) -> Result<(), PeerManagerError> {
if let Some(peer_id) = self.query_to_peer_map.get(&query_id) {
if let Some(peer) = self.peers.get_mut(peer_id) {
peer.update_reputation(reason);
Ok(())
} else {
Err(PeerManagerError::NoSuchPeer(*peer_id))
}
} else {
Err(PeerManagerError::NoSuchQuery(query_id))
}
}

pub fn more_peers_needed(&self) -> bool {
// TODO: consider if we should count blocked peers (and in what cases? what if they are
// blocked temporarily?)
self.peers.len() < self.config.target_num_for_peers
}
}
68 changes: 68 additions & 0 deletions crates/papyrus_network/src/peer_manager/peer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// using chrono time and not std since std does not have the ability for std::time::Instance to
// represent the maximum time of the system.
use chrono::{DateTime, Duration, Utc};
use libp2p::{Multiaddr, PeerId};
#[cfg(test)]
use mockall::automock;
use tracing::debug;

use super::ReputationModifier;

#[cfg_attr(test, automock)]
pub trait PeerTrait {
fn update_reputation(&mut self, reason: ReputationModifier);

fn peer_id(&self) -> PeerId;

fn multiaddr(&self) -> Multiaddr;

fn set_timeout_duration(&mut self, duration: Duration);

fn is_blocked(&self) -> bool;
}

#[derive(Clone)]
pub(crate) struct Peer {
peer_id: PeerId,
multiaddr: Multiaddr,
timed_out_until: Option<DateTime<Utc>>,
timeout_duration: Option<Duration>,
}

#[allow(dead_code)]
impl Peer {
pub fn new(peer_id: PeerId, multiaddr: Multiaddr) -> Self {
Self { peer_id, multiaddr, timeout_duration: None, timed_out_until: None }
}
}

impl PeerTrait for Peer {
fn update_reputation(&mut self, _reason: ReputationModifier) {
if let Some(timeout_duration) = self.timeout_duration {
self.timed_out_until =
Utc::now().checked_add_signed(timeout_duration).or(Some(DateTime::<Utc>::MAX_UTC));
return;
}
debug!("Timeout duration not set for peer: {:?}", self.peer_id);
}

fn peer_id(&self) -> PeerId {
self.peer_id
}

fn multiaddr(&self) -> Multiaddr {
self.multiaddr.clone()
}

fn set_timeout_duration(&mut self, duration: Duration) {
self.timeout_duration = Some(duration);
}

fn is_blocked(&self) -> bool {
if let Some(timed_out_until) = self.timed_out_until {
timed_out_until > Utc::now()
} else {
false
}
}
}
Loading

0 comments on commit b529875

Please sign in to comment.