diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 73bdcbe86939918..5e6a347275fee5a 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -341,7 +341,14 @@ def run(self): # Main loop for the executor manager thread. while True: - self.add_call_item_to_queue() + # gh-109047: During Python finalization, if self.call_queue.put() + # tries to try to create a thread, it can fail with RuntimeError. + try: + self.add_call_item_to_queue() + except BaseException as exc: + cause = format_exception(exc) + self.terminate_broken(cause) + return result_item, is_broken, cause = self.wait_result_broken_or_wakeup() @@ -425,8 +432,8 @@ def wait_result_broken_or_wakeup(self): try: result_item = result_reader.recv() is_broken = False - except BaseException as e: - cause = format_exception(type(e), e, e.__traceback__) + except BaseException as exc: + cause = format_exception(exc) elif wakeup_reader in ready: is_broken = False @@ -463,7 +470,7 @@ def is_shutting_down(self): return (_global_shutdown or executor is None or executor._shutdown_thread) - def terminate_broken(self, cause): + def _terminate_broken_unlocked(self, cause): # Terminate the executor because it is in a broken state. The cause # argument can be used to display more information on the error that # lead the executor into becoming broken. @@ -490,7 +497,7 @@ def terminate_broken(self, cause): for work_id, work_item in self.pending_work_items.items(): try: work_item.future.set_exception(bpe) - except _base.InvalidStateError as exc: + except _base.InvalidStateError: # set_exception() fails if the future is cancelled: ignore it. # Trying to check if the future is cancelled before calling # set_exception() would leave a race condition if the future is @@ -505,17 +512,14 @@ def terminate_broken(self, cause): for p in self.processes.values(): p.terminate() - # Prevent queue writing to a pipe which is no longer read. - # https://github.com/python/cpython/issues/94777 - self.call_queue._reader.close() - - # gh-107219: Close the connection writer which can unblock - # Queue._feed() if it was stuck in send_bytes(). - if sys.platform == 'win32': - self.call_queue._writer.close() + self.call_queue._terminate_broken() # clean up resources - self.join_executor_internals() + self._join_executor_internals_unlocked(broken=True) + + def terminate_broken(self, cause): + with self.shutdown_lock: + self._terminate_broken_unlocked(cause) def flag_executor_shutting_down(self): # Flag the executor as shutting down and cancel remaining tasks if @@ -558,15 +562,24 @@ def shutdown_workers(self): break def join_executor_internals(self): - self.shutdown_workers() + with self.shutdown_lock: + self._join_executor_internals_unlocked() + + def _join_executor_internals_unlocked(self, broken=False): + # If broken, call_queue was closed and is no longer usable + if not broken: + self.shutdown_workers() + # Release the queue's resources as soon as possible. self.call_queue.close() self.call_queue.join_thread() - with self.shutdown_lock: - self.thread_wakeup.close() + self.thread_wakeup.close() + # If .join() is not called on the created processes then # some ctx.Queue methods may deadlock on Mac OS X. for p in self.processes.values(): + if broken: + p.terminate() p.join() def get_n_children_alive(self): diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index daf9ee94a194318..c00f18b1fcb1c34 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -158,6 +158,20 @@ def cancel_join_thread(self): except AttributeError: pass + def _terminate_broken(self): + # Close a Queue on error. + + # gh-94777: Prevent queue writing to a pipe which is no longer read. + self._reader.close() + + # gh-107219: Close the connection writer which can unblock + # Queue._feed() if it was stuck in send_bytes(). + if sys.platform == 'win32': + self.call_queue._writer.close() + + self.close() + self.join_thread() + def _start_thread(self): debug('Queue._start_thread()') @@ -169,13 +183,17 @@ def _start_thread(self): self._wlock, self._reader.close, self._writer.close, self._ignore_epipe, self._on_queue_feeder_error, self._sem), - name='QueueFeederThread' + name='QueueFeederThread', + daemon=True, ) - self._thread.daemon = True - debug('doing self._thread.start()') - self._thread.start() - debug('... done self._thread.start()') + try: + debug('doing self._thread.start()') + self._thread.start() + debug('... done self._thread.start()') + except: + self._thread = None + raise if not self._joincancelled: self._jointhread = Finalize( diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 7763a4946f110cb..ebe0560d500fd2f 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -1,5 +1,6 @@ import os import sys +import threading import time import unittest from concurrent import futures @@ -187,6 +188,31 @@ def test_max_tasks_early_shutdown(self): for i, future in enumerate(futures): self.assertEqual(future.result(), mul(i, i)) + def test_python_finalization_error(self): + context = self.get_context() + + # gh-109047: Create _ExecutorManagerThread, but block + # QueueFeederThread. Mock the threading.start_new_thread() function + # to inject RuntimeError: simulate the error raised during Python + # finalization. + orig_start_new_thread = threading._start_new_thread + nthread = 0 + def mock_start_new_thread(func, *args): + nonlocal nthread + if nthread >= 1: + raise RuntimeError("can't create new thread at " + "interpreter shutdown") + nthread += 1 + return orig_start_new_thread(func, *args) + + with support.swap_attr(threading, '_start_new_thread', + mock_start_new_thread): + executor = self.executor_type(max_workers=2, mp_context=context) + with executor: + with self.assertRaises(BrokenProcessPool): + list(executor.map(mul, [(2, 3)] * 10)) + executor.shutdown() + create_executor_tests(globals(), ProcessPoolExecutorTest, executor_mixins=(ProcessPoolForkMixin, diff --git a/Misc/NEWS.d/next/Library/2023-09-25-02-11-14.gh-issue-109047.b1TrqG.rst b/Misc/NEWS.d/next/Library/2023-09-25-02-11-14.gh-issue-109047.b1TrqG.rst new file mode 100644 index 000000000000000..71cb5a80847d0af --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-09-25-02-11-14.gh-issue-109047.b1TrqG.rst @@ -0,0 +1,4 @@ +:mod:`concurrent.futures`: The *executor manager thread* now catches exceptions +when adding an item to the *call queue*. During Python finalization, creating a +new thread can now raise :exc:`RuntimeError`. Catch the exception and call +``terminate_broken()`` in this case. Patch by Victor Stinner.