| import multiprocessing |
| import sys |
| import time |
| import unittest |
| from concurrent import futures |
| from concurrent.futures._base import ( |
| PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, |
| ) |
| from concurrent.futures.process import _check_system_limits |
| |
| from test import support |
| from test.support import threading_helper |
| |
| |
def create_future(state=PENDING, exception=None, result=None):
    """Build a Future whose internal fields are set directly.

    The returned Future never goes through an executor: its private
    ``_state``, ``_exception`` and ``_result`` attributes are simply
    overwritten with the given values.
    """
    fut = Future()
    fut._state, fut._exception, fut._result = state, exception, result
    return fut
| |
| |
# Pre-built futures, one per state, created once at import time.
PENDING_FUTURE = create_future(PENDING)
RUNNING_FUTURE = create_future(RUNNING)
CANCELLED_FUTURE = create_future(CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(CANCELLED_AND_NOTIFIED)
# Finished futures: one carrying an exception, one carrying a result.
EXCEPTION_FUTURE = create_future(FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(FINISHED, result=42)
| |
| |
class BaseTestCase(unittest.TestCase):
    """TestCase that brackets each test with thread and child cleanup."""

    def setUp(self):
        """Store the value returned by ``threading_setup()`` for tearDown."""
        thread_snapshot = threading_helper.threading_setup()
        self._thread_key = thread_snapshot

    def tearDown(self):
        """Reap child processes, then run the threading cleanup helper."""
        support.reap_children()
        saved_key = self._thread_key
        threading_helper.threading_cleanup(*saved_key)
| |
| |
class ExecutorMixin:
    """Mixin that creates ``self.executor`` around every test.

    Concrete subclasses provide ``executor_type`` (the executor class to
    instantiate) and may define ``ctx``, the name of a multiprocessing
    start method.  When ``ctx`` is present, the context returned by
    ``get_context()`` is passed to the executor as ``mp_context``.
    """

    # Passed to the executor as ``max_workers``.
    worker_count = 5
    # Extra keyword arguments for the executor constructor.  Subclasses
    # override this attribute; the shared default is never mutated here.
    executor_kwargs = {}

    def setUp(self):
        """Run the base setUp, start the timer and create the executor."""
        super().setUp()

        self.t1 = time.monotonic()
        context_kwargs = {}
        if hasattr(self, "ctx"):
            # get_context() may skip the test on unsupported platforms.
            context_kwargs["mp_context"] = self.get_context()
        self.executor = self.executor_type(
            max_workers=self.worker_count,
            **context_kwargs,
            **self.executor_kwargs)

    def tearDown(self):
        """Shut the executor down and check the test did not take too long.

        The base class tearDown always runs, even when the shutdown raises
        or the duration assertion fails, so its cleanup is never skipped.
        """
        try:
            self.executor.shutdown(wait=True)
            self.executor = None

            dt = time.monotonic() - self.t1
            if support.verbose:
                print("%.2fs" % dt, end=' ')
            self.assertLess(dt, 300, "synchronization issue: test lasted too long")
        finally:
            super().tearDown()

    def get_context(self):
        """Return the multiprocessing context named by ``self.ctx``."""
        return multiprocessing.get_context(self.ctx)
| |
| |
class ThreadPoolMixin(ExecutorMixin):
    """ExecutorMixin variant that builds a ThreadPoolExecutor."""

    executor_type = futures.ThreadPoolExecutor
| |
| |
class ProcessPoolForkMixin(ExecutorMixin):
    """ExecutorMixin variant using a ProcessPoolExecutor with "fork"."""

    ctx = "fork"
    executor_type = futures.ProcessPoolExecutor

    def get_context(self):
        """Return the "fork" context, skipping the test when unusable.

        The test is skipped when the system limits check raises
        NotImplementedError, on win32, or when the thread sanitizer check
        reports true.
        """
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")

        on_windows = sys.platform == "win32"
        if on_windows:
            self.skipTest("require unix system")

        tsan_enabled = support.check_sanitizer(thread=True)
        if tsan_enabled:
            self.skipTest("TSAN doesn't support threads after fork")

        return super().get_context()
| |
| |
class ProcessPoolSpawnMixin(ExecutorMixin):
    """ExecutorMixin variant using a ProcessPoolExecutor with "spawn"."""

    ctx = "spawn"
    executor_type = futures.ProcessPoolExecutor

    def get_context(self):
        """Return the "spawn" context, skipping if process pools are unavailable."""
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")

        spawn_context = super().get_context()
        return spawn_context
| |
| |
class ProcessPoolForkserverMixin(ExecutorMixin):
    """ExecutorMixin variant using a ProcessPoolExecutor with "forkserver"."""

    ctx = "forkserver"
    executor_type = futures.ProcessPoolExecutor

    def get_context(self):
        """Return the "forkserver" context, skipping the test when unusable.

        The test is skipped when the system limits check raises
        NotImplementedError, on win32, or when the thread sanitizer check
        reports true.
        """
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")

        on_windows = sys.platform == "win32"
        if on_windows:
            self.skipTest("require unix system")

        tsan_enabled = support.check_sanitizer(thread=True)
        if tsan_enabled:
            self.skipTest("TSAN doesn't support threads after fork")

        return super().get_context()
| |
| |
def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,),
                          executor_mixins=(ThreadPoolMixin,
                                           ProcessPoolForkMixin,
                                           ProcessPoolForkserverMixin,
                                           ProcessPoolSpawnMixin)):
    """Generate one test class per executor mixin and publish them.

    For every entry of *executor_mixins* a class is created whose bases are
    ``(mixin, executor_mixin) + bases``.  Its name is the executor mixin
    name followed by the *mixin* name, each with a trailing 'Mixin',
    'Tests' or 'Test' removed, plus a final 'Test'.  The class is stored
    in *remote_globals* under that name, with ``__module__`` taken from
    ``remote_globals['__name__']``.
    """
    def strip_mixin(name):
        # Drop the first matching suffix; the name is returned as-is when
        # none of them matches.
        for suffix in ('Mixin', 'Tests', 'Test'):
            if name.endswith(suffix):
                return name[:-len(suffix)]
        return name

    module_name = remote_globals['__name__']
    for executor_mixin in executor_mixins:
        executor_part = strip_mixin(executor_mixin.__name__)
        mixin_part = strip_mixin(mixin.__name__)
        class_name = "%s%sTest" % (executor_part, mixin_part)
        class_bases = (mixin, executor_mixin) + bases
        remote_globals[class_name] = type(
            class_name, class_bases, {'__module__': module_name})
| |
| |
def setup_module():
    """Register the module-level cleanups for the executor tests.

    The multiprocessing test cleanup is registered only when the system
    limits check does not raise NotImplementedError.  The threading cleanup
    is always registered, using the value returned by ``threading_setup()``.
    """
    try:
        _check_system_limits()
    except NotImplementedError:
        process_pools_usable = False
    else:
        process_pools_usable = True

    if process_pools_usable:
        unittest.addModuleCleanup(multiprocessing.util._cleanup_tests)

    thread_snapshot = threading_helper.threading_setup()
    unittest.addModuleCleanup(threading_helper.threading_cleanup,
                              *thread_snapshot)