diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs
index 68892847513c..5f01dbd6d4c9 100644
--- a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs
+++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs
@@ -50,7 +50,7 @@ where Self: Sized + Clone
     fn from_column(col: &Column, desc: &[SortColumnDescription]) -> Result<Self> {
         Self::try_from_column(col, desc).ok_or_else(|| {
             ErrorCode::BadDataValueType(format!(
-                "Order column type mismatched. Expecetd {} but got {}",
+                "Order column type mismatched. Expected {} but got {}",
                 Self::data_type(),
                 col.data_type()
             ))
diff --git a/src/query/service/src/pipelines/builders/builder_aggregate.rs b/src/query/service/src/pipelines/builders/builder_aggregate.rs
index 4d89a58f3511..28e35c504a81 100644
--- a/src/query/service/src/pipelines/builders/builder_aggregate.rs
+++ b/src/query/service/src/pipelines/builders/builder_aggregate.rs
@@ -103,14 +103,12 @@ impl PipelineBuilder {
             .settings
             .get_enable_experimental_aggregate_hashtable()?;
 
-        let in_cluster = !self.ctx.get_cluster().is_empty();
-
         let params = Self::build_aggregator_params(
             aggregate.input.output_schema()?,
             &aggregate.group_by,
             &aggregate.agg_funcs,
             enable_experimental_aggregate_hashtable,
-            in_cluster,
+            self.is_exchange_neighbor,
             max_block_size as usize,
             None,
         )?;
@@ -129,7 +127,7 @@ impl PipelineBuilder {
         let method = DataBlock::choose_hash_method(&sample_block, group_cols, efficiently_memory)?;
 
         // Need a global atomic to read the max current radix bits hint
-        let partial_agg_config = if self.ctx.get_cluster().is_empty() {
+        let partial_agg_config = if !self.is_exchange_neighbor {
             HashTableConfig::default().with_partial(true, max_threads as usize)
         } else {
             HashTableConfig::default()
@@ -164,7 +162,7 @@ impl PipelineBuilder {
         })?;
 
         // If cluster mode, spill write will be completed in exchange serialize, because we need scatter the block data first
-        if self.ctx.get_cluster().is_empty() {
+        if !self.is_exchange_neighbor {
             let operator = DataOperator::instance().operator();
             let location_prefix =
                 query_spill_prefix(self.ctx.get_tenant().tenant_name(), &self.ctx.get_id());
@@ -216,13 +214,12 @@ impl PipelineBuilder {
         let enable_experimental_aggregate_hashtable = self
             .settings
             .get_enable_experimental_aggregate_hashtable()?;
-        let in_cluster = !self.ctx.get_cluster().is_empty();
         let params = Self::build_aggregator_params(
             aggregate.before_group_by_schema.clone(),
             &aggregate.group_by,
             &aggregate.agg_funcs,
             enable_experimental_aggregate_hashtable,
-            in_cluster,
+            self.is_exchange_neighbor,
             max_block_size as usize,
             aggregate.limit,
         )?;
@@ -288,7 +285,7 @@ impl PipelineBuilder {
         group_by: &[IndexType],
         agg_funcs: &[AggregateFunctionDesc],
         enable_experimental_aggregate_hashtable: bool,
-        in_cluster: bool,
+        cluster_aggregator: bool,
         max_block_size: usize,
         limit: Option<usize>,
     ) -> Result<Arc<AggregatorParams>> {
@@ -330,7 +327,7 @@ impl PipelineBuilder {
             &aggs,
             &agg_args,
             enable_experimental_aggregate_hashtable,
-            in_cluster,
+            cluster_aggregator,
             max_block_size,
             limit,
         )?;
diff --git a/src/query/service/src/pipelines/builders/builder_exchange.rs b/src/query/service/src/pipelines/builders/builder_exchange.rs
index a554c2dc6f1f..6c27b81ae366 100644
--- a/src/query/service/src/pipelines/builders/builder_exchange.rs
+++ b/src/query/service/src/pipelines/builders/builder_exchange.rs
@@ -40,6 +40,11 @@ impl PipelineBuilder {
 
     pub fn build_exchange_sink(&mut self, exchange_sink: &ExchangeSink) -> Result<()> {
         // ExchangeSink will be appended by `ExchangeManager::execute_pipeline`
-        self.build_pipeline(&exchange_sink.input)
+        let is_exchange_neighbor = self.is_exchange_neighbor;
+
+        self.is_exchange_neighbor = true;
+        self.build_pipeline(&exchange_sink.input)?;
+        self.is_exchange_neighbor = is_exchange_neighbor;
+        Ok(())
     }
 }
diff --git a/src/query/service/src/pipelines/builders/builder_join.rs b/src/query/service/src/pipelines/builders/builder_join.rs
index 0575acdaecc0..0e43eeaedab9 100644
--- a/src/query/service/src/pipelines/builders/builder_join.rs
+++ b/src/query/service/src/pipelines/builders/builder_join.rs
@@ -109,7 +109,11 @@ impl PipelineBuilder {
                 .insert(build_cache_index, state.clone());
         }
         self.expand_build_side_pipeline(&join.build, join, state.clone())?;
-        self.build_join_probe(join, state)
+        self.build_join_probe(join, state)?;
+
+        // In the case of spilling, state is shared among multiple threads; fetch all data from this round quickly so the next round can start sooner.
+        self.main_pipeline
+            .resize(self.main_pipeline.output_len(), true)
     }
 
     fn build_join_state(
diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs
index be92a1eac35f..627838055a3c 100644
--- a/src/query/service/src/pipelines/pipeline_builder.rs
+++ b/src/query/service/src/pipelines/pipeline_builder.rs
@@ -57,6 +57,7 @@ pub struct PipelineBuilder {
     pub hash_join_states: HashMap<usize, Arc<HashJoinState>>,
 
     pub r_cte_scan_interpreters: Vec,
+    pub(crate) is_exchange_neighbor: bool,
 }
 
 impl PipelineBuilder {
@@ -78,6 +79,7 @@ impl PipelineBuilder {
             join_state: None,
             hash_join_states: HashMap::new(),
             r_cte_scan_interpreters: vec![],
+            is_exchange_neighbor: false,
         }
     }
 
@@ -133,9 +135,25 @@ impl PipelineBuilder {
         }
     }
 
+    fn is_exchange_neighbor(&self, plan: &PhysicalPlan) -> bool {
+        let mut is_empty = true;
+        let mut all_exchange_source = true;
+        for children in plan.children() {
+            is_empty = false;
+            if !matches!(children, PhysicalPlan::ExchangeSource(_)) {
+                all_exchange_source = false;
+            }
+        }
+
+        !is_empty && all_exchange_source
+    }
+
     #[recursive::recursive]
     pub(crate) fn build_pipeline(&mut self, plan: &PhysicalPlan) -> Result<()> {
         let _guard = self.add_plan_scope(plan)?;
+        let is_exchange_neighbor = self.is_exchange_neighbor;
+        self.is_exchange_neighbor |= self.is_exchange_neighbor(plan);
+
         match plan {
             PhysicalPlan::TableScan(scan) => self.build_table_scan(scan),
             PhysicalPlan::CteScan(scan) => self.build_cte_scan(scan),
@@ -235,6 +253,9 @@ impl PipelineBuilder {
             PhysicalPlan::ColumnMutation(column_mutation) => {
                 self.build_column_mutation(column_mutation)
             }
-        }
+        }?;
+
+        self.is_exchange_neighbor = is_exchange_neighbor;
+        Ok(())
     }
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs
index e30a1a5addb5..643c758f2472 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs
@@ -42,7 +42,7 @@ pub struct AggregatorParams {
     pub offsets_aggregate_states: Vec<usize>,
 
     pub enable_experimental_aggregate_hashtable: bool,
-    pub in_cluster: bool,
+    pub cluster_aggregator: bool,
     pub max_block_size: usize,
     // Limit is push down to AggregatorTransform
     pub limit: Option<usize>,
@@ -56,7 +56,7 @@ impl AggregatorParams {
         agg_funcs: &[AggregateFunctionRef],
         agg_args: &[Vec<usize>],
         enable_experimental_aggregate_hashtable: bool,
-        in_cluster: bool,
+        cluster_aggregator: bool,
         max_block_size: usize,
         limit: Option<usize>,
     ) -> Result<Arc<AggregatorParams>> {
@@ -76,7 +76,7 @@ impl AggregatorParams {
             layout: states_layout,
             offsets_aggregate_states: states_offsets,
             enable_experimental_aggregate_hashtable,
-            in_cluster,
+            cluster_aggregator,
             max_block_size,
             limit,
         }))
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs
index 216007252714..2f4c8cb29036 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs
@@ -107,7 +107,7 @@ impl
         self.initialized_all_inputs = true;
         // in a cluster where partitions are only 8 and 128,
         // we need to pull all data where the partition equals 8 until the partition changes to 128 or there is no data available.
-        if self.params.in_cluster {
+        if self.params.cluster_aggregator {
             for index in 0..self.inputs.len() {
                 if self.inputs[index].port.is_finished() {
                     continue;
diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index bdede012ae3a..b0c5abf69aac 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -436,7 +436,7 @@ impl DefaultSettings {
                 range: Some(SettingRange::Numeric(0..=u64::MAX)),
             }),
             ("aggregate_spilling_memory_ratio", DefaultSettingValue {
-                value: UserSettingValue::UInt64(0),
+                value: UserSettingValue::UInt64(60),
                 desc: "Sets the maximum memory ratio in bytes that an aggregator can use before spilling data to storage during query execution.",
                 mode: SettingMode::Both,
                 range: Some(SettingRange::Numeric(0..=100)),
diff --git a/tests/sqllogictests/suites/tpch/spill.test b/tests/sqllogictests/suites/tpch/spill.test
index 7cd97ed0ee3e..a93abf9e6732 100644
--- a/tests/sqllogictests/suites/tpch/spill.test
+++ b/tests/sqllogictests/suites/tpch/spill.test
@@ -26,6 +26,12 @@ set window_partition_spilling_memory_ratio = 1;
 statement ok
 set window_partition_spilling_bytes_threshold_per_proc = 1;
 
+statement ok
+set aggregate_spilling_memory_ratio = 1;
+
+statement ok
+set aggregate_spilling_bytes_threshold_per_proc = 1;
+
 # TPC-H TEST
 include ./queries.test
 
@@ -434,4 +440,10 @@ statement ok
 set window_partition_spilling_memory_ratio = 60;
 
 statement ok
-set window_partition_spilling_bytes_threshold_per_proc = 0;
\ No newline at end of file
+set window_partition_spilling_bytes_threshold_per_proc = 0;
+
+statement ok
+set aggregate_spilling_memory_ratio = 60;
+
+statement ok
+set aggregate_spilling_bytes_threshold_per_proc = 0;
\ No newline at end of file
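
For readers following the control flow: the patch threads one boolean through `PipelineBuilder`, set while building the input of an `ExchangeSink` or when every child of the current plan is an `ExchangeSource`, and restored on the way back up so it only applies to the current subtree. Below is a minimal, self-contained sketch of that save/extend/restore pattern; `Plan` and `Builder` are stand-ins invented for illustration of `PhysicalPlan` and `PipelineBuilder`, and only the flag handling mirrors the patch.

```rust
#[derive(Debug)]
enum Plan {
    ExchangeSource,
    TableScan,
    Aggregate(Vec<Plan>),
}

impl Plan {
    fn children(&self) -> &[Plan] {
        match self {
            Plan::Aggregate(children) => children,
            _ => &[],
        }
    }
}

struct Builder {
    is_exchange_neighbor: bool,
}

impl Builder {
    // True only when the plan has children and every child is an exchange
    // source, matching the check added to `PipelineBuilder`.
    fn neighbors_exchange(plan: &Plan) -> bool {
        let children = plan.children();
        !children.is_empty() && children.iter().all(|c| matches!(c, Plan::ExchangeSource))
    }

    fn build(&mut self, plan: &Plan) {
        // Save, extend, recurse, restore: the flag is visible to this subtree
        // only, so sibling subtrees are unaffected.
        let saved = self.is_exchange_neighbor;
        self.is_exchange_neighbor |= Self::neighbors_exchange(plan);
        println!("{plan:?}: is_exchange_neighbor = {}", self.is_exchange_neighbor);
        for child in plan.children() {
            self.build(child);
        }
        self.is_exchange_neighbor = saved;
    }
}

fn main() {
    // An aggregate fed only by exchange sources is flagged as an exchange neighbor.
    let distributed = Plan::Aggregate(vec![Plan::ExchangeSource, Plan::ExchangeSource]);
    Builder { is_exchange_neighbor: false }.build(&distributed);

    // A purely local aggregate is not.
    let local = Plan::Aggregate(vec![Plan::TableScan]);
    Builder { is_exchange_neighbor: false }.build(&local);
}
```

Restoring the saved value, as `build_pipeline` and `build_exchange_sink` now do, keeps the flag from leaking across sibling subtrees, which is why the aggregate builders can consult `self.is_exchange_neighbor` instead of asking whether the cluster is empty.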
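
The settings change flips the default of `aggregate_spilling_memory_ratio` from 0 to 60, turning ratio-based aggregate spilling on by default, and the spill.test additions exercise it at both extremes (1 to force spilling through the TPC-H run, then 60 and 0 to restore the defaults). As a rough mental model only, here is a sketch of what a percentage threshold like this could mean; `should_spill` and its zero-disables rule are assumptions for illustration, not the engine's actual gating code.

```rust
// Hypothetical gate for a percentage-based spill threshold such as
// `aggregate_spilling_memory_ratio` (default now 60, clamped to 0..=100).
fn should_spill(used_bytes: u64, memory_limit_bytes: u64, spilling_memory_ratio: u64) -> bool {
    if spilling_memory_ratio == 0 || memory_limit_bytes == 0 {
        return false; // assumed: a ratio of 0 disables ratio-based spilling
    }
    // Spill once used memory exceeds the configured percentage of the limit.
    used_bytes * 100 >= memory_limit_bytes * spilling_memory_ratio
}

fn main() {
    let gib: u64 = 1024 * 1024 * 1024;
    // With the new default of 60, using 6 GiB of a 10 GiB limit triggers a spill...
    assert!(should_spill(6 * gib, 10 * gib, 60));
    // ...while 5 GiB does not.
    assert!(!should_spill(5 * gib, 10 * gib, 60));
    // The old default of 0 never triggered ratio-based spilling.
    assert!(!should_spill(6 * gib, 10 * gib, 0));
}
```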