Skip to content

Commit

Permalink
introduce binary_as_string parquet option
Browse files Browse the repository at this point in the history
  • Loading branch information
goldmedal authored and alamb committed Oct 13, 2024
1 parent 711bf93 commit abce0e9
Show file tree
Hide file tree
Showing 16 changed files with 452 additions and 154 deletions.
15 changes: 9 additions & 6 deletions benchmarks/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,15 @@ impl RunOpt {
None => queries.min_query_id()..=queries.max_query_id(),
};

// configure parquet options
let mut config = self.common.config();
config
.options_mut()
.execution
.parquet
.schema_force_view_types = self.common.force_view_types;
{
let parquet_options = &mut config.options_mut().execution.parquet;
parquet_options.schema_force_view_types = self.common.force_view_types;
// The hits_partitioned dataset specifies string columns
// as binary due to how it was written. Force it to strings
parquet_options.binary_as_string = true;
}

let ctx = SessionContext::new_with_config(config);
self.register_hits(&ctx).await?;
Expand Down Expand Up @@ -149,7 +152,7 @@ impl RunOpt {
Ok(())
}

/// Registrs the `hits.parquet` as a table named `hits`
/// Registers the `hits.parquet` as a table named `hits`
async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
let options = Default::default();
let path = self.path.as_os_str().to_str().unwrap();
Expand Down
8 changes: 8 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,14 @@ config_namespace! {
/// and `Binary/BinaryLarge` with `BinaryView`.
pub schema_force_view_types: bool, default = false

/// (reading) If true, parquet reader will read columns of
/// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
///
/// Parquet files generated by some legacy writers do not correctly set
/// the UTF8 flag for strings, causing string columns to be loaded as
/// BLOB instead.
pub binary_as_string: bool, default = false

// The following options affect writing to parquet files
// and map to parquet::file::properties::WriterProperties

Expand Down
3 changes: 3 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ impl ParquetOptions {
maximum_buffered_record_batches_per_stream: _,
bloom_filter_on_read: _, // reads not used for writer props
schema_force_view_types: _,
binary_as_string: _, // not used for writer props
} = self;

let mut builder = WriterProperties::builder()
Expand Down Expand Up @@ -442,6 +443,7 @@ mod tests {
.maximum_buffered_record_batches_per_stream,
bloom_filter_on_read: defaults.bloom_filter_on_read,
schema_force_view_types: defaults.schema_force_view_types,
binary_as_string: defaults.binary_as_string,
}
}

Expand Down Expand Up @@ -543,6 +545,7 @@ mod tests {
.maximum_buffered_record_batches_per_stream,
bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
schema_force_view_types: global_options_defaults.schema_force_view_types,
binary_as_string: global_options_defaults.binary_as_string,
},
column_specific_options,
key_value_metadata,
Expand Down
81 changes: 81 additions & 0 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,87 @@ pub(crate) fn coerce_file_schema_to_view_type(
))
}

/// Transform a schema to force binary types to be strings
pub fn transform_binary_to_string(schema: &Schema) -> Schema {
let transformed_fields: Vec<Arc<Field>> = schema
.fields
.iter()
.map(|field| match field.data_type() {
DataType::Binary => Arc::new(
Field::new(field.name(), DataType::Utf8, field.is_nullable())
.with_metadata(field.metadata().to_owned()),
),
DataType::LargeBinary => Arc::new(
Field::new(field.name(), DataType::LargeUtf8, field.is_nullable())
.with_metadata(field.metadata().to_owned()),
),
_ => field.clone(),
})
.collect();
Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

/// If the table schema uses a string type, coerce the file schema to use a string type.
pub(crate) fn coerce_file_schema_to_string_type(
table_schema: &Schema,
file_schema: &Schema,
) -> Option<Schema> {
let mut transform = false;
let table_fields: HashMap<_, _> = table_schema
.fields
.iter()
.map(|f| (f.name(), f.data_type()))
.collect();
let transformed_fields: Vec<Arc<Field>> = file_schema
.fields
.iter()
.map(
|field| match (table_fields.get(field.name()), field.data_type()) {
(Some(DataType::Utf8), DataType::Binary) => {
transform = true;
Arc::new(Field::new(
field.name(),
DataType::Utf8,
field.is_nullable(),
))
}
(Some(DataType::LargeUtf8), DataType::LargeBinary) => {
transform = true;
Arc::new(Field::new(
field.name(),
DataType::LargeUtf8,
field.is_nullable(),
))
}
// If `schema_force_view_types` is enabled, the actual data could be `Binary` or `LargeBinary`
// because we will first change the table schema for binary-to-string coercion, then apply the
// string-to-view transformation. So we need all binary types to be coerced to `Utf8View` here.
(
Some(DataType::Utf8View),
DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
) => {
transform = true;
Arc::new(Field::new(
field.name(),
DataType::Utf8View,
field.is_nullable(),
))
}
_ => field.clone(),
},
)
.collect();

if !transform {
None
} else {
Some(Schema::new_with_metadata(
transformed_fields,
file_schema.metadata.clone(),
))
}
}

#[cfg(test)]
pub(crate) mod test_util {
use std::ops::Range;
Expand Down
28 changes: 26 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use std::sync::Arc;
use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};
use super::{
coerce_file_schema_to_view_type, transform_schema_to_view, FileFormat,
FileFormatFactory, FilePushdownSupport, FileScanConfig,
coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
transform_binary_to_string, transform_schema_to_view, FileFormat, FileFormatFactory,
FilePushdownSupport, FileScanConfig,
};
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
Expand Down Expand Up @@ -260,6 +261,19 @@ impl ParquetFormat {
self.options.global.schema_force_view_types = use_views;
self
}

/// Return `true` if binary type will be read as string.
pub fn binary_as_string(&self) -> bool {
self.options.global.binary_as_string
}

/// If true, will read binary type as string.
///
/// Refer to [`Self::binary_as_string`].
pub fn with_binary_as_string(mut self, binary_as_string: bool) -> Self {
self.options.global.binary_as_string = binary_as_string;
self
}
}

/// Clears all metadata (Schema level and field level) on an iterator
Expand Down Expand Up @@ -350,6 +364,12 @@ impl FileFormat for ParquetFormat {
Schema::try_merge(schemas)
}?;

let schema = if self.binary_as_string() {
transform_binary_to_string(&schema)
} else {
schema
};

let schema = if self.force_view_types() {
transform_schema_to_view(&schema)
} else {
Expand Down Expand Up @@ -552,6 +572,10 @@ pub fn statistics_from_parquet_meta_calc(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)?;
if let Some(merged) = coerce_file_schema_to_string_type(&table_schema, &file_schema) {
file_schema = merged;
}

if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &file_schema) {
file_schema = merged;
}
Expand Down
23 changes: 16 additions & 7 deletions datafusion/core/src/datasource/physical_plan/parquet/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

//! [`ParquetOpener`] for opening Parquet files

use crate::datasource::file_format::coerce_file_schema_to_view_type;
use crate::datasource::file_format::{
coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
};
use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
use crate::datasource::physical_plan::parquet::{
Expand Down Expand Up @@ -80,7 +82,7 @@ pub(super) struct ParquetOpener {
}

impl FileOpener for ParquetOpener {
fn open(&self, file_meta: FileMeta) -> datafusion_common::Result<FileOpenFuture> {
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let file_range = file_meta.range.clone();
let extensions = file_meta.extensions.clone();
let file_name = file_meta.location().to_string();
Expand Down Expand Up @@ -121,7 +123,14 @@ impl FileOpener for ParquetOpener {
let mut metadata_timer = file_metrics.metadata_load_time.timer();
let metadata =
ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
let mut schema = metadata.schema().clone();
let mut schema = Arc::clone(metadata.schema());

if let Some(merged) =
coerce_file_schema_to_string_type(&table_schema, &schema)
{
schema = Arc::new(merged);
}

// read with view types
if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &schema)
{
Expand All @@ -130,16 +139,16 @@ impl FileOpener for ParquetOpener {

let options = ArrowReaderOptions::new()
.with_page_index(enable_page_index)
.with_schema(schema.clone());
.with_schema(Arc::clone(&schema));
let metadata =
ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?;
ArrowReaderMetadata::try_new(Arc::clone(metadata.metadata()), options)?;

metadata_timer.stop();

let mut builder =
ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);

let file_schema = builder.schema().clone();
let file_schema = Arc::clone(builder.schema());

let (schema_mapping, adapted_projections) =
schema_adapter.map_schema(&file_schema)?;
Expand Down Expand Up @@ -177,7 +186,7 @@ impl FileOpener for ParquetOpener {

// Determine which row groups to actually read. The idea is to skip
// as many row groups as possible based on the metadata and query
let file_metadata = builder.metadata().clone();
let file_metadata = Arc::clone(builder.metadata());
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let rg_metadata = file_metadata.row_groups();
// track which row groups to actually read
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ message ParquetOptions {
bool bloom_filter_on_read = 26; // default = true
bool bloom_filter_on_write = 27; // default = false
bool schema_force_view_types = 28; // default = false
bool binary_as_string = 29; // default = false

oneof metadata_size_hint_opt {
uint64 metadata_size_hint = 4;
Expand Down
11 changes: 6 additions & 5 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
pruning: value.pruning,
skip_metadata: value.skip_metadata,
metadata_size_hint: value
.metadata_size_hint_opt.clone()
.metadata_size_hint_opt
.map(|opt| match opt {
protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => Some(v as usize),
})
Expand Down Expand Up @@ -958,6 +958,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize,
maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
schema_force_view_types: value.schema_force_view_types,
binary_as_string: value.binary_as_string,
})
}
}
Expand All @@ -979,7 +980,7 @@ impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
})
.unwrap_or(None),
max_statistics_size: value
.max_statistics_size_opt.clone()
.max_statistics_size_opt
.map(|opt| match opt {
protobuf::parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v) => Some(v as usize),
})
Expand All @@ -990,18 +991,18 @@ impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
protobuf::parquet_column_options::EncodingOpt::Encoding(v) => Some(v),
})
.unwrap_or(None),
bloom_filter_enabled: value.bloom_filter_enabled_opt.clone().map(|opt| match opt {
bloom_filter_enabled: value.bloom_filter_enabled_opt.map(|opt| match opt {
protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => Some(v),
})
.unwrap_or(None),
bloom_filter_fpp: value
.bloom_filter_fpp_opt.clone()
.bloom_filter_fpp_opt
.map(|opt| match opt {
protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
})
.unwrap_or(None),
bloom_filter_ndv: value
.bloom_filter_ndv_opt.clone()
.bloom_filter_ndv_opt
.map(|opt| match opt {
protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
})
Expand Down
Loading

0 comments on commit abce0e9

Please sign in to comment.