[Managed Iceberg] Allow updating partition specs at runtime #32879
base: master
Changes from all commits: 98d6f0e, 806e13d, 1ad9f3d, 3ee46c6, baba789, 40cdde1, 602a2fe
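Summary: this change lets the Managed Iceberg sink commit a batch of data files even when the files were written under different partition specs, which happens when a user evolves the table's partition spec while the pipeline is running. For reference, spec evolution on a live table goes through Iceberg's UpdatePartitionSpec API; a minimal sketch (the catalog, table name, and column are hypothetical):

import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expressions;

public class EvolvePartitionSpecExample {
  static void evolve(Catalog catalog) {
    // Hypothetical table identifier.
    Table table = catalog.loadTable(TableIdentifier.parse("db.events"));
    // Add a daily partition on a hypothetical timestamp column. Data files
    // committed before this call keep their old partition spec ID; files
    // written afterwards carry the new one. A batch that straddles the
    // change is exactly the mixed-spec case this PR handles at commit time.
    table.updateSpec().addField(Expressions.day("event_ts")).commit();
  }
}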
Trigger file for the affected test suite:

@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 4
+  "modification": 3
 }
org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -17,6 +17,12 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.metrics.Counter;
@@ -29,14 +35,21 @@
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,9 +58,11 @@ class AppendFilesToTables
     extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, SnapshotInfo>>> {
   private static final Logger LOG = LoggerFactory.getLogger(AppendFilesToTables.class);
   private final IcebergCatalogConfig catalogConfig;
+  private final String manifestFilePrefix;
 
-  AppendFilesToTables(IcebergCatalogConfig catalogConfig) {
+  AppendFilesToTables(IcebergCatalogConfig catalogConfig, String manifestFilePrefix) {
     this.catalogConfig = catalogConfig;
+    this.manifestFilePrefix = manifestFilePrefix;
   }
 
   @Override
@@ -67,27 +82,27 @@ public String apply(FileWriteResult input) {
         .apply("Group metadata updates by table", GroupByKey.create())
         .apply(
             "Append metadata updates to tables",
-            ParDo.of(new AppendFilesToTablesDoFn(catalogConfig)))
+            ParDo.of(new AppendFilesToTablesDoFn(catalogConfig, manifestFilePrefix)))
         .setCoder(KvCoder.of(StringUtf8Coder.of(), SnapshotInfo.CODER));
   }
 
   private static class AppendFilesToTablesDoFn
       extends DoFn<KV<String, Iterable<FileWriteResult>>, KV<String, SnapshotInfo>> {
     private final Counter snapshotsCreated =
         Metrics.counter(AppendFilesToTables.class, "snapshotsCreated");
     private final Counter dataFilesCommitted =
         Metrics.counter(AppendFilesToTables.class, "dataFilesCommitted");
     private final Distribution committedDataFileByteSize =
         Metrics.distribution(RecordWriter.class, "committedDataFileByteSize");
     private final Distribution committedDataFileRecordCount =
         Metrics.distribution(RecordWriter.class, "committedDataFileRecordCount");
 
     private final IcebergCatalogConfig catalogConfig;
+    private final String manifestFilePrefix;
 
     private transient @MonotonicNonNull Catalog catalog;
 
-    private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig) {
+    private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig, String manifestFilePrefix) {
       this.catalogConfig = catalogConfig;
+      this.manifestFilePrefix = manifestFilePrefix;
     }
 
     private Catalog getCatalog() {
@@ -97,36 +112,104 @@ private Catalog getCatalog() {
       return catalog;
     }
 
+    private boolean containsMultiplePartitionSpecs(Iterable<FileWriteResult> fileWriteResults) {
+      int id = fileWriteResults.iterator().next().getSerializableDataFile().getPartitionSpecId();
+      for (FileWriteResult result : fileWriteResults) {
+        if (id != result.getSerializableDataFile().getPartitionSpecId()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
     @ProcessElement
     public void processElement(
         @Element KV<String, Iterable<FileWriteResult>> element,
         OutputReceiver<KV<String, SnapshotInfo>> out,
-        BoundedWindow window) {
+        BoundedWindow window)
+        throws IOException {
       String tableStringIdentifier = element.getKey();
       Iterable<FileWriteResult> fileWriteResults = element.getValue();
       if (!fileWriteResults.iterator().hasNext()) {
         return;
       }
 
       Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
 
+      // vast majority of the time, we will simply append data files.
+      // in the rare case we get a batch that contains multiple partition specs, we will group
+      // data into manifest files and append.
+      // note: either way, we must use a single commit operation for atomicity.
+      if (containsMultiplePartitionSpecs(fileWriteResults)) {
+        appendManifestFiles(table, fileWriteResults);
+      } else {
+        appendDataFiles(table, fileWriteResults);
[Review thread on the appendDataFiles call]
Reviewer: Can we return the AppendFiles update and commit?
ahmedabu98: any particular reason? i'm leaning towards keeping it this way for simplicity
Reviewer: yep, reduce duplicated code
Reviewer: but it's not a big duplication issue, so you can discard
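A minimal sketch of the suggested refactor, for context (the helper names are hypothetical; the PR keeps the two void methods for simplicity):

    // Hypothetical refactor: each helper builds and returns the pending
    // AppendFiles update instead of committing, so processElement performs
    // the single atomic commit for both code paths.
    AppendFiles update =
        containsMultiplePartitionSpecs(fileWriteResults)
            ? buildManifestFileAppend(table, fileWriteResults) // hypothetical helper
            : buildDataFileAppend(table, fileWriteResults); // hypothetical helper
    update.commit();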
+      }
 
+      Snapshot snapshot = table.currentSnapshot();
+      LOG.info("Created new snapshot for table '{}': {}", tableStringIdentifier, snapshot);
+      snapshotsCreated.inc();
+      out.outputWithTimestamp(
+          KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
+    }
+
+    // This works only when all files are using the same partition spec.
+    private void appendDataFiles(Table table, Iterable<FileWriteResult> fileWriteResults) {
       AppendFiles update = table.newAppend();
       long numFiles = 0;
       for (FileWriteResult result : fileWriteResults) {
-        DataFile dataFile = result.getDataFile(table.spec());
+        DataFile dataFile = result.getDataFile(table.specs());
         update.appendFile(dataFile);
         committedDataFileByteSize.update(dataFile.fileSizeInBytes());
         committedDataFileRecordCount.update(dataFile.recordCount());
         numFiles++;
       }
+      // this commit will create a ManifestFile. we don't need to manually create one.
       update.commit();
       dataFilesCommitted.inc(numFiles);
     }
 
-      Snapshot snapshot = table.currentSnapshot();
-      LOG.info("Created new snapshot for table '{}': {}", tableStringIdentifier, snapshot);
-      snapshotsCreated.inc();
-      out.outputWithTimestamp(
-          KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
-    }
+    // When a user updates their table partition spec during runtime, we can end up with
+    // a batch of files where some are written with the old spec and some are written with the
+    // new spec.
+    // A table commit is limited to a single partition spec.
+    // To handle this, we create a manifest file for each partition spec, and group data files
+    // accordingly.
+    // Afterward, we append all manifests using a single commit operation.
+    private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWriteResults)
+        throws IOException {

[Review thread on appendManifestFiles]
Reviewer: This is a long method 😲

+      String uuid = UUID.randomUUID().toString();
+      Map<Integer, PartitionSpec> specs = table.specs();
+
+      Map<Integer, List<DataFile>> dataFilesBySpec = new HashMap<>();
+      for (FileWriteResult result : fileWriteResults) {

[A review conversation on this line was marked as resolved by ahmedabu98.]

+        DataFile dataFile = result.getDataFile(specs);
+        dataFilesBySpec.computeIfAbsent(dataFile.specId(), i -> new ArrayList<>()).add(dataFile);
+      }
+
+      AppendFiles update = table.newAppend();
+      for (Map.Entry<Integer, List<DataFile>> entry : dataFilesBySpec.entrySet()) {
+        int specId = entry.getKey();
+        List<DataFile> files = entry.getValue();
+        PartitionSpec spec = Preconditions.checkStateNotNull(specs.get(specId));
+        ManifestWriter<DataFile> writer =
+            createManifestWriter(table.location(), uuid, spec, table.io());
+        for (DataFile file : files) {
+          writer.add(file);
+          committedDataFileByteSize.update(file.fileSizeInBytes());
+          committedDataFileRecordCount.update(file.recordCount());
+        }
+        writer.close();
+        update.appendManifest(writer.toManifestFile());
+      }
+      update.commit();
+    }
+
+    private ManifestWriter<DataFile> createManifestWriter(
+        String tableLocation, String uuid, PartitionSpec spec, FileIO io) {
+      String location =
+          FileFormat.AVRO.addExtension(
+              String.format(
+                  "%s/metadata/%s-%s-%s.manifest",
+                  tableLocation, manifestFilePrefix, uuid, spec.specId()));
+      OutputFile outputFile = io.newOutputFile(location);
+      return ManifestFiles.write(spec, outputFile);
+    }
   }
 }
[Review thread on appendManifestFiles]
Reviewer: I would rather distinguish between partitioned and unpartitioned tables.
Reviewer: See the Spark-Iceberg implementation.
ahmedabu98: Iceberg allows us to use one commit operation that appends data files of different partitions. I think having a manifest file per partition might be a little overkill and unnecessary.
The only reason we need manifest files is to differentiate between two specs. Technically, this code path addresses an edge case, and realistically will only be used during the few moments after a table's spec gets updated -- in these moments, we will potentially have two specs in the same batch of files. So we'd need one manifest file for each spec.
I'm open to other perspectives though, in case I'm missing something.
Reviewer: Yes, you are right. I got confused with something else.
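To make the point above concrete, a minimal sketch (plain Iceberg API; the table and files are hypothetical): a single append commit can carry data files from many different partitions, so per-partition manifests are unnecessary; manifests are only needed here to separate two partition specs within one batch.

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

public class SingleCommitExample {
  // One atomic commit appends files across many partitions, as long as each
  // file was written under a partition spec the table knows about.
  static void appendAll(Table table, Iterable<DataFile> files) {
    AppendFiles update = table.newAppend();
    for (DataFile file : files) {
      update.appendFile(file); // files may belong to different partitions
    }
    update.commit(); // a single new snapshot for the whole batch
  }
}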