Skip to content

Commit

Permalink
deps: drop async-trait
Browse files Browse the repository at this point in the history
Signed-off-by: ljedrz <[email protected]>
  • Loading branch information
ljedrz committed Dec 29, 2023
1 parent 9a24e9f commit 5b0377d
Show file tree
Hide file tree
Showing 22 changed files with 233 additions and 253 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ documentation = "https://docs.rs/pea2pea"
readme = "README.md"
categories = ["network-programming", "asynchronous"]
keywords = ["p2p", "peer-to-peer", "networking"]
rust-version = "1.70"
rust-version = "1.75"

[badges]
maintenance = { status = "actively-developed" }
Expand All @@ -22,7 +22,6 @@ crate-type = ["lib"]
test = []

[dependencies]
async-trait = "0.1"
bytes = "1"
futures-util = { version = "0.3", features = ["sink"] }
parking_lot = "0.12"
Expand Down
1 change: 0 additions & 1 deletion examples/dining_philosophers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ impl Encoder<Message> for Codec {
}
}

#[async_trait::async_trait]
impl Reading for Philosopher {
type Message = Message;
type Codec = Codec;
Expand Down
2 changes: 0 additions & 2 deletions examples/fixed_length_crusaders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ impl Pea2Pea for JoJoNode {
}
}

#[async_trait::async_trait]
impl Handshake for JoJoNode {
const TIMEOUT_MS: u64 = 10_000;

Expand Down Expand Up @@ -87,7 +86,6 @@ impl Encoder<BattleCry> for SingleByteCodec {
}
}

#[async_trait::async_trait]
impl Reading for JoJoNode {
type Message = BattleCry;
type Codec = SingleByteCodec;
Expand Down
3 changes: 0 additions & 3 deletions examples/hapsburgs_plan_b.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ impl Pea2Pea for NakedNode {
}
}

#[async_trait::async_trait]
impl Handshake for NakedNode {
async fn perform_handshake(&self, conn: Connection) -> io::Result<Connection> {
if self.node().name() == "Drebin" {
Expand All @@ -45,7 +44,6 @@ impl Handshake for NakedNode {
}
}

#[async_trait::async_trait]
impl Reading for NakedNode {
type Message = String;
type Codec = common::TestCodec<Self::Message>;
Expand Down Expand Up @@ -84,7 +82,6 @@ impl Writing for NakedNode {
}
}

#[async_trait::async_trait]
impl OnDisconnect for NakedNode {
async fn on_disconnect(&self, _addr: SocketAddr) {
if self.node().name() == "Drebin" {
Expand Down
2 changes: 0 additions & 2 deletions examples/hot_potato_game.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ impl Pea2Pea for Player {
}
}

#[async_trait::async_trait]
impl Handshake for Player {
async fn perform_handshake(&self, mut conn: Connection) -> io::Result<Connection> {
let mut buffer = [0u8; 16];
Expand Down Expand Up @@ -144,7 +143,6 @@ impl Decoder for common::TestCodec<Message> {
}
}

#[async_trait::async_trait]
impl Reading for Player {
type Message = Message;
type Codec = common::TestCodec<Self::Message>;
Expand Down
3 changes: 0 additions & 3 deletions examples/libp2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ pub struct NoiseHandshakePayload {
pub data: ::prost::alloc::vec::Vec<u8>,
}

#[async_trait::async_trait]
impl Handshake for Libp2pNode {
const TIMEOUT_MS: u64 = 5_000;

Expand Down Expand Up @@ -344,7 +343,6 @@ macro_rules! get_streams_mut {
};
}

#[async_trait::async_trait]
impl Reading for Libp2pNode {
type Message = yamux::Frame;
type Codec = Codec;
Expand Down Expand Up @@ -466,7 +464,6 @@ impl Writing for Libp2pNode {
}
}

#[async_trait::async_trait]
impl OnDisconnect for Libp2pNode {
async fn on_disconnect(&self, addr: SocketAddr) {
self.peer_states.write().remove(&addr);
Expand Down
2 changes: 0 additions & 2 deletions examples/noise_handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ impl SecureNode {
}
}

#[async_trait::async_trait]
impl Handshake for SecureNode {
async fn perform_handshake(&self, mut conn: Connection) -> io::Result<Connection> {
// create the noise objects
Expand All @@ -64,7 +63,6 @@ impl Handshake for SecureNode {
}
}

#[async_trait::async_trait]
impl Reading for SecureNode {
type Message = BytesMut;
type Codec = noise::Codec;
Expand Down
1 change: 0 additions & 1 deletion examples/rate_limiting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ impl Pea2Pea for GenericNode {
}
}

#[async_trait::async_trait]
impl Reading for GenericNode {
type Message = BytesMut;
type Codec = common::TestCodec<Self::Message>;
Expand Down
1 change: 0 additions & 1 deletion examples/telephone_game.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ impl Pea2Pea for Player {

const NUM_PLAYERS: usize = 100;

#[async_trait::async_trait]
impl Reading for Player {
type Message = String;
type Codec = common::TestCodec<Self::Message>;
Expand Down
2 changes: 0 additions & 2 deletions examples/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ impl Pea2Pea for TlsNode {
}
}

#[async_trait::async_trait]
impl Handshake for TlsNode {
async fn perform_handshake(&self, mut conn: Connection) -> io::Result<Connection> {
let node_conn_side = !conn.side();
Expand All @@ -93,7 +92,6 @@ impl Handshake for TlsNode {
}
}

#[async_trait::async_trait]
impl Reading for TlsNode {
type Message = BytesMut;
type Codec = BytesCodec;
Expand Down
122 changes: 63 additions & 59 deletions src/protocols/handshake.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{io, time::Duration};
use std::{future::Future, io, time::Duration};

use tokio::{
io::{split, AsyncRead, AsyncWrite},
Expand All @@ -17,7 +17,6 @@ use crate::{
/// Can be used to specify and enable network handshakes. Upon establishing a connection, both sides will
/// need to adhere to the specified handshake rules in order to finalize the connection and be able to send
/// or receive any messages.
#[async_trait::async_trait]
pub trait Handshake: Pea2Pea
where
Self: Clone + Send + Sync + 'static,
Expand All @@ -28,74 +27,79 @@ where
const TIMEOUT_MS: u64 = 3_000;

/// Prepares the node to perform specified network handshakes.
async fn enable_handshake(&self) {
let (from_node_sender, mut from_node_receiver) =
mpsc::unbounded_channel::<ReturnableConnection>();
fn enable_handshake(&self) -> impl Future<Output = ()> + Send {
async {
let (from_node_sender, mut from_node_receiver) =
mpsc::unbounded_channel::<ReturnableConnection>();

// use a channel to know when the handshake task is ready
let (tx, rx) = oneshot::channel();
// use a channel to know when the handshake task is ready
let (tx, rx) = oneshot::channel();

// spawn a background task dedicated to handling the handshakes
let self_clone = self.clone();
let handshake_task = tokio::spawn(async move {
trace!(parent: self_clone.node().span(), "spawned the Handshake handler task");
if tx.send(()).is_err() {
error!(parent: self_clone.node().span(), "Handshake handler creation interrupted! shutting down the node");
self_clone.node().shut_down().await;
return;
}
// spawn a background task dedicated to handling the handshakes
let self_clone = self.clone();
let handshake_task = tokio::spawn(async move {
trace!(parent: self_clone.node().span(), "spawned the Handshake handler task");
if tx.send(()).is_err() {
error!(parent: self_clone.node().span(), "Handshake handler creation interrupted! shutting down the node");
self_clone.node().shut_down().await;
return;
}

while let Some((conn, result_sender)) = from_node_receiver.recv().await {
let addr = conn.addr();
while let Some((conn, result_sender)) = from_node_receiver.recv().await {
let addr = conn.addr();

let node = self_clone.clone();
tokio::spawn(async move {
debug!(parent: node.node().span(), "shaking hands with {} as the {:?}", addr, !conn.side());
let result = timeout(
Duration::from_millis(Self::TIMEOUT_MS),
node.perform_handshake(conn),
)
.await;
let node = self_clone.clone();
tokio::spawn(async move {
debug!(parent: node.node().span(), "shaking hands with {} as the {:?}", addr, !conn.side());
let result = timeout(
Duration::from_millis(Self::TIMEOUT_MS),
node.perform_handshake(conn),
)
.await;

let ret = match result {
Ok(Ok(conn)) => {
debug!(parent: node.node().span(), "successfully handshaken with {}", addr);
Ok(conn)
}
Ok(Err(e)) => {
error!(parent: node.node().span(), "handshake with {} failed: {}", addr, e);
Err(e)
}
Err(_) => {
error!(parent: node.node().span(), "handshake with {} timed out", addr);
Err(io::ErrorKind::TimedOut.into())
}
};
let ret = match result {
Ok(Ok(conn)) => {
debug!(parent: node.node().span(), "successfully handshaken with {}", addr);
Ok(conn)
}
Ok(Err(e)) => {
error!(parent: node.node().span(), "handshake with {} failed: {}", addr, e);
Err(e)
}
Err(_) => {
error!(parent: node.node().span(), "handshake with {} timed out", addr);
Err(io::ErrorKind::TimedOut.into())
}
};

// return the Connection to the Node, resuming Node::adapt_stream
if result_sender.send(ret).is_err() {
error!(parent: node.node().span(), "couldn't return a Connection with {} from the Handshake handler", addr);
}
});
}
});
let _ = rx.await;
self.node()
.tasks
.lock()
.insert(NodeTask::Handshake, handshake_task);
// return the Connection to the Node, resuming Node::adapt_stream
if result_sender.send(ret).is_err() {
error!(parent: node.node().span(), "couldn't return a Connection with {} from the Handshake handler", addr);
}
});
}
});
let _ = rx.await;
self.node()
.tasks
.lock()
.insert(NodeTask::Handshake, handshake_task);

// register the Handshake handler with the Node
let hdl = ProtocolHandler(from_node_sender);
assert!(
self.node().protocols.handshake.set(hdl).is_ok(),
"the Handshake protocol was enabled more than once!"
);
// register the Handshake handler with the Node
let hdl = ProtocolHandler(from_node_sender);
assert!(
self.node().protocols.handshake.set(hdl).is_ok(),
"the Handshake protocol was enabled more than once!"
);
}
}

/// Performs the handshake; temporarily assumes control of the [`Connection`] and returns it if the handshake is
/// successful.
async fn perform_handshake(&self, conn: Connection) -> io::Result<Connection>;
fn perform_handshake(
&self,
conn: Connection,
) -> impl Future<Output = io::Result<Connection>> + Send;

/// Borrows the full connection stream to be used in the implementation of [`Handshake::perform_handshake`].
fn borrow_stream<'a>(&self, conn: &'a mut Connection) -> &'a mut TcpStream {
Expand Down
Loading

0 comments on commit 5b0377d

Please sign in to comment.