diff --git a/scripts/cleanup_jobs.py b/scripts/cleanup_jobs.py new file mode 100644 index 00000000..1f946ead --- /dev/null +++ b/scripts/cleanup_jobs.py @@ -0,0 +1,230 @@ +""" +Script to clean-up jobs in database. + +To evaluate job status, we check job["slurm"]["slurm_last_update"]. +""" + +import argparse +import sys +from datetime import datetime +from slurm_state.mongo_client import get_mongo_client +from slurm_state.config import get_config + + +def main(arguments: list): + parser = argparse.ArgumentParser(description="Delete old jobs from database.") + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( + "-n", + "--jobs", + type=int, + help=( + "Number of most recent jobs to keep. If specified, script will delete all older jobs until N jobs remain. " + "If there were initially at most N jobs in database, nothing will be deleted." + ), + ) + group.add_argument( + "-u", + "--jobs-per-user", + type=int, + help=( + "Number of most recent jobs to keep **PER USER**. " + "If specified, script will delete all older jobs until N Jobss remain for each user. " + "If a user initially had at most N jobs, all its jobs will remain." + ), + ) + group.add_argument( + "-d", + "--date", + help=( + "Date of older job to keep. " + "Format 'YYYY-MM-DD-HH:MM:SS', or 'YYYY-MM-DD' (equivalent to 'YYYY-MM-DD-00:00:00'). " + "If specified, script will delete all jobs older than given date." + ), + ) + parser.add_argument( + "--debug", + action="store_true", + help=( + "If specified, print info (job ID and slurm_last_update) " + "about all available jobs before and after cleanup." + ), + ) + args = parser.parse_args(arguments) + + if args.debug: + _debug_db_jobs() + + if args.jobs is not None: + keep_n_most_recent_jobs(args.jobs) + elif args.jobs_per_user is not None: + keep_n_most_recent_jobs_per_user(args.jobs_per_user) + else: + if args.date.count("-") == 2: + date_format = "%Y-%m-%d" + else: + date_format = "%Y-%m-%d-%H:%M:%S" + date = datetime.strptime(args.date, date_format) + keep_jobs_from_date(date) + + if args.debug: + _debug_db_jobs() + + +def keep_n_most_recent_jobs(n: int): + print(f"Keeping {n} most recent jobs") + mc = _get_db() + db_jobs = mc["jobs"] + nb_total_jobs = db_jobs.count_documents({}) + + if nb_total_jobs <= n: + print(f"{nb_total_jobs} jobs in database, {n} to keep, nothing to do.") + return + + # Find jobs to delete. + # Sort jobs by slurm_last_update ascending + # and keep only first jobs, excluding n last jobs. + jobs_to_delete = list( + db_jobs.find({}).sort([("cw.last_slurm_update", 1)]).limit(nb_total_jobs - n) + ) + assert len(jobs_to_delete) == nb_total_jobs - n + # Delete jobs + filter_to_delete = {"_id": {"$in": [job["_id"] for job in jobs_to_delete]}} + result = db_jobs.delete_many(filter_to_delete) + nb_deleted_jobs = result.deleted_count + # Delete user props associated to deleted jobs + _delete_user_props(mc, jobs_to_delete) + + nb_remaining_jobs = db_jobs.count_documents({}) + + print( + f"Jobs in database: initially {nb_total_jobs}, deleted {nb_deleted_jobs}, remaining {nb_remaining_jobs}" + ) + + +def keep_n_most_recent_jobs_per_user(n: int): + mc = _get_db() + db_jobs = mc["jobs"] + nb_total_jobs = db_jobs.count_documents({}) + + db_users = mc["users"] + nb_users = db_users.count_documents({}) + print(f"Keeping {n} most recent jobs for each of {nb_users} users.") + + jobs_to_delete = [] + for user in db_users.find({}).sort([("mila_email_username", 1)]): + mila_email_username = user["mila_email_username"] + user_filter = {"cw.mila_email_username": mila_email_username} + nb_user_jobs = db_jobs.count_documents(user_filter) + if nb_user_jobs <= n: + # print(f"[{mila_email_username}] {nb_user_jobs} user jobs, nothing to do.") + continue + + # Find user jobs to delete. + # Sort jobs by slurm_last_update ascending + # and keep only first jobs, excluding n last jobs. + user_jobs_to_delete = list( + db_jobs.find(user_filter) + .sort([("cw.last_slurm_update", 1)]) + .limit(nb_user_jobs - n) + ) + assert len(user_jobs_to_delete) == nb_user_jobs - n + jobs_to_delete.extend(user_jobs_to_delete) + # print(f"[{mila_email_username}] {nb_user_jobs} user jobs, {len(user_jobs_to_delete)} to delete.") + + if not jobs_to_delete: + print(f"Each user has at most {n} jobs, nothing to do.") + return + + filter_to_delete = {"_id": {"$in": [job["_id"] for job in jobs_to_delete]}} + result = db_jobs.delete_many(filter_to_delete) + nb_deleted_jobs = result.deleted_count + # Delete user props associated to deleted jobs + _delete_user_props(mc, jobs_to_delete) + + nb_remaining_jobs = db_jobs.count_documents({}) + + print( + f"Jobs in database: initially {nb_total_jobs}, deleted {nb_deleted_jobs}, remaining {nb_remaining_jobs}" + ) + + +def keep_jobs_from_date(date: datetime): + print(f"Keeping jobs starting from: {date}") + mc = _get_db() + db_jobs = mc["jobs"] + nb_total_jobs = db_jobs.count_documents({}) + + # Find jobs to delete + + jobs_to_delete = list( + db_jobs.find({"cw.last_slurm_update": {"$lt": date.timestamp()}}) + ) + if not jobs_to_delete: + print(f"No job found before {date}, nothing to do.") + return + + # Delete jobs + filter_to_delete = {"_id": {"$in": [job["_id"] for job in jobs_to_delete]}} + result = db_jobs.delete_many(filter_to_delete) + nb_deleted_jobs = result.deleted_count + # Delete user props associated to deleted jobs + _delete_user_props(mc, jobs_to_delete) + + nb_remaining_jobs = db_jobs.count_documents({}) + + print( + f"Jobs in database: initially {nb_total_jobs}, deleted {nb_deleted_jobs}, remaining {nb_remaining_jobs}" + ) + + +def _delete_user_props(mc, jobs: list): + """Delete user props associated to given jobs.""" + # Build filter. + # Use OR, to delete any of given jobs. + filter_jobs = { + "$or": [ + # For each job, find user prop with same job ID and cluster name. + { + "$and": [ + {"job_id": job["slurm"]["job_id"]}, + {"cluster_name": job["slurm"]["cluster_name"]}, + ] + } + for job in jobs + ] + } + + result = mc["job_user_props"].delete_many(filter_jobs) + return result.deleted_count + + +def _get_db(): + client = get_mongo_client() + mc = client[get_config("mongo.database_name")] + return mc + + +def _debug_db_jobs(): + """Debug function. Print job ID and `last_slurm_update` for each job in database.""" + mc = _get_db() + db_jobs = mc["jobs"] + jobs = list(db_jobs.find({}).sort([("cw.last_slurm_update", 1)])) + nb_jobs = len(jobs) + print(f"[JOBS: {nb_jobs}]") + for i, job in enumerate(jobs): + print( + f"\t[{i + 1}/{nb_jobs}] job_id={job['slurm']['job_id']} cw.last_slurm_update={_fmt_last_slurm_update(job)}" + ) + + +def _fmt_last_slurm_update(job): + """Pretty print last_slurm_update (None if None, else as a date time).""" + v = job["cw"].get("last_slurm_update") + if v is None: + return v + return datetime.fromtimestamp(v) + + +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/scripts_test/Dockerfile b/scripts_test/Dockerfile index 23d4f6c0..9e148236 100644 --- a/scripts_test/Dockerfile +++ b/scripts_test/Dockerfile @@ -15,4 +15,8 @@ RUN pip install -r /requirements_scripts.txt && rm -rf /root/.cache COPY scripts_test/requirements.txt /requirements_scripts_test.txt RUN pip install -r /requirements_scripts_test.txt && rm -rf /root/.cache +# Add folder `slurm_state` so that `slurm_state` symbols are available during tests. +# `slurm_state` symbols get_mongo_client() and get_config() are used in script `cleanup_jobs`. +ADD slurm_state slurm_state + CMD ["coverage", "run", "--source=scripts", "--rcfile=scripts_test/.coveragerc", "-m", "pytest", "scripts_test"] diff --git a/scripts_test/test_cleanup_jobs.py b/scripts_test/test_cleanup_jobs.py new file mode 100644 index 00000000..f99fbc0f --- /dev/null +++ b/scripts_test/test_cleanup_jobs.py @@ -0,0 +1,276 @@ +import pytest + +from pymongo import MongoClient +from scripts.cleanup_jobs import main as cleanup_jobs +from scripts_test.config import get_config +from datetime import datetime, timedelta +from collections import Counter + + +class CleanupTestContext: + """ + Helper to test job cleanup script. + + Create and fill a test database. + """ + + NB_JOBS = 100 + + def __init__(self): + db_name = get_config("mongo.database_name") + client = MongoClient(get_config("mongo.connection_string")) + mc = client[db_name] + + base_datetime = datetime.now() + + # Create fake users, intended to be checked when cleaning jobs per user. + fake_users = [ + {"mila_email_username": "first_user_0@email.com"}, + {"mila_email_username": "first_user_1@email.com"}, + ] + # Create fake jobs + fake_jobs = [ + { + "slurm": {"job_id": str(i), "cluster_name": f"cluster_{i}"}, + "cw": { + "last_slurm_update": ( + base_datetime + timedelta(days=i) + ).timestamp(), + # Associate first NB_JOBS / 4 Jobs to first_user_0 and other jobs to first_user_1 + "mila_email_username": f"first_user_{int(i >= self.NB_JOBS / 4)}@email.com", + }, + } + for i in range(self.NB_JOBS) + ] + # Create fake user props, 2 for each job + fake_user_props = [ + { + "mila_email_username": f"first_user_{i}@email.com", + "cluster_name": fake_job["slurm"]["cluster_name"], + "job_id": fake_job["slurm"]["job_id"], + "props": {"prop first user 1": "value first user 1"}, + } + for i, fake_job in enumerate(fake_jobs) + ] + [ + { + "mila_email_username": f"second_user_{i}@email.com", + "cluster_name": fake_job["slurm"]["cluster_name"], + "job_id": fake_job["slurm"]["job_id"], + "props": {"prop second user 1": "value second user 1"}, + } + for i, fake_job in enumerate(fake_jobs) + ] + + db_users = mc["users"] + db_jobs = mc["jobs"] + db_user_props = mc["job_user_props"] + + db_users.delete_many({}) + db_users.insert_many(fake_users) + db_jobs.delete_many({}) + db_jobs.insert_many(fake_jobs) + db_user_props.delete_many({}) + db_user_props.insert_many(fake_user_props) + assert db_jobs.count_documents({}) == self.NB_JOBS + + self.db_name = db_name + self.base_datetime = base_datetime + self.mc = mc + self.db_jobs = db_jobs + self.db_user_props = db_user_props + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.mc.drop_collection(self.db_name) + print("cleaned") + + def _get_jobs(self): + """Get current jobs in test database.""" + jobs = list(self.db_jobs.find({}).sort([("cw.last_slurm_update", 1)])) + for job in jobs: + del job["_id"] + return jobs + + def check_user_props(self) -> list: + """ + Check that current user props exactly match current jobs in database. + + We expect 2 different user prop dicts per job. + + If everything is ok, return current jobs in database. + """ + # Get all jobs + jobs = self._get_jobs() + # Get all user props + user_props = list(self.db_user_props.find({})) + + assert len(user_props) == 2 * len(jobs) + + # Map each job to user prop emails. + user_props_map = {} + for prop in user_props: + key = prop["job_id"], prop["cluster_name"] + user_props_map.setdefault(key, set()).add(prop["mila_email_username"]) + + assert len(user_props_map) == len(jobs) + # Check each job is associated to 2 user props (i.e. 2 emails) + for job in jobs: + key = job["slurm"]["job_id"], job["slurm"]["cluster_name"] + assert key in user_props_map + assert len(user_props_map[key]) == 2 + # Delete found job in mapping + del user_props_map[key] + # At end, mapping should be empty (every job should have been found) + assert not user_props_map + + # Return jobs for further checks + return jobs + + +def test_keep_n_most_recent_jobs(): + with CleanupTestContext() as ctx: + jobs = ctx.check_user_props() + + cleanup_jobs(["-n", "500"]) + # There are less than 500 jobs in db, nothing should happen. + assert jobs == ctx.check_user_props() + + cleanup_jobs(["-n", str(CleanupTestContext.NB_JOBS)]) + # There are exactly NB_JOBS jobs in db, nothing should happen. + assert jobs == ctx.check_user_props() + + cleanup_jobs(["-n", "60"]) + # Now db should contain only 60 jobs + remaining_jobs = ctx.check_user_props() + assert len(remaining_jobs) == 60 + assert jobs[-60:] == remaining_jobs + + cleanup_jobs(["-n", "60"]) + # There are exactly 60 jobs in db, nothing should happen. + assert remaining_jobs == ctx.check_user_props() + + cleanup_jobs(["-n", "15"]) + remaining_jobs = ctx.check_user_props() + assert len(remaining_jobs) == 15 + assert jobs[-15:] == remaining_jobs + + cleanup_jobs(["-n", "0"]) + # With current code, "-n 0" will erase all jobs in database. + assert len(ctx.check_user_props()) == 0 + + +def test_keep_n_most_recent_jobs_per_user(): + with CleanupTestContext() as ctx: + jobs = ctx.check_user_props() + + # Check we indeed have 25 jobs for first_user_0 and 75 jobs for first_user_1 + nb_jobs_per_user = Counter(job["cw"]["mila_email_username"] for job in jobs) + assert len(nb_jobs_per_user) == 2 + count = nb_jobs_per_user.most_common() + assert count == [("first_user_1@email.com", 75), ("first_user_0@email.com", 25)] + + cleanup_jobs(["-u", "100"]) + # Nothing should happen + assert jobs == ctx.check_user_props() + + cleanup_jobs(["-u", "75"]) + # Nothing shoule happen + assert jobs == ctx.check_user_props() + + cleanup_jobs(["-u", "74"]) + # Nothing should happen for first_user_0 (25 => 25) + # 1 job (oldest one) should be deleted for first_user_1 (75 => 74) + current_jobs = ctx.check_user_props() + jobs_first_user_0 = current_jobs[:25] + jobs_first_user_1 = current_jobs[25:] + assert len(jobs_first_user_0) == 25 + assert len(jobs_first_user_1) == 74 + assert all( + job["cw"]["mila_email_username"] == "first_user_0@email.com" + for job in jobs_first_user_0 + ) + assert all( + job["cw"]["mila_email_username"] == "first_user_1@email.com" + for job in jobs_first_user_1 + ) + assert jobs[:25] == jobs_first_user_0 + assert jobs[26:] == jobs_first_user_1 + + cleanup_jobs(["-u", "26"]) + # Nothing should happen for first_user_0 (25 => 25) + # 74 - 26 jobs (oldest ones) should be deleted for first_user_1 (74 => 26) + current_jobs = ctx.check_user_props() + jobs_first_user_0 = current_jobs[:25] + jobs_first_user_1 = current_jobs[25:] + assert len(jobs_first_user_0) == 25 + assert len(jobs_first_user_1) == 26 + assert all( + job["cw"]["mila_email_username"] == "first_user_0@email.com" + for job in jobs_first_user_0 + ) + assert all( + job["cw"]["mila_email_username"] == "first_user_1@email.com" + for job in jobs_first_user_1 + ) + assert jobs[:25] == jobs_first_user_0 + assert jobs[-26:] == jobs_first_user_1 + + cleanup_jobs(["-u", "17"]) + # 25 - 17 jobs (oldest ones) should be deleted for first_user_0 (25 => 17) + # 26 - 17 jobs (oldest ones) should be deleted for first_user_1 (26 => 17) + current_jobs = ctx.check_user_props() + jobs_first_user_0 = current_jobs[:17] + jobs_first_user_1 = current_jobs[17:] + assert len(jobs_first_user_0) == 17 + assert len(jobs_first_user_1) == 17 + assert all( + job["cw"]["mila_email_username"] == "first_user_0@email.com" + for job in jobs_first_user_0 + ) + assert all( + job["cw"]["mila_email_username"] == "first_user_1@email.com" + for job in jobs_first_user_1 + ) + assert jobs[(25 - 17) : 25] == jobs_first_user_0 + assert jobs[-17:] == jobs_first_user_1 + + cleanup_jobs(["-u", "0"]) + # All jobs should be deleted + assert len(ctx.check_user_props()) == 0 + + +# Parameterize with the two date formats accepted by script `cleanup_jobs` +@pytest.mark.parametrize("date_format", ["%Y-%m-%d-%H:%M:%S", "%Y-%m-%d"]) +def test_keep_jobs_after_a_date(date_format): + with CleanupTestContext() as ctx: + too_old_date = ctx.base_datetime - timedelta(days=1) + inbound_date_1 = ctx.base_datetime + timedelta(days=15) + inbound_date_2 = ctx.base_datetime + timedelta(days=60) + new_date = ctx.base_datetime + timedelta(days=ctx.NB_JOBS) + + def _fmt_date(d: datetime): + return d.strftime(date_format) + + jobs = ctx.check_user_props() + + cleanup_jobs(["-d", _fmt_date(too_old_date)]) + # Date is too old, so no job should be deleted + assert jobs == ctx.check_user_props() + + cleanup_jobs(["-d", _fmt_date(inbound_date_1)]) + remaining_jobs = ctx.check_user_props() + assert len(remaining_jobs) == 100 - 15 + assert jobs[15:] == remaining_jobs + + cleanup_jobs(["-d", _fmt_date(inbound_date_2)]) + remaining_jobs = ctx.check_user_props() + assert len(remaining_jobs) == 100 - 60 + assert jobs[60:] == remaining_jobs + + cleanup_jobs(["-d", _fmt_date(new_date)]) + # With a date more recent than latest job, + # all jobs should be deleted. + remaining_jobs = ctx.check_user_props() + assert len(remaining_jobs) == 0