Skip to content

Commit

Permalink
Core: Change RemoveSnapshots to remove unused schemas
Browse files Browse the repository at this point in the history
Similarly to removing partition specs, this PR improves RemoveSnapshots
to also remove unused schemas.
  • Loading branch information
gaborkaszab committed Jan 24, 2025
1 parent f7d40f0 commit 0884ada
Show file tree
Hide file tree
Showing 9 changed files with 344 additions and 62 deletions.
17 changes: 17 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,23 @@ public void applyTo(TableMetadata.Builder metadataBuilder) {
}
}

class RemoveSchemas implements MetadataUpdate {
private final Set<Integer> schemaIds;

public RemoveSchemas(Set<Integer> schemaIds) {
this.schemaIds = schemaIds;
}

public Set<Integer> schemaIds() {
return schemaIds;
}

@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.removeSchemas(schemaIds);
}
}

class AddSortOrder implements MetadataUpdate {
private final UnboundSortOrder sortOrder;

Expand Down
19 changes: 19 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ private MetadataUpdateParser() {}
static final String SET_PARTITION_STATISTICS = "set-partition-statistics";
static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics";
static final String REMOVE_PARTITION_SPECS = "remove-partition-specs";
static final String REMOVE_SCHEMAS = "remove-schemas";

// AssignUUID
private static final String UUID = "uuid";
Expand Down Expand Up @@ -130,6 +131,9 @@ private MetadataUpdateParser() {}
// RemovePartitionSpecs
private static final String SPEC_IDS = "spec-ids";

// RemoveSchemas
private static final String SCHEMA_IDS = "schema-ids";

private static final Map<Class<? extends MetadataUpdate>, String> ACTIONS =
ImmutableMap.<Class<? extends MetadataUpdate>, String>builder()
.put(MetadataUpdate.AssignUUID.class, ASSIGN_UUID)
Expand All @@ -154,6 +158,7 @@ private MetadataUpdateParser() {}
.put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION)
.put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION)
.put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS)
.put(MetadataUpdate.RemoveSchemas.class, REMOVE_SCHEMAS)
.buildOrThrow();

public static String toJson(MetadataUpdate metadataUpdate) {
Expand Down Expand Up @@ -249,6 +254,9 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator
case REMOVE_PARTITION_SPECS:
writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs) metadataUpdate, generator);
break;
case REMOVE_SCHEMAS:
writeRemoveSchemas((MetadataUpdate.RemoveSchemas) metadataUpdate, generator);
break;
default:
throw new IllegalArgumentException(
String.format(
Expand Down Expand Up @@ -322,6 +330,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) {
return readCurrentViewVersionId(jsonNode);
case REMOVE_PARTITION_SPECS:
return readRemovePartitionSpecs(jsonNode);
case REMOVE_SCHEMAS:
return readRemoveSchemas(jsonNode);
default:
throw new UnsupportedOperationException(
String.format("Cannot convert metadata update action to json: %s", action));
Expand Down Expand Up @@ -462,6 +472,11 @@ private static void writeRemovePartitionSpecs(
JsonUtil.writeIntegerArray(SPEC_IDS, metadataUpdate.specIds(), gen);
}

private static void writeRemoveSchemas(
MetadataUpdate.RemoveSchemas metadataUpdate, JsonGenerator gen) throws IOException {
JsonUtil.writeIntegerArray(SCHEMA_IDS, metadataUpdate.schemaIds(), gen);
}

private static MetadataUpdate readAssignUUID(JsonNode node) {
String uuid = JsonUtil.getString(UUID, node);
return new MetadataUpdate.AssignUUID(uuid);
Expand Down Expand Up @@ -615,4 +630,8 @@ private static MetadataUpdate readCurrentViewVersionId(JsonNode node) {
private static MetadataUpdate readRemovePartitionSpecs(JsonNode node) {
return new MetadataUpdate.RemovePartitionSpecs(JsonUtil.getIntegerSet(SPEC_IDS, node));
}

private static MetadataUpdate readRemoveSchemas(JsonNode node) {
return new MetadataUpdate.RemoveSchemas(JsonUtil.getIntegerSet(SCHEMA_IDS, node));
}
}
21 changes: 16 additions & 5 deletions core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,23 +218,34 @@ private TableMetadata internalApply() {
updatedMetaBuilder.removeSnapshots(idsToRemove);

if (cleanExpiredMetadata) {
// TODO: Support cleaning expired schema as well.
Set<Integer> reachableSpecs = Sets.newConcurrentHashSet();
reachableSpecs.add(base.defaultSpecId());
Set<Integer> reachableSchemas = Sets.newConcurrentHashSet();
reachableSchemas.add(base.currentSchemaId());

Tasks.foreach(idsToRetain)
.executeWith(planExecutorService)
.run(
snapshot ->
base.snapshot(snapshot).allManifests(ops.io()).stream()
.map(ManifestFile::partitionSpecId)
.forEach(reachableSpecs::add));
snapshotId -> {
base.snapshot(snapshotId).allManifests(ops.io()).stream()
.map(ManifestFile::partitionSpecId)
.forEach(reachableSpecs::add);
reachableSchemas.add(base.snapshot(snapshotId).schemaId());
});

Set<Integer> specsToRemove =
base.specs().stream()
.map(PartitionSpec::specId)
.filter(specId -> !reachableSpecs.contains(specId))
.collect(Collectors.toSet());
updatedMetaBuilder.removeSpecs(specsToRemove);

Set<Integer> schemasToRemove =
base.schemas().stream()
.map(Schema::schemaId)
.filter(schemaId -> !reachableSchemas.contains(schemaId))
.collect(Collectors.toSet());
updatedMetaBuilder.removeSchemas(schemasToRemove);
}

return updatedMetaBuilder.build();
Expand Down
17 changes: 16 additions & 1 deletion core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ public static class Builder {
private long lastSequenceNumber;
private int lastColumnId;
private int currentSchemaId;
private final List<Schema> schemas;
private List<Schema> schemas;
private int defaultSpecId;
private List<PartitionSpec> specs;
private int lastAssignedPartitionId;
Expand Down Expand Up @@ -1148,6 +1148,21 @@ Builder removeSpecs(Iterable<Integer> specIds) {
.filter(s -> !specIdsToRemove.contains(s.specId()))
.collect(Collectors.toList());
changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove));

return this;
}

Builder removeSchemas(Iterable<Integer> schemaIds) {
Set<Integer> schemaIdsToRemove = Sets.newHashSet(schemaIds);
Preconditions.checkArgument(
!schemaIdsToRemove.contains(currentSchemaId), "Cannot remove the current schema");

this.schemas =
schemas.stream()
.filter(s -> !schemaIdsToRemove.contains(s.schemaId()))
.collect(Collectors.toList());
changes.add(new MetadataUpdate.RemoveSchemas(schemaIdsToRemove));

return this;
}

Expand Down
45 changes: 29 additions & 16 deletions core/src/main/java/org/apache/iceberg/UpdateRequirements.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ private Builder update(MetadataUpdate update) {
update((MetadataUpdate.SetDefaultSortOrder) update);
} else if (update instanceof MetadataUpdate.RemovePartitionSpecs) {
update((MetadataUpdate.RemovePartitionSpecs) update);
} else if (update instanceof MetadataUpdate.RemoveSchemas) {
update((MetadataUpdate.RemoveSchemas) update);
}

return this;
Expand Down Expand Up @@ -136,13 +138,7 @@ private void update(MetadataUpdate.AddSchema unused) {
}

private void update(MetadataUpdate.SetCurrentSchema unused) {
if (!setSchemaId) {
if (base != null && !isReplace) {
// require that the current schema has not changed
require(new UpdateRequirement.AssertCurrentSchemaID(base.currentSchemaId()));
}
this.setSchemaId = true;
}
requireCurrentSchemaNotChanged();
}

private void update(MetadataUpdate.AddPartitionSpec unused) {
Expand All @@ -156,13 +152,7 @@ private void update(MetadataUpdate.AddPartitionSpec unused) {
}

private void update(MetadataUpdate.SetDefaultPartitionSpec unused) {
if (!setSpecId) {
if (base != null && !isReplace) {
// require that the default spec has not changed
require(new UpdateRequirement.AssertDefaultSpecID(base.defaultSpecId()));
}
this.setSpecId = true;
}
requireDefaultPartitionSpecNotChanged();
}

private void update(MetadataUpdate.SetDefaultSortOrder unused) {
Expand All @@ -176,15 +166,38 @@ private void update(MetadataUpdate.SetDefaultSortOrder unused) {
}

private void update(MetadataUpdate.RemovePartitionSpecs unused) {
// require that the default partition spec has not changed
requireDefaultPartitionSpecNotChanged();

// require that no branches have changed, so that old specs won't be written.
requireNoBranchesChanged();
}

private void update(MetadataUpdate.RemoveSchemas unused) {
requireCurrentSchemaNotChanged();

// require that no branches have changed, so that old schemas won't be written.
requireNoBranchesChanged();
}

private void requireDefaultPartitionSpecNotChanged() {
if (!setSpecId) {
if (base != null && !isReplace) {
require(new UpdateRequirement.AssertDefaultSpecID(base.defaultSpecId()));
}
this.setSpecId = true;
}
}

// require that no branches have changed, so that old specs won't be written.
private void requireCurrentSchemaNotChanged() {
if (!setSchemaId) {
if (base != null && !isReplace) {
require(new UpdateRequirement.AssertCurrentSchemaID(base.currentSchemaId()));
}
this.setSchemaId = true;
}
}

private void requireNoBranchesChanged() {
if (base != null && !isReplace) {
base.refs()
.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,17 @@ public void testRemovePartitionSpec() {
.isEqualTo(json);
}

@Test
public void testRemoveSchemas() {
String action = MetadataUpdateParser.REMOVE_SCHEMAS;
String json = "{\"action\":\"remove-schemas\",\"schema-ids\":[1,2,3]}";
MetadataUpdate expected = new MetadataUpdate.RemoveSchemas(ImmutableSet.of(1, 2, 3));
assertEquals(action, expected, MetadataUpdateParser.fromJson(json));
assertThat(MetadataUpdateParser.toJson(expected))
.as("Remove schemas should convert to the correct JSON value")
.isEqualTo(json);
}

public void assertEquals(
String action, MetadataUpdate expectedUpdate, MetadataUpdate actualUpdate) {
switch (action) {
Expand Down Expand Up @@ -1032,6 +1043,11 @@ public void assertEquals(
(MetadataUpdate.RemovePartitionSpecs) expectedUpdate,
(MetadataUpdate.RemovePartitionSpecs) actualUpdate);
break;
case MetadataUpdateParser.REMOVE_SCHEMAS:
assertEqualsRemoveSchemas(
(MetadataUpdate.RemoveSchemas) expectedUpdate,
(MetadataUpdate.RemoveSchemas) actualUpdate);
break;
default:
fail("Unrecognized metadata update action: " + action);
}
Expand Down Expand Up @@ -1258,6 +1274,11 @@ private static void assertEqualsRemovePartitionSpecs(
assertThat(actual.specIds()).containsExactlyInAnyOrderElementsOf(expected.specIds());
}

private static void assertEqualsRemoveSchemas(
MetadataUpdate.RemoveSchemas expected, MetadataUpdate.RemoveSchemas actual) {
assertThat(actual.schemaIds()).containsExactlyInAnyOrderElementsOf(expected.schemaIds());
}

private String createManifestListWithManifestFiles(long snapshotId, Long parentSnapshotId)
throws IOException {
File manifestList = File.createTempFile("manifests", null, temp.toFile());
Expand Down
31 changes: 31 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

Expand Down Expand Up @@ -1705,6 +1706,36 @@ public void testRemoveSpecsDoesntRemoveDefaultSpec() throws IOException {
.containsExactly(dataBucketSpec.specId());
}

@TestTemplate
public void testRemoveSchemas() {
table.newAppend().appendFile(FILE_A).commit();

Set<String> expectedDeletedFiles = Sets.newHashSet();
expectedDeletedFiles.add(table.currentSnapshot().manifestListLocation());

table.updateSchema().addColumn("extra_col1", Types.StringType.get()).commit();

table.newAppend().appendFile(FILE_B).commit();
expectedDeletedFiles.add(table.currentSnapshot().manifestListLocation());

table.updateSchema().addColumn("extra_col2", Types.LongType.get()).deleteColumn("id").commit();

table.newAppend().appendFile(FILE_A2).commit();

assertThat(table.schemas().size()).isEqualTo(3);

Set<String> deletedFiles = Sets.newHashSet();
// Expire all snapshots except the current one. Also expire all schemas except the current one.
removeSnapshots(table)
.expireOlderThan(System.currentTimeMillis())
.cleanExpiredMetadata(true)
.deleteWith(deletedFiles::add)
.commit();

assertThat(deletedFiles).containsExactlyInAnyOrderElementsOf(expectedDeletedFiles);
assertThat(table.schemas().values()).containsExactly(table.schema());
}

private Set<String> manifestPaths(Snapshot snapshot, FileIO io) {
return snapshot.allManifests(io).stream().map(ManifestFile::path).collect(Collectors.toSet());
}
Expand Down
Loading

0 comments on commit 0884ada

Please sign in to comment.