Commit 6d03498

SNOW-835618 Snowpipe Streaming: send uncompressed chunk length from SDK to GS

sfc-gh-azagrebin authored and sfc-gh-tzhang committed Jul 24, 2023
1 parent 1a6b5b0

Showing 5 changed files with 37 additions and 12 deletions.
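For orientation, here is a minimal, hypothetical sketch of what the new field looks like on the wire. It mirrors the ChunkMetadata pattern in this commit with only the two length fields (the real class carries many more properties); the JSON names chunk_length and chunk_length_uncompressed come from the getters and test assertions in the diff below, while the class name and values are invented for illustration.

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Standalone mirror of the ChunkMetadata pattern in this commit; the real
// class lives in the SDK and has many more fields.
public class ChunkMetadataSketch {
  private final Integer chunkLength;             // compressed (padded) chunk length
  private final Integer uncompressedChunkLength; // estimated uncompressed length

  ChunkMetadataSketch(Integer chunkLength, Integer uncompressedChunkLength) {
    this.chunkLength = chunkLength;
    this.uncompressedChunkLength = uncompressedChunkLength;
  }

  @JsonProperty("chunk_length")
  public Integer getChunkLength() {
    return chunkLength;
  }

  @JsonProperty("chunk_length_uncompressed")
  public Integer getUncompressedChunkLength() {
    return uncompressedChunkLength;
  }

  public static void main(String[] args) throws Exception {
    // Prints JSON like: {"chunk_length":128,"chunk_length_uncompressed":256}
    System.out.println(
        new ObjectMapper().writeValueAsString(new ChunkMetadataSketch(128, 256)));
  }
}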
@@ -114,6 +114,7 @@ static <T> Blob constructBlobAndMetadata(
             // The paddedChunkLength is used because it is the actual data size used for
             // decompression and md5 calculation on server side.
             .setChunkLength(paddedChunkLength)
+            .setUncompressedChunkLength((int) serializedChunk.chunkEstimatedUncompressedSize)
             .setChannelList(serializedChunk.channelsMetadataList)
             .setChunkMD5(md5)
             .setEncryptionKeyId(firstChannelFlushContext.getEncryptionKeyId())
@@ -132,13 +133,13 @@ static <T> Blob constructBlobAndMetadata(
 
       logger.logInfo(
           "Finish building chunk in blob={}, table={}, rowCount={}, startOffset={},"
-              + " uncompressedSize={}, paddedChunkLength={}, encryptedCompressedSize={},"
+              + " estimatedUncompressedSize={}, paddedChunkLength={}, encryptedCompressedSize={},"
               + " bdecVersion={}",
           filePath,
           firstChannelFlushContext.getFullyQualifiedTableName(),
           serializedChunk.rowCount,
           startOffset,
-          serializedChunk.chunkUncompressedSize,
+          serializedChunk.chunkEstimatedUncompressedSize,
           paddedChunkLength,
           encryptedCompressedChunkDataSize,
           bdecVersion);
@@ -15,6 +15,7 @@ class ChunkMetadata {
   private final String tableName;
   private Long chunkStartOffset;
   private final Integer chunkLength;
+  private final Integer uncompressedChunkLength;
   private final List<ChannelMetadata> channels;
   private final String chunkMD5;
   private final EpInfo epInfo;
@@ -33,6 +34,9 @@ static class Builder {
     private String tableName;
     private Long chunkStartOffset;
     private Integer chunkLength; // compressedChunkLength
+
+    private Integer uncompressedChunkLength;
+
     private List<ChannelMetadata> channels;
     private String chunkMD5;
     private EpInfo epInfo;
@@ -62,6 +66,15 @@ Builder setChunkLength(Integer chunkLength) {
       return this;
     }
 
+    /**
+     * Currently we send an estimated uncompressed size that is close to the actual Parquet data
+     * size and mostly reflects user data; the Parquet encoding overhead may differ slightly.
+     */
+    public Builder setUncompressedChunkLength(Integer uncompressedChunkLength) {
+      this.uncompressedChunkLength = uncompressedChunkLength;
+      return this;
+    }
+
     Builder setChannelList(List<ChannelMetadata> channels) {
       this.channels = channels;
       return this;
@@ -110,6 +123,7 @@ private ChunkMetadata(Builder builder) {
     this.tableName = builder.tableName;
     this.chunkStartOffset = builder.chunkStartOffset;
     this.chunkLength = builder.chunkLength;
+    this.uncompressedChunkLength = builder.uncompressedChunkLength;
     this.channels = builder.channels;
     this.chunkMD5 = builder.chunkMD5;
     this.epInfo = builder.epInfo;
@@ -152,6 +166,11 @@ Integer getChunkLength() {
     return chunkLength;
   }
 
+  @JsonProperty("chunk_length_uncompressed")
+  public Integer getUncompressedChunkLength() {
+    return uncompressedChunkLength;
+  }
+
   @JsonProperty("channels")
   List<ChannelMetadata> getChannels() {
     return this.channels;
@@ -33,21 +33,21 @@ class SerializationResult {
     final List<ChannelMetadata> channelsMetadataList;
     final Map<String, RowBufferStats> columnEpStatsMapCombined;
     final long rowCount;
-    final float chunkUncompressedSize;
+    final float chunkEstimatedUncompressedSize;
     final ByteArrayOutputStream chunkData;
    final Pair<Long, Long> chunkMinMaxInsertTimeInMs;
 
     public SerializationResult(
         List<ChannelMetadata> channelsMetadataList,
         Map<String, RowBufferStats> columnEpStatsMapCombined,
         long rowCount,
-        float chunkUncompressedSize,
+        float chunkEstimatedUncompressedSize,
         ByteArrayOutputStream chunkData,
         Pair<Long, Long> chunkMinMaxInsertTimeInMs) {
       this.channelsMetadataList = channelsMetadataList;
       this.columnEpStatsMapCombined = columnEpStatsMapCombined;
       this.rowCount = rowCount;
-      this.chunkUncompressedSize = chunkUncompressedSize;
+      this.chunkEstimatedUncompressedSize = chunkEstimatedUncompressedSize;
       this.chunkData = chunkData;
       this.chunkMinMaxInsertTimeInMs = chunkMinMaxInsertTimeInMs;
     }
@@ -53,7 +53,7 @@ private SerializationResult serializeFromParquetWriteBuffers(
       throws IOException {
     List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
     long rowCount = 0L;
-    float chunkUncompressedSize = 0f;
+    float chunkEstimatedUncompressedSize = 0f;
     String firstChannelFullyQualifiedTableName = null;
     Map<String, RowBufferStats> columnEpStatsMapCombined = null;
     BdecParquetWriter mergedChannelWriter = null;
@@ -104,7 +104,7 @@ private SerializationResult serializeFromParquetWriteBuffers(
       }
 
       rowCount += data.getRowCount();
-      chunkUncompressedSize += data.getBufferSize();
+      chunkEstimatedUncompressedSize += data.getBufferSize();
 
       logger.logDebug(
           "Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}",
@@ -121,7 +121,7 @@
         channelsMetadataList,
         columnEpStatsMapCombined,
         rowCount,
-        chunkUncompressedSize,
+        chunkEstimatedUncompressedSize,
         mergedChunkData,
         chunkMinMaxInsertTimeInMs);
   }
@@ -131,7 +131,7 @@ private SerializationResult serializeFromJavaObjects(
       throws IOException {
     List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
     long rowCount = 0L;
-    float chunkUncompressedSize = 0f;
+    float chunkEstimatedUncompressedSize = 0f;
     String firstChannelFullyQualifiedTableName = null;
     Map<String, RowBufferStats> columnEpStatsMapCombined = null;
     List<List<Object>> rows = null;
@@ -183,7 +183,7 @@ private SerializationResult serializeFromJavaObjects(
       rows.addAll(data.getVectors().rows);
 
       rowCount += data.getRowCount();
-      chunkUncompressedSize += data.getBufferSize();
+      chunkEstimatedUncompressedSize += data.getBufferSize();
 
       logger.logDebug(
           "Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={},"
@@ -206,7 +206,7 @@ private SerializationResult serializeFromJavaObjects(
         channelsMetadataList,
         columnEpStatsMapCombined,
         rowCount,
-        chunkUncompressedSize,
+        chunkEstimatedUncompressedSize,
         mergedData,
         chunkMinMaxInsertTimeInMs);
   }
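The flusher changes above also show why the wire value is only an estimate: per-channel in-memory buffer sizes (floats) are summed, then narrowed to int in BlobBuilder before being attached to the chunk metadata. A self-contained sketch of that arithmetic, with the class name and values invented for illustration:

import java.util.List;

public class UncompressedSizeEstimate {
  public static void main(String[] args) {
    // Stand-ins for the per-channel ChannelData.getBufferSize() values the
    // flusher folds into chunkEstimatedUncompressedSize.
    List<Float> perChannelBufferSizes = List.of(1024.5f, 2048.25f, 512.0f);

    float chunkEstimatedUncompressedSize = 0f;
    for (float bufferSize : perChannelBufferSizes) {
      chunkEstimatedUncompressedSize += bufferSize; // same accumulation as the flusher
    }

    // BlobBuilder narrows the float estimate to int, as in
    // setUncompressedChunkLength((int) serializedChunk.chunkEstimatedUncompressedSize).
    System.out.println((int) chunkEstimatedUncompressedSize); // prints 3584
  }
}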
@@ -843,6 +843,7 @@ public void testBlobBuilder() throws Exception {
             .setOwningTableFromChannelContext(channel1.getChannelContext())
             .setChunkStartOffset(0L)
             .setChunkLength(dataSize)
+            .setUncompressedChunkLength(dataSize * 2)
             .setChannelList(Collections.singletonList(channelMetadata))
             .setChunkMD5("md5")
             .setEncryptionKeyId(1234L)
@@ -867,7 +868,8 @@ public void testBlobBuilder() throws Exception {
             Arrays.copyOfRange(blob, offset, offset += BLOB_TAG_SIZE_IN_BYTES),
             StandardCharsets.UTF_8));
     Assert.assertEquals(
-        bdecVersion, Arrays.copyOfRange(blob, offset, offset += BLOB_VERSION_SIZE_IN_BYTES)[0]);
+        bdecVersion.toByte(),
+        Arrays.copyOfRange(blob, offset, offset += BLOB_VERSION_SIZE_IN_BYTES)[0]);
     long totalSize =
         ByteBuffer.wrap(Arrays.copyOfRange(blob, offset, offset += BLOB_FILE_SIZE_SIZE_IN_BYTES))
             .getLong();
@@ -892,6 +894,9 @@ public void testBlobBuilder() throws Exception {
     Assert.assertEquals(chunkMetadata.getTableName(), map.get("table"));
     Assert.assertEquals(chunkMetadata.getSchemaName(), map.get("schema"));
     Assert.assertEquals(chunkMetadata.getDBName(), map.get("database"));
+    Assert.assertEquals(chunkMetadata.getChunkLength(), map.get("chunk_length"));
+    Assert.assertEquals(
+        chunkMetadata.getUncompressedChunkLength(), map.get("chunk_length_uncompressed"));
     Assert.assertEquals(
         Long.toString(chunkMetadata.getChunkStartOffset() - offset),
         map.get("chunk_start_offset").toString());
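The updated assertions compare the builder's values against the chunk metadata JSON parsed back out of the blob. A hypothetical, stripped-down version of that round-trip check (the JSON is inlined here; the real test extracts it from the built blob):

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class ChunkMetadataJsonCheck {
  public static void main(String[] args) throws Exception {
    // Inlined stand-in for the metadata the test reads back from the blob.
    String json = "{\"chunk_length\":128,\"chunk_length_uncompressed\":256}";
    Map<?, ?> map = new ObjectMapper().readValue(json, Map.class);

    // Mirrors the new assertions: the JSON field names match the @JsonProperty
    // annotations on ChunkMetadata's getters.
    System.out.println(map.get("chunk_length"));              // 128
    System.out.println(map.get("chunk_length_uncompressed")); // 256
  }
}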
