From 84a167804c3186da364f40ff2a5b8afcbef6d100 Mon Sep 17 00:00:00 2001
From: Reuben Bond <203839+ReubenBond@users.noreply.github.com>
Date: Fri, 3 Nov 2023 11:39:11 -0700
Subject: [PATCH] Nullify directory entries of defunct local predecessors
during grain activation (#8704)
* Nullify the directory entry of a known-defunct predecessor on the local silo during grain activation
* Fix formatting
---
src/Orleans.Runtime/Catalog/ActivationData.cs | 101 ++++++++++++------
src/Orleans.Runtime/Catalog/Catalog.cs | 20 ++--
.../Messaging/MessageCenter.cs | 3 -
test/Grains/TestGrainInterfaces/ITestGrain.cs | 2 +
test/Grains/TestInternalGrains/TestGrain.cs | 3 +
.../GrainLocatorActivationResiliencyTests.cs | 82 ++++++++++++++
6 files changed, 164 insertions(+), 47 deletions(-)
create mode 100644 test/TesterInternal/GrainLocatorActivationResiliencyTests.cs
diff --git a/src/Orleans.Runtime/Catalog/ActivationData.cs b/src/Orleans.Runtime/Catalog/ActivationData.cs
index 6da9ef23e8..dc28bf830d 100644
--- a/src/Orleans.Runtime/Catalog/ActivationData.cs
+++ b/src/Orleans.Runtime/Catalog/ActivationData.cs
@@ -120,6 +120,23 @@ public SiloAddress ForwardingAddress
}
}
+ /// <summary>
+ /// Gets or sets the previous directory registration for this grain, if known. The getter returns null when no extras have been allocated; the setter allocates them on demand while holding lock (this).
+ /// This is used to update the grain directory to point to the new registration during activation.
+ /// </summary>
+ public GrainAddress PreviousRegistration
+ {
+ get => _extras?.PreviousRegistration;
+ set
+ {
+ lock (this)
+ {
+ _extras ??= new();
+ _extras.PreviousRegistration = value;
+ }
+ }
+ }
+
private Exception DeactivationException => _extras?.DeactivationReason.Exception;
private DeactivationReason DeactivationReason
@@ -895,7 +912,7 @@ void ProcessPendingRequests()
{
// Add this activation to cache invalidation headers.
message.CacheInvalidationHeader ??= new List();
- message.CacheInvalidationHeader.Add(new GrainAddressCacheUpdate(new GrainAddress { GrainId = GrainId, SiloAddress = Address.SiloAddress }, validAddress: null));
+ message.CacheInvalidationHeader.Add(new GrainAddressCacheUpdate(Address, validAddress: null));
var reason = new DeactivationReason(
DeactivationReasonCode.IncompatibleRequest,
@@ -1082,7 +1099,7 @@ private void RehydrateInternal(IRehydrationContext context)
if (context.TryGetValue(GrainAddressMigrationContextKey, out GrainAddress previousRegistration) && previousRegistration is not null)
{
// Propagate the previous registration, so that the new activation can atomically replace it with its new address.
- (_extras ??= new()).PreviousRegistration = previousRegistration;
+ PreviousRegistration = previousRegistration;
if (_shared.Logger.IsEnabled(LogLevel.Debug))
{
_shared.Logger.LogDebug("Previous activation address was {PreviousRegistration}", previousRegistration);
@@ -1487,42 +1504,66 @@ private async ValueTask RegisterActivationInGrainDirectoryAndValidate()
else
{
Exception registrationException;
+ var previousRegistration = PreviousRegistration;
try
{
- var result = await _shared.InternalRuntime.GrainLocator.Register(Address, _extras?.PreviousRegistration);
- if (Address.Matches(result))
- {
- success = true;
- }
- else
+ while (true)
{
- // Set the forwarding address so that messages enqueued on this activation can be forwarded to
- // the existing activation.
- ForwardingAddress = result?.SiloAddress;
- if (ForwardingAddress is { } address)
+ var result = await _shared.InternalRuntime.GrainLocator.Register(Address, previousRegistration);
+ if (Address.Matches(result))
{
- DeactivationReason = new(DeactivationReasonCode.DuplicateActivation, $"This grain is active on another host ({address}).");
+ success = true;
}
+ else if (result?.SiloAddress is { } registeredSilo && registeredSilo.Equals(Address.SiloAddress))
+ {
+ if (_shared.Logger.IsEnabled(LogLevel.Debug))
+ {
+ _shared.Logger.LogDebug(
+ "The grain directory has an existing entry pointing to a different activation of this grain on this silo, {PreviousRegistration}."
+ + " This may indicate that the previous activation was deactivated but the directory was not successfully updated."
+ + " The directory will be updated to point to this activation.",
+ previousRegistration);
+ }
- success = false;
- CatalogInstruments.ActivationConcurrentRegistrationAttempts.Add(1);
- if (_shared.Logger.IsEnabled(LogLevel.Debug))
+ // Attempt to register this activation again, using the registration of the previous instance of this grain,
+ // which is registered to this silo. That activation must be a defunct predecessor of this activation,
+ // since the catalog only allows one activation of a given grain at a time.
+ // This could occur if the previous activation failed to unregister itself from the grain directory.
+ previousRegistration = result;
+ continue;
+ }
+ else
{
- // If this was a duplicate, it's not an error, just a race.
- // Forward on all of the pending messages, and then forget about this activation.
- var primary = _shared.InternalRuntime.LocalGrainDirectory.GetPrimaryForGrain(GrainId);
- _shared.Logger.LogDebug(
- (int)ErrorCode.Catalog_DuplicateActivation,
- "Tried to create a duplicate activation {Address}, but we'll use {ForwardingAddress} instead. "
- + "GrainInstance type is {GrainInstanceType}. {PrimaryMessage}"
- + "Full activation address is {Address}. We have {WaitingCount} messages to forward.",
- Address,
- ForwardingAddress,
- GrainInstance?.GetType(),
- primary != null ? "Primary Directory partition for this grain is " + primary + ". " : string.Empty,
- Address.ToFullString(),
- WaitingCount);
+ // Set the forwarding address so that messages enqueued on this activation can be forwarded to
+ // the existing activation.
+ ForwardingAddress = result?.SiloAddress;
+ if (ForwardingAddress is { } address)
+ {
+ DeactivationReason = new(DeactivationReasonCode.DuplicateActivation, $"This grain is active on another host ({address}).");
+ }
+
+ success = false;
+ CatalogInstruments.ActivationConcurrentRegistrationAttempts.Add(1);
+ if (_shared.Logger.IsEnabled(LogLevel.Debug))
+ {
+ // If this was a duplicate, it's not an error, just a race.
+ // Forward on all of the pending messages, and then forget about this activation.
+ var primary = _shared.InternalRuntime.LocalGrainDirectory.GetPrimaryForGrain(GrainId);
+ _shared.Logger.LogDebug(
+ (int)ErrorCode.Catalog_DuplicateActivation,
+ "Tried to create a duplicate activation {Address}, but we'll use {ForwardingAddress} instead. "
+ + "GrainInstance type is {GrainInstanceType}. {PrimaryMessage}"
+ + "Full activation address is {Address}. We have {WaitingCount} messages to forward.",
+ Address,
+ ForwardingAddress,
+ GrainInstance?.GetType(),
+ primary != null ? "Primary Directory partition for this grain is " + primary + ". " : string.Empty,
+ Address.ToFullString(),
+ WaitingCount);
+ }
}
+
+ break;
}
registrationException = null;
diff --git a/src/Orleans.Runtime/Catalog/Catalog.cs b/src/Orleans.Runtime/Catalog/Catalog.cs
index f384e9c272..b56c6e283b 100644
--- a/src/Orleans.Runtime/Catalog/Catalog.cs
+++ b/src/Orleans.Runtime/Catalog/Catalog.cs
@@ -167,16 +167,6 @@ public DetailedGrainReport GetDetailedGrainReport(GrainId grain)
};
}
- ///
- /// Register a new object to which messages can be delivered with the local lookup table and scheduler.
- ///
- ///
- public void RegisterMessageTarget(IGrainContext activation)
- {
- activations.RecordNewTarget(activation);
- CatalogInstruments.ActivationsCreated.Add(1);
- }
-
///
/// Unregister message target and stop delivering messages to it
///
@@ -271,9 +261,9 @@ public IGrainContext GetOrCreateActivation(
if (!SiloStatusOracle.CurrentStatus.IsTerminating())
{
- var address = GrainAddress.GetAddress(Silo, grainId, new ActivationId(Guid.NewGuid()));
+ var address = GrainAddress.GetAddress(Silo, grainId, ActivationId.NewId());
result = this.grainActivator.CreateInstance(address);
- RegisterMessageTarget(result);
+ activations.RecordNewTarget(result);
}
} // End lock
@@ -283,6 +273,8 @@ public IGrainContext GetOrCreateActivation(
return UnableToCreateActivation(this, grainId);
}
+ CatalogInstruments.ActivationsCreated.Add(1);
+
// Rehydration occurs before activation.
if (rehydrationContext is not null)
{
@@ -317,7 +309,7 @@ static IGrainContext UnableToCreateActivation(Catalog self, GrainId grainId)
// Unregister the target activation so we don't keep getting spurious messages.
// The time delay (one minute, as of this writing) is to handle the unlikely but possible race where
// this request snuck ahead of another request, with new placement requested, for the same activation.
- // If the activation registration request from the new placement somehow sneaks ahead of this unregistration,
+ // If the activation registration request from the new placement somehow sneaks ahead of this deregistration,
// we want to make sure that we don't unregister the activation we just created.
var address = new GrainAddress { SiloAddress = self.Silo, GrainId = grainId };
_ = self.UnregisterNonExistentActivation(address);
@@ -344,7 +336,7 @@ private async Task UnregisterNonExistentActivation(GrainAddress address)
///
/// Try to get runtime data for an activation
///
- public bool TryGetGrainContext(GrainId grainId, out IGrainContext data)
+ private bool TryGetGrainContext(GrainId grainId, out IGrainContext data)
{
data = activations.FindTarget(grainId);
return data != null;
diff --git a/src/Orleans.Runtime/Messaging/MessageCenter.cs b/src/Orleans.Runtime/Messaging/MessageCenter.cs
index 394702bf49..e095ddc4ba 100644
--- a/src/Orleans.Runtime/Messaging/MessageCenter.cs
+++ b/src/Orleans.Runtime/Messaging/MessageCenter.cs
@@ -24,7 +24,6 @@ internal class MessageCenter : IMessageCenter, IAsyncDisposable
private readonly SiloAddress _siloAddress;
private readonly SiloMessagingOptions messagingOptions;
private readonly PlacementService placementService;
- private readonly ActivationDirectory activationDirectory;
private readonly GrainLocator _grainLocator;
private readonly ILogger log;
private readonly Catalog catalog;
@@ -43,7 +42,6 @@ public MessageCenter(
RuntimeMessagingTrace messagingTrace,
IOptions messagingOptions,
PlacementService placementService,
- ActivationDirectory activationDirectory,
GrainLocator grainLocator)
{
this.catalog = catalog;
@@ -52,7 +50,6 @@ public MessageCenter(
this.connectionManager = senderManager;
this.messagingTrace = messagingTrace;
this.placementService = placementService;
- this.activationDirectory = activationDirectory;
_grainLocator = grainLocator;
this.log = logger;
this.messageFactory = messageFactory;
diff --git a/test/Grains/TestGrainInterfaces/ITestGrain.cs b/test/Grains/TestGrainInterfaces/ITestGrain.cs
index d2d2f06f43..6cc55208fe 100644
--- a/test/Grains/TestGrainInterfaces/ITestGrain.cs
+++ b/test/Grains/TestGrainInterfaces/ITestGrain.cs
@@ -49,6 +49,8 @@ public interface IGuidTestGrain : IGrainWithGuidKey
Task GetRuntimeInstanceId();
Task GetActivationId();
+
+ Task GetSiloAddress();
}
public interface IOneWayGrain : IGrainWithGuidKey
diff --git a/test/Grains/TestInternalGrains/TestGrain.cs b/test/Grains/TestInternalGrains/TestGrain.cs
index 6b004a343b..0fa918fcea 100644
--- a/test/Grains/TestInternalGrains/TestGrain.cs
+++ b/test/Grains/TestInternalGrains/TestGrain.cs
@@ -152,6 +152,7 @@ public Task GetKey()
}
}
+ [GrainType("guid-test-grain")]
internal class GuidTestGrain : Grain, IGuidTestGrain
{
private readonly string _id = Guid.NewGuid().ToString();
@@ -200,6 +201,8 @@ public Task GetActivationId()
{
return Task.FromResult(_id);
}
+
+ public Task GetSiloAddress() => Task.FromResult(ServiceProvider.GetRequiredService().SiloAddress);
}
internal class OneWayGrain : Grain, IOneWayGrain, ISimpleGrainObserver
diff --git a/test/TesterInternal/GrainLocatorActivationResiliencyTests.cs b/test/TesterInternal/GrainLocatorActivationResiliencyTests.cs
new file mode 100644
index 0000000000..7332cf49a0
--- /dev/null
+++ b/test/TesterInternal/GrainLocatorActivationResiliencyTests.cs
@@ -0,0 +1,82 @@
+using System.Diagnostics;
+using System.Net;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+
+using Orleans.Runtime;
+using Orleans.Runtime.GrainDirectory;
+using Orleans.Runtime.Placement;
+using Orleans.TestingHost;
+using TestExtensions;
+using UnitTests.GrainInterfaces;
+using Xunit;
+
+namespace UnitTests
+{
+ /// <summary>
+ /// Tests that grain activation can recover from invalid grain directory entries.
+ /// </summary>
+ public class GrainLocatorActivationResiliencyTests : HostedTestClusterEnsureDefaultStarted
+ {
+ public GrainLocatorActivationResiliencyTests(DefaultClusterFixture fixture) : base(fixture)
+ {
+ }
+
+ /// <summary>
+ /// Tests that a grain can be activated even if the grain locator indicates that there is an existing registration for that grain on the same silo.
+ /// </summary>
+ [Fact, TestCategory("BVT")]
+ public async Task ReactivateGrainWithPoisonGrainDirectoryEntry_LocalSilo()
+ {
+ var primarySilo = (InProcessSiloHandle)Fixture.HostedCluster.Primary;
+ var primarySiloAddress = primarySilo.SiloAddress;
+ var grain = GrainFactory.GetGrain(Guid.NewGuid());
+
+ // Insert an entry into the grain directory which points to an activation which does not exist.
+ // The entry points to the silo which the activation will be created on, but it points to a different activation.
+ // This will cause the first registration attempt to fail, but the activation should recognize that the grain
+ // directory entry is incorrect (it points to the current silo, but to an activation which must be defunct) and retry
+ // registration, passing the previous registration so that the grain directory entry is updated.
+ var grainLocator = primarySilo.SiloHost.Services.GetRequiredService();
+ var badAddress = GrainAddress.GetAddress(primarySiloAddress, grain.GetGrainId(), ActivationId.NewId());
+ await grainLocator.Register(badAddress, previousRegistration: null);
+
+ {
+ // Rig placement to occur on the primary silo.
+ RequestContext.Set(IPlacementDirector.PlacementHintKey, primarySiloAddress);
+ var silo = await grain.GetSiloAddress();
+ Assert.Equal(primarySiloAddress, silo);
+ }
+ }
+
+ /// <summary>
+ /// Similar to <see cref="ReactivateGrainWithPoisonGrainDirectoryEntry_LocalSilo"/>, except this test attempts to activate the grain on a different silo
+ /// from the one specified in the grain directory entry, to ensure that the call will be forwarded to the correct silo.
+ /// </summary>
+ [Fact, TestCategory("BVT")]
+ public async Task ReactivateGrainWithPoisonGrainDirectoryEntry_RemoteSilo()
+ {
+ var primarySilo = (InProcessSiloHandle)Fixture.HostedCluster.Primary;
+ var secondarySiloAddress = Fixture.HostedCluster.SecondarySilos.First().SiloAddress;
+ var primarySiloAddress = primarySilo.SiloAddress;
+ var grain = GrainFactory.GetGrain(Guid.NewGuid());
+
+ // Insert an entry into the grain directory which points to an activation which does not exist.
+ // The entry points to the primary silo rather than the secondary silo which placement is rigged to select below, and the primary silo does not host the registered activation.
+ // This will cause the first registration attempt to fail, but the activation will forward messages to the
+ // silo which the registration points to and the subsequent activation will also initially fail registration,
+ // but succeed later.
+ var grainLocator = primarySilo.SiloHost.Services.GetRequiredService();
+ var badAddress = GrainAddress.GetAddress(primarySiloAddress, grain.GetGrainId(), ActivationId.NewId());
+ await grainLocator.Register(badAddress, previousRegistration: null);
+
+ {
+ // Rig placement to occur on the secondary silo, but since there is a directory entry pointing to the primary silo,
+ // the call should be forwarded to the primary silo where the grain will be activated.
+ RequestContext.Set(IPlacementDirector.PlacementHintKey, secondarySiloAddress);
+ var silo = await grain.GetSiloAddress();
+ Assert.Equal(primarySiloAddress, silo);
+ }
+ }
+ }
+}