Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optional outputs extension #6046

Merged
merged 15 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changes.d/6046.break.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
The `submit-fail` and `expire` task outputs must now be
[optional](https://cylc.github.io/cylc-doc/stable/html/glossary.html#term-optional-output)
and can no longer be
[required](https://cylc.github.io/cylc-doc/stable/html/glossary.html#term-required-output).
4 changes: 4 additions & 0 deletions changes.d/6046.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
The condition that Cylc uses to evaluate task output completion can now be
customized in the `[runtime]` section with the new `completion` configuration.
This provides a more advanced way to check that tasks generate their required
outputs when run.
136 changes: 136 additions & 0 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,142 @@ def get_script_common_text(this: str, example: Optional[str] = None):
can be explicitly configured to provide or override default
settings for all tasks in the workflow.
'''):
Conf('completion', VDR.V_STRING, desc='''
Define the condition for task output completion.

The completion condition is evaluated when a task reaches
a final state - i.e. once it finished executing (``succeeded``
or ``failed``) or it ``submit-failed``, or ``expired``.
It is a validation check which confirms that the
task has generated the outputs it was expected to.

If the task fails this check its outputs are considered
:term:`incomplete` and a warning will be raised alerting you
that something has gone wrong which requires investigation.

.. note::

An event hook for this warning will follow in a future
release of Cylc.

By default, the completion condition ensures that all required
outputs, i.e. outputs which appear in the graph but are not
marked as optional with the ``?`` character, are completed.

E.g., in this example, the task ``foo`` must generate the
required outputs ``succeeded`` and ``x`` and it may or may not
generate the optional output ``y``:

.. code-block:: cylc-graph

foo => bar
foo:x => x
foo:y? => y
hjoliver marked this conversation as resolved.
Show resolved Hide resolved

The default completion condition would be this:

.. code-block:: python

# the task must succeed and generate the custom output "x"
succeeded and x

You can override this default to suit your needs. E.g., in this
example, the task ``foo`` has three optional outputs, ``x``,
``y`` and ``z``:

.. code-block:: cylc-graph

foo:x? => x
foo:y? => y
foo:z? => z
x | y | z => bar

Because all three of these outputs are optional, if none of
them are generated, the task's outputs will still be
considered complete.

If you wanted to require that at least one of these outputs is
generated you can configure the completion condition like so:

.. code-block:: python

# the task must succeed and generate at least one of the
# outputs "x" or "y" or "z":
succeeded and (x or y or z)

.. note::

For the completion expression, hyphens in task outputs
must be replaced with underscores to allow evaluation by
Python, e.g.:

.. code-block:: cylc

[runtime]
[[foo]]
completion = succeeded and my_output # underscore
[[[outputs]]]
my-output = 'my custom task output' # hyphen

.. note::

In some cases the ``succeeded`` output might not explicitly
appear in the graph, e.g:

.. code-block:: cylc-graph

foo:x? => x

In these cases succeess is presumed to be required unless
explicitly stated otherwise, either in the graph:

.. code-block:: cylc-graph

foo?
foo:x? => x

Or in the completion expression:
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved

.. code-block:: cylc

completion = x # no reference to succeeded here


.. hint::

If task outputs are optional in the graph they must also
be optional in the completion condition and vice versa.

.. code-block:: cylc

[scheduling]
[[graph]]
R1 = """
# ERROR: this should be "a? => b"
a => b
"""
[runtime]
[[a]]
# this completion condition implies that the
# succeeded output is optional
completion = succeeded or failed
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved

.. rubric:: Examples

``succeeded``
The task must succeed.
``succeeded or (failed and my_error)``
The task can fail, but only if it also yields the custom
output ``my_error``.
Comment on lines +1131 to +1133
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The output could be either my_error or my-error couldn't it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

``succeeded and (x or y or z)``
The task must succeed and yield at least one of the
custom outputs, x, y or z.
``(a and b) or (c and d)``
One pair of these outputs must be yielded for the task
to be complete.

.. versionadded:: 8.3.0
''')
Conf('platform', VDR.V_STRING, desc='''
The name of a compute resource defined in
:cylc:conf:`global.cylc[platforms]` or
Expand Down
212 changes: 211 additions & 1 deletion cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,13 @@
)
from cylc.flow.task_id import TaskID
from cylc.flow.task_outputs import (
TASK_OUTPUT_FAILED,
TASK_OUTPUT_FINISHED,
TASK_OUTPUT_SUCCEEDED,
TaskOutputs
TaskOutputs,
get_completion_expression,
get_optional_outputs,
get_trigger_completion_variable_maps,
)
from cylc.flow.task_trigger import TaskTrigger, Dependency
from cylc.flow.taskdef import TaskDef
Expand Down Expand Up @@ -520,6 +525,8 @@
self.load_graph()
self.mem_log("config.py: after load_graph()")

self._set_completion_expressions()

self.process_runahead_limit()

run_mode = self.run_mode()
Expand Down Expand Up @@ -1008,6 +1015,209 @@
)
LOG.warning(msg)

def _set_completion_expressions(self):
"""Sets and checks completion expressions for each task.

If a task does not have a user-defined completion expression, then set
one according to the default rules.

If a task does have a used-defined completion expression, then ensure
it is consistent with the use of outputs in the graph.
"""
for name, taskdef in self.taskdefs.items():
expr = taskdef.rtconfig['completion']
if expr:
# check the user-defined expression
self._check_completion_expression(name, expr)
else:
# derive a completion expression for this taskdef
expr = get_completion_expression(taskdef)

if name not in self.taskdefs:
# this is a family -> nothing more to do here
continue

Check warning on line 1038 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L1038

Added line #L1038 was not covered by tests
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved
hjoliver marked this conversation as resolved.
Show resolved Hide resolved

# update both the sparse and dense configs to make these values
# visible to "cylc config" to make the completion expression more
# transparent to users.
# NOTE: we have to update both because we are setting this value
# late on in the process after the dense copy has been made
self.pcfg.sparse.setdefault(
'runtime', {}
).setdefault(
name, {}
)['completion'] = expr
self.pcfg.dense['runtime'][name]['completion'] = expr

# update the task's runtime config to make this value visible to
# the data store
# NOTE: we have to do this because we are setting this value late
# on after the TaskDef has been created
taskdef.rtconfig['completion'] = expr

def _check_completion_expression(self, task_name: str, expr: str) -> None:
"""Checks a user-defined completion expression.

Args:
task_name:
The name of the task we are checking.
expr:
The completion expression as defined in the config.

"""
# check completion expressions are not being used in compat mode
if cylc.flow.flags.cylc7_back_compat:
raise WorkflowConfigError(

Check warning on line 1070 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L1070

Added line #L1070 was not covered by tests
'[runtime][<namespace>]completion cannot be used'
' in Cylc 7 compatibility mode.'
)

# check for invalid triggers in the expression
if 'submit-failed' in expr:
raise WorkflowConfigError(

Check warning on line 1077 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L1077

Added line #L1077 was not covered by tests
f'Error in [runtime][{task_name}]completion:'
f'\nUse "submit_failed" rather than "submit-failed"'
' in completion expressions.'
)
elif '-' in expr:
raise WorkflowConfigError(

Check warning on line 1083 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L1083

Added line #L1083 was not covered by tests
f'Error in [runtime][{task_name}]completion:'
f'\n {expr}'
'\nReplace hyphens with underscores in task outputs when'
' used in completion expressions.'
)

# get the outputs and completion expression for this task
try:
outputs = self.taskdefs[task_name].outputs
except KeyError:

Check warning on line 1093 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L1093

Added line #L1093 was not covered by tests
# this is a family -> we'll check integrity for each task that
# inherits from it
return

Check warning on line 1096 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L1096

Added line #L1096 was not covered by tests

(
trigger_to_completion_variable,
completion_variable_to_trigger,
) = get_trigger_completion_variable_maps(outputs.keys())

# get the optional/required outputs defined in the graph
graph_optionals = {
# completion_variable: is_optional
trigger_to_completion_variable[trigger]: (
None if is_required is None else not is_required
)
for trigger, (_, is_required)
in outputs.items()
}
if (
graph_optionals[TASK_OUTPUT_SUCCEEDED] is True
and graph_optionals[TASK_OUTPUT_FAILED] is None
):
# failed is implicitly optional if succeeded is optional
# https://github.com/cylc/cylc-flow/pull/6046#issuecomment-2059266086
graph_optionals[TASK_OUTPUT_FAILED] = True

Check warning on line 1118 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L1118

Added line #L1118 was not covered by tests

# get the optional/required outputs defined in the expression
try:
# this involves running the expression which also validates it
expression_optionals = get_optional_outputs(expr, outputs)
except NameError as exc:
# expression references an output which has not been registered
error = exc.args[0][5:]

Check warning on line 1126 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L1126

Added line #L1126 was not covered by tests

if f"'{TASK_OUTPUT_FINISHED}'" in error:
# the finished output cannot be used in completion expressions
# see proposal point 5::
# https://cylc.github.io/cylc-admin/proposal-optional-output-extension.html#proposal
raise WorkflowConfigError(

Check warning on line 1132 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L1132

Added line #L1132 was not covered by tests
f'Error in [runtime][{task_name}]completion:'
f'\n {expr}'
'\nThe "finished" output cannot be used in completion'
' expressions, use "succeeded or failed".'
)

raise WorkflowConfigError(

Check warning on line 1139 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L1139

Added line #L1139 was not covered by tests
# NOTE: str(exc) == "name 'x' is not defined" tested in
# tests/integration/test_optional_outputs.py
f'Error in [runtime][{task_name}]completion:'
f'\nInput {error}'
)
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
except Exception as exc: # includes InvalidCompletionExpression

Check warning on line 1145 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L1145

Added line #L1145 was not covered by tests
# expression contains non-whitelisted syntax or any other error in
# the expression e.g. SyntaxError
raise WorkflowConfigError(

Check warning on line 1148 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L1148

Added line #L1148 was not covered by tests
f'Error in [runtime][{task_name}]completion:'
f'\n{str(exc)}'
)

# ensure consistency between the graph and the completion expression
for compvar in (
{
*graph_optionals,
*expression_optionals
}
):
# is the output optional in the graph?
graph_opt = graph_optionals.get(compvar)
# is the output optional in the completion expression?
expr_opt = expression_optionals.get(compvar)

# True = is optional
# False = is required
# None = is not referenced

# graph_opt expr_opt
# True True ok
# True False not ok
# True None not ok [1]
# False True not ok [1]
# False False ok
# False None not ok
# None True ok
# None False ok
# None None ok

# [1] applies only to "submit-failed" and "expired"

trigger = completion_variable_to_trigger[compvar]

if graph_opt is True and expr_opt is False:
raise WorkflowConfigError(

Check warning on line 1185 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L1185

Added line #L1185 was not covered by tests
f'{task_name}:{trigger} is optional in the graph'
' (? symbol), but required in the completion'
f' expression:\n{expr}'
)

if graph_opt is False and expr_opt is None:
raise WorkflowConfigError(

Check warning on line 1192 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L1192

Added line #L1192 was not covered by tests
f'{task_name}:{trigger} is required in the graph,'
' but not referenced in the completion'
f' expression\n{expr}'
)

if (
graph_opt is True
and expr_opt is None
and compvar in {'submit_failed', 'expired'}
):
raise WorkflowConfigError(

Check warning on line 1203 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L1203

Added line #L1203 was not covered by tests
f'{task_name}:{trigger} is permitted in the graph'
' but is not referenced in the completion'
' expression (so is not permitted by it).'
f'\nTry: completion = "{expr} or {compvar}"'
)

if (
graph_opt is False
and expr_opt is True
and compvar not in {'submit_failed', 'expired'}
):
raise WorkflowConfigError(

Check warning on line 1215 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L1215

Added line #L1215 was not covered by tests
f'{task_name}:{trigger} is required in the graph,'
' but optional in the completion expression'
f'\n{expr}'
)

def _expand_name_list(self, orig_names):
"""Expand any parameters in lists of names."""
name_expander = NameExpander(self.parameters)
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ message PbRuntime {
optional string directives = 15;
optional string environment = 16;
optional string outputs = 17;
optional string completion = 18;
}


Expand Down
Loading
Loading