Skip to content

Commit

Permalink
Add more tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
ericsnowcurrently committed Sep 27, 2024
1 parent eeca385 commit 28f948b
Showing 1 changed file with 111 additions and 29 deletions.
140 changes: 111 additions & 29 deletions Lib/test/test_concurrent_futures/test_interpreter_pool.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import contextlib
import os
import sys
import unittest
from concurrent.futures.interpreter import ExecutionFailed
from test import support
Expand All @@ -7,11 +10,119 @@
from .util import BaseTestCase, InterpreterPoolMixin, setup_module


def write_msg(fd, msg):
os.write(fd, msg + b'\0')


def read_msg(fd):
msg = b''
while ch := os.read(fd, 1):
if ch == b'\0':
return msg
msg += ch


def get_current_name():
return __name__


class InterpreterPoolExecutorTest(InterpreterPoolMixin, ExecutorTest, BaseTestCase):

def pipe(self):
r, w = os.pipe()
self.addCleanup(lambda: os.close(r))
self.addCleanup(lambda: os.close(w))
return r, w

def assertTaskRaises(self, exctype):
return self.assertRaisesRegex(ExecutionFailed, exctype.__name__)

def test_init_func(self):
msg = b'step: init'
r, w = self.pipe()
os.write(w, b'\0')

executor = self.executor_type(
initializer=write_msg, initargs=(w, msg))
before = os.read(r, 100)
executor.submit(mul, 10, 10)
after = read_msg(r)

self.assertEqual(before, b'\0')
self.assertEqual(after, msg)

def test_init_script(self):
msg1 = b'step: init'
msg2 = b'step: run'
r, w = self.pipe()
initscript = f"""if True:
import os
msg = {msg2!r}
os.write({w}, {msg1!r} + b'\\0')
"""
script = f"""if True:
os.write({w}, msg + b'\\0')
"""
os.write(w, b'\0')

executor = self.executor_type(initializer=initscript)
before_init = os.read(r, 100)
fut = executor.submit(script)
after_init = read_msg(r)
write_msg(w, b'')
before_run = read_msg(r)
fut.result()
after_run = read_msg(r)

self.assertEqual(before_init, b'\0')
self.assertEqual(after_init, msg1)
self.assertEqual(before_run, b'')
self.assertEqual(after_run, msg2)

def test_init_script_args(self):
with self.assertRaises(ValueError):
self.executor_type(initializer='pass', initargs=('spam',))

def test_init_shared(self):
msg = b'eggs'
r, w = self.pipe()
script = f"""if True:
import os
os.write({w}, spam + b'\\0')
"""

executor = self.executor_type(shared={'spam': msg})
fut = executor.submit(script)
fut.result()
after = read_msg(r)

self.assertEqual(after, msg)

def test_submit_script(self):
msg = b'spam'
r, w = self.pipe()
script = f"""if True:
import os
os.write({w}, __name__.encode('utf-8') + b'\\0')
"""
executor = self.executor_type()

fut = executor.submit(script)
res = fut.result()
after = read_msg(r)

self.assertEqual(after, b'__main__')
self.assertIs(res, None)

def test_submit_func_globals(self):
raise NotImplementedError
executor = self.executor_type()
fut = executor.submit(get_current_name)
name = fut.result()

self.assertEqual(name, '__main__')
self.assertNotEqual(name, __name__)

def test_saturation(self):
blocker = queues.create()
executor = self.executor_type(4, shared=dict(blocker=blocker))
Expand All @@ -32,35 +143,6 @@ def test_idle_thread_reuse(self):
self.assertEqual(len(executor._threads), 1)
executor.shutdown(wait=True)

# def test_executor_map_current_future_cancel(self):
# blocker = queues.create()
# log = queues.create()
#
# script = """if True:
# def log_n_wait({ident}):
# blocker(f"ident {ident} started")
# try:
# stop_event.wait()
# finally:
# log.append(f"ident {ident} stopped")
# """
#
# with self.executor_type(max_workers=1) as pool:
# # submit work to saturate the pool
# fut = pool.submit(script.format(ident="first"))
# gen = pool.map(log_n_wait, ["second", "third"], timeout=0)
# try:
# with self.assertRaises(TimeoutError):
# next(gen)
# finally:
# gen.close()
# blocker.put
# stop_event.set()
# fut.result()
# # ident='second' is cancelled as a result of raising a TimeoutError
# # ident='third' is cancelled because it remained in the collection of futures
# self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])


def setUpModule():
setup_module()
Expand Down

0 comments on commit 28f948b

Please sign in to comment.