blob: e124c9d40c05b12709c105e742741462a380bbc2 [file] [log] [blame]
# Copyright 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import print_function
from collections import namedtuple
from contextlib import contextmanager
from distutils.version import LooseVersion
import logging
import os
import re
import sys
import tempfile
import time
import yaml
import factory_common # pylint: disable=unused-import
from cros.factory.gooftool.bmpblk import unpack_bmpblock
from cros.factory.gooftool.common import Util
from cros.factory.gooftool import crosfw
from cros.factory.gooftool.probe import DeleteRwVpd
from cros.factory.gooftool.probe import DeleteRoVpd
from cros.factory.gooftool.probe import Probe
from cros.factory.gooftool.probe import ReadRoVpd
from cros.factory.gooftool.probe import ReadRwVpd
from cros.factory.gooftool.probe import UpdateRoVpd
from cros.factory.gooftool import wipe
from cros.factory.hwid.v2 import hwid_tool
from cros.factory.hwid.v3 import common as hwid3_common
from cros.factory.hwid.v3.database import Database
from cros.factory.test.l10n import regions
from cros.factory.test.rules import phase
from cros.factory.test.rules.privacy import FilterDict
from cros.factory.utils import file_utils
from cros.factory.utils import json_utils
from cros.factory.utils import service_utils
from cros.factory.utils.type_utils import Error
# A named tuple to store the probed component name and the error if any.
ProbedComponentResult = namedtuple('ProbedComponentResult',
['component_name', 'probed_string', 'error'])
# The mismatch result tuple.
Mismatch = namedtuple('Mismatch', ['expected', 'actual'])
class Gooftool(object):
"""A class to perform hardware probing and verification and to implement
Google required tests.
Properties:
db: The HWID DB. This is lazily loaded the first time it is used.
_db_creator: The function used to create the db object the first time
it is used.
"""
# TODO(andycheng): refactor all other functions in gooftool.py to this.
def __init__(self, probe=None, hwid_version=2,
hardware_db=None, component_db=None,
project=None, hwdb_path=None):
"""Constructor.
Args:
probe: The probe to use for detecting installed components. If not
specified, cros.factory.gooftool.probe.Probe is used.
hwid_version: The HWID version to operate on. Currently there are only two
options: 2 or 3.
hardware_db: The hardware db to use. If not specified, the one in
hwid_tool.DEFAULT_HWID_DATA_PATH is used.
component_db: The component db to use for both component names and
component classes lookup. If not specified,
hardware_db.component.db is used.
project: A string indicating which project-specific component database to
load. If not specified, the project name will be detected with
cros.factory.hwid.ProbeProject(). Used for HWID v3 only.
hwdb_path: The path to load the project-specific component database from.
If not specified, cros.factory.hwid.DEFAULT_HWID_DATA_PATH will be used.
Used for HWID v3 only.
"""
self._hwid_version = hwid_version
if hwid_version == 2:
self._hardware_db = (
hardware_db or
hwid_tool.HardwareDb(hwid_tool.DEFAULT_HWID_DATA_PATH))
self._db_creator = lambda: component_db or self._hardware_db.comp_db
elif hwid_version == 3:
self._project = project or hwid3_common.ProbeProject()
self._hwdb_path = hwdb_path or hwid3_common.DEFAULT_HWID_DATA_PATH
self._db_creator = lambda: Database.LoadFile(
os.path.join(self._hwdb_path, self._project.upper()))
else:
raise ValueError('Invalid HWID version: %r' % hwid_version)
self._probe = probe or Probe
self._util = Util()
self._crosfw = crosfw
self._read_ro_vpd = ReadRoVpd
self._read_rw_vpd = ReadRwVpd
self._delete_ro_vpd = DeleteRoVpd
self._delete_rw_vpd = DeleteRwVpd
self._update_ro_vpd = UpdateRoVpd
self._unpack_bmpblock = unpack_bmpblock
self._named_temporary_file = tempfile.NamedTemporaryFile
self._db = None
@property
def db(self):
"""Lazy loader for the HWID database."""
if not self._db:
self._db = self._db_creator()
# Hopefully not necessary, but just a safeguard to prevent
# accidentally loading the DB multiple times.
del self._db_creator
return self._db
def VerifyComponents(self, component_list):
"""Verifies the given component list against the component db to ensure
the installed components are correct.
Args:
component_list: A list of components to verify.
(e.g., ['camera', 'cpu'])
Returns:
A dict from component class to a list of one or more
ProbedComponentResult tuples.
{component class: [ProbedComponentResult(
component_name, # The component name if found in the db, else None.
probed_string, # The actual probed string. None if probing failed.
error)]} # The error message if there is one.
"""
probeable_classes = self.db.probeable_components.keys()
if not component_list:
raise ValueError('No component classes specified;\n' +
'Possible choices: %s' % probeable_classes)
unknown_class = [component_class for component_class in component_list
if component_class not in probeable_classes]
if unknown_class:
raise ValueError(('Invalid component classes specified: %s\n' +
'Possible choices: %s') %
(unknown_class, probeable_classes))
probe_results = self._probe(
target_comp_classes=component_list,
probe_volatile=False, probe_initial_config=False)
result = {}
for comp_class in sorted(component_list):
probe_vals = probe_results.found_probe_value_map.get(comp_class, None)
if probe_vals is not None:
if isinstance(probe_vals, str):
# Force cast probe_val to be a list so it is easier to process later
probe_vals = [probe_vals]
result_tuples = []
for val in probe_vals:
comp_name = self.db.result_name_map.get(val, None)
if comp_name is not None:
result_tuples.append(ProbedComponentResult(comp_name, val, None))
else:
result_tuples.append(ProbedComponentResult(None, val, (
'unsupported %r component found with probe result'
' %r (no matching name in the component DB)' %
(comp_class, val))))
result[comp_class] = result_tuples
else:
result[comp_class] = [ProbedComponentResult(None, None, (
'missing %r component' % comp_class))]
return result
def VerifyKeys(self, release_rootfs=None, firmware_path=None, _tmpexec=None):
"""Verify keys in firmware and SSD match.
The real partition needed is the kernel partition. However, in order to
share params with other commands, we use release_rootfs and calculate the
real kernel location from it.
Args:
release_rootfs: A string for release image rootfs path.
firmware_path: A string for firmware image file path.
_tmpexec: A function for overriding execution inside temp folder.
"""
if release_rootfs is None:
release_rootfs = self._util.GetReleaseRootPartitionPath()
kernel_dev = self._util.GetReleaseKernelPathFromRootPartition(
release_rootfs)
if firmware_path is None:
firmware_path = self._crosfw.LoadMainFirmware().GetFileName()
firmware_image = self._crosfw.LoadMainFirmware().GetFirmwareImage()
else:
with open(firmware_path, 'rb') as f:
firmware_image = self._crosfw.FirmwareImage(f.read())
with file_utils.TempDirectory() as tmpdir:
def _DefaultTmpExec(message, command, fail_message=None, regex=None):
"""Executes a command inside temp folder (tmpdir).
If regex is specified, return matched string from stdout.
"""
logging.debug(message)
result = self._util.shell('( cd %s; %s )' % (tmpdir, command))
if not result.success:
raise Error(fail_message or
('Failed to %s: %s' % (message, result.stderr)))
if regex:
matched = re.findall(regex, result.stdout)
if matched:
return matched[0]
_TmpExec = _tmpexec if _tmpexec else _DefaultTmpExec
# define key names
key_normal = 'kernel_subkey.vbpubk'
key_normal_a = 'kernel_subkey_a.vbpubk'
key_normal_b = 'kernel_subkey_b.vbpubk'
key_root = 'rootkey.vbpubk'
key_recovery = 'recovery_key.vbpubk'
blob_kern = 'kern.blob'
dir_devkeys = '/usr/share/vboot/devkeys'
logging.debug('dump kernel from %s', kernel_dev)
with open(kernel_dev, 'rb') as f:
# The kernel is usually 8M or 16M, but let's read more.
file_utils.WriteFile(os.path.join(tmpdir, blob_kern),
f.read(64 * 1048576))
logging.debug('extract firmware from %s', firmware_path)
for section in ('GBB', 'FW_MAIN_A', 'FW_MAIN_B', 'VBLOCK_A', 'VBLOCK_B'):
file_utils.WriteFile(os.path.join(tmpdir, section),
firmware_image.get_section(section))
_TmpExec('get keys from firmware GBB',
'futility gbb -g --rootkey %s --recoverykey %s GBB' %
(key_root, key_recovery))
rootkey_hash = _TmpExec(
'unpack rootkey', 'futility vbutil_key --unpack %s' % key_root,
regex=r'(?<=Key sha1sum:).*').strip()
_TmpExec('unpack recoverykey',
'futility vbutil_key --unpack %s' % key_recovery)
# Pre-scan for well-known problems.
if rootkey_hash == 'b11d74edd286c144e1135b49e7f0bc20cf041f10':
logging.warn('YOU ARE TRYING TO FINALIZE WITH DEV ROOTKEY.')
_TmpExec('verify firmware A with root key',
'futility vbutil_firmware --verify VBLOCK_A --signpubkey %s '
' --fv FW_MAIN_A --kernelkey %s' % (key_root, key_normal_a))
_TmpExec('verify firmware B with root key',
'futility vbutil_firmware --verify VBLOCK_B --signpubkey %s '
' --fv FW_MAIN_B --kernelkey %s' % (key_root, key_normal_b))
# Unpack keys and keyblocks
_TmpExec('unpack kernel keyblock',
'futility vbutil_keyblock --unpack %s' % blob_kern)
try:
for key in key_normal_a, key_normal_b:
_TmpExec('unpack %s' % key, 'vbutil_key --unpack %s' % key)
_TmpExec('verify kernel by %s' % key,
'futility vbutil_kernel --verify %s --signpubkey %s' %
(blob_kern, key))
except Error:
_TmpExec('check recovery key signed image',
'! futility vbutil_kernel --verify %s --signpubkey %s' %
(blob_kern, key_recovery),
'YOU ARE USING A RECOVERY KEY SIGNED IMAGE.')
for key in key_normal, key_recovery:
_TmpExec('check dev-signed image <%s>' % key,
'! futility vbutil_kernel --verify %s --signpubkey %s/%s' %
(blob_kern, dir_devkeys, key),
'YOU ARE FINALIZING WITH DEV-SIGNED IMAGE <%s>' %
key)
raise
logging.info('SUCCESS: Verification completed.')
def VerifySystemTime(self, release_rootfs=None, system_time=None,
rma_mode=False):
"""Verify system time is later than release filesystem creation time."""
if release_rootfs is None:
release_rootfs = self._util.GetReleaseRootPartitionPath()
if system_time is None:
system_time = time.time()
e2header = self._util.shell('dumpe2fs -h %s' % release_rootfs)
if not e2header.success:
raise Error('Failed to read file system: %s, %s' %
(release_rootfs, e2header.stderr))
matched = re.findall(r'^Filesystem created: *(.*)', e2header.stdout,
re.MULTILINE)
if not matched:
raise Error('Failed to find file system creation time: %s' %
release_rootfs)
created_time = time.mktime(time.strptime(matched[0]))
logging.debug('Comparing system time <%s> and filesystem time <%s>',
system_time, created_time)
if system_time < created_time:
if not rma_mode:
raise Error('System time (%s) earlier than file system (%s) creation '
'time (%s)' % (system_time, release_rootfs, created_time))
logging.warning('Set system time to file system creation time (%s)',
created_time)
self._util.shell('toybox date @%d' % int(created_time))
def VerifyRootFs(self, release_rootfs=None):
"""Verify rootfs on SSD is valid by checking hash."""
if release_rootfs is None:
release_rootfs = self._util.GetReleaseRootPartitionPath()
device = self._util.GetPartitionDevice(release_rootfs)
# TODO(hungte) Using chromeos_invoke_postinst here is leaving a window
# where unexpected reboot or test exit may cause the system to boot into
# the release image. Currently "cgpt" is very close to the last step of
# postinst so it may be OK, but we should seek for better method for this,
# for example adding a "--nochange_boot_partition" to chromeos-postinst.
try:
# Always rollback GPT changes.
curr_attrs = self._util.GetCgptAttributes(device)
self._util.InvokeChromeOSPostInstall(release_rootfs)
finally:
self._util.SetCgptAttributes(curr_attrs, device)
def VerifyTPM(self):
"""Verify TPM is cleared."""
expected_status = {
'enabled': '1',
'owned': '0'
}
tpm_root = '/sys/class/tpm/tpm0/device'
legacy_tpm_root = '/sys/class/misc/tpm0/device'
# TPM device path has been changed in kernel 3.18.
if not os.path.exists(tpm_root):
tpm_root = legacy_tpm_root
for key, value in expected_status.iteritems():
if open(os.path.join(tpm_root, key)).read().strip() != value:
raise Error('TPM is not cleared.')
def VerifyManagementEngineLocked(self):
"""Verify Management Engine is locked."""
mainfw = self._crosfw.LoadMainFirmware().GetFirmwareImage()
if not mainfw.has_section('SI_ME'):
logging.info('System does not have Management Engine.')
return True
# If ME is locked, it should contain only 0xFFs.
data = mainfw.get_section('SI_ME').strip(chr(0xFF))
if len(data) != 0:
raise Error('ME (ManagementEngine) firmware may be not locked.')
# TODO(hungte) In future we may add more checks using ifdtool. See
# crosbug.com/p/30283 for more information.
logging.info('Management Engine is locked.')
def VerifyVPD(self):
"""Verify that mandatory VPD values are set properly.
Returns:
A dictionary containing verified mandatory fields, for verification.
"""
ro_vpd = self._read_ro_vpd()
mandatory_fields = [
'serial_number', 'region',
]
deprecated_fields = [
# Region fields (deprecated by single 'region').
'initial_locale', 'initial_timezone', 'keyboard_layout',
# Platform and branding fields (deprecated by mosys command).
'customization_id', 'rlz_brand_code', 'model',
]
missing_keys = [key for key in mandatory_fields if key not in ro_vpd]
if missing_keys:
raise Error('Missing mandatory VPD values: %s' % ','.join(missing_keys))
bad_keys = [key for key in deprecated_fields if key in ro_vpd]
if bad_keys:
raise Error('Deprecated VPD values found: %s' % ','.join(bad_keys))
# Check known value contents.
region = ro_vpd['region']
if region not in regions.REGIONS:
raise ValueError('Unknown region: "%s".' % region)
return dict((k, v) for k, v in ro_vpd.iteritems() if k in mandatory_fields)
def VerifyReleaseChannel(self, enforced_channels=None):
"""Verify that release image channel is correct.
Args:
enforced_channels: a list of enforced release image channels, might
be different per board. It should be the subset or the same set
of the allowed release channels.
"""
release_channel = self._util.GetReleaseImageChannel()
allowed_channels = self._util.GetAllowedReleaseImageChannels()
if enforced_channels is None:
enforced_channels = allowed_channels
elif not all(channel in allowed_channels for channel in enforced_channels):
raise Error('Enforced channels are incorrect: %s. '
'Allowed channels are %s.' % (
enforced_channels, allowed_channels))
if not any(channel in release_channel for channel in enforced_channels):
raise Error('Release image channel is incorrect: %s. '
'Enforced channels are %s.' % (
release_channel, enforced_channels))
def VerifyCrosConfig(self):
"""Verify that entries in cros config make sense."""
if phase.GetPhase() >= phase.PVT_DOGFOOD:
# The value actually comes from "cros_config / brand-code", however,
# most scripts are still using "mosys platform brand" to get the value,
# so we also check the value by mosys command.
rlz = self._util.shell(['mosys', 'platform', 'brand']).stdout.strip()
if not rlz or rlz == 'ZZCR':
# this is incorrect...
raise Error('RLZ code "%s" is not allowed in PVT' % rlz)
model = self._util.shell(['mosys', 'platform', 'model']).stdout.strip()
if not model:
raise Error('Model name is empty')
def _ParseCrosConfig(config_path):
with open(config_path) as f:
obj = yaml.load(f)
for config in obj['chromeos']['configs']:
# According to https://crbug.com/1070692, 'platform-name' is not a part
# of identity info. We shouldn't check it.
config['identity'].pop('platform-name', None)
# Per b/169766857, whitelabel-tag is renamed to custom-label-tag.
# Normalize the dictionary keys to 'whitelabel-tag'.
if 'custom-label-tag' in config['identity']:
tag = config['identity'].pop('custom-label-tag', None)
config['identity']['whitelabel-tag'] = tag
fields = ['name', 'identity', 'brand-code']
configs = [
{
field: config[field] for field in fields
}
for config in obj['chromeos']['configs']
if config['name'] == model
]
configs = {
# set sort_keys=True to make the result stable.
json_utils.DumpStr(config, sort_keys=True) for config in configs
}
return configs
# Load config.yaml from release image (FSI) and test image, and compare the
# fields we cared about.
config_path = 'usr/share/chromeos-config/yaml/config.yaml'
test_configs = _ParseCrosConfig(os.path.join('/', config_path))
with sys_utils.MountPartition(
self._util.GetReleaseRootPartitionPath()) as root:
release_configs = _ParseCrosConfig(os.path.join(root, config_path))
if test_configs != release_configs:
error = ['Detect different chromeos-config between test image and FSI.']
error += ['Configs in test image:']
error += ['\t' + config for config in test_configs]
error += ['Configs in FSI:']
error += ['\t' + config for config in release_configs]
raise Error('\n'.join(error))
def ClearGBBFlags(self):
"""Zero out the GBB flags, in preparation for transition to release state.
No GBB flags are set in release/shipping state, but they are useful
for factory/development. See "futility gbb --flags" for details.
"""
result = self._util.shell('/usr/share/vboot/bin/set_gbb_flags.sh 0 2>&1')
if not result.success:
raise Error('Failed setting GBB flags: %s' % result.stdout)
def EnableReleasePartition(self, release_rootfs=None):
"""Enables a release image partition on the disk.
Args:
release_rootfs: path to the release rootfs device. If not specified,
the default (5th) partition will be used.
"""
if not release_rootfs:
release_rootfs = Util().GetReleaseRootPartitionPath()
wipe.EnableReleasePartition(release_rootfs)
def WipeInPlace(self, is_fast=None, shopfloor_url=None,
station_ip=None, station_port=None, wipe_finish_token=None):
"""Start transition to release state directly without reboot.
Args:
is_fast: Whether or not to apply fast wipe.
"""
wipe.WipeInTmpFs(is_fast, shopfloor_url,
station_ip, station_port, wipe_finish_token)
def WipeInit(self, wipe_args, shopfloor_url, state_dev,
release_rootfs, root_disk, old_root, station_ip, station_port,
wipe_finish_token):
"""Start wiping test image."""
wipe.WipeInit(wipe_args, shopfloor_url, state_dev,
release_rootfs, root_disk, old_root, station_ip, station_port,
wipe_finish_token)
def Probe(self, target_comp_classes, fast_fw_probe=False, probe_volatile=True,
probe_initial_config=True, probe_vpd=False):
"""Returns probed results for device components, hash, and initial config
data.
This method is essentially a wrapper for probe.Probe. Please refer to
probe.Probe for more detailed description.
Args:
target_comp_classes: Which component classes to probe for. A None value
implies all classes.
fast_fw_probe: Only probes for firmware versions.
probe_volatile: On False, do not probe for volatile data and
return None for the corresponding field.
probe_initial_config: On False, do not probe for initial_config
data and return None for the corresponding field.
probe_vpd: On True, include vpd data in the volatiles.
Returns:
cros.factory.hwdb.hwid_tool.ProbeResults object containing the probed
results.
"""
return self._probe(target_comp_classes=target_comp_classes,
fast_fw_probe=fast_fw_probe,
probe_volatile=probe_volatile,
probe_initial_config=probe_initial_config,
probe_vpd=probe_vpd)
def WriteHWID(self, hwid=None):
"""Writes specified HWID value into the system BB.
Args:
hwid: The HWID string to be written to the device.
"""
assert hwid
main_fw = self._crosfw.LoadMainFirmware()
fw_filename = main_fw.GetFileName(sections=['GBB'])
self._util.shell(
'futility gbb --set --hwid="%s" "%s"' % (hwid, fw_filename))
main_fw.Write(fw_filename)
def VerifyWPSwitch(self):
"""Verifies hardware write protection switch is enabled.
Raises:
Error when there is an error.
"""
if self._util.shell('crossystem wpsw_cur').stdout.strip() != '1':
raise Error('write protection switch is disabled')
def CheckDevSwitchForDisabling(self):
"""Checks if the developer switch is ready for disabling.
It checks the developer switch is either already disabled or is virtual so
it could be disabled programmatically.
Returns:
Whether or not the developer switch is virtual.
Raises:
Error, if the developer switch is not ready for disabling. i.e. it is not
disabled and it is not virtual.
"""
VBSD_HONOR_VIRT_DEV_SWITCH = 0x400
if (self._util.GetVBSharedDataFlags() & VBSD_HONOR_VIRT_DEV_SWITCH) != 0:
# Note when the system is using virtual developer switch. It could be
# disabled by "crossystem disable_dev_request=1", which is exactly what
# it does in prepare_wipe.sh.
return True
if self._util.GetCurrentDevSwitchPosition() == 0:
return False
raise Error('developer mode is not disabled')
def VerifySnBits(self):
# Add '-n' to dry run.
result = self._util.shell(['/usr/share/cros/cr50-set-sn-bits.sh', '-n'])
stdout = result.stdout.strip()
stderr = result.stderr.strip()
logging.info('status: %d', result.status)
logging.info('stdout: %s', stdout)
logging.info('stderr: %s', stderr)
if result.status != 0:
# Fail reason, either:
# - attested_device_id is not set
# - SN bits has been set differently
# cr50-set-sn-bits.sh prints errors on stdout instead of stderr.
raise Error(stdout)
if 'This device has been RMAed' in stdout:
logging.warning('SN Bits cannot be set anymore.')
return
if 'SN Bits have not been set yet' in stdout:
if 'BoardID is set' in stdout:
logging.warning('SN Bits cannot be set anymore.')
def GetBitmapLocales(self, image_file):
"""Get bitmap locales
Args:
image_file: Path to the image file where locales are searched.
Returns:
List of language codes supported by the image
"""
bitmap_locales = []
with self._named_temporary_file() as f:
self._util.shell('cbfstool %s extract -n locales -f %s -r COREBOOT' %
(image_file, f.name))
bitmap_locales = f.read()
# We reach here even if cbfstool command fails
if bitmap_locales:
return bitmap_locales.split('\n')
# Looks like image does not have locales file. Do the old-fashioned way
self._util.shell('futility gbb -g --bmpfv=%s %s' %
(f.name, image_file))
bmpblk_data = self._unpack_bmpblock(f.read())
bitmap_locales = bmpblk_data.get('locales', bitmap_locales)
return bitmap_locales
def SetFirmwareBitmapLocale(self):
"""Sets firmware bitmap locale to the default value stored in VPD.
This function ensures the default locale set in VPD is listed in the
supported locales in the firmware bitmap and sets loc_idx to the default
locale.
Returns:
A tuple of the default locale index and default locale. i.e.
(index, locale)
index: The index of the default locale in the bitmap.
locale: The 2-character-string format of the locale. e.g. "en", "zh"
Raises:
Error, if the initial locale is missing in VPD or the default locale is
not supported.
"""
image_file = self._crosfw.LoadMainFirmware().GetFileName()
ro_vpd = self._read_ro_vpd()
region = ro_vpd.get('region')
if region is None:
raise Error('Missing VPD "region".')
if region not in regions.REGIONS:
raise ValueError('Unknown region: "%s".' % region)
# Use the primary locale for the firmware bitmap.
locales = regions.REGIONS[region].language_codes
bitmap_locales = self.GetBitmapLocales(image_file)
# Some locale values are just a language code and others are a
# hyphen-separated language code and country code pair. We care
# only about the language code part for some cases. Note some old firmware
# bitmaps use underscore instead hyphen.
for locale in locales:
for language_code in [locale, locale.replace('-', '_'),
locale.partition('-')[0]]:
if language_code in bitmap_locales:
locale_index = bitmap_locales.index(language_code)
self._util.shell('crossystem loc_idx=%d' % locale_index)
return (locale_index, language_code)
raise Error('Firmware bitmaps do not contain support for the specified '
'initial locales: %r.\n'
'Current supported locales are %r.' % (
locales, bitmap_locales))
def GetSystemDetails(self):
"""Gets the system details including: platform name, crossystem,
modem status, EC write-protect status and bios write-protect status.
Returns:
A dict of system details with the following format:
{Name_of_the_detail: "result of the detail"}
Note that the outputs could be multi-line strings.
"""
# Note: Handle the shell commands with care since unit tests cannot
# ensure the correctness of commands executed in shell.
return {
'platform_name': self._util.shell('mosys platform name').stdout.strip(),
'crossystem': self._util.GetCrosSystem(),
'modem_status': self._util.shell('modem status').stdout.splitlines(),
'ec_wp_status': self._util.shell(
'flashrom -p ec --get-size 2>/dev/null && '
'flashrom -p ec --wp-status || '
'echo "EC is not available."').stdout,
'bios_wp_status': self._util.shell(
'flashrom -p host --wp-status').stdout}
def ClearFactoryVPDEntries(self):
"""Clears factory.* items in the RW VPD.
Returns:
A dict of the removed entries.
"""
ro_vpd = self._read_ro_vpd()
rw_vpd = self._read_rw_vpd()
ro_entries = dict((k, v) for k, v in ro_vpd.items() if '.' in k)
rw_entries = dict((k, v) for k, v in rw_vpd.items() if '.' in k)
logging.info('Removing VPD RW entries %s, RO entries %s',
FilterDict(rw_entries), ro_entries)
if rw_entries:
if not self._delete_rw_vpd(rw_entries):
raise Error('Failed to remove RW VPD entries: %s' % rw_entries.keys())
if ro_entries:
if not self._delete_ro_vpd(ro_entries):
raise Error('Failed to remove RO VPD entries: %s' % ro_entries.keys())
def GenerateStableDeviceSecret(self):
"""Generates a fresh stable device secret and stores it in RO VPD.
The stable device secret generated here is a high-entropy identifier that
is unique to each device. It gets generated at manufacturing time and reset
during RMA, but is stable under normal operation and notably also across
recovery image installation.
The stable device secret is suitable to obtain per-device stable hardware
identifiers and/or encryption keys. Please never use the secret directly,
but derive a secret specific for your context like this:
your_secret = HMAC_SHA256(stable_device_secret,
context_label\0optional_parameters)
The stable_device_secret acts as the HMAC key. context_label is a string
that uniquely identifies your usage context, which allows us to generate as
many per-context secrets as we need. The optional_parameters string can
contain additional information to further segregate your context, for
example if there is a need for multiple secrets.
The resulting secret(s) can be used freely, in particular they may be
shared with the environment or servers. Before you start generating and
using a secret in a new context, please always make sure to contact the
privacy and security teams to check whether your intended usage meets the
Chrome OS privacy and security guidelines.
MOST IMPORTANTLY: THE STABLE DEVICE SECRET MUST NOT LEAVE THE DEVICE AT ANY
TIME. DO NOT INCLUDE IT IN NETWORK COMMUNICATION, AND MAKE SURE IT DOES NOT
SHOW UP IN DATA THAT GETS SHARED POTENTIALLY (LOGS, ETC.). FAILURE TO DO SO
MAY BREAK THE SECURITY AND PRIVACY OF ALL OUR USERS. YOU HAVE BEEN WARNED.
"""
# Ensure that the release image is recent enough to handle the stable
# device secret key in VPD. Version 6887.0.0 is the first one that has the
# session_manager change to generate server-backed state keys for forced
# re-enrollment from the stable device secret.
release_image_version = LooseVersion(self._util.GetReleaseImageVersion())
if not release_image_version >= LooseVersion('6887.0.0'):
raise Error('Release image version can\'t handle stable device secret!')
# A context manager useful for wrapping code blocks that handle the device
# secret in an exception handler, so the secret value does not leak due to
# exception handling (for example, the value will be part of the VPD update
# command, which may get included in exceptions). Chances are that
# exceptions will prevent the secret value from getting written to VPD
# anyways, but better safe than sorry.
@contextmanager
def scrub_exceptions(operation):
try:
yield
except Exception:
# Re-raise an exception including type and stack trace for the original
# exception to facilitate error analysis. Don't include the exception
# value as it may contain the device secret.
(exc_type, _, exc_traceback) = sys.exc_info()
cause = '%s: %s' % (operation, exc_type)
raise Error, cause, exc_traceback
with scrub_exceptions('Error generating device secret'):
# Generate the stable device secret and write it to VPD. Turn off logging,
# so the generated secret doesn't leak to the logs.
secret = self._util.shell('tpm-manager get_random 32',
log=False).stdout.strip()
with scrub_exceptions('Error validating device secret'):
secret_bytes = secret.decode('hex')
if len(secret_bytes) != 32:
raise Error
with scrub_exceptions('Error writing device secret to VPD'):
if not self._update_ro_vpd({
'stable_device_secret_DO_NOT_SHARE': secret_bytes.encode('hex')}):
raise Error
def Cr50SetSnBits(self):
"""Set the serial number bits on the Cr50 chip.
Serial number bits along with the board id allow a device to attest to its
identity and participate in Chrome OS Zero-Touch.
A script located at /usr/share/cros/cr50-set-sn-bits.sh helps us
to set the proper serial number bits in the Cr50 chip.
"""
script_path = '/usr/share/cros/cr50-set-sn-bits.sh'
vpd_key = 'attested_device_id'
has_vpd_key = self._read_ro_vpd().get(vpd_key) is not None
# The script exists, Zero-Touch is enabled.
if not has_vpd_key:
# TODO(stimim): What if Zero-Touch is enabled on a program (e.g. hatch),
# but not expected for a project (e.g. kohaku).
raise Error('Zero-Touch is enabled, but %r is not set' % vpd_key)
if phase.GetPhase() >= phase.PVT_DOGFOOD:
arg_phase = 'pvt'
else:
arg_phase = 'dev'
result = self._util.shell([script_path])
if result.status == 0:
logging.info('Successfully set serial number bits on Cr50.')
elif result.status == 2:
logging.error('Serial number bits have already been set on Cr50!')
elif result.status == 3:
error_msg = 'Serial number bits have been set DIFFERENTLY on Cr50!'
if arg_phase == 'pvt':
raise Error(error_msg)
else:
logging.error(error_msg)
else: # General errors.
raise Error('Failed to set serial number bits on Cr50. '
'(args=%s)' % arg_phase)
def Cr50SetBoardId(self):
"""Set the board id and flags on the Cr50 chip.
The Cr50 image need to be lock down for a certain subset of devices for
security reason. To achieve this, we need to tell the Cr50 which board
it is running on, and which phase is it, during the factory flow.
A script located at /usr/share/cros/cr50-set-board-id.sh helps us
to set the board id and phase to the Cr50 ship.
To the detail design of the lock-down mechanism, please refer to
go/cr50-boardid-lock for more details.
"""
script_path = '/usr/share/cros/cr50-set-board-id.sh'
disable_services = ['trunksd']
if not os.path.exists(script_path):
logging.warn('The Cr50 script is not found, there should be no '
'Cr50 on this device.')
return
if phase.GetPhase() >= phase.PVT_DOGFOOD:
arg_phase = 'pvt'
else:
arg_phase = 'dev'
service_mgr = service_utils.ServiceManager()
try:
service_mgr.SetupServices(disable_services=disable_services)
result = self._util.shell([script_path, arg_phase])
if result.status == 0:
logging.info('Successfully set board ID on Cr50 with phase %s.',
arg_phase)
elif result.status == 2:
logging.error('Board ID has already been set on Cr50!')
elif result.status == 3:
error_msg = 'Board ID and/or flag has been set DIFFERENTLY on Cr50!'
if arg_phase == 'pvt':
raise Error(error_msg)
else:
logging.error(error_msg)
else: # General errors.
raise Error('Failed to set board ID and flag on Cr50. '
'(args=%s)' % arg_phase)
except Exception:
logging.exception('Failed to set Cr50 Board ID.')
raise
finally:
# Restart stopped service even if something went wrong.
service_mgr.RestoreServices()
def Cr50WriteFlashInfo(self, enable_zero_touch=False, rma_mode=False):
"""Write device info into cr50 flash."""
if not rma_mode and enable_zero_touch:
self.Cr50SetSnBits()
self.Cr50SetBoardId()
def Cr50DisableFactoryMode(self):
"""Disable Cr50 Factory mode.
Cr50 factory mode might be enabled in the factory and RMA center in order to
open ccd capabilities. Before finalizing the DUT, factory mode MUST be
disabled.
"""
def _GetCr50Version():
cmd = ['gsctool', '-a', '-f']
return re.search(r'^RW\s*(\d+\.\d+\.\d+)',
self._util.shell(cmd).stdout,
re.MULTILINE).group(1)
def _IsCCDInfoMandatory():
cr50_verion = _GetCr50Version()
# If second number is odd in version then it is prod version.
is_prod = int(cr50_verion.split('.')[1]) % 2
res = True
if is_prod and LooseVersion(cr50_verion) < LooseVersion('0.3.9'):
res = False
elif not is_prod and LooseVersion(cr50_verion) < LooseVersion('0.4.5'):
res = False
return res
gsctool_path = '/usr/sbin/gsctool'
if not os.path.exists(gsctool_path):
raise Error('gsctool is not available in path - %s.' % gsctool_path)
factory_mode_disabled = False
cmd = ['gsctool', '-a', '-F', 'disable']
result = self._util.shell(cmd)
if result.success:
factory_mode_disabled = True
if not _IsCCDInfoMandatory():
logging.warn('Command of disabling factory mode %s and can not get CCD '
'info so there is no way to make sure factory mode status. '
'cr50 version RW %s',
'succeeds' if factory_mode_disabled else 'fails',
_GetCr50Version())
return
ccd_info_cmd = ['gsctool', '-a', '-I']
result = self._util.shell(ccd_info_cmd)
if not result.success:
raise Error('Getting ccd info fails in cr50 RW %s' % _GetCr50Version())
info = result.stdout
# The pattern of output is:
# State: Locked
# Password: None
# Flags: 000000
# Capabilities, current and default:
# ...
# CCD caps bitmap: 0x1ffff
#
# TODO(b/117200472) The current way to query factory mode is done by
# checking CCD caps bitmap but this value will be changed if new CCD
# capability is introduced. For example, bitpmap becomes 0x7ffff started
# from 0.4.10. The long term plan is to ask gsctool/cr50 to report factory
# mode status directly for short term plan 0x?ffff would be checked.
if re.search('^CCD caps bitmap: 0x[0-9a-z]ffff$', info, re.MULTILINE):
raise Error('Failed to disable Cr50 factory mode. CCD info:\n%s' % info)