From 2fd944dc0d5f685056fd7df45e07e87da97861e3 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 8 Jun 2023 22:15:19 +1200 Subject: [PATCH] Graduated force-triggering (prereqs, xtriggers, queues). --- cylc/flow/task_pool.py | 89 ++++++++++++++++++++++++++++++----------- cylc/flow/task_proxy.py | 5 +-- cylc/flow/task_state.py | 5 +++ 3 files changed, 72 insertions(+), 27 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index e50df1591a0..82514fcf266 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -91,6 +91,9 @@ class TaskPool: ERR_TMPL_NO_TASKID_MATCH = "No matching tasks found: {0}" ERR_PREFIX_TASK_NOT_ON_SEQUENCE = "Invalid cycle point for task: {0}, {1}" SUICIDE_MSG = "suicide" + FORCE_TRIGGER_MSG = ( + "Trigger me again to run immediately, if I have other constraints" + ) def __init__( self, @@ -682,6 +685,12 @@ def rh_release_and_queue(self, itask) -> None: """ if itask.state_reset(is_runahead=False): self.data_store_mgr.delta_task_runahead(itask) + self.spawn_to_rh_limit( + itask.tdef, + itask.tdef.next_point(itask.point), + itask.flow_nums + ) + if all(itask.is_ready_to_run()): # (otherwise waiting on xtriggers etc.) self.queue_task(itask) @@ -1642,14 +1651,24 @@ def force_trigger_tasks( flow_wait: bool = False, flow_descr: Optional[str] = None ) -> int: - """Manual task triggering. + """Forced (manual) task triggering. + + Force-triggering a future (n>0) task: + - spawns it into n=0 with prerequisites satisfied + (and with the given or default flow numbers) + + Force-triggering an n=0 task: + - satisfies its prerequisites, if any are unsatisfied; + - else satisfies its xtriggers, if any are unsatisfied; + - else queues it, if not already queued; + - else releases it to run, if queued. - Don't get a new flow number for existing n=0 tasks (e.g. incomplete - tasks). These can carry on in the original flow if retriggered. + This always releases tasks from runahead limiting. - Queue the task if not queued, otherwise release it to run. + TODO - extend to push ext-triggers. """ + # Process provided flow options. if set(flow).intersection({FLOW_ALL, FLOW_NEW, FLOW_NONE}): if len(flow) != 1: LOG.warning( @@ -1672,16 +1691,15 @@ def force_trigger_tasks( ) return 0 - # n_warnings, task_items = self.match_taskdefs(items) + # Attempt to match target tasks. itasks, future_tasks, unmatched = self.filter_task_proxies( items, future=True, warn=False, ) - # Spawn future tasks. for name, point in future_tasks: - # (Flow values already validated by the trigger client). + # Trigger future_tasks with prerequisites satisfied. itask = self.spawn_task( name, point, @@ -1692,32 +1710,57 @@ def force_trigger_tasks( ) if itask is None: continue - itasks.append(itask) + itask.is_manual_submit = True + LOG.info( + f"[{itask}] - forcing prerequisites" + f"\n{self.__class__.FORCE_TRIGGER_MSG}" + ) + itask.state.set_prerequisites_all_satisfied() + self.add_to_pool(itask) + self.rh_release_and_queue(itask) - # Trigger matched tasks if not already active. for itask in itasks: + # Trigger existing n=0 tasks. if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE): LOG.warning(f"[{itask}] ignoring trigger - already active") continue + itask.is_manual_submit = True - itask.reset_try_timers() - # (If None, spawner reports cycle bounds errors). - if itask.state_reset(TASK_STATUS_WAITING): - # (could also be unhandled failed) - self.data_store_mgr.delta_task_state(itask) - # (No need to set prerequisites satisfied here). self.add_to_pool(itask) # move from hidden if necessary. + if itask.state.is_runahead: - # Release from runahead, and queue it. self.rh_release_and_queue(itask) - self.spawn_to_rh_limit( - itask.tdef, - itask.tdef.next_point(itask.point), - itask.flow_nums + + if itask.state_reset(TASK_STATUS_WAITING): + # We're retriggering an incomplete task. + self.data_store_mgr.delta_task_state(itask) + itask.reset_try_timers() + + if itask.is_task_prereqs_not_done(): + LOG.info( + f"[{itask}] - forcing prerequisites" + f"\n{self.__class__.FORCE_TRIGGER_MSG}" ) - else: - # De-queue it to run now. - self.task_queue_mgr.force_release_task(itask) + itask.state.set_prerequisites_all_satisfied() + continue + + if ( + itask.state.xtriggers and + not itask.state.xtriggers_all_satisfied() + ): + LOG.info( + f"[{itask}] - forcing xtriggers" + f"\n{self.__class__.FORCE_TRIGGER_MSG}" + ) + # TODO - does xtrigger manager need to know? + itask.state.xtriggers_set_all_satisfied() + continue + + LOG.info( + f"[{itask}] - forcing queue release" + f"\n{self.__class__.FORCE_TRIGGER_MSG}" + ) + self.task_queue_mgr.force_release_task(itask) return len(unmatched) diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 271049126fd..09269d3bc12 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -390,12 +390,9 @@ def is_ready_to_run(self) -> Tuple[bool, ...]: """Is this task ready to run? Takes account of all dependence: on other tasks, xtriggers, and - old-style ext-triggers. Or, manual triggering. + old-style ext-triggers. """ - if self.is_manual_submit: - # Manually triggered, ignore unsatisfied prerequisites. - return (True,) if self.state.is_held: # A held task is not ready to run. return (False,) diff --git a/cylc/flow/task_state.py b/cylc/flow/task_state.py index 9dea7e9af28..b6f4ef62464 100644 --- a/cylc/flow/task_state.py +++ b/cylc/flow/task_state.py @@ -320,6 +320,11 @@ def xtriggers_all_satisfied(self): """Return True if all xtriggers are satisfied.""" return all(self.xtriggers.values()) + def xtriggers_set_all_satisfied(self): + """Set all xtriggers to satisfied.""" + for xtrig in self.xtriggers.keys(): + self.xtriggers[xtrig] = True + def external_triggers_all_satisfied(self): """Return True if all external triggers are satisfied.""" return all(self.external_triggers.values())