From eccd7fe6448bb8a810d23826ae56ec776e394d0b Mon Sep 17 00:00:00 2001 From: Tim Pillinger Date: Sat, 19 Oct 2024 12:57:33 +0100 Subject: [PATCH] Type checking improved. --- cylc/flow/commands.py | 4 +-- cylc/flow/data_store_mgr.py | 2 +- cylc/flow/prerequisite.py | 7 ++-- cylc/flow/run_modes/__init__.py | 11 +++--- cylc/flow/run_modes/simulation.py | 3 +- cylc/flow/run_modes/skip.py | 5 ++- cylc/flow/scheduler.py | 15 ++++---- cylc/flow/task_events_mgr.py | 8 ++--- cylc/flow/task_job_mgr.py | 13 ++++--- cylc/flow/task_pool.py | 7 ++-- cylc/flow/task_proxy.py | 5 ++- cylc/flow/task_state.py | 3 +- cylc/flow/workflow_db_mgr.py | 12 ++++--- tests/integration/conftest.py | 3 +- .../run_modes/test_mode_overrides.py | 16 +++++---- tests/integration/run_modes/test_nonlive.py | 4 +-- .../integration/run_modes/test_simulation.py | 36 ++++++++++--------- tests/integration/utils/flow_tools.py | 1 + 18 files changed, 92 insertions(+), 63 deletions(-) diff --git a/cylc/flow/commands.py b/cylc/flow/commands.py index 30fa96344d2..cd131aa2c1b 100644 --- a/cylc/flow/commands.py +++ b/cylc/flow/commands.py @@ -249,7 +249,7 @@ async def poll_tasks(schd: 'Scheduler', tasks: Iterable[str]): """Poll pollable tasks or a task or family if options are provided.""" validate.is_tasks(tasks) yield - if schd.get_run_mode() == RunMode.SIMULATION.value: + if schd.get_run_mode() == RunMode.SIMULATION: yield 0 itasks, _, bad_items = schd.pool.filter_task_proxies(tasks) schd.task_job_mgr.poll_task_jobs(schd.workflow, itasks) @@ -262,7 +262,7 @@ async def kill_tasks(schd: 'Scheduler', tasks: Iterable[str]): validate.is_tasks(tasks) yield itasks, _, bad_items = schd.pool.filter_task_proxies(tasks) - if schd.get_run_mode() == RunMode.SIMULATION.value: + if schd.get_run_mode() == RunMode.SIMULATION: for itask in itasks: if itask.state(*TASK_STATUSES_ACTIVE): itask.state_reset(TASK_STATUS_FAILED) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index ca6579d5fc6..f17f8158773 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -701,7 +701,7 @@ def generate_definition_elements(self): time_zone_info = TIME_ZONE_LOCAL_INFO for key, val in time_zone_info.items(): setbuff(workflow.time_zone_info, key, val) - workflow.run_mode = RunMode.get(config.options) + workflow.run_mode = RunMode.get(config.options).value workflow.cycling_mode = config.cfg['scheduling']['cycling mode'] workflow.workflow_log_dir = self.schd.workflow_log_dir workflow.job_log_names.extend(list(JOB_LOG_OPTS.values())) diff --git a/cylc/flow/prerequisite.py b/cylc/flow/prerequisite.py index ba9300bd75d..c58f54652ab 100644 --- a/cylc/flow/prerequisite.py +++ b/cylc/flow/prerequisite.py @@ -40,6 +40,7 @@ from cylc.flow.data_messages_pb2 import PbCondition, PbPrerequisite from cylc.flow.exceptions import TriggerExpressionError from cylc.flow.id import quick_relative_detokenise +from cylc.flow.run_modes import RunMode if TYPE_CHECKING: @@ -263,7 +264,7 @@ def _eval_satisfied(self) -> bool: def satisfy_me( self, outputs: Iterable['Tokens'], - mode: Literal['skip', 'live', 'simulation', 'skip'] = 'live' + mode: "RunMode" = RunMode.LIVE ) -> 'Set[Tokens]': """Attempt to satisfy me with given outputs. @@ -273,9 +274,9 @@ def satisfy_me( """ satisfied_message: SatisfiedState - if mode != 'live': + if mode != RunMode.LIVE: satisfied_message = self.DEP_STATE_SATISFIED_BY.format( - mode) # type: ignore + mode.value) # type: ignore else: satisfied_message = self.DEP_STATE_SATISFIED valid = set() diff --git a/cylc/flow/run_modes/__init__.py b/cylc/flow/run_modes/__init__.py index 08761780b1b..b2c7001bdb9 100644 --- a/cylc/flow/run_modes/__init__.py +++ b/cylc/flow/run_modes/__init__.py @@ -80,9 +80,12 @@ def describe(self): raise KeyError(f'No description for {self}.') @staticmethod - def get(options: 'Values') -> str: + def get(options: 'Values') -> "RunMode": """Return the workflow run mode from the options.""" - return getattr(options, 'run_mode', None) or RunMode.LIVE.value + run_mode = getattr(options, 'run_mode', None) + if run_mode: + return RunMode(run_mode) + return RunMode.LIVE def get_submit_method(self) -> 'Optional[SubmissionInterface]': """Return the job submission method for this run mode. @@ -113,9 +116,9 @@ def disable_task_event_handlers(itask: 'TaskProxy'): """ mode = itask.run_mode return ( - mode == RunMode.SIMULATION.value + mode == RunMode.SIMULATION or ( - mode == RunMode.SKIP.value + mode == RunMode.SKIP and itask.platform.get( 'disable task event handlers', False) ) diff --git a/cylc/flow/run_modes/simulation.py b/cylc/flow/run_modes/simulation.py index 79b713f6647..047a4526ed7 100644 --- a/cylc/flow/run_modes/simulation.py +++ b/cylc/flow/run_modes/simulation.py @@ -321,12 +321,13 @@ def sim_time_check( """ now = time() sim_task_state_changed: bool = False + for itask in itasks: if ( itask.state.status != TASK_STATUS_RUNNING or ( itask.run_mode - and itask.run_mode != RunMode.SIMULATION.value + and itask.run_mode != RunMode.SIMULATION ) ): continue diff --git a/cylc/flow/run_modes/skip.py b/cylc/flow/run_modes/skip.py index 9bcb36a8ec8..7e3f4310ee5 100644 --- a/cylc/flow/run_modes/skip.py +++ b/cylc/flow/run_modes/skip.py @@ -64,7 +64,10 @@ def submit_task_job( 'execution retry delays': [] } itask.summary['job_runner_name'] = RunMode.SKIP.value - itask.run_mode = RunMode.SKIP.value + itask.jobs.append( + task_job_mgr.get_simulation_job_conf(itask, _workflow) + ) + itask.run_mode = RunMode.SKIP task_job_mgr.workflow_db_mgr.put_insert_task_jobs( itask, { 'time_submit': now[1], diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 081a5276637..8ba5f48289a 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -432,8 +432,10 @@ async def configure(self, params): og_run_mode = self.get_run_mode() if run_mode != og_run_mode: raise InputError( - f'This workflow was originally run in {og_run_mode} mode:' - f' Will not restart in {run_mode} mode.') + "This workflow was originally run in " + f"{run_mode.value} mode:" + f" Will not restart in {run_mode.value} mode." + ) self.profiler.log_memory("scheduler.py: before load_flow_file") try: @@ -1195,7 +1197,7 @@ def run_event_handlers(self, event, reason=""): Run workflow events only in live mode or skip mode. """ - if self.get_run_mode() in WORKFLOW_ONLY_MODES: + if self.get_run_mode().value in WORKFLOW_ONLY_MODES: return self.workflow_event_handler.handle(self, event, str(reason)) @@ -1320,7 +1322,7 @@ def timeout_check(self): """Check workflow and task timers.""" self.check_workflow_timers() # check submission and execution timeout and polling timers - if self.get_run_mode() != RunMode.SIMULATION.value: + if self.get_run_mode() != RunMode.SIMULATION: self.task_job_mgr.check_task_jobs(self.workflow, self.pool) async def workflow_shutdown(self): @@ -1518,8 +1520,9 @@ async def _main_loop(self) -> None: self.xtrigger_mgr.housekeep(self.pool.get_tasks()) self.pool.clock_expire_tasks() self.release_queued_tasks() + if ( - self.options.run_mode == RunMode.SIMULATION.value + self.get_run_mode() == RunMode.SIMULATION and sim_time_check( self.task_events_mgr, self.pool.get_tasks(), @@ -1979,7 +1982,7 @@ def _check_startup_opts(self) -> None: f"option --{opt}=reload is only valid for restart" ) - def get_run_mode(self) -> str: + def get_run_mode(self) -> RunMode: return RunMode.get(self.options) async def handle_exception(self, exc: BaseException) -> NoReturn: diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 9e37f2d1eaa..f0c65bb9ae0 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -773,7 +773,7 @@ def process_message( # ... but either way update the job ID in the job proxy (it only # comes in via the submission message). - if itask.run_mode != RunMode.SIMULATION.value: + if itask.run_mode != RunMode.SIMULATION: job_tokens = itask.tokens.duplicate( job=str(itask.submit_num) ) @@ -896,7 +896,7 @@ def _process_message_check( if ( itask.state(TASK_STATUS_WAITING) # Polling in live mode only: - and itask.run_mode == RunMode.LIVE.value + and itask.run_mode == RunMode.LIVE and ( ( # task has a submit-retry lined up @@ -1470,7 +1470,7 @@ def _process_message_submitted( ) itask.set_summary_time('submitted', event_time) - if itask.run_mode == RunMode.SIMULATION.value: + if itask.run_mode == RunMode.SIMULATION: # Simulate job started as well. itask.set_summary_time('started', event_time) if itask.state_reset(TASK_STATUS_RUNNING, forced=forced): @@ -1507,7 +1507,7 @@ def _process_message_submitted( 'submitted', event_time, ) - if itask.run_mode == RunMode.SIMULATION.value: + if itask.run_mode == RunMode.SIMULATION: # Simulate job started as well. self.data_store_mgr.delta_job_time( job_tokens, diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 0338c278e35..3d8db951890 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -248,7 +248,7 @@ def submit_task_jobs( itasks, curve_auth, client_pub_key_dir, - run_mode: Union[str, RunMode] = RunMode.LIVE, + run_mode: RunMode = RunMode.LIVE, ): """Prepare for job submission and submit task jobs. @@ -1028,7 +1028,7 @@ def submit_nonlive_task_jobs( self: 'TaskJobManager', workflow: str, itasks: 'List[TaskProxy]', - workflow_run_mode: Union[str, RunMode], + workflow_run_mode: RunMode, ) -> 'Tuple[List[TaskProxy], List[TaskProxy]]': """Identify task mode and carry out alternative submission paths if required: @@ -1058,14 +1058,19 @@ def submit_nonlive_task_jobs( # Get task config with broadcasts applied: rtconfig = self.task_events_mgr.broadcast_mgr.get_updated_rtconfig( itask) + # Apply task run mode - if workflow_run_mode in WORKFLOW_ONLY_MODES: + if workflow_run_mode.value in WORKFLOW_ONLY_MODES: # Task run mode cannot override workflow run-mode sim or dummy: run_mode = workflow_run_mode else: # If workflow mode is skip or live and task mode is set, # override workflow mode, else use workflow mode. - run_mode = rtconfig.get('run mode', None) or workflow_run_mode + run_mode = rtconfig.get('run mode', None) + if run_mode: + run_mode = RunMode(run_mode) + else: + run_mode = workflow_run_mode # Store the run mode of the this submission: itask.run_mode = run_mode diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index d1e1cbf94ba..0ef87e4a799 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1422,10 +1422,9 @@ def spawn_on_output(self, itask: TaskProxy, output: str) -> None: tasks = [c_task] for t in tasks: - t.satisfy_me( [itask.tokens.duplicate(task_sel=output)], - mode=itask.run_mode + mode=itask.run_mode # type: ignore ) self.data_store_mgr.delta_task_prerequisite(t) if not in_pool: @@ -1554,7 +1553,7 @@ def spawn_on_all_outputs( if completed_only: c_task.satisfy_me( [itask.tokens.duplicate(task_sel=message)], - mode=itask.run_mode + mode=itask.run_mode # type: ignore ) self.data_store_mgr.delta_task_prerequisite(c_task) self.add_to_pool(c_task) @@ -1979,7 +1978,7 @@ def _set_outputs_itask( rtconfig = bc_mgr.get_updated_rtconfig(itask) outputs.remove(RunMode.SKIP.value) skips = get_skip_mode_outputs(itask, rtconfig) - itask.run_mode = RunMode.SKIP.value + itask.run_mode = RunMode.SKIP outputs = self._standardise_outputs( itask.point, itask.tdef, outputs) outputs = list(set(outputs + skips)) diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 8ce3669271b..270ac35307d 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -31,7 +31,6 @@ Optional, Set, Tuple, - Union, ) from metomi.isodatetime.timezone import get_local_time_zone @@ -300,7 +299,7 @@ def __init__( self.graph_children = generate_graph_children(tdef, self.point) self.mode_settings: Optional['ModeSettings'] = None - self.run_mode: Optional[Union[str, RunMode]] = None + self.run_mode: Optional[RunMode] = None if self.tdef.expiration_offset is not None: self.expire_time = ( @@ -551,7 +550,7 @@ def state_reset( return False def satisfy_me( - self, task_messages: 'Iterable[Tokens]', mode=RunMode.LIVE.value + self, task_messages: 'Iterable[Tokens]', mode: "RunMode" = RunMode.LIVE ) -> 'Set[Tokens]': """Try to satisfy my prerequisites with given output messages. diff --git a/cylc/flow/task_state.py b/cylc/flow/task_state.py index 8447a7bed6d..bea07204cc4 100644 --- a/cylc/flow/task_state.py +++ b/cylc/flow/task_state.py @@ -41,6 +41,7 @@ from cylc.flow.cycling import PointBase from cylc.flow.id import Tokens from cylc.flow.prerequisite import PrereqMessage + from cylc.flow.run_modes import RunMode from cylc.flow.taskdef import TaskDef @@ -324,7 +325,7 @@ def __call__( def satisfy_me( self, outputs: Iterable['Tokens'], - mode, + mode: "RunMode", ) -> Set['Tokens']: """Try to satisfy my prerequisites with given outputs. diff --git a/cylc/flow/workflow_db_mgr.py b/cylc/flow/workflow_db_mgr.py index 5f1f673440c..b06367e37b3 100644 --- a/cylc/flow/workflow_db_mgr.py +++ b/cylc/flow/workflow_db_mgr.py @@ -344,11 +344,15 @@ def put_workflow_params(self, schd: 'Scheduler') -> None: value = getattr(schd.options, key, None) value = None if value == 'reload' else value self.put_workflow_params_1(key, value) - for key in ( + + self.put_workflow_params_1( + self.KEY_CYCLE_POINT_TIME_ZONE, + getattr(schd.options, self.KEY_CYCLE_POINT_TIME_ZONE, None), + ) + self.put_workflow_params_1( self.KEY_RUN_MODE, - self.KEY_CYCLE_POINT_TIME_ZONE - ): - self.put_workflow_params_1(key, getattr(schd.options, key, None)) + schd.get_run_mode().value, + ) def put_workflow_params_1( self, key: str, value: Union[AnyStr, float, None] diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 2f0aa5afab4..1bd06697cd0 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -28,6 +28,7 @@ from cylc.flow.option_parsers import Options from cylc.flow.pathutil import get_cylc_run_dir from cylc.flow.rundb import CylcWorkflowDAO +from cylc.flow.run_modes import RunMode from cylc.flow.scripts.validate import ValidateOptions from cylc.flow.scripts.install import ( install as cylc_install, @@ -686,7 +687,7 @@ def capture_live_submissions(capcall, monkeypatch): would have been submitted had this fixture not been used. """ def fake_submit(self, _workflow, itasks, *_): - self.submit_nonlive_task_jobs(_workflow, itasks, 'simulation') + self.submit_nonlive_task_jobs(_workflow, itasks, RunMode.SIMULATION) for itask in itasks: for status in (TASK_STATUS_SUBMITTED, TASK_STATUS_SUCCEEDED): self.task_events_mgr.process_message( diff --git a/tests/integration/run_modes/test_mode_overrides.py b/tests/integration/run_modes/test_mode_overrides.py index 7c75c7c69f1..ec426946244 100644 --- a/tests/integration/run_modes/test_mode_overrides.py +++ b/tests/integration/run_modes/test_mode_overrides.py @@ -31,17 +31,18 @@ import pytest from cylc.flow.cycling.iso8601 import ISO8601Point -from cylc.flow.run_modes import WORKFLOW_RUN_MODES +from cylc.flow.run_modes import WORKFLOW_RUN_MODES, RunMode -@pytest.mark.parametrize('workflow_run_mode', sorted(WORKFLOW_RUN_MODES)) +@pytest.mark.parametrize('workflow_run_mode', WORKFLOW_RUN_MODES) async def test_run_mode_override_from_config( capture_live_submissions, flow, scheduler, run, complete, - workflow_run_mode + workflow_run_mode, + validate ): """Test that `[runtime][]run mode` overrides workflow modes.""" id_ = flow({ @@ -51,11 +52,14 @@ async def test_run_mode_override_from_config( }, }, 'runtime': { + 'root': {'simulation': {'default run length': 'PT0S'}}, 'live': {'run mode': 'live'}, 'skip': {'run mode': 'skip'}, } }) - schd = scheduler(id_, run_mode=workflow_run_mode, paused_start=False) + run_mode = RunMode(workflow_run_mode) + validate(id_) + schd = scheduler(id_, run_mode=run_mode, paused_start=False) async with run(schd): await complete(schd) @@ -96,7 +100,7 @@ async def test_force_trigger_does_not_override_run_mode( schd.server.curve_auth, schd.server.client_pub_key_dir) - assert foo.run_mode == 'skip' + assert foo.run_mode.value == 'skip' async def test_run_mode_skip_abides_by_held( @@ -161,5 +165,5 @@ async def test_run_mode_override_from_broadcast( [foo_1000, foo_1001], schd.server.curve_auth, schd.server.client_pub_key_dir) - assert foo_1000.run_mode == 'skip' + assert foo_1000.run_mode.value == 'skip' assert capture_live_submissions() == {'1001/foo'} diff --git a/tests/integration/run_modes/test_nonlive.py b/tests/integration/run_modes/test_nonlive.py index eca02cf5018..90cefbf7701 100644 --- a/tests/integration/run_modes/test_nonlive.py +++ b/tests/integration/run_modes/test_nonlive.py @@ -110,8 +110,8 @@ async def test_db_task_jobs( submit_and_check_db(schd) - assert itask_live.run_mode == 'simulation' - assert itask_skip.run_mode == 'skip' + assert itask_live.run_mode.value == 'simulation' + assert itask_skip.run_mode.value == 'skip' async def test_db_task_states( diff --git a/tests/integration/run_modes/test_simulation.py b/tests/integration/run_modes/test_simulation.py index c78f6e139dc..100a30c46b1 100644 --- a/tests/integration/run_modes/test_simulation.py +++ b/tests/integration/run_modes/test_simulation.py @@ -22,6 +22,7 @@ from cylc.flow import commands from cylc.flow.cycling.iso8601 import ISO8601Point +from cylc.flow.run_modes import RunMode from cylc.flow.run_modes.simulation import sim_time_check @@ -63,7 +64,7 @@ def _run_simjob(schd, point, task): itask.state.is_queued = False monkeytime(0) schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], 'simulation') + schd.workflow, [itask], RunMode.SIMULATION) monkeytime(itask.mode_settings.timeout + 1) # Run Time Check @@ -171,7 +172,7 @@ def test_fail_once(sim_time_check_setup, itask, point, results, monkeypatch): for i, result in enumerate(results): itask.try_timers['execution-retry'].num = i schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], 'simulation') + schd.workflow, [itask], RunMode.SIMULATION) assert itask.mode_settings.sim_task_fails is result @@ -191,7 +192,7 @@ def test_task_finishes(sim_time_check_setup, monkeytime, caplog): fail_all_1066.state.status = 'running' fail_all_1066.state.is_queued = False schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [fail_all_1066], 'simulation') + schd.workflow, [fail_all_1066], RunMode.SIMULATION) # For the purpose of the test delete the started time set by # submit_nonlive_task_jobs. @@ -221,7 +222,7 @@ def test_task_sped_up(sim_time_check_setup, monkeytime): # Run the job submission method: monkeytime(0) schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [fast_forward_1066], 'simulation') + schd.workflow, [fast_forward_1066], RunMode.SIMULATION) fast_forward_1066.state.is_queued = False result = sim_time_check(schd.task_events_mgr, [fast_forward_1066], '') @@ -235,7 +236,7 @@ def test_task_sped_up(sim_time_check_setup, monkeytime): async def test_settings_restart( - monkeytime, flow, scheduler, start + monkeytime, flow, scheduler, start,validate ): """Check that simulation mode settings are correctly restored upon restart. @@ -269,13 +270,12 @@ async def test_settings_restart( } }) schd = scheduler(id_) - # Start the workflow: async with start(schd): og_timeouts = {} for itask in schd.pool.get_tasks(): schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], 'simulation') + schd.workflow, [itask], RunMode.SIMULATION) og_timeouts[itask.identity] = itask.mode_settings.timeout @@ -285,9 +285,9 @@ async def test_settings_restart( schd.task_events_mgr, [itask], schd.workflow_db_mgr ) is False - # Stop and restart the scheduler: + # Stop and restart the scheduler: schd = scheduler(id_) - async with start(schd): + async with start(schd) as log: for itask in schd.pool.get_tasks(): # Check that we haven't got mode settings back: assert itask.mode_settings is None @@ -312,6 +312,7 @@ async def test_settings_restart( ) is False # Check that the itask.mode_settings is now re-created + assert itask.mode_settings.simulated_run_length == 60.0 assert itask.mode_settings.sim_task_fails is True @@ -393,7 +394,7 @@ async def test_settings_broadcast( # Submit the first - the sim task will fail: schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], 'simulation') + schd.workflow, [itask], RunMode.SIMULATION) assert itask.mode_settings.sim_task_fails is True # Let task finish. @@ -412,13 +413,13 @@ async def test_settings_broadcast( }]) # Submit again - result is different: schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], 'simulation') + schd.workflow, [itask], RunMode.SIMULATION) assert itask.mode_settings.sim_task_fails is False # Assert Clearing the broadcast works schd.broadcast_mgr.clear_broadcast() schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], 'simulation') + schd.workflow, [itask], RunMode.SIMULATION) assert itask.mode_settings.sim_task_fails is True # Assert that list of broadcasts doesn't change if we submit @@ -429,7 +430,7 @@ async def test_settings_broadcast( 'simulation': {'fail cycle points': 'higadfuhasgiurguj'} }]) schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], 'simulation') + schd.workflow, [itask], RunMode.SIMULATION) assert ( 'Invalid ISO 8601 date representation: higadfuhasgiurguj' in log.messages[-1]) @@ -443,7 +444,7 @@ async def test_settings_broadcast( 'simulation': {'fail cycle points': '1'} }]) schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], 'simulation') + schd.workflow, [itask], RunMode.SIMULATION) assert ( 'Invalid ISO 8601 date representation: 1' in log.messages[-1]) @@ -455,7 +456,7 @@ async def test_settings_broadcast( 'execution retry delays': '3*PT2S' }]) schd.task_job_mgr.submit_nonlive_task_jobs( - schd.workflow, [itask], 'simulation') + schd.workflow, [itask], RunMode.SIMULATION) assert itask.mode_settings.sim_task_fails is True assert itask.try_timers['execution-retry'].delays == [2.0, 2.0, 2.0] # n.b. rtconfig should remain unchanged, lest we cancel broadcasts: @@ -466,9 +467,12 @@ async def test_db_submit_num( flow, one_conf, scheduler, run, complete, db_select ): """Test simulation mode correctly increments the submit_num in the DB.""" + one_conf['runtime'] = { + 'one': {'simulation': {'default run length': 'PT0S'}} + } schd = scheduler(flow(one_conf), paused_start=False) async with run(schd): - await complete(schd, '1/one') + await complete(schd, '1/one', timeout=10) assert db_select(schd, False, 'task_states', 'submit_num', 'status') == [ (1, 'succeeded'), ] diff --git a/tests/integration/utils/flow_tools.py b/tests/integration/utils/flow_tools.py index fef15e3e3dc..b771597798f 100644 --- a/tests/integration/utils/flow_tools.py +++ b/tests/integration/utils/flow_tools.py @@ -31,6 +31,7 @@ from uuid import uuid1 from cylc.flow import CYLC_LOG +from cylc.flow.run_modes import RunMode from cylc.flow.workflow_files import WorkflowFiles from cylc.flow.scheduler import Scheduler, SchedulerStop from cylc.flow.scheduler_cli import RunOptions