Skip to content

Commit

Permalink
Handle too many missed heartbeats with reconnection (#394)
Browse files Browse the repository at this point in the history
Remove the update status from the Dispose() function.
It is unnecessary since the disposal can be called only from the Client close() function
where the status is set.

Add a function to convert the connection close reason string to the Reliable.ChangeStatusReason Enum.
This mapping should be improved in version 2.0, where we can introduce breaking changes.
In version 1.x, we need the conversion function.

---------

Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio authored Oct 25, 2024
1 parent b648639 commit 3ed2978
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 39 deletions.
12 changes: 9 additions & 3 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ private async Task OnConnectionClosed(string reason)
if (ConnectionClosed != null)
{
var t = ConnectionClosed?.Invoke(reason)!;
await t.ConfigureAwait(false);
if (t != null)
await t.ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -765,7 +766,7 @@ private async ValueTask<bool> ConsumerUpdateResponse(uint rCorrelationId, IOffse
return await Publish(new ConsumerUpdateRequest(rCorrelationId, offsetSpecification)).ConfigureAwait(false);
}

public async Task<CloseResponse> Close(string reason)
private async Task<CloseResponse> Close(string reason, string closedStatus)
{
if (IsClosed)
{
Expand All @@ -775,7 +776,7 @@ public async Task<CloseResponse> Close(string reason)
InternalClose();
try
{
_connection.UpdateCloseStatus(ConnectionClosedReason.Normal);
_connection.UpdateCloseStatus(closedStatus);
var result =
await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason),
TimeSpan.FromSeconds(10)).ConfigureAwait(false);
Expand All @@ -799,6 +800,11 @@ public async Task<CloseResponse> Close(string reason)
return new CloseResponse(0, ResponseCode.Ok);
}

/// <summary>
/// Closes the client by delegating to the two-argument overload,
/// passing <c>ConnectionClosedReason.Normal</c> as the closed status.
/// </summary>
/// <param name="reason">The close reason forwarded to the two-argument overload.</param>
/// <returns>The <see cref="CloseResponse"/> produced by the two-argument overload.</returns>
public async Task<CloseResponse> Close(string reason)
{
    var closeTask = Close(reason, ConnectionClosedReason.Normal);
    return await closeTask.ConfigureAwait(false);
}

// _poolSemaphore is introduced here: https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/328
// the MaybeClose can be called in different threads so we need to protect the pool
// the pool itself is thread safe but we need to protect the flow to be sure that the
Expand Down
3 changes: 2 additions & 1 deletion RabbitMQ.Stream.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ internal static class ConnectionClosedReason
{
public const string Normal = "TCP connection closed normal";
public const string Unexpected = "TCP connection closed unexpected";
public const string MissingHeartbeat = "TCP connection closed missing heartbeat";

}

public class Connection : IDisposable
Expand Down Expand Up @@ -240,7 +242,6 @@ public void Dispose()
{
try
{
UpdateCloseStatus(ConnectionClosedReason.Normal);
if (!_cancelTokenSource.IsCancellationRequested)
{
_cancelTokenSource.Cancel();
Expand Down
12 changes: 9 additions & 3 deletions RabbitMQ.Stream.Client/HeartBeatHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ public class HeartBeatHandler
private uint _missedHeartbeat;

private readonly Func<ValueTask<bool>> _sendHeartbeatFunc;
private readonly Func<string, Task<CloseResponse>> _close;
private readonly Func<string, string, Task<CloseResponse>> _close;
private readonly int _heartbeat;
private readonly ILogger<HeartBeatHandler> _logger;

public HeartBeatHandler(Func<ValueTask<bool>> sendHeartbeatFunc,
Func<string, Task<CloseResponse>> close,
Func<string, string, Task<CloseResponse>> close,
int heartbeat,
ILogger<HeartBeatHandler> logger = null
)
Expand Down Expand Up @@ -77,7 +77,13 @@ private async Task PerformHeartBeatAsync()
// client will be closed
_logger.LogCritical("Too many heartbeats missed: {MissedHeartbeatCounter}", _missedHeartbeat);
Close();
await _close($"Too many heartbeats missed: {_missedHeartbeat}. Client connection will be closed.").ConfigureAwait(false);
// When too many heartbeats are missed, the client is closed with the reason MissingHeartbeat.
// In this way the ReliableProducer / ReliableConsumer will be able to handle the close reason
// and reconnect the client.
// Even if it is not a perfect solution, it is a good way to handle this case without introducing breaking changes.
// We need to review all the statuses and the close reasons in version 2.0.
await _close($"Too many heartbeats missed: {_missedHeartbeat}. Client connection will be closed.",
ConnectionClosedReason.MissingHeartbeat).ConfigureAwait(false);
}

internal void UpdateHeartBeat()
Expand Down
1 change: 0 additions & 1 deletion RabbitMQ.Stream.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ RabbitMQ.Stream.Client.Hash.Murmur3.Seed.get -> uint
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.HashRoutingMurmurStrategy(System.Func<RabbitMQ.Stream.Client.Message, string> routingKeyExtractor) -> void
RabbitMQ.Stream.Client.HeartBeatHandler
RabbitMQ.Stream.Client.HeartBeatHandler.HeartBeatHandler(System.Func<System.Threading.Tasks.ValueTask<bool>> sendHeartbeatFunc, System.Func<string, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CloseResponse>> close, int heartbeat, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.HeartBeatHandler> logger = null) -> void
RabbitMQ.Stream.Client.IClient
RabbitMQ.Stream.Client.IClient.Close(string reason) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CloseResponse>
RabbitMQ.Stream.Client.IClient.ConnectionProperties.get -> System.Collections.Generic.IDictionary<string, string>
Expand Down
3 changes: 3 additions & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ RabbitMQ.Stream.Client.EntityCommonConfig.Identifier.set -> void
RabbitMQ.Stream.Client.EntityCommonConfig.MetadataHandler.get -> System.Func<RabbitMQ.Stream.Client.MetaDataUpdate, System.Threading.Tasks.Task>
RabbitMQ.Stream.Client.EntityCommonConfig.MetadataHandler.set -> void
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.HeartBeatHandler.HeartBeatHandler(System.Func<System.Threading.Tasks.ValueTask<bool>> sendHeartbeatFunc, System.Func<string, string, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CloseResponse>> close, int heartbeat, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.HeartBeatHandler> logger = null) -> void
RabbitMQ.Stream.Client.IClient.ClientId.get -> string
RabbitMQ.Stream.Client.IClient.ClientId.init -> void
RabbitMQ.Stream.Client.IClient.Consumers.get -> System.Collections.Generic.IDictionary<byte, (string, RabbitMQ.Stream.Client.ConsumerEvents)>
Expand Down Expand Up @@ -243,6 +244,7 @@ RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.BoolFailure = 5 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.ClosedByStrategyPolicy = 4 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.ClosedByUser = 3 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.DisconnectedByTooManyHeartbeatMissing = 6 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.MetaDataUpdate = 2 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.None = 0 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.UnexpectedlyDisconnected = 1 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
Expand Down Expand Up @@ -367,6 +369,7 @@ static RabbitMQ.Stream.Client.RawProducer.Create(RabbitMQ.Stream.Client.ClientPa
static RabbitMQ.Stream.Client.RawSuperStreamConsumer.Create(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamConsumer
static RabbitMQ.Stream.Client.RawSuperStreamProducer.Create(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamProducer
static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer>
static RabbitMQ.Stream.Client.Reliable.ReliableBase.FromConnectionClosedReasonToStatusReason(string connectionClosedReason) -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
static RabbitMQ.Stream.Client.Reliable.ReliableBase.RandomWait() -> System.Threading.Tasks.Task
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupLeaderConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupLeaderOrRandomReplicasConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
5 changes: 2 additions & 3 deletions RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private async Task<IConsumer> StandardConsumer(bool boot)
return;

await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
ChangeStatusReason.UnexpectedlyDisconnected).ConfigureAwait(false);
FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false);
},
MetadataHandler = async _ =>
{
Expand Down Expand Up @@ -131,8 +131,7 @@ private async Task<IConsumer> SuperConsumer(bool boot)

var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r,
ChangeStatusReason.UnexpectedlyDisconnected)
.ConfigureAwait(false);
FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false);
},
MetadataHandler = async update =>
{
Expand Down
4 changes: 2 additions & 2 deletions RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private async Task<IProducer> SuperStreamProducer(bool boot)
return;
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r,
ChangeStatusReason.UnexpectedlyDisconnected)
ReliableBase.FromConnectionClosedReasonToStatusReason(closeReason))
.ConfigureAwait(false);
},
MetadataHandler = async update =>
Expand Down Expand Up @@ -116,7 +116,7 @@ await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
return;

await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
ChangeStatusReason.UnexpectedlyDisconnected).ConfigureAwait(false);
ReliableBase.FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false);
},
ConfirmHandler = confirmation =>
{
Expand Down
20 changes: 19 additions & 1 deletion RabbitMQ.Stream.Client/Reliable/ReliableBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public enum ChangeStatusReason
MetaDataUpdate,
ClosedByUser,
ClosedByStrategyPolicy,
BoolFailure
BoolFailure,
DisconnectedByTooManyHeartbeatMissing,
}

public record ReliableConfig
Expand Down Expand Up @@ -109,6 +110,23 @@ public abstract class ReliableBase
protected abstract ILogger BaseLogger { get; }
private ReliableConfig _reliableConfig;

/// <summary>
/// Translates a connection-closed reason string into the corresponding
/// <see cref="ChangeStatusReason"/> value.
/// </summary>
/// <param name="connectionClosedReason">
/// The reason string to translate. Only <c>ConnectionClosedReason.MissingHeartbeat</c>
/// and <c>ConnectionClosedReason.Unexpected</c> are mapped.
/// </param>
/// <returns>
/// <see cref="ChangeStatusReason.DisconnectedByTooManyHeartbeatMissing"/> for a missing heartbeat,
/// <see cref="ChangeStatusReason.UnexpectedlyDisconnected"/> for an unexpected close.
/// </returns>
/// <exception cref="ArgumentOutOfRangeException">
/// Thrown when <paramref name="connectionClosedReason"/> is any other value.
/// </exception>
protected static ChangeStatusReason FromConnectionClosedReasonToStatusReason(string connectionClosedReason)
{
    // Temporary bridge between the string reasons and the enum: it can be dropped
    // in version 2.0, once ConnectionClosedReason becomes an enum as well.
    switch (connectionClosedReason)
    {
        case ConnectionClosedReason.MissingHeartbeat:
            return ChangeStatusReason.DisconnectedByTooManyHeartbeatMissing;
        case ConnectionClosedReason.Unexpected:
            return ChangeStatusReason.UnexpectedlyDisconnected;
        default:
            throw new ArgumentOutOfRangeException(nameof(connectionClosedReason), connectionClosedReason, null);
    }
}
protected static async Task RandomWait()
{
await Task.Delay(Consts.RandomMid()).ConfigureAwait(false);
Expand Down
4 changes: 2 additions & 2 deletions Tests/UnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ public void HeartBeatRaiseClose()
var testPassed = new TaskCompletionSource<bool>();
var hBeatHandler = new HeartBeatHandler(
() => default,
s =>
(s, r) =>
{
testPassed.SetResult(true);
return null;
Expand All @@ -412,7 +412,7 @@ public void HeartBeatZeroNotRaisesClose()
// the HeartBeatHandler is disabled by default
var hBeatHandler = new HeartBeatHandler(
() => default,
s => null,
(s, r) => null,
0);
Assert.False(hBeatHandler.IsActive());
}
Expand Down
Loading

0 comments on commit 3ed2978

Please sign in to comment.