From 47650260c7d66f5cfa0ee571b93d5847f2e93cd7 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 3 Feb 2022 00:15:32 +0700 Subject: [PATCH 1/2] Add issue spec --- .../PostgreSqlJournalRecoverySpec.cs | 150 ++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 src/Akka.Persistence.PostgreSql.Tests/PostgreSqlJournalRecoverySpec.cs diff --git a/src/Akka.Persistence.PostgreSql.Tests/PostgreSqlJournalRecoverySpec.cs b/src/Akka.Persistence.PostgreSql.Tests/PostgreSqlJournalRecoverySpec.cs new file mode 100644 index 0000000..9f28522 --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Tests/PostgreSqlJournalRecoverySpec.cs @@ -0,0 +1,150 @@ +using System.Linq; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Configuration; +using Akka.TestKit; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.PostgreSql.Tests +{ + [Collection("PostgreSqlSpec")] + public class PostgreSqlJournalRecoverySpec: Akka.TestKit.Xunit2.TestKit + { + private const int TotalActors = 50; + private const int TotalPersistedMessages = 100; + + private static Config Initialize(PostgresFixture fixture) + { + //need to make sure db is created before the tests start + DbUtils.Initialize(fixture); + + var config = @" + akka.persistence { + publish-plugin-commands = on + journal { + plugin = ""akka.persistence.journal.postgresql"" + postgresql { + class = ""Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + table-name = event_journal + schema-name = public + auto-initialize = on + connection-string = """ + DbUtils.ConnectionString + @""" + } + } + } + akka.test.single-expect-default = 10s"; + + return ConfigurationFactory.ParseString(config); + } + + public PostgreSqlJournalRecoverySpec(ITestOutputHelper output, PostgresFixture fixture) + : base(Initialize(fixture), "PostgreSqlJournalSpec", output: output) + { + } + + [Fact] + public async Task Recovery_should_emit_messages_in_the_right_order() + { + var probes = new TestProbe[TotalActors]; + var actors = new IActorRef[TotalActors]; + var states = new int[TotalActors]; + + // Create persistent actors and populate them with data + for (var i = 0; i < TotalActors; i++) + { + probes[i] = CreateTestProbe(); + actors[i] = Sys.ActorOf(Props.Create(() => new PersistentActor($"test_id_{i}", probes[i]))); + + probes[i].ExpectMsg("Recovered. State: 0"); + + foreach (var data in Enumerable.Range(1, TotalPersistedMessages)) + { + states[i] += await actors[i].Ask(data); + } + } + + // Kill all actors + for (var i = 0; i < TotalActors; i++) + { + probes[i].Watch(actors[i]); + actors[i].Tell(PoisonPill.Instance); + probes[i].ExpectTerminated(actors[i]); + probes[i].Unwatch(actors[i]); + } + + // Restore all actors + for (var i = 0; i < TotalActors; i++) + { + actors[i] = Sys.ActorOf(Props.Create(() => new PersistentActor($"test_id_{i}", probes[i]))); + } + + // All actors should restore fine + for (var i = 0; i < TotalActors; i++) + { + probes[i].ExpectMsg($"Recovered. State: {states[i]}"); + } + } + + internal class PersistentActor : ReceivePersistentActor + { + private bool _recoveryCompleted; + private readonly IActorRef _probe; + private int _state; + public override string PersistenceId { get; } + + public PersistentActor(string persistenceId, IActorRef probe) + { + PersistenceId = persistenceId; + _probe = probe; + + Command(l => + { + var sender = Sender; + Persist(new Persisted(l), p => + { + _state += p.Data; + sender.Tell(p.Data); + }); + }); + + Recover(p => + { + if(_recoveryCompleted) + { + _probe.Tell($"Persisted {p.Data} arrived after recovery was completed."); + return; + } + + _state += p.Data; + }); + } + + protected override void Unhandled(object message) + { + switch (message) + { + case RecoveryCompleted _: + _recoveryCompleted = true; + _probe.Tell($"Recovered. State: {_state}"); + return; + case Persisted p: + _probe.Tell($"Persisted {p.Data} was unhandled"); + break; + } + base.Unhandled(message); + } + } + + internal class Persisted + { + public Persisted(int data) + { + Data = data; + } + + public int Data { get; } + } + } +} \ No newline at end of file From c27961e8e06be5e88354e751adb2d04a11e16001 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 3 Feb 2022 01:03:29 +0700 Subject: [PATCH 2/2] Update spec, use predictable randomized chaotic seeding --- .../PostgreSqlJournalRecoverySpec.cs | 80 +++++++++++++------ 1 file changed, 57 insertions(+), 23 deletions(-) diff --git a/src/Akka.Persistence.PostgreSql.Tests/PostgreSqlJournalRecoverySpec.cs b/src/Akka.Persistence.PostgreSql.Tests/PostgreSqlJournalRecoverySpec.cs index 9f28522..cbefdf4 100644 --- a/src/Akka.Persistence.PostgreSql.Tests/PostgreSqlJournalRecoverySpec.cs +++ b/src/Akka.Persistence.PostgreSql.Tests/PostgreSqlJournalRecoverySpec.cs @@ -1,8 +1,11 @@ -using System.Linq; +using System; +using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; using Akka.TestKit; +using FluentAssertions; using Xunit; using Xunit.Abstractions; @@ -13,6 +16,16 @@ public class PostgreSqlJournalRecoverySpec: Akka.TestKit.Xunit2.TestKit { private const int TotalActors = 50; private const int TotalPersistedMessages = 100; + private static readonly int Seed; + private static readonly Random Rnd; + + static PostgreSqlJournalRecoverySpec() + { + // Generate seed from DateTime + var now = DateTime.Now; + Seed = now.Year + now.Month + now.Day + now.Hour + now.Minute + now.Second + now.Millisecond; + Rnd = new Random(Seed); + } private static Config Initialize(PostgresFixture fixture) { @@ -47,56 +60,77 @@ public PostgreSqlJournalRecoverySpec(ITestOutputHelper output, PostgresFixture f [Fact] public async Task Recovery_should_emit_messages_in_the_right_order() { - var probes = new TestProbe[TotalActors]; + var probe = CreateTestProbe(); + //var probes = new TestProbe[TotalActors]; var actors = new IActorRef[TotalActors]; var states = new int[TotalActors]; // Create persistent actors and populate them with data for (var i = 0; i < TotalActors; i++) { - probes[i] = CreateTestProbe(); - actors[i] = Sys.ActorOf(Props.Create(() => new PersistentActor($"test_id_{i}", probes[i]))); - - probes[i].ExpectMsg("Recovered. State: 0"); - - foreach (var data in Enumerable.Range(1, TotalPersistedMessages)) - { - states[i] += await actors[i].Ask(data); - } + actors[i] = Sys.ActorOf(Props.Create(() => new PersistentActor(i, probe))); + var m = probe.ExpectMsg<(int, string)>(); + var (idx, msg) = m; + idx.Should().Be(i); + msg.Should().Contain("Recovered. State: 0"); + } + + // Chaotically populate actors with data + var lastData = new int[TotalActors]; + var actorIndices = new List(); + for (var i = 0; i < TotalActors; i++) + { + actorIndices.Add(i); + } + + while (actorIndices.Count > 0) + { + var i = actorIndices[Rnd.Next(0, actorIndices.Count)]; + states[i] += await actors[i].Ask(++lastData[i]); + if (lastData[i] == TotalPersistedMessages) + actorIndices.Remove(i); } // Kill all actors for (var i = 0; i < TotalActors; i++) { - probes[i].Watch(actors[i]); + probe.Watch(actors[i]); actors[i].Tell(PoisonPill.Instance); - probes[i].ExpectTerminated(actors[i]); - probes[i].Unwatch(actors[i]); + probe.ExpectTerminated(actors[i]); + probe.Unwatch(actors[i]); } - + // Restore all actors + probe = CreateTestProbe(); for (var i = 0; i < TotalActors; i++) { - actors[i] = Sys.ActorOf(Props.Create(() => new PersistentActor($"test_id_{i}", probes[i]))); + actors[i] = Sys.ActorOf(Props.Create(() => new PersistentActor(i, probe))); } // All actors should restore fine for (var i = 0; i < TotalActors; i++) { - probes[i].ExpectMsg($"Recovered. State: {states[i]}"); + var m = probe.ExpectMsg<(int, string)>(); + var (index, msg) = m; + msg.Should().Contain($"Recovered. State: {states[index]}"); + Output.WriteLine($"{index}. {msg}"); } + + // No message should arrive after + probe.ExpectNoMsg(TimeSpan.FromSeconds(2)); } internal class PersistentActor : ReceivePersistentActor { private bool _recoveryCompleted; private readonly IActorRef _probe; + private readonly int _index; private int _state; - public override string PersistenceId { get; } + public override string PersistenceId => $"test_id_{_index}"; - public PersistentActor(string persistenceId, IActorRef probe) + public PersistentActor(int index, IActorRef probe) { - PersistenceId = persistenceId; + _index = index; _probe = probe; Command(l => @@ -113,7 +147,7 @@ public PersistentActor(string persistenceId, IActorRef probe) { if(_recoveryCompleted) { - _probe.Tell($"Persisted {p.Data} arrived after recovery was completed."); + _probe.Tell($"{PersistenceId}: Persisted {p.Data} arrived after recovery was completed. Random seed: {Seed}"); return; } @@ -127,10 +161,10 @@ protected override void Unhandled(object message) { case RecoveryCompleted _: _recoveryCompleted = true; - _probe.Tell($"Recovered. State: {_state}"); + _probe.Tell((_index, $"Recovered. State: {_state}")); return; case Persisted p: - _probe.Tell($"Persisted {p.Data} was unhandled"); + _probe.Tell($"{PersistenceId}: Persisted {p.Data} was unhandled. Random seed: {Seed}"); break; } base.Unhandled(message);