diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index 0885da6759b..1974b320760 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -785,9 +785,21 @@ def get_script_common_text(this: str, example: Optional[str] = None): :ref:`SequentialTasks`. ''') + Conf('sequential xtriggers default', VDR.V_BOOLEAN, False, desc=''' + Set to ``True``, this allows for sequential spawning of associated + parentless tasks on xtrigger satisfaction. + Instead of out to the runahead limit (default: ``False``). + + This workflow wide default can be overridden by a reserved + keyword argument in the xtrigger function declaration + (``sequential=True/False``). + + The presence of one sequential xtrigger on a parentless task with + multiple xtriggers will cause sequential behavior. + ''') with Conf('xtriggers', desc=''' - This section is for *External Trigger* function declarations - - see :ref:`Section External Triggers`. + This section is for *External Trigger* function declarations - + see :ref:`Section External Triggers`. '''): Conf('', VDR.V_XTRIGGER, desc=''' Any user-defined event trigger function declarations and @@ -797,26 +809,8 @@ def get_script_common_text(this: str, example: Optional[str] = None): Example:: - .. code-block:: cylc - - [[xtriggers]] - my_trig = my_trigger(arg1, arg2, kwarg1, kwarg2):PT10S - [[[settings]]] - non-sequential xtriggers = my_trig - + ``my_trigger(arg1, arg2, kwarg1, kwarg2):PT10S`` ''') - with Conf('settings', desc=''' - Section heading for xtrigger behavior settings. - ''') as Queue: - Conf('non-sequential xtriggers', VDR.V_STRING_LIST, desc=''' - A list of xtrigger labels whose xtrigger, of associated - parentless task, be checked out to the runahead limit. - - This allows for non-sequential xtrigger checking. - - A task with multiple xtriggers requires all labels - be specified to behave in this way. - ''') with Conf('graph', desc=f''' The workflow graph is defined under this section. diff --git a/cylc/flow/config.py b/cylc/flow/config.py index bdc3d590830..6b7d961deb5 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -1760,17 +1760,16 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, validator = XtriggerNameValidator.validate xtrigs = self.cfg['scheduling']['xtriggers'] for label in xtrigs: - if ( - label == 'settings' - and not isinstance(xtrigs[label], SubFuncContext) - ): - continue valid, msg = validator(label) if not valid: raise WorkflowConfigError( f'Invalid xtrigger name "{label}" - {msg}' ) + if self.xtrigger_mgr is not None: + self.xtrigger_mgr.sequential_xtriggers_default = ( + self.cfg['scheduling']['sequential xtriggers default'] + ) for label in xtrig_labels: try: xtrig = xtrigs[label] @@ -1802,12 +1801,6 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, self.xtrigger_mgr.add_trig(label, xtrig, self.fdir) self.taskdefs[right].add_xtrig_label(label, seq) - if self.xtrigger_mgr is not None: - with suppress(KeyError): - self.xtrigger_mgr.non_sequential_labels.update( - set(xtrigs['settings']['non-sequential xtriggers']) - ) - def get_actual_first_point(self, start_point): """Get actual first cycle point for the workflow diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index f27ee87202e..5f95109014f 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -606,8 +606,8 @@ def load_db_task_pool_for_restart(self, row_idx, row): # Set xtrigger checking type, which effects parentless spawning. if ( itask.tdef.is_parentless(itask.point) - and set(itask.state.xtriggers.keys()).difference( - self.xtrigger_mgr.non_sequential_labels + and set(itask.state.xtriggers.keys()).intersection( + self.xtrigger_mgr.sequential_xtrigger_labels ) ): itask.is_xtrigger_sequential = True @@ -748,8 +748,8 @@ def _get_spawned_or_merged_task( # if the task was found set xtrigger checking type. if ( ntask is not None - and set(ntask.state.xtriggers.keys()).difference( - self.xtrigger_mgr.non_sequential_labels + and set(ntask.state.xtriggers.keys()).intersection( + self.xtrigger_mgr.sequential_xtrigger_labels ) ): ntask.is_xtrigger_sequential = True diff --git a/cylc/flow/unicode_rules.py b/cylc/flow/unicode_rules.py index 751b8ea820e..4c71e190ce3 100644 --- a/cylc/flow/unicode_rules.py +++ b/cylc/flow/unicode_rules.py @@ -323,7 +323,6 @@ class XtriggerNameValidator(UnicodeRuleChecker): RULES = [ allowed_characters(r'a-zA-Z0-9', '_'), not_starts_with('_cylc'), - not_equals('settings'), ] diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 4c1e1f8f6a0..120f15a8185 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -16,6 +16,7 @@ from contextlib import suppress from enum import Enum +from inspect import getfullargspec import json import re from copy import deepcopy @@ -182,6 +183,32 @@ class XtriggerManager: managed uniquely - i.e. many tasks depending on the same clock trigger (with same offset from cycle point) get satisfied by the same call. + Parentless tasks with xtrigger(s) are, by default, spawned out to the + runahead limit. This results in non-sequential, and potentially + unnecessary, checking out to this limit (and may introduce clutter to + user interfaces). An option to make this sequential is now available, + by changing the default for all xtriggers in a workflow, and a way to + override this default with a (reserved) keyword function argument + (i.e. "sequential=True/False"): + + # Example: + [scheduling] + sequential xtrigger default = True + [[xtriggers]] + clock_0 = wall_clock() # offset PT0H + clock_1 = wall_clock(offset=PT1H) + # or wall_clock(PT1H) + workflow_x = workflow_state( + workflow=other, + point=%(task_cycle_point)s, + sequential=False + ):PT30S + [[graph]] + PT1H = ''' + @clock_1 & @workflow_x => foo & bar + @wall_clock = baz # pre-defined zero-offset clock + ''' + Args: workflow: workflow name user: workflow owner @@ -213,8 +240,11 @@ def __init__( # Clock labels, to avoid repeated string comparisons self.wall_clock_labels: set = set() + + # Workflow wide default, used when not specified in xtrigger kwargs. + self.sequential_xtriggers_default = False # Labels whose xtrigger will be checked out to the RH limit. - self.non_sequential_labels: set = set() + self.sequential_xtrigger_labels: set = set() # Gather parentless tasks whose xtrigger(s) have been satisfied # (these will be used to spawn the next occurance). self.sequential_spawn_next: set = set() @@ -255,6 +285,7 @@ def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None: * If the function module was not found. * If the function was not found in the xtrigger module. * If the function is not callable. + * If the function is not callable. * If any string template in the function context arguments are not present in the expected template values. @@ -280,6 +311,15 @@ def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None: fname, f"'{fname}' not callable in xtrigger module '{fname}'", ) + if 'sequential' in getfullargspec(func).args: + raise XtriggerConfigError( + label, + fname, + ( + f"xtrigger module '{fname}' contains reserved" + " argument name 'sequential'" + ), + ) # Check any string templates in the function arg values (note this # won't catch bad task-specific values - which are added dynamically). @@ -324,6 +364,11 @@ def add_trig(self, label: str, fctx: SubFuncContext, fdir: str) -> None: """ self.validate_xtrigger(label, fctx, fdir) self.functx_map[label] = fctx + if fctx.func_kwargs.pop( + 'sequential', + self.sequential_xtriggers_default + ): + self.sequential_xtrigger_labels.add(label) if fctx.func_name == "wall_clock": self.wall_clock_labels.add(label) @@ -434,11 +479,11 @@ def call_xtriggers_async(self, itask: TaskProxy): # Special case: quick synchronous clock check. if wall_clock(*ctx.func_args, **ctx.func_kwargs): itask.state.xtriggers[label] = True - if itask.is_xtrigger_sequential: - self.sequential_spawn_next.add(itask.identity) self.sat_xtrig[sig] = {} self.data_store_mgr.delta_task_xtrigger(sig, True) LOG.info('xtrigger satisfied: %s = %s', label, sig) + if itask.is_xtrigger_sequential: + self.sequential_spawn_next.add(itask.identity) continue # General case: potentially slow asynchronous function call. if sig in self.sat_xtrig: diff --git a/tests/functional/cylc-config/00-simple/section1.stdout b/tests/functional/cylc-config/00-simple/section1.stdout index e6e21d3d84b..8695889440e 100644 --- a/tests/functional/cylc-config/00-simple/section1.stdout +++ b/tests/functional/cylc-config/00-simple/section1.stdout @@ -6,6 +6,7 @@ hold after cycle point = stop after cycle point = cycling mode = integer runahead limit = P4 +sequential xtriggers default = False [[queues]] [[[default]]] limit = 100 @@ -16,7 +17,5 @@ runahead limit = P4 clock-expire = sequential = [[xtriggers]] - [[[settings]]] - non-sequential xtriggers = [[graph]] R1 = OPS:finish-all => VAR