Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

O_DIRECT attempt 2! #92

Merged
merged 10 commits into from
Mar 12, 2024
7 changes: 2 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,10 @@ harness = false

[profile.bench]
debug = true # Enable debuginfo when profiling with cargo flamegraph.
panic = 'abort' # Exit the whole process if any thread panics.

[profile.release]
panic = 'abort'
panic = 'abort' # Exit the whole process if any thread panics.

[profile.dev]
panic = 'abort'
panic = 'abort' # Exit the whole process if any thread panics.

[profile.test]
panic = 'abort'
6 changes: 3 additions & 3 deletions benches/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
use tokio::runtime::Runtime;

const FILE_SIZE_BYTES: usize = 262_144;
const DATA_PATH: &str = "/tmp/fio/";
const DATA_PATH: &str = "/mnt/t700-2tb/fio/";
const RANGE: Range<isize> = 0..(1024 * 16);

async fn uring_get(filenames: &Vec<ObjectStorePath>, n_iterations: u64) -> Duration {
Expand All @@ -27,7 +27,7 @@ async fn uring_get(filenames: &Vec<ObjectStorePath>, n_iterations: u64) -> Durat
}
for f in futures {
let b = f.await.expect("At least one Result was an Error");
assert_eq!(b.len(), FILE_SIZE_BYTES);
assert_eq!(b.as_slice().len(), FILE_SIZE_BYTES);
}
total_time += start_of_iter.elapsed();
}
Expand All @@ -49,7 +49,7 @@ async fn uring_get_range(filenames: &Vec<ObjectStorePath>, n_iterations: u64) ->
}
for f in futures {
let b = f.await.expect("At least one Result was an Error");
assert_eq!(b.len(), RANGE.len());
assert_eq!(b.as_slice().len(), RANGE.len());
}
total_time += start_of_iter.elapsed();
}
Expand Down
4 changes: 2 additions & 2 deletions flamegraph.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
109 changes: 109 additions & 0 deletions src/aligned_buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use core::slice;
use std::alloc;

/// A memory buffer allocated on the heap, where the start position and end position are both
/// aligned to `align` bytes. This is useful for working with O_DIRECT file IO, where the
/// filesystem will often expect the buffer to be aligned to the logical block size (typically 512
/// bytes).
///
/// The buffer distinguishes between the bytes the user asked for (`len` bytes, beginning at
/// `start_offset`) and the bytes that are actually allocated (`aligned_len()` bytes,
/// beginning at `aligned_start_offset()`). [`AlignedBuffer::as_slice`] exposes only the
/// user-requested bytes.
#[derive(Debug)]
pub struct AlignedBuffer {
    /// Pointer to the first byte of the allocation (aligned to `layout.align()`).
    buf: *mut u8,
    /// The number of bytes requested by the user.
    len: usize,
    /// The offset requested by the user. Only `start_offset % align` bytes are unused at the
    /// start of the allocation; the remainder is accounted for by `aligned_start_offset()`.
    start_offset: usize,
    /// `layout.size()` gives the number of bytes _actually_ allocated,
    /// which will be a multiple of `align`.
    layout: alloc::Layout,
}

// SAFETY: `AlignedBuffer` owns its allocation: `buf` is created in `new`, freed exactly once in
// `Drop`, and the type is not `Clone`, so moving it to another thread moves sole ownership.
unsafe impl Send for AlignedBuffer {}

impl AlignedBuffer {
    /// Allocates a buffer able to hold `len` user bytes that begin at `start_offset`, with the
    /// start and end of the allocation aligned to `align`.
    ///
    /// # Panics
    ///
    /// Panics if `len` is zero, if `align` is not a non-zero power of two, or if the resulting
    /// layout is invalid. Aborts via `handle_alloc_error` if the allocation fails.
    pub(crate) fn new(len: usize, align: usize, start_offset: usize) -> Self {
        assert_ne!(len, 0);
        // Checked up front so that `align == 0` gives a clear message rather than a
        // remainder-by-zero panic in the expression below.
        assert!(
            align.is_power_of_two(),
            "align must be a non-zero power of two"
        );
        // Let's say the user requests a buffer of len 3 and offset 2; and align is 4:
        //          index:    0 1 2 3 4 5 6 7
        // aligned blocks:   |------|------|
        //      requested:       |---|
        // In this case, we need to allocate 8 bytes because we need to move the start
        // backwards to the first byte, and move the end forwards to the eighth byte.
        let layout = alloc::Layout::from_size_align(len + (start_offset % align), align)
            .expect("failed to create Layout!")
            .pad_to_align();
        // SAFETY: `layout` has a non-zero size because `len != 0`.
        let buf = unsafe { alloc::alloc(layout) };
        if buf.is_null() {
            alloc::handle_alloc_error(layout);
        }
        Self {
            buf,
            len,
            start_offset,
            layout,
        }
    }

    /// Returns `start_offset` rounded down to the nearest multiple of the alignment.
    pub(crate) fn aligned_start_offset(&self) -> usize {
        (self.start_offset / self.layout.align()) * self.layout.align()
    }

    /// Returns the number of bytes actually allocated (a multiple of the alignment).
    pub(crate) const fn aligned_len(&self) -> usize {
        self.layout.size()
    }

    /// Returns a raw pointer to the first byte of the allocation. Note that this is the
    /// aligned start of the buffer, not the position of the first user-requested byte.
    pub(crate) fn as_ptr(&mut self) -> *mut u8 {
        self.buf
    }

    /// Returns the `len` bytes that the user requested, skipping the padding bytes at the
    /// start of the allocation.
    ///
    /// The allocation is not zeroed by `new`, so the contents are only meaningful after the
    /// buffer has been filled through the pointer returned by `as_ptr`.
    pub fn as_slice(&self) -> &[u8] {
        // The allocation begins at `aligned_start_offset()`, so the first requested byte sits
        // `start_offset % align` bytes into the buffer, NOT `start_offset` bytes.
        let padding_at_start = self.start_offset % self.layout.align();
        // SAFETY: `new` allocated at least `len + (start_offset % align)` bytes starting at
        // `buf`, so `buf + padding_at_start .. buf + padding_at_start + len` is in bounds.
        unsafe { slice::from_raw_parts(self.buf.add(padding_at_start), self.len) }
    }
}

impl Drop for AlignedBuffer {
    /// Frees the allocation made in `new`.
    fn drop(&mut self) {
        // SAFETY: `buf` was returned by `alloc::alloc(self.layout)` in `new`, is never
        // reassigned, and is freed only here.
        unsafe { alloc::dealloc(self.buf, self.layout) };
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Writes the byte sequence `0, 1, 2, ...` into the first `count` bytes of `target`
    /// through its raw pointer.
    fn fill_with_indices(target: &mut AlignedBuffer, count: usize) {
        let base = target.as_ptr();
        for idx in 0..count {
            // SAFETY: callers pass a `count` no larger than the buffer's allocated size.
            unsafe { base.add(idx).write(idx as u8) };
        }
    }

    /// Bytes written through `as_ptr` must be read back unchanged through `as_slice`.
    #[test]
    fn test_write_and_read() {
        const LEN: usize = 16;

        // Two independent buffers, both with no padding at the start.
        let mut first = AlignedBuffer::new(LEN, 8, 0);
        let mut second = AlignedBuffer::new(LEN, 8, 0);

        // Populate both buffers.
        fill_with_indices(&mut first, LEN);
        fill_with_indices(&mut second, LEN);

        // Check every byte of both buffers.
        let first_view = first.as_slice();
        let second_view = second.as_slice();
        for idx in 0..LEN {
            let expected_byte = idx as u8;
            assert_eq!(first_view[idx], expected_byte);
            assert_eq!(second_view[idx], expected_byte);
        }

        // Compare the first buffer against the full expected sequence in one go.
        let expected: Vec<u8> = (0..LEN as u8).collect();
        assert_eq!(first_view, expected.as_slice());
    }
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod aligned_buffer;
pub mod object_store_adapter;
mod operation;
mod tracker;
Expand Down
23 changes: 13 additions & 10 deletions src/object_store_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use bytes::Bytes;
use object_store::path::Path as ObjectStorePath;
use snafu::{ensure, Snafu};
use std::ffi::CString;
Expand All @@ -13,6 +12,7 @@ use std::thread;
use tokio::sync::oneshot;
use url::Url;

use crate::aligned_buffer::AlignedBuffer;
use crate::operation::{Operation, OperationOutput};
use crate::uring;

Expand Down Expand Up @@ -180,7 +180,7 @@ impl ObjectStoreAdapter {
pub fn get(
&self,
location: &ObjectStorePath,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Bytes>> + Send + Sync>> {
) -> Pin<Box<dyn Future<Output = anyhow::Result<AlignedBuffer>> + Send + Sync>> {
let path = self.config.path_to_filesystem(location).unwrap();
let path = CString::new(path.as_os_str().as_bytes())
.expect("Failed to convert path '{path}' to CString.");
Expand All @@ -192,7 +192,7 @@ impl ObjectStoreAdapter {
Box::pin(async {
let out = rx.await.expect("Sender hung up!");
out.map(|out| match out {
OperationOutput::Get(buffer) => Bytes::from(buffer),
OperationOutput::Get(buffer) => buffer,
_ => panic!("out must be a Get variant!"),
})
})
Expand All @@ -202,7 +202,7 @@ impl ObjectStoreAdapter {
&self,
location: &ObjectStorePath,
range: Range<isize>,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Bytes>> + Send + Sync>> {
) -> Pin<Box<dyn Future<Output = anyhow::Result<AlignedBuffer>> + Send + Sync>> {
let path = self.config.path_to_filesystem(location).unwrap();
let path = CString::new(path.as_os_str().as_bytes())
.expect("Failed to convert path '{path}' to CString.");
Expand All @@ -214,7 +214,7 @@ impl ObjectStoreAdapter {
Box::pin(async {
let out = rx.await.expect("Sender hung up!");
out.map(|out| match out {
OperationOutput::GetRange(buffer) => Bytes::from(buffer),
OperationOutput::GetRange(buffer) => buffer,
_ => panic!("out must be a Get variant!"),
})
})
Expand Down Expand Up @@ -258,8 +258,8 @@ mod tests {
let result = future.await;
if i < 2 {
let b = result.unwrap();
println!("GET: Loaded {} bytes", b.len());
println!("GET: {:?}", std::str::from_utf8(&b[..]).unwrap());
println!("GET: Loaded {} bytes", b.as_slice().len());
println!("GET: {:?}", std::str::from_utf8(&b.as_slice()[..]).unwrap());
} else {
let err = result.unwrap_err();
dbg!(&err);
Expand Down Expand Up @@ -287,9 +287,12 @@ mod tests {
let result = future.await;
if i < 2 {
let b = result.unwrap();
println!("GET_RANGE: Loaded {} bytes", b.len());
assert_eq!(b.len(), 90);
println!("GET_RANGE: {:?}", std::str::from_utf8(&b[..]).unwrap());
println!("GET_RANGE: Loaded {} bytes", b.as_slice().len());
assert_eq!(b.as_slice().len(), 90);
println!(
"GET_RANGE: {:?}",
std::str::from_utf8(&b.as_slice()[..]).unwrap()
);
} else {
let err = result.unwrap_err();
dbg!(&err);
Expand Down
7 changes: 4 additions & 3 deletions src/operation.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::aligned_buffer::AlignedBuffer;
/// `Operation`s are used to communicate the user's instructions
/// to the backend. The intention is that there will be
/// one `Operation` variant per `ObjectStore` method.
Expand Down Expand Up @@ -26,8 +27,8 @@ pub enum Operation {

#[derive(Debug)]
pub enum OperationOutput {
Get(Vec<u8>),
GetRange(Vec<u8>),
Get(AlignedBuffer),
GetRange(AlignedBuffer),
#[allow(dead_code)] // TODO: Remove this `allow` when we implement GetRange!
GetRanges(Vec<Vec<u8>>),
GetRanges(Vec<AlignedBuffer>),
}
47 changes: 27 additions & 20 deletions src/uring/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use std::ffi::CString;
use std::ops::Range;
use tokio::sync::oneshot;

use crate::operation;
use crate::{aligned_buffer::AlignedBuffer, operation};

const ALIGN: usize = 512;

pub(super) trait Operation {
fn process_cqe(&mut self, cqe: cqueue::Entry);
Expand Down Expand Up @@ -79,12 +81,23 @@ impl InnerState {
}

pub(super) fn send_error(&mut self, error: anyhow::Error) {
let error = error.context(format!("IoUringUserOp = {self:?}"));

if self.error_has_occurred {
eprintln!("The output_channel has already been consumed (probably by sending a previous error)! But a new error has been reported: {error}");
eprintln!("The output_channel has already been consumed (probably by sending a previous error)! But a new error has been reported:");
for cause in error.chain() {
eprintln!("{cause}");
}
return;
}

let error = error.context(format!("IoUringUserOp = {self:?}"));
if self.output_channel.is_none() {
eprintln!("The output_channel has already been consumed, but `error_has_occurred` is false. The `output_channel` was probably consumed by sending a valid output back to the user. The new error is:");
for cause in error.chain() {
eprintln!("{cause}");
}
return;
}

self.output_channel
.take()
Expand All @@ -95,7 +108,7 @@ impl InnerState {

pub(super) fn cqe_error_to_anyhow_error(&self) -> anyhow::Error {
let cqe = self.last_cqe.as_ref().unwrap();
let nix_err = nix::Error::from_i32(-cqe.result());
let nix_err = nix::Error::from_raw(-cqe.result());
anyhow::Error::new(nix_err).context(format!(
"{nix_err} (reported by io_uring completion queue entry (CQE) for opcode = {}, opname = {})",
self.last_opcode.unwrap(), opcode_to_opname(self.last_opcode.unwrap())
Expand Down Expand Up @@ -137,7 +150,7 @@ pub(super) fn build_openat_sqe(path: &CString, index_of_op: usize) -> Vec<squeue
path_ptr,
)
.file_index(Some(file_index))
.flags(libc::O_RDONLY) // | libc::O_DIRECT) // TODO: Re-enable O_DIRECT.
.flags(libc::O_RDONLY | libc::O_DIRECT)
.build()
.user_data(index_of_op | (opcode::OpenAt::CODE as u64));

Expand All @@ -158,17 +171,15 @@ pub(super) fn create_linked_read_close_sqes(
let filesize_bytes = get_filesize_bytes(path.as_c_str());

// Allocate vector:
let mut buffer = Vec::with_capacity(filesize_bytes as _);
let mut buffer = AlignedBuffer::new(filesize_bytes as _, ALIGN, 0);

// Prepare the "read" opcode:
let read_op = opcode::Read::new(*fixed_fd, buffer.as_mut_ptr(), filesize_bytes as u32)
let read_op = opcode::Read::new(*fixed_fd, buffer.as_ptr(), buffer.aligned_len() as u32)
.build()
.user_data(index_of_op | (opcode::Read::CODE as u64))
.flags(squeue::Flags::IO_LINK);

unsafe {
buffer.set_len(filesize_bytes as _);
}
.flags(squeue::Flags::IO_HARDLINK); // We need a _hard_ link because read will fail if we read
// beyond the end of the file, which is very likely to happen when we're using O_DIRECT.
// When using O_DIRECT, the read length has to be a multiple of ALIGN.

// Prepare the "close" opcode:
let close_op = opcode::Close::new(*fixed_fd)
Expand All @@ -192,18 +203,14 @@ pub(super) fn create_linked_read_range_close_sqes(
let index_of_op: u64 = (index_of_op as u64) << 32;

// Allocate vector:
let mut buffer = Vec::with_capacity(range.len());
let mut buffer = AlignedBuffer::new(range.len(), ALIGN, range.start.try_into().unwrap());

// Prepare the "read" opcode:
let read_op = opcode::Read::new(*fixed_fd, buffer.as_mut_ptr(), range.len() as u32)
.offset(range.start as _)
let read_op = opcode::Read::new(*fixed_fd, buffer.as_ptr(), buffer.aligned_len() as u32)
.offset(buffer.aligned_start_offset() as _)
.build()
.user_data(index_of_op | (opcode::Read::CODE as u64))
.flags(squeue::Flags::IO_LINK);

unsafe {
buffer.set_len(range.len());
}
.flags(squeue::Flags::IO_HARDLINK);

// Prepare the "close" opcode:
let close_op = opcode::Close::new(*fixed_fd)
Expand Down