Skip to content

Commit

Permalink
Add Vacuum command in DeltaTable
Browse files Browse the repository at this point in the history
  • Loading branch information
fvaleye committed Apr 26, 2021
1 parent e18ec7c commit d4b25d5
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 33 deletions.
6 changes: 2 additions & 4 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,15 @@ def metadata(self) -> Metadata:

def vacuum(self, retention_hours: int, dry_run: bool = True) -> List[str]:
"""
Run the Vacuum command on the Delta Table: lists files no longer referenced by the Delta table and are older than the retention threshold.
Run the Vacuum command on the Delta Table: list and delete files no longer referenced by the Delta table and are older than the retention threshold.
:param retention_hours: the retention threshold in hours
:param dry_run: when activated, lists only the files, removed otherwise
:param dry_run: when activated, list only the files, delete otherwise
:return: the list of files no longer referenced by the Delta Table and are older than the retention threshold.
"""
if retention_hours < 0:
raise ValueError("The retention periods should be positive.")

if not dry_run:
raise NotImplementedError("Only Vacuum with dry_run is available.")
return self._table.vacuum(dry_run, retention_hours)

def pyarrow_schema(self) -> pyarrow.Schema:
Expand Down
14 changes: 5 additions & 9 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,11 @@ impl RawDeltaTable {
.map_err(|_| PyDeltaTableError::new_err("Got invalid table schema"))
}

/// Run the Vacuum command on the Delta Table: lists and removes files no longer referenced by the Delta table and are older than the retention threshold.
pub fn vacuum(&self, dry_run: bool, retention_hours: u64) -> PyResult<Vec<String>> {
match dry_run {
true => Ok(self
._table
.vacuum_dry_run(retention_hours)
.map_err(PyDeltaTableError::from_raw)?),
false => unimplemented!("Only Vacuum with dry_run is available."),
}
/// Run the Vacuum command on the Delta Table: list and delete files no longer referenced by the Delta table and are older than the retention threshold.
pub fn vacuum(&mut self, dry_run: bool, retention_hours: u64) -> PyResult<Vec<String>> {
rt()?
.block_on(self._table.vacuum(retention_hours, dry_run))
.map_err(PyDeltaTableError::from_raw)
}

pub fn arrow_schema_json(&self) -> PyResult<String> {
Expand Down
5 changes: 0 additions & 5 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ def test_vacuum_dry_run_simple_table():
== "Invalid retention period, retention for Vacuum must be greater than 1 week (168 hours)"
)

retention_periods = 167
with pytest.raises(Exception) as exception:
dt.vacuum(retention_periods, dry_run=False)
assert str(exception.value) == "Only Vacuum with dry_run is available."


def test_read_partitioned_table_metadata():
table_path = "../rust/tests/data/delta-0.8.0-partitioned"
Expand Down
47 changes: 43 additions & 4 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,9 +610,11 @@ impl DeltaTable {
self.state.min_writer_version
}

/// Run the dry run of the Vacuum command on the Delta Table: list files no longer referenced by a Delta table and are older than the retention threshold.
/// We do not recommend that you set a retention interval shorter than 7 days, because old snapshots and uncommitted files can still be in use by concurrent readers or writers to the table. If vacuum cleans up active files, concurrent readers can fail or, worse, tables can be corrupted when vacuum deletes files that have not yet been committed.
pub fn vacuum_dry_run(&self, retention_hours: u64) -> Result<Vec<String>, DeltaTableError> {
/// List files no longer referenced by a Delta table and are older than the retention threshold.
fn get_stale_files(
&self,
retention_hours: u64,
) -> Result<Vec<&action::Remove>, DeltaTableError> {
if retention_hours < 168 {
return Err(DeltaTableError::InvalidVacuumRetentionPeriod);
}
Expand All @@ -627,8 +629,45 @@ impl DeltaTable {
.get_tombstones()
.iter()
.filter(|tombstone| tombstone.deletionTimestamp < delete_before_timestamp)
.collect())
}

/// Run the Vacuum command on the Delta Table: delete files no longer referenced by a Delta table and are older than the retention threshold.
/// We do not recommend that you set a retention interval shorter than 7 days, because old snapshots and uncommitted files can still be in use by concurrent readers or writers to the table. If vacuum cleans up active files, concurrent readers can fail or, worse, tables can be corrupted when vacuum deletes files that have not yet been committed.
pub async fn vacuum(
&mut self,
retention_hours: u64,
dry_run: bool,
) -> Result<Vec<String>, DeltaTableError> {
let tombstones = self.get_stale_files(retention_hours)?;
let tombstones_path = tombstones
.iter()
.map(|tombstone| self.storage.join_path(&self.table_path, &tombstone.path))
.collect::<Vec<String>>())
.collect::<Vec<String>>();

if dry_run {
return Ok(tombstones_path);
}

let mut deleted_tombstones = vec![];
for tombstone in tombstones {
match self
.storage
.delete_obj(&self.storage.join_path(&self.table_path, &tombstone.path))
.await
{
Ok(_) => deleted_tombstones.push(tombstone.clone()),
Err(StorageError::NotFound) => deleted_tombstones.push(tombstone.clone()),
Err(err) => return Err(DeltaTableError::StorageError { source: err }),
}
}

self.state
.tombstones
.retain(|tombstone| !deleted_tombstones.contains(tombstone));
//TODO: Commit the new state

Ok(tombstones_path)
}

pub fn schema(&self) -> Option<&Schema> {
Expand Down
23 changes: 12 additions & 11 deletions rust/tests/read_delta_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,34 +193,35 @@ async fn read_delta_8_0_table_with_partitions() {

#[tokio::test]
async fn vacuum_delta_8_0_table() {
let table = deltalake::open_table("./tests/data/delta-0.8.0")
let mut table = deltalake::open_table("./tests/data/delta-0.8.0")
.await
.unwrap();

let retention_hours = 169;
let retention_hours = 1;
let backend = FileStorageBackend::new("./tests/data/delta-0.8.0");
let dry_run = true;

assert!(matches!(
table.vacuum(retention_hours, dry_run).await.unwrap_err(),
deltalake::DeltaTableError::InvalidVacuumRetentionPeriod,
));

let retention_hours = 169;

assert_eq!(
table.vacuum_dry_run(retention_hours).unwrap(),
table.vacuum(retention_hours, dry_run).await.unwrap(),
vec![backend.join_path(
"./tests/data/delta-0.8.0",
"part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"
)]
);

let retention_hours = 1;

assert!(matches!(
table.vacuum_dry_run(retention_hours).unwrap_err(),
deltalake::DeltaTableError::InvalidVacuumRetentionPeriod,
));

let retention_hours = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
/ 3600;
let empty: Vec<String> = Vec::new();

assert_eq!(table.vacuum_dry_run(retention_hours).unwrap(), empty);
assert_eq!(table.vacuum(retention_hours, dry_run).await.unwrap(), empty);
}

0 comments on commit d4b25d5

Please sign in to comment.