refactor: re-enable compact before clustering
dantengsky committed Sep 11, 2024
1 parent 16c7d99 commit 8d87afd
Showing 1 changed file with 49 additions and 28 deletions.
77 changes: 49 additions & 28 deletions src/query/service/src/interpreters/hook/compact_hook.rs
@@ -19,6 +19,7 @@ use databend_common_base::runtime::GlobalIORuntime;
use databend_common_catalog::lock::LockTableOption;
use databend_common_catalog::table::CompactionLimits;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_pipeline_core::ExecutionInfo;
use databend_common_pipeline_core::Pipeline;
@@ -36,6 +37,7 @@ use crate::interpreters::OptimizeCompactBlockInterpreter;
use crate::interpreters::ReclusterTableInterpreter;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::PipelineBuildResult;
use crate::sessions::QueryContext;

pub struct CompactTargetTableDescription {
@@ -134,23 +136,32 @@ async fn compact_table(
    compaction_limits: CompactionLimits,
    lock_opt: LockTableOption,
) -> Result<()> {
    evict_target_table_from_ctx_cache(&ctx, &compact_target)?;

    let compact_block = RelOperator::CompactBlock(OptimizeCompactBlock {
        catalog: compact_target.catalog.clone(),
        database: compact_target.database.clone(),
        table: compact_target.table.clone(),
        limit: compaction_limits.clone(),
    });
    let s_expr = SExpr::create_leaf(Arc::new(compact_block));
    let compact_interpreter =
        OptimizeCompactBlockInterpreter::try_create(ctx.clone(), s_expr, lock_opt.clone(), false)?;
    let build_res = compact_interpreter.execute2().await?;
    execute_complete_pipeline(&ctx, build_res).await?;

    evict_target_table_from_ctx_cache(&ctx, &compact_target)?;

    let table = ctx
        .get_table(
            &compact_target.catalog,
            &compact_target.database,
            &compact_target.table,
        )
        .await?;
    let do_recluster = !table.cluster_keys(ctx.clone()).is_empty();

    // evict the table from cache
    ctx.evict_table_from_cache(
        &compact_target.catalog,
        &compact_target.database,
        &compact_target.table,
    )?;

    let mut build_res = if do_recluster {
    let need_recluster = !table.cluster_keys(ctx.clone()).is_empty();
    if need_recluster {
        let recluster = RelOperator::Recluster(Recluster {
            catalog: compact_target.catalog,
            database: compact_target.database,
@@ -161,30 +172,23 @@
        let s_expr = SExpr::create_leaf(Arc::new(recluster));
        let recluster_interpreter =
            ReclusterTableInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?;
        recluster_interpreter.execute2().await?
    } else {
        let compact_block = RelOperator::CompactBlock(OptimizeCompactBlock {
            catalog: compact_target.catalog,
            database: compact_target.database,
            table: compact_target.table,
            limit: compaction_limits,
        });
        let s_expr = SExpr::create_leaf(Arc::new(compact_block));
        let compact_interpreter =
            OptimizeCompactBlockInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?;
        compact_interpreter.execute2().await?
    };
        let build_res = recluster_interpreter.execute2().await?;
        execute_complete_pipeline(&ctx, build_res).await?;
    }

    Ok(())
}

async fn execute_complete_pipeline(
    ctx: &Arc<QueryContext>,
    build_res: PipelineBuildResult,
) -> Result<()> {
    if build_res.main_pipeline.is_empty() {
        return Ok(());
    }

    // execute the compact pipeline (for table with cluster keys, re-cluster will also be executed)
    let settings = ctx.get_settings();
    build_res.set_max_threads(settings.get_max_threads()? as usize);
    let settings = ExecutorSettings::try_create(ctx.clone())?;

    if build_res.main_pipeline.is_complete_pipeline()? {
        let settings = ExecutorSettings::try_create(ctx.clone())?;
        let mut pipelines = build_res.sources_pipelines;
        pipelines.push(build_res.main_pipeline);

@@ -200,6 +204,23 @@ async fn compact_table(

        // reset the progress value
        ctx.get_write_progress().set(&progress_value);
        Ok(())
    } else {
        Err(ErrorCode::Internal(format!(
            "expecting complete_pipeline, but got {:?}",
            build_res.main_pipeline
        )))
    }
    Ok(())
}

// evict the table from ctx cache
fn evict_target_table_from_ctx_cache(
    ctx: &QueryContext,
    compact_target: &CompactTargetTableDescription,
) -> Result<()> {
    ctx.evict_table_from_cache(
        &compact_target.catalog,
        &compact_target.database,
        &compact_target.table,
    )
}
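
The flattened rendering above interleaves removed and added lines, so the shape of the change is easy to miss: the old either/or (let mut build_res = if do_recluster { recluster } else { compact }) becomes a strict sequence that always compacts first and then re-clusters only when the table defines cluster keys, with pipeline execution factored into the new execute_complete_pipeline helper. Below is a condensed sketch of the post-commit control flow, reconstructed from the diff rather than copied verbatim; the parameter types for ctx and compact_target are inferred from the call sites, and recluster_op is a hypothetical stand-in for the RelOperator::Recluster construction whose field list is partly collapsed in the page.

// Sketch of compact_table after this commit, reconstructed from the diff.
async fn compact_table(
    ctx: Arc<QueryContext>,
    compact_target: CompactTargetTableDescription,
    compaction_limits: CompactionLimits,
    lock_opt: LockTableOption,
) -> Result<()> {
    // Step 1: always compact first (the behavior this commit re-enables).
    evict_target_table_from_ctx_cache(&ctx, &compact_target)?;
    let compact_block = RelOperator::CompactBlock(OptimizeCompactBlock {
        catalog: compact_target.catalog.clone(),
        database: compact_target.database.clone(),
        table: compact_target.table.clone(),
        limit: compaction_limits.clone(),
    });
    let s_expr = SExpr::create_leaf(Arc::new(compact_block));
    let compact_interpreter =
        OptimizeCompactBlockInterpreter::try_create(ctx.clone(), s_expr, lock_opt.clone(), false)?;
    execute_complete_pipeline(&ctx, compact_interpreter.execute2().await?).await?;

    // Step 2: evict the cached table meta so the post-compaction snapshot is
    // re-read, then re-cluster only when the table has cluster keys.
    evict_target_table_from_ctx_cache(&ctx, &compact_target)?;
    let table = ctx
        .get_table(
            &compact_target.catalog,
            &compact_target.database,
            &compact_target.table,
        )
        .await?;
    if !table.cluster_keys(ctx.clone()).is_empty() {
        // recluster_op: hypothetical helper standing in for the
        // RelOperator::Recluster construction shown (partly collapsed) above.
        let s_expr = SExpr::create_leaf(Arc::new(recluster_op(compact_target)));
        let recluster_interpreter =
            ReclusterTableInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?;
        execute_complete_pipeline(&ctx, recluster_interpreter.execute2().await?).await?;
    }
    Ok(())
}

Note also the new else branch in execute_complete_pipeline: judging by the deleted trailing Ok(()), a non-complete pipeline previously fell through silently, whereas it now surfaces as an ErrorCode::Internal error.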
