Skip to content

Commit

Permalink
Update test
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Aug 1, 2023
1 parent 4e887bd commit 5a3a9ea
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 5 deletions.
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ impl DataSink for MemSink {
}
}


#[cfg(test)]
mod tests {
use super::*;
Expand Down
86 changes: 81 additions & 5 deletions datafusion/core/tests/memory_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@
use arrow::datatypes::{Int32Type, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_array::{DictionaryArray, ArrayRef};
use arrow_schema::SortOptions;
use async_trait::async_trait;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::common::batch_byte_size;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion_expr::{TableType, Expr};
use datafusion_physical_expr::PhysicalSortExpr;
use futures::StreamExt;
use std::any::Any;
use std::sync::Arc;

use datafusion::datasource::streaming::StreamingTable;
Expand All @@ -33,8 +39,8 @@ use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::physical_optimizer::join_selection::JoinSelection;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_common::assert_contains;
use datafusion::physical_plan::{SendableRecordBatchStream, ExecutionPlan};
use datafusion_common::{assert_contains, Result};

use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_execution::TaskContext;
Expand Down Expand Up @@ -201,7 +207,7 @@ async fn symmetric_hash_join() {
#[tokio::test]
async fn sort_preserving_merge() {
TestCase::new(
"select * from t ORDER BY a, b LIMIT 10",
"select * from t ORDER BY a ASC NULL LAST, b ASC NULLS LAST LIMIT 10",
vec![
"Resources exhausted: Failed to allocate additional",
"SortPreservingMergeExec",
Expand Down Expand Up @@ -342,10 +348,25 @@ impl Scenario {
Arc::new(table)
}
Self::DictionaryStrings => {
use datafusion::physical_expr::expressions::col;
let batches = vec![dict_batches(), dict_batches()];
let schema = batches[0][0].schema();

let table = MemTable::try_new(schema, batches).unwrap();
let options = SortOptions {
descending: false,
nulls_first: false,
};
let sort_information = vec![
PhysicalSortExpr {
expr: col("a", &schema).unwrap(),
options: options.clone(),
},
PhysicalSortExpr {
expr: col("b", &schema).unwrap(),
options,
},
];

let table = SortedTableProvider::new(batches, sort_information);
Arc::new(table)
}
}
Expand Down Expand Up @@ -441,3 +462,58 @@ impl PartitionStream for DummyStreamPartition {
))
}
}



/// Wrapper over a TableProvider that can provide ordering information
/// Wrapper over a `TableProvider` that can provide ordering information,
/// so that plans built over it may use sort-aware operators
/// (e.g. `SortPreservingMergeExec`) on the pre-sorted in-memory data.
struct SortedTableProvider {
    /// Schema shared by all batches; taken from the first batch in `new`.
    schema: SchemaRef,
    /// The data to scan — presumably one inner `Vec<RecordBatch>` per
    /// partition, as expected by `MemoryExec::try_new` (TODO confirm).
    batches: Vec<Vec<RecordBatch>>,
    /// Sort order the batches are claimed to satisfy; forwarded verbatim
    /// to `MemoryExec::with_sort_information` in `scan`.
    sort_information: Vec<PhysicalSortExpr>,
}

impl SortedTableProvider {
    /// Creates a provider over `batches`, declaring that they satisfy the
    /// given `sort_information`.
    ///
    /// The schema is derived from the first batch of the first partition;
    /// all batches are assumed to share that schema.
    ///
    /// # Panics
    ///
    /// Panics with a descriptive message if `batches` is empty or its first
    /// partition contains no batches, since there is then no batch from
    /// which to derive the schema. (The original code panicked here too,
    /// but with an opaque index-out-of-bounds message.)
    fn new(
        batches: Vec<Vec<RecordBatch>>,
        sort_information: Vec<PhysicalSortExpr>,
    ) -> Self {
        let schema = batches
            .first()
            .and_then(|partition| partition.first())
            .expect("SortedTableProvider::new requires at least one batch to derive the schema")
            .schema();
        Self {
            schema,
            batches,
            sort_information,
        }
    }
}

#[async_trait]
impl TableProvider for SortedTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        // SchemaRef is an Arc; cloning is a cheap refcount bump.
        Arc::clone(&self.schema)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    /// Produces an in-memory scan over the stored batches, annotated with
    /// the declared sort order so downstream operators can rely on it.
    async fn scan(
        &self,
        _state: &SessionState,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let exec =
            MemoryExec::try_new(&self.batches, self.schema(), projection.cloned())?
                .with_sort_information(self.sort_information.clone());

        Ok(Arc::new(exec))
    }
}

0 comments on commit 5a3a9ea

Please sign in to comment.