From c1ae3baa41c73c022fb765dd225e0164f5c4efee Mon Sep 17 00:00:00 2001 From: Reuben Bond <203839+ReubenBond@users.noreply.github.com> Date: Wed, 25 Oct 2023 16:13:00 -0700 Subject: [PATCH] Prevent progress reversal in cluster membership (#8673) * Prevent progress reversal in cluster membership * Use more consistent version manipulation in cluster membership tests --- .../Messaging/AdoNetClusteringTable.cs | 4 +- .../AzureBasedMembershipTable.cs | 4 +- .../Manifest/MajorMinorVersion.cs | 7 +- .../Manifest/ClientClusterManifestProvider.cs | 10 +- .../IMembershipTable.cs | 13 +- src/Orleans.Core/Utils/AsyncEnumerable.cs | 151 +++++++++--------- .../Manifest/ClusterManifestProvider.cs | 8 +- .../ClusterMembershipService.cs | 8 +- .../MembershipTableManager.cs | 8 +- .../AsyncEnumerableGrainCallTests.cs | 2 + .../BasicActivationTests.cs | 80 ++++------ test/Grains/TestGrainInterfaces/ITestGrain.cs | 1 + .../Directory/MockClusterMembershipService.cs | 10 +- .../MembershipTableTestsBase.cs | 9 +- 14 files changed, 153 insertions(+), 162 deletions(-) diff --git a/src/AdoNet/Orleans.Clustering.AdoNet/Messaging/AdoNetClusteringTable.cs b/src/AdoNet/Orleans.Clustering.AdoNet/Messaging/AdoNetClusteringTable.cs index 15365d3219..0612f54077 100644 --- a/src/AdoNet/Orleans.Clustering.AdoNet/Messaging/AdoNetClusteringTable.cs +++ b/src/AdoNet/Orleans.Clustering.AdoNet/Messaging/AdoNetClusteringTable.cs @@ -100,7 +100,7 @@ public async Task InsertRow(MembershipEntry entry, TableVersion tableVersi if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("AdoNetClusteringTable.InsertRow aborted due to null check. MembershipEntry is null."); throw new ArgumentNullException(nameof(entry)); } - if (tableVersion == null) + if (tableVersion is null) { if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("AdoNetClusteringTable.InsertRow aborted due to null check. TableVersion is null "); throw new ArgumentNullException(nameof(tableVersion)); @@ -132,7 +132,7 @@ public async Task UpdateRow(MembershipEntry entry, string etag, TableVersi if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("AdoNetClusteringTable.UpdateRow aborted due to null check. MembershipEntry is null."); throw new ArgumentNullException(nameof(entry)); } - if (tableVersion == null) + if (tableVersion is null) { if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("AdoNetClusteringTable.UpdateRow aborted due to null check. TableVersion is null"); throw new ArgumentNullException(nameof(tableVersion)); diff --git a/src/Azure/Orleans.Clustering.AzureStorage/AzureBasedMembershipTable.cs b/src/Azure/Orleans.Clustering.AzureStorage/AzureBasedMembershipTable.cs index d98343451c..37d5acfcd0 100644 --- a/src/Azure/Orleans.Clustering.AzureStorage/AzureBasedMembershipTable.cs +++ b/src/Azure/Orleans.Clustering.AzureStorage/AzureBasedMembershipTable.cs @@ -121,7 +121,7 @@ public async Task InsertRow(MembershipEntry entry, TableVersion tableVersi { logger.LogWarning((int)TableStorageErrorCode.AzureTable_23, exc, - "Intermediate error inserting entry {Data} tableVersion {TableVersion} to the table {TableName}.", entry.ToString(), tableVersion == null ? "null" : tableVersion.ToString(), tableManager.TableName); + "Intermediate error inserting entry {Data} tableVersion {TableVersion} to the table {TableName}.", entry.ToString(), tableVersion is null ? "null" : tableVersion.ToString(), tableManager.TableName); throw; } } @@ -148,7 +148,7 @@ public async Task UpdateRow(MembershipEntry entry, string etag, TableVersi { logger.LogWarning((int)TableStorageErrorCode.AzureTable_25, exc, - "Intermediate error updating entry {Data} tableVersion {TableVersion} to the table {TableName}.", entry.ToString(), tableVersion == null ? "null" : tableVersion.ToString(), tableManager.TableName); + "Intermediate error updating entry {Data} tableVersion {TableVersion} to the table {TableName}.", entry.ToString(), tableVersion is null ? "null" : tableVersion.ToString(), tableManager.TableName); throw; } } diff --git a/src/Orleans.Core.Abstractions/Manifest/MajorMinorVersion.cs b/src/Orleans.Core.Abstractions/Manifest/MajorMinorVersion.cs index 56de05eca5..5562aad7c6 100644 --- a/src/Orleans.Core.Abstractions/Manifest/MajorMinorVersion.cs +++ b/src/Orleans.Core.Abstractions/Manifest/MajorMinorVersion.cs @@ -22,7 +22,12 @@ public MajorMinorVersion(long majorVersion, long minorVersion) /// /// Gets the zero value. /// - public static MajorMinorVersion Zero => new MajorMinorVersion(0, 0); + public static MajorMinorVersion Zero => new(0, 0); + + /// + /// Gets the minimum value. + /// + public static MajorMinorVersion MinValue => new(long.MinValue, long.MinValue); /// /// Gets the most significant version component. diff --git a/src/Orleans.Core/Manifest/ClientClusterManifestProvider.cs b/src/Orleans.Core/Manifest/ClientClusterManifestProvider.cs index b0ba5d21ac..c07d3ac229 100644 --- a/src/Orleans.Core/Manifest/ClientClusterManifestProvider.cs +++ b/src/Orleans.Core/Manifest/ClientClusterManifestProvider.cs @@ -41,13 +41,11 @@ public ClientClusterManifestProvider( _services = services; _gatewayManager = gatewayManager; this.LocalGrainManifest = clientManifestProvider.ClientManifest; - _current = new ClusterManifest(MajorMinorVersion.Zero, ImmutableDictionary.Empty, ImmutableArray.Create(this.LocalGrainManifest)); + _current = new ClusterManifest(MajorMinorVersion.MinValue, ImmutableDictionary.Empty, ImmutableArray.Create(this.LocalGrainManifest)); _updates = new AsyncEnumerable( - (previous, proposed) => previous is null || proposed.Version == MajorMinorVersion.Zero || proposed.Version > previous.Version, - _current) - { - OnPublished = update => Interlocked.Exchange(ref _current, update) - }; + initialValue: _current, + updateValidator: (previous, proposed) => previous is null || proposed.Version > previous.Version, + onPublished: update => Interlocked.Exchange(ref _current, update)); } /// diff --git a/src/Orleans.Core/SystemTargetInterfaces/IMembershipTable.cs b/src/Orleans.Core/SystemTargetInterfaces/IMembershipTable.cs index e17d23d00d..681e0601c0 100644 --- a/src/Orleans.Core/SystemTargetInterfaces/IMembershipTable.cs +++ b/src/Orleans.Core/SystemTargetInterfaces/IMembershipTable.cs @@ -109,7 +109,7 @@ public interface IMembershipTableSystemTarget : IMembershipTable, ISystemTarget } [Serializable, GenerateSerializer, Immutable] - public sealed class TableVersion : ISpanFormattable + public sealed class TableVersion : ISpanFormattable, IEquatable { /// /// The version part of this TableVersion. Monotonically increasing number. @@ -129,16 +129,19 @@ public TableVersion(int version, string eTag) VersionEtag = eTag; } - public TableVersion Next() - { - return new TableVersion(Version + 1, VersionEtag); - } + public TableVersion Next() => new (Version + 1, VersionEtag); public override string ToString() => $"<{Version}, {VersionEtag}>"; string IFormattable.ToString(string format, IFormatProvider formatProvider) => ToString(); bool ISpanFormattable.TryFormat(Span destination, out int charsWritten, ReadOnlySpan format, IFormatProvider provider) => destination.TryWrite($"<{Version}, {VersionEtag}>", out charsWritten); + + public override bool Equals(object obj) => Equals(obj as TableVersion); + public override int GetHashCode() => HashCode.Combine(Version, VersionEtag); + public bool Equals(TableVersion other) => other is not null && Version == other.Version && VersionEtag == other.VersionEtag; + public static bool operator ==(TableVersion left, TableVersion right) => EqualityComparer.Default.Equals(left, right); + public static bool operator !=(TableVersion left, TableVersion right) => !(left == right); } [Serializable] diff --git a/src/Orleans.Core/Utils/AsyncEnumerable.cs b/src/Orleans.Core/Utils/AsyncEnumerable.cs index 6be1ecaf59..23cef1ca95 100644 --- a/src/Orleans.Core/Utils/AsyncEnumerable.cs +++ b/src/Orleans.Core/Utils/AsyncEnumerable.cs @@ -1,43 +1,37 @@ using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.Threading; using System.Threading.Tasks; -using Orleans.Internal; namespace Orleans.Runtime.Utilities { internal static class AsyncEnumerable { - internal static readonly object InitialValue = new object(); - internal static readonly object DisposedValue = new object(); + internal static readonly object InitialValue = new(); + internal static readonly object DisposedValue = new(); } internal sealed class AsyncEnumerable : IAsyncEnumerable { - private enum PublishResult - { - Success, - InvalidUpdate, - Disposed - } - - private readonly object updateLock = new object(); - private readonly Func updateValidator; - private Element current; + private readonly object _updateLock = new(); + private readonly Func _updateValidator; + private readonly Action _onPublished; + private Element _current; - public AsyncEnumerable(Func updateValidator, T initial) + public AsyncEnumerable(T initialValue, Func updateValidator, Action onPublished) { - this.updateValidator = updateValidator; - this.current = new Element(initial); + _updateValidator = updateValidator; + _current = new Element(initialValue); + _onPublished = onPublished; + onPublished(initialValue); } - public Action OnPublished { get; set; } - - public bool TryPublish(T value) => this.TryPublish(new Element(value)) == PublishResult.Success; + public bool TryPublish(T value) => TryPublish(new Element(value)) == PublishResult.Success; public void Publish(T value) { - switch (this.TryPublish(new Element(value))) + switch (TryPublish(new Element(value))) { case PublishResult.Success: return; @@ -52,20 +46,20 @@ public void Publish(T value) private PublishResult TryPublish(Element newItem) { - if (this.current.IsDisposed) return PublishResult.Disposed; + if (_current.IsDisposed) return PublishResult.Disposed; - lock (this.updateLock) + lock (_updateLock) { - if (this.current.IsDisposed) return PublishResult.Disposed; + if (_current.IsDisposed) return PublishResult.Disposed; - if (this.current.IsValid && newItem.IsValid && !this.updateValidator(this.current.Value, newItem.Value)) + if (_current.IsValid && newItem.IsValid && !_updateValidator(_current.Value, newItem.Value)) { return PublishResult.InvalidUpdate; } - var curr = this.current; - Interlocked.Exchange(ref this.current, newItem); - if (newItem.IsValid) this.OnPublished?.Invoke(newItem.Value); + var curr = _current; + Interlocked.Exchange(ref _current, newItem); + if (newItem.IsValid) _onPublished(newItem.Value); curr.SetNext(newItem); return PublishResult.Success; @@ -74,81 +68,100 @@ private PublishResult TryPublish(Element newItem) public void Dispose() { - if (this.current.IsDisposed) return; + if (_current.IsDisposed) return; - lock (this.updateLock) + lock (_updateLock) { - if (this.current.IsDisposed) return; + if (_current.IsDisposed) return; - this.TryPublish(Element.CreateDisposed()); + TryPublish(Element.CreateDisposed()); } } - private void ThrowInvalidUpdate() => throw new ArgumentException("The value was not valid"); + [DoesNotReturn] + private static void ThrowInvalidUpdate() => throw new ArgumentException("The value was not valid."); + + [DoesNotReturn] + private static void ThrowDisposed() => throw new ObjectDisposedException("This instance has been disposed."); - private void ThrowDisposed() => throw new ObjectDisposedException("This instance has been disposed"); + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) => new AsyncEnumerator(_current, cancellationToken); - public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + private enum PublishResult { - return new AsyncEnumerator(this.current, cancellationToken); + Success, + InvalidUpdate, + Disposed } private sealed class AsyncEnumerator : IAsyncEnumerator { - private readonly Task cancellation; - private Element current; + private readonly TaskCompletionSource _cancellation = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly CancellationTokenRegistration _registration; + private Element _current; public AsyncEnumerator(Element initial, CancellationToken cancellation) { - if (!initial.IsValid) this.current = initial; + if (!initial.IsValid) + { + _current = initial; + } else { var result = Element.CreateInitial(); result.SetNext(initial); - this.current = result; + _current = result; } - if (cancellation != default) + if (cancellation.CanBeCanceled) { - this.cancellation = cancellation.WhenCancelled(); + _registration = cancellation.Register(() => _cancellation.TrySetResult()); } } - T IAsyncEnumerator.Current => this.current.Value; + T IAsyncEnumerator.Current => _current.Value; async ValueTask IAsyncEnumerator.MoveNextAsync() { - Task next; - if (this.cancellation != default) + if (_current.IsDisposed || _cancellation.Task.IsCompleted) { - next = this.current.NextAsync(); - var result = await Task.WhenAny(this.cancellation, next); - if (ReferenceEquals(result, this.cancellation)) return false; + return false; } - else + + var next = _current.NextAsync(); + var cancellationTask = _cancellation.Task; + var result = await Task.WhenAny(cancellationTask, next); + if (ReferenceEquals(result, cancellationTask)) { - next = this.current.NextAsync(); + return false; } - this.current = await next; - return this.current.IsValid; + _current = await next; + return _current.IsValid; } - ValueTask IAsyncDisposable.DisposeAsync() => default; + async ValueTask IAsyncDisposable.DisposeAsync() + { + _cancellation.TrySetResult(); + await _registration.DisposeAsync(); + } } private sealed class Element { - private readonly TaskCompletionSource next; - private readonly object value; + private readonly TaskCompletionSource _next; + private readonly object _value; - public Element(T value) + public Element(T value) : this(value, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously)) { - this.value = value; - this.next = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); } - public static Element CreateInitial() => new Element( + private Element(object value, TaskCompletionSource next) + { + _value = value; + _next = next; + } + + public static Element CreateInitial() => new( AsyncEnumerable.InitialValue, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously)); @@ -159,33 +172,27 @@ public static Element CreateDisposed() return new Element(AsyncEnumerable.DisposedValue, tcs); } - private Element(object value, TaskCompletionSource next) - { - this.value = value; - this.next = next; - } - - public bool IsValid => !this.IsInitial && !this.IsDisposed; + public bool IsValid => !IsInitial && !IsDisposed; public T Value { get { - if (this.IsInitial) ThrowInvalidInstance(); + if (IsInitial) ThrowInvalidInstance(); ObjectDisposedException.ThrowIf(IsDisposed, this); - if (this.value is T typedValue) return typedValue; + if (_value is T typedValue) return typedValue; return default; } } - public bool IsInitial => ReferenceEquals(this.value, AsyncEnumerable.InitialValue); - public bool IsDisposed => ReferenceEquals(this.value, AsyncEnumerable.DisposedValue); + public bool IsInitial => ReferenceEquals(_value, AsyncEnumerable.InitialValue); + public bool IsDisposed => ReferenceEquals(_value, AsyncEnumerable.DisposedValue); - public Task NextAsync() => this.next.Task; + public Task NextAsync() => _next.Task; - public void SetNext(Element next) => this.next.SetResult(next); + public void SetNext(Element next) => _next.SetResult(next); - private void ThrowInvalidInstance() => throw new InvalidOperationException("This instance does not have a value set."); + private static void ThrowInvalidInstance() => throw new InvalidOperationException("This instance does not have a value set."); } } } diff --git a/src/Orleans.Runtime/Manifest/ClusterManifestProvider.cs b/src/Orleans.Runtime/Manifest/ClusterManifestProvider.cs index e9254348eb..99c5dea782 100644 --- a/src/Orleans.Runtime/Manifest/ClusterManifestProvider.cs +++ b/src/Orleans.Runtime/Manifest/ClusterManifestProvider.cs @@ -42,11 +42,9 @@ public ClusterManifestProvider( ImmutableDictionary.CreateRange(new[] { new KeyValuePair(localSiloDetails.SiloAddress, this.LocalGrainManifest) }), ImmutableArray.Create(this.LocalGrainManifest)); _updates = new AsyncEnumerable( - (previous, proposed) => previous.Version <= MajorMinorVersion.Zero || proposed.Version > previous.Version, - _current) - { - OnPublished = update => Interlocked.Exchange(ref _current, update) - }; + initialValue: _current, + updateValidator: (previous, proposed) => proposed.Version > previous.Version, + onPublished: update => Interlocked.Exchange(ref _current, update)); } public ClusterManifest Current => _current; diff --git a/src/Orleans.Runtime/MembershipService/ClusterMembershipService.cs b/src/Orleans.Runtime/MembershipService/ClusterMembershipService.cs index 6c97224006..80da556090 100644 --- a/src/Orleans.Runtime/MembershipService/ClusterMembershipService.cs +++ b/src/Orleans.Runtime/MembershipService/ClusterMembershipService.cs @@ -25,11 +25,9 @@ public ClusterMembershipService( { this.snapshot = membershipTableManager.MembershipTableSnapshot.CreateClusterMembershipSnapshot(); this.updates = new AsyncEnumerable( - (previous, proposed) => proposed.Version == MembershipVersion.MinValue || proposed.Version > previous.Version, - this.snapshot) - { - OnPublished = update => Interlocked.Exchange(ref this.snapshot, update) - }; + initialValue: this.snapshot, + updateValidator: (previous, proposed) => proposed.Version > previous.Version, + onPublished: update => Interlocked.Exchange(ref this.snapshot, update)); this.membershipTableManager = membershipTableManager; this.log = log; this.fatalErrorHandler = fatalErrorHandler; diff --git a/src/Orleans.Runtime/MembershipService/MembershipTableManager.cs b/src/Orleans.Runtime/MembershipService/MembershipTableManager.cs index 7d766221f4..52deaf0556 100644 --- a/src/Orleans.Runtime/MembershipService/MembershipTableManager.cs +++ b/src/Orleans.Runtime/MembershipService/MembershipTableManager.cs @@ -63,11 +63,9 @@ public MembershipTableManager( MembershipVersion.MinValue, initialEntries); this.updates = new AsyncEnumerable( - (previous, proposed) => proposed.Version == MembershipVersion.MinValue || proposed.Version > previous.Version, - this.snapshot) - { - OnPublished = update => Interlocked.Exchange(ref this.snapshot, update) - }; + initialValue: this.snapshot, + updateValidator: (previous, proposed) => proposed.Version > previous.Version, + onPublished: update => Interlocked.Exchange(ref this.snapshot, update)); this.membershipUpdateTimer = timerFactory.Create( this.clusterMembershipOptions.TableRefreshTimeout, diff --git a/test/DefaultCluster.Tests/AsyncEnumerableGrainCallTests.cs b/test/DefaultCluster.Tests/AsyncEnumerableGrainCallTests.cs index 577b3c9de0..334038afb9 100644 --- a/test/DefaultCluster.Tests/AsyncEnumerableGrainCallTests.cs +++ b/test/DefaultCluster.Tests/AsyncEnumerableGrainCallTests.cs @@ -5,6 +5,8 @@ using UnitTests.GrainInterfaces; using Xunit; +namespace DefaultCluster.Tests; + /// /// Tests support for grain methods which return . /// diff --git a/test/DefaultCluster.Tests/BasicActivationTests.cs b/test/DefaultCluster.Tests/BasicActivationTests.cs index 2c096a1e9c..22b145a067 100644 --- a/test/DefaultCluster.Tests/BasicActivationTests.cs +++ b/test/DefaultCluster.Tests/BasicActivationTests.cs @@ -1,5 +1,4 @@ using System.Globalization; -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using TestExtensions; using UnitTests.GrainInterfaces; @@ -15,28 +14,25 @@ public BasicActivationTests(DefaultClusterFixture fixture) : base(fixture) { } - private TimeSpan GetResponseTimeout() => this.Client.ServiceProvider.GetRequiredService().GetResponseTimeout(); - private void SetResponseTimeout(TimeSpan value) => this.Client.ServiceProvider.GetRequiredService().SetResponseTimeout(value); - [Fact, TestCategory("BVT"), TestCategory("ActivateDeactivate"), TestCategory("GetGrain")] - public void BasicActivation_ActivateAndUpdate() + public async Task BasicActivation_ActivateAndUpdate() { long g1Key = GetRandomGrainId(); long g2Key = GetRandomGrainId(); ITestGrain g1 = this.GrainFactory.GetGrain(g1Key); ITestGrain g2 = this.GrainFactory.GetGrain(g2Key); Assert.Equal(g1Key, g1.GetPrimaryKeyLong()); - Assert.Equal(g1Key, g1.GetKey().Result); - Assert.Equal(g1Key.ToString(), g1.GetLabel().Result); - Assert.Equal(g2Key, g2.GetKey().Result); - Assert.Equal(g2Key.ToString(), g2.GetLabel().Result); + Assert.Equal(g1Key, await g1.GetKey()); + Assert.Equal(g1Key.ToString(), await g1.GetLabel()); + Assert.Equal(g2Key, await g2.GetKey()); + Assert.Equal(g2Key.ToString(), await g2.GetLabel()); - g1.SetLabel("one").Wait(); - Assert.Equal("one", g1.GetLabel().Result); - Assert.Equal(g2Key.ToString(), g2.GetLabel().Result); + await g1.SetLabel("one"); + Assert.Equal("one", await g1.GetLabel()); + Assert.Equal(g2Key.ToString(), await g2.GetLabel()); ITestGrain g1a = this.GrainFactory.GetGrain(g1Key); - Assert.Equal("one", g1a.GetLabel().Result); + Assert.Equal("one", await g1a.GetLabel()); } [Fact, TestCategory("BVT"), TestCategory("ActivateDeactivate"), TestCategory("GetGrain")] @@ -210,49 +206,35 @@ public void BasicActivation_MultipleGrainInterfaces() [Fact, TestCategory("SlowBVT"), TestCategory("ActivateDeactivate"), TestCategory("Reentrancy")] - public void BasicActivation_Reentrant_RecoveryAfterExpiredMessage() + public async Task BasicActivation_Reentrant_RecoveryAfterExpiredMessage() { List promises = new List(); - TimeSpan prevTimeout = this.GetResponseTimeout(); + TimeSpan timeout = TimeSpan.FromMilliseconds(1000); + + ITestGrain grain = this.GrainFactory.GetGrain(GetRandomGrainId()); + int num = 10; + for (long i = 0; i < num; i++) + { + Task task = grain.DoLongAction( + TimeSpan.FromMilliseconds(timeout.TotalMilliseconds * 3), + "A_" + i); + promises.Add(task); + } try { - // set short response time and ask to do long operation, to trigger expired msgs in the silo queues. - TimeSpan shortTimeout = TimeSpan.FromMilliseconds(1000); - this.SetResponseTimeout(shortTimeout); - - ITestGrain grain = this.GrainFactory.GetGrain(GetRandomGrainId()); - int num = 10; - for (long i = 0; i < num; i++) - { - Task task = grain.DoLongAction( - TimeSpan.FromMilliseconds(shortTimeout.TotalMilliseconds * 3), - "A_" + i); - promises.Add(task); - } - try - { - Task.WhenAll(promises).Wait(); - } - catch (Exception) - { - this.Logger.LogInformation("Done with stress iteration."); - } - - // wait a bit to make sure expired msgs in the silo is trigered. - Thread.Sleep(TimeSpan.FromSeconds(10)); - - // set the regular response time back, expect msgs ot succeed. - this.SetResponseTimeout(prevTimeout); - - this.Logger.LogInformation("About to send a next legit request that should succeed."); - grain.DoLongAction(TimeSpan.FromMilliseconds(1), "B_" + 0).Wait(); - this.Logger.LogInformation("The request succeeded."); + await Task.WhenAll(promises); } - finally + catch (Exception) { - // set the regular response time back, expect msgs ot succeed. - this.SetResponseTimeout(prevTimeout); + this.Logger.LogInformation("Done with stress iteration."); } + + // wait a bit to make sure expired msgs in the silo is trigered. + await Task.Delay(TimeSpan.FromSeconds(10)); + + this.Logger.LogInformation("About to send a next legit request that should succeed."); + await grain.DoLongAction(TimeSpan.FromMilliseconds(1), "B_" + 0); + this.Logger.LogInformation("The request succeeded."); } [Fact, TestCategory("BVT"), TestCategory("RequestContext"), TestCategory("GetGrain")] diff --git a/test/Grains/TestGrainInterfaces/ITestGrain.cs b/test/Grains/TestGrainInterfaces/ITestGrain.cs index e866a4c1b9..d2d2f06f43 100644 --- a/test/Grains/TestGrainInterfaces/ITestGrain.cs +++ b/test/Grains/TestGrainInterfaces/ITestGrain.cs @@ -27,6 +27,7 @@ public interface ITestGrain : IGrainWithIntegerKey Task StartTimer(); + [ResponseTimeout("00:00:01")] Task DoLongAction(TimeSpan timespan, string str); } diff --git a/test/NonSilo.Tests/Directory/MockClusterMembershipService.cs b/test/NonSilo.Tests/Directory/MockClusterMembershipService.cs index 8109f52352..362a239fbe 100644 --- a/test/NonSilo.Tests/Directory/MockClusterMembershipService.cs +++ b/test/NonSilo.Tests/Directory/MockClusterMembershipService.cs @@ -23,12 +23,10 @@ public MockClusterMembershipService(Dictionary(); this.snapshot = ToSnapshot(this.statuses, ++version); - this.updates = this.updates = new AsyncEnumerable( - (previous, proposed) => proposed.Version == MembershipVersion.MinValue || proposed.Version > previous.Version, - this.snapshot) - { - OnPublished = update => Interlocked.Exchange(ref this.snapshot, update) - }; + this.updates = new AsyncEnumerable( + initialValue: this.snapshot, + updateValidator: (previous, proposed) => proposed.Version > previous.Version, + onPublished: update => Interlocked.Exchange(ref this.snapshot, update)); } public void UpdateSiloStatus(SiloAddress siloAddress, SiloStatus siloStatus, string name) diff --git a/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs b/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs index 613a96cb5b..88e721138f 100644 --- a/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs +++ b/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs @@ -286,7 +286,7 @@ protected async Task MembershipTable_UpdateRow(bool extendedProtocol = true) "Calling UpdateRow with Entry = {Entry} correct eTag = {ETag} old version={TableVersion}", siloEntry, etagBefore, - tableVersion != null ? tableVersion.ToString() : "null"); + tableVersion?.ToString() ?? "null"); ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion); Assert.False(ok, $"row update should have failed - Table Data = {tableData}"); tableData = await membershipTable.ReadAll(); @@ -298,7 +298,7 @@ protected async Task MembershipTable_UpdateRow(bool extendedProtocol = true) "Calling UpdateRow with Entry = {Entry} correct eTag = {ETag} correct version={TableVersion}", siloEntry, etagBefore, - tableVersion != null ? tableVersion.ToString() : "null"); + tableVersion?.ToString() ?? "null"); ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion); @@ -308,7 +308,7 @@ protected async Task MembershipTable_UpdateRow(bool extendedProtocol = true) "Calling UpdateRow with Entry = {Entry} old eTag = {ETag} old version={TableVersion}", siloEntry, etagBefore, - tableVersion != null ? tableVersion.ToString() : "null"); + tableVersion?.ToString() ?? "null"); ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion); Assert.False(ok, $"row update should have failed - Table Data = {tableData}"); @@ -326,7 +326,7 @@ protected async Task MembershipTable_UpdateRow(bool extendedProtocol = true) "Calling UpdateRow with Entry = {Entry} correct eTag = {ETag} old version={TableVersion}", siloEntry, etagAfter, - tableVersion != null ? tableVersion.ToString() : "null"); + tableVersion?.ToString() ?? "null"); ok = await membershipTable.UpdateRow(siloEntry, etagAfter, tableVersion); @@ -419,6 +419,7 @@ protected async Task MembershipTable_UpdateIAmAlive(bool extendedProtocol = true // compare that the value is close to what we passed in, but not exactly, as the underlying store can set its own precision settings // (ie: in SQL Server this is defined as datetime2(3), so we don't expect precision to account for less than 0.001s values) Assert.True((amAliveTime - member.Item1.IAmAliveTime).Duration() < TimeSpan.FromSeconds(2), (amAliveTime - member.Item1.IAmAliveTime).Duration().ToString()); + Assert.Equal(newTableVersion.Version, tableData.Version.Version); } protected async Task MembershipTable_CleanupDefunctSiloEntries(bool extendedProtocol = true)