From 868f446844dd9325164be4c7ca6168ccc0e12604 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 28 Nov 2024 13:12:35 -0800 Subject: [PATCH 01/16] Initial schema compat --- kernel/src/table_changes/mod.rs | 1 + kernel/src/table_changes/schema_compat.rs | 144 ++++++++++++++++++++++ 2 files changed, 145 insertions(+) create mode 100644 kernel/src/table_changes/schema_compat.rs diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index a855668d8..96ecaf648 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -52,6 +52,7 @@ mod physical_to_logical; mod resolve_dvs; pub mod scan; mod scan_file; +mod schema_compat; static CHANGE_TYPE_COL_NAME: &str = "_change_type"; static COMMIT_VERSION_COL_NAME: &str = "_commit_version"; diff --git a/kernel/src/table_changes/schema_compat.rs b/kernel/src/table_changes/schema_compat.rs new file mode 100644 index 000000000..b0b2ff08d --- /dev/null +++ b/kernel/src/table_changes/schema_compat.rs @@ -0,0 +1,144 @@ +use std::collections::{HashMap, HashSet}; + +use crate::schema::{DataType, Schema, StructField, StructType}; + +fn is_schema_compatible(existing: &Schema, new: &Schema) -> bool { + for field in later.fields() {} + todo!() +} +fn is_nullability_compatible(existing_nullable: bool, new_nullable: bool) -> bool { + // The case to avoid is when the new is non-nullable, but the existing one is. 
+ // Hence !(!new_nullable && existing_nullable) + // == new_nullable || !existing_nullable + new_nullable || !existing_nullable +} +fn is_struct_read_compatible(existing: &StructType, newtype: &StructType) -> bool { + // Delta tables do not allow fields that differ in name only by case + let existing_fields: HashMap<&String, &StructField> = existing + .fields() + .map(|field| (field.name(), field)) + .collect(); + + let existing_names: HashSet = existing + .fields() + .map(|field| field.name().clone()) + .collect(); + let new_names: HashSet = newtype.fields().map(|field| field.name().clone()).collect(); + if new_names.is_subset(&existing_names) { + return false; + } + newtype.fields().all(|new_field| { + existing_fields + .get(new_field.name()) + .into_iter() + .all(|existing_field| { + existing_field.name() == new_field.name() + && is_nullability_compatible(existing_field.nullable, new_field.nullable) + && is_datatype_read_compatible( + existing_field.data_type(), + new_field.data_type(), + ) + }) + }); + // + // new_fields{ newField => + // // new fields are fine, they just won't be returned + // existingFields.get(newField.name).forall { existingField => + // // we know the name matches modulo case - now verify exact match + // (existingField.name == newField.name + // // if existing value is non-nullable, so should be the new value + // && isNullabilityCompatible(existingField.nullable, newField.nullable) + // // and the type of the field must be compatible, too + // && isDatatypeReadCompatible(existingField.dataType, newField.dataType)) + // } + // } + false +} +fn is_datatype_read_compatible(existing: &DataType, newtype: &DataType) -> bool { + match (existing, newtype) { + // TODO: Add support for type widening + (DataType::Array(a), DataType::Array(b)) => { + is_datatype_read_compatible(a.element_type(), b.element_type()) + && is_nullability_compatible(a.contains_null(), b.contains_null()) + } + (DataType::Struct(a), DataType::Struct(b)) => 
is_struct_read_compatible(a, b), + (DataType::Map(a), DataType::Map(b)) => { + is_nullability_compatible(a.value_contains_null(), b.value_contains_null()) + && is_datatype_read_compatible(a.key_type(), b.key_type()) + && is_datatype_read_compatible(a.value_type(), b.value_type()) + } + (a, b) => a == b, + } +} + +//def isReadCompatible( +// existingSchema: StructType, +// readSchema: StructType, +// forbidTightenNullability: Boolean = false, +// allowMissingColumns: Boolean = false, +// allowTypeWidening: Boolean = false, +// newPartitionColumns: Seq[String] = Seq.empty, +// oldPartitionColumns: Seq[String] = Seq.empty): Boolean = { +// +// def isNullabilityCompatible(existingNullable: Boolean, readNullable: Boolean): Boolean = { +// if (forbidTightenNullability) { +// readNullable || !existingNullable +// } else { +// existingNullable || !readNullable +// } +// } +// +// def isDatatypeReadCompatible(existing: DataType, newtype: DataType): Boolean = { +// (existing, newtype) match { +// case (e: StructType, n: StructType) => +// isReadCompatible(e, n, forbidTightenNullability, allowTypeWidening = allowTypeWidening) +// case (e: ArrayType, n: ArrayType) => +// // if existing elements are non-nullable, so should be the new element +// isNullabilityCompatible(e.containsNull, n.containsNull) && +// isDatatypeReadCompatible(e.elementType, n.elementType) +// case (e: MapType, n: MapType) => +// // if existing value is non-nullable, so should be the new value +// isNullabilityCompatible(e.valueContainsNull, n.valueContainsNull) && +// isDatatypeReadCompatible(e.keyType, n.keyType) && +// isDatatypeReadCompatible(e.valueType, n.valueType) +// case (e: AtomicType, n: AtomicType) if allowTypeWidening => +// TypeWidening.isTypeChangeSupportedForSchemaEvolution(e, n) +// case (a, b) => a == b +// } +// } +// +// def isStructReadCompatible(existing: StructType, newtype: StructType): Boolean = { +// val existingFields = toFieldMap(existing) +// // scalastyle:off caselocale +// 
val existingFieldNames = existing.fieldNames.map(_.toLowerCase).toSet +// assert(existingFieldNames.size == existing.length, +// "Delta tables don't allow field names that only differ by case") +// val newFields = newtype.fieldNames.map(_.toLowerCase).toSet +// assert(newFields.size == newtype.length, +// "Delta tables don't allow field names that only differ by case") +// // scalastyle:on caselocale +// +// if (!allowMissingColumns && +// !(existingFieldNames.subsetOf(newFields) && +// isPartitionCompatible(newPartitionColumns, oldPartitionColumns))) { +// // Dropped a column that was present in the DataFrame schema +// return false +// } +// newtype.forall { newField => +// // new fields are fine, they just won't be returned +// existingFields.get(newField.name).forall { existingField => +// // we know the name matches modulo case - now verify exact match +// (existingField.name == newField.name +// // if existing value is non-nullable, so should be the new value +// && isNullabilityCompatible(existingField.nullable, newField.nullable) +// // and the type of the field must be compatible, too +// && isDatatypeReadCompatible(existingField.dataType, newField.dataType)) +// } +// } +// } +// +// isStructReadCompatible(existingSchema, readSchema) +//} +// +#[cfg(test)] +mod tests {} From 277361eb05c044efa826cc3e48fb8ae1ea1404d3 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 28 Nov 2024 16:24:41 -0800 Subject: [PATCH 02/16] Add schema compatibility tests --- kernel/src/table_changes/schema_compat.rs | 378 +++++++++++++++------- 1 file changed, 268 insertions(+), 110 deletions(-) diff --git a/kernel/src/table_changes/schema_compat.rs b/kernel/src/table_changes/schema_compat.rs index b0b2ff08d..d2593a985 100644 --- a/kernel/src/table_changes/schema_compat.rs +++ b/kernel/src/table_changes/schema_compat.rs @@ -2,17 +2,14 @@ use std::collections::{HashMap, HashSet}; use crate::schema::{DataType, Schema, StructField, StructType}; -fn is_schema_compatible(existing: 
&Schema, new: &Schema) -> bool { - for field in later.fields() {} - todo!() +fn is_nullability_compatible(existing_nullable: bool, read_nullable: bool) -> bool { + // The case to avoid is when the read_schema is non-nullable and the existing one is nullable. + // So we avoid the case where !read_nullable && existing_nullable + // Hence we check that !(!read_nullable && existing_nullable) + // == read_nullable || !existing_nullable + read_nullable || !existing_nullable } -fn is_nullability_compatible(existing_nullable: bool, new_nullable: bool) -> bool { - // The case to avoid is when the new is non-nullable, but the existing one is. - // Hence !(!new_nullable && existing_nullable) - // == new_nullable || !existing_nullable - new_nullable || !existing_nullable -} -fn is_struct_read_compatible(existing: &StructType, newtype: &StructType) -> bool { +fn is_struct_read_compatible(existing: &StructType, read_type: &StructType) -> bool { // Delta tables do not allow fields that differ in name only by case let existing_fields: HashMap<&String, &StructField> = existing .fields() @@ -23,39 +20,34 @@ fn is_struct_read_compatible(existing: &StructType, newtype: &StructType) -> boo .fields() .map(|field| field.name().clone()) .collect(); - let new_names: HashSet = newtype.fields().map(|field| field.name().clone()).collect(); - if new_names.is_subset(&existing_names) { + let read_names: HashSet = read_type + .fields() + .map(|field| field.name().clone()) + .collect(); + if !existing_names.is_subset(&read_names) { return false; } - newtype.fields().all(|new_field| { - existing_fields - .get(new_field.name()) - .into_iter() - .all(|existing_field| { - existing_field.name() == new_field.name() - && is_nullability_compatible(existing_field.nullable, new_field.nullable) - && is_datatype_read_compatible( - existing_field.data_type(), - new_field.data_type(), - ) - }) - }); - // - // new_fields{ newField => - // // new fields are fine, they just won't be returned - // 
existingFields.get(newField.name).forall { existingField => - // // we know the name matches modulo case - now verify exact match - // (existingField.name == newField.name - // // if existing value is non-nullable, so should be the new value - // && isNullabilityCompatible(existingField.nullable, newField.nullable) - // // and the type of the field must be compatible, too - // && isDatatypeReadCompatible(existingField.dataType, newField.dataType)) - // } - // } - false + read_type + .fields() + .all(|read_field| match existing_fields.get(read_field.name()) { + Some(existing_field) => { + let name_equal = existing_field.name() == read_field.name(); + + let nullability_equal = + is_nullability_compatible(existing_field.nullable, read_field.nullable); + let data_type_equal = + is_datatype_read_compatible(existing_field.data_type(), read_field.data_type()); + println!( + "name_equal {} nullability: {}, datatype: {}", + name_equal, nullability_equal, data_type_equal + ); + name_equal && nullability_equal && data_type_equal + } + None => read_field.is_nullable(), + }) } -fn is_datatype_read_compatible(existing: &DataType, newtype: &DataType) -> bool { - match (existing, newtype) { +fn is_datatype_read_compatible(existing: &DataType, read_type: &DataType) -> bool { + match (existing, read_type) { // TODO: Add support for type widening (DataType::Array(a), DataType::Array(b)) => { is_datatype_read_compatible(a.element_type(), b.element_type()) @@ -71,74 +63,240 @@ fn is_datatype_read_compatible(existing: &DataType, newtype: &DataType) -> bool } } -//def isReadCompatible( -// existingSchema: StructType, -// readSchema: StructType, -// forbidTightenNullability: Boolean = false, -// allowMissingColumns: Boolean = false, -// allowTypeWidening: Boolean = false, -// newPartitionColumns: Seq[String] = Seq.empty, -// oldPartitionColumns: Seq[String] = Seq.empty): Boolean = { -// -// def isNullabilityCompatible(existingNullable: Boolean, readNullable: Boolean): Boolean = { -// if 
(forbidTightenNullability) { -// readNullable || !existingNullable -// } else { -// existingNullable || !readNullable -// } -// } -// -// def isDatatypeReadCompatible(existing: DataType, newtype: DataType): Boolean = { -// (existing, newtype) match { -// case (e: StructType, n: StructType) => -// isReadCompatible(e, n, forbidTightenNullability, allowTypeWidening = allowTypeWidening) -// case (e: ArrayType, n: ArrayType) => -// // if existing elements are non-nullable, so should be the new element -// isNullabilityCompatible(e.containsNull, n.containsNull) && -// isDatatypeReadCompatible(e.elementType, n.elementType) -// case (e: MapType, n: MapType) => -// // if existing value is non-nullable, so should be the new value -// isNullabilityCompatible(e.valueContainsNull, n.valueContainsNull) && -// isDatatypeReadCompatible(e.keyType, n.keyType) && -// isDatatypeReadCompatible(e.valueType, n.valueType) -// case (e: AtomicType, n: AtomicType) if allowTypeWidening => -// TypeWidening.isTypeChangeSupportedForSchemaEvolution(e, n) -// case (a, b) => a == b -// } -// } -// -// def isStructReadCompatible(existing: StructType, newtype: StructType): Boolean = { -// val existingFields = toFieldMap(existing) -// // scalastyle:off caselocale -// val existingFieldNames = existing.fieldNames.map(_.toLowerCase).toSet -// assert(existingFieldNames.size == existing.length, -// "Delta tables don't allow field names that only differ by case") -// val newFields = newtype.fieldNames.map(_.toLowerCase).toSet -// assert(newFields.size == newtype.length, -// "Delta tables don't allow field names that only differ by case") -// // scalastyle:on caselocale -// -// if (!allowMissingColumns && -// !(existingFieldNames.subsetOf(newFields) && -// isPartitionCompatible(newPartitionColumns, oldPartitionColumns))) { -// // Dropped a column that was present in the DataFrame schema -// return false -// } -// newtype.forall { newField => -// // new fields are fine, they just won't be returned -// 
existingFields.get(newField.name).forall { existingField => -// // we know the name matches modulo case - now verify exact match -// (existingField.name == newField.name -// // if existing value is non-nullable, so should be the new value -// && isNullabilityCompatible(existingField.nullable, newField.nullable) -// // and the type of the field must be compatible, too -// && isDatatypeReadCompatible(existingField.dataType, newField.dataType)) -// } -// } -// } -// -// isStructReadCompatible(existingSchema, readSchema) -//} -// +fn is_partition_compatible( + existing_partition_cols: &[String], + read_partition_cols: &[String], +) -> bool { + existing_partition_cols == read_partition_cols +} + +fn is_schema_compatible(existing_schema: &Schema, read_schema: &Schema) -> bool { + is_struct_read_compatible(existing_schema, read_schema) +} #[cfg(test)] -mod tests {} +mod tests { + + use crate::schema::{ArrayType, DataType, MapType, StructField, StructType}; + + use super::is_schema_compatible; + + #[test] + fn equal_schema() { + let map_key = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + ]); + let map_value = StructType::new([StructField::new("age", DataType::INTEGER, true)]); + let map_type = MapType::new(map_key, map_value, true); + + let array_type = ArrayType::new(DataType::TIMESTAMP, false); + + let nested_struct = StructType::new([ + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + let schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("map", map_type, false), + StructField::new("array", array_type, false), + StructField::new("nested_struct", nested_struct, false), + ]); + + assert!(is_schema_compatible(&schema, &schema)); + } + + #[test] + fn different_schema_fails() { + let existing_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", 
DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + let read_schema = StructType::new([ + StructField::new("company", DataType::STRING, false), + StructField::new("employee_name", DataType::STRING, false), + StructField::new("salary", DataType::LONG, false), + StructField::new("position_name", DataType::STRING, true), + ]); + assert!(!is_schema_compatible(&existing_schema, &read_schema)); + } + + #[test] + fn map_nullability_and_ok_schema_evolution() { + let existing_map_key = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + ]); + let existing_map_value = + StructType::new([StructField::new("age", DataType::INTEGER, true)]); + let existing_schema = StructType::new([StructField::new( + "map", + MapType::new(existing_map_key, existing_map_value, false), + false, + )]); + + let read_map_key = StructType::new([ + StructField::new("id", DataType::LONG, true), + StructField::new("name", DataType::STRING, true), + StructField::new("location", DataType::STRING, true), + ]); + let read_map_value = StructType::new([ + StructField::new("age", DataType::INTEGER, true), + StructField::new("years_of_experience", DataType::INTEGER, true), + ]); + let read_schema = StructType::new([StructField::new( + "map", + MapType::new(read_map_key, read_map_value, true), + false, + )]); + + assert!(is_schema_compatible(&existing_schema, &read_schema)); + } + #[test] + fn map_value_becomes_non_nullable_fails() { + let map_key = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + ]); + let map_value = StructType::new([StructField::new("age", DataType::INTEGER, true)]); + let existing_schema = StructType::new([StructField::new( + "map", + MapType::new(map_key, map_value, false), + false, + )]); + + let map_key = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, 
false), + ]); + let map_value = StructType::new([StructField::new("age", DataType::INTEGER, false)]); + let read_schema = StructType::new([StructField::new( + "map", + MapType::new(map_key, map_value, false), + false, + )]); + + assert!(!is_schema_compatible(&existing_schema, &read_schema)); + } + #[test] + fn map_schema_new_non_nullable_value_fails() { + let existing_map_key = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + ]); + let existing_map_value = + StructType::new([StructField::new("age", DataType::INTEGER, true)]); + let existing_schema = StructType::new([StructField::new( + "map", + MapType::new(existing_map_key, existing_map_value, false), + false, + )]); + + let read_map_key = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + ]); + let read_map_value = StructType::new([ + StructField::new("age", DataType::INTEGER, true), + StructField::new("years_of_experience", DataType::INTEGER, false), + ]); + let read_schema = StructType::new([StructField::new( + "map", + MapType::new(read_map_key, read_map_value, false), + false, + )]); + + assert!(!is_schema_compatible(&existing_schema, &read_schema)); + } + + #[test] + fn different_field_name_fails() { + let existing_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + let read_schema = StructType::new([ + StructField::new("new_id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + assert!(!is_schema_compatible(&existing_schema, &read_schema)); + } + + #[test] + fn different_type_fails() { + let existing_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + 
StructField::new("age", DataType::INTEGER, true), + ]); + let read_schema = StructType::new([ + StructField::new("id", DataType::INTEGER, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + assert!(!is_schema_compatible(&existing_schema, &read_schema)); + } + #[test] + fn set_nullable_to_true() { + let existing_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + let read_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, true), + StructField::new("age", DataType::INTEGER, true), + ]); + assert!(is_schema_compatible(&existing_schema, &read_schema)); + } + #[test] + fn set_nullable_to_false_fails() { + let existing_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + let read_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, false), + ]); + assert!(!is_schema_compatible(&existing_schema, &read_schema)); + } + #[test] + fn new_nullable_column() { + let existing = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + + let read = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + StructField::new("location", DataType::STRING, true), + ]); + assert!(is_schema_compatible(&existing, &read)); + } + + #[test] + fn new_non_nullable_column_fails() { + let existing = StructType::new([ + 
StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + + let read = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + StructField::new("location", DataType::STRING, false), + ]); + assert!(!is_schema_compatible(&existing, &read)); + } +} From 0d8bdc60caaa253644f2e476771b1519be14f8b6 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 2 Jan 2025 14:44:52 -0800 Subject: [PATCH 03/16] Add error-based schema compatibility check --- kernel/src/lib.rs | 1 + kernel/src/schema_compat.rs | 335 ++++++++++++++++++++++++++++++++++++ 2 files changed, 336 insertions(+) create mode 100644 kernel/src/schema_compat.rs diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 49dceea75..57c88e741 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -87,6 +87,7 @@ pub mod table_properties; pub mod transaction; pub(crate) mod predicates; +pub(crate) mod schema_compat; pub(crate) mod utils; #[cfg(feature = "developer-visibility")] diff --git a/kernel/src/schema_compat.rs b/kernel/src/schema_compat.rs new file mode 100644 index 000000000..33fd8e618 --- /dev/null +++ b/kernel/src/schema_compat.rs @@ -0,0 +1,335 @@ +use std::collections::HashSet; + +use crate::{ + schema::{DataType, StructField, StructType}, + utils::require, + DeltaResult, Error, +}; + +struct NullabilityCheck { + nullable: bool, + read_nullable: bool, +} +impl NullabilityCheck { + fn is_compatible(&self) -> DeltaResult<()> { + // The case to avoid is when the read_schema is non-nullable and the existing one is nullable. 
+ // So we avoid the case where !read_nullable && existing_nullable + // Hence we check that !(!read_nullable && existing_nullable) + // == read_nullable || !existing_nullable + require!( + self.read_nullable || !self.nullable, + Error::generic("Read field is non-nullable while this field is nullable") + ); + Ok(()) + } +} + +impl StructField { + fn can_read_as(&self, read_field: &StructField) -> DeltaResult<()> { + require!( + self.name() == read_field.name(), + Error::generic(format!( + "field names {} and {} are not the same", + self.name(), + read_field.name() + )) + ); + + NullabilityCheck { + nullable: self.nullable, + read_nullable: read_field.nullable, + } + .is_compatible()?; + + self.data_type().can_read_as(read_field.data_type())?; + Ok(()) + } +} +impl StructType { + pub(crate) fn can_read_as(&self, read_type: &StructType) -> DeltaResult<()> { + // Delta tables do not allow fields that differ in name only by case + let names: HashSet<&String> = self.fields.keys().collect(); + let read_names: HashSet<&String> = read_type.fields.keys().collect(); + if !names.is_subset(&read_names) { + return Err(Error::generic( + "Struct has column that does not exist in the read schema", + )); + } + for read_field in read_type.fields() { + match self.fields.get(read_field.name()) { + Some(existing_field) => existing_field.can_read_as(read_field)?, + None => require!( + read_field.is_nullable(), + Error::generic( + "read type has non-nullable column that does not exist in this struct", + ) + ), + } + } + + Ok(()) + } +} + +impl DataType { + fn can_read_as(&self, read_type: &DataType) -> DeltaResult<()> { + match (self, read_type) { + // TODO: Add support for type widening + (DataType::Array(self_array), DataType::Array(read_array)) => { + NullabilityCheck { + nullable: self_array.contains_null(), + read_nullable: read_array.contains_null(), + } + .is_compatible()?; + self_array + .element_type() + .can_read_as(read_array.element_type())?; + } + 
(DataType::Struct(self_struct), DataType::Struct(read_struct)) => { + self_struct.can_read_as(read_struct)? + } + (DataType::Map(self_map), DataType::Map(read_map)) => { + NullabilityCheck { + nullable: self_map.value_contains_null(), + read_nullable: read_map.value_contains_null(), + } + .is_compatible()?; + self_map.key_type().can_read_as(read_map.key_type())?; + self_map.value_type().can_read_as(read_map.value_type())?; + } + (a, b) => { + require!( + a == b, + Error::generic(format!("Types {} and {} are not compatible", a, b)) + ); + } + }; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + + use crate::schema::{ArrayType, DataType, MapType, StructField, StructType}; + + #[test] + fn equal_schema() { + let map_key = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + ]); + let map_value = StructType::new([StructField::new("age", DataType::INTEGER, true)]); + let map_type = MapType::new(map_key, map_value, true); + + let array_type = ArrayType::new(DataType::TIMESTAMP, false); + + let nested_struct = StructType::new([ + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + let schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("map", map_type, false), + StructField::new("array", array_type, false), + StructField::new("nested_struct", nested_struct, false), + ]); + + assert!(schema.can_read_as(&schema).is_ok()); + } + + #[test] + fn different_schema_fails() { + let existing_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + let read_schema = StructType::new([ + StructField::new("company", DataType::STRING, false), + StructField::new("employee_name", DataType::STRING, false), + StructField::new("salary", DataType::LONG, false), + StructField::new("position_name", 
DataType::STRING, true), + ]); + assert!(existing_schema.can_read_as(&read_schema).is_err()); + } + + #[test] + fn map_nullability_and_ok_schema_evolution() { + let existing_map_key = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + ]); + let existing_map_value = + StructType::new([StructField::new("age", DataType::INTEGER, true)]); + let existing_schema = StructType::new([StructField::new( + "map", + MapType::new(existing_map_key, existing_map_value, false), + false, + )]); + + let read_map_key = StructType::new([ + StructField::new("id", DataType::LONG, true), + StructField::new("name", DataType::STRING, true), + StructField::new("location", DataType::STRING, true), + ]); + let read_map_value = StructType::new([ + StructField::new("age", DataType::INTEGER, true), + StructField::new("years_of_experience", DataType::INTEGER, true), + ]); + let read_schema = StructType::new([StructField::new( + "map", + MapType::new(read_map_key, read_map_value, true), + false, + )]); + + assert!(existing_schema.can_read_as(&read_schema).is_ok()); + } + #[test] + fn map_value_becomes_non_nullable_fails() { + let map_key = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + ]); + let map_value = StructType::new([StructField::new("age", DataType::INTEGER, true)]); + let existing_schema = StructType::new([StructField::new( + "map", + MapType::new(map_key, map_value, false), + false, + )]); + + let map_key = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + ]); + let map_value = StructType::new([StructField::new("age", DataType::INTEGER, false)]); + let read_schema = StructType::new([StructField::new( + "map", + MapType::new(map_key, map_value, false), + false, + )]); + + assert!(existing_schema.can_read_as(&read_schema).is_err()); + } + #[test] + fn 
map_schema_new_non_nullable_value_fails() { + let existing_map_key = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + ]); + let existing_map_value = + StructType::new([StructField::new("age", DataType::INTEGER, true)]); + let existing_schema = StructType::new([StructField::new( + "map", + MapType::new(existing_map_key, existing_map_value, false), + false, + )]); + + let read_map_key = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + ]); + let read_map_value = StructType::new([ + StructField::new("age", DataType::INTEGER, true), + StructField::new("years_of_experience", DataType::INTEGER, false), + ]); + let read_schema = StructType::new([StructField::new( + "map", + MapType::new(read_map_key, read_map_value, false), + false, + )]); + + assert!(existing_schema.can_read_as(&read_schema).is_err()); + } + + #[test] + fn different_field_name_fails() { + let existing_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + let read_schema = StructType::new([ + StructField::new("new_id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + assert!(existing_schema.can_read_as(&read_schema).is_err()); + } + + #[test] + fn different_type_fails() { + let existing_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + let read_schema = StructType::new([ + StructField::new("id", DataType::INTEGER, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + assert!(existing_schema.can_read_as(&read_schema).is_err()); + } + #[test] + 
fn set_nullable_to_true() { + let existing_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + let read_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, true), + StructField::new("age", DataType::INTEGER, true), + ]); + assert!(existing_schema.can_read_as(&read_schema).is_ok()); + } + #[test] + fn set_nullable_to_false_fails() { + let existing_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + let read_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, false), + ]); + assert!(existing_schema.can_read_as(&read_schema).is_err()); + } + #[test] + fn new_nullable_column() { + let existing_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + + let read_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + StructField::new("location", DataType::STRING, true), + ]); + assert!(existing_schema.can_read_as(&read_schema).is_ok()); + } + + #[test] + fn new_non_nullable_column_fails() { + let existing_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + + let read_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", 
DataType::INTEGER, true), + StructField::new("location", DataType::STRING, false), + ]); + assert!(existing_schema.can_read_as(&read_schema).is_err()); + } +} From 39e3755ee0587596cc879388345122188dfa8f9c Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Fri, 3 Jan 2025 15:34:06 -0800 Subject: [PATCH 04/16] Remove old schema compat --- kernel/src/schema_compat.rs | 11 +- kernel/src/table_changes/mod.rs | 1 - kernel/src/table_changes/schema_compat.rs | 302 ---------------------- 3 files changed, 5 insertions(+), 309 deletions(-) delete mode 100644 kernel/src/table_changes/schema_compat.rs diff --git a/kernel/src/schema_compat.rs b/kernel/src/schema_compat.rs index 33fd8e618..a2e5079c8 100644 --- a/kernel/src/schema_compat.rs +++ b/kernel/src/schema_compat.rs @@ -1,15 +1,14 @@ use std::collections::HashSet; -use crate::{ - schema::{DataType, StructField, StructType}, - utils::require, - DeltaResult, Error, -}; +use crate::schema::{DataType, StructField, StructType}; +use crate::utils::require; +use crate::{DeltaResult, Error}; struct NullabilityCheck { nullable: bool, read_nullable: bool, } + impl NullabilityCheck { fn is_compatible(&self) -> DeltaResult<()> { // The case to avoid is when the read_schema is non-nullable and the existing one is nullable. 
@@ -46,6 +45,7 @@ impl StructField { } } impl StructType { + #[allow(unused)] pub(crate) fn can_read_as(&self, read_type: &StructType) -> DeltaResult<()> { // Delta tables do not allow fields that differ in name only by case let names: HashSet<&String> = self.fields.keys().collect(); @@ -66,7 +66,6 @@ impl StructType { ), } } - Ok(()) } } diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 96ecaf648..a855668d8 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -52,7 +52,6 @@ mod physical_to_logical; mod resolve_dvs; pub mod scan; mod scan_file; -mod schema_compat; static CHANGE_TYPE_COL_NAME: &str = "_change_type"; static COMMIT_VERSION_COL_NAME: &str = "_commit_version"; diff --git a/kernel/src/table_changes/schema_compat.rs b/kernel/src/table_changes/schema_compat.rs deleted file mode 100644 index d2593a985..000000000 --- a/kernel/src/table_changes/schema_compat.rs +++ /dev/null @@ -1,302 +0,0 @@ -use std::collections::{HashMap, HashSet}; - -use crate::schema::{DataType, Schema, StructField, StructType}; - -fn is_nullability_compatible(existing_nullable: bool, read_nullable: bool) -> bool { - // The case to avoid is when the read_schema is non-nullable and the existing one is nullable. 
- // So we avoid the case where !read_nullable && existing_nullable - // Hence we check that !(!read_nullable && existing_nullable) - // == read_nullable || !existing_nullable - read_nullable || !existing_nullable -} -fn is_struct_read_compatible(existing: &StructType, read_type: &StructType) -> bool { - // Delta tables do not allow fields that differ in name only by case - let existing_fields: HashMap<&String, &StructField> = existing - .fields() - .map(|field| (field.name(), field)) - .collect(); - - let existing_names: HashSet = existing - .fields() - .map(|field| field.name().clone()) - .collect(); - let read_names: HashSet = read_type - .fields() - .map(|field| field.name().clone()) - .collect(); - if !existing_names.is_subset(&read_names) { - return false; - } - read_type - .fields() - .all(|read_field| match existing_fields.get(read_field.name()) { - Some(existing_field) => { - let name_equal = existing_field.name() == read_field.name(); - - let nullability_equal = - is_nullability_compatible(existing_field.nullable, read_field.nullable); - let data_type_equal = - is_datatype_read_compatible(existing_field.data_type(), read_field.data_type()); - println!( - "name_equal {} nullability: {}, datatype: {}", - name_equal, nullability_equal, data_type_equal - ); - name_equal && nullability_equal && data_type_equal - } - None => read_field.is_nullable(), - }) -} -fn is_datatype_read_compatible(existing: &DataType, read_type: &DataType) -> bool { - match (existing, read_type) { - // TODO: Add support for type widening - (DataType::Array(a), DataType::Array(b)) => { - is_datatype_read_compatible(a.element_type(), b.element_type()) - && is_nullability_compatible(a.contains_null(), b.contains_null()) - } - (DataType::Struct(a), DataType::Struct(b)) => is_struct_read_compatible(a, b), - (DataType::Map(a), DataType::Map(b)) => { - is_nullability_compatible(a.value_contains_null(), b.value_contains_null()) - && is_datatype_read_compatible(a.key_type(), b.key_type()) - && 
is_datatype_read_compatible(a.value_type(), b.value_type()) - } - (a, b) => a == b, - } -} - -fn is_partition_compatible( - existing_partition_cols: &[String], - read_partition_cols: &[String], -) -> bool { - existing_partition_cols == read_partition_cols -} - -fn is_schema_compatible(existing_schema: &Schema, read_schema: &Schema) -> bool { - is_struct_read_compatible(existing_schema, read_schema) -} -#[cfg(test)] -mod tests { - - use crate::schema::{ArrayType, DataType, MapType, StructField, StructType}; - - use super::is_schema_compatible; - - #[test] - fn equal_schema() { - let map_key = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - ]); - let map_value = StructType::new([StructField::new("age", DataType::INTEGER, true)]); - let map_type = MapType::new(map_key, map_value, true); - - let array_type = ArrayType::new(DataType::TIMESTAMP, false); - - let nested_struct = StructType::new([ - StructField::new("name", DataType::STRING, false), - StructField::new("age", DataType::INTEGER, true), - ]); - let schema = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("map", map_type, false), - StructField::new("array", array_type, false), - StructField::new("nested_struct", nested_struct, false), - ]); - - assert!(is_schema_compatible(&schema, &schema)); - } - - #[test] - fn different_schema_fails() { - let existing_schema = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - StructField::new("age", DataType::INTEGER, true), - ]); - let read_schema = StructType::new([ - StructField::new("company", DataType::STRING, false), - StructField::new("employee_name", DataType::STRING, false), - StructField::new("salary", DataType::LONG, false), - StructField::new("position_name", DataType::STRING, true), - ]); - assert!(!is_schema_compatible(&existing_schema, &read_schema)); - } - - #[test] - fn 
map_nullability_and_ok_schema_evolution() { - let existing_map_key = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - ]); - let existing_map_value = - StructType::new([StructField::new("age", DataType::INTEGER, true)]); - let existing_schema = StructType::new([StructField::new( - "map", - MapType::new(existing_map_key, existing_map_value, false), - false, - )]); - - let read_map_key = StructType::new([ - StructField::new("id", DataType::LONG, true), - StructField::new("name", DataType::STRING, true), - StructField::new("location", DataType::STRING, true), - ]); - let read_map_value = StructType::new([ - StructField::new("age", DataType::INTEGER, true), - StructField::new("years_of_experience", DataType::INTEGER, true), - ]); - let read_schema = StructType::new([StructField::new( - "map", - MapType::new(read_map_key, read_map_value, true), - false, - )]); - - assert!(is_schema_compatible(&existing_schema, &read_schema)); - } - #[test] - fn map_value_becomes_non_nullable_fails() { - let map_key = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - ]); - let map_value = StructType::new([StructField::new("age", DataType::INTEGER, true)]); - let existing_schema = StructType::new([StructField::new( - "map", - MapType::new(map_key, map_value, false), - false, - )]); - - let map_key = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - ]); - let map_value = StructType::new([StructField::new("age", DataType::INTEGER, false)]); - let read_schema = StructType::new([StructField::new( - "map", - MapType::new(map_key, map_value, false), - false, - )]); - - assert!(!is_schema_compatible(&existing_schema, &read_schema)); - } - #[test] - fn map_schema_new_non_nullable_value_fails() { - let existing_map_key = StructType::new([ - StructField::new("id", DataType::LONG, 
false), - StructField::new("name", DataType::STRING, false), - ]); - let existing_map_value = - StructType::new([StructField::new("age", DataType::INTEGER, true)]); - let existing_schema = StructType::new([StructField::new( - "map", - MapType::new(existing_map_key, existing_map_value, false), - false, - )]); - - let read_map_key = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - ]); - let read_map_value = StructType::new([ - StructField::new("age", DataType::INTEGER, true), - StructField::new("years_of_experience", DataType::INTEGER, false), - ]); - let read_schema = StructType::new([StructField::new( - "map", - MapType::new(read_map_key, read_map_value, false), - false, - )]); - - assert!(!is_schema_compatible(&existing_schema, &read_schema)); - } - - #[test] - fn different_field_name_fails() { - let existing_schema = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - StructField::new("age", DataType::INTEGER, true), - ]); - let read_schema = StructType::new([ - StructField::new("new_id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - StructField::new("age", DataType::INTEGER, true), - ]); - assert!(!is_schema_compatible(&existing_schema, &read_schema)); - } - - #[test] - fn different_type_fails() { - let existing_schema = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - StructField::new("age", DataType::INTEGER, true), - ]); - let read_schema = StructType::new([ - StructField::new("id", DataType::INTEGER, false), - StructField::new("name", DataType::STRING, false), - StructField::new("age", DataType::INTEGER, true), - ]); - assert!(!is_schema_compatible(&existing_schema, &read_schema)); - } - #[test] - fn set_nullable_to_true() { - let existing_schema = StructType::new([ - StructField::new("id", DataType::LONG, false), 
- StructField::new("name", DataType::STRING, false), - StructField::new("age", DataType::INTEGER, true), - ]); - let read_schema = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, true), - StructField::new("age", DataType::INTEGER, true), - ]); - assert!(is_schema_compatible(&existing_schema, &read_schema)); - } - #[test] - fn set_nullable_to_false_fails() { - let existing_schema = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - StructField::new("age", DataType::INTEGER, true), - ]); - let read_schema = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - StructField::new("age", DataType::INTEGER, false), - ]); - assert!(!is_schema_compatible(&existing_schema, &read_schema)); - } - #[test] - fn new_nullable_column() { - let existing = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - StructField::new("age", DataType::INTEGER, true), - ]); - - let read = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - StructField::new("age", DataType::INTEGER, true), - StructField::new("location", DataType::STRING, true), - ]); - assert!(is_schema_compatible(&existing, &read)); - } - - #[test] - fn new_non_nullable_column_fails() { - let existing = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - StructField::new("age", DataType::INTEGER, true), - ]); - - let read = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - StructField::new("age", DataType::INTEGER, true), - StructField::new("location", DataType::STRING, false), - ]); - assert!(!is_schema_compatible(&existing, &read)); - } -} From 
e50a8dd8cbe7bc83242f1ab4ea3ac11a26751683 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 6 Jan 2025 16:46:44 -0800 Subject: [PATCH 05/16] change naming, remove redundant test --- kernel/src/schema_compat.rs | 61 +++++++++---------------------------- 1 file changed, 15 insertions(+), 46 deletions(-) diff --git a/kernel/src/schema_compat.rs b/kernel/src/schema_compat.rs index a2e5079c8..b594216db 100644 --- a/kernel/src/schema_compat.rs +++ b/kernel/src/schema_compat.rs @@ -11,8 +11,8 @@ struct NullabilityCheck { impl NullabilityCheck { fn is_compatible(&self) -> DeltaResult<()> { - // The case to avoid is when the read_schema is non-nullable and the existing one is nullable. - // So we avoid the case where !read_nullable && existing_nullable + // The case to avoid is when the column is nullable, but the read schema specifies the + // column as non-nullable. So we avoid the case where !read_nullable && nullable // Hence we check that !(!read_nullable && existing_nullable) // == read_nullable || !existing_nullable require!( @@ -28,7 +28,7 @@ impl StructField { require!( self.name() == read_field.name(), Error::generic(format!( - "field names {} and {} are not the same", + "Struct field with name {} cannot be read with name {}", self.name(), read_field.name() )) @@ -48,8 +48,9 @@ impl StructType { #[allow(unused)] pub(crate) fn can_read_as(&self, read_type: &StructType) -> DeltaResult<()> { // Delta tables do not allow fields that differ in name only by case - let names: HashSet<&String> = self.fields.keys().collect(); - let read_names: HashSet<&String> = read_type.fields.keys().collect(); + let names: HashSet = self.fields.keys().map(|x| x.to_lowercase()).collect(); + let read_names: HashSet = + read_type.fields.keys().map(|x| x.to_lowercase()).collect(); if !names.is_subset(&read_names) { return Err(Error::generic( "Struct has column that does not exist in the read schema", @@ -73,7 +74,6 @@ impl StructType { impl DataType { fn can_read_as(&self, read_type: 
&DataType) -> DeltaResult<()> { match (self, read_type) { - // TODO: Add support for type widening (DataType::Array(self_array), DataType::Array(read_array)) => { NullabilityCheck { nullable: self_array.contains_null(), @@ -97,6 +97,8 @@ impl DataType { self_map.value_type().can_read_as(read_map.value_type())?; } (a, b) => { + // TODO: In the future, we will change this to support type widening. + // See: https://github.com/delta-io/delta-kernel-rs/issues/623 require!( a == b, Error::generic(format!("Types {} and {} are not compatible", a, b)) @@ -113,16 +115,14 @@ mod tests { use crate::schema::{ArrayType, DataType, MapType, StructField, StructType}; #[test] - fn equal_schema() { + fn can_read_is_reflexive() { let map_key = StructType::new([ StructField::new("id", DataType::LONG, false), StructField::new("name", DataType::STRING, false), ]); let map_value = StructType::new([StructField::new("age", DataType::INTEGER, true)]); let map_type = MapType::new(map_key, map_value, true); - let array_type = ArrayType::new(DataType::TIMESTAMP, false); - let nested_struct = StructType::new([ StructField::new("name", DataType::STRING, false), StructField::new("age", DataType::INTEGER, true), @@ -154,13 +154,13 @@ mod tests { } #[test] - fn map_nullability_and_ok_schema_evolution() { + fn add_nullable_column_to_map_key_and_value() { let existing_map_key = StructType::new([ StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), + StructField::new("name", DataType::STRING, true), ]); let existing_map_value = - StructType::new([StructField::new("age", DataType::INTEGER, true)]); + StructType::new([StructField::new("age", DataType::INTEGER, false)]); let existing_schema = StructType::new([StructField::new( "map", MapType::new(existing_map_key, existing_map_value, false), @@ -168,7 +168,7 @@ mod tests { )]); let read_map_key = StructType::new([ - StructField::new("id", DataType::LONG, true), + StructField::new("id", DataType::LONG, 
false), StructField::new("name", DataType::STRING, true), StructField::new("location", DataType::STRING, true), ]); @@ -178,7 +178,7 @@ mod tests { ]); let read_schema = StructType::new([StructField::new( "map", - MapType::new(read_map_key, read_map_value, true), + MapType::new(read_map_key, read_map_value, false), false, )]); @@ -210,37 +210,6 @@ mod tests { assert!(existing_schema.can_read_as(&read_schema).is_err()); } - #[test] - fn map_schema_new_non_nullable_value_fails() { - let existing_map_key = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - ]); - let existing_map_value = - StructType::new([StructField::new("age", DataType::INTEGER, true)]); - let existing_schema = StructType::new([StructField::new( - "map", - MapType::new(existing_map_key, existing_map_value, false), - false, - )]); - - let read_map_key = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - ]); - let read_map_value = StructType::new([ - StructField::new("age", DataType::INTEGER, true), - StructField::new("years_of_experience", DataType::INTEGER, false), - ]); - let read_schema = StructType::new([StructField::new( - "map", - MapType::new(read_map_key, read_map_value, false), - false, - )]); - - assert!(existing_schema.can_read_as(&read_schema).is_err()); - } - #[test] fn different_field_name_fails() { let existing_schema = StructType::new([ @@ -249,7 +218,7 @@ mod tests { StructField::new("age", DataType::INTEGER, true), ]); let read_schema = StructType::new([ - StructField::new("new_id", DataType::LONG, false), + StructField::new("Id", DataType::LONG, false), StructField::new("name", DataType::STRING, false), StructField::new("age", DataType::INTEGER, true), ]); From cc4f067c926050663ea8f9e8a3dff2feb2293f93 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 6 Jan 2025 18:19:26 -0800 Subject: [PATCH 06/16] Improve comments, add check that 
field names aren't case-insensitive duplicates --- kernel/src/schema_compat.rs | 81 ++++++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 32 deletions(-) diff --git a/kernel/src/schema_compat.rs b/kernel/src/schema_compat.rs index b594216db..1dfd899ee 100644 --- a/kernel/src/schema_compat.rs +++ b/kernel/src/schema_compat.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use crate::schema::{DataType, StructField, StructType}; use crate::utils::require; @@ -47,24 +47,43 @@ impl StructField { impl StructType { #[allow(unused)] pub(crate) fn can_read_as(&self, read_type: &StructType) -> DeltaResult<()> { + let field_map: HashMap = self + .fields + .iter() + .map(|(name, field)| (name.to_lowercase(), field)) + .collect(); // Delta tables do not allow fields that differ in name only by case - let names: HashSet = self.fields.keys().map(|x| x.to_lowercase()).collect(); - let read_names: HashSet = + require!( + field_map.len() == self.fields.len(), + Error::generic("Delta tables don't allow field names that only differ by case") + ); + + let read_field_names: HashSet = read_type.fields.keys().map(|x| x.to_lowercase()).collect(); - if !names.is_subset(&read_names) { + require!( + read_field_names.len() == read_type.fields.len(), + Error::generic("Delta tables don't allow field names that only differ by case") + ); + + // Check that the field names are a subset of the read fields. 
+ if !field_map.keys().all(|name| read_field_names.contains(name)) { return Err(Error::generic( "Struct has column that does not exist in the read schema", )); } for read_field in read_type.fields() { - match self.fields.get(read_field.name()) { + match field_map.get(&read_field.name().to_lowercase()) { Some(existing_field) => existing_field.can_read_as(read_field)?, - None => require!( - read_field.is_nullable(), - Error::generic( - "read type has non-nullable column that does not exist in this struct", - ) - ), + None => { + // Note: Delta spark does not perform the following check. Hence it ignores fields + // that exist in the read schema that aren't in this schema. + require!( + read_field.is_nullable(), + Error::generic( + "read type has non-nullable column that does not exist in this struct", + ) + ); + } } } Ok(()) @@ -111,7 +130,6 @@ impl DataType { #[cfg(test)] mod tests { - use crate::schema::{ArrayType, DataType, MapType, StructField, StructType}; #[test] @@ -136,23 +154,6 @@ mod tests { assert!(schema.can_read_as(&schema).is_ok()); } - - #[test] - fn different_schema_fails() { - let existing_schema = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("name", DataType::STRING, false), - StructField::new("age", DataType::INTEGER, true), - ]); - let read_schema = StructType::new([ - StructField::new("company", DataType::STRING, false), - StructField::new("employee_name", DataType::STRING, false), - StructField::new("salary", DataType::LONG, false), - StructField::new("position_name", DataType::STRING, true), - ]); - assert!(existing_schema.can_read_as(&read_schema).is_err()); - } - #[test] fn add_nullable_column_to_map_key_and_value() { let existing_map_key = StructType::new([ @@ -211,7 +212,8 @@ mod tests { assert!(existing_schema.can_read_as(&read_schema).is_err()); } #[test] - fn different_field_name_fails() { + fn different_field_name_case_fails() { + // names differing only in case are not the same let 
existing_schema = StructType::new([ StructField::new("id", DataType::LONG, false), StructField::new("name", DataType::STRING, false), @@ -224,7 +226,6 @@ mod tests { ]); assert!(existing_schema.can_read_as(&read_schema).is_err()); } - #[test] fn different_type_fails() { let existing_schema = StructType::new([ @@ -283,7 +284,6 @@ mod tests { ]); assert!(existing_schema.can_read_as(&read_schema).is_ok()); } - #[test] fn new_non_nullable_column_fails() { let existing_schema = StructType::new([ @@ -300,4 +300,21 @@ mod tests { ]); assert!(existing_schema.can_read_as(&read_schema).is_err()); } + #[test] + fn duplicate_field_modulo_case() { + let existing_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("Id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + + let read_schema = StructType::new([ + StructField::new("id", DataType::LONG, false), + StructField::new("Id", DataType::LONG, false), + StructField::new("name", DataType::STRING, false), + StructField::new("age", DataType::INTEGER, true), + ]); + assert!(existing_schema.can_read_as(&read_schema).is_err()); + } } From 112ef819bf7fe26db0da7097ac8960d8540f3406 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 6 Jan 2025 18:31:10 -0800 Subject: [PATCH 07/16] Change ordering, remove unnecessary comment --- kernel/src/schema_compat.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/kernel/src/schema_compat.rs b/kernel/src/schema_compat.rs index 1dfd899ee..6b3ffc6b9 100644 --- a/kernel/src/schema_compat.rs +++ b/kernel/src/schema_compat.rs @@ -25,6 +25,11 @@ impl NullabilityCheck { impl StructField { fn can_read_as(&self, read_field: &StructField) -> DeltaResult<()> { + NullabilityCheck { + nullable: self.nullable, + read_nullable: read_field.nullable, + } + .is_compatible()?; require!( self.name() == read_field.name(), Error::generic(format!( @@ -33,13 
+38,6 @@ impl StructField { read_field.name() )) ); - - NullabilityCheck { - nullable: self.nullable, - read_nullable: read_field.nullable, - } - .is_compatible()?; - self.data_type().can_read_as(read_field.data_type())?; Ok(()) } @@ -52,7 +50,6 @@ impl StructType { .iter() .map(|(name, field)| (name.to_lowercase(), field)) .collect(); - // Delta tables do not allow fields that differ in name only by case require!( field_map.len() == self.fields.len(), Error::generic("Delta tables don't allow field names that only differ by case") From 17148fd731b8168d75d6b4c94e7beb96f31ea13a Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 7 Jan 2025 15:33:03 -0800 Subject: [PATCH 08/16] Add comments to can_read_as methods --- kernel/src/schema_compat.rs | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/kernel/src/schema_compat.rs b/kernel/src/schema_compat.rs index 6b3ffc6b9..e77de5e2f 100644 --- a/kernel/src/schema_compat.rs +++ b/kernel/src/schema_compat.rs @@ -1,15 +1,41 @@ +//! Provides utilities to perform comparisons between a [`Schema`]s. The api used to check schema +//! compatibility is [`can_read_as`]. +//! +//! # Examples +//! ```rust, ignore +//! # use delta_kernel::schema::StructType; +//! # use delta_kernel::schema::StructField; +//! # use delta_kernel::schema::DataType; +//! let schema = StructType::new([ +//! StructField::new("id", DataType::LONG, false), +//! StructField::new("value", DataType::STRING, true), +//! ]); +//! let read_schema = StructType::new([ +//! StructField::new("id", DataType::LONG, true), +//! StructField::new("value", DataType::STRING, true), +//! StructField::new("year", DataType::INTEGER, true), +//! ]); +//! assert!(schema.can_read_as(&read_schema).is_ok()); +//! ```` +//! +//! 
[`Schema`]: crate::schema::Schema use std::collections::{HashMap, HashSet}; use crate::schema::{DataType, StructField, StructType}; use crate::utils::require; use crate::{DeltaResult, Error}; +/// Represents a nullability comparison between two schemas' fields. A schema whose field +/// has nullability `nullable` is being read with a schema whose field has nullability +/// `read_nullable`. `is_compatible` is true if the read nullability is the same or wider than +/// the schema's nullability. struct NullabilityCheck { nullable: bool, read_nullable: bool, } impl NullabilityCheck { + /// Returns true if a value with nullability `nullable` can be read using `read_nullable`. fn is_compatible(&self) -> DeltaResult<()> { // The case to avoid is when the column is nullable, but the read schema specifies the // column as non-nullable. So we avoid the case where !read_nullable && nullable @@ -24,6 +50,7 @@ impl NullabilityCheck { } impl StructField { + /// Returns `Ok` if this [`StructField`] can be read as `read_field` in the read schema. fn can_read_as(&self, read_field: &StructField) -> DeltaResult<()> { NullabilityCheck { nullable: self.nullable, @@ -44,6 +71,7 @@ impl StructField { } impl StructType { #[allow(unused)] + /// Returns `Ok` if this [`StructType`] can be read as `read_type` in the read schema. pub(crate) fn can_read_as(&self, read_type: &StructType) -> DeltaResult<()> { let field_map: HashMap = self .fields @@ -88,6 +116,7 @@ impl StructType { } impl DataType { + /// Returns `Ok` if this [`DataType`] can be read as `read_type` in the read schema. 
fn can_read_as(&self, read_type: &DataType) -> DeltaResult<()> { match (self, read_type) { (DataType::Array(self_array), DataType::Array(read_array)) => { From d80b8e0d4e5e9bbeadb7f8fbfd9c226c09b8bfe1 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Wed, 8 Jan 2025 15:21:15 -0800 Subject: [PATCH 09/16] Move to struct/impl based nullability check --- kernel/src/schema_compat.rs | 40 ++++++++++++------------------------- 1 file changed, 13 insertions(+), 27 deletions(-) diff --git a/kernel/src/schema_compat.rs b/kernel/src/schema_compat.rs index e77de5e2f..37c8e8899 100644 --- a/kernel/src/schema_compat.rs +++ b/kernel/src/schema_compat.rs @@ -25,24 +25,20 @@ use crate::schema::{DataType, StructField, StructType}; use crate::utils::require; use crate::{DeltaResult, Error}; -/// Represents a nullability comparison between two schemas' fields. A schema whose field -/// has nullability `nullable` is being read with a schema whose field has nullability -/// `read_nullable`. `is_compatible` is true if the read nullability is the same or wider than -/// the schema's nullability. -struct NullabilityCheck { - nullable: bool, - read_nullable: bool, -} +/// The nullability flag of a schema's field. This can be compared with a read schema field's +/// nullability flag using [`NullabilityFlag::can_read_as`]. +struct NullabilityFlag(bool); -impl NullabilityCheck { - /// Returns true if a value with nullability `nullable` can be read using `read_nullable`. - fn is_compatible(&self) -> DeltaResult<()> { +impl NullabilityFlag { + /// Represents a nullability comparison between two schemas' fields. Returns true if the + /// read nullability is the same or wider than the nullability of self. + fn can_read_as(&self, read_nullable: NullabilityFlag) -> DeltaResult<()> { // The case to avoid is when the column is nullable, but the read schema specifies the // column as non-nullable. 
So we avoid the case where !read_nullable && nullable // Hence we check that !(!read_nullable && existing_nullable) // == read_nullable || !existing_nullable require!( - self.read_nullable || !self.nullable, + read_nullable.0 || !self.0, Error::generic("Read field is non-nullable while this field is nullable") ); Ok(()) @@ -52,11 +48,7 @@ impl NullabilityCheck { impl StructField { /// Returns `Ok` if this [`StructField`] can be read as `read_field` in the read schema. fn can_read_as(&self, read_field: &StructField) -> DeltaResult<()> { - NullabilityCheck { - nullable: self.nullable, - read_nullable: read_field.nullable, - } - .is_compatible()?; + NullabilityFlag(self.nullable).can_read_as(NullabilityFlag(read_field.nullable))?; require!( self.name() == read_field.name(), Error::generic(format!( @@ -120,11 +112,8 @@ impl DataType { fn can_read_as(&self, read_type: &DataType) -> DeltaResult<()> { match (self, read_type) { (DataType::Array(self_array), DataType::Array(read_array)) => { - NullabilityCheck { - nullable: self_array.contains_null(), - read_nullable: read_array.contains_null(), - } - .is_compatible()?; + NullabilityFlag(self_array.contains_null()) + .can_read_as(NullabilityFlag(read_array.contains_null()))?; self_array .element_type() .can_read_as(read_array.element_type())?; @@ -133,11 +122,8 @@ impl DataType { self_struct.can_read_as(read_struct)? 
} (DataType::Map(self_map), DataType::Map(read_map)) => { - NullabilityCheck { - nullable: self_map.value_contains_null(), - read_nullable: read_map.value_contains_null(), - } - .is_compatible()?; + NullabilityFlag(self_map.value_contains_null()) + .can_read_as(NullabilityFlag(read_map.value_contains_null()))?; self_map.key_type().can_read_as(read_map.key_type())?; self_map.value_type().can_read_as(read_map.value_type())?; } From 8510615ec23a7c5e9599cac9598582b3451a3d00 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Wed, 8 Jan 2025 15:44:49 -0800 Subject: [PATCH 10/16] Make StructType::can_read_as developer-visibility pub --- kernel/src/schema_compat.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kernel/src/schema_compat.rs b/kernel/src/schema_compat.rs index 37c8e8899..132f62ae3 100644 --- a/kernel/src/schema_compat.rs +++ b/kernel/src/schema_compat.rs @@ -2,7 +2,7 @@ //! compatibility is [`can_read_as`]. //! //! # Examples -//! ```rust, ignore +//! ```rust //! # use delta_kernel::schema::StructType; //! # use delta_kernel::schema::StructField; //! # use delta_kernel::schema::DataType; @@ -64,6 +64,7 @@ impl StructField { impl StructType { #[allow(unused)] /// Returns `Ok` if this [`StructType`] can be read as `read_type` in the read schema. 
+ #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] pub(crate) fn can_read_as(&self, read_type: &StructType) -> DeltaResult<()> { let field_map: HashMap = self .fields From b8cd7fb117e41fb389f44cc1f2ec53d42729e728 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Wed, 8 Jan 2025 16:05:17 -0800 Subject: [PATCH 11/16] Switch macro order --- kernel/src/schema_compat.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/src/schema_compat.rs b/kernel/src/schema_compat.rs index 132f62ae3..ff95e13af 100644 --- a/kernel/src/schema_compat.rs +++ b/kernel/src/schema_compat.rs @@ -62,8 +62,8 @@ impl StructField { } } impl StructType { - #[allow(unused)] /// Returns `Ok` if this [`StructType`] can be read as `read_type` in the read schema. + #[allow(unused)] #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] pub(crate) fn can_read_as(&self, read_type: &StructType) -> DeltaResult<()> { let field_map: HashMap = self From 5d2bd6095731bf19c5f4b527e8266aeadda88c6e Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 9 Jan 2025 17:52:50 -0800 Subject: [PATCH 12/16] remove imports to make it clear the types that are impl --- kernel/src/schema_compat.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/kernel/src/schema_compat.rs b/kernel/src/schema_compat.rs index ff95e13af..39e464872 100644 --- a/kernel/src/schema_compat.rs +++ b/kernel/src/schema_compat.rs @@ -21,7 +21,6 @@ //! [`Schema`]: crate::schema::Schema use std::collections::{HashMap, HashSet}; -use crate::schema::{DataType, StructField, StructType}; use crate::utils::require; use crate::{DeltaResult, Error}; @@ -45,9 +44,9 @@ impl NullabilityFlag { } } -impl StructField { +impl crate::schema::StructField { /// Returns `Ok` if this [`StructField`] can be read as `read_field` in the read schema. 
- fn can_read_as(&self, read_field: &StructField) -> DeltaResult<()> { + fn can_read_as(&self, read_field: &Self) -> DeltaResult<()> { NullabilityFlag(self.nullable).can_read_as(NullabilityFlag(read_field.nullable))?; require!( self.name() == read_field.name(), @@ -61,12 +60,12 @@ impl StructField { Ok(()) } } -impl StructType { +impl crate::schema::StructType { /// Returns `Ok` if this [`StructType`] can be read as `read_type` in the read schema. #[allow(unused)] #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] - pub(crate) fn can_read_as(&self, read_type: &StructType) -> DeltaResult<()> { - let field_map: HashMap = self + pub(crate) fn can_read_as(&self, read_type: &Self) -> DeltaResult<()> { + let field_map: HashMap = self .fields .iter() .map(|(name, field)| (name.to_lowercase(), field)) @@ -108,21 +107,21 @@ impl StructType { } } -impl DataType { +impl crate::schema::DataType { /// Returns `Ok` if this [`DataType`] can be read as `read_type` in the read schema. - fn can_read_as(&self, read_type: &DataType) -> DeltaResult<()> { + fn can_read_as(&self, read_type: &Self) -> DeltaResult<()> { match (self, read_type) { - (DataType::Array(self_array), DataType::Array(read_array)) => { + (Self::Array(self_array), Self::Array(read_array)) => { NullabilityFlag(self_array.contains_null()) .can_read_as(NullabilityFlag(read_array.contains_null()))?; self_array .element_type() .can_read_as(read_array.element_type())?; } - (DataType::Struct(self_struct), DataType::Struct(read_struct)) => { + (Self::Struct(self_struct), Self::Struct(read_struct)) => { self_struct.can_read_as(read_struct)? 
} - (DataType::Map(self_map), DataType::Map(read_map)) => { + (Self::Map(self_map), Self::Map(read_map)) => { NullabilityFlag(self_map.value_contains_null()) .can_read_as(NullabilityFlag(read_map.value_contains_null()))?; self_map.key_type().can_read_as(read_map.key_type())?; From e7768b4873419b26f96f585034f438522a45d5bd Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Fri, 10 Jan 2025 11:05:02 -0800 Subject: [PATCH 13/16] ignore doctest for private member --- kernel/src/schema_compat.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kernel/src/schema_compat.rs b/kernel/src/schema_compat.rs index 39e464872..ccc748a04 100644 --- a/kernel/src/schema_compat.rs +++ b/kernel/src/schema_compat.rs @@ -2,7 +2,7 @@ //! compatibility is [`can_read_as`]. //! //! # Examples -//! ```rust +//! ```rust, ignore //! # use delta_kernel::schema::StructType; //! # use delta_kernel::schema::StructField; //! # use delta_kernel::schema::DataType; @@ -63,7 +63,6 @@ impl crate::schema::StructField { impl crate::schema::StructType { /// Returns `Ok` if this [`StructType`] can be read as `read_type` in the read schema. 
#[allow(unused)] - #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] pub(crate) fn can_read_as(&self, read_type: &Self) -> DeltaResult<()> { let field_map: HashMap = self .fields From d0f3ebcf96cbb443cfa140dd309729a7b128ca5d Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 21 Jan 2025 14:53:15 -0800 Subject: [PATCH 14/16] Address PR comments --- kernel/src/lib.rs | 1 - kernel/src/{schema.rs => schema/mod.rs} | 2 + .../schema_compare.rs} | 195 ++++++++++++------ 3 files changed, 133 insertions(+), 65 deletions(-) rename kernel/src/{schema.rs => schema/mod.rs} (99%) rename kernel/src/{schema_compat.rs => schema/schema_compare.rs} (63%) diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 57c88e741..49dceea75 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -87,7 +87,6 @@ pub mod table_properties; pub mod transaction; pub(crate) mod predicates; -pub(crate) mod schema_compat; pub(crate) mod utils; #[cfg(feature = "developer-visibility")] diff --git a/kernel/src/schema.rs b/kernel/src/schema/mod.rs similarity index 99% rename from kernel/src/schema.rs rename to kernel/src/schema/mod.rs index a4cd44a6a..a1e6250a2 100644 --- a/kernel/src/schema.rs +++ b/kernel/src/schema/mod.rs @@ -14,6 +14,8 @@ pub(crate) use crate::expressions::{column_name, ColumnName}; use crate::utils::require; use crate::{DeltaResult, Error}; +pub(crate) mod schema_compare; + pub type Schema = StructType; pub type SchemaRef = Arc; diff --git a/kernel/src/schema_compat.rs b/kernel/src/schema/schema_compare.rs similarity index 63% rename from kernel/src/schema_compat.rs rename to kernel/src/schema/schema_compare.rs index ccc748a04..48e7d0b1c 100644 --- a/kernel/src/schema_compat.rs +++ b/kernel/src/schema/schema_compare.rs @@ -15,6 +15,7 @@ //! StructField::new("value", DataType::STRING, true), //! StructField::new("year", DataType::INTEGER, true), //! ]); +//! // Schemas are compatible since the `year` value is nullable //! 
assert!(schema.can_read_as(&read_schema).is_ok()); //! ```` //! @@ -22,83 +23,109 @@ use std::collections::{HashMap, HashSet}; use crate::utils::require; -use crate::{DeltaResult, Error}; + +use super::{DataType, StructField, StructType}; + +/// Represents the ways a schema comparison can fail. +#[derive(Debug, thiserror::Error)] +pub(crate) enum Error { + #[error("The nullability was tightened for a schema field")] + NullabilityTightening, + #[error("Field names do not match")] + FieldNameMismatch, + #[error("Schema is invalid")] + InvalidSchema, + #[error("The read schema is missing a column present in the schema")] + MissingColumn, + #[error("Read schema has a non-nullable column that is not present in the schema")] + NewNonNullableColumn, + #[error("Types for two schema fields did not match")] + TypeMismatch, +} + +/// A [`std::result::Result`] that has the schema comparison [`Error`] as the error variant. +#[allow(unused)] +pub(crate) type SchemaComparisonResult = Result<(), Error>; + +/// Represents a schema compatibility check for the type. If `self` can be read as `read_type`, +/// this function returns `Ok(())`. Otherwise, this function returns `Err`. +#[allow(unused)] +pub(crate) trait SchemaComparison { + fn can_read_as(&self, read_type: &Self) -> SchemaComparisonResult; +} /// The nullability flag of a schema's field. This can be compared with a read schema field's /// nullability flag using [`NullabilityFlag::can_read_as`]. -struct NullabilityFlag(bool); +#[allow(unused)] +pub(crate) struct NullabilityFlag(bool); -impl NullabilityFlag { +impl SchemaComparison for NullabilityFlag { /// Represents a nullability comparison between two schemas' fields. Returns true if the /// read nullability is the same or wider than the nullability of self. 
- fn can_read_as(&self, read_nullable: NullabilityFlag) -> DeltaResult<()> { + fn can_read_as(&self, read_nullable: &NullabilityFlag) -> SchemaComparisonResult { // The case to avoid is when the column is nullable, but the read schema specifies the // column as non-nullable. So we avoid the case where !read_nullable && nullable // Hence we check that !(!read_nullable && existing_nullable) // == read_nullable || !existing_nullable - require!( - read_nullable.0 || !self.0, - Error::generic("Read field is non-nullable while this field is nullable") - ); + require!(read_nullable.0 || !self.0, Error::NullabilityTightening); Ok(()) } } -impl crate::schema::StructField { - /// Returns `Ok` if this [`StructField`] can be read as `read_field` in the read schema. - fn can_read_as(&self, read_field: &Self) -> DeltaResult<()> { - NullabilityFlag(self.nullable).can_read_as(NullabilityFlag(read_field.nullable))?; - require!( - self.name() == read_field.name(), - Error::generic(format!( - "Struct field with name {} cannot be read with name {}", - self.name(), - read_field.name() - )) - ); +impl SchemaComparison for StructField { + /// Returns `Ok` if this [`StructField`] can be read as `read_field`. Three requirements must + /// be satisfied: + /// 1. The read schema field mustn't be non-nullable if this [`StructField`] is nullable. + /// 2. The both this field and `read_field` must have the same name. + /// 3. You can read this data type as the `read_field`'s data type. + fn can_read_as(&self, read_field: &Self) -> SchemaComparisonResult { + NullabilityFlag(self.nullable).can_read_as(&NullabilityFlag(read_field.nullable))?; + require!(self.name() == read_field.name(), Error::FieldNameMismatch); self.data_type().can_read_as(read_field.data_type())?; Ok(()) } } -impl crate::schema::StructType { - /// Returns `Ok` if this [`StructType`] can be read as `read_type` in the read schema. 
-    #[allow(unused)]
-    pub(crate) fn can_read_as(&self, read_type: &Self) -> DeltaResult<()> {
-        let field_map: HashMap = self
+impl SchemaComparison for StructType {
+    /// Returns `Ok` if this [`StructType`] can be read as `read_type`. This is the case when:
+    /// 1. The set of fields in this struct type are a subset of the `read_type`.
+    /// 2. For each field in this struct, you can read it as the `read_type`'s field. See
+    ///    [`StructField::can_read_as`].
+    /// 3. If a field in `read_type` is not present in this struct, then it must be nullable.
+    /// 4. Both [`StructType`]s must be valid schemas. No two fields of a struct may share a
+    ///    name that only differs by case. TODO: This check should be moved into the constructor
+    ///    for [`StructType`].
+    fn can_read_as(&self, read_type: &Self) -> SchemaComparisonResult {
+        let lowercase_field_map: HashMap<String, &StructField> = self
             .fields
             .iter()
             .map(|(name, field)| (name.to_lowercase(), field))
             .collect();
         require!(
-            field_map.len() == self.fields.len(),
-            Error::generic("Delta tables don't allow field names that only differ by case")
+            lowercase_field_map.len() == self.fields.len(),
+            Error::InvalidSchema
         );
-        let read_field_names: HashSet =
+        let lowercase_read_field_names: HashSet<String> =
             read_type.fields.keys().map(|x| x.to_lowercase()).collect();
         require!(
-            read_field_names.len() == read_type.fields.len(),
-            Error::generic("Delta tables don't allow field names that only differ by case")
+            lowercase_read_field_names.len() == read_type.fields.len(),
+            Error::InvalidSchema
         );
         // Check that the field names are a subset of the read fields.
- if !field_map.keys().all(|name| read_field_names.contains(name)) { - return Err(Error::generic( - "Struct has column that does not exist in the read schema", - )); + if !lowercase_field_map + .keys() + .all(|name| lowercase_read_field_names.contains(name)) + { + return Err(Error::MissingColumn); } for read_field in read_type.fields() { - match field_map.get(&read_field.name().to_lowercase()) { + match lowercase_field_map.get(&read_field.name().to_lowercase()) { Some(existing_field) => existing_field.can_read_as(read_field)?, None => { // Note: Delta spark does not perform the following check. Hence it ignores fields // that exist in the read schema that aren't in this schema. - require!( - read_field.is_nullable(), - Error::generic( - "read type has non-nullable column that does not exist in this struct", - ) - ); + require!(read_field.is_nullable(), Error::NewNonNullableColumn); } } } @@ -106,13 +133,20 @@ impl crate::schema::StructType { } } -impl crate::schema::DataType { - /// Returns `Ok` if this [`DataType`] can be read as `read_type` in the read schema. - fn can_read_as(&self, read_type: &Self) -> DeltaResult<()> { +impl SchemaComparison for DataType { + /// Returns `Ok` if this [`DataType`] can be read as `read_type`. This is the case when: + /// 1. The data types are the same. Note: This condition will be relaxed to include + /// compatible data types with type widening. See issue [`#623`] + /// 2. For complex data types, the nested types must be compatible as defined by [`SchemaComparison`] + /// 3. For array data types, the nullability may not be tightened in the `read_type`. 
See + /// [`NullabilityFlag::can_read_as`] + /// + /// [`#623`]: + fn can_read_as(&self, read_type: &Self) -> SchemaComparisonResult { match (self, read_type) { (Self::Array(self_array), Self::Array(read_array)) => { NullabilityFlag(self_array.contains_null()) - .can_read_as(NullabilityFlag(read_array.contains_null()))?; + .can_read_as(&NullabilityFlag(read_array.contains_null()))?; self_array .element_type() .can_read_as(read_array.element_type())?; @@ -122,17 +156,14 @@ impl crate::schema::DataType { } (Self::Map(self_map), Self::Map(read_map)) => { NullabilityFlag(self_map.value_contains_null()) - .can_read_as(NullabilityFlag(read_map.value_contains_null()))?; + .can_read_as(&NullabilityFlag(read_map.value_contains_null()))?; self_map.key_type().can_read_as(read_map.key_type())?; self_map.value_type().can_read_as(read_map.value_type())?; } (a, b) => { // TODO: In the future, we will change this to support type widening. - // See: https://github.com/delta-io/delta-kernel-rs/issues/623 - require!( - a == b, - Error::generic(format!("Types {} and {} are not compatible", a, b)) - ); + // See: #623 + require!(a == b, Error::TypeMismatch); } }; Ok(()) @@ -141,6 +172,7 @@ impl crate::schema::DataType { #[cfg(test)] mod tests { + use crate::schema::schema_compare::{Error, SchemaComparison}; use crate::schema::{ArrayType, DataType, MapType, StructField, StructType}; #[test] @@ -220,7 +252,10 @@ mod tests { false, )]); - assert!(existing_schema.can_read_as(&read_schema).is_err()); + assert!(matches!( + existing_schema.can_read_as(&read_schema), + Err(Error::NullabilityTightening) + )); } #[test] fn different_field_name_case_fails() { @@ -235,7 +270,10 @@ mod tests { StructField::new("name", DataType::STRING, false), StructField::new("age", DataType::INTEGER, true), ]); - assert!(existing_schema.can_read_as(&read_schema).is_err()); + assert!(matches!( + existing_schema.can_read_as(&read_schema), + Err(Error::FieldNameMismatch) + )); } #[test] fn different_type_fails() { @@ 
-249,7 +287,10 @@ mod tests { StructField::new("name", DataType::STRING, false), StructField::new("age", DataType::INTEGER, true), ]); - assert!(existing_schema.can_read_as(&read_schema).is_err()); + assert!(matches!( + existing_schema.can_read_as(&read_schema), + Err(Error::TypeMismatch) + )); } #[test] fn set_nullable_to_true() { @@ -277,40 +318,57 @@ mod tests { StructField::new("name", DataType::STRING, false), StructField::new("age", DataType::INTEGER, false), ]); - assert!(existing_schema.can_read_as(&read_schema).is_err()); + assert!(matches!( + existing_schema.can_read_as(&read_schema), + Err(Error::NullabilityTightening) + )); } #[test] - fn new_nullable_column() { - let existing_schema = StructType::new([ + fn differ_by_nullable_column() { + let a = StructType::new([ StructField::new("id", DataType::LONG, false), StructField::new("name", DataType::STRING, false), StructField::new("age", DataType::INTEGER, true), ]); - let read_schema = StructType::new([ + let b = StructType::new([ StructField::new("id", DataType::LONG, false), StructField::new("name", DataType::STRING, false), StructField::new("age", DataType::INTEGER, true), StructField::new("location", DataType::STRING, true), ]); - assert!(existing_schema.can_read_as(&read_schema).is_ok()); + + // Read `a` as `b`. `b` adds a new nullable column. This is compatible with `a`'s schema. + assert!(a.can_read_as(&b).is_ok()); + + // Read `b` as `a`. `a` is missing a column that is present in `b`. 
+ assert!(matches!(b.can_read_as(&a), Err(Error::MissingColumn))); } #[test] - fn new_non_nullable_column_fails() { - let existing_schema = StructType::new([ + fn differ_by_non_nullable_column() { + let a = StructType::new([ StructField::new("id", DataType::LONG, false), StructField::new("name", DataType::STRING, false), StructField::new("age", DataType::INTEGER, true), ]); - let read_schema = StructType::new([ + let b = StructType::new([ StructField::new("id", DataType::LONG, false), StructField::new("name", DataType::STRING, false), StructField::new("age", DataType::INTEGER, true), StructField::new("location", DataType::STRING, false), ]); - assert!(existing_schema.can_read_as(&read_schema).is_err()); + + // Read `a` as `b`. `b` has an extra non-nullable column. + assert!(matches!( + a.can_read_as(&b), + Err(Error::NewNonNullableColumn) + )); + + // Read `b` as `a`. `a` is missing a column that is present in `b`. + assert!(matches!(b.can_read_as(&a), Err(Error::MissingColumn))); } + #[test] fn duplicate_field_modulo_case() { let existing_schema = StructType::new([ @@ -326,6 +384,15 @@ mod tests { StructField::new("name", DataType::STRING, false), StructField::new("age", DataType::INTEGER, true), ]); - assert!(existing_schema.can_read_as(&read_schema).is_err()); + assert!(matches!( + existing_schema.can_read_as(&read_schema), + Err(Error::InvalidSchema) + )); + + // Checks in the inverse order + assert!(matches!( + read_schema.can_read_as(&existing_schema), + Err(Error::InvalidSchema) + )); } } From 8cd9e09357f965eeb3f7fe469e22c9b1944d259d Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 21 Jan 2025 16:20:43 -0800 Subject: [PATCH 15/16] address pr comments --- .../schema/{schema_compare.rs => compare.rs} | 41 ++++++++++--------- kernel/src/schema/mod.rs | 2 +- 2 files changed, 23 insertions(+), 20 deletions(-) rename kernel/src/schema/{schema_compare.rs => compare.rs} (93%) diff --git a/kernel/src/schema/schema_compare.rs b/kernel/src/schema/compare.rs 
similarity index 93%
rename from kernel/src/schema/schema_compare.rs
rename to kernel/src/schema/compare.rs
index 48e7d0b1c..150e1aa08 100644
--- a/kernel/src/schema/schema_compare.rs
+++ b/kernel/src/schema/compare.rs
@@ -15,7 +15,7 @@
 //!     StructField::new("value", DataType::STRING, true),
 //!     StructField::new("year", DataType::INTEGER, true),
 //! ]);
-//! // Schemas are compatible since the `year` value is nullable
+//! // Schemas are compatible since the `read_schema` adds a nullable column `year`
 //! assert!(schema.can_read_as(&read_schema).is_ok());
 //! ````
 //!
@@ -26,6 +26,12 @@ use crate::utils::require;
 
 use super::{DataType, StructField, StructType};
 
+/// The nullability flag of a schema's field. This can be compared with a read schema field's
+/// nullability flag using [`Nullable::can_read_as`].
+#[allow(unused)]
+#[derive(Clone, Copy)]
+pub(crate) struct Nullable(bool);
+
 /// Represents the ways a schema comparison can fail.
 #[derive(Debug, thiserror::Error)]
 pub(crate) enum Error {
@@ -49,20 +55,17 @@ pub(crate) type SchemaComparisonResult = Result<(), Error>;
 
 /// Represents a schema compatibility check for the type. If `self` can be read as `read_type`,
 /// this function returns `Ok(())`. Otherwise, this function returns `Err`.
+///
+/// TODO (Oussama): Remove the `allow(unused)` once this is used in CDF.
 #[allow(unused)]
 pub(crate) trait SchemaComparison {
     fn can_read_as(&self, read_type: &Self) -> SchemaComparisonResult;
 }
 
-/// The nullability flag of a schema's field. This can be compared with a read schema field's
-/// nullability flag using [`NullabilityFlag::can_read_as`].
-#[allow(unused)]
-pub(crate) struct NullabilityFlag(bool);
-
-impl SchemaComparison for NullabilityFlag {
+impl SchemaComparison for Nullable {
     /// Represents a nullability comparison between two schemas' fields. Returns true if the
     /// read nullability is the same or wider than the nullability of self.
- fn can_read_as(&self, read_nullable: &NullabilityFlag) -> SchemaComparisonResult { + fn can_read_as(&self, read_nullable: &Nullable) -> SchemaComparisonResult { // The case to avoid is when the column is nullable, but the read schema specifies the // column as non-nullable. So we avoid the case where !read_nullable && nullable // Hence we check that !(!read_nullable && existing_nullable) @@ -79,7 +82,7 @@ impl SchemaComparison for StructField { /// 2. The both this field and `read_field` must have the same name. /// 3. You can read this data type as the `read_field`'s data type. fn can_read_as(&self, read_field: &Self) -> SchemaComparisonResult { - NullabilityFlag(self.nullable).can_read_as(&NullabilityFlag(read_field.nullable))?; + Nullable(self.nullable).can_read_as(&Nullable(read_field.nullable))?; require!(self.name() == read_field.name(), Error::FieldNameMismatch); self.data_type().can_read_as(read_field.data_type())?; Ok(()) @@ -113,9 +116,9 @@ impl SchemaComparison for StructType { ); // Check that the field names are a subset of the read fields. - if !lowercase_field_map + if lowercase_field_map .keys() - .all(|name| lowercase_read_field_names.contains(name)) + .any(|name| !lowercase_read_field_names.contains(name)) { return Err(Error::MissingColumn); } @@ -123,8 +126,8 @@ impl SchemaComparison for StructType { match lowercase_field_map.get(&read_field.name().to_lowercase()) { Some(existing_field) => existing_field.can_read_as(read_field)?, None => { - // Note: Delta spark does not perform the following check. Hence it ignores fields - // that exist in the read schema that aren't in this schema. + // Note: Delta spark does not perform the following check. Hence it ignores + // non-null fields that exist in the read schema that aren't in this schema. require!(read_field.is_nullable(), Error::NewNonNullableColumn); } } @@ -139,14 +142,14 @@ impl SchemaComparison for DataType { /// compatible data types with type widening. See issue [`#623`] /// 2. 
For complex data types, the nested types must be compatible as defined by [`SchemaComparison`] /// 3. For array data types, the nullability may not be tightened in the `read_type`. See - /// [`NullabilityFlag::can_read_as`] + /// [`Nullable::can_read_as`] /// /// [`#623`]: fn can_read_as(&self, read_type: &Self) -> SchemaComparisonResult { match (self, read_type) { (Self::Array(self_array), Self::Array(read_array)) => { - NullabilityFlag(self_array.contains_null()) - .can_read_as(&NullabilityFlag(read_array.contains_null()))?; + Nullable(self_array.contains_null()) + .can_read_as(&Nullable(read_array.contains_null()))?; self_array .element_type() .can_read_as(read_array.element_type())?; @@ -155,8 +158,8 @@ impl SchemaComparison for DataType { self_struct.can_read_as(read_struct)? } (Self::Map(self_map), Self::Map(read_map)) => { - NullabilityFlag(self_map.value_contains_null()) - .can_read_as(&NullabilityFlag(read_map.value_contains_null()))?; + Nullable(self_map.value_contains_null()) + .can_read_as(&Nullable(read_map.value_contains_null()))?; self_map.key_type().can_read_as(read_map.key_type())?; self_map.value_type().can_read_as(read_map.value_type())?; } @@ -172,7 +175,7 @@ impl SchemaComparison for DataType { #[cfg(test)] mod tests { - use crate::schema::schema_compare::{Error, SchemaComparison}; + use crate::schema::compare::{Error, SchemaComparison}; use crate::schema::{ArrayType, DataType, MapType, StructField, StructType}; #[test] diff --git a/kernel/src/schema/mod.rs b/kernel/src/schema/mod.rs index a1e6250a2..6086a7031 100644 --- a/kernel/src/schema/mod.rs +++ b/kernel/src/schema/mod.rs @@ -14,7 +14,7 @@ pub(crate) use crate::expressions::{column_name, ColumnName}; use crate::utils::require; use crate::{DeltaResult, Error}; -pub(crate) mod schema_compare; +pub(crate) mod compare; pub type Schema = StructType; pub type SchemaRef = Arc; From b53b68f49689ff6fdbe704a1634b554022494b58 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 23 Jan 2025 
13:43:27 -0800 Subject: [PATCH 16/16] fix small bits --- kernel/src/schema/compare.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kernel/src/schema/compare.rs b/kernel/src/schema/compare.rs index 150e1aa08..e465f1618 100644 --- a/kernel/src/schema/compare.rs +++ b/kernel/src/schema/compare.rs @@ -1,5 +1,5 @@ //! Provides utilities to perform comparisons between a [`Schema`]s. The api used to check schema -//! compatibility is [`can_read_as`]. +//! compatibility is [`can_read_as`] that is exposed through the [`SchemaComparison`] trait. //! //! # Examples //! ```rust, ignore @@ -35,7 +35,7 @@ pub(crate) struct Nullable(bool); /// Represents the ways a schema comparison can fail. #[derive(Debug, thiserror::Error)] pub(crate) enum Error { - #[error("The nullability was tightened for a schema field")] + #[error("The nullability was tightened for a field")] NullabilityTightening, #[error("Field names do not match")] FieldNameMismatch,