Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fast Collection with ZeroMQ based chunked transfers #285

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
4 changes: 4 additions & 0 deletions chimerapy/engine/chimerapyrc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ config:
worker-shutdown: 10 # seconds
node-creation: 130 # seconds
reset: 30
collect: 1800 # 30 minutes
retry:
data-collection: 30 # seconds
logs-sink:
Expand Down Expand Up @@ -39,3 +40,6 @@ config:
deque-length: 10000
interval: 10
logging-enabled: false
file-transfer:
chunk-size: 500000 # bytes
max-chunks: 2 # Number of chunks to send at once
155 changes: 155 additions & 0 deletions chimerapy/engine/manager/artifacts_collector_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import logging
import pathlib
from typing import Any, Dict, Optional

import aioshutil
import zmq.asyncio

import chimerapy.engine.config as cpe_config
from chimerapy.engine._logger import fork, getLogger
from chimerapy.engine.networking.zmq_file_transfer_client import ZMQFileClient
from chimerapy.engine.utils import get_progress_bar

from ..eventbus import Event, EventBus, TypedObserver
from ..service import Service
from ..states import ManagerState
from .events import UpdateSendArchiveEvent


class ArtifactsCollectorService(Service):
    """Manager-side service that gathers recorded artifacts from workers.

    Listens for the ``artifacts_transfer_ready`` event and, depending on the
    transfer method announced by the worker, either downloads the files over
    ZeroMQ (remote worker) or copies them on the local filesystem (worker
    co-located with the manager). The outcome is reported back on the
    ``update_send_archive`` event.
    """

    def __init__(
        self,
        name: str,
        eventbus: EventBus,
        state: ManagerState,
        parent_logger: Optional[logging.Logger] = None,
    ):
        super().__init__(name=name)
        self.eventbus = eventbus
        self.observers: Dict[str, TypedObserver] = {}
        # One ZMQ file-transfer client per worker, keyed by worker id.
        self.clients: Dict[str, ZMQFileClient] = {}
        self.state = state
        self.progressbar = get_progress_bar()

        if parent_logger is None:
            parent_logger = getLogger("chimerapy-engine")

        self.logger = fork(parent_logger, self.__class__.__name__)

    async def async_init(self):
        """Register this service's event-bus observers."""
        self.observers = {
            "artifacts_transfer_ready": TypedObserver(
                "artifacts_transfer_ready", on_asend=self.collect, handle_event="pass"
            )
        }

        # Only the observer objects are needed for subscription.
        for observer in self.observers.values():
            await self.eventbus.asubscribe(observer)

    async def collect(self, event: Event) -> None:
        """Dispatch artifact collection based on the announced transfer method.

        Args:
            event: ``artifacts_transfer_ready`` event whose ``data`` carries
                ``method`` ("zmq" or local), ``worker_id``, and the artifact
                table; for ZMQ transfers also ``ip`` and ``port``.
        """
        assert event.data is not None
        method = event.data["method"]
        if method == "zmq":
            self.logger.debug("Collecting artifacts over ZMQ")
            await self._collect_zmq(
                worker_id=event.data["worker_id"],
                host=event.data["ip"],
                port=event.data["port"],
                artifacts=event.data["data"],
            )
        else:
            self.logger.debug("Collecting artifacts locally")
            await self._collect_local(
                worker_id=event.data["worker_id"], artifacts=event.data["data"]
            )

    async def _collect_zmq(
        self, worker_id: str, host: str, port: int, artifacts: Dict[str, Any]
    ):
        """Download a worker's artifacts via the chunked ZMQ file client.

        Args:
            worker_id: Worker whose artifacts are collected.
            host: File-server host announced by the worker.
            port: File-server port announced by the worker.
            artifacts: Mapping of node_id to a list of artifact descriptors
                (each providing "name", "filename", and "size").
        """
        files = {}
        self.logger.debug("Preparing files to download")
        for node_id, artifact_details in artifacts.items():
            out_dir = self._create_node_dir(worker_id, node_id)
            for artifact in artifact_details:
                key = f"{node_id}-{artifact['name']}"
                files[key] = {
                    "name": artifact["filename"],
                    "size": artifact["size"],
                    "outdir": out_dir,
                }
        context = zmq.asyncio.Context.instance()
        client = ZMQFileClient(
            context=context,
            host=host,
            port=port,
            credit=cpe_config.get("file-transfer.max-chunks"),
            chunk_size=cpe_config.get("file-transfer.chunk-size"),
            files=files,
            parent_logger=self.logger,
            progressbar=self.progressbar,
        )
        self.clients[worker_id] = client
        try:
            await client.async_init()
            await client.download_files()
            event_data = UpdateSendArchiveEvent(worker_id=worker_id, success=True)
            # BUGFIX: log success only on the success path; previously this
            # message was emitted from ``finally`` even after a failure.
            self.logger.info(f"Successfully collected artifacts for worker {worker_id}")
        except Exception as e:
            event_data = UpdateSendArchiveEvent(
                worker_id=worker_id,
                success=False,
            )
            self.logger.error(
                f"Error while collecting artifacts for worker {worker_id}: {e}"
            )
        finally:
            # Always report the outcome, success or failure.
            await self.eventbus.asend(Event("update_send_archive", event_data))

    async def _collect_local(self, worker_id: str, artifacts: Dict[str, Any]) -> None:
        """Copy a co-located worker's artifact files into the manager logdir.

        Args:
            worker_id: Worker whose artifacts are collected.
            artifacts: Mapping of node_id to a list of artifact descriptors
                (each providing "path" and "filename").
        """
        try:
            for node_id, node_artifacts in artifacts.items():
                node_dir = self._create_node_dir(worker_id, node_id)

                for artifact in node_artifacts:
                    artifact_path = pathlib.Path(artifact["path"])
                    self.logger.debug(f"Copying {artifact_path} to {node_dir}")
                    await aioshutil.copyfile(
                        artifact_path, node_dir / artifact["filename"]
                    )

            event_data = UpdateSendArchiveEvent(
                worker_id=worker_id,
                success=True,
            )
            self.logger.info(f"Successfully collected artifacts for worker {worker_id}")
        except Exception as e:
            event_data = UpdateSendArchiveEvent(
                worker_id=worker_id,
                success=False,
            )
            self.logger.error(
                f"Error while collecting artifacts for worker {worker_id}: {e}"
            )
        finally:
            # BUGFIX: send the outcome exactly once; the original also sent a
            # success event inside the ``try`` body, duplicating it here.
            await self.eventbus.asend(Event("update_send_archive", event_data))

    def _create_worker_dir(self, worker_id):
        """Create (if needed) and return the per-worker output directory."""
        worker_name = self.state.workers[worker_id].name
        worker_dir = self.state.logdir / f"{worker_name}-{worker_id[:10]}"
        worker_dir.mkdir(parents=True, exist_ok=True)

        return worker_dir

    def _create_node_dir(self, worker_id, node_id):
        """Create (if needed) and return the per-node output directory."""
        worker_dir = self._create_worker_dir(worker_id)
        nodes = self.state.workers[worker_id].nodes
        node_dir = worker_dir / nodes[node_id].name
        node_dir.mkdir(parents=True, exist_ok=True)

        return node_dir
10 changes: 10 additions & 0 deletions chimerapy/engine/manager/http_server_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ def __init__(
web.post("/workers/deregister", self._deregister_worker_route),
web.post("/workers/node_status", self._update_nodes_status),
web.post("/workers/send_archive", self._update_send_archive),
web.post(
"/workers/artifacts_transfer_ready",
self._file_transfer_server_ready,
),
],
)

Expand Down Expand Up @@ -192,6 +196,12 @@ async def _update_send_archive(self, request: web.Request):
await self.eventbus.asend(Event("update_send_archive", event_data))
return web.HTTPOk()

async def _file_transfer_server_ready(self, request: web.Request):
    """Forward a worker's transfer-ready notification onto the event bus."""
    event_data = await request.json()
    await self.eventbus.asend(Event("artifacts_transfer_ready", event_data))
    return web.HTTPOk()

#####################################################################################
## Front-End API
#####################################################################################
Expand Down
23 changes: 23 additions & 0 deletions chimerapy/engine/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# Eventbus
from ..eventbus import Event, EventBus, make_evented
from ..networking.async_loop_thread import AsyncLoopThread
from .artifacts_collector_service import ArtifactsCollectorService
from .distributed_logging_service import DistributedLoggingService

# Services
Expand Down Expand Up @@ -110,13 +111,20 @@ async def aserve(self) -> bool:
state=self.state,
# **self.kwargs,
)
self.artifacts_collector = ArtifactsCollectorService(
name="artifacts_collector",
eventbus=self.eventbus,
state=self.state,
parent_logger=logger,
)

# Initialize services
await self.http_server.async_init()
await self.worker_handler.async_init()
await self.zeroconf_service.async_init()
await self.session_record.async_init()
await self.distributed_logging.async_init()
await self.artifacts_collector.async_init()

# Start all services
await self.eventbus.asend(Event("start"))
Expand Down Expand Up @@ -336,6 +344,9 @@ async def async_stop(self) -> bool:
async def async_collect(self) -> bool:
return await self.worker_handler.collect()

async def async_collect_v2(self) -> bool:
    """Async variant of ``collect_v2``; delegates to the worker handler."""
    return await self.worker_handler.collect_v2()

async def async_reset(self, keep_workers: bool = True):
return await self.worker_handler.reset(keep_workers)

Expand Down Expand Up @@ -479,6 +490,18 @@ def collect(self) -> Future[bool]:
"""
return self._exec_coro(self.async_collect())

def collect_v2(self) -> Future[bool]:
    """Collect data from the Workers.

    First, wait until all the Nodes have finished saving their data.
    Then, the manager requests the Nodes' data from the Workers.

    Returns:
        Future[bool]: Future of success in collecting data from the Workers

    """
    return self._exec_coro(self.async_collect_v2())

def reset(
self, keep_workers: bool = True, blocking: bool = True
) -> Union[bool, Future[bool]]:
Expand Down
19 changes: 19 additions & 0 deletions chimerapy/engine/manager/worker_handler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import dill
import networkx as nx

import chimerapy.engine.config as cpe_config
from chimerapy.engine import _logger, config

from ..data_protocols import NodePubTable
Expand Down Expand Up @@ -824,6 +825,24 @@ async def collect(self) -> bool:
await self.eventbus.asend(Event("save_meta"))
return all(results)

def workers_collected(self):
    """Return True once every registered worker has reported its transfer."""
    return all(
        worker_id in self.collected_workers for worker_id in self.state.workers
    )

async def collect_v2(self) -> bool:
    """Ask every node to collect, then wait for all workers to report back.

    Returns:
        bool: True only if every worker reported a successful transfer.
    """
    await self._broadcast_request("post", "/nodes/request_collect")
    await async_waiting_for(
        self.workers_collected, timeout=cpe_config.get("manager.timeout.collect")
    )

    # Every worker must have reported success.
    return all(self.collected_workers.values())

async def reset(self, keep_workers: bool = True):

# Destroy Nodes safely
Expand Down
1 change: 1 addition & 0 deletions chimerapy/engine/networking/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ class NODE_MESSAGE(Enum):
REPORT_SAVING = 52
REPORT_RESULTS = 53
DIAGNOSTICS = 54
ARTIFACTS = 55
30 changes: 30 additions & 0 deletions chimerapy/engine/networking/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import os


class ZMQFileChunk:
    """One chunk of a file being transferred over ZeroMQ.

    Wraps a ``bytes`` payload and provides symmetric sync/async helpers to
    read a chunk out of a file handle and to write it back into one.
    """

    def __init__(self, data: bytes) -> None:
        # Raw payload of this chunk.
        self.data = data

    def write_into(self, handle) -> None:
        """Write this chunk's bytes to an open (sync) file handle."""
        handle.write(self.data)

    async def awrite_into(self, ahandle) -> None:
        """Write this chunk's bytes to an open async file handle."""
        await ahandle.write(self.data)

    @classmethod
    def from_bytes(cls, data) -> "ZMQFileChunk":
        """Wrap raw bytes in a ``ZMQFileChunk``."""
        return cls(data=data)

    @classmethod
    def read_from(cls, handle, offset, chunk_size) -> "ZMQFileChunk":
        """Read ``chunk_size`` bytes at ``offset`` from a (sync) handle."""
        handle.seek(offset, os.SEEK_SET)
        return cls.from_bytes(data=handle.read(chunk_size))

    @classmethod
    async def aread_from(cls, ahandle, offset, chunk_size) -> "ZMQFileChunk":
        """Read ``chunk_size`` bytes at ``offset`` from an async handle."""
        await ahandle.seek(offset, os.SEEK_SET)
        return cls.from_bytes(data=await ahandle.read(chunk_size))
Loading
Loading