From 848cd4c17ea6b6f5b825b6ab509ed4fe1defac85 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 9 Jan 2025 03:29:58 +0700 Subject: [PATCH] Add per-plugin recovery permitter actors (#7448) * Add per-plugin recovery permitter actors * Add specs --- .../MultipleRecoveryPermitterSpec.cs | 63 +++++++++++++++++++ .../RecoveryPermitterSpec.cs | 2 +- .../Eventsourced.Lifecycle.cs | 3 +- .../Akka.Persistence/Eventsourced.Recovery.cs | 2 +- src/core/Akka.Persistence/Eventsourced.cs | 3 + src/core/Akka.Persistence/Persistence.cs | 32 ++++++---- .../Akka.Persistence/RecoveryPermitter.cs | 57 ++++++++++------- 7 files changed, 125 insertions(+), 37 deletions(-) create mode 100644 src/core/Akka.Persistence.Tests/MultipleRecoveryPermitterSpec.cs diff --git a/src/core/Akka.Persistence.Tests/MultipleRecoveryPermitterSpec.cs b/src/core/Akka.Persistence.Tests/MultipleRecoveryPermitterSpec.cs new file mode 100644 index 00000000000..30dfbf9d653 --- /dev/null +++ b/src/core/Akka.Persistence.Tests/MultipleRecoveryPermitterSpec.cs @@ -0,0 +1,63 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2025 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Configuration; +using FluentAssertions; +using Xunit; + +namespace Akka.Persistence.Tests; + +public class MultipleRecoveryPermitterSpec : PersistenceSpec +{ + private readonly IActorRef _permitter1; + private readonly IActorRef _permitter2; + + public MultipleRecoveryPermitterSpec() : base(ConfigurationFactory.ParseString($$""" + akka.persistence + { + # default global max recovery value + max-concurrent-recoveries = 3 + + journal + { + plugin = "akka.persistence.journal.inmem" + inmem2 { + # max recovery value override + max-concurrent-recoveries = 20 + class = "Akka.Persistence.Journal.MemoryJournal, Akka.Persistence" + plugin-dispatcher = "akka.actor.default-dispatcher" + } + } + + # snapshot store plugin is NOT defined, things should still work + snapshot-store.plugin = "akka.persistence.no-snapshot-store" + snapshot-store.local.dir = "target/snapshots-"{{typeof(RecoveryPermitterSpec).FullName}}"} + """)) + { + var extension = Persistence.Instance.Apply(Sys); + _permitter1 = extension.RecoveryPermitterFor(null); + _permitter2 = extension.RecoveryPermitterFor("akka.persistence.journal.inmem2"); + } + + [Fact(DisplayName = "Plugin max-concurrent-recoveries HOCON setting should override akka.persistence setting")] + public async Task HoconOverrideTest() + { + _permitter1.Tell(GetMaxPermits.Instance); + await ExpectMsgAsync(3); + + _permitter2.Tell(GetMaxPermits.Instance); + await ExpectMsgAsync(20); + } + + [Fact(DisplayName = "Each plugin should have their own recovery permitter")] + public void MultiRecoveryPermitterActorTest() + { + _permitter1.Equals(_permitter2).Should().BeFalse(); + } +} \ No newline at end of file diff --git a/src/core/Akka.Persistence.Tests/RecoveryPermitterSpec.cs b/src/core/Akka.Persistence.Tests/RecoveryPermitterSpec.cs index f70196d47e1..d0944fa4eda 100644 --- a/src/core/Akka.Persistence.Tests/RecoveryPermitterSpec.cs +++ b/src/core/Akka.Persistence.Tests/RecoveryPermitterSpec.cs @@ -75,7 +75,7 @@ public RecoveryPermitterSpec() : base(ConfigurationFactory.ParseString(@" akka.persistence.snapshot-store.local.dir = ""target/snapshots-" + typeof(RecoveryPermitterSpec).FullName + @"/""")) { - permitter = Persistence.Instance.Apply(Sys).RecoveryPermitter(); + permitter = Persistence.Instance.Apply(Sys).RecoveryPermitterFor(null); } private void RequestPermit(TestProbe probe) diff --git a/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs b/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs index d1b582df3c0..5f0f92013c9 100644 --- a/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs +++ b/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs @@ -27,7 +27,7 @@ private void StartRecovery(Recovery recovery) private void RequestRecoveryPermit() { - Extension.RecoveryPermitter().Tell(Akka.Persistence.RequestRecoveryPermit.Instance, Self); + RecoveryPermitter.Tell(Akka.Persistence.RequestRecoveryPermit.Instance, Self); ChangeState(WaitingRecoveryPermit(Recovery)); } @@ -46,6 +46,7 @@ public override void AroundPreStart() // Fail fast on missing plugins. var j = Journal; var s = SnapshotStore; + var r = RecoveryPermitter; RequestRecoveryPermit(); base.AroundPreStart(); } diff --git a/src/core/Akka.Persistence/Eventsourced.Recovery.cs b/src/core/Akka.Persistence/Eventsourced.Recovery.cs index 1150b4130c8..06a797a98ac 100644 --- a/src/core/Akka.Persistence/Eventsourced.Recovery.cs +++ b/src/core/Akka.Persistence/Eventsourced.Recovery.cs @@ -270,7 +270,7 @@ private EventsourcedState Recovering(Receive recoveryBehavior, TimeSpan timeout) } private void ReturnRecoveryPermit() => - Extension.RecoveryPermitter().Tell(Akka.Persistence.ReturnRecoveryPermit.Instance, Self); + RecoveryPermitter.Tell(Akka.Persistence.ReturnRecoveryPermit.Instance, Self); private void TransitToProcessingState() { diff --git a/src/core/Akka.Persistence/Eventsourced.cs b/src/core/Akka.Persistence/Eventsourced.cs index 94aa6cfad18..80eff346edc 100644 --- a/src/core/Akka.Persistence/Eventsourced.cs +++ b/src/core/Akka.Persistence/Eventsourced.cs @@ -81,6 +81,7 @@ public abstract partial class Eventsourced : ActorBase, IPersistentIdentity, IPe private readonly IStash _internalStash; private IActorRef _snapshotStore; private IActorRef _journal; + private IActorRef _recoveryPermitter; private List _journalBatch = new(); private bool _isWriteInProgress; private long _sequenceNr; @@ -166,6 +167,8 @@ public IStash Stash /// public IActorRef Journal => _journal ??= Extension.JournalFor(JournalPluginId); + internal IActorRef RecoveryPermitter => _recoveryPermitter ??= Extension.RecoveryPermitterFor(JournalPluginId); + /// /// TBD /// diff --git a/src/core/Akka.Persistence/Persistence.cs b/src/core/Akka.Persistence/Persistence.cs index 0f4685e0356..2d6da79d108 100644 --- a/src/core/Akka.Persistence/Persistence.cs +++ b/src/core/Akka.Persistence/Persistence.cs @@ -8,7 +8,6 @@ using System; using System.Collections.Concurrent; using System.Linq; -using System.Reflection; using System.Threading; using Akka.Actor; using Akka.Annotations; @@ -21,11 +20,12 @@ namespace Akka.Persistence { internal struct PluginHolder { - public PluginHolder(IActorRef @ref, EventAdapters adapters, Config config) + public PluginHolder(IActorRef @ref, EventAdapters adapters, Config config, IActorRef recoveryPermitter) { Ref = @ref; Adapters = adapters; Config = config; + RecoveryPermitter = recoveryPermitter; } public IActorRef Ref { get; } @@ -33,6 +33,8 @@ public PluginHolder(IActorRef @ref, EventAdapters adapters, Config config) public EventAdapters Adapters { get; } public Config Config { get; } + + public IActorRef RecoveryPermitter { get; } } /// @@ -50,7 +52,6 @@ public class PersistenceExtension : IExtension private readonly Lazy _defaultJournalPluginId; private readonly Lazy _defaultSnapshotPluginId; private readonly Lazy _defaultInternalStashOverflowStrategy; - private readonly Lazy _recoveryPermitter; private readonly ConcurrentDictionary> _pluginExtensionIds = new(); @@ -120,12 +121,6 @@ public PersistenceExtension(ExtendedActorSystem system) _log.Info("Auto-starting snapshot store `{0}`", id); SnapshotStoreFor(id); }); - - _recoveryPermitter = new Lazy(() => - { - var maxPermits = _config.GetInt("max-concurrent-recoveries", 0); - return _system.SystemActorOf(Akka.Persistence.RecoveryPermitter.Props(maxPermits), "recoveryPermitter"); - }); } /// @@ -152,9 +147,10 @@ public string PersistenceId(IActorRef actor) /// INTERNAL API: When starting many persistent actors at the same time the journal its data store is protected /// from being overloaded by limiting number of recoveries that can be in progress at the same time. /// - internal IActorRef RecoveryPermitter() + internal IActorRef RecoveryPermitterFor(string journalPluginId) { - return _recoveryPermitter.Value; + var configPath = string.IsNullOrEmpty(journalPluginId) ? _defaultJournalPluginId.Value : journalPluginId; + return PluginHolderFor(configPath, JournalFallbackConfigPath).RecoveryPermitter; } /// @@ -270,6 +266,17 @@ private PluginHolder PluginHolderFor(string configPath, string fallbackPath) return pluginContainer.Value; } + private static IActorRef CreateRecoveryPermitter(ExtendedActorSystem system, string configPath, Config pluginConfig) + { + // backward compatibility + // get the setting from the plugin path, if not found, default to the one defined in "akka.persistence" + var maxPermits = pluginConfig.HasPath("max-concurrent-recoveries") + ? pluginConfig.GetInt("max-concurrent-recoveries") + : system.Settings.Config.GetInt("akka.persistence.max-concurrent-recoveries"); + + return system.SystemActorOf(RecoveryPermitter.Props(maxPermits), $"recoveryPermitter-{configPath}"); + } + private static IActorRef CreatePlugin(ExtendedActorSystem system, string configPath, Config pluginConfig) { var pluginActorName = configPath; @@ -303,8 +310,9 @@ private static PluginHolder NewPluginHolder(ExtendedActorSystem system, string c var config = system.Settings.Config.GetConfig(configPath).WithFallback(system.Settings.Config.GetConfig(fallbackPath)); var plugin = CreatePlugin(system, configPath, config); var adapters = CreateAdapters(system, configPath); + var recoveryPermitter = CreateRecoveryPermitter(system, configPath, config); - return new PluginHolder(plugin, adapters, config); + return new PluginHolder(plugin, adapters, config, recoveryPermitter); } } diff --git a/src/core/Akka.Persistence/RecoveryPermitter.cs b/src/core/Akka.Persistence/RecoveryPermitter.cs index 2f3eabfe1a8..694443d4062 100644 --- a/src/core/Akka.Persistence/RecoveryPermitter.cs +++ b/src/core/Akka.Persistence/RecoveryPermitter.cs @@ -32,6 +32,12 @@ internal sealed class ReturnRecoveryPermit private ReturnRecoveryPermit() { } } + internal sealed class GetMaxPermits + { + public static GetMaxPermits Instance { get; } = new(); + private GetMaxPermits() { } + } + /// /// When starting many persistent actors at the same time the journal its data store is protected /// from being overloaded by limiting number of recoveries that can be in progress at the same time. @@ -55,29 +61,36 @@ public RecoveryPermitter(int maxPermits) protected override void OnReceive(object message) { - if (message is RequestRecoveryPermit) - { - Context.Watch(Sender); - if (_usedPermits >= MaxPermits) - { - if (pending.Count == 0) - Log.Debug("Exceeded max-concurrent-recoveries [{0}]. First pending {1}", MaxPermits, Sender); - pending.AddLast(Sender); - _maxPendingStats = Math.Max(_maxPendingStats, pending.Count); - } - else - { - RecoveryPermitGranted(Sender); - } - } - else if (message is ReturnRecoveryPermit) - { - ReturnRecoveryPermit(Sender); - } - else if (message is Terminated terminated && !pending.Remove(terminated.ActorRef)) + switch (message) { - // pre-mature termination should be rare - ReturnRecoveryPermit(terminated.ActorRef); + case RequestRecoveryPermit: + Context.Watch(Sender); + if (_usedPermits >= MaxPermits) + { + if (pending.Count == 0) + Log.Debug("Exceeded max-concurrent-recoveries [{0}]. First pending {1}", MaxPermits, Sender); + pending.AddLast(Sender); + _maxPendingStats = Math.Max(_maxPendingStats, pending.Count); + } + else + { + RecoveryPermitGranted(Sender); + } + + break; + + case Akka.Persistence.ReturnRecoveryPermit: + ReturnRecoveryPermit(Sender); + break; + + case Terminated terminated when !pending.Remove(terminated.ActorRef): + // pre-mature termination should be rare + ReturnRecoveryPermit(terminated.ActorRef); + break; + + case GetMaxPermits: + Sender.Tell(MaxPermits); + break; } }