Skip to content

Commit

Permalink
Track publisher confirmations automatically
Browse files Browse the repository at this point in the history
Fixes #1682

* Remove `ConfirmSelectAsync` from `IChannel`
* Add parameters to enable confirmations on `IConnection.CreateChannelAsync`
* Remove / comment out all use of `WaitForConfirms...` methods.
* Dispose -> DisposeAsync
* Implement confirmation tracking and await-ing in `BasicPublishAsync`
* Ensure exceptions make into inner exception for `HardProtocolException`
* Remove commented-out code related to `WaitForConfirms...` methods.
* Unblock so that `CloseAsync` can succeed.
* Introduce channel creation options
* Allow cancellation of final await for publisher confirmation in `BasicPublishAsync`
* Fix `dotnet format` verification error
* Make `ConfirmSelectAsync` `private` and assume that semaphore is held.
* Track sequence number for basic.return
* Implement `basic.return` support.
* Fix the code that adds OTel and publish sequence number headers.
* Add publish sequence number as long as `_publisherConfirmationsEnabled` is true
* Fix the `PublisherConfirms` test program
* Add license headers
* Enable publisher confirms
* Spike an exception based approach (misses removing the bool value return type)
* Extend the use of `_confirmSemaphore` to the duration of when exceptions could be caught.
* Restore how @danielmarbach serialized the publish sequence number.
* Fix bug in how headers are added to `BasicProperties` that don't already have them.
* Use `ValueTask` as the `BasicPublishAsync` return value.

---------

Co-authored-by: Daniel Marbach <[email protected]>
Co-authored-by: Luke Bakken <[email protected]>

* Add `PublishException` class.

* Test that non-routable messages result in `PublishException` with `IsReturn = true`

* Code documentation

Simplify code.
  • Loading branch information
lukebakken committed Oct 8, 2024
1 parent c82e567 commit e93adfa
Show file tree
Hide file tree
Showing 52 changed files with 892 additions and 858 deletions.
8 changes: 7 additions & 1 deletion projects/RabbitMQ.Client/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,15 @@ public static class Constants
/// <summary>
/// The default consumer dispatch concurrency. See <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
/// to set this value for every channel created on a connection,
/// and <see cref="IConnection.CreateChannelAsync(ushort?, System.Threading.CancellationToken)"/>
/// and <see cref="IConnection.CreateChannelAsync(CreateChannelOptions?, System.Threading.CancellationToken)" />
/// for setting this value for a particular channel.
/// </summary>
public const ushort DefaultConsumerDispatchConcurrency = 1;

/// <summary>
/// The message header used to track publish sequence numbers, to allow correlation when
/// <c>basic.return</c> is sent via the broker.
/// </summary>
public const string PublishSequenceNumberHeader = "x-dotnet-pub-seq-no";
}
}
35 changes: 35 additions & 0 deletions projects/RabbitMQ.Client/CreateChannelOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
namespace RabbitMQ.Client
{
/// <summary>
/// Channel creation options.
/// </summary>
public sealed class CreateChannelOptions
{
/// <summary>
/// Enable or disable publisher confirmations on this channel. Defaults to <c>false</c>
/// </summary>
public bool PublisherConfirmationsEnabled { get; set; } = false;

/// <summary>
/// Should this library track publisher confirmations for you? Defaults to <c>false</c>
/// </summary>
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;

/// <summary>
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
///
/// Defaults to <c>null</c>, which will use the value from <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
///
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.
/// </summary>
public ushort? ConsumerDispatchConcurrency { get; set; } = null;

/// <summary>
/// The default channel options.
/// </summary>
public static CreateChannelOptions Default { get; } = new CreateChannelOptions();
}
}
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/Events/ShutdownEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string r
/// <summary>
/// Construct a <see cref="ShutdownEventArgs"/> with the given parameters.
/// </summary>
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, Exception exception, CancellationToken cancellationToken = default)
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText,
Exception exception, CancellationToken cancellationToken = default)
: this(initiator, replyCode, replyText, 0, 0, cancellationToken: cancellationToken)
{
_exception = exception ?? throw new ArgumentNullException(nameof(exception));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,35 +45,30 @@ namespace RabbitMQ.Client.Exceptions
public class OperationInterruptedException
: RabbitMQClientException
{
///<summary>Construct an OperationInterruptedException with
///the passed-in explanation, if any.</summary>
public OperationInterruptedException(ShutdownEventArgs? reason)
: base(reason is null ? "The AMQP operation was interrupted" :
$"The AMQP operation was interrupted: {reason}")
///<summary>
///Construct an OperationInterruptedException
///</summary>
public OperationInterruptedException() : base("The AMQP operation was interrupted")
{
ShutdownReason = reason;
}

///<summary>Construct an OperationInterruptedException with
///the passed-in explanation and prefix, if any.</summary>
public OperationInterruptedException(ShutdownEventArgs? reason, string prefix)
: base(reason is null ? $"{prefix}: The AMQP operation was interrupted" :
$"{prefix}: The AMQP operation was interrupted: {reason}")
{
ShutdownReason = reason;
}

protected OperationInterruptedException()
///<summary>
///Construct an OperationInterruptedException with
///the passed-in explanation, if any.
///</summary>
public OperationInterruptedException(ShutdownEventArgs reason)
: base($"The AMQP operation was interrupted: {reason}", reason.Exception)
{
ShutdownReason = reason;
}

protected OperationInterruptedException(string message) : base(message)
{
}

protected OperationInterruptedException(string message, Exception inner)
: base(message, inner)
///<summary>Construct an OperationInterruptedException with
///the passed-in explanation and prefix, if any.</summary>
public OperationInterruptedException(ShutdownEventArgs reason, string prefix)
: base($"{prefix}: The AMQP operation was interrupted: {reason}", reason.Exception)
{
ShutdownReason = reason;
}

///<summary>Retrieves the explanation for the shutdown. May
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ namespace RabbitMQ.Client.Exceptions
[Serializable]
public class ProtocolViolationException : RabbitMQClientException
{
public ProtocolViolationException(string message) : base(message)
public ProtocolViolationException() : base()
{
}
public ProtocolViolationException(string message, Exception inner) : base(message, inner)

public ProtocolViolationException(string message) : base(message)
{
}
public ProtocolViolationException()

public ProtocolViolationException(string message, Exception inner) : base(message, inner)
{
}
}
Expand Down
66 changes: 66 additions & 0 deletions projects/RabbitMQ.Client/Exceptions/PublishException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System;

namespace RabbitMQ.Client.Exceptions
{
/// <summary>
/// Class for exceptions related to publisher confirmations
/// or the <c>mandatory</c> flag.
/// </summary>
public class PublishException : RabbitMQClientException
{
private bool _isReturn = false;
private ulong _publishSequenceNumber = ulong.MinValue;

public PublishException(ulong publishSequenceNumber, bool isReturn) : base()
{
if (publishSequenceNumber == ulong.MinValue)
{
throw new ArgumentException($"{nameof(publishSequenceNumber)} must not be 0");
}

_isReturn = isReturn;
_publishSequenceNumber = publishSequenceNumber;
}

/// <summary>
/// <c>true</c> if this exception is due to a <c>basic.return</c>
/// </summary>
public bool IsReturn => _isReturn;

/// <summary>
/// Retrieve the publish sequence number.
/// </summary>
public ulong PublishSequenceNumber => _publishSequenceNumber;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,21 @@ namespace RabbitMQ.Client.Exceptions
public abstract class RabbitMQClientException : Exception
{
/// <summary>Initializes a new instance of the <see cref="RabbitMQClientException" /> class.</summary>
protected RabbitMQClientException()
protected RabbitMQClientException() : base()
{

}

/// <summary>Initializes a new instance of the <see cref="RabbitMQClientException" /> class with a specified error message.</summary>
/// <param name="message">The message that describes the error. </param>
protected RabbitMQClientException(string message) : base(message)
{

}

/// <summary>Initializes a new instance of the <see cref="RabbitMQClientException" /> class with a specified error message and a reference to the inner exception that is the cause of this exception.</summary>
/// <param name="message">The error message that explains the reason for the exception. </param>
/// <param name="innerException">The exception that is the cause of the current exception, or a null reference (Nothing in Visual Basic) if no inner exception is specified. </param>
protected RabbitMQClientException(string message, Exception innerException) : base(message, innerException)
protected RabbitMQClientException(string message, Exception? innerException) : base(message, innerException)
{

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
namespace RabbitMQ.Client.Exceptions
{
/// <summary>
/// TODO WHY IS THIS UNREFERENCED?
/// Thrown when the channel receives an RPC reply that it wasn't expecting.
/// </summary>
[Serializable]
Expand Down
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client/Framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

namespace RabbitMQ.Client.Framing
{
// TODO merge into ChannelBase
internal class Channel : ChannelBase
{
public Channel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null)
Expand Down
36 changes: 2 additions & 34 deletions projects/RabbitMQ.Client/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ Task<string> BasicConsumeAsync(string queue, bool autoAck, string consumerTag, b
/// <param name="cancellationToken">CancellationToken for this operation.</param>
/// <remarks>
/// Routing key must be shorter than 255 bytes.
/// Throws <see cref="Exceptions.PublishException"/> if a nack or basic.return is returned for the message.
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
Expand All @@ -221,6 +222,7 @@ ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
/// <param name="cancellationToken">CancellationToken for this operation.</param>
/// <remarks>
/// Routing key must be shorter than 255 bytes.
/// Throws <see cref="Exceptions.PublishException"/> if a nack or basic.return is returned for the message.
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
Expand Down Expand Up @@ -265,14 +267,6 @@ Task CloseAsync(ushort replyCode, string replyText, bool abort,
Task CloseAsync(ShutdownEventArgs reason, bool abort,
CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronously enable publisher confirmations.
/// </summary>
/// <param name="trackConfirmations">Set to <c>false</c> if tracking via <see cref="BasicAcksAsync"/> and <see cref="BasicNacksAsync"/> yourself.</param>
/// <param name="cancellationToken">CancellationToken for this operation.</param>
Task ConfirmSelectAsync(bool trackConfirmations = true,
CancellationToken cancellationToken = default);

/// <summary>Asynchronously declare an exchange.</summary>
/// <param name="exchange">The name of the exchange.</param>
/// <param name="type">The type of the exchange.</param>
Expand Down Expand Up @@ -451,32 +445,6 @@ Task QueueUnbindAsync(string queue, string exchange, string routingKey,
/// <param name="cancellationToken">The cancellation token.</param>
Task TxSelectAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronously wait until all published messages on this channel have been confirmed.
/// </summary>
/// <returns>True if no nacks were received within the timeout, otherwise false.</returns>
/// <param name="cancellationToken">The cancellation token.</param>
/// <remarks>
/// Waits until all messages published on this channel since the last call have
/// been either ack'd or nack'd by the server. Returns whether
/// all the messages were ack'd (and none were nack'd).
/// Throws an exception when called on a channel
/// that does not have publisher confirms enabled.
/// </remarks>
Task<bool> WaitForConfirmsAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Wait until all published messages on this channel have been confirmed.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <remarks>
/// Waits until all messages published on this channel since the last call have
/// been ack'd by the server. If a nack is received or the timeout
/// elapses, throws an IOException exception immediately and closes
/// the channel.
/// </remarks>
Task WaitForConfirmsOrDieAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Amount of time protocol operations (e.g. <code>queue.declare</code>) are allowed to take before
/// timing out.
Expand Down
13 changes: 3 additions & 10 deletions projects/RabbitMQ.Client/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,11 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
/// <summary>
/// Asynchronously create and return a fresh channel, session, and channel.
/// </summary>
/// <param name="consumerDispatchConcurrency">
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
///
/// Defaults to <c>null</c>, which will use the value from <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
///
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.
/// <param name="options">
/// The channel creation options.
/// </param>
/// <param name="cancellationToken">Cancellation token</param>
Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = default,
CancellationToken cancellationToken = default);
}
}
Loading

0 comments on commit e93adfa

Please sign in to comment.