Skip to content

Commit

Permalink
Batch send (#142)
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio authored Jul 15, 2022
1 parent 86f924d commit da980eb
Show file tree
Hide file tree
Showing 11 changed files with 473 additions and 28 deletions.
245 changes: 245 additions & 0 deletions Examples/Performances/BatchVsBatchSend.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;

namespace Performances;

public class BatchVsBatchSend
{
private const int TotalMessages = 20_000_000;
private const int MessageSize = 100;
private const int AggregateBatchSize = 300;
private const int ModPrintMessages = 10_000_000;

public async Task Start()
{
Console.WriteLine("Stream Client Performance Test");
Console.WriteLine("==============================");
Console.WriteLine("Client will test Batch vs Batch Send");
Console.WriteLine("Total Messages: {0}", TotalMessages);
Console.WriteLine("Message Size: {0}", MessageSize);
Console.WriteLine("Aggregate Batch Size: {0}", AggregateBatchSize);
Console.WriteLine("Print Messages each: {0} messages", ModPrintMessages);


var config = new StreamSystemConfig() {Heartbeat = TimeSpan.Zero};
var system = await StreamSystem.Create(config);
await BatchSend(system, await RecreateStream(system, "StandardBatchSend"));
await StandardProducerSend(await RecreateStream(system, "StandardProducerSendNoBatch"), system);
await RProducerBatchSend(await RecreateStream(system, "ReliableProducerBatch"), system);
await RProducerSend(await RecreateStream(system, "ReliableProducerSendNoBatch"), system);
}

private static async Task RProducerSend(string stream, StreamSystem system)
{
Console.WriteLine("*****Reliable Producer Send No Batch*****");
var total = 0;
var confirmed = 0;
var error = 0;
var reliableProducer = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
{
Stream = stream,
StreamSystem = system,
MaxInFlight = 1_000_000,
ConfirmationHandler = messagesConfirmed =>
{
if (messagesConfirmed.Status == ConfirmationStatus.Confirmed)
{
confirmed += messagesConfirmed.Messages.Count;
}
else
{
error += messagesConfirmed.Messages.Count;
}

if (++total % ModPrintMessages == 0)
{
Console.WriteLine(
$"*****Reliable Producer Send No Batch Confirmed: {confirmed} Error: {error}*****");
}

return Task.CompletedTask;
}
});

var start = DateTime.Now;
for (ulong i = 1; i <= TotalMessages; i++)
{
var array = new byte[MessageSize];
await reliableProducer.Send(new Message(array));

if (i % ModPrintMessages == 0)
{
Console.WriteLine($"*****Reliable Producer Send No Batch: {i}");
}
}

Console.WriteLine(
$"*****Reliable Producer Send No Batch***** send time: {DateTime.Now - start}, messages sent: {TotalMessages}");

Thread.Sleep(1000);
await reliableProducer.Close();
}


private static async Task RProducerBatchSend(string stream, StreamSystem system)
{
Console.WriteLine("*****Reliable Producer Batch Send*****");
var total = 0;
var confirmed = 0;
var error = 0;
var reliableProducer = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
{
Stream = stream,
StreamSystem = system,
MaxInFlight = 1_000_000,
ConfirmationHandler = messagesConfirmed =>
{
if (messagesConfirmed.Status == ConfirmationStatus.Confirmed)
{
confirmed += messagesConfirmed.Messages.Count;
}
else
{
error += messagesConfirmed.Messages.Count;
}

if (++total % ModPrintMessages == 0)
{
Console.WriteLine($"*****Reliable Producer Batch Confirmed: {confirmed}, Error: {error}*****");
}

return Task.CompletedTask;
}
});

var messages = new List<Message>();


var start = DateTime.Now;
for (ulong i = 1; i <= TotalMessages; i++)
{
var array = new byte[MessageSize];
messages.Add(new Message(array));
if (i % AggregateBatchSize == 0)
{
await reliableProducer.BatchSend(messages);
messages.Clear();
}

if (i % ModPrintMessages == 0)
{
Console.WriteLine($"*****Reliable Producer Batch Send: {i}");
}
}

await reliableProducer.BatchSend(messages);
messages.Clear();

Console.WriteLine(
$"*****Reliable Producer Batch Send***** time: {DateTime.Now - start}, messages sent: {TotalMessages}");
Thread.Sleep(1000);
await reliableProducer.Close();
}


private static async Task StandardProducerSend(string stream, StreamSystem system)
{
Console.WriteLine("*****Standard Producer Send*****");
var confirmed = 0;
var producer = await system.CreateProducer(new ProducerConfig()
{
Stream = stream,
MaxInFlight = 1_000_000,
ConfirmHandler = _ =>
{
if (++confirmed % ModPrintMessages == 0)
{
Console.WriteLine($"*****Standard Producer Send Confirmed: {confirmed}");
}
}
});

var start = DateTime.Now;
for (ulong i = 1; i <= TotalMessages; i++)
{
var array = new byte[MessageSize];

await producer.Send(i, new Message(array));

if (i % ModPrintMessages == 0)
{
Console.WriteLine($"*****Standard Producer: {i}");
}
}

Console.WriteLine(
$"*****Standard Producer Send***** send time: {DateTime.Now - start}, messages sent: {TotalMessages}");
Thread.Sleep(1000);
await producer.Close();
}

private static async Task BatchSend(StreamSystem system, string stream)
{
Console.WriteLine("*****Standard Batch Send*****");
var confirmed = 0;
var producer = await system.CreateProducer(new ProducerConfig()
{
Stream = stream,
MaxInFlight = 1_000_000,
ConfirmHandler = _ =>
{
if (++confirmed % ModPrintMessages == 0)
{
Console.WriteLine($"*****Standard Batch Confirmed: {confirmed}");
}
}
});
var messages = new List<(ulong, Message)>();
var start = DateTime.Now;
for (ulong i = 1; i <= TotalMessages; i++)
{
var array = new byte[MessageSize];
messages.Add((i, new Message(array)));
if (i % AggregateBatchSize == 0)
{
await producer.BatchSend(messages);
messages.Clear();
}

if (i % ModPrintMessages == 0)
{
Console.WriteLine($"*****Standard Batch Send: {i}");
}
}

await producer.BatchSend(messages);
messages.Clear();

Console.WriteLine(
$"*****Standard Batch Send***** send time: {DateTime.Now - start}, messages sent: {TotalMessages}");
Thread.Sleep(1000);
await producer.Close();
}

private static async Task<string> RecreateStream(StreamSystem system, string stream)
{
Console.WriteLine("==============================");
Console.WriteLine($"Recreate Stream: {stream}, just wait a bit..");
Thread.Sleep(5000);
try
{
await system.DeleteStream(stream);
}
catch (Exception e)
{
// Console.WriteLine(e);
}

Thread.Sleep(5000);

await system.CreateStream(new StreamSpec(stream) { });
Thread.Sleep(1000);
Console.WriteLine($"Stream: {stream} created");
return stream;
}
}
14 changes: 14 additions & 0 deletions Examples/Performances/Performances.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\RabbitMQ.Stream.Client\RabbitMQ.Stream.Client.csproj" />
</ItemGroup>

</Project>
3 changes: 3 additions & 0 deletions Examples/Performances/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using Performances;

new BatchVsBatchSend().Start().Wait();
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,21 @@ or more generic `system.QuerySequence("reference", "my_stream")`.

`publishingId` must be incremented for each send.

#### Standard Batch publish

Batch send is a synchronous operation.
It allows to pre-aggregate messages and send them in a single synchronous call.
```csharp
var messages = new List<(ulong, Message)>();
for (ulong i = 0; i < 30; i++)
{
messages.Add((i, new Message(Encoding.UTF8.GetBytes($"batch {i}"))));
}
await producer.BatchSend(messages);
messages.Clear();
```
In most cases, the standard `Send` is easier and works in most of the cases.

#### Sub Entries Batching
A sub-entry is one "slot" in a publishing frame, meaning outbound messages are not only batched in publishing frames,
but in sub-entries as well. Use this feature to increase throughput at the cost of increased latency.
Expand Down
3 changes: 2 additions & 1 deletion RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public class Client : IClient
public int PublishCommandsSent => publishCommandsSent;

public int MessagesSent => messagesSent;
public uint MaxFrameSize => tuneReceived.Task.Result.FrameMax;

private int messagesSent;
private int confirmFrames;
Expand Down Expand Up @@ -215,7 +216,7 @@ await client.Request<SaslAuthenticateRequest, SaslAuthenticateResponse>(corr =>
ClientExceptions.MaybeThrowException(authResponse.ResponseCode, parameters.UserName);

//tune
var tune = await client.tuneReceived.Task;
await client.tuneReceived.Task;
await client.Publish(new TuneRequest(0,
(uint)client.Parameters.Heartbeat.TotalSeconds));

Expand Down
62 changes: 51 additions & 11 deletions RabbitMQ.Stream.Client/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand Down Expand Up @@ -139,6 +140,42 @@ public async ValueTask Send(ulong publishingId, List<Message> subEntryMessages,
}
}

public async ValueTask BatchSend(List<(ulong, Message)> messages)
{
PreValidateBatch(messages);
await InternalBatchSend(messages);
}

internal async Task InternalBatchSend(List<(ulong, Message)> messages)
{
for (var i = 0; i < messages.Count; i++)
{
await SemaphoreWait();
}

if (messages.Count != 0 && !client.IsClosed)
{
await SendMessages(messages, false).ConfigureAwait(false);
}
}

internal void PreValidateBatch(List<(ulong, Message)> messages)
{
if (messages.Count > config.MaxInFlight)
{
throw new InvalidOperationException($"Too many messages in batch. " +
$"Max allowed is {config.MaxInFlight}");
}

var totalSize = messages.Sum(message => message.Item2.Size);

if (totalSize > client.MaxFrameSize)
{
throw new InvalidOperationException($"Total size of messages in batch is too big. " +
$"Max allowed is {client.MaxFrameSize}");
}
}

private async Task SemaphoreWait()
{
if (!semaphore.Wait(0) && !client.IsClosed)
Expand All @@ -151,6 +188,20 @@ private async Task SemaphoreWait()
}
}

private async Task SendMessages(List<(ulong, Message)> messages, bool clearMessagesList = true)
{
var publishTask = client.Publish(new Publish(publisherId, messages));
if (!publishTask.IsCompletedSuccessfully)
{
await publishTask.ConfigureAwait(false);
}

if (clearMessagesList)
{
messages.Clear();
}
}

/// <summary>
/// GetLastPublishingId
/// </summary>
Expand Down Expand Up @@ -196,17 +247,6 @@ private async Task ProcessBuffer()
await SendMessages(messages).ConfigureAwait(false);
}
}

async Task SendMessages(List<(ulong, Message)> messages)
{
var publishTask = client.Publish(new Publish(publisherId, messages));
if (!publishTask.IsCompletedSuccessfully)
{
await publishTask.ConfigureAwait(false);
}

messages.Clear();
}
}

public Task<ResponseCode> Close()
Expand Down
Loading

0 comments on commit da980eb

Please sign in to comment.