Merge remote-tracking branch 'apache/main' into alamb/min_max_strings
alamb committed Oct 13, 2024
2 parents 7853d8f + ebfc155 commit 81431a0
Showing 52 changed files with 2,421 additions and 683 deletions.
README.md (7 additions, 9 deletions)

@@ -44,8 +44,9 @@
DataFusion is an extensible query engine written in [Rust] that
uses [Apache Arrow] as its in-memory format.

-The DataFusion libraries in this repository are used to build data-centric system software. DataFusion also provides the
-following subprojects, which are packaged versions of DataFusion intended for end users.
+This crate provides libraries and binaries for developers building fast and
+feature rich database and analytic systems, customized to particular workloads.
+See [use cases] for examples. The following related subprojects target end users:

- [DataFusion Python](https://github.com/apache/datafusion-python/) offers a Python interface for SQL and DataFrame
queries.
@@ -54,13 +55,10 @@
- [DataFusion Comet](https://github.com/apache/datafusion-comet/) is an accelerator for Apache Spark based on
DataFusion.

-The target audience for the DataFusion crates in this repository are
-developers building fast and feature rich database and analytic systems,
-customized to particular workloads. See [use cases] for examples.
-
-DataFusion offers [SQL] and [`Dataframe`] APIs,
-excellent [performance], built-in support for CSV, Parquet, JSON, and Avro,
-extensive customization, and a great community.
+"Out of the box,"
+DataFusion offers [SQL] and [`Dataframe`] APIs, excellent [performance],
+built-in support for CSV, Parquet, JSON, and Avro, extensive customization, and
+a great community.

DataFusion features a full query planner, a columnar, streaming, multi-threaded,
vectorized execution engine, and partitioned data sources. You can
datafusion/common/src/test_util.rs (120 additions, 0 deletions)

@@ -279,8 +279,88 @@ pub fn get_data_dir(
}
}

#[macro_export]
macro_rules! create_array {
    (Boolean, $values: expr) => {
        std::sync::Arc::new(arrow::array::BooleanArray::from($values))
    };
    (Int8, $values: expr) => {
        std::sync::Arc::new(arrow::array::Int8Array::from($values))
    };
    (Int16, $values: expr) => {
        std::sync::Arc::new(arrow::array::Int16Array::from($values))
    };
    (Int32, $values: expr) => {
        std::sync::Arc::new(arrow::array::Int32Array::from($values))
    };
    (Int64, $values: expr) => {
        std::sync::Arc::new(arrow::array::Int64Array::from($values))
    };
    (UInt8, $values: expr) => {
        std::sync::Arc::new(arrow::array::UInt8Array::from($values))
    };
    (UInt16, $values: expr) => {
        std::sync::Arc::new(arrow::array::UInt16Array::from($values))
    };
    (UInt32, $values: expr) => {
        std::sync::Arc::new(arrow::array::UInt32Array::from($values))
    };
    (UInt64, $values: expr) => {
        std::sync::Arc::new(arrow::array::UInt64Array::from($values))
    };
    (Float16, $values: expr) => {
        std::sync::Arc::new(arrow::array::Float16Array::from($values))
    };
    (Float32, $values: expr) => {
        std::sync::Arc::new(arrow::array::Float32Array::from($values))
    };
    (Float64, $values: expr) => {
        std::sync::Arc::new(arrow::array::Float64Array::from($values))
    };
    (Utf8, $values: expr) => {
        std::sync::Arc::new(arrow::array::StringArray::from($values))
    };
}

/// Creates a record batch from a literal slice of values, suitable for rapid
/// testing and development.
///
/// Example:
/// ```
/// use datafusion_common::{record_batch, create_array};
/// let batch = record_batch!(
///     ("a", Int32, vec![1, 2, 3]),
///     ("b", Float64, vec![Some(4.0), None, Some(5.0)]),
///     ("c", Utf8, vec!["alpha", "beta", "gamma"])
/// );
/// ```
#[macro_export]
macro_rules! record_batch {
    ($(($name: expr, $type: ident, $values: expr)),*) => {
        {
            let schema = std::sync::Arc::new(arrow_schema::Schema::new(vec![
                $(
                    arrow_schema::Field::new($name, arrow_schema::DataType::$type, true),
                )*
            ]));

            let batch = arrow_array::RecordBatch::try_new(
                schema,
                vec![$(
                    create_array!($type, $values),
                )*]
            );

            batch
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::cast::{as_float64_array, as_int32_array, as_string_array};
    use crate::error::Result;

    use super::*;
    use std::env;

@@ -333,4 +413,44 @@ mod tests {
        let res = parquet_test_data();
        assert!(PathBuf::from(res).is_dir());
    }

    #[test]
    fn test_create_record_batch() -> Result<()> {
        use arrow_array::Array;

        let batch = record_batch!(
            ("a", Int32, vec![1, 2, 3, 4]),
            ("b", Float64, vec![Some(4.0), None, Some(5.0), None]),
            ("c", Utf8, vec!["alpha", "beta", "gamma", "delta"])
        )?;

        assert_eq!(3, batch.num_columns());
        assert_eq!(4, batch.num_rows());

        let values: Vec<_> = as_int32_array(batch.column(0))?
            .values()
            .iter()
            .map(|v| v.to_owned())
            .collect();
        assert_eq!(values, vec![1, 2, 3, 4]);

        let values: Vec<_> = as_float64_array(batch.column(1))?
            .values()
            .iter()
            .map(|v| v.to_owned())
            .collect();
        assert_eq!(values, vec![4.0, 0.0, 5.0, 0.0]);

        let nulls: Vec<_> = as_float64_array(batch.column(1))?
            .nulls()
            .unwrap()
            .iter()
            .collect();
        assert_eq!(nulls, vec![true, false, true, false]);

        let values: Vec<_> =
            as_string_array(batch.column(2))?.iter().flatten().collect();
        assert_eq!(values, vec!["alpha", "beta", "gamma", "delta"]);

        Ok(())
    }
}
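Aside (not part of the commit): a minimal sketch of how the two new macros combine in downstream code. It assumes the calling crate depends on `arrow`, `arrow_array`, and `arrow_schema` directly, since the macro bodies reference those crates without a `$crate::` prefix, and that both macros are imported from `datafusion_common` as exported above.

```rust
use arrow::array::{Array, ArrayRef};
use datafusion_common::{create_array, record_batch};

fn main() -> Result<(), arrow_schema::ArrowError> {
    // create_array! wraps a concrete Arrow array in an Arc, which then
    // coerces to ArrayRef (Arc<dyn Array>).
    let ids: ArrayRef = create_array!(Int32, vec![1, 2, 3]);
    assert_eq!(ids.len(), 3);

    // record_batch! builds the schema (every field nullable) and the
    // columns in one step, returning Result<RecordBatch, ArrowError>.
    let batch = record_batch!(
        ("id", Int32, vec![1, 2, 3]),
        ("name", Utf8, vec![Some("a"), None, Some("c")])
    )?;
    assert_eq!(batch.num_rows(), 3);
    Ok(())
}
```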
datafusion/core/src/bin/print_functions_docs.rs (1 addition, 1 deletion)

@@ -108,7 +108,7 @@ fn print_docs(
        .collect::<Vec<_>>();

    // write out section header
-   let _ = writeln!(docs, "## {} ", doc_section.label);
+   let _ = writeln!(docs, "\n## {} \n", doc_section.label);

    if let Some(description) = doc_section.description {
        let _ = writeln!(docs, "{description}");
datafusion/core/src/datasource/memory.rs (3 additions, 3 deletions)

@@ -37,14 +37,14 @@ use crate::physical_planner::create_physical_sort_exprs;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
+use datafusion_catalog::Session;
use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
use datafusion_execution::TaskContext;
use datafusion_expr::dml::InsertOp;
+use datafusion_expr::SortExpr;
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
-use datafusion_catalog::Session;
-use datafusion_expr::SortExpr;
use futures::StreamExt;
use log::debug;
use parking_lot::Mutex;
@@ -241,7 +241,7 @@ impl TableProvider for MemTable {
                    )
                })
                .collect::<Result<Vec<_>>>()?;
-           exec = exec.with_sort_information(file_sort_order);
+           exec = exec.try_with_sort_information(file_sort_order)?;
        }

        Ok(Arc::new(exec))
datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs (2 additions, 1 deletion)

@@ -395,7 +395,8 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
    let running_source = Arc::new(
        MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
            .unwrap()
-           .with_sort_information(vec![sort_keys]),
+           .try_with_sort_information(vec![sort_keys])
+           .unwrap(),
    );

    let aggregate_expr =
@@ -358,7 +358,8 @@ mod sp_repartition_fuzz_tests {
let running_source = Arc::new(
MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
.unwrap()
.with_sort_information(vec![sort_keys.clone()]),
.try_with_sort_information(vec![sort_keys.clone()])
.unwrap(),
);
let hash_exprs = vec![col("c", &schema).unwrap()];

datafusion/core/tests/fuzz_cases/window_fuzz.rs (2 additions, 2 deletions)

@@ -647,7 +647,7 @@ async fn run_window_test(
    ];
    let mut exec1 = Arc::new(
        MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None)?
-           .with_sort_information(vec![source_sort_keys.clone()]),
+           .try_with_sort_information(vec![source_sort_keys.clone()])?,
    ) as _;
    // Table is ordered according to ORDER BY a, b, c. In the linear test we use PARTITION BY b, ORDER BY a.
    // For WindowAggExec to produce correct results, it needs the table to be ordered by b, a. Hence, add a sort.
@@ -673,7 +673,7 @@
    )?) as _;
    let exec2 = Arc::new(
        MemoryExec::try_new(&[input1.clone()], schema.clone(), None)?
-           .with_sort_information(vec![source_sort_keys.clone()]),
+           .try_with_sort_information(vec![source_sort_keys.clone()])?,
    );
    let running_window_exec = Arc::new(BoundedWindowAggExec::try_new(
        vec![create_window_expr(
datafusion/core/tests/memory_limit/mod.rs (1 addition, 1 deletion)

@@ -840,7 +840,7 @@ impl TableProvider for SortedTableProvider {
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mem_exec =
            MemoryExec::try_new(&self.batches, self.schema(), projection.cloned())?
-               .with_sort_information(self.sort_information.clone());
+               .try_with_sort_information(self.sort_information.clone())?;

        Ok(Arc::new(mem_exec))
    }
datafusion/expr/src/udf_docs.rs (1 addition, 1 deletion)

@@ -155,7 +155,7 @@ impl DocumentationBuilder {
    ///
    /// ```text
    /// <arg_name>:
-   /// <expression_type> expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
+   /// <expression_type> expression to operate on. Can be a constant, column, or function, and any combination of operators.
    /// ```
    pub fn with_standard_argument(
        self,
datafusion/functions-aggregate/src/approx_distinct.rs (34 additions, 1 deletion)

@@ -31,13 +31,17 @@ use datafusion_common::ScalarValue;
use datafusion_common::{
    downcast_value, internal_err, not_impl_err, DataFusionError, Result,
};
+use datafusion_expr::aggregate_doc_sections::DOC_SECTION_APPROXIMATE;
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::utils::format_state_name;
-use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility};
+use datafusion_expr::{
+    Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility,
+};
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::hash::Hash;
use std::marker::PhantomData;
+use std::sync::OnceLock;

make_udaf_expr_and_func!(
    ApproxDistinct,
    approx_distinct,
@@ -303,4 +307,33 @@
        };
        Ok(accumulator)
    }

    fn documentation(&self) -> Option<&Documentation> {
        Some(get_approx_distinct_doc())
    }
}

static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();

fn get_approx_distinct_doc() -> &'static Documentation {
    DOCUMENTATION.get_or_init(|| {
        Documentation::builder()
            .with_doc_section(DOC_SECTION_APPROXIMATE)
            .with_description(
                "Returns the approximate number of distinct input values calculated using the HyperLogLog algorithm.",
            )
            .with_syntax_example("approx_distinct(expression)")
            .with_sql_example(
                r#"```sql
> SELECT approx_distinct(column_name) FROM table_name;
+-----------------------------------+
| approx_distinct(column_name)      |
+-----------------------------------+
| 42                                |
+-----------------------------------+
```"#,
            )
            .with_argument("expression", "Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.")
            .build()
            .unwrap()
    })
}
datafusion/functions-aggregate/src/approx_median.rs (34 additions, 1 deletion)

@@ -19,15 +19,19 @@

use std::any::Any;
use std::fmt::Debug;
+use std::sync::OnceLock;

use arrow::{datatypes::DataType, datatypes::Field};
use arrow_schema::DataType::{Float64, UInt64};

use datafusion_common::{not_impl_err, plan_err, Result};
+use datafusion_expr::aggregate_doc_sections::DOC_SECTION_APPROXIMATE;
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::utils::format_state_name;
-use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility};
+use datafusion_expr::{
+    Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility,
+};

use crate::approx_percentile_cont::ApproxPercentileAccumulator;

@@ -116,4 +120,33 @@ impl AggregateUDFImpl for ApproxMedian {
            acc_args.exprs[0].data_type(acc_args.schema)?,
        )))
    }

    fn documentation(&self) -> Option<&Documentation> {
        Some(get_approx_median_doc())
    }
}

static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();

fn get_approx_median_doc() -> &'static Documentation {
    DOCUMENTATION.get_or_init(|| {
        Documentation::builder()
            .with_doc_section(DOC_SECTION_APPROXIMATE)
            .with_description(
                "Returns the approximate median (50th percentile) of input values. It is an alias of `approx_percentile_cont(x, 0.5)`.",
            )
            .with_syntax_example("approx_median(expression)")
            .with_sql_example(
                r#"```sql
> SELECT approx_median(column_name) FROM table_name;
+-----------------------------------+
| approx_median(column_name)        |
+-----------------------------------+
| 23.5                              |
+-----------------------------------+
```"#,
            )
            .with_argument("expression", "Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.")
            .build()
            .unwrap()
    })
}
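The documentation getters in both files follow the same lazily initialized static pattern. Reduced to its core (the names below are hypothetical stand-ins, not DataFusion API), it is just `OnceLock::get_or_init`:

```rust
use std::sync::OnceLock;

// Stand-in for `Documentation`: built once, then shared read-only for
// the life of the process.
struct Doc {
    description: &'static str,
}

static DOC: OnceLock<Doc> = OnceLock::new();

fn get_doc() -> &'static Doc {
    // The closure runs at most once, even under concurrent first calls;
    // every later call returns the cached reference.
    DOC.get_or_init(|| Doc {
        description: "example documentation",
    })
}

fn main() {
    // Both calls observe the same initialized value.
    assert!(std::ptr::eq(get_doc(), get_doc()));
    println!("{}", get_doc().description);
}
```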