Skip to content

Commit

Permalink
chore: recluster disable sort spill (#15490)
Browse files Browse the repository at this point in the history
recluster disable sort spill
  • Loading branch information
zhyass authored May 14, 2024
1 parent 185a47b commit 5c408b8
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 3 deletions.
2 changes: 2 additions & 0 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ pub trait TableContext: Send + Sync {
fn set_cacheable(&self, cacheable: bool);
fn get_can_scan_from_agg_index(&self) -> bool;
fn set_can_scan_from_agg_index(&self, enable: bool);
fn get_enable_sort_spill(&self) -> bool;
fn set_enable_sort_spill(&self, enable: bool);
fn set_compaction_num_block_hint(&self, hint: u64);
fn get_compaction_num_block_hint(&self) -> u64;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl PipelineBuilder {
})
.collect();

self.ctx.set_enable_sort_spill(false);
let sort_pipeline_builder =
SortPipelineBuilder::create(self.ctx.clone(), schema, Arc::new(sort_descs))
.with_partial_block_size(partial_block_size)
Expand Down
5 changes: 5 additions & 0 deletions src/query/service/src/pipelines/builders/builder_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ impl SortPipelineBuilder {
}

fn get_memory_settings(&self, num_threads: usize) -> Result<(usize, usize)> {
let enable_sort_spill = self.ctx.get_enable_sort_spill();
if !enable_sort_spill {
return Ok((0, 0));
}

let settings = self.ctx.get_settings();
let memory_ratio = settings.get_sort_spilling_memory_ratio()?;
let bytes_limit_per_proc = settings.get_sort_spilling_bytes_threshold_per_proc()?;
Expand Down
10 changes: 10 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,16 @@ impl TableContext for QueryContext {
.store(enable, Ordering::Release);
}

fn get_enable_sort_spill(&self) -> bool {
self.shared.enable_sort_spill.load(Ordering::Acquire)
}

fn set_enable_sort_spill(&self, enable: bool) {
self.shared
.enable_sort_spill
.store(enable, Ordering::Release);
}

// get a hint at the number of blocks that need to be compacted.
fn get_compaction_num_block_hint(&self) -> u64 {
self.shared
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub struct QueryContextShared {
pub(in crate::sessions) cacheable: Arc<AtomicBool>,
pub(in crate::sessions) can_scan_from_agg_index: Arc<AtomicBool>,
pub(in crate::sessions) num_fragmented_block_hint: Arc<AtomicU64>,
pub(in crate::sessions) enable_sort_spill: Arc<AtomicBool>,
// Status info.
pub(in crate::sessions) status: Arc<RwLock<String>>,

Expand Down Expand Up @@ -166,6 +167,7 @@ impl QueryContextShared {
cacheable: Arc::new(AtomicBool::new(true)),
can_scan_from_agg_index: Arc::new(AtomicBool::new(true)),
num_fragmented_block_hint: Arc::new(AtomicU64::new(0)),
enable_sort_spill: Arc::new(AtomicBool::new(true)),
status: Arc::new(RwLock::new("null".to_string())),
user_agent: Arc::new(RwLock::new("null".to_string())),
materialized_cte_tables: Arc::new(Default::default()),
Expand Down
7 changes: 7 additions & 0 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,13 @@ impl TableContext for CtxDelegation {
todo!()
}

fn get_enable_sort_spill(&self) -> bool {
todo!()
}
fn set_enable_sort_spill(&self, _enable: bool) {
todo!()
}

fn attach_query_str(&self, _kind: QueryKind, _query: String) {}

fn get_query_str(&self) -> String {
Expand Down
7 changes: 7 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,13 @@ impl TableContext for CtxDelegation {
todo!()
}

fn get_enable_sort_spill(&self) -> bool {
todo!()
}
fn set_enable_sort_spill(&self, _enable: bool) {
todo!()
}

fn attach_query_str(&self, _kind: QueryKind, _query: String) {}

fn get_query_str(&self) -> String {
Expand Down
2 changes: 1 addition & 1 deletion src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ impl DefaultSettings {
let max_memory_usage = Self::max_memory_usage()?;
// The sort merge consumes more than twice as much memory,
// so the block size is set relatively conservatively here.
let recluster_block_size = max_memory_usage * 35 / 100;
let recluster_block_size = max_memory_usage * 32 / 100;
Ok(recluster_block_size)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl ReclusterMutator {

let mem_info = sys_info::mem_info().map_err(ErrorCode::from_std_error)?;
let recluster_block_size = self.ctx.get_settings().get_recluster_block_size()? as usize;
let memory_threshold = recluster_block_size.min(mem_info.avail as usize * 1024 * 40 / 100);
let memory_threshold = recluster_block_size.min(mem_info.avail as usize * 1024 * 35 / 100);

let max_blocks_num = std::cmp::max(
memory_threshold / self.block_thresholds.max_bytes_per_block,
Expand Down
2 changes: 1 addition & 1 deletion tests/suites/1_stateful/02_query/02_0006_set_priority.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)

with NativeClient(name="client1>") as client1:
# TODO: Enable this test after enable new queries executor
# TODO: Enable this test after enable new queries executor
client1.expect(prompt)
# client1.expect("")
#
Expand Down

0 comments on commit 5c408b8

Please sign in to comment.