Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into opfs-dev
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Nov 11, 2024
2 parents c9a65f1 + d793c8b commit 969e8f9
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 38 deletions.
4 changes: 2 additions & 2 deletions fusio-dispatch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ edition.workspace = true
license.workspace = true
name = "fusio-dispatch"
repository.workspace = true
version = "0.2.1"
version = "0.2.2"

[features]
aws = ["fusio/aws"]
Expand All @@ -15,6 +15,6 @@ object_store = ["dep:fusio-object-store", "object_store/aws"]
tokio = ["fusio/tokio"]

[dependencies]
fusio = { version = "0.3.0", path = "../fusio" }
fusio = { version = "0.3.2", path = "../fusio" }
fusio-object-store = { version = "0.2.0", path = "../fusio-object-store", optional = true }
object_store = { version = "0.11", optional = true }
4 changes: 2 additions & 2 deletions fusio-object-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ edition.workspace = true
license.workspace = true
name = "fusio-object-store"
repository.workspace = true
version = "0.2.1"
version = "0.2.2"

[dependencies]
async-stream = { version = "0.3" }
fusio = { version = "0.3.0", path = "../fusio", features = [
fusio = { version = "0.3.2", path = "../fusio", features = [
"bytes",
"dyn",
"object_store",
Expand Down
2 changes: 1 addition & 1 deletion fusio-object-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ mod tests {
if let Ok((region, bucket_name, access_key_id, secret_access_key)) = fn_env() {
let path = object_store::path::Path::parse("/test_file").unwrap();
let s3 = AmazonS3Builder::new()
.with_aws_region(region)
.with_region(region)
.with_bucket_name(bucket_name)
.with_access_key_id(access_key_id)
.with_secret_access_key(secret_access_key)
Expand Down
4 changes: 2 additions & 2 deletions fusio-opendal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ repository.workspace = true
version = "0.1.0"

[dependencies]
fusio = { version = "0.3.0", path = "../fusio", features = [ "bytes" ] }
fusio = { version = "0.3.2", path = "../fusio", features = ["bytes"] }
futures-core = { version = "0.3" }
futures-util = { version = "0.3" }
opendal = { version ="0.50.1", default-features = false }
opendal = { version = "0.50.1", default-features = false }
7 changes: 4 additions & 3 deletions fusio-parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ edition.workspace = true
license.workspace = true
name = "fusio-parquet"
repository.workspace = true
version = "0.2.1"
version = "0.2.2"

[features]
default = []
tokio = ["fusio/tokio"]
opfs = ["fusio/opfs"]
tokio = ["fusio/tokio"]

[dependencies]
bytes = { workspace = true }
cfg-if = "1.0.0"
fusio = { version = "0.3.0", path = "../fusio", features = [
fusio = { version = "0.3.2", path = "../fusio", features = [
"bytes",
"dyn",
"tokio",
] }
futures = { version = "0.3" }
parquet = { version = "53", default-features = false, features = [
Expand Down
4 changes: 2 additions & 2 deletions fusio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ license.workspace = true
name = "fusio"
readme = "../README.md"
repository.workspace = true
version = "0.3.1"
version = "0.3.2"

[features]
aws = [
Expand Down Expand Up @@ -94,7 +94,7 @@ thiserror = "1"
tokio = { version = "1", optional = true, default-features = false, features = [
"io-util",
] }
url = { version = "2" }
url = { version = "2.5.3", default-features = false, features = ["std"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2", features = ["js"] }
Expand Down
35 changes: 34 additions & 1 deletion fusio/src/dynamic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub use fs::{DynFile, DynFs};

use crate::{
buf::{Slice, SliceMut},
Error, MaybeSend, MaybeSync, Read, Write,
Error, IoBuf, IoBufMut, MaybeSend, MaybeSync, Read, Write,
};

pub trait MaybeSendFuture: Future + MaybeSend {}
Expand Down Expand Up @@ -50,6 +50,22 @@ impl<W: Write> DynWrite for W {
}
}

impl<'write> Write for Box<dyn DynWrite + 'write> {
async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
let (result, buf) =
DynWrite::write_all(self.as_mut(), unsafe { buf.slice_unchecked(..) }).await;
(result, unsafe { B::recover_from_slice(buf) })
}

async fn flush(&mut self) -> Result<(), Error> {
DynWrite::flush(self.as_mut()).await
}

async fn close(&mut self) -> Result<(), Error> {
DynWrite::close(self.as_mut()).await
}
}

pub trait DynRead: MaybeSend + MaybeSync {
//! Dyn compatible(object safety) version of [`Read`].
//! Same as [`DynWrite`].
Expand Down Expand Up @@ -93,3 +109,20 @@ where
Box::pin(R::size(self))
}
}

impl<'read> Read for Box<dyn DynRead + 'read> {
async fn read_exact_at<B: IoBufMut>(&mut self, buf: B, pos: u64) -> (Result<(), Error>, B) {
let (result, buf) =
DynRead::read_exact_at(self.as_mut(), unsafe { buf.slice_mut_unchecked(..) }, pos)
.await;
(result, unsafe { B::recover_from_slice_mut(buf) })
}

async fn read_to_end_at(&mut self, buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
DynRead::read_to_end_at(self.as_mut(), buf, pos).await
}

async fn size(&self) -> Result<u64, Error> {
DynRead::size(self.as_ref()).await
}
}
50 changes: 25 additions & 25 deletions fusio/src/impls/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,40 +92,26 @@ impl<F: Read> BufReader<F> {
pub struct BufWriter<F> {
inner: F,
buf: Option<Vec<u8>>,
capacity: usize,
pos: usize,
}

impl<F> BufWriter<F> {
pub fn new(file: F, capacity: usize) -> Self {
Self {
inner: file,
buf: Some(Vec::with_capacity(capacity)),
capacity,
pos: 0,
}
}
}

impl<F: Read> Read for BufWriter<F> {
async fn read_exact_at<B: IoBufMut>(&mut self, buf: B, pos: u64) -> (Result<(), Error>, B) {
self.inner.read_exact_at(buf, pos).await
}

async fn read_to_end_at(&mut self, buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
self.inner.read_to_end_at(buf, pos).await
}

async fn size(&self) -> Result<u64, Error> {
let size = self.inner.size().await?;
Ok(size)
}
}

impl<F: Write> Write for BufWriter<F> {
async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
let (len, capacity) = {
let buf = self.buf.as_ref().expect("no buffer available");
(buf.len(), buf.capacity())
};

let written_size = buf.bytes_init();
if self.pos + written_size > self.capacity {
if len + written_size > capacity {
let result = self.flush().await;
if result.is_err() {
return (result, buf);
Expand All @@ -136,12 +122,11 @@ impl<F: Write> Write for BufWriter<F> {
// 1. There is no enough space to hold data, which means buffer is empty and written size >
// capacity
// 2. Data can be written to buffer
if self.pos + written_size > self.capacity {
if len + written_size > capacity {
self.inner.write_all(buf).await
} else {
let owned_buf = self.buf.as_mut().unwrap();
owned_buf.extend_from_slice(buf.as_slice());
self.pos += written_size;
(Ok(()), buf)
}
}
Expand All @@ -152,9 +137,8 @@ impl<F: Write> Write for BufWriter<F> {
let (result, mut data) = self.inner.write_all(data).await;
result?;

data.drain(..self.pos);
data.clear();
self.buf = Some(data);
self.pos = 0;
self.inner.flush().await?;

Ok(())
Expand All @@ -173,7 +157,23 @@ impl<F: Write> Write for BufWriter<F> {
pub(crate) mod tests {
use tokio::io::AsyncWriteExt;

use crate::{buffered::BufReader, Read};
use super::BufWriter;
use crate::{buffered::BufReader, Error, IoBufMut, Read};

impl<F: Read> Read for BufWriter<F> {
async fn read_exact_at<B: IoBufMut>(&mut self, buf: B, pos: u64) -> (Result<(), Error>, B) {
self.inner.read_exact_at(buf, pos).await
}

async fn read_to_end_at(&mut self, buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
self.inner.read_to_end_at(buf, pos).await
}

async fn size(&self) -> Result<u64, Error> {
let size = self.inner.size().await?;
Ok(size)
}
}

#[cfg(all(feature = "tokio", not(feature = "completion-based")))]
#[tokio::test]
Expand Down

0 comments on commit 969e8f9

Please sign in to comment.