Skip to content

Commit

Permalink
Use more consistent version manipulation in cluster membership tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond committed Oct 23, 2023
1 parent 4857ff9 commit b0c4493
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public async Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersi
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("AdoNetClusteringTable.InsertRow aborted due to null check. MembershipEntry is null.");
throw new ArgumentNullException(nameof(entry));
}
if (tableVersion == null)
if (tableVersion is null)
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("AdoNetClusteringTable.InsertRow aborted due to null check. TableVersion is null ");
throw new ArgumentNullException(nameof(tableVersion));
Expand Down Expand Up @@ -132,7 +132,7 @@ public async Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersi
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("AdoNetClusteringTable.UpdateRow aborted due to null check. MembershipEntry is null.");
throw new ArgumentNullException(nameof(entry));
}
if (tableVersion == null)
if (tableVersion is null)
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("AdoNetClusteringTable.UpdateRow aborted due to null check. TableVersion is null");
throw new ArgumentNullException(nameof(tableVersion));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public async Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersi
{
logger.LogWarning((int)TableStorageErrorCode.AzureTable_23,
exc,
"Intermediate error inserting entry {Data} tableVersion {TableVersion} to the table {TableName}.", entry.ToString(), tableVersion == null ? "null" : tableVersion.ToString(), tableManager.TableName);
"Intermediate error inserting entry {Data} tableVersion {TableVersion} to the table {TableName}.", entry.ToString(), tableVersion is null ? "null" : tableVersion.ToString(), tableManager.TableName);
throw;
}
}
Expand All @@ -148,7 +148,7 @@ public async Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersi
{
logger.LogWarning((int)TableStorageErrorCode.AzureTable_25,
exc,
"Intermediate error updating entry {Data} tableVersion {TableVersion} to the table {TableName}.", entry.ToString(), tableVersion == null ? "null" : tableVersion.ToString(), tableManager.TableName);
"Intermediate error updating entry {Data} tableVersion {TableVersion} to the table {TableName}.", entry.ToString(), tableVersion is null ? "null" : tableVersion.ToString(), tableManager.TableName);
throw;
}
}
Expand Down
13 changes: 8 additions & 5 deletions src/Orleans.Core/SystemTargetInterfaces/IMembershipTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public interface IMembershipTableSystemTarget : IMembershipTable, ISystemTarget
}

[Serializable, GenerateSerializer, Immutable]
public sealed class TableVersion : ISpanFormattable
public sealed class TableVersion : ISpanFormattable, IEquatable<TableVersion>
{
/// <summary>
/// The version part of this TableVersion. Monotonically increasing number.
Expand All @@ -129,16 +129,19 @@ public TableVersion(int version, string eTag)
VersionEtag = eTag;
}

public TableVersion Next()
{
return new TableVersion(Version + 1, VersionEtag);
}
public TableVersion Next() => new (Version + 1, VersionEtag);

public override string ToString() => $"<{Version}, {VersionEtag}>";
string IFormattable.ToString(string format, IFormatProvider formatProvider) => ToString();

bool ISpanFormattable.TryFormat(Span<char> destination, out int charsWritten, ReadOnlySpan<char> format, IFormatProvider provider)
=> destination.TryWrite($"<{Version}, {VersionEtag}>", out charsWritten);

public override bool Equals(object obj) => Equals(obj as TableVersion);
public override int GetHashCode() => HashCode.Combine(Version, VersionEtag);
public bool Equals(TableVersion other) => other is not null && Version == other.Version && VersionEtag == other.VersionEtag;
public static bool operator ==(TableVersion left, TableVersion right) => EqualityComparer<TableVersion>.Default.Equals(left, right);
public static bool operator !=(TableVersion left, TableVersion right) => !(left == right);
}

[Serializable]
Expand Down
142 changes: 71 additions & 71 deletions src/Orleans.Core/Utils/AsyncEnumerable.cs
Original file line number Diff line number Diff line change
@@ -1,44 +1,37 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using Orleans.Internal;

namespace Orleans.Runtime.Utilities
{
internal static class AsyncEnumerable
{
internal static readonly object InitialValue = new object();
internal static readonly object DisposedValue = new object();
internal static readonly object InitialValue = new();
internal static readonly object DisposedValue = new();
}

internal sealed class AsyncEnumerable<T> : IAsyncEnumerable<T>
{
private enum PublishResult
{
Success,
InvalidUpdate,
Disposed
}

private readonly object updateLock = new object();
private readonly Func<T, T, bool> updateValidator;
private readonly Action<T> onPublished;
private Element current;
private readonly object _updateLock = new();
private readonly Func<T, T, bool> _updateValidator;
private readonly Action<T> _onPublished;
private Element _current;

public AsyncEnumerable(T initialValue, Func<T, T, bool> updateValidator, Action<T> onPublished)
{
this.updateValidator = updateValidator;
this.current = new Element(initialValue);
this.onPublished = onPublished;
_updateValidator = updateValidator;
_current = new Element(initialValue);
_onPublished = onPublished;
onPublished(initialValue);
}

public bool TryPublish(T value) => this.TryPublish(new Element(value)) == PublishResult.Success;
public bool TryPublish(T value) => TryPublish(new Element(value)) == PublishResult.Success;

public void Publish(T value)
{
switch (this.TryPublish(new Element(value)))
switch (TryPublish(new Element(value)))
{
case PublishResult.Success:
return;
Expand All @@ -53,20 +46,20 @@ public void Publish(T value)

private PublishResult TryPublish(Element newItem)
{
if (this.current.IsDisposed) return PublishResult.Disposed;
if (_current.IsDisposed) return PublishResult.Disposed;

lock (this.updateLock)
lock (_updateLock)
{
if (this.current.IsDisposed) return PublishResult.Disposed;
if (_current.IsDisposed) return PublishResult.Disposed;

if (this.current.IsValid && newItem.IsValid && !this.updateValidator(this.current.Value, newItem.Value))
if (_current.IsValid && newItem.IsValid && !_updateValidator(_current.Value, newItem.Value))
{
return PublishResult.InvalidUpdate;
}

var curr = this.current;
Interlocked.Exchange(ref this.current, newItem);
if (newItem.IsValid) this.onPublished(newItem.Value);
var curr = _current;
Interlocked.Exchange(ref _current, newItem);
if (newItem.IsValid) _onPublished(newItem.Value);
curr.SetNext(newItem);

return PublishResult.Success;
Expand All @@ -75,87 +68,100 @@ private PublishResult TryPublish(Element newItem)

public void Dispose()
{
if (this.current.IsDisposed) return;
if (_current.IsDisposed) return;

lock (this.updateLock)
lock (_updateLock)
{
if (this.current.IsDisposed) return;
if (_current.IsDisposed) return;

this.TryPublish(Element.CreateDisposed());
TryPublish(Element.CreateDisposed());
}
}

private void ThrowInvalidUpdate() => throw new ArgumentException("The value was not valid");
[DoesNotReturn]
private static void ThrowInvalidUpdate() => throw new ArgumentException("The value was not valid.");

[DoesNotReturn]
private static void ThrowDisposed() => throw new ObjectDisposedException("This instance has been disposed.");

private void ThrowDisposed() => throw new ObjectDisposedException("This instance has been disposed");
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) => new AsyncEnumerator(_current, cancellationToken);

public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
private enum PublishResult
{
return new AsyncEnumerator(this.current, cancellationToken);
Success,
InvalidUpdate,
Disposed
}

private sealed class AsyncEnumerator : IAsyncEnumerator<T>
{
private readonly Task cancellation;
private Element current;
private readonly TaskCompletionSource _cancellation = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly CancellationTokenRegistration _registration;
private Element _current;

public AsyncEnumerator(Element initial, CancellationToken cancellation)
{
if (!initial.IsValid)
{
this.current = initial;
_current = initial;
}
else
{
var result = Element.CreateInitial();
result.SetNext(initial);
this.current = result;
_current = result;
}

if (cancellation != default)
if (cancellation.CanBeCanceled)
{
this.cancellation = cancellation.WhenCancelled();
_registration = cancellation.Register(() => _cancellation.TrySetResult());
}
}

T IAsyncEnumerator<T>.Current => this.current.Value;
T IAsyncEnumerator<T>.Current => _current.Value;

async ValueTask<bool> IAsyncEnumerator<T>.MoveNextAsync()
{
if (this.current.IsDisposed)
if (_current.IsDisposed || _cancellation.Task.IsCompleted)
{
return false;
}

Task<Element> next = this.current.NextAsync();
if (this.cancellation != default)
var next = _current.NextAsync();
var cancellationTask = _cancellation.Task;
var result = await Task.WhenAny(cancellationTask, next);
if (ReferenceEquals(result, cancellationTask))
{
var result = await Task.WhenAny(this.cancellation, next);
if (ReferenceEquals(result, this.cancellation))
{
return false;
}
return false;
}

this.current = await next;
return this.current.IsValid;
_current = await next;
return _current.IsValid;
}

ValueTask IAsyncDisposable.DisposeAsync() => default;
async ValueTask IAsyncDisposable.DisposeAsync()
{
_cancellation.TrySetResult();
await _registration.DisposeAsync();
}
}

private sealed class Element
{
private readonly TaskCompletionSource<Element> next;
private readonly object value;
private readonly TaskCompletionSource<Element> _next;
private readonly object _value;

public Element(T value)
public Element(T value) : this(value, new TaskCompletionSource<Element>(TaskCreationOptions.RunContinuationsAsynchronously))
{
this.value = value;
this.next = new TaskCompletionSource<Element>(TaskCreationOptions.RunContinuationsAsynchronously);
}

public static Element CreateInitial() => new Element(
private Element(object value, TaskCompletionSource<Element> next)
{
_value = value;
_next = next;
}

public static Element CreateInitial() => new(
AsyncEnumerable.InitialValue,
new TaskCompletionSource<Element>(TaskCreationOptions.RunContinuationsAsynchronously));

Expand All @@ -166,31 +172,25 @@ public static Element CreateDisposed()
return new Element(AsyncEnumerable.DisposedValue, tcs);
}

private Element(object value, TaskCompletionSource<Element> next)
{
this.value = value;
this.next = next;
}

public bool IsValid => !this.IsInitial && !this.IsDisposed;
public bool IsValid => !IsInitial && !IsDisposed;

public T Value
{
get
{
if (this.IsInitial) ThrowInvalidInstance();
if (IsInitial) ThrowInvalidInstance();
ObjectDisposedException.ThrowIf(IsDisposed, this);
if (this.value is T typedValue) return typedValue;
if (_value is T typedValue) return typedValue;
return default;
}
}

public bool IsInitial => ReferenceEquals(this.value, AsyncEnumerable.InitialValue);
public bool IsDisposed => ReferenceEquals(this.value, AsyncEnumerable.DisposedValue);
public bool IsInitial => ReferenceEquals(_value, AsyncEnumerable.InitialValue);
public bool IsDisposed => ReferenceEquals(_value, AsyncEnumerable.DisposedValue);

public Task<Element> NextAsync() => this.next.Task;
public Task<Element> NextAsync() => _next.Task;

public void SetNext(Element next) => this.next.SetResult(next);
public void SetNext(Element next) => _next.SetResult(next);

private static void ThrowInvalidInstance() => throw new InvalidOperationException("This instance does not have a value set.");
}
Expand Down
2 changes: 2 additions & 0 deletions test/DefaultCluster.Tests/AsyncEnumerableGrainCallTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
using UnitTests.GrainInterfaces;
using Xunit;

namespace DefaultCluster.Tests;

/// <summary>
/// Tests support for grain methods which return <see cref="IAsyncEnumerable{T}"/>.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ protected async Task MembershipTable_UpdateRow(bool extendedProtocol = true)
"Calling UpdateRow with Entry = {Entry} correct eTag = {ETag} old version={TableVersion}",
siloEntry,
etagBefore,
tableVersion != null ? tableVersion.ToString() : "null");
tableVersion?.ToString() ?? "null");
ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion);
Assert.False(ok, $"row update should have failed - Table Data = {tableData}");
tableData = await membershipTable.ReadAll();
Expand All @@ -298,7 +298,7 @@ protected async Task MembershipTable_UpdateRow(bool extendedProtocol = true)
"Calling UpdateRow with Entry = {Entry} correct eTag = {ETag} correct version={TableVersion}",
siloEntry,
etagBefore,
tableVersion != null ? tableVersion.ToString() : "null");
tableVersion?.ToString() ?? "null");

ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion);

Expand All @@ -308,7 +308,7 @@ protected async Task MembershipTable_UpdateRow(bool extendedProtocol = true)
"Calling UpdateRow with Entry = {Entry} old eTag = {ETag} old version={TableVersion}",
siloEntry,
etagBefore,
tableVersion != null ? tableVersion.ToString() : "null");
tableVersion?.ToString() ?? "null");
ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion);
Assert.False(ok, $"row update should have failed - Table Data = {tableData}");

Expand All @@ -326,7 +326,7 @@ protected async Task MembershipTable_UpdateRow(bool extendedProtocol = true)
"Calling UpdateRow with Entry = {Entry} correct eTag = {ETag} old version={TableVersion}",
siloEntry,
etagAfter,
tableVersion != null ? tableVersion.ToString() : "null");
tableVersion?.ToString() ?? "null");

ok = await membershipTable.UpdateRow(siloEntry, etagAfter, tableVersion);

Expand Down Expand Up @@ -419,6 +419,7 @@ protected async Task MembershipTable_UpdateIAmAlive(bool extendedProtocol = true
// compare that the value is close to what we passed in, but not exactly, as the underlying store can set its own precision settings
// (ie: in SQL Server this is defined as datetime2(3), so we don't expect precision to account for less than 0.001s values)
Assert.True((amAliveTime - member.Item1.IAmAliveTime).Duration() < TimeSpan.FromSeconds(2), (amAliveTime - member.Item1.IAmAliveTime).Duration().ToString());
Assert.Equal(newTableVersion.Version, tableData.Version.Version);
}

protected async Task MembershipTable_CleanupDefunctSiloEntries(bool extendedProtocol = true)
Expand Down

0 comments on commit b0c4493

Please sign in to comment.