| # Copyright 2016 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import re |
| import sys |
| import threading |
| import unittest |
| |
| from testing_support import thread_watcher |
| |
| |
| class _PuppetThread(threading.Thread): |
| def __init__(self, name): |
| super(_PuppetThread, self).__init__(name=name) |
| self.daemon = True |
| self._start_event = threading.Event() |
| self._stop_event = threading.Event() |
| self.start() |
| # For the thread to actually start. |
| self._start_event.wait() |
| |
| def run(self): |
| self._start_event.set() |
| self._stop_event.wait() |
| |
| def stop(self): |
| self._stop_event.set() |
| self.join() |
| |
| class _ResultFake(object): |
| def __init__(self, value): |
| self.value = value |
| |
| def wasSuccessful(self): |
| return self.value |
| |
| |
class ThreadWatcherTestCase(thread_watcher.TestCase):
    """Drives a standalone ThreadWatcherMixIn and inspects its fail() calls.

    Each test calls the watcher's setUp(), optionally leaves puppet threads
    running, calls the watcher's tearDown(), and then checks which messages
    (if any) the watcher passed to fail().

    NOTE(review): setUp and tearDown below do not call the
    thread_watcher.TestCase implementations -- confirm that bypassing the
    base-class hooks is intended.
    """

    def setUp(self):
        """Create the watcher under test and the bookkeeping lists."""
        self._fail_called = []
        self.watcher = thread_watcher.ThreadWatcherMixIn()
        # Route the watcher's fail() calls into _fail_called so the tests
        # can inspect the reported messages.
        self.watcher.fail = self._fail_called.append
        self._threads = []

    def tearDown(self):
        """Stop every puppet thread started by the test."""
        for t in self._threads:
            t.stop()

    def _spawn_puppet(self, name):
        """Start a puppet thread called `name`, registered for tearDown."""
        puppet = _PuppetThread(name)
        self._threads.append(puppet)
        return puppet

    def test_no_extra_threads(self):
        self.watcher.setUp()
        self.watcher.tearDown()
        self.assertEqual(self._fail_called, [])

    def test_no_extra_threads_start_stop(self):
        # A thread that is started and stopped between the watcher's setUp
        # and tearDown must not be reported.
        self.watcher.setUp()
        self._spawn_puppet('puppet1').stop()
        self.watcher.tearDown()
        self.assertEqual(self._fail_called, [])

    def test_extra_threads(self):
        self.watcher.setUp()
        self._spawn_puppet('foo')
        self._spawn_puppet('bar')
        self.watcher.tearDown()
        self.assertEqual(len(self._fail_called), 1)
        error_message = self._fail_called[0]
        # Raw strings keep the regex escapes (\(, \), \d) out of Python's
        # own string-escape processing; \n is then interpreted by the regex
        # engine and still matches a newline.
        expected = re.compile(
            r'^Found 2 running thread\(s\) after the test.\n\n'
            r'Thread <_PuppetThread\(foo, started daemon \d+\)> stacktrace:\n'
            r' .*\n\n'
            r'Thread <_PuppetThread\(bar, started daemon \d+\)> stacktrace:\n'
            r' .*\n$', re.DOTALL)
        self.assertRegexpMatches(error_message, expected)
        self.assertNotIn(' Thread stopped while acquiring stacktrace.\n',
                         error_message)

    def test_extra_threads_unittest_pass(self):
        # Test succeeded, so thread_watcher must cause failure.
        self.watcher._resultForDoCleanups = _ResultFake(True)
        self.watcher.setUp()
        self._spawn_puppet('foo')
        self.watcher.tearDown()
        self.assertEqual(len(self._fail_called), 1)

    def test_extra_threads_unittest_fail(self):
        # Test failed already, so thread_watcher must ignore result.
        self.watcher._resultForDoCleanups = _ResultFake(False)
        self.watcher.setUp()
        self._spawn_puppet('foo')
        self.watcher.tearDown()
        self.assertEqual(self._fail_called, [])

    def test_extra_threads_expect_tests_pass(self):
        # Test succeeded, so thread_watcher must cause failure.
        self.watcher._test_failed_with_exception = False
        self.watcher.setUp()
        self._spawn_puppet('foo')
        self.watcher.tearDown()
        self.assertEqual(len(self._fail_called), 1)

    def test_extra_threads_expect_tests_fail(self):
        # Test failed already, so thread_watcher must ignore result.
        self.watcher._test_failed_with_exception = True
        self.watcher.setUp()
        self._spawn_puppet('foo')
        self.watcher.tearDown()
        self.assertEqual(self._fail_called, [])

    def test_fail_get_stacktrace(self):
        self.watcher.setUp()
        self._spawn_puppet('foo')
        # Save the real function before entering the try block so the
        # finally clause always has a valid value to restore.
        original_current_frames = sys._current_frames
        # Report no frames at all, so none is available for the puppet.
        sys._current_frames = lambda: {}
        try:
            self.watcher.tearDown()
        finally:
            sys._current_frames = original_current_frames

        self.assertEqual(len(self._fail_called), 1)
        error_message = self._fail_called[0]
        self.assertIn(' Thread stopped while acquiring stacktrace.\n',
                      error_message)
| |
| |
class ThreadWatcherTestCaseUsageTest(thread_watcher.TestCase):
    """Uses thread_watcher.TestCase as the base class of ordinary tests."""

    @classmethod
    def setUpClass(cls):
        """Create the class-wide list of puppet threads."""
        cls._threads = list()

    @classmethod
    def tearDownClass(cls):
        """Stop every puppet thread the tests registered."""
        for puppet in cls._threads:
            puppet.stop()

    @unittest.expectedFailure
    def test_fail_extra_threads(self):
        # NOTE(review): `name` is not defined anywhere in the visible part
        # of this module, so this line raises NameError before a thread is
        # ever created, and expectedFailure records that error as the
        # expected failure. Presumably a string literal was intended --
        # confirm. Be aware that expectedFailure only covers failures raised
        # by the test function itself, not by tearDown, so passing a real
        # name may change how this test is reported.
        self._threads.append(_PuppetThread(name))

    def test_ok(self):
        # Starts no threads at all.
        pass

    def test_ok_with_threads(self):
        # The thread is stopped before the test method returns.
        puppet = _PuppetThread('ok')
        self._threads.append(puppet)
        puppet.stop()
| |
| |
# When this file is executed directly (rather than imported), hand control
# to the standard unittest command-line runner.
if __name__ == '__main__':
    unittest.main()