Skip to content

Commit

Permalink
Merge branch 'main' into otel-integration-package
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken authored May 23, 2024
2 parents 64e7dc0 + 2a48e6c commit 582bd5c
Show file tree
Hide file tree
Showing 15 changed files with 431 additions and 200 deletions.
35 changes: 22 additions & 13 deletions build.ps1
Original file line number Diff line number Diff line change
@@ -1,36 +1,45 @@
[CmdletBinding(PositionalBinding=$false)]
param(
[switch]$RunTests
[switch]$RunTests,
[switch]$RunTestsUntilFailure
)

$ErrorActionPreference = 'Stop'
Set-StrictMode -Version Latest
$PSNativeCommandUseErrorActionPreference = $true

Write-Host "Run Parameters:" -ForegroundColor Cyan
Write-Host "`tPSScriptRoot: $PSScriptRoot"
Write-Host "`tRunTests: $RunTests"
Write-Host "`tRunTestsUntilFailure: $RunTestsUntilFailure"
Write-Host "`tdotnet --version: $(dotnet --version)"

Write-Host "[INFO] building all projects (Build.csproj traversal)..." -ForegroundColor "Magenta"
dotnet build "$PSScriptRoot\Build.csproj"
Write-Host "[INFO] done building." -ForegroundColor "Green"

if ($RunTests)
if ($RunTests -or $RunTestsUntilFailure)
{
$tests_dir = Join-Path -Path $PSScriptRoot -ChildPath 'projects' | Join-Path -ChildPath 'Test'
$unit_csproj_file = Resolve-Path -LiteralPath (Join-Path -Path $tests_dir -ChildPath 'Unit' | Join-Path -ChildPath 'Unit.csproj')
$integration_csproj_file = Resolve-Path -LiteralPath (Join-Path -Path $tests_dir -ChildPath 'Integration' | Join-Path -ChildPath 'Integration.csproj')
$sequential_integration_csproj_file = Resolve-Path -LiteralPath (Join-Path -Path $tests_dir -ChildPath 'SequentialIntegration' | Join-Path -ChildPath 'SequentialIntegration.csproj')

foreach ($csproj_file in $unit_csproj_file, $integration_csproj_file, $sequential_integration_csproj_file)
Do
{
Write-Host "[INFO] running Unit / Integration tests from '$csproj_file' (all frameworks)" -ForegroundColor "Magenta"
dotnet test $csproj_file --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' --no-restore --no-build --logger "console;verbosity=detailed"
if ($LASTEXITCODE -ne 0)
{
Write-Host "[ERROR] tests errored, exiting" -Foreground "Red"
Exit 1
}
else
foreach ($csproj_file in $unit_csproj_file, $integration_csproj_file, $sequential_integration_csproj_file)
{
Write-Host "[INFO] tests passed" -ForegroundColor "Green"
Write-Host "[INFO] running Unit / Integration tests from '$csproj_file' (all frameworks)" -ForegroundColor "Magenta"
& dotnet test $csproj_file --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' --no-restore --no-build --logger "console;verbosity=detailed"
if ($LASTEXITCODE -ne 0)
{
Write-Host "[ERROR] tests errored, exiting" -Foreground "Red"
Exit 1
}
else
{
Write-Host "[INFO] tests passed" -ForegroundColor "Green"
}
}
}
} While ($RunTestsUntilFailure)
}
15 changes: 12 additions & 3 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,21 @@ await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaph
_innerChannel.RunRecoveryEventHandlers(this);
}

public Task CloseAsync(ushort replyCode, string replyText, bool abort,
public async Task CloseAsync(ushort replyCode, string replyText, bool abort,
CancellationToken cancellationToken)
{
ThrowIfDisposed();
var args = new ShutdownEventArgs(ShutdownInitiator.Library, replyCode, replyText);
return CloseAsync(args, abort, cancellationToken);
try
{
await _innerChannel.CloseAsync(replyCode, replyText, abort, cancellationToken)
.ConfigureAwait(false);
}
finally
{
await _connection.DeleteRecordedChannelAsync(this,
channelsSemaphoreHeld: false, recordedEntitiesSemaphoreHeld: false)
.ConfigureAwait(false);
}
}

public async Task CloseAsync(ShutdownEventArgs args, bool abort,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public ValueTask HandleBasicConsumeOkAsync(IBasicConsumer consumer, string consu
if (false == _disposed && false == _quiesce)
{
AddConsumer(consumer, consumerTag);
return _writer.WriteAsync(new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag), cancellationToken);
WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag);
return _writer.WriteAsync(work, cancellationToken);
}
else
{
Expand All @@ -78,7 +79,8 @@ public ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag,
{
if (false == _disposed && false == _quiesce)
{
var work = new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body);
IBasicConsumer consumer = GetConsumerOrDefault(consumerTag);
var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body);
return _writer.WriteAsync(work, cancellationToken);
}
else
Expand All @@ -91,7 +93,9 @@ public ValueTask HandleBasicCancelOkAsync(string consumerTag, CancellationToken
{
if (false == _disposed && false == _quiesce)
{
return _writer.WriteAsync(new WorkStruct(WorkType.CancelOk, GetAndRemoveConsumer(consumerTag), consumerTag), cancellationToken);
IBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag);
return _writer.WriteAsync(work, cancellationToken);
}
else
{
Expand All @@ -103,7 +107,9 @@ public ValueTask HandleBasicCancelAsync(string consumerTag, CancellationToken ca
{
if (false == _disposed && false == _quiesce)
{
return _writer.WriteAsync(new WorkStruct(WorkType.Cancel, GetAndRemoveConsumer(consumerTag), consumerTag), cancellationToken);
IBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag);
return _writer.WriteAsync(work, cancellationToken);
}
else
{
Expand Down Expand Up @@ -226,7 +232,7 @@ await _worker

protected sealed override void ShutdownConsumer(IBasicConsumer consumer, ShutdownEventArgs reason)
{
_writer.TryWrite(new WorkStruct(consumer, reason));
_writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason));
}

protected override void InternalShutdown()
Expand Down Expand Up @@ -258,23 +264,23 @@ protected override Task InternalShutdownAsync()
public readonly ShutdownEventArgs? Reason;
public readonly WorkType WorkType;

public WorkStruct(WorkType type, IBasicConsumer consumer, string consumerTag)
private WorkStruct(WorkType type, IBasicConsumer consumer, string consumerTag)
: this()
{
WorkType = type;
Consumer = consumer;
ConsumerTag = consumerTag;
}

public WorkStruct(IBasicConsumer consumer, ShutdownEventArgs reason)
private WorkStruct(IBasicConsumer consumer, ShutdownEventArgs reason)
: this()
{
WorkType = WorkType.Shutdown;
Consumer = consumer;
Reason = reason;
}

public WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
private WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, RentedMemory body)
{
WorkType = WorkType.Deliver;
Expand All @@ -289,6 +295,33 @@ public WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag
Reason = default;
}

public static WorkStruct CreateCancel(IBasicConsumer consumer, string consumerTag)
{
return new WorkStruct(WorkType.Cancel, consumer, consumerTag);
}

public static WorkStruct CreateCancelOk(IBasicConsumer consumer, string consumerTag)
{
return new WorkStruct(WorkType.CancelOk, consumer, consumerTag);
}

public static WorkStruct CreateConsumeOk(IBasicConsumer consumer, string consumerTag)
{
return new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag);
}

public static WorkStruct CreateShutdown(IBasicConsumer consumer, ShutdownEventArgs reason)
{
return new WorkStruct(consumer, reason);
}

public static WorkStruct CreateDeliver(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, RentedMemory body)
{
return new WorkStruct(consumer, consumerTag, deliveryTag, redelivered,
exchange, routingKey, basicProperties, body);
}

public void Dispose() => Body.Dispose();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void Enqueue(IRpcContinuation k)
IRpcContinuation result = Interlocked.CompareExchange(ref _outstandingRpc, k, s_tmp);
if (!(result is EmptyRpcContinuation))
{
throw new NotSupportedException("Pipelining of requests forbidden");
throw new NotSupportedException($"Pipelining of requests forbidden (attempted: {k.GetType()}, enqueued: {result.GetType()})");
}
}

Expand Down
1 change: 1 addition & 0 deletions projects/Test/Applications/MassPublish/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ await publishChannel.BasicPublishAsync(exchange: ExchangeName, routingKey: Routi
}

await consumeChannel.CloseAsync();
await consumeConnection.CloseAsync();
}

private static void PublishChannel_BasicNacks(object sender, BasicNackEventArgs e)
Expand Down
8 changes: 4 additions & 4 deletions projects/Test/Common/IntegrationFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ protected ConnectionFactory CreateConnectionFactory()

protected void HandleConnectionShutdown(object sender, ShutdownEventArgs args)
{
if (args.Initiator == ShutdownInitiator.Peer)
if (args.Initiator != ShutdownInitiator.Application)
{
IConnection conn = (IConnection)sender;
_output.WriteLine($"{_testDisplayName} connection {conn.ClientProvidedName} shut down: {args}");
Expand All @@ -547,7 +547,7 @@ protected void HandleConnectionShutdown(object sender, ShutdownEventArgs args)

protected void HandleConnectionShutdown(IConnection conn, ShutdownEventArgs args, Action<ShutdownEventArgs> a)
{
if (args.Initiator == ShutdownInitiator.Peer)
if (args.Initiator != ShutdownInitiator.Application)
{
_output.WriteLine($"{_testDisplayName} connection {conn.ClientProvidedName} shut down: {args}");
}
Expand All @@ -556,7 +556,7 @@ protected void HandleConnectionShutdown(IConnection conn, ShutdownEventArgs args

protected void HandleChannelShutdown(object sender, ShutdownEventArgs args)
{
if (args.Initiator == ShutdownInitiator.Peer)
if (args.Initiator != ShutdownInitiator.Application)
{
IChannel ch = (IChannel)sender;
_output.WriteLine($"{_testDisplayName} channel {ch.ChannelNumber} shut down: {args}");
Expand All @@ -565,7 +565,7 @@ protected void HandleChannelShutdown(object sender, ShutdownEventArgs args)

protected void HandleChannelShutdown(IChannel ch, ShutdownEventArgs args, Action<ShutdownEventArgs> a)
{
if (args.Initiator == ShutdownInitiator.Peer)
if (args.Initiator != ShutdownInitiator.Application)
{
_output.WriteLine($"{_testDisplayName} channel {ch.ChannelNumber} shut down: {args}");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2020 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Impl;
using Xunit;
using Xunit.Abstractions;

namespace Test.Integration.ConnectionRecovery.EventHandlerRecovery.Channel
{
public class TestRecoveryEventHandlers : TestConnectionRecoveryBase
{
public TestRecoveryEventHandlers(ITestOutputHelper output) : base(output)
{
}

[Fact]
public async Task TestRecoveryEventHandlers_Called()
{
int counter = 0;
((AutorecoveringChannel)_channel).Recovery += (source, ea) => Interlocked.Increment(ref counter);

await CloseAndWaitForRecoveryAsync();
await CloseAndWaitForRecoveryAsync();
await CloseAndWaitForRecoveryAsync();
await CloseAndWaitForRecoveryAsync();
Assert.True(_channel.IsOpen);
Assert.True(counter >= 3);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2020 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace Test.Integration.ConnectionRecovery.EventHandlerRecovery.Channel
{
public class TestShutdownEventHandlers : TestConnectionRecoveryBase
{
public TestShutdownEventHandlers(ITestOutputHelper output) : base(output)
{
}

[Fact]
public async Task TestShutdownEventHandlersOnChannel_Called()
{
int counter = 0;
_channel.ChannelShutdown += (c, args) => Interlocked.Increment(ref counter);

Assert.True(_channel.IsOpen);
await CloseAndWaitForRecoveryAsync();
await CloseAndWaitForRecoveryAsync();
await CloseAndWaitForRecoveryAsync();
await CloseAndWaitForRecoveryAsync();
Assert.True(_channel.IsOpen);

Assert.True(counter >= 3);
}
}
}
Loading

0 comments on commit 582bd5c

Please sign in to comment.