Docs: Add a tutorial for how to use Prefect with a job scheduler (e.g. Slurm) on an HPC machine #10136
I'm all in for a tutorial on how to use Prefect with a stack using …
I must admit that I have no clue what any of those 3 are 😅 I was thinking more of the recommended approach of using a …
This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.
I would also be interested in such a tutorial. In particular, a simple tutorial that takes a Prefect workflow, submits it as a Slurm job, and monitors its execution. I'm able to help all y'all, and can test it at NERSC. I'm also interested in embarrassingly parallel tasks, and in tasks that communicate using MPI across nodes.
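For the embarrassingly parallel part, I imagine the Prefect side would look roughly like the sketch below (just task mapping; the Slurm/Dask wiring is what the tutorial would need to cover, and `process` is a made-up placeholder task):

```python
from prefect import flow, task


@task
def process(x: int) -> int:
    # stand-in for an independent, per-item unit of work
    return x * x


@flow  # a Dask-backed task runner pointed at Slurm workers would be attached here
def parallel_flow(items: list[int]) -> list[int]:
    # .map submits one task run per item; with a Dask-backed task runner
    # these runs execute concurrently on the allocated workers
    futures = process.map(items)
    return [future.result() for future in futures]


if __name__ == "__main__":
    print(parallel_flow(list(range(8))))
```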
@JBlaschke: Here is an example I wrote up for Perlmutter. Feel free to use this and do whatever you'd like with it. I think the challenge I ran into for practical use was that you can only apply a single task runner to the entire flow, meaning that if some of the individual tasks have different compute requirements, you're mostly out of luck (unless I missed something). There is also the fact that the compute nodes need to be able to communicate with the Prefect server, which not all HPC machines can do when using Prefect Cloud, but that's not a problem at NERSC. Note to future me: the content below was taken from my 0.2.0 release.

Some Utility Functions

```python
from __future__ import annotations
from typing import Callable, TYPE_CHECKING
from dask_jobqueue import SLURMCluster
if TYPE_CHECKING:
from dask_jobqueue.core import DaskJobqueueJob
from prefect_dask.task_runners import DaskTaskRunner
def make_dask_runner(
    cluster_kwargs: dict,
    cluster_class: Callable | None = None,
    adapt_kwargs: dict[str, int | None] | None = None,
    client_kwargs: dict | None = None,
    temporary: bool = False,
) -> DaskTaskRunner:
"""
Make a DaskTaskRunner for use with Prefect workflows.
Parameters
----------
cluster_kwargs
Keyword arguments to pass to `cluster_class`.
cluster_class
The Dask cluster class to use. Defaults to `dask_jobqueue.SLURMCluster`.
adapt_kwargs
Keyword arguments to pass to `cluster.adapt` of the form `{"minimum": int, "maximum": int}`.
If `None`, no adaptive scaling will be done.
client_kwargs
Keyword arguments to pass to `dask.distributed.Client`.
temporary
Whether to use a temporary cluster. If `True`, the cluster will be
terminated once the `Flow` is finished. If `False`, the cluster will
run until the walltime is reached and can run multiple `Flow`s.
Returns
-------
DaskTaskRunner
A DaskTaskRunner object for use with Prefect workflows.
"""
from dask_jobqueue import SLURMCluster
from prefect_dask.task_runners import DaskTaskRunner
if cluster_class is None:
cluster_class = SLURMCluster
# Make the one-time-use DaskTaskRunner
if temporary:
return DaskTaskRunner(
cluster_class=cluster_class,
cluster_kwargs=cluster_kwargs,
adapt_kwargs=adapt_kwargs,
client_kwargs=client_kwargs,
)
# Make the Dask cluster
cluster = _make_dask_cluster(cluster_class, cluster_kwargs)
# Set up adaptive scaling
if adapt_kwargs and (adapt_kwargs["minimum"] or adapt_kwargs["maximum"]):
cluster.adapt(minimum=adapt_kwargs["minimum"], maximum=adapt_kwargs["maximum"])
# Return the DaskTaskRunner with the cluster address
return DaskTaskRunner(address=cluster.scheduler_address)
def _make_dask_cluster(
    cluster_class: Callable, cluster_kwargs: dict, verbose: bool = True
) -> DaskJobqueueJob:
"""
Make a Dask cluster for use with Prefect workflows.
Parameters
----------
cluster_class
The Dask cluster class to use. Defaults to `dask_jobqueue.SLURMCluster`.
cluster_kwargs
Keyword arguments to pass to `cluster_class`.
verbose
Whether to print the job script to stdout.
"""
cluster = cluster_class(**cluster_kwargs)
if verbose:
print(
f"Workers are submitted with the following job script:\n{cluster.job_script()}"
)
print(f"Scheduler is running at {cluster.scheduler.address}")
print(f"Dashboard is located at {cluster.dashboard_link}")
    return cluster
```

Example Usage

```python
n_slurm_jobs = 1  # Number of Slurm jobs to launch in parallel.
n_nodes_per_calc = 1 # Number of nodes to reserve for each Slurm job.
n_cores_per_node = 48 # Number of CPU cores per node.
mem_per_node = "64 GB" # Total memory per node.
cluster_kwargs = {
# Dask worker options
"n_workers": n_slurm_jobs,
"cores": n_cores_per_node,
"memory": mem_per_node,
# SLURM options
"shebang": "#!/bin/bash",
"account": "AccountName",
"walltime": "00:10:00",
"job_mem": "0",
"job_script_prologue": [
"source ~/.bashrc",
"conda activate MyEnv",
],
"job_directives_skip": ["-n", "--cpus-per-task"],
"job_extra_directives": [f"-N {n_nodes_per_calc}", "-q debug", "-C cpu"],
"python": "python",
}
runner = make_dask_runner(cluster_kwargs, temporary=True)
@flow(task_runner=runner)
def workflow(*args, **kwargs):
    ...
```

When the workflow is run from the login node, it will be submitted to the job scheduling system (Slurm by default), and the results will be sent back to Prefect Cloud once completed. Refer to the Dask-Jobqueue documentation for the available `cluster_kwargs` options.

To asynchronously spawn a Slurm job that continually pulls in work for the duration of its walltime (rather than starting and terminating over the lifetime of the associated `Flow`), use:

```python
runner = make_dask_runner(cluster_kwargs)
```

Additionally, you can have the generated Dask cluster adaptively scale based on the amount of work available by setting `adapt_kwargs`:

```python
runner = make_dask_runner(cluster_kwargs, adapt_kwargs={"minimum": 1, "maximum": 5})
```

This will ensure that at least one Slurm job is always running, but the number of jobs will scale up to 5 if there is enough work available.
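For completeness, here is a minimal end-to-end sketch of how the adaptive runner might be used (the `cluster_kwargs` values and the `square` task are illustrative placeholders, not part of the snippet above):

```python
from prefect import flow, task

# illustrative resource request; real values depend on the machine, account, and queue
cluster_kwargs = {
    "cores": 48,
    "memory": "64 GB",
    "account": "AccountName",
    "walltime": "00:30:00",
}

# keep at least one Slurm job alive, scale out to four when work piles up
runner = make_dask_runner(cluster_kwargs, adapt_kwargs={"minimum": 1, "maximum": 4})


@task
def square(x: float) -> float:
    return x * x


@flow(task_runner=runner)
def adaptive_workflow(values: list[float]) -> list[float]:
    futures = square.map(values)
    return [future.result() for future in futures]


if __name__ == "__main__":
    print(adaptive_workflow([1.0, 2.0, 3.0]))
```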
@Andrew-S-Rosen thank you -- I'll dust this off and condense it into a tutorial for our docs.
Here is a worked example for those curious. It is meant to work on the Perlmutter machine at NERSC, but the insights are largely machine-agnostic.

Preliminary Steps

On the login node:

```
pip install prefect[dask] dask-jobqueue
prefect cloud login
```

Basic Example

Prefect has a Dask and Ray backend, but only Dask (via `prefect-dask`) is used here. We start by defining the `SLURMCluster` parameters and checking the job script it will generate:

```python
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
cluster_kwargs = {
"cores": 128,
"memory": "64 GB",
"shebang": "#!/bin/bash",
"account": "MyAccountName",
"walltime": "00:10:00",
"job_mem": "0",
"job_script_prologue": ["source ~/.bashrc"],
"job_directives_skip": ["-n", "--cpus-per-task"],
"job_extra_directives": ["-q debug", "-C cpu"],
}
cluster = SLURMCluster(**cluster_kwargs)
print(cluster.job_script())
```

Now we define how many Slurm jobs we want with those specs and instantiate the Dask cluster. This will immediately submit a job to the queue even though we don't have any compute tasks to run just yet.

```python
slurm_jobs = 1
cluster.scale(jobs=slurm_jobs)
client = Client(cluster)
```

Now we'll define our Prefect workflow (the `flow`) and its `task`s, using a `DaskTaskRunner` pointed at the scheduler address of the running cluster:

```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner
@flow(task_runner=DaskTaskRunner(address=client.scheduler.address))
def workflow(a: float, b: float) -> float:
output1 = add.submit(a, b)
output2 = mult.submit(output1, b)
return output2
@task
def add(a: float, b: float) -> float:
return a + b
@task
def mult(a: float, b: float) -> float:
    return a * b
```

Now we instantiate and execute the workflow. The progress can be traced in Prefect Cloud.

```python
output = workflow(1, 2)
print(output.result())
```

Since the Dask cluster remains alive until the walltime is reached or it is killed, we can run another workflow if we want.
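For instance (a small sketch reusing the names defined above; the second set of inputs is arbitrary):

```python
# the Slurm job and its Dask workers are still up, so a second flow run
# simply reconnects to the same scheduler
output2 = workflow(3, 4)
print(output2.result())
```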
Temporary Dask Cluster

Some users may prefer to spin up a Dask cluster (i.e. a Slurm job) for each individual `flow` run instead. In that case, the cluster class and its keyword arguments can be handed directly to the `DaskTaskRunner`:

```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner
@flow(task_runner=DaskTaskRunner(cluster_class=SLURMCluster, cluster_kwargs=cluster_kwargs))
def workflow(a: float, b: float) -> float:
output1 = add.submit(a, b)
output2 = mult.submit(output1, b)
return output2
@task
def add(a: float, b: float) -> float:
return a + b
@task
def mult(a: float, b: float) -> float:
return a * b
workflow(1, 2).result()
```

At this point, since the Dask cluster was created just for this `flow`, the associated Slurm job is torn down once the run finishes.

Limitations

Prefect Cloud is Fine at NERSC But Not Everywhere

Need a network connection from the compute node to the Prefect server.

Killed Slurm Jobs Aren't Reflected in the UI

If you kill the underlying Slurm job, the corresponding run is not updated in the Prefect UI.

Pilot Job Behavior is Limited

The main limitation of Prefect in this kind of setup is that it inherits the limitations of the underlying Dask cluster.
Some details:
Concurrent …
Hello, I'm trying to use this example to get tasks to execute on the actual compute node and am running into difficulties. Here is what I have so far:

```python
#!/usr/bin/env python3
"""
Submits work to SLURM via dask_jobqueue.
In this example, we use Dask in conjunction with Prefect to submit a job to SLURM,
and retrieve information about the SLURM node and job specifications. No compute-heavy
work is done here.
"""
import os
import socket
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
from prefect import flow, task
from prefect.logging import get_run_logger
from prefect_dask import DaskTaskRunner
CLUSTER_KWARGS = {
"account": "computing.computing",
"walltime": "00:10:00",
"memory": "8Gb",
"cores": 1,
"n_workers": 1,
}
CLUSTER = SLURMCluster(**CLUSTER_KWARGS)
SLURM_JOBS = 1
CLUSTER.scale(jobs=SLURM_JOBS)
CLIENT = Client(CLUSTER)
@task
def show_job_script():
logger = get_run_logger()
logger.info(CLUSTER.job_script())
@task
def show_slurm_info():
logger = get_run_logger()
logger.info("Finding slurm environment variables...")
for var_name, var_value in os.environ.items():
if var_name.startswith("SLURM"):
logger.info(f"{var_name}={var_value}")
@task
def show_hostname():
logger = get_run_logger()
hostname = socket.getfqdn()
logger.info(f"Running on {hostname}")
@flow(task_runner=DaskTaskRunner(address=CLIENT.scheduler.address))
def display_slurm_info():
show_job_script()
show_slurm_info()
show_hostname()
if __name__ == "__main__":
    display_slurm_info()
```

Here is the corresponding output:

```
pgierz in 🌐 albedo0 in prefect-examples on main [?] via 🐍 v3.10.6 (python-3.10.6) took 43s
❯ python prefect-slurm-info.py
09:13:57.780 | INFO | prefect.engine - Created flow run 'muscular-kingfisher' for flow 'display-slurm-info'
09:13:57.783 | INFO | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/fe8cd180-eb51-4c4a-93d5-c815df87e28b
09:13:57.834 | INFO | prefect.task_runner.dask - Connecting to existing Dask cluster SLURMCluster(d4d0253f, 'tcp://10.100.1.1:38383', workers=0, threads=0, memory=0 B)
09:13:57.877 | INFO | Task run 'show_job_script-355' - Created task run 'show_job_script-355' for task 'show_job_script'
09:13:57.898 | INFO | Task run 'show_job_script-355' - #!/usr/bin/env bash
#SBATCH -J dask-worker
#SBATCH -A computing.computing
#SBATCH -n 1
#SBATCH --cpus-per-task=1
#SBATCH --mem=8G
#SBATCH -t 00:10:00
/albedo/home/pgierz/Code/gitlab.awi.de/hpc/tutorials/prefect-examples/.direnv/python-3.10.6/bin/python -m distributed.cli.dask_worker tcp://10.100.1.1:38383 --name dummy-name --nthreads 1 --memory-limit 7.45GiB --nanny --death-timeout 60
09:13:57.901 | INFO | Task run 'show_job_script-355' - Finished in state Completed()
09:13:57.918 | INFO | Task run 'show_slurm_info-73d' - Created task run 'show_slurm_info-73d' for task 'show_slurm_info'
09:13:57.922 | INFO | Task run 'show_slurm_info-73d' - Finding slurm environment variables...
09:13:57.923 | INFO | Task run 'show_slurm_info-73d' - SLURM_ACCOUNT=computing.computing
09:13:57.927 | INFO | Task run 'show_slurm_info-73d' - Finished in state Completed()
09:13:57.944 | INFO | Task run 'show_hostname-cb1' - Created task run 'show_hostname-cb1' for task 'show_hostname'
09:13:57.950 | INFO | Task run 'show_hostname-cb1' - Running on albedo0
09:13:57.952 | INFO | Task run 'show_hostname-cb1' - Finished in state Completed()
09:13:57.998 | INFO | Flow run 'muscular-kingfisher' - Finished in state Completed()
```

From the output, it is evident that I am still getting information from a login node (these are the `albedo` login nodes on our system; note the `Running on albedo0` line) rather than from a compute node. What am I doing wrong?
Hi @pgierz, does `squeue` show your job as running? Did Slurm throw an error (most likely in a log somewhere)? Tbh I don't really know how Dask interacts with Slurm behind the scenes -- at NERSC we actually have to make sure Dask is configured correctly (e.g. https://gitlab.com/NERSC/nersc-notebooks/-/tree/main/perlmutter/dask) in order to interact with the network correctly. Is there a reason you're using the Dask Slurm cluster? At NERSC we're working on a way to interact with Slurm directly, which might side-step issues like these.
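One quick sanity check (a rough sketch, assuming the `CLIENT` object from the script above; `where_are_the_workers` is a made-up helper) is to ask the Dask workers themselves where they are running before launching the flow -- if nothing comes back, no worker ever joined the scheduler:

```python
import socket

from dask.distributed import Client


def where_are_the_workers(client: Client) -> None:
    # runs socket.getfqdn() on every connected worker and prints the result
    # keyed by worker address; an empty dict means no workers have joined yet
    print(client.run(socket.getfqdn) or "No workers connected yet")
    # the scheduler's own view of its workers (addresses, threads, memory)
    print(list(client.scheduler_info()["workers"]))
```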
Hi @JBlaschke, I could see the job running, and the logs don't show anything interesting, just our standard SLURM prolog output.
I would be very interested in something like this, and am also more than happy to contribute if that is something you would need more manpower for.
If anyone is interested in looking through what we are doing, the examples are here: https://gitlab.awi.de/hpc/tutorials/prefect-examples
Hi @JBlaschke, any updates on how NERSC is using Prefect directly with SLURM? I am about to start on yet another project that would benefit from such an integration, so I would be curious about any progress.
Describe the issue
There are currently no examples of how to use Prefect with a job scheduling system (e.g. SLURM, PBS, MOAB) on an HPC machine. I think this is a pretty important omission, because most academics and users of the top supercomputers might not be aware of how they can use Prefect.
Describe the proposed change
Add a tutorial.
Additional context
There's mention of spinning up a Dask cluster but no representative example of how this is done in practice with a given job scheduling system.
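Roughly the kind of minimal example such a tutorial could show (a sketch assuming `prefect-dask` and `dask-jobqueue`; the task, flow, and resource values are placeholders):

```python
from dask_jobqueue import SLURMCluster
from prefect import flow, task
from prefect_dask import DaskTaskRunner

# placeholder Slurm resource request
cluster_kwargs = {
    "cores": 4,
    "memory": "8 GB",
    "account": "MyAccount",
    "walltime": "00:10:00",
}


@task
def double(x: int) -> int:
    return 2 * x


# one Slurm job is submitted per flow run and torn down afterwards
@flow(task_runner=DaskTaskRunner(cluster_class=SLURMCluster, cluster_kwargs=cluster_kwargs))
def my_flow(x: int) -> int:
    return double.submit(x).result()
```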