Skip to content

Commit

Permalink
Merge branch 'main' into otel-integration-package
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken authored Jun 3, 2024
2 parents 582bd5c + f8087b6 commit c3f2168
Show file tree
Hide file tree
Showing 23 changed files with 524 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace RabbitMQ
#if NETSTANDARD
internal static class DictionaryExtension
{
public static bool Remove<TKey, TValue>(this Dictionary<TKey, TValue> dictionary, TKey key, out TValue value)
public static bool Remove<TKey, TValue>(this IDictionary<TKey, TValue> dictionary, TKey key, out TValue value)
{
return dictionary.TryGetValue(key, out value) && dictionary.Remove(key);
}
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -952,9 +952,9 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.get -> System.Func<RabbitMQ.Client.IReadOnlyBasicProperties, System.Diagnostics.ActivityContext>
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.set -> void
Expand Down
25 changes: 23 additions & 2 deletions projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor

// just here to hold the value that was set through the setter
private Uri _uri;
private string _clientProvidedName;

/// <summary>
/// Amount of time protocol handshake operations are allowed to take before
Expand Down Expand Up @@ -367,7 +368,14 @@ public Uri Uri
/// <summary>
/// Default client provided name to be used for connections.
/// </summary>
public string ClientProvidedName { get; set; }
public string ClientProvidedName
{
get => _clientProvidedName;
set
{
_clientProvidedName = EnsureClientProvidedNameLength(value);
}
}

/// <summary>
/// Given a list of mechanism names supported by the server, select a preferred mechanism,
Expand Down Expand Up @@ -593,7 +601,7 @@ private ConnectionConfig CreateConfig(string clientProvidedName)
CredentialsRefresher,
AuthMechanisms,
ClientProperties,
clientProvidedName,
EnsureClientProvidedNameLength(clientProvidedName),
RequestedChannelMax,
RequestedFrameMax,
MaxInboundMessageBodySize,
Expand Down Expand Up @@ -712,5 +720,18 @@ private List<AmqpTcpEndpoint> LocalEndpoints()
{
return new List<AmqpTcpEndpoint> { Endpoint };
}

private static string EnsureClientProvidedNameLength(string clientProvidedName)
{
if (clientProvidedName != null)
{
if (clientProvidedName.Length > InternalConstants.DefaultRabbitMqMaxClientProvideNameLength)
{
return clientProvidedName.Substring(0, InternalConstants.DefaultRabbitMqMaxClientProvideNameLength);
}
}

return clientProvidedName;
}
}
}
13 changes: 7 additions & 6 deletions projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ public static class IConnectionExtensions
/// (or closing), then this method will do nothing.
/// It can also throw <see cref="IOException"/> when socket was closed unexpectedly.
/// </remarks>
public static Task CloseAsync(this IConnection connection)
public static Task CloseAsync(this IConnection connection, CancellationToken cancellationToken = default)
{
return connection.CloseAsync(Constants.ReplySuccess, "Goodbye", InternalConstants.DefaultConnectionCloseTimeout, false,
CancellationToken.None);
cancellationToken);
}

/// <summary>
/// Asynchronously close this connection and all its channels.
/// </summary>
/// <remarks>
/// The method behaves in the same way as <see cref="CloseAsync(IConnection)"/>, with the only
/// The method behaves in the same way as <see cref="CloseAsync(IConnection, CancellationToken)"/>, with the only
/// difference that the connection is closed with the given connection close code and message.
/// <para>
/// The close code (See under "Reply Codes" in the AMQP specification).
Expand All @@ -37,10 +37,11 @@ public static Task CloseAsync(this IConnection connection)
/// A message indicating the reason for closing the connection.
/// </para>
/// </remarks>
public static Task CloseAsync(this IConnection connection, ushort reasonCode, string reasonText)
public static Task CloseAsync(this IConnection connection, ushort reasonCode, string reasonText,
CancellationToken cancellationToken = default)
{
return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionCloseTimeout, false,
CancellationToken.None);
cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -92,7 +93,7 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st
/// </summary>
/// <remarks>
/// Note that all active channels and sessions will be closed if this method is called.
/// In comparison to normal <see cref="CloseAsync(IConnection)"/> method, <see cref="AbortAsync(IConnection)"/> will not throw
/// In comparison to normal <see cref="CloseAsync(IConnection, CancellationToken)"/> method, <see cref="AbortAsync(IConnection)"/> will not throw
/// <see cref="IOException"/> during closing connection.
///This method waits infinitely for the in-progress close operation to complete.
/// </remarks>
Expand Down
7 changes: 7 additions & 0 deletions projects/RabbitMQ.Client/client/api/InternalConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,12 @@ internal static class InternalConstants
/// configures the largest message size which should be lower than this maximum of 128MiB.
/// </summary>
internal const uint DefaultRabbitMqMaxInboundMessageBodySize = 1_048_576 * 128;

/// <summary>
/// Largest client provide name, in characters, allowed in RabbitMQ.
/// This is not configurable, but was discovered while working on this issue:
/// https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/980
/// </summary>
internal const int DefaultRabbitMqMaxClientProvideNameLength = 3000;
}
}
3 changes: 1 addition & 2 deletions projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;

Expand Down Expand Up @@ -55,7 +54,7 @@ public Task InvokeAsync(object sender, T parameter)

private readonly async Task InternalInvoke(Delegate[] handlers, object sender, T parameter)
{
foreach (AsyncEventHandler<T> action in handlers.Cast<AsyncEventHandler<T>>())
foreach (AsyncEventHandler<T> action in handlers)
{
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public async Task<string> BasicConsumeAsync(string queue, bool autoAck, string c
{
string resultConsumerTag = await InnerChannel.BasicConsumeAsync(queue, autoAck, consumerTag, noLocal,
exclusive, arguments, consumer, cancellationToken)
.ConfigureAwait(false);
.ConfigureAwait(false) ?? throw new InvalidOperationException("basic.consume returned null consumer tag");
var rc = new RecordedConsumer(channel: this, consumer: consumer, consumerTag: resultConsumerTag,
queue: queue, autoAck: autoAck, exclusive: exclusive, arguments: arguments);
await _connection.RecordConsumerAsync(rc, recordedEntitiesSemaphoreHeld: false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand Down Expand Up @@ -400,6 +401,11 @@ await _recordedEntitiesSemaphore.WaitAsync()

private void DoDeleteRecordedConsumer(string consumerTag)
{
if (consumerTag is null)
{
throw new ArgumentNullException(nameof(consumerTag));
}

if (_recordedConsumers.Remove(consumerTag, out RecordedConsumer recordedConsumer))
{
DeleteAutoDeleteQueue(recordedConsumer.Queue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,23 @@ static bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args)
}
}

// happens when EOF is reached, e.g. due to RabbitMQ node
// connectivity loss or abrupt shutdown
if (args.Initiator == ShutdownInitiator.Library)
{
return true;
/*
* https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/826
* Happens when an AppDomain is unloaded
*/
if (args.Exception is ThreadAbortException &&
args.ReplyCode == Constants.InternalError)
{
return false;
}
else
{
// happens when EOF is reached, e.g. due to RabbitMQ node
// connectivity loss or abrupt shutdown
return true;
}
}

return false;
Expand Down
13 changes: 13 additions & 0 deletions projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,19 @@ private async Task MainLoop()
await ReceiveLoopAsync(mainLoopToken)
.ConfigureAwait(false);
}
#if NETSTANDARD
catch (ThreadAbortException taex)
{
/*
* https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/826
*/
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
Constants.InternalError,
"Thread aborted (AppDomain unloaded?)",
exception: taex);
HandleMainLoopException(ea);
}
#endif
catch (EndOfStreamException eose)
{
// Possible heartbeat exception
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

Expand All @@ -7,8 +9,8 @@ namespace RabbitMQ.Client.ConsumerDispatching
#nullable enable
internal abstract class ConsumerDispatcherBase
{
private static readonly FallbackConsumer fallbackConsumer = new FallbackConsumer();
private readonly Dictionary<string, IBasicConsumer> _consumers = new Dictionary<string, IBasicConsumer>();
private static readonly FallbackConsumer s_fallbackConsumer = new FallbackConsumer();
private readonly ConcurrentDictionary<string, IBasicConsumer> _consumers = new ConcurrentDictionary<string, IBasicConsumer>();

public IBasicConsumer? DefaultConsumer { get; set; }

Expand All @@ -18,26 +20,17 @@ protected ConsumerDispatcherBase()

protected void AddConsumer(IBasicConsumer consumer, string tag)
{
lock (_consumers)
{
_consumers[tag] = consumer;
}
_consumers[tag] = consumer;
}

protected IBasicConsumer GetConsumerOrDefault(string tag)
{
lock (_consumers)
{
return _consumers.TryGetValue(tag, out IBasicConsumer? consumer) ? consumer : GetDefaultOrFallbackConsumer();
}
return _consumers.TryGetValue(tag, out IBasicConsumer? consumer) ? consumer : GetDefaultOrFallbackConsumer();
}

public IBasicConsumer GetAndRemoveConsumer(string tag)
{
lock (_consumers)
{
return _consumers.Remove(tag, out IBasicConsumer? consumer) ? consumer : GetDefaultOrFallbackConsumer();
}
return _consumers.Remove(tag, out IBasicConsumer? consumer) ? consumer : GetDefaultOrFallbackConsumer();
}

public void Shutdown(ShutdownEventArgs reason)
Expand All @@ -54,14 +47,11 @@ public Task ShutdownAsync(ShutdownEventArgs reason)

private void DoShutdownConsumers(ShutdownEventArgs reason)
{
lock (_consumers)
foreach (KeyValuePair<string, IBasicConsumer> pair in _consumers.ToArray())
{
foreach (KeyValuePair<string, IBasicConsumer> pair in _consumers)
{
ShutdownConsumer(pair.Value, reason);
}
_consumers.Clear();
ShutdownConsumer(pair.Value, reason);
}
_consumers.Clear();
}

protected abstract void ShutdownConsumer(IBasicConsumer consumer, ShutdownEventArgs reason);
Expand All @@ -74,7 +64,7 @@ private void DoShutdownConsumers(ShutdownEventArgs reason)
[MethodImpl(MethodImplOptions.NoInlining)]
private IBasicConsumer GetDefaultOrFallbackConsumer()
{
return DefaultConsumer ?? fallbackConsumer;
return DefaultConsumer ?? s_fallbackConsumer;
}
}
}
3 changes: 1 addition & 2 deletions projects/RabbitMQ.Client/client/impl/EventingWrapper.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Linq;

namespace RabbitMQ.Client.Impl
{
Expand Down Expand Up @@ -53,7 +52,7 @@ public void Invoke(object sender, T parameter)
_handlers = handlers;
}

foreach (EventHandler<T> action in handlers.Cast<EventHandler<T>>())
foreach (EventHandler<T> action in handlers)
{
try
{
Expand Down
Loading

0 comments on commit c3f2168

Please sign in to comment.