From e5dd8e2167b94e6856aa531d878584397d5bea69 Mon Sep 17 00:00:00 2001 From: Cole Mackenzie Date: Tue, 20 Jun 2023 14:39:07 -0700 Subject: [PATCH] fix: add `sizeInBytes` to _last_checkpoint and change `size` to # of actions (#1477) # Description The `size` field should be the number of actions stored in the checkpoint while `sizeInBytes` is used for the total size in bytes. Added `CheckPointBuilder` to make the creation of these easier to use. # Related Issue(s) - Closes #1468 # Documentation https://github.com/delta-io/delta/blob/master/PROTOCOL.md#last-checkpoint-file --- rust/src/action/checkpoints.rs | 25 +++++++------ rust/src/action/mod.rs | 12 ++----- rust/src/delta.rs | 65 ++++++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 20 deletions(-) diff --git a/rust/src/action/checkpoints.rs b/rust/src/action/checkpoints.rs index 499e49e6c7..18054fa1ba 100644 --- a/rust/src/action/checkpoints.rs +++ b/rust/src/action/checkpoints.rs @@ -1,8 +1,14 @@ //! Implementation for writing delta checkpoints. +use std::collections::HashMap; +use std::convert::TryFrom; +use std::iter::Iterator; +use std::ops::Add; + use arrow::datatypes::Schema as ArrowSchema; use arrow::error::ArrowError; use arrow::json::ReaderBuilder; + use chrono::{DateTime, Datelike, Duration, Utc}; use futures::StreamExt; use lazy_static::lazy_static; @@ -12,17 +18,13 @@ use parquet::arrow::ArrowWriter; use parquet::errors::ParquetError; use regex::Regex; use serde_json::Value; -use std::collections::HashMap; -use std::convert::TryFrom; -use std::iter::Iterator; -use std::ops::Add; use super::{Action, Add as AddAction, MetaData, Protocol, ProtocolError, Txn}; use crate::delta_arrow::delta_log_schema_for_table; -use crate::schema::*; use crate::storage::DeltaObjectStore; use crate::table_state::DeltaTableState; use crate::{open_table_with_version, time_utils, CheckPoint, DeltaTable}; +use crate::{schema::*, CheckPointBuilder}; type SchemaPath = Vec; @@ -118,9 +120,7 @@ async fn create_checkpoint_for( let last_checkpoint_path = storage.log_path().child("_last_checkpoint"); debug!("Writing parquet bytes to checkpoint buffer."); - let parquet_bytes = parquet_bytes_from_state(state)?; - let size = parquet_bytes.len() as i64; - let checkpoint = CheckPoint::new(version, size, None); + let (checkpoint, parquet_bytes) = parquet_bytes_from_state(state)?; let file_name = format!("{version:020}.checkpoint.parquet"); let checkpoint_path = storage.log_path().child(file_name); @@ -281,7 +281,9 @@ pub async fn cleanup_expired_logs_for( } } -fn parquet_bytes_from_state(state: &DeltaTableState) -> Result { +fn parquet_bytes_from_state( + state: &DeltaTableState, +) -> Result<(CheckPoint, bytes::Bytes), ProtocolError> { let current_metadata = state.current_metadata().ok_or(ProtocolError::NoMetaData)?; let partition_col_data_types = current_metadata.get_partition_col_data_types(); @@ -375,7 +377,10 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result cp.unwrap().version { - cp = Some(CheckPoint { - version: curr_ver, - size: 0, - parts: None, - }); + cp = Some(CheckPoint::new(curr_ver, 0, None)); } continue; } @@ -810,11 +806,7 @@ pub(crate) async fn find_latest_check_point_for_version( if cp.is_none() || curr_ver > cp.unwrap().version { let parts_str = captures.get(2).unwrap().as_str(); let parts = parts_str.parse().unwrap(); - cp = Some(CheckPoint { - version: curr_ver, - size: 0, - parts: Some(parts), - }); + cp = Some(CheckPoint::new(curr_ver, 0, Some(parts))); } continue; } diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 9a068d839b..dbb9f78d35 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -42,8 +42,71 @@ pub use crate::builder::{DeltaTableBuilder, DeltaTableConfig, DeltaVersion}; pub struct CheckPoint { /// Delta table version pub(crate) version: i64, // 20 digits decimals + /// The number of actions that are stored in the checkpoint. pub(crate) size: i64, + /// The number of fragments if the last checkpoint was written in multiple parts. This field is optional. pub(crate) parts: Option, // 10 digits decimals + /// The number of bytes of the checkpoint. This field is optional. + pub(crate) size_in_bytes: Option, + /// The number of AddFile actions in the checkpoint. This field is optional. + pub(crate) num_of_add_files: Option, +} + +/// Builder for CheckPoint +pub struct CheckPointBuilder { + /// Delta table version + pub(crate) version: i64, // 20 digits decimals + /// The number of actions that are stored in the checkpoint. + pub(crate) size: i64, + /// The number of fragments if the last checkpoint was written in multiple parts. This field is optional. + pub(crate) parts: Option, // 10 digits decimals + /// The number of bytes of the checkpoint. This field is optional. + pub(crate) size_in_bytes: Option, + /// The number of AddFile actions in the checkpoint. This field is optional. + pub(crate) num_of_add_files: Option, +} + +impl CheckPointBuilder { + /// Creates a new [`CheckPointBuilder`] instance with the provided `version` and `size`. + /// Size is the total number of actions in the checkpoint. See size_in_bytes for total size in bytes. + pub fn new(version: i64, size: i64) -> Self { + CheckPointBuilder { + version, + size, + parts: None, + size_in_bytes: None, + num_of_add_files: None, + } + } + + /// The number of fragments if the last checkpoint was written in multiple parts. This field is optional. + pub fn with_parts(mut self, parts: u32) -> Self { + self.parts = Some(parts); + self + } + + /// The number of bytes of the checkpoint. This field is optional. + pub fn with_size_in_bytes(mut self, size_in_bytes: i64) -> Self { + self.size_in_bytes = Some(size_in_bytes); + self + } + + /// The number of AddFile actions in the checkpoint. This field is optional. + pub fn with_num_of_add_files(mut self, num_of_add_files: i64) -> Self { + self.num_of_add_files = Some(num_of_add_files); + self + } + + /// Build the final [`CheckPoint`] struct. + pub fn build(self) -> CheckPoint { + CheckPoint { + version: self.version, + size: self.size, + parts: self.parts, + size_in_bytes: self.size_in_bytes, + num_of_add_files: self.num_of_add_files, + } + } } impl CheckPoint { @@ -53,6 +116,8 @@ impl CheckPoint { version, size, parts, + size_in_bytes: None, + num_of_add_files: None, } } }