Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable compression of docs stored on S3 #944

Merged
merged 3 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions modules/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ bytes = { workspace = true }
clap = { workspace = true }
futures = { workspace = true }
hex = { workspace = true }
http = "0.2" # workspace version conflicts with rust-s3 0.35
log = { workspace = true }
rust-s3 = { workspace = true }
strum = { workspace = true, features = ["derive"] }
Expand Down
21 changes: 20 additions & 1 deletion modules/storage/src/service/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use std::task::{Context, Poll};
use tokio::io;
use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, BufReader, ReadBuf};

#[derive(Copy, Clone, Eq, PartialEq, Default, Debug, strum::EnumIter, clap::ValueEnum)]
#[derive(
Copy, Clone, Eq, PartialEq, Default, Debug, strum::EnumIter, strum::EnumString, clap::ValueEnum,
)]
pub enum Compression {
#[default]
None,
Expand All @@ -30,6 +32,23 @@ impl Compression {
}
}

pub fn content_encoding(&self) -> &'static str {
match self {
Self::None => "none",
Self::Zstd => "zstd",
}
}

pub async fn compress<R>(&self, r: R) -> Box<dyn AsyncRead + Unpin>
where
R: AsyncBufRead + Unpin + 'static,
{
match self {
Self::None => Box::new(r),
Self::Zstd => Box::new(bufread::ZstdEncoder::new(r)),
}
}

pub async fn write<R, W>(&self, r: &mut R, w: &mut W) -> io::Result<u64>
where
R: AsyncBufRead + Unpin + 'static,
Expand Down
18 changes: 8 additions & 10 deletions modules/storage/src/service/fs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::temp::TempFile;
use crate::service::{
compression::Compression, StorageBackend, StorageKey, StorageResult, StoreError,
compression::Compression, temp::TempFile, StorageBackend, StorageKey, StorageResult, StoreError,
};
use anyhow::Context;
use bytes::Bytes;
Expand All @@ -15,7 +14,7 @@ use strum::IntoEnumIterator;
use tempfile::{tempdir, TempDir};
use tokio::{
fs::{create_dir_all, File},
io::{AsyncWriteExt, BufReader},
io::AsyncWriteExt,
};
use tokio_util::io::ReaderStream;

Expand Down Expand Up @@ -105,29 +104,28 @@ impl StorageBackend for FileSystemBackend {
S: Stream<Item = Result<Bytes, E>>,
{
let stream = pin!(stream);
let mut file = TempFile::new(stream).await.map_err(StoreError::Backend)?;
let mut source = BufReader::new(file.reader().await.map_err(StoreError::Backend)?);
let mut file = TempFile::new(stream).await?;
let mut source = file.reader().await?;

let result = file.result();
let key = result.key().to_string();

// create the target path

let target = level_dir(&self.content, &key, NUM_LEVELS);
create_dir_all(&target).await.map_err(StoreError::Backend)?;
create_dir_all(&target).await?;
let mut target = target.join(&key);
target.set_extension(self.write_compression.extension());

let mut target = File::create(target).await.map_err(StoreError::Backend)?;
let mut target = File::create(target).await?;
self.write_compression
.write(&mut source, &mut target)
.await
.map_err(StoreError::Backend)?;
.await?;

// ensure we have all bytes on disk for the target file,
// then close it

target.flush().await.map_err(StoreError::Backend)?;
target.flush().await?;
drop(target);

// the content is at the right place, close (destroy) the temp file
Expand Down
6 changes: 6 additions & 0 deletions modules/storage/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ pub enum StoreError<S: Debug, B: Debug> {
Backend(#[source] B),
}

impl<E: Debug> From<std::io::Error> for StoreError<E, std::io::Error> {
fn from(e: std::io::Error) -> Self {
StoreError::Backend(e)
}
}

#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct StorageKey(String);

Expand Down
75 changes: 59 additions & 16 deletions modules/storage/src/service/s3.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
use crate::{
config::S3Config,
service::{temp::TempFile, StorageBackend, StorageKey, StorageResult, StoreError},
service::{
compression::Compression, temp::TempFile, StorageBackend, StorageKey, StorageResult,
StoreError,
},
};
use bytes::Bytes;
use futures::Stream;
use futures::{Stream, TryStreamExt};
use http::{header::CONTENT_ENCODING, HeaderMap, HeaderValue};
use s3::{creds::Credentials, error::S3Error, Bucket};
use std::{fmt::Debug, pin::pin};
use std::{fmt::Debug, io, pin::pin, str::FromStr};
use tokio_util::io::{ReaderStream, StreamReader};

#[derive(Clone, Debug)]
pub struct S3Backend {
bucket: Box<Bucket>,
bucket: Bucket,
compression: Compression,
}

impl S3Backend {
pub async fn new(config: S3Config) -> Result<Self, S3Error> {
pub async fn new(config: S3Config, compression: Compression) -> Result<Self, S3Error> {
let bucket = Bucket::new(
&config.bucket.unwrap_or_default(),
config.region.unwrap_or_default().parse()?,
Expand All @@ -24,36 +30,44 @@ impl S3Backend {
None,
None,
)?,
)?;
)?
.with_extra_headers(HeaderMap::from_iter([(
CONTENT_ENCODING,
HeaderValue::from_str(&compression.to_string())?,
)]))?;
assert!(bucket.exists().await?, "S3 bucket not found");
log::info!(
"Using S3 bucket '{}' in '{}' for doc storage",
bucket.name,
bucket.region
);
Ok(S3Backend { bucket })
Ok(S3Backend {
bucket,
compression,
})
}
}

impl StorageBackend for S3Backend {
type Error = S3Error;
type Error = Error;

async fn store<E, S>(&self, stream: S) -> Result<StorageResult, StoreError<E, Self::Error>>
where
E: Debug,
S: Stream<Item = Result<Bytes, E>>,
{
let errhdlr = |e| StoreError::Backend(S3Error::Io(e));

let stream = pin!(stream);
let mut file = TempFile::new(stream).await.map_err(errhdlr)?;
let mut source = file.reader().await.map_err(errhdlr)?;
let mut file = TempFile::new(stream).await.map_err(Error::Io)?;
let mut source = self
.compression
.compress(file.reader().await.map_err(Error::Io)?)
.await;
let result = file.result();

self.bucket
.put_object_stream(&mut source, result.key().to_string())
.await
.map_err(StoreError::Backend)?;
.map_err(Error::S3)?;

Ok(result)
}
Expand All @@ -62,10 +76,39 @@ impl StorageBackend for S3Backend {
&self,
StorageKey(key): StorageKey,
) -> Result<Option<impl Stream<Item = Result<Bytes, Self::Error>> + 'a>, Self::Error> {
match self.bucket.get_object_stream(key).await {
Ok(resp) => Ok(Some(resp.bytes)),
let (head, _status) = self.bucket.head_object(&key).await?;
let encoding = head
.content_encoding
.unwrap_or(Compression::None.to_string());
let compression = Compression::from_str(&encoding)?;
match self.bucket.get_object_stream(&key).await {
Ok(resp) => {
let reader = StreamReader::new(resp.bytes.map_err(|e| match e {
S3Error::Io(e) => e,
_ => io::Error::new(io::ErrorKind::Other, e),
}));
Ok(Some(
ReaderStream::new(compression.reader(reader)).map_err(Error::Io),
))
}
Err(S3Error::HttpFailWithBody(404, _)) => Ok(None),
Err(e) => Err(e),
Err(e) => Err(e.into()),
}
}
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("{0}")]
S3(#[from] S3Error),
#[error("{0}")]
Io(#[from] io::Error),
#[error("{0}")]
Parse(#[from] strum::ParseError),
}

impl<E: Debug> From<Error> for StoreError<E, Error> {
fn from(e: Error) -> Self {
StoreError::Backend(e)
}
}
6 changes: 3 additions & 3 deletions modules/storage/src/service/temp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
use tempfile::tempfile;
use tokio::{
fs::File,
io::{AsyncRead, AsyncSeekExt, AsyncWriteExt},
io::{AsyncBufRead, AsyncSeekExt, AsyncWriteExt, BufReader},
};
use trustify_common::hashing::{Contexts, Digests};

Expand Down Expand Up @@ -47,9 +47,9 @@ impl TempFile {
}

/// Return a clone of the temp file after resetting its position
pub async fn reader(&mut self) -> Result<impl AsyncRead, Error> {
pub async fn reader(&mut self) -> Result<impl AsyncBufRead, Error> {
self.file.seek(SeekFrom::Start(0)).await?;
self.file.try_clone().await
Ok(BufReader::new(self.file.try_clone().await?))
}

/// Passing self should ensure self.file is dropped and hence
Expand Down
6 changes: 3 additions & 3 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,9 @@ impl InitData {
FileSystemBackend::new(storage, run.storage.compression).await?,
)
}
StorageStrategy::S3 => {
DispatchBackend::S3(S3Backend::new(run.storage.s3_config).await?)
}
StorageStrategy::S3 => DispatchBackend::S3(
S3Backend::new(run.storage.s3_config, run.storage.compression).await?,
),
};

let ui = UI {
Expand Down