diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 1a9f41ad786e..8f70cb988e85 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -157,6 +157,8 @@ pub trait TableContext: Send + Sync { fn set_cacheable(&self, cacheable: bool); fn get_can_scan_from_agg_index(&self) -> bool; fn set_can_scan_from_agg_index(&self, enable: bool); + fn get_enable_sort_spill(&self) -> bool; + fn set_enable_sort_spill(&self, enable: bool); fn set_compaction_num_block_hint(&self, hint: u64); fn get_compaction_num_block_hint(&self) -> u64; diff --git a/src/query/service/src/pipelines/builders/builder_recluster.rs b/src/query/service/src/pipelines/builders/builder_recluster.rs index e19b557b3bbb..44a156fd1856 100644 --- a/src/query/service/src/pipelines/builders/builder_recluster.rs +++ b/src/query/service/src/pipelines/builders/builder_recluster.rs @@ -170,6 +170,7 @@ impl PipelineBuilder { }) .collect(); + self.ctx.set_enable_sort_spill(false); let sort_pipeline_builder = SortPipelineBuilder::create(self.ctx.clone(), schema, Arc::new(sort_descs)) .with_partial_block_size(partial_block_size) diff --git a/src/query/service/src/pipelines/builders/builder_sort.rs b/src/query/service/src/pipelines/builders/builder_sort.rs index 340df58bb4a4..c3f7c5e265cb 100644 --- a/src/query/service/src/pipelines/builders/builder_sort.rs +++ b/src/query/service/src/pipelines/builders/builder_sort.rs @@ -210,6 +210,11 @@ impl SortPipelineBuilder { } fn get_memory_settings(&self, num_threads: usize) -> Result<(usize, usize)> { + let enable_sort_spill = self.ctx.get_enable_sort_spill(); + if !enable_sort_spill { + return Ok((0, 0)); + } + let settings = self.ctx.get_settings(); let memory_ratio = settings.get_sort_spilling_memory_ratio()?; let bytes_limit_per_proc = settings.get_sort_spilling_bytes_threshold_per_proc()?; diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 53e0f980e15a..3d4585383c35 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -490,6 +490,16 @@ impl TableContext for QueryContext { .store(enable, Ordering::Release); } + fn get_enable_sort_spill(&self) -> bool { + self.shared.enable_sort_spill.load(Ordering::Acquire) + } + + fn set_enable_sort_spill(&self, enable: bool) { + self.shared + .enable_sort_spill + .store(enable, Ordering::Release); + } + // get a hint at the number of blocks that need to be compacted. fn get_compaction_num_block_hint(&self) -> u64 { self.shared diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 24d83012015e..2baf012016c7 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -111,6 +111,7 @@ pub struct QueryContextShared { pub(in crate::sessions) cacheable: Arc, pub(in crate::sessions) can_scan_from_agg_index: Arc, pub(in crate::sessions) num_fragmented_block_hint: Arc, + pub(in crate::sessions) enable_sort_spill: Arc, // Status info. pub(in crate::sessions) status: Arc>, @@ -166,6 +167,7 @@ impl QueryContextShared { cacheable: Arc::new(AtomicBool::new(true)), can_scan_from_agg_index: Arc::new(AtomicBool::new(true)), num_fragmented_block_hint: Arc::new(AtomicU64::new(0)), + enable_sort_spill: Arc::new(AtomicBool::new(true)), status: Arc::new(RwLock::new("null".to_string())), user_agent: Arc::new(RwLock::new("null".to_string())), materialized_cte_tables: Arc::new(Default::default()), diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index 0038d8f2274f..d8f8b62b89c9 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -548,6 +548,13 @@ impl TableContext for CtxDelegation { todo!() } + fn get_enable_sort_spill(&self) -> bool { + todo!() + } + fn set_enable_sort_spill(&self, _enable: bool) { + todo!() + } + fn attach_query_str(&self, _kind: QueryKind, _query: String) {} fn get_query_str(&self) -> String { diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index dc83779d479e..ff5ee85e4a08 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -489,6 +489,13 @@ impl TableContext for CtxDelegation { todo!() } + fn get_enable_sort_spill(&self) -> bool { + todo!() + } + fn set_enable_sort_spill(&self, _enable: bool) { + todo!() + } + fn attach_query_str(&self, _kind: QueryKind, _query: String) {} fn get_query_str(&self) -> String { diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index da0ccaac894d..704d9df0289c 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -808,7 +808,7 @@ impl DefaultSettings { let max_memory_usage = Self::max_memory_usage()?; // The sort merge consumes more than twice as much memory, // so the block size is set relatively conservatively here. - let recluster_block_size = max_memory_usage * 35 / 100; + let recluster_block_size = max_memory_usage * 32 / 100; Ok(recluster_block_size) } diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs index 721e4136506f..ff7a53675676 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs @@ -108,7 +108,7 @@ impl ReclusterMutator { let mem_info = sys_info::mem_info().map_err(ErrorCode::from_std_error)?; let recluster_block_size = self.ctx.get_settings().get_recluster_block_size()? as usize; - let memory_threshold = recluster_block_size.min(mem_info.avail as usize * 1024 * 40 / 100); + let memory_threshold = recluster_block_size.min(mem_info.avail as usize * 1024 * 35 / 100); let max_blocks_num = std::cmp::max( memory_threshold / self.block_thresholds.max_bytes_per_block, diff --git a/tests/suites/1_stateful/02_query/02_0006_set_priority.py b/tests/suites/1_stateful/02_query/02_0006_set_priority.py index 8c65d1157585..1fb341bbc64d 100755 --- a/tests/suites/1_stateful/02_query/02_0006_set_priority.py +++ b/tests/suites/1_stateful/02_query/02_0006_set_priority.py @@ -19,7 +19,7 @@ ) with NativeClient(name="client1>") as client1: -# TODO: Enable this test after enable new queries executor + # TODO: Enable this test after enable new queries executor client1.expect(prompt) # client1.expect("") #