From 9dc1299f8f419e467ff271cd0bc39a566ec71573 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 11 May 2023 22:45:22 +0100 Subject: [PATCH 1/3] Consistently use GCP XML API --- object_store/Cargo.toml | 2 +- object_store/src/aws/client.rs | 69 +------------ object_store/src/aws/mod.rs | 57 ++--------- object_store/src/azure/mod.rs | 61 ++---------- object_store/src/client/header.rs | 83 ++++++++++++++++ object_store/src/client/list.rs | 85 ++++++++++++++++ object_store/src/client/mod.rs | 6 ++ object_store/src/gcp/mod.rs | 155 ++++++++++-------------------- 8 files changed, 243 insertions(+), 275 deletions(-) create mode 100644 object_store/src/client/header.rs create mode 100644 object_store/src/client/list.rs diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index b27482bcfabc..1e16a7823e47 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -67,7 +67,7 @@ tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-ut nix = "0.26.1" [features] -cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json","reqwest/stream", "chrono/serde", "base64", "rand", "ring"] +cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"] azure = ["cloud"] gcp = ["cloud", "rustls-pemfile"] aws = ["cloud"] diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 9634c740d01d..769f474bb1ff 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -18,19 +18,19 @@ use crate::aws::checksum::Checksum; use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider}; use crate::aws::STRICT_PATH_ENCODE_SET; +use crate::client::list::ListResponse; use crate::client::pagination::stream_paginated; use crate::client::retry::RetryExt; use crate::multipart::UploadPart; use crate::path::DELIMITER; use crate::util::{format_http_range, format_prefix}; use crate::{ - 
BoxStream, ClientOptions, ListResult, MultipartId, ObjectMeta, Path, Result, - RetryConfig, StreamExt, + BoxStream, ClientOptions, ListResult, MultipartId, Path, Result, RetryConfig, + StreamExt, }; use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::{Buf, Bytes}; -use chrono::{DateTime, Utc}; use percent_encoding::{utf8_percent_encode, PercentEncode}; use reqwest::{ header::CONTENT_TYPE, Client as ReqwestClient, Method, Response, StatusCode, @@ -118,69 +118,6 @@ impl From for crate::Error { } } -#[derive(Debug, Deserialize)] -#[serde(rename_all = "PascalCase")] -pub struct ListResponse { - #[serde(default)] - pub contents: Vec, - #[serde(default)] - pub common_prefixes: Vec, - #[serde(default)] - pub next_continuation_token: Option, -} - -impl TryFrom for ListResult { - type Error = crate::Error; - - fn try_from(value: ListResponse) -> Result { - let common_prefixes = value - .common_prefixes - .into_iter() - .map(|x| Ok(Path::parse(x.prefix)?)) - .collect::>()?; - - let objects = value - .contents - .into_iter() - .map(TryFrom::try_from) - .collect::>()?; - - Ok(Self { - common_prefixes, - objects, - }) - } -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "PascalCase")] -pub struct ListPrefix { - pub prefix: String, -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "PascalCase")] -pub struct ListContents { - pub key: String, - pub size: usize, - pub last_modified: DateTime, - #[serde(rename = "ETag")] - pub e_tag: Option, -} - -impl TryFrom for ObjectMeta { - type Error = crate::Error; - - fn try_from(value: ListContents) -> Result { - Ok(Self { - location: Path::parse(value.key)?, - last_modified: value.last_modified, - size: value.size, - e_tag: value.e_tag, - }) - } -} - #[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")] struct InitiateMultipart { diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 17d779ff6a51..1ff96e2a8be4 100644 --- a/object_store/src/aws/mod.rs +++ 
b/object_store/src/aws/mod.rs @@ -33,7 +33,6 @@ use async_trait::async_trait; use bytes::Bytes; -use chrono::{DateTime, Utc}; use futures::stream::BoxStream; use futures::TryStreamExt; use itertools::Itertools; @@ -53,6 +52,7 @@ use crate::aws::credential::{ AwsCredential, CredentialProvider, InstanceCredentialProvider, StaticCredentialProvider, WebIdentityProvider, }; +use crate::client::header::header_meta; use crate::client::ClientConfigKey; use crate::config::ConfigValue; use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart}; @@ -86,24 +86,6 @@ static METADATA_ENDPOINT: &str = "http://169.254.169.254"; #[derive(Debug, Snafu)] #[allow(missing_docs)] enum Error { - #[snafu(display("Last-Modified Header missing from response"))] - MissingLastModified, - - #[snafu(display("Content-Length Header missing from response"))] - MissingContentLength, - - #[snafu(display("Invalid last modified '{}': {}", last_modified, source))] - InvalidLastModified { - last_modified: String, - source: chrono::ParseError, - }, - - #[snafu(display("Invalid content length '{}': {}", content_length, source))] - InvalidContentLength { - content_length: String, - source: std::num::ParseIntError, - }, - #[snafu(display("Missing region"))] MissingRegion, @@ -154,6 +136,11 @@ enum Error { #[snafu(display("Failed to parse the region for bucket '{}'", bucket))] RegionParse { bucket: String }, + + #[snafu(display("Failed to parse headers: {}", source))] + Header { + source: crate::client::header::Error, + }, } impl From for super::Error { @@ -274,40 +261,10 @@ impl ObjectStore for AmazonS3 { } async fn head(&self, location: &Path) -> Result { - use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED}; - // Extract meta from headers // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax let response = self.client.get_request(location, None, true).await?; - let headers = response.headers(); - - let last_modified = headers - 
.get(LAST_MODIFIED) - .context(MissingLastModifiedSnafu)?; - - let content_length = headers - .get(CONTENT_LENGTH) - .context(MissingContentLengthSnafu)?; - - let last_modified = last_modified.to_str().context(BadHeaderSnafu)?; - let last_modified = DateTime::parse_from_rfc2822(last_modified) - .context(InvalidLastModifiedSnafu { last_modified })? - .with_timezone(&Utc); - - let content_length = content_length.to_str().context(BadHeaderSnafu)?; - let content_length = content_length - .parse() - .context(InvalidContentLengthSnafu { content_length })?; - - let e_tag = headers.get(ETAG).context(MissingEtagSnafu)?; - let e_tag = e_tag.to_str().context(BadHeaderSnafu)?; - - Ok(ObjectMeta { - location: location.clone(), - last_modified, - size: content_length, - e_tag: Some(e_tag.to_string()), - }) + Ok(header_meta(location, response.headers()).context(HeaderSnafu)?) } async fn delete(&self, location: &Path) -> Result<()> { diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index c2cfdfe6af32..02591f4a2a5f 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -38,7 +38,6 @@ use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::Bytes; -use chrono::{TimeZone, Utc}; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use percent_encoding::percent_decode_str; use serde::{Deserialize, Serialize}; @@ -51,9 +50,9 @@ use std::{collections::BTreeSet, str::FromStr}; use tokio::io::AsyncWrite; use url::Url; +use crate::client::header::header_meta; use crate::client::ClientConfigKey; use crate::config::ConfigValue; -use crate::util::RFC1123_FMT; pub use credential::authority_hosts; mod client; @@ -74,24 +73,6 @@ const MSI_ENDPOINT_ENV_KEY: &str = "IDENTITY_ENDPOINT"; #[derive(Debug, Snafu)] #[allow(missing_docs)] enum Error { - #[snafu(display("Last-Modified Header missing from response"))] - MissingLastModified, - - #[snafu(display("Content-Length Header missing from response"))] 
- MissingContentLength, - - #[snafu(display("Invalid last modified '{}': {}", last_modified, source))] - InvalidLastModified { - last_modified: String, - source: chrono::ParseError, - }, - - #[snafu(display("Invalid content length '{}': {}", content_length, source))] - InvalidContentLength { - content_length: String, - source: std::num::ParseIntError, - }, - #[snafu(display("Received header containing non-ASCII data"))] BadHeader { source: reqwest::header::ToStrError }, @@ -145,6 +126,11 @@ enum Error { #[snafu(display("ETag Header missing from response"))] MissingEtag, + + #[snafu(display("Failed to parse headers: {}", source))] + Header { + source: crate::client::header::Error, + }, } impl From for super::Error { @@ -237,43 +223,10 @@ impl ObjectStore for MicrosoftAzure { } async fn head(&self, location: &Path) -> Result { - use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED}; - // Extract meta from headers // https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties let response = self.client.get_request(location, None, true).await?; - let headers = response.headers(); - - let last_modified = headers - .get(LAST_MODIFIED) - .ok_or(Error::MissingLastModified)? - .to_str() - .context(BadHeaderSnafu)?; - let last_modified = Utc - .datetime_from_str(last_modified, RFC1123_FMT) - .context(InvalidLastModifiedSnafu { last_modified })?; - - let content_length = headers - .get(CONTENT_LENGTH) - .ok_or(Error::MissingContentLength)? - .to_str() - .context(BadHeaderSnafu)?; - let content_length = content_length - .parse() - .context(InvalidContentLengthSnafu { content_length })?; - - let e_tag = headers - .get(ETAG) - .ok_or(Error::MissingEtag)? - .to_str() - .context(BadHeaderSnafu)?; - - Ok(ObjectMeta { - location: location.clone(), - last_modified, - size: content_length, - e_tag: Some(e_tag.to_string()), - }) + Ok(header_meta(location, response.headers()).context(HeaderSnafu)?) 
} async fn delete(&self, location: &Path) -> Result<()> { diff --git a/object_store/src/client/header.rs b/object_store/src/client/header.rs new file mode 100644 index 000000000000..cc4f16eaa599 --- /dev/null +++ b/object_store/src/client/header.rs @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Logic for extracting ObjectMeta from headers used by AWS, GCP and Azure + +use crate::path::Path; +use crate::ObjectMeta; +use chrono::{DateTime, Utc}; +use hyper::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED}; +use hyper::HeaderMap; +use snafu::{OptionExt, ResultExt, Snafu}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("ETag Header missing from response"))] + MissingEtag, + + #[snafu(display("Received header containing non-ASCII data"))] + BadHeader { source: reqwest::header::ToStrError }, + + #[snafu(display("Last-Modified Header missing from response"))] + MissingLastModified, + + #[snafu(display("Content-Length Header missing from response"))] + MissingContentLength, + + #[snafu(display("Invalid last modified '{}': {}", last_modified, source))] + InvalidLastModified { + last_modified: String, + source: chrono::ParseError, + }, + + #[snafu(display("Invalid content length '{}': {}", content_length, source))] + InvalidContentLength { + content_length: String, + source: std::num::ParseIntError, + }, +} + +/// Extracts [`ObjectMeta`] from the provided [`HeaderMap`] +pub fn header_meta(location: &Path, headers: &HeaderMap) -> Result<ObjectMeta, Error> { + let last_modified = headers + .get(LAST_MODIFIED) + .context(MissingLastModifiedSnafu)?; + + let content_length = headers + .get(CONTENT_LENGTH) + .context(MissingContentLengthSnafu)?; + + let last_modified = last_modified.to_str().context(BadHeaderSnafu)?; + let last_modified = DateTime::parse_from_rfc2822(last_modified) + .context(InvalidLastModifiedSnafu { last_modified })? 
+ .with_timezone(&Utc); + + let content_length = content_length.to_str().context(BadHeaderSnafu)?; + let content_length = content_length + .parse() + .context(InvalidContentLengthSnafu { content_length })?; + + let e_tag = headers.get(ETAG).context(MissingEtagSnafu)?; + let e_tag = e_tag.to_str().context(BadHeaderSnafu)?; + + Ok(ObjectMeta { + location: location.clone(), + last_modified, + size: content_length, + e_tag: Some(e_tag.to_string()), + }) +} diff --git a/object_store/src/client/list.rs b/object_store/src/client/list.rs new file mode 100644 index 000000000000..6a3889e3be5b --- /dev/null +++ b/object_store/src/client/list.rs @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
The list response format used by GCP and AWS + +use crate::path::Path; +use crate::{ListResult, ObjectMeta, Result}; +use chrono::{DateTime, Utc}; +use serde::Deserialize; + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct ListResponse { + #[serde(default)] + pub contents: Vec<ListContents>, + #[serde(default)] + pub common_prefixes: Vec<ListPrefix>, + #[serde(default)] + pub next_continuation_token: Option<String>, +} + +impl TryFrom<ListResponse> for ListResult { + type Error = crate::Error; + + fn try_from(value: ListResponse) -> Result<Self> { + let common_prefixes = value + .common_prefixes + .into_iter() + .map(|x| Ok(Path::parse(x.prefix)?)) + .collect::<Result<_, _>>()?; + + let objects = value + .contents + .into_iter() + .map(TryFrom::try_from) + .collect::<Result<_, _>>()?; + + Ok(Self { + common_prefixes, + objects, + }) + } +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct ListPrefix { + pub prefix: String, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct ListContents { + pub key: String, + pub size: usize, + pub last_modified: DateTime<Utc>, + #[serde(rename = "ETag")] + pub e_tag: Option<String>, +} + +impl TryFrom<ListContents> for ObjectMeta { + type Error = crate::Error; + + fn try_from(value: ListContents) -> Result<Self> { + Ok(Self { + location: Path::parse(value.key)?, + last_modified: value.last_modified, + size: value.size, + e_tag: value.e_tag, + }) + } +} diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index d2242dd41089..d1090fd2cdae 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -26,6 +26,12 @@ pub mod retry; #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] pub mod token; +#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] +pub mod header; + +#[cfg(any(feature = "aws", feature = "gcp"))] +pub mod list; + use crate::config::ConfigValue; use reqwest::header::{HeaderMap, HeaderValue}; use reqwest::{Client, ClientBuilder, Proxy}; diff --git 
a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 375b4d8f8c37..3577a9e7ff24 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -37,9 +37,8 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::{Buf, Bytes}; -use chrono::{DateTime, Utc}; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; -use percent_encoding::{percent_encode, NON_ALPHANUMERIC}; +use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC}; use reqwest::header::RANGE; use reqwest::{header, Client, Method, Response, StatusCode}; use serde::{Deserialize, Serialize}; @@ -47,6 +46,8 @@ use snafu::{OptionExt, ResultExt, Snafu}; use tokio::io::AsyncWrite; use url::Url; +use crate::client::header::header_meta; +use crate::client::list::ListResponse; use crate::client::pagination::stream_paginated; use crate::client::retry::RetryExt; use crate::client::ClientConfigKey; @@ -82,6 +83,9 @@ enum Error { #[snafu(display("Error getting list response body: {}", source))] ListResponseBody { source: reqwest::Error }, + #[snafu(display("Got invalid list response: {}", source))] + InvalidListResponse { source: quick_xml::de::DeError }, + #[snafu(display("Error performing get request {}: {}", path, source))] GetRequest { source: crate::client::retry::Error, @@ -152,6 +156,11 @@ enum Error { #[snafu(display("Configuration key: '{}' is not known.", key))] UnknownConfigurationKey { key: String }, + + #[snafu(display("Failed to parse headers: {}", source))] + Header { + source: crate::client::header::Error, + }, } impl From for super::Error { @@ -182,25 +191,6 @@ impl From for super::Error { } } -#[derive(serde::Deserialize, Debug)] -#[serde(rename_all = "camelCase")] -struct ListResponse { - next_page_token: Option, - #[serde(default)] - prefixes: Vec, - #[serde(default)] - items: Vec, -} - -#[derive(serde::Deserialize, Debug)] -struct Object { - name: String, - size: String, - updated: DateTime, - #[serde(rename = "etag")] - e_tag: Option, -} 
- #[derive(serde::Deserialize, Debug)] #[serde(rename_all = "PascalCase")] struct InitiateMultipartUploadResult { @@ -268,15 +258,11 @@ impl GoogleCloudStorageClient { } fn object_url(&self, path: &Path) -> String { - let encoded = - percent_encoding::utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC); - format!( - "{}/storage/v1/b/{}/o/{}", - self.base_url, self.bucket_name_encoded, encoded - ) + let encoded = utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC); + format!("{}/{}/{}", self.base_url, self.bucket_name_encoded, encoded) } - /// Perform a get request + /// Perform a get request async fn get_request( &self, path: &Path, @@ -286,20 +272,19 @@ impl GoogleCloudStorageClient { let token = self.get_token().await?; let url = self.object_url(path); - let mut builder = self.client.request(Method::GET, url); + let method = match head { + true => Method::HEAD, + false => Method::GET, + }; + + let mut builder = self.client.request(method, url); if let Some(range) = range { builder = builder.header(RANGE, format_http_range(range)); } - let alt = match head { - true => "json", - false => "media", - }; - let response = builder .bearer_auth(token) - .query(&[("alt", alt)]) .send_retry(&self.retry_config) .await .context(GetRequestSnafu { @@ -309,13 +294,10 @@ impl GoogleCloudStorageClient { Ok(response) } - /// Perform a put request + /// Perform a put request async fn put_request(&self, path: &Path, payload: Bytes) -> Result<()> { let token = self.get_token().await?; - let url = format!( - "{}/upload/storage/v1/b/{}/o", - self.base_url, self.bucket_name_encoded - ); + let url = self.object_url(path); let content_type = self .client_options @@ -323,11 +305,10 @@ impl GoogleCloudStorageClient { .unwrap_or("application/octet-stream"); self.client - .request(Method::POST, url) + .request(Method::PUT, url) .bearer_auth(token) .header(header::CONTENT_TYPE, content_type) .header(header::CONTENT_LENGTH, payload.len()) - .query(&[("uploadType", "media"), ("name", 
path.as_ref())]) .body(payload) .send_retry(&self.retry_config) .await @@ -392,7 +373,7 @@ impl GoogleCloudStorageClient { Ok(()) } - /// Perform a delete request + /// Perform a delete request async fn delete_request(&self, path: &Path) -> Result<()> { let token = self.get_token().await?; let url = self.object_url(path); @@ -409,7 +390,7 @@ impl GoogleCloudStorageClient { Ok(()) } - /// Perform a copy request + /// Perform a copy request async fn copy_request( &self, from: &Path, @@ -417,24 +398,18 @@ impl GoogleCloudStorageClient { if_not_exists: bool, ) -> Result<()> { let token = self.get_token().await?; + let url = self.object_url(to); - let source = - percent_encoding::utf8_percent_encode(from.as_ref(), NON_ALPHANUMERIC); - let destination = - percent_encoding::utf8_percent_encode(to.as_ref(), NON_ALPHANUMERIC); - let url = format!( - "{}/storage/v1/b/{}/o/{}/copyTo/b/{}/o/{}", - self.base_url, - self.bucket_name_encoded, - source, - self.bucket_name_encoded, - destination - ); + let from = utf8_percent_encode(from.as_ref(), NON_ALPHANUMERIC); + let source = format!("{}/{}", self.bucket_name_encoded, from); - let mut builder = self.client.request(Method::POST, url); + let mut builder = self + .client + .request(Method::PUT, url) + .header("x-goog-copy-source", source); if if_not_exists { - builder = builder.query(&[("ifGenerationMatch", "0")]); + builder = builder.header("x-goog-if-generation-match", 0); } builder @@ -465,7 +440,7 @@ impl GoogleCloudStorageClient { Ok(()) } - /// Perform a list request + /// Perform a list request async fn list_request( &self, prefix: Option<&str>, @@ -473,13 +448,10 @@ impl GoogleCloudStorageClient { page_token: Option<&str>, ) -> Result { let token = self.get_token().await?; + let url = format!("{}/{}", self.base_url, self.bucket_name_encoded); - let url = format!( - "{}/storage/v1/b/{}/o", - self.base_url, self.bucket_name_encoded - ); - - let mut query = Vec::with_capacity(4); + let mut query = Vec::with_capacity(5); + 
query.push(("list-type", "2")); if delimiter { query.push(("delimiter", DELIMITER)) } @@ -489,14 +461,14 @@ impl GoogleCloudStorageClient { } if let Some(page_token) = page_token { - query.push(("pageToken", page_token)) + query.push(("continuation-token", page_token)) } if let Some(max_results) = &self.max_list_results { - query.push(("maxResults", max_results)) + query.push(("max-keys", max_results)) } - let response: ListResponse = self + let response = self .client .request(Method::GET, url) .query(&query) @@ -504,10 +476,13 @@ impl GoogleCloudStorageClient { .send_retry(&self.retry_config) .await .context(ListRequestSnafu)? - .json() + .bytes() .await .context(ListResponseBodySnafu)?; + let response: ListResponse = quick_xml::de::from_reader(response.reader()) + .context(InvalidListResponseSnafu)?; + Ok(response) } @@ -516,14 +491,14 @@ impl GoogleCloudStorageClient { &self, prefix: Option<&Path>, delimiter: bool, - ) -> BoxStream<'_, Result> { + ) -> BoxStream<'_, Result> { let prefix = format_prefix(prefix); stream_paginated(prefix, move |prefix, token| async move { let mut r = self .list_request(prefix.as_deref(), delimiter, token.as_deref()) .await?; - let next_token = r.next_page_token.take(); - Ok((r, prefix, next_token)) + let next_token = r.next_continuation_token.take(); + Ok((r.try_into()?, prefix, next_token)) }) .boxed() } @@ -692,10 +667,7 @@ impl ObjectStore for GoogleCloudStorage { async fn head(&self, location: &Path) -> Result { let response = self.client.get_request(location, None, true).await?; - let object = response.json().await.context(GetResponseBodySnafu { - path: location.as_ref(), - })?; - convert_object_meta(&object) + Ok(header_meta(location, response.headers()).context(HeaderSnafu)?) 
} async fn delete(&self, location: &Path) -> Result<()> { @@ -709,11 +681,7 @@ impl ObjectStore for GoogleCloudStorage { let stream = self .client .list_paginated(prefix, false) - .map_ok(|r| { - futures::stream::iter( - r.items.into_iter().map(|x| convert_object_meta(&x)), - ) - }) + .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok))) .try_flatten() .boxed(); @@ -728,15 +696,8 @@ impl ObjectStore for GoogleCloudStorage { while let Some(result) = stream.next().await { let response = result?; - - for p in response.prefixes { - common_prefixes.insert(Path::parse(p)?); - } - - objects.reserve(response.items.len()); - for object in &response.items { - objects.push(convert_object_meta(object)?); - } + common_prefixes.extend(response.common_prefixes.into_iter()); + objects.extend(response.objects.into_iter()); } Ok(ListResult { @@ -1202,20 +1163,6 @@ impl GoogleCloudStorageBuilder { } } -fn convert_object_meta(object: &Object) -> Result { - let location = Path::parse(&object.name)?; - let last_modified = object.updated; - let size = object.size.parse().context(InvalidSizeSnafu)?; - let e_tag = object.e_tag.clone(); - - Ok(ObjectMeta { - location, - last_modified, - size, - e_tag, - }) -} - #[cfg(test)] mod test { use bytes::Bytes; From f64c35ae4f8eb02b6335af64bf5f9463f674408e Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 12 May 2023 17:12:07 +0100 Subject: [PATCH 2/3] Use updated fake-gcs-server --- .github/workflows/object_store.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index 65c78df18466..fcd1c606db6b 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -96,7 +96,7 @@ jobs: - name: Configure Fake GCS Server (GCP emulation) run: | - docker run -d -p 4443:4443 fsouza/fake-gcs-server -scheme http + docker run -d -p 4443:4443 tustvold/fake-gcs-server -scheme http -backend memory -public-host 
localhost:4443 # Give the container a moment to start up prior to configuring it sleep 1 curl -v -X POST --data-binary '{"name":"test-bucket"}' -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b" From b0ef4fdbe112f4e5cbcefe22561f24d6f0620817 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 15 May 2023 17:57:15 +0100 Subject: [PATCH 3/3] Review feedback --- .github/workflows/object_store.yml | 1 + object_store/CONTRIBUTING.md | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index fcd1c606db6b..df43ae3bf76a 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -95,6 +95,7 @@ jobs: - uses: actions/checkout@v3 - name: Configure Fake GCS Server (GCP emulation) + # Custom image - see fsouza/fake-gcs-server#1164 run: | docker run -d -p 4443:4443 tustvold/fake-gcs-server -scheme http -backend memory -public-host localhost:4443 # Give the container a moment to start up prior to configuring it diff --git a/object_store/CONTRIBUTING.md b/object_store/CONTRIBUTING.md index 550640d931b4..47c294022659 100644 --- a/object_store/CONTRIBUTING.md +++ b/object_store/CONTRIBUTING.md @@ -103,7 +103,7 @@ To test the GCS integration, we use [Fake GCS Server](https://github.com/fsouza/ Startup the fake server: ```shell -docker run -p 4443:4443 fsouza/fake-gcs-server -scheme http +docker run -p 4443:4443 tustvold/fake-gcs-server -scheme http ``` Configure the account: