blob: 8cba96588ba5de6e32f6cabafa902e3f858a7c52 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The finalize test is the last step before DUT switching to release image.
The test checks if all tests are passed, and checks the hardware
write-protection, charge percentage. Then it invoke gooftool finalize with
specified arguments to switch the machine to release image.
"""
import logging
import os
import re
import threading
import time
import unittest
import yaml
import factory_common # pylint: disable=W0611
from cros.factory import system
from cros.factory.gooftool import Gooftool
from cros.factory.system.state import SystemInfo
from cros.factory.test import factory
from cros.factory.test import gooftools
from cros.factory.test import phase
from cros.factory.test import shopfloor
from cros.factory.test import test_ui
from cros.factory.test import ui_templates
from cros.factory.test.args import Arg
from cros.factory.test.event_log import Log
from cros.factory.test.test_ui import MakeLabel
MSG_CHECKING = MakeLabel('Checking system status for finalization...',
'正在检查系统是否已可执行最终程序...')
MSG_NOT_READY = MakeLabel('System is not ready.<br>'
'Please fix RED tasks and then press SPACE.',
'系统尚未就绪。<br>'
'请修正红色项目后按空白键重新检查。')
MSG_NOT_READY_POLLING = MakeLabel('System is NOT ready. Please fix RED tasks.',
'系统尚未就绪。请修正红色项目。')
MSG_FORCE = MakeLabel('Press “f” to force starting finalization procedure.',
'按下 「f」 键以强迫开始最终程序。')
MSG_READY = MakeLabel('System is READY. Press SPACE to start FINALIZATION.',
'系统已準备就绪。 请按空白键开始最终程序!')
MSG_FINALIZING = MakeLabel(
'Finalizing, please wait.<br>'
'Do not restart the device or terminate this test,<br>'
'or the device may become unusable.',
'正在开始最终程序,请稍等.<br>'
'不要重启机器或停止测试,<br>'
'不然机器将无法开机。')
class Finalize(unittest.TestCase):
"""The main class for finalize pytest."""
ARGS = [
Arg('write_protection', bool,
'Check write protection.', default=True),
Arg('polling_seconds', (int, type(None)),
'Interval between updating results (None to disable polling).',
default=5),
Arg('allow_force_finalize', list,
'List of users as strings allowed to force finalize, supported '
'users are operator or engineer.',
default=['operator', 'engineer']),
Arg('min_charge_pct', int,
'Minimum battery charge percentage allowed (None to disable '
'checking charge level)',
optional=True),
Arg('max_charge_pct', int,
'Maximum battery charge percentage allowed (None to disable '
'checking charge level)',
optional=True),
Arg('secure_wipe', bool,
'Wipe the stateful partition securely (False for a fast wipe).',
default=True),
Arg('upload_method', str,
'Upload method for "gooftool finalize"',
optional=True),
Arg('waive_tests', list,
'Do not require certain tests to pass. This is a list of elements; '
'each element must either be a regular expression of test path, '
'or a tuple of regular expression of test path and a regular '
'expression that must match the error message in order to waive the '
'test. If regular expression of error message is empty, the test '
'can be waived if it is either UNTESTED or FAILED. '
r'e.g.: [(r"^FATP\.FooBar$", r"Timeout"), (r"Diagnostic\..*")] will '
'waive FATP.FooBar test if error message starts with Timeout. It '
'will also waive all Diagnostic.* tests, either UNTESTED or FAILED. '
'Error messages may be multiline (e.g., stack traces) so this is a '
'multiline match. This is a Python re.match operation, so it will '
'match from the beginning of the error string.',
default=[]),
Arg('hwid_version', int,
'Version of HWID library to use in gooftool.', default=3,
optional=True),
Arg('enable_shopfloor', bool,
'Perform shopfloor operations: update HWID data and flush event '
'logs.', default=True),
Arg('sync_event_logs', bool, 'Sync event logs to shopfloor',
default=True),
Arg('rma_mode', bool,
'Enable rma_mode, do not check for deprecated components.',
default=False, optional=True),
Arg('is_cros_core', bool,
'For ChromeOS Core device, skip verifying branding and setting'
'firmware bitmap locale.',
default=False, optional=True),
Arg('wipe_in_place', bool,
'Wipe the stateful partition directly in a tmpfs without reboot. '
'False for legacy implementation to invoke wiping under '
'release image after reboot.',
default=True, optional=True),
Arg('cutoff_options', dict,
'Battery cutoff options after wiping. Only used when wipe_in_place'
'is set to true. Should be a dict with following optional keys:\n'
'- "method": The cutoff method after wiping. Value should be one of'
' {shutdown, reboot, battery_cutoff, battery_cutoff_at_shutdown}\n'
'- "check_ac": Allowed AC state when performing battery cutoff'
' Value should be one of {remove_ac, connect_ac}\n'
'- "min_battery_percent": Minimum battery percentage allowed\n'
'- "max_battery_percent": Maximum battery percentage allowed\n'
'- "min_battery_voltage": Minimum battery voltage allowed\n'
'- "max_battery_voltage": Maximum battery voltage allowed',
optional=True),
Arg('enforced_release_channels', list,
'A list of string indicating the enforced release image channels. '
'Each item should be one of "dev", "beta" or "stable".',
default=None, optional=True),
]
def setUp(self):
self.ui = test_ui.UI()
self.template = ui_templates.OneSection(self.ui)
self.force = False
self.go_cond = threading.Condition()
self.test_states_path = os.path.join(factory.get_log_root(),
'test_states')
self.gooftool = Gooftool(hwid_version=self.args.hwid_version)
# Set of waived tests.
self.waived_tests = set()
# Normalize 0 to None (since various things, e.g.,
# Condition.wait(timeout), treat 0 and None differently.
if self.args.polling_seconds == 0:
self.args.polling_seconds = None
def runTest(self):
# Check waived_tests argument. (It must be empty at DVT and
# beyond.)
phase.AssertStartingAtPhase(
phase.DVT,
not self.args.waive_tests,
'Tests may not be waived; set of waived tests is %s' % (
self.args.waive_tests))
phase.AssertStartingAtPhase(phase.PVT, self.args.write_protection,
'Write protection must be enabled')
# Check for HWID bundle update from shopfloor.
if self.args.enable_shopfloor:
shopfloor.update_local_hwid_data()
# Preprocess waive_tests: turn it into a list of tuples where the
# first element is the regular expression of test id and the second
# is the regular expression of error messages.
for i, w in enumerate(self.args.waive_tests):
if isinstance(w, str):
w = (w, '') # '' matches anything
self.assertTrue(isinstance(w, tuple) and
len(w) == 2,
'Invalid waive_tests element %r' % (w,))
self.args.waive_tests[i] = (re.compile(w[0]),
re.compile(w[1], re.MULTILINE))
test_list = self.test_info.ReadTestList()
test_states = test_list.as_dict(
factory.get_state_instance().get_test_states())
with open(self.test_states_path, 'w') as f:
yaml.dump(test_states, f)
Log('test_states', test_states=test_states)
def Go(force=False):
with self.go_cond:
if self.ForcePermissions():
self.force = force
self.go_cond.notify()
self.ui.BindKey(' ', lambda _: Go(False))
self.ui.BindKey('F', lambda _: Go(True))
thread = threading.Thread(target=self.Run)
thread.start()
self.ui.Run()
def Run(self):
try:
self.LogImageVersion()
self.RunPreflight()
self.template.SetState(MSG_FINALIZING)
self.DoFinalize()
self.ui.Fail('Should never be reached')
except Exception, e: # pylint: disable=W0703
self.ui.Fail('Exception during finalization: %s' % e)
def LogImageVersion(self):
system_info = SystemInfo()
release_image_version = system_info.release_image_version
factory_image_version = system_info.factory_image_version
if release_image_version:
logging.info('release image version: %s', release_image_version)
else:
self.ui.Fail('Can not determine release image version')
if factory_image_version:
logging.info('factory image version: %s', factory_image_version)
else:
self.ui.Fail('Can not determine factory image version')
Log('finalize_image_version',
factory_image_version=factory_image_version,
release_image_version=release_image_version)
def RunPreflight(self):
power = system.GetBoard(self.dut).power
def CheckRequiredTests():
"""Returns True if all tests (except waived tests) have passed."""
test_list = self.test_info.ReadTestList()
state_map = factory.get_state_instance().get_test_states()
self.waived_tests = set()
for k, v in state_map.iteritems():
test = test_list.lookup_path(k)
if not test:
# Test has been removed (e.g., by updater).
continue
if test.subtests:
# There are subtests. Don't check the parent itself (only check
# the children).
continue
if v.status == factory.TestState.FAILED_AND_WAIVED:
# The test is explicitly waived in the test list.
continue
if v.status == factory.TestState.UNTESTED:
# See if it's been waived. The regular expression of error messages
# must be empty string.
for regex_path, regex_error_msg in self.args.waive_tests:
if regex_path.match(k) and not regex_error_msg.pattern:
self.waived_tests.add(k)
logging.info('Waived UNTESTED test %r', k)
break
else:
# It has not been waived.
return False
if v.status == factory.TestState.FAILED:
# See if it's been waived.
for regex_path, regex_error_msg in self.args.waive_tests:
if regex_path.match(k) and regex_error_msg.match(v.error_msg):
self.waived_tests.add(k)
logging.info('Waived FAILED test %r', k)
break
else:
# It has not been waived.
return False
return True
items = [(CheckRequiredTests,
MakeLabel('Verify all tests passed',
'确认测试项目都已成功了')),
(lambda: (
self.gooftool.CheckDevSwitchForDisabling() in (True, False)),
MakeLabel('Turn off Developer Switch',
'停用开发者开关(DevSwitch)'))]
if self.args.min_charge_pct:
items.append((lambda: (power.CheckBatteryPresent() and
power.GetChargePct() >= self.args.min_charge_pct),
MakeLabel('Charge battery to %d%%' %
self.args.min_charge_pct,
'充电到%d%%' %
self.args.min_charge_pct)))
if self.args.max_charge_pct:
items.append((lambda: (power.CheckBatteryPresent() and
power.GetChargePct() <= self.args.max_charge_pct),
MakeLabel('Discharge battery to %d%%' %
self.args.max_charge_pct,
'放电到%d%%' %
self.args.max_charge_pct)))
if self.args.write_protection:
items += [(lambda: self.gooftool.VerifyWPSwitch() == None,
MakeLabel('Enable write protection pin',
'确认硬体写入保护已开启'))]
self.template.SetState(
'<table style="margin: auto; font-size: 150%"><tr><td>' +
'<div id="finalize-state">%s</div>' % MSG_CHECKING +
'<table style="margin: auto"><tr><td>' +
'<ul id="finalize-list" style="margin-top: 1em">' +
''.join(['<li id="finalize-%d">%s' % (i, item[1])
for i, item in enumerate(items)]),
'</ul>'
'</td></tr></table>'
'</td></tr></table>')
def UpdateState():
'''Polls and updates the states of all checklist items.
Returns:
True if all have passed.
'''
all_passed = True
js = []
for i, item in enumerate(items):
try:
passed = item[0]()
except: # pylint: disable=W0702
logging.exception('Error evaluating finalization condition')
passed = False
js.append('$("finalize-%d").className = "test-status-%s"' % (
i, 'passed' if passed else 'failed'))
all_passed = all_passed and passed
self.ui.RunJS(';'.join(js))
if not all_passed:
msg = (MSG_NOT_READY_POLLING if self.args.polling_seconds
else MSG_NOT_READY)
if self.ForcePermissions():
msg += '<div>' + MSG_FORCE + '</div>'
self.ui.SetHTML(msg, id='finalize-state')
return all_passed
with self.go_cond:
first_time = True
while not self.force:
if UpdateState():
# All done!
if first_time and not self.args.polling_seconds:
# Succeeded on the first try, and we're not polling; wait
# for a SPACE keypress.
self.ui.SetHTML(MSG_READY, id='finalize-state')
self.go_cond.wait()
return
# Wait for a "go" signal, up to polling_seconds (or forever if
# not polling).
self.go_cond.wait(self.args.polling_seconds)
first_time = False
def Warn(self, message, times=3):
"""Alerts user that a required test is bypassed."""
for i in range(times, 0, -1):
factory.console.warn(
'%s. '
'THIS DEVICE CANNOT BE QUALIFIED. '
'(will continue in %d seconds)', message, i)
time.sleep(1)
def ForcePermissions(self):
"""Return true if there are permissions to force, false if not."""
for user in self.args.allow_force_finalize:
self.assertTrue(user in ['engineer', 'operator'],
'Invalid user %r in allow_force_finalize.' % user)
if user == 'engineer' and self.ui.InEngineeringMode():
return True
elif user == 'operator' and not self.ui.InEngineeringMode():
return True
return False
def NormalizeUploadMethod(self, method):
"""Builds the report file name and resolves variables."""
if method in [None, 'none']:
# gooftool accepts only 'none', not empty string.
return 'none'
if method == 'shopfloor':
method = 'shopfloor:%s#%s' % (shopfloor.get_server_url(),
shopfloor.get_serial_number())
logging.info('Using upload method %s', method)
return method
def DoFinalize(self):
upload_method = self.NormalizeUploadMethod(self.args.upload_method)
command = 'gooftool -v 4 finalize -i %d' % self.args.hwid_version
if self.waived_tests:
self.Warn('TESTS WERE WAIVED: %s.' % sorted(list(self.waived_tests)))
Log('waived_tests', waived_tests=sorted(list(self.waived_tests)))
if self.args.enable_shopfloor and self.args.sync_event_logs:
factory.get_state_instance().FlushEventLogs()
if not self.args.write_protection:
self.Warn('WRITE PROTECTION IS DISABLED.')
command += ' --no_write_protect'
if not self.args.secure_wipe:
command += ' --fast'
if self.args.wipe_in_place:
command += ' --wipe_in_place'
if self.args.cutoff_options:
cutoff_args = ''
for key, value in self.args.cutoff_options.iteritems():
cutoff_args += ' --%s %s' % (key.replace('_', '-'), str(value))
command += ' --cutoff_args "%s"' % cutoff_args
if shopfloor.is_enabled():
command += ' --shopfloor_url "%s"' % shopfloor.get_server_url()
command += ' --upload_method "%s"' % upload_method
command += ' --add_file "%s"' % self.test_states_path
if self.args.rma_mode:
command += ' --rma_mode'
logging.info('Using RMA mode. Accept deprecated components')
if self.args.is_cros_core:
command += ' --cros_core'
logging.info('ChromeOS Core device. Skip some check.')
if self.args.enforced_release_channels:
command += ' --enforced_release_channels %s' % (
' '.join(self.args.enforced_release_channels))
logging.info(
'Enforced release channels: %s.', self.args.enforced_release_channels)
gooftools.run(command)
# If using wipe_in_place in the above gooftool command,
# factory service should be stopped here.
if self.args.wipe_in_place:
time.sleep(60)
# The following line should not be reached.
self.ui.Fail('Failed to wipe in place')
return
# It will reach the following if not using wipe_in_place in the above
# gooftool command.
if shopfloor.is_enabled():
shopfloor.finalize()
# TODO(hungte): Use Reboot in test list to replace this, or add a
# key-press check in developer mode.
os.system('sync; sleep 3; shutdown -r now')
time.sleep(60)
self.ui.Fail('Unable to shutdown')