Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various updates #234

Merged
merged 13 commits into from
Jan 13, 2025
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ requires-python = ">=3.9"
dependencies = [
"fabric ~= 3.2",
"flufl.lock ~= 8.0",
"jobflow >= 0.1.14",
"jobflow >= 0.1.19",
"psutil >= 5.9,< 7.0",
"pydantic ~= 2.4",
"python-dateutil>=2.8.2",
Expand Down Expand Up @@ -161,7 +161,6 @@ addopts = [
]

[tool.coverage.run]
include = ["src/*"]
parallel = true
branch = true

Expand Down
10 changes: 5 additions & 5 deletions requirements/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
jobflow==0.1.18
pydantic==2.9.2
jobflow==0.1.19
pydantic==2.10.5
fabric==3.2.2
tomlkit==0.13.2
qtoolkit==0.1.5
typer==0.12.5
rich==13.9.3
psutil==6.1.0
typer==0.15.1
rich==13.9.4
psutil==6.1.1
supervisor==4.2.5
ruamel.yaml==0.17.35
schedule==1.2.2
Expand Down
41 changes: 31 additions & 10 deletions src/jobflow_remote/cli/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,6 @@

@app_admin.command()
def upgrade(
test_version_upgrade: Annotated[
Optional[str],
typer.Option(
help="Explicitly sets the target version for upgrade. For testing purposes only",
hidden=True,
),
] = None,
no_dry_run: Annotated[
bool,
typer.Option(
Expand All @@ -68,6 +61,15 @@ def upgrade(
"proceeding with the upgrade",
),
] = False,
target: Annotated[
Optional[str],
typer.Option(
"--target",
"-t",
help="Explicitly sets the target version for upgrade. For testing purposes or development"
" versions. Not for standard updates.",
),
] = None,
) -> None:
"""
Upgrade the jobflow database.
Expand All @@ -78,7 +80,24 @@ def upgrade(

jc = get_job_controller()
upgrader = DatabaseUpgrader(jc)
target_version = parse_version(test_version_upgrade) or upgrader.current_version
target_version = parse_version(target) if target else upgrader.current_version
if target_version.local or target_version.post:
# this is likely a developer version installed from source.
# get the available upgrades for version higher than the developer one
base_version = parse_version(target_version.base_version)
upgrades_available = upgrader.collect_upgrades(
base_version, upgrader.registered_upgrades[-1]
)
msg = (
f"Target version {target_version} is likely a development version. Explicitly "
f"specify the target version with the --target option if this is the case."
)
if upgrades_available:
msg += (
f" Available upgrades larger than {base_version}: "
f"{','.join(str(v) for v in upgrades_available)}"
)
out_console.print(msg, style="gold1")
db_version = jc.get_current_db_version()
if db_version >= target_version:
exit_with_warning_msg(
Expand All @@ -100,7 +119,7 @@ def upgrade(
out_console.print(text)

if not no_dry_run:
actions = upgrader.dry_run(target_version=test_version_upgrade)
actions = upgrader.dry_run(target_version=target)
if not actions:
out_console.print(
f"No actions will be required for upgrading to version {target_version}"
Expand All @@ -121,7 +140,7 @@ def upgrade(
with loading_spinner(processing=False) as progress:
progress.add_task(description="Upgrading the DB...", total=None)

done = upgrader.upgrade(target_version=test_version_upgrade)
done = upgrader.upgrade(target_version=target)
not_text = "" if done else "[bold]NOT [/bold]"
out_console.print(f"The database has {not_text}been upgraded")

Expand Down Expand Up @@ -191,6 +210,7 @@ def unlock(
) -> None:
"""
Forcibly removes the lock from the documents of the selected jobs.
If no criteria are specified, all the locked jobs will be selected.
WARNING: can lead to inconsistencies if the process is actually running.
"""
job_ids_indexes = get_job_ids_indexes(job_id)
Expand Down Expand Up @@ -249,6 +269,7 @@ def unlock_flow(
) -> None:
"""
Forcibly removes the lock from the documents of the selected flows.
If no criteria are specified, all the locked flows will be selected.
WARNING: can lead to inconsistencies if the process is actually running.
"""
job_ids_indexes = get_job_ids_indexes(job_id)
Expand Down
6 changes: 6 additions & 0 deletions src/jobflow_remote/cli/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
hours_opt,
job_flow_id_flag_opt,
job_ids_opt,
locked_flow_opt,
max_results_opt,
metadata_opt,
name_opt,
reverse_sort_flag_opt,
sort_opt,
Expand Down Expand Up @@ -69,6 +71,8 @@ def flows_list(
name: name_opt = None,
days: days_opt = None,
hours: hours_opt = None,
metadata: metadata_opt = None,
locked: locked_flow_opt = False,
verbosity: verbosity_opt = 0,
max_results: max_results_opt = 100,
sort: sort_opt = SortOption.UPDATED_ON,
Expand All @@ -93,6 +97,8 @@ def flows_list(
start_date=start_date,
end_date=end_date,
name=name,
metadata=metadata,
locked=locked,
limit=max_results,
sort=db_sort,
full=verbosity > 0,
Expand Down
11 changes: 11 additions & 0 deletions src/jobflow_remote/cli/formatting.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,17 @@ def format_job_info(
if remote_error:
d["remote"]["error"] = ReprStr(remote_error)

if verbosity == 0:
d["remote"].pop("queue_out", None)
d["remote"].pop("queue_err", None)
else:
queue_out = d["remote"].get("queue_out")
if queue_out:
d["remote"]["queue_out"] = ReprStr(queue_out)
queue_err = d["remote"].get("queue_err")
if queue_err:
d["remote"]["queue_err"] = ReprStr(queue_err)

# reorder the keys
# Do not check here that all the keys in JobInfo are in JOB_INFO_ORDER. Check in the tests
sorted_d = {}
Expand Down
10 changes: 10 additions & 0 deletions src/jobflow_remote/cli/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,16 @@
]


locked_flow_opt = Annotated[
bool,
typer.Option(
"--locked",
"-l",
help="Select locked Flows",
),
]


serialize_file_format_opt = Annotated[
SerializeFileFormat,
typer.Option(
Expand Down
6 changes: 6 additions & 0 deletions src/jobflow_remote/config/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ class WorkerBase(BaseModel):
description="Sanitize the output of commands in case of failures due to spurious text produced"
"by the worker shell.",
)
delay_download: Optional[int] = Field(
default=None,
description="Amount of seconds to wait to start the download after the Runner marked a Job "
"as TERMINATED. To account for delays in the writing of the file on the worker file system"
" (e.g. NFS).",
)
model_config = ConfigDict(extra="forbid")

@field_validator("scheduler_type")
Expand Down
9 changes: 8 additions & 1 deletion src/jobflow_remote/jobs/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,11 @@ def _get_runner_info(self) -> dict:
user = os.getlogin()
except OSError:
user = os.environ.get("USER", None)
# Note that this approach may give a different MAC address for the
# same machine, if more than one network device is present (this
# may also include local virtual machines). Consider replacing this
# with a more stable choice (e.g. the first one in alphabetical order,
# excluding virtual interfaces)
mac_address = None
found = False
for addrs in psutil.net_if_addrs().values():
Expand Down Expand Up @@ -1063,11 +1068,13 @@ def _check_running_runner(
db_data = doc["running_runner"]

local_data = self._get_runner_info()
# not testing on the MAC address as it may change due to identifying
# different network devices on the same machine or changing in
# cloud VMs.
data_to_check = [
"hostname",
"project_name",
"user",
"mac_address",
"daemon_dir",
]
for data in data_to_check:
Expand Down
5 changes: 5 additions & 0 deletions src/jobflow_remote/jobs/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def get_initial_flow_doc_dict(flow: Flow, job_dicts: list[dict]) -> dict:
name=flow.name,
ids=ids,
parents=parents,
metadata=flow.metadata or {},
)

return flow_doc.as_db_dict()
Expand All @@ -120,6 +121,8 @@ class RemoteInfo(BaseModel):
retry_time_limit: Optional[datetime] = None
error: Optional[str] = None
prerun_cleanup: bool = False
queue_out: Optional[str] = None
queue_err: Optional[str] = None


class JobInfo(BaseModel):
Expand Down Expand Up @@ -508,6 +511,8 @@ def get_reset_job_base_dict() -> dict:
"previous_state": None,
"remote.queue_state": None,
"remote.error": None,
"remote.queue_out": None,
"remote.queue_err": None,
"error": None,
"updated_on": datetime.utcnow(),
"start_time": None,
Expand Down
Loading
Loading