From 975f5775c9238209ae4b7e7ed766141a34fc92f9 Mon Sep 17 00:00:00 2001 From: Ivan Korotkov Date: Mon, 25 Dec 2023 20:40:08 +0100 Subject: [PATCH] feat(transports): add retries to HTTP transport --- Cargo.toml | 12 +- src/error.rs | 14 ++ src/transports/http.rs | 555 +++++++++++++++++++++++++++++++++++++---- 3 files changed, 527 insertions(+), 54 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index eb95fdc7..c1b42598 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,8 @@ serde = { version = "1.0.90", features = ["derive"] } serde_json = "1.0.39" tiny-keccak = { version = "2.0.1", features = ["keccak"] } pin-project = "1.0" +async-recursion = "1.0.5" +chrono = "0.4.31" # Optional deps secp256k1 = { version = "0.28", features = ["recovery"], optional = true } once_cell = { version = "1.8.0", optional = true } @@ -73,7 +75,15 @@ tokio-stream = { version = "0.1", features = ["net"] } [features] default = ["http-tls", "signing", "ws-tls-tokio", "ipc-tokio"] -wasm = ["futures-timer/wasm-bindgen", "getrandom", "js-sys", "rand", "serde-wasm-bindgen", "wasm-bindgen", "wasm-bindgen-futures"] +wasm = [ + "futures-timer/wasm-bindgen", + "getrandom", + "js-sys", + "rand", + "serde-wasm-bindgen", + "wasm-bindgen", + "wasm-bindgen-futures", +] eip-1193 = ["wasm"] _http_base = ["reqwest", "bytes", "url", "base64"] http = ["_http_base"] diff --git a/src/error.rs b/src/error.rs index d858d688..5e54d10f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -16,6 +16,20 @@ pub enum TransportError { /// Arbitrary, developer-readable description of the occurred error. #[display(fmt = "{}", _0)] Message(String), + /// Recoverable rate limit error. + #[display(fmt = "rate limit: {}", _0)] + RateLimit(RateLimit), +} + +/// Recoverable rate limit error. +#[derive(Display, Debug, Clone, PartialEq)] +pub enum RateLimit { + /// Retry-After: + #[display(fmt = "retry after date {}", _0)] + Date(String), + /// Retry-After: + #[display(fmt = "retry after number of seconds {}", _0)] + Seconds(u64), } /// Errors which can occur when attempting to generate resource uri. diff --git a/src/transports/http.rs b/src/transports/http.rs index 252538e2..702ae8bf 100644 --- a/src/transports/http.rs +++ b/src/transports/http.rs @@ -1,14 +1,20 @@ //! HTTP Transport +use crate::error::RateLimit; use crate::{ error::{Error, Result, TransportError}, helpers, BatchTransport, RequestId, Transport, }; +use async_recursion::async_recursion; +use chrono::{DateTime, Utc}; +use core::time::Duration; #[cfg(not(feature = "wasm"))] use futures::future::BoxFuture; #[cfg(feature = "wasm")] use futures::future::LocalBoxFuture as BoxFuture; +use futures_timer::Delay; use jsonrpc_core::types::{Call, Output, Request, Value}; +use reqwest::header::HeaderMap; use reqwest::{Client, Url}; use serde::de::DeserializeOwned; use std::{ @@ -22,17 +28,40 @@ use std::{ /// HTTP Transport #[derive(Clone, Debug)] pub struct Http { - // Client is already an Arc so doesn't need to be part of inner. + url: Url, client: Client, + retries: Retries, inner: Arc, } #[derive(Debug)] struct Inner { - url: Url, id: AtomicUsize, } +/// Configures retries on rate limited and failed requests. +#[derive(Debug, Clone, Default)] +pub struct Retries { + /// Retries failed request X times when encountering 429 or 500+ status codes. + pub max_retries: u32, + + /// Retries failed request after X time when encountering 429 or 500+ status codes. + pub sleep_for: Duration, + + /// Retries rate limited request after time interval specified in the `Retry-After` header instead of specified sleep duration when encountering 429 status code. + pub use_retry_after_header: bool, +} + +impl Retries { + fn step(&self) -> Self { + Self { + max_retries: 0.max(self.max_retries - 1), + sleep_for: self.sleep_for * 2, + use_retry_after_header: self.use_retry_after_header, + } + } +} + impl Http { /// Create new HTTP transport connecting to given URL. /// @@ -54,10 +83,16 @@ impl Http { /// Like `new` but with a user provided client instance. pub fn with_client(client: Client, url: Url) -> Self { + Self::with_retries(client, url, Retries::default()) + } + + /// Creates client with provided client and user configured retries. + pub fn with_retries(client: Client, url: Url, retries: Retries) -> Self { Self { client, + url, + retries, inner: Arc::new(Inner { - url, id: AtomicUsize::new(0), }), } @@ -67,8 +102,8 @@ impl Http { self.inner.id.fetch_add(1, Ordering::AcqRel) } - fn new_request(&self) -> (Client, Url) { - (self.client.clone(), self.inner.url.clone()) + fn new_request(&self) -> (Client, Url, Retries) { + (self.client.clone(), self.url.clone(), self.retries.clone()) } } @@ -82,6 +117,7 @@ async fn execute_rpc(client: &Client, url: Url, request: &R .await .map_err(|err| Error::Transport(TransportError::Message(format!("failed to send request: {}", err))))?; let status = response.status(); + let headers = response.headers().clone(); let response = response.bytes().await.map_err(|err| { Error::Transport(TransportError::Message(format!( "failed to read response bytes: {}", @@ -93,6 +129,14 @@ async fn execute_rpc(client: &Client, url: Url, request: &R id, String::from_utf8_lossy(&response) ); + if status.as_u16() == 429 { + let error = match extract_retry_after_value(&headers) { + DelayAfter::Seconds(seconds) => TransportError::RateLimit(RateLimit::Seconds(seconds)), + DelayAfter::Date(date) => TransportError::RateLimit(RateLimit::Date(date)), + DelayAfter::None => TransportError::Code(status.as_u16()), + }; + return Err(Error::Transport(error)); + } if !status.is_success() { return Err(Error::Transport(TransportError::Code(status.as_u16()))); } @@ -105,6 +149,78 @@ async fn execute_rpc(client: &Client, url: Url, request: &R }) } +#[derive(Debug)] +enum DelayAfter { + Seconds(u64), + Date(String), + None, +} + +fn extract_retry_after_value(headers: &HeaderMap) -> DelayAfter { + let Some(header) = headers.get("Retry-After").and_then(|header| header.to_str().ok()) else { + return DelayAfter::None; + }; + + if let Ok(seconds) = header.parse::() { + return DelayAfter::Seconds(seconds); + } + + DelayAfter::Date(header.to_string()) +} + +#[async_recursion] +async fn execute_rpc_with_retries( + client: &Client, + url: Url, + request: &Request, + id: RequestId, + retries: Retries, +) -> Result { + match execute_rpc(client, url.clone(), request, id).await { + Ok(output) => Ok(output), + Err(Error::Transport(error)) => match error { + TransportError::Code(code) => { + if retries.max_retries <= 0 + || retries.sleep_for <= Duration::from_secs(0) + || (code != 429 && code < 500) + { + return Err(Error::Transport(error)); + } + + Delay::new(retries.sleep_for).await; + execute_rpc_with_retries(client, url, request, id, retries.step()).await + } + TransportError::Message(message) => Err(Error::Transport(TransportError::Message(message))), + TransportError::RateLimit(limit) => { + if !retries.use_retry_after_header && retries.max_retries <= 0 { + return Err(Error::Transport(TransportError::Code(429))); + } + + match limit { + RateLimit::Date(date) => { + let Ok(until) = DateTime::parse_from_rfc2822(&date) else { + return Err(Error::Transport(TransportError::Code(429))); + }; + + let from_now = until.with_timezone(&Utc::now().timezone()) - Utc::now(); + let secs = from_now.num_seconds() + 1; // +1 for rounding + if secs > 0 { + Delay::new(Duration::from_secs(secs as u64)).await; + } + + execute_rpc_with_retries(client, url, request, id, retries.step()).await + } + RateLimit::Seconds(seconds) => { + Delay::new(Duration::from_secs(seconds)).await; + execute_rpc_with_retries(client, url, request, id, retries.step()).await + } + } + } + }, + Err(err) => Err(err), + } +} + type RpcResult = Result; impl Transport for Http { @@ -117,9 +233,9 @@ impl Transport for Http { } fn send(&self, id: RequestId, call: Call) -> Self::Out { - let (client, url) = self.new_request(); + let (client, url, retries) = self.new_request(); Box::pin(async move { - let output: Output = execute_rpc(&client, url, &Request::Single(call), id).await?; + let output: Output = execute_rpc_with_retries(&client, url, &Request::Single(call), id, retries).await?; helpers::to_result_from_output(output) }) } @@ -134,10 +250,10 @@ impl BatchTransport for Http { { // Batch calls don't need an id but it helps associate the response log with the request log. let id = self.next_id(); - let (client, url) = self.new_request(); + let (client, url, retries) = self.new_request(); let (ids, calls): (Vec<_>, Vec<_>) = requests.into_iter().unzip(); Box::pin(async move { - let value = execute_rpc(&client, url, &Request::Batch(calls), id).await?; + let value = execute_rpc_with_retries(&client, url, &Request::Batch(calls), id, retries).await?; let outputs = handle_possible_error_object_for_batched_request(value)?; handle_batch_response(&ids, outputs) }) @@ -193,48 +309,145 @@ fn id_of_output(output: &Output) -> Result { mod tests { use super::*; use crate::Error::Rpc; + use core::pin::Pin; + use futures::lock::Mutex; + use futures::Future; + use hyper::body::HttpBody; + use hyper::service::{make_service_fn, service_fn}; + use hyper::{Body, Error, Method, Request, Response, Server}; use jsonrpc_core::ErrorCode; use std::net::TcpListener; + use tokio::task::JoinHandle; + use tokio::time::Instant; + + type HyperResponse = Pin>> + Send>>; + + type HyperHandler = Box) -> HyperResponse + Send + Sync>; fn get_available_port() -> Option { Some(TcpListener::bind(("127.0.0.1", 0)).ok()?.local_addr().ok()?.port()) } - async fn server(req: hyper::Request) -> hyper::Result> { - use hyper::body::HttpBody; + fn create_server(port: u16, handler: HyperHandler) -> JoinHandle<()> { + let addr = format!("127.0.0.1:{}", port); + let handler = Arc::new(handler); + let service = make_service_fn(move |_| { + let handler = handler.clone(); + async move { + let handler = handler.clone(); + Ok::<_, Error>(service_fn(move |req| { + let handler = handler.clone(); + async move { handler(req).await } + })) + } + }); + let server = Server::bind(&addr.parse().unwrap()).serve(service); + tokio::spawn(async move { + println!("Listening on http://{}", addr); + server.await.unwrap(); + }) + } + + fn create_client(port: u16, retries: Retries) -> Http { + Http::with_retries( + Client::new(), + format!("http://127.0.0.1:{}", port).parse().unwrap(), + retries, + ) + } + + fn return_429(retry_after_value: Option) -> HyperHandler { + Box::new(move |_req: Request| -> HyperResponse { + let retry_after_value = retry_after_value.clone(); + let response_body = Body::from( + r#"{ + "jsonrpc": "2.0", + "error": { + "code": 429, + "message": "Your app has exceeded its compute units per second capacity." + } + }"#, + ); + + let mut response = Response::builder().status(hyper::StatusCode::TOO_MANY_REQUESTS); + if let Some(value) = retry_after_value { + response = response.header("Retry-After", value) + } + + let response = response.body(response_body).unwrap(); + Box::pin(async move { Ok(response) }) + }) + } + + fn return_5xx(code: u16) -> HyperHandler { + Box::new(move |_req: Request| -> HyperResponse { + let response_body = Body::from( + r#"{ + "jsonrpc": "2.0", + "error": { + "code": 500, + "message": "We can't execute this request" + } + }"#, + ); + + let response = Response::builder().status(code).body(response_body).unwrap(); + Box::pin(async move { Ok(response) }) + }) + } + + fn check_and_return_mock_response(req: Request) -> HyperResponse { let expected = r#"{"jsonrpc":"2.0","method":"eth_getAccounts","params":[],"id":0}"#; let response = r#"{"jsonrpc":"2.0","id":0,"result":"x"}"#; - assert_eq!(req.method(), &hyper::Method::POST); + assert_eq!(req.method(), &Method::POST); assert_eq!(req.uri().path(), "/"); let mut content: Vec = vec![]; let mut body = req.into_body(); - while let Some(Ok(chunk)) = body.data().await { - content.extend(&*chunk); - } - assert_eq!(std::str::from_utf8(&*content), Ok(expected)); - Ok(hyper::Response::new(response.into())) + Box::pin(async move { + while let Some(Ok(chunk)) = body.data().await { + content.extend(&*chunk); + } + assert_eq!(std::str::from_utf8(&*content), Ok(expected)); + + Ok(Response::new(response.into())) + }) + } + + fn return_error_response(_req: Request) -> HyperResponse { + let response = r#"{ + "jsonrpc":"2.0", + "error":{ + "code":0, + "message":"we can't execute this request" + }, + "id":null + }"#; + Box::pin(async move { Ok(Response::new(response.into())) }) + } + + fn return_sequence(handlers: Vec) -> HyperHandler { + let handlers = Arc::new(Mutex::new(handlers)); + Box::new(move |_req: Request| -> HyperResponse { + let handlers = handlers.clone(); + Box::pin(async move { + let mut handlers = handlers.lock().await; + let handler = handlers.remove(0); + handler(_req).await + }) + }) } #[tokio::test] async fn should_make_a_request() { - use hyper::service::{make_service_fn, service_fn}; - // given - let addr = format!("127.0.0.1:{}", get_available_port().unwrap()); - // start server - let service = make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(server)) }); - let server = hyper::Server::bind(&addr.parse().unwrap()).serve(service); - let addr_clone = addr.clone(); - tokio::spawn(async move { - println!("Listening on http://{}", addr_clone); - server.await.unwrap(); - }); + let port = get_available_port().unwrap(); + let _ = create_server(port, Box::new(check_and_return_mock_response)); + let client = create_client(port, Retries::default()); // when - let client = Http::new(&format!("http://{}", &addr)).unwrap(); println!("Sending request"); let response = client.execute("eth_getAccounts", vec![]).await; println!("Got response"); @@ -245,33 +458,12 @@ mod tests { #[tokio::test] async fn catch_generic_json_error_for_batched_request() { - use hyper::service::{make_service_fn, service_fn}; - - async fn handler(_req: hyper::Request) -> hyper::Result> { - let response = r#"{ - "jsonrpc":"2.0", - "error":{ - "code":0, - "message":"we can't execute this request" - }, - "id":null - }"#; - Ok(hyper::Response::::new(response.into())) - } - // given - let addr = format!("127.0.0.1:{}", get_available_port().unwrap()); - // start server - let service = make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(handler)) }); - let server = hyper::Server::bind(&addr.parse().unwrap()).serve(service); - let addr_clone = addr.clone(); - tokio::spawn(async move { - println!("Listening on http://{}", addr_clone); - server.await.unwrap(); - }); + let port = get_available_port().unwrap(); + let _ = create_server(port, Box::new(return_error_response)); + let client = create_client(port, Retries::default()); // when - let client = Http::new(&format!("http://{}", &addr)).unwrap(); println!("Sending request"); let response = client .send_batch(vec![client.prepare("some_method", vec![])].into_iter()) @@ -311,4 +503,261 @@ mod tests { // The order of the ids should have been restored. assert_eq!(ids, results); } + + #[tokio::test] + async fn status_code_429_with_retry_after_as_seconds() { + // given + let port = get_available_port().unwrap(); + let _ = create_server( + port, + return_sequence(vec![ + return_429(Some("3".into())), + Box::new(check_and_return_mock_response), + ]), + ); + let client = create_client( + port, + Retries { + use_retry_after_header: true, + max_retries: 3, + ..Default::default() + }, + ); + + // when + println!("Sending request"); + let started = Instant::now(); + let response = client.execute("eth_getAccounts", vec![]).await; + let finished = Instant::now(); + println!("Got response"); + + // then + assert_eq!(response, Ok(Value::String("x".into()))); + assert!(finished - started >= Duration::from_secs(3)); + } + + #[tokio::test] + async fn status_code_429_with_retry_after_as_date() { + // given + let port = get_available_port().unwrap(); + let started = Instant::now(); + let retry_after_value: DateTime = DateTime::from(Utc::now() + Duration::from_secs(3)); + let _ = create_server( + port, + return_sequence(vec![ + return_429(Some(retry_after_value.to_rfc2822())), + Box::new(check_and_return_mock_response), + ]), + ); + let client = create_client( + port, + Retries { + use_retry_after_header: true, + max_retries: 3, + ..Default::default() + }, + ); + + // when + println!("Sending request"); + let response = client.execute("eth_getAccounts", vec![]).await; + let finished = Instant::now(); + println!("Got response"); + + // then + assert_eq!(response, Ok(Value::String("x".into()))); + assert!(finished - started >= Duration::from_secs(3)); + } + + #[tokio::test] + async fn status_code_429_with_invalid_retry_after() { + // given + let port = get_available_port().unwrap(); + let _ = create_server( + port, + return_sequence(vec![return_429(Some("retry some time later, idc".into()))]), + ); + let client = create_client( + port, + Retries { + use_retry_after_header: true, + max_retries: 3, + ..Default::default() + }, + ); + + // when + println!("Sending request"); + let response = client.execute("eth_getAccounts", vec![]).await; + println!("Got response"); + + // then + assert_eq!(response, Err(crate::Error::Transport(TransportError::Code(429)))); + } + + #[tokio::test] + async fn status_code_429_without_retry_after() { + // given + let port = get_available_port().unwrap(); + let _ = create_server(port, return_sequence(vec![return_429(None)])); + let client = create_client( + port, + Retries { + use_retry_after_header: true, + max_retries: 3, + ..Default::default() + }, + ); + + // when + println!("Sending request"); + let response = client.execute("eth_getAccounts", vec![]).await; + println!("Got response"); + + // then + assert_eq!(response, Err(crate::Error::Transport(TransportError::Code(429)))); + } + + #[tokio::test] + async fn status_code_429_retry_after_disabled() { + // given + let port = get_available_port().unwrap(); + let _ = create_server(port, return_sequence(vec![return_429(Some("3".into()))])); + let client = create_client( + port, + Retries { + use_retry_after_header: false, + max_retries: 0, + sleep_for: Duration::from_secs(1), + }, + ); + + // when + println!("Sending request"); + let response = client.execute("eth_getAccounts", vec![]).await; + println!("Got response"); + + // then + assert_eq!(response, Err(crate::Error::Transport(TransportError::Code(429)))); + } + + #[tokio::test] + async fn status_code_429_with_retries() { + // given + let port = get_available_port().unwrap(); + let _ = create_server( + port, + return_sequence(vec![ + return_429(Some("3".into())), // sleep for 1 second as configured below + return_429(Some("3".into())), // sleep for 2 seconds (2x 1sec) + Box::new(check_and_return_mock_response), + ]), + ); + let client = create_client( + port, + Retries { + use_retry_after_header: false, + max_retries: 3, + sleep_for: Duration::from_secs(1), + }, + ); + + // when + println!("Sending request"); + let started = Instant::now(); + let response = client.execute("eth_getAccounts", vec![]).await; + let finished = Instant::now(); + println!("Got response"); + + // then + assert_eq!(response, Ok(Value::String("x".into()))); + assert!(finished - started >= Duration::from_secs(3)); + } + + #[tokio::test] + async fn status_code_5xx_with_retries() { + // given + let port = get_available_port().unwrap(); + let _ = create_server( + port, + return_sequence(vec![ + return_5xx(500), // sleep for 1 second as configured below + return_5xx(502), // sleep for 2 seconds (2x 1sec) + Box::new(check_and_return_mock_response), + ]), + ); + let client = create_client( + port, + Retries { + use_retry_after_header: false, + max_retries: 3, + sleep_for: Duration::from_secs(1), + }, + ); + + // when + println!("Sending request"); + let started = Instant::now(); + let response = client.execute("eth_getAccounts", vec![]).await; + let finished = Instant::now(); + println!("Got response"); + + // then + assert_eq!(response, Ok(Value::String("x".into()))); + assert!(finished - started >= Duration::from_secs(3)); + } + + #[tokio::test] + async fn status_code_5xx_retries_exhausted() { + // given + let port = get_available_port().unwrap(); + let _ = create_server( + port, + return_sequence(vec![ + return_5xx(500), // sleep for 1 second as configured below + return_5xx(502), // sleep for 2 seconds (2x 1sec) + return_5xx(503), + Box::new(check_and_return_mock_response), + ]), + ); + let client = create_client( + port, + Retries { + use_retry_after_header: false, + max_retries: 2, + sleep_for: Duration::from_secs(1), + }, + ); + + // when + println!("Sending request"); + let response = client.execute("eth_getAccounts", vec![]).await; + println!("Got response"); + + // then + assert_eq!(response, Err(crate::Error::Transport(TransportError::Code(503)))); + } + + #[tokio::test] + async fn status_code_5xx_without_retries() { + // given + let port = get_available_port().unwrap(); + let _ = create_server(port, return_sequence(vec![return_5xx(500)])); + let client = create_client( + port, + Retries { + use_retry_after_header: true, + max_retries: 3, + sleep_for: Duration::from_secs(0), + }, + ); + + // when + println!("Sending request"); + let response = client.execute("eth_getAccounts", vec![]).await; + println!("Got response"); + + // then + assert_eq!(response, Err(crate::Error::Transport(TransportError::Code(500)))); + } }