Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: wrap parquet reader/writer with Arc and Mutex #110

Merged
merged 4 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: build
args: --target wasm32-unknown-unknown --package fusio-parquet --features=opfs
args: --target wasm32-unknown-unknown --package fusio-parquet --features=wasm

- name: Run cargo build on fusio-dispatch
uses: actions-rs/cargo@v1
Expand Down Expand Up @@ -165,4 +165,4 @@ jobs:
- name: Run wasm-pack test on fusion-parquet
run: |
export PATH=$PATH:/tmp/chrome/chrome-linux64/:/tmp/chrome/chromedriver-linux64/
wasm-pack test --chrome --headless fusio-parquet --features opfs
wasm-pack test --chrome --headless fusio-parquet --features wasm
2 changes: 1 addition & 1 deletion examples/opfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ crate-type = ["cdylib", "rlib"]
arrow = "53"
fusio = { path = "../../fusio", features = ["opfs"] }
fusio-dispatch = { path = "../../fusio-dispatch", features = ["opfs"] }
fusio-parquet = { path = "../../fusio-parquet", features = ["opfs"] }
fusio-parquet = { path = "../../fusio-parquet", features = ["wasm"] }
futures = { version = "0.3" }
parquet = { version = "53", default-features = false, features = [
"arrow",
Expand Down
2 changes: 1 addition & 1 deletion fusio-parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ version = "0.2.2"

[features]
default = []
opfs = ["fusio/opfs"]
wasm = ["fusio/opfs"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

web should be better, because not all wasm target platform support opfs.

tokio = ["fusio/tokio"]

[dependencies]
Expand Down
51 changes: 28 additions & 23 deletions fusio-parquet/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ use parquet::{
const PREFETCH_FOOTER_SIZE: usize = 512 * 1024;

pub struct AsyncReader {
#[cfg(feature = "wasm")]
inner: Arc<futures::lock::Mutex<Box<dyn DynFile>>>,
#[cfg(not(feature = "wasm"))]
inner: Box<dyn DynFile>,
content_length: u64,
// The prefetch size for fetching file footer.
Expand All @@ -31,6 +34,9 @@ fn set_prefetch_footer_size(footer_size: usize, content_size: u64) -> usize {

impl AsyncReader {
pub async fn new(reader: Box<dyn DynFile>, content_length: u64) -> Result<Self, fusio::Error> {
#[cfg(feature = "wasm")]
#[allow(clippy::arc_with_non_send_sync)]
let reader = Arc::new(futures::lock::Mutex::new(reader));
Ok(Self {
inner: reader,
content_length,
Expand All @@ -51,23 +57,21 @@ impl AsyncFileReader for AsyncReader {
buf.resize(len, 0);

cfg_if::cfg_if! {
if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] {
let (sender, receiver) =
futures::channel::oneshot::channel::<Result<Bytes, ParquetError>>();
let opfs = unsafe {
std::mem::transmute::<&Box<dyn DynFile>, &Box<fusio::disk::OPFSFile>>(&self.inner)
};
let file_handle = opfs.file_handle().unwrap();
if #[cfg(all(feature = "wasm", target_arch = "wasm32"))] {
let (sender, receiver) = futures::channel::oneshot::channel::<Result<Bytes, ParquetError>>();

let reader = self.inner.clone();
wasm_bindgen_futures::spawn_local(async move {

let (result, buf) = file_handle.read_exact_at(buf, range.start as u64).await;

let ret = match result {
Ok(_) => Ok(buf.freeze()),
Err(err) => Err(ParquetError::External(Box::new(err)))
let result = {
let mut guard = reader.lock().await;
let (result, buf) = guard.read_exact_at(buf, range.start as u64).await;
match result {
Ok(_) => Ok(buf.freeze()),
Err(err) => Err(ParquetError::External(Box::new(err)))
}
};
let _ = sender.send(ret);

let _ = sender.send(result);
});

async move {
Expand All @@ -93,18 +97,15 @@ impl AsyncFileReader for AsyncReader {
let content_length = self.content_length;

cfg_if::cfg_if! {
if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] {
if #[cfg(all(feature = "wasm", target_arch = "wasm32"))] {
let mut buf = BytesMut::with_capacity(footer_size);
buf.resize(footer_size, 0);
let (sender, receiver) =
futures::channel::oneshot::channel::<Result<Arc<ParquetMetaData>, ParquetError>>();
let opfs = unsafe {
std::mem::transmute::<&Box<dyn DynFile>, &Box<fusio::disk::OPFSFile>>(&self.inner)
};
let file_handle = opfs.file_handle().unwrap();
let (sender, receiver) = futures::channel::oneshot::channel::<Result<Arc<ParquetMetaData>, ParquetError>>();

let reader = self.inner.clone();
wasm_bindgen_futures::spawn_local(async move {
let (result, prefetched_footer_content) = file_handle
let mut guard = reader.lock().await;
let (result, prefetched_footer_content) = guard
.read_exact_at(buf, content_length - footer_size as u64)
.await;
if let Err(err) = result {
Expand All @@ -130,19 +131,23 @@ impl AsyncFileReader for AsyncReader {
- FOOTER_SIZE)
..(prefetched_footer_length - FOOTER_SIZE)];

drop(guard);

let _ = sender.send(ParquetMetaDataReader::decode_metadata(buf)
.map(|meta| Arc::new(meta)));
} else {
let mut buf = BytesMut::with_capacity(metadata_length);
buf.resize(metadata_length, 0);

let (result, bytes) = file_handle
let (result, bytes) = guard
.read_exact_at(
buf,
content_length - metadata_length as u64 - FOOTER_SIZE as u64,
)
.await;

drop(guard);

if let Err(err) = result {
let _ = sender.send(Err(ParquetError::External(Box::new(err))));
return ;
Expand Down
61 changes: 40 additions & 21 deletions fusio-parquet/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,42 @@ use futures::future::BoxFuture;
use parquet::{arrow::async_writer::AsyncFileWriter, errors::ParquetError};

pub struct AsyncWriter {
#[cfg(feature = "wasm")]
#[allow(clippy::arc_with_non_send_sync)]
inner: Option<std::sync::Arc<futures::lock::Mutex<Box<dyn DynFile>>>>,
#[cfg(not(feature = "wasm"))]
inner: Option<Box<dyn DynFile>>,
#[cfg(feature = "opfs")]
pos: u64,
}

unsafe impl Send for AsyncWriter {}
impl AsyncWriter {
pub fn new(writer: Box<dyn DynFile>) -> Self {
Self {
inner: Some(writer),
#[cfg(feature = "opfs")]
pos: 0,
#[cfg(feature = "wasm")]
#[allow(clippy::arc_with_non_send_sync)]
let writer = std::sync::Arc::new(futures::lock::Mutex::new(writer));
{
Self {
inner: Some(writer),
}
}
}
}

impl AsyncFileWriter for AsyncWriter {
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
cfg_if::cfg_if! {
if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] {
if #[cfg(all(feature = "wasm", target_arch = "wasm32"))] {
match self.inner.as_mut() {
Some(writer) => {
let pos = self.pos;
self.pos += bs.len() as u64;
let (sender, receiver) =
futures::channel::oneshot::channel::<Result<(), ParquetError>>();
let opfs = unsafe {
std::mem::transmute::<&Box<dyn DynFile>, &Box<fusio::disk::OPFSFile>>(writer)
};
let handle = opfs.file_handle().unwrap();

let (sender, receiver) = futures::channel::oneshot::channel::<Result<(), ParquetError>>();
let writer = writer.clone();
wasm_bindgen_futures::spawn_local(async move {
let (result, _) = handle.write_at(bs, pos).await;
let result = {
let mut guard = writer.lock().await;
let (result, _) = guard.write_all(bs).await;
result
};

let _ = sender.send(result
.map_err(|err| ParquetError::External(Box::new(err))));
});
Expand Down Expand Up @@ -67,10 +70,26 @@ impl AsyncFileWriter for AsyncWriter {

fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
cfg_if::cfg_if! {
if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] {
Box::pin(async move {
Ok(())
})
if #[cfg(all(feature = "wasm", target_arch = "wasm32"))] {
match self.inner.take() {
Some(writer) => {
let (sender, receiver) = futures::channel::oneshot::channel::<Result<(), ParquetError>>();
wasm_bindgen_futures::spawn_local(async move {
let result = {
let mut guard = writer.lock().await;
guard.close().await
};
let _ = sender.send(result
.map_err(|err| ParquetError::External(Box::new(err))));
});
Box::pin(async move {
receiver.await.unwrap()
})
}
None => Box::pin(async move {
Ok(())
})
}
} else {
Box::pin(async move {
if let Some(mut writer) = self.inner.take() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#[cfg(test)]
#[cfg(all(feature = "opfs", target_arch = "wasm32"))]
#[cfg(all(feature = "wasm", target_arch = "wasm32"))]
pub(crate) mod tests {

wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
Expand Down
1 change: 0 additions & 1 deletion fusio/src/impls/disk/opfs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ impl Write for OPFSFile {
if let Some(writer) = writer {
JsFuture::from(writer.close()).await.map_err(wasm_err)?;
}
self.file_handle.take();
Ok(())
}
}
Expand Down
Loading