Skip to content

Commit

Permalink
Add support for multiple Uris
Browse files Browse the repository at this point in the history
Part of #95

* Start by adding the bare minimum to support a list of Uris, and randomly selecting from that list when an Address is needed
  • Loading branch information
lukebakken committed Nov 25, 2024
1 parent 181f5cb commit 41d3098
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 32 deletions.
105 changes: 102 additions & 3 deletions RabbitMQ.AMQP.Client/ConnectionSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class ConnectionSettingsBuilder
private uint _maxFrameSize = Consts.DefaultMaxFrameSize;
private SaslMechanism _saslMechanism = Client.SaslMechanism.Anonymous;
private IRecoveryConfiguration _recoveryConfiguration = new RecoveryConfiguration();
private IList<Uri>? _uris;
private List<Uri>? _uris;

public static ConnectionSettingsBuilder Create()
{
Expand Down Expand Up @@ -127,13 +127,21 @@ public ConnectionSettings Build()
public class ConnectionSettings : IEquatable<ConnectionSettings>
{
private readonly Address _address;
private readonly List<Address> _addresses = new();
private readonly string _virtualHost = "/";
private readonly string _containerId = "";
private readonly uint _maxFrameSize = Consts.DefaultMaxFrameSize;
private readonly TlsSettings? _tlsSettings;
private readonly SaslMechanism _saslMechanism = SaslMechanism.Plain;
private readonly IRecoveryConfiguration _recoveryConfiguration = new RecoveryConfiguration();

/*
* TODO: support these:
SaslMechanism saslMechanism,
IRecoveryConfiguration recoveryConfiguration,
uint maxFrameSize = Consts.DefaultMaxFrameSize,
TlsSettings? tlsSettings = null)
*/
public ConnectionSettings(Uri uri)
{
string? user = null;
Expand Down Expand Up @@ -172,13 +180,84 @@ public ConnectionSettings(Uri uri)
password: password,
path: "/",
scheme: uri.Scheme);
_addresses.Add(_address);

if (_address.UseSsl && _tlsSettings == null)
{
_tlsSettings = new TlsSettings();
}
}

/*
* TODO: support these:
SaslMechanism saslMechanism,
IRecoveryConfiguration recoveryConfiguration,
uint maxFrameSize = Consts.DefaultMaxFrameSize,
TlsSettings? tlsSettings = null)
*/
public ConnectionSettings(IEnumerable<Uri> uris)
{
string? tmpVirtualHost = null;

foreach (Uri uri in uris)
{
string? user = null;
string? password = null;
string userInfo = uri.UserInfo;
if (!string.IsNullOrEmpty(userInfo))
{
string[] userPass = userInfo.Split(':');
if (userPass.Length > 2)
{
throw new ArgumentException($"Bad user info in AMQP URI: {userInfo}");
}

user = UriDecode(userPass[0]);
if (userPass.Length == 2)
{
password = UriDecode(userPass[1]);
}
}

// C# automatically changes URIs into a canonical form
// that has at least the path segment "/"
if (uri.Segments.Length > 2)
{
throw new ArgumentException($"Multiple segments in path of AMQP URI: {string.Join(", ", uri.Segments)}");
}

if (uri.Segments.Length == 2)
{
if (tmpVirtualHost is null)
{
tmpVirtualHost = UriDecode(uri.Segments[1]);
}
else
{
string thisVirtualHost = UriDecode(uri.Segments[1]);
if (false == thisVirtualHost.Equals(tmpVirtualHost, StringComparison.InvariantCultureIgnoreCase))
{
throw new ArgumentException($"All AMQP URIs must use the same virtual host. Expected '{tmpVirtualHost}', got '{thisVirtualHost}'");
}
}
}

var address = new Address(host: uri.Host,
port: uri.Port,
user: user,
password: password,
path: "/",
scheme: uri.Scheme);
_addresses.Add(address);
}

_address = _addresses[0];
if (tmpVirtualHost is not null)
{
_virtualHost = tmpVirtualHost;
}
}

public ConnectionSettings(string scheme, string host, int port,
string? user, string? password,
string virtualHost, string containerId,
Expand All @@ -190,6 +269,7 @@ public ConnectionSettings(string scheme, string host, int port,
_address = new Address(host: host, port: port,
user: user, password: password,
path: "/", scheme: scheme);
_addresses.Add(_address);
_containerId = containerId;
_virtualHost = virtualHost;
_saslMechanism = saslMechanism;
Expand Down Expand Up @@ -224,9 +304,28 @@ public ConnectionSettings(string scheme, string host, int port,
public SaslMechanism SaslMechanism => _saslMechanism;
public TlsSettings? TlsSettings => _tlsSettings;
public IRecoveryConfiguration Recovery => _recoveryConfiguration;
public IEnumerable<Uri>? Uris => throw new NotImplementedException();

internal Address Address => _address;
internal Address Address
{
get
{
if (_addresses.Count == 1)
{
if (false == Object.ReferenceEquals(_address, _addresses[0]))
{
InternalBugException.CreateAndThrow("_address must be same object as _addresses[0]");
}
return _address;
}
else
{
// Note: the max value for RandomNext is not inclusive
return _addresses[Utils.RandomNext(0, _addresses.Count)];
}
}
}

internal List<Address> Addresses => _addresses;

public override string ToString()
{
Expand Down
1 change: 1 addition & 0 deletions RabbitMQ.AMQP.Client/IEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public interface IEnvironment
/// Close all connections.
/// </summary>
/// <returns></returns>
// TODO cancellation token
Task CloseAsync();
}
}
24 changes: 6 additions & 18 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
private readonly AmqpManagement _management;
private readonly RecordingTopologyListener _recordingTopologyListener = new();

internal readonly ConnectionSettings _connectionSettings;
private readonly ConnectionSettings _connectionSettings;
private readonly IMetricsReporter? _metricsReporter;

// TODO this is coupled with publishers and consumers
internal readonly AmqpSessionManagement _nativePubSubSessions;

private readonly Dictionary<string, object> _connectionProperties = new();
Expand Down Expand Up @@ -350,23 +352,9 @@ void OnOpened(Amqp.IConnection connection, Open openOnOpened)

try
{
ConnectionSettings connectionSettings;
if (_connectionSettings is null)
{
// TODO create "internal bug" exception type?
throw new InvalidOperationException(
"_connectionSettings is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}
else
{
// TODO
// There is absolutely NO POINT in having an interface if this
// is what will be done!
connectionSettings = (ConnectionSettings)_connectionSettings;
Address address = connectionSettings.Address;
_nativeConnection = await cf.CreateAsync(address: address, open: open, onOpened: OnOpened)
.ConfigureAwait(false);
}
Address address = _connectionSettings.Address;
_nativeConnection = await cf.CreateAsync(address: address, open: open, onOpened: OnOpened)
.ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down
6 changes: 5 additions & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public Task<IConnection> CreateConnectionAsync()
public ReadOnlyCollection<IConnection> GetConnections() =>
new(_connections.Values.ToList());

public Task CloseAsync() => Task.WhenAll(_connections.Values.Select(c => c.CloseAsync()));
// TODO cancellation token
public Task CloseAsync()
{
return Task.WhenAll(_connections.Values.Select(c => c.CloseAsync()));
}
}
}
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ RabbitMQ.AMQP.Client.ConnectionException.ConnectionException(string! message) ->
RabbitMQ.AMQP.Client.ConnectionException.ConnectionException(string! message, System.Exception! innerException) -> void
RabbitMQ.AMQP.Client.ConnectionSettings
RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(string! scheme, string! host, int port, string? user, string? password, string! virtualHost, string! containerId, RabbitMQ.AMQP.Client.SaslMechanism! saslMechanism, RabbitMQ.AMQP.Client.IRecoveryConfiguration! recoveryConfiguration, uint maxFrameSize = 0, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(System.Collections.Generic.IEnumerable<System.Uri!>! uris) -> void
RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(System.Uri! uri) -> void
RabbitMQ.AMQP.Client.ConnectionSettings.ContainerId.get -> string!
RabbitMQ.AMQP.Client.ConnectionSettings.Host.get -> string!
Expand All @@ -69,7 +70,6 @@ RabbitMQ.AMQP.Client.ConnectionSettings.Recovery.get -> RabbitMQ.AMQP.Client.IRe
RabbitMQ.AMQP.Client.ConnectionSettings.SaslMechanism.get -> RabbitMQ.AMQP.Client.SaslMechanism!
RabbitMQ.AMQP.Client.ConnectionSettings.Scheme.get -> string!
RabbitMQ.AMQP.Client.ConnectionSettings.TlsSettings.get -> RabbitMQ.AMQP.Client.TlsSettings?
RabbitMQ.AMQP.Client.ConnectionSettings.Uris.get -> System.Collections.Generic.IEnumerable<System.Uri!>?
RabbitMQ.AMQP.Client.ConnectionSettings.User.get -> string?
RabbitMQ.AMQP.Client.ConnectionSettings.UseSsl.get -> bool
RabbitMQ.AMQP.Client.ConnectionSettings.VirtualHost.get -> string!
Expand Down
19 changes: 10 additions & 9 deletions Tests/ClusterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.AMQP.Client;
using RabbitMQ.AMQP.Client.Impl;
using Xunit;
using Xunit.Abstractions;

Expand All @@ -15,7 +16,7 @@ public class ClusterTests(ITestOutputHelper testOutputHelper)
: IntegrationTest(testOutputHelper, setupConnectionAndManagement: false)
{
[SkippableFact]
public Task CreateConnectionWithEnvironmentAndMultipleUris()
public async Task CreateConnectionWithEnvironmentAndMultipleUris()
{
Skip.IfNot(IsCluster);

Expand All @@ -31,16 +32,16 @@ public Task CreateConnectionWithEnvironmentAndMultipleUris()
connectionSettingBuilder.Uris(uris);
ConnectionSettings connectionSettings = connectionSettingBuilder.Build();

/*
IEnvironment env = AmqpEnvironment.Create(ConnectionSettingBuilder.Create().Build());
IConnection connection = await env.CreateConnectionAsync();
Assert.NotNull(connection);
IEnvironment env = AmqpEnvironment.Create(connectionSettings);

// Note: by using _connection, the test will dispose the object on teardown
_connection = await env.CreateConnectionAsync();
Assert.NotNull(_connection);
Assert.NotEmpty(env.GetConnections());

await env.CloseAsync();
Assert.Equal(State.Closed, connection.State);
Assert.Empty(env.GetConnections());
*/

return Task.CompletedTask;
Assert.Equal(State.Closed, _connection.State);
Assert.Empty(env.GetConnections());
}
}
47 changes: 47 additions & 0 deletions Tests/ConnectionTests/ConnectionSettingsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.AMQP.Client;
using RabbitMQ.AMQP.Client.Impl;
Expand Down Expand Up @@ -103,6 +104,52 @@ public void ConnectionSettingsViaUri()
Assert.Equal(scheme, connectionSettings.Scheme);
}

[Fact]
public void ConnectionSettingsViaUris()
{
const string scheme = "amqps";
const string host = "rabbitmq-host.foo.baz.com";
const string vhost = "/frazzle";
string user = RandomString(10);
string pass = RandomString(10);

var uri0 = new Uri($"{scheme}://{user}:{pass}@{host}:5671/%2Ffrazzle");
var uri1 = new Uri($"{scheme}://{user}:{pass}@{host}:5681/%2Ffrazzle");
var uri2 = new Uri($"{scheme}://{user}:{pass}@{host}:5691/%2Ffrazzle");

List<Uri> uris = [uri0, uri1, uri2];
var connectionSettings = new ConnectionSettings(uris);

Assert.True(connectionSettings.UseSsl);
Assert.Equal(host, connectionSettings.Host);
Assert.Equal(5671, connectionSettings.Port);
Assert.Equal(user, connectionSettings.User);
Assert.Equal(pass, connectionSettings.Password);
Assert.Equal(vhost, connectionSettings.VirtualHost);
Assert.Equal(scheme, connectionSettings.Scheme);

Amqp.Address a0 = connectionSettings.Addresses[0];
Assert.Equal(host, a0.Host);
Assert.Equal(5671, a0.Port);
Assert.Equal(user, a0.User);
Assert.Equal(pass, a0.Password);
Assert.Equal(scheme, a0.Scheme);

Amqp.Address a1 = connectionSettings.Addresses[1];
Assert.Equal(host, a1.Host);
Assert.Equal(5681, a1.Port);
Assert.Equal(user, a1.User);
Assert.Equal(pass, a1.Password);
Assert.Equal(scheme, a1.Scheme);

Amqp.Address a2 = connectionSettings.Addresses[2];
Assert.Equal(host, a2.Host);
Assert.Equal(5691, a2.Port);
Assert.Equal(user, a2.User);
Assert.Equal(pass, a2.Password);
Assert.Equal(scheme, a2.Scheme);
}

[Fact]
public async Task RaiseErrorsIfTheParametersAreNotValid()
{
Expand Down

0 comments on commit 41d3098

Please sign in to comment.