Skip to content

Commit

Permalink
refactor(rpc_module): RpcModule::raw_json_request -> String (#1287)
Browse files Browse the repository at this point in the history
* change RpcModule::<call, subscribe, raw> -> String

This PR changes the RpcModule to support the new async API in the
MethodResponse and RpcModule::<call, subscribe, raw> now returns a
String instead of MethodResponse.

* refactor async response payload test
  • Loading branch information
niklasad1 authored Feb 7, 2024
1 parent 8470f2b commit e1e7d73
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 103 deletions.
28 changes: 18 additions & 10 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub type MaxResponseSize = usize;
/// A tuple containing:
/// - Call result as a `String`,
/// - a [`mpsc::UnboundedReceiver<String>`] to receive future subscription results
pub type RawRpcResponse = (MethodResponse, mpsc::Receiver<String>);
pub type RawRpcResponse = (String, mpsc::Receiver<String>);

/// The error that can occur when [`Methods::call`] or [`Methods::subscribe`] is invoked.
#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -290,8 +290,9 @@ impl Methods {
let params = params.to_rpc_params()?;
let req = Request::new(method.into(), params.as_ref().map(|p| p.as_ref()), Id::Number(0));
tracing::trace!(target: LOG_TARGET, "[Methods::call] Method: {:?}, params: {:?}", method, params);
let (resp, _) = self.inner_call(req, 1, mock_subscription_permit()).await;
let rp = serde_json::from_str::<Response<T>>(resp.as_result())?;
let (rp, _) = self.inner_call(req, 1, mock_subscription_permit()).await;

let rp = serde_json::from_str::<Response<T>>(&rp)?;
ResponseSuccess::try_from(rp).map(|s| s.result).map_err(|e| MethodsError::JsonRpc(e.into_owned()))
}

Expand All @@ -318,7 +319,8 @@ impl Methods {
/// Ok(())
/// }).unwrap();
/// let (resp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hi","id":0}"#, 1).await.unwrap();
/// let resp: Success<u64> = serde_json::from_str::<Response<u64>>(&resp.as_result()).unwrap().try_into().unwrap();
/// // If the response is an error converting it to `Success` will fail.
/// let resp: Success<u64> = serde_json::from_str::<Response<u64>>(&resp).unwrap().try_into().unwrap();
/// let sub_resp = stream.recv().await.unwrap();
/// assert_eq!(
/// format!(r#"{{"jsonrpc":"2.0","method":"hi","params":{{"subscription":{},"result":"one answer"}}}}"#, resp.result),
Expand All @@ -330,7 +332,7 @@ impl Methods {
&self,
request: &str,
buf_size: usize,
) -> Result<(MethodResponse, mpsc::Receiver<String>), serde_json::Error> {
) -> Result<(String, mpsc::Receiver<String>), serde_json::Error> {
tracing::trace!("[Methods::raw_json_request] Request: {:?}", request);
let req: Request = serde_json::from_str(request)?;
let (resp, rx) = self.inner_call(req, buf_size, mock_subscription_permit()).await;
Expand Down Expand Up @@ -369,9 +371,16 @@ impl Methods {
Some(MethodCallback::Unsubscription(cb)) => (cb)(id, params, 0, usize::MAX),
};

tracing::trace!(target: LOG_TARGET, "[Methods::inner_call] Method: {}, response: {:?}", req.method, response);
let is_success = response.is_success();
let (rp, notif) = response.into_parts();

if let Some(n) = notif {
n.notify(is_success);
}

tracing::trace!(target: LOG_TARGET, "[Methods::inner_call] Method: {}, response: {}", req.method, rp);

(response, rx)
(rp, rx)
}

/// Helper to create a subscription on the `RPC module` without having to spin up a server.
Expand Down Expand Up @@ -426,10 +435,9 @@ impl Methods {
let (resp, rx) = self.inner_call(req, buf_size, mock_subscription_permit()).await;

// TODO: hack around the lifetime on the `SubscriptionId` by deserialize first to serde_json::Value.
let as_success: ResponseSuccess<serde_json::Value> =
serde_json::from_str::<Response<_>>(resp.as_result())?.try_into()?;
let as_success: ResponseSuccess<serde_json::Value> = serde_json::from_str::<Response<_>>(&resp)?.try_into()?;

let sub_id = as_success.result.try_into().map_err(|_| MethodsError::InvalidSubscriptionId(resp.to_result()))?;
let sub_id = as_success.result.try_into().map_err(|_| MethodsError::InvalidSubscriptionId(resp.clone()))?;

Ok(Subscription { sub_id, rx })
}
Expand Down
60 changes: 3 additions & 57 deletions tests/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ use jsonrpsee::server::middleware::http::ProxyGetRequestLayer;
use jsonrpsee::server::{
PendingSubscriptionSink, RpcModule, Server, ServerBuilder, ServerHandle, SubscriptionMessage, TrySendError,
};
use jsonrpsee::types::{ErrorCode, ErrorObject, ErrorObjectOwned};
use jsonrpsee::{MethodResponseError, ResponsePayload, SubscriptionCloseResponse};
use serde::{Deserialize, Serialize};
use jsonrpsee::types::{ErrorObject, ErrorObjectOwned};
use jsonrpsee::SubscriptionCloseResponse;
use serde::Serialize;
use tokio::net::TcpStream;
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
use tower_http::cors::CorsLayer;
Expand Down Expand Up @@ -288,56 +287,3 @@ pub async fn connect_over_socks_stream(server_addr: SocketAddr) -> Socks5Stream<
.await
.unwrap()
}

#[derive(Copy, Clone, Deserialize, Serialize)]
pub enum Notify {
Success,
Error,
All,
}

pub type NotifyRpcModule = RpcModule<UnboundedSender<Result<(), MethodResponseError>>>;
pub type Sender = tokio::sync::mpsc::UnboundedSender<Result<(), MethodResponseError>>;
pub type Receiver = tokio::sync::mpsc::UnboundedReceiver<Result<(), MethodResponseError>>;

pub async fn run_test_notify_test(
module: &NotifyRpcModule,
server_rx: &mut Receiver,
is_success: bool,
kind: Notify,
) -> Result<(), MethodResponseError> {
use jsonrpsee_test_utils::mocks::Id;

let req = jsonrpsee_test_utils::helpers::call("hey", vec![kind], Id::Num(1));
let (rp, _) = module.raw_json_request(&req, 1).await.unwrap();
let (_, notify_rx) = rp.into_parts();
notify_rx.unwrap().notify(is_success);
server_rx.recv().await.expect("Channel is not dropped")
}

/// Helper module that will send the results on the channel passed in.
pub fn rpc_module_notify_on_response(tx: Sender) -> NotifyRpcModule {
let mut module = RpcModule::new(tx);

module
.register_method("hey", |params, ctx| {
let kind: Notify = params.one().unwrap();
let server_sender = ctx.clone();

let (rp, rp_future) = match kind {
Notify::All => ResponsePayload::success("lo").notify_on_completion(),
Notify::Success => ResponsePayload::success("lo").notify_on_completion(),
Notify::Error => ResponsePayload::error(ErrorCode::InvalidParams).notify_on_completion(),
};

tokio::spawn(async move {
let rp = rp_future.await;
server_sender.send(rp).unwrap();
});

rp
})
.unwrap();

module
}
6 changes: 3 additions & 3 deletions tests/tests/proc_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ async fn macro_optional_param_parsing() {
.raw_json_request(r#"{"jsonrpc":"2.0","method":"foo_optional_params","params":{"a":22,"c":50},"id":0}"#, 1)
.await
.unwrap();
assert_eq!(resp.into_result(), r#"{"jsonrpc":"2.0","result":"Called with: 22, None, Some(50)","id":0}"#);
assert_eq!(resp, r#"{"jsonrpc":"2.0","result":"Called with: 22, None, Some(50)","id":0}"#);
}

#[tokio::test]
Expand All @@ -290,14 +290,14 @@ async fn macro_zero_copy_cow() {
.unwrap();

// std::borrow::Cow<str> always deserialized to owned variant here
assert_eq!(resp.into_result(), r#"{"jsonrpc":"2.0","result":"Zero copy params: false, true","id":0}"#);
assert_eq!(resp, r#"{"jsonrpc":"2.0","result":"Zero copy params: false, true","id":0}"#);

// serde_json will have to allocate a new string to replace `\t` with byte 0x09 (tab)
let (resp, _) = module
.raw_json_request(r#"{"jsonrpc":"2.0","method":"foo_zero_copy_cow","params":["\tfoo", "\tbar"],"id":0}"#, 1)
.await
.unwrap();
assert_eq!(resp.into_result(), r#"{"jsonrpc":"2.0","result":"Zero copy params: false, false","id":0}"#);
assert_eq!(resp, r#"{"jsonrpc":"2.0","result":"Zero copy params: false, false","id":0}"#);
}

// Disabled on MacOS as GH CI timings on Mac vary wildly (~100ms) making this test fail.
Expand Down
75 changes: 42 additions & 33 deletions tests/tests/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,11 @@ use jsonrpsee::core::{server::*, RpcResult};
use jsonrpsee::types::error::{ErrorCode, ErrorObject, INVALID_PARAMS_MSG, PARSE_ERROR_CODE};
use jsonrpsee::types::{ErrorObjectOwned, Params, Response, ResponsePayload};
use jsonrpsee::SubscriptionMessage;
use jsonrpsee_test_utils::mocks::Id;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;

use crate::helpers::{rpc_module_notify_on_response, run_test_notify_test, Notify};

// Helper macro to assert that a binding is of a specific type.
macro_rules! assert_type {
( $ty:ty, $expected:expr $(,)?) => {{
Expand Down Expand Up @@ -387,13 +384,13 @@ async fn subscribe_unsubscribe_without_server() {
let unsub_req = format!("{{\"jsonrpc\":\"2.0\",\"method\":\"my_unsub\",\"params\":[{}],\"id\":1}}", ser_id);
let (resp, _) = module.raw_json_request(&unsub_req, 1).await.unwrap();

assert_eq!(resp.into_result(), r#"{"jsonrpc":"2.0","result":true,"id":1}"#);
assert_eq!(resp, r#"{"jsonrpc":"2.0","result":true,"id":1}"#);

// Unsubscribe already performed; should be error.
let unsub_req = format!("{{\"jsonrpc\":\"2.0\",\"method\":\"my_unsub\",\"params\":[{}],\"id\":1}}", ser_id);
let (resp, _) = module.raw_json_request(&unsub_req, 2).await.unwrap();

assert_eq!(resp.into_result(), r#"{"jsonrpc":"2.0","result":false,"id":1}"#);
assert_eq!(resp, r#"{"jsonrpc":"2.0","result":false,"id":1}"#);
}

let sub1 = subscribe_and_assert(&module);
Expand Down Expand Up @@ -433,7 +430,7 @@ async fn reject_works() {
.unwrap();

let (rp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"my_sub","id":0}"#, 1).await.unwrap();
assert_eq!(rp.into_result(), r#"{"jsonrpc":"2.0","error":{"code":-32700,"message":"rejected"},"id":0}"#);
assert_eq!(rp, r#"{"jsonrpc":"2.0","error":{"code":-32700,"message":"rejected"},"id":0}"#);
assert!(stream.recv().await.is_none());
}

Expand Down Expand Up @@ -524,7 +521,7 @@ async fn serialize_sub_error_adds_extra_string_quotes() {
.unwrap();

let (rp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"my_sub","id":0}"#, 1).await.unwrap();
let resp = serde_json::from_str::<Response<u64>>(rp.as_result()).unwrap();
let resp = serde_json::from_str::<Response<u64>>(&rp).unwrap();
let sub_resp = stream.recv().await.unwrap();

let resp = match resp.payload {
Expand Down Expand Up @@ -569,7 +566,7 @@ async fn subscription_close_response_works() {
{
let (rp, mut stream) =
module.raw_json_request(r#"{"jsonrpc":"2.0","method":"my_sub","params":[1],"id":0}"#, 1).await.unwrap();
let resp = serde_json::from_str::<Response<u64>>(rp.as_result()).unwrap();
let resp = serde_json::from_str::<Response<u64>>(&rp).unwrap();

let sub_id = match resp.payload {
ResponsePayload::Success(val) => val,
Expand All @@ -592,39 +589,51 @@ async fn subscription_close_response_works() {

#[tokio::test]
async fn method_response_notify_on_completion() {
use jsonrpsee::server::ResponsePayload;

init_logger();

// The outcome of the response future is sent out on this channel
// to test whether the call produced a valid response or not.
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let module = rpc_module_notify_on_response(tx);

assert!(
run_test_notify_test(&module, &mut rx, true, Notify::Success).await.is_ok(),
"Successful response should be notified"
);
assert!(matches!(
run_test_notify_test(&module, &mut rx, false, Notify::Success).await,
Err(MethodResponseError::JsonRpcError),
));
let module = {
let mut module = RpcModule::new(tx);

assert!(matches!(
run_test_notify_test(&module, &mut rx, false, Notify::Error).await,
Err(MethodResponseError::JsonRpcError),
));
}
module
.register_method("hey", |params, ctx| {
let kind: String = params.one().unwrap();
let server_sender = ctx.clone();

#[tokio::test]
async fn method_response_dropped() {
init_logger();
let (rp, rp_future) = if kind == "success" {
ResponsePayload::success("lo").notify_on_completion()
} else {
ResponsePayload::error(ErrorCode::InvalidParams).notify_on_completion()
};

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let module = rpc_module_notify_on_response(tx);
tokio::spawn(async move {
let rp = rp_future.await;
server_sender.send(rp).unwrap();
});

rp
})
.unwrap();

module
};

let req = jsonrpsee_test_utils::helpers::call("hey", vec![Notify::Success], Id::Num(1));
// Successful call should return a successful notification.
assert_eq!(module.call::<_, String>("hey", ["success"]).await.unwrap(), "lo");
assert!(matches!(rx.recv().await, Some(Ok(_))));

// Make a call and drop the method response including its "notify sender"
// This could happen if the connection is closed.
let (rp, _) = module.raw_json_request(&req, 1).await.unwrap();
drop(rp);
// Low level call should also work.
let (rp, _) =
module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hey","params":["success"],"id":0}"#, 1).await.unwrap();
assert_eq!(rp, r#"{"jsonrpc":"2.0","result":"lo","id":0}"#);
assert!(matches!(rx.recv().await, Some(Ok(_))));

assert!(matches!(rx.recv().await, Some(Err(MethodResponseError::Closed))));
// Error call should return a failed notification.
assert!(module.call::<_, String>("hey", ["not success"]).await.is_err());
assert!(matches!(rx.recv().await, Some(Err(MethodResponseError::JsonRpcError))));
}

0 comments on commit e1e7d73

Please sign in to comment.