From 468546c78727048d56544f32600989ef63d0b1de Mon Sep 17 00:00:00 2001 From: Chris Hoare Date: Wed, 8 Jan 2025 22:28:43 +0000 Subject: [PATCH] Add support for optional snapshots (#7444) * Add support for optional snapshots * Fix markdown title formatting * Fix markdown consecutive blank lines --------- Co-authored-by: Chris Hoare Co-authored-by: Aaron Stannard --- docs/articles/persistence/snapshots.md | 11 ++++++ .../SnapshotFailureRobustnessSpec.cs | 37 +++++++++++++++++++ .../Akka.Persistence/Eventsourced.Recovery.cs | 24 +++++++++--- src/core/Akka.Persistence/Persistence.cs | 16 ++++++++ src/core/Akka.Persistence/persistence.conf | 8 ++++ 5 files changed, 90 insertions(+), 6 deletions(-) diff --git a/docs/articles/persistence/snapshots.md b/docs/articles/persistence/snapshots.md index 8e1d11295ea..e0aae700ec7 100644 --- a/docs/articles/persistence/snapshots.md +++ b/docs/articles/persistence/snapshots.md @@ -51,6 +51,17 @@ persistent actors should use the `deleteSnapshots` method. Depending on the jour best practice to do specific deletes with `deleteSnapshot` or to include a `minSequenceNr` as well as a `maxSequenceNr` for the `SnapshotSelectionCriteria`. +## Optional Snapshots + +By default, the persistent actor will unconditionally be stopped if the snapshot can't be loaded in the recovery. +It is possible to make snapshot loading optional. This can be useful when it is alright to ignore snapshot in case +of for example deserialization errors. When snapshot loading fails it will instead recover by replaying all events. + +Enable this feature by setting `snapshot-is-optional = true` in the snapshot store configuration. + +> [!WARNING] +>Don't set `snapshot-is-optional = true` if events have been deleted because that would result in wrong recovered state if snapshot load fails. + ## Snapshot Status Handling Saving or deleting snapshots can either succeed or fail – this information is reported back to the persistent actor via status messages as illustrated in the following table. diff --git a/src/core/Akka.Persistence.Tests/SnapshotFailureRobustnessSpec.cs b/src/core/Akka.Persistence.Tests/SnapshotFailureRobustnessSpec.cs index 1c431edb5e3..aad1bb33866 100644 --- a/src/core/Akka.Persistence.Tests/SnapshotFailureRobustnessSpec.cs +++ b/src/core/Akka.Persistence.Tests/SnapshotFailureRobustnessSpec.cs @@ -292,4 +292,41 @@ public void PersistentActor_with_a_failing_snapshot_should_receive_failure_messa m.Cause.Message.Contains("Failed to delete")); } } + + public class SnapshotIsOptionalSpec : PersistenceSpec + { + public SnapshotIsOptionalSpec() : base(Configuration("SnapshotIsOptionalSpec", serialization: "off", + extraConfig: @" +akka.persistence.snapshot-store.local.snapshot-is-optional = true +akka.persistence.snapshot-store.local.class = ""Akka.Persistence.Tests.SnapshotFailureRobustnessSpec+FailingLocalSnapshotStore, Akka.Persistence.Tests"" +")) + { + } + + [Fact] + public void PersistentActor_with_a_failing_snapshot_with_snapshot_is_optional_true_falls_back_to_events() + { + var spref = Sys.ActorOf(Props.Create(() => new SnapshotFailureRobustnessSpec.SaveSnapshotTestActor(Name, TestActor))); + + ExpectMsg(); + spref.Tell(new SnapshotFailureRobustnessSpec.Cmd("boom")); + ExpectMsg(1L); + + Sys.EventStream.Subscribe(TestActor, typeof(Error)); + try + { + + var lpref = Sys.ActorOf(Props.Create(() => new SnapshotFailureRobustnessSpec.LoadSnapshotTestActor(Name, TestActor))); + ExpectMsg(m => m.Message.ToString().StartsWith("Error loading snapshot")); + ExpectMsg("boom-1"); + ExpectMsg(); + + } + finally + { + Sys.EventStream.Unsubscribe(TestActor, typeof(Error)); + } + + } + } } diff --git a/src/core/Akka.Persistence/Eventsourced.Recovery.cs b/src/core/Akka.Persistence/Eventsourced.Recovery.cs index 06a797a98ac..2a66a47f655 100644 --- a/src/core/Akka.Persistence/Eventsourced.Recovery.cs +++ b/src/core/Akka.Persistence/Eventsourced.Recovery.cs @@ -7,6 +7,7 @@ using System; using Akka.Actor; +using Akka.Event; using Akka.Persistence.Internal; namespace Akka.Persistence @@ -61,7 +62,9 @@ private EventsourcedState RecoveryStarted(long maxReplays) // protect against snapshot stalling forever because journal overloaded and such var timeout = Extension.JournalConfigFor(JournalPluginId).GetTimeSpan("recovery-event-timeout", null, false); var timeoutCancelable = Context.System.Scheduler.ScheduleTellOnceCancelable(timeout, Self, new RecoveryTick(true), Self); - + + var snapshotIsOptional = Extension.SnapshotStoreConfigFor(SnapshotPluginId).GetBoolean("snapshot-is-optional", false); + bool RecoveryBehavior(object message) { Receive receiveRecover = ReceiveRecover; @@ -120,15 +123,24 @@ bool RecoveryBehavior(object message) } case LoadSnapshotFailed failed: timeoutCancelable.Cancel(); - try + if (snapshotIsOptional) { - OnRecoveryFailure(failed.Cause); + Log.Info("Snapshot load error for persistenceId [{0}]. Replaying all events since snapshot-is-optional=true", PersistenceId); + ChangeState(Recovering(RecoveryBehavior, timeout)); + Journal.Tell(new ReplayMessages(LastSequenceNr +1L, long.MaxValue, maxReplays, PersistenceId, Self)); } - finally + else { - Context.Stop(Self); + try + { + OnRecoveryFailure(failed.Cause); + } + finally + { + Context.Stop(Self); + } + ReturnRecoveryPermit(); } - ReturnRecoveryPermit(); break; case RecoveryTick { Snapshot: true }: try diff --git a/src/core/Akka.Persistence/Persistence.cs b/src/core/Akka.Persistence/Persistence.cs index 2d6da79d108..33013b23bf8 100644 --- a/src/core/Akka.Persistence/Persistence.cs +++ b/src/core/Akka.Persistence/Persistence.cs @@ -198,6 +198,22 @@ internal Config JournalConfigFor(string journalPluginId) var configPath = string.IsNullOrEmpty(journalPluginId) ? _defaultJournalPluginId.Value : journalPluginId; return PluginHolderFor(configPath, JournalFallbackConfigPath).Config; } + + /// + /// Returns the plugin config identified by . + /// When empty, looks in `akka.persistence.snapshot-store.plugin` to find configuration entry path. + /// When configured, uses as absolute path to the journal configuration entry. + /// + /// TBD + /// + /// This exception is thrown when either the plugin class name is undefined or the configuration path is missing. + /// + /// TBD + internal Config SnapshotStoreConfigFor(string snapshotPluginId) + { + var configPath = string.IsNullOrEmpty(snapshotPluginId) ? _defaultSnapshotPluginId.Value : snapshotPluginId; + return PluginHolderFor(configPath, SnapshotStoreFallbackConfigPath).Config; + } /// /// Looks up the plugin config by plugin's ActorRef. diff --git a/src/core/Akka.Persistence/persistence.conf b/src/core/Akka.Persistence/persistence.conf index 57f3e552b7a..3c1273edca5 100644 --- a/src/core/Akka.Persistence/persistence.conf +++ b/src/core/Akka.Persistence/persistence.conf @@ -181,6 +181,14 @@ akka.persistence { call-timeout = 20s reset-timeout = 60s } + + # Set this to true if successful loading of snapshot is not necessary. + # This can be useful when it is alright to ignore snapshot in case of + # for example deserialization errors. When snapshot loading fails it will instead + # recover by replaying all events. + # Don't set to true if events are deleted because that would + # result in wrong recovered state if snapshot load fails. + snapshot-is-optional = false } fsm {