Skip to content

Commit

Permalink
fix(client): parse notification without params (#1436)
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 authored Aug 1, 2024
1 parent bc7f8eb commit 6c29395
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 9 deletions.
20 changes: 20 additions & 0 deletions client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,26 @@ async fn notification_handler_works() {
}
}

#[tokio::test]
async fn notification_no_params() {
let server = WebSocketTestServer::with_hardcoded_notification(
"127.0.0.1:0".parse().unwrap(),
server_notification_without_params("no_params"),
)
.with_default_timeout()
.await
.unwrap();

let uri = to_ws_uri_string(server.local_addr());
let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap();
{
let mut nh: Subscription<serde_json::Value> =
client.subscribe_to_method("no_params").with_default_timeout().await.unwrap().unwrap();
let response = nh.next().with_default_timeout().await.unwrap().unwrap().unwrap();
assert_eq!(response, serde_json::Value::Null);
}
}

#[tokio::test]
async fn batched_notifs_works() {
init_logger();
Expand Down
10 changes: 5 additions & 5 deletions core/src/client/async_client/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::client::async_client::manager::{RequestManager, RequestStatus};
use crate::client::async_client::LOG_TARGET;
use crate::client::async_client::{Notification, LOG_TARGET};
use crate::client::{subscription_channel, Error, RequestMessage, TransportSenderT, TrySubscriptionSendError};
use crate::params::ArrayParams;
use crate::traits::ToRpcParams;
Expand All @@ -36,8 +36,7 @@ use tokio::sync::oneshot;

use jsonrpsee_types::response::SubscriptionError;
use jsonrpsee_types::{
ErrorObject, Id, InvalidRequestId, Notification, RequestSer, Response, ResponseSuccess, SubscriptionId,
SubscriptionResponse,
ErrorObject, Id, InvalidRequestId, RequestSer, Response, ResponseSuccess, SubscriptionId, SubscriptionResponse,
};
use serde_json::Value as JsonValue;
use std::ops::Range;
Expand Down Expand Up @@ -148,9 +147,10 @@ pub(crate) fn process_subscription_close_response(
/// will continue.
///
/// It's possible that user close down the subscription before this notification is received.
pub(crate) fn process_notification(manager: &mut RequestManager, notif: Notification<JsonValue>) {
pub(crate) fn process_notification(manager: &mut RequestManager, notif: Notification) {
match manager.as_notification_handler_mut(notif.method.to_string()) {
Some(send_back_sink) => match send_back_sink.send(notif.params) {
// If the notification doesn't have params, we just send an empty JSON object to indicate that to the user.
Some(send_back_sink) => match send_back_sink.send(notif.params.unwrap_or_default()) {
Ok(()) => (),
Err(TrySubscriptionSendError::Closed) => {
let _ = manager.remove_notification_handler(&notif.method);
Expand Down
9 changes: 5 additions & 4 deletions core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,16 @@ use futures_util::future::{self, Either};
use futures_util::stream::StreamExt;
use futures_util::Stream;
use jsonrpsee_types::response::{ResponsePayload, SubscriptionError};
use jsonrpsee_types::{Notification, NotificationSer, RequestSer, Response, SubscriptionResponse};
use jsonrpsee_types::{NotificationSer, RequestSer, Response, SubscriptionResponse};
use serde::de::DeserializeOwned;
use tokio::sync::{mpsc, oneshot};
use tracing::instrument;

use self::utils::{InactivityCheck, IntervalStream};

use super::{generate_batch_id_range, subscription_channel, FrontToBack, IdKind, RequestIdManager};

pub(crate) type Notification<'a> = jsonrpsee_types::Notification<'a, Option<serde_json::Value>>;

const LOG_TARGET: &str = "jsonrpsee-client";
const NOT_POISONED: &str = "Not poisoned; qed";

Expand Down Expand Up @@ -728,7 +729,7 @@ fn handle_backend_messages<R: TransportReceiverT>(
process_subscription_close_response(&mut manager.lock(), response);
}
// Incoming Notification
else if let Ok(notif) = serde_json::from_slice::<Notification<_>>(raw) {
else if let Ok(notif) = serde_json::from_slice::<Notification>(raw) {
process_notification(&mut manager.lock(), notif);
} else {
return Err(unparse_error(raw));
Expand Down Expand Up @@ -765,7 +766,7 @@ fn handle_backend_messages<R: TransportReceiverT>(
} else if let Ok(response) = serde_json::from_slice::<SubscriptionError<_>>(raw) {
got_notif = true;
process_subscription_close_response(&mut manager.lock(), response);
} else if let Ok(notif) = serde_json::from_str::<Notification<_>>(r.get()) {
} else if let Ok(notif) = serde_json::from_str::<Notification>(r.get()) {
got_notif = true;
process_notification(&mut manager.lock(), notif);
} else {
Expand Down
5 changes: 5 additions & 0 deletions test-utils/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ pub fn server_notification(method: &str, params: Value) -> String {
format!(r#"{{"jsonrpc":"2.0","method":"{}", "params":{} }}"#, method, serde_json::to_string(&params).unwrap())
}

/// Server originated notification without params.
pub fn server_notification_without_params(method: &str) -> String {
format!(r#"{{"jsonrpc":"2.0","method":"{}"}}"#, method)
}

pub async fn http_request(body: Body, uri: Uri) -> Result<HttpResponse, String> {
let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build_http();
http_post(client, body, uri).await
Expand Down

0 comments on commit 6c29395

Please sign in to comment.