From 50ce5b4edf67b2b4d7a09798b85ddf57ae845774 Mon Sep 17 00:00:00 2001 From: coldWater Date: Mon, 14 Oct 2024 14:47:22 +0800 Subject: [PATCH 1/5] bare_blocks_to_parquet Signed-off-by: coldWater --- Cargo.lock | 1 + src/query/storages/common/blocks/Cargo.toml | 1 + src/query/storages/common/blocks/src/lib.rs | 2 +- .../storages/common/blocks/src/parquet_rs.rs | 85 ++++++++++++++++++- 4 files changed, 86 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d6f1aee416be..29e1639ea336 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5368,6 +5368,7 @@ dependencies = [ name = "databend-storages-common-blocks" version = "0.1.0" dependencies = [ + "bytes", "databend-common-exception", "databend-common-expression", "databend-storages-common-table-meta", diff --git a/src/query/storages/common/blocks/Cargo.toml b/src/query/storages/common/blocks/Cargo.toml index a713100df441..3487846b5ff3 100644 --- a/src/query/storages/common/blocks/Cargo.toml +++ b/src/query/storages/common/blocks/Cargo.toml @@ -11,6 +11,7 @@ doctest = false test = true [dependencies] +bytes = { workspace = true } databend-common-exception = { workspace = true } databend-common-expression = { workspace = true } databend-storages-common-table-meta = { workspace = true } diff --git a/src/query/storages/common/blocks/src/lib.rs b/src/query/storages/common/blocks/src/lib.rs index 1af2fc63dfed..0434a7671b1d 100644 --- a/src/query/storages/common/blocks/src/lib.rs +++ b/src/query/storages/common/blocks/src/lib.rs @@ -15,5 +15,5 @@ #![allow(clippy::uninlined_format_args)] mod parquet_rs; -pub use parquet_rs::blocks_to_parquet; +pub use parquet_rs::*; pub mod memory; diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index c002896a74df..522f40af385b 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -12,24 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::io::Write; use std::sync::Arc; use databend_common_exception::Result; use databend_common_expression::converts::arrow::table_schema_to_arrow_schema; +use databend_common_expression::infer_table_schema; use databend_common_expression::DataBlock; +use databend_common_expression::DataField; +use databend_common_expression::DataSchema; use databend_common_expression::TableSchema; use databend_storages_common_table_meta::table::TableCompression; +use parquet::arrow::arrow_reader::ParquetRecordBatchReader; use parquet::arrow::ArrowWriter; use parquet::basic::Encoding; use parquet::file::properties::EnabledStatistics; use parquet::file::properties::WriterProperties; +use parquet::file::reader::ChunkReader; use parquet::format::FileMetaData; /// Serialize data blocks to parquet format. -pub fn blocks_to_parquet( +pub fn blocks_to_parquet( table_schema: &TableSchema, blocks: Vec, - write_buffer: &mut Vec, + write_buffer: W, compression: TableCompression, ) -> Result { assert!(!blocks.is_empty()); @@ -54,3 +60,78 @@ pub fn blocks_to_parquet( let file_meta = writer.close()?; Ok(file_meta) } + +/// Serialize bare data blocks to parquet format. 
+pub fn bare_blocks_to_parquet( + blocks: Vec, + write_buffer: W, + compression: TableCompression, +) -> Result { + let data_schema = fake_data_schema(blocks.first().unwrap()); + let table_schema = infer_table_schema(&data_schema)?; + + blocks_to_parquet(&table_schema, blocks, write_buffer, compression) +} + +fn fake_data_schema(block: &DataBlock) -> DataSchema { + let fields = block + .columns() + .iter() + .enumerate() + .map(|(idx, arg)| DataField::new(&format!("arg{}", idx + 1), arg.data_type.clone())) + .collect::>(); + DataSchema::new(fields) +} + +/// Deserialize bare data block from parquet format. +pub fn bare_blocks_from_parquet(data: R) -> Result { + let reader = ParquetRecordBatchReader::try_new(data, usize::MAX)?; + let mut blocks = Vec::with_capacity(1); + for record_batch in reader { + let record_batch = record_batch?; + let schema = DataSchema::try_from(record_batch.schema().as_ref())?; + let (block, _) = DataBlock::from_record_batch(&schema, &record_batch)?; + blocks.push(block); + } + DataBlock::concat(&blocks) +} + +#[cfg(test)] +mod tests { + use bytes::Bytes; + use databend_common_expression::block_debug::assert_block_value_eq; + use databend_common_expression::types::Int64Type; + use databend_common_expression::types::StringType; + use databend_common_expression::FromData; + + use super::*; + + #[test] + fn test_serde_bin_column() -> Result<()> { + let blocks = vec![ + [ + StringType::from_data(vec!["SM CASE", "a"]), + StringType::from_data(vec!["SM CASE", "axx"]), + Int64Type::from_data(vec![1, 3]), + ], + [ + StringType::from_data(vec!["b", "e", "f", "g"]), + StringType::from_data(vec!["", "", "", "x"]), + Int64Type::from_data(vec![99, 7, 3, 4]), + ], + ] + .into_iter() + .map(|columns| DataBlock::new_from_columns(columns.to_vec())) + .collect::>(); + + let mut data = Vec::new(); + bare_blocks_to_parquet(blocks.clone(), &mut data, TableCompression::LZ4)?; + + let got = bare_blocks_from_parquet(Bytes::from(data))?; + let want = DataBlock::concat(&blocks)?; + + assert_block_value_eq(&want, &got); + + Ok(()) + } +} From 321e70667656a8b859bbaf81f7f7b26cd115f9d8 Mon Sep 17 00:00:00 2001 From: coldWater Date: Mon, 14 Oct 2024 19:27:14 +0800 Subject: [PATCH 2/5] ChunkReader Signed-off-by: coldWater --- Cargo.lock | 1 + .../partition/window_partition_buffer.rs | 3 +- src/query/service/src/spillers/spiller.rs | 100 ++++++++++++------ src/query/storages/common/blocks/Cargo.toml | 1 + .../storages/common/blocks/src/parquet_rs.rs | 38 ++++++- 5 files changed, 107 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 29e1639ea336..75f106828873 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5372,6 +5372,7 @@ dependencies = [ "databend-common-exception", "databend-common-expression", "databend-storages-common-table-meta", + "opendal 0.49.0", "parking_lot 0.12.3", "parquet", ] diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs index 28c6b9b2068d..d98c0dc04f2f 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs @@ -127,8 +127,7 @@ impl WindowPartitionBuffer { .partition_buffer .fetch_data_blocks(partition_id, &option)? 
{ - let data_block = DataBlock::concat(&data_blocks)?; - partitions_to_spill.push((partition_id, data_block)); + partitions_to_spill.push((partition_id, data_blocks)); accumulated_bytes += partition_memory_size; } if accumulated_bytes >= spill_unit_size { diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index 964b7ff4b04b..eb392c3472ed 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -37,8 +37,12 @@ use databend_common_exception::Result; use databend_common_expression::arrow::read_column; use databend_common_expression::arrow::write_column; use databend_common_expression::DataBlock; +use databend_storages_common_blocks::bare_blocks_from_parquet; +use databend_storages_common_blocks::bare_blocks_to_parquet; +use databend_storages_common_blocks::Reader; use databend_storages_common_cache::TempDir; use databend_storages_common_cache::TempPath; +use databend_storages_common_table_meta::table::TableCompression; use opendal::Buffer; use opendal::Operator; @@ -99,11 +103,17 @@ pub struct Spiller { /// 1 partition -> N partition files pub partition_location: HashMap>, /// Record columns layout for spilled data, will be used when read data from disk - pub columns_layout: HashMap>, + pub columns_layout: HashMap, /// Record how many bytes have been spilled for each partition. pub partition_spilled_bytes: HashMap, } +#[derive(Clone)] +pub enum Layout { + ArrowIpc(Box<[usize]>), + Parquet, +} + impl Spiller { /// Create a new spiller pub fn create( @@ -150,7 +160,7 @@ impl Spiller { // Spill data to storage. let mut encoder = self.block_encoder(); - encoder.add_block(data_block); + encoder.add_blocks(vec![data_block]); let data_size = encoder.size(); let BlocksEncoder { buf, @@ -206,14 +216,14 @@ impl Spiller { pub async fn spill_with_merged_partitions( &mut self, - partitioned_data: Vec<(usize, DataBlock)>, + partitioned_data: Vec<(usize, Vec)>, ) -> Result { // Serialize data block. let mut encoder = self.block_encoder(); let mut partition_ids = Vec::new(); - for (partition_id, data_block) in partitioned_data.into_iter() { + for (partition_id, data_blocks) in partitioned_data.into_iter() { partition_ids.push(partition_id); - encoder.add_block(data_block); + encoder.add_blocks(data_blocks); } let write_bytes = encoder.size(); @@ -260,13 +270,23 @@ impl Spiller { let instant = Instant::now(); let data = match (location, &self.local_operator) { (Location::Local(path), None) => { - debug_assert_eq!(path.size(), columns_layout.iter().sum::()); + match columns_layout { + Layout::ArrowIpc(layout) => { + debug_assert_eq!(path.size(), layout.iter().sum::()) + } + Layout::Parquet => {} + } let file_size = path.size(); let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; Buffer::from(dma_buffer_as_vec(buf)).slice(range) } (Location::Local(path), Some(ref local)) => { - debug_assert_eq!(path.size(), columns_layout.iter().sum::()); + match columns_layout { + Layout::ArrowIpc(layout) => { + debug_assert_eq!(path.size(), layout.iter().sum::()) + } + Layout::Parquet => {} + } local .read(path.file_name().unwrap().to_str().unwrap()) .await? @@ -360,7 +380,7 @@ impl Spiller { &self, location: &Location, data_range: Range, - columns_layout: &[usize], + columns_layout: &Layout, ) -> Result { // Read spilled data from storage. 
let instant = Instant::now(); @@ -443,7 +463,7 @@ pub enum SpilledData { Partition(Location), MergedPartition { location: Location, - partitions: Vec<(usize, Range, Vec)>, + partitions: Vec<(usize, Range, Layout)>, }, } @@ -456,7 +476,7 @@ pub enum Location { struct BlocksEncoder { buf: DmaWriteBuf, offsets: Vec, - columns_layout: Vec>, + columns_layout: Vec, } impl BlocksEncoder { @@ -468,19 +488,28 @@ impl BlocksEncoder { } } - fn add_block(&mut self, block: DataBlock) { - let columns_layout = std::iter::once(self.size()) - .chain(block.columns().iter().map(|entry| { - let column = entry - .value - .convert_to_full_column(&entry.data_type, block.num_rows()); - write_column(&column, &mut self.buf).unwrap(); - self.size() - })) - .map_windows(|x: &[_; 2]| x[1] - x[0]) - .collect(); + fn add_blocks(&mut self, blocks: Vec) { + let layout = if true { + let block = DataBlock::concat(&blocks).unwrap(); + let columns_layout = std::iter::once(self.size()) + .chain(block.columns().iter().map(|entry| { + let column = entry + .value + .convert_to_full_column(&entry.data_type, block.num_rows()); + write_column(&column, &mut self.buf).unwrap(); + self.size() + })) + .map_windows(|x: &[_; 2]| x[1] - x[0]) + .collect::>() + .into_boxed_slice(); + + Layout::ArrowIpc(columns_layout) + } else { + bare_blocks_to_parquet(blocks, &mut self.buf, TableCompression::LZ4).unwrap(); + Layout::Parquet + }; - self.columns_layout.push(columns_layout); + self.columns_layout.push(layout); self.offsets.push(self.size()) } @@ -489,18 +518,23 @@ impl BlocksEncoder { } } -pub fn deserialize_block(columns_layout: &[usize], mut data: Buffer) -> DataBlock { - let columns = columns_layout - .iter() - .map(|&layout| { - let ls = BufList::from_iter(data.slice(0..layout)); - data.advance(layout); - let mut cursor = Cursor::new(ls); - read_column(&mut cursor).unwrap() - }) - .collect::>(); +fn deserialize_block(columns_layout: &Layout, mut data: Buffer) -> DataBlock { + match columns_layout { + Layout::ArrowIpc(layout) => { + let columns = layout + .iter() + .map(|&layout| { + let ls = BufList::from_iter(data.slice(0..layout)); + data.advance(layout); + let mut cursor = Cursor::new(ls); + read_column(&mut cursor).unwrap() + }) + .collect::>(); - DataBlock::new_from_columns(columns) + DataBlock::new_from_columns(columns) + } + Layout::Parquet => bare_blocks_from_parquet(Reader(data)).unwrap(), + } } pub fn record_remote_write_profile(start: &Instant, write_bytes: usize) { diff --git a/src/query/storages/common/blocks/Cargo.toml b/src/query/storages/common/blocks/Cargo.toml index 3487846b5ff3..a070c66d4478 100644 --- a/src/query/storages/common/blocks/Cargo.toml +++ b/src/query/storages/common/blocks/Cargo.toml @@ -15,6 +15,7 @@ bytes = { workspace = true } databend-common-exception = { workspace = true } databend-common-expression = { workspace = true } databend-storages-common-table-meta = { workspace = true } +opendal = { workspace = true } parking_lot = { workspace = true } parquet = { workspace = true } diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index 522f40af385b..b3189b771c71 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -15,6 +15,7 @@ use std::io::Write; use std::sync::Arc; +use bytes::Buf; use databend_common_exception::Result; use databend_common_expression::converts::arrow::table_schema_to_arrow_schema; use databend_common_expression::infer_table_schema; @@ -23,12 +24,15 @@ 
use databend_common_expression::DataField; use databend_common_expression::DataSchema; use databend_common_expression::TableSchema; use databend_storages_common_table_meta::table::TableCompression; +use opendal::Buffer; use parquet::arrow::arrow_reader::ParquetRecordBatchReader; use parquet::arrow::ArrowWriter; use parquet::basic::Encoding; +use parquet::errors; use parquet::file::properties::EnabledStatistics; use parquet::file::properties::WriterProperties; use parquet::file::reader::ChunkReader; +use parquet::file::reader::Length; use parquet::format::FileMetaData; /// Serialize data blocks to parquet format. @@ -96,6 +100,36 @@ pub fn bare_blocks_from_parquet(data: R) -> Result u64 { + self.0.len() as u64 + } +} + +impl ChunkReader for Reader { + type T = bytes::buf::Reader; + + fn get_read(&self, start: u64) -> errors::Result { + let start = start as usize; + if start > self.0.remaining() { + return Err(errors::ParquetError::IndexOutOfBound( + start, + self.0.remaining(), + )); + } + let mut r = self.0.clone(); + r.advance(start); + Ok(r.reader()) + } + + fn get_bytes(&self, start: u64, length: usize) -> errors::Result { + let start = start as usize; + Ok(self.0.slice(start..start + length).to_bytes()) + } +} + #[cfg(test)] mod tests { use bytes::Bytes; @@ -127,7 +161,9 @@ mod tests { let mut data = Vec::new(); bare_blocks_to_parquet(blocks.clone(), &mut data, TableCompression::LZ4)?; - let got = bare_blocks_from_parquet(Bytes::from(data))?; + let reader = Reader(Buffer::from(Bytes::from(data))); + + let got = bare_blocks_from_parquet(reader)?; let want = DataBlock::concat(&blocks)?; assert_block_value_eq(&want, &got); From 29d0fea2df51b1cc2df675e22e9bec7e73b21dbf Mon Sep 17 00:00:00 2001 From: coldWater Date: Tue, 15 Oct 2024 14:08:55 +0800 Subject: [PATCH 3/5] refactor Signed-off-by: coldWater --- Cargo.lock | 152 +++----- .../src/pipelines/builders/builder_sort.rs | 3 +- .../transforms/hash_join/hash_join_spiller.rs | 6 +- .../transforms/transform_sort_spill.rs | 7 +- .../transform_window_partition_collect.rs | 3 +- .../partition/window_partition_buffer.rs | 90 ++--- src/query/service/src/spillers/mod.rs | 1 + src/query/service/src/spillers/serialize.rs | 243 ++++++++++++ src/query/service/src/spillers/spiller.rs | 358 +++++++----------- .../service/tests/it/spillers/spiller.rs | 5 +- src/query/settings/src/settings_default.rs | 6 + .../settings/src/settings_getter_setter.rs | 4 + src/query/storages/common/blocks/Cargo.toml | 2 - .../storages/common/blocks/src/parquet_rs.rs | 121 +----- 14 files changed, 503 insertions(+), 498 deletions(-) create mode 100644 src/query/service/src/spillers/serialize.rs diff --git a/Cargo.lock b/Cargo.lock index 75f106828873..1e56348fa1a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1571,7 +1571,7 @@ dependencies = [ "bitflags 2.6.0", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.12.1", "lazy_static", "lazycell", "proc-macro2", @@ -1933,12 +1933,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdd7a427adc0135366d99db65b36dae9237130997e560ed61118041fb72be6e8" -[[package]] -name = "cache-padded" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "981520c98f422fcc584dc1a95c334e6953900b9106bc47a9839b81790009eb21" - [[package]] name = "camino" version = "1.1.7" @@ -3112,7 +3106,7 @@ dependencies = [ "databend-storages-common-table-meta", "limits-rs", "log", - "opendal 0.49.0", + "opendal 0.49.2", "serde", "serde_json", "serfig", @@ -3183,7 +3177,7 
@@ dependencies = [ "lz4", "num", "num-traits", - "opendal 0.49.0", + "opendal 0.49.2", "ordered-float 4.2.2", "proptest", "quanta 0.11.1", @@ -3209,7 +3203,7 @@ dependencies = [ "enum-as-inner 0.5.1", "ethnum", "fast-float", - "fastrace 0.7.2", + "fastrace", "goldenfile", "indent", "itertools 0.10.5", @@ -3258,7 +3252,7 @@ dependencies = [ "databend-common-building", "databend-common-exception", "enquote", - "fastrace 0.7.2", + "fastrace", "futures", "libc", "log", @@ -3429,7 +3423,7 @@ dependencies = [ "geos", "geozero 0.13.0", "http 1.1.0", - "opendal 0.49.0", + "opendal 0.49.2", "parquet", "paste", "prost 0.12.6", @@ -3701,7 +3695,7 @@ dependencies = [ "databend-common-proto-conv", "databend-common-storage", "enumflags2", - "fastrace 0.7.2", + "fastrace", "futures", "log", "mockall", @@ -3727,7 +3721,7 @@ dependencies = [ "databend-common-meta-stoerr", "databend-common-meta-types", "databend-common-proto-conv", - "fastrace 0.7.2", + "fastrace", "futures", "itertools 0.10.5", "log", @@ -3766,7 +3760,7 @@ dependencies = [ "maplit", "num-derive", "num-traits", - "opendal 0.49.0", + "opendal 0.49.2", "paste", "prost 0.12.6", "serde", @@ -3795,7 +3789,7 @@ dependencies = [ "databend-common-metrics", "databend-common-tracing", "derive_more", - "fastrace 0.7.2", + "fastrace", "futures", "itertools 0.10.5", "log", @@ -3824,7 +3818,7 @@ dependencies = [ "databend-common-meta-stoerr", "databend-common-meta-types", "databend-common-tracing", - "fastrace 0.7.2", + "fastrace", "log", "tempfile", "test-harness", @@ -3837,7 +3831,7 @@ dependencies = [ "anyhow", "async-trait", "databend-common-meta-types", - "fastrace 0.7.2", + "fastrace", "futures-util", "log", "serde", @@ -3884,7 +3878,7 @@ dependencies = [ "databend-common-meta-types", "databend-common-tracing", "derive_more", - "fastrace 0.7.2", + "fastrace", "futures", "futures-async-stream", "futures-util", @@ -3918,7 +3912,7 @@ dependencies = [ "databend-common-meta-stoerr", "databend-common-meta-types", "databend-common-tracing", - "fastrace 0.7.2", + "fastrace", "log", "openraft", "pretty_assertions", @@ -4019,7 +4013,7 @@ dependencies = [ "flate2", "futures", "lz4", - "opendal 0.49.0", + "opendal 0.49.2", "parquet-format-safe", "rand 0.8.5", "seq-macro", @@ -4040,7 +4034,7 @@ dependencies = [ "databend-common-base", "databend-common-exception", "databend-common-expression", - "fastrace 0.7.2", + "fastrace", "futures", "log", "petgraph", @@ -4113,7 +4107,7 @@ dependencies = [ "databend-common-meta-types", "databend-common-protos", "enumflags2", - "fastrace 0.7.2", + "fastrace", "maplit", "num", "pretty_assertions", @@ -4142,7 +4136,7 @@ dependencies = [ "databend-common-ast", "databend-common-exception", "derive-visitor", - "fastrace 0.7.2", + "fastrace", "goldenfile", "tokio", "unindent", @@ -4213,7 +4207,7 @@ dependencies = [ "derive-visitor", "educe 0.4.23", "enum-as-inner 0.5.1", - "fastrace 0.7.2", + "fastrace", "globiter", "indexmap 2.4.0", "itertools 0.10.5", @@ -4221,7 +4215,7 @@ dependencies = [ "log", "num-derive", "num-traits", - "opendal 0.49.0", + "opendal 0.49.2", "parking_lot 0.12.3", "percent-encoding", "prqlc", @@ -4256,7 +4250,7 @@ dependencies = [ "flagset", "futures", "log", - "opendal 0.49.0", + "opendal 0.49.2", "parquet", "prometheus-client", "regex", @@ -4283,7 +4277,7 @@ dependencies = [ "databend-common-storages-parquet", "databend-storages-common-table-meta", "deltalake", - "fastrace 0.7.2", + "fastrace", "maplit", "match-template", "object_store_opendal", @@ -4360,14 +4354,14 @@ dependencies = [ 
"databend-storages-common-session", "databend-storages-common-table-meta", "enum-as-inner 0.5.1", - "fastrace 0.7.2", + "fastrace", "futures", "futures-util", "indexmap 2.4.0", "itertools 0.10.5", "jsonb", "log", - "opendal 0.49.0", + "opendal 0.49.2", "parquet", "rand 0.8.5", "serde", @@ -4409,12 +4403,12 @@ dependencies = [ "databend-storages-common-cache", "databend-storages-common-index", "databend-storages-common-table-meta", - "fastrace 0.7.2", + "fastrace", "faststr", "futures", "hive_metastore", "log", - "opendal 0.49.0", + "opendal 0.49.2", "recursive", "serde", "typetag", @@ -4440,7 +4434,7 @@ dependencies = [ "databend-common-pipeline-core", "databend-common-storages-parquet", "databend-storages-common-table-meta", - "fastrace 0.7.2", + "fastrace", "futures", "iceberg", "iceberg-catalog-hms", @@ -4527,7 +4521,7 @@ dependencies = [ "databend-storages-common-table-meta", "futures-util", "log", - "opendal 0.49.0", + "opendal 0.49.2", "orc-rust", "serde", "serde_json", @@ -4563,7 +4557,7 @@ dependencies = [ "ethnum", "futures", "log", - "opendal 0.49.0", + "opendal 0.49.2", "parquet", "rand 0.8.5", "serde", @@ -4610,7 +4604,7 @@ dependencies = [ "databend-common-storages-parquet", "databend-storages-common-blocks", "databend-storages-common-table-meta", - "opendal 0.49.0", + "opendal 0.49.2", "parquet", "serde", "serde_json", @@ -4651,7 +4645,7 @@ dependencies = [ "enum-as-inner 0.6.0", "futures", "log", - "opendal 0.49.0", + "opendal 0.49.2", "parquet", "serde", "serde_json", @@ -4675,7 +4669,7 @@ dependencies = [ "databend-common-sql", "databend-common-storages-fuse", "databend-storages-common-table-meta", - "fastrace 0.7.2", + "fastrace", "futures", "log", ] @@ -4716,7 +4710,7 @@ dependencies = [ "jsonb", "log", "once_cell", - "opendal 0.49.0", + "opendal 0.49.2", "parking_lot 0.12.3", "regex", "serde", @@ -4748,7 +4742,7 @@ dependencies = [ "databend-common-base", "databend-common-exception", "defer", - "fastrace 0.7.2", + "fastrace", "fastrace-opentelemetry", "itertools 0.10.5", "libc", @@ -4970,7 +4964,7 @@ dependencies = [ "jsonb", "jwt-simple 0.11.9", "log", - "opendal 0.49.0", + "opendal 0.49.2", "tantivy", "tempfile", ] @@ -5060,7 +5054,7 @@ dependencies = [ "deepsize", "derive_more", "env_logger", - "fastrace 0.7.2", + "fastrace", "feature-set", "futures", "futures-async-stream", @@ -5109,7 +5103,7 @@ dependencies = [ "databend-common-meta-types", "databend-common-tracing", "databend-meta", - "fastrace 0.7.2", + "fastrace", "futures", "log", "rand 0.8.5", @@ -5219,7 +5213,7 @@ dependencies = [ "databend-storages-common-table-meta", "derive-visitor", "ethnum", - "fastrace 0.7.2", + "fastrace", "flatbuffers", "futures", "futures-util", @@ -5244,7 +5238,7 @@ dependencies = [ "num", "num_cpus", "once_cell", - "opendal 0.49.0", + "opendal 0.49.2", "opensrv-mysql", "opentelemetry", "opentelemetry_sdk", @@ -5368,11 +5362,9 @@ dependencies = [ name = "databend-storages-common-blocks" version = "0.1.0" dependencies = [ - "bytes", "databend-common-exception", "databend-common-expression", "databend-storages-common-table-meta", - "opendal 0.49.0", "parking_lot 0.12.3", "parquet", ] @@ -5418,7 +5410,7 @@ dependencies = [ "databend-common-expression", "databend-common-functions", "databend-storages-common-table-meta", - "fastrace 0.7.2", + "fastrace", "jsonb", "levenshtein_automata", "log", @@ -5445,10 +5437,10 @@ dependencies = [ "databend-common-exception", "databend-common-expression", "databend-common-metrics", - "fastrace 0.7.2", + "fastrace", "futures", "log", - "opendal 
0.49.0", + "opendal 0.49.2", ] [[package]] @@ -6327,46 +6319,19 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2a2b11eda1d40935b26cf18f6833c526845ae8c41e58d09af6adeb6f0269183" -[[package]] -name = "fastrace" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfd798348670042f0628810b10bc11b767c1f42aafbd3b1ae31f282740c549fe" -dependencies = [ - "fastrace-macro 0.6.8", - "minstant", - "once_cell", - "parking_lot 0.12.3", - "pin-project", - "rand 0.8.5", - "rtrb 0.2.3", -] - [[package]] name = "fastrace" version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25767929385a5128ff5a58c77d5fef43779bb580656801f72d4d32f7a97fea28" dependencies = [ - "fastrace-macro 0.7.2", + "fastrace-macro", "minstant", "once_cell", "parking_lot 0.12.3", "pin-project", "rand 0.8.5", - "rtrb 0.3.1", -] - -[[package]] -name = "fastrace-macro" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eea27de901dc6b14aa952aeefcc41144da4dc97e4db3eb3593c19f3b80406786" -dependencies = [ - "proc-macro-error 1.0.4", - "proc-macro2", - "quote", - "syn 1.0.109", + "rtrb", ] [[package]] @@ -6387,7 +6352,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f33f99bae33fb7b952a933500e77cb9976b7672bb5d9def8e464663dfdfc107" dependencies = [ - "fastrace 0.7.2", + "fastrace", "futures", "log", "opentelemetry", @@ -9741,7 +9706,7 @@ dependencies = [ "anyhow", "colored", "crossbeam-channel", - "fastrace 0.7.2", + "fastrace", "jiff", "log", "opentelemetry", @@ -10757,9 +10722,9 @@ dependencies = [ [[package]] name = "object_store_opendal" -version = "0.46.0" +version = "0.46.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7e5902fc99e9fb9e32c93f6a67dc5cc0772dc0fb348e2ef4ce258b03666d034" +checksum = "4493c44c90a25391782b52886365d00f6be7d0f75aaecab4365279203438ebec" dependencies = [ "async-trait", "bytes", @@ -10767,7 +10732,7 @@ dependencies = [ "futures", "futures-util", "object_store", - "opendal 0.49.0", + "opendal 0.49.2", "pin-project", "tokio", ] @@ -10803,9 +10768,9 @@ dependencies = [ [[package]] name = "opendal" -version = "0.49.0" +version = "0.49.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39d516adf7db912c38af382c3e92c27cd62fbbc240e630920555d784c2ab1494" +checksum = "9b04d09b9822c2f75a1d2fc513a2c1279c70e91e7407936fffdf6a6976ec530a" dependencies = [ "anyhow", "async-backtrace", @@ -10815,7 +10780,7 @@ dependencies = [ "bytes", "chrono", "crc32c", - "fastrace 0.6.8", + "fastrace", "flagset", "futures", "getrandom 0.2.15", @@ -12090,7 +12055,7 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", "heck 0.5.0", - "itertools 0.10.5", + "itertools 0.12.1", "log", "multimap", "once_cell", @@ -12110,7 +12075,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.58", @@ -13170,15 +13135,6 @@ dependencies = [ "smallvec", ] -[[package]] -name = "rtrb" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99e704dd104faf2326a320140f70f0b736d607c1caa1b1748a6c568a79819109" -dependencies = [ - "cache-padded", -] - [[package]] name = "rtrb" version = "0.3.1" diff 
--git a/src/query/service/src/pipelines/builders/builder_sort.rs b/src/query/service/src/pipelines/builders/builder_sort.rs index cb039e615312..5c79b234379e 100644 --- a/src/query/service/src/pipelines/builders/builder_sort.rs +++ b/src/query/service/src/pipelines/builders/builder_sort.rs @@ -283,12 +283,13 @@ impl SortPipelineBuilder { if may_spill { let schema = add_order_field(sort_merge_output_schema.clone(), &self.sort_desc); let config = SpillerConfig { + spiller_type: SpillerType::OrderBy, location_prefix: query_spill_prefix( self.ctx.get_tenant().tenant_name(), &self.ctx.get_id(), ), disk_spill: None, - spiller_type: SpillerType::OrderBy, + use_parquet: settings.get_spilling_use_parquet()?, }; pipeline.add_transform(|input, output| { let op = DataOperator::instance().operator(); diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs index 876542882644..09d5269592c9 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs @@ -68,9 +68,10 @@ impl HashJoinSpiller { SpillerType::HashJoinProbe }; let spill_config = SpillerConfig { + spiller_type, location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()), disk_spill: None, - spiller_type, + use_parquet: ctx.get_settings().get_spilling_use_parquet()?, }; let operator = DataOperator::instance().operator(); let spiller = Spiller::create(ctx.clone(), operator, spill_config)?; @@ -145,9 +146,8 @@ impl HashJoinSpiller { .partition_buffer .fetch_data_blocks(partition_id, &fetch_option)? { - let data_block = DataBlock::concat(&data_blocks)?; self.spiller - .spill_with_partition(partition_id, data_block) + .spill_with_partition(partition_id, data_blocks) .await?; } } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs index f0c6bd97d556..e7cf861a1caa 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs @@ -270,7 +270,7 @@ where R: Rows + Sync + Send + 'static async fn spill(&mut self, block: DataBlock) -> Result<()> { debug_assert!(self.num_merge >= 2 && self.batch_rows > 0); - let location = self.spiller.spill(block).await?; + let location = self.spiller.spill(vec![block]).await?; self.unmerged_blocks.push_back(vec![location].into()); Ok(()) @@ -347,7 +347,7 @@ where R: Rows + Sync + Send + 'static let mut spilled = VecDeque::new(); while let Some(block) = merger.async_next_block().await? 
{ - let location = self.spiller.spill(block).await?; + let location = self.spiller.spill(vec![block]).await?; spilled.push_back(location); } @@ -487,9 +487,10 @@ mod tests { ) -> Result>> { let op = DataOperator::instance().operator(); let spill_config = SpillerConfig { + spiller_type: SpillerType::OrderBy, location_prefix: "_spill_test".to_string(), disk_spill: None, - spiller_type: SpillerType::OrderBy, + use_parquet: ctx.get_settings().get_spilling_use_parquet(), }; let spiller = Spiller::create(ctx.clone(), op, spill_config)?; diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs index e73dd63f6056..88ad2cfbd767 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs @@ -116,9 +116,10 @@ impl TransformWindowPartitionCollect { } let spill_config = SpillerConfig { + spiller_type: SpillerType::Window, location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()), disk_spill, - spiller_type: SpillerType::Window, + use_parquet: settings.get_spilling_use_parquet()?, }; // Create an inner `Spiller` to spill data. diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs index d98c0dc04f2f..c31010e561a3 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs @@ -17,9 +17,9 @@ use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_settings::Settings; +use crate::spillers::MergedPartition; use crate::spillers::PartitionBuffer; use crate::spillers::PartitionBufferFetchOption; -use crate::spillers::SpilledData; use crate::spillers::Spiller; /// The `WindowPartitionBuffer` is used to control memory usage of Window operator. @@ -33,7 +33,7 @@ pub struct WindowPartitionBuffer { can_spill: bool, next_to_restore_partition_id: isize, spilled_small_partitions: Vec>, - spilled_merged_partitions: Vec<(SpilledData, bool, bool)>, + spilled_merged_partitions: Vec<(MergedPartition, bool, bool)>, } impl WindowPartitionBuffer { @@ -110,7 +110,7 @@ impl WindowPartitionBuffer { { return self .spiller - .spill_with_partition(partition_id, DataBlock::concat(&data_blocks)?) 
+ .spill_with_partition(partition_id, data_blocks) .await; } } @@ -136,34 +136,20 @@ impl WindowPartitionBuffer { } } - if accumulated_bytes > 0 { - let spilled_data = self - .spiller - .spill_with_merged_partitions(partitions_to_spill) - .await?; - if let SpilledData::MergedPartition { - location, - partitions, - } = spilled_data - { - let index = self.spilled_merged_partitions.len(); - for partition in partitions.iter() { - self.spilled_small_partitions[partition.0].push(index); - } - self.spilled_merged_partitions.push(( - SpilledData::MergedPartition { - location, - partitions, - }, - false, - false, - )); - return Ok(()); - } + if accumulated_bytes == 0 { + self.can_spill = false; + return Ok(()); } - self.can_spill = false; - + let spilled = self + .spiller + .spill_with_merged_partitions(partitions_to_spill) + .await?; + let index = self.spilled_merged_partitions.len(); + for (id, _, _) in &spilled.partitions { + self.spilled_small_partitions[*id].push(index); + } + self.spilled_merged_partitions.push((spilled, false, false)); Ok(()) } @@ -185,35 +171,33 @@ impl WindowPartitionBuffer { if *restored { continue; } - if let SpilledData::MergedPartition { + let MergedPartition { location, partitions, - } = merged_partitions - { - if out_of_memory_limit || *partial_restored { - if let Some(pos) = partitions.iter().position(|p| p.0 == partition_id) { - let data_range = &partitions[pos].1; - let columns_layout = &partitions[pos].2; - let data_block = self - .spiller - .read_range(location, data_range.clone(), columns_layout) - .await?; - self.restored_partition_buffer - .add_data_block(partition_id, data_block); - partitions.remove(pos); - *partial_restored = true; - } - } else { - let partitioned_data = self + } = merged_partitions; + if out_of_memory_limit || *partial_restored { + if let Some(pos) = partitions.iter().position(|p| p.0 == partition_id) { + let data_range = &partitions[pos].1; + let columns_layout = &partitions[pos].2; + let data_block = self .spiller - .read_merged_partitions(merged_partitions) + .read_range(location, data_range.clone(), columns_layout) .await?; - for (partition_id, data_block) in partitioned_data.into_iter() { - self.restored_partition_buffer - .add_data_block(partition_id, data_block); - } - *restored = true; + self.restored_partition_buffer + .add_data_block(partition_id, data_block); + partitions.remove(pos); + *partial_restored = true; + } + } else { + let partitioned_data = self + .spiller + .read_merged_partitions(merged_partitions) + .await?; + for (partition_id, data_block) in partitioned_data.into_iter() { + self.restored_partition_buffer + .add_data_block(partition_id, data_block); } + *restored = true; } } diff --git a/src/query/service/src/spillers/mod.rs b/src/query/service/src/spillers/mod.rs index f867e56749f9..b51bb3a765fa 100644 --- a/src/query/service/src/spillers/mod.rs +++ b/src/query/service/src/spillers/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod partition_buffer; +mod serialize; mod spiller; pub use partition_buffer::PartitionBuffer; diff --git a/src/query/service/src/spillers/serialize.rs b/src/query/service/src/spillers/serialize.rs new file mode 100644 index 000000000000..74c5c79dc172 --- /dev/null +++ b/src/query/service/src/spillers/serialize.rs @@ -0,0 +1,243 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::Write; +use std::sync::Arc; + +use buf_list::BufList; +use buf_list::Cursor; +use bytes::Buf; +use databend_common_base::base::Alignment; +use databend_common_base::base::DmaWriteBuf; +use databend_common_exception::Result; +use databend_common_expression::arrow::read_column; +use databend_common_expression::arrow::write_column; +use databend_common_expression::converts::arrow::table_schema_to_arrow_schema; +use databend_common_expression::infer_table_schema; +use databend_common_expression::DataBlock; +use databend_common_expression::DataField; +use databend_common_expression::DataSchema; +use databend_storages_common_table_meta::table::TableCompression; +use opendal::Buffer; +use parquet::arrow::arrow_reader::ParquetRecordBatchReader; +use parquet::arrow::ArrowWriter; +use parquet::basic::Compression; +use parquet::errors; +use parquet::file::properties::EnabledStatistics; +use parquet::file::properties::WriterProperties; +use parquet::file::reader::ChunkReader; +use parquet::file::reader::Length; +use parquet::format::FileMetaData; + +#[derive(Clone)] +pub enum Layout { + ArrowIpc(Box<[usize]>), + Parquet, +} + +pub(super) struct BlocksEncoder { + pub(super) use_parquet: bool, + pub(super) buf: DmaWriteBuf, + pub(super) offsets: Vec, + pub(super) columns_layout: Vec, +} + +impl BlocksEncoder { + pub(super) fn new(use_parquet: bool, align: Alignment, chunk: usize) -> Self { + Self { + use_parquet, + buf: DmaWriteBuf::new(align, chunk), + offsets: vec![0], + columns_layout: Vec::new(), + } + } + + pub(super) fn add_blocks(&mut self, blocks: Vec) { + let layout = if self.use_parquet { + // Currently we splice multiple complete parquet files into one, + // so that the file contains duplicate headers/footers and metadata, + // which can lead to file bloat. 
A better approach would be for the entire file to be one Parquet file,
+            // with each group of blocks (i.e., a unit of the upstream read range)
+            // corresponding to one or more row groups.
+            bare_blocks_to_parquet(blocks, &mut self.buf).unwrap();
+            Layout::Parquet
+        } else {
+            let block = DataBlock::concat(&blocks).unwrap();
+            let columns_layout = std::iter::once(self.size())
+                .chain(block.columns().iter().map(|entry| {
+                    let column = entry
+                        .value
+                        .convert_to_full_column(&entry.data_type, block.num_rows());
+                    write_column(&column, &mut self.buf).unwrap();
+                    self.size()
+                }))
+                .map_windows(|x: &[_; 2]| x[1] - x[0])
+                .collect::<Vec<_>>()
+                .into_boxed_slice();
+
+            Layout::ArrowIpc(columns_layout)
+        };
+
+        self.columns_layout.push(layout);
+        self.offsets.push(self.size())
+    }
+
+    pub(super) fn size(&self) -> usize {
+        self.buf.size()
+    }
+}
+
+pub(super) fn deserialize_block(columns_layout: &Layout, mut data: Buffer) -> DataBlock {
+    match columns_layout {
+        Layout::ArrowIpc(layout) => {
+            let columns = layout
+                .iter()
+                .map(|&layout| {
+                    let ls = BufList::from_iter(data.slice(0..layout));
+                    data.advance(layout);
+                    let mut cursor = Cursor::new(ls);
+                    read_column(&mut cursor).unwrap()
+                })
+                .collect::<Vec<_>>();
+
+            DataBlock::new_from_columns(columns)
+        }
+        Layout::Parquet => bare_blocks_from_parquet(Reader(data)).unwrap(),
+    }
+}
+
+fn fake_data_schema(block: &DataBlock) -> DataSchema {
+    let fields = block
+        .columns()
+        .iter()
+        .enumerate()
+        .map(|(idx, arg)| DataField::new(&format!("arg{}", idx + 1), arg.data_type.clone()))
+        .collect::<Vec<_>>();
+    DataSchema::new(fields)
+}
+
+/// Deserialize a bare data block from parquet format.
+fn bare_blocks_from_parquet<R: ChunkReader + 'static>(data: R) -> Result<DataBlock> {
+    let reader = ParquetRecordBatchReader::try_new(data, usize::MAX)?;
+    let mut blocks = Vec::new();
+    for record_batch in reader {
+        let record_batch = record_batch?;
+        let schema = DataSchema::try_from(record_batch.schema().as_ref())?;
+        let (block, _) = DataBlock::from_record_batch(&schema, &record_batch)?;
+        blocks.push(block);
+    }
+    DataBlock::concat(&blocks)
+}
+
+/// Serialize bare data blocks to parquet format.
+fn bare_blocks_to_parquet( + blocks: Vec, + write_buffer: W, +) -> Result { + assert!(!blocks.is_empty()); + + let data_schema = fake_data_schema(blocks.first().unwrap()); + let table_schema = infer_table_schema(&data_schema)?; + + let props = WriterProperties::builder() + .set_compression(Compression::LZ4_RAW) + .set_data_page_size_limit(4 * 1024 * 1024) + .set_write_batch_size(usize::MAX) + .set_max_row_group_size(usize::MAX) + .set_statistics_enabled(EnabledStatistics::None) + .set_bloom_filter_enabled(false) + .build(); + let batches = blocks + .into_iter() + .map(|block| block.to_record_batch(&table_schema)) + .collect::>>()?; + let arrow_schema = Arc::new(table_schema_to_arrow_schema(&table_schema)); + let mut writer = ArrowWriter::try_new(write_buffer, arrow_schema, Some(props))?; + for batch in batches { + writer.write(&batch)?; + } + let file_meta = writer.close()?; + Ok(file_meta) +} + +pub struct Reader(pub Buffer); + +impl Length for Reader { + fn len(&self) -> u64 { + self.0.len() as u64 + } +} + +impl ChunkReader for Reader { + type T = bytes::buf::Reader; + + fn get_read(&self, start: u64) -> errors::Result { + let start = start as usize; + if start > self.0.remaining() { + return Err(errors::ParquetError::IndexOutOfBound( + start, + self.0.remaining(), + )); + } + let mut r = self.0.clone(); + r.advance(start); + Ok(r.reader()) + } + + fn get_bytes(&self, start: u64, length: usize) -> errors::Result { + let start = start as usize; + Ok(self.0.slice(start..start + length).to_bytes()) + } +} + +#[cfg(test)] +mod tests { + use bytes::Bytes; + use databend_common_expression::block_debug::assert_block_value_eq; + use databend_common_expression::types::Int64Type; + use databend_common_expression::types::StringType; + use databend_common_expression::FromData; + + use super::*; + + #[test] + fn test_serde_bin_column() -> Result<()> { + let blocks = vec![ + [ + StringType::from_data(vec!["SM CASE", "a"]), + StringType::from_data(vec!["SM CASE", "axx"]), + Int64Type::from_data(vec![1, 3]), + ], + [ + StringType::from_data(vec!["b", "e", "f", "g"]), + StringType::from_data(vec!["", "", "", "x"]), + Int64Type::from_data(vec![99, 7, 3, 4]), + ], + ] + .into_iter() + .map(|columns| DataBlock::new_from_columns(columns.to_vec())) + .collect::>(); + + let mut data = Vec::new(); + bare_blocks_to_parquet(blocks.clone(), &mut data)?; + + let reader = Reader(Buffer::from(Bytes::from(data))); + + let got = bare_blocks_from_parquet(reader)?; + let want = DataBlock::concat(&blocks)?; + + assert_block_value_eq(&want, &got); + + Ok(()) + } +} diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index eb392c3472ed..19b28bd1914e 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -20,9 +20,6 @@ use std::ops::Range; use std::sync::Arc; use std::time::Instant; -use buf_list::BufList; -use buf_list::Cursor; -use bytes::Buf; use bytes::Bytes; use databend_common_base::base::dma_buffer_as_vec; use databend_common_base::base::dma_read_file_range; @@ -34,18 +31,13 @@ use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; -use databend_common_expression::arrow::read_column; -use databend_common_expression::arrow::write_column; use databend_common_expression::DataBlock; -use databend_storages_common_blocks::bare_blocks_from_parquet; -use 
databend_storages_common_blocks::bare_blocks_to_parquet; -use databend_storages_common_blocks::Reader; use databend_storages_common_cache::TempDir; use databend_storages_common_cache::TempPath; -use databend_storages_common_table_meta::table::TableCompression; use opendal::Buffer; use opendal::Operator; +use super::serialize::*; use crate::sessions::QueryContext; /// Spiller type, currently only supports HashJoin @@ -73,9 +65,10 @@ impl Display for SpillerType { /// Spiller configuration #[derive(Clone)] pub struct SpillerConfig { + pub spiller_type: SpillerType, pub location_prefix: String, pub disk_spill: Option, - pub spiller_type: SpillerType, + pub use_parquet: bool, } #[derive(Clone)] @@ -98,6 +91,7 @@ pub struct Spiller { temp_dir: Option>, // for dio disabled local_operator: Option, + use_parquet: bool, _spiller_type: SpillerType, pub join_spilling_partition_bits: usize, /// 1 partition -> N partition files @@ -108,12 +102,6 @@ pub struct Spiller { pub partition_spilled_bytes: HashMap, } -#[derive(Clone)] -pub enum Layout { - ArrowIpc(Box<[usize]>), - Parquet, -} - impl Spiller { /// Create a new spiller pub fn create( @@ -126,6 +114,7 @@ impl Spiller { location_prefix, disk_spill, spiller_type, + use_parquet, } = config; let (temp_dir, local_operator) = match disk_spill { @@ -142,6 +131,7 @@ impl Spiller { location_prefix, temp_dir, local_operator, + use_parquet, _spiller_type: spiller_type, join_spilling_partition_bits: settings.get_join_spilling_partition_bits()?, partition_location: Default::default(), @@ -155,12 +145,13 @@ impl Spiller { } /// Spill a [`DataBlock`] to storage. - pub async fn spill(&mut self, data_block: DataBlock) -> Result { + pub async fn spill(&mut self, data_block: Vec) -> Result { + debug_assert!(!data_block.is_empty()); let instant = Instant::now(); // Spill data to storage. let mut encoder = self.block_encoder(); - encoder.add_blocks(vec![data_block]); + encoder.add_blocks(data_block); let data_size = encoder.size(); let BlocksEncoder { buf, @@ -171,10 +162,7 @@ impl Spiller { let location = self.write_encodes(data_size, buf).await?; // Record statistics. - match location { - Location::Remote(_) => record_remote_write_profile(&instant, data_size), - Location::Local(_) => record_local_write_profile(&instant, data_size), - } + record_write_profile(&location, &instant, data_size); // Record columns layout for spilled data. self.columns_layout @@ -188,19 +176,25 @@ impl Spiller { pub async fn spill_with_partition( &mut self, partition_id: usize, - data: DataBlock, + data: Vec, ) -> Result<()> { + let (num_rows, memory_size) = data + .iter() + .map(|b| (b.num_rows(), b.memory_size())) + .reduce(|acc, x| (acc.0 + x.0, acc.1 + x.1)) + .unwrap(); + let progress_val = ProgressValues { - rows: data.num_rows(), - bytes: data.memory_size(), + rows: num_rows, + bytes: memory_size, }; self.partition_spilled_bytes .entry(partition_id) .and_modify(|bytes| { - *bytes += data.memory_size() as u64; + *bytes += memory_size as u64; }) - .or_insert(data.memory_size() as u64); + .or_insert(memory_size as u64); let location = self.spill(data).await?; self.partition_location @@ -217,7 +211,7 @@ impl Spiller { pub async fn spill_with_merged_partitions( &mut self, partitioned_data: Vec<(usize, Vec)>, - ) -> Result { + ) -> Result { // Serialize data block. let mut encoder = self.block_encoder(); let mut partition_ids = Vec::new(); @@ -250,12 +244,9 @@ impl Spiller { let location = self.write_encodes(write_bytes, buf).await?; // Record statistics. 
- match location { - Location::Remote(_) => record_remote_write_profile(&instant, write_bytes), - Location::Local(_) => record_local_write_profile(&instant, write_bytes), - } + record_write_profile(&location, &instant, write_bytes); - Ok(SpilledData::MergedPartition { + Ok(MergedPartition { location, partitions, }) @@ -268,39 +259,34 @@ impl Spiller { // Read spilled data from storage. let instant = Instant::now(); - let data = match (location, &self.local_operator) { - (Location::Local(path), None) => { + let data = match location { + Location::Local(path) => { match columns_layout { Layout::ArrowIpc(layout) => { debug_assert_eq!(path.size(), layout.iter().sum::()) } Layout::Parquet => {} } - let file_size = path.size(); - let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; - Buffer::from(dma_buffer_as_vec(buf)).slice(range) - } - (Location::Local(path), Some(ref local)) => { - match columns_layout { - Layout::ArrowIpc(layout) => { - debug_assert_eq!(path.size(), layout.iter().sum::()) + + match self.local_operator { + Some(ref local) => { + local + .read(path.file_name().unwrap().to_str().unwrap()) + .await? + } + None => { + let file_size = path.size(); + let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; + Buffer::from(dma_buffer_as_vec(buf)).slice(range) } - Layout::Parquet => {} } - local - .read(path.file_name().unwrap().to_str().unwrap()) - .await? } - (Location::Remote(loc), _) => self.operator.read(loc).await?, + Location::Remote(loc) => self.operator.read(loc).await?, }; - match location { - Location::Remote(_) => record_remote_read_profile(&instant, data.len()), - Location::Local(_) => record_local_read_profile(&instant, data.len()), - } + record_read_profile(location, &instant, data.len()); - let block = deserialize_block(columns_layout, data); - Ok(block) + Ok(deserialize_block(columns_layout, data)) } #[async_backtrace::framed] @@ -323,57 +309,50 @@ impl Spiller { pub async fn read_merged_partitions( &self, - merged_partitions: &SpilledData, - ) -> Result> { - if let SpilledData::MergedPartition { + MergedPartition { location, partitions, - } = merged_partitions - { - // Read spilled data from storage. - let instant = Instant::now(); - - let data = match (location, &self.local_operator) { - (Location::Local(path), None) => { - let file_size = path.size(); - debug_assert_eq!( - file_size, - if let Some((_, range, _)) = partitions.last() { - range.end - } else { - 0 - } - ); - - let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; - Buffer::from(dma_buffer_as_vec(buf)).slice(range) - } - (Location::Local(path), Some(ref local)) => { - local - .read(path.file_name().unwrap().to_str().unwrap()) - .await? - } - (Location::Remote(loc), _) => self.operator.read(loc).await?, - }; - - // Record statistics. - match location { - Location::Remote(_) => record_remote_read_profile(&instant, data.len()), - Location::Local(_) => record_local_read_profile(&instant, data.len()), - }; - - // Deserialize partitioned data block. - let partitioned_data = partitions - .iter() - .map(|(partition_id, range, columns_layout)| { - let block = deserialize_block(columns_layout, data.slice(range.clone())); - (*partition_id, block) - }) - .collect(); - - return Ok(partitioned_data); - } - Ok(vec![]) + }: &MergedPartition, + ) -> Result> { + // Read spilled data from storage. 
+ let instant = Instant::now(); + + let data = match (location, &self.local_operator) { + (Location::Local(path), None) => { + let file_size = path.size(); + debug_assert_eq!( + file_size, + if let Some((_, range, _)) = partitions.last() { + range.end + } else { + 0 + } + ); + + let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; + Buffer::from(dma_buffer_as_vec(buf)).slice(range) + } + (Location::Local(path), Some(ref local)) => { + local + .read(path.file_name().unwrap().to_str().unwrap()) + .await? + } + (Location::Remote(loc), _) => self.operator.read(loc).await?, + }; + + // Record statistics. + record_read_profile(location, &instant, data.len()); + + // Deserialize partitioned data block. + let partitioned_data = partitions + .iter() + .map(|(partition_id, range, columns_layout)| { + let block = deserialize_block(columns_layout, data.slice(range.clone())); + (*partition_id, block) + }) + .collect(); + + Ok(partitioned_data) } pub async fn read_range( @@ -386,24 +365,24 @@ impl Spiller { let instant = Instant::now(); let data_range = data_range.start as u64..data_range.end as u64; - let data = match (location, &self.local_operator) { - (Location::Local(path), None) => { - let (buf, range) = dma_read_file_range(path, data_range).await?; - Buffer::from(dma_buffer_as_vec(buf)).slice(range) - } - (Location::Local(path), Some(ref local)) => { - local - .read_with(path.file_name().unwrap().to_str().unwrap()) - .range(data_range) - .await? - } - (Location::Remote(loc), _) => self.operator.read_with(loc).range(data_range).await?, + let data = match location { + Location::Local(path) => match &self.local_operator { + Some(ref local) => { + local + .read_with(path.file_name().unwrap().to_str().unwrap()) + .range(data_range) + .await? 
+ } + None => { + let (buf, range) = dma_read_file_range(path, data_range).await?; + Buffer::from(dma_buffer_as_vec(buf)).slice(range) + } + }, + Location::Remote(loc) => self.operator.read_with(loc).range(data_range).await?, }; - match location { - Location::Remote(_) => record_remote_read_profile(&instant, data.len()), - Location::Local(_) => record_local_read_profile(&instant, data.len()), - } + record_read_profile(location, &instant, data.len()); + Ok(deserialize_block(columns_layout, data)) } @@ -451,7 +430,7 @@ impl Spiller { .as_ref() .map(|dir| dir.block_alignment()) .unwrap_or(Alignment::MIN); - BlocksEncoder::new(align, 8 * 1024 * 1024) + BlocksEncoder::new(self.use_parquet, align, 8 * 1024 * 1024) } pub(crate) fn spilled_files(&self) -> Vec { @@ -461,10 +440,12 @@ impl Spiller { pub enum SpilledData { Partition(Location), - MergedPartition { - location: Location, - partitions: Vec<(usize, Range, Layout)>, - }, + MergedPartition(MergedPartition), +} + +pub struct MergedPartition { + pub location: Location, + pub partitions: Vec<(usize, Range, Layout)>, } #[derive(Debug, Clone, Hash, PartialEq, Eq)] @@ -473,102 +454,47 @@ pub enum Location { Local(TempPath), } -struct BlocksEncoder { - buf: DmaWriteBuf, - offsets: Vec, - columns_layout: Vec, -} - -impl BlocksEncoder { - fn new(align: Alignment, chunk: usize) -> Self { - Self { - buf: DmaWriteBuf::new(align, chunk), - offsets: vec![0], - columns_layout: Vec::new(), +fn record_write_profile(location: &Location, start: &Instant, write_bytes: usize) { + match location { + Location::Remote(_) => { + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteTime, + start.elapsed().as_millis() as usize, + ); + } + Location::Local(_) => { + Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteCount, 1); + Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteBytes, write_bytes); + Profile::record_usize_profile( + ProfileStatisticsName::LocalSpillWriteTime, + start.elapsed().as_millis() as usize, + ); } - } - - fn add_blocks(&mut self, blocks: Vec) { - let layout = if true { - let block = DataBlock::concat(&blocks).unwrap(); - let columns_layout = std::iter::once(self.size()) - .chain(block.columns().iter().map(|entry| { - let column = entry - .value - .convert_to_full_column(&entry.data_type, block.num_rows()); - write_column(&column, &mut self.buf).unwrap(); - self.size() - })) - .map_windows(|x: &[_; 2]| x[1] - x[0]) - .collect::>() - .into_boxed_slice(); - - Layout::ArrowIpc(columns_layout) - } else { - bare_blocks_to_parquet(blocks, &mut self.buf, TableCompression::LZ4).unwrap(); - Layout::Parquet - }; - - self.columns_layout.push(layout); - self.offsets.push(self.size()) - } - - fn size(&self) -> usize { - self.buf.size() } } -fn deserialize_block(columns_layout: &Layout, mut data: Buffer) -> DataBlock { - match columns_layout { - Layout::ArrowIpc(layout) => { - let columns = layout - .iter() - .map(|&layout| { - let ls = BufList::from_iter(data.slice(0..layout)); - data.advance(layout); - let mut cursor = Cursor::new(ls); - read_column(&mut cursor).unwrap() - }) - .collect::>(); - - DataBlock::new_from_columns(columns) +fn record_read_profile(location: &Location, start: &Instant, read_bytes: usize) { + match location { + Location::Remote(_) => { + 
Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadCount, 1); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadBytes, read_bytes); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillReadTime, + start.elapsed().as_millis() as usize, + ); + } + Location::Local(_) => { + Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadCount, 1); + Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadBytes, read_bytes); + Profile::record_usize_profile( + ProfileStatisticsName::LocalSpillReadTime, + start.elapsed().as_millis() as usize, + ); } - Layout::Parquet => bare_blocks_from_parquet(Reader(data)).unwrap(), } } - -pub fn record_remote_write_profile(start: &Instant, write_bytes: usize) { - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteBytes, write_bytes); - Profile::record_usize_profile( - ProfileStatisticsName::RemoteSpillWriteTime, - start.elapsed().as_millis() as usize, - ); -} - -pub fn record_remote_read_profile(start: &Instant, read_bytes: usize) { - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadBytes, read_bytes); - Profile::record_usize_profile( - ProfileStatisticsName::RemoteSpillReadTime, - start.elapsed().as_millis() as usize, - ); -} - -pub fn record_local_write_profile(start: &Instant, write_bytes: usize) { - Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteBytes, write_bytes); - Profile::record_usize_profile( - ProfileStatisticsName::LocalSpillWriteTime, - start.elapsed().as_millis() as usize, - ); -} - -pub fn record_local_read_profile(start: &Instant, read_bytes: usize) { - Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadBytes, read_bytes); - Profile::record_usize_profile( - ProfileStatisticsName::LocalSpillReadTime, - start.elapsed().as_millis() as usize, - ); -} diff --git a/src/query/service/tests/it/spillers/spiller.rs b/src/query/service/tests/it/spillers/spiller.rs index ad9779a7d615..b8fbcc224a6b 100644 --- a/src/query/service/tests/it/spillers/spiller.rs +++ b/src/query/service/tests/it/spillers/spiller.rs @@ -39,9 +39,10 @@ async fn test_spill_with_partition() -> Result<()> { let ctx = fixture.new_query_ctx().await?; let tenant = ctx.get_tenant(); let spiller_config = SpillerConfig { + spiller_type: SpillerType::HashJoinBuild, location_prefix: query_spill_prefix(tenant.tenant_name(), &ctx.get_id()), disk_spill: None, - spiller_type: SpillerType::HashJoinBuild, + use_parquet: ctx.get_settings().get_spilling_use_parquet(), }; let operator = DataOperator::instance().operator(); @@ -53,7 +54,7 @@ async fn test_spill_with_partition() -> Result<()> { Int32Type::from_data((1..101).collect::>()), ]); - let res = spiller.spill_with_partition(0, data).await; + let res = spiller.spill_with_partition(0, vec![data]).await; assert!(res.is_ok()); let location = &spiller.partition_location.get(&0).unwrap()[0]; diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 3600ee150d5a..f67275c27fa5 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -309,6 +309,12 @@ impl DefaultSettings { mode: SettingMode::Both, 
range: Some(SettingRange::Numeric(0..=u64::MAX)), }), + ("spilling_use_parquet", DefaultSettingValue { + value: UserSettingValue::UInt64(1), + desc: "Set whether to use Parquet or Arrow IPC for spilling.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=1)), + }), ("spilling_to_disk_vacuum_unknown_temp_dirs_limit", DefaultSettingValue { value: UserSettingValue::UInt64(u64::MAX), desc: "Set the maximum number of directories to clean up. If there are some temporary dirs when another query is unexpectedly interrupted, which needs to be cleaned up after this query.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index cb45b3d00f34..0bf4e4489efb 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -290,6 +290,10 @@ impl Settings { Ok(self.try_get_u64("join_spilling_buffer_threshold_per_proc_mb")? as usize) } + pub fn get_spilling_use_parquet(&self) -> Result { + Ok(self.try_get_u64("spilling_use_parquet")? != 0) + } + pub fn get_spilling_to_disk_vacuum_unknown_temp_dirs_limit(&self) -> Result { Ok(self.try_get_u64("spilling_to_disk_vacuum_unknown_temp_dirs_limit")? as usize) } diff --git a/src/query/storages/common/blocks/Cargo.toml b/src/query/storages/common/blocks/Cargo.toml index a070c66d4478..a713100df441 100644 --- a/src/query/storages/common/blocks/Cargo.toml +++ b/src/query/storages/common/blocks/Cargo.toml @@ -11,11 +11,9 @@ doctest = false test = true [dependencies] -bytes = { workspace = true } databend-common-exception = { workspace = true } databend-common-expression = { workspace = true } databend-storages-common-table-meta = { workspace = true } -opendal = { workspace = true } parking_lot = { workspace = true } parquet = { workspace = true } diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index b3189b771c71..c002896a74df 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -12,34 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::Write; use std::sync::Arc; -use bytes::Buf; use databend_common_exception::Result; use databend_common_expression::converts::arrow::table_schema_to_arrow_schema; -use databend_common_expression::infer_table_schema; use databend_common_expression::DataBlock; -use databend_common_expression::DataField; -use databend_common_expression::DataSchema; use databend_common_expression::TableSchema; use databend_storages_common_table_meta::table::TableCompression; -use opendal::Buffer; -use parquet::arrow::arrow_reader::ParquetRecordBatchReader; use parquet::arrow::ArrowWriter; use parquet::basic::Encoding; -use parquet::errors; use parquet::file::properties::EnabledStatistics; use parquet::file::properties::WriterProperties; -use parquet::file::reader::ChunkReader; -use parquet::file::reader::Length; use parquet::format::FileMetaData; /// Serialize data blocks to parquet format. -pub fn blocks_to_parquet( +pub fn blocks_to_parquet( table_schema: &TableSchema, blocks: Vec, - write_buffer: W, + write_buffer: &mut Vec, compression: TableCompression, ) -> Result { assert!(!blocks.is_empty()); @@ -64,110 +54,3 @@ pub fn blocks_to_parquet( let file_meta = writer.close()?; Ok(file_meta) } - -/// Serialize bare data blocks to parquet format. 
-pub fn bare_blocks_to_parquet( - blocks: Vec, - write_buffer: W, - compression: TableCompression, -) -> Result { - let data_schema = fake_data_schema(blocks.first().unwrap()); - let table_schema = infer_table_schema(&data_schema)?; - - blocks_to_parquet(&table_schema, blocks, write_buffer, compression) -} - -fn fake_data_schema(block: &DataBlock) -> DataSchema { - let fields = block - .columns() - .iter() - .enumerate() - .map(|(idx, arg)| DataField::new(&format!("arg{}", idx + 1), arg.data_type.clone())) - .collect::>(); - DataSchema::new(fields) -} - -/// Deserialize bare data block from parquet format. -pub fn bare_blocks_from_parquet(data: R) -> Result { - let reader = ParquetRecordBatchReader::try_new(data, usize::MAX)?; - let mut blocks = Vec::with_capacity(1); - for record_batch in reader { - let record_batch = record_batch?; - let schema = DataSchema::try_from(record_batch.schema().as_ref())?; - let (block, _) = DataBlock::from_record_batch(&schema, &record_batch)?; - blocks.push(block); - } - DataBlock::concat(&blocks) -} - -pub struct Reader(pub Buffer); - -impl Length for Reader { - fn len(&self) -> u64 { - self.0.len() as u64 - } -} - -impl ChunkReader for Reader { - type T = bytes::buf::Reader; - - fn get_read(&self, start: u64) -> errors::Result { - let start = start as usize; - if start > self.0.remaining() { - return Err(errors::ParquetError::IndexOutOfBound( - start, - self.0.remaining(), - )); - } - let mut r = self.0.clone(); - r.advance(start); - Ok(r.reader()) - } - - fn get_bytes(&self, start: u64, length: usize) -> errors::Result { - let start = start as usize; - Ok(self.0.slice(start..start + length).to_bytes()) - } -} - -#[cfg(test)] -mod tests { - use bytes::Bytes; - use databend_common_expression::block_debug::assert_block_value_eq; - use databend_common_expression::types::Int64Type; - use databend_common_expression::types::StringType; - use databend_common_expression::FromData; - - use super::*; - - #[test] - fn test_serde_bin_column() -> Result<()> { - let blocks = vec![ - [ - StringType::from_data(vec!["SM CASE", "a"]), - StringType::from_data(vec!["SM CASE", "axx"]), - Int64Type::from_data(vec![1, 3]), - ], - [ - StringType::from_data(vec!["b", "e", "f", "g"]), - StringType::from_data(vec!["", "", "", "x"]), - Int64Type::from_data(vec![99, 7, 3, 4]), - ], - ] - .into_iter() - .map(|columns| DataBlock::new_from_columns(columns.to_vec())) - .collect::>(); - - let mut data = Vec::new(); - bare_blocks_to_parquet(blocks.clone(), &mut data, TableCompression::LZ4)?; - - let reader = Reader(Buffer::from(Bytes::from(data))); - - let got = bare_blocks_from_parquet(reader)?; - let want = DataBlock::concat(&blocks)?; - - assert_block_value_eq(&want, &got); - - Ok(()) - } -} From 5b9c532c1059696936bccb517b5f2633b20a190d Mon Sep 17 00:00:00 2001 From: coldWater Date: Tue, 15 Oct 2024 21:53:48 +0800 Subject: [PATCH 4/5] fix Signed-off-by: coldWater --- src/query/ast/src/parser/parser.rs | 2 ++ .../pipelines/processors/transforms/transform_sort_spill.rs | 3 ++- src/query/service/src/spillers/serialize.rs | 4 ---- src/query/service/tests/it/spillers/spiller.rs | 2 +- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/query/ast/src/parser/parser.rs b/src/query/ast/src/parser/parser.rs index 4959ad25405d..6335c012a632 100644 --- a/src/query/ast/src/parser/parser.rs +++ b/src/query/ast/src/parser/parser.rs @@ -145,6 +145,7 @@ pub fn run_parser( } /// Check that the statement can be displayed and reparsed without loss +#[allow(dead_code)] fn assert_reparse(sql: 
&str, stmt: StatementWithFormat) { let stmt = reset_ast(stmt); let new_sql = stmt.to_string(); @@ -162,6 +163,7 @@ fn assert_reparse(sql: &str, stmt: StatementWithFormat) { assert_eq!(stmt, new_stmt, "\nleft:\n{}\nright:\n{}", sql, new_sql); } +#[allow(dead_code)] fn reset_ast(mut stmt: StatementWithFormat) -> StatementWithFormat { #[derive(VisitorMut)] #[visitor(Range(enter), Literal(enter), ExplainKind(enter), SelectTarget(enter))] diff --git a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs index e7cf861a1caa..3d3c4b6f318b 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs @@ -456,6 +456,7 @@ mod tests { use std::sync::Arc; use databend_common_base::base::tokio; + use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_expression::block_debug::pretty_format_blocks; use databend_common_expression::types::DataType; @@ -490,7 +491,7 @@ mod tests { spiller_type: SpillerType::OrderBy, location_prefix: "_spill_test".to_string(), disk_spill: None, - use_parquet: ctx.get_settings().get_spilling_use_parquet(), + use_parquet: ctx.get_settings().get_spilling_use_parquet()?, }; let spiller = Spiller::create(ctx.clone(), op, spill_config)?; diff --git a/src/query/service/src/spillers/serialize.rs b/src/query/service/src/spillers/serialize.rs index 74c5c79dc172..df88ac0e2dd4 100644 --- a/src/query/service/src/spillers/serialize.rs +++ b/src/query/service/src/spillers/serialize.rs @@ -28,7 +28,6 @@ use databend_common_expression::infer_table_schema; use databend_common_expression::DataBlock; use databend_common_expression::DataField; use databend_common_expression::DataSchema; -use databend_storages_common_table_meta::table::TableCompression; use opendal::Buffer; use parquet::arrow::arrow_reader::ParquetRecordBatchReader; use parquet::arrow::ArrowWriter; @@ -151,9 +150,6 @@ fn bare_blocks_to_parquet( let props = WriterProperties::builder() .set_compression(Compression::LZ4_RAW) - .set_data_page_size_limit(4 * 1024 * 1024) - .set_write_batch_size(usize::MAX) - .set_max_row_group_size(usize::MAX) .set_statistics_enabled(EnabledStatistics::None) .set_bloom_filter_enabled(false) .build(); diff --git a/src/query/service/tests/it/spillers/spiller.rs b/src/query/service/tests/it/spillers/spiller.rs index b8fbcc224a6b..958433bf3735 100644 --- a/src/query/service/tests/it/spillers/spiller.rs +++ b/src/query/service/tests/it/spillers/spiller.rs @@ -42,7 +42,7 @@ async fn test_spill_with_partition() -> Result<()> { spiller_type: SpillerType::HashJoinBuild, location_prefix: query_spill_prefix(tenant.tenant_name(), &ctx.get_id()), disk_spill: None, - use_parquet: ctx.get_settings().get_spilling_use_parquet(), + use_parquet: ctx.get_settings().get_spilling_use_parquet()?, }; let operator = DataOperator::instance().operator(); From ffba39b43f6cc89f4fb8e943b629b161373759be Mon Sep 17 00:00:00 2001 From: coldWater Date: Wed, 16 Oct 2024 11:54:47 +0800 Subject: [PATCH 5/5] rename to spilling_file_format Signed-off-by: coldWater --- .../src/pipelines/builders/builder_sort.rs | 2 +- .../transforms/hash_join/hash_join_spiller.rs | 2 +- .../transforms/transform_sort_spill.rs | 2 +- .../transform_window_partition_collect.rs | 2 +- .../partition/window_partition_buffer.rs | 8 ++-- src/query/service/src/spillers/serialize.rs | 2 +- 
src/query/service/src/spillers/spiller.rs | 32 +++++++-------- .../service/tests/it/spillers/spiller.rs | 2 +- src/query/settings/src/settings_default.rs | 10 +++-- .../settings/src/settings_getter_setter.rs | 41 ++++++++++++++++++- 10 files changed, 68 insertions(+), 35 deletions(-) diff --git a/src/query/service/src/pipelines/builders/builder_sort.rs b/src/query/service/src/pipelines/builders/builder_sort.rs index 5c79b234379e..1312f0ccd15e 100644 --- a/src/query/service/src/pipelines/builders/builder_sort.rs +++ b/src/query/service/src/pipelines/builders/builder_sort.rs @@ -289,7 +289,7 @@ impl SortPipelineBuilder { &self.ctx.get_id(), ), disk_spill: None, - use_parquet: settings.get_spilling_use_parquet()?, + use_parquet: settings.get_spilling_file_format()?.is_parquet(), }; pipeline.add_transform(|input, output| { let op = DataOperator::instance().operator(); diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs index 09d5269592c9..23f0472489ed 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs @@ -71,7 +71,7 @@ impl HashJoinSpiller { spiller_type, location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()), disk_spill: None, - use_parquet: ctx.get_settings().get_spilling_use_parquet()?, + use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(), }; let operator = DataOperator::instance().operator(); let spiller = Spiller::create(ctx.clone(), operator, spill_config)?; diff --git a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs index 3d3c4b6f318b..a1aa3e9a9ecf 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs @@ -491,7 +491,7 @@ mod tests { spiller_type: SpillerType::OrderBy, location_prefix: "_spill_test".to_string(), disk_spill: None, - use_parquet: ctx.get_settings().get_spilling_use_parquet()?, + use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(), }; let spiller = Spiller::create(ctx.clone(), op, spill_config)?; diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs index 88ad2cfbd767..746735aa9743 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs @@ -119,7 +119,7 @@ impl TransformWindowPartitionCollect { spiller_type: SpillerType::Window, location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()), disk_spill, - use_parquet: settings.get_spilling_use_parquet()?, + use_parquet: settings.get_spilling_file_format()?.is_parquet(), }; // Create an inner `Spiller` to spill data. 
diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs index c31010e561a3..6c4f9a2a8984 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs @@ -146,7 +146,7 @@ impl WindowPartitionBuffer { .spill_with_merged_partitions(partitions_to_spill) .await?; let index = self.spilled_merged_partitions.len(); - for (id, _, _) in &spilled.partitions { + for (id, _) in &spilled.partitions { self.spilled_small_partitions[*id].push(index); } self.spilled_merged_partitions.push((spilled, false, false)); @@ -176,12 +176,10 @@ impl WindowPartitionBuffer { partitions, } = merged_partitions; if out_of_memory_limit || *partial_restored { - if let Some(pos) = partitions.iter().position(|p| p.0 == partition_id) { - let data_range = &partitions[pos].1; - let columns_layout = &partitions[pos].2; + if let Some(pos) = partitions.iter().position(|(id, _)| *id == partition_id) { let data_block = self .spiller - .read_range(location, data_range.clone(), columns_layout) + .read_chunk(location, &partitions[pos].1) .await?; self.restored_partition_buffer .add_data_block(partition_id, data_block); diff --git a/src/query/service/src/spillers/serialize.rs b/src/query/service/src/spillers/serialize.rs index df88ac0e2dd4..ee88b2c23e7d 100644 --- a/src/query/service/src/spillers/serialize.rs +++ b/src/query/service/src/spillers/serialize.rs @@ -67,7 +67,7 @@ impl BlocksEncoder { // Currently we splice multiple complete parquet files into one, // so that the file contains duplicate headers/footers and metadata, // which can lead to file bloat. A better approach would be for the entire file to be ONE parquet, - // with each group of blocks (i.e., a unit of the upstream read range) corresponding to one or more row groupsx + // with each group of blocks (i.e. Chunk) corresponding to one or more row groupsx bare_blocks_to_parquet(blocks, &mut self.buf).unwrap(); Layout::Parquet } else { diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index 19b28bd1914e..6246d86a2ffc 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -236,7 +236,7 @@ impl Spiller { .map(|x| x[0]..x[1]) .zip(columns_layout.into_iter()), ) - .map(|(id, (range, layout))| (id, range, layout)) + .map(|(id, (range, layout))| (id, Chunk { range, layout })) .collect(); // Spill data to storage. @@ -322,7 +322,7 @@ impl Spiller { let file_size = path.size(); debug_assert_eq!( file_size, - if let Some((_, range, _)) = partitions.last() { + if let Some((_, Chunk { range, .. })) = partitions.last() { range.end } else { 0 @@ -346,8 +346,8 @@ impl Spiller { // Deserialize partitioned data block. 
let partitioned_data = partitions .iter() - .map(|(partition_id, range, columns_layout)| { - let block = deserialize_block(columns_layout, data.slice(range.clone())); + .map(|(partition_id, Chunk { range, layout })| { + let block = deserialize_block(layout, data.slice(range.clone())); (*partition_id, block) }) .collect(); @@ -355,15 +355,11 @@ impl Spiller { Ok(partitioned_data) } - pub async fn read_range( - &self, - location: &Location, - data_range: Range, - columns_layout: &Layout, - ) -> Result { + pub async fn read_chunk(&self, location: &Location, chunk: &Chunk) -> Result { // Read spilled data from storage. let instant = Instant::now(); - let data_range = data_range.start as u64..data_range.end as u64; + let Chunk { range, layout } = chunk; + let data_range = range.start as u64..range.end as u64; let data = match location { Location::Local(path) => match &self.local_operator { @@ -383,7 +379,7 @@ impl Spiller { record_read_profile(location, &instant, data.len()); - Ok(deserialize_block(columns_layout, data)) + Ok(deserialize_block(layout, data)) } async fn write_encodes(&mut self, size: usize, buf: DmaWriteBuf) -> Result { @@ -438,14 +434,14 @@ impl Spiller { } } -pub enum SpilledData { - Partition(Location), - MergedPartition(MergedPartition), -} - pub struct MergedPartition { pub location: Location, - pub partitions: Vec<(usize, Range, Layout)>, + pub partitions: Vec<(usize, Chunk)>, +} + +pub struct Chunk { + pub range: Range, + pub layout: Layout, } #[derive(Debug, Clone, Hash, PartialEq, Eq)] diff --git a/src/query/service/tests/it/spillers/spiller.rs b/src/query/service/tests/it/spillers/spiller.rs index 958433bf3735..52319f6a667b 100644 --- a/src/query/service/tests/it/spillers/spiller.rs +++ b/src/query/service/tests/it/spillers/spiller.rs @@ -42,7 +42,7 @@ async fn test_spill_with_partition() -> Result<()> { spiller_type: SpillerType::HashJoinBuild, location_prefix: query_spill_prefix(tenant.tenant_name(), &ctx.get_id()), disk_spill: None, - use_parquet: ctx.get_settings().get_spilling_use_parquet()?, + use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(), }; let operator = DataOperator::instance().operator(); diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index f67275c27fa5..260f72e66e38 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -23,6 +23,8 @@ use databend_common_exception::Result; use databend_common_meta_app::principal::UserSettingValue; use once_cell::sync::OnceCell; +use super::settings_getter_setter::SpillFileFormat; + static DEFAULT_SETTINGS: OnceCell> = OnceCell::new(); // Default value of cost factor settings @@ -309,11 +311,11 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=u64::MAX)), }), - ("spilling_use_parquet", DefaultSettingValue { - value: UserSettingValue::UInt64(1), - desc: "Set whether to use Parquet or Arrow IPC for spilling.", + ("spilling_file_format", DefaultSettingValue { + value: UserSettingValue::String("parquet".to_string()), + desc: "Set the storage file format for spilling.", mode: SettingMode::Both, - range: Some(SettingRange::Numeric(0..=1)), + range: Some(SettingRange::String(SpillFileFormat::range())), }), ("spilling_to_disk_vacuum_unknown_temp_dirs_limit", DefaultSettingValue { value: UserSettingValue::UInt64(u64::MAX), diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 
0bf4e4489efb..8938fcd616dd 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::str::FromStr; + use databend_common_ast::parser::Dialect; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -31,6 +33,41 @@ pub enum FlightCompression { Zstd, } +#[derive(Clone, Copy)] +pub enum SpillFileFormat { + Arrow, + Parquet, +} + +impl SpillFileFormat { + pub fn range() -> Vec { + ["arrow", "parquet"] + .iter() + .copied() + .map(String::from) + .collect() + } + + pub fn is_parquet(&self) -> bool { + matches!(self, SpillFileFormat::Parquet) + } +} + +impl FromStr for SpillFileFormat { + type Err = ErrorCode; + + fn from_str(s: &str) -> std::result::Result { + match s { + "arrow" => Ok(SpillFileFormat::Arrow), + "parquet" => Ok(Self::Parquet), + _ => Err(ErrorCode::InvalidConfig(format!( + "invalid SpillFileFormat: {:?}", + s + ))), + } + } +} + impl Settings { // Get u64 value, we don't get from the metasrv. fn try_get_u64(&self, key: &str) -> Result { @@ -290,8 +327,8 @@ impl Settings { Ok(self.try_get_u64("join_spilling_buffer_threshold_per_proc_mb")? as usize) } - pub fn get_spilling_use_parquet(&self) -> Result { - Ok(self.try_get_u64("spilling_use_parquet")? != 0) + pub fn get_spilling_file_format(&self) -> Result { + self.try_get_string("spilling_file_format")?.parse() } pub fn get_spilling_to_disk_vacuum_unknown_temp_dirs_limit(&self) -> Result {
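
For reference, patch 5 replaces the boolean `spilling_use_parquet` setting with a string-valued `spilling_file_format` that is parsed into an enum, and call sites derive the previous `use_parquet` flag through `is_parquet()`. Below is a minimal, self-contained sketch of that parse-then-branch pattern using only the Rust standard library; the enum, error type, and the `main` driver are illustrative stand-ins, not Databend's actual `SpillFileFormat` or `ErrorCode`.

    use std::str::FromStr;

    /// Stand-in for the `SpillFileFormat` introduced in patch 5; not the actual Databend type.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum SpillFileFormat {
        Arrow,
        Parquet,
    }

    impl FromStr for SpillFileFormat {
        type Err = String;

        fn from_str(s: &str) -> Result<Self, Self::Err> {
            // Accept the same lowercase names the setting range advertises ("arrow" | "parquet").
            match s {
                "arrow" => Ok(Self::Arrow),
                "parquet" => Ok(Self::Parquet),
                other => Err(format!("invalid SpillFileFormat: {other:?}")),
            }
        }
    }

    fn main() {
        // The spiller ultimately only needs a boolean for its `use_parquet` flag,
        // so the parsed enum is collapsed back to that at the call site.
        let use_parquet = "parquet"
            .parse::<SpillFileFormat>()
            .map(|f| matches!(f, SpillFileFormat::Parquet))
            .unwrap_or(true);
        assert!(use_parquet);
    }

Assuming the usual Databend syntax for session settings applies, users would switch encodings with something like `SET spilling_file_format = 'arrow';`, with `'parquet'` remaining the default per the setting's default value.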