Skip to content

Commit

Permalink
Update test
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Aug 1, 2023
1 parent 5a3a9ea commit ad301ca
Showing 1 changed file with 42 additions and 7 deletions.
49 changes: 42 additions & 7 deletions datafusion/core/tests/memory_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use arrow::record_batch::RecordBatch;
use arrow_array::{DictionaryArray, ArrayRef};
use arrow_schema::SortOptions;
use async_trait::async_trait;
use datafusion::assert_batches_eq;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::common::batch_byte_size;
use datafusion::physical_plan::memory::MemoryExec;
Expand Down Expand Up @@ -199,25 +200,43 @@ async fn symmetric_hash_join() {
],
1_000,
)
.with_scenario(Scenario::AccessLogStreaming)
.with_scenario(Scenario::AccessLogStreaming)
.run()
.await
}

#[tokio::test]
async fn sort_preserving_merge() {
    TestCase::new(
        // use same order as the input table to avoid a sort
        "select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10",
        vec![
            "Resources exhausted: Failed to allocate additional",
            "SortPreservingMergeExec",
        ],
        // Each batch takes 5264 bytes, so a 5200 byte limit can not
        // hold even a single batch and the merge must fail
        5200,
    )
    .with_scenario(Scenario::DictionaryStrings)
    .with_expected_plan(
        // It is important that this plan does not have a Sort in
        // it (it should only have a SortPreservingMergeExec)
        &[
            "+---------------+--------------------------------------------------------------------------------------------------------------+",
            "| plan_type | plan |",
            "+---------------+--------------------------------------------------------------------------------------------------------------+",
            "| logical_plan | Limit: skip=0, fetch=10 |",
            "| | Sort: t.a ASC NULLS LAST, t.b ASC NULLS LAST, fetch=10 |",
            "| | TableScan: t projection=[a, b] |",
            "| physical_plan | GlobalLimitExec: skip=0, fetch=10 |",
            "| | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10 |",
            "| | MemoryExec: partitions=2, partition_sizes=[5, 5], output_ordering: a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
            "| | |",
            "+---------------+--------------------------------------------------------------------------------------------------------------+",
        ],
    )
    .run()
    .await
}

/// Run the query with the specified memory limit,
Expand All @@ -229,6 +248,8 @@ struct TestCase {
memory_limit: usize,
config: SessionConfig,
scenario: Scenario,
/// Expected explain plan, if non-empty
expected_plan: Vec<String>,
}

impl TestCase {
Expand All @@ -246,6 +267,7 @@ impl TestCase {
memory_limit,
config: SessionConfig::new(),
scenario: Scenario::AccessLog,
expected_plan: vec![],
}
}

Expand All @@ -261,6 +283,12 @@ impl TestCase {
self
}

/// Sets the `EXPLAIN` plan output this test will assert against
/// before executing the query.
pub fn with_expected_plan(mut self, expected_plan: &[&str]) -> Self {
    let owned_lines: Vec<String> =
        expected_plan.iter().map(|line| line.to_string()).collect();
    self.expected_plan = owned_lines;
    self
}

/// Run the test, panic'ing on error
async fn run(self) {
let Self {
Expand All @@ -269,6 +297,7 @@ impl TestCase {
memory_limit,
config,
scenario,
expected_plan,
} = self;

let table = scenario.table();
Expand All @@ -292,6 +321,12 @@ impl TestCase {

let df = ctx.sql(&query).await.expect("Planning query");

if !expected_plan.is_empty() {
let expected_plan: Vec<_> = expected_plan.iter().map(|s| s.as_str()).collect();
let actual_plan = df.clone().explain(false, false).unwrap().collect().await.unwrap();
assert_batches_eq!(expected_plan, &actual_plan);
}

match df.collect().await {
Ok(_batches) => {
panic!("Unexpected success when running, expected memory limit failure")
Expand Down

0 comments on commit ad301ca

Please sign in to comment.