diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index ceafa3cfa372b..69fa79077b922 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -204,10 +204,6 @@ pub trait TableContext: Send + Sync { this: S, } impl CheckAbort for Checker> { - fn is_aborting(&self) -> bool { - self.this.check_aborting().is_err() - } - fn try_check_aborting(&self) -> Result<()> { self.this.check_aborting().with_context(|| "query aborted") } @@ -387,6 +383,5 @@ pub trait TableContext: Send + Sync { pub type AbortChecker = Arc; pub trait CheckAbort { - fn is_aborting(&self) -> bool; - fn try_check_aborting(&self) -> databend_common_exception::Result<()>; + fn try_check_aborting(&self) -> Result<()>; } diff --git a/src/query/ee/src/storages/fuse/operations/handler.rs b/src/query/ee/src/storages/fuse/operations/handler.rs index b4e19a2977249..51574620ee98d 100644 --- a/src/query/ee/src/storages/fuse/operations/handler.rs +++ b/src/query/ee/src/storages/fuse/operations/handler.rs @@ -19,6 +19,7 @@ use chrono::DateTime; use chrono::Utc; use databend_common_base::base::GlobalInstance; use databend_common_catalog::table::Table; +use databend_common_catalog::table_context::AbortChecker; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_storages_fuse::FuseTable; @@ -54,11 +55,12 @@ impl VacuumHandler for RealVacuumHandler { async fn do_vacuum_temporary_files( &self, + abort_checker: AbortChecker, temporary_dir: String, retain: Option, vacuum_limit: usize, ) -> Result { - do_vacuum_temporary_files(temporary_dir, retain, vacuum_limit).await + do_vacuum_temporary_files(abort_checker, temporary_dir, retain, vacuum_limit).await } } diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs b/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs index 81ef70142bb1f..7678b1e4f181c 100644 --- a/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs +++ b/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs @@ -17,6 +17,7 @@ use std::time::Instant; use std::time::SystemTime; use std::time::UNIX_EPOCH; +use databend_common_catalog::table_context::AbortChecker; use databend_common_exception::Result; use databend_common_storage::DataOperator; use futures_util::stream; @@ -31,6 +32,7 @@ const DEFAULT_RETAIN_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 3); #[async_backtrace::framed] pub async fn do_vacuum_temporary_files( + abort_checker: AbortChecker, temporary_dir: String, retain: Option, limit: usize, @@ -66,6 +68,7 @@ pub async fn do_vacuum_temporary_files( let mut batch_size = 0; while let Some(de) = ds.try_next().await? { + abort_checker.try_check_aborting()?; if de.path() == temporary_dir { continue; } @@ -81,6 +84,7 @@ pub async fn do_vacuum_temporary_files( }; vacuum_finished_query( + &abort_checker, start_time, &mut removed_temp_files, &mut total_cleaned_size, @@ -159,6 +163,7 @@ pub async fn do_vacuum_temporary_files( } async fn vacuum_finished_query( + abort_checker: &AbortChecker, total_instant: Instant, removed_temp_files: &mut usize, total_cleaned_size: &mut usize, @@ -184,6 +189,7 @@ async fn vacuum_finished_query( let mut remove_temp_files_path = Vec::with_capacity(1001); while let Some(de) = ds.try_next().await? { + abort_checker.try_check_aborting()?; if de.path() == parent.path() { continue; } diff --git a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs index 5eefbb43a1cca..b3336281b7b50 100644 --- a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs +++ b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs @@ -17,6 +17,9 @@ use std::sync::Arc; use std::time::Duration; use databend_common_base::base::tokio; +use databend_common_catalog::table_context::AbortChecker; +use databend_common_catalog::table_context::CheckAbort; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; @@ -125,8 +128,41 @@ async fn test_do_vacuum_temporary_files() -> Result<()> { let size = operator.list_with("test_dir/").recursive(true).await?.len(); assert!((3..=4).contains(&size)); + struct NoAbort; + struct AbortRightNow; + impl CheckAbort for NoAbort { + fn try_check_aborting(&self) -> Result<()> { + Ok(()) + } + } + + impl CheckAbort for AbortRightNow { + fn try_check_aborting(&self) -> Result<()> { + Err(ErrorCode::AbortedQuery("")) + } + } + + // check abort + + let r = do_vacuum_temporary_files( + Arc::new(AbortRightNow), + "test_dir/".to_string(), + Some(Duration::from_secs(2)), + 1, + ) + .await; + + assert!(r.is_err_and(|e| e.code() == ErrorCode::ABORTED_QUERY)); + + let no_abort = Arc::new(NoAbort); tokio::time::sleep(Duration::from_secs(2)).await; - do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(2)), 1).await?; + do_vacuum_temporary_files( + no_abort.clone(), + "test_dir/".to_string(), + Some(Duration::from_secs(2)), + 1, + ) + .await?; let size = operator.list("test_dir/").await?.len(); assert!((2..=3).contains(&size)); @@ -137,12 +173,24 @@ async fn test_do_vacuum_temporary_files() -> Result<()> { .write("test_dir/test5/finished", vec![1, 2]) .await?; - do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(2)), 2).await?; + do_vacuum_temporary_files( + no_abort.clone(), + "test_dir/".to_string(), + Some(Duration::from_secs(2)), + 2, + ) + .await?; let size = operator.list("test_dir/").await?.len(); assert!((2..=3).contains(&size)); tokio::time::sleep(Duration::from_secs(3)).await; - do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(3)), 1000).await?; + do_vacuum_temporary_files( + no_abort.clone(), + "test_dir/".to_string(), + Some(Duration::from_secs(3)), + 1000, + ) + .await?; dbg!(operator.list_with("test_dir/").await?); diff --git a/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs b/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs index aba77da7d7628..7f2640ea0bf2c 100644 --- a/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs +++ b/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs @@ -20,6 +20,7 @@ use chrono::DateTime; use chrono::Utc; use databend_common_base::base::GlobalInstance; use databend_common_catalog::table::Table; +use databend_common_catalog::table_context::AbortChecker; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_storages_fuse::FuseTable; @@ -48,6 +49,7 @@ pub trait VacuumHandler: Sync + Send { async fn do_vacuum_temporary_files( &self, + abort_checker: AbortChecker, temporary_dir: String, retain: Option, vacuum_limit: usize, @@ -91,12 +93,13 @@ impl VacuumHandlerWrapper { #[async_backtrace::framed] pub async fn do_vacuum_temporary_files( &self, + abort_checker: AbortChecker, temporary_dir: String, retain: Option, vacuum_limit: usize, ) -> Result { self.handler - .do_vacuum_temporary_files(temporary_dir, retain, vacuum_limit) + .do_vacuum_temporary_files(abort_checker, temporary_dir, retain, vacuum_limit) .await } } diff --git a/src/query/service/src/interpreters/hook/vacuum_hook.rs b/src/query/service/src/interpreters/hook/vacuum_hook.rs index 9a9e7c28107bd..31e55c3c5e7fc 100644 --- a/src/query/service/src/interpreters/hook/vacuum_hook.rs +++ b/src/query/service/src/interpreters/hook/vacuum_hook.rs @@ -44,9 +44,11 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc) -> Result<()> { { let handler = get_vacuum_handler(); + let abort_checker = query_ctx.clone().get_abort_checker(); let _ = GlobalIORuntime::instance().block_on(async move { let removed_files = handler .do_vacuum_temporary_files( + abort_checker, spill_prefix.clone(), Some(Duration::from_secs(0)), vacuum_limit as usize, diff --git a/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs b/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs index 322958dd614dd..56345773ce6c5 100644 --- a/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs +++ b/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs @@ -60,6 +60,7 @@ impl Interpreter for VacuumTemporaryFilesInterpreter { let temporary_files_prefix = query_spill_prefix(self.ctx.get_tenant().tenant_name(), ""); let removed_files = handler .do_vacuum_temporary_files( + self.ctx.clone().get_abort_checker(), temporary_files_prefix, self.plan.retain, self.plan.limit.map(|x| x as usize).unwrap_or(usize::MAX),