diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index 4a2df18ee8..63e724e1e6 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -18,7 +18,9 @@ use tracing::{debug, error}; use super::{time_utils, ProtocolError}; use crate::kernel::arrow::delta_log_schema_for_table; -use crate::kernel::{Action, Add as AddAction, DataType, PrimitiveType, Remove, StructField, Txn}; +use crate::kernel::{ + Action, Add as AddAction, DataType, PrimitiveType, Protocol, Remove, StructField, Txn, +}; use crate::logstore::LogStore; use crate::table::state::DeltaTableState; use crate::table::{get_partition_col_data_types, CheckPoint, CheckPointBuilder}; @@ -274,39 +276,52 @@ fn parquet_bytes_from_state( } let files = state.file_actions().unwrap(); // protocol - let jsons = std::iter::once(Action::Protocol(state.protocol().clone())) - // metaData - .chain(std::iter::once(Action::Metadata(current_metadata.clone()))) - // txns - .chain( - state - .app_transaction_version() - .iter() - .map(|(app_id, version)| { - Action::Txn(Txn { - app_id: app_id.clone(), - version: *version, - last_updated: None, - }) - }), - ) - // removes - .chain(tombstones.iter().map(|r| { - let mut r = (*r).clone(); - - // As a "new writer", we should always set `extendedFileMetadata` when writing, and include/ignore the other three fields accordingly. - // https://github.com/delta-io/delta/blob/fb0452c2fb142310211c6d3604eefb767bb4a134/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L311-L314 - if r.extended_file_metadata.is_none() { - r.extended_file_metadata = Some(false); - } + let jsons = std::iter::once(Action::Protocol(Protocol { + min_reader_version: state.protocol().min_reader_version, + min_writer_version: state.protocol().min_writer_version, + writer_features: if state.protocol().min_writer_version >= 7 { + Some(state.protocol().writer_features.clone().unwrap_or_default()) + } else { + None + }, + reader_features: if state.protocol().min_reader_version >= 3 { + Some(state.protocol().reader_features.clone().unwrap_or_default()) + } else { + None + }, + })) + // metaData + .chain(std::iter::once(Action::Metadata(current_metadata.clone()))) + // txns + .chain( + state + .app_transaction_version() + .iter() + .map(|(app_id, version)| { + Action::Txn(Txn { + app_id: app_id.clone(), + version: *version, + last_updated: None, + }) + }), + ) + // removes + .chain(tombstones.iter().map(|r| { + let mut r = (*r).clone(); + + // As a "new writer", we should always set `extendedFileMetadata` when writing, and include/ignore the other three fields accordingly. + // https://github.com/delta-io/delta/blob/fb0452c2fb142310211c6d3604eefb767bb4a134/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L311-L314 + if r.extended_file_metadata.is_none() { + r.extended_file_metadata = Some(false); + } - Action::Remove(r) - })) - .map(|a| serde_json::to_value(a).map_err(ProtocolError::from)) - // adds - .chain(files.iter().map(|f| { - checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions) - })); + Action::Remove(r) + })) + .map(|a| serde_json::to_value(a).map_err(ProtocolError::from)) + // adds + .chain(files.iter().map(|f| { + checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions) + })); // Create the arrow schema that represents the Checkpoint parquet file. let arrow_schema = delta_log_schema_for_table( diff --git a/python/Cargo.toml b/python/Cargo.toml index 4a9b141e48..7c91cdc890 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-python" -version = "0.16.1" +version = "0.16.2" authors = ["Qingping Hou ", "Will Jones "] homepage = "https://github.com/delta-io/delta-rs" license = "Apache-2.0" diff --git a/python/tests/pyspark_integration/test_writer_readable.py b/python/tests/pyspark_integration/test_writer_readable.py index 3ade57c6e9..6d1a528ee6 100644 --- a/python/tests/pyspark_integration/test_writer_readable.py +++ b/python/tests/pyspark_integration/test_writer_readable.py @@ -96,3 +96,37 @@ def test_issue_1591_roundtrip_special_characters(tmp_path: pathlib.Path): loaded = DeltaTable(spark_path).to_pandas() assert loaded.shape == data.shape + + +@pytest.mark.pyspark +@pytest.mark.integration +def test_read_checkpointed_table(tmp_path: pathlib.Path): + data = pa.table( + { + "int": pa.array([1]), + } + ) + write_deltalake(tmp_path, data) + + dt = DeltaTable(tmp_path) + dt.create_checkpoint() + + assert_spark_read_equal(data, str(tmp_path), ["int"]) + + +@pytest.mark.pyspark +@pytest.mark.integration +def test_read_checkpointed_features_table(tmp_path: pathlib.Path): + from datetime import datetime + + data = pa.table( + { + "timestamp": pa.array([datetime(2010, 1, 1)]), + } + ) + write_deltalake(tmp_path, data) + + dt = DeltaTable(tmp_path) + dt.create_checkpoint() + + assert_spark_read_equal(data, str(tmp_path), ["timestamp"]) diff --git a/python/tests/test_checkpoint.py b/python/tests/test_checkpoint.py index 1cf4b45dd6..65f6a4405c 100644 --- a/python/tests/test_checkpoint.py +++ b/python/tests/test_checkpoint.py @@ -3,6 +3,7 @@ import pathlib import pyarrow as pa +import pyarrow.parquet as pq from deltalake import DeltaTable, write_deltalake @@ -105,3 +106,31 @@ def test_features_maintained_after_checkpoint(tmp_path: pathlib.Path): assert protocol_after_checkpoint.reader_features == ["timestampNtz"] assert current_protocol == protocol_after_checkpoint + + +def test_features_null_on_below_v3_v7(tmp_path: pathlib.Path): + data = pa.table( + { + "int": pa.array([1]), + } + ) + write_deltalake(tmp_path, data) + + dt = DeltaTable(tmp_path) + current_protocol = dt.protocol() + + dt.create_checkpoint() + + dt = DeltaTable(tmp_path) + protocol_after_checkpoint = dt.protocol() + + assert protocol_after_checkpoint.reader_features is None + assert protocol_after_checkpoint.writer_features is None + assert current_protocol == protocol_after_checkpoint + + checkpoint = pq.read_table( + os.path.join(tmp_path, "_delta_log/00000000000000000000.checkpoint.parquet") + ) + + assert checkpoint["protocol"][0]["writerFeatures"].as_py() is None + assert checkpoint["protocol"][0]["readerFeatures"].as_py() is None