diff --git a/ffi/src/test_ffi.rs b/ffi/src/test_ffi.rs index 14eec1b86..55456d7e5 100644 --- a/ffi/src/test_ffi.rs +++ b/ffi/src/test_ffi.rs @@ -25,18 +25,17 @@ pub unsafe extern "C" fn get_testing_kernel_expression() -> Handle arrow::datatypes::Schema { fn create_kernel_schema() -> delta_kernel::schema::Schema { use delta_kernel::schema::{DataType, Schema, StructField}; - let field_a = StructField::new("a", DataType::LONG, false); - let field_b = StructField::new("b", DataType::BOOLEAN, false); + let field_a = StructField::not_null("a", DataType::LONG); + let field_b = StructField::not_null("b", DataType::BOOLEAN); Schema::new(vec![field_a, field_b]) } diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index d03abc616..2352e0db7 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -524,38 +524,30 @@ mod tests { .project(&[METADATA_NAME]) .expect("Couldn't get metaData field"); - let expected = Arc::new(StructType::new([StructField::new( + let expected = Arc::new(StructType::new([StructField::nullable( "metaData", StructType::new([ - StructField::new("id", DataType::STRING, false), - StructField::new("name", DataType::STRING, true), - StructField::new("description", DataType::STRING, true), - StructField::new( + StructField::not_null("id", DataType::STRING), + StructField::nullable("name", DataType::STRING), + StructField::nullable("description", DataType::STRING), + StructField::not_null( "format", StructType::new([ - StructField::new("provider", DataType::STRING, false), - StructField::new( + StructField::not_null("provider", DataType::STRING), + StructField::not_null( "options", MapType::new(DataType::STRING, DataType::STRING, false), - false, ), ]), - false, ), - StructField::new("schemaString", DataType::STRING, false), - StructField::new( - "partitionColumns", - ArrayType::new(DataType::STRING, false), - false, - ), - StructField::new("createdTime", DataType::LONG, true), - StructField::new( + StructField::not_null("schemaString", DataType::STRING), + StructField::not_null("partitionColumns", ArrayType::new(DataType::STRING, false)), + StructField::nullable("createdTime", DataType::LONG), + StructField::not_null( "configuration", MapType::new(DataType::STRING, DataType::STRING, false), - false, ), ]), - true, )])); assert_eq!(schema, expected); } @@ -566,61 +558,55 @@ mod tests { .project(&[ADD_NAME]) .expect("Couldn't get add field"); - let expected = Arc::new(StructType::new([StructField::new( + let expected = Arc::new(StructType::new([StructField::nullable( "add", StructType::new([ - StructField::new("path", DataType::STRING, false), - StructField::new( + StructField::not_null("path", DataType::STRING), + StructField::not_null( "partitionValues", MapType::new(DataType::STRING, DataType::STRING, true), - false, ), - StructField::new("size", DataType::LONG, false), - StructField::new("modificationTime", DataType::LONG, false), - StructField::new("dataChange", DataType::BOOLEAN, false), - StructField::new("stats", DataType::STRING, true), - StructField::new( + StructField::not_null("size", DataType::LONG), + StructField::not_null("modificationTime", DataType::LONG), + StructField::not_null("dataChange", DataType::BOOLEAN), + StructField::nullable("stats", DataType::STRING), + StructField::nullable( "tags", MapType::new(DataType::STRING, DataType::STRING, false), - true, ), deletion_vector_field(), - StructField::new("baseRowId", DataType::LONG, true), - StructField::new("defaultRowCommitVersion", DataType::LONG, true), - StructField::new("clusteringProvider", DataType::STRING, true), + StructField::nullable("baseRowId", DataType::LONG), + StructField::nullable("defaultRowCommitVersion", DataType::LONG), + StructField::nullable("clusteringProvider", DataType::STRING), ]), - true, )])); assert_eq!(schema, expected); } fn tags_field() -> StructField { - StructField::new( + StructField::nullable( "tags", MapType::new(DataType::STRING, DataType::STRING, false), - true, ) } fn partition_values_field() -> StructField { - StructField::new( + StructField::nullable( "partitionValues", MapType::new(DataType::STRING, DataType::STRING, false), - true, ) } fn deletion_vector_field() -> StructField { - StructField::new( + StructField::nullable( "deletionVector", DataType::struct_type([ - StructField::new("storageType", DataType::STRING, false), - StructField::new("pathOrInlineDv", DataType::STRING, false), - StructField::new("offset", DataType::INTEGER, true), - StructField::new("sizeInBytes", DataType::INTEGER, false), - StructField::new("cardinality", DataType::LONG, false), + StructField::not_null("storageType", DataType::STRING), + StructField::not_null("pathOrInlineDv", DataType::STRING), + StructField::nullable("offset", DataType::INTEGER), + StructField::not_null("sizeInBytes", DataType::INTEGER), + StructField::not_null("cardinality", DataType::LONG), ]), - true, ) } @@ -629,21 +615,20 @@ mod tests { let schema = get_log_schema() .project(&[REMOVE_NAME]) .expect("Couldn't get remove field"); - let expected = Arc::new(StructType::new([StructField::new( + let expected = Arc::new(StructType::new([StructField::nullable( "remove", StructType::new([ - StructField::new("path", DataType::STRING, false), - StructField::new("deletionTimestamp", DataType::LONG, true), - StructField::new("dataChange", DataType::BOOLEAN, false), - StructField::new("extendedFileMetadata", DataType::BOOLEAN, true), + StructField::not_null("path", DataType::STRING), + StructField::nullable("deletionTimestamp", DataType::LONG), + StructField::not_null("dataChange", DataType::BOOLEAN), + StructField::nullable("extendedFileMetadata", DataType::BOOLEAN), partition_values_field(), - StructField::new("size", DataType::LONG, true), + StructField::nullable("size", DataType::LONG), tags_field(), deletion_vector_field(), - StructField::new("baseRowId", DataType::LONG, true), - StructField::new("defaultRowCommitVersion", DataType::LONG, true), + StructField::nullable("baseRowId", DataType::LONG), + StructField::nullable("defaultRowCommitVersion", DataType::LONG), ]), - true, )])); assert_eq!(schema, expected); } @@ -653,20 +638,18 @@ mod tests { let schema = get_log_schema() .project(&[CDC_NAME]) .expect("Couldn't get remove field"); - let expected = Arc::new(StructType::new([StructField::new( + let expected = Arc::new(StructType::new([StructField::nullable( "cdc", StructType::new([ - StructField::new("path", DataType::STRING, false), - StructField::new( + StructField::not_null("path", DataType::STRING), + StructField::not_null( "partitionValues", MapType::new(DataType::STRING, DataType::STRING, true), - false, ), - StructField::new("size", DataType::LONG, false), - StructField::new("dataChange", DataType::BOOLEAN, false), + StructField::not_null("size", DataType::LONG), + StructField::not_null("dataChange", DataType::BOOLEAN), tags_field(), ]), - true, )])); assert_eq!(schema, expected); } @@ -677,14 +660,13 @@ mod tests { .project(&["txn"]) .expect("Couldn't get transaction field"); - let expected = Arc::new(StructType::new([StructField::new( + let expected = Arc::new(StructType::new([StructField::nullable( "txn", StructType::new([ - StructField::new("appId", DataType::STRING, false), - StructField::new("version", DataType::LONG, false), - StructField::new("lastUpdated", DataType::LONG, true), + StructField::not_null("appId", DataType::STRING), + StructField::not_null("version", DataType::LONG), + StructField::nullable("lastUpdated", DataType::LONG), ]), - true, )])); assert_eq!(schema, expected); } @@ -695,25 +677,22 @@ mod tests { .project(&["commitInfo"]) .expect("Couldn't get commitInfo field"); - let expected = Arc::new(StructType::new(vec![StructField::new( + let expected = Arc::new(StructType::new(vec![StructField::nullable( "commitInfo", StructType::new(vec![ - StructField::new("timestamp", DataType::LONG, true), - StructField::new("inCommitTimestamp", DataType::LONG, true), - StructField::new("operation", DataType::STRING, true), - StructField::new( + StructField::nullable("timestamp", DataType::LONG), + StructField::nullable("inCommitTimestamp", DataType::LONG), + StructField::nullable("operation", DataType::STRING), + StructField::nullable( "operationParameters", MapType::new(DataType::STRING, DataType::STRING, false), - true, ), - StructField::new("kernelVersion", DataType::STRING, true), - StructField::new( + StructField::nullable("kernelVersion", DataType::STRING), + StructField::nullable( "engineCommitInfo", MapType::new(DataType::STRING, DataType::STRING, false), - true, ), ]), - true, )])); assert_eq!(schema, expected); } diff --git a/kernel/src/actions/schemas.rs b/kernel/src/actions/schemas.rs index 0588c04d4..aa3b3e47b 100644 --- a/kernel/src/actions/schemas.rs +++ b/kernel/src/actions/schemas.rs @@ -84,20 +84,20 @@ pub(crate) trait GetNullableContainerStructField { // nullable values impl GetNullableContainerStructField for T { fn get_nullable_container_struct_field(name: impl Into) -> StructField { - StructField::new(name, T::to_nullable_container_type(), false) + StructField::not_null(name, T::to_nullable_container_type()) } } // Normal types produce non-nullable fields impl GetStructField for T { fn get_struct_field(name: impl Into) -> StructField { - StructField::new(name, T::to_data_type(), false) + StructField::not_null(name, T::to_data_type()) } } // Option types produce nullable fields impl GetStructField for Option { fn get_struct_field(name: impl Into) -> StructField { - StructField::new(name, T::to_data_type(), true) + StructField::nullable(name, T::to_data_type()) } } diff --git a/kernel/src/engine/arrow_conversion.rs b/kernel/src/engine/arrow_conversion.rs index fbfdb487a..0b905ff3a 100644 --- a/kernel/src/engine/arrow_conversion.rs +++ b/kernel/src/engine/arrow_conversion.rs @@ -263,8 +263,7 @@ mod tests { fn test_metadata_string_conversion() -> DeltaResult<()> { let mut metadata = HashMap::new(); metadata.insert("description", "hello world".to_owned()); - let struct_field = - StructField::new("name", DataType::STRING, false).with_metadata(metadata); + let struct_field = StructField::not_null("name", DataType::STRING).with_metadata(metadata); let arrow_field = ArrowField::try_from(&struct_field)?; let new_metadata = arrow_field.metadata(); diff --git a/kernel/src/engine/arrow_utils.rs b/kernel/src/engine/arrow_utils.rs index a3e184574..06441b9d4 100644 --- a/kernel/src/engine/arrow_utils.rs +++ b/kernel/src/engine/arrow_utils.rs @@ -763,9 +763,9 @@ mod tests { #[test] fn simple_mask_indices() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new("s", DataType::STRING, true), - StructField::new("i2", DataType::INTEGER, true), + StructField::not_null("i", DataType::INTEGER), + StructField::nullable("s", DataType::STRING), + StructField::nullable("i2", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -787,8 +787,8 @@ mod tests { #[test] fn ensure_data_types_fails_correctly() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new("s", DataType::INTEGER, true), + StructField::not_null("i", DataType::INTEGER), + StructField::nullable("s", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -798,8 +798,8 @@ mod tests { assert!(res.is_err()); let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new("s", DataType::STRING, true), + StructField::not_null("i", DataType::INTEGER), + StructField::nullable("s", DataType::STRING), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -811,10 +811,9 @@ mod tests { #[test] fn mask_with_map() { - let requested_schema = Arc::new(StructType::new([StructField::new( + let requested_schema = Arc::new(StructType::new([StructField::not_null( "map", MapType::new(DataType::INTEGER, DataType::STRING, false), - false, )])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new_map( "map", @@ -835,9 +834,9 @@ mod tests { #[test] fn simple_reorder_indices() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new("s", DataType::STRING, true), - StructField::new("i2", DataType::INTEGER, true), + StructField::not_null("i", DataType::INTEGER), + StructField::nullable("s", DataType::STRING), + StructField::nullable("i2", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i2", ArrowDataType::Int32, true), @@ -859,9 +858,9 @@ mod tests { #[test] fn simple_nullable_field_missing() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new("s", DataType::STRING, true), - StructField::new("i2", DataType::INTEGER, true), + StructField::not_null("i", DataType::INTEGER), + StructField::nullable("s", DataType::STRING), + StructField::nullable("i2", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -882,16 +881,15 @@ mod tests { #[test] fn nested_indices() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new( + StructField::not_null("i", DataType::INTEGER), + StructField::not_null( "nested", StructType::new([ - StructField::new("int32", DataType::INTEGER, false), - StructField::new("string", DataType::STRING, false), + StructField::not_null("int32", DataType::INTEGER), + StructField::not_null("string", DataType::STRING), ]), - false, ), - StructField::new("j", DataType::INTEGER, false), + StructField::not_null("j", DataType::INTEGER), ])); let parquet_schema = nested_parquet_schema(); let (mask_indices, reorder_indices) = @@ -912,16 +910,15 @@ mod tests { #[test] fn nested_indices_reorder() { let requested_schema = Arc::new(StructType::new([ - StructField::new( + StructField::not_null( "nested", StructType::new([ - StructField::new("string", DataType::STRING, false), - StructField::new("int32", DataType::INTEGER, false), + StructField::not_null("string", DataType::STRING), + StructField::not_null("int32", DataType::INTEGER), ]), - false, ), - StructField::new("j", DataType::INTEGER, false), - StructField::new("i", DataType::INTEGER, false), + StructField::not_null("j", DataType::INTEGER), + StructField::not_null("i", DataType::INTEGER), ])); let parquet_schema = nested_parquet_schema(); let (mask_indices, reorder_indices) = @@ -942,13 +939,12 @@ mod tests { #[test] fn nested_indices_mask_inner() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new( + StructField::not_null("i", DataType::INTEGER), + StructField::not_null( "nested", - StructType::new([StructField::new("int32", DataType::INTEGER, false)]), - false, + StructType::new([StructField::not_null("int32", DataType::INTEGER)]), ), - StructField::new("j", DataType::INTEGER, false), + StructField::not_null("j", DataType::INTEGER), ])); let parquet_schema = nested_parquet_schema(); let (mask_indices, reorder_indices) = @@ -966,9 +962,9 @@ mod tests { #[test] fn simple_list_mask() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new("list", ArrayType::new(DataType::INTEGER, false), false), - StructField::new("j", DataType::INTEGER, false), + StructField::not_null("i", DataType::INTEGER), + StructField::not_null("list", ArrayType::new(DataType::INTEGER, false)), + StructField::not_null("j", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -997,10 +993,9 @@ mod tests { #[test] fn list_skip_earlier_element() { - let requested_schema = Arc::new(StructType::new([StructField::new( + let requested_schema = Arc::new(StructType::new([StructField::not_null( "list", ArrayType::new(DataType::INTEGER, false), - false, )])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -1025,20 +1020,19 @@ mod tests { #[test] fn nested_indices_list() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new( + StructField::not_null("i", DataType::INTEGER), + StructField::not_null( "list", ArrayType::new( StructType::new([ - StructField::new("int32", DataType::INTEGER, false), - StructField::new("string", DataType::STRING, false), + StructField::not_null("int32", DataType::INTEGER), + StructField::not_null("string", DataType::STRING), ]) .into(), false, ), - false, ), - StructField::new("j", DataType::INTEGER, false), + StructField::not_null("j", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -1077,8 +1071,8 @@ mod tests { #[test] fn nested_indices_unselected_list() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new("j", DataType::INTEGER, false), + StructField::not_null("i", DataType::INTEGER), + StructField::not_null("j", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -1110,16 +1104,15 @@ mod tests { #[test] fn nested_indices_list_mask_inner() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new( + StructField::not_null("i", DataType::INTEGER), + StructField::not_null( "list", ArrayType::new( - StructType::new([StructField::new("int32", DataType::INTEGER, false)]).into(), + StructType::new([StructField::not_null("int32", DataType::INTEGER)]).into(), false, ), - false, ), - StructField::new("j", DataType::INTEGER, false), + StructField::not_null("j", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -1155,20 +1148,19 @@ mod tests { #[test] fn nested_indices_list_mask_inner_reorder() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new( + StructField::not_null("i", DataType::INTEGER), + StructField::not_null( "list", ArrayType::new( StructType::new([ - StructField::new("string", DataType::STRING, false), - StructField::new("int2", DataType::INTEGER, false), + StructField::not_null("string", DataType::STRING), + StructField::not_null("int2", DataType::INTEGER), ]) .into(), false, ), - false, ), - StructField::new("j", DataType::INTEGER, false), + StructField::not_null("j", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), // field 0 @@ -1208,16 +1200,15 @@ mod tests { #[test] fn skipped_struct() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new( + StructField::not_null("i", DataType::INTEGER), + StructField::not_null( "nested", StructType::new([ - StructField::new("int32", DataType::INTEGER, false), - StructField::new("string", DataType::STRING, false), + StructField::not_null("int32", DataType::INTEGER), + StructField::not_null("string", DataType::STRING), ]), - false, ), - StructField::new("j", DataType::INTEGER, false), + StructField::not_null("j", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new( @@ -1386,8 +1377,8 @@ mod tests { #[test] fn no_matches() { let requested_schema = Arc::new(StructType::new([ - StructField::new("s", DataType::STRING, true), - StructField::new("i2", DataType::INTEGER, true), + StructField::nullable("s", DataType::STRING), + StructField::nullable("i2", DataType::INTEGER), ])); let nots_field = ArrowField::new("NOTs", ArrowDataType::Utf8, true); let noti2_field = ArrowField::new("NOTi2", ArrowDataType::Int32, true); diff --git a/kernel/src/engine/ensure_data_types.rs b/kernel/src/engine/ensure_data_types.rs index 9b7ea7819..88ff01626 100644 --- a/kernel/src/engine/ensure_data_types.rs +++ b/kernel/src/engine/ensure_data_types.rs @@ -400,36 +400,33 @@ mod tests { #[test] fn ensure_struct() { - let schema = DataType::struct_type([StructField::new( + let schema = DataType::struct_type([StructField::nullable( "a", ArrayType::new( DataType::struct_type([ - StructField::new("w", DataType::LONG, true), - StructField::new("x", ArrayType::new(DataType::LONG, true), true), - StructField::new( + StructField::nullable("w", DataType::LONG), + StructField::nullable("x", ArrayType::new(DataType::LONG, true)), + StructField::nullable( "y", MapType::new(DataType::LONG, DataType::STRING, true), - true, ), - StructField::new( + StructField::nullable( "z", DataType::struct_type([ - StructField::new("n", DataType::LONG, true), - StructField::new("m", DataType::STRING, true), + StructField::nullable("n", DataType::LONG), + StructField::nullable("m", DataType::STRING), ]), - true, ), ]), true, ), - true, )]); let arrow_struct: ArrowDataType = (&schema).try_into().unwrap(); assert!(ensure_data_types(&schema, &arrow_struct, true).is_ok()); let kernel_simple = DataType::struct_type([ - StructField::new("w", DataType::LONG, true), - StructField::new("x", DataType::LONG, true), + StructField::nullable("w", DataType::LONG), + StructField::nullable("x", DataType::LONG), ]); let arrow_simple_ok = ArrowField::new_struct( diff --git a/kernel/src/scan/data_skipping.rs b/kernel/src/scan/data_skipping.rs index 847855d4a..b30711f48 100644 --- a/kernel/src/scan/data_skipping.rs +++ b/kernel/src/scan/data_skipping.rs @@ -59,7 +59,7 @@ impl DataSkippingFilter { physical_predicate: Option<(ExpressionRef, SchemaRef)>, ) -> Option { static PREDICATE_SCHEMA: LazyLock = LazyLock::new(|| { - DataType::struct_type([StructField::new("predicate", DataType::BOOLEAN, true)]) + DataType::struct_type([StructField::nullable("predicate", DataType::BOOLEAN)]) }); static STATS_EXPR: LazyLock = LazyLock::new(|| column_expr!("add.stats")); static FILTER_EXPR: LazyLock = @@ -82,10 +82,10 @@ impl DataSkippingFilter { .transform_struct(&referenced_schema)? .into_owned(); let stats_schema = Arc::new(StructType::new([ - StructField::new("numRecords", DataType::LONG, true), - StructField::new("nullCount", nullcount_schema, true), - StructField::new("minValues", referenced_schema.clone(), true), - StructField::new("maxValues", referenced_schema, true), + StructField::nullable("numRecords", DataType::LONG), + StructField::nullable("nullCount", nullcount_schema), + StructField::nullable("minValues", referenced_schema.clone()), + StructField::nullable("maxValues", referenced_schema), ])); // Skipping happens in several steps: diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index fb5c2b0fa..d7f83a4fa 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -162,21 +162,21 @@ pub(crate) static SCAN_ROW_SCHEMA: LazyLock> = LazyLock::new(|| // Note that fields projected out of a nullable struct must be nullable let partition_values = MapType::new(DataType::STRING, DataType::STRING, true); let file_constant_values = - StructType::new([StructField::new("partitionValues", partition_values, true)]); + StructType::new([StructField::nullable("partitionValues", partition_values)]); let deletion_vector = StructType::new([ - StructField::new("storageType", DataType::STRING, true), - StructField::new("pathOrInlineDv", DataType::STRING, true), - StructField::new("offset", DataType::INTEGER, true), - StructField::new("sizeInBytes", DataType::INTEGER, true), - StructField::new("cardinality", DataType::LONG, true), + StructField::nullable("storageType", DataType::STRING), + StructField::nullable("pathOrInlineDv", DataType::STRING), + StructField::nullable("offset", DataType::INTEGER), + StructField::nullable("sizeInBytes", DataType::INTEGER), + StructField::nullable("cardinality", DataType::LONG), ]); Arc::new(StructType::new([ - StructField::new("path", DataType::STRING, true), - StructField::new("size", DataType::LONG, true), - StructField::new("modificationTime", DataType::LONG, true), - StructField::new("stats", DataType::STRING, true), - StructField::new("deletionVector", deletion_vector, true), - StructField::new("fileConstantValues", file_constant_values, true), + StructField::nullable("path", DataType::STRING), + StructField::nullable("size", DataType::LONG), + StructField::nullable("modificationTime", DataType::LONG), + StructField::nullable("stats", DataType::STRING), + StructField::nullable("deletionVector", deletion_vector), + StructField::nullable("fileConstantValues", file_constant_values), ])) }); diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index e0d345b56..cd17bca7d 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -820,34 +820,32 @@ mod tests { #[test] fn test_physical_predicate() { let logical_schema = StructType::new(vec![ - StructField::new("a", DataType::LONG, true), - StructField::new("b", DataType::LONG, true).with_metadata([( + StructField::nullable("a", DataType::LONG), + StructField::nullable("b", DataType::LONG).with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), "phys_b", )]), - StructField::new("phys_b", DataType::LONG, true).with_metadata([( + StructField::nullable("phys_b", DataType::LONG).with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), "phys_c", )]), - StructField::new( + StructField::nullable( "nested", StructType::new(vec![ - StructField::new("x", DataType::LONG, true), - StructField::new("y", DataType::LONG, true).with_metadata([( + StructField::nullable("x", DataType::LONG), + StructField::nullable("y", DataType::LONG).with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), "phys_y", )]), ]), - true, ), - StructField::new( + StructField::nullable( "mapped", - StructType::new(vec![StructField::new("n", DataType::LONG, true) + StructType::new(vec![StructField::nullable("n", DataType::LONG) .with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), "phys_n", )])]), - true, ) .with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), @@ -868,14 +866,14 @@ mod tests { column_expr!("a"), Some(PhysicalPredicate::Some( column_expr!("a").into(), - StructType::new(vec![StructField::new("a", DataType::LONG, true)]).into(), + StructType::new(vec![StructField::nullable("a", DataType::LONG)]).into(), )), ), ( column_expr!("b"), Some(PhysicalPredicate::Some( column_expr!("phys_b").into(), - StructType::new(vec![StructField::new("phys_b", DataType::LONG, true) + StructType::new(vec![StructField::nullable("phys_b", DataType::LONG) .with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), "phys_b", @@ -887,10 +885,9 @@ mod tests { column_expr!("nested.x"), Some(PhysicalPredicate::Some( column_expr!("nested.x").into(), - StructType::new(vec![StructField::new( + StructType::new(vec![StructField::nullable( "nested", - StructType::new(vec![StructField::new("x", DataType::LONG, true)]), - true, + StructType::new(vec![StructField::nullable("x", DataType::LONG)]), )]) .into(), )), @@ -899,14 +896,13 @@ mod tests { column_expr!("nested.y"), Some(PhysicalPredicate::Some( column_expr!("nested.phys_y").into(), - StructType::new(vec![StructField::new( + StructType::new(vec![StructField::nullable( "nested", - StructType::new(vec![StructField::new("phys_y", DataType::LONG, true) + StructType::new(vec![StructField::nullable("phys_y", DataType::LONG) .with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), "phys_y", )])]), - true, )]) .into(), )), @@ -915,14 +911,13 @@ mod tests { column_expr!("mapped.n"), Some(PhysicalPredicate::Some( column_expr!("phys_mapped.phys_n").into(), - StructType::new(vec![StructField::new( + StructType::new(vec![StructField::nullable( "phys_mapped", - StructType::new(vec![StructField::new("phys_n", DataType::LONG, true) + StructType::new(vec![StructField::nullable("phys_n", DataType::LONG) .with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), "phys_n", )])]), - true, ) .with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), @@ -935,14 +930,13 @@ mod tests { Expression::and(column_expr!("mapped.n"), true), Some(PhysicalPredicate::Some( Expression::and(column_expr!("phys_mapped.phys_n"), true).into(), - StructType::new(vec![StructField::new( + StructType::new(vec![StructField::nullable( "phys_mapped", - StructType::new(vec![StructField::new("phys_n", DataType::LONG, true) + StructType::new(vec![StructField::nullable("phys_n", DataType::LONG) .with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), "phys_n", )])]), - true, ) .with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), diff --git a/kernel/src/schema.rs b/kernel/src/schema.rs index d2ff65193..a4cd44a6a 100644 --- a/kernel/src/schema.rs +++ b/kernel/src/schema.rs @@ -122,6 +122,16 @@ impl StructField { } } + /// Creates a new nullable field + pub fn nullable(name: impl Into, data_type: impl Into) -> Self { + Self::new(name, data_type, true) + } + + /// Creates a new non-nullable field + pub fn not_null(name: impl Into, data_type: impl Into) -> Self { + Self::new(name, data_type, false) + } + pub fn with_metadata( mut self, metadata: impl IntoIterator, impl Into)>, @@ -421,7 +431,7 @@ impl MapType { /// Create a schema assuming the map is stored as a struct with the specified key and value field names pub fn as_struct_schema(&self, key_name: String, val_name: String) -> Schema { StructType::new([ - StructField::new(key_name, self.key_type.clone(), false), + StructField::not_null(key_name, self.key_type.clone()), StructField::new(val_name, self.value_type.clone(), self.value_contains_null), ]) } @@ -1090,69 +1100,61 @@ mod tests { #[test] fn test_depth_checker() { let schema = DataType::struct_type([ - StructField::new( + StructField::nullable( "a", ArrayType::new( DataType::struct_type([ - StructField::new("w", DataType::LONG, true), - StructField::new("x", ArrayType::new(DataType::LONG, true), true), - StructField::new( + StructField::nullable("w", DataType::LONG), + StructField::nullable("x", ArrayType::new(DataType::LONG, true)), + StructField::nullable( "y", MapType::new(DataType::LONG, DataType::STRING, true), - true, ), - StructField::new( + StructField::nullable( "z", DataType::struct_type([ - StructField::new("n", DataType::LONG, true), - StructField::new("m", DataType::STRING, true), + StructField::nullable("n", DataType::LONG), + StructField::nullable("m", DataType::STRING), ]), - true, ), ]), true, ), - true, ), - StructField::new( + StructField::nullable( "b", DataType::struct_type([ - StructField::new("o", ArrayType::new(DataType::LONG, true), true), - StructField::new( + StructField::nullable("o", ArrayType::new(DataType::LONG, true)), + StructField::nullable( "p", MapType::new(DataType::LONG, DataType::STRING, true), - true, ), - StructField::new( + StructField::nullable( "q", DataType::struct_type([ - StructField::new( + StructField::nullable( "s", DataType::struct_type([ - StructField::new("u", DataType::LONG, true), - StructField::new("v", DataType::LONG, true), + StructField::nullable("u", DataType::LONG), + StructField::nullable("v", DataType::LONG), ]), - true, ), - StructField::new("t", DataType::LONG, true), + StructField::nullable("t", DataType::LONG), ]), - true, ), - StructField::new("r", DataType::LONG, true), + StructField::nullable("r", DataType::LONG), ]), - true, ), - StructField::new( + StructField::nullable( "c", MapType::new( DataType::LONG, DataType::struct_type([ - StructField::new("f", DataType::LONG, true), - StructField::new("g", DataType::STRING, true), + StructField::nullable("f", DataType::LONG), + StructField::nullable("g", DataType::STRING), ]), true, ), - true, ), ]); diff --git a/kernel/src/table_changes/log_replay/tests.rs b/kernel/src/table_changes/log_replay/tests.rs index 29e076c07..35c4a99f8 100644 --- a/kernel/src/table_changes/log_replay/tests.rs +++ b/kernel/src/table_changes/log_replay/tests.rs @@ -23,8 +23,8 @@ use std::sync::Arc; fn get_schema() -> StructType { StructType::new([ - StructField::new("id", DataType::INTEGER, true), - StructField::new("value", DataType::STRING, true), + StructField::nullable("id", DataType::INTEGER), + StructField::nullable("value", DataType::STRING), ]) } @@ -219,17 +219,17 @@ async fn incompatible_schemas_fail() { // The CDF schema has fields: `id: int` and `value: string`. // This commit has schema with fields: `id: long`, `value: string` and `year: int` (nullable). let schema = StructType::new([ - StructField::new("id", DataType::LONG, true), - StructField::new("value", DataType::STRING, true), - StructField::new("year", DataType::INTEGER, true), + StructField::nullable("id", DataType::LONG), + StructField::nullable("value", DataType::STRING), + StructField::nullable("year", DataType::INTEGER), ]); assert_incompatible_schema(schema, get_schema()).await; // The CDF schema has fields: `id: int` and `value: string`. // This commit has schema with fields: `id: long` and `value: string`. let schema = StructType::new([ - StructField::new("id", DataType::LONG, true), - StructField::new("value", DataType::STRING, true), + StructField::nullable("id", DataType::LONG), + StructField::nullable("value", DataType::STRING), ]); assert_incompatible_schema(schema, get_schema()).await; @@ -238,12 +238,12 @@ async fn incompatible_schemas_fail() { // The CDF schema has fields: `id: long` and `value: string`. // This commit has schema with fields: `id: int` and `value: string`. let cdf_schema = StructType::new([ - StructField::new("id", DataType::LONG, true), - StructField::new("value", DataType::STRING, true), + StructField::nullable("id", DataType::LONG), + StructField::nullable("value", DataType::STRING), ]); let commit_schema = StructType::new([ - StructField::new("id", DataType::INTEGER, true), - StructField::new("value", DataType::STRING, true), + StructField::nullable("id", DataType::INTEGER), + StructField::nullable("value", DataType::STRING), ]); assert_incompatible_schema(cdf_schema, commit_schema).await; @@ -252,16 +252,16 @@ async fn incompatible_schemas_fail() { // The CDF schema has fields: nullable `id` and nullable `value`. // This commit has schema with fields: non-nullable `id` and nullable `value`. let schema = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("value", DataType::STRING, true), + StructField::not_null("id", DataType::LONG), + StructField::nullable("value", DataType::STRING), ]); assert_incompatible_schema(schema, get_schema()).await; // The CDF schema has fields: `id: int` and `value: string`. // This commit has schema with fields:`id: string` and `value: string`. let schema = StructType::new([ - StructField::new("id", DataType::STRING, true), - StructField::new("value", DataType::STRING, true), + StructField::nullable("id", DataType::STRING), + StructField::nullable("value", DataType::STRING), ]); assert_incompatible_schema(schema, get_schema()).await; diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index dad2f4e9b..a855668d8 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -60,9 +60,9 @@ static ADD_CHANGE_TYPE: &str = "insert"; static REMOVE_CHANGE_TYPE: &str = "delete"; static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| { [ - StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false), - StructField::new(COMMIT_VERSION_COL_NAME, DataType::LONG, false), - StructField::new(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP, false), + StructField::not_null(CHANGE_TYPE_COL_NAME, DataType::STRING), + StructField::not_null(COMMIT_VERSION_COL_NAME, DataType::LONG), + StructField::not_null(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP), ] }); @@ -316,8 +316,8 @@ mod tests { let engine = Box::new(SyncEngine::new()); let table = Table::try_from_uri(path).unwrap(); let expected_schema = [ - StructField::new("part", DataType::INTEGER, true), - StructField::new("id", DataType::INTEGER, true), + StructField::nullable("part", DataType::INTEGER), + StructField::nullable("id", DataType::INTEGER), ] .into_iter() .chain(CDF_FIELDS.clone()); diff --git a/kernel/src/table_changes/physical_to_logical.rs b/kernel/src/table_changes/physical_to_logical.rs index bc8488081..a953048a9 100644 --- a/kernel/src/table_changes/physical_to_logical.rs +++ b/kernel/src/table_changes/physical_to_logical.rs @@ -69,7 +69,7 @@ pub(crate) fn scan_file_physical_schema( physical_schema: &StructType, ) -> SchemaRef { if scan_file.scan_type == CdfScanFileType::Cdc { - let change_type = StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false); + let change_type = StructField::not_null(CHANGE_TYPE_COL_NAME, DataType::STRING); let fields = physical_schema.fields().cloned().chain(Some(change_type)); StructType::new(fields).into() } else { @@ -104,11 +104,11 @@ mod tests { commit_timestamp: 1234, }; let logical_schema = StructType::new([ - StructField::new("id", DataType::STRING, true), - StructField::new("age", DataType::LONG, false), - StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false), - StructField::new(COMMIT_VERSION_COL_NAME, DataType::LONG, false), - StructField::new(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP, false), + StructField::nullable("id", DataType::STRING), + StructField::not_null("age", DataType::LONG), + StructField::not_null(CHANGE_TYPE_COL_NAME, DataType::STRING), + StructField::not_null(COMMIT_VERSION_COL_NAME, DataType::LONG), + StructField::not_null(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP), ]); let all_fields = vec![ ColumnType::Selected("id".to_string()), diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index 9b0ba3067..dffd40f68 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -420,8 +420,8 @@ mod tests { assert_eq!( scan.logical_schema, StructType::new([ - StructField::new("id", DataType::INTEGER, true), - StructField::new("_commit_version", DataType::LONG, false), + StructField::nullable("id", DataType::INTEGER), + StructField::not_null("_commit_version", DataType::LONG), ]) .into() ); @@ -429,7 +429,7 @@ mod tests { scan.physical_predicate, PhysicalPredicate::Some( predicate, - StructType::new([StructField::new("id", DataType::INTEGER, true),]).into() + StructType::new([StructField::nullable("id", DataType::INTEGER),]).into() ) ); } diff --git a/kernel/src/table_changes/scan_file.rs b/kernel/src/table_changes/scan_file.rs index cc4514186..f428e09df 100644 --- a/kernel/src/table_changes/scan_file.rs +++ b/kernel/src/table_changes/scan_file.rs @@ -176,37 +176,37 @@ impl RowVisitor for CdfScanFileVisitor<'_, T> { pub(crate) fn cdf_scan_row_schema() -> SchemaRef { static CDF_SCAN_ROW_SCHEMA: LazyLock> = LazyLock::new(|| { let deletion_vector = StructType::new([ - StructField::new("storageType", DataType::STRING, true), - StructField::new("pathOrInlineDv", DataType::STRING, true), - StructField::new("offset", DataType::INTEGER, true), - StructField::new("sizeInBytes", DataType::INTEGER, true), - StructField::new("cardinality", DataType::LONG, true), + StructField::nullable("storageType", DataType::STRING), + StructField::nullable("pathOrInlineDv", DataType::STRING), + StructField::nullable("offset", DataType::INTEGER), + StructField::nullable("sizeInBytes", DataType::INTEGER), + StructField::nullable("cardinality", DataType::LONG), ]); let partition_values = MapType::new(DataType::STRING, DataType::STRING, true); let file_constant_values = - StructType::new([StructField::new("partitionValues", partition_values, true)]); + StructType::new([StructField::nullable("partitionValues", partition_values)]); let add = StructType::new([ - StructField::new("path", DataType::STRING, true), - StructField::new("deletionVector", deletion_vector.clone(), true), - StructField::new("fileConstantValues", file_constant_values.clone(), true), + StructField::nullable("path", DataType::STRING), + StructField::nullable("deletionVector", deletion_vector.clone()), + StructField::nullable("fileConstantValues", file_constant_values.clone()), ]); let remove = StructType::new([ - StructField::new("path", DataType::STRING, true), - StructField::new("deletionVector", deletion_vector, true), - StructField::new("fileConstantValues", file_constant_values.clone(), true), + StructField::nullable("path", DataType::STRING), + StructField::nullable("deletionVector", deletion_vector), + StructField::nullable("fileConstantValues", file_constant_values.clone()), ]); let cdc = StructType::new([ - StructField::new("path", DataType::STRING, true), - StructField::new("fileConstantValues", file_constant_values, true), + StructField::nullable("path", DataType::STRING), + StructField::nullable("fileConstantValues", file_constant_values), ]); Arc::new(StructType::new([ - StructField::new("add", add, true), - StructField::new("remove", remove, true), - StructField::new("cdc", cdc, true), - StructField::new("timestamp", DataType::LONG, false), - StructField::new("commit_version", DataType::LONG, false), + StructField::nullable("add", add), + StructField::nullable("remove", remove), + StructField::nullable("cdc", cdc), + StructField::not_null("timestamp", DataType::LONG), + StructField::not_null("commit_version", DataType::LONG), ])) }); CDF_SCAN_ROW_SCHEMA.clone() @@ -334,8 +334,8 @@ mod tests { ) .unwrap(); let table_schema = StructType::new([ - StructField::new("id", DataType::INTEGER, true), - StructField::new("value", DataType::STRING, true), + StructField::nullable("id", DataType::INTEGER), + StructField::nullable("value", DataType::STRING), ]); let scan_data = table_changes_action_iter( Arc::new(engine), diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index 6cc7ff38b..d74c2456a 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -277,10 +277,9 @@ fn generate_commit_info( // HACK (part 1/2): since we don't have proper map support, we create a literal struct with // one null field to create data that serializes as "operationParameters": {} Expression::literal(Scalar::Struct(StructData::try_new( - vec![StructField::new( + vec![StructField::nullable( "operation_parameter_int", DataType::INTEGER, - true, )], vec![Scalar::Null(DataType::INTEGER)], )?)), @@ -304,10 +303,9 @@ fn generate_commit_info( }; let engine_commit_info_schema = commit_info_data_type.project_as_struct(&["engineCommitInfo"])?; - let hack_data_type = DataType::Struct(Box::new(StructType::new(vec![StructField::new( + let hack_data_type = DataType::Struct(Box::new(StructType::new(vec![StructField::nullable( "hack_operation_parameter_int", DataType::INTEGER, - true, )]))); commit_info_data_type @@ -677,15 +675,14 @@ mod tests { fn test_write_metadata_schema() { let schema = get_write_metadata_schema(); let expected = StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - StructField::new( + StructField::not_null("path", DataType::STRING), + StructField::not_null( "partitionValues", MapType::new(DataType::STRING, DataType::STRING, true), - false, ), - StructField::new("size", DataType::LONG, false), - StructField::new("modificationTime", DataType::LONG, false), - StructField::new("dataChange", DataType::BOOLEAN, false), + StructField::not_null("size", DataType::LONG), + StructField::not_null("modificationTime", DataType::LONG), + StructField::not_null("dataChange", DataType::BOOLEAN), ]); assert_eq!(*schema, expected.into()); } diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index e62f8fd7c..2ee6dfdd5 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -147,10 +147,9 @@ async fn test_commit_info() -> Result<(), Box> { let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' - let schema = Arc::new(StructType::new(vec![StructField::new( + let schema = Arc::new(StructType::new(vec![StructField::nullable( "number", DataType::INTEGER, - true, )])); let table = create_table(store.clone(), table_location, schema, &[]).await?; @@ -201,10 +200,9 @@ async fn test_empty_commit() -> Result<(), Box> { let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' - let schema = Arc::new(StructType::new(vec![StructField::new( + let schema = Arc::new(StructType::new(vec![StructField::nullable( "number", DataType::INTEGER, - true, )])); let table = create_table(store.clone(), table_location, schema, &[]).await?; @@ -224,10 +222,9 @@ async fn test_invalid_commit_info() -> Result<(), Box> { let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' - let schema = Arc::new(StructType::new(vec![StructField::new( + let schema = Arc::new(StructType::new(vec![StructField::nullable( "number", DataType::INTEGER, - true, )])); let table = create_table(store.clone(), table_location, schema, &[]).await?; @@ -336,10 +333,9 @@ async fn test_append() -> Result<(), Box> { let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' - let schema = Arc::new(StructType::new(vec![StructField::new( + let schema = Arc::new(StructType::new(vec![StructField::nullable( "number", DataType::INTEGER, - true, )])); let table = create_table(store.clone(), table_location, schema.clone(), &[]).await?; @@ -466,13 +462,12 @@ async fn test_append_partitioned() -> Result<(), Box> { // create a simple partitioned table: one int column named 'number', partitioned by string // column named 'partition' let table_schema = Arc::new(StructType::new(vec![ - StructField::new("number", DataType::INTEGER, true), - StructField::new("partition", DataType::STRING, true), + StructField::nullable("number", DataType::INTEGER), + StructField::nullable("partition", DataType::STRING), ])); - let data_schema = Arc::new(StructType::new(vec![StructField::new( + let data_schema = Arc::new(StructType::new(vec![StructField::nullable( "number", DataType::INTEGER, - true, )])); let table = create_table( store.clone(), @@ -611,16 +606,14 @@ async fn test_append_invalid_schema() -> Result<(), Box> let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' - let table_schema = Arc::new(StructType::new(vec![StructField::new( + let table_schema = Arc::new(StructType::new(vec![StructField::nullable( "number", DataType::INTEGER, - true, )])); // incompatible data schema: one string column named 'string' - let data_schema = Arc::new(StructType::new(vec![StructField::new( + let data_schema = Arc::new(StructType::new(vec![StructField::nullable( "string", DataType::STRING, - true, )])); let table = create_table(store.clone(), table_location, table_schema.clone(), &[]).await?;