diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index f7914af5..e234a3c0 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -341,6 +341,12 @@ public async Task QueryPartition(string superStream) new PartitionsQueryRequest(corr, superStream)).ConfigureAwait(false); } + public async Task QueryRoute(string superStream, string routingKey) + { + return await Request(corr => + new RouteQueryRequest(corr, superStream, routingKey)).ConfigureAwait(false); + } + private async ValueTask Request(Func request, TimeSpan? timeout = null) where TIn : struct, ICommand where TOut : struct, ICommand { @@ -526,7 +532,10 @@ private void HandleCorrelatedCommand(ushort tag, ref ReadOnlySequence fram PartitionsQueryResponse.Read(frame, out var partitionsQueryResponse); HandleCorrelatedResponse(partitionsQueryResponse); break; - + case RouteQueryResponse.Key: + RouteQueryResponse.Read(frame, out var routeQueryResponse); + HandleCorrelatedResponse(routeQueryResponse); + break; default: if (MemoryMarshal.TryGetArray(frame.First, out var segment)) { diff --git a/RabbitMQ.Stream.Client/ClientExceptions.cs b/RabbitMQ.Stream.Client/ClientExceptions.cs index 7a107bc9..5310cc57 100644 --- a/RabbitMQ.Stream.Client/ClientExceptions.cs +++ b/RabbitMQ.Stream.Client/ClientExceptions.cs @@ -82,4 +82,15 @@ public OffsetNotFoundException(string s) { } } + + // RouteNotFoundException the exception for super stream publish + // RouteNotFoundException is raised when the message can't be routed to any stream. + // In this case the user will receive a timeout error and this exception is raised + public class RouteNotFoundException : ProtocolException + { + public RouteNotFoundException(string s) + : base(s) + { + } + } } diff --git a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt index e55eac57..1c69b4ff 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt @@ -317,7 +317,6 @@ RabbitMQ.Stream.Client.Hash.Murmur3.Murmur3(uint seed) -> void RabbitMQ.Stream.Client.Hash.Murmur3.Seed.get -> uint RabbitMQ.Stream.Client.HashRoutingMurmurStrategy RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.HashRoutingMurmurStrategy(System.Func routingKeyExtractor) -> void -RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List partitions) -> System.Collections.Generic.List RabbitMQ.Stream.Client.HeartBeatHandler RabbitMQ.Stream.Client.HeartBeatHandler.HeartBeatHandler(System.Func> sendHeartbeatFunc, System.Func> close, int heartbeat, Microsoft.Extensions.Logging.ILogger logger = null) -> void RabbitMQ.Stream.Client.IClient @@ -390,7 +389,6 @@ RabbitMQ.Stream.Client.IRouting.ValidateDns.get -> bool RabbitMQ.Stream.Client.IRouting.ValidateDns.set -> void RabbitMQ.Stream.Client.IRoutingConfiguration RabbitMQ.Stream.Client.IRoutingStrategy -RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List partitions) -> System.Collections.Generic.List RabbitMQ.Stream.Client.LeaderLocator RabbitMQ.Stream.Client.LeaderLocator.LeaderLocator() -> void RabbitMQ.Stream.Client.LeaderNotFoundException diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index 363337e5..9278d9f7 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -1,11 +1,20 @@ +const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken RabbitMQ.Stream.Client.Chunk.Data.get -> System.Memory RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte +RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask +RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List partitions) -> System.Threading.Tasks.Task> RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void +RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List partitions) -> System.Threading.Tasks.Task> +RabbitMQ.Stream.Client.KeyRoutingStrategy +RabbitMQ.Stream.Client.KeyRoutingStrategy.KeyRoutingStrategy(System.Func routingKeyExtractor, System.Func> routingKeyQFunc, string superStream) -> void +RabbitMQ.Stream.Client.KeyRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List partitions) -> System.Threading.Tasks.Task> +RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType +RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.get -> ushort RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.set -> void RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer @@ -16,6 +25,21 @@ RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Send(ulong publishing, Rab RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig.DeduplicatingProducerConfig(RabbitMQ.Stream.Client.StreamSystem streamSystem, string stream, string reference) -> void RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void +RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType +RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.set -> void +RabbitMQ.Stream.Client.RouteNotFoundException +RabbitMQ.Stream.Client.RouteNotFoundException.RouteNotFoundException(string s) -> void +RabbitMQ.Stream.Client.RouteQueryResponse +RabbitMQ.Stream.Client.RouteQueryResponse.CorrelationId.get -> uint +RabbitMQ.Stream.Client.RouteQueryResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode +RabbitMQ.Stream.Client.RouteQueryResponse.RouteQueryResponse() -> void +RabbitMQ.Stream.Client.RouteQueryResponse.RouteQueryResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode, System.Collections.Generic.List streams) -> void +RabbitMQ.Stream.Client.RouteQueryResponse.SizeNeeded.get -> int +RabbitMQ.Stream.Client.RouteQueryResponse.Streams.get -> System.Collections.Generic.List +RabbitMQ.Stream.Client.RouteQueryResponse.Write(System.Span span) -> int +RabbitMQ.Stream.Client.RoutingStrategyType +RabbitMQ.Stream.Client.RoutingStrategyType.Hash = 0 -> RabbitMQ.Stream.Client.RoutingStrategyType +RabbitMQ.Stream.Client.RoutingStrategyType.Key = 1 -> RabbitMQ.Stream.Client.RoutingStrategyType RabbitMQ.Stream.Client.StreamStats RabbitMQ.Stream.Client.StreamStats.CommittedChunkId() -> ulong RabbitMQ.Stream.Client.StreamStats.FirstOffset() -> ulong diff --git a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs index ec411dcd..3abdc553 100644 --- a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs +++ b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs @@ -63,7 +63,13 @@ private RawSuperStreamProducer( _config = config; _streamInfos = streamInfos; _clientParameters = clientParameters; - _defaultRoutingConfiguration.RoutingStrategy = new HashRoutingMurmurStrategy(_config.Routing); + _defaultRoutingConfiguration.RoutingStrategy = _config.RoutingStrategyType switch + { + RoutingStrategyType.Key => new KeyRoutingStrategy(_config.Routing, + _config.Client.QueryRoute, _config.SuperStream), + RoutingStrategyType.Hash => new HashRoutingMurmurStrategy(_config.Routing), + _ => new HashRoutingMurmurStrategy(_config.Routing) + }; _logger = logger ?? NullLogger.Instance; } @@ -122,7 +128,8 @@ private RawProducerConfig FromStreamConfig(string stream) // The producer is created on demand when a message is sent to a stream private async Task InitProducer(string stream) { - var p = await RawProducer.Create(_clientParameters, FromStreamConfig(stream), _streamInfos[stream], _logger).ConfigureAwait(false); + var p = await RawProducer.Create(_clientParameters, FromStreamConfig(stream), _streamInfos[stream], _logger) + .ConfigureAwait(false); _logger?.LogDebug("Producer {ProducerReference} created for Stream {StreamIdentifier}", _config.Reference, stream); return p; @@ -147,8 +154,16 @@ private async Task GetProducerForMessage(Message message) throw new ObjectDisposedException(nameof(RawSuperStreamProducer)); } - var routes = _defaultRoutingConfiguration.RoutingStrategy.Route(message, - _streamInfos.Keys.ToList()); + var routes = await _defaultRoutingConfiguration.RoutingStrategy.Route(message, + _streamInfos.Keys.ToList()).ConfigureAwait(false); + + // we should always have a route + // but in case of stream KEY the routing could not exist + if (routes is not { Count: > 0 }) + { + throw new RouteNotFoundException("No route found for the message to any stream"); + } + return await GetProducer(routes[0]).ConfigureAwait(false); } @@ -263,6 +278,12 @@ public void Dispose() public int PendingCount => _producers.Sum(x => x.Value.PendingCount); } +public enum RoutingStrategyType +{ + Hash, + Key, +} + public record RawSuperStreamProducerConfig : IProducerConfig { public RawSuperStreamProducerConfig(string superStream) @@ -285,6 +306,8 @@ public RawSuperStreamProducerConfig(string superStream) public Func Routing { get; set; } = null; public Action<(string, Confirmation)> ConfirmHandler { get; set; } = _ => { }; + public RoutingStrategyType RoutingStrategyType { get; set; } = RoutingStrategyType.Hash; + internal Client Client { get; set; } } @@ -306,7 +329,7 @@ internal class DefaultRoutingConfiguration : IRoutingConfiguration /// public interface IRoutingStrategy { - List Route(Message message, List partitions); + Task> Route(Message message, List partitions); } /// @@ -322,12 +345,13 @@ public class HashRoutingMurmurStrategy : IRoutingStrategy private const int Seed = 104729; // must be the same to all the clients to be compatible // Routing based on the Murmur hash function - public List Route(Message message, List partitions) + public Task> Route(Message message, List partitions) { var key = _routingKeyExtractor(message); var hash = new Murmur32ManagedX86(Seed).ComputeHash(Encoding.UTF8.GetBytes(key)); var index = BitConverter.ToUInt32(hash, 0) % (uint)partitions.Count; - return new List() { partitions[(int)index] }; + var r = new List() { partitions[(int)index] }; + return Task.FromResult(r); } public HashRoutingMurmurStrategy(Func routingKeyExtractor) @@ -335,3 +359,39 @@ public HashRoutingMurmurStrategy(Func routingKeyExtractor) _routingKeyExtractor = routingKeyExtractor; } } + +/// +/// KeyRoutingStrategy is a routing strategy that uses the routing key to route messages to streams. +/// +public class KeyRoutingStrategy : IRoutingStrategy +{ + private readonly Func _routingKeyExtractor; + private readonly Func> _routingKeyQFunc; + private readonly string _superStream; + private readonly Dictionary> _cacheStream = new(); + + public async Task> Route(Message message, List partitions) + { + var key = _routingKeyExtractor(message); + // If the stream is already in the cache we return it + // to avoid a query to the server for each send + if (_cacheStream.TryGetValue(key, out var value)) + { + return value; + } + + var c = await _routingKeyQFunc(_superStream, key).ConfigureAwait(false); + _cacheStream[key] = c.Streams; + return (from resultStream in c.Streams + where partitions.Contains(resultStream) + select new List() { resultStream }).FirstOrDefault(); + } + + public KeyRoutingStrategy(Func routingKeyExtractor, + Func> routingKeyQFunc, string superStream) + { + _routingKeyExtractor = routingKeyExtractor; + _routingKeyQFunc = routingKeyQFunc; + _superStream = superStream; + } +} diff --git a/RabbitMQ.Stream.Client/Reliable/Producer.cs b/RabbitMQ.Stream.Client/Reliable/Producer.cs index ac295d13..91592d5e 100644 --- a/RabbitMQ.Stream.Client/Reliable/Producer.cs +++ b/RabbitMQ.Stream.Client/Reliable/Producer.cs @@ -16,6 +16,8 @@ public record SuperStreamConfig { public bool Enabled { get; init; } = true; public Func Routing { get; set; } + + public RoutingStrategyType RoutingStrategyType { get; set; } = RoutingStrategyType.Hash; } [AttributeUsage(AttributeTargets.Method)] @@ -257,6 +259,12 @@ internal async ValueTask SendInternal(ulong publishingId, Message message) } } + // see the RouteNotFoundException comment + catch (RouteNotFoundException) + { + throw; + } + catch (Exception e) { _logger?.LogError(e, "Error sending message. " + @@ -291,6 +299,12 @@ public async ValueTask Send(List messages, CompressionType compressionT } } + // see the RouteNotFoundException comment + catch (RouteNotFoundException) + { + throw; + } + catch (Exception e) { _logger?.LogError(e, "Error sending sub-batch messages. " + @@ -346,6 +360,12 @@ public async ValueTask Send(List messages) } } + // see the RouteNotFoundException comment + catch (RouteNotFoundException) + { + throw; + } + catch (Exception e) { _logger?.LogError(e, "Error sending messages. " + diff --git a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs index aaeae9c0..d97e0c0a 100644 --- a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs @@ -39,6 +39,7 @@ private async Task SuperStreamProducer() MessagesBufferSize = _producerConfig.MessagesBufferSize, MaxInFlight = _producerConfig.MaxInFlight, Routing = _producerConfig.SuperStreamConfig.Routing, + RoutingStrategyType = _producerConfig.SuperStreamConfig.RoutingStrategyType, ConfirmHandler = confirmationHandler => { var (stream, confirmation) = confirmationHandler; @@ -77,7 +78,10 @@ private async Task StandardProducer() _producerConfig.StreamSystem).WaitAsync(CancellationToken.None); }); }, - ConnectionClosedHandler = async _ => { await TryToReconnect(_producerConfig.ReconnectStrategy).ConfigureAwait(false); }, + ConnectionClosedHandler = async _ => + { + await TryToReconnect(_producerConfig.ReconnectStrategy).ConfigureAwait(false); + }, ConfirmHandler = confirmation => { var confirmationStatus = confirmation.Code switch diff --git a/RabbitMQ.Stream.Client/RouteQueryRequest.cs b/RabbitMQ.Stream.Client/RouteQueryRequest.cs new file mode 100644 index 00000000..455bbaa2 --- /dev/null +++ b/RabbitMQ.Stream.Client/RouteQueryRequest.cs @@ -0,0 +1,37 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2023 VMware, Inc. + +using System; + +namespace RabbitMQ.Stream.Client; + +internal readonly struct RouteQueryRequest : ICommand +{ + public const ushort Key = 0x0018; + private readonly string _superStream; + private readonly string _routingKey; + private readonly uint _corrId; + + public RouteQueryRequest(uint corrId, string superStream, string routingKey) + { + _corrId = corrId; + _superStream = superStream; + _routingKey = routingKey; + } + + public int SizeNeeded => 2 + 2 + 4 + + WireFormatting.StringSize(_superStream) + + WireFormatting.StringSize(_routingKey); + + public int Write(Span span) + { + var command = (ICommand)this; + var offset = WireFormatting.WriteUInt16(span, Key); + offset += WireFormatting.WriteUInt16(span[offset..], command.Version); + offset += WireFormatting.WriteUInt32(span[offset..], _corrId); + offset += WireFormatting.WriteString(span[offset..], _routingKey); + offset += WireFormatting.WriteString(span[offset..], _superStream); + return offset; + } +} diff --git a/RabbitMQ.Stream.Client/RouteQueryResponse.cs b/RabbitMQ.Stream.Client/RouteQueryResponse.cs new file mode 100644 index 00000000..5b122f70 --- /dev/null +++ b/RabbitMQ.Stream.Client/RouteQueryResponse.cs @@ -0,0 +1,47 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2023 VMware, Inc. + +using System; +using System.Buffers; +using System.Collections.Generic; + +namespace RabbitMQ.Stream.Client; + +public struct RouteQueryResponse : ICommand +{ + public const ushort Key = 0x0018; + + public RouteQueryResponse(uint correlationId, ResponseCode responseCode, List streams) + { + Streams = streams; + ResponseCode = responseCode; + CorrelationId = correlationId; + } + + public List Streams { get; } + public int SizeNeeded => throw new NotImplementedException(); + public int Write(Span span) => throw new NotImplementedException(); + + public uint CorrelationId { get; } + public ResponseCode ResponseCode { get; } + + internal static int Read(ReadOnlySequence frame, out RouteQueryResponse command) + { + var offset = WireFormatting.ReadUInt16(frame, out _); + offset += WireFormatting.ReadUInt16(frame.Slice(offset), out _); + offset += WireFormatting.ReadUInt32(frame.Slice(offset), out var correlationId); + offset += WireFormatting.ReadUInt16(frame.Slice(offset), out var responseCode); + offset += WireFormatting.ReadUInt32(frame.Slice(offset), out var numOfStreams); + + var streams = new List(); + for (var i = 0; i < numOfStreams; i++) + { + offset += WireFormatting.ReadString(frame.Slice(offset), out var stream); + streams.Add(stream); + } + + command = new RouteQueryResponse(correlationId, (ResponseCode)responseCode, streams); + return offset; + } +} diff --git a/Tests/SuperStreamProducerTests.cs b/Tests/SuperStreamProducerTests.cs index 8676fb63..9ea85dc8 100644 --- a/Tests/SuperStreamProducerTests.cs +++ b/Tests/SuperStreamProducerTests.cs @@ -86,7 +86,7 @@ IEnumerator IEnumerable.GetEnumerator() [Theory] [ClassData(typeof(MessageIdToStreamTestCases))] - public void ValidateHashRoutingStrategy(MessageIdToStream @msg) + public async void ValidateHashRoutingStrategy(MessageIdToStream @msg) { // this test validates that the hash routing strategy is working as expected var murmurStrategy = new HashRoutingMurmurStrategy(message => message.Properties.MessageId.ToString()); @@ -95,12 +95,41 @@ public void ValidateHashRoutingStrategy(MessageIdToStream @msg) Properties = new Properties() { MessageId = msg.MessageId } }; var routes = - murmurStrategy.Route(messageTest, new List() { "invoices-01", "invoices-02", "invoices-03" }); + await murmurStrategy.Route(messageTest, new List() { "invoices-01", "invoices-02", "invoices-03" }); Assert.Single(routes); Assert.Equal(msg.StreamExpected, routes[0]); } + // ValidateKeyRouting is a test that validates that the key routing strategy is working as expected + // The key routing strategy is used when the routing key is known before hand + [Fact] + public async void ValidateKeyRouting() + { + var messageTest = new Message(Encoding.Default.GetBytes("hello")) + { + Properties = new Properties() { MessageId = "italy" } + }; + // this test validates that the key routing strategy is working as expected + var keyRoutingStrategy = new KeyRoutingStrategy(message => message.Properties.MessageId.ToString(), + (superStream, key) => + { + if (key == "italy") + { + return Task.FromResult(new RouteQueryResponse(1, ResponseCode.Ok, new List() { "italy" })); + } + + var response = new RouteQueryResponse(1, ResponseCode.Ok, new List() { "" }); + return Task.FromResult(response); + }, "orders"); + + var routes = + await keyRoutingStrategy.Route(messageTest, + new List() { "italy", "france", "spain" }); + + Assert.Equal("italy", routes[0]); + } + [Fact] public async void SendMessageToSuperStream() { @@ -144,6 +173,172 @@ await system.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig(Syste await system.Close(); } + // SendMessageToSuperStreamWithKeyStrategy is a test that validates that the key routing strategy is working as expected + + [Fact] + public async void SendMessageToSuperStreamWithKeyStrategy() + { + SystemUtils.ResetSuperStreams(); + // Simple send message to super stream + + var system = await StreamSystem.Create(new StreamSystemConfig()); + var streamProducer = + await system.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig(SystemUtils.InvoicesExchange) + { + Routing = message1 => message1.Properties.MessageId.ToString(), + Reference = "reference", + RoutingStrategyType = RoutingStrategyType.Key // this is the key routing strategy + }); + Assert.True(streamProducer.MessagesSent == 0); + Assert.True(streamProducer.ConfirmFrames == 0); + Assert.True(streamProducer.PublishCommandsSent == 0); + Assert.True(streamProducer.PendingCount == 0); + var messages = new List(); + for (ulong i = 0; i < 20; i++) + { + // We should not have any errors and according to the routing strategy + // based on the key the message should be routed to the correct stream + // the routing keys are: + // 0,1,2 + var idx = i % 3; + var message = new Message(Encoding.Default.GetBytes("hello")) + { + Properties = new Properties() { MessageId = $"{idx}" } + }; + messages.Add(message); + } + + ulong id = 0; + foreach (var message in messages) + { + await streamProducer.Send(++id, message); + } + + SystemUtils.Wait(); + // Total messages must be 20 + // according to the routing strategy hello{i} that must be the correct routing + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream0) == 7); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream1) == 7); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream2) == 6); + + Assert.True(streamProducer.MessagesSent == 20); + SystemUtils.WaitUntil(() => streamProducer.ConfirmFrames > 0); + SystemUtils.WaitUntil(() => streamProducer.PublishCommandsSent > 0); + + var messageNotRouted = new Message(Encoding.Default.GetBytes("hello")) + { + Properties = new Properties() { MessageId = "this_is_key_does_not_exist" } + }; + await Assert.ThrowsAsync(async () => await streamProducer.Send(21, messageNotRouted)); + + var messagesWithId = messages.Select(message => (++id, message)).ToList(); + + await streamProducer.Send(messagesWithId); + + // Total messages must be 20 * 2 + // according to the routing strategy hello{i} that must be the correct routing + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream0) == 7 * 2); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream1) == 7 * 2); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream2) == 6 * 2); + + Assert.True(streamProducer.MessagesSent == 20 * 2); + + await Assert.ThrowsAsync(async () => await streamProducer.Send( + new List<(ulong, Message)>() { (55, messageNotRouted) })); + + await streamProducer.Send(++id, messages, CompressionType.Gzip); + + // Total messages must be 20 * 3 + // according to the routing strategy hello{i} that must be the correct routing + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream0) == 7 * 3); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream1) == 7 * 3); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream2) == 6 * 3); + + await Assert.ThrowsAsync(async () => + await streamProducer.Send(++id, + new List() { messageNotRouted }, CompressionType.Gzip)); + + Assert.True(await streamProducer.Close() == ResponseCode.Ok); + await system.Close(); + } + + [Fact] + public async void SendMessageWithProducerToSuperStreamWithKeyStrategy() + { + SystemUtils.ResetSuperStreams(); + // Simple send message to super stream + + var system = await StreamSystem.Create(new StreamSystemConfig()); + var producer = + await Producer.Create(new ProducerConfig(system, SystemUtils.InvoicesExchange) + { + SuperStreamConfig = new SuperStreamConfig() + { + Routing = message1 => message1.Properties.MessageId.ToString(), + RoutingStrategyType = RoutingStrategyType.Key // this is the key routing strategy + } + }); + var messages = new List(); + for (ulong i = 0; i < 20; i++) + { + // We should not have any errors and according to the routing strategy + // based on the key the message should be routed to the correct stream + // the routing keys are: + // 0,1,2 + var idx = i % 3; + var message = new Message(Encoding.Default.GetBytes("hello")) + { + Properties = new Properties() { MessageId = $"{idx}" } + }; + messages.Add(message); + } + + foreach (var message in messages) + { + await producer.Send(message); + } + + SystemUtils.Wait(); + // Total messages must be 20 + // according to the routing strategy hello{i} that must be the correct routing + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream0) == 7); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream1) == 7); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream2) == 6); + + var messageNotRouted = new Message(Encoding.Default.GetBytes("hello")) + { + Properties = new Properties() { MessageId = "this_is_key_does_not_exist" } + }; + await Assert.ThrowsAsync(async () => await producer.Send(messageNotRouted)); + + var messagesWithId = messages.Select(message => (message)).ToList(); + + await producer.Send(messagesWithId); + + // Total messages must be 20 * 2 + // according to the routing strategy hello{i} that must be the correct routing + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream0) == 7 * 2); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream1) == 7 * 2); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream2) == 6 * 2); + + await Assert.ThrowsAsync(async () => await producer.Send( + new List() { messageNotRouted })); + + await producer.Send(messages, CompressionType.Gzip); + + // Total messages must be 20 * 3 + // according to the routing strategy hello{i} that must be the correct routing + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream0) == 7 * 3); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream1) == 7 * 3); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream2) == 6 * 3); + + await Assert.ThrowsAsync(async () => + await producer.Send( + new List() { messageNotRouted }, CompressionType.Gzip)); + + await system.Close(); + } + [Fact] public async void SendBachToSuperStream() { @@ -213,7 +408,7 @@ await system.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig(Syste // We _must_ have the same number of messages per queue as in the SendMessageToSuperStream test SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream0) == 9); SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream1) == 7); - SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount("invoices-2") == 4); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream2) == 4); Assert.Equal((ulong)1, await streamProducer.GetLastPublishingId()); Assert.True(await streamProducer.Close() == ResponseCode.Ok); await system.Close(); diff --git a/docs/SuperStream/SuperStreamProducerKey.cs b/docs/SuperStream/SuperStreamProducerKey.cs new file mode 100644 index 00000000..9714a5b5 --- /dev/null +++ b/docs/SuperStream/SuperStreamProducerKey.cs @@ -0,0 +1,66 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2020 VMware, Inc. + +using System.Text; +using Microsoft.Extensions.Logging; +using RabbitMQ.Stream.Client; +using RabbitMQ.Stream.Client.AMQP; +using RabbitMQ.Stream.Client.Reliable; + +namespace SuperStream; + +public class SuperStreamProducerKey +{ + public static async Task Start() + { + var loggerFactory = LoggerFactory.Create(builder => + { + builder.AddSimpleConsole(); + builder.AddFilter("RabbitMQ.Stream", LogLevel.Information); + }); + + var logger = loggerFactory.CreateLogger(); + var loggerMain = loggerFactory.CreateLogger(); + + loggerMain.LogInformation("Starting SuperStream Producer"); + var config = new StreamSystemConfig(); + var system = await StreamSystem.Create(config).ConfigureAwait(false); + loggerMain.LogInformation("Super Stream Producer connected to RabbitMQ"); + + + // We define a Producer with the SuperStream name (that is the Exchange name) + // tag::super-stream-producer-key[] + var producer = await Producer.Create( + new ProducerConfig(system, + // Costants.StreamName is the Exchange name + // invoices + Costants.StreamName) + { + SuperStreamConfig = new SuperStreamConfig() + { + // The super stream is enable and we define the routing hashing algorithm + Routing = msg => msg.Properties.MessageId.ToString(), // <1> + RoutingStrategyType = RoutingStrategyType.Key // <2> + } + }, logger).ConfigureAwait(false); + const int NumberOfMessages = 1_000_000; + + var keys = new [] {"italy", "france", "germany"}; + + for (var i = 0; i < NumberOfMessages; i++) + { + var key = keys[i % 3]; + + var message = new Message(Encoding.Default.GetBytes($"hello{i}")) + { + Properties = new Properties() {MessageId = $"{key}"} + }; + await producer.Send(message).ConfigureAwait(false); + // end::super-stream-producer-key[] + loggerMain.LogInformation("Super Stream Producer sent {I} messages to {StreamName}", i, + Costants.StreamName); + Thread.Sleep(TimeSpan.FromMilliseconds(1000)); + } + } +} diff --git a/docs/asciidoc/super-streams.adoc b/docs/asciidoc/super-streams.adoc index 6c3feeb5..a8e9a346 100644 --- a/docs/asciidoc/super-streams.adoc +++ b/docs/asciidoc/super-streams.adoc @@ -92,6 +92,54 @@ The client will hash the routing key to determine the stream to send the message The client uses 32-bit https://en.wikipedia.org/wiki/MurmurHash[MurmurHash3] by default to hash the routing key. This hash function provides good uniformity and it is compatible with the other clients. +====== Resolving Routes with Bindings + +Hashing the routing key to pick a partition is only one way to route messages to the appropriate streams. +The stream .NET client provides another way to resolve streams, based on the routing key _and_ the bindings between the super stream exchange and the streams. + +This routing strategy makes sense when the partitioning has a business meaning, e.g. with a partition for a region in the world, like in the diagram below: + +.A super stream with a partition for a region in a world +[ditaa] +.... + amer +---------------+ + +------>+ invoices–amer | + | +---------------+ ++----------+ | +| invoices | | emea +---------------+ +| +---+------>+ invoices–emea | +| exchange | | +---------------+ ++----------+ | + | apac +---------------+ + +------>+ invoices–apac | + +---------------+ +.... + +To create this topology: + +[source] +---- +rabbitmq-streams add_super_stream invoices --routing-keys apac,emea,amer +---- + +In such a case, the routing key will be a property of the message that represents the region: + +.Enabling the "key" routing strategy +[source,c#,indent=0] +-------- +include::{test-examples}/SuperStreamProducerKey.cs[tag=super-stream-producer-key] +-------- +<1> Extract the routing key +<2> Enable the "key" routing strategy + +Internally the client will query the broker to resolve the destination streams for a given routing key, making the routing logic from any exchange type available to streams. + +If there is no binding for a routing key, the client will raise an exception `RouteNotFoundException`. + +`RouteNotFoundException` the message is not routed to any stream. + +[[super-stream-deduplication]] + ====== Deduplication