From 560f89b882d47ad142c52acc651ee951f63a47e3 Mon Sep 17 00:00:00 2001 From: kkould <2435992353@qq.com> Date: Mon, 30 Sep 2024 06:48:27 +0000 Subject: [PATCH] feat: update Read & Write for tokio-uring impl --- .github/workflows/ci.yml | 37 +++++++++++++++ fusio/src/lib.rs | 29 +++++++++--- fusio/src/local/mod.rs | 4 +- fusio/src/local/tokio_uring/fs.rs | 55 ++++++++++++++++------ fusio/src/local/tokio_uring/mod.rs | 75 ++++++++++++++++++++++-------- 5 files changed, 161 insertions(+), 39 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bd1c360..9371e12 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -85,6 +85,43 @@ jobs: command: test args: --package fusio --features "monoio, futures" + tokio_uring_check: + name: Rust project check on tokio_uring + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: + - ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install latest + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + override: true + components: rustfmt, clippy + + # `cargo check` command here will use installed `nightly` + # as it is set as an "override" for current directory + + - name: Run cargo clippy on tokio-uring + uses: actions-rs/cargo@v1 + with: + command: check + args: --package fusio --features "tokio-uring, futures" + + - name: Run cargo build on tokio-uring + uses: actions-rs/cargo@v1 + with: + command: build + args: --package fusio --features "tokio-uring, futures" + + - name: Run cargo test on tokio-uring + uses: actions-rs/cargo@v1 + with: + command: test + args: --package fusio --features "tokio-uring, futures" + # 2 fmt: name: Rust fmt diff --git a/fusio/src/lib.rs b/fusio/src/lib.rs index 9c9bbc3..26f455a 100644 --- a/fusio/src/lib.rs +++ b/fusio/src/lib.rs @@ -259,13 +259,24 @@ mod tests { writer.sync_data().await.unwrap(); let mut reader = CountRead::new(read); - reader.seek(0).await.unwrap(); + { + reader.seek(0).await.unwrap(); + + let mut buf = vec![]; + buf = reader.read_to_end(buf).await.unwrap(); + + assert_eq!(buf.bytes_init(), 4); + assert_eq!(buf.as_slice(), &[2, 0, 2, 4]); + } + { + reader.seek(2).await.unwrap(); - let mut buf = vec![]; - buf = reader.read_to_end(buf).await.unwrap(); + let mut buf = vec![]; + buf = reader.read_to_end(buf).await.unwrap(); - assert_eq!(buf.bytes_init(), 4); - assert_eq!(buf.as_slice(), &[2, 0, 2, 4]); + assert_eq!(buf.bytes_init(), 2); + assert_eq!(buf.as_slice(), &[2, 4]); + } } #[cfg(feature = "futures")] @@ -418,11 +429,17 @@ mod tests { use tempfile::tempfile; use tokio_uring::fs::File; + use crate::local::tokio_uring::TokioUringFile; + tokio_uring::start(async { let read = tempfile().unwrap(); let write = read.try_clone().unwrap(); - write_and_read(File::from_std(write), File::from_std(read)).await; + write_and_read( + TokioUringFile::from(File::from_std(write)), + TokioUringFile::from(File::from_std(read)), + ) + .await; }); } } diff --git a/fusio/src/local/mod.rs b/fusio/src/local/mod.rs index c71df41..94014a5 100644 --- a/fusio/src/local/mod.rs +++ b/fusio/src/local/mod.rs @@ -3,7 +3,7 @@ pub(crate) mod monoio; #[cfg(feature = "tokio")] pub(crate) mod tokio; #[cfg(all(feature = "tokio-uring", target_os = "linux"))] -mod tokio_uring; +pub(crate) mod tokio_uring; #[cfg(all(feature = "monoio", feature = "fs"))] #[allow(unused)] @@ -21,5 +21,7 @@ cfg_if::cfg_if! { pub type LocalFs = TokioFs; } else if #[cfg(feature = "monoio")] { pub type LocalFs = MonoIoFs; + } else if #[cfg(feature = "tokio-uring")] { + pub type LocalFs = TokioUringFs; } } diff --git a/fusio/src/local/tokio_uring/fs.rs b/fusio/src/local/tokio_uring/fs.rs index 4121548..bc72c26 100644 --- a/fusio/src/local/tokio_uring/fs.rs +++ b/fusio/src/local/tokio_uring/fs.rs @@ -1,33 +1,62 @@ -use std::{io, path::Path}; - use async_stream::stream; use futures_core::Stream; -use tokio_uring::fs::{remove_file, File}; +use tokio_uring::fs::{create_dir_all, remove_file}; -use crate::fs::{FileMeta, Fs}; +use crate::{ + fs::{FileMeta, Fs, OpenOptions, WriteMode}, + local::tokio_uring::TokioUringFile, + path::{path_to_local, Path}, + Error, +}; pub struct TokioUringFs; impl Fs for TokioUringFs { - type File = File; + type File = TokioUringFile; + + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { + let local_path = path_to_local(path)?; + + let file = tokio_uring::fs::OpenOptions::new() + .read(options.read) + .write(options.write.is_some()) + .create(options.create) + .append(options.write == Some(WriteMode::Append)) + .truncate(options.write == Some(WriteMode::Overwrite)) + .open(&local_path) + .await?; - async fn open(&self, path: impl AsRef) -> io::Result { - File::open(path).await + Ok(TokioUringFile { + file: Some(file), + pos: 0, + }) + } + + async fn create_dir_all(path: &Path) -> Result<(), Error> { + let path = path_to_local(path)?; + create_dir_all(path).await?; + + Ok(()) } async fn list( &self, - path: impl AsRef, - ) -> io::Result>> { - let dir = path.as_ref().read_dir()?; + path: &Path, + ) -> Result>, Error> { + let path = path_to_local(path)?; + let dir = path.read_dir()?; + Ok(stream! { for entry in dir { - yield Ok(crate::fs::FileMeta { path: entry?.path() }); + let entry = entry?; + yield Ok(FileMeta { path: Path::from_filesystem_path(entry.path())?, size: entry.metadata()?.len() }); } }) } - async fn remove(&self, path: impl AsRef) -> io::Result<()> { - remove_file(path).await + async fn remove(&self, path: &Path) -> Result<(), Error> { + let path = path_to_local(path)?; + + Ok(remove_file(path).await?) } } diff --git a/fusio/src/local/tokio_uring/mod.rs b/fusio/src/local/tokio_uring/mod.rs index 4b96017..783ce86 100644 --- a/fusio/src/local/tokio_uring/mod.rs +++ b/fusio/src/local/tokio_uring/mod.rs @@ -3,7 +3,7 @@ pub mod fs; use tokio_uring::fs::File; -use crate::{Error, IoBuf, IoBufMut, Read, Write}; +use crate::{Error, IoBuf, IoBufMut, Read, Seek, Write}; #[repr(transparent)] struct TokioUringBuf { @@ -35,43 +35,80 @@ where self.buf.as_mut_ptr() } - unsafe fn set_init(&mut self, pos: usize) { - IoBufMut::set_init(&mut self.buf, pos) + unsafe fn set_init(&mut self, _pos: usize) {} +} + +pub struct TokioUringFile { + file: Option, + pos: u64, +} + +impl From for TokioUringFile { + fn from(file: File) -> Self { + Self { + file: Some(file), + pos: 0, + } } } -impl Write for File { - async fn write(&mut self, buf: B, pos: u64) -> (Result, B) { - let (result, buf) = self.write_at(TokioUringBuf { buf }, pos).submit().await; +impl Write for TokioUringFile { + async fn write_all(&mut self, buf: B) -> (Result<(), Error>, B) { + let (result, buf) = self + .file + .as_ref() + .expect("read file after closed") + .write_all_at(TokioUringBuf { buf }, self.pos) + .await; + self.pos += buf.buf.bytes_init() as u64; (result.map_err(Error::from), buf.buf) } async fn sync_data(&self) -> Result<(), Error> { - File::sync_data(self).await?; + File::sync_data(self.file.as_ref().expect("read file after closed")).await?; Ok(()) } async fn sync_all(&self) -> Result<(), Error> { - File::sync_all(self).await?; + File::sync_all(self.file.as_ref().expect("read file after closed")).await?; Ok(()) } - async fn close(self) -> Result<(), Error> { - File::close(self).await?; + async fn close(&mut self) -> Result<(), Error> { + File::close(self.file.take().expect("close file twice")).await?; Ok(()) } } -impl Read for File { - async fn read(&mut self, pos: u64, len: Option) -> Result { - let buf = vec![0; len.unwrap_or(0) as usize]; - - let (result, buf) = self.read_at(TokioUringBuf { buf }, pos).await; +impl Read for TokioUringFile { + async fn read_exact(&mut self, buf: B) -> Result { + let (result, buf) = self + .file + .as_ref() + .expect("read file after closed") + .read_exact_at(TokioUringBuf { buf }, self.pos) + .await; result?; + self.pos += buf.buf.bytes_init() as u64; + + Ok(buf.buf) + } + + async fn size(&self) -> Result { + let stat = self + .file + .as_ref() + .expect("read file after closed") + .statx() + .await?; + Ok(stat.stx_size) + } +} + +impl Seek for TokioUringFile { + async fn seek(&mut self, pos: u64) -> Result<(), Error> { + self.pos = pos; - #[cfg(not(feature = "bytes"))] - return Ok(buf.buf); - #[cfg(feature = "bytes")] - return Ok(bytes::Bytes::from(buf.buf)); + Ok(()) } }