| import itertools |
| import time |
| import unittest |
| import weakref |
| from concurrent import futures |
| from concurrent.futures._base import ( |
| CANCELLED_AND_NOTIFIED, FINISHED, Future) |
| |
| from test import support |
| |
| from .util import ( |
| PENDING_FUTURE, RUNNING_FUTURE, |
| CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE, |
| create_future, create_executor_tests, setup_module) |
| |
| |
| def mul(x, y): |
| return x * y |
| |
| |
| class AsCompletedTests: |
| def test_no_timeout(self): |
| future1 = self.executor.submit(mul, 2, 21) |
| future2 = self.executor.submit(mul, 7, 6) |
| |
| completed = set(futures.as_completed( |
| [CANCELLED_AND_NOTIFIED_FUTURE, |
| EXCEPTION_FUTURE, |
| SUCCESSFUL_FUTURE, |
| future1, future2])) |
| self.assertEqual(set( |
| [CANCELLED_AND_NOTIFIED_FUTURE, |
| EXCEPTION_FUTURE, |
| SUCCESSFUL_FUTURE, |
| future1, future2]), |
| completed) |
| |
| def test_future_times_out(self): |
| """Test ``futures.as_completed`` timing out before |
| completing it's final future.""" |
| already_completed = {CANCELLED_AND_NOTIFIED_FUTURE, |
| EXCEPTION_FUTURE, |
| SUCCESSFUL_FUTURE} |
| |
| # Windows clock resolution is around 15.6 ms |
| short_timeout = 0.100 |
| for timeout in (0, short_timeout): |
| with self.subTest(timeout): |
| |
| completed_futures = set() |
| future = self.executor.submit(time.sleep, short_timeout * 10) |
| |
| try: |
| for f in futures.as_completed( |
| already_completed | {future}, |
| timeout |
| ): |
| completed_futures.add(f) |
| except futures.TimeoutError: |
| pass |
| |
| # Check that ``future`` wasn't completed. |
| self.assertEqual(completed_futures, already_completed) |
| |
| def test_duplicate_futures(self): |
| # Issue 20367. Duplicate futures should not raise exceptions or give |
| # duplicate responses. |
| # Issue #31641: accept arbitrary iterables. |
| future1 = self.executor.submit(time.sleep, 2) |
| completed = [ |
| f for f in futures.as_completed(itertools.repeat(future1, 3)) |
| ] |
| self.assertEqual(len(completed), 1) |
| |
| def test_free_reference_yielded_future(self): |
| # Issue #14406: Generator should not keep references |
| # to finished futures. |
| futures_list = [Future() for _ in range(8)] |
| futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED)) |
| futures_list.append(create_future(state=FINISHED, result=42)) |
| |
| with self.assertRaises(futures.TimeoutError): |
| for future in futures.as_completed(futures_list, timeout=0): |
| futures_list.remove(future) |
| wr = weakref.ref(future) |
| del future |
| support.gc_collect() # For PyPy or other GCs. |
| self.assertIsNone(wr()) |
| |
| futures_list[0].set_result("test") |
| for future in futures.as_completed(futures_list): |
| futures_list.remove(future) |
| wr = weakref.ref(future) |
| del future |
| support.gc_collect() # For PyPy or other GCs. |
| self.assertIsNone(wr()) |
| if futures_list: |
| futures_list[0].set_result("test") |
| |
| def test_correct_timeout_exception_msg(self): |
| futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE, |
| RUNNING_FUTURE, SUCCESSFUL_FUTURE] |
| |
| with self.assertRaises(futures.TimeoutError) as cm: |
| list(futures.as_completed(futures_list, timeout=0)) |
| |
| self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished') |
| |
| |
| create_executor_tests(globals(), AsCompletedTests) |
| |
| |
| def setUpModule(): |
| setup_module() |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |