From c1b99617e751ff3cbcd476bb40dc242ea9e6815f Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 29 Mar 2023 16:07:22 +0200 Subject: [PATCH] Run the Consumer MessageHandler in a Task (#250) * Make the message handler async, avoiding blocking the socket thread. * reduce the initial credits to two * add the documentation * Use `Random.Shared` * Reduce the noise log when the consumer is removed --------- Signed-off-by: Gabriele Santomaggio Co-authored-by: Luke Bakken --- RabbitMQ.Stream.Client/Client.cs | 6 +- RabbitMQ.Stream.Client/Connection.cs | 34 ++- RabbitMQ.Stream.Client/Consts.cs | 10 + RabbitMQ.Stream.Client/Deliver.cs | 35 ++- RabbitMQ.Stream.Client/Message.cs | 12 + RabbitMQ.Stream.Client/PublicAPI.Shipped.txt | 2 - .../PublicAPI.Unshipped.txt | 5 +- RabbitMQ.Stream.Client/RawConsumer.cs | 280 +++++++++++++----- RabbitMQ.Stream.Client/Reliable/Consumer.cs | 1 + RabbitMQ.Stream.Client/RoutingClient.cs | 17 +- RabbitMQ.Stream.Client/WireFormatting.cs | 10 + Tests/ClientTests.cs | 4 +- Tests/UnitTests.cs | 4 +- docs/asciidoc/api.adoc | 10 +- kubernetes/stream_cluster.yaml | 6 +- 15 files changed, 325 insertions(+), 111 deletions(-) diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index 40689dba..f7914af5 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -198,9 +198,8 @@ private async Task OnConnectionClosed(string reason) public static async Task Create(ClientParameters parameters, ILogger logger = null) { var client = new Client(parameters, logger); - client.connection = await Connection - .Create(parameters.Endpoint, client.HandleIncoming, client.HandleClosed, parameters.Ssl) + .Create(parameters.Endpoint, client.HandleIncoming, client.HandleClosed, parameters.Ssl, logger) .ConfigureAwait(false); // exchange properties @@ -426,10 +425,11 @@ private async Task HandleIncoming(Memory frameMemory) .ConfigureAwait(false); if (off == null) { - _logger.LogWarning( + 
_logger?.LogWarning( "ConsumerUpdateHandler can't returned null, a default offsetType (OffsetTypeNext) will be used"); off = new OffsetTypeNext(); } + // event the consumer is not active, we need to send a ConsumerUpdateResponse // by protocol definition. the offsetType can't be null so we use OffsetTypeNext as default await ConsumerUpdateResponse( diff --git a/RabbitMQ.Stream.Client/Connection.cs b/RabbitMQ.Stream.Client/Connection.cs index bf32c0e3..47fbd410 100644 --- a/RabbitMQ.Stream.Client/Connection.cs +++ b/RabbitMQ.Stream.Client/Connection.cs @@ -10,6 +10,7 @@ using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; namespace RabbitMQ.Stream.Client { @@ -25,6 +26,7 @@ public class Connection : IDisposable private int numFrames; private bool isClosed = false; private bool _disposedValue; + private readonly ILogger _logger; internal int NumFrames => numFrames; @@ -36,8 +38,9 @@ private static System.IO.Stream MaybeTcpUpgrade(NetworkStream networkStream, Ssl } private Connection(Socket socket, Func, Task> callback, - Func closedCallBack, SslOption sslOption) + Func closedCallBack, SslOption sslOption, ILogger logger) { + _logger = logger; this.socket = socket; commandCallback = callback; closedCallback = closedCallBack; @@ -51,7 +54,7 @@ private Connection(Socket socket, Func, Task> callback, } public static async Task Create(EndPoint endpoint, Func, Task> commandCallback, - Func closedCallBack, SslOption sslOption) + Func closedCallBack, SslOption sslOption, ILogger logger) { var socket = new Socket(SocketType.Stream, ProtocolType.Tcp); socket.NoDelay = true; @@ -80,7 +83,7 @@ public static async Task Create(EndPoint endpoint, Func } } - return new Connection(socket, commandCallback, closedCallBack, sslOption); + return new Connection(socket, commandCallback, closedCallBack, sslOption, logger); } public async ValueTask Write(T command) where T : struct, ICommand @@ -95,6 +98,10 @@ public async 
ValueTask Write(T command) where T : struct, ICommand private async Task WriteCommand(T command) where T : struct, ICommand { + if (isClosed) + { + throw new InvalidOperationException("Connection is closed"); + } // Only one thread should be able to write to the output pipeline at a time. await _writeLock.WaitAsync().ConfigureAwait(false); try @@ -149,6 +156,15 @@ private async Task ProcessIncomingFrames() reader.AdvanceTo(buffer.Start, buffer.End); } } + catch (OperationCanceledException e) + { + caught = e; + // We don't need to log as error this exception + // It is raised when the socket is closed due of cancellation token + // from the producer or consumer class + // we leave it as debug to avoid noise in the logs + _logger?.LogDebug("Operation Canceled Exception. TCP Connection Closed"); + } catch (Exception e) { caught = e; @@ -156,7 +172,10 @@ private async Task ProcessIncomingFrames() // closedCallback event. // It is useful to trace the error, but at this point // the socket is closed maybe not in the correct way - Debug.WriteLine($"Error reading the socket, error: {e}"); + if (!isClosed) + { + _logger?.LogError(e, "Error reading the socket"); + } } finally { @@ -164,8 +183,9 @@ private async Task ProcessIncomingFrames() // Mark the PipeReader as complete await reader.CompleteAsync(caught).ConfigureAwait(false); var t = closedCallback?.Invoke("TCP Connection Closed")!; - await t.ConfigureAwait(false); - Debug.WriteLine("TCP Connection Closed"); + if (t != null) + await t.ConfigureAwait(false); + _logger?.LogDebug("TCP Connection Closed"); } } @@ -204,7 +224,7 @@ public void Dispose() socket.Dispose(); if (!_incomingFramesTask.Wait(Consts.ShortWait)) { - Debug.WriteLine($"frame reader task did not exit in {Consts.ShortWait}"); + _logger?.LogError("frame reader task did not exit in {ShortWait}", Consts.ShortWait); } } finally diff --git a/RabbitMQ.Stream.Client/Consts.cs b/RabbitMQ.Stream.Client/Consts.cs index fd3df168..54e89ebc 100644 --- 
a/RabbitMQ.Stream.Client/Consts.cs +++ b/RabbitMQ.Stream.Client/Consts.cs @@ -14,5 +14,15 @@ internal static class Consts internal static readonly TimeSpan ShortWait = TimeSpan.FromSeconds(1); internal static readonly TimeSpan MidWait = TimeSpan.FromSeconds(3); internal static readonly TimeSpan LongWait = TimeSpan.FromSeconds(10); + + internal static int RandomShort() + { + return Random.Shared.Next(500, 1500); + } + + internal static int RandomMid() + { + return Random.Shared.Next(1000, 2500); + } } } diff --git a/RabbitMQ.Stream.Client/Deliver.cs b/RabbitMQ.Stream.Client/Deliver.cs index 5d581b43..d4d7cd87 100644 --- a/RabbitMQ.Stream.Client/Deliver.cs +++ b/RabbitMQ.Stream.Client/Deliver.cs @@ -51,7 +51,7 @@ internal readonly struct SubEntryChunk private SubEntryChunk(byte compress, ushort numRecordsInBatch, uint unCompressedDataSize, uint dataLen, - ReadOnlySequence data) + Memory data) { compressValue = compress; NumRecordsInBatch = numRecordsInBatch; @@ -67,7 +67,18 @@ private SubEntryChunk(byte compress, public uint UnCompressedDataSize { get; } public uint DataLen { get; } - public ReadOnlySequence Data { get; } + public Memory Data { get; } + + // This wrapper was added to be used in async methods + // where the SequenceReader is not available + // see RawConsumer:ParseChunk for more details + // at some point we could remove this wrapper + // and use system.io.pipeline instead of SequenceReader + internal static int Read(ref ReadOnlySequence seq, byte entryType, out SubEntryChunk subEntryChunk) + { + var reader = new SequenceReader(seq); + return Read(ref reader, entryType, out subEntryChunk); + } internal static int Read(ref SequenceReader reader, byte entryType, out SubEntryChunk subEntryChunk) { @@ -77,14 +88,18 @@ internal static int Read(ref SequenceReader reader, byte entryType, out Su // Determinate what kind of the compression it is using // See Compress:CompressMode var compress = (byte)((byte)(entryType & 0x70) >> 4); - offset++; // Data 
contains the subEntryChunk information // We need to pass it to the subEntryChunk that will decode the information + var memory = + ArrayPool.Shared.Rent((int)dataLen).AsMemory(0, (int)dataLen); var data = reader.Sequence.Slice(reader.Consumed, dataLen); + data.CopyTo(memory.Span); + subEntryChunk = - new SubEntryChunk(compress, numRecordsInBatch, unCompressedDataSize, dataLen, data); - offset += (int)dataLen; + new SubEntryChunk(compress, numRecordsInBatch, unCompressedDataSize, dataLen, memory); + offset += memory.Length; + // Here we need to advance the reader to the datalen // Since Data is passed to the subEntryChunk. reader.Advance(dataLen); @@ -101,7 +116,7 @@ private Chunk(byte magicVersion, ulong epoch, ulong chunkId, int crc, - ReadOnlySequence data) + Memory data) { MagicVersion = magicVersion; NumEntries = numEntries; @@ -121,7 +136,7 @@ private Chunk(byte magicVersion, public ulong Epoch { get; } public ulong ChunkId { get; } public int Crc { get; } - public ReadOnlySequence Data { get; } + public Memory Data { get; } internal static int Read(ReadOnlySequence frame, out Chunk chunk) { @@ -138,9 +153,11 @@ internal static int Read(ReadOnlySequence frame, out Chunk chunk) offset += WireFormatting.ReadUInt32(ref reader, out _); // offset += 4; // reserved offset += WireFormatting.ReadUInt32(ref reader, out _); // reserved + var memory = + ArrayPool.Shared.Rent((int)dataLen).AsMemory(0, (int)dataLen); var data = reader.Sequence.Slice(reader.Consumed, dataLen); - offset += (int)dataLen; - chunk = new Chunk(magicVersion, numEntries, numRecords, timestamp, epoch, chunkId, crc, data); + data.CopyTo(memory.Span); + chunk = new Chunk(magicVersion, numEntries, numRecords, timestamp, epoch, chunkId, crc, memory); return offset; } } diff --git a/RabbitMQ.Stream.Client/Message.cs b/RabbitMQ.Stream.Client/Message.cs index e839c67c..1f620ac4 100644 --- a/RabbitMQ.Stream.Client/Message.cs +++ b/RabbitMQ.Stream.Client/Message.cs @@ -71,6 +71,18 @@ public 
ReadOnlySequence Serialize() return new ReadOnlySequence(data); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + // This wrapper was added to be used in async methods + // where the SequenceReader is not available + // see RawConsumer:ParseChunk for more details + // at some point we could remove this wrapper + // and use system.io.pipeline instead of SequenceReader + public static Message From(ref ReadOnlySequence seq, uint len) + { + var reader = new SequenceReader(seq); + return From(ref reader, len); + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public static Message From(ref SequenceReader reader, uint len) { diff --git a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt index cf089bd4..e55eac57 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt @@ -159,7 +159,6 @@ RabbitMQ.Stream.Client.Chunk RabbitMQ.Stream.Client.Chunk.Chunk() -> void RabbitMQ.Stream.Client.Chunk.ChunkId.get -> ulong RabbitMQ.Stream.Client.Chunk.Crc.get -> int -RabbitMQ.Stream.Client.Chunk.Data.get -> System.Buffers.ReadOnlySequence RabbitMQ.Stream.Client.Chunk.Epoch.get -> ulong RabbitMQ.Stream.Client.Chunk.NumEntries.get -> ushort RabbitMQ.Stream.Client.Chunk.NumRecords.get -> uint @@ -803,7 +802,6 @@ static RabbitMQ.Stream.Client.AMQP.DescribedFormatCode.Write(System.Span s static RabbitMQ.Stream.Client.Client.Create(RabbitMQ.Stream.Client.ClientParameters parameters, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.CompressionHelper.Compress(System.Collections.Generic.List messages, RabbitMQ.Stream.Client.CompressionType compressionType) -> RabbitMQ.Stream.Client.ICompressionCodec static RabbitMQ.Stream.Client.CompressionHelper.UnCompress(RabbitMQ.Stream.Client.CompressionType compressionType, System.Buffers.ReadOnlySequence source, uint dataLen, uint unCompressedDataSize) -> System.Buffers.ReadOnlySequence 
-static RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, System.Func, System.Threading.Tasks.Task> commandCallback, System.Func closedCallBack, RabbitMQ.Stream.Client.SslOption sslOption) -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.LeaderLocator.ClientLocal.get -> RabbitMQ.Stream.Client.LeaderLocator static RabbitMQ.Stream.Client.LeaderLocator.LeastLeaders.get -> RabbitMQ.Stream.Client.LeaderLocator static RabbitMQ.Stream.Client.LeaderLocator.Random.get -> RabbitMQ.Stream.Client.LeaderLocator diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index 9ee92db4..ab6f0fe9 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -1,6 +1,7 @@ const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken +RabbitMQ.Stream.Client.Chunk.Data.get -> System.Memory RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer @@ -24,4 +25,6 @@ RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse() -> void RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode, System.Collections.Generic.IDictionary statistic) -> void RabbitMQ.Stream.Client.StreamStatsResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task -static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task \ No newline at end of file +static 
RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, System.Func, System.Threading.Tasks.Task> commandCallback, System.Func closedCallBack, RabbitMQ.Stream.Client.SslOption sslOption, Microsoft.Extensions.Logging.ILogger logger) -> System.Threading.Tasks.Task +static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.ReadOnlySequence seq, uint len) -> RabbitMQ.Stream.Client.Message +static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index 9c7dfcef..3b8c62a0 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -7,6 +7,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Runtime.CompilerServices; +using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -96,12 +97,25 @@ public class RawConsumer : AbstractEntity, IConsumer, IDisposable private readonly RawConsumerConfig _config; private byte _subscriberId; private readonly ILogger _logger; + private readonly Channel _chunksBuffer; + private const int InitialCredit = 2; private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = null) { + // _chunksBuffer is a channel that is used to buffer the chunks + _chunksBuffer = Channel.CreateBounded(new BoundedChannelOptions(InitialCredit) + { + AllowSynchronousContinuations = false, + SingleReader = true, + SingleWriter = true, + FullMode = BoundedChannelFullMode.Wait + }); + IsPromotedAsActive = true; _client = client; _config = config; _logger = logger ?? 
NullLogger.Instance; + + ProcessChunks(); } // if a user specify a custom offset @@ -123,6 +137,15 @@ public async Task StoreOffset(ulong offset) await _client.StoreOffset(_config.Reference, _config.Stream, offset).ConfigureAwait(false); } + // It is needed to understand if the consumer is active or not + // by default is active + // in case of single active consumer can be not active + // it is important to skip the messages in the chuck that + // it is in progress. In this way the promotion will be faster + // avoiding to block the consumer handler if the user put some + // long task + private bool IsPromotedAsActive { get; set; } + public static async Task Create( ClientParameters clientParameters, RawConsumerConfig config, @@ -130,93 +153,200 @@ public static async Task Create( ILogger logger = null ) { - var client = await RoutingHelper.LookupRandomConnection(clientParameters, metaStreamInfo, logger).ConfigureAwait(false); + var client = await RoutingHelper.LookupRandomConnection(clientParameters, metaStreamInfo, logger) + .ConfigureAwait(false); var consumer = new RawConsumer((Client)client, config, logger); await consumer.Init().ConfigureAwait(false); return consumer; } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void ParseChunk(Chunk chunk) + private async Task ParseChunk(Chunk chunk) { - [MethodImpl(MethodImplOptions.AggressiveInlining)] - void DispatchMessage(ref SequenceReader sequenceReader, ulong i) + try { - WireFormatting.ReadUInt32(ref sequenceReader, out var len); - try + [MethodImpl(MethodImplOptions.AggressiveInlining)] + Message MessageFromSequence(ref ReadOnlySequence unCompressedData, ref int compressOffset) + { + try + { + var slice = unCompressedData.Slice(compressOffset, 4); + compressOffset += WireFormatting.ReadUInt32(ref slice, out var len); + slice = unCompressedData.Slice(compressOffset, len); + compressOffset += (int)len; + + // Here we use the Message.From(ref ReadOnlySequence seq ..) 
method to parse the message + // instead of the Message From(ref SequenceReader reader ..) method + // Since the ParseChunk is async and we cannot use the ref SequenceReader reader + // See https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250 for more details + + var message = Message.From(ref slice, len); + return message; + } + catch (Exception e) + { + _logger.LogError(e, "Error while parsing message. Message will be skipped. " + + "Please report this issue to the RabbitMQ team on GitHub {Repo}", + Consts.RabbitMQClientRepo); + } + + return null; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + async Task DispatchMessage(Message message, ulong i) { - var message = Message.From(ref sequenceReader, len); - message.MessageOffset = chunk.ChunkId + i; - if (MaybeDispatch(message.MessageOffset)) + try { - if (Token.IsCancellationRequested) + message.MessageOffset = chunk.ChunkId + i; + if (MaybeDispatch(message.MessageOffset)) { - return; + if (!Token.IsCancellationRequested) + { + // it is usually active + // it is useful only in single active consumer + if (IsPromotedAsActive) + { + await _config.MessageHandler(this, + new MessageContext(message.MessageOffset, + TimeSpan.FromMilliseconds(chunk.Timestamp)), + message).ConfigureAwait(false); + } + else + { + _logger?.LogDebug( + "The consumer is not active for the stream {ConfigStream}. message won't dispatched", + _config.Stream); + } + } } + } - _config.MessageHandler(this, - new MessageContext(message.MessageOffset, TimeSpan.FromMilliseconds(chunk.Timestamp)), - message).Wait(Token); + catch (OperationCanceledException) + { + _logger?.LogWarning( + "OperationCanceledException. 
The consumer id: {SubscriberId} reference: {Reference} has been closed while consuming messages", + _subscriberId, _config.Reference); + } + catch (Exception e) + { + _logger?.LogError(e, "Error while processing chunk: {ChunkId}", chunk.ChunkId); } } - catch (ArgumentOutOfRangeException e) - { - _logger.LogError(e, "Unexpected error while parsing message. Message will be skipped. " + - "Please report this issue to the RabbitMQ team on GitHub {Repo}", - Consts.RabbitMQClientRepo); - } - catch (OperationCanceledException) - { - _logger.LogWarning( - "OperationCanceledException. The consumer id: {SubscriberId} reference: {Reference} has been closed while consuming messages", - _subscriberId, _config.Reference); - } - catch (Exception e) + + var chunkBuffer = new ReadOnlySequence(chunk.Data); + + var numRecords = chunk.NumRecords; + var offset = 0; // it is used to calculate the offset in the chunk. + ulong + messageOffset = 0; // it is used to calculate the message offset. It is the chunkId + messageOffset + while (numRecords != 0) { - _logger.LogError(e, "Error while processing chunk: {ChunkId}", chunk.ChunkId); + // (entryType & 0x80) == 0 is standard entry + // (entryType & 0x80) != 0 is compress entry (used for subEntry) + // In Case of subEntry the entryType is the compression type + // In case of standard entry the entryType si part of the message + var slice = chunkBuffer.Slice(offset, 1); + offset += WireFormatting.ReadByte(ref slice, out var entryType); + var isSubEntryBatch = (entryType & 0x80) != 0; + if (isSubEntryBatch) + { + // it means that it is a sub-entry batch + // We continue to read from the stream to decode the subEntryChunk values + slice = chunkBuffer.Slice(offset); + + offset += SubEntryChunk.Read(ref slice, entryType, out var subEntryChunk); + var unCompressedData = CompressionHelper.UnCompress( + subEntryChunk.CompressionType, + new ReadOnlySequence(subEntryChunk.Data), + subEntryChunk.DataLen, + subEntryChunk.UnCompressedDataSize); + + var 
compressOffset = 0; + for (ulong z = 0; z < subEntryChunk.NumRecordsInBatch; z++) + { + var message = MessageFromSequence(ref unCompressedData, ref compressOffset); + await DispatchMessage(message, messageOffset++).ConfigureAwait(false); + } + + numRecords -= subEntryChunk.NumRecordsInBatch; + } + else + { + // Ok the entry is a standard entry + // we need to rewind the offset to -1 to one byte to decode the messages + offset--; + var message = MessageFromSequence(ref chunkBuffer, ref offset); + await DispatchMessage(message, messageOffset++).ConfigureAwait(false); + numRecords--; + } } } + catch (Exception e) + { + _logger?.LogError(e, "Error while parsing chunk: {ChunkId}", chunk.ChunkId); + } + } - var reader = new SequenceReader(chunk.Data); - - var numRecords = chunk.NumRecords; - ulong messageOffset = 0; // it is used to calculate the message offset. - while (numRecords != 0) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void ProcessChunks() + { + Task.Run(async () => { - // (entryType & 0x80) == 0 is standard entry - // (entryType & 0x80) != 0 is compress entry (used for subEntry) - // In Case of subEntry the entryType is the compression type - // In case of standard entry the entryType si part of the message - WireFormatting.ReadByte(ref reader, out var entryType); - var isSubEntryBatch = (entryType & 0x80) != 0; - if (isSubEntryBatch) + try { - // it means that it is a subentry batch - // We continue to read from the stream to decode the subEntryChunk values - SubEntryChunk.Read(ref reader, entryType, out var subEntryChunk); - var unCompressedData = CompressionHelper.UnCompress( - subEntryChunk.CompressionType, - subEntryChunk.Data, - subEntryChunk.DataLen, - subEntryChunk.UnCompressedDataSize); - var readerUnCompressed = new SequenceReader(unCompressedData); - - for (ulong z = 0; z < subEntryChunk.NumRecordsInBatch; z++) + while (await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) { - DispatchMessage(ref 
readerUnCompressed, messageOffset++); + while (_chunksBuffer.Reader.TryRead(out var chunk)) + { + // We send the credit to the server to allow the server to send more messages + // we request the credit before process the check to keep the network busy + try + { + await _client.Credit(_subscriberId, 1).ConfigureAwait(false); + } + catch (InvalidOperationException) + { + // The client has been closed + // Suppose a scenario where the client is closed and the ProcessChunks task is still running + // we remove the the subscriber from the client and we close the client + // The ProcessChunks task will try to send the credit to the server + // The client will throw an InvalidOperationException + // since the connection is closed + // In this case we don't want to log the error to avoid log noise + _logger?.LogDebug("Can't send the credit: The TCP client has been closed"); + break; + } + + // We need to check the cancellation token status because the exception can be thrown + // because the cancellation token has been cancelled + // the consumer could take time to handle a single + // this check is a bit redundant but it is useful to avoid to process the chunk + // and close the task + if (Token.IsCancellationRequested) + break; + + await ParseChunk(chunk).ConfigureAwait(false); + } } - numRecords -= subEntryChunk.NumRecordsInBatch; + _logger?.LogDebug("The ProcessChunks task has been closed"); } - else + catch (Exception e) { - // Ok the entry is a standard entry - // we need to rewind the stream to one byte to decode the messages - reader.Rewind(1); - DispatchMessage(ref reader, messageOffset++); - numRecords--; + // We need to check the cancellation token status because the exception can be thrown + // because the cancellation token has been cancelled + // In this case we don't want to log the error + if (Token.IsCancellationRequested) + { + _logger?.LogDebug("The ProcessChunks task has been closed due to cancellation"); + return; + } + + _logger?.LogError(e, + "Error 
while process chunks. The ProcessChunks task will be closed"); } - } + }, Token); } private async Task Init() @@ -254,7 +384,6 @@ private async Task Init() // this the default value for the consumer. _config.StoredOffsetSpec = _config.OffsetSpec; - const ushort InitialCredit = 10; var (consumerId, response) = await _client.Subscribe( _config, @@ -262,14 +391,14 @@ private async Task Init() consumerProperties, async deliver => { - // receive the chunk from the deliver - // before parse the chunk, we ask for more credits - // in thi way we keep the network busy - await _client.Credit(deliver.SubscriptionId, 1).ConfigureAwait(false); - // parse the chunk, we have another function because the sequence reader - // can't be used in async context - ParseChunk(deliver.Chunk); - }, async b => + // Send the chunk to the _chunksBuffer + // in this way the chunks are processed in a separate thread + // this wont' block the socket thread + // introduced https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250 + if (Token.IsCancellationRequested) + return; + await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false); + }, async promotedAsActive => { if (_config.ConsumerUpdateListener != null) { @@ -278,9 +407,15 @@ private async Task Init() _config.StoredOffsetSpec = await _config.ConsumerUpdateListener( _config.Reference, _config.Stream, - b).ConfigureAwait(false); + promotedAsActive).ConfigureAwait(false); } + // Here we set the promotion status + // important for the dispatcher messages + IsPromotedAsActive = promotedAsActive; + _logger?.LogDebug("The consumer active status is: {IsActive} for the stream: {Stream} ", + IsPromotedAsActive, + _config.Stream); return _config.StoredOffsetSpec; } ).ConfigureAwait(false); @@ -298,6 +433,7 @@ public async Task Close() // this unlock the consumer if it is waiting for a message // see DispatchMessage method where the token is used MaybeCancelToken(); + if (_client.IsClosed) { return ResponseCode.Ok; @@ 
-310,7 +446,8 @@ public async Task Close() // in this case we reduce the waiting time // the consumer could be removed because of stream deleted // so it is not necessary to wait. - var unsubscribeResponse = await _client.Unsubscribe(_subscriberId).WaitAsync(Consts.MidWait).ConfigureAwait(false); + var unsubscribeResponse = + await _client.Unsubscribe(_subscriberId).WaitAsync(Consts.MidWait).ConfigureAwait(false); result = unsubscribeResponse.ResponseCode; } catch (Exception e) @@ -321,6 +458,7 @@ public async Task Close() var closed = await _client.MaybeClose($"_client-close-subscriber: {_subscriberId}").ConfigureAwait(false); ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-subscriber: {_subscriberId}"); _logger.LogDebug("Consumer {SubscriberId} closed", _subscriberId); + return result; } diff --git a/RabbitMQ.Stream.Client/Reliable/Consumer.cs b/RabbitMQ.Stream.Client/Reliable/Consumer.cs index 2d137c6d..a43ec432 100644 --- a/RabbitMQ.Stream.Client/Reliable/Consumer.cs +++ b/RabbitMQ.Stream.Client/Reliable/Consumer.cs @@ -27,6 +27,7 @@ public record ConsumerConfig : ReliableConfig /// /// Callback function where the consumer receives the messages. + /// The callback runs in a different Task respect to the socket thread. 
/// Parameters that will be received by this function: /// /// diff --git a/RabbitMQ.Stream.Client/RoutingClient.cs b/RabbitMQ.Stream.Client/RoutingClient.cs index 11734a83..a61a77f1 100644 --- a/RabbitMQ.Stream.Client/RoutingClient.cs +++ b/RabbitMQ.Stream.Client/RoutingClient.cs @@ -65,14 +65,16 @@ private static async Task LookupConnection( // In this case we just return the node (leader for producer, random for consumer) // since there is not load balancer configuration - return await routing.CreateClient(clientParameters with { Endpoint = endPointNoLb }, logger).ConfigureAwait(false); + return await routing.CreateClient(clientParameters with { Endpoint = endPointNoLb }, logger) + .ConfigureAwait(false); } // here it means that there is a AddressResolver configuration // so there is a load-balancer or proxy we need to get the right connection // as first we try with the first node given from the LB var endPoint = clientParameters.AddressResolver.EndPoint; - var client = await routing.CreateClient(clientParameters with { Endpoint = endPoint }, logger).ConfigureAwait(false); + var client = await routing.CreateClient(clientParameters with { Endpoint = endPoint }, logger) + .ConfigureAwait(false); var advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host"); var advertisedPort = GetPropertyValue(client.ConnectionProperties, "advertised_port"); @@ -83,7 +85,8 @@ private static async Task LookupConnection( attemptNo++; await client.Close("advertised_host or advertised_port doesn't match").ConfigureAwait(false); - client = await routing.CreateClient(clientParameters with { Endpoint = endPoint }, logger).ConfigureAwait(false); + client = await routing.CreateClient(clientParameters with { Endpoint = endPoint }, logger) + .ConfigureAwait(false); advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host"); advertisedPort = GetPropertyValue(client.ConnectionProperties, "advertised_port"); @@ -93,7 +96,7 @@ private static async 
Task LookupConnection( $"Could not find broker ({broker.Host}:{broker.Port}) after {maxAttempts} attempts"); } - await Task.Delay(TimeSpan.FromMilliseconds(200)).ConfigureAwait(false); + await Task.Delay(Consts.RandomShort()).ConfigureAwait(false); } return client; @@ -148,7 +151,8 @@ private static string GetPropertyValue(IDictionary connectionPro public static async Task LookupLeaderConnection(ClientParameters clientParameters, StreamInfo metaDataInfo, ILogger logger = null) { - return await LookupConnection(clientParameters, metaDataInfo.Leader, MaxAttempts(metaDataInfo), logger).ConfigureAwait(false); + return await LookupConnection(clientParameters, metaDataInfo.Leader, MaxAttempts(metaDataInfo), logger) + .ConfigureAwait(false); } /// @@ -165,7 +169,8 @@ public static async Task LookupRandomConnection(ClientParameters client { try { - return await LookupConnection(clientParameters, broker, MaxAttempts(metaDataInfo), logger).ConfigureAwait(false); + return await LookupConnection(clientParameters, broker, MaxAttempts(metaDataInfo), logger) + .ConfigureAwait(false); } catch (Exception ex) { diff --git a/RabbitMQ.Stream.Client/WireFormatting.cs b/RabbitMQ.Stream.Client/WireFormatting.cs index 69624881..a4b2180a 100644 --- a/RabbitMQ.Stream.Client/WireFormatting.cs +++ b/RabbitMQ.Stream.Client/WireFormatting.cs @@ -95,6 +95,11 @@ internal static int ReadUInt32(ref SequenceReader reader, out uint value) return 4; } + internal static int ReadUInt32(ref ReadOnlySequence seq, out uint value) + { + return ReadUInt32(seq, out value); + + } internal static int ReadUInt32(ReadOnlySequence seq, out uint value) { if (seq.FirstSpan.Length >= 4) @@ -208,6 +213,11 @@ internal static int ReadByte(ref SequenceReader reader, out byte value) return 1; } + internal static int ReadByte(ref ReadOnlySequence seq, out byte b) + { + b = seq.FirstSpan[0]; + return 1; + } internal static int ReadByte(ReadOnlySequence seq, out byte b) { b = seq.FirstSpan[0]; diff --git 
a/Tests/ClientTests.cs b/Tests/ClientTests.cs index 5fd0ea22..7967d898 100644 --- a/Tests/ClientTests.cs +++ b/Tests/ClientTests.cs @@ -211,7 +211,7 @@ public async void ConsumerShouldReceiveDelivery() Task DeliverHandler(Deliver deliver) { - var sequenceReader = new SequenceReader(deliver.Chunk.Data); + var sequenceReader = new SequenceReader(new ReadOnlySequence(deliver.Chunk.Data)); for (var i = 0; i < deliver.Chunk.NumEntries; i++) { @@ -265,7 +265,7 @@ public async void ConsumerStoreOffsetShouldReceiveDelivery() var offsetType = new OffsetTypeFirst(); var deliverHandler = new Func((Deliver deliver) => { - var sequenceReader = new SequenceReader(deliver.Chunk.Data); + var sequenceReader = new SequenceReader(new ReadOnlySequence(deliver.Chunk.Data)); for (ulong i = 0; i < deliver.Chunk.NumEntries; i++) { WireFormatting.ReadUInt32(ref sequenceReader, out var len); diff --git a/Tests/UnitTests.cs b/Tests/UnitTests.cs index d48904ce..18872049 100644 --- a/Tests/UnitTests.cs +++ b/Tests/UnitTests.cs @@ -39,8 +39,7 @@ public class LoadBalancerRouting : IRouting public Task CreateClient(ClientParameters clientParameters, ILogger logger = null) { - var rnd = new Random(); - var advId = rnd.Next(0, advertisedHosts.Count); + var advId = Random.Shared.Next(0, advertisedHosts.Count); var fake = new FakeClient(clientParameters) { @@ -50,6 +49,7 @@ public Task CreateClient(ClientParameters clientParameters, ILogger log ["advertised_port"] = "5552" } }; + return Task.FromResult(fake); } diff --git a/docs/asciidoc/api.adoc b/docs/asciidoc/api.adoc index 8cb83b56..90e0ed72 100644 --- a/docs/asciidoc/api.adoc +++ b/docs/asciidoc/api.adoc @@ -667,13 +667,9 @@ include::{test-examples}/ConsumerUsage.cs[tag=consumer-creation] The broker start sending messages as soon as the `Consumer` instance is created. 
-[WARNING] -.Keep the message processing callback as short as possible -==== -The message processing callback should be kept as short as possible to avoid blocking the connection thread. -Not doing so can make the `StreamSystem`, `Producer`, `Consumer` instances sluggish or even block them. -Any long processing should be done in a separate thread (e.g. with an asynchronous `Task.Run(...)`). -==== + +Starting from the 1.3.0 version, the `Consumer#MessageHandler` API runs in a separate `Task` and it is possible to use `async`/`await` in the handler. + The following table sums up the main settings to create a `Consumer` with `ConsumerConfig`: diff --git a/kubernetes/stream_cluster.yaml b/kubernetes/stream_cluster.yaml index b4274480..e31ca958 100644 --- a/kubernetes/stream_cluster.yaml +++ b/kubernetes/stream_cluster.yaml @@ -10,6 +10,7 @@ metadata: namespace: stream-clients-test spec: replicas: 1 + image: rabbitmq:3.11.11-management service: type: LoadBalancer resources: @@ -22,4 +23,7 @@ spec: rabbitmq: additionalPlugins: - rabbitmq_stream - - rabbitmq_stream_management \ No newline at end of file + - rabbitmq_stream_management + additionalConfig: + default_user=test + default_pass=test \ No newline at end of file