From 769a5852a88a4c9be00520657553eefa6ee1875d Mon Sep 17 00:00:00 2001
From: Yanxin Xiang
Date: Mon, 13 Jan 2025 10:52:55 -0800
Subject: [PATCH] init

---
 .../core/src/datasource/physical_plan/mod.rs  |   2 +-
 .../src/datasource/physical_plan/row_serde.rs | 219 +++++++++++++++++
 datafusion/execution/Cargo.toml               |   1 +
 datafusion/execution/src/lib.rs               |   5 +-
 datafusion/execution/src/stream.rs            |  64 ++++-
 datafusion/physical-plan/src/sorts/builder.rs |  12 +-
 datafusion/physical-plan/src/sorts/cursor.rs  |   4 +
 datafusion/physical-plan/src/sorts/mod.rs     |   1 -
 datafusion/physical-plan/src/sorts/sort.rs    | 220 +++++++++++++++---
 datafusion/physical-plan/src/sorts/stream.rs  |  14 +-
 .../src/sorts/streaming_merge.rs              |   5 +-
 datafusion/physical-plan/src/spill.rs         |  27 ++-
 12 files changed, 524 insertions(+), 50 deletions(-)
 create mode 100644 datafusion/core/src/datasource/physical_plan/row_serde.rs

diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 3146d124d9f1..09511778e164 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -26,13 +26,13 @@ mod file_stream;
 mod json;
 #[cfg(feature = "parquet")]
 pub mod parquet;
+mod row_serde;
 mod statistics;
 
 pub(crate) use self::csv::plan_to_csv;
 pub(crate) use self::json::plan_to_json;
 #[cfg(feature = "parquet")]
 pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
-
 pub use arrow_file::ArrowExec;
 pub use avro::AvroExec;
 pub use csv::{CsvConfig, CsvExec, CsvExecBuilder, CsvOpener};
diff --git a/datafusion/core/src/datasource/physical_plan/row_serde.rs b/datafusion/core/src/datasource/physical_plan/row_serde.rs
new file mode 100644
index 000000000000..67df093de4ae
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/row_serde.rs
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
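+
+//! Helpers for spilling sort data in Arrow's row format.
+//!
+//! A sketch of the buffer layout produced by [`CommonRowWriter::write`] below
+//! (the plumbing that actually persists and reads back spill files is still
+//! incomplete in this patch):
+//!
+//! ```text
+//! [row count: u32 LE]
+//! [offset of row 0: u32 LE] ... [offset of row N-1: u32 LE]
+//! [row 0 bytes][row 1 bytes]...[row N-1 bytes]
+//! ```
+//!
+//! The whole buffer is then compressed according to the configured
+//! [`CompressionTypeVariant`].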
+
+use arrow::row::Row;
+use arrow::row::RowConverter;
+use arrow::row::Rows;
+use bzip2::{read::BzDecoder, write::BzEncoder};
+use datafusion_common::error::DataFusionError;
+use datafusion_common::Result;
+use datafusion_common::{exec_err, parsers::CompressionTypeVariant};
+use flate2::{read::GzDecoder, write::GzEncoder, Compression};
+use itertools::Itertools;
+use std::io::{self, Read, Write};
+use std::sync::Arc;
+use xz2::{read::XzDecoder, write::XzEncoder};
+use zstd::stream::{Decoder as ZstdDecoder, Encoder as ZstdEncoder};
+
+/// Used for spilling [`Rows`]
+pub trait RowDataWriter {
+    type Data;
+    fn write(&mut self, rows: &Self::Data) -> Result<(), DataFusionError>;
+    fn compression(&self) -> CompressionTypeVariant;
+    fn compress(&self, data: &[u8]) -> Result<Vec<u8>, DataFusionError> {
+        match self.compression() {
+            CompressionTypeVariant::UNCOMPRESSED => Ok(data.to_vec()),
+            CompressionTypeVariant::GZIP => {
+                let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
+                encoder.write_all(data).map_err(DataFusionError::IoError)?;
+                Ok(encoder.finish().map_err(DataFusionError::IoError)?)
+            }
+            CompressionTypeVariant::BZIP2 => {
+                let mut encoder =
+                    BzEncoder::new(Vec::new(), bzip2::Compression::default());
+                encoder.write_all(data).map_err(DataFusionError::IoError)?;
+                Ok(encoder.finish().map_err(DataFusionError::IoError)?)
+            }
+            CompressionTypeVariant::XZ => {
+                let mut encoder = XzEncoder::new(Vec::new(), 9);
+                encoder.write_all(data).map_err(DataFusionError::IoError)?;
+                Ok(encoder.finish().map_err(DataFusionError::IoError)?)
+            }
+            CompressionTypeVariant::ZSTD => {
+                let mut encoder =
+                    ZstdEncoder::new(Vec::new(), 0).map_err(DataFusionError::IoError)?;
+                encoder.write_all(data).map_err(DataFusionError::IoError)?;
+                Ok(encoder.finish().map_err(DataFusionError::IoError)?)
+            }
+        }
+    }
+}
+
+pub trait RowDataReader {
+    type Data;
+    fn read_all(&mut self) -> Result<Vec<Self::Data>, DataFusionError>;
+    fn compression(&self) -> CompressionTypeVariant;
+    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, DataFusionError> {
+        match self.compression() {
+            CompressionTypeVariant::UNCOMPRESSED => Ok(data.to_vec()),
+            CompressionTypeVariant::GZIP => {
+                let mut decoder = GzDecoder::new(data);
+                let mut decompressed = Vec::new();
+                decoder
+                    .read_to_end(&mut decompressed)
+                    .map_err(DataFusionError::IoError)?;
+                Ok(decompressed)
+            }
+            CompressionTypeVariant::BZIP2 => {
+                let mut decoder = BzDecoder::new(data);
+                let mut decompressed = Vec::new();
+                decoder
+                    .read_to_end(&mut decompressed)
+                    .map_err(DataFusionError::IoError)?;
+                Ok(decompressed)
+            }
+            CompressionTypeVariant::XZ => {
+                let mut decoder = XzDecoder::new(data);
+                let mut decompressed = Vec::new();
+                decoder
+                    .read_to_end(&mut decompressed)
+                    .map_err(DataFusionError::IoError)?;
+                Ok(decompressed)
+            }
+            CompressionTypeVariant::ZSTD => {
+                let mut decoder =
+                    ZstdDecoder::new(data).map_err(DataFusionError::IoError)?;
+                let mut decompressed = Vec::new();
+                decoder
+                    .read_to_end(&mut decompressed)
+                    .map_err(DataFusionError::IoError)?;
+                Ok(decompressed)
+            }
+        }
+    }
+}
+
+fn serialize_row(row: Row<'_>) -> Result<Vec<u8>> {
+    // Length-prefix the encoded row so it can be framed when written out.
+    let mut serialized_row = Vec::new();
+    let len = row.data().len() as u32;
+    serialized_row.extend_from_slice(&len.to_le_bytes());
+    serialized_row.extend_from_slice(row.data());
+    Ok(serialized_row)
+}
+
+pub struct CommonRowWriter {
+    compression: CompressionTypeVariant,
+}
+
+impl CommonRowWriter {
+    pub fn new(compression: CompressionTypeVariant) -> Self {
+        Self { compression }
+    }
+}
+
+impl RowDataWriter for CommonRowWriter {
+    type Data = Rows;
+
+    fn write(&mut self, rows: &Self::Data) -> Result<(), DataFusionError> {
+        let mut buffer: Vec<u8> = Vec::new();
+        let mut current_offset = 0;
+        let mut offsets = Vec::new();
+        for row in rows.iter() {
+            offsets.push(current_offset);
+            current_offset += row.data().len();
+        }
+        let offsets_len = offsets.len() as u32;
+        buffer.extend_from_slice(&offsets_len.to_le_bytes());
+
+        for &offset in &offsets {
+            buffer.extend_from_slice(&(offset as u32).to_le_bytes());
+        }
+
+        for row in rows.iter() {
+            buffer.extend_from_slice(row.data());
+        }
+        // TODO: the compressed buffer is not persisted anywhere yet.
+        let _compressed_data = self.compress(&buffer)?;
+        Ok(())
+    }
+
+    fn compression(&self) -> CompressionTypeVariant {
+        self.compression
+    }
+}
+
+pub struct CommonRowReader {
+    compression: CompressionTypeVariant,
+    converter: Arc<RowConverter>,
+}
+
+impl CommonRowReader {
+    pub fn new(
+        compression: CompressionTypeVariant,
+        converter: Arc<RowConverter>,
+    ) -> Self {
+        Self {
+            compression,
+            converter,
+        }
+    }
+}
+
+impl RowDataReader for CommonRowReader {
+    type Data = Rows;
+
+    fn read_all(&mut self) -> Result<Vec<Self::Data>, DataFusionError> {
+        // TODO: the compressed bytes still need to be read from the spill
+        // file; for now an empty buffer is decoded.
+        let compressed_data = vec![];
+        let decompressed_data = self.decompress(&compressed_data)?;
+        let mut cursor = std::io::Cursor::new(decompressed_data);
+
+        let mut offsets_len_buf = [0u8; 4];
+        cursor
+            .read_exact(&mut offsets_len_buf)
+            .map_err(DataFusionError::IoError)?;
+        let offsets_len = u32::from_le_bytes(offsets_len_buf) as usize;
+        let mut offsets = Vec::with_capacity(offsets_len);
+        for _ in 0..offsets_len {
+            let mut offset_buf = [0u8; 4];
+            cursor
+                .read_exact(&mut offset_buf)
+                .map_err(DataFusionError::IoError)?;
+            offsets.push(u32::from_le_bytes(offset_buf) as usize);
+        }
+        let mut buffer = Vec::new();
+        cursor
+            .read_to_end(&mut buffer)
+            .map_err(DataFusionError::IoError)?;
+
+        let parser = self.converter.parser();
+        let mut rows = self.converter.empty_rows(0, 0);
+
+        for i in 0..offsets.len() {
+            let start = offsets[i];
+            let end = if i + 1 < offsets.len() {
+                offsets[i + 1]
+            } else {
+                buffer.len()
+            };
+
+            let row_data = &buffer[start..end];
+            let row = parser.parse(row_data);
+            rows.push(row);
+        }
+
+        Ok(vec![rows])
+    }
+
+    fn compression(&self) -> CompressionTypeVariant {
+        self.compression
+    }
+}
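+
+// A minimal, hypothetical usage sketch of the writer above. It only shows how
+// a `Rows` value is produced with arrow's `RowConverter` and handed to
+// `CommonRowWriter::write`; since the writer does not persist anything yet,
+// nothing is asserted about on-disk contents.
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{ArrayRef, Int32Array};
+    use arrow::datatypes::DataType;
+    use arrow::row::SortField;
+
+    #[test]
+    fn write_rows_smoke_test() -> Result<()> {
+        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)])?;
+        let column: ArrayRef = Arc::new(Int32Array::from(vec![3, 1, 2]));
+        let rows = converter.convert_columns(&[column])?;
+
+        let mut writer = CommonRowWriter::new(CompressionTypeVariant::UNCOMPRESSED);
+        writer.write(&rows)?;
+        Ok(())
+    }
+}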
diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml
index bb86868a8214..74e36873f187 100644
--- a/datafusion/execution/Cargo.toml
+++ b/datafusion/execution/Cargo.toml
@@ -47,6 +47,7 @@ parking_lot = { workspace = true }
 rand = { workspace = true }
 tempfile = { workspace = true }
 url = { workspace = true }
+pin-project-lite = "^0.2.7"
 
 [dev-dependencies]
 chrono = { workspace = true }
diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs
index 317bd3203ab1..3a53a69a76a1 100644
--- a/datafusion/execution/src/lib.rs
+++ b/datafusion/execution/src/lib.rs
@@ -37,5 +37,8 @@ pub mod registry {
 
 pub use disk_manager::DiskManager;
 pub use registry::FunctionRegistry;
-pub use stream::{RecordBatchStream, SendableRecordBatchStream};
+pub use stream::{
+    RecordBatchStream, RowOrColumn, RowOrColumnStream, RowOrColumnStreamAdapter,
+    SendableRecordBatchStream,
+};
 pub use task::TaskContext;
diff --git a/datafusion/execution/src/stream.rs b/datafusion/execution/src/stream.rs
index f3eb7b77e03c..1bacb047b22a 100644
--- a/datafusion/execution/src/stream.rs
+++ b/datafusion/execution/src/stream.rs
@@ -15,11 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use datafusion_common::Result;
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch, row::Rows};
+use datafusion_common::{DataFusionError, Result};
 use futures::Stream;
-use std::pin::Pin;
-
+use pin_project_lite::pin_project;
+use std::{
+    fmt,
+    pin::Pin,
+    task::{Context, Poll},
+};
 /// Trait for types that stream [RecordBatch]
 ///
 /// See [`SendableRecordBatchStream`] for more details.
@@ -51,3 +55,55 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
 /// [`Stream`]s there is no mechanism to prevent callers polling so returning
 /// `Ready(None)` is recommended.
 pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
+
+pub enum RowOrColumn {
+    Row(Rows),
+    Column(RecordBatch),
+}
+
+/// A stream whose items contain either [`Rows`] or a [`RecordBatch`]
+pub type RowOrColumnStream =
+    Pin<Box<dyn Stream<Item = Result<RowOrColumn, DataFusionError>> + Send>>;
+
+pin_project! {
+    pub struct RowOrColumnStreamAdapter<S> {
+        #[pin]
+        stream: S,
+    }
+}
+
+impl<S> RowOrColumnStreamAdapter<S> {
+    pub fn new(stream: S) -> Self {
+        Self { stream }
+    }
+}
+
+impl<S> fmt::Debug for RowOrColumnStreamAdapter<S> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("RowOrColumnStreamAdapter").finish()
+    }
+}
+
+impl<S> Stream for RowOrColumnStreamAdapter<S>
+where
+    S: Stream<Item = Result<RowOrColumn, DataFusionError>> + Unpin,
+{
+    type Item = Result<RowOrColumn, DataFusionError>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.project();
+        this.stream.poll_next(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.stream.size_hint()
+    }
+}
+
+impl<S> From<RowOrColumnStreamAdapter<S>> for RowOrColumnStream
+where
+    S: Stream<Item = Result<RowOrColumn, DataFusionError>> + Send + 'static,
+{
+    fn from(adapter: RowOrColumnStreamAdapter<S>) -> Self {
+        Box::pin(adapter)
+    }
+}
diff --git a/datafusion/physical-plan/src/sorts/builder.rs b/datafusion/physical-plan/src/sorts/builder.rs
index 9b2fa968222c..708edbb6baf8 100644
--- a/datafusion/physical-plan/src/sorts/builder.rs
+++ b/datafusion/physical-plan/src/sorts/builder.rs
@@ -19,6 +19,8 @@ use crate::spill::get_record_batch_memory_size;
 use arrow::compute::interleave;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
+use arrow::row::Rows;
+use arrow_array::ArrayRef;
 use datafusion_common::Result;
 use datafusion_execution::memory_pool::MemoryReservation;
 use std::sync::Arc;
@@ -31,14 +33,14 @@ struct BatchCursor {
     row_idx: usize,
 }
 
-/// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`RecordBatch`]
+/// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`Rows`]
 #[derive(Debug)]
 pub struct BatchBuilder {
     /// The schema of the RecordBatches yielded by this stream
     schema: SchemaRef,
 
     /// Maintain a list of [`RecordBatch`] and their corresponding stream
-    batches: Vec<(usize, RecordBatch)>,
+    rows: Vec<(usize, Vec<ArrayRef>)>,
 
     /// Accounts for memory used by buffered batches
     reservation: MemoryReservation,
@@ -61,7 +63,7 @@ impl BatchBuilder {
     ) -> Self {
         Self {
             schema,
-            batches: Vec::with_capacity(stream_count * 2),
+            rows: Vec::with_capacity(stream_count * 2),
             cursors: vec![BatchCursor::default(); stream_count],
             indices: Vec::with_capacity(batch_size),
             reservation,
@@ -73,7 +75,7 @@ impl BatchBuilder {
         self.reservation
             .try_grow(get_record_batch_memory_size(&batch))?;
-        let batch_idx = self.batches.len();
-        self.batches.push((stream_idx, batch));
+        let batch_idx = self.rows.len();
+        self.rows.push((stream_idx, batch));
         self.cursors[stream_idx] = BatchCursor {
             batch_idx,
             row_idx: 0,
@@ -117,7 +119,7 @@ impl BatchBuilder {
         let columns = (0..self.schema.fields.len())
             .map(|column_idx| {
                 let arrays: Vec<_> = self
-                    .batches
+                    .rows
                     .iter()
                     .map(|(_, batch)| batch.column(column_idx).as_ref())
                     .collect();
diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs
index 5cd24b89f5c1..dcf4901407fb 100644
--- a/datafusion/physical-plan/src/sorts/cursor.rs
+++ b/datafusion/physical-plan/src/sorts/cursor.rs
@@ -177,6 +177,10 @@ impl RowValues {
             _reservation: reservation,
         }
     }
+    /// Returns the memory size of the underlying rows
+    pub fn size(&self) -> usize {
+        self.rows.size()
+    }
 }
 
 impl CursorValues for RowValues {
diff --git a/datafusion/physical-plan/src/sorts/mod.rs b/datafusion/physical-plan/src/sorts/mod.rs
index ab5df37ed327..1eeb7066ea56 100644
--- a/datafusion/physical-plan/src/sorts/mod.rs
+++ b/datafusion/physical-plan/src/sorts/mod.rs
@@ -26,5 +26,4 @@ pub mod sort;
 pub mod sort_preserving_merge;
 mod stream;
 pub mod streaming_merge;
-
 pub use index::RowIndex;
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 33c8a2b2fee3..de663ee4b741 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -33,7 +33,7 @@ use crate::metrics::{
 };
 use crate::sorts::streaming_merge::StreamingMergeBuilder;
 use crate::spill::{
-    get_record_batch_memory_size, read_spill_as_stream, spill_record_batches,
+    get_record_batch_memory_size, read_spill_as_stream, spill_record_batches, SpillFormat,
 };
 use crate::stream::RecordBatchStreamAdapter;
 use crate::topk::TopK;
@@ -53,13 +53,17 @@ use datafusion_common::{internal_err, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::runtime_env::RuntimeEnv;
-use datafusion_execution::TaskContext;
+use datafusion_execution::{
+    RowOrColumn, RowOrColumnStream, RowOrColumnStreamAdapter, TaskContext,
+};
 use datafusion_physical_expr::LexOrdering;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
 use futures::{StreamExt, TryStreamExt};
 use log::{debug, trace};
 
+use arrow::row::{RowConverter, Rows, SortField};
+
 struct ExternalSorterMetrics {
     /// metrics
     baseline: BaselineMetrics,
@@ -202,7 +206,7 @@ impl ExternalSorterMetrics {
 ///
 /// in_mem_batches
 /// ```
-struct ExternalSorter {
+pub struct ExternalSorter {
     // ========================================================================
     // PROPERTIES:
     // Fields that define the sorter's configuration and remain constant
@@ -225,7 +229,7 @@ impl ExternalSorterMetrics {
     // Fields that hold intermediate data during sorting
     // ========================================================================
     /// Potentially unsorted in memory buffer
-    in_mem_batches: Vec<RecordBatch>,
+    in_mem_rows: Vec<Rows>,
 
-    /// if `Self::in_mem_batches` are sorted
+    /// if `Self::in_mem_rows` are sorted
     in_mem_batches_sorted: bool,
@@ -251,6 +255,8 @@
     /// How much memory to reserve for performing in-memory sort/merges
     /// prior to spilling.
     sort_spill_reservation_bytes: usize,
+    /// Converter used to serialize and deserialize [`Rows`]
+    converter: Option<RowConverter>,
 }
 
 impl ExternalSorter {
@@ -276,10 +282,16 @@ impl ExternalSorter {
         let merge_reservation =
             MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
                 .register(&runtime.memory_pool);
-
+        let fields = expr
+            .iter()
+            .map(|e| {
+                let data_type = e.expr.data_type(&schema)?;
+                Ok(SortField::new_with_options(data_type, e.options))
+            })
+            .collect::<Result<Vec<_>>>()?;
         Self {
             schema,
-            in_mem_batches: vec![],
+            in_mem_rows: vec![],
             in_mem_batches_sorted: true,
             spills: vec![],
             expr: expr.inner.into(),
@@ -291,6 +303,7 @@ impl ExternalSorter {
             batch_size,
             sort_spill_reservation_bytes,
             sort_in_place_threshold_bytes,
+            converter: Some(RowConverter::new(fields)?),
         }
     }
 
@@ -302,9 +315,8 @@ impl ExternalSorter {
             return Ok(());
         }
         self.reserve_memory_for_merge()?;
-
-        let size = get_record_batch_memory_size(&input);
-
+        let rows = self.convert_record_batch(&input)?;
+        let size = rows.size();
         if self.reservation.try_grow(size).is_err() {
             let before = self.reservation.size();
             self.in_mem_sort().await?;
@@ -326,15 +338,39 @@ impl ExternalSorter {
             }
         }
 
-        self.in_mem_batches.push(input);
+        self.in_mem_rows.push(rows);
         self.in_mem_batches_sorted = false;
         Ok(())
     }
 
+    fn convert_record_batch(&self, batch: &RecordBatch) -> Result<Rows> {
+        let sort_columns = self
+            .expr
+            .iter()
+            .map(|expr| expr.evaluate_to_sort_column(batch))
+            .collect::<Result<Vec<_>>>()?;
+        let columns: Vec<_> = sort_columns
+            .into_iter()
+            .map(|sort_column| sort_column.values)
+            .collect();
+        let Some(converter) = &self.converter else {
+            return internal_err!("row converter is not available");
+        };
+        Ok(converter.convert_columns(&columns)?)
+    }
+
     fn spilled_before(&self) -> bool {
         !self.spills.is_empty()
     }
 
+    fn spill_format(&self) -> SpillFormat {
+        if self.converter.is_some() {
+            SpillFormat::Row
+        } else {
+            SpillFormat::Column
+        }
+    }
+
     /// Returns the final sorted output of all batches inserted via
     /// [`Self::insert_batch`] as a stream of [`RecordBatch`]es.
     ///
@@ -347,17 +383,22 @@ impl ExternalSorter {
     fn sort(&mut self) -> Result<SendableRecordBatchStream> {
         if self.spilled_before() {
             let mut streams = vec![];
-            if !self.in_mem_batches.is_empty() {
+            if !self.in_mem_rows.is_empty() {
                 let in_mem_stream =
                     self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
                 streams.push(in_mem_stream);
             }
-
+            let spill_format = self.spill_format();
             for spill in self.spills.drain(..) {
                 if !spill.path().exists() {
                     return internal_err!("Spill file {:?} does not exist", spill.path());
                 }
-                let stream = read_spill_as_stream(spill, Arc::clone(&self.schema), 2)?;
+                let stream = read_spill_as_stream(
+                    spill,
+                    Arc::clone(&self.schema),
+                    2,
+                    spill_format.clone(),
+                )?;
                 streams.push(stream);
             }
 
@@ -403,7 +444,7 @@ impl ExternalSorter {
     /// Returns the amount of memory freed.
     async fn spill(&mut self) -> Result<usize> {
         // we could always get a chance to free some memory as long as we are holding some
-        if self.in_mem_batches.is_empty() {
+        if self.in_mem_rows.is_empty() {
             return Ok(0);
         }
 
@@ -412,7 +453,7 @@ impl ExternalSorter {
         self.in_mem_sort().await?;
 
         let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
-        let batches = std::mem::take(&mut self.in_mem_batches);
+        let rows = std::mem::take(&mut self.in_mem_rows);
         let spilled_rows = spill_record_batches(
-            batches,
+            rows,
             spill_file.path().into(),
@@ -437,7 +478,7 @@ impl ExternalSorter {
         // allocation.
         self.merge_reservation.free();
 
-        self.in_mem_batches = self
+        self.in_mem_rows = self
             .in_mem_sort_stream(self.metrics.baseline.intermediate())?
             .try_collect()
             .await?;
@@ -518,7 +559,7 @@ impl ExternalSorter {
         &mut self,
         metrics: BaselineMetrics,
     ) -> Result<SendableRecordBatchStream> {
-        if self.in_mem_batches.is_empty() {
+        if self.in_mem_rows.is_empty() {
             return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
                 &self.schema,
             ))));
@@ -529,29 +570,28 @@ impl ExternalSorter {
         let elapsed_compute = metrics.elapsed_compute().clone();
         let _timer = elapsed_compute.timer();
 
-        if self.in_mem_batches.len() == 1 {
-            let batch = self.in_mem_batches.swap_remove(0);
+        if self.in_mem_rows.len() == 1 {
+            let rows = self.in_mem_rows.swap_remove(0);
             let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+            return self.sort_batch_stream(RowOrColumn::Row(rows), metrics, reservation);
         }
 
         // If less than sort_in_place_threshold_bytes, concatenate and sort in place
         if self.reservation.size() < self.sort_in_place_threshold_bytes {
             // Concatenate memory batches together and sort
             let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
-            self.in_mem_batches.clear();
+            self.in_mem_rows.clear();
             self.reservation
                 .try_resize(get_record_batch_memory_size(&batch))?;
             let reservation = self.reservation.take();
             return self.sort_batch_stream(batch, metrics, reservation);
         }
 
-        let streams = std::mem::take(&mut self.in_mem_batches)
+        let streams = std::mem::take(&mut self.in_mem_rows)
             .into_iter()
-            .map(|batch| {
+            .map(|row| {
                 let metrics = self.metrics.baseline.intermediate();
-                let reservation =
-                    self.reservation.split(get_record_batch_memory_size(&batch));
+                let reservation = self.reservation.split(row.size());
-                let input = self.sort_batch_stream(batch, metrics, reservation)?;
+                let input =
+                    self.sort_batch_stream(RowOrColumn::Row(row), metrics, reservation)?;
                 Ok(spawn_buffered(input, 1))
             })
@@ -576,10 +616,10 @@ impl ExternalSorter {
     /// is released when the sort is complete
     fn sort_batch_stream(
         &self,
-        batch: RecordBatch,
+        batch: RowOrColumn,
         metrics: BaselineMetrics,
         reservation: MemoryReservation,
-    ) -> Result<SendableRecordBatchStream> {
+    ) -> Result<RowOrColumnStream> {
         assert_eq!(get_record_batch_memory_size(&batch), reservation.size());
 
         let schema = batch.schema();
@@ -587,14 +627,19 @@ impl ExternalSorter {
         let expressions: LexOrdering = self.expr.iter().cloned().collect();
         let stream = futures::stream::once(futures::future::lazy(move |_| {
             let timer = metrics.elapsed_compute().timer();
-            let sorted = sort_batch(&batch, &expressions, fetch)?;
+            let sorted = match batch {
+                RowOrColumn::Column(batch) => {
+                    RowOrColumn::Column(sort_batch(&batch, &expressions, fetch)?)
+                }
+                RowOrColumn::Row(rows) => RowOrColumn::Row(sort_row(rows, fetch)?),
+            };
             timer.done();
             metrics.record_output(sorted.num_rows());
             drop(batch);
             drop(reservation);
             Ok(sorted)
         }));
-        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+        Ok(Box::pin(RowOrColumnStreamAdapter::new(stream)))
     }
 
     /// If this sort may spill, pre-allocates
@@ -652,6 +697,12 @@ pub fn sort_batch(
     )?)
 }
 
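+/// Sorts a [`Rows`] value by comparing the encoded representation of each row.
+///
+/// Arrow's row format is designed so that comparing two encoded rows byte-wise
+/// is equivalent to comparing the underlying sort columns, which is why a plain
+/// byte comparison suffices here. A rough usage sketch (hypothetical, not
+/// exercised by any test in this patch):
+///
+/// ```ignore
+/// let converter = RowConverter::new(vec![SortField::new(DataType::Int64)])?;
+/// let rows = converter.convert_columns(&columns)?;
+/// let sorted = sort_row(rows, None)?;
+/// ```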
+pub fn sort_row(rows: Rows, fetch: Option<usize>) -> Result<Rows> {
+    let mut row_data: Vec<_> = rows.iter().collect();
+    row_data.sort_unstable();
+    if let Some(fetch) = fetch {
+        row_data.truncate(fetch);
+    }
+    // TODO: rebuilding a `Rows` from the sorted `Row`s requires the
+    // `RowConverter`, which is not available here yet.
+    Ok(row_data.into())
+}
+
 #[inline]
 fn is_multi_column_with_lists(sort_columns: &[SortColumn]) -> bool {
     sort_columns.iter().any(|c| {
@@ -1026,6 +1077,117 @@ impl ExecutionPlan for SortExec {
     }
 }
 
+pub struct ExternalSorterBuilder {
+    partition_id: usize,
+    schema: SchemaRef,
+    expr: LexOrdering,
+    batch_size: usize,
+    fetch: Option<usize>,
+    sort_spill_reservation_bytes: usize,
+    sort_in_place_threshold_bytes: usize,
+    metrics: ExecutionPlanMetricsSet,
+    runtime: Arc<RuntimeEnv>,
+}
+
+impl ExternalSorterBuilder {
+    pub fn new(partition_id: usize, schema: SchemaRef, expr: LexOrdering) -> Self {
+        Self {
+            partition_id,
+            schema,
+            expr,
+            batch_size: 0,
+            fetch: None,
+            sort_spill_reservation_bytes: 0,
+            sort_in_place_threshold_bytes: 0,
+            metrics: ExecutionPlanMetricsSet::new(),
+            runtime: Arc::new(RuntimeEnv::default()),
+        }
+    }
+
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
+    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+        self.fetch = fetch;
+        self
+    }
+
+    pub fn with_sort_spill_reservation_bytes(mut self, bytes: usize) -> Self {
+        self.sort_spill_reservation_bytes = bytes;
+        self
+    }
+
+    pub fn with_sort_in_place_threshold_bytes(mut self, bytes: usize) -> Self {
+        self.sort_in_place_threshold_bytes = bytes;
+        self
+    }
+
+    pub fn with_metrics(mut self, metrics: ExecutionPlanMetricsSet) -> Self {
+        self.metrics = metrics;
+        self
+    }
+
+    pub fn with_runtime(mut self, runtime: Arc<RuntimeEnv>) -> Self {
+        self.runtime = runtime;
+        self
+    }
+
+    pub fn build(self) -> Result<ExternalSorter> {
+        let metrics = ExternalSorterMetrics::new(&self.metrics, self.partition_id);
+        let reservation =
+            MemoryConsumer::new(format!("ExternalSorter[{}]", self.partition_id))
+                .with_can_spill(true)
+                .register(&self.runtime.memory_pool);
+
+        let merge_reservation =
+            MemoryConsumer::new(format!("ExternalSorterMerge[{}]", self.partition_id))
+                .register(&self.runtime.memory_pool);
+        let fields = self
+            .expr
+            .iter()
+            .map(|e| {
+                let data_type = e.expr.data_type(&self.schema)?;
+                Ok(SortField::new_with_options(data_type, e.options))
+            })
+            .collect::<Result<Vec<_>>>()?;
+        // For a single sort column, no row conversion is needed
+        let converter = if fields.len() == 1 {
+            None
+        } else {
+            Some(RowConverter::new(fields)?)
+        };
+        Ok(ExternalSorter {
+            schema: self.schema,
+            in_mem_rows: vec![],
+            in_mem_batches_sorted: true,
+            spills: vec![],
+            expr: self.expr.inner.into(),
+            metrics,
+            fetch: self.fetch,
+            reservation,
+            merge_reservation,
+            runtime: self.runtime,
+            batch_size: self.batch_size,
+            sort_spill_reservation_bytes: self.sort_spill_reservation_bytes,
+            sort_in_place_threshold_bytes: self.sort_in_place_threshold_bytes,
+            converter,
+        })
+    }
+}
+
+// Builder-style construction for `ExternalSorter`
+impl ExternalSorter {
+    pub fn builder(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: LexOrdering,
+    ) -> ExternalSorterBuilder {
+        ExternalSorterBuilder::new(partition_id, schema, expr)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs
index ab8054be59a8..6487b10c5531 100644
--- a/datafusion/physical-plan/src/sorts/stream.rs
+++ b/datafusion/physical-plan/src/sorts/stream.rs
@@ -21,9 +21,10 @@ use crate::{PhysicalExpr, PhysicalSortExpr};
 use arrow::array::Array;
 use arrow::datatypes::Schema;
 use arrow::record_batch::RecordBatch;
-use arrow::row::{RowConverter, SortField};
+use arrow::row::{RowConverter, Rows, SortField};
 use datafusion_common::Result;
 use datafusion_execution::memory_pool::MemoryReservation;
+use datafusion_execution::{RowOrColumn, RowOrColumnStream};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use futures::stream::{Fuse, StreamExt};
 use std::marker::PhantomData;
@@ -49,9 +50,9 @@ pub trait PartitionedStream: std::fmt::Debug + Send {
     ) -> Poll<Option<Self::Output>>;
 }
 
-/// A new type wrapper around a set of fused [`SendableRecordBatchStream`]
+/// A new type wrapper around a set of fused [`RowOrColumnStream`]
 /// that implements debug, and skips over empty [`RecordBatch`]
-struct FusedStreams(Vec<Fuse<SendableRecordBatchStream>>);
+struct FusedStreams(Vec<Fuse<RowOrColumnStream>>);
 
 impl std::fmt::Debug for FusedStreams {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -66,7 +67,7 @@ impl FusedStreams {
         &mut self,
         cx: &mut Context<'_>,
         stream_idx: usize,
-    ) -> Poll<Option<Result<RecordBatch>>> {
+    ) -> Poll<Option<Result<RowOrColumn>>> {
         loop {
             match ready!(self.0[stream_idx].poll_next_unpin(cx)) {
                 Some(Ok(b)) if b.num_rows() == 0 => continue,
@@ -94,7 +95,7 @@ impl RowCursorStream {
     pub fn try_new(
         schema: &Schema,
         expressions: &LexOrdering,
-        streams: Vec<SendableRecordBatchStream>,
+        streams: Vec<RowOrColumnStream>,
         reservation: MemoryReservation,
     ) -> Result<Self> {
         let sort_fields = expressions
@@ -107,6 +108,7 @@ impl RowCursorStream {
 
         let streams = streams.into_iter().map(|s| s.fuse()).collect();
         let converter = RowConverter::new(sort_fields)?;
+
         Ok(Self {
             converter,
             reservation,
@@ -130,6 +132,8 @@ impl RowCursorStream {
         rows_reservation.try_grow(rows.size())?;
         Ok(RowValues::new(rows, rows_reservation))
     }
+
+    /// Wraps already-converted [`Rows`] into [`RowValues`], mirroring
+    /// [`Self::convert_batch`] for row-encoded input
+    fn row_values(&mut self, rows: Rows) -> Result<RowValues> {
+        let mut rows_reservation = self.reservation.new_empty();
+        rows_reservation.try_grow(rows.size())?;
+        Ok(RowValues::new(rows, rows_reservation))
+    }
 }
 
 impl PartitionedStream for RowCursorStream {
diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs
index 448d70760de1..419c1c186982 100644
--- a/datafusion/physical-plan/src/sorts/streaming_merge.rs
+++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs
@@ -28,6 +28,7 @@ use arrow::datatypes::{DataType, SchemaRef};
 use arrow_array::*;
 use datafusion_common::{internal_err, Result};
 use datafusion_execution::memory_pool::MemoryReservation;
+use datafusion_execution::RowOrColumnStream;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 
 macro_rules! primitive_merge_helper {
@@ -52,7 +53,7 @@ macro_rules! merge_helper {
 }
 
 pub struct StreamingMergeBuilder<'a> {
-    streams: Vec<SendableRecordBatchStream>,
+    streams: Vec<RowOrColumnStream>,
     schema: Option<SchemaRef>,
     expressions: &'a LexOrdering,
     metrics: Option<BaselineMetrics>,
@@ -85,7 +86,7 @@ impl<'a> StreamingMergeBuilder<'a> {
         }
     }
 
-    pub fn with_streams(mut self, streams: Vec<SendableRecordBatchStream>) -> Self {
+    pub fn with_streams(mut self, streams: Vec<RowOrColumnStream>) -> Self {
         self.streams = streams;
         self
     }
diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs
index f20adb0d2fab..e10f18373f1b 100644
--- a/datafusion/physical-plan/src/spill.rs
+++ b/datafusion/physical-plan/src/spill.rs
@@ -37,6 +37,13 @@ use datafusion_execution::SendableRecordBatchStream;
 
 use crate::common::IPCWriter;
 use crate::stream::RecordBatchReceiverStream;
 
+/// Format of the data persisted in a spill file
+#[derive(Clone)]
+pub enum SpillFormat {
+    Row,
+    Column,
+}
+
 /// Read spilled batches from the disk
 ///
 /// `path` - temp file
@@ -46,11 +53,12 @@ pub(crate) fn read_spill_as_stream(
     path: RefCountedTempFile,
     schema: SchemaRef,
     buffer: usize,
+    format: SpillFormat,
 ) -> Result<SendableRecordBatchStream> {
     let mut builder = RecordBatchReceiverStream::builder(schema, buffer);
     let sender = builder.tx();
 
-    builder.spawn_blocking(move || read_spill(sender, path.path()));
+    builder.spawn_blocking(move || read_spill(sender, path.path(), format));
 
     Ok(builder.build())
 }
@@ -77,7 +85,11 @@ pub(crate) fn spill_record_batches(
     Ok(writer.num_rows)
 }
 
-fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
+fn read_spill(
+    sender: Sender<Result<RecordBatch>>,
+    path: &Path,
+    format: SpillFormat,
+) -> Result<()> {
+    // TODO: `format` is not consulted yet; row-encoded spill files still need
+    // a dedicated reader.
     let file = BufReader::new(File::open(path)?);
     let reader = FileReader::try_new(file, None)?;
     for batch in reader {
@@ -179,6 +191,17 @@ fn count_array_data_memory_size(
     }
 }
 
+/// Estimates the memory needed to hold `batch` in Arrow's row format: the
+/// Arrow memory size plus one offset per row and one trailing offset.
+pub fn estimate_rows_size(batch: &RecordBatch) -> usize {
+    let record_batch_size = get_record_batch_memory_size(batch);
+    let num_rows = batch.num_rows();
+    if num_rows == 0 {
+        return 0;
+    }
+    let offsets_size = (num_rows + 1) * std::mem::size_of::<usize>();
+    let row_overhead = offsets_size;
+    record_batch_size + row_overhead
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
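+
+    // Hypothetical sanity check for `estimate_rows_size` (not part of this
+    // patch's test suite): the estimate equals the Arrow memory size plus one
+    // offset per row and one trailing offset.
+    #[test]
+    fn estimate_rows_size_adds_offset_overhead() {
+        use arrow::array::{ArrayRef, Int32Array};
+        use arrow::datatypes::{DataType, Field, Schema};
+        use std::sync::Arc;
+
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
+
+        let expected = get_record_batch_memory_size(&batch)
+            + (batch.num_rows() + 1) * std::mem::size_of::<usize>();
+        assert_eq!(estimate_rows_size(&batch), expected);
+    }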