Redid the port from scratch using Python 3.2.5 as base
diff --git a/.hgignore b/.hgignore
new file mode 100644
index 0000000..78f3a52
--- /dev/null
+++ b/.hgignore
@@ -0,0 +1,11 @@
+
+syntax: glob
+*.egg-info
+syntax: regexp
+^\.tox$
+syntax: regexp
+^\.project$
+syntax: regexp
+^\.pydevproject$
+syntax: regexp
+^dist$
\ No newline at end of file
diff --git a/CHANGES b/CHANGES
index 1a84b63..81df636 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,3 +1,9 @@
+2.1.4
+=====
+
+- Ported the library again from Python 3.2.5 to get the latest bug fixes 
+
+
 2.1.3
 =====
 
diff --git a/concurrent/futures/_base.py b/concurrent/futures/_base.py
index aaefa2b..8ed69b7 100644
--- a/concurrent/futures/_base.py
+++ b/concurrent/futures/_base.py
@@ -2,7 +2,6 @@
 # Licensed to PSF under a Contributor Agreement.
 
 from __future__ import with_statement
-import functools
 import logging
 import threading
 import time
@@ -46,8 +45,6 @@
 
 # Logger for internal use by the futures package.
 LOGGER = logging.getLogger("concurrent.futures")
-STDERR_HANDLER = logging.StreamHandler()
-LOGGER.addHandler(STDERR_HANDLER)
 
 class Error(Exception):
     """Base class for all future-related exceptions."""
@@ -119,11 +116,14 @@
     def __init__(self, num_pending_calls, stop_on_exception):
         self.num_pending_calls = num_pending_calls
         self.stop_on_exception = stop_on_exception
+        self.lock = threading.Lock()
         super(_AllCompletedWaiter, self).__init__()
 
     def _decrement_pending_calls(self):
-        if self.num_pending_calls == len(self.finished_futures):
-            self.event.set()
+        with self.lock:
+            self.num_pending_calls -= 1
+            if not self.num_pending_calls:
+                self.event.set()
 
     def add_result(self, future):
         super(_AllCompletedWaiter, self).add_result(future)
@@ -523,7 +523,7 @@
         """Returns a iterator equivalent to map(fn, iter).
 
         Args:
-            fn: A callable that will take take as many arguments as there are
+            fn: A callable that will take as many arguments as there are
                 passed iterables.
             timeout: The maximum number of seconds to wait. If None, then there
                 is no limit on the wait time.
diff --git a/concurrent/futures/process.py b/concurrent/futures/process.py
index 87dc789..98684f8 100644
--- a/concurrent/futures/process.py
+++ b/concurrent/futures/process.py
@@ -73,28 +73,17 @@
 # workers to exit when their work queues are empty and then waits until the
 # threads/processes finish.
 
-_thread_references = set()
+_threads_queues = weakref.WeakKeyDictionary()
 _shutdown = False
 
 def _python_exit():
     global _shutdown
     _shutdown = True
-    for thread_reference in _thread_references:
-        thread = thread_reference()
-        if thread is not None:
-            thread.join()
-
-def _remove_dead_thread_references():
-    """Remove inactive threads from _thread_references.
-
-    Should be called periodically to prevent memory leaks in scenarios such as:
-    >>> while True:
-    >>> ...    t = ThreadPoolExecutor(max_workers=5)
-    >>> ...    t.map(int, ['1', '2', '3', '4', '5'])
-    """
-    for thread_reference in set(_thread_references):
-        if thread_reference() is None:
-            _thread_references.discard(thread_reference)
+    items = list(_threads_queues.items())
+    for t, q in items:
+        q.put(None)
+    for t, q in items:
+        t.join()
 
 # Controls how many more calls than processes will be queued in the call queue.
 # A smaller number will mean that processes spend more time idle waiting for
@@ -122,10 +111,10 @@
         self.args = args
         self.kwargs = kwargs
 
-def _process_worker(call_queue, result_queue, shutdown):
+def _process_worker(call_queue, result_queue):
     """Evaluates calls from call_queue and places the results in result_queue.
 
-    This worker is run in a seperate process.
+    This worker is run in a separate process.
 
     Args:
         call_queue: A multiprocessing.Queue of _CallItems that will be read and
@@ -136,21 +125,20 @@
             worker that it should exit when call_queue is empty.
     """
     while True:
+        call_item = call_queue.get(block=True)
+        if call_item is None:
+            # Wake up queue management thread
+            result_queue.put(None)
+            return
         try:
-            call_item = call_queue.get(block=True, timeout=0.1)
-        except queue.Empty:
-            if shutdown.is_set():
-                return
+            r = call_item.fn(*call_item.args, **call_item.kwargs)
+        except BaseException:
+            e = sys.exc_info()[1]
+            result_queue.put(_ResultItem(call_item.work_id,
+                                         exception=e))
         else:
-            try:
-                r = call_item.fn(*call_item.args, **call_item.kwargs)
-            except BaseException:
-                e = sys.exc_info()[1]
-                result_queue.put(_ResultItem(call_item.work_id,
-                                             exception=e))
-            else:
-                result_queue.put(_ResultItem(call_item.work_id,
-                                             result=r))
+            result_queue.put(_ResultItem(call_item.work_id,
+                                         result=r))
 
 def _add_call_item_to_queue(pending_work_items,
                             work_ids,
@@ -189,13 +177,12 @@
                 del pending_work_items[work_id]
                 continue
 
-def _queue_manangement_worker(executor_reference,
-                              processes,
-                              pending_work_items,
-                              work_ids_queue,
-                              call_queue,
-                              result_queue,
-                              shutdown_process_event):
+def _queue_management_worker(executor_reference,
+                             processes,
+                             pending_work_items,
+                             work_ids_queue,
+                             call_queue,
+                             result_queue):
     """Manages the communication between this process and the worker processes.
 
     This function is run in a local thread.
@@ -213,37 +200,19 @@
             derived from _WorkItems for processing by the process workers.
         result_queue: A multiprocessing.Queue of _ResultItems generated by the
             process workers.
-        shutdown_process_event: A multiprocessing.Event used to signal the
-            process workers that they should exit when their work queue is
-            empty.
     """
+    nb_shutdown_processes = [0]
+    def shutdown_one_process():
+        """Tell a worker to terminate, which will in turn wake us again"""
+        call_queue.put(None)
+        nb_shutdown_processes[0] += 1
     while True:
         _add_call_item_to_queue(pending_work_items,
                                 work_ids_queue,
                                 call_queue)
 
-        try:
-            result_item = result_queue.get(block=True, timeout=0.1)
-        except queue.Empty:
-            executor = executor_reference()
-            # No more work items can be added if:
-            #   - The interpreter is shutting down OR
-            #   - The executor that owns this worker has been collected OR
-            #   - The executor that owns this worker has been shutdown.
-            if _shutdown or executor is None or executor._shutdown_thread:
-                # Since no new work items can be added, it is safe to shutdown
-                # this thread if there are no pending work items.
-                if not pending_work_items:
-                    shutdown_process_event.set()
-
-                    # If .join() is not called on the created processes then
-                    # some multiprocessing.Queue methods may deadlock on Mac OS
-                    # X.
-                    for p in processes:
-                        p.join()
-                    return
-            del executor
-        else:
+        result_item = result_queue.get(block=True)
+        if result_item is not None:
             work_item = pending_work_items[result_item.work_id]
             del pending_work_items[result_item.work_id]
 
@@ -251,6 +220,51 @@
                 work_item.future.set_exception(result_item.exception)
             else:
                 work_item.future.set_result(result_item.result)
+        # Check whether we should start shutting down.
+        executor = executor_reference()
+        # No more work items can be added if:
+        #   - The interpreter is shutting down OR
+        #   - The executor that owns this worker has been collected OR
+        #   - The executor that owns this worker has been shutdown.
+        if _shutdown or executor is None or executor._shutdown_thread:
+            # Since no new work items can be added, it is safe to shutdown
+            # this thread if there are no pending work items.
+            if not pending_work_items:
+                while nb_shutdown_processes[0] < len(processes):
+                    shutdown_one_process()
+                # If .join() is not called on the created processes then
+                # some multiprocessing.Queue methods may deadlock on Mac OS
+                # X.
+                for p in processes:
+                    p.join()
+                call_queue.close()
+                return
+        del executor
+
+_system_limits_checked = False
+_system_limited = None
+def _check_system_limits():
+    global _system_limits_checked, _system_limited
+    if _system_limits_checked:
+        if _system_limited:
+            raise NotImplementedError(_system_limited)
+    _system_limits_checked = True
+    try:
+        import os
+        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
+    except (AttributeError, ValueError):
+        # sysconf not available or setting not available
+        return
+    if nsems_max == -1:
+        # indetermine limit, assume that limit is determined
+        # by available memory only
+        return
+    if nsems_max >= 256:
+        # minimum number of semaphores available
+        # according to POSIX
+        return
+    _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
+    raise NotImplementedError(_system_limited)
 
 class ProcessPoolExecutor(_base.Executor):
     def __init__(self, max_workers=None):
@@ -261,7 +275,7 @@
                 execute the given calls. If None or not given then as many
                 worker processes will be created as the machine has processors.
         """
-        _remove_dead_thread_references()
+        _check_system_limits()
 
         if max_workers is None:
             self._max_workers = multiprocessing.cpu_count()
@@ -280,33 +294,34 @@
 
         # Shutdown is a two-step process.
         self._shutdown_thread = False
-        self._shutdown_process_event = multiprocessing.Event()
         self._shutdown_lock = threading.Lock()
         self._queue_count = 0
         self._pending_work_items = {}
 
     def _start_queue_management_thread(self):
+        # When the executor gets lost, the weakref callback will wake up
+        # the queue management thread.
+        def weakref_cb(_, q=self._result_queue):
+            q.put(None)
         if self._queue_management_thread is None:
             self._queue_management_thread = threading.Thread(
-                    target=_queue_manangement_worker,
-                    args=(weakref.ref(self),
+                    target=_queue_management_worker,
+                    args=(weakref.ref(self, weakref_cb),
                           self._processes,
                           self._pending_work_items,
                           self._work_ids,
                           self._call_queue,
-                          self._result_queue,
-                          self._shutdown_process_event))
+                          self._result_queue))
             self._queue_management_thread.daemon = True
             self._queue_management_thread.start()
-            _thread_references.add(weakref.ref(self._queue_management_thread))
+            _threads_queues[self._queue_management_thread] = self._result_queue
 
     def _adjust_process_count(self):
         for _ in range(len(self._processes), self._max_workers):
             p = multiprocessing.Process(
                     target=_process_worker,
                     args=(self._call_queue,
-                          self._result_queue,
-                          self._shutdown_process_event))
+                          self._result_queue))
             p.start()
             self._processes.add(p)
 
@@ -321,6 +336,8 @@
             self._pending_work_items[self._queue_count] = w
             self._work_ids.put(self._queue_count)
             self._queue_count += 1
+            # Wake up queue management thread
+            self._result_queue.put(None)
 
             self._start_queue_management_thread()
             self._adjust_process_count()
@@ -330,15 +347,16 @@
     def shutdown(self, wait=True):
         with self._shutdown_lock:
             self._shutdown_thread = True
-        if wait:
-            if self._queue_management_thread:
+        if self._queue_management_thread:
+            # Wake up queue management thread
+            self._result_queue.put(None)
+            if wait:
                 self._queue_management_thread.join()
         # To reduce the risk of openning too many files, remove references to
         # objects that use file descriptors.
         self._queue_management_thread = None
         self._call_queue = None
         self._result_queue = None
-        self._shutdown_process_event = None
         self._processes = None
     shutdown.__doc__ = _base.Executor.shutdown.__doc__
 
diff --git a/concurrent/futures/thread.py b/concurrent/futures/thread.py
index ce0dda0..a45959d 100644
--- a/concurrent/futures/thread.py
+++ b/concurrent/futures/thread.py
@@ -32,28 +32,17 @@
 # workers to exit when their work queues are empty and then waits until the
 # threads finish.
 
-_thread_references = set()
+_threads_queues = weakref.WeakKeyDictionary()
 _shutdown = False
 
 def _python_exit():
     global _shutdown
     _shutdown = True
-    for thread_reference in _thread_references:
-        thread = thread_reference()
-        if thread is not None:
-            thread.join()
-
-def _remove_dead_thread_references():
-    """Remove inactive threads from _thread_references.
-
-    Should be called periodically to prevent memory leaks in scenarios such as:
-    >>> while True:
-    ...    t = ThreadPoolExecutor(max_workers=5)
-    ...    t.map(int, ['1', '2', '3', '4', '5'])
-    """
-    for thread_reference in set(_thread_references):
-        if thread_reference() is None:
-            _thread_references.discard(thread_reference)
+    items = list(_threads_queues.items())
+    for t, q in items:
+        q.put(None)
+    for t, q in items:
+        t.join()
 
 atexit.register(_python_exit)
 
@@ -79,19 +68,20 @@
 def _worker(executor_reference, work_queue):
     try:
         while True:
-            try:
-                work_item = work_queue.get(block=True, timeout=0.1)
-            except queue.Empty:
-                executor = executor_reference()
-                # Exit if:
-                #   - The interpreter is shutting down OR
-                #   - The executor that owns the worker has been collected OR
-                #   - The executor that owns the worker has been shutdown.
-                if _shutdown or executor is None or executor._shutdown:
-                    return
-                del executor
-            else:
+            work_item = work_queue.get(block=True)
+            if work_item is not None:
                 work_item.run()
+                continue
+            executor = executor_reference()
+            # Exit if:
+            #   - The interpreter is shutting down OR
+            #   - The executor that owns the worker has been collected OR
+            #   - The executor that owns the worker has been shutdown.
+            if _shutdown or executor is None or executor._shutdown:
+                # Notice other workers
+                work_queue.put(None)
+                return
+            del executor
     except BaseException:
         _base.LOGGER.critical('Exception in worker', exc_info=True)
 
@@ -103,8 +93,6 @@
             max_workers: The maximum number of threads that can be used to
                 execute the given calls.
         """
-        _remove_dead_thread_references()
-
         self._max_workers = max_workers
         self._work_queue = queue.Queue()
         self._threads = set()
@@ -125,19 +113,25 @@
     submit.__doc__ = _base.Executor.submit.__doc__
 
     def _adjust_thread_count(self):
+        # When the executor gets lost, the weakref callback will wake up
+        # the worker threads.
+        def weakref_cb(_, q=self._work_queue):
+            q.put(None)
         # TODO(bquinlan): Should avoid creating new threads if there are more
         # idle threads than items in the work queue.
         if len(self._threads) < self._max_workers:
             t = threading.Thread(target=_worker,
-                                 args=(weakref.ref(self), self._work_queue))
+                                 args=(weakref.ref(self, weakref_cb),
+                                       self._work_queue))
             t.daemon = True
             t.start()
             self._threads.add(t)
-            _thread_references.add(weakref.ref(t))
+            _threads_queues[t] = self._work_queue
 
     def shutdown(self, wait=True):
         with self._shutdown_lock:
             self._shutdown = True
+            self._work_queue.put(None)
         if wait:
             for t in self._threads:
                 t.join()
diff --git a/setup.py b/setup.py
index f0dcd89..c08461e 100755
--- a/setup.py
+++ b/setup.py
@@ -11,7 +11,7 @@
     from distutils.core import setup
 
 setup(name='futures',
-      version='2.1.3',
+      version='2.1.4',
       description='Backport of the concurrent.futures package from Python 3.2',
       author='Brian Quinlan',
       author_email='brian@sweetapp.com',
diff --git a/test_futures.py b/test_futures.py
index 725a60b..dd7fd3e 100644
--- a/test_futures.py
+++ b/test_futures.py
@@ -1,11 +1,22 @@
 from __future__ import with_statement
-import logging
-import multiprocessing
-import re
+import os
+import subprocess
 import sys
 import threading
+import functools
+import contextlib
+import logging
+import re
 import time
-import unittest
+
+from concurrent import futures
+from concurrent.futures._base import (
+    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
+
+try:
+    import unittest2 as unittest
+except ImportError:
+    import unittest
 
 try:
     from StringIO import StringIO
@@ -13,21 +24,95 @@
     from io import StringIO
 
 try:
-    from test.test_support import run_unittest
+    from test import test_support
 except ImportError:
-    from test.support import run_unittest
+    from test import support as test_support
 
-if sys.version_info < (3, 0):
+try:
+    next
+except NameError:
     next = lambda x: x.next()
 
-if sys.platform.startswith('win'):
-    import ctypes
-    import ctypes.wintypes
 
-from concurrent import futures
-from concurrent.futures._base import (
-    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
-    LOGGER, STDERR_HANDLER)
+def reap_threads(func):
+    """Use this function when threads are being used.  This will
+    ensure that the threads are cleaned up even when the test fails.
+    If threading is unavailable this function does nothing.
+    """
+    @functools.wraps(func)
+    def decorator(*args): 
+        key = test_support.threading_setup()
+        try:
+            return func(*args)
+        finally:
+            test_support.threading_cleanup(*key)
+    return decorator
+
+
+# Executing the interpreter in a subprocess
+def _assert_python(expected_success, *args, **env_vars):
+    cmd_line = [sys.executable]
+    if not env_vars:
+        cmd_line.append('-E')
+    # Need to preserve the original environment, for in-place testing of
+    # shared library builds.
+    env = os.environ.copy()
+    # But a special flag that can be set to override -- in this case, the
+    # caller is responsible to pass the full environment.
+    if env_vars.pop('__cleanenv', None):
+        env = {}
+    env.update(env_vars) 
+    cmd_line.extend(args)
+    p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
+                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+                         env=env)
+    try:
+        out, err = p.communicate()
+    finally:
+        subprocess._cleanup()
+        p.stdout.close()
+        p.stderr.close()
+    rc = p.returncode
+    err = strip_python_stderr(err)
+    if (rc and expected_success) or (not rc and not expected_success):
+        raise AssertionError(
+            "Process return code is %d, "
+            "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore')))
+    return rc, out, err
+
+
+def assert_python_ok(*args, **env_vars):
+    """
+    Assert that running the interpreter with `args` and optional environment
+    variables `env_vars` is ok and return a (return code, stdout, stderr) tuple.
+    """
+    return _assert_python(True, *args, **env_vars)
+
+
+def strip_python_stderr(stderr): 
+    """Strip the stderr of a Python process from potential debug output
+    emitted by the interpreter.
+
+    This will typically be run on the result of the communicate() method
+    of a subprocess.Popen object.
+    """
+    stderr = re.sub(r"\[\d+ refs\]\r?\n?$".encode(), "".encode(), stderr).strip()
+    return stderr
+
+
+@contextlib.contextmanager
+def captured_stderr():
+    """Return a context manager used by captured_stdout/stdin/stderr
+    that temporarily replaces the sys stream *stream_name* with a StringIO."""
+    logging_stream = StringIO()
+    handler = logging.StreamHandler(logging_stream)
+    logging.root.addHandler(handler)
+
+    try:
+        yield logging_stream
+    finally:
+        logging.root.removeHandler(handler)
+
 
 def create_future(state=PENDING, exception=None, result=None):
     f = Future()
@@ -36,6 +121,7 @@
     f._result = result
     return f
 
+
 PENDING_FUTURE = create_future(state=PENDING)
 RUNNING_FUTURE = create_future(state=RUNNING)
 CANCELLED_FUTURE = create_future(state=CANCELLED)
@@ -43,138 +129,94 @@
 EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
 SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
 
+
 def mul(x, y):
     return x * y
 
-class Call(object):
-    """A call that can be submitted to a future.Executor for testing.
 
-    The call signals when it is called and waits for an event before finishing.
-    """
-    CALL_LOCKS = {}
-    def _create_event(self):
-        if sys.platform.startswith('win'):
-            class SECURITY_ATTRIBUTES(ctypes.Structure):
-                _fields_ = [("nLength", ctypes.wintypes.DWORD),
-                            ("lpSecurityDescriptor", ctypes.wintypes.LPVOID),
-                            ("bInheritHandle", ctypes.wintypes.BOOL)]
+def sleep_and_raise(t):
+    time.sleep(t)
+    raise Exception('this is an exception')
 
-            s = SECURITY_ATTRIBUTES()
-            s.nLength = ctypes.sizeof(s)
-            s.lpSecurityDescriptor = None
-            s.bInheritHandle = True
+def sleep_and_print(t, msg):
+    time.sleep(t)
+    print(msg)
+    sys.stdout.flush()
 
-            handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s),
-                                                         True,
-                                                         False,
-                                                         None)
-            assert handle is not None
-            return handle
-        else:
-            event = multiprocessing.Event()
-            self.CALL_LOCKS[id(event)] = event
-            return id(event)
 
-    def _wait_on_event(self, handle):
-        if sys.platform.startswith('win'):
-            r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000)
-            assert r == 0
-        else:
-            self.CALL_LOCKS[handle].wait()
+class ExecutorMixin:
+    worker_count = 5
 
-    def _signal_event(self, handle):
-        if sys.platform.startswith('win'):
-            r = ctypes.windll.kernel32.SetEvent(handle)
-            assert r != 0
-        else:
-            self.CALL_LOCKS[handle].set()
+    def setUp(self):
+        self.t1 = time.time()
+        try:
+            self.executor = self.executor_type(max_workers=self.worker_count)
+        except NotImplementedError:
+            e = sys.exc_info()[1]
+            self.skipTest(str(e))
+        self._prime_executor()
 
-    def __init__(self, manual_finish=False, result=42):
-        self._called_event = self._create_event()
-        self._can_finish = self._create_event()
+    def tearDown(self):
+        self.executor.shutdown(wait=True)
+        dt = time.time() - self.t1
+        if test_support.verbose:
+            print("%.2fs" % dt)
+        self.assertLess(dt, 60, "synchronization issue: test lasted too long")
 
-        self._result = result
+    def _prime_executor(self):
+        # Make sure that the executor is ready to do work before running the
+        # tests. This should reduce the probability of timeouts in the tests.
+        futures = [self.executor.submit(time.sleep, 0.1)
+                   for _ in range(self.worker_count)]
 
-        if not manual_finish:
-            self._signal_event(self._can_finish)
+        for f in futures:
+            f.result()
 
-    def wait_on_called(self):
-        self._wait_on_event(self._called_event)
 
-    def set_can(self):
-        self._signal_event(self._can_finish)
+class ThreadPoolMixin(ExecutorMixin):
+    executor_type = futures.ThreadPoolExecutor
 
-    def __call__(self):
-        self._signal_event(self._called_event)
-        self._wait_on_event(self._can_finish)
 
-        return self._result
+class ProcessPoolMixin(ExecutorMixin):
+    executor_type = futures.ProcessPoolExecutor
 
-    def close(self):
-        self.set_can()
-        if sys.platform.startswith('win'):
-            ctypes.windll.kernel32.CloseHandle(self._called_event)
-            ctypes.windll.kernel32.CloseHandle(self._can_finish)
-        else:
-            del self.CALL_LOCKS[self._called_event]
-            del self.CALL_LOCKS[self._can_finish]
-
-class ExceptionCall(Call):
-    def __call__(self):
-        self._signal_event(self._called_event)
-        self._wait_on_event(self._can_finish)
-        raise ZeroDivisionError()
-
-class MapCall(Call):
-    def __init__(self, result=42):
-        super(MapCall, self).__init__(manual_finish=True, result=result)
-
-    def __call__(self, manual_finish):
-        if manual_finish:
-            super(MapCall, self).__call__()
-        return self._result
 
 class ExecutorShutdownTest(unittest.TestCase):
-    def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
     def test_run_after_shutdown(self):
         self.executor.shutdown()
         self.assertRaises(RuntimeError,
                           self.executor.submit,
                           pow, 2, 5)
 
-    def _start_some_futures(self):
-        call1 = Call(manual_finish=True)
-        call2 = Call(manual_finish=True)
-        call3 = Call(manual_finish=True)
+    def test_interpreter_shutdown(self):
+        # Test the atexit hook for shutdown of worker threads and processes
+        rc, out, err = assert_python_ok('-c', """if 1:
+            from concurrent.futures import %s
+            from time import sleep
+            from test_futures import sleep_and_print
+            t = %s(5)
+            t.submit(sleep_and_print, 1.0, "apple")
+            """ % (self.executor_type.__name__, self.executor_type.__name__))
+        # Errors in atexit hooks don't change the process exit code, check
+        # stderr manually.
+        self.assertFalse(err)
+        self.assertEqual(out.strip(), "apple".encode())
 
-        try:
-            self.executor.submit(call1)
-            self.executor.submit(call2)
-            self.executor.submit(call3)
+    def test_hang_issue12364(self):
+        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
+        self.executor.shutdown()
+        for f in fs:
+            f.result()
 
-            call1.wait_on_called()
-            call2.wait_on_called()
-            call3.wait_on_called()
 
-            call1.set_can()
-            call2.set_can()
-            call3.set_can()
-        finally:
-            call1.close()
-            call2.close()
-            call3.close()
-
-class ThreadPoolShutdownTest(ExecutorShutdownTest):
-    def setUp(self):
-        self.executor = futures.ThreadPoolExecutor(max_workers=5)
-
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
+class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
+    def _prime_executor(self):
+        pass
 
     def test_threads_terminate(self):
-        self._start_some_futures()
+        self.executor.submit(mul, 21, 2)
+        self.executor.submit(mul, 6, 7)
+        self.executor.submit(mul, 3, 14)
         self.assertEqual(len(self.executor._threads), 3)
         self.executor.shutdown()
         for t in self.executor._threads:
@@ -198,15 +240,15 @@
         for t in threads:
             t.join()
 
-class ProcessPoolShutdownTest(ExecutorShutdownTest):
-    def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_workers=5)
 
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
+class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
+    def _prime_executor(self):
+        pass
 
     def test_processes_terminate(self):
-        self._start_some_futures()
+        self.executor.submit(mul, 21, 2)
+        self.executor.submit(mul, 6, 7)
+        self.executor.submit(mul, 3, 14)
         self.assertEqual(len(self.executor._processes), 5)
         processes = self.executor._processes
         self.executor.shutdown()
@@ -216,11 +258,11 @@
 
     def test_context_manager_shutdown(self):
         with futures.ProcessPoolExecutor(max_workers=5) as e:
-            executor = e
+            processes = e._processes
             self.assertEqual(list(e.map(abs, range(-5, 5))),
                              [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
 
-        for p in self.executor._processes:
+        for p in processes:
             p.join()
 
     def test_del_shutdown(self):
@@ -234,325 +276,186 @@
         for p in processes:
             p.join()
 
+
 class WaitTests(unittest.TestCase):
-    def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_workers=1)
 
     def test_first_completed(self):
-        def wait_test():
-            while not future1._waiters:
-                pass
-            call1.set_can()
+        future1 = self.executor.submit(mul, 21, 2)
+        future2 = self.executor.submit(time.sleep, 1.5)
 
-        call1 = Call(manual_finish=True)
-        call2 = Call(manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
-            future2 = self.executor.submit(call2)
+        done, not_done = futures.wait(
+                [CANCELLED_FUTURE, future1, future2],
+                 return_when=futures.FIRST_COMPLETED)
 
-            t = threading.Thread(target=wait_test)
-            t.start()
-            done, not_done = futures.wait(
-                    [CANCELLED_FUTURE, future1, future2],
-                     return_when=futures.FIRST_COMPLETED)
+        self.assertEqual(set([future1]), done)
+        self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
 
-            self.assertEquals(set([future1]), done)
-            self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done)
-        finally:
-            call1.close()
-            call2.close()
+    def test_first_completed_some_already_completed(self):
+        future1 = self.executor.submit(time.sleep, 1.5)
 
-    def test_first_completed_one_already_completed(self):
-        call1 = Call(manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
+        finished, pending = futures.wait(
+                 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
+                 return_when=futures.FIRST_COMPLETED)
 
-            finished, pending = futures.wait(
-                     [SUCCESSFUL_FUTURE, future1],
-                     return_when=futures.FIRST_COMPLETED)
-
-            self.assertEquals(set([SUCCESSFUL_FUTURE]), finished)
-            self.assertEquals(set([future1]), pending)
-        finally:
-            call1.close()
+        self.assertEqual(
+                set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
+                finished)
+        self.assertEqual(set([future1]), pending)
 
     def test_first_exception(self):
-        def wait_test():
-            while not future1._waiters:
-                pass
-            call1.set_can()
-            call2.set_can()
+        future1 = self.executor.submit(mul, 2, 21)
+        future2 = self.executor.submit(sleep_and_raise, 1.5)
+        future3 = self.executor.submit(time.sleep, 3)
 
-        call1 = Call(manual_finish=True)
-        call2 = ExceptionCall(manual_finish=True)
-        call3 = Call(manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
-            future2 = self.executor.submit(call2)
-            future3 = self.executor.submit(call3)
+        finished, pending = futures.wait(
+                [future1, future2, future3],
+                return_when=futures.FIRST_EXCEPTION)
 
-            t = threading.Thread(target=wait_test)
-            t.start()
-            finished, pending = futures.wait(
-                    [future1, future2, future3],
-                    return_when=futures.FIRST_EXCEPTION)
-
-            self.assertEquals(set([future1, future2]), finished)
-            self.assertEquals(set([future3]), pending)
-        finally:
-            call1.close()
-            call2.close()
-            call3.close()
+        self.assertEqual(set([future1, future2]), finished)
+        self.assertEqual(set([future3]), pending)
 
     def test_first_exception_some_already_complete(self):
-        def wait_test():
-            while not future1._waiters:
-                pass
-            call1.set_can()
+        future1 = self.executor.submit(divmod, 21, 0)
+        future2 = self.executor.submit(time.sleep, 1.5)
 
-        call1 = ExceptionCall(manual_finish=True)
-        call2 = Call(manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
-            future2 = self.executor.submit(call2)
+        finished, pending = futures.wait(
+                [SUCCESSFUL_FUTURE,
+                 CANCELLED_FUTURE,
+                 CANCELLED_AND_NOTIFIED_FUTURE,
+                 future1, future2],
+                return_when=futures.FIRST_EXCEPTION)
 
-            t = threading.Thread(target=wait_test)
-            t.start()
-            finished, pending = futures.wait(
-                    [SUCCESSFUL_FUTURE,
-                     CANCELLED_FUTURE,
-                     CANCELLED_AND_NOTIFIED_FUTURE,
-                     future1, future2],
-                    return_when=futures.FIRST_EXCEPTION)
-
-            self.assertEquals(set([SUCCESSFUL_FUTURE,
-                                   CANCELLED_AND_NOTIFIED_FUTURE,
-                                   future1]), finished)
-            self.assertEquals(set([CANCELLED_FUTURE, future2]), pending)
-
-
-        finally:
-            call1.close()
-            call2.close()
+        self.assertEqual(set([SUCCESSFUL_FUTURE,
+                              CANCELLED_AND_NOTIFIED_FUTURE,
+                              future1]), finished)
+        self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
 
     def test_first_exception_one_already_failed(self):
-        call1 = Call(manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
+        future1 = self.executor.submit(time.sleep, 2)
 
-            finished, pending = futures.wait(
-                     [EXCEPTION_FUTURE, future1],
-                     return_when=futures.FIRST_EXCEPTION)
+        finished, pending = futures.wait(
+                 [EXCEPTION_FUTURE, future1],
+                 return_when=futures.FIRST_EXCEPTION)
 
-            self.assertEquals(set([EXCEPTION_FUTURE]), finished)
-            self.assertEquals(set([future1]), pending)
-        finally:
-            call1.close()
+        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
+        self.assertEqual(set([future1]), pending)
 
     def test_all_completed(self):
-        def wait_test():
-            while not future1._waiters:
-                pass
-            call1.set_can()
-            call2.set_can()
+        future1 = self.executor.submit(divmod, 2, 0)
+        future2 = self.executor.submit(mul, 2, 21)
 
-        call1 = Call(manual_finish=True)
-        call2 = Call(manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
-            future2 = self.executor.submit(call2)
+        finished, pending = futures.wait(
+                [SUCCESSFUL_FUTURE,
+                 CANCELLED_AND_NOTIFIED_FUTURE,
+                 EXCEPTION_FUTURE,
+                 future1,
+                 future2],
+                return_when=futures.ALL_COMPLETED)
 
-            t = threading.Thread(target=wait_test)
-            t.start()
-            finished, pending = futures.wait(
-                    [future1, future2],
-                    return_when=futures.ALL_COMPLETED)
-
-            self.assertEquals(set([future1, future2]), finished)
-            self.assertEquals(set(), pending)
-
-
-        finally:
-            call1.close()
-            call2.close()
-
-    def test_all_completed_some_already_completed(self):
-        def wait_test():
-            while not future1._waiters:
-                pass
-
-            future4.cancel()
-            call1.set_can()
-            call2.set_can()
-            call3.set_can()
-
-        self.assertTrue(
-                futures.process.EXTRA_QUEUED_CALLS <= 1,
-               'this test assumes that future4 will be cancelled before it is '
-               'queued to run - which might not be the case if '
-               'ProcessPoolExecutor is too aggresive in scheduling futures')
-        call1 = Call(manual_finish=True)
-        call2 = Call(manual_finish=True)
-        call3 = Call(manual_finish=True)
-        call4 = Call(manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
-            future2 = self.executor.submit(call2)
-            future3 = self.executor.submit(call3)
-            future4 = self.executor.submit(call4)
-
-            t = threading.Thread(target=wait_test)
-            t.start()
-            finished, pending = futures.wait(
-                    [SUCCESSFUL_FUTURE,
-                     CANCELLED_AND_NOTIFIED_FUTURE,
-                     future1, future2, future3, future4],
-                    return_when=futures.ALL_COMPLETED)
-
-            self.assertEquals(set([SUCCESSFUL_FUTURE,
-                                   CANCELLED_AND_NOTIFIED_FUTURE,
-                                   future1, future2, future3, future4]),
-                              finished)
-            self.assertEquals(set(), pending)
-        finally:
-            call1.close()
-            call2.close()
-            call3.close()
-            call4.close()
+        self.assertEqual(set([SUCCESSFUL_FUTURE,
+                              CANCELLED_AND_NOTIFIED_FUTURE,
+                              EXCEPTION_FUTURE,
+                              future1,
+                              future2]), finished)
+        self.assertEqual(set(), pending)
 
     def test_timeout(self):
-        def wait_test():
-            while not future1._waiters:
-                pass
-            call1.set_can()
+        future1 = self.executor.submit(mul, 6, 7)
+        future2 = self.executor.submit(time.sleep, 3)
 
-        call1 = Call(manual_finish=True)
-        call2 = Call(manual_finish=True)
+        finished, pending = futures.wait(
+                [CANCELLED_AND_NOTIFIED_FUTURE,
+                 EXCEPTION_FUTURE,
+                 SUCCESSFUL_FUTURE,
+                 future1, future2],
+                timeout=1.5,
+                return_when=futures.ALL_COMPLETED)
+
+        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
+                              EXCEPTION_FUTURE,
+                              SUCCESSFUL_FUTURE,
+                              future1]), finished)
+        self.assertEqual(set([future2]), pending)
+
+
+class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
+
+    def test_pending_calls_race(self):
+        # Issue #14406: multi-threaded race condition when waiting on all
+        # futures.
+        event = threading.Event()
+        def future_func():
+            event.wait()
+        oldswitchinterval = sys.getcheckinterval()
+        sys.setcheckinterval(1)
         try:
-            future1 = self.executor.submit(call1)
-            future2 = self.executor.submit(call2)
-
-            t = threading.Thread(target=wait_test)
-            t.start()
-            finished, pending = futures.wait(
-                    [CANCELLED_AND_NOTIFIED_FUTURE,
-                     EXCEPTION_FUTURE,
-                     SUCCESSFUL_FUTURE,
-                     future1, future2],
-                    timeout=1,
-                    return_when=futures.ALL_COMPLETED)
-
-            self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
-                                   EXCEPTION_FUTURE,
-                                   SUCCESSFUL_FUTURE,
-                                   future1]), finished)
-            self.assertEquals(set([future2]), pending)
-
-
+            fs = set(self.executor.submit(future_func) for i in range(100))
+            event.set()
+            futures.wait(fs, return_when=futures.ALL_COMPLETED)
         finally:
-            call1.close()
-            call2.close()
+            sys.setcheckinterval(oldswitchinterval)
 
 
-class ThreadPoolWaitTests(WaitTests):
-    def setUp(self):
-        self.executor = futures.ThreadPoolExecutor(max_workers=1)
+class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
+    pass
 
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
-
-class ProcessPoolWaitTests(WaitTests):
-    def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
 
 class AsCompletedTests(unittest.TestCase):
-    def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
     # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
     def test_no_timeout(self):
-        def wait_test():
-            while not future1._waiters:
-                pass
-            call1.set_can()
-            call2.set_can()
+        future1 = self.executor.submit(mul, 2, 21)
+        future2 = self.executor.submit(mul, 7, 6)
 
-        call1 = Call(manual_finish=True)
-        call2 = Call(manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
-            future2 = self.executor.submit(call2)
-
-            t = threading.Thread(target=wait_test)
-            t.start()
-            completed = set(futures.as_completed(
-                    [CANCELLED_AND_NOTIFIED_FUTURE,
-                     EXCEPTION_FUTURE,
-                     SUCCESSFUL_FUTURE,
-                     future1, future2]))
-            self.assertEquals(set(
-                    [CANCELLED_AND_NOTIFIED_FUTURE,
-                     EXCEPTION_FUTURE,
-                     SUCCESSFUL_FUTURE,
-                     future1, future2]),
-                    completed)
-        finally:
-            call1.close()
-            call2.close()
+        completed = set(futures.as_completed(
+                [CANCELLED_AND_NOTIFIED_FUTURE,
+                 EXCEPTION_FUTURE,
+                 SUCCESSFUL_FUTURE,
+                 future1, future2]))
+        self.assertEqual(set(
+                [CANCELLED_AND_NOTIFIED_FUTURE,
+                 EXCEPTION_FUTURE,
+                 SUCCESSFUL_FUTURE,
+                 future1, future2]),
+                completed)
 
     def test_zero_timeout(self):
-        call1 = Call(manual_finish=True)
+        future1 = self.executor.submit(time.sleep, 2)
+        completed_futures = set()
         try:
-            future1 = self.executor.submit(call1)
-            completed_futures = set()
-            try:
-                for future in futures.as_completed(
-                        [CANCELLED_AND_NOTIFIED_FUTURE,
-                         EXCEPTION_FUTURE,
-                         SUCCESSFUL_FUTURE,
-                         future1],
-                        timeout=0):
-                    completed_futures.add(future)
-            except futures.TimeoutError:
-                pass
+            for future in futures.as_completed(
+                    [CANCELLED_AND_NOTIFIED_FUTURE,
+                     EXCEPTION_FUTURE,
+                     SUCCESSFUL_FUTURE,
+                     future1],
+                    timeout=0):
+                completed_futures.add(future)
+        except futures.TimeoutError:
+            pass
 
-            self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
-                                   EXCEPTION_FUTURE,
-                                   SUCCESSFUL_FUTURE]),
-                              completed_futures)
-        finally:
-            call1.close()
+        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
+                              EXCEPTION_FUTURE,
+                              SUCCESSFUL_FUTURE]),
+                         completed_futures)
 
-class ThreadPoolAsCompletedTests(AsCompletedTests):
-    def setUp(self):
-        self.executor = futures.ThreadPoolExecutor(max_workers=1)
 
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
+class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
+    pass
 
-class ProcessPoolAsCompletedTests(AsCompletedTests):
-    def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_workers=1)
 
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
+class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
+    pass
+
 
 class ExecutorTest(unittest.TestCase):
-    def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
     # Executor.shutdown() and context manager usage is tested by
     # ExecutorShutdownTest.
     def test_submit(self):
         future = self.executor.submit(pow, 2, 8)
-        self.assertEquals(256, future.result())
+        self.assertEqual(256, future.result())
 
     def test_submit_keyword(self):
         future = self.executor.submit(mul, 2, y=8)
-        self.assertEquals(16, future.result())
+        self.assertEqual(16, future.result())
 
     def test_map(self):
         self.assertEqual(
@@ -567,139 +470,123 @@
 
     def test_map_timeout(self):
         results = []
-        timeout_call = MapCall()
         try:
-            try:
-                for i in self.executor.map(timeout_call,
-                                           [False, False, True],
-                                           timeout=1):
-                    results.append(i)
-            except futures.TimeoutError:
-                pass
-            else:
-                self.fail('expected TimeoutError')
-        finally:
-            timeout_call.close()
+            for i in self.executor.map(time.sleep,
+                                       [0, 0, 3],
+                                       timeout=1.5):
+                results.append(i)
+        except futures.TimeoutError:
+            pass
+        else:
+            self.fail('expected TimeoutError')
 
-        self.assertEquals([42, 42], results)
+        self.assertEqual([None, None], results)
 
-class ThreadPoolExecutorTest(ExecutorTest):
-    def setUp(self):
-        self.executor = futures.ThreadPoolExecutor(max_workers=1)
 
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
+class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
+    pass
 
-class ProcessPoolExecutorTest(ExecutorTest):
-    def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_workers=1)
 
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
+class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
+    pass
+
 
 class FutureTests(unittest.TestCase):
     def test_done_callback_with_result(self):
-        self.callback_result = None
+        callback_result = [None]
         def fn(callback_future):
-            self.callback_result = callback_future.result()
+            callback_result[0] = callback_future.result()
 
         f = Future()
         f.add_done_callback(fn)
         f.set_result(5)
-        self.assertEquals(5, self.callback_result)
+        self.assertEqual(5, callback_result[0])
 
     def test_done_callback_with_exception(self):
-        self.callback_exception = None
+        callback_exception = [None]
         def fn(callback_future):
-            self.callback_exception = callback_future.exception()
+            callback_exception[0] = callback_future.exception()
 
         f = Future()
         f.add_done_callback(fn)
         f.set_exception(Exception('test'))
-        self.assertEquals(('test',), self.callback_exception.args)
+        self.assertEqual(('test',), callback_exception[0].args)
 
     def test_done_callback_with_cancel(self):
-        self.was_cancelled = None
+        was_cancelled = [None]
         def fn(callback_future):
-            self.was_cancelled = callback_future.cancelled()
+            was_cancelled[0] = callback_future.cancelled()
 
         f = Future()
         f.add_done_callback(fn)
         self.assertTrue(f.cancel())
-        self.assertTrue(self.was_cancelled)
+        self.assertTrue(was_cancelled[0])
 
     def test_done_callback_raises(self):
-        LOGGER.removeHandler(STDERR_HANDLER)
-        logging_stream = StringIO()
-        handler = logging.StreamHandler(logging_stream)
-        LOGGER.addHandler(handler)
-        try:
-            self.raising_was_called = False
-            self.fn_was_called = False
+        with captured_stderr() as stderr:
+            raising_was_called = [False]
+            fn_was_called = [False]
 
             def raising_fn(callback_future):
-                self.raising_was_called = True
+                raising_was_called[0] = True
                 raise Exception('doh!')
 
             def fn(callback_future):
-                self.fn_was_called = True
+                fn_was_called[0] = True
 
             f = Future()
             f.add_done_callback(raising_fn)
             f.add_done_callback(fn)
             f.set_result(5)
-            self.assertTrue(self.raising_was_called)
-            self.assertTrue(self.fn_was_called)
-            self.assertTrue('Exception: doh!' in logging_stream.getvalue())
-        finally:
-            LOGGER.removeHandler(handler)
-            LOGGER.addHandler(STDERR_HANDLER)
+            self.assertTrue(raising_was_called)
+            self.assertTrue(fn_was_called)
+            self.assertIn('Exception: doh!', stderr.getvalue())
 
     def test_done_callback_already_successful(self):
-        self.callback_result = None
+        callback_result = [None]
         def fn(callback_future):
-            self.callback_result = callback_future.result()
+            callback_result[0] = callback_future.result()
 
         f = Future()
         f.set_result(5)
         f.add_done_callback(fn)
-        self.assertEquals(5, self.callback_result)
+        self.assertEqual(5, callback_result[0])
 
     def test_done_callback_already_failed(self):
-        self.callback_exception = None
+        callback_exception = [None]
         def fn(callback_future):
-            self.callback_exception = callback_future.exception()
+            callback_exception[0] = callback_future.exception()
 
         f = Future()
         f.set_exception(Exception('test'))
         f.add_done_callback(fn)
-        self.assertEquals(('test',), self.callback_exception.args)
+        self.assertEqual(('test',), callback_exception[0].args)
 
     def test_done_callback_already_cancelled(self):
-        self.was_cancelled = None
+        was_cancelled = [None]
         def fn(callback_future):
-            self.was_cancelled = callback_future.cancelled()
+            was_cancelled[0] = callback_future.cancelled()
 
         f = Future()
         self.assertTrue(f.cancel())
         f.add_done_callback(fn)
-        self.assertTrue(self.was_cancelled)
+        self.assertTrue(was_cancelled[0])
 
     def test_repr(self):
-        self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=pending>',
-                                 repr(PENDING_FUTURE)))
-        self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=running>',
-                                 repr(RUNNING_FUTURE)))
-        self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=cancelled>',
-                                 repr(CANCELLED_FUTURE)))
-        self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=cancelled>',
-                                 repr(CANCELLED_AND_NOTIFIED_FUTURE)))
-        self.assertTrue(re.match(
-                '<Future at 0x[0-9a-f]+L? state=finished raised IOError>',
-                repr(EXCEPTION_FUTURE)))
-        self.assertTrue(re.match(
-                '<Future at 0x[0-9a-f]+L? state=finished returned int>',
-                repr(SUCCESSFUL_FUTURE)))
+        self.assertRegexpMatches(repr(PENDING_FUTURE),
+                                 '<Future at 0x[0-9a-f]+ state=pending>')
+        self.assertRegexpMatches(repr(RUNNING_FUTURE),
+                                 '<Future at 0x[0-9a-f]+ state=running>')
+        self.assertRegexpMatches(repr(CANCELLED_FUTURE),
+                                 '<Future at 0x[0-9a-f]+ state=cancelled>')
+        self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE),
+                                 '<Future at 0x[0-9a-f]+ state=cancelled>')
+        self.assertRegexpMatches(
+                repr(EXCEPTION_FUTURE),
+                '<Future at 0x[0-9a-f]+ state=finished raised IOError>')
+        self.assertRegexpMatches(
+                repr(SUCCESSFUL_FUTURE),
+                '<Future at 0x[0-9a-f]+ state=finished returned int>')
 
     def test_cancel(self):
         f1 = create_future(state=PENDING)
@@ -710,22 +597,22 @@
         f6 = create_future(state=FINISHED, result=5)
 
         self.assertTrue(f1.cancel())
-        self.assertEquals(f1._state, CANCELLED)
+        self.assertEqual(f1._state, CANCELLED)
 
         self.assertFalse(f2.cancel())
-        self.assertEquals(f2._state, RUNNING)
+        self.assertEqual(f2._state, RUNNING)
 
         self.assertTrue(f3.cancel())
-        self.assertEquals(f3._state, CANCELLED)
+        self.assertEqual(f3._state, CANCELLED)
 
         self.assertTrue(f4.cancel())
-        self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED)
+        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
 
         self.assertFalse(f5.cancel())
-        self.assertEquals(f5._state, FINISHED)
+        self.assertEqual(f5._state, FINISHED)
 
         self.assertFalse(f6.cancel())
-        self.assertEquals(f6._state, FINISHED)
+        self.assertEqual(f6._state, FINISHED)
 
     def test_cancelled(self):
         self.assertFalse(PENDING_FUTURE.cancelled())
@@ -774,7 +661,7 @@
         t = threading.Thread(target=notification)
         t.start()
 
-        self.assertEquals(f1.result(timeout=5), 42)
+        self.assertEqual(f1.result(timeout=5), 42)
 
     def test_result_with_cancel(self):
         # TODO(brian@sweetapp.com): This test is timing dependant.
@@ -817,16 +704,20 @@
 
         self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
 
+@reap_threads
 def test_main():
-    run_unittest(ProcessPoolExecutorTest,
-                 ThreadPoolExecutorTest,
-                 ProcessPoolWaitTests,
-                 ThreadPoolWaitTests,
-                 ProcessPoolAsCompletedTests,
-                 ThreadPoolAsCompletedTests,
-                 FutureTests,
-                 ProcessPoolShutdownTest,
-                 ThreadPoolShutdownTest)
+    try:
+        test_support.run_unittest(ProcessPoolExecutorTest,
+                                  ThreadPoolExecutorTest,
+                                  ProcessPoolWaitTests,
+                                  ThreadPoolWaitTests,
+                                  ProcessPoolAsCompletedTests,
+                                  ThreadPoolAsCompletedTests,
+                                  FutureTests,
+                                  ProcessPoolShutdownTest,
+                                  ThreadPoolShutdownTest)
+    finally:
+        test_support.reap_children()
 
 if __name__ == "__main__":
     test_main()
diff --git a/tox.ini b/tox.ini
index 83eda49..c1ff2f1 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,11 +1,8 @@
 [tox]
-envlist = py25,py26,py27,py31
+envlist = py26,py27,py31
 
 [testenv]
 commands={envpython} test_futures.py []
 
-#[testenv:py24]
-#deps=multiprocessing
-#
-#[testenv:py25]
-#deps=multiprocessing
+[testenv:py26]
+deps=unittest2