Skip to content

Commit

Permalink
Fix ConnectionPool state after async cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
grynko committed Dec 18, 2024
1 parent a173552 commit c28f55c
Show file tree
Hide file tree
Showing 22 changed files with 97 additions and 0 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [Unreleased]

- Fix `ConnectionPool` state after async cancellation.


## Version 1.0.7 (November 15th, 2024)

- Support `proxy=…` configuration on `ConnectionPool()`. (#974)
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_async/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ async def aclose(self) -> None:
async with Trace("close", logger, None, {}):
await self._connection.aclose()

def is_connected(self) -> bool:
return self._connection and self._connection.is_connected()

def is_available(self) -> bool:
if self._connection is None:
# If HTTP/2 support is enabled, and the resulting connection could
Expand Down
4 changes: 4 additions & 0 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,17 @@ def _assign_requests_to_connections(self) -> list[AsyncConnectionInterface]:
those connections to be handled seperately.
"""
closing_connections = []
request_connections = {request.connection for request in self._requests}

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
for connection in list(self._connections):
if connection.is_closed():
# log: "removing closed connection"
self._connections.remove(connection)
elif not (connection.is_connected() or connection in request_connections):
# log: "removing garbage connection"
self._connections.remove(connection)
elif connection.has_expired():
# log: "closing expired connection"
self._connections.remove(connection)
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_async/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ async def aclose(self) -> None:
def can_handle_request(self, origin: Origin) -> bool:
return origin == self._origin

def is_connected(self) -> bool:
return not self.is_closed()

def is_available(self) -> bool:
# Note that HTTP/1.1 connections in the "NEW" state are not treated as
# being "available". The control flow which created the connection will
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_async/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,9 @@ async def _wait_for_outgoing_flow(self, request: Request, stream_id: int) -> int
def can_handle_request(self, origin: Origin) -> bool:
return origin == self._origin

def is_connected(self) -> bool:
return not self.is_closed()

def is_available(self) -> bool:
return (
self._state != HTTPConnectionState.CLOSED
Expand Down
6 changes: 6 additions & 0 deletions httpcore/_async/http_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ async def aclose(self) -> None:
def info(self) -> str:
return self._connection.info()

def is_connected(self) -> bool:
return self._connection.is_connected()

def is_available(self) -> bool:
return self._connection.is_available()

Expand Down Expand Up @@ -351,6 +354,9 @@ async def aclose(self) -> None:
def info(self) -> str:
return self._connection.info()

def is_connected(self) -> bool:
return self._connection.is_connected()

def is_available(self) -> bool:
return self._connection.is_available()

Expand Down
8 changes: 8 additions & 0 deletions httpcore/_async/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ def info(self) -> str:
def can_handle_request(self, origin: Origin) -> bool:
raise NotImplementedError() # pragma: nocover

def is_connected(self) -> bool:
"""
Return `True` if the connection is open.
Beware that for some implementations `is_connected() != not is_closed()`.
"""
raise NotImplementedError() # pragma: nocover

def is_available(self) -> bool:
"""
Return `True` if the connection is currently able to accept an
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_async/socks_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,9 @@ async def aclose(self) -> None:
if self._connection is not None:
await self._connection.aclose()

def is_connected(self) -> bool:
return self._connection and self._connection.is_connected()

def is_available(self) -> bool:
if self._connection is None: # pragma: nocover
# If HTTP/2 support is enabled, and the resulting connection could
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_sync/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ def close(self) -> None:
with Trace("close", logger, None, {}):
self._connection.close()

def is_connected(self) -> bool:
return self._connection and self._connection.is_connected()

def is_available(self) -> bool:
if self._connection is None:
# If HTTP/2 support is enabled, and the resulting connection could
Expand Down
4 changes: 4 additions & 0 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,17 @@ def _assign_requests_to_connections(self) -> list[ConnectionInterface]:
those connections to be handled seperately.
"""
closing_connections = []
request_connections = {request.connection for request in self._requests}

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
for connection in list(self._connections):
if connection.is_closed():
# log: "removing closed connection"
self._connections.remove(connection)
elif not (connection.is_connected() or connection in request_connections):
# log: "removing garbage connection"
self._connections.remove(connection)
elif connection.has_expired():
# log: "closing expired connection"
self._connections.remove(connection)
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_sync/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ def close(self) -> None:
def can_handle_request(self, origin: Origin) -> bool:
return origin == self._origin

def is_connected(self) -> bool:
return not self.is_closed()

def is_available(self) -> bool:
# Note that HTTP/1.1 connections in the "NEW" state are not treated as
# being "available". The control flow which created the connection will
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_sync/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,9 @@ def _wait_for_outgoing_flow(self, request: Request, stream_id: int) -> int:
def can_handle_request(self, origin: Origin) -> bool:
return origin == self._origin

def is_connected(self) -> bool:
return not self.is_closed()

def is_available(self) -> bool:
return (
self._state != HTTPConnectionState.CLOSED
Expand Down
6 changes: 6 additions & 0 deletions httpcore/_sync/http_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ def close(self) -> None:
def info(self) -> str:
return self._connection.info()

def is_connected(self) -> bool:
return self._connection.is_connected()

def is_available(self) -> bool:
return self._connection.is_available()

Expand Down Expand Up @@ -351,6 +354,9 @@ def close(self) -> None:
def info(self) -> str:
return self._connection.info()

def is_connected(self) -> bool:
return self._connection.is_connected()

def is_available(self) -> bool:
return self._connection.is_available()

Expand Down
8 changes: 8 additions & 0 deletions httpcore/_sync/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ def info(self) -> str:
def can_handle_request(self, origin: Origin) -> bool:
raise NotImplementedError() # pragma: nocover

def is_connected(self) -> bool:
"""
Return `True` if the connection is open.
Beware that for some implementations `is_connected() != not is_closed()`.
"""
raise NotImplementedError() # pragma: nocover

def is_available(self) -> bool:
"""
Return `True` if the connection is currently able to accept an
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_sync/socks_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,9 @@ def close(self) -> None:
if self._connection is not None:
self._connection.close()

def is_connected(self) -> bool:
return self._connection and self._connection.is_connected()

def is_available(self) -> bool:
if self._connection is None: # pragma: nocover
# If HTTP/2 support is enabled, and the resulting connection could
Expand Down
2 changes: 2 additions & 0 deletions tests/_async/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ async def test_http_connection():
origin=origin, network_backend=network_backend, keepalive_expiry=5.0
) as conn:
assert not conn.is_idle()
assert not conn.is_connected()
assert not conn.is_closed()
assert not conn.is_available()
assert not conn.has_expired()
Expand All @@ -52,6 +53,7 @@ async def test_http_connection():
assert response.content == b"Hello, world!"

assert conn.is_idle()
assert conn.is_connected()
assert not conn.is_closed()
assert conn.is_available()
assert not conn.has_expired()
Expand Down
1 change: 1 addition & 0 deletions tests/_async/test_http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ async def test_http11_connection():
assert response.content == b"Hello, world!"

assert conn.is_idle()
assert conn.is_connected()
assert not conn.is_closed()
assert conn.is_available()
assert not conn.has_expired()
Expand Down
1 change: 1 addition & 0 deletions tests/_async/test_http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ async def test_http2_connection():

assert conn.is_idle()
assert conn.is_available()
assert conn.is_connected()
assert not conn.is_closed()
assert not conn.has_expired()
assert (
Expand Down
2 changes: 2 additions & 0 deletions tests/_sync/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def test_http_connection():
origin=origin, network_backend=network_backend, keepalive_expiry=5.0
) as conn:
assert not conn.is_idle()
assert not conn.is_connected()
assert not conn.is_closed()
assert not conn.is_available()
assert not conn.has_expired()
Expand All @@ -52,6 +53,7 @@ def test_http_connection():
assert response.content == b"Hello, world!"

assert conn.is_idle()
assert conn.is_connected()
assert not conn.is_closed()
assert conn.is_available()
assert not conn.has_expired()
Expand Down
1 change: 1 addition & 0 deletions tests/_sync/test_http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def test_http11_connection():
assert response.content == b"Hello, world!"

assert conn.is_idle()
assert conn.is_connected()
assert not conn.is_closed()
assert conn.is_available()
assert not conn.has_expired()
Expand Down
1 change: 1 addition & 0 deletions tests/_sync/test_http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def test_http2_connection():

assert conn.is_idle()
assert conn.is_available()
assert conn.is_connected()
assert not conn.is_closed()
assert not conn.has_expired()
assert (
Expand Down
24 changes: 24 additions & 0 deletions tests/test_cancellations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import typing
from unittest.mock import patch

import anyio
import hpack
Expand Down Expand Up @@ -135,6 +136,29 @@ async def test_connection_pool_timeout_during_response():
assert not pool.connections


@pytest.mark.anyio
async def test_connection_pool_cancellation_during_waiting_for_connection():
"""
A cancellation of ongoing request waiting for a connection should leave
the pool in a consistent state.
In this case, that means the connection will become closed, and no
longer remain in the pool.
"""

async def wait_for_connection(self, *args, **kwargs):
await anyio.sleep(999)

with patch(
"httpcore._async.connection_pool.AsyncPoolRequest.wait_for_connection",
new=wait_for_connection,
):
async with httpcore.AsyncConnectionPool() as pool:
with anyio.move_on_after(0.01):
await pool.request("GET", "http://example.com")
assert not pool.connections


@pytest.mark.anyio
async def test_h11_timeout_during_request():
"""
Expand Down

0 comments on commit c28f55c

Please sign in to comment.