Skip to content

Commit

Permalink
Spark 3.5: Select for rewriting the files belonging to old partitioni…
Browse files Browse the repository at this point in the history
…ng schemas
  • Loading branch information
adrians committed Jan 24, 2025
1 parent 026a9b0 commit 486cc1f
Showing 1 changed file with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,20 @@ protected Iterable<FileScanTask> filterFiles(Iterable<FileScanTask> tasks) {
}

private boolean shouldRewrite(FileScanTask task) {
return wronglySized(task) || tooManyDeletes(task) || tooHighDeleteRatio(task);
return wronglySized(task)
|| tooManyDeletes(task)
|| tooHighDeleteRatio(task)
|| oldPartitioning(task);
}

private boolean tooManyDeletes(FileScanTask task) {
return task.deletes() != null && task.deletes().size() >= deleteFileThreshold;
}

private boolean oldPartitioning(FileScanTask task) {
return task.file().specId() != table().spec().specId();
}

@Override
protected Iterable<List<FileScanTask>> filterFileGroups(List<List<FileScanTask>> groups) {
return Iterables.filter(groups, this::shouldRewrite);
Expand All @@ -92,7 +99,12 @@ private boolean shouldRewrite(List<FileScanTask> group) {
|| enoughContent(group)
|| tooMuchContent(group)
|| anyTaskHasTooManyDeletes(group)
|| anyTaskHasTooHighDeleteRatio(group);
|| anyTaskHasTooHighDeleteRatio(group)
|| anyTaskHasOldPartitioning(group);
}

private boolean anyTaskHasOldPartitioning(List<FileScanTask> group) {
return group.stream().anyMatch(this::oldPartitioning);
}

private boolean anyTaskHasTooManyDeletes(List<FileScanTask> group) {
Expand Down

0 comments on commit 486cc1f

Please sign in to comment.