Skip to content

Commit

Permalink
feat: add ayncio server
Browse files Browse the repository at this point in the history
  • Loading branch information
Neng Wan committed Oct 9, 2024
1 parent de92a75 commit 2505228
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 4 deletions.
2 changes: 2 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
[pytest]
addopts = --strict-markers -vvl --cov=sea --cov-report=term-missing --cov-fail-under=85
markers =
asyncio: mark a test as an asyncio test.
2 changes: 1 addition & 1 deletion sea/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def middlewares(self):
return rv

def _register_servicer(self, servicer):
"""register serviser
"""register servicer
:param servicer: servicer
"""
Expand Down
13 changes: 10 additions & 3 deletions sea/cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,26 @@
"--worker_mode",
required=False,
action="store",
help="Worker mode. threading|multiprocessing",
help="Worker mode. threading|multiprocessing|asycio",
)
def server(worker_mode):
worker_mode = worker_mode or current_app.config["GRPC_WORKER_MODE"]
if worker_mode == "threading":
from sea.server.threading import Server

s = Server(current_app)
else:
s.run()
elif worker_mode == "multiprocessing":
from sea.server.multiprocessing import Server

s = Server(current_app)
s.run()
s.run()
else:
import asyncio
from sea.server.asyncio import Server

s = Server(current_app)
asyncio.run(s.run())
return 0


Expand Down
61 changes: 61 additions & 0 deletions sea/server/asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import signal
import asyncio
from concurrent import futures

import grpc
from grpc_reflection.v1alpha import reflection

from sea import signals


class Server:
"""sea server implements
:param app: application instance
"""

def __init__(self, app):
self.app = app
self.workers = self.app.config["GRPC_WORKERS"]
self.host = self.app.config["GRPC_HOST"]
self.port = self.app.config["GRPC_PORT"]
self.server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=self.workers))
self.server.add_insecure_port("{}:{}".format(self.host, self.port))
self._stopped = False

async def run(self):
self.app.logger.warning("Starting server...")
# run prometheus client
if self.app.config["PROMETHEUS_SCRAPE"]:
from prometheus_client import start_http_server

self.app.logger.warning(f'Starting prometheus client...{self.app.config["PROMETHEUS_PORT"]}')
start_http_server(self.app.config["PROMETHEUS_PORT"])
# register reflection service
if self.app.config.get("GRPC_REFLECTION_SERVICES"):
reflection.enable_server_reflection((reflection.SERVICE_NAME, *self.app.config["GRPC_REFLECTION_SERVICES"]), self.server)
# run grpc server
for _, (add_func, servicer) in self.app.servicers.items():
add_func(servicer(), self.server)
await self.server.start()
signals.server_started.send(self)
self.register_signal()

await self.server.wait_for_termination()
# while not self._stopped:
# await asyncio.sleep(1)
# signals.server_stopped.send(self)
# return True

def register_signal(self):
signal.signal(signal.SIGINT, self._stop_handler)
signal.signal(signal.SIGHUP, self._stop_handler)
signal.signal(signal.SIGTERM, self._stop_handler)
signal.signal(signal.SIGQUIT, self._stop_handler)

async def _stop_handler(self):
self.app.logger.warning("Stopping server...")
grace = self.app.config["GRPC_GRACE"]
await self.server.stop(grace)
await asyncio.sleep(grace or 1)
self._stopped = True
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ prometheus_client
versioneer
pre-commit
gitlint
pytest-asyncio
36 changes: 36 additions & 0 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import threading
import time
from unittest import mock
import pytest
import asyncio

from sea.signals import server_started, server_stopped

Expand Down Expand Up @@ -75,3 +77,37 @@ def kill_later(sec):
assert "stopped!" in content
finally:
os.rmdir("/tmp/prometheus_metrics")


@pytest.mark.asyncio
async def test_asyncio_server(app, logstream):
from sea.server.asyncio import Server

s = Server(app)
assert not s._stopped

def log_started(s):
app.logger.warning("started!")

def log_stopped(s):
app.logger.warning("stopped!")

server_started.connect(log_started)
server_stopped.connect(log_stopped)

async def stop_server_later(sec):
await asyncio.sleep(sec)
await s._stop_handler()
# server_stopped.send(s)

# Run the server and stop it after 3 seconds in parallel
await asyncio.gather(s.run(), stop_server_later(3))

# asyncio.create_task(stop_server_later(3))

# await s.run()
assert s._stopped

content = logstream.getvalue()
assert "started!" in content
# assert "started!" in content and "stopped!" in content

0 comments on commit 2505228

Please sign in to comment.