Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add per-plugin recovery permitter actors #7448

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions src/core/Akka.Persistence.Tests/MultipleRecoveryPermitterSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// -----------------------------------------------------------------------
// <copyright file="MultipleRecoveryPermitterSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2025 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using FluentAssertions;
using Xunit;

namespace Akka.Persistence.Tests;

public class MultipleRecoveryPermitterSpec : PersistenceSpec
{
private readonly IActorRef _permitter1;
private readonly IActorRef _permitter2;

public MultipleRecoveryPermitterSpec() : base(ConfigurationFactory.ParseString($$"""
akka.persistence
{
# default global max recovery value
max-concurrent-recoveries = 3

journal
{
plugin = "akka.persistence.journal.inmem"
inmem2 {
# max recovery value override
max-concurrent-recoveries = 20
class = "Akka.Persistence.Journal.MemoryJournal, Akka.Persistence"
plugin-dispatcher = "akka.actor.default-dispatcher"
}
}

# snapshot store plugin is NOT defined, things should still work
snapshot-store.plugin = "akka.persistence.no-snapshot-store"
snapshot-store.local.dir = "target/snapshots-"{{typeof(RecoveryPermitterSpec).FullName}}"}
"""))
{
var extension = Persistence.Instance.Apply(Sys);
_permitter1 = extension.RecoveryPermitterFor(null);
_permitter2 = extension.RecoveryPermitterFor("akka.persistence.journal.inmem2");
}

[Fact(DisplayName = "Plugin max-concurrent-recoveries HOCON setting should override akka.persistence setting")]
public async Task HoconOverrideTest()
{
_permitter1.Tell(GetMaxPermits.Instance);
await ExpectMsgAsync(3);

_permitter2.Tell(GetMaxPermits.Instance);
await ExpectMsgAsync(20);
}

[Fact(DisplayName = "Each plugin should have their own recovery permitter")]
public void MultiRecoveryPermitterActorTest()
{
_permitter1.Equals(_permitter2).Should().BeFalse();
}
}
2 changes: 1 addition & 1 deletion src/core/Akka.Persistence.Tests/RecoveryPermitterSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public RecoveryPermitterSpec() : base(ConfigurationFactory.ParseString(@"
akka.persistence.snapshot-store.local.dir = ""target/snapshots-" + typeof(RecoveryPermitterSpec).FullName +
@"/"""))
{
permitter = Persistence.Instance.Apply(Sys).RecoveryPermitter();
permitter = Persistence.Instance.Apply(Sys).RecoveryPermitterFor(null);
}

private void RequestPermit(TestProbe probe)
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Persistence/Eventsourced.Lifecycle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ private void StartRecovery(Recovery recovery)

private void RequestRecoveryPermit()
{
Extension.RecoveryPermitter().Tell(Akka.Persistence.RequestRecoveryPermit.Instance, Self);
RecoveryPermitter.Tell(Akka.Persistence.RequestRecoveryPermit.Instance, Self);
ChangeState(WaitingRecoveryPermit(Recovery));
}

Expand All @@ -46,6 +46,7 @@ public override void AroundPreStart()
// Fail fast on missing plugins.
var j = Journal;
var s = SnapshotStore;
var r = RecoveryPermitter;
RequestRecoveryPermit();
base.AroundPreStart();
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Persistence/Eventsourced.Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ private EventsourcedState Recovering(Receive recoveryBehavior, TimeSpan timeout)
}

private void ReturnRecoveryPermit() =>
Extension.RecoveryPermitter().Tell(Akka.Persistence.ReturnRecoveryPermit.Instance, Self);
RecoveryPermitter.Tell(Akka.Persistence.ReturnRecoveryPermit.Instance, Self);

private void TransitToProcessingState()
{
Expand Down
3 changes: 3 additions & 0 deletions src/core/Akka.Persistence/Eventsourced.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public abstract partial class Eventsourced : ActorBase, IPersistentIdentity, IPe
private readonly IStash _internalStash;
private IActorRef _snapshotStore;
private IActorRef _journal;
private IActorRef _recoveryPermitter;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The recovery permitter is being cached here instead of in the Persistence extension. This is lazily created when the plugin is initialized for the first time.

private List<IPersistentEnvelope> _journalBatch = new();
private bool _isWriteInProgress;
private long _sequenceNr;
Expand Down Expand Up @@ -166,6 +167,8 @@ public IStash Stash
/// </summary>
public IActorRef Journal => _journal ??= Extension.JournalFor(JournalPluginId);

internal IActorRef RecoveryPermitter => _recoveryPermitter ??= Extension.RecoveryPermitterFor(JournalPluginId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lazy instantiation


/// <summary>
/// TBD
/// </summary>
Expand Down
32 changes: 20 additions & 12 deletions src/core/Akka.Persistence/Persistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Reflection;
using System.Threading;
using Akka.Actor;
using Akka.Annotations;
Expand All @@ -21,18 +20,21 @@ namespace Akka.Persistence
{
internal struct PluginHolder
{
public PluginHolder(IActorRef @ref, EventAdapters adapters, Config config)
public PluginHolder(IActorRef @ref, EventAdapters adapters, Config config, IActorRef recoveryPermitter)
{
Ref = @ref;
Adapters = adapters;
Config = config;
RecoveryPermitter = recoveryPermitter;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recovery permitter is also cached inside the PluginHolder so that it can be looked up.

}

public IActorRef Ref { get; }

public EventAdapters Adapters { get; }

public Config Config { get; }

public IActorRef RecoveryPermitter { get; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

}

/// <summary>
Expand All @@ -50,7 +52,6 @@ public class PersistenceExtension : IExtension
private readonly Lazy<string> _defaultJournalPluginId;
private readonly Lazy<string> _defaultSnapshotPluginId;
private readonly Lazy<IStashOverflowStrategy> _defaultInternalStashOverflowStrategy;
private readonly Lazy<IActorRef> _recoveryPermitter;

private readonly ConcurrentDictionary<string, Lazy<PluginHolder>> _pluginExtensionIds = new();

Expand Down Expand Up @@ -120,12 +121,6 @@ public PersistenceExtension(ExtendedActorSystem system)
_log.Info("Auto-starting snapshot store `{0}`", id);
SnapshotStoreFor(id);
});

_recoveryPermitter = new Lazy<IActorRef>(() =>
{
var maxPermits = _config.GetInt("max-concurrent-recoveries", 0);
return _system.SystemActorOf(Akka.Persistence.RecoveryPermitter.Props(maxPermits), "recoveryPermitter");
});
}

/// <summary>
Expand All @@ -152,9 +147,10 @@ public string PersistenceId(IActorRef actor)
/// INTERNAL API: When starting many persistent actors at the same time the journal its data store is protected
/// from being overloaded by limiting number of recoveries that can be in progress at the same time.
/// </summary>
internal IActorRef RecoveryPermitter()
internal IActorRef RecoveryPermitterFor(string journalPluginId)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same design as AdaptersFor() to look up recovery permitter actor per plugin basis

{
return _recoveryPermitter.Value;
var configPath = string.IsNullOrEmpty(journalPluginId) ? _defaultJournalPluginId.Value : journalPluginId;
return PluginHolderFor(configPath, JournalFallbackConfigPath).RecoveryPermitter;
}

/// <summary>
Expand Down Expand Up @@ -270,6 +266,17 @@ private PluginHolder PluginHolderFor(string configPath, string fallbackPath)
return pluginContainer.Value;
}

private static IActorRef CreateRecoveryPermitter(ExtendedActorSystem system, string configPath, Config pluginConfig)
{
// backward compatibility
// get the setting from the plugin path, if not found, default to the one defined in "akka.persistence"
var maxPermits = pluginConfig.HasPath("max-concurrent-recoveries")
? pluginConfig.GetInt("max-concurrent-recoveries")
: system.Settings.Config.GetInt("akka.persistence.max-concurrent-recoveries");

return system.SystemActorOf(RecoveryPermitter.Props(maxPermits), $"recoveryPermitter-{configPath}");
}

private static IActorRef CreatePlugin(ExtendedActorSystem system, string configPath, Config pluginConfig)
{
var pluginActorName = configPath;
Expand Down Expand Up @@ -303,8 +310,9 @@ private static PluginHolder NewPluginHolder(ExtendedActorSystem system, string c
var config = system.Settings.Config.GetConfig(configPath).WithFallback(system.Settings.Config.GetConfig(fallbackPath));
var plugin = CreatePlugin(system, configPath, config);
var adapters = CreateAdapters(system, configPath);
var recoveryPermitter = CreateRecoveryPermitter(system, configPath, config);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM


return new PluginHolder(plugin, adapters, config);
return new PluginHolder(plugin, adapters, config, recoveryPermitter);
}
}

Expand Down
57 changes: 35 additions & 22 deletions src/core/Akka.Persistence/RecoveryPermitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ internal sealed class ReturnRecoveryPermit
private ReturnRecoveryPermit() { }
}

internal sealed class GetMaxPermits
{
public static GetMaxPermits Instance { get; } = new();
private GetMaxPermits() { }
}

/// <summary>
/// When starting many persistent actors at the same time the journal its data store is protected
/// from being overloaded by limiting number of recoveries that can be in progress at the same time.
Expand All @@ -55,29 +61,36 @@ public RecoveryPermitter(int maxPermits)

protected override void OnReceive(object message)
{
if (message is RequestRecoveryPermit)
{
Context.Watch(Sender);
if (_usedPermits >= MaxPermits)
{
if (pending.Count == 0)
Log.Debug("Exceeded max-concurrent-recoveries [{0}]. First pending {1}", MaxPermits, Sender);
pending.AddLast(Sender);
_maxPendingStats = Math.Max(_maxPendingStats, pending.Count);
}
else
{
RecoveryPermitGranted(Sender);
}
}
else if (message is ReturnRecoveryPermit)
{
ReturnRecoveryPermit(Sender);
}
else if (message is Terminated terminated && !pending.Remove(terminated.ActorRef))
switch (message)
{
// pre-mature termination should be rare
ReturnRecoveryPermit(terminated.ActorRef);
case RequestRecoveryPermit:
Context.Watch(Sender);
if (_usedPermits >= MaxPermits)
{
if (pending.Count == 0)
Log.Debug("Exceeded max-concurrent-recoveries [{0}]. First pending {1}", MaxPermits, Sender);
pending.AddLast(Sender);
_maxPendingStats = Math.Max(_maxPendingStats, pending.Count);
}
else
{
RecoveryPermitGranted(Sender);
}

break;

case Akka.Persistence.ReturnRecoveryPermit:
ReturnRecoveryPermit(Sender);
break;

case Terminated terminated when !pending.Remove(terminated.ActorRef):
// pre-mature termination should be rare
ReturnRecoveryPermit(terminated.ActorRef);
break;

case GetMaxPermits:
Sender.Tell(MaxPermits);
break;
}
}

Expand Down
Loading