diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bd1c360..88c0fe3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,19 +34,19 @@ jobs: uses: actions-rs/cargo@v1 with: command: check - args: --package fusio --features "tokio, futures" + args: --package fusio --features "tokio" - name: Run cargo build on tokio uses: actions-rs/cargo@v1 with: command: build - args: --package fusio --features "tokio, futures" + args: --package fusio --features "tokio" - name: Run cargo test on tokio uses: actions-rs/cargo@v1 with: command: test - args: --package fusio --features "tokio, futures" + args: --package fusio --features "tokio" monoio_check: name: Rust project check on monoio @@ -71,19 +71,19 @@ jobs: uses: actions-rs/cargo@v1 with: command: check - args: --package fusio --features "monoio, futures" + args: --package fusio --features "monoio" - name: Run cargo build on monoio uses: actions-rs/cargo@v1 with: command: build - args: --package fusio --features "monoio, futures" + args: --package fusio --features "monoio" - name: Run cargo test on monoio uses: actions-rs/cargo@v1 with: command: test - args: --package fusio --features "monoio, futures" + args: --package fusio --features "monoio" # 2 fmt: diff --git a/Cargo.toml b/Cargo.toml index 5d471ff..86be0d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["fusio", "fusio-parquet"] +members = ["fusio", "fusio-object-store", "fusio-parquet"] resolver = "2" [workspace.package] diff --git a/fusio-object-store/Cargo.toml b/fusio-object-store/Cargo.toml new file mode 100644 index 0000000..8fa2947 --- /dev/null +++ b/fusio-object-store/Cargo.toml @@ -0,0 +1,23 @@ +[package] +edition.workspace = true +license.workspace = true +name = "fusio-object-store" +repository.workspace = true +version = "0.1.0" + +[dependencies] +async-stream = { version = "0.3" } +fusio = { version = "0.1", path = "../fusio", features = [ + "bytes", + "dyn", + "object_store", + "tokio", +] } +futures-core = { version = "0.3" } +futures-util = { version = "0.3" } +object_store = { version = "0.11" } + +[dev-dependencies] +bytes = { workspace = true } +object_store = { version = "0.11", features = ["aws"] } +tokio = { version = "1", features = ["full"] } diff --git a/fusio/src/remotes/object_store/fs.rs b/fusio-object-store/src/fs.rs similarity index 72% rename from fusio/src/remotes/object_store/fs.rs rename to fusio-object-store/src/fs.rs index db230ba..f4b670b 100644 --- a/fusio/src/remotes/object_store/fs.rs +++ b/fusio-object-store/src/fs.rs @@ -1,23 +1,31 @@ use std::sync::Arc; use async_stream::stream; -use futures_core::Stream; -use futures_util::stream::StreamExt; -use object_store::{aws::AmazonS3, ObjectStore}; - -use crate::{ +use fusio::{ fs::{FileMeta, Fs, OpenOptions, WriteMode}, path::Path, - remotes::object_store::S3File, Error, }; +use futures_core::Stream; +use futures_util::stream::StreamExt; +use object_store::ObjectStore; -pub struct S3Store { - inner: Arc, +use crate::{BoxedError, S3File}; + +pub struct S3Store { + inner: Arc, +} + +impl From for S3Store { + fn from(inner: O) -> Self { + Self { + inner: Arc::new(inner), + } + } } -impl Fs for S3Store { - type File = S3File; +impl Fs for S3Store { + type File = S3File; async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { if let Some(WriteMode::Append) = options.write { @@ -42,7 +50,7 @@ impl Fs for S3Store { let mut stream = self.inner.list(Some(&path)); Ok(stream! { - while let Some(meta) = stream.next().await.transpose()? { + while let Some(meta) = stream.next().await.transpose().map_err(BoxedError::from)? { yield Ok(FileMeta { path: meta.location.into(), size: meta.size as u64 }); } }) @@ -50,7 +58,7 @@ impl Fs for S3Store { async fn remove(&self, path: &Path) -> Result<(), Error> { let path = path.clone().into(); - self.inner.delete(&path).await?; + self.inner.delete(&path).await.map_err(BoxedError::from)?; Ok(()) } diff --git a/fusio/src/remotes/object_store/mod.rs b/fusio-object-store/src/lib.rs similarity index 75% rename from fusio/src/remotes/object_store/mod.rs rename to fusio-object-store/src/lib.rs index 046456f..c111d3b 100644 --- a/fusio/src/remotes/object_store/mod.rs +++ b/fusio-object-store/src/lib.rs @@ -1,18 +1,19 @@ -mod fs; +pub mod fs; use std::{ops::Range, sync::Arc}; -use object_store::{aws::AmazonS3, path::Path, GetOptions, GetRange, ObjectStore, PutPayload}; +use fusio::{Error, IoBuf, IoBufMut, Read, Seek, Write}; +use object_store::{path::Path, GetOptions, GetRange, ObjectStore, PutPayload}; -use crate::{buf::IoBufMut, Error, IoBuf, Read, Seek, Write}; +pub type BoxedError = Box; -pub struct S3File { - inner: Arc, +pub struct S3File { + inner: Arc, path: Path, pos: u64, } -impl Read for S3File { +impl Read for S3File { async fn read_exact(&mut self, mut buf: B) -> Result { let pos = self.pos as usize; @@ -23,8 +24,12 @@ impl Read for S3File { }); opts.range = Some(range); - let result = self.inner.get_opts(&self.path, opts).await?; - let bytes = result.bytes().await?; + let result = self + .inner + .get_opts(&self.path, opts) + .await + .map_err(BoxedError::from)?; + let bytes = result.bytes().await.map_err(BoxedError::from)?; self.pos += bytes.len() as u64; @@ -37,26 +42,30 @@ impl Read for S3File { head: true, ..Default::default() }; - let response = self.inner.get_opts(&self.path, options).await?; + let response = self + .inner + .get_opts(&self.path, options) + .await + .map_err(BoxedError::from)?; Ok(response.meta.size as u64) } } -impl Seek for S3File { +impl Seek for S3File { async fn seek(&mut self, pos: u64) -> Result<(), Error> { self.pos = pos; Ok(()) } } -impl Write for S3File { +impl Write for S3File { async fn write_all(&mut self, buf: B) -> (Result<(), Error>, B) { let result = self .inner .put(&self.path, PutPayload::from_bytes(buf.as_bytes())) .await .map(|_| ()) - .map_err(Error::ObjectStore); + .map_err(|e| BoxedError::from(e).into()); (result, buf) } @@ -77,7 +86,6 @@ impl Write for S3File { #[cfg(test)] mod tests { - #[cfg(all(feature = "tokio", not(feature = "completion-based")))] #[tokio::test] async fn test_s3() { use std::{env, env::VarError, sync::Arc}; @@ -85,7 +93,7 @@ mod tests { use bytes::Bytes; use object_store::{aws::AmazonS3Builder, ObjectStore}; - use crate::{remotes::object_store::S3File, Read, Write}; + use crate::{Read, S3File, Write}; let fn_env = || { let region = env::var("TEST_INTEGRATION")?; @@ -119,7 +127,7 @@ mod tests { let (result, bytes) = store.write_all(Bytes::from("hello! Fusio!")).await; result.unwrap(); - let buf = vec![0_u8; bytes.len()]; + let mut buf = vec![0_u8; bytes.len()]; let buf = store.read_exact(&mut buf[..]).await.unwrap(); assert_eq!(buf, &bytes[..]); } diff --git a/fusio/Cargo.toml b/fusio/Cargo.toml index d17ee43..1ec016b 100644 --- a/fusio/Cargo.toml +++ b/fusio/Cargo.toml @@ -23,7 +23,6 @@ bytes = ["dep:bytes"] completion-based = [] default = ["dyn", "fs"] dyn = [] -futures = ["dep:futures-util"] fs = ["tokio?/rt"] http = [ "async-stream", @@ -38,7 +37,7 @@ http = [ monoio = ["async-stream", "completion-based", "dep:monoio", "no-send"] monoio-http = ["h2", "http", "hyper"] no-send = [] -object_store = ["dep:futures-util", "dep:object_store", "tokio"] +object_store = ["dep:object_store"] tokio = ["async-stream", "dep:tokio"] tokio-http = ["dep:reqwest", "http"] tokio-uring = ["async-stream", "completion-based", "dep:tokio-uring", "no-send"] @@ -71,9 +70,7 @@ hyper = { version = "1", optional = true, default-features = false, features = [ ] } itertools = { version = "0.13" } monoio = { version = "0.2", optional = true, default-features = false } -object_store = { version = "0.11", optional = true, default-features = false, features = [ - "aws", -] } +object_store = { version = "0.11", optional = true, default-features = false } percent-encoding = { version = "2" } quick-xml = { version = "0.36", features = [ "overlapped-lists", @@ -100,7 +97,6 @@ tokio-uring = { version = "0.5", default-features = false, optional = true } [dev-dependencies] criterion = { version = "0.5", features = ["async_tokio", "html_reports"] } -futures-executor = "0.3" futures-util = { version = "0.3" } hyper = { version = "1", features = ["full"] } hyper-util = { version = "0.1", features = ["full"] } diff --git a/fusio/src/error.rs b/fusio/src/error.rs index a15ad54..21fe069 100644 --- a/fusio/src/error.rs +++ b/fusio/src/error.rs @@ -9,8 +9,6 @@ pub enum Error { Io(#[from] io::Error), #[cfg(feature = "http")] Http(#[from] http::Error), - #[cfg(feature = "object_store")] - ObjectStore(#[from] object_store::Error), Path(#[from] crate::path::Error), #[error("unsupported operation")] Unsupported, @@ -26,5 +24,4 @@ pub enum Error { }, } -#[allow(unused)] pub type BoxedError = Box; diff --git a/fusio/src/fs/options.rs b/fusio/src/fs/options.rs index 9c8e502..5b21442 100644 --- a/fusio/src/fs/options.rs +++ b/fusio/src/fs/options.rs @@ -1,13 +1,13 @@ #[derive(PartialEq, Eq)] -pub(crate) enum WriteMode { +pub enum WriteMode { Append, Overwrite, } pub struct OpenOptions { - pub(crate) read: bool, - pub(crate) write: Option, - pub(crate) create: bool, + pub read: bool, + pub write: Option, + pub create: bool, } impl Default for OpenOptions { diff --git a/fusio/src/lib.rs b/fusio/src/lib.rs index 9c9bbc3..e543fd7 100644 --- a/fusio/src/lib.rs +++ b/fusio/src/lib.rs @@ -268,7 +268,6 @@ mod tests { assert_eq!(buf.as_slice(), &[2, 0, 2, 4]); } - #[cfg(feature = "futures")] async fn test_local_fs(fs: S) -> Result<(), Error> where S: crate::fs::Fs, @@ -386,7 +385,7 @@ mod tests { write_and_read(File::from_std(write), File::from_std(read)).await; } - #[cfg(all(feature = "tokio", feature = "futures"))] + #[cfg(feature = "tokio")] #[tokio::test] async fn test_tokio_fs() { use crate::local::TokioFs; diff --git a/fusio/src/remotes/aws/fs.rs b/fusio/src/remotes/aws/fs.rs index 5d00e24..4c013be 100644 --- a/fusio/src/remotes/aws/fs.rs +++ b/fusio/src/remotes/aws/fs.rs @@ -30,23 +30,23 @@ pub struct AmazonS3Builder { } impl AmazonS3Builder { + #[allow(unused_variables)] pub fn new(bucket: String) -> Self { cfg_if::cfg_if! { if #[cfg(all(feature = "tokio-http", not(feature = "completion-based")))] { let client = Arc::new(crate::remotes::http::tokio::TokioClient::new()); + Self { + region: "us-east-1".into(), + bucket, + credential: None, + sign_payload: false, + checksum: false, + client, + } } else { unreachable!() } } - - Self { - region: "us-east-1".into(), - bucket, - credential: None, - sign_payload: false, - checksum: false, - client, - } } } diff --git a/fusio/src/remotes/mod.rs b/fusio/src/remotes/mod.rs index faba988..56ed013 100644 --- a/fusio/src/remotes/mod.rs +++ b/fusio/src/remotes/mod.rs @@ -2,6 +2,3 @@ pub mod aws; #[cfg(feature = "http")] pub mod http; - -#[cfg(feature = "object_store")] -pub mod object_store;