Skip to content

Commit

Permalink
Allow to delete n jobs per user.
Browse files Browse the repository at this point in the history
  • Loading branch information
notoraptor committed Jun 13, 2024
1 parent bd54738 commit a1ebb07
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 2 deletions.
62 changes: 61 additions & 1 deletion scripts/cleanup_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,17 @@ def main(arguments: list):
type=int,
help=(
"Number of most recent jobs to keep. If specified, script will delete all older jobs until N jobs remain. "
"If there were initially less than N jobs in database, nothing will be deleted."
"If there were initially at most N jobs in database, nothing will be deleted."
),
)
group.add_argument(
"-u",
"--jobs-per-user",
type=int,
help=(
"Number of most recent jobs to keep **PER USER**. "
"If specified, script will delete all older jobs until N Jobss remain for each user. "
"If a user initially had at most N jobs, all its jobs will remain."
),
)
group.add_argument(
Expand All @@ -47,6 +57,8 @@ def main(arguments: list):

if args.jobs is not None:
keep_n_most_recent_jobs(args.jobs)
elif args.jobs_per_user is not None:
keep_n_most_recent_jobs_per_user(args.jobs_per_user)
else:
if args.date.count("-") == 2:
date_format = "%Y-%m-%d"
Expand Down Expand Up @@ -91,6 +103,54 @@ def keep_n_most_recent_jobs(n: int):
)


def keep_n_most_recent_jobs_per_user(n: int):
mc = _get_db()
db_jobs = mc["jobs"]
nb_total_jobs = db_jobs.count_documents({})

db_users = mc["users"]
nb_users = db_users.count_documents({})
print(f"Keeping {n} most recent jobs for each of {nb_users} users.")

jobs_to_delete = []
for user in db_users.find({}).sort([("mila_email_username", 1)]):
mila_email_username = user["mila_email_username"]
user_filter = {"cw.mila_email_username": mila_email_username}
nb_user_jobs = db_jobs.count_documents(user_filter)
if nb_user_jobs <= n:
# print(f"[{mila_email_username}] {nb_user_jobs} user jobs, nothing to do.")
continue

# Find user jobs to delete.
# Sort jobs by slurm_last_update ascending
# and keep only first jobs, excluding n last jobs.
# NB: Jobs that don't have `last_slurm_update` will appear first in sorting.
user_jobs_to_delete = list(
db_jobs.find(user_filter)
.sort([("cw.last_slurm_update", 1)])
.limit(nb_user_jobs - n)
)
assert len(user_jobs_to_delete) == nb_user_jobs - n
jobs_to_delete.extend(user_jobs_to_delete)
# print(f"[{mila_email_username}] {nb_user_jobs} user jobs, {len(user_jobs_to_delete)} to delete.")

if not jobs_to_delete:
print(f"Each user has at most {n} jobs, nothing to do.")
return

filter_to_delete = {"_id": {"$in": [job["_id"] for job in jobs_to_delete]}}
result = db_jobs.delete_many(filter_to_delete)
nb_deleted_jobs = result.deleted_count
# Delete user props associated to deleted jobs
_delete_user_props(mc, jobs_to_delete)

nb_remaining_jobs = db_jobs.count_documents({})

print(
f"Jobs in database: initially {nb_total_jobs}, deleted {nb_deleted_jobs}, remaining {nb_remaining_jobs}"
)


def keep_jobs_from_date(date: datetime):
print(f"Keeping jobs starting from: {date}")
mc = _get_db()
Expand Down
96 changes: 95 additions & 1 deletion scripts_test/test_cleanup_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from scripts.cleanup_jobs import main as cleanup_jobs
from scripts_test.config import get_config
from datetime import datetime, timedelta
from collections import Counter


class CleanupTestContext:
Expand All @@ -22,12 +23,21 @@ def __init__(self):

base_datetime = datetime.now()

# Create fake users, intended to be checked when cleaning jobs per user.
fake_users = [
{"mila_email_username": "[email protected]"},
{"mila_email_username": "[email protected]"},
]
# Create fake jobs
fake_jobs = [
{
"slurm": {"job_id": str(i), "cluster_name": f"cluster_{i}"},
"cw": {
"last_slurm_update": (base_datetime + timedelta(days=i)).timestamp()
"last_slurm_update": (
base_datetime + timedelta(days=i)
).timestamp(),
# Associate first NB_JOBS / 4 Jobs to first_user_0 and other jobs to first_user_1
"mila_email_username": f"first_user_{int(i >= self.NB_JOBS / 4)}@email.com",
},
}
for i in range(self.NB_JOBS)
Expand All @@ -54,8 +64,12 @@ def __init__(self):
del fake_jobs[0]["cw"]["last_slurm_update"]
fake_jobs[1]["cw"]["last_slurm_update"] = None

db_users = mc["users"]
db_jobs = mc["jobs"]
db_user_props = mc["job_user_props"]

db_users.delete_many({})
db_users.insert_many(fake_users)
db_jobs.delete_many({})
db_jobs.insert_many(fake_jobs)
db_user_props.delete_many({})
Expand Down Expand Up @@ -150,6 +164,86 @@ def test_keep_n_most_recent_jobs():
assert len(ctx.check_user_props()) == 0


def test_keep_n_most_recent_jobs_per_user():
with CleanupTestContext() as ctx:
jobs = ctx.check_user_props()

# Check we indeed have 25 jobs for first_user_0 and 75 jobs for first_user_1
nb_jobs_per_user = Counter(job["cw"]["mila_email_username"] for job in jobs)
assert len(nb_jobs_per_user) == 2
count = nb_jobs_per_user.most_common()
assert count == [("[email protected]", 75), ("[email protected]", 25)]

cleanup_jobs(["-u", "100"])
# Nothing should happen
assert jobs == ctx.check_user_props()

cleanup_jobs(["-u", "75"])
# Nothing shoule happen
assert jobs == ctx.check_user_props()

cleanup_jobs(["-u", "74"])
# Nothing should happen for first_user_0 (25 => 25)
# 1 job (oldest one) should be deleted for first_user_1 (75 => 74)
current_jobs = ctx.check_user_props()
jobs_first_user_0 = current_jobs[:25]
jobs_first_user_1 = current_jobs[25:]
assert len(jobs_first_user_0) == 25
assert len(jobs_first_user_1) == 74
assert all(
job["cw"]["mila_email_username"] == "[email protected]"
for job in jobs_first_user_0
)
assert all(
job["cw"]["mila_email_username"] == "[email protected]"
for job in jobs_first_user_1
)
assert jobs[:25] == jobs_first_user_0
assert jobs[26:] == jobs_first_user_1

cleanup_jobs(["-u", "26"])
# Nothing should happen for first_user_0 (25 => 25)
# 74 - 26 jobs (oldest ones) should be deleted for first_user_1 (74 => 26)
current_jobs = ctx.check_user_props()
jobs_first_user_0 = current_jobs[:25]
jobs_first_user_1 = current_jobs[25:]
assert len(jobs_first_user_0) == 25
assert len(jobs_first_user_1) == 26
assert all(
job["cw"]["mila_email_username"] == "[email protected]"
for job in jobs_first_user_0
)
assert all(
job["cw"]["mila_email_username"] == "[email protected]"
for job in jobs_first_user_1
)
assert jobs[:25] == jobs_first_user_0
assert jobs[-26:] == jobs_first_user_1

cleanup_jobs(["-u", "17"])
# 25 - 17 jobs (oldest ones) should be deleted for first_user_0 (25 => 17)
# 26 - 17 jobs (oldest ones) should be deleted for first_user_1 (26 => 17)
current_jobs = ctx.check_user_props()
jobs_first_user_0 = current_jobs[:17]
jobs_first_user_1 = current_jobs[17:]
assert len(jobs_first_user_0) == 17
assert len(jobs_first_user_1) == 17
assert all(
job["cw"]["mila_email_username"] == "[email protected]"
for job in jobs_first_user_0
)
assert all(
job["cw"]["mila_email_username"] == "[email protected]"
for job in jobs_first_user_1
)
assert jobs[(25 - 17) : 25] == jobs_first_user_0
assert jobs[-17:] == jobs_first_user_1

cleanup_jobs(["-u", "0"])
# All jobs should be deleted
assert len(ctx.check_user_props()) == 0


# Parameterize with the two date formats accepted by script `cleanup_jobs`
@pytest.mark.parametrize("date_format", ["%Y-%m-%d-%H:%M:%S", "%Y-%m-%d"])
def test_keep_jobs_after_a_date(date_format):
Expand Down

0 comments on commit a1ebb07

Please sign in to comment.