| import contextlib |
| import logging |
| import queue |
| import time |
| import unittest |
| import sys |
| from concurrent.futures._base import BrokenExecutor |
| from concurrent.futures.process import _check_system_limits |
| |
| from logging.handlers import QueueHandler |
| |
| from test import support |
| |
| from .util import ExecutorMixin, create_executor_tests, setup_module |
| |
| |
| INITIALIZER_STATUS = 'uninitialized' |
| |
| def init(x): |
| global INITIALIZER_STATUS |
| INITIALIZER_STATUS = x |
| |
| def get_init_status(): |
| return INITIALIZER_STATUS |
| |
| def init_fail(log_queue=None): |
| if log_queue is not None: |
| logger = logging.getLogger('concurrent.futures') |
| logger.addHandler(QueueHandler(log_queue)) |
| logger.setLevel('CRITICAL') |
| logger.propagate = False |
| time.sleep(0.1) # let some futures be scheduled |
| raise ValueError('error in initializer') |
| |
| |
| class InitializerMixin(ExecutorMixin): |
| worker_count = 2 |
| |
| def setUp(self): |
| global INITIALIZER_STATUS |
| INITIALIZER_STATUS = 'uninitialized' |
| self.executor_kwargs = dict(initializer=init, |
| initargs=('initialized',)) |
| super().setUp() |
| |
| def test_initializer(self): |
| futures = [self.executor.submit(get_init_status) |
| for _ in range(self.worker_count)] |
| |
| for f in futures: |
| self.assertEqual(f.result(), 'initialized') |
| |
| |
| class FailingInitializerMixin(ExecutorMixin): |
| worker_count = 2 |
| |
| def setUp(self): |
| if hasattr(self, "ctx"): |
| # Pass a queue to redirect the child's logging output |
| self.mp_context = self.get_context() |
| self.log_queue = self.mp_context.Queue() |
| self.executor_kwargs = dict(initializer=init_fail, |
| initargs=(self.log_queue,)) |
| else: |
| # In a thread pool, the child shares our logging setup |
| # (see _assert_logged()) |
| self.mp_context = None |
| self.log_queue = None |
| self.executor_kwargs = dict(initializer=init_fail) |
| super().setUp() |
| |
| def test_initializer(self): |
| with self._assert_logged('ValueError: error in initializer'): |
| try: |
| future = self.executor.submit(get_init_status) |
| except BrokenExecutor: |
| # Perhaps the executor is already broken |
| pass |
| else: |
| with self.assertRaises(BrokenExecutor): |
| future.result() |
| |
| # At some point, the executor should break |
| for _ in support.sleeping_retry(support.SHORT_TIMEOUT, |
| "executor not broken"): |
| if self.executor._broken: |
| break |
| |
| # ... and from this point submit() is guaranteed to fail |
| with self.assertRaises(BrokenExecutor): |
| self.executor.submit(get_init_status) |
| |
| @contextlib.contextmanager |
| def _assert_logged(self, msg): |
| if self.log_queue is not None: |
| yield |
| output = [] |
| try: |
| while True: |
| output.append(self.log_queue.get_nowait().getMessage()) |
| except queue.Empty: |
| pass |
| else: |
| with self.assertLogs('concurrent.futures', 'CRITICAL') as cm: |
| yield |
| output = cm.output |
| self.assertTrue(any(msg in line for line in output), |
| output) |
| |
| |
| create_executor_tests(globals(), InitializerMixin) |
| create_executor_tests(globals(), FailingInitializerMixin) |
| |
| |
| @unittest.skipIf(sys.platform == "win32", "Resource Tracker doesn't run on Windows") |
| class FailingInitializerResourcesTest(unittest.TestCase): |
| """ |
| Source: https://github.com/python/cpython/issues/104090 |
| """ |
| |
| def _test(self, test_class): |
| try: |
| _check_system_limits() |
| except NotImplementedError: |
| self.skipTest("ProcessPoolExecutor unavailable on this system") |
| |
| runner = unittest.TextTestRunner() |
| runner.run(test_class('test_initializer')) |
| |
| # GH-104090: |
| # Stop resource tracker manually now, so we can verify there are not leaked resources by checking |
| # the process exit code |
| from multiprocessing.resource_tracker import _resource_tracker |
| _resource_tracker._stop() |
| |
| self.assertEqual(_resource_tracker._exitcode, 0) |
| |
| def test_spawn(self): |
| self._test(ProcessPoolSpawnFailingInitializerTest) |
| |
| def test_forkserver(self): |
| self._test(ProcessPoolForkserverFailingInitializerTest) |
| |
| |
| def setUpModule(): |
| setup_module() |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |