gh-124694: Add concurrent.futures.InterpreterPoolExecutor #124548

Merged
Changes from 29 commits
39 commits
5c69d38
Make ThreadPoolExecutor extensible.
ericsnowcurrently Sep 25, 2024
01789be
Add InterpreterPoolExecutor.
ericsnowcurrently Sep 25, 2024
6def4be
Clean up the interpreter if initialize() fails.
ericsnowcurrently Sep 27, 2024
84993a5
Add a missing import.
ericsnowcurrently Sep 27, 2024
c540cf0
Fix some typos.
ericsnowcurrently Sep 27, 2024
45d584d
Add more tests.
ericsnowcurrently Sep 27, 2024
c90c016
Add docs.
ericsnowcurrently Sep 27, 2024
1cb4657
Add a NEwS entry.
ericsnowcurrently Sep 27, 2024
4dc0989
Fix the last test.
ericsnowcurrently Sep 27, 2024
57b2db6
Add more tests.
ericsnowcurrently Sep 27, 2024
75e11d2
Simplify ExecutionFailed.
ericsnowcurrently Sep 30, 2024
69c2b8e
Fix the signature of resolve_task().
ericsnowcurrently Sep 30, 2024
f03c314
Capture any uncaught exception.
ericsnowcurrently Sep 30, 2024
4806d9f
Add TODO comments.
ericsnowcurrently Sep 30, 2024
efc0395
Docs fixes.
ericsnowcurrently Sep 30, 2024
a29aee3
Automatically apply textwrap.dedent() to scripts.
ericsnowcurrently Sep 30, 2024
8bab457
Fix the WASI build.
ericsnowcurrently Sep 30, 2024
cd29914
wasi
ericsnowcurrently Oct 1, 2024
0287f3b
Ignore race in test.
ericsnowcurrently Oct 1, 2024
80cd7b1
Add BrokenInterpreterPool.
ericsnowcurrently Oct 8, 2024
f8d4273
Tweak the docs.
ericsnowcurrently Oct 8, 2024
3a8bfce
Clarify the InterpreterPoolExecutor docs.
ericsnowcurrently Oct 8, 2024
af6c27a
Catch all exceptions.
ericsnowcurrently Oct 8, 2024
8c0a405
Factor out exception serialization helpers.
ericsnowcurrently Oct 8, 2024
1ae7ca2
Set the ExecutionFailed error as __cause__.
ericsnowcurrently Oct 8, 2024
d24e85d
Drop the exception serialization helpers.
ericsnowcurrently Oct 8, 2024
05a03ad
Always finalize if there is an error in initialize().
ericsnowcurrently Oct 8, 2024
f150931
Explicitly note the problem with functions defined in __main__.
ericsnowcurrently Oct 8, 2024
97d0292
Handle the case where interpreters.queues doesn't exist.
ericsnowcurrently Oct 8, 2024
baf0504
Merge branch 'main' into interpreter-pool-executor
ericsnowcurrently Oct 15, 2024
5c3a327
Add a What's New entry about InterpreterPoolExecutor.
ericsnowcurrently Oct 15, 2024
a2032a8
Fix a typo.
ericsnowcurrently Oct 15, 2024
54119b8
Fix the documented signature.
ericsnowcurrently Oct 15, 2024
744dca7
Test and document asyncio support.
ericsnowcurrently Oct 15, 2024
f61d62d
Apply suggestions from code review
ericsnowcurrently Oct 16, 2024
ee65bb2
Expand the docs.
ericsnowcurrently Oct 16, 2024
a7f5c50
For now, drop support for scripts.
ericsnowcurrently Oct 16, 2024
b148e09
Fix a TODO comment.
ericsnowcurrently Oct 16, 2024
e365ae7
Fix the docs.
ericsnowcurrently Oct 16, 2024
83 changes: 79 additions & 4 deletions Doc/library/concurrent.futures.rst
@@ -15,9 +15,10 @@ The :mod:`concurrent.futures` module provides a high-level interface for
asynchronously executing callables.

The asynchronous execution can be performed with threads, using
:class:`ThreadPoolExecutor`, or separate processes, using
:class:`ProcessPoolExecutor`. Both implement the same interface, which is
defined by the abstract :class:`Executor` class.
:class:`ThreadPoolExecutor` or :class:`InterpreterPoolExecutor`,
or separate processes, using :class:`ProcessPoolExecutor`.
Each implements the same interface, which is defined
by the abstract :class:`Executor` class.

.. include:: ../includes/wasm-notavail.rst

@@ -63,7 +64,8 @@ Executor Objects
setting *chunksize* to a positive integer. For very long iterables,
using a large value for *chunksize* can significantly improve
performance compared to the default size of 1. With
:class:`ThreadPoolExecutor`, *chunksize* has no effect.
:class:`ThreadPoolExecutor` and :class:`InterpreterPoolExecutor`,
*chunksize* has no effect.

.. versionchanged:: 3.5
Added the *chunksize* argument.
@@ -227,6 +229,59 @@ ThreadPoolExecutor Example
print('%r page is %d bytes' % (url, len(data)))


InterpreterPoolExecutor
-----------------------

The :class:`InterpreterPoolExecutor` class is a :class:`ThreadPoolExecutor`
subclass that uses a pool of isolated interpreters to execute calls
asynchronously. Each interpreter is isolated from the others and thus
can side-step the :term:`Global Interpreter Lock <global interpreter lock>`,
allowing the use of multiple cores. Interpreters mostly can't share
objects between them, which means that, in most cases, only picklable
objects can be executed and returned.
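
A minimal usage sketch, assuming a Python build that ships `InterpreterPoolExecutor`. The fallback to `ThreadPoolExecutor` is an assumption added here so the snippet also runs on interpreters without the new class. Only stdlib callables are submitted, since tasks have to survive pickling.

```python
import concurrent.futures
import math

# Assumption: older Pythons (or builds without subinterpreter support) do not
# provide InterpreterPoolExecutor, so fall back to ThreadPoolExecutor.
PoolExecutor = (
    getattr(concurrent.futures, "InterpreterPoolExecutor", None)
    or concurrent.futures.ThreadPoolExecutor
)


def run_demo():
    with PoolExecutor(max_workers=2) as executor:
        # submit() returns a Future; map() yields results in input order.
        future = executor.submit(pow, 2, 10)
        roots = list(executor.map(math.isqrt, [1, 4, 9, 16]))
        return future.result(), roots


power, roots = run_demo()
print(power, roots)
```

The calling code is the same for either executor class, which is the point of sharing the `Executor` interface.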

.. class:: InterpreterPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), shared=None)

A :class:`ThreadPoolExecutor` subclass that executes calls asynchronously
using a pool of at most *max_workers* threads. Each thread runs
tasks in its own interpreter.

*initializer* may be a callable and *initargs* a tuple of arguments,
just like with :class:`ThreadPoolExecutor`. However, they are pickled
like with :class:`ProcessPoolExecutor`. Likewise, functions (and
arguments) passed to :meth:`~Executor.submit` are pickled.
Review comment (Contributor), suggested change:

Original:
*initializer* may be a callable and *initargs* a tuple of arguments,
just like with :class:`ThreadPoolExecutor`. However, they are pickled
like with :class:`ProcessPoolExecutor`. Likewise, functions (and
arguments) passed to :meth:`~Executor.submit` are pickled.
Suggested:
An *initializer* may be a callable and *initargs* a tuple of arguments,
similar to the behavior of :class:`ThreadPoolExecutor`. Additionally, they are pickled
like with :class:`ProcessPoolExecutor`. Likewise, functions (and
arguments) passed to :meth:`~Executor.submit` are pickled.

Reply (Member Author):

This motivated me to add a bit more clarity in the docs. 😄


.. note::
functions defined in the ``__main__`` module cannot be pickled
and thus cannot be used.
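
The pickling constraint can be illustrated with the standard `pickle` module alone. This is a sketch of the general mechanism, not the executor's code: functions are pickled by reference (module plus name), so the receiving side must be able to import that module and find the name again.

```python
import math
import pickle

# A task is serialized as one (fn, args, kwargs) tuple, as resolve_task() does.
payload = pickle.dumps((math.sqrt, (16.0,), {}))

# Functions are pickled by reference: only the module and the name are
# stored, never the code itself.
stored_by_name = b"math" in payload and b"sqrt" in payload

# The receiving side re-imports the module and looks the name up again.
fn, args, kwargs = pickle.loads(payload)
result = fn(*args, **kwargs)

# A function with no importable name (here a lambda) cannot be pickled at all.
try:
    pickle.dumps(lambda x: x + 1)
    lambda_pickled = True
except (pickle.PicklingError, AttributeError):
    lambda_pickled = False

print(stored_by_name, result, lambda_pickled)
```

A function defined in `__main__` is stored as a reference into `__main__`, and each worker interpreter has its own, different `__main__` module, so the lookup cannot succeed there.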

*shared* is an optional dict of objects shared by all interpreters
in the pool. The items are added to each interpreter's ``__main__``
module. Not all objects are shareable. Those that are include
the builtin singletons, :class:`str` and :class:`bytes`,
and :class:`memoryview`. See :pep:`734` for more info.
Review comment (Contributor), suggested change:

Original:
*shared* is an optional dict of objects shared by all interpreters
in the pool. The items are added to each interpreter's ``__main__``
module. Not all objects are shareable. Those that are include
the builtin singletons, :class:`str` and :class:`bytes`,
and :class:`memoryview`. See :pep:`734` for more info.
Suggested:
*shared* is an optional dict of objects shared by all isolated interpreters
in the pool. The *shared* items are added to each interpreter's ``__main__``
module. Not all objects are shareable. Shareable objects include
the builtin singletons, :class:`str` and :class:`bytes`,
and :class:`memoryview`. See :pep:`734` for more info.

Reply (Member Author):

I made use of your suggestions. Thanks!


You can also pass a script (:class:`str`) for *initiazer* or to
Review comment, suggested change:

Original:
You can also pass a script (:class:`str`) for *initiazer* or to
Suggested:
You can also pass a script (:class:`str`) for *initializer* or to

I noticed one small thing. I'm looking forward to using subinterpreters!

Reply (Member Author):

fixed

:meth:`~Executor.submit` (but not to :meth:`~Executor.map`).
In both cases, the script will be executed in the interpreter's
``__main__`` module. The executor will automatically apply
:func:`textwrap.dedent` to the script, so you don't have to do so.
With a script, arguments must not be passed in.
For :meth:`!Executor.submit`, the return value for a script
is always ``None``.
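
The script handling described above can be approximated in a single interpreter. In this sketch the plain dict `namespace` is a stand-in (an assumption for illustration) for a worker interpreter's ``__main__`` module; the dedent-then-compile step mirrors what ``resolve_task()`` does in this revision of the diff.

```python
import textwrap

script = """
    total = sum(range(5))
    label = f"total={total}"
"""

# Dedent first, then compile once so that syntax errors surface before the
# script is handed to a worker.
source = textwrap.dedent(script)
code = compile(source, "<string>", "exec")

# Stand-in for a worker interpreter's __main__ namespace (illustrative only;
# the executor runs the script in a separate interpreter).
namespace = {"__name__": "__main__"}
outcome = exec(code, namespace)

print(outcome, namespace["label"])
```

Running a script produces side effects in the namespace but no value, which is why a script submitted this way always yields ``None``.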

Normally an uncaught exception from *initializer* or from a submitted
task will be sent back between interpreters to be raised as is.
However, in some situations the exception might not be suitable to be
sent back. In that case, the executor raises an
:class:`~concurrent.futures.interpreter.ExecutionFailed` exception
instead, which contains a summary of the original exception.
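
The exception hand-off can be sketched with `pickle` and exception chaining. `ExecutionFailedStandIn`, `worker_side`, and `parent_side` are hypothetical names used only for this illustration; the real class lives in ``concurrent.futures.interpreter``.

```python
import pickle


class ExecutionFailedStandIn(Exception):
    """Hypothetical stand-in for concurrent.futures.interpreter.ExecutionFailed."""


def worker_side():
    # The worker pickles the uncaught exception so it can cross interpreters.
    try:
        raise KeyError("missing-config")
    except BaseException as exc:
        return pickle.dumps(exc)


def parent_side(excdata, summary):
    # The parent rebuilds the original exception and chains the summary
    # wrapper as its __cause__, as run() does with `raise exc from exc_wrapper`.
    exc = pickle.loads(excdata)
    raise exc from ExecutionFailedStandIn(summary)


try:
    parent_side(worker_side(), "KeyError: 'missing-config'")
except KeyError as caught:
    original = caught

print(type(original).__name__, repr(original.__cause__))
```

Callers can therefore catch the original exception type, while the wrapper with the summary from the worker stays reachable through `__cause__`.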

The other caveats that apply to :class:`ThreadPoolExecutor` apply here.

.. versionadded:: next


ProcessPoolExecutor
-------------------

@@ -574,6 +629,26 @@ Exception classes

.. versionadded:: 3.7

.. currentmodule:: concurrent.futures.interpreter

.. exception:: BrokenInterpreterPool

Derived from :exc:`~concurrent.futures.thread.BrokenThreadPool`,
this exception class is raised when one of the workers
of a :class:`~concurrent.futures.InterpreterPoolExecutor`
has failed initializing.

.. versionadded:: next
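
Because `BrokenInterpreterPool` derives from `BrokenThreadPool`, the failure mode can be demonstrated with a plain `ThreadPoolExecutor`, which is used here only because it is available on every supported Python. The names `failing_initializer` and `submit_to_broken_pool` are illustrative.

```python
import logging
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures.thread import BrokenThreadPool

# Silence the traceback that the pool logs when an initializer fails.
logging.getLogger("concurrent.futures").disabled = True


def failing_initializer():
    raise RuntimeError("worker setup failed")


def submit_to_broken_pool():
    with ThreadPoolExecutor(max_workers=1,
                            initializer=failing_initializer) as executor:
        future = executor.submit(pow, 2, 3)
        try:
            future.result(timeout=10)
        except BrokenThreadPool as exc:
            # The pending job fails because the worker never initialized.
            return type(exc).__name__
    return None


outcome = submit_to_broken_pool()
print(outcome)
```

An `except BrokenThreadPool` clause like this one would also catch `BrokenInterpreterPool`, given the subclass relationship stated above.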

.. exception:: ExecutionFailed

Raised from :class:`~concurrent.futures.InterpreterPoolExecutor` when
the given initializer fails or from
:meth:`~concurrent.futures.Executor.submit` when there's an uncaught
exception from the submitted task.

.. versionadded:: next

.. currentmodule:: concurrent.futures.process

.. exception:: BrokenProcessPool
12 changes: 11 additions & 1 deletion Lib/concurrent/futures/__init__.py
@@ -29,6 +29,7 @@
    'Executor',
    'wait',
    'as_completed',
    'InterpreterPoolExecutor',
    'ProcessPoolExecutor',
    'ThreadPoolExecutor',
)
@@ -39,7 +40,7 @@ def __dir__():


def __getattr__(name):
    global ProcessPoolExecutor, ThreadPoolExecutor
    global ProcessPoolExecutor, ThreadPoolExecutor, InterpreterPoolExecutor

    if name == 'ProcessPoolExecutor':
        from .process import ProcessPoolExecutor as pe
@@ -51,4 +52,13 @@ def __getattr__(name):
        ThreadPoolExecutor = te
        return te

    if name == 'InterpreterPoolExecutor':
        try:
            from .interpreter import InterpreterPoolExecutor as ie
        except ModuleNotFoundError:
            ie = InterpreterPoolExecutor = None
        else:
            InterpreterPoolExecutor = ie
        return ie

    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
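
The lazy-import pattern in this hunk relies on module-level `__getattr__` (PEP 562). The sketch below reproduces it on a throwaway module object; `lazy_demo`, `OptionalBackend`, and `_no_such_backend_for_demo` are hypothetical names chosen for the illustration.

```python
import types

lazy = types.ModuleType("lazy_demo")  # hypothetical module for illustration
import_log = []


def _lazy_getattr(name):
    if name == "Fraction":
        from fractions import Fraction
        import_log.append(name)
        lazy.Fraction = Fraction  # cache, so __getattr__ is not called again
        return Fraction
    if name == "OptionalBackend":
        try:
            import _no_such_backend_for_demo as backend  # deliberately missing
        except ModuleNotFoundError:
            backend = None  # same "resolve to None" choice as in the diff
        lazy.OptionalBackend = backend
        return backend
    raise AttributeError(f"module {lazy.__name__!r} has no attribute {name!r}")


# PEP 562: a module-level __getattr__ is consulted only after normal
# attribute lookup on the module fails.
lazy.__getattr__ = _lazy_getattr

half = lazy.Fraction(1, 2)
again = lazy.Fraction(3, 4)
total = half + again
print(total, import_log, lazy.OptionalBackend)
```

The trade-off of resolving a missing optional dependency to `None` is that callers must check for `None` instead of catching an import error.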
233 changes: 233 additions & 0 deletions Lib/concurrent/futures/interpreter.py
@@ -0,0 +1,233 @@
"""Implements InterpreterPoolExecutor."""

import contextlib
import pickle
import textwrap
from . import thread as _thread
import _interpreters
import _interpqueues


class ExecutionFailed(_interpreters.InterpreterError):
    """An unhandled exception happened during execution."""

    def __init__(self, excinfo):
        msg = excinfo.formatted
        if not msg:
            if excinfo.type and excinfo.msg:
                msg = f'{excinfo.type.__name__}: {excinfo.msg}'
            else:
                msg = excinfo.type.__name__ or excinfo.msg
        super().__init__(msg)
        self.excinfo = excinfo

    def __str__(self):
        try:
            formatted = self.excinfo.errdisplay
        except Exception:
            return super().__str__()
        else:
            return textwrap.dedent(f"""
{super().__str__()}

Uncaught in the interpreter:

{formatted}
                """.strip())


UNBOUND = 2  # error; this should not happen.


class WorkerContext(_thread.WorkerContext):

    @classmethod
    def prepare(cls, initializer, initargs, shared):
        def resolve_task(fn, args, kwargs):
            if isinstance(fn, str):
                if args or kwargs:
                    raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}')
                data = textwrap.dedent(fn)
                kind = 'script'
                # Make sure the script compiles.
                # XXX Keep the compiled code object?
Review comment (Contributor):

Is there a clear case of why you would wish to keep the compiled code object?

Reply (Member Author):

It's mostly a matter of efficiency, avoiding a duplication of effort at runtime. However, we're not in a position yet to share code objects anyway, so the comment isn't particularly helpful. I'll fix that.

Reply (Member Author):

done

                compile(data, '<string>', 'exec')
            else:
                # Functions defined in the __main__ module can't be pickled,
                # so they can't be used here (for now). We could possibly
                # borrow from multiprocessing to work around this.
                data = pickle.dumps((fn, args, kwargs))
                kind = 'function'
            return (data, kind)

        if isinstance(initializer, str):
            if initargs:
                raise ValueError(f'an initializer script does not take args, got {initargs!r}')
        if initializer is not None:
            initdata = resolve_task(initializer, initargs, {})
        else:
            initdata = None
        def create_context():
            return cls(initdata, shared)
        return create_context, resolve_task

    @classmethod
    @contextlib.contextmanager
    def _capture_exc(cls, resultsid):
        try:
            yield
        except BaseException as exc:
            # Send the captured exception out on the results queue,
            # but still leave it unhandled for the interpreter to handle.
            err = pickle.dumps(exc)
            _interpqueues.put(resultsid, (None, err), 1, UNBOUND)
            raise  # re-raise

    @classmethod
    def _send_script_result(cls, resultsid):
        _interpqueues.put(resultsid, (None, None), 0, UNBOUND)

    @classmethod
    def _call(cls, func, args, kwargs, resultsid):
        with cls._capture_exc(resultsid):
            res = func(*args or (), **kwargs or {})
        # Send the result back.
        try:
            _interpqueues.put(resultsid, (res, None), 0, UNBOUND)
        except _interpreters.NotShareableError:
            res = pickle.dumps(res)
            _interpqueues.put(resultsid, (res, None), 1, UNBOUND)

    @classmethod
    def _call_pickled(cls, pickled, resultsid):
        fn, args, kwargs = pickle.loads(pickled)
        cls._call(fn, args, kwargs, resultsid)

    def __init__(self, initdata, shared=None):
        self.initdata = initdata
        self.shared = dict(shared) if shared else None
        self.interpid = None
        self.resultsid = None

    def __del__(self):
        if self.interpid is not None:
            self.finalize()

    def _exec(self, script):
        assert self.interpid is not None
        excinfo = _interpreters.exec(self.interpid, script, restrict=True)
        if excinfo is not None:
            raise ExecutionFailed(excinfo)

    def initialize(self):
        assert self.interpid is None, self.interpid
        self.interpid = _interpreters.create(reqrefs=True)
        try:
            _interpreters.incref(self.interpid)

            maxsize = 0
            fmt = 0
            self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND)

            self._exec(f'from {__name__} import WorkerContext')

            if self.shared:
                _interpreters.set___main___attrs(
                    self.interpid, self.shared, restrict=True)

            if self.initdata:
                self.run(self.initdata)
        except BaseException:
            self.finalize()
            raise  # re-raise

    def finalize(self):
        interpid = self.interpid
        resultsid = self.resultsid
        self.resultsid = None
        self.interpid = None
        if resultsid is not None:
            try:
                _interpqueues.destroy(resultsid)
            except _interpqueues.QueueNotFoundError:
                pass
        if interpid is not None:
            try:
                _interpreters.decref(interpid)
            except _interpreters.InterpreterNotFoundError:
                pass

    def run(self, task):
        data, kind = task
        if kind == 'script':
            script = f"""
with WorkerContext._capture_exc({self.resultsid}):
{textwrap.indent(data, ' ')}
WorkerContext._send_script_result({self.resultsid})"""
        elif kind == 'function':
            script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
        else:
            raise NotImplementedError(kind)

        try:
            self._exec(script)
        except ExecutionFailed as exc:
            exc_wrapper = exc
        else:
            exc_wrapper = None

        # Return the result, or raise the exception.
        while True:
            try:
                obj = _interpqueues.get(self.resultsid)
            except _interpqueues.QueueNotFoundError:
                raise  # re-raise
            except _interpqueues.QueueError:
                continue
            except ModuleNotFoundError:
                # interpreters.queues doesn't exist, which means
                # QueueEmpty doesn't. Act as though it does.
                continue
            else:
                break
        (res, excdata), pickled, unboundop = obj
        assert unboundop is None, unboundop
        if excdata is not None:
            assert res is None, res
            assert pickled
            assert exc_wrapper is not None
            exc = pickle.loads(excdata)
            raise exc from exc_wrapper
        return pickle.loads(res) if pickled else res


class BrokenInterpreterPool(_thread.BrokenThreadPool):
    """
    Raised when a worker thread in an InterpreterPoolExecutor failed initializing.
    """


class InterpreterPoolExecutor(_thread.ThreadPoolExecutor):

    BROKEN = BrokenInterpreterPool

    @classmethod
    def prepare_context(cls, initializer, initargs, shared):
        return WorkerContext.prepare(initializer, initargs, shared)

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=(), shared=None):
        """Initializes a new InterpreterPoolExecutor instance.

        Args:
            max_workers: The maximum number of interpreters that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
            initializer: A callable or script used to initialize
                each worker interpreter.
            initargs: A tuple of arguments to pass to the initializer.
            shared: A mapping of shareable objects to be inserted into
                each worker interpreter.
        """
        super().__init__(max_workers, thread_name_prefix,
                         initializer, initargs, shared=shared)
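
The result protocol used by `_capture_exc()` and `_call()` can be sketched without subinterpreters. Here `queue.Queue` is a stand-in (an assumption for illustration) for the cross-interpreter results queue, and `capture_exc` and `call` are simplified analogues rather than the methods themselves.

```python
import contextlib
import pickle
import queue

results = queue.Queue()  # stand-in for the cross-interpreter results queue


@contextlib.contextmanager
def capture_exc(results):
    try:
        yield
    except BaseException as exc:
        # Report the failure as (result, pickled_exception), then re-raise
        # so the surrounding machinery still sees the error.
        results.put((None, pickle.dumps(exc)))
        raise


def call(func, args, kwargs, results):
    with capture_exc(results):
        res = func(*args, **kwargs)
    # Only reached on success: report (result, no_exception).
    results.put((res, None))


call(divmod, (7, 2), {}, results)
ok = results.get()

try:
    call(divmod, (7, 0), {}, results)
except ZeroDivisionError:
    pass
res, excdata = results.get()
failure = pickle.loads(excdata)

print(ok, type(failure).__name__)
```

Exactly one item is queued per task in either case, so the reader of the queue can tell success from failure by which half of the pair is set.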