Skip to content

Commit

Permalink
refactor!: move Endpoint::send_* to Connection
Browse files Browse the repository at this point in the history
`Endpoint::send_message[_with]` methods would open a connection if one
doesn't exist, then send a message (over a unidirectional stream). These
methods have been removed from `Endpoint`, and corresponding methods
(`send[_with]`) have been added to `Connection`.

This is another step towards having callers manage their own
connections, so that we can remove the `ConnectionPool`. For now,
callers can be updated trivially by chaining `Endpoint::connect_to`
(which will look in the pool or create a new connection, and return it)
and `Connection::send`, e.g.

    endpoint.connect_to(&addr).await?;
    endpoint.send_message(msg, &addr, 0).await?

Becomes:

    endpoint
        .connect_to(&addr)
        .await?
        .send(msg)
        .await?

BREAKING CHANGE: `Endpoint::send_message` and
`Endpoint::send_message_with` have been removed. Use
`Endpoint::connect_to` in combination with `Connection::send` or
`Connection::send_with` instead.
  • Loading branch information
Chris Connelly authored and joshuef committed Sep 24, 2021
1 parent c08b0f1 commit 0f95b8a
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 116 deletions.
8 changes: 5 additions & 3 deletions examples/p2p_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ async fn main() -> Result<()> {
.expect("Invalid SocketAddr. Use the form 127.0.0.1:1234");
let msg = Bytes::from(MSG_MARCO);
println!("Sending to {:?} --> {:?}\n", peer, msg);
node.connect_to(&peer).await?;
node.send_message(msg.clone(), &peer, 0).await?;
node.connect_to(&peer).await?.send(msg.clone()).await?;
}
}

Expand All @@ -74,7 +73,10 @@ async fn main() -> Result<()> {
println!("Received from {:?} --> {:?}", socket_addr, bytes);
if bytes == *MSG_MARCO {
let reply = Bytes::from(MSG_POLO);
node.send_message(reply.clone(), &socket_addr, 0).await?;
node.connect_to(&socket_addr)
.await?
.send(reply.clone())
.await?;
println!("Replied to {:?} --> {:?}", socket_addr, reply);
}
println!();
Expand Down
4 changes: 2 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ pub(crate) struct InternalConfig {
pub(crate) external_port: Option<u16>,
pub(crate) external_ip: Option<IpAddr>,
pub(crate) upnp_lease_duration: Duration,
pub(crate) retry_config: RetryConfig,
pub(crate) retry_config: Arc<RetryConfig>,
}

impl InternalConfig {
Expand All @@ -244,7 +244,7 @@ impl InternalConfig {
external_port: config.external_port,
external_ip: config.external_ip,
upnp_lease_duration,
retry_config: config.retry_config,
retry_config: Arc::new(config.retry_config),
})
}

Expand Down
48 changes: 43 additions & 5 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

use crate::Endpoint;

use super::{
connection_pool::{ConnId, ConnectionPool, ConnectionRemover},
error::{ConnectionError, RecvError, RpcError, SendError, SerializationError},
wire_msg::WireMsg,
};
use crate::{Endpoint, RetryConfig};
use bytes::Bytes;
use futures::{future, stream::StreamExt};
use std::{fmt::Debug, net::SocketAddr};
use std::{fmt::Debug, net::SocketAddr, sync::Arc};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::{timeout, Duration};
use tracing::{trace, warn};
Expand All @@ -34,6 +33,7 @@ use tracing::{trace, warn};
#[derive(Clone)]
pub struct Connection<I: ConnId> {
quic_conn: quinn::Connection,
default_retry_config: Arc<RetryConfig>,
remover: ConnectionRemover<I>,
}

Expand All @@ -50,8 +50,16 @@ impl DisconnectionEvents {
}

impl<I: ConnId> Connection<I> {
pub(crate) fn new(quic_conn: quinn::Connection, remover: ConnectionRemover<I>) -> Self {
Self { quic_conn, remover }
pub(crate) fn new(
quic_conn: quinn::Connection,
default_retry_config: Arc<RetryConfig>,
remover: ConnectionRemover<I>,
) -> Self {
Self {
quic_conn,
default_retry_config,
remover,
}
}

/// Get the ID under which the connection is stored in the pool.
Expand All @@ -64,6 +72,36 @@ impl<I: ConnId> Connection<I> {
self.quic_conn.remote_address()
}

/// Send a message to the peer with default configuration.
///
/// The message will be sent on a unidirectional QUIC stream, meaning the application is
/// responsible for correlating any anticipated responses from incoming streams.
///
/// The priority will be `0` and retry behaviour will be determined by the
/// [`Config`](crate::Config) that was used to construct the [`Endpoint`] this connection
/// belongs to. See [`send_message_with`](Self::send_message_with) if you want to send a message
/// with specific configuration.
pub async fn send(&self, msg: Bytes) -> Result<(), SendError> {
self.send_with(msg, 0, None).await
}

/// Send a message to the peer using the given configuration.
///
/// See [`send_message`](Self::send_message) if you want to send with the default configuration.
pub async fn send_with(
&self,
msg: Bytes,
priority: i32,
retry_config: Option<&RetryConfig>,
) -> Result<(), SendError> {
retry_config
.unwrap_or_else(|| self.default_retry_config.as_ref())
.retry(|| async { Ok(self.send_uni(msg.clone(), priority).await?) })
.await?;

Ok(())
}

/// Priority default is 0. Both lower and higher can be passed in.
pub(crate) async fn open_bi(
&self,
Expand Down
83 changes: 15 additions & 68 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use super::wire_msg::WireMsg;
use super::{
config::{Config, InternalConfig, RetryConfig},
connection_deduplicator::{ConnectionDeduplicator, DedupHandle},
connection_pool::{ConnId, ConnectionPool},
connection_pool::{ConnId, ConnectionPool, ConnectionRemover},
connections::{
listen_for_incoming_connections, listen_for_incoming_messages, Connection,
DisconnectionEvents, RecvStream, SendStream,
Expand Down Expand Up @@ -364,15 +364,15 @@ impl<I: ConnId> Endpoint<I> {
self.connection_pool
.get_by_addr(addr)
.await
.map(|(connection, remover)| Connection::new(connection, remover))
.map(|(connection, remover)| self.wrap_connection(connection, remover))
}

/// Get the existing `Connection` for the given ID.
pub async fn get_connection_by_id(&self, id: &I) -> Option<Connection<I>> {
self.connection_pool
.get_by_id(id)
.await
.map(|(connection, remover)| Connection::new(connection, remover))
.map(|(connection, remover)| self.wrap_connection(connection, remover))
}

/// Open a bi-directional stream with a given peer.
Expand Down Expand Up @@ -460,7 +460,7 @@ impl<I: ConnId> Endpoint<I> {
) -> Result<(), Option<SendError>> {
if let Some((conn, guard)) = self.connection_pool.get_by_addr(dest).await {
trace!("Connection exists in the connection pool: {}", dest);
let connection = Connection::new(conn, guard);
let connection = self.wrap_connection(conn, guard);
retries
.unwrap_or(&self.config.retry_config)
.retry(|| async { Ok(connection.send_uni(msg.clone(), priority).await?) })
Expand All @@ -471,68 +471,6 @@ impl<I: ConnId> Endpoint<I> {
}
}

/// Sends a message to a peer.
///
/// # Priority
///
/// Locally buffered data from streams with higher priority will be transmitted before data from
/// streams with lower priority. Changing the priority of a stream with pending data may only
/// take effect after that data has been transmitted. Using many different priority levels per
/// connection may have a negative impact on performance.
///
/// `0` is a sensible default for 'normal' priority.
///
/// # Connection pooling
///
/// Connections are stored in an internal pool and reused if possible. A connection remains in
/// the pool until either side closes the connection (including due to timeouts or errors). This
/// method will check the pool before opening a new connection. If a new connection is opened,
/// it will be added to the pool.
pub async fn send_message(
&self,
msg: Bytes,
dest: &SocketAddr,
priority: i32,
) -> Result<(), SendError> {
self.send_message_with(msg, dest, priority, None).await
}

/// Send a message to a peer using given retry configuration.
///
/// The given `retries`, if any, will override the [`Config::retry_config`] used to create the
/// endpoint.
///
/// # Priority
///
/// Locally buffered data from streams with higher priority will be transmitted before data from
/// streams with lower priority. Changing the priority of a stream with pending data may only
/// take effect after that data has been transmitted. Using many different priority levels per
/// connection may have a negative impact on performance.
///
/// `0` is a sensible default for 'normal' priority.
///
/// # Connection pooling
///
/// Connections are stored in an internal pool and reused if possible. A connection remains in
/// the pool until either side closes the connection (including due to timeouts or errors). This
/// method will check the pool before opening a new connection. If a new connection is opened,
/// it will be added to the pool.
pub async fn send_message_with(
&self,
msg: Bytes,
dest: &SocketAddr,
priority: i32,
retries: Option<&RetryConfig>,
) -> Result<(), SendError> {
let connection = self.get_or_connect_to(dest).await?;
retries
.unwrap_or(&self.config.retry_config)
.retry(|| async { Ok(connection.send_uni(msg.clone(), priority).await?) })
.await?;

Ok(())
}

/// Close all the connections of this endpoint immediately and stop accepting new connections.
pub fn close(&self) {
let _ = self.termination_tx.send(());
Expand All @@ -547,7 +485,7 @@ impl<I: ConnId> Endpoint<I> {
let completion = loop {
if let Some((conn, remover)) = self.connection_pool.get_by_addr(addr).await {
trace!("We are already connected to this peer: {}", addr);
return Ok(Connection::new(conn, remover));
return Ok(self.wrap_connection(conn, remover));
}

// Check if a connect attempt to this address is already in progress.
Expand Down Expand Up @@ -579,7 +517,7 @@ impl<I: ConnId> Endpoint<I> {
);

let _ = completion.complete(Ok(()));
Ok(Connection::new(connection, remover))
Ok(self.wrap_connection(connection, remover))
}
Err(error) => {
let _ = completion.complete(Err(error.clone()));
Expand Down Expand Up @@ -731,6 +669,15 @@ impl<I: ConnId> Endpoint<I> {
msg => Err(RecvError::Serialization(SerializationError::unexpected(msg)).into()),
}
}

/// Wrap a quinn connection, setting the default retry config and pool remover.
fn wrap_connection(
&self,
connection: quinn::Connection,
remover: ConnectionRemover<I>,
) -> Connection<I> {
Connection::new(connection, self.config.retry_config.clone(), remover)
}
}

// a private helper struct for passing a bunch of channel-related things
Expand Down
Loading

0 comments on commit 0f95b8a

Please sign in to comment.