From c48822cfff3f59180e50a2fefd6fab4effa22be8 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 22 Jan 2025 13:05:41 -0800 Subject: [PATCH 1/2] Fix very rare deadlock Fixes #1751 Attempt to fix deadlock by waiting on channel dispatcher first, then channel reader. --- .../ConsumerDispatcherChannelBase.cs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index b2ff4f2bc..87b292c30 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -161,10 +161,17 @@ public async Task WaitForShutdownAsync() { try { - await _reader.Completion - .ConfigureAwait(false); await _worker .ConfigureAwait(false); + + /* + * rabbitmq/rabbitmq-dotnet-client#1751 + * + * Wait for the worker first to ensure all items have been read out of the channel, + * otherwise the following will never return (https://stackoverflow.com/a/66521303) + */ + await _reader.Completion + .ConfigureAwait(false); } catch (AggregateException aex) { From b2025e287d2a567a0be79ca5156812230b7b2bd0 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 24 Jan 2025 09:45:01 -0800 Subject: [PATCH 2/2] Drain and log pending work when AsyncConsumerDispatcher loop ends. --- .../ConsumerDispatching/AsyncConsumerDispatcher.cs | 11 +++++++++++ .../ConsumerDispatcherChannelBase.cs | 11 ++--------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs index 071cd537e..d9958fae0 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs @@ -3,6 +3,7 @@ using System.Threading.Tasks; using RabbitMQ.Client.Events; using RabbitMQ.Client.Impl; +using RabbitMQ.Client.Logging; namespace RabbitMQ.Client.ConsumerDispatching { @@ -71,6 +72,16 @@ await _channel.OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(e, work throw; } } + finally + { + while (_reader.TryRead(out WorkStruct work)) + { + using (work) + { + ESLog.Warn($"discarding consumer work: {work.WorkType}"); + } + } + } } } } diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index 87b292c30..b2ff4f2bc 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -161,17 +161,10 @@ public async Task WaitForShutdownAsync() { try { - await _worker - .ConfigureAwait(false); - - /* - * rabbitmq/rabbitmq-dotnet-client#1751 - * - * Wait for the worker first to ensure all items have been read out of the channel, - * otherwise the following will never return (https://stackoverflow.com/a/66521303) - */ await _reader.Completion .ConfigureAwait(false); + await _worker + .ConfigureAwait(false); } catch (AggregateException aex) {