Skip to content

Commit

Permalink
replace seq.Slice(offset) with seq[offset..] (#182)
Browse files Browse the repository at this point in the history
Improve the publish

- replace seq.Slice(offset) with seq[offset..]
- Add AggressiveInlining to the main send functions
- Add AggressiveInlining to write functions
- Tune the performance test increasing the messages buffer

Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio authored Nov 15, 2022
1 parent 6bf2a73 commit 656fcaa
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 90 deletions.
1 change: 1 addition & 0 deletions RabbitMQ.Stream.Client.PerfTest/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ let main argv =
let producerConfig = RawProducerConfig(streamName,
Reference = null,
MaxInFlight = 10000,
MessagesBufferSize = 10000,
ConfirmHandler = fun c -> confirmed <- confirmed + 1)
let! producer = system.CreateRawProducer producerConfig
//make producer available to metrics async
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingRead.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ internal static int ReadString(ref SequenceReader<byte> reader, out string value
case FormatCode.Sym32:
case FormatCode.Str32:
offset += WireFormatting.ReadInt32(ref reader, out var len);
Span<byte> tempSpan32 = len <= 64 ? stackalloc byte[len] : new byte[len];
var tempSpan32 = len <= 64 ? stackalloc byte[len] : new byte[len];
reader.TryCopyTo(tempSpan32);
reader.Advance(len);
value = Encoding.UTF8.GetString(tempSpan32);
Expand Down
70 changes: 35 additions & 35 deletions RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ private static int WriteString(Span<byte> seq, string value)
// Str8
if (len <= byte.MaxValue)
{
offset += WireFormatting.WriteByte(seq.Slice(offset), FormatCode.Str8);
offset += WireFormatting.WriteByte(seq.Slice(offset), (byte)len);
offset += s_encoding.GetBytes(value, seq.Slice(offset));
offset += WireFormatting.WriteByte(seq[offset..], FormatCode.Str8);
offset += WireFormatting.WriteByte(seq[offset..], (byte)len);
offset += s_encoding.GetBytes(value, seq[offset..]);
return offset;
}

// Str32
offset += WireFormatting.WriteByte(seq.Slice(offset), FormatCode.Str32);
offset += WireFormatting.WriteInt32(seq.Slice(offset), len);
offset += s_encoding.GetBytes(value, seq.Slice(offset));
offset += WireFormatting.WriteByte(seq[offset..], FormatCode.Str32);
offset += WireFormatting.WriteInt32(seq[offset..], len);
offset += s_encoding.GetBytes(value, seq[offset..]);
return offset;
}

Expand All @@ -64,13 +64,13 @@ private static int WriteUInt64(Span<byte> seq, ulong value)
var offset = 0;
if (value < 256)
{
offset += WireFormatting.WriteByte(seq.Slice(offset), FormatCode.SmallUlong);
offset += WireFormatting.WriteByte(seq.Slice(offset), (byte)value);
offset += WireFormatting.WriteByte(seq[offset..], FormatCode.SmallUlong);
offset += WireFormatting.WriteByte(seq[offset..], (byte)value);
return offset;
}

offset += WireFormatting.WriteByte(seq.Slice(offset), FormatCode.Ulong);
offset += WireFormatting.WriteUInt64(seq.Slice(offset), value);
offset += WireFormatting.WriteByte(seq[offset..], FormatCode.Ulong);
offset += WireFormatting.WriteUInt64(seq[offset..], value);
return offset;
}

Expand All @@ -83,13 +83,13 @@ private static int WriteUInt(Span<byte> seq, uint value)
case 0:
return WireFormatting.WriteByte(seq, FormatCode.Uint0);
case < 256:
offset += WireFormatting.WriteByte(seq.Slice(offset), FormatCode.SmallUint);
offset += WireFormatting.WriteByte(seq.Slice(offset), (byte)value);
offset += WireFormatting.WriteByte(seq[offset..], FormatCode.SmallUint);
offset += WireFormatting.WriteByte(seq[offset..], (byte)value);
return offset;
}

offset += WireFormatting.WriteByte(seq.Slice(offset), FormatCode.Uint);
offset += WireFormatting.WriteUInt32(seq.Slice(offset), value);
offset += WireFormatting.WriteByte(seq[offset..], FormatCode.Uint);
offset += WireFormatting.WriteUInt32(seq[offset..], value);
return offset;
}

Expand All @@ -98,14 +98,14 @@ private static int WriteInt(Span<byte> seq, int value)
var offset = 0;
if (value is < 128 and >= -128)
{
offset += WireFormatting.WriteByte(seq.Slice(offset), FormatCode.Smallint);
offset += WireFormatting.WriteByte(seq.Slice(offset), (byte)value);
offset += WireFormatting.WriteByte(seq[offset..], FormatCode.Smallint);
offset += WireFormatting.WriteByte(seq[offset..], (byte)value);

return offset;
}

offset += WireFormatting.WriteByte(seq.Slice(offset), FormatCode.Int);
offset += WireFormatting.WriteInt32(seq.Slice(offset), value);
offset += WireFormatting.WriteByte(seq[offset..], FormatCode.Int);
offset += WireFormatting.WriteInt32(seq[offset..], value);
return offset;
}

Expand All @@ -114,14 +114,14 @@ private static int WriteInt64(Span<byte> seq, long value)
var offset = 0;
if (value is < 128 and >= -128)
{
offset += WireFormatting.WriteByte(seq.Slice(offset), FormatCode.Smalllong);
offset += WireFormatting.WriteByte(seq.Slice(offset), (byte)value);
offset += WireFormatting.WriteByte(seq[offset..], FormatCode.Smalllong);
offset += WireFormatting.WriteByte(seq[offset..], (byte)value);

return offset;
}

offset += WireFormatting.WriteByte(seq.Slice(offset), FormatCode.Long);
offset += WireFormatting.WriteInt64(seq.Slice(offset), value);
offset += WireFormatting.WriteByte(seq[offset..], FormatCode.Long);
offset += WireFormatting.WriteInt64(seq[offset..], value);
return offset;
}

Expand All @@ -138,60 +138,60 @@ private static int WriteBytes(Span<byte> seq, byte[] value)
// List8
if (len < 256)
{
offset += WireFormatting.WriteByte(seq.Slice(offset), FormatCode.Vbin8);
offset += WireFormatting.WriteByte(seq.Slice(offset), (byte)len);
offset += WireFormatting.Write(seq.Slice(offset), new ReadOnlySequence<byte>(value));
offset += WireFormatting.WriteByte(seq[offset..], FormatCode.Vbin8);
offset += WireFormatting.WriteByte(seq[offset..], (byte)len);
offset += WireFormatting.Write(seq[offset..], new ReadOnlySequence<byte>(value));
return offset;
}

// List32
offset += WireFormatting.WriteByte(seq.Slice(offset), FormatCode.Vbin32);
offset += WireFormatting.WriteUInt32(seq.Slice(offset), (uint)len);
offset += WireFormatting.Write(seq.Slice(offset), new ReadOnlySequence<byte>(value));
offset += WireFormatting.WriteByte(seq[offset..], FormatCode.Vbin32);
offset += WireFormatting.WriteUInt32(seq[offset..], (uint)len);
offset += WireFormatting.Write(seq[offset..], new ReadOnlySequence<byte>(value));
return offset;
}

private static int WriteUInt16(Span<byte> seq, ushort value)
{
var offset = WireFormatting.WriteByte(seq, FormatCode.Ushort);
offset += WireFormatting.WriteUInt16(seq.Slice(offset), value);
offset += WireFormatting.WriteUInt16(seq[offset..], value);
return offset;
}

private static int WriteByte(Span<byte> seq, byte value)
{
var offset = WireFormatting.WriteByte(seq, FormatCode.Ubyte);
offset += WireFormatting.WriteByte(seq.Slice(offset), value);
offset += WireFormatting.WriteByte(seq[offset..], value);
return offset;
}

private static int WriteSByte(Span<byte> seq, sbyte value)
{
var offset = WireFormatting.WriteByte(seq, FormatCode.Byte);
offset += WireFormatting.WriteByte(seq.Slice(offset), (byte)value);
offset += WireFormatting.WriteByte(seq[offset..], (byte)value);
return offset;
}

private static int WriteFloat(Span<byte> seq, float value)
{
var offset = WireFormatting.WriteByte(seq, FormatCode.Float);
var intFloat = BitConverter.SingleToInt32Bits(value);
offset += WireFormatting.WriteInt32(seq.Slice(offset), intFloat);
offset += WireFormatting.WriteInt32(seq[offset..], intFloat);
return offset;
}

private static int WriteDouble(Span<byte> seq, double value)
{
var offset = WireFormatting.WriteByte(seq, FormatCode.Double);
var intFloat = BitConverter.DoubleToInt64Bits(value);
offset += WireFormatting.WriteInt64(seq.Slice(offset), intFloat);
offset += WireFormatting.WriteInt64(seq[offset..], intFloat);
return offset;
}

private static int WriteIn16(Span<byte> seq, short value)
{
var offset = WireFormatting.WriteByte(seq, FormatCode.Short);
offset += WireFormatting.WriteInt16(seq.Slice(offset), value);
offset += WireFormatting.WriteInt16(seq[offset..], value);
return offset;
}

Expand All @@ -204,7 +204,7 @@ private static int WriteTimestamp(Span<byte> seq, DateTime value)
{
var offset = WireFormatting.WriteByte(seq, FormatCode.Timestamp);
var unixTime = ((DateTimeOffset)value).ToUnixTimeMilliseconds();
offset += WireFormatting.WriteUInt64(seq.Slice(offset), (ulong)unixTime);
offset += WireFormatting.WriteUInt64(seq[offset..], (ulong)unixTime);
return offset;
}

Expand Down
10 changes: 5 additions & 5 deletions RabbitMQ.Stream.Client/AMQP/Data.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ public int Write(Span<byte> span)
var offset = DescribedFormatCode.Write(span, DescribedFormatCode.ApplicationData);
if (data.Length <= byte.MaxValue)
{
offset += WireFormatting.WriteByte(span.Slice(offset), FormatCode.Vbin8); //binary marker
offset += WireFormatting.WriteByte(span.Slice(offset), (byte)data.Length); //length
offset += WireFormatting.WriteByte(span[offset..], FormatCode.Vbin8); //binary marker
offset += WireFormatting.WriteByte(span[offset..], (byte)data.Length); //length
}
else
{
offset += WireFormatting.WriteByte(span.Slice(offset), FormatCode.Vbin32); //binary marker
offset += WireFormatting.WriteUInt32(span.Slice(offset), (uint)data.Length); //length
offset += WireFormatting.WriteByte(span[offset..], FormatCode.Vbin32); //binary marker
offset += WireFormatting.WriteUInt32(span[offset..], (uint)data.Length); //length
}

offset += WireFormatting.Write(span.Slice(offset), data);
offset += WireFormatting.Write(span[offset..], data);
return offset;
}

Expand Down
3 changes: 3 additions & 0 deletions RabbitMQ.Stream.Client/AMQP/DescribedFormatCode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ public static class DescribedFormatCode
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static byte Read(ref SequenceReader<byte> reader)
{
// DescribedFormatCode in this case we need to read
// only the last byte form the header to get the format code
// the first can be ignored but it is necessary to read them
reader.TryRead(out _);
reader.TryRead(out _);
reader.TryRead(out var formatCode);
Expand Down
10 changes: 5 additions & 5 deletions RabbitMQ.Stream.Client/AMQP/Map.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ public int Size
public int Write(Span<byte> span)
{
var offset = DescribedFormatCode.Write(span, MapDataCode);
offset += WireFormatting.WriteByte(span.Slice(offset), FormatCode.Map32);
offset += WireFormatting.WriteUInt32(span.Slice(offset), (uint)MapSize()); // MapSize
offset += WireFormatting.WriteUInt32(span.Slice(offset), (uint)Count * 2); // pair values
offset += WireFormatting.WriteByte(span[offset..], FormatCode.Map32);
offset += WireFormatting.WriteUInt32(span[offset..], (uint)MapSize()); // MapSize
offset += WireFormatting.WriteUInt32(span[offset..], (uint)Count * 2); // pair values
foreach (var (key, value) in this)
{
if (!IsNullOrEmptyString(key))
{
offset += AmqpWireFormatting.WriteAny(span.Slice(offset), key);
offset += AmqpWireFormatting.WriteAny(span.Slice(offset), value);
offset += AmqpWireFormatting.WriteAny(span[offset..], key);
offset += AmqpWireFormatting.WriteAny(span[offset..], value);
}
}

Expand Down
32 changes: 16 additions & 16 deletions RabbitMQ.Stream.Client/AMQP/Properties.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,22 +132,22 @@ public int Size
public int Write(Span<byte> span)
{
var offset = DescribedFormatCode.Write(span, DescribedFormatCode.MessageProperties);
offset += WireFormatting.WriteByte(span.Slice(offset), FormatCode.List32);
offset += WireFormatting.WriteUInt32(span.Slice(offset), (uint)PropertySize()); // PropertySize
offset += WireFormatting.WriteUInt32(span.Slice(offset), 13); // field numbers
offset += AmqpWireFormatting.WriteAny(span.Slice(offset), MessageId);
offset += AmqpWireFormatting.WriteAny(span.Slice(offset), UserId);
offset += AmqpWireFormatting.WriteAny(span.Slice(offset), To);
offset += AmqpWireFormatting.WriteAny(span.Slice(offset), Subject);
offset += AmqpWireFormatting.WriteAny(span.Slice(offset), ReplyTo);
offset += AmqpWireFormatting.WriteAny(span.Slice(offset), CorrelationId);
offset += AmqpWireFormatting.WriteAny(span.Slice(offset), ContentType);
offset += AmqpWireFormatting.WriteAny(span.Slice(offset), ContentEncoding);
offset += AmqpWireFormatting.WriteAny(span.Slice(offset), AbsoluteExpiryTime);
offset += AmqpWireFormatting.WriteAny(span.Slice(offset), CreationTime);
offset += AmqpWireFormatting.WriteAny(span.Slice(offset), GroupId);
offset += AmqpWireFormatting.WriteAny(span.Slice(offset), GroupSequence);
offset += AmqpWireFormatting.WriteAny(span.Slice(offset), ReplyToGroupId);
offset += WireFormatting.WriteByte(span[offset..], FormatCode.List32);
offset += WireFormatting.WriteUInt32(span[offset..], (uint)PropertySize()); // PropertySize
offset += WireFormatting.WriteUInt32(span[offset..], 13); // field numbers
offset += AmqpWireFormatting.WriteAny(span[offset..], MessageId);
offset += AmqpWireFormatting.WriteAny(span[offset..], UserId);
offset += AmqpWireFormatting.WriteAny(span[offset..], To);
offset += AmqpWireFormatting.WriteAny(span[offset..], Subject);
offset += AmqpWireFormatting.WriteAny(span[offset..], ReplyTo);
offset += AmqpWireFormatting.WriteAny(span[offset..], CorrelationId);
offset += AmqpWireFormatting.WriteAny(span[offset..], ContentType);
offset += AmqpWireFormatting.WriteAny(span[offset..], ContentEncoding);
offset += AmqpWireFormatting.WriteAny(span[offset..], AbsoluteExpiryTime);
offset += AmqpWireFormatting.WriteAny(span[offset..], CreationTime);
offset += AmqpWireFormatting.WriteAny(span[offset..], GroupId);
offset += AmqpWireFormatting.WriteAny(span[offset..], GroupSequence);
offset += AmqpWireFormatting.WriteAny(span[offset..], ReplyToGroupId);
return offset;
}
}
Expand Down
8 changes: 4 additions & 4 deletions RabbitMQ.Stream.Client/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,20 @@ public int Write(Span<byte> span)
var offset = 0;
if (Properties != null)
{
offset += Properties.Write(span.Slice(offset));
offset += Properties.Write(span[offset..]);
}

if (ApplicationProperties != null)
{
offset += ApplicationProperties.Write(span.Slice(offset));
offset += ApplicationProperties.Write(span[offset..]);
}

if (Annotations != null)
{
offset += Annotations.Write(span.Slice(offset));
offset += Annotations.Write(span[offset..]);
}

offset += Data.Write(span.Slice(offset));
offset += Data.Write(span[offset..]);
return offset;
}

Expand Down
8 changes: 4 additions & 4 deletions RabbitMQ.Stream.Client/MetaData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ public int Write(Span<byte> span)
{
var command = (ICommand)this;
var offset = WireFormatting.WriteUInt16(span, Key);
offset += WireFormatting.WriteUInt16(span.Slice(offset), command.Version);
offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId);
offset += WireFormatting.WriteUInt16(span[offset..], command.Version);
offset += WireFormatting.WriteUInt32(span[offset..], correlationId);
// map
offset += WireFormatting.WriteInt32(span.Slice(offset), streams.Count());
offset += WireFormatting.WriteInt32(span[offset..], streams.Count());
foreach (var s in streams)
{
offset += WireFormatting.WriteString(span.Slice(offset), s);
offset += WireFormatting.WriteString(span[offset..], s);
}

return offset;
Expand Down
12 changes: 6 additions & 6 deletions RabbitMQ.Stream.Client/Publish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ public Publish(byte publisherId, List<(ulong, Message)> messages)
public int Write(Span<byte> span)
{
var offset = WireFormatting.WriteUInt16(span, Key);
offset += WireFormatting.WriteUInt16(span.Slice(offset), Version);
offset += WireFormatting.WriteByte(span.Slice(offset), publisherId);
offset += WireFormatting.WriteUInt16(span[offset..], Version);
offset += WireFormatting.WriteByte(span[offset..], publisherId);
// this assumes we never write an empty publish frame
offset += WireFormatting.WriteInt32(span.Slice(offset), MessageCount);
offset += WireFormatting.WriteInt32(span[offset..], MessageCount);
foreach (var (publishingId, msg) in messages)
{
offset += WireFormatting.WriteUInt64(span.Slice(offset), publishingId);
offset += WireFormatting.WriteUInt64(span[offset..], publishingId);
// this only write "simple" messages, we assume msg is just the binary body
// not stream encoded data
offset += WireFormatting.WriteUInt32(span.Slice(offset), (uint)msg.Size);
offset += msg.Write(span.Slice(offset));
offset += WireFormatting.WriteUInt32(span[offset..], (uint)msg.Size);
offset += msg.Write(span[offset..]);
}

return offset;
Expand Down
4 changes: 4 additions & 0 deletions RabbitMQ.Stream.Client/RawProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand Down Expand Up @@ -221,6 +222,7 @@ public bool IsOpen()
return !_disposed && !_client.IsClosed;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public async ValueTask Send(ulong publishingId, Message message)
{
await SemaphoreWait();
Expand All @@ -235,9 +237,11 @@ public async ValueTask Send(ulong publishingId, Message message)
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private async Task ProcessBuffer()
{
var messages = new List<(ulong, Message)>(_config.MessagesBufferSize);

while (await _messageBuffer.Reader.WaitToReadAsync().ConfigureAwait(false))
{
while (_messageBuffer.Reader.TryRead(out var msg))
Expand Down
Loading

0 comments on commit 656fcaa

Please sign in to comment.