From 43ad6596831647fc5aa4aa04b9190936ba885ba6 Mon Sep 17 00:00:00 2001
From: Takeshi Yoshimura
Date: Fri, 7 Oct 2022 14:23:25 -0700
Subject: [PATCH] add LSF scheduler (#588)

Summary:
I prototyped an LSF scheduler for torchx. It currently supports the native, Docker, and Singularity runtimes with a shared filesystem. I confirmed that it works with Gloo and NCCL on small VPC V100 clusters.

Note: the `torchx log` command is available only when the torchx host shares the filesystem (e.g., NFS) with the cluster nodes.

In a nutshell, the LSF scheduler translates a torchx request into LSF job submissions (i.e., `bsub`); for distributed apps it creates multiple `bsub` submissions. I also added lsf to scripts/component_integration_tests.py. Here is the log output from my three-node LSF cluster, where you can also find the dryrun results:
[component_integration_tests.lsf.txt](https://github.com/pytorch/torchx/files/9424891/component_integration_tests.lsf.txt)

Regarding Singularity image compatibility, Singularity already automates converting Docker images into its own image format, so all we have to do is generate `singularity exec` arguments from the torchx request. Note that users still need to prefix image names with docker:// if they want to use Docker images.

The following are example commands.

**Example: native hello_world and CLI utils**
```
$ torchx run -s lsf -cfg jobdir=/mnt/data/torchx,runtime=native utils.echo --msg hello_world --num_replicas 3
lsf://torchx/echo-pxc3gn5ct061k
$ torchx list -s lsf
$ torchx status lsf://torchx/echo-pxc3gn5ct061k
$ torchx cancel lsf://torchx/echo-pxc3gn5ct061k
$ torchx log --stream stdout lsf://torchx/echo-pxc3gn5ct061k/echo/0
```

**Example: Docker hello_world**
```
$ torchx run -s lsf -cfg jobdir=/mnt/data/torchx,runtime=docker utils.echo --image alpine:latest --msg hello_world --num_replicas 3
```

**Example: Singularity hello_world**
```
$ torchx run -s lsf -cfg jobdir=/mnt/data/torchx,runtime=singularity utils.echo --image docker://alpine:latest --msg hello_world --num_replicas 3
```

**Example: Docker Distributed**
```
$ cp scripts/dist_app.py /mnt/data/dist/
$ torchx run -s lsf -cfg "jobdir=/mnt/data/torchx,runtime=docker,host_network=True" dist.ddp -j 2x2 --gpu 2 --script /data/dist_app.py --mount "type=bind,src=/mnt/data/dist,dst=/data"
```

**Example: Singularity Distributed**
```
$ cp scripts/dist_app.py /mnt/data/dist/
$ torchx run -s lsf -cfg "jobdir=/mnt/data/torchx,runtime=singularity,host_network=True" dist.ddp --image docker://ghcr.io/pytorch/torchx:0.3.0dev0 -j 2x2 --gpu 2 --script /data/dist_app.py --mount "type=bind,src=/mnt/data/dist,dst=/data"
```

Pull Request resolved: https://github.com/pytorch/torchx/pull/588

Reviewed By: msaroufim

Differential Revision: D40184939

Pulled By: msaroufim

fbshipit-source-id: 5a13d2ee88b3b5cf1b8e5a3f6786b955d47f21f8
---
 docs/source/index.rst                        |   1 +
 docs/source/schedulers/lsf.rst               |  18 +
 scripts/component_integration_tests.py       |  13 +-
 torchx/schedulers/__init__.py                |   1 +
 torchx/schedulers/lsf_scheduler.py           | 577 +++++++++++++++++++
 torchx/schedulers/test/lsf_scheduler_test.py | 538 +++++++++++++++++
 6 files changed, 1147 insertions(+), 1 deletion(-)
 create mode 100644 docs/source/schedulers/lsf.rst
 create mode 100644 torchx/schedulers/lsf_scheduler.py
 create mode 100644 torchx/schedulers/test/lsf_scheduler_test.py

diff --git a/docs/source/index.rst b/docs/source/index.rst
index 2fd6464a7..736ea4724 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -74,6 +74,7 @@ Works With
    schedulers/slurm
    schedulers/ray
   schedulers/aws_batch
+   schedulers/lsf
 
 .. fbcode::
 
diff --git a/docs/source/schedulers/lsf.rst b/docs/source/schedulers/lsf.rst
new file mode 100644
index 000000000..9799b75ae
--- /dev/null
+++ b/docs/source/schedulers/lsf.rst
@@ -0,0 +1,18 @@
+IBM Spectrum LSF
+=================
+
+.. automodule:: torchx.schedulers.lsf_scheduler
+
+.. currentmodule:: torchx.schedulers.lsf_scheduler
+
+.. autoclass:: LsfScheduler
+   :members:
+   :show-inheritance:
+
+.. autoclass:: LsfBsub
+   :members:
+
+Reference
+~~~~~~~~~~~~
+
+.. autofunction:: create_scheduler
diff --git a/scripts/component_integration_tests.py b/scripts/component_integration_tests.py
index 6cf9c021c..41ced16d9 100755
--- a/scripts/component_integration_tests.py
+++ b/scripts/component_integration_tests.py
@@ -51,7 +51,7 @@ def main() -> None:
     torchx_image = "dummy_image"
     dryrun = False
 
-    if scheduler in ("kubernetes", "local_docker", "aws_batch"):
+    if scheduler in ("kubernetes", "local_docker", "aws_batch", "lsf"):
         try:
             build = build_and_push_image()
             torchx_image = build.torchx_image
@@ -105,6 +105,17 @@ def main() -> None:
             },
             "workspace": f"file://{os.getcwd()}",
         },
+        "lsf": {
+            "providers": [
+                component_provider,
+            ],
+            "image": torchx_image,
+            "cfg": {
+                "runtime": "docker",
+                "jobdir": "/mnt/data/torchx",
+                "host_network": True,
+            },
+        },
     }
 
     params = run_parameters[scheduler]
diff --git a/torchx/schedulers/__init__.py b/torchx/schedulers/__init__.py
index f370ba195..c7dae4a2b 100644
--- a/torchx/schedulers/__init__.py
+++ b/torchx/schedulers/__init__.py
@@ -19,6 +19,7 @@
     "kubernetes": "torchx.schedulers.kubernetes_scheduler",
     "aws_batch": "torchx.schedulers.aws_batch_scheduler",
     "ray": "torchx.schedulers.ray_scheduler",
+    "lsf": "torchx.schedulers.lsf_scheduler",
 }
 
 
diff --git a/torchx/schedulers/lsf_scheduler.py b/torchx/schedulers/lsf_scheduler.py
new file mode 100644
index 000000000..732492adb
--- /dev/null
+++ b/torchx/schedulers/lsf_scheduler.py
@@ -0,0 +1,577 @@
+#!/usr/bin/env python3
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the BSD-style license found in the
+# LICENSE file in the root directory of this source tree.
+
+"""
+This contains the TorchX LSF scheduler, which can be used to run TorchX
+components on an LSF cluster.
+
+This scheduler is in the prototype stage and may change without notice. If you
+run into any issues or have feedback, please submit an issue.
+
+Prerequisites
+==============
+
+You'll need an existing LSF cluster to run your jobs on; alternatively,
+individuals can install LSF Community Edition.
+ +See the LSF documentation for more details: +https://www.ibm.com/support/pages/where-do-i-download-lsf-community-edition +""" +import os.path +import re +import shlex +import subprocess +import tempfile +from dataclasses import dataclass +from datetime import datetime +from typing import Any, Dict, Iterable, List, Optional + +import torchx +from torchx.schedulers.api import ( + AppDryRunInfo, + DescribeAppResponse, + filter_regex, + ListAppResponse, + Scheduler, + split_lines_iterator, + Stream, +) +from torchx.schedulers.ids import make_unique +from torchx.schedulers.local_scheduler import LogIterator +from torchx.specs import ( + AppDef, + AppState, + BindMount, + DeviceMount, + macros, + NONE, + ReplicaStatus, + Role, + RoleStatus, + runopts, + VolumeMount, +) +from typing_extensions import TypedDict + +JOB_STATE: Dict[str, AppState] = { + "DONE": AppState.SUCCEEDED, + "EXIT": AppState.FAILED, + "PEND": AppState.PENDING, + "RUN": AppState.RUNNING, + "PSUSP": AppState.PENDING, + "USUSP": AppState.PENDING, + "SSUSP": AppState.PENDING, +} + + +def get_job_state(state_str: str, exit_code: str) -> AppState: + state = AppState.UNKNOWN + if state_str in JOB_STATE.keys(): + state = JOB_STATE[state_str] + if state == AppState.FAILED and exit_code == "130": # likely SIGINT + state = AppState.CANCELLED + return state + + +class LsfOpts(TypedDict, total=False): + lsf_queue: Optional[str] + jobdir: Optional[str] + container_workdir: Optional[str] + host_network: Optional[bool] + shm_size: Optional[str] + + +def get_docker_command(job_name: str, role: Role, cfg: LsfOpts) -> str: + cmds = ["docker", "run", f"--name={job_name}"] + for mount in role.mounts: + if isinstance(mount, BindMount): + rw = "rw" + if mount.read_only: + rw = "ro" + cmds += ["-v", f"{mount.src_path}:{mount.dst_path}:{rw}"] + elif isinstance(mount, VolumeMount): + ro = "" + if mount.read_only: + ro = ",ro" + cmds += [ + "--mount", + f"type=volume,src={mount.src},dst={mount.dst_path}{ro}", + ] + elif isinstance(mount, DeviceMount): + cmds += [f"--device={mount.src_path}:{mount.dst_path}:{mount.permissions}"] + container_workdir = cfg.get("container_workdir") + if container_workdir: + cmds += ["-w", container_workdir] + host_network = cfg.get("host_network") + if host_network: + cmds += ["--net=host", "--ipc=host"] + else: + for name, port in role.port_map.items(): + cmds += ["-p", str(port)] + shm_size = cfg.get("shm_size") + if shm_size: + cmds += [f"--shm-size={shm_size}"] + for key, value in dict(role.env).items(): + cmds += ["-e", f"{key}={value}"] + + resource = role.resource + if resource != NONE: + if resource.cpu > 0: + cmds += [f"--cpus={str(resource.cpu)}"] + if resource.memMB > 0: + cmds += [f"--memory={str(resource.memMB * 1024 * 1024)}"] + if resource.gpu > 0: + cmds += ["--gpus", "all"] + cmds += ["--entrypoint", role.entrypoint, "--rm", role.image] + [ + arg.replace("$", "\\$") for arg in role.args + ] + return shlex.join(cmds) + + +def get_command(job_name: str, role: Role, cfg: LsfOpts) -> str: + return get_docker_command(job_name, role, cfg) # TODO: add get_singularity_command + + +def get_bsub( + app_id: str, + job_name: str, + role: Role, + cfg: LsfOpts, + head_job_name: str, + head_job_host: str, +) -> str: + bsub_args = ["bsub", "-P", app_id, "-J", job_name] + if head_job_name != "": + bsub_args += [ + "-w", + f'"started({head_job_name})"', + "-R", + f"\"select[hname!='{head_job_host}']\"", + ] + else: + bsub_args += ["-m", head_job_host] + jobdir = cfg.get("jobdir") + if jobdir: + bsub_args += [ + "-cwd", + 
jobdir, + "-outdir", + jobdir, + "-oo", + f"{jobdir}/{job_name}.submit.out", + "-eo", + f"{jobdir}/{job_name}.submit.err", + ] + queue = cfg.get("lsf_queue") + if queue: + bsub_args += ["-q", queue] + resource = role.resource + + if resource is not None: + if resource.cpu > 0: + bsub_args += ["-n", str(resource.cpu)] + if resource.memMB > 0: + bsub_args += ["-R", f'"span[hosts=1] rusage[mem={str(resource.memMB)}]"'] + else: + bsub_args += ["-R", '"span[hosts=1]]"'] + if resource.gpu > 0: + bsub_args += [ + "-gpu", + f'"num={str(resource.gpu)}:mode=shared:j_exclusive=yes"', + ] + bsub_line = " ".join(bsub_args) + if jobdir: + return f"{bsub_line} << EOF\n{get_command(job_name, role, cfg)} > {jobdir}/{job_name}.out 2> {jobdir}/{job_name}.err\nEOF" + else: + return f"{bsub_line} << EOF\n{get_command(job_name, role, cfg)}\nEOF" + + +def cleanup_str(data: str) -> str: + """ + Invokes ``lower`` on thes string and removes all + characters that do not satisfy ``[a-z0-9]`` pattern. + This method is mostly used to make sure kubernetes scheduler gets + the job name that does not violate its validation. + """ + if data.startswith("-"): + data = data[1:] + pattern = r"[a-z0-9\-_]" + return "".join(re.findall(pattern, data.lower())) + + +def find_rank0_host_from_bhosts_stdout(msg: str, role: Role) -> str: + resource = role.resource + cpu = 1 + gpu = 0 + + if resource != NONE: + if resource.cpu > 0: + cpu = resource.cpu + if resource.gpu > 0: + gpu = resource.gpu + + for line in msg.split("\n"): + split = line.split(" ") + if len(split) >= 3 and cpu <= int(split[1]) and gpu <= int(split[2]): + return split[0] + raise Exception( + f"cannot find a host with {cpu} CPUs, and {gpu} GPUs. Try again with enough available resource." + ) + + +def find_rank0_host(role: Role) -> str: + p = subprocess.run( + ["bhosts", "-noheader", "-o", "hname max ng"], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + check=True, + ) + return find_rank0_host_from_bhosts_stdout(p.stdout.decode("utf-8"), role) + + +def get_submit_script( + app_id: str, cmd: List[str], app: AppDef, cfg: LsfOpts, rank0_host: str +) -> str: + bsubs = [] + head_job_name = "" + for role_idx, role in enumerate(app.roles): + for replica_id in range(role.num_replicas): + values = macros.Values( + img_root="", + app_id=app_id, + replica_id=str(replica_id), + rank0_env="TORCHX_RANK0_HOST", + ) + replica_role = values.apply(role) + name = cleanup_str(f"{role.name}-{replica_id}") + replica_role.env["TORCHX_RANK0_HOST"] = rank0_host + job_name = app_id + "-" + name + bsubs.append( + get_bsub(app_id, job_name, replica_role, cfg, head_job_name, rank0_host) + ) + if role_idx == 0 and replica_id == 0: + head_job_name = job_name + script = f"""#!/bin/bash +# +# Generated by TorchX {torchx.__version__} +# Run with: {shlex.join(cmd)} +# +""" + return script + "\n".join(bsubs) + "\n" + + +def bjobs_msg_to_describe(app_id: str, msg: str) -> Optional[DescribeAppResponse]: + if msg == "": + return None + roles = {} + roles_statuses = {} + app_state = AppState.RUNNING + success_count = 0 + total_count = 0 + for line in msg.split("\n"): + split = line.split(" ") + if len(split) < 2: + continue + proj = split[0] + role, _, idx = split[1][len(proj) + 1 :].rpartition("-") + idx = int(idx) + if role not in roles: + roles[role] = Role(name=role, num_replicas=0, image="") + roles_statuses[role] = RoleStatus(role, []) + roles[role].num_replicas += 1 + state = get_job_state(split[2], split[3]) + roles_statuses[role].replicas.append( + ReplicaStatus(id=idx, role=role, 
state=state, hostname="") + ) + if ( + state == AppState.FAILED + or state == AppState.CANCELLED + or state == AppState.PENDING + or state == AppState.UNKNOWN + ): + app_state = state + elif state == AppState.SUCCEEDED: + success_count += 1 + total_count += 1 + if success_count == total_count: + app_state = AppState.SUCCEEDED + # set roles, roles_statuses, app_state + return DescribeAppResponse( + app_id=app_id, + roles=list(roles.values()), + roles_statuses=list(roles_statuses.values()), + state=app_state, + msg=msg, + ) + + +def bjobs_msg_to_log_file( + app_id: str, + role_name: str, + k: int = 0, + streams: Optional[Stream] = None, + msg: str = "", +) -> str: + if streams == Stream.COMBINED: + raise ValueError( + "LsfScheduler does not support COMBINED log stream." + " Use `stdout` or `stderr`" + ) + + extension = "err" if streams == Stream.STDERR else "out" + + lines = msg.split("\n") + jobs = {} + log_file = "" + for line in lines: + if line != "": + split = line.split(" ") + if split[2] == "-": + continue + proj = split[0] + role, _, idx = split[1][len(proj) + 1 :].rpartition("-") + if app_id == proj and role == role_name and idx == str(k): + log_file = split[2] + f"/{split[1]}.{extension}" + if log_file == "": + raise ValueError( + f"cannot find log directory for {app_id}. Note: need to specify -cfg jobdir to use this functionality." + ) + return log_file + + +def bjobs_msg_to_list(msg: str) -> List[ListAppResponse]: + ret = [] + lines = msg.split("\n") + apps = {} + for line in lines: + if line != "": + split = line.split(" ") + state = get_job_state(split[1], split[2]) + if split[0] not in apps.keys(): + apps[split[0]] = [] + apps[split[0]].append(state) + for app_id, states in apps.items(): + success_count = 0 + app_state = AppState.RUNNING + for state in states: + if ( + state == AppState.FAILED + or state == AppState.CANCELLED + or state == AppState.PENDING + or state == AppState.UNKNOWN + ): + app_state = state + break + elif state == AppState.SUCCEEDED: + success_count += 1 + if success_count == len(states): + app_state = AppState.SUCCEEDED + ret.append(ListAppResponse(app_id=app_id, state=app_state)) + return ret + + +@dataclass +class LsfBsub: + jobdir: Optional[str] + app_id: str + app: AppDef + cfg: LsfOpts + cmd: List[str] + + def materialize(self, rank0_host: str = "RANK0_HOST") -> str: + return get_submit_script(self.app_id, self.cmd, self.app, self.cfg, rank0_host) + + def __repr__(self) -> str: + return f"""{' '.join(self.cmd + ['$BSUB_SCRIPT'])} +#------------ +# BSUB_SCRIPT +#------------ +{self.materialize()}""" + + +class LsfScheduler(Scheduler[LsfOpts]): + """ + **Example: hello_world** + + .. code-block:: bash + + $ torchx run -s lsf -cfg jobdir=/mnt/data/torchx utils.echo --image alpine:latest --msg hello_world --num_replicas 3 + ... + + **Example: Gloo** + + .. code-block:: bash + + $ cp dist_app.py /mnt/data/dist/ + $ torchx run -s lsf -cfg "jobdir=/mnt/data/torchx,host_network=True" dist.ddp -j 2x2 --gpu 2 --script /data/dist_app.py --mount "type=bind,src=/mnt/data/dist,dst=/data" + ... + + **Config Options** + + .. runopts:: + class: torchx.schedulers.lsf_scheduler.create_scheduler + + **Compatibility** + + .. compatibility:: + type: scheduler + features: + cancel: true + logs: true + distributed: true + describe: | + LsfScheduler will return job. 
+            mounts: true
+            workspaces: false
+
+    **TOFIX**
+
+    - On host_network=False, tasks cannot resolve static names such as those in /etc/hosts (containers cannot reach it without the host network)
+    - Image downloads should be separate jobs
+
+    """
+
+    def __init__(self, session_name: str) -> None:
+        super().__init__("lsf", session_name)
+
+    def run_opts(self) -> runopts:
+        opts = runopts()
+        opts.add(
+            "lsf_queue",
+            type_=str,
+            default=None,
+            help="queue name to submit jobs",
+        )
+        opts.add(
+            "jobdir",
+            type_=str,
+            default=None,
+            help="The directory to place the job code and outputs. The directory must not exist and will be created.",
+        )
+        opts.add(
+            "container_workdir",
+            type_=str,
+            default=None,
+            help="working directory in container jobs",
+        )
+        opts.add(
+            "host_network",
+            type_=bool,
+            default=False,
+            help="True if using the host network for jobs",
+        )
+        opts.add(
+            "shm_size",
+            type_=str,
+            default="64m",
+            help="size of shared memory (/dev/shm) for jobs",
+        )
+
+        return opts
+
+    def schedule(self, dryrun_info: AppDryRunInfo[LsfBsub]) -> str:
+        req = dryrun_info.request
+        with tempfile.TemporaryDirectory() as tempdir:
+            path = os.path.join(req.jobdir or tempdir, f"{req.app_id}.sh")
+            req.cmd += [path]
+            with open(path, "w") as f:
+                f.write(req.materialize(find_rank0_host(req.app.roles[0])))
+            subprocess.run(req.cmd, stdout=subprocess.PIPE, check=True)
+        return req.app_id
+
+    def _validate(self, app: AppDef, scheduler: str) -> None:
+        # Skip validation step for lsf
+        pass
+
+    def _cancel_existing(self, app_id: str) -> None:
+        p = subprocess.run(
+            ["bjobs", "-noheader", "-a", "-P", app_id, "-o", "id"],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.DEVNULL,
+            check=True,
+        )
+        msg = p.stdout.decode("utf-8")
+        if msg != "":
+            subprocess.run(
+                ["bkill"] + msg.strip().split("\n"),
+                stdout=subprocess.PIPE,
+                stderr=subprocess.DEVNULL,
+                check=True,
+            )
+
+    def _submit_dryrun(self, app: AppDef, cfg: LsfOpts) -> AppDryRunInfo[LsfBsub]:
+        jobdir = cfg.get("jobdir")
+        assert jobdir is None or isinstance(jobdir, str), "jobdir must be str"
+
+        app_id = cleanup_str(make_unique(app.name))
+        return AppDryRunInfo(
+            LsfBsub(app_id=app_id, cmd=["/bin/bash"], jobdir=jobdir, app=app, cfg=cfg),
+            repr,
+        )
+
+    def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
+        p = subprocess.run(
+            [
+                "bjobs",
+                "-noheader",
+                "-a",
+                "-P",
+                app_id,
+                "-o",
+                "proj name stat exit_code",
+            ],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.DEVNULL,
+            check=True,
+        )
+        return bjobs_msg_to_describe(app_id=app_id, msg=p.stdout.decode("utf-8"))
+
+    def log_iter(
+        self,
+        app_id: str,
+        role_name: str,
+        k: int = 0,
+        regex: Optional[str] = None,
+        since: Optional[datetime] = None,
+        until: Optional[datetime] = None,
+        should_tail: bool = False,
+        streams: Optional[Stream] = None,
+    ) -> Iterable[str]:
+        p = subprocess.run(
+            ["bjobs", "-noheader", "-a", "-P", app_id, "-o", "proj name output_dir"],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.DEVNULL,
+            check=True,
+        )
+        log_file = bjobs_msg_to_log_file(
+            app_id=app_id,
+            role_name=role_name,
+            k=k,
+            streams=streams,
+            msg=p.stdout.decode("utf-8"),
+        )
+        iterator = split_lines_iterator(
+            LogIterator(app_id, log_file, self, should_tail=should_tail)
+        )
+        if regex:
+            iterator = filter_regex(regex, iterator)
+        return iterator
+
+    def list(self) -> List[ListAppResponse]:
+        p = subprocess.run(
+            ["bjobs", "-noheader", "-a", "-o", "proj stat exit_code"],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.DEVNULL,
+            check=True,
+        )
+        return
bjobs_msg_to_list(p.stdout.decode("utf-8")) + + +def create_scheduler(session_name: str, **kwargs: Any) -> LsfScheduler: + return LsfScheduler( + session_name=session_name, + ) diff --git a/torchx/schedulers/test/lsf_scheduler_test.py b/torchx/schedulers/test/lsf_scheduler_test.py new file mode 100644 index 000000000..7a84372ef --- /dev/null +++ b/torchx/schedulers/test/lsf_scheduler_test.py @@ -0,0 +1,538 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +import shlex +import tempfile +import unittest + +from unittest.mock import MagicMock, patch + +import torchx + +from torchx.schedulers.api import Stream +from torchx.schedulers.lsf_scheduler import ( + bjobs_msg_to_describe, + bjobs_msg_to_list, + bjobs_msg_to_log_file, + cleanup_str, + create_scheduler, + find_rank0_host_from_bhosts_stdout, + get_bsub, + get_docker_command, + get_job_state, + get_submit_script, + LsfBsub, + LsfOpts, + LsfScheduler, +) +from torchx.specs import ( + AppDef, + AppState, + BindMount, + DeviceMount, + macros, + Resource, + Role, + VolumeMount, +) + + +def simple_role() -> Role: + return Role( + name="foo", + image="/some/path", + entrypoint="echo", + args=["hello", "$HOSTNAME"], + env={}, + mounts=[], + num_replicas=1, + ) + + +def simple_app() -> AppDef: + return AppDef( + name="foo", + roles=[ + Role( + name="a", + image="/some/path", + entrypoint="echo", + args=[macros.replica_id, f"hello {macros.app_id}"], + num_replicas=1, + max_retries=3, + ), + ], + ) + + +def simple_opts() -> LsfOpts: + return LsfOpts( + { + "lsf_queue": "queue", + "jobdir": "/path/to/job", + "container_workdir": "/path/to/container", + "host_network": True, + "shm_size": "10G", + } + ) + + +class LsfSchedulerTest(unittest.TestCase): + def setUp(self) -> None: + self.make_unique_patch = patch("torchx.schedulers.lsf_scheduler.make_unique") + make_unique = self.make_unique_patch.start() + make_unique.return_value = "app-name-42" + + def tearDown(self) -> None: + self.make_unique_patch.stop() + + def test_create_scheduler(self) -> None: + scheduler = create_scheduler("foo") + self.assertIsInstance(scheduler, LsfScheduler) + + def test_get_job_state_DONE(self) -> None: + self.assertEqual(get_job_state("DONE", "0"), AppState.SUCCEEDED) + + def test_get_job_state_EXIT(self) -> None: + self.assertEqual(get_job_state("EXIT", "1"), AppState.FAILED) + + def test_get_job_state_SIGINT(self) -> None: + self.assertEqual(get_job_state("EXIT", "130"), AppState.CANCELLED) + + def test_get_job_state_PEND(self) -> None: + self.assertEqual(get_job_state("PEND", "0"), AppState.PENDING) + + def test_get_job_state_RUN(self) -> None: + self.assertEqual(get_job_state("RUN", "0"), AppState.RUNNING) + + def test_get_job_state_PSUSP(self) -> None: + self.assertEqual(get_job_state("PSUSP", "0"), AppState.PENDING) + + def test_get_job_state_USUSP(self) -> None: + self.assertEqual(get_job_state("USUSP", "0"), AppState.PENDING) + + def test_get_job_state_SSUSP(self) -> None: + self.assertEqual(get_job_state("SSUSP", "0"), AppState.PENDING) + + def test_get_job_state_UNKNOWN(self) -> None: + self.assertEqual(get_job_state("UNKONWN", "0"), AppState.UNKNOWN) + + def test_get_docker_command(self) -> None: + role = simple_role() + self.assertEqual( + get_docker_command("foo", role, cfg={}), + "docker run --name=foo --entrypoint echo --rm /some/path hello '\\$HOSTNAME'", + ) + + def 
test_get_docker_command_bind_mount_ro(self) -> None: + role = simple_role() + role.mounts = [ + BindMount(src_path="/bind/src", dst_path="/bind/dst", read_only=True) + ] + self.assertEqual( + get_docker_command("foo", role, cfg={}), + "docker run --name=foo -v /bind/src:/bind/dst:ro --entrypoint echo --rm /some/path hello '\\$HOSTNAME'", + ) + + def test_get_docker_command_bind_mount_rw(self) -> None: + role = simple_role() + role.mounts = [ + BindMount(src_path="/bind/src", dst_path="/bind/dst", read_only=False) + ] + self.assertEqual( + get_docker_command("foo", role, cfg={}), + "docker run --name=foo -v /bind/src:/bind/dst:rw --entrypoint echo --rm /some/path hello '\\$HOSTNAME'", + ) + + def test_get_docker_command_volume_mount_ro(self) -> None: + role = simple_role() + role.mounts = [VolumeMount(src="srcvol", dst_path="/vol/dst", read_only=True)] + self.assertEqual( + get_docker_command("foo", role, cfg={}), + "docker run --name=foo --mount type=volume,src=srcvol,dst=/vol/dst,ro --entrypoint echo --rm /some/path hello '\\$HOSTNAME'", + ) + + def test_get_docker_command_volume_mount_rw(self) -> None: + role = simple_role() + role.mounts = [VolumeMount(src="srcvol", dst_path="/vol/dst", read_only=False)] + self.assertEqual( + get_docker_command("foo", role, cfg={}), + "docker run --name=foo --mount type=volume,src=srcvol,dst=/vol/dst --entrypoint echo --rm /some/path hello '\\$HOSTNAME'", + ) + + def test_get_docker_command_device_mount(self) -> None: + role = simple_role() + role.mounts = [ + DeviceMount(src_path="/dev/fuse", dst_path="/dev/fuse", permissions="rwm") + ] + self.assertEqual( + get_docker_command("foo", role, cfg={}), + "docker run --name=foo --device=/dev/fuse:/dev/fuse:rwm --entrypoint echo --rm /some/path hello '\\$HOSTNAME'", + ) + + def test_get_docker_command_container_workdir(self) -> None: + role = simple_role() + self.assertEqual( + get_docker_command( + "foo", role, cfg={"container_workdir": "/tmp/container"} + ), + "docker run --name=foo -w /tmp/container --entrypoint echo --rm /some/path hello '\\$HOSTNAME'", + ) + + def test_get_docker_command_host_network(self) -> None: + role = simple_role() + self.assertEqual( + get_docker_command("foo", role, cfg={"host_network": True}), + "docker run --name=foo --net=host --ipc=host --entrypoint echo --rm /some/path hello '\\$HOSTNAME'", + ) + + def test_get_docker_command_port(self) -> None: + role = simple_role() + role.port_map = {"http": 80} + self.assertEqual( + get_docker_command("foo", role, cfg={}), + "docker run --name=foo -p 80 --entrypoint echo --rm /some/path hello '\\$HOSTNAME'", + ) + + def test_get_docker_command_shm_size(self) -> None: + role = simple_role() + self.assertEqual( + get_docker_command("foo", role, cfg={"shm_size": "10G"}), + "docker run --name=foo --shm-size=10G --entrypoint echo --rm /some/path hello '\\$HOSTNAME'", + ) + + def test_get_docker_command_envs(self) -> None: + role = simple_role() + role.env = {"FOO": "bar"} + self.assertEqual( + get_docker_command("foo", role, cfg={}), + "docker run --name=foo -e FOO=bar --entrypoint echo --rm /some/path hello '\\$HOSTNAME'", + ) + + def test_get_docker_command_resource(self) -> None: + role = simple_role() + role.resource = Resource(cpu=1, memMB=1, gpu=1) + self.assertEqual( + get_docker_command("foo", role, cfg={}), + f"docker run --name=foo --cpus=1 --memory={1024 * 1024} --gpus all --entrypoint echo --rm /some/path hello '\\$HOSTNAME'", + ) + + def test_get_docker_command_full(self) -> None: + role = simple_role() + role.mounts = [ + 
BindMount(src_path="/bind/src", dst_path="/bind/dst", read_only=True) + ] + role.mounts += [ + BindMount(src_path="/bind/src", dst_path="/bind/dst", read_only=False) + ] + role.mounts += [ + DeviceMount(src_path="/dev/fuse", dst_path="/dev/fuse", permissions="rwm") + ] + role.port_map = {"http": 80} + role.env = {"FOO": "bar"} + role.resource = Resource(cpu=1, memMB=1, gpu=1) + self.assertEqual( + get_docker_command( + "foo", + role, + cfg={"container_workdir": "/tmp/container", "shm_size": "10G"}, + ), + f"docker run --name=foo -v /bind/src:/bind/dst:ro -v /bind/src:/bind/dst:rw --device=/dev/fuse:/dev/fuse:rwm " + f"-w /tmp/container -p 80 --shm-size=10G -e FOO=bar --cpus=1 --memory={1024 * 1024} --gpus all --entrypoint echo --rm /some/path hello '\\$HOSTNAME'", + ) + + def test_get_bsub(self) -> None: + app_id = "appid" + job_name = "job_name" + job_host = "job_host" + role = simple_role() + self.assertEqual( + get_bsub( + app_id, job_name, role, cfg={}, head_job_name="", head_job_host=job_host + ), + f'bsub -P {app_id} -J {job_name} -m {job_host} -R "span[hosts=1]]" << EOF\n{get_docker_command(job_name, role, cfg={})}\nEOF', + ) + + def test_get_bsub_head_job(self) -> None: + app_id = "appid" + job_name = "job_name" + head_job_name = "head_job" + job_host = "job_host" + role = simple_role() + self.assertEqual( + get_bsub( + app_id, + job_name, + role, + cfg={}, + head_job_name=head_job_name, + head_job_host=job_host, + ), + f'bsub -P {app_id} -J {job_name} -w "started({head_job_name})" -R "select[hname!=\'{job_host}\']" ' + f'-R "span[hosts=1]]" << EOF\n{get_docker_command(job_name, role, cfg={})}\nEOF', + ) + + def test_get_bsub_jobdir(self) -> None: + app_id = "appid" + job_name = "job_name" + job_host = "job_host" + jobdir = "/data" + role = simple_role() + self.assertEqual( + get_bsub( + app_id, + job_name, + role, + cfg={"jobdir": jobdir}, + head_job_name="", + head_job_host=job_host, + ), + f"bsub -P {app_id} -J {job_name} -m {job_host} " + f"-cwd {jobdir} -outdir {jobdir} -oo {jobdir}/{job_name}.submit.out -eo {jobdir}/{job_name}.submit.err " + f'-R "span[hosts=1]]" << EOF\n{get_docker_command(job_name, role, cfg={})} > {jobdir}/{job_name}.out 2> {jobdir}/{job_name}.err\nEOF', + ) + + def test_get_bsub_queue(self) -> None: + app_id = "appid" + job_name = "job_name" + job_host = "job_host" + role = simple_role() + self.assertEqual( + get_bsub( + app_id, + job_name, + role, + cfg={"lsf_queue": "queue"}, + head_job_name="", + head_job_host=job_host, + ), + f'bsub -P {app_id} -J {job_name} -m {job_host} -q queue -R "span[hosts=1]]" << EOF\n{get_docker_command(job_name, role, cfg={})}\nEOF', + ) + + def test_get_bsub_resource(self) -> None: + app_id = "appid" + job_name = "job_name" + job_host = "job_host" + role = simple_role() + role.resource = Resource(cpu=1, memMB=1, gpu=1) + self.assertEqual( + get_bsub( + app_id, job_name, role, cfg={}, head_job_name="", head_job_host=job_host + ), + f"bsub -P {app_id} -J {job_name} -m {job_host} " + f'-n 1 -R "span[hosts=1] rusage[mem=1]" -gpu "num=1:mode=shared:j_exclusive=yes" << EOF\n' + f"{get_docker_command(job_name, role, cfg={})}\nEOF", + ) + + def test_get_bsub_full(self) -> None: + app_id = "appid" + job_name = "job_name" + head_job_name = "head_job" + job_host = "job_host" + jobdir = "/data" + role = simple_role() + role.resource = Resource(cpu=1, memMB=1, gpu=1) + self.maxDiff = None + self.assertEqual( + get_bsub( + app_id, + job_name, + role, + cfg={"jobdir": jobdir, "lsf_queue": "queue"}, + head_job_name=head_job_name, + 
head_job_host=job_host, + ), + f'bsub -P {app_id} -J {job_name} -w "started({head_job_name})" -R "select[hname!=\'{job_host}\']" ' + f"-cwd {jobdir} -outdir {jobdir} -oo {jobdir}/{job_name}.submit.out -eo {jobdir}/{job_name}.submit.err -q queue " + '-n 1 -R "span[hosts=1] rusage[mem=1]" -gpu "num=1:mode=shared:j_exclusive=yes" << EOF\n' + f"{get_docker_command(job_name, role, cfg={})} > {jobdir}/{job_name}.out 2> {jobdir}/{job_name}.err\nEOF", + ) + + def test_cleanup_str(self) -> None: + self.assertEqual( + cleanup_str("-aBc*%Def"), + "abcdef", + ) + + def test_find_rank0_host_from_bhosts_stdout(self) -> None: + role = simple_role() + role.resource = Resource(cpu=1, memMB=1, gpu=1) + bhosts_stdout = "icgen2host-10-240-0-21 0 0\nicgen2host-10-240-0-22 16 2\nicgen2host-10-240-0-23 16 2\n" + self.assertEqual( + find_rank0_host_from_bhosts_stdout(bhosts_stdout, role), + "icgen2host-10-240-0-22", + ) + + def test_find_rank0_host_from_bhosts_stdout_too_big_request(self) -> None: + role = simple_role() + role.resource = Resource(cpu=10000, memMB=1, gpu=1) + bhosts_stdout = "icgen2host-10-240-0-21 0 0\nicgen2host-10-240-0-22 16 2\nicgen2host-10-240-0-23 16 2\n" + with self.assertRaises(Exception): + find_rank0_host_from_bhosts_stdout(bhosts_stdout, role) + + def test_get_submit_script(self) -> None: + app_id = "appid" + app = simple_app() + cmd = ["/bin/bash"] + rank0_host = "icgen2host-10-240-0-22" + head_job_name = "" + values = macros.Values( + img_root="", app_id=app_id, replica_id="0", rank0_env="TORCHX_RANK0_HOST" + ) + replica_role = values.apply(app.roles[0]) + replica_role.env["TORCHX_RANK0_HOST"] = rank0_host + job_name = "appid-a-0" + self.maxDiff = None + self.assertEqual( + get_submit_script( + app_id, cmd=cmd, app=app, cfg=LsfOpts({}), rank0_host=rank0_host + ), + f"""#!/bin/bash +# +# Generated by TorchX {torchx.__version__} +# Run with: {shlex.join(cmd)} +# +{get_bsub(app_id, job_name, replica_role, LsfOpts({}), head_job_name, rank0_host)} +""", + ) + + def test_bjobs_msg_to_describe(self) -> None: + # bjobs -noheader -a -P dist_app-c6v2phgkc2j2tc -o "proj name stat exit_code" + appid = "dist_app-c6v2phgkc2j2tc" + msg = "dist_app-c6v2phgkc2j2tc dist_app-c6v2phgkc2j2tc-dist_app-0 DONE -\ndist_app-c6v2phgkc2j2tc dist_app-c6v2phgkc2j2tc-dist_app-1 DONE -" + describe = bjobs_msg_to_describe(appid, msg) + self.assertIsNot(describe, None) + if describe: + self.assertEqual(describe.app_id, appid) + self.assertEqual(describe.state, AppState.SUCCEEDED) + self.assertEqual(describe.roles[0].num_replicas, 2) + + def test_bjobs_msg_to_describe_no_msg(self) -> None: + # bjobs -noheader -a -P dist_app-c6v2phgkc2j2tc -o "proj name stat exit_code" + appid = "dist_app-c6v2phgkc2j2tc" + msg = "" + describe = bjobs_msg_to_describe(appid, msg) + self.assertEqual(describe, None) + + def test_bjobs_msg_to_describe_fail(self) -> None: + # bjobs -noheader -a -P dist_app-vdkcfm1p7lxcx -o "proj name stat exit_code" + appid = "dist_app-vdkcfm1p7lxcx" + msg = "dist_app-vdkcfm1p7lxcx dist_app-vdkcfm1p7lxcx-dist_app-0 EXIT 1\ndist_app-vdkcfm1p7lxcx dist_app-vdkcfm1p7lxcx-dist_app-1 EXIT 1" + describe = bjobs_msg_to_describe(appid, msg) + self.assertIsNot(describe, None) + if describe: + self.assertEqual(describe.app_id, appid) + self.assertEqual(describe.state, AppState.FAILED) + self.assertEqual(describe.roles[0].num_replicas, 2) + + def test_bjobs_msg_to_log_file_out(self) -> None: + # bjobs -noheader -a -P dist_app-c6v2phgkc2j2tc -o "proj name output_dir" + msg = "dist_app-c6v2phgkc2j2tc 
dist_app-c6v2phgkc2j2tc-dist_app-0 /mnt/data/torchx\ndist_app-c6v2phgkc2j2tc dist_app-c6v2phgkc2j2tc-dist_app-1 /mnt/data/torchx" + log_file = bjobs_msg_to_log_file( + "dist_app-c6v2phgkc2j2tc", "dist_app", k=0, streams=Stream.STDOUT, msg=msg + ) + self.assertEqual( + log_file, "/mnt/data/torchx/dist_app-c6v2phgkc2j2tc-dist_app-0.out" + ) + + def test_bjobs_msg_to_log_file_err(self) -> None: + # bjobs -noheader -a -P dist_app-c6v2phgkc2j2tc -o "proj name output_dir" + msg = "dist_app-c6v2phgkc2j2tc dist_app-c6v2phgkc2j2tc-dist_app-0 /mnt/data/torchx\ndist_app-c6v2phgkc2j2tc dist_app-c6v2phgkc2j2tc-dist_app-1 /mnt/data/torchx" + log_file = bjobs_msg_to_log_file( + "dist_app-c6v2phgkc2j2tc", "dist_app", k=0, streams=Stream.STDERR, msg=msg + ) + self.assertEqual( + log_file, "/mnt/data/torchx/dist_app-c6v2phgkc2j2tc-dist_app-0.err" + ) + + def test_bjobs_msg_to_log_file_combined(self) -> None: + # bjobs -noheader -a -P dist_app-c6v2phgkc2j2tc -o "proj name output_dir" + msg = "dist_app-c6v2phgkc2j2tc dist_app-c6v2phgkc2j2tc-dist_app-0 /mnt/data/torchx\ndist_app-c6v2phgkc2j2tc dist_app-c6v2phgkc2j2tc-dist_app-1 /mnt/data/torchx" + with self.assertRaises(ValueError): + bjobs_msg_to_log_file( + "dist_app-c6v2phgkc2j2tc", + "dist_app", + k=0, + streams=Stream.COMBINED, + msg=msg, + ) + + def test_bjobs_msg_to_log_file_no_jobdir(self) -> None: + # bjobs -noheader -a -P dist_app-mnhnfk1gvhcqq -o "proj name output_dir" + msg = "dist_app-mnhnfk1gvhcqq dist_app-mnhnfk1gvhcqq-dist_app-0 -\ndist_app-mnhnfk1gvhcqq dist_app-mnhnfk1gvhcqq-dist_app-1 -" + with self.assertRaises(ValueError): + bjobs_msg_to_log_file( + "dist_app-mnhnfk1gvhcqq", + "dist_app", + k=0, + streams=Stream.STDERR, + msg=msg, + ) + + def test_bjobs_msg_to_list(self) -> None: + # bjobs -noheader -a -o "proj stat exit_code" + msg = "dist_app-c6v2phgkc2j2tc DONE -\ndist_app-c6v2phgkc2j2tc DONE -\ndist_app-vdkcfm1p7lxcx EXIT 1\ndist_app-vdkcfm1p7lxcx EXIT 1" + listApps = bjobs_msg_to_list(msg) + self.assertEqual(len(listApps), 2) + self.assertEqual(listApps[0].app_id, "dist_app-c6v2phgkc2j2tc") + self.assertEqual(listApps[0].state, AppState.SUCCEEDED) + self.assertEqual(listApps[1].app_id, "dist_app-vdkcfm1p7lxcx") + self.assertEqual(listApps[1].state, AppState.FAILED) + + def test_submit_dryrun(self) -> None: + scheduler = create_scheduler("foo") + app = simple_app() + info = scheduler.submit_dryrun(app, cfg={}) + req = info.request + self.assertIsInstance(req, LsfBsub) + + def test_validate(self) -> None: + scheduler = create_scheduler("foo") + app = simple_app() + scheduler._validate(app, "lsf") + + @patch("subprocess.run") + def test_schedule(self, run: MagicMock) -> None: + run.return_value.stdout = b"icgen2host-10-240-0-23 16 2\n" + scheduler = create_scheduler("foo") + app = simple_app() + info = scheduler.submit_dryrun(app, cfg={}) + out = scheduler.schedule(info) + self.assertEqual(out, "app-name-42") + self.assertEqual(run.call_count, 2) + + @patch("subprocess.run") + def test_cancel(self, run: MagicMock) -> None: + scheduler = create_scheduler("foo") + app_id = "1234" + self.assertTrue(scheduler.exists(app_id)) + self.assertEqual(run.call_count, 1) + scheduler.cancel(app_id) + self.assertEqual(run.call_count, 4) + + @patch("subprocess.run") + def test_list(self, run: MagicMock) -> None: + scheduler = create_scheduler("foo") + self.assertEqual(scheduler.list(), []) + self.assertEqual(run.call_count, 1) + self.assertEqual( + run.call_args[0][0], + ["bjobs", "-noheader", "-a", "-o", "proj stat exit_code"], + ) + + 
@patch("torchx.schedulers.lsf_scheduler.LogIterator") + @patch("subprocess.run") + def test_log_iter(self, run: MagicMock, LogIterator: MagicMock) -> None: + run.return_value.stdout = b"app-id app-id-role-0 /some/path\n" + + scheduler = create_scheduler("foo") + out = scheduler.log_iter("app-id", "role", regex=".*") + self.assertEqual(list(out), []) + self.assertEqual(run.call_count, 1) + self.assertEqual( + run.call_args[0][0], + ["bjobs", "-noheader", "-a", "-P", "app-id", "-o", "proj name output_dir"], + )