From c325f4e3a376083a3d3c4b53d02b3a590c217acc Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Sat, 28 Dec 2024 00:42:37 +0100 Subject: [PATCH 01/13] allow delay download and improve error when missing output store files --- src/jobflow_remote/config/base.py | 6 +++ src/jobflow_remote/jobs/jobcontroller.py | 48 +++++++++++++++++--- src/jobflow_remote/jobs/run.py | 6 +-- src/jobflow_remote/jobs/runner.py | 46 ++++++++++++++----- tests/db/jobs/test_runner.py | 20 +++++++++ tests/integration/test_failures.py | 56 ++++++++++++++++++++++++ 6 files changed, 163 insertions(+), 19 deletions(-) create mode 100644 tests/integration/test_failures.py diff --git a/src/jobflow_remote/config/base.py b/src/jobflow_remote/config/base.py index 6b92e49e..6ce83c4f 100644 --- a/src/jobflow_remote/config/base.py +++ b/src/jobflow_remote/config/base.py @@ -198,6 +198,12 @@ class WorkerBase(BaseModel): description="Sanitize the output of commands in case of failures due to spurious text produced" "by the worker shell.", ) + delay_download: Optional[int] = Field( + default=None, + description="Amount of seconds to wait to start the download after the Runner marked a Job " + "as TERMINATED. To account for delays in the writing of the file on the worker file system" + " (e.g. NFS).", + ) model_config = ConfigDict(extra="forbid") @field_validator("scheduler_type") diff --git a/src/jobflow_remote/jobs/jobcontroller.py b/src/jobflow_remote/jobs/jobcontroller.py index 6eecb101..b1806236 100644 --- a/src/jobflow_remote/jobs/jobcontroller.py +++ b/src/jobflow_remote/jobs/jobcontroller.py @@ -50,7 +50,11 @@ FlowState, JobState, ) -from jobflow_remote.remote.data import get_remote_store, update_store +from jobflow_remote.remote.data import ( + get_remote_store, + get_remote_store_filenames, + update_store, +) from jobflow_remote.remote.queue import QueueManager from jobflow_remote.utils.data import ( deep_merge_dict, @@ -3371,6 +3375,8 @@ def complete_job( local_path = Path(local_path) out_path = local_path / OUT_FILENAME host_flow_id = job_doc["job"]["hosts"][-1] + # This check needs to be present because if the worker is "local" + # the download phase is skipped and the check is not done earlier. if not out_path.exists(): msg = ( f"The output file {OUT_FILENAME} was not present in the download " @@ -3421,6 +3427,29 @@ def complete_job( self.update_flow_state(host_flow_id) return True + # Files associated with the store may not have been downloaded. + # First check if they exist and then try to get the store + required_store_files = get_remote_store_filenames( + store, config_dict=self.project.remote_jobstore + ) + for store_file in required_store_files: + if not (local_path / store_file).exists(): + msg = ( + "No explicit error raised during the remote execution, but the output " + f"store file {store_file} is missing in the downloaded folder {local_path}. " + "The file was probably not created in the remote folder during the " + "execution but is needed to proceed." 
+ ) + self.checkin_job( + job_doc, + flow_lock.locked_document, + response=None, + error=msg, + doc_update=doc_update, + ) + self.update_flow_state(host_flow_id) + return True + remote_store = get_remote_store( store, local_path, self.project.remote_jobstore ) @@ -3829,9 +3858,10 @@ def lock_flow(self, **lock_kwargs) -> Generator[MongoLock, None, None]: @contextlib.contextmanager def lock_job_for_update( self, - query, - max_step_attempts, - delta_retry, + query: dict, + max_step_attempts: int, + delta_retry: tuple[int, ...], + next_step_delay: int | None = None, **kwargs, ) -> Generator[MongoLock, None, None]: """ @@ -3849,6 +3879,9 @@ def lock_job_for_update( delta_retry List of increasing delay between subsequent attempts when the advancement of a remote step fails. Used to set the retry time. + next_step_delay + An amount of seconds that sets the delay for the next step to + start even in case there are no errors. kwargs Kwargs passed to the MongoLock context manager. @@ -3900,10 +3933,15 @@ def lock_job_for_update( if lock.locked_document: if not error: + next_step_time_limit = None + if next_step_delay: + next_step_time_limit = datetime.utcnow() + timedelta( + seconds=next_step_delay + ) succeeded_update = { "$set": { "remote.step_attempts": 0, - "remote.retry_time_limit": None, + "remote.retry_time_limit": next_step_time_limit, "remote.error": None, } } diff --git a/src/jobflow_remote/jobs/run.py b/src/jobflow_remote/jobs/run.py index be2b0494..1b8b1415 100644 --- a/src/jobflow_remote/jobs/run.py +++ b/src/jobflow_remote/jobs/run.py @@ -49,8 +49,8 @@ def run_remote_job(run_dir: str | Path = ".") -> None: try: response = job.run(store=store) finally: - # some jobs may have compressed the FW files while being executed, - # try to decompress them if that is the case and files need to be + # some jobs may have compressed the jfremote and store files while being + # executed, try to decompress them if that is the case and files need to be # decompressed. decompress_files(store) @@ -65,7 +65,7 @@ def run_remote_job(run_dir: str | Path = ".") -> None: # Convert to Flow the dynamic responses before dumping the output. # This is required so that the response does not need to be - # deserialized and converted by to Flows by the runner. + # deserialized and converted to Flows by the runner. if response.addition: response.addition = get_flow(response.addition) if response.detour: diff --git a/src/jobflow_remote/jobs/runner.py b/src/jobflow_remote/jobs/runner.py index a19e45c7..418f4065 100644 --- a/src/jobflow_remote/jobs/runner.py +++ b/src/jobflow_remote/jobs/runner.py @@ -408,6 +408,7 @@ def run_one_job( job_id: tuple[str, int] | None = None, max_seconds: int | None = None, raise_at_timeout: bool = True, + target_state: JobState | None = None, ) -> bool: """ Use the runner to run a single Job until it reaches a terminal state. @@ -469,7 +470,17 @@ def run_one_job( job_info = self.job_controller.get_job_info( job_id=job_data[0], job_index=job_data[1] ) + if target_state and job_info.state == target_state: + return True if job_info.state.value not in running_states: + # if the target state is defined and the code got + # here, it means it missed the target state, so + # the target was not achieved. + if target_state: + raise RuntimeError( + f"The target state {target_state.value} was not achieved. 
" + f"Final state: {job_info.state.value}" + ) return True if max_seconds and time.time() - t0 > max_seconds: if raise_at_timeout: @@ -795,24 +806,32 @@ def download(self, lock) -> None: makedirs_p(local_path) - fnames = [OUT_FILENAME] - fnames.extend( - get_remote_store_filenames( - store, config_dict=self.project.remote_jobstore - ) - ) - - for fname in fnames: + def download_file(fname: str, mandatory: bool): # in principle fabric should work by just passing the # destination folder, but it fails remote_file_path = str(Path(remote_path, fname)) try: host.get(remote_file_path, str(Path(local_path, fname))) except FileNotFoundError as exc: - # if files are missing it should not retry err_msg = f"file {remote_file_path} for job {job_dict['uuid']} does not exist" - logger.exception(err_msg) - raise RemoteError(err_msg, no_retry=True) from exc + if mandatory: + logger.exception(err_msg) + # if files are missing it should not retry + raise RemoteError(err_msg, no_retry=True) from exc + + err_msg += ". Allow continuing to the next state" + logger.warning(err_msg) + + # only the output file is mandatory. If the others are missing + # it will be dealt with by the complete. The output file may contain + # an error and the fact that they are missing could be expected. + # The analysis of the output file is left to the completion procedure. + download_file(OUT_FILENAME, mandatory=True) + + for fn in get_remote_store_filenames( + store, config_dict=self.project.remote_jobstore + ): + download_file(fn, mandatory=False) lock.update_on_release = {"$set": {"state": JobState.DOWNLOADED.value}} @@ -917,6 +936,7 @@ def check_run_status(self, filter: dict | None = None) -> None: # noqa: A002 qstate = qjob.state if qjob else None next_state = None start_time = None + next_step_delay = None if ( qstate == QState.RUNNING and doc["state"] == JobState.SUBMITTED.value @@ -933,6 +953,8 @@ def check_run_status(self, filter: dict | None = None) -> None: # noqa: A002 next_state = JobState.TERMINATED else: next_state = JobState.DOWNLOADED + # the delay is applied if the job is finished on the worker + next_step_delay = worker.delay_download logger.debug( f"terminated remote job with id {remote_doc['process_id']}" ) @@ -951,10 +973,12 @@ def check_run_status(self, filter: dict | None = None) -> None: # noqa: A002 "index": doc["index"], "state": doc["state"], } + with self.job_controller.lock_job_for_update( query=lock_filter, max_step_attempts=self.runner_options.max_step_attempts, delta_retry=self.runner_options.delta_retry, + next_step_delay=next_step_delay, ) as lock: if lock.locked_document: if error: diff --git a/tests/db/jobs/test_runner.py b/tests/db/jobs/test_runner.py index aa02d5c9..1fab2d5b 100644 --- a/tests/db/jobs/test_runner.py +++ b/tests/db/jobs/test_runner.py @@ -66,3 +66,23 @@ def raise_rmtree(*args, **kwargs): in j1_info.remote.error ) assert "FAKE ERROR" in j1_info.remote.error + + +def test_delay_download(job_controller, runner, monkeypatch, one_job): + from datetime import datetime + + from jobflow_remote.jobs.state import JobState + + j = one_job.jobs[0] + # since this is a local worker the state after RUNNING is DOWNLOADED, not TERMINATED + with monkeypatch.context() as m: + m.setattr(runner.workers["test_local_worker"], "delay_download", 5) + assert runner.run_one_job( + max_seconds=10, job_id=[j.uuid, j.index], target_state=JobState.DOWNLOADED + ) + j_info = job_controller.get_job_info(job_id=j.uuid, job_index=j.index) + assert j_info.remote.retry_time_limit is not None + assert 
j_info.remote.retry_time_limit > datetime.utcnow() + + # verify that it can properly complete after waiting + assert runner.run_one_job(max_seconds=20, job_id=[j.uuid, j.index]) diff --git a/tests/integration/test_failures.py b/tests/integration/test_failures.py new file mode 100644 index 00000000..263b07ea --- /dev/null +++ b/tests/integration/test_failures.py @@ -0,0 +1,56 @@ +import os + +import pytest + +pytestmark = pytest.mark.skipif( + not os.environ.get("CI"), + reason="Only run integration tests in CI, unless forced with 'CI' env var", +) + +# only test with local and slurm. Other queue managers should not make a difference +WORKERS = ["test_local_worker", "test_remote_slurm_worker"] + +MAX_TRY_SECONDS = 120 + + +@pytest.mark.parametrize( + "worker", + WORKERS, +) +def test_missing_store_files(worker, job_controller, runner): + from pathlib import Path + + from jobflow import Flow + + from jobflow_remote import submit_flow + from jobflow_remote.jobs.state import JobState + from jobflow_remote.testing import add + + j = add(1, 5) + flow = Flow([j]) + submit_flow(flow, worker="test_local_worker") + + # if the worker is local the TERMINATED state is skipped + # get the Job after the RUNNING state + target_state = ( + JobState.DOWNLOADED + if runner.workers[worker].type == "local" + else JobState.TERMINATED + ) + assert runner.run_one_job( + max_seconds=10, job_id=[j.uuid, j.index], target_state=target_state + ) + + # remove the file with the output store + j_info = job_controller.get_job_info(job_id=j.uuid, job_index=j.index) + host = runner.get_host(worker) + host.remove(str(Path(j_info.run_dir) / "remote_job_data.json")) + + # Finish the Job. It should fail during completion, not during download + assert runner.run_one_job(max_seconds=10, job_id=[j.uuid, j.index]) + j_info = job_controller.get_job_info(job_id=j.uuid, job_index=j.index) + assert j_info.state == JobState.FAILED + assert ( + "output store file remote_job_data.json is missing in the downloaded folder" + in j_info.error + ) From 1f16efd846a2278d78f28fbd3189ba0b43a7e061 Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Sun, 5 Jan 2025 14:40:23 +0100 Subject: [PATCH 02/13] attempt to fix trends on weeks --- src/jobflow_remote/jobs/jobcontroller.py | 53 +++++++++++++++++++----- tests/db/jobs/test_jobcontroller.py | 6 +-- 2 files changed, 45 insertions(+), 14 deletions(-) diff --git a/src/jobflow_remote/jobs/jobcontroller.py b/src/jobflow_remote/jobs/jobcontroller.py index b1806236..d853f59b 100644 --- a/src/jobflow_remote/jobs/jobcontroller.py +++ b/src/jobflow_remote/jobs/jobcontroller.py @@ -3037,7 +3037,7 @@ def get_trends( date_format = { "hours": "%Y-%m-%d %H", "days": "%Y-%m-%d", - "weeks": "%Y-%U", # Week number of the year + "weeks": "%Y-%m-%d", # use days and group by week in python "months": "%Y-%m", "years": "%Y", }[interval] @@ -3078,18 +3078,49 @@ def get_trends( results = list(collection.aggregate(pipeline)) + # MongoDB uses an approach in grouping by weeks that may lead to + # more intervals than those specified, based on the number of + # weeks. 
To avoid inconsistencies "weeks" are handled separately: + # jobs are grouped by day in the query and regrouped by week + # using the ISO calendar convention result_dict = {} - for r in results: - result_dict[r["_id"]] = { - state_cls(s.upper()): n for s, n in r.items() if s != "_id" - } + if interval == "weeks": + # first prepare the full dictionary with all the expected values + for i in range(1, num_intervals + 1): + # Calculate the expected date rounded to the interval + expected_date = get_past_time_rounded( + interval=interval, num_intervals=i, reference=tznow + ) + + # Convert expected_date to ISO year-week (as a string) + iso_year, iso_week, _ = expected_date.isocalendar() + iso_id = f"{iso_year}-{iso_week:02d}" + + # Fill result_dict with zero counts + result_dict[iso_id] = {state: 0 for state in states} + + # add the number for each day in the corresponding week + for entry in results: + raw_date = datetime.strptime(entry["_id"], "%Y-%m-%d") + iso_year, iso_week, _ = raw_date.isocalendar() + iso_id = f"{iso_year}-{iso_week:02d}" + + for state, count in entry.items(): + if state != "_id": + result_dict[iso_id][state_cls(state.upper())] += count + + else: + for r in results: + result_dict[r["_id"]] = { + state_cls(s.upper()): n for s, n in r.items() if s != "_id" + } - for i in range(1, num_intervals + 1): - expected_date = get_past_time_rounded( - interval=interval, num_intervals=i, reference=tznow - ).strftime(date_format) - if expected_date not in result_dict: - result_dict[expected_date] = {state: 0 for state in states} + for i in range(1, num_intervals + 1): + expected_date = get_past_time_rounded( + interval=interval, num_intervals=i, reference=tznow + ).strftime(date_format) + if expected_date not in result_dict: + result_dict[expected_date] = {state: 0 for state in states} return result_dict diff --git a/tests/db/jobs/test_jobcontroller.py b/tests/db/jobs/test_jobcontroller.py index d707bea8..93794054 100644 --- a/tests/db/jobs/test_jobcontroller.py +++ b/tests/db/jobs/test_jobcontroller.py @@ -995,9 +995,9 @@ def test_get_trends(job_controller, one_job): list(JobState), interval="weeks", interval_timezone="UTC" ) assert len(job_trends) == 4 - assert utcnow.strftime("%Y-%U") in job_trends - assert job_trends[utcnow.strftime("%Y-%U")][JobState.READY] == 1 - assert job_trends[utcnow.strftime("%Y-%U")][JobState.COMPLETED] == 0 + assert utcnow.strftime("%Y-%V") in job_trends + assert job_trends[utcnow.strftime("%Y-%V")][JobState.READY] == 1 + assert job_trends[utcnow.strftime("%Y-%V")][JobState.COMPLETED] == 0 job_trends = job_controller.get_trends( list(JobState), interval="years", num_intervals=2, interval_timezone=tzname From ea630b4035359aefbf1477ab9c3cea7d9bc2b0b4 Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Sun, 5 Jan 2025 22:19:45 +0100 Subject: [PATCH 03/13] handle flow metadata --- pyproject.toml | 2 +- src/jobflow_remote/cli/flow.py | 3 +++ src/jobflow_remote/jobs/data.py | 1 + src/jobflow_remote/jobs/jobcontroller.py | 13 +++++++++++++ tests/db/cli/test_flow.py | 18 ++++++++++++++++++ 5 files changed, 36 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 93f1e498..c1f190f5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ requires-python = ">=3.9" dependencies = [ "fabric ~= 3.2", "flufl.lock ~= 8.0", - "jobflow >= 0.1.14", + "jobflow >= 0.1.19", "psutil >= 5.9,< 7.0", "pydantic ~= 2.4", "python-dateutil>=2.8.2", diff --git a/src/jobflow_remote/cli/flow.py b/src/jobflow_remote/cli/flow.py index 81490749..d54a325e 
100644 --- a/src/jobflow_remote/cli/flow.py +++ b/src/jobflow_remote/cli/flow.py @@ -31,6 +31,7 @@ job_flow_id_flag_opt, job_ids_opt, max_results_opt, + metadata_opt, name_opt, reverse_sort_flag_opt, sort_opt, @@ -69,6 +70,7 @@ def flows_list( name: name_opt = None, days: days_opt = None, hours: hours_opt = None, + metadata: metadata_opt = None, verbosity: verbosity_opt = 0, max_results: max_results_opt = 100, sort: sort_opt = SortOption.UPDATED_ON, @@ -93,6 +95,7 @@ def flows_list( start_date=start_date, end_date=end_date, name=name, + metadata=metadata, limit=max_results, sort=db_sort, full=verbosity > 0, diff --git a/src/jobflow_remote/jobs/data.py b/src/jobflow_remote/jobs/data.py index 2fb54d4c..a1226edd 100644 --- a/src/jobflow_remote/jobs/data.py +++ b/src/jobflow_remote/jobs/data.py @@ -106,6 +106,7 @@ def get_initial_flow_doc_dict(flow: Flow, job_dicts: list[dict]) -> dict: name=flow.name, ids=ids, parents=parents, + metadata=flow.metadata or {}, ) return flow_doc.as_db_dict() diff --git a/src/jobflow_remote/jobs/jobcontroller.py b/src/jobflow_remote/jobs/jobcontroller.py index d853f59b..69b3af83 100644 --- a/src/jobflow_remote/jobs/jobcontroller.py +++ b/src/jobflow_remote/jobs/jobcontroller.py @@ -308,6 +308,7 @@ def _build_query_flow( start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, + metadata: dict | None = None, locked: bool = False, ) -> dict: """ @@ -333,6 +334,9 @@ def _build_query_flow( name Pattern matching the name of Flow. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) + metadata + A dictionary of the values of the metadata to match. Should be an + exact match for all the values provided. locked If True only locked Flows will be selected. @@ -377,6 +381,10 @@ def _build_query_flow( mongo_regex = "^" + fnmatch.translate(name).replace("\\\\", "\\") query["name"] = {"$regex": mongo_regex} + if metadata: + metadata_dict = {f"metadata.{k}": v for k, v in metadata.items()} + query.update(metadata_dict) + if locked: query["lock_id"] = {"$ne": None} @@ -2239,6 +2247,7 @@ def get_flows_info( start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, + metadata: dict | None = None, locked: bool = False, sort: list[tuple] | None = None, limit: int = 0, @@ -2266,6 +2275,9 @@ def get_flows_info( name Pattern matching the name of Flow. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) + metadata + A dictionary of the values of the metadata to match. Should be an + exact match for all the values provided. locked If True only locked Flows will be selected. 
sort @@ -2291,6 +2303,7 @@ def get_flows_info( start_date=start_date, end_date=end_date, name=name, + metadata=metadata, locked=locked, ) diff --git a/tests/db/cli/test_flow.py b/tests/db/cli/test_flow.py index 43dcabe7..55acd9b5 100644 --- a/tests/db/cli/test_flow.py +++ b/tests/db/cli/test_flow.py @@ -4,6 +4,10 @@ def test_flows_list(job_controller, two_flows_four_jobs) -> None: + from jobflow import Flow + + from jobflow_remote import submit_flow + from jobflow_remote.testing import add from jobflow_remote.testing.cli import run_check_cli columns = ["DB id", "Name", "State", "Flow id", "Num Jobs", "Last updated"] @@ -23,6 +27,20 @@ def test_flows_list(job_controller, two_flows_four_jobs) -> None: ["flow", "list", "-fid", two_flows_four_jobs[0].uuid], required_out=outputs ) + # test metadata query + j = add(1, 2) + flow = Flow([j]) + flow.update_metadata({"test": "x"}) + submit_flow(flow, worker="test_local_worker") + + outputs = [flow.uuid[:5]] + excluded = [two_flows_four_jobs[0].uuid[:5], two_flows_four_jobs[1].uuid[:5]] + run_check_cli( + ["flow", "list", "--metadata", "test=x"], + required_out=outputs, + excluded_out=excluded, + ) + def test_delete(job_controller, two_flows_four_jobs) -> None: from jobflow import Flow From 92231cb1e48b768503ecef9977b736690266ce16 Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Mon, 6 Jan 2025 11:00:46 +0100 Subject: [PATCH 04/13] improve upgrade procedure --- src/jobflow_remote/cli/admin.py | 23 ++++++++++++++++++++++- src/jobflow_remote/jobs/upgrade.py | 14 ++++++++++++-- tests/db/cli/test_admin.py | 23 +++++++++++++++++++++++ 3 files changed, 57 insertions(+), 3 deletions(-) diff --git a/src/jobflow_remote/cli/admin.py b/src/jobflow_remote/cli/admin.py index ba597e73..611fd446 100644 --- a/src/jobflow_remote/cli/admin.py +++ b/src/jobflow_remote/cli/admin.py @@ -78,7 +78,28 @@ def upgrade( jc = get_job_controller() upgrader = DatabaseUpgrader(jc) - target_version = parse_version(test_version_upgrade) or upgrader.current_version + target_version = ( + parse_version(test_version_upgrade) + if test_version_upgrade + else upgrader.current_version + ) + if target_version.local or target_version.post: + # this is likely a developer version installed from source. + # get the available upgrades for version higher than the developer one + base_version = parse_version(target_version.base_version) + upgrades_available = upgrader.collect_upgrades( + base_version, upgrader.registered_upgrades[-1] + ) + msg = ( + f"Target version {target_version} is likely a development version. Explicitly " + f"specify the target version if this is the case." + ) + if upgrades_available: + msg += ( + f" Available upgrades larger than {base_version}: " + f"{','.join(str(v) for v in upgrades_available)}" + ) + out_console.print(msg, style="gold1") db_version = jc.get_current_db_version() if db_version >= target_version: exit_with_warning_msg( diff --git a/src/jobflow_remote/jobs/upgrade.py b/src/jobflow_remote/jobs/upgrade.py index 3434e44c..40cecbd9 100644 --- a/src/jobflow_remote/jobs/upgrade.py +++ b/src/jobflow_remote/jobs/upgrade.py @@ -76,14 +76,24 @@ def wrapper(*args, **kwargs): return decorator + @property + def registered_upgrades(self): + return sorted(self._upgrade_registry.keys()) + def collect_upgrades( self, from_version: Version, target_version: Version ) -> list[Version]: """ Determines the upgrades that need to be performed. + + from_version + The version using as a starting point for the list of upgrades. 
+ target_version + The final version up to which the upgrades should be listed. """ - registered_versions = sorted(self._upgrade_registry.keys()) - return [v for v in registered_versions if from_version < v <= target_version] + return [ + v for v in self.registered_upgrades if from_version < v <= target_version + ] def update_db_version(self, version: Version, session: ClientSession | None = None): """ diff --git a/tests/db/cli/test_admin.py b/tests/db/cli/test_admin.py index f1f03168..2efddae4 100644 --- a/tests/db/cli/test_admin.py +++ b/tests/db/cli/test_admin.py @@ -132,6 +132,29 @@ def test_unlock_runner(job_controller) -> None: ) +def test_upgrade(job_controller, upgrade_test_dir, random_project_name) -> None: + from jobflow_remote.testing.cli import run_check_cli + + # Test upgrading from development version. Explicitly pass such a target version + # This is the case if the target version is not specified and the code installed + # from source. No upgrade performed here. + run_check_cli( + [ + "admin", + "upgrade", + "--test-version-upgrade", + "0.1.4.post95+gc325f4e.d20250103", + ], + cli_input="wrong_project_name", + required_out=[ + "Target version 0.1.4.post95+gc325f4e.d20250103 is likely a development version. " + "Explicitly specify the target version if this is the case. Available " + "upgrades larger than 0.1.4: 0.1.5", + "No upgrade required for target version 0.1.4.post95+gc325f4e.d20250103", + ], + ) + + def test_upgrade_to_0_1_5( job_controller, upgrade_test_dir, random_project_name ) -> None: From d43de99880c00882be05123a76ce8b95749f4a95 Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Mon, 6 Jan 2025 18:47:43 +0100 Subject: [PATCH 05/13] list locked flows --- src/jobflow_remote/cli/admin.py | 2 ++ src/jobflow_remote/cli/flow.py | 3 +++ src/jobflow_remote/cli/types.py | 10 ++++++++++ 3 files changed, 15 insertions(+) diff --git a/src/jobflow_remote/cli/admin.py b/src/jobflow_remote/cli/admin.py index 611fd446..4a300efa 100644 --- a/src/jobflow_remote/cli/admin.py +++ b/src/jobflow_remote/cli/admin.py @@ -212,6 +212,7 @@ def unlock( ) -> None: """ Forcibly removes the lock from the documents of the selected jobs. + If no criteria is specified all the locked jobs will be selected. WARNING: can lead to inconsistencies if the processes is actually running. """ job_ids_indexes = get_job_ids_indexes(job_id) @@ -270,6 +271,7 @@ def unlock_flow( ) -> None: """ Forcibly removes the lock from the documents of the selected jobs. + If no criteria is specified all the locked flows will be selected. WARNING: can lead to inconsistencies if the processes is actually running. 
""" job_ids_indexes = get_job_ids_indexes(job_id) diff --git a/src/jobflow_remote/cli/flow.py b/src/jobflow_remote/cli/flow.py index d54a325e..d2e33d9d 100644 --- a/src/jobflow_remote/cli/flow.py +++ b/src/jobflow_remote/cli/flow.py @@ -30,6 +30,7 @@ hours_opt, job_flow_id_flag_opt, job_ids_opt, + locked_flow_opt, max_results_opt, metadata_opt, name_opt, @@ -71,6 +72,7 @@ def flows_list( days: days_opt = None, hours: hours_opt = None, metadata: metadata_opt = None, + locked: locked_flow_opt = False, verbosity: verbosity_opt = 0, max_results: max_results_opt = 100, sort: sort_opt = SortOption.UPDATED_ON, @@ -96,6 +98,7 @@ def flows_list( end_date=end_date, name=name, metadata=metadata, + locked=locked, limit=max_results, sort=db_sort, full=verbosity > 0, diff --git a/src/jobflow_remote/cli/types.py b/src/jobflow_remote/cli/types.py index 6c14eee4..07dd57b6 100644 --- a/src/jobflow_remote/cli/types.py +++ b/src/jobflow_remote/cli/types.py @@ -275,6 +275,16 @@ ] +locked_flow_opt = Annotated[ + bool, + typer.Option( + "--locked", + "-l", + help="Select locked Flows", + ), +] + + serialize_file_format_opt = Annotated[ SerializeFileFormat, typer.Option( From f4e05ee1858170f8f8bf92d27ab9cec71496df81 Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Thu, 9 Jan 2025 15:13:55 +0100 Subject: [PATCH 06/13] add queue.out and queue.err to the DB --- src/jobflow_remote/jobs/data.py | 4 + src/jobflow_remote/jobs/jobcontroller.py | 81 ++++++++++++- src/jobflow_remote/jobs/runner.py | 36 ++++-- src/jobflow_remote/remote/data.py | 16 +++ tests/integration/conftest.py | 6 +- tests/integration/test_failures.py | 142 ++++++++++++++++++++++- 6 files changed, 271 insertions(+), 14 deletions(-) diff --git a/src/jobflow_remote/jobs/data.py b/src/jobflow_remote/jobs/data.py index a1226edd..a933043b 100644 --- a/src/jobflow_remote/jobs/data.py +++ b/src/jobflow_remote/jobs/data.py @@ -121,6 +121,8 @@ class RemoteInfo(BaseModel): retry_time_limit: Optional[datetime] = None error: Optional[str] = None prerun_cleanup: bool = False + queue_out: Optional[str] = None + queue_err: Optional[str] = None class JobInfo(BaseModel): @@ -509,6 +511,8 @@ def get_reset_job_base_dict() -> dict: "previous_state": None, "remote.queue_state": None, "remote.error": None, + "remote.queue_out": None, + "remote.queue_err": None, "error": None, "updated_on": datetime.utcnow(), "start_time": None, diff --git a/src/jobflow_remote/jobs/jobcontroller.py b/src/jobflow_remote/jobs/jobcontroller.py index 69b3af83..709a5e8c 100644 --- a/src/jobflow_remote/jobs/jobcontroller.py +++ b/src/jobflow_remote/jobs/jobcontroller.py @@ -4,6 +4,7 @@ import fnmatch import importlib.metadata import logging +import shutil import traceback import warnings from collections import defaultdict @@ -51,6 +52,7 @@ JobState, ) from jobflow_remote.remote.data import ( + get_local_data_path, get_remote_store, get_remote_store_filenames, update_store, @@ -1004,7 +1006,7 @@ def _full_rerun( ---------- doc The dict of the JobDoc associated to the Job to rerun. - Just the "uuid", "index", "db_id", "state" values are required. + Just the "uuid", "index", "db_id", "state", "worker" values are required. sleep Amounts of seconds to wait between checks that the lock has been released. 
wait @@ -1146,6 +1148,7 @@ def _full_rerun( updated_states[child_doc["uuid"]][child_doc["index"]] = ( JobState.WAITING ) + self._delete_tmp_folder(child_doc) # if everything is fine here, update the state of the flow # before releasing its lock and set the update for the original job @@ -1156,6 +1159,9 @@ def _full_rerun( flow_uuid=flow_doc.uuid, updated_states=updated_states ) + # delete local temporary folder to avoid parsing + # previously downloaded files. + self._delete_tmp_folder(doc) job_doc_update = get_reset_job_base_dict() job_doc_update["state"] = JobState.READY.value if delete_files: @@ -1548,6 +1554,8 @@ def retry_job( "remote.step_attempts": 0, "remote.retry_time_limit": None, "remote.error": None, + "remote.queue_out": None, + "remote.queue_err": None, } lock.update_on_release = {"$set": set_dict} else: @@ -3527,7 +3535,11 @@ def checkin_job( doc_update: dict | None = None, ): stored_data = None + queue_out = None + queue_err = None if response is None: + # set queue_out and queue_err here in case of failure + queue_out, queue_err = self._get_downloaded_queue_files(job_doc) new_state = JobState.FAILED.value # handle response else: @@ -3580,7 +3592,13 @@ def checkin_job( if not doc_update: doc_update = {} doc_update.update( - {"state": new_state, "stored_data": stored_data, "error": error} + { + "state": new_state, + "stored_data": stored_data, + "error": error, + "remote.queue_out": queue_out, + "remote.queue_err": queue_err, + } ) result = self.jobs.update_one( @@ -3982,6 +4000,9 @@ def lock_job_for_update( next_step_time_limit = datetime.utcnow() + timedelta( seconds=next_step_delay ) + # When succeeded don't set remote.queue_out/remote.queue_err to + # None, otherwise it overwrites values that may be written if the + # completion fails. succeeded_update = { "$set": { "remote.step_attempts": 0, @@ -3995,12 +4016,15 @@ def lock_job_for_update( else: step_attempts = doc["remote"]["step_attempts"] no_retry = no_retry or step_attempts >= max_step_attempts + queue_out, queue_err = self._get_downloaded_queue_files(doc) if no_retry: update_on_release = { "$set": { "state": JobState.REMOTE_ERROR.value, "previous_state": doc["state"], "remote.error": error, + "remote.queue_out": queue_out, + "remote.queue_err": queue_err, } } else: @@ -4013,6 +4037,8 @@ def lock_job_for_update( "remote.step_attempts": step_attempts, "remote.retry_time_limit": retry_time_limit, "remote.error": error, + "remote.queue_out": queue_out, + "remote.queue_err": queue_err, } } if "$set" in update_on_release: @@ -4128,6 +4154,57 @@ def lock_auxiliary(self, **lock_kwargs) -> Generator[MongoLock, None, None]: with MongoLock(collection=self.auxiliary, **lock_kwargs) as lock: yield lock + def _get_downloaded_queue_files( + self, job_doc: dict + ) -> tuple[str | None, str | None]: + local_path_str = get_local_data_path( + project=self.project, + worker=job_doc["worker"], + job_id=job_doc["uuid"], + index=job_doc["index"], + run_dir=job_doc["run_dir"], + ) + if not local_path_str: + return None, None + local_path = Path(local_path_str) + queue_out_path = local_path / "queue.out" + queue_err_path = local_path / "queue.err" + queue_out = None + queue_err = None + length_limit = 3000 + if queue_out_path.exists(): + with queue_out_path.open(mode="rt") as f: + queue_out = f.read() + if len(queue_out) > length_limit: + queue_out = queue_out[:length_limit] + queue_out += " ...\nThe content was cut. 
Check the content of the actual file" + if queue_err_path.exists(): + with queue_err_path.open(mode="rt") as f: + queue_err = f.read() + if len(queue_err) > length_limit: + queue_err = queue_err[:length_limit] + queue_err += " ...\nThe content was cut. Check the content of the actual file" + + return queue_out, queue_err + + def _delete_tmp_folder(self, job_doc: dict): + worker = self.project.workers[job_doc["worker"]] + if not worker.is_local: + local_path = get_local_data_path( + project=self.project, + worker=worker, + job_id=job_doc["uuid"], + index=job_doc["index"], + run_dir=job_doc["run_dir"], + ) + if Path(local_path).exists(): + try: + shutil.rmtree(local_path) + except Exception as e: + logger.warning( + f"Could not delete the temporary local folder {local_path}: {getattr(e, 'message', e)}" + ) + def ping_flow_doc(self, uuid: str) -> None: """ Ping a Flow document to update its "updated_on" value. diff --git a/src/jobflow_remote/jobs/runner.py b/src/jobflow_remote/jobs/runner.py index 418f4065..93fa041b 100644 --- a/src/jobflow_remote/jobs/runner.py +++ b/src/jobflow_remote/jobs/runner.py @@ -35,6 +35,7 @@ from jobflow_remote.jobs.state import JobState from jobflow_remote.remote.data import ( get_job_path, + get_local_data_path, get_remote_in_file, get_remote_store, get_remote_store_filenames, @@ -799,9 +800,12 @@ def download(self, lock) -> None: store = self.jobstore remote_path = doc["run_dir"] - local_base_dir = Path(self.project.tmp_dir, "download") - local_path = get_job_path( - job_dict["uuid"], job_dict["index"], local_base_dir + local_path = get_local_data_path( + project=self.project, + worker=worker, + job_id=doc["uuid"], + index=doc["index"], + run_dir=remote_path, ) makedirs_p(local_path) @@ -810,9 +814,14 @@ def download_file(fname: str, mandatory: bool): # in principle fabric should work by just passing the # destination folder, but it fails remote_file_path = str(Path(remote_path, fname)) + local_file_path = Path(local_path, fname) try: - host.get(remote_file_path, str(Path(local_path, fname))) + host.get(remote_file_path, str(local_file_path)) except FileNotFoundError as exc: + # fabric may still create an empty local file even if the remote + # file does not exist. Remove it to avoid errors when checking + # the file existence + local_file_path.unlink(missing_ok=True) err_msg = f"file {remote_file_path} for job {job_dict['uuid']} does not exist" if mandatory: logger.exception(err_msg) @@ -822,6 +831,11 @@ def download_file(fname: str, mandatory: bool): err_msg += ". Allow continuing to the next state" logger.warning(err_msg) + # download the queue files first, so if an error is triggered + # afterwards they can be inserted in the DB + for fn in ("queue.out", "queue.err"): + download_file(fn, mandatory=False) + # only the output file is mandatory. If the others are missing # it will be dealt with by the complete. The output file may contain # an error and the fact that they are missing could be expected. 
@@ -849,13 +863,15 @@ def complete_job(self, lock) -> None: logger.debug(f"complete job db_id: {doc['db_id']}") # if the worker is local the files were not copied to the temporary - # folder, but the files could be directly updated + # folder, but the files could be directly accessed worker = self.get_worker(doc["worker"]) - if worker.is_local: - local_path = doc["run_dir"] - else: - local_base_dir = Path(self.project.tmp_dir, "download") - local_path = get_job_path(doc["uuid"], doc["index"], local_base_dir) + local_path = get_local_data_path( + project=self.project, + worker=worker, + job_id=doc["uuid"], + index=doc["index"], + run_dir=doc["run_dir"], + ) try: store = self.jobstore diff --git a/src/jobflow_remote/remote/data.py b/src/jobflow_remote/remote/data.py index c0292ce7..76d21d50 100644 --- a/src/jobflow_remote/remote/data.py +++ b/src/jobflow_remote/remote/data.py @@ -25,6 +25,10 @@ if TYPE_CHECKING: from collections.abc import Iterator + from jobflow_remote.config import Project + from jobflow_remote.config.base import WorkerBase + + JOB_INIT_ARGS = {k for k in inspect.signature(Job).parameters if k != "kwargs"} """A set of the arguments of the Job constructor which can be used to detect additional custom arguments @@ -40,6 +44,18 @@ def get_job_path( return str(base_path / relative_path) +def get_local_data_path( + project: Project, worker: str | WorkerBase, job_id: str, index: int, run_dir: str +) -> str: + if isinstance(worker, str): + worker = project.workers[worker] + if worker.is_local: + return run_dir + + local_base_dir = Path(project.tmp_dir, "download") + return get_job_path(job_id, index, local_base_dir) + + def get_remote_in_file(job, remote_store): d = jsanitize( {"job": job, "store": remote_store}, diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index de9edf41..16d0698c 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -376,7 +376,11 @@ def write_tmp_settings( sanitize_command=True, ), }, - exec_config={"test": {"export": {"TESTING_ENV_VAR": random_project_name}}}, + exec_config={ + "test": {"export": {"TESTING_ENV_VAR": random_project_name}}, + "some_pre_run": {"pre_run": "echo 'This is a pre_run' | tee /dev/stderr"}, + "long_pre_run": {"pre_run": f"echo {'X'*4000} | tee /dev/stderr"}, + }, runner=dict( delay_checkout=1, delay_check_run_status=1, diff --git a/tests/integration/test_failures.py b/tests/integration/test_failures.py index 263b07ea..4ed37297 100644 --- a/tests/integration/test_failures.py +++ b/tests/integration/test_failures.py @@ -28,7 +28,7 @@ def test_missing_store_files(worker, job_controller, runner): j = add(1, 5) flow = Flow([j]) - submit_flow(flow, worker="test_local_worker") + submit_flow(flow, worker=worker) # if the worker is local the TERMINATED state is skipped # get the Job after the RUNNING state @@ -54,3 +54,143 @@ def test_missing_store_files(worker, job_controller, runner): "output store file remote_job_data.json is missing in the downloaded folder" in j_info.error ) + + +@pytest.mark.parametrize( + "worker", + WORKERS, +) +def test_queue_files(worker, job_controller, runner, monkeypatch): + from pathlib import Path + from typing import NoReturn + + from jobflow import Flow + + from jobflow_remote import submit_flow + from jobflow_remote.jobs.runner import Runner + from jobflow_remote.jobs.state import JobState + from jobflow_remote.remote.data import get_local_data_path + from jobflow_remote.testing import add, always_fails + + w = job_controller.project.workers[worker] + + 
j = add(1, 5) + flow = Flow([j]) + submit_flow(flow, worker=worker, exec_config="some_pre_run") + + assert runner.run_one_job(max_seconds=20, job_id=[j.uuid, j.index]) + + j_info = job_controller.get_job_info(job_id=j.uuid, job_index=j.index) + local_data_path = Path( + get_local_data_path( + job_controller.project, + worker=w, + job_id=j.uuid, + index=j.index, + run_dir=j_info.run_dir, + ) + ) + + # since it complete correctly there should be no queue_out and queue_err data + assert j_info.remote.queue_out is None + assert j_info.remote.queue_err is None + + # If remote the local path should be removed in case of completed job. + # For a local worker the local path is equivalent to the run dir and should not be removed + if w.is_local: + assert local_data_path.exists() + else: + assert not local_data_path.exists() + + j = always_fails() + flow = Flow([j]) + submit_flow(flow, worker=worker, exec_config="some_pre_run") + + assert runner.run_one_job(max_seconds=20, job_id=[j.uuid, j.index]) + + j_info = job_controller.get_job_info(job_id=j.uuid, job_index=j.index) + local_data_path = Path( + get_local_data_path( + job_controller.project, + worker=w, + job_id=j.uuid, + index=j.index, + run_dir=j_info.run_dir, + ) + ) + + # since it fails these should be present + assert "This is a pre_run" in j_info.remote.queue_out + assert "This is a pre_run" in j_info.remote.queue_err + + if w.is_local: + assert local_data_path.exists() + else: + assert not local_data_path.exists() + + j = always_fails() + flow = Flow([j]) + submit_flow(flow, worker=worker, exec_config="long_pre_run") + + with monkeypatch.context() as m: + # avoid deleting the tmp folder, so it is always present + m.setattr(runner.runner_options, "delete_tmp_folder", False) + assert runner.run_one_job(max_seconds=20, job_id=[j.uuid, j.index]) + + j_info = job_controller.get_job_info(job_id=j.uuid, job_index=j.index) + local_data_path = Path( + get_local_data_path( + job_controller.project, + worker=w, + job_id=j.uuid, + index=j.index, + run_dir=j_info.run_dir, + ) + ) + + # since it fails these should be present. Check that it is cut + assert "XXXXX" in j_info.remote.queue_out + assert "The content was cut." in j_info.remote.queue_out + assert len(j_info.remote.queue_out) < 3500 + assert "XXXXX" in j_info.remote.queue_err + assert "The content was cut." 
in j_info.remote.queue_err + assert len(j_info.remote.queue_err) < 3500 + + assert local_data_path.exists() + + # When rerunning the folder should be deleted, in case it is a remote worker + assert job_controller.rerun_job(db_id=j_info.db_id) + if w.is_local: + assert local_data_path.exists() + else: + assert not local_data_path.exists() + + # check that the queue_out/err was cleaned up + j_info = job_controller.get_job_info(job_id=j.uuid, job_index=j.index) + assert j_info.remote.queue_out is None + assert j_info.remote.queue_err is None + + # Now run one that finishes but end up in REMOTE_ERROR during completion + # (it is easier to do that during completion with respect to + # download) and check that it is queue_out/queue_err are still added + + j = add(1, 5) + flow = Flow([j]) + submit_flow(flow, worker=worker, exec_config="some_pre_run") + + # patch the complete method of the runner to trigger a remote error + def complete_error(self, lock) -> NoReturn: + raise RuntimeError("FAKE ERROR") + + with monkeypatch.context() as m: + m.setattr(Runner, "complete_job", complete_error) + # patch this to 1 to avoid retrying multiple times + m.setattr(runner.runner_options, "max_step_attempts", 1) + with pytest.warns(match="FAKE ERROR"): + assert runner.run_one_job(max_seconds=20, job_id=[j.uuid, j.index]) + + j_info = job_controller.get_job_info(job_id=j.uuid, job_index=j.index) + assert j_info.state == JobState.REMOTE_ERROR + + assert "This is a pre_run" in j_info.remote.queue_out + assert "This is a pre_run" in j_info.remote.queue_err From 7bc0f17fde623d9218706bff2b4a89d5d4f7ada6 Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Thu, 9 Jan 2025 15:30:46 +0100 Subject: [PATCH 07/13] remove mac_address from running runner check --- src/jobflow_remote/jobs/daemon.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/jobflow_remote/jobs/daemon.py b/src/jobflow_remote/jobs/daemon.py index aec84e11..f9162ce8 100644 --- a/src/jobflow_remote/jobs/daemon.py +++ b/src/jobflow_remote/jobs/daemon.py @@ -972,6 +972,11 @@ def _get_runner_info(self) -> dict: user = os.getlogin() except OSError: user = os.environ.get("USER", None) + # Note that this approach may give a different MAC address for the + # same machine, if more than one network device is present (this + # may also include local virtual machines). Consider replacing this + # with a more stable choice (e.g. the first one in alphabetical order, + # excluding virtual interfaces) mac_address = None found = False for addrs in psutil.net_if_addrs().values(): @@ -1063,11 +1068,13 @@ def _check_running_runner( db_data = doc["running_runner"] local_data = self._get_runner_info() + # not testing on the MAC address as it may change due to identifying + # different network devices on the same machine or changing in + # cloud VMs. 
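+        # The remaining fields are compared with the stored running runner
+        # document to check that it corresponds to this machine.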
data_to_check = [ "hostname", "project_name", "user", - "mac_address", "daemon_dir", ] for data in data_to_check: From 5302b6c57a136c0e0df83e26050df39bd1fc6db1 Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Thu, 9 Jan 2025 15:46:00 +0100 Subject: [PATCH 08/13] make target version option for upgrade visible --- src/jobflow_remote/cli/admin.py | 28 +++++++++++++--------------- tests/db/cli/test_admin.py | 12 ++++++------ 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/src/jobflow_remote/cli/admin.py b/src/jobflow_remote/cli/admin.py index 4a300efa..4230dffb 100644 --- a/src/jobflow_remote/cli/admin.py +++ b/src/jobflow_remote/cli/admin.py @@ -44,13 +44,6 @@ @app_admin.command() def upgrade( - test_version_upgrade: Annotated[ - Optional[str], - typer.Option( - help="Explicitly sets the target version for upgrade. For testing purposes only", - hidden=True, - ), - ] = None, no_dry_run: Annotated[ bool, typer.Option( @@ -68,6 +61,15 @@ def upgrade( "proceeding with the upgrade", ), ] = False, + target: Annotated[ + Optional[str], + typer.Option( + "--target", + "-t", + help="Explicitly sets the target version for upgrade. For testing purposes or development" + " versions. Not for standard updates.", + ), + ] = None, ) -> None: """ Upgrade the jobflow database. @@ -78,11 +80,7 @@ def upgrade( jc = get_job_controller() upgrader = DatabaseUpgrader(jc) - target_version = ( - parse_version(test_version_upgrade) - if test_version_upgrade - else upgrader.current_version - ) + target_version = parse_version(target) if target else upgrader.current_version if target_version.local or target_version.post: # this is likely a developer version installed from source. # get the available upgrades for version higher than the developer one @@ -92,7 +90,7 @@ def upgrade( ) msg = ( f"Target version {target_version} is likely a development version. Explicitly " - f"specify the target version if this is the case." + f"specify the target version with the --target option if this is the case." ) if upgrades_available: msg += ( @@ -121,7 +119,7 @@ def upgrade( out_console.print(text) if not no_dry_run: - actions = upgrader.dry_run(target_version=test_version_upgrade) + actions = upgrader.dry_run(target_version=target) if not actions: out_console.print( f"No actions will be required for upgrading to version {target_version}" @@ -142,7 +140,7 @@ def upgrade( with loading_spinner(processing=False) as progress: progress.add_task(description="Upgrading the DB...", total=None) - done = upgrader.upgrade(target_version=test_version_upgrade) + done = upgrader.upgrade(target_version=target) not_text = "" if done else "[bold]NOT [/bold]" out_console.print(f"The database has {not_text}been upgraded") diff --git a/tests/db/cli/test_admin.py b/tests/db/cli/test_admin.py index 2efddae4..a9c67b80 100644 --- a/tests/db/cli/test_admin.py +++ b/tests/db/cli/test_admin.py @@ -142,14 +142,14 @@ def test_upgrade(job_controller, upgrade_test_dir, random_project_name) -> None: [ "admin", "upgrade", - "--test-version-upgrade", + "--target", "0.1.4.post95+gc325f4e.d20250103", ], cli_input="wrong_project_name", required_out=[ "Target version 0.1.4.post95+gc325f4e.d20250103 is likely a development version. " - "Explicitly specify the target version if this is the case. Available " - "upgrades larger than 0.1.4: 0.1.5", + "Explicitly specify the target version with the --target option if this is the case. 
" + "Available upgrades larger than 0.1.4: 0.1.5", "No upgrade required for target version 0.1.4.post95+gc325f4e.d20250103", ], ) @@ -169,7 +169,7 @@ def test_upgrade_to_0_1_5( assert str(job_controller.get_current_db_version()) == "0.1.0" run_check_cli( - ["admin", "upgrade", "--test-version-upgrade", "0.1.5"], + ["admin", "upgrade", "--target", "0.1.5"], cli_input="wrong_project_name", required_out=[ "No information about jobflow version in the database.", @@ -188,7 +188,7 @@ def test_upgrade_to_0_1_5( ) assert versions_info is None run_check_cli( - ["admin", "upgrade", "--test-version-upgrade", "0.1.5"], + ["admin", "upgrade", "--target", "0.1.5"], cli_input=random_project_name, required_out=["The database has been upgraded"], ) @@ -208,7 +208,7 @@ def test_upgrade_to_0_1_5( # test upgrading again to check that it will not perform the upgrade run_check_cli( - ["admin", "upgrade", "--test-version-upgrade", "0.1.5"], + ["admin", "upgrade", "--target", "0.1.5"], required_out=[ "Current DB version: 0.1.5. No upgrade required for target version 0.1.5" ], From edcd2294f45cef7e7400d9b25c81e75bab35ea75 Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Thu, 9 Jan 2025 16:38:52 +0100 Subject: [PATCH 09/13] hide queue_out and queue_err in jf job info --- src/jobflow_remote/cli/formatting.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/jobflow_remote/cli/formatting.py b/src/jobflow_remote/cli/formatting.py index d7743f01..9177d1a9 100644 --- a/src/jobflow_remote/cli/formatting.py +++ b/src/jobflow_remote/cli/formatting.py @@ -210,6 +210,17 @@ def format_job_info( if remote_error: d["remote"]["error"] = ReprStr(remote_error) + if verbosity == 0: + d["remote"].pop("queue_out", None) + d["remote"].pop("queue_err", None) + else: + queue_out = d["remote"].get("queue_out") + if queue_out: + d["remote"]["queue_out"] = ReprStr(queue_out) + queue_err = d["remote"].get("queue_err") + if queue_err: + d["remote"]["queue_err"] = ReprStr(queue_err) + # reorder the keys # Do not check here that all the keys in JobInfo are in JOB_INFO_ORDER. 
Check in the tests sorted_d = {} From 6fa96028d91fb8ebe2ba1666cd89c978a99c7da2 Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Thu, 9 Jan 2025 17:05:11 +0100 Subject: [PATCH 10/13] fix lint --- src/jobflow_remote/jobs/jobcontroller.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/jobflow_remote/jobs/jobcontroller.py b/src/jobflow_remote/jobs/jobcontroller.py index 709a5e8c..0ee1a65b 100644 --- a/src/jobflow_remote/jobs/jobcontroller.py +++ b/src/jobflow_remote/jobs/jobcontroller.py @@ -3137,11 +3137,11 @@ def get_trends( } for i in range(1, num_intervals + 1): - expected_date = get_past_time_rounded( + expected_date_str = get_past_time_rounded( interval=interval, num_intervals=i, reference=tznow ).strftime(date_format) - if expected_date not in result_dict: - result_dict[expected_date] = {state: 0 for state in states} + if expected_date_str not in result_dict: + result_dict[expected_date_str] = {state: 0 for state in states} return result_dict From bc601b309da5e15967651146454e23e1658d7e18 Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Thu, 9 Jan 2025 21:32:17 +0100 Subject: [PATCH 11/13] fix test --- tests/db/cli/test_admin.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/db/cli/test_admin.py b/tests/db/cli/test_admin.py index a9c67b80..ec959285 100644 --- a/tests/db/cli/test_admin.py +++ b/tests/db/cli/test_admin.py @@ -143,14 +143,13 @@ def test_upgrade(job_controller, upgrade_test_dir, random_project_name) -> None: "admin", "upgrade", "--target", - "0.1.4.post95+gc325f4e.d20250103", + "0.1.4.post95+gc325f4e.d20250102", ], - cli_input="wrong_project_name", + cli_input=random_project_name, required_out=[ - "Target version 0.1.4.post95+gc325f4e.d20250103 is likely a development version. " + "Target version 0.1.4.post95+gc325f4e.d20250102 is likely a development version. " "Explicitly specify the target version with the --target option if this is the case. 
" - "Available upgrades larger than 0.1.4: 0.1.5", - "No upgrade required for target version 0.1.4.post95+gc325f4e.d20250103", + "Available upgrades larger than 0.1.4: 0.1.5" ], ) From 5d4ebbd1d7a3fd55eb53e33a64e218202ff4d253 Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Thu, 9 Jan 2025 22:32:39 +0100 Subject: [PATCH 12/13] bump requirements and remove "include" option from coverage --- pyproject.toml | 1 - requirements/requirements.txt | 10 +++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index c1f190f5..84b6b293 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -161,7 +161,6 @@ addopts = [ ] [tool.coverage.run] -include = ["src/*"] parallel = true branch = true diff --git a/requirements/requirements.txt b/requirements/requirements.txt index bb0f5fe8..cfefa3fe 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -1,11 +1,11 @@ -jobflow==0.1.18 -pydantic==2.9.2 +jobflow==0.1.19 +pydantic==2.10.5 fabric==3.2.2 tomlkit==0.13.2 qtoolkit==0.1.5 -typer==0.12.5 -rich==13.9.3 -psutil==6.1.0 +typer==0.15.1 +rich==13.9.4 +psutil==6.1.1 supervisor==4.2.5 ruamel.yaml==0.17.35 schedule==1.2.2 From 8aef1f88d0db94c076937dae397a68358dcc6f0a Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Thu, 9 Jan 2025 23:12:53 +0100 Subject: [PATCH 13/13] try different redirect to stderr --- tests/integration/conftest.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 16d0698c..0e977f8d 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -378,8 +378,10 @@ def write_tmp_settings( }, exec_config={ "test": {"export": {"TESTING_ENV_VAR": random_project_name}}, - "some_pre_run": {"pre_run": "echo 'This is a pre_run' | tee /dev/stderr"}, - "long_pre_run": {"pre_run": f"echo {'X'*4000} | tee /dev/stderr"}, + "some_pre_run": { + "pre_run": "echo 'This is a pre_run'; echo 'This is a pre_run' 1>&2" + }, + "long_pre_run": {"pre_run": f"echo {'X'*4000}; echo {'X'*4000} 1>&2"}, }, runner=dict( delay_checkout=1,