Skip to content

Commit

Permalink
Merge pull request #4 from mberacochea/feature/fix-squeue-filters
Browse files Browse the repository at this point in the history
Slurm -squeue use --format instead of --json
  • Loading branch information
mberacochea authored Apr 15, 2024
2 parents f4ccdb0 + e843de4 commit e609a4c
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 34 deletions.
1 change: 1 addition & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ version: '3'

tasks:
build:
desc: Build and scp into the cluster
cmds:
- pyinstaller mjobs/main.py --onefile --clean --name mjobs
- scp dist/mjobs codon:~/
101 changes: 67 additions & 34 deletions mjobs/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import getpass
import json
import re
import sys
from collections import namedtuple
from datetime import datetime
from subprocess import CalledProcessError, check_output
from typing import Optional

Expand All @@ -32,16 +32,14 @@ class Slurm(Base):
def __init__(self, console: Console, error_console: Console):
super().__init__(console, error_console)

def status_style(self, job_entry) -> Text:
def status_style(self, job_state) -> Text:
colours = {
"RUNNING": "bold green",
"PENDING": "dark_orange",
"COMPLETED": "honeydew2",
"FAILED": "red",
}
return Text(
job_entry["job_state"], style=colours.get(job_entry["job_state"], "grey93")
)
return Text(job_state, style=colours.get(job_state, "grey93"))

def get_args(self):
JOB_STATES = [
Expand Down Expand Up @@ -135,25 +133,63 @@ def get_jobs(
self, job_ids: Optional[list[int]] = None, args: Optional[list[str]] = None
):
"""Call squeue to obtain the jobs, it uses the json output."""
squeue_args = ["squeue", "--json"]
# Each entry pairs a squeue --format specifier with the field name used for
# the matching SlurmJob namedtuple attribute. The order of the entries is the
# column order of every output row, so it must stay in sync with the parser.
# NOTE(review): rows are split on whitespace when parsed, so a value that
# contains spaces would yield extra columns — confirm this cannot happen.
# fmt: off
squeue_fields = [
["%.18i", "job_id"], # Job id.
["%.50j", "job_name"], # Job name.
["%l", "time_limit"], # Time limit for the job (it can be NOT_SET), format: days-hours:minutes:seconds.
["%m", "memory"], # Minimum size of memory (in MB) requested by the job.
["%.10P", "partition"], # Partition of the job or job step.
["%T", "job_state"], # Job state in extended form.
["%u", "user_name"], # User name for a job or job step.
["%.20o", "command"], # The command to be executed.
["%.20r", "state_reason"], # The reason for the job status.
["%S", "start_time"], # Job start time; parsed later as a YYYY-MM-DDTHH:MM:SS timestamp.
["%V", "submit_time"], # Job submission time; parsed later as a YYYY-MM-DDTHH:MM:SS timestamp.
["%L", "end_time"], # Time left, not an end timestamp (it can be NOT_SET), format: days-hours:minutes:seconds; shown in the "Time rem." column.
["%.N", "nodes"], # List of nodes allocated to the job or job step.
["%.100Z", "workdir"], # Working directory.
]
# fmt: on

SlurmJob = namedtuple("SlurmJob", " ".join(x[1] for x in squeue_fields))

len_flags = len(squeue_fields)

jobs = []

squeue = [
"squeue",
"-h",
"--format",
f"\"{' '.join(x[0] for x in squeue_fields)}\"",
]

if args:
squeue_args.extend(list(map(str, args)))
squeue.extend(list(map(str, args)))
if job_ids:
squeue_args.extend(["--jobs", ",".join(list(map(str, job_ids)))])
squeue.extend([",".join(list(map(str, job_ids)))])
try:
squeue_output = check_output(squeue_args, universal_newlines=True)
return json.loads(squeue_output).get("jobs", [])
squeue_output = check_output(squeue, universal_newlines=True).split("\n")
for line in squeue_output:
values = line.strip('"').split()
if len(values) < len_flags:
continue
jobs.append(SlurmJob(*values))

return jobs

except CalledProcessError as ex:
self.error_console.log(
f"squeue call failed. Arguments: {' '.join(squeue_args)}. Error {ex.output}"
f"squeue call failed. Arguments: {' '.join(squeue)}. Error {ex.output}"
)
raise ex

def convert_unix_timestamp(self, timestamp: Optional[int]) -> str:
def parse_timestamp_str(self, timestamp: Optional[int]) -> str:
if timestamp is None:
return f"Invalid timestamp. {timestamp}"
try:
dt_object = datetime.datetime.utcfromtimestamp(timestamp)
dt_object = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S")
return str(dt_object)
except (TypeError, ValueError):
return f"Invalid timestamp. {timestamp}"
Expand Down Expand Up @@ -199,8 +235,8 @@ def main(self):
filter_regex = re.compile(self.args.filter)
jobs = list(
filter(
lambda j: filter_regex.search(j["name"])
or filter_regex.search(j["command"]),
lambda j: filter_regex.search(j.job_name)
or filter_regex.search(j.command),
jobs,
)
)
Expand All @@ -223,41 +259,38 @@ def main(self):
{"header": "Partition"},
{"header": "Submit Time"},
{"header": "Start Time"},
{"header": "End Time"},
# {"header": "Pending reason"}, // TODO: implement, I don't know how this looks like in the json
{"header": "Time rem."},
{"header": "Status reason"},
]
if self.args.extended:
cols.append({"header": "WorkDir"})
cols.append({"header": "Nodes"})
cols.append({"header": "StdOut"})
cols.append({"header": "StdErr"})

rows = []

for job in sorted(jobs, key=lambda j: j["job_id"]):
job_name = Text(job["name"])
for job in sorted(jobs, key=lambda j: j.job_id):
job_name = Text(job.job_name)
if self.args.filter:
job_name.highlight_regex(rf"{self.args.filter}", "bold red")

row = [
str(job["job_id"]),
self.status_style(job),
job.job_id,
self.status_style(job.job_state),
job_name,
job["user_name"],
job["partition"],
self.convert_unix_timestamp(job["submit_time"]),
self.convert_unix_timestamp(job["start_time"]),
self.convert_unix_timestamp(job["end_time"]),
# pending_reason,
job.user_name,
job.partition,
self.parse_timestamp_str(job.submit_time),
self.parse_timestamp_str(job.start_time),
job.end_time,
job.state_reason,
]
if self.args.extended:
row.extend(
[
Text(job["nodes"], overflow="fold"),
job["standard_error"],
job["standard_output"],
job.workdir,
Text(job.nodes, overflow="fold"),
]
)

rows.append(row)

self.render(title=title, columns=cols, rows=rows)

0 comments on commit e609a4c

Please sign in to comment.