Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Land ConnectionPool #428

Merged
merged 7 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
484 changes: 483 additions & 1 deletion Sources/ConnectionPoolModule/ConnectionPool.swift

Large diffs are not rendered by default.

53 changes: 53 additions & 0 deletions Sources/ConnectionPoolModule/ConnectionRequest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,56 @@ public struct ConnectionRequest<Connection: PooledConnection>: ConnectionRequest
self.continuation.resume(with: result)
}
}

fileprivate let requestIDGenerator = _ConnectionPoolModule.ConnectionIDGenerator()

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
extension ConnectionPool where Request == ConnectionRequest<Connection> {
public convenience init(
configuration: ConnectionPoolConfiguration,
idGenerator: ConnectionIDGenerator = _ConnectionPoolModule.ConnectionIDGenerator(),
keepAliveBehavior: KeepAliveBehavior,
observabilityDelegate: ObservabilityDelegate,
clock: Clock = ContinuousClock(),
connectionFactory: @escaping ConnectionFactory
) {
self.init(
configuration: configuration,
idGenerator: idGenerator,
requestType: ConnectionRequest<Connection>.self,
keepAliveBehavior: keepAliveBehavior,
observabilityDelegate: observabilityDelegate,
clock: clock,
connectionFactory: connectionFactory
)
}

public func leaseConnection() async throws -> Connection {
let requestID = requestIDGenerator.next()

let connection = try await withTaskCancellationHandler {
if Task.isCancelled {
throw CancellationError()
}

return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Connection, Error>) in
let request = Request(
id: requestID,
continuation: continuation
)

self.leaseConnection(request)
}
} onCancel: {
self.cancelLeaseConnection(requestID)
}

return connection
}

public func withConnection<Result>(_ closure: (Connection) async throws -> Result) async throws -> Result {
let connection = try await self.leaseConnection()
defer { self.releaseConnection(connection) }
return try await closure(connection)
}
}
46 changes: 46 additions & 0 deletions Sources/ConnectionPoolModule/NIOLockedValueBox.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Implementation vendored from SwiftNIO:
// https://github.com/apple/swift-nio

//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2022 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

/// Provides locked access to `Value`.
///
/// - note: ``NIOLockedValueBox`` has reference semantics and holds the `Value`
/// alongside a lock behind a reference.
///
/// This is no different than creating a ``Lock`` and protecting all
/// accesses to a value using the lock. But it's easy to forget to actually
/// acquire/release the lock in the correct place. ``NIOLockedValueBox`` makes
/// that much easier.
@usableFromInline
struct NIOLockedValueBox<Value> {

@usableFromInline
internal let _storage: LockStorage<Value>

/// Initialize the `Value`.
@inlinable
init(_ value: Value) {
self._storage = .create(value: value)
}

/// Access the `Value`, allowing mutation of it.
@inlinable
func withLockedValue<T>(_ mutate: (inout Value) throws -> T) rethrows -> T {
return try self._storage.withLockedValue(mutate)
}
}

extension NIOLockedValueBox: Sendable where Value: Sendable {}
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,9 @@ extension PoolStateMachine {
/// Call ``leaseConnection(at:)`` or ``closeConnection(at:)`` with the supplied index after
/// this. If you want to park the connection no further call is required.
@inlinable
mutating func releaseConnection(_ connectionID: Connection.ID, streams: UInt16) -> (Int, AvailableConnectionContext) {
mutating func releaseConnection(_ connectionID: Connection.ID, streams: UInt16) -> (Int, AvailableConnectionContext)? {
guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else {
preconditionFailure("A connection that we don't know was released? Something is very wrong...")
return nil
}

let connectionInfo = self.connections[index].release(streams: streams)
Expand Down Expand Up @@ -657,3 +657,6 @@ extension PoolStateMachine {

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
extension PoolStateMachine.ConnectionGroup.BackoffDoneAction: Equatable where TimerCancellationToken: Equatable {}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
extension PoolStateMachine.ConnectionGroup.ClosedAction: Equatable where TimerCancellationToken: Equatable {}
61 changes: 40 additions & 21 deletions Sources/ConnectionPoolModule/PoolStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,9 @@ struct PoolStateMachine<

@inlinable
mutating func releaseConnection(_ connection: Connection, streams: UInt16) -> Action {
let (index, context) = self.connections.releaseConnection(connection.id, streams: streams)
guard let (index, context) = self.connections.releaseConnection(connection.id, streams: streams) else {
return .none()
}
return self.handleAvailableConnection(index: index, availableContext: context)
}

Expand All @@ -251,8 +253,13 @@ struct PoolStateMachine<

@inlinable
mutating func connectionEstablished(_ connection: Connection, maxStreams: UInt16) -> Action {
let (index, context) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams)
return self.handleAvailableConnection(index: index, availableContext: context)
switch self.poolState {
case .running, .shuttingDown(graceful: true):
let (index, context) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams)
return self.handleAvailableConnection(index: index, availableContext: context)
case .shuttingDown(graceful: false), .shutDown:
return .init(request: .none, connection: .closeConnection(connection, []))
}
}

@inlinable
Expand All @@ -274,31 +281,43 @@ struct PoolStateMachine<

@inlinable
mutating func connectionEstablishFailed(_ error: Error, for request: ConnectionRequest) -> Action {
self.failedConsecutiveConnectionAttempts += 1
switch self.poolState {
case .running, .shuttingDown(graceful: true):
self.failedConsecutiveConnectionAttempts += 1

let connectionTimer = self.connections.backoffNextConnectionAttempt(request.connectionID)
let backoff = Self.calculateBackoff(failedAttempt: self.failedConsecutiveConnectionAttempts)
let timer = Timer(connectionTimer, duration: backoff)
return .init(request: .none, connection: .scheduleTimers(.init(timer)))
let connectionTimer = self.connections.backoffNextConnectionAttempt(request.connectionID)
let backoff = Self.calculateBackoff(failedAttempt: self.failedConsecutiveConnectionAttempts)
let timer = Timer(connectionTimer, duration: backoff)
return .init(request: .none, connection: .scheduleTimers(.init(timer)))

case .shuttingDown(graceful: false), .shutDown:
return .none()
}
}

@inlinable
mutating func connectionCreationBackoffDone(_ connectionID: ConnectionID) -> Action {
let soonAvailable = self.connections.soonAvailableConnections
let retry = (soonAvailable - 1) < self.requestQueue.count

switch self.connections.backoffDone(connectionID, retry: retry) {
case .createConnection(let request, let continuation):
let timers: TinyFastSequence<TimerCancellationToken>
if let continuation {
timers = .init(element: continuation)
} else {
timers = .init()
switch self.poolState {
case .running, .shuttingDown(graceful: true):
let soonAvailable = self.connections.soonAvailableConnections
let retry = (soonAvailable - 1) < self.requestQueue.count

switch self.connections.backoffDone(connectionID, retry: retry) {
case .createConnection(let request, let continuation):
let timers: TinyFastSequence<TimerCancellationToken>
if let continuation {
timers = .init(element: continuation)
} else {
timers = .init()
}
return .init(request: .none, connection: .makeConnection(request, timers))

case .cancelTimers(let timers):
return .init(request: .none, connection: .cancelTimers(.init(timers)))
}
return .init(request: .none, connection: .makeConnection(request, timers))

case .cancelTimers(let timers):
return .init(request: .none, connection: .cancelTimers(.init(timers)))
case .shuttingDown(graceful: false), .shutDown:
return .none()
}
}

Expand Down
Loading