diff --git a/Cargo.lock b/Cargo.lock index bfe4ce1cca8c..0eb783a04eee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1571,7 +1571,7 @@ dependencies = [ "bitflags 2.6.0", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.12.1", "lazy_static", "lazycell", "proc-macro2", @@ -1933,12 +1933,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdd7a427adc0135366d99db65b36dae9237130997e560ed61118041fb72be6e8" -[[package]] -name = "cache-padded" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "981520c98f422fcc584dc1a95c334e6953900b9106bc47a9839b81790009eb21" - [[package]] name = "camino" version = "1.1.7" @@ -3112,7 +3106,7 @@ dependencies = [ "databend-storages-common-table-meta", "limits-rs", "log", - "opendal 0.49.0", + "opendal 0.49.2", "serde", "serde_json", "serfig", @@ -3183,7 +3177,7 @@ dependencies = [ "lz4", "num", "num-traits", - "opendal 0.49.0", + "opendal 0.49.2", "ordered-float 4.2.2", "proptest", "quanta 0.11.1", @@ -3209,7 +3203,7 @@ dependencies = [ "enum-as-inner 0.5.1", "ethnum", "fast-float", - "fastrace 0.7.2", + "fastrace", "goldenfile", "indent", "itertools 0.10.5", @@ -3258,7 +3252,7 @@ dependencies = [ "databend-common-building", "databend-common-exception", "enquote", - "fastrace 0.7.2", + "fastrace", "futures", "libc", "log", @@ -3430,7 +3424,7 @@ dependencies = [ "geos", "geozero 0.13.0", "http 1.1.0", - "opendal 0.49.0", + "opendal 0.49.2", "parquet", "paste", "prost 0.12.6", @@ -3701,7 +3695,7 @@ dependencies = [ "databend-common-proto-conv", "databend-common-storage", "enumflags2", - "fastrace 0.7.2", + "fastrace", "futures", "log", "mockall", @@ -3727,7 +3721,7 @@ dependencies = [ "databend-common-meta-stoerr", "databend-common-meta-types", "databend-common-proto-conv", - "fastrace 0.7.2", + "fastrace", "futures", "itertools 0.10.5", "log", @@ -3766,7 +3760,7 @@ dependencies = [ "maplit", "num-derive", "num-traits", - "opendal 0.49.0", + "opendal 0.49.2", "paste", "prost 0.12.6", "serde", @@ -3795,7 +3789,7 @@ dependencies = [ "databend-common-metrics", "databend-common-tracing", "derive_more", - "fastrace 0.7.2", + "fastrace", "futures", "itertools 0.10.5", "log", @@ -3824,7 +3818,7 @@ dependencies = [ "databend-common-meta-stoerr", "databend-common-meta-types", "databend-common-tracing", - "fastrace 0.7.2", + "fastrace", "log", "tempfile", "test-harness", @@ -3837,7 +3831,7 @@ dependencies = [ "anyhow", "async-trait", "databend-common-meta-types", - "fastrace 0.7.2", + "fastrace", "futures-util", "log", "serde", @@ -3884,7 +3878,7 @@ dependencies = [ "databend-common-meta-types", "databend-common-tracing", "derive_more", - "fastrace 0.7.2", + "fastrace", "futures", "futures-async-stream", "futures-util", @@ -3918,7 +3912,7 @@ dependencies = [ "databend-common-meta-stoerr", "databend-common-meta-types", "databend-common-tracing", - "fastrace 0.7.2", + "fastrace", "log", "openraft", "pretty_assertions", @@ -4019,7 +4013,7 @@ dependencies = [ "flate2", "futures", "lz4", - "opendal 0.49.0", + "opendal 0.49.2", "parquet-format-safe", "rand 0.8.5", "seq-macro", @@ -4040,7 +4034,7 @@ dependencies = [ "databend-common-base", "databend-common-exception", "databend-common-expression", - "fastrace 0.7.2", + "fastrace", "futures", "log", "petgraph", @@ -4113,7 +4107,7 @@ dependencies = [ "databend-common-meta-types", "databend-common-protos", "enumflags2", - "fastrace 0.7.2", + "fastrace", "maplit", "num", "pretty_assertions", @@ -4142,7 +4136,7 @@ dependencies = [ 
"databend-common-ast", "databend-common-exception", "derive-visitor", - "fastrace 0.7.2", + "fastrace", "goldenfile", "tokio", "unindent", @@ -4213,7 +4207,7 @@ dependencies = [ "derive-visitor", "educe 0.4.23", "enum-as-inner 0.5.1", - "fastrace 0.7.2", + "fastrace", "globiter", "indexmap 2.4.0", "itertools 0.10.5", @@ -4221,7 +4215,7 @@ dependencies = [ "log", "num-derive", "num-traits", - "opendal 0.49.0", + "opendal 0.49.2", "parking_lot 0.12.3", "percent-encoding", "prqlc", @@ -4256,7 +4250,7 @@ dependencies = [ "flagset", "futures", "log", - "opendal 0.49.0", + "opendal 0.49.2", "parquet", "prometheus-client", "regex", @@ -4285,7 +4279,7 @@ dependencies = [ "databend-storages-common-pruner", "databend-storages-common-table-meta", "deltalake", - "fastrace 0.7.2", + "fastrace", "match-template", "object_store_opendal", "parquet", @@ -4361,14 +4355,14 @@ dependencies = [ "databend-storages-common-session", "databend-storages-common-table-meta", "enum-as-inner 0.5.1", - "fastrace 0.7.2", + "fastrace", "futures", "futures-util", "indexmap 2.4.0", "itertools 0.10.5", "jsonb", "log", - "opendal 0.49.0", + "opendal 0.49.2", "parquet", "rand 0.8.5", "serde", @@ -4409,12 +4403,12 @@ dependencies = [ "databend-common-storages-parquet", "databend-storages-common-pruner", "databend-storages-common-table-meta", - "fastrace 0.7.2", + "fastrace", "faststr", "futures", "hive_metastore", "log", - "opendal 0.49.0", + "opendal 0.49.2", "parquet", "recursive", "serde", @@ -4441,7 +4435,7 @@ dependencies = [ "databend-common-pipeline-core", "databend-common-storages-parquet", "databend-storages-common-table-meta", - "fastrace 0.7.2", + "fastrace", "futures", "iceberg", "iceberg-catalog-hms", @@ -4528,7 +4522,7 @@ dependencies = [ "databend-storages-common-table-meta", "futures-util", "log", - "opendal 0.49.0", + "opendal 0.49.2", "orc-rust", "serde", "serde_json", @@ -4565,7 +4559,7 @@ dependencies = [ "ethnum", "futures", "log", - "opendal 0.49.0", + "opendal 0.49.2", "parquet", "rand 0.8.5", "serde", @@ -4612,7 +4606,7 @@ dependencies = [ "databend-common-storages-parquet", "databend-storages-common-blocks", "databend-storages-common-table-meta", - "opendal 0.49.0", + "opendal 0.49.2", "parquet", "serde", "serde_json", @@ -4653,7 +4647,7 @@ dependencies = [ "enum-as-inner 0.6.0", "futures", "log", - "opendal 0.49.0", + "opendal 0.49.2", "parquet", "serde", "serde_json", @@ -4676,7 +4670,7 @@ dependencies = [ "databend-common-sql", "databend-common-storages-fuse", "databend-storages-common-table-meta", - "fastrace 0.7.2", + "fastrace", "futures", "log", ] @@ -4717,7 +4711,7 @@ dependencies = [ "jsonb", "log", "once_cell", - "opendal 0.49.0", + "opendal 0.49.2", "parking_lot 0.12.3", "regex", "serde", @@ -4749,7 +4743,7 @@ dependencies = [ "databend-common-base", "databend-common-exception", "defer", - "fastrace 0.7.2", + "fastrace", "fastrace-opentelemetry", "itertools 0.10.5", "libc", @@ -4971,7 +4965,7 @@ dependencies = [ "jsonb", "jwt-simple 0.11.9", "log", - "opendal 0.49.0", + "opendal 0.49.2", "tantivy", "tempfile", ] @@ -5061,7 +5055,7 @@ dependencies = [ "deepsize", "derive_more", "env_logger", - "fastrace 0.7.2", + "fastrace", "feature-set", "futures", "futures-async-stream", @@ -5110,7 +5104,7 @@ dependencies = [ "databend-common-meta-types", "databend-common-tracing", "databend-meta", - "fastrace 0.7.2", + "fastrace", "futures", "log", "rand 0.8.5", @@ -5220,7 +5214,7 @@ dependencies = [ "databend-storages-common-table-meta", "derive-visitor", "ethnum", - "fastrace 0.7.2", + "fastrace", 
"flatbuffers", "futures", "futures-util", @@ -5245,7 +5239,7 @@ dependencies = [ "num", "num_cpus", "once_cell", - "opendal 0.49.0", + "opendal 0.49.2", "opensrv-mysql", "opentelemetry", "opentelemetry_sdk", @@ -5418,7 +5412,7 @@ dependencies = [ "databend-common-expression", "databend-common-functions", "databend-storages-common-table-meta", - "fastrace 0.7.2", + "fastrace", "jsonb", "levenshtein_automata", "log", @@ -5445,10 +5439,10 @@ dependencies = [ "databend-common-exception", "databend-common-expression", "databend-common-metrics", - "fastrace 0.7.2", + "fastrace", "futures", "log", - "opendal 0.49.0", + "opendal 0.49.2", ] [[package]] @@ -6327,46 +6321,19 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2a2b11eda1d40935b26cf18f6833c526845ae8c41e58d09af6adeb6f0269183" -[[package]] -name = "fastrace" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfd798348670042f0628810b10bc11b767c1f42aafbd3b1ae31f282740c549fe" -dependencies = [ - "fastrace-macro 0.6.8", - "minstant", - "once_cell", - "parking_lot 0.12.3", - "pin-project", - "rand 0.8.5", - "rtrb 0.2.3", -] - [[package]] name = "fastrace" version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25767929385a5128ff5a58c77d5fef43779bb580656801f72d4d32f7a97fea28" dependencies = [ - "fastrace-macro 0.7.2", + "fastrace-macro", "minstant", "once_cell", "parking_lot 0.12.3", "pin-project", "rand 0.8.5", - "rtrb 0.3.1", -] - -[[package]] -name = "fastrace-macro" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eea27de901dc6b14aa952aeefcc41144da4dc97e4db3eb3593c19f3b80406786" -dependencies = [ - "proc-macro-error 1.0.4", - "proc-macro2", - "quote", - "syn 1.0.109", + "rtrb", ] [[package]] @@ -6387,7 +6354,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f33f99bae33fb7b952a933500e77cb9976b7672bb5d9def8e464663dfdfc107" dependencies = [ - "fastrace 0.7.2", + "fastrace", "futures", "log", "opentelemetry", @@ -9741,7 +9708,7 @@ dependencies = [ "anyhow", "colored", "crossbeam-channel", - "fastrace 0.7.2", + "fastrace", "jiff", "log", "opentelemetry", @@ -10735,9 +10702,9 @@ dependencies = [ [[package]] name = "object_store_opendal" -version = "0.46.0" +version = "0.46.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7e5902fc99e9fb9e32c93f6a67dc5cc0772dc0fb348e2ef4ce258b03666d034" +checksum = "4493c44c90a25391782b52886365d00f6be7d0f75aaecab4365279203438ebec" dependencies = [ "async-trait", "bytes", @@ -10745,7 +10712,7 @@ dependencies = [ "futures", "futures-util", "object_store", - "opendal 0.49.0", + "opendal 0.49.2", "pin-project", "tokio", ] @@ -10781,9 +10748,9 @@ dependencies = [ [[package]] name = "opendal" -version = "0.49.0" +version = "0.49.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39d516adf7db912c38af382c3e92c27cd62fbbc240e630920555d784c2ab1494" +checksum = "9b04d09b9822c2f75a1d2fc513a2c1279c70e91e7407936fffdf6a6976ec530a" dependencies = [ "anyhow", "async-backtrace", @@ -10793,7 +10760,7 @@ dependencies = [ "bytes", "chrono", "crc32c", - "fastrace 0.6.8", + "fastrace", "flagset", "futures", "getrandom 0.2.15", @@ -12068,7 +12035,7 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", "heck 0.5.0", - "itertools 0.10.5", + "itertools 0.12.1", "log", "multimap", "once_cell", @@ 
-12088,7 +12055,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.58", @@ -13148,15 +13115,6 @@ dependencies = [ "smallvec", ] -[[package]] -name = "rtrb" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99e704dd104faf2326a320140f70f0b736d607c1caa1b1748a6c568a79819109" -dependencies = [ - "cache-padded", -] - [[package]] name = "rtrb" version = "0.3.1" diff --git a/src/query/ast/src/parser/parser.rs b/src/query/ast/src/parser/parser.rs index 4959ad25405d..6335c012a632 100644 --- a/src/query/ast/src/parser/parser.rs +++ b/src/query/ast/src/parser/parser.rs @@ -145,6 +145,7 @@ pub fn run_parser( } /// Check that the statement can be displayed and reparsed without loss +#[allow(dead_code)] fn assert_reparse(sql: &str, stmt: StatementWithFormat) { let stmt = reset_ast(stmt); let new_sql = stmt.to_string(); @@ -162,6 +163,7 @@ fn assert_reparse(sql: &str, stmt: StatementWithFormat) { assert_eq!(stmt, new_stmt, "\nleft:\n{}\nright:\n{}", sql, new_sql); } +#[allow(dead_code)] fn reset_ast(mut stmt: StatementWithFormat) -> StatementWithFormat { #[derive(VisitorMut)] #[visitor(Range(enter), Literal(enter), ExplainKind(enter), SelectTarget(enter))] diff --git a/src/query/service/src/pipelines/builders/builder_sort.rs b/src/query/service/src/pipelines/builders/builder_sort.rs index cb039e615312..1312f0ccd15e 100644 --- a/src/query/service/src/pipelines/builders/builder_sort.rs +++ b/src/query/service/src/pipelines/builders/builder_sort.rs @@ -283,12 +283,13 @@ impl SortPipelineBuilder { if may_spill { let schema = add_order_field(sort_merge_output_schema.clone(), &self.sort_desc); let config = SpillerConfig { + spiller_type: SpillerType::OrderBy, location_prefix: query_spill_prefix( self.ctx.get_tenant().tenant_name(), &self.ctx.get_id(), ), disk_spill: None, - spiller_type: SpillerType::OrderBy, + use_parquet: settings.get_spilling_file_format()?.is_parquet(), }; pipeline.add_transform(|input, output| { let op = DataOperator::instance().operator(); diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs index 876542882644..23f0472489ed 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs @@ -68,9 +68,10 @@ impl HashJoinSpiller { SpillerType::HashJoinProbe }; let spill_config = SpillerConfig { + spiller_type, location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()), disk_spill: None, - spiller_type, + use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(), }; let operator = DataOperator::instance().operator(); let spiller = Spiller::create(ctx.clone(), operator, spill_config)?; @@ -145,9 +146,8 @@ impl HashJoinSpiller { .partition_buffer .fetch_data_blocks(partition_id, &fetch_option)? 
{ - let data_block = DataBlock::concat(&data_blocks)?; self.spiller - .spill_with_partition(partition_id, data_block) + .spill_with_partition(partition_id, data_blocks) .await?; } } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs index f0c6bd97d556..a1aa3e9a9ecf 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs @@ -270,7 +270,7 @@ where R: Rows + Sync + Send + 'static async fn spill(&mut self, block: DataBlock) -> Result<()> { debug_assert!(self.num_merge >= 2 && self.batch_rows > 0); - let location = self.spiller.spill(block).await?; + let location = self.spiller.spill(vec![block]).await?; self.unmerged_blocks.push_back(vec![location].into()); Ok(()) @@ -347,7 +347,7 @@ where R: Rows + Sync + Send + 'static let mut spilled = VecDeque::new(); while let Some(block) = merger.async_next_block().await? { - let location = self.spiller.spill(block).await?; + let location = self.spiller.spill(vec![block]).await?; spilled.push_back(location); } @@ -456,6 +456,7 @@ mod tests { use std::sync::Arc; use databend_common_base::base::tokio; + use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_expression::block_debug::pretty_format_blocks; use databend_common_expression::types::DataType; @@ -487,9 +488,10 @@ mod tests { ) -> Result>> { let op = DataOperator::instance().operator(); let spill_config = SpillerConfig { + spiller_type: SpillerType::OrderBy, location_prefix: "_spill_test".to_string(), disk_spill: None, - spiller_type: SpillerType::OrderBy, + use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(), }; let spiller = Spiller::create(ctx.clone(), op, spill_config)?; diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs index e73dd63f6056..746735aa9743 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs @@ -116,9 +116,10 @@ impl TransformWindowPartitionCollect { } let spill_config = SpillerConfig { + spiller_type: SpillerType::Window, location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()), disk_spill, - spiller_type: SpillerType::Window, + use_parquet: settings.get_spilling_file_format()?.is_parquet(), }; // Create an inner `Spiller` to spill data. 
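The hunks above all make the same pair of changes: SpillerConfig now carries an explicit use_parquet flag derived from the new spilling_file_format setting, and the spill entry points accept a batch of blocks (Vec<DataBlock>) instead of a single pre-concatenated DataBlock. The sketch below illustrates the resulting call shape; it is not part of the patch, and spill_blocks, ctx and blocks are assumed stand-ins for whatever the calling operator has at hand.

// Illustrative sketch only; assumes the spillers module types (Spiller, SpillerConfig,
// SpillerType, Location) plus query_spill_prefix and DataOperator are in scope, as in the hunks above.
async fn spill_blocks(ctx: Arc<QueryContext>, blocks: Vec<DataBlock>) -> Result<Location> {
    let spill_config = SpillerConfig {
        spiller_type: SpillerType::OrderBy, // example variant; each operator chooses its own
        location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()),
        disk_spill: None,
        use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(),
    };
    let op = DataOperator::instance().operator();
    let mut spiller = Spiller::create(ctx.clone(), op, spill_config)?;
    // `spill` now takes the whole batch; concatenation (or parquet encoding) happens
    // inside the spiller's block encoder rather than at each call site.
    spiller.spill(blocks).await
}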
diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs index 28c6b9b2068d..6c4f9a2a8984 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs @@ -17,9 +17,9 @@ use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_settings::Settings; +use crate::spillers::MergedPartition; use crate::spillers::PartitionBuffer; use crate::spillers::PartitionBufferFetchOption; -use crate::spillers::SpilledData; use crate::spillers::Spiller; /// The `WindowPartitionBuffer` is used to control memory usage of Window operator. @@ -33,7 +33,7 @@ pub struct WindowPartitionBuffer { can_spill: bool, next_to_restore_partition_id: isize, spilled_small_partitions: Vec>, - spilled_merged_partitions: Vec<(SpilledData, bool, bool)>, + spilled_merged_partitions: Vec<(MergedPartition, bool, bool)>, } impl WindowPartitionBuffer { @@ -110,7 +110,7 @@ impl WindowPartitionBuffer { { return self .spiller - .spill_with_partition(partition_id, DataBlock::concat(&data_blocks)?) + .spill_with_partition(partition_id, data_blocks) .await; } } @@ -127,8 +127,7 @@ impl WindowPartitionBuffer { .partition_buffer .fetch_data_blocks(partition_id, &option)? { - let data_block = DataBlock::concat(&data_blocks)?; - partitions_to_spill.push((partition_id, data_block)); + partitions_to_spill.push((partition_id, data_blocks)); accumulated_bytes += partition_memory_size; } if accumulated_bytes >= spill_unit_size { @@ -137,34 +136,20 @@ impl WindowPartitionBuffer { } } - if accumulated_bytes > 0 { - let spilled_data = self - .spiller - .spill_with_merged_partitions(partitions_to_spill) - .await?; - if let SpilledData::MergedPartition { - location, - partitions, - } = spilled_data - { - let index = self.spilled_merged_partitions.len(); - for partition in partitions.iter() { - self.spilled_small_partitions[partition.0].push(index); - } - self.spilled_merged_partitions.push(( - SpilledData::MergedPartition { - location, - partitions, - }, - false, - false, - )); - return Ok(()); - } + if accumulated_bytes == 0 { + self.can_spill = false; + return Ok(()); } - self.can_spill = false; - + let spilled = self + .spiller + .spill_with_merged_partitions(partitions_to_spill) + .await?; + let index = self.spilled_merged_partitions.len(); + for (id, _) in &spilled.partitions { + self.spilled_small_partitions[*id].push(index); + } + self.spilled_merged_partitions.push((spilled, false, false)); Ok(()) } @@ -186,35 +171,31 @@ impl WindowPartitionBuffer { if *restored { continue; } - if let SpilledData::MergedPartition { + let MergedPartition { location, partitions, - } = merged_partitions - { - if out_of_memory_limit || *partial_restored { - if let Some(pos) = partitions.iter().position(|p| p.0 == partition_id) { - let data_range = &partitions[pos].1; - let columns_layout = &partitions[pos].2; - let data_block = self - .spiller - .read_range(location, data_range.clone(), columns_layout) - .await?; - self.restored_partition_buffer - .add_data_block(partition_id, data_block); - partitions.remove(pos); - *partial_restored = true; - } - } else { - let partitioned_data = self + } = merged_partitions; + if out_of_memory_limit || *partial_restored { + if let Some(pos) = partitions.iter().position(|(id, _)| *id == 
partition_id) { + let data_block = self .spiller - .read_merged_partitions(merged_partitions) + .read_chunk(location, &partitions[pos].1) .await?; - for (partition_id, data_block) in partitioned_data.into_iter() { - self.restored_partition_buffer - .add_data_block(partition_id, data_block); - } - *restored = true; + self.restored_partition_buffer + .add_data_block(partition_id, data_block); + partitions.remove(pos); + *partial_restored = true; + } + } else { + let partitioned_data = self + .spiller + .read_merged_partitions(merged_partitions) + .await?; + for (partition_id, data_block) in partitioned_data.into_iter() { + self.restored_partition_buffer + .add_data_block(partition_id, data_block); } + *restored = true; } } diff --git a/src/query/service/src/spillers/mod.rs b/src/query/service/src/spillers/mod.rs index f867e56749f9..b51bb3a765fa 100644 --- a/src/query/service/src/spillers/mod.rs +++ b/src/query/service/src/spillers/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod partition_buffer; +mod serialize; mod spiller; pub use partition_buffer::PartitionBuffer; diff --git a/src/query/service/src/spillers/serialize.rs b/src/query/service/src/spillers/serialize.rs new file mode 100644 index 000000000000..ee88b2c23e7d --- /dev/null +++ b/src/query/service/src/spillers/serialize.rs @@ -0,0 +1,239 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::io::Write; +use std::sync::Arc; + +use buf_list::BufList; +use buf_list::Cursor; +use bytes::Buf; +use databend_common_base::base::Alignment; +use databend_common_base::base::DmaWriteBuf; +use databend_common_exception::Result; +use databend_common_expression::arrow::read_column; +use databend_common_expression::arrow::write_column; +use databend_common_expression::converts::arrow::table_schema_to_arrow_schema; +use databend_common_expression::infer_table_schema; +use databend_common_expression::DataBlock; +use databend_common_expression::DataField; +use databend_common_expression::DataSchema; +use opendal::Buffer; +use parquet::arrow::arrow_reader::ParquetRecordBatchReader; +use parquet::arrow::ArrowWriter; +use parquet::basic::Compression; +use parquet::errors; +use parquet::file::properties::EnabledStatistics; +use parquet::file::properties::WriterProperties; +use parquet::file::reader::ChunkReader; +use parquet::file::reader::Length; +use parquet::format::FileMetaData; + +#[derive(Clone)] +pub enum Layout { + ArrowIpc(Box<[usize]>), + Parquet, +} + +pub(super) struct BlocksEncoder { + pub(super) use_parquet: bool, + pub(super) buf: DmaWriteBuf, + pub(super) offsets: Vec<usize>, + pub(super) columns_layout: Vec<Layout>, +} + +impl BlocksEncoder { + pub(super) fn new(use_parquet: bool, align: Alignment, chunk: usize) -> Self { + Self { + use_parquet, + buf: DmaWriteBuf::new(align, chunk), + offsets: vec![0], + columns_layout: Vec::new(), + } + } + + pub(super) fn add_blocks(&mut self, blocks: Vec<DataBlock>) { + let layout = if self.use_parquet { + // Currently we splice multiple complete parquet files into one, + // so that the file contains duplicate headers/footers and metadata, + // which can lead to file bloat. A better approach would be for the entire file to be ONE parquet, + // with each group of blocks (i.e. Chunk) corresponding to one or more row groups. + bare_blocks_to_parquet(blocks, &mut self.buf).unwrap(); + Layout::Parquet + } else { + let block = DataBlock::concat(&blocks).unwrap(); + let columns_layout = std::iter::once(self.size()) + .chain(block.columns().iter().map(|entry| { + let column = entry + .value + .convert_to_full_column(&entry.data_type, block.num_rows()); + write_column(&column, &mut self.buf).unwrap(); + self.size() + })) + .map_windows(|x: &[_; 2]| x[1] - x[0]) + .collect::<Vec<_>>() + .into_boxed_slice(); + + Layout::ArrowIpc(columns_layout) + }; + + self.columns_layout.push(layout); + self.offsets.push(self.size()) + } + + pub(super) fn size(&self) -> usize { + self.buf.size() + } +} + +pub(super) fn deserialize_block(columns_layout: &Layout, mut data: Buffer) -> DataBlock { + match columns_layout { + Layout::ArrowIpc(layout) => { + let columns = layout + .iter() + .map(|&layout| { + let ls = BufList::from_iter(data.slice(0..layout)); + data.advance(layout); + let mut cursor = Cursor::new(ls); + read_column(&mut cursor).unwrap() + }) + .collect::<Vec<_>>(); + + DataBlock::new_from_columns(columns) + } + Layout::Parquet => bare_blocks_from_parquet(Reader(data)).unwrap(), + } +} + +fn fake_data_schema(block: &DataBlock) -> DataSchema { + let fields = block + .columns() + .iter() + .enumerate() + .map(|(idx, arg)| DataField::new(&format!("arg{}", idx + 1), arg.data_type.clone())) + .collect::<Vec<_>>(); + DataSchema::new(fields) +} + +/// Deserialize bare data block from parquet format. 
+fn bare_blocks_from_parquet<R: ChunkReader + 'static>(data: R) -> Result<DataBlock> { + let reader = ParquetRecordBatchReader::try_new(data, usize::MAX)?; + let mut blocks = Vec::new(); + for record_batch in reader { + let record_batch = record_batch?; + let schema = DataSchema::try_from(record_batch.schema().as_ref())?; + let (block, _) = DataBlock::from_record_batch(&schema, &record_batch)?; + blocks.push(block); + } + DataBlock::concat(&blocks) +} + +/// Serialize bare data blocks to parquet format. +fn bare_blocks_to_parquet<W: Write + Send>( + blocks: Vec<DataBlock>, + write_buffer: W, +) -> Result<FileMetaData> { + assert!(!blocks.is_empty()); + + let data_schema = fake_data_schema(blocks.first().unwrap()); + let table_schema = infer_table_schema(&data_schema)?; + + let props = WriterProperties::builder() + .set_compression(Compression::LZ4_RAW) + .set_statistics_enabled(EnabledStatistics::None) + .set_bloom_filter_enabled(false) + .build(); + let batches = blocks + .into_iter() + .map(|block| block.to_record_batch(&table_schema)) + .collect::<Result<Vec<_>>>()?; + let arrow_schema = Arc::new(table_schema_to_arrow_schema(&table_schema)); + let mut writer = ArrowWriter::try_new(write_buffer, arrow_schema, Some(props))?; + for batch in batches { + writer.write(&batch)?; + } + let file_meta = writer.close()?; + Ok(file_meta) +} + +pub struct Reader(pub Buffer); + +impl Length for Reader { + fn len(&self) -> u64 { + self.0.len() as u64 + } +} + +impl ChunkReader for Reader { + type T = bytes::buf::Reader<Buffer>; + + fn get_read(&self, start: u64) -> errors::Result<Self::T> { + let start = start as usize; + if start > self.0.remaining() { + return Err(errors::ParquetError::IndexOutOfBound( + start, + self.0.remaining(), + )); + } + let mut r = self.0.clone(); + r.advance(start); + Ok(r.reader()) + } + + fn get_bytes(&self, start: u64, length: usize) -> errors::Result<bytes::Bytes> { + let start = start as usize; + Ok(self.0.slice(start..start + length).to_bytes()) + } +} + +#[cfg(test)] +mod tests { + use bytes::Bytes; + use databend_common_expression::block_debug::assert_block_value_eq; + use databend_common_expression::types::Int64Type; + use databend_common_expression::types::StringType; + use databend_common_expression::FromData; + + use super::*; + + #[test] + fn test_serde_bin_column() -> Result<()> { + let blocks = vec![ + [ + StringType::from_data(vec!["SM CASE", "a"]), + StringType::from_data(vec!["SM CASE", "axx"]), + Int64Type::from_data(vec![1, 3]), + ], + [ + StringType::from_data(vec!["b", "e", "f", "g"]), + StringType::from_data(vec!["", "", "", "x"]), + Int64Type::from_data(vec![99, 7, 3, 4]), + ], + ] + .into_iter() + .map(|columns| DataBlock::new_from_columns(columns.to_vec())) + .collect::<Vec<_>>(); + + let mut data = Vec::new(); + bare_blocks_to_parquet(blocks.clone(), &mut data)?; + + let reader = Reader(Buffer::from(Bytes::from(data))); + + let got = bare_blocks_from_parquet(reader)?; + let want = DataBlock::concat(&blocks)?; + + assert_block_value_eq(&want, &got); + + Ok(()) + } +} diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index 964b7ff4b04b..6246d86a2ffc 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -20,9 +20,6 @@ use std::ops::Range; use std::sync::Arc; use std::time::Instant; -use buf_list::BufList; -use buf_list::Cursor; -use bytes::Buf; use bytes::Bytes; use databend_common_base::base::dma_buffer_as_vec; use databend_common_base::base::dma_read_file_range; @@ -34,14 +31,13 @@ use databend_common_base::runtime::profile::Profile; use 
databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; -use databend_common_expression::arrow::read_column; -use databend_common_expression::arrow::write_column; use databend_common_expression::DataBlock; use databend_storages_common_cache::TempDir; use databend_storages_common_cache::TempPath; use opendal::Buffer; use opendal::Operator; +use super::serialize::*; use crate::sessions::QueryContext; /// Spiller type, currently only supports HashJoin @@ -69,9 +65,10 @@ impl Display for SpillerType { /// Spiller configuration #[derive(Clone)] pub struct SpillerConfig { + pub spiller_type: SpillerType, pub location_prefix: String, pub disk_spill: Option, - pub spiller_type: SpillerType, + pub use_parquet: bool, } #[derive(Clone)] @@ -94,12 +91,13 @@ pub struct Spiller { temp_dir: Option>, // for dio disabled local_operator: Option, + use_parquet: bool, _spiller_type: SpillerType, pub join_spilling_partition_bits: usize, /// 1 partition -> N partition files pub partition_location: HashMap>, /// Record columns layout for spilled data, will be used when read data from disk - pub columns_layout: HashMap>, + pub columns_layout: HashMap, /// Record how many bytes have been spilled for each partition. pub partition_spilled_bytes: HashMap, } @@ -116,6 +114,7 @@ impl Spiller { location_prefix, disk_spill, spiller_type, + use_parquet, } = config; let (temp_dir, local_operator) = match disk_spill { @@ -132,6 +131,7 @@ impl Spiller { location_prefix, temp_dir, local_operator, + use_parquet, _spiller_type: spiller_type, join_spilling_partition_bits: settings.get_join_spilling_partition_bits()?, partition_location: Default::default(), @@ -145,12 +145,13 @@ impl Spiller { } /// Spill a [`DataBlock`] to storage. - pub async fn spill(&mut self, data_block: DataBlock) -> Result { + pub async fn spill(&mut self, data_block: Vec) -> Result { + debug_assert!(!data_block.is_empty()); let instant = Instant::now(); // Spill data to storage. let mut encoder = self.block_encoder(); - encoder.add_block(data_block); + encoder.add_blocks(data_block); let data_size = encoder.size(); let BlocksEncoder { buf, @@ -161,10 +162,7 @@ impl Spiller { let location = self.write_encodes(data_size, buf).await?; // Record statistics. - match location { - Location::Remote(_) => record_remote_write_profile(&instant, data_size), - Location::Local(_) => record_local_write_profile(&instant, data_size), - } + record_write_profile(&location, &instant, data_size); // Record columns layout for spilled data. 
self.columns_layout @@ -178,19 +176,25 @@ impl Spiller { pub async fn spill_with_partition( &mut self, partition_id: usize, - data: DataBlock, + data: Vec, ) -> Result<()> { + let (num_rows, memory_size) = data + .iter() + .map(|b| (b.num_rows(), b.memory_size())) + .reduce(|acc, x| (acc.0 + x.0, acc.1 + x.1)) + .unwrap(); + let progress_val = ProgressValues { - rows: data.num_rows(), - bytes: data.memory_size(), + rows: num_rows, + bytes: memory_size, }; self.partition_spilled_bytes .entry(partition_id) .and_modify(|bytes| { - *bytes += data.memory_size() as u64; + *bytes += memory_size as u64; }) - .or_insert(data.memory_size() as u64); + .or_insert(memory_size as u64); let location = self.spill(data).await?; self.partition_location @@ -206,14 +210,14 @@ impl Spiller { pub async fn spill_with_merged_partitions( &mut self, - partitioned_data: Vec<(usize, DataBlock)>, - ) -> Result { + partitioned_data: Vec<(usize, Vec)>, + ) -> Result { // Serialize data block. let mut encoder = self.block_encoder(); let mut partition_ids = Vec::new(); - for (partition_id, data_block) in partitioned_data.into_iter() { + for (partition_id, data_blocks) in partitioned_data.into_iter() { partition_ids.push(partition_id); - encoder.add_block(data_block); + encoder.add_blocks(data_blocks); } let write_bytes = encoder.size(); @@ -232,7 +236,7 @@ impl Spiller { .map(|x| x[0]..x[1]) .zip(columns_layout.into_iter()), ) - .map(|(id, (range, layout))| (id, range, layout)) + .map(|(id, (range, layout))| (id, Chunk { range, layout })) .collect(); // Spill data to storage. @@ -240,12 +244,9 @@ impl Spiller { let location = self.write_encodes(write_bytes, buf).await?; // Record statistics. - match location { - Location::Remote(_) => record_remote_write_profile(&instant, write_bytes), - Location::Local(_) => record_local_write_profile(&instant, write_bytes), - } + record_write_profile(&location, &instant, write_bytes); - Ok(SpilledData::MergedPartition { + Ok(MergedPartition { location, partitions, }) @@ -258,29 +259,34 @@ impl Spiller { // Read spilled data from storage. let instant = Instant::now(); - let data = match (location, &self.local_operator) { - (Location::Local(path), None) => { - debug_assert_eq!(path.size(), columns_layout.iter().sum::()); - let file_size = path.size(); - let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; - Buffer::from(dma_buffer_as_vec(buf)).slice(range) - } - (Location::Local(path), Some(ref local)) => { - debug_assert_eq!(path.size(), columns_layout.iter().sum::()); - local - .read(path.file_name().unwrap().to_str().unwrap()) - .await? + let data = match location { + Location::Local(path) => { + match columns_layout { + Layout::ArrowIpc(layout) => { + debug_assert_eq!(path.size(), layout.iter().sum::()) + } + Layout::Parquet => {} + } + + match self.local_operator { + Some(ref local) => { + local + .read(path.file_name().unwrap().to_str().unwrap()) + .await? 
+ } + None => { + let file_size = path.size(); + let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; + Buffer::from(dma_buffer_as_vec(buf)).slice(range) + } + } } - (Location::Remote(loc), _) => self.operator.read(loc).await?, + Location::Remote(loc) => self.operator.read(loc).await?, }; - match location { - Location::Remote(_) => record_remote_read_profile(&instant, data.len()), - Location::Local(_) => record_local_read_profile(&instant, data.len()), - } + record_read_profile(location, &instant, data.len()); - let block = deserialize_block(columns_layout, data); - Ok(block) + Ok(deserialize_block(columns_layout, data)) } #[async_backtrace::framed] @@ -303,88 +309,77 @@ impl Spiller { pub async fn read_merged_partitions( &self, - merged_partitions: &SpilledData, - ) -> Result> { - if let SpilledData::MergedPartition { + MergedPartition { location, partitions, - } = merged_partitions - { - // Read spilled data from storage. - let instant = Instant::now(); - - let data = match (location, &self.local_operator) { - (Location::Local(path), None) => { - let file_size = path.size(); - debug_assert_eq!( - file_size, - if let Some((_, range, _)) = partitions.last() { - range.end - } else { - 0 - } - ); - - let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; - Buffer::from(dma_buffer_as_vec(buf)).slice(range) - } - (Location::Local(path), Some(ref local)) => { - local - .read(path.file_name().unwrap().to_str().unwrap()) - .await? - } - (Location::Remote(loc), _) => self.operator.read(loc).await?, - }; - - // Record statistics. - match location { - Location::Remote(_) => record_remote_read_profile(&instant, data.len()), - Location::Local(_) => record_local_read_profile(&instant, data.len()), - }; - - // Deserialize partitioned data block. - let partitioned_data = partitions - .iter() - .map(|(partition_id, range, columns_layout)| { - let block = deserialize_block(columns_layout, data.slice(range.clone())); - (*partition_id, block) - }) - .collect(); - - return Ok(partitioned_data); - } - Ok(vec![]) - } - - pub async fn read_range( - &self, - location: &Location, - data_range: Range, - columns_layout: &[usize], - ) -> Result { + }: &MergedPartition, + ) -> Result> { // Read spilled data from storage. let instant = Instant::now(); - let data_range = data_range.start as u64..data_range.end as u64; let data = match (location, &self.local_operator) { (Location::Local(path), None) => { - let (buf, range) = dma_read_file_range(path, data_range).await?; + let file_size = path.size(); + debug_assert_eq!( + file_size, + if let Some((_, Chunk { range, .. })) = partitions.last() { + range.end + } else { + 0 + } + ); + + let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; Buffer::from(dma_buffer_as_vec(buf)).slice(range) } (Location::Local(path), Some(ref local)) => { local - .read_with(path.file_name().unwrap().to_str().unwrap()) - .range(data_range) + .read(path.file_name().unwrap().to_str().unwrap()) .await? } - (Location::Remote(loc), _) => self.operator.read_with(loc).range(data_range).await?, + (Location::Remote(loc), _) => self.operator.read(loc).await?, }; - match location { - Location::Remote(_) => record_remote_read_profile(&instant, data.len()), - Location::Local(_) => record_local_read_profile(&instant, data.len()), - } - Ok(deserialize_block(columns_layout, data)) + // Record statistics. + record_read_profile(location, &instant, data.len()); + + // Deserialize partitioned data block. 
+ let partitioned_data = partitions + .iter() + .map(|(partition_id, Chunk { range, layout })| { + let block = deserialize_block(layout, data.slice(range.clone())); + (*partition_id, block) + }) + .collect(); + + Ok(partitioned_data) + } + + pub async fn read_chunk(&self, location: &Location, chunk: &Chunk) -> Result { + // Read spilled data from storage. + let instant = Instant::now(); + let Chunk { range, layout } = chunk; + let data_range = range.start as u64..range.end as u64; + + let data = match location { + Location::Local(path) => match &self.local_operator { + Some(ref local) => { + local + .read_with(path.file_name().unwrap().to_str().unwrap()) + .range(data_range) + .await? + } + None => { + let (buf, range) = dma_read_file_range(path, data_range).await?; + Buffer::from(dma_buffer_as_vec(buf)).slice(range) + } + }, + Location::Remote(loc) => self.operator.read_with(loc).range(data_range).await?, + }; + + record_read_profile(location, &instant, data.len()); + + Ok(deserialize_block(layout, data)) } async fn write_encodes(&mut self, size: usize, buf: DmaWriteBuf) -> Result { @@ -431,7 +426,7 @@ impl Spiller { .as_ref() .map(|dir| dir.block_alignment()) .unwrap_or(Alignment::MIN); - BlocksEncoder::new(align, 8 * 1024 * 1024) + BlocksEncoder::new(self.use_parquet, align, 8 * 1024 * 1024) } pub(crate) fn spilled_files(&self) -> Vec { @@ -439,12 +434,14 @@ impl Spiller { } } -pub enum SpilledData { - Partition(Location), - MergedPartition { - location: Location, - partitions: Vec<(usize, Range, Vec)>, - }, +pub struct MergedPartition { + pub location: Location, + pub partitions: Vec<(usize, Chunk)>, +} + +pub struct Chunk { + pub range: Range, + pub layout: Layout, } #[derive(Debug, Clone, Hash, PartialEq, Eq)] @@ -453,88 +450,47 @@ pub enum Location { Local(TempPath), } -struct BlocksEncoder { - buf: DmaWriteBuf, - offsets: Vec, - columns_layout: Vec>, -} - -impl BlocksEncoder { - fn new(align: Alignment, chunk: usize) -> Self { - Self { - buf: DmaWriteBuf::new(align, chunk), - offsets: vec![0], - columns_layout: Vec::new(), +fn record_write_profile(location: &Location, start: &Instant, write_bytes: usize) { + match location { + Location::Remote(_) => { + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteTime, + start.elapsed().as_millis() as usize, + ); + } + Location::Local(_) => { + Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteCount, 1); + Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteBytes, write_bytes); + Profile::record_usize_profile( + ProfileStatisticsName::LocalSpillWriteTime, + start.elapsed().as_millis() as usize, + ); } } - - fn add_block(&mut self, block: DataBlock) { - let columns_layout = std::iter::once(self.size()) - .chain(block.columns().iter().map(|entry| { - let column = entry - .value - .convert_to_full_column(&entry.data_type, block.num_rows()); - write_column(&column, &mut self.buf).unwrap(); - self.size() - })) - .map_windows(|x: &[_; 2]| x[1] - x[0]) - .collect(); - - self.columns_layout.push(columns_layout); - self.offsets.push(self.size()) - } - - fn size(&self) -> usize { - self.buf.size() - } -} - -pub fn deserialize_block(columns_layout: &[usize], mut data: Buffer) -> DataBlock { - let columns = columns_layout - .iter() - .map(|&layout| { - let ls = BufList::from_iter(data.slice(0..layout)); - 
data.advance(layout); - let mut cursor = Cursor::new(ls); - read_column(&mut cursor).unwrap() - }) - .collect::>(); - - DataBlock::new_from_columns(columns) -} - -pub fn record_remote_write_profile(start: &Instant, write_bytes: usize) { - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteBytes, write_bytes); - Profile::record_usize_profile( - ProfileStatisticsName::RemoteSpillWriteTime, - start.elapsed().as_millis() as usize, - ); -} - -pub fn record_remote_read_profile(start: &Instant, read_bytes: usize) { - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadBytes, read_bytes); - Profile::record_usize_profile( - ProfileStatisticsName::RemoteSpillReadTime, - start.elapsed().as_millis() as usize, - ); } -pub fn record_local_write_profile(start: &Instant, write_bytes: usize) { - Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteBytes, write_bytes); - Profile::record_usize_profile( - ProfileStatisticsName::LocalSpillWriteTime, - start.elapsed().as_millis() as usize, - ); -} - -pub fn record_local_read_profile(start: &Instant, read_bytes: usize) { - Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadBytes, read_bytes); - Profile::record_usize_profile( - ProfileStatisticsName::LocalSpillReadTime, - start.elapsed().as_millis() as usize, - ); +fn record_read_profile(location: &Location, start: &Instant, read_bytes: usize) { + match location { + Location::Remote(_) => { + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadCount, 1); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadBytes, read_bytes); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillReadTime, + start.elapsed().as_millis() as usize, + ); + } + Location::Local(_) => { + Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadCount, 1); + Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadBytes, read_bytes); + Profile::record_usize_profile( + ProfileStatisticsName::LocalSpillReadTime, + start.elapsed().as_millis() as usize, + ); + } + } } diff --git a/src/query/service/tests/it/spillers/spiller.rs b/src/query/service/tests/it/spillers/spiller.rs index ad9779a7d615..52319f6a667b 100644 --- a/src/query/service/tests/it/spillers/spiller.rs +++ b/src/query/service/tests/it/spillers/spiller.rs @@ -39,9 +39,10 @@ async fn test_spill_with_partition() -> Result<()> { let ctx = fixture.new_query_ctx().await?; let tenant = ctx.get_tenant(); let spiller_config = SpillerConfig { + spiller_type: SpillerType::HashJoinBuild, location_prefix: query_spill_prefix(tenant.tenant_name(), &ctx.get_id()), disk_spill: None, - spiller_type: SpillerType::HashJoinBuild, + use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(), }; let operator = DataOperator::instance().operator(); @@ -53,7 +54,7 @@ async fn test_spill_with_partition() -> Result<()> { Int32Type::from_data((1..101).collect::>()), ]); - let res = spiller.spill_with_partition(0, data).await; + let res = spiller.spill_with_partition(0, vec![data]).await; assert!(res.is_ok()); let location = &spiller.partition_location.get(&0).unwrap()[0]; diff --git a/src/query/settings/src/settings_default.rs 
b/src/query/settings/src/settings_default.rs index 7c3b09dcc5ed..88e72e9db513 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -23,6 +23,8 @@ use databend_common_exception::Result; use databend_common_meta_app::principal::UserSettingValue; use once_cell::sync::OnceCell; +use super::settings_getter_setter::SpillFileFormat; + static DEFAULT_SETTINGS: OnceCell<Arc<DefaultSettings>> = OnceCell::new(); // Default value of cost factor settings @@ -309,6 +311,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=u64::MAX)), }), + ("spilling_file_format", DefaultSettingValue { + value: UserSettingValue::String("parquet".to_string()), + desc: "Set the storage file format for spilling.", + mode: SettingMode::Both, + range: Some(SettingRange::String(SpillFileFormat::range())), + }), ("spilling_to_disk_vacuum_unknown_temp_dirs_limit", DefaultSettingValue { value: UserSettingValue::UInt64(u64::MAX), desc: "Set the maximum number of directories to clean up. If there are some temporary dirs when another query is unexpectedly interrupted, which needs to be cleaned up after this query.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index cb45b3d00f34..8938fcd616dd 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::str::FromStr; + use databend_common_ast::parser::Dialect; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -31,6 +33,41 @@ pub enum FlightCompression { Zstd, } +#[derive(Clone, Copy)] +pub enum SpillFileFormat { + Arrow, + Parquet, +} + +impl SpillFileFormat { + pub fn range() -> Vec<String> { + ["arrow", "parquet"] + .iter() + .copied() + .map(String::from) + .collect() + } + + pub fn is_parquet(&self) -> bool { + matches!(self, SpillFileFormat::Parquet) + } +} + +impl FromStr for SpillFileFormat { + type Err = ErrorCode; + + fn from_str(s: &str) -> std::result::Result<Self, Self::Err> { + match s { + "arrow" => Ok(SpillFileFormat::Arrow), + "parquet" => Ok(Self::Parquet), + _ => Err(ErrorCode::InvalidConfig(format!( + "invalid SpillFileFormat: {:?}", + s + ))), + } + } +} + impl Settings { // Get u64 value, we don't get from the metasrv. fn try_get_u64(&self, key: &str) -> Result<u64> { @@ -290,6 +327,10 @@ impl Settings { Ok(self.try_get_u64("join_spilling_buffer_threshold_per_proc_mb")? as usize) } + pub fn get_spilling_file_format(&self) -> Result<SpillFileFormat> { + self.try_get_string("spilling_file_format")?.parse() + } + pub fn get_spilling_to_disk_vacuum_unknown_temp_dirs_limit(&self) -> Result<usize> { Ok(self.try_get_u64("spilling_to_disk_vacuum_unknown_temp_dirs_limit")? as usize) } diff --git a/src/query/storages/common/blocks/src/lib.rs b/src/query/storages/common/blocks/src/lib.rs index 1af2fc63dfed..0434a7671b1d 100644 --- a/src/query/storages/common/blocks/src/lib.rs +++ b/src/query/storages/common/blocks/src/lib.rs @@ -15,5 +15,5 @@ #![allow(clippy::uninlined_format_args)] mod parquet_rs; -pub use parquet_rs::blocks_to_parquet; +pub use parquet_rs::*; pub mod memory;
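Taken together, the format choice flows from the spilling_file_format setting (default "parquet"; switchable per session, e.g. SET spilling_file_format = 'arrow') through Settings::get_spilling_file_format() and SpillerConfig::use_parquet into BlocksEncoder, which records a Layout for every spilled chunk. The sketch below traces that path; it is illustrative rather than code from the patch, encode_for_spill and blocks are made-up names, and the import paths for SpillFileFormat and BlocksEncoder are assumptions (the sketch would have to live inside the spillers module, where the pub(super) BlocksEncoder is visible).

use databend_common_base::base::Alignment;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
// Assumed paths: SpillFileFormat re-exported from the settings crate, BlocksEncoder from
// the sibling serialize module added by this patch.
use databend_common_settings::SpillFileFormat;
use super::serialize::BlocksEncoder;

fn encode_for_spill(blocks: Vec<DataBlock>, setting_value: &str) -> Result<BlocksEncoder> {
    // "arrow" or "parquet", parsed by the FromStr impl added in settings_getter_setter.rs.
    let format: SpillFileFormat = setting_value.parse()?;
    // 8 MiB write chunks match Spiller::block_encoder in this patch; Alignment::MIN is the
    // fallback the spiller uses when no local temp dir is configured.
    let mut encoder = BlocksEncoder::new(format.is_parquet(), Alignment::MIN, 8 * 1024 * 1024);
    // Appends one chunk to the spill buffer and records its Layout: Layout::Parquet when
    // parquet is selected, Layout::ArrowIpc(per-column sizes) for the arrow format.
    encoder.add_blocks(blocks);
    Ok(encoder)
}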