Merge remote-tracking branch 'synnada-ai/refactor/oeq_eq_properties' into alamb/oeq_eq_properties
alamb committed Nov 3, 2023
2 parents a5c87e0 + b582cda commit b92fd01
Showing 49 changed files with 1,050 additions and 424 deletions.
32 changes: 32 additions & 0 deletions Cargo.toml
@@ -54,9 +54,41 @@ arrow-array = { version = "48.0.0", default-features = false, features = ["chron
arrow-buffer = { version = "48.0.0", default-features = false }
arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] }
arrow-schema = { version = "48.0.0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "0.4.1"
bytes = "1.4"
ctor = "0.2.0"
datafusion = { path = "datafusion/core" }
datafusion-common = { path = "datafusion/common" }
datafusion-expr = { path = "datafusion/expr" }
datafusion-sql = { path = "datafusion/sql" }
datafusion-optimizer = { path = "datafusion/optimizer" }
datafusion-physical-expr = { path = "datafusion/physical-expr" }
datafusion-physical-plan = { path = "datafusion/physical-plan" }
datafusion-execution = { path = "datafusion/execution" }
datafusion-proto = { path = "datafusion/proto" }
datafusion-sqllogictest = { path = "datafusion/sqllogictest" }
datafusion-substrait = { path = "datafusion/substrait" }
dashmap = "5.4.0"
doc-comment = "0.3"
env_logger = "0.10"
futures = "0.3"
half = "2.2.1"
indexmap = "2.0.0"
itertools = "0.11"
log = "^0.4"
num_cpus = "1.13.0"
object_store = "0.7.0"
parking_lot = "0.12"
parquet = { version = "48.0.0", features = ["arrow", "async", "object_store"] }
rand = "0.8"
rstest = "0.18.0"
serde_json = "1"
sqlparser = { version = "0.39.0", features = ["visitor"] }
tempfile = "3"
thiserror = "1.0.44"
chrono = { version = "0.4.31", default-features = false }
url = "2.2"

[profile.release]
codegen-units = 1
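Note on the block above: these additions centralize shared dependency versions in the root [workspace.dependencies] table. A member crate then opts in with { workspace = true } instead of repeating the version, which is exactly the conversion applied to the crate manifests in the rest of this commit. A minimal sketch of the mechanism (the member crate name is illustrative):

    # Root Cargo.toml
    [workspace]
    members = ["member"]

    [workspace.dependencies]
    log = "^0.4"

    # member/Cargo.toml
    [dependencies]
    log = { workspace = true }

With this in place, bumping a dependency version in one spot updates every crate that inherits it.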
10 changes: 5 additions & 5 deletions benchmarks/Cargo.toml
@@ -36,14 +36,14 @@ snmalloc = ["snmalloc-rs"]
arrow = { workspace = true }
datafusion = { path = "../datafusion/core", version = "32.0.0" }
datafusion-common = { path = "../datafusion/common", version = "32.0.0" }
env_logger = "0.10"
futures = "0.3"
log = "^0.4"
env_logger = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
mimalloc = { version = "0.1", optional = true, default-features = false }
num_cpus = "1.13.0"
num_cpus = { workspace = true }
parquet = { workspace = true }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.78"
serde_json = { workspace = true }
snmalloc-rs = { version = "0.3", optional = true }
structopt = { version = "0.3", default-features = false }
test-utils = { path = "../test-utils/", version = "0.1.0" }
1 change: 0 additions & 1 deletion datafusion-cli/Cargo.lock

(Generated file; diff not rendered by default.)

20 changes: 10 additions & 10 deletions datafusion-examples/Cargo.toml
@@ -33,26 +33,26 @@ rust-version = { workspace = true }
arrow = { workspace = true }
arrow-flight = { workspace = true }
arrow-schema = { workspace = true }
async-trait = "0.1.41"
bytes = "1.4"
dashmap = "5.4"
async-trait = { workspace = true }
bytes = { workspace = true }
dashmap = { workspace = true }
datafusion = { path = "../datafusion/core", features = ["avro"] }
datafusion-common = { path = "../datafusion/common" }
datafusion-expr = { path = "../datafusion/expr" }
datafusion-optimizer = { path = "../datafusion/optimizer" }
datafusion-sql = { path = "../datafusion/sql" }
env_logger = "0.10"
futures = "0.3"
log = "0.4"
env_logger = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
mimalloc = { version = "0.1", default-features = false }
num_cpus = "1.13.0"
num_cpus = { workspace = true }
object_store = { version = "0.7.0", features = ["aws", "http"] }
prost = { version = "0.12", default-features = false }
prost-derive = { version = "0.11", default-features = false }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.82"
tempfile = "3"
serde_json = { workspace = true }
tempfile = { workspace = true }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
tonic = "0.10"
url = "2.2"
url = { workspace = true }
uuid = "1.2"
36 changes: 21 additions & 15 deletions datafusion-examples/examples/dataframe.rs
@@ -18,7 +18,9 @@
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::prelude::*;
use std::fs;
use std::fs::File;
use std::io::Write;
use tempfile::tempdir;

/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
/// fetching results, using the DataFrame trait
@@ -41,12 +43,19 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;

// create a temporary CSV file to write to
let dir = tempdir()?;
let file_path = dir.path().join("example.csv");
let file = File::create(&file_path)?;
write_csv_file(file);

// Example of reading a CSV file with an inferred schema
let csv_df = example_read_csv_file_with_inferred_schema().await;
let csv_df =
example_read_csv_file_with_inferred_schema(file_path.to_str().unwrap()).await;
csv_df.show().await?;

// Reading CSV file with defined schema
let csv_df = example_read_csv_file_with_schema().await;
let csv_df = example_read_csv_file_with_schema(file_path.to_str().unwrap()).await;
csv_df.show().await?;

// Reading the Parquet file and printing its describe() output
@@ -59,31 +68,28 @@ async fn main() -> Result<()> {
}

// Function to create a test CSV file
fn create_csv_file(path: String) {
fn write_csv_file(mut file: File) {
// Create the data to put into the csv file with headers
let content = r#"id,time,vote,unixtime,rating
a1,"10 6, 2013",3,1381017600,5.0
a2,"08 9, 2013",2,1376006400,4.5"#;
// write the data
fs::write(path, content).expect("Problem with writing file!");
file.write_all(content.as_ref())
.expect("Problem with writing file!");
}

// Example to read data from a csv file with inferred schema
async fn example_read_csv_file_with_inferred_schema() -> DataFrame {
let path = "example.csv";
// Create a csv file using the predefined function
create_csv_file(path.to_string());
async fn example_read_csv_file_with_inferred_schema(file_path: &str) -> DataFrame {
// Create a session context
let ctx = SessionContext::new();
// Register a lazy DataFrame using the context
ctx.read_csv(path, CsvReadOptions::default()).await.unwrap()
ctx.read_csv(file_path, CsvReadOptions::default())
.await
.unwrap()
}

// Example to read csv file with a defined schema for the csv file
async fn example_read_csv_file_with_schema() -> DataFrame {
let path = "example.csv";
// Create a csv file using the predefined function
create_csv_file(path.to_string());
async fn example_read_csv_file_with_schema(file_path: &str) -> DataFrame {
// Create a session context
let ctx = SessionContext::new();
// Define the schema
@@ -101,5 +107,5 @@ async fn example_read_csv_file_with_schema() -> DataFrame {
..Default::default()
};
// Register a lazy DataFrame by using the context and option provider
ctx.read_csv(path, csv_read_option).await.unwrap()
ctx.read_csv(file_path, csv_read_option).await.unwrap()
}
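The reworked example above writes its test CSV into a tempfile::tempdir() rather than the current working directory, and main() keeps dir alive until both reads finish; the directory and its contents are deleted when the TempDir is dropped. A minimal standalone sketch of the same pattern, using only the tempfile crate:

    use std::fs::File;
    use std::io::Write;
    use tempfile::tempdir;

    fn main() -> std::io::Result<()> {
        // The directory (and everything in it) is removed when `dir` is dropped.
        let dir = tempdir()?;
        let path = dir.path().join("example.csv");
        let mut file = File::create(&path)?;
        file.write_all(b"id,vote\na1,3\na2,2\n")?;
        // ... read from `path` here, while `dir` is still in scope ...
        println!("wrote {}", path.display());
        Ok(())
    }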
11 changes: 6 additions & 5 deletions datafusion-examples/examples/dataframe_subquery.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use arrow_schema::DataType;
use std::sync::Arc;

use datafusion::error::Result;
@@ -38,15 +39,15 @@ async fn main() -> Result<()> {
Ok(())
}

//select c1,c2 from t1 where (select avg(t2.c2) from t2 where t1.c1 = t2.c1)>0 limit 10;
//select c1,c2 from t1 where (select avg(t2.c2) from t2 where t1.c1 = t2.c1)>0 limit 3;
async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> {
ctx.table("t1")
.await?
.filter(
scalar_subquery(Arc::new(
ctx.table("t2")
.await?
.filter(col("t1.c1").eq(col("t2.c1")))?
.filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
.aggregate(vec![], vec![avg(col("t2.c2"))])?
.select(vec![avg(col("t2.c2"))])?
.into_unoptimized_plan(),
@@ -60,7 +61,7 @@ async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> {
Ok(())
}

//SELECT t1.c1, t1.c2 FROM t1 WHERE t1.c2 in (select max(t2.c2) from t2 where t2.c1 > 0 ) limit 10
//SELECT t1.c1, t1.c2 FROM t1 WHERE t1.c2 in (select max(t2.c2) from t2 where t2.c1 > 0 ) limit 3;
async fn where_in_subquery(ctx: &SessionContext) -> Result<()> {
ctx.table("t1")
.await?
@@ -82,14 +83,14 @@ async fn where_in_subquery(ctx: &SessionContext) -> Result<()> {
Ok(())
}

//SELECT t1.c1, t1.c2 FROM t1 WHERE EXISTS (select t2.c2 from t2 where t1.c1 = t2.c1) limit 10
//SELECT t1.c1, t1.c2 FROM t1 WHERE EXISTS (select t2.c2 from t2 where t1.c1 = t2.c1) limit 3;
async fn where_exist_subquery(ctx: &SessionContext) -> Result<()> {
ctx.table("t1")
.await?
.filter(exists(Arc::new(
ctx.table("t2")
.await?
.filter(col("t1.c1").eq(col("t2.c1")))?
.filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
.select(vec![col("t2.c2")])?
.into_unoptimized_plan(),
)))?
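The substantive change in this example is replacing col("t1.c1") with out_ref_col(DataType::Utf8, "t1.c1") inside the subqueries: t1.c1 belongs to the outer query, not to t2, so it has to be built as an outer-reference column (carrying its data type) for the correlated filter to resolve and decorrelate correctly. A trimmed sketch of the pattern, assuming tables t1 and t2 (with Utf8 columns c1) are already registered on the context:

    use std::sync::Arc;

    use arrow_schema::DataType;
    use datafusion::error::Result;
    use datafusion::prelude::*;

    async fn correlated_exists(ctx: &SessionContext) -> Result<DataFrame> {
        let subquery = ctx
            .table("t2")
            .await?
            // `t1.c1` is not in t2's schema, so mark it as an outer reference
            .filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
            .select(vec![col("t2.c2")])?
            .into_unoptimized_plan();
        ctx.table("t1").await?.filter(exists(Arc::new(subquery)))
    }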
2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
@@ -46,7 +46,7 @@ arrow-buffer = { workspace = true }
arrow-schema = { workspace = true }
chrono = { workspace = true }
half = { version = "2.1", default-features = false }
num_cpus = "1.13.0"
num_cpus = { workspace = true }
object_store = { version = "0.7.0", default-features = false, optional = true }
parquet = { workspace = true, optional = true }
pyo3 = { version = "0.20.0", optional = true }
2 changes: 1 addition & 1 deletion datafusion/common/src/error.rs
@@ -369,7 +369,7 @@ impl From<DataFusionError> for io::Error {
}

impl DataFusionError {
const BACK_TRACE_SEP: &str = "\n\nbacktrace: ";
const BACK_TRACE_SEP: &'static str = "\n\nbacktrace: ";

/// Get deepest underlying [`DataFusionError`]
///
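The one-line change here spells out the 'static lifetime on the associated const. Eliding the lifetime in an associated const (&str rather than &'static str) is deprecated in recent Rust releases, so the explicit form silences the warning without changing behavior: the string literal was 'static all along.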
20 changes: 10 additions & 10 deletions datafusion/common/src/scalar.rs
@@ -30,7 +30,7 @@ use crate::cast::{
};
use crate::error::{DataFusionError, Result, _internal_err, _not_impl_err};
use crate::hash_utils::create_hashes;
use crate::utils::wrap_into_list_array;
use crate::utils::array_into_list_array;
use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::compute::kernels::numeric::*;
use arrow::datatypes::{i256, FieldRef, Fields, SchemaBuilder};
@@ -1667,7 +1667,7 @@ impl ScalarValue {
} else {
Self::iter_to_array(values.iter().cloned()).unwrap()
};
Arc::new(wrap_into_list_array(values))
Arc::new(array_into_list_array(values))
}

/// Converts a scalar value into an array of `size` rows.
@@ -2058,7 +2058,7 @@
let list_array = as_list_array(array);
let nested_array = list_array.value(index);
// Produces a single element `ListArray` with the value at `index`.
let arr = Arc::new(wrap_into_list_array(nested_array));
let arr = Arc::new(array_into_list_array(nested_array));

ScalarValue::List(arr)
}
@@ -2067,7 +2067,7 @@
let list_array = as_fixed_size_list_array(array)?;
let nested_array = list_array.value(index);
// Produces a single element `ListArray` with the value at `index`.
let arr = Arc::new(wrap_into_list_array(nested_array));
let arr = Arc::new(array_into_list_array(nested_array));

ScalarValue::List(arr)
}
@@ -3052,7 +3052,7 @@ mod tests {

let array = ScalarValue::new_list(scalars.as_slice(), &DataType::Utf8);

let expected = wrap_into_list_array(Arc::new(StringArray::from(vec![
let expected = array_into_list_array(Arc::new(StringArray::from(vec![
"rust",
"arrow",
"data-fusion",
@@ -3091,9 +3091,9 @@
#[test]
fn iter_to_array_string_test() {
let arr1 =
wrap_into_list_array(Arc::new(StringArray::from(vec!["foo", "bar", "baz"])));
array_into_list_array(Arc::new(StringArray::from(vec!["foo", "bar", "baz"])));
let arr2 =
wrap_into_list_array(Arc::new(StringArray::from(vec!["rust", "world"])));
array_into_list_array(Arc::new(StringArray::from(vec!["rust", "world"])));

let scalars = vec![
ScalarValue::List(Arc::new(arr1)),
@@ -4335,13 +4335,13 @@
// Define list-of-structs scalars

let nl0_array = ScalarValue::iter_to_array(vec![s0.clone(), s1.clone()]).unwrap();
let nl0 = ScalarValue::List(Arc::new(wrap_into_list_array(nl0_array)));
let nl0 = ScalarValue::List(Arc::new(array_into_list_array(nl0_array)));

let nl1_array = ScalarValue::iter_to_array(vec![s2.clone()]).unwrap();
let nl1 = ScalarValue::List(Arc::new(wrap_into_list_array(nl1_array)));
let nl1 = ScalarValue::List(Arc::new(array_into_list_array(nl1_array)));

let nl2_array = ScalarValue::iter_to_array(vec![s1.clone()]).unwrap();
let nl2 = ScalarValue::List(Arc::new(wrap_into_list_array(nl2_array)));
let nl2 = ScalarValue::List(Arc::new(array_into_list_array(nl2_array)));

// iter_to_array for list-of-struct
let array = ScalarValue::iter_to_array(vec![nl0, nl1, nl2]).unwrap();
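The hunks in this file track a single rename: wrap_into_list_array in datafusion_common::utils is now array_into_list_array. The helper wraps an arbitrary array into a single-element ListArray. A rough standalone sketch of what such a helper does (not the crate's actual implementation):

    use std::sync::Arc;

    use arrow::array::{Array, ArrayRef, Int32Array, ListArray};
    use arrow::buffer::OffsetBuffer;
    use arrow::datatypes::Field;

    /// Wrap `arr` into a one-element ListArray: [a, b, c] becomes [[a, b, c]].
    fn array_into_list_array(arr: ArrayRef) -> ListArray {
        let field = Arc::new(Field::new("item", arr.data_type().clone(), true));
        // A single list entry spanning the entire child array.
        let offsets = OffsetBuffer::from_lengths([arr.len()]);
        ListArray::new(field, offsets, arr, None)
    }

    fn main() {
        let list = array_into_list_array(Arc::new(Int32Array::from(vec![1, 2, 3])));
        assert_eq!(list.len(), 1); // one list element containing three values
    }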