Skip to content

Commit

Permalink
parentless sequential xtrigger spawning
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Sep 20, 2023
1 parent cfb3f76 commit 209e621
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 18 deletions.
20 changes: 19 additions & 1 deletion cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,8 +797,26 @@ def get_script_common_text(this: str, example: Optional[str] = None):
Example::
``my_trigger(arg1, arg2, kwarg1, kwarg2):PT10S``
.. code-block:: cylc
[[xtriggers]]
my_trig = my_trigger(arg1, arg2, kwarg1, kwarg2):PT10S
[[[settings]]]
non-sequential xtriggers = my_trig
''')
with Conf('settings', desc='''
Section heading for xtrigger behavior settings.
''') as Queue:
Conf('non-sequential xtriggers', VDR.V_STRING_LIST, desc='''
A list of xtrigger labels whose xtrigger, of associated
parentless task, be checked out to the runahead limit.
This allows for non-sequential xtrigger checking.
A task with multiple xtriggers requires all labels
be specified to behave in this way.
''')

with Conf('graph', desc=f'''
The workflow graph is defined under this section.
Expand Down
16 changes: 14 additions & 2 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1758,7 +1758,13 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
self.taskdefs[right].add_dependency(dependency, seq)

validator = XtriggerNameValidator.validate
for label in self.cfg['scheduling']['xtriggers']:
xtrigs = self.cfg['scheduling']['xtriggers']
for label in xtrigs:
if (
label == 'settings'
and not isinstance(xtrigs[label], SubFuncContext)
):
continue
valid, msg = validator(label)
if not valid:
raise WorkflowConfigError(
Expand All @@ -1767,7 +1773,7 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,

for label in xtrig_labels:
try:
xtrig = self.cfg['scheduling']['xtriggers'][label]
xtrig = xtrigs[label]
except KeyError:
if label != 'wall_clock':
raise WorkflowConfigError(f"xtrigger not defined: {label}")
Expand Down Expand Up @@ -1796,6 +1802,12 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
self.taskdefs[right].add_xtrig_label(label, seq)

if self.xtrigger_mgr is not None:
with suppress(KeyError):
self.xtrigger_mgr.non_sequential_labels.update(
set(xtrigs['settings']['non-sequential xtriggers'])
)

def get_actual_first_point(self, start_point):
"""Get actual first cycle point for the workflow
Expand Down
5 changes: 5 additions & 0 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ async def configure(self, params):
self.config,
self.workflow_db_mgr,
self.task_events_mgr,
self.xtrigger_mgr,
self.data_store_mgr,
self.flow_mgr
)
Expand Down Expand Up @@ -1733,6 +1734,10 @@ async def main_loop(self) -> None:
):
self.pool.queue_task(itask)

if self.xtrigger_mgr.sequential_spawn_next:
# Spawn parentless tasks with sequentially checked xtrigger(s).
self.pool.spawn_parentless_sequential_xtriggers()

if housekeep_xtriggers:
# (Could do this periodically?)
self.xtrigger_mgr.housekeep(self.pool.get_tasks())
Expand Down
81 changes: 68 additions & 13 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.taskdef import TaskDef
from cylc.flow.task_events_mgr import TaskEventsManager
from cylc.flow.xtrigger_mgr import XtriggerManager
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
from cylc.flow.flow_mgr import FlowMgr, FlowNums

Expand All @@ -98,6 +99,7 @@ def __init__(
config: 'WorkflowConfig',
workflow_db_mgr: 'WorkflowDatabaseManager',
task_events_mgr: 'TaskEventsManager',
xtrigger_mgr: 'XtriggerManager',
data_store_mgr: 'DataStoreMgr',
flow_mgr: 'FlowMgr'
) -> None:
Expand All @@ -108,6 +110,7 @@ def __init__(
self.task_events_mgr: 'TaskEventsManager' = task_events_mgr
# TODO this is ugly:
self.task_events_mgr.spawn_func = self.spawn_on_output
self.xtrigger_mgr: 'XtriggerManager' = xtrigger_mgr
self.data_store_mgr: 'DataStoreMgr' = data_store_mgr
self.flow_mgr: 'FlowMgr' = flow_mgr

Expand Down Expand Up @@ -281,11 +284,12 @@ def release_runahead_tasks(self):

for itask in release_me:
self.rh_release_and_queue(itask)
self.spawn_to_rh_limit(
itask.tdef,
itask.tdef.next_point(itask.point),
itask.flow_nums
)
if not itask.is_xtrigger_sequential:
self.spawn_to_rh_limit(
itask.tdef,
itask.tdef.next_point(itask.point),
itask.flow_nums
)
released = True

return released
Expand Down Expand Up @@ -599,6 +603,15 @@ def load_db_task_pool_for_restart(self, row_idx, row):
)
)

# Set xtrigger checking type, which effects parentless spawning.
if (
itask.tdef.is_parentless(itask.point)
and set(itask.state.xtriggers.keys()).difference(
self.xtrigger_mgr.non_sequential_labels
)
):
itask.is_xtrigger_sequential = True

Check warning on line 613 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L613

Added line #L613 was not covered by tests

if itask.state_reset(status, is_runahead=True):
self.data_store_mgr.delta_task_runahead(itask)
self.add_to_pool(itask)
Expand Down Expand Up @@ -721,45 +734,71 @@ def rh_release_and_queue(self, itask) -> None:

def _get_spawned_or_merged_task(
self, point: 'PointBase', name: str, flow_nums: 'FlowNums'
) -> Optional[TaskProxy]:
) -> 'Tuple[Optional[TaskProxy], bool]':
"""Return new or existing task point/name with merged flow_nums"""
taskid = Tokens(cycle=str(point), task=name).relative_id
ntask = (
self._get_hidden_task_by_id(taskid)
or self._get_main_task_by_id(taskid)
)
is_in_pool = False
if ntask is None:
# ntask does not exist: spawn it in the flow.
ntask = self.spawn_task(name, point, flow_nums)
# if the task was found set xtrigger checking type.
if (
ntask is not None
and set(ntask.state.xtriggers.keys()).difference(
self.xtrigger_mgr.non_sequential_labels
)
):
ntask.is_xtrigger_sequential = True
else:
# ntask already exists (n=0 or incomplete): merge flows.
is_in_pool = True
self.merge_flows(ntask, flow_nums)
return ntask # may be None
return ntask, is_in_pool # may be None

def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None:
"""Spawn parentless task instances from point to runahead limit."""
"""Spawn parentless task instances from point to runahead limit.
Sequentially checked xtriggers with spawn corresponding task out to the
next task with any xtrigger with the same behaviour, or to the runahead
limit (whichever occurs first).
"""
if not flow_nums or point is None:
# Force-triggered no-flow task.
# Or called with an invalid next_point.
return
if self.runahead_limit_point is None:
self.compute_runahead()

is_sequential = False
while point is not None and (point <= self.runahead_limit_point):
if tdef.is_parentless(point):
ntask = self._get_spawned_or_merged_task(
ntask, is_in_pool = self._get_spawned_or_merged_task(
point, tdef.name, flow_nums
)
if ntask is not None:
self.add_to_pool(ntask)
if not is_in_pool:
self.add_to_pool(ntask)
self.rh_release_and_queue(ntask)
if ntask.is_xtrigger_sequential:
is_sequential = True
break
point = tdef.next_point(point)

# Once more (for the rh-limited task: don't rh release it!)
if point is not None and tdef.is_parentless(point):
ntask = self._get_spawned_or_merged_task(
if (
point is not None
and tdef.is_parentless(point)
and not is_sequential
):
ntask, is_in_pool = self._get_spawned_or_merged_task(
point, tdef.name, flow_nums
)
if ntask is not None:
if ntask is not None and not is_in_pool:
self.add_to_pool(ntask)

def remove(self, itask, reason=""):
Expand Down Expand Up @@ -1753,6 +1792,22 @@ def force_trigger_tasks(

return len(unmatched)

def spawn_parentless_sequential_xtriggers(self):
"""Spawn successor(s) of parentless wall clock satisfied tasks."""
while self.xtrigger_mgr.sequential_spawn_next:
taskid = self.xtrigger_mgr.sequential_spawn_next.pop()
itask = (
self._get_hidden_task_by_id(taskid)
or self._get_main_task_by_id(taskid)
)
# Will spawn out to RH limit or next parentless clock trigger
# or non-parentless.
self.spawn_to_rh_limit(
itask.tdef,
itask.tdef.next_point(itask.point),
itask.flow_nums
)

def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool:
"""Simulation mode: simulate task run times and set states."""
if not self.config.run_mode('simulation'):
Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ class TaskProxy:
'tokens',
'try_timers',
'waiting_on_job_prep',
'is_xtrigger_sequential',
]

def __init__(
Expand Down Expand Up @@ -214,6 +215,7 @@ def __init__(
task=self.tdef.name,
)
self.identity = self.tokens.relative_id
self.is_xtrigger_sequential = False
self.reload_successor: Optional['TaskProxy'] = None
self.point_as_seconds: Optional[int] = None

Expand Down
1 change: 1 addition & 0 deletions cylc/flow/unicode_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ class XtriggerNameValidator(UnicodeRuleChecker):
RULES = [
allowed_characters(r'a-zA-Z0-9', '_'),
not_starts_with('_cylc'),
not_equals('settings'),
]


Expand Down
18 changes: 16 additions & 2 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,14 @@ def __init__(
# Signatures of active functions (waiting on callback).
self.active: list = []

# Clock labels, to avoid repeated string comparisons
self.wall_clock_labels: set = set()
# Labels whose xtrigger will be checked out to the RH limit.
self.non_sequential_labels: set = set()
# Gather parentless tasks whose xtrigger(s) have been satisfied
# (these will be used to spawn the next occurance).
self.sequential_spawn_next: set = set()

self.workflow_run_dir = workflow_run_dir

# For function arg templating.
Expand Down Expand Up @@ -316,6 +324,8 @@ def add_trig(self, label: str, fctx: SubFuncContext, fdir: str) -> None:
"""
self.validate_xtrigger(label, fctx, fdir)
self.functx_map[label] = fctx
if fctx.func_name == "wall_clock":
self.wall_clock_labels.add(label)

def mutate_trig(self, label, kwargs):
self.functx_map[label].func_kwargs.update(kwargs)
Expand Down Expand Up @@ -380,7 +390,7 @@ def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext:

args = []
kwargs = {}
if ctx.func_name == "wall_clock":
if label in self.wall_clock_labels:
if "trigger_time" in ctx.func_kwargs:
# Internal (retry timer): trigger_time already set.
kwargs["trigger_time"] = ctx.func_kwargs["trigger_time"]
Expand Down Expand Up @@ -420,10 +430,12 @@ def call_xtriggers_async(self, itask: TaskProxy):
itask: task proxy to check.
"""
for label, sig, ctx, _ in self._get_xtrigs(itask, unsat_only=True):
if sig.startswith("wall_clock"):
if label in self.wall_clock_labels:
# Special case: quick synchronous clock check.
if wall_clock(*ctx.func_args, **ctx.func_kwargs):
itask.state.xtriggers[label] = True
if itask.is_xtrigger_sequential:
self.sequential_spawn_next.add(itask.identity)
self.sat_xtrig[sig] = {}
self.data_store_mgr.delta_task_xtrigger(sig, True)
LOG.info('xtrigger satisfied: %s = %s', label, sig)
Expand All @@ -443,6 +455,8 @@ def call_xtriggers_async(self, itask: TaskProxy):
[itask.tdef.name],
xtrigger_env
)
if itask.is_xtrigger_sequential:
self.sequential_spawn_next.add(itask.identity)
continue
if sig in self.active:
# Already waiting on this result.
Expand Down
2 changes: 2 additions & 0 deletions tests/functional/cylc-config/00-simple/section1.stdout
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,7 @@ runahead limit = P4
clock-expire =
sequential =
[[xtriggers]]
[[[settings]]]
non-sequential xtriggers =
[[graph]]
R1 = OPS:finish-all => VAR

0 comments on commit 209e621

Please sign in to comment.