From 505b7d4e7f41cfc0d1752ff633c2752868efe131 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Mon, 25 Mar 2024 10:43:25 +0000 Subject: [PATCH] clock-expire: exclude active tasks * Active tasks should not be considered for clock-expiry. Closes https://github.com/cylc/cylc-flow/issues/6025 * Manually triggered tasks should not be considered for clock-expiry. Implements proposal point 10 https://cylc.github.io/cylc-admin/proposal-optional-output-extension.html#proposal --- cylc/flow/task_pool.py | 23 ++++++-- tests/integration/test_optional_outputs.py | 63 ++++++++++++++++++++++ 2 files changed, 82 insertions(+), 4 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 1c6eeb17272..68aa79ccaf0 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -2161,10 +2161,25 @@ def spawn_parentless_sequential_xtriggers(self): def clock_expire_tasks(self): """Expire any tasks past their clock-expiry time.""" for itask in self.get_tasks(): - if not itask.clock_expire(): - continue - self.task_events_mgr.process_message( - itask, logging.WARNING, TASK_OUTPUT_EXPIRED) + if ( + # force triggered tasks can not clock-expire + # see proposal point 10: + # https://cylc.github.io/cylc-admin/proposal-optional-output-extension.html#proposal + not itask.is_manual_submit + + # only waiting tasks can clock-expire + # see https://github.com/cylc/cylc-flow/issues/6025 + # (note retrying tasks will be in the waiting state) + and itask.state(TASK_STATUS_WAITING) + + # check if this task is clock expired + and itask.clock_expire() + ): + self.task_events_mgr.process_message( + itask, + logging.WARNING, + TASK_OUTPUT_EXPIRED, + ) def task_succeeded(self, id_): """Return True if task with id_ is in the succeeded state.""" diff --git a/tests/integration/test_optional_outputs.py b/tests/integration/test_optional_outputs.py index d897fc4ce6d..afab3d9e8fa 100644 --- a/tests/integration/test_optional_outputs.py +++ b/tests/integration/test_optional_outputs.py @@ -44,6 +44,7 @@ TASK_STATUS_EXPIRED, TASK_STATUS_PREPARING, TASK_STATUS_WAITING, + TASK_STATUSES_ACTIVE, ) if TYPE_CHECKING: @@ -378,3 +379,65 @@ async def test_clock_expire_partially_satisfied_task( # the task should now be in the expired state assert e.state(TASK_STATUS_EXPIRED) + + +async def test_clock_expiry( + flow, + scheduler, + start, +): + """Waiting tasks should be considered for clock-expiry. + + Tests two things: + + * Manually triggered tasks should not be considered for clock-expiry. + + Tests proposal point 10: + https://cylc.github.io/cylc-admin/proposal-optional-output-extension.html#proposal + + * Active tasks should not be considered for clock-expiry. + + Closes https://github.com/cylc/cylc-flow/issues/6025 + """ + id_ = flow({ + 'scheduling': { + 'initial cycle point': '2000', + 'runahead limit': 'P1', + 'special tasks': { + 'clock-expire': 'x' + }, + 'graph': { + 'P1Y': 'x' + }, + }, + }) + schd = scheduler(id_) + async with start(schd): + # the first task (waiting) + one = schd.pool.get_task(ISO8601Point('20000101T0000Z'), 'x') + assert one + + # the second task (preparing) + two = schd.pool.get_task(ISO8601Point('20010101T0000Z'), 'x') + assert two + two.state_reset(TASK_STATUS_PREPARING) + + # the third task (force-triggered) + schd.pool.force_trigger_tasks(['20100101T0000Z/x'], ['1']) + three = schd.pool.get_task(ISO8601Point('20100101T0000Z'), 'x') + assert three + + # check for expiry + schd.pool.clock_expire_tasks() + + # the first task should be expired (it was waiting) + assert one.state(TASK_STATUS_EXPIRED) + assert one.state.outputs.is_message_complete(TASK_OUTPUT_EXPIRED) + + # the second task should *not* be expired (it was active) + assert not two.state(TASK_STATUS_EXPIRED) + assert not two.state.outputs.is_message_complete(TASK_OUTPUT_EXPIRED) + + # the third task should *not* be expired (it was a manual submit) + assert not three.state(TASK_STATUS_EXPIRED) + assert not three.state.outputs.is_message_complete(TASK_OUTPUT_EXPIRED)