From 3e0da37c2a7bb340cb99118256a5091df2523fd9 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Fri, 19 Jan 2024 19:06:55 +1300 Subject: [PATCH] Task expire optionality. --- cylc/flow/config.py | 27 +++++++-- cylc/flow/graph_parser.py | 4 +- cylc/flow/task_job_mgr.py | 2 + cylc/flow/task_outputs.py | 29 ++++++--- cylc/flow/task_pool.py | 59 +++++++++---------- cylc/flow/task_proxy.py | 9 ++- cylc/flow/taskdef.py | 4 -- .../scripts/test_completion_server.py | 1 - tests/integration/test_config.py | 7 +++ tests/integration/test_task_job_mgr.py | 1 - .../tui/screenshots/test_show.success.html | 4 +- 11 files changed, 94 insertions(+), 53 deletions(-) diff --git a/cylc/flow/config.py b/cylc/flow/config.py index f871531adeb..2241f306f17 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -89,7 +89,7 @@ ) from cylc.flow.task_id import TaskID from cylc.flow.task_outputs import ( - TASK_OUTPUT_SUCCEEDED, + TASK_OUTPUT_EXPIRED, TaskOutputs ) from cylc.flow.task_trigger import TaskTrigger, Dependency @@ -1657,7 +1657,7 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, offset_is_irregular, offset_is_absolute) = ( GraphNodeParser.get_inst().parse(left)) - # Qualifier. + # Qualifier. Note ":succeeded" made explicit by the graph parser. outputs = self.cfg['runtime'][name]['outputs'] if outputs and (output in outputs): # Qualifier is a custom task message. @@ -1668,9 +1668,6 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, f"Undefined custom output: {name}:{output}" ) qualifier = output - else: - # No qualifier specified => use "succeeded". - qualifier = TASK_OUTPUT_SUCCEEDED # Generate TaskTrigger if not already done. key = (name, offset, qualifier, @@ -2136,7 +2133,11 @@ def set_required_outputs( Args: task_output_opt: {(task, output): (is-optional, default, is_set)} + """ + # task_output_opt: outputs parsed from graph triggers + # taskdef.outputs: outputs listed under runtime + for name, taskdef in self.taskdefs.items(): for output in taskdef.outputs: try: @@ -2146,6 +2147,22 @@ def set_required_outputs( continue taskdef.set_required_output(output, not optional) + # Clock-expire must be flagged as optional in the graph. + graph_exp = set() + for (task, output) in task_output_opt.keys(): + if output == TASK_OUTPUT_EXPIRED: + graph_exp.add(task) + bad_exp = set() + for task in self.expiration_offsets: + if task not in graph_exp: + bad_exp.add(task) + if bad_exp: + msg = '\n '.join( + [t + f":{TASK_OUTPUT_EXPIRED}?" for t in bad_exp]) + raise WorkflowConfigError( + f"Clock-expire must be flagged as optional: {msg}" + ) + def find_taskdefs(self, name: str) -> Set[TaskDef]: """Find TaskDef objects in family "name" or matching "name". diff --git a/cylc/flow/graph_parser.py b/cylc/flow/graph_parser.py index 3dbab9261b7..20d05b891ba 100644 --- a/cylc/flow/graph_parser.py +++ b/cylc/flow/graph_parser.py @@ -61,6 +61,7 @@ class Replacement: """A class to remember match group information in re.sub() calls""" + def __init__(self, replacement): self.replacement = replacement self.substitutions = [] @@ -747,7 +748,8 @@ def _set_output_opt( if output == TASK_OUTPUT_EXPIRED and not optional: raise GraphParseError( - f"Expired-output {name}:{output} must be optional") + f"{output} must be an optional output:" + f" {name}:{output}?") if output == TASK_OUTPUT_FINISHED: # Interpret :finish pseudo-output diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 6651488b4b9..7fe3e8b079d 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -235,6 +235,8 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True): # state transition message reflects the correct submit_num itask.submit_num += 1 itask.state_reset(TASK_STATUS_PREPARING) + # Avoid clock-expiring an active task: + itask.expire_time = None self.data_store_mgr.delta_task_state(itask) prep_task = self._prep_submit_task_job( workflow, itask, check_syntax=check_syntax) diff --git a/cylc/flow/task_outputs.py b/cylc/flow/task_outputs.py index 644b7b0dd3c..e98353ad7b5 100644 --- a/cylc/flow/task_outputs.py +++ b/cylc/flow/task_outputs.py @@ -32,7 +32,8 @@ TASK_OUTPUT_SUBMIT_FAILED, TASK_OUTPUT_STARTED, TASK_OUTPUT_SUCCEEDED, - TASK_OUTPUT_FAILED) + TASK_OUTPUT_FAILED +) TASK_OUTPUTS = ( TASK_OUTPUT_EXPIRED, @@ -65,12 +66,13 @@ class TaskOutputs: """ # Memory optimization - constrain possible attributes to this list. - __slots__ = ["_by_message", "_by_trigger", "_required"] + __slots__ = ["_by_message", "_by_trigger", "_required", "_expire_ok"] def __init__(self, tdef): self._by_message = {} self._by_trigger = {} self._required = {} # trigger: message + self._expire_ok = True # Add outputs from task def. for trigger, (message, required) in tdef.outputs.items(): @@ -90,9 +92,20 @@ def __init__(self, tdef): ) def _add(self, message, trigger, is_completed=False, required=False): - """Add a new output message""" + """Add a new output message + + required: + - True: is required + - False: is optional + - None: not set + + """ self._by_message[message] = [trigger, message, is_completed] self._by_trigger[trigger] = self._by_message[message] + + if (trigger == TASK_OUTPUT_EXPIRED and required is None): + self._expire_ok = False + if required: self._required[trigger] = message @@ -100,11 +113,14 @@ def set_completed_by_msg(self, message): """For flow trigger --wait: set completed outputs from the DB.""" for trig, msg, _ in self._by_trigger.values(): if message == msg: - self._add(message, trig, True, trig in self._required) + self._add( + message, trig, True, + required=(trig in self._required) + ) break def all_completed(self): - """Return True if all all outputs completed.""" + """Return True if all outputs completed.""" return all(val[_IS_COMPLETED] for val in self._by_message.values()) def exists(self, message=None, trigger=None): @@ -225,7 +241,6 @@ def get_incomplete(self): """Return a list of required outputs that are not complete. A task is incomplete if: - * it finished executing without completing all required outputs * or if job submission failed and the :submit output was not optional @@ -288,7 +303,7 @@ def get_incomplete_implied(self, output: str) -> List[str]: @staticmethod def is_valid_std_name(name): - """Check name is a valid standard output name.""" + """Check name is a valid standard output (including 'expired').""" return name in SORT_ORDERS @staticmethod diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index a1528ec963b..ce47fcc8908 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -114,13 +114,13 @@ def __init__( flow_mgr: 'FlowMgr' ) -> None: self.tokens = tokens - self.config: 'WorkflowConfig' = config + self.config = config self.stop_point = config.stop_point or config.final_point - self.workflow_db_mgr: 'WorkflowDatabaseManager' = workflow_db_mgr - self.task_events_mgr: 'TaskEventsManager' = task_events_mgr + self.workflow_db_mgr = workflow_db_mgr + self.task_events_mgr = task_events_mgr self.task_events_mgr.spawn_func = self.spawn_on_output - self.data_store_mgr: 'DataStoreMgr' = data_store_mgr - self.flow_mgr: 'FlowMgr' = flow_mgr + self.data_store_mgr = data_store_mgr + self.flow_mgr = flow_mgr self.max_future_offset: Optional['IntervalBase'] = None self._prev_runahead_base_point: Optional['PointBase'] = None @@ -1379,25 +1379,19 @@ def remove_if_complete( self.release_runahead_tasks() return ret - if itask.state(TASK_STATUS_EXPIRED): - self.remove(itask, "expired") - if self.compute_runahead(): - self.release_runahead_tasks() - return True - - incomplete = itask.state.outputs.get_incomplete() - if incomplete: - # Keep incomplete tasks in the pool. - if output in TASK_STATUSES_FINAL: - # Log based on the output, not the state, to avoid warnings - # due to use of "cylc set" to set internal outputs on an - # already-finished task. - LOG.warning( - f"[{itask}] did not complete required outputs:" - f" {incomplete}" - ) - return False + if not itask.is_complete(): + # (Includes non-optional expired) + incomplete = itask.state.outputs.get_incomplete() + if incomplete: + # Keep incomplete tasks in the pool. + if output in TASK_STATUSES_FINAL: + LOG.warning( + f"[{itask}] did not complete required outputs:" + f" {incomplete}" + ) + return False + # Remove as complete self.remove(itask) if self.compute_runahead(): self.release_runahead_tasks() @@ -1761,7 +1755,6 @@ def set_prereqs_and_outputs( - future tasks must be specified individually - family names are not expanded to members - Uses a transient task proxy to spawn children. (Even if parent was previously spawned in this flow its children might not have been). @@ -1831,9 +1824,15 @@ def _set_outputs_itask( outputs = sorted(outputs, key=itask.state.outputs.output_sort_key) for output in outputs: - if itask.state.outputs.is_completed(output): - LOG.info(f"output {itask.identity}:{output} completed already") - continue + if output == TASK_OUTPUT_EXPIRED: + if itask.state(*TASK_STATUSES_ACTIVE): + LOG.warning(f"Can't expire active {itask}") + continue + elif itask.state.outputs.is_completed(output): + LOG.info(f"output {itask.identity}:{output}" + " completed already") + continue + self.task_events_mgr.process_message( itask, logging.INFO, output, forced=True) @@ -1919,9 +1918,9 @@ def remove_tasks(self, items): return len(bad_items) def _get_flow_nums( - self, - flow: List[str], - meta: Optional[str] = None, + self, + flow: List[str], + meta: Optional[str] = None, ) -> Optional[Set[int]]: """Get correct flow numbers given user command options.""" if set(flow).intersection({FLOW_ALL, FLOW_NEW, FLOW_NONE}): diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 6afb6a2087e..2eb78f9b852 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -571,10 +571,13 @@ def is_finished(self) -> bool: ) def is_complete(self) -> bool: - """Return True if complete or expired.""" + """Return True if task is complete.""" return ( - self.state(TASK_STATUS_EXPIRED) - or not self.state.outputs.is_incomplete() + not self.state.outputs.is_incomplete() + or ( + self.state(TASK_OUTPUT_EXPIRED) + and self.state.outputs._expire_ok + ) ) def set_state_by_outputs(self) -> None: diff --git a/cylc/flow/taskdef.py b/cylc/flow/taskdef.py index 1da5101306b..d21b84f6a8c 100644 --- a/cylc/flow/taskdef.py +++ b/cylc/flow/taskdef.py @@ -23,7 +23,6 @@ from cylc.flow.exceptions import TaskDefError from cylc.flow.task_id import TaskID from cylc.flow.task_outputs import ( - TASK_OUTPUT_EXPIRED, TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_SUBMIT_FAILED, TASK_OUTPUT_SUCCEEDED, @@ -216,9 +215,6 @@ def tweak_outputs(self): ): self.set_required_output(TASK_OUTPUT_SUCCEEDED, True) - # Expired must be optional - self.set_required_output(TASK_OUTPUT_EXPIRED, False) - # In Cylc 7 back compat mode, make all success outputs required. if cylc.flow.flags.cylc7_back_compat: for output in [ diff --git a/tests/integration/scripts/test_completion_server.py b/tests/integration/scripts/test_completion_server.py index 0c792fac3da..d86016bb4d9 100644 --- a/tests/integration/scripts/test_completion_server.py +++ b/tests/integration/scripts/test_completion_server.py @@ -88,7 +88,6 @@ async def test_list_prereqs_and_outputs(flow, scheduler, start): # list outputs (b1) assert await _complete_cylc('cylc', 'set', b1.id, '--out', '') == { # regular task outputs - 'expired', 'failed', 'started', 'submit-failed', diff --git a/tests/integration/test_config.py b/tests/integration/test_config.py index 581ec4d83fb..c70ee9ac89f 100644 --- a/tests/integration/test_config.py +++ b/tests/integration/test_config.py @@ -270,6 +270,13 @@ def test_parse_special_tasks_families(flow, scheduler, validate, section): with pytest.raises(WorkflowConfigError) as exc_ctx: config = validate(id_) assert 'external triggers must be used only once' in str(exc_ctx.value) + + elif section == 'clock-expire': + with pytest.raises(WorkflowConfigError) as exc_ctx: + config = validate(id_) + assert ( + 'Clock-expire must be visible in the graph' in str(exc_ctx.value) + ) else: config = validate(id_) assert set(config.cfg['scheduling']['special tasks'][section]) == { diff --git a/tests/integration/test_task_job_mgr.py b/tests/integration/test_task_job_mgr.py index b085162a1da..e1338a7ae77 100644 --- a/tests/integration/test_task_job_mgr.py +++ b/tests/integration/test_task_job_mgr.py @@ -99,7 +99,6 @@ async def test_run_job_cmd_no_hosts_error( # killing the task should not result in an error... schd.task_job_mgr.kill_task_jobs( - schd.workflow, schd.pool.get_tasks() ) diff --git a/tests/integration/tui/screenshots/test_show.success.html b/tests/integration/tui/screenshots/test_show.success.html index 5f9c192b04b..c26a207795e 100644 --- a/tests/integration/tui/screenshots/test_show.success.html +++ b/tests/integration/tui/screenshots/test_show.success.html @@ -18,6 +18,7 @@ prerequisites: (None) outputs: ('-': not completed) - 1/foo expired + outputs: ('-': not completed) - 1/foo submitted - 1/foo submit-failed - 1/foo started @@ -36,6 +37,7 @@ + quit: q help: h context: enter tree: - ← + → navigation: ↑ ↓ ↥ ↧ Home End filter tasks: T f s r R filter workflows: W E p - \ No newline at end of file +