blob: 6a9e51e631d243948547fc0439637933f30ead79 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Probing routines for hardware and firmware identification.
There are three types of probe functions: hardware components, hash
values, and initial_config.
Probe functions must return the target identification string if the
probe was successful or None if no appropriate data was available.
Probe functions may also raise the Error exception to indicate that a
survivable error occurred, in which case the error will be reported
and a None probe result assumed.
"""
import hashlib
import logging
import os
import re
import sys
from array import array
from glob import glob
from inspect import getargspec
from tempfile import NamedTemporaryFile
import edid
import crosfw
import vblock
sys.path.append('/usr/local/lib/flimflam/test')
import flimflam
from common import CompactStr, Error, Obj, RunShellCmd
# TODO(tammo): Some tests look for multiple components, some tests
# throw away all but the first, and some just look for one. All tests
# should return a list of results, with the empty list indicating no
# components were found.
# TODO(tammo): Get rid of trial-and-error detection. If there are
# multiple different ways to perform detection, we should run them all
# and collate the results. Different code paths on different systems
# leads to bitrot and fragility.
# Load-time decorator-populated { class : probe function } tables.
_COMPONENT_PROBE_MAP = {}
_HASH_PROBE_MAP = {}
_INITIAL_CONFIG_PROBE_MAP = {}
# Load-time decorator-populated { key : function } table.
_COMMON_DATA_PROVIDER_MAP = {
'arch': None, # Always calculated by Probe().
'component_registry': None # Provided as arg to Probe() itself.
}
# Load-time decorator-populated { function : required common data } table.
_SHARED_DATA_REQS_MAP = {}
def _LoadKernelModule(name):
"""Ensure kernel module is loaded. If not already loaded, do the load."""
# TODO(tammo): Maybe lift into shared data for performance reasons.
loaded = RunShellCmd('lsmod | grep -q %s' % name).success
if not loaded:
loaded = RunShellCmd('modprobe %s' % name).success
if not loaded:
raise Error('Cannot load kernel module: %s' % name)
def _ReadSysfsFields(base_path, field_list, optional_field_list=[]):
"""Return dict of {field_name: field_value} corresponding to sysfs contents.
Args:
base_path: sysfs directory which each field should be a file within.
field_list: Required fields ; function returns None if fields are missing.
optional_field_list: Fields that are included if the corresponding
files exist.
Returns:
Dict of field names and values, or None if required fields are not
all present.
"""
all_fields_list = field_list + optional_field_list
path_list = [os.path.join(base_path, field) for field in all_fields_list]
data = dict((field, open(path).read().strip())
for field, path in zip(all_fields_list, path_list)
if os.path.exists(path))
result = [data[field] for field in all_fields_list if field in data]
return result if set(data) >= set(field_list) else None
def _ReadSysfsPciFields(path):
"""Returns string containing PCI 'vendor:device' tuple."""
# TODO(hungte): Maybe add PCI 'rev' field.
field_data = _ReadSysfsFields(path, ['vendor', 'device'])
if field_data is None:
return None
vendor, device = field_data
return '%s:%s' % (vendor.replace('0x', ''), device.replace('0x', ''))
def _ReadSysfsUsbFields(path):
"""Returns string containing at least the USB 'idVendor:idProduct' tuple.
Args:
path: Path used to search for USB sysfs data. First all symlinks
are resolved, to the the 'real' path. Then path terms are
iteratively removed from the right hand side until the remaining
path looks to contain the relevent data fields.
Returns:
A string with the above tuple if a sutable directory with
containing the field data can be found. This string will also
contain space-separated optional field data if those are
available. If no directory with the required fields are found,
returns None.
"""
path = os.path.realpath(path)
while path.find('/usb') > 0:
if os.path.exists(os.path.join(path, 'idProduct')):
break
path = os.path.split(path)[0]
field_data = _ReadSysfsFields(path, ['idVendor', 'idProduct'],
['manufacturer', 'product', 'bcdDevice'])
if field_data is None:
return None
result = '%s:%s' % (field_data[0].replace('0x', ''),
field_data[1].replace('0x', ''))
result += (' ' + ' '.join(field_data[2:])) if field_data[2:] else ''
return result
def _ReadSysfsDeviceId(path):
"""Returns sysfs PCI or USB device identification string."""
return (_ReadSysfsPciFields(path) or
_ReadSysfsUsbFields(path) or
None)
def _ReadSysfsNodeId(path):
"""Returns sysfs node identification string.
A more generic wrapper around _ReadSysfsDeviceId which supports
cases where only a 'name' file exists. Basically it tries to read
the DeviceID data if present, but otherwise falls back to just
reading the name file data.
"""
name_path = os.path.join(path, 'name')
return (_ReadSysfsDeviceId(os.path.join(path, 'device')) or
(os.path.exists(name_path) and
open(name_path).read().strip()) or
None)
class _FlimflamDevices(object):
"""Wrapper around flimflam (connection manager) information.
This object is a wrapper around the data from the flimflam module,
providing dbus format post processing.
Wrapped data is a list of Objs corresponding to devices detected by
flimflam. Each has devtype (flimflam type classification) and path
(location of related data in sysfs) fields. For cellular devices,
there is also an attributes field which contains a dict of
attribute:value items.
"""
def __init__(self):
def ProcessDevice(device):
properties = device.GetProperties()
get_prop = lambda p: flimflam.convert_dbus_value(properties[p])
result = Obj(
devtype=get_prop('Type'),
path='/sys/class/net/%s/device' % get_prop('Interface'))
if result.devtype == 'cellular':
result.attributes = dict(
(key, get_prop('Cellular.%s' % key))
for key in ['Carrier', 'FirmwareRevision', 'HardwareRevision',
'ModelID', 'Manufacturer']
if ('Cellular.%s' % key) in properties)
return result
self.dev_list = [ProcessDevice(device) for device in
flimflam.FlimFlam().GetObjectList('Device')]
def GetDevices(self, devtype):
"""Return device Obj list for devices with the specified type."""
return [dev for dev in self.dev_list if dev.devtype == devtype]
def ReadSysfsDeviceIds(self, devtype):
"""Return _ReadSysfsDeviceId result for each device of specified type."""
ids = [_ReadSysfsDeviceId(dev.path) for dev in self.GetDevices(devtype)]
return ' ; '.join(ids) if ids else None
def _RegisterSharedDataRequirements(fun):
args = getargspec(fun)[0]
for arg in args:
assert arg in _COMMON_DATA_PROVIDER_MAP, \
'No provider for required common data %s' % repr(arg)
_SHARED_DATA_REQS_MAP[fun] = args
def _ProvidesSharedData(data_name):
def Decorate(f):
assert data_name not in _COMMON_DATA_PROVIDER_MAP, \
'Multiple functions providing %s common data.' % repr(data_name)
_COMMON_DATA_PROVIDER_MAP[data_name] = f
_RegisterSharedDataRequirements(f)
return f
return Decorate
@_ProvidesSharedData('ec_fw')
def _LoadEcFirmware():
return crosfw.LoadEcFirmware()
@_ProvidesSharedData('main_fw')
def _LoadMainFirmware():
return crosfw.LoadMainFirmware()
@_ProvidesSharedData('touchpad')
def _LoadTouchpadData():
"""Return Obj with hw_ident and fw_ident string fields."""
def Synaptics():
detect_program = '/opt/Synaptics/bin/syndetect'
if not os.path.exists(detect_program):
return None
lock_check = RunShellCmd('lsof /dev/serio_raw0 | grep -q "^X"')
if lock_check.success and not os.getenv('DISPLAY'):
logging.error('Synaptics touchpad detection with X in the '
'foreground requires DISPLAY and XAUTHORITY '
'to be set properly.')
return None
result = RunShellCmd(detect_program)
if not result.success:
return None
properties = dict(map(str.strip, line.split('=', 1))
for line in result.stdout.splitlines() if '=' in line)
model = properties.get('Model String', 'Unknown Synaptics')
# Delete the " on xxx Port" substring, as we do not care about the port.
model = re.sub(' on [^ ]* [Pp]ort$', '', model)
firmware = properties.get('Firmware ID', None)
return Obj(hw_ident=model, fw_ident=firmware)
def Cypress():
for node in glob('/sys/class/input/mouse[0-9]*/device/device'):
model_path_list = [os.path.join(node, field) for field in
['product_id', 'hardware_version', 'protocol_version']]
firmware_path = os.path.join(node, 'firmware_version')
if not all(os.path.exists(path) for path in
model_path_list + [firmware_path]):
continue
return Obj(
hw_ident=CompactStr([open(path).read().strip()
for path in model_path_list]),
fw_ident=CompactStr(open(firmware_path).read().strip()))
return None
def Generic():
# TODO(hungte) add more information from id/*
# format: N: Name="???_trackpad"
input_file = '/proc/bus/input/devices'
cmd = 'grep -iE "^N.*(touch *pad|track *pad)" %s' % input_file
info = RunShellCmd(cmd).stdout.splitlines()
info = [re.sub('^[^"]*"(.*)"$', r'\1', device) for device in info]
return Obj(hw_ident=(', '.join(info) if info else None), fw_ident=None)
result_gen = (vendor_fun() for vendor_fun in [Cypress, Synaptics, Generic])
return next((x for x in result_gen if x is not None),
Obj(hw_ident=None, fw_ident=None))
@_ProvidesSharedData('flimflam')
def _LoadFlimflam():
"""Function wrapper to allow decoration of the class contructor."""
return _FlimflamDevices()
def _ComponentProbe(comp_class, *arch_targets):
"""Decorator that populates _COMPONENT_PROBE_MAP.
There can only be one probe function for each arch for each
comp_class. If no arch_targets are specified, the probe is assumed
to be general and apply for all arch values.
Args:
comp_class: Target component class for which the generator
produces results.
arch_targets: List of arch strings for which the probe is relevant.
"""
def Decorate(f):
if not arch_targets:
assert comp_class not in _COMPONENT_PROBE_MAP, \
'Multiple generic component probe functions for %s' % repr(comp_class)
_COMPONENT_PROBE_MAP[comp_class] = f
else:
arch_map = _COMPONENT_PROBE_MAP.setdefault(comp_class, {})
assert set(arch_targets).isdisjoint(set(arch_map)), \
'Overlapping target architectures for %s probe function' % comp_class
for arch in arch_targets:
arch_map[arch] = f
_RegisterSharedDataRequirements(f)
return f
return Decorate
@_ComponentProbe('audio_codec')
def _ProbeAudioCodec():
"""Looks for codec strings in /proc/asound then at PCM details."""
grep_result = RunShellCmd('grep -R "Codec:" /proc/asound/*')
match_list = [re.findall(r'.*Codec:(.*)', line)
for line in grep_result.stdout.splitlines()]
result_set = set(CompactStr(match) for match in match_list if match)
if result_set:
return ' ; '.join(result_set)
# Formatted '00-00: WM??? PCM wm???-hifi-0: ...'
pcm_data = open('/proc/asound/pcm').read().strip().split(' ')
if len(pcm_data) > 2:
return CompactStr(pcm_data[1])
return None
@_ComponentProbe('battery')
def _ProbeBattery():
"""Compose data from sysfs."""
node_path_list = glob('/sys/class/power_supply/*')
type_data_list = [_ReadSysfsFields(node_path, ['type'])[0]
for node_path in node_path_list]
battery_field_list = ['manufacturer', 'model_name', 'technology',
'charge_full_design']
battery_data_list = [_ReadSysfsFields(node_path, battery_field_list)
for node_path, type_data
in zip(node_path_list, type_data_list)
if type_data == 'Battery']
results = [CompactStr(x) for x in battery_data_list]
return ' ; '.join(results) if results else None
@_ComponentProbe('bluetooth')
def _ProbeBluetooth():
return _ReadSysfsDeviceId('/sys/class/bluetooth/hci0/device')
@_ComponentProbe('camera')
def _ProbeCamera():
# TODO(tammo): What is happening here? Arch-specific stuff? Doc string...
# TODO(tammo/sheckylin): Try to replace the code below with OpenCV calls.
info = []
camera_node = '/sys/class/video4linux/video0'
camera_data = _ReadSysfsNodeId(camera_node)
if camera_data:
info.append(camera_data)
# For SOC cameras
camera_data = _ReadSysfsFields(camera_node, ['device/control/name'])
if camera_data:
info.append(camera_data)
# Try video4linux2 (v4l2) interface.
# See /usr/include/linux/videodev2.h for definition of these consts.
# 'ident' values are defined in include/media/v4l2-chip-ident.h
VIDIOC_DBG_G_CHIP_IDENT = 0xc02c5651
V4L2_DBG_CHIP_IDENT_SIZE = 11
V4L2_INDEX_REVISION = V4L2_DBG_CHIP_IDENT_SIZE - 1
V4L2_INDEX_IDENT = V4L2_INDEX_REVISION - 1
V4L2_VALID_IDENT = 3 # V4L2_IDENT_UNKNOWN + 1
try:
with open('/dev/video0', 'r+') as f:
buf = array('i', [0] * V4L2_DBG_CHIP_IDENT_SIZE)
ioctl(f.fileno(), VIDIOC_DBG_G_CHIP_IDENT, buf, 1)
v4l2_ident = buf[V4L2_INDEX_IDENT]
if v4l2_ident >= V4L2_VALID_IDENT:
info.append('V4L2:%04x %04x' % (v4l2_ident, buf[V4L2_INDEX_REVISION]))
except:
pass
return CompactStr(info) if info else None
@_ComponentProbe('cellular')
def _ProbeCellular(flimflam):
return flimflam.ReadSysfsDeviceIds('cellular')
@_ComponentProbe('display_converter')
def _ProbeDisplayConverter():
"""Try brand-specific probes, return the first viable result."""
def ProbeChrontel():
"""Search style borrowed from the /etc/init/chrontel.conf behavior."""
_LoadKernelModule('i2c-dev')
_LoadKernelModule('i2c-i801')
dev_chrontel = '/dev/i2c-chrontel'
if not os.path.exists(dev_chrontel):
for dev_path in glob('/sys/class/i2c-adapter/*'):
adapter_name = open(os.path.join(dev_path, 'name')).read().strip()
if adapter_name.startswith('SMBus I801 adapter'):
dev_chrontel = os.path.basename(dev_path)
break
cmd = 'ch7036_monitor -d %s -p' % dev_chrontel
if os.path.exists(dev_chrontel) and RunShellCmd(cmd).success:
return 'ch7036'
return None
part_id_gen = (probe_fun() for probe_fun in [ProbeChrontel])
return next((x for x in part_id_gen if x is not None), None)
# TODO(tammo): Either remove this probe or make it work better.
# @_ComponentProbe('cardreader')
def _ProbeCardreader(component_registry):
"""Look for white listed card readers in system logs.
A cardreader is generally powered off until cards are inserted.
Therefore, instead of directly checking, we compare log data against
a white list of known devices.
This avoids needing to insert a card just for probing purposes, but
does still require that a card was inserted at some point prior to
probing. NOTE: The overhead of scanning the logs can be
significant, and is naturally undesirable.
"""
pattern = r'New USB device found, idVendor=.*, idProduct=.*'
grep_result = RunShellCmd('grep -s %s /var/log/messages*' % repr(pattern))
cardreader_whitelist = component_registry['cardreader'].values()
match_list = [re.findall(r'idVendor=(.*), idProduct=(.*)', line)
for line in grep_result.stdout.splitlines()]
device_str_list = ['%s:%s' % match[0] for match in match_list if match]
found_cardreader_set = set(device for device in device_str_list
if device in cardreader_whitelist)
return (' '.join(found_cardreader_set) if found_cardreader_set
else None)
@_ComponentProbe('chipset', 'x86')
def _ProbeChipsetX86():
"""On x86, host bridge is always the first PCI device."""
return _ReadSysfsDeviceId('/sys/bus/pci/devices/0000:00:00.0')
@_ComponentProbe('chipset', 'arm')
def _ProbeChipsetArm():
"""On ARM SOC-based systems, use first compatible list in device-tree."""
# Format: manufacturer,model [NUL] compat-manufacturer,model [NUL] ...
fdt_compatible_file = '/proc/device-tree/compatible'
if not os.path.exists(fdt_compatible_file):
return None
compatible_list = open(fdt_compatible_file).read().strip()
return CompactStr(compatible_list.strip(chr(0)).split(chr(0)))
@_ComponentProbe('cpu', 'x86')
def _ProbeCpuX86():
"""Reformat /proc/cpuinfo data."""
# For platforms like x86, it provides names for each core.
# Sample output for dual-core:
# model name : Intel(R) Atom(TM) CPU ???
# model name : Intel(R) Atom(TM) CPU ???
cmd = r'sed -nr "s/^model name\s*: (.*)/\1/p" /proc/cpuinfo'
stdout = RunShellCmd(cmd).stdout.splitlines()
return CompactStr(stdout[0] + ' [%d cores]' % len(stdout))
@_ComponentProbe('cpu', 'arm')
def _ProbeCpuArm():
"""Reformat /proc/cpuinfo data."""
# For platforms like arm, it gives the name only in 'Processor'
# and then a numeric ID for each cores 'processor', so delta is 1.
# Sample output for dual-core:
# Processor : ARM???
# processor : 0
# processor : 1
cmd = r'sed -nr "s/^[Pp]rocessor\s*: (.*)/\1/p" /proc/cpuinfo'
stdout = RunShellCmd(cmd).stdout.splitlines()
return CompactStr(stdout[0] + ' [%d cores]' % len(stdout) - 1)
@_ComponentProbe('display_panel')
def _ProbeDisplayPanel():
"""Combine all available edid data, from sysfs and directly from the i2c."""
edid_set = set(edid.Parse(open(path).read())
for path in glob('/sys/class/drm/*LVDS*/edid'))
_LoadKernelModule('i2c_dev')
edid_set |= set(edid.LoadFromI2c(path)
for path in sorted(glob('/dev/i2c-?')))
edid_set -= set([None])
return ' ; '.join(sorted(edid_set)) if edid_set else None
@_ComponentProbe('dram', 'x86')
def _ProbeDramX86():
"""Combine mosys memory timing and geometry information."""
# TODO(tammo): Document why mosys cannot load i2c_dev itself.
_LoadKernelModule('i2c_dev')
time_data = RunShellCmd('mosys -k memory spd print timings').stdout
size_data = RunShellCmd('mosys -k memory spd print geometry').stdout
times = dict(re.findall('dimm="([^"]*)".*speeds="([^"]*)"', time_data))
sizes = dict(re.findall('dimm="([^"]*)".*size_mb="([^"]*)"', size_data))
return CompactStr(['%s|%s|%s' % (i, sizes[i], times[i].replace(' ', ''))
for i in sorted(times)])
@_ComponentProbe('dram', 'arm')
def _ProbeDramArm():
"""Memory is not directly probable, so use kernel cmdline info."""
# TODO(tammo): Request that mosys provide this info (by any means).
cmdline = open('/proc/cmdline').read().strip()
# Format: *mem=384M@0M (size@address)
return CompactStr(re.findall(r'\s\w*mem=(\d+M@\d+M)', cmdline))
@_ComponentProbe('ec_flash_chip')
def _ProbeEcFlashChip(ec_fw):
return ec_fw.chip_id
@_ComponentProbe('embedded_controller')
def _ProbeEmbeddedController():
"""Reformat mosys output."""
# Example mosys command output:
# vendor="VENDOR" name="CHIPNAME" fw_version="ECFWVER"
ecinfo = re.findall(r'\bvendor="([^"]*)".*\bname="([^"]*)"',
RunShellCmd('mosys -k ec info').stdout)
if ecinfo:
return CompactStr(*ecinfo)
return None
@_ComponentProbe('ethernet')
def _ProbeEthernet(flimflam):
return flimflam.ReadSysfsDeviceIds('ethernet')
@_ComponentProbe('flash_chip')
def _ProbeMainFlashChip(main_fw):
return main_fw.chip_id
@_ComponentProbe('storage')
def _ProbeStorage():
"""Compile sysfs data for all non-removable block storage devices."""
def IsFixed(node):
path = os.path.join(node, 'removable')
return (os.path.exists(path) and open(path).read().strip() == '0')
def ProcessNode(node_path):
dev_path = os.path.join(node_path, 'device')
size_path = os.path.join(os.path.dirname(dev_path), 'size')
size = ('#' + open(size_path).read().strip()
if os.path.exists(size_path) else '')
ata_fields = ['vendor', 'model']
emmc_fields = ['type', 'name', 'fwrev', 'hwrev', 'oemid', 'manfid']
data = (_ReadSysfsFields(dev_path, ata_fields) or
_ReadSysfsFields(dev_path, emmc_fields) or
None)
return CompactStr(data + [size]) if data is not None else None
fixed_devices = [node for node in glob('/sys/class/block/*') if IsFixed(node)]
ident_list = [ident for ident in map(ProcessNode, fixed_devices)
if ident is not None]
return ' ; '.join(ident_list) if ident_list else None
@_ComponentProbe('touchpad')
def _ProbeTouchpad(touchpad):
return touchpad.hw_ident if touchpad is not None else None
@_ComponentProbe('tpm')
def _ProbeTpm():
"""Return Manufacturer_info : Chip_Version string from tpm_version output."""
tpm_data = [line.partition(':') for line in
RunShellCmd('tpm_version').stdout.splitlines()]
tpm_dict = dict((key.strip(), value.strip()) for
key, _, value in tpm_data)
mfg = tpm_dict.get('Manufacturer Info', None)
version = tpm_dict.get('Chip Version', None)
if mfg is not None and version is not None:
return mfg + ':' + version
return None
@_ComponentProbe('usb_hosts')
def _ProbeUsbHosts(arch):
"""Compile USB data from sysfs."""
# On x86, USB hosts are PCI devices, located in parent of root USB.
# On ARM and others, use the root device itself.
relpath = '.' if arch == 'arm' else '..'
usb_bus_list = glob('/sys/bus/usb/devices/usb*')
usb_host_list = [os.path.join(os.path.realpath(path), relpath)
for path in usb_bus_list]
# Usually there are several USB hosts, so only list the primary information.
device_id_list = [_ReadSysfsDeviceId(usb_host) for usb_host in usb_host_list]
valid_device_id_list = [x for x in device_id_list if x is not None]
return ' '.join(valid_device_id_list) if valid_device_id_list else None
@_ComponentProbe('vga')
def _ProbeVga():
return _ReadSysfsNodeId('/sys/class/graphics/fb0')
@_ComponentProbe('wireless')
def _ProbeWireless(flimflam):
return flimflam.ReadSysfsDeviceIds('wifi')
def _HashProbe(hash_class):
"""Decorator that populates _HASH_PROBE_MAP.
There can be only one probe function for each hash_class.
"""
def Decorate(f):
assert hash_class not in _HASH_PROBE_MAP, \
'Multiple hash probe functions for %s' % repr(hash_class)
_HASH_PROBE_MAP[hash_class] = f
_RegisterSharedDataRequirements(f)
return f
return Decorate
@_HashProbe('ro_main_firmware')
def _GetMainFirmwareReadOnlyHash(main_fw):
"""Returns hash of main firmware (BIOS) read only parts.
Allows verification that we have proper keys / boot code / recovery
image installed.
Algorithm: sha256(fmap, RO_SECTION[-GBB]).
"""
raw_image = open(main_fw.path, 'rb').read()
image = crosfw.FirmwareImage(raw_image)
hash_src = image.get_fmap_blob()
gbb = image.get_section('GBB')
zero_gbb = chr(0) * len(gbb)
image.put_section('GBB', zero_gbb)
hash_src += image.get_section('RO_SECTION')
return hashlib.sha256(hash_src).hexdigest()
@_HashProbe('hash_gbb')
def _GetMainFirmwareGbbHash(main_fw):
"""Returns hash of main firmware (BIOS) GBB section.
Allows verification that we have proper keys / images / HWID.
Algorithm: sha256(GBB[-HWID]).
"""
raw_image = open(main_fw.path, 'rb').read()
image = crosfw.FirmwareImage(raw_image)
# Clobber HWID in a copy of the GBB, to get a HWID-independent hash.
with NamedTemporaryFile('wb', delete=True) as f:
f.write(image.get_section('GBB'))
RunShellCmd('gbb_utility -s --hwid="ChromeOS" "%s"' % f.name)
hash_src = f.read()
return hashlib.sha256(hash_src).hexdigest()
@_HashProbe('ro_ec_firmware')
def _GetEcFirmwareReadOnlyHash(ec_fw):
"""Returns hash of Embedded Controller firmware read only parts.
Allows verification that we have proper updated version of EC firmware.
Algorithm: sha256(fmap, EC_RO).
"""
if ec_fw.chip_id is None:
return None
raw_image = open(ec_fw.path, 'rb').read()
image = crosfw.FirmwareImage(raw_image)
hash_src = image.get_fmap_blob()
hash_src += image.get_section('EC_RO')
return hashlib.sha256(hash_src).hexdigest()
def _CalculateMainFwKeyHash(main_fw_file, key_name):
"""Returns hash of the specified key from the main firmware.
Extracts specified GBB key element from the main firmware, using vbutil_key
command.
Algorithm: vbutil_key | grep "Key sha1sum".
"""
with NamedTemporaryFile(prefix='gbb_%s_' % key_name, delete=True) as f:
if not RunShellCmd('gbb_utility -g --%s=%s %s' %
(key_name, f.name, main_fw_file)).success:
raise Error('cannot get %s from GBB' % key_name)
key_info = RunShellCmd('vbutil_key --unpack %s' % f.name).stdout
sha1sum = re.findall(r'Key sha1sum:[\s]+([\w]+)', key_info)
if len(sha1sum) != 1:
logging.error("Failed calling vbutil_key for firmware key hash.")
return None
return sha1sum[0]
@_HashProbe('key_recovery')
def _CalculateRecoveryKeyHash(main_fw):
return _CalculateMainFwKeyHash(main_fw.path, 'recoverykey')
@_HashProbe('key_root')
def _CalculateRootKeyHash(main_fw):
return _CalculateMainFwKeyHash(main_fw.path, 'rootkey')
def _InitialConfigProbe(initial_config_class):
"""Decorator that populates _INITIAL_CONFIG_PROBE_MAP.
There can be only one probe function for each initial_config_class.
"""
def Decorate(f):
assert initial_config_class not in _INITIAL_CONFIG_PROBE_MAP, \
'Multiple initial config probe functions for %s' % \
repr(initial_config_class)
_INITIAL_CONFIG_PROBE_MAP[initial_config_class] = f
_RegisterSharedDataRequirements(f)
return f
return Decorate
@_InitialConfigProbe('cellular_fw_version')
def _ProbeCellularFirmwareVersion(flimflam):
"""Return firmware detail strings for all cellular devices."""
def GetVersionString(dev_attrs):
"""Use flimflam or modem status data to generate a version string.
The fields present in the flimflam data may differ for
partners/components.
"""
# TODO(tammo): Document breakdown of known combinations for each
# supported part, correspondingly document when the 'modem status'
# fallback is necessary.
version_formats = [
['Carrier', 'FirmwareRevision'],
['FirmwareRevision'],
['HardwareRevision']]
for version_format in version_formats:
if not set(version_format).issubset(set(dev_attrs)):
continue
# Compact all fields into single line with limited space.
return CompactStr([dev_attrs[key] for key in version_format])
# If nothing available, try 'modem status'.
cmd = 'modem status | grep firmware_revision'
modem_status = RunShellCmd(cmd).stdout.strip()
info = re.findall('^\s*firmware_revision:\s*(.*)', modem_status)
if info and info[0]:
return info[0]
return None
results = [GetVersionString(dev.attributes) for dev in
flimflam.GetDevices('cellular')]
results = [x for x in results if x is not None]
return ' ; '.join(results) if results else None
@_InitialConfigProbe('rw_fw_version')
def _ProbeRwFirmwareVersion(main_fw):
"""Returns RW (writable) firmware version from VBLOCK sections.
If A/B has different version, that means this system needs a reboot +
firmwar update so return value is a "error report" in the form "A=x, B=y".
"""
versions = [None, None]
with open(main_fw.path, 'rb') as f:
image = crosfw.FirmwareImage(f.read())
for (index, name) in enumerate(['VBLOCK_A', 'VBLOCK_B']):
data = image.get_section(name)
block = vblock.unpack_verification_block(data)
versions[index] = block['VbFirmwarePreambleHeader']['firmware_version']
if len(versions) != 2:
raise Error('Bad RW FW version data.')
if versions[0] != versions[1]:
return 'A=%d, B=%d' % versions
return '%d' % versions[0]
@_InitialConfigProbe('touchpad_fw_version')
def _ProbeTouchpadFirmwareVersion(touchpad):
return touchpad.fw_ident if touchpad is not None else None
# TODO(tammo): Consider getting rid of the component_registry
# argument. Without it, probeing can be done independently of any
# component database, which is conceptually much cleaner. Currently
# the only code that needs this is the card_reader probe to whitelist
# device identifiers, but this probe also has other ugliness -- it
# does not actually probe, but instead just crawls logs. which will
# introduce flakiness if the reader was never used. One possible
# approach here is to add an option to the functional test for the
# card reader, so that it checks that the card readers it finds are
# the ones that we want. The benefit here is that the functional test
# will always have all the information it needs, because the reader is
# active. For example, it could use the hwid_tool to query the reader
# associated with the system (and maybe just give a warning if there
# is no hwid set yet).
def Probe(component_registry, probe_volatile=True, probe_initial_config=True):
"""Return device component, hash, and initial_config data.
Run all of the available probing routines that make sense for the
target architecture, for example if the machine being probed is x86
then somewhat different probes would be run than for an ARM machine.
All probe results are returned directly, without analysis. Matching
these results against the component database or against HWID data
can be done afterwards.
Args:
component_registry: Dict of {component name: probe result} data,
which can be used by probe routines (for example as a whitelist
of known components of certain component classes, eg
cardreader).
probe_volatile: On False, do not probe for volatile data and
return None for the corresponding field.
probe_initial_config: On False, do not probe for initial_config
data and return None for the corresponding field.
Returns:
Obj with components, volatile, and initial_config fields, each
containing the corresponding dict of probe results.
"""
arch = RunShellCmd('crossystem arch').stdout.strip()
shared_data = { 'arch': arch, 'component_registry': component_registry }
def PopulateSharedData(data_name):
if data_name in shared_data:
return
fun = _COMMON_DATA_PROVIDER_MAP[data_name]
shared_data[data_name] = RunFunWithSharedData(fun)
def RunFunWithSharedData(fun):
required_data_name_list = _SHARED_DATA_REQS_MAP[fun]
map(PopulateSharedData, required_data_name_list)
fun_args = dict((x, shared_data[x]) for x in shared_data
if x in required_data_name_list)
return fun(**fun_args)
def RunProbe(fun):
try:
return RunFunWithSharedData(fun)
except Exception:
logging.exception('Probe FAILED (%s), see traceback, returning None.' %
fun.__name__)
return None
component_result_map = {}
hash_result_map = {}
initial_config_result_map = {}
for component_class, fun_data in sorted(_COMPONENT_PROBE_MAP.items()):
fun = fun_data[arch] if isinstance(fun_data, dict) else fun_data
component_result_map[component_class] = RunProbe(fun)
if probe_volatile:
# TODO(tammo): Lift out the hash generation, to allow convenient
# generation of hashes directly for firmware images (as opposed to
# just DUT machines). Ideally provide a command (maybe in
# hwid_tool) which will take an arbitrary firmware image/blob,
# determine the firmware type (main, ec, etc) and then generate
# all of the corresponding hashes. This command, similar to
# probing for new BOM components, should help with appending new
# hash values to the database.
for hash_class, fun in sorted(_HASH_PROBE_MAP.items()):
hash_result_map[hash_class] = RunProbe(fun)
else:
hash_result_map = None
if probe_initial_config:
for initial_config_class, fun in sorted(_INITIAL_CONFIG_PROBE_MAP.items()):
initial_config_result_map[initial_config_class] = RunProbe(fun)
else:
initial_config_result_map = None
return Obj(components=component_result_map,
volatiles=hash_result_map,
initial_configs=initial_config_result_map)