Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-123899: Add timeout argument for notify_all/notify method in Condition class in threading/multiprocessing module #125578

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions Doc/library/threading.rst
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ item to the buffer only needs to wake up one consumer thread.

.. versionadded:: 3.2

.. method:: notify(n=1)
.. method:: notify(n=1, timeout=None)

By default, wake up one thread waiting on this condition, if any. If the
calling thread has not acquired the lock when this method is called, a
Expand All @@ -852,7 +852,10 @@ item to the buffer only needs to wake up one consumer thread.
call until it can reacquire the lock. Since :meth:`notify` does not
release the lock, its caller should.

.. method:: notify_all()
.. versionchanged:: 3.14
The *timeout* parameter is new.

.. method:: notify_all(timeout=None)

Wake up all threads waiting on this condition. This method acts like
:meth:`notify`, but wakes up all waiting threads instead of one. If the
Expand All @@ -861,6 +864,9 @@ item to the buffer only needs to wake up one consumer thread.

The method ``notifyAll`` is a deprecated alias for this method.

.. versionchanged:: 3.14
The *timeout* parameter is new.


.. _semaphore-objects:

Expand Down
17 changes: 8 additions & 9 deletions Lib/multiprocessing/synchronize.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,34 +274,33 @@ def wait(self, timeout=None):
for i in range(count):
self._lock.acquire()

def notify(self, n=1):
def notify(self, n=1, timeout=None):
assert self._lock._semlock._is_mine(), 'lock is not owned'
assert not self._wait_semaphore.acquire(
False), ('notify: Should not have been able to acquire '
+ '_wait_semaphore')

# to take account of timeouts since last notify*() we subtract
# woken_count from sleeping_count and rezero woken_count
while self._woken_count.acquire(False):
res = self._sleeping_count.acquire(False)
while self._woken_count.acquire(False, timeout=timeout):
res = self._sleeping_count.acquire(False, timeout=timeout)
assert res, ('notify: Bug in sleeping_count.acquire'
+ '- res should not be False')

sleepers = 0
while sleepers < n and self._sleeping_count.acquire(False):
while sleepers < n and self._sleeping_count.acquire(False, timeout=timeout):
self._wait_semaphore.release() # wake up one sleeper
sleepers += 1

if sleepers:
for i in range(sleepers):
self._woken_count.acquire() # wait for a sleeper to wake

self._woken_count.acquire(timeout=timeout) # wait for a sleeper to wake
# rezero wait_semaphore in case some timeouts just happened
while self._wait_semaphore.acquire(False):
while self._wait_semaphore.acquire(False, timeout=timeout):
pass

def notify_all(self):
self.notify(n=sys.maxsize)
def notify_all(self, timeout=None):
self.notify(n=sys.maxsize, timeout=timeout)

def wait_for(self, predicate, timeout=None):
result = predicate()
Expand Down
12 changes: 6 additions & 6 deletions Lib/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ def wait_for(self, predicate, timeout=None):
result = predicate()
return result

def notify(self, n=1):
def notify(self, n=1, timeout=None):
"""Wake up one or more threads waiting on this condition, if any.

If the calling thread has not acquired the lock when this method is
Expand Down Expand Up @@ -425,14 +425,14 @@ def notify(self, n=1):
except ValueError:
pass

def notify_all(self):
def notify_all(self, timeout=None):
"""Wake up all threads waiting on this condition.

If the calling thread has not acquired the lock when this method
is called, a RuntimeError is raised.

"""
self.notify(len(self._waiters))
self.notify(len(self._waiters), timeout=timeout)

def notifyAll(self):
"""Wake up all threads waiting on this condition.
Expand Down Expand Up @@ -723,7 +723,7 @@ def wait(self, timeout=None):
try:
if index + 1 == self._parties:
# We release the barrier
self._release()
self._release(timeout=timeout)
else:
# We wait until someone releases us
self._wait(timeout)
Expand All @@ -746,13 +746,13 @@ def _enter(self):

# Optionally run the 'action' and release the threads waiting
# in the barrier.
def _release(self):
def _release(self, timeout=None):
try:
if self._action:
self._action()
# enter draining state
self._state = 1
self._cond.notify_all()
self._cond.notify_all(timeout=timeout)
except:
#an exception during the _action handler. Break and reraise
self._break()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Add timeout argument for notify_all/notify method in Condition class in
threading/multiprocessing module
Loading