Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Silo Metadata and Placement Filtering #9271

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Orleans.Core.Abstractions/Manifest/GrainProperties.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public static class WellKnownGrainTypeProperties
/// </summary>
public const string PlacementStrategy = "placement-strategy";

/// <summary>
/// The name of the placement strategy for grains of this type.
/// </summary>
public const string PlacementFilter = "placement-filter";

/// <summary>
/// The directory policy for grains of this type.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using System.Collections.Generic;
using Orleans.Metadata;
using Orleans.Runtime;

#nullable enable
namespace Orleans.Placement;

/// <summary>
/// Base for all placement filter marker attributes.
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public abstract class PlacementFilterAttribute : Attribute, IGrainPropertiesProviderAttribute
{
public PlacementFilterStrategy PlacementFilterStrategy { get; private set; }

protected PlacementFilterAttribute(PlacementFilterStrategy placement)
{
ArgumentNullException.ThrowIfNull(placement);
PlacementFilterStrategy = placement;
}

/// <inheritdoc />
public virtual void Populate(IServiceProvider services, Type grainClass, GrainType grainType, Dictionary<string, string> properties)
=> PlacementFilterStrategy?.PopulateGrainProperties(services, grainClass, grainType, properties);
}
77 changes: 77 additions & 0 deletions src/Orleans.Core.Abstractions/Placement/PlacementFilterStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using System;
using System.Collections.Generic;
using Orleans.Metadata;
using Orleans.Runtime;

#nullable enable
namespace Orleans.Placement;

public abstract class PlacementFilterStrategy
{
public int Order { get; private set; }

protected PlacementFilterStrategy(int order)
{
Order = order;
}

/// <summary>
/// Initializes an instance of this type using the provided grain properties.
/// </summary>
/// <param name="properties">
/// The grain properties.
/// </param>
public void Initialize(GrainProperties properties)
{
var orderProperty = GetPlacementFilterGrainProperty("order", properties);
if (!int.TryParse(orderProperty, out var parsedOrder))
{
throw new ArgumentException("Invalid order property value.");
}

Order = parsedOrder;

AdditionalInitialize(properties);
}

public virtual void AdditionalInitialize(GrainProperties properties)
{

}

/// <summary>
/// Populates grain properties to specify the preferred placement strategy.
/// </summary>
/// <param name="services">The service provider.</param>
/// <param name="grainClass">The grain class.</param>
/// <param name="grainType">The grain type.</param>
/// <param name="properties">The grain properties which will be populated by this method call.</param>
public void PopulateGrainProperties(IServiceProvider services, Type grainClass, GrainType grainType, Dictionary<string, string> properties)
{
var typeName = GetType().Name;
if (properties.TryGetValue(WellKnownGrainTypeProperties.PlacementFilter, out var existingValue))
{
properties[WellKnownGrainTypeProperties.PlacementFilter] = $"{existingValue},{typeName}";
}
else
{
properties[WellKnownGrainTypeProperties.PlacementFilter] = typeName;
}

properties[$"{WellKnownGrainTypeProperties.PlacementFilter}.{typeName}.order"] = Order.ToString();

foreach (var additionalGrainProperty in GetAdditionalGrainProperties(services, grainClass, grainType, properties))
{
properties[$"{WellKnownGrainTypeProperties.PlacementFilter}.{typeName}.{additionalGrainProperty.Key}"] = additionalGrainProperty.Value;
}
}

protected string? GetPlacementFilterGrainProperty(string key, GrainProperties properties)
{
var typeName = GetType().Name;
return properties.Properties.TryGetValue($"{WellKnownGrainTypeProperties.PlacementFilter}.{typeName}.{key}", out var value) ? value : null;
}

protected virtual IEnumerable<KeyValuePair<string, string>> GetAdditionalGrainProperties(IServiceProvider services, Type grainClass, GrainType grainType, IReadOnlyDictionary<string, string> existingProperties)
=> Array.Empty<KeyValuePair<string, string>>();
}
12 changes: 12 additions & 0 deletions src/Orleans.Core/Placement/IPlacementFilterDirector.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Collections.Generic;
using Orleans.Runtime;
using Orleans.Runtime.Placement;

#nullable enable
namespace Orleans.Placement;

public interface IPlacementFilterDirector
{
IEnumerable<SiloAddress> Filter(PlacementFilterStrategy filterStrategy, PlacementTarget target,
IEnumerable<SiloAddress> silos);
}
26 changes: 26 additions & 0 deletions src/Orleans.Core/Placement/PlacementFilterExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using Microsoft.Extensions.DependencyInjection;

#nullable enable
namespace Orleans.Placement;

public static class PlacementFilterExtensions
{
/// <summary>
/// Configures a <typeparamref name="TFilter"/> for filtering candidate grain placements.
/// </summary>
/// <typeparam name="TFilter">The placement filter.</typeparam>
/// <typeparam name="TDirector">The placement filter director.</typeparam>
/// <param name="services">The service collection.</param>
/// <param name="strategyLifetime">The lifetime of the placement strategy.</param>
/// <returns>The service collection.</returns>
public static IServiceCollection AddPlacementFilter<TFilter, TDirector>(this IServiceCollection services, ServiceLifetime strategyLifetime)
where TFilter : PlacementFilterStrategy
where TDirector : class, IPlacementFilterDirector
{
services.Add(ServiceDescriptor.DescribeKeyed(typeof(PlacementFilterStrategy), typeof(TFilter).Name, typeof(TFilter), strategyLifetime));
services.AddKeyedSingleton<IPlacementFilterDirector, TDirector>(typeof(TFilter));

return services;
}

}
7 changes: 5 additions & 2 deletions src/Orleans.Runtime/Hosting/DefaultSiloServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@
using Orleans.Serialization.Internal;
using Orleans.Core;
using Orleans.Placement.Repartitioning;
using Orleans.GrainDirectory;
using Orleans.Runtime.Hosting;
using Orleans.Runtime.Placement.Filtering;

namespace Orleans.Hosting
{
Expand Down Expand Up @@ -206,6 +205,10 @@ internal static void AddDefaultServices(ISiloBuilder builder)
// Configure the default placement strategy.
services.TryAddSingleton<PlacementStrategy, RandomPlacement>();

// Placement filters
services.AddSingleton<PlacementFilterStrategyResolver>();
services.AddSingleton<PlacementFilterDirectorResolver>();

// Placement directors
services.AddPlacementDirector<RandomPlacement, RandomPlacementDirector>();
services.AddPlacementDirector<PreferLocalPlacement, PreferLocalPlacementDirector>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#nullable enable
namespace Orleans.Runtime.MembershipService.SiloMetadata;

public interface ISiloMetadataCache
{
SiloMetadata GetMetadata(SiloAddress siloAddress);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Threading.Tasks;
using Orleans.Services;

#nullable enable
namespace Orleans.Runtime.MembershipService.SiloMetadata;

public interface ISiloMetadataClient : IGrainServiceClient<ISiloMetadataGrainService>
{
Task<SiloMetadata> GetSiloMetadata(SiloAddress siloAddress);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Threading.Tasks;
using Orleans.Services;

#nullable enable
namespace Orleans.Runtime.MembershipService.SiloMetadata;

[Alias("Orleans.Runtime.MembershipService.SiloMetadata.ISiloMetadataGrainService")]
public interface ISiloMetadataGrainService : IGrainService
{
[Alias("GetSiloMetadata")]
Task<SiloMetadata> GetSiloMetadata();
}
103 changes: 103 additions & 0 deletions src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadaCache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans.Configuration;
using Orleans.Internal;

#nullable enable
namespace Orleans.Runtime.MembershipService.SiloMetadata;

internal class SiloMetadataCache(
ISiloMetadataClient siloMetadataClient,
MembershipTableManager membershipTableManager,
ILogger<SiloMetadataCache> logger)
: ISiloMetadataCache, ILifecycleParticipant<ISiloLifecycle>, IDisposable
{
private readonly ConcurrentDictionary<SiloAddress, SiloMetadata> _metadata = new();
private readonly CancellationTokenSource _cts = new();

void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
{
var tasks = new List<Task>(1);
rkargMsft marked this conversation as resolved.
Show resolved Hide resolved
var cancellation = new CancellationTokenSource();
Task OnRuntimeInitializeStart(CancellationToken _)
{
tasks.Add(Task.Run(() => this.ProcessMembershipUpdates(cancellation.Token)));
return Task.CompletedTask;
}

async Task OnRuntimeInitializeStop(CancellationToken ct)
{
cancellation.Cancel(throwOnFirstException: false);
var shutdownGracePeriod = Task.WhenAll(Task.Delay(ClusterMembershipOptions.ClusteringShutdownGracePeriod), ct.WhenCancelled());
await Task.WhenAny(shutdownGracePeriod, Task.WhenAll(tasks));
}

lifecycle.Subscribe(
nameof(ClusterMembershipService),
ServiceLifecycleStage.RuntimeInitialize,
OnRuntimeInitializeStart,
OnRuntimeInitializeStop);
}


private async Task ProcessMembershipUpdates(CancellationToken ct)
{
try
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("Starting to process membership updates");
await foreach (var update in membershipTableManager.MembershipTableUpdates.WithCancellation(ct))
{
// Add entries for members that aren't already in the cache
foreach (var membershipEntry in update.Entries.Where(e => e.Value.Status != SiloStatus.Dead))
{
if (!_metadata.ContainsKey(membershipEntry.Key))
{
try
{
var metadata = await siloMetadataClient.GetSiloMetadata(membershipEntry.Key);
_metadata.TryAdd(membershipEntry.Key, metadata);
}
catch(Exception exception)
{
logger.LogError(exception, "Error fetching metadata for silo {Silo}", membershipEntry.Key);
}
}
}

// Remove entries for members that are now dead
foreach (var membershipEntry in update.Entries.Where(e => e.Value.Status == SiloStatus.Dead))
{
_metadata.TryRemove(membershipEntry.Key, out _);
}

// Remove entries for members that are no longer in the table
foreach (var silo in _metadata.Keys.ToList())
{
if (!update.Entries.ContainsKey(silo))
{
_metadata.TryRemove(silo, out _);
}
}
}
}
catch (Exception exception)
{
logger.LogError(exception, "Error processing membership updates");
}
finally
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("Stopping membership update processor");
}
}

public SiloMetadata GetMetadata(SiloAddress siloAddress) => _metadata.GetValueOrDefault(siloAddress) ?? SiloMetadata.Empty;

public void SetMetadata(SiloAddress siloAddress, SiloMetadata metadata) => _metadata.TryAdd(siloAddress, metadata);

public void Dispose() => _cts.Cancel();
}
18 changes: 18 additions & 0 deletions src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadata.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System.Collections.Generic;
using System.Collections.Immutable;

#nullable enable
namespace Orleans.Runtime.MembershipService.SiloMetadata;

[GenerateSerializer]
[Alias("Orleans.Runtime.MembershipService.SiloMetadata.SiloMetadata")]
public record SiloMetadata
{
public static SiloMetadata Empty { get; } = new SiloMetadata();

[Id(0)]
public ImmutableDictionary<string, string> Metadata { get; private set; } = ImmutableDictionary<string, string>.Empty;
rkargMsft marked this conversation as resolved.
Show resolved Hide resolved

internal void AddMetadata(IEnumerable<KeyValuePair<string, string>> metadata) => Metadata = Metadata.SetItems(metadata);
internal void AddMetadata(string key, string value) => Metadata = Metadata.SetItem(key, value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;
using System.Threading.Tasks;
using Orleans.Runtime.Services;

#nullable enable
namespace Orleans.Runtime.MembershipService.SiloMetadata;

public class SiloMetadataClient(IServiceProvider serviceProvider)
: GrainServiceClient<ISiloMetadataGrainService>(serviceProvider), ISiloMetadataClient
{
public async Task<SiloMetadata> GetSiloMetadata(SiloAddress siloAddress)
{
var grainService = GetGrainService(siloAddress);
var metadata = await grainService.GetSiloMetadata();
return metadata;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

#nullable enable
namespace Orleans.Runtime.MembershipService.SiloMetadata;

public class SiloMetadataGrainService : GrainService, ISiloMetadataGrainService
{
private readonly SiloMetadata _siloMetadata;

public SiloMetadataGrainService(IOptions<SiloMetadata> siloMetadata) : base()
{
_siloMetadata = siloMetadata.Value;
}

public SiloMetadataGrainService(IOptions<SiloMetadata> siloMetadata, GrainId grainId, Silo silo, ILoggerFactory loggerFactory) : base(grainId, silo, loggerFactory)
{
_siloMetadata = siloMetadata.Value;
}

public Task<SiloMetadata> GetSiloMetadata() => Task.FromResult(_siloMetadata);
}
Loading
Loading