From ffba39b43f6cc89f4fb8e943b629b161373759be Mon Sep 17 00:00:00 2001
From: coldWater
Date: Wed, 16 Oct 2024 11:54:47 +0800
Subject: [PATCH] rename to spilling_file_format

Signed-off-by: coldWater
---
 .../src/pipelines/builders/builder_sort.rs    |  2 +-
 .../transforms/hash_join/hash_join_spiller.rs |  2 +-
 .../transforms/transform_sort_spill.rs        |  2 +-
 .../transform_window_partition_collect.rs     |  2 +-
 .../partition/window_partition_buffer.rs      |  8 ++--
 src/query/service/src/spillers/serialize.rs   |  2 +-
 src/query/service/src/spillers/spiller.rs     | 32 +++++++--------
 .../service/tests/it/spillers/spiller.rs      |  2 +-
 src/query/settings/src/settings_default.rs    | 10 +++--
 .../settings/src/settings_getter_setter.rs    | 41 ++++++++++++++++++-
 10 files changed, 68 insertions(+), 35 deletions(-)

diff --git a/src/query/service/src/pipelines/builders/builder_sort.rs b/src/query/service/src/pipelines/builders/builder_sort.rs
index 5c79b234379e..1312f0ccd15e 100644
--- a/src/query/service/src/pipelines/builders/builder_sort.rs
+++ b/src/query/service/src/pipelines/builders/builder_sort.rs
@@ -289,7 +289,7 @@ impl SortPipelineBuilder {
                 &self.ctx.get_id(),
             ),
             disk_spill: None,
-            use_parquet: settings.get_spilling_use_parquet()?,
+            use_parquet: settings.get_spilling_file_format()?.is_parquet(),
         };
         pipeline.add_transform(|input, output| {
             let op = DataOperator::instance().operator();
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs
index 09d5269592c9..23f0472489ed 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs
@@ -71,7 +71,7 @@ impl HashJoinSpiller {
             spiller_type,
             location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()),
             disk_spill: None,
-            use_parquet: ctx.get_settings().get_spilling_use_parquet()?,
+            use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(),
         };
         let operator = DataOperator::instance().operator();
         let spiller = Spiller::create(ctx.clone(), operator, spill_config)?;
diff --git a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs
index 3d3c4b6f318b..a1aa3e9a9ecf 100644
--- a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs
+++ b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs
@@ -491,7 +491,7 @@ mod tests {
             spiller_type: SpillerType::OrderBy,
             location_prefix: "_spill_test".to_string(),
             disk_spill: None,
-            use_parquet: ctx.get_settings().get_spilling_use_parquet()?,
+            use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(),
         };
         let spiller = Spiller::create(ctx.clone(), op, spill_config)?;
 
diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs
index 88ad2cfbd767..746735aa9743 100644
--- a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs
+++ b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs
@@ -119,7 +119,7 @@ impl TransformWindowPartitionCollect {
             spiller_type: SpillerType::Window,
             location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()),
             disk_spill,
-            use_parquet: settings.get_spilling_use_parquet()?,
+            use_parquet: settings.get_spilling_file_format()?.is_parquet(),
         };
 
         // Create an inner `Spiller` to spill data.
diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs
index c31010e561a3..6c4f9a2a8984 100644
--- a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs
@@ -146,7 +146,7 @@ impl WindowPartitionBuffer {
             .spill_with_merged_partitions(partitions_to_spill)
             .await?;
         let index = self.spilled_merged_partitions.len();
-        for (id, _, _) in &spilled.partitions {
+        for (id, _) in &spilled.partitions {
             self.spilled_small_partitions[*id].push(index);
         }
         self.spilled_merged_partitions.push((spilled, false, false));
@@ -176,12 +176,10 @@ impl WindowPartitionBuffer {
             partitions,
         } = merged_partitions;
         if out_of_memory_limit || *partial_restored {
-            if let Some(pos) = partitions.iter().position(|p| p.0 == partition_id) {
-                let data_range = &partitions[pos].1;
-                let columns_layout = &partitions[pos].2;
+            if let Some(pos) = partitions.iter().position(|(id, _)| *id == partition_id) {
                 let data_block = self
                     .spiller
-                    .read_range(location, data_range.clone(), columns_layout)
+                    .read_chunk(location, &partitions[pos].1)
                     .await?;
                 self.restored_partition_buffer
                     .add_data_block(partition_id, data_block);
diff --git a/src/query/service/src/spillers/serialize.rs b/src/query/service/src/spillers/serialize.rs
index df88ac0e2dd4..ee88b2c23e7d 100644
--- a/src/query/service/src/spillers/serialize.rs
+++ b/src/query/service/src/spillers/serialize.rs
@@ -67,7 +67,7 @@ impl BlocksEncoder {
             // Currently we splice multiple complete parquet files into one,
             // so that the file contains duplicate headers/footers and metadata,
             // which can lead to file bloat. A better approach would be for the entire file to be ONE parquet,
-            // with each group of blocks (i.e., a unit of the upstream read range) corresponding to one or more row groupsx
+            // with each group of blocks (i.e. a `Chunk`) corresponding to one or more row groups.
             bare_blocks_to_parquet(blocks, &mut self.buf).unwrap();
             Layout::Parquet
         } else {
diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs
index 19b28bd1914e..6246d86a2ffc 100644
--- a/src/query/service/src/spillers/spiller.rs
+++ b/src/query/service/src/spillers/spiller.rs
@@ -236,7 +236,7 @@ impl Spiller {
                     .map(|x| x[0]..x[1])
                     .zip(columns_layout.into_iter()),
             )
-            .map(|(id, (range, layout))| (id, range, layout))
+            .map(|(id, (range, layout))| (id, Chunk { range, layout }))
             .collect();
 
         // Spill data to storage.
@@ -322,7 +322,7 @@ impl Spiller {
         let file_size = path.size();
         debug_assert_eq!(
             file_size,
-            if let Some((_, range, _)) = partitions.last() {
+            if let Some((_, Chunk { range, .. })) = partitions.last() {
                 range.end
             } else {
                 0
@@ -346,8 +346,8 @@
         // Deserialize partitioned data block.
         let partitioned_data = partitions
             .iter()
-            .map(|(partition_id, range, columns_layout)| {
-                let block = deserialize_block(columns_layout, data.slice(range.clone()));
+            .map(|(partition_id, Chunk { range, layout })| {
+                let block = deserialize_block(layout, data.slice(range.clone()));
                 (*partition_id, block)
             })
             .collect();
@@ -355,15 +355,11 @@
         Ok(partitioned_data)
     }
 
-    pub async fn read_range(
-        &self,
-        location: &Location,
-        data_range: Range<usize>,
-        columns_layout: &Layout,
-    ) -> Result<DataBlock> {
+    pub async fn read_chunk(&self, location: &Location, chunk: &Chunk) -> Result<DataBlock> {
         // Read spilled data from storage.
         let instant = Instant::now();
-        let data_range = data_range.start as u64..data_range.end as u64;
+        let Chunk { range, layout } = chunk;
+        let data_range = range.start as u64..range.end as u64;
 
         let data = match location {
             Location::Local(path) => match &self.local_operator {
@@ -383,7 +379,7 @@ impl Spiller {
 
         record_read_profile(location, &instant, data.len());
 
-        Ok(deserialize_block(columns_layout, data))
+        Ok(deserialize_block(layout, data))
     }
 
     async fn write_encodes(&mut self, size: usize, buf: DmaWriteBuf) -> Result<Location> {
@@ -438,14 +434,14 @@
     }
 }
 
-pub enum SpilledData {
-    Partition(Location),
-    MergedPartition(MergedPartition),
-}
-
 pub struct MergedPartition {
     pub location: Location,
-    pub partitions: Vec<(usize, Range<usize>, Layout)>,
+    pub partitions: Vec<(usize, Chunk)>,
+}
+
+pub struct Chunk {
+    pub range: Range<usize>,
+    pub layout: Layout,
 }
 
 #[derive(Debug, Clone, Hash, PartialEq, Eq)]
diff --git a/src/query/service/tests/it/spillers/spiller.rs b/src/query/service/tests/it/spillers/spiller.rs
index 958433bf3735..52319f6a667b 100644
--- a/src/query/service/tests/it/spillers/spiller.rs
+++ b/src/query/service/tests/it/spillers/spiller.rs
@@ -42,7 +42,7 @@ async fn test_spill_with_partition() -> Result<()> {
         spiller_type: SpillerType::HashJoinBuild,
         location_prefix: query_spill_prefix(tenant.tenant_name(), &ctx.get_id()),
         disk_spill: None,
-        use_parquet: ctx.get_settings().get_spilling_use_parquet()?,
+        use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(),
     };
 
     let operator = DataOperator::instance().operator();
diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index f67275c27fa5..260f72e66e38 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -23,6 +23,8 @@ use databend_common_exception::Result;
 use databend_common_meta_app::principal::UserSettingValue;
 use once_cell::sync::OnceCell;
 
+use super::settings_getter_setter::SpillFileFormat;
+
 static DEFAULT_SETTINGS: OnceCell<Arc<DefaultSettings>> = OnceCell::new();
 
 // Default value of cost factor settings
@@ -309,11 +311,11 @@ impl DefaultSettings {
                 mode: SettingMode::Both,
                 range: Some(SettingRange::Numeric(0..=u64::MAX)),
             }),
-            ("spilling_use_parquet", DefaultSettingValue {
-                value: UserSettingValue::UInt64(1),
-                desc: "Set whether to use Parquet or Arrow IPC for spilling.",
+            ("spilling_file_format", DefaultSettingValue {
+                value: UserSettingValue::String("parquet".to_string()),
+                desc: "Set the storage file format for spilling.",
                 mode: SettingMode::Both,
-                range: Some(SettingRange::Numeric(0..=1)),
+                range: Some(SettingRange::String(SpillFileFormat::range())),
             }),
             ("spilling_to_disk_vacuum_unknown_temp_dirs_limit", DefaultSettingValue {
                 value: UserSettingValue::UInt64(u64::MAX),
diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs
index 0bf4e4489efb..8938fcd616dd 100644
--- a/src/query/settings/src/settings_getter_setter.rs
+++ b/src/query/settings/src/settings_getter_setter.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::str::FromStr;
+
 use databend_common_ast::parser::Dialect;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
@@ -31,6 +33,41 @@ pub enum FlightCompression {
     Zstd,
 }
 
+#[derive(Clone, Copy)]
+pub enum SpillFileFormat {
+    Arrow,
+    Parquet,
+}
+
+impl SpillFileFormat {
+    pub fn range() -> Vec<String> {
+        ["arrow", "parquet"]
+            .iter()
+            .copied()
+            .map(String::from)
+            .collect()
+    }
+
+    pub fn is_parquet(&self) -> bool {
+        matches!(self, SpillFileFormat::Parquet)
+    }
+}
+
+impl FromStr for SpillFileFormat {
+    type Err = ErrorCode;
+
+    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+        match s {
+            "arrow" => Ok(SpillFileFormat::Arrow),
+            "parquet" => Ok(Self::Parquet),
+            _ => Err(ErrorCode::InvalidConfig(format!(
+                "invalid SpillFileFormat: {:?}",
+                s
+            ))),
+        }
+    }
+}
+
 impl Settings {
     // Get u64 value, we don't get from the metasrv.
     fn try_get_u64(&self, key: &str) -> Result<u64> {
@@ -290,8 +327,8 @@ impl Settings {
         Ok(self.try_get_u64("join_spilling_buffer_threshold_per_proc_mb")? as usize)
     }
 
-    pub fn get_spilling_use_parquet(&self) -> Result<bool> {
-        Ok(self.try_get_u64("spilling_use_parquet")? != 0)
+    pub fn get_spilling_file_format(&self) -> Result<SpillFileFormat> {
+        self.try_get_string("spilling_file_format")?.parse()
     }
 
     pub fn get_spilling_to_disk_vacuum_unknown_temp_dirs_limit(&self) -> Result<usize> {
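
For context on what the rename changes in practice: the setting moves from a 0/1 integer (`spilling_use_parquet`) to a string that is parsed into the new `SpillFileFormat` enum, so unknown values fail at parse time instead of being coerced. Below is a minimal, self-contained sketch of that parsing behavior, not the patch's code verbatim: the databend `ErrorCode` is swapped for a plain `String` error and a `main` is added so it compiles standalone.

```rust
use std::str::FromStr;

// Mirrors the enum added in settings_getter_setter.rs; `String` replaces
// `ErrorCode` here so the sketch has no databend dependencies.
#[derive(Debug, Clone, Copy)]
enum SpillFileFormat {
    Arrow,
    Parquet,
}

impl SpillFileFormat {
    fn is_parquet(&self) -> bool {
        matches!(self, SpillFileFormat::Parquet)
    }
}

impl FromStr for SpillFileFormat {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "arrow" => Ok(SpillFileFormat::Arrow),
            "parquet" => Ok(SpillFileFormat::Parquet),
            _ => Err(format!("invalid SpillFileFormat: {:?}", s)),
        }
    }
}

fn main() {
    // The old boolean setting `spilling_use_parquet = 1` maps to the new
    // string setting `spilling_file_format = 'parquet'`.
    let format: SpillFileFormat = "parquet".parse().unwrap();
    assert!(format.is_parquet());

    // Values outside `SpillFileFormat::range()` are rejected at parse time.
    assert!("orc".parse::<SpillFileFormat>().is_err());
}
```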
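The other half of the patch folds the per-partition `(id, range, layout)` triple into an `(id, Chunk)` pair, so `read_chunk` receives one value where `read_range` took a range plus a layout. A standalone sketch of that shape under stated assumptions: `Layout` is stubbed, and an in-memory byte-slice lookup stands in for the spiller's actual storage read.

```rust
use std::ops::Range;

// Stub for the real column-layout metadata; only its presence matters here.
#[allow(dead_code)]
enum Layout {
    Parquet,
    ArrowIpc,
}

// As in the patch: the byte range and the layout travel together.
struct Chunk {
    range: Range<usize>,
    layout: Layout,
}

struct MergedPartition {
    partitions: Vec<(usize, Chunk)>,
}

impl MergedPartition {
    // Analogous to the lookup in window_partition_buffer.rs: find a partition
    // by id, then slice the merged spill file by that chunk's byte range.
    fn slice<'a>(&self, data: &'a [u8], partition_id: usize) -> Option<&'a [u8]> {
        self.partitions
            .iter()
            .position(|(id, _)| *id == partition_id)
            .map(|pos| &data[self.partitions[pos].1.range.clone()])
    }
}

fn main() {
    let merged = MergedPartition {
        partitions: vec![
            (0, Chunk { range: 0..4, layout: Layout::Parquet }),
            (3, Chunk { range: 4..10, layout: Layout::Parquet }),
        ],
    };
    let file = b"aaaabbbbbb";
    assert_eq!(merged.slice(file, 3), Some(&file[4..10]));
    assert_eq!(merged.slice(file, 7), None);
}
```

Bundling the two fields also explains the smaller diffs above: pattern matches like `(id, _, _)` shrink to `(id, _)`, and `debug_assert_eq!` can destructure `Chunk { range, .. }` directly.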