Skip to content

Commit

Permalink
Tweak code comment.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Jan 25, 2024
1 parent a454a06 commit b7b4019
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,8 +876,12 @@ def _load_task_run_times(self, row_idx, row):
return

def process_queued_task_messages(self) -> None:
"""Handle incoming task messages for each task proxy."""
"""Process incoming task messages for each task proxy.
"""
messages: 'Dict[str, List[Tuple[Optional[int], TaskMsg]]]' = {}

# Retrieve queued messages
while self.message_queue.qsize():
try:
task_msg = self.message_queue.get(block=False)
Expand All @@ -893,9 +897,8 @@ def process_queued_task_messages(self) -> None:
messages[task_id].append(
(job, task_msg)
)
# Note on to_poll_tasks: If an incoming message is going to cause a
# reverse change to task state, it is desirable to confirm this by
# polling.

# Poll tasks for which messages caused a backward state change.
to_poll_tasks = []
for itask in self.pool.get_tasks():
message_items = messages.get(itask.identity)
Expand All @@ -913,8 +916,10 @@ def process_queued_task_messages(self) -> None:
to_poll_tasks.append(itask)
if to_poll_tasks:
self.task_job_mgr.poll_task_jobs(self.workflow, to_poll_tasks)
# Remaining messages don't have a receiving task in the pool.
# E.g., after manually setting a running task to finished.

# Remaining unprocessed messages have no corresponding task proxy.
# For example, if I manually set a running task to succeeded, the
# proxy can be removed, but the orphaned job still sends messages.
for _id, tms in messages.items():
warn = "Undeliverable task messages received and ignored:"
for _, msg in tms:
Expand Down

0 comments on commit b7b4019

Please sign in to comment.