Skip to content

Commit

Permalink
Fix message selector filtering in NMS.AMQP consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Nov 4, 2023
1 parent eec066b commit 552c747
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 3 deletions.
23 changes: 20 additions & 3 deletions src/ArtemisNetClient.Testing/TestLinkProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,33 @@ private static MessageSourceInfo GetMessageSourceInfo(Source source, ILink link)
}

private static readonly Symbol _selectorFilterSymbol = new("apache.org:selector-filter:string");
private static readonly Symbol _jmsSelectorSymbol = new("jms-selector");

private static string? GetFilterExpression(Source source)
{
if (source.FilterSet is { } filterSet
&& filterSet.TryGetValue(_selectorFilterSymbol, out var filterExpressionObj)
&& filterExpressionObj is DescribedValue { Value: string filterExpression })
if (source.FilterSet is { } filterSet &&
TryGetValue(filterSet, out var filterExpressionObj) &&
filterExpressionObj is DescribedValue { Value: string filterExpression })
{
return filterExpression;
}

return null;

static bool TryGetValue(Map filterSet, out object filterExpressionObj)
{
if (filterSet.TryGetValue(_selectorFilterSymbol, out filterExpressionObj))
{
return true;
}

// ReSharper disable once ConvertIfStatementToReturnStatement
if (filterSet.TryGetValue(_jmsSelectorSymbol, out filterExpressionObj))
{
return true;
}

return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Apache.NMS.AMQP" Version="2.2.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.11.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.extensibility.core" Version="2.4.1" />
Expand Down
38 changes: 38 additions & 0 deletions test/ArtemisNetClient.Testing.UnitTests/FilterExpressionsSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using System.Threading;
using System.Threading.Tasks;
using ActiveMQ.Artemis.Client.TestUtils;
using Apache.NMS;
using Apache.NMS.AMQP;
using Xunit;

namespace ActiveMQ.Artemis.Client.Testing.UnitTests;
Expand Down Expand Up @@ -220,6 +222,42 @@ public async Task Should_filter_messages_when_property_used_in_filter_is_missing

Assert.Equal("msg", message.GetBody<string>());
}

[Fact]
public async Task Should_filter_message_for_nms_consumers()
{
var endpoint = EndpointUtil.GetUniqueEndpoint();
using var testKit = new TestKit(endpoint);

var nmsConnectionFactory = new NmsConnectionFactory(endpoint.ToString());
using var connection = await nmsConnectionFactory.CreateConnectionAsync(endpoint.User, endpoint.Password);
using var session = await connection.CreateSessionAsync();
await connection.StartAsync();

var address = "test_address";

var topic = new NmsTopic(address);

using var redNmsMessageConsumer = await session.CreateConsumerAsync(topic, "color = 'red'");
using var blueNmsMessageConsumer = await session.CreateConsumerAsync(topic, "color = 'blue'");


for (int i = 0; i < 2; i++)
{
await testKit.SendMessageAsync(address, new Message("red") { ApplicationProperties = { ["color"] = "red" } });
await testKit.SendMessageAsync(address, new Message("blue") { ApplicationProperties = { ["color"] = "blue" } });
}

var redMsg1 = await redNmsMessageConsumer.ReceiveAsync();
var redMsg2 = await redNmsMessageConsumer.ReceiveAsync();
var blueMsg1 = await blueNmsMessageConsumer.ReceiveAsync();
var blueMsg2 = await blueNmsMessageConsumer.ReceiveAsync();

Assert.Equal("red", ((ITextMessage) redMsg1).Text);
Assert.Equal("red", ((ITextMessage) redMsg2).Text);
Assert.Equal("blue", ((ITextMessage) blueMsg1).Text);
Assert.Equal("blue", ((ITextMessage) blueMsg2).Text);
}

static async Task<IReadOnlyList<Message>> ReceiveMessages(IConsumer consumer, int count)
{
Expand Down

0 comments on commit 552c747

Please sign in to comment.