pytests: Fix shutdown pytest failure.
* The shutdown test needs a timeout when waiting on the event client, so it
should use ThreadingEventClient.
* ShutdownStep is now in test_object.
* Move BlockingEventClient.wait to EventClientBase.wait so that
ThreadingEventClient can use it too.
BUG=b:67759465
TEST=make test
TEST=manually run HaltStep and RebootStep on DUT.
Change-Id: I5969f510e37a7dba86357700a29e3d5f1fed688f
Reviewed-on: https://chromium-review.googlesource.com/718242
Commit-Ready: Pi-Hsun Shih <pihsun@chromium.org>
Tested-by: Pi-Hsun Shih <pihsun@chromium.org>
Reviewed-by: Wei-Han Chen <stimim@chromium.org>
diff --git a/py/test/event.py b/py/test/event.py
index 66b6cf7..73eb2ad 100644
--- a/py/test/event.py
+++ b/py/test/event.py
@@ -441,6 +441,21 @@
"""
raise NotImplementedError
+ def wait(self, condition, timeout=None):
+ """Waits for an event matching a condition.
+
+ Args:
+ condition: A function to evaluate. The function takes one
+ argument (an event to evaluate) and returns whether the condition
+ applies.
+ timeout: A timeout in seconds, or None to wait forever.
+
+ Returns:
+ The event that matched the condition, or None if the connection
+ was closed or timeout.
+ """
+ return self.request_response(None, condition, timeout)
+
class BlockingEventClient(EventClientBase):
"""A blocking event client.
@@ -465,21 +480,6 @@
if event and check_response(event):
return event
- def wait(self, condition, timeout=None):
- """Waits for an event matching a condition.
-
- Args:
- condition: A function to evaluate. The function takes one
- argument (an event to evaluate) and returns whether the condition
- applies.
- timeout: A timeout in seconds, or None to wait forever.
-
- Returns:
- The event that matched the condition, or None if the connection
- was closed or timeout.
- """
- return self.request_response(None, condition, timeout)
-
class ThreadingEventClient(EventClientBase):
"""A threaded event client.
diff --git a/py/test/pytests/shutdown/shutdown.py b/py/test/pytests/shutdown/shutdown.py
index 78f60ac..2c6d3fc 100644
--- a/py/test/pytests/shutdown/shutdown.py
+++ b/py/test/pytests/shutdown/shutdown.py
@@ -58,6 +58,7 @@
from cros.factory.test.i18n import _
from cros.factory.test.i18n import test_ui as i18n_test_ui
from cros.factory.test import state
+from cros.factory.test.test_lists import test_object
from cros.factory.test import test_ui
from cros.factory.test import ui_templates
from cros.factory.test.utils import audio_utils
@@ -148,9 +149,9 @@
]
def setUp(self):
- assert self.args.operation in (factory.ShutdownStep.REBOOT,
- factory.ShutdownStep.FULL_REBOOT,
- factory.ShutdownStep.HALT)
+ assert self.args.operation in (test_object.ShutdownStep.REBOOT,
+ test_object.ShutdownStep.FULL_REBOOT,
+ test_object.ShutdownStep.HALT)
self.dut = device_utils.CreateDUTInterface()
self.ui = test_ui.UI(css=_CSS)
self.template = ui_templates.OneSection(self.ui)
@@ -179,7 +180,7 @@
'wait_shutdown_secs': self.args.wait_shutdown_secs,
}
- with test_event.BlockingEventClient() as event_client:
+ with test_event.ThreadingEventClient() as event_client:
event_client.post_event(
test_event.Event(
test_event.Event.Type.PENDING_SHUTDOWN, **pending_shutdown_data))
@@ -266,7 +267,7 @@
if last_shutdown_time > now:
LogAndEndTest(status=state.TestState.FAILED,
error_msg='Time moved backward during reboot')
- elif (self.args.operation == factory.ShutdownStep.REBOOT and
+ elif (self.args.operation == test_object.ShutdownStep.REBOOT and
self.args.max_reboot_time_secs and
(now - last_shutdown_time > self.args.max_reboot_time_secs)):
# A reboot took too long; fail. (We don't check this for
@@ -314,17 +315,18 @@
self.PreShutdown()
end_time = time.time() + self.args.wait_shutdown_secs
- if self.args.operation in (factory.ShutdownStep.REBOOT,
- factory.ShutdownStep.FULL_REBOOT):
+ if self.args.operation in (test_object.ShutdownStep.REBOOT,
+ test_object.ShutdownStep.FULL_REBOOT):
checkpoints = [DUT_NOT_READY_CHECKPOINT, DUT_READY_CHECKPOINT]
else:
checkpoints = [DUT_NOT_READY_CHECKPOINT, DUT_WAIT_SHUTDOWN]
# TODO(akahuang): Make shutdown command as system module
command_table = {
- factory.ShutdownStep.REBOOT: ['shutdown -r now'],
- factory.ShutdownStep.FULL_REBOOT: ['ectool reboot_ec cold at-shutdown',
- 'shutdown -r now'],
- factory.ShutdownStep.HALT: ['shutdown -h now']}
+ test_object.ShutdownStep.REBOOT: ['shutdown -r now'],
+ test_object.ShutdownStep.FULL_REBOOT: [
+ 'ectool reboot_ec cold at-shutdown',
+ 'shutdown -r now'],
+ test_object.ShutdownStep.HALT: ['shutdown -h now']}
for command in command_table[self.args.operation]:
self.dut.Call(command)
while checkpoints:
diff --git a/py/test/pytests/shutdown/shutdown_automator.py b/py/test/pytests/shutdown/shutdown_automator.py
index 07b8c9c..f000324 100644
--- a/py/test/pytests/shutdown/shutdown_automator.py
+++ b/py/test/pytests/shutdown/shutdown_automator.py
@@ -7,7 +7,7 @@
import logging
import factory_common # pylint: disable=unused-import
-from cros.factory.test import factory
+from cros.factory.test.test_lists import test_object
from cros.factory.test.e2e_test.common import AutomationMode
from cros.factory.test.e2e_test.automator import Automator, AutomationFunction
@@ -20,7 +20,7 @@
@AutomationFunction(automation_mode=AutomationMode.FULL,
wait_for_factory_test=False)
def automateSkipHalt(self):
- if self.args.operation == factory.ShutdownStep.HALT:
+ if self.args.operation == test_object.ShutdownStep.HALT:
# Skip the test right after it is loaded.
logging.info('Skip halt in full automation mode.')
self.uictl.WaitForContent(search_text='Shutdown Test')
diff --git a/py/test/pytests/shutdown/shutdown_e2etest.py b/py/test/pytests/shutdown/shutdown_e2etest.py
index e52914c..22e0564 100644
--- a/py/test/pytests/shutdown/shutdown_e2etest.py
+++ b/py/test/pytests/shutdown/shutdown_e2etest.py
@@ -55,7 +55,7 @@
@e2e_test.E2ETestCase()
@mock.patch.object(state, 'get_instance',
return_value=_goofy)
- @mock.patch.object(shutdown, 'BlockingEventClient',
+ @mock.patch.object(shutdown, 'ThreadingEventClient',
return_value=_event_client)
def testReboot(self, mock_event_client, mock_get_state_instance):
# Set 'post_shutdown' to None.
@@ -73,7 +73,7 @@
@e2e_test.E2ETestCase()
@mock.patch.object(state, 'get_instance',
return_value=_goofy)
- @mock.patch.object(shutdown, 'BlockingEventClient',
+ @mock.patch.object(shutdown, 'ThreadingEventClient',
return_value=_event_client)
def testRebootAborted(self, mock_event_client, mock_get_state_instance):
# Set 'post_shutdown' to None.