Skip to content

Commit

Permalink
release 0.0.205 (#25)
Browse files Browse the repository at this point in the history
* optimize tasks cleanup

* ignore empty bytes on yield

* optimize

---------

Co-authored-by: nggit <[email protected]>
  • Loading branch information
nggit and nggit committed Oct 31, 2023
1 parent 8ec0982 commit a70be32
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 23 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

setup(
name='tremolo',
version='0.0.204',
version='0.0.205',
license='MIT',
author='nggit',
author_email='[email protected]',
Expand Down
7 changes: 7 additions & 0 deletions tests/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,15 @@ async def upload(content_type=b'application/octet-stream', **server):
@app.route('/upload/multipart')
async def upload_multipart(stream=False, **server):
server['response'].set_content_type(b'text/csv')

# should be ignored
yield b''

yield b'name,length,type,data\r\n'

# should be ignored
yield b''

# stream multipart file upload then send it back as csv
async for info, data in server['request'].files():
yield b'%s,%d,%s,%s\r\n' % (info['name'].encode(),
Expand Down
2 changes: 1 addition & 1 deletion tremolo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '0.0.204'
__version__ = '0.0.205'

from .tremolo import Tremolo # noqa: E402
from . import exceptions # noqa: E402,F401
Expand Down
2 changes: 1 addition & 1 deletion tremolo/asgi_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ async def send(self, data):
)

if data['type'] == 'http.response.body':
if 'body' in data:
if 'body' in data and data['body'] != b'':
await self.response.write(
data['body'],
chunked=self._http_chunked,
Expand Down
12 changes: 8 additions & 4 deletions tremolo/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,19 @@ async def _handle_response(self, func, options={}):
if options.get('stream', True):
buffer_min_size = None

await self.response.write(data,
rate=options['rate'],
buffer_size=options['buffer_size'],
buffer_min_size=buffer_min_size)
if data != b'':
await self.response.write(data,
rate=options['rate'],
buffer_size=options['buffer_size'],
buffer_min_size=buffer_min_size)

while True:
try:
data = await next_data()

if data == b'':
continue

await self.response.write(
data,
rate=options['rate'],
Expand Down
8 changes: 6 additions & 2 deletions tremolo/lib/h1parser/parse_header.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,12 @@ def remove(self, *args):
if not args:
return self

for i, v in enumerate(self._headers):
if v[0] in args:
i = len(self._headers)

while i > 0:
i -= 1

if self._headers[i][0] in args:
del self._headers[i]

return self
Expand Down
31 changes: 17 additions & 14 deletions tremolo/lib/http_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,12 @@ def connection_made(self, transport):
self._header_buf = bytearray()
self._waiters['request'] = self._loop.create_future()

self.tasks.extend([
self._loop.create_task(self._send_data()).cancel,
self._loop.create_task(self.set_timeout(
self._waiters['request'],
timeout=self._options['request_timeout'],
timeout_cb=self.request_timeout))
])
self.tasks.append(self._loop.create_task(self._send_data()).cancel)
self.tasks.append(self._loop.create_task(self.set_timeout(
self._waiters['request'],
timeout=self._options['request_timeout'],
timeout_cb=self.request_timeout))
)

async def request_timeout(self, timeout):
self._logger.info('request timeout after %gs' % timeout)
Expand Down Expand Up @@ -442,17 +441,21 @@ def _handle_keepalive(self):
)
return

for task in self.tasks[:]:
if callable(task):
i = len(self.tasks)

while i > 0:
i -= 1

if callable(self.tasks[i]):
continue

try:
exc = task.exception()
exc = self.tasks[i].exception()

if exc:
self.print_exception(exc)

self.tasks.remove(task)
del self.tasks[i]
except asyncio.InvalidStateError:
pass

Expand Down Expand Up @@ -480,7 +483,9 @@ def connection_lost(self, exc):
if self in self._options['_connections']:
del self._options['_connections'][self]

for task in self.tasks:
while self.tasks:
task = self.tasks.pop()

try:
if callable(task):
# even if you put callable objects in self.tasks,
Expand All @@ -498,8 +503,6 @@ def connection_lost(self, exc):
except Exception as exc:
self.print_exception(exc)

self.tasks.clear()

for queue in self._queue:
if not queue.clear():
break
Expand Down

0 comments on commit a70be32

Please sign in to comment.