From a057e25d951e72b7e03e5325a813ac4aca962e12 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Tue, 7 Jan 2025 20:50:53 +0100 Subject: [PATCH 1/4] Fix: Actually release EventFlow.Redis --- .../EventFlow.Redis.Tests.csproj | 25 ++++++------ Source/EventFlow.Redis/EventFlow.Redis.csproj | 38 +++++++++++-------- 2 files changed, 34 insertions(+), 29 deletions(-) diff --git a/Source/EventFlow.Redis.Tests/EventFlow.Redis.Tests.csproj b/Source/EventFlow.Redis.Tests/EventFlow.Redis.Tests.csproj index 45e1be260..0220bd56c 100644 --- a/Source/EventFlow.Redis.Tests/EventFlow.Redis.Tests.csproj +++ b/Source/EventFlow.Redis.Tests/EventFlow.Redis.Tests.csproj @@ -1,19 +1,18 @@ - - net6.0 - enable - enable - + + net6.0;net6.0 + enable + enable + - - - + + + - - - - - + + + + diff --git a/Source/EventFlow.Redis/EventFlow.Redis.csproj b/Source/EventFlow.Redis/EventFlow.Redis.csproj index 9da4ff114..4f4d2eacb 100644 --- a/Source/EventFlow.Redis/EventFlow.Redis.csproj +++ b/Source/EventFlow.Redis/EventFlow.Redis.csproj @@ -1,22 +1,28 @@ - - net6.0 - enable - enable - + + netstandard2.1;net6.0;net8.0 + EventFlow + Async/await first CQRS+ES and DDD framework for .NET - https://docs.geteventflow.net/ + CQRS ES event sourcing + UPDATED BY BUILD + bin\$(Configuration)\$(TargetFramework)\EventFlow.xml + true + enable + enable + + + + + - - - + + + + - - - - - - - - + + + From 03d94a8774ed54ae92a53085e5bc4c3b34161d8d Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Tue, 7 Jan 2025 20:52:00 +0100 Subject: [PATCH 2/4] Update release notes --- RELEASE_NOTES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 5cbee06c5..5c1c8073e 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,13 +1,13 @@ ### New in 1.1.1 (working version, not released yet) * New: NuGet `EventFlow.SQLite` is now released as part of v1 and enables support for SQLite +* New: NuGet `EventFlow.Redis` is now (properly) released as part of v1 and enables support for Redis * Fix: Invoking `UseEventPersistence` now removes any previously registered event persistence. This fixes a service ordering issue in the following event store configurations - MongoDB - MSSQL - PostgreSQL - ### New in 1.1.0 (released 2024-12-16) * New: More control of event naming by introducing the interface `IEventNamingStrategy`, see the From 7f99658e9c66a26c9b6037f10e0255b9cfdfc925 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Tue, 7 Jan 2025 21:01:33 +0100 Subject: [PATCH 3/4] Setup Redis csproj properly --- Source/EventFlow.Redis/EventFlow.Redis.csproj | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Source/EventFlow.Redis/EventFlow.Redis.csproj b/Source/EventFlow.Redis/EventFlow.Redis.csproj index 4f4d2eacb..14a66134d 100644 --- a/Source/EventFlow.Redis/EventFlow.Redis.csproj +++ b/Source/EventFlow.Redis/EventFlow.Redis.csproj @@ -2,11 +2,11 @@ netstandard2.1;net6.0;net8.0 - EventFlow + EventFlow.Redis Async/await first CQRS+ES and DDD framework for .NET - https://docs.geteventflow.net/ - CQRS ES event sourcing + CQRS ES event sourcing Redis UPDATED BY BUILD - bin\$(Configuration)\$(TargetFramework)\EventFlow.xml + bin\$(Configuration)\$(TargetFramework)\EventFlow.Redis.xml true enable enable From 686284b00252c70775496ce88e28220195f74aa0 Mon Sep 17 00:00:00 2001 From: Rasmus Mikkelsen Date: Tue, 7 Jan 2025 21:39:25 +0100 Subject: [PATCH 4/4] Cleanup --- EventFlow.sln | 17 ++ .../EventFlow.Redis.Tests.csproj | 4 +- Source/EventFlow.Redis/Constants.cs | 15 +- Source/EventFlow.Redis/EventFlow.Redis.csproj | 1 - .../EventStreamCollectionResolver.cs | 65 ++--- .../IEventStreamCollectionResolver.cs | 21 +- .../EventStore/RedisCommittedDomainEvent.cs | 33 ++- .../EventStore/RedisEventPersistence.cs | 252 ++++++++++-------- Source/EventFlow.Redis/Extensions.cs | 200 +++++++------- Source/EventFlow.Redis/PrefixedKey.cs | 35 +-- .../EventFlow.Redis/ReadStore/Extensions.cs | 13 +- .../ReadStore/IRedisHashBuilder.cs | 21 +- .../ReadStore/RedisHashBuilder.cs | 176 ++++++------ .../ReadStore/RedisQueryHandler.cs | 17 +- .../ReadStore/RedisReadModel.cs | 15 +- .../ReadStore/RedisReadModelStore.cs | 218 +++++++-------- .../SnapshotStore/RedisSnapshot.cs | 27 +- .../SnapshotStore/RedisSnapshotPersistence.cs | 149 ++++++----- .../SnapshotStore/SnapshotId.cs | 11 +- 19 files changed, 691 insertions(+), 599 deletions(-) diff --git a/EventFlow.sln b/EventFlow.sln index 04312f60b..a52aac0d7 100644 --- a/EventFlow.sln +++ b/EventFlow.sln @@ -63,6 +63,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventFlow.SQLite", "Source\ EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventFlow.SQLite.Tests", "Source\EventFlow.SQLite.Tests\EventFlow.SQLite.Tests.csproj", "{8FAC191C-340D-47C6-B8AE-3D57783749C4}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Redis", "Redis", "{B1B5E6C8-1AB0-4848-9803-3CAF6A10F14C}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventFlow.Redis", "Source\EventFlow.Redis\EventFlow.Redis.csproj", "{6B8E25D6-F7B8-4C2A-A956-8C87BF70D80C}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventFlow.Redis.Tests", "Source\EventFlow.Redis.Tests\EventFlow.Redis.Tests.csproj", "{00DED999-173E-4518-81DC-2F1CFCE2F937}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -145,6 +151,14 @@ Global {8FAC191C-340D-47C6-B8AE-3D57783749C4}.Debug|Any CPU.Build.0 = Debug|Any CPU {8FAC191C-340D-47C6-B8AE-3D57783749C4}.Release|Any CPU.ActiveCfg = Release|Any CPU {8FAC191C-340D-47C6-B8AE-3D57783749C4}.Release|Any CPU.Build.0 = Release|Any CPU + {6B8E25D6-F7B8-4C2A-A956-8C87BF70D80C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6B8E25D6-F7B8-4C2A-A956-8C87BF70D80C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6B8E25D6-F7B8-4C2A-A956-8C87BF70D80C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6B8E25D6-F7B8-4C2A-A956-8C87BF70D80C}.Release|Any CPU.Build.0 = Release|Any CPU + {00DED999-173E-4518-81DC-2F1CFCE2F937}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {00DED999-173E-4518-81DC-2F1CFCE2F937}.Debug|Any CPU.Build.0 = Debug|Any CPU + {00DED999-173E-4518-81DC-2F1CFCE2F937}.Release|Any CPU.ActiveCfg = Release|Any CPU + {00DED999-173E-4518-81DC-2F1CFCE2F937}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -175,6 +189,9 @@ Global {74EFCDE2-CB0F-49B1-9CEC-BE748EB1FBF7} = {88359036-4F35-487C-BF2C-4F31C7BC92D8} {D2B5B5CA-57C2-4354-ADB7-47A6D81AD521} = {74EFCDE2-CB0F-49B1-9CEC-BE748EB1FBF7} {8FAC191C-340D-47C6-B8AE-3D57783749C4} = {74EFCDE2-CB0F-49B1-9CEC-BE748EB1FBF7} + {B1B5E6C8-1AB0-4848-9803-3CAF6A10F14C} = {5EE323DE-E69B-451A-8AC3-22DD6A004FBA} + {6B8E25D6-F7B8-4C2A-A956-8C87BF70D80C} = {B1B5E6C8-1AB0-4848-9803-3CAF6A10F14C} + {00DED999-173E-4518-81DC-2F1CFCE2F937} = {B1B5E6C8-1AB0-4848-9803-3CAF6A10F14C} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {17607E2C-4E8E-45A2-85BD-0A5808E1C0F3} diff --git a/Source/EventFlow.Redis.Tests/EventFlow.Redis.Tests.csproj b/Source/EventFlow.Redis.Tests/EventFlow.Redis.Tests.csproj index 0220bd56c..6c88ccdf2 100644 --- a/Source/EventFlow.Redis.Tests/EventFlow.Redis.Tests.csproj +++ b/Source/EventFlow.Redis.Tests/EventFlow.Redis.Tests.csproj @@ -1,7 +1,7 @@ - + - net6.0;net6.0 + net6.0;net8.0 enable enable diff --git a/Source/EventFlow.Redis/Constants.cs b/Source/EventFlow.Redis/Constants.cs index f57f88f2b..375f8fc68 100644 --- a/Source/EventFlow.Redis/Constants.cs +++ b/Source/EventFlow.Redis/Constants.cs @@ -20,11 +20,12 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -namespace EventFlow.Redis; - -public static class Constants +namespace EventFlow.Redis { - public const string StreamPrefix = "eventflow.events"; - public const string ReadModelPrefix = "eventflow.readmodel"; - public const string SnapshotPrefix = "eventflow.snapshot"; -} \ No newline at end of file + public static class Constants + { + public const string StreamPrefix = "eventflow.events"; + public const string ReadModelPrefix = "eventflow.readmodel"; + public const string SnapshotPrefix = "eventflow.snapshot"; + } +} diff --git a/Source/EventFlow.Redis/EventFlow.Redis.csproj b/Source/EventFlow.Redis/EventFlow.Redis.csproj index 14a66134d..62ea40bcc 100644 --- a/Source/EventFlow.Redis/EventFlow.Redis.csproj +++ b/Source/EventFlow.Redis/EventFlow.Redis.csproj @@ -9,7 +9,6 @@ bin\$(Configuration)\$(TargetFramework)\EventFlow.Redis.xml true enable - enable diff --git a/Source/EventFlow.Redis/EventStore/EventStreamCollectionResolver.cs b/Source/EventFlow.Redis/EventStore/EventStreamCollectionResolver.cs index d2a3d60dc..f9139487c 100644 --- a/Source/EventFlow.Redis/EventStore/EventStreamCollectionResolver.cs +++ b/Source/EventFlow.Redis/EventStore/EventStreamCollectionResolver.cs @@ -22,41 +22,42 @@ using StackExchange.Redis; -namespace EventFlow.Redis.EventStore; - -internal class EventStreamCollectionResolver : IEventStreamCollectionResolver +namespace EventFlow.Redis.EventStore { - private readonly IConnectionMultiplexer _multiplexer; - - public EventStreamCollectionResolver(IConnectionMultiplexer multiplexer) + internal class EventStreamCollectionResolver : IEventStreamCollectionResolver { - _multiplexer = multiplexer; - } + private readonly IConnectionMultiplexer _multiplexer; - // Using https://redis.io/commands/scan/ instead of KEYS to reduce blocking on the server. - public async Task> GetStreamIdsAsync(CancellationToken cancellationToken = default) - { - var cursor = 0; - var names = new List(); - do + public EventStreamCollectionResolver(IConnectionMultiplexer multiplexer) { - var result = await _multiplexer - .GetDatabase() - .ExecuteAsync("scan", cursor, "MATCH", $"{Constants.StreamPrefix}*") - .ConfigureAwait(false); - - var arr = (RedisResult[]) result; - cursor = (int) arr[0]; - var prefixedKeys = ((RedisResult[]) arr[1]).Select(n => AsPrefixedKey((string) n)); - names.AddRange(prefixedKeys); - } while (cursor != 0); + _multiplexer = multiplexer; + } - return names; - } - - private static PrefixedKey AsPrefixedKey(string k) - { - var split = k.Split(':'); - return new PrefixedKey(split[0], split[1]); + // Using https://redis.io/commands/scan/ instead of KEYS to reduce blocking on the server. + public async Task> GetStreamIdsAsync(CancellationToken cancellationToken = default) + { + var cursor = 0; + var names = new List(); + do + { + var result = await _multiplexer + .GetDatabase() + .ExecuteAsync("scan", cursor, "MATCH", $"{Constants.StreamPrefix}*") + .ConfigureAwait(false); + + var arr = (RedisResult[])result; + cursor = (int)arr[0]; + var prefixedKeys = ((RedisResult[])arr[1]).Select(n => AsPrefixedKey((string)n)); + names.AddRange(prefixedKeys); + } while (cursor != 0); + + return names; + } + + private static PrefixedKey AsPrefixedKey(string k) + { + var split = k.Split(':'); + return new PrefixedKey(split[0], split[1]); + } } -} \ No newline at end of file +} diff --git a/Source/EventFlow.Redis/EventStore/IEventStreamCollectionResolver.cs b/Source/EventFlow.Redis/EventStore/IEventStreamCollectionResolver.cs index 31a4fa6cf..7ceb05582 100644 --- a/Source/EventFlow.Redis/EventStore/IEventStreamCollectionResolver.cs +++ b/Source/EventFlow.Redis/EventStore/IEventStreamCollectionResolver.cs @@ -20,14 +20,15 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -namespace EventFlow.Redis.EventStore; - -internal interface IEventStreamCollectionResolver +namespace EventFlow.Redis.EventStore { - /// - /// Returns the ids of all streams (aggregates) used by eventflow. - /// - /// - /// - Task> GetStreamIdsAsync(CancellationToken cancellationToken = default); -} \ No newline at end of file + internal interface IEventStreamCollectionResolver + { + /// + /// Returns the ids of all streams (aggregates) used by EventFlow. + /// + /// + /// + Task> GetStreamIdsAsync(CancellationToken cancellationToken = default); + } +} diff --git a/Source/EventFlow.Redis/EventStore/RedisCommittedDomainEvent.cs b/Source/EventFlow.Redis/EventStore/RedisCommittedDomainEvent.cs index d894ef4c9..0c6459551 100644 --- a/Source/EventFlow.Redis/EventStore/RedisCommittedDomainEvent.cs +++ b/Source/EventFlow.Redis/EventStore/RedisCommittedDomainEvent.cs @@ -22,20 +22,25 @@ using EventFlow.EventStores; -namespace EventFlow.Redis.EventStore; - -internal class RedisCommittedDomainEvent : ICommittedDomainEvent +namespace EventFlow.Redis.EventStore { - public RedisCommittedDomainEvent(string aggregateId, string data, string metadata, int aggregateSequenceNumber) + internal class RedisCommittedDomainEvent : ICommittedDomainEvent { - AggregateId = aggregateId; - Data = data; - Metadata = metadata; - AggregateSequenceNumber = aggregateSequenceNumber; - } + public RedisCommittedDomainEvent( + string aggregateId, + string data, + string metadata, + int aggregateSequenceNumber) + { + AggregateId = aggregateId; + Data = data; + Metadata = metadata; + AggregateSequenceNumber = aggregateSequenceNumber; + } - public string AggregateId { get; init; } - public string Data { get; init; } - public string Metadata { get; init; } - public int AggregateSequenceNumber { get; init; } -} \ No newline at end of file + public string AggregateId { get; } + public string Data { get; } + public string Metadata { get; } + public int AggregateSequenceNumber { get; } + } +} diff --git a/Source/EventFlow.Redis/EventStore/RedisEventPersistence.cs b/Source/EventFlow.Redis/EventStore/RedisEventPersistence.cs index 548a5617c..b91580222 100644 --- a/Source/EventFlow.Redis/EventStore/RedisEventPersistence.cs +++ b/Source/EventFlow.Redis/EventStore/RedisEventPersistence.cs @@ -26,150 +26,180 @@ using Microsoft.Extensions.Logging; using StackExchange.Redis; -namespace EventFlow.Redis.EventStore; - -internal class RedisEventPersistence : IEventPersistence +namespace EventFlow.Redis.EventStore { - private readonly ILogger _logger; - private readonly IConnectionMultiplexer _multiplexer; - private readonly IEventStreamCollectionResolver _resolver; - - public RedisEventPersistence(IConnectionMultiplexer multiplexer, ILogger logger, - IEventStreamCollectionResolver resolver) + internal class RedisEventPersistence : IEventPersistence { - _multiplexer = multiplexer; - _logger = logger; - _resolver = resolver; - } + private readonly ILogger _logger; + private readonly IConnectionMultiplexer _multiplexer; + private readonly IEventStreamCollectionResolver _resolver; - public async Task LoadAllCommittedEvents(GlobalPosition globalPosition, int pageSize, - CancellationToken cancellationToken) - { - var startPosition = globalPosition.IsStart - ? 0 - : long.Parse(globalPosition.Value); + public RedisEventPersistence(IConnectionMultiplexer multiplexer, ILogger logger, + IEventStreamCollectionResolver resolver) + { + _multiplexer = multiplexer; + _logger = logger; + _resolver = resolver; + } - var streamNames = await _resolver.GetStreamIdsAsync(cancellationToken).ConfigureAwait(false); - var streamTasks = streamNames.Select(prefixedKey => - GetCommittedEventsAsync(prefixedKey, startPosition, cancellationToken, pageSize)).ToList(); + public async Task LoadAllCommittedEvents( + GlobalPosition globalPosition, + int pageSize, + CancellationToken cancellationToken) + { + var startPosition = globalPosition.IsStart + ? 0 + : long.Parse(globalPosition.Value); - await Task.WhenAll(streamTasks).ConfigureAwait(false); - var events = streamTasks.SelectMany(t => t.Result); + var streamNames = await _resolver.GetStreamIdsAsync(cancellationToken).ConfigureAwait(false); + var streamTasks = streamNames.Select(prefixedKey => + GetCommittedEventsAsync(prefixedKey, startPosition, cancellationToken, pageSize)).ToList(); - var nextPos = events.Any() - ? events.Max(e => e.AggregateSequenceNumber) - : startPosition; + await Task.WhenAll(streamTasks).ConfigureAwait(false); + var events = streamTasks + .SelectMany(t => t.Result) + .ToArray(); - if (_logger.IsEnabled(LogLevel.Trace)) - { - _logger.LogTrace("Loaded {Count} events from redis", events.Count()); - } + var nextPos = events.Any() + ? events.Max(e => e.AggregateSequenceNumber) + : startPosition; - return new AllCommittedEventsPage(new GlobalPosition(nextPos.ToString()), events.ToList()); - } + if (_logger.IsEnabled(LogLevel.Trace)) + { + _logger.LogTrace("Loaded {Count} events from redis", events.Count()); + } - public async Task> CommitEventsAsync(IIdentity id, - IReadOnlyCollection serializedEvents, CancellationToken cancellationToken) - { - var committedEvents = new List(); - var database = _multiplexer.GetDatabase(); - var prefixedKey = GetAsPrefixedKey(id.Value); + return new AllCommittedEventsPage(new GlobalPosition(nextPos.ToString()), events.ToList()); + } - foreach (var serializedEvent in serializedEvents) + public async Task> CommitEventsAsync( + IIdentity id, + IReadOnlyCollection serializedEvents, + CancellationToken cancellationToken) { - //Redis stream entry id uses the format: -, we leave the timestamp blank to ensure optimistic concurrency works - var messageId = $"0-{serializedEvent.AggregateSequenceNumber}"; + var committedEvents = new List(); + var database = _multiplexer.GetDatabase(); + var prefixedKey = GetAsPrefixedKey(id.Value); - var data = new NameValueEntry("data", serializedEvent.SerializedData); - var metadata = new NameValueEntry("metadata", serializedEvent.SerializedMetadata); - - try + foreach (var serializedEvent in serializedEvents) { - await database - .StreamAddAsync(prefixedKey, new[] {data, metadata}, messageId) - .ConfigureAwait(false); + //Redis stream entry id uses the format: -, we leave the timestamp blank to ensure optimistic concurrency works + var messageId = $"0-{serializedEvent.AggregateSequenceNumber}"; + + var data = new NameValueEntry("data", serializedEvent.SerializedData); + var metadata = new NameValueEntry("metadata", serializedEvent.SerializedMetadata); + + try + { + await database + .StreamAddAsync(prefixedKey, new[] { data, metadata }, messageId) + .ConfigureAwait(false); - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace( "Committed event with id {EventId} for aggregate with Id {AggregateId} to Redis ", - messageId, prefixedKey.Key); - - committedEvents.Add(new RedisCommittedDomainEvent(prefixedKey.Key, data.Value, metadata.Value, - serializedEvent.AggregateSequenceNumber)); - } - catch (RedisServerException e) - { - if (e.Message.Contains( - "ERR The ID specified in XADD is equal or smaller than the target stream top item")) - throw new OptimisticConcurrencyException(messageId, e); + messageId, + prefixedKey.Key); + + committedEvents.Add(new RedisCommittedDomainEvent( + prefixedKey.Key, + data.Value!, + metadata.Value!, + serializedEvent.AggregateSequenceNumber)); + } + catch (RedisServerException e) + { + // TODO: Make this more robust + if (e.Message.Contains("ERR The ID specified in XADD is equal or smaller than the target stream top item")) + { + throw new OptimisticConcurrencyException(messageId, e); + } + } } - } - return committedEvents; - } + return committedEvents; + } - public async Task> LoadCommittedEventsAsync(IIdentity id, - int fromEventSequenceNumber, CancellationToken cancellationToken) - { - var prefixedKey = GetAsPrefixedKey(id.Value); - var events = await GetCommittedEventsAsync(prefixedKey, fromEventSequenceNumber, cancellationToken) - .ConfigureAwait(false); + public async Task> LoadCommittedEventsAsync( + IIdentity id, + int fromEventSequenceNumber, + CancellationToken cancellationToken) + { + var prefixedKey = GetAsPrefixedKey(id.Value); + var events = await GetCommittedEventsAsync(prefixedKey, fromEventSequenceNumber, cancellationToken).ConfigureAwait(false); - if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("Loaded {Count} events for aggregate with id {AggregateId} from redis", events.Count(), + _logger.LogTrace( + "Loaded {Count} events for aggregate with id {AggregateId} from redis", + events.Count, id.Value); - return events.ToList(); - } - - public async Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken) - { - var database = _multiplexer.GetDatabase(); - var keyWithPrefix = GetAsPrefixedKey(id.Value); + return events; + } - var result = await database.KeyDeleteAsync(keyWithPrefix).ConfigureAwait(false); - if (!result) + public Task> LoadCommittedEventsAsync( + IIdentity id, + int fromEventSequenceNumber, + int toEventSequenceNumber, + CancellationToken cancellationToken) { - _logger.LogWarning("Failed to delete the Redis Stream with id {Id}", id.Value); - return; + throw new NotImplementedException(); } - if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("Deleted events for aggregate with id {AggregateId}", id.Value); - } + public async Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken) + { + var database = _multiplexer.GetDatabase(); + var keyWithPrefix = GetAsPrefixedKey(id.Value); - private async Task> GetCommittedEventsAsync(PrefixedKey prefixedKey, - long fromPosition, - CancellationToken token, int? limit = null) - { - //Stackexchange.Redis uses XREAD to read streams, which does not include the item at fromPosition, so we have to start at fromPosition -1 - var fromMessageId = fromPosition == 0 ? $"0-{fromPosition}" : $"0-{fromPosition - 1}"; - var database = _multiplexer.GetDatabase(); + var result = await database.KeyDeleteAsync(keyWithPrefix).ConfigureAwait(false); + if (!result) + { + _logger.LogWarning("Failed to delete the Redis Stream with id {Id}", id.Value); + return; + } - var result = await database.StreamReadAsync(prefixedKey, fromMessageId, count: limit).ConfigureAwait(false); - if (!result.Any()) - { if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("No events found for aggregate {AggregateId}", prefixedKey.Key); - return Array.Empty(); + _logger.LogTrace("Deleted events for aggregate with id {AggregateId}", id.Value); } - var committedEvents = result.Select(se => + private async Task> GetCommittedEventsAsync( + PrefixedKey prefixedKey, + long fromPosition, + CancellationToken token, + int? limit = null) { - var data = se.Values.FirstOrDefault(v => v.Name == "data").Value; - var metadata = se.Values.FirstOrDefault(v => v.Name == "metadata").Value; - var sequenceNumber = int.Parse(((string) se.Id).Split("-").Last()); + //Stackexchange.Redis uses XREAD to read streams, which does not include the item at fromPosition, so we have to start at fromPosition -1 + var fromMessageId = fromPosition == 0 ? $"0-{fromPosition}" : $"0-{fromPosition - 1}"; + var database = _multiplexer.GetDatabase(); + + var result = await database.StreamReadAsync(prefixedKey, fromMessageId, count: limit).ConfigureAwait(false); + if (!result.Any()) + { + if (_logger.IsEnabled(LogLevel.Trace)) + _logger.LogTrace("No events found for aggregate {AggregateId}", prefixedKey.Key); + return Array.Empty(); + } - return new RedisCommittedDomainEvent(prefixedKey.Key, data, metadata, sequenceNumber); - }); + var committedEvents = result + .Select(se => + { + var data = se.Values.FirstOrDefault(v => v.Name == "data").Value; + var metadata = se.Values.FirstOrDefault(v => v.Name == "metadata").Value; + var sequenceNumber = int.Parse(((string)se.Id).Split("-").Last()); - if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("Found {Count} events for aggregate with id {AggregateId}", committedEvents.Count(), - prefixedKey.Key); + return new RedisCommittedDomainEvent(prefixedKey.Key, data!, metadata!, sequenceNumber); + }) + .ToList(); - return committedEvents.ToList(); - } + if (_logger.IsEnabled(LogLevel.Trace)) + { + _logger.LogTrace( + "Found {Count} events for aggregate with id {AggregateId}", + committedEvents.Count, + prefixedKey.Key); + } - private static PrefixedKey GetAsPrefixedKey(string id) - => new PrefixedKey(Constants.StreamPrefix, id); -} \ No newline at end of file + return committedEvents; + } + + private static PrefixedKey GetAsPrefixedKey(string id) => new PrefixedKey(Constants.StreamPrefix, id); + } +} diff --git a/Source/EventFlow.Redis/Extensions.cs b/Source/EventFlow.Redis/Extensions.cs index d847718ff..fe605707f 100644 --- a/Source/EventFlow.Redis/Extensions.cs +++ b/Source/EventFlow.Redis/Extensions.cs @@ -30,106 +30,108 @@ using Redis.OM; using StackExchange.Redis; -namespace EventFlow.Redis; - -public static class Extensions +namespace EventFlow.Redis { - /// - /// Adds the required services to use redis with EventFlow - /// - /// - /// A Multiplexer connected to a redis database - /// - public static IEventFlowOptions ConfigureRedis(this IEventFlowOptions options, IConnectionMultiplexer multiplexer) - { - var provider = new RedisConnectionProvider(multiplexer); - options.ServiceCollection.TryAddSingleton(multiplexer); - options.ServiceCollection.TryAddSingleton(provider); - - return options; - } - /// - /// Adds the required services to use redis with EventFlow - /// - /// - /// The connection string to connect with redis - /// - public static IEventFlowOptions ConfigureRedis(this IEventFlowOptions options, string connectionString) + public static class Extensions { - var multiplexer = ConnectionMultiplexer.Connect(connectionString); - var provider = new RedisConnectionProvider(multiplexer); - options.ServiceCollection.TryAddSingleton(multiplexer); - options.ServiceCollection.TryAddSingleton(provider); - - return options; - } - - /// - /// Configures Redis as the event persistence. Requires Redis >= v5.0 - /// - /// - /// - public static IEventFlowOptions UseRedisEventStore(this IEventFlowOptions options) - { - options.ServiceCollection.TryAddTransient(); - options.UseEventPersistence(); - - return options; - } - - /// - /// Configures Redis as the read model store for the given read model. Requires Redis >= 5.0 and the redis search extension - /// - /// - /// The type of the redis ReadModel. Can be queried by using the IndexedAttribute from Redis.Om on the required properties and inheriting from - /// - public static IEventFlowOptions UseRedisReadModel(this IEventFlowOptions options) - where TReadModel : RedisReadModel - { - options.ServiceCollection.TryAddTransient(); - options.ServiceCollection.TryAddTransient, RedisReadModelStore>(); - - options.UseReadStoreFor, TReadModel>(); - - var provider = options.ServiceCollection.BuildServiceProvider().GetRequiredService(); - provider.Connection.CreateIndex(typeof(TReadModel)); - - return options; - } - - /// - /// Configures Redis as the read model store for the given read model. Requires Redis >= 5.0 and the redis search extension - /// - /// - /// The type of the redis ReadModel. Can be queried by using the IndexedAttribute from Redis.Om on the required properties and inheriting from - /// The type of the ReadModelLocator - /// - public static IEventFlowOptions UseRedisReadModel(this IEventFlowOptions options) - where TReadModel : RedisReadModel where TReadModelLocator : IReadModelLocator - { - options.ServiceCollection.TryAddTransient(); - options.ServiceCollection.TryAddTransient, RedisReadModelStore>(); - - options.UseReadStoreFor, TReadModel, TReadModelLocator>(); - - var provider = options.ServiceCollection.BuildServiceProvider().GetRequiredService(); - provider.Connection.CreateIndex(typeof(TReadModel)); - - return options; - } - - /// - /// Configures Redis as a snapshot store for EventFlow. Requires Redis >= v5.0 and the Redis Json extension - /// - /// - /// - public static IEventFlowOptions UseRedisSnapshotStore(this IEventFlowOptions options) - { - options.UseSnapshotPersistence(ServiceLifetime.Transient); - var provider = options.ServiceCollection.BuildServiceProvider().GetRequiredService(); - provider.Connection.CreateIndex(typeof(RedisSnapshot)); - - return options; + /// + /// Adds the required services to use redis with EventFlow + /// + /// + /// A Multiplexer connected to a redis database + /// + public static IEventFlowOptions ConfigureRedis(this IEventFlowOptions options, IConnectionMultiplexer multiplexer) + { + var provider = new RedisConnectionProvider(multiplexer); + options.ServiceCollection.TryAddSingleton(multiplexer); + options.ServiceCollection.TryAddSingleton(provider); + + return options; + } + + /// + /// Adds the required services to use redis with EventFlow + /// + /// + /// The connection string to connect with redis + /// + public static IEventFlowOptions ConfigureRedis(this IEventFlowOptions options, string connectionString) + { + var multiplexer = ConnectionMultiplexer.Connect(connectionString); + var provider = new RedisConnectionProvider(multiplexer); + options.ServiceCollection.TryAddSingleton(multiplexer); + options.ServiceCollection.TryAddSingleton(provider); + + return options; + } + + /// + /// Configures Redis as the event persistence. Requires Redis >= v5.0 + /// + /// + /// + public static IEventFlowOptions UseRedisEventStore(this IEventFlowOptions options) + { + options.ServiceCollection.TryAddTransient(); + options.UseEventPersistence(); + + return options; + } + + /// + /// Configures Redis as the read model store for the given read model. Requires Redis >= 5.0 and the redis search extension + /// + /// + /// The type of the redis ReadModel. Can be queried by using the IndexedAttribute from Redis.Om on the required properties and inheriting from + /// + public static IEventFlowOptions UseRedisReadModel(this IEventFlowOptions options) + where TReadModel : RedisReadModel + { + options.ServiceCollection.TryAddTransient(); + options.ServiceCollection.TryAddTransient, RedisReadModelStore>(); + + options.UseReadStoreFor, TReadModel>(); + + var provider = options.ServiceCollection.BuildServiceProvider().GetRequiredService(); + provider.Connection.CreateIndex(typeof(TReadModel)); + + return options; + } + + /// + /// Configures Redis as the read model store for the given read model. Requires Redis >= 5.0 and the redis search extension + /// + /// + /// The type of the redis ReadModel. Can be queried by using the IndexedAttribute from Redis.Om on the required properties and inheriting from + /// The type of the ReadModelLocator + /// + public static IEventFlowOptions UseRedisReadModel(this IEventFlowOptions options) + where TReadModel : RedisReadModel where TReadModelLocator : IReadModelLocator + { + options.ServiceCollection.TryAddTransient(); + options.ServiceCollection.TryAddTransient, RedisReadModelStore>(); + + options.UseReadStoreFor, TReadModel, TReadModelLocator>(); + + var provider = options.ServiceCollection.BuildServiceProvider().GetRequiredService(); + provider.Connection.CreateIndex(typeof(TReadModel)); + + return options; + } + + /// + /// Configures Redis as a snapshot store for EventFlow. Requires Redis >= v5.0 and the Redis Json extension + /// + /// + /// + public static IEventFlowOptions UseRedisSnapshotStore(this IEventFlowOptions options) + { + options.UseSnapshotPersistence(ServiceLifetime.Transient); + var provider = options.ServiceCollection.BuildServiceProvider().GetRequiredService(); + provider.Connection.CreateIndex(typeof(RedisSnapshot)); + + return options; + } } -} \ No newline at end of file +} diff --git a/Source/EventFlow.Redis/PrefixedKey.cs b/Source/EventFlow.Redis/PrefixedKey.cs index c856fa5f6..5ef289afc 100644 --- a/Source/EventFlow.Redis/PrefixedKey.cs +++ b/Source/EventFlow.Redis/PrefixedKey.cs @@ -22,26 +22,27 @@ using StackExchange.Redis; -namespace EventFlow.Redis; - -public record PrefixedKey +namespace EventFlow.Redis { - public PrefixedKey(string prefix, string key) + public class PrefixedKey { - if (string.IsNullOrEmpty(prefix)) - throw new ArgumentException("Prefix cant be empty"); - if (string.IsNullOrEmpty(key)) - throw new ArgumentException("Key cant be empty"); + public PrefixedKey(string prefix, string key) + { + if (string.IsNullOrEmpty(prefix)) + throw new ArgumentException("Prefix cant be empty"); + if (string.IsNullOrEmpty(key)) + throw new ArgumentException("Key cant be empty"); - Prefix = prefix; - Key = key; - } + Prefix = prefix; + Key = key; + } - public string Prefix { get; private set; } - public string Key { get; private set; } + public string Prefix { get; private set; } + public string Key { get; private set; } - public override string ToString() => $"{Prefix}:{Key}"; + public override string ToString() => $"{Prefix}:{Key}"; - public static implicit operator RedisKey(PrefixedKey prefixedKey) => new RedisKey(prefixedKey.ToString()); - public static implicit operator string(PrefixedKey prefixedKey) => prefixedKey.ToString(); -} \ No newline at end of file + public static implicit operator RedisKey(PrefixedKey prefixedKey) => new RedisKey(prefixedKey.ToString()); + public static implicit operator string(PrefixedKey prefixedKey) => prefixedKey.ToString(); + } +} diff --git a/Source/EventFlow.Redis/ReadStore/Extensions.cs b/Source/EventFlow.Redis/ReadStore/Extensions.cs index 99a929b8a..cd133c928 100644 --- a/Source/EventFlow.Redis/ReadStore/Extensions.cs +++ b/Source/EventFlow.Redis/ReadStore/Extensions.cs @@ -22,12 +22,13 @@ using StackExchange.Redis; -namespace EventFlow.Redis.ReadStore; - -internal static class Extensions +namespace EventFlow.Redis.ReadStore { - internal static IEnumerable ToHashEntries(this IReadOnlyDictionary dict) + internal static class Extensions { - return dict.ToArray().Select(kv => new HashEntry(kv.Key, kv.Value)); + internal static IEnumerable ToHashEntries(this IReadOnlyDictionary dict) + { + return dict.ToArray().Select(kv => new HashEntry(kv.Key, kv.Value)); + } } -} \ No newline at end of file +} diff --git a/Source/EventFlow.Redis/ReadStore/IRedisHashBuilder.cs b/Source/EventFlow.Redis/ReadStore/IRedisHashBuilder.cs index 7b8adcd01..3c9cb79c5 100644 --- a/Source/EventFlow.Redis/ReadStore/IRedisHashBuilder.cs +++ b/Source/EventFlow.Redis/ReadStore/IRedisHashBuilder.cs @@ -20,14 +20,15 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -namespace EventFlow.Redis.ReadStore; - -internal interface IRedisHashBuilder +namespace EventFlow.Redis.ReadStore { - /// - /// Builds a Dictionary out of the provided object by using reflection - /// - /// The object - /// A Dictionary containing all properties, indexed by their property name - IReadOnlyDictionary BuildHashSet(object obj); -} \ No newline at end of file + internal interface IRedisHashBuilder + { + /// + /// Builds a Dictionary out of the provided object by using reflection + /// + /// The object + /// A Dictionary containing all properties, indexed by their property name + IReadOnlyDictionary BuildHashSet(object obj); + } +} diff --git a/Source/EventFlow.Redis/ReadStore/RedisHashBuilder.cs b/Source/EventFlow.Redis/ReadStore/RedisHashBuilder.cs index c0de85862..281030400 100644 --- a/Source/EventFlow.Redis/ReadStore/RedisHashBuilder.cs +++ b/Source/EventFlow.Redis/ReadStore/RedisHashBuilder.cs @@ -25,115 +25,139 @@ using System.Reflection; using Redis.OM.Modeling; -namespace EventFlow.Redis.ReadStore; +namespace EventFlow.Redis.ReadStore +{ + //original source: https://github.com/redis/redis-om-dotnet/tree/main/src/Redis.OM/Modeling + /* + Copyright (c) 2022 Redis Inc. -//original source: https://github.dev/redis/redis-om-dotnet/tree/main/src/Redis.OM/Modeling + Permission is hereby granted, free of charge, to any person + obtaining a copy of this software and associated documentation + files (the "Software"), to deal in the Software without + restriction, including without limitation the rights to use, + copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the + Software is furnished to do so, subject to the following + conditions: -public class RedisHashBuilder : IRedisHashBuilder -{ - private static readonly ConcurrentDictionary CachedProperties = new(); + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + OTHER DEALINGS IN THE SOFTWARE. + */ - public IReadOnlyDictionary BuildHashSet(object? obj) + public class RedisHashBuilder : IRedisHashBuilder { - if (obj is null) - return new Dictionary(); + private static readonly ConcurrentDictionary CachedProperties = new ConcurrentDictionary(); - var objType = obj.GetType(); - if (!CachedProperties.TryGetValue(objType, out var properties)) + public IReadOnlyDictionary BuildHashSet(object obj) { - properties = objType.GetProperties(); - CachedProperties.TryAdd(objType, properties); - } + if (obj is null) + { + return new Dictionary(); + } - properties = properties.Where(x => x.GetValue(obj) != null).ToArray(); + var objType = obj.GetType(); + if (!CachedProperties.TryGetValue(objType, out var properties)) + { + properties = objType.GetProperties(); + CachedProperties.TryAdd(objType, properties); + } - var hash = new Dictionary(); - foreach (var property in properties) - { - var type = Nullable.GetUnderlyingType(property.PropertyType) ?? property.PropertyType; + properties = properties.Where(x => x.GetValue(obj) != null).ToArray(); - var propertyName = property.Name; - ExtractPropertyName(property, ref propertyName); - if (type.IsPrimitive || type == typeof(decimal) || type == typeof(string) || type == typeof(GeoLoc) || - type == typeof(Ulid) || type == typeof(Guid)) + var hash = new Dictionary(); + foreach (var property in properties) { - var val = property.GetValue(obj); - if (val != null) + var type = Nullable.GetUnderlyingType(property.PropertyType) ?? property.PropertyType; + + var propertyName = property.Name; + ExtractPropertyName(property, ref propertyName); + if (type.IsPrimitive || type == typeof(decimal) || type == typeof(string) || type == typeof(GeoLoc) || + type == typeof(Ulid) || type == typeof(Guid)) { - hash.Add(propertyName, val.ToString()); + var val = property.GetValue(obj); + if (val != null) + { + hash.Add(propertyName, val.ToString()); + } } - } - else if (type.IsEnum) - { - var val = property.GetValue(obj); - hash.Add(propertyName, ((int) val).ToString()); - } - else if (type == typeof(DateTimeOffset)) - { - var val = (DateTimeOffset) property.GetValue(obj); - if (val != null) + else if (type.IsEnum) { - hash.Add(propertyName, val.ToString("O")); + var val = property.GetValue(obj); + hash.Add(propertyName, ((int)val).ToString()); } - } - else if (type == typeof(DateTime) || type == typeof(DateTime?)) - { - var val = (DateTime) property.GetValue(obj); - if (val != default) + else if (type == typeof(DateTimeOffset)) { - hash.Add(propertyName, new DateTimeOffset(val).ToUnixTimeMilliseconds().ToString()); + var val = (DateTimeOffset)property.GetValue(obj); + hash.Add(propertyName, val.ToString("O")); } - } - else if (type == typeof(IEnumerable<>) || type.GetInterfaces() - .Any(x => x.IsGenericType && x.GetGenericTypeDefinition() == typeof(IEnumerable<>))) - { - var e = (IEnumerable) property.GetValue(obj); - var i = 0; - foreach (var v in e) + else if (type == typeof(DateTime) || type == typeof(DateTime?)) { - var innerType = v.GetType(); - if (innerType.IsPrimitive || innerType == typeof(decimal) || innerType == typeof(string)) + var val = (DateTime)property.GetValue(obj); + if (val != default) { - hash.Add($"{propertyName}[{i}]", v.ToString()); + hash.Add(propertyName, new DateTimeOffset(val).ToUnixTimeMilliseconds().ToString()); } - else + } + else if (type == typeof(IEnumerable<>) || type.GetInterfaces() + .Any(x => x.IsGenericType && x.GetGenericTypeDefinition() == typeof(IEnumerable<>))) + { + var e = (IEnumerable)property.GetValue(obj); + var i = 0; + foreach (var v in e) { - var subHash = BuildHashSet(v); - foreach (var kvp in subHash) + var innerType = v.GetType(); + if (innerType.IsPrimitive || innerType == typeof(decimal) || innerType == typeof(string)) { - hash.Add($"{propertyName}[{i}].{kvp.Key}", kvp.Value); + hash.Add($"{propertyName}[{i}]", v.ToString()); + } + else + { + var subHash = BuildHashSet(v); + foreach (var kvp in subHash) + { + hash.Add($"{propertyName}[{i}].{kvp.Key}", kvp.Value); + } } - } - i++; + i++; + } } - } - else - { - var subHash = BuildHashSet(property.GetValue(obj)); - if (subHash != null) + else { - foreach (var kvp in subHash) + var subHash = BuildHashSet(property.GetValue(obj)); + if (subHash != null) { - hash.Add($"{propertyName}.{kvp.Key}", kvp.Value); + foreach (var kvp in subHash) + { + hash.Add($"{propertyName}.{kvp.Key}", kvp.Value); + } } } } - } - return hash; - } + return hash; + } - private static void ExtractPropertyName(PropertyInfo property, ref string propertyName) - { - var fieldAttr = property.GetCustomAttributes(typeof(RedisFieldAttribute), true); - if (fieldAttr.Any()) + private static void ExtractPropertyName(PropertyInfo property, ref string propertyName) { - var rfa = (RedisFieldAttribute) fieldAttr.First(); - if (!string.IsNullOrEmpty(rfa.PropertyName)) + var fieldAttr = property.GetCustomAttributes(typeof(RedisFieldAttribute), true); + if (fieldAttr.Any()) { - propertyName = rfa.PropertyName; + var rfa = (RedisFieldAttribute)fieldAttr.First(); + if (!string.IsNullOrEmpty(rfa.PropertyName)) + { + propertyName = rfa.PropertyName; + } } } } -} \ No newline at end of file +} diff --git a/Source/EventFlow.Redis/ReadStore/RedisQueryHandler.cs b/Source/EventFlow.Redis/ReadStore/RedisQueryHandler.cs index c77da480a..8f5f87cb8 100644 --- a/Source/EventFlow.Redis/ReadStore/RedisQueryHandler.cs +++ b/Source/EventFlow.Redis/ReadStore/RedisQueryHandler.cs @@ -23,14 +23,15 @@ using Redis.OM; using Redis.OM.Searching; -namespace EventFlow.Redis.ReadStore; - -public abstract class RedisQueryHandler where TReadModel : RedisReadModel +namespace EventFlow.Redis.ReadStore { - protected readonly IRedisCollection Collection; - - protected RedisQueryHandler(RedisConnectionProvider provider) + public abstract class RedisQueryHandler where TReadModel : RedisReadModel { - Collection = provider.RedisCollection(); + protected readonly IRedisCollection Collection; + + protected RedisQueryHandler(RedisConnectionProvider provider) + { + Collection = provider.RedisCollection(); + } } -} \ No newline at end of file +} diff --git a/Source/EventFlow.Redis/ReadStore/RedisReadModel.cs b/Source/EventFlow.Redis/ReadStore/RedisReadModel.cs index 0a5ade394..bc12f22b3 100644 --- a/Source/EventFlow.Redis/ReadStore/RedisReadModel.cs +++ b/Source/EventFlow.Redis/ReadStore/RedisReadModel.cs @@ -23,11 +23,12 @@ using EventFlow.ReadStores; using Redis.OM.Modeling; -namespace EventFlow.Redis.ReadStore; - -[Document(Prefixes = new[] {Constants.ReadModelPrefix})] -public abstract class RedisReadModel : IReadModel +namespace EventFlow.Redis.ReadStore { - [RedisIdField] public string Id { get; set; } - [Indexed] public long? Version { get; set; } -} \ No newline at end of file + [Document(Prefixes = new[] { Constants.ReadModelPrefix })] + public abstract class RedisReadModel : IReadModel + { + [RedisIdField] public string Id { get; set; } + [Indexed] public long? Version { get; set; } + } +} diff --git a/Source/EventFlow.Redis/ReadStore/RedisReadModelStore.cs b/Source/EventFlow.Redis/ReadStore/RedisReadModelStore.cs index 7604513f2..1e4924aed 100644 --- a/Source/EventFlow.Redis/ReadStore/RedisReadModelStore.cs +++ b/Source/EventFlow.Redis/ReadStore/RedisReadModelStore.cs @@ -24,144 +24,144 @@ using EventFlow.Core; using EventFlow.Core.RetryStrategies; using EventFlow.Exceptions; +using EventFlow.Extensions; using EventFlow.ReadStores; using Microsoft.Extensions.Logging; using Redis.OM; using Redis.OM.Searching; using StackExchange.Redis; -namespace EventFlow.Redis.ReadStore; - -internal class RedisReadModelStore : IReadModelStore - where TReadModel : RedisReadModel +namespace EventFlow.Redis.ReadStore { - private readonly IRedisCollection _collection; - private readonly IRedisHashBuilder _hashBuilder; - private readonly ILogger> _logger; - private readonly IConnectionMultiplexer _multiplexer; - private readonly RedisConnectionProvider _provider; - private readonly ITransientFaultHandler _transientFaultHandler; - - public RedisReadModelStore(RedisConnectionProvider redisConnectionProvider, - ITransientFaultHandler transientFaultHandler, - IConnectionMultiplexer multiplexer, IRedisHashBuilder hashBuilder, - ILogger> logger) - { - _provider = redisConnectionProvider; - _transientFaultHandler = transientFaultHandler; - _multiplexer = multiplexer; - _hashBuilder = hashBuilder; - _logger = logger; - _collection = redisConnectionProvider.RedisCollection(); - } - public async Task DeleteAsync(string id, CancellationToken cancellationToken) + internal class RedisReadModelStore : IReadModelStore + where TReadModel : RedisReadModel { - var prefixedKey = new PrefixedKey(Constants.ReadModelPrefix, id); - var first = await _collection.FindByIdAsync(prefixedKey).ConfigureAwait(false); - if (first is null) + private readonly IRedisCollection _collection; + private readonly IRedisHashBuilder _hashBuilder; + private readonly ILogger> _logger; + private readonly IConnectionMultiplexer _multiplexer; + private readonly RedisConnectionProvider _provider; + private readonly ITransientFaultHandler _transientFaultHandler; + + public RedisReadModelStore(RedisConnectionProvider redisConnectionProvider, + ITransientFaultHandler transientFaultHandler, + IConnectionMultiplexer multiplexer, IRedisHashBuilder hashBuilder, + ILogger> logger) { - if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("Failed to delete readmodel with id {Id} because it was not found", id); - return; + _provider = redisConnectionProvider; + _transientFaultHandler = transientFaultHandler; + _multiplexer = multiplexer; + _hashBuilder = hashBuilder; + _logger = logger; + _collection = redisConnectionProvider.RedisCollection(); } - await _collection.DeleteAsync(first).ConfigureAwait(false); - if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("Deleted Readmodel {Id}", id); - } + public async Task DeleteAsync(string id, CancellationToken cancellationToken) + { + var prefixedKey = new PrefixedKey(Constants.ReadModelPrefix, id); + var first = await _collection.FindByIdAsync(prefixedKey).ConfigureAwait(false); + if (first is null) + { + _logger.LogTrace("Failed to delete read model with id {Id} because it was not found", id); + return; + } + + await _collection.DeleteAsync(first).ConfigureAwait(false); + _logger.LogTrace("Deleted Read model {Id}", id); + } - public Task DeleteAllAsync(CancellationToken cancellationToken) - { - var result = _provider.Connection.DropIndexAndAssociatedRecords(typeof(TReadModel)); - if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace( - !result - ? "Failed to delete index and records of readmodel {Name}" - : "Deleted index and records of readmodel {Name}", typeof(TReadModel)); - - return Task.CompletedTask; - } + public Task DeleteAllAsync(CancellationToken cancellationToken) + { + var result = _provider.Connection.DropIndexAndAssociatedRecords(typeof(TReadModel)); + if (_logger.IsEnabled(LogLevel.Trace)) + { + _logger.LogTrace( + !result + ? "Failed to delete index and records of read model {Name}" + : "Deleted index and records of read model {Name}", typeof(TReadModel).PrettyPrint()); - public async Task> GetAsync(string id, CancellationToken cancellationToken) - { - var rm = await _collection.FindByIdAsync(id).ConfigureAwait(false); - if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("Found readmodel with {Id}: {ReadModel}", id, rm); + } - return rm is null ? ReadModelEnvelope.Empty(id) : ReadModelEnvelope.With(id, rm); - } + return Task.CompletedTask; + } - public async Task UpdateAsync(IReadOnlyCollection readModelUpdates, - IReadModelContextFactory readModelContextFactory, - Func, ReadModelEnvelope, CancellationToken, - Task>> updateReadModel, CancellationToken cancellationToken) - { - foreach (var rmUpdate in readModelUpdates) + public async Task> GetAsync(string id, CancellationToken cancellationToken) { - await _transientFaultHandler.TryAsync( - c => UpdateReadModelAsync(rmUpdate, updateReadModel, readModelContextFactory, c), - Label.Named("redis-readmodel-update"), cancellationToken).ConfigureAwait(false); + var rm = await _collection.FindByIdAsync(id).ConfigureAwait(false); + _logger.LogTrace("Found read model with {Id}: {ReadModel}", id, rm); + + return rm is null ? ReadModelEnvelope.Empty(id) : ReadModelEnvelope.With(id, rm); } - } - private async Task UpdateReadModelAsync(ReadModelUpdate update, - Func, ReadModelEnvelope, CancellationToken, - Task>> updateReadModel, IReadModelContextFactory readModelContextFactory, - CancellationToken token) - { - if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("Updating readmodel {Id}", update.ReadModelId); + public async Task UpdateAsync(IReadOnlyCollection readModelUpdates, + IReadModelContextFactory readModelContextFactory, + Func, ReadModelEnvelope, CancellationToken, + Task>> updateReadModel, CancellationToken cancellationToken) + { + foreach (var rmUpdate in readModelUpdates) + { + await _transientFaultHandler.TryAsync( + c => UpdateReadModelAsync(rmUpdate, updateReadModel, readModelContextFactory, c), + Label.Named("redis-readmodel-update"), cancellationToken).ConfigureAwait(false); + } + } - var prefixedKey = new PrefixedKey(Constants.ReadModelPrefix, update.ReadModelId); + private async Task UpdateReadModelAsync(ReadModelUpdate update, + Func, ReadModelEnvelope, CancellationToken, + Task>> updateReadModel, IReadModelContextFactory readModelContextFactory, + CancellationToken token) + { + _logger.LogTrace("Updating read model {Id}", update.ReadModelId); - var readModel = await _collection.FindByIdAsync(prefixedKey).ConfigureAwait(false); - var readModelEnvelope = readModel is null - ? ReadModelEnvelope.Empty(update.ReadModelId) - : ReadModelEnvelope.With(update.ReadModelId, readModel); + var prefixedKey = new PrefixedKey(Constants.ReadModelPrefix, update.ReadModelId); - var context = - readModelContextFactory.Create(readModelEnvelope.ReadModelId, readModelEnvelope.ReadModel is null); - var updateResult = await updateReadModel(context, update.DomainEvents, readModelEnvelope, token); + var readModel = await _collection.FindByIdAsync(prefixedKey).ConfigureAwait(false); + var readModelEnvelope = readModel is null + ? ReadModelEnvelope.Empty(update.ReadModelId) + : ReadModelEnvelope.With(update.ReadModelId, readModel); - if (!updateResult.IsModified) - { - if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("Readmodel not modified"); - return; - } + var context = + readModelContextFactory.Create(readModelEnvelope.ReadModelId, readModelEnvelope.ReadModel is null); + var updateResult = await updateReadModel(context, update.DomainEvents, readModelEnvelope, token); - if (context.IsMarkedForDeletion) - { - if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("Deleting deadmodel because was marked for deletion"); - await DeleteAsync(update.ReadModelId, token).ConfigureAwait(false); + if (!updateResult.IsModified) + { + _logger.LogTrace("Read model not modified"); + return; + } - return; - } + if (context.IsMarkedForDeletion) + { + _logger.LogTrace("Deleting read model because was marked for deletion"); + await DeleteAsync(update.ReadModelId, token).ConfigureAwait(false); - readModelEnvelope = updateResult.Envelope; - var originalVersion = readModelEnvelope.ReadModel.Version; - readModelEnvelope.ReadModel.Version = readModelEnvelope.Version; + return; + } - var hashEntries = _hashBuilder.BuildHashSet(readModelEnvelope.ReadModel).ToHashEntries(); + readModelEnvelope = updateResult.Envelope; + var originalVersion = readModelEnvelope.ReadModel.Version; + readModelEnvelope.ReadModel.Version = readModelEnvelope.Version; - var tran = _multiplexer.GetDatabase().CreateTransaction(); - tran.AddCondition(Condition.HashEqual(prefixedKey, "Version", originalVersion)); - tran.HashSetAsync(prefixedKey, hashEntries.ToArray()); + var hashEntries = _hashBuilder.BuildHashSet(readModelEnvelope.ReadModel).ToHashEntries(); - var result = await tran.ExecuteAsync().ConfigureAwait(false); - if (!result) - { - if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace( - "Transaction failed because of a wrong aggregate version, throwing OptimisticConcurrencyException"); + var tran = _multiplexer.GetDatabase().CreateTransaction(); + tran.AddCondition(Condition.HashEqual(prefixedKey, "Version", originalVersion)); + await tran.HashSetAsync(prefixedKey, hashEntries.ToArray()); - throw new OptimisticConcurrencyException( - $"The version of the readmodel {prefixedKey.Key} is not the expected version {originalVersion}"); - } + var result = await tran.ExecuteAsync().ConfigureAwait(false); + if (!result) + { + _logger.LogTrace($"Transaction failed because of a wrong aggregate version, throwing {nameof(OptimisticConcurrencyException)}"); + + throw new OptimisticConcurrencyException( + $"The version of the read model {prefixedKey.Key} is not the expected version {originalVersion}"); + } - if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("Updated and saved readmodel with id {Id}", readModelEnvelope.ReadModelId); + if (_logger.IsEnabled(LogLevel.Trace)) + { + _logger.LogTrace("Updated and saved read model with id {Id}", readModelEnvelope.ReadModelId); + } + } } -} \ No newline at end of file +} diff --git a/Source/EventFlow.Redis/SnapshotStore/RedisSnapshot.cs b/Source/EventFlow.Redis/SnapshotStore/RedisSnapshot.cs index 7d4e47674..52c3f44a0 100644 --- a/Source/EventFlow.Redis/SnapshotStore/RedisSnapshot.cs +++ b/Source/EventFlow.Redis/SnapshotStore/RedisSnapshot.cs @@ -22,17 +22,18 @@ using Redis.OM.Modeling; -namespace EventFlow.Redis.SnapshotStore; - -//Storage type json is required due to https://github.com/redis/redis-om-dotnet/issues/175 -[Document(StorageType = StorageType.Json, Prefixes = new[] {Constants.SnapshotPrefix})] -public class RedisSnapshot +namespace EventFlow.Redis.SnapshotStore { - [RedisIdField] public string Id { get; set; } - public long? Version { get; set; } - [Indexed] public string AggregateId { get; set; } - [Indexed] public string AggregateName { get; set; } - [Indexed] public int AggregateSequenceNumber { get; set; } - public string Data { get; set; } - public string Metadata { get; set; } -} \ No newline at end of file + //Storage type json is required due to https://github.com/redis/redis-om-dotnet/issues/175 + [Document(StorageType = StorageType.Json, Prefixes = new[] { Constants.SnapshotPrefix })] + public class RedisSnapshot + { + [RedisIdField] public string Id { get; set; } + public long? Version { get; set; } + [Indexed] public string AggregateId { get; set; } + [Indexed] public string AggregateName { get; set; } + [Indexed] public int AggregateSequenceNumber { get; set; } + public string Data { get; set; } + public string Metadata { get; set; } + } +} diff --git a/Source/EventFlow.Redis/SnapshotStore/RedisSnapshotPersistence.cs b/Source/EventFlow.Redis/SnapshotStore/RedisSnapshotPersistence.cs index ddf01f269..e8b966b20 100644 --- a/Source/EventFlow.Redis/SnapshotStore/RedisSnapshotPersistence.cs +++ b/Source/EventFlow.Redis/SnapshotStore/RedisSnapshotPersistence.cs @@ -28,93 +28,98 @@ using Redis.OM; using Redis.OM.Searching; -namespace EventFlow.Redis.SnapshotStore; - -public class RedisSnapshotPersistence : ISnapshotPersistence +namespace EventFlow.Redis.SnapshotStore { - private readonly IRedisCollection _collection; - private readonly ILogger _logger; - private readonly RedisConnectionProvider _provider; - - public RedisSnapshotPersistence(RedisConnectionProvider provider, ILogger logger) + public class RedisSnapshotPersistence : ISnapshotPersistence { - _provider = provider; - _logger = logger; - _collection = provider.RedisCollection(); - } - - public async Task GetSnapshotAsync(Type aggregateType, IIdentity identity, - CancellationToken cancellationToken) - { - var snapshot = await _collection.FirstOrDefaultAsync(sn => sn.AggregateId == identity.Value) - .ConfigureAwait(false); - if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("Found snapshot for id {Id}: {Snapshot}", identity.Value, snapshot); - - return snapshot is null ? null : new CommittedSnapshot(snapshot.Metadata, snapshot.Data); - } + private readonly IRedisCollection _collection; + private readonly ILogger _logger; + private readonly RedisConnectionProvider _provider; - public async Task SetSnapshotAsync(Type aggregateType, IIdentity identity, SerializedSnapshot serializedSnapshot, - CancellationToken cancellationToken) - { - var aggregateName = aggregateType.GetAggregateName().Value; - var snapshot = new RedisSnapshot + public RedisSnapshotPersistence(RedisConnectionProvider provider, ILogger logger) { - Id = SnapshotId.New.Value, - AggregateId = identity.Value, - AggregateName = aggregateName, - AggregateSequenceNumber = serializedSnapshot.Metadata.AggregateSequenceNumber, - Data = serializedSnapshot.SerializedData, - Metadata = serializedSnapshot.SerializedMetadata - }; - + _provider = provider; + _logger = logger; + _collection = provider.RedisCollection(); + } - var prevSnapshots = await _collection.Where(sn => - sn.AggregateId == identity.Value && sn.AggregateName == aggregateName && sn.AggregateSequenceNumber == - serializedSnapshot.Metadata.AggregateSequenceNumber).ToListAsync(); + public async Task GetSnapshotAsync( + Type aggregateType, + IIdentity identity, + CancellationToken cancellationToken) + { + var snapshot = await _collection.FirstOrDefaultAsync(sn => sn.AggregateId == identity.Value).ConfigureAwait(false); + _logger.LogTrace("Found snapshot for id {Id}: {Snapshot}", identity.Value, snapshot); - var deleteTasks = prevSnapshots.Select(pr => _collection.DeleteAsync(pr)); - await Task.WhenAll(deleteTasks).ConfigureAwait(false); + return snapshot is null ? null : new CommittedSnapshot(snapshot.Metadata, snapshot.Data); + } - await _collection.InsertAsync(snapshot).ConfigureAwait(false); + public async Task SetSnapshotAsync(Type aggregateType, IIdentity identity, SerializedSnapshot serializedSnapshot, + CancellationToken cancellationToken) + { + var aggregateName = aggregateType.GetAggregateName().Value; + var snapshot = new RedisSnapshot + { + Id = SnapshotId.New.Value, + AggregateId = identity.Value, + AggregateName = aggregateName, + AggregateSequenceNumber = serializedSnapshot.Metadata.AggregateSequenceNumber, + Data = serializedSnapshot.SerializedData, + Metadata = serializedSnapshot.SerializedMetadata + }; + + var prevSnapshots = await _collection + .Where(sn => + sn.AggregateId == identity.Value && + sn.AggregateName == aggregateName && + sn.AggregateSequenceNumber == serializedSnapshot.Metadata.AggregateSequenceNumber) + .ToListAsync(); + + var deleteTasks = prevSnapshots.Select(pr => _collection.DeleteAsync(pr)); + await Task.WhenAll(deleteTasks).ConfigureAwait(false); + + await _collection.InsertAsync(snapshot).ConfigureAwait(false); - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Saved snapshot with id {Id}", identity.Value); - } + } - public async Task DeleteSnapshotAsync(Type aggregateType, IIdentity identity, CancellationToken cancellationToken) - { - var snapshot = await _collection.FirstOrDefaultAsync(sn => sn.AggregateId == identity.Value) - .ConfigureAwait(false); - if (snapshot is not null) + public async Task DeleteSnapshotAsync(Type aggregateType, IIdentity identity, CancellationToken cancellationToken) { - await _collection.DeleteAsync(snapshot); - if (_logger.IsEnabled(LogLevel.Trace)) + var snapshot = await _collection.FirstOrDefaultAsync(sn => sn.AggregateId == identity.Value).ConfigureAwait(false); + if (snapshot != null) + { + await _collection.DeleteAsync(snapshot); _logger.LogTrace("Deleted snapshot with id {Id}", identity.Value); + } + else + { + _logger.LogTrace("Failed to delete snapshot with id {Id}, snapshot was not found", identity.Value); + } } - else - _logger.LogTrace("Failed to delete snapshot with id {Id}, snapshot was not found", identity.Value); - } - public async Task PurgeSnapshotsAsync(Type aggregateType, CancellationToken cancellationToken) - { - var aggregateName = aggregateType.GetAggregateName().Value; - var snapshots = await _collection.Where(sn => sn.AggregateName == aggregateName).ToListAsync() - .ConfigureAwait(false); - await Task.WhenAll(snapshots.Select(sn => _collection.DeleteAsync(sn))).ConfigureAwait(false); + public async Task PurgeSnapshotsAsync(Type aggregateType, CancellationToken cancellationToken) + { + var aggregateName = aggregateType.GetAggregateName().Value; + var snapshots = await _collection.Where(sn => sn.AggregateName == aggregateName).ToListAsync() + .ConfigureAwait(false); + await Task.WhenAll(snapshots.Select(sn => _collection.DeleteAsync(sn))).ConfigureAwait(false); - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Purged all snapshots of aggregate {Aggregate}", aggregateName); - } - - public Task PurgeSnapshotsAsync(CancellationToken cancellationToken) - { - var result = _provider.Connection.DropIndexAndAssociatedRecords(typeof(RedisSnapshot)); - if (!result) - _logger.LogWarning("Failed to purge all snapshots"); - else if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("Purged all snapshots"); + } - return Task.CompletedTask; + public Task PurgeSnapshotsAsync(CancellationToken cancellationToken) + { + var result = _provider.Connection.DropIndexAndAssociatedRecords(typeof(RedisSnapshot)); + if (!result) + { + _logger.LogWarning("Failed to purge all snapshots"); + } + else + { + _logger.LogTrace("Purged all snapshots"); + } + + return Task.CompletedTask; + } } -} \ No newline at end of file +} diff --git a/Source/EventFlow.Redis/SnapshotStore/SnapshotId.cs b/Source/EventFlow.Redis/SnapshotStore/SnapshotId.cs index 99de37a68..860f9718e 100644 --- a/Source/EventFlow.Redis/SnapshotStore/SnapshotId.cs +++ b/Source/EventFlow.Redis/SnapshotStore/SnapshotId.cs @@ -22,11 +22,12 @@ using EventFlow.Core; -namespace EventFlow.Redis.SnapshotStore; - -public class SnapshotId : Identity +namespace EventFlow.Redis.SnapshotStore { - public SnapshotId(string value) : base(value) + public class SnapshotId : Identity { + public SnapshotId(string value) : base(value) + { + } } -} \ No newline at end of file +}