Skip to content

Commit

Permalink
refactor: move fusio-object-store to another crate
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Sep 29, 2024
1 parent 5682847 commit 10c57c6
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 55 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
members = ["fusio", "fusio-parquet"]
members = ["fusio", "fusio-object-store", "fusio-parquet"]
resolver = "2"

[workspace.package]
Expand Down
23 changes: 23 additions & 0 deletions fusio-object-store/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
edition.workspace = true
license.workspace = true
name = "fusio-object-store"
repository.workspace = true
version = "0.1.0"

[dependencies]
async-stream = { version = "0.3" }
fusio = { version = "0.1", path = "../fusio", features = [
"bytes",
"dyn",
"object_store",
"tokio",
] }
futures-core = { version = "0.3" }
futures-util = { version = "0.3" }
object_store = { version = "0.11" }

[dev-dependencies]
bytes = { workspace = true }
object_store = { version = "0.11", features = ["aws"] }
tokio = { version = "1", features = ["full"] }
Original file line number Diff line number Diff line change
@@ -1,23 +1,31 @@
use std::sync::Arc;

use async_stream::stream;
use futures_core::Stream;
use futures_util::stream::StreamExt;
use object_store::{aws::AmazonS3, ObjectStore};

use crate::{
use fusio::{
fs::{FileMeta, Fs, OpenOptions, WriteMode},
path::Path,
remotes::object_store::S3File,
Error,
};
use futures_core::Stream;
use futures_util::stream::StreamExt;
use object_store::ObjectStore;

pub struct S3Store {
inner: Arc<AmazonS3>,
use crate::{BoxedError, S3File};

pub struct S3Store<O: ObjectStore> {
inner: Arc<O>,
}

impl<O: ObjectStore> From<O> for S3Store<O> {
fn from(inner: O) -> Self {
Self {
inner: Arc::new(inner),
}
}
}

impl Fs for S3Store {
type File = S3File;
impl<O: ObjectStore> Fs for S3Store<O> {
type File = S3File<O>;

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
if let Some(WriteMode::Append) = options.write {
Expand All @@ -42,15 +50,15 @@ impl Fs for S3Store {
let mut stream = self.inner.list(Some(&path));

Ok(stream! {
while let Some(meta) = stream.next().await.transpose()? {
while let Some(meta) = stream.next().await.transpose().map_err(BoxedError::from)? {
yield Ok(FileMeta { path: meta.location.into(), size: meta.size as u64 });
}
})
}

async fn remove(&self, path: &Path) -> Result<(), Error> {
let path = path.clone().into();
self.inner.delete(&path).await?;
self.inner.delete(&path).await.map_err(BoxedError::from)?;

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
mod fs;
pub mod fs;

use std::{ops::Range, sync::Arc};

use object_store::{aws::AmazonS3, path::Path, GetOptions, GetRange, ObjectStore, PutPayload};
use fusio::{Error, IoBuf, IoBufMut, Read, Seek, Write};
use object_store::{path::Path, GetOptions, GetRange, ObjectStore, PutPayload};

use crate::{buf::IoBufMut, Error, IoBuf, Read, Seek, Write};
pub type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;

pub struct S3File {
inner: Arc<AmazonS3>,
pub struct S3File<O: ObjectStore> {
inner: Arc<O>,
path: Path,
pos: u64,
}

impl Read for S3File {
impl<O: ObjectStore> Read for S3File<O> {
async fn read_exact<B: IoBufMut>(&mut self, mut buf: B) -> Result<B, Error> {
let pos = self.pos as usize;

Expand All @@ -23,8 +24,12 @@ impl Read for S3File {
});
opts.range = Some(range);

let result = self.inner.get_opts(&self.path, opts).await?;
let bytes = result.bytes().await?;
let result = self
.inner
.get_opts(&self.path, opts)
.await
.map_err(BoxedError::from)?;
let bytes = result.bytes().await.map_err(BoxedError::from)?;

self.pos += bytes.len() as u64;

Expand All @@ -37,26 +42,30 @@ impl Read for S3File {
head: true,
..Default::default()
};
let response = self.inner.get_opts(&self.path, options).await?;
let response = self
.inner
.get_opts(&self.path, options)
.await
.map_err(BoxedError::from)?;
Ok(response.meta.size as u64)
}
}

impl Seek for S3File {
impl<O: ObjectStore> Seek for S3File<O> {
async fn seek(&mut self, pos: u64) -> Result<(), Error> {
self.pos = pos;
Ok(())
}
}

impl Write for S3File {
impl<O: ObjectStore> Write for S3File<O> {
async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
let result = self
.inner
.put(&self.path, PutPayload::from_bytes(buf.as_bytes()))
.await
.map(|_| ())
.map_err(Error::ObjectStore);
.map_err(|e| BoxedError::from(e).into());

(result, buf)
}
Expand All @@ -77,15 +86,14 @@ impl Write for S3File {
#[cfg(test)]
mod tests {

#[cfg(all(feature = "tokio", not(feature = "completion-based")))]
#[tokio::test]
async fn test_s3() {
use std::{env, env::VarError, sync::Arc};

use bytes::Bytes;
use object_store::{aws::AmazonS3Builder, ObjectStore};

use crate::{remotes::object_store::S3File, Read, Write};
use crate::{Read, S3File, Write};

let fn_env = || {
let region = env::var("TEST_INTEGRATION")?;
Expand Down Expand Up @@ -119,7 +127,7 @@ mod tests {
let (result, bytes) = store.write_all(Bytes::from("hello! Fusio!")).await;
result.unwrap();

let buf = vec![0_u8; bytes.len()];
let mut buf = vec![0_u8; bytes.len()];
let buf = store.read_exact(&mut buf[..]).await.unwrap();
assert_eq!(buf, &bytes[..]);
}
Expand Down
8 changes: 2 additions & 6 deletions fusio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ bytes = ["dep:bytes"]
completion-based = []
default = ["dyn", "fs"]
dyn = []
futures = ["dep:futures-util"]
fs = ["tokio?/rt"]
http = [
"async-stream",
Expand All @@ -38,7 +37,7 @@ http = [
monoio = ["async-stream", "completion-based", "dep:monoio", "no-send"]
monoio-http = ["h2", "http", "hyper"]
no-send = []
object_store = ["dep:futures-util", "dep:object_store", "tokio"]
object_store = ["dep:object_store"]
tokio = ["async-stream", "dep:tokio"]
tokio-http = ["dep:reqwest", "http"]
tokio-uring = ["async-stream", "completion-based", "dep:tokio-uring", "no-send"]
Expand Down Expand Up @@ -71,9 +70,7 @@ hyper = { version = "1", optional = true, default-features = false, features = [
] }
itertools = { version = "0.13" }
monoio = { version = "0.2", optional = true, default-features = false }
object_store = { version = "0.11", optional = true, default-features = false, features = [
"aws",
] }
object_store = { version = "0.11", optional = true, default-features = false }
percent-encoding = { version = "2" }
quick-xml = { version = "0.36", features = [
"overlapped-lists",
Expand All @@ -100,7 +97,6 @@ tokio-uring = { version = "0.5", default-features = false, optional = true }

[dev-dependencies]
criterion = { version = "0.5", features = ["async_tokio", "html_reports"] }
futures-executor = "0.3"
futures-util = { version = "0.3" }
hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["full"] }
Expand Down
3 changes: 0 additions & 3 deletions fusio/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ pub enum Error {
Io(#[from] io::Error),
#[cfg(feature = "http")]
Http(#[from] http::Error),
#[cfg(feature = "object_store")]
ObjectStore(#[from] object_store::Error),
Path(#[from] crate::path::Error),
#[error("unsupported operation")]
Unsupported,
Expand All @@ -26,5 +24,4 @@ pub enum Error {
},
}

#[allow(unused)]
pub type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;
8 changes: 4 additions & 4 deletions fusio/src/fs/options.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#[derive(PartialEq, Eq)]
pub(crate) enum WriteMode {
pub enum WriteMode {
Append,
Overwrite,
}

pub struct OpenOptions {
pub(crate) read: bool,
pub(crate) write: Option<WriteMode>,
pub(crate) create: bool,
pub read: bool,
pub write: Option<WriteMode>,
pub create: bool,
}

impl Default for OpenOptions {
Expand Down
3 changes: 1 addition & 2 deletions fusio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ mod tests {
assert_eq!(buf.as_slice(), &[2, 0, 2, 4]);
}

#[cfg(feature = "futures")]
async fn test_local_fs<S>(fs: S) -> Result<(), Error>
where
S: crate::fs::Fs,
Expand Down Expand Up @@ -386,7 +385,7 @@ mod tests {
write_and_read(File::from_std(write), File::from_std(read)).await;
}

#[cfg(all(feature = "tokio", feature = "futures"))]
#[cfg(feature = "tokio")]
#[tokio::test]
async fn test_tokio_fs() {
use crate::local::TokioFs;
Expand Down
18 changes: 9 additions & 9 deletions fusio/src/remotes/aws/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,23 @@ pub struct AmazonS3Builder {
}

impl AmazonS3Builder {
#[allow(unused_variables)]
pub fn new(bucket: String) -> Self {
cfg_if::cfg_if! {
if #[cfg(all(feature = "tokio-http", not(feature = "completion-based")))] {
let client = Arc::new(crate::remotes::http::tokio::TokioClient::new());
Self {
region: "us-east-1".into(),
bucket,
credential: None,
sign_payload: false,
checksum: false,
client,
}
} else {
unreachable!()
}
}

Self {
region: "us-east-1".into(),
bucket,
credential: None,
sign_payload: false,
checksum: false,
client,
}
}
}

Expand Down
3 changes: 0 additions & 3 deletions fusio/src/remotes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,3 @@
pub mod aws;
#[cfg(feature = "http")]
pub mod http;

#[cfg(feature = "object_store")]
pub mod object_store;

0 comments on commit 10c57c6

Please sign in to comment.