| import contextlib |
| import multiprocessing as mp |
| import multiprocessing.process |
| import multiprocessing.util |
| import os |
| import threading |
| import unittest |
| from concurrent import futures |
| from test import support |
| |
| from .executor import ExecutorTest, mul |
| from .util import BaseTestCase, ThreadPoolMixin, setup_module |
| |
| |
| class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase): |
| def test_map_submits_without_iteration(self): |
| """Tests verifying issue 11777.""" |
| finished = [] |
| def record_finished(n): |
| finished.append(n) |
| |
| self.executor.map(record_finished, range(10)) |
| self.executor.shutdown(wait=True) |
| self.assertCountEqual(finished, range(10)) |
| |
| def test_default_workers(self): |
| executor = self.executor_type() |
| expected = min(32, (os.process_cpu_count() or 1) + 4) |
| self.assertEqual(executor._max_workers, expected) |
| |
| def test_saturation(self): |
| executor = self.executor_type(4) |
| def acquire_lock(lock): |
| lock.acquire() |
| |
| sem = threading.Semaphore(0) |
| for i in range(15 * executor._max_workers): |
| executor.submit(acquire_lock, sem) |
| self.assertEqual(len(executor._threads), executor._max_workers) |
| for i in range(15 * executor._max_workers): |
| sem.release() |
| executor.shutdown(wait=True) |
| |
| @support.requires_gil_enabled("gh-117344: test is flaky without the GIL") |
| def test_idle_thread_reuse(self): |
| executor = self.executor_type() |
| executor.submit(mul, 21, 2).result() |
| executor.submit(mul, 6, 7).result() |
| executor.submit(mul, 3, 14).result() |
| self.assertEqual(len(executor._threads), 1) |
| executor.shutdown(wait=True) |
| |
| @support.requires_fork() |
| @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork') |
| @support.requires_resource('cpu') |
| def test_hang_global_shutdown_lock(self): |
| # bpo-45021: _global_shutdown_lock should be reinitialized in the child |
| # process, otherwise it will never exit |
| def submit(pool): |
| pool.submit(submit, pool) |
| |
| with futures.ThreadPoolExecutor(1) as pool: |
| pool.submit(submit, pool) |
| |
| for _ in range(50): |
| with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers: |
| workers.submit(tuple) |
| |
| def test_executor_map_current_future_cancel(self): |
| stop_event = threading.Event() |
| log = [] |
| |
| def log_n_wait(ident): |
| log.append(f"{ident=} started") |
| try: |
| stop_event.wait() |
| finally: |
| log.append(f"{ident=} stopped") |
| |
| with self.executor_type(max_workers=1) as pool: |
| # submit work to saturate the pool |
| fut = pool.submit(log_n_wait, ident="first") |
| try: |
| with contextlib.closing( |
| pool.map(log_n_wait, ["second", "third"], timeout=0) |
| ) as gen: |
| with self.assertRaises(TimeoutError): |
| next(gen) |
| finally: |
| stop_event.set() |
| fut.result() |
| # ident='second' is cancelled as a result of raising a TimeoutError |
| # ident='third' is cancelled because it remained in the collection of futures |
| self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"]) |
| |
| |
| def setUpModule(): |
| setup_module() |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |