Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support metadata table "Manifests" #861

Merged
merged 17 commits into from
Jan 2, 2025
251 changes: 240 additions & 11 deletions crates/iceberg/src/metadata_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

use std::sync::Arc;

use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
use arrow_array::types::{Int64Type, TimestampMillisecondType};
use arrow_array::builder::{
BooleanBuilder, ListBuilder, MapBuilder, PrimitiveBuilder, StringBuilder, StructBuilder,
};
use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondType};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};

use crate::spec::TableMetadata;
use crate::table::Table;
use crate::Result;

Expand All @@ -45,19 +46,18 @@ impl MetadataTable {

/// Get the snapshots table.
pub fn snapshots(&self) -> SnapshotsTable {
SnapshotsTable {
metadata_table: self,
}
SnapshotsTable { table: &self.0 }
}

fn metadata(&self) -> &TableMetadata {
self.0.metadata()
/// Get the manifests table.
///
/// The returned [`ManifestsTable`] borrows the table wrapped by this
/// `MetadataTable`; nothing is read until it is scanned.
pub fn manifests(&self) -> ManifestsTable {
ManifestsTable { table: &self.0 }
}
}

/// Snapshots table.
pub struct SnapshotsTable<'a> {
metadata_table: &'a MetadataTable,
table: &'a Table,
}

impl<'a> SnapshotsTable<'a> {
Expand Down Expand Up @@ -104,7 +104,7 @@ impl<'a> SnapshotsTable<'a> {
let mut manifest_list = StringBuilder::new();
let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());

for snapshot in self.metadata_table.metadata().snapshots() {
for snapshot in self.table.metadata().snapshots() {
committed_at.append_value(snapshot.timestamp_ms());
snapshot_id.append_value(snapshot.snapshot_id());
parent_id.append_option(snapshot.parent_snapshot_id());
Expand All @@ -128,6 +128,133 @@ impl<'a> SnapshotsTable<'a> {
}
}

/// Manifests table.
///
/// A borrowed view over a table. `schema` describes the Arrow column layout and
/// `scan` produces one row per manifest of the table's current snapshot.
pub struct ManifestsTable<'a> {
// Table whose metadata and file IO are consulted when scanning.
table: &'a Table,
}

impl<'a> ManifestsTable<'a> {
/// Builds the Arrow fields of a single partition summary entry, in column order:
/// `contains_null`, `contains_nan`, `lower_bound`, `upper_bound`.
///
/// Only `contains_null` is declared non-nullable.
fn partition_summary_fields(&self) -> Vec<Field> {
    // (name, Arrow type, nullable) for each member of the summary struct.
    [
        ("contains_null", DataType::Boolean, false),
        ("contains_nan", DataType::Boolean, true),
        ("lower_bound", DataType::Utf8, true),
        ("upper_bound", DataType::Utf8, true),
    ]
    .into_iter()
    .map(|(name, data_type, nullable)| Field::new(name, data_type, nullable))
    .collect()
}

/// Returns the schema of the manifests table.
///
/// The result is an Arrow [`Schema`] with twelve non-nullable top-level columns.
/// The last one, `partition_summaries`, is a list whose non-nullable `item`
/// struct is built from `partition_summary_fields`.
pub fn schema(&self) -> Schema {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should return iceberg schema here, and user could easily convert it to arrow schema.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should return iceberg schema here, and user could easily convert it to arrow schema.

Would you like to create an issue for this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done: #868

// Column order matters: `scan` assembles its arrays in exactly this order
// before handing them to `RecordBatch::try_new` together with this schema.
Schema::new(vec![
Field::new("content", DataType::Int8, false),
Field::new("path", DataType::Utf8, false),
Field::new("length", DataType::Int64, false),
Field::new("partition_spec_id", DataType::Int32, false),
Field::new("added_snapshot_id", DataType::Int64, false),
Field::new("added_data_files_count", DataType::Int32, false),
Field::new("existing_data_files_count", DataType::Int32, false),
Field::new("deleted_data_files_count", DataType::Int32, false),
Field::new("added_delete_files_count", DataType::Int32, false),
Field::new("existing_delete_files_count", DataType::Int32, false),
Field::new("deleted_delete_files_count", DataType::Int32, false),
Field::new(
"partition_summaries",
// The list item field must match the one passed to `with_field` in `scan`.
DataType::List(Arc::new(Field::new_struct(
"item",
self.partition_summary_fields(),
false,
))),
false,
),
])
}

/// Scans the manifests table.
pub async fn scan(&self) -> Result<RecordBatch> {
let mut content = PrimitiveBuilder::<Int8Type>::new();
let mut path = StringBuilder::new();
let mut length = PrimitiveBuilder::<Int64Type>::new();
let mut partition_spec_id = PrimitiveBuilder::<Int32Type>::new();
let mut added_snapshot_id = PrimitiveBuilder::<Int64Type>::new();
let mut added_data_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut existing_data_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut deleted_data_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut added_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields(
Fields::from(self.partition_summary_fields()),
0,
))
.with_field(Arc::new(Field::new_struct(
"item",
self.partition_summary_fields(),
false,
)));

if let Some(snapshot) = self.table.metadata().current_snapshot() {
let manifest_list = snapshot
.load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
.await?;
for manifest in manifest_list.entries() {
content.append_value(manifest.content as i8);
path.append_value(manifest.manifest_path.clone());
length.append_value(manifest.manifest_length);
partition_spec_id.append_value(manifest.partition_spec_id);
added_snapshot_id.append_value(manifest.added_snapshot_id);
added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32);
existing_data_files_count
.append_value(manifest.existing_files_count.unwrap_or(0) as i32);
deleted_data_files_count
.append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
added_delete_files_count
.append_value(manifest.added_files_count.unwrap_or(0) as i32);
existing_delete_files_count
.append_value(manifest.existing_files_count.unwrap_or(0) as i32);
deleted_delete_files_count
.append_value(manifest.deleted_files_count.unwrap_or(0) as i32);

let partition_summaries_builder = partition_summaries.values();
for summary in &manifest.partitions {
partition_summaries_builder
.field_builder::<BooleanBuilder>(0)
.unwrap()
.append_value(summary.contains_null);
partition_summaries_builder
.field_builder::<BooleanBuilder>(1)
.unwrap()
.append_option(summary.contains_nan);
partition_summaries_builder
.field_builder::<StringBuilder>(2)
.unwrap()
.append_option(summary.lower_bound.as_ref().map(|v| v.to_string()));
partition_summaries_builder
.field_builder::<StringBuilder>(3)
.unwrap()
.append_option(summary.upper_bound.as_ref().map(|v| v.to_string()));
partition_summaries_builder.append(true);
}
partition_summaries.append(true);
}
}

Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
Arc::new(content.finish()),
Arc::new(path.finish()),
Arc::new(length.finish()),
Arc::new(partition_spec_id.finish()),
Arc::new(added_snapshot_id.finish()),
Arc::new(added_data_files_count.finish()),
Arc::new(existing_data_files_count.finish()),
Arc::new(deleted_data_files_count.finish()),
Arc::new(added_delete_files_count.finish()),
Arc::new(existing_delete_files_count.finish()),
Arc::new(deleted_delete_files_count.finish()),
Arc::new(partition_summaries.finish()),
])?)
}
}

#[cfg(test)]
mod tests {
use expect_test::{expect, Expect};
Expand Down Expand Up @@ -253,4 +380,106 @@ mod tests {
Some("committed_at"),
);
}

// Scans the manifests table of the test fixture and snapshot-checks both the
// Arrow schema and the column contents of the resulting record batch.
#[tokio::test]
async fn test_manifests_table() {
let mut fixture = TableTestFixture::new();
// Write the fixture's manifest files so the scan has a manifest list to load.
fixture.setup_manifest_files().await;

let record_batch = fixture
.table
.metadata_table()
.manifests()
.scan()
.await
.unwrap();

check_record_batch(
record_batch,
// Expected schema.
expect![[r#"
Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
// Expected column data; `path` and `length` show up as "(skipped)".
expect![[r#"
content: PrimitiveArray<Int8>
[
0,
],
path: (skipped),
length: (skipped),
partition_spec_id: PrimitiveArray<Int32>
[
0,
],
added_snapshot_id: PrimitiveArray<Int64>
[
3055729675574597004,
],
added_data_files_count: PrimitiveArray<Int32>
[
1,
],
existing_data_files_count: PrimitiveArray<Int32>
[
1,
],
deleted_data_files_count: PrimitiveArray<Int32>
[
1,
],
added_delete_files_count: PrimitiveArray<Int32>
[
1,
],
existing_delete_files_count: PrimitiveArray<Int32>
[
1,
],
deleted_delete_files_count: PrimitiveArray<Int32>
[
1,
],
partition_summaries: ListArray
[
StructArray
-- validity:
[
valid,
]
[
-- child 0: "contains_null" (Boolean)
BooleanArray
[
false,
]
-- child 1: "contains_nan" (Boolean)
BooleanArray
[
false,
]
-- child 2: "lower_bound" (Utf8)
StringArray
[
"100",
]
-- child 3: "upper_bound" (Utf8)
StringArray
[
"300",
]
],
]"#]],
// Columns whose values are not compared (they appear as "(skipped)" above).
&["path", "length"],
// NOTE(review): presumably the column used to order rows before comparing;
// confirm against `check_record_batch`, which is not visible here.
Some("path"),
);
}
}
2 changes: 1 addition & 1 deletion crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ pub mod tests {
.unwrap()
}

async fn setup_manifest_files(&mut self) {
pub async fn setup_manifest_files(&mut self) {
let current_snapshot = self.table.metadata().current_snapshot().unwrap();
let parent_snapshot = current_snapshot
.parent_snapshot(self.table.metadata())
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ impl ManifestFile {
}

/// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests
#[derive(Debug, PartialEq, Clone, Eq)]
#[derive(Debug, PartialEq, Clone, Copy, Eq)]
pub enum ManifestContentType {
/// The manifest content is data.
Data = 0,
Expand Down
Loading