From c28f55cc68fd44acc27edfa2fc55995d419b4af5 Mon Sep 17 00:00:00 2001 From: Ivan Grynko Date: Wed, 18 Dec 2024 18:43:26 +0200 Subject: [PATCH] Fix ConnectionPool state after async cancellation --- CHANGELOG.md | 5 +++++ httpcore/_async/connection.py | 3 +++ httpcore/_async/connection_pool.py | 4 ++++ httpcore/_async/http11.py | 3 +++ httpcore/_async/http2.py | 3 +++ httpcore/_async/http_proxy.py | 6 ++++++ httpcore/_async/interfaces.py | 8 ++++++++ httpcore/_async/socks_proxy.py | 3 +++ httpcore/_sync/connection.py | 3 +++ httpcore/_sync/connection_pool.py | 4 ++++ httpcore/_sync/http11.py | 3 +++ httpcore/_sync/http2.py | 3 +++ httpcore/_sync/http_proxy.py | 6 ++++++ httpcore/_sync/interfaces.py | 8 ++++++++ httpcore/_sync/socks_proxy.py | 3 +++ tests/_async/test_connection.py | 2 ++ tests/_async/test_http11.py | 1 + tests/_async/test_http2.py | 1 + tests/_sync/test_connection.py | 2 ++ tests/_sync/test_http11.py | 1 + tests/_sync/test_http2.py | 1 + tests/test_cancellations.py | 24 ++++++++++++++++++++++++ 22 files changed, 97 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b46c6c0..07cb3733 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). +## [Unreleased] + +- Fix `ConnectionPool` state after async cancellation. + + ## Version 1.0.7 (November 15th, 2024) - Support `proxy=…` configuration on `ConnectionPool()`. (#974) diff --git a/httpcore/_async/connection.py b/httpcore/_async/connection.py index b42581df..961ebf97 100644 --- a/httpcore/_async/connection.py +++ b/httpcore/_async/connection.py @@ -172,6 +172,9 @@ async def aclose(self) -> None: async with Trace("close", logger, None, {}): await self._connection.aclose() + def is_connected(self) -> bool: + return self._connection and self._connection.is_connected() + def is_available(self) -> bool: if self._connection is None: # If HTTP/2 support is enabled, and the resulting connection could diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 96e973d0..9079cc11 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -278,6 +278,7 @@ def _assign_requests_to_connections(self) -> list[AsyncConnectionInterface]: those connections to be handled seperately. """ closing_connections = [] + request_connections = {request.connection for request in self._requests} # First we handle cleaning up any connections that are closed, # have expired their keep-alive, or surplus idle connections. @@ -285,6 +286,9 @@ def _assign_requests_to_connections(self) -> list[AsyncConnectionInterface]: if connection.is_closed(): # log: "removing closed connection" self._connections.remove(connection) + elif not (connection.is_connected() or connection in request_connections): + # log: "removing garbage connection" + self._connections.remove(connection) elif connection.has_expired(): # log: "closing expired connection" self._connections.remove(connection) diff --git a/httpcore/_async/http11.py b/httpcore/_async/http11.py index e6d6d709..f0a3a975 100644 --- a/httpcore/_async/http11.py +++ b/httpcore/_async/http11.py @@ -264,6 +264,9 @@ async def aclose(self) -> None: def can_handle_request(self, origin: Origin) -> bool: return origin == self._origin + def is_connected(self) -> bool: + return not self.is_closed() + def is_available(self) -> bool: # Note that HTTP/1.1 connections in the "NEW" state are not treated as # being "available". The control flow which created the connection will diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index c6434a04..0ad3fb18 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -499,6 +499,9 @@ async def _wait_for_outgoing_flow(self, request: Request, stream_id: int) -> int def can_handle_request(self, origin: Origin) -> bool: return origin == self._origin + def is_connected(self) -> bool: + return not self.is_closed() + def is_available(self) -> bool: return ( self._state != HTTPConnectionState.CLOSED diff --git a/httpcore/_async/http_proxy.py b/httpcore/_async/http_proxy.py index cc9d9206..05da2552 100644 --- a/httpcore/_async/http_proxy.py +++ b/httpcore/_async/http_proxy.py @@ -214,6 +214,9 @@ async def aclose(self) -> None: def info(self) -> str: return self._connection.info() + def is_connected(self) -> bool: + return self._connection.is_connected() + def is_available(self) -> bool: return self._connection.is_available() @@ -351,6 +354,9 @@ async def aclose(self) -> None: def info(self) -> str: return self._connection.info() + def is_connected(self) -> bool: + return self._connection.is_connected() + def is_available(self) -> bool: return self._connection.is_available() diff --git a/httpcore/_async/interfaces.py b/httpcore/_async/interfaces.py index 361583be..e909db43 100644 --- a/httpcore/_async/interfaces.py +++ b/httpcore/_async/interfaces.py @@ -94,6 +94,14 @@ def info(self) -> str: def can_handle_request(self, origin: Origin) -> bool: raise NotImplementedError() # pragma: nocover + def is_connected(self) -> bool: + """ + Return `True` if the connection is open. + + Beware that for some implementations `is_connected() != not is_closed()`. + """ + raise NotImplementedError() # pragma: nocover + def is_available(self) -> bool: """ Return `True` if the connection is currently able to accept an diff --git a/httpcore/_async/socks_proxy.py b/httpcore/_async/socks_proxy.py index b363f55a..5475ee31 100644 --- a/httpcore/_async/socks_proxy.py +++ b/httpcore/_async/socks_proxy.py @@ -305,6 +305,9 @@ async def aclose(self) -> None: if self._connection is not None: await self._connection.aclose() + def is_connected(self) -> bool: + return self._connection and self._connection.is_connected() + def is_available(self) -> bool: if self._connection is None: # pragma: nocover # If HTTP/2 support is enabled, and the resulting connection could diff --git a/httpcore/_sync/connection.py b/httpcore/_sync/connection.py index 363f8be8..2712ea83 100644 --- a/httpcore/_sync/connection.py +++ b/httpcore/_sync/connection.py @@ -172,6 +172,9 @@ def close(self) -> None: with Trace("close", logger, None, {}): self._connection.close() + def is_connected(self) -> bool: + return self._connection and self._connection.is_connected() + def is_available(self) -> bool: if self._connection is None: # If HTTP/2 support is enabled, and the resulting connection could diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index 9ccfa53e..7c7e65f8 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -278,6 +278,7 @@ def _assign_requests_to_connections(self) -> list[ConnectionInterface]: those connections to be handled seperately. """ closing_connections = [] + request_connections = {request.connection for request in self._requests} # First we handle cleaning up any connections that are closed, # have expired their keep-alive, or surplus idle connections. @@ -285,6 +286,9 @@ def _assign_requests_to_connections(self) -> list[ConnectionInterface]: if connection.is_closed(): # log: "removing closed connection" self._connections.remove(connection) + elif not (connection.is_connected() or connection in request_connections): + # log: "removing garbage connection" + self._connections.remove(connection) elif connection.has_expired(): # log: "closing expired connection" self._connections.remove(connection) diff --git a/httpcore/_sync/http11.py b/httpcore/_sync/http11.py index ebd3a974..d315e579 100644 --- a/httpcore/_sync/http11.py +++ b/httpcore/_sync/http11.py @@ -264,6 +264,9 @@ def close(self) -> None: def can_handle_request(self, origin: Origin) -> bool: return origin == self._origin + def is_connected(self) -> bool: + return not self.is_closed() + def is_available(self) -> bool: # Note that HTTP/1.1 connections in the "NEW" state are not treated as # being "available". The control flow which created the connection will diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index ca4dd724..3933eaaf 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -499,6 +499,9 @@ def _wait_for_outgoing_flow(self, request: Request, stream_id: int) -> int: def can_handle_request(self, origin: Origin) -> bool: return origin == self._origin + def is_connected(self) -> bool: + return not self.is_closed() + def is_available(self) -> bool: return ( self._state != HTTPConnectionState.CLOSED diff --git a/httpcore/_sync/http_proxy.py b/httpcore/_sync/http_proxy.py index ecca88f7..10e1e77e 100644 --- a/httpcore/_sync/http_proxy.py +++ b/httpcore/_sync/http_proxy.py @@ -214,6 +214,9 @@ def close(self) -> None: def info(self) -> str: return self._connection.info() + def is_connected(self) -> bool: + return self._connection.is_connected() + def is_available(self) -> bool: return self._connection.is_available() @@ -351,6 +354,9 @@ def close(self) -> None: def info(self) -> str: return self._connection.info() + def is_connected(self) -> bool: + return self._connection.is_connected() + def is_available(self) -> bool: return self._connection.is_available() diff --git a/httpcore/_sync/interfaces.py b/httpcore/_sync/interfaces.py index e673d4cc..5fa57a74 100644 --- a/httpcore/_sync/interfaces.py +++ b/httpcore/_sync/interfaces.py @@ -94,6 +94,14 @@ def info(self) -> str: def can_handle_request(self, origin: Origin) -> bool: raise NotImplementedError() # pragma: nocover + def is_connected(self) -> bool: + """ + Return `True` if the connection is open. + + Beware that for some implementations `is_connected() != not is_closed()`. + """ + raise NotImplementedError() # pragma: nocover + def is_available(self) -> bool: """ Return `True` if the connection is currently able to accept an diff --git a/httpcore/_sync/socks_proxy.py b/httpcore/_sync/socks_proxy.py index 0ca96ddf..6b97ddc0 100644 --- a/httpcore/_sync/socks_proxy.py +++ b/httpcore/_sync/socks_proxy.py @@ -305,6 +305,9 @@ def close(self) -> None: if self._connection is not None: self._connection.close() + def is_connected(self) -> bool: + return self._connection and self._connection.is_connected() + def is_available(self) -> bool: if self._connection is None: # pragma: nocover # If HTTP/2 support is enabled, and the resulting connection could diff --git a/tests/_async/test_connection.py b/tests/_async/test_connection.py index b6ee0c7e..efd17e81 100644 --- a/tests/_async/test_connection.py +++ b/tests/_async/test_connection.py @@ -36,6 +36,7 @@ async def test_http_connection(): origin=origin, network_backend=network_backend, keepalive_expiry=5.0 ) as conn: assert not conn.is_idle() + assert not conn.is_connected() assert not conn.is_closed() assert not conn.is_available() assert not conn.has_expired() @@ -52,6 +53,7 @@ async def test_http_connection(): assert response.content == b"Hello, world!" assert conn.is_idle() + assert conn.is_connected() assert not conn.is_closed() assert conn.is_available() assert not conn.has_expired() diff --git a/tests/_async/test_http11.py b/tests/_async/test_http11.py index 94f2febf..f689447e 100644 --- a/tests/_async/test_http11.py +++ b/tests/_async/test_http11.py @@ -23,6 +23,7 @@ async def test_http11_connection(): assert response.content == b"Hello, world!" assert conn.is_idle() + assert conn.is_connected() assert not conn.is_closed() assert conn.is_available() assert not conn.has_expired() diff --git a/tests/_async/test_http2.py b/tests/_async/test_http2.py index b4ec6648..a2734ef2 100644 --- a/tests/_async/test_http2.py +++ b/tests/_async/test_http2.py @@ -35,6 +35,7 @@ async def test_http2_connection(): assert conn.is_idle() assert conn.is_available() + assert conn.is_connected() assert not conn.is_closed() assert not conn.has_expired() assert ( diff --git a/tests/_sync/test_connection.py b/tests/_sync/test_connection.py index 37c82e02..21f72f11 100644 --- a/tests/_sync/test_connection.py +++ b/tests/_sync/test_connection.py @@ -36,6 +36,7 @@ def test_http_connection(): origin=origin, network_backend=network_backend, keepalive_expiry=5.0 ) as conn: assert not conn.is_idle() + assert not conn.is_connected() assert not conn.is_closed() assert not conn.is_available() assert not conn.has_expired() @@ -52,6 +53,7 @@ def test_http_connection(): assert response.content == b"Hello, world!" assert conn.is_idle() + assert conn.is_connected() assert not conn.is_closed() assert conn.is_available() assert not conn.has_expired() diff --git a/tests/_sync/test_http11.py b/tests/_sync/test_http11.py index f2fa28f4..5e9bcecd 100644 --- a/tests/_sync/test_http11.py +++ b/tests/_sync/test_http11.py @@ -23,6 +23,7 @@ def test_http11_connection(): assert response.content == b"Hello, world!" assert conn.is_idle() + assert conn.is_connected() assert not conn.is_closed() assert conn.is_available() assert not conn.has_expired() diff --git a/tests/_sync/test_http2.py b/tests/_sync/test_http2.py index 695359bd..78d0a6af 100644 --- a/tests/_sync/test_http2.py +++ b/tests/_sync/test_http2.py @@ -35,6 +35,7 @@ def test_http2_connection(): assert conn.is_idle() assert conn.is_available() + assert conn.is_connected() assert not conn.is_closed() assert not conn.has_expired() assert ( diff --git a/tests/test_cancellations.py b/tests/test_cancellations.py index 033acef6..5c00c602 100644 --- a/tests/test_cancellations.py +++ b/tests/test_cancellations.py @@ -1,4 +1,5 @@ import typing +from unittest.mock import patch import anyio import hpack @@ -135,6 +136,29 @@ async def test_connection_pool_timeout_during_response(): assert not pool.connections +@pytest.mark.anyio +async def test_connection_pool_cancellation_during_waiting_for_connection(): + """ + A cancellation of ongoing request waiting for a connection should leave + the pool in a consistent state. + + In this case, that means the connection will become closed, and no + longer remain in the pool. + """ + + async def wait_for_connection(self, *args, **kwargs): + await anyio.sleep(999) + + with patch( + "httpcore._async.connection_pool.AsyncPoolRequest.wait_for_connection", + new=wait_for_connection, + ): + async with httpcore.AsyncConnectionPool() as pool: + with anyio.move_on_after(0.01): + await pool.request("GET", "http://example.com") + assert not pool.connections + + @pytest.mark.anyio async def test_h11_timeout_during_request(): """