blob: ab54260861829f2060d9bb28df28101c6e925562 [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import time
import unittest
import factory_common # pylint: disable=unused-import
from cros.factory.device import device_utils
from cros.factory.test.fixture import bft_fixture
from cros.factory.test.i18n import test_ui as i18n_test_ui
from cros.factory.test import test_ui
from cros.factory.test import ui_templates
from cros.factory.test.utils import stress_manager
from cros.factory.utils.arg_utils import Arg
_HTML = """
<div id='container'>
<div id='countdown'></div>
<div id='voltage'></div>
<div id='current'></div>
</div>
"""
_VOLTAGE_FMT_STR = lambda voltage: i18n_test_ui.MakeI18nLabel(
'Voltage: {voltage} mV', voltage=voltage)
_CURRENT_FMT_STR = lambda current: i18n_test_ui.MakeI18nLabel(
'Current: {current} mA', current=current)
_COUNTDOWN_FMT_STR = lambda secs: i18n_test_ui.MakeI18nLabel(
'Count Down: {secs} s', secs=secs)
class PowerUnderStressTest(unittest.TestCase):
"""Measure the power consumption (voltage and current) under heavy load."""
ARGS = [
Arg('bft_fixture', dict, bft_fixture.TEST_ARG_HELP, default=None),
Arg('adb_remote_test', bool, 'Run test against remote ADB target.',
default=False),
Arg('seconds', int,
'Time to execute the stressapptest.', default=60),
Arg('memory_ratio', float,
'Fraction of free memory', default=0.9),
Arg('free_memory_only', bool,
'Only use free memory for test. When set to True, only '
'memory_radio * free_memory are used for stressapptest.',
default=False),
Arg('wait_secs', int,
'Time to wait in seconds before executing stressapptest.',
default=30),
Arg('disk_thread', bool,
'stress disk using -f argument of stressapptest.',
default=True),
Arg('usb_port_id', int, 'The ID of USB port connected to charger',
default=0, optional=True),
Arg('voltage_threshold_min', int,
'Minimum voltage (mV) allowed through out the entire test.',
default=None, optional=True),
Arg('voltage_threshold_max', int,
'Maximum voltage (mV) allowed through out the entire test.',
default=None, optional=True),
Arg('current_threshold_min', int,
'Minimum current (mA) allowed through out the entire test.',
default=None, optional=True),
Arg('current_threshold_max', int,
'Maximum current (mA) allowed through out the entire test.',
default=None, optional=True),
]
def setUp(self):
if self.args.bft_fixture is not None:
self._bft_fixture = bft_fixture.CreateBFTFixture(**self.args.bft_fixture)
if not self._bft_fixture.IsParallelTest():
if self._adb_remote_test:
self._bft_fixture.SetDeviceEngaged('ADB_HOST', engage=True)
else:
self._bft_fixture.SetDeviceEngaged('USB3', engage=True)
else:
self._bft_fixture = None
self._dut = device_utils.CreateDUTInterface()
self._ui = test_ui.UI()
self._template = ui_templates.OneSection(self._ui)
self._template.SetState(_HTML)
def UpdateState(self, elapsed, voltage, current):
self._ui.SetHTML(_COUNTDOWN_FMT_STR(self.args.wait_secs - elapsed),
id='countdown')
self._ui.SetHTML(_VOLTAGE_FMT_STR(voltage), id='voltage')
self._ui.SetHTML(_CURRENT_FMT_STR(current), id='current')
def runTest(self):
self._ui.Run(blocking=False)
try:
with stress_manager.StressManager(self._dut).Run(
duration_secs=None,
memory_ratio=self.args.memory_ratio,
free_memory_only=self.args.free_memory_only,
disk_thread=self.args.disk_thread):
for elapsed in xrange(1, self.args.wait_secs + 1):
time.sleep(1)
if self._bft_fixture:
ina_values = self._bft_fixture.ReadINAValues()
voltage = ina_values['voltage']
current = ina_values['current']
else:
usb_port_info = (
self._dut.power.GetUSBPDPowerInfo()[self.args.usb_port_id])
voltage, current = usb_port_info.voltage, usb_port_info.current
self.UpdateState(elapsed, voltage, current)
if self.args.voltage_threshold_min:
self.assertTrue(voltage >= self.args.voltage_threshold_min)
if self.args.voltage_threshold_max:
self.assertTrue(voltage <= self.args.voltage_threshold_max)
if self.args.current_threshold_min:
self.assertTrue(current >= self.args.current_threshold_min)
if self.args.current_threshold_max:
self.assertTrue(current <= self.args.current_threshold_max)
except stress_manager.StressManagerError as e:
logging.error('StressAppTest failed: %s', e)
raise e