blob: 8809e0ae1c106787bba072856fcdb7bdcc510e4e [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2017 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for TestCase module."""
import queue
import unittest
from unittest import mock
from cros.factory.test import device_data
from cros.factory.test import event as test_event
from cros.factory.test import state
from cros.factory.test import test_case
from cros.factory.test import test_ui
from cros.factory.test.utils import pytest_utils
from cros.factory.unittest_utils import mock_time_utils
from cros.factory.utils import type_utils
# Short alias for test_event.Event.Type; used below when comparing against and
# asserting on END_EVENT_LOOP events.
_EventType = test_event.Event.Type
class TestCaseTest(unittest.TestCase):
  """Unit tests for test_case.TestCase.

  setUp builds a test_case.TestCase instance (self._test) and patches
  state.GetInstance, test_ui.EventLoop and the device_data accessors with the
  stubs defined on this class. It also installs the patchers returned by
  mock_time_utils.MockAll for self._timeline.
  """

  class _MockEventLoop:
    """Stub returned in place of test_ui.EventLoop.

    PostNewEvent and Run are real methods backed by a queue; every other
    attribute is forwarded to self.mock so calls can be asserted on.
    """

    def __init__(self):
      # yapf: disable
      self._event_loop_end = queue.Queue()  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
      # yapf: enable

      # We don't use mock for PostNewEvent and Run, since there is race
      # condition within the mock library __call__...
      self.mock = mock.Mock()

    def PostNewEvent(self, event_type, **kwargs):
      """Records the event; an END_EVENT_LOOP event also unblocks Run().

      Args:
        event_type: The type of the posted event.
        **kwargs: Event payload. For END_EVENT_LOOP it becomes the attributes
          of the object returned by Run().
      """
      if event_type == _EventType.END_EVENT_LOOP:
        self._event_loop_end.put(kwargs)

      # Call `MockPostNewEvent` for checking function calling
      # since we don't mock `PostNewEvent`.
      self.mock.MockPostNewEvent(event_type=event_type, **kwargs)

    def Run(self):
      """Blocks until an END_EVENT_LOOP event is posted.

      Returns:
        A type_utils.Obj built from the keyword arguments of that event.
      """
      end_event_kwargs = self._event_loop_end.get()
      return type_utils.Obj(**end_event_kwargs)

    def __getattr__(self, name):
      """Forwards any attribute not defined on this class to self.mock."""
      # __getattr__ is only consulted when normal lookup fails. If `mock`
      # itself is missing (instance created without __init__, e.g. by copy),
      # reading self.mock below would re-enter here and recurse forever.
      if name == 'mock':
        raise AttributeError(name)
      return getattr(self.mock, name)

  def setUp(self):
    """Installs all patches and creates the TestCase instance under test."""
    self._patchers = []
    # Registered before any patch is started: cleanups still run when a later
    # step of setUp raises, whereas tearDown would be skipped in that case.
    self.addCleanup(self._StopPatchers)

    self._mock_device_data_dict = {}

    self._timeline = mock_time_utils.TimeLine()
    self._patchers.extend(mock_time_utils.MockAll(self._timeline))

    # Mocks goofy_rpc
    mock_goofy_rpc_attributes = {
        'GetAttributeOfCurrentFactoryTest.return_value': 'mock_attribute',
    }
    self._mock_goofy_rpc = mock.Mock(**mock_goofy_rpc_attributes)
    self._CreatePatcher(state,
                        'GetInstance').return_value = self._mock_goofy_rpc

    self._test = test_case.TestCase()
    # yapf: disable
    self._test.ui_class = mock.Mock  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable

    # Filled in by _StubEventLoopConstructor when the code under test
    # constructs its event loop.
    self._handler_exception_hook = None
    self._mock_event_loop = self._MockEventLoop()
    self._CreatePatcher(
        test_ui, 'EventLoop').side_effect = self._StubEventLoopConstructor

    self._CreatePatcher(device_data,
                        'GetDeviceData').side_effect = self._MockGetDeviceData
    self._CreatePatcher(
        device_data,
        'UpdateDeviceData').side_effect = self._MockUpdateDeviceData
    self._CreatePatcher(
        device_data,
        'DeleteDeviceData').side_effect = self._MockDeleteDeviceData

  def _StubEventLoopConstructor(self, handler_exception_hook):
    """Replacement for the test_ui.EventLoop constructor.

    Args:
      handler_exception_hook: Callable passed by the code under test; kept so
        tests can invoke it from simulated event handlers.

    Returns:
      The shared _MockEventLoop instance.
    """
    self._handler_exception_hook = handler_exception_hook
    return self._mock_event_loop

  def _MockGetDeviceData(self, key, default=0):
    """Reads `key` from the in-memory device data dict, or `default`."""
    return self._mock_device_data_dict.get(key, default)

  def _MockUpdateDeviceData(self, update_dict):
    """Merges `update_dict` into the in-memory device data dict."""
    self._mock_device_data_dict.update(update_dict)

  def _MockDeleteDeviceData(self, delete_key, optional=False):
    """Removes `delete_key` from the in-memory device data dict.

    Args:
      delete_key: The key to remove.
      optional: If true, a missing key is ignored.

    Raises:
      KeyError: If the key is missing and `optional` is false.
    """
    if optional:
      self._mock_device_data_dict.pop(delete_key, None)
    else:
      del self._mock_device_data_dict[delete_key]

  def _CreatePatcher(self, *args, **kwargs):
    """Starts a mock.patch.object patcher and tracks it for cleanup.

    Args:
      *args: Positional arguments forwarded to mock.patch.object.
      **kwargs: Keyword arguments forwarded to mock.patch.object.

    Returns:
      The object returned by the patcher's start().
    """
    patcher = mock.patch.object(*args, **kwargs)
    self._patchers.append(patcher)
    return patcher.start()

  def _StopPatchers(self):
    """Stops every tracked patcher, most recently added first.

    Each patcher is removed from the list before it is stopped, so calling
    this more than once never stops the same patcher twice.
    """
    while self._patchers:
      self._patchers.pop().stop()

  def tearDown(self):
    """Undoes all patches installed during setUp."""
    self._StopPatchers()

  def GetRunResult(self):
    """Runs the TestCase under test.

    Returns:
      The list of recorded errors followed by the recorded failures.
    """
    result = unittest.TestResult()
    self._test.run(result=result)
    errors = result.errors + result.failures
    return errors

  def AssertRunPass(self):
    """Runs the TestCase under test and asserts nothing was recorded."""
    errors = self.GetRunResult()
    self.assertFalse(errors)

  def AssertRunFailOrWaive(self, msg=None):
    """Runs the TestCase under test and asserts exactly one error/failure.

    Args:
      msg: If given (and non-empty), text that must appear in the recorded
        traceback string.
    """
    errors = self.GetRunResult()
    self.assertEqual(1, len(errors))
    if msg:
      self.assertIn(msg, errors[0][1])

  def AssertNotReached(self):
    """Raises AssertionError; marks code paths that must not execute."""
    raise AssertionError('This should not be reached.')

  def testGetNextTaskStageKey(self):
    """The stage key ends with the attribute value returned by goofy RPC."""
    # pylint: disable=protected-access
    self.assertEqual(self._test._next_task_stage_key,
                     'factory.test_case.next_task_stage.mock_attribute')
    self._mock_goofy_rpc.GetAttributeOfCurrentFactoryTest.assert_called_with(
        current_invocation_uuid=None, attribute_name='id')

  def testAutomaticPass(self):
    """A runTest that returns normally yields a passing run."""

    def _RunTest():
      pass

    # yapf: disable
    self._test.runTest = _RunTest  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable

    self.AssertRunPass()
    self._mock_event_loop.AddEventHandler.assert_any_call(
        'goofy_ui_task_end', mock.ANY)

  def testPassTask(self):
    """PassTask() ends the current task; both tasks run and the run passes."""
    call_count = 0

    def _Task():
      nonlocal call_count
      call_count += 1
      self._test.PassTask()
      self.AssertNotReached()

    self._test.AddTask(_Task)
    self._test.AddTask(_Task)

    self.AssertRunPass()
    self.assertEqual(2, call_count)
    self._mock_event_loop.mock.MockPostNewEvent.assert_called_with(
        event_type=_EventType.END_EVENT_LOOP, status=state.TestState.PASSED)

  def testFailTask(self):
    """FailTask() ends the run with an END_EVENT_LOOP of status FAILED."""

    def _Task():
      self._test.FailTask('Test fail.')
      self.AssertNotReached()

    self._test.AddTask(_Task)

    self.AssertRunFailOrWaive()
    # NOTE(review): assertRaises() without a callable only returns a context
    # manager, so this line verifies nothing as written. Presumably the intent
    # is to check that the run failed with IndirectException -- confirm and
    # replace with a real assertion.
    self.assertRaises(pytest_utils.IndirectException)
    self._mock_event_loop.mock.MockPostNewEvent.assert_called_with(
        event_type=_EventType.END_EVENT_LOOP, status=state.TestState.FAILED,
        exception_index=0)

  def testWaiveTest(self):
    """WaiveTest() ends the run as FAILED_AND_WAIVED and notifies goofy."""

    def _Task():
      self._test.WaiveTest('Test waived.')
      self.AssertNotReached()

    self._test.AddTask(_Task)

    self.AssertRunFailOrWaive()
    # NOTE(review): assertRaises() without a callable only returns a context
    # manager, so this line verifies nothing as written. Presumably the intent
    # is to check that the run failed with IndirectException -- confirm and
    # replace with a real assertion.
    self.assertRaises(pytest_utils.IndirectException)
    self._mock_event_loop.mock.MockPostNewEvent.assert_called_with(
        event_type=_EventType.END_EVENT_LOOP,
        status=state.TestState.FAILED_AND_WAIVED, exception_index=0)
    self._mock_goofy_rpc.WaiveCurrentFactoryTest.assert_called_once()

  def testFailWithAssert(self):
    """An assertion failure raised inside runTest is reported with its text."""

    def _RunTest():
      self.assertTrue(False)  # pylint: disable=redundant-unittest-assert

    # yapf: disable
    self._test.runTest = _RunTest  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable

    self.AssertRunFailOrWaive('False is not true')

  def testAddTask_AllPass(self):
    """Tasks run in the order added.

    When task `idx` starts, ClearHandlers and UnbindAllKeys must each have
    been called exactly `idx` times.
    """
    executed_tasks = []

    def _Task(idx):
      self.assertEqual(idx, self._mock_event_loop.ClearHandlers.call_count)
      # yapf: disable
      self.assertEqual(idx, self._test.ui.UnbindAllKeys.call_count)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
      # yapf: enable
      executed_tasks.append(idx)

    self._test.AddTask(lambda: _Task(0))
    self._test.AddTask(lambda: _Task(1))
    self._test.AddTask(lambda: _Task(2))

    self.AssertRunPass()
    self.assertEqual([0, 1, 2], executed_tasks)

  def testAddTask_SomeFail(self):
    """Tasks queued after a failing task are not executed."""
    executed_tasks = []

    def _Task(name, fail=False):
      executed_tasks.append(name)
      if fail:
        self._test.FailTask('Task fail.')

    self._test.AddTask(lambda: _Task('task1'))
    self._test.AddTask(lambda: _Task('task2', fail=True))
    self._test.AddTask(lambda: _Task('task3'))

    self.AssertRunFailOrWaive()
    self.assertEqual(['task1', 'task2'], executed_tasks)

  def testAddTask_CheckNextTaskStage_WithoutReboot(self):
    """The next task stage stays 0 when no task is added with reboot=True."""
    next_task_stages = []

    def _Task():
      # yapf: disable
      next_task_stages.append(self._test.GetNextTaskStage())  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
      # yapf: enable

    # next_task_stage will not be updated if all tasks were added with
    # `reboot=False`
    self._test.AddTask(_Task)
    self._test.AddTask(_Task)
    self._test.AddTask(_Task)

    self.AssertRunPass()
    self.assertEqual([0, 0, 0], next_task_stages)

  def testAddTask_CheckNextTaskStage_WithReboot(self):
    """The next task stage advances per task when a reboot task is present."""
    next_task_stages = []

    def _Task():
      # yapf: disable
      next_task_stages.append(self._test.GetNextTaskStage())  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
      # yapf: enable

    # next_task_stage will be updated if any task was added with
    # `reboot=True`
    self._test.AddTask(_Task)
    self._test.AddTask(_Task)
    self._test.AddTask(_Task, reboot=True)

    self.AssertRunFailOrWaive()
    self.assertEqual([1, 2, 3], next_task_stages)

  def testAddTasks_SkipFinishedTasksAfterReboot(self):
    """Tasks before the stored next task stage are skipped."""
    executed_tasks = []
    next_task_stages = []

    def _Task(name):
      executed_tasks.append(name)
      # yapf: disable
      next_task_stages.append(self._test.GetNextTaskStage())  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
      # yapf: enable

    self._test.AddTask(lambda: _Task('task1'))
    self._test.AddTask(lambda: _Task('task2'), reboot=True)
    self._test.AddTask(lambda: _Task('task3'))
    # Sets the next stage flag for simulating the reboot scenario.
    # The tasks finished before reboot should be skipped.
    self._test.UpdateNextTaskStage(2)

    self.AssertRunPass()
    self.assertEqual(['task3'], executed_tasks)
    self.assertEqual([3], next_task_stages)

  def testAddTasks_ClearNextTaskStage_TestPass(self):
    """The next task stage reads 0 again after a passing run."""

    def _Task():
      pass

    # Makes sure the next stage flag will be cleared if all tasks pass.
    self._test.AddTask(_Task, reboot=True)
    self._test.AddTask(_Task)
    self._test.AddTask(_Task)
    self._test.UpdateNextTaskStage(1)

    self.AssertRunPass()
    # yapf: disable
    self.assertEqual(self._test.GetNextTaskStage(), 0)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable

  def testAddTasks_ClearNextTaskStage_TestFail(self):
    """The next task stage reads 0 again after a failing run."""

    def _Task(fail_task=False):
      if fail_task:
        self._test.FailTask('Task fail.')

    # Makes sure the next stage flag will be cleared if any task fail.
    self._test.AddTask(_Task, reboot=True)
    self._test.AddTask(_Task, fail_task=True)
    self._test.AddTask(_Task)
    self._test.UpdateNextTaskStage(1)

    self.AssertRunFailOrWaive()
    # yapf: disable
    self.assertEqual(self._test.GetNextTaskStage(), 0)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable

  def testAddTasks_UnexpectedReboot(self):
    """Resuming after a task added with reboot=False is reported as failure."""

    def _Task():
      pass

    # Sets the next stage flag for simulating the reboot scenario.
    # Exception should be triggered if the device reboot
    # while running the task with 'reboot=False'
    self._test.UpdateNextTaskStage(2)
    self._test.AddTask(_Task, reboot=True)
    self._test.AddTask(_Task, reboot=False)
    self._test.AddTask(_Task)

    self.AssertRunFailOrWaive(
        'Unexpected reboot was triggered while running "_Task".')

  @mock.patch('time.sleep')
  def testAddTasks_RebootNotTriggeredWithinBufferTime(self, time_sleep):
    """A reboot task that never reboots fails, citing the 5-second buffer."""

    # Task without reboot process
    def _Task():
      pass

    self._test.AddTask(_Task, reboot=True)
    self._test.AddTask(_Task)

    self.AssertRunFailOrWaive('Reboot not triggered '
                              'within buffer time (5 seconds), '
                              'next task may be executed in advance.')
    time_sleep.assert_called_once()

  @mock.patch('time.sleep')
  def testAddTasks_SetRebootBufferTime(self, time_sleep):
    """reboot_timeout_secs sets both the sleep length and the error text."""

    def _Task():
      pass

    self._test.AddTask(_Task, reboot=True, reboot_timeout_secs=3)
    self._test.AddTask(_Task)

    self.AssertRunFailOrWaive('Reboot not triggered '
                              'within buffer time (3 seconds), '
                              'next task may be executed in advance.')
    time_sleep.assert_called_once_with(3)

  def testWaitTaskEnd(self):
    """WaitTaskEnd() blocks until PassTask() is called at time 10."""

    def _RunTest():
      self._test.WaitTaskEnd()
      self.AssertNotReached()

    def _TestEnd():
      # Route whatever PassTask raises through the hook captured from the
      # EventLoop constructor call.
      try:
        self._test.PassTask()
      except Exception:
        # yapf: disable
        self._handler_exception_hook()  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable

    # yapf: disable
    self._test.runTest = _RunTest  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    self._timeline.AddEvent(10, _TestEnd)

    self.AssertRunPass()
    self._timeline.AssertTimeAt(10)

  def testWaitTaskEndFail(self):
    """WaitTaskEnd() blocks until FailTask() is called at time 10."""

    def _RunTest():
      self._test.WaitTaskEnd()
      self.AssertNotReached()

    def _TestEnd():
      # Route whatever FailTask raises through the hook captured from the
      # EventLoop constructor call.
      try:
        self._test.FailTask('FAILED!')
      except Exception:
        # yapf: disable
        self._handler_exception_hook()  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable

    # yapf: disable
    self._test.runTest = _RunTest  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    self._timeline.AddEvent(10, _TestEnd)

    self.AssertRunFailOrWaive()
    self._timeline.AssertTimeAt(10)

  def testSleep(self):
    """Sleep(2) in a loop observes times 0, 2, 4 before a failure at time 5."""
    times = []

    def _RunTest():
      while True:
        times.append(self._timeline.GetTime())
        self._test.Sleep(2)

    def _TestEnd():
      # Route whatever FailTask raises through the hook captured from the
      # EventLoop constructor call.
      try:
        self._test.FailTask('FAILED!')
      except Exception:
        # yapf: disable
        self._handler_exception_hook()  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable

    # yapf: disable
    self._test.runTest = _RunTest  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    self._timeline.AddEvent(5, _TestEnd)

    self.AssertRunFailOrWaive()
    self.assertEqual(5, self._timeline.GetTime())
    self.assertEqual([0, 2, 4], times)

  def testSaveDataRightBeforeReboot(self):
    """SaveDataForNextBoot is called once, with the next task stage at 2."""
    next_stage = []

    def _side_effect():
      # yapf: disable
      next_stage.append(self._test.GetNextTaskStage())  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
      # yapf: enable

    def _Task():
      pass

    self._test.AddTask(_Task)
    self._test.AddTask(_Task, reboot=True)
    self._mock_goofy_rpc.SaveDataForNextBoot.side_effect = _side_effect

    self.GetRunResult()
    self.assertEqual([2], next_stage)

  @mock.patch('subprocess.check_call')
  def testFlushLogRightBeforeReboot(self, mock_check_call):
    """'sync' is invoked exactly once, with the next task stage at 1."""
    next_stage = []

    def _side_effect(_):
      # yapf: disable
      next_stage.append(self._test.GetNextTaskStage())  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
      # yapf: enable

    def _Task():
      pass

    self._test.AddTask(_Task, reboot=True)
    self._test.AddTask(_Task)
    mock_check_call.side_effect = _side_effect

    self.GetRunResult()
    self.assertEqual([1], next_stage)
    mock_check_call.assert_called_once_with('sync')
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()