feat: update README
ethe committed Oct 1, 2024
1 parent 147a072 commit 04fed06
Showing 24 changed files with 532 additions and 262 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,5 +1,5 @@
[workspace]
members = ["fusio", "fusio-object-store", "fusio-parquet"]
members = ["examples", "fusio", "fusio-object-store", "fusio-parquet"]
resolver = "2"

[workspace.package]
74 changes: 74 additions & 0 deletions README.md
@@ -0,0 +1,74 @@
# Fusio

Fusio provides [Read](https://github.com/tonbo-io/fusio/blob/main/fusio/src/lib.rs#L81) / [Write](https://github.com/tonbo-io/fusio/blob/main/fusio/src/lib.rs#L63) traits to operate on multiple storage backends (local disk, Amazon S3) across poll-based ([tokio](https://github.com/tokio-rs/tokio)) and completion-based ([tokio-uring](https://github.com/tokio-rs/tokio-uring), [monoio](https://github.com/bytedance/monoio)) async runtimes, with:
- lean: the binary size is at least 14x smaller than comparable crates
- minimal-cost abstraction: compared with using a bare storage backend, the trait definitions dispatch file operations without extra cost
- extensible: traits are exposed so that storage backends can be implemented as [third-party crates](https://github.com/tonbo-io/fusio/tree/main/fusio-object-store)

> **Fusio is currently in preview; please join our [community](https://discord.gg/j27XVFVmJM) to take part in its development and in discussions of its semantics and behavior.**

## Why Fusio?

Since we started integrating object storage into [Tonbo](https://github.com/tonbo-io/tonbo), we have needed file and file system abstractions that dispatch read and write operations to multiple storage backends: memory, local disk, remote object storage, and so on. We found that existing solutions have the following limitations:
- local or remote file system access is not usable across different kinds of async runtimes (not only completion-based runtimes, but also Python / JS event loops)
- most VFS implementations are designed for backend server scenarios; as an embedded database, Tonbo needs a lean implementation suitable for embedding, plus a set of traits that allows asynchronous file / file system backends to be extended as third-party crates

For more context, please check [apache/arrow-rs#6051](https://github.com/apache/arrow-rs/issues/6051).

## How to use it?

Because it is not possible to make poll-based async runtimes compatible with completion-based ones at runtime, `fusio` supports switching runtimes at compile time.

### Installation
```toml
fusio = { version = "*", features = ["tokio"] }
```
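
The runtime is selected purely through Cargo features. As a sketch based on the feature names wired up in the `examples` crate added in this commit (`tokio`, `monoio`, `aws`), the completion-based alternative looks like this; treat the exact combinations as an assumption rather than a prescription:

```toml
# completion-based runtime (monoio) instead of tokio:
fusio = { version = "*", features = ["monoio"] }
# Amazon S3 support is gated behind the `aws` feature:
# fusio = { version = "*", features = ["tokio", "aws"] }
```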

### Examples
The `examples` crate added in this commit covers local file system, multi-runtime, object-safe, and Amazon S3 usage; a condensed sketch follows.
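
A minimal sketch, adapted from `examples/src/lib.rs` in this commit, of a helper that writes, syncs, seeks back, and reads again without caring which runtime or storage backend is underneath:

```rust
use fusio::{Error, IoBuf, IoBufMut, Read, Seek, Write};

// Works with any fusio file, regardless of the async runtime behind it.
async fn write_then_read<F, B, BM>(
    file: &mut F,
    write_buf: B,
    read_buf: BM,
) -> (Result<(), Error>, B, BM)
where
    F: Read + Write + Seek,
    B: IoBuf,
    BM: IoBufMut,
{
    // Completion-style call: the buffer is moved in and handed back with the result.
    let (result, write_buf) = file.write_all(write_buf).await;
    if result.is_err() {
        return (result, write_buf, read_buf);
    }

    file.sync_all().await.unwrap();
    file.seek(0).await.unwrap();

    let (result, read_buf) = file.read(read_buf).await;
    (result.map(|_| ()), write_buf, read_buf)
}
```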

## When to choose fusio?

The goals of fusio differ from those of [object_store](https://github.com/apache/arrow-rs/tree/master/object_store) and [opendal](https://github.com/apache/opendal).

### Compared with `object_store`

`object_store` is currently tied to the [tokio](https://github.com/tokio-rs/tokio) runtime and to [bytes](https://github.com/apache/arrow-rs/blob/master/object_store/src/payload.rs#L23). `fusio` chooses a completion-based-style API (inspired by [monoio](https://docs.rs/monoio/latest/monoio/io/trait.AsyncReadRent.html)) to keep the abstraction cost minimal across all kinds of async runtimes.

`fusio` also uses [IoBuf](https://github.com/tonbo-io/fusio/blob/main/fusio/src/lib.rs#L53) / [IoBufMut](https://github.com/tonbo-io/fusio/blob/main/fusio/src/lib.rs#L64) to accept both `&[u8]` and `Vec<u8>`, avoiding potential runtime costs. If vendor lock-in is not a concern for you, try `object_store`: as the official implementation, it integrates well with `arrow` and `parquet`.
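
For illustration, a minimal sketch (the helper itself is hypothetical) of how a caller keeps ownership of its buffer across a read; the `read` signature it relies on, returning `(Result<u64, Error>, B)`, is the one introduced for `S3File` later in this commit:

```rust
use fusio::{Error, Read};

// The buffer is moved into `read` and handed back with the result,
// so it stays usable even when the call fails.
async fn read_some<R: Read>(file: &mut R) -> Result<Vec<u8>, Error> {
    let buf = vec![0u8; 12];
    let (result, buf) = file.read(buf).await;
    result.map(|_| buf)
}
```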

### Compared with `opendal`

`fusio` does not aim to be a full data access layer like `opendal`; instead, it lets you enable features and their dependencies one by one. The default binary size of `fusio` is 245 KB, which is much smaller than that of `opendal` (8.9 MB). If you need a full DAL ecosystem (tracing, caching, etc.), try `opendal`.

## Roadmap
- abstractions
  - [x] file operations
  - [x] (partial) file system operations
- storage backend implementations
  - disk
    - [x] tokio
    - [x] tokio-uring
    - [x] monoio
  - [x] network
    - [x] HTTP client trait wi
    - [x] network storage runtime support
      - [x] tokio (based on reqwest)
      - [ ] monoio (based on hyper-tls)
      - [ ] tokio-uring (based on hyper-tls)
    - [x] Amazon S3
    - [ ] Azure Blob Storage
    - [ ] Cloudflare R2
  - [ ] in-memory
  - [ ] [conditional operations](https://aws.amazon.com/cn/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/)
- extensions
  - [x] parquet support
  - [x] object_store support

## Credits
- `monoio`: the core traits (buffer, read, and write) are heavily inspired by it
- `futures`: the way it designs abstractions and organizes several crates (core, util, etc.) to avoid coupling influenced `fusio`'s design
- `opendal`: its compile-time switch between poll-based and completion-based runtimes inspired `fusio`
- `object_store`: `fusio` copies its S3 credential and path behaviors
16 changes: 16 additions & 0 deletions examples/Cargo.toml
@@ -0,0 +1,16 @@
[package]
edition.workspace = true
license.workspace = true
name = "examples"
repository.workspace = true
version = "0.1.0"

[features]
default = ["fusio/aws", "tokio"]
monoio = ["dep:monoio", "fusio/monoio"]
tokio = ["dep:tokio", "fusio/tokio"]

[dependencies]
fusio = { path = "../fusio" }
monoio = { version = "0.2", optional = true }
tokio = { version = "1.0", features = ["full"], optional = true }
21 changes: 21 additions & 0 deletions examples/src/fs.rs
@@ -0,0 +1,21 @@
use std::sync::Arc;

use fusio::disk::LocalFs;
use fusio::dynamic::DynFile;
use fusio::DynFs;

#[allow(unused)]
async fn use_fs() {
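// Open a local file through the object-safe DynFs / DynFile traits and reuse the runtime-agnostic helper from lib.rs.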
let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});

let mut file: Box<dyn DynFile> = Box::new(fs.open(&"foo.txt".into()).await.unwrap());

let write_buf = "hello, world".as_bytes();
let mut read_buf = [0; 12];

let (result, _, read_buf) =
crate::write_without_runtime_awareness(&mut file, write_buf, &mut read_buf[..]).await;
result.unwrap();

assert_eq!(&read_buf, b"hello, world");
}
33 changes: 33 additions & 0 deletions examples/src/lib.rs
@@ -0,0 +1,33 @@
mod fs;
mod multi_runtime;
mod object;
mod s3;

use fusio::{Error, IoBuf, IoBufMut, Read, Seek, Write};

#[allow(unused)]
async fn write_without_runtime_awareness<F, B, BM>(
file: &mut F,
write_buf: B,
read_buf: BM,
) -> (Result<(), Error>, B, BM)
where
F: Read + Write + Seek,
B: IoBuf,
BM: IoBufMut,
{
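// Completion-style write: the buffer is moved into the call and handed back together with the result.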
let (result, write_buf) = file.write_all(write_buf).await;
if result.is_err() {
return (result, write_buf, read_buf);
}

file.sync_all().await.unwrap();
file.seek(0).await.unwrap();

let (result, read_buf) = file.read(read_buf).await;
if result.is_err() {
return (result.map(|_| ()), write_buf, read_buf);
}

(Ok(()), write_buf, read_buf)
}
31 changes: 31 additions & 0 deletions examples/src/multi_runtime.rs
@@ -0,0 +1,31 @@
use crate::write_without_runtime_awareness;

#[allow(unused)]
#[cfg(feature = "tokio")]
async fn use_tokio_file() {
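// The helper's fusio trait bounds are satisfied by tokio::fs::File when the `tokio` feature is enabled.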
use tokio::fs::File;

let mut file = File::open("foo.txt").await.unwrap();
let write_buf = "hello, world".as_bytes();
let mut read_buf = [0; 12];
let (result, _, read_buf) =
write_without_runtime_awareness(&mut file, write_buf, &mut read_buf[..]).await;
result.unwrap();
assert_eq!(&read_buf, b"hello, world");
}

#[allow(unused)]
#[cfg(feature = "monoio")]
async fn use_monoio_file() {
use fusio::disk::MonoioFile;
use monoio::fs::File;

let mut file: MonoioFile = File::open("foo.txt").await.unwrap().into();
let write_buf = "hello, world".as_bytes();
let read_buf = vec![0; 12];
// completion-based runtime has to pass owned buffer to the function
let (result, _, read_buf) =
write_without_runtime_awareness(&mut file, write_buf, read_buf).await;
result.unwrap();
assert_eq!(&read_buf, b"hello, world");
}
25 changes: 25 additions & 0 deletions examples/src/object.rs
@@ -0,0 +1,25 @@
use fusio::dynamic::DynFile;
use fusio::{Error, IoBuf, IoBufMut, Read, Write};

#[allow(unused)]
async fn object_safe_file_trait<B, BM>(
mut file: Box<dyn DynFile>,
write_buf: B,
read_buf: BM,
) -> (Result<(), Error>, B, BM)
where
B: IoBuf,
BM: IoBufMut,
{
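// Same completion-style contract as the generic helper in lib.rs, but through an object-safe Box<dyn DynFile>.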
let (result, write_buf) = file.write_all(write_buf).await;
if result.is_err() {
return (result, write_buf, read_buf);
}

let (result, read_buf) = file.read(read_buf).await;
if result.is_err() {
return (result.map(|_| ()), write_buf, read_buf);
}

(Ok(()), write_buf, read_buf)
}
32 changes: 32 additions & 0 deletions examples/src/s3.rs
@@ -0,0 +1,32 @@
use std::env;
use std::sync::Arc;

use fusio::remotes::aws::fs::AmazonS3Builder;
use fusio::remotes::aws::AwsCredential;
use fusio::DynFs;

use crate::write_without_runtime_awareness;

#[allow(unused)]
async fn use_fs() {
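// Build an S3-backed DynFs; credentials come from the environment, and the bucket name / region are example values.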
let key_id = env::var("AWS_ACCESS_KEY_ID").unwrap();
let secret_key = env::var("AWS_SECRET_ACCESS_KEY").unwrap();

let s3 = AmazonS3Builder::new("fusio-test".into())
.credential(AwsCredential {
key_id,
secret_key,
token: None,
})
.region("ap-southeast-1".into())
.sign_payload(true)
.build();

let fs: Arc<dyn DynFs> = Arc::new(s3);
let _ = write_without_runtime_awareness(
&mut fs.open(&"foo.txt".into()).await.unwrap(),
"hello, world".as_bytes(),
&mut [0; 12][..],
)
.await;
}
1 change: 1 addition & 0 deletions fusio-object-store/Cargo.toml
@@ -1,4 +1,5 @@
[package]
description = "the object_store integration of Fusio."
edition.workspace = true
license.workspace = true
name = "fusio-object-store"
34 changes: 25 additions & 9 deletions fusio-object-store/src/lib.rs
@@ -18,27 +18,35 @@ impl<O: ObjectStore> S3File<O> {
&mut self,
range: GetRange,
mut buf: B,
) -> Result<B, Error> {
) -> (Result<u64, Error>, B) {
let opts = GetOptions {
range: Some(range),
..Default::default()
};
let result = self
let result = match self
.inner
.get_opts(&self.path, opts)
.await
.map_err(BoxedError::from)?;
let bytes = result.bytes().await.map_err(BoxedError::from)?;
.map_err(BoxedError::from)
{
Ok(result) => result,
Err(e) => return (Err(e.into()), buf),
};

let bytes = match result.bytes().await.map_err(BoxedError::from) {
Ok(bytes) => bytes,
Err(e) => return (Err(e.into()), buf),
};

buf.set_init(bytes.len());

buf.as_slice_mut().copy_from_slice(&bytes);
Ok(buf)
(Ok(bytes.len() as u64), buf)
}
}

impl<O: ObjectStore> Read for S3File<O> {
async fn read_exact<B: IoBufMut>(&mut self, buf: B) -> Result<B, Error> {
async fn read<B: IoBufMut>(&mut self, buf: B) -> (Result<u64, Error>, B) {
let pos = self.pos as usize;

let range = GetRange::Bounded(Range {
@@ -49,11 +57,18 @@ impl<O: ObjectStore> Read for S3File<O> {
self.read_with_range(range, buf).await
}

async fn read_to_end(&mut self, buf: Vec<u8>) -> Result<Vec<u8>, Error> {
async fn read_to_end(&mut self, buf: Vec<u8>) -> (Result<(), Error>, Vec<u8>) {
let pos = self.pos as usize;
let range = GetRange::Offset(pos);

self.read_with_range(range, buf).await
let (result, buf) = self.read_with_range(range, buf).await;
match result {
Ok(size) => {
self.pos += size;
(Ok(()), buf)
}
Err(e) => (Err(e), buf),
}
}

async fn size(&self) -> Result<u64, Error> {
@@ -147,7 +162,8 @@ mod tests {
result.unwrap();

let mut buf = vec![0_u8; bytes.len()];
let buf = store.read_exact(&mut buf[..]).await.unwrap();
let (result, buf) = store.read(&mut buf[..]).await;
result.unwrap();
assert_eq!(buf, &bytes[..]);
}
}
4 changes: 1 addition & 3 deletions fusio-parquet/Cargo.toml
@@ -1,4 +1,5 @@
[package]
description = "Parquet reader and writer implementations for Fusio."
edition.workspace = true
license.workspace = true
name = "fusio-parquet"
@@ -20,6 +21,3 @@ tokio = { version = "1.40" }
arrow = "53"
rand = "0.8"
tempfile = "3.12.0"

[lib]
path = "src/lib.rs"
22 changes: 7 additions & 15 deletions fusio-parquet/src/reader.rs
@@ -53,11 +53,8 @@ impl AsyncFileReader for AsyncReader {
.seek(range.start as u64)
.await
.map_err(|err| ParquetError::External(Box::new(err)))?;
let buf = self
.inner
.read_exact(buf)
.await
.map_err(|err| ParquetError::External(Box::new(err)))?;
let (result, buf) = self.inner.read(buf).await;
result.map_err(|err| ParquetError::External(Box::new(err)))?;
Ok(buf.freeze())
}
.boxed()
@@ -73,11 +70,8 @@ impl AsyncFileReader for AsyncReader {
.seek(self.content_length - footer_size as u64)
.await
.map_err(|err| ParquetError::External(Box::new(err)))?;
let prefetched_footer_content = self
.inner
.read_exact(buf)
.await
.map_err(|err| ParquetError::External(Box::new(err)))?;
let (result, prefetched_footer_content) = self.inner.read(buf).await;
result.map_err(|err| ParquetError::External(Box::new(err)))?;
let prefetched_footer_slice = prefetched_footer_content.as_ref();
let prefetched_footer_length = prefetched_footer_slice.len();

@@ -106,11 +100,9 @@
let mut buf = BytesMut::with_capacity(metadata_length);
buf.resize(metadata_length, 0);

let bytes = self
.inner
.read_exact(buf)
.await
.map_err(|err| ParquetError::External(Box::new(err)))?;
let (result, bytes) = self.inner.read(buf).await;
result.map_err(|err| ParquetError::External(Box::new(err)))?;

Ok(Arc::new(decode_metadata(&bytes)?))
}
}
