From d5cdce4fe6af16a53a680b97b42b8b63f0baa69d Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 18 Dec 2023 09:22:36 +0100 Subject: [PATCH] New event to handle Metadata update (#332) With this PR #328 the client can handle multi-producers and consumers per connection. This PR removes MetadataHandler and introduces OnMetadataUpdate event. The event can handle multiple Metadata updates coming from the server. Metadata update is raised when a stream is deleted, or a replica is removed. The server automatically removes the producers and consumers linked to the connection, here we need to remove these entities from the internal pool to be consistent. - Refactor RawConsumer and RawProducer. Remove duplication code. Move the common code to the AbstractEntity Class --------- Signed-off-by: Gabriele Santomaggio --- .github/workflows/build-test.yaml | 2 + RabbitMQ.Stream.Client/AbstractEntity.cs | 71 ++++++- RabbitMQ.Stream.Client/Client.cs | 100 ++++++--- RabbitMQ.Stream.Client/IConsumer.cs | 7 +- RabbitMQ.Stream.Client/IProducer.cs | 8 +- RabbitMQ.Stream.Client/PublicAPI.Shipped.txt | 8 - .../PublicAPI.Unshipped.txt | 20 +- RabbitMQ.Stream.Client/RawConsumer.cs | 198 ++++++++++-------- RabbitMQ.Stream.Client/RawProducer.cs | 111 +++++----- .../RawSuperStreamConsumer.cs | 3 +- .../RawSuperStreamProducer.cs | 1 - RabbitMQ.Stream.Client/RoutingClient.cs | 5 +- Tests/ClientTests.cs | 7 +- Tests/ConnectionsPoolTests.cs | 164 +++++++++++---- 14 files changed, 463 insertions(+), 242 deletions(-) diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml index d4d2db42..1ded4492 100644 --- a/.github/workflows/build-test.yaml +++ b/.github/workflows/build-test.yaml @@ -67,6 +67,8 @@ jobs: key: ${{ runner.os }}-v2-nuget-${{ hashFiles('**/*.csproj') }} restore-keys: | ${{ runner.os }}-v2-nuget- + - name: Wait RabbitMQ is Up + run: docker exec ${{ job.services.rabbitmq.id }} rabbitmqctl wait --pid 1 --timeout 60 - name: Enable RabbitMQ Plugins run: docker exec ${{ job.services.rabbitmq.id }} rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management rabbitmq_amqp1_0 - name: Restore diff --git a/RabbitMQ.Stream.Client/AbstractEntity.cs b/RabbitMQ.Stream.Client/AbstractEntity.cs index fb3da8bc..e4a414fd 100644 --- a/RabbitMQ.Stream.Client/AbstractEntity.cs +++ b/RabbitMQ.Stream.Client/AbstractEntity.cs @@ -10,31 +10,87 @@ namespace RabbitMQ.Stream.Client { + public abstract record EntityCommonConfig + { + internal ConnectionsPool Pool { get; set; } + } + internal enum EntityStatus { Open, Closed, Disposed } - public abstract class AbstractEntity + + public interface IClosable + { + public Task Close(); + } + + public abstract class AbstractEntity : IClosable { private readonly CancellationTokenSource _cancelTokenSource = new(); protected CancellationToken Token => _cancelTokenSource.Token; - + protected ILogger Logger { get; init; } internal EntityStatus _status = EntityStatus.Closed; + + protected byte EntityId { get; set; } + protected abstract string GetStream(); + protected abstract string DumpEntityConfiguration(); + // here the _cancelTokenSource is disposed and the token is cancelled // in producer is used to cancel the send task // in consumer is used to cancel the receive task - protected void MaybeCancelToken() + private void MaybeCancelToken() { - if (!_cancelTokenSource.IsCancellationRequested) _cancelTokenSource.Cancel(); } public abstract Task Close(); - protected void Dispose(bool disposing, string entityInfo, ILogger logger) + /// + /// Remove the producer or consumer from the server + /// + /// In case the producer or consumer is already removed from the server. + /// ex: metadata update + /// + protected abstract Task DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false); + + /// + /// Internal close method. It is called by the public Close method. + /// Set the status to closed and remove the producer or consumer from the server ( if it is not already removed ) + /// Close the TCP connection if it is not already closed or it is needed. + /// + /// The connection pool instance + /// + /// + protected async Task Shutdown(EntityCommonConfig config, bool ignoreIfAlreadyDeleted = false) + { + MaybeCancelToken(); + + if (!IsOpen()) // the client is already closed + { + return ResponseCode.Ok; + } + + _status = EntityStatus.Closed; + var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false); + + if (_client is { IsClosed: true }) + { + return result; + } + + var closed = await _client.MaybeClose($"closing: {EntityId}", + GetStream(), config.Pool) + .ConfigureAwait(false); + ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-Entity: {EntityId}"); + Logger.LogDebug("{EntityInfo} is closed", DumpEntityConfiguration()); + return result; + } + + protected void Dispose(bool disposing) { if (!disposing) { @@ -51,12 +107,12 @@ protected void Dispose(bool disposing, string entityInfo, ILogger logger) var closeTask = Close(); if (!closeTask.Wait(Consts.MidWait)) { - logger.LogWarning("Failed to close {EntityInfo} in time", entityInfo); + Logger?.LogWarning("Failed to close {EntityInfo} in time", DumpEntityConfiguration()); } } catch (Exception e) { - logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", entityInfo, e.Message); + Logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", DumpEntityConfiguration(), e.Message); } finally { @@ -70,6 +126,5 @@ public bool IsOpen() } internal Client _client; - } } diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index d2e1fa45..6361370f 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -53,7 +53,9 @@ public record ClientParameters public string VirtualHost { get; set; } = "/"; public EndPoint Endpoint { get; set; } = new IPEndPoint(IPAddress.Loopback, 5552); - public Action MetadataHandler { get; set; } = _ => { }; + public delegate void MetadataUpdateHandler(MetaDataUpdate update); + + public event MetadataUpdateHandler OnMetadataUpdate; public Action UnhandledExceptionHandler { get; set; } = _ => { }; public TimeSpan Heartbeat { get; set; } = TimeSpan.FromMinutes(1); @@ -71,6 +73,11 @@ public string ClientProvidedName public AddressResolver AddressResolver { get; set; } = null; public AuthMechanism AuthMechanism { get; set; } = AuthMechanism.Plain; + + internal void FireMetadataUpdate(MetaDataUpdate metaDataUpdate) + { + OnMetadataUpdate?.Invoke(metaDataUpdate); + } } internal readonly struct OutgoingMsg : ICommand @@ -213,7 +220,8 @@ await client .ConfigureAwait(false); logger?.LogDebug("Sasl mechanism: {Mechanisms}", saslHandshakeResponse.Mechanisms); - var isValid = saslHandshakeResponse.Mechanisms.Contains(parameters.AuthMechanism.ToString().ToUpperInvariant(), + var isValid = saslHandshakeResponse.Mechanisms.Contains( + parameters.AuthMechanism.ToString().ToUpperInvariant(), StringComparer.OrdinalIgnoreCase); if (!isValid) { @@ -225,7 +233,8 @@ await client var authResponse = await client .Request(corr => - new SaslAuthenticateRequest(corr, parameters.AuthMechanism.ToString().ToUpperInvariant(), saslData)) + new SaslAuthenticateRequest(corr, parameters.AuthMechanism.ToString().ToUpperInvariant(), + saslData)) .ConfigureAwait(false); ClientExceptions.MaybeThrowException(authResponse.ResponseCode, parameters.UserName); @@ -322,22 +331,28 @@ public ValueTask Publish(T msg) where T : struct, ICommand return (publisherId, response); } - public async Task DeletePublisher(byte publisherId) + public async Task DeletePublisher(byte publisherId, + bool ignoreIfAlreadyRemoved = false) { await _poolSemaphore.WaitAsync().ConfigureAwait(false); try { - var result = - await Request(corr => - new DeletePublisherRequest(corr, publisherId)).ConfigureAwait(false); + if (!ignoreIfAlreadyRemoved) + { + var result = + await Request(corr => + new DeletePublisherRequest(corr, publisherId)).ConfigureAwait(false); - return result; + return result; + } } finally { publishers.Remove(publisherId); _poolSemaphore.Release(); } + + return new DeletePublisherResponse(); } public async Task<(byte, SubscribeResponse)> Subscribe(string stream, IOffsetType offsetType, @@ -386,20 +401,24 @@ await Request(corr => return (subscriptionId, response); } - public async Task Unsubscribe(byte subscriptionId) + public async Task Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false) { await _poolSemaphore.WaitAsync().ConfigureAwait(false); try { - // here we reduce a bit the timeout to avoid waiting too much - // if the client is busy with read operations it can take time to process the unsubscribe - // but the subscribe is removed. - var result = - await Request(corr => - new UnsubscribeRequest(corr, subscriptionId), TimeSpan.FromSeconds(5)).ConfigureAwait(false); - _logger.LogDebug("Unsubscribe request : {SubscriptionId}", subscriptionId); - - return result; + if (!ignoreIfAlreadyRemoved) + { + // here we reduce a bit the timeout to avoid waiting too much + // if the client is busy with read operations it can take time to process the unsubscribe + // but the subscribe is removed. + var result = + await Request(corr => + new UnsubscribeRequest(corr, subscriptionId), + TimeSpan.FromSeconds(5)).ConfigureAwait(false); + _logger.LogDebug("Unsubscribe request : {SubscriptionId}", subscriptionId); + + return result; + } } finally { @@ -408,6 +427,8 @@ await Request(corr => consumers.Remove(subscriptionId); _poolSemaphore.Release(); } + + return new UnsubscribeResponse(); } public async Task QueryPartition(string superStream) @@ -477,12 +498,25 @@ private async Task HandleIncoming(Memory frameMemory) case PublishConfirm.Key: PublishConfirm.Read(frame, out var confirm); confirmFrames += 1; - var (confirmCallback, _) = publishers[confirm.PublisherId]; - confirmCallback(confirm.PublishingIds); - if (MemoryMarshal.TryGetArray(confirm.PublishingIds, out var confirmSegment)) + if (publishers.TryGetValue(confirm.PublisherId, out var publisherConf)) { - if (confirmSegment.Array != null) - ArrayPool.Shared.Return(confirmSegment.Array); + var (confirmCallback, _) = publisherConf; + confirmCallback(confirm.PublishingIds); + if (MemoryMarshal.TryGetArray(confirm.PublishingIds, out var confirmSegment)) + { + if (confirmSegment.Array != null) + ArrayPool.Shared.Return(confirmSegment.Array); + } + } + else + { + // the producer is not found, this can happen when the producer is closing + // and there are still confirmation on the wire + // we can ignore the error since the producer does not exists anymore + _logger?.LogDebug( + "Could not find stream producer {ID} or producer is closing." + + "A possible cause it that the producer was closed and the are still confirmation on the wire. ", + confirm.PublisherId); } break; @@ -507,12 +541,26 @@ private async Task HandleIncoming(Memory frameMemory) break; case PublishError.Key: PublishError.Read(frame, out var error); - var (_, errorCallback) = publishers[error.PublisherId]; - errorCallback(error.PublishingErrors); + if (publishers.TryGetValue(error.PublisherId, out var publisher)) + { + var (_, errorCallback) = publisher; + errorCallback(error.PublishingErrors); + } + else + { + // the producer is not found, this can happen when the producer is closing + // and there are still confirmation on the wire + // we can ignore the error since the producer does not exists anymore + _logger?.LogDebug( + "Could not find stream producer {ID} or producer is closing." + + "A possible cause it that the producer was closed and the are still confirmation on the wire. ", + error.PublisherId); + } + break; case MetaDataUpdate.Key: MetaDataUpdate.Read(frame, out var metaDataUpdate); - Parameters.MetadataHandler(metaDataUpdate); + Parameters.FireMetadataUpdate(metaDataUpdate); break; case TuneResponse.Key: TuneResponse.Read(frame, out var tuneResponse); diff --git a/RabbitMQ.Stream.Client/IConsumer.cs b/RabbitMQ.Stream.Client/IConsumer.cs index b032e2ec..513bf88f 100644 --- a/RabbitMQ.Stream.Client/IConsumer.cs +++ b/RabbitMQ.Stream.Client/IConsumer.cs @@ -7,21 +7,18 @@ namespace RabbitMQ.Stream.Client; -public interface IConsumer +public interface IConsumer : IClosable { public Task StoreOffset(ulong offset); - public Task Close(); public void Dispose(); public ConsumerInfo Info { get; } } -public record IConsumerConfig : INamedEntity +public record IConsumerConfig : EntityCommonConfig, INamedEntity { private ushort _initialCredits = Consts.ConsumerInitialCredits; - internal ConnectionsPool Pool { get; set; } - // StoredOffsetSpec configuration it is needed to keep the offset spec. // since the offset can be decided from the ConsumerConfig.OffsetSpec. // and from ConsumerConfig.ConsumerUpdateListener. diff --git a/RabbitMQ.Stream.Client/IProducer.cs b/RabbitMQ.Stream.Client/IProducer.cs index e24051d2..2a1e8a79 100644 --- a/RabbitMQ.Stream.Client/IProducer.cs +++ b/RabbitMQ.Stream.Client/IProducer.cs @@ -15,7 +15,7 @@ namespace RabbitMQ.Stream.Client; // - Super-Stream producer // -public interface IProducer +public interface IProducer : IClosable { /// /// Send the message to the stream in asynchronous mode. @@ -49,8 +49,6 @@ public interface IProducer /// public ValueTask Send(ulong publishingId, List subEntryMessages, CompressionType compressionType); - public Task Close(); - /// /// Return the last publishing id. /// @@ -83,11 +81,9 @@ public record ProducerFilter public Func FilterValue { get; set; } = null; } -public record IProducerConfig : INamedEntity +public record IProducerConfig : EntityCommonConfig, INamedEntity { - internal ConnectionsPool Pool { get; set; } - public string Reference { get; set; } public int MaxInFlight { get; set; } = 1_000; public string ClientProvidedName { get; set; } = "dotnet-stream-raw-producer"; diff --git a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt index 6891133e..919b8adc 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt @@ -169,7 +169,6 @@ RabbitMQ.Stream.Client.Client.ConnectionCloseHandler RabbitMQ.Stream.Client.Client.ConnectionProperties.get -> System.Collections.Generic.IDictionary RabbitMQ.Stream.Client.Client.CreateStream(string stream, System.Collections.Generic.IDictionary args) -> System.Threading.Tasks.ValueTask RabbitMQ.Stream.Client.Client.Credit(byte subscriptionId, ushort credit) -> System.Threading.Tasks.ValueTask -RabbitMQ.Stream.Client.Client.DeletePublisher(byte publisherId) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Client.DeleteStream(string stream) -> System.Threading.Tasks.ValueTask RabbitMQ.Stream.Client.Client.IncomingFrames.get -> int RabbitMQ.Stream.Client.Client.IsClosed.get -> bool @@ -188,7 +187,6 @@ RabbitMQ.Stream.Client.Client.StoreOffset(string reference, string stream, ulong RabbitMQ.Stream.Client.Client.StreamExists(string stream) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Client.Subscribe(RabbitMQ.Stream.Client.RawConsumerConfig config, ushort initialCredit, System.Collections.Generic.Dictionary properties, System.Func deliverHandler, System.Func> consumerUpdateHandler) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)> RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IOffsetType offsetType, ushort initialCredit, System.Collections.Generic.Dictionary properties, System.Func deliverHandler, System.Func> consumerUpdateHandler = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)> -RabbitMQ.Stream.Client.Client.Unsubscribe(byte subscriptionId) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.ClientParameters RabbitMQ.Stream.Client.ClientParameters.AddressResolver.get -> RabbitMQ.Stream.Client.AddressResolver RabbitMQ.Stream.Client.ClientParameters.AddressResolver.set -> void @@ -198,8 +196,6 @@ RabbitMQ.Stream.Client.ClientParameters.Endpoint.get -> System.Net.EndPoint RabbitMQ.Stream.Client.ClientParameters.Endpoint.set -> void RabbitMQ.Stream.Client.ClientParameters.Heartbeat.get -> System.TimeSpan RabbitMQ.Stream.Client.ClientParameters.Heartbeat.set -> void -RabbitMQ.Stream.Client.ClientParameters.MetadataHandler.get -> System.Action -RabbitMQ.Stream.Client.ClientParameters.MetadataHandler.set -> void RabbitMQ.Stream.Client.ClientParameters.Password.get -> string RabbitMQ.Stream.Client.ClientParameters.Password.set -> void RabbitMQ.Stream.Client.ClientParameters.Properties.get -> System.Collections.Generic.IDictionary @@ -335,7 +331,6 @@ RabbitMQ.Stream.Client.ICompressionCodec.UnCompress(System.Buffers.ReadOnlySeque RabbitMQ.Stream.Client.ICompressionCodec.UnCompressedSize.get -> int RabbitMQ.Stream.Client.ICompressionCodec.Write(System.Span span) -> int RabbitMQ.Stream.Client.IConsumer -RabbitMQ.Stream.Client.IConsumer.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.IConsumer.Dispose() -> void RabbitMQ.Stream.Client.IConsumer.StoreOffset(ulong offset) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.IConsumerConfig @@ -357,7 +352,6 @@ RabbitMQ.Stream.Client.IOffsetType.OffsetType.get -> RabbitMQ.Stream.Client.Offs RabbitMQ.Stream.Client.IOffsetType.Size.get -> int RabbitMQ.Stream.Client.IOffsetType.Write(System.Span span) -> int RabbitMQ.Stream.Client.IProducer -RabbitMQ.Stream.Client.IProducer.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.IProducer.ConfirmFrames.get -> int RabbitMQ.Stream.Client.IProducer.Dispose() -> void RabbitMQ.Stream.Client.IProducer.GetLastPublishingId() -> System.Threading.Tasks.Task @@ -559,7 +553,6 @@ RabbitMQ.Stream.Client.RawProducerConfig.MetadataHandler.set -> void RabbitMQ.Stream.Client.RawProducerConfig.RawProducerConfig(string stream) -> void RabbitMQ.Stream.Client.RawProducerConfig.Stream.get -> string RabbitMQ.Stream.Client.RawSuperStreamConsumer -RabbitMQ.Stream.Client.RawSuperStreamConsumer.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.RawSuperStreamConsumer.Dispose() -> void RabbitMQ.Stream.Client.RawSuperStreamConsumer.StoreOffset(ulong offset) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig @@ -570,7 +563,6 @@ RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.OffsetSpec.set -> void RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.RawSuperStreamConsumerConfig(string superStream) -> void RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.SuperStream.get -> string RabbitMQ.Stream.Client.RawSuperStreamProducer -RabbitMQ.Stream.Client.RawSuperStreamProducer.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.RawSuperStreamProducer.ConfirmFrames.get -> int RabbitMQ.Stream.Client.RawSuperStreamProducer.Dispose() -> void RabbitMQ.Stream.Client.RawSuperStreamProducer.GetLastPublishingId() -> System.Threading.Tasks.Task diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index 1129f257..2a011e84 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -1,12 +1,19 @@ abstract RabbitMQ.Stream.Client.AbstractEntity.Close() -> System.Threading.Tasks.Task +abstract RabbitMQ.Stream.Client.AbstractEntity.DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false) -> System.Threading.Tasks.Task +abstract RabbitMQ.Stream.Client.AbstractEntity.DumpEntityConfiguration() -> string +abstract RabbitMQ.Stream.Client.AbstractEntity.GetStream() -> string const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort override RabbitMQ.Stream.Client.Broker.ToString() -> string override RabbitMQ.Stream.Client.RawConsumer.Close() -> System.Threading.Tasks.Task override RabbitMQ.Stream.Client.RawProducer.Close() -> System.Threading.Tasks.Task -RabbitMQ.Stream.Client.AbstractEntity.Dispose(bool disposing, string entityInfo, Microsoft.Extensions.Logging.ILogger logger) -> void +RabbitMQ.Stream.Client.AbstractEntity.Dispose(bool disposing) -> void +RabbitMQ.Stream.Client.AbstractEntity.EntityId.get -> byte +RabbitMQ.Stream.Client.AbstractEntity.EntityId.set -> void RabbitMQ.Stream.Client.AbstractEntity.IsOpen() -> bool -RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void +RabbitMQ.Stream.Client.AbstractEntity.Logger.get -> Microsoft.Extensions.Logging.ILogger +RabbitMQ.Stream.Client.AbstractEntity.Logger.init -> void +RabbitMQ.Stream.Client.AbstractEntity.Shutdown(RabbitMQ.Stream.Client.EntityCommonConfig config, bool ignoreIfAlreadyDeleted = false) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken RabbitMQ.Stream.Client.AuthMechanism RabbitMQ.Stream.Client.AuthMechanism.External = 1 -> RabbitMQ.Stream.Client.AuthMechanism @@ -19,11 +26,15 @@ RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte RabbitMQ.Stream.Client.Client.ClientId.get -> string RabbitMQ.Stream.Client.Client.ClientId.init -> void RabbitMQ.Stream.Client.Client.DeclarePublisher(string publisherRef, string stream, System.Action> confirmCallback, System.Action<(ulong, RabbitMQ.Stream.Client.ResponseCode)[]> errorCallback, RabbitMQ.Stream.Client.ConnectionsPool pool = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.DeclarePublisherResponse)> +RabbitMQ.Stream.Client.Client.DeletePublisher(byte publisherId, bool ignoreIfAlreadyRemoved = false) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Client.ExchangeVersions() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask +RabbitMQ.Stream.Client.Client.Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void +RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler +RabbitMQ.Stream.Client.ClientParameters.OnMetadataUpdate -> RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler RabbitMQ.Stream.Client.ConnectionItem RabbitMQ.Stream.Client.ConnectionItem.Available.get -> bool RabbitMQ.Stream.Client.ConnectionItem.BrokerInfo.get -> string @@ -60,9 +71,12 @@ RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference RabbitMQ.Stream.Client.ConsumerInfo.Reference.get -> string RabbitMQ.Stream.Client.CrcException RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void +RabbitMQ.Stream.Client.EntityCommonConfig RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List partitions) -> System.Threading.Tasks.Task> RabbitMQ.Stream.Client.IClient.ClientId.get -> string RabbitMQ.Stream.Client.IClient.ClientId.init -> void +RabbitMQ.Stream.Client.IClosable +RabbitMQ.Stream.Client.IClosable.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.IConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo RabbitMQ.Stream.Client.IConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32 RabbitMQ.Stream.Client.IConsumerConfig.Crc32.set -> void @@ -123,7 +137,9 @@ RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int RabbitMQ.Stream.Client.PublishFilter.Write(System.Span span) -> int RabbitMQ.Stream.Client.RawConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo RabbitMQ.Stream.Client.RawProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo +RabbitMQ.Stream.Client.RawSuperStreamConsumer.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.RawSuperStreamConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo +RabbitMQ.Stream.Client.RawSuperStreamProducer.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.RawSuperStreamProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index e53e9a63..2517234a 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -116,8 +116,7 @@ internal void Validate() public class RawConsumer : AbstractEntity, IConsumer, IDisposable { private readonly RawConsumerConfig _config; - private byte _subscriberId; - private readonly ILogger _logger; + private readonly Channel _chunksBuffer; private readonly ushort _initialCredits; @@ -127,13 +126,13 @@ public class RawConsumer : AbstractEntity, IConsumer, IDisposable // assigned. private readonly TaskCompletionSource _completeSubscription = new(); - private string ConsumerInfo() + protected sealed override string DumpEntityConfiguration() { var superStream = string.IsNullOrEmpty(_config.SuperStream) ? "No SuperStream" : $"SuperStream {_config.SuperStream}"; return - $"Consumer id {_subscriberId} for stream: {_config.Stream}, reference: {_config.Reference}, OffsetSpec {_config.OffsetSpec} " + + $"Consumer id {EntityId} for stream: {_config.Stream}, reference: {_config.Reference}, OffsetSpec {_config.OffsetSpec} " + $"Client ProvidedName {_config.ClientProvidedName}, " + $"{superStream}, IsSingleActiveConsumer: {_config.IsSingleActiveConsumer}, " + $"Token IsCancellationRequested: {Token.IsCancellationRequested} "; @@ -141,10 +140,10 @@ private string ConsumerInfo() private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = null) { - _logger = logger ?? NullLogger.Instance; + Logger = logger ?? NullLogger.Instance; _initialCredits = config.InitialCredits; _config = config; - _logger.LogDebug("Creating... {ConsumerInfo}", ConsumerInfo()); + Logger.LogDebug("Creating... {ConsumerInfo}", DumpEntityConfiguration()); Info = new ConsumerInfo(_config.Stream, _config.Reference); // _chunksBuffer is a channel that is used to buffer the chunks _chunksBuffer = Channel.CreateBounded(new BoundedChannelOptions(_initialCredits) @@ -174,6 +173,10 @@ private bool MaybeDispatch(ulong offset) }; } + protected override string GetStream() + { + return _config.Stream; + } public async Task StoreOffset(ulong offset) { await _client.StoreOffset(_config.Reference, _config.Stream, offset).ConfigureAwait(false); @@ -233,16 +236,16 @@ Message MessageFromSequence(ref ReadOnlySequence unCompressedData, ref int { if (Token.IsCancellationRequested) { - _logger?.LogDebug( + Logger?.LogDebug( "Error while parsing message {ConsumerInfo}, Cancellation Requested, the consumer is closing. ", - ConsumerInfo()); + DumpEntityConfiguration()); return null; } - _logger?.LogError(e, - "Error while parsing message {ConsumerInfo}. The message will be skipped. " + + Logger?.LogError(e, + "Error while parsing message {EntityInfo}. The message will be skipped. " + "Please report this issue to the RabbitMQ team on GitHub {Repo}", - ConsumerInfo(), Consts.RabbitMQClientRepo); + DumpEntityConfiguration(), Consts.RabbitMQClientRepo); } return null; @@ -276,11 +279,11 @@ async Task DispatchMessage(Message message, ulong i) } catch (Exception e) { - _logger.LogError(e, + Logger.LogError(e, "Error while filtering message. Message with offset {MessageOffset} won't be dispatched." + "Suggestion: review the PostFilter value function" - + "{ConsumerInfo}", - message.MessageOffset, ConsumerInfo()); + + "{EntityInfo}", + message.MessageOffset, DumpEntityConfiguration()); canDispatch = false; } } @@ -295,9 +298,9 @@ await _config.MessageHandler(this, } else { - _logger?.LogDebug( - "{ConsumerInfo} is not active. message won't dispatched", - ConsumerInfo()); + Logger?.LogDebug( + "{EntityInfo} is not active. message won't dispatched", + DumpEntityConfiguration()); } } } @@ -305,23 +308,23 @@ await _config.MessageHandler(this, catch (OperationCanceledException) { - _logger?.LogWarning( - "OperationCanceledException. {ConsumerInfo} has been closed while consuming messages", - ConsumerInfo()); + Logger?.LogWarning( + "OperationCanceledException. {EntityInfo} has been closed while consuming messages", + DumpEntityConfiguration()); } catch (Exception e) { if (Token.IsCancellationRequested) { - _logger?.LogDebug( - "Dispatching {ConsumerInfo}, Cancellation Requested, the consumer is closing. ", - ConsumerInfo()); + Logger?.LogDebug( + "Dispatching {EntityInfo}, Cancellation Requested, the consumer is closing. ", + DumpEntityConfiguration()); return; } - _logger?.LogError(e, - "Error while Dispatching message, ChunkId : {ChunkId} {ConsumerInfo}", - chunk.ChunkId, ConsumerInfo()); + Logger?.LogError(e, + "Error while Dispatching message, ChunkId : {ChunkId} {EntityInfo}", + chunk.ChunkId, DumpEntityConfiguration()); } } @@ -377,9 +380,9 @@ await _config.MessageHandler(this, } catch (Exception e) { - _logger?.LogError(e, - "Error while parsing chunk, ChunkId : {ChunkId} {ConsumerInfo}", - chunk.ChunkId, ConsumerInfo()); + Logger?.LogError(e, + "Error while parsing chunk, ChunkId : {ChunkId} {EntityInfo}", + chunk.ChunkId, DumpEntityConfiguration()); } } @@ -402,8 +405,7 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) // // we request the credit before process the check to keep the network busy try { - - await _client.Credit(_subscriberId, 1).ConfigureAwait(false); + await _client.Credit(EntityId, 1).ConfigureAwait(false); } catch (InvalidOperationException) { @@ -414,9 +416,9 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) // // The client will throw an InvalidOperationException // since the connection is closed // In this case we don't want to log the error to avoid log noise - _logger?.LogDebug( - "Can't send the credit {ConsumerInfo}: The TCP client has been closed", - ConsumerInfo()); + Logger?.LogDebug( + "Can't send the credit {EntityInfo}: The TCP client has been closed", + DumpEntityConfiguration()); break; } @@ -432,9 +434,9 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) // } } - _logger?.LogDebug( - "The ProcessChunks {ConsumerInfo} task has been closed normally", - ConsumerInfo()); + Logger?.LogDebug( + "The ProcessChunks {EntityInfo} task has been closed normally", + DumpEntityConfiguration()); } catch (Exception e) { @@ -443,15 +445,15 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) // // In this case we don't want to log the error if (Token.IsCancellationRequested) { - _logger?.LogDebug( - "The ProcessChunks task for the stream: {ConsumerInfo} has been closed due to cancellation", - ConsumerInfo()); + Logger?.LogDebug( + "The ProcessChunks task for the stream: {EntityInfo} has been closed due to cancellation", + DumpEntityConfiguration()); return; } - _logger?.LogError(e, - "Error while process chunks the stream: {ConsumerInfo} The ProcessChunks task will be closed", - ConsumerInfo()); + Logger?.LogError(e, + "Error while process chunks the stream: {EntityInfo} The ProcessChunks task will be closed", + DumpEntityConfiguration()); } }, Token); } @@ -460,20 +462,8 @@ private async Task Init() { _config.Validate(); - _client.ConnectionClosed += async reason => - { - _config.Pool.Remove(_client.ClientId); - await Close().ConfigureAwait(false); - if (_config.ConnectionClosedHandler != null) - { - await _config.ConnectionClosedHandler(reason).ConfigureAwait(false); - } - }; - - if (_config.MetadataHandler != null) - { - _client.Parameters.MetadataHandler += _config.MetadataHandler; - } + _client.ConnectionClosed += OnConnectionClosed(); + _client.Parameters.OnMetadataUpdate += OnMetadataUpdate(); var consumerProperties = new Dictionary(); @@ -507,7 +497,7 @@ private async Task Init() var chunkConsumed = 0; // this the default value for the consumer. _config.StoredOffsetSpec = _config.OffsetSpec; - (_subscriberId, var response) = await _client.Subscribe( + (EntityId, var response) = await _client.Subscribe( _config, _initialCredits, consumerProperties, @@ -522,10 +512,10 @@ private async Task Init() { // the consumer is closing from the user but some chunks are still in the buffer // simply skip the chunk - _logger?.LogTrace( - "CancellationToken requested. The {ConsumerInfo} " + + Logger?.LogTrace( + "CancellationToken requested. The {EntityInfo} " + "The chunk won't be processed", - ConsumerInfo()); + DumpEntityConfiguration()); return; } @@ -536,13 +526,13 @@ private async Task Init() ); if (crcCalculated != deliver.Chunk.Crc) { - _logger?.LogError( - "CRC32 does not match, server crc: {Crc}, local crc: {CrcCalculated}, {ConsumerInfo}, " + - "Chunk Consumed {ChunkConsumed}", deliver.Chunk.Crc, crcCalculated, ConsumerInfo(), + Logger?.LogError( + "CRC32 does not match, server crc: {Crc}, local crc: {CrcCalculated}, {EntityInfo}, " + + "Chunk Consumed {ChunkConsumed}", deliver.Chunk.Crc, crcCalculated, DumpEntityConfiguration(), chunkConsumed); throw new CrcException( - $"CRC32 does not match, server crc: {deliver.Chunk.Crc}, local crc: {crcCalculated}, {ConsumerInfo()}, " + + $"CRC32 does not match, server crc: {deliver.Chunk.Crc}, local crc: {crcCalculated}, {DumpEntityConfiguration()}, " + $"Chunk Consumed {chunkConsumed}"); } } @@ -563,10 +553,10 @@ private async Task Init() // Here we set the promotion status // important for the dispatcher messages IsPromotedAsActive = promotedAsActive; - _logger?.LogDebug( + Logger?.LogDebug( "The consumer active status is: {IsActive} for {ConsumeInfo}", IsPromotedAsActive, - ConsumerInfo()); + DumpEntityConfiguration()); return _config.StoredOffsetSpec; } ).ConfigureAwait(false); @@ -580,53 +570,77 @@ private async Task Init() throw new CreateConsumerException($"consumer could not be created code: {response.ResponseCode}"); } - public override async Task Close() - { - // this unlock the consumer if it is waiting for a message - // see DispatchMessage method where the token is used - MaybeCancelToken(); - if (!IsOpen()) + private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() => + metaDataUpdate => { - return ResponseCode.Ok; - } + // the connection can handle different streams + // we need to check if the metadata update is for the stream + // where the consumer is consuming else can ignore the update + if (metaDataUpdate.Stream != _config.Stream) + return; + // at this point the server has removed the consumer from the list + // and the unsubscribe is not needed anymore (ignoreIfClosed = true) + // we call the Close to re-enter to the standard behavior + // ignoreIfClosed is an optimization to avoid to send the unsubscribe + _config.MetadataHandler?.Invoke(metaDataUpdate); + Shutdown(_config, true).ConfigureAwait(false); + // remove the event since the consumer is closed + // only if the stream is the valid + _client.Parameters.OnMetadataUpdate -= OnMetadataUpdate(); + }; - _status = EntityStatus.Closed; - var result = ResponseCode.Ok; + private Client.ConnectionCloseHandler OnConnectionClosed() => + async reason => + { + _config.Pool.Remove(_client.ClientId); + await Shutdown(_config, true).ConfigureAwait(false); + if (_config.ConnectionClosedHandler != null) + { + await _config.ConnectionClosedHandler(reason).ConfigureAwait(false); + } + + // remove the event since the connection is closed + _client.ConnectionClosed -= OnConnectionClosed(); + }; + + protected override async Task DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false) + { try { var unsubscribeResponse = - await _client.Unsubscribe(_subscriberId).ConfigureAwait(false); - result = unsubscribeResponse.ResponseCode; + await _client.Unsubscribe(EntityId, ignoreIfAlreadyDeleted).ConfigureAwait(false); + return unsubscribeResponse.ResponseCode; } catch (TimeoutException) { - _logger.LogError( - "Timeout removing the consumer id: {SubscriberId}, {ConsumerInfo} from the server. " + + Logger.LogError( + "Timeout removing the consumer id: {SubscriberId}, {EntityInfo} from the server. " + "The consumer will be closed anyway", - _subscriberId, ConsumerInfo()); + EntityId, DumpEntityConfiguration()); } catch (Exception e) { - _logger.LogError(e, - "Error removing {ConsumerInfo} from the server", - ConsumerInfo()); + Logger.LogError(e, + "Error removing {EntityInfo} from the server", + DumpEntityConfiguration()); } - var closed = await _client.MaybeClose($"_client-close-subscriber: {_subscriberId}", - _config.Stream, _config.Pool) - .ConfigureAwait(false); - ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-subscriber: {_subscriberId}"); - _logger.LogDebug("{ConsumerInfo} is closed", ConsumerInfo()); - return result; + return ResponseCode.Ok; + + } + + public override async Task Close() + { + return await Shutdown(_config).ConfigureAwait(false); } public void Dispose() { try { - Dispose(true, ConsumerInfo(), _logger); + Dispose(true); } finally { diff --git a/RabbitMQ.Stream.Client/RawProducer.cs b/RabbitMQ.Stream.Client/RawProducer.cs index 8140be4b..06f95d26 100644 --- a/RabbitMQ.Stream.Client/RawProducer.cs +++ b/RabbitMQ.Stream.Client/RawProducer.cs @@ -57,18 +57,18 @@ internal void Validate() public class RawProducer : AbstractEntity, IProducer, IDisposable { - private byte _publisherId; private readonly RawProducerConfig _config; private readonly Channel _messageBuffer; private readonly SemaphoreSlim _semaphore; - private readonly ILogger _logger; public int PendingCount => _config.MaxInFlight - _semaphore.CurrentCount; - private string ProducerInfo() + protected override string GetStream() => _config.Stream; + + protected override string DumpEntityConfiguration() { return - $"Producer id {_publisherId} for stream: {_config.Stream}, reference: {_config.Reference}," + + $"Producer id {EntityId} for stream: {_config.Stream}, reference: {_config.Reference}," + $"Client ProvidedName {_config.ClientProvidedName}, " + $"Token IsCancellationRequested: {Token.IsCancellationRequested} "; } @@ -101,7 +101,7 @@ private RawProducer(Client client, RawProducerConfig config, ILogger logger = nu SingleWriter = false, FullMode = BoundedChannelFullMode.Wait }); - _logger = logger ?? NullLogger.Instance; + Logger = logger ?? NullLogger.Instance; Task.Run(ProcessBuffer); _semaphore = new SemaphoreSlim(config.MaxInFlight, config.MaxInFlight); } @@ -113,24 +113,12 @@ private RawProducer(Client client, RawProducerConfig config, ILogger logger = nu private async Task Init() { - _client.ConnectionClosed += async reason => - { - await Close().ConfigureAwait(false); - _config.Pool.Remove(_client.ClientId); - if (_config.ConnectionClosedHandler != null) - { - await _config.ConnectionClosedHandler(reason).ConfigureAwait(false); - } - }; - - if (_config.MetadataHandler != null) - { - _client.Parameters.MetadataHandler += _config.MetadataHandler; - } - _config.Validate(); - (_publisherId, var response) = await _client.DeclarePublisher( + _client.ConnectionClosed += OnConnectionClosed(); + _client.Parameters.OnMetadataUpdate += OnMetadataUpdate(); + + (EntityId, var response) = await _client.DeclarePublisher( _config.Reference, _config.Stream, publishingIds => @@ -152,8 +140,9 @@ private async Task Init() // there could be an exception in the user code. // So here we log the exception and we continue. - _logger.LogError(e, "Error during confirm handler, publishing id: {Id}. {ProducerInfo} " + - "Hint: Check the user ConfirmHandler callback", id, ProducerInfo()); + Logger.LogError(e, "Error during confirm handler, publishing id: {Id}. {ProducerInfo} " + + "Hint: Check the user ConfirmHandler callback", id, + DumpEntityConfiguration()); } } @@ -178,6 +167,40 @@ private async Task Init() throw new CreateProducerException($"producer could not be created code: {response.ResponseCode}"); } + private Client.ConnectionCloseHandler OnConnectionClosed() => + async reason => + { + _config.Pool.Remove(_client.ClientId); + await Shutdown(_config, true).ConfigureAwait(false); + if (_config.ConnectionClosedHandler != null) + { + await _config.ConnectionClosedHandler(reason).ConfigureAwait(false); + } + + // remove the event since the connection is closed + _client.ConnectionClosed -= OnConnectionClosed(); + }; + + private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() => + metaDataUpdate => + { + // the connection can handle different streams + // we need to check if the metadata update is for the stream + // where the producer is sending the messages else can ignore the update + if (metaDataUpdate.Stream != _config.Stream) + return; + // at this point the server has removed the producer from the list + // and the DeletePublisher producer is not needed anymore (ignoreIfClosed = true) + // we call the Close to re-enter to the standard behavior + // ignoreIfClosed is an optimization to avoid to send the DeletePublisher + _config.MetadataHandler?.Invoke(metaDataUpdate); + Shutdown(_config, true).ConfigureAwait(false); + + // remove the event since the producer is closed + // only if the stream is the valid + _client.Parameters.OnMetadataUpdate -= OnMetadataUpdate(); + }; + private bool IsFilteringEnabled => _config.Filter is { FilterValue: not null }; /// @@ -196,7 +219,7 @@ public async ValueTask Send(ulong publishingId, List subEntryMessages, { await SemaphoreAwaitAsync().ConfigureAwait(false); var publishTask = - _client.Publish(new SubEntryPublish(_publisherId, publishingId, + _client.Publish(new SubEntryPublish(EntityId, publishingId, CompressionHelper.Compress(subEntryMessages, compressionType))); await publishTask.ConfigureAwait(false); } @@ -254,13 +277,13 @@ private async Task SendMessages(List<(ulong, Message)> messages, bool clearMessa { if (IsFilteringEnabled) { - await _client.Publish(new PublishFilter(_publisherId, messages, _config.Filter.FilterValue, - _logger)) + await _client.Publish(new PublishFilter(EntityId, messages, _config.Filter.FilterValue, + Logger)) .ConfigureAwait(false); } else { - await _client.Publish(new Publish(_publisherId, messages)).ConfigureAwait(false); + await _client.Publish(new Publish(EntityId, messages)).ConfigureAwait(false); } if (clearMessagesList) @@ -306,7 +329,7 @@ public async ValueTask Send(ulong publishingId, Message message) } await SemaphoreAwaitAsync().ConfigureAwait(false); - var msg = new OutgoingMsg(_publisherId, publishingId, message); + var msg = new OutgoingMsg(EntityId, publishingId, message); // Let's see if we can write a message to the channel without having to wait if (!_messageBuffer.Writer.TryWrite(msg)) @@ -341,51 +364,41 @@ private async Task ProcessBuffer() } catch (Exception e) { - _logger.LogError(e, "error while Process Buffer"); + Logger.LogError(e, "error while Process Buffer"); } } public override async Task Close() { - // MaybeCancelToken This unlocks the semaphore so that the background task can exit - // see SemaphoreAwaitAsync method and processBuffer method - MaybeCancelToken(); - - if (!IsOpen()) // the client is already closed - { - return ResponseCode.Ok; - } + return await Shutdown(_config).ConfigureAwait(false); + } - _status = EntityStatus.Closed; - var result = ResponseCode.Ok; + protected override async Task DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false) + { try { // The default timeout is usually 10 seconds // in this case we reduce the waiting time // the producer could be removed because of stream deleted // so it is not necessary to wait. - var closeResponse = await _client.DeletePublisher(_publisherId).WaitAsync(TimeSpan.FromSeconds(3)) + var closeResponse = await _client.DeletePublisher(EntityId, ignoreIfAlreadyDeleted) + .WaitAsync(TimeSpan.FromSeconds(3)) .ConfigureAwait(false); - result = closeResponse.ResponseCode; + return closeResponse.ResponseCode; } catch (Exception e) { - _logger.LogError(e, "Error removing {ProducerInfo} from the server", ProducerInfo()); + Logger.LogError(e, "Error removing {ProducerInfo} from the server", DumpEntityConfiguration()); } - var closed = await _client.MaybeClose($"client-close-publisher: {_publisherId}", - _config.Stream, _config.Pool) - .ConfigureAwait(false); - ClientExceptions.MaybeThrowException(closed.ResponseCode, $"client-close-publisher: {_publisherId}"); - _logger?.LogDebug("{ProducerInfo} closed", ProducerInfo()); - return result; + return ResponseCode.Ok; } public void Dispose() { try { - Dispose(true, ProducerInfo(), _logger); + Dispose(true); } finally { diff --git a/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs b/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs index ebeb2f22..f4586951 100644 --- a/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs +++ b/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs @@ -112,8 +112,7 @@ private RawConsumerConfig FromStreamConfig(string stream) Thread.Sleep(500); _streamInfos.Remove(update.Stream); - _consumers.TryRemove(update.Stream, out var consumerMetadata); - consumerMetadata?.Close(); + _consumers.TryRemove(update.Stream, out _); // this check is needed only for an edge case // when the system is closed and the connections for the steam are still open for diff --git a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs index 44cb6015..a5e72c2d 100644 --- a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs +++ b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs @@ -119,7 +119,6 @@ private RawProducerConfig FromStreamConfig(string stream) } _producers.TryRemove(update.Stream, out var producer); - producer?.Close(); }, ClientProvidedName = _config.ClientProvidedName, BatchSize = _config.BatchSize, diff --git a/RabbitMQ.Stream.Client/RoutingClient.cs b/RabbitMQ.Stream.Client/RoutingClient.cs index 8aa060b6..6e61021c 100644 --- a/RabbitMQ.Stream.Client/RoutingClient.cs +++ b/RabbitMQ.Stream.Client/RoutingClient.cs @@ -170,9 +170,10 @@ public static async Task LookupRandomConnection(ClientParameters client { var brokers = new List() { metaDataInfo.Leader }; brokers.AddRange(metaDataInfo.Replicas); - brokers.Sort((_, _) => Random.Shared.Next(-1, 1)); + // brokers.Sort((_, _) => Random.Shared.Next(-1, 1)); + var br = brokers.OrderBy(x => Random.Shared.Next()).ToList(); var exceptions = new List(); - foreach (var broker in brokers) + foreach (var broker in br) { try { diff --git a/Tests/ClientTests.cs b/Tests/ClientTests.cs index 1b031783..967b724a 100644 --- a/Tests/ClientTests.cs +++ b/Tests/ClientTests.cs @@ -77,7 +77,12 @@ public async void MetadataUpdateIsHandled() { var stream = Guid.NewGuid().ToString(); var testPassed = new TaskCompletionSource(); - var clientParameters = new ClientParameters { MetadataHandler = m => testPassed.SetResult(m) }; + var clientParameters = new ClientParameters(); + clientParameters.OnMetadataUpdate += (update) => + { + testPassed.SetResult(update); + }; + var client = await Client.Create(clientParameters); await client.CreateStream(stream, new Dictionary()); Action> confirmed = (pubIds) => { }; diff --git a/Tests/ConnectionsPoolTests.cs b/Tests/ConnectionsPoolTests.cs index 3296220c..9a82a9bf 100644 --- a/Tests/ConnectionsPoolTests.cs +++ b/Tests/ConnectionsPoolTests.cs @@ -701,45 +701,130 @@ public async void TheProducerConsumerPoolShouldBeConsistentInMultiThreadCreateDe await client.Close("byte"); } - // // this test doesn't work since the client parameters metadata handler is not an event - // // for the moment I won't change the code. Introduced a new event is a breaking change - // - // // [Fact] - // // public async void TheProducerPoolShouldBeConsistentWhenAStreamIsDeleted() - // // { - // // var client = await Client.Create(new ClientParameters() { }); - // // const string Stream1 = "pool_test_stream_1_multi_thread_producer"; - // // await client.CreateStream(Stream1, new Dictionary()); - // // const int IdsPerConnection = 2; - // // var pool = new ConnectionsPool(0, IdsPerConnection); - // // var metaDataInfo = await client.QueryMetadata(new[] {Stream1}); - // // var producerList = new ConcurrentDictionary(); - // // - // // var tasksP = new List(); - // // for (var i = 0; i < (IdsPerConnection * 1); i++) - // // { - // // tasksP.Add(Task.Run(async () => - // // { - // // var p = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream1) - // // { - // // Pool = pool, - // // }, - // // metaDataInfo.StreamInfos[Stream1]); - // // producerList.TryAdd(Guid.NewGuid().ToString(), p); - // // })); - // // } - // // - // // await Task.WhenAll(tasksP); - // // - // // Assert.Equal(2, pool.Count); - // // Assert.Equal(IdsPerConnection, pool.Connections.Values.First().ActiveIds); - // // Assert.Equal(IdsPerConnection, pool.Connections.Values.Skip(1).First().ActiveIds); - // // - // // await client.DeleteStream(Stream1); - // // - // // SystemUtils.WaitUntil(() => pool.Count == 0); - // // Assert.Equal(0, pool.Count); - // // } + /// + /// The pool has 3 ids per connection. + /// Here we test the metadata update event. One connection can handle different + /// Streams so we need to be sure the pool is consistent when the metadata update handler is raised. + /// By default the metadata update removes the consumer from the server so we need to remove the consumers + /// from the pool. + /// + [Fact] + public async void TheConsumersPoolShouldBeConsistentWhenAStreamIsDeleted() + { + var client = await Client.Create(new ClientParameters() { }); + const string Stream1 = "pool_test_stream_1_delete_consumer"; + const string Stream2 = "pool_test_stream_2_delete_consumer"; + await client.CreateStream(Stream1, new Dictionary()); + await client.CreateStream(Stream2, new Dictionary()); + const int IdsPerConnection = 3; + var pool = new ConnectionsPool(0, IdsPerConnection); + var metaDataInfo = await client.QueryMetadata(new[] { Stream1, Stream2 }); + var iConsumers = new ConcurrentDictionary(); + + var tasksP = new List(); + for (var i = 0; i < (IdsPerConnection * 2); i++) + { + tasksP.Add(Task.Run(async () => + { + var p = await RawConsumer.Create(client.Parameters, new RawConsumerConfig(Stream1) { Pool = pool, }, + metaDataInfo.StreamInfos[Stream1]); + iConsumers.TryAdd(Guid.NewGuid().ToString(), p); + })); + + tasksP.Add(Task.Run(async () => + { + var p2 = await RawConsumer.Create(client.Parameters, new RawConsumerConfig(Stream2) { Pool = pool, }, + metaDataInfo.StreamInfos[Stream2]); + iConsumers.TryAdd(Guid.NewGuid().ToString(), p2); + })); + } + + await Task.WhenAll(tasksP); + + // Here we have 4 connections ( IdsPerConnection * 2) + // one per stream + Assert.Equal(4, pool.ConnectionsCount); + await client.DeleteStream(Stream1); + // removed one stream so we should not have active ids for this stream + // we don't check the connection pool since the connections can be random + // so not sure how many connection can we have here. But it doesn't matter since we check the active ids + SystemUtils.WaitUntil(() => pool.ActiveIdsCountForStream(Stream1) == 0); + Assert.Equal(IdsPerConnection * 2, pool.ActiveIdsCount); + + await client.DeleteStream(Stream2); + // here we can check the pool. however the connections are distributed here must be 0 + SystemUtils.WaitUntil(() => pool.ConnectionsCount == 0); + // no active ids for the stream2 since we removed the stream + Assert.Equal(0, pool.ActiveIdsCountForStream(Stream2)); + Assert.Equal(0, pool.ActiveIdsCount); + + // no active consumers to the internal consumers list + iConsumers.Values.ToList().ForEach( + x => Assert.Empty(ConsumersIdsPerConnection(x))); + } + + /// + /// The pool has 3 ids per connection. + /// Here we test the metadata update event. One connection can handle different + /// Streams so we need to be sure the pool is consistent when the metadata update handler is raised. + /// By default the metadata update removes the producer from the server so we need to remove the producers + /// from the pool. + /// + [Fact] + public async void TheProducersPoolShouldBeConsistentWhenAStreamIsDeleted() + { + var client = await Client.Create(new ClientParameters() { }); + const string Stream1 = "pool_test_stream_1_delete_producer"; + const string Stream2 = "pool_test_stream_2_delete_producer"; + await client.CreateStream(Stream1, new Dictionary()); + await client.CreateStream(Stream2, new Dictionary()); + const int IdsPerConnection = 3; + var pool = new ConnectionsPool(0, IdsPerConnection); + var metaDataInfo = await client.QueryMetadata(new[] { Stream1, Stream2 }); + var iProducers = new ConcurrentDictionary(); + + var tasksP = new List(); + for (var i = 0; i < (IdsPerConnection * 2); i++) + { + tasksP.Add(Task.Run(async () => + { + var p = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream1) { Pool = pool, }, + metaDataInfo.StreamInfos[Stream1]); + iProducers.TryAdd(Guid.NewGuid().ToString(), p); + })); + + tasksP.Add(Task.Run(async () => + { + var p2 = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream2) { Pool = pool, }, + metaDataInfo.StreamInfos[Stream2]); + iProducers.TryAdd(Guid.NewGuid().ToString(), p2); + })); + } + + await Task.WhenAll(tasksP); + + // Here we have 4 connections ( IdsPerConnection * 2) + // one per stream + Assert.Equal(4, pool.ConnectionsCount); + await client.DeleteStream(Stream1); + // removed one stream so we should not have active ids for this stream + // we don't check the connection pool since the connections can be random + // so not sure how many connection can we have here. But it doesn't matter since we check the active ids + SystemUtils.WaitUntil(() => pool.ActiveIdsCountForStream(Stream1) == 0); + Assert.Equal(IdsPerConnection * 2, pool.ActiveIdsCount); + + await client.DeleteStream(Stream2); + // here we can check the pool. however the connections are distributed here must be 0 + SystemUtils.WaitUntil(() => pool.ConnectionsCount == 0); + // no active ids for the stream2 since we removed the stream + Assert.Equal(0, pool.ActiveIdsCountForStream(Stream2)); + Assert.Equal(0, pool.ActiveIdsCount); + + // no active consumers to the internal producers list + iProducers.Values.ToList().ForEach( + x => Assert.Empty(ProducersIdsPerConnection(x))); + } + // /// /// The pool has 13 ids per connection. @@ -838,7 +923,6 @@ public async void ValidatePool() { await Assert.ThrowsAsync(async () => await StreamSystem.Create(new StreamSystemConfig() { ConnectionPoolConfig = null })); - } /// The following tests are related to the FindMissingConsecutive method