diff --git a/changes.d/6448.fix.md b/changes.d/6448.fix.md new file mode 100644 index 0000000000..56a0a44f3c --- /dev/null +++ b/changes.d/6448.fix.md @@ -0,0 +1 @@ +Fix the non-spawning of parentless sequential xtriggered tasks when outputs are set. diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 6d320a7301..6a83adefbd 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1948,6 +1948,9 @@ def set_prereqs_and_outputs( if prereqs: self._set_prereqs_itask(itask, prereqs, flow_nums) else: + # Spawn as if seq xtrig of parentless task was satisfied, + # with associated task producing these outputs. + self.check_spawn_psx_task(itask) self._set_outputs_itask(itask, outputs) # Spawn and set inactive tasks. @@ -2071,21 +2074,7 @@ def remove_tasks(self, items): itasks, _, bad_items = self.filter_task_proxies(items) for itask in itasks: # Spawn next occurrence of xtrigger sequential task. - if ( - itask.is_xtrigger_sequential - and ( - itask.identity not in - self.xtrigger_mgr.sequential_has_spawned_next - ) - ): - self.xtrigger_mgr.sequential_has_spawned_next.add( - itask.identity - ) - self.spawn_to_rh_limit( - itask.tdef, - itask.tdef.next_point(itask.point), - itask.flow_nums - ) + self.check_spawn_psx_task(itask) self.remove(itask, 'request') if self.compute_runahead(): self.release_runahead_tasks() @@ -2134,26 +2123,12 @@ def _force_trigger(self, itask): itask.tdef.next_point(itask.point), itask.flow_nums ) - # Task may be set running before xtrigger is satisfied, - # if so check/spawn if xtrigger sequential. - elif ( - itask.is_xtrigger_sequential - and ( - itask.identity not in - self.xtrigger_mgr.sequential_has_spawned_next - ) - ): - self.xtrigger_mgr.sequential_has_spawned_next.add( - itask.identity - ) - self.spawn_to_rh_limit( - itask.tdef, - itask.tdef.next_point(itask.point), - itask.flow_nums - ) else: # De-queue it to run now. self.task_queue_mgr.force_release_task(itask) + # Task may be set running before xtrigger is satisfied, + # if so check/spawn if xtrigger sequential. + self.check_spawn_psx_task(itask) def force_trigger_tasks( self, items: Iterable[str], @@ -2237,10 +2212,23 @@ def spawn_parentless_sequential_xtriggers(self): """Spawn successor(s) of parentless wall clock satisfied tasks.""" while self.xtrigger_mgr.sequential_spawn_next: taskid = self.xtrigger_mgr.sequential_spawn_next.pop() - self.xtrigger_mgr.sequential_has_spawned_next.add(taskid) itask = self._get_task_by_id(taskid) - # Will spawn out to RH limit or next parentless clock trigger - # or non-parentless. + self.check_spawn_psx_task(itask) + + def check_spawn_psx_task(self, itask: 'TaskProxy') -> None: + """Check and spawn parentless sequential xtriggered task (psx).""" + # Will spawn out to RH limit or next parentless clock trigger + # or non-parentless. + if ( + itask.is_xtrigger_sequential + and ( + itask.identity not in + self.xtrigger_mgr.sequential_has_spawned_next + ) + ): + self.xtrigger_mgr.sequential_has_spawned_next.add( + itask.identity + ) self.spawn_to_rh_limit( itask.tdef, itask.tdef.next_point(itask.point), diff --git a/tests/integration/test_sequential_xtriggers.py b/tests/integration/test_sequential_xtriggers.py index 1c98437bc7..4ef563a1c5 100644 --- a/tests/integration/test_sequential_xtriggers.py +++ b/tests/integration/test_sequential_xtriggers.py @@ -104,6 +104,26 @@ async def test_trigger(sequential, start): assert list_cycles(sequential) == ['2000', '2001'] +async def test_set(sequential, start): + """It should spawn its next instance if outputs are set ahead of time. + + If you set outputs of a sequentially spawned task before its xtriggers + have become satisfied, then the sequential spawning chain is broken. + + The task pool should defend against this to ensure that setting outputs + doesn't cancel it's future instances and their downstream tasks. + """ + async with start(sequential): + assert list_cycles(sequential) == ['2000'] + + foo = sequential.pool.get_task(ISO8601Point('2000'), 'foo') + # set foo:succeeded it should spawn next instance + sequential.pool.set_prereqs_and_outputs( + ["2000/foo"], ["succeeded"], None, ['all']) + + assert list_cycles(sequential) == ['2001'] + + async def test_reload(sequential, start): """It should set the is_xtrigger_sequential flag on reload.