From 209e621b2d2603eb02218c9336aac8e5b04c45a5 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Fri, 15 Sep 2023 13:24:21 +1200 Subject: [PATCH] parentless sequential xtrigger spawning --- cylc/flow/cfgspec/workflow.py | 20 ++++- cylc/flow/config.py | 16 +++- cylc/flow/scheduler.py | 5 ++ cylc/flow/task_pool.py | 81 ++++++++++++++++--- cylc/flow/task_proxy.py | 2 + cylc/flow/unicode_rules.py | 1 + cylc/flow/xtrigger_mgr.py | 18 ++++- .../cylc-config/00-simple/section1.stdout | 2 + 8 files changed, 127 insertions(+), 18 deletions(-) diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index a6d16bbe76d..0885da6759b 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -797,8 +797,26 @@ def get_script_common_text(this: str, example: Optional[str] = None): Example:: - ``my_trigger(arg1, arg2, kwarg1, kwarg2):PT10S`` + .. code-block:: cylc + + [[xtriggers]] + my_trig = my_trigger(arg1, arg2, kwarg1, kwarg2):PT10S + [[[settings]]] + non-sequential xtriggers = my_trig + ''') + with Conf('settings', desc=''' + Section heading for xtrigger behavior settings. + ''') as Queue: + Conf('non-sequential xtriggers', VDR.V_STRING_LIST, desc=''' + A list of xtrigger labels whose xtrigger, of associated + parentless task, be checked out to the runahead limit. + + This allows for non-sequential xtrigger checking. + + A task with multiple xtriggers requires all labels + be specified to behave in this way. + ''') with Conf('graph', desc=f''' The workflow graph is defined under this section. 
diff --git a/cylc/flow/config.py b/cylc/flow/config.py index 79d305da57a..bdc3d590830 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -1758,7 +1758,13 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, self.taskdefs[right].add_dependency(dependency, seq) validator = XtriggerNameValidator.validate - for label in self.cfg['scheduling']['xtriggers']: + xtrigs = self.cfg['scheduling']['xtriggers'] + for label in xtrigs: + if ( + label == 'settings' + and not isinstance(xtrigs[label], SubFuncContext) + ): + continue valid, msg = validator(label) if not valid: raise WorkflowConfigError( @@ -1767,7 +1773,7 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, for label in xtrig_labels: try: - xtrig = self.cfg['scheduling']['xtriggers'][label] + xtrig = xtrigs[label] except KeyError: if label != 'wall_clock': raise WorkflowConfigError(f"xtrigger not defined: {label}") @@ -1796,6 +1802,12 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, self.xtrigger_mgr.add_trig(label, xtrig, self.fdir) self.taskdefs[right].add_xtrig_label(label, seq) + if self.xtrigger_mgr is not None: + with suppress(KeyError): + self.xtrigger_mgr.non_sequential_labels.update( + set(xtrigs['settings']['non-sequential xtriggers']) + ) + def get_actual_first_point(self, start_point): """Get actual first cycle point for the workflow diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 7449e7d274a..461dd7b603b 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -467,6 +467,7 @@ async def configure(self, params): self.config, self.workflow_db_mgr, self.task_events_mgr, + self.xtrigger_mgr, self.data_store_mgr, self.flow_mgr ) @@ -1733,6 +1734,10 @@ async def main_loop(self) -> None: ): self.pool.queue_task(itask) + if self.xtrigger_mgr.sequential_spawn_next: + # Spawn parentless tasks with sequentially checked xtrigger(s). 
+ self.pool.spawn_parentless_sequential_xtriggers() + if housekeep_xtriggers: # (Could do this periodically?) self.xtrigger_mgr.housekeep(self.pool.get_tasks()) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 92905becf1e..f27ee87202e 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -79,6 +79,7 @@ from cylc.flow.data_store_mgr import DataStoreMgr from cylc.flow.taskdef import TaskDef from cylc.flow.task_events_mgr import TaskEventsManager + from cylc.flow.xtrigger_mgr import XtriggerManager from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager from cylc.flow.flow_mgr import FlowMgr, FlowNums @@ -98,6 +99,7 @@ def __init__( config: 'WorkflowConfig', workflow_db_mgr: 'WorkflowDatabaseManager', task_events_mgr: 'TaskEventsManager', + xtrigger_mgr: 'XtriggerManager', data_store_mgr: 'DataStoreMgr', flow_mgr: 'FlowMgr' ) -> None: @@ -108,6 +110,7 @@ def __init__( self.task_events_mgr: 'TaskEventsManager' = task_events_mgr # TODO this is ugly: self.task_events_mgr.spawn_func = self.spawn_on_output + self.xtrigger_mgr: 'XtriggerManager' = xtrigger_mgr self.data_store_mgr: 'DataStoreMgr' = data_store_mgr self.flow_mgr: 'FlowMgr' = flow_mgr @@ -281,11 +284,12 @@ def release_runahead_tasks(self): for itask in release_me: self.rh_release_and_queue(itask) - self.spawn_to_rh_limit( - itask.tdef, - itask.tdef.next_point(itask.point), - itask.flow_nums - ) + if not itask.is_xtrigger_sequential: + self.spawn_to_rh_limit( + itask.tdef, + itask.tdef.next_point(itask.point), + itask.flow_nums + ) released = True return released @@ -599,6 +603,15 @@ def load_db_task_pool_for_restart(self, row_idx, row): ) ) + # Set xtrigger checking type, which affects parentless spawning. 
+ if ( + itask.tdef.is_parentless(itask.point) + and set(itask.state.xtriggers.keys()).difference( + self.xtrigger_mgr.non_sequential_labels + ) + ): + itask.is_xtrigger_sequential = True + if itask.state_reset(status, is_runahead=True): self.data_store_mgr.delta_task_runahead(itask) self.add_to_pool(itask) @@ -721,45 +734,71 @@ def rh_release_and_queue(self, itask) -> None: def _get_spawned_or_merged_task( self, point: 'PointBase', name: str, flow_nums: 'FlowNums' - ) -> Optional[TaskProxy]: + ) -> 'Tuple[Optional[TaskProxy], bool]': """Return new or existing task point/name with merged flow_nums""" taskid = Tokens(cycle=str(point), task=name).relative_id ntask = ( self._get_hidden_task_by_id(taskid) or self._get_main_task_by_id(taskid) ) + is_in_pool = False if ntask is None: # ntask does not exist: spawn it in the flow. ntask = self.spawn_task(name, point, flow_nums) + # If the task was spawned, set its xtrigger checking type. + if ( + ntask is not None + and set(ntask.state.xtriggers.keys()).difference( + self.xtrigger_mgr.non_sequential_labels + ) + ): + ntask.is_xtrigger_sequential = True else: # ntask already exists (n=0 or incomplete): merge flows. + is_in_pool = True self.merge_flows(ntask, flow_nums) - return ntask # may be None + return ntask, is_in_pool # ntask may be None def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None: - """Spawn parentless task instances from point to runahead limit.""" + """Spawn parentless task instances from point to runahead limit. + + Sequentially checked xtriggers will spawn corresponding task out to the + next task with any xtrigger with the same behaviour, or to the runahead + limit (whichever occurs first). + + """ if not flow_nums or point is None: # Force-triggered no-flow task. # Or called with an invalid next_point. 
return if self.runahead_limit_point is None: self.compute_runahead() + + is_sequential = False while point is not None and (point <= self.runahead_limit_point): if tdef.is_parentless(point): - ntask = self._get_spawned_or_merged_task( + ntask, is_in_pool = self._get_spawned_or_merged_task( point, tdef.name, flow_nums ) if ntask is not None: - self.add_to_pool(ntask) + if not is_in_pool: + self.add_to_pool(ntask) self.rh_release_and_queue(ntask) + if ntask.is_xtrigger_sequential: + is_sequential = True + break point = tdef.next_point(point) # Once more (for the rh-limited task: don't rh release it!) - if point is not None and tdef.is_parentless(point): - ntask = self._get_spawned_or_merged_task( + if ( + point is not None + and tdef.is_parentless(point) + and not is_sequential + ): + ntask, is_in_pool = self._get_spawned_or_merged_task( point, tdef.name, flow_nums ) - if ntask is not None: + if ntask is not None and not is_in_pool: self.add_to_pool(ntask) def remove(self, itask, reason=""): @@ -1753,6 +1792,22 @@ def force_trigger_tasks( return len(unmatched) + def spawn_parentless_sequential_xtriggers(self): + """Spawn successor(s) of parentless sequential xtrigger tasks.""" + while self.xtrigger_mgr.sequential_spawn_next: + taskid = self.xtrigger_mgr.sequential_spawn_next.pop() + itask = ( + self._get_hidden_task_by_id(taskid) + or self._get_main_task_by_id(taskid) + ) + # Will spawn out to RH limit or next parentless sequential xtrigger + # or non-parentless. 
+ self.spawn_to_rh_limit( + itask.tdef, + itask.tdef.next_point(itask.point), + itask.flow_nums + ) + def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool: """Simulation mode: simulate task run times and set states.""" if not self.config.run_mode('simulation'): diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 271049126fd..31c4fc5a8f1 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -180,6 +180,7 @@ class TaskProxy: 'tokens', 'try_timers', 'waiting_on_job_prep', + 'is_xtrigger_sequential', ] def __init__( @@ -214,6 +215,7 @@ def __init__( task=self.tdef.name, ) self.identity = self.tokens.relative_id + self.is_xtrigger_sequential = False self.reload_successor: Optional['TaskProxy'] = None self.point_as_seconds: Optional[int] = None diff --git a/cylc/flow/unicode_rules.py b/cylc/flow/unicode_rules.py index 4c71e190ce3..751b8ea820e 100644 --- a/cylc/flow/unicode_rules.py +++ b/cylc/flow/unicode_rules.py @@ -323,6 +323,7 @@ class XtriggerNameValidator(UnicodeRuleChecker): RULES = [ allowed_characters(r'a-zA-Z0-9', '_'), not_starts_with('_cylc'), + not_equals('settings'), ] diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 4512d97b7c8..4c1e1f8f6a0 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -211,6 +211,14 @@ def __init__( # Signatures of active functions (waiting on callback). self.active: list = [] + # Clock labels, to avoid repeated string comparisons + self.wall_clock_labels: set = set() + # Labels whose xtrigger will be checked out to the RH limit. + self.non_sequential_labels: set = set() + # Gather parentless tasks whose xtrigger(s) have been satisfied + # (these will be used to spawn the next occurrence). + self.sequential_spawn_next: set = set() + self.workflow_run_dir = workflow_run_dir # For function arg templating. 
@@ -316,6 +324,8 @@ def add_trig(self, label: str, fctx: SubFuncContext, fdir: str) -> None: """ self.validate_xtrigger(label, fctx, fdir) self.functx_map[label] = fctx + if fctx.func_name == "wall_clock": + self.wall_clock_labels.add(label) def mutate_trig(self, label, kwargs): self.functx_map[label].func_kwargs.update(kwargs) @@ -380,7 +390,7 @@ def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext: args = [] kwargs = {} - if ctx.func_name == "wall_clock": + if label in self.wall_clock_labels: if "trigger_time" in ctx.func_kwargs: # Internal (retry timer): trigger_time already set. kwargs["trigger_time"] = ctx.func_kwargs["trigger_time"] @@ -420,10 +430,12 @@ def call_xtriggers_async(self, itask: TaskProxy): itask: task proxy to check. """ for label, sig, ctx, _ in self._get_xtrigs(itask, unsat_only=True): - if sig.startswith("wall_clock"): + if label in self.wall_clock_labels: # Special case: quick synchronous clock check. if wall_clock(*ctx.func_args, **ctx.func_kwargs): itask.state.xtriggers[label] = True + if itask.is_xtrigger_sequential: + self.sequential_spawn_next.add(itask.identity) self.sat_xtrig[sig] = {} self.data_store_mgr.delta_task_xtrigger(sig, True) LOG.info('xtrigger satisfied: %s = %s', label, sig) @@ -443,6 +455,8 @@ def call_xtriggers_async(self, itask: TaskProxy): [itask.tdef.name], xtrigger_env ) + if itask.is_xtrigger_sequential: + self.sequential_spawn_next.add(itask.identity) continue if sig in self.active: # Already waiting on this result. diff --git a/tests/functional/cylc-config/00-simple/section1.stdout b/tests/functional/cylc-config/00-simple/section1.stdout index 972b7a55606..e6e21d3d84b 100644 --- a/tests/functional/cylc-config/00-simple/section1.stdout +++ b/tests/functional/cylc-config/00-simple/section1.stdout @@ -16,5 +16,7 @@ runahead limit = P4 clock-expire = sequential = [[xtriggers]] + [[[settings]]] + non-sequential xtriggers = [[graph]] R1 = OPS:finish-all => VAR