Skip to content

Commit

Permalink
feat(ethexe): Initial peer scoring system (#4195)
Browse files Browse the repository at this point in the history
  • Loading branch information
ark0f authored Sep 9, 2024
1 parent 17a485e commit 056d0dd
Show file tree
Hide file tree
Showing 4 changed files with 460 additions and 41 deletions.
37 changes: 13 additions & 24 deletions ethexe/network/src/db_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
SendRequestErrorKind,
},
export::{Multiaddr, PeerId},
peer_score,
utils::ParityScaleCodec,
};
use ethexe_db::Database;
Expand All @@ -34,14 +35,14 @@ use libp2p::{
request_response,
request_response::{InboundFailure, Message, OutboundFailure, ProtocolSupport},
swarm::{
CloseConnection, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
THandlerOutEvent, ToSwarm,
},
StreamProtocol,
};
use parity_scale_codec::{Decode, Encode};
use std::{
collections::{BTreeMap, BTreeSet, VecDeque},
collections::{BTreeMap, BTreeSet},
task::{Context, Poll},
time::Duration,
};
Expand Down Expand Up @@ -260,21 +261,21 @@ type InnerBehaviour = request_response::Behaviour<ParityScaleCodec<Request, Resp

pub(crate) struct Behaviour {
inner: InnerBehaviour,
pending_events: VecDeque<ToSwarm<Event, THandlerInEvent<Self>>>,
peer_score_handle: peer_score::Handle,
ongoing_requests: OngoingRequests,
ongoing_responses: OngoingResponses,
}

impl Behaviour {
/// TODO: use database via traits
pub(crate) fn new(config: Config, db: Database) -> Self {
pub(crate) fn new(config: Config, peer_score_handle: peer_score::Handle, db: Database) -> Self {
Self {
inner: InnerBehaviour::new(
[(STREAM_PROTOCOL, ProtocolSupport::Full)],
request_response::Config::default(),
),
pending_events: VecDeque::new(),
ongoing_requests: OngoingRequests::from_config(&config),
peer_score_handle: peer_score_handle.clone(),
ongoing_requests: OngoingRequests::new(&config, peer_score_handle),
ongoing_responses: OngoingResponses::new(db, &config),
}
}
Expand Down Expand Up @@ -361,13 +362,8 @@ impl Behaviour {
log::trace!("outbound failure for request {request_id} to {peer}: {error}");

if let OutboundFailure::UnsupportedProtocols = error {
log::debug!("request to {peer} failed because it doesn't support {STREAM_PROTOCOL} protocol. Disconnecting...");

// TODO: remove queue when peer scoring system is introduced
self.pending_events.push_front(ToSwarm::CloseConnection {
peer_id: peer,
connection: CloseConnection::All,
});
log::debug!("request to {peer} failed because it doesn't support {STREAM_PROTOCOL} protocol");
self.peer_score_handle.unsupported_protocol(peer);
}

let event =
Expand Down Expand Up @@ -400,11 +396,8 @@ impl Behaviour {
request_id: _,
error: InboundFailure::UnsupportedProtocols,
} => {
log::debug!("Request from {peer} failed because it doesn't support {STREAM_PROTOCOL} protocol. Disconnecting...");
return Poll::Ready(ToSwarm::CloseConnection {
peer_id: peer,
connection: CloseConnection::All,
});
log::debug!("request from {peer} failed because it doesn't support {STREAM_PROTOCOL} protocol");
self.peer_score_handle.unsupported_protocol(peer);
}
request_response::Event::InboundFailure { .. } => {}
request_response::Event::ResponseSent { .. } => {}
Expand Down Expand Up @@ -494,10 +487,6 @@ impl NetworkBehaviour for Behaviour {
&mut self,
cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if let Some(event) = self.pending_events.pop_back() {
return Poll::Ready(event);
}

if let Some(request_id) = self.ongoing_requests.remove_if_timeout(cx) {
return Poll::Ready(ToSwarm::GenerateEvent(Event::RequestFailed {
request_id,
Expand Down Expand Up @@ -563,7 +552,7 @@ mod tests {

async fn new_swarm_with_config(config: Config) -> (Swarm<Behaviour>, Database) {
let db = Database::from_one(&MemDb::default(), [0; 20]);
let behaviour = Behaviour::new(config, db.clone());
let behaviour = Behaviour::new(config, peer_score::Handle::new_test(), db.clone());
let mut swarm = Swarm::new_ephemeral(move |_keypair| behaviour);
swarm.listen().with_memory_addr_external().await;
(swarm, db)
Expand Down
23 changes: 19 additions & 4 deletions ethexe/network/src/db_sync/ongoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use crate::{
db_sync::{Config, InnerBehaviour, Request, RequestId, Response, ResponseId},
export::PeerId,
peer_score::Handle,
};
use ethexe_db::{CodesStorage, Database};
use libp2p::{
Expand Down Expand Up @@ -72,17 +73,24 @@ pub(crate) struct OngoingRequest {
response: Option<Response>,
tried_peers: HashSet<PeerId>,
timeout: Pin<Box<Sleep>>,
peer_score_handle: Handle,
}

impl OngoingRequest {
pub(crate) fn new(request_id: RequestId, request: Request, timeout: Duration) -> Self {
pub(crate) fn new(
request_id: RequestId,
request: Request,
timeout: Duration,
peer_score_handle: Handle,
) -> Self {
Self {
request_id,
original_request: request.clone(),
request,
response: None,
tried_peers: HashSet::new(),
timeout: Box::pin(time::sleep(timeout)),
peer_score_handle,
}
}

Expand All @@ -95,11 +103,11 @@ impl OngoingRequest {
};

if response.strip(&self.original_request) {
// TODO: tell scoring system there was excessive data
log::debug!(
"data stripped in response from {peer} for {:?}",
self.request_id
);
self.peer_score_handle.excessive_data(peer);
}

response
Expand Down Expand Up @@ -162,17 +170,19 @@ pub(crate) struct OngoingRequests {
active_requests: HashMap<OutboundRequestId, OngoingRequest>,
max_rounds_per_request: u32,
request_timeout: Duration,
peer_score_handle: Handle,
}

impl OngoingRequests {
pub(crate) fn from_config(config: &Config) -> Self {
pub(crate) fn new(config: &Config, peer_score_handle: Handle) -> Self {
Self {
connections: Default::default(),
request_id_counter: 0,
pending_requests: Default::default(),
active_requests: Default::default(),
max_rounds_per_request: config.max_rounds_per_request,
request_timeout: config.request_timeout,
peer_score_handle,
}
}

Expand Down Expand Up @@ -210,7 +220,12 @@ impl OngoingRequests {

pub(crate) fn push_pending_request(&mut self, request: Request) -> RequestId {
let request_id = self.next_request_id();
let ongoing_request = OngoingRequest::new(request_id, request, self.request_timeout);
let ongoing_request = OngoingRequest::new(
request_id,
request,
self.request_timeout,
self.peer_score_handle.clone(),
);
self.pending_requests.push_front(ongoing_request);
request_id
}
Expand Down
101 changes: 88 additions & 13 deletions ethexe/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

mod custom_connection_limits;
pub mod db_sync;
pub mod peer_score;
mod utils;

pub mod export {
Expand Down Expand Up @@ -150,6 +151,7 @@ pub enum NetworkReceiverEvent {
data: Vec<u8>,
},
DbResponse(Result<db_sync::Response, db_sync::RequestFailure>),
PeerBlocked(PeerId),
}

pub struct NetworkReceiver {
Expand Down Expand Up @@ -249,8 +251,12 @@ impl NetworkEventLoop {
Ok(identity::Keypair::from(pair))
}

pub fn local_peer_id(&self) -> &PeerId {
self.swarm.local_peer_id()
pub fn local_peer_id(&self) -> PeerId {
*self.swarm.local_peer_id()
}

/// Returns the peer-score handle obtained from the swarm's `peer_score` behaviour.
pub fn score_handle(&self) -> peer_score::Handle {
    let behaviour = self.swarm.behaviour();
    behaviour.peer_score.handle()
}

fn create_swarm(
Expand Down Expand Up @@ -318,35 +324,53 @@ impl NetworkEventLoop {
//
BehaviourEvent::ConnectionLimits(void) => void::unreachable(void),
//
BehaviourEvent::PeerScore(peer_score::Event::PeerBlocked {
peer_id,
last_reason: _,
}) => {
let _res = self
.external_tx
.send(NetworkReceiverEvent::PeerBlocked(peer_id));
}
BehaviourEvent::PeerScore(_) => {}
//
BehaviourEvent::Ping(ping::Event {
peer,
connection: _,
result,
}) => {
if let Err(e) = result {
log::debug!("Ping to {peer} failed: {e}. Disconnecting...");
log::debug!("ping to {peer} failed: {e}. Disconnecting...");
let _res = self.swarm.disconnect_peer_id(peer);
}
}
//
BehaviourEvent::Identify(identify::Event::Received { peer_id, info, .. }) => {
let behaviour = self.swarm.behaviour_mut();

if info.protocol_version != PROTOCOL_VERSION || info.agent_version != AGENT_VERSION
{
log::debug!("{peer_id} is not supported with `{}` protocol and `{}` agent. Disconnecting...", info.protocol_version, info.agent_version);
let _res = self.swarm.disconnect_peer_id(peer_id);
log::debug!(
"{peer_id} is not supported with `{}` protocol and `{}` agent",
info.protocol_version,
info.agent_version
);
behaviour.peer_score.handle().unsupported_protocol(peer_id);
}

let behaviour = self.swarm.behaviour_mut();

// add listen addresses of new peers to KadDHT
// according to `identify` and `kad` protocols docs
for listen_addr in info.listen_addrs {
behaviour.kad.add_address(&peer_id, listen_addr);
}
}
BehaviourEvent::Identify(identify::Event::Error { peer_id, error, .. }) => {
log::debug!("{peer_id} is not identified: {error}. Disconnecting...");
let _res = self.swarm.disconnect_peer_id(peer_id);
log::debug!("{peer_id} is not identified: {error}");
self.swarm
.behaviour()
.peer_score
.handle()
.unsupported_protocol(peer_id);
}
BehaviourEvent::Identify(_) => {}
//
Expand Down Expand Up @@ -395,8 +419,12 @@ impl NetworkEventLoop {
.send(NetworkReceiverEvent::Message { source, data });
}
BehaviourEvent::Gossipsub(gossipsub::Event::GossipsubNotSupported { peer_id }) => {
log::debug!("`gossipsub` protocol is not supported. Disconnecting...");
let _res = self.swarm.disconnect_peer_id(peer_id);
log::debug!("`gossipsub` protocol is not supported");
self.swarm
.behaviour()
.peer_score
.handle()
.unsupported_protocol(peer_id);
}
BehaviourEvent::Gossipsub(_) => {}
//
Expand Down Expand Up @@ -445,6 +473,8 @@ pub(crate) struct Behaviour {
pub custom_connection_limits: custom_connection_limits::Behaviour,
// limit connections
pub connection_limits: connection_limits::Behaviour,
// peer scoring system
pub peer_score: peer_score::Behaviour,
// fast peer liveliness check
pub ping: ping::Behaviour,
// friend or foe system
Expand Down Expand Up @@ -486,6 +516,9 @@ impl Behaviour {
.with_max_established_incoming(Some(MAX_ESTABLISHED_INCOMING_CONNECTIONS));
let connection_limits = connection_limits::Behaviour::new(connection_limits);

let peer_score = peer_score::Behaviour::new(peer_score::Config::default());
let peer_score_handle = peer_score.handle();

let ping = ping::Behaviour::default();

let identify_config = identify::Config::new(PROTOCOL_VERSION.to_string(), keypair.public())
Expand Down Expand Up @@ -520,11 +553,12 @@ impl Behaviour {

gossipsub.subscribe(&gpu_commitments_topic())?;

let db_sync = db_sync::Behaviour::new(db_sync::Config::default(), db);
let db_sync = db_sync::Behaviour::new(db_sync::Config::default(), peer_score_handle, db);

Ok(Self {
custom_connection_limits,
connection_limits,
peer_score,
ping,
identify,
mdns4,
Expand Down Expand Up @@ -573,7 +607,6 @@ mod tests {
config2.bootstrap_addresses = [multiaddr].into();

let service2 = NetworkService::new(config2.clone(), &signer2, db).unwrap();

tokio::spawn(service2.event_loop.run());

// Wait for the connection to be established
Expand Down Expand Up @@ -652,4 +685,46 @@ mod tests {
)))
);
}

/// Checks that reporting a peer through the score handle results in a
/// `NetworkReceiverEvent::PeerBlocked` event on the first service's receiver.
#[tokio::test]
async fn peer_blocked_by_score() {
    init_logger();

    // Address handed to `new_memory` for the first service. The second service
    // must bootstrap to this very address, otherwise the two never connect.
    // NOTE(review): presumably this is the listen address — confirm against
    // `NetworkEventLoopConfig::new_memory`.
    const SERVICE1_ADDR: &str = "/memory/5";

    // first service
    let tmp_dir1 = tempfile::tempdir().unwrap();
    let config = NetworkEventLoopConfig::new_memory(tmp_dir1.path().to_path_buf(), SERVICE1_ADDR);
    let signer1 = ethexe_signer::Signer::new(tmp_dir1.path().join("key")).unwrap();
    let db = Database::from_one(&MemDb::default(), [0; 20]);
    let mut service1 = NetworkService::new(config.clone(), &signer1, db).unwrap();

    let peer_id = service1.event_loop.local_peer_id();
    // Previously this pointed at `/memory/3`, which does not match the address
    // the first service was configured with above.
    let multiaddr: Multiaddr = format!("{SERVICE1_ADDR}/p2p/{peer_id}").parse().unwrap();

    // Grab the handle before the event loop is moved into the spawned task.
    let peer_score_handle = service1.event_loop.score_handle();

    tokio::spawn(service1.event_loop.run());

    // second service
    let tmp_dir2 = tempfile::tempdir().unwrap();
    let signer2 = ethexe_signer::Signer::new(tmp_dir2.path().join("key")).unwrap();
    let mut config2 =
        NetworkEventLoopConfig::new_memory(tmp_dir2.path().to_path_buf(), "/memory/6");
    config2.bootstrap_addresses = [multiaddr].into();
    let db = Database::from_one(&MemDb::default(), [0; 20]);
    let service2 = NetworkService::new(config2.clone(), &signer2, db).unwrap();

    let service2_peer_id = service2.event_loop.local_peer_id();

    tokio::spawn(service2.event_loop.run());

    // Wait for the connection to be established
    tokio::time::sleep(Duration::from_secs(1)).await;

    // Report the second peer from the first service's point of view.
    peer_score_handle.unsupported_protocol(service2_peer_id);

    let event = service1.receiver.recv().await;
    assert_eq!(
        event,
        Some(NetworkReceiverEvent::PeerBlocked(service2_peer_id))
    );
}
}
Loading

0 comments on commit 056d0dd

Please sign in to comment.