Skip to content

Commit

Permalink
fix(query): fix aggregate panic in cluster mode (#16319)
Browse files Browse the repository at this point in the history
* fix(query): enable spill test

* fix(query): add exchange neighbor flag for pipeline builder

* fix(query): add exchange neighbor flag for pipeline builder

* fix(query): add exchange neighbor flag for pipeline builder

* fix(query): add exchange neighbor flag for pipeline builder

* fix(query): add exchange neighbor flag for pipeline builder

* fix(query): add exchange neighbor flag for pipeline builder

* fix(query): add exchange neighbor flag for pipeline builder

* fix(query): add exchange neighbor flag for pipeline builder

* fix(query): add exchange neighbor flag for pipeline builder

* fix(query): add exchange neighbor flag for pipeline builder
  • Loading branch information
zhang2014 authored Aug 31, 2024
1 parent 88fb9e3 commit fb38893
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ where Self: Sized + Clone
fn from_column(col: &Column, desc: &[SortColumnDescription]) -> Result<Self> {
Self::try_from_column(col, desc).ok_or_else(|| {
ErrorCode::BadDataValueType(format!(
"Order column type mismatched. Expecetd {} but got {}",
"Order column type mismatched. Expected {} but got {}",
Self::data_type(),
col.data_type()
))
Expand Down
15 changes: 6 additions & 9 deletions src/query/service/src/pipelines/builders/builder_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,12 @@ impl PipelineBuilder {
.settings
.get_enable_experimental_aggregate_hashtable()?;

let in_cluster = !self.ctx.get_cluster().is_empty();

let params = Self::build_aggregator_params(
aggregate.input.output_schema()?,
&aggregate.group_by,
&aggregate.agg_funcs,
enable_experimental_aggregate_hashtable,
in_cluster,
self.is_exchange_neighbor,
max_block_size as usize,
None,
)?;
Expand All @@ -129,7 +127,7 @@ impl PipelineBuilder {
let method = DataBlock::choose_hash_method(&sample_block, group_cols, efficiently_memory)?;

// Need a global atomic to read the max current radix bits hint
let partial_agg_config = if self.ctx.get_cluster().is_empty() {
let partial_agg_config = if !self.is_exchange_neighbor {
HashTableConfig::default().with_partial(true, max_threads as usize)
} else {
HashTableConfig::default()
Expand Down Expand Up @@ -164,7 +162,7 @@ impl PipelineBuilder {
})?;

// If cluster mode, spill write will be completed in exchange serialize, because we need scatter the block data first
if self.ctx.get_cluster().is_empty() {
if !self.is_exchange_neighbor {
let operator = DataOperator::instance().operator();
let location_prefix =
query_spill_prefix(self.ctx.get_tenant().tenant_name(), &self.ctx.get_id());
Expand Down Expand Up @@ -216,13 +214,12 @@ impl PipelineBuilder {
let enable_experimental_aggregate_hashtable = self
.settings
.get_enable_experimental_aggregate_hashtable()?;
let in_cluster = !self.ctx.get_cluster().is_empty();
let params = Self::build_aggregator_params(
aggregate.before_group_by_schema.clone(),
&aggregate.group_by,
&aggregate.agg_funcs,
enable_experimental_aggregate_hashtable,
in_cluster,
self.is_exchange_neighbor,
max_block_size as usize,
aggregate.limit,
)?;
Expand Down Expand Up @@ -288,7 +285,7 @@ impl PipelineBuilder {
group_by: &[IndexType],
agg_funcs: &[AggregateFunctionDesc],
enable_experimental_aggregate_hashtable: bool,
in_cluster: bool,
cluster_aggregator: bool,
max_block_size: usize,
limit: Option<usize>,
) -> Result<Arc<AggregatorParams>> {
Expand Down Expand Up @@ -330,7 +327,7 @@ impl PipelineBuilder {
&aggs,
&agg_args,
enable_experimental_aggregate_hashtable,
in_cluster,
cluster_aggregator,
max_block_size,
limit,
)?;
Expand Down
7 changes: 6 additions & 1 deletion src/query/service/src/pipelines/builders/builder_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ impl PipelineBuilder {

/// Builds the pipeline feeding an `ExchangeSink`.
///
/// While building the sink's input subtree, the `is_exchange_neighbor`
/// flag is forced to `true` so that downstream builders (e.g. aggregate)
/// know they sit next to an exchange boundary; the previous value is
/// restored afterwards.
pub fn build_exchange_sink(&mut self, exchange_sink: &ExchangeSink) -> Result<()> {
    // ExchangeSink will be appended by `ExchangeManager::execute_pipeline`
    let saved_is_exchange_neighbor = self.is_exchange_neighbor;

    self.is_exchange_neighbor = true;
    let result = self.build_pipeline(&exchange_sink.input);
    // Restore the flag unconditionally — the original `?` propagated the
    // error before the restore, leaving the builder with a stale flag on
    // the error path.
    self.is_exchange_neighbor = saved_is_exchange_neighbor;
    result
}
}
6 changes: 5 additions & 1 deletion src/query/service/src/pipelines/builders/builder_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,11 @@ impl PipelineBuilder {
.insert(build_cache_index, state.clone());
}
self.expand_build_side_pipeline(&join.build, join, state.clone())?;
self.build_join_probe(join, state)
self.build_join_probe(join, state)?;

// In the case of spilling, we need to share state among multiple threads: fetch all data from this round as fast as possible so the next round can start promptly.
self.main_pipeline
.resize(self.main_pipeline.output_len(), true)
}

fn build_join_state(
Expand Down
23 changes: 22 additions & 1 deletion src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub struct PipelineBuilder {
pub hash_join_states: HashMap<usize, Arc<HashJoinState>>,

pub r_cte_scan_interpreters: Vec<CreateTableInterpreter>,
pub(crate) is_exchange_neighbor: bool,
}

impl PipelineBuilder {
Expand All @@ -78,6 +79,7 @@ impl PipelineBuilder {
join_state: None,
hash_join_states: HashMap::new(),
r_cte_scan_interpreters: vec![],
is_exchange_neighbor: false,
}
}

Expand Down Expand Up @@ -133,9 +135,25 @@ impl PipelineBuilder {
}
}

/// Returns `true` when `plan` has at least one child and every child is
/// an `ExchangeSource` — i.e. the plan sits directly on top of an
/// exchange boundary.
fn is_exchange_neighbor(&self, plan: &PhysicalPlan) -> bool {
    let mut has_children = false;
    for child in plan.children() {
        has_children = true;
        // Any non-exchange child disqualifies the plan immediately.
        if !matches!(child, PhysicalPlan::ExchangeSource(_)) {
            return false;
        }
    }
    // A leaf plan (no children) is not an exchange neighbor.
    has_children
}

#[recursive::recursive]
pub(crate) fn build_pipeline(&mut self, plan: &PhysicalPlan) -> Result<()> {
let _guard = self.add_plan_scope(plan)?;
let is_exchange_neighbor = self.is_exchange_neighbor;
self.is_exchange_neighbor |= self.is_exchange_neighbor(plan);

match plan {
PhysicalPlan::TableScan(scan) => self.build_table_scan(scan),
PhysicalPlan::CteScan(scan) => self.build_cte_scan(scan),
Expand Down Expand Up @@ -235,6 +253,9 @@ impl PipelineBuilder {
PhysicalPlan::ColumnMutation(column_mutation) => {
self.build_column_mutation(column_mutation)
}
}
}?;

self.is_exchange_neighbor = is_exchange_neighbor;
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct AggregatorParams {
pub offsets_aggregate_states: Vec<usize>,

pub enable_experimental_aggregate_hashtable: bool,
pub in_cluster: bool,
pub cluster_aggregator: bool,
pub max_block_size: usize,
// Limit is push down to AggregatorTransform
pub limit: Option<usize>,
Expand All @@ -56,7 +56,7 @@ impl AggregatorParams {
agg_funcs: &[AggregateFunctionRef],
agg_args: &[Vec<usize>],
enable_experimental_aggregate_hashtable: bool,
in_cluster: bool,
cluster_aggregator: bool,
max_block_size: usize,
limit: Option<usize>,
) -> Result<Arc<AggregatorParams>> {
Expand All @@ -76,7 +76,7 @@ impl AggregatorParams {
layout: states_layout,
offsets_aggregate_states: states_offsets,
enable_experimental_aggregate_hashtable,
in_cluster,
cluster_aggregator,
max_block_size,
limit,
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static>
self.initialized_all_inputs = true;
// in a cluster where partitions are only 8 and 128,
// we need to pull all data where the partition equals 8 until the partition changes to 128 or there is no data available.
if self.params.in_cluster {
if self.params.cluster_aggregator {
for index in 0..self.inputs.len() {
if self.inputs[index].port.is_finished() {
continue;
Expand Down
2 changes: 1 addition & 1 deletion src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ impl DefaultSettings {
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
("aggregate_spilling_memory_ratio", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
value: UserSettingValue::UInt64(60),
desc: "Sets the maximum memory ratio in bytes that an aggregator can use before spilling data to storage during query execution.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=100)),
Expand Down
14 changes: 13 additions & 1 deletion tests/sqllogictests/suites/tpch/spill.test
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ set window_partition_spilling_memory_ratio = 1;
statement ok
set window_partition_spilling_bytes_threshold_per_proc = 1;

statement ok
set aggregate_spilling_memory_ratio = 1;

statement ok
set aggregate_spilling_bytes_threshold_per_proc = 1;

# TPC-H TEST
include ./queries.test

Expand Down Expand Up @@ -434,4 +440,10 @@ statement ok
set window_partition_spilling_memory_ratio = 60;

statement ok
set window_partition_spilling_bytes_threshold_per_proc = 0;
set window_partition_spilling_bytes_threshold_per_proc = 0;

statement ok
set aggregate_spilling_memory_ratio = 60;

statement ok
set aggregate_spilling_bytes_threshold_per_proc = 0;

0 comments on commit fb38893

Please sign in to comment.