Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log job failure even when there are retries configured #6169

Open
wants to merge 8 commits into
base: 8.3.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6169.fix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure that job submit/failure is logged, even when retries are planned.
31 changes: 24 additions & 7 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ def process_message(
# Already failed.
return True
if self._process_message_failed(
itask, event_time, self.JOB_FAILED, forced
itask, event_time, self.JOB_FAILED, forced, message
wxtim marked this conversation as resolved.
Show resolved Hide resolved
):
self.spawn_children(itask, TASK_OUTPUT_FAILED)

Expand Down Expand Up @@ -795,7 +795,7 @@ def process_message(
self.workflow_db_mgr.put_update_task_jobs(
itask, {"run_signal": signal})
if self._process_message_failed(
itask, event_time, self.JOB_FAILED, forced
itask, event_time, self.JOB_FAILED, forced, message
):
wxtim marked this conversation as resolved.
Show resolved Hide resolved
self.spawn_children(itask, TASK_OUTPUT_FAILED)

Expand All @@ -812,7 +812,7 @@ def process_message(
self.workflow_db_mgr.put_update_task_jobs(
itask, {"run_signal": aborted_with})
if self._process_message_failed(
itask, event_time, aborted_with, forced
itask, event_time, aborted_with, forced, message
):
self.spawn_children(itask, TASK_OUTPUT_FAILED)

Expand Down Expand Up @@ -928,11 +928,15 @@ def _process_message_check(
return False

severity_lvl: int = LOG_LEVELS.get(severity, INFO)
# Don't log submit/failure messages here:
if flag != self.FLAG_POLLED and message in {
self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR'
}:
return True
# Demote log level to DEBUG if this is a message that duplicates what
# gets logged by itask state change anyway (and not manual poll)
if severity_lvl > DEBUG and flag != self.FLAG_POLLED and message in {
self.EVENT_SUBMITTED, self.EVENT_STARTED, self.EVENT_SUCCEEDED,
self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR'
}:
severity_lvl = DEBUG
LOG.log(severity_lvl, f"[{itask}] {flag}{message}{timestamp}")
Expand Down Expand Up @@ -1297,10 +1301,17 @@ def _retry_task(self, itask, wallclock_time, submit_retry=False):
if itask.state_reset(TASK_STATUS_WAITING):
self.data_store_mgr.delta_task_state(itask)

def _process_message_failed(self, itask, event_time, message, forced):
def _process_message_failed(
self, itask, event_time, message, forced, full_message
):
"""Helper for process_message, handle a failed message.

Return True if no retries (hence go to the failed state).

Args:
full_message:
If we have retries lined up we still tell users what
happened to cause the this attempt to fail.
"""
no_retries = False
if event_time is None:
Expand All @@ -1327,12 +1338,18 @@ def _process_message_failed(self, itask, event_time, message, forced):
self.data_store_mgr.delta_task_output(
itask, TASK_OUTPUT_FAILED)
self.data_store_mgr.delta_task_state(itask)
LOG.error(
f'[{itask}] {full_message or self.EVENT_FAILED}'
)
else:
# There is an execution retry lined up.
timer = itask.try_timers[TimerFlags.EXECUTION_RETRY]
self._retry_task(itask, timer.timeout)
delay_msg = f"retrying in {timer.delay_timeout_as_str()}"
LOG.warning(f"[{itask}] {delay_msg}")
LOG.warning(
f'[{itask}] {full_message or self.EVENT_FAILED} - '
f'{delay_msg}'
)
msg = f"{self.JOB_FAILED}, {delay_msg}"
self.setup_event_handlers(itask, self.EVENT_RETRY, msg)
self._reset_job_timers(itask)
Expand Down Expand Up @@ -1404,7 +1421,6 @@ def _process_message_submit_failed(
Return True if no retries (hence go to the submit-failed state).
"""
no_retries = False
LOG.critical(f"[{itask}] {self.EVENT_SUBMIT_FAILED}")
if event_time is None:
event_time = get_current_time_string()
self.workflow_db_mgr.put_update_task_jobs(itask, {
Expand All @@ -1430,6 +1446,7 @@ def _process_message_submit_failed(
self.data_store_mgr.delta_task_output(
itask, TASK_OUTPUT_SUBMIT_FAILED)
self.data_store_mgr.delta_task_state(itask)
LOG.error(f"[{itask}] {self.EVENT_SUBMIT_FAILED}")
else:
# There is a submission retry lined up.
timer = itask.try_timers[TimerFlags.SUBMISSION_RETRY]
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/cylc-remove/00-simple/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
script = false
[[cleaner]]
script = """
cylc__job__poll_grep_workflow_log -E '1/b/01:running.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/b/01.* failed'
# Remove the unhandled failed task
cylc remove "$CYLC_WORKFLOW_ID//1/b"
# Remove waiting 1/c
Expand Down
4 changes: 2 additions & 2 deletions tests/functional/cylc-remove/02-cycling/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
[runtime]
[[remover]]
script = """
cylc__job__poll_grep_workflow_log -E '2020/bar/01:running.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '2021/baz/01:running.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '2020/bar/01.* failed'
cylc__job__poll_grep_workflow_log -E '2021/baz/01.* failed'
# Remove the two unhandled failed tasks.
cylc remove "$CYLC_WORKFLOW_ID//*/ba*:failed"
# Remove the two unsatisfied waiting tasks.
Expand Down
6 changes: 3 additions & 3 deletions tests/functional/cylc-trigger/02-filter-failed/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
[[fixer]]
script = """
cylc__job__wait_cylc_message_started
cylc__job__poll_grep_workflow_log -E '1/fixable1/01:running.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/fixable2/01:running.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/fixable3/01:running.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '\[1/fixable1/01:failed\] failed/ERR'
cylc__job__poll_grep_workflow_log -E '\[1/fixable2/01:failed\] failed/ERR'
cylc__job__poll_grep_workflow_log -E '\[1/fixable3/01:failed\] failed/ERR'
cylc trigger "${CYLC_WORKFLOW_ID}//1/fixable*"
"""
[[Z]]
Expand Down
10 changes: 5 additions & 5 deletions tests/functional/cylc-trigger/04-filter-names/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
[[fixer]]
script = """
cylc__job__wait_cylc_message_started
cylc__job__poll_grep_workflow_log -E '1/fixable-1a/01.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-1b/01.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-2a/01.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-2b/01.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-3/01.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-1a/01.* failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-1b/01.* failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-2a/01.* failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-2b/01.* failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-3/01.* failed'
cylc trigger "${CYLC_WORKFLOW_ID}//" \
'//1/FIXABLE-1' '//1/fixable-2*' '//1/fixable-3'
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/hold-release/11-retrying/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ t-retry-able => t-analyse
[[t-hold-release]]
script = """
cylc__job__poll_grep_workflow_log -E \
'1/t-retry-able/01:running.* \(received\)failed'
'\[1/t-retry-able:waiting\] failed/ERR'

cylc__job__poll_grep_workflow_log -E \
'1/t-retry-able/01:running.* => waiting'
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/reload/10-runahead.t
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ run_fail "${TEST_NAME}" cylc play --debug --no-detach "${WORKFLOW_NAME}"
#-------------------------------------------------------------------------------
TEST_NAME=${TEST_NAME_BASE}-check-fail
DB_FILE="$RUN_DIR/${WORKFLOW_NAME}/log/db"
QUERY='SELECT COUNT(*) FROM task_states WHERE status == "failed"'
QUERY="SELECT COUNT(*) FROM task_states WHERE status == 'failed'"
cmp_ok <(sqlite3 "$DB_FILE" "$QUERY") <<< "4"
#-------------------------------------------------------------------------------
purge
4 changes: 2 additions & 2 deletions tests/functional/reload/25-xtriggers.t
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
[[reload]]
script = """
# wait for "broken" to fail
cylc__job__poll_grep_workflow_log -E '1/broken/01.* \(received\)failed/ERR'
cylc__job__poll_grep_workflow_log -E '1/broken.*failed/ERR'
# fix "broken" to allow it to pass
sed -i 's/false/true/' "${CYLC_WORKFLOW_RUN_DIR}/flow.cylc"
# reload the workflow
Expand All @@ -63,7 +63,7 @@ workflow_run_ok "${TEST_NAME_BASE}-run" cylc play "${WORKFLOW_NAME}" --no-detach
log_scan "${TEST_NAME_BASE}-scan" \
"$(cylc cat-log -m p "${WORKFLOW_NAME}")" \
1 1 \
'1/broken.* (received)failed/ERR'
'1/broken.*failed/ERR'

log_scan "${TEST_NAME_BASE}-scan" \
"$(cylc cat-log -m p "${WORKFLOW_NAME}")" 1 1 \
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/reload/runahead/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
script = true
[[reloader]]
script = """
cylc__job__poll_grep_workflow_log -E "${CYLC_TASK_CYCLE_POINT}/foo/01:running.*\(received\)failed"
cylc__job__poll_grep_workflow_log -E "${CYLC_TASK_CYCLE_POINT}/foo/01:running.*failed"
perl -pi -e 's/(runahead limit = )P1( # marker)/\1 P3\2/' $CYLC_WORKFLOW_RUN_DIR/flow.cylc
cylc reload $CYLC_WORKFLOW_ID
"""
2 changes: 1 addition & 1 deletion tests/functional/spawn-on-demand/10-retrigger/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"""
[[triggerer]]
script = """
cylc__job__poll_grep_workflow_log -E '1/oops/01:running.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/oops/01.* failed'
cylc trigger "${CYLC_WORKFLOW_ID}//1/oops"
"""
[[foo, bar]]
2 changes: 1 addition & 1 deletion tests/functional/triggering/19-and-suicide/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
[[t0]]
# https://github.com/cylc/cylc-flow/issues/2655
# "1/t2" should not suicide on "1/t1:failed"
script = cylc__job__poll_grep_workflow_log -E '1/t1.* \(received\)failed'
script = cylc__job__poll_grep_workflow_log -E '1/t1.* failed'
[[t1]]
script = false
[[t2]]
Expand Down
65 changes: 55 additions & 10 deletions tests/integration/test_task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from itertools import product
import logging
from typing import Any as Fixture

from cylc.flow.task_events_mgr import TaskJobLogsRetrieveContext
from cylc.flow.scheduler import Scheduler
from cylc.flow.data_store_mgr import (
JOBS,
TASK_STATUSES_ORDERED,
TASK_STATUS_WAITING,
TASK_STATUS_SUBMIT_FAILED,
)


Expand Down Expand Up @@ -79,17 +76,22 @@ async def test__insert_task_job(flow, one_conf, scheduler, start, validate):
with correct submit number.
"""
conf = {
'scheduling': {'graph': {'R1': 'rhenas'}},
'runtime': {'rhenas': {'simulation': {
'fail cycle points': '1',
'fail try 1 only': False,
}}}}
"scheduling": {"graph": {"R1": "rhenas"}},
"runtime": {
"rhenas": {
"simulation": {
"fail cycle points": "1",
"fail try 1 only": False,
}
}
},
}
id_ = flow(conf)
schd = scheduler(id_)
async with start(schd):
# Set task to running:
itask = schd.pool.get_tasks()[0]
itask.state.status = 'running'
itask = schd.pool.get_tasks()[0]
itask.state.status = "running"
itask.submit_num += 1

# Not run _insert_task_job yet:
Expand Down Expand Up @@ -170,3 +172,46 @@ async def test__always_insert_task_job(
'1/broken/01': 'submit-failed',
'1/broken2/01': 'submit-failed'
}


async def test__process_message_failed_with_retry(one, start):
"""Log job failure, even if a retry is scheduled.

See: https://github.com/cylc/cylc-flow/pull/6169

"""

async with start(one) as LOG:
fail_once = one.pool.get_tasks()[0]
# Add retry timers:
one.task_job_mgr._set_retry_timers(
fail_once, {
'execution retry delays': [1],
'submission retry delays': [1]
})

# Process submit failed message with and without retries:
one.task_events_mgr._process_message_submit_failed(
fail_once, None, 1, False)
last_record = LOG.records[-1]
assert last_record.levelno == logging.WARNING
assert '1/one:waiting(queued)' in last_record.message

one.task_events_mgr._process_message_submit_failed(
fail_once, None, 2, False)
last_record = LOG.records[-1]
assert last_record.levelno == logging.ERROR
assert 'submission failed' in last_record.message

# Process failed message with and without retries:
one.task_events_mgr._process_message_failed(
fail_once, None, 'failed', False, 'failed/OOK')
last_record = LOG.records[-1]
assert last_record.levelno == logging.WARNING
assert 'failed/OOK' in last_record.message

one.task_events_mgr._process_message_failed(
fail_once, None, 'failed', False, 'failed/OOK')
last_record = LOG.records[-1]
assert last_record.levelno == logging.ERROR
assert 'failed/OOK' in last_record.message
Loading