| # |
| # Unit tests for the multiprocessing package |
| # |
| |
| import unittest |
| import unittest.mock |
| import queue as pyqueue |
| import textwrap |
| import time |
| import io |
| import itertools |
| import sys |
| import os |
| import gc |
| import importlib |
| import errno |
| import functools |
| import signal |
| import array |
| import collections.abc |
| import socket |
| import random |
| import logging |
| import shutil |
| import subprocess |
| import struct |
| import tempfile |
| import operator |
| import pickle |
| import weakref |
| import warnings |
| import test.support |
| import test.support.script_helper |
| from test import support |
| from test.support import hashlib_helper |
| from test.support import import_helper |
| from test.support import os_helper |
| from test.support import script_helper |
| from test.support import socket_helper |
| from test.support import threading_helper |
| from test.support import warnings_helper |
| |
| |
| # Skip tests if _multiprocessing wasn't built. |
| _multiprocessing = import_helper.import_module('_multiprocessing') |
| # Skip tests if sem_open implementation is broken. |
| support.skip_if_broken_multiprocessing_synchronize() |
| import threading |
| |
| import multiprocessing.connection |
| import multiprocessing.dummy |
| import multiprocessing.heap |
| import multiprocessing.managers |
| import multiprocessing.pool |
| import multiprocessing.queues |
| from multiprocessing.connection import wait |
| |
| from multiprocessing import util |
| |
| try: |
| from multiprocessing import reduction |
| HAS_REDUCTION = reduction.HAVE_SEND_HANDLE |
| except ImportError: |
| HAS_REDUCTION = False |
| |
| try: |
| from multiprocessing.sharedctypes import Value, copy |
| HAS_SHAREDCTYPES = True |
| except ImportError: |
| HAS_SHAREDCTYPES = False |
| |
| try: |
| from multiprocessing import shared_memory |
| HAS_SHMEM = True |
| except ImportError: |
| HAS_SHMEM = False |
| |
| try: |
| import msvcrt |
| except ImportError: |
| msvcrt = None |
| |
| |
if support.HAVE_ASAN_FORK_BUG:
    # gh-89363: Skip multiprocessing tests if Python is built with ASAN to
    # work around a libasan race condition: dead lock in pthread_create().
    # Raising SkipTest at import time skips this entire module.
    raise unittest.SkipTest("libasan has a pthread_create() dead lock related to thread+fork")


# gh-110666: Tolerate a difference of 100 ms when comparing timings
# (clock resolution) so coarse platform clocks don't cause flaky failures.
CLOCK_RES = 0.100
| |
| |
def latin(s):
    """Return *s* encoded with the Latin-1 ('latin') codec."""
    encoded = s.encode('latin')
    return encoded
| |
| |
def close_queue(queue):
    """Close *queue* and join its feeder thread.

    Only real multiprocessing queues need this cleanup; any other object
    (e.g. a manager proxy or queue.Queue) is left untouched.
    """
    if not isinstance(queue, multiprocessing.queues.Queue):
        return
    queue.close()
    queue.join_thread()
| |
| |
def join_process(process):
    # Since multiprocessing.Process has the same API as threading.Thread
    # (join() and is_alive()), the support function can be reused.
    threading_helper.join_thread(process)
| |
| |
if os.name == "posix":
    from multiprocessing import resource_tracker

    def _resource_unlink(name, rtype):
        # Invoke the resource tracker's registered cleanup function for a
        # named resource of the given type (e.g. unlink a semaphore/shm).
        resource_tracker._CLEANUP_FUNCS[rtype](name)
| |
| |
| # |
| # Constants |
| # |
| |
| LOG_LEVEL = util.SUBWARNING |
| #LOG_LEVEL = logging.DEBUG |
| |
| DELTA = 0.1 |
| CHECK_TIMINGS = False # making true makes tests take a lot longer |
| # and can sometimes cause some non-serious |
| # failures because some calls block a bit |
| # longer than expected |
| if CHECK_TIMINGS: |
| TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 |
| else: |
| TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 |
| |
| # BaseManager.shutdown_timeout |
| SHUTDOWN_TIMEOUT = support.SHORT_TIMEOUT |
| |
| WAIT_ACTIVE_CHILDREN_TIMEOUT = 5.0 |
| |
| HAVE_GETVALUE = not getattr(_multiprocessing, |
| 'HAVE_BROKEN_SEM_GETVALUE', False) |
| |
| WIN32 = (sys.platform == "win32") |
| |
def wait_for_handle(handle, timeout):
    """Wait for *handle* to become ready.

    A negative timeout is treated as "block forever" (None) for
    multiprocessing.connection.wait().
    """
    effective_timeout = None if (timeout is not None and timeout < 0.0) else timeout
    return wait([handle], effective_timeout)
| |
# Upper bound on file descriptors for this process.  A bare ``except:``
# here also swallowed KeyboardInterrupt/SystemExit; narrowed to what
# os.sysconf can actually raise: AttributeError (no sysconf, e.g. on
# Windows), ValueError (unknown configuration name) and OSError
# (sysconf failure reported via errno).
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # Conservative historical fallback.
    MAXFD = 256

# To speed up tests when using the forkserver, we can preload these:
PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
| |
| # |
| # Some tests require ctypes |
| # |
| |
| try: |
| from ctypes import Structure, c_int, c_double, c_longlong |
| except ImportError: |
| Structure = object |
| c_int = c_double = c_longlong = None |
| |
| |
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test."""
    # Minimum number of semaphores available according to POSIX.
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available: assume enough.
        return
    # -1 means "no limit"; anything at or above the POSIX minimum is fine.
    if nsems != -1 and nsems < nsems_min:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % nsems_min)
| |
| |
def only_run_in_spawn_testsuite(reason):
    """Returns a decorator: raises SkipTest when SM != spawn at test time.

    This can be useful to save overall Python test suite execution time.
    "spawn" is the universal mode available on all platforms so this limits the
    decorated test to only execute within test_multiprocessing_spawn.

    This would not be necessary if we refactored our test suite to split things
    into other test files when they are not start method specific to be rerun
    under all start methods.
    """

    def decorator(test_item):

        @functools.wraps(test_item)
        def _wrapper(*args, **kwargs):
            # Re-check at call time: the start method may change between
            # decoration and execution.
            start_method = multiprocessing.get_start_method()
            if start_method != "spawn":
                raise unittest.SkipTest(f"{start_method=}, not 'spawn'; {reason}")
            return test_item(*args, **kwargs)

        return _wrapper

    return decorator
| |
| |
class TestInternalDecorators(unittest.TestCase):
    """Logic within a test suite that could errantly skip tests? Test it!"""

    @unittest.skipIf(sys.platform == "win32", "test requires that fork exists.")
    def test_only_run_in_spawn_testsuite(self):
        if multiprocessing.get_start_method() != "spawn":
            raise unittest.SkipTest("only run in test_multiprocessing_spawn.")

        # Decorating must never raise by itself.
        try:
            @only_run_in_spawn_testsuite("testing this decorator")
            def return_four_if_spawn():
                return 4
        except Exception as err:
            self.fail(f"expected decorated `def` not to raise; caught {err}")

        saved_start_method = multiprocessing.get_start_method(allow_none=True)
        try:
            # Under "spawn" the decorated function runs normally...
            multiprocessing.set_start_method("spawn", force=True)
            self.assertEqual(return_four_if_spawn(), 4)
            # ...under any other start method it raises SkipTest.
            multiprocessing.set_start_method("fork", force=True)
            with self.assertRaises(unittest.SkipTest) as ctx:
                return_four_if_spawn()
            message = str(ctx.exception)
            self.assertIn("testing this decorator", message)
            self.assertIn("start_method=", message)
        finally:
            multiprocessing.set_start_method(saved_start_method, force=True)
| |
| |
| # |
| # Creates a wrapper for a function which records the time it takes to finish |
| # |
| |
class TimingWrapper(object):
    """Callable proxy recording how long the last invocation took."""

    def __init__(self, func):
        self.func = func
        self.elapsed = None  # seconds taken by the most recent call

    def __call__(self, *args, **kwds):
        started = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            # Record the duration even if the wrapped call raised.
            self.elapsed = time.monotonic() - started
| |
| # |
| # Base class for test cases |
| # |
| |
class BaseTestCase(object):
    """Shared assertion helpers mixed into the per-flavour test classes."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')
    # If not empty, limit which start method suites run this class.
    START_METHODS: set[str] = set()
    start_method = None  # set by install_tests_in_module_dict()

    def assertTimingAlmostEqual(self, a, b):
        # Timing assertions are only meaningful when CHECK_TIMINGS is on.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        try:
            res = func(*args)
        except NotImplementedError:
            # Unimplemented on this flavour: nothing to check.
            return
        self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
| |
| # |
| # Return the value of a semaphore |
| # |
| |
| def get_value(self): |
| try: |
| return self.get_value() |
| except AttributeError: |
| try: |
| return self._Semaphore__value |
| except AttributeError: |
| try: |
| return self._value |
| except AttributeError: |
| raise NotImplementedError |
| |
| # |
| # Testcases |
| # |
| |
class DummyCallable:
    """Picklable callable used to verify Process releases its target."""

    def __call__(self, q, c):
        # The companion argument must have round-tripped as this class.
        assert isinstance(c, DummyCallable)
        q.put(5)
| |
| |
class _TestProcess(BaseTestCase):
    """Process lifecycle tests (start/join/terminate/kill, attributes).

    Runs under the 'processes' and 'threads' flavours; tests that only
    make sense for real processes skip themselves for 'threads'.

    Fix: _test_error_on_stdio_flush() built a replacement stream per
    action but then installed a hardcoded None, so the 'close' action
    never actually exercised a closed stream; it now installs *stream*.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # current_process() must look alive, non-daemonic, with a
        # non-empty bytes authkey, the real pid and no exitcode yet.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_set_executable(self):
        # set_executable() must accept str, bytes and os.PathLike paths.
        if self.TYPE == 'threads':
            self.skipTest(f'test not appropriate for {self.TYPE}')
        paths = [
            sys.executable,               # str
            os.fsencode(sys.executable),  # bytes
            os_helper.FakePath(sys.executable),  # os.PathLike
            os_helper.FakePath(os.fsencode(sys.executable)),  # os.PathLike bytes
        ]
        for path in paths:
            self.set_executable(path)
            p = self.Process()
            p.start()
            p.join()
            self.assertEqual(p.exitcode, 0)

    @support.requires_resource('cpu')
    def test_args_argument(self):
        # bpo-45735: Using list or tuple as *args* in constructor could
        # achieve the same effect.
        args_cases = (1, "str", [1], (1,))
        args_types = (list, tuple)

        test_cases = itertools.product(args_cases, args_types)

        for args, args_type in test_cases:
            with self.subTest(args=args, args_type=args_type):
                q = self.Queue(1)
                # pass a tuple or list as args
                p = self.Process(target=self._test_args, args=args_type((q, args)))
                p.daemon = True
                p.start()
                child_args = q.get()
                self.assertEqual(child_args, args)
                p.join()
                close_queue(q)

    @classmethod
    def _test_args(cls, q, arg):
        # Child side: echo the received argument back to the parent.
        q.put(arg)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child side: report args, kwargs, name (and, for real processes,
        # authkey and pid) back to the parent through the queue.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_parent_process_attributes(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # The top-level process has no parent_process().
        self.assertIsNone(self.parent_process())

        rconn, wconn = self.Pipe(duplex=False)
        p = self.Process(target=self._test_send_parent_process, args=(wconn,))
        p.start()
        p.join()
        parent_pid, parent_name = rconn.recv()
        self.assertEqual(parent_pid, self.current_process().pid)
        self.assertEqual(parent_pid, os.getpid())
        self.assertEqual(parent_name, self.current_process().name)

    @classmethod
    def _test_send_parent_process(cls, wconn):
        # Child side: report the parent's pid and name.
        from multiprocessing.process import parent_process
        wconn.send([parent_process().pid, parent_process().name])

    def test_parent_process(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # Launch a child process. Make it launch a grandchild process. Kill the
        # child process and make sure that the grandchild notices the death of
        # its parent (a.k.a the child process).
        rconn, wconn = self.Pipe(duplex=False)
        p = self.Process(
            target=self._test_create_grandchild_process, args=(wconn, ))
        p.start()

        if not rconn.poll(timeout=support.LONG_TIMEOUT):
            raise AssertionError("Could not communicate with child process")
        parent_process_status = rconn.recv()
        self.assertEqual(parent_process_status, "alive")

        p.terminate()
        p.join()

        if not rconn.poll(timeout=support.LONG_TIMEOUT):
            raise AssertionError("Could not communicate with child process")
        parent_process_status = rconn.recv()
        self.assertEqual(parent_process_status, "not alive")

    @classmethod
    def _test_create_grandchild_process(cls, wconn):
        # Child side: spawn the grandchild, then sleep until terminated
        # by the parent.
        p = cls.Process(target=cls._test_report_parent_status, args=(wconn, ))
        p.start()
        time.sleep(300)

    @classmethod
    def _test_report_parent_status(cls, wconn):
        # Grandchild side: report the parent's liveness before and after
        # the parent is expected to die.
        from multiprocessing.process import parent_process
        wconn.send("alive" if parent_process().is_alive() else "not alive")
        parent_process().join(timeout=support.SHORT_TIMEOUT)
        wconn.send("alive" if parent_process().is_alive() else "not alive")

    def test_process(self):
        # End-to-end check of Process attributes before start, while
        # running, and after join.
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # args[0] is the queue itself, so the child echoes args[1:].
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())
        close_queue(q)

    @unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id")
    def test_process_mainthread_native_id(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current_mainthread_native_id = threading.main_thread().native_id

        q = self.Queue(1)
        p = self.Process(target=self._test_process_mainthread_native_id, args=(q,))
        p.start()

        child_mainthread_native_id = q.get()
        p.join()
        close_queue(q)

        self.assertNotEqual(current_mainthread_native_id, child_mainthread_native_id)

    @classmethod
    def _test_process_mainthread_native_id(cls, q):
        # Child side: report the child's main-thread native id.
        mainthread_native_id = threading.main_thread().native_id
        q.put(mainthread_native_id)

    @classmethod
    def _sleep_some(cls):
        # Child side: sleep "forever"; the parent terminates/kills us.
        time.sleep(100)

    @classmethod
    def _test_sleep(cls, delay):
        time.sleep(delay)

    def _kill_process(self, meth):
        # Common driver for terminate()/kill(): start a sleeping child,
        # check that zero/negative join() timeouts return immediately,
        # apply *meth*, then join and return the child's exit code.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._sleep_some)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        join = TimingWrapper(p.join)

        self.assertEqual(join(0), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertEqual(join(-1), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        # XXX maybe terminating too soon causes the problems on Gentoo...
        time.sleep(1)

        meth(p)

        if hasattr(signal, 'alarm'):
            # On the Gentoo buildbot waitpid() often seems to block forever.
            # We use alarm() to interrupt it if it blocks for too long.
            def handler(*args):
                raise RuntimeError('join took too long: %s' % p)
            old_handler = signal.signal(signal.SIGALRM, handler)
            try:
                signal.alarm(10)
                self.assertEqual(join(), None)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)
        else:
            self.assertEqual(join(), None)

        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        return p.exitcode

    def test_terminate(self):
        exitcode = self._kill_process(multiprocessing.Process.terminate)
        self.assertEqual(exitcode, -signal.SIGTERM)

    def test_kill(self):
        exitcode = self._kill_process(multiprocessing.Process.kill)
        if os.name != 'nt':
            self.assertEqual(exitcode, -signal.SIGKILL)
        else:
            # On Windows kill() falls back to terminate() semantics.
            self.assertEqual(exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Spawn a binary tree of processes two levels deep, each node
        # reporting its id (a path of 0/1 indices) over the shared pipe.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Depth-first order of the spawned tree.
        expected = [
            [],
            [0],
            [0, 0],
            [0, 1],
            [1],
            [1, 0],
            [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # sentinel is only available once the process has started.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=1))

    @classmethod
    def _test_close(cls, rc=0, q=None):
        # Child side: optionally wait for one queue item, then exit rc.
        if q is not None:
            q.get()
        sys.exit(rc)

    def test_close(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        q = self.Queue()
        p = self.Process(target=self._test_close, kwargs={'q': q})
        p.daemon = True
        p.start()
        self.assertEqual(p.is_alive(), True)
        # Child is still alive, cannot close
        with self.assertRaises(ValueError):
            p.close()

        q.put(None)
        p.join()
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.exitcode, 0)
        p.close()
        with self.assertRaises(ValueError):
            p.is_alive()
        with self.assertRaises(ValueError):
            p.join()
        with self.assertRaises(ValueError):
            p.terminate()
        # close() is idempotent.
        p.close()

        wr = weakref.ref(p)
        del p
        gc.collect()
        self.assertIs(wr(), None)

        close_queue(q)

    @support.requires_resource('walltime')
    def test_many_processes(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sm = multiprocessing.get_start_method()
        N = 5 if sm == 'spawn' else 100

        # Try to overwhelm the forkserver loop with events
        procs = [self.Process(target=self._test_sleep, args=(0.01,))
                 for i in range(N)]
        for p in procs:
            p.start()
        for p in procs:
            join_process(p)
        for p in procs:
            self.assertEqual(p.exitcode, 0)

        procs = [self.Process(target=self._sleep_some)
                 for i in range(N)]
        for p in procs:
            p.start()
        time.sleep(0.001)  # let the children start...
        for p in procs:
            p.terminate()
        for p in procs:
            join_process(p)
        if os.name != 'nt':
            exitcodes = [-signal.SIGTERM]
            if sys.platform == 'darwin':
                # bpo-31510: On macOS, killing a freshly started process with
                # SIGTERM sometimes kills the process with SIGKILL.
                exitcodes.append(-signal.SIGKILL)
            for p in procs:
                self.assertIn(p.exitcode, exitcodes)

    def test_lose_target_ref(self):
        # A finished Process must not keep its target alive.
        c = DummyCallable()
        wr = weakref.ref(c)
        q = self.Queue()
        p = self.Process(target=c, args=(q, c))
        del c
        p.start()
        p.join()
        gc.collect()  # For PyPy or other GCs.
        self.assertIs(wr(), None)
        self.assertEqual(q.get(), 5)
        close_queue(q)

    @classmethod
    def _test_child_fd_inflation(cls, evt, q):
        # Child side: report our fd count, then wait so that all children
        # overlap in time.
        q.put(os_helper.fd_count())
        evt.wait()

    def test_child_fd_inflation(self):
        # Number of fds in child processes should not grow with the
        # number of running children.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sm = multiprocessing.get_start_method()
        if sm == 'fork':
            # The fork method by design inherits all fds from the parent,
            # trying to go against it is a lost battle
            self.skipTest('test not appropriate for {}'.format(sm))

        N = 5
        evt = self.Event()
        q = self.Queue()

        procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q))
                 for i in range(N)]
        for p in procs:
            p.start()

        try:
            fd_counts = [q.get() for i in range(N)]
            self.assertEqual(len(set(fd_counts)), 1, fd_counts)

        finally:
            evt.set()
            for p in procs:
                p.join()
            close_queue(q)

    @classmethod
    def _test_wait_for_threads(cls, evt):
        # Child side: a non-daemon thread sets the event; a daemon thread
        # would clear it much later but must not delay process exit.
        def func1():
            time.sleep(0.5)
            evt.set()

        def func2():
            time.sleep(20)
            evt.clear()

        threading.Thread(target=func1).start()
        threading.Thread(target=func2, daemon=True).start()

    def test_wait_for_threads(self):
        # A child process should wait for non-daemonic threads to end
        # before exiting
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        evt = self.Event()
        proc = self.Process(target=self._test_wait_for_threads, args=(evt,))
        proc.start()
        proc.join()
        self.assertTrue(evt.is_set())

    @classmethod
    def _test_error_on_stdio_flush(cls, evt, break_std_streams=None):
        # Child side: break the requested std streams — install a closed
        # file for the 'close' action, or None for the 'remove' action —
        # then signal the parent.  The child must still exit cleanly.
        for stream_name, action in (break_std_streams or {}).items():
            if action == 'close':
                stream = io.StringIO()
                stream.close()
            else:
                assert action == 'remove'
                stream = None
            # Fix: previously hardcoded None, leaving *stream* unused and
            # making the 'close' action identical to 'remove'.
            setattr(sys, stream_name, stream)
        evt.set()

    def test_error_on_stdio_flush_1(self):
        # Check that Process works with broken standard streams
        streams = [io.StringIO(), None]
        streams[0].close()
        for stream_name in ('stdout', 'stderr'):
            for stream in streams:
                old_stream = getattr(sys, stream_name)
                setattr(sys, stream_name, stream)
                try:
                    evt = self.Event()
                    proc = self.Process(target=self._test_error_on_stdio_flush,
                                        args=(evt,))
                    proc.start()
                    proc.join()
                    self.assertTrue(evt.is_set())
                    self.assertEqual(proc.exitcode, 0)
                finally:
                    setattr(sys, stream_name, old_stream)

    def test_error_on_stdio_flush_2(self):
        # Same as test_error_on_stdio_flush_1(), but standard streams are
        # broken by the child process
        for stream_name in ('stdout', 'stderr'):
            for action in ('close', 'remove'):
                old_stream = getattr(sys, stream_name)
                try:
                    evt = self.Event()
                    proc = self.Process(target=self._test_error_on_stdio_flush,
                                        args=(evt, {stream_name: action}))
                    proc.start()
                    proc.join()
                    self.assertTrue(evt.is_set())
                    self.assertEqual(proc.exitcode, 0)
                finally:
                    setattr(sys, stream_name, old_stream)

    @classmethod
    def _sleep_and_set_event(cls, evt, delay=0.0):
        # Child side: wait *delay* seconds, then set the event.
        time.sleep(delay)
        evt.set()

    def check_forkserver_death(self, signum):
        # bpo-31308: if the forkserver process has died, we should still
        # be able to create and run new Process instances (the forkserver
        # is implicitly restarted).
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        sm = multiprocessing.get_start_method()
        if sm != 'forkserver':
            # The fork method by design inherits all fds from the parent,
            # trying to go against it is a lost battle
            self.skipTest('test not appropriate for {}'.format(sm))

        from multiprocessing.forkserver import _forkserver
        _forkserver.ensure_running()

        # First process sleeps 500 ms
        delay = 0.5

        evt = self.Event()
        proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay))
        proc.start()

        pid = _forkserver._forkserver_pid
        os.kill(pid, signum)
        # give time to the fork server to die and time to proc to complete
        time.sleep(delay * 2.0)

        evt2 = self.Event()
        proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,))
        proc2.start()
        proc2.join()
        self.assertTrue(evt2.is_set())
        self.assertEqual(proc2.exitcode, 0)

        proc.join()
        self.assertTrue(evt.is_set())
        self.assertIn(proc.exitcode, (0, 255))

    def test_forkserver_sigint(self):
        # Catchable signal
        self.check_forkserver_death(signal.SIGINT)

    def test_forkserver_sigkill(self):
        # Uncatchable signal
        if os.name != 'nt':
            self.check_forkserver_death(signal.SIGKILL)
| |
| |
| # |
| # |
| # |
| |
class _UpperCaser(multiprocessing.Process):
    """Process subclass that upper-cases strings received over a Pipe.

    The parent calls submit()/stop(); the child's run() loop replies to
    each received string with its upper-cased form until it receives the
    None sentinel.
    """

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        # In the child: drop the parent's end first, then serve requests
        # until the None sentinel arrives.
        self.parent_conn.close()
        for s in iter(self.child_conn.recv, None):
            self.child_conn.send(s.upper())
        self.child_conn.close()

    def submit(self, s):
        # Round-trip one string through the child and return the reply.
        assert type(s) is str
        self.parent_conn.send(s)
        return self.parent_conn.recv()

    def stop(self):
        # Send the sentinel and close both ends on the parent side.
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()
| |
class _TestSubclassingProcess(BaseTestCase):
    """Tests using Process subclasses and child stderr/exit behaviour."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, encoding="utf-8") as f:
            err = f.read()
            # The whole traceback was printed
            self.assertIn("ZeroDivisionError", err)
            self.assertIn("test_multiprocessing.py", err)
            self.assertIn("1/0 # MARKER", err)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        # Child side: point stderr at the test file, then crash so the
        # traceback must be flushed during interpreter shutdown.
        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False)
        1/0 # MARKER


    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        # Child side: point stderr at the test file and exit via
        # sys.exit(reason) so the reason text lands in the file.
        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False)
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, testfn)

        # Non-integer exit reasons produce exit code 1 and the reason is
        # written to stderr.
        for reason in (
            [1, 2, 3],
            'ignore this',
        ):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            join_process(p)
            self.assertEqual(p.exitcode, 1)

            with open(testfn, encoding="utf-8") as f:
                content = f.read()
            self.assertEqual(content.rstrip(), str(reason))

            os.unlink(testfn)

        # Integer-like reasons map to exit codes per sys.exit() rules.
        cases = [
            ((True,), 1),
            ((False,), 0),
            ((8,), 8),
            ((None,), 0),
            ((), 0),
        ]

        for args, expected in cases:
            with self.subTest(args=args):
                p = self.Process(target=sys.exit, args=args)
                p.daemon = True
                p.start()
                join_process(p)
                self.assertEqual(p.exitcode, expected)
| |
| # |
| # |
| # |
| |
def queue_empty(q):
    """Return True if *q* has no items, using empty() when available."""
    if not hasattr(q, 'empty'):
        # Proxy objects may lack empty(); fall back to qsize().
        return q.qsize() == 0
    return q.empty()
| |
def queue_full(q, maxsize):
    """Return True if *q* holds *maxsize* items, using full() when available."""
    if not hasattr(q, 'full'):
        # Proxy objects may lack full(); fall back to comparing qsize().
        return q.qsize() == maxsize
    return q.full()
| |
| |
| class _TestQueue(BaseTestCase): |
| |
| |
| @classmethod |
| def _test_put(cls, queue, child_can_start, parent_can_continue): |
| child_can_start.wait() |
| for i in range(6): |
| queue.get() |
| parent_can_continue.set() |
| |
| def test_put(self): |
| MAXSIZE = 6 |
| queue = self.Queue(maxsize=MAXSIZE) |
| child_can_start = self.Event() |
| parent_can_continue = self.Event() |
| |
| proc = self.Process( |
| target=self._test_put, |
| args=(queue, child_can_start, parent_can_continue) |
| ) |
| proc.daemon = True |
| proc.start() |
| |
| self.assertEqual(queue_empty(queue), True) |
| self.assertEqual(queue_full(queue, MAXSIZE), False) |
| |
| queue.put(1) |
| queue.put(2, True) |
| queue.put(3, True, None) |
| queue.put(4, False) |
| queue.put(5, False, None) |
| queue.put_nowait(6) |
| |
| # the values may be in buffer but not yet in pipe so sleep a bit |
| time.sleep(DELTA) |
| |
| self.assertEqual(queue_empty(queue), False) |
| self.assertEqual(queue_full(queue, MAXSIZE), True) |
| |
| put = TimingWrapper(queue.put) |
| put_nowait = TimingWrapper(queue.put_nowait) |
| |
| self.assertRaises(pyqueue.Full, put, 7, False) |
| self.assertTimingAlmostEqual(put.elapsed, 0) |
| |
| self.assertRaises(pyqueue.Full, put, 7, False, None) |
| self.assertTimingAlmostEqual(put.elapsed, 0) |
| |
| self.assertRaises(pyqueue.Full, put_nowait, 7) |
| self.assertTimingAlmostEqual(put_nowait.elapsed, 0) |
| |
| self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1) |
| self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) |
| |
| self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2) |
| self.assertTimingAlmostEqual(put.elapsed, 0) |
| |
| self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3) |
| self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) |
| |
| child_can_start.set() |
| parent_can_continue.wait() |
| |
| self.assertEqual(queue_empty(queue), True) |
| self.assertEqual(queue_full(queue, MAXSIZE), False) |
| |
| proc.join() |
| close_queue(queue) |
| |
| @classmethod |
| def _test_get(cls, queue, child_can_start, parent_can_continue): |
| child_can_start.wait() |
| #queue.put(1) |
| queue.put(2) |
| queue.put(3) |
| queue.put(4) |
| queue.put(5) |
| parent_can_continue.set() |
| |
    def test_get(self):
        """Exercise Queue.get() in blocking, non-blocking and timeout modes."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        # Items put by the child may still be in transit through the
        # feeder thread and pipe; give them a moment to arrive.
        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on an empty queue must fail immediately...
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # ...while blocking gets must wait for roughly their timeout.
        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        # The timeout is ignored when block=False.
        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()
        close_queue(queue)
| |
| @classmethod |
| def _test_fork(cls, queue): |
| for i in range(10, 20): |
| queue.put(i) |
| # note that at this point the items may only be buffered, so the |
| # process cannot shutdown until the feeder thread has finished |
| # pushing items onto the pipe. |
| |
    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread. This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue: 0-9 from the
        # parent followed by 10-19 from the child, in order
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()
        close_queue(queue)
| |
| def test_qsize(self): |
| q = self.Queue() |
| try: |
| self.assertEqual(q.qsize(), 0) |
| except NotImplementedError: |
| self.skipTest('qsize method not implemented') |
| q.put(1) |
| self.assertEqual(q.qsize(), 1) |
| q.put(5) |
| self.assertEqual(q.qsize(), 2) |
| q.get() |
| self.assertEqual(q.qsize(), 1) |
| q.get() |
| self.assertEqual(q.qsize(), 0) |
| close_queue(q) |
| |
| @classmethod |
| def _test_task_done(cls, q): |
| for obj in iter(q.get, None): |
| time.sleep(DELTA) |
| q.task_done() |
| |
    def test_task_done(self):
        """JoinableQueue.join() must block until every item is task_done()."""
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        # Returns only once all ten items have been acknowledged.
        queue.join()

        # One None sentinel per worker to shut them down.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
        close_queue(queue)
| |
    def test_no_import_lock_contention(self):
        # bpo-22853: using a queue inside a module body while it is being
        # imported must not deadlock against the import lock.
        with os_helper.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            with open(module_name + '.py', 'w', encoding="utf-8") as f:
                f.write("""if 1:
                    import multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                    del q
                """)

            with import_helper.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except pyqueue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")
| |
    def test_timeout(self):
        """A blocking get() with a timeout must wait roughly that long."""
        q = multiprocessing.Queue()
        start = time.monotonic()
        self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
        delta = time.monotonic() - start
        # bpo-30317: Tolerate a delta of 100 ms because of the bad clock
        # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once
        # failed because the delta was only 135.8 ms.
        self.assertGreaterEqual(delta, 0.100)
        close_queue(q)
| |
    def test_queue_feeder_donot_stop_onexc(self):
        # bpo-30414: verify feeder handles exceptions correctly
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class NotSerializable(object):
            # Pickling this object always fails, which makes the feeder
            # thread hit its error path.
            def __reduce__(self):
                raise AttributeError
        with test.support.captured_stderr():
            q = self.Queue()
            q.put(NotSerializable())
            # The feeder must survive the failed pickling and still
            # deliver subsequent items.
            q.put(True)
            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))
            close_queue(q)

        with test.support.captured_stderr():
            # bpo-33078: verify that the queue size is correctly handled
            # on errors.
            q = self.Queue(maxsize=1)
            q.put(NotSerializable())
            # Succeeds only if the failed item released its slot.
            q.put(True)
            try:
                self.assertEqual(q.qsize(), 1)
            except NotImplementedError:
                # qsize is not available on all platform as it
                # relies on sem_getvalue
                pass
            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))
            # Check that the size of the queue is correct
            self.assertTrue(q.empty())
            close_queue(q)
| |
    def test_queue_feeder_on_queue_feeder_error(self):
        # bpo-30006: verify feeder handles exceptions using the
        # _on_queue_feeder_error hook.
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class NotSerializable(object):
            """Mock unserializable object"""
            def __init__(self):
                self.reduce_was_called = False
                self.on_queue_feeder_error_was_called = False

            def __reduce__(self):
                self.reduce_was_called = True
                raise AttributeError

        class SafeQueue(multiprocessing.queues.Queue):
            """Queue with overloaded _on_queue_feeder_error hook"""
            @staticmethod
            def _on_queue_feeder_error(e, obj):
                # Record the call on the object itself; the feeder thread
                # runs in this process, so the mutation is visible here.
                if (isinstance(e, AttributeError) and
                        isinstance(obj, NotSerializable)):
                    obj.on_queue_feeder_error_was_called = True

        not_serializable_obj = NotSerializable()
        # The captured_stderr reduces the noise in the test report
        with test.support.captured_stderr():
            q = SafeQueue(ctx=multiprocessing.get_context())
            q.put(not_serializable_obj)

            # Verify that q is still functioning correctly
            q.put(True)
            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))

        # Assert that the serialization and the hook have been called correctly
        self.assertTrue(not_serializable_obj.reduce_was_called)
        self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)
| |
    def test_closed_queue_empty_exceptions(self):
        # Assert that checking the emptiness of an unused closed queue
        # does not raise an OSError. The rationale is that q.close() is
        # a no-op upon construction and becomes effective once the queue
        # has been used (e.g., by calling q.put()).
        for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
            q.close()  # this is a no-op since the feeder thread is None
            q.join_thread()  # this is also a no-op
            self.assertTrue(q.empty())

        for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
            q.put('foo')  # make sure that the queue is 'used'
            q.close()  # close the feeder thread
            q.join_thread()  # make sure to join the feeder thread
            # Once genuinely closed, empty() must fail loudly.
            with self.assertRaisesRegex(OSError, 'is closed'):
                q.empty()
| |
| def test_closed_queue_put_get_exceptions(self): |
| for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): |
| q.close() |
| with self.assertRaisesRegex(ValueError, 'is closed'): |
| q.put('foo') |
| with self.assertRaisesRegex(ValueError, 'is closed'): |
| q.get() |
| # |
| # |
| # |
| |
class _TestLock(BaseTestCase):
    """Tests for multiprocessing Lock and RLock, including repr() output."""

    @staticmethod
    def _acquire(lock, l=None):
        # Acquire *lock* (leaving it held) and, if a list is supplied,
        # record the lock's repr so the parent can check the reported
        # owner.  The parent is responsible for releasing the lock.
        lock.acquire()
        if l is not None:
            l.append(repr(lock))

    @staticmethod
    def _acquire_event(lock, event):
        # Acquire *lock*, signal *event*, then linger so the parent can
        # inspect the lock while another process/thread still holds it.
        lock.acquire()
        event.set()
        time.sleep(1.0)

    def test_repr_lock(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        lock = self.Lock()
        # Plain string literals here: the originals were f-strings with
        # no placeholders.
        self.assertEqual('<Lock(owner=None)>', repr(lock))

        lock.acquire()
        self.assertEqual('<Lock(owner=MainProcess)>', repr(lock))
        lock.release()

        # Owner shown as "process|thread" when held by a non-main thread.
        tname = 'T1'
        l = []
        t = threading.Thread(target=self._acquire,
                             args=(lock, l),
                             name=tname)
        t.start()
        time.sleep(0.1)
        self.assertEqual(f'<Lock(owner=MainProcess|{tname})>', l[0])
        lock.release()

        # From the main thread's perspective the holder is just
        # "SomeOtherThread".
        t = threading.Thread(target=self._acquire,
                             args=(lock,),
                             name=tname)
        t.start()
        time.sleep(0.1)
        self.assertEqual('<Lock(owner=SomeOtherThread)>', repr(lock))
        lock.release()

        # A child process sees itself (by name) as the owner.
        pname = 'P1'
        l = multiprocessing.Manager().list()
        p = self.Process(target=self._acquire,
                         args=(lock, l),
                         name=pname)
        p.start()
        p.join()
        self.assertEqual(f'<Lock(owner={pname})>', l[0])

        # The parent sees a foreign holder as "SomeOtherProcess".
        lock = self.Lock()
        event = self.Event()
        p = self.Process(target=self._acquire_event,
                         args=(lock, event),
                         name='P2')
        p.start()
        event.wait()
        self.assertEqual('<Lock(owner=SomeOtherProcess)>', repr(lock))
        p.terminate()

    def test_lock(self):
        """Basic acquire/release semantics of a non-recursive Lock."""
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A second non-blocking acquire on a held lock must fail.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing an unheld lock is an error.
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    @staticmethod
    def _acquire_release(lock, timeout, l=None, n=1):
        # Acquire *lock* *n* times (recursively), optionally record its
        # repr, hold it for *timeout* seconds, then fully release it.
        for _ in range(n):
            lock.acquire()
        if l is not None:
            l.append(repr(lock))
        time.sleep(timeout)
        for _ in range(n):
            lock.release()

    def test_repr_rlock(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        lock = self.RLock()
        self.assertEqual('<RLock(None, 0)>', repr(lock))

        # The recursion count is part of the repr.
        n = 3
        for _ in range(n):
            lock.acquire()
        self.assertEqual(f'<RLock(MainProcess, {n})>', repr(lock))
        for _ in range(n):
            lock.release()

        t, l = [], []
        for i in range(n):
            t.append(threading.Thread(target=self._acquire_release,
                                      args=(lock, 0.1, l, i+1),
                                      name=f'T{i+1}'))
            t[-1].start()
        for t_ in t:
            t_.join()
        for i in range(n):
            self.assertIn(f'<RLock(MainProcess|T{i+1}, {i+1})>', l)

        # A foreign thread's recursion count is reported only as
        # "nonzero".  (The thread name was a placeholder-free f-string.)
        t = threading.Thread(target=self._acquire_release,
                             args=(lock, 0.2),
                             name='T1')
        t.start()
        time.sleep(0.1)
        self.assertEqual('<RLock(SomeOtherThread, nonzero)>', repr(lock))
        time.sleep(0.2)

        pname = 'P1'
        l = multiprocessing.Manager().list()
        p = self.Process(target=self._acquire_release,
                         args=(lock, 0.1, l),
                         name=pname)
        p.start()
        p.join()
        self.assertEqual(f'<RLock({pname}, 1)>', l[0])

        event = self.Event()
        lock = self.RLock()
        p = self.Process(target=self._acquire_event,
                         args=(lock, event))
        p.start()
        event.wait()
        self.assertEqual('<RLock(SomeOtherProcess, nonzero)>', repr(lock))
        p.join()

    def test_rlock(self):
        """An RLock may be acquired recursively, once per matching release."""
        lock = self.RLock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        # One release too many must fail.
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        # A Lock must be usable as a context manager.
        with self.Lock():
            pass
| |
| |
class _TestSemaphore(BaseTestCase):
    """Tests for multiprocessing Semaphore and BoundedSemaphore."""

    def _test_semaphore(self, sem):
        # Shared body: *sem* must start with a value of 2.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        # Non-blocking acquire fails once the counter reaches zero.
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # A plain Semaphore may be released beyond its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking acquires return immediately, whatever the timeout.
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # Blocking acquires honour the timeout.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
| |
| |
class _TestCondition(BaseTestCase):
    """Tests for multiprocessing Condition variables."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Child helper: announce via *sleeping* that we are about to
        # wait, wait on the condition (optionally with a timeout), then
        # announce via *woken* that we resumed.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def assertReachesEventually(self, func, value):
        # Poll func() for up to ~10*DELTA until it reaches *value*, then
        # assert it (treating NotImplementedError as "not supported").
        for i in range(10):
            try:
                if func() == value:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(value, func)

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """notify() must wake exactly one waiter at a time."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()
        self.addCleanup(p.join)

        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()
        self.addCleanup(p.join)

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()

    def test_notify_all(self):
        """notify_all() must wake every waiter; timed-out waiters count too."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        self.assertReachesEventually(lambda: get_value(woken), 6)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_notify_n(self):
        """notify(n) must wake at most n waiters per call."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake some of them up
        cond.acquire()
        cond.notify(n=2)
        cond.release()

        # check 2 have woken
        self.assertReachesEventually(lambda: get_value(woken), 2)

        # wake the rest of them
        cond.acquire()
        cond.notify(n=4)
        cond.release()

        self.assertReachesEventually(lambda: get_value(woken), 6)

        # doesn't do anything more
        cond.acquire()
        cond.notify(n=3)
        cond.release()

        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # A wait with a timeout and no notify returns False after
        # roughly the timeout.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

    @classmethod
    def _test_waitfor_f(cls, cond, state):
        # Child helper: signal readiness (state 0) then wait until the
        # parent counts state up to 4; exit non-zero on failure.
        with cond:
            state.value = 0
            cond.notify()
            result = cond.wait_for(lambda : state.value==4)
            if not result or state.value != 4:
                sys.exit(1)

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor(self):
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', -1)

        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
        p.daemon = True
        p.start()

        with cond:
            result = cond.wait_for(lambda : state.value==0)
            self.assertTrue(result)
            self.assertEqual(state.value, 0)

        for i in range(4):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        join_process(p)
        self.assertEqual(p.exitcode, 0)

    @classmethod
    def _test_waitfor_timeout_f(cls, cond, state, success, sem):
        # Child helper: wait_for() a predicate that never becomes true
        # and record success if it timed out after roughly the expected
        # duration (allowing for clock resolution).
        sem.release()
        with cond:
            expected = 0.100
            dt = time.monotonic()
            result = cond.wait_for(lambda : state.value==4, timeout=expected)
            dt = time.monotonic() - dt
            if not result and (expected - CLOCK_RES) <= dt:
                success.value = True

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor_timeout(self):
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', 0)
        success = self.Value('i', False)
        sem = self.Semaphore(0)

        p = self.Process(target=self._test_waitfor_timeout_f,
                         args=(cond, state, success, sem))
        p.daemon = True
        p.start()
        self.assertTrue(sem.acquire(timeout=support.LONG_TIMEOUT))

        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.010)
            with cond:
                state.value += 1
                cond.notify()

        join_process(p)
        self.assertTrue(success.value)

    @classmethod
    def _test_wait_result(cls, c, pid):
        # Child helper: notify the waiting parent, then (on POSIX)
        # interrupt it with SIGINT to exercise the interrupted-wait path.
        with c:
            c.notify()
        time.sleep(1)
        if pid is not None:
            os.kill(pid, signal.SIGINT)

    def test_wait_result(self):
        if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
            pid = os.getpid()
        else:
            pid = None

        c = self.Condition()
        with c:
            self.assertFalse(c.wait(0))
            self.assertFalse(c.wait(0.1))

            p = self.Process(target=self._test_wait_result, args=(c, pid))
            p.start()

            self.assertTrue(c.wait(60))
            if pid is not None:
                self.assertRaises(KeyboardInterrupt, c.wait, 60)

        p.join()
| |
| |
class _TestEvent(BaseTestCase):
    """Tests for multiprocessing Event objects."""

    @classmethod
    def _test_event(cls, event):
        # Child helper: set the event after a delay so the parent's
        # blocking wait() has something to wait for.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        self.assertEqual(event.is_set(), False)

        # Removed, threading.Event.wait() will return the value of the __flag
        # instead of None. API Shear with the semaphore backed mp.Event
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        p.join()

    def test_repr(self) -> None:
        # The repr differs between a native Event and a manager proxy.
        event = self.Event()
        if self.TYPE == 'processes':
            self.assertRegex(repr(event), r"<Event at .* unset>")
            event.set()
            self.assertRegex(repr(event), r"<Event at .* set>")
            event.clear()
            self.assertRegex(repr(event), r"<Event at .* unset>")
        elif self.TYPE == 'manager':
            self.assertRegex(repr(event), r"<EventProxy object, typeid 'Event' at .*")
            event.set()
            self.assertRegex(repr(event), r"<EventProxy object, typeid 'Event' at .*")
| |
| |
| # Tests for Barrier - adapted from tests in test/lock_tests.py |
| # |
| |
| # Many of the tests for threading.Barrier use a list as an atomic |
| # counter: a value is appended to increment the counter, and the |
| # length of the list gives the value. We use the class DummyList |
| # for the same purpose. |
| |
| class _DummyList(object): |
| |
| def __init__(self): |
| wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i')) |
| lock = multiprocessing.Lock() |
| self.__setstate__((wrapper, lock)) |
| self._lengthbuf[0] = 0 |
| |
| def __setstate__(self, state): |
| (self._wrapper, self._lock) = state |
| self._lengthbuf = self._wrapper.create_memoryview().cast('i') |
| |
| def __getstate__(self): |
| return (self._wrapper, self._lock) |
| |
| def append(self, _): |
| with self._lock: |
| self._lengthbuf[0] += 1 |
| |
| def __len__(self): |
| with self._lock: |
| return self._lengthbuf[0] |
| |
| def _wait(): |
| # A crude wait/yield function not relying on synchronization primitives. |
| time.sleep(0.01) |
| |
| |
class Bunch(object):
    """
    A bunch of threads.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Construct a bunch of `n` threads running the same function `f`.
        If `wait_before_exit` is True, the threads won't terminate until
        do_finish() is called.
        """
        # *namespace* supplies the concurrency primitives (Process,
        # Event, DummyList), so the same class drives threads, processes
        # or manager proxies depending on the test mixin.
        self.f = f
        self.args = args
        self.n = n
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            self._can_exit.set()

        threads = []
        for i in range(n):
            p = namespace.Process(target=self.task)
            p.daemon = True
            p.start()
            threads.append(p)

        def finalize(threads):
            for p in threads:
                p.join()

        # Join the children when this Bunch is garbage-collected or
        # close() is called, whichever comes first.
        self._finalizer = weakref.finalize(self, finalize, threads)

    def task(self):
        pid = os.getpid()
        self.started.append(pid)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(pid)
            # Park here until the test allows the workers to exit.
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def wait_for_started(self):
        # Busy-wait until all n workers have registered as started.
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        # Busy-wait until all n workers have run f() to completion.
        while len(self.finished) < self.n:
            _wait()

    def do_finish(self):
        self._can_exit.set()

    def close(self):
        self._finalizer()
| |
| |
class AppendTrue(object):
    """Picklable callable that appends True to a captured container.

    Used as a Barrier ``action`` callback, where a lambda would not be
    picklable for child processes.
    """

    def __init__(self, obj):
        self.obj = obj

    def __call__(self):
        self.obj.append(True)
| |
| |
class _TestBarrier(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5
    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout

    def setUp(self):
        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        self.barrier.abort()
        self.barrier = None

    def DummyList(self):
        # Return an atomic "counter list" appropriate to the test type;
        # see the _DummyList comment above.
        if self.TYPE == 'threads':
            return []
        elif self.TYPE == 'manager':
            return self.manager.list()
        else:
            return _DummyList()

    def run_threads(self, f, args):
        # Run f(*args) in N-1 children plus the current thread, then
        # wait for all of them to finish.
        b = Bunch(self, f, args, self.N-1)
        try:
            f(*args)
            b.wait_for_finished()
        finally:
            b.close()

    @classmethod
    def multipass(cls, barrier, results, n):
        # Pass the barrier n times, checking after each wait that every
        # party completed the previous phase (lockstep progress).
        m = barrier.parties
        assert m == cls.N
        for i in range(n):
            results[0].append(True)
            assert len(results[1]) == i * m
            barrier.wait()
            results[1].append(True)
            assert len(results[0]) == (i + 1) * m
            barrier.wait()
        try:
            assert barrier.n_waiting == 0
        except NotImplementedError:
            pass
        assert not barrier.broken

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [self.DummyList(), self.DummyList()]
        self.run_threads(self.multipass, (self.barrier, results, passes))

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    @classmethod
    def _test_wait_return_f(cls, barrier, queue):
        res = barrier.wait()
        queue.put(res)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        queue = self.Queue()
        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
        results = [queue.get() for i in range(self.N)]
        # Exactly one waiter receives index 0.
        self.assertEqual(results.count(0), 1)
        close_queue(queue)

    @classmethod
    def _test_action_f(cls, barrier, results):
        barrier.wait()
        # The action must have run exactly once before any waiter returns.
        if len(results) != 1:
            raise RuntimeError

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = self.DummyList()
        barrier = self.Barrier(self.N, action=AppendTrue(results))
        self.run_threads(self._test_action_f, (barrier, results))
        self.assertEqual(len(results), 1)

    @classmethod
    def _test_abort_f(cls, barrier, results1, results2):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                # One designated party breaks the barrier via abort().
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        self.run_threads(self._test_abort_f,
                         (self.barrier, results1, results2))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    @classmethod
    def _test_reset_f(cls, barrier, results1, results2, results3):
        i = barrier.wait()
        if i == cls.N//2:
            # Wait until the other threads are all in the barrier.
            while barrier.n_waiting < cls.N-1:
                time.sleep(0.001)
            barrier.reset()
        else:
            try:
                barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
        # Now, pass the barrier again
        barrier.wait()
        results3.append(True)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        self.run_threads(self._test_reset_f,
                         (self.barrier, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_abort_and_reset_f(cls, barrier, barrier2,
                                results1, results2, results3):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()
        # Synchronize and reset the barrier.  Must synchronize first so
        # that everyone has left it when we reset, and after so that no
        # one enters it before the reset.
        if barrier2.wait() == cls.N//2:
            barrier.reset()
        barrier2.wait()
        barrier.wait()
        results3.append(True)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        barrier2 = self.Barrier(self.N)

        self.run_threads(self._test_abort_and_reset_f,
                         (self.barrier, barrier2, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_timeout_f(cls, barrier, results):
        i = barrier.wait()
        if i == cls.N//2:
            # One thread is late!
            time.sleep(1.0)
        try:
            barrier.wait(0.5)
        except threading.BrokenBarrierError:
            results.append(True)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        results = self.DummyList()
        self.run_threads(self._test_timeout_f, (self.barrier, results))
        self.assertEqual(len(results), self.barrier.parties)

    @classmethod
    def _test_default_timeout_f(cls, barrier, results):
        i = barrier.wait(cls.defaultTimeout)
        if i == cls.N//2:
            # One thread is later than the default timeout
            time.sleep(1.0)
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            results.append(True)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        barrier = self.Barrier(self.N, timeout=0.5)
        results = self.DummyList()
        self.run_threads(self._test_default_timeout_f, (barrier, results))
        self.assertEqual(len(results), barrier.parties)

    def test_single_thread(self):
        # A one-party barrier never blocks.
        b = self.Barrier(1)
        b.wait()
        b.wait()

    @classmethod
    def _test_thousand_f(cls, barrier, passes, conn, lock):
        for i in range(passes):
            barrier.wait()
            with lock:
                conn.send(i)

    def test_thousand(self):
        # Stress test: N processes make 1000 lockstep passes; every pass
        # number must be received N times before the next pass begins.
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        passes = 1000
        lock = self.Lock()
        conn, child_conn = self.Pipe(False)
        for j in range(self.N):
            p = self.Process(target=self._test_thousand_f,
                             args=(self.barrier, passes, child_conn, lock))
            p.start()
            self.addCleanup(p.join)

        for i in range(passes):
            for j in range(self.N):
                self.assertEqual(conn.recv(), i)
| |
| # |
| # |
| # |
| |
class _TestValue(BaseTestCase):
    """Tests for multiprocessing.sharedctypes Value / RawValue."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written back by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('q', 2 ** 33, 2 ** 34),
        ('c', latin('x'), latin('y'))
    ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child side: overwrite every shared value with its third entry.
        for shared, (_, _, new) in zip(values, cls.codes_values):
            shared.value = new

    def test_value(self, raw=False):
        """Writes made in a child process must be visible in the parent."""
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, (_, initial, _) in zip(values, self.codes_values):
            self.assertEqual(shared.value, initial)

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        for shared, (_, _, new) in zip(values, self.codes_values):
            self.assertEqual(shared.value, new)

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """get_lock()/get_obj() availability for each lock= variant."""
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # lock=None means "create a lock", same as the default.
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        self.assertEqual(lock, val3.get_lock())
        val3.get_obj()

        # lock=False yields a raw wrapper with no synchronization API.
        val4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(val4, 'get_lock'))
        self.assertFalse(hasattr(val4, 'get_obj'))

        # A non-lock, non-bool lock argument is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue never exposes the synchronization API.
        val5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(val5, 'get_lock'))
        self.assertFalse(hasattr(val5, 'get_obj'))
| |
| |
class _TestArray(BaseTestCase):
    """Tests for multiprocessing.sharedctypes Array / RawArray."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # In-place prefix sums: each slot accumulates everything before it.
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """Shared arrays must mirror in-place edits made by a child."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment, applied identically to both copies.
        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally, then have a child process
        # apply the same transformation to the shared array.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).  Repeating raises the odds
        # that freshly freed non-zero memory is reused for the new array.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """get_lock()/get_obj() availability for each lock= variant."""
        arr1 = self.Array('i', list(range(10)))
        arr1.get_lock()
        arr1.get_obj()

        # lock=None means "create a lock", same as the default.
        arr2 = self.Array('i', list(range(10)), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        self.assertEqual(lock, arr3.get_lock())
        arr3.get_obj()

        # lock=False yields a raw wrapper with no synchronization API.
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray never exposes the synchronization API.
        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
| |
| # |
| # |
| # |
| |
class _TestContainers(BaseTestCase):
    """Tests for the manager-backed proxy containers (list, dict, Namespace).

    Proxies forward every operation to the manager process, so nesting,
    identity and object lifetime behave subtly differently from local
    containers; these tests pin that behavior down.
    """

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        # Basic sequence protocol on a ListProxy: indexing, slicing,
        # extend, *=, + — and nesting of proxies inside proxies.
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(a[:], list(range(10)))

        # A list proxy built from other list proxies keeps the inner
        # proxies live; each element still supports slicing.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            [element[:] for element in e],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # Mutating the inner proxy is visible through the outer one.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])

    def test_list_isinstance(self):
        a = self.list()
        # ListProxy registers itself as a MutableSequence.
        self.assertIsInstance(a, collections.abc.MutableSequence)

    def test_list_iter(self):
        a = self.list(list(range(10)))
        it = iter(a)
        self.assertEqual(list(it), list(range(10)))
        self.assertEqual(list(it), []) # exhausted
        # list modified during iteration: the snapshot taken by iter()
        # still reflects later item mutation
        it = iter(a)
        a[0] = 100
        self.assertEqual(next(it), 100)

    def test_list_proxy_in_list(self):
        # Independent inner proxies: mutating one must not leak into
        # its siblings.
        a = self.list([self.list(range(3)) for _i in range(3)])
        self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)

        a[0][-1] = 55
        self.assertEqual(a[0][:], [0, 1, 55])
        for i in range(1, 3):
            self.assertEqual(a[i][:], [0, 1, 2])

        self.assertEqual(a[1].pop(), 2)
        self.assertEqual(len(a[1]), 2)
        for i in range(0, 3, 2):
            self.assertEqual(len(a[i]), 3)

        del a

        # A proxy list containing itself must not break deletion.
        b = self.list()
        b.append(b)
        del b

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_dict_isinstance(self):
        a = self.dict()
        # DictProxy registers itself as a MutableMapping.
        self.assertIsInstance(a, collections.abc.MutableMapping)

    def test_dict_iter(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        it = iter(d)
        self.assertEqual(list(it), indices)
        self.assertEqual(list(it), []) # exhausted
        # dictionary changed size during iteration
        it = iter(d)
        d.clear()
        self.assertRaises(RuntimeError, next, it)

    def test_dict_proxy_nested(self):
        # Nested dict proxies: writes through either alias (the inner
        # proxy or the outer d['...'] path) must be seen by both.
        pets = self.dict(ferrets=2, hamsters=4)
        supplies = self.dict(water=10, feed=3)
        d = self.dict(pets=pets, supplies=supplies)

        self.assertEqual(supplies['water'], 10)
        self.assertEqual(d['supplies']['water'], 10)

        d['supplies']['blankets'] = 5
        self.assertEqual(supplies['blankets'], 5)
        self.assertEqual(d['supplies']['blankets'], 5)

        d['supplies']['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        # Dropping the local aliases must not invalidate the nested
        # values reachable through the outer proxy.
        del pets
        del supplies
        self.assertEqual(d['pets']['ferrets'], 2)
        d['supplies']['blankets'] = 11
        self.assertEqual(d['supplies']['blankets'], 11)

        # Re-fetching the inner proxies yields live aliases again.
        pets = d['pets']
        supplies = d['supplies']
        supplies['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        # Clearing the outer dict leaves the inner proxies alive.
        d.clear()
        self.assertEqual(len(d), 0)
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(pets['hamsters'], 4)

        # Dict proxies nested inside a list proxy behave the same way.
        l = self.list([pets, supplies])
        l[0]['marmots'] = 1
        self.assertEqual(pets['marmots'], 1)
        self.assertEqual(l[0]['marmots'], 1)

        del pets
        del supplies
        self.assertEqual(l[0]['marmots'], 1)

        # Plain (non-proxy) containers stored in a proxy stay plain.
        outer = self.list([[88, 99], l])
        self.assertIsInstance(outer[0], list)  # Not a ListProxy
        self.assertEqual(outer[-1][-1]['feed'], 3)

    def test_nested_queue(self):
        a = self.list()  # Test queue inside list
        a.append(self.Queue())
        a[0].put(123)
        self.assertEqual(a[0].get(), 123)
        b = self.dict()  # Test queue inside dict
        b[0] = self.Queue()
        b[0].put(456)
        self.assertEqual(b[0].get(), 456)

    def test_namespace(self):
        # Namespace proxies hide underscore-prefixed attributes from repr.
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))
| |
| # |
| # |
| # |
| |
def sqr(x, wait=0.0, event=None):
    """Square *x*, optionally stalling first.

    With no *event*, sleeps *wait* seconds; otherwise waits up to *wait*
    seconds on the event (lets tests cancel the delay early).
    """
    if event is not None:
        event.wait(wait)
    else:
        time.sleep(wait)
    return x * x
| |
def mul(x, y):
    """Return the product of *x* and *y* (starmap test helper)."""
    return x * y
| |
def raise_large_valuerror(wait):
    """Sleep *wait* seconds, then raise a ValueError with a ~1 MiB message.

    The oversized message exercises error transport between processes.
    """
    time.sleep(wait)
    message = "x" * (1024 ** 2)
    raise ValueError(message)
| |
def identity(x):
    """Return the argument unchanged (pass-through worker for pool tests)."""
    return x
| |
class CountedObject(object):
    """Helper that tracks how many of its instances are currently alive."""

    # Count of live instances; bumped in __new__, dropped in __del__.
    n_instances = 0

    def __new__(cls):
        cls.n_instances += 1
        return super().__new__(cls)

    def __del__(self):
        type(self).n_instances -= 1
| |
class SayWhenError(ValueError):
    """Marker error raised by exception_throwing_generator at a chosen point."""
| |
def exception_throwing_generator(total, when):
    """Yield 0 .. total-1, raising SayWhenError just before yielding *when*.

    when == -1 raises immediately, before any value is produced.
    """
    if when == -1:
        raise SayWhenError("Somebody said when")
    for value in range(total):
        if value == when:
            raise SayWhenError("Somebody said when")
        yield value
| |
| |
| class _TestPool(BaseTestCase): |
| |
    @classmethod
    def setUpClass(cls):
        # Create the shared 4-worker pool used by every test in the class.
        super().setUpClass()
        cls.pool = cls.Pool(4)
| |
    @classmethod
    def tearDownClass(cls):
        # Shut the shared pool down and drop the reference so the worker
        # processes do not outlive the test class.
        cls.pool.terminate()
        cls.pool.join()
        cls.pool = None
        super().tearDownClass()
| |
| def test_apply(self): |
| papply = self.pool.apply |
| self.assertEqual(papply(sqr, (5,)), sqr(5)) |
| self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3)) |
| |
| def test_map(self): |
| pmap = self.pool.map |
| self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10))))) |
| self.assertEqual(pmap(sqr, list(range(100)), chunksize=20), |
| list(map(sqr, list(range(100))))) |
| |
| def test_starmap(self): |
| psmap = self.pool.starmap |
| tuples = list(zip(range(10), range(9,-1, -1))) |
| self.assertEqual(psmap(mul, tuples), |
| list(itertools.starmap(mul, tuples))) |
| tuples = list(zip(range(100), range(99,-1, -1))) |
| self.assertEqual(psmap(mul, tuples, chunksize=20), |
| list(itertools.starmap(mul, tuples))) |
| |
| def test_starmap_async(self): |
| tuples = list(zip(range(100), range(99,-1, -1))) |
| self.assertEqual(self.pool.starmap_async(mul, tuples).get(), |
| list(itertools.starmap(mul, tuples))) |
| |
| def test_map_async(self): |
| self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(), |
| list(map(sqr, list(range(10))))) |
| |
| def test_map_async_callbacks(self): |
| call_args = self.manager.list() if self.TYPE == 'manager' else [] |
| self.pool.map_async(int, ['1'], |
| callback=call_args.append, |
| error_callback=call_args.append).wait() |
| self.assertEqual(1, len(call_args)) |
| self.assertEqual([1], call_args[0]) |
| self.pool.map_async(int, ['a'], |
| callback=call_args.append, |
| error_callback=call_args.append).wait() |
| self.assertEqual(2, len(call_args)) |
| self.assertIsInstance(call_args[1], ValueError) |
| |
| def test_map_unplicklable(self): |
| # Issue #19425 -- failure to pickle should not cause a hang |
| if self.TYPE == 'threads': |
| self.skipTest('test not appropriate for {}'.format(self.TYPE)) |
| class A(object): |
| def __reduce__(self): |
| raise RuntimeError('cannot pickle') |
| with self.assertRaises(RuntimeError): |
| self.pool.map(sqr, [A()]*10) |
| |
| def test_map_chunksize(self): |
| try: |
| self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) |
| except multiprocessing.TimeoutError: |
| self.fail("pool.map_async with chunksize stalled on null list") |
| |
| def test_map_handle_iterable_exception(self): |
| if self.TYPE == 'manager': |
| self.skipTest('test not appropriate for {}'.format(self.TYPE)) |
| |
| # SayWhenError seen at the very first of the iterable |
| with self.assertRaises(SayWhenError): |
| self.pool.map(sqr, exception_throwing_generator(1, -1), 1) |
| # again, make sure it's reentrant |
| with self.assertRaises(SayWhenError): |
| self.pool.map(sqr, exception_throwing_generator(1, -1), 1) |
| |
| with self.assertRaises(SayWhenError): |
| self.pool.map(sqr, exception_throwing_generator(10, 3), 1) |
| |
| class SpecialIterable: |
| def __iter__(self): |
| return self |
| def __next__(self): |
| raise SayWhenError |
| def __len__(self): |
| return 1 |
| with self.assertRaises(SayWhenError): |
| self.pool.map(sqr, SpecialIterable(), 1) |
| with self.assertRaises(SayWhenError): |
| self.pool.map(sqr, SpecialIterable(), 1) |
| |
| def test_async(self): |
| res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) |
| get = TimingWrapper(res.get) |
| self.assertEqual(get(), 49) |
| self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) |
| |
| def test_async_timeout(self): |
| p = self.Pool(3) |
| try: |
| event = threading.Event() if self.TYPE == 'threads' else None |
| res = p.apply_async(sqr, (6, TIMEOUT2 + support.SHORT_TIMEOUT, event)) |
| get = TimingWrapper(res.get) |
| self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) |
| self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) |
| finally: |
| if event is not None: |
| event.set() |
| p.terminate() |
| p.join() |
| |
| def test_imap(self): |
| it = self.pool.imap(sqr, list(range(10))) |
| self.assertEqual(list(it), list(map(sqr, list(range(10))))) |
| |
| it = self.pool.imap(sqr, list(range(10))) |
| for i in range(10): |
| self.assertEqual(next(it), i*i) |
| self.assertRaises(StopIteration, it.__next__) |
| |
| it = self.pool.imap(sqr, list(range(1000)), chunksize=100) |
| for i in range(1000): |
| self.assertEqual(next(it), i*i) |
| self.assertRaises(StopIteration, it.__next__) |
| |
| def test_imap_handle_iterable_exception(self): |
| if self.TYPE == 'manager': |
| self.skipTest('test not appropriate for {}'.format(self.TYPE)) |
| |
| # SayWhenError seen at the very first of the iterable |
| it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) |
| self.assertRaises(SayWhenError, it.__next__) |
| # again, make sure it's reentrant |
| it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) |
| self.assertRaises(SayWhenError, it.__next__) |
| |
| it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1) |
| for i in range(3): |
| self.assertEqual(next(it), i*i) |
| self.assertRaises(SayWhenError, it.__next__) |
| |
| # SayWhenError seen at start of problematic chunk's results |
| it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2) |
| for i in range(6): |
| self.assertEqual(next(it), i*i) |
| self.assertRaises(SayWhenError, it.__next__) |
| it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4) |
| for i in range(4): |
| self.assertEqual(next(it), i*i) |
| self.assertRaises(SayWhenError, it.__next__) |
| |
| def test_imap_unordered(self): |
| it = self.pool.imap_unordered(sqr, list(range(10))) |
| self.assertEqual(sorted(it), list(map(sqr, list(range(10))))) |
| |
| it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100) |
| self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) |
| |
| def test_imap_unordered_handle_iterable_exception(self): |
| if self.TYPE == 'manager': |
| self.skipTest('test not appropriate for {}'.format(self.TYPE)) |
| |
| # SayWhenError seen at the very first of the iterable |
| it = self.pool.imap_unordered(sqr, |
| exception_throwing_generator(1, -1), |
| 1) |
| self.assertRaises(SayWhenError, it.__next__) |
| # again, make sure it's reentrant |
| it = self.pool.imap_unordered(sqr, |
| exception_throwing_generator(1, -1), |
| 1) |
| self.assertRaises(SayWhenError, it.__next__) |
| |
| it = self.pool.imap_unordered(sqr, |
| exception_throwing_generator(10, 3), |
| 1) |
| expected_values = list(map(sqr, list(range(10)))) |
| with self.assertRaises(SayWhenError): |
| # imap_unordered makes it difficult to anticipate the SayWhenError |
| for i in range(10): |
| value = next(it) |
| self.assertIn(value, expected_values) |
| expected_values.remove(value) |
| |
| it = self.pool.imap_unordered(sqr, |
| exception_throwing_generator(20, 7), |
| 2) |
| expected_values = list(map(sqr, list(range(20)))) |
| with self.assertRaises(SayWhenError): |
| for i in range(20): |
| value = next(it) |
| self.assertIn(value, expected_values) |
| expected_values.remove(value) |
| |
| def test_make_pool(self): |
| expected_error = (RemoteError if self.TYPE == 'manager' |
| else ValueError) |
| |
| self.assertRaises(expected_error, self.Pool, -1) |
|