Skip to content

Commit

Permalink
Merge pull request #92 from JackKelly/O_DIRECT_attempt_2
Browse files Browse the repository at this point in the history
O_DIRECT attempt 2!
  • Loading branch information
JackKelly authored Mar 12, 2024
2 parents d303b00 + 42de105 commit 8e1553d
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 43 deletions.
7 changes: 2 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,10 @@ harness = false

[profile.bench]
debug = true # Enable debuginfo when profiling with cargo flamegraph.
panic = 'abort' # Exit the whole process if any thread panics.

[profile.release]
panic = 'abort'
panic = 'abort' # Exit the whole process if any thread panics.

[profile.dev]
panic = 'abort'
panic = 'abort' # Exit the whole process if any thread panics.

[profile.test]
panic = 'abort'
6 changes: 3 additions & 3 deletions benches/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
use tokio::runtime::Runtime;

const FILE_SIZE_BYTES: usize = 262_144;
const DATA_PATH: &str = "/tmp/fio/";
const DATA_PATH: &str = "/mnt/t700-2tb/fio/";
const RANGE: Range<isize> = 0..(1024 * 16);

async fn uring_get(filenames: &Vec<ObjectStorePath>, n_iterations: u64) -> Duration {
Expand All @@ -27,7 +27,7 @@ async fn uring_get(filenames: &Vec<ObjectStorePath>, n_iterations: u64) -> Durat
}
for f in futures {
let b = f.await.expect("At least one Result was an Error");
assert_eq!(b.len(), FILE_SIZE_BYTES);
assert_eq!(b.as_slice().len(), FILE_SIZE_BYTES);
}
total_time += start_of_iter.elapsed();
}
Expand All @@ -49,7 +49,7 @@ async fn uring_get_range(filenames: &Vec<ObjectStorePath>, n_iterations: u64) ->
}
for f in futures {
let b = f.await.expect("At least one Result was an Error");
assert_eq!(b.len(), RANGE.len());
assert_eq!(b.as_slice().len(), RANGE.len());
}
total_time += start_of_iter.elapsed();
}
Expand Down
4 changes: 2 additions & 2 deletions flamegraph.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
109 changes: 109 additions & 0 deletions src/aligned_buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use core::slice;
use std::alloc;

/// A memory buffer allocated on the heap, where the start position and end position are both
/// aligned to `align` bytes. This is useful for working with O_DIRECT file IO, where the
/// filesystem will often expect the buffer to be aligned to the logical block size (typically 512
/// bytes).
#[derive(Debug)]
pub struct AlignedBuffer {
buf: *mut u8,
len: usize, // The number of bytes requested by the user.
start_offset: usize, // The number of bytes unused at the start of the buffer.
layout: alloc::Layout, // `layout.size()` gives the number of bytes _actually_ allocated,
// which will be a multiple of `align`.
}

unsafe impl Send for AlignedBuffer {}

impl AlignedBuffer {
/// Aligns the start and end of the buffer with `align`.
/// 'align' must not be zero, and must be a power of two.
pub(crate) fn new(len: usize, align: usize, start_offset: usize) -> Self {
assert_ne!(len, 0);
// Let's say the user requests a buffer of len 3 and offset 2; and align is 4:
// index: 0 1 2 3 4 5 6 7
// aligned blocks: |------|------|
// requested: |---|
// In this case, we need to allocate 8 bytes becuase we need to move the start
// backwards to the first byte, and move the end forwards to the eighth byte.
let layout = alloc::Layout::from_size_align(len + (start_offset % align), align)
.expect("failed to create Layout!")
.pad_to_align();
let buf = unsafe { alloc::alloc(layout) };
if buf.is_null() {
alloc::handle_alloc_error(layout);
}
Self {
buf,
len,
start_offset,
layout,
}
}

pub(crate) fn aligned_start_offset(&self) -> usize {
(self.start_offset / self.layout.align()) * self.layout.align()
}

pub(crate) const fn aligned_len(&self) -> usize {
self.layout.size()
}

pub(crate) fn as_ptr(&mut self) -> *mut u8 {
self.buf
}

pub fn as_slice(&self) -> &[u8] {
unsafe {
slice::from_raw_parts(
self.buf.offset(self.start_offset.try_into().unwrap()),
self.len,
)
}
}
}

impl Drop for AlignedBuffer {
fn drop(&mut self) {
unsafe { alloc::dealloc(self.buf, self.layout) };
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_write_and_read() {
// Create a new buffer:
const LEN: usize = 16;
let mut aligned_buf1 = AlignedBuffer::new(LEN, 8, 0);
let mut aligned_buf2 = AlignedBuffer::new(LEN, 8, 0);

// Set the values of the buffer:
{
let ptr1 = aligned_buf1.as_ptr();
let ptr2 = aligned_buf2.as_ptr();
unsafe {
for i in 0..LEN {
*ptr1.offset(i as _) = i as u8;
*ptr2.offset(i as _) = i as u8;
}
}
}
// Read the values back out:
{
let slice1 = aligned_buf1.as_slice();
let slice2 = aligned_buf2.as_slice();
for i in 0..LEN {
assert_eq!(slice1[i], i as u8);
assert_eq!(slice2[i], i as u8);
}
assert_eq!(
slice1,
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
);
}
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod aligned_buffer;
pub mod object_store_adapter;
mod operation;
mod tracker;
Expand Down
23 changes: 13 additions & 10 deletions src/object_store_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use bytes::Bytes;
use object_store::path::Path as ObjectStorePath;
use snafu::{ensure, Snafu};
use std::ffi::CString;
Expand All @@ -13,6 +12,7 @@ use std::thread;
use tokio::sync::oneshot;
use url::Url;

use crate::aligned_buffer::AlignedBuffer;
use crate::operation::{Operation, OperationOutput};
use crate::uring;

Expand Down Expand Up @@ -180,7 +180,7 @@ impl ObjectStoreAdapter {
pub fn get(
&self,
location: &ObjectStorePath,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Bytes>> + Send + Sync>> {
) -> Pin<Box<dyn Future<Output = anyhow::Result<AlignedBuffer>> + Send + Sync>> {
let path = self.config.path_to_filesystem(location).unwrap();
let path = CString::new(path.as_os_str().as_bytes())
.expect("Failed to convert path '{path}' to CString.");
Expand All @@ -192,7 +192,7 @@ impl ObjectStoreAdapter {
Box::pin(async {
let out = rx.await.expect("Sender hung up!");
out.map(|out| match out {
OperationOutput::Get(buffer) => Bytes::from(buffer),
OperationOutput::Get(buffer) => buffer,
_ => panic!("out must be a Get variant!"),
})
})
Expand All @@ -202,7 +202,7 @@ impl ObjectStoreAdapter {
&self,
location: &ObjectStorePath,
range: Range<isize>,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Bytes>> + Send + Sync>> {
) -> Pin<Box<dyn Future<Output = anyhow::Result<AlignedBuffer>> + Send + Sync>> {
let path = self.config.path_to_filesystem(location).unwrap();
let path = CString::new(path.as_os_str().as_bytes())
.expect("Failed to convert path '{path}' to CString.");
Expand All @@ -214,7 +214,7 @@ impl ObjectStoreAdapter {
Box::pin(async {
let out = rx.await.expect("Sender hung up!");
out.map(|out| match out {
OperationOutput::GetRange(buffer) => Bytes::from(buffer),
OperationOutput::GetRange(buffer) => buffer,
_ => panic!("out must be a Get variant!"),
})
})
Expand Down Expand Up @@ -258,8 +258,8 @@ mod tests {
let result = future.await;
if i < 2 {
let b = result.unwrap();
println!("GET: Loaded {} bytes", b.len());
println!("GET: {:?}", std::str::from_utf8(&b[..]).unwrap());
println!("GET: Loaded {} bytes", b.as_slice().len());
println!("GET: {:?}", std::str::from_utf8(&b.as_slice()[..]).unwrap());
} else {
let err = result.unwrap_err();
dbg!(&err);
Expand Down Expand Up @@ -287,9 +287,12 @@ mod tests {
let result = future.await;
if i < 2 {
let b = result.unwrap();
println!("GET_RANGE: Loaded {} bytes", b.len());
assert_eq!(b.len(), 90);
println!("GET_RANGE: {:?}", std::str::from_utf8(&b[..]).unwrap());
println!("GET_RANGE: Loaded {} bytes", b.as_slice().len());
assert_eq!(b.as_slice().len(), 90);
println!(
"GET_RANGE: {:?}",
std::str::from_utf8(&b.as_slice()[..]).unwrap()
);
} else {
let err = result.unwrap_err();
dbg!(&err);
Expand Down
7 changes: 4 additions & 3 deletions src/operation.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::aligned_buffer::AlignedBuffer;
/// `Operation`s are used to communicate the user's instructions
/// to the backend. The intention is that there will be
/// one `Operation` variant per `ObjectStore` method.
Expand Down Expand Up @@ -26,8 +27,8 @@ pub enum Operation {

#[derive(Debug)]
pub enum OperationOutput {
Get(Vec<u8>),
GetRange(Vec<u8>),
Get(AlignedBuffer),
GetRange(AlignedBuffer),
#[allow(dead_code)] // TODO: Remove this `allow` when we implement GetRange!
GetRanges(Vec<Vec<u8>>),
GetRanges(Vec<AlignedBuffer>),
}
47 changes: 27 additions & 20 deletions src/uring/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use std::ffi::CString;
use std::ops::Range;
use tokio::sync::oneshot;

use crate::operation;
use crate::{aligned_buffer::AlignedBuffer, operation};

const ALIGN: usize = 512;

pub(super) trait Operation {
fn process_cqe(&mut self, cqe: cqueue::Entry);
Expand Down Expand Up @@ -79,12 +81,23 @@ impl InnerState {
}

pub(super) fn send_error(&mut self, error: anyhow::Error) {
let error = error.context(format!("IoUringUserOp = {self:?}"));

if self.error_has_occurred {
eprintln!("The output_channel has already been consumed (probably by sending a previous error)! But a new error has been reported: {error}");
eprintln!("The output_channel has already been consumed (probably by sending a previous error)! But a new error has been reported:");
for cause in error.chain() {
eprintln!("{cause}");
}
return;
}

let error = error.context(format!("IoUringUserOp = {self:?}"));
if self.output_channel.is_none() {
eprintln!("The output_channel has already been consumed, but `error_has_occurred` is false. The `output_channel` was probably consumed by sending a valid output back to the user. The new error is:");
for cause in error.chain() {
eprintln!("{cause}");
}
return;
}

self.output_channel
.take()
Expand All @@ -95,7 +108,7 @@ impl InnerState {

pub(super) fn cqe_error_to_anyhow_error(&self) -> anyhow::Error {
let cqe = self.last_cqe.as_ref().unwrap();
let nix_err = nix::Error::from_i32(-cqe.result());
let nix_err = nix::Error::from_raw(-cqe.result());
anyhow::Error::new(nix_err).context(format!(
"{nix_err} (reported by io_uring completion queue entry (CQE) for opcode = {}, opname = {})",
self.last_opcode.unwrap(), opcode_to_opname(self.last_opcode.unwrap())
Expand Down Expand Up @@ -137,7 +150,7 @@ pub(super) fn build_openat_sqe(path: &CString, index_of_op: usize) -> Vec<squeue
path_ptr,
)
.file_index(Some(file_index))
.flags(libc::O_RDONLY) // | libc::O_DIRECT) // TODO: Re-enable O_DIRECT.
.flags(libc::O_RDONLY | libc::O_DIRECT)
.build()
.user_data(index_of_op | (opcode::OpenAt::CODE as u64));

Expand All @@ -158,17 +171,15 @@ pub(super) fn create_linked_read_close_sqes(
let filesize_bytes = get_filesize_bytes(path.as_c_str());

// Allocate vector:
let mut buffer = Vec::with_capacity(filesize_bytes as _);
let mut buffer = AlignedBuffer::new(filesize_bytes as _, ALIGN, 0);

// Prepare the "read" opcode:
let read_op = opcode::Read::new(*fixed_fd, buffer.as_mut_ptr(), filesize_bytes as u32)
let read_op = opcode::Read::new(*fixed_fd, buffer.as_ptr(), buffer.aligned_len() as u32)
.build()
.user_data(index_of_op | (opcode::Read::CODE as u64))
.flags(squeue::Flags::IO_LINK);

unsafe {
buffer.set_len(filesize_bytes as _);
}
.flags(squeue::Flags::IO_HARDLINK); // We need a _hard_ link because read will fail if we read
// beyond the end of the file, which is very likely to happen when we're using O_DIRECT.
// When using O_DIRECT, the read length has to be a multiple of ALIGN.

// Prepare the "close" opcode:
let close_op = opcode::Close::new(*fixed_fd)
Expand All @@ -192,18 +203,14 @@ pub(super) fn create_linked_read_range_close_sqes(
let index_of_op: u64 = (index_of_op as u64) << 32;

// Allocate vector:
let mut buffer = Vec::with_capacity(range.len());
let mut buffer = AlignedBuffer::new(range.len(), ALIGN, range.start.try_into().unwrap());

// Prepare the "read" opcode:
let read_op = opcode::Read::new(*fixed_fd, buffer.as_mut_ptr(), range.len() as u32)
.offset(range.start as _)
let read_op = opcode::Read::new(*fixed_fd, buffer.as_ptr(), buffer.aligned_len() as u32)
.offset(buffer.aligned_start_offset() as _)
.build()
.user_data(index_of_op | (opcode::Read::CODE as u64))
.flags(squeue::Flags::IO_LINK);

unsafe {
buffer.set_len(range.len());
}
.flags(squeue::Flags::IO_HARDLINK);

// Prepare the "close" opcode:
let close_op = opcode::Close::new(*fixed_fd)
Expand Down

0 comments on commit 8e1553d

Please sign in to comment.