Skip to content

Commit

Permalink
default reversed, xtrigger argument added
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Sep 21, 2023
1 parent 2a57adc commit ba46897
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 42 deletions.
36 changes: 15 additions & 21 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -785,9 +785,21 @@ def get_script_common_text(this: str, example: Optional[str] = None):
:ref:`SequentialTasks`.
''')

Conf('sequential xtriggers default', VDR.V_BOOLEAN, False, desc='''
Set to ``True``, this allows for sequential spawning of associated
parentless tasks on xtrigger satisfaction.
Instead of out to the runahead limit (default: ``False``).
This workflow wide default can be overridden by a reserved
keyword argument in the xtrigger function declaration
(``sequential=True/False``).
The presence of one sequential xtrigger on a parentless task with
multiple xtriggers will cause sequential behavior.
''')
with Conf('xtriggers', desc='''
This section is for *External Trigger* function declarations -
see :ref:`Section External Triggers`.
This section is for *External Trigger* function declarations -
see :ref:`Section External Triggers`.
'''):
Conf('<xtrigger name>', VDR.V_XTRIGGER, desc='''
Any user-defined event trigger function declarations and
Expand All @@ -797,26 +809,8 @@ def get_script_common_text(this: str, example: Optional[str] = None):
Example::
.. code-block:: cylc
[[xtriggers]]
my_trig = my_trigger(arg1, arg2, kwarg1, kwarg2):PT10S
[[[settings]]]
non-sequential xtriggers = my_trig
``my_trigger(arg1, arg2, kwarg1, kwarg2):PT10S``
''')
with Conf('settings', desc='''
Section heading for xtrigger behavior settings.
''') as Queue:
Conf('non-sequential xtriggers', VDR.V_STRING_LIST, desc='''
A list of xtrigger labels whose xtrigger, of associated
parentless task, be checked out to the runahead limit.
This allows for non-sequential xtrigger checking.
A task with multiple xtriggers requires all labels
be specified to behave in this way.
''')

with Conf('graph', desc=f'''
The workflow graph is defined under this section.
Expand Down
15 changes: 4 additions & 11 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1760,17 +1760,16 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
validator = XtriggerNameValidator.validate
xtrigs = self.cfg['scheduling']['xtriggers']
for label in xtrigs:
if (
label == 'settings'
and not isinstance(xtrigs[label], SubFuncContext)
):
continue
valid, msg = validator(label)
if not valid:
raise WorkflowConfigError(
f'Invalid xtrigger name "{label}" - {msg}'
)

if self.xtrigger_mgr is not None:
self.xtrigger_mgr.sequential_xtriggers_default = (
self.cfg['scheduling']['sequential xtriggers default']
)
for label in xtrig_labels:
try:
xtrig = xtrigs[label]
Expand Down Expand Up @@ -1802,12 +1801,6 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
self.taskdefs[right].add_xtrig_label(label, seq)

if self.xtrigger_mgr is not None:
with suppress(KeyError):
self.xtrigger_mgr.non_sequential_labels.update(
set(xtrigs['settings']['non-sequential xtriggers'])
)

def get_actual_first_point(self, start_point):
"""Get actual first cycle point for the workflow
Expand Down
8 changes: 4 additions & 4 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,8 +606,8 @@ def load_db_task_pool_for_restart(self, row_idx, row):
# Set xtrigger checking type, which effects parentless spawning.
if (
itask.tdef.is_parentless(itask.point)
and set(itask.state.xtriggers.keys()).difference(
self.xtrigger_mgr.non_sequential_labels
and set(itask.state.xtriggers.keys()).intersection(
self.xtrigger_mgr.sequential_xtrigger_labels
)
):
itask.is_xtrigger_sequential = True

Check warning on line 613 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L613

Added line #L613 was not covered by tests
Expand Down Expand Up @@ -748,8 +748,8 @@ def _get_spawned_or_merged_task(
# if the task was found set xtrigger checking type.
if (
ntask is not None
and set(ntask.state.xtriggers.keys()).difference(
self.xtrigger_mgr.non_sequential_labels
and set(ntask.state.xtriggers.keys()).intersection(
self.xtrigger_mgr.sequential_xtrigger_labels
)
):
ntask.is_xtrigger_sequential = True

Check warning on line 755 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L755

Added line #L755 was not covered by tests
Expand Down
1 change: 0 additions & 1 deletion cylc/flow/unicode_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,6 @@ class XtriggerNameValidator(UnicodeRuleChecker):
RULES = [
allowed_characters(r'a-zA-Z0-9', '_'),
not_starts_with('_cylc'),
not_equals('settings'),
]


Expand Down
51 changes: 48 additions & 3 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from contextlib import suppress
from enum import Enum
from inspect import getfullargspec
import json
import re
from copy import deepcopy
Expand Down Expand Up @@ -182,6 +183,32 @@ class XtriggerManager:
managed uniquely - i.e. many tasks depending on the same clock trigger
(with same offset from cycle point) get satisfied by the same call.
Parentless tasks with xtrigger(s) are, by default, spawned out to the
runahead limit. This results in non-sequential, and potentially
unnecessary, checking out to this limit (and may introduce clutter to
user interfaces). An option to make this sequential is now available,
by changing the default for all xtriggers in a workflow, and a way to
override this default with a (reserved) keyword function argument
(i.e. "sequential=True/False"):
# Example:
[scheduling]
sequential xtrigger default = True
[[xtriggers]]
clock_0 = wall_clock() # offset PT0H
clock_1 = wall_clock(offset=PT1H)
# or wall_clock(PT1H)
workflow_x = workflow_state(
workflow=other,
point=%(task_cycle_point)s,
sequential=False
):PT30S
[[graph]]
PT1H = '''
@clock_1 & @workflow_x => foo & bar
@wall_clock = baz # pre-defined zero-offset clock
'''
Args:
workflow: workflow name
user: workflow owner
Expand Down Expand Up @@ -213,8 +240,11 @@ def __init__(

# Clock labels, to avoid repeated string comparisons
self.wall_clock_labels: set = set()

# Workflow wide default, used when not specified in xtrigger kwargs.
self.sequential_xtriggers_default = False
# Labels whose xtrigger will be checked out to the RH limit.
self.non_sequential_labels: set = set()
self.sequential_xtrigger_labels: set = set()
# Gather parentless tasks whose xtrigger(s) have been satisfied
# (these will be used to spawn the next occurance).
self.sequential_spawn_next: set = set()
Expand Down Expand Up @@ -255,6 +285,7 @@ def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None:
* If the function module was not found.
* If the function was not found in the xtrigger module.
* If the function is not callable.
* If the function is not callable.
* If any string template in the function context
arguments are not present in the expected template values.
Expand All @@ -280,6 +311,15 @@ def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None:
fname,
f"'{fname}' not callable in xtrigger module '{fname}'",
)
if 'sequential' in getfullargspec(func).args:
raise XtriggerConfigError(

Check warning on line 315 in cylc/flow/xtrigger_mgr.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtrigger_mgr.py#L315

Added line #L315 was not covered by tests
label,
fname,
(
f"xtrigger module '{fname}' contains reserved"
" argument name 'sequential'"
),
)

# Check any string templates in the function arg values (note this
# won't catch bad task-specific values - which are added dynamically).
Expand Down Expand Up @@ -324,6 +364,11 @@ def add_trig(self, label: str, fctx: SubFuncContext, fdir: str) -> None:
"""
self.validate_xtrigger(label, fctx, fdir)
self.functx_map[label] = fctx
if fctx.func_kwargs.pop(
'sequential',
self.sequential_xtriggers_default
):
self.sequential_xtrigger_labels.add(label)

Check warning on line 371 in cylc/flow/xtrigger_mgr.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtrigger_mgr.py#L371

Added line #L371 was not covered by tests
if fctx.func_name == "wall_clock":
self.wall_clock_labels.add(label)

Expand Down Expand Up @@ -434,11 +479,11 @@ def call_xtriggers_async(self, itask: TaskProxy):
# Special case: quick synchronous clock check.
if wall_clock(*ctx.func_args, **ctx.func_kwargs):
itask.state.xtriggers[label] = True
if itask.is_xtrigger_sequential:
self.sequential_spawn_next.add(itask.identity)
self.sat_xtrig[sig] = {}
self.data_store_mgr.delta_task_xtrigger(sig, True)
LOG.info('xtrigger satisfied: %s = %s', label, sig)
if itask.is_xtrigger_sequential:
self.sequential_spawn_next.add(itask.identity)

Check warning on line 486 in cylc/flow/xtrigger_mgr.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtrigger_mgr.py#L486

Added line #L486 was not covered by tests
continue
# General case: potentially slow asynchronous function call.
if sig in self.sat_xtrig:
Expand Down
3 changes: 1 addition & 2 deletions tests/functional/cylc-config/00-simple/section1.stdout
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ hold after cycle point =
stop after cycle point =
cycling mode = integer
runahead limit = P4
sequential xtriggers default = False
[[queues]]
[[[default]]]
limit = 100
Expand All @@ -16,7 +17,5 @@ runahead limit = P4
clock-expire =
sequential =
[[xtriggers]]
[[[settings]]]
non-sequential xtriggers =
[[graph]]
R1 = OPS:finish-all => VAR

0 comments on commit ba46897

Please sign in to comment.