From 54f491c9b9a1d0a4f099d21a473b630bcc89d551 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 14 Nov 2023 15:02:23 +0100 Subject: [PATCH] Add support for multiple streams (#442) --- .../ConnectionPoolModule/ConnectionPool.swift | 6 +- .../PoolStateMachine+ConnectionGroup.swift | 48 +++++- .../PoolStateMachine+ConnectionState.swift | 47 ++++++ .../PoolStateMachine.swift | 46 +++++- .../ConnectionPoolTests.swift | 142 ++++++++++++++++++ 5 files changed, 280 insertions(+), 9 deletions(-) diff --git a/Sources/ConnectionPoolModule/ConnectionPool.swift b/Sources/ConnectionPoolModule/ConnectionPool.swift index e9c9c4c9..ec865979 100644 --- a/Sources/ConnectionPoolModule/ConnectionPool.swift +++ b/Sources/ConnectionPoolModule/ConnectionPool.swift @@ -265,8 +265,10 @@ public final class ConnectionPool< } - public func connection(_ connection: Connection, didReceiveNewMaxStreamSetting: UInt16) { - + public func connectionReceivedNewMaxStreamSetting(_ connection: Connection, newMaxStreamSetting maxStreams: UInt16) { + self.modifyStateAndRunActions { state in + state.stateMachine.connectionReceivedNewMaxStreamSetting(connection.id, newMaxStreamSetting: maxStreams) + } } public func run() async { diff --git a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift index fabc3009..0dbca86f 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift @@ -256,6 +256,50 @@ extension PoolStateMachine { return self.connections[index].timerScheduled(timer, cancelContinuation: cancelContinuation) } + // MARK: Changes at runtime + + @usableFromInline + struct NewMaxStreamInfo { + + @usableFromInline + var index: Int + + @usableFromInline + var newMaxStreams: UInt16 + + @usableFromInline + var oldMaxStreams: UInt16 + + @usableFromInline + var usedStreams: UInt16 + + @inlinable + init(index: Int, info: ConnectionState.NewMaxStreamInfo) { + self.index = index + self.newMaxStreams = info.newMaxStreams + self.oldMaxStreams = info.oldMaxStreams + self.usedStreams = info.usedStreams + } + } + + @inlinable + mutating func connectionReceivedNewMaxStreamSetting( + _ connectionID: ConnectionID, + newMaxStreamSetting maxStreams: UInt16 + ) -> NewMaxStreamInfo? { + guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else { + return nil + } + + guard let info = self.connections[index].newMaxStreamSetting(maxStreams) else { + return nil + } + + self.stats.availableStreams += maxStreams - info.oldMaxStreams + + return NewMaxStreamInfo(index: index, info: info) + } + // MARK: Leasing and releasing /// Lease a connection, if an idle connection is available. @@ -424,9 +468,9 @@ extension PoolStateMachine { /// Closes the connection at the given index. @inlinable - mutating func closeConnectionIfIdle(at index: Int) -> CloseAction { + mutating func closeConnectionIfIdle(at index: Int) -> CloseAction? { guard let closeAction = self.connections[index].closeIfIdle() else { - preconditionFailure("Invalid state: \(self)") + return nil // apparently the connection isn't idle } self.stats.idle -= 1 diff --git a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift index 94196a09..98755ff9 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift @@ -195,6 +195,53 @@ extension PoolStateMachine { } } + @usableFromInline + struct NewMaxStreamInfo { + @usableFromInline + var newMaxStreams: UInt16 + + @usableFromInline + var oldMaxStreams: UInt16 + + @usableFromInline + var usedStreams: UInt16 + + @inlinable + init(newMaxStreams: UInt16, oldMaxStreams: UInt16, usedStreams: UInt16) { + self.newMaxStreams = newMaxStreams + self.oldMaxStreams = oldMaxStreams + self.usedStreams = usedStreams + } + } + + @inlinable + mutating func newMaxStreamSetting(_ newMaxStreams: UInt16) -> NewMaxStreamInfo? { + switch self.state { + case .starting, .backingOff: + preconditionFailure("Invalid state: \(self.state)") + + case .idle(let connection, let oldMaxStreams, let keepAlive, idleTimer: let idleTimer): + self.state = .idle(connection, maxStreams: newMaxStreams, keepAlive: keepAlive, idleTimer: idleTimer) + return NewMaxStreamInfo( + newMaxStreams: newMaxStreams, + oldMaxStreams: oldMaxStreams, + usedStreams: keepAlive.usedStreams + ) + + case .leased(let connection, let usedStreams, let oldMaxStreams, let keepAlive): + self.state = .leased(connection, usedStreams: usedStreams, maxStreams: newMaxStreams, keepAlive: keepAlive) + return NewMaxStreamInfo( + newMaxStreams: newMaxStreams, + oldMaxStreams: oldMaxStreams, + usedStreams: usedStreams + keepAlive.usedStreams + ) + + case .closing, .closed: + return nil + } + } + + @inlinable mutating func parkConnection(scheduleKeepAliveTimer: Bool, scheduleIdleTimeoutTimer: Bool) -> Max2Sequence { var keepAliveTimer: ConnectionTimer? diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index 4484e405..6671460a 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -262,6 +262,39 @@ struct PoolStateMachine< } } + @inlinable + mutating func connectionReceivedNewMaxStreamSetting( + _ connection: ConnectionID, + newMaxStreamSetting maxStreams: UInt16 + ) -> Action { + guard let info = self.connections.connectionReceivedNewMaxStreamSetting(connection, newMaxStreamSetting: maxStreams) else { + return .none() + } + + let waitingRequests = self.requestQueue.count + + guard waitingRequests > 0 else { + return .none() + } + + // the only thing we can do if we receive a new max stream setting is check if the new stream + // setting is higher and then dequeue some waiting requests + + guard info.newMaxStreams > info.oldMaxStreams && info.newMaxStreams > info.usedStreams else { + return .none() + } + + let leaseStreams = min(info.newMaxStreams - info.oldMaxStreams, info.newMaxStreams - info.usedStreams, UInt16(clamping: waitingRequests)) + let requests = self.requestQueue.pop(max: leaseStreams) + precondition(Int(leaseStreams) == requests.count) + let leaseResult = self.connections.leaseConnection(at: info.index, streams: leaseStreams) + + return .init( + request: .leaseConnection(requests, leaseResult.connection), + connection: .cancelTimers(.init(leaseResult.timersToCancel)) + ) + } + @inlinable mutating func timerScheduled(_ timer: Timer, cancelContinuation: TimerCancellationToken) -> TimerCancellationToken? { self.connections.timerScheduled(timer.underlying, cancelContinuation: cancelContinuation) @@ -445,11 +478,14 @@ struct PoolStateMachine< } case .overflow: - let closeAction = self.connections.closeConnectionIfIdle(at: index) - return .init( - request: .none, - connection: .closeConnection(closeAction.connection, closeAction.timersToCancel) - ) + if let closeAction = self.connections.closeConnectionIfIdle(at: index) { + return .init( + request: .none, + connection: .closeConnection(closeAction.connection, closeAction.timersToCancel) + ) + } else { + return .none() + } } } diff --git a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift index d4388893..0ff2bdf7 100644 --- a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift +++ b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift @@ -607,6 +607,148 @@ final class ConnectionPoolTests: XCTestCase { } } } + + func testLeasingMultipleStreamsFromOneConnectionWorks() async throws { + let clock = MockClock() + let factory = MockConnectionFactory() + let keepAliveDuration = Duration.seconds(30) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + + var mutableConfig = ConnectionPoolConfiguration() + mutableConfig.minimumConnectionCount = 0 + mutableConfig.maximumConnectionSoftLimit = 1 + mutableConfig.maximumConnectionHardLimit = 10 + mutableConfig.idleTimeout = .seconds(10) + let config = mutableConfig + + let pool = ConnectionPool( + configuration: config, + idGenerator: ConnectionIDGenerator(), + requestType: ConnectionFuture.self, + keepAliveBehavior: keepAlive, + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + clock: clock + ) { + try await factory.makeConnection(id: $0, for: $1) + } + + try await withThrowingTaskGroup(of: Void.self) { taskGroup in + taskGroup.addTask { + await pool.run() + } + + // create 4 connection requests + let requests = (0..<10).map { ConnectionFuture(id: $0) } + pool.leaseConnections(requests) + var connections = [MockConnection]() + + await factory.nextConnectAttempt { connectionID in + return 10 + } + + for request in requests { + let connection = try await request.future.success + connections.append(connection) + } + + // Ensure that all requests got the same connection + XCTAssertEqual(Set(connections.lazy.map(\.id)).count, 1) + + // release all 10 leased streams + for connection in connections { + pool.releaseConnection(connection) + } + + for _ in 0..<9 { + _ = try? await factory.nextConnectAttempt { connectionID in + throw CancellationError() + } + } + + // shutdown + taskGroup.cancelAll() + for connection in factory.runningConnections { + connection.closeIfClosing() + } + } + } + + func testIncreasingAvailableStreamsWorks() async throws { + let clock = MockClock() + let factory = MockConnectionFactory() + let keepAliveDuration = Duration.seconds(30) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + + var mutableConfig = ConnectionPoolConfiguration() + mutableConfig.minimumConnectionCount = 0 + mutableConfig.maximumConnectionSoftLimit = 1 + mutableConfig.maximumConnectionHardLimit = 1 + mutableConfig.idleTimeout = .seconds(10) + let config = mutableConfig + + let pool = ConnectionPool( + configuration: config, + idGenerator: ConnectionIDGenerator(), + requestType: ConnectionFuture.self, + keepAliveBehavior: keepAlive, + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + clock: clock + ) { + try await factory.makeConnection(id: $0, for: $1) + } + + try await withThrowingTaskGroup(of: Void.self) { taskGroup in + taskGroup.addTask { + await pool.run() + } + + // create 4 connection requests + var requests = (0..<21).map { ConnectionFuture(id: $0) } + pool.leaseConnections(requests) + var connections = [MockConnection]() + + await factory.nextConnectAttempt { connectionID in + return 1 + } + + let connection = try await requests.first!.future.success + connections.append(connection) + requests.removeFirst() + + pool.connectionReceivedNewMaxStreamSetting(connection, newMaxStreamSetting: 21) + + for (index, request) in requests.enumerated() { + let connection = try await request.future.success + connections.append(connection) + } + + // Ensure that all requests got the same connection + XCTAssertEqual(Set(connections.lazy.map(\.id)).count, 1) + + requests = (22..<42).map { ConnectionFuture(id: $0) } + pool.leaseConnections(requests) + + // release all 21 leased streams in a single call + pool.releaseConnection(connection, streams: 21) + + // ensure all 20 new requests got fulfilled + for request in requests { + let connection = try await request.future.success + connections.append(connection) + } + + // release all 20 leased streams one by one + for _ in requests { + pool.releaseConnection(connection, streams: 1) + } + + // shutdown + taskGroup.cancelAll() + for connection in factory.runningConnections { + connection.closeIfClosing() + } + } + } } struct ConnectionFuture: ConnectionRequestProtocol {