Skip to content

Commit

Permalink
Update hyper dependency to 1.0
Browse files Browse the repository at this point in the history
This change updates the hyper dependency to 1.0 and bumps other crates
as necessary.
  • Loading branch information
d-e-s-o committed Jul 19, 2024
1 parent 7382030 commit 74a33a5
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Unreleased
----------
- Added `weighted_average` member to `data::v2::bars::Bar` type
- Bumped `hyper` dependency to `1.0`
- Bumped `websocket-util` dependency to `0.13`
- Bumped `tokio-tungstenite` dependency to `0.23`

Expand Down
10 changes: 6 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ async-compression = {version = "0.4", default-features = false, optional = true}
async-trait = "0.1.51"
chrono = {version = "0.4.19", features = ["serde"]}
futures = {version = "0.3", default-features = false}
http = {version = "0.2", default-features = false}
http-endpoint = "0.5"
hyper = {version = "0.14", features = ["client", "http1", "stream"]}
hyper-tls = {version = "0.5", default-features = false}
http = {version = "1.1", default-features = false}
http-body-util = {version = "0.1", default-features = false}
http-endpoint = {version = "0.6", default-features = false}
hyper = {version = "1.1", default-features = false, features = ["client", "http1"]}
hyper-util = {version = "0.1.3", default-features = false, features = ["client", "client-legacy", "http1", "tokio"]}
hyper-tls = {version = "0.6", default-features = false}
num-decimal = {version = "0.2.4", default-features = false, features = ["num-v04", "serde"]}
serde = {version = "1.0.103", features = ["derive"]}
serde_json = {version = "1.0", default-features = false, features = ["std"]}
Expand Down
57 changes: 36 additions & 21 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ use http::HeaderMap;
use http::HeaderValue;
use http::Request;
use http::Response;
use http_body_util::BodyExt;
use http_body_util::Full;
use http_endpoint::Endpoint;

use hyper::body::Bytes;
use hyper::body::HttpBody as _;
use hyper::client::Builder as HttpClientBuilder;
use hyper::client::HttpConnector;
use hyper::Body;
use hyper::Client as HttpClient;
use hyper::body::Incoming;
use hyper::Error as HyperError;
use hyper_tls::HttpsConnector;
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Builder as HttpClientBuilder;
use hyper_util::client::legacy::Client as HttpClient;
use hyper_util::rt::TokioExecutor;

use tracing::debug;
use tracing::field::debug;
Expand Down Expand Up @@ -69,7 +71,7 @@ impl<'h> Debug for DebugHeaders<'h> {
/// A type providing a debug representation of an HTTP request, with
/// sensitive data being masked out.
struct DebugRequest<'r> {
request: &'r Request<Body>,
request: &'r Request<Full<Bytes>>,
}

impl<'r> Debug for DebugRequest<'r> {
Expand All @@ -92,7 +94,7 @@ impl<'r> Debug for DebugRequest<'r> {


/// Emit a debug representation of an HTTP request.
fn debug_request(request: &Request<Body>) -> DebugValue<DebugRequest<'_>> {
fn debug_request(request: &Request<Full<Bytes>>) -> DebugValue<DebugRequest<'_>> {
debug(DebugRequest { request })
}

Expand Down Expand Up @@ -131,7 +133,7 @@ impl Default for Builder {
// disable idle connections for them.
// While at it, also use the minimum number of threads for the
// `HttpsConnector`.
let mut builder = HttpClient::builder();
let mut builder = HttpClient::builder(TokioExecutor::new());
let _ = builder.pool_max_idle_per_host(0);

Self { builder }
Expand All @@ -141,7 +143,7 @@ impl Default for Builder {
#[inline]
fn default() -> Self {
Self {
builder: HttpClient::builder(),
builder: HttpClient::builder(TokioExecutor::new()),
}
}
}
Expand All @@ -152,7 +154,7 @@ impl Default for Builder {
#[derive(Debug)]
pub struct Client {
api_info: ApiInfo,
client: HttpClient<HttpsConnector<HttpConnector>, Body>,
client: HttpClient<HttpsConnector<HttpConnector>, Full<Bytes>>,
}

impl Client {
Expand All @@ -171,7 +173,7 @@ impl Client {

/// Add "gzip" as an accepted encoding to the request.
#[cfg(feature = "gzip")]
fn maybe_add_gzip_header(request: &mut Request<Body>) {
fn maybe_add_gzip_header(request: &mut Request<Full<Bytes>>) {
use http::header::ACCEPT_ENCODING;

let _ = request
Expand All @@ -181,10 +183,10 @@ impl Client {

/// An implementation stub not actually doing anything.
#[cfg(not(feature = "gzip"))]
fn maybe_add_gzip_header(_request: &mut Request<Body>) {}
fn maybe_add_gzip_header(_request: &mut Request<Full<Bytes>>) {}

/// Create a `Request` to the endpoint.
fn request<R>(&self, input: &R::Input) -> Result<Request<Body>, R::Error>
fn request<R>(&self, input: &R::Input) -> Result<Request<Full<Bytes>>, R::Error>
where
R: Endpoint,
{
Expand All @@ -195,21 +197,26 @@ impl Client {
url.set_path(&R::path(input));
url.set_query(R::query(input)?.as_ref().map(AsRef::as_ref));

let body = match R::body(input)? {
None => Bytes::new(),
Some(Cow::Borrowed(slice)) => Bytes::from(slice),
Some(Cow::Owned(vec)) => Bytes::from(vec),
};

let mut request = HttpRequestBuilder::new()
.method(R::method())
.uri(url.as_str())
// Add required authentication information.
.header(HDR_KEY_ID, self.api_info.key_id.as_str())
.header(HDR_SECRET, self.api_info.secret.as_str())
.body(Body::from(
R::body(input)?.unwrap_or(Cow::Borrowed(&[0; 0])),
))?;
.body(Full::new(body))?;


Self::maybe_add_gzip_header(&mut request);
Ok(request)
}

async fn retrieve_raw_body(response: Body) -> Result<Bytes, HyperError> {
async fn retrieve_raw_body(response: Incoming) -> Result<Bytes, HyperError> {
// We unconditionally wait for the full body to be received
// before even evaluating the header. That is mostly done for
// simplicity and it shouldn't really matter anyway because most
Expand All @@ -219,13 +226,18 @@ impl Client {
// to cause trouble: when we receive, for example, the
// list of all orders it now needs to be stored in memory
// in its entirety. That may blow things.
Ok(response.collect().await?.to_bytes())
let bytes = BodyExt::collect(response)
.await
// SANITY: The operation is infallible.
.unwrap()
.to_bytes();
Ok(bytes)
}

/// Retrieve the HTTP body, possible uncompressing it if it was gzip
/// encoded.
#[cfg(feature = "gzip")]
async fn retrieve_body<E>(response: Response<Body>) -> Result<Bytes, RequestError<E>> {
async fn retrieve_body<E>(response: Response<Incoming>) -> Result<Bytes, RequestError<E>> {
use async_compression::futures::bufread::GzipDecoder;
use futures::AsyncReadExt as _;
use http::header::CONTENT_ENCODING;
Expand All @@ -248,7 +260,7 @@ impl Client {

/// Retrieve the HTTP body.
#[cfg(not(feature = "gzip"))]
async fn retrieve_body<E>(response: Response<Body>) -> Result<Bytes, RequestError<E>> {
async fn retrieve_body<E>(response: Response<Incoming>) -> Result<Bytes, RequestError<E>> {
let bytes = Self::retrieve_raw_body(response.into_body()).await?;
Ok(bytes)
}
Expand Down Expand Up @@ -276,7 +288,10 @@ impl Client {

/// Issue a request.
#[allow(clippy::cognitive_complexity)]
async fn issue_<R>(&self, request: Request<Body>) -> Result<R::Output, RequestError<R::Error>>
async fn issue_<R>(
&self,
request: Request<Full<Bytes>>,
) -> Result<R::Output, RequestError<R::Error>>
where
R: Endpoint,
{
Expand Down
9 changes: 8 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (C) 2019-2023 The apca Developers
// Copyright (C) 2019-2024 The apca Developers
// SPDX-License-Identifier: GPL-3.0-or-later

use std::fmt::Debug;
Expand Down Expand Up @@ -32,6 +32,13 @@ pub enum RequestError<E> {
#[source]
HyperError,
),
/// An error reported by the `hyper-util` crate.
#[error("the hyper-util crate reported an error")]
HyperUtil(
#[from]
#[source]
hyper_util::client::legacy::Error,
),
/// An error reported while reading data.
#[error("failed to read data")]
Io(
Expand Down

0 comments on commit 74a33a5

Please sign in to comment.