From 2b5177b5d73d885d349f32af63dc1fc4da45af01 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 5 Sep 2022 05:28:37 -0700 Subject: [PATCH] Fix "Data instances with a length of 255 bytes are wrongly written" (#161) * Added unit test for comparing the size of the data versus the actual written data. Tests the edge cases of determining if the data must be written with formatcode Vbin8 or Vbin32. * Fix inconsistent data size check in Data struct when writing the underlying data. * Added test for producing and consuming events with a size of 254-256 bytes. * Fixups from `dotnet format` * Replaced collections in TestEventLength-test with single variables to check on * Fixups from `dotnet format` * Refactor tests to be compliant with other tests add the same fix to GetSequenceSize remove the integration test file and move the test to ProducerSystemTests file Signed-off-by: Gabriele Santomaggio Signed-off-by: Gabriele Santomaggio Co-authored-by: Tim Heusschen Co-authored-by: Timz95 Co-authored-by: Gabriele Santomaggio --- .../AMQP/AmqpWireFormattingWrite.cs | 2 +- RabbitMQ.Stream.Client/AMQP/Data.cs | 2 +- Tests/Amqp10Tests.cs | 14 ++++ Tests/ProducerSystemTests.cs | 84 +++++++++++++++++++ 4 files changed, 100 insertions(+), 2 deletions(-) diff --git a/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs b/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs index de9d37f9..0e1b4828 100644 --- a/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs +++ b/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs @@ -212,7 +212,7 @@ private static int WriteTimestamp(Span seq, DateTime value) // determinate the type size public static int GetSequenceSize(ReadOnlySequence data) { - if (data.Length < 256) + if (data.Length <= byte.MaxValue) { return (int)data.Length + 1 + //marker 1 byte FormatCode.Vbin8 diff --git a/RabbitMQ.Stream.Client/AMQP/Data.cs b/RabbitMQ.Stream.Client/AMQP/Data.cs index d26cc06e..a1b80ae3 100644 --- a/RabbitMQ.Stream.Client/AMQP/Data.cs +++ b/RabbitMQ.Stream.Client/AMQP/Data.cs @@ -29,7 +29,7 @@ public Data(ReadOnlySequence data) public int Write(Span span) { var offset = DescribedFormatCode.Write(span, DescribedFormatCode.ApplicationData); - if (data.Length < byte.MaxValue) + if (data.Length <= byte.MaxValue) { offset += WireFormatting.WriteByte(span.Slice(offset), FormatCode.Vbin8); //binary marker offset += WireFormatting.WriteByte(span.Slice(offset), (byte)data.Length); //length diff --git a/Tests/Amqp10Tests.cs b/Tests/Amqp10Tests.cs index 8121964c..3165ac67 100644 --- a/Tests/Amqp10Tests.cs +++ b/Tests/Amqp10Tests.cs @@ -462,5 +462,19 @@ public void MapEntriesWithAnEmptyKeyShouldNotBeWrittenToTheWire() // we do not expect the new entry to be written Assert.Equal(expectedMapSize, actualMapSize); } + + [Theory] + // AmqpWireFormattingWrite.GetSequenceSize + DescribedFormat.Size + [InlineData(254, 254 + 1 + 1 + 3)] + [InlineData(255, 255 + 1 + 1 + 3)] + [InlineData(256, 256 + 1 + 4 + 3)] + public void WriteDataEdgeCasesTests(int size, int expectedLengthWritten) + { + var buffer = new Span(new byte[4096]); + var bytes = new byte[size]; + var data = new Data(new ReadOnlySequence(bytes)); + var written = data.Write(buffer); + Assert.Equal(expectedLengthWritten, written); + } } } diff --git a/Tests/ProducerSystemTests.cs b/Tests/ProducerSystemTests.cs index b1eb550c..8c79ea57 100644 --- a/Tests/ProducerSystemTests.cs +++ b/Tests/ProducerSystemTests.cs @@ -3,6 +3,8 @@ // Copyright (c) 2007-2020 VMware, Inc. using System; +using System.Buffers; +using System.Collections; using System.Collections.Generic; using System.Text; using System.Threading; @@ -341,5 +343,87 @@ public async void ProducerBatchConfirmNumberOfMessages() await system.DeleteStream(stream); await system.Close(); } + + private class EventLengthTestCases : IEnumerable + { + private readonly Random _random = new(3895); + + public IEnumerator GetEnumerator() + { + yield return new object[] { GetRandomBytes(254) }; + yield return new object[] { GetRandomBytes(255) }; + yield return new object[] { GetRandomBytes(256) }; + // just to test an event greater than 256 bytes + yield return new object[] { GetRandomBytes(654) }; + } + + private ReadOnlySequence GetRandomBytes(ulong length) + { + var arr = new byte[length]; + _random.NextBytes(arr); + return new ReadOnlySequence(arr); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + } + + [Theory] + [ClassData(typeof(EventLengthTestCases))] + public async Task ProducerSendsArrays255Bytes(ReadOnlySequence @event) + { + // Test the data around 255 bytes + // https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/issues/160 + // We test if the data is correctly sent and received + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var testPassed = new TaskCompletionSource(); + var producer = await system.CreateProducer(new + ProducerConfig + { + Reference = "producer", + Stream = stream, + ConfirmHandler = _ => + { + testPassed.SetResult(true); + } + } + ); + + const ulong PublishingId = 0; + var msg = new Message(new Data(@event)) + { + ApplicationProperties = new ApplicationProperties { { "myArray", @event.First.ToArray() } } + }; + + await producer.Send(PublishingId, msg); + new Utils(testOutputHelper).WaitUntilTaskCompletes(testPassed); + + var testMessageConsumer = new TaskCompletionSource(); + + var consumer = await system.CreateConsumer(new ConsumerConfig + { + Stream = stream, + // Consume the stream from the Offset + OffsetSpec = new OffsetTypeOffset(), + // Receive the messages + MessageHandler = (_, _, message) => + { + testMessageConsumer.SetResult(message); + return Task.CompletedTask; + } + }); + + new Utils(testOutputHelper).WaitUntilTaskCompletes(testMessageConsumer); + // at this point the data length _must_ be the same + Assert.Equal(@event.Length, testMessageConsumer.Task.Result.Data.Contents.Length); + + Assert.Equal(@event.Length, + ((byte[])testMessageConsumer.Task.Result.ApplicationProperties["myArray"]).Length); + await consumer.Close(); + await system.DeleteStream(stream); + await system.Close(); + } } }