Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Auto compact(re-cluster) for multiple table insertion statement #16443

Merged
merged 8 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,12 @@ pub trait TableContext: Send + Sync {
fn set_can_scan_from_agg_index(&self, enable: bool);
fn get_enable_sort_spill(&self) -> bool;
fn set_enable_sort_spill(&self, enable: bool);
fn set_compaction_num_block_hint(&self, hint: u64);
fn get_compaction_num_block_hint(&self) -> u64;
fn set_compaction_num_block_hint(&self, _table_name: &str, _hint: u64) {
unimplemented!()
}
fn get_compaction_num_block_hint(&self, _table_name: &str) -> u64 {
unimplemented!()
}
fn set_table_snapshot(&self, snapshot: Arc<TableSnapshot>);
fn get_table_snapshot(&self) -> Option<Arc<TableSnapshot>>;
fn set_lazy_mutation_delete(&self, lazy: bool);
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/hook/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ async fn do_hook_compact(
pipeline.set_on_finished(move |info: &ExecutionInfo| {
let compaction_limits = match compact_target.mutation_kind {
MutationKind::Insert => {
let compaction_num_block_hint = ctx.get_compaction_num_block_hint();
info!("hint number of blocks need to be compacted {}", compaction_num_block_hint);
let compaction_num_block_hint = ctx.get_compaction_num_block_hint(&compact_target.table);
info!("table {} hint number of blocks need to be compacted {}", compact_target.table, compaction_num_block_hint);
if compaction_num_block_hint == 0 {
return Ok(());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::sync::Arc;

use databend_common_catalog::catalog::CATALOG_DEFAULT;
use databend_common_catalog::lock::LockTableOption;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::UInt64Type;
Expand All @@ -34,6 +36,7 @@ use databend_common_sql::executor::physical_plans::ChunkFillAndReorder;
use databend_common_sql::executor::physical_plans::ChunkMerge;
use databend_common_sql::executor::physical_plans::FillAndReorder;
use databend_common_sql::executor::physical_plans::MultiInsertEvalScalar;
use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::executor::physical_plans::SerializableTable;
use databend_common_sql::executor::physical_plans::ShuffleStrategy;
use databend_common_sql::executor::PhysicalPlan;
Expand All @@ -46,6 +49,7 @@ use databend_common_sql::plans::Plan;
use databend_common_sql::MetadataRef;
use databend_common_sql::ScalarExpr;

use super::HookOperator;
use crate::interpreters::common::dml_build_update_stream_req;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
Expand Down Expand Up @@ -87,8 +91,27 @@ impl Interpreter for InsertMultiTableInterpreter {
#[async_backtrace::framed]
async fn execute2(&self) -> Result<PipelineBuildResult> {
let physical_plan = self.build_physical_plan().await?;
let build_res =
let mut build_res =
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?;
// Execute hook.
if self
.ctx
.get_settings()
.get_enable_compact_after_multi_table_insert()?
{
for (_, (db, tbl)) in &self.plan.target_tables {
let hook_operator = HookOperator::create(
self.ctx.clone(),
// multi table insert only support default catalog
CATALOG_DEFAULT.to_string(),
db.to_string(),
tbl.to_string(),
MutationKind::Insert,
LockTableOption::LockNoRetry,
);
hook_operator.execute(&mut build_res.main_pipeline).await;
}
}
Ok(build_res)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ impl PipelineBuilder {

self.ctx.set_partitions(plan.parts.clone())?;

// ReadDataKind to avoid OOM.
table.read_data(self.ctx.clone(), &plan, &mut self.main_pipeline, false)?;

let num_input_columns = schema.fields().len();
Expand Down
19 changes: 14 additions & 5 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,17 +592,26 @@ impl TableContext for QueryContext {
}

// get a hint at the number of blocks that need to be compacted.
fn get_compaction_num_block_hint(&self) -> u64 {
fn get_compaction_num_block_hint(&self, table_name: &str) -> u64 {
self.shared
.num_fragmented_block_hint
.load(Ordering::Acquire)
.lock()
.get(table_name)
.copied()
.unwrap_or_default()
}

// set a hint at the number of blocks that need to be compacted.
fn set_compaction_num_block_hint(&self, hint: u64) {
self.shared
fn set_compaction_num_block_hint(&self, table_name: &str, hint: u64) {
let old = self
.shared
.num_fragmented_block_hint
.store(hint, Ordering::Release);
.lock()
.insert(table_name.to_string(), hint);
info!(
"set_compaction_num_block_hint: table_name {} old hint {:?}, new hint {}",
table_name, old, hint
);
}

fn attach_query_str(&self, kind: QueryKind, query: String) {
Expand Down
5 changes: 2 additions & 3 deletions src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Weak;
Expand Down Expand Up @@ -117,7 +116,7 @@ pub struct QueryContextShared {
pub(in crate::sessions) partitions_shas: Arc<RwLock<Vec<String>>>,
pub(in crate::sessions) cacheable: Arc<AtomicBool>,
pub(in crate::sessions) can_scan_from_agg_index: Arc<AtomicBool>,
pub(in crate::sessions) num_fragmented_block_hint: Arc<AtomicU64>,
pub(in crate::sessions) num_fragmented_block_hint: Arc<Mutex<HashMap<String, u64>>>,
pub(in crate::sessions) enable_sort_spill: Arc<AtomicBool>,
// Status info.
pub(in crate::sessions) status: Arc<RwLock<String>>,
Expand Down Expand Up @@ -175,7 +174,7 @@ impl QueryContextShared {
partitions_shas: Arc::new(RwLock::new(vec![])),
cacheable: Arc::new(AtomicBool::new(true)),
can_scan_from_agg_index: Arc::new(AtomicBool::new(true)),
num_fragmented_block_hint: Arc::new(AtomicU64::new(0)),
num_fragmented_block_hint: Default::default(),
enable_sort_spill: Arc::new(AtomicBool::new(true)),
status: Arc::new(RwLock::new("null".to_string())),
user_agent: Arc::new(RwLock::new("null".to_string())),
Expand Down
8 changes: 0 additions & 8 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,14 +872,6 @@ impl TableContext for CtxDelegation {
todo!()
}

fn set_compaction_num_block_hint(&self, _enable: u64) {
todo!()
}

fn get_compaction_num_block_hint(&self) -> u64 {
todo!()
}

fn add_file_status(&self, _file_path: &str, _file_status: FileStatus) -> Result<()> {
todo!()
}
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/tests/it/storages/fuse/conflict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ fn test_unresolvable_delete_conflict() {
None,
TxnManager::init(),
0,
"test",
);
assert!(result.is_err());
}
Expand Down Expand Up @@ -156,6 +157,7 @@ fn test_resolvable_delete_conflict() {
None,
TxnManager::init(),
0,
"test",
);
let snapshot = result.unwrap();
let expected = vec![("8".to_string(), 1), ("4".to_string(), 1)];
Expand Down Expand Up @@ -263,6 +265,7 @@ fn test_resolvable_replace_conflict() {
None,
TxnManager::init(),
0,
"test",
);
let snapshot = result.unwrap();
let expected = vec![
Expand Down
8 changes: 0 additions & 8 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -767,14 +767,6 @@ impl TableContext for CtxDelegation {
todo!()
}

fn set_compaction_num_block_hint(&self, _enable: u64) {
todo!()
}

fn get_compaction_num_block_hint(&self) -> u64 {
todo!()
}

fn add_file_status(&self, _file_path: &str, _file_status: FileStatus) -> Result<()> {
todo!()
}
Expand Down
9 changes: 9 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,15 @@ impl DefaultSettings {
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
(
SkyFan2002 marked this conversation as resolved.
Show resolved Hide resolved
"enable_compact_after_multi_table_insert",
DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Enables recluster and compact after multi-table insert.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=1)),
}
),
("auto_compaction_imperfect_blocks_threshold", DefaultSettingValue {
value: UserSettingValue::UInt64(25),
desc: "Threshold for triggering auto compaction. This occurs when the number of imperfect blocks in a snapshot exceeds this value after write operations.",
Expand Down
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,10 @@ impl Settings {
Ok(self.try_get_u64("enable_compact_after_write")? != 0)
}

pub fn get_enable_compact_after_multi_table_insert(&self) -> Result<bool> {
Ok(self.try_get_u64("enable_compact_after_multi_table_insert")? != 0)
}

pub fn get_auto_compaction_imperfect_blocks_threshold(&self) -> Result<u64> {
self.try_get_u64("auto_compaction_imperfect_blocks_threshold")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl SnapshotGenerator for AppendGenerator {
cluster_key_meta: Option<ClusterKey>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
table_name: &str,
) -> Result<TableSnapshot> {
let (snapshot_merged, expected_schema) = self.conflict_resolve_ctx()?;
if is_column_type_modified(&schema, expected_schema) {
Expand Down Expand Up @@ -223,7 +224,7 @@ impl SnapshotGenerator for AppendGenerator {
) + 1;
info!("set compact_num_block_hint to {compact_num_block_hint }");
self.ctx
.set_compaction_num_block_hint(compact_num_block_hint);
.set_compaction_num_block_hint(table_name, compact_num_block_hint);
}

Ok(TableSnapshot::new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl SnapshotGenerator for MutationGenerator {
cluster_key_meta: Option<ClusterKey>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
_table_name: &str,
) -> Result<TableSnapshot> {
let default_cluster_key_id = cluster_key_meta.clone().map(|v| v.0);
match &self.conflict_resolve_ctx {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,15 @@ pub trait SnapshotGenerator {
prev_table_seq: Option<u64>,
txn_mgr: TxnManagerRef,
table_id: u64,
table_name: &str,
) -> Result<TableSnapshot> {
let mut snapshot =
self.do_generate_new_snapshot(schema, cluster_key_meta, &previous, prev_table_seq)?;
let mut snapshot = self.do_generate_new_snapshot(
schema,
cluster_key_meta,
&previous,
prev_table_seq,
table_name,
)?;

let has_pending_transactional_mutations = {
let guard = txn_mgr.lock();
Expand All @@ -73,5 +79,6 @@ pub trait SnapshotGenerator {
cluster_key_meta: Option<ClusterKey>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
table_name: &str,
) -> Result<TableSnapshot>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl SnapshotGenerator for TruncateGenerator {
cluster_key_meta: Option<ClusterKey>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
_table_name: &str,
) -> Result<TableSnapshot> {
let (prev_timestamp, prev_snapshot_id) = if let Some(prev_snapshot) = previous {
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ async fn build_update_table_meta_req(
Some(fuse_table.table_info.ident.seq),
txn_mgr,
table.get_id(),
table.name(),
)?;

// write snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ where F: SnapshotGenerator + Send + 'static
Some(table_info.ident.seq),
self.ctx.txn_mgr(),
table_info.ident.table_id,
table_info.name.as_str(),
) {
Ok(snapshot) => {
self.state = State::TryCommit {
Expand Down
Loading
Loading