From 098df626d048441eec765a94965bf35e7caf057d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20M=C3=BCller?= Date: Fri, 19 Jul 2024 14:49:46 -0700 Subject: [PATCH] Update hyper dependency to 1.0 This change updates the hyper dependency to 1.0 and bumps other crates as necessary. Unfortunately, because more of the logic was moved out of hyper itself and into various utility crates, the number of dependencies increases further. --- CHANGELOG.md | 1 + Cargo.toml | 10 +++++---- src/client.rs | 57 ++++++++++++++++++++++++++++++++------------------- src/error.rs | 9 +++++++- 4 files changed, 51 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b5267a09..f8c591b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ Unreleased ---------- - Added `weighted_average` member to `data::v2::bars::Bar` type +- Bumped `hyper` dependency to `1.0` - Bumped `websocket-util` dependency to `0.13` - Bumped `tokio-tungstenite` dependency to `0.23` diff --git a/Cargo.toml b/Cargo.toml index 1c0f5779..00e1d5f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,10 +31,12 @@ async-compression = {version = "0.4", default-features = false, optional = true} async-trait = "0.1.51" chrono = {version = "0.4.19", features = ["serde"]} futures = {version = "0.3", default-features = false} -http = {version = "0.2", default-features = false} -http-endpoint = "0.5" -hyper = {version = "0.14", features = ["client", "http1", "stream"]} -hyper-tls = {version = "0.5", default-features = false} +http = {version = "1.1", default-features = false} +http-body-util = {version = "0.1", default-features = false} +http-endpoint = {version = "0.6", default-features = false} +hyper = {version = "1.1", default-features = false, features = ["client", "http1"]} +hyper-util = {version = "0.1.3", default-features = false, features = ["client", "client-legacy", "http1", "tokio"]} +hyper-tls = {version = "0.6", default-features = false} num-decimal = {version = "0.2.4", default-features = false, features = ["num-v04", "serde"]} serde = {version = "1.0.103", features = ["derive"]} serde_json = {version = "1.0", default-features = false, features = ["std"]} diff --git a/src/client.rs b/src/client.rs index d81254bf..56c76251 100644 --- a/src/client.rs +++ b/src/client.rs @@ -13,16 +13,18 @@ use http::HeaderMap; use http::HeaderValue; use http::Request; use http::Response; +use http_body_util::BodyExt; +use http_body_util::Full; use http_endpoint::Endpoint; use hyper::body::Bytes; -use hyper::body::HttpBody as _; -use hyper::client::Builder as HttpClientBuilder; -use hyper::client::HttpConnector; -use hyper::Body; -use hyper::Client as HttpClient; +use hyper::body::Incoming; use hyper::Error as HyperError; use hyper_tls::HttpsConnector; +use hyper_util::client::legacy::connect::HttpConnector; +use hyper_util::client::legacy::Builder as HttpClientBuilder; +use hyper_util::client::legacy::Client as HttpClient; +use hyper_util::rt::TokioExecutor; use tracing::debug; use tracing::field::debug; @@ -69,7 +71,7 @@ impl<'h> Debug for DebugHeaders<'h> { /// A type providing a debug representation of an HTTP request, with /// sensitive data being masked out. struct DebugRequest<'r> { - request: &'r Request, + request: &'r Request>, } impl<'r> Debug for DebugRequest<'r> { @@ -92,7 +94,7 @@ impl<'r> Debug for DebugRequest<'r> { /// Emit a debug representation of an HTTP request. -fn debug_request(request: &Request) -> DebugValue> { +fn debug_request(request: &Request>) -> DebugValue> { debug(DebugRequest { request }) } @@ -131,7 +133,7 @@ impl Default for Builder { // disable idle connections for them. // While at it, also use the minimum number of threads for the // `HttpsConnector`. - let mut builder = HttpClient::builder(); + let mut builder = HttpClient::builder(TokioExecutor::new()); let _ = builder.pool_max_idle_per_host(0); Self { builder } @@ -141,7 +143,7 @@ impl Default for Builder { #[inline] fn default() -> Self { Self { - builder: HttpClient::builder(), + builder: HttpClient::builder(TokioExecutor::new()), } } } @@ -152,7 +154,7 @@ impl Default for Builder { #[derive(Debug)] pub struct Client { api_info: ApiInfo, - client: HttpClient, Body>, + client: HttpClient, Full>, } impl Client { @@ -171,7 +173,7 @@ impl Client { /// Add "gzip" as an accepted encoding to the request. #[cfg(feature = "gzip")] - fn maybe_add_gzip_header(request: &mut Request) { + fn maybe_add_gzip_header(request: &mut Request>) { use http::header::ACCEPT_ENCODING; let _ = request @@ -181,10 +183,10 @@ impl Client { /// An implementation stub not actually doing anything. #[cfg(not(feature = "gzip"))] - fn maybe_add_gzip_header(_request: &mut Request) {} + fn maybe_add_gzip_header(_request: &mut Request>) {} /// Create a `Request` to the endpoint. - fn request(&self, input: &R::Input) -> Result, R::Error> + fn request(&self, input: &R::Input) -> Result>, R::Error> where R: Endpoint, { @@ -195,21 +197,26 @@ impl Client { url.set_path(&R::path(input)); url.set_query(R::query(input)?.as_ref().map(AsRef::as_ref)); + let body = match R::body(input)? { + None => Bytes::new(), + Some(Cow::Borrowed(slice)) => Bytes::from(slice), + Some(Cow::Owned(vec)) => Bytes::from(vec), + }; + let mut request = HttpRequestBuilder::new() .method(R::method()) .uri(url.as_str()) // Add required authentication information. .header(HDR_KEY_ID, self.api_info.key_id.as_str()) .header(HDR_SECRET, self.api_info.secret.as_str()) - .body(Body::from( - R::body(input)?.unwrap_or(Cow::Borrowed(&[0; 0])), - ))?; + .body(Full::new(body))?; + Self::maybe_add_gzip_header(&mut request); Ok(request) } - async fn retrieve_raw_body(response: Body) -> Result { + async fn retrieve_raw_body(response: Incoming) -> Result { // We unconditionally wait for the full body to be received // before even evaluating the header. That is mostly done for // simplicity and it shouldn't really matter anyway because most @@ -219,13 +226,18 @@ impl Client { // to cause trouble: when we receive, for example, the // list of all orders it now needs to be stored in memory // in its entirety. That may blow things. - Ok(response.collect().await?.to_bytes()) + let bytes = BodyExt::collect(response) + .await + // SANITY: The operation is infallible. + .unwrap() + .to_bytes(); + Ok(bytes) } /// Retrieve the HTTP body, possible uncompressing it if it was gzip /// encoded. #[cfg(feature = "gzip")] - async fn retrieve_body(response: Response) -> Result> { + async fn retrieve_body(response: Response) -> Result> { use async_compression::futures::bufread::GzipDecoder; use futures::AsyncReadExt as _; use http::header::CONTENT_ENCODING; @@ -248,7 +260,7 @@ impl Client { /// Retrieve the HTTP body. #[cfg(not(feature = "gzip"))] - async fn retrieve_body(response: Response) -> Result> { + async fn retrieve_body(response: Response) -> Result> { let bytes = Self::retrieve_raw_body(response.into_body()).await?; Ok(bytes) } @@ -276,7 +288,10 @@ impl Client { /// Issue a request. #[allow(clippy::cognitive_complexity)] - async fn issue_(&self, request: Request) -> Result> + async fn issue_( + &self, + request: Request>, + ) -> Result> where R: Endpoint, { diff --git a/src/error.rs b/src/error.rs index b89046fe..493e7d61 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,4 @@ -// Copyright (C) 2019-2023 The apca Developers +// Copyright (C) 2019-2024 The apca Developers // SPDX-License-Identifier: GPL-3.0-or-later use std::fmt::Debug; @@ -32,6 +32,13 @@ pub enum RequestError { #[source] HyperError, ), + /// An error reported by the `hyper-util` crate. + #[error("the hyper-util crate reported an error")] + HyperUtil( + #[from] + #[source] + hyper_util::client::legacy::Error, + ), /// An error reported while reading data. #[error("failed to read data")] Io(