Skip to content

Commit

Permalink
make VACUUM TEMPORARY FILES killable
Browse files Browse the repository at this point in the history
  • Loading branch information
dantengsky committed Nov 2, 2024
1 parent 5bede31 commit 9e66ac6
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 11 deletions.
7 changes: 1 addition & 6 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,6 @@ pub trait TableContext: Send + Sync {
this: S,
}
impl<S: TableContext + ?Sized> CheckAbort for Checker<Arc<S>> {
fn is_aborting(&self) -> bool {
self.this.check_aborting().is_err()
}

fn try_check_aborting(&self) -> Result<()> {
self.this.check_aborting().with_context(|| "query aborted")
}
Expand Down Expand Up @@ -387,6 +383,5 @@ pub trait TableContext: Send + Sync {
pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;

pub trait CheckAbort {
fn is_aborting(&self) -> bool;
fn try_check_aborting(&self) -> databend_common_exception::Result<()>;
fn try_check_aborting(&self) -> Result<()>;
}
4 changes: 3 additions & 1 deletion src/query/ee/src/storages/fuse/operations/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use chrono::DateTime;
use chrono::Utc;
use databend_common_base::base::GlobalInstance;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::AbortChecker;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_storages_fuse::FuseTable;
Expand Down Expand Up @@ -54,11 +55,12 @@ impl VacuumHandler for RealVacuumHandler {

async fn do_vacuum_temporary_files(
&self,
abort_checker: AbortChecker,
temporary_dir: String,
retain: Option<Duration>,
vacuum_limit: usize,
) -> Result<usize> {
do_vacuum_temporary_files(temporary_dir, retain, vacuum_limit).await
do_vacuum_temporary_files(abort_checker, temporary_dir, retain, vacuum_limit).await
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use databend_common_catalog::table_context::AbortChecker;
use databend_common_exception::Result;
use databend_common_storage::DataOperator;
use futures_util::stream;
Expand All @@ -31,6 +32,7 @@ const DEFAULT_RETAIN_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 3);

#[async_backtrace::framed]
pub async fn do_vacuum_temporary_files(
abort_checker: AbortChecker,
temporary_dir: String,
retain: Option<Duration>,
limit: usize,
Expand Down Expand Up @@ -66,6 +68,7 @@ pub async fn do_vacuum_temporary_files(
let mut batch_size = 0;

while let Some(de) = ds.try_next().await? {
abort_checker.try_check_aborting()?;
if de.path() == temporary_dir {
continue;
}
Expand All @@ -81,6 +84,7 @@ pub async fn do_vacuum_temporary_files(
};

vacuum_finished_query(
&abort_checker,
start_time,
&mut removed_temp_files,
&mut total_cleaned_size,
Expand Down Expand Up @@ -159,6 +163,7 @@ pub async fn do_vacuum_temporary_files(
}

async fn vacuum_finished_query(
abort_checker: &AbortChecker,
total_instant: Instant,
removed_temp_files: &mut usize,
total_cleaned_size: &mut usize,
Expand All @@ -184,6 +189,7 @@ async fn vacuum_finished_query(
let mut remove_temp_files_path = Vec::with_capacity(1001);

while let Some(de) = ds.try_next().await? {
abort_checker.try_check_aborting()?;
if de.path() == parent.path() {
continue;
}
Expand Down
54 changes: 51 additions & 3 deletions src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ use std::sync::Arc;
use std::time::Duration;

use databend_common_base::base::tokio;
use databend_common_catalog::table_context::AbortChecker;
use databend_common_catalog::table_context::CheckAbort;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
Expand Down Expand Up @@ -125,8 +128,41 @@ async fn test_do_vacuum_temporary_files() -> Result<()> {
let size = operator.list_with("test_dir/").recursive(true).await?.len();
assert!((3..=4).contains(&size));

// Test doubles for the `CheckAbort` trait, used to drive the vacuum routine
// under test with a fixed abort decision.

// Checker whose `try_check_aborting` always succeeds.
struct NoAbort;
// Checker whose `try_check_aborting` always fails with `ErrorCode::AbortedQuery`.
struct AbortRightNow;
impl CheckAbort for NoAbort {
// Always returns `Ok(())`, i.e. never reports an abort.
fn try_check_aborting(&self) -> Result<()> {
Ok(())
}
}

impl CheckAbort for AbortRightNow {
// Always returns an `AbortedQuery` error (with an empty message).
fn try_check_aborting(&self) -> Result<()> {
Err(ErrorCode::AbortedQuery(""))
}
}

// check abort

let r = do_vacuum_temporary_files(
Arc::new(AbortRightNow),
"test_dir/".to_string(),
Some(Duration::from_secs(2)),
1,
)
.await;

assert!(r.is_err_and(|e| e.code() == ErrorCode::ABORTED_QUERY));

let no_abort = Arc::new(NoAbort);
tokio::time::sleep(Duration::from_secs(2)).await;
do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(2)), 1).await?;
do_vacuum_temporary_files(
no_abort.clone(),
"test_dir/".to_string(),
Some(Duration::from_secs(2)),
1,
)
.await?;

let size = operator.list("test_dir/").await?.len();
assert!((2..=3).contains(&size));
Expand All @@ -137,12 +173,24 @@ async fn test_do_vacuum_temporary_files() -> Result<()> {
.write("test_dir/test5/finished", vec![1, 2])
.await?;

do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(2)), 2).await?;
do_vacuum_temporary_files(
no_abort.clone(),
"test_dir/".to_string(),
Some(Duration::from_secs(2)),
2,
)
.await?;
let size = operator.list("test_dir/").await?.len();
assert!((2..=3).contains(&size));

tokio::time::sleep(Duration::from_secs(3)).await;
do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(3)), 1000).await?;
do_vacuum_temporary_files(
no_abort.clone(),
"test_dir/".to_string(),
Some(Duration::from_secs(3)),
1000,
)
.await?;

dbg!(operator.list_with("test_dir/").await?);

Expand Down
5 changes: 4 additions & 1 deletion src/query/ee_features/vacuum_handler/src/vacuum_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use chrono::DateTime;
use chrono::Utc;
use databend_common_base::base::GlobalInstance;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::AbortChecker;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_storages_fuse::FuseTable;
Expand Down Expand Up @@ -48,6 +49,7 @@ pub trait VacuumHandler: Sync + Send {

async fn do_vacuum_temporary_files(
&self,
abort_checker: AbortChecker,
temporary_dir: String,
retain: Option<Duration>,
vacuum_limit: usize,
Expand Down Expand Up @@ -91,12 +93,13 @@ impl VacuumHandlerWrapper {
#[async_backtrace::framed]
pub async fn do_vacuum_temporary_files(
&self,
abort_checker: AbortChecker,
temporary_dir: String,
retain: Option<Duration>,
vacuum_limit: usize,
) -> Result<usize> {
self.handler
.do_vacuum_temporary_files(temporary_dir, retain, vacuum_limit)
.do_vacuum_temporary_files(abort_checker, temporary_dir, retain, vacuum_limit)
.await
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/interpreters/hook/vacuum_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc<QueryContext>) -> Result<()> {
{
let handler = get_vacuum_handler();

let abort_checker = query_ctx.clone().get_abort_checker();
let _ = GlobalIORuntime::instance().block_on(async move {
let removed_files = handler
.do_vacuum_temporary_files(
abort_checker,
spill_prefix.clone(),
Some(Duration::from_secs(0)),
vacuum_limit as usize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl Interpreter for VacuumTemporaryFilesInterpreter {
let temporary_files_prefix = query_spill_prefix(self.ctx.get_tenant().tenant_name(), "");
let removed_files = handler
.do_vacuum_temporary_files(
self.ctx.clone().get_abort_checker(),
temporary_files_prefix,
self.plan.retain,
self.plan.limit.map(|x| x as usize).unwrap_or(usize::MAX),
Expand Down

0 comments on commit 9e66ac6

Please sign in to comment.