Skip to content

Commit

Permalink
fix: reading cdf from a checkpointed table
Browse files Browse the repository at this point in the history
Signed-off-by: Stephen Carman <[email protected]>
  • Loading branch information
hntd187 committed Jan 9, 2025
1 parent 0b90a11 commit a3fa752
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 16 deletions.
3 changes: 3 additions & 0 deletions crates/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ pub enum DeltaTableError {

#[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")]
ChangeDataTimestampGreaterThanCommit { ending_timestamp: DateTime<Utc> },

#[error("No starting version or timestamp provided for CDC")]
NoStartingVersionOrTimestamp,
}

impl From<object_store::path::Error> for DeltaTableError {
Expand Down
93 changes: 82 additions & 11 deletions crates/core/src/operations/load_cdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct CdfLoadBuilder {
/// Columns to project
columns: Option<Vec<String>>,
/// Version to read from
starting_version: i64,
starting_version: Option<i64>,
/// Version to stop reading at
ending_version: Option<i64>,
/// Starting timestamp of commits to accept
Expand All @@ -56,7 +56,7 @@ impl CdfLoadBuilder {
snapshot,
log_store,
columns: None,
starting_version: 0,
starting_version: None,
ending_version: None,
starting_timestamp: None,
ending_timestamp: None,
Expand All @@ -67,7 +67,7 @@ impl CdfLoadBuilder {

/// Version to start at (version 0 if not provided)
pub fn with_starting_version(mut self, starting_version: i64) -> Self {
self.starting_version = starting_version;
self.starting_version = Some(starting_version);
self
}

Expand Down Expand Up @@ -107,6 +107,25 @@ impl CdfLoadBuilder {
self
}

async fn calculate_earliest_version(&self) -> DeltaResult<i64> {
let ts = self.starting_timestamp.unwrap_or(DateTime::UNIX_EPOCH);
for v in 0..self.snapshot.version() {
if let Ok(Some(bytes)) = self.log_store.read_commit_entry(v).await {
if let Ok(actions) = get_actions(v, bytes).await {
if actions.iter().any(|action| match action {
Action::CommitInfo(CommitInfo {
timestamp: Some(t), ..
}) if ts.timestamp_millis() < *t => true,
_ => false,
}) {
return Ok(v);
}
}
}
}
Ok(0)
}

/// This is a rust version of https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala#L418
/// Which iterates through versions of the delta table collects the relevant actions / commit info and returns those
/// groupings for later use. The scala implementation has a lot more edge case handling and read schema checking (and just error checking in general)
Expand All @@ -118,8 +137,16 @@ impl CdfLoadBuilder {
Vec<CdcDataSpec<Add>>,
Vec<CdcDataSpec<Remove>>,
)> {
let start = self.starting_version;
let latest_version = self.log_store.get_latest_version(0).await?; // Start from 0 since if start > latest commit, the returned commit is not a valid commit
if self.starting_version.is_none() && self.starting_timestamp.is_none() {
return Err(DeltaTableError::NoStartingVersionOrTimestamp);
}
let start = if let Some(s) = self.starting_version {
s
} else {
self.calculate_earliest_version().await?
};
        let latest_version = self.log_store.get_latest_version(start).await?; // Resolve the latest version relative to `start`; if start exceeds the latest commit, the range checks below surface the error

let mut end = self.ending_version.unwrap_or(latest_version);

let mut change_files: Vec<CdcDataSpec<AddCDCFile>> = vec![];
Expand All @@ -130,19 +157,18 @@ impl CdfLoadBuilder {
end = latest_version;
}

if start > latest_version {
if end < start {
return if self.allow_out_of_range {
Ok((change_files, add_files, remove_files))
} else {
Err(DeltaTableError::InvalidVersion(start))
Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end })
};
}

if end < start {
if start >= latest_version {
return if self.allow_out_of_range {
Ok((change_files, add_files, remove_files))
} else {
Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end })
Err(DeltaTableError::InvalidVersion(start))
};
}

Expand All @@ -151,7 +177,7 @@ impl CdfLoadBuilder {
.ending_timestamp
.unwrap_or(DateTime::from(SystemTime::now()));

// Check that starting_timestmp is within boundaries of the latest version
// Check that starting_timestamp is within boundaries of the latest version
let latest_snapshot_bytes = self
.log_store
.read_commit_entry(latest_version)
Expand Down Expand Up @@ -296,6 +322,7 @@ impl CdfLoadBuilder {
Some(ScalarValue::Utf8(Some(String::from("insert"))))
}

#[inline]
fn get_remove_action_type() -> Option<ScalarValue> {
Some(ScalarValue::Utf8(Some(String::from("delete"))))
}
Expand Down Expand Up @@ -520,6 +547,7 @@ pub(crate) mod tests {
.await?
.load_cdf()
.with_session_ctx(ctx.clone())
.with_starting_version(0)
.with_ending_timestamp(starting_timestamp.and_utc())
.build()
.await?;
Expand Down Expand Up @@ -732,6 +760,49 @@ pub(crate) mod tests {
Ok(())
}

#[tokio::test]
async fn test_load_vacuumed_table() -> TestResult {
let ending_timestamp = NaiveDateTime::from_str("2024-01-06T15:44:59.570")?;
let ctx = SessionContext::new();
let table = DeltaOps::try_from_uri("../test/tests/data/checkpoint-cdf-table")
.await?
.load_cdf()
.with_session_ctx(ctx.clone())
.with_starting_timestamp(ending_timestamp.and_utc())
.build()
.await?;

let batches = collect_batches(
table.properties().output_partitioning().partition_count(),
table,
ctx,
)
.await?;

assert_batches_sorted_eq! {
["+----+--------+------------------+-----------------+-------------------------+------------+",
"| id | name | _change_type | _commit_version | _commit_timestamp | birthday |",
"+----+--------+------------------+-----------------+-------------------------+------------+",
"| 11 | Ossama | update_preimage | 5 | 2025-01-06T16:38:19.623 | 2024-12-30 |",
"| 12 | Ossama | update_postimage | 5 | 2025-01-06T16:38:19.623 | 2024-12-30 |",
"| 7 | Dennis | delete | 3 | 2024-01-06T16:44:59.570 | 2023-12-29 |",
"| 14 | Zach | update_preimage | 5 | 2025-01-06T16:38:19.623 | 2023-12-25 |",
"| 15 | Zach | update_postimage | 5 | 2025-01-06T16:38:19.623 | 2023-12-25 |",
"| 13 | Ryan | update_preimage | 5 | 2025-01-06T16:38:19.623 | 2023-12-22 |",
"| 14 | Ryan | update_postimage | 5 | 2025-01-06T16:38:19.623 | 2023-12-22 |",
"| 12 | Nick | update_preimage | 5 | 2025-01-06T16:38:19.623 | 2023-12-29 |",
"| 13 | Nick | update_postimage | 5 | 2025-01-06T16:38:19.623 | 2023-12-29 |",
"| 11 | Ossama | insert | 4 | 2025-01-06T16:33:18.167 | 2024-12-30 |",
"| 12 | Nick | insert | 4 | 2025-01-06T16:33:18.167 | 2023-12-29 |",
"| 13 | Ryan | insert | 4 | 2025-01-06T16:33:18.167 | 2023-12-22 |",
"| 14 | Zach | insert | 4 | 2025-01-06T16:33:18.167 | 2023-12-25 |",
"+----+--------+------------------+-----------------+-------------------------+------------+"],
&batches
}

Ok(())
}

#[tokio::test]
async fn test_use_remove_actions_for_deletions() -> TestResult {
let delta_schema = TestSchemas::simple();
Expand Down
10 changes: 6 additions & 4 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,22 +779,24 @@ impl RawDeltaTable {
Ok(())
}

#[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, allow_out_of_range = false))]
#[pyo3(signature = (starting_version = None, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, allow_out_of_range = false))]
#[allow(clippy::too_many_arguments)]
pub fn load_cdf(
&self,
py: Python,
starting_version: i64,
starting_version: Option<i64>,
ending_version: Option<i64>,
starting_timestamp: Option<String>,
ending_timestamp: Option<String>,
columns: Option<Vec<String>>,
allow_out_of_range: bool,
) -> PyResult<PyArrowType<ArrowArrayStreamReader>> {
let ctx = SessionContext::new();
let mut cdf_read = CdfLoadBuilder::new(self.log_store()?, self.cloned_state()?)
.with_starting_version(starting_version);
let mut cdf_read = CdfLoadBuilder::new(self.log_store()?, self.cloned_state()?);

if let Some(sv) = starting_version {
cdf_read = cdf_read.with_starting_version(sv);
}
if let Some(ev) = ending_version {
cdf_read = cdf_read.with_ending_version(ev);
}
Expand Down
2 changes: 1 addition & 1 deletion python/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use pyo3::prelude::*;

use crate::{error::PythonError, utils::rt, RawDeltaTable};

/// PyQueryBuilder supports the _experimental_ `QueryBuilder` Pythoh interface which allows users
/// PyQueryBuilder supports the _experimental_ `QueryBuilder` Python interface which allows users
/// to take advantage of the [Apache DataFusion](https://datafusion.apache.org) engine already
/// present in the Python package.
#[pyclass(module = "deltalake._internal")]
Expand Down

0 comments on commit a3fa752

Please sign in to comment.