diff --git a/src/query/storages/common/table_meta/src/meta/mod.rs b/src/query/storages/common/table_meta/src/meta/mod.rs
index 165c8bfe758f..3dbf37e855f3 100644
--- a/src/query/storages/common/table_meta/src/meta/mod.rs
+++ b/src/query/storages/common/table_meta/src/meta/mod.rs
@@ -37,6 +37,7 @@ pub use statistics::*;
 // export legacy versioned table meta types locally,
 // currently, used by versioned readers only
 pub(crate) use testing::*;
+pub use utils::is_possible_non_standard_decimal_block;
 pub use utils::parse_storage_prefix;
 pub use utils::trim_vacuum2_object_prefix;
 pub use utils::try_extract_uuid_str_from_path;
diff --git a/src/query/storages/common/table_meta/src/meta/utils.rs b/src/query/storages/common/table_meta/src/meta/utils.rs
index f0d6f6edef11..81929471cbe2 100644
--- a/src/query/storages/common/table_meta/src/meta/utils.rs
+++ b/src/query/storages/common/table_meta/src/meta/utils.rs
@@ -83,7 +83,41 @@ pub fn parse_storage_prefix(options: &BTreeMap<String, String>, table_id: u64) -
 
 #[inline]
 pub fn trim_vacuum2_object_prefix(key: &str) -> &str {
-    key.strip_prefix(VACUUM2_OBJECT_KEY_PREFIX).unwrap_or(key)
+    // If the object key (the file_name/stem part only) starts with a char that is greater than
+    // or equal to VACUUM2_OBJECT_KEY_PREFIX (i.e. char 'g'), strip that leading char off.
+    if key >= VACUUM2_OBJECT_KEY_PREFIX {
+        // Drop exactly one `char` rather than one byte: every non-ASCII key also compares
+        // greater than "g", and slicing it with `&key[1..]` would panic in the middle of a
+        // multi-byte UTF-8 sequence.
+        let mut chars = key.chars();
+        chars.next();
+        chars.as_str()
+    } else {
+        key
+    }
+}
+
+/// Tells whether the block at `block_full_path` may contain non-standard decimals.
+///
+/// Returns `Ok(true)` when the file name of the path sorts before
+/// `VACUUM2_OBJECT_KEY_PREFIX` (i.e. it does not carry the 'g' prefix), and `Ok(false)`
+/// otherwise.
+///
+/// # Errors
+///
+/// Returns `ErrorCode::StorageOther` if the path has no file name component.
+pub fn is_possible_non_standard_decimal_block(block_full_path: &str) -> Result<bool> {
+    let file_name = Path::new(block_full_path)
+        .file_name()
+        .ok_or_else(|| {
+            ErrorCode::StorageOther(format!(
+                "Illegal block path, no file name found: {}",
+                block_full_path
+            ))
+        })?
+        .to_str()
+        .expect("File name of a block full path should always be valid UTF-8");
+    Ok(file_name < VACUUM2_OBJECT_KEY_PREFIX)
 }
 
 // Extracts the UUID part from the object key.
@@ -144,4 +164,26 @@ mod tests {
             assert_eq!(try_extract_uuid_str_from_path(input).unwrap(), expected);
         }
     }
+
+    #[test]
+    fn test_is_possible_non_standard_decimal_block() {
+        let test_cases = vec![
+            (
+                // stem of block path starts with 'g', should not contain non-standard decimals
+                "bucket/root/115/122/_b/g0191114d30fd78b89fae8e5c88327725_v2.parquet",
+                false,
+            ),
+            (
+                "bucket/root/115/122/_b/0191114d30fd78b89fae8e5c88327725_v2.parquet",
+                true,
+            ),
+        ];
+
+        for (input, expected) in test_cases {
+            assert_eq!(
+                is_possible_non_standard_decimal_block(input).unwrap(),
+                expected
+            );
+        }
+    }
 }
diff --git a/src/query/storages/fuse/src/io/locations.rs b/src/query/storages/fuse/src/io/locations.rs
index 234e0ee27dd7..f8dd407653fe 100644
--- a/src/query/storages/fuse/src/io/locations.rs
+++ b/src/query/storages/fuse/src/io/locations.rs
@@ -81,7 +81,7 @@ impl TableMetaLocationGenerator {
     pub fn gen_block_location(&self) -> (Location, Uuid) {
         let part_uuid = Uuid::new_v4();
         let location_path = format!(
-            "{}/{}/{}{}_v{}.parquet",
+            "{}/{}/g{}{}_v{}.parquet",
             &self.prefix,
             FUSE_TBL_BLOCK_PREFIX,
             &self.part_prefix,
diff --git a/src/query/storages/fuse/src/io/read/block/parquet/arrow_parquet.rs b/src/query/storages/fuse/src/io/read/block/parquet/arrow_parquet.rs
new file mode 100644
index 000000000000..d7da6b76e58d
--- /dev/null
+++ b/src/query/storages/fuse/src/io/read/block/parquet/arrow_parquet.rs
@@ -0,0 +1,370 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Read-side fix for old, incompatible parquet files that store decimal256 values as
+//! 32-byte fixed-length byte arrays.
+
+use std::sync::Arc;
+
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+use arrow::datatypes::Schema;
+use arrow::datatypes::TimeUnit;
+use parquet::basic::ConvertedType;
+use parquet::basic::LogicalType;
+use parquet::basic::Repetition;
+use parquet::basic::TimeUnit as ParquetTimeUnit;
+use parquet::basic::Type as PhysicalType;
+use parquet::errors::ParquetError;
+use parquet::errors::Result;
+use parquet::schema::types::SchemaDescriptor;
+use parquet::schema::types::Type;
+
+/// Converts an arrow schema to a parquet schema rooted at "arrow_schema", applying the
+/// decimal256 read fix of [`arrow_to_parquet_type_fix`] to every field.
+pub fn arrow_to_parquet_schema_fix(schema: &Schema) -> Result<SchemaDescriptor> {
+    arrow_to_parquet_schema_with_root_fix(schema, "arrow_schema")
+}
+
+/// Convert arrow schema to parquet schema specifying the name of the root schema element
+pub fn arrow_to_parquet_schema_with_root_fix(
+    schema: &Schema,
+    root: &str,
+) -> Result<SchemaDescriptor> {
+    let fields = schema
+        .fields()
+        .iter()
+        .map(|field| arrow_to_parquet_type_fix(field).map(Arc::new))
+        .collect::<Result<_>>()?;
+    let group = Type::group_type_builder(root).with_fields(fields).build()?;
+    Ok(SchemaDescriptor::new(Arc::new(group)))
+}
+
+/// Computes the byte length used for a `FIXED_LEN_BYTE_ARRAY` decimal of the given
+/// `precision`.
+fn decimal_length_from_precision(precision: u8) -> usize {
+    // digits = floor(log_10(2^(8*n - 1) - 1))
+    // ceil(digits) = log10(2^(8*n - 1) - 1)
+    // 10^ceil(digits) = 2^(8*n - 1) - 1
+    // 10^ceil(digits) + 1 = 2^(8*n - 1)
+    // log2(10^ceil(digits) + 1) = (8*n - 1)
+    // log2(10^ceil(digits) + 1) + 1 = 8*n
+    // (log2(10^ceil(a) + 1) + 1) / 8 = n
+    (((10.0_f64.powi(precision as i32) + 1.0).log2() + 1.0) / 8.0).ceil() as usize
+}
+
+/// Convert an arrow field to a parquet `Type`
+///
+/// Decimals with a precision above 38 are mapped to 32-byte `FIXED_LEN_BYTE_ARRAY`s. Types
+/// that cannot be converted (e.g. `Duration`, `ListView`, `Union`) yield an `Err`.
+pub(crate) fn arrow_to_parquet_type_fix(field: &Field) -> Result<Type> {
+    let name = field.name().as_str();
+    let repetition = if field.is_nullable() {
+        Repetition::OPTIONAL
+    } else {
+        Repetition::REQUIRED
+    };
+    let id = field_id(field);
+    // create type from field
+    match field.data_type() {
+        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
+            .with_logical_type(Some(LogicalType::Unknown))
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
+            .with_logical_type(Some(LogicalType::Integer {
+                bit_width: 8,
+                is_signed: true,
+            }))
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
+            .with_logical_type(Some(LogicalType::Integer {
+                bit_width: 16,
+                is_signed: true,
+            }))
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
+            .with_logical_type(Some(LogicalType::Integer {
+                bit_width: 8,
+                is_signed: false,
+            }))
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
+            .with_logical_type(Some(LogicalType::Integer {
+                bit_width: 16,
+                is_signed: false,
+            }))
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
+            .with_logical_type(Some(LogicalType::Integer {
+                bit_width: 32,
+                is_signed: false,
+            }))
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
+            .with_logical_type(Some(LogicalType::Integer {
+                bit_width: 64,
+                is_signed: false,
+            }))
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
+            .with_repetition(repetition)
+            .with_id(id)
+            .with_logical_type(Some(LogicalType::Float16))
+            .with_length(2)
+            .build(),
+        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        DataType::Timestamp(TimeUnit::Second, _) => {
+            // Cannot represent seconds in LogicalType
+            Type::primitive_type_builder(name, PhysicalType::INT64)
+                .with_repetition(repetition)
+                .with_id(id)
+                .build()
+        }
+        DataType::Timestamp(time_unit, tz) => {
+            Type::primitive_type_builder(name, PhysicalType::INT64)
+                .with_logical_type(Some(LogicalType::Timestamp {
+                    // If timezone set, values are normalized to UTC timezone
+                    is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
+                    unit: match time_unit {
+                        TimeUnit::Second => unreachable!(),
+                        TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
+                        TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
+                        TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
+                    },
+                }))
+                .with_repetition(repetition)
+                .with_id(id)
+                .build()
+        }
+        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
+            .with_logical_type(Some(LogicalType::Date))
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        // date64 is cast to date32 (#1666)
+        DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32)
+            .with_logical_type(Some(LogicalType::Date))
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        DataType::Time32(TimeUnit::Second) => {
+            // Cannot represent seconds in LogicalType
+            Type::primitive_type_builder(name, PhysicalType::INT32)
+                .with_repetition(repetition)
+                .with_id(id)
+                .build()
+        }
+        DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
+            .with_logical_type(Some(LogicalType::Time {
+                is_adjusted_to_u_t_c: false,
+                unit: match unit {
+                    TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
+                    u => unreachable!("Invalid unit for Time32: {:?}", u),
+                },
+            }))
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
+            .with_logical_type(Some(LogicalType::Time {
+                is_adjusted_to_u_t_c: false,
+                unit: match unit {
+                    TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
+                    TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
+                    u => unreachable!("Invalid unit for Time64: {:?}", u),
+                },
+            }))
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        DataType::Duration(_) => Err(ParquetError::ArrowError(
+            "Converting Duration to parquet not supported".to_string(),
+        )),
+        DataType::Interval(_) => {
+            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
+                .with_converted_type(ConvertedType::INTERVAL)
+                .with_repetition(repetition)
+                .with_id(id)
+                .with_length(12)
+                .build()
+        }
+        DataType::Binary | DataType::LargeBinary => {
+            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
+                .with_repetition(repetition)
+                .with_id(id)
+                .build()
+        }
+        DataType::FixedSizeBinary(length) => {
+            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
+                .with_repetition(repetition)
+                .with_id(id)
+                .with_length(*length)
+                .build()
+        }
+        DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
+            // Decimal precision determines the Parquet physical type to use.
+            // Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
+            let (physical_type, length) = if *precision > 1 && *precision <= 9 {
+                (PhysicalType::INT32, -1)
+            } else if *precision <= 18 {
+                (PhysicalType::INT64, -1)
+            } else if *precision > 38 {
+                // For fix read
+                (PhysicalType::FIXED_LEN_BYTE_ARRAY, 32)
+            } else {
+                (
+                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
+                    decimal_length_from_precision(*precision) as i32,
+                )
+            };
+            Type::primitive_type_builder(name, physical_type)
+                .with_repetition(repetition)
+                .with_id(id)
+                .with_length(length)
+                .with_logical_type(Some(LogicalType::Decimal {
+                    scale: *scale as i32,
+                    precision: *precision as i32,
+                }))
+                .with_precision(*precision as i32)
+                .with_scale(*scale as i32)
+                .build()
+        }
+        DataType::Utf8 | DataType::LargeUtf8 => {
+            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
+                .with_logical_type(Some(LogicalType::String))
+                .with_repetition(repetition)
+                .with_id(id)
+                .build()
+        }
+        DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
+            .with_logical_type(Some(LogicalType::String))
+            .with_repetition(repetition)
+            .with_id(id)
+            .build(),
+        DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
+            Type::group_type_builder(name)
+                .with_fields(vec![Arc::new(
+                    Type::group_type_builder("list")
+                        .with_fields(vec![Arc::new(arrow_to_parquet_type_fix(f)?)])
+                        .with_repetition(Repetition::REPEATED)
+                        .build()?,
+                )])
+                .with_logical_type(Some(LogicalType::List))
+                .with_repetition(repetition)
+                .with_id(id)
+                .build()
+        }
+        DataType::ListView(_) | DataType::LargeListView(_) => Err(ParquetError::NYI(
+            "ListView/LargeListView not implemented".to_string(),
+        )),
+        DataType::Struct(fields) => {
+            if fields.is_empty() {
+                return Err(ParquetError::ArrowError(
+                    "Parquet does not support writing empty structs".to_string(),
+                ));
+            }
+            // recursively convert children to types/nodes
+            let fields = fields
+                .iter()
+                .map(|f| arrow_to_parquet_type_fix(f).map(Arc::new))
+                .collect::<Result<_>>()?;
+            Type::group_type_builder(name)
+                .with_fields(fields)
+                .with_repetition(repetition)
+                .with_id(id)
+                .build()
+        }
+        DataType::Map(field, _) => {
+            if let DataType::Struct(struct_fields) = field.data_type() {
+                if struct_fields.len() < 2 {
+                    return Err(ParquetError::ArrowError(
+                        "DataType::Map struct child should contain two fields".to_string(),
+                    ));
+                }
+                Type::group_type_builder(name)
+                    .with_fields(vec![Arc::new(
+                        Type::group_type_builder(field.name())
+                            .with_fields(vec![
+                                Arc::new(arrow_to_parquet_type_fix(&struct_fields[0])?),
+                                Arc::new(arrow_to_parquet_type_fix(&struct_fields[1])?),
+                            ])
+                            .with_repetition(Repetition::REPEATED)
+                            .build()?,
+                    )])
+                    .with_logical_type(Some(LogicalType::Map))
+                    .with_repetition(repetition)
+                    .with_id(id)
+                    .build()
+            } else {
+                Err(ParquetError::ArrowError(
+                    "DataType::Map should contain a struct field child".to_string(),
+                ))
+            }
+        }
+        DataType::Union(_, _) => Err(ParquetError::NYI("See ARROW-8817.".to_string())),
+        DataType::Dictionary(_, ref value) => {
+            // Dictionary encoding not handled at the schema level
+            let dict_field = field.clone().with_data_type(value.as_ref().clone());
+            arrow_to_parquet_type_fix(&dict_field)
+        }
+        DataType::RunEndEncoded(_, _) => Err(ParquetError::ArrowError(
+            "Converting RunEndEncodedType to parquet not supported".to_string(),
+        )),
+    }
+}
+
+/// Reads the parquet field id from the field metadata, if present and a valid integer.
+fn field_id(field: &Field) -> Option<i32> {
+    let value = field
+        .metadata()
+        .get(parquet::arrow::PARQUET_FIELD_ID_META_KEY)?;
+    value.parse().ok() // Fail quietly if not a valid integer
+}
diff --git a/src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs b/src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs
index 8f4495402f39..56d36a8e16b9 100644
--- a/src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs
+++ b/src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs
@@ -16,8 +16,11 @@ use std::collections::HashMap;
 
 use arrow_array::RecordBatch;
 use databend_common_expression::converts::arrow::table_schema_to_arrow_schema;
+use databend_common_expression::types::DecimalDataType;
 use databend_common_expression::ColumnId;
+use databend_common_expression::TableDataType;
 use databend_common_expression::TableSchema;
+use databend_storages_common_table_meta::meta::is_possible_non_standard_decimal_block;
 use databend_storages_common_table_meta::meta::Compression;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
 use parquet::arrow::arrow_to_parquet_schema;
@@ -27,6 +30,7 @@ use parquet::basic::Compression as ParquetCompression;
 
 use crate::io::read::block::block_reader_merge_io::DataItem;
 use crate::io::read::block::parquet::adapter::RowGroupImplBuilder;
+use crate::io::read::block::parquet::arrow_parquet::arrow_to_parquet_schema_fix;
 
 /// The returned record batch contains all deserialized columns in the same nested structure as the original schema.
 pub fn column_chunks_to_record_batch(
@@ -34,9 +38,27 @@ pub fn column_chunks_to_record_batch(
     num_rows: usize,
     column_chunks: &HashMap<ColumnId, DataItem>,
     compression: &Compression,
+    block_path: &str,
 ) -> databend_common_exception::Result<RecordBatch> {
+    // The legacy schema is only needed for blocks whose file name lacks the 'g' prefix and
+    // whose schema has a Decimal256 column.
+    // NOTE(review): only top-level fields are inspected; confirm whether Decimal256 nested
+    // in tuples/arrays/maps should enable the fix as well.
+    let use_v1 = is_possible_non_standard_decimal_block(block_path)?
+        && original_schema.fields.iter().any(|f| {
+            matches!(
+                f.data_type.remove_nullable(),
+                TableDataType::Decimal(DecimalDataType::Decimal256(_))
+            )
+        });
+    log::debug!("deserialize {block_path} with use_v1: {use_v1}");
     let arrow_schema = table_schema_to_arrow_schema(original_schema);
-    let parquet_schema = arrow_to_parquet_schema(&arrow_schema)?;
+    let parquet_schema = if !use_v1 {
+        arrow_to_parquet_schema(&arrow_schema)?
+    } else {
+        arrow_to_parquet_schema_fix(&arrow_schema)?
+    };
+
     let column_id_to_dfs_id = original_schema
         .to_leaf_column_ids()
         .iter()
diff --git a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs
index c4b212b4e0f4..b42c696a614e 100644
--- a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs
+++ b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs
@@ -33,6 +33,7 @@ use databend_storages_common_table_meta::meta::ColumnMeta;
 use databend_storages_common_table_meta::meta::Compression;
 
 mod adapter;
+mod arrow_parquet;
 mod deserialize;
 
 pub use adapter::RowGroupImplBuilder;
@@ -58,6 +59,7 @@ impl BlockReader {
             num_rows,
             &column_chunks,
             compression,
+            block_path,
         )?;
         let mut columns = Vec::with_capacity(self.projected_schema.fields.len());
         let name_paths = column_name_paths(&self.projection, &self.original_schema);
diff --git a/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs b/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs
index 34392346ca00..7c6bd1fae3fa 100644
--- a/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs
+++ b/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs
@@ -200,6 +200,10 @@ impl VirtualColumnReader {
                     part.nums_rows,
                     &columns_chunks,
                     &part.compression,
+                    // Placeholder block path: "gx" does not sort before the 'g' prefix, so the
+                    // legacy decimal256 read fix stays disabled here.
+                    // NOTE(review): presumably virtual columns never hold such decimals; confirm.
+                    "gx",
                 )
             })
             .transpose()?;