Skip to content

Commit

Permalink
Add option to keep workers during manager shutdown. Closes #283
Browse files Browse the repository at this point in the history
  • Loading branch information
umesh-timalsina committed Oct 15, 2023
1 parent 6f81a7a commit 8a79a15
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 19 deletions.
12 changes: 8 additions & 4 deletions chimerapy/engine/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,14 +339,16 @@ async def async_collect(self) -> bool:
async def async_reset(self, keep_workers: bool = True):
    """Reset the cluster by delegating to the worker handler.

    Args:
        keep_workers: Forwarded unchanged to ``worker_handler.reset``.
            Presumably decides whether connected workers survive the
            reset -- confirm against the worker handler implementation.

    Returns:
        Whatever ``worker_handler.reset`` returns.
    """
    return await self.worker_handler.reset(keep_workers)

# Diff hunk from manager.py: this page shows pre- and post-commit lines
# together without +/- markers (the file header reports 8 additions and
# 4 deletions), so two versions of some statements appear back to back.
# Old signature, replaced by the line that follows it:
async def async_shutdown(self) -> bool:
# New signature: `shutdown_workers` (default True) is forwarded to
# subscribers through the "shutdown" event payload further down.
async def async_shutdown(self, shutdown_workers=True) -> bool:

# Only let shutdown happen once
if self.has_shutdown:
# logger.debug(f"{self}: requested to shutdown twice, skipping.")
return True

# Old broadcast: the "shutdown" event carried no payload.
await self.eventbus.asend(Event("shutdown"))
# New broadcast: the payload tells "shutdown" subscribers whether the
# workers should be shut down as well.
await self.eventbus.asend(
Event("shutdown", {"shutdown_workers": shutdown_workers})
)
# Record completion so that later calls return early via the guard above.
self.has_shutdown = True

return True
Expand Down Expand Up @@ -489,14 +491,16 @@ def reset(

return future

# Diff hunk from manager.py: old and new lines are shown together without
# +/- markers.
# Old signature, replaced by the three-line signature that follows it:
def shutdown(self, blocking: bool = True) -> Union[bool, Future[bool]]:
# New signature: adds `shutdown_workers` (default True), which is passed
# straight through to `async_shutdown`.
def shutdown(
self, blocking: bool = True, shutdown_workers: bool = True
) -> Union[bool, Future[bool]]:
"""Proper shutting down ChimeraPy-Engine cluster.
Through this method, the ``Manager`` broadcast to all ``Workers``
to shutdown, in which they will stop their processes and threads safely.
"""
# NOTE(review): the docstring above only describes the default case. With
# shutdown_workers=False the worker handler hunk on this page posts
# "/manager-shutdown" to the workers instead of "/shutdown".
# Old call: no argument forwarded.
future = self._exec_coro(self.async_shutdown())
# New call: forwards the caller's `shutdown_workers` choice.
future = self._exec_coro(self.async_shutdown(shutdown_workers=shutdown_workers))
# Blocking callers wait for the result, bounded by the configured
# "manager.timeout.worker-shutdown" value; others receive the Future.
if blocking:
return future.result(timeout=config.get("manager.timeout.worker-shutdown"))
return future
43 changes: 29 additions & 14 deletions chimerapy/engine/manager/worker_handler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def async_init(self):
# Specify observers
self.observers: Dict[str, TypedObserver] = {
"shutdown": TypedObserver(
"shutdown", on_asend=self.shutdown, handle_event="drop"
"shutdown", on_asend=self.shutdown, handle_event="pass"
),
"worker_register": TypedObserver(
"worker_register",
Expand All @@ -85,27 +85,42 @@ async def async_init(self):
for ob in self.observers.values():
await self.eventbus.asubscribe(ob)

async def shutdown(self) -> bool:
async def shutdown(self, shutdown_event: Event) -> bool:

# If workers are connected, let's notify them that the cluster is
# shutting down
success = True
params = shutdown_event.data

if len(self.state.workers) > 0:
if params["shutdown_workers"]:
# Send shutdown message
try:
success = await self._broadcast_request(
"post",
"/shutdown",
timeout=config.get("manager.timeout.worker-shutdown"),
report_exceptions=False,
)
except Exception:
success = False

# Send shutdown message
try:
success = await self._broadcast_request(
"post",
"/shutdown",
timeout=config.get("manager.timeout.worker-shutdown"),
report_exceptions=False,
)
except Exception:
success = False
if not success:
logger.warning(f"{self}: All workers shutdown: FAILED - forced")
else:
try:
# Signal workers that manager is shutdown
success = await self._broadcast_request(
"post",
"/manager-shutdown",
timeout=config.get("manager.timeout.worker-shutdown"),
report_exceptions=False,
)
except Exception:
success = False

if not success:
logger.warning(f"{self}: All workers shutdown: FAILED - forced")
if not success:
logger.warning(f"{self}: Signaling workers shutdown failed.")

# Close the HTTP client
await self.http_client.close()
Expand Down
13 changes: 12 additions & 1 deletion chimerapy/engine/worker/http_client_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from chimerapy.engine import _logger, config

from ..eventbus import EventBus, TypedObserver
from ..eventbus import Event, EventBus, TypedObserver
from ..logger.zmq_handlers import NodeIDZMQPullListener
from ..networking import Client
from ..service import Service
Expand Down Expand Up @@ -58,6 +58,9 @@ async def async_init(self):
"shutdown": TypedObserver(
"shutdown", on_asend=self.shutdown, handle_event="drop"
),
"manager_shutdown": TypedObserver(
"manager_shutdown", on_asend=self._manager_shutdown, handle_event="drop"
),
"WorkerState.changed": TypedObserver(
"WorkerState.changed",
on_asend=self._async_node_status_update,
Expand Down Expand Up @@ -90,6 +93,14 @@ async def shutdown(self) -> bool:

return success

async def _manager_shutdown(self) -> bool:
    """React to the ``manager_shutdown`` event by dropping the manager link.

    If the worker is currently marked as connected, clear
    ``connected_to_manager`` and emit a ``WorkerState.changed`` event.
    When it is already disconnected nothing is sent.

    Returns:
        Always ``True``.
    """
    if not self.connected_to_manager:
        return True

    self.connected_to_manager = False
    # NOTE(review): async_init registers a "WorkerState.changed" observer
    # that calls _async_node_status_update; confirm that handler copes
    # with connected_to_manager already being False.
    await self.eventbus.asend(Event("WorkerState.changed"))
    return True

###################################################################################
## Client Methods
###################################################################################
Expand Down
7 changes: 7 additions & 0 deletions chimerapy/engine/worker/http_server_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(
web.post("/nodes/diagnostics", self._async_diagnostics_route),
# web.post("/packages/load", self._async_load_sent_packages),
web.post("/shutdown", self._async_shutdown_route),
web.post("/manager-shutdown", self._async_manager_shutdown_route),
],
ws_handlers={
NODE_MESSAGE.STATUS: self._async_node_status_update,
Expand Down Expand Up @@ -304,6 +305,12 @@ async def _async_shutdown_route(self, request: web.Request) -> web.Response:

return web.HTTPOk()

async def _async_manager_shutdown_route(self, request: web.Request) -> web.Response:
    """Handle ``POST /manager-shutdown`` by raising a ``manager_shutdown`` event.

    Args:
        request: The incoming request; its contents are not read.

    Returns:
        An HTTP 200 response, returned without waiting for the event to
        be processed.
    """
    # Schedule the event as a task instead of awaiting it, so the reply is
    # not held up by the subscribers. The task is stored in ``self.tasks``
    # so a reference to it is retained.
    notify_task = asyncio.create_task(
        self.eventbus.asend(Event("manager_shutdown"))
    )
    self.tasks.append(notify_task)
    return web.HTTPOk()

####################################################################
## WS Routes
####################################################################
Expand Down
16 changes: 16 additions & 0 deletions test/manager/test_manager_worker_connectivity.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,19 @@ async def test_manager_shutting_down_ungracefully():
# Only shutting Manager
await manager.async_shutdown()
await worker.async_shutdown()


async def test_manager_shutting_down_no_worker_shutdown():
    """Manager shutdown with ``shutdown_workers=False`` only disconnects the worker.

    After the manager shuts down, the worker must report
    ``http_client.connected_to_manager`` as ``False`` while still being
    alive, so that it can be shut down separately afterwards.
    """
    # Local import: the module's import block is not visible in this hunk.
    import asyncio

    manager = cpe.Manager(logdir=TEST_DATA_DIR, port=0)
    await manager.aserve()
    worker = cpe.Worker(name="local", port=0)
    await worker.aserve()

    try:
        # Connect to the Manager
        await worker.async_connect(method="ip", host=manager.host, port=manager.port)

        # Shut down only the Manager
        await manager.async_shutdown(shutdown_workers=False)

        # The worker's /manager-shutdown route answers before its
        # background "manager_shutdown" task has run, so allow a short,
        # bounded wait for the flag to flip instead of asserting at once.
        for _ in range(20):
            if not worker.http_client.connected_to_manager:
                break
            await asyncio.sleep(0.1)

        assert not worker.http_client.connected_to_manager
    finally:
        # Always release both services, even when a step above fails.
        # A second manager shutdown is a no-op thanks to its
        # "only let shutdown happen once" guard.
        try:
            await manager.async_shutdown(shutdown_workers=False)
        finally:
            await worker.async_shutdown()

0 comments on commit 8a79a15

Please sign in to comment.