Skip to content

Commit

Permalink
Fix the Unsubscribe timeout problem (#313)
Browse files Browse the repository at this point in the history
- Make the Unsubscribe function sync (like the other commands). The consumers' list is now updated correctly.
  The connection is correctly closed since there are no pending consumers.

- Fix delivery handling when the consumer has been removed but there are still chunks on the wire.
  Add debug logs for this situation.

- Add Debug Asserts to validate the buffer size, which can help in understanding some chunk-parsing errors.
  These may be temporary; we can remove them at some point.

- Add the cancellation token to the connection class;
  it helps when the reader is blocked and the consumer is closed.

- Given this [use case](#310), with short-lived
   consumers, the initial credits should be low to avoid keeping the read handler busy.

- Minor change: replace Memory<T> with ReadOnlyMemory<T>; it is a bit faster and safer.

- Make the debug and error log messages uniform by including the consumer info; this adds all the consumer details that can be helpful when debugging and investigating errors.

- Fixes #310


---------

Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio authored Sep 29, 2023
1 parent a2e06bf commit 0711e13
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 62 deletions.
47 changes: 36 additions & 11 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -362,13 +362,19 @@ public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId)
{
try
{
// here we reduce a bit the timeout to avoid waiting too much
// if the client is busy with read operations it can take time to process the unsubscribe
// but the subscribe is removed.
var result =
await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
new UnsubscribeRequest(corr, subscriptionId)).ConfigureAwait(false);
new UnsubscribeRequest(corr, subscriptionId), TimeSpan.FromSeconds(5)).ConfigureAwait(false);
_logger.LogDebug("Unsubscribe request : {SubscriptionId}", subscriptionId);

return result;
}
finally
{
_logger.LogDebug("Unsubscribe: {SubscriptionId}", subscriptionId);
// remove consumer after RPC returns, this should avoid uncorrelated data being sent
consumers.Remove(subscriptionId);
}
Expand Down Expand Up @@ -436,7 +442,6 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
// so there is no need to send the heartbeat when not necessary
_heartBeatHandler.UpdateHeartBeat();

ConsumerEvents consumerEvents;
switch (tag)
{
case PublishConfirm.Key:
Expand All @@ -446,14 +451,29 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
confirmCallback(confirm.PublishingIds);
if (MemoryMarshal.TryGetArray(confirm.PublishingIds, out var confirmSegment))
{
ArrayPool<ulong>.Shared.Return(confirmSegment.Array);
if (confirmSegment.Array != null)
ArrayPool<ulong>.Shared.Return(confirmSegment.Array);
}

break;
case Deliver.Key:
Deliver.Read(frame, out var deliver);
consumerEvents = consumers[deliver.SubscriptionId];
await consumerEvents.DeliverHandler(deliver).ConfigureAwait(false);
if (consumers.TryGetValue(deliver.SubscriptionId, out var consumerEvent))
{
await consumerEvent.DeliverHandler(deliver).ConfigureAwait(false);
}
else
{
// the consumer is not found, this can happen when the consumer is closing
// and there are still chunks on the wire to the handler is still processing the chunks
// we can ignore the chunk since the subscription does not exists anymore
_logger?.LogDebug(
"Could not find stream subscription {ID} or subscription closing." +
"A possible cause it that the subscription was closed and the are still chunks on the wire. " +
"Reduce the initial credits can help to avoid this situation",
deliver.SubscriptionId);
}

break;
case PublishError.Key:
PublishError.Read(frame, out var error);
Expand Down Expand Up @@ -588,7 +608,8 @@ private void HandleCorrelatedCommand(ushort tag, ref ReadOnlySequence<byte> fram
default:
if (MemoryMarshal.TryGetArray(frame.First, out var segment))
{
ArrayPool<byte>.Shared.Return(segment.Array);
if (segment.Array != null)
ArrayPool<byte>.Shared.Return(segment.Array);
}

throw new ArgumentException($"Unknown or unexpected tag: {tag}", nameof(tag));
Expand Down Expand Up @@ -637,19 +658,23 @@ public async Task<CloseResponse> Close(string reason)
await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason),
TimeSpan.FromSeconds(10)).ConfigureAwait(false);

InternalClose();
connection.Dispose();
return result;
}

catch (System.TimeoutException)
catch (TimeoutException)
{
_logger.LogError("Timeout while closing the connection. The connection will be closed anyway");
_logger.LogError(
"Timeout while closing the connection. The connection will be closed anyway");
}
catch (Exception e)
{
_logger.LogError(e, "An error occurred while calling {CalledFunction}", nameof(connection.Dispose));
}
finally
{
// even if the close fails we need to close the connection
InternalClose();
connection.Dispose();
}

return new CloseResponse(0, ResponseCode.Ok);
}
Expand Down
28 changes: 22 additions & 6 deletions RabbitMQ.Stream.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public class Connection : IDisposable
private bool _disposedValue;
private readonly ILogger _logger;

// this is used to cancel the socket and the reader/write operations tasks
private readonly CancellationTokenSource _cancelTokenSource = new();
private CancellationToken Token => _cancelTokenSource.Token;

internal int NumFrames => numFrames;

public bool IsClosed => isClosed;
Expand Down Expand Up @@ -98,21 +102,27 @@ public async ValueTask<bool> Write<T>(T command) where T : struct, ICommand

private async Task WriteCommand<T>(T command) where T : struct, ICommand
{
if (Token.IsCancellationRequested)
{
throw new OperationCanceledException("Token Cancellation Requested Connection");
}

if (isClosed)
{
throw new InvalidOperationException("Connection is closed");
}

// Only one thread should be able to write to the output pipeline at a time.
await _writeLock.WaitAsync().ConfigureAwait(false);
await _writeLock.WaitAsync(Token).ConfigureAwait(false);
try
{
var size = command.SizeNeeded;
var mem = new byte[4 + size]; // + 4 to write the size
WireFormatting.WriteUInt32(mem, (uint)size);
var written = command.Write(mem.AsSpan()[4..]);
await writer.WriteAsync(new ReadOnlyMemory<byte>(mem)).ConfigureAwait(false);
await writer.WriteAsync(new ReadOnlyMemory<byte>(mem), Token).ConfigureAwait(false);
Debug.Assert(size == written);
await writer.FlushAsync().ConfigureAwait(false);
await writer.FlushAsync(Token).ConfigureAwait(false);
}
finally
{
Expand All @@ -129,7 +139,7 @@ private async Task ProcessIncomingFrames()
{
if (!reader.TryRead(out var result))
{
result = await reader.ReadAsync().ConfigureAwait(false);
result = await reader.ReadAsync(Token).ConfigureAwait(false);
}

var buffer = result.Buffer;
Expand Down Expand Up @@ -219,13 +229,19 @@ public void Dispose()
{
try
{
if (!_cancelTokenSource.IsCancellationRequested)
{
_cancelTokenSource.Cancel();
}

isClosed = true;
writer.Complete();
reader.Complete();
socket.Dispose();
socket.Close();
if (!_incomingFramesTask.Wait(Consts.MidWait))
{
_logger?.LogWarning("ProcessIncomingFrames reader task did not exit in {MidWait}", Consts.MidWait);
_logger?.LogWarning("ProcessIncomingFrames reader task did not exit in {MidWait}",
Consts.MidWait);
}
}
finally
Expand Down
13 changes: 9 additions & 4 deletions RabbitMQ.Stream.Client/Deliver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ internal readonly struct SubEntryChunk
private SubEntryChunk(byte compress,
ushort numRecordsInBatch,
uint unCompressedDataSize, uint dataLen,
Memory<byte> data)
ReadOnlyMemory<byte> data)
{
compressValue = compress;
NumRecordsInBatch = numRecordsInBatch;
Expand All @@ -67,7 +67,7 @@ private SubEntryChunk(byte compress,
public uint UnCompressedDataSize { get; }

public uint DataLen { get; }
public Memory<byte> Data { get; }
public ReadOnlyMemory<byte> Data { get; }

// This wrapper was added to be used in async methods
// where the SequenceReader is not available
Expand Down Expand Up @@ -122,7 +122,7 @@ private Chunk(byte magicVersion,
ulong epoch,
ulong chunkId,
uint crc,
Memory<byte> data)
ReadOnlyMemory<byte> data)
{
MagicVersion = magicVersion;
NumEntries = numEntries;
Expand All @@ -142,7 +142,7 @@ private Chunk(byte magicVersion,
public ulong Epoch { get; }
public ulong ChunkId { get; }
public uint Crc { get; }
public Memory<byte> Data { get; }
public ReadOnlyMemory<byte> Data { get; }

internal static int Read(ReadOnlySequence<byte> frame, out Chunk chunk)
{
Expand Down Expand Up @@ -175,6 +175,11 @@ internal static int Read(ReadOnlySequence<byte> frame, out Chunk chunk)
$"Chunk: Not enough data, sourceLength: {reader.Length}, memoryLen: {memory.Length}, dataLen: {dataLen}");
}

if (memory.Length != dataLen)
{
throw new Exception($"Chunk: Not enough data, memoryLen: {memory.Length}, dataLen: {dataLen}");
}

chunk = new Chunk(magicVersion, numEntries, numRecords, timestamp, epoch, chunkId, crc, memory);
return offset;
}
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ RabbitMQ.Stream.Client.AuthMechanism.Plain = 0 -> RabbitMQ.Stream.Client.AuthMec
RabbitMQ.Stream.Client.AuthMechanismNotSupportedException
RabbitMQ.Stream.Client.AuthMechanismNotSupportedException.AuthMechanismNotSupportedException(string s) -> void
RabbitMQ.Stream.Client.Chunk.Crc.get -> uint
RabbitMQ.Stream.Client.Chunk.Data.get -> System.Memory<byte>
RabbitMQ.Stream.Client.Chunk.Data.get -> System.ReadOnlyMemory<byte>
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
RabbitMQ.Stream.Client.Client.ExchangeVersions() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CommandVersionsResponse>
RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>
Expand Down
Loading

0 comments on commit 0711e13

Please sign in to comment.