Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task expire optionality. #6020

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
)
from cylc.flow.task_id import TaskID
from cylc.flow.task_outputs import (
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_EXPIRED,
TaskOutputs
)
from cylc.flow.task_trigger import TaskTrigger, Dependency
Expand Down Expand Up @@ -1657,7 +1657,7 @@
offset_is_irregular, offset_is_absolute) = (
GraphNodeParser.get_inst().parse(left))

# Qualifier.
# Qualifier. Note ":succeeded" made explicit by the graph parser.
outputs = self.cfg['runtime'][name]['outputs']
if outputs and (output in outputs):
# Qualifier is a custom task message.
Expand All @@ -1668,9 +1668,6 @@
f"Undefined custom output: {name}:{output}"
)
qualifier = output
else:
# No qualifier specified => use "succeeded".
qualifier = TASK_OUTPUT_SUCCEEDED

# Generate TaskTrigger if not already done.
key = (name, offset, qualifier,
Expand Down Expand Up @@ -2136,7 +2133,11 @@

Args:
task_output_opt: {(task, output): (is-optional, default, is_set)}

"""
# task_output_opt: outputs parsed from graph triggers
# taskdef.outputs: outputs listed under runtime

for name, taskdef in self.taskdefs.items():
for output in taskdef.outputs:
try:
Expand All @@ -2146,6 +2147,22 @@
continue
taskdef.set_required_output(output, not optional)

# Clock-expire must be flagged as optional in the graph.
graph_exp = set()
for (task, output) in task_output_opt.keys():
if output == TASK_OUTPUT_EXPIRED:
graph_exp.add(task)
bad_exp = set()
for task in self.expiration_offsets:
if task not in graph_exp:
bad_exp.add(task)

Check warning on line 2158 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L2158

Added line #L2158 was not covered by tests
if bad_exp:
msg = '\n '.join(
[t + f":{TASK_OUTPUT_EXPIRED}?" for t in bad_exp])
raise WorkflowConfigError(

Check warning on line 2162 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L2162

Added line #L2162 was not covered by tests
f"Clock-expire must be flagged as optional: {msg}"
)

def find_taskdefs(self, name: str) -> Set[TaskDef]:
"""Find TaskDef objects in family "name" or matching "name".

Expand Down
4 changes: 3 additions & 1 deletion cylc/flow/graph_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@

class Replacement:
"""A class to remember match group information in re.sub() calls"""

def __init__(self, replacement):
self.replacement = replacement
self.substitutions = []
Expand Down Expand Up @@ -747,7 +748,8 @@ def _set_output_opt(

if output == TASK_OUTPUT_EXPIRED and not optional:
raise GraphParseError(
f"Expired-output {name}:{output} must be optional")
f"{output} must be an optional output:"
f" {name}:{output}?")

if output == TASK_OUTPUT_FINISHED:
# Interpret :finish pseudo-output
Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
# state transition message reflects the correct submit_num
itask.submit_num += 1
itask.state_reset(TASK_STATUS_PREPARING)
# Avoid clock-expiring an active task:
itask.expire_time = None
Comment on lines +238 to +239
Copy link
Member

@oliver-sanders oliver-sanders Mar 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't handled clock-expiry of active tasks (I'm implementing the opt outputs proposal which is just about outputs).

Presumably if a task (submit-)fails and has [submission retries configured, then it should be allowed to expire between retries?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think so.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created an issue for the clock-expire of active tasks thing: #6025

self.data_store_mgr.delta_task_state(itask)
prep_task = self._prep_submit_task_job(
workflow, itask, check_syntax=check_syntax)
Expand Down
29 changes: 22 additions & 7 deletions cylc/flow/task_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
TASK_OUTPUT_SUBMIT_FAILED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_FAILED)
TASK_OUTPUT_FAILED
)

TASK_OUTPUTS = (
TASK_OUTPUT_EXPIRED,
Expand Down Expand Up @@ -65,12 +66,13 @@ class TaskOutputs:
"""

# Memory optimization - constrain possible attributes to this list.
__slots__ = ["_by_message", "_by_trigger", "_required"]
__slots__ = ["_by_message", "_by_trigger", "_required", "_expire_ok"]

def __init__(self, tdef):
self._by_message = {}
self._by_trigger = {}
self._required = {} # trigger: message
self._expire_ok = True

# Add outputs from task def.
for trigger, (message, required) in tdef.outputs.items():
Expand All @@ -90,21 +92,35 @@ def __init__(self, tdef):
)

def _add(self, message, trigger, is_completed=False, required=False):
"""Add a new output message"""
"""Add a new output message

required:
- True: is required
- False: is optional
- None: not set

"""
self._by_message[message] = [trigger, message, is_completed]
self._by_trigger[trigger] = self._by_message[message]

if (trigger == TASK_OUTPUT_EXPIRED and required is None):
self._expire_ok = False

if required:
self._required[trigger] = message

def set_completed_by_msg(self, message):
"""For flow trigger --wait: set completed outputs from the DB."""
for trig, msg, _ in self._by_trigger.values():
if message == msg:
self._add(message, trig, True, trig in self._required)
self._add(
message, trig, True,
required=(trig in self._required)
)
break

def all_completed(self):
"""Return True if all all outputs completed."""
"""Return True if all outputs completed."""
return all(val[_IS_COMPLETED] for val in self._by_message.values())

def exists(self, message=None, trigger=None):
Expand Down Expand Up @@ -225,7 +241,6 @@ def get_incomplete(self):
"""Return a list of required outputs that are not complete.

A task is incomplete if:

* it finished executing without completing all required outputs
* or if job submission failed and the :submit output was not optional

Expand Down Expand Up @@ -288,7 +303,7 @@ def get_incomplete_implied(self, output: str) -> List[str]:

@staticmethod
def is_valid_std_name(name):
"""Check name is a valid standard output name."""
"""Check name is a valid standard output (including 'expired')."""
return name in SORT_ORDERS

@staticmethod
Expand Down
59 changes: 29 additions & 30 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,13 @@
flow_mgr: 'FlowMgr'
) -> None:
self.tokens = tokens
self.config: 'WorkflowConfig' = config
self.config = config
self.stop_point = config.stop_point or config.final_point
self.workflow_db_mgr: 'WorkflowDatabaseManager' = workflow_db_mgr
self.task_events_mgr: 'TaskEventsManager' = task_events_mgr
self.workflow_db_mgr = workflow_db_mgr
self.task_events_mgr = task_events_mgr
self.task_events_mgr.spawn_func = self.spawn_on_output
self.data_store_mgr: 'DataStoreMgr' = data_store_mgr
self.flow_mgr: 'FlowMgr' = flow_mgr
self.data_store_mgr = data_store_mgr
self.flow_mgr = flow_mgr

self.max_future_offset: Optional['IntervalBase'] = None
self._prev_runahead_base_point: Optional['PointBase'] = None
Expand Down Expand Up @@ -1379,25 +1379,19 @@
self.release_runahead_tasks()
return ret

if itask.state(TASK_STATUS_EXPIRED):
self.remove(itask, "expired")
if self.compute_runahead():
self.release_runahead_tasks()
return True

incomplete = itask.state.outputs.get_incomplete()
if incomplete:
# Keep incomplete tasks in the pool.
if output in TASK_STATUSES_FINAL:
# Log based on the output, not the state, to avoid warnings
# due to use of "cylc set" to set internal outputs on an
# already-finished task.
LOG.warning(
f"[{itask}] did not complete required outputs:"
f" {incomplete}"
)
return False
if not itask.is_complete():
# (Includes non-optional expired)
incomplete = itask.state.outputs.get_incomplete()
if incomplete:
# Keep incomplete tasks in the pool.
if output in TASK_STATUSES_FINAL:
LOG.warning(
f"[{itask}] did not complete required outputs:"
f" {incomplete}"
)
return False

# Remove as complete
self.remove(itask)
if self.compute_runahead():
self.release_runahead_tasks()
Expand Down Expand Up @@ -1761,7 +1755,6 @@
- future tasks must be specified individually
- family names are not expanded to members


Uses a transient task proxy to spawn children. (Even if parent was
previously spawned in this flow its children might not have been).

Expand Down Expand Up @@ -1831,9 +1824,15 @@

outputs = sorted(outputs, key=itask.state.outputs.output_sort_key)
for output in outputs:
if itask.state.outputs.is_completed(output):
LOG.info(f"output {itask.identity}:{output} completed already")
continue
if output == TASK_OUTPUT_EXPIRED:
if itask.state(*TASK_STATUSES_ACTIVE):
LOG.warning(f"Can't expire active {itask}")
continue

Check warning on line 1830 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L1829-L1830

Added lines #L1829 - L1830 were not covered by tests
elif itask.state.outputs.is_completed(output):
LOG.info(f"output {itask.identity}:{output}"

Check warning on line 1832 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L1832

Added line #L1832 was not covered by tests
" completed already")
continue

Check warning on line 1834 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L1834

Added line #L1834 was not covered by tests

self.task_events_mgr.process_message(
itask, logging.INFO, output, forced=True)

Expand Down Expand Up @@ -1919,9 +1918,9 @@
return len(bad_items)

def _get_flow_nums(
self,
flow: List[str],
meta: Optional[str] = None,
self,
flow: List[str],
meta: Optional[str] = None,
) -> Optional[Set[int]]:
"""Get correct flow numbers given user command options."""
if set(flow).intersection({FLOW_ALL, FLOW_NEW, FLOW_NONE}):
Expand Down
9 changes: 6 additions & 3 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,10 +571,13 @@ def is_finished(self) -> bool:
)

def is_complete(self) -> bool:
"""Return True if complete or expired."""
"""Return True if task is complete."""
return (
self.state(TASK_STATUS_EXPIRED)
or not self.state.outputs.is_incomplete()
not self.state.outputs.is_incomplete()
or (
self.state(TASK_OUTPUT_EXPIRED)
and self.state.outputs._expire_ok
)
)

def set_state_by_outputs(self) -> None:
Expand Down
4 changes: 0 additions & 4 deletions cylc/flow/taskdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from cylc.flow.exceptions import TaskDefError
from cylc.flow.task_id import TaskID
from cylc.flow.task_outputs import (
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_SUBMIT_FAILED,
TASK_OUTPUT_SUCCEEDED,
Expand Down Expand Up @@ -216,9 +215,6 @@ def tweak_outputs(self):
):
self.set_required_output(TASK_OUTPUT_SUCCEEDED, True)

# Expired must be optional
self.set_required_output(TASK_OUTPUT_EXPIRED, False)

# In Cylc 7 back compat mode, make all success outputs required.
if cylc.flow.flags.cylc7_back_compat:
for output in [
Expand Down
1 change: 0 additions & 1 deletion tests/integration/scripts/test_completion_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ async def test_list_prereqs_and_outputs(flow, scheduler, start):
# list outputs (b1)
assert await _complete_cylc('cylc', 'set', b1.id, '--out', '') == {
# regular task outputs
'expired',
'failed',
'started',
'submit-failed',
Expand Down
7 changes: 7 additions & 0 deletions tests/integration/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,13 @@ def test_parse_special_tasks_families(flow, scheduler, validate, section):
with pytest.raises(WorkflowConfigError) as exc_ctx:
config = validate(id_)
assert 'external triggers must be used only once' in str(exc_ctx.value)

elif section == 'clock-expire':
with pytest.raises(WorkflowConfigError) as exc_ctx:
config = validate(id_)
assert (
'Clock-expire must be visible in the graph' in str(exc_ctx.value)
)
else:
config = validate(id_)
assert set(config.cfg['scheduling']['special tasks'][section]) == {
Expand Down
1 change: 0 additions & 1 deletion tests/integration/test_task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ async def test_run_job_cmd_no_hosts_error(

# killing the task should not result in an error...
schd.task_job_mgr.kill_task_jobs(
schd.workflow,
schd.pool.get_tasks()
)

Expand Down
4 changes: 3 additions & 1 deletion tests/integration/tui/screenshots/test_show.success.html
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">prerequisites: (None) </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">outputs: (&#x27;-&#x27;: not completed) </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> - 1/foo expired </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">outputs: ('-': not completed) </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> - 1/foo submitted </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> - 1/foo submit-failed </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> - 1/foo started </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span>
Expand All @@ -36,6 +37,7 @@
<span style="color:#000000;background:#e5e5e5"> </span>
<span style="color:#000000;background:#e5e5e5"> </span>
<span style="color:#000000;background:#e5e5e5"> </span>
<span style="color:#000000;background:#e5e5e5"> </span>
<span style="color:#ffffff;background:#0000ee">quit: </span><span style="color:#00ffff;background:#0000ee">q</span><span style="color:#ffffff;background:#0000ee"> help: </span><span style="color:#00ffff;background:#0000ee">h</span><span style="color:#ffffff;background:#0000ee"> context: </span><span style="color:#00ffff;background:#0000ee">enter</span><span style="color:#ffffff;background:#0000ee"> tree: </span><span style="color:#00ffff;background:#0000ee">- ← + → </span><span style="color:#ffffff;background:#0000ee"> navigation: </span><span style="color:#00ffff;background:#0000ee">↑ ↓ ↥ ↧ Home End </span><span style="color:#ffffff;background:#0000ee"> </span>
<span style="color:#ffffff;background:#0000ee">filter tasks: </span><span style="color:#00ffff;background:#0000ee">T f s r R </span><span style="color:#ffffff;background:#0000ee"> filter workflows: </span><span style="color:#00ffff;background:#0000ee">W E p </span><span style="color:#ffffff;background:#0000ee"> </span>
</pre>
</pre>
Loading