Skip to content

Commit

Permalink
Remove code duplication to handle the pool (#356)
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio authored Feb 9, 2024
1 parent 66cbd13 commit 335a0f3
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 14 deletions.
11 changes: 5 additions & 6 deletions RabbitMQ.Stream.Client/ConnectionsPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,13 @@ internal async Task<IClient> GetOrCreateClient(string brokerInfo, Func<Task<ICli
// do we have a connection for this brokerInfo and with free slots for producer or consumer?
// it does not matter which connection is available
// the important is to have a connection available for the brokerInfo
var count = Connections.Values.Count(x => x.BrokerInfo == brokerInfo && x.Available);
var connectionItems = Connections.Values.Where(x => x.BrokerInfo == brokerInfo && x.Available).ToList();

if (count > 0)
if (connectionItems.Any())
{
// ok we have a connection available for this brokerInfo
// let's get the first one
// TODO: we can improve this by getting the connection with the less active items
var connectionItem = Connections.Values.First(x => x.BrokerInfo == brokerInfo && x.Available);
var connectionItem = connectionItems.OrderBy(x => x.EntitiesCount).First();
connectionItem.LastUsed = DateTime.UtcNow;

if (connectionItem.Client is not { IsClosed: true })
Expand All @@ -162,11 +161,11 @@ internal async Task<IClient> GetOrCreateClient(string brokerInfo, Func<Task<ICli
// let's remove it from the pool
Connections.TryRemove(connectionItem.Client.ClientId, out _);
// let's create a new one
connectionItem = new ConnectionItem(brokerInfo, _idsPerConnection,
var newConnectionItem = new ConnectionItem(brokerInfo, _idsPerConnection,
await createClient().ConfigureAwait(false));
Connections.TryAdd(connectionItem.Client.ClientId, connectionItem);

return connectionItem.Client;
return newConnectionItem.Client;
}

if (_maxConnections > 0 && Connections.Count >= _maxConnections)
Expand Down
6 changes: 3 additions & 3 deletions RabbitMQ.Stream.Client/RawSuperStreamProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -311,16 +311,16 @@ public Task<ResponseCode> Close()
//<summary>
/// Returns lower from the LastPublishingId for all the producers
// </summary>
public Task<ulong> GetLastPublishingId()
public async Task<ulong> GetLastPublishingId()
{
foreach (var stream in _streamInfos.Keys.ToList())
{
MaybeAddAndGetProducer(stream).Wait();
await MaybeAddAndGetProducer(stream).ConfigureAwait(false);
}

var v = _producers.Values.Min(p => p.GetLastPublishingId().Result);

return Task.FromResult(v);
return v;
}

public bool IsOpen()
Expand Down
6 changes: 3 additions & 3 deletions docs/ReliableClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
Host = "node0",
Port = 5562,
LoadBalancer = false,
SuperStream = false,
SuperStream = true,
Streams = 3,
Producers = 10,
Producers = 2,
MessagesPerProducer = 10_000_000,
Consumers = 10,
Consumers = 5,
// Username = "test",
// Password = "test"
});
Expand Down
4 changes: 2 additions & 2 deletions docs/ReliableClient/RClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static async Task Start(Config config)
options.TimestampFormat = "[HH:mm:ss] ";
options.ColorBehavior = LoggerColorBehavior.Default;
})
.AddFilter(level => level >= LogLevel.Information)
.AddFilter(level => level >= LogLevel.Debug)
);
var loggerFactory = serviceCollection.BuildServiceProvider()
.GetService<ILoggerFactory>();
Expand Down Expand Up @@ -261,7 +261,7 @@ async Task MaybeSend(Producer producer, Message message, ManualResetEvent publis
Properties = new Properties() {MessageId = $"hello{i}"}
};
await MaybeSend(producer, message, publishEvent).ConfigureAwait(false);
await Task.Delay(20).ConfigureAwait(false);
await Task.Delay(320).ConfigureAwait(false);
Interlocked.Increment(ref totalSent);
}
});
Expand Down

0 comments on commit 335a0f3

Please sign in to comment.