From 5c69d38fa93f15224e859cf5d170e76bdcd7b2c6 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Wed, 25 Sep 2024 13:25:19 -0600 Subject: [PATCH 01/38] Make ThreadPoolExecutor extensible. --- Lib/concurrent/futures/thread.py | 84 ++++++++++++++++++++++---------- 1 file changed, 58 insertions(+), 26 deletions(-) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index a024033f35fb54..c887f2b4d03f50 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -43,19 +43,46 @@ def _python_exit(): after_in_parent=_global_shutdown_lock.release) +class WorkerContext: + + @classmethod + def prepare(cls, initializer, initargs): + if initializer is not None: + if not callable(initializer): + raise TypeError("initializer must be a callable") + def create_context(): + return cls(initializer, initargs) + def resolve_task(cls, fn, args, kwargs): + return (fn, args, kwargs) + return create_context, resolve_task + + def __init__(self, initializer, initargs): + self.initializer = initializer + self.initargs = initargs + + def initialize(self): + if self.initializer is not None: + self.initializer(*self.initargs) + + def finalize(self): + pass + + def run(self, task): + fn, args, kwargs = task + return fn(*args, **kwargs) + + class _WorkItem: - def __init__(self, future, fn, args, kwargs): + def __init__(self, future, task): self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs + self.task = task - def run(self): + def run(self, ctx): if not self.future.set_running_or_notify_cancel(): return try: - result = self.fn(*self.args, **self.kwargs) + result = ctx.run(self.task) except BaseException as exc: self.future.set_exception(exc) # Break a reference cycle with the exception 'exc' @@ -66,16 +93,15 @@ def run(self): __class_getitem__ = classmethod(types.GenericAlias) -def _worker(executor_reference, work_queue, initializer, initargs): - if initializer is not None: - try: - initializer(*initargs) - except 
BaseException: - _base.LOGGER.critical('Exception in initializer:', exc_info=True) - executor = executor_reference() - if executor is not None: - executor._initializer_failed() - return +def _worker(executor_reference, ctx, work_queue): + try: + ctx.initialize() + except BaseException: + _base.LOGGER.critical('Exception in initializer:', exc_info=True) + executor = executor_reference() + if executor is not None: + executor._initializer_failed() + return try: while True: try: @@ -89,7 +115,7 @@ def _worker(executor_reference, work_queue, initializer, initargs): work_item = work_queue.get(block=True) if work_item is not None: - work_item.run() + work_item.run(ctx) # Delete references to object. See GH-60488 del work_item continue @@ -110,6 +136,8 @@ def _worker(executor_reference, work_queue, initializer, initargs): del executor except BaseException: _base.LOGGER.critical('Exception in worker', exc_info=True) + finally: + ctx.finalize() class BrokenThreadPool(_base.BrokenExecutor): @@ -123,8 +151,12 @@ class ThreadPoolExecutor(_base.Executor): # Used to assign unique thread names when thread_name_prefix is not supplied. _counter = itertools.count().__next__ + @classmethod + def prepare_context(cls, initializer, initargs): + return WorkerContext.prepare(initializer, initargs) + def __init__(self, max_workers=None, thread_name_prefix='', - initializer=None, initargs=()): + initializer=None, initargs=(), **ctxkwargs): """Initializes a new ThreadPoolExecutor instance. Args: @@ -133,6 +165,7 @@ def __init__(self, max_workers=None, thread_name_prefix='', thread_name_prefix: An optional name prefix to give our threads. initializer: A callable used to initialize worker threads. initargs: A tuple of arguments to pass to the initializer. + ctxkwargs: Additional arguments to cls.prepare_context(). 
""" if max_workers is None: # ThreadPoolExecutor is often used to: @@ -146,8 +179,9 @@ def __init__(self, max_workers=None, thread_name_prefix='', if max_workers <= 0: raise ValueError("max_workers must be greater than 0") - if initializer is not None and not callable(initializer): - raise TypeError("initializer must be a callable") + (self._create_worker_context, + self._resolve_work_item_task, + ) = type(self).prepare_context(initializer, initargs, **ctxkwargs) self._max_workers = max_workers self._work_queue = queue.SimpleQueue() @@ -158,8 +192,6 @@ def __init__(self, max_workers=None, thread_name_prefix='', self._shutdown_lock = threading.Lock() self._thread_name_prefix = (thread_name_prefix or ("ThreadPoolExecutor-%d" % self._counter())) - self._initializer = initializer - self._initargs = initargs def submit(self, fn, /, *args, **kwargs): with self._shutdown_lock, _global_shutdown_lock: @@ -173,7 +205,8 @@ def submit(self, fn, /, *args, **kwargs): 'interpreter shutdown') f = _base.Future() - w = _WorkItem(f, fn, args, kwargs) + task = self._resolve_work_item_task(f, fn, args, kwargs) + w = _WorkItem(f, task) self._work_queue.put(w) self._adjust_thread_count() @@ -196,9 +229,8 @@ def weakref_cb(_, q=self._work_queue): num_threads) t = threading.Thread(name=thread_name, target=_worker, args=(weakref.ref(self, weakref_cb), - self._work_queue, - self._initializer, - self._initargs)) + self._create_worker_context(), + self._work_queue)) t.start() self._threads.add(t) _threads_queues[t] = self._work_queue From 01789be00f372c65b0519eab1580faa5491e0e2e Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Wed, 25 Sep 2024 15:10:23 -0600 Subject: [PATCH 02/38] Add InterpreterPoolExecutor. 
--- Lib/concurrent/futures/__init__.py | 6 + Lib/concurrent/futures/interpreter.py | 183 ++++++++++++++++++ Lib/test/test_concurrent_futures/executor.py | 7 +- .../test_interpreter_pool.py | 70 +++++++ Lib/test/test_concurrent_futures/util.py | 5 + 5 files changed, 270 insertions(+), 1 deletion(-) create mode 100644 Lib/concurrent/futures/interpreter.py create mode 100644 Lib/test/test_concurrent_futures/test_interpreter_pool.py diff --git a/Lib/concurrent/futures/__init__.py b/Lib/concurrent/futures/__init__.py index 72de617a5b6f61..5a6f7d2d9a8d6a 100644 --- a/Lib/concurrent/futures/__init__.py +++ b/Lib/concurrent/futures/__init__.py @@ -29,6 +29,7 @@ 'Executor', 'wait', 'as_completed', + 'InterpreterPoolExecutor', 'ProcessPoolExecutor', 'ThreadPoolExecutor', ) @@ -51,4 +52,9 @@ def __getattr__(name): ThreadPoolExecutor = te return te + if name == 'InterpreterPoolExecutor': + from .interpreter import InterpreterPoolExecutor as ie + InterpreterPoolExecutor = ie + return ie + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py new file mode 100644 index 00000000000000..851466ddb1c847 --- /dev/null +++ b/Lib/concurrent/futures/interpreter.py @@ -0,0 +1,183 @@ +"""Implements InterpreterPoolExecutor.""" + +import pickle +from . 
import thread as _thread +import _interpreters +import _interpqueues + + +LINESEP = ''' +''' + + +_EXEC_FAILURE_STR = """ +{superstr} + +Uncaught in the interpreter: + +{formatted} +""".strip() + + +class ExecutionFailed(_interpreters.InterpreterError): + """An unhandled exception happened during execution.""" + + def __init__(self, excinfo): + msg = excinfo.formatted + if not msg: + if excinfo.type and excinfo.msg: + msg = f'{excinfo.type.__name__}: {excinfo.msg}' + else: + msg = excinfo.type.__name__ or excinfo.msg + super().__init__(msg) + self.excinfo = excinfo + + def __str__(self): + try: + formatted = self.excinfo.errdisplay + except Exception: + return super().__str__() + else: + return _EXEC_FAILURE_STR.format( + superstr=super().__str__(), + formatted=formatted, + ) + + +UNBOUND = 2 # error; this should not happen. + + +class WorkerContext(_thread.WorkerContext): + + @classmethod + def prepare(cls, initializer, initargs, shared): + if isinstance(initializer, str): + if initargs: + raise ValueError(f'an initializer script does not take args, got {args!r}') + initscript = initializer + # Make sure the script compiles. + # XXX Keep the compiled code object? + compile(script, '', 'exec') + elif initializer is not None: + pickled = pickle.dumps((initializer, initargs)) + initscript = f'''if True: + initializer, initargs = pickle.loads({pickled!r}) + initializer(*initargs) + ''' + else: + initscript = None + def create_context(): + return cls(initscript, shared) + def resolve_task(cls, fn, args, kwargs): + if isinstance(fn, str): + if args or kwargs: + raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}') + data = fn + kind = 'script' + else: + data = pickle.dumps((fn, args, kwargs)) + kind = 'function' + return (data, kind) + return create_context, resolve_task + + @classmethod + def _run_pickled_func(cls, data, resultsid): + fn, args, kwargs = pickle.loads(data) + res = fn(*args, **kwargs) + # Send the result back. 
+ try: + _interpqueues.put(resultsid, res, 0, UNBOUND) + except _interpreters.NotShareableError: + res = pickle.dumps(res) + _interpqueues.put(resultsid, res, 1, UNBOUND) + + def __init__(self, initscript, shared=None): + self.initscript = initscript or '' + self.shared = dict(shared) if shared else None + self.interpid = None + self.resultsid = None + + def __del__(self): + if self.interpid is not None: + self.finalize() + + def _exec(self, script): + assert self.interpid is not None + excinfo = _interpreters.exec(self.interpid, script, restrict=True) + if excinfo is not None: + raise ExecutionFailed(excinfo) + + def initialize(self): + assert self.interpid is None, self.interpid + self.interpid = _interpreters.create(reqrefs=True) + # This may raise InterpreterNotFoundError: + _interpreters.incref(self.interpid) + + maxsize = 0 + fmt = 0 + self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND) + + initscript = f"""if True: + from {__name__} import WorkerContext + """ + initscript += LINESEP + self.initscript +# for i, line in enumerate(initscript.splitlines(), 1): +# print(f'{i:>3} {line}') + self._exec(initscript) + if self.shared: + _interpreters.set___main___attrs( + self.interpid, self.shared, restrict=True) + + def finalize(self): + interpid = self.interpid + resultsid = self.resultsid + self.resultsid = None + self.interpid = None + assert interpid is not None + assert resultsid is not None + try: + _interpqueues.destroy(resultsid) + except _interpqueues.QueueNotFoundError: + pass + try: + _interpreters.decref(interpid) + except _interpreters.InterpreterNotFoundError: + pass + + def run(self, task): + data, kind = task + if kind == 'script': + self._exec(script) + return None + elif kind == 'function': + self._exec( + f'WorkerContext._run_pickled_func({data}, {self.resultsid})') + obj, pickled, unboundop = _interpqueues.get(self.resultsid) + assert unboundop is None, unboundop + return pickle.loads(obj) if pickled else obj + else: + raise 
NotImplementedError(kind) + + +class InterpreterPoolExecutor(_thread.ThreadPoolExecutor): + + @classmethod + def prepare_context(cls, initializer, initargs, shared): + return WorkerContext.prepare(initializer, initargs, shared) + + def __init__(self, max_workers=None, thread_name_prefix='', + initializer=None, initargs=(), shared=None): + """Initializes a new InterpreterPoolExecutor instance. + + Args: + max_workers: The maximum number of interpreters that can be used to + execute the given calls. + thread_name_prefix: An optional name prefix to give our threads. + initializer: A callable or script used to initialize + each worker interpreter. + initargs: A tuple of arguments to pass to the initializer. + shared: A mapping of shareabled objects to be inserted into + each worker interpreter. + """ + super().__init__(max_workers, thread_name_prefix, + initializer, initargs, shared=shared) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 4160656cb133ab..2a024ff63bae08 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -23,6 +23,10 @@ def make_dummy_object(_): class ExecutorTest: + + def assertTaskRaises(self, exctype): + return self.assertRaises(exctype) + # Executor.shutdown() and context manager usage is tested by # ExecutorShutdownTest. 
def test_submit(self): @@ -52,7 +56,8 @@ def test_map_exception(self): i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) self.assertEqual(i.__next__(), (0, 1)) self.assertEqual(i.__next__(), (0, 1)) - self.assertRaises(ZeroDivisionError, i.__next__) + with self.assertTaskRaises(ZeroDivisionError): + i.__next__() @support.requires_resource('walltime') def test_map_timeout(self): diff --git a/Lib/test/test_concurrent_futures/test_interpreter_pool.py b/Lib/test/test_concurrent_futures/test_interpreter_pool.py new file mode 100644 index 00000000000000..1b0315ad18584f --- /dev/null +++ b/Lib/test/test_concurrent_futures/test_interpreter_pool.py @@ -0,0 +1,70 @@ +import unittest +from concurrent.futures.interpreter import ExecutionFailed +from test import support +from test.support.interpreters import queues + +from .executor import ExecutorTest, mul +from .util import BaseTestCase, InterpreterPoolMixin, setup_module + + +class InterpreterPoolExecutorTest(InterpreterPoolMixin, ExecutorTest, BaseTestCase): + + def assertTaskRaises(self, exctype): + return self.assertRaisesRegex(ExecutionFailed, exctype.__name__) + + def test_saturation(self): + blocker = queues.create() + executor = self.executor_type(4, shared=dict(blocker=blocker)) + + for i in range(15 * executor._max_workers): + executor.submit('blocker.get()') + self.assertEqual(len(executor._threads), executor._max_workers) + for i in range(15 * executor._max_workers): + blocker.put_nowait(None) + executor.shutdown(wait=True) + + @support.requires_gil_enabled("gh-117344: test is flaky without the GIL") + def test_idle_thread_reuse(self): + executor = self.executor_type() + executor.submit(mul, 21, 2).result() + executor.submit(mul, 6, 7).result() + executor.submit(mul, 3, 14).result() + self.assertEqual(len(executor._threads), 1) + executor.shutdown(wait=True) + +# def test_executor_map_current_future_cancel(self): +# blocker = queues.create() +# log = queues.create() +# +# script = """if True: +# def 
log_n_wait({ident}): +# blocker(f"ident {ident} started") +# try: +# stop_event.wait() +# finally: +# log.append(f"ident {ident} stopped") +# """ +# +# with self.executor_type(max_workers=1) as pool: +# # submit work to saturate the pool +# fut = pool.submit(script.format(ident="first")) +# gen = pool.map(log_n_wait, ["second", "third"], timeout=0) +# try: +# with self.assertRaises(TimeoutError): +# next(gen) +# finally: +# gen.close() +# blocker.put +# stop_event.set() +# fut.result() +# # ident='second' is cancelled as a result of raising a TimeoutError +# # ident='third' is cancelled because it remained in the collection of futures +# self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"]) + + +def setUpModule(): + setup_module() + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_concurrent_futures/util.py b/Lib/test/test_concurrent_futures/util.py index 3b8ec3e205d5aa..52baab51340fc9 100644 --- a/Lib/test/test_concurrent_futures/util.py +++ b/Lib/test/test_concurrent_futures/util.py @@ -74,6 +74,10 @@ class ThreadPoolMixin(ExecutorMixin): executor_type = futures.ThreadPoolExecutor +class InterpreterPoolMixin(ExecutorMixin): + executor_type = futures.InterpreterPoolExecutor + + class ProcessPoolForkMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor ctx = "fork" @@ -120,6 +124,7 @@ def get_context(self): def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,), executor_mixins=(ThreadPoolMixin, + InterpreterPoolMixin, ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)): From 6def4bedeb4a61e10e66d94221490a55723eaebe Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 27 Sep 2024 11:54:45 -0600 Subject: [PATCH 03/38] Clean up the interpreter if initialize() fails. 
--- Lib/concurrent/futures/interpreter.py | 58 ++++++++++++++------------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index 851466ddb1c847..310acc4de08b93 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -110,39 +110,43 @@ def _exec(self, script): def initialize(self): assert self.interpid is None, self.interpid self.interpid = _interpreters.create(reqrefs=True) - # This may raise InterpreterNotFoundError: - _interpreters.incref(self.interpid) - - maxsize = 0 - fmt = 0 - self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND) - - initscript = f"""if True: - from {__name__} import WorkerContext - """ - initscript += LINESEP + self.initscript -# for i, line in enumerate(initscript.splitlines(), 1): -# print(f'{i:>3} {line}') - self._exec(initscript) - if self.shared: - _interpreters.set___main___attrs( - self.interpid, self.shared, restrict=True) + try: + _interpreters.incref(self.interpid) + + initscript = f"""if True: + from {__name__} import WorkerContext + """ + initscript += LINESEP + self.initscript + self._exec(initscript) + + if self.shared: + _interpreters.set___main___attrs( + self.interpid, self.shared, restrict=True) + + maxsize = 0 + fmt = 0 + self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND) + except _interpreters.InterpreterNotFoundError: + raise # re-raise + except BaseException: + self.finalize() + raise # re-raise def finalize(self): interpid = self.interpid resultsid = self.resultsid self.resultsid = None self.interpid = None - assert interpid is not None - assert resultsid is not None - try: - _interpqueues.destroy(resultsid) - except _interpqueues.QueueNotFoundError: - pass - try: - _interpreters.decref(interpid) - except _interpreters.InterpreterNotFoundError: - pass + if resultsid is not None: + try: + _interpqueues.destroy(resultsid) + except _interpqueues.QueueNotFoundError: + pass 
+ if interpid is not None: + try: + _interpreters.decref(interpid) + except _interpreters.InterpreterNotFoundError: + pass def run(self, task): data, kind = task From 84993a5f62a047af803e8e930748a81faf83275d Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 27 Sep 2024 11:14:52 -0600 Subject: [PATCH 04/38] Add a missing import. --- Lib/concurrent/futures/interpreter.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index 310acc4de08b93..b85e437540a243 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -61,6 +61,7 @@ def prepare(cls, initializer, initargs, shared): elif initializer is not None: pickled = pickle.dumps((initializer, initargs)) initscript = f'''if True: + import pickle initializer, initargs = pickle.loads({pickled!r}) initializer(*initargs) ''' From c540cf0a9de1709c5a3d8661875b19c99013e15d Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 27 Sep 2024 14:41:56 -0600 Subject: [PATCH 05/38] Fix some typos. --- Lib/concurrent/futures/interpreter.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index b85e437540a243..dc5368365c882b 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -53,11 +53,11 @@ class WorkerContext(_thread.WorkerContext): def prepare(cls, initializer, initargs, shared): if isinstance(initializer, str): if initargs: - raise ValueError(f'an initializer script does not take args, got {args!r}') + raise ValueError(f'an initializer script does not take args, got {initargs!r}') initscript = initializer # Make sure the script compiles. # XXX Keep the compiled code object? 
- compile(script, '', 'exec') + compile(initscript, '', 'exec') elif initializer is not None: pickled = pickle.dumps((initializer, initargs)) initscript = f'''if True: @@ -152,11 +152,11 @@ def finalize(self): def run(self, task): data, kind = task if kind == 'script': - self._exec(script) + self._exec(data) return None elif kind == 'function': self._exec( - f'WorkerContext._run_pickled_func({data}, {self.resultsid})') + f'WorkerContext._run_pickled_func({data!r}, {self.resultsid})') obj, pickled, unboundop = _interpqueues.get(self.resultsid) assert unboundop is None, unboundop return pickle.loads(obj) if pickled else obj From 45d584d43408772f0fe0cb00231161830c97caac Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 27 Sep 2024 14:42:27 -0600 Subject: [PATCH 06/38] Add more tests. --- .../test_interpreter_pool.py | 140 ++++++++++++++---- 1 file changed, 111 insertions(+), 29 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_interpreter_pool.py b/Lib/test/test_concurrent_futures/test_interpreter_pool.py index 1b0315ad18584f..34cbab1f727b51 100644 --- a/Lib/test/test_concurrent_futures/test_interpreter_pool.py +++ b/Lib/test/test_concurrent_futures/test_interpreter_pool.py @@ -1,3 +1,6 @@ +import contextlib +import os +import sys import unittest from concurrent.futures.interpreter import ExecutionFailed from test import support @@ -7,11 +10,119 @@ from .util import BaseTestCase, InterpreterPoolMixin, setup_module +def write_msg(fd, msg): + os.write(fd, msg + b'\0') + + +def read_msg(fd): + msg = b'' + while ch := os.read(fd, 1): + if ch == b'\0': + return msg + msg += ch + + +def get_current_name(): + return __name__ + + class InterpreterPoolExecutorTest(InterpreterPoolMixin, ExecutorTest, BaseTestCase): + def pipe(self): + r, w = os.pipe() + self.addCleanup(lambda: os.close(r)) + self.addCleanup(lambda: os.close(w)) + return r, w + def assertTaskRaises(self, exctype): return self.assertRaisesRegex(ExecutionFailed, exctype.__name__) + def 
test_init_func(self): + msg = b'step: init' + r, w = self.pipe() + os.write(w, b'\0') + + executor = self.executor_type( + initializer=write_msg, initargs=(w, msg)) + before = os.read(r, 100) + executor.submit(mul, 10, 10) + after = read_msg(r) + + self.assertEqual(before, b'\0') + self.assertEqual(after, msg) + + def test_init_script(self): + msg1 = b'step: init' + msg2 = b'step: run' + r, w = self.pipe() + initscript = f"""if True: + import os + msg = {msg2!r} + os.write({w}, {msg1!r} + b'\\0') + """ + script = f"""if True: + os.write({w}, msg + b'\\0') + """ + os.write(w, b'\0') + + executor = self.executor_type(initializer=initscript) + before_init = os.read(r, 100) + fut = executor.submit(script) + after_init = read_msg(r) + write_msg(w, b'') + before_run = read_msg(r) + fut.result() + after_run = read_msg(r) + + self.assertEqual(before_init, b'\0') + self.assertEqual(after_init, msg1) + self.assertEqual(before_run, b'') + self.assertEqual(after_run, msg2) + + def test_init_script_args(self): + with self.assertRaises(ValueError): + self.executor_type(initializer='pass', initargs=('spam',)) + + def test_init_shared(self): + msg = b'eggs' + r, w = self.pipe() + script = f"""if True: + import os + os.write({w}, spam + b'\\0') + """ + + executor = self.executor_type(shared={'spam': msg}) + fut = executor.submit(script) + fut.result() + after = read_msg(r) + + self.assertEqual(after, msg) + + def test_submit_script(self): + msg = b'spam' + r, w = self.pipe() + script = f"""if True: + import os + os.write({w}, __name__.encode('utf-8') + b'\\0') + """ + executor = self.executor_type() + + fut = executor.submit(script) + res = fut.result() + after = read_msg(r) + + self.assertEqual(after, b'__main__') + self.assertIs(res, None) + + def test_submit_func_globals(self): + raise NotImplementedError + executor = self.executor_type() + fut = executor.submit(get_current_name) + name = fut.result() + + self.assertEqual(name, '__main__') + self.assertNotEqual(name, __name__) + 
def test_saturation(self): blocker = queues.create() executor = self.executor_type(4, shared=dict(blocker=blocker)) @@ -32,35 +143,6 @@ def test_idle_thread_reuse(self): self.assertEqual(len(executor._threads), 1) executor.shutdown(wait=True) -# def test_executor_map_current_future_cancel(self): -# blocker = queues.create() -# log = queues.create() -# -# script = """if True: -# def log_n_wait({ident}): -# blocker(f"ident {ident} started") -# try: -# stop_event.wait() -# finally: -# log.append(f"ident {ident} stopped") -# """ -# -# with self.executor_type(max_workers=1) as pool: -# # submit work to saturate the pool -# fut = pool.submit(script.format(ident="first")) -# gen = pool.map(log_n_wait, ["second", "third"], timeout=0) -# try: -# with self.assertRaises(TimeoutError): -# next(gen) -# finally: -# gen.close() -# blocker.put -# stop_event.set() -# fut.result() -# # ident='second' is cancelled as a result of raising a TimeoutError -# # ident='third' is cancelled because it remained in the collection of futures -# self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"]) - def setUpModule(): setup_module() From c90c016202a86af860c638b59db7c62396fcb912 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 27 Sep 2024 15:32:42 -0600 Subject: [PATCH 07/38] Add docs. --- Doc/library/concurrent.futures.rst | 69 ++++++++++++++++++++++++++++-- 1 file changed, 65 insertions(+), 4 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index ce72127127c7a6..e37d1b3212a8c6 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -15,9 +15,10 @@ The :mod:`concurrent.futures` module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using -:class:`ThreadPoolExecutor`, or separate processes, using -:class:`ProcessPoolExecutor`. 
Both implement the same interface, which is -defined by the abstract :class:`Executor` class. +:class:`ThreadPoolExecutor` or :class:`InterpreterPoolExecutor`, +or separate processes, using :class:`ProcessPoolExecutor`. +Both implement the same interface, which is defined +by the abstract :class:`Executor` class. .. include:: ../includes/wasm-notavail.rst @@ -63,7 +64,8 @@ Executor Objects setting *chunksize* to a positive integer. For very long iterables, using a large value for *chunksize* can significantly improve performance compared to the default size of 1. With - :class:`ThreadPoolExecutor`, *chunksize* has no effect. + :class:`ThreadPoolExecutor` and :class:`ThreadPoolExecutor`, + *chunksize* has no effect. .. versionchanged:: 3.5 Added the *chunksize* argument. @@ -227,6 +229,54 @@ ThreadPoolExecutor Example print('%r page is %d bytes' % (url, len(data))) +InterpreterPoolExecutor +----------------------- + +The :class:`InterpreterPoolExecutor` class is a :class:`ThreadPoolExecutor` +subclass that uses a pool of isolated interpreters to execute calls +asynchronously. Each interpreter has its own GIL, which allows the +executor to side-step the :term:`Global Interpreter Lock +`. Interpreters mostly can't share objects +between them, which means that, in most cases, only picklable objects +can be executed and returned. + +.. class:: InterpreterPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), shared=None) + + A :class:`ThreadPoolExecutor` subclass that executes calls asynchronously + using a pool of at most *max_workers* interpreters. Each interpreter + runs tasks in its own thread. + + *initializer* and *initargs* are the same as with + :class:`ThreadPoolExecutor`, though they are pickled like with + :class:`ProcessPoolExecutor`. Additionally, you can pass a script + (:class:`str`) for *initiazer*, which will be ``exec``ed in the + interpreter's ``__main__`` module. In that case, *initargs* must + not be passed in. 
+ + Similarly you can pass a script to :meth:`Executor.submit()`, which + will be ``exec``ed in the interpreter's ``__main__`` module. In that + case no arguments may be provided and the return value is always + ``None``. + + :meth:`InterpreterPoolExecutor ` does *not* support + passing in a script. + + In each of those cases, an uncaught exception from the initializer + or task is raised as an + :class:`~concurrent.futures.interpreter.ExecutionFailed` exception, + rather than the uncaught exception itself. + + *shared* is an optional dict of objects shared by all interpreters + in the pool. The items are added to each interpreter's ``__main__`` + module. Shareable objects include the builtin singletons, :class:`str` + and :class:`bytes`, and :class:`memoryview`. See :pep:`734` + for more info. + + The other caveats that apply to :class:`ThreadPoolExecutor` apply here. + + .. versionadded:: 3.14 + + ProcessPoolExecutor ------------------- @@ -574,6 +624,17 @@ Exception classes .. versionadded:: 3.7 +.. currentmodule:: concurrent.futures.interpreter + +.. exception:: ExecutionFailed + + Raised from :class:`~concurrent.futures.InterpreterPoolExecutor` when + the given initializer fails or from + :meth:`~concurrent.futures.Executor.submit` when there's an uncaught + exception from the submitted task. + + .. versionadded:: 3.14 + .. currentmodule:: concurrent.futures.process .. exception:: BrokenProcessPool From 1cb4657c418534198e633ea83335402ffe2b0470 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 27 Sep 2024 15:43:01 -0600 Subject: [PATCH 08/38] Add a NEwS entry. 
--- .../Library/2024-09-27-15-42-55.gh-issue-124694.uUy32y.rst | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2024-09-27-15-42-55.gh-issue-124694.uUy32y.rst diff --git a/Misc/NEWS.d/next/Library/2024-09-27-15-42-55.gh-issue-124694.uUy32y.rst b/Misc/NEWS.d/next/Library/2024-09-27-15-42-55.gh-issue-124694.uUy32y.rst new file mode 100644 index 00000000000000..1aa1a463b0c63a --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-09-27-15-42-55.gh-issue-124694.uUy32y.rst @@ -0,0 +1,6 @@ +We've added :class:`concurrent.futures.InterpreterPoolExecutor`, which +allows you to run code in multiple isolated interpreters. This allows you +to circumvent the limitations of CPU-bound threads (due to the GIL). Patch +by Eric Snow. + +This addition is unrelated to :pep:`734`. From 4dc0989b7287390347663c09aaeb367e1cacb38e Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 27 Sep 2024 15:47:19 -0600 Subject: [PATCH 09/38] Fix the last test. --- Lib/test/test_concurrent_futures/test_interpreter_pool.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_interpreter_pool.py b/Lib/test/test_concurrent_futures/test_interpreter_pool.py index 34cbab1f727b51..dceeb2b924a449 100644 --- a/Lib/test/test_concurrent_futures/test_interpreter_pool.py +++ b/Lib/test/test_concurrent_futures/test_interpreter_pool.py @@ -115,13 +115,12 @@ def test_submit_script(self): self.assertIs(res, None) def test_submit_func_globals(self): - raise NotImplementedError executor = self.executor_type() fut = executor.submit(get_current_name) name = fut.result() - self.assertEqual(name, '__main__') - self.assertNotEqual(name, __name__) + self.assertEqual(name, __name__) + self.assertNotEqual(name, '__main__') def test_saturation(self): blocker = queues.create() From 57b2db672fc011d19f04ca34bc6e716c580f382a Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 27 Sep 2024 16:34:08 -0600 Subject: [PATCH 10/38] Add more tests. 
--- .../test_interpreter_pool.py | 85 ++++++++++++++++--- 1 file changed, 71 insertions(+), 14 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_interpreter_pool.py b/Lib/test/test_concurrent_futures/test_interpreter_pool.py index dceeb2b924a449..529ccf736c8f76 100644 --- a/Lib/test/test_concurrent_futures/test_interpreter_pool.py +++ b/Lib/test/test_concurrent_futures/test_interpreter_pool.py @@ -1,5 +1,6 @@ import contextlib import os +import pickle import sys import unittest from concurrent.futures.interpreter import ExecutionFailed @@ -37,20 +38,6 @@ def pipe(self): def assertTaskRaises(self, exctype): return self.assertRaisesRegex(ExecutionFailed, exctype.__name__) - def test_init_func(self): - msg = b'step: init' - r, w = self.pipe() - os.write(w, b'\0') - - executor = self.executor_type( - initializer=write_msg, initargs=(w, msg)) - before = os.read(r, 100) - executor.submit(mul, 10, 10) - after = read_msg(r) - - self.assertEqual(before, b'\0') - self.assertEqual(after, msg) - def test_init_script(self): msg1 = b'step: init' msg2 = b'step: run' @@ -83,6 +70,42 @@ def test_init_script_args(self): with self.assertRaises(ValueError): self.executor_type(initializer='pass', initargs=('spam',)) + def test_init_func(self): + msg = b'step: init' + r, w = self.pipe() + os.write(w, b'\0') + + executor = self.executor_type( + initializer=write_msg, initargs=(w, msg)) + before = os.read(r, 100) + executor.submit(mul, 10, 10) + after = read_msg(r) + + self.assertEqual(before, b'\0') + self.assertEqual(after, msg) + + def test_init_closure(self): + count = 0 + def init1(): + assert count == 0, count + def init2(): + nonlocal count + count += 1 + + with self.assertRaises(pickle.PicklingError): + self.executor_type(initializer=init1) + with self.assertRaises(pickle.PicklingError): + self.executor_type(initializer=init2) + + def test_init_instance_method(self): + class Spam: + def initializer(self): + raise NotImplementedError + spam = Spam() + + with 
self.assertRaises(pickle.PicklingError): + self.executor_type(initializer=spam.initializer) + def test_init_shared(self): msg = b'eggs' r, w = self.pipe() @@ -114,6 +137,40 @@ def test_submit_script(self): self.assertEqual(after, b'__main__') self.assertIs(res, None) + def test_submit_closure(self): + spam = True + def task1(): + return spam + def task2(): + nonlocal spam + spam += 1 + return spam + + executor = self.executor_type() + with self.assertRaises(pickle.PicklingError): + executor.submit(task1) + with self.assertRaises(pickle.PicklingError): + executor.submit(task2) + + def test_submit_local_instance(self): + class Spam: + def __init__(self): + self.value = True + + executor = self.executor_type() + with self.assertRaises(pickle.PicklingError): + executor.submit(Spam) + + def test_submit_instance_method(self): + class Spam: + def run(self): + return True + spam = Spam() + + executor = self.executor_type() + with self.assertRaises(pickle.PicklingError): + executor.submit(spam.run) + def test_submit_func_globals(self): executor = self.executor_type() fut = executor.submit(get_current_name) From 75e11d2ff9fe0849eac91dfe9bb30f55034ff55d Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 30 Sep 2024 16:11:06 -0600 Subject: [PATCH 11/38] Simplify ExecutionFailed. 
--- Lib/concurrent/futures/interpreter.py | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index dc5368365c882b..8427e28322d60e 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -6,19 +6,6 @@ import _interpqueues -LINESEP = ''' -''' - - -_EXEC_FAILURE_STR = """ -{superstr} - -Uncaught in the interpreter: - -{formatted} -""".strip() - - class ExecutionFailed(_interpreters.InterpreterError): """An unhandled exception happened during execution.""" @@ -38,10 +25,13 @@ def __str__(self): except Exception: return super().__str__() else: - return _EXEC_FAILURE_STR.format( - superstr=super().__str__(), - formatted=formatted, - ) + return textwrap.dedent(f""" +{super().__str__()} + +Uncaught in the interpreter: + +{formatted} + """.strip()) UNBOUND = 2 # error; this should not happen. From 69c2b8efe501154d0811d42b3fa8fed27340aa07 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 30 Sep 2024 16:11:49 -0600 Subject: [PATCH 12/38] Fix the signature of resolve_task(). 
--- Lib/concurrent/futures/interpreter.py | 2 +- Lib/concurrent/futures/thread.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index 8427e28322d60e..b6a2ddfb80dc1e 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -59,7 +59,7 @@ def prepare(cls, initializer, initargs, shared): initscript = None def create_context(): return cls(initscript, shared) - def resolve_task(cls, fn, args, kwargs): + def resolve_task(fn, args, kwargs): if isinstance(fn, str): if args or kwargs: raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}') diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index c887f2b4d03f50..3bcd33110b12ea 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -52,7 +52,7 @@ def prepare(cls, initializer, initargs): raise TypeError("initializer must be a callable") def create_context(): return cls(initializer, initargs) - def resolve_task(cls, fn, args, kwargs): + def resolve_task(fn, args, kwargs): return (fn, args, kwargs) return create_context, resolve_task @@ -205,7 +205,7 @@ def submit(self, fn, /, *args, **kwargs): 'interpreter shutdown') f = _base.Future() - task = self._resolve_work_item_task(f, fn, args, kwargs) + task = self._resolve_work_item_task(fn, args, kwargs) w = _WorkItem(f, task) self._work_queue.put(w) From f03c314fb643e7acc2d1181950a23702ee678fa0 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 30 Sep 2024 16:12:15 -0600 Subject: [PATCH 13/38] Capture any uncaught exception. 
--- Lib/concurrent/futures/interpreter.py | 115 ++++++++++++------ Lib/test/test_concurrent_futures/executor.py | 5 +- .../test_interpreter_pool.py | 3 - 3 files changed, 77 insertions(+), 46 deletions(-) diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index b6a2ddfb80dc1e..c7099f4329a96b 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -1,6 +1,8 @@ """Implements InterpreterPoolExecutor.""" +import contextlib import pickle +import textwrap from . import thread as _thread import _interpreters import _interpqueues @@ -41,49 +43,64 @@ class WorkerContext(_thread.WorkerContext): @classmethod def prepare(cls, initializer, initargs, shared): - if isinstance(initializer, str): - if initargs: - raise ValueError(f'an initializer script does not take args, got {initargs!r}') - initscript = initializer - # Make sure the script compiles. - # XXX Keep the compiled code object? - compile(initscript, '', 'exec') - elif initializer is not None: - pickled = pickle.dumps((initializer, initargs)) - initscript = f'''if True: - import pickle - initializer, initargs = pickle.loads({pickled!r}) - initializer(*initargs) - ''' - else: - initscript = None - def create_context(): - return cls(initscript, shared) def resolve_task(fn, args, kwargs): if isinstance(fn, str): if args or kwargs: raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}') data = fn kind = 'script' + # Make sure the script compiles. + # XXX Keep the compiled code object? 
+ compile(fn, '', 'exec') else: data = pickle.dumps((fn, args, kwargs)) kind = 'function' return (data, kind) + + if isinstance(initializer, str): + if initargs: + raise ValueError(f'an initializer script does not take args, got {initargs!r}') + if initializer is not None: + initdata = resolve_task(initializer, initargs, {}) + else: + initdata = None + def create_context(): + return cls(initdata, shared) return create_context, resolve_task @classmethod - def _run_pickled_func(cls, data, resultsid): - fn, args, kwargs = pickle.loads(data) - res = fn(*args, **kwargs) - # Send the result back. + @contextlib.contextmanager + def _capture_exc(cls, resultsid): + try: + yield + except Exception as exc: + err = pickle.dumps(exc) + _interpqueues.put(resultsid, (None, err), 1, UNBOUND) + else: + _interpqueues.put(resultsid, (None, None), 0, UNBOUND) + + @classmethod + def _call(cls, func, args, kwargs, resultsid): try: - _interpqueues.put(resultsid, res, 0, UNBOUND) - except _interpreters.NotShareableError: - res = pickle.dumps(res) - _interpqueues.put(resultsid, res, 1, UNBOUND) + res = func(*args, **kwargs) + except Exception as exc: + err = pickle.dumps(exc) + _interpqueues.put(resultsid, (None, err), 1, UNBOUND) + else: + # Send the result back. 
+ try: + _interpqueues.put(resultsid, (res, None), 0, UNBOUND) + except _interpreters.NotShareableError: + res = pickle.dumps(res) + _interpqueues.put(resultsid, (res, None), 1, UNBOUND) + + @classmethod + def _call_pickled(cls, pickled, resultsid): + fn, args, kwargs = pickle.loads(pickled) + cls._call(fn, args, kwargs, resultsid) - def __init__(self, initscript, shared=None): - self.initscript = initscript or '' + def __init__(self, initdata, shared=None): + self.initdata = initdata self.shared = dict(shared) if shared else None self.interpid = None self.resultsid = None @@ -104,19 +121,21 @@ def initialize(self): try: _interpreters.incref(self.interpid) + maxsize = 0 + fmt = 0 + self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND) + initscript = f"""if True: from {__name__} import WorkerContext """ - initscript += LINESEP + self.initscript self._exec(initscript) if self.shared: _interpreters.set___main___attrs( self.interpid, self.shared, restrict=True) - maxsize = 0 - fmt = 0 - self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND) + if self.initdata: + self.run(self.initdata) except _interpreters.InterpreterNotFoundError: raise # re-raise except BaseException: @@ -142,16 +161,34 @@ def finalize(self): def run(self, task): data, kind = task if kind == 'script': - self._exec(data) - return None + script = textwrap.dedent(f""" + with WorkerContext._capture_exc({self.resultsid}): + {{}}""") + script = script.format(textwrap.indent(data, ' ')) elif kind == 'function': - self._exec( - f'WorkerContext._run_pickled_func({data!r}, {self.resultsid})') - obj, pickled, unboundop = _interpqueues.get(self.resultsid) - assert unboundop is None, unboundop - return pickle.loads(obj) if pickled else obj + script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})' else: raise NotImplementedError(kind) + self._exec(script) + + # Return the result, or raise the exception. 
+ while True: + try: + obj = _interpqueues.get(self.resultsid) + except _interpqueues.QueueNotFoundError: + raise # re-raise + except _interpqueues.QueueError: + continue + else: + break + (res, exc), pickled, unboundop = obj + assert unboundop is None, unboundop + if exc is not None: + assert res is None, res + assert pickled + exc = pickle.loads(exc) + raise exc from exc + return pickle.loads(res) if pickled else res class InterpreterPoolExecutor(_thread.ThreadPoolExecutor): diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 2a024ff63bae08..b97d9ffd94b1f8 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -24,9 +24,6 @@ def make_dummy_object(_): class ExecutorTest: - def assertTaskRaises(self, exctype): - return self.assertRaises(exctype) - # Executor.shutdown() and context manager usage is tested by # ExecutorShutdownTest. def test_submit(self): @@ -56,7 +53,7 @@ def test_map_exception(self): i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) self.assertEqual(i.__next__(), (0, 1)) self.assertEqual(i.__next__(), (0, 1)) - with self.assertTaskRaises(ZeroDivisionError): + with self.assertRaises(ZeroDivisionError): i.__next__() @support.requires_resource('walltime') diff --git a/Lib/test/test_concurrent_futures/test_interpreter_pool.py b/Lib/test/test_concurrent_futures/test_interpreter_pool.py index 529ccf736c8f76..fea79e74fdea90 100644 --- a/Lib/test/test_concurrent_futures/test_interpreter_pool.py +++ b/Lib/test/test_concurrent_futures/test_interpreter_pool.py @@ -35,9 +35,6 @@ def pipe(self): self.addCleanup(lambda: os.close(w)) return r, w - def assertTaskRaises(self, exctype): - return self.assertRaisesRegex(ExecutionFailed, exctype.__name__) - def test_init_script(self): msg1 = b'step: init' msg2 = b'step: run' From 4806d9f469cbd2d5a1a42fb43fd32cf7b07e4fce Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 30 Sep 2024 16:16:31 -0600 
Subject: [PATCH 14/38] Add TODO comments. --- Lib/concurrent/futures/interpreter.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index c7099f4329a96b..fcd366cc691451 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -53,6 +53,7 @@ def resolve_task(fn, args, kwargs): # XXX Keep the compiled code object? compile(fn, '', 'exec') else: + # XXX This does not work if fn comes from the __main__ module. data = pickle.dumps((fn, args, kwargs)) kind = 'function' return (data, kind) @@ -177,6 +178,8 @@ def run(self, task): obj = _interpqueues.get(self.resultsid) except _interpqueues.QueueNotFoundError: raise # re-raise + # XXX This breaks if test.support.interpreters.queues + # doesn't exist. except _interpqueues.QueueError: continue else: From efc03956b5a3d2b519496fa58858d5c4153c0950 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 30 Sep 2024 16:30:32 -0600 Subject: [PATCH 15/38] Docs fixes. --- Doc/library/concurrent.futures.rst | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index e37d1b3212a8c6..aba794e3b8601b 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -64,7 +64,7 @@ Executor Objects setting *chunksize* to a positive integer. For very long iterables, using a large value for *chunksize* can significantly improve performance compared to the default size of 1. With - :class:`ThreadPoolExecutor` and :class:`ThreadPoolExecutor`, + :class:`ThreadPoolExecutor` and :class:`InterpreterPoolExecutor`, *chunksize* has no effect. .. versionchanged:: 3.5 @@ -236,9 +236,9 @@ The :class:`InterpreterPoolExecutor` class is a :class:`ThreadPoolExecutor` subclass that uses a pool of isolated interpreters to execute calls asynchronously. 
Each interpreter has its own GIL, which allows the executor to side-step the :term:`Global Interpreter Lock -`. Interpreters mostly can't share objects -between them, which means that, in most cases, only picklable objects -can be executed and returned. +`, allowing the use of multiple cores. +Interpreters mostly can't share objects between them, which means that, +in most cases, only picklable objects can be executed and returned. .. class:: InterpreterPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), shared=None) @@ -249,12 +249,12 @@ can be executed and returned. *initializer* and *initargs* are the same as with :class:`ThreadPoolExecutor`, though they are pickled like with :class:`ProcessPoolExecutor`. Additionally, you can pass a script - (:class:`str`) for *initiazer*, which will be ``exec``ed in the + (:class:`str`) for *initiazer*, which will be executed in the interpreter's ``__main__`` module. In that case, *initargs* must not be passed in. - Similarly you can pass a script to :meth:`Executor.submit()`, which - will be ``exec``ed in the interpreter's ``__main__`` module. In that + Similarly you can pass a script to :meth:`Executor.submit`, which + will be executed in the interpreter's ``__main__`` module. In that case no arguments may be provided and the return value is always ``None``. @@ -274,7 +274,7 @@ can be executed and returned. The other caveats that apply to :class:`ThreadPoolExecutor` apply here. - .. versionadded:: 3.14 + .. versionadded:: next ProcessPoolExecutor @@ -633,7 +633,7 @@ Exception classes :meth:`~concurrent.futures.Executor.submit` when there's an uncaught exception from the submitted task. - .. versionadded:: 3.14 + .. versionadded:: next .. currentmodule:: concurrent.futures.process From a29aee3d7ebf8ca4dee811dae4c9258c7e87887e Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 30 Sep 2024 17:39:59 -0600 Subject: [PATCH 16/38] Automatically apply textwrap.dedent() to scripts. 
--- Doc/library/concurrent.futures.rst | 6 +++++- Lib/concurrent/futures/interpreter.py | 16 ++++++---------- .../test_interpreter_pool.py | 8 ++++---- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index aba794e3b8601b..3b43e4c82dfac8 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -253,11 +253,15 @@ in most cases, only picklable objects can be executed and returned. interpreter's ``__main__`` module. In that case, *initargs* must not be passed in. - Similarly you can pass a script to :meth:`Executor.submit`, which + Similarly you can pass a script to :meth:`~Executor.submit`, which will be executed in the interpreter's ``__main__`` module. In that case no arguments may be provided and the return value is always ``None``. + For both *initializer* and :meth:`~Executor.submit`, if a script + is passed in then it will automatically have :func:`textwrap.dedent` + applied to it. That means you don't have to. + :meth:`InterpreterPoolExecutor ` does *not* support passing in a script. diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index fcd366cc691451..bb9ffab231dbb7 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -47,11 +47,11 @@ def resolve_task(fn, args, kwargs): if isinstance(fn, str): if args or kwargs: raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}') - data = fn + data = textwrap.dedent(fn) kind = 'script' # Make sure the script compiles. # XXX Keep the compiled code object? - compile(fn, '', 'exec') + compile(data, '', 'exec') else: # XXX This does not work if fn comes from the __main__ module. 
data = pickle.dumps((fn, args, kwargs)) @@ -126,10 +126,7 @@ def initialize(self): fmt = 0 self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND) - initscript = f"""if True: - from {__name__} import WorkerContext - """ - self._exec(initscript) + self._exec(f'from {__name__} import WorkerContext') if self.shared: _interpreters.set___main___attrs( @@ -162,10 +159,9 @@ def finalize(self): def run(self, task): data, kind = task if kind == 'script': - script = textwrap.dedent(f""" - with WorkerContext._capture_exc({self.resultsid}): - {{}}""") - script = script.format(textwrap.indent(data, ' ')) + script = f""" +with WorkerContext._capture_exc({self.resultsid}): +{textwrap.indent(data, ' ')}""" elif kind == 'function': script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})' else: diff --git a/Lib/test/test_concurrent_futures/test_interpreter_pool.py b/Lib/test/test_concurrent_futures/test_interpreter_pool.py index fea79e74fdea90..b8a8ad4b6afc8a 100644 --- a/Lib/test/test_concurrent_futures/test_interpreter_pool.py +++ b/Lib/test/test_concurrent_futures/test_interpreter_pool.py @@ -39,12 +39,12 @@ def test_init_script(self): msg1 = b'step: init' msg2 = b'step: run' r, w = self.pipe() - initscript = f"""if True: + initscript = f""" import os msg = {msg2!r} os.write({w}, {msg1!r} + b'\\0') """ - script = f"""if True: + script = f""" os.write({w}, msg + b'\\0') """ os.write(w, b'\0') @@ -106,7 +106,7 @@ def initializer(self): def test_init_shared(self): msg = b'eggs' r, w = self.pipe() - script = f"""if True: + script = f""" import os os.write({w}, spam + b'\\0') """ @@ -121,7 +121,7 @@ def test_init_shared(self): def test_submit_script(self): msg = b'spam' r, w = self.pipe() - script = f"""if True: + script = f""" import os os.write({w}, __name__.encode('utf-8') + b'\\0') """ From 8bab4576c97e7ca70c96d707e13d8d00d2f9516c Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 30 Sep 2024 17:49:55 -0600 Subject: [PATCH 17/38] Fix the WASI build. 
--- Lib/concurrent/futures/__init__.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/Lib/concurrent/futures/__init__.py b/Lib/concurrent/futures/__init__.py index 5a6f7d2d9a8d6a..c31dea4f7624b5 100644 --- a/Lib/concurrent/futures/__init__.py +++ b/Lib/concurrent/futures/__init__.py @@ -40,7 +40,7 @@ def __dir__(): def __getattr__(name): - global ProcessPoolExecutor, ThreadPoolExecutor + global ProcessPoolExecutor, ThreadPoolExecutor, InterpreterPoolExecutor if name == 'ProcessPoolExecutor': from .process import ProcessPoolExecutor as pe @@ -53,8 +53,12 @@ def __getattr__(name): return te if name == 'InterpreterPoolExecutor': - from .interpreter import InterpreterPoolExecutor as ie - InterpreterPoolExecutor = ie + try: + from .interpreter import InterpreterPoolExecutor as ie + except ModuleNotFoundError: + InterpreterPoolExecutor = None + else: + InterpreterPoolExecutor = ie return ie raise AttributeError(f"module {__name__!r} has no attribute {name!r}") From cd29914577b48124d4cafdd3709969b243e0402b Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 1 Oct 2024 10:16:13 -0600 Subject: [PATCH 18/38] wasi --- Lib/concurrent/futures/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/__init__.py b/Lib/concurrent/futures/__init__.py index c31dea4f7624b5..7ada7431c1ab8c 100644 --- a/Lib/concurrent/futures/__init__.py +++ b/Lib/concurrent/futures/__init__.py @@ -56,7 +56,7 @@ def __getattr__(name): try: from .interpreter import InterpreterPoolExecutor as ie except ModuleNotFoundError: - InterpreterPoolExecutor = None + ie = InterpreterPoolExecutor = None else: InterpreterPoolExecutor = ie return ie From 0287f3be713f0deca73b9541035a196f8babe9b3 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 1 Oct 2024 10:25:17 -0600 Subject: [PATCH 19/38] Ignore race in test. 
--- Lib/test/test_concurrent_futures/test_interpreter_pool.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_interpreter_pool.py b/Lib/test/test_concurrent_futures/test_interpreter_pool.py index b8a8ad4b6afc8a..37277999baa1cf 100644 --- a/Lib/test/test_concurrent_futures/test_interpreter_pool.py +++ b/Lib/test/test_concurrent_futures/test_interpreter_pool.py @@ -53,14 +53,11 @@ def test_init_script(self): before_init = os.read(r, 100) fut = executor.submit(script) after_init = read_msg(r) - write_msg(w, b'') - before_run = read_msg(r) fut.result() after_run = read_msg(r) self.assertEqual(before_init, b'\0') self.assertEqual(after_init, msg1) - self.assertEqual(before_run, b'') self.assertEqual(after_run, msg2) def test_init_script_args(self): From 80cd7b1746bc9f3b9b8199adaa7f2119f769ae1e Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 8 Oct 2024 11:45:45 -0600 Subject: [PATCH 20/38] Add BrokenInterpreterPool. --- Doc/library/concurrent.futures.rst | 9 +++++++++ Lib/concurrent/futures/interpreter.py | 8 ++++++++ Lib/concurrent/futures/thread.py | 6 ++++-- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 3b43e4c82dfac8..643bc85e2d9055 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -630,6 +630,15 @@ Exception classes .. currentmodule:: concurrent.futures.interpreter +.. exception:: BrokenInterpreterPool + + Derived from :exc:`~concurrent.futures.thread.BrokenThreadPool`, + this exception class is raised when one of the workers + of a :class:`~concurrent.futures.InterpreterPoolExecutor` + has failed initializing. + + .. versionadded:: next + .. 
exception:: ExecutionFailed Raised from :class:`~concurrent.futures.InterpreterPoolExecutor` when diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index bb9ffab231dbb7..c05a4ce7a2e627 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -190,8 +190,16 @@ def run(self, task): return pickle.loads(res) if pickled else res +class BrokenInterpreterPool(_thread.BrokenThreadPool): + """ + Raised when a worker thread in an InterpreterPoolExecutor failed initializing. + """ + + class InterpreterPoolExecutor(_thread.ThreadPoolExecutor): + BROKEN = BrokenInterpreterPool + @classmethod def prepare_context(cls, initializer, initargs, shared): return WorkerContext.prepare(initializer, initargs, shared) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 3bcd33110b12ea..16cc5533d429ef 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -148,6 +148,8 @@ class BrokenThreadPool(_base.BrokenExecutor): class ThreadPoolExecutor(_base.Executor): + BROKEN = BrokenThreadPool + # Used to assign unique thread names when thread_name_prefix is not supplied. 
_counter = itertools.count().__next__ @@ -196,7 +198,7 @@ def __init__(self, max_workers=None, thread_name_prefix='', def submit(self, fn, /, *args, **kwargs): with self._shutdown_lock, _global_shutdown_lock: if self._broken: - raise BrokenThreadPool(self._broken) + raise self.BROKEN(self._broken) if self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') @@ -246,7 +248,7 @@ def _initializer_failed(self): except queue.Empty: break if work_item is not None: - work_item.future.set_exception(BrokenThreadPool(self._broken)) + work_item.future.set_exception(self.BROKEN(self._broken)) def shutdown(self, wait=True, *, cancel_futures=False): with self._shutdown_lock: From f8d427304ff0d3651e72c0bf5fb48a1d9d370735 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 8 Oct 2024 12:00:41 -0600 Subject: [PATCH 21/38] Tweak the docs. --- Doc/library/concurrent.futures.rst | 37 +++++++++++++++--------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 643bc85e2d9055..10c55b1fd22b6b 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -17,7 +17,7 @@ asynchronously executing callables. The asynchronous execution can be performed with threads, using :class:`ThreadPoolExecutor` or :class:`InterpreterPoolExecutor`, or separate processes, using :class:`ProcessPoolExecutor`. -Both implement the same interface, which is defined +Each implements the same interface, which is defined by the abstract :class:`Executor` class. .. include:: ../includes/wasm-notavail.rst @@ -234,17 +234,17 @@ InterpreterPoolExecutor The :class:`InterpreterPoolExecutor` class is a :class:`ThreadPoolExecutor` subclass that uses a pool of isolated interpreters to execute calls -asynchronously. Each interpreter has its own GIL, which allows the -executor to side-step the :term:`Global Interpreter Lock -`, allowing the use of multiple cores. 
-Interpreters mostly can't share objects between them, which means that, -in most cases, only picklable objects can be executed and returned. +asynchronously. Each interpreter is isolated from the others and thus +can side-step the :term:`Global Interpreter Lock `, +allowing the use of multiple cores. Interpreters mostly can't share +objects between them, which means that, in most cases, only picklable +objects can be executed and returned. .. class:: InterpreterPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), shared=None) A :class:`ThreadPoolExecutor` subclass that executes calls asynchronously - using a pool of at most *max_workers* interpreters. Each interpreter - runs tasks in its own thread. + using a pool of at most *max_workers* threads. Each thread runs + tasks in its own interpreter. *initializer* and *initargs* are the same as with :class:`ThreadPoolExecutor`, though they are pickled like with @@ -256,25 +256,26 @@ in most cases, only picklable objects can be executed and returned. Similarly you can pass a script to :meth:`~Executor.submit`, which will be executed in the interpreter's ``__main__`` module. In that case no arguments may be provided and the return value is always - ``None``. + ``None``. Functions (and arguments) are pickled like we do with + the initializer. For both *initializer* and :meth:`~Executor.submit`, if a script is passed in then it will automatically have :func:`textwrap.dedent` - applied to it. That means you don't have to. + applied to it. That means you don't have to do so. - :meth:`InterpreterPoolExecutor ` does *not* support - passing in a script. + :meth:`~Executor.map` does *not* support passing in a script. In each of those cases, an uncaught exception from the initializer - or task is raised as an - :class:`~concurrent.futures.interpreter.ExecutionFailed` exception, - rather than the uncaught exception itself. + or task might not be suitable to send between interpreters, to be + raised as is. 
In tha case, an + :class:`~concurrent.futures.interpreter.ExecutionFailed` exception + is raised instead which contains a summary of the original exception. *shared* is an optional dict of objects shared by all interpreters in the pool. The items are added to each interpreter's ``__main__`` - module. Shareable objects include the builtin singletons, :class:`str` - and :class:`bytes`, and :class:`memoryview`. See :pep:`734` - for more info. + module. Not all objects are shareable. Those that are include + the builtin singletons, :class:`str` and :class:`bytes`, + and :class:`memoryview`. See :pep:`734` for more info. The other caveats that apply to :class:`ThreadPoolExecutor` apply here. From 3a8bfcef5567c54ada62f6c6d712d7557ef43143 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 8 Oct 2024 12:37:12 -0600 Subject: [PATCH 22/38] Clarify the InterpreterPoolExecutor docs. --- Doc/library/concurrent.futures.rst | 44 ++++++++++++++---------------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 10c55b1fd22b6b..d262b57e81fafa 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -246,30 +246,10 @@ objects can be executed and returned. using a pool of at most *max_workers* threads. Each thread runs tasks in its own interpreter. - *initializer* and *initargs* are the same as with - :class:`ThreadPoolExecutor`, though they are pickled like with - :class:`ProcessPoolExecutor`. Additionally, you can pass a script - (:class:`str`) for *initiazer*, which will be executed in the - interpreter's ``__main__`` module. In that case, *initargs* must - not be passed in. - - Similarly you can pass a script to :meth:`~Executor.submit`, which - will be executed in the interpreter's ``__main__`` module. In that - case no arguments may be provided and the return value is always - ``None``. 
Functions (and arguments) are pickled like we do with - the initializer. - - For both *initializer* and :meth:`~Executor.submit`, if a script - is passed in then it will automatically have :func:`textwrap.dedent` - applied to it. That means you don't have to do so. - - :meth:`~Executor.map` does *not* support passing in a script. - - In each of those cases, an uncaught exception from the initializer - or task might not be suitable to send between interpreters, to be - raised as is. In tha case, an - :class:`~concurrent.futures.interpreter.ExecutionFailed` exception - is raised instead which contains a summary of the original exception. + *initializer* may be a callable and *initargs* a tuple of arguments, + just like with :class:`ThreadPoolExecutor`. However, they are pickled + like with :class:`ProcessPoolExecutor`. Likewise, functions (and + arguments) passed to :meth:`~Executor.submit` are pickled. *shared* is an optional dict of objects shared by all interpreters in the pool. The items are added to each interpreter's ``__main__`` @@ -277,6 +257,22 @@ objects can be executed and returned. the builtin singletons, :class:`str` and :class:`bytes`, and :class:`memoryview`. See :pep:`734` for more info. + You can also pass a script (:class:`str`) for *initiazer* or to + :meth:`~Executor.submit` (but not to :meth:`~Executor.map`). + In both cases, the script will be executed in the interpreter's + ``__main__`` module. The executor will automatically apply + :func:`textwrap.dedent` to the script, so you don't have to do so. + With a script, arguments must not be passed in. + For :meth:`!Executor.submit`, the return value for a script + is always ``None``. + + Normally an uncaught exception from *initializer* or from a submitted + task will be sent back between interpreters to be raised as is. + However, in some situations the exception might not be suitable to be + sent back. 
In that case, the executor raises an + :class:`~concurrent.futures.interpreter.ExecutionFailed` exception + instead, which contains a summary of the original exception. + The other caveats that apply to :class:`ThreadPoolExecutor` apply here. .. versionadded:: next From af6c27a962b4750659fcedad8069b42452fe51d0 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 8 Oct 2024 12:37:47 -0600 Subject: [PATCH 23/38] Catch all exceptions. --- Lib/concurrent/futures/interpreter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index c05a4ce7a2e627..36da84deac80c2 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -74,7 +74,7 @@ def create_context(): def _capture_exc(cls, resultsid): try: yield - except Exception as exc: + except BaseException as exc: err = pickle.dumps(exc) _interpqueues.put(resultsid, (None, err), 1, UNBOUND) else: @@ -84,7 +84,7 @@ def _capture_exc(cls, resultsid): def _call(cls, func, args, kwargs, resultsid): try: res = func(*args, **kwargs) - except Exception as exc: + except BaseException as exc: err = pickle.dumps(exc) _interpqueues.put(resultsid, (None, err), 1, UNBOUND) else: From 8c0a4054b287cb65a8674da751b3cbf4c6d7dd2f Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 8 Oct 2024 12:52:22 -0600 Subject: [PATCH 24/38] Factor out exception serialization helpers. --- Lib/concurrent/futures/interpreter.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index 36da84deac80c2..6c962bd52a445b 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -36,6 +36,15 @@ def __str__(self): """.strip()) +def _serialize_exception(exc): + # XXX Capture the traceback too. 
+ return pickle.dumps(exc) + + +def _deserialize_exception(data): + return pickle.loads(data) + + UNBOUND = 2 # error; this should not happen. @@ -75,7 +84,7 @@ def _capture_exc(cls, resultsid): try: yield except BaseException as exc: - err = pickle.dumps(exc) + err = _serialize_exception(exc) _interpqueues.put(resultsid, (None, err), 1, UNBOUND) else: _interpqueues.put(resultsid, (None, None), 0, UNBOUND) @@ -85,7 +94,7 @@ def _call(cls, func, args, kwargs, resultsid): try: res = func(*args, **kwargs) except BaseException as exc: - err = pickle.dumps(exc) + err = _serialize_exception(exc) _interpqueues.put(resultsid, (None, err), 1, UNBOUND) else: # Send the result back. @@ -185,7 +194,7 @@ def run(self, task): if exc is not None: assert res is None, res assert pickled - exc = pickle.loads(exc) + exc = _deserialize_exception(exc) raise exc from exc return pickle.loads(res) if pickled else res From 1ae7ca298e32ed125937f0643ddb35f734df36a6 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 8 Oct 2024 13:23:27 -0600 Subject: [PATCH 25/38] Set the ExecutionFailed error as __cause__. --- Lib/concurrent/futures/interpreter.py | 47 ++++++++------ .../test_interpreter_pool.py | 64 ++++++++++++++++++- 2 files changed, 91 insertions(+), 20 deletions(-) diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index 6c962bd52a445b..1917f05da203ec 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -84,25 +84,26 @@ def _capture_exc(cls, resultsid): try: yield except BaseException as exc: + # Send the captured exception out on the results queue, + # but still leave it unhandled for the interpreter to handle. 
err = _serialize_exception(exc) _interpqueues.put(resultsid, (None, err), 1, UNBOUND) - else: - _interpqueues.put(resultsid, (None, None), 0, UNBOUND) + raise # re-raise + + @classmethod + def _send_script_result(cls, resultsid): + _interpqueues.put(resultsid, (None, None), 0, UNBOUND) @classmethod def _call(cls, func, args, kwargs, resultsid): + with cls._capture_exc(resultsid): + res = func(*args or (), **kwargs or {}) + # Send the result back. try: - res = func(*args, **kwargs) - except BaseException as exc: - err = _serialize_exception(exc) - _interpqueues.put(resultsid, (None, err), 1, UNBOUND) - else: - # Send the result back. - try: - _interpqueues.put(resultsid, (res, None), 0, UNBOUND) - except _interpreters.NotShareableError: - res = pickle.dumps(res) - _interpqueues.put(resultsid, (res, None), 1, UNBOUND) + _interpqueues.put(resultsid, (res, None), 0, UNBOUND) + except _interpreters.NotShareableError: + res = pickle.dumps(res) + _interpqueues.put(resultsid, (res, None), 1, UNBOUND) @classmethod def _call_pickled(cls, pickled, resultsid): @@ -170,12 +171,19 @@ def run(self, task): if kind == 'script': script = f""" with WorkerContext._capture_exc({self.resultsid}): -{textwrap.indent(data, ' ')}""" +{textwrap.indent(data, ' ')} +WorkerContext._send_script_result({self.resultsid})""" elif kind == 'function': script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})' else: raise NotImplementedError(kind) - self._exec(script) + + try: + self._exec(script) + except ExecutionFailed as exc: + exc_wrapper = exc + else: + exc_wrapper = None # Return the result, or raise the exception. 
while True: @@ -189,13 +197,14 @@ def run(self, task): continue else: break - (res, exc), pickled, unboundop = obj + (res, excdata), pickled, unboundop = obj assert unboundop is None, unboundop - if exc is not None: + if excdata is not None: assert res is None, res assert pickled - exc = _deserialize_exception(exc) - raise exc from exc + assert exc_wrapper is not None + exc = _deserialize_exception(excdata) + raise exc from exc_wrapper return pickle.loads(res) if pickled else res diff --git a/Lib/test/test_concurrent_futures/test_interpreter_pool.py b/Lib/test/test_concurrent_futures/test_interpreter_pool.py index 37277999baa1cf..79a90a4c462405 100644 --- a/Lib/test/test_concurrent_futures/test_interpreter_pool.py +++ b/Lib/test/test_concurrent_futures/test_interpreter_pool.py @@ -1,9 +1,12 @@ import contextlib +import io import os import pickle import sys import unittest -from concurrent.futures.interpreter import ExecutionFailed +from concurrent.futures.interpreter import ( + ExecutionFailed, BrokenInterpreterPool, +) from test import support from test.support.interpreters import queues @@ -27,6 +30,10 @@ def get_current_name(): return __name__ +def fail(exctype, msg=None): + raise exctype(msg) + + class InterpreterPoolExecutorTest(InterpreterPoolMixin, ExecutorTest, BaseTestCase): def pipe(self): @@ -115,6 +122,34 @@ def test_init_shared(self): self.assertEqual(after, msg) + def test_init_exception(self): + with self.subTest('script'): + executor = self.executor_type(initializer='raise Exception("spam")') + with executor: + with contextlib.redirect_stderr(io.StringIO()) as stderr: + fut = executor.submit('pass') + with self.assertRaises(BrokenInterpreterPool): + fut.result() + stderr = stderr.getvalue() + self.assertIn('ExecutionFailed: Exception: spam', stderr) + self.assertIn('Uncaught in the interpreter:', stderr) + self.assertIn('The above exception was the direct cause of the following exception:', + stderr) + + with self.subTest('func'): + executor = 
self.executor_type(initializer=fail, + initargs=(Exception, 'spam')) + with executor: + with contextlib.redirect_stderr(io.StringIO()) as stderr: + fut = executor.submit('pass') + with self.assertRaises(BrokenInterpreterPool): + fut.result() + stderr = stderr.getvalue() + self.assertIn('ExecutionFailed: Exception: spam', stderr) + self.assertIn('Uncaught in the interpreter:', stderr) + self.assertIn('The above exception was the direct cause of the following exception:', + stderr) + def test_submit_script(self): msg = b'spam' r, w = self.pipe() @@ -173,6 +208,33 @@ def test_submit_func_globals(self): self.assertEqual(name, __name__) self.assertNotEqual(name, '__main__') + def test_submit_exception(self): + with self.subTest('script'): + fut = self.executor.submit('raise Exception("spam")') + with self.assertRaises(Exception) as captured: + fut.result() + self.assertIs(type(captured.exception), Exception) + self.assertEqual(str(captured.exception), 'spam') + cause = captured.exception.__cause__ + self.assertIs(type(cause), ExecutionFailed) + for attr in ('__name__', '__qualname__', '__module__'): + self.assertEqual(getattr(cause.excinfo.type, attr), + getattr(Exception, attr)) + self.assertEqual(cause.excinfo.msg, 'spam') + + with self.subTest('func'): + fut = self.executor.submit(fail, Exception, 'spam') + with self.assertRaises(Exception) as captured: + fut.result() + self.assertIs(type(captured.exception), Exception) + self.assertEqual(str(captured.exception), 'spam') + cause = captured.exception.__cause__ + self.assertIs(type(cause), ExecutionFailed) + for attr in ('__name__', '__qualname__', '__module__'): + self.assertEqual(getattr(cause.excinfo.type, attr), + getattr(Exception, attr)) + self.assertEqual(cause.excinfo.msg, 'spam') + def test_saturation(self): blocker = queues.create() executor = self.executor_type(4, shared=dict(blocker=blocker)) From d24e85d69a2e21669fb4d599a3cf4e87a4660184 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 8 Oct 2024 
14:47:37 -0600 Subject: [PATCH 26/38] Drop the exception serialization helpers. --- Lib/concurrent/futures/interpreter.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index 1917f05da203ec..9607adcc1cdc51 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -36,15 +36,6 @@ def __str__(self): """.strip()) -def _serialize_exception(exc): - # XXX Capture the traceback too. - return pickle.dumps(exc) - - -def _deserialize_exception(data): - return pickle.loads(data) - - UNBOUND = 2 # error; this should not happen. @@ -86,7 +77,7 @@ def _capture_exc(cls, resultsid): except BaseException as exc: # Send the captured exception out on the results queue, # but still leave it unhandled for the interpreter to handle. - err = _serialize_exception(exc) + err = pickle.dumps(exc) _interpqueues.put(resultsid, (None, err), 1, UNBOUND) raise # re-raise @@ -203,7 +194,7 @@ def run(self, task): assert res is None, res assert pickled assert exc_wrapper is not None - exc = _deserialize_exception(excdata) + exc = pickle.loads(excdata) raise exc from exc_wrapper return pickle.loads(res) if pickled else res From 05a03ad402a9333f83d066dc3ee9ac05c1ef73f7 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 8 Oct 2024 15:12:49 -0600 Subject: [PATCH 27/38] Always finalize if there is an error in initialize(). 
--- Lib/concurrent/futures/interpreter.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index 9607adcc1cdc51..2cb83454ff269e 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -135,8 +135,6 @@ def initialize(self): if self.initdata: self.run(self.initdata) - except _interpreters.InterpreterNotFoundError: - raise # re-raise except BaseException: self.finalize() raise # re-raise From f150931a81519813eec7dd3945ed4e36676b3dc1 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 8 Oct 2024 15:31:15 -0600 Subject: [PATCH 28/38] Explicitly note the problem with functions defined in __main__. --- Doc/library/concurrent.futures.rst | 4 ++++ Lib/concurrent/futures/interpreter.py | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index d262b57e81fafa..c1f2521a80c7b6 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -251,6 +251,10 @@ objects can be executed and returned. like with :class:`ProcessPoolExecutor`. Likewise, functions (and arguments) passed to :meth:`~Executor.submit` are pickled. + .. note:: + functions defined in the ``__main__`` module cannot be pickled + and thus cannot be used. + *shared* is an optional dict of objects shared by all interpreters in the pool. The items are added to each interpreter's ``__main__`` module. Not all objects are shareable. Those that are include diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index 2cb83454ff269e..3e3d85e115aa0b 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -53,7 +53,9 @@ def resolve_task(fn, args, kwargs): # XXX Keep the compiled code object? compile(data, '', 'exec') else: - # XXX This does not work if fn comes from the __main__ module. 
+ # Functions defined in the __main__ module can't be pickled, + # so they can't be used here (for now). We could possibly + # borrow from multiprocessing to work around this. data = pickle.dumps((fn, args, kwargs)) kind = 'function' return (data, kind) From 97d02924a5033f76ce0098814bc66a5d2e25e796 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 8 Oct 2024 16:01:45 -0600 Subject: [PATCH 29/38] Handle the case where interpreters.queues doesn't exist. --- Lib/concurrent/futures/interpreter.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index 3e3d85e115aa0b..400f34b5ee2a80 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -182,10 +182,12 @@ def run(self, task): obj = _interpqueues.get(self.resultsid) except _interpqueues.QueueNotFoundError: raise # re-raise - # XXX This breaks if test.support.interpreters.queues - # doesn't exist. except _interpqueues.QueueError: continue + except ModuleNotFoundError: + # interpreters.queues doesn't exist, which means + # QueueEmpty doesn't. Act as though it does. + continue else: break (res, excdata), pickled, unboundop = obj From 5c3a3275b16ef654cdd77f6b971557cdbba92f26 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 15 Oct 2024 11:28:49 -0600 Subject: [PATCH 30/38] Add a What's New entry about InterpreterPoolExecutor. --- Doc/whatsnew/3.14.rst | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index 25e69a59bdec62..d7b692b60473c4 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -225,6 +225,14 @@ ast * The ``repr()`` output for AST nodes now includes more information. (Contributed by Tomas R in :gh:`116022`.) 
+concurrent.futures +------------------ + +* Add :class:`~concurrent.futures.InterpreterPoolExecutor`, + which exposes "subinterpreters (multiple Python interpreters in the + same process) to Python code. This is separate from the proposed API + in :pep:`734`. + (Contributed by Eric Snow in :gh:`124548`.) ctypes ------ From a2032a852603b927b91b493299e80b1fd02fa3f6 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 15 Oct 2024 11:31:16 -0600 Subject: [PATCH 31/38] Fix a typo. --- Doc/library/concurrent.futures.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index c1f2521a80c7b6..bbcd2b9f4f59c6 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -261,7 +261,7 @@ objects can be executed and returned. the builtin singletons, :class:`str` and :class:`bytes`, and :class:`memoryview`. See :pep:`734` for more info. - You can also pass a script (:class:`str`) for *initiazer* or to + You can also pass a script (:class:`str`) for *initializer* or to :meth:`~Executor.submit` (but not to :meth:`~Executor.map`). In both cases, the script will be executed in the interpreter's ``__main__`` module. The executor will automatically apply From 54119b8b0778a0500bc3b385dc6285b8642f8ab5 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 15 Oct 2024 11:58:10 -0600 Subject: [PATCH 32/38] Fix the documented signature. --- Doc/library/concurrent.futures.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index bbcd2b9f4f59c6..c677f1e9de83ef 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -240,7 +240,7 @@ allowing the use of multiple cores. Interpreters mostly can't share objects between them, which means that, in most cases, only picklable objects can be executed and returned. -.. 
class:: InterpreterPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), shared=None) +.. class:: InterpreterPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), shared=None) A :class:`ThreadPoolExecutor` subclass that executes calls asynchronously using a pool of at most *max_workers* threads. Each thread runs From 744dca7ee2ad8216777d8672613dc3889ab4a21e Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 15 Oct 2024 13:07:47 -0600 Subject: [PATCH 33/38] Test and document asyncio support. --- Doc/library/asyncio-dev.rst | 6 +- Doc/library/asyncio-eventloop.rst | 9 ++- Doc/library/asyncio-llapi-index.rst | 2 +- .../test_interpreter_pool.py | 73 ++++++++++++++++++- 4 files changed, 85 insertions(+), 5 deletions(-) diff --git a/Doc/library/asyncio-dev.rst b/Doc/library/asyncio-dev.rst index a9c3a0183bb72d..44b507a9811116 100644 --- a/Doc/library/asyncio-dev.rst +++ b/Doc/library/asyncio-dev.rst @@ -103,7 +103,8 @@ To handle signals the event loop must be run in the main thread. The :meth:`loop.run_in_executor` method can be used with a -:class:`concurrent.futures.ThreadPoolExecutor` to execute +:class:`concurrent.futures.ThreadPoolExecutor` or +:class:`~concurrent.futures.InterpreterPoolExecutor` to execute blocking code in a different OS thread without blocking the OS thread that the event loop runs in. @@ -128,7 +129,8 @@ if a function performs a CPU-intensive calculation for 1 second, all concurrent asyncio Tasks and IO operations would be delayed by 1 second. -An executor can be used to run a task in a different thread or even in +An executor can be used to run a task in a different thread, +including in a different interpreter, or even in a different process to avoid blocking the OS thread with the event loop. See the :meth:`loop.run_in_executor` method for more details. 
diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 943683f6b8a7f6..14fd153f640f05 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -1305,6 +1305,12 @@ Executing code in thread or process pools pool, cpu_bound) print('custom process pool', result) + # 4. Run in a custom interpreter pool: + with concurrent.futures.InterpreterPoolExecutor() as pool: + result = await loop.run_in_executor( + pool, cpu_bound) + print('custom interpreter pool', result) + if __name__ == '__main__': asyncio.run(main()) @@ -1329,7 +1335,8 @@ Executing code in thread or process pools Set *executor* as the default executor used by :meth:`run_in_executor`. *executor* must be an instance of - :class:`~concurrent.futures.ThreadPoolExecutor`. + :class:`~concurrent.futures.ThreadPoolExecutor`, which includes + :class:`~concurrent.futures.InterpreterPoolExecutor`. .. versionchanged:: 3.11 *executor* must be an instance of diff --git a/Doc/library/asyncio-llapi-index.rst b/Doc/library/asyncio-llapi-index.rst index 3e21054aa4fe9e..f5af888f31f186 100644 --- a/Doc/library/asyncio-llapi-index.rst +++ b/Doc/library/asyncio-llapi-index.rst @@ -96,7 +96,7 @@ See also the main documentation section about the - Invoke a callback *at* the given time. -.. rubric:: Thread/Process Pool +.. rubric:: Thread/Interpreter/Process Pool .. 
list-table:: :widths: 50 50 :class: full-width-table diff --git a/Lib/test/test_concurrent_futures/test_interpreter_pool.py b/Lib/test/test_concurrent_futures/test_interpreter_pool.py index 79a90a4c462405..1e330fa78a40c3 100644 --- a/Lib/test/test_concurrent_futures/test_interpreter_pool.py +++ b/Lib/test/test_concurrent_futures/test_interpreter_pool.py @@ -1,13 +1,17 @@ +import asyncio import contextlib import io import os import pickle import sys +import time import unittest from concurrent.futures.interpreter import ( ExecutionFailed, BrokenInterpreterPool, ) +import _interpreters from test import support +import test.test_asyncio.utils as testasyncio_utils from test.support.interpreters import queues from .executor import ExecutorTest, mul @@ -34,7 +38,12 @@ def fail(exctype, msg=None): raise exctype(msg) -class InterpreterPoolExecutorTest(InterpreterPoolMixin, ExecutorTest, BaseTestCase): +def get_current_interpid(*extra): + interpid, _ = _interpreters.get_current() + return (interpid, *extra) + + +class InterpretersMixin(InterpreterPoolMixin): def pipe(self): r, w = os.pipe() @@ -42,6 +51,10 @@ def pipe(self): self.addCleanup(lambda: os.close(w)) return r, w + +class InterpreterPoolExecutorTest( + InterpretersMixin, ExecutorTest, BaseTestCase): + def test_init_script(self): msg1 = b'step: init' msg2 = b'step: run' @@ -256,6 +269,64 @@ def test_idle_thread_reuse(self): executor.shutdown(wait=True) +class AsyncioTest(InterpretersMixin, testasyncio_utils.TestCase): + + def setUp(self): + super().setUp() + self.loop = asyncio.new_event_loop() + self.set_event_loop(self.loop) + + self.executor = self.executor_type() + self.addCleanup(lambda: self.executor.shutdown()) + + def tearDown(self): + if not self.loop.is_closed(): + testasyncio_utils.run_briefly(self.loop) + + self.doCleanups() + support.gc_collect() + super().tearDown() + + def test_run_in_executor(self): + unexpected, _ = _interpreters.get_current() + + func = get_current_interpid + fut = 
self.loop.run_in_executor(self.executor, func, 'yo') + interpid, res = self.loop.run_until_complete(fut) + + self.assertEqual(res, 'yo') + self.assertNotEqual(interpid, unexpected) + + def test_run_in_executor_cancel(self): + executor = self.executor_type() + + called = False + + def patched_call_soon(*args): + nonlocal called + called = True + + func = time.sleep + fut = self.loop.run_in_executor(self.executor, func, 0.05) + fut.cancel() + self.loop.run_until_complete( + self.loop.shutdown_default_executor()) + self.loop.close() + self.loop.call_soon = patched_call_soon + self.loop.call_soon_threadsafe = patched_call_soon + time.sleep(0.4) + self.assertFalse(called) + + def test_default_executor(self): + unexpected, _ = _interpreters.get_current() + + self.loop.set_default_executor(self.executor) + fut = self.loop.run_in_executor(None, get_current_interpid) + interpid, = self.loop.run_until_complete(fut) + + self.assertNotEqual(interpid, unexpected) + + def setUpModule(): setup_module() From f61d62d5c9d9dee1f473e603c0ad40ab76ffe790 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Wed, 16 Oct 2024 12:13:58 -0600 Subject: [PATCH 34/38] Apply suggestions from code review Co-authored-by: Carol Willing --- Doc/library/concurrent.futures.rst | 16 ++++++++-------- Lib/concurrent/futures/interpreter.py | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index c677f1e9de83ef..eaf5032462ad0b 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -232,12 +232,12 @@ ThreadPoolExecutor Example InterpreterPoolExecutor ----------------------- -The :class:`InterpreterPoolExecutor` class is a :class:`ThreadPoolExecutor` -subclass that uses a pool of isolated interpreters to execute calls -asynchronously. Each interpreter is isolated from the others and thus -can side-step the :term:`Global Interpreter Lock `, -allowing the use of multiple cores. 
Interpreters mostly can't share -objects between them, which means that, in most cases, only picklable +The :class:`InterpreterPoolExecutor` class, a :class:`ThreadPoolExecutor` +subclass, uses a pool of isolated interpreters to execute calls +asynchronously. Since each interpreter is isolated from the others, +side-stepping the :term:`Global Interpreter Lock ` , +multiple cores can be used. Since Interpreters do not share +objects between them, in most cases, only picklable objects can be executed and returned. .. class:: InterpreterPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), shared=None) @@ -271,13 +271,13 @@ objects can be executed and returned. is always ``None``. Normally an uncaught exception from *initializer* or from a submitted - task will be sent back between interpreters to be raised as is. + task will be sent back to be raised as is. However, in some situations the exception might not be suitable to be sent back. In that case, the executor raises an :class:`~concurrent.futures.interpreter.ExecutionFailed` exception instead, which contains a summary of the original exception. - The other caveats that apply to :class:`ThreadPoolExecutor` apply here. + Other caveats from parent :class:`ThreadPoolExecutor` apply here. .. versionadded:: next diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index 400f34b5ee2a80..6f7b40c15d461a 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -54,7 +54,7 @@ def resolve_task(fn, args, kwargs): compile(data, '', 'exec') else: # Functions defined in the __main__ module can't be pickled, - # so they can't be used here (for now). We could possibly + # so they can't be used here. In the future, we could possibly # borrow from multiprocessing to work around this. 
data = pickle.dumps((fn, args, kwargs)) kind = 'function' From ee65bb22e6fe180bc554a97b91f22e26c370f866 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Wed, 16 Oct 2024 15:20:11 -0600 Subject: [PATCH 35/38] Expand the docs. --- Doc/library/concurrent.futures.rst | 136 +++++++++++++++++++++-------- 1 file changed, 100 insertions(+), 36 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index eaf5032462ad0b..60c11965a4da2d 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -232,54 +232,118 @@ ThreadPoolExecutor Example InterpreterPoolExecutor ----------------------- -The :class:`InterpreterPoolExecutor` class, a :class:`ThreadPoolExecutor` -subclass, uses a pool of isolated interpreters to execute calls -asynchronously. Since each interpreter is isolated from the others, -side-stepping the :term:`Global Interpreter Lock ` , -multiple cores can be used. Since Interpreters do not share -objects between them, in most cases, only picklable -objects can be executed and returned. +The :class:`InterpreterPoolExecutor` class uses a pool of interpreters +to execute calls asynchronously. It is a :class:`ThreadPoolExecutor` +subclass, which means each worker is running in its own thread. +The difference here is that each worker has its own interpreter, +and runs each task using that interpreter. + +The biggest benefit to using interpreters instead of only threads +is true multi-core parallelism. Each interpreter has its own +:term:`Global Interpreter Lock `, so code +running in one interpreter can run on one CPU core, while code in +another interpreter runs unblocked on a different core. + +The tradeoff is that writing concurrent code for use with multiple +interpreters can take extra effort. However, this is because it +forces you to be deliberate about how and when interpreters interact, +and to be explicit about what data is shared between interpreters. 
+This results in several benefits that help balance the extra effort,
+including true multi-core parallelism. For example, code written
+this way can make it easier to reason about concurrency. Another
+major benefit is that you don't have to deal with several of the
+big pain points of using threads, like race conditions.
+
+Each worker's interpreter is isolated from all the other interpreters.
+"Isolated" means each interpreter has its own runtime state and
+operates completely independently. For example, if you redirect
+:data:`sys.stdout` in one interpreter, it will not be automatically
+redirected in any other interpreter. If you import a module in one
+interpreter, it is not automatically imported in any other. You
+would need to import the module separately in each interpreter
+where you need it. In fact, each module imported in an interpreter
+is a completely separate object from the same module in a different
+interpreter, including :mod:`sys`, :mod:`builtins`,
+and even ``__main__``.
Each thread runs - tasks in its own interpreter. - - *initializer* may be a callable and *initargs* a tuple of arguments, - just like with :class:`ThreadPoolExecutor`. However, they are pickled - like with :class:`ProcessPoolExecutor`. Likewise, functions (and - arguments) passed to :meth:`~Executor.submit` are pickled. + tasks in its own interpreter. The worker interpreters are isolated + from each other, which means each has its own runtime state and that + they can't share any mutable objects or other data. Each interpreter + has its own :term:`Global Interpreter Lock `, + which means code run with this executor has true multi-core parallelism. + + The optional *initializer* and *initargs* arguments have the same + meaning as for :class:`!ThreadPoolExecutor`: the initializer is run + when each worker is created, though in this case it is run.in + the worker's interpreter. The executor serializes the *initializer* + and *initargs* using :mod:`pickle` when sending them to the worker's + interpreter. .. note:: - functions defined in the ``__main__`` module cannot be pickled + Functions defined in the ``__main__`` module cannot be pickled and thus cannot be used. - *shared* is an optional dict of objects shared by all interpreters - in the pool. The items are added to each interpreter's ``__main__`` - module. Not all objects are shareable. Those that are include - the builtin singletons, :class:`str` and :class:`bytes`, - and :class:`memoryview`. See :pep:`734` for more info. - - You can also pass a script (:class:`str`) for *initializer* or to - :meth:`~Executor.submit` (but not to :meth:`~Executor.map`). - In both cases, the script will be executed in the interpreter's - ``__main__`` module. The executor will automatically apply - :func:`textwrap.dedent` to the script, so you don't have to do so. - With a script, arguments must not be passed in. - For :meth:`!Executor.submit`, the return value for a script - is always ``None``. 
- - Normally an uncaught exception from *initializer* or from a submitted - task will be sent back to be raised as is. - However, in some situations the exception might not be suitable to be - sent back. In that case, the executor raises an - :class:`~concurrent.futures.interpreter.ExecutionFailed` exception - instead, which contains a summary of the original exception. + .. note:: + The executor may replace uncaught exceptions from *initializer* + with :class:`~concurrent.futures.interpreter.ExecutionFailed`. + + The *initializer* argument may also be a script (:class:`str`), + The script will be executed in the interpreter's ``__main__`` module. + The executor automatically applies :func:`textwrap.dedent` to the script. + *initargs* must not be passed in in this case. + + The optional *shared* argument is a :class:`dict` of objects that all + interpreters in the pool share. The *shared* items are added to each + interpreter's ``__main__`` module. Not all objects are shareable. + Shareable objects include the builtin singletons, :class:`str` + and :class:`bytes`, and :class:`memoryview`. See :pep:`734` + for more info. Other caveats from parent :class:`ThreadPoolExecutor` apply here. - .. versionadded:: next +:meth:`~Executor.submit` and :meth:`~Executor.map` work like normal, +except the worker serializes the callable and arguments using +:mod:`pickle` when sending them to its interpreter. The worker +likewise serializes the return value when sending it back. + +.. note:: + Functions defined in the ``__main__`` module cannot be pickled + and thus cannot be used. + +For :meth:`~Executor.submit`, but *not* :meth:`~Executor.map`, +you can also pass a script (:class:`str`) instead of a callable. +The script will be executed in the interpreter's ``__main__`` module. +The executor will automatically apply :func:`textwrap.dedent` to the +script, so you don't have to do so. With a script, arguments must +not be passed in. The return value for a script is always ``None``. 
+ +When a worker's current task raises an uncaught exception, the worker +always tries to preserve the exception as-is. If that is successful +then it also sets the ``__cause__`` to a corresponding +:class:`~concurrent.futures.interpreter.ExecutionFailed` +instance, which contains a summary of the original exception. +In the uncommon case that the worker is not able to preserve the +original as-is then it directly preserves the corresponding +:class:`~concurrent.futures.interpreter.ExecutionFailed` +instance instead. ProcessPoolExecutor From a7f5c50e7c1032123d6816db614d34757d0472fc Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Wed, 16 Oct 2024 15:53:46 -0600 Subject: [PATCH 36/38] For now, drop support for scripts. --- Doc/library/concurrent.futures.rst | 12 -- Lib/concurrent/futures/interpreter.py | 13 +- .../test_interpreter_pool.py | 125 ++++++++++-------- 3 files changed, 77 insertions(+), 73 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 60c11965a4da2d..2e172ac30a373c 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -305,11 +305,6 @@ the bytes over a shared :mod:`socket ` or The executor may replace uncaught exceptions from *initializer* with :class:`~concurrent.futures.interpreter.ExecutionFailed`. - The *initializer* argument may also be a script (:class:`str`), - The script will be executed in the interpreter's ``__main__`` module. - The executor automatically applies :func:`textwrap.dedent` to the script. - *initargs* must not be passed in in this case. - The optional *shared* argument is a :class:`dict` of objects that all interpreters in the pool share. The *shared* items are added to each interpreter's ``__main__`` module. Not all objects are shareable. @@ -328,13 +323,6 @@ likewise serializes the return value when sending it back. Functions defined in the ``__main__`` module cannot be pickled and thus cannot be used. 
-For :meth:`~Executor.submit`, but *not* :meth:`~Executor.map`, -you can also pass a script (:class:`str`) instead of a callable. -The script will be executed in the interpreter's ``__main__`` module. -The executor will automatically apply :func:`textwrap.dedent` to the -script, so you don't have to do so. With a script, arguments must -not be passed in. The return value for a script is always ``None``. - When a worker's current task raises an uncaught exception, the worker always tries to preserve the exception as-is. If that is successful then it also sets the ``__cause__`` to a corresponding diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index 6f7b40c15d461a..3c991b22d16dc5 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -45,6 +45,8 @@ class WorkerContext(_thread.WorkerContext): def prepare(cls, initializer, initargs, shared): def resolve_task(fn, args, kwargs): if isinstance(fn, str): + # XXX Circle back to this later. 
+ raise TypeError('scripts not supported') if args or kwargs: raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}') data = textwrap.dedent(fn) @@ -60,11 +62,13 @@ def resolve_task(fn, args, kwargs): kind = 'function' return (data, kind) - if isinstance(initializer, str): - if initargs: - raise ValueError(f'an initializer script does not take args, got {initargs!r}') if initializer is not None: - initdata = resolve_task(initializer, initargs, {}) + try: + initdata = resolve_task(initializer, initargs, {}) + except ValueError: + if isinstance(initializer, str) and initargs: + raise ValueError(f'an initializer script does not take args, got {initargs!r}') + raise # re-raise else: initdata = None def create_context(): @@ -160,6 +164,7 @@ def finalize(self): def run(self, task): data, kind = task if kind == 'script': + raise NotImplementedError('script kind disabled') script = f""" with WorkerContext._capture_exc({self.resultsid}): {textwrap.indent(data, ' ')} diff --git a/Lib/test/test_concurrent_futures/test_interpreter_pool.py b/Lib/test/test_concurrent_futures/test_interpreter_pool.py index 1e330fa78a40c3..0de03c0d669399 100644 --- a/Lib/test/test_concurrent_futures/test_interpreter_pool.py +++ b/Lib/test/test_concurrent_futures/test_interpreter_pool.py @@ -18,6 +18,10 @@ from .util import BaseTestCase, InterpreterPoolMixin, setup_module +def noop(): + pass + + def write_msg(fd, msg): os.write(fd, msg + b'\0') @@ -55,6 +59,7 @@ def pipe(self): class InterpreterPoolExecutorTest( InterpretersMixin, ExecutorTest, BaseTestCase): + @unittest.expectedFailure def test_init_script(self): msg1 = b'step: init' msg2 = b'step: run' @@ -80,6 +85,7 @@ def test_init_script(self): self.assertEqual(after_init, msg1) self.assertEqual(after_run, msg2) + @unittest.expectedFailure def test_init_script_args(self): with self.assertRaises(ValueError): self.executor_type(initializer='pass', initargs=('spam',)) @@ -123,46 +129,50 @@ def initializer(self): 
def test_init_shared(self): msg = b'eggs' r, w = self.pipe() - script = f""" + script = f"""if True: import os + if __name__ != '__main__': + import __main__ + spam = __main__.spam os.write({w}, spam + b'\\0') """ executor = self.executor_type(shared={'spam': msg}) - fut = executor.submit(script) + fut = executor.submit(exec, script) fut.result() after = read_msg(r) self.assertEqual(after, msg) - def test_init_exception(self): - with self.subTest('script'): - executor = self.executor_type(initializer='raise Exception("spam")') - with executor: - with contextlib.redirect_stderr(io.StringIO()) as stderr: - fut = executor.submit('pass') - with self.assertRaises(BrokenInterpreterPool): - fut.result() - stderr = stderr.getvalue() - self.assertIn('ExecutionFailed: Exception: spam', stderr) - self.assertIn('Uncaught in the interpreter:', stderr) - self.assertIn('The above exception was the direct cause of the following exception:', - stderr) - - with self.subTest('func'): - executor = self.executor_type(initializer=fail, - initargs=(Exception, 'spam')) - with executor: - with contextlib.redirect_stderr(io.StringIO()) as stderr: - fut = executor.submit('pass') - with self.assertRaises(BrokenInterpreterPool): - fut.result() - stderr = stderr.getvalue() - self.assertIn('ExecutionFailed: Exception: spam', stderr) - self.assertIn('Uncaught in the interpreter:', stderr) - self.assertIn('The above exception was the direct cause of the following exception:', - stderr) - + @unittest.expectedFailure + def test_init_exception_in_script(self): + executor = self.executor_type(initializer='raise Exception("spam")') + with executor: + with contextlib.redirect_stderr(io.StringIO()) as stderr: + fut = executor.submit('pass') + with self.assertRaises(BrokenInterpreterPool): + fut.result() + stderr = stderr.getvalue() + self.assertIn('ExecutionFailed: Exception: spam', stderr) + self.assertIn('Uncaught in the interpreter:', stderr) + self.assertIn('The above exception was the direct cause 
of the following exception:', + stderr) + + def test_init_exception_in_func(self): + executor = self.executor_type(initializer=fail, + initargs=(Exception, 'spam')) + with executor: + with contextlib.redirect_stderr(io.StringIO()) as stderr: + fut = executor.submit(noop) + with self.assertRaises(BrokenInterpreterPool): + fut.result() + stderr = stderr.getvalue() + self.assertIn('ExecutionFailed: Exception: spam', stderr) + self.assertIn('Uncaught in the interpreter:', stderr) + self.assertIn('The above exception was the direct cause of the following exception:', + stderr) + + @unittest.expectedFailure def test_submit_script(self): msg = b'spam' r, w = self.pipe() @@ -221,39 +231,40 @@ def test_submit_func_globals(self): self.assertEqual(name, __name__) self.assertNotEqual(name, '__main__') - def test_submit_exception(self): - with self.subTest('script'): - fut = self.executor.submit('raise Exception("spam")') - with self.assertRaises(Exception) as captured: - fut.result() - self.assertIs(type(captured.exception), Exception) - self.assertEqual(str(captured.exception), 'spam') - cause = captured.exception.__cause__ - self.assertIs(type(cause), ExecutionFailed) - for attr in ('__name__', '__qualname__', '__module__'): - self.assertEqual(getattr(cause.excinfo.type, attr), - getattr(Exception, attr)) - self.assertEqual(cause.excinfo.msg, 'spam') - - with self.subTest('func'): - fut = self.executor.submit(fail, Exception, 'spam') - with self.assertRaises(Exception) as captured: - fut.result() - self.assertIs(type(captured.exception), Exception) - self.assertEqual(str(captured.exception), 'spam') - cause = captured.exception.__cause__ - self.assertIs(type(cause), ExecutionFailed) - for attr in ('__name__', '__qualname__', '__module__'): - self.assertEqual(getattr(cause.excinfo.type, attr), - getattr(Exception, attr)) - self.assertEqual(cause.excinfo.msg, 'spam') + @unittest.expectedFailure + def test_submit_exception_in_script(self): + fut = self.executor.submit('raise 
Exception("spam")') + with self.assertRaises(Exception) as captured: + fut.result() + self.assertIs(type(captured.exception), Exception) + self.assertEqual(str(captured.exception), 'spam') + cause = captured.exception.__cause__ + self.assertIs(type(cause), ExecutionFailed) + for attr in ('__name__', '__qualname__', '__module__'): + self.assertEqual(getattr(cause.excinfo.type, attr), + getattr(Exception, attr)) + self.assertEqual(cause.excinfo.msg, 'spam') + + def test_submit_exception_in_func(self): + fut = self.executor.submit(fail, Exception, 'spam') + with self.assertRaises(Exception) as captured: + fut.result() + self.assertIs(type(captured.exception), Exception) + self.assertEqual(str(captured.exception), 'spam') + cause = captured.exception.__cause__ + self.assertIs(type(cause), ExecutionFailed) + for attr in ('__name__', '__qualname__', '__module__'): + self.assertEqual(getattr(cause.excinfo.type, attr), + getattr(Exception, attr)) + self.assertEqual(cause.excinfo.msg, 'spam') def test_saturation(self): blocker = queues.create() executor = self.executor_type(4, shared=dict(blocker=blocker)) for i in range(15 * executor._max_workers): - executor.submit('blocker.get()') + executor.submit(exec, 'import __main__; __main__.blocker.get()') + #executor.submit('blocker.get()') self.assertEqual(len(executor._threads), executor._max_workers) for i in range(15 * executor._max_workers): blocker.put_nowait(None) From b148e09707b1c5931eaf0b1d2ec1d57021359f0b Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Wed, 16 Oct 2024 16:01:10 -0600 Subject: [PATCH 37/38] Fix a TODO comment. 
--- Lib/concurrent/futures/interpreter.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index 3c991b22d16dc5..fd7941adb766bb 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -52,7 +52,10 @@ def resolve_task(fn, args, kwargs): data = textwrap.dedent(fn) kind = 'script' # Make sure the script compiles. - # XXX Keep the compiled code object? + # Ideally we wouldn't throw away the resulting code + # object. However, there isn't much to be done until + # code objects are shareable and/or we do a better job + # of supporting code objects in _interpreters.exec(). compile(data, '', 'exec') else: # Functions defined in the __main__ module can't be pickled, From e365ae72bcfdcdfbadc619eaefa08c00d3a8a490 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Wed, 16 Oct 2024 16:05:02 -0600 Subject: [PATCH 38/38] Fix the docs. --- Doc/library/concurrent.futures.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 2e172ac30a373c..45a73705f10e92 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -263,7 +263,7 @@ interpreter, it is not automatically imported in any other. You would need to import the module separately in interpreter where you need it. In fact, each module imported in an interpreter is a completely separate object from the same module in a different -interpreter, including :module:`sys`, :module:`builtins`, +interpreter, including :mod:`sys`, :mod:`builtins`, and even ``__main__``. Isolation means a mutable object, or other data, cannot be used