Merge branch 'main' into tycho/release-atomically
greyscaled authored Feb 3, 2024
2 parents d54d21e + f8be59e commit 241e569
Showing 6 changed files with 168 additions and 28 deletions.
4 changes: 3 additions & 1 deletion crates/datafusion_ext/src/runtime/table_provider.rs
@@ -75,7 +75,9 @@ impl TableProvider for RuntimeAwareTableProvider {
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
- self.provider.supports_filters_pushdown(filters)
let supports_pushdowns = self.provider.supports_filters_pushdown(filters)?;

Ok(supports_pushdowns)
}

fn statistics(&self) -> Option<Statistics> {
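For context, the pattern in this file is a wrapper provider forwarding the pushdown decision to the provider it wraps. A minimal self-contained sketch of that shape, with simplified stand-in types (`PushDown`, `Provider`, and `Wrapper` are illustrative names, not the actual DataFusion or glaredb definitions):

    #[derive(Clone, Copy, Debug, PartialEq)]
    enum PushDown {
        Exact,       // the source applies the filter completely
        Inexact,     // the source may prune; the engine re-applies the filter
        Unsupported, // the engine applies the filter itself
    }

    trait Provider {
        fn supports_filters_pushdown(&self, filters: &[&str]) -> Result<Vec<PushDown>, String>;
    }

    struct Wrapper<P: Provider> {
        inner: P,
    }

    impl<P: Provider> Provider for Wrapper<P> {
        fn supports_filters_pushdown(&self, filters: &[&str]) -> Result<Vec<PushDown>, String> {
            // Delegate to the wrapped provider; `?` surfaces its error unchanged.
            let supports = self.inner.supports_filters_pushdown(filters)?;
            Ok(supports)
        }
    }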
72 changes: 67 additions & 5 deletions crates/datasources/src/bson/builder.rs
@@ -23,6 +23,7 @@ use datafusion::arrow::array::{
StructArray,
TimestampMicrosecondBuilder,
TimestampMillisecondBuilder,
TimestampNanosecondBuilder,
TimestampSecondBuilder,
};
use datafusion::arrow::datatypes::{DataType, Field, Fields, TimeUnit};
@@ -331,7 +332,10 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->
)
}

- // Datetime (actual timestamps that you'd actually use. in an application )
// Datetime (actual timestamps that you'd actually use in an application)
(RawBsonRef::DateTime(v), DataType::Timestamp(TimeUnit::Second, _)) => {
append_scalar!(TimestampSecondBuilder, col, v.timestamp_millis() / 1000)
}
(RawBsonRef::DateTime(v), DataType::Timestamp(TimeUnit::Millisecond, _)) => {
append_scalar!(TimestampMillisecondBuilder, col, v.timestamp_millis())
}
@@ -342,9 +346,19 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->
v.timestamp_millis() * 1000
)
}
(RawBsonRef::DateTime(v), DataType::Timestamp(TimeUnit::Nanosecond, _)) => {
append_scalar!(
TimestampNanosecondBuilder,
col,
v.timestamp_millis() * 1000 * 1000
)
}
(RawBsonRef::DateTime(v), DataType::Date64) => {
append_scalar!(Date64Builder, col, v.timestamp_millis())
}
(RawBsonRef::DateTime(v), DataType::Date32) => {
append_scalar!(Date32Builder, col, (v.timestamp_millis() / 86_400_000) as i32) // Date32 is days since the Unix epoch, not seconds.
}
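// Conversion summary: a BSON DateTime carries i64 milliseconds since the
// Unix epoch, so: seconds = millis / 1000, microseconds = millis * 1000,
// nanoseconds = millis * 1_000_000, Date64 = millis unchanged, and
// Date32 = whole days = millis / 86_400_000.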

// Document
(RawBsonRef::Document(nested), DataType::Struct(_)) => {
@@ -411,21 +425,56 @@ fn append_null(typ: &DataType, col: &mut dyn ArrayBuilder) -> Result<()> {
.downcast_mut::<Float64Builder>()
.unwrap()
.append_null(),
- &DataType::Timestamp(_, _) => col
&DataType::Timestamp(TimeUnit::Nanosecond, _) => col
.as_any_mut()
.downcast_mut::<TimestampNanosecondBuilder>()
.unwrap()
.append_null(),
&DataType::Timestamp(TimeUnit::Microsecond, _) => col
.as_any_mut()
.downcast_mut::<TimestampMicrosecondBuilder>()
.unwrap()
.append_null(),
&DataType::Timestamp(TimeUnit::Millisecond, _) => col
.as_any_mut()
.downcast_mut::<TimestampMillisecondBuilder>()
.unwrap()
.append_null(),
&DataType::Timestamp(TimeUnit::Second, _) => col
.as_any_mut()
- .downcast_mut::<TimestampMillisecondBuilder>() // TODO: Possibly change to nanosecond.
.downcast_mut::<TimestampSecondBuilder>()
.unwrap()
.append_null(),
&DataType::Date64 => col
.as_any_mut()
.downcast_mut::<Date64Builder>()
.unwrap()
.append_null(),
&DataType::Date32 => col
.as_any_mut()
.downcast_mut::<Date32Builder>()
.unwrap()
.append_null(),
&DataType::Utf8 => col
.as_any_mut()
.downcast_mut::<StringBuilder>()
.unwrap()
.append_null(),
&DataType::LargeUtf8 => col
.as_any_mut()
.downcast_mut::<LargeStringBuilder>()
.unwrap()
.append_null(),
&DataType::Binary => col
.as_any_mut()
.downcast_mut::<BinaryBuilder>()
.unwrap()
.append_null(),
&DataType::LargeBinary => col
.as_any_mut()
.downcast_mut::<LargeBinaryBuilder>()
.unwrap()
.append_null(),
&DataType::Struct(_) => col
.as_any_mut()
.downcast_mut::<RecordStructBuilder>()
@@ -453,11 +502,24 @@ fn column_builders_for_fields(
DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
- DataType::Timestamp(_, _) => {
- Box::new(TimestampMicrosecondBuilder::with_capacity(capacity)) // TODO: Possibly change to nanosecond.
DataType::Timestamp(TimeUnit::Second, _) => {
Box::new(TimestampSecondBuilder::with_capacity(capacity))
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
Box::new(TimestampMicrosecondBuilder::with_capacity(capacity))
}
DataType::Timestamp(TimeUnit::Millisecond, _) => {
Box::new(TimestampMillisecondBuilder::with_capacity(capacity))
}
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
Box::new(TimestampNanosecondBuilder::with_capacity(capacity))
}
DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, 10)), // TODO: Can collect avg when inferring schema.
DataType::LargeUtf8 => Box::new(LargeStringBuilder::with_capacity(capacity, 10)), // TODO: Can collect avg when inferring schema.
DataType::Binary => Box::new(BinaryBuilder::with_capacity(capacity, 10)), // TODO: Can collect avg when inferring schema.
DataType::LargeBinary => Box::new(LargeBinaryBuilder::with_capacity(capacity, 10)), // TODO: Can collect avg when inferring schema.
DataType::Decimal128(_, _) => Box::new(Decimal128Builder::with_capacity(capacity)),
DataType::Struct(fields) => {
let nested = column_builders_for_fields(fields.clone(), capacity)?;
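The unit arithmetic above is easy to spot-check. A hedged, dependency-free sketch (plain Rust; `millis_to_units` and `MILLIS_PER_DAY` are illustrative names, not part of the crate):

    const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000; // 86_400_000

    // Mirror the builder arms: derive each Arrow unit from BSON's millis.
    fn millis_to_units(millis: i64) -> (i64, i64, i64, i32) {
        let secs = millis / 1000;                    // Timestamp(Second)
        let micros = millis * 1000;                  // Timestamp(Microsecond)
        let nanos = millis * 1_000_000;              // Timestamp(Nanosecond)
        let days = (millis / MILLIS_PER_DAY) as i32; // Date32
        (secs, micros, nanos, days)
    }

    fn main() {
        // 2000-01-01T00:01:00Z as milliseconds since the Unix epoch.
        let millis: i64 = 946_684_860_000;
        let (secs, micros, nanos, days) = millis_to_units(millis);
        assert_eq!(secs, 946_684_860);
        assert_eq!(micros, 946_684_860_000_000);
        assert_eq!(nanos, 946_684_860_000_000_000);
        assert_eq!(days, 10_957); // days from 1970-01-01 to 2000-01-01
    }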
44 changes: 25 additions & 19 deletions crates/datasources/src/bson/mod.rs
@@ -4,6 +4,7 @@ pub mod schema;
pub mod stream;
pub mod table;

use bson::DateTime;
use datafusion::arrow::array::cast::as_string_array;
use datafusion::arrow::array::types::{
Date32Type,
@@ -36,7 +37,13 @@ use datafusion::arrow::array::types::{
UInt8Type,
};
use datafusion::arrow::array::{Array, AsArray, StructArray};
- use datafusion::arrow::datatypes::{DataType, Fields, IntervalUnit, TimeUnit};
use datafusion::arrow::datatypes::{
DataType,
Fields,
IntervalUnit,
TimeUnit,
TimestampNanosecondType,
};
use datafusion::arrow::error::ArrowError;

pub struct BsonBatchConverter {
@@ -193,14 +200,16 @@ pub fn array_to_bson(array: &dyn Array) -> Result<Vec<bson::Bson>, ArrowError> {
}))
})
}
- DataType::Date32 => array
- .as_primitive::<Date32Type>()
- .iter()
- .for_each(|val| out.push(bson::Bson::Int32(val.unwrap_or_default()))),
- DataType::Date64 => array
- .as_primitive::<Date64Type>()
- .iter()
- .for_each(|val| out.push(bson::Bson::Int64(val.unwrap_or_default()))),
DataType::Date64 => array.as_primitive::<Date64Type>().iter().for_each(|val| {
out.push(bson::Bson::DateTime(DateTime::from_millis(
val.unwrap_or_default(),
)))
}),
DataType::Date32 => array.as_primitive::<Date32Type>().iter().for_each(|val| {
out.push(bson::Bson::DateTime(DateTime::from_millis(
val.unwrap_or_default() as i64 * 86_400_000, // Date32 is days since the epoch; scale days to millis.
)))
}),
DataType::Interval(IntervalUnit::DayTime) => array
.as_primitive::<IntervalDayTimeType>()
.iter()
@@ -235,11 +244,11 @@ pub fn array_to_bson(array: &dyn Array) -> Result<Vec<bson::Bson>, ArrowError> {
.iter()
.for_each(|val| {
out.push(bson::Bson::DateTime(bson::datetime::DateTime::from_millis(
- val.unwrap_or_default() / 100,
val.unwrap_or_default() / 1000,
)))
}),
DataType::Timestamp(TimeUnit::Nanosecond, _) => array
- .as_primitive::<TimestampMicrosecondType>()
.as_primitive::<TimestampNanosecondType>()
.iter()
.for_each(|val| {
out.push(bson::Bson::DateTime(bson::datetime::DateTime::from_millis(
@@ -254,14 +263,6 @@ pub fn array_to_bson(array: &dyn Array) -> Result<Vec<bson::Bson>, ArrowError> {
.as_primitive::<Time32MillisecondType>()
.iter()
.for_each(|val| out.push(bson::Bson::Int32(val.unwrap_or_default()))),
- DataType::Time32(TimeUnit::Nanosecond)
- | DataType::Time32(TimeUnit::Microsecond)
- | DataType::Time64(TimeUnit::Second)
- | DataType::Time64(TimeUnit::Millisecond) => {
- return Err(ArrowError::CastError(
- "unreasonable time value conversion BSON".to_string(),
- ))
- }
DataType::Time64(TimeUnit::Microsecond) => array
.as_primitive::<Time64MicrosecondType>()
.iter()
Expand All @@ -270,6 +271,11 @@ pub fn array_to_bson(array: &dyn Array) -> Result<Vec<bson::Bson>, ArrowError> {
.as_primitive::<Time64NanosecondType>()
.iter()
.for_each(|val| out.push(bson::Bson::Int64(val.unwrap_or_default()))),
DataType::Time32(_) | DataType::Time64(_) => {
return Err(ArrowError::CastError(
"unreasonable time value conversion BSON".to_string(),
))
}
DataType::Duration(TimeUnit::Second) => array
.as_primitive::<DurationSecondType>()
.iter()
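As a usage sketch of the DateTime mapping in this file (it assumes only the `bson` crate's `DateTime::from_millis`, which the code above already uses; the helper and its string-keyed unit argument are illustrative, not real API):

    use bson::DateTime;

    // Normalize a timestamp in the given unit to BSON's millisecond
    // DateTime, mirroring the match arms above.
    fn timestamp_to_bson(value: i64, unit: &str) -> bson::Bson {
        let millis = match unit {
            "second" => value * 1000,
            "millisecond" => value,
            "microsecond" => value / 1000,
            "nanosecond" => value / 1_000_000,
            _ => unreachable!("sketch covers only timestamp units"),
        };
        bson::Bson::DateTime(DateTime::from_millis(millis))
    }

    fn main() {
        // 946684860000000 µs (2000-01-01T00:01:00Z) truncates to 946684860000 ms.
        println!("{:?}", timestamp_to_bson(946_684_860_000_000, "microsecond"));
    }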
35 changes: 32 additions & 3 deletions crates/datasources/src/object_store/mod.rs
@@ -13,9 +13,10 @@ use datafusion::error::{DataFusionError, Result as DatafusionResult};
use datafusion::execution::context::SessionState;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::execution::runtime_env::RuntimeEnv;
- use datafusion::logical_expr::TableType;
use datafusion::logical_expr::{TableProviderFilterPushDown, TableType};
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::union::UnionExec;
- use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::{project_schema, ExecutionPlan};
use datafusion::prelude::Expr;
use datafusion_ext::metrics::ReadOnlyDataSourceMetricsExecAdapter;
use errors::{ObjectStoreSourceError, Result};
@@ -105,6 +106,16 @@ impl TableProvider for MultiSourceTableProvider {
Ok(Arc::new(UnionExec::new(plans)))
}
}
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> datafusion::common::Result<Vec<TableProviderFilterPushDown>> {
// We just check the first source; all sources are expected to support the same pushdowns.
self.sources
.first()
.unwrap()
.supports_filters_pushdown(filters)
}
}

#[async_trait]
@@ -308,6 +319,13 @@ impl TableProvider for ObjStoreTableProvider {
.buffered(ctx.config_options().execution.meta_fetch_concurrency);
let (files, statistics) = get_statistics_with_limit(files, self.schema(), limit).await?;

// If there are no files, return an empty exec plan.
if files.is_empty() {
let schema = self.schema();
let projected_schema = project_schema(&schema, projection)?;
return Ok(Arc::new(EmptyExec::new(projected_schema)));
}

let config = FileScanConfig {
object_store_url: self.base_url.clone(),
file_schema: self.arrow_schema.clone(),
@@ -331,9 +349,20 @@ impl TableProvider for ObjStoreTableProvider {
.create_physical_plan(ctx, config, filters.as_ref())
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

Ok(Arc::new(ReadOnlyDataSourceMetricsExecAdapter::new(plan)))
}

fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> std::result::Result<Vec<TableProviderFilterPushDown>, datafusion::error::DataFusionError>
{
// TODO: support exact pushdowns based on hive-style partitioning
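// Inexact means the scan may use the predicates to prune (e.g. Parquet
// row-group statistics), while DataFusion still re-applies every filter
// above the scan, so results remain correct.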
filters
.iter()
.map(|_| Ok(TableProviderFilterPushDown::Inexact))
.collect()
}
}

pub fn file_type_from_path(path: &ObjectStorePath) -> Result<FileType> {
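A self-contained sketch of the blanket "Inexact for every filter" answer above (simplified stand-in types, not the DataFusion signatures):

    #[derive(Clone, Copy, Debug, PartialEq)]
    enum PushDown {
        Exact,
        Inexact,
        Unsupported,
    }

    // One answer per incoming filter, all Inexact: the scan may prune, and
    // the engine keeps its own filter above the scan as the source of truth.
    fn supports_filters_pushdown(filters: &[&str]) -> Vec<PushDown> {
        filters.iter().map(|_| PushDown::Inexact).collect()
    }

    fn main() {
        let answers = supports_filters_pushdown(&["country = 'Sweden'", "id > 10"]);
        assert_eq!(answers, vec![PushDown::Inexact, PushDown::Inexact]);
    }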
8 changes: 8 additions & 0 deletions testdata/sqllogictests/functions/parquet_scan.slt
@@ -42,6 +42,14 @@ select count(*) from parquet_scan([
statement error No such file or directory
select * from parquet_scan('./testdata/parquet/userdata1.paruqet');


# filter pushdowns
# Skipping until we have a way to test the outputs of 'explain'. See https://github.com/GlareDB/glaredb/issues/2581
# query I
# select * from (explain select count(*) from parquet_scan('./testdata/parquet/userdata1.parquet') where country = 'Sweden') where plan_type = 'physical_plan' and plan like 'predicate=country@8 = Sweden, pruning_predicate=country_min@0 <= Sweden AND Sweden <= country_max@'
# ----


# Ambiguous name.
# query I
# select count(*)
33 changes: 33 additions & 0 deletions testdata/sqllogictests_object_store/local/bson.slt
@@ -0,0 +1,33 @@
statement ok
COPY (select * from '${PWD}/testdata/parquet/userdata1.parquet') to '${TMP}/userdata1.bson';

query I
SELECT count(*) FROM '${TMP}/userdata1.bson';
----
1000

statement ok
create table timestamps (t timestamp);

statement ok
insert into timestamps values (arrow_cast(946684860000000, 'Timestamp(Microsecond, None)'));

query
select * from timestamps;
----
2000-01-01 00:01:00

statement ok
copy timestamps to '${TMP}/timestamp_test_out.bson';

# BSON's date type is an int64 of milliseconds since the Unix epoch,
# which maps most closely to Arrow's Date64. Since that is BSON's only
# date format (the BSON "timestamp" type is really a MongoDB
# implementation detail, generally regarded as a mistake), all Arrow
# timestamp and date types are upcast or truncated into this type.
# (Time64 and Time32 just go to integers.)
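# For example, the microsecond timestamp inserted above,
# 946684860000000, truncates to the 946684860000 ms value shown below.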

query
select * from '${TMP}/timestamp_test_out.bson';
----
946684860000
