diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index a2d577a799..e3d1b197c5 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -161,17 +161,15 @@ def metadata(self) -> Metadata:
 
     def vacuum(self, retention_hours: int, dry_run: bool = True) -> List[str]:
         """
-        Run the Vacuum command on the Delta Table: lists files no longer referenced by the Delta table and are older than the retention threshold.
+        Run the Vacuum command on the Delta Table: list and delete files that are no longer referenced by the Delta table and are older than the retention threshold.
 
         :param retention_hours: the retention threshold in hours
-        :param dry_run: when activated, lists only the files, removed otherwise
+        :param dry_run: when enabled, only list the files to be deleted; when disabled, delete them
         :return: the list of files no longer referenced by the Delta Table and are older than the retention threshold.
         """
         if retention_hours < 0:
             raise ValueError("The retention periods should be positive.")
-        if not dry_run:
-            raise NotImplementedError("Only Vacuum with dry_run is available.")
 
         return self._table.vacuum(dry_run, retention_hours)
 
     def pyarrow_schema(self) -> pyarrow.Schema:
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 9745f7a45e..f1648d7fe4 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -144,15 +144,11 @@ impl RawDeltaTable {
             .map_err(|_| PyDeltaTableError::new_err("Got invalid table schema"))
     }
 
-    /// Run the Vacuum command on the Delta Table: lists and removes files no longer referenced by the Delta table and are older than the retention threshold.
-    pub fn vacuum(&self, dry_run: bool, retention_hours: u64) -> PyResult<Vec<String>> {
-        match dry_run {
-            true => Ok(self
-                ._table
-                .vacuum_dry_run(retention_hours)
-                .map_err(PyDeltaTableError::from_raw)?),
-            false => unimplemented!("Only Vacuum with dry_run is available."),
-        }
+    /// Run the Vacuum command on the Delta Table: list and delete files that are no longer referenced by the Delta table and are older than the retention threshold.
+    pub fn vacuum(&mut self, dry_run: bool, retention_hours: u64) -> PyResult<Vec<String>> {
+        rt()?
+            .block_on(self._table.vacuum(retention_hours, dry_run))
+            .map_err(PyDeltaTableError::from_raw)
     }
 
     pub fn arrow_schema_json(&self) -> PyResult<String> {
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index 744eb8f6dc..cc7ea50f40 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -41,11 +41,6 @@ def test_vacuum_dry_run_simple_table():
         == "Invalid retention period, retention for Vacuum must be greater than 1 week (168 hours)"
     )
 
-    retention_periods = 167
-    with pytest.raises(Exception) as exception:
-        dt.vacuum(retention_periods, dry_run=False)
-    assert str(exception.value) == "Only Vacuum with dry_run is available."
-
 
 def test_read_partitioned_table_metadata():
     table_path = "../rust/tests/data/delta-0.8.0-partitioned"
diff --git a/rust/src/delta.rs b/rust/src/delta.rs
index 7ffac173b7..ad914d9508 100644
--- a/rust/src/delta.rs
+++ b/rust/src/delta.rs
@@ -610,9 +610,11 @@ impl DeltaTable {
         self.state.min_writer_version
     }
 
-    /// Run the dry run of the Vacuum command on the Delta Table: list files no longer referenced by a Delta table and are older than the retention threshold.
-    /// We do not recommend that you set a retention interval shorter than 7 days, because old snapshots and uncommitted files can still be in use by concurrent readers or writers to the table. If vacuum cleans up active files, concurrent readers can fail or, worse, tables can be corrupted when vacuum deletes files that have not yet been committed.
-    pub fn vacuum_dry_run(&self, retention_hours: u64) -> Result<Vec<String>, DeltaTableError> {
+    /// List files that are no longer referenced by a Delta table and are older than the retention threshold.
+    fn get_stale_files(
+        &self,
+        retention_hours: u64,
+    ) -> Result<Vec<&action::Remove>, DeltaTableError> {
         if retention_hours < 168 {
             return Err(DeltaTableError::InvalidVacuumRetentionPeriod);
         }
@@ -627,8 +629,45 @@ impl DeltaTable {
             .get_tombstones()
             .iter()
             .filter(|tombstone| tombstone.deletionTimestamp < delete_before_timestamp)
+            .collect())
+    }
+
+    /// Run the Vacuum command on the Delta Table: delete files that are no longer referenced by a Delta table and are older than the retention threshold.
+    /// We do not recommend that you set a retention interval shorter than 7 days, because old snapshots and uncommitted files can still be in use by concurrent readers or writers to the table. If vacuum cleans up active files, concurrent readers can fail or, worse, tables can be corrupted when vacuum deletes files that have not yet been committed.
+    pub async fn vacuum(
+        &mut self,
+        retention_hours: u64,
+        dry_run: bool,
+    ) -> Result<Vec<String>, DeltaTableError> {
+        let tombstones = self.get_stale_files(retention_hours)?;
+        let tombstones_path = tombstones
+            .iter()
             .map(|tombstone| self.storage.join_path(&self.table_path, &tombstone.path))
-            .collect::<Vec<String>>())
+            .collect::<Vec<String>>();
+
+        if dry_run {
+            return Ok(tombstones_path);
+        }
+
+        let mut deleted_tombstones = vec![];
+        for tombstone in tombstones {
+            match self
+                .storage
+                .delete_obj(&self.storage.join_path(&self.table_path, &tombstone.path))
+                .await
+            {
+                Ok(_) => deleted_tombstones.push(tombstone.clone()),
+                Err(StorageError::NotFound) => deleted_tombstones.push(tombstone.clone()),
+                Err(err) => return Err(DeltaTableError::StorageError { source: err }),
+            }
+        }
+
+        self.state
+            .tombstones
+            .retain(|tombstone| !deleted_tombstones.contains(tombstone));
+        // TODO: Commit the new state
+
+        Ok(tombstones_path)
     }
 
     pub fn schema(&self) -> Option<&Schema> {
diff --git a/rust/tests/read_delta_test.rs b/rust/tests/read_delta_test.rs
index eaafc295e8..4781b8b96e 100644
--- a/rust/tests/read_delta_test.rs
+++ b/rust/tests/read_delta_test.rs
@@ -193,28 +193,29 @@ async fn read_delta_8_0_table_with_partitions() {
 
 #[tokio::test]
 async fn vacuum_delta_8_0_table() {
-    let table = deltalake::open_table("./tests/data/delta-0.8.0")
+    let mut table = deltalake::open_table("./tests/data/delta-0.8.0")
         .await
         .unwrap();
 
-    let retention_hours = 169;
+    let retention_hours = 1;
     let backend = FileStorageBackend::new("./tests/data/delta-0.8.0");
+    let dry_run = true;
+
+    assert!(matches!(
+        table.vacuum(retention_hours, dry_run).await.unwrap_err(),
+        deltalake::DeltaTableError::InvalidVacuumRetentionPeriod,
+    ));
+
+    let retention_hours = 169;
 
     assert_eq!(
-        table.vacuum_dry_run(retention_hours).unwrap(),
+        table.vacuum(retention_hours, dry_run).await.unwrap(),
         vec![backend.join_path(
             "./tests/data/delta-0.8.0",
             "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"
         )]
     );
 
-    let retention_hours = 1;
-
-    assert!(matches!(
-        table.vacuum_dry_run(retention_hours).unwrap_err(),
-        deltalake::DeltaTableError::InvalidVacuumRetentionPeriod,
-    ));
-
     let retention_hours = SystemTime::now()
         .duration_since(SystemTime::UNIX_EPOCH)
         .unwrap()
@@ -222,5 +223,5 @@
         / 3600;
 
     let empty: Vec<String> = Vec::new();
-    assert_eq!(table.vacuum_dry_run(retention_hours).unwrap(), empty);
+    assert_eq!(table.vacuum(retention_hours, dry_run).await.unwrap(), empty);
 }
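
For reviewers, a minimal usage sketch of the updated Python binding, assuming a wheel built from this branch and a hypothetical local table at `/tmp/example_delta_table` (any Delta table path works). `dry_run=True` (the default) still only lists candidate files; `dry_run=False` now deletes them and returns the same list:

```python
from deltalake import DeltaTable

# Hypothetical table location; substitute a real Delta table path.
dt = DeltaTable("/tmp/example_delta_table")

# Default dry run: only list files older than the retention threshold.
candidates = dt.vacuum(retention_hours=168)
print(f"{len(candidates)} file(s) would be deleted")

# Destructive run: delete those files and return their paths.
deleted = dt.vacuum(retention_hours=168, dry_run=False)

# Retention periods shorter than one week are still rejected by the Rust core.
try:
    dt.vacuum(retention_hours=167)
except Exception as err:
    # "Invalid retention period, retention for Vacuum must be greater than 1 week (168 hours)"
    print(err)
```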