From 728de8905942a5f49fee882a2ea5d1a39e8aeb17 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 22 May 2024 19:49:01 -0700 Subject: [PATCH 1/8] Truncate long client provided names Fixes #980 --- .../RabbitMQ.Client/PublicAPI.Unshipped.txt | 4 +-- .../client/api/ConnectionFactory.cs | 24 ++++++++++++-- .../client/api/IConnectionExtensions.cs | 13 ++++---- .../client/api/InternalConstants.cs | 7 +++++ .../Test/Integration/TestConnectionFactory.cs | 31 ++++++++++++++++++- 5 files changed, 68 insertions(+), 11 deletions(-) diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index fc9c98161b..ea016d86c8 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -952,8 +952,8 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void ~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection) -> System.Threading.Tasks.Task +~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) -> System.Threading.Tasks.Task +~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task ~virtual RabbitMQ.Client.DefaultBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task diff --git a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs index 1ca5b9c408..1bf4db3f28 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs @@ -197,6 +197,7 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor // just here to hold the value that was set through the setter private Uri _uri; + private string _clientProvidedName; /// /// Amount of time protocol handshake operations are allowed to take before @@ -367,7 +368,14 @@ public Uri Uri /// /// Default client provided name to be used for connections. /// - public string ClientProvidedName { get; set; } + public string ClientProvidedName + { + get => _clientProvidedName; + set + { + _clientProvidedName = EnsureClientProvidedNameLength(value); + } + } /// /// Given a list of mechanism names supported by the server, select a preferred mechanism, @@ -593,7 +601,7 @@ private ConnectionConfig CreateConfig(string clientProvidedName) CredentialsRefresher, AuthMechanisms, ClientProperties, - clientProvidedName, + EnsureClientProvidedNameLength(clientProvidedName), RequestedChannelMax, RequestedFrameMax, MaxInboundMessageBodySize, @@ -712,5 +720,17 @@ private List LocalEndpoints() { return new List { Endpoint }; } + + private static string EnsureClientProvidedNameLength(string clientProvidedName) + { + if (clientProvidedName.Length > InternalConstants.DefaultRabbitMqMaxClientProvideNameLength) + { + return clientProvidedName.Substring(0, InternalConstants.DefaultRabbitMqMaxClientProvideNameLength); + } + else + { + return clientProvidedName; + } + } } } diff --git a/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs b/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs index 58c97c17ea..33b9bc64c4 100644 --- a/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs @@ -18,17 +18,17 @@ public static class IConnectionExtensions /// (or closing), then this method will do nothing. /// It can also throw when socket was closed unexpectedly. /// - public static Task CloseAsync(this IConnection connection) + public static Task CloseAsync(this IConnection connection, CancellationToken cancellationToken = default) { return connection.CloseAsync(Constants.ReplySuccess, "Goodbye", InternalConstants.DefaultConnectionCloseTimeout, false, - CancellationToken.None); + cancellationToken); } /// /// Asynchronously close this connection and all its channels. /// /// - /// The method behaves in the same way as , with the only + /// The method behaves in the same way as , with the only /// difference that the connection is closed with the given connection close code and message. /// /// The close code (See under "Reply Codes" in the AMQP specification). @@ -37,10 +37,11 @@ public static Task CloseAsync(this IConnection connection) /// A message indicating the reason for closing the connection. /// /// - public static Task CloseAsync(this IConnection connection, ushort reasonCode, string reasonText) + public static Task CloseAsync(this IConnection connection, ushort reasonCode, string reasonText, + CancellationToken cancellationToken = default) { return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionCloseTimeout, false, - CancellationToken.None); + cancellationToken); } /// @@ -92,7 +93,7 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st /// /// /// Note that all active channels and sessions will be closed if this method is called. - /// In comparison to normal method, will not throw + /// In comparison to normal method, will not throw /// during closing connection. ///This method waits infinitely for the in-progress close operation to complete. /// diff --git a/projects/RabbitMQ.Client/client/api/InternalConstants.cs b/projects/RabbitMQ.Client/client/api/InternalConstants.cs index 058d686389..e1c037a727 100644 --- a/projects/RabbitMQ.Client/client/api/InternalConstants.cs +++ b/projects/RabbitMQ.Client/client/api/InternalConstants.cs @@ -44,5 +44,12 @@ internal static class InternalConstants /// configures the largest message size which should be lower than this maximum of 128MiB. /// internal const uint DefaultRabbitMqMaxInboundMessageBodySize = 1_048_576 * 128; + + /// + /// Largest client provide name, in characters, allowed in RabbitMQ. + /// This is not configurable, but was discovered while working on this issue: + /// https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/980 + /// + internal const int DefaultRabbitMqMaxClientProvideNameLength = 3652; } } diff --git a/projects/Test/Integration/TestConnectionFactory.cs b/projects/Test/Integration/TestConnectionFactory.cs index 8dfed9650c..ef692f99e5 100644 --- a/projects/Test/Integration/TestConnectionFactory.cs +++ b/projects/Test/Integration/TestConnectionFactory.cs @@ -436,7 +436,36 @@ public async Task TestCreateConnectionAsync_UsesValidEndpointWhenMultipleSupplie var ep = new AmqpTcpEndpoint("localhost"); using (IConnection conn = await cf.CreateConnectionAsync(new List { invalidEp, ep }, cts.Token)) { - await conn.CloseAsync(); + await conn.CloseAsync(cts.Token); + } + } + } + + [Theory] + [InlineData(3650)] + [InlineData(3651)] + [InlineData(3652)] + [InlineData(3653)] + [InlineData(3654)] + public async Task TestCreateConnectionAsync_TruncatesWhenClientNameIsLong_GH980(ushort count) + { + string cpn = GetUniqueString(count); + using (var cts = new CancellationTokenSource(WaitSpan)) + { + ConnectionFactory cf0 = new ConnectionFactory { ClientProvidedName = cpn }; + using (IConnection conn = await cf0.CreateConnectionAsync(cts.Token)) + { + await conn.CloseAsync(cts.Token); + Assert.True(cf0.ClientProvidedName.Length <= InternalConstants.DefaultRabbitMqMaxClientProvideNameLength); + Assert.Contains(cf0.ClientProvidedName, cpn); + } + + ConnectionFactory cf1 = new ConnectionFactory(); + using (IConnection conn = await cf1.CreateConnectionAsync(cpn, cts.Token)) + { + await conn.CloseAsync(cts.Token); + Assert.True(conn.ClientProvidedName.Length <= InternalConstants.DefaultRabbitMqMaxClientProvideNameLength); + Assert.Contains(conn.ClientProvidedName, cpn); } } } From 7445af6e82923b3b8a62067ffa73bcf7dd5d31da Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 23 May 2024 12:27:14 -0700 Subject: [PATCH 2/8] Add test that creates `IChannel` within async consumer callback Fixes #650 --- .../Test/Integration/TestAsyncConsumer.cs | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index ce6fd70caa..7a74712cad 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -564,6 +564,73 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0); } + [Fact] + public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() + { + string exchangeName = GenerateExchangeName(); + string queue1Name = GenerateQueueName(); + string queue2Name = GenerateQueueName(); + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + using var cts = new CancellationTokenSource(WaitSpan); + using CancellationTokenRegistration ctr = cts.Token.Register(() => + { + tcs.SetCanceled(); + }); + + _conn.ConnectionShutdown += (o, ea) => + { + HandleConnectionShutdown(_conn, ea, (args) => + { + MaybeSetException(ea, tcs); + }); + }; + + _channel.ChannelShutdown += (o, ea) => + { + HandleChannelShutdown(_channel, ea, (args) => + { + MaybeSetException(ea, tcs); + }); + }; + + // queue1 -> produce click to queue2 + // click -> exchange + // queue2 -> consume click from queue1 + await _channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true); + await _channel.QueueDeclareAsync(queue1Name); + await _channel.QueueBindAsync(queue1Name, exchangeName, queue1Name); + await _channel.QueueDeclareAsync(queue2Name); + await _channel.QueueBindAsync(queue2Name, exchangeName, queue2Name); + + var consumer1 = new AsyncEventingBasicConsumer(_channel); + consumer1.Received += async (sender, args) => + { + using (IChannel innerChannel = await _conn.CreateChannelAsync()) + { + await innerChannel.ConfirmSelectAsync(); + await innerChannel.BasicPublishAsync(exchangeName, queue2Name, mandatory: true); + await innerChannel.WaitForConfirmsOrDieAsync(); + await innerChannel.CloseAsync(); + } + }; + await _channel.BasicConsumeAsync(queue1Name, autoAck: true, consumer1); + + var consumer2 = new AsyncEventingBasicConsumer(_channel); + consumer2.Received += async (sender, args) => + { + tcs.TrySetResult(true); + await Task.Yield(); + }; + await _channel.BasicConsumeAsync(queue2Name, autoAck: true, consumer2); + + await _channel.ConfirmSelectAsync(); + await _channel.BasicPublishAsync(exchangeName, queue1Name, body: GetRandomBody(1024)); + await _channel.WaitForConfirmsOrDieAsync(); + + Assert.True(await tcs.Task); + } + private static void SetException(Exception ex, params TaskCompletionSource[] tcsAry) { foreach (TaskCompletionSource tcs in tcsAry) From 393032b048e07c0816ce87bdbb2f162ab64a5c00 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 23 May 2024 11:55:06 -0700 Subject: [PATCH 3/8] Remove two unnecessary `.Cast<>()` usages Noticed by @paulomorgado in #1233 --- projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs | 3 +-- projects/RabbitMQ.Client/client/impl/EventingWrapper.cs | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs index 29f007b7c6..49e6c18aad 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs @@ -1,5 +1,4 @@ using System; -using System.Linq; using System.Threading.Tasks; using RabbitMQ.Client.Events; @@ -55,7 +54,7 @@ public Task InvokeAsync(object sender, T parameter) private readonly async Task InternalInvoke(Delegate[] handlers, object sender, T parameter) { - foreach (AsyncEventHandler action in handlers.Cast>()) + foreach (AsyncEventHandler action in handlers) { try { diff --git a/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs b/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs index 4017fb4052..f9a1a334cb 100644 --- a/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs +++ b/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs @@ -1,5 +1,4 @@ using System; -using System.Linq; namespace RabbitMQ.Client.Impl { @@ -53,7 +52,7 @@ public void Invoke(object sender, T parameter) _handlers = handlers; } - foreach (EventHandler action in handlers.Cast>()) + foreach (EventHandler action in handlers) { try { From e132c9b63f609c6c3fe7f675e564e0fb162b947a Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 24 May 2024 14:31:17 -0700 Subject: [PATCH 4/8] Use ConcurrentDictionary --- .../FrameworkExtension/DictionaryExtension.cs | 2 +- .../ConsumerDispatcherBase.cs | 34 +++++++------------ 2 files changed, 13 insertions(+), 23 deletions(-) diff --git a/projects/RabbitMQ.Client/FrameworkExtension/DictionaryExtension.cs b/projects/RabbitMQ.Client/FrameworkExtension/DictionaryExtension.cs index 3619dfe724..6b64c6242a 100644 --- a/projects/RabbitMQ.Client/FrameworkExtension/DictionaryExtension.cs +++ b/projects/RabbitMQ.Client/FrameworkExtension/DictionaryExtension.cs @@ -37,7 +37,7 @@ namespace RabbitMQ #if NETSTANDARD internal static class DictionaryExtension { - public static bool Remove(this Dictionary dictionary, TKey key, out TValue value) + public static bool Remove(this IDictionary dictionary, TKey key, out TValue value) { return dictionary.TryGetValue(key, out value) && dictionary.Remove(key); } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs index 88072f4f2f..1ee8f96a1e 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs @@ -1,4 +1,6 @@ -using System.Collections.Generic; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; using System.Runtime.CompilerServices; using System.Threading.Tasks; @@ -7,8 +9,8 @@ namespace RabbitMQ.Client.ConsumerDispatching #nullable enable internal abstract class ConsumerDispatcherBase { - private static readonly FallbackConsumer fallbackConsumer = new FallbackConsumer(); - private readonly Dictionary _consumers = new Dictionary(); + private static readonly FallbackConsumer s_fallbackConsumer = new FallbackConsumer(); + private readonly IDictionary _consumers = new ConcurrentDictionary(); public IBasicConsumer? DefaultConsumer { get; set; } @@ -18,26 +20,17 @@ protected ConsumerDispatcherBase() protected void AddConsumer(IBasicConsumer consumer, string tag) { - lock (_consumers) - { - _consumers[tag] = consumer; - } + _consumers[tag] = consumer; } protected IBasicConsumer GetConsumerOrDefault(string tag) { - lock (_consumers) - { - return _consumers.TryGetValue(tag, out IBasicConsumer? consumer) ? consumer : GetDefaultOrFallbackConsumer(); - } + return _consumers.TryGetValue(tag, out IBasicConsumer? consumer) ? consumer : GetDefaultOrFallbackConsumer(); } public IBasicConsumer GetAndRemoveConsumer(string tag) { - lock (_consumers) - { - return _consumers.Remove(tag, out IBasicConsumer? consumer) ? consumer : GetDefaultOrFallbackConsumer(); - } + return _consumers.Remove(tag, out IBasicConsumer? consumer) ? consumer : GetDefaultOrFallbackConsumer(); } public void Shutdown(ShutdownEventArgs reason) @@ -54,14 +47,11 @@ public Task ShutdownAsync(ShutdownEventArgs reason) private void DoShutdownConsumers(ShutdownEventArgs reason) { - lock (_consumers) + foreach (KeyValuePair pair in _consumers.ToArray()) { - foreach (KeyValuePair pair in _consumers) - { - ShutdownConsumer(pair.Value, reason); - } - _consumers.Clear(); + ShutdownConsumer(pair.Value, reason); } + _consumers.Clear(); } protected abstract void ShutdownConsumer(IBasicConsumer consumer, ShutdownEventArgs reason); @@ -74,7 +64,7 @@ private void DoShutdownConsumers(ShutdownEventArgs reason) [MethodImpl(MethodImplOptions.NoInlining)] private IBasicConsumer GetDefaultOrFallbackConsumer() { - return DefaultConsumer ?? fallbackConsumer; + return DefaultConsumer ?? s_fallbackConsumer; } } } From a8825c67f5d7eb4b7dd706a57899b40dc0413d97 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 23 May 2024 13:15:43 -0700 Subject: [PATCH 5/8] Fix two flaky tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit These two tests always fail at the same time, too 🤔 * Re-try when using HTTP API to close a connection and an error happens * Introduce `ShortSpan` wait span. * Ensure that message is actually routed and confirmed before checking. * Limit client provided name to 3000 characters * Add callback exception handlers to flaky tests * Check for null consumer tag based on CI failures. * No need for random data, just different data. Update projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs Co-authored-by: Paulo Morgado <470455+paulomorgado@users.noreply.github.com> --- .../client/api/InternalConstants.cs | 2 +- .../client/impl/AutorecoveringChannel.cs | 2 +- .../AutorecoveringConnection.Recording.cs | 6 + .../ConsumerDispatcherBase.cs | 2 +- projects/Test/Common/IntegrationFixture.cs | 71 ++++---- projects/Test/Common/Util.cs | 27 +-- .../TestExchangeRecovery.cs | 5 +- .../Test/Integration/TestAsyncConsumer.cs | 166 ++++++++++++++---- .../TestAsyncConsumerExceptions.cs | 5 +- projects/Test/Integration/TestBasicPublish.cs | 32 +++- .../Test/Integration/TestConnectionFactory.cs | 11 +- .../TestConsumerOperationDispatch.cs | 4 +- .../Test/Integration/TestPublisherConfirms.cs | 15 +- 13 files changed, 234 insertions(+), 114 deletions(-) diff --git a/projects/RabbitMQ.Client/client/api/InternalConstants.cs b/projects/RabbitMQ.Client/client/api/InternalConstants.cs index e1c037a727..ac611e4090 100644 --- a/projects/RabbitMQ.Client/client/api/InternalConstants.cs +++ b/projects/RabbitMQ.Client/client/api/InternalConstants.cs @@ -50,6 +50,6 @@ internal static class InternalConstants /// This is not configurable, but was discovered while working on this issue: /// https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/980 /// - internal const int DefaultRabbitMqMaxClientProvideNameLength = 3652; + internal const int DefaultRabbitMqMaxClientProvideNameLength = 3000; } } diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 317099a078..879ac51b38 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -279,7 +279,7 @@ public async Task BasicConsumeAsync(string queue, bool autoAck, string c { string resultConsumerTag = await InnerChannel.BasicConsumeAsync(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer, cancellationToken) - .ConfigureAwait(false); + .ConfigureAwait(false) ?? throw new InvalidOperationException("basic.consume returned null consumer tag"); var rc = new RecordedConsumer(channel: this, consumer: consumer, consumerTag: resultConsumerTag, queue: queue, autoAck: autoAck, exclusive: exclusive, arguments: arguments); await _connection.RecordConsumerAsync(rc, recordedEntitiesSemaphoreHeld: false) diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs index ca08d4c027..8a873749c9 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs @@ -29,6 +29,7 @@ // Copyright (c) 2007-2020 VMware, Inc. All rights reserved. //--------------------------------------------------------------------------- +using System; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -400,6 +401,11 @@ await _recordedEntitiesSemaphore.WaitAsync() private void DoDeleteRecordedConsumer(string consumerTag) { + if (consumerTag is null) + { + throw new ArgumentNullException(nameof(consumerTag)); + } + if (_recordedConsumers.Remove(consumerTag, out RecordedConsumer recordedConsumer)) { DeleteAutoDeleteQueue(recordedConsumer.Queue); diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs index 1ee8f96a1e..41960fb5e1 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs @@ -10,7 +10,7 @@ namespace RabbitMQ.Client.ConsumerDispatching internal abstract class ConsumerDispatcherBase { private static readonly FallbackConsumer s_fallbackConsumer = new FallbackConsumer(); - private readonly IDictionary _consumers = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _consumers = new ConcurrentDictionary(); public IBasicConsumer? DefaultConsumer { get; set; } diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs index 57a17fde6f..b27de42eb8 100644 --- a/projects/Test/Common/IntegrationFixture.cs +++ b/projects/Test/Common/IntegrationFixture.cs @@ -75,6 +75,7 @@ public abstract class IntegrationFixture : IAsyncLifetime protected readonly ushort _consumerDispatchConcurrency = 1; protected readonly bool _openChannel = true; + public static readonly TimeSpan ShortSpan; public static readonly TimeSpan WaitSpan; public static readonly TimeSpan LongWaitSpan; public static readonly TimeSpan RecoveryInterval = TimeSpan.FromSeconds(2); @@ -95,12 +96,14 @@ static IntegrationFixture() if (s_isRunningInCI) { + ShortSpan = TimeSpan.FromSeconds(20); WaitSpan = TimeSpan.FromSeconds(60); LongWaitSpan = TimeSpan.FromSeconds(120); RequestedConnectionTimeout = TimeSpan.FromSeconds(4); } else { + ShortSpan = TimeSpan.FromSeconds(10); WaitSpan = TimeSpan.FromSeconds(30); LongWaitSpan = TimeSpan.FromSeconds(60); } @@ -160,9 +163,8 @@ public virtual async Task InitializeAsync() if (IsVerbose) { AddCallbackShutdownHandlers(); + AddCallbackExceptionHandlers(); } - - AddCallbackExceptionHandlers(); } if (_connFactory.AutomaticRecoveryEnabled) @@ -221,59 +223,55 @@ protected virtual void DisposeAssertions() protected void AddCallbackExceptionHandlers() { - if (_conn != null) + AddCallbackExceptionHandlers(_conn, _channel); + } + + protected void AddCallbackExceptionHandlers(IConnection conn, IChannel channel) + { + if (conn != null) { - _conn.ConnectionRecoveryError += (s, ea) => + conn.ConnectionRecoveryError += (s, ea) => { _connectionRecoveryException = ea.Exception; - if (IsVerbose) + try + { + _output.WriteLine($"{0} connection recovery exception: {1}", + _testDisplayName, _connectionRecoveryException); + } + catch (InvalidOperationException) { - try - { - _output.WriteLine($"{0} connection recovery exception: {1}", - _testDisplayName, _connectionRecoveryException); - } - catch (InvalidOperationException) - { - } } }; - _conn.CallbackException += (o, ea) => + conn.CallbackException += (o, ea) => { _connectionCallbackException = ea.Exception; - if (IsVerbose) + try + { + _output.WriteLine("{0} connection callback exception: {1}", + _testDisplayName, _connectionCallbackException); + } + catch (InvalidOperationException) { - try - { - _output.WriteLine("{0} connection callback exception: {1}", - _testDisplayName, _connectionCallbackException); - } - catch (InvalidOperationException) - { - } } }; } - if (_channel != null) + if (channel != null) { - _channel.CallbackException += (o, ea) => + channel.CallbackException += (o, ea) => { _channelCallbackException = ea.Exception; - if (IsVerbose) + try + { + _output.WriteLine("{0} channel callback exception: {1}", + _testDisplayName, _channelCallbackException); + } + catch (InvalidOperationException) { - try - { - _output.WriteLine("{0} channel callback exception: {1}", - _testDisplayName, _channelCallbackException); - } - catch (InvalidOperationException) - { - } } }; } @@ -491,11 +489,6 @@ protected static void AssertPreconditionFailed(ShutdownEventArgs args) AssertShutdownError(args, Constants.PreconditionFailed); } - protected static Task AssertRanToCompletion(params Task[] tasks) - { - return DoAssertRanToCompletion(tasks); - } - protected static Task AssertRanToCompletion(IEnumerable tasks) { return DoAssertRanToCompletion(tasks); diff --git a/projects/Test/Common/Util.cs b/projects/Test/Common/Util.cs index a206744ca9..e9afa4edfd 100644 --- a/projects/Test/Common/Util.cs +++ b/projects/Test/Common/Util.cs @@ -67,29 +67,32 @@ public static async Task CloseConnectionAsync(IConnection conn) connectionToClose = connections.Where(c0 => string.Equals((string)c0.ClientProperties["connection_name"], conn.ClientProvidedName, StringComparison.InvariantCultureIgnoreCase)).FirstOrDefault(); - - if (connectionToClose == null) - { - tries++; - } - else - { - break; - } } catch (ArgumentNullException) { // Sometimes we see this in GitHub CI tries++; + continue; } - } while (tries <= 30); + + if (connectionToClose != null) + { + try + { + await s_managementClient.CloseConnectionAsync(connectionToClose); + return; + } + catch (UnexpectedHttpStatusCodeException) + { + tries++; + } + } + } while (tries <= 10); if (connectionToClose == null) { throw new InvalidOperationException($"Could not delete connection: '{conn.ClientProvidedName}'"); } - - await s_managementClient.CloseConnectionAsync(connectionToClose); } } } diff --git a/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs index 21dab6f342..d989fca6f1 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs @@ -55,6 +55,8 @@ public async Task TestExchangeRecoveryTest() [Fact] public async Task TestExchangeToExchangeBindingRecovery() { + await _channel.ConfirmSelectAsync(); + string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName; string ex_source = GenerateExchangeName(); @@ -70,7 +72,8 @@ public async Task TestExchangeToExchangeBindingRecovery() { await CloseAndWaitForRecoveryAsync(); Assert.True(_channel.IsOpen); - await _channel.BasicPublishAsync(ex_source, "", _encoding.GetBytes("msg")); + await _channel.BasicPublishAsync(ex_source, "", _encoding.GetBytes("msg"), mandatory: true); + await _channel.WaitForConfirmsOrDieAsync(); await AssertMessageCountAsync(q, 1); } finally diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 7a74712cad..f41a9acf3f 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -52,15 +52,16 @@ public TestAsyncConsumer(ITestOutputHelper output) [Fact] public async Task TestBasicRoundtripConcurrent() { + AddCallbackExceptionHandlers(); + _channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output); + QueueDeclareOk q = await _channel.QueueDeclareAsync(); - string publish1 = GetUniqueString(512); - byte[] body = _encoding.GetBytes(publish1); - await _channel.BasicPublishAsync("", q.QueueName, body); + const int length = 4096; + (byte[] body1, byte[] body2) = GenerateTwoBodies(length); - string publish2 = GetUniqueString(512); - body = _encoding.GetBytes(publish2); - await _channel.BasicPublishAsync("", q.QueueName, body); + await _channel.BasicPublishAsync("", q.QueueName, body1); + await _channel.BasicPublishAsync("", q.QueueName, body2); var consumer = new AsyncEventingBasicConsumer(_channel); @@ -70,10 +71,14 @@ public async Task TestBasicRoundtripConcurrent() var tokenSource = new CancellationTokenSource(WaitSpan); CancellationTokenRegistration ctsr = tokenSource.Token.Register(() => { + _output.WriteLine("publish1SyncSource.Task Status: {0}", publish1SyncSource.Task.Status); + _output.WriteLine("publish2SyncSource.Task Status: {0}", publish2SyncSource.Task.Status); publish1SyncSource.TrySetCanceled(); publish2SyncSource.TrySetCanceled(); }); + bool body1Received = false; + bool body2Received = false; try { _conn.ConnectionShutdown += (o, ea) => @@ -94,13 +99,14 @@ public async Task TestBasicRoundtripConcurrent() consumer.Received += (o, a) => { - string decoded = _encoding.GetString(a.Body.ToArray()); - if (decoded == publish1) + if (ByteArraysEqual(a.Body.ToArray(), body1)) { + body1Received = true; publish1SyncSource.TrySetResult(true); } - else if (decoded == publish2) + else if (ByteArraysEqual(a.Body.ToArray(), body2)) { + body2Received = true; publish2SyncSource.TrySetResult(true); } else @@ -113,14 +119,21 @@ public async Task TestBasicRoundtripConcurrent() await _channel.BasicConsumeAsync(q.QueueName, true, string.Empty, false, false, null, consumer); - // ensure we get a delivery - await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task); - - bool result1 = await publish1SyncSource.Task; - Assert.True(result1, $"1 - Non concurrent dispatch lead to deadlock after {WaitSpan}"); - - bool result2 = await publish2SyncSource.Task; - Assert.True(result2, $"2 - Non concurrent dispatch lead to deadlock after {WaitSpan}"); + try + { + bool result1 = await publish1SyncSource.Task; + Assert.True(result1, $"1 - Non concurrent dispatch lead to deadlock after {WaitSpan}"); + bool result2 = await publish2SyncSource.Task; + Assert.True(result2, $"2 - Non concurrent dispatch lead to deadlock after {WaitSpan}"); + } + catch (Exception ex) + { + _output.WriteLine("EXCEPTION: {0}", ex); + _output.WriteLine("body1Received: {0}, body2Received: {1}", body1Received, body2Received); + _output.WriteLine("publish1SyncSource.Task Status: {0}", publish1SyncSource.Task.Status); + _output.WriteLine("publish2SyncSource.Task Status: {0}", publish2SyncSource.Task.Status); + throw; + } } finally { @@ -132,13 +145,14 @@ public async Task TestBasicRoundtripConcurrent() [Fact] public async Task TestBasicRoundtripConcurrentManyMessages() { + AddCallbackExceptionHandlers(); + _channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output); + const int publish_total = 4096; + const int length = 512; string queueName = GenerateQueueName(); - string publish1 = GetUniqueString(512); - byte[] body1 = _encoding.GetBytes(publish1); - string publish2 = GetUniqueString(512); - byte[] body2 = _encoding.GetBytes(publish2); + (byte[] body1, byte[] body2) = GenerateTwoBodies(length); var publish1SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var publish2SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -147,6 +161,9 @@ public async Task TestBasicRoundtripConcurrentManyMessages() var tokenSource = new CancellationTokenSource(WaitSpan); CancellationTokenRegistration ctsr = tokenSource.Token.Register(() => { + _output.WriteLine("publish1SyncSource.Task Status: {0}", publish1SyncSource.Task.Status); + _output.WriteLine("publish2SyncSource.Task Status: {0}", publish2SyncSource.Task.Status); + _output.WriteLine("consumerSyncSource.Task Status: {0}", consumerSyncSource.Task.Status); publish1SyncSource.TrySetCanceled(); publish2SyncSource.TrySetCanceled(); consumerSyncSource.TrySetCanceled(); @@ -186,6 +203,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages() }; using (IChannel publishChannel = await publishConn.CreateChannelAsync()) { + AddCallbackExceptionHandlers(publishConn, publishChannel); + publishChannel.DefaultConsumer = new DefaultAsyncConsumer("publishChannel,", _output); publishChannel.ChannelShutdown += (o, ea) => { HandleChannelShutdown(publishChannel, ea, (args) => @@ -209,6 +228,10 @@ public async Task TestBasicRoundtripConcurrentManyMessages() } }); + + int publish1_count = 0; + int publish2_count = 0; + Task consumeTask = Task.Run(async () => { using (IConnection consumeConn = await _connFactory.CreateConnectionAsync()) @@ -222,6 +245,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages() }; using (IChannel consumeChannel = await consumeConn.CreateChannelAsync()) { + AddCallbackExceptionHandlers(consumeConn, consumeChannel); + consumeChannel.DefaultConsumer = new DefaultAsyncConsumer("consumeChannel,", _output); consumeChannel.ChannelShutdown += (o, ea) => { HandleChannelShutdown(consumeChannel, ea, (args) => @@ -231,21 +256,16 @@ public async Task TestBasicRoundtripConcurrentManyMessages() }; var consumer = new AsyncEventingBasicConsumer(consumeChannel); - - int publish1_count = 0; - int publish2_count = 0; - consumer.Received += (o, a) => { - string decoded = _encoding.GetString(a.Body.ToArray()); - if (decoded == publish1) + if (ByteArraysEqual(a.Body.ToArray(), body1)) { if (Interlocked.Increment(ref publish1_count) >= publish_total) { publish1SyncSource.TrySetResult(true); } } - else if (decoded == publish2) + else if (ByteArraysEqual(a.Body.ToArray(), body2)) { if (Interlocked.Increment(ref publish2_count) >= publish_total) { @@ -270,19 +290,28 @@ public async Task TestBasicRoundtripConcurrentManyMessages() } }); - await AssertRanToCompletion(publishTask); - - await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task); - consumerSyncSource.TrySetResult(true); - - bool result1 = await publish1SyncSource.Task; - Assert.True(result1, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); - - bool result2 = await publish2SyncSource.Task; - Assert.True(result2, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); + try + { + await publishTask; + bool result1 = await publish1SyncSource.Task; + Assert.True(result1, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); + bool result2 = await publish2SyncSource.Task; + Assert.True(result2, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); + } + catch (Exception ex) + { + _output.WriteLine("EXCEPTION: {0}", ex); + _output.WriteLine("publish1_count: {0}, publish2_count: {1}", publish1_count, publish2_count); + _output.WriteLine("publishTask Status: {0}", publishTask.Status); + _output.WriteLine("publish1SyncSource.Task Status: {0}", publish1SyncSource.Task.Status); + _output.WriteLine("publish2SyncSource.Task Status: {0}", publish2SyncSource.Task.Status); + _output.WriteLine("consumerSyncSource.Task Status: {0}", consumerSyncSource.Task.Status); + throw; + } } finally { + consumerSyncSource.TrySetResult(true); ctsr.Dispose(); tokenSource.Dispose(); } @@ -658,5 +687,66 @@ private static void MaybeSetException(ShutdownEventArgs args, TaskCompletionSour tcs.TrySetException(ex); } } + + private static bool ByteArraysEqual(ReadOnlySpan a1, ReadOnlySpan a2) + { + return a1.SequenceEqual(a2); + } + + private static (byte[] body1, byte[] body2) GenerateTwoBodies(ushort length) + { + byte[] body1 = _encoding.GetBytes(new string('x', length)); + byte[] body2 = _encoding.GetBytes(new string('y', length)); + return (body1, body2); + } + + private class DefaultAsyncConsumer : AsyncDefaultBasicConsumer + { + private readonly string _logPrefix; + private readonly ITestOutputHelper _output; + + public DefaultAsyncConsumer(string logPrefix, ITestOutputHelper output) + { + _logPrefix = logPrefix; + _output = output; + } + + public override Task HandleBasicCancel(string consumerTag) + { + _output.WriteLine("[ERROR] {0} HandleBasicCancel {1}", _logPrefix, consumerTag); + return base.HandleBasicCancel(consumerTag); + } + + public override Task HandleBasicCancelOk(string consumerTag) + { + _output.WriteLine("[ERROR] {0} HandleBasicCancelOk {1}", _logPrefix, consumerTag); + return base.HandleBasicCancelOk(consumerTag); + } + + public override Task HandleBasicConsumeOk(string consumerTag) + { + _output.WriteLine("[ERROR] {0} HandleBasicConsumeOk {1}", _logPrefix, consumerTag); + return base.HandleBasicConsumeOk(consumerTag); + } + + public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, + string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) + { + _output.WriteLine("[ERROR] {0} HandleBasicDeliver {1}", _logPrefix, consumerTag); + return base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); + } + + public override Task HandleChannelShutdown(object channel, ShutdownEventArgs reason) + { + _output.WriteLine("[ERROR] {0} HandleChannelShutdown", _logPrefix); + return base.HandleChannelShutdown(channel, reason); + } + + public override Task OnCancel(params string[] consumerTags) + { + _output.WriteLine("[ERROR] {0} OnCancel {1}", _logPrefix, consumerTags[0]); + return base.OnCancel(consumerTags); + } + } } } diff --git a/projects/Test/Integration/TestAsyncConsumerExceptions.cs b/projects/Test/Integration/TestAsyncConsumerExceptions.cs index ee1c8e7579..9262474d09 100644 --- a/projects/Test/Integration/TestAsyncConsumerExceptions.cs +++ b/projects/Test/Integration/TestAsyncConsumerExceptions.cs @@ -97,10 +97,9 @@ public Task TestDeliveryExceptionHandling() protected async Task TestExceptionHandlingWith(IBasicConsumer consumer, Func action) { - var waitSpan = TimeSpan.FromSeconds(5); var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var cts = new CancellationTokenSource(waitSpan); - CancellationTokenRegistration ctsr = cts.Token.Register(() => tcs.TrySetResult(false)); + var cts = new CancellationTokenSource(ShortSpan); + CancellationTokenRegistration ctsr = cts.Token.Register(() => tcs.TrySetCanceled()); try { string q = await _channel.QueueDeclareAsync(string.Empty, false, true, false); diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs index 1578a9f277..0fa55617b9 100644 --- a/projects/Test/Integration/TestBasicPublish.cs +++ b/projects/Test/Integration/TestBasicPublish.cs @@ -196,18 +196,18 @@ public async Task TestMaxInboundMessageBodySize() bool sawConsumerRegistered = false; bool sawConsumerCancelled = false; - using (IConnection c = await cf.CreateConnectionAsync()) + using (IConnection conn = await cf.CreateConnectionAsync()) { - c.ConnectionShutdown += (o, a) => + conn.ConnectionShutdown += (o, a) => { sawConnectionShutdown = true; }; Assert.Equal(maxMsgSize, cf.MaxInboundMessageBodySize); Assert.Equal(maxMsgSize, cf.Endpoint.MaxInboundMessageBodySize); - Assert.Equal(maxMsgSize, c.Endpoint.MaxInboundMessageBodySize); + Assert.Equal(maxMsgSize, conn.Endpoint.MaxInboundMessageBodySize); - using (IChannel channel = await c.CreateChannelAsync()) + using (IChannel channel = await conn.CreateChannelAsync()) { channel.ChannelShutdown += (o, a) => { @@ -260,7 +260,29 @@ public async Task TestMaxInboundMessageBodySize() Assert.True(sawConsumerRegistered); Assert.True(sawConsumerCancelled); - await channel.CloseAsync(); + try + { + await channel.CloseAsync(); + } + catch (Exception chex) + { + if (IsVerbose) + { + _output.WriteLine("[INFO] {0} channel exception: {1}", nameof(TestMaxInboundMessageBodySize), chex); + } + } + } + + try + { + await conn.CloseAsync(); + } + catch (Exception connex) + { + if (IsVerbose) + { + _output.WriteLine("[INFO] {0} conn exception: {1}", nameof(TestMaxInboundMessageBodySize), connex); + } } } } diff --git a/projects/Test/Integration/TestConnectionFactory.cs b/projects/Test/Integration/TestConnectionFactory.cs index ef692f99e5..6a8d383912 100644 --- a/projects/Test/Integration/TestConnectionFactory.cs +++ b/projects/Test/Integration/TestConnectionFactory.cs @@ -442,11 +442,12 @@ public async Task TestCreateConnectionAsync_UsesValidEndpointWhenMultipleSupplie } [Theory] - [InlineData(3650)] - [InlineData(3651)] - [InlineData(3652)] - [InlineData(3653)] - [InlineData(3654)] + [InlineData(2998)] + [InlineData(2999)] + [InlineData(3000)] + [InlineData(3001)] + [InlineData(3002)] + [InlineData(3003)] public async Task TestCreateConnectionAsync_TruncatesWhenClientNameIsLong_GH980(ushort count) { string cpn = GetUniqueString(count); diff --git a/projects/Test/Integration/TestConsumerOperationDispatch.cs b/projects/Test/Integration/TestConsumerOperationDispatch.cs index 1ac54195d5..264a300440 100644 --- a/projects/Test/Integration/TestConsumerOperationDispatch.cs +++ b/projects/Test/Integration/TestConsumerOperationDispatch.cs @@ -213,12 +213,12 @@ public async Task TestChannelShutdownHandler() await _channel.BasicConsumeAsync(queue: q, autoAck: true, consumer: c); await _channel.CloseAsync(); - await c.Latch.Task.WaitAsync(TimeSpan.FromSeconds(10)); + await c.Latch.Task.WaitAsync(ShortSpan); Assert.True(c.Latch.Task.IsCompletedSuccessfully()); await Assert.ThrowsAsync(() => { - return c.DuplicateLatch.Task.WaitAsync(TimeSpan.FromSeconds(5)); + return c.DuplicateLatch.Task.WaitAsync(ShortSpan); }); Assert.False(c.DuplicateLatch.Task.IsCompletedSuccessfully()); diff --git a/projects/Test/Integration/TestPublisherConfirms.cs b/projects/Test/Integration/TestPublisherConfirms.cs index 4b24d7c997..5afd88f3fb 100644 --- a/projects/Test/Integration/TestPublisherConfirms.cs +++ b/projects/Test/Integration/TestPublisherConfirms.cs @@ -111,7 +111,7 @@ public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout .GetMethod("HandleAckNack", BindingFlags.Instance | BindingFlags.NonPublic) .Invoke(actualChannel, new object[] { 10UL, false, true }); - using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(4))) + using (var cts = new CancellationTokenSource(ShortSpan)) { Assert.False(await ch.WaitForConfirmsAsync(cts.Token)); } @@ -121,11 +121,12 @@ public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout [Fact] public async Task TestWaitForConfirmsWithEventsAsync() { - string queueName = string.Format("{0}:{1}", _testDisplayName, Guid.NewGuid()); + string queueName = GenerateQueueName(); using (IChannel ch = await _conn.CreateChannelAsync()) { await ch.ConfirmSelectAsync(); - await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, exclusive: false, autoDelete: false, arguments: null); + await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, + exclusive: true, autoDelete: false, arguments: null); int n = 200; // number of event handler invocations @@ -161,17 +162,19 @@ public async Task TestWaitForConfirmsWithEventsAsync() private async Task TestWaitForConfirmsAsync(int numberOfMessagesToPublish, Func fn) { - string queueName = string.Format("{0}:{1}", _testDisplayName, Guid.NewGuid()); + string queueName = GenerateQueueName(); using (IChannel ch = await _conn.CreateChannelAsync()) { var props = new BasicProperties { Persistent = true }; await ch.ConfirmSelectAsync(); - await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, exclusive: false, autoDelete: false, arguments: null); + await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, + exclusive: true, autoDelete: false, arguments: null); for (int i = 0; i < numberOfMessagesToPublish; i++) { - await ch.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: _messageBody, mandatory: true, basicProperties: props); + await ch.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, + body: _messageBody, mandatory: true, basicProperties: props); } try From 6135e35477e9d5b0ae225a34cc49bbd7a41714da Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 29 May 2024 17:14:27 -0700 Subject: [PATCH 6/8] Check for null clientProvidedName --- .../RabbitMQ.Client/client/api/ConnectionFactory.cs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs index 1bf4db3f28..93c62e561e 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs @@ -723,14 +723,15 @@ private List LocalEndpoints() private static string EnsureClientProvidedNameLength(string clientProvidedName) { - if (clientProvidedName.Length > InternalConstants.DefaultRabbitMqMaxClientProvideNameLength) + if (clientProvidedName != null) { - return clientProvidedName.Substring(0, InternalConstants.DefaultRabbitMqMaxClientProvideNameLength); - } - else - { - return clientProvidedName; + if (clientProvidedName.Length > InternalConstants.DefaultRabbitMqMaxClientProvideNameLength) + { + return clientProvidedName.Substring(0, InternalConstants.DefaultRabbitMqMaxClientProvideNameLength); + } } + + return clientProvidedName; } } } From bfccb14091f2a4dcfa7503a46436d0b4f34ca16f Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 29 May 2024 17:01:23 -0700 Subject: [PATCH 7/8] Handle AppDomain unload Fixes #826 --- .../impl/AutorecoveringConnection.Recovery.cs | 18 +++++++++++++++--- .../client/impl/Connection.Receive.cs | 13 +++++++++++++ projects/Test/Integration/ToxiproxyManager.cs | 10 ++++++++-- 3 files changed, 36 insertions(+), 5 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs index 67310eb71d..4d95a395e5 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs @@ -71,11 +71,23 @@ static bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args) } } - // happens when EOF is reached, e.g. due to RabbitMQ node - // connectivity loss or abrupt shutdown if (args.Initiator == ShutdownInitiator.Library) { - return true; + /* + * https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/826 + * Happens when an AppDomain is unloaded + */ + if (args.Exception is ThreadAbortException && + args.ReplyCode == Constants.InternalError) + { + return false; + } + else + { + // happens when EOF is reached, e.g. due to RabbitMQ node + // connectivity loss or abrupt shutdown + return true; + } } return false; diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs index 1f6a33ef6f..eaa31ec37c 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs @@ -53,6 +53,19 @@ private async Task MainLoop() await ReceiveLoopAsync(mainLoopToken) .ConfigureAwait(false); } +#if NETSTANDARD + catch (ThreadAbortException taex) + { + /* + * https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/826 + */ + var ea = new ShutdownEventArgs(ShutdownInitiator.Library, + Constants.InternalError, + "Thread aborted (AppDomain unloaded?)", + exception: taex); + HandleMainLoopException(ea); + } +#endif catch (EndOfStreamException eose) { // Possible heartbeat exception diff --git a/projects/Test/Integration/ToxiproxyManager.cs b/projects/Test/Integration/ToxiproxyManager.cs index b1d581623e..cb58ca1749 100644 --- a/projects/Test/Integration/ToxiproxyManager.cs +++ b/projects/Test/Integration/ToxiproxyManager.cs @@ -116,8 +116,14 @@ protected virtual void Dispose(bool disposing) { if (disposing) { - _proxyClient.DeleteAsync(_proxy).GetAwaiter().GetResult(); - _proxyConnection.Dispose(); + try + { + _proxyClient.DeleteAsync(_proxy).GetAwaiter().GetResult(); + _proxyConnection.Dispose(); + } + catch + { + } } _disposedValue = true; From 700535325748ac547903d3c791032d4ce651a59a Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 31 May 2024 14:38:20 -0700 Subject: [PATCH 8/8] Demonstrate that #1038 is fixed Fixes #1038 --- .../TestAsyncEventingBasicConsumer.cs | 110 ++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 projects/Test/Integration/TestAsyncEventingBasicConsumer.cs diff --git a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs new file mode 100644 index 0000000000..4c07f9bd2d --- /dev/null +++ b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs @@ -0,0 +1,110 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2020 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using Xunit; +using Xunit.Abstractions; + +namespace Test.Integration +{ + public class TestAsyncEventingBasicConsumer : IntegrationFixture + { + private readonly CancellationTokenSource _cts = new CancellationTokenSource(ShortSpan); + private readonly CancellationTokenRegistration _ctr; + private readonly TaskCompletionSource _onCallbackExceptionTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _onReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + public TestAsyncEventingBasicConsumer(ITestOutputHelper output) + : base(output, dispatchConsumersAsync: true, consumerDispatchConcurrency: 2) + { + _ctr = _cts.Token.Register(OnTokenCanceled); + } + + public override Task DisposeAsync() + { + _ctr.Dispose(); + _cts.Dispose(); + return base.DisposeAsync(); + } + + private void OnTokenCanceled() + { + _onCallbackExceptionTcs.TrySetCanceled(); + _onReceivedTcs.TrySetCanceled(); + } + + private void ConsumerChannelOnCallbackException(object sender, CallbackExceptionEventArgs e) + { + _onCallbackExceptionTcs.TrySetResult(true); + } + + private Task AsyncConsumerOnReceived(object sender, BasicDeliverEventArgs @event) + { + _onReceivedTcs.TrySetResult(true); + throw new Exception("from async subscriber"); + } + + [Fact] + public async Task TestAsyncEventingBasicConsumer_GH1038() + { + string exchangeName = GenerateExchangeName(); + string queueName = GenerateQueueName(); + string routingKey = string.Empty; + + await _channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct); + await _channel.QueueDeclareAsync(queueName, false, false, true, null); + await _channel.QueueBindAsync(queueName, exchangeName, routingKey, null); + + _channel.CallbackException += ConsumerChannelOnCallbackException; + + //async subscriber + var consumer = new AsyncEventingBasicConsumer(_channel); + consumer.Received += AsyncConsumerOnReceived; + await _channel.BasicConsumeAsync(queueName, false, consumer); + + //publisher + using IChannel publisherChannel = await _conn.CreateChannelAsync(); + byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!"); + var props = new BasicProperties(); + await publisherChannel.BasicPublishAsync(exchangeName, "", props, messageBodyBytes); + + await Task.WhenAll(_onReceivedTcs.Task, _onCallbackExceptionTcs.Task); + Assert.True(await _onReceivedTcs.Task); + Assert.True(await _onCallbackExceptionTcs.Task); + } + } +}