Skip to content

Commit

Permalink
Spark: Disable rewriting position deletes for V3 tables
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Jan 24, 2025
1 parent 026a9b0 commit 561b6c5
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.actions.ImmutableRewritePositionDeleteFiles;
import org.apache.iceberg.actions.RewritePositionDeleteFiles;
import org.apache.iceberg.actions.RewritePositionDeletesCommitManager;
Expand Down Expand Up @@ -402,6 +403,9 @@ private void validateAndInitOptions() {
PARTIAL_PROGRESS_MAX_COMMITS,
maxCommits,
PARTIAL_PROGRESS_ENABLED);

Preconditions.checkArgument(
TableUtil.formatVersion(table) <= 2, "Cannot rewrite position deletes for V3 table");
}

private String jobDesc(RewritePositionDeletesGroup group, RewriteExecutionContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,22 @@ public void testRewriteManyColumns() throws Exception {
assertEquals("Position deletes", expectedDeletes, actualDeletes);
}

@TestTemplate
public void testRewritePositionDeletesForV3TableFails() {
Table table =
validationCatalog.createTable(
TableIdentifier.of("default", TABLE_NAME),
SCHEMA,
PartitionSpec.unpartitioned(),
tableProperties(3));

writeRecords(table, 2, SCALE);

assertThatThrownBy(() -> SparkActions.get(spark).rewritePositionDeletes(table).execute())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot rewrite position deletes for V3 table");
}

private Table createTablePartitioned(int partitions, int files, int numRecords) {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
Table table =
Expand All @@ -751,11 +767,15 @@ private Table createTableUnpartitioned(int files, int numRecords) {
}

private Map<String, String> tableProperties() {
return tableProperties(2);
}

private Map<String, String> tableProperties(int formatVersion) {
return ImmutableMap.of(
TableProperties.DEFAULT_WRITE_METRICS_MODE,
"full",
TableProperties.FORMAT_VERSION,
"2",
String.valueOf(formatVersion),
TableProperties.DEFAULT_FILE_FORMAT,
format.toString(),
TableProperties.DELETE_GRANULARITY,
Expand Down

0 comments on commit 561b6c5

Please sign in to comment.