From 0f3f029e555cdfd399d75a8a444d1dbbba6b2400 Mon Sep 17 00:00:00 2001 From: Asif Tamuri Date: Wed, 26 Jul 2023 15:43:36 +0100 Subject: [PATCH 1/8] Add batch-terminate: checks job exists in user's directory and is running before terminating --- src/tlo/cli.py | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/src/tlo/cli.py b/src/tlo/cli.py index 2b32b48f91..e3f36cb9e0 100644 --- a/src/tlo/cli.py +++ b/src/tlo/cli.py @@ -247,6 +247,49 @@ def batch_run(path_to_json, work_directory, draw, sample): runner.run_sample_by_number(output_directory, draw, sample) +@cli.command() +@click.argument("job_id", type=str) +@click.pass_context +def batch_terminate(ctx, job_id): + # we check that directory has been created for this job in the user's folder + # (set up when the job is submitted) + config = load_config(ctx.obj["config_file"]) + username = config["DEFAULT"]["USERNAME"] + directory = f"{username}/{job_id}" + share_client = ShareClient.from_connection_string(config['STORAGE']['CONNECTION_STRING'], + config['STORAGE']['FILESHARE']) + + try: + directories = share_client.list_directories_and_files(directory) + assert len(list(directories)) > 0 + except ResourceNotFoundError: + print("ERROR: directory ", directory, "not found.") + return + + batch_client = get_batch_client( + config["BATCH"]["NAME"], + config["BATCH"]["KEY"], + config["BATCH"]["URL"] + ) + + # check the job is running + try: + job = batch_client.job.get(job_id=job_id) + except BatchErrorException: + print("ERROR: job ", job_id, "not found.") + return + + if job.state == "completed": + print("ERROR: Job already finished.") + return + + # job is running & not finished - terminate the job + batch_client.job.terminate(job_id=job_id) + print(f"Job {job_id} terminated.") + print("To download output run:") + print(f"\ttlo batch-download {job_id}") + + @cli.command() @click.argument("job_id", type=str) @click.option("show_tasks", "--tasks", is_flag=True, default=False, help="Display task information") From 89d77d54276a12e43dec758d67ff2ddb3164d937 Mon Sep 17 00:00:00 2001 From: Asif Tamuri Date: Thu, 27 Jul 2023 01:10:03 +0100 Subject: [PATCH 2/8] Update the `batch-list` subcommand - use the job directories in user's folder to see if jobs are theirs - add `--username` argument to filter by a different user - convert job results to dataframe and filter according to options --- src/tlo/cli.py | 86 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 58 insertions(+), 28 deletions(-) diff --git a/src/tlo/cli.py b/src/tlo/cli.py index e3f36cb9e0..14b93ec8a1 100644 --- a/src/tlo/cli.py +++ b/src/tlo/cli.py @@ -7,14 +7,15 @@ import tempfile from collections import defaultdict from pathlib import Path -from typing import Dict +from typing import Dict, List import click import dateutil.parser +import pandas as pd from azure import batch from azure.batch import batch_auth from azure.batch import models as batch_models -from azure.batch.models import BatchErrorException +from azure.batch.models import BatchErrorException, CloudJobPaged, CloudJob from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError from azure.identity import DefaultAzureCredential from azure.keyvault.secrets import SecretClient @@ -359,45 +360,74 @@ def batch_job(ctx, job_id, raw, show_tasks): @click.option("--completed", "status", flag_value="completed", default=False, help="Only display completed jobs") @click.option("--active", "status", flag_value="active", default=False, help="Only display active jobs") @click.option("-n", default=5, type=int, help="Maximum number of jobs to list (default is 5)") +@click.option("--username", type=str, hidden=True) @click.pass_context -def batch_list(ctx, status, n, find): +def batch_list(ctx, status, n, find, username): """List and find running and completed jobs.""" - print(">Querying batch system\r", end="") + print("Querying Batch...") config = load_config(ctx.obj["config_file"]) + + if username is None: + username = config["DEFAULT"]["USERNAME"] + batch_client = get_batch_client( config["BATCH"]["NAME"], config["BATCH"]["KEY"], config["BATCH"]["URL"] ) - # get list of all batch jobs - jobs = batch_client.job.list( + # create client to connect to file share + share_client = ShareClient.from_connection_string(config['STORAGE']['CONNECTION_STRING'], + config['STORAGE']['FILESHARE']) + + user_directory = f"{username}/" + + # get list of all directories in user_directory, and get timestamps + directories = list(share_client.list_directories_and_files(user_directory, include=["timestamps"])) + + # if no directories then print message and exit + if len(directories) == 0: + print("No jobs found.") + return + + # sort directories by creation time, descending + directories.sort(key=lambda x: x["creation_time"], reverse=True) + # convert to dictionary + directories = {d['name']: d['creation_time'] for d in directories} + + # the directory names are job ids - get information about the first 10 jobs + # get information about jobs in single call + # filter the list of jobs by those ids in the directories + jobs_list = list(batch_client.job.list( job_list_options=batch_models.JobListOptions( expand='stats' ) - ) - count = 0 - for job in jobs: - jad = job.as_dict() - print_job = False - if (status is None or - ("completed" in status and jad["state"] == "completed") or - ("active" in status and jad["state"] == "active")): - if find is not None: - if find in jad["id"]: - print_job = True - else: - print_job = True + )) - if print_job: - print_basic_job_details(jad) - if "stats" in jad: - print(f"{'Succeeded tasks'.ljust(JOB_LABEL_PADDING)}: {jad['stats']['num_succeeded_tasks']}") - print(f"{'Failed tasks'.ljust(JOB_LABEL_PADDING)}: {jad['stats']['num_failed_tasks']}") - print() - count += 1 - if count == n: - break + # create a pandas dataframe of the jobs, using the job.as_dict() record + jobs: pd.DataFrame = pd.DataFrame([job.as_dict() for job in jobs_list if job.id in directories]) + + # get dataframe subset where id contains the find string + if find is not None: + jobs = jobs[jobs["id"].str.contains(find)] + + # sort by creation time + jobs = jobs.sort_values("creation_time", ascending=False) + + # filter by status + if status is not None: + jobs = jobs[jobs["state"] == status] + + # if no rows in jobs dataframe then print message and exit + if len(jobs) == 0: + print("No jobs found.") + return + + # get the first n rows + jobs = jobs.head(n) + + # print the dataframe + print(jobs[["id", "creation_time", "state"]].to_string(index=False)) def print_basic_job_details(job: dict): From 82a7cc9bd55865c2c386143a965c244ed39e8222 Mon Sep 17 00:00:00 2001 From: Asif Tamuri Date: Thu, 27 Jul 2023 01:15:52 +0100 Subject: [PATCH 3/8] Tidy up --- src/tlo/cli.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/tlo/cli.py b/src/tlo/cli.py index 14b93ec8a1..5af2e183e9 100644 --- a/src/tlo/cli.py +++ b/src/tlo/cli.py @@ -383,17 +383,15 @@ def batch_list(ctx, status, n, find, username): user_directory = f"{username}/" # get list of all directories in user_directory, and get timestamps - directories = list(share_client.list_directories_and_files(user_directory, include=["timestamps"])) + directories = list(share_client.list_directories_and_files(user_directory)) # if no directories then print message and exit if len(directories) == 0: print("No jobs found.") return - # sort directories by creation time, descending - directories.sort(key=lambda x: x["creation_time"], reverse=True) - # convert to dictionary - directories = {d['name']: d['creation_time'] for d in directories} + # convert directories to set + directories = set([directory["name"] for directory in directories]) # the directory names are job ids - get information about the first 10 jobs # get information about jobs in single call @@ -406,6 +404,7 @@ def batch_list(ctx, status, n, find, username): # create a pandas dataframe of the jobs, using the job.as_dict() record jobs: pd.DataFrame = pd.DataFrame([job.as_dict() for job in jobs_list if job.id in directories]) + jobs = jobs[["id", "creation_time", "state"]] # get dataframe subset where id contains the find string if find is not None: @@ -427,7 +426,7 @@ def batch_list(ctx, status, n, find, username): jobs = jobs.head(n) # print the dataframe - print(jobs[["id", "creation_time", "state"]].to_string(index=False)) + print(jobs.to_string(index=False)) def print_basic_job_details(job: dict): From 09ef40d5256510e9455dc9cd6498e8b8e59a7c97 Mon Sep 17 00:00:00 2001 From: Asif Tamuri Date: Thu, 27 Jul 2023 01:52:53 +0100 Subject: [PATCH 4/8] Tidy up --- src/tlo/cli.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/tlo/cli.py b/src/tlo/cli.py index 5af2e183e9..f12d5fb757 100644 --- a/src/tlo/cli.py +++ b/src/tlo/cli.py @@ -7,7 +7,7 @@ import tempfile from collections import defaultdict from pathlib import Path -from typing import Dict, List +from typing import Dict import click import dateutil.parser @@ -15,7 +15,7 @@ from azure import batch from azure.batch import batch_auth from azure.batch import models as batch_models -from azure.batch.models import BatchErrorException, CloudJobPaged, CloudJob +from azure.batch.models import BatchErrorException from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError from azure.identity import DefaultAzureCredential from azure.keyvault.secrets import SecretClient @@ -378,7 +378,7 @@ def batch_list(ctx, status, n, find, username): # create client to connect to file share share_client = ShareClient.from_connection_string(config['STORAGE']['CONNECTION_STRING'], - config['STORAGE']['FILESHARE']) + config['STORAGE']['FILESHARE']) user_directory = f"{username}/" @@ -519,7 +519,7 @@ def load_config(config_file): def load_server_config(kv_uri, tenant_id) -> Dict[str, Dict]: - """Retrieve the server configuration for running Batch using the user"s Azure credentials + """Retrieve the server configuration for running Batch using the user's Azure credentials Allows user to login using credentials from Azure CLI or interactive browser. From 519819f8c19c043b2af204f4159b829a9df89f94 Mon Sep 17 00:00:00 2001 From: Asif Tamuri Date: Thu, 27 Jul 2023 08:55:44 +0100 Subject: [PATCH 5/8] Sort after filtering --- src/tlo/cli.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/tlo/cli.py b/src/tlo/cli.py index f12d5fb757..99bf577a56 100644 --- a/src/tlo/cli.py +++ b/src/tlo/cli.py @@ -410,9 +410,6 @@ def batch_list(ctx, status, n, find, username): if find is not None: jobs = jobs[jobs["id"].str.contains(find)] - # sort by creation time - jobs = jobs.sort_values("creation_time", ascending=False) - # filter by status if status is not None: jobs = jobs[jobs["state"] == status] @@ -422,6 +419,9 @@ def batch_list(ctx, status, n, find, username): print("No jobs found.") return + # sort by creation time + jobs = jobs.sort_values("creation_time", ascending=False) + # get the first n rows jobs = jobs.head(n) From 908cc8ae728c3895ec89815940196126d3bb9849 Mon Sep 17 00:00:00 2001 From: Asif Tamuri Date: Thu, 27 Jul 2023 09:06:39 +0100 Subject: [PATCH 6/8] tidy up comments --- src/tlo/cli.py | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/src/tlo/cli.py b/src/tlo/cli.py index 99bf577a56..be536da7bf 100644 --- a/src/tlo/cli.py +++ b/src/tlo/cli.py @@ -380,12 +380,9 @@ def batch_list(ctx, status, n, find, username): share_client = ShareClient.from_connection_string(config['STORAGE']['CONNECTION_STRING'], config['STORAGE']['FILESHARE']) - user_directory = f"{username}/" + # get list of all directories in user_directory + directories = list(share_client.list_directories_and_files(f"{username}/")) - # get list of all directories in user_directory, and get timestamps - directories = list(share_client.list_directories_and_files(user_directory)) - - # if no directories then print message and exit if len(directories) == 0: print("No jobs found.") return @@ -393,20 +390,20 @@ def batch_list(ctx, status, n, find, username): # convert directories to set directories = set([directory["name"] for directory in directories]) - # the directory names are job ids - get information about the first 10 jobs - # get information about jobs in single call - # filter the list of jobs by those ids in the directories + # get all jobs in batch system jobs_list = list(batch_client.job.list( job_list_options=batch_models.JobListOptions( expand='stats' ) )) - # create a pandas dataframe of the jobs, using the job.as_dict() record - jobs: pd.DataFrame = pd.DataFrame([job.as_dict() for job in jobs_list if job.id in directories]) + # create a dataframe of the jobs, using the job.as_dict() record + # filter the list of jobs by those ids in the directories + jobs_list = [job.as_dict() for job in jobs_list if job.id in directories] + jobs: pd.DataFrame = pd.DataFrame(jobs_list) jobs = jobs[["id", "creation_time", "state"]] - # get dataframe subset where id contains the find string + # get subset where id contains the find string if find is not None: jobs = jobs[jobs["id"].str.contains(find)] @@ -414,7 +411,6 @@ def batch_list(ctx, status, n, find, username): if status is not None: jobs = jobs[jobs["state"] == status] - # if no rows in jobs dataframe then print message and exit if len(jobs) == 0: print("No jobs found.") return From dc5f6de442d5a58a56391c502f126ce3b5ad3e86 Mon Sep 17 00:00:00 2001 From: Asif Tamuri Date: Tue, 1 Aug 2023 11:22:02 +0100 Subject: [PATCH 7/8] Doc the default options --- src/tlo/cli.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/tlo/cli.py b/src/tlo/cli.py index 60d3948243..e42231f03a 100644 --- a/src/tlo/cli.py +++ b/src/tlo/cli.py @@ -366,7 +366,9 @@ def batch_job(ctx, job_id, raw, show_tasks): @click.option("--username", type=str, hidden=True) @click.pass_context def batch_list(ctx, status, n, find, username): - """List and find running and completed jobs.""" + """List and find running and completed jobs. + By default, the 5 most recent jobs are displayed for the current user. + """ print("Querying Batch...") config = load_config(ctx.obj["config_file"]) From e1f1612ac8bb1d30f68c16cfd752907286f1fa3e Mon Sep 17 00:00:00 2001 From: Asif Tamuri Date: Tue, 1 Aug 2023 11:34:24 +0100 Subject: [PATCH 8/8] split datetime into separate creation date and time columns --- src/tlo/cli.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/tlo/cli.py b/src/tlo/cli.py index e42231f03a..afc11856e9 100644 --- a/src/tlo/cli.py +++ b/src/tlo/cli.py @@ -423,6 +423,14 @@ def batch_list(ctx, status, n, find, username): # sort by creation time jobs = jobs.sort_values("creation_time", ascending=False) + # split datetime into date and time + jobs["creation_time"] = pd.to_datetime(jobs.creation_time) + jobs["creation_date"] = jobs.creation_time.dt.date + jobs["creation_time"] = jobs.creation_time.dt.floor("S").dt.time + + # reorder columns + jobs = jobs[["id", "creation_date", "creation_time", "state"]] + # get the first n rows jobs = jobs.head(n)