Commit

Merge branch 'main' into convert-lead-lag-udwf
jcsherin committed Oct 14, 2024
2 parents 7dbb7e8 + 6c0670d commit 7892e05
Showing 90 changed files with 3,913 additions and 886 deletions.
16 changes: 7 additions & 9 deletions README.md
@@ -44,8 +44,9 @@
DataFusion is an extensible query engine written in [Rust] that
uses [Apache Arrow] as its in-memory format.

The DataFusion libraries in this repository are used to build data-centric system software. DataFusion also provides the
following subprojects, which are packaged versions of DataFusion intended for end users.
This crate provides libraries and binaries for developers building fast and
feature rich database and analytic systems, customized to particular workloads.
See [use cases] for examples. The following related subprojects target end users:

- [DataFusion Python](https://github.com/apache/datafusion-python/) offers a Python interface for SQL and DataFrame
queries.
@@ -54,13 +55,10 @@ following subprojects, which are packaged versions of DataFusion intended for end users.
- [DataFusion Comet](https://github.com/apache/datafusion-comet/) is an accelerator for Apache Spark based on
DataFusion.

The target audience for the DataFusion crates in this repository are
developers building fast and feature rich database and analytic systems,
customized to particular workloads. See [use cases] for examples.

DataFusion offers [SQL] and [`Dataframe`] APIs,
excellent [performance], built-in support for CSV, Parquet, JSON, and Avro,
extensive customization, and a great community.
"Out of the box,"
DataFusion offers [SQL] and [`Dataframe`] APIs, excellent [performance],
built-in support for CSV, Parquet, JSON, and Avro, extensive customization, and
a great community.

DataFusion features a full query planner, a columnar, streaming, multi-threaded,
vectorized execution engine, and partitioned data sources. You can
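To make the SQL and DataFrame APIs mentioned above concrete, here is a minimal sketch (not part of this diff) that registers a CSV file and runs a query through SessionContext; the example.csv path and the column names a/b are placeholders:

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("example", "example.csv", CsvReadOptions::new())
        .await?;
    // SQL API; ctx.table("example") would start the equivalent DataFrame chain
    let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a").await?;
    df.show().await?;
    Ok(())
}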
120 changes: 120 additions & 0 deletions datafusion/common/src/test_util.rs
@@ -279,8 +279,88 @@ pub fn get_data_dir(
}
}

#[macro_export]
macro_rules! create_array {
(Boolean, $values: expr) => {
std::sync::Arc::new(arrow::array::BooleanArray::from($values))
};
(Int8, $values: expr) => {
std::sync::Arc::new(arrow::array::Int8Array::from($values))
};
(Int16, $values: expr) => {
std::sync::Arc::new(arrow::array::Int16Array::from($values))
};
(Int32, $values: expr) => {
std::sync::Arc::new(arrow::array::Int32Array::from($values))
};
(Int64, $values: expr) => {
std::sync::Arc::new(arrow::array::Int64Array::from($values))
};
(UInt8, $values: expr) => {
std::sync::Arc::new(arrow::array::UInt8Array::from($values))
};
(UInt16, $values: expr) => {
std::sync::Arc::new(arrow::array::UInt16Array::from($values))
};
(UInt32, $values: expr) => {
std::sync::Arc::new(arrow::array::UInt32Array::from($values))
};
(UInt64, $values: expr) => {
std::sync::Arc::new(arrow::array::UInt64Array::from($values))
};
(Float16, $values: expr) => {
std::sync::Arc::new(arrow::array::Float16Array::from($values))
};
(Float32, $values: expr) => {
std::sync::Arc::new(arrow::array::Float32Array::from($values))
};
(Float64, $values: expr) => {
std::sync::Arc::new(arrow::array::Float64Array::from($values))
};
(Utf8, $values: expr) => {
std::sync::Arc::new(arrow::array::StringArray::from($values))
};
}
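Each arm above wraps the matching Arrow constructor in an Arc, so every invocation yields an ArrayRef (an Arc<dyn Array>). A minimal expansion sketch, illustrative only and not part of the diff:

use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int32Array};

fn main() {
    // create_array!(Int32, vec![1, 2, 3]) expands to roughly this:
    let arr: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    assert_eq!(arr.len(), 3);

    // Vec<Option<T>> inputs produce null entries via arrow's From impls:
    let with_nulls: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None]));
    assert_eq!(with_nulls.null_count(), 1);
}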

/// Creates a record batch from a literal slice of values, suitable for rapid
/// testing and development.
///
/// Example:
/// ```
/// use datafusion_common::{record_batch, create_array};
/// let batch = record_batch!(
/// ("a", Int32, vec![1, 2, 3]),
/// ("b", Float64, vec![Some(4.0), None, Some(5.0)]),
/// ("c", Utf8, vec!["alpha", "beta", "gamma"])
/// );
/// ```
#[macro_export]
macro_rules! record_batch {
($(($name: expr, $type: ident, $values: expr)),*) => {
{
let schema = std::sync::Arc::new(arrow_schema::Schema::new(vec![
$(
arrow_schema::Field::new($name, arrow_schema::DataType::$type, true),
)*
]));

let batch = arrow_array::RecordBatch::try_new(
schema,
vec![$(
create_array!($type, $values),
)*]
);

batch
}
}
}
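Two details worth noting in the expansion above: every field is declared nullable (the hard-coded true in Field::new), and the Result from RecordBatch::try_new is returned as-is, so a column/schema mismatch surfaces at the call site. A hedged usage sketch, assuming the caller also depends on the arrow_array and arrow_schema crates that the macro body references by absolute path:

use datafusion_common::{create_array, record_batch};

// Hypothetical test helper; `?` propagates the ArrowError from
// RecordBatch::try_new if the columns and schema disagree.
fn build_test_batch() -> Result<arrow_array::RecordBatch, arrow_schema::ArrowError> {
    let batch = record_batch!(
        ("id", Int32, vec![1, 2]),
        ("name", Utf8, vec![Some("a"), None])
    )?;
    assert_eq!(batch.num_rows(), 2);
    Ok(batch)
}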

#[cfg(test)]
mod tests {
use crate::cast::{as_float64_array, as_int32_array, as_string_array};
use crate::error::Result;

use super::*;
use std::env;

@@ -333,4 +413,44 @@ mod tests {
let res = parquet_test_data();
assert!(PathBuf::from(res).is_dir());
}

#[test]
fn test_create_record_batch() -> Result<()> {
use arrow_array::Array;

let batch = record_batch!(
("a", Int32, vec![1, 2, 3, 4]),
("b", Float64, vec![Some(4.0), None, Some(5.0), None]),
("c", Utf8, vec!["alpha", "beta", "gamma", "delta"])
)?;

assert_eq!(3, batch.num_columns());
assert_eq!(4, batch.num_rows());

let values: Vec<_> = as_int32_array(batch.column(0))?
.values()
.iter()
.map(|v| v.to_owned())
.collect();
assert_eq!(values, vec![1, 2, 3, 4]);

let values: Vec<_> = as_float64_array(batch.column(1))?
.values()
.iter()
.map(|v| v.to_owned())
.collect();
assert_eq!(values, vec![4.0, 0.0, 5.0, 0.0]);

let nulls: Vec<_> = as_float64_array(batch.column(1))?
.nulls()
.unwrap()
.iter()
.collect();
assert_eq!(nulls, vec![true, false, true, false]);

let values: Vec<_> = as_string_array(batch.column(2))?.iter().flatten().collect();
assert_eq!(values, vec!["alpha", "beta", "gamma", "delta"]);

Ok(())
}
}
25 changes: 23 additions & 2 deletions datafusion/core/src/bin/print_functions_docs.rs
@@ -20,10 +20,16 @@ use datafusion_expr::{
aggregate_doc_sections, scalar_doc_sections, window_doc_sections, AggregateUDF,
DocSection, Documentation, ScalarUDF, WindowUDF,
};
use hashbrown::HashSet;
use itertools::Itertools;
use std::env::args;
use std::fmt::Write as _;

/// Print documentation for all functions of a given type to stdout
///
/// Usage: `cargo run --bin print_functions_docs -- <type>`
///
/// Called from `dev/update_function_docs.sh`
fn main() {
let args: Vec<String> = args().collect();

@@ -83,9 +89,12 @@ fn print_docs(
) -> String {
let mut docs = "".to_string();

// Ensure that all providers have documentation
let mut providers_with_no_docs = HashSet::new();

// doc sections only includes sections that have 'include' == true
for doc_section in doc_sections {
// make sure there is a function that is in this doc section
// make sure there is at least one function that is in this doc section
if !&providers.iter().any(|f| {
if let Some(documentation) = f.get_documentation() {
documentation.doc_section == doc_section
@@ -96,19 +105,21 @@
continue;
}

// filter out functions that are not in this doc section
let providers: Vec<&Box<dyn DocProvider>> = providers
.iter()
.filter(|&f| {
if let Some(documentation) = f.get_documentation() {
documentation.doc_section == doc_section
} else {
providers_with_no_docs.insert(f.get_name());
false
}
})
.collect::<Vec<_>>();

// write out section header
let _ = writeln!(docs, "## {} ", doc_section.label);
let _ = writeln!(docs, "\n## {} \n", doc_section.label);

if let Some(description) = doc_section.description {
let _ = writeln!(docs, "{description}");
@@ -202,9 +213,19 @@
}
}

// If there are any functions that do not have documentation, print them out
// eventually make this an error: https://github.com/apache/datafusion/issues/12872
if !providers_with_no_docs.is_empty() {
eprintln!("INFO: The following functions do not have documentation:");
for f in providers_with_no_docs {
eprintln!(" - {f}");
}
}

docs
}

/// Trait for accessing name / aliases / documentation for different functions
trait DocProvider {
fn get_name(&self) -> String;
fn get_aliases(&self) -> Vec<String>;
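The trait is cut off by the diff view above, but print_docs also calls get_documentation on it. For orientation, a hedged sketch of how one UDF kind could satisfy the trait inside this file (which already imports ScalarUDF and Documentation); the name()/aliases()/documentation() accessors and the Option<&Documentation> return type are assumptions, not shown in this diff:

impl DocProvider for ScalarUDF {
    // Delegate to the underlying UDF's accessors (assumed names).
    fn get_name(&self) -> String {
        self.name().to_string()
    }
    fn get_aliases(&self) -> Vec<String> {
        self.aliases().iter().map(|a| a.to_string()).collect()
    }
    fn get_documentation(&self) -> Option<&Documentation> {
        self.documentation()
    }
}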
1 change: 1 addition & 0 deletions datafusion/core/src/catalog_common/listing_schema.rs
@@ -136,6 +136,7 @@ impl ListingSchemaProvider {
file_type: self.format.clone(),
table_partition_cols: vec![],
if_not_exists: false,
temporary: false,
definition: None,
order_exprs: vec![],
unbounded: false,
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/listing_table_factory.rs
@@ -197,6 +197,7 @@ mod tests {
schema: Arc::new(DFSchema::empty()),
table_partition_cols: vec![],
if_not_exists: false,
temporary: false,
definition: None,
order_exprs: vec![],
unbounded: false,
@@ -236,6 +237,7 @@
schema: Arc::new(DFSchema::empty()),
table_partition_cols: vec![],
if_not_exists: false,
temporary: false,
definition: None,
order_exprs: vec![],
unbounded: false,
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/memory.rs
@@ -37,14 +37,14 @@ use crate::physical_planner::create_physical_sort_exprs;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_catalog::Session;
use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
use datafusion_execution::TaskContext;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::SortExpr;
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_expr::SortExpr;
use futures::StreamExt;
use log::debug;
use parking_lot::Mutex;
@@ -241,7 +241,7 @@ impl TableProvider for MemTable {
)
})
.collect::<Result<Vec<_>>>()?;
exec = exec.with_sort_information(file_sort_order);
exec = exec.try_with_sort_information(file_sort_order)?;
}

Ok(Arc::new(exec))
16 changes: 16 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
@@ -738,6 +738,11 @@ impl SessionContext {
cmd: &CreateExternalTable,
) -> Result<DataFrame> {
let exist = self.table_exist(cmd.name.clone())?;

if cmd.temporary {
return not_impl_err!("Temporary tables not supported");
}

if exist {
match cmd.if_not_exists {
true => return self.return_empty_dataframe(),
@@ -761,10 +766,16 @@
or_replace,
constraints,
column_defaults,
temporary,
} = cmd;

let input = Arc::unwrap_or_clone(input);
let input = self.state().optimize(&input)?;

if temporary {
return not_impl_err!("Temporary tables not supported");
}

let table = self.table(name.clone()).await;
match (if_not_exists, or_replace, table) {
(true, false, Ok(_)) => self.return_empty_dataframe(),
@@ -813,10 +824,15 @@
input,
or_replace,
definition,
temporary,
} = cmd;

let view = self.table(name.clone()).await;

if temporary {
return not_impl_err!("Temporary views not supported");
}

match (or_replace, view) {
(true, Ok(_)) => {
self.deregister_table(name.clone())?;
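Together, the CreateExternalTable, CreateMemoryTable, and CreateView guards above make a TEMPORARY modifier fail loudly instead of being silently dropped. A hedged sketch of the user-visible effect, assuming the SQL front end parses CREATE TEMPORARY TABLE into one of these plans (the error text comes from the not_impl_err! calls above):

use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() {
    let ctx = SessionContext::new();
    // DDL is planned and executed inside sql(), so the error surfaces here.
    match ctx.sql("CREATE TEMPORARY TABLE t AS VALUES (1)").await {
        Err(e) => assert!(e.to_string().contains("Temporary tables not supported")),
        Ok(_) => unreachable!("temporary tables are rejected"),
    }
}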