| #!/usr/bin/python |
| # |
| # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """ |
| gooftool: Google Factory Tool, providing all Google Required Test |
| functionality. |
| """ |
| |
| import copy |
| import glob |
| import optparse |
| import os |
| import re |
| import sys |
| |
| ######################################################################## |
| # cros/gft modules |
| |
| import flashrom_util |
| import gft_common |
| import gft_fwhash |
| import gft_hwcomp |
| import gft_report |
| import gft_upload |
| import gft_wpfw |
| import gpio |
| |
| from gft_common import WarningMsg, VerboseMsg, DebugMsg, ErrorMsg, ErrorDie |
| |
| ######################################################################## |
| # global variables and cached objects |
| |
# Parsed command line options and leftover positional arguments. Both stay
# None until _main() stores the result of parser.parse_args() in them.
g_options = None
g_args = None

# cache for instance of HardwareComponents to speed up
# (created on demand by InitializeHardwareComponents)
g_hwcomp = None

# list of commands
# (the GFTCommand decorator appends each wrapped command here, in definition
# order; ConsoleParser and _main iterate it in that same order)
g_commands = []

# partition number of factory and release rootfs. -1 for kernel.
# (verify_keys uses RELEASE_PARTNO - 1 as the release kernel partition)
FACTORY_PARTNO = 3
RELEASE_PARTNO = 5
| |
| ######################################################################## |
| # factory utilities |
| |
| |
def InitializeHardwareComponents(do_probe=True):
  """Returns the shared HardwareComponents detection object.

  The object is created on first use and kept in the module-level g_hwcomp
  cache; later calls reuse it.

  Args:
    do_probe: when true, initialize() is invoked on the object before it is
        returned (on every call, not only on creation).

  Returns:
    The new or cached HardwareComponents object.
  """
  global g_hwcomp
  if not g_hwcomp:
    g_hwcomp = gft_hwcomp.HardwareComponents(verbose=g_options.verbose)
  if do_probe:
    g_hwcomp.initialize()
  return g_hwcomp
| |
| |
def FindComponentsDatabases(db_path_pattern):
  """Expands a glob pattern into the list of component database files.

  Args:
    db_path_pattern: path or glob pattern of the database files.

  Returns:
    The list of matching paths as produced by glob.glob (empty when nothing
    matches).
  """
  return glob.glob(db_path_pattern)
| |
| |
def VerifySwitch(name, value):
  """Verifies if a hardware switch is in the expected state.

  Args:
    name: name of the GPIO to read (passed to gpio.Gpio.read).
    value: the value the GPIO is expected to report.

  Returns:
    True when the value read equals the expected value, False otherwise.
    Errors raised while reading are reported through ErrorDie.
  """
  my_gpio = gpio.Gpio()
  my_gpio.setup()
  try:
    DebugMsg('reading GPIO %s (expected %s)' % (name, value))
    if value != my_gpio.read(name):
      VerboseMsg('VerifySwitch: %s is not in proper state.' % name)
      return False
  except Exception:
    # Catch Exception rather than using a bare except so that
    # KeyboardInterrupt and SystemExit are not misreported as a GPIO read
    # failure.
    ErrorDie('VerifySwitch: cannot read: ' + name)
  return True
| |
| |
def MatchComponentsDatabases(db_path):
  """Finds which qualified component lists the current system satisfies.

  Args:
    db_path: path (or pattern) to the component list database files.

  Returns:
    A dict { db_file: matched_list } with one entry for every database that
    matched without any reported failure.
  """
  if not db_path:
    ErrorDie('VerifyComponents: need --db_path.')
  candidates = FindComponentsDatabases(db_path)
  detector = InitializeHardwareComponents()
  if not detector:
    ErrorDie('VerifyComponents: failed to detect system configuration.')

  results = {}
  for db_file in candidates:
    VerboseMsg('MatchComponentsDatabases: Matching current system by %s' %
               db_file)
    matched, failure = detector.match_current_system(db_file)
    if not failure:
      results[db_file] = matched
      VerboseMsg('Matched: %s' % db_file)
      continue
    # Mismatches are only interesting when debugging.
    DebugMsg('Unmatched for %s:%s\n' % (db_file, detector.pformat(failure)))
  return results
| |
| |
def EnableWriteProtect(target, image):
  """Enables and verifies firmware write protection.

  Thin wrapper that forwards both arguments to gft_wpfw.EnableWriteProtect
  and returns its result unchanged.
  """
  status = gft_wpfw.EnableWriteProtect(target, image)
  return status
| |
| |
def WriteGBB(db_file):
  """Writes a prepared GBB data to system main firmware.

  The current 'bios' firmware is read into a temporary image file, the GBB in
  that image is updated in place from db_file, and the whole image is written
  back through flashrom.

  Args:
    db_file: component list file handed to gft_fwhash.UpdateGBB.

  Returns:
    True on success. Every failing step is reported through ErrorDie.
  """
  flashrom = flashrom_util.flashrom_util(verbose_msg=VerboseMsg,
                                         exception_type=gft_common.GFTError)
  flashrom.select_target('bios')
  image_file = gft_common.GetTemporaryFileName('wgbb')
  try:
    if not flashrom.read_whole_to_file(image_file):
      ErrorDie('WriteGBB: failed to read current firmware.')
    DebugMsg('WriteGBB: updating with %s' % db_file)
    if not gft_fwhash.UpdateGBB(image_file, db_file, in_place=True):
      ErrorDie('WriteGBB: failed to update GBB.')

    # Latest flashrom supports minimal writing, so we don't need to care about
    # managing partial writing sections here. Simply provide the whole image.
    # The image is raw firmware, so it is read in binary mode.
    with open(image_file, 'rb') as image:
      image_data = image.read()
    if not flashrom.write_whole(image_data):
      ErrorDie('WriteGBB: failed to flash firmware to system.')
  finally:
    # Always drop the temporary image, including when a step above failed, so
    # that failed runs do not leave firmware dumps behind.
    if os.path.exists(image_file):
      os.remove(image_file)
  return True
| |
| |
def VerboseSystemCommand(command, prompt=None):
  """Runs a shell command and sends its stdout/stderr to the verbose log.

  Args:
    command: command line handed to gft_common.ShellExecution.
    prompt: optional progress message forwarded to ShellExecution.
  """
  VerboseMsg(" # Executing command: %s" % command)
  execution = gft_common.ShellExecution(
      command, show_progress=g_options.verbose, progress_message=prompt)
  (_, stdout, stderr) = execution
  # stdout first, then stderr; surrounding blank space is trimmed so that a
  # silent command logs nothing.
  captured = [stdout.strip(), stderr.strip()]
  message = '\n'.join(captured).strip()
  if message:
    VerboseMsg(message)
| |
| |
def GFTCommand(f):
  """Decorator for gooftool base commands.

  The returned wrapper logs the start and the outcome of the command through
  DebugMsg, re-raises any exception after logging it, and is appended to
  g_commands so that the command line parser and _main can discover it.

  Args:
    f: the command function. Its name becomes the command line option name
        and its docstring (with whitespace collapsed) becomes the help text.

  Returns:
    The wrapper function registered in g_commands.
  """
  def wrapper(*args, **kw):
    try:
      DebugMsg('@Starting to execute command: %s.' % f.__name__)
      result = f(*args, **kw)
      # Only an explicit False counts as failure in the log message.
      DebugMsg('@Executed command: %s. Result: %s' %
               (f.__name__, "SUCCESS"
                if result is not False else "NOT SUCCESS"))
      return result
    except gft_common.GFTError as e:
      DebugMsg('@Executed command: %s. Result: ERROR - %s' %
               (f.__name__, e.value))
      raise
    except:
      # Deliberately broad: the failure is only logged and then re-raised.
      DebugMsg('@Executed command: %s. Result: FAILED.' % f.__name__)
      raise
  wrapper.__name__ = f.__name__
  # f.__doc__ is None for undocumented functions and when docstrings are
  # stripped (python -OO); fall back to an empty help text instead of letting
  # re.sub raise TypeError while the module is being loaded.
  wrapper.__doc__ = re.sub(r'\s+', ' ', f.__doc__ or '').strip()
  g_commands.append(wrapper)
  return wrapper
| |
| |
def GFTCommandDependency(command_list):
  """Decorator to define the dependency of a gooftool command.

  Args:
    command_list: a list or tuple of command functions to be executed in
        order (each is called without arguments) before the decorated
        command itself.

  Raises:
    GFTError (through ErrorDie) if any of the commands in dependency returns
    a false value.
  """
  def decorate(f):

    def run_with_dependencies(*args, **kargs):
      for dependency in command_list:
        if dependency():
          continue
        ErrorDie('%s: failed in sub-command: %s.' %
                 (f.__name__, dependency.__name__))
      return f(*args, **kargs)

    # Advertise the sub-commands in front of the command's own documentation.
    option_names = ', '.join('--%s' % dependency.__name__
                             for dependency in command_list)
    run_with_dependencies.__name__ = f.__name__
    run_with_dependencies.__doc__ = 'Runs: %s. %s' % (option_names, f.__doc__)
    return run_with_dependencies
  return decorate
| |
| ######################################################################## |
| # standard procedure |
| # (function names are used for self-reflection to command line parameters) |
| |
| # HWID, GBB, and hardware components validation |
| |
@GFTCommand
def probe():
  """ Probes hardware components on current system and find match to given
  component database files for HWID.
  """
  if not g_options.db_path:
    ErrorDie('probe: need --db_path.')
  candidates = FindComponentsDatabases(g_options.db_path)
  detector = InitializeHardwareComponents(do_probe=True)

  # Don't care about GBB elements when probing
  gbb_cids = ['hash_ro_firmware', 'part_id_hwqual', 'version_rw_firmware']
  hit_count = 0
  last_result = {}
  for db_file in candidates:
    VerboseMsg('probe: Matching current system by %s' % db_file)
    last_result, failure = detector.match_current_system(
        db_file, ignored_cids=gbb_cids)
    if failure:
      WarningMsg('Unmatched for %s:%s' % (db_file, detector.pformat(failure)))
      continue
    print('Probed: %s' % db_file)
    hit_count += 1

  # TODO(hungte) we may need to decide a best match during the search
  # What is shown here is the result for the last database that was tried.
  WarningMsg('Current System:%s' % detector.pformat(last_result))
  return hit_count > 0
| |
| |
@GFTCommand
def write_gbb():
  """ Writes system GBB data by assigned components database. """
  # The value of --write_gbb is the component list file to write from.
  comp_db = g_options.write_gbb
  if not comp_db:
    ErrorDie('write_gbb: need a valid component list file.')
  return WriteGBB(comp_db)
| |
| |
@GFTCommand
def verify_hwid():
  """ Verifies if system HWID matches the qualified component list. """
  detector = InitializeHardwareComponents()
  candidates = MatchComponentsDatabases(g_options.db_path)
  count = len(candidates)
  if count > 0:
    first_match = list(candidates.values())[0]
    DebugMsg('Current System:%s\n' % detector.pformat(first_match))
  if count != 1:
    ErrorDie('verify_hwid: failed to match exact one HWID (found %d).' %
             count)
  # Exactly one database matched from here on.
  db_file = list(candidates.keys())[0]
  components = candidates[db_file]
  hwid = components['part_id_hwqual'][0]
  print('Verified: %s (%s)' % (db_file, hwid))
  return True
| |
| |
@GFTCommand
def verify_keys():
  """ Verifies if the keys in firmware and SSD are matched. """
  detector = InitializeHardwareComponents(do_probe=False)
  # The helper script is looked up in the directory of this program.
  tool_dir = os.path.dirname(sys.argv[0])
  script = os.path.join(tool_dir, 'gft_verify_keys.sh')
  if not os.path.exists(script):
    ErrorDie('verify_keys: cannot find script to verify.')
  # The release kernel partition is the one right before the release rootfs.
  kernel_partno = RELEASE_PARTNO - 1
  kernel_device = '/dev/%s' % (detector.get_ssd_name(kernel_partno))
  firmware_image = detector.load_main_firmware()
  command = '%s %s %s' % (script, kernel_device, firmware_image)
  try:
    VerboseSystemCommand(command, prompt='verify_keys: ')
  except gft_common.GFTError as e:
    ErrorDie('verify_keys: FAILED.\n%s' % e.value)
  return True
| |
| |
| # Hardware Switches |
| |
@GFTCommand
def verify_rootfs():
  """ Verifies if the rootfs on SSD is valid (by checking hash). """
  detector = InitializeHardwareComponents(do_probe=False)
  # The helper script is looked up in the directory of this program.
  tool_dir = os.path.dirname(sys.argv[0])
  script = os.path.join(tool_dir, 'gft_verify_rootfs.sh')
  if not os.path.exists(script):
    ErrorDie('verify_rootfs: cannot find script to verify.')
  rootfs_device = '/dev/%s' % (detector.get_ssd_name(RELEASE_PARTNO))
  command = '%s %s' % (script, rootfs_device)
  try:
    VerboseSystemCommand(command, prompt='verify_rootfs: ')
  except gft_common.GFTError as e:
    ErrorDie('verify_rootfs: FAILED.\n%s' % e.value)
  return True
| |
| |
@GFTCommand
def verify_switch_wp():
  """ Verifies if hardware write protection switch is enabled. """
  # The 'write_protect' GPIO is expected to read 1.
  switch_ok = VerifySwitch('write_protect', 1)
  if not switch_ok:
    ErrorDie('verify_switch_wp: failed')
    # ErrorDie presumably raises; if it ever returns, the result is None.
    return None
  return True
| |
| |
@GFTCommand
def verify_switch_dev():
  """ Verifies if developer switch is disabled. """
  # The 'developer_switch' GPIO is expected to read 0.
  switch_ok = VerifySwitch('developer_switch', 0)
  if not switch_ok:
    ErrorDie('verify_switch_dev: failed')
    # ErrorDie presumably raises; if it ever returns, the result is None.
    return None
  return True
| |
| |
| # Report Creation |
| |
@GFTCommand
def create_report():
  """ Creates the report (with vpd, hwid, date, ...) for uploading. """
  # decide log format: each supported format maps to (text_armed, suffix)
  format_settings = {
      'gz': (False, '.log.gz'),
      'base64': (True, '.log'),
  }
  report_format = g_options.report_format
  if report_format not in format_settings:
    ErrorDie('create_report: invalid format: %s' % report_format)
  text_armed, suffix = format_settings[report_format]

  # TODO(hungte) if matching is still too slow, we can try to reused previous
  # cached results.
  matched = MatchComponentsDatabases(g_options.db_path)
  if len(matched) != 1:
    ErrorDie('create_report: HWID is not singularly matched to database.')

  detector = InitializeHardwareComponents(do_probe=False)
  vpd_source = detector.load_main_firmware()
  components = list(matched.values())[0]
  report = gft_report.CreateReport(sys.argv,
                                   components,
                                   verbose_log_path=g_options.log_path,
                                   vpd_source=vpd_source,
                                   verbose=g_options.debug)

  # Printing this to log may double the log files...
  if g_options.verbose:
    sys.stdout.flush()
    # Show a copy with the verbose log replaced by a placeholder; the report
    # that gets encoded below is left untouched.
    summary = copy.deepcopy(report)
    summary['verbose_log'] = ['...(not listed, please check report)...']
    sys.stderr.write("Report Detail:\n%s\n" % gft_report.FormatReport(summary))

  data = gft_report.EncodeReport(report, text_armed=text_armed)
  if not g_options.report_path:
    # create one in temporary folder; the chosen path is stored back into
    # g_options.report_path
    g_options.report_path = gft_common.GetTemporaryFileName(prefix='gftrpt_',
                                                            suffix=suffix)
  gft_common.WriteFile(g_options.report_path, data)

  # To help scripting, the message must be printed to console and logged.
  saved_message = 'Report saved in: %s' % g_options.report_path
  print(saved_message)
  gft_common.Log(saved_message)
  return True
| |
| |
@GFTCommand
def upload_report():
  """ Uploads a report (by --create_report) for tracking factory process. """
  report_path = g_options.report_path
  if not report_path or not os.path.exists(report_path):
    ErrorDie('upload_report: need an existing report file '
             '(--report_path or --create_report)')
  method = g_options.upload_method
  if not method:
    ErrorDie('upload_report: need proper --upload_method.')
  uploaded = gft_upload.Upload(report_path, method)
  if not uploaded:
    ErrorDie('upload_report: failed to upload.')
  return True
| |
| |
@GFTCommand
def wpfw():
  """ Enables and verifies firmware write protection status. """
  if g_options.debug_dryrun_wpfw:
    ErrorMsg('wpfw is by-passed. This device CANNOT be qualified.')
    return True
  detector = InitializeHardwareComponents(do_probe=False)

  # Targets are (name, firmware loader) pairs. The EC, when the platform has
  # one, is handled before the main ('bios') firmware.
  targets = []
  if detector.has_ec():
    targets.append(('ec', detector.load_ec_firmware))
  else:
    VerboseMsg('wpfw: current platform does not have EC. Ignored.')
  targets.append(('bios', detector.load_main_firmware))

  for target_name, load_image in targets:
    image = gft_common.ReadBinaryFile(load_image())
    if not EnableWriteProtect(target_name, image):
      ErrorDie('wpfw: failed to enable firmware write protection.')
  return True
| |
| |
@GFTCommand
def prepare_wipe():
  """ Prepares the system for transitioning to release state in next reboot. """
  if g_options.debug_dryrun_prepare_wipe:
    ErrorMsg('prepare_wipe is by-passed. This device CANNOT be qualified.')
    return True

  # The helper script is looked up in the directory of this program.
  tool_dir = os.path.dirname(sys.argv[0])
  wipe_script = os.path.join(tool_dir, 'gft_prepare_wipe.sh')
  if not os.path.exists(wipe_script):
    ErrorDie('prepare_wipe: cannot find script to prepare for wiping.')

  # Value placed in FACTORY_WIPE_TAGS for each supported --wipe_method.
  tags_by_method = {'fast': 'fast', 'secure': ''}
  method = g_options.wipe_method
  detector = InitializeHardwareComponents(do_probe=False)
  if method not in tags_by_method:
    ErrorDie('prepare_wipe: unknown wipe method: %s' % method)
  tag = tags_by_method[method]

  rootfs_device = '/dev/%s' % (detector.get_ssd_name(RELEASE_PARTNO))
  command = 'FACTORY_WIPE_TAGS=%s %s %s' % (tag, wipe_script, rootfs_device)
  try:
    VerboseSystemCommand(command, prompt='prepare_wipe: ')
  except gft_common.GFTError as e:
    ErrorDie('prepare_wipe: FAILED.\n%s' % e.value)
  return True
| |
| |
@GFTCommand
@GFTCommandDependency((verify_switch_dev,
                       verify_switch_wp,
                       verify_hwid,
                       verify_keys,
                       verify_rootfs))
def verify():
  """ Verifies if whole factory process is ready for finalization. """
  # See the dependency list for the real commands to be executed: the
  # GFTCommandDependency wrapper runs them in the listed order before this
  # body, which has nothing left to do.
  return True
| |
| |
@GFTCommand
@GFTCommandDependency((verify,
                       wpfw,
                       create_report,
                       upload_report,
                       prepare_wipe))
def finalize():
  """ Finalizes all factory tests and transit system into release state. """
  # See the dependency list for the real commands to be executed: the
  # GFTCommandDependency wrapper runs them in the listed order before this
  # body, which has nothing left to do.
  return True
| |
| |
| ######################################################################## |
| # console command line options |
| |
def ConsoleParser():
  """Builds the command line option parser.

  Besides the fixed options, one '--<name>' option is added for every command
  registered in g_commands, using the command's docstring as help text.

  Returns:
    The configured optparse.OptionParser.
  """
  parser = optparse.OptionParser()

  # (flag, add_option keyword arguments), registered in this order.
  fixed_options = (
      # Message control
      ('--debug', dict(action='store_true',
                       help='provide debug messages')),
      ('--verbose', dict(action='store_true', default=False,
                         help='provide verbose messages')),
      ('--log_path', dict(metavar='PATH',
                          default=gft_common.DEFAULT_CONSOLE_LOG_PATH,
                          help='use the given path for verbose logs.')),
      # Debugging
      ('--debug_dryrun_wpfw',
       dict(action='store_true',
            help='make --wpfw in dry-run mode (debugging only)')),
      ('--debug_dryrun_prepare_wipe',
       dict(action='store_true',
            help='make --prepare_wipe in dry-run mode (debugging only)')),
      # Configuration
      ('--config', dict(metavar='PATH',
                        help='path to the config file (TBD)')),
      ('--db_path', dict(metavar='PATH',
                         help='path to the HWID component list databases')),
      # Reports
      ('--report_path', dict(metavar='PATH',
                             help='override the path for report file')),
      ('--report_format',
       dict(metavar='FORMAT', default='gz',
            help='format of the generated report (see gft_report.py)')),
      ('--upload_method',
       dict(metavar='METHOD:PARAM',
            help='assign the upload method (see gft_upload).')),
      # Wiping
      ('--wipe_method', dict(metavar='METHOD', default='secure',
                             help='assign the wipe method (secure/fast).')),
  )
  for flag, settings in fixed_options:
    parser.add_option(flag, **settings)

  # Commands: those listed here take a value, all others are plain switches.
  commands_with_param = {
      write_gbb: 'comp_db',
  }
  for command in g_commands:
    settings = {'help': command.__doc__}
    if command in commands_with_param:
      settings['metavar'] = commands_with_param[command]
    else:
      settings['action'] = 'store_true'
    parser.add_option('--%s' % command.__name__, **settings)
  return parser
| |
| |
| ######################################################################## |
| # main entry |
| |
| |
@gft_common.GFTConsole
def _main():
  """ main entry """
  global g_options, g_args
  parser = ConsoleParser()
  g_options, g_args = parser.parse_args()
  if g_args:
    parser.error('Un-expected parameter(s): %s\n' % ' '.join(g_args))

  # --debug implies --verbose.
  if g_options.debug:
    g_options.verbose = True
  gft_common.SetDebugLevel(g_options.debug)
  gft_common.SetVerboseLevel(g_options.verbose, g_options.log_path)
  DebugMsg('options: %r\nargs: %r' % (g_options, g_args), log=False)

  # execute commands by order (the order they were registered in g_commands,
  # not the order they appear on the command line).
  exit_code = 0
  ran_any = False
  for command in g_commands:
    name = command.__name__
    if getattr(g_options, name):
      DebugMsg('COMMAND: ' + name, log=False)
      if not command():
        exit_code = 1
      ran_any = True

  # Nothing was requested: show the usage instead.
  if not ran_any:
    parser.print_help()
  return exit_code
| |
| |
# Run only when executed as a script; the value returned by _main() is used as
# the process exit status.
if __name__ == '__main__':
  sys.exit(_main())