Skip to content

Commit

Permalink
Nullify directory entries of defunct local predecessors during grain …
Browse files Browse the repository at this point in the history
…activation (#8704)

* Nullify the directory entry of a known-defunct predecessor on the local silo during grain activation

* Fix formatting
  • Loading branch information
ReubenBond authored Nov 3, 2023
1 parent 938dde3 commit 84a1678
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 47 deletions.
101 changes: 71 additions & 30 deletions src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,23 @@ public SiloAddress ForwardingAddress
}
}

/// <summary>
/// Gets or sets the grain directory registration previously held for this grain, if one is known.
/// During activation this value is handed to the grain directory so that the old entry can be
/// replaced by the registration of the new activation.
/// </summary>
/// <remarks>
/// Reads are performed without taking the lock; writes are serialized on this instance and
/// allocate the extras container on first use.
/// </remarks>
public GrainAddress PreviousRegistration
{
    get
    {
        // Take a single snapshot of the field; a missing container means nothing was recorded.
        var extras = _extras;
        return extras?.PreviousRegistration;
    }

    set
    {
        lock (this)
        {
            // The extras container is created lazily, only once a value needs to be stored.
            if (_extras is null)
            {
                _extras = new();
            }

            _extras.PreviousRegistration = value;
        }
    }
}

/// <summary>
/// Gets the exception attached to the recorded deactivation reason,
/// or <see langword="null"/> when the extras container has not been allocated.
/// </summary>
private Exception DeactivationException
{
    get
    {
        return _extras?.DeactivationReason.Exception;
    }
}

private DeactivationReason DeactivationReason
Expand Down Expand Up @@ -895,7 +912,7 @@ void ProcessPendingRequests()
{
// Add this activation to cache invalidation headers.
message.CacheInvalidationHeader ??= new List<GrainAddressCacheUpdate>();
message.CacheInvalidationHeader.Add(new GrainAddressCacheUpdate(new GrainAddress { GrainId = GrainId, SiloAddress = Address.SiloAddress }, validAddress: null));
message.CacheInvalidationHeader.Add(new GrainAddressCacheUpdate(Address, validAddress: null));

var reason = new DeactivationReason(
DeactivationReasonCode.IncompatibleRequest,
Expand Down Expand Up @@ -1082,7 +1099,7 @@ private void RehydrateInternal(IRehydrationContext context)
if (context.TryGetValue(GrainAddressMigrationContextKey, out GrainAddress previousRegistration) && previousRegistration is not null)
{
// Propagate the previous registration, so that the new activation can atomically replace it with its new address.
(_extras ??= new()).PreviousRegistration = previousRegistration;
PreviousRegistration = previousRegistration;
if (_shared.Logger.IsEnabled(LogLevel.Debug))
{
_shared.Logger.LogDebug("Previous activation address was {PreviousRegistration}", previousRegistration);
Expand Down Expand Up @@ -1487,42 +1504,66 @@ private async ValueTask<bool> RegisterActivationInGrainDirectoryAndValidate()
else
{
Exception registrationException;
var previousRegistration = PreviousRegistration;
try
{
var result = await _shared.InternalRuntime.GrainLocator.Register(Address, _extras?.PreviousRegistration);
if (Address.Matches(result))
{
success = true;
}
else
while (true)
{
// Set the forwarding address so that messages enqueued on this activation can be forwarded to
// the existing activation.
ForwardingAddress = result?.SiloAddress;
if (ForwardingAddress is { } address)
var result = await _shared.InternalRuntime.GrainLocator.Register(Address, previousRegistration);
if (Address.Matches(result))
{
DeactivationReason = new(DeactivationReasonCode.DuplicateActivation, $"This grain is active on another host ({address}).");
success = true;
}
else if (result?.SiloAddress is { } registeredSilo && registeredSilo.Equals(Address.SiloAddress))
{
if (_shared.Logger.IsEnabled(LogLevel.Debug))
{
_shared.Logger.LogDebug(
"The grain directory has an existing entry pointing to a different activation of this grain on this silo, {PreviousRegistration}."
+ " This may indicate that the previous activation was deactivated but the directory was not successfully updated."
+ " The directory will be updated to point to this activation.",
previousRegistration);
}

success = false;
CatalogInstruments.ActivationConcurrentRegistrationAttempts.Add(1);
if (_shared.Logger.IsEnabled(LogLevel.Debug))
// Attempt to register this activation again, using the registration of the previous instance of this grain,
// which is registered to this silo. That activation must be a defunct predecessor of this activation,
// since the catalog only allows one activation of a given grain at a time.
// This could occur if the previous activation failed to unregister itself from the grain directory.
previousRegistration = result;
continue;
}
else
{
// If this was a duplicate, it's not an error, just a race.
// Forward on all of the pending messages, and then forget about this activation.
var primary = _shared.InternalRuntime.LocalGrainDirectory.GetPrimaryForGrain(GrainId);
_shared.Logger.LogDebug(
(int)ErrorCode.Catalog_DuplicateActivation,
"Tried to create a duplicate activation {Address}, but we'll use {ForwardingAddress} instead. "
+ "GrainInstance type is {GrainInstanceType}. {PrimaryMessage}"
+ "Full activation address is {Address}. We have {WaitingCount} messages to forward.",
Address,
ForwardingAddress,
GrainInstance?.GetType(),
primary != null ? "Primary Directory partition for this grain is " + primary + ". " : string.Empty,
Address.ToFullString(),
WaitingCount);
// Set the forwarding address so that messages enqueued on this activation can be forwarded to
// the existing activation.
ForwardingAddress = result?.SiloAddress;
if (ForwardingAddress is { } address)
{
DeactivationReason = new(DeactivationReasonCode.DuplicateActivation, $"This grain is active on another host ({address}).");
}

success = false;
CatalogInstruments.ActivationConcurrentRegistrationAttempts.Add(1);
if (_shared.Logger.IsEnabled(LogLevel.Debug))
{
// If this was a duplicate, it's not an error, just a race.
// Forward on all of the pending messages, and then forget about this activation.
var primary = _shared.InternalRuntime.LocalGrainDirectory.GetPrimaryForGrain(GrainId);
_shared.Logger.LogDebug(
(int)ErrorCode.Catalog_DuplicateActivation,
"Tried to create a duplicate activation {Address}, but we'll use {ForwardingAddress} instead. "
+ "GrainInstance type is {GrainInstanceType}. {PrimaryMessage}"
+ "Full activation address is {Address}. We have {WaitingCount} messages to forward.",
Address,
ForwardingAddress,
GrainInstance?.GetType(),
primary != null ? "Primary Directory partition for this grain is " + primary + ". " : string.Empty,
Address.ToFullString(),
WaitingCount);
}
}

break;
}

registrationException = null;
Expand Down
20 changes: 6 additions & 14 deletions src/Orleans.Runtime/Catalog/Catalog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,6 @@ public DetailedGrainReport GetDetailedGrainReport(GrainId grain)
};
}

/// <summary>
/// Register a new object to which messages can be delivered with the local lookup table and scheduler.
/// The target is recorded through <c>activations.RecordNewTarget</c>, and the
/// <c>CatalogInstruments.ActivationsCreated</c> counter is incremented by one.
/// </summary>
/// <param name="activation">The grain context to record as a new message target.</param>
public void RegisterMessageTarget(IGrainContext activation)
{
activations.RecordNewTarget(activation);
// Every target recorded through this method is counted as one created activation.
CatalogInstruments.ActivationsCreated.Add(1);
}

/// <summary>
/// Unregister message target and stop delivering messages to it
/// </summary>
Expand Down Expand Up @@ -271,9 +261,9 @@ public IGrainContext GetOrCreateActivation(

if (!SiloStatusOracle.CurrentStatus.IsTerminating())
{
var address = GrainAddress.GetAddress(Silo, grainId, new ActivationId(Guid.NewGuid()));
var address = GrainAddress.GetAddress(Silo, grainId, ActivationId.NewId());
result = this.grainActivator.CreateInstance(address);
RegisterMessageTarget(result);
activations.RecordNewTarget(result);
}
} // End lock

Expand All @@ -283,6 +273,8 @@ public IGrainContext GetOrCreateActivation(
return UnableToCreateActivation(this, grainId);
}

CatalogInstruments.ActivationsCreated.Add(1);

// Rehydration occurs before activation.
if (rehydrationContext is not null)
{
Expand Down Expand Up @@ -317,7 +309,7 @@ static IGrainContext UnableToCreateActivation(Catalog self, GrainId grainId)
// Unregister the target activation so we don't keep getting spurious messages.
// The time delay (one minute, as of this writing) is to handle the unlikely but possible race where
// this request snuck ahead of another request, with new placement requested, for the same activation.
// If the activation registration request from the new placement somehow sneaks ahead of this unregistration,
// If the activation registration request from the new placement somehow sneaks ahead of this deregistration,
// we want to make sure that we don't unregister the activation we just created.
var address = new GrainAddress { SiloAddress = self.Silo, GrainId = grainId };
_ = self.UnregisterNonExistentActivation(address);
Expand All @@ -344,7 +336,7 @@ private async Task UnregisterNonExistentActivation(GrainAddress address)
/// <summary>
/// Try to get runtime data for an activation
/// </summary>
public bool TryGetGrainContext(GrainId grainId, out IGrainContext data)
private bool TryGetGrainContext(GrainId grainId, out IGrainContext data)
{
data = activations.FindTarget(grainId);
return data != null;
Expand Down
3 changes: 0 additions & 3 deletions src/Orleans.Runtime/Messaging/MessageCenter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ internal class MessageCenter : IMessageCenter, IAsyncDisposable
private readonly SiloAddress _siloAddress;
private readonly SiloMessagingOptions messagingOptions;
private readonly PlacementService placementService;
private readonly ActivationDirectory activationDirectory;
private readonly GrainLocator _grainLocator;
private readonly ILogger log;
private readonly Catalog catalog;
Expand All @@ -43,7 +42,6 @@ public MessageCenter(
RuntimeMessagingTrace messagingTrace,
IOptions<SiloMessagingOptions> messagingOptions,
PlacementService placementService,
ActivationDirectory activationDirectory,
GrainLocator grainLocator)
{
this.catalog = catalog;
Expand All @@ -52,7 +50,6 @@ public MessageCenter(
this.connectionManager = senderManager;
this.messagingTrace = messagingTrace;
this.placementService = placementService;
this.activationDirectory = activationDirectory;
_grainLocator = grainLocator;
this.log = logger;
this.messageFactory = messageFactory;
Expand Down
2 changes: 2 additions & 0 deletions test/Grains/TestGrainInterfaces/ITestGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public interface IGuidTestGrain : IGrainWithGuidKey
Task<string> GetRuntimeInstanceId();

Task<string> GetActivationId();

Task<SiloAddress> GetSiloAddress();
}

public interface IOneWayGrain : IGrainWithGuidKey
Expand Down
3 changes: 3 additions & 0 deletions test/Grains/TestInternalGrains/TestGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public Task<long> GetKey()
}
}

[GrainType("guid-test-grain")]
internal class GuidTestGrain : Grain, IGuidTestGrain
{
private readonly string _id = Guid.NewGuid().ToString();
Expand Down Expand Up @@ -200,6 +201,8 @@ public Task<string> GetActivationId()
{
return Task.FromResult(_id);
}

/// <summary>
/// Returns the silo address reported by the <see cref="ILocalSiloDetails"/> service
/// resolved from this grain's service provider.
/// </summary>
/// <returns>A completed task carrying the silo address.</returns>
public Task<SiloAddress> GetSiloAddress()
{
    var localSilo = ServiceProvider.GetRequiredService<ILocalSiloDetails>();
    return Task.FromResult(localSilo.SiloAddress);
}
}

internal class OneWayGrain : Grain, IOneWayGrain, ISimpleGrainObserver
Expand Down
82 changes: 82 additions & 0 deletions test/TesterInternal/GrainLocatorActivationResiliencyTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using System.Diagnostics;
using System.Net;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

using Orleans.Runtime;
using Orleans.Runtime.GrainDirectory;
using Orleans.Runtime.Placement;
using Orleans.TestingHost;
using TestExtensions;
using UnitTests.GrainInterfaces;
using Xunit;

namespace UnitTests
{
/// <summary>
/// Verifies that grain activation recovers when the grain directory holds an invalid entry for the grain.
/// </summary>
public class GrainLocatorActivationResiliencyTests : HostedTestClusterEnsureDefaultStarted
{
    public GrainLocatorActivationResiliencyTests(DefaultClusterFixture fixture) : base(fixture)
    {
    }

    /// <summary>
    /// A grain must still activate when the grain locator already reports a registration for it on the same silo.
    /// </summary>
    [Fact, TestCategory("BVT")]
    public async Task ReactivateGrainWithPoisonGrainDirectoryEntry_LocalSilo()
    {
        var primary = (InProcessSiloHandle)Fixture.HostedCluster.Primary;
        var expectedHost = primary.SiloAddress;
        var grain = GrainFactory.GetGrain<IGuidTestGrain>(Guid.NewGuid());

        // The poisoned entry names the silo on which the activation will be created, but a different activation.
        // The first registration attempt therefore fails; the activation is expected to notice that the entry
        // is wrong (same silo, but an activation which must be defunct) and register again, passing the stale
        // entry as the previous registration so that the directory is updated.
        await PoisonDirectoryEntryAsync(primary, grain);

        // Rig placement to occur on the primary silo.
        RequestContext.Set(IPlacementDirector.PlacementHintKey, expectedHost);
        Assert.Equal(expectedHost, await grain.GetSiloAddress());
    }

    /// <summary>
    /// Like <see cref="ReactivateGrainWithPoisonGrainDirectoryEntry_LocalSilo"/>, but placement targets a silo other
    /// than the one named in the grain directory entry, checking that the call is forwarded to the correct silo.
    /// </summary>
    [Fact, TestCategory("BVT")]
    public async Task ReactivateGrainWithPoisonGrainDirectoryEntry_RemoteSilo()
    {
        var primary = (InProcessSiloHandle)Fixture.HostedCluster.Primary;
        var placementTarget = Fixture.HostedCluster.SecondarySilos.First().SiloAddress;
        var expectedHost = primary.SiloAddress;
        var grain = GrainFactory.GetGrain<IGuidTestGrain>(Guid.NewGuid());

        // The poisoned entry names another silo (the primary), which does not host the registered activation either.
        // The first registration attempt fails and the activation forwards its messages to the silo named by the
        // entry; the activation created there also fails registration initially, but succeeds later.
        await PoisonDirectoryEntryAsync(primary, grain);

        // Rig placement to occur on the secondary silo. Because the directory entry points to the primary silo,
        // the call should be forwarded there, and that is where the grain ends up being activated.
        RequestContext.Set(IPlacementDirector.PlacementHintKey, placementTarget);
        Assert.Equal(expectedHost, await grain.GetSiloAddress());
    }

    /// <summary>
    /// Registers, through the given silo's <see cref="GrainLocator"/>, a directory entry for the grain which points
    /// to that silo but carries a freshly generated activation id, so that it refers to an activation which is not active.
    /// </summary>
    /// <param name="silo">The silo whose grain locator is used and whose address the entry points to.</param>
    /// <param name="grain">The grain reference for which the entry is registered.</param>
    private static async Task PoisonDirectoryEntryAsync(InProcessSiloHandle silo, IGuidTestGrain grain)
    {
        var locator = silo.SiloHost.Services.GetRequiredService<GrainLocator>();
        var staleEntry = GrainAddress.GetAddress(silo.SiloAddress, grain.GetGrainId(), ActivationId.NewId());
        await locator.Register(staleEntry, previousRegistration: null);
    }
}
}

0 comments on commit 84a1678

Please sign in to comment.