Address PR comments
OussamaSaoudi committed Jan 21, 2025
1 parent e7768b4 commit d0f3ebc
Showing 3 changed files with 133 additions and 65 deletions.
1 change: 0 additions & 1 deletion kernel/src/lib.rs
@@ -87,7 +87,6 @@ pub mod table_properties;
pub mod transaction;

pub(crate) mod predicates;
pub(crate) mod schema_compat;
pub(crate) mod utils;

#[cfg(feature = "developer-visibility")]
2 changes: 2 additions & 0 deletions kernel/src/schema.rs → kernel/src/schema/mod.rs
@@ -14,6 +14,8 @@ pub(crate) use crate::expressions::{column_name, ColumnName};
use crate::utils::require;
use crate::{DeltaResult, Error};

pub(crate) mod schema_compare;

pub type Schema = StructType;
pub type SchemaRef = Arc<StructType>;

195 changes: 131 additions & 64 deletions kernel/src/schema_compat.rs → kernel/src/schema/schema_compare.rs
@@ -15,104 +15,138 @@
//! StructField::new("value", DataType::STRING, true),
//! StructField::new("year", DataType::INTEGER, true),
//! ]);
//! // Schemas are compatible since the `year` value is nullable
//! assert!(schema.can_read_as(&read_schema).is_ok());
//! ```
//!
//! [`Schema`]: crate::schema::Schema
use std::collections::{HashMap, HashSet};

use crate::utils::require;
use crate::{DeltaResult, Error};

use super::{DataType, StructField, StructType};

/// Represents the ways a schema comparison can fail.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
#[error("The nullability was tightened for a schema field")]
NullabilityTightening,
#[error("Field names do not match")]
FieldNameMismatch,
#[error("Schema is invalid")]
InvalidSchema,
#[error("The read schema is missing a column present in the schema")]
MissingColumn,
#[error("Read schema has a non-nullable column that is not present in the schema")]
NewNonNullableColumn,
#[error("Types for two schema fields did not match")]
TypeMismatch,
}

/// A [`std::result::Result`] that has the schema comparison [`Error`] as the error variant.
#[allow(unused)]
pub(crate) type SchemaComparisonResult = Result<(), Error>;

/// A schema compatibility check for a type. `can_read_as` returns `Ok(())` if `self` can be
/// read as `read_type`, and returns an [`Error`] otherwise.
#[allow(unused)]
pub(crate) trait SchemaComparison {
fn can_read_as(&self, read_type: &Self) -> SchemaComparisonResult;
}
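
// A minimal sketch (not part of this commit) of how the trait is intended to be used: any
// `SchemaComparison` value can be checked against its read-side counterpart through the same
// entry point. The helper name is hypothetical.
#[allow(unused)]
fn check_compatible<T: SchemaComparison>(existing: &T, read: &T) -> SchemaComparisonResult {
    existing.can_read_as(read)
}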

/// The nullability flag of a schema's field. This can be compared with a read schema field's
/// nullability flag using [`NullabilityFlag::can_read_as`].
struct NullabilityFlag(bool);
#[allow(unused)]
pub(crate) struct NullabilityFlag(bool);

impl NullabilityFlag {
impl SchemaComparison for NullabilityFlag {
/// Compares the nullability of two schemas' fields. Returns `Ok` if the read nullability is
/// the same or wider than the nullability of `self`.
fn can_read_as(&self, read_nullable: NullabilityFlag) -> DeltaResult<()> {
fn can_read_as(&self, read_nullable: &NullabilityFlag) -> SchemaComparisonResult {
// The case to avoid is when this column is nullable but the read schema marks the
// column as non-nullable, i.e. `!read_nullable && nullable`.
// Hence we require `!(!read_nullable && nullable)`, which simplifies to
// `read_nullable || !nullable`.
require!(
read_nullable.0 || !self.0,
Error::generic("Read field is non-nullable while this field is nullable")
);
require!(read_nullable.0 || !self.0, Error::NullabilityTightening);
Ok(())
}
}
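
// Illustrative sketch (not in the commit) of the rule above: keeping or widening nullability
// is fine, while tightening it is rejected.
#[allow(unused)]
fn nullability_rule_examples() {
    assert!(NullabilityFlag(false)
        .can_read_as(&NullabilityFlag(true))
        .is_ok());
    assert!(NullabilityFlag(true)
        .can_read_as(&NullabilityFlag(true))
        .is_ok());
    assert!(matches!(
        NullabilityFlag(true).can_read_as(&NullabilityFlag(false)),
        Err(Error::NullabilityTightening)
    ));
}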

impl crate::schema::StructField {
/// Returns `Ok` if this [`StructField`] can be read as `read_field` in the read schema.
fn can_read_as(&self, read_field: &Self) -> DeltaResult<()> {
NullabilityFlag(self.nullable).can_read_as(NullabilityFlag(read_field.nullable))?;
require!(
self.name() == read_field.name(),
Error::generic(format!(
"Struct field with name {} cannot be read with name {}",
self.name(),
read_field.name()
))
);
impl SchemaComparison for StructField {
/// Returns `Ok` if this [`StructField`] can be read as `read_field`. Three requirements must
/// be satisfied:
/// 1. The read schema field must not be non-nullable if this [`StructField`] is nullable.
/// 2. Both this field and `read_field` must have the same name.
/// 3. This field's data type must be readable as `read_field`'s data type.
fn can_read_as(&self, read_field: &Self) -> SchemaComparisonResult {
NullabilityFlag(self.nullable).can_read_as(&NullabilityFlag(read_field.nullable))?;
require!(self.name() == read_field.name(), Error::FieldNameMismatch);
self.data_type().can_read_as(read_field.data_type())?;
Ok(())
}
}
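
// Illustrative sketch (not in the commit) of the field-level rules: the name and type must
// match, and nullability may only widen from the existing field to the read field.
#[allow(unused)]
fn struct_field_rule_examples() {
    let existing = StructField::new("id", DataType::LONG, false);
    let widened = StructField::new("id", DataType::LONG, true);
    let renamed = StructField::new("identifier", DataType::LONG, false);
    assert!(existing.can_read_as(&widened).is_ok());
    assert!(matches!(
        widened.can_read_as(&existing),
        Err(Error::NullabilityTightening)
    ));
    assert!(matches!(
        existing.can_read_as(&renamed),
        Err(Error::FieldNameMismatch)
    ));
}
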
impl crate::schema::StructType {
/// Returns `Ok` if this [`StructType`] can be read as `read_type` in the read schema.
#[allow(unused)]
pub(crate) fn can_read_as(&self, read_type: &Self) -> DeltaResult<()> {
let field_map: HashMap<String, &crate::schema::StructField> = self
impl SchemaComparison for StructType {
/// Returns `Ok` if this [`StructType`] can be read as `read_type`. This is the case when:
/// 1. The set of fields in this struct type is a subset of the fields in `read_type`.
/// 2. Each field in this struct can be read as the corresponding field in `read_type`. See
/// [`StructField::can_read_as`].
/// 3. If a field in `read_type` is not present in this struct, then it must be nullable.
/// 4. Both [`StructType`]s must be valid schemas: no two fields of a struct may share a
/// name that differs only by case. TODO: This check should be moved into the constructor
/// for [`StructType`].
fn can_read_as(&self, read_type: &Self) -> SchemaComparisonResult {
let lowercase_field_map: HashMap<String, &StructField> = self
.fields
.iter()
.map(|(name, field)| (name.to_lowercase(), field))
.collect();
require!(
field_map.len() == self.fields.len(),
Error::generic("Delta tables don't allow field names that only differ by case")
lowercase_field_map.len() == self.fields.len(),
Error::InvalidSchema
);

let read_field_names: HashSet<String> =
let lowercase_read_field_names: HashSet<String> =
read_type.fields.keys().map(|x| x.to_lowercase()).collect();
require!(
read_field_names.len() == read_type.fields.len(),
Error::generic("Delta tables don't allow field names that only differ by case")
lowercase_read_field_names.len() == read_type.fields.len(),
Error::InvalidSchema
);

// Check that the field names are a subset of the read fields.
if !field_map.keys().all(|name| read_field_names.contains(name)) {
return Err(Error::generic(
"Struct has column that does not exist in the read schema",
));
if !lowercase_field_map
.keys()
.all(|name| lowercase_read_field_names.contains(name))
{
return Err(Error::MissingColumn);
}
for read_field in read_type.fields() {
match field_map.get(&read_field.name().to_lowercase()) {
match lowercase_field_map.get(&read_field.name().to_lowercase()) {
Some(existing_field) => existing_field.can_read_as(read_field)?,
None => {
// Note: Delta Spark does not perform the following check, so it ignores fields in the
// read schema that are not present in this schema.
require!(
read_field.is_nullable(),
Error::generic(
"read type has non-nullable column that does not exist in this struct",
)
);
require!(read_field.is_nullable(), Error::NewNonNullableColumn);
}
}
}
Ok(())
}
}

impl crate::schema::DataType {
/// Returns `Ok` if this [`DataType`] can be read as `read_type` in the read schema.
fn can_read_as(&self, read_type: &Self) -> DeltaResult<()> {
impl SchemaComparison for DataType {
/// Returns `Ok` if this [`DataType`] can be read as `read_type`. This is the case when:
/// 1. The data types are the same. Note: this condition will be relaxed to include
/// compatible data types with type widening. See issue [`#623`].
/// 2. For complex data types, the nested types must be compatible as defined by
/// [`SchemaComparison`].
/// 3. For array and map data types, the nullability of elements and map values may not be
/// tightened in the `read_type`. See [`NullabilityFlag::can_read_as`].
///
/// [`#623`]: <https://github.com/delta-io/delta-kernel-rs/issues/623>
fn can_read_as(&self, read_type: &Self) -> SchemaComparisonResult {
match (self, read_type) {
(Self::Array(self_array), Self::Array(read_array)) => {
NullabilityFlag(self_array.contains_null())
.can_read_as(NullabilityFlag(read_array.contains_null()))?;
.can_read_as(&NullabilityFlag(read_array.contains_null()))?;
self_array
.element_type()
.can_read_as(read_array.element_type())?;
@@ -122,17 +156,14 @@ impl crate::schema::DataType {
}
(Self::Map(self_map), Self::Map(read_map)) => {
NullabilityFlag(self_map.value_contains_null())
.can_read_as(NullabilityFlag(read_map.value_contains_null()))?;
.can_read_as(&NullabilityFlag(read_map.value_contains_null()))?;
self_map.key_type().can_read_as(read_map.key_type())?;
self_map.value_type().can_read_as(read_map.value_type())?;
}
(a, b) => {
// TODO: In the future, we will change this to support type widening.
// See: https://github.com/delta-io/delta-kernel-rs/issues/623
require!(
a == b,
Error::generic(format!("Types {} and {} are not compatible", a, b))
);
// See: #623
require!(a == b, Error::TypeMismatch);
}
};
Ok(())
Expand All @@ -141,6 +172,7 @@ impl crate::schema::DataType {

#[cfg(test)]
mod tests {
use crate::schema::schema_compare::{Error, SchemaComparison};
use crate::schema::{ArrayType, DataType, MapType, StructField, StructType};

#[test]
@@ -220,7 +252,10 @@ mod tests {
false,
)]);

assert!(existing_schema.can_read_as(&read_schema).is_err());
assert!(matches!(
existing_schema.can_read_as(&read_schema),
Err(Error::NullabilityTightening)
));
}
#[test]
fn different_field_name_case_fails() {
@@ -235,7 +270,10 @@
StructField::new("name", DataType::STRING, false),
StructField::new("age", DataType::INTEGER, true),
]);
assert!(existing_schema.can_read_as(&read_schema).is_err());
assert!(matches!(
existing_schema.can_read_as(&read_schema),
Err(Error::FieldNameMismatch)
));
}
#[test]
fn different_type_fails() {
@@ -249,7 +287,10 @@
StructField::new("name", DataType::STRING, false),
StructField::new("age", DataType::INTEGER, true),
]);
assert!(existing_schema.can_read_as(&read_schema).is_err());
assert!(matches!(
existing_schema.can_read_as(&read_schema),
Err(Error::TypeMismatch)
));
}
#[test]
fn set_nullable_to_true() {
@@ -277,40 +318,57 @@
StructField::new("name", DataType::STRING, false),
StructField::new("age", DataType::INTEGER, false),
]);
assert!(existing_schema.can_read_as(&read_schema).is_err());
assert!(matches!(
existing_schema.can_read_as(&read_schema),
Err(Error::NullabilityTightening)
));
}
#[test]
fn new_nullable_column() {
let existing_schema = StructType::new([
fn differ_by_nullable_column() {
let a = StructType::new([
StructField::new("id", DataType::LONG, false),
StructField::new("name", DataType::STRING, false),
StructField::new("age", DataType::INTEGER, true),
]);

let read_schema = StructType::new([
let b = StructType::new([
StructField::new("id", DataType::LONG, false),
StructField::new("name", DataType::STRING, false),
StructField::new("age", DataType::INTEGER, true),
StructField::new("location", DataType::STRING, true),
]);
assert!(existing_schema.can_read_as(&read_schema).is_ok());

// Read `a` as `b`. `b` adds a new nullable column. This is compatible with `a`'s schema.
assert!(a.can_read_as(&b).is_ok());

// Read `b` as `a`. `a` is missing a column that is present in `b`.
assert!(matches!(b.can_read_as(&a), Err(Error::MissingColumn)));
}
#[test]
fn new_non_nullable_column_fails() {
let existing_schema = StructType::new([
fn differ_by_non_nullable_column() {
let a = StructType::new([
StructField::new("id", DataType::LONG, false),
StructField::new("name", DataType::STRING, false),
StructField::new("age", DataType::INTEGER, true),
]);

let read_schema = StructType::new([
let b = StructType::new([
StructField::new("id", DataType::LONG, false),
StructField::new("name", DataType::STRING, false),
StructField::new("age", DataType::INTEGER, true),
StructField::new("location", DataType::STRING, false),
]);
assert!(existing_schema.can_read_as(&read_schema).is_err());

// Read `a` as `b`. `b` has an extra non-nullable column.
assert!(matches!(
a.can_read_as(&b),
Err(Error::NewNonNullableColumn)
));

// Read `b` as `a`. `a` is missing a column that is present in `b`.
assert!(matches!(b.can_read_as(&a), Err(Error::MissingColumn)));
}

#[test]
fn duplicate_field_modulo_case() {
let existing_schema = StructType::new([
@@ -326,6 +384,15 @@
StructField::new("name", DataType::STRING, false),
StructField::new("age", DataType::INTEGER, true),
]);
assert!(existing_schema.can_read_as(&read_schema).is_err());
assert!(matches!(
existing_schema.can_read_as(&read_schema),
Err(Error::InvalidSchema)
));

// Check the same pair in the reverse order
assert!(matches!(
read_schema.can_read_as(&existing_schema),
Err(Error::InvalidSchema)
));
}
}
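
// Hedged sketch (not part of this commit) of the nested-type rules for arrays and maps. It
// assumes the schema module's `ArrayType::new(element_type, contains_null)` and
// `MapType::new(key_type, value_type, value_contains_null)` constructors and their
// `Into<DataType>` conversions.
#[allow(unused)]
fn nested_type_rule_examples() {
    use super::{ArrayType, MapType};

    // Array elements: nullability may widen but not tighten.
    let strict_array: DataType = ArrayType::new(DataType::INTEGER, false).into();
    let relaxed_array: DataType = ArrayType::new(DataType::INTEGER, true).into();
    assert!(strict_array.can_read_as(&relaxed_array).is_ok());
    assert!(matches!(
        relaxed_array.can_read_as(&strict_array),
        Err(Error::NullabilityTightening)
    ));

    // Map values: until type widening lands (#623), the value types must match exactly.
    let int_values: DataType = MapType::new(DataType::STRING, DataType::INTEGER, false).into();
    let long_values: DataType = MapType::new(DataType::STRING, DataType::LONG, false).into();
    assert!(matches!(
        int_values.can_read_as(&long_values),
        Err(Error::TypeMismatch)
    ));
}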
