Add Vacuum command in DeltaTable
fvaleye committed Apr 28, 2021
1 parent e18ec7c commit 2a3e3d7
Showing 5 changed files with 84 additions and 32 deletions.
python/deltalake/table.py (6 changes: 2 additions & 4 deletions)
@@ -161,17 +161,15 @@ def metadata(self) -> Metadata:

     def vacuum(self, retention_hours: int, dry_run: bool = True) -> List[str]:
         """
-        Run the Vacuum command on the Delta Table: lists files no longer referenced by the Delta table and are older than the retention threshold.
+        Run the Vacuum command on the Delta Table: list and delete files that are no longer referenced by the Delta table and are older than the retention threshold.
         :param retention_hours: the retention threshold in hours
-        :param dry_run: when activated, lists only the files, removed otherwise
+        :param dry_run: when activated, only list the files to be deleted; otherwise, delete them
         :return: the list of files no longer referenced by the Delta Table and are older than the retention threshold.
         """
         if retention_hours < 0:
             raise ValueError("The retention periods should be positive.")
 
-        if not dry_run:
-            raise NotImplementedError("Only Vacuum with dry_run is available.")
         return self._table.vacuum(dry_run, retention_hours)
 
     def pyarrow_schema(self) -> pyarrow.Schema:
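
For reference, a minimal usage sketch of the updated Python API, assuming an existing Delta table on disk (the path below is illustrative and not part of this commit):

from deltalake import DeltaTable

dt = DeltaTable("./tests/data/delta-0.8.0")  # illustrative table path

# Dry run (the default): only list the files that would be removed.
stale_files = dt.vacuum(retention_hours=168)

# With dry_run=False the same stale files are actually deleted from storage.
deleted_files = dt.vacuum(retention_hours=168, dry_run=False)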
python/src/lib.rs (14 changes: 5 additions & 9 deletions)
@@ -144,15 +144,11 @@ impl RawDeltaTable {
             .map_err(|_| PyDeltaTableError::new_err("Got invalid table schema"))
     }
 
-    /// Run the Vacuum command on the Delta Table: lists and removes files no longer referenced by the Delta table and are older than the retention threshold.
-    pub fn vacuum(&self, dry_run: bool, retention_hours: u64) -> PyResult<Vec<String>> {
-        match dry_run {
-            true => Ok(self
-                ._table
-                .vacuum_dry_run(retention_hours)
-                .map_err(PyDeltaTableError::from_raw)?),
-            false => unimplemented!("Only Vacuum with dry_run is available."),
-        }
+    /// Run the Vacuum command on the Delta Table: list and delete files that are no longer referenced by the Delta table and are older than the retention threshold.
+    pub fn vacuum(&mut self, dry_run: bool, retention_hours: u64) -> PyResult<Vec<String>> {
+        rt()?
+            .block_on(self._table.vacuum(retention_hours, dry_run))
+            .map_err(PyDeltaTableError::from_raw)
     }
 
     pub fn arrow_schema_json(&self) -> PyResult<String> {
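
The binding above relies on an rt() helper defined elsewhere in lib.rs to bridge the now-async Rust vacuum into the synchronous Python method. A minimal sketch of what such a helper could look like, assuming the surrounding lib.rs context (PyResult and PyDeltaTableError in scope); the actual helper in the repository may differ:

// Hypothetical sketch: build a tokio runtime so the synchronous Python binding
// can block on the async DeltaTable::vacuum future.
fn rt() -> PyResult<tokio::runtime::Runtime> {
    tokio::runtime::Runtime::new()
        .map_err(|err| PyDeltaTableError::new_err(format!("Couldn't start a runtime: {}", err)))
}

Blocking on a runtime keeps the Python API synchronous while letting the Rust core stay async.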
python/tests/test_table_read.py (5 changes: 0 additions & 5 deletions)
@@ -41,11 +41,6 @@ def test_vacuum_dry_run_simple_table():
== "Invalid retention period, retention for Vacuum must be greater than 1 week (168 hours)"
)

retention_periods = 167
with pytest.raises(Exception) as exception:
dt.vacuum(retention_periods, dry_run=False)
assert str(exception.value) == "Only Vacuum with dry_run is available."


def test_read_partitioned_table_metadata():
table_path = "../rust/tests/data/delta-0.8.0-partitioned"
rust/src/delta.rs (68 changes: 65 additions & 3 deletions)
@@ -610,9 +610,8 @@ impl DeltaTable {
         self.state.min_writer_version
     }
 
-    /// Run the dry run of the Vacuum command on the Delta Table: list files no longer referenced by a Delta table and are older than the retention threshold.
-    /// We do not recommend that you set a retention interval shorter than 7 days, because old snapshots and uncommitted files can still be in use by concurrent readers or writers to the table. If vacuum cleans up active files, concurrent readers can fail or, worse, tables can be corrupted when vacuum deletes files that have not yet been committed.
-    pub fn vacuum_dry_run(&self, retention_hours: u64) -> Result<Vec<String>, DeltaTableError> {
+    /// List files that are no longer referenced by a Delta table and are older than the retention threshold.
+    fn get_stale_files(&self, retention_hours: u64) -> Result<Vec<String>, DeltaTableError> {
         if retention_hours < 168 {
             return Err(DeltaTableError::InvalidVacuumRetentionPeriod);
         }
@@ -631,6 +630,69 @@
             .collect::<Vec<String>>())
     }
 
+    /// Whether a path should be hidden for delta-related file operations, such as Vacuum.
+    /// Names of the form partitionCol=[value] are partition directories, and should be
+    /// deleted even if they'd normally be hidden. The _db_index directory contains (bloom filter)
+    /// indexes and these must be deleted when the data they are tied to is deleted.
+    fn is_hidden_directory(&self, path_name: &str) -> Result<bool, DeltaTableError> {
+        Ok(
+            (path_name.starts_with(&self.storage.join_path(&self.table_path, "."))
+                || path_name.starts_with(&self.storage.join_path(&self.table_path, "_")))
+                && !path_name
+                    .starts_with(&self.storage.join_path(&self.table_path, "_delta_index"))
+                && !path_name
+                    .starts_with(&self.storage.join_path(&self.table_path, "_change_data"))
+                && !self
+                    .state
+                    .current_metadata
+                    .as_ref()
+                    .ok_or(DeltaTableError::NoMetadata)?
+                    .partition_columns
+                    .iter()
+                    .any(|partition_column| {
+                        path_name.starts_with(
+                            &self.storage.join_path(&self.table_path, partition_column),
+                        )
+                    }),
+        )
+    }
+
+    /// Run the Vacuum command on the Delta Table: delete files that are no longer referenced by a Delta table and are older than the retention threshold.
+    /// We do not recommend that you set a retention interval shorter than 7 days, because old snapshots and uncommitted files can still be in use by concurrent readers or writers to the table. If vacuum cleans up active files, concurrent readers can fail or, worse, tables can be corrupted when vacuum deletes files that have not yet been committed.
+    pub async fn vacuum(
+        &mut self,
+        retention_hours: u64,
+        dry_run: bool,
+    ) -> Result<Vec<String>, DeltaTableError> {
+        let tombstones_path = self.get_stale_files(retention_hours)?;
+
+        let mut tombstones = vec![];
+        let mut all_files = self.storage.list_objs(&self.table_path).await?;
+        while let Some(obj_meta) = all_files.next().await {
+            let obj_meta = obj_meta?;
+            let is_not_valid_file = !self.get_file_paths().contains(&obj_meta.path);
+            let is_valid_tombstone = tombstones_path.contains(&obj_meta.path);
+            let is_not_hidden_directory = !self.is_hidden_directory(&obj_meta.path)?;
+            if is_not_valid_file && is_valid_tombstone && is_not_hidden_directory {
+                tombstones.push(obj_meta.path);
+            }
+        }
+
+        if dry_run {
+            return Ok(tombstones);
+        }
+
+        for tombstone in &tombstones {
+            match self.storage.delete_obj(&tombstone).await {
+                Ok(_) => continue,
+                Err(StorageError::NotFound) => continue,
+                Err(err) => return Err(DeltaTableError::StorageError { source: err }),
+            }
+        }
+
+        Ok(tombstones)
+    }
+
     pub fn schema(&self) -> Option<&Schema> {
         self.state.current_metadata.as_ref().map(|m| &m.schema)
     }
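
For context, a minimal sketch of driving the new async API from Rust; the table path is illustrative, and 168 hours is the smallest retention the implementation accepts:

// Sketch only: list stale files with a dry run, then actually delete them.
#[tokio::main]
async fn main() -> Result<(), deltalake::DeltaTableError> {
    let mut table = deltalake::open_table("./tests/data/delta-0.8.0").await?;

    // Dry run: report the files that would be removed, without touching storage.
    let candidates = table.vacuum(168, true).await?;
    println!("{} file(s) would be deleted", candidates.len());

    // Real run: the same stale files are deleted from the backing store.
    let deleted = table.vacuum(168, false).await?;
    println!("{} file(s) deleted", deleted.len());
    Ok(())
}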
rust/tests/read_delta_test.rs (23 changes: 12 additions & 11 deletions)
@@ -193,34 +193,35 @@ async fn read_delta_8_0_table_with_partitions() {

 #[tokio::test]
 async fn vacuum_delta_8_0_table() {
-    let table = deltalake::open_table("./tests/data/delta-0.8.0")
+    let mut table = deltalake::open_table("./tests/data/delta-0.8.0")
         .await
         .unwrap();
 
-    let retention_hours = 169;
+    let retention_hours = 1;
     let backend = FileStorageBackend::new("./tests/data/delta-0.8.0");
+    let dry_run = true;
 
+    assert!(matches!(
+        table.vacuum(retention_hours, dry_run).await.unwrap_err(),
+        deltalake::DeltaTableError::InvalidVacuumRetentionPeriod,
+    ));
+
+    let retention_hours = 169;
+
     assert_eq!(
-        table.vacuum_dry_run(retention_hours).unwrap(),
+        table.vacuum(retention_hours, dry_run).await.unwrap(),
         vec![backend.join_path(
             "./tests/data/delta-0.8.0",
             "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"
         )]
     );
 
-    let retention_hours = 1;
-
-    assert!(matches!(
-        table.vacuum_dry_run(retention_hours).unwrap_err(),
-        deltalake::DeltaTableError::InvalidVacuumRetentionPeriod,
-    ));
-
     let retention_hours = SystemTime::now()
         .duration_since(SystemTime::UNIX_EPOCH)
         .unwrap()
         .as_secs()
         / 3600;
     let empty: Vec<String> = Vec::new();
 
-    assert_eq!(table.vacuum_dry_run(retention_hours).unwrap(), empty);
+    assert_eq!(table.vacuum(retention_hours, dry_run).await.unwrap(), empty);
 }
