diff --git a/.gitignore b/.gitignore index b5129890..a6a77152 100644 --- a/.gitignore +++ b/.gitignore @@ -119,3 +119,7 @@ projects/Unit*/TestResult.xml # Vim .sw? .*.sw? + +#tests +Tests/coverage.* + diff --git a/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs b/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs index 6c4ec384..408a3794 100644 --- a/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs +++ b/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs @@ -30,10 +30,31 @@ internal static int WriteAny(Span seq, object value) bool bo => WriteBool(seq, bo), byte[] bArr => bArr.Length == 0 ? WriteNull(seq) : WriteBytes(seq, bArr), DateTime d => d == DateTime.MinValue ? WriteNull(seq) : WriteTimestamp(seq, d), + Symbol s => s.IsNull ? WriteNull(seq) : WriteSymbol(seq, s), _ => throw new AmqpParseException($"WriteAny Invalid type {value}") }; } + private static int WriteSymbol(Span seq, Symbol symbol) + { + var len = s_encoding.GetByteCount(symbol.Value); + var offset = 0; + // Sym8 + if (len <= byte.MaxValue) + { + offset += WireFormatting.WriteByte(seq[offset..], FormatCode.Sym8); + offset += WireFormatting.WriteByte(seq[offset..], (byte)len); + offset += s_encoding.GetBytes(symbol.Value, seq[offset..]); + return offset; + } + + // Sym32 + offset += WireFormatting.WriteByte(seq[offset..], FormatCode.Sym32); + offset += WireFormatting.WriteInt32(seq[offset..], len); + offset += s_encoding.GetBytes(symbol.Value, seq[offset..]); + return offset; + } + private static int WriteString(Span seq, string value) { var len = s_encoding.GetByteCount(value); @@ -232,7 +253,7 @@ private static int GetStringSize(string value) { 0 => 1, // 0x40 <= byte.MaxValue => len + 1 + //marker 1 byte FormatCode.Vbin8 - 1, + 1, _ => len + 1 //marker 1 byte FormatCode.Vbin32 + 4 }; diff --git a/RabbitMQ.Stream.Client/AMQP/Map.cs b/RabbitMQ.Stream.Client/AMQP/Map.cs index 81f87640..5c6dbfa0 100644 --- a/RabbitMQ.Stream.Client/AMQP/Map.cs +++ b/RabbitMQ.Stream.Client/AMQP/Map.cs @@ -73,7 +73,8 @@ public int Write(Span span) { var offset = DescribedFormatCode.Write(span, MapDataCode); offset += WireFormatting.WriteByte(span[offset..], FormatCode.Map32); - offset += WireFormatting.WriteUInt32(span[offset..], (uint)MapSize()); // MapSize + offset += WireFormatting.WriteUInt32(span[offset..], + (uint)MapSize() + DescribedFormatCode.Size + sizeof(byte)); // MapSize + DescribedFormatCode + FormatCode offset += WireFormatting.WriteUInt32(span[offset..], (uint)Count * 2); // pair values foreach (var (key, value) in this) { diff --git a/RabbitMQ.Stream.Client/AMQP/Properties.cs b/RabbitMQ.Stream.Client/AMQP/Properties.cs index 4b65db94..56433268 100644 --- a/RabbitMQ.Stream.Client/AMQP/Properties.cs +++ b/RabbitMQ.Stream.Client/AMQP/Properties.cs @@ -133,7 +133,9 @@ public int Write(Span span) { var offset = DescribedFormatCode.Write(span, DescribedFormatCode.MessageProperties); offset += WireFormatting.WriteByte(span[offset..], FormatCode.List32); - offset += WireFormatting.WriteUInt32(span[offset..], (uint)PropertySize()); // PropertySize + offset += WireFormatting.WriteUInt32(span[offset..], + (uint)PropertySize() + DescribedFormatCode.Size + + sizeof(byte)); // PropertySize + DescribedFormatCode.Size + sizeof(FormatCode.List32) offset += WireFormatting.WriteUInt32(span[offset..], 13); // field numbers offset += AmqpWireFormatting.WriteAny(span[offset..], MessageId); offset += AmqpWireFormatting.WriteAny(span[offset..], UserId); @@ -141,8 +143,8 @@ public int Write(Span span) offset += AmqpWireFormatting.WriteAny(span[offset..], Subject); offset += AmqpWireFormatting.WriteAny(span[offset..], ReplyTo); offset += AmqpWireFormatting.WriteAny(span[offset..], CorrelationId); - offset += AmqpWireFormatting.WriteAny(span[offset..], ContentType); - offset += AmqpWireFormatting.WriteAny(span[offset..], ContentEncoding); + offset += AmqpWireFormatting.WriteAny(span[offset..], new Symbol(ContentType)); + offset += AmqpWireFormatting.WriteAny(span[offset..], new Symbol(ContentEncoding)); offset += AmqpWireFormatting.WriteAny(span[offset..], AbsoluteExpiryTime); offset += AmqpWireFormatting.WriteAny(span[offset..], CreationTime); offset += AmqpWireFormatting.WriteAny(span[offset..], GroupId); diff --git a/RabbitMQ.Stream.Client/AMQP/Symbol.cs b/RabbitMQ.Stream.Client/AMQP/Symbol.cs new file mode 100644 index 00000000..bed16c2a --- /dev/null +++ b/RabbitMQ.Stream.Client/AMQP/Symbol.cs @@ -0,0 +1,36 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2023 VMware, Inc. + +namespace RabbitMQ.Stream.Client.AMQP; + +internal class Symbol +{ + private readonly string _value; + + public Symbol(string value) + { + _value = value; + } + + public string Value + { + get + { + return _value; + } + } + + public bool IsNull + { + get + { + return string.IsNullOrWhiteSpace(_value); + } + } + + public override string ToString() + { + return Value; + } +} diff --git a/Tests/FromToAMQPTests.cs b/Tests/FromToAMQPTests.cs new file mode 100644 index 00000000..3e80631b --- /dev/null +++ b/Tests/FromToAMQPTests.cs @@ -0,0 +1,261 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2023 VMware, Inc. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using RabbitMQ.Stream.Client; +using RabbitMQ.Stream.Client.AMQP; +using RabbitMQ.Stream.Client.Reliable; +using Xunit.Abstractions; + +namespace Tests; + +using Xunit; + +public class FromToAmqpTests +{ + private readonly ITestOutputHelper _testOutputHelper; + + public FromToAmqpTests(ITestOutputHelper testOutputHelper) + { + _testOutputHelper = testOutputHelper; + } + + /// + /// This test is to ensure that the conversion from AMQP to Stream AMQP 1.0 is correct. + /// Stream sends the message and AMQP client reads it. + /// In this case the server decodes anc converts the message + /// + [Fact] + public async void Amqp091ShouldReadTheAmqp10Properties() + { + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var producer = await Producer.Create(new ProducerConfig(system, stream)); + await producer.Send(new Message(Encoding.ASCII.GetBytes("FromStream")) + { + Properties = new Properties() + { + MessageId = "年 6 月", + CorrelationId = "10000_00000", + ContentType = "text/plain", + ContentEncoding = "utf-8", + UserId = Encoding.ASCII.GetBytes("MY_USER_ID"), + GroupSequence = 601, + ReplyToGroupId = "ReplyToGroupId", + GroupId = "GroupId", + }, + ApplicationProperties = new ApplicationProperties() { { "stream_key", "stream_value" } } + }); + + var factory = new ConnectionFactory(); + using var connection = factory.CreateConnection(); + var channel = connection.CreateModel(); + var consumer = new EventingBasicConsumer(channel); + var tcs = new TaskCompletionSource(); + consumer.Received += (sender, ea) => + { + tcs.SetResult(ea); + channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); + }; + channel.BasicQos(0, 100, false); + channel.BasicConsume(stream, false, "consumerTag", + arguments: new Dictionary() { { "x-stream-offset", "first" } }, consumer); + var result = tcs.Task.Result; + Assert.Equal("FromStream", Encoding.ASCII.GetString(result.Body.ToArray())); + Assert.Equal("年 6 月", result.BasicProperties.MessageId); + Assert.Equal("10000_00000", result.BasicProperties.CorrelationId); + Assert.Equal("text/plain", result.BasicProperties.ContentType); + Assert.Equal("utf-8", result.BasicProperties.ContentEncoding); + Assert.Equal("MY_USER_ID", result.BasicProperties.UserId); + Assert.Equal("stream_value", + Encoding.ASCII.GetString(result.BasicProperties.Headers["stream_key"] as byte[] ?? Array.Empty())); + channel.QueueDelete(stream); + channel.Close(); + } + + /// + /// This test is to ensure that the conversion from AMQP 091 to Stream AMQP 1.0 is correct. + /// AMQP sends the message and Stream has to read it. + /// In this case the server decodes anc converts the message + /// + [Fact] + public async void Amqp10ShouldReadTheAmqp019Properties() + { + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + + var factory = new ConnectionFactory(); + using var connection = factory.CreateConnection(); + var channel = connection.CreateModel(); + var properties = channel.CreateBasicProperties(); + + properties.MessageId = "年 6 月"; + properties.CorrelationId = "10000_00000"; + properties.ContentType = "text/plain"; + properties.ContentEncoding = "utf-8"; + properties.UserId = "guest"; + properties.Headers = new Dictionary() + { + {"stream_key", "stream_value"}, + {"stream_key4", "Alan Mathison Turing(1912 年 6 月 23 日"}, + }; + channel.BasicPublish("", stream, properties, Encoding.ASCII.GetBytes("FromAMQP")); + var tcs = new TaskCompletionSource(); + + var consumer = await Consumer.Create(new ConsumerConfig(system, stream) + { + OffsetSpec = new OffsetTypeFirst(), + MessageHandler = async (_, _, _, message) => + { + tcs.SetResult(message); + await Task.CompletedTask; + } + }); + + new Utils(_testOutputHelper).WaitUntilTaskCompletes(tcs); + var result = tcs.Task.Result; + Assert.Equal("FromAMQP", Encoding.ASCII.GetString(result.Data.Contents.ToArray())); + Assert.Equal("年 6 月", result.Properties.MessageId); + Assert.Equal("10000_00000", result.Properties.CorrelationId); + Assert.Equal("text/plain", result.Properties.ContentType); + Assert.Equal("utf-8", result.Properties.ContentEncoding); + Assert.Equal(Encoding.Default.GetBytes("guest"), result.Properties.UserId); + Assert.Equal("stream_value", result.ApplicationProperties["stream_key"]); + Assert.Equal("Alan Mathison Turing(1912 年 6 月 23 日", result.ApplicationProperties["stream_key4"]); + await consumer.Close(); + await system.DeleteStream(stream); + await system.Close(); + } + + /// + /// The file message_from_version_1_0_0 was generated with the 1.0.0 version of the client + /// due of this issue https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/211 the write is changed + /// but the read must be compatible + /// + [Fact] + public void DecodeMessageFrom100Version() + { + var data = SystemUtils.GetFileContent("message_from_version_1_0_0"); + var reader = new SequenceReader(new ReadOnlySequence(data)); + var msg = Message.From(ref reader, (uint)reader.Length); + Assert.Equal("Message100", Encoding.ASCII.GetString(msg.Data.Contents.ToArray())); + Assert.Equal("MyMessageId", msg.Properties.MessageId); + Assert.Equal("MyCorrelationId", msg.Properties.CorrelationId); + Assert.Equal("text/plain", msg.Properties.ContentType); + Assert.Equal("utf-8", msg.Properties.ContentEncoding); + Assert.Equal("guest", Encoding.UTF8.GetString(msg.Properties.UserId)); + Assert.Equal((uint)9999, msg.Properties.GroupSequence); + Assert.Equal("MyReplyToGroupId", msg.Properties.ReplyToGroupId); + Assert.Equal("value", msg.ApplicationProperties["key_string"]); + Assert.Equal(1111, msg.ApplicationProperties["key2_int"]); + Assert.Equal(10_000_000_000, msg.ApplicationProperties["key2_decimal"]); + Assert.Equal(true, msg.ApplicationProperties["key2_bool"]); + } + + [Fact] + public async void Amqp091ShouldReadTheAmqp10Properties1000Messages() + { + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var producer = await Producer.Create(new ProducerConfig(system, stream)); + const int NumberOfMessages = 1000; + for (var i = 0; i < NumberOfMessages; i++) + { + await producer.Send(new Message(Encoding.ASCII.GetBytes($"FromStream{i}")) + { + Properties = new Properties() + { + MessageId = $"Alan Mathison Turing(1912 年 6 月 23 日 - 1954 年 6 月 7 日)是英国数学家、计算机科学家、逻辑学家、密码分析家、哲学家和理论生物学家。 [6] 图灵在理论计算机科学的发展中具有很大的影响力,用图灵机提供了算法和计算概念的形式化,可以被认为是通用计算机的模型。[7][8][9] 他被广泛认为是理论计算机科学和人工智能之父{i}", + CorrelationId = "10000_00000", + ContentType = "text/plain", + ContentEncoding = "utf-8", + UserId = Encoding.ASCII.GetBytes("MY_USER_ID"), + GroupSequence = 601, + ReplyToGroupId = "ReplyToGroupId", + GroupId = "GroupId", + + }, + ApplicationProperties = new ApplicationProperties() + { + {"stream_key", "stream_value"}, + {"stream_key2", 100}, + {"stream_key3", 10_000_009}, + {"stream_key4", "Alan Mathison Turing(1912 年 6 月 23 日"}, + } + }); + } + + var factory = new ConnectionFactory(); + using var connection = factory.CreateConnection(); + var channel = connection.CreateModel(); + var consumer = new EventingBasicConsumer(channel); + var consumed = 0; + var tcs = new TaskCompletionSource(); + consumer.Received += (sender, ea) => + { + if (Interlocked.Increment(ref consumed) == NumberOfMessages) + { + tcs.SetResult(consumed); + } + + channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); + }; + channel.BasicQos(0, 100, false); + channel.BasicConsume(stream, false, "consumerTag", + arguments: new Dictionary() { { "x-stream-offset", "first" } }, consumer); + new Utils(_testOutputHelper).WaitUntilTaskCompletes(tcs); + Assert.Equal(NumberOfMessages, tcs.Task.Result); + channel.QueueDelete(stream); + channel.Close(); + } + + [Fact] + public async void Amqp10ShouldReadTheAmqp019Properties1000Messages() + { + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + + var factory = new ConnectionFactory(); + using var connection = factory.CreateConnection(); + var channel = connection.CreateModel(); + var properties = channel.CreateBasicProperties(); + const int NumberOfMessages = 1000; + for (var i = 0; i < NumberOfMessages; i++) + { + properties.MessageId = $"messageId{i}"; + properties.CorrelationId = "10000_00000"; + properties.ContentType = "text/plain"; + properties.ContentEncoding = "utf-8"; + properties.UserId = "guest"; + properties.Headers = new Dictionary() { { "stream_key", "stream_value" } }; + channel.BasicPublish("", stream, properties, Encoding.ASCII.GetBytes($"FromAMQP{i}")); + } + + var tcs = new TaskCompletionSource(); + var consumed = 0; + var consumer = await Consumer.Create(new ConsumerConfig(system, stream) + { + OffsetSpec = new OffsetTypeFirst(), + MessageHandler = async (_, _, _, message) => + { + if (Interlocked.Increment(ref consumed) == NumberOfMessages) + { + tcs.SetResult(consumed); + } + + await Task.CompletedTask; + } + }); + + new Utils(_testOutputHelper).WaitUntilTaskCompletes(tcs); + var result = tcs.Task.Result; + Assert.Equal(NumberOfMessages, result); + await consumer.Close(); + await system.DeleteStream(stream); + await system.Close(); + } +} diff --git a/Tests/Resources/message_from_version_1_0_0 b/Tests/Resources/message_from_version_1_0_0 new file mode 100644 index 00000000..9e2c2249 Binary files /dev/null and b/Tests/Resources/message_from_version_1_0_0 differ diff --git a/Tests/Tests.csproj b/Tests/Tests.csproj index c7ad64a9..a7b47869 100644 --- a/Tests/Tests.csproj +++ b/Tests/Tests.csproj @@ -9,6 +9,7 @@ + runtime; build; native; contentfiles; analyzers; buildtransitive