# blob: c98be29c985bd74a1b8ef5f086c3847b174c30c3 [file] [log] [blame]
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import re
import sys
import threading
import unittest
from testing_support import thread_watcher
class _PuppetThread(threading.Thread):
def __init__(self, name):
super(_PuppetThread, self).__init__(name=name)
self.daemon = True
self._start_event = threading.Event()
self._stop_event = threading.Event()
self.start()
# For the thread to actually start.
self._start_event.wait()
def run(self):
self._start_event.set()
self._stop_event.wait()
def stop(self):
self._stop_event.set()
self.join()
class _ResultFake(object):
def __init__(self, value):
self.value = value
def wasSuccessful(self):
return self.value
class ThreadWatcherTestCase(thread_watcher.TestCase):
    """Tests for thread_watcher.ThreadWatcherMixIn.

    Each test drives a standalone ThreadWatcherMixIn instance whose fail()
    is replaced by a recorder, so the messages the watcher reports can be
    inspected without failing the surrounding test.
    """

    def setUp(self):
        # Messages passed to the watcher's fail(), in call order.
        self._fail_called = []
        self.watcher = thread_watcher.ThreadWatcherMixIn()
        self.watcher.fail = self._fail_called.append
        # Puppet threads created by the test; stopped in tearDown().
        self._threads = []

    def tearDown(self):
        for t in self._threads:
            t.stop()

    def test_no_extra_threads(self):
        self.watcher.setUp()
        self.watcher.tearDown()
        self.assertEqual(self._fail_called, [])

    def test_no_extra_threads_start_stop(self):
        # A thread started and stopped between setUp and tearDown must not
        # be reported.
        self.watcher.setUp()
        self._threads.append(_PuppetThread('puppet1'))
        self._threads[-1].stop()
        self.watcher.tearDown()
        self.assertEqual(self._fail_called, [])

    def test_extra_threads(self):
        self.watcher.setUp()
        self._threads.append(_PuppetThread('foo'))
        self._threads.append(_PuppetThread('bar'))
        self.watcher.tearDown()
        self.assertEqual(len(self._fail_called), 1)
        error_message = self._fail_called[0]
        # Raw strings keep the regex escapes (\(, \d) out of Python's own
        # string-escape processing; \n is resolved by the regex engine.
        expected = re.compile(
            r'^Found 2 running thread\(s\) after the test.\n\n'
            r'Thread <_PuppetThread\(foo, started daemon \d+\)> stacktrace:\n'
            r' .*\n\n'
            r'Thread <_PuppetThread\(bar, started daemon \d+\)> stacktrace:\n'
            r' .*\n$',
            re.DOTALL)
        # assertRegexpMatches (not assertRegex) is kept for Python 2.7
        # compatibility.
        self.assertRegexpMatches(error_message, expected)
        self.assertNotIn(' Thread stopped while acquiring stacktrace.\n',
                         error_message)

    def test_extra_threads_unittest_pass(self):
        # Test succeeded, so thread_watcher must cause failure.
        self.watcher._resultForDoCleanups = _ResultFake(True)
        self.watcher.setUp()
        self._threads.append(_PuppetThread('foo'))
        self.watcher.tearDown()
        self.assertEqual(len(self._fail_called), 1)

    def test_extra_threads_unittest_fail(self):
        # Test failed already, so thread_watcher must ignore result.
        self.watcher._resultForDoCleanups = _ResultFake(False)
        self.watcher.setUp()
        self._threads.append(_PuppetThread('foo'))
        self.watcher.tearDown()
        self.assertEqual(self._fail_called, [])

    def test_extra_threads_expect_tests_pass(self):
        # Test succeeded, so thread_watcher must cause failure.
        self.watcher._test_failed_with_exception = False
        self.watcher.setUp()
        self._threads.append(_PuppetThread('foo'))
        self.watcher.tearDown()
        self.assertEqual(len(self._fail_called), 1)

    def test_extra_threads_expect_tests_fail(self):
        # Test failed already, so thread_watcher must ignore result.
        self.watcher._test_failed_with_exception = True
        self.watcher.setUp()
        self._threads.append(_PuppetThread('foo'))
        self.watcher.tearDown()
        self.assertEqual(self._fail_called, [])

    def test_fail_get_stacktrace(self):
        self.watcher.setUp()
        self._threads.append(_PuppetThread('foo'))
        # Save the real function before entering the try block so the
        # finally clause always has something valid to restore.
        original_current_frames = sys._current_frames
        # Make every thread look like it has no frame available.
        sys._current_frames = lambda: {}
        try:
            self.watcher.tearDown()
        finally:
            sys._current_frames = original_current_frames
        self.assertEqual(len(self._fail_called), 1)
        error_message = self._fail_called[0]
        self.assertIn(' Thread stopped while acquiring stacktrace.\n',
                      error_message)
class ThreadWatcherTestCaseUsageTest(thread_watcher.TestCase):
    """Uses thread_watcher.TestCase directly as the base class of tests."""

    @classmethod
    def setUpClass(cls):
        # Puppet threads created by the tests; stopped in tearDownClass().
        cls._threads = list()

    @classmethod
    def tearDownClass(cls):
        for puppet in cls._threads:
            puppet.stop()

    @unittest.expectedFailure
    def test_fail_extra_threads(self):
        # NOTE(review): `name` is not defined in this method or anywhere
        # visible in this file, so this line presumably raises NameError
        # before any thread is created, and the expected failure comes from
        # that rather than from leak detection. Passing a literal name would
        # leave a live thread at tearDown; whether expectedFailure covers a
        # failure raised there depends on thread_watcher.TestCase and the
        # unittest version -- confirm before changing.
        self._threads.append(_PuppetThread(name))

    def test_ok(self):
        pass

    def test_ok_with_threads(self):
        # The thread is stopped before the test returns.
        puppet = _PuppetThread('ok')
        self._threads.append(puppet)
        puppet.stop()
# Run this module's tests when it is executed as a script.
if __name__ == '__main__':
    unittest.main()