Skip to content

Commit

Permalink
pythongh-109047: concurrent.futures catches exc on add_call_item_to_q…
Browse files Browse the repository at this point in the history
…ueue()

concurrent.futures: The *executor manager thread* now catches
exceptions when adding an item to the *call queue*. During Python
finalization, creating a new thread can now raise RuntimeError. Catch
the exception and call terminate_broken() in this case.

Add test_python_finalization_error() to test_concurrent_futures.

concurrent.futures._ExecutorManagerThread changes:

* terminate_broken() no longer calls shutdown_workers() since the
  queue is no longer working anymore (read and write ends of the
  queue pipe are closed).
* terminate_broken() now terminates child processes.
* wait_result_broken_or_wakeup() now uses the short form (1 argument,
  not 3) of traceback.format_exception().
* _ExecutorManagerThread.terminate_broken() now holds shutdown_lock
  to prevent race conditons with ProcessPoolExecutor.submit().

multiprocessing.Queue changes:

* Add _terminate_broken() method.
* _start_thread() sets _thread to None on exception to prevent
  leaking "dangling threads" even if the thread was not started
  yet.
  • Loading branch information
vstinner committed Sep 29, 2023
1 parent 3439cb0 commit f94feb1
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 22 deletions.
47 changes: 30 additions & 17 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,14 @@ def run(self):
# Main loop for the executor manager thread.

while True:
self.add_call_item_to_queue()
# gh-109047: During Python finalization, self.call_queue.put()
# creation of a thread can fail with RuntimeError.
try:
self.add_call_item_to_queue()
except BaseException as exc:
cause = format_exception(exc)
self.terminate_broken(cause)
return

result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

Expand Down Expand Up @@ -425,8 +432,8 @@ def wait_result_broken_or_wakeup(self):
try:
result_item = result_reader.recv()
is_broken = False
except BaseException as e:
cause = format_exception(type(e), e, e.__traceback__)
except BaseException as exc:
cause = format_exception(exc)

elif wakeup_reader in ready:
is_broken = False
Expand Down Expand Up @@ -463,7 +470,7 @@ def is_shutting_down(self):
return (_global_shutdown or executor is None
or executor._shutdown_thread)

def terminate_broken(self, cause):
def _terminate_broken(self, cause):
# Terminate the executor because it is in a broken state. The cause
# argument can be used to display more information on the error that
# lead the executor into becoming broken.
Expand All @@ -490,7 +497,7 @@ def terminate_broken(self, cause):
for work_id, work_item in self.pending_work_items.items():
try:
work_item.future.set_exception(bpe)
except _base.InvalidStateError as exc:
except _base.InvalidStateError:
# set_exception() fails if the future is cancelled: ignore it.
# Trying to check if the future is cancelled before calling
# set_exception() would leave a race condition if the future is
Expand All @@ -505,17 +512,14 @@ def terminate_broken(self, cause):
for p in self.processes.values():
p.terminate()

# Prevent queue writing to a pipe which is no longer read.
# https://github.com/python/cpython/issues/94777
self.call_queue._reader.close()

# gh-107219: Close the connection writer which can unblock
# Queue._feed() if it was stuck in send_bytes().
if sys.platform == 'win32':
self.call_queue._writer.close()
self.call_queue._terminate_broken()

# clean up resources
self.join_executor_internals()
self._join_executor_internals(broken=True)

def terminate_broken(self, cause):
with self.shutdown_lock:
self._terminate_broken(cause)

def flag_executor_shutting_down(self):
# Flag the executor as shutting down and cancel remaining tasks if
Expand Down Expand Up @@ -558,15 +562,24 @@ def shutdown_workers(self):
break

def join_executor_internals(self):
self.shutdown_workers()
with self.shutdown_lock:
self._join_executor_internals()

def _join_executor_internals(self, broken=False):
# If broken, call_queue was closed and so can no longer be used.
if not broken:
self.shutdown_workers()

# Release the queue's resources as soon as possible.
self.call_queue.close()
self.call_queue.join_thread()
with self.shutdown_lock:
self.thread_wakeup.close()
self.thread_wakeup.close()

# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
for p in self.processes.values():
if broken:
p.terminate()
p.join()

def get_n_children_alive(self):
Expand Down
30 changes: 25 additions & 5 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,20 @@ def cancel_join_thread(self):
except AttributeError:
pass

def _terminate_broken(self):
# Close a Queue on error.

# gh-94777: Prevent queue writing to a pipe which is no longer read.
self._reader.close()

# gh-107219: Close the connection writer which can unblock
# Queue._feed() if it was stuck in send_bytes().
if sys.platform == 'win32':
self.call_queue._writer.close()

self.close()
self.join_thread()

def _start_thread(self):
debug('Queue._start_thread()')

Expand All @@ -169,13 +183,19 @@ def _start_thread(self):
self._wlock, self._reader.close, self._writer.close,
self._ignore_epipe, self._on_queue_feeder_error,
self._sem),
name='QueueFeederThread'
name='QueueFeederThread',
daemon=True,
)
self._thread.daemon = True

debug('doing self._thread.start()')
self._thread.start()
debug('... done self._thread.start()')
try:
debug('doing self._thread.start()')
self._thread.start()
debug('... done self._thread.start()')
except:
# gh-109047: During Python finalization, creating a thread
# can fail with RuntimeError.
self._thread = None
raise

if not self._joincancelled:
self._jointhread = Finalize(
Expand Down
29 changes: 29 additions & 0 deletions Lib/test/test_concurrent_futures/test_process_pool.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import threading
import time
import unittest
from concurrent import futures
Expand Down Expand Up @@ -187,6 +188,34 @@ def test_max_tasks_early_shutdown(self):
for i, future in enumerate(futures):
self.assertEqual(future.result(), mul(i, i))

def test_python_finalization_error(self):
# gh-109047: Catch RuntimeError on thread creation
# during Python finalization.

context = self.get_context()

# gh-109047: Mock the threading.start_new_thread() function to inject
# RuntimeError: simulate the error raised during Python finalization.
# Block the second creation: create _ExecutorManagerThread, but block
# QueueFeederThread.
orig_start_new_thread = threading._start_new_thread
nthread = 0
def mock_start_new_thread(func, *args):
nonlocal nthread
if nthread >= 1:
raise RuntimeError("can't create new thread at "
"interpreter shutdown")
nthread += 1
return orig_start_new_thread(func, *args)

with support.swap_attr(threading, '_start_new_thread',
mock_start_new_thread):
executor = self.executor_type(max_workers=2, mp_context=context)
with executor:
with self.assertRaises(BrokenProcessPool):
list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown()


create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
:mod:`concurrent.futures`: The *executor manager thread* now catches exceptions
when adding an item to the *call queue*. During Python finalization, creating a
new thread can now raise :exc:`RuntimeError`. Catch the exception and call
``terminate_broken()`` in this case. Patch by Victor Stinner.

0 comments on commit f94feb1

Please sign in to comment.