diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..17930a4 --- /dev/null +++ b/.flake8 @@ -0,0 +1,3 @@ +[flake8] +max-line-length = 88 +extend-ignore = E203,E501 \ No newline at end of file diff --git a/.github/codeql/config.yml b/.github/codeql/config.yml new file mode 100644 index 0000000..963b0dd --- /dev/null +++ b/.github/codeql/config.yml @@ -0,0 +1,12 @@ +name: "StreamFlow LSF CodeQL configuration" +queries: + - uses: security-and-quality +paths-ignore: + - tests +query-filters: + # Reason: this rule targets XSS, which is not a concern here + - exclude: + id: py/jinja2/autoescape-false + # Reason: false positive on function body ellipsis (issue 11351) + - exclude: + id: py/ineffectual-statement diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..1532070 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,10 @@ +version: 2 +updates: + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "daily" + - package-ecosystem: "pip" + directory: "/" + schedule: + interval: "daily" \ No newline at end of file diff --git a/.github/workflows/ci-tests.yaml b/.github/workflows/ci-tests.yaml new file mode 100644 index 0000000..ea17d54 --- /dev/null +++ b/.github/workflows/ci-tests.yaml @@ -0,0 +1,48 @@ +name: "CI Tests" +on: + push: + branches: + - master + pull_request: + branches: + - master +concurrency: + group: build-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true +jobs: + code-ql-check: + name: "CodeQL check" + runs-on: ubuntu-22.04 + permissions: + security-events: write + steps: + - uses: actions/checkout@v4 + - uses: github/codeql-action/init@v3 + with: + config-file: .github/codeql/config.yml + languages: python + - uses: github/codeql-action/analyze@v3 + static-checks: + name: "Static checks" + runs-on: ubuntu-22.04 + strategy: + matrix: + step: [ "bandit", "lint" ] + env: + TOXENV: ${{ matrix.step }} + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: pip + cache-dependency-path: | + requirements.txt + test-requirements.txt + tox.ini + - name: "Install Python Dependencies and plugin" + run: | + python -m pip install tox --user + python -m pip install . 
--user + - name: "Run static analysis via Tox" + run: tox \ No newline at end of file diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..f7d3a4a --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,60 @@ +name: "Release new version" +on: + workflow_run: + workflows: + - "CI Tests" + branches: + - master + types: + - completed +jobs: + github: + name: "Create GitHub Release" + runs-on: ubuntu-22.04 + permissions: + contents: write + if: ${{ github.event.workflow_run.conclusion == 'success' }} + steps: + - uses: actions/checkout@v4 + - name: "Get version" + run: echo "PLUGIN_VERSION=$(cat streamflow_lsf/version.py | grep -oP '(?<=VERSION = \")(.*)(?=\")')" >> $GITHUB_ENV + - name: "Check tag existence" + uses: mukunku/tag-exists-action@v1.6.0 + id: check-tag + with: + tag: ${{ env.PLUGIN_VERSION }} + - name: "Create Release" + id: create-release + uses: actions/create-release@v1 + if: ${{ steps.check-tag.outputs.exists == 'false' }} + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + tag_name: ${{ env.PLUGIN_VERSION }} + release_name: ${{ env.PLUGIN_VERSION }} + draft: false + prerelease: false + pypi: + name: "Publish on PyPI" + runs-on: ubuntu-22.04 + if: ${{ github.event.workflow_run.conclusion == 'success' }} + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.10" + - name: "Get local version" + run: echo "PLUGIN_VERSION=$(cat streamflow_lsf/version.py | grep -oP '(?<=VERSION = \")(.*)(?=\")')" >> $GITHUB_ENV + - name: "Get PyPI version" + run: echo "PYPI_VERSION=$(pip index versions --pre streamflow-lsf | grep streamflow-lsf | sed 's/.*(\(.*\))/\1/')" >> $GITHUB_ENV + - name: "Build Python packages" + if: ${{ env.PLUGIN_VERSION != env.PYPI_VERSION }} + run: | + python -m pip install build --user + python -m build --sdist --wheel --outdir dist/ . + - name: "Publish package to PyPI" + uses: pypa/gh-action-pypi-publish@release/v1 + if: ${{ env.PLUGIN_VERSION != env.PYPI_VERSION }} + with: + user: __token__ + password: ${{ secrets.PYPI_TOKEN }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..880f52a --- /dev/null +++ b/.gitignore @@ -0,0 +1,133 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. 
+# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# IDE +.idea + +# Mac OS X +.DS_Store + +# StreamFlow +.streamflow diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..0a04128 --- /dev/null +++ b/LICENSE @@ -0,0 +1,165 @@ + GNU LESSER GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + + This version of the GNU Lesser General Public License incorporates +the terms and conditions of version 3 of the GNU General Public +License, supplemented by the additional permissions listed below. + + 0. Additional Definitions. + + As used herein, "this License" refers to version 3 of the GNU Lesser +General Public License, and the "GNU GPL" refers to version 3 of the GNU +General Public License. + + "The Library" refers to a covered work governed by this License, +other than an Application or a Combined Work as defined below. + + An "Application" is any work that makes use of an interface provided +by the Library, but which is not otherwise based on the Library. +Defining a subclass of a class defined by the Library is deemed a mode +of using an interface provided by the Library. + + A "Combined Work" is a work produced by combining or linking an +Application with the Library. The particular version of the Library +with which the Combined Work was made is also called the "Linked +Version". + + The "Minimal Corresponding Source" for a Combined Work means the +Corresponding Source for the Combined Work, excluding any source code +for portions of the Combined Work that, considered in isolation, are +based on the Application, and not on the Linked Version. + + The "Corresponding Application Code" for a Combined Work means the +object code and/or source code for the Application, including any data +and utility programs needed for reproducing the Combined Work from the +Application, but excluding the System Libraries of the Combined Work. + + 1. Exception to Section 3 of the GNU GPL. + + You may convey a covered work under sections 3 and 4 of this License +without being bound by section 3 of the GNU GPL. + + 2. Conveying Modified Versions. + + If you modify a copy of the Library, and, in your modifications, a +facility refers to a function or data to be supplied by an Application +that uses the facility (other than as an argument passed when the +facility is invoked), then you may convey a copy of the modified +version: + + a) under this License, provided that you make a good faith effort to + ensure that, in the event an Application does not supply the + function or data, the facility still operates, and performs + whatever part of its purpose remains meaningful, or + + b) under the GNU GPL, with none of the additional permissions of + this License applicable to that copy. + + 3. Object Code Incorporating Material from Library Header Files. 
+ + The object code form of an Application may incorporate material from +a header file that is part of the Library. You may convey such object +code under terms of your choice, provided that, if the incorporated +material is not limited to numerical parameters, data structure +layouts and accessors, or small macros, inline functions and templates +(ten or fewer lines in length), you do both of the following: + + a) Give prominent notice with each copy of the object code that the + Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the object code with a copy of the GNU GPL and this license + document. + + 4. Combined Works. + + You may convey a Combined Work under terms of your choice that, +taken together, effectively do not restrict modification of the +portions of the Library contained in the Combined Work and reverse +engineering for debugging such modifications, if you also do each of +the following: + + a) Give prominent notice with each copy of the Combined Work that + the Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the Combined Work with a copy of the GNU GPL and this license + document. + + c) For a Combined Work that displays copyright notices during + execution, include the copyright notice for the Library among + these notices, as well as a reference directing the user to the + copies of the GNU GPL and this license document. + + d) Do one of the following: + + 0) Convey the Minimal Corresponding Source under the terms of this + License, and the Corresponding Application Code in a form + suitable for, and under terms that permit, the user to + recombine or relink the Application with a modified version of + the Linked Version to produce a modified Combined Work, in the + manner specified by section 6 of the GNU GPL for conveying + Corresponding Source. + + 1) Use a suitable shared library mechanism for linking with the + Library. A suitable mechanism is one that (a) uses at run time + a copy of the Library already present on the user's computer + system, and (b) will operate properly with a modified version + of the Library that is interface-compatible with the Linked + Version. + + e) Provide Installation Information, but only if you would otherwise + be required to provide such information under section 6 of the + GNU GPL, and only to the extent that such information is + necessary to install and execute a modified version of the + Combined Work produced by recombining or relinking the + Application with a modified version of the Linked Version. (If + you use option 4d0, the Installation Information must accompany + the Minimal Corresponding Source and Corresponding Application + Code. If you use option 4d1, you must provide the Installation + Information in the manner specified by section 6 of the GNU GPL + for conveying Corresponding Source.) + + 5. Combined Libraries. + + You may place library facilities that are a work based on the +Library side by side in a single library together with other library +facilities that are not Applications and are not covered by this +License, and convey such a combined library under terms of your +choice, if you do both of the following: + + a) Accompany the combined library with a copy of the same work based + on the Library, uncombined with any other library facilities, + conveyed under the terms of this License. 
+ + b) Give prominent notice with the combined library that part of it + is a work based on the Library, and explaining where to find the + accompanying uncombined form of the same work. + + 6. Revised Versions of the GNU Lesser General Public License. + + The Free Software Foundation may publish revised and/or new versions +of the GNU Lesser General Public License from time to time. Such new +versions will be similar in spirit to the present version, but may +differ in detail to address new problems or concerns. + + Each version is given a distinguishing version number. If the +Library as you received it specifies that a certain numbered version +of the GNU Lesser General Public License "or any later version" +applies to it, you have the option of following the terms and +conditions either of that published version or of any later version +published by the Free Software Foundation. If the Library as you +received it does not specify a version number of the GNU Lesser +General Public License, you may choose any version of the GNU Lesser +General Public License ever published by the Free Software Foundation. + + If the Library as you received it specifies that a proxy can decide +whether future versions of the GNU Lesser General Public License shall +apply, that proxy's public statement of acceptance of any version is +permanent authorization for you to choose that version for the +Library. diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..d0c2369 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,6 @@ +include LICENSE +include README.md +include requirements.txt +include bandit-requirements.txt +include lint-requirements.txt +include test-requirements.txt \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d6ca487 --- /dev/null +++ b/Makefile @@ -0,0 +1,29 @@ +codespell: + codespell -w $(shell git ls-files) + +codespell-check: + codespell $(shell git ls-files) + +coverage.xml: testcov + coverage xml + +coverage-report: testcov + coverage report + +flake8: + flake8 streamflow_lsf tests + +format: + black streamflow_lsf tests + +format-check: + black --diff --check streamflow_lsf tests + +pyupgrade: + pyupgrade --py3-only --py38-plus $(shell git ls-files | grep .py) + +test: + python -m pytest -rs ${PYTEST_EXTRA} + +testcov: + python -m pytest -rs --cov --cov-report= ${PYTEST_EXTRA} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e9ca53d --- /dev/null +++ b/README.md @@ -0,0 +1,30 @@ +# LSF Plugin for StreamFlow + +## Installation +Simply install the package directly from [PyPI](https://pypi.org/project/streamflow-lsf/) using [pip](https://pip.pypa.io/en/stable/). StreamFlow will automatically recognise it as a plugin and load it at each workflow execution. +```bash +pip install streamflow-lsf +``` + +If everything worked correctly, whenever a workflow execution starts, the following message should be printed in the log: +```bash +Successfully registered plugin streamflow_lsf.plugin.LSFStreamFlowPlugin +``` + +## Usage +This plugin registers a new `Connector` component, called `LSFConnector`, which extends the StreamFlow `ConnectorWrapper` class. This implies that the `LSFConnector` can wrap an underlying `Connector` object through the `wraps` directive. The example below shows a possible `streamflow.yml` configuration file, where the `LSFConnector` wraps an `SSHConnector` for remote execution offloading.
+```yaml +deployments: + ssh-deployment: + type: ssh + config: + nodes: + - 10.0.0.1 + - 10.0.0.2 + sshKey: /path/to/ssh/key + username: + lsf-deployment: + type: unito.lsf + config: {} + wraps: ssh-deployment +``` \ No newline at end of file diff --git a/bandit-requirements.txt b/bandit-requirements.txt new file mode 100644 index 0000000..af37a2d --- /dev/null +++ b/bandit-requirements.txt @@ -0,0 +1 @@ +bandit==1.7.5 \ No newline at end of file diff --git a/lint-requirements.txt b/lint-requirements.txt new file mode 100644 index 0000000..9574456 --- /dev/null +++ b/lint-requirements.txt @@ -0,0 +1,4 @@ +black==23.7.0 +codespell==2.2.5 +flake8-bugbear==23.7.10 +pyupgrade==3.10.1 \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..0db940d --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,54 @@ +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "streamflow-lsf" +authors = [ + {name = "Iacopo Colonnelli", email = "iacopo.colonnelli@unito.it"}, + {name = "Alberto Mulone", email = "alberto.mulone@unito.it"} +] +description = "StreamFlow LSF plugin" +readme = "README.md" +requires-python = ">=3.8" +license = {text = "LGPL-3.0-or-later"} +classifiers = [ + "Development Status :: 3 - Alpha", + "License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)", + "Intended Audience :: Science/Research", + "Operating System :: POSIX", + "Operating System :: MacOS", + "Programming Language :: Python", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Scientific/Engineering", + "Topic :: System :: Distributed Computing", +] +dynamic = ["dependencies", "version"] + +[project.urls] +Repository = "https://github.com/alpha-unito/streamflow-lsf" + +[project.entry-points] +"unito.streamflow.plugin" = {"unito.lsf" = "streamflow_lsf.plugin:LSFStreamFlowPlugin"} + +[tool.setuptools] +packages = [ + "streamflow_lsf" +] +zip-safe = true +[tool.setuptools.package-data] +"streamflow_lsf" = ["schemas/*.json"] + +[tool.setuptools.dynamic] +dependencies = {file = "requirements.txt"} +version = {attr = "streamflow_lsf.version.VERSION"} + +[tool.setuptools.dynamic.optional-dependencies] +bandit = {file = "bandit-requirements.txt"} +lint = {file = "lint-requirements.txt"} +test = {file = "test-requirements.txt"} diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..446e70e --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +streamflow @ git+https://github.com/alpha-unito/streamflow@refs/pull/390/head diff --git a/streamflow_lsf/__init__.py b/streamflow_lsf/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/streamflow_lsf/connector.py b/streamflow_lsf/connector.py new file mode 100644 index 0000000..4653984 --- /dev/null +++ b/streamflow_lsf/connector.py @@ -0,0 +1,390 @@ +from __future__ import annotations + +import asyncio.subprocess +import base64 +import json +import logging +import os +import re +import shlex +from typing import Any, MutableMapping, MutableSequence, cast + +from importlib_resources import files +from streamflow.core import utils +from streamflow.core.deployment import ExecutionLocation +from streamflow.core.exception import WorkflowExecutionException +from streamflow.core.utils import get_option
+from streamflow.deployment.connector.queue_manager import ( + QueueManagerConnector, + QueueManagerService, +) +from streamflow.log_handler import logger + + +def get_lsf_option( + name: str, + value: Any, +): + # LSF expects a single leading dash even for multi-character options (e.g., -app), + # so strip one of the two dashes that get_option prepends to long names. + option = get_option(name, value) + return option[1:] if len(name) > 1 else option + + +# Extract the requested fields from the single job record of a `bjobs -json` output. +def _get_result(job_id, stdout, field_names): + if ((json_start := stdout.find("{")) != -1) and ( + (json_end := stdout.rfind("}")) != -1 + ): + try: + output = json.loads(stdout[json_start : (json_end + 1)]) + if len(output["RECORDS"]) != 1: + raise WorkflowExecutionException( + f"Error while retrieving job {job_id} record: {stdout.strip()}" + ) + return [output["RECORDS"][0][field_name] for field_name in field_names] + except json.decoder.JSONDecodeError: + raise WorkflowExecutionException( + f"Error while decoding JSON output for job {job_id}: {stdout.strip()}" + ) + else: + raise WorkflowExecutionException( + f"Error while retrieving job {job_id} record: {stdout.strip()}" + ) + + +class LSFService(QueueManagerService): + def __init__( + self, + file: str | None = None, + applicationProfile: str | None = None, + autoResizable: bool | None = None, + checkpoint: str | None = None, + clusters: str | None = None, + coreLimit: int | None = None, + cpuTimeLimit: str | None = None, + data: str | None = None, + dataLimit: int | None = None, + datagrp: str | None = None, + dynamicInputFile: str | None = None, + eligiblePendingTimeLimit: str | None = None, + enableSpool: bool | None = None, + env: str | None = None, + errorFileAppend: str | None = None, + errorFileOverwrite: str | None = None, + estimatedRunningTime: int | None = None, + exclusive: bool | None = None, + externalSchedulerOptions: str | None = None, + fileSizeLimit: int | None = None, + freq: int | None = None, + hostfile: str | None = None, + inputFile: str | None = None, + jobDescription: str | None = None, + jobGroup: str | None = None, + jobName: str | None = None, + jsdlFile: str | None = None, + jsdlStrict: str | None = None, + licenseProject: str | None = None, + localFile: str | None = None, + locationRequired: str | None = None, + mailUser: str | None = None, + memAndSwapLimit: bool | None = None, + memoryLimit: int | None = None, + migrationThreshold: int | None = None, + noRerunnable: bool | None = None, + numOfTasks: str | None = None, + pendingTimeLimit: str | None = None, + postExecCommand: str | None = None, + preExecCommand: str | None = None, + priority: int | None = None, + processLimit: int | None = None, + projectName: str | None = None, + queueName: str | None = None, + requeueExitValue: str | None = None, + rerunOnHostFailure: bool | None = None, + reservation: str | None = None, + resizeNotificationCmd: str | None = None, + resourceRequirements: str | None = None, + runLimit: int | None = None, + sendMail: bool | None = None, + serviceClassName: str | None = None, + signal: str | None = None, + stackSizeLimit: int | None = None, + startTime: str | None = None, + terminationDeadline: str | None = None, + threadLimit: int | None = None, + userGroup: str | None = None, + virtualMemLimit: int | None = None, + warningAction: str | None = None, + warningTimeAction: str | None = None, + ): + super().__init__(file) + self.application_profile: str | None = applicationProfile + self.auto_resizable: bool | None = autoResizable + self.checkpoint: str | None = checkpoint + self.clusters: str | None = clusters + self.core_limit: int | None = coreLimit + self.cpu_time_limit: str | None = cpuTimeLimit + self.data: str | None = data + self.data_limit: int | None = dataLimit + self.datagrp: str | None = datagrp + self.dynamic_input_file: str | None = dynamicInputFile + self.eligible_pending_time_limit: str | None = eligiblePendingTimeLimit + self.enable_spool: bool | None = enableSpool + self.env: str | None = env + self.error_file_append: str | None = errorFileAppend + self.error_file_overwrite: str | None = errorFileOverwrite + self.estimated_running_time: int | None = estimatedRunningTime + self.exclusive: bool | None = exclusive + self.external_scheduler_options: str | None = externalSchedulerOptions + self.file_size_limit: int | None = fileSizeLimit + self.freq: int | None = freq + self.hostfile: str | None = hostfile + self.location_required: str | None = locationRequired + self.input_file: str | None = inputFile + self.job_description: str | None = jobDescription + self.job_group: str | None = jobGroup + self.job_name: str | None = jobName + self.jsdl_file: str | None = jsdlFile + self.jsdl_strict: str | None = jsdlStrict + self.license_project: str | None = licenseProject + self.local_file: str | None = localFile + self.mail_user: str | None = mailUser + self.mem_and_swap_limit: bool | None = memAndSwapLimit + self.memory_limit: int | None = memoryLimit + self.migration_threshold: int | None = migrationThreshold + self.no_rerunnable: bool | None = noRerunnable + self.num_of_tasks: str | None = numOfTasks + self.pending_time_limit: str | None = pendingTimeLimit + self.post_exec_command: str | None = postExecCommand + self.pre_exec_command: str | None = preExecCommand + self.priority: int | None = priority + self.process_limit: int | None = processLimit + self.project_name: str | None = projectName + self.queue_name: str | None = queueName + self.requeue_exit_value: str | None = requeueExitValue + self.rerun_on_host_failure: bool | None = rerunOnHostFailure + self.reservation: str | None = reservation + self.resize_notification_cmd: str | None = resizeNotificationCmd + self.resource_requirements: str | None = resourceRequirements + self.run_limit: int | None = runLimit + self.send_mail: bool | None = sendMail + self.service_class_name: str | None = serviceClassName + self.signal: str | None = signal + self.stack_size_limit: int | None = stackSizeLimit + self.start_time: str | None = startTime + self.termination_deadline: str | None = terminationDeadline + self.thread_limit: int | None = threadLimit + self.user_group: str | None = userGroup + self.virtual_mem_limit: int | None = virtualMemLimit + self.warning_action: str | None = warningAction + self.warning_time_action: str | None = warningTimeAction + + +class LSFConnector(QueueManagerConnector): + async def _get_output(self, job_id: str, location: ExecutionLocation) -> str: + # Ask bjobs for the job's working directory and output file, then read that file. + command = ["bjobs", "-a", "-json", "-o", "'EXEC_CWD OUTPUT_FILE'", job_id] + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Running command `{' '.join(command)}`") + stdout, _ = await self.connector.run( + location=location, command=command, capture_output=True + ) + results = _get_result(job_id, stdout, ["EXEC_CWD", "OUTPUT_FILE"]) + if output_path := str(os.path.join(*results)): + stdout, _ = await super().run( + location=location, command=["cat", output_path], capture_output=True + ) + return stdout.strip() + else: + return "" + + async def _get_returncode(self, job_id: str, location: ExecutionLocation) -> int: + command = ["bjobs", "-a", "-json", "-o", "EXIT_CODE", job_id] + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Running command `{' '.join(command)}`")
+ stdout, _ = await super().run( + location=location, + command=command, + capture_output=True, + ) + result = _get_result(job_id, stdout, ["EXIT_CODE"])[0] + try: + return int(result) if result else 0 + except ValueError: + raise WorkflowExecutionException( + f"Error while retrieving return code for job {job_id}: {result}" + ) + + async def _get_running_jobs( + self, location: ExecutionLocation + ) -> MutableSequence[str]: + # bjobs -noheader -o jobid [JOBIDS] # missing a state filter parameter + command = [ + "bjobs", + "-a", + "-json", + "-o", + "'JOBID STAT'", + " ".join(self._scheduled_jobs.keys()), + ] + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Running command `{' '.join(command)}`") + stdout, _ = await super().run( + location=location, + command=command, + capture_output=True, + ) + if ((json_start := stdout.find("{")) != -1) and ( + (json_end := stdout.rfind("}")) != -1 + ): + try: + output = json.loads(stdout[json_start : (json_end + 1)]) + except json.decoder.JSONDecodeError: + raise WorkflowExecutionException( + f"Error parsing running jobs: {stdout.strip()}" + ) + return [ + record["JOBID"] + for record in output["RECORDS"] + if record["STAT"] in ("PEND", "RUN") + ] + else: + raise WorkflowExecutionException( + f"Error retrieving running jobs: {stdout.strip()}" + ) + + @property + def _service_class(self) -> type[QueueManagerService]: + return LSFService + + async def _remove_jobs( + self, location: ExecutionLocation, jobs: MutableSequence[str] + ) -> None: + await super().run( + location=location, + command=["bkill", " ".join(jobs)], + ) + + async def _run_batch_command( + self, + command: str, + environment: MutableMapping[str, str] | None, + job_name: str, + location: ExecutionLocation, + workdir: str | None = None, + stdin: int | str | None = None, + stdout: int | str = asyncio.subprocess.STDOUT, + stderr: int | str = asyncio.subprocess.STDOUT, + timeout: int | None = None, + ) -> str: + # Ship the command as base64 so it survives remote shell quoting: echo the + # encoded payload, decode it on the execution host, and pipe it into bsub. + batch_command = [ + "echo", + base64.b64encode(command.encode("utf-8")).decode("utf-8"), + "|", + "base64", + "-d", + "|", + ] + if environment: + batch_command.extend([f"{k}={v}" for k, v in environment.items()]) + batch_command.append("bsub") + if stdin is not None and stdin != asyncio.subprocess.DEVNULL: + batch_command.append(get_lsf_option("i", shlex.quote(stdin))) + if stderr != asyncio.subprocess.STDOUT and stderr != stdout: + batch_command.append(get_lsf_option("e", self._format_stream(stderr))) + if stdout != asyncio.subprocess.STDOUT: + batch_command.append(get_lsf_option("o", self._format_stream(stdout))) + if timeout: + batch_command.append( + get_lsf_option("W", utils.format_seconds_to_hhmmss(timeout)) + ) + if service := cast(LSFService, self.services.get(location.service)): + batch_command.append(get_lsf_option("app", service.application_profile)) + batch_command.append(get_lsf_option("ar", service.auto_resizable)) + batch_command.append(get_lsf_option("k", service.checkpoint)) + batch_command.append(get_lsf_option("clusters", service.clusters)) + batch_command.append(get_lsf_option("C", service.core_limit)) + batch_command.append(get_lsf_option("c", service.cpu_time_limit)) + batch_command.append(get_lsf_option("data", service.data)) + batch_command.append(get_lsf_option("D", service.data_limit)) + batch_command.append(get_lsf_option("datagrp", service.datagrp)) + batch_command.append(get_lsf_option("is", service.dynamic_input_file)) + batch_command.append( + get_lsf_option("eptl", service.eligible_pending_time_limit) + ) + batch_command.append(get_lsf_option("Zs", service.enable_spool)) + batch_command.append(get_lsf_option("env", service.env)) + batch_command.append(get_lsf_option("e", service.error_file_append)) + batch_command.append(get_lsf_option("eo", service.error_file_overwrite)) + batch_command.append(get_lsf_option("We", service.estimated_running_time)) + batch_command.append(get_lsf_option("x", service.exclusive)) + batch_command.append( + get_lsf_option("ext", service.external_scheduler_options) + ) + batch_command.append(get_lsf_option("F", service.file_size_limit)) + batch_command.append(get_lsf_option("freq", service.freq)) + batch_command.append(get_lsf_option("hostfile", service.hostfile)) + batch_command.append(get_lsf_option("m", service.location_required)) + batch_command.append(get_lsf_option("i", service.input_file)) + batch_command.append(get_lsf_option("Jd", service.job_description)) + batch_command.append(get_lsf_option("g", service.job_group)) + batch_command.append(get_lsf_option("J", service.job_name)) + batch_command.append(get_lsf_option("jsdl", service.jsdl_file)) + batch_command.append(get_lsf_option("jsdl_strict", service.jsdl_strict)) + batch_command.append(get_lsf_option("Lp", service.license_project)) + batch_command.append(get_lsf_option("f", service.local_file)) + batch_command.append(get_lsf_option("u", service.mail_user)) + batch_command.append(get_lsf_option("hl", service.mem_and_swap_limit)) + batch_command.append(get_lsf_option("M", service.memory_limit)) + batch_command.append(get_lsf_option("mig", service.migration_threshold)) + batch_command.append(get_lsf_option("rn", service.no_rerunnable)) + batch_command.append(get_lsf_option("n", service.num_of_tasks)) + batch_command.append(get_lsf_option("ptl", service.pending_time_limit)) + batch_command.append(get_lsf_option("Ep", service.post_exec_command)) + batch_command.append(get_lsf_option("E", service.pre_exec_command)) + batch_command.append(get_lsf_option("sp", service.priority)) + batch_command.append(get_lsf_option("p", service.process_limit)) + batch_command.append(get_lsf_option("P", service.project_name)) + batch_command.append(get_lsf_option("q", service.queue_name)) + batch_command.append(get_lsf_option("Q", service.requeue_exit_value)) + batch_command.append(get_lsf_option("r", service.rerun_on_host_failure)) + batch_command.append(get_lsf_option("U", service.reservation)) + batch_command.append(get_lsf_option("rnc", service.resize_notification_cmd)) + batch_command.append(get_lsf_option("R", service.resource_requirements)) + batch_command.append(get_lsf_option("B", service.send_mail)) + batch_command.append(get_lsf_option("sla", service.service_class_name)) + batch_command.append(get_lsf_option("s", service.signal)) + batch_command.append(get_lsf_option("S", service.stack_size_limit)) + batch_command.append(get_lsf_option("b", service.start_time)) + batch_command.append(get_lsf_option("t", service.termination_deadline)) + batch_command.append(get_lsf_option("T", service.thread_limit)) + batch_command.append(get_lsf_option("G", service.user_group)) + batch_command.append(get_lsf_option("v", service.virtual_mem_limit)) + batch_command.append(get_lsf_option("wa", service.warning_action)) + batch_command.append(get_lsf_option("wt", service.warning_time_action)) + if not timeout: + batch_command.append(get_lsf_option("W", service.run_limit)) + batch_command.extend( + [ + get_lsf_option("cwd", workdir), + get_lsf_option("outdir", workdir), + get_lsf_option("N", True), + ] + ) + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Running command {' '.join(batch_command)}")
+ stdout, returncode = await super().run( + location=location, command=batch_command, capture_output=True + ) + if returncode == 0: + # Guard against a successful return code with unexpected output, which + # would otherwise surface as an opaque TypeError on the regex match. + if match := re.search(r"Job <(.*)> is submitted", stdout.strip()): + return match[1] + else: + raise WorkflowExecutionException( + f"Error retrieving job id from LSF submission output: {stdout.strip()}" + ) + else: + raise WorkflowExecutionException( + f"Error submitting job {job_name} to LSF: {stdout.strip()}" + ) + + @classmethod + def get_schema(cls) -> str: + return ( + files(__package__) + .joinpath("schemas") + .joinpath("lsf.json") + .read_text("utf-8") + ) diff --git a/streamflow_lsf/plugin.py b/streamflow_lsf/plugin.py new file mode 100644 index 0000000..8206aa9 --- /dev/null +++ b/streamflow_lsf/plugin.py @@ -0,0 +1,8 @@ +from streamflow.ext.plugin import StreamFlowPlugin + +from streamflow_lsf.connector import LSFConnector + + +class LSFStreamFlowPlugin(StreamFlowPlugin): + def register(self) -> None: + self.register_connector("unito.lsf", LSFConnector) diff --git a/streamflow_lsf/schemas/lsf.json b/streamflow_lsf/schemas/lsf.json new file mode 100644 index 0000000..2338e27 --- /dev/null +++ b/streamflow_lsf/schemas/lsf.json @@ -0,0 +1,275 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "https://streamflow.di.unito.it/schemas/plugins/lsf/lsf.json", + "type": "object", + "$defs": { + "service": { + "type": "object", + "title": "LSFService", + "description": "This complex type represents a submission to the LSF queue manager.", + "properties": { + "file": { + "type": "string", + "description": "Path to a file containing a Jinja2 template, describing how the StreamFlow command should be executed in the remote environment" + }, + "applicationProfile": { + "type": "string", + "description": "Submits the job to the specified application profile." + }, + "autoResizable": { + "type": "boolean", + "description": "Specifies that the job is autoresizable." + }, + "checkpoint": { + "type": "string", + "description": "Makes a job checkpointable and specifies the checkpoint directory." + }, + "clusters": { + "type": "string", + "description": "LSF multicluster capability only. Specifies cluster names when submitting jobs." + }, + "coreLimit": { + "type": "integer", + "description": "Sets a per-process (soft) core file size limit for all the processes that belong to this job." + }, + "cpuTimeLimit": { + "type": "string", + "description": "Limits the total CPU time the job can use. Format: [hour:]minute[/host_name | /host_model]" + }, + "data": { + "type": "string", + "description": "Specifies data requirements for a job." + }, + "dataLimit": { + "type": "integer", + "description": "Sets a per-process (soft) data segment size limit for each of the processes that belong to the job." + }, + "datagrp": { + "type": "string", + "description": "Specifies user group for job data requirements." + }, + "dynamicInputFile": { + "type": "string", + "description": "Gets the standard input for the job from the specified file path, but allows you to modify or remove the input file before the job completes." + }, + "eligiblePendingTimeLimit": { + "type": "string", + "description": "Specifies the eligible pending time limit for the job. Format: [hour:]minute" + }, + "enableSpool": { + "type": "boolean", + "description": "Spools a job command file to the directory specified by the JOB_SPOOL_DIR parameter in lsb.params, and uses the spooled file as the command file for the job." + }, + "env": { + "type": "string", + "description": "Controls the propagation of the specified job submission environment variables to the execution hosts."
+ }, + "errorFileAppend": { + "type": "string", + "description": "Appends the standard error output of the job to the specified file path." + }, + "errorFileOverwrite": { + "type": "string", + "description": "Overwrites the standard error output of the job to the specified file path." + }, + "estimatedRunningTime": { + "type": "integer", + "description": "Specifies an estimated run time for the job." + }, + "exclusive": { + "type": "boolean", + "description": "Puts the host running your job into exclusive execution mode." + }, + "externalSchedulerOptions": { + "type": "string", + "description": "Specifies application-specific external scheduling options for the job." + }, + "fileSizeLimit": { + "type": "integer", + "description": "Sets a per-process (soft) file size limit for each of the processes that belong to the job." + }, + "freq": { + "type": "integer", + "description": "Specifies a CPU frequency for a job." + }, + "hostfile": { + "type": "string", + "description": "Submits a job with a user-specified host file." + }, + "inputFile": { + "type": "string", + "description": "Gets the standard input for the job from the specified file path." + }, + "jobDescription": { + "type": "string", + "description": "Assigns the specified description to the job; for job arrays, specifies the same job description for all elements in the job array." + }, + "jobGroup": { + "type": "string", + "description": "Submits jobs in the specified job group." + }, + "jobName": { + "type": "string", + "description": "Assigns the specified name to the job, and, for job arrays, specifies the indices of the job array and optionally the maximum number of jobs that can run at any given time." + }, + "jsdlFile": { + "type": "string", + "description": "Submits a job using a JSDL file that uses the LSF extension to specify job submission options." + }, + "jsdlStrict": { + "type": "string", + "description": "Submits a job using a JSDL file that only uses the standard JSDL elements and POSIX extensions to specify job submission options." + }, + "licenseProject": { + "type": "string", + "description": "Assigns the job to the specified LSF License Scheduler project." + }, + "localFile": { + "type": "string", + "description": "Copies a file between the local (submission) host and the remote (execution) host." + }, + "locationRequired": { + "type": "string", + "description": "Submits a job to be run on specific hosts, host groups, or compute units." + }, + "mailUser": { + "type": "string", + "description": "Sends mail to the specified email destination." + }, + "memAndSwapLimit": { + "type": "boolean", + "description": "Enables job-level host-based memory and swap limit enforcement on systems that support Linux cgroups." + }, + "memoryLimit": { + "type": "integer", + "description": "Sets a memory limit for all the processes that belong to the job." + }, + "migrationThreshold": { + "type": "integer", + "description": "Specifies the migration threshold for checkpointable or re-runnable jobs, in minutes." + }, + "noRerunnable": { + "type": "boolean", + "description": "Specifies that the job is never re-runnable." + }, + "numOfTasks": { + "type": "string", + "description": "Submits a parallel job and specifies the number of tasks in the job." + }, + "pendingTimeLimit": { + "type": "string", + "description": "Specifies the pending time limit for the job. Format: [hour:]minute"
+ }, + "postExecCommand": { + "type": "string", + "description": "Runs the specified job-based post-execution command on the execution host after the job finishes." + }, + "preExecCommand": { + "type": "string", + "description": "Runs the specified job-based pre-execution command on the execution host before actually running the job." + }, + "priority": { + "type": "integer", + "description": "Specifies user-assigned job priority that orders jobs in a queue." + }, + "processLimit": { + "type": "integer", + "description": "Sets the limit of the number of processes to the specified value for the whole job." + }, + "projectName": { + "type": "string", + "description": "Assigns the job to the specified project." + }, + "queueName": { + "type": "string", + "description": "Submits the job to one of the specified queues." + }, + "requeueExitValue": { + "type": "string", + "description": "Specifies automatic job re-queue exit values." + }, + "rerunOnHostFailure": { + "type": "boolean", + "description": "Reruns a job if the execution host or the system fails; it does not rerun a job if the job itself fails." + }, + "reservation": { + "type": "string", + "description": "If an advance reservation has been created with the brsvadd command, the job makes use of the reservation." + }, + "resizeNotificationCmd": { + "type": "string", + "description": "Specifies the full path of an executable to be invoked on the first execution host when the job allocation has been modified (both shrink and grow)." + }, + "resourceRequirements": { + "type": "string", + "description": "Runs the job on a host that meets the specified resource requirements." + }, + "runLimit": { + "type": "integer", + "description": "Sets a limit on the total run time of the job allocation. If a `timeout` value is defined directly in the workflow specification, it will override this value." + }, + "sendMail": { + "type": "boolean", + "description": "Sends mail to you when the job is dispatched and begins execution." + }, + "serviceClassName": { + "type": "string", + "description": "Specifies the service class where the job is to run." + }, + "signal": { + "type": "string", + "description": "Sends the specified signal when a queue-level run window closes." + }, + "stackSizeLimit": { + "type": "integer", + "description": "Sets a per-process (soft) stack segment size limit for each of the processes that belong to the job." + }, + "startTime": { + "type": "string", + "description": "Dispatches the job for execution on or after the specified date and time. Format: [[year:][month:]day:]hour:minute" + }, + "terminationDeadline": { + "type": "string", + "description": "Specifies the job termination deadline. Format: [[[year:]month:]day:]hour:minute" + }, + "threadLimit": { + "type": "integer", + "description": "Sets the limit of the number of concurrent threads to the specified value for the whole job." + }, + "userGroup": { + "type": "string", + "description": "For fair share scheduling. Associates the job with the specified group or excludes the job from the specified groups." + }, + "virtualMemLimit": { + "type": "integer", + "description": "Sets the total process virtual memory limit to the specified value for the whole job." + }, + "warningAction": { + "type": "string", + "description": "Specifies the job action to be taken before a job control action occurs."
+ }, + "warningTimeAction": { + "type": "string", + "description": "Specifies the amount of time before a job control action occurs that a job warning action is to be taken. Format: [hour:]minute" + } + } + } + }, + "allOf": [ + { + "$ref": "https://streamflow.di.unito.it/schemas/deployment/connector/queue_manager.json" + } + ], + "properties": { + "services": { + "type": "object", + "description": "Map containing named configurations of LSF submissions. Parameters can be specified either as #BSUB directives in a file or directly in YAML format.", + "patternProperties": { + "^[a-z][a-zA-Z0-9._-]*$": { + "$ref": "#/$defs/service" + } + } + } + }, + "unevaluatedProperties": false +} \ No newline at end of file diff --git a/streamflow_lsf/version.py b/streamflow_lsf/version.py new file mode 100644 index 0000000..901e511 --- /dev/null +++ b/streamflow_lsf/version.py @@ -0,0 +1 @@ +VERSION = "0.0.1" diff --git a/test-requirements.txt b/test-requirements.txt new file mode 100644 index 0000000..d149d28 --- /dev/null +++ b/test-requirements.txt @@ -0,0 +1,4 @@ +pytest==8.0.1 +pytest-asyncio==0.21.1 +pytest-cov==4.1.0 +pytest-xdist==3.5.0 diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..547fa6d --- /dev/null +++ b/tox.ini @@ -0,0 +1,42 @@ +[tox] +envlist = + bandit + lint + py3.{8,9,10,11,12}-unit +skip_missing_interpreters = True + +[pytest] +asyncio_mode = auto +testpaths = tests + +[testenv] +allowlist_externals = make +commands_pre = + py3.{8,9,10,11,12}-unit: python -m pip install -U pip setuptools wheel +commands = + py3.{8,9,10,11,12}-unit: make coverage-report coverage.xml PYTEST_EXTRA={posargs} +deps = + py3.{8,9,10,11,12}-unit: -rrequirements.txt + py3.{8,9,10,11,12}-unit: -rtest-requirements.txt +description = + py3.{8,9,10,11,12}-unit: Run the unit tests +setenv = + py3.{8,9,10,11,12}-unit: LC_ALL = C.UTF-8 + +[testenv:bandit] +commands = bandit -r streamflow_lsf +deps = + -rrequirements.txt + -rbandit-requirements.txt +description = Search for common security issues +passenv = + CI + GITHUB_* + +[testenv:lint] +allowlist_externals = make +commands = make flake8 format-check codespell-check pyupgrade +deps = + -rrequirements.txt + -rlint-requirements.txt +description = Lint the Python code