Skip to content

Commit

Permalink
Add Connection Pool Close configuration (#389)
Browse files Browse the repository at this point in the history
* Add Connection Pool Close configuration. It is possible to configure the close configuration 

- ConnectionClosePolicy.CloseWhenEmptyAndIdle
- ConnectionClosePolicy.CloseWhenEmpty


---------

Signed-off-by: Gabriele Santomaggio <[email protected]>
Co-authored-by: Luke Bakken <[email protected]>
  • Loading branch information
Gsantomaggio and lukebakken authored Aug 28, 2024
1 parent dc5c88d commit b648639
Show file tree
Hide file tree
Showing 8 changed files with 316 additions and 56 deletions.
11 changes: 10 additions & 1 deletion RabbitMQ.Stream.Client/ClientExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ internal static bool IsAKnownException(Exception exception)
return x.Any();
}

return exception is (SocketException or TimeoutException or LeaderNotFoundException or InvalidOperationException or OperationCanceledException) ||
return exception is (SocketException or TimeoutException or LeaderNotFoundException
or InvalidOperationException or OperationCanceledException) ||
IsStreamNotAvailable(exception);
}

Expand Down Expand Up @@ -191,4 +192,12 @@ public TooManyConnectionsException(string s)
{
}
}

public class PendingConnectionsException : Exception
{
public PendingConnectionsException(string s)
: base(s)
{
}
}
}
142 changes: 130 additions & 12 deletions RabbitMQ.Stream.Client/ConnectionsPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,41 @@

namespace RabbitMQ.Stream.Client;

public enum ConnectionClosePolicy
{
/// <summary>
/// The connection is closed when the last consumer or producer is removed.
/// </summary>
CloseWhenEmpty,

/// <summary>
/// The connection is closed when the last consumer or producer is removed and the connection is not used for a certain time.
/// </summary>
CloseWhenEmptyAndIdle
}

public class ConnectionCloseConfig
{
/// <summary>
/// Policy to close the connection.
/// </summary>

public ConnectionClosePolicy Policy { get; set; } = ConnectionClosePolicy.CloseWhenEmpty;

/// <summary>
/// The connection is closed when the last consumer or producer is removed and the connection is not used for a certain time.
/// Idle time is valid only if the policy is CloseWhenEmptyAndIdle.
/// </summary>
public TimeSpan IdleTime { get; set; } = TimeSpan.FromMinutes(5);

/// <summary>
/// Interval to check the idle time.
/// Default is high because the check is done in a separate thread.
/// The filed is internal to help the test.
/// </summary>
internal TimeSpan CheckIdleTime { get; set; } = TimeSpan.FromSeconds(60);
}

public class ConnectionPoolConfig
{
/// <summary>
Expand All @@ -30,6 +65,11 @@ public class ConnectionPoolConfig
/// but it is not the best for performance.
/// </summary>
public byte ProducersPerConnection { get; set; } = 1;

/// <summary>
/// Define the connection close policy.
/// </summary>
public ConnectionCloseConfig ConnectionCloseConfig { get; set; } = new ConnectionCloseConfig();
}

public class LastSecret
Expand Down Expand Up @@ -87,9 +127,10 @@ public bool Available
/// subscriptionIds
/// publisherIds
/// </summary>
public class ConnectionsPool
public class ConnectionsPool : IDisposable
{
private static readonly object s_lock = new();
private bool _isRunning = false;

internal static byte FindNextValidId(List<byte> ids, byte nextId = 0)
{
Expand Down Expand Up @@ -127,16 +168,56 @@ internal static byte FindNextValidId(List<byte> ids, byte nextId = 0)
private readonly byte _idsPerConnection;
private readonly SemaphoreSlim _semaphoreSlim = new(1, 1);
private readonly LastSecret _lastSecret = new();
private readonly Task _checkIdleConnectionTimeTask;

/// <summary>
/// Init the pool with the max connections and the max ids per connection
/// </summary>
/// <param name="maxConnections"> The max connections are allowed for session</param>
/// <param name="idsPerConnection"> The max ids per Connection</param>
public ConnectionsPool(int maxConnections, byte idsPerConnection)
/// <param name="connectionCloseConfig"> Policy to close the connections in the pool</param>
public ConnectionsPool(int maxConnections, byte idsPerConnection, ConnectionCloseConfig connectionCloseConfig)
{
_maxConnections = maxConnections;
_idsPerConnection = idsPerConnection;
ConnectionPoolConfig = connectionCloseConfig;
_isRunning = true;
if (ConnectionPoolConfig.Policy == ConnectionClosePolicy.CloseWhenEmptyAndIdle)
{
_checkIdleConnectionTimeTask = Task.Run(CheckIdleConnectionTime);
}
}

private ConnectionCloseConfig ConnectionPoolConfig { get; }

private async Task CheckIdleConnectionTime()
{
while (_isRunning)
{
await Task.Delay(ConnectionPoolConfig.CheckIdleTime)
.ConfigureAwait(false);

if (!_isRunning)
{
var now = DateTime.UtcNow;
var connectionItems = Connections.Values.ToList();
foreach (var connectionItem in connectionItems.Where(connectionItem =>
connectionItem.EntitiesCount == 0 &&
connectionItem.LastUsed.Add(ConnectionPoolConfig.IdleTime) < now))
{
CloseItemAndConnection("Idle connection", connectionItem);
}
}
else
{
var connectionItems = Connections.Values.ToList();
foreach (var connectionItem in connectionItems.Where(
connectionItem => connectionItem.EntitiesCount == 0))
{
CloseItemAndConnection("Idle connection", connectionItem);
}
}
}
}

/// <summary>
Expand Down Expand Up @@ -208,10 +289,7 @@ public bool TryMergeClientParameters(ClientParameters clientParameters, out Clie
return false;
}

cp = clientParameters with
{
Password = _lastSecret.Secret
};
cp = clientParameters with { Password = _lastSecret.Secret };
return true;
}

Expand Down Expand Up @@ -264,20 +342,31 @@ public void MaybeClose(string clientId, string reason)
return;
}

// close the connection
connectionItem.Client.Close(reason);
connectionItem.LastUsed = DateTime.UtcNow;

// remove the connection from the pool
// it means that the connection is closed
// we don't care if it is called two times for the same connection
Connections.TryRemove(clientId, out _);
if (ConnectionPoolConfig.Policy == ConnectionClosePolicy.CloseWhenEmpty)
{
CloseItemAndConnection(reason, connectionItem);
}
}
finally
{
_semaphoreSlim.Release();
}
}

private void CloseItemAndConnection(string reason, ConnectionItem connectionItem)
{
// close the connection
connectionItem.Client.Close(reason);
// remove the connection from the pool
// it means that the connection is closed
// we don't care if it is called two times for the same connection
Connections.TryRemove(connectionItem.Client.ClientId, out _);
}

internal int PendingConnections => Connections.Values.Count(x => x.EntitiesCount > 0);

/// <summary>
/// Removes the consumer entity from the client.
/// When the metadata update is called we need to remove the consumer entity from the client.
Expand Down Expand Up @@ -328,4 +417,33 @@ public void RemoveProducerEntityFromStream(string clientId, byte id, string stre
}

public int ConnectionsCount => Connections.Count;

public async Task Close()
{
// The pool can't be closed if there are pending connections with the policy: CloseWhenEmptyAndIdle
// else there is no way to close the pending connections.
// The user needs to close the pending connections before to close the pool.
// At the moment when the pool is closed the pending connections are not closed with CloseWhenEmpty
// because the pool is not strictly bound to the stream system.
// The StreamSystem doesn't close the connections when it is closed. That was by design
// We could consider (Version 2.0) to close all the Producers and Consumers and their connection when the StreamSystem is closed.
// Other clients like Java and Golang close the connections when the Environment (alias StreamSystem) is closed.
if (PendingConnections > 0 && ConnectionPoolConfig.Policy == ConnectionClosePolicy.CloseWhenEmptyAndIdle)
{
throw new PendingConnectionsException(
$"There are {PendingConnections} pending connections. With the policy CloseWhenEmptyAndIdle you need to close them");
}

_isRunning = false;
if (_checkIdleConnectionTimeTask is not null)
{
await _checkIdleConnectionTimeTask.ConfigureAwait(false);
}
}

public void Dispose()
{
_semaphoreSlim.Dispose();
GC.SuppressFinalize(this);
}
}
17 changes: 16 additions & 1 deletion RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ RabbitMQ.Stream.Client.ClientParameters.OnMetadataUpdate -> RabbitMQ.Stream.Clie
RabbitMQ.Stream.Client.ClientParameters.RpcTimeOut.get -> System.TimeSpan
RabbitMQ.Stream.Client.ClientParameters.RpcTimeOut.set -> void
RabbitMQ.Stream.Client.Connection.UpdateCloseStatus(string reason) -> void
RabbitMQ.Stream.Client.ConnectionCloseConfig
RabbitMQ.Stream.Client.ConnectionCloseConfig.ConnectionCloseConfig() -> void
RabbitMQ.Stream.Client.ConnectionCloseConfig.IdleTime.get -> System.TimeSpan
RabbitMQ.Stream.Client.ConnectionCloseConfig.IdleTime.set -> void
RabbitMQ.Stream.Client.ConnectionCloseConfig.Policy.get -> RabbitMQ.Stream.Client.ConnectionClosePolicy
RabbitMQ.Stream.Client.ConnectionCloseConfig.Policy.set -> void
RabbitMQ.Stream.Client.ConnectionClosePolicy
RabbitMQ.Stream.Client.ConnectionClosePolicy.CloseWhenEmpty = 0 -> RabbitMQ.Stream.Client.ConnectionClosePolicy
RabbitMQ.Stream.Client.ConnectionClosePolicy.CloseWhenEmptyAndIdle = 1 -> RabbitMQ.Stream.Client.ConnectionClosePolicy
RabbitMQ.Stream.Client.ConnectionItem
RabbitMQ.Stream.Client.ConnectionItem.Available.get -> bool
RabbitMQ.Stream.Client.ConnectionItem.BrokerInfo.get -> string
Expand All @@ -67,14 +76,18 @@ RabbitMQ.Stream.Client.ConnectionItem.IdsPerConnection.get -> byte
RabbitMQ.Stream.Client.ConnectionItem.LastUsed.get -> System.DateTime
RabbitMQ.Stream.Client.ConnectionItem.LastUsed.set -> void
RabbitMQ.Stream.Client.ConnectionPoolConfig
RabbitMQ.Stream.Client.ConnectionPoolConfig.ConnectionCloseConfig.get -> RabbitMQ.Stream.Client.ConnectionCloseConfig
RabbitMQ.Stream.Client.ConnectionPoolConfig.ConnectionCloseConfig.set -> void
RabbitMQ.Stream.Client.ConnectionPoolConfig.ConnectionPoolConfig() -> void
RabbitMQ.Stream.Client.ConnectionPoolConfig.ConsumersPerConnection.get -> byte
RabbitMQ.Stream.Client.ConnectionPoolConfig.ConsumersPerConnection.set -> void
RabbitMQ.Stream.Client.ConnectionPoolConfig.ProducersPerConnection.get -> byte
RabbitMQ.Stream.Client.ConnectionPoolConfig.ProducersPerConnection.set -> void
RabbitMQ.Stream.Client.ConnectionsPool
RabbitMQ.Stream.Client.ConnectionsPool.Close() -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.ConnectionsPool.ConnectionsCount.get -> int
RabbitMQ.Stream.Client.ConnectionsPool.ConnectionsPool(int maxConnections, byte idsPerConnection) -> void
RabbitMQ.Stream.Client.ConnectionsPool.ConnectionsPool(int maxConnections, byte idsPerConnection, RabbitMQ.Stream.Client.ConnectionCloseConfig connectionCloseConfig) -> void
RabbitMQ.Stream.Client.ConnectionsPool.Dispose() -> void
RabbitMQ.Stream.Client.ConnectionsPool.MaybeClose(string clientId, string reason) -> void
RabbitMQ.Stream.Client.ConnectionsPool.Remove(string clientId) -> void
RabbitMQ.Stream.Client.ConnectionsPool.RemoveConsumerEntityFromStream(string clientId, byte id, string stream) -> void
Expand Down Expand Up @@ -194,6 +207,8 @@ RabbitMQ.Stream.Client.PartitionsSuperStreamSpec
RabbitMQ.Stream.Client.PartitionsSuperStreamSpec.Partitions.get -> int
RabbitMQ.Stream.Client.PartitionsSuperStreamSpec.PartitionsSuperStreamSpec(string Name) -> void
RabbitMQ.Stream.Client.PartitionsSuperStreamSpec.PartitionsSuperStreamSpec(string Name, int partitions) -> void
RabbitMQ.Stream.Client.PendingConnectionsException
RabbitMQ.Stream.Client.PendingConnectionsException.PendingConnectionsException(string s) -> void
RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.get -> System.Func<RabbitMQ.Stream.Client.Message, string>
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.set -> void
Expand Down
23 changes: 20 additions & 3 deletions RabbitMQ.Stream.Client/StreamSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ private StreamSystem(ClientParameters clientParameters, Client client,
_clientParameters = clientParameters;
_client = client;
_logger = logger ?? NullLogger<StreamSystem>.Instance;
// we don't expose the the max connections per producer/consumer
// we don't expose the max connections per producer/consumer
// for the moment. We can expose it in the future if needed
PoolConsumers = new ConnectionsPool(0,
connectionPoolConfig.ConsumersPerConnection);
connectionPoolConfig.ConsumersPerConnection, connectionPoolConfig.ConnectionCloseConfig);

PoolProducers = new ConnectionsPool(0,
connectionPoolConfig.ProducersPerConnection);
connectionPoolConfig.ProducersPerConnection, connectionPoolConfig.ConnectionCloseConfig);
}

public bool IsClosed => _client.IsClosed;
Expand Down Expand Up @@ -139,6 +139,23 @@ public static async Task<StreamSystem> Create(StreamSystemConfig config, ILogger
public async Task Close()
{
await _client.Close("system close").ConfigureAwait(false);

try
{
await PoolConsumers.Close()
.ConfigureAwait(false);
await PoolProducers.Close()
.ConfigureAwait(false);
}
catch
{
}
finally
{
PoolConsumers.Dispose();
PoolProducers.Dispose();
}

_logger?.LogDebug("Client Closed");
}

Expand Down
4 changes: 2 additions & 2 deletions Tests/ClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public async void DeclarePublisherShouldReturnErrorCode()

var (publisherId, result) =
await client.DeclarePublisher(publisherRef, "this-stream-does-not-exist", confirmed, errored,
new ConnectionsPool(0, 1));
new ConnectionsPool(0, 1, new ConnectionCloseConfig()));
Assert.Equal(ResponseCode.StreamDoesNotExist, result.ResponseCode);
await client.Close("done");
}
Expand All @@ -124,7 +124,7 @@ public async void DeclareConsumerShouldReturnErrorCode()
var client = await Client.Create(clientParameters);
var (subId, subscribeResponse) = await client.Subscribe(
"this-stream-does-not-exist", new OffsetTypeLast(), 1,
new Dictionary<string, string>(), null, null, new ConnectionsPool(0, 1));
new Dictionary<string, string>(), null, null, new ConnectionsPool(0, 1, new ConnectionCloseConfig()));
Assert.Equal(ResponseCode.StreamDoesNotExist, subscribeResponse.ResponseCode);
await client.Close("done");
}
Expand Down
Loading

0 comments on commit b648639

Please sign in to comment.