Skip to content

Commit

Permalink
Clean up expired delta table commit logs after checkpoint (#484)
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya authored Nov 16, 2021
1 parent 7e5aaa7 commit 8a0475c
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 3 deletions.
194 changes: 193 additions & 1 deletion rust/src/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,27 @@ use arrow::datatypes::Schema as ArrowSchema;
use arrow::error::ArrowError;
use arrow::json::reader::Decoder;
use chrono::Datelike;
use chrono::Duration;
use chrono::Utc;
use chrono::MIN_DATETIME;
use futures::StreamExt;
use lazy_static::lazy_static;
use log::*;
use parquet::arrow::ArrowWriter;
use parquet::errors::ParquetError;
use parquet::file::writer::InMemoryWriteableCursor;
use regex::Regex;
use serde_json::Value;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::iter::Iterator;
use std::ops::Add;

use super::action;
use super::delta_arrow::delta_log_schema_for_table;
use super::open_table_with_version;
use super::schema::*;
use super::storage::{StorageBackend, StorageError};
use super::storage::{ObjectMeta, StorageBackend, StorageError};
use super::table_state::DeltaTableState;
use super::writer::time_utils;
use super::{CheckPoint, DeltaTableError};
Expand Down Expand Up @@ -88,6 +96,18 @@ pub async fn create_checkpoint_from_table_uri(
table_uri,
)
.await?;

if table.version >= 0 {
let deleted_log_num = cleanup_expired_logs(
table.version + 1,
table.storage.as_ref(),
table.get_state(),
table_uri,
)
.await?;
debug!("Deleted {:?} log files.", deleted_log_num);
}

Ok(())
}

Expand All @@ -100,6 +120,34 @@ pub async fn create_checkpoint_from_table(table: &DeltaTable) -> Result<(), Chec
&table.table_uri,
)
.await?;

if table.version >= 0 {
let deleted_log_num = cleanup_expired_logs(
table.version + 1,
table.storage.as_ref(),
table.get_state(),
&table.table_uri,
)
.await?;
debug!("Deleted {:?} log files.", deleted_log_num);
}

Ok(())
}

/// Writes a checkpoint for `table` at its currently loaded version (`table.version`) and
/// leaves every existing log file in place: no expired-log cleanup is attempted.
/// Exposed for tests.
///
/// # Errors
///
/// Propagates any failure reported by `create_checkpoint`, converted into a
/// `CheckpointError`.
pub async fn create_checkpoint_from_table_without_cleaning_logs(
    table: &DeltaTable,
) -> Result<(), CheckpointError> {
    let checkpoint_version = table.version;
    let snapshot = table.get_state();
    let backend = table.storage.as_ref();

    create_checkpoint(checkpoint_version, snapshot, backend, &table.table_uri).await?;

    Ok(())
}

Expand Down Expand Up @@ -140,6 +188,150 @@ async fn create_checkpoint(
Ok(())
}

/// Deletes every log file queued in `files_to_delete` and reports how many were removed.
///
/// Before deleting, the pending group in `maybe_delete_files` is moved into
/// `files_to_delete` when `should_delete_file` accepts the group's last entry; otherwise
/// the pending group is left untouched.
///
/// Objects are deleted one at a time, in queue order. On success `files_to_delete` is
/// emptied and the number of deleted objects is returned.
///
/// # Errors
///
/// The first storage failure is converted into a `DeltaTableError` and returned
/// immediately; in that case `files_to_delete` is not cleared.
async fn flush_delete_files<T>(
    storage: &dyn StorageBackend,
    maybe_delete_files: &mut Vec<(DeltaDataTypeVersion, ObjectMeta)>,
    files_to_delete: &mut Vec<(DeltaDataTypeVersion, ObjectMeta)>,
    should_delete_file: T,
) -> Result<i32, DeltaTableError>
where
    T: Fn(&(DeltaDataTypeVersion, ObjectMeta)) -> bool,
{
    // The whole pending group is judged by its most recently pushed member.
    let promote_pending = match maybe_delete_files.last() {
        Some(newest) => should_delete_file(newest),
        None => false,
    };
    if promote_pending {
        files_to_delete.append(maybe_delete_files);
    }

    let mut removed_count = 0;
    for (_, meta) in files_to_delete.iter() {
        storage.delete_obj(&meta.path).await?;
        removed_count += 1;
    }

    files_to_delete.clear();

    Ok(removed_count)
}

/// Deletes expired log files older than `version` from the table's `_delta_log` directory.
///
/// A log file is expired once its last-modified time is at or before
/// `now - state.log_retention_millis()`. The retention is driven by the table's
/// `logRetentionDuration` property, 30 days by default.
///
/// Candidates are visited in ascending version order. When a file's modification time
/// (at second granularity) is not later than that of the preceding version, it is given
/// an effective time of "previous + 1 second" and grouped with the preceding file(s).
/// A group is deleted only if its last member is still expired after that adjustment,
/// and processing stops at the first group that is not deleted.
///
/// Returns the number of log files that were deleted.
///
/// # Errors
///
/// Returns a `DeltaTableError` when listing the log directory or deleting an object fails.
async fn cleanup_expired_logs(
    version: DeltaDataTypeVersion,
    storage: &dyn StorageBackend,
    state: &DeltaTableState,
    table_uri: &str,
) -> Result<i32, DeltaTableError> {
    lazy_static! {
        // NOTE(review): the pattern requires the path to end right after `json` or
        // `checkpoint`, so `<version>.checkpoint.parquet` objects do not match and are
        // never considered for deletion here -- confirm this is intended.
        static ref DELTA_LOG_REGEX: Regex =
            Regex::new(r#"^*[/\\]_delta_log[/\\](\d{20})\.(json|checkpoint)*$"#).unwrap();
    }

    // Expiry cutoff in Unix epoch *milliseconds*; every comparison against it below must
    // use `timestamp_millis()` so both sides share the same unit.
    let log_retention_timestamp = Utc::now().timestamp_millis() - state.log_retention_millis();
    let mut deleted_log_num = 0;

    // Get file objects from table.
    let log_uri = storage.join_path(table_uri, "_delta_log");
    let mut candidates: Vec<(DeltaDataTypeVersion, ObjectMeta)> = Vec::new();
    let mut stream = storage.list_objs(&log_uri).await?;
    while let Some(obj_meta) = stream.next().await {
        let obj_meta = obj_meta?;

        let ts = obj_meta.modified.timestamp_millis();

        // Twenty digits can exceed the version type's range; such names are skipped
        // instead of panicking.
        let log_ver = DELTA_LOG_REGEX.captures(&obj_meta.path).and_then(|captures| {
            captures
                .get(1)?
                .as_str()
                .parse::<DeltaDataTypeVersion>()
                .ok()
        });

        if let Some(log_ver) = log_ver {
            if log_ver < version && ts <= log_retention_timestamp {
                candidates.push((log_ver, obj_meta));
            }
        }
    }

    // Sort files by file object version, then consume them front to back.
    candidates.sort_by(|a, b| a.0.cmp(&b.0));
    let mut candidates = candidates.into_iter();

    // Placeholder until the first candidate (if any) replaces it.
    let mut last_file: (i64, ObjectMeta) = (
        0,
        ObjectMeta {
            path: String::new(),
            modified: MIN_DATETIME,
        },
    );
    // A newer version whose mtime (in seconds) is not later than the previous file's
    // needs its effective time bumped.
    let file_needs_time_adjustment =
        |current_file: &(i64, ObjectMeta), last_file: &(i64, ObjectMeta)| {
            last_file.0 < current_file.0
                && last_file.1.modified.timestamp() >= current_file.1.modified.timestamp()
        };

    let should_delete_file = |file: &(i64, ObjectMeta)| {
        file.1.modified.timestamp_millis() <= log_retention_timestamp && file.0 < version
    };

    let mut maybe_delete_files: Vec<(DeltaDataTypeVersion, ObjectMeta)> = Vec::new();
    let mut files_to_delete: Vec<(DeltaDataTypeVersion, ObjectMeta)> = Vec::new();

    // Init: the oldest candidate opens the first pending group.
    if let Some(first) = candidates.next() {
        last_file = first.clone();
        maybe_delete_files.push(first);
    }

    loop {
        let current_file = match candidates.next() {
            Some(file) => file,
            None => {
                // Nothing left to inspect: decide on the final pending group and finish.
                deleted_log_num += flush_delete_files(
                    storage,
                    &mut maybe_delete_files,
                    &mut files_to_delete,
                    should_delete_file,
                )
                .await?;

                return Ok(deleted_log_num);
            }
        };

        if file_needs_time_adjustment(&current_file, &last_file) {
            // Keep the file in the current group, with an effective time one second
            // after the previous entry.
            let adjusted = (
                current_file.0,
                ObjectMeta {
                    path: current_file.1.path,
                    modified: last_file.1.modified.add(Duration::seconds(1)),
                },
            );
            last_file = adjusted.clone();
            maybe_delete_files.push(adjusted);
        } else {
            // The current file starts a new group; settle the previous one first.
            let deleted = flush_delete_files(
                storage,
                &mut maybe_delete_files,
                &mut files_to_delete,
                should_delete_file,
            )
            .await?;
            if deleted == 0 {
                // The previous group was not deletable, so stop here.
                return Ok(deleted_log_num);
            }
            deleted_log_num += deleted;

            maybe_delete_files.push(current_file.clone());
            last_file = current_file;
        }
    }
}

fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<Vec<u8>, CheckpointError> {
let current_metadata = state
.current_metadata()
Expand Down
5 changes: 5 additions & 0 deletions rust/src/delta_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ lazy_static! {
/// still needs to read old files.
pub static ref TOMBSTONE_RETENTION: DeltaConfig =
DeltaConfig::new("deletedFileRetentionDuration", "interval 1 week");

/// The shortest duration we have to keep delta files around before deleting them. We can only
/// delete delta files that are before a compaction. We may keep files beyond this duration until
/// the next calendar day.
pub static ref LOG_RETENTION: DeltaConfig = DeltaConfig::new("logRetentionDuration", "interval 30 day");
}

/// Delta configuration error
Expand Down
5 changes: 5 additions & 0 deletions rust/src/storage/file/rename.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ mod imp {
return Err(StorageError::AlreadyExists(to_path));
}
std::fs::rename(&from_path, &to_path).map_err(|e| {
let to_exists = std::fs::metadata(&to_path).is_ok();
if to_exists {
return StorageError::AlreadyExists(to_path);
}

StorageError::other_std_io_err(format!(
"failed to rename {} to {}: {}",
from_path, to_path, e
Expand Down
9 changes: 9 additions & 0 deletions rust/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,15 @@ pub struct ObjectMeta {
pub modified: DateTime<Utc>,
}

impl Clone for ObjectMeta {
fn clone(&self) -> Self {
Self {
path: self.path.clone(),
modified: self.modified,
}
}
}

/// Abstractions for underlying blob storages hosting the Delta table. To add support for new cloud
/// or local storage systems, simply implement this trait.
#[async_trait::async_trait]
Expand Down
9 changes: 9 additions & 0 deletions rust/src/table_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct DeltaTableState {
min_writer_version: i32,
current_metadata: Option<DeltaTableMetaData>,
tombstone_retention_millis: DeltaDataTypeLong,
log_retention_millis: DeltaDataTypeLong,
}

impl DeltaTableState {
Expand Down Expand Up @@ -92,6 +93,10 @@ impl DeltaTableState {
self.tombstone_retention_millis
}

/// Retention period for log files, in milliseconds, as read from the table metadata's
/// `logRetentionDuration` configuration.
pub fn log_retention_millis(&self) -> DeltaDataTypeLong {
self.log_retention_millis
}

/// Full list of tombstones (remove actions) representing files removed from table state).
pub fn all_tombstones(&self) -> &HashSet<action::Remove> {
&self.tombstones
Expand Down Expand Up @@ -204,6 +209,9 @@ impl DeltaTableState {
self.tombstone_retention_millis = delta_config::TOMBSTONE_RETENTION
.get_interval_from_metadata(&md)?
.as_millis() as i64;
self.log_retention_millis = delta_config::LOG_RETENTION
.get_interval_from_metadata(&md)?
.as_millis() as i64;
self.current_metadata = Some(md);
}
action::Action::txn(v) => {
Expand Down Expand Up @@ -242,6 +250,7 @@ mod tests {
min_writer_version: 2,
app_transaction_version,
tombstone_retention_millis: 0,
log_retention_millis: 0,
};

let txn_action = action::Action::txn(action::Txn {
Expand Down
44 changes: 42 additions & 2 deletions rust/tests/checkpoint_writer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ mod simple_checkpoint {
.unwrap();

// Write a checkpoint
checkpoints::create_checkpoint_from_table(&table)
checkpoints::create_checkpoint_from_table_without_cleaning_logs(&table)
.await
.unwrap();

Expand All @@ -46,7 +46,7 @@ mod simple_checkpoint {
assert_eq!(5, version);

table.load_version(10).await.unwrap();
checkpoints::create_checkpoint_from_table(&table)
checkpoints::create_checkpoint_from_table_without_cleaning_logs(&table)
.await
.unwrap();

Expand Down Expand Up @@ -99,6 +99,46 @@ mod simple_checkpoint {
}
}

mod delete_expired_delta_log_in_checkpoint {
    use super::*;

    /// Checkpointing at version 1 must remove the commit logs for versions 0 and 1 once
    /// they are older than the table's 1-second log retention, while version 2 stays
    /// loadable.
    #[tokio::test]
    async fn test_delete_expired_logs() {
        let mut table = fs_common::create_table(
            "./tests/data/checkpoints_with_expired_logs/expired",
            Some(hashmap! {
                delta_config::LOG_RETENTION.key.clone() => Some("interval 1 second".to_string())
            }),
        )
        .await;

        let a1 = fs_common::add(3 * 60 * 1000); // 3 mins ago,
        let a2 = fs_common::add(2 * 60 * 1000); // 2 mins ago,
        assert_eq!(1, fs_common::commit_add(&mut table, &a1).await);
        assert_eq!(2, fs_common::commit_add(&mut table, &a2).await);

        table.load_version(0).await.expect("Cannot load version 0");
        table.load_version(1).await.expect("Cannot load version 1");

        // The log files were written moments ago. Wait until they are really past the
        // 1-second retention (plus the up-to-1-second adjustment the cleanup may apply to
        // files sharing a modification second) so the test does not rely on timing luck.
        std::thread::sleep(std::time::Duration::from_secs(3));

        checkpoints::create_checkpoint_from_table(&table)
            .await
            .unwrap();
        table.update().await.unwrap(); // make table to read the checkpoint
        assert_eq!(table.get_files(), vec![a1.path.as_str(), a2.path.as_str()]);

        table
            .load_version(0)
            .await
            .expect_err("Should not load version 0");
        table
            .load_version(1)
            .await
            .expect_err("Should not load version 1");

        table.load_version(2).await.expect("Cannot load version 2");
    }
}

mod checkpoints_with_tombstones {
use super::*;

Expand Down

0 comments on commit 8a0475c

Please sign in to comment.