Skip to content

Commit

Permalink
Merge #1549
Browse files Browse the repository at this point in the history
1549: Fixing I/O engine crashes r=dsavitskiy a=dsavitskiy



Co-authored-by: Dmitry Savitskiy <[email protected]>
  • Loading branch information
mayastor-bors and dsavitskiy committed Dec 5, 2023
2 parents 6c12c66 + bd8335d commit 033dc9d
Show file tree
Hide file tree
Showing 37 changed files with 471 additions and 130 deletions.
31 changes: 29 additions & 2 deletions ci.nix
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{ nospdk ? false, norust ? false, spdk_rel ? false }:
{ nospdk ? false, norust ? false, spdk_rel ? false, asan ? false }:
let
sources = import ./nix/sources.nix;
pkgs = import sources.nixpkgs {
Expand Down Expand Up @@ -61,7 +61,8 @@ mkShell {
xfsprogs
yasm
] ++ (if (nospdk) then [ spdk.buildInputs ] else [ spdk ])
++ pkgs.lib.optional (!norust) channel.stable
++ pkgs.lib.optional (!norust && asan) channel.asan
++ pkgs.lib.optional (!norust && !asan) channel.stable
++ pkgs.lib.optional (!norust) channel.nightly;

RUST_NIGHTLY_PATH = channel.nightly;
Expand All @@ -72,12 +73,38 @@ mkShell {
FIO_SPDK = if nospdk then null else "${spdk}/fio/spdk_nvme";
ETCD_BIN = "${etcd}/bin/etcd";

IO_ENGINE_DIR = if asan then "target/x86_64-unknown-linux-gnu/debug" else "target/debug";

# ASAN-related Cargo settings.
ASAN_ENABLE = if asan then "1" else null;
ASAN_OPTIONS = if asan then "detect_leaks=0" else null;
ASAN_BUILD_ENV = if asan then "shell" else null;
RUSTFLAGS = if asan then "-Zsanitizer=address" else null;
CARGO_BUILD_RUSTFLAGS = if asan then "-Zbuild-std" else null;
CARGO_BUILD_TARGET = if asan then "x86_64-unknown-linux-gnu" else null;
CARGO_PROFILE_DEV_PANIC = if asan then "unwind" else null;
RUST_BACKTRACE = if asan then "full" else null;

shellHook = ''
${pkgs.lib.optionalString (asan) "export LLVM_SYMBOLIZER_DIR=$(dirname $(realpath $(which llvm-symbolizer)))"}
${pkgs.lib.optionalString (asan) "echo 'AddressSanitizer is enabled, forcing nightly rustc.'"}
${pkgs.lib.optionalString (asan) "echo ' ASAN_ENABLE :' $\{ASAN_ENABLE\}"}
${pkgs.lib.optionalString (asan) "echo ' ASAN_OPTIONS :' $\{ASAN_OPTIONS\}"}
${pkgs.lib.optionalString (asan) "echo ' RUSTFLAGS :' $\{RUSTFLAGS\}"}
${pkgs.lib.optionalString (asan) "echo ' CARGO_BUILD_RUSTFLAGS :' $\{CARGO_BUILD_RUSTFLAGS\}"}
${pkgs.lib.optionalString (asan) "echo ' CARGO_BUILD_TARGET :' $\{CARGO_BUILD_TARGET\}"}
${pkgs.lib.optionalString (asan) "echo ' CARGO_PROFILE_DEV_PANIC :' $\{CARGO_PROFILE_DEV_PANIC\}"}
${pkgs.lib.optionalString (asan) "echo ' RUST_BACKTRACE :' $\{RUST_BACKTRACE\}"}
${pkgs.lib.optionalString (asan) "echo ' LLVM_SYMBOLIZER_DIR :' $\{LLVM_SYMBOLIZER_DIR\}"}
${pkgs.lib.optionalString (asan) "echo"}
${pkgs.lib.optionalString (!nospdk) "echo 'SPDK version :' $(echo $SPDK_PATH | sed 's/.*libspdk-//g')"}
${pkgs.lib.optionalString (!nospdk) "echo 'SPDK path :' $SPDK_PATH"}
${pkgs.lib.optionalString (!nospdk) "echo 'SPDK FIO plugin :' $FIO_SPDK"}
${pkgs.lib.optionalString (!norust) "echo 'Rust version :' $(rustc --version 2> /dev/null || echo '${norustc_msg}')"}
${pkgs.lib.optionalString (!norust) "echo 'Rust path :' $(which rustc 2> /dev/null || echo '${norustc_msg}')"}
echo 'I/O engine dir :' $IO_ENGINE_DIR
${pkgs.lib.optionalString (nospdk) "cowsay ${nospdk_moth}"}
${pkgs.lib.optionalString (nospdk) "export CFLAGS=-msse4"}
${pkgs.lib.optionalString (nospdk) "echo"}
Expand Down
17 changes: 16 additions & 1 deletion io-engine/src/bdev/nexus/nexus_bdev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -851,10 +851,11 @@ impl<'n> Nexus<'n> {
let name = self.name.clone();

// After calling unregister_bdev_async(), Nexus is gone.
let evt = self.event(EventAction::Delete);
match self.as_mut().bdev_mut().unregister_bdev_async().await {
Ok(_) => {
info!("Nexus '{name}': nexus destroyed ok");
self.event(EventAction::Delete).generate();
evt.generate();
Ok(())
}
Err(err) => {
Expand Down Expand Up @@ -1142,6 +1143,20 @@ impl<'n> Nexus<'n> {
unsafe { Pin::new_unchecked(self.bdev_mut()) }
}

/// Gets a nexus reference from an untyped bdev.
///
/// The inner pointer of `bdev` is cast and re-wrapped as a typed
/// `Bdev<Nexus>`, and a reference to that bdev's data is returned.
///
/// # Warning:
/// No checks are performed (e.g. bdev module name check), as it is assumed
/// that the provided bdev is a nexus bdev.
///
/// # Safety
/// The caller must guarantee that `bdev` really is a nexus bdev: the pointer
/// cast below is unchecked.
/// NOTE(review): the returned `&'n` lifetime is not tied to the `bdev`
/// argument by this signature; presumably the caller must also ensure the
/// nexus outlives the returned reference -- confirm.
#[inline(always)]
pub(crate) unsafe fn unsafe_from_untyped_bdev(
bdev: spdk_rs::UntypedBdev,
) -> &'n Nexus<'n> {
// Reinterpret the untyped inner pointer as a nexus-typed bdev pointer.
spdk_rs::Bdev::<Nexus<'n>>::unsafe_from_inner_ptr(
bdev.unsafe_inner_ptr() as *mut _,
)
.data()
}

/// Sets the required alignment of the Nexus.
pub(crate) unsafe fn set_required_alignment(
self: Pin<&mut Self>,
Expand Down
70 changes: 56 additions & 14 deletions io-engine/src/bdev/nexus/nexus_bdev_children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,10 @@ impl<'n> Nexus<'n> {
// Close and remove the child.
let res = match self.lookup_child(uri) {
Some(child) => {
// Remove child from the I/O path.
// Detach the child from the I/O path, and close its handles.
if let Some(device) = child.get_device_name() {
self.disconnect_device(&device).await;
self.detach_device(&device).await;
self.disconnect_all_detached_devices().await;
}

// Close child's device.
Expand Down Expand Up @@ -974,18 +975,39 @@ impl<'n> Nexus<'n> {
return Ok(());
}

// Disconnect the device from all the channels.
// Detach the device from all the channels.
//
// After disconnecting, the device will no longer be used by the
// channels, and all I/Os failing due to this device will eventually
// resubmit and succeed (if any healthy children are left).
self.disconnect_device(&dev).await;
//
// Device disconnection is done in two steps (detach, then disconnect)
// in order to prevent an I/O race when retiring a device.
self.detach_device(&dev).await;

// Disconnect the devices with failed controllers _before_ pause,
// otherwise the pause would get stuck. Keep all controllers that have
// _not_ failed (e.g., in the case I/O failed due to ENOSPC).
self.traverse_io_channels_async((), |channel, _| {
channel.disconnect_detached_devices(|h| h.is_ctrlr_failed());
})
.await;

// Destroy (close) the device. The subsystem must be paused to do this
// properly.
// Disconnect, destroy and close the device. The subsystem must be
// paused to do this properly.
{
debug!("{self:?}: retire: pausing...");
self.as_mut().pause().await?;
debug!("{self:?}: retire: pausing ok");
let res = self.as_mut().pause().await;
match &res {
Ok(_) => debug!("{self:?}: retire: pausing ok"),
Err(e) => warn!("{self:?}: retire: pausing failed: {e}"),
};

// Disconnect all the previously detached device handles. This has
// to be done after the pause to prevent an I/O race.
self.disconnect_all_detached_devices().await;

res?;

self.child_retire_destroy_device(&dev).await;

Expand Down Expand Up @@ -1055,20 +1077,39 @@ impl<'n> Nexus<'n> {
Ok(())
}

/// Disconnects a device from all I/O channels.
pub(crate) async fn disconnect_device(&self, dev: &str) {
/// Detaches the device's handles from all I/O channels.
///
/// The detached handles must be disconnected and dropped by a
/// `disconnect_detached_devices()` call.
///
/// Device disconnection is done in two steps (detach, then disconnect) in
/// order to prevent an I/O race when retiring a device.
pub(crate) async fn detach_device(&self, dev: &str) {
if !self.has_io_device {
return;
}

debug!("{self:?}: disconnecting '{dev}' from all channels ...");
debug!("{self:?}: detaching '{dev}' from all channels...");

self.traverse_io_channels_async(dev, |channel, dev| {
channel.disconnect_device(dev);
channel.detach_device(dev);
})
.await;

debug!("{self:?}: '{dev}' detached from all I/O channels");
}

/// Disconnects all the detached devices on all I/O channels by dropping
/// their handles.
pub(crate) async fn disconnect_all_detached_devices(&self) {
debug!("{self:?}: disconnecting all detached devices ...");

self.traverse_io_channels_async((), |channel, _| {
channel.disconnect_detached_devices(|_| true);
})
.await;

debug!("{self:?}: '{dev}' disconnected from all I/O channels");
debug!("{self:?}: disconnected all detached devices");
}

/// Destroys the device being retired.
Expand Down Expand Up @@ -1143,7 +1184,8 @@ impl<'n> Nexus<'n> {

// Step 1: Close I/O channels for all children.
for dev in nexus.child_devices() {
nexus.disconnect_device(&dev).await;
nexus.detach_device(&dev).await;
nexus.disconnect_all_detached_devices().await;

device_cmd_queue().enqueue(DeviceCommand::RetireDevice {
nexus_name: nexus.name.clone(),
Expand Down
57 changes: 50 additions & 7 deletions io-engine/src/bdev/nexus/nexus_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use spdk_rs::Thread;
pub struct NexusChannel<'n> {
writers: Vec<Box<dyn BlockDeviceHandle>>,
readers: Vec<Box<dyn BlockDeviceHandle>>,
detached: Vec<Box<dyn BlockDeviceHandle>>,
io_logs: Vec<IOLogChannel>,
previous_reader: UnsafeCell<usize>,
fail_fast: u32,
Expand Down Expand Up @@ -123,6 +124,7 @@ impl<'n> NexusChannel<'n> {
Self {
writers,
readers,
detached: Vec::new(),
io_logs: nexus.io_log_channels(),
previous_reader: UnsafeCell::new(0),
nexus: unsafe { nexus.pinned_mut() },
Expand Down Expand Up @@ -209,16 +211,57 @@ impl<'n> NexusChannel<'n> {
}
}

/// Disconnects a child device from the I/O path.
pub fn disconnect_device(&mut self, device_name: &str) {
/// Detaches a child device from this I/O channel, moving the device's
/// handles to the list of detached devices to disconnect later.
///
/// The detached handles must be disconnected and dropped by a
/// `disconnect_detached_devices()` call.
pub(super) fn detach_device(&mut self, device_name: &str) {
self.previous_reader = UnsafeCell::new(0);

self.readers
.retain(|c| c.get_device().device_name() != device_name);
self.writers
.retain(|c| c.get_device().device_name() != device_name);
if let Some(d) = self
.readers
.iter()
.position(|c| c.get_device().device_name() == device_name)
{
let t = self.readers.remove(d);
self.detached.push(t);
}

if let Some(d) = self
.writers
.iter()
.position(|c| c.get_device().device_name() == device_name)
{
let t = self.writers.remove(d);
self.detached.push(t);
}

debug!("{self:?}: device '{device_name}' detached");
}

/// Disconnects previously detached device handles by dropping them.
/// Devices to drop are filtered by the given predicate: true to drop
/// a device, false to keep it.
pub(super) fn disconnect_detached_devices<F>(&mut self, mut drop_pred: F)
where
F: FnMut(&dyn BlockDeviceHandle) -> bool,
{
let n = self.detached.len();
info!("{self:?}: disconnecting {n} detached device handles...");

self.detached.retain(|h| !drop_pred(h.as_ref()));

debug!("{self:?}: device '{device_name}' disconnected");
let m = self.detached.len();
if m == 0 {
info!("{self:?}: all detached device handles disconnected");
} else {
let d = n - m;
info!(
"{self:?}: {d} detached device handle(s) disconnected, \
{m} remain(s)"
);
}
}

/// Refreshing our channels simply means that we either have a child going
Expand Down
3 changes: 2 additions & 1 deletion io-engine/src/bdev/nexus/nexus_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,8 @@ impl<'n> NexusBio<'n> {
// set the IO as failed in the submission stage.
self.ctx_mut().failed += 1;

self.channel_mut().disconnect_device(&device);
self.channel_mut().detach_device(&device);
self.channel_mut().disconnect_detached_devices(|_| true);

if let Some(log) = self.fault_device(
&device,
Expand Down
2 changes: 2 additions & 0 deletions io-engine/src/bdev/nvmx/controller_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ impl SpdkNvmeController {
}
}

/// Returns a pointer to the underlying SPDK struct.
///
/// The raw `spdk_nvme_ctrlr` pointer is taken from the wrapped inner value's
/// own `as_ptr()`.
/// NOTE(review): presumably the pointee stays owned by this wrapper and must
/// not be freed by the caller -- confirm.
#[inline(always)]
pub fn as_ptr(&self) -> *mut spdk_nvme_ctrlr {
self.0.as_ptr()
}
Expand Down
5 changes: 5 additions & 0 deletions io-engine/src/bdev/nvmx/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1523,6 +1523,11 @@ impl BlockDeviceHandle for NvmeDeviceHandle {
let id = inner.ext_host_id();
Ok(*id)
}

/// Determines if the underlying controller is failed.
///
/// Returns the current value of the `is_failed` flag of this handle's
/// `ctrlr`.
fn is_ctrlr_failed(&self) -> bool {
self.ctrlr.is_failed
}
}

impl Drop for NvmeDeviceHandle {
Expand Down
5 changes: 5 additions & 0 deletions io-engine/src/core/block_device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,11 @@ pub trait BlockDeviceHandle {
cb: IoCompletionCallback,
cb_arg: IoCompletionCallbackArg,
) -> Result<(), CoreError>;

/// Determines if the underlying controller is failed.
///
/// The default implementation always returns `false`; implementors may
/// override it to report their controller's actual state.
fn is_ctrlr_failed(&self) -> bool {
false
}
}

fn block_device_io_completion(
Expand Down
26 changes: 26 additions & 0 deletions io-engine/src/core/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,10 @@ impl MayastorEnvironment {
// setup the logger as soon as possible
self.init_logger();

if option_env!("ASAN_ENABLE").unwrap_or_default() == "1" {
print_asan_env();
}

self.load_yaml_config();

if let Some(ptpl) = &self.ptpl_dir {
Expand Down Expand Up @@ -1064,3 +1068,25 @@ fn make_hostnqn(node_name: Option<&String>) -> Option<String> {
node_name.map(|n| format!("{NVME_NQN_PREFIX}:node-name:{n}"))
})
}

/// Logs a warning that the binary was built with Address Sanitizer, followed
/// by the ASAN-related build settings, one per line.
///
/// Every value is captured by `option_env!` at compile time, so the output
/// reflects the build environment rather than the runtime environment.
/// Variables that were unset during the build are printed as empty strings.
fn print_asan_env() {
    // (variable name, value seen by the compiler), in the order they are logged.
    let vars: [(&str, Option<&str>); 7] = [
        ("ASAN_OPTIONS", option_env!("ASAN_OPTIONS")),
        ("ASAN_BUILD_ENV", option_env!("ASAN_BUILD_ENV")),
        ("RUSTFLAGS", option_env!("RUSTFLAGS")),
        ("CARGO_BUILD_RUSTFLAGS", option_env!("CARGO_BUILD_RUSTFLAGS")),
        ("CARGO_BUILD_TARGET", option_env!("CARGO_BUILD_TARGET")),
        (
            "CARGO_PROFILE_DEV_PANIC",
            option_env!("CARGO_PROFILE_DEV_PANIC"),
        ),
        ("RUST_BACKTRACE", option_env!("RUST_BACKTRACE")),
    ];

    warn!("Compiled with Address Sanitizer enabled");

    for (s, v) in vars {
        let v = v.unwrap_or_default();
        info!(" {s:25} = {v}");
    }
}
7 changes: 4 additions & 3 deletions io-engine/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ impl ToErrno for CoreError {

/// Logical volume layer failure.
// NOTE(review): `#[repr(C)]` gives the enum a C-compatible layout; presumably
// required because the value is shared with or passed through C code --
// confirm.
#[derive(Debug, Copy, Clone, Eq, PartialOrd, PartialEq)]
#[repr(C)]
pub enum LvolFailure {
/// The logical volume ran out of space.
NoSpace,
}
Expand All @@ -472,6 +473,7 @@ pub enum IoSubmissionFailure {
// Generic I/O completion status for block devices, which supports per-protocol
// error domains.
#[derive(Copy, Clone, Eq, PartialOrd, PartialEq)]
#[repr(C)]
pub enum IoCompletionStatus {
Success,
NvmeError(NvmeStatus),
Expand Down Expand Up @@ -501,10 +503,9 @@ impl From<NvmeStatus> for IoCompletionStatus {
match s {
NvmeStatus::NO_SPACE
| NvmeStatus::Generic(SPDK_NVME_SC_CAPACITY_EXCEEDED) => {
IoCompletionStatus::LvolError(LvolFailure::NoSpace)
Self::LvolError(LvolFailure::NoSpace)
}

_ => IoCompletionStatus::NvmeError(s),
_ => Self::NvmeError(s),
}
}
}
Expand Down
Loading

0 comments on commit 033dc9d

Please sign in to comment.