Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ASGI Support #529

Closed
wants to merge 15 commits into from
4 changes: 2 additions & 2 deletions guillotina/api/files.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import mimetypes

from aiohttp.web import StreamResponse
from guillotina.response import StreamResponse
from guillotina import configure
from guillotina._settings import app_settings
from guillotina.api.content import DefaultOPTIONS
Expand Down Expand Up @@ -76,7 +76,7 @@ async def serve_file(self, fi):
filepath = str(fi.file_path.absolute())
filename = fi.file_path.name
with open(filepath, 'rb') as f:
resp = StreamResponse()
resp = StreamResponse(status=200)
resp.content_type, _ = mimetypes.guess_type(filename)

disposition = 'filename="{}"'.format(filename)
Expand Down
7 changes: 6 additions & 1 deletion guillotina/api/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from guillotina._settings import app_settings
from guillotina.api.service import Service
from guillotina.auth.extractors import BasicAuthPolicy
from guillotina.request import GuillotinaRequest
from guillotina.component import get_adapter
from guillotina.component import get_utility
from guillotina.component import query_multi_adapter
Expand Down Expand Up @@ -187,7 +188,11 @@ async def handle_ws_request(self, ws, message):
async def __call__(self):
tm = get_tm(self.request)
await tm.abort(self.request)
ws = web.WebSocketResponse()

if isinstance(self.request, GuillotinaRequest):
ws = self.request.get_ws()
else:
ws = web.WebSocketResponse()
await ws.prepare(self.request)

async for msg in ws:
Expand Down
174 changes: 174 additions & 0 deletions guillotina/asgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
from aiohttp.streams import EmptyStreamReader
from guillotina.request import GuillotinaRequest
import asyncio
import os
import yaml


def headers_to_list(headers):
return [[k.encode(), v.encode()] for k, v in headers.items()]


class AsgiStreamReader(EmptyStreamReader):
"""Dummy implementation of StreamReader"""

def __init__(self, receive):
self.receive = receive
self.finished = False
self._buffer = b""

async def readany(self):
return await self.read()

async def read(self):
if self.finished:
return b""
payload = await self.receive()
self.finished = not payload.get("more_body", False)
return payload["body"]

async def readexactly(self, n: int) -> bytes:
data = b""

if self._buffer:
data += self._buffer[:n]
self._buffer = self._buffer[n:] # rest

while len(data) < n and not self.finished:
chunk = await self.read()
data += chunk

if len(data) < n:
raise asyncio.IncompleteReadError(data, n)

self._buffer += data[n:]
return data


class AsgiStreamWriter():
"""Dummy implementation of StreamWriter"""

buffer_size = 0
output_size = 0
length = 0 # type: Optional[int]

def __init__(self, send):
self.send = send
self._buffer = asyncio.Queue()
self.eof = False

async def write(self, chunk: bytes) -> None:
"""Write chunk into stream."""
await self._buffer.put(chunk)

async def write_eof(self, chunk: bytes=b'') -> None:
"""Write last chunk."""
await self.write(chunk)
self.eof = True
await self.drain()

async def drain(self) -> None:
"""Flush the write buffer."""
while not self._buffer.empty():
body = await self._buffer.get()
await self.send({
"type": "http.response.body",
"body": body,
"more_body": True
})

if self.eof:
await self.send({
"type": "http.response.body",
"body": b"",
"more_body": False
})

def enable_compression(self, encoding: str='deflate') -> None:
"""Enable HTTP body compression"""
raise NotImplemented()

def enable_chunking(self) -> None:
"""Enable HTTP chunked mode"""
raise NotImplemented()


class AsgiApp:
def __init__(self, config_file, settings, loop):
self.app = None
self.config_file = config_file
self.settings = settings
self.loop = loop

async def __call__(self, scope, receive, send):
if scope["type"] == "http" or scope["type"] == "websocket":
# daphne is not sending `lifespan` event
if not self.app:
self.app = await self.startup()
return await self.handler(scope, receive, send)

elif scope["type"] == "lifespan":
while True:
message = await receive()
if message['type'] == 'lifespan.startup':
self.app = await self.startup()
await send({'type': 'lifespan.startup.complete'})
elif message['type'] == 'lifespan.shutdown':
await self.shutdown()
await send({'type': 'lifespan.shutdown.complete'})
return

async def startup(self):
from guillotina.factory.app import startup_app
return await startup_app(
config_file=self.config_file,
settings=self.settings, loop=self.loop, server_app=self)

async def shutdown(self):
pass

async def handler(self, scope, receive, send):
# Aiohttp compatible StreamReader
payload = AsgiStreamReader(receive)

if scope["type"] == "websocket":
scope["method"] = "GET"

request = GuillotinaRequest(
scope["scheme"],
scope["method"],
scope["path"],
scope["headers"],
payload,
loop=self.loop,
send=send,
scope=scope,
receive=receive
)

# This is to fake IRequest interface
request.record = lambda x: None

route = await self.app.router.resolve(request)
resp = await route.handler(request)

# WS handling after view execution missing here!!!
if scope["type"] != "websocket":
from guillotina.response import StreamResponse

if not isinstance(resp, StreamResponse):
await send(
{
"type": "http.response.start",
"status": resp.status,
"headers": headers_to_list(resp.headers)
}
)
body = resp.text or ""
await send({"type": "http.response.body", "body": body.encode()})


# from guillotina.factory.app import make_asgi_app

# # asgi app singleton
# app = make_asgi_app()
5 changes: 2 additions & 3 deletions guillotina/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from guillotina import logger
from guillotina import profile
from guillotina._settings import app_settings
from guillotina.factory import make_app
from guillotina.tests.utils import get_mocked_request
from guillotina.tests.utils import login
from guillotina.utils import get_dotted_name
Expand Down Expand Up @@ -245,9 +244,9 @@ def signal_handler(self, signal, frame):

def make_app(self, settings):
signal.signal(signal.SIGINT, self.signal_handler)
from guillotina.factory.app import make_app
loop = self.get_loop()
return loop.run_until_complete(
make_app(settings=settings, loop=loop))
return make_app(settings=settings, loop=loop)

def get_parser(self):
parser = argparse.ArgumentParser(description=self.description)
Expand Down
95 changes: 95 additions & 0 deletions guillotina/commands/asgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from aiohttp.test_utils import make_mocked_request
from aiohttp.streams import EmptyStreamReader
import asyncio
import multidict
import guillotina
import os
import yaml


def headers_to_list(headers):
return [[k.encode(), v.encode()] for k, v in headers.items()]


class AsgiStreamReader(EmptyStreamReader):
"""Dummy implementation of StreamReader"""

def __init__(self, receive):
self.receive = receive
self.finished = False

async def readany(self):
return await self.read()

async def read(self):
if self.finished:
return b""
payload = await self.receive()
self.finished = True
return payload["body"]


class AsgiApp:
def __init__(self):
self.app = None

async def __call__(self, scope, receive, send):
if scope["type"] == "http":
# daphne is not sending `lifespan` event
if not self.app:
self.app = await self.setup()
return await self.handler(scope, receive, send)

elif scope["type"] == "lifespan":
self.app = await self.setup()

async def setup(self):
# The config file is defined in the env var `CONFIG`
loop = asyncio.get_event_loop()
from guillotina.factory.app import startup_app

config = os.getenv("CONFIG", None)

if not config:
settings = guillotina._settings.default_settings
else:
with open(config, "r") as f:
settings = yaml.load(f, Loader=yaml.FullLoader)
return await startup_app(settings=settings, loop=loop, server_app=self)

async def handler(self, scope, receive, send):
# Copy headers
headers = multidict.CIMultiDict()
raw_headers = scope["headers"]
for key, value in raw_headers:
headers.add(key.decode(), value.decode())

method = scope["method"]
path = scope["path"]

# Aiohttp compatible StreamReader
payload = AsgiStreamReader(receive)

request = make_mocked_request(method, path, headers=headers, payload=payload)

# This is to fake IRequest interface
request.record = lambda x: None
request.__class__ = guillotina.request.Request

route = await self.app.router.resolve(request)
resp = await route.handler(request)

await send(
{
"type": "http.response.start",
"status": resp.status,
"headers": headers_to_list(resp.headers)
}
)

if resp.text:
body = resp.text.encode()
else:
body = b""

await send({"type": "http.response.body", "body": body})
13 changes: 4 additions & 9 deletions guillotina/commands/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from aiohttp import web
import uvicorn
from guillotina.commands import Command

import asyncio
Expand Down Expand Up @@ -41,14 +41,9 @@ def run(self, arguments, settings, app):
'Use `pip install aiohttp_autoreload` to install aiohttp_autoreload.\n'
)
return 1
aiohttp_autoreload.start()

port = arguments.port or settings.get('address', settings.get('port'))
host = arguments.host or settings.get('host', '0.0.0.0')
log_format = settings.get('access_log_format', AccessLogger.LOG_FORMAT)
try:
web.run_app(app, host=host, port=port,
access_log_format=log_format)
except asyncio.CancelledError:
# server shut down, we're good here.
pass
#log_format = settings.get('access_log_format', AccessLogger.LOG_FORMAT)

uvicorn.run(app, host=host, port=port, reload=arguments.reload)
Loading