From 5a3a9ea1ccfaae399910d47462142db7ff9f5c82 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 1 Aug 2023 11:32:15 -0400
Subject: [PATCH] Update test

---
 datafusion/core/src/datasource/memory.rs |  1 +
 datafusion/core/tests/memory_limit.rs    | 86 ++++++++++++++++++++++--
 2 files changed, 82 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 4b6653c6889f3..360b2b8c8b135 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -280,6 +280,7 @@ impl DataSink for MemSink {
     }
 }
 
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs
index e4e3c49bcc488..f6f5f856c8760 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -20,10 +20,16 @@
 use arrow::datatypes::{Int32Type, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use arrow_array::{DictionaryArray, ArrayRef};
+use arrow_schema::SortOptions;
+use async_trait::async_trait;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::common::batch_byte_size;
+use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::streaming::PartitionStream;
+use datafusion_expr::{TableType, Expr};
+use datafusion_physical_expr::PhysicalSortExpr;
 use futures::StreamExt;
+use std::any::Any;
 use std::sync::Arc;
 
 use datafusion::datasource::streaming::StreamingTable;
@@ -33,8 +39,8 @@ use datafusion::execution::disk_manager::DiskManagerConfig;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::physical_optimizer::join_selection::JoinSelection;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
-use datafusion::physical_plan::SendableRecordBatchStream;
-use datafusion_common::assert_contains;
+use datafusion::physical_plan::{SendableRecordBatchStream, ExecutionPlan};
+use datafusion_common::{assert_contains, Result};
 use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_execution::TaskContext;
 
@@ -201,7 +207,7 @@ async fn symmetric_hash_join() {
 #[tokio::test]
 async fn sort_preserving_merge() {
     TestCase::new(
-        "select * from t ORDER BY a, b LIMIT 10",
+        "select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10",
         vec![
             "Resources exhausted: Failed to allocate additional",
             "SortPreservingMergeExec",
@@ -342,10 +348,25 @@ impl Scenario {
                 Arc::new(table)
             }
             Self::DictionaryStrings => {
+                use datafusion::physical_expr::expressions::col;
                 let batches = vec![dict_batches(), dict_batches()];
                 let schema = batches[0][0].schema();
-
-                let table = MemTable::try_new(schema, batches).unwrap();
+                let options = SortOptions {
+                    descending: false,
+                    nulls_first: false,
+                };
+                let sort_information = vec![
+                    PhysicalSortExpr {
+                        expr: col("a", &schema).unwrap(),
+                        options: options.clone(),
+                    },
+                    PhysicalSortExpr {
+                        expr: col("b", &schema).unwrap(),
+                        options,
+                    },
+                ];
+
+                let table = SortedTableProvider::new(batches, sort_information);
                 Arc::new(table)
             }
         }
@@ -441,3 +462,58 @@ impl PartitionStream for DummyStreamPartition {
         ))
     }
 }
+
+
+
+/// Wrapper over a TableProvider that can provide ordering information
+struct SortedTableProvider {
+    schema: SchemaRef,
+    batches: Vec<Vec<RecordBatch>>,
+    sort_information: Vec<PhysicalSortExpr>,
+}
+
+impl SortedTableProvider {
+    fn new(
+        batches: Vec<Vec<RecordBatch>>,
+        sort_information: Vec<PhysicalSortExpr>,
+    ) -> Self {
+        let schema = batches[0][0].schema();
+        Self {
+            schema,
+            batches,
+            sort_information,
+        }
+    }
+}
+
+#[async_trait]
+impl TableProvider for SortedTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &SessionState,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let mem_exec = MemoryExec::try_new(
+            &self.batches,
+            self.schema(),
+            projection.cloned(),
+        )?
+        .with_sort_information(self.sort_information.clone());
+
+        Ok(Arc::new(mem_exec))
+    }
+}