Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: synchronize ws ping/pong messages #1437

Closed
wants to merge 16 commits into from
8 changes: 6 additions & 2 deletions examples/examples/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
use std::net::SocketAddr;

use jsonrpsee::core::client::ClientT;
use jsonrpsee::server::{RpcServiceBuilder, Server};
use jsonrpsee::server::{PingConfig, RpcServiceBuilder, Server};
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::{rpc_params, RpcModule};
use tracing_subscriber::util::SubscriberInitExt;
Expand All @@ -51,7 +51,11 @@ async fn main() -> anyhow::Result<()> {

async fn run_server() -> anyhow::Result<SocketAddr> {
let rpc_middleware = RpcServiceBuilder::new().rpc_logger(1024);
let server = Server::builder().set_rpc_middleware(rpc_middleware).build("127.0.0.1:0").await?;
let server = Server::builder()
.enable_ws_ping(PingConfig::new())
.set_rpc_middleware(rpc_middleware)
.build("127.0.0.1:0")
.await?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _, _| "lo")?;
let addr = server.local_addr()?;
Expand Down
178 changes: 119 additions & 59 deletions server/src/transport/ws.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};

use crate::future::{IntervalStream, SessionClose};
use crate::middleware::rpc::{RpcService, RpcServiceBuilder, RpcServiceCfg, RpcServiceT};
use crate::server::{handle_rpc_call, ConnectionState, ServerConfig};
use crate::utils::PendingPings;
use crate::{HttpBody, HttpRequest, HttpResponse, PingConfig, LOG_TARGET};

use futures_util::future::{self, Either};
use futures_util::future::{self, Either, Fuse};
use futures_util::io::{BufReader, BufWriter};
use futures_util::{Future, StreamExt, TryStreamExt};
use futures_util::{Future, FutureExt, StreamExt, TryStreamExt};
use hyper::upgrade::Upgraded;
use hyper_util::rt::TokioIo;
use jsonrpsee_core::server::{BoundedSubscriptions, MethodSink, Methods};
Expand All @@ -18,7 +19,7 @@ use soketto::connection::Error as SokettoError;
use soketto::data::ByteSlice125;

use tokio::sync::{mpsc, oneshot};
use tokio::time::{interval, interval_at};
use tokio::time::interval;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};

Expand All @@ -38,7 +39,6 @@ pub(crate) async fn send_message(sender: &mut Sender, response: String) -> Resul
}

pub(crate) async fn send_ping(sender: &mut Sender) -> Result<(), SokettoError> {
tracing::debug!(target: LOG_TARGET, "Send ping");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this; it's quite useless, and it's possible to get it by enabling soketto logs.

// Submit empty slice as "optional" parameter.
let slice: &[u8] = &[];
// Byte slice fails if the provided slice is larger than 125 bytes.
Expand Down Expand Up @@ -76,17 +76,28 @@ where
mut on_session_close,
extensions,
} = params;

let conn_id = conn.conn_id;
let ServerConfig { ping_config, batch_requests_config, max_request_body_size, max_response_body_size, .. } =
server_cfg;

let (conn_tx, conn_rx) = oneshot::channel();

// Spawn ping/pong task if ping config is provided.
let ping_config = if let Some(ping_config) = ping_config {
let (ping_tx, ping_rx) = mpsc::channel::<KeepAlive>(4);
tokio::spawn(ping_pong_task(ping_rx, ping_config.inactive_limit, ping_config.max_failures, conn_id));
Some((ping_config, ping_tx))
} else {
None
};

let ping_tx = ping_config.as_ref().map(|(_, tx)| tx.clone());

// Spawn another task that sends out the responses on the Websocket.
let send_task_handle = tokio::spawn(send_task(rx, ws_sender, ping_config, conn_rx));

let stopped = conn.stop_handle.clone().shutdown();
let rpc_service = Arc::new(rpc_service);
let mut missed_pings = 0;

tokio::pin!(stopped);

Expand All @@ -106,8 +117,9 @@ where
tokio::pin!(ws_stream);

let result = loop {
let data = match try_recv(&mut ws_stream, stopped, ping_config, &mut missed_pings).await {
let data = match try_recv(&mut ws_stream, stopped, ping_tx.as_ref()).await {
Receive::ConnectionClosed => break Ok(Shutdown::ConnectionClosed),
Receive::KeepAliveExpired => break Ok(Shutdown::KeepAliveExpired),
Receive::Stopped => break Ok(Shutdown::Stopped),
Receive::Ok(data, stop) => {
stopped = stop;
Expand All @@ -134,7 +146,6 @@ where
continue;
}
err => {
tracing::debug!(target: LOG_TARGET, "WS error: {}; terminate connection: {}", err, conn.conn_id);
break Err(err);
}
};
Expand Down Expand Up @@ -186,6 +197,8 @@ where
});
};

tracing::debug!(target: LOG_TARGET, "Connection closed for conn_id={conn_id}, reason={:?}", result);

// Drive all running methods to completion.
// **NOTE** Do not return early in this function. This `await` needs to run to guarantee
// proper drop behaviour.
Expand All @@ -203,23 +216,23 @@ where
async fn send_task(
rx: mpsc::Receiver<String>,
mut ws_sender: Sender,
ping_config: Option<PingConfig>,
ping_config: Option<(PingConfig, mpsc::Sender<KeepAlive>)>,
stop: oneshot::Receiver<()>,
) {
let ping_interval = match ping_config {
None => IntervalStream::pending(),
// NOTE: we emit a tick here immediately to sync
// with how the receive task works, because it starts measuring the pong
// when it starts up.
Some(p) => IntervalStream::new(interval(p.ping_interval)),
// Ping task is only spawned if ping config is provided.
let ping = match ping_config {
None => Either::Left(IntervalStream::pending().map(|_| None)),
Some((p, ping_tx)) => {
Either::Right(IntervalStream::new(interval(p.ping_interval)).map(move |_| Some(ping_tx.clone())))
}
};
let rx = ReceiverStream::new(rx);

tokio::pin!(ping_interval, rx, stop);
tokio::pin!(ping, rx, stop);

// Received messages from the WebSocket.
let mut rx_item = rx.next();
let next_ping = ping_interval.next();
let next_ping = ping.next();
let mut futs = future::select(next_ping, stop);

loop {
Expand All @@ -244,16 +257,29 @@ async fn send_task(
}

// Handle timer intervals.
Either::Right((Either::Left((_instant, _stopped)), next_rx)) => {
Either::Right((Either::Left((Some(ping_tx), _stopped)), next_rx)) => {
stop = _stopped;
if let Err(err) = send_ping(&mut ws_sender).await {
tracing::debug!(target: LOG_TARGET, "WS send ping error: {}", err);
break;
}

rx_item = next_rx;
futs = future::select(ping_interval.next(), stop);

let ping_tx = ping_tx.expect("ping tx is only `None` if ping_config is `None` checked above; qed");
tokio::spawn(async move {
ping_tx.send(KeepAlive::Ping(Instant::now())).await.ok();
});
Comment on lines +270 to +272
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no ping_rx (ie because no ping config, so it's dropped ie on https://github.dev/paritytech/jsonrpsee/blob/705148f0c05dca1bf945e2387a866561a58beac5/server/src/transport/ws.rs#L92), I guess we are just doing this work for no gain. Is it worth putting an if around the task spawning to avoid in this case or does it over complicate things?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, I can make it an option instead.


futs = future::select(ping.next(), stop);
}

// The interval stream has been closed.
// This should be unreachable because the interval stream never ends.
Either::Right((Either::Left((None, _stopped)), _)) => {
break;
}

Either::Right((Either::Right((_stopped, _)), _)) => {
// server has stopped
break;
Expand All @@ -268,68 +294,101 @@ async fn send_task(

enum Receive<S> {
ConnectionClosed,
KeepAliveExpired,
Stopped,
Err(SokettoError, S),
Ok(Vec<u8>, S),
}

/// Attempts to read data from WebSocket fails if the server was stopped.
async fn try_recv<T, S>(
ws_stream: &mut T,
mut stopped: S,
ping_config: Option<PingConfig>,
missed_pings: &mut usize,
) -> Receive<S>
async fn try_recv<T, S>(ws_stream: &mut T, stopped: S, ping_tx: Option<&mpsc::Sender<KeepAlive>>) -> Receive<S>
where
S: Future<Output = ()> + Unpin,
T: StreamExt<Item = Result<Incoming, SokettoError>> + Unpin,
{
let mut last_active = Instant::now();
let inactivity_check = match ping_config {
Some(p) => IntervalStream::new(interval_at(tokio::time::Instant::now() + p.ping_interval, p.ping_interval)),
None => IntervalStream::pending(),
let mut futs = future::select(ws_stream.next(), stopped);
let closed = match ping_tx {
Some(ping_tx) => ping_tx.closed().fuse(),
None => Fuse::terminated(),
};

tokio::pin!(inactivity_check);

let mut futs = futures_util::future::select(ws_stream.next(), inactivity_check.next());
tokio::pin!(closed);

loop {
match futures_util::future::select(futs, stopped).await {
match future::select(futs, closed).await {
// The connection is closed.
Either::Left((Either::Left((None, _)), _)) => break Receive::ConnectionClosed,
// The message has been received, we are done
Either::Left((Either::Left((Some(Ok(Incoming::Data(d))), _)), s)) => break Receive::Ok(d, s),
// Got a pong response, update our "last seen" timestamp.
Either::Left((Either::Left((Some(Ok(Incoming::Pong)), inactive)), s)) => {
last_active = Instant::now();
stopped = s;
futs = futures_util::future::select(ws_stream.next(), inactive);
Either::Left((Either::Left((Some(Ok(Incoming::Data(d))), s)), _)) => {
if let Some(ping_tx) = ping_tx {
let ping_tx = ping_tx.clone();
tokio::spawn(async move {
_ = ping_tx.send(KeepAlive::Data(Instant::now())).await;
});
}

break Receive::Ok(d, s);
}
// Got a pong response send status to the ping_pong_task.
Either::Left((Either::Left((Some(Ok(Incoming::Pong)), s)), c)) => {
if let Some(ping_tx) = ping_tx {
let ping_tx = ping_tx.clone();
tokio::spawn(async move {
_ = ping_tx.send(KeepAlive::Pong(Instant::now())).await;
});
}
futs = futures_util::future::select(ws_stream.next(), s);
closed = c;
}
// Received an error, terminate the connection.
Either::Left((Either::Left((Some(Err(e)), _)), s)) => break Receive::Err(e, s),
// Max inactivity timeout fired, check if the connection has been idle too long.
Either::Left((Either::Right((_instant, rcv)), s)) => {
if let Some(p) = ping_config {
if last_active.elapsed() > p.inactive_limit {
*missed_pings += 1;

if *missed_pings >= p.max_failures {
tracing::debug!(
target: LOG_TARGET,
"WS ping/pong inactivity limit `{}` exceeded; closing connection",
p.max_failures,
);
break Receive::ConnectionClosed;
Either::Left((Either::Left((Some(Err(e)), s)), _)) => break Receive::Err(e, s),

// Server has been stopped or closed by inactive peer.
Either::Left((Either::Right((_, _)), _)) => break Receive::Stopped,
// Ping task has been stopped.
Either::Right((_, _)) => break Receive::KeepAliveExpired,
}
}
}

/// Keep-alive events forwarded over a channel to the `ping_pong_task`,
/// each carrying the `Instant` at which the event occurred.
#[derive(Debug, Copy, Clone)]
pub(crate) enum KeepAlive {
	/// A ping was sent to the peer; the timestamp is recorded as a pending
	/// ping awaiting a response.
	Ping(Instant),
	/// A data message was received from the peer; treated as evidence the
	/// connection is alive (handled the same way as a pong).
	Data(Instant),
	/// A pong was received from the peer in response to a ping.
	Pong(Instant),
}

async fn ping_pong_task(
mut rx: mpsc::Receiver<KeepAlive>,
max_inactivity_dur: Duration,
max_missed_pings: usize,
conn_id: u32,
) {
let mut polling_interval = IntervalStream::new(interval(max_inactivity_dur));
let mut pending_pings = PendingPings::new(max_missed_pings, max_inactivity_dur, conn_id);

loop {
tokio::select! {
// If the ping is never answered, we use this timer as a fallback.
_ = polling_interval.next() => {
if !pending_pings.check_alive() {
break;
}
}
// Data on the connection.
msg = rx.recv() => {
match msg {
Some(KeepAlive::Ping(start)) => {
pending_pings.push(start);
}
Some(KeepAlive::Pong(end)) | Some(KeepAlive::Data(end)) => {
if !pending_pings.alive_response(end) {
break;
}
}
None => break,
}

stopped = s;
futs = futures_util::future::select(rcv, inactivity_check.next());
}
// Server has been stopped.
Either::Right(_) => break Receive::Stopped,
}
}
}
Expand All @@ -338,6 +397,7 @@ where
/// Reason why the WebSocket connection loop terminated.
pub(crate) enum Shutdown {
	/// The server itself was stopped.
	Stopped,
	/// The connection was closed (by the peer or due to a transport error).
	ConnectionClosed,
	/// The keep-alive (ping/pong) task ended, i.e. the peer failed to
	/// respond within the configured inactivity limits.
	KeepAliveExpired,
}

/// Enforce a graceful shutdown.
Expand Down
Loading
Loading