diff --git a/.github/workflows/full.yml b/.github/workflows/full.yml index ba430f3c..8ac5e230 100644 --- a/.github/workflows/full.yml +++ b/.github/workflows/full.yml @@ -1,7 +1,7 @@ name: 'Full Workflow' env: - VERSION: 4.6.1 + VERSION: 4.6.2 ASM_VERSION: 4.0.0 on: diff --git a/src/Parquet.PerfRunner/Benchmarks/Classes.cs b/src/Parquet.PerfRunner/Benchmarks/Classes.cs new file mode 100644 index 00000000..b09a6ccf --- /dev/null +++ b/src/Parquet.PerfRunner/Benchmarks/Classes.cs @@ -0,0 +1,63 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using BenchmarkDotNet.Attributes; +using Microsoft.Win32.SafeHandles; +using Parquet.Serialization; + +namespace Parquet.PerfRunner.Benchmarks { + + class Record { + public DateTime Timestamp { get; set; } + public string? EventName { get; set; } + public double MeterValue { get; set; } + } + + + [ShortRunJob] + [MeanColumn] + [MemoryDiagnoser] + [MarkdownExporter] + public class Classes { + private List? _testData; + private MemoryStream _ms = new MemoryStream(); + + [GlobalSetup] + public async Task SetUp() { + _testData = Enumerable.Range(0, 1_000).Select(i => new Record { + Timestamp = DateTime.UtcNow.AddSeconds(i), + EventName = i % 2 == 0 ? "on" : "off", + MeterValue = i + }).ToList(); + + await ParquetSerializer.SerializeAsync(_testData, _ms); + } + + + [Benchmark(Baseline = true)] + public async Task Serialise_Legacy() { + using var ms = new MemoryStream(); + await ParquetConvert.SerializeAsync(_testData, ms); + } + + [Benchmark] + public async Task Deserialise_Legacy() { + _ms.Position = 0; + await ParquetConvert.DeserializeAsync(_ms); + } + + [Benchmark] + public async Task Serialise() { + using var ms = new MemoryStream(); + await ParquetSerializer.SerializeAsync(_testData, ms); + } + + [Benchmark] + public async Task Deserialise() { + _ms.Position = 0; + await ParquetSerializer.DeserializeAsync(_ms); + } + } +} diff --git a/src/Parquet.PerfRunner/Program.cs b/src/Parquet.PerfRunner/Program.cs index a06b6bb3..0584f509 100644 --- a/src/Parquet.PerfRunner/Program.cs +++ b/src/Parquet.PerfRunner/Program.cs @@ -11,8 +11,14 @@ case "progression": VersionedBenchmark.Run(); break; + case "classes": + BenchmarkRunner.Run(); + break; } } else { //new VsParquetSharp().Main(); - await new DataTypes().NullableInts(); + //await new DataTypes().NullableInts(); + var c = new Classes(); + c.SetUp(); + c.Serialise(); } diff --git a/src/Parquet.Test/Serialisation/ParquetSerializerTest.cs b/src/Parquet.Test/Serialisation/ParquetSerializerTest.cs index 2ead00de..e3ac287c 100644 --- a/src/Parquet.Test/Serialisation/ParquetSerializerTest.cs +++ b/src/Parquet.Test/Serialisation/ParquetSerializerTest.cs @@ -118,6 +118,31 @@ public async Task Struct_WithNullProps_Serde() { Assert.Equivalent(data2, data); } + //[Fact] + public async Task Struct_With_NestedNulls_Serde() { + + var data = new List { + new AddressBookEntry { + FirstName = "Joe", + LastName = "Bloggs", + Address = new Address() { + City = null, + Country = null + } + } + }; + + // serialiser puts (null, 0) for Address.City, but should put (null, 1) + + using var ms = new MemoryStream(); + await ParquetSerializer.SerializeAsync(data, ms); + + ms.Position = 0; + IList data2 = await ParquetSerializer.DeserializeAsync(ms); + + XAssert.JsonEquivalent(data, data2); + } + [Fact] public async Task List_Structs_Serde() { var data = Enumerable.Range(0, 1_000).Select(i => new MovementHistory { diff --git a/src/Parquet/Globals.cs b/src/Parquet/Globals.cs index c9ac4eeb..2c2da47e 100644 --- a/src/Parquet/Globals.cs +++ b/src/Parquet/Globals.cs @@ -17,7 +17,7 @@ public static class Globals { /// public static readonly string GithubSha = "${GITHUB_SHA}"; - internal const string DataTypeEnumObsolete = "Please resort to using System.Type overloads. Will be removed in v6."; - internal const string ParquetConvertObsolete = "ParquetConvert was an experimental project and is not obsolete. Consider switching to ParquetSerializer which supports all data types, including nested ones, and is just superior. ParquetConvert will be removed in v6."; + internal const string DataTypeEnumObsolete = "Please resort to using System.Type overloads. Will be removed in v5."; + internal const string ParquetConvertObsolete = "ParquetConvert was an experimental project and is now obsolete. Consider switching to ParquetSerializer which supports all data types, including nested ones, and is just superior. ParquetConvert will be removed in v5."; } } diff --git a/src/Parquet/Serialization/Dremel/FieldAssemblerCompiler.cs b/src/Parquet/Serialization/Dremel/FieldAssemblerCompiler.cs index 7ba26ca4..0d7014d2 100644 --- a/src/Parquet/Serialization/Dremel/FieldAssemblerCompiler.cs +++ b/src/Parquet/Serialization/Dremel/FieldAssemblerCompiler.cs @@ -144,6 +144,7 @@ private static void Discover(Field field, out bool isRepeated) { (field.SchemaType == SchemaType.Data && field is DataField rdf && rdf.IsArray); } +#if DEBUG private static void InjectLevelDebug(string levelPropertyName, object value, int dataIdx, int dl, int rl, @@ -151,6 +152,7 @@ private static void InjectLevelDebug(string levelPropertyName, int[] rsm) { Console.WriteLine("debug"); } +#endif /// /// Transitions RSM for current RL iteration @@ -259,7 +261,7 @@ private Expression InjectLevel(Expression rootVar, Type rootType, Field[] levelF } else { if(isAtomic) { - // C#: dlDepth <= _dlVar? + // C#: dlDepth == _dlVar? iteration = Expression.IfThen( Expression.Equal(Expression.Constant(dlDepth), _dlVar), diff --git a/src/Parquet/Serialization/Dremel/FieldStriperCompiler.cs b/src/Parquet/Serialization/Dremel/FieldStriperCompiler.cs index 5fe4dbaf..b74f13b0 100644 --- a/src/Parquet/Serialization/Dremel/FieldStriperCompiler.cs +++ b/src/Parquet/Serialization/Dremel/FieldStriperCompiler.cs @@ -15,6 +15,8 @@ class FieldStriperCompiler { private readonly ParquetSchema _schema; private readonly DataField _df; + private readonly bool _hasRls; + private readonly bool _hasDls; // input parameters private readonly ParameterExpression _dfParam = Expression.Parameter(typeof(DataField), "df"); @@ -34,10 +36,14 @@ class FieldStriperCompiler { // currently iterated class element private readonly ParameterExpression _classElementVar = Expression.Variable(typeof(TClass), "curr"); + private static readonly Expression NullListOfInt = Expression.Convert(Expression.Constant(null), typeof(List)); + public FieldStriperCompiler(ParquetSchema schema, DataField df) { _schema = schema; _df = df; + _hasRls = _df.MaxRepetitionLevel > 0; + _hasDls = _df.MaxDefinitionLevel > 0; // _valuesListType = typeof(List<>).MakeGenericType(df.ClrType); @@ -82,38 +88,40 @@ private Expression WriteValue(ParameterExpression valueVar, // only need RL and DL-1 Expression.Block( - Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl - 1)), - Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar)), + _hasDls ? Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl - 1)) : Expression.Empty(), + _hasRls ? Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar) : Expression.Empty()), // everything, but value must be non-null Expression.Block( Expression.Call(_valuesVar, _valuesListAddMethod, getNonNullValue), - Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl)), - Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar))); + _hasDls ? Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl)) : Expression.Empty(), + _hasRls ? Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar) : Expression.Empty())); } else { // required atomics are simple - add value, RL and DL as is return Expression.Block( Expression.Call(_valuesVar, _valuesListAddMethod, valueVar), - Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl)), - Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar)); + _hasDls ? Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl)) : Expression.Empty(), + _hasRls ? Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar) : Expression.Empty()); } } // non-atomics still need RL and DL dumped return Expression.Block( - Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl)), - Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar)); + _hasDls ? Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl)) : Expression.Empty(), + _hasRls ? Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar) : Expression.Empty()); } private Expression WriteMissingValue(int dl, Expression currentRlVar) { return Expression.Block( - Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl)), - Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar)); + _hasDls ? Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl)) : Expression.Empty(), + _hasRls ? Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar) : Expression.Empty()); } - private Expression WhileBody(Expression element, bool isAtomic, int dl, ParameterExpression currentRlVar, ParameterExpression seenFieldsVar, Field field, int rlDepth, Type elementType, List path) { + private Expression WhileBody(Expression element, bool isAtomic, int dl, ParameterExpression currentRlVar, + ParameterExpression seenFieldsVar, Field field, int rlDepth, Type elementType, List path) { + string suffix = field.Path.ToString().Replace(".", "_"); ParameterExpression chRepetitionLevelVar = Expression.Variable(typeof(int), $"chRepetitionLevel_{suffix}"); ParameterExpression valueVar = Expression.Variable(elementType, $"value_{suffix}"); @@ -127,13 +135,15 @@ private Expression WhileBody(Expression element, bool isAtomic, int dl, Paramete // L9-13 Expression.IfThenElse( // if seenFields.Contains(field.Path) - Expression.Call(seenFieldsVar, typeof(HashSet).GetMethod("Contains")!, Expression.Constant(field.Path.ToString())), + //Expression.Call(seenFieldsVar, typeof(HashSet).GetMethod("Contains")!, Expression.Constant(field.Path.ToString())), + Expression.IsTrue(seenFieldsVar), // chRepetitionLevelVar = treeDepth Expression.Assign(chRepetitionLevelVar, Expression.Constant(rlDepth)), // seenFields.Add(field.Path) - Expression.Call(seenFieldsVar, typeof(HashSet).GetMethod("Add")!, Expression.Constant(field.Path.ToString())) + //Expression.Call(seenFieldsVar, typeof(HashSet).GetMethod("Add")!, Expression.Constant(field.Path.ToString())) + Expression.Assign(seenFieldsVar, Expression.Constant(true)) ), // L14- @@ -195,13 +205,14 @@ private Expression DissectRecord( Expression levelProperty = Expression.Property(rootVar, levelPropertyName); Type levelPropertyType = rootType.GetProperty(levelPropertyName)!.PropertyType; ParameterExpression seenFieldsVar = Expression.Variable(typeof(HashSet), $"seenFieldsVar_{levelPropertyName}"); + ParameterExpression seenVar = Expression.Variable(typeof(bool), $"seen_{levelPropertyName}"); Expression extraBody; if(isRepeated) { Type elementType = ExtractElementTypeFromEnumerableType(levelPropertyType); Expression collection = levelProperty; ParameterExpression element = Expression.Variable(elementType, "element"); - Expression elementProcessor = WhileBody(element, isAtomic, dl, currentRlVar, seenFieldsVar, field, rlDepth, elementType, path); + Expression elementProcessor = WhileBody(element, isAtomic, dl, currentRlVar, seenVar, field, rlDepth, elementType, path); extraBody = elementProcessor.Loop(collection, elementType, element); // todo: if levelProperty (collection) is null, we need extra iteration with null value (which rep and def level?) @@ -212,12 +223,12 @@ private Expression DissectRecord( extraBody); } else { Expression element = levelProperty; - extraBody = WhileBody(element, isAtomic, dl, currentRlVar, seenFieldsVar, field, rlDepth, levelPropertyType, path); + extraBody = WhileBody(element, isAtomic, dl, currentRlVar, seenVar, field, rlDepth, levelPropertyType, path); } return Expression.Block( - new[] { seenFieldsVar }, - Expression.Assign(seenFieldsVar, Expression.New(typeof(HashSet))), + new[] { seenVar }, + Expression.Assign(seenVar, Expression.Constant(false)), extraBody); } @@ -236,16 +247,16 @@ public FieldStriper Compile() { // init 3 building blocks Expression.Block( Expression.Assign(_valuesVar, Expression.New(_valuesListType)), - Expression.Assign(_dlsVar, Expression.New(typeof(List))), - Expression.Assign(_rlsVar, Expression.New(typeof(List)))), + Expression.Assign(_dlsVar, _hasDls ? Expression.New(typeof(List)) : NullListOfInt), + Expression.Assign(_rlsVar, _hasRls ? Expression.New(typeof(List)) : NullListOfInt)), iterationLoop, // result: use triple to construct ShreddedColumn and return (last element in the block) Expression.New(ShreddedColumnConstructor, Expression.Call(_valuesVar, _valuesListType.GetMethod("ToArray")!), - _df.MaxDefinitionLevel == 0 ? Expression.Convert(Expression.Constant(null), typeof(List)) : _dlsVar, - _df.MaxRepetitionLevel == 0 ? Expression.Convert(Expression.Constant(null), typeof(List)) : _rlsVar) + _dlsVar, + _rlsVar) ); Func, ShreddedColumn> lambda = Expression diff --git a/src/Parquet/Serialization/ParquetSerializer.cs b/src/Parquet/Serialization/ParquetSerializer.cs index 604ccf68..931df081 100644 --- a/src/Parquet/Serialization/ParquetSerializer.cs +++ b/src/Parquet/Serialization/ParquetSerializer.cs @@ -17,6 +17,9 @@ namespace Parquet.Serialization { /// public static class ParquetSerializer { + private static readonly Dictionary _typeToStriper = new(); + private static readonly Dictionary _typeToAssembler = new(); + /// /// Serialize /// @@ -31,7 +34,14 @@ public static async Task SerializeAsync(IEnumerable objectI ParquetSerializerOptions? options = null, CancellationToken cancellationToken = default) { - Striper striper = new Striper(typeof(T).GetParquetSchema(false)); + Striper striper; + + if(_typeToStriper.TryGetValue(typeof(T), out object? boxedStriper)) { + striper = (Striper)boxedStriper; + } else { + striper = new Striper(typeof(T).GetParquetSchema(false)); + _typeToStriper[typeof(T)] = striper; + } bool append = options != null && options.Append; using(ParquetWriter writer = await ParquetWriter.CreateAsync(striper.Schema, destination, null, append, cancellationToken)) { @@ -86,7 +96,15 @@ public static async Task> DeserializeAsync(Stream source, CancellationToken cancellationToken = default) where T : new() { - Assembler asm = new Assembler(typeof(T).GetParquetSchema(true)); + Assembler asm; + + if(_typeToAssembler.TryGetValue(typeof(T), out object? boxedAssembler)) { + asm = (Assembler)boxedAssembler; + } else { + asm = new Assembler(typeof(T).GetParquetSchema(true)); + _typeToAssembler[typeof(T)] = asm; + } + var result = new List(); using ParquetReader reader = await ParquetReader.CreateAsync(source, new ParquetOptions { UnpackDefinitions = false });