diff --git a/RabbitMQ.Stream.Client/ClientExceptions.cs b/RabbitMQ.Stream.Client/ClientExceptions.cs
index de3698c1..71d47c6f 100644
--- a/RabbitMQ.Stream.Client/ClientExceptions.cs
+++ b/RabbitMQ.Stream.Client/ClientExceptions.cs
@@ -38,7 +38,8 @@ internal static bool IsAKnownException(Exception exception)
return x.Any();
}
- return exception is (SocketException or TimeoutException or LeaderNotFoundException or InvalidOperationException or OperationCanceledException) ||
+ return exception is (SocketException or TimeoutException or LeaderNotFoundException
+ or InvalidOperationException or OperationCanceledException) ||
IsStreamNotAvailable(exception);
}
@@ -191,4 +192,12 @@ public TooManyConnectionsException(string s)
{
}
}
+
+ public class PendingConnectionsException : Exception
+ {
+ public PendingConnectionsException(string s)
+ : base(s)
+ {
+ }
+ }
}
diff --git a/RabbitMQ.Stream.Client/ConnectionsPool.cs b/RabbitMQ.Stream.Client/ConnectionsPool.cs
index 2ed1028f..0a37b535 100644
--- a/RabbitMQ.Stream.Client/ConnectionsPool.cs
+++ b/RabbitMQ.Stream.Client/ConnectionsPool.cs
@@ -11,6 +11,41 @@
namespace RabbitMQ.Stream.Client;
+public enum ConnectionClosePolicy
+{
+ ///
+ /// The connection is closed when the last consumer or producer is removed.
+ ///
+ CloseWhenEmpty,
+
+ ///
+ /// The connection is closed when the last consumer or producer is removed and the connection is not used for a certain time.
+ ///
+ CloseWhenEmptyAndIdle
+}
+
+public class ConnectionCloseConfig
+{
+ ///
+ /// Policy to close the connection.
+ ///
+
+ public ConnectionClosePolicy Policy { get; set; } = ConnectionClosePolicy.CloseWhenEmpty;
+
+ ///
+ /// The connection is closed when the last consumer or producer is removed and the connection is not used for a certain time.
+ /// Idle time is valid only if the policy is CloseWhenEmptyAndIdle.
+ ///
+ public TimeSpan IdleTime { get; set; } = TimeSpan.FromMinutes(5);
+
+ ///
+ /// Interval to check the idle time.
+ /// Default is high because the check is done in a separate thread.
+ /// The filed is internal to help the test.
+ ///
+ internal TimeSpan CheckIdleTime { get; set; } = TimeSpan.FromSeconds(60);
+}
+
public class ConnectionPoolConfig
{
///
@@ -30,6 +65,11 @@ public class ConnectionPoolConfig
/// but it is not the best for performance.
///
public byte ProducersPerConnection { get; set; } = 1;
+
+ ///
+ /// Define the connection close policy.
+ ///
+ public ConnectionCloseConfig ConnectionCloseConfig { get; set; } = new ConnectionCloseConfig();
}
public class LastSecret
@@ -87,9 +127,10 @@ public bool Available
/// subscriptionIds
/// publisherIds
///
-public class ConnectionsPool
+public class ConnectionsPool : IDisposable
{
private static readonly object s_lock = new();
+ private bool _isRunning = false;
internal static byte FindNextValidId(List ids, byte nextId = 0)
{
@@ -127,16 +168,56 @@ internal static byte FindNextValidId(List ids, byte nextId = 0)
private readonly byte _idsPerConnection;
private readonly SemaphoreSlim _semaphoreSlim = new(1, 1);
private readonly LastSecret _lastSecret = new();
+ private readonly Task _checkIdleConnectionTimeTask;
///
/// Init the pool with the max connections and the max ids per connection
///
/// The max connections are allowed for session
/// The max ids per Connection
- public ConnectionsPool(int maxConnections, byte idsPerConnection)
+ /// Policy to close the connections in the pool
+ public ConnectionsPool(int maxConnections, byte idsPerConnection, ConnectionCloseConfig connectionCloseConfig)
{
_maxConnections = maxConnections;
_idsPerConnection = idsPerConnection;
+ ConnectionPoolConfig = connectionCloseConfig;
+ _isRunning = true;
+ if (ConnectionPoolConfig.Policy == ConnectionClosePolicy.CloseWhenEmptyAndIdle)
+ {
+ _checkIdleConnectionTimeTask = Task.Run(CheckIdleConnectionTime);
+ }
+ }
+
+ private ConnectionCloseConfig ConnectionPoolConfig { get; }
+
+ private async Task CheckIdleConnectionTime()
+ {
+ while (_isRunning)
+ {
+ await Task.Delay(ConnectionPoolConfig.CheckIdleTime)
+ .ConfigureAwait(false);
+
+ if (!_isRunning)
+ {
+ var now = DateTime.UtcNow;
+ var connectionItems = Connections.Values.ToList();
+ foreach (var connectionItem in connectionItems.Where(connectionItem =>
+ connectionItem.EntitiesCount == 0 &&
+ connectionItem.LastUsed.Add(ConnectionPoolConfig.IdleTime) < now))
+ {
+ CloseItemAndConnection("Idle connection", connectionItem);
+ }
+ }
+ else
+ {
+ var connectionItems = Connections.Values.ToList();
+ foreach (var connectionItem in connectionItems.Where(
+ connectionItem => connectionItem.EntitiesCount == 0))
+ {
+ CloseItemAndConnection("Idle connection", connectionItem);
+ }
+ }
+ }
}
///
@@ -208,10 +289,7 @@ public bool TryMergeClientParameters(ClientParameters clientParameters, out Clie
return false;
}
- cp = clientParameters with
- {
- Password = _lastSecret.Secret
- };
+ cp = clientParameters with { Password = _lastSecret.Secret };
return true;
}
@@ -264,13 +342,12 @@ public void MaybeClose(string clientId, string reason)
return;
}
- // close the connection
- connectionItem.Client.Close(reason);
+ connectionItem.LastUsed = DateTime.UtcNow;
- // remove the connection from the pool
- // it means that the connection is closed
- // we don't care if it is called two times for the same connection
- Connections.TryRemove(clientId, out _);
+ if (ConnectionPoolConfig.Policy == ConnectionClosePolicy.CloseWhenEmpty)
+ {
+ CloseItemAndConnection(reason, connectionItem);
+ }
}
finally
{
@@ -278,6 +355,18 @@ public void MaybeClose(string clientId, string reason)
}
}
+ private void CloseItemAndConnection(string reason, ConnectionItem connectionItem)
+ {
+ // close the connection
+ connectionItem.Client.Close(reason);
+ // remove the connection from the pool
+ // it means that the connection is closed
+ // we don't care if it is called two times for the same connection
+ Connections.TryRemove(connectionItem.Client.ClientId, out _);
+ }
+
+ internal int PendingConnections => Connections.Values.Count(x => x.EntitiesCount > 0);
+
///
/// Removes the consumer entity from the client.
/// When the metadata update is called we need to remove the consumer entity from the client.
@@ -328,4 +417,33 @@ public void RemoveProducerEntityFromStream(string clientId, byte id, string stre
}
public int ConnectionsCount => Connections.Count;
+
+ public async Task Close()
+ {
+ // The pool can't be closed if there are pending connections with the policy: CloseWhenEmptyAndIdle
+ // else there is no way to close the pending connections.
+ // The user needs to close the pending connections before to close the pool.
+ // At the moment when the pool is closed the pending connections are not closed with CloseWhenEmpty
+ // because the pool is not strictly bound to the stream system.
+ // The StreamSystem doesn't close the connections when it is closed. That was by design
+ // We could consider (Version 2.0) to close all the Producers and Consumers and their connection when the StreamSystem is closed.
+ // Other clients like Java and Golang close the connections when the Environment (alias StreamSystem) is closed.
+ if (PendingConnections > 0 && ConnectionPoolConfig.Policy == ConnectionClosePolicy.CloseWhenEmptyAndIdle)
+ {
+ throw new PendingConnectionsException(
+ $"There are {PendingConnections} pending connections. With the policy CloseWhenEmptyAndIdle you need to close them");
+ }
+
+ _isRunning = false;
+ if (_checkIdleConnectionTimeTask is not null)
+ {
+ await _checkIdleConnectionTimeTask.ConfigureAwait(false);
+ }
+ }
+
+ public void Dispose()
+ {
+ _semaphoreSlim.Dispose();
+ GC.SuppressFinalize(this);
+ }
}
diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
index aeace579..1de1ca34 100644
--- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
+++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
@@ -57,6 +57,15 @@ RabbitMQ.Stream.Client.ClientParameters.OnMetadataUpdate -> RabbitMQ.Stream.Clie
RabbitMQ.Stream.Client.ClientParameters.RpcTimeOut.get -> System.TimeSpan
RabbitMQ.Stream.Client.ClientParameters.RpcTimeOut.set -> void
RabbitMQ.Stream.Client.Connection.UpdateCloseStatus(string reason) -> void
+RabbitMQ.Stream.Client.ConnectionCloseConfig
+RabbitMQ.Stream.Client.ConnectionCloseConfig.ConnectionCloseConfig() -> void
+RabbitMQ.Stream.Client.ConnectionCloseConfig.IdleTime.get -> System.TimeSpan
+RabbitMQ.Stream.Client.ConnectionCloseConfig.IdleTime.set -> void
+RabbitMQ.Stream.Client.ConnectionCloseConfig.Policy.get -> RabbitMQ.Stream.Client.ConnectionClosePolicy
+RabbitMQ.Stream.Client.ConnectionCloseConfig.Policy.set -> void
+RabbitMQ.Stream.Client.ConnectionClosePolicy
+RabbitMQ.Stream.Client.ConnectionClosePolicy.CloseWhenEmpty = 0 -> RabbitMQ.Stream.Client.ConnectionClosePolicy
+RabbitMQ.Stream.Client.ConnectionClosePolicy.CloseWhenEmptyAndIdle = 1 -> RabbitMQ.Stream.Client.ConnectionClosePolicy
RabbitMQ.Stream.Client.ConnectionItem
RabbitMQ.Stream.Client.ConnectionItem.Available.get -> bool
RabbitMQ.Stream.Client.ConnectionItem.BrokerInfo.get -> string
@@ -67,14 +76,18 @@ RabbitMQ.Stream.Client.ConnectionItem.IdsPerConnection.get -> byte
RabbitMQ.Stream.Client.ConnectionItem.LastUsed.get -> System.DateTime
RabbitMQ.Stream.Client.ConnectionItem.LastUsed.set -> void
RabbitMQ.Stream.Client.ConnectionPoolConfig
+RabbitMQ.Stream.Client.ConnectionPoolConfig.ConnectionCloseConfig.get -> RabbitMQ.Stream.Client.ConnectionCloseConfig
+RabbitMQ.Stream.Client.ConnectionPoolConfig.ConnectionCloseConfig.set -> void
RabbitMQ.Stream.Client.ConnectionPoolConfig.ConnectionPoolConfig() -> void
RabbitMQ.Stream.Client.ConnectionPoolConfig.ConsumersPerConnection.get -> byte
RabbitMQ.Stream.Client.ConnectionPoolConfig.ConsumersPerConnection.set -> void
RabbitMQ.Stream.Client.ConnectionPoolConfig.ProducersPerConnection.get -> byte
RabbitMQ.Stream.Client.ConnectionPoolConfig.ProducersPerConnection.set -> void
RabbitMQ.Stream.Client.ConnectionsPool
+RabbitMQ.Stream.Client.ConnectionsPool.Close() -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.ConnectionsPool.ConnectionsCount.get -> int
-RabbitMQ.Stream.Client.ConnectionsPool.ConnectionsPool(int maxConnections, byte idsPerConnection) -> void
+RabbitMQ.Stream.Client.ConnectionsPool.ConnectionsPool(int maxConnections, byte idsPerConnection, RabbitMQ.Stream.Client.ConnectionCloseConfig connectionCloseConfig) -> void
+RabbitMQ.Stream.Client.ConnectionsPool.Dispose() -> void
RabbitMQ.Stream.Client.ConnectionsPool.MaybeClose(string clientId, string reason) -> void
RabbitMQ.Stream.Client.ConnectionsPool.Remove(string clientId) -> void
RabbitMQ.Stream.Client.ConnectionsPool.RemoveConsumerEntityFromStream(string clientId, byte id, string stream) -> void
@@ -194,6 +207,8 @@ RabbitMQ.Stream.Client.PartitionsSuperStreamSpec
RabbitMQ.Stream.Client.PartitionsSuperStreamSpec.Partitions.get -> int
RabbitMQ.Stream.Client.PartitionsSuperStreamSpec.PartitionsSuperStreamSpec(string Name) -> void
RabbitMQ.Stream.Client.PartitionsSuperStreamSpec.PartitionsSuperStreamSpec(string Name, int partitions) -> void
+RabbitMQ.Stream.Client.PendingConnectionsException
+RabbitMQ.Stream.Client.PendingConnectionsException.PendingConnectionsException(string s) -> void
RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.get -> System.Func
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.set -> void
diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs
index cc7c8c8d..2ee4cbfd 100644
--- a/RabbitMQ.Stream.Client/StreamSystem.cs
+++ b/RabbitMQ.Stream.Client/StreamSystem.cs
@@ -73,13 +73,13 @@ private StreamSystem(ClientParameters clientParameters, Client client,
_clientParameters = clientParameters;
_client = client;
_logger = logger ?? NullLogger.Instance;
- // we don't expose the the max connections per producer/consumer
+ // we don't expose the max connections per producer/consumer
// for the moment. We can expose it in the future if needed
PoolConsumers = new ConnectionsPool(0,
- connectionPoolConfig.ConsumersPerConnection);
+ connectionPoolConfig.ConsumersPerConnection, connectionPoolConfig.ConnectionCloseConfig);
PoolProducers = new ConnectionsPool(0,
- connectionPoolConfig.ProducersPerConnection);
+ connectionPoolConfig.ProducersPerConnection, connectionPoolConfig.ConnectionCloseConfig);
}
public bool IsClosed => _client.IsClosed;
@@ -139,6 +139,23 @@ public static async Task Create(StreamSystemConfig config, ILogger
public async Task Close()
{
await _client.Close("system close").ConfigureAwait(false);
+
+ try
+ {
+ await PoolConsumers.Close()
+ .ConfigureAwait(false);
+ await PoolProducers.Close()
+ .ConfigureAwait(false);
+ }
+ catch
+ {
+ }
+ finally
+ {
+ PoolConsumers.Dispose();
+ PoolProducers.Dispose();
+ }
+
_logger?.LogDebug("Client Closed");
}
diff --git a/Tests/ClientTests.cs b/Tests/ClientTests.cs
index fdb12a39..a1f4fd0f 100644
--- a/Tests/ClientTests.cs
+++ b/Tests/ClientTests.cs
@@ -112,7 +112,7 @@ public async void DeclarePublisherShouldReturnErrorCode()
var (publisherId, result) =
await client.DeclarePublisher(publisherRef, "this-stream-does-not-exist", confirmed, errored,
- new ConnectionsPool(0, 1));
+ new ConnectionsPool(0, 1, new ConnectionCloseConfig()));
Assert.Equal(ResponseCode.StreamDoesNotExist, result.ResponseCode);
await client.Close("done");
}
@@ -124,7 +124,7 @@ public async void DeclareConsumerShouldReturnErrorCode()
var client = await Client.Create(clientParameters);
var (subId, subscribeResponse) = await client.Subscribe(
"this-stream-does-not-exist", new OffsetTypeLast(), 1,
- new Dictionary(), null, null, new ConnectionsPool(0, 1));
+ new Dictionary(), null, null, new ConnectionsPool(0, 1, new ConnectionCloseConfig()));
Assert.Equal(ResponseCode.StreamDoesNotExist, subscribeResponse.ResponseCode);
await client.Close("done");
}
diff --git a/Tests/ConnectionsPoolTests.cs b/Tests/ConnectionsPoolTests.cs
index dfa86ea0..5b5145a8 100644
--- a/Tests/ConnectionsPoolTests.cs
+++ b/Tests/ConnectionsPoolTests.cs
@@ -46,7 +46,7 @@ public ConnectionsPoolTests(ITestOutputHelper testOutputHelper)
[Fact]
public void ValidatePoolConsistencyWithMultiBrokers()
{
- var pool = new ConnectionsPool(0, 10);
+ var pool = new ConnectionsPool(0, 10, new ConnectionCloseConfig());
// var brokerNode1 = new Broker("node0", 5552);
// var brokerNode2 = new Broker("node1", 5552);
@@ -79,10 +79,12 @@ public async void RoutingShouldReturnTwoConnectionsGivenOneItemPerConnection()
var clientParameters = new ClientParameters { Endpoint = new DnsEndPoint("localhost", 3939) };
var metaDataInfo = new StreamInfo("stream", ResponseCode.Ok, new Broker("localhost", 3939),
new List());
- var pool = new ConnectionsPool(0, 1);
- var c1 = await RoutingHelper.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo, pool);
+ var pool = new ConnectionsPool(0, 1, new ConnectionCloseConfig());
+ var c1 = await RoutingHelper.LookupLeaderOrRandomReplicasConnection(clientParameters,
+ metaDataInfo, pool);
c1.Consumers.Add(1, default);
- var c2 = await RoutingHelper.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo, pool);
+ var c2 = await RoutingHelper.LookupLeaderOrRandomReplicasConnection(clientParameters,
+ metaDataInfo, pool);
c2.Consumers.Add(1, default);
// here we have two different connections
// and must be different since we have only one id per connection
@@ -101,7 +103,7 @@ public async void RoutingShouldReturnOneConnectionsGivenTwoIdPerConnection()
var metaDataInfo = new StreamInfo("stream", ResponseCode.Ok, new Broker("localhost", 3939),
new List());
- var pool = new ConnectionsPool(0, 2);
+ var pool = new ConnectionsPool(0, 2, new ConnectionCloseConfig());
var c1 = await RoutingHelper.LookupLeaderConnection(clientParameters, metaDataInfo, pool);
var c2 = await RoutingHelper.LookupLeaderConnection(clientParameters, metaDataInfo, pool);
// here we have one connection with two ids
@@ -121,7 +123,7 @@ public async void RoutingShouldReturnTwoConnectionsGivenOneIdPerConnectionDiffer
var clientParameters = new ClientParameters { Endpoint = new DnsEndPoint("Node1", 3939) };
var metaDataInfo = new StreamInfo("stream", ResponseCode.Ok, new Broker("Node1", 3939),
new List());
- var pool = new ConnectionsPool(0, 2);
+ var pool = new ConnectionsPool(0, 2, new ConnectionCloseConfig());
var c1 = await RoutingHelper.LookupLeaderConnection(clientParameters, metaDataInfo, pool);
Assert.Equal(1, pool.ConnectionsCount);
@@ -146,7 +148,7 @@ public async void RoutingShouldReturnTwoConnectionsGivenOneIdPerConnectionDiffer
[Fact]
public async void RoutingShouldReturnTwoConnectionsGivenTreeIdsForConnection()
{
- var pool = new ConnectionsPool(0, 3);
+ var pool = new ConnectionsPool(0, 3, new ConnectionCloseConfig());
var clientParameters = new ClientParameters { Endpoint = new DnsEndPoint("Node1", 3939) };
var metaDataInfo = new StreamInfo("stream", ResponseCode.Ok, new Broker("Node1", 3939),
new List());
@@ -177,7 +179,7 @@ public async void RoutingShouldReturnTwoConnectionsGivenTreeIdsForConnection()
[Fact]
public async void ReleaseFromThePoolShouldNotRemoveTheConnection()
{
- var pool = new ConnectionsPool(0, 3);
+ var pool = new ConnectionsPool(0, 3, new ConnectionCloseConfig());
var clientParameters = new ClientParameters { Endpoint = new DnsEndPoint("Node1", 3939) };
var metaDataInfo = new StreamInfo("stream", ResponseCode.Ok, new Broker("Node1", 3939),
new List());
@@ -208,7 +210,7 @@ public async void ReleaseFromThePoolShouldNotRemoveTheConnection()
[Fact]
public async void RaiseExceptionWhenThePoolIsFull()
{
- var pool = new ConnectionsPool(1, 2);
+ var pool = new ConnectionsPool(1, 2, new ConnectionCloseConfig());
var clientParameters = new ClientParameters { Endpoint = new DnsEndPoint("Node1", 3939) };
var metaDataInfo = new StreamInfo("stream", ResponseCode.Ok, new Broker("Node1", 3939),
new List());
@@ -236,7 +238,7 @@ public async void TwoConsumersShouldShareTheSameConnectionFromThePool()
await client.CreateStream(Stream1, new Dictionary());
await client.CreateStream(Stream2, new Dictionary());
- var pool = new ConnectionsPool(0, 2);
+ var pool = new ConnectionsPool(0, 2, new ConnectionCloseConfig());
var metaDataInfo = await client.QueryMetadata(new[] { Stream1, Stream2 });
var c1 = await RawConsumer.Create(
@@ -276,7 +278,7 @@ public async void TwoProducersShouldShareTheSameConnectionFromThePool()
await client.CreateStream(Stream1, new Dictionary());
await client.CreateStream(Stream2, new Dictionary());
- var pool = new ConnectionsPool(0, 2);
+ var pool = new ConnectionsPool(0, 2, new ConnectionCloseConfig());
var metaDataInfo = await client.QueryMetadata(new[] { Stream1, Stream2 });
var p1 = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream1) { Pool = pool },
metaDataInfo.StreamInfos[Stream1]);
@@ -317,7 +319,7 @@ public async void TwoProducerAndConsumerShouldHaveDifferentConnection()
await client.CreateStream(Stream1, new Dictionary());
await client.CreateStream(Stream2, new Dictionary());
- var pool = new ConnectionsPool(0, 1);
+ var pool = new ConnectionsPool(0, 1, new ConnectionCloseConfig());
var metaDataInfo = await client.QueryMetadata(new[] { Stream1, Stream2 });
var c1 = await RawConsumer.Create(parameters, new RawConsumerConfig(Stream1) { Pool = pool },
metaDataInfo.StreamInfos[Stream1]);
@@ -371,7 +373,7 @@ public async void DeliverToTheRightConsumerEvenShareTheConnection()
await client.CreateStream(Stream2, new Dictionary());
var testPassedC1 = new TaskCompletionSource();
- var poolConsumer = new ConnectionsPool(0, 2);
+ var poolConsumer = new ConnectionsPool(0, 2, new ConnectionCloseConfig());
var metaDataInfo = await client.QueryMetadata(new[] { Stream1, Stream2 });
var c1 = await RawConsumer.Create(parameters,
new RawConsumerConfig(Stream1)
@@ -399,9 +401,10 @@ await RawConsumer.Create(parameters,
},
metaDataInfo.StreamInfos[Stream2]);
- var poolProducer = new ConnectionsPool(0, 2);
+ var poolProducer = new ConnectionsPool(0, 2, new ConnectionCloseConfig());
- var p1 = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream1) { Pool = poolProducer, },
+ var p1 = await RawProducer.Create(client.Parameters,
+ new RawProducerConfig(Stream1) { Pool = poolProducer, },
metaDataInfo.StreamInfos[Stream1]);
var p2 = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream2) { Pool = poolProducer },
@@ -436,7 +439,7 @@ public async void RaiseErrorInCaseThePoolIsFull()
await client.CreateStream(Stream1, new Dictionary());
await client.CreateStream(Stream2, new Dictionary());
- var pool = new ConnectionsPool(1, 1);
+ var pool = new ConnectionsPool(1, 1, new ConnectionCloseConfig());
var metaDataInfo = await client.QueryMetadata(new[] { Stream1, Stream2 });
var p1 = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream2) { Pool = pool },
metaDataInfo.StreamInfos[Stream2]);
@@ -465,7 +468,7 @@ public async void ValidateTheRecycleIDs()
const string Stream1 = "pool_test_stream_1_validate_ids";
await client.CreateStream(Stream1, new Dictionary());
- var pool = new ConnectionsPool(0, 50);
+ var pool = new ConnectionsPool(0, 50, new ConnectionCloseConfig());
// MetaDataResponse metaDataInfo;
var metaDataInfo = await client.QueryMetadata(new[] { Stream1 });
var p = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream1) { Pool = pool },
@@ -497,7 +500,7 @@ public async void PoolShouldBeConsistentWhenErrorDuringCreatingProducerOrConsume
var client = await Client.Create(new ClientParameters() { });
const string Stream1 = "this_stream_does_not_exist";
- var pool = new ConnectionsPool(0, 1);
+ var pool = new ConnectionsPool(0, 1, new ConnectionCloseConfig());
var metaDataInfo =
new StreamInfo(Stream1, ResponseCode.Ok, new Broker("localhost", 5552), new List());
@@ -547,7 +550,7 @@ public async void TheProducerPoolShouldBeConsistentInMultiThread()
const string Stream1 = "pool_test_stream_1_multi_thread_producer";
await client.CreateStream(Stream1, new Dictionary());
const int IdsPerConnection = 17;
- var pool = new ConnectionsPool(0, IdsPerConnection);
+ var pool = new ConnectionsPool(0, IdsPerConnection, new ConnectionCloseConfig());
var metaDataInfo = await client.QueryMetadata(new[] { Stream1 });
var producerList = new ConcurrentDictionary();
@@ -614,7 +617,7 @@ public async void TheConsumersPoolShouldBeConsistentWhenAStreamIsDeleted()
await client.CreateStream(Stream1, new Dictionary());
await client.CreateStream(Stream2, new Dictionary());
const int IdsPerConnection = 3;
- var pool = new ConnectionsPool(0, IdsPerConnection);
+ var pool = new ConnectionsPool(0, IdsPerConnection, new ConnectionCloseConfig());
var metaDataInfo = await client.QueryMetadata(new[] { Stream1, Stream2 });
var iConsumers = new ConcurrentDictionary();
@@ -663,7 +666,7 @@ public async void TheProducersPoolShouldBeConsistentWhenAStreamIsDeleted()
await client.CreateStream(Stream1, new Dictionary());
await client.CreateStream(Stream2, new Dictionary());
const int IdsPerConnection = 3;
- var pool = new ConnectionsPool(0, IdsPerConnection);
+ var pool = new ConnectionsPool(0, IdsPerConnection, new ConnectionCloseConfig());
var metaDataInfo = await client.QueryMetadata(new[] { Stream1, Stream2 });
var iProducers = new ConcurrentDictionary();
@@ -709,7 +712,7 @@ public async void ThePoolShouldBeConsistentWhenTheConnectionIsClosed()
await client.CreateStream(Stream1, new Dictionary());
await client.CreateStream(Stream2, new Dictionary());
const int IdsPerConnection = 3;
- var pool = new ConnectionsPool(0, IdsPerConnection);
+ var pool = new ConnectionsPool(0, IdsPerConnection, new ConnectionCloseConfig());
var metaDataInfo = await client.QueryMetadata(new[] { Stream1, Stream2 });
var c1 = await RawConsumer.Create(client.Parameters,
@@ -897,5 +900,79 @@ public void RecycleIdsWhenTheMaxIsReachedAndStartWithAnId()
ids.Add(nextValidId);
Assert.Equal(11, nextValidId);
}
+
+ [Fact]
+ public void ValidatePoolConsistencyWithClosePolicy()
+ {
+ var pool = new ConnectionsPool(0, 10,
+ new ConnectionCloseConfig()
+ {
+ Policy = ConnectionClosePolicy.CloseWhenEmptyAndIdle,
+ IdleTime = TimeSpan.FromSeconds(1)
+ });
+ Assert.Equal(0, pool.ConnectionsCount);
+ }
+
+ ///
+ /// Test the pool consistency when the policy is CloseWhenEmptyAndIdle
+ /// After producer and consumer are closed the pool should be NOT empty
+ /// It will be closed after 2 seconds for idle activity
+ ///
+ [Fact]
+ public async void CloseWhenEmptyAndIdlePoolConsistencyShouldCloseAfterTenSeconds()
+ {
+ var client = await Client.Create(new ClientParameters() { });
+ const string Stream1 = "PoolConsistencyShouldCloseAfterTenSeconds";
+ await client.CreateStream(Stream1, new Dictionary());
+
+ var pool = new ConnectionsPool(0, 1,
+ new ConnectionCloseConfig()
+ {
+ IdleTime = TimeSpan.FromSeconds(2),
+ Policy = ConnectionClosePolicy.CloseWhenEmptyAndIdle,
+ CheckIdleTime = TimeSpan.FromMilliseconds(500)
+ });
+ var metaDataInfo = await client.QueryMetadata(new[] { Stream1 });
+ var p1 = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream1) { Pool = pool },
+ metaDataInfo.StreamInfos[Stream1]);
+
+ var c1 = await RawConsumer.Create(client.Parameters, new RawConsumerConfig(Stream1) { Pool = pool },
+ metaDataInfo.StreamInfos[Stream1]);
+ await p1.Close();
+ await c1.Close();
+ Assert.Equal(2, pool.ConnectionsCount);
+
+ SystemUtils.WaitUntil(() => pool.ConnectionsCount == 0);
+ await pool.Close();
+ await client.Close("byte");
+ }
+
+ ///
+ /// If there are pending connections and the policy is CloseWhenEmptyAndIdle
+ /// the pool can't be closed. It should raise an exception
+ ///
+
+ [Fact]
+ public async void RaisePendingConnectionsExceptionWhenPolicyIsCloseWhenEmptyAndIdle()
+ {
+ var client = await Client.Create(new ClientParameters() { });
+ const string Stream1 = "PoolConsistencyShouldCloseAfterTenSeconds";
+ await client.CreateStream(Stream1, new Dictionary());
+
+ var pool = new ConnectionsPool(0, 1,
+ new ConnectionCloseConfig()
+ {
+ IdleTime = TimeSpan.FromMilliseconds(1000),
+ Policy = ConnectionClosePolicy.CloseWhenEmptyAndIdle,
+ CheckIdleTime = TimeSpan.FromMilliseconds(500)
+ });
+ var metaDataInfo = await client.QueryMetadata(new[] { Stream1 });
+ var p1 = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream1) { Pool = pool },
+ metaDataInfo.StreamInfos[Stream1]);
+ await Assert.ThrowsAsync(async () => await pool.Close());
+ await p1.Close();
+ await pool.Close();
+ await client.Close("byte");
+ }
}
}
diff --git a/Tests/UnitTests.cs b/Tests/UnitTests.cs
index ced5ca57..1da68b26 100644
--- a/Tests/UnitTests.cs
+++ b/Tests/UnitTests.cs
@@ -133,7 +133,6 @@ public Task CreateClient(ClientParameters clientParameters, Broker brok
{
ConnectionProperties = new Dictionary()
{
-
["advertised_port"] = "5553",
["advertised_host"] = "replica2"
}
@@ -161,7 +160,7 @@ public async Task GiveProperExceptionWhenUnableToConnect()
new List());
await Assert.ThrowsAsync(() =>
RoutingHelper.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo,
- new ConnectionsPool(1, 1)));
+ new ConnectionsPool(1, 1, new ConnectionCloseConfig())));
}
[Fact]
@@ -174,7 +173,7 @@ public async Task AddressResolverShouldRaiseAnExceptionIfAdvIsNull()
// run more than one time just to be sure to use all the IP with random
await Assert.ThrowsAsync(() =>
RoutingHelper.LookupLeaderConnection(clientParameters, metaDataInfo,
- new ConnectionsPool(1, 1)));
+ new ConnectionsPool(1, 1, new ConnectionCloseConfig())));
}
[Fact]
@@ -188,7 +187,7 @@ public void AddressResolverLoadBalancerSimulate()
for (var i = 0; i < 4; i++)
{
var client = RoutingHelper.LookupLeaderConnection(clientParameters, metaDataInfo,
- new ConnectionsPool(1, 1));
+ new ConnectionsPool(1, 1, new ConnectionCloseConfig()));
Assert.Equal("node2", client.Result.ConnectionProperties["advertised_host"]);
Assert.Equal("5552", client.Result.ConnectionProperties["advertised_port"]);
}
@@ -205,7 +204,7 @@ public void DnsAddressResolverLoadBalancerSimulate()
for (var i = 0; i < 4; i++)
{
var client = RoutingHelper.LookupLeaderConnection(clientParameters, metaDataInfo,
- new ConnectionsPool(1, 1));
+ new ConnectionsPool(1, 1, new ConnectionCloseConfig()));
Assert.Equal("node2", client.Result.ConnectionProperties["advertised_host"]);
Assert.Equal("5552", client.Result.ConnectionProperties["advertised_port"]);
}
@@ -234,7 +233,7 @@ public void RandomReplicaLeader()
new List());
var client =
RoutingHelper.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo,
- new ConnectionsPool(1, 1));
+ new ConnectionsPool(1, 1, new ConnectionCloseConfig()));
Assert.Equal("5552", client.Result.ConnectionProperties["advertised_port"]);
var res = (client.Result.ConnectionProperties["advertised_host"] == "leader" ||
client.Result.ConnectionProperties["advertised_host"] == "replica");
@@ -249,13 +248,10 @@ public void RandomOnlyReplicaIfThereAre()
var clientParameters = new ClientParameters() { AddressResolver = addressResolver, };
var metaDataInfo = new StreamInfo("stream",
ResponseCode.Ok, new Broker("leader", 5552),
- new List()
- {
- new Broker("replica2", 5553),
- });
+ new List() { new Broker("replica2", 5553), });
var client =
RoutingHelper.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo,
- new ConnectionsPool(1, 1));
+ new ConnectionsPool(1, 1, new ConnectionCloseConfig()));
Assert.Equal("5553", client.Result.ConnectionProperties["advertised_port"]);
var res = (client.Result.ConnectionProperties["advertised_host"] == "replica2");
Assert.True(res);
diff --git a/docs/asciidoc/api.adoc b/docs/asciidoc/api.adoc
index 424e7fea..e54c6dbd 100644
--- a/docs/asciidoc/api.adoc
+++ b/docs/asciidoc/api.adoc
@@ -142,7 +142,7 @@ The following table sums up the main settings to create an `StreamSystem` using
[[connection-pool]]
===== Connection pool
Introduced on version 1.8.0.
-With the connection pool you can define how many producers and consumers can be created on a single connection.
+With the connection pool you can define how many producers and consumers can be created on a single connection and the `ConnectionCloseConfig`
[source]
----
@@ -150,15 +150,19 @@ With the connection pool you can define how many producers and consumers can be
{
ProducersPerConnection = 2,
ConsumersPerConnection = 3,
+ ConnectionCloseConfig = new ConnectionCloseConfig()
+ {
+ Policy = ConnectionClosePolicy.CloseWhenEmpty,
+ }
}
----
-By default the connection pool is set to 1 producer and 1 consumer per connection.
+By default, the connection pool is set to 1 producer and 1 consumer per connection.
The maximum number of producers and consumers per connection is 200.
-An high value can reduce the number of connections to the server but it could reduce the performance of the producer and the consumer.
+A high value can reduce the number of connections to the server but it could reduce the performance of the producer and the consumer.
-An low value can increase the number of connections to the server but it could increase the performance of the producer and the consumer.
+A low value can increase the number of connections to the server but it could increase the performance of the producer and the consumer.
The consumers share the same handler, so if you have a high number of consumers per connection, the handler could be a bottleneck. It means that if there is a slow consumer all the other consumers could be slow.
@@ -184,7 +188,31 @@ streamSystemToIncreaseThePerformances = new StreamSystemConfig{
----
There is not a magic number, you have to test and evaluate the best value for your use case.
+The `ConnectionCloseConfig` defines the policy to close the connection when the last producer or consumer is closed.
+- `CloseWhenEmpty` the connection is closed when the last producer or consumer is closed.
+- `CloseWhenEmptyAndIdle` the connection is closed when the last producer or consumer is closed and the connection is idle for a certain amount of time.
+
+The policy `CloseWhenEmpty` covers the standard use cases when the producers or consumers have long life running.
+
+The policy `CloseWhenEmptyAndIdle` is useful when producers or consumers have short live and the pool has to be fast to create a new entity.
+The parameter `IdleTime` defines the time to wait before closing the connection when the last producer or consumer is closed.
+The parameter `CheckIdleTime` defines the time to check if the connection is idle.
+
+[source]
+----
+ new ConnectionCloseConfig()
+ {
+ Policy = ConnectionClosePolicy.CloseWhenEmptyAndIdle,
+ IdleTime = TimeSpan.FromMilliseconds(1000),
+ CheckIdleTime = TimeSpan.FromMilliseconds(500)
+ });
+----
+
+Note: You can't close the stream systems if there are producers or consumers still running with the `CloseWhenEmptyAndIdle` policy.
+
+
+[[entity-status]]
[[address-resolver]]
===== When a Load Balancer is in Use