Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove shielding from cancellation process. #927

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added httpcore/.DS_Store
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably don't want this file

Binary file not shown.
42 changes: 26 additions & 16 deletions httpcore/_async/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
map_exceptions,
)
from .._models import Origin, Request, Response
from .._synchronization import AsyncLock, AsyncShieldCancellation
from .._synchronization import AsyncThreadLock
from .._trace import Trace
from .interfaces import AsyncConnectionInterface

Expand Down Expand Up @@ -62,7 +62,9 @@ def __init__(
self._keepalive_expiry: Optional[float] = keepalive_expiry
self._expire_at: Optional[float] = None
self._state = HTTPConnectionState.NEW
self._state_lock = AsyncLock()
self._state_thread_lock = (
AsyncThreadLock()
) # thread-lock for sync, no-op for async
self._request_count = 0
self._h11_state = h11.Connection(
our_role=h11.CLIENT,
Expand All @@ -76,7 +78,9 @@ async def handle_async_request(self, request: Request) -> Response:
f"to {self._origin}"
)

async with self._state_lock:
with self._state_thread_lock:
# We ensure that state changes at the start and end of a
# request/response cycle are thread-locked.
if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
self._request_count += 1
self._state = HTTPConnectionState.ACTIVE
Expand Down Expand Up @@ -137,9 +141,8 @@ async def handle_async_request(self, request: Request) -> Response:
},
)
except BaseException as exc:
with AsyncShieldCancellation():
async with Trace("response_closed", logger, request) as trace:
await self._response_closed()
if self._connection_should_close():
await self._network_stream.aclose()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't tracing anymore

raise exc

# Sending the request...
Expand Down Expand Up @@ -242,8 +245,12 @@ async def _receive_event(
# mypy fails to narrow the type in the above if statement above
return event # type: ignore[return-value]

async def _response_closed(self) -> None:
async with self._state_lock:
def _connection_should_close(self) -> bool:
# Once the response is complete we either need to move into
# an IDLE or CLOSED state.
with self._state_thread_lock:
# We ensure that state changes at the start and end of a
# request/response cycle are thread-locked.
if (
self._h11_state.our_state is h11.DONE
and self._h11_state.their_state is h11.DONE
Expand All @@ -253,8 +260,10 @@ async def _response_closed(self) -> None:
if self._keepalive_expiry is not None:
now = time.monotonic()
self._expire_at = now + self._keepalive_expiry
else:
await self.aclose()
return False

self._state = HTTPConnectionState.CLOSED
return True
Comment on lines +263 to +266
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why returning bool here? all references are trying to close connection, why avoid do it at here?


# Once the connection is no longer required...

Expand Down Expand Up @@ -344,15 +353,16 @@ async def __aiter__(self) -> AsyncIterator[bytes]:
# If we get an exception while streaming the response,
# we want to close the response (and possibly the connection)
# before raising that exception.
with AsyncShieldCancellation():
await self.aclose()
if self._connection._connection_should_close():
await self._connection.aclose()
raise exc

async def aclose(self) -> None:
if not self._closed:
self._closed = True
async with Trace("response_closed", logger, self._request):
await self._connection._response_closed()
async with Trace("response_closed", logger, self._request, kwargs={}):
if not self._closed:
self._closed = True
if self._connection._connection_should_close():
Comment on lines +361 to +364
Copy link
Contributor

@MarkusSintonen MarkusSintonen Jun 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the await inside the Trace.__aenter__ cause issues here? Ie can there be a cancellation with the await inside it that would cause a skip of the self._closed-checks and self._connection._connection_should_close()? (Might be a stupid question so sorry about that :D)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes this is a problem, It's also a problem if the trace function raises, I think you'll need to do this:

# synchronization.py
aclose_forcefully = anyio.aclose_forcefully
def close_forcefully(sock):
    sock.close()
    async def aclose(self):
        entered_cmgr = False
        try:
            async with Trace("response_closed", logger, self._request, kwargs={}):
                entered_cmgr = True
                if not self._closed:
                    self._closed = True
                    if self._connection._connection_should_close():
                        await self._connection.aclose()
        except BaseException:
            if entered_cmgr:
                raise
            if not self._closed:
                self._closed = True
                if self._connection._connection_should_close():
                    await aclose_forcefully(self._connection)
            raise

await self._connection.aclose()


class AsyncHTTP11UpgradeStream(AsyncNetworkStream):
Expand Down
42 changes: 26 additions & 16 deletions httpcore/_sync/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
map_exceptions,
)
from .._models import Origin, Request, Response
from .._synchronization import Lock, ShieldCancellation
from .._synchronization import ThreadLock
from .._trace import Trace
from .interfaces import ConnectionInterface

Expand Down Expand Up @@ -62,7 +62,9 @@ def __init__(
self._keepalive_expiry: Optional[float] = keepalive_expiry
self._expire_at: Optional[float] = None
self._state = HTTPConnectionState.NEW
self._state_lock = Lock()
self._state_thread_lock = (
ThreadLock()
) # thread-lock for sync, no-op for async
self._request_count = 0
self._h11_state = h11.Connection(
our_role=h11.CLIENT,
Expand All @@ -76,7 +78,9 @@ def handle_request(self, request: Request) -> Response:
f"to {self._origin}"
)

with self._state_lock:
with self._state_thread_lock:
# We ensure that state changes at the start and end of a
# request/response cycle are thread-locked.
if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
self._request_count += 1
self._state = HTTPConnectionState.ACTIVE
Expand Down Expand Up @@ -137,9 +141,8 @@ def handle_request(self, request: Request) -> Response:
},
)
except BaseException as exc:
with ShieldCancellation():
with Trace("response_closed", logger, request) as trace:
self._response_closed()
if self._connection_should_close():
self._network_stream.close()
raise exc

# Sending the request...
Expand Down Expand Up @@ -242,8 +245,12 @@ def _receive_event(
# mypy fails to narrow the type in the above if statement above
return event # type: ignore[return-value]

def _response_closed(self) -> None:
with self._state_lock:
def _connection_should_close(self) -> bool:
# Once the response is complete we either need to move into
# an IDLE or CLOSED state.
with self._state_thread_lock:
# We ensure that state changes at the start and end of a
# request/response cycle are thread-locked.
if (
self._h11_state.our_state is h11.DONE
and self._h11_state.their_state is h11.DONE
Expand All @@ -253,8 +260,10 @@ def _response_closed(self) -> None:
if self._keepalive_expiry is not None:
now = time.monotonic()
self._expire_at = now + self._keepalive_expiry
else:
self.close()
return False

self._state = HTTPConnectionState.CLOSED
return True

# Once the connection is no longer required...

Expand Down Expand Up @@ -344,15 +353,16 @@ def __iter__(self) -> Iterator[bytes]:
# If we get an exception while streaming the response,
# we want to close the response (and possibly the connection)
# before raising that exception.
with ShieldCancellation():
self.close()
if self._connection._connection_should_close():
self._connection.close()
raise exc

def close(self) -> None:
if not self._closed:
self._closed = True
with Trace("response_closed", logger, self._request):
self._connection._response_closed()
with Trace("response_closed", logger, self._request, kwargs={}):
if not self._closed:
self._closed = True
if self._connection._connection_should_close():
self._connection.close()


class HTTP11UpgradeStream(NetworkStream):
Expand Down
2 changes: 0 additions & 2 deletions tests/_async/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,6 @@ async def trace(name, kwargs):
"http11.send_request_body.complete",
"http11.receive_response_headers.started",
"http11.receive_response_headers.failed",
"http11.response_closed.started",
"http11.response_closed.complete",
]


Expand Down
2 changes: 0 additions & 2 deletions tests/_sync/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,6 @@ def trace(name, kwargs):
"http11.send_request_body.complete",
"http11.receive_response_headers.started",
"http11.receive_response_headers.failed",
"http11.response_closed.started",
"http11.response_closed.complete",
]


Expand Down
Loading