diff --git a/src/core/Akka.Persistence.Tests/MultipleRecoveryPermitterSpec.cs b/src/core/Akka.Persistence.Tests/MultipleRecoveryPermitterSpec.cs
new file mode 100644
index 00000000000..30dfbf9d653
--- /dev/null
+++ b/src/core/Akka.Persistence.Tests/MultipleRecoveryPermitterSpec.cs
@@ -0,0 +1,63 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2025 Lightbend Inc.
+// Copyright (C) 2013-2025 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+using System.Threading.Tasks;
+using Akka.Actor;
+using Akka.Configuration;
+using FluentAssertions;
+using Xunit;
+
+namespace Akka.Persistence.Tests;
+
+public class MultipleRecoveryPermitterSpec : PersistenceSpec
+{
+ private readonly IActorRef _permitter1;
+ private readonly IActorRef _permitter2;
+
+ public MultipleRecoveryPermitterSpec() : base(ConfigurationFactory.ParseString($$"""
+ akka.persistence
+ {
+ # default global max recovery value
+ max-concurrent-recoveries = 3
+
+ journal
+ {
+ plugin = "akka.persistence.journal.inmem"
+ inmem2 {
+ # max recovery value override
+ max-concurrent-recoveries = 20
+ class = "Akka.Persistence.Journal.MemoryJournal, Akka.Persistence"
+ plugin-dispatcher = "akka.actor.default-dispatcher"
+ }
+ }
+
+ # snapshot store plugin is NOT defined, things should still work
+ snapshot-store.plugin = "akka.persistence.no-snapshot-store"
+ snapshot-store.local.dir = "target/snapshots-"{{typeof(RecoveryPermitterSpec).FullName}}"}
+ """))
+ {
+ var extension = Persistence.Instance.Apply(Sys);
+ _permitter1 = extension.RecoveryPermitterFor(null);
+ _permitter2 = extension.RecoveryPermitterFor("akka.persistence.journal.inmem2");
+ }
+
+ [Fact(DisplayName = "Plugin max-concurrent-recoveries HOCON setting should override akka.persistence setting")]
+ public async Task HoconOverrideTest()
+ {
+ _permitter1.Tell(GetMaxPermits.Instance);
+ await ExpectMsgAsync(3);
+
+ _permitter2.Tell(GetMaxPermits.Instance);
+ await ExpectMsgAsync(20);
+ }
+
+ [Fact(DisplayName = "Each plugin should have their own recovery permitter")]
+ public void MultiRecoveryPermitterActorTest()
+ {
+ _permitter1.Equals(_permitter2).Should().BeFalse();
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.Tests/RecoveryPermitterSpec.cs b/src/core/Akka.Persistence.Tests/RecoveryPermitterSpec.cs
index f70196d47e1..d0944fa4eda 100644
--- a/src/core/Akka.Persistence.Tests/RecoveryPermitterSpec.cs
+++ b/src/core/Akka.Persistence.Tests/RecoveryPermitterSpec.cs
@@ -75,7 +75,7 @@ public RecoveryPermitterSpec() : base(ConfigurationFactory.ParseString(@"
akka.persistence.snapshot-store.local.dir = ""target/snapshots-" + typeof(RecoveryPermitterSpec).FullName +
@"/"""))
{
- permitter = Persistence.Instance.Apply(Sys).RecoveryPermitter();
+ permitter = Persistence.Instance.Apply(Sys).RecoveryPermitterFor(null);
}
private void RequestPermit(TestProbe probe)
diff --git a/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs b/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs
index d1b582df3c0..5f0f92013c9 100644
--- a/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs
+++ b/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs
@@ -27,7 +27,7 @@ private void StartRecovery(Recovery recovery)
private void RequestRecoveryPermit()
{
- Extension.RecoveryPermitter().Tell(Akka.Persistence.RequestRecoveryPermit.Instance, Self);
+ RecoveryPermitter.Tell(Akka.Persistence.RequestRecoveryPermit.Instance, Self);
ChangeState(WaitingRecoveryPermit(Recovery));
}
@@ -46,6 +46,7 @@ public override void AroundPreStart()
// Fail fast on missing plugins.
var j = Journal;
var s = SnapshotStore;
+ var r = RecoveryPermitter;
RequestRecoveryPermit();
base.AroundPreStart();
}
diff --git a/src/core/Akka.Persistence/Eventsourced.Recovery.cs b/src/core/Akka.Persistence/Eventsourced.Recovery.cs
index 1150b4130c8..06a797a98ac 100644
--- a/src/core/Akka.Persistence/Eventsourced.Recovery.cs
+++ b/src/core/Akka.Persistence/Eventsourced.Recovery.cs
@@ -270,7 +270,7 @@ private EventsourcedState Recovering(Receive recoveryBehavior, TimeSpan timeout)
}
private void ReturnRecoveryPermit() =>
- Extension.RecoveryPermitter().Tell(Akka.Persistence.ReturnRecoveryPermit.Instance, Self);
+ RecoveryPermitter.Tell(Akka.Persistence.ReturnRecoveryPermit.Instance, Self);
private void TransitToProcessingState()
{
diff --git a/src/core/Akka.Persistence/Eventsourced.cs b/src/core/Akka.Persistence/Eventsourced.cs
index 94aa6cfad18..80eff346edc 100644
--- a/src/core/Akka.Persistence/Eventsourced.cs
+++ b/src/core/Akka.Persistence/Eventsourced.cs
@@ -81,6 +81,7 @@ public abstract partial class Eventsourced : ActorBase, IPersistentIdentity, IPe
private readonly IStash _internalStash;
private IActorRef _snapshotStore;
private IActorRef _journal;
+ private IActorRef _recoveryPermitter;
private List _journalBatch = new();
private bool _isWriteInProgress;
private long _sequenceNr;
@@ -166,6 +167,8 @@ public IStash Stash
///
public IActorRef Journal => _journal ??= Extension.JournalFor(JournalPluginId);
+ internal IActorRef RecoveryPermitter => _recoveryPermitter ??= Extension.RecoveryPermitterFor(JournalPluginId);
+
///
/// TBD
///
diff --git a/src/core/Akka.Persistence/Persistence.cs b/src/core/Akka.Persistence/Persistence.cs
index 0f4685e0356..2d6da79d108 100644
--- a/src/core/Akka.Persistence/Persistence.cs
+++ b/src/core/Akka.Persistence/Persistence.cs
@@ -8,7 +8,6 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
-using System.Reflection;
using System.Threading;
using Akka.Actor;
using Akka.Annotations;
@@ -21,11 +20,12 @@ namespace Akka.Persistence
{
internal struct PluginHolder
{
- public PluginHolder(IActorRef @ref, EventAdapters adapters, Config config)
+ public PluginHolder(IActorRef @ref, EventAdapters adapters, Config config, IActorRef recoveryPermitter)
{
Ref = @ref;
Adapters = adapters;
Config = config;
+ RecoveryPermitter = recoveryPermitter;
}
public IActorRef Ref { get; }
@@ -33,6 +33,8 @@ public PluginHolder(IActorRef @ref, EventAdapters adapters, Config config)
public EventAdapters Adapters { get; }
public Config Config { get; }
+
+ public IActorRef RecoveryPermitter { get; }
}
///
@@ -50,7 +52,6 @@ public class PersistenceExtension : IExtension
private readonly Lazy _defaultJournalPluginId;
private readonly Lazy _defaultSnapshotPluginId;
private readonly Lazy _defaultInternalStashOverflowStrategy;
- private readonly Lazy _recoveryPermitter;
private readonly ConcurrentDictionary> _pluginExtensionIds = new();
@@ -120,12 +121,6 @@ public PersistenceExtension(ExtendedActorSystem system)
_log.Info("Auto-starting snapshot store `{0}`", id);
SnapshotStoreFor(id);
});
-
- _recoveryPermitter = new Lazy(() =>
- {
- var maxPermits = _config.GetInt("max-concurrent-recoveries", 0);
- return _system.SystemActorOf(Akka.Persistence.RecoveryPermitter.Props(maxPermits), "recoveryPermitter");
- });
}
///
@@ -152,9 +147,10 @@ public string PersistenceId(IActorRef actor)
/// INTERNAL API: When starting many persistent actors at the same time the journal its data store is protected
/// from being overloaded by limiting number of recoveries that can be in progress at the same time.
///
- internal IActorRef RecoveryPermitter()
+ internal IActorRef RecoveryPermitterFor(string journalPluginId)
{
- return _recoveryPermitter.Value;
+ var configPath = string.IsNullOrEmpty(journalPluginId) ? _defaultJournalPluginId.Value : journalPluginId;
+ return PluginHolderFor(configPath, JournalFallbackConfigPath).RecoveryPermitter;
}
///
@@ -270,6 +266,17 @@ private PluginHolder PluginHolderFor(string configPath, string fallbackPath)
return pluginContainer.Value;
}
+ private static IActorRef CreateRecoveryPermitter(ExtendedActorSystem system, string configPath, Config pluginConfig)
+ {
+ // backward compatibility
+ // get the setting from the plugin path, if not found, default to the one defined in "akka.persistence"
+ var maxPermits = pluginConfig.HasPath("max-concurrent-recoveries")
+ ? pluginConfig.GetInt("max-concurrent-recoveries")
+ : system.Settings.Config.GetInt("akka.persistence.max-concurrent-recoveries");
+
+ return system.SystemActorOf(RecoveryPermitter.Props(maxPermits), $"recoveryPermitter-{configPath}");
+ }
+
private static IActorRef CreatePlugin(ExtendedActorSystem system, string configPath, Config pluginConfig)
{
var pluginActorName = configPath;
@@ -303,8 +310,9 @@ private static PluginHolder NewPluginHolder(ExtendedActorSystem system, string c
var config = system.Settings.Config.GetConfig(configPath).WithFallback(system.Settings.Config.GetConfig(fallbackPath));
var plugin = CreatePlugin(system, configPath, config);
var adapters = CreateAdapters(system, configPath);
+ var recoveryPermitter = CreateRecoveryPermitter(system, configPath, config);
- return new PluginHolder(plugin, adapters, config);
+ return new PluginHolder(plugin, adapters, config, recoveryPermitter);
}
}
diff --git a/src/core/Akka.Persistence/RecoveryPermitter.cs b/src/core/Akka.Persistence/RecoveryPermitter.cs
index 2f3eabfe1a8..694443d4062 100644
--- a/src/core/Akka.Persistence/RecoveryPermitter.cs
+++ b/src/core/Akka.Persistence/RecoveryPermitter.cs
@@ -32,6 +32,12 @@ internal sealed class ReturnRecoveryPermit
private ReturnRecoveryPermit() { }
}
+ internal sealed class GetMaxPermits
+ {
+ public static GetMaxPermits Instance { get; } = new();
+ private GetMaxPermits() { }
+ }
+
///
/// When starting many persistent actors at the same time the journal its data store is protected
/// from being overloaded by limiting number of recoveries that can be in progress at the same time.
@@ -55,29 +61,36 @@ public RecoveryPermitter(int maxPermits)
protected override void OnReceive(object message)
{
- if (message is RequestRecoveryPermit)
- {
- Context.Watch(Sender);
- if (_usedPermits >= MaxPermits)
- {
- if (pending.Count == 0)
- Log.Debug("Exceeded max-concurrent-recoveries [{0}]. First pending {1}", MaxPermits, Sender);
- pending.AddLast(Sender);
- _maxPendingStats = Math.Max(_maxPendingStats, pending.Count);
- }
- else
- {
- RecoveryPermitGranted(Sender);
- }
- }
- else if (message is ReturnRecoveryPermit)
- {
- ReturnRecoveryPermit(Sender);
- }
- else if (message is Terminated terminated && !pending.Remove(terminated.ActorRef))
+ switch (message)
{
- // pre-mature termination should be rare
- ReturnRecoveryPermit(terminated.ActorRef);
+ case RequestRecoveryPermit:
+ Context.Watch(Sender);
+ if (_usedPermits >= MaxPermits)
+ {
+ if (pending.Count == 0)
+ Log.Debug("Exceeded max-concurrent-recoveries [{0}]. First pending {1}", MaxPermits, Sender);
+ pending.AddLast(Sender);
+ _maxPendingStats = Math.Max(_maxPendingStats, pending.Count);
+ }
+ else
+ {
+ RecoveryPermitGranted(Sender);
+ }
+
+ break;
+
+ case Akka.Persistence.ReturnRecoveryPermit:
+ ReturnRecoveryPermit(Sender);
+ break;
+
+ case Terminated terminated when !pending.Remove(terminated.ActorRef):
+ // pre-mature termination should be rare
+ ReturnRecoveryPermit(terminated.ActorRef);
+ break;
+
+ case GetMaxPermits:
+ Sender.Tell(MaxPermits);
+ break;
}
}