
add test case for deeply nested struct
ZENOTME committed Jan 25, 2025
1 parent 80db872 commit 3ababc3
Showing 1 changed file with 62 additions and 17 deletions.
crates/iceberg/src/writer/base_writer/equality_delete_writer.rs (79 changes: 62 additions & 17 deletions)
@@ -167,15 +167,17 @@ impl<B: FileWriterBuilder> IcebergWriter for EqualityDeleteFileWriter<B> {

 #[cfg(test)]
 mod test {
+    use std::collections::HashMap;
     use std::sync::Arc;
 
     use arrow_array::types::Int32Type;
     use arrow_array::{ArrayRef, BooleanArray, Int32Array, Int64Array, RecordBatch, StructArray};
     use arrow_buffer::NullBuffer;
-    use arrow_schema::DataType;
+    use arrow_schema::{DataType, Field, Fields};
     use arrow_select::concat::concat_batches;
     use itertools::Itertools;
     use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
     use parquet::file::properties::WriterProperties;
     use tempfile::TempDir;
     use uuid::Uuid;
@@ -657,7 +659,7 @@ mod test {
     #[tokio::test]
     async fn test_equality_delete_with_nullable_field() -> Result<(), anyhow::Error> {
         // prepare data
-        // Int, Struct(Int)
+        // Int, Struct(Int), Struct(Struct(Int))
         let schema = Schema::builder()
             .with_schema_id(1)
             .with_fields(vec![
@@ -673,28 +675,70 @@
                     .into()])),
                 )
                 .into(),
+                NestedField::optional(
+                    3,
+                    "col2",
+                    Type::Struct(StructType::new(vec![NestedField::optional(
+                        4,
+                        "sub_struct_col",
+                        Type::Struct(StructType::new(vec![NestedField::optional(
+                            5,
+                            "sub_sub_col",
+                            Type::Primitive(PrimitiveType::Int),
+                        )
+                        .into()])),
+                    )
+                    .into()])),
+                )
+                .into(),
             ])
             .build()
             .unwrap();
         let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
-        // null 1
-        // 2 null(struct)
-        // 3 null(field)
+        // null 1 null(struct)
+        // 2 null(struct) null(sub_struct_col)
+        // 3 null(field) null(sub_sub_col)
         let col0 = Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef;
-        let nulls = NullBuffer::from(vec![true, false, true]);
-        let col1 = Arc::new(StructArray::new(
-            if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() {
-                fields.clone()
-            } else {
-                unreachable!()
-            },
-            vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
-            Some(nulls),
-        ));
-        let columns = vec![col0, col1];
+        let col1 = {
+            let nulls = NullBuffer::from(vec![true, false, true]);
+            Arc::new(StructArray::new(
+                if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() {
+                    fields.clone()
+                } else {
+                    unreachable!()
+                },
+                vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
+                Some(nulls),
+            ))
+        };
+        let col2 = {
+            let inner_col = {
+                let nulls = NullBuffer::from(vec![true, false, true]);
+                Arc::new(StructArray::new(
+                    Fields::from(vec![Field::new("sub_sub_col", DataType::Int32, true)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "5".to_string(),
+                        )]))]),
+                    vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
+                    Some(nulls),
+                ))
+            };
+            let nulls = NullBuffer::from(vec![false, true, true]);
+            Arc::new(StructArray::new(
+                if let DataType::Struct(fields) = arrow_schema.fields.get(2).unwrap().data_type() {
+                    fields.clone()
+                } else {
+                    unreachable!()
+                },
+                vec![inner_col],
+                Some(nulls),
+            ))
+        };
+        let columns = vec![col0, col1, col2];
 
         let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();
-        let equality_ids = vec![0_i32, 2];
+        let equality_ids = vec![0_i32, 2, 5];
         let equality_config =
             EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap();
         let projector = equality_config.projector.clone();
@@ -705,6 +749,7 @@
             RecordBatch::try_new(equality_config.projected_arrow_schema_ref().clone(), vec![
                 Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef,
                 Arc::new(Int32Array::from(vec![Some(1), None, None])) as ArrayRef,
+                Arc::new(Int32Array::from(vec![None, None, None])) as ArrayRef,
             ])
             .unwrap();
         assert_eq!(to_write_projected, expect_batch);
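The third expected column above is all nulls even though the leaf Int32Array stores Some(1) and Some(2): a nested value reads as null when any level of its path (the outer struct, the inner struct, or the leaf itself) is null at that row. Below is a minimal standalone sketch of that masking behaviour, using only arrow-rs APIs already exercised by this test; the fn main scaffold and the inner_ref/leaf_ref names are illustrative, not part of the crate.

use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int32Array, StructArray};
use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, Field, Fields};

fn main() {
    // Leaf values: [1, 2, null] -- only row 2 is null at the leaf level.
    let leaf = Arc::new(Int32Array::from(vec![Some(1), Some(2), None])) as ArrayRef;

    // Inner struct is null at row 1, masking the leaf value 2 underneath it.
    let inner_fields = Fields::from(vec![Field::new("sub_sub_col", DataType::Int32, true)]);
    let inner = Arc::new(StructArray::new(
        inner_fields.clone(),
        vec![leaf],
        Some(NullBuffer::from(vec![true, false, true])),
    )) as ArrayRef;

    // Outer struct is null at row 0, masking everything below it.
    let outer_fields = Fields::from(vec![Field::new(
        "sub_struct_col",
        DataType::Struct(inner_fields),
        true,
    )]);
    let outer = StructArray::new(
        outer_fields,
        vec![inner],
        Some(NullBuffer::from(vec![false, true, true])),
    );

    // A nested value is logically null when any level of the path is null,
    // so every row of sub_sub_col is masked: row 0 via the outer struct,
    // row 1 via the inner struct, row 2 via the leaf itself.
    let inner_ref = outer.column(0);
    let leaf_ref = inner_ref
        .as_any()
        .downcast_ref::<StructArray>()
        .unwrap()
        .column(0);
    for row in 0..3 {
        assert!(outer.is_null(row) || inner_ref.is_null(row) || leaf_ref.is_null(row));
    }
}

Each of the three rows is masked at a different depth, which is the combination the new equality id 5 pushes through the projector to produce the expected [None, None, None] column.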
