diff --git a/crates/datafusion_ext/src/planner/relation/mod.rs b/crates/datafusion_ext/src/planner/relation/mod.rs
index c6d3954d9..db8ae1ac7 100644
--- a/crates/datafusion_ext/src/planner/relation/mod.rs
+++ b/crates/datafusion_ext/src/planner/relation/mod.rs
@@ -242,6 +242,14 @@ fn infer_func_for_file(path: &str) -> Result<OwnedTableReference> {
         .ok_or_else(|| DataFusionError::Plan(format!("strange file extension: {path}")))?
         .to_lowercase();
 
+    // `Path::new` borrows `path` directly; a temporary `PathBuf` would be
+    // dropped at the end of the statement, leaving `filename` dangling.
+    let filename = Path::new(path)
+        .file_name()
+        .ok_or_else(|| DataFusionError::Plan(format!("no file name provided: {path}")))?
+        .to_str()
+        .ok_or_else(|| DataFusionError::Plan(format!("file name is not valid UTF-8: {path}")))?;
+
     // TODO: We can be a bit more sophisticated here and handle compression
     // schemes as well.
     Ok(match ext.as_str() {
@@ -261,6 +269,10 @@ fn infer_func_for_file(path: &str) -> Result<OwnedTableReference> {
             schema: "public".into(),
             table: "read_bson".into(),
         },
+        "gz" => {
+            // Compressed files: infer the inner format from the full file name.
+            infer_func_from_compressed_file(filename)?
+        }
         ext => {
             return Err(DataFusionError::Plan(format!(
                 "unable to infer how to handle file extension: {ext}"
@@ -268,3 +280,34 @@ fn infer_func_for_file(path: &str) -> Result<OwnedTableReference> {
         }
     })
 }
+
+fn infer_func_from_compressed_file(filename: &str) -> Result<OwnedTableReference> {
+    if filename.ends_with(".json.gz")
+        || filename.ends_with(".jsonl.gz")
+        || filename.ends_with(".ndjson.gz")
+    {
+        Ok(OwnedTableReference::Partial {
+            schema: "public".into(),
+            table: "ndjson_scan".into(),
+        })
+    } else if filename.ends_with(".parquet.gz") {
+        Ok(OwnedTableReference::Partial {
+            schema: "public".into(),
+            table: "parquet_scan".into(),
+        })
+    } else if filename.ends_with(".csv.gz") {
+        Ok(OwnedTableReference::Partial {
+            schema: "public".into(),
+            table: "csv_scan".into(),
+        })
+    } else if filename.ends_with(".bson.gz") {
+        Ok(OwnedTableReference::Partial {
+            schema: "public".into(),
+            table: "read_bson".into(),
+        })
+    } else {
+        Err(DataFusionError::Plan(format!(
+            "unable to infer how to handle compressed file: {filename}"
+        )))
+    }
+}
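
Not part of the PR, but a quick way to pin down the intended mapping: a minimal test sketch against `infer_func_from_compressed_file` (assuming the `public` schema used by the other arms; the test name and cases are illustrative):

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn infers_scan_function_for_gz_files() {
        // Each compressed name should map to the table function for the
        // inner (uncompressed) format.
        let cases = [
            ("data.json.gz", "ndjson_scan"),
            ("data.ndjson.gz", "ndjson_scan"),
            ("data.parquet.gz", "parquet_scan"),
            ("data.csv.gz", "csv_scan"),
            ("data.bson.gz", "read_bson"),
        ];
        for (name, expected) in cases {
            match infer_func_from_compressed_file(name).unwrap() {
                OwnedTableReference::Partial { schema, table } => {
                    assert_eq!(schema.as_ref(), "public");
                    assert_eq!(table.as_ref(), expected);
                }
                other => panic!("unexpected table reference: {other:?}"),
            }
        }
        // A bare .gz with no recognizable inner extension is rejected.
        assert!(infer_func_from_compressed_file("data.gz").is_err());
    }
}
```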
diff --git a/crates/datasources/src/native/access.rs b/crates/datasources/src/native/access.rs
index 4e5a23b8d..c813a4c24 100644
--- a/crates/datasources/src/native/access.rs
+++ b/crates/datasources/src/native/access.rs
@@ -120,6 +120,20 @@ fn arrow_to_delta_safe(arrow_type: &DataType) -> DeltaResult<DeltaField> {
                 metadata: Some(metadata),
             })
         }
+        dtype @ (DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64) => {
+            // The Delta conversion is lossy for unsigned integers, so record
+            // the original arrow type in the field metadata; the read path
+            // below uses it to restore the exact type.
+            let mut metadata = HashMap::new();
+            metadata.insert("arrow_type".to_string(), json!(dtype));
+
+            let delta_type = dtype.try_into()?;
+
+            Ok(DeltaField {
+                data_type: delta_type,
+                metadata: Some(metadata),
+            })
+        }
         other => {
             let delta_type = other.try_into()?;
             Ok(DeltaField {
@@ -130,7 +144,6 @@ fn arrow_to_delta_safe(arrow_type: &DataType) -> DeltaResult<DeltaField> {
         }
     }
 }
-
 impl NativeTableStorage {
     /// Create a native table storage provider from a URL and an object store instance
     /// rooted at that location.
@@ -194,5 +207,4 @@ impl NativeTableStorage {
             .with_table_name(&table.meta.name)
             .with_log_store(delta_store);
-
         for col in &opts.columns {
             let delta_col = arrow_to_delta_safe(&col.arrow_type)?;
@@ -371,5 +383,4 @@ impl TableProvider for NativeTable {
         if let Some(arrow_type) = metadata.get("arrow_type") {
             // this is dumb AF, delta-lake is returning a string of a json object instead of a json object
-
             // any panics here are bugs in writing the metadata in the first place
             let s: String = serde_json::from_str(arrow_type).expect("metadata was not correctly written");
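
The unsigned-integer arm mirrors the arm above it: the exact arrow type is stashed in the field metadata so the read path (the last hunk) can restore it after the lossy Delta conversion. A small sketch of that contract (illustrative, not part of the PR; it assumes `arrow_to_delta_safe` and `DeltaField` are reachable from a test module in `access.rs`):

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion::arrow::datatypes::DataType;
    use serde_json::json;

    #[test]
    fn unsigned_ints_record_their_arrow_type() {
        for dtype in [
            DataType::UInt8,
            DataType::UInt16,
            DataType::UInt32,
            DataType::UInt64,
        ] {
            let field = arrow_to_delta_safe(&dtype).unwrap();
            // The unsigned arm always writes an `arrow_type` metadata entry.
            let metadata = field.metadata.expect("metadata should be present");
            assert_eq!(metadata.get("arrow_type"), Some(&json!(dtype)));
        }
    }
}
```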