| import threading |
| import time |
| import unittest |
| from concurrent import futures |
| from concurrent.futures._base import ( |
| PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) |
| |
| from test import support |
| |
| from .util import ( |
| PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE, |
| CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE, |
| BaseTestCase, create_future, setup_module) |
| |
| |
| class FutureTests(BaseTestCase): |
| def test_done_callback_with_result(self): |
| callback_result = None |
| def fn(callback_future): |
| nonlocal callback_result |
| callback_result = callback_future.result() |
| |
| f = Future() |
| f.add_done_callback(fn) |
| f.set_result(5) |
| self.assertEqual(5, callback_result) |
| |
| def test_done_callback_with_exception(self): |
| callback_exception = None |
| def fn(callback_future): |
| nonlocal callback_exception |
| callback_exception = callback_future.exception() |
| |
| f = Future() |
| f.add_done_callback(fn) |
| f.set_exception(Exception('test')) |
| self.assertEqual(('test',), callback_exception.args) |
| |
| def test_done_callback_with_cancel(self): |
| was_cancelled = None |
| def fn(callback_future): |
| nonlocal was_cancelled |
| was_cancelled = callback_future.cancelled() |
| |
| f = Future() |
| f.add_done_callback(fn) |
| self.assertTrue(f.cancel()) |
| self.assertTrue(was_cancelled) |
| |
| def test_done_callback_raises(self): |
| with support.captured_stderr() as stderr: |
| raising_was_called = False |
| fn_was_called = False |
| |
| def raising_fn(callback_future): |
| nonlocal raising_was_called |
| raising_was_called = True |
| raise Exception('doh!') |
| |
| def fn(callback_future): |
| nonlocal fn_was_called |
| fn_was_called = True |
| |
| f = Future() |
| f.add_done_callback(raising_fn) |
| f.add_done_callback(fn) |
| f.set_result(5) |
| self.assertTrue(raising_was_called) |
| self.assertTrue(fn_was_called) |
| self.assertIn('Exception: doh!', stderr.getvalue()) |
| |
| def test_done_callback_already_successful(self): |
| callback_result = None |
| def fn(callback_future): |
| nonlocal callback_result |
| callback_result = callback_future.result() |
| |
| f = Future() |
| f.set_result(5) |
| f.add_done_callback(fn) |
| self.assertEqual(5, callback_result) |
| |
| def test_done_callback_already_failed(self): |
| callback_exception = None |
| def fn(callback_future): |
| nonlocal callback_exception |
| callback_exception = callback_future.exception() |
| |
| f = Future() |
| f.set_exception(Exception('test')) |
| f.add_done_callback(fn) |
| self.assertEqual(('test',), callback_exception.args) |
| |
| def test_done_callback_already_cancelled(self): |
| was_cancelled = None |
| def fn(callback_future): |
| nonlocal was_cancelled |
| was_cancelled = callback_future.cancelled() |
| |
| f = Future() |
| self.assertTrue(f.cancel()) |
| f.add_done_callback(fn) |
| self.assertTrue(was_cancelled) |
| |
| def test_done_callback_raises_already_succeeded(self): |
| with support.captured_stderr() as stderr: |
| def raising_fn(callback_future): |
| raise Exception('doh!') |
| |
| f = Future() |
| |
| # Set the result first to simulate a future that runs instantly, |
| # effectively allowing the callback to be run immediately. |
| f.set_result(5) |
| f.add_done_callback(raising_fn) |
| |
| self.assertIn('exception calling callback for', stderr.getvalue()) |
| self.assertIn('doh!', stderr.getvalue()) |
| |
| |
| def test_repr(self): |
| self.assertRegex(repr(PENDING_FUTURE), |
| '<Future at 0x[0-9a-f]+ state=pending>') |
| self.assertRegex(repr(RUNNING_FUTURE), |
| '<Future at 0x[0-9a-f]+ state=running>') |
| self.assertRegex(repr(CANCELLED_FUTURE), |
| '<Future at 0x[0-9a-f]+ state=cancelled>') |
| self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE), |
| '<Future at 0x[0-9a-f]+ state=cancelled>') |
| self.assertRegex( |
| repr(EXCEPTION_FUTURE), |
| '<Future at 0x[0-9a-f]+ state=finished raised OSError>') |
| self.assertRegex( |
| repr(SUCCESSFUL_FUTURE), |
| '<Future at 0x[0-9a-f]+ state=finished returned int>') |
| |
| def test_cancel(self): |
| f1 = create_future(state=PENDING) |
| f2 = create_future(state=RUNNING) |
| f3 = create_future(state=CANCELLED) |
| f4 = create_future(state=CANCELLED_AND_NOTIFIED) |
| f5 = create_future(state=FINISHED, exception=OSError()) |
| f6 = create_future(state=FINISHED, result=5) |
| |
| self.assertTrue(f1.cancel()) |
| self.assertEqual(f1._state, CANCELLED) |
| |
| self.assertFalse(f2.cancel()) |
| self.assertEqual(f2._state, RUNNING) |
| |
| self.assertTrue(f3.cancel()) |
| self.assertEqual(f3._state, CANCELLED) |
| |
| self.assertTrue(f4.cancel()) |
| self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED) |
| |
| self.assertFalse(f5.cancel()) |
| self.assertEqual(f5._state, FINISHED) |
| |
| self.assertFalse(f6.cancel()) |
| self.assertEqual(f6._state, FINISHED) |
| |
| def test_cancelled(self): |
| self.assertFalse(PENDING_FUTURE.cancelled()) |
| self.assertFalse(RUNNING_FUTURE.cancelled()) |
| self.assertTrue(CANCELLED_FUTURE.cancelled()) |
| self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) |
| self.assertFalse(EXCEPTION_FUTURE.cancelled()) |
| self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) |
| |
| def test_done(self): |
| self.assertFalse(PENDING_FUTURE.done()) |
| self.assertFalse(RUNNING_FUTURE.done()) |
| self.assertTrue(CANCELLED_FUTURE.done()) |
| self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) |
| self.assertTrue(EXCEPTION_FUTURE.done()) |
| self.assertTrue(SUCCESSFUL_FUTURE.done()) |
| |
| def test_running(self): |
| self.assertFalse(PENDING_FUTURE.running()) |
| self.assertTrue(RUNNING_FUTURE.running()) |
| self.assertFalse(CANCELLED_FUTURE.running()) |
| self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) |
| self.assertFalse(EXCEPTION_FUTURE.running()) |
| self.assertFalse(SUCCESSFUL_FUTURE.running()) |
| |
| def test_result_with_timeout(self): |
| self.assertRaises(futures.TimeoutError, |
| PENDING_FUTURE.result, timeout=0) |
| self.assertRaises(futures.TimeoutError, |
| RUNNING_FUTURE.result, timeout=0) |
| self.assertRaises(futures.CancelledError, |
| CANCELLED_FUTURE.result, timeout=0) |
| self.assertRaises(futures.CancelledError, |
| CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) |
| self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0) |
| self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) |
| |
| def test_result_with_success(self): |
| # TODO(brian@sweetapp.com): This test is timing dependent. |
| def notification(): |
| # Wait until the main thread is waiting for the result. |
| time.sleep(1) |
| f1.set_result(42) |
| |
| f1 = create_future(state=PENDING) |
| t = threading.Thread(target=notification) |
| t.start() |
| |
| self.assertEqual(f1.result(timeout=5), 42) |
| t.join() |
| |
| def test_result_with_cancel(self): |
| # TODO(brian@sweetapp.com): This test is timing dependent. |
| def notification(): |
| # Wait until the main thread is waiting for the result. |
| time.sleep(1) |
| f1.cancel() |
| |
| f1 = create_future(state=PENDING) |
| t = threading.Thread(target=notification) |
| t.start() |
| |
| self.assertRaises(futures.CancelledError, |
| f1.result, timeout=support.SHORT_TIMEOUT) |
| t.join() |
| |
| def test_exception_with_timeout(self): |
| self.assertRaises(futures.TimeoutError, |
| PENDING_FUTURE.exception, timeout=0) |
| self.assertRaises(futures.TimeoutError, |
| RUNNING_FUTURE.exception, timeout=0) |
| self.assertRaises(futures.CancelledError, |
| CANCELLED_FUTURE.exception, timeout=0) |
| self.assertRaises(futures.CancelledError, |
| CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) |
| self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), |
| OSError)) |
| self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) |
| |
| def test_exception_with_success(self): |
| def notification(): |
| # Wait until the main thread is waiting for the exception. |
| time.sleep(1) |
| with f1._condition: |
| f1._state = FINISHED |
| f1._exception = OSError() |
| f1._condition.notify_all() |
| |
| f1 = create_future(state=PENDING) |
| t = threading.Thread(target=notification) |
| t.start() |
| |
| self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError)) |
| t.join() |
| |
| def test_multiple_set_result(self): |
| f = create_future(state=PENDING) |
| f.set_result(1) |
| |
| with self.assertRaisesRegex( |
| futures.InvalidStateError, |
| 'FINISHED: <Future at 0x[0-9a-f]+ ' |
| 'state=finished returned int>' |
| ): |
| f.set_result(2) |
| |
| self.assertTrue(f.done()) |
| self.assertEqual(f.result(), 1) |
| |
| def test_multiple_set_exception(self): |
| f = create_future(state=PENDING) |
| e = ValueError() |
| f.set_exception(e) |
| |
| with self.assertRaisesRegex( |
| futures.InvalidStateError, |
| 'FINISHED: <Future at 0x[0-9a-f]+ ' |
| 'state=finished raised ValueError>' |
| ): |
| f.set_exception(Exception()) |
| |
| self.assertEqual(f.exception(), e) |
| |
| |
| def setUpModule(): |
| setup_module() |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |