diff --git a/Cargo.toml b/Cargo.toml index 86be0d3..ccf98f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["fusio", "fusio-object-store", "fusio-parquet"] +members = ["examples", "fusio", "fusio-object-store", "fusio-parquet"] resolver = "2" [workspace.package] diff --git a/README.md b/README.md new file mode 100644 index 0000000..daf5970 --- /dev/null +++ b/README.md @@ -0,0 +1,74 @@ +# Fusio + +Fusio provides [Read](https://github.com/tonbo-io/fusio/blob/main/fusio/src/lib.rs#L81) / [Write](https://github.com/tonbo-io/fusio/blob/main/fusio/src/lib.rs#L63) trait to operate multiple storage (local disk, Amazon S3) on multiple poll-based ([tokio](https://github.com/tokio-rs/tokio)) / completion-based async runtime ([tokio-uring](https://github.com/tokio-rs/tokio-uring), [monoio](https://github.com/bytedance/monoio)) with: +- lean: binary size is at least 14x smaller than others +- minimal cost abstraction: compare with bare storage backend, trait definitions promise dispatching file operations without extra costs +- extensible: expose traits to support implementing storage backend as [third-party crates](https://github.com/tonbo-io/fusio/tree/main/fusio-object-store) + +> **Fusio is now at preview version, please join our [community](https://discord.gg/j27XVFVmJM) to attend its development and semantic / behavior discussion.** + +## Why need Fusio? + +Since we start to integrate object storage in [Tonbo](https://github.com/tonbo-io/tonbo), we need file and file system abstractions to dispatch read and write operations to multiple storage backend: memory, local disk, remote object storage and so on. We found that the exist solution has limitations as below: +- local or remote file system accessing is not able to be usable in kinds of async runtimes (not only completion-based runime, but also Python / JS event loop) +- most of VFS implementations are designed for backend server scenarios. As an embedded database, Tonbo needs a lean implementation for embedded, and also a set of traits, allows to extend asynchronous file / file system approach as third-party crates. + +For more context, please check [apache/arrow-rs#6051](https://github.com/apache/arrow-rs/issues/6051). + +## How to use it? + +Because it is not possible to make poll-based async runtime compatible with completion-based at runtime, `fusio` supports switch runtime at compile time + +### Installation +```toml +fusio = { version = "*", features = ["tokio"] } +``` + +### Examples +- + +## When choose fusio? + +Targets of fusio is different with [object_store](https://github.com/apache/arrow-rs/tree/master/object_store) or [opendal](https://github.com/apache/opendal). + +### compare with `object_store` + +`object_store` is locked on [tokio](https://github.com/tokio-rs/tokio) runtime in the current, and also [bytes](https://github.com/apache/arrow-rs/blob/master/object_store/src/payload.rs#L23). `fusio` chooses completion-based like API (inspired by [monoio](https://docs.rs/monoio/latest/monoio/io/trait.AsyncReadRent.html)) to get the minimal cost abstraction in all kinds of async runtimes. + +`fusio` also uses [IoBuf](https://github.com/tonbo-io/fusio/blob/main/fusio/src/lib.rs#L53) / [IoBufMut](https://github.com/tonbo-io/fusio/blob/main/fusio/src/lib.rs#L64) to allow `&[u8]`, `Vec` avoid potential runtime costs. If you are not aware of vendor lock-in, try `object_store`, as the official implementation, it integrates well with `arrow` and `parquet`. + +### compare with `opendal` + +`fusio` does not aim to be a full data access layer like `opendal`. `fusio` is able to enable features and their dependencies on by one. The default binary size of `fusio` is 245KB, which is much more smaller than `opendal` (8.9MB). If you need a full ecosystem of DAL (tracing, cache, etc.) try `opendal`. + +Also, + +## Roadmap +- abstractions + - [x] file operations + - [x] (partial) file system operations +- storage backend implementations + - disk + - [x] tokio + - [x] tokio-uring + - [x] monoio + - [x] network + - [x] HTTP client trait wi + - [x] network storage runtime support + - [x] tokio (base on reqwest) + - [ ] monoio (base on hyper-tls) + - [ ] tokio-uring (base on hyper-tls) + - [x] Amazon S3 + - [ ] Azure Blob Storage + - [ ] Cloudflare R2 + - [ ] in-memory +- [ ] [conditional operations](https://aws.amazon.com/cn/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/) +- extensions + - [x] parquet support + - [x] object_store support + +## Credits +- `monoio`: all core traits: buffer, read and write are highly inspire by it +- `futures`: how it designs abstractions and organizes several crates (core, util, etc.) to avoid coupling impact `fusio`'s design +- `opendal`: compile-time poll-based / completion-based runtime switch insipres `fusio` +- `object_store`: `fusio` copies S3 credential and path behaviors from it diff --git a/examples/Cargo.toml b/examples/Cargo.toml new file mode 100644 index 0000000..acaf97e --- /dev/null +++ b/examples/Cargo.toml @@ -0,0 +1,16 @@ +[package] +edition.workspace = true +license.workspace = true +name = "examples" +repository.workspace = true +version = "0.1.0" + +[features] +default = ["fusio/aws", "tokio"] +monoio = ["dep:monoio", "fusio/monoio"] +tokio = ["dep:tokio", "fusio/tokio"] + +[dependencies] +fusio = { path = "../fusio" } +monoio = { version = "0.2", optional = true } +tokio = { version = "1.0", features = ["full"], optional = true } diff --git a/examples/src/fs.rs b/examples/src/fs.rs new file mode 100644 index 0000000..7194775 --- /dev/null +++ b/examples/src/fs.rs @@ -0,0 +1,21 @@ +use std::sync::Arc; + +use fusio::disk::LocalFs; +use fusio::dynamic::DynFile; +use fusio::DynFs; + +#[allow(unused)] +async fn use_fs() { + let fs: Arc = Arc::new(LocalFs {}); + + let mut file: Box = Box::new(fs.open(&"foo.txt".into()).await.unwrap()); + + let write_buf = "hello, world".as_bytes(); + let mut read_buf = [0; 12]; + + let (result, _, read_buf) = + crate::write_without_runtime_awareness(&mut file, write_buf, &mut read_buf[..]).await; + result.unwrap(); + + assert_eq!(&read_buf, b"hello, world"); +} diff --git a/examples/src/lib.rs b/examples/src/lib.rs new file mode 100644 index 0000000..a7e3112 --- /dev/null +++ b/examples/src/lib.rs @@ -0,0 +1,33 @@ +mod fs; +mod multi_runtime; +mod object; +mod s3; + +use fusio::{Error, IoBuf, IoBufMut, Read, Seek, Write}; + +#[allow(unused)] +async fn write_without_runtime_awareness( + file: &mut F, + write_buf: B, + read_buf: BM, +) -> (Result<(), Error>, B, BM) +where + F: Read + Write + Seek, + B: IoBuf, + BM: IoBufMut, +{ + let (result, write_buf) = file.write_all(write_buf).await; + if result.is_err() { + return (result, write_buf, read_buf); + } + + file.sync_all().await.unwrap(); + file.seek(0).await.unwrap(); + + let (result, read_buf) = file.read(read_buf).await; + if result.is_err() { + return (result.map(|_| ()), write_buf, read_buf); + } + + (Ok(()), write_buf, read_buf) +} diff --git a/examples/src/multi_runtime.rs b/examples/src/multi_runtime.rs new file mode 100644 index 0000000..613637b --- /dev/null +++ b/examples/src/multi_runtime.rs @@ -0,0 +1,31 @@ +use crate::write_without_runtime_awareness; + +#[allow(unused)] +#[cfg(feature = "tokio")] +async fn use_tokio_file() { + use tokio::fs::File; + + let mut file = File::open("foo.txt").await.unwrap(); + let write_buf = "hello, world".as_bytes(); + let mut read_buf = [0; 12]; + let (result, _, read_buf) = + write_without_runtime_awareness(&mut file, write_buf, &mut read_buf[..]).await; + result.unwrap(); + assert_eq!(&read_buf, b"hello, world"); +} + +#[allow(unused)] +#[cfg(feature = "monoio")] +async fn use_monoio_file() { + use fusio::disk::MonoioFile; + use monoio::fs::File; + + let mut file: MonoioFile = File::open("foo.txt").await.unwrap().into(); + let write_buf = "hello, world".as_bytes(); + let read_buf = vec![0; 12]; + // completion-based runtime has to pass owned buffer to the function + let (result, _, read_buf) = + write_without_runtime_awareness(&mut file, write_buf, read_buf).await; + result.unwrap(); + assert_eq!(&read_buf, b"hello, world"); +} diff --git a/examples/src/object.rs b/examples/src/object.rs new file mode 100644 index 0000000..e69c3a5 --- /dev/null +++ b/examples/src/object.rs @@ -0,0 +1,25 @@ +use fusio::dynamic::DynFile; +use fusio::{Error, IoBuf, IoBufMut, Read, Write}; + +#[allow(unused)] +async fn object_safe_file_trait( + mut file: Box, + write_buf: B, + read_buf: BM, +) -> (Result<(), Error>, B, BM) +where + B: IoBuf, + BM: IoBufMut, +{ + let (result, write_buf) = file.write_all(write_buf).await; + if result.is_err() { + return (result, write_buf, read_buf); + } + + let (result, read_buf) = file.read(read_buf).await; + if result.is_err() { + return (result.map(|_| ()), write_buf, read_buf); + } + + (Ok(()), write_buf, read_buf) +} diff --git a/examples/src/s3.rs b/examples/src/s3.rs new file mode 100644 index 0000000..ef6584b --- /dev/null +++ b/examples/src/s3.rs @@ -0,0 +1,32 @@ +use std::env; +use std::sync::Arc; + +use fusio::remotes::aws::fs::AmazonS3Builder; +use fusio::remotes::aws::AwsCredential; +use fusio::DynFs; + +use crate::write_without_runtime_awareness; + +#[allow(unused)] +async fn use_fs() { + let key_id = env::var("AWS_ACCESS_KEY_ID").unwrap(); + let secret_key = env::var("AWS_SECRET_ACCESS_KEY").unwrap(); + + let s3 = AmazonS3Builder::new("fusio-test".into()) + .credential(AwsCredential { + key_id, + secret_key, + token: None, + }) + .region("ap-southeast-1".into()) + .sign_payload(true) + .build(); + + let fs: Arc = Arc::new(s3); + let _ = write_without_runtime_awareness( + &mut fs.open(&"foo.txt".into()).await.unwrap(), + "hello, world".as_bytes(), + &mut [0; 12][..], + ) + .await; +} diff --git a/fusio-object-store/Cargo.toml b/fusio-object-store/Cargo.toml index 8fa2947..00d4a1a 100644 --- a/fusio-object-store/Cargo.toml +++ b/fusio-object-store/Cargo.toml @@ -1,4 +1,5 @@ [package] +description = "the object_store integration of Fusio." edition.workspace = true license.workspace = true name = "fusio-object-store" diff --git a/fusio-object-store/src/lib.rs b/fusio-object-store/src/lib.rs index ff38427..c674682 100644 --- a/fusio-object-store/src/lib.rs +++ b/fusio-object-store/src/lib.rs @@ -18,27 +18,35 @@ impl S3File { &mut self, range: GetRange, mut buf: B, - ) -> Result { + ) -> (Result, B) { let opts = GetOptions { range: Some(range), ..Default::default() }; - let result = self + let result = match self .inner .get_opts(&self.path, opts) .await - .map_err(BoxedError::from)?; - let bytes = result.bytes().await.map_err(BoxedError::from)?; + .map_err(BoxedError::from) + { + Ok(result) => result, + Err(e) => return (Err(e.into()), buf), + }; + + let bytes = match result.bytes().await.map_err(BoxedError::from) { + Ok(bytes) => bytes, + Err(e) => return (Err(e.into()), buf), + }; buf.set_init(bytes.len()); buf.as_slice_mut().copy_from_slice(&bytes); - Ok(buf) + (Ok(bytes.len() as u64), buf) } } impl Read for S3File { - async fn read_exact(&mut self, buf: B) -> Result { + async fn read(&mut self, buf: B) -> (Result, B) { let pos = self.pos as usize; let range = GetRange::Bounded(Range { @@ -49,11 +57,18 @@ impl Read for S3File { self.read_with_range(range, buf).await } - async fn read_to_end(&mut self, buf: Vec) -> Result, Error> { + async fn read_to_end(&mut self, buf: Vec) -> (Result<(), Error>, Vec) { let pos = self.pos as usize; let range = GetRange::Offset(pos); - self.read_with_range(range, buf).await + let (result, buf) = self.read_with_range(range, buf).await; + match result { + Ok(size) => { + self.pos += size; + (Ok(()), buf) + } + Err(e) => (Err(e), buf), + } } async fn size(&self) -> Result { @@ -147,7 +162,8 @@ mod tests { result.unwrap(); let mut buf = vec![0_u8; bytes.len()]; - let buf = store.read_exact(&mut buf[..]).await.unwrap(); + let (result, buf) = store.read(&mut buf[..]).await; + result.unwrap(); assert_eq!(buf, &bytes[..]); } } diff --git a/fusio-parquet/Cargo.toml b/fusio-parquet/Cargo.toml index 0aa92b1..ab1bbda 100644 --- a/fusio-parquet/Cargo.toml +++ b/fusio-parquet/Cargo.toml @@ -1,4 +1,5 @@ [package] +description = "Parquet reader and writer implementations for Fusio." edition.workspace = true license.workspace = true name = "fusio-parquet" @@ -20,6 +21,3 @@ tokio = { version = "1.40" } arrow = "53" rand = "0.8" tempfile = "3.12.0" - -[lib] -path = "src/lib.rs" diff --git a/fusio-parquet/src/reader.rs b/fusio-parquet/src/reader.rs index aaf7d0b..e685870 100644 --- a/fusio-parquet/src/reader.rs +++ b/fusio-parquet/src/reader.rs @@ -53,11 +53,8 @@ impl AsyncFileReader for AsyncReader { .seek(range.start as u64) .await .map_err(|err| ParquetError::External(Box::new(err)))?; - let buf = self - .inner - .read_exact(buf) - .await - .map_err(|err| ParquetError::External(Box::new(err)))?; + let (result, buf) = self.inner.read(buf).await; + result.map_err(|err| ParquetError::External(Box::new(err)))?; Ok(buf.freeze()) } .boxed() @@ -73,11 +70,8 @@ impl AsyncFileReader for AsyncReader { .seek(self.content_length - footer_size as u64) .await .map_err(|err| ParquetError::External(Box::new(err)))?; - let prefetched_footer_content = self - .inner - .read_exact(buf) - .await - .map_err(|err| ParquetError::External(Box::new(err)))?; + let (result, prefetched_footer_content) = self.inner.read(buf).await; + result.map_err(|err| ParquetError::External(Box::new(err)))?; let prefetched_footer_slice = prefetched_footer_content.as_ref(); let prefetched_footer_length = prefetched_footer_slice.len(); @@ -106,11 +100,9 @@ impl AsyncFileReader for AsyncReader { let mut buf = BytesMut::with_capacity(metadata_length); buf.resize(metadata_length, 0); - let bytes = self - .inner - .read_exact(buf) - .await - .map_err(|err| ParquetError::External(Box::new(err)))?; + let (result, bytes) = self.inner.read(buf).await; + result.map_err(|err| ParquetError::External(Box::new(err)))?; + Ok(Arc::new(decode_metadata(&bytes)?)) } } diff --git a/fusio/Cargo.toml b/fusio/Cargo.toml index 1ec016b..2a1614c 100644 --- a/fusio/Cargo.toml +++ b/fusio/Cargo.toml @@ -1,4 +1,5 @@ [package] +description = "Fusio provides lean, minimal cost abstraction and extensible Read / Write trait to multiple storage on multiple poll-based / completion-based async runtime." edition.workspace = true license.workspace = true name = "fusio" @@ -71,14 +72,12 @@ hyper = { version = "1", optional = true, default-features = false, features = [ itertools = { version = "0.13" } monoio = { version = "0.2", optional = true, default-features = false } object_store = { version = "0.11", optional = true, default-features = false } -percent-encoding = { version = "2" } +percent-encoding = { version = "2", default-features = false } quick-xml = { version = "0.36", features = [ "overlapped-lists", "serialize", ], optional = true } -reqwest = { git = "https://github.com/seanmonstar/reqwest.git", optional = true, features = [ - "stream", -] } +reqwest = { version = "0.12.8", optional = true } ring = { version = "0.17", optional = true, default-features = false, features = [ "std", ] } @@ -90,7 +89,7 @@ tokio = { version = "1", optional = true, default-features = false, features = [ "fs", "io-util", ] } -url = { version = "2" } +url = { version = "2", default-features = false } [target.'cfg(target_os = "linux")'.dependencies] tokio-uring = { version = "0.5", default-features = false, optional = true } diff --git a/fusio/benches/tokio.rs b/fusio/benches/tokio.rs index a613131..8fddadf 100644 --- a/fusio/benches/tokio.rs +++ b/fusio/benches/tokio.rs @@ -40,10 +40,7 @@ fn write(c: &mut Criterion) { async move { let file = &mut *(*file).borrow_mut(); - let (result, _) = fusio::dynamic::DynWrite::write_all(file, unsafe { - (&bytes.as_ref()[..]).to_buf_nocopy() - }) - .await; + let (result, _) = fusio::Write::write_all(file, &bytes.as_ref()[..]).await; result.unwrap(); } }) @@ -96,16 +93,15 @@ fn read(c: &mut Criterion) { group.bench_function("fusio read 4K", |b| { b.to_async(&runtime).iter(|| { let file = file.clone(); + let mut bytes = [0u8; 4096]; async move { fusio::dynamic::DynSeek::seek(&mut *(*file).borrow_mut(), 0) .await .unwrap(); - let _ = fusio::dynamic::DynRead::read_exact(&mut *(*file).borrow_mut(), unsafe { - vec![0u8; 4096].to_buf_mut_nocopy() - }) - .await - .unwrap(); + let (result, _) = + fusio::Read::read(&mut *(*file).borrow_mut(), &mut bytes[..]).await; + result.unwrap(); } }) }); diff --git a/fusio/src/dynamic/fs.rs b/fusio/src/dynamic/fs.rs index 5182c77..81e3691 100644 --- a/fusio/src/dynamic/fs.rs +++ b/fusio/src/dynamic/fs.rs @@ -21,12 +21,12 @@ impl<'seek> Seek for Box { } impl<'read> Read for Box { - async fn read_exact(&mut self, buf: B) -> Result { - let buf = DynRead::read_exact(self.as_mut(), unsafe { buf.to_buf_mut_nocopy() }).await?; - Ok(unsafe { B::recover_from_buf_mut(buf) }) + async fn read(&mut self, buf: B) -> (Result, B) { + let (result, buf) = DynRead::read(self.as_mut(), unsafe { buf.to_buf_mut_nocopy() }).await; + (result, unsafe { B::recover_from_buf_mut(buf) }) } - async fn read_to_end(&mut self, buf: Vec) -> Result, Error> { + async fn read_to_end(&mut self, buf: Vec) -> (Result<(), Error>, Vec) { DynRead::read_to_end(self.as_mut(), buf).await } diff --git a/fusio/src/dynamic/mod.rs b/fusio/src/dynamic/mod.rs index 59a1000..a23edfe 100644 --- a/fusio/src/dynamic/mod.rs +++ b/fusio/src/dynamic/mod.rs @@ -7,7 +7,7 @@ use std::{future::Future, pin::Pin}; pub use fs::{DynFile, DynFs}; use crate::{ - buf::{Buf, BufMut, IoBufMut}, + buf::{Buf, BufMut}, Error, MaybeSend, MaybeSync, Read, Seek, Write, }; @@ -50,21 +50,15 @@ impl DynWrite for W { } pub trait DynRead: MaybeSend + MaybeSync { - fn read_exact( + fn read( &mut self, buf: BufMut, - ) -> Pin> + '_>>; + ) -> Pin, BufMut)> + '_>>; fn read_to_end( &mut self, - mut buf: Vec, - ) -> Pin, Error>> + '_>> { - Box::pin(async move { - buf.resize(self.size().await? as usize, 0); - let buf = self.read_exact(unsafe { buf.to_buf_mut_nocopy() }).await?; - Ok(unsafe { Vec::recover_from_buf_mut(buf) }) - }) - } + buf: Vec, + ) -> Pin, Vec)> + '_>>; fn size(&self) -> Pin> + '_>>; } @@ -73,14 +67,18 @@ impl DynRead for R where R: Read, { - fn read_exact( + fn read( &mut self, buf: BufMut, - ) -> Pin> + '_>> { - Box::pin(async move { - let buf = R::read_exact(self, buf).await?; - Ok(buf) - }) + ) -> Pin, BufMut)> + '_>> { + Box::pin(async move { R::read(self, buf).await }) + } + + fn read_to_end( + &mut self, + buf: Vec, + ) -> Pin, Vec)> + '_>> { + Box::pin(async move { R::read_to_end(self, buf).await }) } fn size(&self) -> Pin> + '_>> { diff --git a/fusio/src/impls/disk/mod.rs b/fusio/src/impls/disk/mod.rs index 94014a5..9309025 100644 --- a/fusio/src/impls/disk/mod.rs +++ b/fusio/src/impls/disk/mod.rs @@ -8,6 +8,9 @@ pub(crate) mod tokio_uring; #[cfg(all(feature = "monoio", feature = "fs"))] #[allow(unused)] pub use monoio::fs::*; +#[cfg(all(feature = "monoio", feature = "fs"))] +#[allow(unused)] +pub use monoio::MonoioFile; #[cfg(all(feature = "tokio", feature = "fs"))] #[allow(unused)] pub use tokio::fs::*; diff --git a/fusio/src/impls/disk/monoio/mod.rs b/fusio/src/impls/disk/monoio/mod.rs index bef6569..df61154 100644 --- a/fusio/src/impls/disk/monoio/mod.rs +++ b/fusio/src/impls/disk/monoio/mod.rs @@ -81,23 +81,45 @@ impl Write for MonoioFile { } impl Read for MonoioFile { - async fn read_exact(&mut self, buf: B) -> Result { + async fn read(&mut self, buf: B) -> (Result, B) { let (result, buf) = self .file .as_ref() .expect("read file after closed") - .read_exact_at(MonoioBuf { buf }, self.pos) + .read_at(MonoioBuf { buf }, self.pos) .await; - result?; - self.pos += buf.buf.bytes_init() as u64; - Ok(buf.buf) + match result { + Ok(n) => { + self.pos += n as u64; + (Ok(n as u64), buf.buf) + } + Err(e) => (Err(Error::from(e)), buf.buf), + } } - async fn read_to_end(&mut self, mut buf: Vec) -> Result, Error> { - buf.resize((self.size().await? - self.pos) as usize, 0); + async fn read_to_end(&mut self, mut buf: Vec) -> (Result<(), Error>, Vec) { + match self.size().await { + Ok(size) => { + buf.resize((size - self.pos) as usize, 0); + } + Err(e) => return (Err(e), buf), + } + + let (result, buf) = self + .file + .as_ref() + .expect("read file after closed") + .read_exact_at(MonoioBuf { buf }, self.pos) + .await; - Ok(self.read_exact(buf).await?) + match result { + Ok(_) => { + self.pos += buf.buf.len() as u64; + (Ok(()), buf.buf) + } + Err(e) => (Err(Error::from(e)), buf.buf), + } } async fn size(&self) -> Result { diff --git a/fusio/src/impls/disk/tokio/mod.rs b/fusio/src/impls/disk/tokio/mod.rs index 9f8f44c..567fc0d 100644 --- a/fusio/src/impls/disk/tokio/mod.rs +++ b/fusio/src/impls/disk/tokio/mod.rs @@ -39,14 +39,18 @@ impl Write for File { } impl Read for File { - async fn read_exact(&mut self, mut buf: B) -> Result { - AsyncReadExt::read_exact(self, buf.as_slice_mut()).await?; - Ok(buf) + async fn read(&mut self, mut buf: B) -> (Result, B) { + match AsyncReadExt::read(self, buf.as_slice_mut()).await { + Ok(size) => (Ok(size as u64), buf), + Err(e) => (Err(Error::Io(e)), buf), + } } - async fn read_to_end(&mut self, mut buf: Vec) -> Result, Error> { - let _ = AsyncReadExt::read_to_end(self, &mut buf).await?; - Ok(buf) + async fn read_to_end(&mut self, mut buf: Vec) -> (Result<(), Error>, Vec) { + match AsyncReadExt::read_to_end(self, &mut buf).await { + Ok(_) => (Ok(()), buf), + Err(e) => (Err(Error::Io(e)), buf), + } } async fn size(&self) -> Result { @@ -57,7 +61,6 @@ impl Read for File { impl Seek for File { async fn seek(&mut self, pos: u64) -> Result<(), Error> { AsyncSeekExt::seek(self, SeekFrom::Start(pos)).await?; - Ok(()) } } diff --git a/fusio/src/impls/mod.rs b/fusio/src/impls/mod.rs index afdb70d..73dd2eb 100644 --- a/fusio/src/impls/mod.rs +++ b/fusio/src/impls/mod.rs @@ -1,2 +1,63 @@ pub mod disk; pub mod remotes; + +use std::io::Cursor; + +use crate::{Error, IoBuf, IoBufMut, MaybeSend, Read, Seek, Write}; + +impl Read for Cursor +where + T: AsRef<[u8]> + Unpin + Send + Sync, +{ + async fn read(&mut self, mut buf: B) -> (Result, B) { + match std::io::Read::read(self, buf.as_slice_mut()) { + Ok(n) => (Ok(n as u64), buf), + Err(e) => (Err(Error::Io(e)), buf), + } + } + + async fn read_to_end(&mut self, mut buf: Vec) -> (Result<(), Error>, Vec) { + match std::io::Read::read_to_end(self, &mut buf) { + Ok(n) => { + buf.resize(n, 0); + (Ok(()), buf) + } + Err(e) => (Err(Error::Io(e)), buf), + } + } + + async fn size(&self) -> Result { + Ok(self.get_ref().as_ref().len() as u64) + } +} + +impl Seek for Cursor +where + T: AsRef<[u8]> + MaybeSend, +{ + async fn seek(&mut self, pos: u64) -> Result<(), Error> { + std::io::Seek::seek(self, std::io::SeekFrom::Start(pos)) + .map_err(Error::Io) + .map(|_| ()) + } +} + +impl Write for Cursor<&mut Vec> { + async fn write_all(&mut self, buf: B) -> (Result<(), Error>, B) { + let result = std::io::Write::write_all(self, buf.as_slice()).map_err(Error::Io); + + (result, buf) + } + + async fn sync_data(&self) -> Result<(), Error> { + Ok(()) + } + + async fn sync_all(&self) -> Result<(), Error> { + Ok(()) + } + + async fn close(&mut self) -> Result<(), Error> { + Ok(()) + } +} diff --git a/fusio/src/impls/remotes/aws/mod.rs b/fusio/src/impls/remotes/aws/mod.rs index 01b267f..1574799 100644 --- a/fusio/src/impls/remotes/aws/mod.rs +++ b/fusio/src/impls/remotes/aws/mod.rs @@ -6,6 +6,7 @@ pub(crate) mod options; mod s3; pub(crate) mod sign; +pub use credential::AwsCredential; pub use error::S3Error; pub use s3::S3File; diff --git a/fusio/src/impls/remotes/aws/s3.rs b/fusio/src/impls/remotes/aws/s3.rs index d4b88af..0133c00 100644 --- a/fusio/src/impls/remotes/aws/s3.rs +++ b/fusio/src/impls/remotes/aws/s3.rs @@ -50,8 +50,8 @@ impl S3File { } impl Read for S3File { - async fn read_exact(&mut self, mut buf: B) -> Result { - let mut request = self + async fn read(&mut self, mut buf: B) -> (Result, B) { + let request = self .build_request(Method::GET) .header( RANGE, @@ -62,43 +62,60 @@ impl Read for S3File { ), ) .body(Empty::new()) - .map_err(|e| S3Error::from(HttpError::from(e)))?; - request.sign(&self.options).await.map_err(S3Error::from)?; + .map_err(|e| S3Error::from(HttpError::from(e))); - let response = self + let mut request = match request { + Ok(request) => request, + Err(e) => return (Err(e.into()), buf), + }; + + if let Err(e) = request.sign(&self.options).await.map_err(S3Error::from) { + return (Err(e.into()), buf); + } + + let response = match self .client .send_request(request) .await - .map_err(S3Error::from)?; + .map_err(S3Error::from) + { + Ok(response) => response, + Err(e) => return (Err(e.into()), buf), + }; if !response.status().is_success() { - return Err(S3Error::from(HttpError::HttpNotSuccess { - status: response.status(), - body: String::from_utf8_lossy( - &response - .into_body() - .collect() - .await - .map_err(S3Error::from)? - .to_bytes(), - ) - .to_string(), - }) - .into()); + return ( + Err(S3Error::from(HttpError::HttpNotSuccess { + status: response.status(), + body: String::from_utf8_lossy( + &response + .into_body() + .collect() + .await + .map(|b| b.to_bytes()) + .unwrap_or_default(), + ) + .to_string(), + }) + .into()), + buf, + ); } else { - std::io::copy( - &mut response - .into_body() - .collect() - .await - .map_err(S3Error::from)? - .aggregate() - .reader(), - &mut buf.as_slice_mut(), - )?; - - self.pos += buf.as_slice().len() as u64; - Ok(buf) + match response.into_body().collect().await.map_err(S3Error::from) { + Ok(body) => { + if let Err(e) = std::io::Read::read_exact( + &mut body.aggregate().reader(), + buf.as_slice_mut(), + ) { + return (Err(e.into()), buf); + } + } + Err(e) => return (Err(e.into()), buf), + } + + let size = buf.as_slice().len() as u64; + self.pos += size; + (Ok(size), buf) } } @@ -142,48 +159,58 @@ impl Read for S3File { } } - async fn read_to_end(&mut self, mut buf: Vec) -> Result, Error> { - let mut request = self + async fn read_to_end(&mut self, mut buf: Vec) -> (Result<(), Error>, Vec) { + let mut request = match self .build_request(Method::GET) .header(RANGE, format!("bytes={}-", self.pos,)) .body(Empty::new()) - .map_err(|e| S3Error::from(HttpError::from(e)))?; - request.sign(&self.options).await.map_err(S3Error::from)?; + .map_err(|e| S3Error::from(HttpError::from(e))) + { + Err(e) => return (Err(e.into()), buf), + Ok(request) => request, + }; - let response = self + if let Err(e) = request.sign(&self.options).await.map_err(S3Error::from) { + return (Err(e.into()), buf); + } + + let response = match self .client .send_request(request) .await - .map_err(S3Error::from)?; + .map_err(S3Error::from) + { + Ok(response) => response, + Err(e) => return (Err(e.into()), buf), + }; if !response.status().is_success() { - return Err(S3Error::from(HttpError::HttpNotSuccess { - status: response.status(), - body: String::from_utf8_lossy( - &response - .into_body() - .collect() - .await - .map_err(S3Error::from)? - .to_bytes(), - ) - .to_string(), - }) - .into()); + return ( + Err(S3Error::from(HttpError::HttpNotSuccess { + status: response.status(), + body: String::from_utf8_lossy( + &response + .into_body() + .collect() + .await + .map(|b| b.to_bytes()) + .unwrap_or_default(), + ) + .to_string(), + }) + .into()), + buf, + ); } else { - std::io::copy( - &mut response - .into_body() - .collect() - .await - .map_err(S3Error::from)? - .aggregate() - .reader(), - &mut buf, - )?; - - self.pos += buf.as_slice().len() as u64; - Ok(buf) + match response.into_body().collect().await.map_err(S3Error::from) { + Ok(body) => { + let mut body = body.to_bytes(); + buf.resize(body.len(), 0); + body.copy_to_slice(&mut buf[..]); + (Ok(()), buf) + } + Err(e) => (Err(e.into()), buf), + } } } } @@ -294,7 +321,7 @@ mod tests { let client = Arc::new(client); - let mut s3 = S3File::new(options, "test.txt".into(), client); + let mut s3 = S3File::new(options, "read-write.txt".into(), client); let (result, _) = s3 .write_all(&b"The answer of life, universe and everthing"[..]) @@ -306,7 +333,8 @@ mod tests { let size = s3.size().await.unwrap(); assert_eq!(size, 42); let buf = Vec::new(); - let buf = s3.read_to_end(buf).await.unwrap(); + let (result, buf) = s3.read_to_end(buf).await; + result.unwrap(); assert_eq!(buf, b"The answer of life, universe and everthing"); } } diff --git a/fusio/src/lib.rs b/fusio/src/lib.rs index f6497a3..1de65cd 100644 --- a/fusio/src/lib.rs +++ b/fusio/src/lib.rs @@ -7,7 +7,7 @@ pub mod fs; pub mod impls; pub mod path; -use std::{future::Future, io::Cursor}; +use std::future::Future; pub use buf::{IoBuf, IoBufMut}; #[cfg(all(feature = "dyn", feature = "fs"))] @@ -61,15 +61,15 @@ pub trait Write: MaybeSend + MaybeSync { } pub trait Read: MaybeSend + MaybeSync { - fn read_exact( + fn read( &mut self, buf: B, - ) -> impl Future> + MaybeSend; + ) -> impl Future, B)> + MaybeSend; fn read_to_end( &mut self, buf: Vec, - ) -> impl Future, Error>> + MaybeSend; + ) -> impl Future, Vec)> + MaybeSend; fn size(&self) -> impl Future> + MaybeSend; } @@ -78,56 +78,6 @@ pub trait Seek: MaybeSend { fn seek(&mut self, pos: u64) -> impl Future> + MaybeSend; } -impl Read for Cursor -where - T: AsRef<[u8]> + Unpin + Send + Sync, -{ - async fn read_exact(&mut self, mut buf: B) -> Result { - std::io::Read::read_exact(self, buf.as_slice_mut())?; - Ok(buf) - } - - async fn read_to_end(&mut self, mut buf: Vec) -> Result, Error> { - let _ = std::io::Read::read_to_end(self, &mut buf)?; - Ok(buf) - } - - async fn size(&self) -> Result { - Ok(self.get_ref().as_ref().len() as u64) - } -} - -impl Seek for Cursor -where - T: AsRef<[u8]> + MaybeSend, -{ - async fn seek(&mut self, pos: u64) -> Result<(), Error> { - std::io::Seek::seek(self, std::io::SeekFrom::Start(pos)) - .map_err(Error::Io) - .map(|_| ()) - } -} - -impl Write for Cursor<&mut Vec> { - async fn write_all(&mut self, buf: B) -> (Result<(), Error>, B) { - let result = std::io::Write::write_all(self, buf.as_slice()).map_err(Error::Io); - - (result, buf) - } - - async fn sync_data(&self) -> Result<(), Error> { - Ok(()) - } - - async fn sync_all(&self) -> Result<(), Error> { - Ok(()) - } - - async fn close(&mut self) -> Result<(), Error> { - Ok(()) - } -} - impl Seek for &mut S { fn seek(&mut self, pos: u64) -> impl Future> + MaybeSend { S::seek(self, pos) @@ -135,17 +85,17 @@ impl Seek for &mut S { } impl Read for &mut R { - fn read_exact( + fn read( &mut self, buf: B, - ) -> impl Future> + MaybeSend { - R::read_exact(self, buf) + ) -> impl Future, B)> + MaybeSend { + R::read(self, buf) } fn read_to_end( &mut self, buf: Vec, - ) -> impl Future, Error>> + MaybeSend { + ) -> impl Future, Vec)> + MaybeSend { R::read_to_end(self, buf) } @@ -232,18 +182,26 @@ mod tests { where R: Read, { - async fn read_exact(&mut self, buf: B) -> Result { - self.r - .read_exact(buf) - .await - .inspect(|buf| self.cnt += buf.bytes_init()) + async fn read(&mut self, buf: B) -> (Result, B) { + let (result, buf) = self.r.read(buf).await; + match result { + Ok(result) => { + self.cnt += buf.bytes_init(); + (Ok(result), buf) + } + Err(e) => (Err(e), buf), + } } - async fn read_to_end(&mut self, buf: Vec) -> Result, Error> { - self.r - .read_to_end(buf) - .await - .inspect(|buf| self.cnt += buf.bytes_init()) + async fn read_to_end(&mut self, buf: Vec) -> (Result<(), Error>, Vec) { + let (result, buf) = self.r.read_to_end(buf).await; + match result { + Ok(()) => { + self.cnt += buf.bytes_init(); + (Ok(()), buf) + } + Err(e) => (Err(e), buf), + } } async fn size(&self) -> Result { @@ -276,7 +234,8 @@ mod tests { reader.seek(0).await.unwrap(); let mut buf = vec![]; - buf = reader.read_to_end(buf).await.unwrap(); + let (result, buf) = reader.read_to_end(buf).await; + result.unwrap(); assert_eq!(buf.bytes_init(), 4); assert_eq!(buf.as_slice(), &[2, 0, 2, 4]); @@ -285,7 +244,8 @@ mod tests { reader.seek(2).await.unwrap(); let mut buf = vec![]; - buf = reader.read_to_end(buf).await.unwrap(); + let (result, buf) = reader.read_to_end(buf).await; + result.unwrap(); assert_eq!(buf.bytes_init(), 2); assert_eq!(buf.as_slice(), &[2, 4]); @@ -344,56 +304,14 @@ mod tests { .await?; file.write_all("Hello! world".as_bytes()).await.0?; - assert!(file.read_exact(vec![0u8; 24]).await.is_err()); - } - { - let mut file = fs - .open_options( - &Path::from_absolute_path(&work_file_path)?, - OpenOptions::default().append(true), - ) - .await?; - file.write_all("Hello! fusio".as_bytes()).await.0?; - - assert!(file.read_exact(vec![0u8; 24]).await.is_err()); - } - { - let mut file = fs - .open_options( - &Path::from_absolute_path(&work_file_path)?, - OpenOptions::default(), - ) - .await?; + file.sync_all().await.unwrap(); - assert_eq!( - "Hello! worldHello! fusio".as_bytes(), - &file.read_to_end(Vec::new()).await? - ) - } - fs.remove(&Path::from_filesystem_path(&work_file_path)?) - .await?; - assert!(!work_file_path.exists()); + file.seek(0).await; - let mut file_set = HashSet::new(); - for i in 0..10 { - let _ = fs - .open_options( - &Path::from_absolute_path(work_dir_path.join(i.to_string()))?, - OpenOptions::default().create(true).write(true), - ) - .await?; - file_set.insert(i.to_string()); - } - - let path = Path::from_filesystem_path(&work_dir_path)?; - let mut file_stream = Box::pin(fs.list(&path).await?); - - while let Some(file_meta) = file_stream.next().await { - if let Some(file_name) = file_meta?.path.filename() { - assert!(file_set.remove(file_name)); - } + let (result, buf) = file.read(vec![0u8; 12]).await; + result.unwrap(); + assert_eq!(buf.as_slice(), b"Hello! world"); } - assert!(file_set.is_empty()); Ok(()) } diff --git a/fusio/src/path/mod.rs b/fusio/src/path/mod.rs index bd60280..8e9df23 100644 --- a/fusio/src/path/mod.rs +++ b/fusio/src/path/mod.rs @@ -263,6 +263,8 @@ pub fn path_to_local(location: &Path) -> Result { #[cfg(test)] mod tests { + use std::fs::canonicalize; + use tempfile::NamedTempFile; use super::*; @@ -513,6 +515,6 @@ mod tests { let this_path = Path::from_filesystem_path(temp_file.path()).unwrap(); let std_path = path_to_local(&this_path).unwrap(); - assert_eq!(std_path, temp_file.path()); + assert_eq!(std_path, canonicalize(temp_file.path()).unwrap()); } }