Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancellation Support #89

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::io;

use crate::{key::Key, node_id::NodeMode, ItemId};
use crate::key::Key;
use crate::node_id::NodeMode;
use crate::ItemId;

/// The different set of errors that arroy can encounter.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -61,6 +63,9 @@
/// The item ID queried
item: ItemId,
},

/// The tree building process was stopped early because a cancellation was requested.
#[error("The corresponding build process has been cancelled")]
BuildCancelled,

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / lint

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (macos-latest-xlarge, beta)

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (macos-latest-xlarge, beta)

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (macos-latest-xlarge, beta)

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (macos-latest-xlarge, stable)

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (macos-latest-xlarge, stable)

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (macos-latest-xlarge, stable)

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest, stable)

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest, stable)

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest, stable)

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest, beta)

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest, beta)

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest, beta)

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (windows-latest, beta)

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (windows-latest, beta)

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (windows-latest, beta)

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (windows-latest, stable)

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (windows-latest, stable)

missing documentation for a variant

Check warning on line 68 in src/error.rs

View workflow job for this annotation

GitHub Actions / test (windows-latest, stable)

missing documentation for a variant
}

impl Error {
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ use node::{Node, NodeCodec};
use node_id::{NodeId, NodeMode};
pub use reader::Reader;
pub use stats::{Stats, TreeStats};
pub use writer::Writer;
pub use writer::{TreeBuildCanceller, Writer};

/// The set of types used by the [`Distance`] trait.
pub mod internals {
Expand Down
58 changes: 55 additions & 3 deletions src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::any::TypeId;
use std::borrow::Cow;
use std::mem;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use heed::types::{Bytes, DecodeIgnore};
use heed::{MdbError, PutFlags, RoTxn, RwTxn};
Expand Down Expand Up @@ -36,13 +38,15 @@ pub struct Writer<D: Distance> {
dimensions: usize,
/// The folder in which tempfile will write its temporary files.
tmpdir: Option<PathBuf>,
/// An optional boolean flag to cancel the tree building process.
cancelled: Option<TreeBuildCanceller>,
}

impl<D: Distance> Writer<D> {
/// Creates a new writer from a database, index and dimensions.
pub fn new(database: Database<D>, index: u16, dimensions: usize) -> Writer<D> {
let database: Database<D> = database.remap_data_type();
Writer { database, index, dimensions, tmpdir: None }
Writer { database, index, dimensions, tmpdir: None, cancelled: None }
}

/// Returns a writer after having deleted the tree nodes and rewrote all the items
Expand Down Expand Up @@ -73,8 +77,8 @@ impl<D: Distance> Writer<D> {
}
}

let Writer { database, index, dimensions, tmpdir } = self;
Ok(Writer { database: database.remap_data_type(), index, dimensions, tmpdir })
let Writer { database, index, dimensions, tmpdir, cancelled } = self;
Ok(Writer { database: database.remap_data_type(), index, dimensions, tmpdir, cancelled })
}

/// Specifies the folder in which arroy will write temporary files when building the tree.
Expand All @@ -85,6 +89,15 @@ impl<D: Distance> Writer<D> {
self.tmpdir = Some(path.into());
}

/// Hands out a [`TreeBuildCanceller`] tied to this writer's tree building process.
///
/// The canceller is created on the first call and stored in the writer; every call
/// returns a clone of that stored canceller, so all returned handles share the same
/// underlying flag. The handle can be kept for later and moved to another thread.
pub fn tree_build_canceller(&mut self) -> TreeBuildCanceller {
    let canceller = self.cancelled.get_or_insert_with(|| {
        // Start in the "not cancelled" state.
        let must_stop = Arc::new(AtomicBool::new(false));
        TreeBuildCanceller { must_stop }
    });
    canceller.clone()
}

/// Returns an `Option`al vector previously stored in this database.
pub fn item_vector(&self, rtxn: &RoTxn, item: ItemId) -> Result<Option<Vec<f32>>> {
Ok(item_leaf(self.database, self.index, rtxn, item)?.map(|leaf| leaf.vector.into_owned()))
Expand Down Expand Up @@ -377,6 +390,10 @@ impl<D: Distance> Writer<D> {

log::debug!("started updating the tree nodes of {} trees...", tmp_nodes.len());
for (i, tmp_node) in nodes_to_write.iter().enumerate() {
if self.cancelled.as_ref().map_or(false, |tbc| tbc.is_cancelled()) {
return Err(Error::BuildCancelled);
}

log::debug!(
"started deleting the {} tree nodes of the {i}nth trees...",
tmp_node.len()
Expand Down Expand Up @@ -424,6 +441,9 @@ impl<D: Distance> Writer<D> {
&Key::metadata(self.index),
&metadata,
) {
Ok(_) if self.cancelled.as_ref().map_or(false, |tbc| tbc.is_cancelled()) => {
Err(Error::BuildCancelled)
}
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
Expand All @@ -442,6 +462,10 @@ impl<D: Distance> Writer<D> {
repeatn(rng.next_u64(), metadata.roots.len())
.zip(roots)
.map(|(seed, root)| {
if self.cancelled.as_ref().map_or(false, |tbc| tbc.is_cancelled()) {
return Err(Error::BuildCancelled);
}

log::debug!("started updating tree {root:X}...");
let mut rng = R::seed_from_u64(seed.wrapping_add(root as u64));
let mut tmp_nodes: TmpNodes<NodeCodec<D>> = match self.tmpdir.as_ref() {
Expand Down Expand Up @@ -647,6 +671,10 @@ impl<D: Distance> Writer<D> {
None => concurrent_node_ids.used() < n_items,
})
.map(|(i, seed)| {
if self.cancelled.as_ref().map_or(false, |tbc| tbc.is_cancelled()) {
return Err(Error::BuildCancelled);
}

log::debug!("started generating tree {i:X}...");
let mut rng = R::seed_from_u64(seed.wrapping_add(i as u64));
let mut tmp_nodes = match self.tmpdir.as_ref() {
Expand Down Expand Up @@ -766,6 +794,9 @@ impl<D: Distance> Writer<D> {
log::debug!("Deleting {} trees", to_delete.len());

for tree in to_delete {
if self.cancelled.as_ref().map_or(false, |tbc| tbc.is_cancelled()) {
return Err(Error::BuildCancelled);
}
self.delete_tree(wtxn, NodeId::tree(tree))?;
}
}
Expand Down Expand Up @@ -806,6 +837,27 @@ impl<D: Distance> Writer<D> {
}
}

/// A handle that lets the caller request an early stop of the tree building process.
///
/// Cloning the handle is cheap: every clone points at the same shared flag, so
/// cancelling through any clone is observed by all of them.
#[derive(Debug, Clone)]
pub struct TreeBuildCanceller {
    /// Shared flag; once it is set to `true` the build is expected to stop early.
    // TODO replace this with a Weak and keep an Arc in the Writer.
    //      or maybe even a ref that can be shared.
    must_stop: Arc<AtomicBool>,
}

impl TreeBuildCanceller {
    /// Requests that the tree building process stops as early as possible.
    ///
    /// This only raises the shared flag; calling it more than once has no further effect.
    pub fn cancel(&self) {
        let flag: &AtomicBool = &self.must_stop;
        flag.store(true, Ordering::SeqCst);
    }

    /// Whether the corresponding build process has been cancelled.
    fn is_cancelled(&self) -> bool {
        let flag: &AtomicBool = &self.must_stop;
        flag.load(Ordering::SeqCst)
    }
}

/// Represents the final version of the leafs and contains
/// useful information to synchronize the building threads.
#[derive(Clone)]
Expand Down
Loading