diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index f681ae57a3f1c..bf6fb450e3f8b 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -235,6 +235,23 @@ config_namespace! {
         ///
         /// Defaults to the number of CPU cores on the system
         pub planning_concurrency: usize, default = num_cpus::get()
+
+        /// How much memory is set aside, for each spillable sort, to
+        /// ensure an in-memory merge can occur. This setting has no
+        /// effect if the sort can not spill (there is no `DiskManager`
+        /// configured).
+        ///
+        /// As part of spilling to disk, in-memory data must be sorted
+        /// / merged before writing the file. This in-memory
+        /// sort/merge requires memory as well, so to avoid allocating
+        /// it once memory is already exhausted, DataFusion sets aside
+        /// this many bytes beforehand.
+        pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024
+
+        /// When sorting, below what size should data be concatenated
+        /// and sorted in a single RecordBatch rather than sorted in
+        /// batches and merged.
+        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
     }
 }
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index c47c9926819b3..c94d2f9b115fa 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -576,14 +576,21 @@ impl ExecutionPlan for RepartitionExec {
             // Get existing ordering:
             let sort_exprs = self.input.output_ordering().unwrap_or(&[]);
-            // Merge streams (while preserving ordering) coming from input partitions to this partition:
+
+            // Merge streams (while preserving ordering) coming from
+            // input partitions to this partition:
+            let fetch = None;
+            let merge_reservation =
+                MemoryConsumer::new(format!("{}[Merge {partition}]", self.name()))
+                    .register(context.memory_pool());
             streaming_merge(
                 input_streams,
                 self.schema(),
                 sort_exprs,
                 BaselineMetrics::new(&self.metrics, partition),
                 context.session_config().batch_size(),
-                None,
+                fetch,
+                merge_reservation,
             )
         } else {
             Ok(Box::pin(RepartitionStream {
diff --git a/datafusion/core/src/physical_plan/sorts/builder.rs b/datafusion/core/src/physical_plan/sorts/builder.rs
index 1c5ec356eed9f..3527d57382230 100644
--- a/datafusion/core/src/physical_plan/sorts/builder.rs
+++ b/datafusion/core/src/physical_plan/sorts/builder.rs
@@ -19,6 +19,7 @@ use arrow::compute::interleave;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
 
 #[derive(Debug, Copy, Clone, Default)]
 struct BatchCursor {
@@ -37,6 +38,9 @@ pub struct BatchBuilder {
     /// Maintain a list of [`RecordBatch`] and their corresponding stream
     batches: Vec<(usize, RecordBatch)>,
 
+    /// Accounts for memory used by buffered batches
+    reservation: MemoryReservation,
+
     /// The current [`BatchCursor`] for each stream
     cursors: Vec<BatchCursor>,
 
@@ -47,23 +51,31 @@ pub struct BatchBuilder {
 
 impl BatchBuilder {
     /// Create a new [`BatchBuilder`] with the provided `stream_count` and `batch_size`
-    pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> Self {
+    pub fn new(
+        schema: SchemaRef,
+        stream_count: usize,
+        batch_size: usize,
+        reservation: MemoryReservation,
+    ) -> Self {
         Self {
             schema,
             batches: Vec::with_capacity(stream_count * 2),
             cursors: vec![BatchCursor::default(); stream_count],
             indices: Vec::with_capacity(batch_size),
+            reservation,
         }
     }
 
     /// Append a new batch in `stream_idx`
-    pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
+    pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
+        self.reservation.try_grow(batch.get_array_memory_size())?;
         let batch_idx = self.batches.len();
         self.batches.push((stream_idx, batch));
         self.cursors[stream_idx] = BatchCursor {
             batch_idx,
             row_idx: 0,
-        }
+        };
+        Ok(())
     }
 
     /// Append the next row from `stream_idx`
@@ -119,7 +131,7 @@ impl BatchBuilder {
         // We can therefore drop all but the last batch for each stream
         let mut batch_idx = 0;
         let mut retained = 0;
-        self.batches.retain(|(stream_idx, _)| {
+        self.batches.retain(|(stream_idx, batch)| {
             let stream_cursor = &mut self.cursors[*stream_idx];
             let retain = stream_cursor.batch_idx == batch_idx;
             batch_idx += 1;
@@ -127,6 +139,8 @@ impl BatchBuilder {
             if retain {
                 stream_cursor.batch_idx = retained;
                 retained += 1;
+            } else {
+                self.reservation.shrink(batch.get_array_memory_size());
             }
             retain
         });
diff --git a/datafusion/core/src/physical_plan/sorts/cursor.rs b/datafusion/core/src/physical_plan/sorts/cursor.rs
index a9e5122130572..52fb9b5ef209a 100644
--- a/datafusion/core/src/physical_plan/sorts/cursor.rs
+++ b/datafusion/core/src/physical_plan/sorts/cursor.rs
@@ -21,6 +21,7 @@ use arrow::datatypes::ArrowNativeTypeOp;
 use arrow::row::{Row, Rows};
 use arrow_array::types::ByteArrayType;
 use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray};
+use datafusion_execution::memory_pool::MemoryReservation;
 use std::cmp::Ordering;
 
 /// A [`Cursor`] for [`Rows`]
@@ -29,6 +30,11 @@ pub struct RowCursor {
     num_rows: usize,
 
     rows: Rows,
+
+    /// Tracks the reservation for the memory in the `Rows` of this
+    /// cursor. Freed on drop
+    #[allow(dead_code)]
+    reservation: MemoryReservation,
 }
 
 impl std::fmt::Debug for RowCursor {
@@ -41,12 +47,18 @@ impl std::fmt::Debug for RowCursor {
 }
 
 impl RowCursor {
-    /// Create a new SortKeyCursor
-    pub fn new(rows: Rows) -> Self {
+    /// Create a new SortKeyCursor from `rows` and the associated `reservation`
+    pub fn new(rows: Rows, reservation: MemoryReservation) -> Self {
+        assert_eq!(
+            rows.size(),
+            reservation.size(),
+            "memory reservation mismatch"
+        );
         Self {
             cur_row: 0,
             num_rows: rows.num_rows(),
             rows,
+            reservation,
         }
     }
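The `RowCursor` change above leans on the RAII behavior of `MemoryReservation`:
the bytes are returned to the pool when the reservation is dropped, so the
cursor never has to call `free()` explicitly. A minimal, self-contained sketch
of that behavior (not part of the patch; it uses only the `memory_pool` APIs
already imported elsewhere in this diff, with arbitrary sizes):

```rust
use std::sync::Arc;
use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() -> datafusion_common::Result<()> {
    // a pool with a 1 KiB budget
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024));
    let mut reservation = MemoryConsumer::new("RowCursor").register(&pool);

    reservation.try_grow(512)?; // accounted against the pool; errors if over budget
    assert_eq!(pool.reserved(), 512);

    drop(reservation); // freed on drop, exactly as `RowCursor` relies on
    assert_eq!(pool.reserved(), 0);
    Ok(())
}
```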
diff --git a/datafusion/core/src/physical_plan/sorts/merge.rs b/datafusion/core/src/physical_plan/sorts/merge.rs
index 736df7dbe81a1..f8a1457dd62a1 100644
--- a/datafusion/core/src/physical_plan/sorts/merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/merge.rs
@@ -31,6 +31,7 @@ use arrow::datatypes::{DataType, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use arrow_array::*;
 use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
 use futures::Stream;
 use std::pin::Pin;
 use std::task::{ready, Context, Poll};
@@ -42,7 +43,7 @@ macro_rules! primitive_merge_helper {
 }
 
 macro_rules! merge_helper {
-    ($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident) => {{
+    ($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident) => {{
         let streams = FieldCursorStream::<$t>::new($sort, $streams);
         return Ok(Box::pin(SortPreservingMergeStream::new(
             Box::new(streams),
@@ -50,6 +51,7 @@ macro_rules! merge_helper {
             $tracking_metrics,
             $batch_size,
             $fetch,
+            $reservation,
         )));
     }};
 }
@@ -63,28 +65,36 @@ pub fn streaming_merge(
     metrics: BaselineMetrics,
     batch_size: usize,
     fetch: Option<usize>,
+    reservation: MemoryReservation,
 ) -> Result<SendableRecordBatchStream> {
     // Special case single column comparisons with optimized cursor implementations
     if expressions.len() == 1 {
         let sort = expressions[0].clone();
         let data_type = sort.expr.data_type(schema.as_ref())?;
         downcast_primitive! {
-            data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch),
-            DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch)
-            DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch)
-            DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch)
-            DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch)
+            data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation),
+            DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
+            DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
+            DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
+            DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
             _ => {}
         }
     }
 
-    let streams = RowCursorStream::try_new(schema.as_ref(), expressions, streams)?;
+    let streams = RowCursorStream::try_new(
+        schema.as_ref(),
+        expressions,
+        streams,
+        reservation.new_empty(),
+    )?;
+
     Ok(Box::pin(SortPreservingMergeStream::new(
         Box::new(streams),
         schema,
         metrics,
         batch_size,
         fetch,
+        reservation,
     )))
 }
 
@@ -162,11 +172,12 @@ impl SortPreservingMergeStream {
         metrics: BaselineMetrics,
         batch_size: usize,
         fetch: Option<usize>,
+        reservation: MemoryReservation,
     ) -> Self {
         let stream_count = streams.partitions();
 
         Self {
-            in_progress: BatchBuilder::new(schema, stream_count, batch_size),
+            in_progress: BatchBuilder::new(schema, stream_count, batch_size, reservation),
             streams,
             metrics,
             aborted: false,
@@ -197,8 +208,7 @@ impl SortPreservingMergeStream {
             Some(Err(e)) => Poll::Ready(Err(e)),
             Some(Ok((cursor, batch))) => {
                 self.cursors[idx] = Some(cursor);
-                self.in_progress.push_batch(idx, batch);
-                Poll::Ready(Ok(()))
+                Poll::Ready(self.in_progress.push_batch(idx, batch))
             }
         }
     }
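`streaming_merge` now threads one `MemoryReservation` through the whole merge:
`new_empty()` gives the `RowCursorStream` a zero-sized reservation charged to
the same consumer, while the original reservation tracks the batches buffered
in the `BatchBuilder`. The sort changes below also use `split` and `take`. A
sketch of these three shaping operations (not part of the patch; arbitrary
sizes, only APIs this diff already exercises):

```rust
use std::sync::Arc;
use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() -> datafusion_common::Result<()> {
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(4096));
    let mut merge = MemoryConsumer::new("Merge").register(&pool);
    merge.try_grow(1000)?;

    // new_empty: a second reservation on the same consumer, starting at
    // zero bytes (what streaming_merge hands to RowCursorStream)
    let mut converter = merge.new_empty();
    converter.try_grow(500)?;
    assert_eq!(pool.reserved(), 1500);

    // split: carve 400 of the 1000 bytes into their own reservation
    // without changing the pool total (used per batch in sort.rs below)
    let batch = merge.split(400);
    assert_eq!((merge.size(), batch.size()), (600, 400));

    // take: move all remaining bytes out, leaving the original empty
    let rest = merge.take();
    assert_eq!((merge.size(), rest.size()), (0, 600));
    assert_eq!(pool.reserved(), 1500);
    Ok(())
}
```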
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index c7ae09bb2e340..a8069cbcd3b7a 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -210,23 +210,35 @@ struct ExternalSorter {
     /// If Some, the maximum number of output rows that will be
     /// produced.
     fetch: Option<usize>,
-    /// Memory usage tracking
+    /// Reservation for in_mem_batches
     reservation: MemoryReservation,
-    /// The partition id that this Sort is handling (for identification)
-    partition_id: usize,
+    /// Reservation for the merging of in-memory batches. If the sort
+    /// might spill, `sort_spill_reservation_bytes` will be
+    /// pre-reserved to try and ensure the spill can happen
+    merge_reservation: MemoryReservation,
     /// A handle to the runtime to get Disk spill files
     runtime: Arc<RuntimeEnv>,
     /// The target number of rows for output batches
     batch_size: usize,
+    /// How much memory to reserve for performing in-memory sorts
+    /// prior to spilling.
+    sort_spill_reservation_bytes: usize,
+    /// Below what size should data be concatenated into a single
+    /// record batch and sorted in place?
+    sort_in_place_threshold_bytes: usize,
 }
 
 impl ExternalSorter {
+    // TODO: make a builder or some other nicer API
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         partition_id: usize,
         schema: SchemaRef,
         expr: Vec<PhysicalSortExpr>,
         batch_size: usize,
         fetch: Option<usize>,
+        sort_spill_reservation_bytes: usize,
+        sort_in_place_threshold_bytes: usize,
         metrics: &ExecutionPlanMetricsSet,
         runtime: Arc<RuntimeEnv>,
     ) -> Self {
@@ -235,6 +247,12 @@ impl ExternalSorter {
             .with_can_spill(true)
             .register(&runtime.memory_pool);
 
+        let merge_reservation =
+            MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
+                .register(&runtime.memory_pool);
+
+        // TODO: update the reservation if needed
+
         Self {
             schema,
             in_mem_batches: vec![],
@@ -244,9 +262,11 @@ impl ExternalSorter {
             metrics,
             fetch,
             reservation,
-            partition_id,
+            merge_reservation,
             runtime,
             batch_size,
+            sort_spill_reservation_bytes,
+            sort_in_place_threshold_bytes,
         }
     }
 
@@ -257,6 +277,7 @@ impl ExternalSorter {
         if input.num_rows() == 0 {
             return Ok(());
         }
+        self.reserve_memory_for_merge()?;
 
         let size = batch_byte_size(&input);
         if self.reservation.try_grow(size).is_err() {
@@ -318,12 +339,10 @@ impl ExternalSorter {
                 self.metrics.baseline.clone(),
                 self.batch_size,
                 self.fetch,
+                self.reservation.new_empty(),
             )
         } else if !self.in_mem_batches.is_empty() {
-            let result = self.in_mem_sort_stream(self.metrics.baseline.clone());
-            // Report to the memory manager we are no longer using memory
-            self.reservation.free();
-            result
+            self.in_mem_sort_stream(self.metrics.baseline.clone())
         } else {
             Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
         }
@@ -374,6 +393,10 @@ impl ExternalSorter {
             return Ok(());
         }
 
+        // Release the memory reserved for merge so there is some left
+        // in the pool when merge_streams requests an allocation.
+        self.merge_reservation.free();
+
         self.in_mem_batches = self
             .in_mem_sort_stream(self.metrics.baseline.intermediate())?
            .try_collect()
            .await?;
@@ -385,7 +408,10 @@ impl ExternalSorter {
             .map(|x| x.get_array_memory_size())
             .sum();
 
-        self.reservation.resize(size);
+        // Reserve headroom for next merge sort
+        self.reserve_memory_for_merge()?;
+
+        self.reservation.try_resize(size)?;
         self.in_mem_batches_sorted = true;
         Ok(())
     }
@@ -455,26 +481,27 @@ impl ExternalSorter {
         assert_ne!(self.in_mem_batches.len(), 0);
 
         if self.in_mem_batches.len() == 1 {
             let batch = self.in_mem_batches.remove(0);
-            let stream = self.sort_batch_stream(batch, metrics)?;
-            self.in_mem_batches.clear();
-            return Ok(stream);
+            let reservation = self.reservation.take();
+            return self.sort_batch_stream(batch, metrics, reservation);
         }
 
-        // If less than 1MB of in-memory data, concatenate and sort in place
-        //
-        // This is a very rough heuristic and likely could be refined further
-        if self.reservation.size() < 1048576 {
+        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
+        if self.reservation.size() < self.sort_in_place_threshold_bytes {
             // Concatenate memory batches together and sort
             let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
             self.in_mem_batches.clear();
-            return self.sort_batch_stream(batch, metrics);
+            self.reservation.try_resize(batch.get_array_memory_size())?;
+            let reservation = self.reservation.take();
+            return self.sort_batch_stream(batch, metrics, reservation);
         }
 
         let streams = std::mem::take(&mut self.in_mem_batches)
             .into_iter()
             .map(|batch| {
                 let metrics = self.metrics.baseline.intermediate();
-                Ok(spawn_buffered(self.sort_batch_stream(batch, metrics)?, 1))
+                let reservation = self.reservation.split(batch.get_array_memory_size());
+                let input = self.sort_batch_stream(batch, metrics, reservation)?;
+                Ok(spawn_buffered(input, 1))
             })
             .collect::<Result<_>>()?;
 
@@ -485,6 +512,7 @@ impl ExternalSorter {
             metrics,
             self.batch_size,
             self.fetch,
+            self.merge_reservation.new_empty(),
         )
     }
 
@@ -493,27 +521,35 @@ impl ExternalSorter {
     fn sort_batch_stream(
         &self,
         batch: RecordBatch,
         metrics: BaselineMetrics,
+        reservation: MemoryReservation,
     ) -> Result<SendableRecordBatchStream> {
         let schema = batch.schema();
 
-        let mut reservation =
-            MemoryConsumer::new(format!("sort_batch_stream{}", self.partition_id))
-                .register(&self.runtime.memory_pool);
-
-        // TODO: This should probably be try_grow (#5885)
-        reservation.resize(batch.get_array_memory_size());
-
         let fetch = self.fetch;
         let expressions = self.expr.clone();
         let stream = futures::stream::once(futures::future::lazy(move |_| {
             let sorted = sort_batch(&batch, &expressions, fetch)?;
             metrics.record_output(sorted.num_rows());
             drop(batch);
-            reservation.free();
+            drop(reservation);
             Ok(sorted)
         }));
         Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
     }
+
+    /// Ensure the appropriate amount of memory has been pre-reserved
+    /// for the merging phase, if needed.
+    fn reserve_memory_for_merge(&mut self) -> Result<()> {
+        // Reserve headroom for next merge sort
+        if self.runtime.disk_manager.tmp_files_enabled() {
+            let size = self.sort_spill_reservation_bytes;
+            if self.merge_reservation.size() != size {
+                self.merge_reservation.try_resize(size)?;
+            }
+        }
+
+        Ok(())
+    }
 }
 
 impl Debug for ExternalSorter {
@@ -801,6 +837,8 @@ impl ExecutionPlan for SortExec {
 
         let mut input = self.input.execute(partition, context.clone())?;
 
+        let execution_options = &context.session_config().options().execution;
+
         trace!("End SortExec's input.execute for partition: {}", partition);
 
         let mut sorter = ExternalSorter::new(
@@ -809,6 +847,8 @@ impl ExecutionPlan for SortExec {
             self.expr.clone(),
             context.session_config().batch_size(),
             self.fetch,
+            execution_options.sort_spill_reservation_bytes,
+            execution_options.sort_in_place_threshold_bytes,
             &self.metrics_set,
             context.runtime_env(),
         );
@@ -914,9 +954,15 @@ mod tests {
     #[tokio::test]
     async fn test_sort_spill() -> Result<()> {
         // trigger spill: there will be 4 batches with 5.5KB for each
-        let config = RuntimeConfig::new().with_memory_limit(12288, 1.0);
-        let runtime = Arc::new(RuntimeEnv::new(config)?);
-        let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime);
+        let session_config = SessionConfig::new();
+        let sort_spill_reservation_bytes = session_config
+            .options()
+            .execution
+            .sort_spill_reservation_bytes;
+        let rt_config = RuntimeConfig::new()
+            .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0);
+        let runtime = Arc::new(RuntimeEnv::new(rt_config)?);
+        let session_ctx = SessionContext::with_config_rt(session_config, runtime);
 
         let partitions = 4;
         let csv = test::scan_partitioned_csv(partitions)?;
@@ -996,11 +1042,18 @@ mod tests {
         ];
 
         for (fetch, expect_spillage) in test_options {
-            let config = RuntimeConfig::new()
-                .with_memory_limit(avg_batch_size * (partitions - 1), 1.0);
-            let runtime = Arc::new(RuntimeEnv::new(config)?);
-            let session_ctx =
-                SessionContext::with_config_rt(SessionConfig::new(), runtime);
+            let session_config = SessionConfig::new();
+            let sort_spill_reservation_bytes = session_config
+                .options()
+                .execution
+                .sort_spill_reservation_bytes;
+
+            let rt_config = RuntimeConfig::new().with_memory_limit(
+                sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
+                1.0,
+            );
+            let runtime = Arc::new(RuntimeEnv::new(rt_config)?);
+            let session_ctx = SessionContext::with_config_rt(session_config, runtime);
 
             let csv = test::scan_partitioned_csv(partitions)?;
             let schema = csv.schema();
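Taken together, the sort.rs changes mean a spillable sort's budget must cover
both the buffered input and the pre-reserved merge headroom, which is why the
tests above add `sort_spill_reservation_bytes` to their pool limits. A sketch
of wiring the new options up end to end (not part of the patch; the sizes are
arbitrary and only APIs appearing in this diff are used):

```rust
use std::sync::Arc;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};

fn spill_friendly_context() -> datafusion::error::Result<SessionContext> {
    let session_config = SessionConfig::new()
        // headroom pre-reserved per spillable sort for its in-memory merge
        .with_sort_spill_reservation_bytes(16 * 1024 * 1024)
        // below this size, batches are concatenated and sorted in place
        .with_sort_in_place_threshold_bytes(2 * 1024 * 1024);

    let rt_config = RuntimeConfig::new()
        // cap memory; sorts that exceed it will spill...
        .with_memory_limit(100 * 1024 * 1024, 1.0)
        // ...but only if a DiskManager can create temporary files
        // (with DiskManagerConfig::Disabled, reserve_memory_for_merge
        // is a no-op and an over-budget sort simply errors)
        .with_disk_manager(DiskManagerConfig::NewOs);

    let runtime = Arc::new(RuntimeEnv::new(rt_config)?);
    Ok(SessionContext::with_config_rt(session_config, runtime))
}
```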
partition" @@ -241,6 +246,7 @@ impl ExecutionPlan for SortPreservingMergeExec { BaselineMetrics::new(&self.metrics, partition), context.session_config().batch_size(), self.fetch, + reservation, )?; debug!("Got stream result from SortPreservingMergeStream::new_from_receivers"); @@ -843,14 +849,18 @@ mod tests { } let metrics = ExecutionPlanMetricsSet::new(); + let reservation = + MemoryConsumer::new("test").register(&task_ctx.runtime_env().memory_pool); + let fetch = None; let merge_stream = streaming_merge( streams, batches.schema(), sort.as_slice(), BaselineMetrics::new(&metrics, 0), task_ctx.session_config().batch_size(), - None, + fetch, + reservation, ) .unwrap(); diff --git a/datafusion/core/src/physical_plan/sorts/stream.rs b/datafusion/core/src/physical_plan/sorts/stream.rs index 97a3b85fa5353..85fd63f54a5c2 100644 --- a/datafusion/core/src/physical_plan/sorts/stream.rs +++ b/datafusion/core/src/physical_plan/sorts/stream.rs @@ -23,6 +23,7 @@ use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; use arrow::row::{RowConverter, SortField}; use datafusion_common::Result; +use datafusion_execution::memory_pool::MemoryReservation; use futures::stream::{Fuse, StreamExt}; use std::marker::PhantomData; use std::sync::Arc; @@ -84,6 +85,8 @@ pub struct RowCursorStream { column_expressions: Vec>, /// Input streams streams: FusedStreams, + /// Tracks the memory used by `converter` + reservation: MemoryReservation, } impl RowCursorStream { @@ -91,6 +94,7 @@ impl RowCursorStream { schema: &Schema, expressions: &[PhysicalSortExpr], streams: Vec, + reservation: MemoryReservation, ) -> Result { let sort_fields = expressions .iter() @@ -104,6 +108,7 @@ impl RowCursorStream { let converter = RowConverter::new(sort_fields)?; Ok(Self { converter, + reservation, column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(), streams: FusedStreams(streams), }) @@ -117,7 +122,11 @@ impl RowCursorStream { .collect::>>()?; let rows = self.converter.convert_columns(&cols)?; - Ok(RowCursor::new(rows)) + self.reservation.try_resize(self.converter.size())?; + + let mut reservation = self.reservation.new_empty(); + reservation.try_grow(rows.size())?; + Ok(RowCursor::new(rows, reservation)) } } diff --git a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs index 1f72e0fcb45bf..d487423a4d7d7 100644 --- a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs @@ -22,13 +22,13 @@ use arrow::{ compute::SortOptions, record_batch::RecordBatch, }; -use datafusion::execution::memory_pool::GreedyMemoryPool; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::physical_plan::expressions::{col, PhysicalSortExpr}; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_execution::memory_pool::GreedyMemoryPool; use rand::Rng; use std::sync::Arc; use test_utils::{batches_to_vec, partitions_to_sorted_vec}; @@ -76,6 +76,9 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) { let exec = MemoryExec::try_new(&input, schema, None).unwrap(); let sort = Arc::new(SortExec::new(sort, Arc::new(exec))); + // Add IN_MEMORY_SORT_THRESHOLD from sort.rs + let pool_size = pool_size.saturating_add(10 * 1024 * 1024); + let runtime_config = RuntimeConfig::new() 
diff --git a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs
index 1f72e0fcb45bf..d487423a4d7d7 100644
--- a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs
@@ -22,13 +22,13 @@ use arrow::{
     compute::SortOptions,
     record_batch::RecordBatch,
 };
-use datafusion::execution::memory_pool::GreedyMemoryPool;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_execution::memory_pool::GreedyMemoryPool;
 use rand::Rng;
 use std::sync::Arc;
 use test_utils::{batches_to_vec, partitions_to_sorted_vec};
@@ -76,6 +76,9 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
     let exec = MemoryExec::try_new(&input, schema, None).unwrap();
     let sort = Arc::new(SortExec::new(sort, Arc::new(exec)));
 
+    // Add the default sort_spill_reservation_bytes (10MB, see config.rs)
+    // so the pre-reserved merge headroom does not eat into pool_size
+    let pool_size = pool_size.saturating_add(10 * 1024 * 1024);
+
     let runtime_config = RuntimeConfig::new()
         .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)));
     let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
@@ -88,9 +91,17 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
     let actual = batches_to_vec(&collected);
 
     if spill {
-        assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0);
+        assert_ne!(
+            sort.metrics().unwrap().spill_count().unwrap(),
+            0,
+            "{pool_size} {size}"
+        );
     } else {
-        assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0);
+        assert_eq!(
+            sort.metrics().unwrap().spill_count().unwrap(),
+            0,
+            "{pool_size} {size}"
+        );
     }
 
     assert_eq!(
diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs
index a7cff6cbd7581..96250c80bccab 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -17,12 +17,21 @@
 
 //! This module contains tests for limiting memory at runtime in DataFusion
 
-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{Int32Type, SchemaRef};
 use arrow::record_batch::RecordBatch;
+use arrow_array::{ArrayRef, DictionaryArray};
+use arrow_schema::SortOptions;
+use async_trait::async_trait;
+use datafusion::assert_batches_eq;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
+use datafusion::physical_plan::common::batch_byte_size;
+use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::streaming::PartitionStream;
+use datafusion_expr::{Expr, TableType};
+use datafusion_physical_expr::PhysicalSortExpr;
 use futures::StreamExt;
-use std::sync::Arc;
+use std::any::Any;
+use std::sync::{Arc, OnceLock};
 
 use datafusion::datasource::streaming::StreamingTable;
 use datafusion::datasource::{MemTable, TableProvider};
@@ -31,8 +40,8 @@ use datafusion::execution::disk_manager::DiskManagerConfig;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::physical_optimizer::join_selection::JoinSelection;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
-use datafusion::physical_plan::SendableRecordBatchStream;
-use datafusion_common::assert_contains;
+use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
+use datafusion_common::{assert_contains, Result};
 
 use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_execution::TaskContext;
@@ -196,6 +205,110 @@ async fn symmetric_hash_join() {
         .await
}
 
+#[tokio::test]
+async fn sort_preserving_merge() {
+    TestCase::new(
+        // use same order as the input table to avoid a sort
+        "select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10",
+        vec![
+            "Resources exhausted: Failed to allocate additional",
+            "SortPreservingMergeExec",
+        ],
+        // Each batch takes 5264 bytes, so this limit can not hold even one
+        5200,
+    )
+    // two partitions so a merge is required
+    .with_scenario(Scenario::DictionaryStrings(2))
+    .with_expected_plan(
+        // It is important that this plan does not have a Sort in
+        // it (it should only have a SortPreservingMergeExec)
+        &[
+            "+---------------+-------------------------------------------------------------------------------------------------------------+",
+            "| plan_type | plan |",
+            "+---------------+-------------------------------------------------------------------------------------------------------------+",
+            "| logical_plan | Limit: skip=0, fetch=10 |",
+            "| | Sort: t.a ASC NULLS LAST, t.b ASC NULLS LAST, fetch=10 |",
+            "| | TableScan: t projection=[a, b] |",
+            "| physical_plan | GlobalLimitExec: skip=0, fetch=10 |",
+            "| | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10 |",
+            "| | MemoryExec: partitions=2, partition_sizes=[5, 5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
+            "| | |",
+            "+---------------+-------------------------------------------------------------------------------------------------------------+",
+        ]
+    )
+    .run()
+    .await
+}
+
+#[tokio::test]
+async fn sort_spill_reservation() {
+    // Memory required for the input batches in each partition
+    let partition_size = batches_byte_size(&dict_batches());
+
+    let base_config = SessionConfig::new()
+        // do not allow the sort to use the non-merge path
+        .with_sort_in_place_threshold_bytes(10);
+
+    // Show how sort spill reservation works
+    let test = TestCase::new(
+        // use a different order than the input table to force a sort, and
+        // multiple columns to force the RowFormat / interner, which makes
+        // the merge require substantial memory
+        "select * from t ORDER BY a , b DESC",
+        vec![
+            "Resources exhausted: Failed to allocate additional",
+            "YYY",
+        ],
+        // provide enough space to hold 1.5x as much data (so the
+        // batches can be sorted as long as they are not also merged
+        // at the same time)
+        partition_size * 3,
+    )
+    // use a single partition so only a sort is needed
+    .with_scenario(Scenario::DictionaryStrings(1))
+    .with_disk_manager_config(DiskManagerConfig::NewOs)
+    .with_expected_plan(
+        // It is important that this plan only has a SortExec, not
+        // also a merge, so we can ensure the sort could finish
+        // given enough merging memory
+        &[
+            "+---------------+--------------------------------------------------------------------------------------------------------+",
+            "| plan_type | plan |",
+            "+---------------+--------------------------------------------------------------------------------------------------------+",
+            "| logical_plan | Sort: t.a ASC NULLS LAST, t.b DESC NULLS FIRST |",
+            "| | TableScan: t projection=[a, b] |",
+            "| physical_plan | SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC] |",
+            "| | MemoryExec: partitions=1, partition_sizes=[5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
+            "| | |",
+            "+---------------+--------------------------------------------------------------------------------------------------------+",
+        ]
+    );
+
+    let config = base_config
+        .clone()
+        // provide insufficient reserved space for merging,
+        // the sort will fail while trying to merge
+        .with_sort_spill_reservation_bytes(1024);
+
+    test.clone()
+        // Each batch takes 5264 bytes so allocate enough to buffer
+        // the entire input but not also merge it
+        .with_memory_limit(2 * 6 * 5300)
+        .with_expected_errors(vec![
+            "Resources exhausted: Failed to allocate additional",
+            "ExternalSorterMerge", // merging in sort fails
+        ])
+        .with_config(config)
+        .run()
+        .await;
+
+    let config = base_config
+        // reserve sufficient space for merge and this time it should succeed
+        .with_sort_spill_reservation_bytes(2 * partition_size);
+
+    test.with_config(config).with_expected_success().run().await;
+}
+
 /// Run the query with the specified memory limit,
 /// and verifies the expected errors are returned
 #[derive(Clone, Debug)]
@@ -205,9 +318,17 @@ struct TestCase {
     memory_limit: usize,
     config: SessionConfig,
     scenario: Scenario,
+    /// How should the disk manager (that allows spilling) be
+    /// configured? Defaults to `Disabled`
+    disk_manager_config: DiskManagerConfig,
+    /// Expected explain plan, if non-empty
+    expected_plan: Vec<String>,
+    /// Is the query expected to succeed? Defaults to false
+    expected_success: bool,
 }
 
 impl TestCase {
+    // TODO remove expected errors and memory limits and query from constructor
     fn new<'a>(
         query: impl Into<String>,
         expected_errors: impl IntoIterator<Item = &'a str>,
@@ -222,21 +343,62 @@ impl TestCase {
             memory_limit,
             config: SessionConfig::new(),
             scenario: Scenario::AccessLog,
+            disk_manager_config: DiskManagerConfig::Disabled,
+            expected_plan: vec![],
+            expected_success: false,
         }
     }
 
+    /// Set a list of expected strings that must appear in any errors
+    fn with_expected_errors<'a>(
+        mut self,
+        expected_errors: impl IntoIterator<Item = &'a str>,
+    ) -> Self {
+        self.expected_errors =
+            expected_errors.into_iter().map(|s| s.to_string()).collect();
+        self
+    }
+
+    /// Set the amount of memory that can be used
+    fn with_memory_limit(mut self, memory_limit: usize) -> Self {
+        self.memory_limit = memory_limit;
+        self
+    }
+
     /// Specify the configuration to use
     pub fn with_config(mut self, config: SessionConfig) -> Self {
         self.config = config;
         self
     }
 
+    /// Mark that the test expects the query to run successfully
+    pub fn with_expected_success(mut self) -> Self {
+        self.expected_success = true;
+        self
+    }
+
     /// Specify the scenario to run
     pub fn with_scenario(mut self, scenario: Scenario) -> Self {
         self.scenario = scenario;
         self
     }
 
+    /// Specify if the disk manager should be enabled. If true,
+    /// operators that support it can spill
+    pub fn with_disk_manager_config(
+        mut self,
+        disk_manager_config: DiskManagerConfig,
+    ) -> Self {
+        self.disk_manager_config = disk_manager_config;
+        self
+    }
+
+    /// Specify an expected plan to review
+    pub fn with_expected_plan(mut self, expected_plan: &[&str]) -> Self {
+        self.expected_plan = expected_plan.iter().map(|s| s.to_string()).collect();
+        self
+    }
+
     /// Run the test, panic'ing on error
     async fn run(self) {
         let Self {
@@ -245,33 +407,62 @@ impl TestCase {
             memory_limit,
             config,
             scenario,
+            disk_manager_config,
+            expected_plan,
+            expected_success,
         } = self;
 
         let table = scenario.table();
 
         let rt_config = RuntimeConfig::new()
-            // do not allow spilling
-            .with_disk_manager(DiskManagerConfig::Disabled)
+            // the disk manager config controls whether spilling is allowed
+            .with_disk_manager(disk_manager_config)
             .with_memory_limit(memory_limit, MEMORY_FRACTION);
 
         let runtime = RuntimeEnv::new(rt_config).unwrap();
 
         // Configure execution
-        let state = SessionState::with_config_rt(config, Arc::new(runtime))
-            .with_physical_optimizer_rules(scenario.rules());
+        let state = SessionState::with_config_rt(config, Arc::new(runtime));
+        let state = match scenario.rules() {
+            Some(rules) => state.with_physical_optimizer_rules(rules),
+            None => state,
+        };
         let ctx = SessionContext::with_state(state);
         ctx.register_table("t", table).expect("registering table");
 
         let df = ctx.sql(&query).await.expect("Planning query");
 
+        if !expected_plan.is_empty() {
+            let expected_plan: Vec<_> =
+                expected_plan.iter().map(|s| s.as_str()).collect();
+            let actual_plan = df
+                .clone()
+                .explain(false, false)
+                .unwrap()
+                .collect()
+                .await
+                .unwrap();
+            assert_batches_eq!(expected_plan, &actual_plan);
+        }
+
         match df.collect().await {
             Ok(_batches) => {
-                panic!("Unexpected success when running, expected memory limit failure")
+                if !expected_success {
+                    panic!(
+                        "Unexpected success when running, expected memory limit failure"
+                    )
+                }
             }
             Err(e) => {
-                for error_substring in expected_errors {
-                    assert_contains!(e.to_string(), error_substring);
+                if expected_success {
+                    panic!(
+                        "Unexpected failure when running, expected success but got: {e}"
+                    )
+                } else {
+                    for error_substring in expected_errors {
+                        assert_contains!(e.to_string(), error_substring);
+                    }
                 }
             }
         }
     }
 }
@@ -290,6 +481,9 @@ enum Scenario {
     /// 1000 rows of access log data with batches of 50 rows in a
     /// [`StreamingTable`]
     AccessLogStreaming,
+
+    /// N partitions of sorted, dictionary-encoded strings
+    DictionaryStrings(usize),
 }
 
 impl Scenario {
@@ -317,24 +511,53 @@ impl Scenario {
                 .with_infinite_table(true);
             Arc::new(table)
         }
+            Self::DictionaryStrings(num_partitions) => {
+                use datafusion::physical_expr::expressions::col;
+                let batches: Vec<Vec<RecordBatch>> = std::iter::repeat(dict_batches())
+                    .take(*num_partitions)
+                    .collect();
+
+                let schema = batches[0][0].schema();
+                let options = SortOptions {
+                    descending: false,
+                    nulls_first: false,
+                };
+                let sort_information = vec![
+                    PhysicalSortExpr {
+                        expr: col("a", &schema).unwrap(),
+                        options,
+                    },
+                    PhysicalSortExpr {
+                        expr: col("b", &schema).unwrap(),
+                        options,
+                    },
+                ];
+
+                let table = SortedTableProvider::new(batches, sort_information);
+                Arc::new(table)
+            }
         }
     }
 
-    /// return the optimizer rules to use
-    fn rules(&self) -> Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> {
+    /// return specific physical optimizer rules to use
+    fn rules(&self) -> Option<Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>> {
         match self {
             Self::AccessLog => {
                 // Disabling physical optimizer rules to avoid sorts /
                 // repartitions (since RepartitionExec / SortExec also
                 // has a memory budget which we'll likely hit first)
-                vec![]
+                Some(vec![])
             }
             Self::AccessLogStreaming => {
                 // Disable all physical optimizer rules except the
                 // JoinSelection rule to avoid sorts or repartition,
                 // as they also have memory budgets that may be hit
                 // first
-                vec![Arc::new(JoinSelection::new())]
+                Some(vec![Arc::new(JoinSelection::new())])
+            }
+            Self::DictionaryStrings(_) => {
+                // Use default rules
+                None
             }
         }
     }
@@ -347,6 +570,56 @@ fn access_log_batches() -> Vec<RecordBatch> {
         .collect()
 }
 
+static DICT_BATCHES: OnceLock<Vec<RecordBatch>> = OnceLock::new();
+
+/// Returns 5 sorted string dictionary batches, each with 50 rows and
+/// this schema:
+///
+/// a: Dictionary(Int32, Utf8),
+/// b: Dictionary(Int32, Utf8),
+fn dict_batches() -> Vec<RecordBatch> {
+    DICT_BATCHES.get_or_init(make_dict_batches).clone()
+}
+
+fn make_dict_batches() -> Vec<RecordBatch> {
+    let batch_size = 50;
+
+    let mut i = 0;
+    let gen = std::iter::from_fn(move || {
+        // create values like
+        // 0000000000
+        // 0000000001
+        // ...
+        // 0000000049
+        let values: Vec<_> = (i..i + batch_size).map(|x| format!("{x:010}")).collect();
+        //println!("values: \n{values:?}");
+        let array: DictionaryArray<Int32Type> =
+            values.iter().map(|s| s.as_str()).collect();
+        let array = Arc::new(array) as ArrayRef;
+        let batch =
+            RecordBatch::try_from_iter(vec![("a", array.clone()), ("b", array)]).unwrap();
+
+        i += batch_size;
+        Some(batch)
+    });
+
+    let num_batches = 5;
+
+    let batches: Vec<_> = gen.take(num_batches).collect();
+
+    batches.iter().enumerate().for_each(|(i, batch)| {
+        println!("Dict batch[{i}] size is: {}", batch_byte_size(batch));
+    });
+
+    batches
+}
+
+// How many bytes of memory do the given batches consume?
+fn batches_byte_size(batches: &[RecordBatch]) -> usize {
+    batches.iter().map(batch_byte_size).sum()
+}
+
 struct DummyStreamPartition {
     schema: SchemaRef,
     batches: Vec<RecordBatch>,
 }
@@ -366,3 +639,53 @@ impl PartitionStream for DummyStreamPartition {
         ))
     }
 }
+
+/// Wrapper over a TableProvider that can provide ordering information
+struct SortedTableProvider {
+    schema: SchemaRef,
+    batches: Vec<Vec<RecordBatch>>,
+    sort_information: Vec<PhysicalSortExpr>,
+}
+
+impl SortedTableProvider {
+    fn new(
+        batches: Vec<Vec<RecordBatch>>,
+        sort_information: Vec<PhysicalSortExpr>,
+    ) -> Self {
+        let schema = batches[0][0].schema();
+        Self {
+            schema,
+            batches,
+            sort_information,
+        }
+    }
+}
+
+#[async_trait]
+impl TableProvider for SortedTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &SessionState,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let mem_exec =
+            MemoryExec::try_new(&self.batches, self.schema(), projection.cloned())?
+                .with_sort_information(self.sort_information.clone());
+
+        Ok(Arc::new(mem_exec))
+    }
+}
diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
index fcb818d5fd481..162e208201911 100644
--- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
@@ -153,6 +153,8 @@ datafusion.execution.parquet.pushdown_filters false
 datafusion.execution.parquet.reorder_filters false
 datafusion.execution.parquet.skip_metadata true
 datafusion.execution.planning_concurrency 13
+datafusion.execution.sort_in_place_threshold_bytes 1048576
+datafusion.execution.sort_spill_reservation_bytes 10485760
 datafusion.execution.target_partitions 7
 datafusion.execution.time_zone +00:00
 datafusion.explain.logical_plan_only false
diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs
index c8478499365ef..44fcc2ab49b47 100644
--- a/datafusion/execution/src/config.rs
+++ b/datafusion/execution/src/config.rs
@@ -287,6 +287,32 @@ impl SessionConfig {
         self.options.optimizer.enable_round_robin_repartition
     }
 
+    /// Set the size of [`sort_spill_reservation_bytes`] to control
+    /// memory pre-reservation
+    ///
+    /// [`sort_spill_reservation_bytes`]: datafusion_common::config::ExecutionOptions::sort_spill_reservation_bytes
+    pub fn with_sort_spill_reservation_bytes(
+        mut self,
+        sort_spill_reservation_bytes: usize,
+    ) -> Self {
+        self.options.execution.sort_spill_reservation_bytes =
+            sort_spill_reservation_bytes;
+        self
+    }
+
+    /// Set the size of [`sort_in_place_threshold_bytes`] to control
+    /// when data is concatenated and sorted in a single batch.
+    ///
+    /// [`sort_in_place_threshold_bytes`]: datafusion_common::config::ExecutionOptions::sort_in_place_threshold_bytes
+    pub fn with_sort_in_place_threshold_bytes(
+        mut self,
+        sort_in_place_threshold_bytes: usize,
+    ) -> Self {
+        self.options.execution.sort_in_place_threshold_bytes =
+            sort_in_place_threshold_bytes;
+        self
+    }
+
     /// Convert configuration options to name-value pairs with values
     /// converted to strings.
     ///
diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs
index 107c58fbe327d..e8d2ed9cc0f51 100644
--- a/datafusion/execution/src/disk_manager.rs
+++ b/datafusion/execution/src/disk_manager.rs
@@ -102,6 +102,13 @@ impl DiskManager {
         }
     }
 
+    /// Return true if this disk manager supports creating temporary
+    /// files. If this returns false, any call to `create_tmp_file`
+    /// will error.
+    pub fn tmp_files_enabled(&self) -> bool {
+        self.local_dirs.lock().is_some()
+    }
+
     /// Return a temporary file from a randomized choice in the configured locations
     ///
     /// If the file can not be created for some reason, returns an
@@ -198,6 +205,7 @@ mod tests {
         );
 
         let dm = DiskManager::try_new(config)?;
+        assert!(dm.tmp_files_enabled());
         let actual = dm.create_tmp_file("Testing")?;
 
         // the file should be in one of the specified local directories
@@ -210,6 +218,7 @@ mod tests {
     fn test_disabled_disk_manager() {
         let config = DiskManagerConfig::Disabled;
         let manager = DiskManager::try_new(config).unwrap();
+        assert!(!manager.tmp_files_enabled());
        assert_eq!(
             manager.create_tmp_file("Testing").unwrap_err().to_string(),
             "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
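The new `tmp_files_enabled` probe is what `reserve_memory_for_merge` in
sort.rs keys off. A small sketch (not part of the patch) of how the two
`DiskManagerConfig` variants used in this diff behave:

```rust
use datafusion::execution::disk_manager::{DiskManager, DiskManagerConfig};

fn main() -> datafusion::error::Result<()> {
    // NewOs: temporary files allowed, so sorts can spill and the
    // sort_spill_reservation_bytes headroom will be pre-reserved
    let dm = DiskManager::try_new(DiskManagerConfig::NewOs)?;
    assert!(dm.tmp_files_enabled());

    // Disabled: reserve_memory_for_merge above becomes a no-op and
    // create_tmp_file would return a "Resources exhausted" error
    let dm = DiskManager::try_new(DiskManagerConfig::Disabled)?;
    assert!(!dm.tmp_files_enabled());
    Ok(())
}
```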
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index bff7cb4da0125..abbc682e3d31f 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -57,6 +57,8 @@
 | datafusion.execution.parquet.reorder_filters | false | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query |
 | datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. |
 | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly used to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system |
+| datafusion.execution.sort_spill_reservation_bytes | 10485760 | How much memory is set aside, for each spillable sort, to ensure an in-memory merge can occur. This setting has no effect if the sort can not spill (there is no `DiskManager` configured). As part of spilling to disk, in-memory data must be sorted / merged before writing the file. This in-memory sort/merge requires memory as well, so to avoid allocating it once memory is already exhausted, DataFusion sets aside this many bytes beforehand. |
+| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
 | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
 | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
 | datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level |
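The documented defaults are also observable programmatically. A short sketch
(not part of the patch) that double-checks the values in the table above via
the `SessionConfig` accessors this patch uses:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    let config = SessionConfig::new();
    let exec = &config.options().execution;

    // defaults match the configs.md table above
    assert_eq!(exec.sort_spill_reservation_bytes, 10 * 1024 * 1024); // 10485760
    assert_eq!(exec.sort_in_place_threshold_bytes, 1024 * 1024); // 1048576

    let _ctx = SessionContext::with_config(config);
}
```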