feat: Auto compact(re-cluster) for multiple table insertion statement (#16443)

* rm unused comment

* feat: Auto compact(re-cluster) for multiple table insertion statement

* add setting

* add logic test

* fix hint

* modify test

* rm log

* disable by default
SkyFan2002 authored Sep 14, 2024
1 parent b4e0a2b commit 1ee6d66
Showing 18 changed files with 178 additions and 33 deletions.
8 changes: 6 additions & 2 deletions src/query/catalog/src/table_context.rs
@@ -176,8 +176,12 @@ pub trait TableContext: Send + Sync {
fn set_can_scan_from_agg_index(&self, enable: bool);
fn get_enable_sort_spill(&self) -> bool;
fn set_enable_sort_spill(&self, enable: bool);
fn set_compaction_num_block_hint(&self, hint: u64);
fn get_compaction_num_block_hint(&self) -> u64;
fn set_compaction_num_block_hint(&self, _table_name: &str, _hint: u64) {
unimplemented!()
}
fn get_compaction_num_block_hint(&self, _table_name: &str) -> u64 {
unimplemented!()
}
fn set_table_snapshot(&self, snapshot: Arc<TableSnapshot>);
fn get_table_snapshot(&self) -> Option<Arc<TableSnapshot>>;
fn set_lazy_mutation_delete(&self, lazy: bool);
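The hunk above turns the two hint accessors into provided trait methods that default to `unimplemented!()`, which is why the stub implementations in the test contexts later in this diff can simply be deleted. A minimal, std-only sketch of that pattern; `QueryCtx` and `TestCtx` are hypothetical stand-ins, not the real Databend types:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

trait TableContext {
    // Provided methods: contexts that never drive compaction can ignore them.
    fn set_compaction_num_block_hint(&self, _table_name: &str, _hint: u64) {
        unimplemented!()
    }
    fn get_compaction_num_block_hint(&self, _table_name: &str) -> u64 {
        unimplemented!()
    }
}

struct QueryCtx {
    hints: Mutex<HashMap<String, u64>>,
}

impl TableContext for QueryCtx {
    fn set_compaction_num_block_hint(&self, table_name: &str, hint: u64) {
        self.hints.lock().unwrap().insert(table_name.to_string(), hint);
    }
    fn get_compaction_num_block_hint(&self, table_name: &str) -> u64 {
        self.hints.lock().unwrap().get(table_name).copied().unwrap_or_default()
    }
}

// A test double compiles without overriding anything, as long as the
// defaults are never actually called.
struct TestCtx;
impl TableContext for TestCtx {}

fn main() {
    let ctx = QueryCtx { hints: Mutex::new(HashMap::new()) };
    ctx.set_compaction_num_block_hint("t1", 3);
    assert_eq!(ctx.get_compaction_num_block_hint("t1"), 3);
    assert_eq!(ctx.get_compaction_num_block_hint("t2"), 0);
    let _stub = TestCtx;
}
```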
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/hook/compact_hook.rs
@@ -80,8 +80,8 @@ async fn do_hook_compact(
pipeline.set_on_finished(move |info: &ExecutionInfo| {
let compaction_limits = match compact_target.mutation_kind {
MutationKind::Insert => {
let compaction_num_block_hint = ctx.get_compaction_num_block_hint();
info!("hint number of blocks need to be compacted {}", compaction_num_block_hint);
let compaction_num_block_hint = ctx.get_compaction_num_block_hint(&compact_target.table);
info!("table {} hint number of blocks need to be compacted {}", compact_target.table, compaction_num_block_hint);
if compaction_num_block_hint == 0 {
return Ok(());
}
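The hook now looks the hint up per table and logs which table it belongs to; a zero hint still short-circuits the whole compaction step. A simplified, std-only sketch of that decision, with `Ctx` and `schedule_compaction` as placeholders rather than the real Databend types:

```rust
use std::collections::HashMap;

struct Ctx {
    compaction_hints: HashMap<String, u64>,
}

impl Ctx {
    fn get_compaction_num_block_hint(&self, table: &str) -> u64 {
        self.compaction_hints.get(table).copied().unwrap_or_default()
    }
}

fn on_insert_finished(ctx: &Ctx, table: &str) {
    let hint = ctx.get_compaction_num_block_hint(table);
    println!("table {table}: {hint} block(s) hinted for compaction");
    if hint == 0 {
        // Nothing fragmented for this table; other tables keep their own hints.
        return;
    }
    schedule_compaction(table, hint);
}

fn schedule_compaction(table: &str, limit: u64) {
    println!("would compact up to {limit} blocks of {table}");
}

fn main() {
    let ctx = Ctx { compaction_hints: HashMap::from([("t1".into(), 5)]) };
    on_insert_finished(&ctx, "t1"); // schedules compaction
    on_insert_finished(&ctx, "t2"); // skips: no hint recorded
}
```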
@@ -14,6 +14,8 @@

use std::sync::Arc;

use databend_common_catalog::catalog::CATALOG_DEFAULT;
use databend_common_catalog::lock::LockTableOption;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::UInt64Type;
@@ -34,6 +36,7 @@ use databend_common_sql::executor::physical_plans::ChunkFillAndReorder;
use databend_common_sql::executor::physical_plans::ChunkMerge;
use databend_common_sql::executor::physical_plans::FillAndReorder;
use databend_common_sql::executor::physical_plans::MultiInsertEvalScalar;
use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::executor::physical_plans::SerializableTable;
use databend_common_sql::executor::physical_plans::ShuffleStrategy;
use databend_common_sql::executor::PhysicalPlan;
@@ -46,6 +49,7 @@ use databend_common_sql::plans::Plan;
use databend_common_sql::MetadataRef;
use databend_common_sql::ScalarExpr;

use super::HookOperator;
use crate::interpreters::common::dml_build_update_stream_req;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
@@ -87,8 +91,27 @@ impl Interpreter for InsertMultiTableInterpreter {
#[async_backtrace::framed]
async fn execute2(&self) -> Result<PipelineBuildResult> {
let physical_plan = self.build_physical_plan().await?;
let build_res =
let mut build_res =
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?;
// Execute hook.
if self
.ctx
.get_settings()
.get_enable_compact_after_multi_table_insert()?
{
for (_, (db, tbl)) in &self.plan.target_tables {
let hook_operator = HookOperator::create(
self.ctx.clone(),
// multi-table insert only supports the default catalog
CATALOG_DEFAULT.to_string(),
db.to_string(),
tbl.to_string(),
MutationKind::Insert,
LockTableOption::LockNoRetry,
);
hook_operator.execute(&mut build_res.main_pipeline).await;
}
}
Ok(build_res)
}

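This hunk is the core of the feature: after the pipeline for a multi-table `INSERT ... INTO t1 ... INTO t2` is built, one compaction hook is attached per target table, guarded by the new setting and restricted to the default catalog. A condensed, std-only sketch of that control flow; the `Pipeline` type and `on_finished` signature below are illustrative, not the real API:

```rust
struct Pipeline {
    finish_hooks: Vec<String>,
}

impl Pipeline {
    fn on_finished(&mut self, description: String) {
        self.finish_hooks.push(description);
    }
}

fn attach_compact_hooks(
    enable_compact_after_multi_table_insert: bool,
    target_tables: &[(String, String)], // (database, table); default catalog only
    pipeline: &mut Pipeline,
) {
    if !enable_compact_after_multi_table_insert {
        return; // disabled by default, per the commit message
    }
    for (db, tbl) in target_tables {
        pipeline.on_finished(format!("compact default.{db}.{tbl} after insert"));
    }
}

fn main() {
    let mut pipeline = Pipeline { finish_hooks: vec![] };
    let targets: Vec<(String, String)> =
        vec![("db1".into(), "t1".into()), ("db1".into(), "t2".into())];
    attach_compact_hooks(true, &targets, &mut pipeline);
    assert_eq!(pipeline.finish_hooks.len(), 2);
}
```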
@@ -84,7 +84,6 @@ impl PipelineBuilder {

self.ctx.set_partitions(plan.parts.clone())?;

// ReadDataKind to avoid OOM.
table.read_data(self.ctx.clone(), &plan, &mut self.main_pipeline, false)?;

let num_input_columns = schema.fields().len();
19 changes: 14 additions & 5 deletions src/query/service/src/sessions/query_ctx.rs
@@ -592,17 +592,26 @@ impl TableContext for QueryContext {
}

// get a hint at the number of blocks that need to be compacted.
fn get_compaction_num_block_hint(&self) -> u64 {
fn get_compaction_num_block_hint(&self, table_name: &str) -> u64 {
self.shared
.num_fragmented_block_hint
.load(Ordering::Acquire)
.lock()
.get(table_name)
.copied()
.unwrap_or_default()
}

// set a hint at the number of blocks that need to be compacted.
fn set_compaction_num_block_hint(&self, hint: u64) {
self.shared
fn set_compaction_num_block_hint(&self, table_name: &str, hint: u64) {
let old = self
.shared
.num_fragmented_block_hint
.store(hint, Ordering::Release);
.lock()
.insert(table_name.to_string(), hint);
info!(
"set_compaction_num_block_hint: table_name {} old hint {:?}, new hint {}",
table_name, old, hint
);
}

fn attach_query_str(&self, kind: QueryKind, query: String) {
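The context now keeps one hint per table instead of a single counter. A small sketch of why that matters for multi-table inserts, with `HintStore` standing in for the field on `QueryContextShared`:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Default, Clone)]
struct HintStore {
    inner: Arc<Mutex<HashMap<String, u64>>>,
}

impl HintStore {
    // Returns the previous hint, mirroring the `old` value logged above.
    fn set(&self, table: &str, hint: u64) -> Option<u64> {
        self.inner.lock().unwrap().insert(table.to_string(), hint)
    }
    fn get(&self, table: &str) -> u64 {
        self.inner.lock().unwrap().get(table).copied().unwrap_or_default()
    }
}

fn main() {
    let hints = HintStore::default();
    assert_eq!(hints.set("t1", 7), None);
    assert_eq!(hints.set("t2", 2), None);
    // Each table keeps its own hint; a single AtomicU64 would let the last
    // table written clobber the others.
    assert_eq!(hints.get("t1"), 7);
    assert_eq!(hints.get("t2"), 2);
    assert_eq!(hints.get("t3"), 0);
}
```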
5 changes: 2 additions & 3 deletions src/query/service/src/sessions/query_ctx_shared.rs
@@ -15,7 +15,6 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Weak;
@@ -117,7 +116,7 @@ pub struct QueryContextShared {
pub(in crate::sessions) partitions_shas: Arc<RwLock<Vec<String>>>,
pub(in crate::sessions) cacheable: Arc<AtomicBool>,
pub(in crate::sessions) can_scan_from_agg_index: Arc<AtomicBool>,
pub(in crate::sessions) num_fragmented_block_hint: Arc<AtomicU64>,
pub(in crate::sessions) num_fragmented_block_hint: Arc<Mutex<HashMap<String, u64>>>,
pub(in crate::sessions) enable_sort_spill: Arc<AtomicBool>,
// Status info.
pub(in crate::sessions) status: Arc<RwLock<String>>,
@@ -175,7 +174,7 @@ impl QueryContextShared {
partitions_shas: Arc::new(RwLock::new(vec![])),
cacheable: Arc::new(AtomicBool::new(true)),
can_scan_from_agg_index: Arc::new(AtomicBool::new(true)),
num_fragmented_block_hint: Arc::new(AtomicU64::new(0)),
num_fragmented_block_hint: Default::default(),
enable_sort_spill: Arc::new(AtomicBool::new(true)),
status: Arc::new(RwLock::new("null".to_string())),
user_agent: Arc::new(RwLock::new("null".to_string())),
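Because the field type changed from `AtomicU64` to `Arc<Mutex<HashMap<String, u64>>>`, the constructor can rely on `Default::default()` to produce an empty map. A tiny illustration, assuming a simplified `Shared` struct:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

struct Shared {
    num_fragmented_block_hint: Arc<Mutex<HashMap<String, u64>>>,
}

fn main() {
    // Equivalent to Arc::new(Mutex::new(HashMap::new())), just shorter.
    let shared = Shared {
        num_fragmented_block_hint: Default::default(),
    };
    assert!(shared.num_fragmented_block_hint.lock().unwrap().is_empty());
}
```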
8 changes: 0 additions & 8 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
@@ -871,14 +871,6 @@ impl TableContext for CtxDelegation {
todo!()
}

fn set_compaction_num_block_hint(&self, _enable: u64) {
todo!()
}

fn get_compaction_num_block_hint(&self) -> u64 {
todo!()
}

fn add_file_status(&self, _file_path: &str, _file_status: FileStatus) -> Result<()> {
todo!()
}
3 changes: 3 additions & 0 deletions src/query/service/tests/it/storages/fuse/conflict.rs
@@ -65,6 +65,7 @@ fn test_unresolvable_delete_conflict() {
None,
TxnManager::init(),
0,
"test",
);
assert!(result.is_err());
}
@@ -156,6 +157,7 @@ fn test_resolvable_delete_conflict() {
None,
TxnManager::init(),
0,
"test",
);
let snapshot = result.unwrap();
let expected = vec![("8".to_string(), 1), ("4".to_string(), 1)];
@@ -263,6 +265,7 @@ fn test_resolvable_replace_conflict() {
None,
TxnManager::init(),
0,
"test",
);
let snapshot = result.unwrap();
let expected = vec![
8 changes: 0 additions & 8 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -766,14 +766,6 @@ impl TableContext for CtxDelegation {
todo!()
}

fn set_compaction_num_block_hint(&self, _enable: u64) {
todo!()
}

fn get_compaction_num_block_hint(&self) -> u64 {
todo!()
}

fn add_file_status(&self, _file_path: &str, _file_status: FileStatus) -> Result<()> {
todo!()
}
9 changes: 9 additions & 0 deletions src/query/settings/src/settings_default.rs
@@ -590,6 +590,15 @@ impl DefaultSettings {
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
(
"enable_compact_after_multi_table_insert",
DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Enables recluster and compact after multi-table insert.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=1)),
}
),
("auto_compaction_imperfect_blocks_threshold", DefaultSettingValue {
value: UserSettingValue::UInt64(25),
desc: "Threshold for triggering auto compaction. This occurs when the number of imperfect blocks in a snapshot exceeds this value after write operations.",
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
@@ -508,6 +508,10 @@ impl Settings {
Ok(self.try_get_u64("enable_compact_after_write")? != 0)
}

pub fn get_enable_compact_after_multi_table_insert(&self) -> Result<bool> {
Ok(self.try_get_u64("enable_compact_after_multi_table_insert")? != 0)
}

pub fn get_auto_compaction_imperfect_blocks_threshold(&self) -> Result<u64> {
self.try_get_u64("auto_compaction_imperfect_blocks_threshold")
}
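The getter follows the existing settings convention: boolean switches are stored as `UInt64` values constrained to `0..=1` and exposed through a typed accessor that treats any non-zero value as `true`. A self-contained sketch of that convention, with a toy `Settings` type in place of the real one:

```rust
use std::collections::HashMap;

struct Settings {
    values: HashMap<&'static str, u64>,
}

impl Settings {
    fn try_get_u64(&self, key: &str) -> Result<u64, String> {
        self.values
            .get(key)
            .copied()
            .ok_or_else(|| format!("unknown setting {key}"))
    }

    // Non-zero means enabled, matching the 0..=1 numeric range above.
    fn get_enable_compact_after_multi_table_insert(&self) -> Result<bool, String> {
        Ok(self.try_get_u64("enable_compact_after_multi_table_insert")? != 0)
    }
}

fn main() -> Result<(), String> {
    let settings = Settings {
        // Defaults to 0 (disabled), as registered in settings_default.rs above.
        values: HashMap::from([("enable_compact_after_multi_table_insert", 0)]),
    };
    assert!(!settings.get_enable_compact_after_multi_table_insert()?);
    Ok(())
}
```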
@@ -119,6 +119,7 @@ impl SnapshotGenerator for AppendGenerator {
cluster_key_meta: Option<ClusterKey>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
table_name: &str,
) -> Result<TableSnapshot> {
let (snapshot_merged, expected_schema) = self.conflict_resolve_ctx()?;
if is_column_type_modified(&schema, expected_schema) {
@@ -223,7 +224,7 @@
) + 1;
info!("set compact_num_block_hint to {compact_num_block_hint }");
self.ctx
.set_compaction_num_block_hint(compact_num_block_hint);
.set_compaction_num_block_hint(table_name, compact_num_block_hint);
}

Ok(TableSnapshot::new(
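The actual hint computation is collapsed in this view (only `) + 1;` survives), so the following is just an illustrative guess at its shape: count the blocks that look fragmented and file that count under the table's name so the post-insert hook can pick it up. The threshold logic and names below are hypothetical:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

struct Ctx {
    hints: Mutex<HashMap<String, u64>>,
}

impl Ctx {
    fn set_compaction_num_block_hint(&self, table_name: &str, hint: u64) {
        self.hints.lock().unwrap().insert(table_name.to_string(), hint);
    }
}

fn record_fragmentation(ctx: &Ctx, table_name: &str, block_rows: &[u64], perfect_rows: u64) {
    // Hypothetical rule: every block well below the ideal row count counts
    // as fragmented and contributes to the hint.
    let fragmented = block_rows.iter().filter(|&&rows| rows < perfect_rows).count() as u64;
    if fragmented > 0 {
        ctx.set_compaction_num_block_hint(table_name, fragmented);
    }
}

fn main() {
    let ctx = Ctx { hints: Mutex::new(HashMap::new()) };
    record_fragmentation(&ctx, "t1", &[1_000, 900_000, 2_000], 800_000);
    assert_eq!(ctx.hints.lock().unwrap()["t1"], 2);
}
```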
@@ -63,6 +63,7 @@ impl SnapshotGenerator for MutationGenerator {
cluster_key_meta: Option<ClusterKey>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
_table_name: &str,
) -> Result<TableSnapshot> {
let default_cluster_key_id = cluster_key_meta.clone().map(|v| v.0);
match &self.conflict_resolve_ctx {
@@ -46,9 +46,15 @@ pub trait SnapshotGenerator {
prev_table_seq: Option<u64>,
txn_mgr: TxnManagerRef,
table_id: u64,
table_name: &str,
) -> Result<TableSnapshot> {
let mut snapshot =
self.do_generate_new_snapshot(schema, cluster_key_meta, &previous, prev_table_seq)?;
let mut snapshot = self.do_generate_new_snapshot(
schema,
cluster_key_meta,
&previous,
prev_table_seq,
table_name,
)?;

let has_pending_transactional_mutations = {
let guard = txn_mgr.lock();
@@ -73,5 +79,6 @@
cluster_key_meta: Option<ClusterKey>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
table_name: &str,
) -> Result<TableSnapshot>;
}
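The trait's provided `generate_new_snapshot` now threads a `table_name` argument into the required `do_generate_new_snapshot`, and generators that have no use for it (mutation, truncate) simply bind it as `_table_name`. A condensed sketch of that delegation pattern, with simplified stand-in types:

```rust
#[derive(Debug)]
struct Snapshot {
    note: String,
}

trait SnapshotGenerator {
    // Provided method: shared plumbing (transaction handling in the real code)
    // lives here and forwards the new argument to the per-generator hook.
    fn generate_new_snapshot(&self, table_name: &str) -> Snapshot {
        self.do_generate_new_snapshot(table_name)
    }

    fn do_generate_new_snapshot(&self, table_name: &str) -> Snapshot;
}

struct AppendLike;
impl SnapshotGenerator for AppendLike {
    fn do_generate_new_snapshot(&self, table_name: &str) -> Snapshot {
        // The append path is the one that actually uses the name,
        // to file its compaction hint under the right table.
        Snapshot { note: format!("append to {table_name}, hint recorded") }
    }
}

struct TruncateLike;
impl SnapshotGenerator for TruncateLike {
    fn do_generate_new_snapshot(&self, _table_name: &str) -> Snapshot {
        Snapshot { note: "truncate; table name unused".to_string() }
    }
}

fn main() {
    println!("{:?}", AppendLike.generate_new_snapshot("t1"));
    println!("{:?}", TruncateLike.generate_new_snapshot("t1"));
}
```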
@@ -60,6 +60,7 @@ impl SnapshotGenerator for TruncateGenerator {
cluster_key_meta: Option<ClusterKey>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
_table_name: &str,
) -> Result<TableSnapshot> {
let (prev_timestamp, prev_snapshot_id) = if let Some(prev_snapshot) = previous {
(
@@ -255,6 +255,7 @@ async fn build_update_table_meta_req(
Some(fuse_table.table_info.ident.seq),
txn_mgr,
table.get_id(),
table.name(),
)?;

// write snapshot
@@ -288,6 +288,7 @@ where F: SnapshotGenerator + Send + 'static
Some(table_info.ident.seq),
self.ctx.txn_mgr(),
table_info.ident.table_id,
table_info.name.as_str(),
) {
Ok(snapshot) => {
self.state = State::TryCommit {