Skip to content

Commit

Permalink
Merge pull request #2 from ChimeraPy/1-wconnection-fix
Browse files Browse the repository at this point in the history
Use async utils to connect worker. Closes #1
  • Loading branch information
umesh-timalsina authored Oct 13, 2023
2 parents 5ce2685 + 3e80d7c commit 3a3933b
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 30 deletions.
27 changes: 21 additions & 6 deletions chimerapy/workerui/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from pathlib import Path
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from uvicorn import run
import asyncio

from chimerapy.workerui.utils import instantiate_worker

Expand Down Expand Up @@ -90,19 +91,33 @@ def add_worker_ui_parser(subparsers):
)


def connect_worker(args):
async def aconnect_worker(args):
"""Connect the worker to the manager."""
worker = instantiate_worker(
name=args.name,
id=args.id,
wport=args.wport,
delete_temp=args.delete_temp,
port=args.port,
ip=args.ip,
zeroconf=args.zeroconf,
)
print("Starting worker")
await worker.aserve()

port = args.port
ip = args.ip
zeroconf = args.zeroconf

method = "zeroconf" if zeroconf else "ip"

await worker.async_connect(
method=method,
host=ip,
port=port,
timeout=args.timeout,
)

worker.idle()
worker.logger.info("IDLE")
while True:
await asyncio.sleep(1)


def serve_worker_ui(args):
Expand Down Expand Up @@ -137,7 +152,7 @@ def main(args=None):
cli_args = parser.parse_args(args)

if cli_args.subcommand == "connect":
connect_worker(cli_args)
asyncio.run(aconnect_worker(cli_args))
elif cli_args.subcommand == "ui":
serve_worker_ui(cli_args)
else:
Expand Down
10 changes: 8 additions & 2 deletions chimerapy/workerui/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,16 @@ async def _instantiate_worker(self, config: WorkerConfig) -> Dict[str, Any]:
id=config.id or None,
wport=config.wport or 0,
delete_temp=config.delete_temp,
)
await self.worker_instance.aserve()

await self.worker_instance.async_connect(
port=config.port,
ip=config.ip,
zeroconf=config.zeroconf,
host=config.ip,
timeout=config.timeout,
method="zeroconf" if config.zeroconf else "ip",
)
print("Connected to manager.")
await self._initialize_updater()
except TimeoutError:
self.worker_instance = None
Expand Down
21 changes: 0 additions & 21 deletions chimerapy/workerui/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,14 @@ def instantiate_worker(
name: str,
id: Optional[str] = None,
delete_temp: bool = False,
port: int = 0,
zeroconf: bool = False,
ip: Optional[str] = None,
wport: int = 0,
timeout: int = 20,
) -> Worker:
"""Connect the worker to the manager."""
method = "ip"
if not zeroconf:
if not ip or not port:
raise ValueError("Must specify IP and port if not using zeroconf")
else:
method = "zeroconf"

worker = Worker(
name=name,
id=id,
delete_temp=delete_temp,
port=wport,
)
try:
worker.connect(
port=port,
host=ip,
method=method,
timeout=timeout,
)
except Exception as e:
worker.shutdown(blocking=True)
raise e

return worker
2 changes: 1 addition & 1 deletion chimerapy/workerui/worker_state_broadcaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async def initialize(self, eventbus: EventBus, state: WorkerState):
self.state = state

for ob in self.observers.values():
self.eventbus.subscribe(ob).result(timeout=1)
await self.eventbus.asubscribe(ob)

async def on_state_changed(self):
for client in self.clients:
Expand Down

0 comments on commit 3a3933b

Please sign in to comment.