Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/apache/datafusion into vect…
Browse files Browse the repository at this point in the history
…orized-hash-eq-vec
  • Loading branch information
jayzhan211 committed Oct 15, 2024
2 parents 7e4f4e3 + d9450da commit e07006d
Show file tree
Hide file tree
Showing 71 changed files with 2,841 additions and 965 deletions.
4 changes: 2 additions & 2 deletions datafusion-examples/examples/custom_file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl FileFormatFactory for TSVFileFactory {
&self,
state: &SessionState,
format_options: &std::collections::HashMap<String, String>,
) -> Result<std::sync::Arc<dyn FileFormat>> {
) -> Result<Arc<dyn FileFormat>> {
let mut new_options = format_options.clone();
new_options.insert("format.delimiter".to_string(), "\t".to_string());

Expand All @@ -164,7 +164,7 @@ impl FileFormatFactory for TSVFileFactory {
Ok(tsv_file_format)
}

fn default(&self) -> std::sync::Arc<dyn FileFormat> {
fn default(&self) -> Arc<dyn FileFormat> {
todo!()
}

Expand Down
120 changes: 120 additions & 0 deletions datafusion/common/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,88 @@ pub fn get_data_dir(
}
}

#[macro_export]
macro_rules! create_array {
(Boolean, $values: expr) => {
std::sync::Arc::new(arrow::array::BooleanArray::from($values))
};
(Int8, $values: expr) => {
std::sync::Arc::new(arrow::array::Int8Array::from($values))
};
(Int16, $values: expr) => {
std::sync::Arc::new(arrow::array::Int16Array::from($values))
};
(Int32, $values: expr) => {
std::sync::Arc::new(arrow::array::Int32Array::from($values))
};
(Int64, $values: expr) => {
std::sync::Arc::new(arrow::array::Int64Array::from($values))
};
(UInt8, $values: expr) => {
std::sync::Arc::new(arrow::array::UInt8Array::from($values))
};
(UInt16, $values: expr) => {
std::sync::Arc::new(arrow::array::UInt16Array::from($values))
};
(UInt32, $values: expr) => {
std::sync::Arc::new(arrow::array::UInt32Array::from($values))
};
(UInt64, $values: expr) => {
std::sync::Arc::new(arrow::array::UInt64Array::from($values))
};
(Float16, $values: expr) => {
std::sync::Arc::new(arrow::array::Float16Array::from($values))
};
(Float32, $values: expr) => {
std::sync::Arc::new(arrow::array::Float32Array::from($values))
};
(Float64, $values: expr) => {
std::sync::Arc::new(arrow::array::Float64Array::from($values))
};
(Utf8, $values: expr) => {
std::sync::Arc::new(arrow::array::StringArray::from($values))
};
}

/// Creates a record batch from literal slice of values, suitable for rapid
/// testing and development.
///
/// Example:
/// ```
/// use datafusion_common::{record_batch, create_array};
/// let batch = record_batch!(
/// ("a", Int32, vec![1, 2, 3]),
/// ("b", Float64, vec![Some(4.0), None, Some(5.0)]),
/// ("c", Utf8, vec!["alpha", "beta", "gamma"])
/// );
/// ```
#[macro_export]
macro_rules! record_batch {
($(($name: expr, $type: ident, $values: expr)),*) => {
{
let schema = std::sync::Arc::new(arrow_schema::Schema::new(vec![
$(
arrow_schema::Field::new($name, arrow_schema::DataType::$type, true),
)*
]));

let batch = arrow_array::RecordBatch::try_new(
schema,
vec![$(
create_array!($type, $values),
)*]
);

batch
}
}
}

#[cfg(test)]
mod tests {
use crate::cast::{as_float64_array, as_int32_array, as_string_array};
use crate::error::Result;

use super::*;
use std::env;

Expand Down Expand Up @@ -333,4 +413,44 @@ mod tests {
let res = parquet_test_data();
assert!(PathBuf::from(res).is_dir());
}

#[test]
fn test_create_record_batch() -> Result<()> {
use arrow_array::Array;

let batch = record_batch!(
("a", Int32, vec![1, 2, 3, 4]),
("b", Float64, vec![Some(4.0), None, Some(5.0), None]),
("c", Utf8, vec!["alpha", "beta", "gamma", "delta"])
)?;

assert_eq!(3, batch.num_columns());
assert_eq!(4, batch.num_rows());

let values: Vec<_> = as_int32_array(batch.column(0))?
.values()
.iter()
.map(|v| v.to_owned())
.collect();
assert_eq!(values, vec![1, 2, 3, 4]);

let values: Vec<_> = as_float64_array(batch.column(1))?
.values()
.iter()
.map(|v| v.to_owned())
.collect();
assert_eq!(values, vec![4.0, 0.0, 5.0, 0.0]);

let nulls: Vec<_> = as_float64_array(batch.column(1))?
.nulls()
.unwrap()
.iter()
.collect();
assert_eq!(nulls, vec![true, false, true, false]);

let values: Vec<_> = as_string_array(batch.column(2))?.iter().flatten().collect();
assert_eq!(values, vec!["alpha", "beta", "gamma", "delta"]);

Ok(())
}
}
23 changes: 22 additions & 1 deletion datafusion/core/src/bin/print_functions_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ use datafusion_expr::{
aggregate_doc_sections, scalar_doc_sections, window_doc_sections, AggregateUDF,
DocSection, Documentation, ScalarUDF, WindowUDF,
};
use hashbrown::HashSet;
use itertools::Itertools;
use std::env::args;
use std::fmt::Write as _;

/// Print documentation for all functions of a given type to stdout
///
/// Usage: `cargo run --bin print_functions_docs -- <type>`
///
/// Called from `dev/update_function_docs.sh`
fn main() {
let args: Vec<String> = args().collect();

Expand Down Expand Up @@ -83,9 +89,12 @@ fn print_docs(
) -> String {
let mut docs = "".to_string();

// Ensure that all providers have documentation
let mut providers_with_no_docs = HashSet::new();

// doc sections only includes sections that have 'include' == true
for doc_section in doc_sections {
// make sure there is a function that is in this doc section
// make sure there is at least one function that is in this doc section
if !&providers.iter().any(|f| {
if let Some(documentation) = f.get_documentation() {
documentation.doc_section == doc_section
Expand All @@ -96,12 +105,14 @@ fn print_docs(
continue;
}

// filter out functions that are not in this doc section
let providers: Vec<&Box<dyn DocProvider>> = providers
.iter()
.filter(|&f| {
if let Some(documentation) = f.get_documentation() {
documentation.doc_section == doc_section
} else {
providers_with_no_docs.insert(f.get_name());
false
}
})
Expand Down Expand Up @@ -202,9 +213,19 @@ fn print_docs(
}
}

// If there are any functions that do not have documentation, print them out
// eventually make this an error: https://github.com/apache/datafusion/issues/12872
if !providers_with_no_docs.is_empty() {
eprintln!("INFO: The following functions do not have documentation:");
for f in providers_with_no_docs {
eprintln!(" - {f}");
}
}

docs
}

/// Trait for accessing name / aliases / documentation for differnet functions
trait DocProvider {
fn get_name(&self) -> String;
fn get_aliases(&self) -> Vec<String>;
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/catalog_common/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ impl ListingSchemaProvider {
file_type: self.format.clone(),
table_partition_cols: vec![],
if_not_exists: false,
temporary: false,
definition: None,
order_exprs: vec![],
unbounded: false,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ mod tests {
schema: Arc::new(DFSchema::empty()),
table_partition_cols: vec![],
if_not_exists: false,
temporary: false,
definition: None,
order_exprs: vec![],
unbounded: false,
Expand Down Expand Up @@ -236,6 +237,7 @@ mod tests {
schema: Arc::new(DFSchema::empty()),
table_partition_cols: vec![],
if_not_exists: false,
temporary: false,
definition: None,
order_exprs: vec![],
unbounded: false,
Expand Down
16 changes: 16 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,11 @@ impl SessionContext {
cmd: &CreateExternalTable,
) -> Result<DataFrame> {
let exist = self.table_exist(cmd.name.clone())?;

if cmd.temporary {
return not_impl_err!("Temporary tables not supported");
}

if exist {
match cmd.if_not_exists {
true => return self.return_empty_dataframe(),
Expand All @@ -761,10 +766,16 @@ impl SessionContext {
or_replace,
constraints,
column_defaults,
temporary,
} = cmd;

let input = Arc::unwrap_or_clone(input);
let input = self.state().optimize(&input)?;

if temporary {
return not_impl_err!("Temporary tables not supported");
}

let table = self.table(name.clone()).await;
match (if_not_exists, or_replace, table) {
(true, false, Ok(_)) => self.return_empty_dataframe(),
Expand Down Expand Up @@ -813,10 +824,15 @@ impl SessionContext {
input,
or_replace,
definition,
temporary,
} = cmd;

let view = self.table(name.clone()).await;

if temporary {
return not_impl_err!("Temporary views not supported");
}

match (or_replace, view) {
(true, Ok(_)) => {
self.deregister_table(name.clone())?;
Expand Down
Loading

0 comments on commit e07006d

Please sign in to comment.