| import signal |
| import sys |
| import threading |
| import time |
| import unittest |
| from concurrent import futures |
| |
| from test import support |
| from test.support.script_helper import assert_python_ok |
| |
| from .util import ( |
| BaseTestCase, ThreadPoolMixin, ProcessPoolForkMixin, |
| ProcessPoolForkserverMixin, ProcessPoolSpawnMixin, |
| create_executor_tests, setup_module) |
| |
| |
def sleep_and_print(t, msg):
    """Sleep for *t* seconds, then print *msg* and flush stdout.

    Imported by the child-interpreter scripts in this module, which rely
    on the flush so the parent test can read the output.
    """
    time.sleep(t)
    print(msg, flush=True)
| |
| |
class ExecutorShutdownTest:
    """Shutdown tests shared by the thread- and process-pool executors.

    This is a mixin: the concrete test class is expected to provide
    ``self.executor``, ``self.executor_type`` and ``self.worker_count``,
    and, for process pools, ``self.ctx`` / ``self.get_context()``.
    """

    def test_run_after_shutdown(self):
        # Submitting to an executor that was already shut down must fail.
        self.executor.shutdown()
        self.assertRaises(RuntimeError,
                          self.executor.submit,
                          pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures.test_shutdown import sleep_and_print
            if __name__ == "__main__":
                context = '{context}'
                if context == "":
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_submit_after_interpreter_shutdown(self):
        # A submit() issued from an atexit hook registered before
        # concurrent.futures is imported must raise RuntimeError.
        rc, out, err = assert_python_ok('-c', """if 1:
            import atexit
            @atexit.register
            def run_last():
                try:
                    t.submit(id, None)
                except RuntimeError:
                    print("runtime-error")
                    raise
            from concurrent.futures import {executor_type}
            if __name__ == "__main__":
                context = '{context}'
                if not context:
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                    t.submit(id, 42).result()
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
        self.assertEqual(out.strip(), b"runtime-error")

    def test_hang_issue12364(self):
        # shutdown() with many queued futures must not hang, and every
        # future must still produce a result afterwards.
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()

    def test_cancel_futures(self):
        # Precondition check done with a unittest assertion so that it is
        # not stripped when running under ``python -O``.
        self.assertLessEqual(self.worker_count, 5, "test needs few workers")
        fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
        self.executor.shutdown(cancel_futures=True)
        # We can't guarantee the exact number of cancellations, but we can
        # guarantee that *some* were cancelled. With few workers, many of
        # the submitted futures should have been cancelled.
        cancelled = [fut for fut in fs if fut.cancelled()]
        self.assertGreater(len(cancelled), 20)

        # Ensure the other futures were able to finish.
        # Use "not fut.cancelled()" instead of "fut.done()" to include futures
        # that may have been left in a pending state.
        others = [fut for fut in fs if not fut.cancelled()]
        for fut in others:
            self.assertTrue(fut.done(), msg=f"{fut._state=}")
            self.assertIsNone(fut.exception())

        # Similar to the number of cancelled futures, we can't guarantee the
        # exact number that completed. But, we can guarantee that at least
        # one finished.
        self.assertGreater(len(others), 0)

    def test_hang_gh83386(self):
        """shutdown(wait=False) doesn't hang at exit with running futures.

        See https://github.com/python/cpython/issues/83386.
        """
        if self.executor_type == futures.ProcessPoolExecutor:
            raise unittest.SkipTest(
                "Hangs, see https://github.com/python/cpython/issues/83386")

        # The child script imports multiprocessing before using it, so the
        # start-method branch cannot fail with NameError when ctx is set.
        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import {executor_type}
            from test.test_concurrent_futures.test_shutdown import sleep_and_print
            if __name__ == "__main__":
                if {context!r}:
                    import multiprocessing
                    multiprocessing.set_start_method({context!r})
                t = {executor_type}(max_workers=3)
                t.submit(sleep_and_print, 1.0, "apple")
                t.shutdown(wait=False)
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, 'ctx', None)))
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_hang_gh94440(self):
        """shutdown(wait=True) doesn't hang when a future was submitted and
        quickly canceled right before shutdown.

        See https://github.com/python/cpython/issues/94440.
        """
        if not hasattr(signal, 'alarm'):
            raise unittest.SkipTest(
                "Tested platform does not support the alarm signal")

        def timeout(_signum, _frame):
            raise RuntimeError("timed out waiting for shutdown")

        kwargs = {}
        if getattr(self, 'ctx', None):
            kwargs['mp_context'] = self.get_context()
        executor = self.executor_type(max_workers=1, **kwargs)
        # Run one task to completion before the submit/cancel/shutdown
        # sequence under test.
        executor.submit(int).result()
        old_handler = signal.signal(signal.SIGALRM, timeout)
        try:
            # Turn a hang into a RuntimeError after 5 seconds.
            signal.alarm(5)
            executor.submit(int).cancel()
            executor.shutdown(wait=True)
        finally:
            # Always disarm the alarm and restore the previous handler.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old_handler)
| |
| |
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
    """Shutdown tests specific to ThreadPoolExecutor worker threads."""

    def test_threads_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        # Three tasks block on the semaphore, so three threads get started.
        sem = threading.Semaphore(0)
        for _ in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._threads), 3)
        for _ in range(3):
            sem.release()
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        # Leaving the with-block shuts the executor down; the worker
        # threads must then be joinable.
        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown.  Comparing complete lists also catches a
        # result iterator that yields too few items, and is not stripped
        # under ``python -O`` the way a bare assert would be.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the threads when calling
        # shutdown with wait=False
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        executor.shutdown(wait=False)
        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
            t.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
            t.join()

    def test_cancel_futures_wait_false(self):
        # Can only be reliably tested for TPE, since PPE often hangs with
        # `wait=False` (even without *cancel_futures*).
        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import ThreadPoolExecutor
            from test.test_concurrent_futures.test_shutdown import sleep_and_print
            if __name__ == "__main__":
                t = ThreadPoolExecutor()
                t.submit(sleep_and_print, .1, "apple")
                t.shutdown(wait=False, cancel_futures=True)
            """)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        # gh-116682: stdout may be empty if shutdown happens before task
        # starts executing.
        self.assertIn(out.strip(), [b"apple", b""])
| |
| |
class ProcessPoolShutdownTest(ExecutorShutdownTest):
    """Shutdown tests specific to ProcessPoolExecutor worker processes.

    Relies on ``self.get_context()``, ``self.executor`` and
    ``self.worker_count`` being supplied by whatever this class is
    combined with.
    """

    def test_processes_terminate(self):
        # NOTE(review): acquire_lock is a local function; this test only
        # checks process counts and never inspects the submitted futures,
        # so confirm whether the tasks are actually expected to run.
        def acquire_lock(lock):
            lock.acquire()

        mp_context = self.get_context()
        if mp_context.get_start_method(allow_none=False) == "fork":
            # fork pre-spawns, not on demand.
            expected_num_processes = self.worker_count
        else:
            expected_num_processes = 3

        sem = mp_context.Semaphore(0)
        for _ in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._processes), expected_num_processes)
        for _ in range(3):
            sem.release()
        # Grab the process table before shutdown so it can be joined after.
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(
                max_workers=5, mp_context=self.get_context()) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(
            max_workers=5, mp_context=self.get_context())
        res = executor.map(abs, range(-5, 5))
        # Keep references to the internals so they can be joined once the
        # executor object itself is gone.
        executor_manager_thread = executor._executor_manager_thread
        processes = executor._processes
        call_queue = executor._call_queue
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the
        # executor got shutdown.  Comparing complete lists also catches a
        # result iterator that yields too few items, and is not stripped
        # under ``python -O`` the way a bare assert would be.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the processes when calling
        # shutdown with wait=False
        executor = futures.ProcessPoolExecutor(
            max_workers=5, mp_context=self.get_context())
        res = executor.map(abs, range(-5, 5))
        processes = executor._processes
        call_queue = executor._call_queue
        executor_manager_thread = executor._executor_manager_thread
        executor.shutdown(wait=False)

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the executor got
        # shutdown.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])
| |
| |
# Hand ProcessPoolShutdownTest and the three start-method mixins to the
# shared helper together with this module's globals (presumably so it can
# define one concrete test class per mixin here -- confirm in .util).
create_executor_tests(
    globals(),
    ProcessPoolShutdownTest,
    executor_mixins=(
        ProcessPoolForkMixin,
        ProcessPoolForkserverMixin,
        ProcessPoolSpawnMixin,
    ),
)
| |
| |
def setUpModule():
    """Module-level unittest fixture; delegates to ``setup_module`` from ``.util``."""
    setup_module()
| |
| |
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()