Skip to content

Commit

Permalink
Implement Chunk Filter (#276)
Browse files Browse the repository at this point in the history
* Implement the Filter chunk feature 
* Check if the broker supports the broker filter.
* Check if the broker is >= 3.11.0 to validate the exchange version
* Add AvailableFeatures singleton to test check features


Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio authored Jul 28, 2023
1 parent 2f1be23 commit 79f56db
Show file tree
Hide file tree
Showing 35 changed files with 1,138 additions and 27 deletions.
12 changes: 12 additions & 0 deletions .ci/install.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Write-Host "[INFO] versions: $versions"
$erlang_ver = $versions.erlang
$rabbitmq_ver = $versions.rabbitmq


$base_installers_dir = Join-Path -Path $HOME -ChildPath 'installers'
if (-Not (Test-Path $base_installers_dir))
{
Expand All @@ -36,6 +37,7 @@ Write-Host "[INFO] Installing Erlang to $erlang_install_dir..."

$rabbitmq_installer_download_url = "https://github.com/rabbitmq/rabbitmq-server/releases/download/v$rabbitmq_ver/rabbitmq-server-$rabbitmq_ver.exe"
$rabbitmq_installer_path = Join-Path -Path $base_installers_dir -ChildPath "rabbitmq-server-$rabbitmq_ver.exe"
Write-Host "[INFO] rabbitmq installer path $rabbitmq_installer_path"

$erlang_reg_path = 'HKLM:\SOFTWARE\Ericsson\Erlang'
if (Test-Path 'HKLM:\SOFTWARE\WOW6432Node\')
Expand Down Expand Up @@ -94,7 +96,17 @@ if (Test-Path 'HKLM:\SOFTWARE\WOW6432Node\')
$rabbitmq_base_path = Split-Path -Parent (Get-ItemProperty $regPath 'UninstallString').UninstallString
$rabbitmq_version = (Get-ItemProperty $regPath "DisplayVersion").DisplayVersion



Write-Host "[INFO] RabbitMQ version path: $rabbitmq_base_path and version: $rabbitmq_version"

## we have to remove the double quote added here
## https://github.com/rabbitmq/rabbitmq-packaging/commit/4476a3489f80658b31c0b58a6a04314c9d7acf72
$rabbitmq_base_path = $rabbitmq_base_path -replace '"', ''

$rabbitmq_home = Join-Path -Path $rabbitmq_base_path -ChildPath "rabbitmq_server-$rabbitmq_version"


Write-Host "[INFO] Setting RABBITMQ_HOME to '$rabbitmq_home'..."
[Environment]::SetEnvironmentVariable('RABBITMQ_HOME', $rabbitmq_home, 'Machine')
$env:RABBITMQ_HOME = $rabbitmq_home
Expand Down
2 changes: 1 addition & 1 deletion .ci/versions.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"erlang": "25.3",
"rabbitmq": "3.11.11"
"rabbitmq": "3.13.0-beta.3"
}
2 changes: 1 addition & 1 deletion .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
runs-on: ubuntu-latest
services:
rabbitmq:
image: rabbitmq:3-management
image: rabbitmq:3.13.0-beta.2-management
env:
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: -rabbitmq_stream advertised_host localhost
ports:
Expand Down
58 changes: 58 additions & 0 deletions RabbitMQ.Stream.Client/AvailableFeatures.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

namespace RabbitMQ.Stream.Client;

/// <summary>
/// AvailableFeatures holds the features enabled by the server and the client.
/// </summary>
internal class AvailableFeatures
{
public bool PublishFilter { get; private set; }

public bool Is311OrMore { get; private set; }

private static string ExtractVersion(string fullVersion)
{
const string Pattern = @"(\d+\.\d+\.\d+)";
var match = Regex.Match(fullVersion, Pattern);

return match.Success
? match.Groups[1].Value
: string.Empty;
}

public void SetServerVersion(string brokerVersion)
{
var v = ExtractVersion(brokerVersion);
Is311OrMore = new System.Version(v) >= new System.Version("3.11.0");
}

public void ParseCommandVersions(List<ICommandVersions> commands)
{
foreach (var command in commands)
{
switch (command.Command)
{
case Stream.Client.PublishFilter.Key:
var p = new PublishFilter();
PublishFilter = command.MinVersion <= p.MinVersion &&
command.MaxVersion >= p.MaxVersion;
break;
}
}
}
}

internal sealed class AvailableFeaturesSingleton
{
private static readonly Lazy<AvailableFeatures> s_lazy =
new(() => new AvailableFeatures());

public static AvailableFeatures Instance => s_lazy.Value;
}
24 changes: 22 additions & 2 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,9 @@ public static async Task<Client> Create(ClientParameters parameters, ILogger log
.ConfigureAwait(false);

// exchange properties
await client.Request<PeerPropertiesRequest, PeerPropertiesResponse>(corr =>
var peerPropertiesResponse = await client.Request<PeerPropertiesRequest, PeerPropertiesResponse>(corr =>
new PeerPropertiesRequest(corr, parameters.Properties)).ConfigureAwait(false);
logger?.LogDebug("Server properties: {@Properties}", parameters.Properties);
logger?.LogDebug("Server properties: {@Properties}", peerPropertiesResponse);

//auth
var saslHandshakeResponse =
Expand Down Expand Up @@ -251,6 +251,26 @@ await client.Publish(new TuneRequest(0,
logger?.LogDebug("Open: ConnectionProperties: {ConnectionProperties}", open.ConnectionProperties);
client.ConnectionProperties = open.ConnectionProperties;

if (peerPropertiesResponse.Properties.TryGetValue("version", out var version))
{
AvailableFeaturesSingleton.Instance.SetServerVersion(version);
if (AvailableFeaturesSingleton.Instance.Is311OrMore)
{
var features = await client.ExchangeVersions().ConfigureAwait(false);
AvailableFeaturesSingleton.Instance.ParseCommandVersions(features.Commands);
}
else
{
logger?.LogInformation(
"Server version is less than 3.11.0, skipping command version exchange");
}
}
else
{
logger?.LogInformation(
"Server version is less than 3.11.0, skipping command version exchange");
}

client.correlationId = 100;
// start heart beat only when the client is connected
client.StartHeartBeat();
Expand Down
8 changes: 8 additions & 0 deletions RabbitMQ.Stream.Client/ClientExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,12 @@ public AuthMechanismNotSupportedException(string s)
{
}
}

public class UnsupportedOperationException : Exception
{
public UnsupportedOperationException(string s)
: base(s)
{
}
}
}
6 changes: 3 additions & 3 deletions RabbitMQ.Stream.Client/CommandVersionsRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ namespace RabbitMQ.Stream.Client;
{
private const ushort Key = 0x001b;
private readonly uint _correlationId;
private readonly ICommandVersions[] _commands = { };
private readonly ICommandVersions[] _commands = { new PublishFilter() };
// private readonly ICommandVersions[] _commands = {};

public CommandVersionsRequest(uint correlationId)
{
Expand All @@ -23,7 +24,7 @@ public int SizeNeeded
{
var size = 2 + 2 + 4
+ 4 + // _commands.Length
_commands.Length * (2 + 2 + 2);
_commands.Length * (2 + 2 + 2);
return size;
}
}
Expand All @@ -45,4 +46,3 @@ public int Write(Span<byte> span)
return offset;
}
}

4 changes: 4 additions & 0 deletions RabbitMQ.Stream.Client/Consts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ internal static class Consts
internal static readonly TimeSpan MidWait = TimeSpan.FromSeconds(3);
internal static readonly TimeSpan LongWait = TimeSpan.FromSeconds(10);
internal const ushort ConsumerInitialCredits = 2;
internal const byte Version1 = 1;
internal const byte Version2 = 2;
internal const string SubscriptionPropertyFilterPrefix = "filter.";
internal const string SubscriptionPropertyMatchUnfiltered = "match-unfiltered";

internal static int RandomShort()
{
Expand Down
2 changes: 2 additions & 0 deletions RabbitMQ.Stream.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public record IConsumerConfig : INamedEntity

public Func<string, Task> ConnectionClosedHandler { get; set; }

public ConsumerFilter ConsumerFilter { get; set; } = null;

// InitialCredits is the initial credits to be used for the consumer.
// if the InitialCredits is not set, the default value will be 2.
// It is the number of the chunks that the consumer will receive at beginning.
Expand Down
15 changes: 15 additions & 0 deletions RabbitMQ.Stream.Client/IProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

Expand Down Expand Up @@ -68,6 +69,15 @@ public interface IProducer
public int PendingCount { get; }
}

public record ProducerFilter
{
/// <summary>
/// FilterValue is a function that returns the filter value.
/// It is executed for each message.
/// </summary>
public Func<Message, string> FilterValue { get; set; } = null;
}

public record IProducerConfig : INamedEntity
{
public string Reference { get; set; }
Expand All @@ -83,4 +93,9 @@ public record IProducerConfig : INamedEntity
/// Default value is 100.
/// </summary>
public int MessagesBufferSize { get; set; } = 100;

/// <summary>
/// Filter enables the chunk filter feature.
/// </summary>
public ProducerFilter Filter { get; set; } = null;
}
1 change: 0 additions & 1 deletion RabbitMQ.Stream.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,6 @@ static RabbitMQ.Stream.Client.LeaderLocator.ClientLocal.get -> RabbitMQ.Stream.C
static RabbitMQ.Stream.Client.LeaderLocator.LeastLeaders.get -> RabbitMQ.Stream.Client.LeaderLocator
static RabbitMQ.Stream.Client.LeaderLocator.Random.get -> RabbitMQ.Stream.Client.LeaderLocator
static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.SequenceReader<byte> reader, uint len) -> RabbitMQ.Stream.Client.Message
static RabbitMQ.Stream.Client.Publish.Version.get -> byte
static RabbitMQ.Stream.Client.RawConsumer.Create(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.RawConsumerConfig config, RabbitMQ.Stream.Client.StreamInfo metaStreamInfo, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IConsumer>
static RabbitMQ.Stream.Client.RawProducer.Create(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.RawProducerConfig config, RabbitMQ.Stream.Client.StreamInfo metaStreamInfo, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
static RabbitMQ.Stream.Client.RawSuperStreamConsumer.Create(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.IConsumer
Expand Down
30 changes: 29 additions & 1 deletion RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey)
RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.StreamStatsResponse>
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void
RabbitMQ.Stream.Client.ConsumerFilter
RabbitMQ.Stream.Client.ConsumerFilter.MatchUnfiltered.get -> bool
RabbitMQ.Stream.Client.ConsumerFilter.MatchUnfiltered.set -> void
RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.get -> System.Func<RabbitMQ.Stream.Client.Message, bool>
RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.set -> void
RabbitMQ.Stream.Client.ConsumerFilter.Values.get -> System.Collections.Generic.List<string>
RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.get -> RabbitMQ.Stream.Client.ConsumerFilter
RabbitMQ.Stream.Client.CommandVersions
RabbitMQ.Stream.Client.CommandVersions.Command.get -> ushort
RabbitMQ.Stream.Client.CommandVersions.CommandVersions(ushort command, ushort minVersion, ushort maxVersion) -> void
Expand All @@ -31,19 +40,34 @@ RabbitMQ.Stream.Client.CommandVersionsResponse.CorrelationId.get -> uint
RabbitMQ.Stream.Client.CommandVersionsResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode
RabbitMQ.Stream.Client.CommandVersionsResponse.SizeNeeded.get -> int
RabbitMQ.Stream.Client.CommandVersionsResponse.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.ICommandVersions
RabbitMQ.Stream.Client.ICommandVersions.Command.get -> ushort
RabbitMQ.Stream.Client.ICommandVersions.MaxVersion.get -> ushort
RabbitMQ.Stream.Client.ICommandVersions.MinVersion.get -> ushort
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.IProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.KeyRoutingStrategy
RabbitMQ.Stream.Client.KeyRoutingStrategy.KeyRoutingStrategy(System.Func<RabbitMQ.Stream.Client.Message, string> routingKeyExtractor, System.Func<string, string, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>> routingKeyQFunc, string superStream) -> void
RabbitMQ.Stream.Client.KeyRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.get -> System.Func<RabbitMQ.Stream.Client.Message, string>
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.set -> void
RabbitMQ.Stream.Client.PublishFilter
RabbitMQ.Stream.Client.PublishFilter.Command.get -> ushort
RabbitMQ.Stream.Client.PublishFilter.MaxVersion.get -> ushort
RabbitMQ.Stream.Client.PublishFilter.MinVersion.get -> ushort
RabbitMQ.Stream.Client.PublishFilter.PublishFilter() -> void
RabbitMQ.Stream.Client.PublishFilter.PublishFilter(byte publisherId, System.Collections.Generic.List<(ulong, RabbitMQ.Stream.Client.Message)> messages, System.Func<RabbitMQ.Stream.Client.Message, string> filterValueExtractor, Microsoft.Extensions.Logging.ILogger logger) -> void
RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.get -> RabbitMQ.Stream.Client.ConsumerFilter
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.get -> ushort
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer
Expand All @@ -53,6 +77,8 @@ RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.IsOpen() -> bool
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Send(ulong publishing, RabbitMQ.Stream.Client.Message message) -> System.Threading.Tasks.ValueTask
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig.DeduplicatingProducerConfig(RabbitMQ.Stream.Client.StreamSystem streamSystem, string stream, string reference) -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.set -> void
Expand Down Expand Up @@ -84,6 +110,8 @@ RabbitMQ.Stream.Client.StreamStatsResponse.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamStats>
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void
RabbitMQ.Stream.Client.UnsupportedOperationException
RabbitMQ.Stream.Client.UnsupportedOperationException.UnsupportedOperationException(string s) -> void
static RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, System.Func<System.Memory<byte>, System.Threading.Tasks.Task> commandCallback, System.Func<string, System.Threading.Tasks.Task> closedCallBack, RabbitMQ.Stream.Client.SslOption sslOption, Microsoft.Extensions.Logging.ILogger logger) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Connection>
static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.ReadOnlySequence<byte> seq, uint len) -> RabbitMQ.Stream.Client.Message
static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer>
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/Publish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace RabbitMQ.Stream.Client
public readonly struct Publish : ICommand
{
private const ushort Key = 2;
public static byte Version => 1;
private static byte Version => Consts.Version1;

public int SizeNeeded
{
Expand Down
Loading

0 comments on commit 79f56db

Please sign in to comment.