| #!/usr/bin/python |
| # |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """ |
| gooftool: Google Factory Tool, providing all Google Required Test |
| functionality. |
| """ |
| |
| |
| # TODO(tammo): Refactor to match Chromium style. |
| |
| |
| import copy |
| import glob |
| import optparse |
| import os |
| import re |
| import sys |
| import tempfile |
| |
| ######################################################################## |
| # cros/gft modules |
| |
| import crosfw |
| import gft_common |
| import gft_report |
| import gft_upload |
| import gft_vpd |
| import gft_wpfw |
| import gpio |
| import hwid_tool |
| import probe |
| |
| from common import Obj |
| from gft_common import WarningMsg, VerboseMsg, DebugMsg, ErrorMsg, ErrorDie |
| |
| |
| ######################################################################## |
| # global variables and cached objects |
| |
# Parsed command line options and leftover positional arguments.  Both are
# assigned by _main() from the result of the option parser.
g_options = None
g_args = None

# list of commands
# Each function decorated with GFTCommand is appended here; _main() walks this
# list in order to decide which commands to execute.
g_commands = []

# gooftool version
VERSION = 2

# cached objects
# Device data stored by verify_hwid() and later consumed by create_report().
g_cached_device_data = None
| |
| |
| ######################################################################## |
| # factory utilities |
| |
| |
def GetPrimaryDevicePath(partition=None):
  """Returns the path of the primary (non-removable) storage device.

  Candidate devices are those holding a partition reported by
  'cgpt find -t rootfs', restricted to devices whose sysfs 'removable'
  attribute reads '0'.

  Args:
    partition: optional partition number.  When given, the path of that
        partition on the primary device is returned instead of the path of
        the device itself.

  Returns:
    The device path (for example '/dev/sda' or '/dev/mmcblk0'), or the
    partition path (for example '/dev/sda5' or '/dev/mmcblk0p5') when
    partition is given.

  Calls ErrorDie unless exactly one fixed device is found.
  """
  def isfixed(dev):
    sysfs_path = '/sys/block/%s/removable' % dev
    if not os.path.exists(sysfs_path):
      return False
    # Use a context manager so the sysfs file handle is not leaked.
    with open(sysfs_path) as removable_file:
      return removable_file.read().strip() == '0'

  # Partition paths such as /dev/sda3 (device name is purely alphabetic).
  alpha_re = re.compile(r'^/dev/([a-zA-Z]+)[0-9]+$')
  # Partition paths such as /dev/mmcblk0p3 (device name ends with a digit).
  alnum_re = re.compile(r'^/dev/([a-zA-Z]+[0-9]+)p[0-9]+$')
  dev_set = set(
      dev
      for path in gft_common.SystemOutput('cgpt find -t rootfs').split()
      for dev in alpha_re.findall(path) + alnum_re.findall(path)
      if isfixed(dev))
  if len(dev_set) != 1:
    ErrorDie('zero or multiple primary devs: %s' % dev_set)
  dev_path = '/dev/' + dev_set.pop()
  if partition is None:
    return dev_path
  # dev_path is a bare device path without any partition suffix, so it can
  # never match alnum_re (which requires a trailing 'p<N>').  Decide on the
  # 'p' separator by checking whether the device name ends with a digit.
  fmt_str = '%sp%d' if dev_path[-1].isdigit() else '%s%d'
  return fmt_str % (dev_path, partition)
| |
| |
def GetReleaseRootPartitionPath():
  """Returns the path of partition 5 on the primary device.

  This file uses that partition as the release root filesystem.
  """
  release_rootfs_partition = 5
  return GetPrimaryDevicePath(release_rootfs_partition)
| |
| |
def GetReleaseKernelPartitionPath():
  """Returns the path of partition 4 on the primary device.

  This file uses that partition as the release kernel.
  """
  release_kernel_partition = 4
  return GetPrimaryDevicePath(release_kernel_partition)
| |
| |
def VerifySwitch(name, value):
  """ Verifies if hardware switches are in correct states.

  Args:
    name: name of the GPIO switch to read.
    value: the value the switch is expected to have.

  Returns:
    True when the value read from the GPIO equals value, False otherwise.

  Calls ErrorDie when reading the GPIO raises an exception.
  """
  my_gpio = gpio.Gpio()
  my_gpio.setup()
  try:
    DebugMsg('reading GPIO %s (expected %s)' % (name, value))
    if value != my_gpio.read(name):
      VerboseMsg('VerifySwitch: %s is not in proper state.' % name)
      return False
  except Exception as e:
    # Catch only real errors: a bare except would also trap KeyboardInterrupt
    # and SystemExit.  Keep the cause in the debug log before dying.
    DebugMsg('VerifySwitch: reading %s raised %r' % (name, e))
    ErrorDie('VerifySwitch: cannot read: ' + name)
  return True
| |
| |
def EnableWriteProtect(target, image):
  """Enables and verifies firmware write protection via gft_wpfw.

  Args:
    target: name of the firmware target; wpfw() in this file passes 'bios'
        or 'ec'.
    image: firmware image content, forwarded unchanged.

  Returns:
    Whatever gft_wpfw.EnableWriteProtect returns.
  """
  wp_result = gft_wpfw.EnableWriteProtect(target, image)
  return wp_result
| |
| |
def VerboseSystemCommand(command, prompt=None):
  """Runs a shell command and copies its output into the verbose log.

  Args:
    command: the command line handed to gft_common.ShellExecution.
    prompt: optional progress message passed on to ShellExecution.
  """
  VerboseMsg(" # Executing command: %s" % command)
  _, out_text, err_text = gft_common.ShellExecution(
      command, show_progress=g_options.verbose, progress_message=prompt)
  # stdout first, then stderr, separated by a newline; surrounding whitespace
  # is trimmed so that empty output produces no log entry at all.
  combined = (out_text.strip() + '\n' + err_text.strip()).strip()
  if not combined:
    return
  VerboseMsg(combined)
| |
| |
def GFTCommand(f):
  """ Decorator for gooftool base commands.

  The returned wrapper logs (via DebugMsg) when the command starts and how
  it ended, and re-raises any exception unchanged.  The wrapper takes over
  the name of f and a whitespace-normalized copy of its docstring, and is
  appended to g_commands so that ConsoleParser() and _main() can expose it
  as a command line option.

  Args:
    f: the command function to wrap.

  Returns:
    The wrapper function.
  """
  def wrapper(*args, **kw):
    try:
      DebugMsg('@Starting to execute command: %s.' % f.__name__)
      result = f(*args, **kw)
      # Only an explicit False counts as failure; None is logged as success.
      DebugMsg('@Executed command: %s. Result: %s' %
               (f.__name__, "SUCCESS"
                if result is not False else "NOT SUCCESS"))
      return result
    except gft_common.GFTError as e:
      DebugMsg('@Executed command: %s. Result: ERROR - %s' %
               (f.__name__, e.value))
      raise
    except:
      # Log-and-re-raise only, so trapping every exception type is safe here.
      DebugMsg('@Executed command: %s. Result: FAILED.' % f.__name__)
      raise
  wrapper.__name__ = f.__name__
  # A command without a docstring has __doc__ set to None, which re.sub
  # rejects; fall back to an empty help text instead of failing at import.
  wrapper.__doc__ = re.sub(r'\s+', ' ', f.__doc__ or '').strip()
  g_commands.append(wrapper)
  return wrapper
| |
| |
def GFTCommandDependency(command_list):
  """Decorator to define the dependency of a gooftool command.

  The decorated command first invokes every command in command_list (without
  arguments, in the given order) and only then runs its own body.  Its
  docstring is prefixed with the list of dependencies, formatted as command
  line flags.

  Args
    command_list: a list or tuple of command functions to be executed in order.

  Raises
    GFTError if any of the commands in dependency return False.
  """
  def meta_wrapper(f):

    def run_with_dependencies(*args, **kargs):
      for dependency in command_list:
        if dependency():
          continue
        ErrorDie('%s: failed in sub-command: %s.' %
                 (f.__name__, dependency.__name__))
      return f(*args, **kargs)

    dependency_flags = ', '.join(
        '--%s' % dependency.__name__ for dependency in command_list)
    run_with_dependencies.__name__ = f.__name__
    run_with_dependencies.__doc__ = 'Runs: %s. %s' % (dependency_flags,
                                                      f.__doc__)
    return run_with_dependencies
  return meta_wrapper
| |
| |
| ######################################################################## |
| # standard procedure |
| # (function names are used for self-reflection to command line parameters) |
| |
| # HWID, GBB, and hardware components validation |
| |
| |
@GFTCommand
def write_hwid():
  """Write specified HWID value into the system GBB."""
  # The HWID comes from the --hwid option.  The loaded main firmware image
  # file is patched with gbb_utility, then only its GBB section is written
  # back through flashrom.
  new_hwid = g_options.hwid
  if not new_hwid:
    ErrorDie('write_hwid: No HWID specified (must specify using --hwid arg).')
  DebugMsg('write_hwid: using hwid string %r' % new_hwid)
  firmware_image = crosfw.LoadMainFirmware().path
  gft_common.System(
      'gbb_utility --set --hwid="%s" "%s"' % (new_hwid, firmware_image))
  crosfw.Flashrom(crosfw.TARGET_MAIN).Write(filename=firmware_image,
                                            sections=['GBB'])
  return True
| |
| |
@GFTCommand
def verify_hwid():
  """Verify system HWID properties match probed device properties.

  First probe components, volatile and initial_config parameters for
  the DUT.  Then use the available device data to produce a list of
  candidate HWIDs.  Then verify the HWID from the DUT is present in
  that list.  Finally, verify that the DUT initial config values match
  those specified for its HWID.
  """
  # NOTE: the docstring above is shown as the --verify_hwid help text.
  # Side effect: on successful probing, the results are stored in
  # g_cached_device_data for create_report().  Returns True when everything
  # matches; every failure goes through ErrorDie.
  global g_cached_device_data
  main_fw_file = crosfw.LoadMainFirmware().path
  gbb_result = gft_common.SystemOutput('gbb_utility -g --hwid %s' %
                                       main_fw_file)
  hwid_matches = re.findall(r'hardware_id:(.*)', gbb_result)
  if not hwid_matches:
    # Without this guard a missing field surfaces as a bare IndexError.
    ErrorDie('verify_hwid: FAILED.\n'
             ' cannot find hardware_id in gbb_utility output.')
  hwid = hwid_matches[0].strip()
  DebugMsg('system HWID: %r\n' % hwid)
  try:
    data = hwid_tool.ReadDatastore(g_options.db_path)
    hwid_properties = hwid_tool.LookupHwidProperties(data, hwid)
    DebugMsg('expected system properties: %s' % hwid_properties.__dict__)
    probe_results = probe.Probe(data.comp_db.component_registry)
    DebugMsg('found system properties: %s' % probe_results.__dict__)
    cooked_components = hwid_tool.CookComponentProbeResults(
        data.comp_db, probe_results)
    DebugMsg('cooked components: %s' % cooked_components.__dict__)
    device = data.device_db[hwid_properties.board]
    cooked_device_details = hwid_tool.CookDeviceProbeResults(
        device, probe_results)
    DebugMsg('cooked device details: %s' % cooked_device_details.__dict__)
    g_cached_device_data = Obj(
        hwid_properties=hwid_properties,
        probe_results=probe_results,
        cooked_components=cooked_components,
        cooked_device_details=cooked_device_details)
  except hwid_tool.Error as e:
    ErrorDie('verify_hwid: FAILED.\n%s' % e)
  # Compare every component class required by the HWID against what was
  # probed; 'ANY' means the HWID places no constraint on that class.
  match_errors = []
  for comp_class, comp_value in hwid_properties.component_map.items():
    if comp_value == 'ANY':
      continue
    found_value = cooked_components.known.get(comp_class, None)
    if found_value != comp_value:
      if found_value is None:
        # Nothing known was found; report the unrecognized probe value.
        alt_value = cooked_components.unknown.get(comp_class, None)
        found_value = 'UNKNOWN %r' % alt_value
      match_errors.append(' %s component mismatch, expected %s, found %s' %
                          (comp_class, comp_value, found_value))
  if match_errors:
    ErrorDie('verify_hwid: FAILED.\n%s' % '\n'.join(match_errors))
  if hwid_properties.volatile not in cooked_device_details.volatile_set:
    msg = (' HWID specified volatile %s, but found match only for %s' %
           (hwid_properties.volatile,
            ', '.join(cooked_device_details.volatile_set)))
    ErrorDie('verify_hwid: FAILED.\n%s' % msg)
  if (hwid_properties.initial_config not in
      cooked_device_details.initial_config_set):
    msg = (' HWID specified initial_config %s, but found match only for %s' %
           (hwid_properties.initial_config,
            ', '.join(cooked_device_details.initial_config_set)))
    ErrorDie('verify_hwid: FAILED.\n%s' % msg)
  # TODO(tammo): Verify HWID status is supported (or deprecated for RMA).
  # TODO(tammo): Verify release string.
  # TODO(tammo): Verify VPD here, using hwid_properties.vpd_ro_field_list.
  print('Verified: %s' % hwid)
  return True
| |
| |
@GFTCommand
def verify_keys():
  """ Verifies if the keys in firmware and SSD are matched. """
  # The check is delegated to gft_verify_keys.sh, which is expected to sit in
  # the same directory as this program.  It receives the release kernel
  # partition and the main firmware image file as arguments.
  tool_dir = os.path.dirname(sys.argv[0])
  script = os.path.join(tool_dir, 'gft_verify_keys.sh')
  if not os.path.exists(script):
    ErrorDie('verify_keys: cannot find script to verify.')
  command = '%s %s %s' % (script,
                          GetReleaseKernelPartitionPath(),
                          crosfw.LoadMainFirmware().path)
  try:
    VerboseSystemCommand(command, prompt='verify_keys: ')
  except gft_common.GFTError as script_error:
    ErrorDie('verify_keys: FAILED.\n%s' % script_error.value)
  return True
| |
| |
@GFTCommand
def verify_vpd():
  """ Verifies if the VPD contains valid required data. """
  # Validates the VPD found in the main firmware image and then sets the
  # firmware bitmap locale; a GFTError from either step is fatal.
  try:
    # TODO(tammo): Verify that all of the VPD fields specified for the
    # board (using hwid properties) have values.
    firmware_image = crosfw.LoadMainFirmware().path
    gft_vpd.ValidateVpdData(firmware_image, g_options.verbose)
    gft_vpd.SetFirmwareBitmapLocale(firmware_image)
  except gft_common.GFTError as vpd_error:
    ErrorDie('verify_vpd: FAILED.\n%s' % vpd_error.value)
  return True
| |
| |
@GFTCommand
def verify_system_time():
  """Verifies if system time is later than release filesystem creation time."""
  # The check is delegated to gft_verify_system_time.sh, which is expected to
  # sit in the same directory as this program and receives the release rootfs
  # partition as its only argument.
  tool_dir = os.path.dirname(sys.argv[0])
  script = os.path.join(tool_dir, 'gft_verify_system_time.sh')
  if not os.path.exists(script):
    ErrorDie('verify_system_time: cannot find script to verify.')
  command = '%s %s' % (script, GetReleaseRootPartitionPath())
  try:
    VerboseSystemCommand(command, prompt='verify_system_time: ')
  except gft_common.GFTError as script_error:
    ErrorDie('verify_system_time: FAILED.\n%s' % script_error.value)
  return True
| |
| |
@GFTCommand
def verify_rootfs():
  """ Verifies if the rootfs on SSD is valid (by checking hash). """
  # The check is delegated to gft_verify_rootfs.sh, which is expected to sit
  # in the same directory as this program and receives the release rootfs
  # partition as its only argument.
  tool_dir = os.path.dirname(sys.argv[0])
  script = os.path.join(tool_dir, 'gft_verify_rootfs.sh')
  if not os.path.exists(script):
    ErrorDie('verify_rootfs: cannot find script to verify.')
  command = '%s %s' % (script, GetReleaseRootPartitionPath())
  try:
    VerboseSystemCommand(command, prompt='verify_rootfs: ')
  except gft_common.GFTError as script_error:
    ErrorDie('verify_rootfs: FAILED.\n%s' % script_error.value)
  return True
| |
| |
| # Hardware Switches |
| |
@GFTCommand
def verify_switch_wp():
  """ Verifies if hardware write protection switch is enabled. """
  # The 'write_protect' GPIO must read 1; anything else is fatal.
  switch_ok = VerifySwitch('write_protect', 1)
  if switch_ok:
    return True
  ErrorDie('verify_switch_wp: failed')
| |
| |
@GFTCommand
def verify_switch_dev():
  """ Verifies if developer switch is disabled. """
  # The 'developer_switch' GPIO must read 0; anything else is fatal.
  switch_ok = VerifySwitch('developer_switch', 0)
  if switch_ok:
    return True
  ErrorDie('verify_switch_dev: failed')
| |
| |
| # Report Creation |
| |
@GFTCommand
def create_report():
  """ Creates the report (with vpd, hwid, date, ...) for uploading. """
  # Maps each accepted --report_format value to the text_armed flag handed to
  # gft_report.EncodeReport.
  text_armed_by_format = {'gz': False, 'base64': True}
  report_format = g_options.report_format
  if report_format not in text_armed_by_format:
    ErrorDie('create_report: invalid format: %s' % report_format)
  text_armed = text_armed_by_format[report_format]

  tag = g_options.report_tag or ''

  if g_cached_device_data is None:
    # TODO(tammo): Refactor verify_hwid to extract the probing and
    # cooking from the actual validation, so that we can generate
    # reports for machines where the hwid validation fails.
    ErrorDie('create_report: must run verify_hwid first')

  report = gft_report.CreateReport(
      sys.argv,
      system_details=g_cached_device_data,
      verbose_log_path=g_options.log_path,
      vpd_source=crosfw.LoadMainFirmware().path,
      tag=tag,
      verbose=g_options.debug)
  encoded_report = gft_report.EncodeReport(report, text_armed=text_armed)

  # Without --report_path, store the report in the temporary directory under
  # its standard name.  The chosen path is written back into the options,
  # where upload_report reads it.
  if not g_options.report_path:
    default_name = gft_report.GetReportName(encoded_report)
    g_options.report_path = os.path.join(tempfile.gettempdir(), default_name)

  gft_common.WriteFile(g_options.report_path, encoded_report)
  # To help scripting, the message must be printed to console and logged.
  report_saved_message = 'Report saved in: %s' % g_options.report_path
  print(report_saved_message)
  gft_common.Log(report_saved_message)
  return True
| |
| |
@GFTCommand
def upload_report():
  """ Uploads a report (by --create_report) for tracking factory process. """
  source_path = g_options.report_path
  if not source_path or not os.path.exists(source_path):
    ErrorDie('upload_report: need an existing report file '
             '(--report_path or --create_report)')
  if not g_options.upload_method:
    ErrorDie('upload_report: need proper --upload_method.')

  # Always report in standard naming scheme.
  # If the report lives elsewhere, it is first copied into the temporary
  # directory under its standard name, and that copy is uploaded.
  blob = gft_common.ReadBinaryFile(source_path)
  standard_path = os.path.join(tempfile.gettempdir(),
                               gft_report.GetReportName(blob))
  if standard_path != source_path:
    gft_common.WriteBinaryFile(standard_path, blob)
  uploaded = gft_upload.Upload(standard_path, g_options.upload_method)
  if not uploaded:
    ErrorDie('upload_report: failed to upload.')
  return True
| |
| |
@GFTCommand
def wpfw():
  """ Enables and verifies firmware write protection status. """
  # With --debug_dryrun_wpfw the whole step is skipped (and loudly reported).
  if g_options.debug_dryrun_wpfw:
    ErrorMsg('wpfw is by-passed. This device CANNOT be qualified.')
    return True
  ec_fw = crosfw.LoadEcFirmware()
  main_fw = crosfw.LoadMainFirmware()
  # (target name, firmware image file path) pairs.  When the platform has an
  # EC, it is placed before the main firmware.
  target_set = [('bios', main_fw.path)]
  if ec_fw.chip_id is not None:
    target_set.insert(0, ('ec', ec_fw.path))
  else:
    VerboseMsg('wpfw: current platform does not have EC. Ignored.')
  for target_name, image_path in target_set:
    # The entries are file paths, not loader callables: read the image file
    # directly instead of trying to call the path.
    image = gft_common.ReadBinaryFile(image_path)
    if not EnableWriteProtect(target_name, image):
      ErrorDie('wpfw: failed to enable firmware write protection.')
  return True
| |
| |
@GFTCommand
def clear_gbb_flags():
  """ Clears GBB Header Flags for shipping devices. """
  # The work is delegated to gft_clear_gbb_flags.sh, looked up in the same
  # directory as this program and run without arguments.
  tool_dir = os.path.dirname(sys.argv[0])
  script = os.path.join(tool_dir, 'gft_clear_gbb_flags.sh')
  try:
    VerboseSystemCommand('%s' % script, prompt='clear_gbb_flags: ')
  except gft_common.GFTError as script_error:
    ErrorDie('clear_gbb_flags: FAILED.\n%s' % script_error.value)
  return True
| |
| |
@GFTCommand
def prepare_wipe():
  """ Prepares the system for transitioning to release state in next reboot. """
  # With --debug_dryrun_prepare_wipe the whole step is skipped (and loudly
  # reported).
  if g_options.debug_dryrun_prepare_wipe:
    ErrorMsg('prepare_wipe is by-passed. This device CANNOT be qualified.')
    return True
  tool_dir = os.path.dirname(sys.argv[0])
  wipe_script = os.path.join(tool_dir, 'gft_prepare_wipe.sh')
  if not os.path.exists(wipe_script):
    ErrorDie('prepare_wipe: cannot find script to prepare for wiping.')
  # Value of FACTORY_WIPE_TAGS for each accepted --wipe_method.
  tags_by_method = {'fast': 'fast', 'secure': ''}
  requested_method = g_options.wipe_method
  if requested_method not in tags_by_method:
    ErrorDie('prepare_wipe: unknown wipe method: %s' % requested_method)
  wipe_tags = tags_by_method[requested_method]
  # The tags travel to the script through its environment; the release rootfs
  # partition is its only argument.
  command = 'FACTORY_WIPE_TAGS=%s %s %s' % (wipe_tags, wipe_script,
                                            GetReleaseRootPartitionPath())
  try:
    VerboseSystemCommand(command, prompt='prepare_wipe: ')
  except gft_common.GFTError as script_error:
    ErrorDie('prepare_wipe: FAILED.\n%s' % script_error.value)
  return True
| |
| |
# Aggregate command: the commands listed in GFTCommandDependency are invoked
# one by one, in the order written here, before the (trivial) body runs.  A
# listed command returning a false value aborts the run through ErrorDie.
@GFTCommand
@GFTCommandDependency((verify_switch_dev,
                       verify_switch_wp,
                       verify_hwid,
                       verify_vpd,
                       verify_system_time,
                       verify_keys,
                       verify_rootfs))
def verify():
  """ Verifies if whole factory process is ready for finalization. """
  # See the dependency list for the real commands to be executed.
  # The docstring above becomes part of the --verify help text.
  return True
| |
| |
# Aggregate command: the commands listed in GFTCommandDependency are invoked
# one by one, in the order written here, before the (trivial) body runs.  A
# listed command returning a false value aborts the run through ErrorDie.
@GFTCommand
@GFTCommandDependency((clear_gbb_flags,
                       verify,
                       wpfw,
                       create_report,
                       upload_report,
                       prepare_wipe))
def finalize():
  """ Finalizes all factory tests and transit system into release state. """
  # See the dependency list for the real commands to be executed.
  # The docstring above becomes part of the --finalize help text.
  return True
| |
| |
# Aggregate command: the commands listed in GFTCommandDependency are invoked
# one by one, in the order written here, before the (trivial) body runs.  A
# listed command returning a false value aborts the run through ErrorDie.
# Compared with finalize, this list leaves out wpfw and the two hardware
# switch checks (verify_switch_dev, verify_switch_wp).
@GFTCommand
@GFTCommandDependency((clear_gbb_flags,
                       verify_hwid,
                       verify_vpd,
                       verify_system_time,
                       verify_keys,
                       verify_rootfs,
                       create_report,
                       upload_report,
                       prepare_wipe))
def developer_finalize():
  """ Finalizes tests and leave the system in developer-friendly mode.

  This is similar to 'finalize' command but not enforcing commands that would
  prevent developers to restart factory process, for example write protection
  (wpfw) and non-developer mode (verify_switch_dev).
  """
  # See the dependency list for the real commands to be executed.
  # The docstring above becomes part of the --developer_finalize help text.
  return True
| |
| |
| ######################################################################## |
| # console command line options |
| |
def ConsoleParser():
  """Builds the command line option parser.

  Besides the fixed options, one boolean flag is added for every command
  registered in g_commands, named after the command function and using the
  command docstring as help text.

  Returns:
    The configured optparse.OptionParser.
  """
  parser = optparse.OptionParser()
  add = parser.add_option

  # Message control
  add('-d', '--debug', action='store_true',
      help='provide debug messages')
  add('-v', '--verbose', action='store_true', default=False,
      help='provide verbose messages')
  add('--log_path', metavar='PATH',
      default=gft_common.DEFAULT_CONSOLE_LOG_PATH,
      help='use the given path for verbose logs.')

  # Debugging
  add('--debug_dryrun_wpfw', action='store_true',
      help='make --wpfw in dry-run mode (debugging only)')
  add('--debug_dryrun_prepare_wipe', action='store_true',
      help='make --prepare_wipe in dry-run mode (debugging only)')
  add('--ignore_factory_initial', action='store_true',
      help='ignore components in config_factory_initial.')

  # Configuration
  add('--config', metavar='PATH',
      help='path to the config file (TBD)')
  # TODO(tammo): Consider removing db_path option once the hwid_tool
  # has meaningful default path searching.
  add('--db_path', metavar='PATH',
      default=hwid_tool.DEFAULT_HWID_DATA_PATH,
      help='path to the HWID component database')
  # TODO(tammo): Add proper support for per-command required positional args.
  add('--hwid', metavar='HWID', help='HWID string')

  # Reports
  add('--report_path', metavar='PATH',
      help='override the path for report file')
  add('--report_format', metavar='FORMAT', default='gz',
      help='format of the generated report (see gft_report.py)')
  add('--report_tag', metavar='TAG', default='',
      help='tag to be included in report (see gft_report.py), '
           'current possible values may include rma, mp, pvt.')
  add('--upload_method', metavar='METHOD:PARAM',
      help='assign the upload method (see gft_upload).')

  # Wiping
  add('--wipe_method', metavar='METHOD', default='secure',
      help='assign the wipe method (secure/fast).')

  # One flag per registered command, in registration order.
  for registered in g_commands:
    add('--' + registered.__name__,
        action='store_true',
        help=registered.__doc__)
  return parser
| |
| |
| ######################################################################## |
| # main entry |
| |
| |
@gft_common.GFTConsole
def _main():
  """ main entry """
  # Parses the command line, configures message levels, then runs every
  # selected command in registration order.  Returns 0, or 1 when at least one
  # executed command returned a false value.  Help is printed when no command
  # was selected.
  global g_options, g_args
  parser = ConsoleParser()
  g_options, g_args = parser.parse_args()
  if g_args:
    parser.error('Un-expected parameter(s): %s\n' % ' '.join(g_args))

  # --debug implies --verbose.
  if g_options.debug:
    g_options.verbose = True
  gft_common.SetDebugLevel(g_options.debug)
  gft_common.SetVerboseLevel(g_options.verbose, g_options.log_path)
  DebugMsg('options: %r\nargs: %r' % (g_options, g_args), log=False)

  # execute commands by order.
  return_value = 0
  executed_commands = 0
  for command in g_commands:
    name = command.__name__
    if getattr(g_options, name):
      DebugMsg('COMMAND: ' + name, log=False)
      if not command():
        return_value = 1
      executed_commands += 1

  if not executed_commands:
    parser.print_help()
  return return_value
| |
| |
if __name__ == '__main__':
  # When run as a script, the return value of _main() is passed to sys.exit().
  sys.exit(_main())