Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nullify directory entries of defunct local predecessors during grain activation #8704

Merged
merged 2 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 71 additions & 30 deletions src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,23 @@ public SiloAddress ForwardingAddress
}
}

/// <summary>
/// Gets or sets the grain directory registration which preceded this activation, if one is known.
/// During activation, this value is handed to the grain directory so that the old entry can be replaced by the new one.
/// </summary>
/// <remarks>
/// The getter reads the value without taking a lock and yields <see langword="null"/> when no extra state has been allocated.
/// The setter takes the lock on this instance and allocates the extra state on first use.
/// </remarks>
public GrainAddress PreviousRegistration
{
    get
    {
        var extras = _extras;
        return extras?.PreviousRegistration;
    }

    set
    {
        lock (this)
        {
            // Allocate the container for rarely-used state on demand, then store the value.
            (_extras ??= new()).PreviousRegistration = value;
        }
    }
}

private Exception DeactivationException => _extras?.DeactivationReason.Exception;

private DeactivationReason DeactivationReason
Expand Down Expand Up @@ -895,7 +912,7 @@ void ProcessPendingRequests()
{
// Add this activation to cache invalidation headers.
message.CacheInvalidationHeader ??= new List<GrainAddressCacheUpdate>();
message.CacheInvalidationHeader.Add(new GrainAddressCacheUpdate(new GrainAddress { GrainId = GrainId, SiloAddress = Address.SiloAddress }, validAddress: null));
message.CacheInvalidationHeader.Add(new GrainAddressCacheUpdate(Address, validAddress: null));

var reason = new DeactivationReason(
DeactivationReasonCode.IncompatibleRequest,
Expand Down Expand Up @@ -1082,7 +1099,7 @@ private void RehydrateInternal(IRehydrationContext context)
if (context.TryGetValue(GrainAddressMigrationContextKey, out GrainAddress previousRegistration) && previousRegistration is not null)
{
// Propagate the previous registration, so that the new activation can atomically replace it with its new address.
(_extras ??= new()).PreviousRegistration = previousRegistration;
PreviousRegistration = previousRegistration;
if (_shared.Logger.IsEnabled(LogLevel.Debug))
{
_shared.Logger.LogDebug("Previous activation address was {PreviousRegistration}", previousRegistration);
Expand Down Expand Up @@ -1487,42 +1504,66 @@ private async ValueTask<bool> RegisterActivationInGrainDirectoryAndValidate()
else
{
Exception registrationException;
var previousRegistration = PreviousRegistration;
try
{
var result = await _shared.InternalRuntime.GrainLocator.Register(Address, _extras?.PreviousRegistration);
if (Address.Matches(result))
{
success = true;
}
else
while (true)
{
// Set the forwarding address so that messages enqueued on this activation can be forwarded to
// the existing activation.
ForwardingAddress = result?.SiloAddress;
if (ForwardingAddress is { } address)
var result = await _shared.InternalRuntime.GrainLocator.Register(Address, previousRegistration);
if (Address.Matches(result))
{
DeactivationReason = new(DeactivationReasonCode.DuplicateActivation, $"This grain is active on another host ({address}).");
success = true;
}
else if (result?.SiloAddress is { } registeredSilo && registeredSilo.Equals(Address.SiloAddress))
{
if (_shared.Logger.IsEnabled(LogLevel.Debug))
{
_shared.Logger.LogDebug(
"The grain directory has an existing entry pointing to a different activation of this grain on this silo, {PreviousRegistration}."
+ " This may indicate that the previous activation was deactivated but the directory was not successfully updated."
+ " The directory will be updated to point to this activation.",
previousRegistration);
}

success = false;
CatalogInstruments.ActivationConcurrentRegistrationAttempts.Add(1);
if (_shared.Logger.IsEnabled(LogLevel.Debug))
// Attempt to register this activation again, using the registration of the previous instance of this grain,
// which is registered to this silo. That activation must be a defunct predecessor of this activation,
// since the catalog only allows one activation of a given grain at a time.
// This could occur if the previous activation failed to unregister itself from the grain directory.
previousRegistration = result;
continue;
}
else
{
// If this was a duplicate, it's not an error, just a race.
// Forward on all of the pending messages, and then forget about this activation.
var primary = _shared.InternalRuntime.LocalGrainDirectory.GetPrimaryForGrain(GrainId);
_shared.Logger.LogDebug(
(int)ErrorCode.Catalog_DuplicateActivation,
"Tried to create a duplicate activation {Address}, but we'll use {ForwardingAddress} instead. "
+ "GrainInstance type is {GrainInstanceType}. {PrimaryMessage}"
+ "Full activation address is {Address}. We have {WaitingCount} messages to forward.",
Address,
ForwardingAddress,
GrainInstance?.GetType(),
primary != null ? "Primary Directory partition for this grain is " + primary + ". " : string.Empty,
Address.ToFullString(),
WaitingCount);
// Set the forwarding address so that messages enqueued on this activation can be forwarded to
// the existing activation.
ForwardingAddress = result?.SiloAddress;
if (ForwardingAddress is { } address)
{
DeactivationReason = new(DeactivationReasonCode.DuplicateActivation, $"This grain is active on another host ({address}).");
}

success = false;
CatalogInstruments.ActivationConcurrentRegistrationAttempts.Add(1);
if (_shared.Logger.IsEnabled(LogLevel.Debug))
{
// If this was a duplicate, it's not an error, just a race.
// Forward on all of the pending messages, and then forget about this activation.
var primary = _shared.InternalRuntime.LocalGrainDirectory.GetPrimaryForGrain(GrainId);
_shared.Logger.LogDebug(
(int)ErrorCode.Catalog_DuplicateActivation,
"Tried to create a duplicate activation {Address}, but we'll use {ForwardingAddress} instead. "
+ "GrainInstance type is {GrainInstanceType}. {PrimaryMessage}"
+ "Full activation address is {Address}. We have {WaitingCount} messages to forward.",
Address,
ForwardingAddress,
GrainInstance?.GetType(),
primary != null ? "Primary Directory partition for this grain is " + primary + ". " : string.Empty,
Address.ToFullString(),
WaitingCount);
}
}

break;
}

registrationException = null;
Expand Down
20 changes: 6 additions & 14 deletions src/Orleans.Runtime/Catalog/Catalog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,6 @@ public DetailedGrainReport GetDetailedGrainReport(GrainId grain)
};
}

/// <summary>
/// Register a new object to which messages can be delivered with the local lookup table and scheduler.
/// </summary>
/// <remarks>
/// Records the target in <c>activations</c> and then increments the <c>ActivationsCreated</c> counter by one.
/// </remarks>
/// <param name="activation">The grain context to record as a new message target.</param>
public void RegisterMessageTarget(IGrainContext activation)
{
// Add the context to the activation lookup table.
activations.RecordNewTarget(activation);
// Count the newly recorded activation.
CatalogInstruments.ActivationsCreated.Add(1);
}

/// <summary>
/// Unregister message target and stop delivering messages to it
/// </summary>
Expand Down Expand Up @@ -271,9 +261,9 @@ public IGrainContext GetOrCreateActivation(

if (!SiloStatusOracle.CurrentStatus.IsTerminating())
{
var address = GrainAddress.GetAddress(Silo, grainId, new ActivationId(Guid.NewGuid()));
var address = GrainAddress.GetAddress(Silo, grainId, ActivationId.NewId());
result = this.grainActivator.CreateInstance(address);
RegisterMessageTarget(result);
activations.RecordNewTarget(result);
}
} // End lock

Expand All @@ -283,6 +273,8 @@ public IGrainContext GetOrCreateActivation(
return UnableToCreateActivation(this, grainId);
}

CatalogInstruments.ActivationsCreated.Add(1);

// Rehydration occurs before activation.
if (rehydrationContext is not null)
{
Expand Down Expand Up @@ -317,7 +309,7 @@ static IGrainContext UnableToCreateActivation(Catalog self, GrainId grainId)
// Unregister the target activation so we don't keep getting spurious messages.
// The time delay (one minute, as of this writing) is to handle the unlikely but possible race where
// this request snuck ahead of another request, with new placement requested, for the same activation.
// If the activation registration request from the new placement somehow sneaks ahead of this unregistration,
// If the activation registration request from the new placement somehow sneaks ahead of this deregistration,
// we want to make sure that we don't unregister the activation we just created.
var address = new GrainAddress { SiloAddress = self.Silo, GrainId = grainId };
_ = self.UnregisterNonExistentActivation(address);
Expand All @@ -344,7 +336,7 @@ private async Task UnregisterNonExistentActivation(GrainAddress address)
/// <summary>
/// Try to get runtime data for an activation
/// </summary>
public bool TryGetGrainContext(GrainId grainId, out IGrainContext data)
private bool TryGetGrainContext(GrainId grainId, out IGrainContext data)
{
data = activations.FindTarget(grainId);
return data != null;
Expand Down
3 changes: 0 additions & 3 deletions src/Orleans.Runtime/Messaging/MessageCenter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ internal class MessageCenter : IMessageCenter, IAsyncDisposable
private readonly SiloAddress _siloAddress;
private readonly SiloMessagingOptions messagingOptions;
private readonly PlacementService placementService;
private readonly ActivationDirectory activationDirectory;
private readonly GrainLocator _grainLocator;
private readonly ILogger log;
private readonly Catalog catalog;
Expand All @@ -43,7 +42,6 @@ public MessageCenter(
RuntimeMessagingTrace messagingTrace,
IOptions<SiloMessagingOptions> messagingOptions,
PlacementService placementService,
ActivationDirectory activationDirectory,
GrainLocator grainLocator)
{
this.catalog = catalog;
Expand All @@ -52,7 +50,6 @@ public MessageCenter(
this.connectionManager = senderManager;
this.messagingTrace = messagingTrace;
this.placementService = placementService;
this.activationDirectory = activationDirectory;
_grainLocator = grainLocator;
this.log = logger;
this.messageFactory = messageFactory;
Expand Down
2 changes: 2 additions & 0 deletions test/Grains/TestGrainInterfaces/ITestGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public interface IGuidTestGrain : IGrainWithGuidKey
Task<string> GetRuntimeInstanceId();

Task<string> GetActivationId();

Task<SiloAddress> GetSiloAddress();
}

public interface IOneWayGrain : IGrainWithGuidKey
Expand Down
3 changes: 3 additions & 0 deletions test/Grains/TestInternalGrains/TestGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public Task<long> GetKey()
}
}

[GrainType("guid-test-grain")]
internal class GuidTestGrain : Grain, IGuidTestGrain
{
private readonly string _id = Guid.NewGuid().ToString();
Expand Down Expand Up @@ -200,6 +201,8 @@ public Task<string> GetActivationId()
{
return Task.FromResult(_id);
}

/// <summary>
/// Returns the <see cref="SiloAddress"/> reported by the <see cref="ILocalSiloDetails"/> service
/// resolved from this grain's service provider.
/// </summary>
public Task<SiloAddress> GetSiloAddress()
{
    var localSiloDetails = ServiceProvider.GetRequiredService<ILocalSiloDetails>();
    return Task.FromResult(localSiloDetails.SiloAddress);
}
}

internal class OneWayGrain : Grain, IOneWayGrain, ISimpleGrainObserver
Expand Down
82 changes: 82 additions & 0 deletions test/TesterInternal/GrainLocatorActivationResiliencyTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using System.Diagnostics;
using System.Net;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

using Orleans.Runtime;
using Orleans.Runtime.GrainDirectory;
using Orleans.Runtime.Placement;
using Orleans.TestingHost;
using TestExtensions;
using UnitTests.GrainInterfaces;
using Xunit;

namespace UnitTests
{
    /// <summary>
    /// Verifies that a grain can still be activated when the grain directory already contains
    /// an entry for that grain which refers to an activation that was never created.
    /// </summary>
    public class GrainLocatorActivationResiliencyTests : HostedTestClusterEnsureDefaultStarted
    {
        public GrainLocatorActivationResiliencyTests(DefaultClusterFixture fixture) : base(fixture)
        {
        }

        /// <summary>
        /// Places the grain on the same silo that the poisoned directory entry refers to and
        /// expects the call to be served from that silo.
        /// </summary>
        [Fact, TestCategory("BVT")]
        public async Task ReactivateGrainWithPoisonGrainDirectoryEntry_LocalSilo()
        {
            var primary = (InProcessSiloHandle)Fixture.HostedCluster.Primary;
            var primaryAddress = primary.SiloAddress;
            var grain = GrainFactory.GetGrain<IGuidTestGrain>(Guid.NewGuid());

            // The directory entry names the primary silo but carries a freshly generated activation id,
            // so the first registration attempt made by the real activation will not match it.
            // The activation is expected to retry, passing the returned entry as the previous registration.
            await RegisterPoisonEntry(primary, primaryAddress, grain);

            // Hint placement towards the primary silo.
            RequestContext.Set(IPlacementDirector.PlacementHintKey, primaryAddress);
            var hostingSilo = await grain.GetSiloAddress();
            Assert.Equal(primaryAddress, hostingSilo);
        }

        /// <summary>
        /// Variant of <see cref="ReactivateGrainWithPoisonGrainDirectoryEntry_LocalSilo"/> in which placement is
        /// hinted towards a different silo from the one named in the poisoned directory entry.
        /// </summary>
        [Fact, TestCategory("BVT")]
        public async Task ReactivateGrainWithPoisonGrainDirectoryEntry_RemoteSilo()
        {
            var primary = (InProcessSiloHandle)Fixture.HostedCluster.Primary;
            var secondaryAddress = Fixture.HostedCluster.SecondarySilos.First().SiloAddress;
            var primaryAddress = primary.SiloAddress;
            var grain = GrainFactory.GetGrain<IGuidTestGrain>(Guid.NewGuid());

            // The directory entry names the primary silo with a freshly generated activation id.
            // An activation attempted elsewhere will fail its first registration and forward to the
            // registered silo, where a subsequent activation is expected to eventually register.
            await RegisterPoisonEntry(primary, primaryAddress, grain);

            // Hint placement towards the secondary silo. Because the directory entry refers to the primary silo,
            // the call is expected to end up being served by the primary silo.
            RequestContext.Set(IPlacementDirector.PlacementHintKey, secondaryAddress);
            var hostingSilo = await grain.GetSiloAddress();
            Assert.Equal(primaryAddress, hostingSilo);
        }

        /// <summary>
        /// Registers a grain directory entry for <paramref name="grain"/> which names <paramref name="siloAddress"/>
        /// together with a newly generated activation id, using the grain locator of <paramref name="silo"/>.
        /// </summary>
        private static async Task RegisterPoisonEntry(InProcessSiloHandle silo, SiloAddress siloAddress, IGuidTestGrain grain)
        {
            var locator = silo.SiloHost.Services.GetRequiredService<GrainLocator>();
            var poisonAddress = GrainAddress.GetAddress(siloAddress, grain.GetGrainId(), ActivationId.NewId());
            await locator.Register(poisonAddress, previousRegistration: null);
        }
    }
}
Loading