Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add asyncio backend #930

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion docs/network-backends.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,29 @@ while True:

If we're working with an `async` codebase, then we need to select a different backend.

The `httpcore.AnyIOBackend` is suitable for usage if you're running under `asyncio`. This is a networking backend implemented using [the `anyio` package](https://anyio.readthedocs.io/en/3.x/).
These `async` network backends are available:
- `httpcore.AsyncioBackend` This networking backend is implemented using Pythons native `asyncio`.
tomchristie marked this conversation as resolved.
Show resolved Hide resolved
- `httpcore.AnyIOBackend` This is implemented using [the `anyio` package](https://anyio.readthedocs.io/en/3.x/).
- `httpcore.TrioBackend` This is implemented using [`trio`](https://trio.readthedocs.io/en/stable/).

Currently by default `AnyIOBackend` is used when running with `asyncio` (this may change).
`TrioBackend` is used by default when running with `trio`.

Using `httpcore.AsyncioBackend`:
tomchristie marked this conversation as resolved.
Show resolved Hide resolved
```python
import httpcore
import asyncio

async def main():
network_backend = httpcore.AsyncioBackend()
tomchristie marked this conversation as resolved.
Show resolved Hide resolved
async with httpcore.AsyncConnectionPool(network_backend=network_backend) as http:
response = await http.request('GET', 'https://www.example.com')
print(response)

asyncio.run(main())
```

Using `httpcore.AnyIOBackend`:
```python
import httpcore
import asyncio
Expand Down
2 changes: 2 additions & 0 deletions httpcore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
AsyncHTTPProxy,
AsyncSOCKSProxy,
)
from ._backends.asyncio import AsyncIOBackend
from ._backends.base import (
SOCKET_OPTION,
AsyncNetworkBackend,
Expand Down Expand Up @@ -97,6 +98,7 @@ def __init__(self, *args, **kwargs): # type: ignore
"SOCKSProxy",
# network backends, implementations
"SyncBackend",
"AsyncIOBackend",
"AnyIOBackend",
"TrioBackend",
# network backends, mock implementations
Expand Down
228 changes: 228 additions & 0 deletions httpcore/_backends/asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
import asyncio
import socket
import ssl
from typing import Any, Dict, Iterable, Optional, Type

from .._exceptions import (
ConnectError,
ConnectTimeout,
ReadError,
ReadTimeout,
WriteError,
WriteTimeout,
map_exceptions,
)
from .._utils import is_socket_readable
from .base import SOCKET_OPTION, AsyncNetworkBackend, AsyncNetworkStream


class AsyncIOStream(AsyncNetworkStream):
def __init__(
self, stream_reader: asyncio.StreamReader, stream_writer: asyncio.StreamWriter
):
self._stream_reader = stream_reader
self._stream_writer = stream_writer
self._read_lock = asyncio.Lock()
self._write_lock = asyncio.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use locking at the stream layer, let's remove these.

(I think we had locking at this layer in the past, the locking is now handled at the HTTP11Connection/HTTP2Connection layer)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, they are now gone

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self._read_lock = asyncio.Lock()
self._write_lock = asyncio.Lock()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

self._inner: Optional[AsyncIOStream] = None

async def start_tls(
self,
ssl_context: ssl.SSLContext,
server_hostname: Optional[str] = None,
timeout: Optional[float] = None,
) -> AsyncNetworkStream:
loop = asyncio.get_event_loop()

stream_reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(stream_reader)

exc_map: Dict[Type[Exception], Type[Exception]] = {
asyncio.TimeoutError: ConnectTimeout,
OSError: ConnectError,
}
with map_exceptions(exc_map):
transport_ssl = await asyncio.wait_for(
loop.start_tls(
self._stream_writer.transport,
protocol,
ssl_context,
server_hostname=server_hostname,
),
timeout,
)
if transport_ssl is None:
# https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.start_tls
raise ConnectError("Transport closed while starting TLS") # pragma: nocover

# Initialize the protocol, so it is made aware of being tied to
# a TLS connection.
# See: https://github.com/encode/httpx/issues/859
protocol.connection_made(transport_ssl)

stream_writer = asyncio.StreamWriter(
transport=transport_ssl, protocol=protocol, reader=stream_reader, loop=loop
)

ssl_stream = AsyncIOStream(stream_reader, stream_writer)
# When we return a new SocketStream with new StreamReader/StreamWriter instances
# we need to keep references to the old StreamReader/StreamWriter so that they
# are not garbage collected and closed while we're still using them.
ssl_stream._inner = self
return ssl_stream

async def read(self, max_bytes: int, timeout: Optional[float] = None) -> bytes:
exc_map: Dict[Type[Exception], Type[Exception]] = {
asyncio.TimeoutError: ReadTimeout,
OSError: ReadError,
}
async with self._read_lock:
with map_exceptions(exc_map):
try:
return await asyncio.wait_for(
self._stream_reader.read(max_bytes), timeout
)
except AttributeError as exc: # pragma: nocover
if "resume_reading" in str(exc):
# Python's asyncio has a bug that can occur when a
# connection has been closed, while it is paused.
# See: https://github.com/encode/httpx/issues/1213
#
# Returning an empty byte-string to indicate connection
# close will eventually raise an httpcore.RemoteProtocolError
# to the user when this goes through our HTTP parsing layer.
return b""
raise
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async with self._read_lock:
with map_exceptions(exc_map):
try:
return await asyncio.wait_for(
self._stream_reader.read(max_bytes), timeout
)
except AttributeError as exc: # pragma: nocover
if "resume_reading" in str(exc):
# Python's asyncio has a bug that can occur when a
# connection has been closed, while it is paused.
# See: https://github.com/encode/httpx/issues/1213
#
# Returning an empty byte-string to indicate connection
# close will eventually raise an httpcore.RemoteProtocolError
# to the user when this goes through our HTTP parsing layer.
return b""
raise
with map_exceptions(exc_map):
try:
return await asyncio.wait_for(
self._stream_reader.read(max_bytes), timeout
)
except AttributeError as exc: # pragma: nocover
if "resume_reading" in str(exc):
# Python's asyncio has a bug that can occur when a
# connection has been closed, while it is paused.
# See: https://github.com/encode/httpx/issues/1213
#
# Returning an empty byte-string to indicate connection
# close will eventually raise an httpcore.RemoteProtocolError
# to the user when this goes through our HTTP parsing layer.
return b""
raise

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


async def write(self, data: bytes, timeout: Optional[float] = None) -> None:
if not data:
return

exc_map: Dict[Type[Exception], Type[Exception]] = {
asyncio.TimeoutError: WriteTimeout,
OSError: WriteError,
}
async with self._write_lock:
with map_exceptions(exc_map):
self._stream_writer.write(data)
return await asyncio.wait_for(self._stream_writer.drain(), timeout)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async with self._write_lock:
with map_exceptions(exc_map):
self._stream_writer.write(data)
return await asyncio.wait_for(self._stream_writer.drain(), timeout)
with map_exceptions(exc_map):
self._stream_writer.write(data)
return await asyncio.wait_for(self._stream_writer.drain(), timeout)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


async def aclose(self) -> None:
# SSL connections should issue the close and then abort, rather than
# waiting for the remote end of the connection to signal the EOF.
#
# See:
#
# * https://bugs.python.org/issue39758
# * https://github.com/python-trio/trio/blob/
# 31e2ae866ad549f1927d45ce073d4f0ea9f12419/trio/_ssl.py#L779-L829
#
# And related issues caused if we simply omit the 'wait_closed' call,
# without first using `.abort()`
#
# * https://github.com/encode/httpx/issues/825
# * https://github.com/encode/httpx/issues/914
is_ssl = self._sslobj is not None

async with self._write_lock:
try:
self._stream_writer.close()
if is_ssl:
# Give the connection a chance to write any data in the buffer,
# and then forcibly tear down the SSL connection.
await asyncio.sleep(0)
self._stream_writer.transport.abort()
await self._stream_writer.wait_closed()
except OSError: # pragma: nocover
pass
tomchristie marked this conversation as resolved.
Show resolved Hide resolved

def get_extra_info(self, info: str) -> Any:
if info == "is_readable":
return is_socket_readable(self._raw_socket)
if info == "ssl_object":
return self._sslobj
if info in ("client_addr", "server_addr"):
sock = self._raw_socket
if sock is None: # pragma: nocover
# TODO replace with an explicit error such as BrokenSocketError
raise ConnectError()
return sock.getsockname() if info == "client_addr" else sock.getpeername()
if info == "socket":
return self._raw_socket
return None

@property
def _raw_socket(self) -> Optional[socket.socket]:
transport = self._stream_writer.transport
sock: Optional[socket.socket] = transport.get_extra_info("socket")
return sock

@property
def _sslobj(self) -> Optional[ssl.SSLObject]:
transport = self._stream_writer.transport
sslobj: Optional[ssl.SSLObject] = transport.get_extra_info("ssl_object")
return sslobj


class AsyncIOBackend(AsyncNetworkBackend):
async def connect_tcp(
self,
host: str,
port: int,
timeout: Optional[float] = None,
local_address: Optional[str] = None,
socket_options: Optional[Iterable[SOCKET_OPTION]] = None,
) -> AsyncNetworkStream:
local_addr = None if local_address is None else (local_address, 0)

exc_map: Dict[Type[Exception], Type[Exception]] = {
asyncio.TimeoutError: ConnectTimeout,
OSError: ConnectError,
}
with map_exceptions(exc_map):
stream_reader, stream_writer = await asyncio.wait_for(
asyncio.open_connection(host, port, local_addr=local_addr),
timeout,
)
self._set_socket_options(stream_writer, socket_options)
return AsyncIOStream(
stream_reader=stream_reader, stream_writer=stream_writer
)

async def connect_unix_socket(
self,
path: str,
timeout: Optional[float] = None,
socket_options: Optional[Iterable[SOCKET_OPTION]] = None,
) -> AsyncNetworkStream:
exc_map: Dict[Type[Exception], Type[Exception]] = {
asyncio.TimeoutError: ConnectTimeout,
OSError: ConnectError,
}
with map_exceptions(exc_map):
stream_reader, stream_writer = await asyncio.wait_for(
asyncio.open_unix_connection(path), timeout
)
self._set_socket_options(stream_writer, socket_options)
return AsyncIOStream(
stream_reader=stream_reader, stream_writer=stream_writer
)

async def sleep(self, seconds: float) -> None:
await asyncio.sleep(seconds) # pragma: nocover

def _set_socket_options(
self,
stream: asyncio.StreamWriter,
socket_options: Optional[Iterable[SOCKET_OPTION]] = None,
) -> None:
if not socket_options:
return

sock = stream.get_extra_info("socket")
if sock is None: # pragma: nocover
# TODO replace with an explicit error such as BrokenSocketError
raise ConnectError()

for option in socket_options:
sock.setsockopt(*option)
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ ignore_missing_imports = true

[tool.pytest.ini_options]
addopts = ["-rxXs", "--strict-config", "--strict-markers"]
markers = ["copied_from(source, changes=None): mark test as copied from somewhere else, along with a description of changes made to accodomate e.g. our test setup"]
markers = [
"copied_from(source, changes=None): mark test as copied from somewhere else, along with a description of changes made to accodomate e.g. our test setup",
"no_auto_backend_patch", # TODO remove this marker once we have a way to define the asyncio backend in AutoBackend
]
filterwarnings = ["error"]

[tool.coverage.run]
Expand Down
84 changes: 84 additions & 0 deletions tests/_async/test_integration.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import os
import socket
import ssl
from tempfile import gettempdir

import pytest
import uvicorn

import httpcore
from tests.conftest import Server


@pytest.mark.anyio
Expand Down Expand Up @@ -49,3 +54,82 @@ async def test_extra_info(httpbin_secure):
assert invalid is None

stream.get_extra_info("is_readable")


@pytest.mark.anyio
@pytest.mark.parametrize("keep_alive_enabled", [True, False])
async def test_socket_options(
server: Server, server_url: str, keep_alive_enabled: bool
) -> None:
socket_options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, int(keep_alive_enabled))]
async with httpcore.AsyncConnectionPool(socket_options=socket_options) as pool:
response = await pool.request("GET", server_url)
assert response.status == 200

stream = response.extensions["network_stream"]
sock = stream.get_extra_info("socket")
opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE)
assert bool(opt) is keep_alive_enabled


@pytest.mark.anyio
async def test_socket_no_nagle(server: Server, server_url: str) -> None:
async with httpcore.AsyncConnectionPool() as pool:
response = await pool.request("GET", server_url)
assert response.status == 200

stream = response.extensions["network_stream"]
sock = stream.get_extra_info("socket")
opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
assert bool(opt) is True


@pytest.mark.anyio
async def test_pool_recovers_from_connection_breakage(
server_config: uvicorn.Config, server_url: str
) -> None:
async with httpcore.AsyncConnectionPool(
max_connections=1, max_keepalive_connections=1, keepalive_expiry=10
) as pool:
with Server(server_config).run_in_thread():
response = await pool.request("GET", server_url)
assert response.status == 200

assert len(pool.connections) == 1
conn = pool.connections[0]

stream = response.extensions["network_stream"]
assert stream.get_extra_info("is_readable") is False

assert (
stream.get_extra_info("is_readable") is True
), "Should break by coming readable"

with Server(server_config).run_in_thread():
assert len(pool.connections) == 1
assert pool.connections[0] is conn, "Should be the broken connection"

response = await pool.request("GET", server_url)
assert response.status == 200

assert len(pool.connections) == 1
assert pool.connections[0] is not conn, "Should be a new connection"


@pytest.mark.anyio
async def test_unix_domain_socket(server_port, server_config, server_url):
uds = f"{gettempdir()}/test_httpcore_app.sock"
if os.path.exists(uds):
os.remove(uds) # pragma: nocover

uds_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
uds_sock.bind(uds)

with Server(server_config).run_in_thread(sockets=[uds_sock]):
async with httpcore.AsyncConnectionPool(uds=uds) as pool:
response = await pool.request("GET", server_url)
assert response.status == 200
finally:
uds_sock.close()
os.remove(uds)
Loading
Loading