Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consistent estimation of task duration between stealing, adaptive and occupancy calculation #9000

Open
wants to merge 9 commits into
base: main
Choose a base branch
from

Conversation

hendrikmakait
Copy link
Member

We've noticed that stealing could ping pong between two workers if tasks with long execution durations but no average duration were in processing. This PR fixes that.

  • Tests added / passed
  • Passes pre-commit run --all-files

Copy link
Contributor

github-actions bot commented Feb 3, 2025

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    27 files  ± 0      27 suites  ±0   11h 33m 27s ⏱️ + 2m 11s
 4 117 tests + 1   4 000 ✅ ± 0    111 💤 ±0  6 ❌ +1 
51 628 runs  +13  49 324 ✅ +14  2 296 💤 ±0  8 ❌  - 1 

For more details on these failures, see this check.

Results for commit 2bd934e. ± Comparison against base commit 5589049.

This pull request removes 1 and adds 2 tests. Note that renamed tests count towards both.
distributed.tests.test_scheduler ‑ test_get_task_duration
distributed.tests.test_scheduler ‑ test_get_prefix_duration
distributed.tests.test_steal ‑ test_do_not_ping_pong

♻️ This comment has been updated with latest results.

@hendrikmakait hendrikmakait changed the title Consistent estimation of task duration between stealing and occupancy calculation Consistent estimation of task duration between stealing, adaptive and occupancy calculation Feb 3, 2025
@@ -2536,13 +2545,6 @@ def _transition_processing_memory(
action=startstop["action"],
)

s = self.unknown_durations.pop(ts.prefix.name, set())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've moved this into the stealing plugin.

@@ -1931,22 +1925,37 @@ def total_occupancy(self) -> float:
self._network_occ_global,
)

def _get_prefix_duration(self, prefix: TaskPrefix) -> float:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the single source of truth for the duration estimation.

@@ -1674,9 +1674,6 @@ class SchedulerState:
#: Subset of tasks that exist in memory on more than one worker
replicated_tasks: set[TaskState]

#: Tasks with unknown duration, grouped by prefix
#: {task prefix: {ts, ts, ...}}
unknown_durations: dict[str, set[TaskState]]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been moved into stealing.

@@ -236,8 +236,6 @@ class TaskState:
#: The next state of the task. It is not None iff :attr:`state` == resumed.
next: Literal["fetch", "waiting", None] = None

#: Expected duration of the task
duration: float | None = None
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't used anywhere.

queued_occupancy += self.UNKNOWN_TASK_DURATION
else:
queued_occupancy += ts.prefix.duration_average
queued_occupancy += self._get_prefix_duration(ts.prefix)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a test for this, but the old version was definitely inconsistent.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant