Skip to content

Commit

Permalink
Benchmark updates, fixes for FasterKVEventStore.cs
Browse files Browse the repository at this point in the history
  • Loading branch information
halgari committed Dec 14, 2023
1 parent a6738aa commit e3d5241
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

<ItemGroup>
<ProjectReference Include="..\..\src\NexusMods.EventSourcing.Abstractions\NexusMods.EventSourcing.Abstractions.csproj" />
<ProjectReference Include="..\..\src\NexusMods.EventSourcing.FasterKV\NexusMods.EventSourcing.FasterKV.csproj" />
<ProjectReference Include="..\..\src\NexusMods.EventSourcing\NexusMods.EventSourcing.csproj" />
<ProjectReference Include="..\..\tests\NexusMods.EventSourcing.TestModel\NexusMods.EventSourcing.TestModel.csproj" />
</ItemGroup>
Expand Down
9 changes: 5 additions & 4 deletions benchmarks/NexusMods.EventSourcing.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
using BenchmarkDotNet.Running;
using NexusMods.EventSourcing;
using NexusMods.EventSourcing.Benchmarks;
using NexusMods.EventSourcing.FasterKV;
using NexusMods.EventSourcing.TestModel;


#if DEBUG
var readBenchmarks = new ReadBenchmarks();
readBenchmarks.EventStoreType = typeof(InMemoryEventStore<EventSerializer>);
readBenchmarks.EventCount = 10;
readBenchmarks.EntityCount = 10;
await readBenchmarks.Setup();
readBenchmarks.EventStoreType = typeof(FasterKVEventStore<EventSerializer>);
readBenchmarks.EventCount = 10000;
readBenchmarks.EntityCount = 10000;
readBenchmarks.Setup();
readBenchmarks.ReadEvents();
#else
BenchmarkRunner.Run<ReadBenchmarks>();
Expand Down
20 changes: 15 additions & 5 deletions benchmarks/NexusMods.EventSourcing.Benchmarks/ReadBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,29 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NexusMods.EventSourcing.Abstractions;
using NexusMods.EventSourcing.FasterKV;
using NexusMods.EventSourcing.TestModel;
using NexusMods.EventSourcing.TestModel.Events;
using NexusMods.EventSourcing.TestModel.Model;
using NexusMods.Paths;

namespace NexusMods.EventSourcing.Benchmarks;

[MemoryDiagnoser]
public class ReadBenchmarks
{
private readonly IServiceProvider _services;
private InMemoryEventStore<EventSerializer> _eventStore = null!;
private IEventStore _eventStore = null!;
private EntityId<Loadout>[] _ids = Array.Empty<EntityId<Loadout>>();

[Params(typeof(InMemoryEventStore<EventSerializer>))]
[Params(typeof(InMemoryEventStore<EventSerializer>),
typeof(FasterKVEventStore<EventSerializer>))]
public Type EventStoreType { get; set; } = typeof(InMemoryEventStore<EventSerializer>);

[Params(100, 10000)]
[Params(100, 1000)]
public int EventCount { get; set; }

[Params(100, 10000)]
[Params(100, 1000)]
public int EntityCount { get; set; }

public ReadBenchmarks()
Expand All @@ -46,6 +49,14 @@ public void Setup()
{
_eventStore = new InMemoryEventStore<EventSerializer>(_services.GetRequiredService<EventSerializer>());
}
else if (EventStoreType == typeof(FasterKVEventStore<EventSerializer>))
{
_eventStore = new FasterKVEventStore<EventSerializer>(_services.GetRequiredService<EventSerializer>(),
new Settings
{
StorageLocation = FileSystem.Shared.GetKnownPath(KnownPath.EntryDirectory).Combine("FasterKV.EventStore" + Guid.NewGuid())
});
}
else
{
throw new NotSupportedException($"EventStoreType '{EventStoreType}' is not supported.");
Expand All @@ -67,7 +78,6 @@ public void Setup()
_eventStore.Add(new RenameLoadout(_ids[e], $"Loadout {e} {ev}")).GetAwaiter().GetResult();
}
}

}

[Benchmark]
Expand Down
129 changes: 122 additions & 7 deletions src/NexusMods.EventSourcing.FasterKV/FasterKVEventStore.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,144 @@
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using FASTER.core;
using NexusMods.EventSourcing.Abstractions;
using Reloaded.Memory;

namespace NexusMods.EventSourcing.FasterKV;

public class FasterKVEventStore<TSerializer> : IEventStore
where TSerializer : IEventSerializer
{
private readonly FasterKVSettings<SpanByteAndMemory, SpanByteAndMemory> _settings;
private readonly FasterKV<SpanByteAndMemory,SpanByteAndMemory> _kvStore;
private readonly FasterKVSettings<byte[], byte[]> _settings;
private readonly FasterKV<byte[], byte[]> _kvStore;
private TransactionId _tx;
private readonly TSerializer _serializer;
private readonly SimpleFunctions<byte[], byte[]> _functions;
private readonly Task _timer;


public FasterKVEventStore(TSerializer serializer, Settings settings)
{
_settings = new FasterKVSettings<SpanByteAndMemory, SpanByteAndMemory>(settings.StorageLocation.ToString());
_kvStore = new FasterKV<SpanByteAndMemory, SpanByteAndMemory>(_settings);
_serializer = serializer;
_settings = new FasterKVSettings<byte[], byte[]>(settings.StorageLocation.ToString());
_kvStore = new FasterKV<byte[], byte[]>(_settings);
_tx = TransactionId.From(0);

_functions = new SimpleFunctions<byte[], byte[]>((input, oldValue) =>
{
var newMemory = new byte[oldValue.Length + input.Length];
Array.Copy(oldValue, newMemory, oldValue.Length);
Array.Copy(input, 0, newMemory, oldValue.Length, input.Length);
return newMemory;
});

_timer = Task.Run(async () =>
{
await CheckPoint();
await Task.Delay(TimeSpan.FromSeconds(10));
});

}

public async Task CheckPoint()
{
var sw = Stopwatch.StartNew();
await _kvStore.TakeFullCheckpointAsync(CheckpointType.Snapshot);
Console.WriteLine($"Checkpoint took {sw.ElapsedMilliseconds}ms");
}

public async ValueTask Flush()
{
await _kvStore.CompleteCheckpointAsync();
}

public async ValueTask Add<T>(T eventEntity) where T : IEvent
{
using var session = _kvStore.NewSession(_functions);
WriteInner(eventEntity, session);
}

private void WriteInner<T>(T eventEntity, ClientSession<byte[], byte[], byte[], byte[], Empty, IFunctions<byte[], byte[], byte[], byte[], Empty>> session) where T : IEvent
{
_tx = _tx.Next();
var ingester = new ModifiedEntityLogger();
eventEntity.Apply(ingester);
{
var key = new byte[16];
var value = new byte[8];
BinaryPrimitives.WriteUInt64BigEndian(value, _tx.Value);

foreach (var entityId in ingester.Entities)
{
entityId.Value.TryWriteBytes(key);
session.RMW(ref key, ref value);
}

var eventBytes = _serializer.Serialize(eventEntity);
var eventKey = new byte[8];
BinaryPrimitives.WriteUInt64BigEndian(eventKey, _tx.Value);
session.RMW(ref eventKey, ref eventBytes);
}
}

public ValueTask Add<T>(T eventEntity) where T : IEvent

/// <summary>
/// Simplistic context that just logs the entities that were modified.
/// </summary>
private readonly struct ModifiedEntityLogger() : IEventContext
{
throw new System.NotImplementedException();
public readonly HashSet<EntityId> Entities = new();
public void Emit<TOwner, TVal>(EntityId<TOwner> entity, AttributeDefinition<TOwner, TVal> attr, TVal value) where TOwner : IEntity
{
Entities.Add(entity.Value);
}

public void Emit<TOwner, TVal>(EntityId<TOwner> entity, MultiEntityAttributeDefinition<TOwner, TVal> attr, EntityId<TVal> value) where TOwner : IEntity where TVal : IEntity
{
Entities.Add(entity.Value);
}

public void Retract<TOwner, TVal>(EntityId<TOwner> entity, AttributeDefinition<TOwner, TVal> attr, TVal value) where TOwner : IEntity
{
Entities.Add(entity.Value);
}

public void Retract<TOwner, TVal>(EntityId<TOwner> entity, MultiEntityAttributeDefinition<TOwner, TVal> attr, EntityId<TVal> value) where TOwner : IEntity where TVal : IEntity
{
Entities.Add(entity.Value);
}

public void New<TType>(EntityId<TType> id) where TType : IEntity
{
Entities.Add(id.Value);
}
}

public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester) where TIngester : IEventIngester
{
throw new System.NotImplementedException();
var key = new byte[16];
entityId.Value.TryWriteBytes(key);
using var session = _kvStore.NewSession(_functions);
var value = Array.Empty<byte>();
var result = session.Read(ref key, ref value);
Debug.Assert(result.Found);


for (var idx = 0; idx < (value.Length / 8); idx += 8)
{
var span = value.AsSpan(idx, 8).ToArray();
var eventArray = Array.Empty<byte>();
var eventResult = session.Read(ref span, ref eventArray);
Debug.Assert(eventResult.Found);

var evt = _serializer.Deserialize(eventArray);
ingester.Ingest(evt);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<ItemGroup>
Expand Down
137 changes: 137 additions & 0 deletions src/NexusMods.EventSourcing.FasterKV/TransactionFunctions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
using FASTER.core;
using NexusMods.EventSourcing.Abstractions;

namespace NexusMods.EventSourcing.FasterKV;

public class TransactionFunctions : IFunctions<SpanByteAndMemory, SpanByteAndMemory, IEvent, IEvent, Empty> {
public bool SingleReader(ref SpanByteAndMemory key, ref IEvent input, ref SpanByteAndMemory value, ref IEvent dst,
ref ReadInfo readInfo)
{
throw new System.NotImplementedException();
}

public bool ConcurrentReader(ref SpanByteAndMemory key, ref IEvent input, ref SpanByteAndMemory value, ref IEvent dst,
ref ReadInfo readInfo)
{
throw new System.NotImplementedException();
}

public void ReadCompletionCallback(ref SpanByteAndMemory key, ref IEvent input, ref IEvent output, Empty ctx, Status status,
RecordMetadata recordMetadata)
{
throw new System.NotImplementedException();
}

public bool SingleWriter(ref SpanByteAndMemory key, ref IEvent input, ref SpanByteAndMemory src, ref SpanByteAndMemory dst,
ref IEvent output, ref UpsertInfo upsertInfo, WriteReason reason)
{
throw new System.NotImplementedException();
}

public void PostSingleWriter(ref SpanByteAndMemory key, ref IEvent input, ref SpanByteAndMemory src, ref SpanByteAndMemory dst,
ref IEvent output, ref UpsertInfo upsertInfo, WriteReason reason)
{
throw new System.NotImplementedException();
}

public bool ConcurrentWriter(ref SpanByteAndMemory key, ref IEvent input, ref SpanByteAndMemory src, ref SpanByteAndMemory dst,
ref IEvent output, ref UpsertInfo upsertInfo)
{
throw new System.NotImplementedException();
}

public bool NeedInitialUpdate(ref SpanByteAndMemory key, ref IEvent input, ref IEvent output, ref RMWInfo rmwInfo)
{
throw new System.NotImplementedException();
}

public bool InitialUpdater(ref SpanByteAndMemory key, ref IEvent input, ref SpanByteAndMemory value, ref IEvent output,
ref RMWInfo rmwInfo)
{
throw new System.NotImplementedException();
}

public void PostInitialUpdater(ref SpanByteAndMemory key, ref IEvent input, ref SpanByteAndMemory value, ref IEvent output,
ref RMWInfo rmwInfo)
{
throw new System.NotImplementedException();
}

public bool NeedCopyUpdate(ref SpanByteAndMemory key, ref IEvent input, ref SpanByteAndMemory oldValue, ref IEvent output,
ref RMWInfo rmwInfo)
{
throw new System.NotImplementedException();
}

public bool CopyUpdater(ref SpanByteAndMemory key, ref IEvent input, ref SpanByteAndMemory oldValue,
ref SpanByteAndMemory newValue, ref IEvent output, ref RMWInfo rmwInfo)
{
throw new System.NotImplementedException();
}

public void PostCopyUpdater(ref SpanByteAndMemory key, ref IEvent input, ref SpanByteAndMemory oldValue,
ref SpanByteAndMemory newValue, ref IEvent output, ref RMWInfo rmwInfo)
{
throw new System.NotImplementedException();
}

public bool InPlaceUpdater(ref SpanByteAndMemory key, ref IEvent input, ref SpanByteAndMemory value, ref IEvent output,
ref RMWInfo rmwInfo)
{
throw new System.NotImplementedException();
}

public void RMWCompletionCallback(ref SpanByteAndMemory key, ref IEvent input, ref IEvent output, Empty ctx, Status status,
RecordMetadata recordMetadata)
{
throw new System.NotImplementedException();
}

public bool SingleDeleter(ref SpanByteAndMemory key, ref SpanByteAndMemory value, ref DeleteInfo deleteInfo)
{
throw new System.NotImplementedException();
}

public void PostSingleDeleter(ref SpanByteAndMemory key, ref DeleteInfo deleteInfo)
{
throw new System.NotImplementedException();
}

public bool ConcurrentDeleter(ref SpanByteAndMemory key, ref SpanByteAndMemory value, ref DeleteInfo deleteInfo)
{
throw new System.NotImplementedException();
}

public void DisposeSingleWriter(ref SpanByteAndMemory key, ref IEvent input, ref SpanByteAndMemory src,
ref SpanByteAndMemory dst, ref IEvent output, ref UpsertInfo upsertInfo, WriteReason reason)
{
throw new System.NotImplementedException();
}

public void DisposeCopyUpdater(ref SpanByteAndMemory key, ref IEvent input, ref SpanByteAndMemory oldValue,
ref SpanByteAndMemory newValue, ref IEvent output, ref RMWInfo rmwInfo)
{
throw new System.NotImplementedException();
}

public void DisposeInitialUpdater(ref SpanByteAndMemory key, ref IEvent input, ref SpanByteAndMemory value, ref IEvent output,
ref RMWInfo rmwInfo)
{
throw new System.NotImplementedException();
}

public void DisposeSingleDeleter(ref SpanByteAndMemory key, ref SpanByteAndMemory value, ref DeleteInfo deleteInfo)
{
throw new System.NotImplementedException();
}

public void DisposeDeserializedFromDisk(ref SpanByteAndMemory key, ref SpanByteAndMemory value)
{
throw new System.NotImplementedException();
}

public void CheckpointCompletionCallback(int sessionID, string sessionName, CommitPoint commitPoint)
{
throw new System.NotImplementedException();
}
}
1 change: 1 addition & 0 deletions src/NexusMods.EventSourcing/EventSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
Expand Down

0 comments on commit e3d5241

Please sign in to comment.