Skip to content

Commit

Permalink
Graduated force-triggering (prereqs, xtriggers, queues).
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Jun 14, 2023
1 parent b153e3e commit 2fd944d
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 27 deletions.
89 changes: 66 additions & 23 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ class TaskPool:
ERR_TMPL_NO_TASKID_MATCH = "No matching tasks found: {0}"
ERR_PREFIX_TASK_NOT_ON_SEQUENCE = "Invalid cycle point for task: {0}, {1}"
SUICIDE_MSG = "suicide"
FORCE_TRIGGER_MSG = (
"Trigger me again to run immediately, if I have other constraints"
)

def __init__(
self,
Expand Down Expand Up @@ -682,6 +685,12 @@ def rh_release_and_queue(self, itask) -> None:
"""
if itask.state_reset(is_runahead=False):
self.data_store_mgr.delta_task_runahead(itask)
self.spawn_to_rh_limit(
itask.tdef,
itask.tdef.next_point(itask.point),
itask.flow_nums
)

if all(itask.is_ready_to_run()):
# (otherwise waiting on xtriggers etc.)
self.queue_task(itask)
Expand Down Expand Up @@ -1642,14 +1651,24 @@ def force_trigger_tasks(
flow_wait: bool = False,
flow_descr: Optional[str] = None
) -> int:
"""Manual task triggering.
"""Forced (manual) task triggering.
Force-triggering a future (n>0) task:
- spawns it into n=0 with prerequisites satisfied
(and with the given or default flow numbers)
Force-triggering an n=0 task:
- satisfies its prerequisites, if any are unsatisfied;
- else satisfies its xtriggers, if any are unsatisfied;
- else queues it, if not already queued;
- else releases it to run, if queued.
Don't get a new flow number for existing n=0 tasks (e.g. incomplete
tasks). These can carry on in the original flow if retriggered.
This always releases tasks from runahead limiting.
Queue the task if not queued, otherwise release it to run.
TODO - extend to push ext-triggers.
"""
# Process provided flow options.
if set(flow).intersection({FLOW_ALL, FLOW_NEW, FLOW_NONE}):
if len(flow) != 1:
LOG.warning(
Expand All @@ -1672,16 +1691,15 @@ def force_trigger_tasks(
)
return 0

# n_warnings, task_items = self.match_taskdefs(items)
# Attempt to match target tasks.
itasks, future_tasks, unmatched = self.filter_task_proxies(
items,
future=True,
warn=False,
)

# Spawn future tasks.
for name, point in future_tasks:
# (Flow values already validated by the trigger client).
# Trigger future_tasks with prerequisites satisfied.
itask = self.spawn_task(
name,
point,
Expand All @@ -1692,32 +1710,57 @@ def force_trigger_tasks(
)
if itask is None:
continue
itasks.append(itask)
itask.is_manual_submit = True
LOG.info(
f"[{itask}] - forcing prerequisites"
f"\n{self.__class__.FORCE_TRIGGER_MSG}"
)
itask.state.set_prerequisites_all_satisfied()
self.add_to_pool(itask)
self.rh_release_and_queue(itask)

# Trigger matched tasks if not already active.
for itask in itasks:
# Trigger existing n=0 tasks.
if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
LOG.warning(f"[{itask}] ignoring trigger - already active")
continue

itask.is_manual_submit = True
itask.reset_try_timers()
# (If None, spawner reports cycle bounds errors).
if itask.state_reset(TASK_STATUS_WAITING):
# (could also be unhandled failed)
self.data_store_mgr.delta_task_state(itask)
# (No need to set prerequisites satisfied here).
self.add_to_pool(itask) # move from hidden if necessary.

if itask.state.is_runahead:
# Release from runahead, and queue it.
self.rh_release_and_queue(itask)
self.spawn_to_rh_limit(
itask.tdef,
itask.tdef.next_point(itask.point),
itask.flow_nums

if itask.state_reset(TASK_STATUS_WAITING):
# We're retriggering an incomplete task.
self.data_store_mgr.delta_task_state(itask)
itask.reset_try_timers()

if itask.is_task_prereqs_not_done():
LOG.info(
f"[{itask}] - forcing prerequisites"
f"\n{self.__class__.FORCE_TRIGGER_MSG}"
)
else:
# De-queue it to run now.
self.task_queue_mgr.force_release_task(itask)
itask.state.set_prerequisites_all_satisfied()
continue

if (
itask.state.xtriggers and
not itask.state.xtriggers_all_satisfied()
):
LOG.info(
f"[{itask}] - forcing xtriggers"
f"\n{self.__class__.FORCE_TRIGGER_MSG}"
)
# TODO - does xtrigger manager need to know?
itask.state.xtriggers_set_all_satisfied()
continue

LOG.info(
f"[{itask}] - forcing queue release"
f"\n{self.__class__.FORCE_TRIGGER_MSG}"
)
self.task_queue_mgr.force_release_task(itask)

return len(unmatched)

Expand Down
5 changes: 1 addition & 4 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,12 +390,9 @@ def is_ready_to_run(self) -> Tuple[bool, ...]:
"""Is this task ready to run?
Takes account of all dependence: on other tasks, xtriggers, and
old-style ext-triggers. Or, manual triggering.
old-style ext-triggers.
"""
if self.is_manual_submit:
# Manually triggered, ignore unsatisfied prerequisites.
return (True,)
if self.state.is_held:
# A held task is not ready to run.
return (False,)
Expand Down
5 changes: 5 additions & 0 deletions cylc/flow/task_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,11 @@ def xtriggers_all_satisfied(self):
"""Return True if all xtriggers are satisfied."""
return all(self.xtriggers.values())

def xtriggers_set_all_satisfied(self):
"""Set all xtriggers to satisfied."""
for xtrig in self.xtriggers.keys():
self.xtriggers[xtrig] = True

def external_triggers_all_satisfied(self):
"""Return True if all external triggers are satisfied."""
return all(self.external_triggers.values())
Expand Down

0 comments on commit 2fd944d

Please sign in to comment.