blob: 4520a4335488f5ddc22a7c9e46bed8861ece3216 [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
gooftool: Google Factory Tool, providing all Google Required Test
functionality.
"""
# TODO(tammo): Refactor to match Chromium style.
import copy
import glob
import logging
import optparse
import os
import re
import sys
import tempfile
import yaml
########################################################################
# cros/gft modules
import crosfw
import gft_common
import gft_report
import gft_upload
import gft_vpd
import gft_wpfw
import gpio
import hwid_tool
import probe
from common import Obj, SetupLogging
from gft_common import WarningMsg, VerboseMsg, DebugMsg, ErrorMsg, ErrorDie
#######################################################################r
# global variables and cached objects
g_options = None
g_args = None
# list of commands
g_commands = []
# gooftool version
VERSION = 2
# cached objects
g_cached_device_data = None
########################################################################
# factory utilities
def GetPrimaryDevicePath(partition=None):
def isfixed(dev):
sysfs_path = '/sys/block/%s/removable' % dev
return (os.path.exists(sysfs_path) and
open(sysfs_path).read().strip() == '0')
alpha_re = re.compile(r'^/dev/([a-zA-Z]+)[0-9]+$')
alnum_re = re.compile(r'^/dev/([a-zA-Z]+[0-9]+)p[0-9]+$')
dev_set = set(
dev
for path in gft_common.SystemOutput('cgpt find -t rootfs').split()
for dev in alpha_re.findall(path) + alnum_re.findall(path)
if isfixed(dev))
if len(dev_set) != 1:
ErrorDie('zero or multiple primary devs: %s' % dev_set)
dev_path = '/dev/' + dev_set.pop()
if partition is None:
return dev_path
fmt_str = '%sp%d' if alnum_re.match(dev_path) else '%s%d'
return fmt_str % (dev_path, partition)
def GetReleaseRootPartitionPath():
return GetPrimaryDevicePath(5)
def GetReleaseKernelPartitionPath():
return GetPrimaryDevicePath(4)
def VerifySwitch(name, value):
""" Verifies if hardware switches are in correct states. """
my_gpio = gpio.Gpio()
my_gpio.setup()
try:
DebugMsg('reading GPIO %s (expected %s)' % (name, value))
if value != my_gpio.read(name):
VerboseMsg('VerifySwitch: %s is not in proper state.' % name)
return False
except:
ErrorDie('VerifySwitch: cannot read: ' + name)
return True
def EnableWriteProtect(target, image):
""" Enables and verifies firmware write protection. """
return gft_wpfw.EnableWriteProtect(target, image)
def VerboseSystemCommand(command, prompt=None):
""" Invokes command and put stdout/stderr messages into verbose log. """
VerboseMsg(" # Executing command: %s" % command)
(_, stdout, stderr) = gft_common.ShellExecution(
command, show_progress=g_options.verbose, progress_message=prompt)
message = '\n'.join((stdout.strip(), stderr.strip())).strip()
if message:
VerboseMsg(message)
def GFTCommand(f):
""" Decorator for gooftool base commands. """
def wrapper(*args, **kw):
try:
DebugMsg('@Starting to execute command: %s.' % f.__name__)
result = f(*args, **kw)
DebugMsg('@Executed command: %s. Result: %s' %
(f.__name__, "SUCCESS"
if result is not False else "NOT SUCCESS"))
return result
except gft_common.GFTError, e:
DebugMsg('@Executed command: %s. Result: ERROR - %s' %
(f.__name__, e.value))
raise
except:
DebugMsg('@Executed command: %s. Result: FAILED.' % f.__name__)
raise
wrapper.__name__ = f.__name__
wrapper.__doc__ = re.sub(r'\s+', ' ', f.__doc__).strip()
g_commands.append(wrapper)
return wrapper
def GFTCommandDependency(command_list):
"""Decorator to define the depenedncy of a gooftool command.
Args
command_list: a list or tuple of command functions to be executed in order.
Raises
GFTError if any of the commands in dependency return False.
"""
def meta_wrapper(f):
def wrapper(*args, **kargs):
for command in command_list:
if not command():
ErrorDie('%s: failed in sub-command: %s.' %
(f.__name__, command.__name__))
return f(*args, **kargs)
wrapper.__name__ = f.__name__
wrapper.__doc__ = ('Runs: %s. %s' %
(', '.join(['--%s' % command.__name__
for command in command_list]),
f.__doc__))
return wrapper
return meta_wrapper
########################################################################
# standard procedure
# (function names are used for self-reflection to command line parameters)
# HWID, GBB, and hardware components validation
@GFTCommand
def write_hwid():
"""Write specified HWID value into the system GBB."""
if not g_options.hwid:
ErrorDie('write_hwid: No HWID specified (must specify using --hwid arg).')
hwid = g_options.hwid
DebugMsg('write_hwid: using hwid string %r' % hwid)
main_fw_file = crosfw.LoadMainFirmware().GetFileName()
cmd = ('gbb_utility --set --hwid="%s" "%s"' % (hwid, main_fw_file))
gft_common.System(cmd)
flashrom = crosfw.Flashrom(crosfw.TARGET_MAIN)
flashrom.Write(filename=main_fw_file, sections=['GBB'])
return True
@GFTCommand
def run_probe():
"""Print yaml-formatted breakdown of probed device properties."""
SetupLogging(logging.DEBUG, g_options.probe_log)
probe_results = probe.Probe()
print yaml.dump(probe_results.__dict__, default_flow_style=False)
return True
@GFTCommand
def verify_hwid():
"""Verify system HWID properties match probed device properties.
First probe components, volatile and initial_config parameters for
the DUT. Then use the available device data to produce a list of
candidate HWIDs. Then verify the HWID from the DUT is present in
that list. Finally, verify that the DUT initial config values match
those specified for its HWID.
"""
main_fw_file = crosfw.LoadMainFirmware().GetFileName()
gbb_result = gft_common.SystemOutput(
'gbb_utility -g --hwid %s' % main_fw_file)
hwid = re.findall(r'hardware_id:(.*)', gbb_result)[0].strip()
DebugMsg('system HWID: %r\n' % hwid)
try:
data = hwid_tool.ReadDatastore(g_options.db_path)
hwid_properties = hwid_tool.LookupHwidProperties(data, hwid)
board = hwid_properties.board
DebugMsg('expected system properties:\n%s' %
yaml.dump(hwid_properties.__dict__, default_flow_style=False))
probe_results = probe.Probe()
cooked_results = hwid_tool.CookProbeResults(data, probe_results, board)
DebugMsg('found system properties:\n%s' %
yaml.dump(cooked_results.__dict__, default_flow_style=False))
global g_cached_device_data
g_cached_device_data = Obj(
hwid_properties=hwid_properties,
cooked_results=cooked_results)
except hwid_tool.Error, e:
ErrorDie('verify_hwid: FAILED.\n%s' % e)
match_errors = []
for comp_class, expected_name in hwid_properties.component_map.items():
if expected_name == 'ANY':
continue
if expected_name == cooked_results.matched_components.get(comp_class, None):
continue
if comp_class in probe_results.missing_components:
match_errors.append(' %s component mismatch, expected %s, found nothing'
% (comp_class, expected_name))
else:
probe_value = probe_results.found_components.get(comp_class, None)
match_errors.append(' %s component mismatch, expected %s, found %r' %
(comp_class, expected_name, probe_value))
if match_errors:
ErrorDie('verify_hwid: FAILED.\n%s' % '\n'.join(match_errors))
if hwid_properties.volatile not in cooked_results.matched_volatile_tags:
msg = (' HWID specified volatile %s, but found match only for %s' %
(hwid_properties.volatile,
', '.join(cooked_results.matched_volatile_tags)))
ErrorDie('verify_hwid: FAILED.\n%s' % msg)
if (hwid_properties.initial_config is not None and
hwid_properties.initial_config not in
cooked_results.matched_initial_config_tags):
msg = (' HWID specified initial_config %s, but found match only for %s' %
(hwid_properties.initial_config,
', '.join(cooked_results.matched_initial_config_tags)))
ErrorDie('verify_hwid: FAILED.\n%s' % msg)
# TODO(tammo): Verify HWID status is supported (or deprecated for RMA).
# TODO(tammo): Verify VPD here, using hwid_properties.vpd_ro_field_list.
print 'Verified: %s' % hwid
return True
@GFTCommand
def verify_keys():
""" Verifies if the keys in firmware and SSD are matched. """
script = os.path.join(os.path.split(sys.argv[0])[0], 'gft_verify_keys.sh')
if not os.path.exists(script):
ErrorDie('verify_keys: cannot find script to verify.')
kernel_device = GetReleaseKernelPartitionPath()
main_fw_path = crosfw.LoadMainFirmware().GetFileName()
command = '%s %s %s' % (script, kernel_device, main_fw_path)
try:
VerboseSystemCommand(command, prompt='verify_keys: ')
except gft_common.GFTError, e:
ErrorDie('verify_keys: FAILED.\n%s' % e.value)
return True
@GFTCommand
def verify_vpd():
""" Verifies if the VPD contains valid required data. """
try:
# TODO(tammo): Verify that all of the VPD fields specified for the
# board (using hwid properties) have values.
image_file = crosfw.LoadMainFirmware().GetFileName()
gft_vpd.ValidateVpdData(image_file, g_options.verbose)
gft_vpd.SetFirmwareBitmapLocale(image_file)
except gft_common.GFTError, e:
ErrorDie('verify_vpd: FAILED.\n%s' % e.value)
return True
@GFTCommand
def verify_system_time():
"""Verifies if system time is later than release filesystem creation time."""
script = os.path.join(os.path.split(sys.argv[0])[0],
'gft_verify_system_time.sh')
if not os.path.exists(script):
ErrorDie('verify_system_time: cannot find script to verify.')
rootfs_device = GetReleaseRootPartitionPath()
command = '%s %s' % (script, rootfs_device)
try:
VerboseSystemCommand(command, prompt='verify_system_time: ')
except gft_common.GFTError, e:
ErrorDie('verify_system_time: FAILED.\n%s' % e.value)
return True
@GFTCommand
def verify_rootfs():
""" Verifies if the rootfs on SSD is valid (by checking hash). """
script = os.path.join(os.path.split(sys.argv[0])[0], 'gft_verify_rootfs.sh')
if not os.path.exists(script):
ErrorDie('verify_rootfs: cannot find script to verify.')
rootfs_device = GetReleaseRootPartitionPath()
command = '%s %s' % (script, rootfs_device)
try:
VerboseSystemCommand(command, prompt='verify_rootfs: ')
except gft_common.GFTError, e:
ErrorDie('verify_rootfs: FAILED.\n%s' % e.value)
return True
# Hardware Switches
@GFTCommand
def verify_switch_wp():
""" Verifies if hardware write protection switch is enabled. """
if VerifySwitch('write_protect', 1):
return True
ErrorDie('verify_switch_wp: failed')
@GFTCommand
def verify_switch_dev():
""" Verifies if developer switch is disabled. """
if VerifySwitch('developer_switch', 0):
return True
ErrorDie('verify_switch_dev: failed')
# Report Creation
@GFTCommand
def create_report():
""" Creates the report (with vpd, hwid, date, ...) for uploading. """
# decide log format
if g_options.report_format == 'gz':
text_armed = False
elif g_options.report_format == 'base64':
text_armed = True
else:
ErrorDie('create_report: invalid format: %s' % g_options.report_format)
tag = g_options.report_tag if g_options.report_tag else ''
if g_cached_device_data is None:
# TODO(tammo): Refactor verify_hwid to extract the probing and
# cooking from the actual validation, so that we can generate
# reports for machines where the hwid validation fails.
ErrorDie('create_report: must run verify_hwid first')
vpd_source = crosfw.LoadMainFirmware().GetFileName()
report = gft_report.CreateReport(sys.argv,
system_details=g_cached_device_data,
verbose_log_path=g_options.log_path,
vpd_source=vpd_source,
tag=tag,
verbose=g_options.debug)
data = gft_report.EncodeReport(report, text_armed=text_armed)
if not g_options.report_path:
g_options.report_path = os.path.join(tempfile.gettempdir(),
gft_report.GetReportName(data))
gft_common.WriteFile(g_options.report_path, data)
report_saved_message = 'Report saved in: %s' % g_options.report_path
# To help scripting, the message must be printed to console and logged.
print report_saved_message
gft_common.Log(report_saved_message)
return True
@GFTCommand
def upload_report():
""" Uploads a report (by --create_report) for tracking factory process. """
report_path = g_options.report_path
if not (report_path and os.path.exists(report_path)):
ErrorDie('upload_report: need an existing report file '
'(--report_path or --create_report)')
if not g_options.upload_method:
ErrorDie('upload_report: need proper --upload_method.')
# Always report in standard naming scheme.
report_blob = gft_common.ReadBinaryFile(report_path)
report_name = gft_report.GetReportName(report_blob)
local_file = os.path.join(tempfile.gettempdir(), report_name)
if local_file != report_path:
gft_common.WriteBinaryFile(local_file, report_blob)
if not gft_upload.Upload(local_file, g_options.upload_method):
ErrorDie('upload_report: failed to upload.')
return True
@GFTCommand
def wpfw():
""" Enables and verifies firmware write protection status. """
if g_options.debug_dryrun_wpfw:
ErrorMsg('wpfw is by-passed. This device CANNOT be qualified.')
return True
read_binary = gft_common.ReadBinaryFile
ec_fw = crosfw.LoadEcFirmware()
main_fw = crosfw.LoadMainFirmware()
target_set = [('bios', main_fw.GetFileName())]
if ec_fw.GetChipId() is not None:
target_set.insert(0, ('ec', ec_fw.path))
assert target_set[0][0] == 'ec'
else:
VerboseMsg('wpfw: current platform does not have EC. Ignored.')
assert len(target_set) > 0
for target_name, loader_api in target_set:
if not EnableWriteProtect(target_name, read_binary(loader_api())):
ErrorDie('wpfw: failed to enable firmware write protection.')
return True
@GFTCommand
def clear_gbb_flags():
""" Clears GBB Header Flags for shipping devices. """
script = os.path.join(os.path.split(sys.argv[0])[0],
'gft_clear_gbb_flags.sh')
command = '%s' % script
try:
VerboseSystemCommand(command, prompt='clear_gbb_flags: ')
except gft_common.GFTError, e:
ErrorDie('clear_gbb_flags: FAILED.\n%s' % e.value)
return True
@GFTCommand
def prepare_wipe():
""" Prepares the system for transitioning to release state in next reboot. """
if g_options.debug_dryrun_prepare_wipe:
ErrorMsg('prepare_wipe is by-passed. This device CANNOT be qualified.')
return True
wipe_script = os.path.join(os.path.split(sys.argv[0])[0],
'gft_prepare_wipe.sh')
if not os.path.exists(wipe_script):
ErrorDie('prepare_wipe: cannot find script to prepare for wiping.')
wipe_method_map = {
'fast': 'fast',
'secure': '',
}
method = g_options.wipe_method
if method not in wipe_method_map:
ErrorDie('prepare_wipe: unknown wipe method: %s' % method)
tag = wipe_method_map[method]
rootfs_device = GetReleaseRootPartitionPath()
command = 'FACTORY_WIPE_TAGS=%s %s %s' % (tag, wipe_script, rootfs_device)
try:
VerboseSystemCommand(command, prompt='prepare_wipe: ')
except gft_common.GFTError, e:
ErrorDie('prepare_wipe: FAILED.\n%s' % e.value)
return True
@GFTCommand
@GFTCommandDependency((verify_switch_dev,
verify_switch_wp,
verify_hwid,
verify_vpd,
verify_system_time,
verify_keys,
verify_rootfs))
def verify():
""" Verifies if whole factory process is ready for finalization. """
# See the dependency list for the real commands to be executed.
return True
@GFTCommand
@GFTCommandDependency((clear_gbb_flags,
verify,
wpfw,
create_report,
upload_report,
prepare_wipe))
def finalize():
""" Finalizes all factory tests and transit system into release state. """
# See the dependency list for the real commands to be executed.
return True
@GFTCommand
@GFTCommandDependency((clear_gbb_flags,
verify_hwid,
verify_vpd,
verify_system_time,
verify_keys,
verify_rootfs,
create_report,
upload_report,
prepare_wipe))
def developer_finalize():
""" Finalizes tests and leave the system in developer-friendly mode.
This is similar to 'finalize' command but not enforcing commands that would
prevent developers to restart factory process, for example write protection
(wpfw) and non-developer mode (verify_switch_dev).
"""
# See the dependency list for the real commands to be executed.
return True
########################################################################
# console command line options
def ConsoleParser():
""" command line option parser """
parser = optparse.OptionParser()
# Message control
parser.add_option('-d', '--debug', action='store_true',
help='provide debug messages')
parser.add_option('-v', '--verbose', action='store_true', default=False,
help='provide verbose messages')
parser.add_option('--log_path', metavar='PATH',
default=gft_common.DEFAULT_CONSOLE_LOG_PATH,
help='use the given path for verbose logs.')
# TODO(tammo): Remove this when gooftool converts to use logging module.
parser.add_option('--probe_log', metavar='PATH',
default='/var/log/factory_probe.log',
help='probe process details')
# Debugging
parser.add_option('--debug_dryrun_wpfw', action='store_true',
help='make --wpfw in dry-run mode (debugging only)')
parser.add_option('--debug_dryrun_prepare_wipe', action='store_true',
help='make --prepare_wipe in dry-run mode (debugging only)')
parser.add_option('--ignore_factory_initial', action='store_true',
help='ignore components in config_factory_initial.')
# Configuration
parser.add_option('--config', metavar='PATH',
help='path to the config file (TBD)')
# TODO(tammo): Consider removing db_path option once the hwid_tool
# has meaningful default path searching.
parser.add_option('--db_path', metavar='PATH',
default=hwid_tool.DEFAULT_HWID_DATA_PATH,
help='path to the HWID component database')
# TODO(tammo): Add proper support for per-command required positional args.
parser.add_option('--hwid', metavar='HWID', help='HWID string')
# Reports
parser.add_option('--report_path', metavar='PATH',
help='override the path for report file')
parser.add_option('--report_format', metavar='FORMAT', default='gz',
help='format of the generated report (see gft_report.py)')
parser.add_option('--report_tag', metavar='TAG', default='',
help='tag to be included in report (see gft_report.py), '
'current possible values may include rma, mp, pvt.')
parser.add_option('--upload_method', metavar='METHOD:PARAM',
help='assign the upload method (see gft_upload).')
# Wiping
parser.add_option('--wipe_method', metavar='METHOD', default='secure',
help='assign the wipe method (secure/fast).')
for command in g_commands:
parser.add_option('--%s' % command.__name__,
action='store_true',
help=command.__doc__)
return parser
########################################################################
# main entry
@gft_common.GFTConsole
def _main():
""" main entry """
global g_options, g_args
parser = ConsoleParser()
(g_options, g_args) = parser.parse_args()
if g_args:
parser.error('Un-expected parameter(s): %s\n' % ' '.join(g_args))
if g_options.debug:
g_options.verbose = True
gft_common.SetDebugLevel(g_options.debug)
gft_common.SetVerboseLevel(g_options.verbose, g_options.log_path)
DebugMsg('options: ' + repr(g_options) + '\nargs: ' + repr(g_args), log=False)
# execute commands by order.
executed_commands = 0
return_value = 0
for command in g_commands:
command_name = command.__name__
if not getattr(g_options, command_name):
continue
DebugMsg('COMMAND: ' + command_name, log=False)
if not command():
return_value = 1
executed_commands = executed_commands + 1
if executed_commands == 0:
parser.print_help()
return return_value
if __name__ == '__main__':
sys.exit(_main())