From 3ed2978a4fad5600e130646cb878ebf5542770a7 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 25 Oct 2024 09:56:54 +0200 Subject: [PATCH] Handle too many heart missing with reconnection (#394) Remove the update status from the Dispose() function. It is unnecessary since the disposal can be called only from the Client close() function where the status is set. Add a function to convert the connection close reason string to the Reliable.ChangeStatusReason Enum. This mapping should be improved to version 2.0, where we can introduce breaking changes. In version 1.x, we need the conversion function. --------- Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/Client.cs | 12 +++-- RabbitMQ.Stream.Client/Connection.cs | 3 +- RabbitMQ.Stream.Client/HeartBeatHandler.cs | 12 +++-- RabbitMQ.Stream.Client/PublicAPI.Shipped.txt | 1 - .../PublicAPI.Unshipped.txt | 3 ++ .../Reliable/ConsumerFactory.cs | 5 +- .../Reliable/ProducerFactory.cs | 4 +- .../Reliable/ReliableBase.cs | 20 +++++++- Tests/UnitTests.cs | 4 +- docs/ReliableClient/BestPracticesClient.cs | 49 ++++++++++--------- 10 files changed, 74 insertions(+), 39 deletions(-) diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index f116c581..28ffd9e1 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -194,7 +194,8 @@ private async Task OnConnectionClosed(string reason) if (ConnectionClosed != null) { var t = ConnectionClosed?.Invoke(reason)!; - await t.ConfigureAwait(false); + if (t != null) + await t.ConfigureAwait(false); } } @@ -765,7 +766,7 @@ private async ValueTask ConsumerUpdateResponse(uint rCorrelationId, IOffse return await Publish(new ConsumerUpdateRequest(rCorrelationId, offsetSpecification)).ConfigureAwait(false); } - public async Task Close(string reason) + private async Task Close(string reason, string closedStatus) { if (IsClosed) { @@ -775,7 +776,7 @@ public async Task Close(string reason) InternalClose(); try { - _connection.UpdateCloseStatus(ConnectionClosedReason.Normal); + _connection.UpdateCloseStatus(closedStatus); var result = await Request(corr => new CloseRequest(corr, reason), TimeSpan.FromSeconds(10)).ConfigureAwait(false); @@ -799,6 +800,11 @@ public async Task Close(string reason) return new CloseResponse(0, ResponseCode.Ok); } + public async Task Close(string reason) + { + return await Close(reason, ConnectionClosedReason.Normal).ConfigureAwait(false); + } + // _poolSemaphore is introduced here: https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/328 // the MaybeClose can be called in different threads so we need to protect the pool // the pool itself is thread safe but we need to protect the flow to be sure that the diff --git a/RabbitMQ.Stream.Client/Connection.cs b/RabbitMQ.Stream.Client/Connection.cs index d31de0ac..e5c6a68e 100644 --- a/RabbitMQ.Stream.Client/Connection.cs +++ b/RabbitMQ.Stream.Client/Connection.cs @@ -18,6 +18,8 @@ internal static class ConnectionClosedReason { public const string Normal = "TCP connection closed normal"; public const string Unexpected = "TCP connection closed unexpected"; + public const string MissingHeartbeat = "TCP connection closed missing heartbeat"; + } public class Connection : IDisposable @@ -240,7 +242,6 @@ public void Dispose() { try { - UpdateCloseStatus(ConnectionClosedReason.Normal); if (!_cancelTokenSource.IsCancellationRequested) { _cancelTokenSource.Cancel(); diff --git a/RabbitMQ.Stream.Client/HeartBeatHandler.cs b/RabbitMQ.Stream.Client/HeartBeatHandler.cs index fcb833a8..1097c5bd 100644 --- a/RabbitMQ.Stream.Client/HeartBeatHandler.cs +++ b/RabbitMQ.Stream.Client/HeartBeatHandler.cs @@ -19,12 +19,12 @@ public class HeartBeatHandler private uint _missedHeartbeat; private readonly Func> _sendHeartbeatFunc; - private readonly Func> _close; + private readonly Func> _close; private readonly int _heartbeat; private readonly ILogger _logger; public HeartBeatHandler(Func> sendHeartbeatFunc, - Func> close, + Func> close, int heartbeat, ILogger logger = null ) @@ -77,7 +77,13 @@ private async Task PerformHeartBeatAsync() // client will be closed _logger.LogCritical("Too many heartbeats missed: {MissedHeartbeatCounter}", _missedHeartbeat); Close(); - await _close($"Too many heartbeats missed: {_missedHeartbeat}. Client connection will be closed.").ConfigureAwait(false); + // The heartbeat is missed for x times the client will be closed with the reason Unexpected + // In this way the ReliableProducer / ReliableConsumer will be able to handle the close reason + // and reconnect the client + // Even it is not a perfect solution, it is a good way to handle the case to avoid to introduce breaking changes + // we need to review all the status and the close reason on the version 2.0 + await _close($"Too many heartbeats missed: {_missedHeartbeat}. Client connection will be closed.", + ConnectionClosedReason.MissingHeartbeat).ConfigureAwait(false); } internal void UpdateHeartBeat() diff --git a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt index c640b9c7..53c35247 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt @@ -308,7 +308,6 @@ RabbitMQ.Stream.Client.Hash.Murmur3.Seed.get -> uint RabbitMQ.Stream.Client.HashRoutingMurmurStrategy RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.HashRoutingMurmurStrategy(System.Func routingKeyExtractor) -> void RabbitMQ.Stream.Client.HeartBeatHandler -RabbitMQ.Stream.Client.HeartBeatHandler.HeartBeatHandler(System.Func> sendHeartbeatFunc, System.Func> close, int heartbeat, Microsoft.Extensions.Logging.ILogger logger = null) -> void RabbitMQ.Stream.Client.IClient RabbitMQ.Stream.Client.IClient.Close(string reason) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.IClient.ConnectionProperties.get -> System.Collections.Generic.IDictionary diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index 1de1ca34..6e7c29ab 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -137,6 +137,7 @@ RabbitMQ.Stream.Client.EntityCommonConfig.Identifier.set -> void RabbitMQ.Stream.Client.EntityCommonConfig.MetadataHandler.get -> System.Func RabbitMQ.Stream.Client.EntityCommonConfig.MetadataHandler.set -> void RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List partitions) -> System.Threading.Tasks.Task> +RabbitMQ.Stream.Client.HeartBeatHandler.HeartBeatHandler(System.Func> sendHeartbeatFunc, System.Func> close, int heartbeat, Microsoft.Extensions.Logging.ILogger logger = null) -> void RabbitMQ.Stream.Client.IClient.ClientId.get -> string RabbitMQ.Stream.Client.IClient.ClientId.init -> void RabbitMQ.Stream.Client.IClient.Consumers.get -> System.Collections.Generic.IDictionary @@ -243,6 +244,7 @@ RabbitMQ.Stream.Client.Reliable.ChangeStatusReason RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.BoolFailure = 5 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.ClosedByStrategyPolicy = 4 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.ClosedByUser = 3 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason +RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.DisconnectedByTooManyHeartbeatMissing = 6 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.MetaDataUpdate = 2 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.None = 0 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.UnexpectedlyDisconnected = 1 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason @@ -367,6 +369,7 @@ static RabbitMQ.Stream.Client.RawProducer.Create(RabbitMQ.Stream.Client.ClientPa static RabbitMQ.Stream.Client.RawSuperStreamConsumer.Create(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, System.Collections.Generic.IDictionary streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamConsumer static RabbitMQ.Stream.Client.RawSuperStreamProducer.Create(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, System.Collections.Generic.IDictionary streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamProducer static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task +static RabbitMQ.Stream.Client.Reliable.ReliableBase.FromConnectionClosedReasonToStatusReason(string connectionClosedReason) -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason static RabbitMQ.Stream.Client.Reliable.ReliableBase.RandomWait() -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.RoutingHelper.LookupLeaderConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.RoutingHelper.LookupLeaderOrRandomReplicasConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task diff --git a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs index 8dc944bf..5b32043a 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs @@ -63,7 +63,7 @@ private async Task StandardConsumer(bool boot) return; await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream, - ChangeStatusReason.UnexpectedlyDisconnected).ConfigureAwait(false); + FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false); }, MetadataHandler = async _ => { @@ -131,8 +131,7 @@ private async Task SuperConsumer(bool boot) var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition; await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r, - ChangeStatusReason.UnexpectedlyDisconnected) - .ConfigureAwait(false); + FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false); }, MetadataHandler = async update => { diff --git a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs index df0c0114..dd3fd136 100644 --- a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs @@ -51,7 +51,7 @@ private async Task SuperStreamProducer(bool boot) return; var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition; await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r, - ChangeStatusReason.UnexpectedlyDisconnected) + ReliableBase.FromConnectionClosedReasonToStatusReason(closeReason)) .ConfigureAwait(false); }, MetadataHandler = async update => @@ -116,7 +116,7 @@ await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream, return; await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream, - ChangeStatusReason.UnexpectedlyDisconnected).ConfigureAwait(false); + ReliableBase.FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false); }, ConfirmHandler = confirmation => { diff --git a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs index 6fad5d6e..04110c23 100644 --- a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs +++ b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs @@ -34,7 +34,8 @@ public enum ChangeStatusReason MetaDataUpdate, ClosedByUser, ClosedByStrategyPolicy, - BoolFailure + BoolFailure, + DisconnectedByTooManyHeartbeatMissing, } public record ReliableConfig @@ -109,6 +110,23 @@ public abstract class ReliableBase protected abstract ILogger BaseLogger { get; } private ReliableConfig _reliableConfig; + /// + /// The function to convert the string ConnectionClosedReason to the ChangeStatusReason enum + /// + /// + /// + /// + /// + protected static ChangeStatusReason FromConnectionClosedReasonToStatusReason(string connectionClosedReason) + { + // Can be removed on the version 2.0 when the ConnectionClosedReason will be an enum as well + return connectionClosedReason switch + { + ConnectionClosedReason.MissingHeartbeat => ChangeStatusReason.DisconnectedByTooManyHeartbeatMissing, + ConnectionClosedReason.Unexpected => ChangeStatusReason.UnexpectedlyDisconnected, + _ => throw new ArgumentOutOfRangeException(nameof(connectionClosedReason), connectionClosedReason, null) + }; + } protected static async Task RandomWait() { await Task.Delay(Consts.RandomMid()).ConfigureAwait(false); diff --git a/Tests/UnitTests.cs b/Tests/UnitTests.cs index 1da68b26..9d2e4e8c 100644 --- a/Tests/UnitTests.cs +++ b/Tests/UnitTests.cs @@ -393,7 +393,7 @@ public void HeartBeatRaiseClose() var testPassed = new TaskCompletionSource(); var hBeatHandler = new HeartBeatHandler( () => default, - s => + (s, r) => { testPassed.SetResult(true); return null; @@ -412,7 +412,7 @@ public void HeartBeatZeroNotRaisesClose() // the HeartBeatHandler is disabled by default var hBeatHandler = new HeartBeatHandler( () => default, - s => null, + (s, r) => null, 0); Assert.False(hBeatHandler.IsActive()); } diff --git a/docs/ReliableClient/BestPracticesClient.cs b/docs/ReliableClient/BestPracticesClient.cs index af91fd51..683bcda7 100644 --- a/docs/ReliableClient/BestPracticesClient.cs +++ b/docs/ReliableClient/BestPracticesClient.cs @@ -22,7 +22,7 @@ public record Config public int Port { get; set; } = 5552; public string? Username { get; set; } = "guest"; public string? Password { get; set; } = "guest"; - + public string? StreamName { get; set; } = "DotNetClientTest"; public bool LoadBalancer { get; set; } = false; public bool SuperStream { get; set; } = false; @@ -32,10 +32,8 @@ public record Config public int MessagesPerProducer { get; set; } = 5_000_000; public int Consumers { get; set; } = 9; public byte ConsumersPerConnection { get; set; } = 8; - + public int DelayDuringSendMs { get; set; } = 0; - - } public static async Task Start(Config config) @@ -49,7 +47,7 @@ public static async Task Start(Config config) options.TimestampFormat = "[HH:mm:ss] "; options.ColorBehavior = LoggerColorBehavior.Default; }) - .AddFilter(level => level >= LogLevel.Debug) + .AddFilter(level => level >= LogLevel.Information) ); var loggerFactory = serviceCollection.BuildServiceProvider() .GetService(); @@ -59,6 +57,7 @@ public static async Task Start(Config config) { var lp = loggerFactory.CreateLogger(); var lc = loggerFactory.CreateLogger(); + var ls = loggerFactory.CreateLogger(); var ep = new IPEndPoint(IPAddress.Loopback, config.Port); @@ -86,7 +85,7 @@ public static async Task Start(Config config) { UserName = config.Username, Password = config.Password, - Endpoints = new List() {ep}, + Endpoints = new List() { ep }, ConnectionPoolConfig = new ConnectionPoolConfig() { ProducersPerConnection = config.ProducersPerConnection, @@ -108,12 +107,12 @@ public static async Task Start(Config config) ProducersPerConnection = config.ProducersPerConnection, ConsumersPerConnection = config.ConsumersPerConnection, }, - Endpoints = new List() {resolver.EndPoint} + Endpoints = new List() { resolver.EndPoint } }; } - var system = await StreamSystem.Create(streamConf).ConfigureAwait(false); + var system = await StreamSystem.Create(streamConf, ls).ConfigureAwait(false); var streamsList = new List(); if (config.SuperStream) { @@ -159,12 +158,13 @@ public static async Task Start(Config config) { await system.DeleteSuperStream(streamsList[0]).ConfigureAwait(false); } - - - await system.CreateSuperStream(new PartitionsSuperStreamSpec(streamsList[0], config.Streams)).ConfigureAwait(false); + + + await system.CreateSuperStream(new PartitionsSuperStreamSpec(streamsList[0], config.Streams)) + .ConfigureAwait(false); } - - + + foreach (var stream in streamsList) { if (!config.SuperStream) @@ -174,10 +174,10 @@ public static async Task Start(Config config) await system.DeleteStream(stream).ConfigureAwait(false); } - await system.CreateStream(new StreamSpec(stream) {MaxLengthBytes = 30_000_000_000,}) + await system.CreateStream(new StreamSpec(stream) { MaxLengthBytes = 30_000_000_000, }) .ConfigureAwait(false); await Task.Delay(TimeSpan.FromSeconds(3)).ConfigureAwait(false); - } + } for (var z = 0; z < config.Consumers; z++) { @@ -186,7 +186,7 @@ await system.CreateStream(new StreamSpec(stream) {MaxLengthBytes = 30_000_000_00 OffsetSpec = new OffsetTypeLast(), IsSuperStream = config.SuperStream, IsSingleActiveConsumer = config.SuperStream, - Reference = "myApp",// needed for the Single Active Consumer or fot the store offset + Reference = "myApp", // needed for the Single Active Consumer or fot the store offset // can help to identify the consumer on the logs and RabbitMQ Management Identifier = $"my_consumer_{z}", InitialCredits = 10, @@ -198,10 +198,11 @@ await system.CreateStream(new StreamSpec(stream) {MaxLengthBytes = 30_000_000_00 // store the offset every 1_000/5_000/10_000 messages await consumer.StoreOffset(ctx.Offset).ConfigureAwait(false); } + Interlocked.Increment(ref totalConsumed); }, }; - + // This is the callback that will be called when the consumer status changes // DON'T PUT ANY BLOCKING CODE HERE conf.StatusChanged += (status) => @@ -210,8 +211,9 @@ await system.CreateStream(new StreamSpec(stream) {MaxLengthBytes = 30_000_000_00 ? $" Partition {status.Partition} of super stream: {status.Stream}" : $"Stream: {status.Stream}"; - lc.LogInformation("Consumer: {Id} - status changed from: {From} to: {To} reason: {Reason} {Info}", - status.Identifier, status.From, status.To,status.Reason, streamInfo); + lc.LogInformation( + "Consumer: {Id} - status changed from: {From} to: {To} reason: {Reason} {Info}", + status.Identifier, status.From, status.To, status.Reason, streamInfo); }; consumersList.Add( await Consumer.Create(conf, lc).ConfigureAwait(false)); @@ -258,7 +260,7 @@ async Task MaybeSend(Producer producer, Message message, ManualResetEvent publis return Task.CompletedTask; }, }; - + // Like the consumer don't put any blocking code here producerConfig.StatusChanged += (status) => { @@ -267,8 +269,9 @@ async Task MaybeSend(Producer producer, Message message, ManualResetEvent publis : $"Stream: {status.Stream}"; // just log the status change - lp.LogInformation("Consumer: {Id} - status changed from: {From} to: {To} reason: {Reason} {Info}", - status.Identifier, status.From, status.To,status.Reason, streamInfo); + lp.LogInformation( + "Consumer: {Id} - status changed from: {From} to: {To} reason: {Reason} {Info}", + status.Identifier, status.From, status.To, status.Reason, streamInfo); // in case of disconnection the event will be reset // in case of reconnection the event will be set so the producer can send messages @@ -304,7 +307,7 @@ async Task MaybeSend(Producer producer, Message message, ManualResetEvent publis var message = new Message(Encoding.Default.GetBytes("hello")) { - Properties = new Properties() {MessageId = $"hello{i}"} + Properties = new Properties() { MessageId = $"hello{i}" } }; await MaybeSend(producer, message, publishEvent).ConfigureAwait(false); // You don't need this it is only for the example