Skip to content

Commit

Permalink
pythongh-109047: concurrent.futures _ExecutorManagerThread catches Py…
Browse files Browse the repository at this point in the history
…thonFinalizationError

_ExecutorManagerThread of concurrent.futures now catches
PythonFinalizationError: call terminate_broken(). The exception
occurs when adding an item to the "call_queue" creates a new thread
while Python is being finalized.

Add test_python_finalization_error() to test_concurrent_futures.

Changes:

* _ExecutorManagerThread.terminate_broken() no longer calls
  shutdown_workers() since the queue is no longer working anymore
  (read and write ends of the queue pipe are closed).
* multiprocessing.Queue:

  * Add _terminate_broken() method.
  * _start_thread() starts _thread to None on exception to prevent
    leaking "dangling threads" even if the thread was not started
    yet.

* wait_result_broken_or_wakeup() now uses the short form of
  traceback.format_exception().
  • Loading branch information
vstinner committed Sep 25, 2023
1 parent 6d969f3 commit 3beed92
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 17 deletions.
27 changes: 15 additions & 12 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,12 @@ def run(self):
# Main loop for the executor manager thread.

while True:
self.add_call_item_to_queue()
try:
self.add_call_item_to_queue()
except PythonFinalizationError as exc:
cause = format_exception(exc)
self.terminate_broken(cause)
return

result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

Expand Down Expand Up @@ -425,8 +430,8 @@ def wait_result_broken_or_wakeup(self):
try:
result_item = result_reader.recv()
is_broken = False
except BaseException as e:
cause = format_exception(type(e), e, e.__traceback__)
except BaseException as exc:
cause = format_exception(exc)

elif wakeup_reader in ready:
is_broken = False
Expand Down Expand Up @@ -515,16 +520,10 @@ def terminate_broken(self, cause):
for p in self.processes.values():
p.terminate()

# Prevent queue writing to a pipe which is no longer read.
# https://github.com/python/cpython/issues/94777
self.call_queue._reader.close()

# gh-107219: Close the connection writer which can unblock
# Queue._feed() if it was stuck in send_bytes().
self.call_queue._writer.close()
self.call_queue._terminate_broken()

# clean up resources
self.join_executor_internals()
self._join_executor_internals(broken=True)

def flag_executor_shutting_down(self):
# Flag the executor as shutting down and cancel remaining tasks if
Expand Down Expand Up @@ -567,7 +566,11 @@ def shutdown_workers(self):
break

def join_executor_internals(self):
self.shutdown_workers()
self._join_executor_internals()

def _join_executor_internals(self, broken=False):
if not broken:
self.shutdown_workers()
# Release the queue's resources as soon as possible.
self.call_queue.close()
self.call_queue.join_thread()
Expand Down
27 changes: 22 additions & 5 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,19 @@ def cancel_join_thread(self):
except AttributeError:
pass

def _terminate_broken(self):
# Close a Queue on error.

# gh-94777: Prevent queue writing to a pipe which is no longer read.
self._reader.close()

# gh-107219: Close the connection writer which can unblock
# Queue._feed() if it was stuck in send_bytes().
self._writer.close()

self.close()
self.join_thread()

def _start_thread(self):
debug('Queue._start_thread()')

Expand All @@ -169,13 +182,17 @@ def _start_thread(self):
self._wlock, self._reader.close, self._writer.close,
self._ignore_epipe, self._on_queue_feeder_error,
self._sem),
name='QueueFeederThread'
name='QueueFeederThread',
daemon=True,
)
self._thread.daemon = True

debug('doing self._thread.start()')
self._thread.start()
debug('... done self._thread.start()')
try:
debug('doing self._thread.start()')
self._thread.start()
debug('... done self._thread.start()')
except:
self._thread = None
raise

if not self._joincancelled:
self._jointhread = Finalize(
Expand Down
22 changes: 22 additions & 0 deletions Lib/test/test_concurrent_futures/test_process_pool.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import threading
import time
import unittest
from concurrent import futures
Expand Down Expand Up @@ -187,6 +188,27 @@ def test_max_tasks_early_shutdown(self):
for i, future in enumerate(futures):
self.assertEqual(future.result(), mul(i, i))

def test_python_finalization_error(self):
context = self.get_context()

# Create _ExecutorManagerThread, but block QueueFeederThread
orig_start_new_thread = threading._start_new_thread
nthread = 0
def mock_start_new_thread(func, *args):
nonlocal nthread
if nthread >= 1:
raise PythonFinalizationError()
nthread += 1
return orig_start_new_thread(func, *args)

with support.swap_attr(threading, '_start_new_thread',
mock_start_new_thread):
executor = self.executor_type(max_workers=2, mp_context=context)
with executor:
with self.assertRaises(BrokenProcessPool):
list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown()


create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,
Expand Down

0 comments on commit 3beed92

Please sign in to comment.