diff --git a/src/Orleans.Core.Abstractions/Manifest/GrainProperties.cs b/src/Orleans.Core.Abstractions/Manifest/GrainProperties.cs index 41181c248d..4f994e01f2 100644 --- a/src/Orleans.Core.Abstractions/Manifest/GrainProperties.cs +++ b/src/Orleans.Core.Abstractions/Manifest/GrainProperties.cs @@ -68,6 +68,11 @@ public static class WellKnownGrainTypeProperties /// public const string PlacementStrategy = "placement-strategy"; + /// + /// The name of the placement strategy for grains of this type. + /// + public const string PlacementFilter = "placement-filter"; + /// /// The directory policy for grains of this type. /// diff --git a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs index 67f98907cd..bba9fb3276 100644 --- a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs +++ b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs @@ -43,8 +43,7 @@ using Orleans.Serialization.Internal; using Orleans.Core; using Orleans.Placement.Repartitioning; -using Orleans.GrainDirectory; -using Orleans.Runtime.Hosting; +using Orleans.Runtime.Placement.Filtering; namespace Orleans.Hosting { @@ -206,6 +205,10 @@ internal static void AddDefaultServices(ISiloBuilder builder) // Configure the default placement strategy. services.TryAddSingleton(); + // Placement filters + services.AddSingleton(); + services.AddSingleton(); + // Placement directors services.AddPlacementDirector(); services.AddPlacementDirector(); diff --git a/src/Orleans.Runtime/MembershipService/SiloMetadata/ISiloMetadataCache.cs b/src/Orleans.Runtime/MembershipService/SiloMetadata/ISiloMetadataCache.cs new file mode 100644 index 0000000000..a15cb67a36 --- /dev/null +++ b/src/Orleans.Runtime/MembershipService/SiloMetadata/ISiloMetadataCache.cs @@ -0,0 +1,6 @@ +namespace Orleans.Runtime.MembershipService.SiloMetadata; +#nullable enable +public interface ISiloMetadataCache +{ + SiloMetadata GetMetadata(SiloAddress siloAddress); +} \ No newline at end of file diff --git a/src/Orleans.Runtime/MembershipService/SiloMetadata/ISiloMetadataClient.cs b/src/Orleans.Runtime/MembershipService/SiloMetadata/ISiloMetadataClient.cs new file mode 100644 index 0000000000..eb622eaaa4 --- /dev/null +++ b/src/Orleans.Runtime/MembershipService/SiloMetadata/ISiloMetadataClient.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; +using Orleans.Services; + +namespace Orleans.Runtime.MembershipService.SiloMetadata; + +public interface ISiloMetadataClient : IGrainServiceClient +{ + Task GetSiloMetadata(SiloAddress siloAddress); +} diff --git a/src/Orleans.Runtime/MembershipService/SiloMetadata/ISiloMetadataGrainService.cs b/src/Orleans.Runtime/MembershipService/SiloMetadata/ISiloMetadataGrainService.cs new file mode 100644 index 0000000000..97e3c7e600 --- /dev/null +++ b/src/Orleans.Runtime/MembershipService/SiloMetadata/ISiloMetadataGrainService.cs @@ -0,0 +1,11 @@ +using System.Threading.Tasks; +using Orleans.Services; + +namespace Orleans.Runtime.MembershipService.SiloMetadata; + +[Alias("Orleans.Runtime.MembershipService.SiloMetadata.ISiloMetadataGrainService")] +public interface ISiloMetadataGrainService : IGrainService +{ + [Alias("GetSiloMetadata")] + Task GetSiloMetadata(); +} \ No newline at end of file diff --git a/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadaCache.cs b/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadaCache.cs new file mode 100644 index 0000000000..afdcb39c95 --- /dev/null +++ b/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadaCache.cs @@ -0,0 +1,96 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Orleans.Configuration; +using Orleans.Internal; + +namespace Orleans.Runtime.MembershipService.SiloMetadata; +#nullable enable +internal class SiloMetadataCache( + ISiloMetadataClient siloMetadataClient, + MembershipTableManager membershipTableManager, + ILogger logger) + : ISiloMetadataCache, ILifecycleParticipant, IDisposable +{ + private readonly ConcurrentDictionary _metadata = new(); + private readonly CancellationTokenSource _cts = new(); + + void ILifecycleParticipant.Participate(ISiloLifecycle lifecycle) + { + var tasks = new List(1); + var cancellation = new CancellationTokenSource(); + Task OnRuntimeInitializeStart(CancellationToken _) + { + tasks.Add(Task.Run(() => this.ProcessMembershipUpdates(cancellation.Token))); + return Task.CompletedTask; + } + + async Task OnRuntimeInitializeStop(CancellationToken ct) + { + cancellation.Cancel(throwOnFirstException: false); + var shutdownGracePeriod = Task.WhenAll(Task.Delay(ClusterMembershipOptions.ClusteringShutdownGracePeriod), ct.WhenCancelled()); + await Task.WhenAny(shutdownGracePeriod, Task.WhenAll(tasks)); + } + + lifecycle.Subscribe( + nameof(ClusterMembershipService), + ServiceLifecycleStage.RuntimeInitialize, + OnRuntimeInitializeStart, + OnRuntimeInitializeStop); + } + + + private async Task ProcessMembershipUpdates(CancellationToken ct) + { + try + { + if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("Starting to process membership updates"); + await foreach (var update in membershipTableManager.MembershipTableUpdates.WithCancellation(ct)) + { + // Add entries for members that aren't already in the cache + foreach (var membershipEntry in update.Entries) + { + if (!_metadata.ContainsKey(membershipEntry.Key)) + { + try + { + var metadata = await siloMetadataClient.GetSiloMetadata(membershipEntry.Key); + _metadata.TryAdd(membershipEntry.Key, metadata); + } + catch(Exception exception) + { + logger.LogError(exception, "Error fetching metadata for silo {Silo}", membershipEntry.Key); + } + } + } + + // Remove entries for members that are no longer in the table + foreach (var silo in _metadata.Keys.ToList()) + { + if (!update.Entries.ContainsKey(silo)) + { + _metadata.TryRemove(silo, out _); + } + } + } + } + catch (Exception exception) + { + logger.LogError(exception, "Error processing membership updates"); + } + finally + { + if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("Stopping membership update processor"); + } + } + + public SiloMetadata GetMetadata(SiloAddress siloAddress) => _metadata.GetValueOrDefault(siloAddress) ?? SiloMetadata.Empty; + + public void SetMetadata(SiloAddress siloAddress, SiloMetadata metadata) => _metadata.TryAdd(siloAddress, metadata); + + public void Dispose() => _cts.Cancel(); +} \ No newline at end of file diff --git a/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadata.cs b/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadata.cs new file mode 100644 index 0000000000..a8adf8397c --- /dev/null +++ b/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadata.cs @@ -0,0 +1,17 @@ +using System.Collections.Generic; +using System.Collections.Immutable; + +namespace Orleans.Runtime.MembershipService.SiloMetadata; + +[GenerateSerializer] +[Alias("Orleans.Runtime.MembershipService.SiloMetadata.SiloMetadata")] +public record SiloMetadata +{ + public static SiloMetadata Empty { get; } = new SiloMetadata(); + + [Id(0)] + public ImmutableDictionary Metadata { get; private set; } = ImmutableDictionary.Empty; + + internal void AddMetadata(IEnumerable> metadata) => Metadata = Metadata.AddRange(metadata); + internal void AddMetadata(string key, string value) => Metadata = Metadata.Add(key, value); +} \ No newline at end of file diff --git a/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadataClient.cs b/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadataClient.cs new file mode 100644 index 0000000000..377bf30288 --- /dev/null +++ b/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadataClient.cs @@ -0,0 +1,16 @@ +using System; +using System.Threading.Tasks; +using Orleans.Runtime.Services; + +namespace Orleans.Runtime.MembershipService.SiloMetadata; + +public class SiloMetadataClient(IServiceProvider serviceProvider) + : GrainServiceClient(serviceProvider), ISiloMetadataClient +{ + public async Task GetSiloMetadata(SiloAddress siloAddress) + { + var grainService = GetGrainService(siloAddress); + var metadata = await grainService.GetSiloMetadata(); + return metadata; + } +} \ No newline at end of file diff --git a/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadataGrainService.cs b/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadataGrainService.cs new file mode 100644 index 0000000000..336b527458 --- /dev/null +++ b/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadataGrainService.cs @@ -0,0 +1,22 @@ +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace Orleans.Runtime.MembershipService.SiloMetadata; + +public class SiloMetadataGrainService : GrainService, ISiloMetadataGrainService +{ + private readonly SiloMetadata _siloMetadata; + + public SiloMetadataGrainService(IOptions siloMetadata) : base() + { + _siloMetadata = siloMetadata.Value; + } + + public SiloMetadataGrainService(IOptions siloMetadata, GrainId grainId, Silo silo, ILoggerFactory loggerFactory) : base(grainId, silo, loggerFactory) + { + _siloMetadata = siloMetadata.Value; + } + + public Task GetSiloMetadata() => Task.FromResult(_siloMetadata); +} \ No newline at end of file diff --git a/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadataHostingExtensions.cs b/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadataHostingExtensions.cs new file mode 100644 index 0000000000..3457bd0984 --- /dev/null +++ b/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadataHostingExtensions.cs @@ -0,0 +1,90 @@ +using System.Collections.Generic; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Orleans.Configuration.Internal; +using Orleans.Hosting; +using Orleans.Runtime.Placement.Filtering; + +namespace Orleans.Runtime.MembershipService.SiloMetadata; + +public static class SiloMetadataHostingExtensions +{ + + /// + /// Configure silo metadata from the builder configuration. + /// + /// Silo builder + /// + /// Get the ORLEANS__METADATA section from config + /// Key/value pairs in configuration as a will look like this as environment variables: + /// ORLEANS__METADATA__key1=value1 + /// + /// + public static ISiloBuilder UseSiloMetadata(this ISiloBuilder builder) => builder.UseSiloMetadata(builder.Configuration); + + /// + /// Configure silo metadata from configuration. + /// + /// Silo builder + /// Configuration to pull from + /// + /// Get the ORLEANS__METADATA section from config + /// Key/value pairs in configuration as a will look like this as environment variables: + /// ORLEANS__METADATA__key1=value1 + /// + /// + public static ISiloBuilder UseSiloMetadata(this ISiloBuilder builder, IConfiguration configuration) + { + + var metadataConfigSection = builder.Configuration.GetSection("ORLEANS").GetSection("METADATA"); + + return builder.UseSiloMetadata(metadataConfigSection); + } + + /// + /// Configure silo metadata from configuration section. + /// + /// Silo builder + /// Configuration section to pull from + /// + /// Get the ORLEANS__METADATA section from config section + /// Key/value pairs in configuration as a will look like this as environment variables: + /// ORLEANS__METADATA__key1=value1 + /// + /// + public static ISiloBuilder UseSiloMetadata(this ISiloBuilder builder, IConfigurationSection configurationSection) + { + var dictionary = configurationSection.Get>(); + + return builder.UseSiloMetadata(dictionary ?? new Dictionary()); + } + + /// + /// Configure silo metadata from configuration section. + /// + /// Silo builder + /// Metadata to add + /// + public static ISiloBuilder UseSiloMetadata(this ISiloBuilder builder, Dictionary metadata) + { + builder.ConfigureServices(services => + { + services + .AddOptionsWithValidateOnStart() + .Configure(m => + { + m.AddMetadata(metadata); + }); + + services.AddGrainService(); + services.AddSingleton(); + services.AddFromExisting(); + services.AddFromExisting, SiloMetadataCache>(); + services.AddSingleton(); + // Placement filters + services.AddPlacementFilter(ServiceLifetime.Transient); + services.AddPlacementFilter(ServiceLifetime.Transient); + }); + return builder; + } +} \ No newline at end of file diff --git a/src/Orleans.Runtime/Placement/Filtering/IPlacementFilterDirector.cs b/src/Orleans.Runtime/Placement/Filtering/IPlacementFilterDirector.cs new file mode 100644 index 0000000000..23b2437771 --- /dev/null +++ b/src/Orleans.Runtime/Placement/Filtering/IPlacementFilterDirector.cs @@ -0,0 +1,9 @@ +using System.Collections.Generic; + +namespace Orleans.Runtime.Placement.Filtering; + +public interface IPlacementFilterDirector +{ + IEnumerable Filter(PlacementFilterStrategy filterStrategy, PlacementTarget target, + IEnumerable silos); +} \ No newline at end of file diff --git a/src/Orleans.Runtime/Placement/Filtering/PlacementFilterAttribute.cs b/src/Orleans.Runtime/Placement/Filtering/PlacementFilterAttribute.cs new file mode 100644 index 0000000000..b67e23fd10 --- /dev/null +++ b/src/Orleans.Runtime/Placement/Filtering/PlacementFilterAttribute.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using Orleans.Metadata; + +namespace Orleans.Runtime.Placement.Filtering; + +/// +/// Base for all placement filter marker attributes. +/// +[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)] +public abstract class PlacementFilterAttribute : Attribute, IGrainPropertiesProviderAttribute +{ + public PlacementFilterStrategy PlacementFilterStrategy { get; private set; } + + protected PlacementFilterAttribute(PlacementFilterStrategy placement) + { + ArgumentNullException.ThrowIfNull(placement); + PlacementFilterStrategy = placement; + } + + /// + public virtual void Populate(IServiceProvider services, Type grainClass, GrainType grainType, Dictionary properties) + => PlacementFilterStrategy?.PopulateGrainProperties(services, grainClass, grainType, properties); +} \ No newline at end of file diff --git a/src/Orleans.Runtime/Placement/Filtering/PlacementFilterDirectorResolver.cs b/src/Orleans.Runtime/Placement/Filtering/PlacementFilterDirectorResolver.cs new file mode 100644 index 0000000000..8401f5eac7 --- /dev/null +++ b/src/Orleans.Runtime/Placement/Filtering/PlacementFilterDirectorResolver.cs @@ -0,0 +1,12 @@ +using System; +using Microsoft.Extensions.DependencyInjection; + +namespace Orleans.Runtime.Placement.Filtering; + +/// +/// Responsible for resolving an for a . +/// +public sealed class PlacementFilterDirectorResolver(IServiceProvider services) +{ + public IPlacementFilterDirector GetFilterDirector(PlacementFilterStrategy placementFilterStrategy) => services.GetRequiredKeyedService(placementFilterStrategy.GetType()); +} \ No newline at end of file diff --git a/src/Orleans.Runtime/Placement/Filtering/PlacementFilterExtensions.cs b/src/Orleans.Runtime/Placement/Filtering/PlacementFilterExtensions.cs new file mode 100644 index 0000000000..faf9367f27 --- /dev/null +++ b/src/Orleans.Runtime/Placement/Filtering/PlacementFilterExtensions.cs @@ -0,0 +1,23 @@ +using Microsoft.Extensions.DependencyInjection; + +namespace Orleans.Runtime.Placement.Filtering; + +public static class PlacementFilterExtensions +{ + /// + /// Configures a for filtering candidate grain placements. + /// + /// The placement filter. + /// The placement filter director. + /// The service collection. + /// The lifetime of the placement strategy. + /// The service collection. + public static void AddPlacementFilter(this IServiceCollection services, ServiceLifetime strategyLifetime) + where TFilter : PlacementFilterStrategy + where TDirector : class, IPlacementFilterDirector + { + services.Add(ServiceDescriptor.DescribeKeyed(typeof(PlacementFilterStrategy), typeof(TFilter).Name, typeof(TFilter), strategyLifetime)); + services.AddKeyedSingleton(typeof(TFilter)); + } + +} \ No newline at end of file diff --git a/src/Orleans.Runtime/Placement/Filtering/PlacementFilterStrategy.cs b/src/Orleans.Runtime/Placement/Filtering/PlacementFilterStrategy.cs new file mode 100644 index 0000000000..614b42ec99 --- /dev/null +++ b/src/Orleans.Runtime/Placement/Filtering/PlacementFilterStrategy.cs @@ -0,0 +1,52 @@ +using System; +using System.Collections.Generic; +using Orleans.Metadata; + +namespace Orleans.Runtime.Placement.Filtering; + +public abstract class PlacementFilterStrategy +{ + /// + /// Initializes an instance of this type using the provided grain properties. + /// + /// + /// The grain properties. + /// + public virtual void Initialize(GrainProperties properties) + { + } + + /// + /// Populates grain properties to specify the preferred placement strategy. + /// + /// The service provider. + /// The grain class. + /// The grain type. + /// The grain properties which will be populated by this method call. + public void PopulateGrainProperties(IServiceProvider services, Type grainClass, GrainType grainType, Dictionary properties) + { + var typeName = GetType().Name; + if (properties.TryGetValue(WellKnownGrainTypeProperties.PlacementFilter, out var existingValue)) + { + properties[WellKnownGrainTypeProperties.PlacementFilter] = $"{existingValue},{typeName}"; + } + else + { + properties[WellKnownGrainTypeProperties.PlacementFilter] = typeName; + } + + foreach (var additionalGrainProperty in GetAdditionalGrainProperties(services, grainClass, grainType, properties)) + { + properties[$"{WellKnownGrainTypeProperties.PlacementFilter}.{typeName}.{additionalGrainProperty.Key}"] = additionalGrainProperty.Value; + } + } + + protected string GetPlacementFilterGrainProperty(string key, GrainProperties properties) + { + var typeName = GetType().Name; + return properties.Properties.TryGetValue($"{WellKnownGrainTypeProperties.PlacementFilter}.{typeName}.{key}", out var value) ? value : null; + } + + protected virtual IEnumerable> GetAdditionalGrainProperties(IServiceProvider services, Type grainClass, GrainType grainType, IReadOnlyDictionary existingProperties) + => Array.Empty>(); +} diff --git a/src/Orleans.Runtime/Placement/Filtering/PlacementFilterStrategyResolver.cs b/src/Orleans.Runtime/Placement/Filtering/PlacementFilterStrategyResolver.cs new file mode 100644 index 0000000000..cb36817056 --- /dev/null +++ b/src/Orleans.Runtime/Placement/Filtering/PlacementFilterStrategyResolver.cs @@ -0,0 +1,63 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using Microsoft.Extensions.DependencyInjection; +using Orleans.Metadata; + +namespace Orleans.Runtime.Placement.Filtering; + +/// +/// Responsible for resolving an for a . +/// +public sealed class PlacementFilterStrategyResolver +{ + private readonly ConcurrentDictionary _resolvedFilters = new(); + private readonly Func _getFiltersInternal; + private readonly GrainPropertiesResolver _grainPropertiesResolver; + private readonly IServiceProvider _services; + + /// + /// Create a instance. + /// + public PlacementFilterStrategyResolver( + IServiceProvider services, + GrainPropertiesResolver grainPropertiesResolver) + { + _services = services; + _getFiltersInternal = GetPlacementFilterStrategyInternal; + _grainPropertiesResolver = grainPropertiesResolver; + } + + /// + /// Gets the placement filter strategy associated with the provided grain type. + /// + public PlacementFilterStrategy[] GetPlacementFilterStrategies(GrainType grainType) => _resolvedFilters.GetOrAdd(grainType, _getFiltersInternal); + + private PlacementFilterStrategy[] GetPlacementFilterStrategyInternal(GrainType grainType) + { + _grainPropertiesResolver.TryGetGrainProperties(grainType, out var properties); + + if (properties is not null + && properties.Properties.TryGetValue(WellKnownGrainTypeProperties.PlacementFilter, out var placementFilterIds) + && !string.IsNullOrWhiteSpace(placementFilterIds)) + { + var filterList = new List(); + foreach (var filterId in placementFilterIds.Split(",")) + { + var filter = _services.GetKeyedService(filterId); + if (filter is not null) + { + filter.Initialize(properties); + filterList.Add(filter); + } + else + { + throw new KeyNotFoundException($"Could not resolve placement filter strategy {filterId} for grain type {grainType}. Ensure that dependencies for that filter have been configured in the Container. This is often through a .Use* extension method provided by the implementation."); + } + } + return filterList.ToArray(); + } + + return []; + } +} \ No newline at end of file diff --git a/src/Orleans.Runtime/Placement/Filtering/PreferredSiloMetadataPlacementFilterAttribute.cs b/src/Orleans.Runtime/Placement/Filtering/PreferredSiloMetadataPlacementFilterAttribute.cs new file mode 100644 index 0000000000..1c2183fcf1 --- /dev/null +++ b/src/Orleans.Runtime/Placement/Filtering/PreferredSiloMetadataPlacementFilterAttribute.cs @@ -0,0 +1,7 @@ +using System; + +namespace Orleans.Runtime.Placement.Filtering; + +[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)] +public class PreferredSiloMetadataPlacementFilterAttribute(string[] orderedMetadataKeys) + : PlacementFilterAttribute(new PreferredSiloMetadataPlacementFilterStrategy(orderedMetadataKeys)); \ No newline at end of file diff --git a/src/Orleans.Runtime/Placement/Filtering/PreferredSiloMetadataPlacementFilterDirector.cs b/src/Orleans.Runtime/Placement/Filtering/PreferredSiloMetadataPlacementFilterDirector.cs new file mode 100644 index 0000000000..e6d08aa060 --- /dev/null +++ b/src/Orleans.Runtime/Placement/Filtering/PreferredSiloMetadataPlacementFilterDirector.cs @@ -0,0 +1,73 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Orleans.Runtime.MembershipService.SiloMetadata; +#nullable enable +namespace Orleans.Runtime.Placement.Filtering; + +internal class PreferredSiloMetadataPlacementFilterDirector( + ILocalSiloDetails localSiloDetails, + ISiloMetadataCache siloMetadataCache) + : IPlacementFilterDirector +{ + public IEnumerable Filter(PlacementFilterStrategy filterStrategy, PlacementTarget target, IEnumerable silos) + { + var orderedMetadataKeys = (filterStrategy as PreferredSiloMetadataPlacementFilterStrategy)?.OrderedMetadataKeys ?? []; + var localSiloMetadata = siloMetadataCache.GetMetadata(localSiloDetails.SiloAddress).Metadata; + + if (localSiloMetadata.Count == 0) + { + // yield return all silos if no metadata keys are configured + foreach (var silo in silos) + { + yield return silo; + } + } + else + { + // return the list of silos that match the most metadata keys. The first key in the list is the least important. + // This means that the last key in the list is the most important. + // If no silos match any metadata keys, return the original list of silos. + + var siloList = silos.ToList(); + var maxScore = 0; + var siloScores = new int[siloList.Count]; + for (var i = 0; i < siloList.Count; i++) + { + var siloMetadata = siloMetadataCache.GetMetadata(siloList[i]).Metadata; + for (var j = orderedMetadataKeys.Length - 1; j >= 0; --j) + { + if (siloMetadata.TryGetValue(orderedMetadataKeys[j], out var siloMetadataValue) && + localSiloMetadata.TryGetValue(orderedMetadataKeys[j], out var localSiloMetadataValue) && + siloMetadataValue == localSiloMetadataValue) + { + var newScore = siloScores[i]++; + maxScore = Math.Max(maxScore, newScore); + } + else + { + break; + } + } + } + + if (maxScore == 0) + { + // yield return all silos if no silos match any metadata keys + foreach (var silo in siloList) + { + yield return silo; + } + } + + // return the list of silos that match the most metadata keys + for (var i = 0; i < siloScores.Length; i++) + { + if (siloScores[i] == maxScore) + { + yield return siloList[i]; + } + } + } + } +} \ No newline at end of file diff --git a/src/Orleans.Runtime/Placement/Filtering/PreferredSiloMetadataPlacementFilterStrategy.cs b/src/Orleans.Runtime/Placement/Filtering/PreferredSiloMetadataPlacementFilterStrategy.cs new file mode 100644 index 0000000000..6316fd0c0b --- /dev/null +++ b/src/Orleans.Runtime/Placement/Filtering/PreferredSiloMetadataPlacementFilterStrategy.cs @@ -0,0 +1,26 @@ +using System; +using System.Collections.Generic; +using Orleans.Metadata; + +namespace Orleans.Runtime.Placement.Filtering; + +public class PreferredSiloMetadataPlacementFilterStrategy(string[] orderedMetadataKeys) : PlacementFilterStrategy +{ + public string[] OrderedMetadataKeys { get; set; } = orderedMetadataKeys; + + public PreferredSiloMetadataPlacementFilterStrategy() : this([]) + { + } + + public override void Initialize(GrainProperties properties) + { + base.Initialize(properties); + OrderedMetadataKeys = GetPlacementFilterGrainProperty("ordered-metadata-keys", properties).Split(","); + } + + protected override IEnumerable> GetAdditionalGrainProperties(IServiceProvider services, Type grainClass, GrainType grainType, + IReadOnlyDictionary existingProperties) + { + yield return new KeyValuePair("ordered-metadata-keys", string.Join(",", OrderedMetadataKeys)); + } +} \ No newline at end of file diff --git a/src/Orleans.Runtime/Placement/Filtering/RequiredSiloMetadataFilterDirector.cs b/src/Orleans.Runtime/Placement/Filtering/RequiredSiloMetadataFilterDirector.cs new file mode 100644 index 0000000000..ba1fd242bd --- /dev/null +++ b/src/Orleans.Runtime/Placement/Filtering/RequiredSiloMetadataFilterDirector.cs @@ -0,0 +1,59 @@ +using System.Collections.Generic; +using System.Linq; +using Orleans.Runtime.MembershipService.SiloMetadata; + +namespace Orleans.Runtime.Placement.Filtering; + +internal class RequiredSiloMetadataFilterDirector(ILocalSiloDetails localSiloDetails, ISiloMetadataCache siloMetadataCache) + : IPlacementFilterDirector +{ + public IEnumerable Filter(PlacementFilterStrategy filterStrategy, PlacementTarget target, IEnumerable silos) + { + var metadataKeys = (filterStrategy as RequiredSiloMetadataPlacementFilterStrategy)?.MetadataKeys ?? []; + + // yield return all silos if no silos match any metadata keys + if (metadataKeys.Length == 0) + { + foreach (var silo in silos) + { + yield return silo; + } + } + else + { + var localMetadata = siloMetadataCache.GetMetadata(localSiloDetails.SiloAddress); + var localRequiredMetadata = GetMetadata(localMetadata, metadataKeys); + + foreach (var silo in silos) + { + var remoteMetadata = siloMetadataCache.GetMetadata(silo); + if(DoesMetadataMatch(localRequiredMetadata, remoteMetadata, metadataKeys)) + { + yield return silo; + } + } + } + } + + private static bool DoesMetadataMatch(string[] localMetadata, SiloMetadata siloMetadata, string[] metadataKeys) + { + for (var i = 0; i < metadataKeys.Length; i++) + { + if(localMetadata[i] != siloMetadata.Metadata?.GetValueOrDefault(metadataKeys[i])) + { + return false; + } + } + + return true; + } + private static string[] GetMetadata(SiloMetadata siloMetadata, string[] metadataKeys) + { + var result = new string[metadataKeys.Length]; + for (var i = 0; i < metadataKeys.Length; i++) + { + result[i] = siloMetadata.Metadata?.GetValueOrDefault(metadataKeys[i]); + } + return result; + } +} \ No newline at end of file diff --git a/src/Orleans.Runtime/Placement/Filtering/RequiredSiloMetadataPlacementFilterAttribute.cs b/src/Orleans.Runtime/Placement/Filtering/RequiredSiloMetadataPlacementFilterAttribute.cs new file mode 100644 index 0000000000..180a409c46 --- /dev/null +++ b/src/Orleans.Runtime/Placement/Filtering/RequiredSiloMetadataPlacementFilterAttribute.cs @@ -0,0 +1,7 @@ +using System; + +namespace Orleans.Runtime.Placement.Filtering; + +[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)] +public class RequiredSiloMetadataPlacementFilterAttribute(string[] orderedMetadataKeys) + : PlacementFilterAttribute(new RequiredSiloMetadataPlacementFilterStrategy(orderedMetadataKeys)); \ No newline at end of file diff --git a/src/Orleans.Runtime/Placement/Filtering/RequiredSiloMetadataPlacementFilterStrategy.cs b/src/Orleans.Runtime/Placement/Filtering/RequiredSiloMetadataPlacementFilterStrategy.cs new file mode 100644 index 0000000000..ac79c94080 --- /dev/null +++ b/src/Orleans.Runtime/Placement/Filtering/RequiredSiloMetadataPlacementFilterStrategy.cs @@ -0,0 +1,26 @@ +using System; +using System.Collections.Generic; +using Orleans.Metadata; + +namespace Orleans.Runtime.Placement.Filtering; + +public class RequiredSiloMetadataPlacementFilterStrategy(string[] metadataKeys) : PlacementFilterStrategy +{ + public string[] MetadataKeys { get; private set; } = metadataKeys; + + public RequiredSiloMetadataPlacementFilterStrategy() : this([]) + { + } + + public override void Initialize(GrainProperties properties) + { + base.Initialize(properties); + MetadataKeys = GetPlacementFilterGrainProperty("metadata-keys", properties).Split(","); + } + + protected override IEnumerable> GetAdditionalGrainProperties(IServiceProvider services, Type grainClass, GrainType grainType, + IReadOnlyDictionary existingProperties) + { + yield return new KeyValuePair("metadata-keys", String.Join(",", MetadataKeys)); + } +} \ No newline at end of file diff --git a/src/Orleans.Runtime/Placement/PlacementService.cs b/src/Orleans.Runtime/Placement/PlacementService.cs index cd17709a05..4688ccafeb 100644 --- a/src/Orleans.Runtime/Placement/PlacementService.cs +++ b/src/Orleans.Runtime/Placement/PlacementService.cs @@ -9,6 +9,7 @@ using Orleans.Configuration; using Orleans.Runtime.GrainDirectory; using Orleans.Runtime.Internal; +using Orleans.Runtime.Placement.Filtering; using Orleans.Runtime.Versions; namespace Orleans.Runtime.Placement @@ -28,6 +29,8 @@ internal class PlacementService : IPlacementContext private readonly ISiloStatusOracle _siloStatusOracle; private readonly bool _assumeHomogeneousSilosForTesting; private readonly PlacementWorker[] _workers; + private readonly PlacementFilterStrategyResolver _filterStrategyResolver; + private readonly PlacementFilterDirectorResolver _placementFilterDirectoryResolver; /// /// Create a instance. @@ -41,11 +44,15 @@ public PlacementService( GrainVersionManifest grainInterfaceVersions, CachedVersionSelectorManager versionSelectorManager, PlacementDirectorResolver directorResolver, - PlacementStrategyResolver strategyResolver) + PlacementStrategyResolver strategyResolver, + PlacementFilterStrategyResolver filterStrategyResolver, + PlacementFilterDirectorResolver placementFilterDirectoryResolver) { LocalSilo = localSiloDetails.SiloAddress; _strategyResolver = strategyResolver; _directorResolver = directorResolver; + _filterStrategyResolver = filterStrategyResolver; + _placementFilterDirectoryResolver = placementFilterDirectoryResolver; _logger = logger; _grainLocator = grainLocator; _grainInterfaceVersions = grainInterfaceVersions; @@ -117,6 +124,22 @@ public SiloAddress[] GetCompatibleSilos(PlacementTarget target) : _grainInterfaceVersions.GetSupportedSilos(grainType).Result; var compatibleSilos = silos.Intersect(AllActiveSilos).ToArray(); + + + var filters = _filterStrategyResolver.GetPlacementFilterStrategies(grainType); + if (filters.Length > 0) + { + IEnumerable filteredSilos = compatibleSilos; + foreach (var placementFilter in filters) + { + var director = _placementFilterDirectoryResolver.GetFilterDirector(placementFilter); + filteredSilos = director.Filter(placementFilter, target, filteredSilos); + } + + compatibleSilos = filteredSilos.ToArray(); + } + + if (compatibleSilos.Length == 0) { var allWithType = _grainInterfaceVersions.GetSupportedSilos(grainType).Result;