diff --git a/src/error.rs b/src/error.rs index db14e071..0ea01834 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,6 +1,8 @@ use std::io; -use crate::{key::Key, node_id::NodeMode, ItemId}; +use crate::key::Key; +use crate::node_id::NodeMode; +use crate::ItemId; /// The different set of errors that arroy can encounter. #[derive(Debug, thiserror::Error)] @@ -61,6 +63,9 @@ pub enum Error { /// The item ID queried item: ItemId, }, + + #[error("The corresponding build process has been cancelled")] + BuildCancelled, } impl Error { diff --git a/src/lib.rs b/src/lib.rs index 46a61e12..1fb7c94c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -97,7 +97,7 @@ use node::{Node, NodeCodec}; use node_id::{NodeId, NodeMode}; pub use reader::Reader; pub use stats::{Stats, TreeStats}; -pub use writer::Writer; +pub use writer::{TreeBuildCanceller, Writer}; /// The set of types used by the [`Distance`] trait. pub mod internals { diff --git a/src/writer.rs b/src/writer.rs index b7df0400..d51af8bc 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -2,6 +2,8 @@ use std::any::TypeId; use std::borrow::Cow; use std::mem; use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use heed::types::{Bytes, DecodeIgnore}; use heed::{MdbError, PutFlags, RoTxn, RwTxn}; @@ -36,13 +38,15 @@ pub struct Writer { dimensions: usize, /// The folder in which tempfile will write its temporary files. tmpdir: Option, + /// An optional boolean flag to cancel the tree building process. + cancelled: Option, } impl Writer { /// Creates a new writer from a database, index and dimensions. 
pub fn new(database: Database, index: u16, dimensions: usize) -> Writer { let database: Database = database.remap_data_type(); - Writer { database, index, dimensions, tmpdir: None } + Writer { database, index, dimensions, tmpdir: None, cancelled: None } } /// Returns a writer after having deleted the tree nodes and rewrote all the items @@ -73,8 +77,8 @@ impl Writer { } } - let Writer { database, index, dimensions, tmpdir } = self; - Ok(Writer { database: database.remap_data_type(), index, dimensions, tmpdir }) + let Writer { database, index, dimensions, tmpdir, cancelled } = self; + Ok(Writer { database: database.remap_data_type(), index, dimensions, tmpdir, cancelled }) } /// Specifies the folder in which arroy will write temporary files when building the tree. @@ -85,6 +89,15 @@ impl Writer { self.tmpdir = Some(path.into()); } + /// Returns a canceller for the tree building process that can be used later and transferred between threads. + pub fn tree_build_canceller(&mut self) -> TreeBuildCanceller { + self.cancelled + .get_or_insert_with(|| TreeBuildCanceller { + must_stop: Arc::new(AtomicBool::new(false)), + }) + .clone() + } + /// Returns an `Option`al vector previous stored in this database. 
pub fn item_vector(&self, rtxn: &RoTxn, item: ItemId) -> Result>> { Ok(item_leaf(self.database, self.index, rtxn, item)?.map(|leaf| leaf.vector.into_owned())) @@ -377,6 +390,10 @@ impl Writer { log::debug!("started updating the tree nodes of {} trees...", tmp_nodes.len()); for (i, tmp_node) in nodes_to_write.iter().enumerate() { + if self.cancelled.as_ref().map_or(false, |tbc| tbc.is_cancelled()) { + return Err(Error::BuildCancelled); + } + log::debug!( "started deleting the {} tree nodes of the {i}nth trees...", tmp_node.len() @@ -424,6 +441,9 @@ impl Writer { &Key::metadata(self.index), &metadata, ) { + Ok(_) if self.cancelled.as_ref().map_or(false, |tbc| tbc.is_cancelled()) => { + Err(Error::BuildCancelled) + } Ok(_) => Ok(()), Err(e) => Err(e.into()), } @@ -442,6 +462,10 @@ impl Writer { repeatn(rng.next_u64(), metadata.roots.len()) .zip(roots) .map(|(seed, root)| { + if self.cancelled.as_ref().map_or(false, |tbc| tbc.is_cancelled()) { + return Err(Error::BuildCancelled); + } + log::debug!("started updating tree {root:X}..."); let mut rng = R::seed_from_u64(seed.wrapping_add(root as u64)); let mut tmp_nodes: TmpNodes> = match self.tmpdir.as_ref() { @@ -647,6 +671,10 @@ impl Writer { None => concurrent_node_ids.used() < n_items, }) .map(|(i, seed)| { + if self.cancelled.as_ref().map_or(false, |tbc| tbc.is_cancelled()) { + return Err(Error::BuildCancelled); + } + log::debug!("started generating tree {i:X}..."); let mut rng = R::seed_from_u64(seed.wrapping_add(i as u64)); let mut tmp_nodes = match self.tmpdir.as_ref() { @@ -766,6 +794,9 @@ impl Writer { log::debug!("Deleting {} trees", to_delete.len()); for tree in to_delete { + if self.cancelled.as_ref().map_or(false, |tbc| tbc.is_cancelled()) { + return Err(Error::BuildCancelled); + } self.delete_tree(wtxn, NodeId::tree(tree))?; } } @@ -806,6 +837,27 @@ impl Writer { } } +/// A canceller for the tree building process that can be used to stop the operation early. 
+#[derive(Debug, Clone)] +pub struct TreeBuildCanceller { + /// The flag that, when set to true, triggers the engine to stop early. + // TODO replace this with a Weak and keep an Arc in the Writer. + // or maybe even a ref that can be shared. + must_stop: Arc, +} + +impl TreeBuildCanceller { + /// Cancels the tree building process as early as possible. + pub fn cancel(&self) { + self.must_stop.store(true, Ordering::SeqCst); + } + + /// Whether the corresponding build process has been cancelled. + fn is_cancelled(&self) -> bool { + self.must_stop.load(Ordering::SeqCst) + } +} + /// Represents the final version of the leafs and contains /// useful informations to synchronize the building threads. #[derive(Clone)]