From 056d0ddbfddbd363c1e2af075ea1c1badfec272d Mon Sep 17 00:00:00 2001 From: Arsenii Lyashenko Date: Mon, 9 Sep 2024 17:59:39 +0300 Subject: [PATCH] feat(ethexe): Initial peer scoring system (#4195) --- ethexe/network/src/db_sync/mod.rs | 37 +-- ethexe/network/src/db_sync/ongoing.rs | 23 +- ethexe/network/src/lib.rs | 101 +++++++- ethexe/network/src/peer_score.rs | 340 ++++++++++++++++++++++++++ 4 files changed, 460 insertions(+), 41 deletions(-) create mode 100644 ethexe/network/src/peer_score.rs diff --git a/ethexe/network/src/db_sync/mod.rs b/ethexe/network/src/db_sync/mod.rs index f5d69a806da..66ae4900d60 100644 --- a/ethexe/network/src/db_sync/mod.rs +++ b/ethexe/network/src/db_sync/mod.rs @@ -24,6 +24,7 @@ use crate::{ SendRequestErrorKind, }, export::{Multiaddr, PeerId}, + peer_score, utils::ParityScaleCodec, }; use ethexe_db::Database; @@ -34,14 +35,14 @@ use libp2p::{ request_response, request_response::{InboundFailure, Message, OutboundFailure, ProtocolSupport}, swarm::{ - CloseConnection, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, - THandlerInEvent, THandlerOutEvent, ToSwarm, + ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, }, StreamProtocol, }; use parity_scale_codec::{Decode, Encode}; use std::{ - collections::{BTreeMap, BTreeSet, VecDeque}, + collections::{BTreeMap, BTreeSet}, task::{Context, Poll}, time::Duration, }; @@ -260,21 +261,21 @@ type InnerBehaviour = request_response::Behaviour>>, + peer_score_handle: peer_score::Handle, ongoing_requests: OngoingRequests, ongoing_responses: OngoingResponses, } impl Behaviour { /// TODO: use database via traits - pub(crate) fn new(config: Config, db: Database) -> Self { + pub(crate) fn new(config: Config, peer_score_handle: peer_score::Handle, db: Database) -> Self { Self { inner: InnerBehaviour::new( [(STREAM_PROTOCOL, ProtocolSupport::Full)], request_response::Config::default(), ), - pending_events: VecDeque::new(), - ongoing_requests: OngoingRequests::from_config(&config), + peer_score_handle: peer_score_handle.clone(), + ongoing_requests: OngoingRequests::new(&config, peer_score_handle), ongoing_responses: OngoingResponses::new(db, &config), } } @@ -361,13 +362,8 @@ impl Behaviour { log::trace!("outbound failure for request {request_id} to {peer}: {error}"); if let OutboundFailure::UnsupportedProtocols = error { - log::debug!("request to {peer} failed because it doesn't support {STREAM_PROTOCOL} protocol. Disconnecting..."); - - // TODO: remove queue when peer scoring system is introduced - self.pending_events.push_front(ToSwarm::CloseConnection { - peer_id: peer, - connection: CloseConnection::All, - }); + log::debug!("request to {peer} failed because it doesn't support {STREAM_PROTOCOL} protocol"); + self.peer_score_handle.unsupported_protocol(peer); } let event = @@ -400,11 +396,8 @@ impl Behaviour { request_id: _, error: InboundFailure::UnsupportedProtocols, } => { - log::debug!("Request from {peer} failed because it doesn't support {STREAM_PROTOCOL} protocol. Disconnecting..."); - return Poll::Ready(ToSwarm::CloseConnection { - peer_id: peer, - connection: CloseConnection::All, - }); + log::debug!("request from {peer} failed because it doesn't support {STREAM_PROTOCOL} protocol"); + self.peer_score_handle.unsupported_protocol(peer); } request_response::Event::InboundFailure { .. } => {} request_response::Event::ResponseSent { .. } => {} @@ -494,10 +487,6 @@ impl NetworkBehaviour for Behaviour { &mut self, cx: &mut Context<'_>, ) -> Poll>> { - if let Some(event) = self.pending_events.pop_back() { - return Poll::Ready(event); - } - if let Some(request_id) = self.ongoing_requests.remove_if_timeout(cx) { return Poll::Ready(ToSwarm::GenerateEvent(Event::RequestFailed { request_id, @@ -563,7 +552,7 @@ mod tests { async fn new_swarm_with_config(config: Config) -> (Swarm, Database) { let db = Database::from_one(&MemDb::default(), [0; 20]); - let behaviour = Behaviour::new(config, db.clone()); + let behaviour = Behaviour::new(config, peer_score::Handle::new_test(), db.clone()); let mut swarm = Swarm::new_ephemeral(move |_keypair| behaviour); swarm.listen().with_memory_addr_external().await; (swarm, db) diff --git a/ethexe/network/src/db_sync/ongoing.rs b/ethexe/network/src/db_sync/ongoing.rs index e7077c3bd70..cd1e481c784 100644 --- a/ethexe/network/src/db_sync/ongoing.rs +++ b/ethexe/network/src/db_sync/ongoing.rs @@ -19,6 +19,7 @@ use crate::{ db_sync::{Config, InnerBehaviour, Request, RequestId, Response, ResponseId}, export::PeerId, + peer_score::Handle, }; use ethexe_db::{CodesStorage, Database}; use libp2p::{ @@ -72,10 +73,16 @@ pub(crate) struct OngoingRequest { response: Option, tried_peers: HashSet, timeout: Pin>, + peer_score_handle: Handle, } impl OngoingRequest { - pub(crate) fn new(request_id: RequestId, request: Request, timeout: Duration) -> Self { + pub(crate) fn new( + request_id: RequestId, + request: Request, + timeout: Duration, + peer_score_handle: Handle, + ) -> Self { Self { request_id, original_request: request.clone(), @@ -83,6 +90,7 @@ impl OngoingRequest { response: None, tried_peers: HashSet::new(), timeout: Box::pin(time::sleep(timeout)), + peer_score_handle, } } @@ -95,11 +103,11 @@ impl OngoingRequest { }; if response.strip(&self.original_request) { - // TODO: tell scoring system there was excessive data log::debug!( "data stripped in response from {peer} for {:?}", self.request_id ); + self.peer_score_handle.excessive_data(peer); } response @@ -162,10 +170,11 @@ pub(crate) struct OngoingRequests { active_requests: HashMap, max_rounds_per_request: u32, request_timeout: Duration, + peer_score_handle: Handle, } impl OngoingRequests { - pub(crate) fn from_config(config: &Config) -> Self { + pub(crate) fn new(config: &Config, peer_score_handle: Handle) -> Self { Self { connections: Default::default(), request_id_counter: 0, @@ -173,6 +182,7 @@ impl OngoingRequests { active_requests: Default::default(), max_rounds_per_request: config.max_rounds_per_request, request_timeout: config.request_timeout, + peer_score_handle, } } @@ -210,7 +220,12 @@ impl OngoingRequests { pub(crate) fn push_pending_request(&mut self, request: Request) -> RequestId { let request_id = self.next_request_id(); - let ongoing_request = OngoingRequest::new(request_id, request, self.request_timeout); + let ongoing_request = OngoingRequest::new( + request_id, + request, + self.request_timeout, + self.peer_score_handle.clone(), + ); self.pending_requests.push_front(ongoing_request); request_id } diff --git a/ethexe/network/src/lib.rs b/ethexe/network/src/lib.rs index fcb2855c227..ae71cc5e48b 100644 --- a/ethexe/network/src/lib.rs +++ b/ethexe/network/src/lib.rs @@ -18,6 +18,7 @@ mod custom_connection_limits; pub mod db_sync; +pub mod peer_score; mod utils; pub mod export { @@ -150,6 +151,7 @@ pub enum NetworkReceiverEvent { data: Vec, }, DbResponse(Result), + PeerBlocked(PeerId), } pub struct NetworkReceiver { @@ -249,8 +251,12 @@ impl NetworkEventLoop { Ok(identity::Keypair::from(pair)) } - pub fn local_peer_id(&self) -> &PeerId { - self.swarm.local_peer_id() + pub fn local_peer_id(&self) -> PeerId { + *self.swarm.local_peer_id() + } + + pub fn score_handle(&self) -> peer_score::Handle { + self.swarm.behaviour().peer_score.handle() } fn create_swarm( @@ -318,26 +324,40 @@ impl NetworkEventLoop { // BehaviourEvent::ConnectionLimits(void) => void::unreachable(void), // + BehaviourEvent::PeerScore(peer_score::Event::PeerBlocked { + peer_id, + last_reason: _, + }) => { + let _res = self + .external_tx + .send(NetworkReceiverEvent::PeerBlocked(peer_id)); + } + BehaviourEvent::PeerScore(_) => {} + // BehaviourEvent::Ping(ping::Event { peer, connection: _, result, }) => { if let Err(e) = result { - log::debug!("Ping to {peer} failed: {e}. Disconnecting..."); + log::debug!("ping to {peer} failed: {e}. Disconnecting..."); let _res = self.swarm.disconnect_peer_id(peer); } } // BehaviourEvent::Identify(identify::Event::Received { peer_id, info, .. }) => { + let behaviour = self.swarm.behaviour_mut(); + if info.protocol_version != PROTOCOL_VERSION || info.agent_version != AGENT_VERSION { - log::debug!("{peer_id} is not supported with `{}` protocol and `{}` agent. Disconnecting...", info.protocol_version, info.agent_version); - let _res = self.swarm.disconnect_peer_id(peer_id); + log::debug!( + "{peer_id} is not supported with `{}` protocol and `{}` agent", + info.protocol_version, + info.agent_version + ); + behaviour.peer_score.handle().unsupported_protocol(peer_id); } - let behaviour = self.swarm.behaviour_mut(); - // add listen addresses of new peers to KadDHT // according to `identify` and `kad` protocols docs for listen_addr in info.listen_addrs { @@ -345,8 +365,12 @@ impl NetworkEventLoop { } } BehaviourEvent::Identify(identify::Event::Error { peer_id, error, .. }) => { - log::debug!("{peer_id} is not identified: {error}. Disconnecting..."); - let _res = self.swarm.disconnect_peer_id(peer_id); + log::debug!("{peer_id} is not identified: {error}"); + self.swarm + .behaviour() + .peer_score + .handle() + .unsupported_protocol(peer_id); } BehaviourEvent::Identify(_) => {} // @@ -395,8 +419,12 @@ impl NetworkEventLoop { .send(NetworkReceiverEvent::Message { source, data }); } BehaviourEvent::Gossipsub(gossipsub::Event::GossipsubNotSupported { peer_id }) => { - log::debug!("`gossipsub` protocol is not supported. Disconnecting..."); - let _res = self.swarm.disconnect_peer_id(peer_id); + log::debug!("`gossipsub` protocol is not supported"); + self.swarm + .behaviour() + .peer_score + .handle() + .unsupported_protocol(peer_id); } BehaviourEvent::Gossipsub(_) => {} // @@ -445,6 +473,8 @@ pub(crate) struct Behaviour { pub custom_connection_limits: custom_connection_limits::Behaviour, // limit connections pub connection_limits: connection_limits::Behaviour, + // peer scoring system + pub peer_score: peer_score::Behaviour, // fast peer liveliness check pub ping: ping::Behaviour, // friend or foe system @@ -486,6 +516,9 @@ impl Behaviour { .with_max_established_incoming(Some(MAX_ESTABLISHED_INCOMING_CONNECTIONS)); let connection_limits = connection_limits::Behaviour::new(connection_limits); + let peer_score = peer_score::Behaviour::new(peer_score::Config::default()); + let peer_score_handle = peer_score.handle(); + let ping = ping::Behaviour::default(); let identify_config = identify::Config::new(PROTOCOL_VERSION.to_string(), keypair.public()) @@ -520,11 +553,12 @@ impl Behaviour { gossipsub.subscribe(&gpu_commitments_topic())?; - let db_sync = db_sync::Behaviour::new(db_sync::Config::default(), db); + let db_sync = db_sync::Behaviour::new(db_sync::Config::default(), peer_score_handle, db); Ok(Self { custom_connection_limits, connection_limits, + peer_score, ping, identify, mdns4, @@ -573,7 +607,6 @@ mod tests { config2.bootstrap_addresses = [multiaddr].into(); let service2 = NetworkService::new(config2.clone(), &signer2, db).unwrap(); - tokio::spawn(service2.event_loop.run()); // Wait for the connection to be established @@ -652,4 +685,46 @@ mod tests { ))) ); } + + #[tokio::test] + async fn peer_blocked_by_score() { + init_logger(); + + let tmp_dir1 = tempfile::tempdir().unwrap(); + let config = NetworkEventLoopConfig::new_memory(tmp_dir1.path().to_path_buf(), "/memory/5"); + let signer1 = ethexe_signer::Signer::new(tmp_dir1.path().join("key")).unwrap(); + let db = Database::from_one(&MemDb::default(), [0; 20]); + let mut service1 = NetworkService::new(config.clone(), &signer1, db).unwrap(); + + let peer_id = service1.event_loop.local_peer_id(); + let multiaddr: Multiaddr = format!("/memory/3/p2p/{peer_id}").parse().unwrap(); + + let peer_score_handle = service1.event_loop.score_handle(); + + tokio::spawn(service1.event_loop.run()); + + // second service + let tmp_dir2 = tempfile::tempdir().unwrap(); + let signer2 = ethexe_signer::Signer::new(tmp_dir2.path().join("key")).unwrap(); + let mut config2 = + NetworkEventLoopConfig::new_memory(tmp_dir2.path().to_path_buf(), "/memory/6"); + config2.bootstrap_addresses = [multiaddr].into(); + let db = Database::from_one(&MemDb::default(), [0; 20]); + let service2 = NetworkService::new(config2.clone(), &signer2, db).unwrap(); + + let service2_peer_id = service2.event_loop.local_peer_id(); + + tokio::spawn(service2.event_loop.run()); + + // Wait for the connection to be established + tokio::time::sleep(Duration::from_secs(1)).await; + + peer_score_handle.unsupported_protocol(service2_peer_id); + + let event = service1.receiver.recv().await; + assert_eq!( + event, + Some(NetworkReceiverEvent::PeerBlocked(service2_peer_id)) + ); + } } diff --git a/ethexe/network/src/peer_score.rs b/ethexe/network/src/peer_score.rs new file mode 100644 index 00000000000..14bea14d156 --- /dev/null +++ b/ethexe/network/src/peer_score.rs @@ -0,0 +1,340 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::export::{Multiaddr, PeerId}; +use libp2p::{ + allow_block_list, + core::{transport::PortUse, Endpoint}, + swarm::{ + ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, + }, +}; +use std::{ + collections::HashMap, + task::{Context, Poll}, +}; +use tokio::sync::mpsc; + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub(crate) enum ScoreChangedReason { + UnsupportedProtocol, + ExcessiveData, +} + +impl ScoreChangedReason { + fn to_u8(self, config: &Config) -> u8 { + match self { + ScoreChangedReason::UnsupportedProtocol => config.unsupported_protocol, + ScoreChangedReason::ExcessiveData => config.excessive_data, + } + } +} + +/// Handle to report peer actions +#[derive(Debug, Clone)] +pub struct Handle(mpsc::UnboundedSender<(PeerId, ScoreChangedReason)>); + +impl Handle { + #[cfg(test)] + pub fn new_test() -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + std::mem::forget(rx); + Self(tx) + } + + pub fn unsupported_protocol(&self, peer_id: PeerId) { + let _res = self + .0 + .send((peer_id, ScoreChangedReason::UnsupportedProtocol)); + } + + pub fn excessive_data(&self, peer_id: PeerId) { + let _res = self.0.send((peer_id, ScoreChangedReason::ExcessiveData)); + } +} + +#[derive(Debug, Eq, PartialEq)] +pub(crate) enum Event { + /// Peer got so low score it's blocked now + PeerBlocked { + /// Peer we blocked + peer_id: PeerId, + /// The last reason changed peer score + last_reason: ScoreChangedReason, + }, + /// Peer score has been changed + ScoreChanged { + /// Peer whose score has been changed + peer_id: PeerId, + /// Reason why score is changed + reason: ScoreChangedReason, + /// Current peer score + score: u8, + }, +} + +/// Behaviour config. +/// +/// All values represented by number that will be subtracted from peer score. +pub(crate) struct Config { + unsupported_protocol: u8, + excessive_data: u8, +} + +impl Default for Config { + fn default() -> Self { + Self { + unsupported_protocol: u8::MAX, + excessive_data: u8::MAX, + } + } +} + +#[cfg(test)] // used only in tests yet +impl Config { + #[allow(dead_code)] // not used anywhere yet + pub(crate) fn with_unsupported_protocol(mut self, value: u8) -> Self { + self.unsupported_protocol = value; + self + } + + pub(crate) fn with_excessive_data(mut self, value: u8) -> Self { + self.excessive_data = value; + self + } +} + +type BlockListBehaviour = allow_block_list::Behaviour; + +pub(crate) struct Behaviour { + config: Config, + block_list: BlockListBehaviour, + handle: Handle, + rx: mpsc::UnboundedReceiver<(PeerId, ScoreChangedReason)>, + score: HashMap, +} + +impl Behaviour { + pub(crate) fn new(config: Config) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + let handle = Handle(tx); + Self { + config, + block_list: BlockListBehaviour::default(), + handle, + rx, + score: HashMap::new(), + } + } + + pub(crate) fn handle(&self) -> Handle { + self.handle.clone() + } + + fn on_score_event(&mut self, peer_id: PeerId, reason: ScoreChangedReason) -> Event { + let peer_score = self.score.entry(peer_id).or_insert(u8::MAX); + *peer_score = peer_score.saturating_sub(reason.to_u8(&self.config)); + + if *peer_score == u8::MIN { + let removed = self.score.remove(&peer_id); + debug_assert!(removed.is_some()); + self.block_list.block_peer(peer_id); + + Event::PeerBlocked { + peer_id, + last_reason: reason, + } + } else { + Event::ScoreChanged { + peer_id, + reason, + score: *peer_score, + } + } + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = THandler; + type ToSwarm = Event; + + fn handle_pending_inbound_connection( + &mut self, + connection_id: ConnectionId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + self.block_list + .handle_pending_inbound_connection(connection_id, local_addr, remote_addr) + } + + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + self.block_list.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) + } + + fn handle_pending_outbound_connection( + &mut self, + connection_id: ConnectionId, + maybe_peer: Option, + addresses: &[Multiaddr], + effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + self.block_list.handle_pending_outbound_connection( + connection_id, + maybe_peer, + addresses, + effective_role, + ) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + port_use: PortUse, + ) -> Result, ConnectionDenied> { + self.block_list.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + port_use, + ) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + self.block_list.on_swarm_event(event); + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + self.block_list + .on_connection_handler_event(peer_id, connection_id, event); + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + if let Poll::Ready(to_swarm) = self.block_list.poll(cx) { + return Poll::Ready(to_swarm.map_out(|void| void::unreachable(void))); + } + + if let Poll::Ready(Some((peer_id, reason))) = self.rx.poll_recv(cx) { + return Poll::Ready(ToSwarm::GenerateEvent(self.on_score_event(peer_id, reason))); + } + + Poll::Pending + } +} + +#[cfg(test)] +mod tests { + use super::*; + use libp2p::{swarm::SwarmEvent, Swarm}; + use libp2p_swarm_test::SwarmExt; + + async fn new_swarm_with_config(config: Config) -> Swarm { + let mut swarm = Swarm::new_ephemeral(|_keypair| Behaviour::new(config)); + swarm.listen().with_memory_addr_external().await; + swarm + } + + async fn new_swarm() -> Swarm { + new_swarm_with_config(Config::default()).await + } + + #[tokio::test] + async fn peer_blocked() { + const EXCESSIVE_DATA_ABS_DIFF: u8 = u8::MAX / 3; + + let alice_config = Config::default().with_excessive_data(EXCESSIVE_DATA_ABS_DIFF); + let mut alice = new_swarm_with_config(alice_config).await; + let mut chad = new_swarm().await; + let alice_peer_id = *alice.local_peer_id(); + let chad_peer_id = *chad.local_peer_id(); + alice.connect(&mut chad).await; + + let handle = alice.behaviour_mut().handle(); + handle.excessive_data(chad_peer_id); + + let event = alice.next_behaviour_event().await; + assert_eq!( + event, + Event::ScoreChanged { + peer_id: chad_peer_id, + reason: ScoreChangedReason::ExcessiveData, + score: u8::MAX - EXCESSIVE_DATA_ABS_DIFF, + } + ); + + handle.excessive_data(chad_peer_id); + + let event = alice.next_behaviour_event().await; + assert_eq!( + event, + Event::ScoreChanged { + peer_id: chad_peer_id, + reason: ScoreChangedReason::ExcessiveData, + score: u8::MAX - 2 * EXCESSIVE_DATA_ABS_DIFF, + } + ); + + handle.excessive_data(chad_peer_id); + + let event = alice.next_behaviour_event().await; + assert_eq!( + event, + Event::PeerBlocked { + peer_id: chad_peer_id, + last_reason: ScoreChangedReason::ExcessiveData + } + ); + + let event = chad.next_swarm_event().await; + assert!( + matches!( + event, + SwarmEvent::ConnectionClosed { + peer_id, + num_established: 0, + .. + } if peer_id == alice_peer_id + ), + "{event:?}" + ); + } +}