diff --git a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs index 82d7ff6d2..ade86d380 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs @@ -36,6 +36,7 @@ using System.Reflection; using System.Security.Authentication; using System.Text; +using System.Threading.Tasks; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing.Impl; using RabbitMQ.Client.Impl; @@ -410,6 +411,19 @@ public IConnection CreateConnection() return CreateConnection(ClientProvidedName); } + /// + /// Asynchronously reate a connection to one of the endpoints provided by the IEndpointResolver + /// returned by the EndpointResolverFactory. By default the configured + /// hostname and port are used. + /// + /// + /// When the configured hostname was not reachable. + /// + public ValueTask CreateConnectionAsync() + { + return CreateConnectionAsync(ClientProvidedName); + } + /// /// Create a connection to one of the endpoints provided by the IEndpointResolver /// returned by the EndpointResolverFactory. By default the configured @@ -429,6 +443,25 @@ public IConnection CreateConnection(string clientProvidedName) return CreateConnection(EndpointResolverFactory(LocalEndpoints()), clientProvidedName); } + /// + /// Asynchronously create a connection to one of the endpoints provided by the IEndpointResolver + /// returned by the EndpointResolverFactory. By default the configured + /// hostname and port are used. + /// + /// + /// Application-specific connection name, will be displayed in the management UI + /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot + /// be used as a connection identifier, e.g. in HTTP API requests. + /// This value is supposed to be human-readable. + /// + /// + /// When the configured hostname was not reachable. + /// + public ValueTask CreateConnectionAsync(string clientProvidedName) + { + return CreateConnectionAsync(EndpointResolverFactory(LocalEndpoints()), clientProvidedName); + } + /// /// Create a connection using a list of hostnames using the configured port. /// By default each hostname is tried in a random order until a successful connection is @@ -448,6 +481,25 @@ public IConnection CreateConnection(IList hostnames) return CreateConnection(hostnames, ClientProvidedName); } + /// + /// Asynchronously create a connection using a list of hostnames using the configured port. + /// By default each hostname is tried in a random order until a successful connection is + /// found or the list is exhausted using the DefaultEndpointResolver. + /// The selection behaviour can be overridden by configuring the EndpointResolverFactory. + /// + /// + /// List of hostnames to use for the initial + /// connection and recovery. + /// + /// Open connection + /// + /// When no hostname was reachable. + /// + public ValueTask CreateConnectionAsync(IList hostnames) + { + return CreateConnectionAsync(hostnames, ClientProvidedName); + } + /// /// Create a connection using a list of hostnames using the configured port. /// By default each endpoint is tried in a random order until a successful connection is @@ -474,6 +526,32 @@ public IConnection CreateConnection(IList hostnames, string clientProvid return CreateConnection(EndpointResolverFactory(endpoints), clientProvidedName); } + /// + /// Asynchronously create a connection using a list of hostnames using the configured port. + /// By default each endpoint is tried in a random order until a successful connection is + /// found or the list is exhausted. + /// The selection behaviour can be overridden by configuring the EndpointResolverFactory. + /// + /// + /// List of hostnames to use for the initial + /// connection and recovery. + /// + /// + /// Application-specific connection name, will be displayed in the management UI + /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot + /// be used as a connection identifier, e.g. in HTTP API requests. + /// This value is supposed to be human-readable. + /// + /// Open connection + /// + /// When no hostname was reachable. + /// + public ValueTask CreateConnectionAsync(IList hostnames, string clientProvidedName) + { + IEnumerable endpoints = hostnames.Select(h => new AmqpTcpEndpoint(h, Port, Ssl, MaxMessageSize)); + return CreateConnectionAsync(EndpointResolverFactory(endpoints), clientProvidedName); + } + /// /// Create a connection using a list of endpoints. By default each endpoint will be tried /// in a random order until a successful connection is found or the list is exhausted. @@ -492,6 +570,24 @@ public IConnection CreateConnection(IList endpoints) return CreateConnection(endpoints, ClientProvidedName); } + /// + /// Asynchronously create a connection using a list of endpoints. By default each endpoint will be tried + /// in a random order until a successful connection is found or the list is exhausted. + /// The selection behaviour can be overridden by configuring the EndpointResolverFactory. + /// + /// + /// List of endpoints to use for the initial + /// connection and recovery. + /// + /// Open connection + /// + /// When no hostname was reachable. + /// + public ValueTask CreateConnectionAsync(IList endpoints) + { + return CreateConnectionAsync(endpoints, ClientProvidedName); + } + /// /// Create a connection using a list of endpoints. By default each endpoint will be tried /// in a random order until a successful connection is found or the list is exhausted. @@ -516,6 +612,30 @@ public IConnection CreateConnection(IList endpoints, string cli return CreateConnection(EndpointResolverFactory(endpoints), clientProvidedName); } + /// + /// Asynchronously create a connection using a list of endpoints. By default each endpoint will be tried + /// in a random order until a successful connection is found or the list is exhausted. + /// The selection behaviour can be overridden by configuring the EndpointResolverFactory. + /// + /// + /// List of endpoints to use for the initial + /// connection and recovery. + /// + /// + /// Application-specific connection name, will be displayed in the management UI + /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot + /// be used as a connection identifier, e.g. in HTTP API requests. + /// This value is supposed to be human-readable. + /// + /// Open connection + /// + /// When no hostname was reachable. + /// + public ValueTask CreateConnectionAsync(IList endpoints, string clientProvidedName) + { + return CreateConnectionAsync(EndpointResolverFactory(endpoints), clientProvidedName); + } + /// /// Create a connection using an IEndpointResolver. /// @@ -539,10 +659,52 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c { if (AutomaticRecoveryEnabled) { - return new AutorecoveringConnection(config, endpointResolver); + var c = new AutorecoveringConnection(config, endpointResolver); + return c.Open(); + } + else + { + var c = new Connection(config, endpointResolver.SelectOne(CreateFrameHandler)); + return c.Open(); } + } + catch (Exception e) + { + throw new BrokerUnreachableException(e); + } + } - return new Connection(config, endpointResolver.SelectOne(CreateFrameHandler)); + /// + /// Asynchronously create a connection using an IEndpointResolver. + /// + /// + /// The endpointResolver that returns the endpoints to use for the connection attempt. + /// + /// + /// Application-specific connection name, will be displayed in the management UI + /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot + /// be used as a connection identifier, e.g. in HTTP API requests. + /// This value is supposed to be human-readable. + /// + /// Open connection + /// + /// When no hostname was reachable. + /// + public ValueTask CreateConnectionAsync(IEndpointResolver endpointResolver, string clientProvidedName) + { + ConnectionConfig config = CreateConfig(clientProvidedName); + try + { + if (AutomaticRecoveryEnabled) + { + var c = new AutorecoveringConnection(config, endpointResolver); + return c.OpenAsync(); + } + else + { + var c = new Connection(config, endpointResolver.SelectOne(CreateFrameHandler)); + return c.OpenAsync(); + } } catch (Exception e) { diff --git a/projects/RabbitMQ.Client/client/api/IConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/IConnectionFactory.cs index b1480cf88..1c9e63fcf 100644 --- a/projects/RabbitMQ.Client/client/api/IConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/IConnectionFactory.cs @@ -31,7 +31,7 @@ using System; using System.Collections.Generic; - +using System.Threading.Tasks; using RabbitMQ.Client.Exceptions; namespace RabbitMQ.Client @@ -102,6 +102,11 @@ public interface IConnectionFactory /// IConnection CreateConnection(); + /// + /// Asynchronously create a connection to the specified endpoint. + /// + ValueTask CreateConnectionAsync(); + /// /// Create a connection to the specified endpoint. /// @@ -114,6 +119,18 @@ public interface IConnectionFactory /// Open connection IConnection CreateConnection(string clientProvidedName); + /// + /// Asynchronously create a connection to the specified endpoint. + /// + /// + /// Application-specific connection name, will be displayed in the management UI + /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot + /// be used as a connection identifier, e.g. in HTTP API requests. + /// This value is supposed to be human-readable. + /// + /// Open connection + ValueTask CreateConnectionAsync(string clientProvidedName); + /// /// Connects to the first reachable hostname from the list. /// @@ -121,6 +138,13 @@ public interface IConnectionFactory /// Open connection IConnection CreateConnection(IList hostnames); + /// + /// Asynchronously connects to the first reachable hostname from the list. + /// + /// List of host names to use + /// Open connection + ValueTask CreateConnectionAsync(IList hostnames); + /// /// Connects to the first reachable hostname from the list. /// @@ -134,6 +158,19 @@ public interface IConnectionFactory /// Open connection IConnection CreateConnection(IList hostnames, string clientProvidedName); + /// + /// Asynchronously connects to the first reachable hostname from the list. + /// + /// List of host names to use + /// + /// Application-specific connection name, will be displayed in the management UI + /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot + /// be used as a connection identifier, e.g. in HTTP API requests. + /// This value is supposed to be human-readable. + /// + /// Open connection + ValueTask CreateConnectionAsync(IList hostnames, string clientProvidedName); + /// /// Create a connection using a list of endpoints. /// The selection behaviour can be overridden by configuring the EndpointResolverFactory. @@ -148,6 +185,20 @@ public interface IConnectionFactory /// IConnection CreateConnection(IList endpoints); + /// + /// Asynchronously create a connection using a list of endpoints. + /// The selection behaviour can be overridden by configuring the EndpointResolverFactory. + /// + /// + /// List of endpoints to use for the initial + /// connection and recovery. + /// + /// Open connection + /// + /// When no hostname was reachable. + /// + ValueTask CreateConnectionAsync(IList endpoints); + /// /// Create a connection using a list of endpoints. /// The selection behaviour can be overridden by configuring the EndpointResolverFactory. @@ -168,6 +219,26 @@ public interface IConnectionFactory /// IConnection CreateConnection(IList endpoints, string clientProvidedName); + /// + /// Asynchronously create a connection using a list of endpoints. + /// The selection behaviour can be overridden by configuring the EndpointResolverFactory. + /// + /// + /// List of endpoints to use for the initial + /// connection and recovery. + /// + /// + /// Application-specific connection name, will be displayed in the management UI + /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot + /// be used as a connection identifier, e.g. in HTTP API requests. + /// This value is supposed to be human-readable. + /// + /// Open connection + /// + /// When no hostname was reachable. + /// + ValueTask CreateConnectionAsync(IList endpoints, string clientProvidedName); + /// /// Amount of time protocol handshake operations are allowed to take before /// timing out. diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index 77b9b6404..ab9ee6040 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -58,7 +58,7 @@ private Connection InnerConnection } } - public AutorecoveringConnection(ConnectionConfig config, IEndpointResolver endpoints) + internal AutorecoveringConnection(ConnectionConfig config, IEndpointResolver endpoints) { _config = config; _endpoints = endpoints; @@ -78,6 +78,16 @@ public AutorecoveringConnection(ConnectionConfig config, IEndpointResolver endpo ConnectionShutdown += HandleConnectionShutdown; } + internal IConnection Open() + { + return InnerConnection.Open(); + } + + internal ValueTask OpenAsync() + { + return InnerConnection.OpenAsync(); + } + public event EventHandler RecoverySucceeded { add => _recoverySucceededWrapper.AddHandler(value); diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs index ea561d698..a753e251b 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs @@ -36,7 +36,6 @@ using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Impl; -using RabbitMQ.Client.Logging; namespace RabbitMQ.Client.Framing.Impl { @@ -70,13 +69,6 @@ internal void HandleConnectionUnblocked() } } - private async ValueTask OpenAsync() - { - RabbitMqClientEventSource.Log.ConnectionOpened(); - await StartAndTuneAsync().ConfigureAwait(false); - await _channel0.ConnectionOpenAsync(_config.VirtualHost); - } - private async ValueTask StartAndTuneAsync() { var connectionStartCell = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index e135d91b3..423fe987b 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -58,7 +58,7 @@ internal sealed partial class Connection : IConnection private ShutdownEventArgs? _closeReason; public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason); - public Connection(ConnectionConfig config, IFrameHandler frameHandler) + internal Connection(ConnectionConfig config, IFrameHandler frameHandler) { _config = config; _frameHandler = frameHandler; @@ -78,23 +78,7 @@ public Connection(ConnectionConfig config, IFrameHandler frameHandler) ["capabilities"] = Protocol.Capabilities, ["connection_name"] = ClientProvidedName }; - _mainLoopTask = Task.Run(MainLoop); - try - { - /* - * TODO FUTURE - * Connection should not happen in ctor, instead change - * the API so that it's awaitable - */ - OpenAsync().AsTask().GetAwaiter().GetResult(); - } - catch - { - var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen"); - Close(ea, true, TimeSpan.FromSeconds(5)); - throw; - } } public Guid Id => _id; @@ -227,6 +211,29 @@ internal void TakeOver(Connection other) _connectionShutdownWrapper.Takeover(other._connectionShutdownWrapper); } + internal IConnection Open() + { + return OpenAsync().GetAwaiter().GetResult(); + } + + internal async ValueTask OpenAsync() + { + try + { + RabbitMqClientEventSource.Log.ConnectionOpened(); + await StartAndTuneAsync().ConfigureAwait(false); + await _channel0.ConnectionOpenAsync(_config.VirtualHost); + return this; + } + catch + { + var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen"); + // TODO CloseAsync + Close(ea, true, TimeSpan.FromSeconds(5)); + throw; + } + } + public IChannel CreateChannel() { EnsureIsOpen(); diff --git a/projects/Test/Unit/APIApproval.Approve.verified.txt b/projects/Test/Unit/APIApproval.Approve.verified.txt index 898e4c60c..1902c40c2 100644 --- a/projects/Test/Unit/APIApproval.Approve.verified.txt +++ b/projects/Test/Unit/APIApproval.Approve.verified.txt @@ -218,6 +218,13 @@ namespace RabbitMQ.Client public RabbitMQ.Client.IConnection CreateConnection(RabbitMQ.Client.IEndpointResolver endpointResolver, string clientProvidedName) { } public RabbitMQ.Client.IConnection CreateConnection(System.Collections.Generic.IList endpoints, string clientProvidedName) { } public RabbitMQ.Client.IConnection CreateConnection(System.Collections.Generic.IList hostnames, string clientProvidedName) { } + public System.Threading.Tasks.ValueTask CreateConnectionAsync() { } + public System.Threading.Tasks.ValueTask CreateConnectionAsync(System.Collections.Generic.IList endpoints) { } + public System.Threading.Tasks.ValueTask CreateConnectionAsync(System.Collections.Generic.IList hostnames) { } + public System.Threading.Tasks.ValueTask CreateConnectionAsync(string clientProvidedName) { } + public System.Threading.Tasks.ValueTask CreateConnectionAsync(RabbitMQ.Client.IEndpointResolver endpointResolver, string clientProvidedName) { } + public System.Threading.Tasks.ValueTask CreateConnectionAsync(System.Collections.Generic.IList endpoints, string clientProvidedName) { } + public System.Threading.Tasks.ValueTask CreateConnectionAsync(System.Collections.Generic.IList hostnames, string clientProvidedName) { } } public class ConnectionFactoryBase { @@ -566,6 +573,12 @@ namespace RabbitMQ.Client RabbitMQ.Client.IConnection CreateConnection(string clientProvidedName); RabbitMQ.Client.IConnection CreateConnection(System.Collections.Generic.IList endpoints, string clientProvidedName); RabbitMQ.Client.IConnection CreateConnection(System.Collections.Generic.IList hostnames, string clientProvidedName); + System.Threading.Tasks.ValueTask CreateConnectionAsync(); + System.Threading.Tasks.ValueTask CreateConnectionAsync(System.Collections.Generic.IList endpoints); + System.Threading.Tasks.ValueTask CreateConnectionAsync(System.Collections.Generic.IList hostnames); + System.Threading.Tasks.ValueTask CreateConnectionAsync(string clientProvidedName); + System.Threading.Tasks.ValueTask CreateConnectionAsync(System.Collections.Generic.IList endpoints, string clientProvidedName); + System.Threading.Tasks.ValueTask CreateConnectionAsync(System.Collections.Generic.IList hostnames, string clientProvidedName); } public interface ICredentialsProvider {