diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f11c4ce..e96ebf1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -135,7 +135,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: build - args: --target wasm32-unknown-unknown --package fusio-parquet --features=opfs + args: --target wasm32-unknown-unknown --package fusio-parquet --features=web - name: Run cargo build on fusio-dispatch uses: actions-rs/cargo@v1 @@ -165,4 +165,4 @@ jobs: - name: Run wasm-pack test on fusion-parquet run: | export PATH=$PATH:/tmp/chrome/chrome-linux64/:/tmp/chrome/chromedriver-linux64/ - wasm-pack test --chrome --headless fusio-parquet --features opfs + wasm-pack test --chrome --headless fusio-parquet --features web diff --git a/examples/opfs/Cargo.toml b/examples/opfs/Cargo.toml index 913dcd6..8e1f892 100644 --- a/examples/opfs/Cargo.toml +++ b/examples/opfs/Cargo.toml @@ -11,7 +11,7 @@ crate-type = ["cdylib", "rlib"] arrow = "53" fusio = { path = "../../fusio", features = ["opfs"] } fusio-dispatch = { path = "../../fusio-dispatch", features = ["opfs"] } -fusio-parquet = { path = "../../fusio-parquet", features = ["opfs"] } +fusio-parquet = { path = "../../fusio-parquet", features = ["web"] } futures = { version = "0.3" } parquet = { version = "53", default-features = false, features = [ "arrow", diff --git a/fusio-parquet/Cargo.toml b/fusio-parquet/Cargo.toml index 5b2c903..3e609e7 100644 --- a/fusio-parquet/Cargo.toml +++ b/fusio-parquet/Cargo.toml @@ -8,7 +8,7 @@ version = "0.2.2" [features] default = [] -opfs = ["fusio/opfs"] +web = ["fusio/opfs"] tokio = ["fusio/tokio"] [dependencies] diff --git a/fusio-parquet/src/reader.rs b/fusio-parquet/src/reader.rs index f18a2b1..eba1790 100644 --- a/fusio-parquet/src/reader.rs +++ b/fusio-parquet/src/reader.rs @@ -16,6 +16,9 @@ use parquet::{ const PREFETCH_FOOTER_SIZE: usize = 512 * 1024; pub struct AsyncReader { + #[cfg(feature = "web")] + inner: Arc>>, + #[cfg(not(feature = "web"))] inner: Box, content_length: u64, // The prefetch size for fetching file footer. @@ -31,6 +34,9 @@ fn set_prefetch_footer_size(footer_size: usize, content_size: u64) -> usize { impl AsyncReader { pub async fn new(reader: Box, content_length: u64) -> Result { + #[cfg(feature = "web")] + #[allow(clippy::arc_with_non_send_sync)] + let reader = Arc::new(futures::lock::Mutex::new(reader)); Ok(Self { inner: reader, content_length, @@ -51,23 +57,21 @@ impl AsyncFileReader for AsyncReader { buf.resize(len, 0); cfg_if::cfg_if! { - if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] { - let (sender, receiver) = - futures::channel::oneshot::channel::>(); - let opfs = unsafe { - std::mem::transmute::<&Box, &Box>(&self.inner) - }; - let file_handle = opfs.file_handle().unwrap(); + if #[cfg(all(feature = "web", target_arch = "wasm32"))] { + let (sender, receiver) = futures::channel::oneshot::channel::>(); + let reader = self.inner.clone(); wasm_bindgen_futures::spawn_local(async move { - - let (result, buf) = file_handle.read_exact_at(buf, range.start as u64).await; - - let ret = match result { - Ok(_) => Ok(buf.freeze()), - Err(err) => Err(ParquetError::External(Box::new(err))) + let result = { + let mut guard = reader.lock().await; + let (result, buf) = guard.read_exact_at(buf, range.start as u64).await; + match result { + Ok(_) => Ok(buf.freeze()), + Err(err) => Err(ParquetError::External(Box::new(err))) + } }; - let _ = sender.send(ret); + + let _ = sender.send(result); }); async move { @@ -93,18 +97,15 @@ impl AsyncFileReader for AsyncReader { let content_length = self.content_length; cfg_if::cfg_if! { - if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] { + if #[cfg(all(feature = "web", target_arch = "wasm32"))] { let mut buf = BytesMut::with_capacity(footer_size); buf.resize(footer_size, 0); - let (sender, receiver) = - futures::channel::oneshot::channel::, ParquetError>>(); - let opfs = unsafe { - std::mem::transmute::<&Box, &Box>(&self.inner) - }; - let file_handle = opfs.file_handle().unwrap(); + let (sender, receiver) = futures::channel::oneshot::channel::, ParquetError>>(); + let reader = self.inner.clone(); wasm_bindgen_futures::spawn_local(async move { - let (result, prefetched_footer_content) = file_handle + let mut guard = reader.lock().await; + let (result, prefetched_footer_content) = guard .read_exact_at(buf, content_length - footer_size as u64) .await; if let Err(err) = result { @@ -130,6 +131,7 @@ impl AsyncFileReader for AsyncReader { - FOOTER_SIZE) ..(prefetched_footer_length - FOOTER_SIZE)]; + drop(guard); let _ = sender.send(ParquetMetaDataReader::decode_metadata(buf) .map(|meta| Arc::new(meta))); @@ -137,12 +139,15 @@ impl AsyncFileReader for AsyncReader { let mut buf = BytesMut::with_capacity(metadata_length); buf.resize(metadata_length, 0); - let (result, bytes) = file_handle + let (result, bytes) = guard .read_exact_at( buf, content_length - metadata_length as u64 - FOOTER_SIZE as u64, ) .await; + + drop(guard); + if let Err(err) = result { let _ = sender.send(Err(ParquetError::External(Box::new(err)))); return ; diff --git a/fusio-parquet/src/writer.rs b/fusio-parquet/src/writer.rs index 1c910d0..da50eab 100644 --- a/fusio-parquet/src/writer.rs +++ b/fusio-parquet/src/writer.rs @@ -5,18 +5,23 @@ use futures::future::BoxFuture; use parquet::{arrow::async_writer::AsyncFileWriter, errors::ParquetError}; pub struct AsyncWriter { + #[cfg(feature = "web")] + #[allow(clippy::arc_with_non_send_sync)] + inner: Option>>>, + #[cfg(not(feature = "web"))] inner: Option>, - #[cfg(feature = "opfs")] - pos: u64, } unsafe impl Send for AsyncWriter {} impl AsyncWriter { pub fn new(writer: Box) -> Self { - Self { - inner: Some(writer), - #[cfg(feature = "opfs")] - pos: 0, + #[cfg(feature = "web")] + #[allow(clippy::arc_with_non_send_sync)] + let writer = std::sync::Arc::new(futures::lock::Mutex::new(writer)); + { + Self { + inner: Some(writer), + } } } } @@ -24,20 +29,18 @@ impl AsyncWriter { impl AsyncFileWriter for AsyncWriter { fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> { cfg_if::cfg_if! { - if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] { + if #[cfg(all(feature = "web", target_arch = "wasm32"))] { match self.inner.as_mut() { Some(writer) => { - let pos = self.pos; - self.pos += bs.len() as u64; - let (sender, receiver) = - futures::channel::oneshot::channel::>(); - let opfs = unsafe { - std::mem::transmute::<&Box, &Box>(writer) - }; - let handle = opfs.file_handle().unwrap(); - + let (sender, receiver) = futures::channel::oneshot::channel::>(); + let writer = writer.clone(); wasm_bindgen_futures::spawn_local(async move { - let (result, _) = handle.write_at(bs, pos).await; + let result = { + let mut guard = writer.lock().await; + let (result, _) = guard.write_all(bs).await; + result + }; + let _ = sender.send(result .map_err(|err| ParquetError::External(Box::new(err)))); }); @@ -67,10 +70,26 @@ impl AsyncFileWriter for AsyncWriter { fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> { cfg_if::cfg_if! { - if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] { - Box::pin(async move { - Ok(()) - }) + if #[cfg(all(feature = "web", target_arch = "wasm32"))] { + match self.inner.take() { + Some(writer) => { + let (sender, receiver) = futures::channel::oneshot::channel::>(); + wasm_bindgen_futures::spawn_local(async move { + let result = { + let mut guard = writer.lock().await; + guard.close().await + }; + let _ = sender.send(result + .map_err(|err| ParquetError::External(Box::new(err)))); + }); + Box::pin(async move { + receiver.await.unwrap() + }) + } + None => Box::pin(async move { + Ok(()) + }) + } } else { Box::pin(async move { if let Some(mut writer) = self.inner.take() { diff --git a/fusio-parquet/tests/opfs.rs b/fusio-parquet/tests/wasm.rs similarity index 99% rename from fusio-parquet/tests/opfs.rs rename to fusio-parquet/tests/wasm.rs index 6a18d6b..9c1be2a 100644 --- a/fusio-parquet/tests/opfs.rs +++ b/fusio-parquet/tests/wasm.rs @@ -1,5 +1,5 @@ #[cfg(test)] -#[cfg(all(feature = "opfs", target_arch = "wasm32"))] +#[cfg(all(feature = "web", target_arch = "wasm32"))] pub(crate) mod tests { wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); diff --git a/fusio/src/impls/disk/opfs/mod.rs b/fusio/src/impls/disk/opfs/mod.rs index 0e2760c..584f10a 100644 --- a/fusio/src/impls/disk/opfs/mod.rs +++ b/fusio/src/impls/disk/opfs/mod.rs @@ -258,7 +258,6 @@ impl Write for OPFSFile { if let Some(writer) = writer { JsFuture::from(writer.close()).await.map_err(wasm_err)?; } - self.file_handle.take(); Ok(()) } }