Skip to content

Commit

Permalink
Add cancellation token during subscribe (#206)
Browse files Browse the repository at this point in the history
Fixes #205
With the cancellation token, the handle message function can be stopped during the close.

Signed-off-by: Gabriele Santomaggio <[email protected]>
Co-authored-by: Luke Bakken <[email protected]>
  • Loading branch information
Gsantomaggio and lukebakken authored Jan 10, 2023
1 parent 455028d commit f6efcc8
Showing 1 changed file with 34 additions and 9 deletions.
43 changes: 34 additions & 9 deletions RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Buffers;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
Expand Down Expand Up @@ -95,11 +96,14 @@ public class RawConsumer : AbstractEntity, IConsumer, IDisposable
private readonly RawConsumerConfig _config;
private byte _subscriberId;
private readonly ILogger _logger;
private readonly CancellationTokenSource _cancelTokenSource = new();
private readonly CancellationToken _token;

private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = null)
{
_client = client;
_config = config;
_token = _cancelTokenSource.Token;
_logger = logger ?? NullLogger.Instance;
}

Expand Down Expand Up @@ -148,9 +152,14 @@ void DispatchMessage(ref SequenceReader<byte> sequenceReader, ulong i)
message.MessageOffset = chunk.ChunkId + i;
if (MaybeDispatch(message.MessageOffset))
{
if (_token.IsCancellationRequested)
{
return;
}

_config.MessageHandler(this,
new MessageContext(message.MessageOffset, TimeSpan.FromMilliseconds(chunk.Timestamp)),
message).GetAwaiter().GetResult();
message).Wait(_token);
}
}
catch (ArgumentOutOfRangeException e)
Expand All @@ -159,6 +168,12 @@ void DispatchMessage(ref SequenceReader<byte> sequenceReader, ulong i)
"Please report this issue to the RabbitMQ team on GitHub {Repo}",
Consts.RabbitMQClientRepo);
}
catch (OperationCanceledException)
{
_logger.LogWarning(
"OperationCanceledException. The consumer id: {SubscriberId} reference: {Reference} has been closed while consuming messages",
_subscriberId, _config.Reference);
}
catch (Exception e)
{
_logger.LogError(e, "Error while processing chunk: {ChunkId}", chunk.ChunkId);
Expand Down Expand Up @@ -278,6 +293,8 @@ public async Task<ResponseCode> Close()
return ResponseCode.Ok;
}

_cancelTokenSource.Cancel();

var result = ResponseCode.Ok;
try
{
Expand All @@ -299,12 +316,10 @@ public async Task<ResponseCode> Close()

var closed = _client.MaybeClose($"_client-close-subscriber: {_subscriberId}");
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-subscriber: {_subscriberId}");
_disposed = true;
_logger.LogDebug("Consumer {SubscriberId} closed", _subscriberId);
return result;
}

//
private void Dispose(bool disposing)
{
if (!disposing)
Expand All @@ -317,10 +332,18 @@ private void Dispose(bool disposing)
return;
}

var closeConsumer = Close();
closeConsumer.Wait(TimeSpan.FromSeconds(1));
ClientExceptions.MaybeThrowException(closeConsumer.Result,
$"Error during remove producer. Subscriber: {_subscriberId}");
try
{
var closeConsumer = Close();
closeConsumer.Wait(TimeSpan.FromSeconds(1));
ClientExceptions.MaybeThrowException(closeConsumer.Result,
$"Error during remove producer. Subscriber: {_subscriberId}");
}
finally
{
_cancelTokenSource.Dispose();
_disposed = true;
}
}

public void Dispose()
Expand All @@ -333,8 +356,10 @@ public void Dispose()
{
_logger.LogError(e, "Error during disposing of consumer: {SubscriberId}.", _subscriberId);
}

GC.SuppressFinalize(this);
finally
{
GC.SuppressFinalize(this);
}
}
}
}

0 comments on commit f6efcc8

Please sign in to comment.