Parse arrow and parquet schema from PG table's tuple descriptor #24

Closed
wants to merge 1 commit into from
1 change: 1 addition & 0 deletions src/arrow_parquet.rs
@@ -0,0 +1 @@
pub(crate) mod schema_visitor;
350 changes: 350 additions & 0 deletions src/arrow_parquet/schema_visitor.rs
@@ -0,0 +1,350 @@
use std::{collections::HashMap, ops::Deref, sync::Arc};
Collaborator
would be good to add a high-level description at the top of the file of what it does. schema_visitor is vague
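For instance, a module-level doc comment along these lines might do (a sketch; the wording is only a suggestion):

```rust
//! Builds an Arrow `Schema` (and, via it, a Parquet schema) from a PostgreSQL
//! table's tuple descriptor, recursively mapping composite, array, crunchy_map
//! and primitive Postgres types to Arrow data types and attaching a Parquet
//! field id to every field's metadata.
```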


use arrow::datatypes::{Field, Fields, Schema};
use arrow_schema::FieldRef;
use parquet::arrow::{arrow_to_parquet_schema, PARQUET_FIELD_ID_META_KEY};
use pg_sys::{
    Oid, BOOLOID, BYTEAOID, CHAROID, DATEOID, FLOAT4OID, FLOAT8OID, INT2OID, INT4OID, INT8OID,
    NUMERICOID, OIDOID, TEXTOID, TIMEOID, TIMESTAMPOID, TIMESTAMPTZOID, TIMETZOID,
};
use pgrx::{prelude::*, PgTupleDesc};

use crate::{
    pgrx_utils::{
        array_element_typoid, collect_valid_attributes, domain_array_base_elem_typoid,
        is_array_type, is_composite_type, tuple_desc,
    },
    type_compat::{
        geometry::is_postgis_geometry_type,
        map::is_crunchy_map_type,
        pg_arrow_type_conversions::{
            extract_precision_from_numeric_typmod, extract_scale_from_numeric_typmod,
            MAX_DECIMAL_PRECISION,
        },
    },
};

pub(crate) fn parquet_schema_string_from_tupledesc(tupledesc: &PgTupleDesc) -> String {
    let arrow_schema = parse_arrow_schema_from_tupledesc(tupledesc);
    let parquet_schema = arrow_to_parquet_schema(&arrow_schema)
        .unwrap_or_else(|e| panic!("failed to convert arrow schema to parquet schema: {}", e));

    let mut buf = Vec::new();
    parquet::schema::printer::print_schema(&mut buf, &parquet_schema.root_schema_ptr());
    String::from_utf8(buf).unwrap_or_else(|e| panic!("failed to convert schema to string: {}", e))
}

pub(crate) fn parse_arrow_schema_from_tupledesc(tupledesc: &PgTupleDesc) -> Schema {
    let mut field_id = 0;

    let mut struct_attribute_fields = vec![];

    let include_generated_columns = true;
    let attributes = collect_valid_attributes(tupledesc, include_generated_columns);

    for attribute in attributes {
        let attribute_name = attribute.name();
        let attribute_typoid = attribute.type_oid().value();
        let attribute_typmod = attribute.type_mod();

        let field = if is_composite_type(attribute_typoid) {
            let attribute_tupledesc = tuple_desc(attribute_typoid, attribute_typmod);
            parse_struct_schema(attribute_tupledesc, attribute_name, &mut field_id)
        } else if is_crunchy_map_type(attribute_typoid) {
            let attribute_base_elem_typoid = domain_array_base_elem_typoid(attribute_typoid);
            parse_map_schema(
                attribute_base_elem_typoid,
                attribute_typmod,
                attribute_name,
                &mut field_id,
            )
        } else if is_array_type(attribute_typoid) {
            let attribute_element_typoid = array_element_typoid(attribute_typoid);
            parse_list_schema(
                attribute_element_typoid,
                attribute_typmod,
                attribute_name,
                &mut field_id,
            )
        } else {
            parse_primitive_schema(
                attribute_typoid,
                attribute_typmod,
                attribute_name,
                &mut field_id,
            )
        };

        struct_attribute_fields.push(field);
    }

    Schema::new(Fields::from(struct_attribute_fields))
}

fn parse_struct_schema(tupledesc: PgTupleDesc, elem_name: &str, field_id: &mut i32) -> Arc<Field> {
    pgrx::pg_sys::check_for_interrupts!();

    let metadata = HashMap::from_iter(vec![(
        PARQUET_FIELD_ID_META_KEY.into(),
        field_id.to_string(),
    )]);

    *field_id += 1;
Collaborator
kind of wondering whether we should be slightly cleverer about field IDs to not instantly renumber everything if a column gets dropped
(but no great ideas there)
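One hypothetical direction (the constant and helper below are made up for illustration, not part of this PR): seed each top-level column's id from its attribute number, which Postgres keeps stable when other columns are dropped, and reserve a fixed id range per column for its nested fields.

```rust
use pgrx::pg_sys::FormData_pg_attribute;

// Hypothetical sketch: ids derived from attnum survive drops of other columns,
// at the cost of capping how many nested fields a single column can use.
const IDS_PER_COLUMN: i32 = 1_000;

fn initial_field_id(attribute: &FormData_pg_attribute) -> i32 {
    // attnum is 1-based and is not reassigned when another column is dropped
    (i32::from(attribute.attnum) - 1) * IDS_PER_COLUMN
}
```

That trades the renumbering problem for a cap on nested fields per column, so it may well not be worth the extra bookkeeping.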


    let mut child_fields: Vec<Arc<Field>> = vec![];

    let include_generated_columns = true;
    let attributes = collect_valid_attributes(&tupledesc, include_generated_columns);

    for attribute in attributes {
        if attribute.is_dropped() {
            continue;
        }

        let attribute_name = attribute.name();
        let attribute_oid = attribute.type_oid().value();
        let attribute_typmod = attribute.type_mod();

        let child_field = if is_composite_type(attribute_oid) {
            let attribute_tupledesc = tuple_desc(attribute_oid, attribute_typmod);
            parse_struct_schema(attribute_tupledesc, attribute_name, field_id)
        } else if is_crunchy_map_type(attribute_oid) {
            let attribute_base_elem_typoid = domain_array_base_elem_typoid(attribute_oid);
            parse_map_schema(
                attribute_base_elem_typoid,
                attribute_typmod,
                attribute_name,
                field_id,
            )
        } else if is_array_type(attribute_oid) {
            let attribute_element_typoid = array_element_typoid(attribute_oid);
            parse_list_schema(
                attribute_element_typoid,
                attribute_typmod,
                attribute_name,
                field_id,
            )
        } else {
            parse_primitive_schema(attribute_oid, attribute_typmod, attribute_name, field_id)
        };

        child_fields.push(child_field);
    }

    Field::new(
        elem_name,
        arrow::datatypes::DataType::Struct(Fields::from(child_fields)),
        true,
    )
    .with_metadata(metadata)
    .into()
}

fn parse_list_schema(typoid: Oid, typmod: i32, array_name: &str, field_id: &mut i32) -> Arc<Field> {
    pgrx::pg_sys::check_for_interrupts!();

    let list_metadata = HashMap::from_iter(vec![(
        PARQUET_FIELD_ID_META_KEY.into(),
        field_id.to_string(),
    )]);

    *field_id += 1;

    let elem_field = if is_composite_type(typoid) {
        let tupledesc = tuple_desc(typoid, typmod);
        parse_struct_schema(tupledesc, array_name, field_id)
    } else if is_crunchy_map_type(typoid) {
        let base_elem_typoid = domain_array_base_elem_typoid(typoid);
        parse_map_schema(base_elem_typoid, typmod, array_name, field_id)
    } else {
        parse_primitive_schema(typoid, typmod, array_name, field_id)
    };

    Field::new(
        array_name,
        arrow::datatypes::DataType::List(elem_field),
        true,
    )
    .with_metadata(list_metadata)
    .into()
}

fn parse_map_schema(typoid: Oid, typmod: i32, map_name: &str, field_id: &mut i32) -> Arc<Field> {
    let map_metadata = HashMap::from_iter(vec![(
        PARQUET_FIELD_ID_META_KEY.into(),
        field_id.to_string(),
    )]);

    *field_id += 1;

    let tupledesc = tuple_desc(typoid, typmod);
    let entries_field = parse_struct_schema(tupledesc, map_name, field_id);
    let entries_field = adjust_map_entries_field(entries_field);

    Field::new(
        map_name,
        arrow::datatypes::DataType::Map(entries_field, false),
        true,
    )
    .with_metadata(map_metadata)
    .into()
}

fn parse_primitive_schema(
    typoid: Oid,
    typmod: i32,
    elem_name: &str,
    field_id: &mut i32,
) -> Arc<Field> {
    pgrx::pg_sys::check_for_interrupts!();

    let primitive_metadata = HashMap::<String, String>::from_iter(vec![(
        PARQUET_FIELD_ID_META_KEY.into(),
        field_id.to_string(),
    )]);

    *field_id += 1;

    let field = match typoid {
        FLOAT4OID => Field::new(elem_name, arrow::datatypes::DataType::Float32, true),
        FLOAT8OID => Field::new(elem_name, arrow::datatypes::DataType::Float64, true),
        BOOLOID => Field::new(elem_name, arrow::datatypes::DataType::Boolean, true),
        INT2OID => Field::new(elem_name, arrow::datatypes::DataType::Int16, true),
        INT4OID => Field::new(elem_name, arrow::datatypes::DataType::Int32, true),
        INT8OID => Field::new(elem_name, arrow::datatypes::DataType::Int64, true),
        NUMERICOID => {
            let precision = extract_precision_from_numeric_typmod(typmod);
            let scale = extract_scale_from_numeric_typmod(typmod);

            if precision > MAX_DECIMAL_PRECISION {
                Field::new(elem_name, arrow::datatypes::DataType::Utf8, true)
            } else {
                Field::new(
                    elem_name,
                    arrow::datatypes::DataType::Decimal128(precision as _, scale as _),
                    true,
                )
            }
        }
        DATEOID => Field::new(elem_name, arrow::datatypes::DataType::Date32, true),
        TIMESTAMPOID => Field::new(
            elem_name,
            arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
            true,
        ),
        TIMESTAMPTZOID => Field::new(
            elem_name,
            arrow::datatypes::DataType::Timestamp(
                arrow::datatypes::TimeUnit::Microsecond,
                Some("+00:00".into()),
            ),
            true,
        ),
        TIMEOID => Field::new(
            elem_name,
            arrow::datatypes::DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
            true,
        ),
        TIMETZOID => Field::new(
            elem_name,
            arrow::datatypes::DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
            true,
        )
        .with_metadata(HashMap::from_iter(vec![(
            "adjusted_to_utc".into(),
            "true".into(),
        )])),
        CHAROID => Field::new(elem_name, arrow::datatypes::DataType::Utf8, true),
        TEXTOID => Field::new(elem_name, arrow::datatypes::DataType::Utf8, true),
        BYTEAOID => Field::new(elem_name, arrow::datatypes::DataType::Binary, true),
        OIDOID => Field::new(elem_name, arrow::datatypes::DataType::UInt32, true),
        _ => {
            if is_postgis_geometry_type(typoid) {
                Field::new(elem_name, arrow::datatypes::DataType::Binary, true)
            } else {
                Field::new(elem_name, arrow::datatypes::DataType::Utf8, true)
            }
        }
Collaborator
maybe add a comment on interval / other things that we'd like to have in this list

    };

    // Combine the field metadata with the field metadata from the schema visitor
    let primitive_metadata = field
        .metadata()
        .iter()
        .chain(primitive_metadata.iter())
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect();

    field.with_metadata(primitive_metadata).into()
}
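Following up on the interval comment above, a hypothetical extra mapping for the Postgres interval type might look like the sketch below. The choice of `IntervalUnit::MonthDayNano` is an assumption, and whether the Parquet writer accepts that Arrow type would need checking.

```rust
use arrow::datatypes::{DataType, Field, IntervalUnit};
use pgrx::pg_sys::{Oid, INTERVALOID};

// Hypothetical sketch (not in this PR): an extra arm that could sit alongside
// the existing mappings in parse_primitive_schema.
fn extra_primitive_field(typoid: Oid, elem_name: &str) -> Option<Field> {
    match typoid {
        INTERVALOID => Some(Field::new(
            elem_name,
            DataType::Interval(IntervalUnit::MonthDayNano),
            true,
        )),
        _ => None,
    }
}
```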

fn adjust_map_entries_field(field: FieldRef) -> FieldRef {
    let name = field.deref().name();
    let data_type = field.deref().data_type();
    let metadata = field.deref().metadata().clone();

    let not_nullable_key_field;
    let nullable_value_field;

    match data_type {
        arrow::datatypes::DataType::Struct(fields) => {
            let key_field = fields.find("key").expect("expected key field").1;
            let value_field = fields.find("val").expect("expected val field").1;

            not_nullable_key_field =
                Field::new(key_field.name(), key_field.data_type().clone(), false)
                    .with_metadata(key_field.metadata().clone());

            nullable_value_field =
                Field::new(value_field.name(), value_field.data_type().clone(), true)
                    .with_metadata(value_field.metadata().clone());
        }
        _ => {
            panic!("expected struct data type for map entries")
        }
    };

    let entries_field = Field::new(
        name,
        arrow::datatypes::DataType::Struct(Fields::from(vec![
            not_nullable_key_field,
            nullable_value_field,
        ])),
        false,
    )
    .with_metadata(metadata);

    Arc::new(entries_field)
}

pub(crate) fn ensure_arrow_schema_match_tupledesc(
    file_schema: Arc<Schema>,
    tupledesc: &PgTupleDesc,
) {
    let table_schema = parse_arrow_schema_from_tupledesc(tupledesc);

    for table_schema_field in table_schema.fields().iter() {
        let table_schema_field_name = table_schema_field.name();
        let table_schema_field_type = table_schema_field.data_type();

        let file_schema_field = file_schema.column_with_name(table_schema_field_name);

        if let Some(file_schema_field) = file_schema_field {
            let file_schema_field_type = file_schema_field.1.data_type();

            if file_schema_field_type != table_schema_field_type {
                panic!(
                    "type mismatch for column \"{}\" between table and parquet file. table expected \"{}\" but file had \"{}\"",
                    table_schema_field_name,
                    table_schema_field_type,
                    file_schema_field_type,
                );
            }
        } else {
            panic!(
                "column \"{}\" is not found in parquet file",
                table_schema_field_name
            );
        }
    }
}
1 change: 1 addition & 0 deletions src/lib.rs
@@ -1,5 +1,6 @@
use pgrx::prelude::*;

mod arrow_parquet;
mod pgrx_utils;
mod type_compat;
