Update fake data generation #215

Draft: wants to merge 7 commits into base: master
8 changes: 7 additions & 1 deletion clockwork_frontend_test/test_dashboard.py
@@ -20,14 +20,20 @@
DASHBOARD_TABLE_CONTENT = []
for job in fake_data["jobs"]:
if job["cw"]["mila_email_username"] == "[email protected]":
# This element can be either a list of states or a plain string.
# So far, every list we have encountered contained a single element.
job_states = job["slurm"]["job_state"]

DASHBOARD_TABLE_CONTENT.append(
[
job["slurm"]["cluster_name"],
int(
job["slurm"]["job_id"]
), # job ID is currently handled as a numeric value
job["slurm"]["name"],
job["slurm"]["job_state"].lower(),
job_states[0].lower()
if isinstance(job_states, list)
else job_states.lower(),
get_default_display_date(job["slurm"]["submit_time"]),
get_default_display_date(job["slurm"]["start_time"]),
get_default_display_date(job["slurm"]["end_time"]),
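The `job_state` handling above can be read as a small helper. A minimal sketch, assuming only that the field is either a string or a list of strings as the comment describes (the helper name is ours, not part of the PR):

```python
def normalize_job_state(job_state):
    """Return a job state as a lowercase string.

    Slurm may report job_state either as a plain string ("RUNNING")
    or as a list of states (["RUNNING"]); so far every observed list
    held a single element, so we take the first one.
    """
    if isinstance(job_state, list):
        return job_state[0].lower()
    return job_state.lower()

assert normalize_job_state("RUNNING") == "running"
assert normalize_job_state(["COMPLETED"]) == "completed"
```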
24 changes: 16 additions & 8 deletions clockwork_web_test/test_browser_jobs.py
@@ -205,7 +205,11 @@ def test_jobs_with_both_pagination_options(
nbr_items_per_page The number of jobs we want to display per page
"""
# Define the user we want to use for this test
current_user_id = "[email protected]" # Can access all clusters
# An admin can access all clusters
current_user_id = None
for user in fake_data["users"]:
if user.get("admin_access"):
current_user_id = user["mila_email_username"]
assert current_user_id

# Log in to Clockwork
login_response = client.get(f"/login/testing?user_id={current_user_id}")
@@ -215,10 +219,10 @@
sorted_all_jobs = sorted(
fake_data["jobs"],
key=lambda d: (
d["slurm"]["submit_time"],
-int(d["slurm"]["job_id"]),
-d["slurm"]["submit_time"],
d["slurm"]["job_id"],
), # Default sorting: most recent submit_time first, then ascending job_id
reverse=True,
reverse=False,
)

# Get the response
@@ -313,7 +317,11 @@ def test_jobs_with_nbr_items_per_page_pagination_option(
nbr_items_per_page The number of jobs we want to display per page
"""
# Define the user we want to use for this test
current_user_id = "[email protected]" # Can access all clusters
# An admin can access all clusters
current_user_id = None
for user in fake_data["users"]:
if user.get("admin_access"):
current_user_id = user["mila_email_username"]
assert current_user_id

# Log in to Clockwork
login_response = client.get(f"/login/testing?user_id={current_user_id}")
@@ -323,10 +331,10 @@
sorted_all_jobs = sorted(
fake_data["jobs"],
key=lambda d: (
d["slurm"]["submit_time"],
-int(d["slurm"]["job_id"]),
-d["slurm"]["submit_time"],
d["slurm"]["job_id"],
), # Default sorting: most recent submit_time first, then ascending job_id
reverse=True,
reverse=False,
)

# Get the response
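Both hunks above repeat two idioms: picking an admin user from the fake data, and sorting jobs with the new default key. A self-contained sketch on made-up sample data (the emails and job values are ours):

```python
fake_data = {
    "users": [
        {"mila_email_username": "[email protected]", "admin_access": False},
        {"mila_email_username": "[email protected]", "admin_access": True},
    ],
    "jobs": [
        {"slurm": {"submit_time": 100, "job_id": "2"}},
        {"slurm": {"submit_time": 300, "job_id": "1"}},
        {"slurm": {"submit_time": 300, "job_id": "3"}},
    ],
}

# Pick an admin user; next() with a default of None keeps the later
# assert meaningful when no admin exists in the data.
current_user_id = next(
    (u["mila_email_username"] for u in fake_data["users"] if u.get("admin_access")),
    None,
)
assert current_user_id == "[email protected]"

# New default ordering: most recent submit_time first, then ascending
# job_id among ties. Negating submit_time lets reverse stay False.
sorted_jobs = sorted(
    fake_data["jobs"],
    key=lambda d: (-d["slurm"]["submit_time"], d["slurm"]["job_id"]),
)
assert [j["slurm"]["job_id"] for j in sorted_jobs] == ["1", "3", "2"]
```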
2 changes: 1 addition & 1 deletion clockwork_web_test/test_rest_nodes.py
@@ -8,7 +8,7 @@
import pytest


@pytest.mark.parametrize("cluster_name", ("mila", "beluga", "cedar", "graham"))
@pytest.mark.parametrize("cluster_name", ("mila", "beluga", "cedar"))
def test_single_node_at_random(
client, fake_data, valid_rest_auth_headers, cluster_name
):
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -90,6 +90,7 @@ services:
- ./clockwork_tools_test:/clockwork/clockwork_tools_test
- ./clockwork_web:/clockwork/clockwork_web
- ./clockwork_web_test:/clockwork/clockwork_web_test
- ./slurm_report:/clockwork/tmp/slurm_report
- ./slurm_state:/clockwork/slurm_state
- ./slurm_state_test:/clockwork/slurm_state_test
- ./scripts:/clockwork/scripts
7 changes: 6 additions & 1 deletion scripts/concat_json_lists.py
@@ -28,7 +28,10 @@ def main(argv):
# regardless of how the jobs and nodes are distributed across clusters,
# we divide the number we want to keep by the number of input
# sources, and keep that many elements from each source.
kept_elements_nb = round(args.keep / len(args.inputs)) + 1
if args.keep is not None:
kept_elements_nb = round(args.keep / len(args.inputs)) + 1
else:
kept_elements_nb = None
# Nota bene: as this number must be an integer, we may not get an exact
# fraction of the requested total. Thus, we round the result and add
# one: the surplus is then withdrawn from the
@@ -44,6 +47,8 @@
# Get a part of the data
np.random.shuffle(E)
E = E[0:kept_elements_nb]
if kept_elements_nb is not None:
E = E[0:kept_elements_nb]
L.extend(E)

if args.keep is not None:
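Condensed, the selection logic of `concat_json_lists.py` after this change looks as follows; a sketch with the file reading stripped out (the function name and sample data are ours):

```python
import numpy as np

def concat_keep(input_lists, keep=None):
    """Keep roughly keep/len(input_lists) random elements per source.

    With keep=None, every element from every source is kept. Otherwise
    each source contributes round(keep / n_sources) + 1 elements, and
    the surplus is trimmed from the final concatenated list.
    """
    if keep is not None:
        kept_elements_nb = round(keep / len(input_lists)) + 1
    else:
        kept_elements_nb = None

    L = []
    for E in input_lists:
        # Shuffle so that the kept prefix is a random sample
        np.random.shuffle(E)
        if kept_elements_nb is not None:
            E = E[0:kept_elements_nb]
        L.extend(E)

    if keep is not None:
        L = L[0:keep]
    return L

# Three sources of 10 elements, keep=9: each source contributes
# round(9 / 3) + 1 = 4 elements, and the result is trimmed to 9.
assert len(concat_keep([list(range(10)) for _ in range(3)], keep=9)) == 9
# keep=None: everything is carried through.
assert len(concat_keep([list(range(10)) for _ in range(3)])) == 30
```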
12 changes: 8 additions & 4 deletions scripts/produce_fake_data.sh
@@ -53,13 +53,17 @@ for SUBFOLDER in ${CLOCKWORK_ROOT}/tmp/slurm_report/*; do
# "job anonymized" to "jobs anonymized dump file"
# "node anonymized" to "nodes anonymized dump file"
ANONYMIZED_JOBS_FILE=${CLOCKWORK_ROOT}/tmp/slurm_report/${CLUSTER_NAME}/job_anonymized_dump_file.json
: <<COMMENT
# Node parsing is not up to date anymore, so we cheat by using an old anonymized file.
ANONYMIZED_NODES_FILE=${CLOCKWORK_ROOT}/tmp/slurm_report/${CLUSTER_NAME}/node_anonymized_dump_file.json
COMMENT
ANONYMIZED_NODES_FILE=${CLOCKWORK_ROOT}/tmp/slurm_report/${CLUSTER_NAME}/hardcoded_values/nodes.json
python3 -m slurm_state.read_report_commit_to_db \
--cluster_name ${CLUSTER_NAME} \
--from_existing_jobs_file \
--from_existing_slurm_jobs_file \
--slurm_jobs_file ${CLOCKWORK_ROOT}/tmp/slurm_report/${CLUSTER_NAME}/sacct_anonymized \
--cw_jobs_file $ANONYMIZED_JOBS_FILE \
--from_existing_nodes_file \
--from_existing_cw_nodes_file \
--slurm_nodes_file ${CLOCKWORK_ROOT}/tmp/slurm_report/${CLUSTER_NAME}/sinfo_anonymized \
--cw_nodes_file $ANONYMIZED_NODES_FILE
ANONYMIZED_JOBS_FILES+=(${ANONYMIZED_JOBS_FILE})
@@ -68,11 +72,11 @@ for SUBFOLDER in ${CLOCKWORK_ROOT}/tmp/slurm_report/*; do
fi
done

python3 concat_json_lists.py --keep 100 \
python3 concat_json_lists.py \
--inputs ${ANONYMIZED_JOBS_FILES[@]} \
--output ${CLOCKWORK_ROOT}/tmp/slurm_report/subset_100_jobs_anonymized.json

python3 concat_json_lists.py --keep 100 \
python3 concat_json_lists.py \
--inputs ${ANONYMIZED_NODES_FILES[@]} \
--output ${CLOCKWORK_ROOT}/tmp/slurm_report/subset_100_nodes_anonymized.json

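With `--keep 100` removed, the two concatenation calls now carry every anonymized job and node through to the output files. A hedged Python equivalent of the jobs call (the cluster list and CLOCKWORK_ROOT value are assumptions; the flags are the ones shown above):

```python
import subprocess

CLOCKWORK_ROOT = "/clockwork"  # assumption: adjust to your checkout
report_dir = f"{CLOCKWORK_ROOT}/tmp/slurm_report"

# Example with two clusters; the shell script accumulates this list
# in its loop over the tmp/slurm_report/* subfolders.
anonymized_jobs_files = [
    f"{report_dir}/mila/job_anonymized_dump_file.json",
    f"{report_dir}/beluga/job_anonymized_dump_file.json",
]

# No --keep argument, so concat_json_lists.py keeps all elements.
subprocess.run(
    ["python3", "concat_json_lists.py",
     "--inputs", *anonymized_jobs_files,
     "--output", f"{report_dir}/subset_100_jobs_anonymized.json"],
    check=True,
)
```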
1 change: 1 addition & 0 deletions slurm_report/hardcoded_values/nodes.json

Large diffs are not rendered by default.

43 changes: 30 additions & 13 deletions slurm_state/mongo_update.py
@@ -12,6 +12,7 @@
# Import parser classes
from slurm_state.parsers.job_parser import JobParser
from slurm_state.parsers.node_parser import NodeParser
from slurm_state.parsers.entity_parser import IdentityParser


def pprint_bulk_result(result):
@@ -118,7 +119,7 @@ def main_read_report_and_update_collection(
users_collection,
cluster_name,
report_file_path,
from_file=False,
from_file=None,
want_commit_to_db=True,
dump_file="",
):
@@ -133,8 +134,8 @@
cluster_name Name of the cluster we are working on
report_file_path Path to the report from which the jobs or nodes information is extracted. This report is generated through
the command sacct for the jobs, or sinfo for the nodes. If None, a new report is generated.
from_file Boolean indicating whether or not the jobs or nodes are extracted from a Slurm file. If True, the input file
is report_file_path. If False, the file is generated at the report_file_path path.
from_file One of "slurm", "cw" or None, indicating whether the jobs or nodes are read from a Slurm report file, from a CW file (i.e. a JSON file containing a list of entities already formatted as they are used in Clockwork), or from no file. If "slurm" or "cw", the input file
is report_file_path. If None, a new report is generated at the report_file_path path.
want_commit_to_db Boolean indicating whether or not the jobs or nodes are stored in the database. Default is True
dump_file String containing the path to the file in which we want to dump the data. Default is "", which means nothing is stored in an output file
"""
@@ -153,8 +154,20 @@
parser_version = None
if from_file:
with open(report_file_path, "r") as infile:
version = json.load(infile)["meta"]["Slurm"]["version"]
parser_version = f"{version['major']}.{version['micro']}.{version['minor']}"
infile_data = json.load(infile)

if from_file == "slurm":
if "Slurm" in infile_data["meta"]:
version = infile_data["meta"]["Slurm"]["version"]
elif "slurm" in infile_data["meta"]:
version = infile_data["meta"]["slurm"]["version"]
else:
raise Exception(
f'"Slurm" or "slurm" not found in data["meta"] for file {report_file_path}'
)
parser_version = (
f"{version['major']}.{version['micro']}.{version['minor']}"
)

# Check the input parameters
assert entity in ["jobs", "nodes"]
Expand All @@ -164,17 +177,21 @@ def main_read_report_and_update_collection(
"job_id" # The id_key is used to determine how to retrieve the ID of a job
)
parser = JobParser(
cluster_name, slurm_version=parser_version
cluster_name=cluster_name, slurm_version=parser_version
) # This parser is used to retrieve and format useful information from a sacct job
from_slurm_to_clockwork = slurm_job_to_clockwork_job # This function is used to translate a Slurm job (created through the parser) to a Clockwork job

elif entity == "nodes":
id_key = (
"name" # The id_key is used to determine how to retrieve the ID of a node
)
parser = NodeParser(
cluster_name, slurm_version=parser_version
) # This parser is used to retrieve and format useful information from a sacct node
from_slurm_to_clockwork = slurm_node_to_clockwork_node # This function is used to translate a Slurm node (created through the parser) to a Clockwork node
if from_file in ["slurm", None]:
id_key = "name" # The id_key is used to determine how to retrieve the ID of a node
parser = NodeParser(
cluster_name=cluster_name, slurm_version=parser_version
) # This parser is used to retrieve and format useful information from a sinfo node
from_slurm_to_clockwork = slurm_node_to_clockwork_node # This function is used to translate a Slurm node (created through the parser) to a Clockwork node
elif from_file == "cw":
parser = IdentityParser(entity=entity, cluster_name=cluster_name)
from_slurm_to_clockwork = lambda x: x

else:
# Raise an error because it should not happen
raise ValueError(
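The new `from_file` handling hinges on the version lookup tolerating both spellings of the meta key. A standalone sketch of that lookup, assuming only the report layout shown above (the function name is ours):

```python
import json

def read_parser_version(report_file_path):
    """Extract the Slurm version string from a raw Slurm report.

    Different Slurm releases capitalize the meta key differently, so
    both "Slurm" and "slurm" are accepted; any other layout is an error.
    """
    with open(report_file_path, "r") as infile:
        meta = json.load(infile)["meta"]

    if "Slurm" in meta:
        version = meta["Slurm"]["version"]
    elif "slurm" in meta:
        version = meta["slurm"]["version"]
    else:
        raise Exception(
            f'"Slurm" or "slurm" not found in data["meta"] for file {report_file_path}'
        )
    # Same field order as the code above: major, micro, minor.
    return f"{version['major']}.{version['micro']}.{version['minor']}"
```

This lookup only runs in the `from_file == "slurm"` branch; a CW file already contains Clockwork-formatted entities and carries no Slurm version.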
109 changes: 109 additions & 0 deletions slurm_state/parsers/entity_parser.py
@@ -0,0 +1,109 @@
# Imports to retrieve the values related to Slurm command
from slurm_state.helpers.ssh_helper import launch_slurm_command, open_connection
from slurm_state.helpers.clusters_helper import get_all_clusters

# Common imports
import json, os, re


class EntityParser:
"""
A parser for Slurm entities
"""

def __init__(self, entity, cluster_name, slurm_command=None, slurm_version=None):
self.entity = entity
assert entity in ["jobs", "nodes"]

self.cluster = get_all_clusters()[cluster_name]
self.cluster["name"] = cluster_name

self.slurm_command = slurm_command
# Retrieve the path to the Slurm command we want to launch on the cluster
# It is stored in the cluster data under the key "sacct_path" for the sacct command
# and "sinfo_path" for the sinfo command
self.slurm_command_path = self.cluster[f"{self.slurm_command}_path"]
# Check if slurm_command_path exists
assert (
self.slurm_command_path
), f"Error. We have called the function to make updates with {self.slurm_command} but the {self.slurm_command}_path config is empty."
assert self.slurm_command_path.endswith(
self.slurm_command
), f"Error. The {self.slurm_command}_path configuration needs to end with '{self.slurm_command}'. It is currently {self.slurm_command_path} ."

if slurm_version is not None:
self.slurm_version = slurm_version
else:
# If no Slurm version is provided, retrieve the version of Slurm installed on the current cluster
self.slurm_version = self.get_slurm_version()

def get_slurm_version(self):
"""
Get the Slurm version
"""
if (
"slurm_version" in self.cluster
and self.cluster["slurm_version"] is not None
):
# If the Slurm version has been added to the configuration file,
# return the value of the configuration
return self.cluster["slurm_version"]
else:
# Launch the sacct or sinfo command to get its version
remote_command = f"{self.slurm_command_path} -V"
response = self.launch_slurm_command(remote_command)
assert len(response) == 1
version_regex = re.compile(r"^slurm (\d+\.\d+\.\d+)$")
if m := version_regex.match(response[0]):
return m.group(1)
# If the version has not been identified, raise an error
raise Exception(
f'The version "{response[0]}" has not been recognized as a Slurm version.'
)

def launch_slurm_command(self, remote_command):
""" """
return launch_slurm_command(
remote_command,
self.cluster["remote_hostname"],
self.cluster["remote_user"],
self.cluster["ssh_key_filename"],
self.cluster["ssh_port"],
)

def generate_report(self, remote_command, file_name):
"""
Launch a Slurm command in order to retrieve a JSON report containing
jobs or nodes information.

Parameters:
remote_command The command used to retrieve the data from Slurm
file_name The path of the report file to write
"""
# Launch the requested command in order to retrieve Slurm information
stdout = self.launch_slurm_command(remote_command)

# Create directories if needed
os.makedirs(os.path.dirname(file_name), exist_ok=True)

# Write the command output to a file
with open(file_name, "w") as outfile:
for line in stdout:
outfile.write(line)


class IdentityParser(EntityParser):
def __init__(self, entity, cluster_name):
self.entity = entity

self.cluster = get_all_clusters()[cluster_name]
self.cluster["name"] = cluster_name

def parser(self, f):
# Load the JSON file containing entities already in Clockwork format
# (at this point, entities is a list of dictionaries)
entities = json.load(f)
for entity in entities:
if entity["slurm"]["cluster_name"] == self.cluster["name"]:
yield entity
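A usage sketch for the new `IdentityParser`: since a CW dump file already contains Clockwork-formatted entities, parsing reduces to filtering by cluster name. The sample file content is ours; the real class also resolves the cluster from the Clockwork configuration:

```python
import io, json

# Stand-in for a CW dump file holding entities from two clusters.
cw_dump = io.StringIO(json.dumps([
    {"slurm": {"cluster_name": "mila", "job_id": "1"}},
    {"slurm": {"cluster_name": "beluga", "job_id": "2"}},
]))

# The filtering that IdentityParser.parser performs, inlined:
cluster_name = "mila"
entities = [
    e for e in json.load(cw_dump)
    if e["slurm"]["cluster_name"] == cluster_name
]
assert [e["slurm"]["job_id"] for e in entities] == ["1"]
```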
6 changes: 3 additions & 3 deletions slurm_state/parsers/job_parser.py
@@ -13,17 +13,17 @@
zero_to_null,
)

from slurm_state.parsers.slurm_parser import SlurmParser
from slurm_state.parsers.entity_parser import EntityParser

# Common imports
import json, re


class JobParser(SlurmParser):
class JobParser(EntityParser):
""" """

def __init__(self, cluster_name, slurm_version=None):
super().__init__("jobs", "sacct", cluster_name, slurm_version=slurm_version)
super().__init__("jobs", cluster_name, "sacct", slurm_version=slurm_version)

def generate_report(self, file_name):

6 changes: 3 additions & 3 deletions slurm_state/parsers/node_parser.py
@@ -1,4 +1,4 @@
from slurm_state.parsers.slurm_parser import SlurmParser
from slurm_state.parsers.entity_parser import EntityParser

# These functions are translators used in order to handle the values
# we could encounter while parsing a node dictionary retrieved from a
@@ -13,11 +13,11 @@
import json, re


class NodeParser(SlurmParser):
class NodeParser(EntityParser):
""" """

def __init__(self, cluster_name, slurm_version=None):
super().__init__("nodes", "sinfo", cluster_name, slurm_version=slurm_version)
super().__init__("nodes", cluster_name, "sinfo", slurm_version=slurm_version)

def generate_report(self, file_name):
# The command to be launched through SSH is "sinfo --json"
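Since this PR reorders `EntityParser`'s positional parameters to `(entity, cluster_name, slurm_command, ...)`, call sites read more safely with keyword arguments, as the updated `main_read_report_and_update_collection` already does. A sketch, assuming a configured cluster named "mila" (the version string is made up):

```python
from slurm_state.parsers.job_parser import JobParser
from slurm_state.parsers.node_parser import NodeParser

# Keyword arguments make these calls robust to further reordering of
# the positional parameters in EntityParser and its subclasses.
job_parser = JobParser(cluster_name="mila", slurm_version="23.02.6")
node_parser = NodeParser(cluster_name="mila", slurm_version="23.02.6")
```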