Skip to content

Commit

Permalink
Add CreateConnectionAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Nov 10, 2023
1 parent 5c936ff commit 35b836d
Show file tree
Hide file tree
Showing 6 changed files with 284 additions and 29 deletions.
166 changes: 164 additions & 2 deletions projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
using System.Reflection;
using System.Security.Authentication;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Client.Impl;
Expand Down Expand Up @@ -410,6 +411,19 @@ public IConnection CreateConnection()
return CreateConnection(ClientProvidedName);
}

/// <summary>
/// Asynchronously reate a connection to one of the endpoints provided by the IEndpointResolver
/// returned by the EndpointResolverFactory. By default the configured
/// hostname and port are used.
/// </summary>
/// <exception cref="BrokerUnreachableException">
/// When the configured hostname was not reachable.
/// </exception>
public ValueTask<IConnection> CreateConnectionAsync()
{
return CreateConnectionAsync(ClientProvidedName);
}

/// <summary>
/// Create a connection to one of the endpoints provided by the IEndpointResolver
/// returned by the EndpointResolverFactory. By default the configured
Expand All @@ -429,6 +443,25 @@ public IConnection CreateConnection(string clientProvidedName)
return CreateConnection(EndpointResolverFactory(LocalEndpoints()), clientProvidedName);
}

/// <summary>
/// Asynchronously create a connection to one of the endpoints provided by the IEndpointResolver
/// returned by the EndpointResolverFactory. By default the configured
/// hostname and port are used.
/// </summary>
/// <param name="clientProvidedName">
/// Application-specific connection name, will be displayed in the management UI
/// if RabbitMQ server supports it. This value doesn't have to be unique and cannot
/// be used as a connection identifier, e.g. in HTTP API requests.
/// This value is supposed to be human-readable.
/// </param>
/// <exception cref="BrokerUnreachableException">
/// When the configured hostname was not reachable.
/// </exception>
public ValueTask<IConnection> CreateConnectionAsync(string clientProvidedName)
{
return CreateConnectionAsync(EndpointResolverFactory(LocalEndpoints()), clientProvidedName);
}

/// <summary>
/// Create a connection using a list of hostnames using the configured port.
/// By default each hostname is tried in a random order until a successful connection is
Expand All @@ -448,6 +481,25 @@ public IConnection CreateConnection(IList<string> hostnames)
return CreateConnection(hostnames, ClientProvidedName);
}

/// <summary>
/// Asynchronously create a connection using a list of hostnames using the configured port.
/// By default each hostname is tried in a random order until a successful connection is
/// found or the list is exhausted using the DefaultEndpointResolver.
/// The selection behaviour can be overridden by configuring the EndpointResolverFactory.
/// </summary>
/// <param name="hostnames">
/// List of hostnames to use for the initial
/// connection and recovery.
/// </param>
/// <returns>Open connection</returns>
/// <exception cref="BrokerUnreachableException">
/// When no hostname was reachable.
/// </exception>
public ValueTask<IConnection> CreateConnectionAsync(IList<string> hostnames)
{
return CreateConnectionAsync(hostnames, ClientProvidedName);
}

/// <summary>
/// Create a connection using a list of hostnames using the configured port.
/// By default each endpoint is tried in a random order until a successful connection is
Expand All @@ -474,6 +526,32 @@ public IConnection CreateConnection(IList<string> hostnames, string clientProvid
return CreateConnection(EndpointResolverFactory(endpoints), clientProvidedName);
}

/// <summary>
/// Asynchronously create a connection using a list of hostnames using the configured port.
/// By default each endpoint is tried in a random order until a successful connection is
/// found or the list is exhausted.
/// The selection behaviour can be overridden by configuring the EndpointResolverFactory.
/// </summary>
/// <param name="hostnames">
/// List of hostnames to use for the initial
/// connection and recovery.
/// </param>
/// <param name="clientProvidedName">
/// Application-specific connection name, will be displayed in the management UI
/// if RabbitMQ server supports it. This value doesn't have to be unique and cannot
/// be used as a connection identifier, e.g. in HTTP API requests.
/// This value is supposed to be human-readable.
/// </param>
/// <returns>Open connection</returns>
/// <exception cref="BrokerUnreachableException">
/// When no hostname was reachable.
/// </exception>
public ValueTask<IConnection> CreateConnectionAsync(IList<string> hostnames, string clientProvidedName)
{
IEnumerable<AmqpTcpEndpoint> endpoints = hostnames.Select(h => new AmqpTcpEndpoint(h, Port, Ssl, MaxMessageSize));
return CreateConnectionAsync(EndpointResolverFactory(endpoints), clientProvidedName);
}

/// <summary>
/// Create a connection using a list of endpoints. By default each endpoint will be tried
/// in a random order until a successful connection is found or the list is exhausted.
Expand All @@ -492,6 +570,24 @@ public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints)
return CreateConnection(endpoints, ClientProvidedName);
}

/// <summary>
/// Asynchronously create a connection using a list of endpoints. By default each endpoint will be tried
/// in a random order until a successful connection is found or the list is exhausted.
/// The selection behaviour can be overridden by configuring the EndpointResolverFactory.
/// </summary>
/// <param name="endpoints">
/// List of endpoints to use for the initial
/// connection and recovery.
/// </param>
/// <returns>Open connection</returns>
/// <exception cref="BrokerUnreachableException">
/// When no hostname was reachable.
/// </exception>
public ValueTask<IConnection> CreateConnectionAsync(IList<AmqpTcpEndpoint> endpoints)
{
return CreateConnectionAsync(endpoints, ClientProvidedName);
}

/// <summary>
/// Create a connection using a list of endpoints. By default each endpoint will be tried
/// in a random order until a successful connection is found or the list is exhausted.
Expand All @@ -516,6 +612,30 @@ public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints, string cli
return CreateConnection(EndpointResolverFactory(endpoints), clientProvidedName);
}

/// <summary>
/// Asynchronously create a connection using a list of endpoints. By default each endpoint will be tried
/// in a random order until a successful connection is found or the list is exhausted.
/// The selection behaviour can be overridden by configuring the EndpointResolverFactory.
/// </summary>
/// <param name="endpoints">
/// List of endpoints to use for the initial
/// connection and recovery.
/// </param>
/// <param name="clientProvidedName">
/// Application-specific connection name, will be displayed in the management UI
/// if RabbitMQ server supports it. This value doesn't have to be unique and cannot
/// be used as a connection identifier, e.g. in HTTP API requests.
/// This value is supposed to be human-readable.
/// </param>
/// <returns>Open connection</returns>
/// <exception cref="BrokerUnreachableException">
/// When no hostname was reachable.
/// </exception>
public ValueTask<IConnection> CreateConnectionAsync(IList<AmqpTcpEndpoint> endpoints, string clientProvidedName)
{
return CreateConnectionAsync(EndpointResolverFactory(endpoints), clientProvidedName);
}

/// <summary>
/// Create a connection using an IEndpointResolver.
/// </summary>
Expand All @@ -539,10 +659,52 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c
{
if (AutomaticRecoveryEnabled)
{
return new AutorecoveringConnection(config, endpointResolver);
var c = new AutorecoveringConnection(config, endpointResolver);
return c.Open();
}
else
{
var c = new Connection(config, endpointResolver.SelectOne(CreateFrameHandler));
return c.Open();
}
}
catch (Exception e)
{
throw new BrokerUnreachableException(e);
}
}

return new Connection(config, endpointResolver.SelectOne(CreateFrameHandler));
/// <summary>
/// Asynchronously create a connection using an IEndpointResolver.
/// </summary>
/// <param name="endpointResolver">
/// The endpointResolver that returns the endpoints to use for the connection attempt.
/// </param>
/// <param name="clientProvidedName">
/// Application-specific connection name, will be displayed in the management UI
/// if RabbitMQ server supports it. This value doesn't have to be unique and cannot
/// be used as a connection identifier, e.g. in HTTP API requests.
/// This value is supposed to be human-readable.
/// </param>
/// <returns>Open connection</returns>
/// <exception cref="BrokerUnreachableException">
/// When no hostname was reachable.
/// </exception>
public ValueTask<IConnection> CreateConnectionAsync(IEndpointResolver endpointResolver, string clientProvidedName)
{
ConnectionConfig config = CreateConfig(clientProvidedName);
try
{
if (AutomaticRecoveryEnabled)
{
var c = new AutorecoveringConnection(config, endpointResolver);
return c.OpenAsync();
}
else
{
var c = new Connection(config, endpointResolver.SelectOne(CreateFrameHandler));
return c.OpenAsync();
}
}
catch (Exception e)
{
Expand Down
73 changes: 72 additions & 1 deletion projects/RabbitMQ.Client/client/api/IConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

using System;
using System.Collections.Generic;

using System.Threading.Tasks;
using RabbitMQ.Client.Exceptions;

namespace RabbitMQ.Client
Expand Down Expand Up @@ -102,6 +102,11 @@ public interface IConnectionFactory
/// </summary>
IConnection CreateConnection();

/// <summary>
/// Asynchronously create a connection to the specified endpoint.
/// </summary>
ValueTask<IConnection> CreateConnectionAsync();

/// <summary>
/// Create a connection to the specified endpoint.
/// </summary>
Expand All @@ -114,13 +119,32 @@ public interface IConnectionFactory
/// <returns>Open connection</returns>
IConnection CreateConnection(string clientProvidedName);

/// <summary>
/// Asynchronously create a connection to the specified endpoint.
/// </summary>
/// <param name="clientProvidedName">
/// Application-specific connection name, will be displayed in the management UI
/// if RabbitMQ server supports it. This value doesn't have to be unique and cannot
/// be used as a connection identifier, e.g. in HTTP API requests.
/// This value is supposed to be human-readable.
/// </param>
/// <returns>Open connection</returns>
ValueTask<IConnection> CreateConnectionAsync(string clientProvidedName);

/// <summary>
/// Connects to the first reachable hostname from the list.
/// </summary>
/// <param name="hostnames">List of host names to use</param>
/// <returns>Open connection</returns>
IConnection CreateConnection(IList<string> hostnames);

/// <summary>
/// Asynchronously connects to the first reachable hostname from the list.
/// </summary>
/// <param name="hostnames">List of host names to use</param>
/// <returns>Open connection</returns>
ValueTask<IConnection> CreateConnectionAsync(IList<string> hostnames);

/// <summary>
/// Connects to the first reachable hostname from the list.
/// </summary>
Expand All @@ -134,6 +158,19 @@ public interface IConnectionFactory
/// <returns>Open connection</returns>
IConnection CreateConnection(IList<string> hostnames, string clientProvidedName);

/// <summary>
/// Asynchronously connects to the first reachable hostname from the list.
/// </summary>
/// <param name="hostnames">List of host names to use</param>
/// <param name="clientProvidedName">
/// Application-specific connection name, will be displayed in the management UI
/// if RabbitMQ server supports it. This value doesn't have to be unique and cannot
/// be used as a connection identifier, e.g. in HTTP API requests.
/// This value is supposed to be human-readable.
/// </param>
/// <returns>Open connection</returns>
ValueTask<IConnection> CreateConnectionAsync(IList<string> hostnames, string clientProvidedName);

/// <summary>
/// Create a connection using a list of endpoints.
/// The selection behaviour can be overridden by configuring the EndpointResolverFactory.
Expand All @@ -148,6 +185,20 @@ public interface IConnectionFactory
/// </exception>
IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints);

/// <summary>
/// Asynchronously create a connection using a list of endpoints.
/// The selection behaviour can be overridden by configuring the EndpointResolverFactory.
/// </summary>
/// <param name="endpoints">
/// List of endpoints to use for the initial
/// connection and recovery.
/// </param>
/// <returns>Open connection</returns>
/// <exception cref="BrokerUnreachableException">
/// When no hostname was reachable.
/// </exception>
ValueTask<IConnection> CreateConnectionAsync(IList<AmqpTcpEndpoint> endpoints);

/// <summary>
/// Create a connection using a list of endpoints.
/// The selection behaviour can be overridden by configuring the EndpointResolverFactory.
Expand All @@ -168,6 +219,26 @@ public interface IConnectionFactory
/// </exception>
IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints, string clientProvidedName);

/// <summary>
/// Asynchronously create a connection using a list of endpoints.
/// The selection behaviour can be overridden by configuring the EndpointResolverFactory.
/// </summary>
/// <param name="endpoints">
/// List of endpoints to use for the initial
/// connection and recovery.
/// </param>
/// <param name="clientProvidedName">
/// Application-specific connection name, will be displayed in the management UI
/// if RabbitMQ server supports it. This value doesn't have to be unique and cannot
/// be used as a connection identifier, e.g. in HTTP API requests.
/// This value is supposed to be human-readable.
/// </param>
/// <returns>Open connection</returns>
/// <exception cref="BrokerUnreachableException">
/// When no hostname was reachable.
/// </exception>
ValueTask<IConnection> CreateConnectionAsync(IList<AmqpTcpEndpoint> endpoints, string clientProvidedName);

/// <summary>
/// Amount of time protocol handshake operations are allowed to take before
/// timing out.
Expand Down
12 changes: 11 additions & 1 deletion projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private Connection InnerConnection
}
}

public AutorecoveringConnection(ConnectionConfig config, IEndpointResolver endpoints)
internal AutorecoveringConnection(ConnectionConfig config, IEndpointResolver endpoints)
{
_config = config;
_endpoints = endpoints;
Expand All @@ -78,6 +78,16 @@ public AutorecoveringConnection(ConnectionConfig config, IEndpointResolver endpo
ConnectionShutdown += HandleConnectionShutdown;
}

internal IConnection Open()
{
return InnerConnection.Open();
}

internal ValueTask<IConnection> OpenAsync()
{
return InnerConnection.OpenAsync();
}

public event EventHandler<EventArgs> RecoverySucceeded
{
add => _recoverySucceededWrapper.AddHandler(value);
Expand Down
8 changes: 0 additions & 8 deletions projects/RabbitMQ.Client/client/impl/Connection.Commands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Impl;
using RabbitMQ.Client.Logging;

namespace RabbitMQ.Client.Framing.Impl
{
Expand Down Expand Up @@ -70,13 +69,6 @@ internal void HandleConnectionUnblocked()
}
}

private async ValueTask OpenAsync()
{
RabbitMqClientEventSource.Log.ConnectionOpened();
await StartAndTuneAsync().ConfigureAwait(false);
await _channel0.ConnectionOpenAsync(_config.VirtualHost);
}

private async ValueTask StartAndTuneAsync()
{
var connectionStartCell = new TaskCompletionSource<ConnectionStartDetails>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down
Loading

0 comments on commit 35b836d

Please sign in to comment.