Skip to content

Commit

Permalink
[3.12] pythongh-114440: Close writer pipe in multiprocessing.Queue, n…
Browse files Browse the repository at this point in the history
…ot concurrent.futures (pythonGH-114489)

This was left out of the 3.12 backport for three related issues:
- pythongh-107219 (which adds `self.call_queue._writer.close()` to `_ExecutorManagerThread` in `concurrent.futures`)
- pythongh-109370 (which changes this to be only called on Windows)
- pythongh-109047 (which moves the call to `multiprocessing.Queue`'s `_terminate_broken`)

Without this change, ProcessPoolExecutor sometimes hangs on Windows
when a worker process is terminated.

Co-authored-by: Victor Stinner <[email protected]>
Co-authored-by: Serhiy Storchaka <[email protected]>
  • Loading branch information
3 people authored and naveen521kk committed Jul 11, 2024
1 parent 69d924a commit 701b465
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 5 deletions.
5 changes: 0 additions & 5 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,11 +524,6 @@ def _terminate_broken(self, cause):

self.call_queue._terminate_broken()

# gh-107219: Close the connection writer which can unblock
# Queue._feed() if it was stuck in send_bytes().
if sys.platform == 'win32':
self.call_queue._writer.close()

# clean up resources
self._join_executor_internals(broken=True)

Expand Down
5 changes: 5 additions & 0 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ def _terminate_broken(self):
# gh-94777: Prevent queue writing to a pipe which is no longer read.
self._reader.close()

# gh-107219: Close the connection writer which can unblock
# Queue._feed() if it was stuck in send_bytes().
if sys.platform == 'win32':
self._writer.close()

self.close()
self.join_thread()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
On Windows, closing the connection writer when cleaning up a broken
:class:`multiprocessing.Queue` queue is now done for all queues, rather than
only in :mod:`concurrent.futures` manager thread.
This can prevent a deadlock when a ``multiprocessing`` worker process terminates
without cleaning up.
This completes the backport of patches by Victor Stinner and Serhiy Storchaka.

0 comments on commit 701b465

Please sign in to comment.