Skip to content

Commit

Permalink
Fix NRE when multiple transactional consumers attached to the same ad…
Browse files Browse the repository at this point in the history
…dress
  • Loading branch information
Havret committed Sep 5, 2023
1 parent 0002575 commit 5d58182
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 4 deletions.
11 changes: 8 additions & 3 deletions src/ArtemisNetClient.Testing/Listener/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ public static void AddSafeClosed(this Link link, ClosedCallback onLinkClosed)

public static void InitializeLinkEndpoint(this ListenerLink link, LinkEndpoint linkEndpoint, uint credit)
{
var type = typeof(TargetLinkEndpoint);
var methodInfo = type.GetMethod("InitializeLinkEndpoint")!;
var type = typeof(ListenerLink);
var methodInfo = type.GetMethod("InitializeLinkEndpoint", BindingFlags.Instance | BindingFlags.NonPublic)!;
methodInfo.Invoke(link, new object[] {linkEndpoint, credit});
}


public static void SetOnDispose(this ListenerLink link, Action<Amqp.Message, DeliveryState, bool, object> onDispose)
{
ReflectionUtils.SetFieldValue(link, "onDispose", onDispose);
}

public static void Dispose(this MessageContext messageContext, DeliveryState deliveryState)
{
var type = typeof(Context);
Expand Down
8 changes: 7 additions & 1 deletion src/ArtemisNetClient.Testing/TestLinkProcessor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Text.RegularExpressions;
using ActiveMQ.Artemis.Client.Testing.Listener;
using Amqp;
using Amqp.Framing;
using Amqp.Listener;
Expand Down Expand Up @@ -30,7 +31,12 @@ public void Process(AttachContext attachContext)
var messageSourceInfo = GetMessageSourceInfo(source, attachContext.Link);
var messageSource = new MessageSource();
_onMessageSource(messageSourceInfo, messageSource);
attachContext.Complete(new SourceLinkEndpoint(messageSource, attachContext.Link), 0);
attachContext.Link.InitializeLinkEndpoint(new SourceLinkEndpoint(messageSource, attachContext.Link), 0);

// override OnDispose so it won't throw NRE when message is null
attachContext.Link.SetOnDispose((_, _, _, _) => { });

attachContext.Link.CompleteAttach(attachContext.Attach, null);
}
else if (attachContext.Attach.Target is Target)
{
Expand Down
6 changes: 6 additions & 0 deletions src/ArtemisNetClient.Testing/Utils/ReflectionUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,10 @@ public static void SetPropValue<T>(object obj, string propertyName, T propertyVa
{
obj.GetType().GetProperty(propertyName)?.SetValue(obj, propertyValue, BindingFlags.Instance | BindingFlags.NonPublic, null, null, null);
}

public static void SetFieldValue<T>(object obj, string fieldName, T fieldValue)
{
var field = obj.GetType().GetField(fieldName, BindingFlags.Instance | BindingFlags.NonPublic);
field?.SetValue(obj, fieldValue, BindingFlags.Instance | BindingFlags.NonPublic, null, null);
}
}
71 changes: 71 additions & 0 deletions test/ArtemisNetClient.Testing.UnitTests/SendMessageSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,26 @@
using Xunit;
using System.Linq;
using System.Threading;
using ActiveMQ.Artemis.Client.TestUtils.Logging;
using Amqp;
using Microsoft.Extensions.Logging;
using Xunit.Abstractions;

namespace ActiveMQ.Artemis.Client.Testing.UnitTests;

public class SendMessageSpec
{

public SendMessageSpec(ITestOutputHelper output)
{
Trace.TraceLevel = TraceLevel.Information;
var logger = new XUnitLogger(output, "logger");
Trace.TraceListener += (_, format, args) =>
{
logger.LogTrace(format, args);
};
}

[Fact]
public async Task Should_send_message_to_given_address()
{
Expand Down Expand Up @@ -217,4 +232,60 @@ static async Task AssertReceivedAllMessagesWithTheSameGroupId(IConsumer consumer
Assert.Equal(count, messages.Count);
}
}

[Fact]
public async Task Should_send_messages_to_multiple_transactional_consumers_attached_to_the_same_address()
{
var endpoint = EndpointUtil.GetUniqueEndpoint();
using var testKit = new TestKit(endpoint);

var connectionFactory = new ConnectionFactory();
await using var connection = await connectionFactory.CreateAsync(endpoint);

var testAddress = "test_address";
var testQueue = "testQueue";
await using var consumer1 = await connection.CreateConsumerAsync(new ConsumerConfiguration
{
Address = testAddress,
Queue = testQueue,
Shared = true
});
await using var consumer2 = await connection.CreateConsumerAsync(new ConsumerConfiguration
{
Address = testAddress,
RoutingType = RoutingType.Multicast
});

await testKit.SendMessageAsync(testAddress, new Message("foo1"));
await testKit.SendMessageAsync(testAddress, new Message("foo2"));

// Handle messages for consumer1
{
await using var transaction = new Transaction();
var msg1 = await consumer1.ReceiveAsync();
Assert.Equal("foo1", msg1.GetBody<string>());
await consumer1.AcceptAsync(msg1, transaction);

var msg2 = await consumer1.ReceiveAsync();
Assert.Equal("foo2", msg2.GetBody<string>());
await consumer1.AcceptAsync(msg2, transaction);

await transaction.CommitAsync();
}

// Handle messages for consumer2
{
await using var transaction = new Transaction();

var msg1 = await consumer2.ReceiveAsync();
Assert.Equal("foo1", msg1.GetBody<string>());
await consumer2.AcceptAsync(msg1, transaction);

var msg2 = await consumer2.ReceiveAsync();
Assert.Equal("foo2", msg2.GetBody<string>());
await consumer2.AcceptAsync(msg2, transaction);

await transaction.CommitAsync();
}
}
}

0 comments on commit 5d58182

Please sign in to comment.