Skip to content

Commit

Permalink
feat: update Read & Write for tokio-uring impl
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Sep 30, 2024
1 parent 5682847 commit 560f89b
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 39 deletions.
37 changes: 37 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,43 @@ jobs:
command: test
args: --package fusio --features "monoio, futures"

tokio_uring_check:
name: Rust project check on tokio_uring
runs-on: ${{ matrix.os }}
strategy:
matrix:
os:
- ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install latest
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
components: rustfmt, clippy

# `cargo check` command here will use installed `nightly`
# as it is set as an "override" for current directory

- name: Run cargo clippy on tokio-uring
uses: actions-rs/cargo@v1
with:
command: check
args: --package fusio --features "tokio-uring, futures"

- name: Run cargo build on tokio-uring
uses: actions-rs/cargo@v1
with:
command: build
args: --package fusio --features "tokio-uring, futures"

- name: Run cargo test on tokio-uring
uses: actions-rs/cargo@v1
with:
command: test
args: --package fusio --features "tokio-uring, futures"

# 2
fmt:
name: Rust fmt
Expand Down
29 changes: 23 additions & 6 deletions fusio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,24 @@ mod tests {
writer.sync_data().await.unwrap();

let mut reader = CountRead::new(read);
reader.seek(0).await.unwrap();
{
reader.seek(0).await.unwrap();

let mut buf = vec![];
buf = reader.read_to_end(buf).await.unwrap();

assert_eq!(buf.bytes_init(), 4);
assert_eq!(buf.as_slice(), &[2, 0, 2, 4]);
}
{
reader.seek(2).await.unwrap();

let mut buf = vec![];
buf = reader.read_to_end(buf).await.unwrap();
let mut buf = vec![];
buf = reader.read_to_end(buf).await.unwrap();

assert_eq!(buf.bytes_init(), 4);
assert_eq!(buf.as_slice(), &[2, 0, 2, 4]);
assert_eq!(buf.bytes_init(), 2);
assert_eq!(buf.as_slice(), &[2, 4]);
}
}

#[cfg(feature = "futures")]
Expand Down Expand Up @@ -418,11 +429,17 @@ mod tests {
use tempfile::tempfile;
use tokio_uring::fs::File;

use crate::local::tokio_uring::TokioUringFile;

tokio_uring::start(async {
let read = tempfile().unwrap();
let write = read.try_clone().unwrap();

write_and_read(File::from_std(write), File::from_std(read)).await;
write_and_read(
TokioUringFile::from(File::from_std(write)),
TokioUringFile::from(File::from_std(read)),
)
.await;
});
}
}
4 changes: 3 additions & 1 deletion fusio/src/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub(crate) mod monoio;
#[cfg(feature = "tokio")]
pub(crate) mod tokio;
#[cfg(all(feature = "tokio-uring", target_os = "linux"))]
mod tokio_uring;
pub(crate) mod tokio_uring;

#[cfg(all(feature = "monoio", feature = "fs"))]
#[allow(unused)]
Expand All @@ -21,5 +21,7 @@ cfg_if::cfg_if! {
pub type LocalFs = TokioFs;
} else if #[cfg(feature = "monoio")] {
pub type LocalFs = MonoIoFs;
} else if #[cfg(feature = "tokio-uring")] {
pub type LocalFs = TokioUringFs;
}
}
55 changes: 42 additions & 13 deletions fusio/src/local/tokio_uring/fs.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,62 @@
use std::{io, path::Path};

use async_stream::stream;
use futures_core::Stream;
use tokio_uring::fs::{remove_file, File};
use tokio_uring::fs::{create_dir_all, remove_file};

use crate::fs::{FileMeta, Fs};
use crate::{
fs::{FileMeta, Fs, OpenOptions, WriteMode},
local::tokio_uring::TokioUringFile,
path::{path_to_local, Path},
Error,
};

pub struct TokioUringFs;

impl Fs for TokioUringFs {
type File = File;
type File = TokioUringFile;

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
let local_path = path_to_local(path)?;

let file = tokio_uring::fs::OpenOptions::new()
.read(options.read)
.write(options.write.is_some())
.create(options.create)
.append(options.write == Some(WriteMode::Append))
.truncate(options.write == Some(WriteMode::Overwrite))
.open(&local_path)
.await?;

async fn open(&self, path: impl AsRef<Path>) -> io::Result<Self::File> {
File::open(path).await
Ok(TokioUringFile {
file: Some(file),
pos: 0,
})
}

async fn create_dir_all(path: &Path) -> Result<(), Error> {
let path = path_to_local(path)?;
create_dir_all(path).await?;

Ok(())
}

async fn list(
&self,
path: impl AsRef<Path>,
) -> io::Result<impl Stream<Item = io::Result<FileMeta>>> {
let dir = path.as_ref().read_dir()?;
path: &Path,
) -> Result<impl Stream<Item = Result<FileMeta, Error>>, Error> {
let path = path_to_local(path)?;
let dir = path.read_dir()?;

Ok(stream! {
for entry in dir {
yield Ok(crate::fs::FileMeta { path: entry?.path() });
let entry = entry?;
yield Ok(FileMeta { path: Path::from_filesystem_path(entry.path())?, size: entry.metadata()?.len() });
}
})
}

async fn remove(&self, path: impl AsRef<Path>) -> io::Result<()> {
remove_file(path).await
async fn remove(&self, path: &Path) -> Result<(), Error> {
let path = path_to_local(path)?;

Ok(remove_file(path).await?)
}
}
75 changes: 56 additions & 19 deletions fusio/src/local/tokio_uring/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub mod fs;

use tokio_uring::fs::File;

use crate::{Error, IoBuf, IoBufMut, Read, Write};
use crate::{Error, IoBuf, IoBufMut, Read, Seek, Write};

#[repr(transparent)]
struct TokioUringBuf<B> {
Expand Down Expand Up @@ -35,43 +35,80 @@ where
self.buf.as_mut_ptr()
}

unsafe fn set_init(&mut self, pos: usize) {
IoBufMut::set_init(&mut self.buf, pos)
unsafe fn set_init(&mut self, _pos: usize) {}
}

pub struct TokioUringFile {
file: Option<File>,
pos: u64,
}

impl From<File> for TokioUringFile {
fn from(file: File) -> Self {
Self {
file: Some(file),
pos: 0,
}
}
}

impl Write for File {
async fn write<B: IoBuf>(&mut self, buf: B, pos: u64) -> (Result<usize, Error>, B) {
let (result, buf) = self.write_at(TokioUringBuf { buf }, pos).submit().await;
impl Write for TokioUringFile {
async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
let (result, buf) = self
.file
.as_ref()
.expect("read file after closed")
.write_all_at(TokioUringBuf { buf }, self.pos)
.await;
self.pos += buf.buf.bytes_init() as u64;
(result.map_err(Error::from), buf.buf)
}

async fn sync_data(&self) -> Result<(), Error> {
File::sync_data(self).await?;
File::sync_data(self.file.as_ref().expect("read file after closed")).await?;
Ok(())
}

async fn sync_all(&self) -> Result<(), Error> {
File::sync_all(self).await?;
File::sync_all(self.file.as_ref().expect("read file after closed")).await?;
Ok(())
}

async fn close(self) -> Result<(), Error> {
File::close(self).await?;
async fn close(&mut self) -> Result<(), Error> {
File::close(self.file.take().expect("close file twice")).await?;
Ok(())
}
}

impl Read for File {
async fn read(&mut self, pos: u64, len: Option<u64>) -> Result<impl IoBuf, Error> {
let buf = vec![0; len.unwrap_or(0) as usize];

let (result, buf) = self.read_at(TokioUringBuf { buf }, pos).await;
impl Read for TokioUringFile {
async fn read_exact<B: IoBufMut>(&mut self, buf: B) -> Result<B, Error> {
let (result, buf) = self
.file
.as_ref()
.expect("read file after closed")
.read_exact_at(TokioUringBuf { buf }, self.pos)
.await;
result?;
self.pos += buf.buf.bytes_init() as u64;

Ok(buf.buf)
}

async fn size(&self) -> Result<u64, Error> {
let stat = self
.file
.as_ref()
.expect("read file after closed")
.statx()
.await?;
Ok(stat.stx_size)
}
}

impl Seek for TokioUringFile {
async fn seek(&mut self, pos: u64) -> Result<(), Error> {
self.pos = pos;

#[cfg(not(feature = "bytes"))]
return Ok(buf.buf);
#[cfg(feature = "bytes")]
return Ok(bytes::Bytes::from(buf.buf));
Ok(())
}
}

0 comments on commit 560f89b

Please sign in to comment.