Allow CWL workflows to have jobs use all of a Slurm node's memory (#5052)

* Remove periods from option section headings

* Hide entire service section from CWL help

* Change Slurm memory allocation flag to new style, and document Slurm options and env vars

* Use but add a way to bypass the CWL spec default RAM, and rearrange options

* Add --slurmDefaultAllMem to let jobs without their own memory requirements use whole nodes' memory

* Actually set new config entry

* Add a test for whole-node CWL memory on Slurm

* Detect a job not getting a node as a passing test, and fix Slurm job cleanup on kill

* Fix Slurm issue method typing

* Fix docs and remove dead code from code review

* Drop duplicate import options

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
adamnovak and github-actions[bot] authored Oct 14, 2024
1 parent 4514bfa commit 631ae0c
Showing 11 changed files with 280 additions and 91 deletions.
11 changes: 10 additions & 1 deletion docs/appendices/environment_vars.rst
@@ -131,6 +131,9 @@ There are several environment variables that affect the way Toil runs.
| TOIL_GOOGLE_PROJECTID | The Google project ID to use when generating |
| | Google job store names for tests or CWL workflows. |
+----------------------------------+----------------------------------------------------+
| TOIL_SLURM_ALLOCATE_MEM | Whether to allocate memory in Slurm with --mem. |
| | True by default. |
+----------------------------------+----------------------------------------------------+
| TOIL_SLURM_ARGS | Arguments for sbatch for the slurm batch system. |
| | Do not pass CPU or memory specifications here. |
| | Instead, define resource requirements for the job. |
@@ -140,9 +143,15 @@ There are several environment variables that affect the way Toil runs.
| | provided. |
+----------------------------------+----------------------------------------------------+
| TOIL_SLURM_PE | Name of the slurm partition to use for parallel |
| | jobs. |
| | jobs. Useful for Slurm clusters that do not offer |
| | a partition accepting both single-core and |
| | multi-core jobs. |
| | There is no default value for this variable. |
+----------------------------------+----------------------------------------------------+
| TOIL_SLURM_TIME | Slurm job time limit, in [DD-]HH:MM:SS format. For |
| | example, ``2-07:15:30`` for 2 days, 7 hours, 15 |
| | minutes and 30 seconds, or ``4:00:00`` for 4 hours.|
+----------------------------------+----------------------------------------------------+
| TOIL_GRIDENGINE_ARGS | Arguments for qsub for the gridengine batch |
| | system. Do not pass CPU or memory specifications |
| | here. Instead, define resource requirements for |
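For illustration, a minimal Python sketch of launching a CWL workflow on Slurm with the new environment variables set; the workflow file, inputs file, job store path, and the TOIL_SLURM_ARGS value are hypothetical examples, not taken from this commit:

import os
import subprocess

env = dict(os.environ)
env["TOIL_SLURM_ALLOCATE_MEM"] = "True"    # pass --mem to sbatch (the default)
env["TOIL_SLURM_TIME"] = "2-07:15:30"      # 2 days, 7 hours, 15 minutes, 30 seconds
env["TOIL_SLURM_ARGS"] = "--export=ALL"    # extra sbatch arguments; no CPU or memory flags here

# Hypothetical workflow, inputs, and job store names.
subprocess.run(
    ["toil-cwl-runner", "--batchSystem", "slurm", "--jobStore", "./jobstore",
     "workflow.cwl", "inputs.yaml"],
    env=env,
    check=True,
)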
14 changes: 11 additions & 3 deletions docs/cwl/running.rst
@@ -91,9 +91,17 @@ printed to the stdout stream after workflow execution.
``--stats``: Save resources usages in json files that can be collected with the
``toil stats`` command after the workflow is done.

``--disable-streaming``: Does not allow streaming of input files. This is enabled
by default for files marked with ``streamable`` flag True and only for remote files
when the jobStore is not on local machine.
Extra Toil CWL Options
++++++++++++++++++++++

Besides the normal Toil options and the options supported by cwltool, toil-cwl-runner adds some of its own options:

--bypass-file-store Do not use Toil's file store system and assume all paths are accessible in place from all nodes. This can avoid possibly-redundant file copies into Toil's job store storage, and is required for CWL's ``InplaceUpdateRequirement``. But, it allows a failed job execution to leave behind a partially-modified state, which means that a restarted workflow might not work correctly.
--reference-inputs Do not copy remote inputs into Toil's file store and assume they are accessible in place from all nodes.
--disable-streaming Do not allow streaming of job input files. By default, files marked with ``streamable`` True are streamed from remote job stores.
--cwl-default-ram Apply CWL specification default ramMin.
--no-cwl-default-ram Do not apply CWL specification default ramMin, so that Toil --defaultMemory applies.


Running CWL in the Cloud
------------------------
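A sketch of the option combination this commit enables, called through the ``main`` entry point that appears later in this diff; the workflow, inputs, and job store names are made up:

from toil.cwl.cwltoil import main

# Let CWL tools that declare no RAM requirement of their own use a whole
# Slurm node's memory instead of the CWL spec default ramMin.
exit_code = main([
    "--batchSystem", "slurm",
    "--no-cwl-default-ram",          # skip the spec default so Toil sees no RAM requirement
    "--slurmDefaultAllMem", "True",  # submit such jobs with Slurm's --mem=0
    "--jobStore", "./jobstore",      # hypothetical job store
    "workflow.cwl", "inputs.yaml",
])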
22 changes: 17 additions & 5 deletions docs/running/cliOptions.rst
@@ -237,6 +237,23 @@ levels in toil are based on priority from the logging module:
Use a Mesos role.
--mesosName MESOSNAME
The Mesos name to use. (default: toil)
--scale SCALE A scaling factor to change the value of all submitted
tasks' submitted cores. Used in single_machine batch
system. Useful for running workflows on smaller
machines than they were designed for, by setting a
value less than 1. (default: 1)
--slurmAllocateMem SLURM_ALLOCATE_MEM
If False, do not use --mem. Used as a workaround for
Slurm clusters that reject jobs with memory
allocations.
--slurmTime SLURM_TIME
Slurm job time limit, in [DD-]HH:MM:SS format.
--slurmPE SLURM_PE Special partition to send Slurm jobs to if they ask
for more than 1 CPU. Useful for Slurm clusters that do
not offer a partition accepting both single-core and
multi-core jobs.
--slurmArgs SLURM_ARGS
Extra arguments to pass to Slurm.
--kubernetesHostPath KUBERNETES_HOST_PATH
Path on Kubernetes hosts to use as shared inter-pod temp
directory.
@@ -261,11 +278,6 @@ levels in toil are based on priority from the logging module:
The ARN of an IAM role to run AWS Batch jobs as, so they
can e.g. access a job store. Must be assumable by
ecs-tasks.amazonaws.com
--scale SCALE A scaling factor to change the value of all submitted
tasks' submitted cores. Used in single_machine batch
system. Useful for running workflows on smaller
machines than they were designed for, by setting a
value less than 1. (default: 1)

**Data Storage Options**
Allows configuring Toil's data storage.
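The ``--slurmTime`` value follows the documented ``[DD-]HH:MM:SS`` format. A small illustrative parser for that format (a sketch only, not Toil's actual ``parse_slurm_time``):

from datetime import timedelta

def parse_time_limit(value: str) -> timedelta:
    """Parse a [DD-]HH:MM:SS time limit into a timedelta."""
    days = 0
    if "-" in value:
        day_part, value = value.split("-", 1)
        days = int(day_part)
    hours, minutes, seconds = (int(part) for part in value.split(":"))
    return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)

assert parse_time_limit("2-07:15:30") == timedelta(days=2, hours=7, minutes=15, seconds=30)
assert parse_time_limit("4:00:00") == timedelta(hours=4)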
54 changes: 35 additions & 19 deletions src/toil/batchSystems/abstractGridEngineBatchSystem.py
@@ -24,7 +24,7 @@
UpdatedBatchJobInfo)
from toil.batchSystems.cleanup_support import BatchSystemCleanupSupport
from toil.bus import ExternalBatchIdMessage, get_job_kind
from toil.job import AcceleratorRequirement
from toil.job import JobDescription, AcceleratorRequirement
from toil.lib.misc import CalledProcessErrorStderr
from toil.lib.retry import old_retry, DEFAULT_DELAYS, retry

Expand Down Expand Up @@ -79,6 +79,7 @@ def __init__(self, newJobsQueue: Queue, updatedJobsQueue: Queue, killQueue: Queu
self.killedJobsQueue = killedJobsQueue
self.waitingJobs: List[JobTuple] = list()
self.runningJobs = set()
# TODO: Why do we need a lock for this? We have the GIL.
self.runningJobsLock = Lock()
self.batchJobIDs: Dict[int, str] = dict()
self._checkOnJobsCache = None
Expand Down Expand Up @@ -400,28 +401,36 @@ def __init__(self, config: Config, maxCores: float, maxMemory: int, maxDisk: int
def supportsAutoDeployment(cls):
return False

def issueBatchJob(self, command: str, jobDesc, job_environment: Optional[Dict[str, str]] = None):
def count_needed_gpus(self, job_desc: JobDescription):
"""
Count the number of cluster-allocateable GPUs we want to allocate for the given job.
"""
gpus = 0
if isinstance(job_desc.accelerators, list):
for accelerator in job_desc.accelerators:
if accelerator['kind'] == 'gpu':
gpus += accelerator['count']
else:
gpus = job_desc.accelerators

return gpus

def issueBatchJob(self, command: str, job_desc: JobDescription, job_environment: Optional[Dict[str, str]] = None):
# Avoid submitting internal jobs to the batch queue, handle locally
localID = self.handleLocalJob(command, jobDesc)
if localID is not None:
return localID
local_id = self.handleLocalJob(command, job_desc)
if local_id is not None:
return local_id
else:
self.check_resource_request(jobDesc)
jobID = self.getNextJobID()
self.currentJobs.add(jobID)
gpus = 0
if isinstance(jobDesc.accelerators, list):
for accelerator in jobDesc.accelerators:
if accelerator['kind'] == 'gpu':
gpus = accelerator['count']
else:
gpus = jobDesc.accelerators
self.check_resource_request(job_desc)
gpus = self.count_needed_gpus(job_desc)
job_id = self.getNextJobID()
self.currentJobs.add(job_id)

self.newJobsQueue.put((jobID, jobDesc.cores, jobDesc.memory, command, get_job_kind(jobDesc.get_names()),
self.newJobsQueue.put((job_id, job_desc.cores, job_desc.memory, command, get_job_kind(job_desc.get_names()),
job_environment, gpus))
logger.debug("Issued the job command: %s with job id: %s and job name %s", command, str(jobID),
get_job_kind(jobDesc.get_names()))
return jobID
logger.debug("Issued the job command: %s with job id: %s and job name %s", command, str(job_id),
get_job_kind(job_desc.get_names()))
return job_id

def killBatchJobs(self, jobIDs):
"""
Expand Down Expand Up @@ -511,6 +520,13 @@ def shutdown(self) -> None:
newJobsQueue.put(None)
self.background_thread.join()

# Now in one thread, kill all the jobs
if len(self.background_thread.runningJobs) > 0:
logger.warning("Cleaning up %s jobs still running at shutdown", len(self.background_thread.runningJobs))
for job in self.background_thread.runningJobs:
self.killQueue.put(job)
self.background_thread.killJobs()

def setEnv(self, name, value=None):
if value and ',' in value:
raise ValueError(type(self).__name__ + " does not support commata in environment variable values")
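A standalone sketch of the GPU-counting logic factored out into ``count_needed_gpus`` above; ``JobDescription.accelerators`` can be either a list of accelerator requirement dicts or a plain integer, and the values below are made up:

from typing import List, Union

def count_needed_gpus(accelerators: Union[int, List[dict]]) -> int:
    """Count cluster-allocatable GPUs requested, mirroring the method above."""
    gpus = 0
    if isinstance(accelerators, list):
        for accelerator in accelerators:
            if accelerator["kind"] == "gpu":
                gpus += accelerator["count"]
    else:
        gpus = accelerators
    return gpus

assert count_needed_gpus([{"kind": "gpu", "count": 2}, {"kind": "fpga", "count": 1}]) == 2
assert count_needed_gpus(3) == 3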
61 changes: 47 additions & 14 deletions src/toil/batchSystems/slurm.py
@@ -17,16 +17,18 @@
import math
import os
import sys
from argparse import ArgumentParser, _ArgumentGroup
from argparse import ArgumentParser, _ArgumentGroup, SUPPRESS
from shlex import quote
from typing import Dict, List, Optional, Set, Tuple, TypeVar, Union, NamedTuple

from toil.bus import get_job_kind
from toil.common import Config
from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE, InsufficientSystemResources
from toil.batchSystems.abstractGridEngineBatchSystem import \
AbstractGridEngineBatchSystem
from toil.batchSystems.options import OptionSetter
from toil.job import Requirer
from toil.job import JobDescription, Requirer
from toil.lib.conversions import strtobool
from toil.lib.misc import CalledProcessErrorStderr, call_command
from toil.statsAndLogging import TRACE

@@ -566,7 +568,7 @@ def prepareSbatch(self,
if cpu and cpu > 1 and parallel_env:
sbatch_line.append(f'--partition={parallel_env}')

if mem is not None and self.boss.config.allocate_mem: # type: ignore[attr-defined]
if mem is not None and self.boss.config.slurm_allocate_mem: # type: ignore[attr-defined]
# memory passed in is in bytes, but slurm expects megabytes
sbatch_line.append(f'--mem={math.ceil(mem / 2 ** 20)}')
if cpu is not None:
@@ -625,6 +627,33 @@ def __init__(self, config: Config, maxCores: float, maxMemory: int, maxDisk: int
super().__init__(config, maxCores, maxMemory, maxDisk)
self.partitions = SlurmBatchSystem.PartitionSet()

# Override issuing jobs so we can check if we need to use Slurm's magic
# whole-node-memory feature.
def issueBatchJob(self, command: str, job_desc: JobDescription, job_environment: Optional[Dict[str, str]] = None) -> int:
# Avoid submitting internal jobs to the batch queue, handle locally
local_id = self.handleLocalJob(command, job_desc)
if local_id is not None:
return local_id
else:
self.check_resource_request(job_desc)
gpus = self.count_needed_gpus(job_desc)
job_id = self.getNextJobID()
self.currentJobs.add(job_id)

if "memory" not in job_desc.requirements and self.config.slurm_default_all_mem: # type: ignore[attr-defined]
# The job doesn't have its own memory requirement, and we are
# defaulting to whole node memory. Use Slurm's 0-memory sentinel.
memory = 0
else:
# Use the memory actually on the job, or the Toil default memory
memory = job_desc.memory

self.newJobsQueue.put((job_id, job_desc.cores, memory, command, get_job_kind(job_desc.get_names()),
job_environment, gpus))
logger.debug("Issued the job command: %s with job id: %s and job name %s", command, str(job_id),
get_job_kind(job_desc.get_names()))
return job_id

def _check_accelerator_request(self, requirer: Requirer) -> None:
for accelerator in requirer.accelerators:
if accelerator['kind'] != 'gpu':
@@ -647,26 +676,30 @@ def _check_accelerator_request(self, requirer: Requirer) -> None:

@classmethod
def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None:
allocate_mem = parser.add_mutually_exclusive_group()
allocate_mem_help = ("A flag that can block allocating memory with '--mem' for job submissions "
"on SLURM since some system servers may reject any job request that "
"explicitly specifies the memory allocation. The default is to always allocate memory.")
allocate_mem.add_argument("--dont_allocate_mem", action='store_false', dest="allocate_mem", help=allocate_mem_help)
allocate_mem.add_argument("--allocate_mem", action='store_true', dest="allocate_mem", help=allocate_mem_help)
allocate_mem.set_defaults(allocate_mem=True)

parser.add_argument("--slurmAllocateMem", dest="slurm_allocate_mem", type=strtobool, default=True, env_var="TOIL_SLURM_ALLOCATE_MEM",
help="If False, do not use --mem. Used as a workaround for Slurm clusters that reject jobs "
"with memory allocations.")
# Keep these deprecated options for backward compatibility
parser.add_argument("--dont_allocate_mem", action='store_false', dest="slurm_allocate_mem", help=SUPPRESS)
parser.add_argument("--allocate_mem", action='store_true', dest="slurm_allocate_mem", help=SUPPRESS)

parser.add_argument("--slurmDefaultAllMem", dest="slurm_default_all_mem", type=strtobool, default=False, env_var="TOIL_SLURM_DEFAULT_ALL_MEM",
help="If True, assign Toil jobs without their own memory requirements all available "
"memory on a Slurm node (via Slurm --mem=0).")
parser.add_argument("--slurmTime", dest="slurm_time", type=parse_slurm_time, default=None, env_var="TOIL_SLURM_TIME",
help="Slurm job time limit, in [DD-]HH:MM:SS format")
help="Slurm job time limit, in [DD-]HH:MM:SS format.")
parser.add_argument("--slurmPE", dest="slurm_pe", default=None, env_var="TOIL_SLURM_PE",
help="Special partition to send Slurm jobs to if they ask for more than 1 CPU")
help="Special partition to send Slurm jobs to if they ask for more than 1 CPU.")
parser.add_argument("--slurmArgs", dest="slurm_args", default="", env_var="TOIL_SLURM_ARGS",
help="Extra arguments to pass to Slurm")
help="Extra arguments to pass to Slurm.")


OptionType = TypeVar('OptionType')
@classmethod
def setOptions(cls, setOption: OptionSetter) -> None:
setOption("allocate_mem")
setOption("slurm_allocate_mem")
setOption("slurm_default_all_mem")
setOption("slurm_time")
setOption("slurm_pe")
setOption("slurm_args")
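A simplified sketch of how the memory value chosen in ``issueBatchJob`` above becomes an sbatch flag in ``prepareSbatch``: a missing value or a disabled ``--slurmAllocateMem`` produces no flag, ``0`` is Slurm's whole-node sentinel, and anything else is converted from bytes to MiB. This is an illustration under those assumptions, not the actual method:

import math
from typing import List, Optional

def memory_flag(mem_bytes: Optional[int], allocate_mem: bool) -> List[str]:
    """Build the sbatch memory argument for one job."""
    if mem_bytes is None or not allocate_mem:
        return []                                        # let Slurm apply its own defaults
    if mem_bytes == 0:
        return ["--mem=0"]                               # sentinel: all memory on the node
    return [f"--mem={math.ceil(mem_bytes / 2 ** 20)}"]   # bytes -> MiB

assert memory_flag(0, allocate_mem=True) == ["--mem=0"]
assert memory_flag(4 * 2 ** 30, allocate_mem=True) == ["--mem=4096"]
assert memory_flag(4 * 2 ** 30, allocate_mem=False) == []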
26 changes: 22 additions & 4 deletions src/toil/cwl/cwltoil.py
@@ -801,7 +801,7 @@ def visit(
# wherever else we would stage it.
# TODO: why would we do that?
stagedir = cast(Optional[str], obj.get("dirname")) or stagedir

if obj["class"] not in ("File", "Directory"):
# We only handle files and directories; only they have locations.
return
@@ -990,7 +990,7 @@ def visit(
# reference, we just pass that along.

"""Link or copy files to their targets. Create them as needed."""

logger.debug(
"ToilPathMapper adding file mapping %s -> %s", deref, tgt
)
@@ -2473,6 +2473,20 @@ def __init__(

req = tool.evalResources(self.builder, runtime_context)

tool_own_resources = tool.get_requirement("ResourceRequirement")[0] or {}
if "ramMin" in tool_own_resources or "ramMax" in tool_own_resources:
# The tool is actually asking for memory.
memory = int(req["ram"] * (2**20))
else:
# The tool is getting a default ram allocation.
if getattr(runtime_context, "cwl_default_ram"):
# We will respect the CWL spec and apply the default cwltool
# computed, which might be different than Toil's default.
memory = int(req["ram"] * (2**20))
else:
# We use a None requirement and the Toil default applies.
memory = None

accelerators: Optional[List[AcceleratorRequirement]] = None
if req.get("cudaDeviceCount", 0) > 0:
# There's a CUDARequirement, which cwltool processed for us
@@ -2537,7 +2551,7 @@ def __init__(

super().__init__(
cores=req["cores"],
memory=int(req["ram"] * (2**20)),
memory=memory,
disk=int(total_disk),
accelerators=accelerators,
preemptible=preemptible,
@@ -3837,6 +3851,7 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
)
runtime_context.workdir = workdir # type: ignore[attr-defined]
runtime_context.outdir = outdir
setattr(runtime_context, "cwl_default_ram", options.cwl_default_ram)
runtime_context.move_outputs = "leave"
runtime_context.rm_tmpdir = False
runtime_context.streaming_allowed = not options.disable_streaming
@@ -3875,11 +3890,14 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
if options.restart:
outobj = toil.restart()
else:
# TODO: why are we doing this? Does this get applied to all
# tools as a default or something?
loading_context.hints = [
{
"class": "ResourceRequirement",
"coresMin": toil.config.defaultCores,
"ramMin": toil.config.defaultMemory / (2**20),
# Don't include any RAM requirement because we want to
# know when tools don't manually ask for RAM.
"outdirMin": toil.config.defaultDisk / (2**20),
"tmpdirMin": 0,
}
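A simplified sketch of the RAM decision added above: a tool's own ``ramMin``/``ramMax`` always wins, the spec default is applied only when ``--cwl-default-ram`` is in effect, and otherwise the requirement is left unset so Toil's ``--defaultMemory`` (or ``--slurmDefaultAllMem``) applies. The function and values are illustrative only:

from typing import Optional

def choose_memory(tool_resources: dict, computed_ram_mib: int,
                  cwl_default_ram: bool) -> Optional[int]:
    """Return a memory requirement in bytes, or None to use Toil's default."""
    if "ramMin" in tool_resources or "ramMax" in tool_resources:
        return computed_ram_mib * 2 ** 20   # the tool asked for memory itself
    if cwl_default_ram:
        return computed_ram_mib * 2 ** 20   # apply the CWL spec default ramMin
    return None                             # fall back to Toil's --defaultMemory

assert choose_memory({"ramMin": 1024}, 1024, cwl_default_ram=False) == 1024 * 2 ** 20
assert choose_memory({}, 256, cwl_default_ram=True) == 256 * 2 ** 20
assert choose_memory({}, 256, cwl_default_ram=False) is None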
1 change: 0 additions & 1 deletion src/toil/leader.py
@@ -1067,7 +1067,6 @@ def killJobs(self, jobsToKill, exit_reason: BatchJobExitReason = BatchJobExitRea

return jobsRerunning


#Following functions handle error cases for when jobs have gone awry with the batch system.

def reissueOverLongJobs(self) -> None: