diff --git a/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs b/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs index de9d37f9..0e1b4828 100644 --- a/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs +++ b/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs @@ -212,7 +212,7 @@ private static int WriteTimestamp(Span seq, DateTime value) // determinate the type size public static int GetSequenceSize(ReadOnlySequence data) { - if (data.Length < 256) + if (data.Length <= byte.MaxValue) { return (int)data.Length + 1 + //marker 1 byte FormatCode.Vbin8 diff --git a/RabbitMQ.Stream.Client/AMQP/Data.cs b/RabbitMQ.Stream.Client/AMQP/Data.cs index d26cc06e..a1b80ae3 100644 --- a/RabbitMQ.Stream.Client/AMQP/Data.cs +++ b/RabbitMQ.Stream.Client/AMQP/Data.cs @@ -29,7 +29,7 @@ public Data(ReadOnlySequence data) public int Write(Span span) { var offset = DescribedFormatCode.Write(span, DescribedFormatCode.ApplicationData); - if (data.Length < byte.MaxValue) + if (data.Length <= byte.MaxValue) { offset += WireFormatting.WriteByte(span.Slice(offset), FormatCode.Vbin8); //binary marker offset += WireFormatting.WriteByte(span.Slice(offset), (byte)data.Length); //length diff --git a/Tests/Amqp10Tests.cs b/Tests/Amqp10Tests.cs index 8121964c..3165ac67 100644 --- a/Tests/Amqp10Tests.cs +++ b/Tests/Amqp10Tests.cs @@ -462,5 +462,19 @@ public void MapEntriesWithAnEmptyKeyShouldNotBeWrittenToTheWire() // we do not expect the new entry to be written Assert.Equal(expectedMapSize, actualMapSize); } + + [Theory] + // AmqpWireFormattingWrite.GetSequenceSize + DescribedFormat.Size + [InlineData(254, 254 + 1 + 1 + 3)] + [InlineData(255, 255 + 1 + 1 + 3)] + [InlineData(256, 256 + 1 + 4 + 3)] + public void WriteDataEdgeCasesTests(int size, int expectedLengthWritten) + { + var buffer = new Span(new byte[4096]); + var bytes = new byte[size]; + var data = new Data(new ReadOnlySequence(bytes)); + var written = data.Write(buffer); + Assert.Equal(expectedLengthWritten, written); + } } } diff --git a/Tests/ProducerSystemTests.cs b/Tests/ProducerSystemTests.cs index b1eb550c..8c79ea57 100644 --- a/Tests/ProducerSystemTests.cs +++ b/Tests/ProducerSystemTests.cs @@ -3,6 +3,8 @@ // Copyright (c) 2007-2020 VMware, Inc. using System; +using System.Buffers; +using System.Collections; using System.Collections.Generic; using System.Text; using System.Threading; @@ -341,5 +343,87 @@ public async void ProducerBatchConfirmNumberOfMessages() await system.DeleteStream(stream); await system.Close(); } + + private class EventLengthTestCases : IEnumerable + { + private readonly Random _random = new(3895); + + public IEnumerator GetEnumerator() + { + yield return new object[] { GetRandomBytes(254) }; + yield return new object[] { GetRandomBytes(255) }; + yield return new object[] { GetRandomBytes(256) }; + // just to test an event greater than 256 bytes + yield return new object[] { GetRandomBytes(654) }; + } + + private ReadOnlySequence GetRandomBytes(ulong length) + { + var arr = new byte[length]; + _random.NextBytes(arr); + return new ReadOnlySequence(arr); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + } + + [Theory] + [ClassData(typeof(EventLengthTestCases))] + public async Task ProducerSendsArrays255Bytes(ReadOnlySequence @event) + { + // Test the data around 255 bytes + // https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/issues/160 + // We test if the data is correctly sent and received + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var testPassed = new TaskCompletionSource(); + var producer = await system.CreateProducer(new + ProducerConfig + { + Reference = "producer", + Stream = stream, + ConfirmHandler = _ => + { + testPassed.SetResult(true); + } + } + ); + + const ulong PublishingId = 0; + var msg = new Message(new Data(@event)) + { + ApplicationProperties = new ApplicationProperties { { "myArray", @event.First.ToArray() } } + }; + + await producer.Send(PublishingId, msg); + new Utils(testOutputHelper).WaitUntilTaskCompletes(testPassed); + + var testMessageConsumer = new TaskCompletionSource(); + + var consumer = await system.CreateConsumer(new ConsumerConfig + { + Stream = stream, + // Consume the stream from the Offset + OffsetSpec = new OffsetTypeOffset(), + // Receive the messages + MessageHandler = (_, _, message) => + { + testMessageConsumer.SetResult(message); + return Task.CompletedTask; + } + }); + + new Utils(testOutputHelper).WaitUntilTaskCompletes(testMessageConsumer); + // at this point the data length _must_ be the same + Assert.Equal(@event.Length, testMessageConsumer.Task.Result.Data.Contents.Length); + + Assert.Equal(@event.Length, + ((byte[])testMessageConsumer.Task.Result.ApplicationProperties["myArray"]).Length); + await consumer.Close(); + await system.DeleteStream(stream); + await system.Close(); + } } }