Skip to content

Commit

Permalink
Use CreateConnectionAsync in AsyncIntegration
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Nov 10, 2023
1 parent 35b836d commit 9bb4995
Show file tree
Hide file tree
Showing 15 changed files with 55 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,14 @@ internal AutorecoveringConnection(ConnectionConfig config, IEndpointResolver end

internal IConnection Open()
{
return InnerConnection.Open();
InnerConnection.Open();
return this;
}

internal ValueTask<IConnection> OpenAsync()
internal async ValueTask<IConnection> OpenAsync()
{
return InnerConnection.OpenAsync();
await InnerConnection.OpenAsync();
return this;
}

public event EventHandler<EventArgs> RecoverySucceeded
Expand Down
9 changes: 5 additions & 4 deletions projects/Test/AsyncIntegration/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public async Task InitializeAsync()
_connFactory = CreateConnectionFactory();
_connFactory.DispatchConsumersAsync = true;
_connFactory.ConsumerDispatchConcurrency = 2;
_conn = _connFactory.CreateConnection();
_conn = await _connFactory.CreateConnectionAsync();
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
_channel = await _conn.CreateChannelAsync();
}

Expand Down Expand Up @@ -254,7 +255,7 @@ public async Task TestBasicRejectAsync()
var cf = CreateConnectionFactory();
cf.DispatchConsumersAsync = true;

using IConnection connection = cf.CreateConnection();
using IConnection connection = await cf.CreateConnectionAsync();
using IChannel channel = await connection.CreateChannelAsync();

connection.ConnectionShutdown += (o, ea) =>
Expand Down Expand Up @@ -337,7 +338,7 @@ public async Task TestBasicAckAsync()
var cf = CreateConnectionFactory();
cf.DispatchConsumersAsync = true;

using IConnection connection = cf.CreateConnection();
using IConnection connection = await cf.CreateConnectionAsync();
using IChannel channel = await connection.CreateChannelAsync();

connection.ConnectionShutdown += (o, ea) =>
Expand Down Expand Up @@ -405,7 +406,7 @@ public async Task TestBasicNackAsync()
var cf = CreateConnectionFactory();
cf.DispatchConsumersAsync = true;

using IConnection connection = cf.CreateConnection();
using IConnection connection = await cf.CreateConnectionAsync();
using IChannel channel = await connection.CreateChannelAsync();

connection.ConnectionShutdown += (o, ea) =>
Expand Down
16 changes: 9 additions & 7 deletions projects/Test/AsyncIntegration/TestAsyncConsumerExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public async Task InitializeAsync()
{
_connFactory = CreateConnectionFactory();
_connFactory.DispatchConsumersAsync = true;
_conn = _connFactory.CreateConnection();
_conn = await _connFactory.CreateConnectionAsync();
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
_channel = await _conn.CreateChannelAsync();
}

Expand Down Expand Up @@ -118,22 +119,23 @@ await TestExceptionHandlingWith(consumer, (ch, q, c, ct) =>
protected async Task TestExceptionHandlingWith(IBasicConsumer consumer,
Func<IChannel, string, IBasicConsumer, string, ValueTask> action)
{
var semaphore = new SemaphoreSlim(0, 1);
bool notified = false;
var waitSpan = TimeSpan.FromSeconds(2);
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var cts = new CancellationTokenSource(waitSpan);
cts.Token.Register(() => tcs.TrySetResult(false));

string q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null);
_channel.CallbackException += (ch, evt) =>
{
if (evt.Exception == TestException)
{
notified = true;
semaphore.Release();
tcs.SetResult(true);
}
};

string tag = await _channel.BasicConsumeAsync(q, true, string.Empty, false, false, null, consumer);
await action(_channel, q, consumer, tag);
Assert.True(await semaphore.WaitAsync(TimeSpan.FromSeconds(2)));
Assert.True(notified);
Assert.True(await tcs.Task);
}

private class ConsumerFailingOnDelivery : AsyncEventingBasicConsumer
Expand Down
3 changes: 2 additions & 1 deletion projects/Test/AsyncIntegration/TestBasicGetAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ protected override void SetUp()
public async Task InitializeAsync()
{
_connFactory = CreateConnectionFactory();
_conn = _connFactory.CreateConnection();
_conn = await _connFactory.CreateConnectionAsync();
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
_channel = await _conn.CreateChannelAsync();
}

Expand Down
3 changes: 2 additions & 1 deletion projects/Test/AsyncIntegration/TestBasicPublishAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ protected override void SetUp()
public async Task InitializeAsync()
{
_connFactory = CreateConnectionFactory();
_conn = _connFactory.CreateConnection();
_conn = await _connFactory.CreateConnectionAsync();
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
_channel = await _conn.CreateChannelAsync();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

namespace Test.AsyncIntegration
{
public class TestConcurrentAccessWithSharedConnectionAsync : IntegrationFixture
public class TestConcurrentAccessWithSharedConnectionAsync : IntegrationFixture, IAsyncLifetime
{
private const ushort _messageCount = 200;

Expand All @@ -49,14 +49,24 @@ public TestConcurrentAccessWithSharedConnectionAsync(ITestOutputHelper output) :
}

protected override void SetUp()
{
}

public async Task InitializeAsync()
{
_connFactory = CreateConnectionFactory();
_conn = _connFactory.CreateConnection();
_conn = await _connFactory.CreateConnectionAsync();
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
_conn.ConnectionShutdown += HandleConnectionShutdown;
// NB: not creating _channel because this test suite doesn't use it.
Assert.Null(_channel);
}

public Task DisposeAsync()
{
return Task.CompletedTask;
}

[Fact]
public Task TestConcurrentChannelOpenAndPublishingWithBlankMessagesAsync()
{
Expand Down
3 changes: 2 additions & 1 deletion projects/Test/AsyncIntegration/TestConfirmSelectAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ protected override void SetUp()
public async Task InitializeAsync()
{
_connFactory = CreateConnectionFactory();
_conn = _connFactory.CreateConnection();
_conn = await _connFactory.CreateConnectionAsync();
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
_channel = await _conn.CreateChannelAsync();
}

Expand Down
3 changes: 2 additions & 1 deletion projects/Test/AsyncIntegration/TestExchangeDeclareAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ protected override void SetUp()
public async Task InitializeAsync()
{
_connFactory = CreateConnectionFactory();
_conn = _connFactory.CreateConnection();
_conn = await _connFactory.CreateConnectionAsync();
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
_channel = await _conn.CreateChannelAsync();
}

Expand Down
3 changes: 2 additions & 1 deletion projects/Test/AsyncIntegration/TestExtensionsAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ protected override void SetUp()
public async Task InitializeAsync()
{
_connFactory = CreateConnectionFactory();
_conn = _connFactory.CreateConnection();
_conn = await _connFactory.CreateConnectionAsync();
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
_channel = await _conn.CreateChannelAsync();
}

Expand Down
6 changes: 4 additions & 2 deletions projects/Test/AsyncIntegration/TestFloodPublishingAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public async Task TestUnthrottledFloodPublishingAsync()
_connFactory = CreateConnectionFactory();
_connFactory.RequestedHeartbeat = TimeSpan.FromSeconds(60);
_connFactory.AutomaticRecoveryEnabled = false;
_conn = _connFactory.CreateConnection();
_conn = await _connFactory.CreateConnectionAsync();
Assert.IsNotType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
_channel = await _conn.CreateChannelAsync();

_conn.ConnectionShutdown += (_, ea) =>
Expand Down Expand Up @@ -111,7 +112,8 @@ public async Task TestMultithreadFloodPublishingAsync()
_connFactory.DispatchConsumersAsync = true;
_connFactory.AutomaticRecoveryEnabled = false;

_conn = _connFactory.CreateConnection();
_conn = await _connFactory.CreateConnectionAsync();
Assert.IsNotType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
_channel = await _conn.CreateChannelAsync();

string message = "Hello from test TestMultithreadFloodPublishing";
Expand Down
3 changes: 2 additions & 1 deletion projects/Test/AsyncIntegration/TestMessageCountAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ protected override void SetUp()
public async Task InitializeAsync()
{
_connFactory = CreateConnectionFactory();
_conn = _connFactory.CreateConnection();
_conn = await _connFactory.CreateConnectionAsync();
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
_channel = await _conn.CreateChannelAsync();
}

Expand Down
3 changes: 2 additions & 1 deletion projects/Test/AsyncIntegration/TestPassiveDeclareAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ protected override void SetUp()
public async Task InitializeAsync()
{
_connFactory = CreateConnectionFactory();
_conn = _connFactory.CreateConnection();
_conn = await _connFactory.CreateConnectionAsync();
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
_channel = await _conn.CreateChannelAsync();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ public async Task MultiThreadPublishOnSharedChannel()
var cf = CreateConnectionFactory();
cf.AutomaticRecoveryEnabled = false;

using (IConnection conn = cf.CreateConnection())
using (IConnection conn = await cf.CreateConnectionAsync())
{
Assert.IsNotType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(conn);
conn.ConnectionShutdown += HandleConnectionShutdown;

using (IChannel channel = await conn.CreateChannelAsync())
Expand Down
3 changes: 2 additions & 1 deletion projects/Test/AsyncIntegration/TestPublisherConfirmsAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ protected override void SetUp()
public async Task InitializeAsync()
{
_connFactory = CreateConnectionFactory();
_conn = _connFactory.CreateConnection();
_conn = await _connFactory.CreateConnectionAsync();
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
_channel = await _conn.CreateChannelAsync();
}

Expand Down
3 changes: 2 additions & 1 deletion projects/Test/AsyncIntegration/TestQueueDeclareAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ protected override void SetUp()
public async Task InitializeAsync()
{
_connFactory = CreateConnectionFactory();
_conn = _connFactory.CreateConnection();
_conn = await _connFactory.CreateConnectionAsync();
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
_channel = await _conn.CreateChannelAsync();
}

Expand Down

0 comments on commit 9bb4995

Please sign in to comment.