diff --git a/Lib/test/test_concurrent_futures/test_interpreter_pool.py b/Lib/test/test_concurrent_futures/test_interpreter_pool.py index 1b0315ad18584fc..34cbab1f727b513 100644 --- a/Lib/test/test_concurrent_futures/test_interpreter_pool.py +++ b/Lib/test/test_concurrent_futures/test_interpreter_pool.py @@ -1,3 +1,6 @@ +import contextlib +import os +import sys import unittest from concurrent.futures.interpreter import ExecutionFailed from test import support @@ -7,11 +10,119 @@ from .util import BaseTestCase, InterpreterPoolMixin, setup_module +def write_msg(fd, msg): + os.write(fd, msg + b'\0') + + +def read_msg(fd): + msg = b'' + while ch := os.read(fd, 1): + if ch == b'\0': + return msg + msg += ch + + +def get_current_name(): + return __name__ + + class InterpreterPoolExecutorTest(InterpreterPoolMixin, ExecutorTest, BaseTestCase): + def pipe(self): + r, w = os.pipe() + self.addCleanup(lambda: os.close(r)) + self.addCleanup(lambda: os.close(w)) + return r, w + def assertTaskRaises(self, exctype): return self.assertRaisesRegex(ExecutionFailed, exctype.__name__) + def test_init_func(self): + msg = b'step: init' + r, w = self.pipe() + os.write(w, b'\0') + + executor = self.executor_type( + initializer=write_msg, initargs=(w, msg)) + before = os.read(r, 100) + executor.submit(mul, 10, 10) + after = read_msg(r) + + self.assertEqual(before, b'\0') + self.assertEqual(after, msg) + + def test_init_script(self): + msg1 = b'step: init' + msg2 = b'step: run' + r, w = self.pipe() + initscript = f"""if True: + import os + msg = {msg2!r} + os.write({w}, {msg1!r} + b'\\0') + """ + script = f"""if True: + os.write({w}, msg + b'\\0') + """ + os.write(w, b'\0') + + executor = self.executor_type(initializer=initscript) + before_init = os.read(r, 100) + fut = executor.submit(script) + after_init = read_msg(r) + write_msg(w, b'') + before_run = read_msg(r) + fut.result() + after_run = read_msg(r) + + self.assertEqual(before_init, b'\0') + self.assertEqual(after_init, msg1) + self.assertEqual(before_run, b'') + self.assertEqual(after_run, msg2) + + def test_init_script_args(self): + with self.assertRaises(ValueError): + self.executor_type(initializer='pass', initargs=('spam',)) + + def test_init_shared(self): + msg = b'eggs' + r, w = self.pipe() + script = f"""if True: + import os + os.write({w}, spam + b'\\0') + """ + + executor = self.executor_type(shared={'spam': msg}) + fut = executor.submit(script) + fut.result() + after = read_msg(r) + + self.assertEqual(after, msg) + + def test_submit_script(self): + msg = b'spam' + r, w = self.pipe() + script = f"""if True: + import os + os.write({w}, __name__.encode('utf-8') + b'\\0') + """ + executor = self.executor_type() + + fut = executor.submit(script) + res = fut.result() + after = read_msg(r) + + self.assertEqual(after, b'__main__') + self.assertIs(res, None) + + def test_submit_func_globals(self): + raise NotImplementedError + executor = self.executor_type() + fut = executor.submit(get_current_name) + name = fut.result() + + self.assertEqual(name, '__main__') + self.assertNotEqual(name, __name__) + def test_saturation(self): blocker = queues.create() executor = self.executor_type(4, shared=dict(blocker=blocker)) @@ -32,35 +143,6 @@ def test_idle_thread_reuse(self): self.assertEqual(len(executor._threads), 1) executor.shutdown(wait=True) -# def test_executor_map_current_future_cancel(self): -# blocker = queues.create() -# log = queues.create() -# -# script = """if True: -# def log_n_wait({ident}): -# blocker(f"ident {ident} started") -# try: -# stop_event.wait() -# finally: -# log.append(f"ident {ident} stopped") -# """ -# -# with self.executor_type(max_workers=1) as pool: -# # submit work to saturate the pool -# fut = pool.submit(script.format(ident="first")) -# gen = pool.map(log_n_wait, ["second", "third"], timeout=0) -# try: -# with self.assertRaises(TimeoutError): -# next(gen) -# finally: -# gen.close() -# blocker.put -# stop_event.set() -# fut.result() -# # ident='second' is cancelled as a result of raising a TimeoutError -# # ident='third' is cancelled because it remained in the collection of futures -# self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"]) - def setUpModule(): setup_module()