From 335a0f31b3d8a8dbb6525c69a24cee5b0d388c66 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 9 Feb 2024 13:18:04 +0100 Subject: [PATCH] Remove code duplication to handle the pool (#356) Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/ConnectionsPool.cs | 11 +++++------ RabbitMQ.Stream.Client/RawSuperStreamProducer.cs | 6 +++--- docs/ReliableClient/Program.cs | 6 +++--- docs/ReliableClient/RClient.cs | 4 ++-- 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/RabbitMQ.Stream.Client/ConnectionsPool.cs b/RabbitMQ.Stream.Client/ConnectionsPool.cs index 6d3e9dd7..5eb0a0c4 100644 --- a/RabbitMQ.Stream.Client/ConnectionsPool.cs +++ b/RabbitMQ.Stream.Client/ConnectionsPool.cs @@ -145,14 +145,13 @@ internal async Task GetOrCreateClient(string brokerInfo, Func x.BrokerInfo == brokerInfo && x.Available); + var connectionItems = Connections.Values.Where(x => x.BrokerInfo == brokerInfo && x.Available).ToList(); - if (count > 0) + if (connectionItems.Any()) { // ok we have a connection available for this brokerInfo // let's get the first one - // TODO: we can improve this by getting the connection with the less active items - var connectionItem = Connections.Values.First(x => x.BrokerInfo == brokerInfo && x.Available); + var connectionItem = connectionItems.OrderBy(x => x.EntitiesCount).First(); connectionItem.LastUsed = DateTime.UtcNow; if (connectionItem.Client is not { IsClosed: true }) @@ -162,11 +161,11 @@ internal async Task GetOrCreateClient(string brokerInfo, Func 0 && Connections.Count >= _maxConnections) diff --git a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs index 9b07d787..c351609f 100644 --- a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs +++ b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs @@ -311,16 +311,16 @@ public Task Close() // /// Returns lower from the LastPublishingId for all the producers // - public Task GetLastPublishingId() + public async Task GetLastPublishingId() { foreach (var stream in _streamInfos.Keys.ToList()) { - MaybeAddAndGetProducer(stream).Wait(); + await MaybeAddAndGetProducer(stream).ConfigureAwait(false); } var v = _producers.Values.Min(p => p.GetLastPublishingId().Result); - return Task.FromResult(v); + return v; } public bool IsOpen() diff --git a/docs/ReliableClient/Program.cs b/docs/ReliableClient/Program.cs index 62c0f6bf..163a7d1e 100644 --- a/docs/ReliableClient/Program.cs +++ b/docs/ReliableClient/Program.cs @@ -12,11 +12,11 @@ Host = "node0", Port = 5562, LoadBalancer = false, - SuperStream = false, + SuperStream = true, Streams = 3, - Producers = 10, + Producers = 2, MessagesPerProducer = 10_000_000, - Consumers = 10, + Consumers = 5, // Username = "test", // Password = "test" }); diff --git a/docs/ReliableClient/RClient.cs b/docs/ReliableClient/RClient.cs index cbb8e961..76bbef09 100644 --- a/docs/ReliableClient/RClient.cs +++ b/docs/ReliableClient/RClient.cs @@ -43,7 +43,7 @@ public static async Task Start(Config config) options.TimestampFormat = "[HH:mm:ss] "; options.ColorBehavior = LoggerColorBehavior.Default; }) - .AddFilter(level => level >= LogLevel.Information) + .AddFilter(level => level >= LogLevel.Debug) ); var loggerFactory = serviceCollection.BuildServiceProvider() .GetService(); @@ -261,7 +261,7 @@ async Task MaybeSend(Producer producer, Message message, ManualResetEvent publis Properties = new Properties() {MessageId = $"hello{i}"} }; await MaybeSend(producer, message, publishEvent).ConfigureAwait(false); - await Task.Delay(20).ConfigureAwait(false); + await Task.Delay(320).ConfigureAwait(false); Interlocked.Increment(ref totalSent); } });