# blob: 30b86393df715fdbbcf4f7aab50f98db406e2446 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Probing routines for hardware and firmware identification.

There are three types of probe functions: hardware components, hash
values, and initial_config.

Probe functions must return the target identification string if the
probe was successful or None if no appropriate data was available.

Probe functions may also raise the Error exception to indicate that a
survivable error occurred, in which case the error will be reported
and a None probe result assumed.
"""
import hashlib
import logging
import os
import re
import sys
from array import array
from glob import glob
from fcntl import ioctl
from tempfile import NamedTemporaryFile
import edid
import crosfw
import vblock
import flimflam
from common import CompactStr, Error, Obj, Shell
# TODO(tammo): Some tests look for multiple components, some tests
# throw away all but the first, and some just look for one. All tests
# should return a list of results, with the empty list indicating no
# components were found.
# TODO(tammo): Get rid of trial-and-error detection. If there are
# multiple different ways to perform detection, we should run them all
# and collate the results. Different code paths on different systems
# leads to bitrot and fragility.
# Load-time decorator-populated dicts (arch of None implies generality):
# { arch : { class : probe function } }
# NOTE(review): the two assignments below were not present in the reviewed
# text even though the decorators and Probe() reference these names; they
# are restored here as empty maps -- confirm against the upstream file.
_COMPONENT_PROBE_MAP = {}
_INITIAL_CONFIG_PROBE_MAP = {}
def _LoadKernelModule(name):
    """Make sure the named kernel module is loaded, loading it on demand.

    Raises Error when the module is neither already listed by lsmod nor
    loadable through modprobe.
    """
    # TODO(tammo): Maybe lift into shared data for performance reasons.
    if Shell('lsmod | grep -q %s' % name).success:
        return
    if Shell('modprobe %s' % name).success:
        return
    raise Error('Cannot load kernel module: %s' % name)
def _ReadSysfsFields(base_path, field_list, optional_field_list=None):
    """Return the stripped contents of the requested sysfs field files.

    Args:
      base_path: sysfs directory which each field should be a file within.
      field_list: Required fields; the function returns None if any of
        them is missing.
      optional_field_list: Fields that are included if the corresponding
        files exist.

    Returns:
      List of field values, ordered as field_list followed by
      optional_field_list (absent optional fields are skipped), or None
      if the required fields are not all present.
    """
    all_fields_list = list(field_list) + list(optional_field_list or [])
    data = {}
    for field in all_fields_list:
        path = os.path.join(base_path, field)
        if os.path.exists(path):
            # Close each sysfs file promptly instead of leaking the handle.
            with open(path) as f:
                data[field] = f.read().strip()
    if not set(field_list) <= set(data):
        return None
    return [data[field] for field in all_fields_list if field in data]
def _ReadSysfsPciFields(path):
    """Returns string containing PCI 'vendor:device' tuple.

    Returns None when the 'vendor' and 'device' files are not both
    present under path.
    """
    # TODO(hungte): Maybe add PCI 'rev' field.
    pci_ids = _ReadSysfsFields(path, ['vendor', 'device'])
    if pci_ids is None:
        return None
    # Exactly two required values come back; drop their '0x' markers.
    vendor_id = pci_ids[0].replace('0x', '')
    device_id = pci_ids[1].replace('0x', '')
    return '%s:%s' % (vendor_id, device_id)
def _ReadSysfsUsbFields(path):
    """Returns string containing at least the USB 'idVendor:idProduct' tuple.

    Args:
      path: Path used to search for USB sysfs data.  First all symlinks
        are resolved, to get the 'real' path.  Then path terms are
        iteratively removed from the right hand side until the remaining
        path looks to contain the relevant data fields.

    Returns:
      A string with the above tuple if a suitable directory containing
      the field data can be found.  This string will also contain
      space-separated optional field data if those are available.  If
      no directory with the required fields is found, returns None.
    """
    path = os.path.realpath(path)
    while path.find('/usb') > 0:
        if os.path.exists(os.path.join(path, 'idProduct')):
            # NOTE(review): this 'break' was absent from the reviewed text;
            # without it the loop contradicts the documented "strip terms
            # until the fields are found" behavior.  Confirm upstream.
            break
        path = os.path.split(path)[0]
    field_data = _ReadSysfsFields(path, ['idVendor', 'idProduct'],
                                  ['manufacturer', 'product', 'bcdDevice'])
    if field_data is None:
        return None
    result = '%s:%s' % (field_data[0].replace('0x', ''),
                        field_data[1].replace('0x', ''))
    # Anything past the two required ids is optional descriptive data.
    optional_data = field_data[2:]
    if optional_data:
        result += ' ' + ' '.join(optional_data)
    return result
def _ReadSysfsDeviceId(path):
    """Returns sysfs PCI or USB device identification string.

    The PCI reader is tried first, then the USB reader; None is returned
    when neither yields a value.
    """
    # NOTE(review): the final operand of this 'or' chain was missing from
    # the reviewed text; None is assumed as the fallback -- confirm.
    return (_ReadSysfsPciFields(path) or
            _ReadSysfsUsbFields(path) or
            None)
def _ReadSysfsNodeId(path):
    """Returns sysfs node identification string.

    A more generic wrapper around _ReadSysfsDeviceId which supports
    cases where only a 'name' file exists.  Basically it tries to read
    the DeviceID data if present, but otherwise falls back to just
    reading the name file data.

    Returns None when neither source provides a non-empty value.
    """
    device_id = _ReadSysfsDeviceId(os.path.join(path, 'device'))
    if device_id:
        return device_id
    name_path = os.path.join(path, 'name')
    if os.path.exists(name_path):
        with open(name_path) as f:
            name = f.read().strip()
        if name:
            return name
    # NOTE(review): the tail of the original 'or' chain was missing from the
    # reviewed text; None is assumed as the final fallback -- confirm.
    return None
class _FlimflamDevices(object):
    """Wrapper around flimflam (connection manager) information.

    This object is a wrapper around the data from the flimflam module,
    providing dbus format post processing.

    Wrapped data is a list of Objs corresponding to devices detected by
    flimflam.  Each has devtype (flimflam type classification) and path
    (location of related data in sysfs) fields.  For cellular devices,
    there is also an attributes field which contains a dict of
    attribute:value items.
    """

    # Class-level cache, filled on the first GetDevices call.
    cached_dev_list = None

    # NOTE(review): the @classmethod decorators were absent from the
    # reviewed text; they are required because callers invoke these
    # methods directly on the class with a single argument.
    @classmethod
    def GetDevices(c, devtype):
        """Return device Obj list for devices with the specified type."""

        def ProcessDevice(device):
            properties = device.GetProperties()
            get_prop = lambda p: flimflam.convert_dbus_value(properties[p])
            result = Obj(
                # NOTE(review): the devtype argument was missing from the
                # reviewed text although result.devtype is read below;
                # 'Type' is presumed to be the flimflam property -- confirm.
                devtype=get_prop('Type'),
                path='/sys/class/net/%s/device' % get_prop('Interface'))
            if result.devtype == 'cellular':
                result.attributes = dict(
                    (key, get_prop('Cellular.%s' % key))
                    for key in ['Carrier', 'FirmwareRevision',
                                'HardwareRevision', 'ModelID', 'Manufacturer']
                    if ('Cellular.%s' % key) in properties)
            return result

        if c.cached_dev_list is None:
            # NOTE(review): the iterable of this comprehension was missing
            # from the reviewed text; the flimflam device list call below is
            # an assumption -- confirm against the flimflam module API.
            c.cached_dev_list = [
                ProcessDevice(device) for device in
                flimflam.FlimFlam().GetObjectList('Device')]
        return [dev for dev in c.cached_dev_list if dev.devtype == devtype]

    @classmethod
    def ReadSysfsDeviceIds(c, devtype):
        """Return _ReadSysfsDeviceId result for each device of specified type.

        Results are joined with ' ; '.  Devices whose id cannot be read are
        skipped (joining a None would raise TypeError); None is returned
        when no id is available at all.
        """
        ids = [_ReadSysfsDeviceId(dev.path) for dev in c.GetDevices(devtype)]
        ids = [dev_id for dev_id in ids if dev_id is not None]
        return ' ; '.join(ids) if ids else None
class _TouchpadData():
    """Return Obj with hw_ident and fw_ident string fields.

    Each vendor probe returns an Obj carrying ident_str and fw_version
    attributes, or None when that vendor's touchpad is not detected.
    """

    # NOTE(review): the @classmethod decorators were absent from the
    # reviewed text; they are required because callers use
    # _TouchpadData.Get() directly on the class.

    @classmethod
    def Synaptics(c):
        """Probe via the Synaptics syndetect tool, if installed."""
        detect_program = '/opt/Synaptics/bin/syndetect'
        if not os.path.exists(detect_program):
            return None
        lock_check = Shell('lsof /dev/serio_raw0 | grep -q "^X"')
        if lock_check.success and not os.getenv('DISPLAY'):
            logging.error('Synaptics touchpad detection with X in the '
                          'foreground requires DISPLAY and XAUTHORITY '
                          'to be set properly.')
            return None
        result = Shell(detect_program)
        if not result.success:
            return None
        # Parse 'key = value' lines of the tool output.
        properties = dict(
            [part.strip() for part in line.split('=', 1)]
            for line in result.stdout.splitlines() if '=' in line)
        model = properties.get('Model String', 'Unknown Synaptics')
        # Delete the " on xxx Port" substring, as we do not care about the
        # port.
        model = re.sub(' on [^ ]* [Pp]ort$', '', model)
        firmware = properties.get('Firmware ID', None)
        return Obj(ident_str=model, fw_version=firmware)

    @classmethod
    def Cypress(c):
        """Probe sysfs mouse nodes for the Cypress identification files."""
        for node in glob('/sys/class/input/mouse[0-9]*/device/device'):
            model_path_list = [
                os.path.join(node, field) for field in
                ['product_id', 'hardware_version', 'protocol_version']]
            firmware_path = os.path.join(node, 'firmware_version')
            if not all(os.path.exists(path) for path in
                       model_path_list + [firmware_path]):
                # NOTE(review): 'continue' restored; it was absent from the
                # reviewed text, leaving this 'if' without a body.
                continue
            model_data = []
            for path in model_path_list:
                with open(path) as f:
                    model_data.append(f.read().strip())
            with open(firmware_path) as f:
                firmware = f.read().strip()
            # NOTE(review): the keyword arguments of this Obj were missing
            # from the reviewed text; they are modelled on the other vendor
            # probes (ident_str / fw_version) -- confirm, in particular the
            # use of CompactStr for the model fields.
            return Obj(ident_str=CompactStr(model_data), fw_version=firmware)
        return None

    @classmethod
    def Generic(c):
        """Fall back to touchpad/trackpad names from the input device list."""
        # TODO(hungte) add more information from id/*
        # format: N: Name="???_trackpad"
        input_file = '/proc/bus/input/devices'
        cmd = 'grep -iE "^N.*(touch *pad|track *pad)" %s' % input_file
        info = Shell(cmd).stdout.splitlines()
        info = [re.sub('^[^"]*"(.*)"$', r'\1', device) for device in info]
        return Obj(ident_str=(', '.join(info) if info else None),
                   fw_version=None)

    # Class-level cache of the first successful probe result.
    cached_data = None

    @classmethod
    def Get(c):
        """Return the cached touchpad Obj, probing on first use."""
        if c.cached_data is None:
            c.cached_data = Obj(ident_str=None, fw_version=None)
            for vendor_fun in [c.Cypress, c.Synaptics, c.Generic]:
                data = vendor_fun()
                if data is not None:
                    c.cached_data = data
                    # NOTE(review): 'break' restored; without it Generic,
                    # which never returns None, would always overwrite the
                    # vendor-specific result.  Confirm upstream.
                    break
        return c.cached_data
def _ProbeFun(probe_map, probe_class, *arch_targets):
    """Decorator that populates probe_map.

    There can only be one probe function for each arch for each
    probe_class.  If no arch_targets are specified, the probe is assumed
    to be general and apply for those arches without arch specific
    probes.

    Args:
      probe_map: Map to update.
      probe_class: Probe class for which the probe fun produces results.
      arch_targets: List of arches for which the probe is relevant.

    Returns:
      A decorator that registers the function under every target arch
      (or under None when no arch is given) and returns it unchanged.
    """
    def Decorate(f):
        arch_list = arch_targets if arch_targets else [None]
        for arch in arch_list:
            arch_probe_map = probe_map.setdefault(arch, {})
            # Interpolate the message; previously the format string and its
            # arguments were passed as an unformatted tuple.
            assert probe_class not in arch_probe_map, (
                'Multiple component probe functions for %r %r' %
                (arch if arch else 'generic', probe_class))
            arch_probe_map[probe_class] = f
        return f
    return Decorate
def _ComponentProbe(probe_class, *arch_targets):
    """Decorator registering a probe function in _COMPONENT_PROBE_MAP."""
    register = _ProbeFun(_COMPONENT_PROBE_MAP, probe_class, *arch_targets)
    return register
def _InitialConfigProbe(probe_class, *arch_targets):
    """Decorator registering a probe function in _INITIAL_CONFIG_PROBE_MAP."""
    register = _ProbeFun(_INITIAL_CONFIG_PROBE_MAP, probe_class,
                         *arch_targets)
    return register
def _ProbeAudioCodec():
    """Looks for codec strings in /proc/asound then at PCM details.

    Returns the distinct codec strings joined with ' ; ' (sorted, so the
    result is stable between runs), else the second space-separated token
    of /proc/asound/pcm, else None.
    """
    grep_result = Shell('grep -R "Codec:" /proc/asound/*')
    match_list = [re.findall(r'.*Codec:(.*)', line)
                  for line in grep_result.stdout.splitlines()]
    result_set = set(CompactStr(match) for match in match_list if match)
    if result_set:
        # Set iteration order is arbitrary; sort for a reproducible string.
        return ' ; '.join(sorted(result_set))
    pcm_path = '/proc/asound/pcm'
    if not os.path.exists(pcm_path):
        return None
    # Formatted '00-00: WM??? PCM wm???-hifi-0: ...'
    with open(pcm_path) as f:
        pcm_data = f.read().strip().split(' ')
    if len(pcm_data) > 2:
        return CompactStr(pcm_data[1])
    return None
def _ProbeBattery():
    """Compose data from sysfs.

    Every /sys/class/power_supply node whose 'type' file reads 'Battery'
    contributes one compacted string; the strings are joined with ' ; '.
    Returns None when no battery data is found.
    """
    # NOTE(review): the tail of this list literal was missing from the
    # reviewed text; only the entries that were visible are kept.  Confirm
    # against upstream whether further fields belong here.
    battery_field_list = ['manufacturer', 'model_name', 'technology']
    results = []
    for node_path in glob('/sys/class/power_supply/*'):
        type_data = _ReadSysfsFields(node_path, ['type'])
        # A node without a 'type' file yields None; skip it instead of
        # failing on the subscript.
        if type_data is None or type_data[0] != 'Battery':
            continue
        battery_data = _ReadSysfsFields(node_path, battery_field_list)
        if battery_data is not None:
            results.append(CompactStr(battery_data))
    return ' ; '.join(results) if results else None
def _ProbeBluetooth():
    """Return the sysfs device id of the hci0 Bluetooth controller."""
    hci_device_path = '/sys/class/bluetooth/hci0/device'
    return _ReadSysfsDeviceId(hci_device_path)
def _ProbeCamera():
    """Collect identification data for the video0 camera node.

    Combines the sysfs node id, the SOC camera control name and, when the
    V4L2 chip-ident ioctl succeeds, the V4L2 ident and revision values.
    Returns the compacted data or None if nothing was found.
    """
    # TODO(tammo): What is happening here?  Arch-specific stuff?  Doc string...
    # TODO(tammo/sheckylin): Try to replace the code below with OpenCV calls.
    info = []
    camera_node = '/sys/class/video4linux/video0'
    camera_data = _ReadSysfsNodeId(camera_node)
    if camera_data:
        # NOTE(review): the body of this 'if' was missing from the reviewed
        # text; appending the id string to info is assumed -- confirm.
        info.append(camera_data)
    # For SOC cameras
    camera_data = _ReadSysfsFields(camera_node, ['device/control/name'])
    if camera_data:
        # NOTE(review): body likewise missing; _ReadSysfsFields returns a
        # list, so its items are assumed to extend info -- confirm.
        info.extend(camera_data)
    # Try video4linux2 (v4l2) interface.
    # See /usr/include/linux/videodev2.h for definition of these consts.
    # 'ident' values are defined in include/media/v4l2-chip-ident.h
    # NOTE(review): the V4L2_* / VIDIOC_* constants are not defined in the
    # reviewed text; they are expected to be in scope -- confirm.
    try:
        with open('/dev/video0', 'r+') as f:
            buf = array('i', [0] * V4L2_DBG_CHIP_IDENT_SIZE)
            ioctl(f.fileno(), VIDIOC_DBG_G_CHIP_IDENT, buf, 1)
            v4l2_ident = buf[V4L2_INDEX_IDENT]
            if v4l2_ident >= V4L2_VALID_IDENT:
                info.append('V4L2:%04x %04x' %
                            (v4l2_ident, buf[V4L2_INDEX_REVISION]))
    except (IOError, OSError) as e:
        # A missing device node or unsupported ioctl must not discard the
        # sysfs data gathered above.
        logging.debug('V4L2 chip ident query failed: %s', e)
    return CompactStr(info) if info else None
def _ProbeCellular():
    """Return the sysfs device ids of flimflam 'cellular' devices."""
    device_type = 'cellular'
    return _FlimflamDevices.ReadSysfsDeviceIds(device_type)
def _ProbeDisplayConverter():
    """Try brand-specific probes, return the first viable result."""

    def ProbeChrontel():
        """Search style borrowed from the /etc/init/chrontel.conf behavior."""
        dev_chrontel = '/dev/i2c-chrontel'
        if not os.path.exists(dev_chrontel):
            for dev_path in glob('/sys/class/i2c-adapter/*'):
                with open(os.path.join(dev_path, 'name')) as f:
                    adapter_name = f.read().strip()
                if adapter_name.startswith('SMBus I801 adapter'):
                    # Use the device node under /dev; a bare basename would
                    # make the existence check below relative to the
                    # current working directory.
                    dev_chrontel = os.path.join(
                        '/dev', os.path.basename(dev_path))
        cmd = 'ch7036_monitor -d %s -p' % dev_chrontel
        if os.path.exists(dev_chrontel) and Shell(cmd).success:
            return 'ch7036'
        return None

    # Lazily run each brand probe and stop at the first non-None answer.
    part_id_gen = (probe_fun() for probe_fun in [ProbeChrontel])
    return next((x for x in part_id_gen if x is not None), None)
@_ComponentProbe('chipset', 'x86')
def _ProbeChipsetX86():
    """On x86, host bridge is always the first PCI device."""
    host_bridge_path = '/sys/bus/pci/devices/0000:00:00.0'
    return _ReadSysfsDeviceId(host_bridge_path)
@_ComponentProbe('chipset', 'arm')
def _ProbeChipsetArm():
    """On ARM SOC-based systems, use first compatible list in device-tree.

    Returns None when /proc/device-tree/compatible does not exist.
    """
    # Format: manufacturer,model [NUL] compat-manufacturer,model [NUL] ...
    fdt_compatible_file = '/proc/device-tree/compatible'
    if not os.path.exists(fdt_compatible_file):
        return None
    with open(fdt_compatible_file) as f:
        compatible_list = f.read().strip()
    # Drop the leading/trailing NUL terminators, then split the entries.
    return CompactStr(compatible_list.strip(chr(0)).split(chr(0)))
@_ComponentProbe('cpu', 'x86')
def _ProbeCpuX86():
    """Reformat /proc/cpuinfo data.

    Returns '<model name> [N cores]', or None when /proc/cpuinfo has no
    'model name' lines.
    """
    # For platforms like x86, it provides names for each core.
    # Sample output for dual-core:
    #   model name : Intel(R) Atom(TM) CPU ???
    #   model name : Intel(R) Atom(TM) CPU ???
    cmd = r'sed -nr "s/^model name\s*: (.*)/\1/p" /proc/cpuinfo'
    stdout = Shell(cmd).stdout.splitlines()
    if not stdout:
        # Nothing matched; report "no data" instead of raising IndexError.
        return None
    return CompactStr(stdout[0] + ' [%d cores]' % len(stdout))
@_ComponentProbe('cpu', 'arm')
def _ProbeCpuArm():
    """Reformat /proc/cpuinfo data.

    Returns '<Processor name> [N cores]', or None when /proc/cpuinfo has
    no matching lines.
    """
    # For platforms like arm, it gives the name only in 'Processor'
    # and then a numeric ID for each cores 'processor', so delta is 1.
    # Sample output for dual-core:
    #   Processor : ARM???
    #   processor : 0
    #   processor : 1
    cmd = r'sed -nr "s/^[Pp]rocessor\s*: (.*)/\1/p" /proc/cpuinfo'
    stdout = Shell(cmd).stdout.splitlines()
    if not stdout:
        return None
    # Parenthesize the subtraction: '%' binds tighter than '-', so the
    # unparenthesized form subtracted 1 from the formatted string.
    return CompactStr(stdout[0] + ' [%d cores]' % (len(stdout) - 1))
def _ProbeDisplayPanel():
    """Combine all available edid data, from sysfs and directly from the i2c.

    Returns the distinct non-None EDID results, sorted and joined with
    ' ; ', or None when there are none.
    """
    edid_set = set()
    for path in glob('/sys/class/drm/*LVDS*/edid'):
        # Close each sysfs file promptly instead of leaking the handle.
        with open(path) as f:
            edid_set.add(edid.Parse(f.read()))
    for path in sorted(glob('/dev/i2c-?')):
        edid_set.add(edid.LoadFromI2c(path))
    edid_set.discard(None)
    return ' ; '.join(sorted(edid_set)) if edid_set else None
@_ComponentProbe('dram', 'x86')
def _ProbeDramX86():
    """Combine mosys memory timing and geometry information.

    Produces one 'dimm|size_mb|speeds' entry per DIMM reported by the
    timings query, with spaces removed from the speeds value.
    """
    # TODO(tammo): Document why mosys cannot load i2c_dev itself.
    # NOTE(review): the statement this TODO annotated appears to be missing
    # from the reviewed text (presumably a kernel module load) -- confirm.
    time_data = Shell('mosys -k memory spd print timings').stdout
    size_data = Shell('mosys -k memory spd print geometry').stdout
    times = dict(re.findall('dimm="([^"]*)".*speeds="([^"]*)"', time_data))
    sizes = dict(re.findall('dimm="([^"]*)".*size_mb="([^"]*)"', size_data))
    entries = []
    for dimm in sorted(times):
        speeds = times[dimm].replace(' ', '')
        entries.append('%s|%s|%s' % (dimm, sizes[dimm], speeds))
    return CompactStr(entries)
@_ComponentProbe('dram', 'arm')
def _ProbeDramArm():
    """Memory is not directly probable, so use kernel cmdline info."""
    # TODO(tammo): Request that mosys provide this info (by any means).
    with open('/proc/cmdline') as f:
        cmdline = f.read().strip()
    # Format: *mem=384M@0M (size@address)
    return CompactStr(re.findall(r'\s\w*mem=(\d+M@\d+M)', cmdline))
def _ProbeEcFlashChip():
    """Return the chip id reported for the loaded EC firmware."""
    ec_firmware = crosfw.LoadEcFirmware()
    return ec_firmware.GetChipId()
def _ProbeEmbeddedController():
    """Reformat mosys output.

    Extracts the vendor and name values from 'mosys -k ec info'; returns
    None when the output does not contain them.
    """
    # Example mosys command output:
    # vendor="VENDOR" name="CHIPNAME" fw_version="ECFWVER"
    mosys_output = Shell('mosys -k ec info').stdout
    matches = re.findall(r'\bvendor="([^"]*)".*\bname="([^"]*)"',
                         mosys_output)
    if not matches:
        return None
    return CompactStr(*matches)
def _ProbeEthernet():
    """Return the sysfs device ids of flimflam 'ethernet' devices."""
    device_type = 'ethernet'
    return _FlimflamDevices.ReadSysfsDeviceIds(device_type)
def _ProbeMainFlashChip():
    """Return the chip id reported for the loaded main firmware."""
    main_firmware = crosfw.LoadMainFirmware()
    return main_firmware.GetChipId()
def _ProbeStorage():
    """Compile sysfs data for all non-removable block storage devices.

    Returns the per-device strings joined with ' ; ', or None when no
    fixed device yields identification data.
    """

    def ReadStripped(path):
        """Return the stripped contents of path, closing the file."""
        with open(path) as f:
            return f.read().strip()

    def IsFixed(node):
        path = os.path.join(node, 'removable')
        return os.path.exists(path) and ReadStripped(path) == '0'

    def ProcessNode(node_path):
        dev_path = os.path.join(node_path, 'device')
        size_path = os.path.join(os.path.dirname(dev_path), 'size')
        size = ('#' + ReadStripped(size_path)
                if os.path.exists(size_path) else '')
        ata_fields = ['vendor', 'model']
        emmc_fields = ['type', 'name', 'fwrev', 'hwrev', 'oemid', 'manfid']
        # NOTE(review): the final operand of this 'or' chain was missing
        # from the reviewed text; None is assumed, matching the check on
        # the next line -- confirm.
        data = (_ReadSysfsFields(dev_path, ata_fields) or
                _ReadSysfsFields(dev_path, emmc_fields) or
                None)
        return CompactStr(data + [size]) if data is not None else None

    fixed_devices = [node for node in glob('/sys/class/block/*')
                     if IsFixed(node)]
    ident_list = [ident for ident in map(ProcessNode, fixed_devices)
                  if ident is not None]
    return ' ; '.join(ident_list) if ident_list else None
def _ProbeTouchpad():
    """Return the ident_str field of the detected touchpad data."""
    touchpad = _TouchpadData.Get()
    return touchpad.ident_str
def _ProbeTpm():
    """Return Manufacturer_info : Chip_Version string from tpm_version output.

    Returns None unless both the 'Manufacturer Info' and 'Chip Version'
    entries are present.
    """
    # NOTE(review): the iterable of this comprehension was missing from the
    # reviewed text; running 'tpm_version' through Shell is assumed from
    # the docstring and from how other probes read command output --
    # confirm.
    tpm_data = [line.partition(':') for line in
                Shell('tpm_version').stdout.splitlines()]
    tpm_dict = dict((key.strip(), value.strip()) for
                    key, _, value in tpm_data)
    mfg = tpm_dict.get('Manufacturer Info', None)
    version = tpm_dict.get('Chip Version', None)
    if mfg is not None and version is not None:
        return mfg + ':' + version
    return None
def _ProbeUsbHosts():
    """Compile USB data from sysfs.

    Returns the space-joined device ids of the USB hosts, or None when
    none can be read.
    """
    # On x86, USB hosts are PCI devices, located in parent of root USB.
    # On ARM and others, use the root device itself.
    # TODO(tammo): Think of a better way to do this, without arch.
    arch = Shell('crossystem arch').stdout.strip()
    if arch == 'arm':
        relpath = '.'
    else:
        relpath = '..'
    # Usually there are several USB hosts, so only list the primary
    # information.
    host_ids = []
    for bus_path in glob('/sys/bus/usb/devices/usb*'):
        host_path = os.path.join(os.path.realpath(bus_path), relpath)
        host_id = _ReadSysfsDeviceId(host_path)
        if host_id is not None:
            host_ids.append(host_id)
    if not host_ids:
        return None
    return ' '.join(host_ids)
def _ProbeVga():
    """Return the sysfs node id of the fb0 framebuffer."""
    framebuffer_node = '/sys/class/graphics/fb0'
    return _ReadSysfsNodeId(framebuffer_node)
def _ProbeWireless():
    """Return the sysfs device ids of flimflam 'wifi' devices."""
    device_type = 'wifi'
    return _FlimflamDevices.ReadSysfsDeviceIds(device_type)
def _ProbeCellularFirmwareVersion():
    """Return firmware detail strings for all cellular devices.

    The per-device strings are joined with ' ; '.  Returns None when no
    version string is available, in line with the other probes.
    """

    def GetVersionString(dev_attrs):
        """Use flimflam or modem status data to generate a version string.

        The fields present in the flimflam data may differ between
        devices, so each known field combination is tried in turn before
        falling back to 'modem status'.
        """
        # TODO(tammo): Document breakdown of known combinations for each
        # supported part, correspondingly document when the 'modem status'
        # fallback is necessary.
        # NOTE(review): the tail of this list literal was missing from the
        # reviewed text; only the entry that was visible is kept.  Confirm
        # against upstream whether further formats belong here.
        version_formats = [
            ['Carrier', 'FirmwareRevision'],
        ]
        for version_format in version_formats:
            if not set(version_format).issubset(set(dev_attrs)):
                # NOTE(review): 'continue' restored; it was absent from the
                # reviewed text, leaving this 'if' without a body.
                continue
            # Compact all fields into single line with limited space.
            return CompactStr([dev_attrs[key] for key in version_format])
        # If nothing available, try 'modem status'.
        cmd = 'modem status | grep firmware_revision'
        modem_status = Shell(cmd).stdout.strip()
        info = re.findall(r'^\s*firmware_revision:\s*(.*)', modem_status)
        if info and info[0]:
            return info[0]
        return None

    # NOTE(review): the iterable of this comprehension was missing from the
    # reviewed text; the flimflam 'cellular' device list is assumed because
    # only cellular devices carry an attributes field -- confirm.
    results = [GetVersionString(dev.attributes) for dev in
               _FlimflamDevices.GetDevices('cellular')]
    results = [x for x in results if x is not None]
    return ' ; '.join(results) if results else None
def _ProbeRwFirmwareVersion():
    """Returns RW (writable) firmware version from VBLOCK sections.

    Returns the single version number as a string when VBLOCK_A and
    VBLOCK_B agree, otherwise 'A=<a>, B=<b>'.
    """
    main_fw_file = crosfw.LoadMainFirmware().GetFileName()
    with open(main_fw_file, 'rb') as f:
        image = crosfw.FirmwareImage(f.read())

    def GetVersion(section_name):
        data = image.get_section(section_name)
        block = vblock.unpack_verification_block(data)
        return block['VbFirmwarePreambleHeader']['firmware_version']

    # A tuple is required: '%' treats a list as one single argument, and a
    # map() result is not indexable on Python 3.
    versions = tuple(GetVersion(name) for name in ['VBLOCK_A', 'VBLOCK_B'])
    if versions[0] != versions[1]:
        return 'A=%d, B=%d' % versions
    return '%d' % versions[0]
def _ProbeTouchpadFirmwareVersion():
    """Return the fw_version field of the detected touchpad data."""
    touchpad = _TouchpadData.Get()
    return touchpad.fw_version
def _GbbHash(image):
    """Algorithm: sha256(GBB[-HWID]); GBB without HWID.

    NOTE(review): several statements of this function were missing or
    truncated in the reviewed text.  The steps below (write the GBB section
    to the temporary file, let gbb_utility overwrite the HWID in that file,
    hash the file contents) are reconstructed from the docstring and the
    visible command line -- confirm against upstream.
    """
    with NamedTemporaryFile('wb') as f:
        f.write(image.get_section('GBB'))
        # Make sure the data is on disk before the external tool reads it.
        f.flush()
        Shell('gbb_utility -s --hwid="ChromeOS" "%s"' % f.name)
        with open(f.name, 'rb') as gbb_file:
            hash_src = gbb_file.read()
    return hashlib.sha256(hash_src).hexdigest()
def _MainRoHash(image):
    """Algorithm: sha256(fmap, RO_SECTION[-GBB]).

    The GBB section is temporarily overwritten with zero bytes while
    RO_SECTION is read, and is restored afterwards even if that read
    raises.
    """
    hash_src = image.get_fmap_blob()
    gbb = image.get_section('GBB')
    # Use a bytes literal so the zero fill matches binary section data.
    image.put_section('GBB', b'\x00' * len(gbb))
    try:
        hash_src += image.get_section('RO_SECTION')
    finally:
        image.put_section('GBB', gbb)
    return hashlib.sha256(hash_src).hexdigest()
def _EcRoHash(image):
    """Algorithm: sha256(fmap, EC_RO)."""
    digest_input = image.get_fmap_blob()
    digest_input += image.get_section('EC_RO')
    digest = hashlib.sha256(digest_input)
    return digest.hexdigest()
def _FwKeyHash(main_fw_file, key_name):
    """Hash specified GBB key, extracted by vbutil_key.

    Returns the sha1sum reported by vbutil_key, or None when its output
    does not contain exactly one.  Raises Error when gbb_utility cannot
    extract the key.
    """
    with NamedTemporaryFile(prefix='gbb_%s_' % key_name) as f:
        # NOTE(review): the middle format argument and the tail of the
        # vbutil_key call were missing from the reviewed text; f.name is
        # the only file in scope and is assumed for both -- confirm.
        if not Shell('gbb_utility -g --%s=%s %s' %
                     (key_name, f.name, main_fw_file)).success:
            raise Error('cannot get %s from GBB' % key_name)
        key_info = Shell('vbutil_key --unpack %s' % f.name).stdout
        sha1sum = re.findall(r'Key sha1sum:[\s]+([\w]+)', key_info)
        if len(sha1sum) != 1:
            logging.error("Failed calling vbutil_key for firmware key hash.")
            return None
        return sha1sum[0]
def CalculateFirmwareHashes(fw_file_path):
    """Calculate the volatile hashes corresponding to a firmware blob.

    Given a firmware blob, determine what kind of firmware it is based
    on what sections are present.  Then generate a dict containing the
    corresponding hash values.

    Args:
      fw_file_path: Path of the firmware blob to read.

    Returns:
      Dict of hash name to value (empty when no known section layout is
      recognized), or None when the blob cannot be parsed as a firmware
      image.
    """
    with open(fw_file_path, 'rb') as f:
        raw_image = f.read()
    # NOTE(review): the reviewed text had a bare 'return None' directly
    # after the FirmwareImage construction, which would make the rest of
    # the function unreachable; it is assumed to have been the handler of
    # a try/except whose lines were lost -- confirm.
    try:
        image = crosfw.FirmwareImage(raw_image)
    except Exception:
        return None
    hashes = {}
    if image.has_section('EC_RO'):
        hashes['ro_ec_firmware'] = _EcRoHash(image)
    elif image.has_section('GBB') and image.has_section('RO_SECTION'):
        hashes['hash_gbb'] = _GbbHash(image)
        hashes['ro_main_firmware'] = _MainRoHash(image)
        hashes['key_recovery'] = _FwKeyHash(fw_file_path, 'recoverykey')
        hashes['key_root'] = _FwKeyHash(fw_file_path, 'rootkey')
    return hashes
def Probe(target_comp_classes=None,
          probe_volatile=True,
          probe_initial_config=True):
    """Return device component, hash, and initial_config data.

    Run all of the available probing routines that make sense for the
    target architecture, for example if the machine being probed is x86
    then somewhat different probes would be run than for an ARM machine.

    All probe results are returned directly, without analysis.  Matching
    these results against the component database or against HWID data
    can be done afterwards.

    NOTE(review): the reviewed text of this function was truncated in
    several places (signature tail, 'try:' / 'else:' lines, the Obj
    arguments and the volatile hash collection).  The defaults of True,
    the 'volatiles' field name and the hash collection below are
    reconstructed from the docstring and the remaining code -- confirm
    against upstream.

    Args:
      target_comp_classes: Which component classes to probe for.  None
        or the empty list implies all classes.
      probe_volatile: On False, do not probe for volatile data and
        return None for the corresponding field.
      probe_initial_config: On False, do not probe for initial_config
        data and return None for the corresponding field.

    Returns:
      Obj with found_components, volatiles, and initial_configs fields,
      each containing the corresponding dict of probe results.
    """

    def RunProbe(probe_fun):
        # A failing probe is logged and treated as "no data" so that one
        # broken probe cannot abort the whole run.
        try:
            return probe_fun()
        except Exception:
            logging.exception(
                'Probe %r FAILED (see traceback), returning None.',
                probe_fun.__name__)
            return None

    def FilterProbes(ref_probe_map, arch, probe_class_white_list):
        generic_probes = ref_probe_map.get(None, {})
        arch_probes = ref_probe_map.get(arch, {})
        if not probe_class_white_list:
            probe_class_white_list = set(generic_probes) | set(arch_probes)
        # Arch-specific probes take precedence over generic ones.
        return dict((probe_class, (arch_probes[probe_class]
                                   if probe_class in arch_probes
                                   else generic_probes[probe_class]))
                    for probe_class in sorted(probe_class_white_list))

    results = Obj(
        found_components={},
        volatiles={} if probe_volatile else None,
        initial_configs={} if probe_initial_config else None)
    arch = Shell('crossystem arch').stdout.strip()
    comp_probes = FilterProbes(_COMPONENT_PROBE_MAP, arch,
                               target_comp_classes or [])
    if probe_initial_config:
        ic_probes = FilterProbes(_INITIAL_CONFIG_PROBE_MAP, arch, [])
    else:
        ic_probes = {}
    for comp_class, probe_fun in comp_probes.items():
        probe_value = RunProbe(probe_fun)
        if probe_value is not None:
            results.found_components[comp_class] = probe_value
    for ic_class, probe_fun in ic_probes.items():
        probe_value = RunProbe(probe_fun)
        if probe_value is not None:
            results.initial_configs[ic_class] = probe_value
    if probe_volatile:
        main_fw_file = crosfw.LoadMainFirmware().GetFileName()
        # CalculateFirmwareHashes may return None for an unparsable blob.
        results.volatiles.update(CalculateFirmwareHashes(main_fw_file) or {})
        ec_fw_file = crosfw.LoadEcFirmware().GetFileName()
        if ec_fw_file is not None:
            results.volatiles.update(
                CalculateFirmwareHashes(ec_fw_file) or {})
    return results