From f1736953ecc5a01cc15de88b5893a3d532a694ff Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Sat, 7 Dec 2024 02:20:47 +0800 Subject: [PATCH 1/4] refactor: wrap parquet reader/writer with Arc and Mutex --- fusio-parquet/src/reader.rs | 47 ++++++++++++++++-------------- fusio-parquet/src/writer.rs | 57 ++++++++++++++++++++++++------------- 2 files changed, 64 insertions(+), 40 deletions(-) diff --git a/fusio-parquet/src/reader.rs b/fusio-parquet/src/reader.rs index f18a2b1..a6e598c 100644 --- a/fusio-parquet/src/reader.rs +++ b/fusio-parquet/src/reader.rs @@ -16,6 +16,9 @@ use parquet::{ const PREFETCH_FOOTER_SIZE: usize = 512 * 1024; pub struct AsyncReader { + #[cfg(feature = "opfs")] + inner: Arc>>, + #[cfg(not(feature = "opfs"))] inner: Box, content_length: u64, // The prefetch size for fetching file footer. @@ -31,6 +34,9 @@ fn set_prefetch_footer_size(footer_size: usize, content_size: u64) -> usize { impl AsyncReader { pub async fn new(reader: Box, content_length: u64) -> Result { + #[cfg(feature = "opfs")] + #[allow(clippy::arc_with_non_send_sync)] + let reader = Arc::new(futures::lock::Mutex::new(reader)); Ok(Self { inner: reader, content_length, @@ -52,22 +58,20 @@ impl AsyncFileReader for AsyncReader { cfg_if::cfg_if! { if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] { - let (sender, receiver) = - futures::channel::oneshot::channel::>(); - let opfs = unsafe { - std::mem::transmute::<&Box, &Box>(&self.inner) - }; - let file_handle = opfs.file_handle().unwrap(); + let (sender, receiver) = futures::channel::oneshot::channel::>(); + let reader = self.inner.clone(); wasm_bindgen_futures::spawn_local(async move { - - let (result, buf) = file_handle.read_exact_at(buf, range.start as u64).await; - - let ret = match result { - Ok(_) => Ok(buf.freeze()), - Err(err) => Err(ParquetError::External(Box::new(err))) + let result = { + let mut guard = reader.lock().await; + let (result, buf) = guard.read_exact_at(buf, range.start as u64).await; + match result { + Ok(_) => Ok(buf.freeze()), + Err(err) => Err(ParquetError::External(Box::new(err))) + } }; - let _ = sender.send(ret); + + let _ = sender.send(result); }); async move { @@ -96,15 +100,12 @@ impl AsyncFileReader for AsyncReader { if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] { let mut buf = BytesMut::with_capacity(footer_size); buf.resize(footer_size, 0); - let (sender, receiver) = - futures::channel::oneshot::channel::, ParquetError>>(); - let opfs = unsafe { - std::mem::transmute::<&Box, &Box>(&self.inner) - }; - let file_handle = opfs.file_handle().unwrap(); + let (sender, receiver) = futures::channel::oneshot::channel::, ParquetError>>(); + let reader = self.inner.clone(); wasm_bindgen_futures::spawn_local(async move { - let (result, prefetched_footer_content) = file_handle + let mut guard = reader.lock().await; + let (result, prefetched_footer_content) = guard .read_exact_at(buf, content_length - footer_size as u64) .await; if let Err(err) = result { @@ -130,6 +131,7 @@ impl AsyncFileReader for AsyncReader { - FOOTER_SIZE) ..(prefetched_footer_length - FOOTER_SIZE)]; + drop(guard); let _ = sender.send(ParquetMetaDataReader::decode_metadata(buf) .map(|meta| Arc::new(meta))); @@ -137,12 +139,15 @@ impl AsyncFileReader for AsyncReader { let mut buf = BytesMut::with_capacity(metadata_length); buf.resize(metadata_length, 0); - let (result, bytes) = file_handle + let (result, bytes) = guard .read_exact_at( buf, content_length - metadata_length as u64 - FOOTER_SIZE as u64, ) .await; + + drop(guard); + if let Err(err) = result { let _ = sender.send(Err(ParquetError::External(Box::new(err)))); return ; diff --git a/fusio-parquet/src/writer.rs b/fusio-parquet/src/writer.rs index 1c910d0..c77916a 100644 --- a/fusio-parquet/src/writer.rs +++ b/fusio-parquet/src/writer.rs @@ -5,18 +5,23 @@ use futures::future::BoxFuture; use parquet::{arrow::async_writer::AsyncFileWriter, errors::ParquetError}; pub struct AsyncWriter { - inner: Option>, #[cfg(feature = "opfs")] - pos: u64, + #[allow(clippy::arc_with_non_send_sync)] + inner: Option>>>, + #[cfg(not(feature = "opfs"))] + inner: Option>, } unsafe impl Send for AsyncWriter {} impl AsyncWriter { pub fn new(writer: Box) -> Self { - Self { - inner: Some(writer), - #[cfg(feature = "opfs")] - pos: 0, + #[cfg(feature = "opfs")] + #[allow(clippy::arc_with_non_send_sync)] + let writer = std::sync::Arc::new(futures::lock::Mutex::new(writer)); + { + Self { + inner: Some(writer), + } } } } @@ -27,17 +32,15 @@ impl AsyncFileWriter for AsyncWriter { if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] { match self.inner.as_mut() { Some(writer) => { - let pos = self.pos; - self.pos += bs.len() as u64; - let (sender, receiver) = - futures::channel::oneshot::channel::>(); - let opfs = unsafe { - std::mem::transmute::<&Box, &Box>(writer) - }; - let handle = opfs.file_handle().unwrap(); - + let (sender, receiver) = futures::channel::oneshot::channel::>(); + let writer = writer.clone(); wasm_bindgen_futures::spawn_local(async move { - let (result, _) = handle.write_at(bs, pos).await; + let result = { + let mut guard = writer.lock().await; + let (result, _) = guard.write_all(bs).await; + result + }; + let _ = sender.send(result .map_err(|err| ParquetError::External(Box::new(err)))); }); @@ -68,9 +71,25 @@ impl AsyncFileWriter for AsyncWriter { fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> { cfg_if::cfg_if! { if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] { - Box::pin(async move { - Ok(()) - }) + match self.inner.take() { + Some(writer) => { + let (sender, receiver) = futures::channel::oneshot::channel::>(); + wasm_bindgen_futures::spawn_local(async move { + let result = { + let mut guard = writer.lock().await; + guard.close().await + }; + let _ = sender.send(result + .map_err(|err| ParquetError::External(Box::new(err)))); + }); + Box::pin(async move { + receiver.await.unwrap() + }) + } + None => Box::pin(async move { + Ok(()) + }) + } } else { Box::pin(async move { if let Some(mut writer) = self.inner.take() { From 7d2f51989beb63473f9631f6bdf31e10daa69c52 Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Sat, 7 Dec 2024 20:18:17 +0800 Subject: [PATCH 2/4] rename feature name from opfs to wasm --- .github/workflows/ci.yml | 4 ++-- examples/opfs/Cargo.toml | 2 +- fusio-parquet/Cargo.toml | 2 +- fusio-parquet/src/reader.rs | 10 +++++----- fusio-parquet/src/writer.rs | 10 +++++----- fusio-parquet/tests/{opfs.rs => wasm.rs} | 2 +- 6 files changed, 15 insertions(+), 15 deletions(-) rename fusio-parquet/tests/{opfs.rs => wasm.rs} (99%) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f11c4ce..c4c350f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -135,7 +135,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: build - args: --target wasm32-unknown-unknown --package fusio-parquet --features=opfs + args: --target wasm32-unknown-unknown --package fusio-parquet --features=wasm - name: Run cargo build on fusio-dispatch uses: actions-rs/cargo@v1 @@ -165,4 +165,4 @@ jobs: - name: Run wasm-pack test on fusion-parquet run: | export PATH=$PATH:/tmp/chrome/chrome-linux64/:/tmp/chrome/chromedriver-linux64/ - wasm-pack test --chrome --headless fusio-parquet --features opfs + wasm-pack test --chrome --headless fusio-parquet --features wasm diff --git a/examples/opfs/Cargo.toml b/examples/opfs/Cargo.toml index 913dcd6..4b8580b 100644 --- a/examples/opfs/Cargo.toml +++ b/examples/opfs/Cargo.toml @@ -11,7 +11,7 @@ crate-type = ["cdylib", "rlib"] arrow = "53" fusio = { path = "../../fusio", features = ["opfs"] } fusio-dispatch = { path = "../../fusio-dispatch", features = ["opfs"] } -fusio-parquet = { path = "../../fusio-parquet", features = ["opfs"] } +fusio-parquet = { path = "../../fusio-parquet", features = ["wasm"] } futures = { version = "0.3" } parquet = { version = "53", default-features = false, features = [ "arrow", diff --git a/fusio-parquet/Cargo.toml b/fusio-parquet/Cargo.toml index 5b2c903..5f66a13 100644 --- a/fusio-parquet/Cargo.toml +++ b/fusio-parquet/Cargo.toml @@ -8,7 +8,7 @@ version = "0.2.2" [features] default = [] -opfs = ["fusio/opfs"] +wasm = ["fusio/opfs"] tokio = ["fusio/tokio"] [dependencies] diff --git a/fusio-parquet/src/reader.rs b/fusio-parquet/src/reader.rs index a6e598c..a1a3602 100644 --- a/fusio-parquet/src/reader.rs +++ b/fusio-parquet/src/reader.rs @@ -16,9 +16,9 @@ use parquet::{ const PREFETCH_FOOTER_SIZE: usize = 512 * 1024; pub struct AsyncReader { - #[cfg(feature = "opfs")] + #[cfg(feature = "wasm")] inner: Arc>>, - #[cfg(not(feature = "opfs"))] + #[cfg(not(feature = "wasm"))] inner: Box, content_length: u64, // The prefetch size for fetching file footer. @@ -34,7 +34,7 @@ fn set_prefetch_footer_size(footer_size: usize, content_size: u64) -> usize { impl AsyncReader { pub async fn new(reader: Box, content_length: u64) -> Result { - #[cfg(feature = "opfs")] + #[cfg(feature = "wasm")] #[allow(clippy::arc_with_non_send_sync)] let reader = Arc::new(futures::lock::Mutex::new(reader)); Ok(Self { @@ -57,7 +57,7 @@ impl AsyncFileReader for AsyncReader { buf.resize(len, 0); cfg_if::cfg_if! { - if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] { + if #[cfg(all(feature = "wasm", target_arch = "wasm32"))] { let (sender, receiver) = futures::channel::oneshot::channel::>(); let reader = self.inner.clone(); @@ -97,7 +97,7 @@ impl AsyncFileReader for AsyncReader { let content_length = self.content_length; cfg_if::cfg_if! { - if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] { + if #[cfg(all(feature = "wasm", target_arch = "wasm32"))] { let mut buf = BytesMut::with_capacity(footer_size); buf.resize(footer_size, 0); let (sender, receiver) = futures::channel::oneshot::channel::, ParquetError>>(); diff --git a/fusio-parquet/src/writer.rs b/fusio-parquet/src/writer.rs index c77916a..0b60f12 100644 --- a/fusio-parquet/src/writer.rs +++ b/fusio-parquet/src/writer.rs @@ -5,17 +5,17 @@ use futures::future::BoxFuture; use parquet::{arrow::async_writer::AsyncFileWriter, errors::ParquetError}; pub struct AsyncWriter { - #[cfg(feature = "opfs")] + #[cfg(feature = "wasm")] #[allow(clippy::arc_with_non_send_sync)] inner: Option>>>, - #[cfg(not(feature = "opfs"))] + #[cfg(not(feature = "wasm"))] inner: Option>, } unsafe impl Send for AsyncWriter {} impl AsyncWriter { pub fn new(writer: Box) -> Self { - #[cfg(feature = "opfs")] + #[cfg(feature = "wasm")] #[allow(clippy::arc_with_non_send_sync)] let writer = std::sync::Arc::new(futures::lock::Mutex::new(writer)); { @@ -29,7 +29,7 @@ impl AsyncWriter { impl AsyncFileWriter for AsyncWriter { fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> { cfg_if::cfg_if! { - if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] { + if #[cfg(all(feature = "wasm", target_arch = "wasm32"))] { match self.inner.as_mut() { Some(writer) => { let (sender, receiver) = futures::channel::oneshot::channel::>(); @@ -70,7 +70,7 @@ impl AsyncFileWriter for AsyncWriter { fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> { cfg_if::cfg_if! { - if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] { + if #[cfg(all(feature = "wasm", target_arch = "wasm32"))] { match self.inner.take() { Some(writer) => { let (sender, receiver) = futures::channel::oneshot::channel::>(); diff --git a/fusio-parquet/tests/opfs.rs b/fusio-parquet/tests/wasm.rs similarity index 99% rename from fusio-parquet/tests/opfs.rs rename to fusio-parquet/tests/wasm.rs index 6a18d6b..4dbe12a 100644 --- a/fusio-parquet/tests/opfs.rs +++ b/fusio-parquet/tests/wasm.rs @@ -1,5 +1,5 @@ #[cfg(test)] -#[cfg(all(feature = "opfs", target_arch = "wasm32"))] +#[cfg(all(feature = "wasm", target_arch = "wasm32"))] pub(crate) mod tests { wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); From 77fb06047b63b320046cb399273623111bc3a0bf Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Mon, 9 Dec 2024 11:47:33 +0800 Subject: [PATCH 3/4] change close semantics --- fusio/src/impls/disk/opfs/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/fusio/src/impls/disk/opfs/mod.rs b/fusio/src/impls/disk/opfs/mod.rs index 0e2760c..584f10a 100644 --- a/fusio/src/impls/disk/opfs/mod.rs +++ b/fusio/src/impls/disk/opfs/mod.rs @@ -258,7 +258,6 @@ impl Write for OPFSFile { if let Some(writer) = writer { JsFuture::from(writer.close()).await.map_err(wasm_err)?; } - self.file_handle.take(); Ok(()) } } From 25e6da71827871cd858399663c4c5212a7d30e15 Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Mon, 9 Dec 2024 12:28:07 +0800 Subject: [PATCH 4/4] change feature name to web --- .github/workflows/ci.yml | 4 ++-- examples/opfs/Cargo.toml | 2 +- fusio-parquet/Cargo.toml | 2 +- fusio-parquet/src/reader.rs | 10 +++++----- fusio-parquet/src/writer.rs | 10 +++++----- fusio-parquet/tests/wasm.rs | 2 +- 6 files changed, 15 insertions(+), 15 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c4c350f..e96ebf1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -135,7 +135,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: build - args: --target wasm32-unknown-unknown --package fusio-parquet --features=wasm + args: --target wasm32-unknown-unknown --package fusio-parquet --features=web - name: Run cargo build on fusio-dispatch uses: actions-rs/cargo@v1 @@ -165,4 +165,4 @@ jobs: - name: Run wasm-pack test on fusion-parquet run: | export PATH=$PATH:/tmp/chrome/chrome-linux64/:/tmp/chrome/chromedriver-linux64/ - wasm-pack test --chrome --headless fusio-parquet --features wasm + wasm-pack test --chrome --headless fusio-parquet --features web diff --git a/examples/opfs/Cargo.toml b/examples/opfs/Cargo.toml index 4b8580b..8e1f892 100644 --- a/examples/opfs/Cargo.toml +++ b/examples/opfs/Cargo.toml @@ -11,7 +11,7 @@ crate-type = ["cdylib", "rlib"] arrow = "53" fusio = { path = "../../fusio", features = ["opfs"] } fusio-dispatch = { path = "../../fusio-dispatch", features = ["opfs"] } -fusio-parquet = { path = "../../fusio-parquet", features = ["wasm"] } +fusio-parquet = { path = "../../fusio-parquet", features = ["web"] } futures = { version = "0.3" } parquet = { version = "53", default-features = false, features = [ "arrow", diff --git a/fusio-parquet/Cargo.toml b/fusio-parquet/Cargo.toml index 5f66a13..3e609e7 100644 --- a/fusio-parquet/Cargo.toml +++ b/fusio-parquet/Cargo.toml @@ -8,7 +8,7 @@ version = "0.2.2" [features] default = [] -wasm = ["fusio/opfs"] +web = ["fusio/opfs"] tokio = ["fusio/tokio"] [dependencies] diff --git a/fusio-parquet/src/reader.rs b/fusio-parquet/src/reader.rs index a1a3602..eba1790 100644 --- a/fusio-parquet/src/reader.rs +++ b/fusio-parquet/src/reader.rs @@ -16,9 +16,9 @@ use parquet::{ const PREFETCH_FOOTER_SIZE: usize = 512 * 1024; pub struct AsyncReader { - #[cfg(feature = "wasm")] + #[cfg(feature = "web")] inner: Arc>>, - #[cfg(not(feature = "wasm"))] + #[cfg(not(feature = "web"))] inner: Box, content_length: u64, // The prefetch size for fetching file footer. @@ -34,7 +34,7 @@ fn set_prefetch_footer_size(footer_size: usize, content_size: u64) -> usize { impl AsyncReader { pub async fn new(reader: Box, content_length: u64) -> Result { - #[cfg(feature = "wasm")] + #[cfg(feature = "web")] #[allow(clippy::arc_with_non_send_sync)] let reader = Arc::new(futures::lock::Mutex::new(reader)); Ok(Self { @@ -57,7 +57,7 @@ impl AsyncFileReader for AsyncReader { buf.resize(len, 0); cfg_if::cfg_if! { - if #[cfg(all(feature = "wasm", target_arch = "wasm32"))] { + if #[cfg(all(feature = "web", target_arch = "wasm32"))] { let (sender, receiver) = futures::channel::oneshot::channel::>(); let reader = self.inner.clone(); @@ -97,7 +97,7 @@ impl AsyncFileReader for AsyncReader { let content_length = self.content_length; cfg_if::cfg_if! { - if #[cfg(all(feature = "wasm", target_arch = "wasm32"))] { + if #[cfg(all(feature = "web", target_arch = "wasm32"))] { let mut buf = BytesMut::with_capacity(footer_size); buf.resize(footer_size, 0); let (sender, receiver) = futures::channel::oneshot::channel::, ParquetError>>(); diff --git a/fusio-parquet/src/writer.rs b/fusio-parquet/src/writer.rs index 0b60f12..da50eab 100644 --- a/fusio-parquet/src/writer.rs +++ b/fusio-parquet/src/writer.rs @@ -5,17 +5,17 @@ use futures::future::BoxFuture; use parquet::{arrow::async_writer::AsyncFileWriter, errors::ParquetError}; pub struct AsyncWriter { - #[cfg(feature = "wasm")] + #[cfg(feature = "web")] #[allow(clippy::arc_with_non_send_sync)] inner: Option>>>, - #[cfg(not(feature = "wasm"))] + #[cfg(not(feature = "web"))] inner: Option>, } unsafe impl Send for AsyncWriter {} impl AsyncWriter { pub fn new(writer: Box) -> Self { - #[cfg(feature = "wasm")] + #[cfg(feature = "web")] #[allow(clippy::arc_with_non_send_sync)] let writer = std::sync::Arc::new(futures::lock::Mutex::new(writer)); { @@ -29,7 +29,7 @@ impl AsyncWriter { impl AsyncFileWriter for AsyncWriter { fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> { cfg_if::cfg_if! { - if #[cfg(all(feature = "wasm", target_arch = "wasm32"))] { + if #[cfg(all(feature = "web", target_arch = "wasm32"))] { match self.inner.as_mut() { Some(writer) => { let (sender, receiver) = futures::channel::oneshot::channel::>(); @@ -70,7 +70,7 @@ impl AsyncFileWriter for AsyncWriter { fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> { cfg_if::cfg_if! { - if #[cfg(all(feature = "wasm", target_arch = "wasm32"))] { + if #[cfg(all(feature = "web", target_arch = "wasm32"))] { match self.inner.take() { Some(writer) => { let (sender, receiver) = futures::channel::oneshot::channel::>(); diff --git a/fusio-parquet/tests/wasm.rs b/fusio-parquet/tests/wasm.rs index 4dbe12a..9c1be2a 100644 --- a/fusio-parquet/tests/wasm.rs +++ b/fusio-parquet/tests/wasm.rs @@ -1,5 +1,5 @@ #[cfg(test)] -#[cfg(all(feature = "wasm", target_arch = "wasm32"))] +#[cfg(all(feature = "web", target_arch = "wasm32"))] pub(crate) mod tests { wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);