Add Vacuum command in DeltaTable #207

Merged · 3 commits · Apr 29, 2021
python/deltalake/table.py (6 changes: 2 additions & 4 deletions)

@@ -162,17 +162,15 @@ def metadata(self) -> Metadata:
 
     def vacuum(self, retention_hours: int, dry_run: bool = True) -> List[str]:
         """
-        Run the Vacuum command on the Delta Table: lists files no longer referenced by the Delta table and are older than the retention threshold.
+        Run the Vacuum command on the Delta Table: list and delete files no longer referenced by the Delta table and are older than the retention threshold.
 
         :param retention_hours: the retention threshold in hours
-        :param dry_run: when activated, lists only the files, removed otherwise
+        :param dry_run: when activated, list only the files, delete otherwise
         :return: the list of files no longer referenced by the Delta Table and are older than the retention threshold.
         """
         if retention_hours < 0:
             raise ValueError("The retention periods should be positive.")
 
-        if not dry_run:
-            raise NotImplementedError("Only Vacuum with dry_run is available.")
         return self._table.vacuum(dry_run, retention_hours)
 
     def pyarrow_schema(self) -> pyarrow.Schema:
python/src/lib.rs (14 changes: 5 additions & 9 deletions)

@@ -144,15 +144,11 @@ impl RawDeltaTable {
             .map_err(|_| PyDeltaTableError::new_err("Got invalid table schema"))
     }
 
-    /// Run the Vacuum command on the Delta Table: lists and removes files no longer referenced by the Delta table and are older than the retention threshold.
-    pub fn vacuum(&self, dry_run: bool, retention_hours: u64) -> PyResult<Vec<String>> {
-        match dry_run {
-            true => Ok(self
-                ._table
-                .vacuum_dry_run(retention_hours)
-                .map_err(PyDeltaTableError::from_raw)?),
-            false => unimplemented!("Only Vacuum with dry_run is available."),
-        }
+    /// Run the Vacuum command on the Delta Table: list and delete files no longer referenced by the Delta table and are older than the retention threshold.
+    pub fn vacuum(&mut self, dry_run: bool, retention_hours: u64) -> PyResult<Vec<String>> {
+        rt()?
+            .block_on(self._table.vacuum(retention_hours, dry_run))
+            .map_err(PyDeltaTableError::from_raw)
     }
 
     pub fn arrow_schema_json(&self) -> PyResult<String> {
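The Python-facing method stays synchronous while the underlying Rust vacuum is now async, so the binding blocks on a Tokio runtime. A minimal sketch of that bridging pattern, assuming rt() simply builds or fetches a Tokio runtime (list_stale_files and vacuum_blocking below are hypothetical stand-ins, not the module's actual helpers):

```rust
use tokio::runtime::Runtime;

// Hypothetical stand-in for `self._table.vacuum(retention_hours, dry_run)`.
async fn list_stale_files() -> Vec<String> {
    vec!["part-00000-old.snappy.parquet".to_string()]
}

// The synchronous (PyO3-facing) wrapper drives the async call to
// completion with `block_on` and hands back a plain Vec<String>.
fn vacuum_blocking() -> std::io::Result<Vec<String>> {
    let runtime = Runtime::new()?;
    Ok(runtime.block_on(list_stale_files()))
}

fn main() -> std::io::Result<()> {
    println!("{:?}", vacuum_blocking()?);
    Ok(())
}
```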
python/tests/test_table_read.py (9 changes: 3 additions & 6 deletions)

@@ -21,7 +21,9 @@ def test_vacuum_dry_run_simple_table():
     table_path = "../rust/tests/data/delta-0.2.0"
     dt = DeltaTable(table_path)
     retention_periods = 169
-    assert dt.vacuum(retention_periods) == [
+    tombstones = dt.vacuum(retention_periods)
+    tombstones.sort()
+    assert tombstones == [
         "../rust/tests/data/delta-0.2.0/part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet",
         "../rust/tests/data/delta-0.2.0/part-00000-b44fcdb0-8b06-4f3a-8606-f8311a96f6dc-c000.snappy.parquet",
         "../rust/tests/data/delta-0.2.0/part-00001-185eca06-e017-4dea-ae49-fc48b973e37e-c000.snappy.parquet",
@@ -41,11 +43,6 @@ def test_vacuum_dry_run_simple_table():
         == "Invalid retention period, retention for Vacuum must be greater than 1 week (168 hours)"
     )
 
-    retention_periods = 167
-    with pytest.raises(Exception) as exception:
-        dt.vacuum(retention_periods, dry_run=False)
-    assert str(exception.value) == "Only Vacuum with dry_run is available."
-
 
 def test_read_partitioned_table_metadata():
     table_path = "../rust/tests/data/delta-0.8.0-partitioned"
rust/src/delta.rs (68 changes: 65 additions & 3 deletions)

@@ -686,9 +686,8 @@ impl DeltaTable {
         self.state.min_writer_version
     }
 
-    /// Run the dry run of the Vacuum command on the Delta Table: list files no longer referenced by a Delta table and are older than the retention threshold.
-    /// We do not recommend that you set a retention interval shorter than 7 days, because old snapshots and uncommitted files can still be in use by concurrent readers or writers to the table. If vacuum cleans up active files, concurrent readers can fail or, worse, tables can be corrupted when vacuum deletes files that have not yet been committed.
-    pub fn vacuum_dry_run(&self, retention_hours: u64) -> Result<Vec<String>, DeltaTableError> {
+    /// List files no longer referenced by a Delta table and are older than the retention threshold.
+    fn get_stale_files(&self, retention_hours: u64) -> Result<Vec<String>, DeltaTableError> {
         if retention_hours < 168 {
             return Err(DeltaTableError::InvalidVacuumRetentionPeriod);
         }
@@ -707,6 +706,69 @@ impl DeltaTable {
             .collect::<Vec<String>>())
     }
 
+    /// Whether a path should be hidden for delta-related file operations, such as Vacuum.
+    /// Names of the form partitionCol=[value] are partition directories, and should be
+    /// deleted even if they'd normally be hidden. The _db_index directory contains (bloom filter)
+    /// indexes and these must be deleted when the data they are tied to is deleted.
+    fn is_hidden_directory(&self, path_name: &str) -> Result<bool, DeltaTableError> {
Member: The lack of unit tests here makes me uneasy, considering the importance of not deleting data that shouldn't be deleted 😆 For the purposes of vacuum, is it insufficient to just delete files ending in .parquet? 🤔

Collaborator Author: I agree! We should add integration tests to validate the different scenarios.

+        Ok(
+            (path_name.starts_with(&self.storage.join_path(&self.table_path, "."))
+                || path_name.starts_with(&self.storage.join_path(&self.table_path, "_")))
+                && !path_name
+                    .starts_with(&self.storage.join_path(&self.table_path, "_delta_index"))
+                && !path_name
+                    .starts_with(&self.storage.join_path(&self.table_path, "_change_data"))
+                && !self
+                    .state
+                    .current_metadata
+                    .as_ref()
+                    .ok_or(DeltaTableError::NoMetadata)?
+                    .partition_columns
+                    .iter()
+                    .any(|partition_column| {
+                        path_name.starts_with(
+                            &self.storage.join_path(&self.table_path, partition_column),
+                        )
+                    }),
+        )
+    }
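Picking up the review thread above, a minimal sketch of the kind of unit test the reviewers ask for. It assumes the predicate is factored out into a free function over plain '/'-joined path strings (is_hidden_path below is hypothetical), so the rules can be exercised without constructing a DeltaTable:

```rust
// Hypothetical free-function version of `is_hidden_directory`.
fn is_hidden_path(table_path: &str, path: &str, partition_columns: &[String]) -> bool {
    let rel = path
        .strip_prefix(table_path)
        .unwrap_or(path)
        .trim_start_matches('/');
    (rel.starts_with('.') || rel.starts_with('_'))
        && !rel.starts_with("_delta_index")
        && !rel.starts_with("_change_data")
        && !partition_columns.iter().any(|c| rel.starts_with(c.as_str()))
}

#[cfg(test)]
mod is_hidden_path_tests {
    use super::*;

    #[test]
    fn respects_the_documented_exceptions() {
        let cols = vec!["_year".to_string()];
        // Dot- and underscore-prefixed paths are hidden from Vacuum...
        assert!(is_hidden_path("/t", "/t/_delta_log/00000.json", &cols));
        assert!(is_hidden_path("/t", "/t/.tmp/commit.json", &cols));
        // ...except index and CDC directories, which must stay deletable...
        assert!(!is_hidden_path("/t", "/t/_delta_index/idx", &cols));
        assert!(!is_hidden_path("/t", "/t/_change_data/cdc.parquet", &cols));
        // ...and partition directories named partitionCol=value.
        assert!(!is_hidden_path("/t", "/t/_year=2021/part-0.parquet", &cols));
    }
}
```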

+    /// Run the Vacuum command on the Delta Table: delete files no longer referenced by a Delta table and are older than the retention threshold.
+    /// We do not recommend that you set a retention interval shorter than 7 days, because old snapshots and uncommitted files can still be in use by concurrent readers or writers to the table. If vacuum cleans up active files, concurrent readers can fail or, worse, tables can be corrupted when vacuum deletes files that have not yet been committed.
+    pub async fn vacuum(
Member: This function is clean, nice work!

+        &mut self,
+        retention_hours: u64,
+        dry_run: bool,
+    ) -> Result<Vec<String>, DeltaTableError> {
+        let tombstones_path = self.get_stale_files(retention_hours)?;
houqp (Member), May 5, 2021: Nitpick: I think expired_tombstones would be a better name here, so readers won't mix this up with valid tombstones that are not supposed to be deleted.


+        let mut tombstones = vec![];
+        let mut all_files = self.storage.list_objs(&self.table_path).await?;
+        while let Some(obj_meta) = all_files.next().await {
+            let obj_meta = obj_meta?;
Member: I think it would be better to strip the table path from obj_meta.path right after here, and only work with the object path the way it's done in the Scala implementation. That way we can avoid all the self.storage.join_path calls, which can get expensive if we are dealing with 100k+ objects.
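A sketch of that suggestion, assuming '/'-joined paths (to_relative is a hypothetical helper): strip the table path once per listed object, then do every later comparison on the relative path with no further join_path calls.

```rust
// Hypothetical helper: normalize a listed object path to a path
// relative to the table root, once, before any membership checks.
fn to_relative<'a>(table_path: &str, object_path: &'a str) -> &'a str {
    object_path
        .strip_prefix(table_path)
        .map(|p| p.trim_start_matches('/'))
        .unwrap_or(object_path)
}

fn main() {
    let rel = to_relative(
        "./tests/data/delta-0.8.0",
        "./tests/data/delta-0.8.0/_delta_log/00000000000000000000.json",
    );
    assert_eq!(rel, "_delta_log/00000000000000000000.json");
}
```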

+            let is_not_valid_file = !self.get_file_paths().contains(&obj_meta.path);
Member: @fvaleye this will result in a performance issue, right? We are recreating a new string vector on every listed S3 object here. It would be better to construct a HashSet from the return value of get_file_paths before entering the while loop.
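A sketch of that fix, assuming get_file_paths() returns a Vec<String>: call it once before the loop and collect it into a HashSet, so each membership check is an O(1) hash lookup instead of a linear scan over a freshly built vector.

```rust
use std::collections::HashSet;

fn main() {
    // Stand-in for `self.get_file_paths()`, now called once, up front.
    let file_paths: Vec<String> = vec!["part-00000.snappy.parquet".to_string()];
    let valid_files: HashSet<&str> = file_paths.iter().map(String::as_str).collect();

    // Inside the listing loop, membership becomes a hash lookup:
    let listed_path = "part-00999.snappy.parquet";
    let is_not_valid_file = !valid_files.contains(listed_path);
    assert!(is_not_valid_file);
}
```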

+            let is_valid_tombstone = tombstones_path.contains(&obj_meta.path);
+            let is_not_hidden_directory = !self.is_hidden_directory(&obj_meta.path)?;
+            if is_not_valid_file && is_valid_tombstone && is_not_hidden_directory {
Member: self.is_hidden_directory is always invoked in this tight loop, even when the previous two conditions are not met. It would be better to exit the iteration early with continue so we don't call self.is_hidden_directory when it's not needed: this method builds several strings at runtime, which is an expensive routine.
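A sketch of the loop shape that suggests, where valid_files, expired_tombstones, and is_hidden are hypothetical stand-ins for the table state: the cheap set lookups run first and continue past non-candidates, so the string-building hidden-directory check only runs for objects that might actually be deleted.

```rust
use std::collections::HashSet;

fn collect_tombstones(
    listed: Vec<String>,
    valid_files: &HashSet<String>,
    expired_tombstones: &HashSet<String>,
    is_hidden: impl Fn(&str) -> bool,
) -> Vec<String> {
    let mut tombstones = Vec::new();
    for path in listed {
        if valid_files.contains(&path) {
            continue; // still referenced by the table: never a candidate
        }
        if !expired_tombstones.contains(&path) {
            continue; // not past the retention threshold yet
        }
        if is_hidden(&path) {
            continue; // only real candidates pay for the string-heavy check
        }
        tombstones.push(path);
    }
    tombstones
}
```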

+                tombstones.push(obj_meta.path);
+            }
+        }
+
+        if dry_run {
+            return Ok(tombstones);
+        }
+
+        for tombstone in &tombstones {
+            match self.storage.delete_obj(&tombstone).await {
+                Ok(_) => continue,
+                Err(StorageError::NotFound) => continue,
+                Err(err) => return Err(DeltaTableError::StorageError { source: err }),
+            }
+        }
+
+        Ok(tombstones)
+    }

     /// Return table schema parsed from transaction log. Return None if table hasn't been loaded or
     /// no metadata was found in the log.
     pub fn schema(&self) -> Option<&Schema> {
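To round things off, a hedged usage sketch of the merged API from the crate side, using the same test table as the suite below; with dry_run = true the call only reports what would be deleted.

```rust
#[tokio::main]
async fn main() -> Result<(), deltalake::DeltaTableError> {
    // After this change, vacuum is async and takes &mut self.
    let mut table = deltalake::open_table("./tests/data/delta-0.8.0").await?;

    // Dry run: list expired, unreferenced files without deleting them.
    let candidates = table.vacuum(169, true).await?;
    println!("would delete {} file(s): {:?}", candidates.len(), candidates);

    // Passing dry_run = false would actually delete these files.
    Ok(())
}
```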
rust/tests/read_delta_test.rs (23 changes: 12 additions & 11 deletions)

@@ -193,34 +193,35 @@ async fn read_delta_8_0_table_with_partitions() {
 
 #[tokio::test]
 async fn vacuum_delta_8_0_table() {
-    let table = deltalake::open_table("./tests/data/delta-0.8.0")
+    let mut table = deltalake::open_table("./tests/data/delta-0.8.0")
         .await
         .unwrap();
 
-    let retention_hours = 169;
+    let retention_hours = 1;
     let backend = FileStorageBackend::new("./tests/data/delta-0.8.0");
+    let dry_run = true;
+
+    assert!(matches!(
+        table.vacuum(retention_hours, dry_run).await.unwrap_err(),
+        deltalake::DeltaTableError::InvalidVacuumRetentionPeriod,
+    ));
+
+    let retention_hours = 169;
 
     assert_eq!(
-        table.vacuum_dry_run(retention_hours).unwrap(),
+        table.vacuum(retention_hours, dry_run).await.unwrap(),
        vec![backend.join_path(
            "./tests/data/delta-0.8.0",
            "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"
        )]
    );
 
-    let retention_hours = 1;
-
-    assert!(matches!(
-        table.vacuum_dry_run(retention_hours).unwrap_err(),
-        deltalake::DeltaTableError::InvalidVacuumRetentionPeriod,
-    ));
-
     let retention_hours = SystemTime::now()
         .duration_since(SystemTime::UNIX_EPOCH)
         .unwrap()
         .as_secs()
         / 3600;
     let empty: Vec<String> = Vec::new();
 
-    assert_eq!(table.vacuum_dry_run(retention_hours).unwrap(), empty);
+    assert_eq!(table.vacuum(retention_hours, dry_run).await.unwrap(), empty);
 }
}