diff --git a/rust/src/checkpoints.rs b/rust/src/checkpoints.rs index 78ca3207e5..e93fcb154d 100644 --- a/rust/src/checkpoints.rs +++ b/rust/src/checkpoints.rs @@ -4,19 +4,27 @@ use arrow::datatypes::Schema as ArrowSchema; use arrow::error::ArrowError; use arrow::json::reader::Decoder; use chrono::Datelike; +use chrono::Duration; +use chrono::Utc; +use chrono::MIN_DATETIME; +use futures::StreamExt; +use lazy_static::lazy_static; use log::*; use parquet::arrow::ArrowWriter; use parquet::errors::ParquetError; use parquet::file::writer::InMemoryWriteableCursor; +use regex::Regex; use serde_json::Value; use std::collections::HashMap; use std::convert::TryFrom; +use std::iter::Iterator; +use std::ops::Add; use super::action; use super::delta_arrow::delta_log_schema_for_table; use super::open_table_with_version; use super::schema::*; -use super::storage::{StorageBackend, StorageError}; +use super::storage::{ObjectMeta, StorageBackend, StorageError}; use super::table_state::DeltaTableState; use super::writer::time_utils; use super::{CheckPoint, DeltaTableError}; @@ -88,6 +96,18 @@ pub async fn create_checkpoint_from_table_uri( table_uri, ) .await?; + + if table.version >= 0 { + let deleted_log_num = cleanup_expired_logs( + table.version + 1, + table.storage.as_ref(), + table.get_state(), + table_uri, + ) + .await?; + debug!("Deleted {:?} log files.", deleted_log_num); + } + Ok(()) } @@ -100,6 +120,34 @@ pub async fn create_checkpoint_from_table(table: &DeltaTable) -> Result<(), Chec &table.table_uri, ) .await?; + + if table.version >= 0 { + let deleted_log_num = cleanup_expired_logs( + table.version + 1, + table.storage.as_ref(), + table.get_state(), + &table.table_uri, + ) + .await?; + debug!("Deleted {:?} log files.", deleted_log_num); + } + + Ok(()) +} + +/// Creates checkpoint at `table.version` for given `table`, without deleting expired log files. +/// Exposed for tests. +pub async fn create_checkpoint_from_table_without_cleaning_logs( + table: &DeltaTable, +) -> Result<(), CheckpointError> { + create_checkpoint( + table.version, + table.get_state(), + table.storage.as_ref(), + &table.table_uri, + ) + .await?; + Ok(()) } @@ -140,6 +188,150 @@ async fn create_checkpoint( Ok(()) } +async fn flush_delete_files bool>( + storage: &dyn StorageBackend, + maybe_delete_files: &mut Vec<(DeltaDataTypeVersion, ObjectMeta)>, + files_to_delete: &mut Vec<(DeltaDataTypeVersion, ObjectMeta)>, + should_delete_file: T, +) -> Result { + if !maybe_delete_files.is_empty() && should_delete_file(maybe_delete_files.last().unwrap()) { + files_to_delete.append(maybe_delete_files); + } + + let deleted = files_to_delete + .iter_mut() + .map(|file| async move { + match storage.delete_obj(&file.1.path).await { + Ok(_) => Ok(1), + Err(e) => Err(DeltaTableError::from(e)), + } + }) + .collect::>(); + + let mut deleted_num = 0; + for x in deleted { + match x.await { + Ok(_) => deleted_num += 1, + Err(e) => return Err(e), + } + } + + files_to_delete.clear(); + + Ok(deleted_num) +} + +/// Delete expires log files before given version from table. The table log retention is based on +/// the `logRetentionDuration` property of the Delta Table, 30 days by default. +async fn cleanup_expired_logs( + version: DeltaDataTypeVersion, + storage: &dyn StorageBackend, + state: &DeltaTableState, + table_uri: &str, +) -> Result { + lazy_static! { + static ref DELTA_LOG_REGEX: Regex = + Regex::new(r#"^*[/\\]_delta_log[/\\](\d{20})\.(json|checkpoint)*$"#).unwrap(); + } + + let log_retention_timestamp = Utc::now().timestamp_millis() - state.log_retention_millis(); + let mut deleted_log_num = 0; + + // Get file objects from table. + let log_uri = storage.join_path(table_uri, "_delta_log"); + let mut candidates: Vec<(DeltaDataTypeVersion, ObjectMeta)> = Vec::new(); + let mut stream = storage.list_objs(&log_uri).await?; + while let Some(obj_meta) = stream.next().await { + let obj_meta = obj_meta?; + + let ts = obj_meta.modified.timestamp(); + + if let Some(captures) = DELTA_LOG_REGEX.captures(&obj_meta.path) { + let log_ver_str = captures.get(1).unwrap().as_str(); + let log_ver: DeltaDataTypeVersion = log_ver_str.parse().unwrap(); + if log_ver < version && ts <= log_retention_timestamp { + candidates.push((log_ver, obj_meta)); + } + } + } + + // Sort files by file object version. + candidates.sort_by(|a, b| a.0.cmp(&b.0)); + + let mut last_file: (i64, ObjectMeta) = ( + 0, + ObjectMeta { + path: String::new(), + modified: MIN_DATETIME, + }, + ); + let file_needs_time_adjustment = + |current_file: &(i64, ObjectMeta), last_file: &(i64, ObjectMeta)| { + last_file.0 < current_file.0 + && last_file.1.modified.timestamp() >= current_file.1.modified.timestamp() + }; + + let should_delete_file = |file: &(i64, ObjectMeta)| { + file.1.modified.timestamp() <= log_retention_timestamp && file.0 < version + }; + + let mut maybe_delete_files: Vec<(DeltaDataTypeVersion, ObjectMeta)> = Vec::new(); + let mut files_to_delete: Vec<(DeltaDataTypeVersion, ObjectMeta)> = Vec::new(); + + // Init + if !candidates.is_empty() { + let removed = candidates.remove(0); + last_file = (removed.0, removed.1.clone()); + maybe_delete_files.push(removed); + } + + let mut current_file: (DeltaDataTypeVersion, ObjectMeta); + loop { + if candidates.is_empty() { + deleted_log_num += flush_delete_files( + storage, + &mut maybe_delete_files, + &mut files_to_delete, + should_delete_file, + ) + .await?; + + return Ok(deleted_log_num); + } + current_file = candidates.remove(0); + + if file_needs_time_adjustment(¤t_file, &last_file) { + let updated = ( + current_file.0, + ObjectMeta { + path: current_file.1.path.clone(), + modified: last_file.1.modified.add(Duration::seconds(1)), + }, + ); + maybe_delete_files.push(updated); + last_file = ( + maybe_delete_files.last().unwrap().0, + maybe_delete_files.last().unwrap().1.clone(), + ); + } else { + let deleted = flush_delete_files( + storage, + &mut maybe_delete_files, + &mut files_to_delete, + should_delete_file, + ) + .await?; + if deleted == 0 { + return Ok(deleted_log_num); + } + deleted_log_num += deleted; + + maybe_delete_files.push(current_file.clone()); + last_file = current_file; + } + } +} + fn parquet_bytes_from_state(state: &DeltaTableState) -> Result, CheckpointError> { let current_metadata = state .current_metadata() diff --git a/rust/src/delta_config.rs b/rust/src/delta_config.rs index 7a0103949c..2682635c7a 100644 --- a/rust/src/delta_config.rs +++ b/rust/src/delta_config.rs @@ -18,6 +18,11 @@ lazy_static! { /// still needs to read old files. pub static ref TOMBSTONE_RETENTION: DeltaConfig = DeltaConfig::new("deletedFileRetentionDuration", "interval 1 week"); + + /// The shortest duration we have to keep delta files around before deleting them. We can only + /// delete delta files that are before a compaction. We may keep files beyond this duration until + /// the next calendar day. + pub static ref LOG_RETENTION: DeltaConfig = DeltaConfig::new("logRetentionDuration", "interval 30 day"); } /// Delta configuration error diff --git a/rust/src/storage/file/rename.rs b/rust/src/storage/file/rename.rs index 825fc35b63..64f4c76f65 100644 --- a/rust/src/storage/file/rename.rs +++ b/rust/src/storage/file/rename.rs @@ -30,6 +30,11 @@ mod imp { return Err(StorageError::AlreadyExists(to_path)); } std::fs::rename(&from_path, &to_path).map_err(|e| { + let to_exists = std::fs::metadata(&to_path).is_ok(); + if to_exists { + return StorageError::AlreadyExists(to_path); + } + StorageError::other_std_io_err(format!( "failed to rename {} to {}: {}", from_path, to_path, e diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index d0a07e6859..6f4be6461e 100644 --- a/rust/src/storage/mod.rs +++ b/rust/src/storage/mod.rs @@ -481,6 +481,15 @@ pub struct ObjectMeta { pub modified: DateTime, } +impl Clone for ObjectMeta { + fn clone(&self) -> Self { + Self { + path: self.path.clone(), + modified: self.modified, + } + } +} + /// Abstractions for underlying blob storages hosting the Delta table. To add support for new cloud /// or local storage systems, simply implement this trait. #[async_trait::async_trait] diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs index 1f1fadcd94..84ab3ab799 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table_state.rs @@ -31,6 +31,7 @@ pub struct DeltaTableState { min_writer_version: i32, current_metadata: Option, tombstone_retention_millis: DeltaDataTypeLong, + log_retention_millis: DeltaDataTypeLong, } impl DeltaTableState { @@ -92,6 +93,10 @@ impl DeltaTableState { self.tombstone_retention_millis } + pub fn log_retention_millis(&self) -> DeltaDataTypeLong { + self.log_retention_millis + } + /// Full list of tombstones (remove actions) representing files removed from table state). pub fn all_tombstones(&self) -> &HashSet { &self.tombstones @@ -204,6 +209,9 @@ impl DeltaTableState { self.tombstone_retention_millis = delta_config::TOMBSTONE_RETENTION .get_interval_from_metadata(&md)? .as_millis() as i64; + self.log_retention_millis = delta_config::LOG_RETENTION + .get_interval_from_metadata(&md)? + .as_millis() as i64; self.current_metadata = Some(md); } action::Action::txn(v) => { @@ -242,6 +250,7 @@ mod tests { min_writer_version: 2, app_transaction_version, tombstone_retention_millis: 0, + log_retention_millis: 0, }; let txn_action = action::Action::txn(action::Txn { diff --git a/rust/tests/checkpoint_writer_test.rs b/rust/tests/checkpoint_writer_test.rs index cec1019cfa..6c8775a6ea 100644 --- a/rust/tests/checkpoint_writer_test.rs +++ b/rust/tests/checkpoint_writer_test.rs @@ -33,7 +33,7 @@ mod simple_checkpoint { .unwrap(); // Write a checkpoint - checkpoints::create_checkpoint_from_table(&table) + checkpoints::create_checkpoint_from_table_without_cleaning_logs(&table) .await .unwrap(); @@ -46,7 +46,7 @@ mod simple_checkpoint { assert_eq!(5, version); table.load_version(10).await.unwrap(); - checkpoints::create_checkpoint_from_table(&table) + checkpoints::create_checkpoint_from_table_without_cleaning_logs(&table) .await .unwrap(); @@ -99,6 +99,46 @@ mod simple_checkpoint { } } +mod delete_expired_delta_log_in_checkpoint { + use super::*; + + #[tokio::test] + async fn test_delete_expired_logs() { + let mut table = fs_common::create_table( + "./tests/data/checkpoints_with_expired_logs/expired", + Some(hashmap! { + delta_config::LOG_RETENTION.key.clone() => Some("interval 1 second".to_string()) + }), + ) + .await; + + let a1 = fs_common::add(3 * 60 * 1000); // 3 mins ago, + let a2 = fs_common::add(2 * 60 * 1000); // 2 mins ago, + assert_eq!(1, fs_common::commit_add(&mut table, &a1).await); + assert_eq!(2, fs_common::commit_add(&mut table, &a2).await); + + table.load_version(0).await.expect("Cannot load version 0"); + table.load_version(1).await.expect("Cannot load version 1"); + + checkpoints::create_checkpoint_from_table(&table) + .await + .unwrap(); + table.update().await.unwrap(); // make table to read the checkpoint + assert_eq!(table.get_files(), vec![a1.path.as_str(), a2.path.as_str()]); + + table + .load_version(0) + .await + .expect_err("Should not load version 0"); + table + .load_version(1) + .await + .expect_err("Should not load version 1"); + + table.load_version(2).await.expect("Cannot load version 2"); + } +} + mod checkpoints_with_tombstones { use super::*;