ParquetSerializer support for append operations (#280)
aloneguid authored Mar 23, 2023
1 parent 35d1758 commit 1a5dde2
Showing 8 changed files with 85 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/full.yml
@@ -2,7 +2,7 @@ name: 'Full Workflow'

env:
VERSION: 4.6.1
-ASM_VERSION: 4.6.0
+ASM_VERSION: 4.0.0

on:
push:
25 changes: 25 additions & 0 deletions docs/serialisation.md
@@ -293,6 +293,31 @@ Similar to JSON [supported collection types](https://learn.microsoft.com/en-us/d
`*` Technically impossible.
`**` Technically possible, but not implemented yet.

## Appending to Files

`ParquetSerializer` supports appending data to an existing Parquet file. This can be useful when you have multiple batches of data that need to be written to the same file.

To use this feature, set the `Append` flag to `true` in the `ParquetSerializerOptions` object you pass to the `SerializeAsync` method. This tells the library to append the data batch to the end of the file stream, as a new row group, instead of overwriting the file. For example:

```csharp
await ParquetSerializer.SerializeAsync(dataBatch, ms, new ParquetSerializerOptions { Append = true });
```

There is one caveat, however: do not set the `Append` flag to `true` for the first batch written to a new file. A Parquet file has a header and a footer containing metadata about the schema and statistics of the data, and appending to an empty file stream throws an `IOException` because there is no header or footer to read. Always leave `Append` as `false` for the first batch (or pass no options at all; `false` is the default), then switch it to `true` for subsequent batches. For example:

```csharp
// First batch
await ParquetSerializer.SerializeAsync(dataBatch1, ms, new ParquetSerializerOptions { Append = false });

// Second batch
await ParquetSerializer.SerializeAsync(dataBatch2, ms, new ParquetSerializerOptions { Append = true });

// Third batch
await ParquetSerializer.SerializeAsync(dataBatch3, ms, new ParquetSerializerOptions { Append = true });
```

By following this pattern, you can easily append data to a Parquet file using `ParquetSerializer`.
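
When writing many batches in a loop, the flag can simply be derived from the loop index. The sketch below mirrors the `Append_reads_all_data` test added in this commit; the `data` list, the `Record` class and the batch size are illustrative assumptions:

```csharp
using var ms = new MemoryStream();

const int batchSize = 100;
for(int i = 0; i < data.Count; i += batchSize) {
    List<Record> dataBatch = data.Skip(i).Take(batchSize).ToList();

    // reuse the same stream: rewind before each write
    ms.Position = 0;

    // the first batch creates the file, every later batch appends a row group
    await ParquetSerializer.SerializeAsync(dataBatch, ms,
        new ParquetSerializerOptions { Append = i > 0 });
}
```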

## FAQ

**Q.** Can I specify a schema for serialisation/deserialisation?
36 changes: 36 additions & 0 deletions src/Parquet.Test/Serialisation/ParquetSerializerTest.cs
@@ -202,5 +202,41 @@ public async Task Map_Simple_Serde() {
XAssert.JsonEquivalent(data, data2);

}

[Fact]
public async Task Append_reads_all_data() {
var data = Enumerable.Range(0, 1_000).Select(i => new Record {
Timestamp = DateTime.UtcNow.AddSeconds(i),
EventName = i % 2 == 0 ? "on" : "off",
MeterValue = i
}).ToList();

using var ms = new MemoryStream();

const int batchSize = 100;
for(int i = 0; i < data.Count; i += batchSize) {
List<Record> dataBatch = data.Skip(i).Take(batchSize).ToList();

ms.Position = 0;
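// the first batch (i == 0) creates the file, later batches append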
await ParquetSerializer.SerializeAsync(dataBatch, ms, new ParquetSerializerOptions { Append = i > 0 });
}

ms.Position = 0;
IList<Record> data2 = await ParquetSerializer.DeserializeAsync<Record>(ms);

Assert.Equivalent(data2, data);
}

[Fact]
public async Task Append_to_new_file_fails() {
var data = Enumerable.Range(0, 10).Select(i => new Record {
Timestamp = DateTime.UtcNow.AddSeconds(i),
EventName = i % 2 == 0 ? "on" : "off",
MeterValue = i
}).ToList();

using var ms = new MemoryStream();
await Assert.ThrowsAsync<IOException>(async () => await ParquetSerializer.SerializeAsync(data, ms, new ParquetSerializerOptions { Append = true }));
}
}
}
6 changes: 5 additions & 1 deletion src/Parquet/Encodings/SchemaEncoder.cs
@@ -206,6 +206,7 @@ static bool TryBuildList(List<Thrift.SchemaElement> schema,
//as we are skipping elements set path hint
Thrift.SchemaElement tseRepeatedGroup = schema[index + 1];
field.Path = new FieldPath(tseList.Name, tseRepeatedGroup.Name);
field.ThriftSchemaElement = se;
field.GroupSchemaElement = tseRepeatedGroup;
index += 2; //skip this element and child container
ownedChildren = 1; //we should get this element assigned back
@@ -237,7 +238,8 @@ static bool TryBuildMap(List<Thrift.SchemaElement> schema,
var map = new MapField(root.Name) {
Path = new FieldPath(root.Name, tseContainer.Name),
IsNullable = root.Repetition_type != FieldRepetitionType.REQUIRED,
-GroupSchemaElement = tseContainer
+GroupSchemaElement = tseContainer,
+ThriftSchemaElement = root
};

index += 1;
@@ -261,6 +263,7 @@ static bool TryBuildStruct(List<Thrift.SchemaElement> schema,
ownedChildren = container.Num_children; //make then owned to receive in .Assign()
field = StructField.CreateWithNoElements(container.Name);
field.IsNullable = container.Repetition_type != FieldRepetitionType.REQUIRED;
field.ThriftSchemaElement = container;
return true;
}

@@ -304,6 +307,7 @@

df.IsNullable = isNullable;
df.IsArray = isArray;
df.ThriftSchemaElement = se;
f = df;

index++;
3 changes: 3 additions & 0 deletions src/Parquet/ParquetWriter.cs
@@ -93,6 +93,9 @@ private async Task PrepareFileAsync(bool append, CancellationToken cancellationT
if(!Stream.CanSeek)
throw new IOException("destination stream must be seekable for append operations.");

if(Stream.Length == 0)
throw new IOException($"you can only append to existing streams, but current stream is empty.");

await ValidateFileAsync();

Thrift.FileMetaData fileMeta = await ReadMetadataAsync(cancellationToken);
6 changes: 6 additions & 0 deletions src/Parquet/Schema/Field.cs
@@ -60,6 +60,12 @@ internal List<string> GetNaturalChildPath(List<string> path) {
/// </summary>
internal string? ClrPropName { get; set; }

/// <summary>
/// Low-level thrift schema element corresponding to this high-level schema element.
/// Only set when reading files.
/// </summary>
public Thrift.SchemaElement? ThriftSchemaElement { get; internal set; }

internal virtual FieldPath? PathPrefix { set { } }

/// <summary>
3 changes: 2 additions & 1 deletion src/Parquet/Serialization/ParquetSerializer.cs
@@ -33,7 +33,8 @@ public static async Task<ParquetSchema> SerializeAsync<T>(IEnumerable<T> objectI

Striper<T> striper = new Striper<T>(typeof(T).GetParquetSchema(false));

-using(ParquetWriter writer = await ParquetWriter.CreateAsync(striper.Schema, destination, null, false, cancellationToken)) {
+bool append = options != null && options.Append;
+using(ParquetWriter writer = await ParquetWriter.CreateAsync(striper.Schema, destination, null, append, cancellationToken)) {

if(options != null) {
writer.CompressionMethod = options.CompressionMethod;
8 changes: 7 additions & 1 deletion src/Parquet/Serialization/ParquetSerializerOptions.cs
@@ -5,6 +5,12 @@ namespace Parquet.Serialization {
/// Parquet serializer options
/// </summary>
public class ParquetSerializerOptions {

/// <summary>
/// When set to true, appends to the file by creating a new row group.
/// </summary>
public bool Append { get; set; } = false;

/// <summary>
/// Page compression method
/// </summary>
@@ -15,6 +21,6 @@ public class ParquetSerializerOptions {
/// Page compression level
/// </summary>

-public CompressionLevel CompressionLevel = CompressionLevel.Optimal;
+public CompressionLevel CompressionLevel { get; set; } = CompressionLevel.Optimal;
}
}
