diff --git a/README.md b/README.md index 22462299..462d1e58 100644 --- a/README.md +++ b/README.md @@ -30,12 +30,10 @@ Please refer to the [documentation](https://rabbitmq.github.io/rabbitmq-stream-d The library requires .NET 6 or .NET 7. ### Documentation - - - [HTML documentation](https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html) - [PDF documentation](https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/dotnet-stream-client.pdf) - - [A Simple Getting started](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/blob/main/docs/Documentation/) +- [Best practices to write a reliable client](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/tree/main/docs/ReliableClient/) - [Super Stream example](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/blob/main/docs/SuperStream) - [Stream Performance Test](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/tree/main/RabbitMQ.Stream.Client.PerfTest) diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index 6f735cbd..93bf5861 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -864,6 +864,19 @@ public async ValueTask QueryMetadata(string[] streams) .ConfigureAwait(false); } + public async Task SuperStreamExists(string stream) + { + var response = await QueryPartition(stream).ConfigureAwait(false); + if (response is { Streams.Length: >= 0 } && + response.ResponseCode == ResponseCode.StreamNotAvailable) + { + ClientExceptions.MaybeThrowException(ResponseCode.StreamNotAvailable, stream); + } + + return response is { Streams.Length: >= 0 } && + response.ResponseCode == ResponseCode.Ok; + } + public async Task StreamExists(string stream) { var streams = new[] { stream }; diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index af48c665..3a7286f3 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -47,6 +47,7 @@ RabbitMQ.Stream.Client.Client.Publishers.get -> System.Collections.Generic.IDict RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IOffsetType offsetType, ushort initialCredit, System.Collections.Generic.Dictionary properties, System.Func deliverHandler, System.Func> consumerUpdateHandler = null, RabbitMQ.Stream.Client.ConnectionsPool pool = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)> +RabbitMQ.Stream.Client.Client.SuperStreamExists(string stream) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Client.Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Client.UpdateSecret(string newSecret) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism @@ -182,6 +183,7 @@ RabbitMQ.Stream.Client.OffsetTypeTimestamp.OffsetTypeTimestamp(System.DateTime d RabbitMQ.Stream.Client.OffsetTypeTimestamp.OffsetTypeTimestamp(System.DateTimeOffset dateTimeOffset) -> void RabbitMQ.Stream.Client.PartitionsSuperStreamSpec RabbitMQ.Stream.Client.PartitionsSuperStreamSpec.Partitions.get -> int +RabbitMQ.Stream.Client.PartitionsSuperStreamSpec.PartitionsSuperStreamSpec(string Name) -> void RabbitMQ.Stream.Client.PartitionsSuperStreamSpec.PartitionsSuperStreamSpec(string Name, int partitions) -> void RabbitMQ.Stream.Client.ProducerFilter RabbitMQ.Stream.Client.ProducerFilter.FilterValue.get -> System.Func @@ -309,6 +311,7 @@ RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamConsumer(RabbitMQ.Stream.Cl RabbitMQ.Stream.Client.StreamSystem.DeleteSuperStream(string superStream) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.StreamInfo(string streamName) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.StreamSystem.SuperStreamExists(string superStream) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.UpdateSecret(string newSecret) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void diff --git a/RabbitMQ.Stream.Client/StreamSpec.cs b/RabbitMQ.Stream.Client/StreamSpec.cs index c45c3935..26b2f72e 100644 --- a/RabbitMQ.Stream.Client/StreamSpec.cs +++ b/RabbitMQ.Stream.Client/StreamSpec.cs @@ -38,6 +38,10 @@ public int MaxSegmentSizeBytes public IDictionary Args => args; } + /// + /// Abstract class for SuperStreamSpec + /// + /// Super Stream Name public abstract record SuperStreamSpec(string Name) { internal virtual void Validate() @@ -79,9 +83,22 @@ public int MaxSegmentSizeBytes public IDictionary Args => args; } + /// + /// Create a super stream based on the number of partitions. + /// So there will be N partitions and N binding keys. + /// The stream names is the super stream name with a partition number appended. + /// The routing key is the partition number. + /// Producer should use HASH strategy to route the message to the correct partition. + /// Partitions should be at least 1. + /// public record PartitionsSuperStreamSpec : SuperStreamSpec { + public PartitionsSuperStreamSpec(string Name) : base(Name) + { + Partitions = 3; + } + public PartitionsSuperStreamSpec(string Name, int partitions) : base(Name) { Partitions = partitions; @@ -121,6 +138,13 @@ internal override List GetBindingKeys() } + /// + /// Create a super stream based on the number of binding keys. + /// So there will be N partitions and N binding keys. + /// The stream names is the super stream name with a binding key appended. + /// Producer should use KEY strategy to route the message to the correct partition. + /// The binding keys should be unique duplicates are not allowed. + /// public record BindingsSuperStreamSpec : SuperStreamSpec { public BindingsSuperStreamSpec(string Name, string[] bindingKeys) : base(Name) diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs index 7c0a7daf..b549f100 100644 --- a/RabbitMQ.Stream.Client/StreamSystem.cs +++ b/RabbitMQ.Stream.Client/StreamSystem.cs @@ -395,6 +395,20 @@ public async Task StreamExists(string stream) } } + public async Task SuperStreamExists(string superStream) + { + await MayBeReconnectLocator().ConfigureAwait(false); + await _semClientProvidedName.WaitAsync().ConfigureAwait(false); + try + { + return await _client.SuperStreamExists(superStream).ConfigureAwait(false); + } + finally + { + _semClientProvidedName.Release(); + } + } + private static void MaybeThrowQueryException(string reference, string stream) { if (string.IsNullOrWhiteSpace(reference) || string.IsNullOrWhiteSpace(stream)) diff --git a/Tests/SystemTests.cs b/Tests/SystemTests.cs index 7780411c..4cac92b9 100644 --- a/Tests/SystemTests.cs +++ b/Tests/SystemTests.cs @@ -375,7 +375,9 @@ public async void CreateDeleteSuperStream() Assert.Equal("1000", spec.Args["stream-max-segment-size-bytes"]); Assert.Equal("20000", spec.Args["max-length-bytes"]); await system.CreateSuperStream(spec); + Assert.True(await system.SuperStreamExists(SuperStream)); await system.DeleteSuperStream(SuperStream); + Assert.False(await system.SuperStreamExists(SuperStream)); await system.Close(); } diff --git a/Tests/Utils.cs b/Tests/Utils.cs index a3cd38db..8c52f563 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -440,7 +440,7 @@ public static async Task ResetSuperStreams() } Wait(); - var spec = new PartitionsSuperStreamSpec(InvoicesExchange, 3); + var spec = new PartitionsSuperStreamSpec(InvoicesExchange); await system.CreateSuperStream(spec); await system.Close(); } diff --git a/docs/ReliableClient/RClient.cs b/docs/ReliableClient/BestPracticesClient.cs similarity index 74% rename from docs/ReliableClient/RClient.cs rename to docs/ReliableClient/BestPracticesClient.cs index 99bf7448..af91fd51 100644 --- a/docs/ReliableClient/RClient.cs +++ b/docs/ReliableClient/BestPracticesClient.cs @@ -14,14 +14,16 @@ namespace ReliableClient; -public class RClient +public class BestPracticesClient { public record Config { - public string Host { get; set; } = "localhost"; + public string? Host { get; set; } = "localhost"; public int Port { get; set; } = 5552; - public string Username { get; set; } = "guest"; - public string Password { get; set; } = "guest"; + public string? Username { get; set; } = "guest"; + public string? Password { get; set; } = "guest"; + + public string? StreamName { get; set; } = "DotNetClientTest"; public bool LoadBalancer { get; set; } = false; public bool SuperStream { get; set; } = false; public int Streams { get; set; } = 1; @@ -30,6 +32,10 @@ public record Config public int MessagesPerProducer { get; set; } = 5_000_000; public int Consumers { get; set; } = 9; public byte ConsumersPerConnection { get; set; } = 8; + + public int DelayDuringSendMs { get; set; } = 0; + + } public static async Task Start(Config config) @@ -61,12 +67,18 @@ public static async Task Start(Config config) switch (Uri.CheckHostName(config.Host)) { case UriHostNameType.IPv4: - ep = new IPEndPoint(IPAddress.Parse(config.Host), config.Port); + if (config.Host != null) ep = new IPEndPoint(IPAddress.Parse(config.Host), config.Port); break; case UriHostNameType.Dns: - var addresses = await Dns.GetHostAddressesAsync(config.Host).ConfigureAwait(false); - ep = new IPEndPoint(addresses[0], config.Port); + if (config.Host != null) + { + var addresses = await Dns.GetHostAddressesAsync(config.Host).ConfigureAwait(false); + ep = new IPEndPoint(addresses[0], config.Port); + } + break; + default: + throw new ArgumentOutOfRangeException(); } } @@ -105,13 +117,13 @@ public static async Task Start(Config config) var streamsList = new List(); if (config.SuperStream) { - streamsList.Add("invoices"); + if (config.StreamName != null) streamsList.Add(config.StreamName); } else { for (var i = 0; i < config.Streams; i++) { - streamsList.Add($"invoices-{i}"); + streamsList.Add($"{config.StreamName}-{i}"); } } @@ -141,6 +153,18 @@ public static async Task Start(Config config) List consumersList = new(); List producersList = new(); var obj = new object(); + if (config.SuperStream) + { + if (await system.SuperStreamExists(streamsList[0]).ConfigureAwait(false)) + { + await system.DeleteSuperStream(streamsList[0]).ConfigureAwait(false); + } + + + await system.CreateSuperStream(new PartitionsSuperStreamSpec(streamsList[0], config.Streams)).ConfigureAwait(false); + } + + foreach (var stream in streamsList) { if (!config.SuperStream) @@ -153,7 +177,7 @@ public static async Task Start(Config config) await system.CreateStream(new StreamSpec(stream) {MaxLengthBytes = 30_000_000_000,}) .ConfigureAwait(false); await Task.Delay(TimeSpan.FromSeconds(3)).ConfigureAwait(false); - } + } for (var z = 0; z < config.Consumers; z++) { @@ -162,15 +186,24 @@ await system.CreateStream(new StreamSpec(stream) {MaxLengthBytes = 30_000_000_00 OffsetSpec = new OffsetTypeLast(), IsSuperStream = config.SuperStream, IsSingleActiveConsumer = config.SuperStream, - Reference = "myApp", - Identifier = $"my_c_{z}", + Reference = "myApp",// needed for the Single Active Consumer or fot the store offset + // can help to identify the consumer on the logs and RabbitMQ Management + Identifier = $"my_consumer_{z}", InitialCredits = 10, - MessageHandler = (source, ctx, _, _) => + MessageHandler = async (source, consumer, ctx, _) => { + if (totalConsumed % 10_000 == 0) + { + // don't store the offset every time, it could be a performance issue + // store the offset every 1_000/5_000/10_000 messages + await consumer.StoreOffset(ctx.Offset).ConfigureAwait(false); + } Interlocked.Increment(ref totalConsumed); - return Task.CompletedTask; }, }; + + // This is the callback that will be called when the consumer status changes + // DON'T PUT ANY BLOCKING CODE HERE conf.StatusChanged += (status) => { var streamInfo = status.Partition is not null @@ -190,27 +223,33 @@ async Task MaybeSend(Producer producer, Message message, ManualResetEvent publis await producer.Send(message).ConfigureAwait(false); } + // this example is meant to show how to use the producer and consumer + // Create too many tasks for the producers and consumers is not a good idea for (var z = 0; z < config.Producers; z++) { var z1 = z; _ = Task.Run(async () => { + // the list of unconfirmed messages in case of error or disconnection + // This example is only for the example, in a real scenario you should handle the unconfirmed messages + // since the list could grow event the publishEvent should avoid it. var unconfirmedMessages = new ConcurrentBag(); + // the event to wait for the producer to be ready to send + // in case of disconnection the event will be reset var publishEvent = new ManualResetEvent(false); - var producerConfig = new ProducerConfig(system, stream) { - Identifier = $"my_super_{z1}", + Identifier = $"my_producer_{z1}", SuperStreamConfig = new SuperStreamConfig() { Enabled = config.SuperStream, Routing = msg => msg.Properties.MessageId.ToString(), }, ConfirmationHandler = confirmation => { + // Add the unconfirmed messages to the list in case of error if (confirmation.Status != ConfirmationStatus.Confirmed) { confirmation.Messages.ForEach(m => { unconfirmedMessages.Add(m); }); - Interlocked.Add(ref totalError, confirmation.Messages.Count); return Task.CompletedTask; } @@ -219,15 +258,21 @@ async Task MaybeSend(Producer producer, Message message, ManualResetEvent publis return Task.CompletedTask; }, }; + + // Like the consumer don't put any blocking code here producerConfig.StatusChanged += (status) => { var streamInfo = status.Partition is not null ? $" Partition {status.Partition} of super stream: {status.Stream}" : $"Stream: {status.Stream}"; + // just log the status change lp.LogInformation("Consumer: {Id} - status changed from: {From} to: {To} reason: {Reason} {Info}", status.Identifier, status.From, status.To,status.Reason, streamInfo); + // in case of disconnection the event will be reset + // in case of reconnection the event will be set so the producer can send messages + // It is important to use the ManualReset to avoid to send messages before the producer is ready if (status.To == ReliableEntityStatus.Open) { publishEvent.Set(); @@ -247,6 +292,7 @@ async Task MaybeSend(Producer producer, Message message, ManualResetEvent publis { if (!unconfirmedMessages.IsEmpty) { + // checks if there are unconfirmed messages and send them var msgs = unconfirmedMessages.ToArray(); unconfirmedMessages.Clear(); foreach (var msg in msgs) @@ -261,7 +307,8 @@ async Task MaybeSend(Producer producer, Message message, ManualResetEvent publis Properties = new Properties() {MessageId = $"hello{i}"} }; await MaybeSend(producer, message, publishEvent).ConfigureAwait(false); - // await Task.Delay(1).ConfigureAwait(false); + // You don't need this it is only for the example + await Task.Delay(config.DelayDuringSendMs).ConfigureAwait(false); Interlocked.Increment(ref totalSent); } }); diff --git a/docs/ReliableClient/Program.cs b/docs/ReliableClient/Program.cs index 77924ad6..e8bb40c1 100644 --- a/docs/ReliableClient/Program.cs +++ b/docs/ReliableClient/Program.cs @@ -1,24 +1,53 @@ // See https://aka.ms/new-console-template for more information +using Microsoft.Extensions.Configuration; using ReliableClient; Console.WriteLine("Starting RabbitMQ Streaming Client"); +const string FileName = "appsettings.json"; +var fs = File.OpenRead(FileName); -var rClient = RClient.Start(new RClient.Config() + +var section = new ConfigurationBuilder() + .AddJsonFile(FileName) + .Build() + .GetSection("RabbitMQ"); +fs.Dispose(); + + +var rClient = BestPracticesClient.Start(new BestPracticesClient.Config() { - ProducersPerConnection = (byte)(Random.Shared.Next(1, 50)), - ConsumersPerConnection = (byte)(Random.Shared.Next(1, 50)), - Host = "node0", - Port = 5562, - LoadBalancer = false, - SuperStream = true, - Streams = 3, - Producers = 2, - MessagesPerProducer = 10_000_000, - Consumers = 5, - // Username = "test", - // Password = "test" + // set the ProducersPerConnection. This is the number of producers that will be created for each connection. + // a low value can improve the throughput of the producer since the connection is shared between the producers. + // a high value can reduce the resource usage of the producer since the connection is shared between the producers. + ProducersPerConnection = section.GetSection("ProducersPerConnection").Get(), + // Same rules as ProducersPerConnection but for the consumers. + // Note that if a consumer is slow can impact the other consumers on the same connection. + // There is a small internal buffer that can help to mitigate this issue. + // but if the consumer is too slow, the buffer will be full and the other consumers will be impacted. + ConsumersPerConnection = section.GetSection("ConsumersPerConnection").Get(), + Host = section.GetSection("Host").Get(), + Port = section.GetSection("Port").Get(), + + LoadBalancer = section.GetSection("LoadBalancer").Get(), + + // Enable the SuperStream stream feature. + SuperStream = section.GetSection("SuperStream").Get(), + + // The number of streams that will be created. in case of super stream, this is the number of the partitions. + Streams = section.GetSection("Streams").Get(), + // The number of producers that will be created for each stream. + Producers = section.GetSection("Producers").Get(), + + // The number of messages that will be sent by each producer. + MessagesPerProducer = section.GetSection("MessagesPerProducer").Get(), + Consumers = section.GetSection("Consumers").Get(), + Username = section.GetSection("Username").Get(), + Password = section.GetSection("Password").Get(), + // The delay between each message sent by the producer. + DelayDuringSendMs = section.GetSection("DelayDuringSendMs").Get(), + StreamName = section.GetSection("StreamName").Get(), }); await rClient.ConfigureAwait(false); diff --git a/docs/ReliableClient/README.md b/docs/ReliableClient/README.md new file mode 100644 index 00000000..4b3a024d --- /dev/null +++ b/docs/ReliableClient/README.md @@ -0,0 +1,17 @@ +Best practices for reliable client +---------------------------------- + +This is an example of hot to use the client in a reliable way. By following these best practices, you can ensure that your application will be able to recover from network failures and other issues. + +the producer part is the most important, you need to block the sending of messages until the connection is established. This is done by using the `ManualResetEvent` method. + +you'd need also a list to store the messages that were not sent because the connection was not established. + +Use the `Identify` method to identify the producer or consumer it the logs. + +You should focus in how to deal with the entity `StatusChanged` events. + +You can customize the setting using `appsetting.json` file. + + + diff --git a/docs/ReliableClient/ReliableClient.csproj b/docs/ReliableClient/ReliableClient.csproj index 413536df..e88cf5c5 100644 --- a/docs/ReliableClient/ReliableClient.csproj +++ b/docs/ReliableClient/ReliableClient.csproj @@ -8,6 +8,8 @@ + + @@ -15,4 +17,10 @@ + + + Always + + + diff --git a/docs/ReliableClient/appsettings.json b/docs/ReliableClient/appsettings.json new file mode 100644 index 00000000..ed820677 --- /dev/null +++ b/docs/ReliableClient/appsettings.json @@ -0,0 +1,24 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + } + }, + "RabbitMQ": { + "Host": "localhost", + "Username": "guest", + "Password": "guest", + "Port": 5552, + "Virtualhost": "/", + "LoadBalancer": true, + "SuperStream": false, + "Streams": 4, + "ProducersPerConnection": 2, + "ConsumersPerConnection": 2, + "Producers": 3, + "Consumers": 2, + "DelayDuringSendMs":1, + "MessagesPerProducer": 100000, + "StreamName": "DotNetClientTest" + } +} diff --git a/docs/SuperStream/Costants.cs b/docs/SuperStream/Costants.cs index aa873316..38a49911 100644 --- a/docs/SuperStream/Costants.cs +++ b/docs/SuperStream/Costants.cs @@ -7,4 +7,5 @@ namespace SuperStream; public class Costants { public const string StreamName = "invoices"; + public const string StreamNameC = "invoices-c"; } diff --git a/docs/SuperStream/README.md b/docs/SuperStream/README.md index 73377ece..8dd0ec18 100644 --- a/docs/SuperStream/README.md +++ b/docs/SuperStream/README.md @@ -5,10 +5,8 @@ Super stream example [Super Streams blog post](https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-super-streams) -First step is to create the super stream: - - $ rabbitmq-streams add_super_stream invoices --partitions 3 - +This example shows how to use the Super Stream feature in RabbitMQ 3.11.0. +To run it you need the RabbitMQ `3.13.0` or later and the RabbitMQ .NET client library `1.8.0`. Then run the producer in one terminal: @@ -82,3 +80,5 @@ Consumer Name my_first_consumer -Received message id: hello11 body: hello11, Str ``` + + diff --git a/docs/SuperStream/Start.cs b/docs/SuperStream/Start.cs index 90ba5991..e3d0095a 100644 --- a/docs/SuperStream/Start.cs +++ b/docs/SuperStream/Start.cs @@ -19,6 +19,9 @@ private static async Task Main(string[] arguments) case "--producer": await SuperStreamProducer.Start().ConfigureAwait(false); break; + case "--producer-key": + await SuperStreamProducerKey.Start().ConfigureAwait(false); + break; case "--consumer": if (arguments.Length == 1) { diff --git a/docs/SuperStream/SuperStreamProducer.cs b/docs/SuperStream/SuperStreamProducer.cs index b3d539f9..a78f6101 100644 --- a/docs/SuperStream/SuperStreamProducer.cs +++ b/docs/SuperStream/SuperStreamProducer.cs @@ -17,21 +17,24 @@ public static async Task Start() var loggerFactory = LoggerFactory.Create(builder => { builder.AddSimpleConsole(); - + builder.AddFilter("RabbitMQ.Stream", LogLevel.Information); }); - + var logger = loggerFactory.CreateLogger(); - + var loggerMain = loggerFactory.CreateLogger(); loggerMain.LogInformation("Starting SuperStream Producer"); var config = new StreamSystemConfig(); var system = await StreamSystem.Create(config).ConfigureAwait(false); loggerMain.LogInformation("Super Stream Producer connected to RabbitMQ"); - - + + + // tag::super-stream-creation[] + await system.CreateSuperStream(new PartitionsSuperStreamSpec(Costants.StreamName, 3)).ConfigureAwait(false); + // end::super-stream-creation[] // We define a Producer with the SuperStream name (that is the Exchange name) // tag::super-stream-producer[] var producer = await Producer.Create( @@ -56,7 +59,7 @@ public static async Task Start() await producer.Send(message).ConfigureAwait(false); // end::super-stream-producer[] loggerMain.LogInformation("Sent {I} message to {StreamName}, id: {ID}", $"my_invoice_number{i}", - Costants.StreamName, $"id_{i}"); + Costants.StreamName, $"id_{i}"); Thread.Sleep(TimeSpan.FromMilliseconds(1000)); } } diff --git a/docs/SuperStream/SuperStreamProducerKey.cs b/docs/SuperStream/SuperStreamProducerKey.cs index 490b0450..35a3713e 100644 --- a/docs/SuperStream/SuperStreamProducerKey.cs +++ b/docs/SuperStream/SuperStreamProducerKey.cs @@ -27,7 +27,12 @@ public static async Task Start() var config = new StreamSystemConfig(); var system = await StreamSystem.Create(config).ConfigureAwait(false); loggerMain.LogInformation("Super Stream Producer connected to RabbitMQ"); + var keys = new [] {"apac", "emea", "amer"}; + // tag::super-stream-creation[] + await system.CreateSuperStream(new BindingsSuperStreamSpec(Costants.StreamNameC, keys)).ConfigureAwait(false); + // end::super-stream-creation[] + // We define a Producer with the SuperStream name (that is the Exchange name) // tag::super-stream-producer-key[] @@ -35,7 +40,7 @@ public static async Task Start() new ProducerConfig(system, // Costants.StreamName is the Exchange name // invoices - Costants.StreamName) + Costants.StreamNameC) { SuperStreamConfig = new SuperStreamConfig() { @@ -46,8 +51,7 @@ public static async Task Start() }, logger).ConfigureAwait(false); const int NumberOfMessages = 1_000_000; - var keys = new [] {"apac", "emea", "amer"}; - + for (var i = 0; i < NumberOfMessages; i++) { var key = keys[i % 3];