diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs
index f6f5f856c8760..c36e912b63ef6 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -22,6 +22,7 @@ use arrow::record_batch::RecordBatch;
 use arrow_array::{DictionaryArray, ArrayRef};
 use arrow_schema::SortOptions;
 use async_trait::async_trait;
+use datafusion::assert_batches_eq;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::common::batch_byte_size;
 use datafusion::physical_plan::memory::MemoryExec;
@@ -199,7 +200,7 @@ async fn symmetric_hash_join() {
         ],
         1_000,
     )
-    .with_scenario(Scenario::AccessLogStreaming)
+    .with_scenario(Scenario::AccessLogStreaming)
     .run()
     .await
 }
@@ -207,17 +208,35 @@
 #[tokio::test]
 async fn sort_preserving_merge() {
     TestCase::new(
-        "select * from t ORDER BY a ASC NULL LAST, b ASC NULLS LAST LIMIT 10",
+        // use the same order as the input table to avoid a sort
+        "select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10",
         vec![
             "Resources exhausted: Failed to allocate additional",
             "SortPreservingMergeExec",
         ],
-        // Each batch takes 5264 bytes, so provide enough memory to sort them all, but not to mere them
-        5264 * 5 + 1000,
+        // Each batch takes 5264 bytes, so 5200 is not enough to buffer even one
+        5200,
     )
-    .with_scenario(Scenario::DictionaryStrings)
-    .run()
-    .await
+    .with_scenario(Scenario::DictionaryStrings)
+    .with_expected_plan(
+        // It is important that this plan does not have a Sort in it
+        // (it should only have a SortPreservingMergeExec)
+        &[
+            "+---------------+--------------------------------------------------------------------------------------------------------------+",
+            "| plan_type     | plan                                                                                                         |",
+            "+---------------+--------------------------------------------------------------------------------------------------------------+",
+            "| logical_plan  | Limit: skip=0, fetch=10                                                                                      |",
+            "|               |   Sort: t.a ASC NULLS LAST, t.b ASC NULLS LAST, fetch=10                                                     |",
+            "|               |     TableScan: t projection=[a, b]                                                                           |",
+            "| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                                            |",
+            "|               |   SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10                                 |",
+            "|               |     MemoryExec: partitions=2, partition_sizes=[5, 5], output_ordering: a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
+            "|               |                                                                                                              |",
+            "+---------------+--------------------------------------------------------------------------------------------------------------+",
+        ]
+    )
+    .run()
+    .await
 }
 
 /// Run the query with the specified memory limit,
@@ -229,6 +248,8 @@ struct TestCase {
     memory_limit: usize,
     config: SessionConfig,
     scenario: Scenario,
+    /// Expected explain plan, if non-empty
+    expected_plan: Vec<String>,
 }
 
 impl TestCase {
@@ -246,6 +267,7 @@
             memory_limit,
             config: SessionConfig::new(),
             scenario: Scenario::AccessLog,
+            expected_plan: vec![],
         }
     }
 
@@ -261,6 +283,12 @@
         self
     }
 
+    /// Specify an expected plan to check
+    pub fn with_expected_plan(mut self, expected_plan: &[&str]) -> Self {
+        self.expected_plan = expected_plan.iter().map(|s| s.to_string()).collect();
+        self
+    }
+
     /// Run the test, panic'ing on error
     async fn run(self) {
         let Self {
@@ -269,6 +297,7 @@
             memory_limit,
             config,
             scenario,
+            expected_plan,
         } = self;
 
         let table = scenario.table();
@@ -292,6 +321,12 @@
 
         let df = ctx.sql(&query).await.expect("Planning query");
 
+        if !expected_plan.is_empty() {
+            let expected_plan: Vec<_> = expected_plan.iter().map(|s| s.as_str()).collect();
+            let actual_plan = df.clone().explain(false, false).unwrap().collect().await.unwrap();
+            assert_batches_eq!(expected_plan, &actual_plan);
+        }
+
         match df.collect().await {
             Ok(_batches) => {
                 panic!("Unexpected success when running, expected memory limit failure")
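
For reference, the expected-plan strings above are exactly the bordered table that `assert_batches_eq!` compares against. Below is a minimal sketch (not part of this diff; the helper name is hypothetical) of how those strings could be regenerated when a plan changes, assuming a `SessionContext` whose table `t` is already registered:

    use arrow::util::pretty::pretty_format_batches;
    use datafusion::error::Result;
    use datafusion::prelude::*;

    /// Hypothetical helper: print the EXPLAIN output for `query` in the
    /// bordered-table form that `with_expected_plan` expects.
    async fn print_expected_plan(ctx: &SessionContext, query: &str) -> Result<()> {
        let df = ctx.sql(query).await?;
        // Same call the test harness makes: verbose=false, analyze=false
        let batches = df.explain(false, false)?.collect().await?;
        // Each printed line becomes one entry in the &[&str] literal
        println!("{}", pretty_format_batches(&batches)?);
        Ok(())
    }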