blob: f78e8cae1181849cafbdc49aac81c73d0839e1d3 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Probing routines for hardware and firmware identification.
There are three types of probe functions: hardware components, hash
values, and initial_config.
Probe functions must return the target identification string if the
probe was successful or None if no appropriate data was available.
Probe functions may also raise the Error exception to indicate that a
survivable error occurred, in which case the error will be reported
and a None probe result assumed.
"""
from __future__ import print_function
from array import array
import collections
from fcntl import ioctl
from glob import glob
import hashlib
import logging
import os
import re
import string # pylint: disable=deprecated-module
import struct
import sys
from tempfile import NamedTemporaryFile
import factory_common # pylint: disable=unused-import
from cros.factory.gooftool.common import Shell
from cros.factory.gooftool import crosfw
from cros.factory.gooftool import edid
from cros.factory.gooftool import vblock
# pylint: disable=no-name-in-module
from cros.factory.hwid.v2.hwid_tool import ProbeResults
from cros.factory.test.l10n import regions
from cros.factory.utils import process_utils
from cros.factory.utils import sys_utils
from cros.factory.utils.type_utils import Error
from cros.factory.utils.type_utils import Obj
from cros.factory.external import evdev
try:
sys.path.append('/usr/local/lib/flimflam/test')
import flimflam # pylint: disable=import-error
except: # pylint: disable=bare-except
pass
# TODO(tammo): Some tests look for multiple components, some tests
# throw away all but the first, and some just look for one. All tests
# should return a list of results, with the empty list indicating no
# components were found.
# TODO(tammo): Get rid of trial-and-error detection. If there are
# multiple different ways to perform detection, we should run them all
# and collate the results. Different code paths on different systems
# leads to bitrot and fragility.
# Load-time decorator-populated dicts (arch of None implies generality):
# { arch : { class : probe function } }
_COMPONENT_PROBE_MAP = {}
_INITIAL_CONFIG_PROBE_MAP = {}
# Load-time decorator-populated set of probeable component classes.
PROBEABLE_COMPONENT_CLASSES = set()
# If this file is present, we'll return its probe results rather than
# actually probing.
FAKE_PROBE_RESULTS_FILE = '/tmp/fake_probe_results.yaml'
def CompactStr(data):
  """Collapses whitespace in a string or in a sequence of strings.

  Args:
    data: A single string, or a list/tuple of strings.

  Returns:
    A string in which every run of whitespace is replaced by one space and
    leading/trailing whitespace is removed.  For a list or tuple, the truthy
    items are first joined with single spaces and the joined text is then
    handled like a plain string.
  """
  if isinstance(data, (list, tuple)):
    pieces = [item for item in data if item]
    data = ' '.join(pieces)
  collapsed = re.sub(r'\s+', ' ', data)
  return collapsed.strip()
def ParseKeyValueData(pattern, data):
  """Parses line-oriented text into a dict of key/value pairs.

  Every line of data is stripped and matched against pattern; the first
  capture group is used as the key and the second one as the value.

  Args:
    pattern: A regex pattern with (at least) two groups: key and value.
    data: The text to be parsed, one key/value pair per line.

  Returns:
    A { key: value, ... } dict.

  Raises:
    ValueError: When a line does not match pattern, or when a key appears
        more than once.
  """
  result = {}
  for raw_line in data.splitlines():
    match = re.match(pattern, raw_line.strip())
    if match is None:
      raise ValueError('Invalid data: %s' % raw_line)
    key, value = match.group(1, 2)
    if key in result:
      raise ValueError('Duplicate key: %s' % key)
    result[key] = value
  return result
def _StripRead(filepath):
  """Reads a file and returns its content without surrounding whitespace.

  Args:
    filepath: Path of the file to read.

  Returns:
    The whole file content with leading and trailing whitespace removed.
  """
  with open(filepath) as handle:
    content = handle.read()
  return content.strip()
def _ShellOutput(command, on_error=''):
  """Runs a shell command and returns its stripped standard output.

  Callers usually want either an empty string or None when the execution
  fails.  Because most probe results expect a string (for schema
  validation), the fallback value defaults to the empty string ('').

  Args:
    command: A shell command passed to Shell().
    on_error: What to return if execution failed, defaults to empty string.

  Returns:
    The stripped stdout of the command if it succeeded, otherwise on_error.
  """
  outcome = Shell(command)
  if not outcome.success:
    return on_error
  return outcome.stdout.strip()
def _ReadSysfsFields(base_path, field_list, optional_field_list=None):
  """Reads a set of sysfs files into a {field_name: field_value} dict.

  Args:
    base_path: sysfs directory which each field should be a file within.
    field_list: List of required fields; None is returned if any of them
        is missing.
    optional_field_list: List of fields that are included only if the
        corresponding files exist.

  Returns:
    Dict of field names and (stripped) file contents, or None if the
    required fields are not all present.
  """
  wanted_fields = field_list + (optional_field_list or [])
  values = {}
  for field in wanted_fields:
    field_path = os.path.join(base_path, field)
    # Only regular entries are read; directories with a matching name are
    # treated as if the field was absent.
    if os.path.exists(field_path) and not os.path.isdir(field_path):
      values[field] = _StripRead(field_path)
  for required in field_list:
    if required not in values:
      return None
  return values
def _ReadSysfsPciFields(path):
  """Reads PCI identification fields from a sysfs device directory.

  Args:
    path: Path used to search for PCI sysfs data.

  Returns:
    A dict holding the PCI 'vendor' and 'device' values, plus 'revision_id'
    (as a hex string) when the revision byte could be read from the
    'config' file.  Returns None if 'vendor' or 'device' is missing, or if
    reading the 'config' file raises IOError.
  """
  # Offset of the revision ID byte inside the PCI configuration space.
  REVISION_ID_OFFSET = 0x08

  pci_fields = _ReadSysfsFields(path, ['vendor', 'device'])
  if pci_fields is None:
    return None

  config_path = os.path.join(path, 'config')
  try:
    with open(config_path, 'rb') as config_file:
      config_file.seek(REVISION_ID_OFFSET)
      revision = config_file.read(1)
      if len(revision) == 1:
        pci_fields['revision_id'] = hex(ord(revision))
  except IOError:
    logging.exception('Cannot read config in the sysfs: %s', path)
    return None
  return pci_fields
def _ReadSysfsUsbFields(path):
  """Reads USB identification fields from sysfs.

  Args:
    path: Path used to search for USB sysfs data.  First all symlinks are
        resolved to get the 'real' path.  Then path terms are iteratively
        removed from the right hand side until the remaining path contains
        an 'idProduct' entry (or no longer contains '/usb').

  Returns:
    A dict with the USB 'idVendor' and 'idProduct' values if a suitable
    directory containing the field data can be found.  The dict also holds
    the optional 'manufacturer', 'product' and 'bcdDevice' fields when
    those are available.  If no directory with the required fields is
    found, returns None.
  """
  candidate = os.path.realpath(path)
  while candidate.find('/usb') > 0:
    if os.path.exists(os.path.join(candidate, 'idProduct')):
      break
    candidate = os.path.dirname(candidate)
  return _ReadSysfsFields(candidate, ['idVendor', 'idProduct'],
                          ['manufacturer', 'product', 'bcdDevice'])
def _ReadSysfsDeviceId(path, ignore_usb=False):
  """Returns sysfs PCI or USB device identification data.

  Args:
    path: sysfs path of the device.
    ignore_usb: If True, only PCI data is looked up.

  Returns:
    The result of _ReadSysfsPciFields if it is non-empty; otherwise the
    result of _ReadSysfsUsbFields (unless ignore_usb is set); None if
    neither produced data.
  """
  pci_fields = _ReadSysfsPciFields(path)
  if pci_fields:
    return pci_fields
  if ignore_usb:
    return None
  return _ReadSysfsUsbFields(path) or None
def _ReadSysfsNodeId(path):
  """Returns sysfs node identification data.

  A more generic wrapper around _ReadSysfsDeviceId which supports cases
  where only a 'name' file exists.  It first tries to read the device ID
  data from the 'device' sub-directory, and otherwise falls back to the
  content of the 'name' file.

  Args:
    path: sysfs path of the node.

  Returns:
    The device ID dict if available; otherwise {'name': <name>} with NUL
    characters replaced and whitespace collapsed; None if neither source
    yields data.
  """
  fields = _ReadSysfsDeviceId(os.path.join(path, 'device'))
  if fields:
    return fields

  name_file = os.path.join(path, 'name')
  if not os.path.exists(name_file):
    return None
  raw_name = _StripRead(name_file)
  if not raw_name:
    return None
  return {'name': ' '.join(raw_name.replace(chr(0), ' ').split())}
def _RecursiveProbe(path, read_method):
  """Recursively probes path and all of its subdirectories with read_method.

  Args:
    path: Root path of the recursive probing.
    read_method: The method used to probe device information.  It is called
        with the resolved path of every visited directory,
        e.g. _ReadSysfsUsbFields, _ReadSysfsPciFields, or _ReadSysfsDeviceId.

  Returns:
    A list of the values returned by read_method for path and every
    directory below it, in visiting order.  Duplicated values are omitted.
    A None result from read_method is kept like any other value, so callers
    have to filter it out themselves.
  """
  seen_dirs = set()
  results = []

  def _Visit(node):
    """Probes one directory, then descends into its entries."""
    real_node = os.path.realpath(node)
    # Resolved paths are tracked so that symlink loops terminate.
    if real_node in seen_dirs or not os.path.isdir(real_node):
      return
    probed = read_method(real_node)
    # Only append new data.
    if probed not in results:
      results.append(probed)
    children = os.listdir(real_node)
    seen_dirs.add(real_node)
    for child in children:
      # Do not search directory upward.
      if child == 'subsystem':
        continue
      _Visit(os.path.join(real_node, child))

  _Visit(path)
  return results
class _GobiDevices(object):
  """Helpers for querying Gobi modem specific information."""
  # TODO(bhthompson): This will need to be rewritten when gobi-fw is
  # deprecated, see crbug.com/217324

  @classmethod
  def IsDeviceGobi(cls):
    """Returns True if a network device uses the gobi driver, else False."""
    for uevent_path in glob('/sys/class/net/*/device/uevent'):
      with open(uevent_path) as uevent:
        stripped_lines = [entry.strip() for entry in uevent.readlines()]
      if 'DRIVER=gobi' in stripped_lines:
        return True
    return False

  @classmethod
  def ReadFirmwareList(cls):
    """Returns a list of firmware tuples from the `gobi-fw list` command.

    Returns:
      None if there is no Gobi device.  Otherwise a list of Firmware named
      tuples (attrs, active, build_id, carrier), one per output line after
      the first (legend) line.

    Raises:
      ValueError: When a line of the output cannot be parsed.
    """
    if not cls.IsDeviceGobi():
      return None

    Firmware = collections.namedtuple(
        'Firmware', 'attrs active build_id carrier')
    # The attrs field consists of some/all of the characters AIPM from the
    # gobi-fw utility 'Legend: A available I installed P pri M modem * active'
    # We separate out the * for active as it is an initial configuration,
    # modifiable by the user or tests to enable different carriers/regions.
    listing = _ShellOutput('gobi-fw list').splitlines()
    parsed = []
    # The first line is the legend, not a firmware entry.
    for row in listing[1:]:
      fields = re.match(r'^([A ][I ][P ][M ])([* ]) (\S+)\s+(.+)$', row)
      if not fields:
        raise ValueError('Unable to parse line %r in gobi-fw output' % row)
      attrs, active_mark, build_id, carrier = fields.groups()
      parsed.append(Firmware(attrs, active_mark != ' ', build_id, carrier))
    return parsed

  @classmethod
  def ActiveFirmware(cls):
    """Returns the build_id of the first active firmware, or None."""
    if not cls.IsDeviceGobi():
      return None
    for firmware in cls.ReadFirmwareList():
      if firmware.active:
        return firmware.build_id
    return None
class _NetworkDevices(object):
  """A general probing module for network devices."""

  # Device list shared by all GetDevices() calls; None until first probed.
  cached_dev_list = None

  @classmethod
  def _GetIwconfigDevices(cls, extension='IEEE 802.11'):
    """Wrapper around iwconfig(8) information.

    Example output:

      eth0    no wireless extensions.

      wlan0   IEEE 802.11abgn ESSID:off/any
              Mod:Managed Access Point: Not-Associated Tx-Power=20 dBm
              ...

    Args:
      extension: Substring that a line of the iwconfig output must contain
          for its first word to be taken as a WiFi interface name.

    Returns:
      A list of network objects having WiFi extension.
    """
    return [Obj(devtype='wifi',
                path='/sys/class/net/%s/device' % node.split()[0])
            for node in _ShellOutput('iwconfig').splitlines()
            if extension in node]

  @classmethod
  def _GetIwDevices(cls, iw_type='managed'):
    """Wrapper around iw(8) information.

    Command 'iw' explicitly said "Do NOT screenscrape this tool" but we do
    not have any better solution. A typical output for 'iw dev' on mwifiex:

      phy#0
        Interface p2p0
          ifindex 4
          wdev 0x3
          addr 28:c2:dd:45:94:39
          type P2P-client
        Interface uap0
          ifindex 3
          wdev 0x2
          addr 28:c2:dd:45:94:39
          type AP
        Interface mlan0
          ifindex 2
          wdev 0x1
          addr 28:c2:dd:45:94:39
          type managed

    p2p0 and uap0 are virtual nodes and what we really want is mlan0 (managed).

    Args:
      iw_type: The interface type (value of the 'type' line) to select.

    Returns:
      A list of network objects with correct iw type.
    """
    devices = []
    # Name from the most recent 'Interface' line that has not yet been
    # paired with a 'type' line.
    pending_interface = None
    for line in _ShellOutput('iw dev').splitlines():
      tokens = line.split()
      # Blank lines and single-word lines carry no key/value information.
      if len(tokens) < 2:
        continue
      keyword, value = tokens[0], tokens[1]
      if keyword == 'Interface':
        pending_interface = value
      elif keyword == 'type' and pending_interface is not None:
        if value == iw_type:
          devices.append(Obj(
              devtype='wifi',
              path='/sys/class/net/%s/device' % pending_interface))
        # A 'type' line closes the current interface section.
        pending_interface = None
    return devices

  @classmethod
  def _GetFlimflamDevices(cls):
    """Wrapper around flimflam (shill), the ChromeOS connection manager.

    This object is a wrapper around the data from the flimflam module,
    providing dbus format post processing.

    Returns:
      A list of network objects in Obj, having:
        devtype: A string in flimflam Type (wifi, cellular, ethernet).
        path: A string for /sys node device path.
        attributes: A dictionary for additional attributes (only set for
            cellular devices).
    """
    def _ProcessDevice(device):
      properties = device.GetProperties()
      get_prop = lambda p: flimflam.convert_dbus_value(properties[p])
      result = Obj(
          devtype=get_prop('Type'),
          path='/sys/class/net/%s/device' % get_prop('Interface'))
      if result.devtype == 'cellular':
        result.attributes = dict(
            (key, get_prop('Cellular.%s' % key))
            for key in ['Carrier', 'FirmwareRevision', 'HardwareRevision',
                        'ModelID', 'Manufacturer']
            if 'Cellular.%s' % key in properties)
      return result

    return [_ProcessDevice(device) for device in
            flimflam.FlimFlam().GetObjectList('Device')]

  @classmethod
  def GetDevices(cls, devtype):
    """Returns network device information by given type.

    Returned data is a list of Objs corresponding to detected devices.
    Each has devtype (in same way as flimflam type classification) and path
    (location of related data in sysfs) fields.  For cellular devices, there is
    also an attributes field which contains a dict of attribute:value items.

    The device list is probed once and then cached on the class.

    Args:
      devtype: The device type to select, e.g. 'wifi' or 'cellular'.
    """
    if cls.cached_dev_list is None:
      dev_list = cls._GetFlimflamDevices()

      # On some Brillo (AP-type) devices, WiFi interfaces are blacklisted by
      # shill and needs to be discovered manually, so we have to try 'iw config'
      # or 'iw dev' to get a more correct list.
      # 'iwconfig' is easier to parse, but for some WiFi drivers, for example
      # mwifiex, do not support wireless extensions and only provide the new
      # CFG80211/NL80211. Also mwifiex will create two more virtual nodes 'uap0,
      # p2p0' so we can't rely on globbing /sys/class/net/*/wireless. The only
      # solution is to trust 'iw dev'.

      existing_nodes = [dev.path for dev in dev_list]
      dev_list += [dev for dev in cls._GetIwconfigDevices()
                   if dev.path not in existing_nodes]

      existing_nodes = [dev.path for dev in dev_list]
      dev_list += [dev for dev in cls._GetIwDevices()
                   if dev.path not in existing_nodes]

      cls.cached_dev_list = dev_list

    return [dev for dev in cls.cached_dev_list if dev.devtype == devtype]

  @classmethod
  def ReadSysfsDeviceIds(cls, devtype, ignore_usb=False):
    """Return _ReadSysfsDeviceId result for each device of specified type."""
    ids = [_ReadSysfsDeviceId(dev.path, ignore_usb)
           for dev in cls.GetDevices(devtype)]
    # Filter out 'None' results
    return sorted(device for device in ids if device is not None)
class _InputDevices(object):
  """Parses /proc/bus/input/devices and turns into a key-value dataset."""

  _PATH = '/proc/bus/input/devices'
  # List of parsed device entries; None until _InitDataset() has run.
  _dataset = None

  @classmethod
  def _InitDataset(cls):
    """Parses cls._PATH and stores the resulting entries in cls._dataset."""
    devices = []
    current = {}
    with open(cls._PATH) as proc_file:
      for line in proc_file:
        tag = line[0]
        body = line[3:].strip()
        # Format: PREFIX: Key=Value
        #  I: Bus=HHHH Vendor=HHHH Product=HHHH Version=HHHH
        #  N: Name="XXXX"
        #  P: Phys=XXXX
        #  S: Sysfs=XXXX
        if tag == 'I':
          # An 'I' line starts a new device, so flush the previous one.
          if current:
            devices.append(Obj(**current))
          current = {}
          for pair in body.split():
            name, value = pair.split('=', 1)
            current[name] = value
        elif tag in ('N', 'S'):
          name, value = body.split('=', 1)
          current[name] = value.strip('"')
        elif tag == 'H':
          handlers = line[3:].split('=', 1)[1].split()
          # Only the first eventN handler is recorded.
          for handler in handlers:
            if re.match(r'event\d+', handler):
              current['Event'] = handler
              break
    # Flush the last device.
    if current:
      devices.append(Obj(**current))
    cls._dataset = devices

  @classmethod
  def FindByNamePattern(cls, regex):
    """Finds devices whose Name matches the given regular expression."""
    if cls._dataset is None:
      cls._InitDataset()
    matched = []
    for device in cls._dataset:
      if re.match(regex, device.Name):
        matched.append(device)
    return matched

  @classmethod
  def GetEvdevDevice(cls, data):
    """Returns the evdev device for the entry's 'Event' handler."""
    device_node = os.path.join('/dev/input', data.Event)
    return evdev.InputDevice(device_node)

  @classmethod
  def GetI2cId(cls, entry):
    """Returns the i2c id of an entry returned from FindByNamePattern.

    Raises:
      ValueError: When the resolved sysfs path has no 'i2c-N' component.
    """
    sysfs_path = os.path.realpath(
        os.path.join('/sys', entry.Sysfs.lstrip('/')))
    found = re.search(r'/i2c-(\d+)(/.*)?$', sysfs_path)
    if not found:
      raise ValueError('Unable to find i2c id in "%s"' % sysfs_path)
    return int(found.group(1))

  @classmethod
  def IsStylusDevice(cls, dev):
    """Checks if a device is a stylus device.

    Same logic from cros.factory.test.utils.evdev_utils.IsStylusDevice.
    To prevent import hierarchy and dependency problems, we want Gooftool to
    have its own implementation.

    Args:
      dev: evdev.InputDevice

    Returns:
      True if dev is a stylus device.
    """
    keycaps = dev.capabilities().get(evdev.ecodes.EV_KEY, [])
    stylus_keys = set([
        evdev.ecodes.BTN_STYLUS,
        evdev.ecodes.BTN_STYLUS2,
        evdev.ecodes.BTN_TOOL_PEN])
    return not stylus_keys.isdisjoint(keycaps)

  @classmethod
  def IsTouchpadDevice(cls, dev):
    """Checks if a device is a touchpad device.

    Same logic from cros.factory.test.utils.evdev_utils.IsTouchpadDevice.
    To prevent import hierarchy and dependency problems, we want Gooftool to
    have its own implementation.

    Args:
      dev: evdev.InputDevice

    Returns:
      True if dev is a touchpad device.
    """
    keycaps = dev.capabilities().get(evdev.ecodes.EV_KEY, [])
    if evdev.ecodes.BTN_TOUCH not in keycaps:
      return False
    return evdev.ecodes.BTN_MOUSE in keycaps
class _TouchInputData(object):  # pylint: disable=no-init
  """Base class for collecting touchpad and touchscreen information."""

  @classmethod
  def GenericInput(
      cls, name_pattern, sysfs_files=None, filter_rule=None, customizer=None):
    """A generic touch device resolver.

    Args:
      name_pattern: Regular expression matched against input device names.
      sysfs_files: Optional list of file names, below the 'device' directory
          of the input node in sysfs, whose content is added to the result.
      filter_rule: Optional predicate applied to every matched entry.
      customizer: Optional callable invoked as customizer(result, entry) to
          adjust the result dict before it is wrapped.

    Returns:
      None if no device matched; otherwise an Obj built from the first
      matched entry.
    """
    candidates = _InputDevices.FindByNamePattern(name_pattern)
    if filter_rule:
      candidates = [item for item in candidates if filter_rule(item)]
    if not candidates:
      return None

    # TODO(hungte) Should we support multiple components in future?
    if len(candidates) > 1:
      logging.warning('TouchInputData: multiple components matched for %s: %s',
                      name_pattern, candidates)
    entry = candidates[0]
    info = {'ident_str': entry.Name}

    # Ignore Linux dummy ID (0).
    if int(entry.Vendor, 16):
      info['vendor_id'] = entry.Vendor
      info['product_id'] = entry.Product
    if int(entry.Version, 16):
      info['version'] = entry.Version

    # Find out more information from sysfs.
    for name in sysfs_files or []:
      # entry.Sysfs starts with '/' and ends at input node, for example:
      # /devices/pci0000:00/0000:00:02.0/i2c-2/2-004a/input/input7
      sysfs_file = os.path.join(
          '/sys', entry.Sysfs.lstrip('/'), 'device', name)
      if os.path.exists(sysfs_file):
        info[name] = _StripRead(sysfs_file)

    if customizer:
      customizer(info, entry)
    return Obj(**info)

  @classmethod
  def SynapticsInput(cls, name_pattern, sysfs_files=None):
    """Resolves a Synaptics (vendor 06cb) device, adding fw_version if known.

    The firmware version is read with rmi4update when that program and a
    matching hidraw node exist; otherwise the generic data is returned
    unchanged.
    """
    data = cls.GenericInput(name_pattern, sysfs_files,
                            filter_rule=lambda e: e.Vendor == '06cb')
    if not data:
      return None

    rmi4update_program = '/usr/sbin/rmi4update'
    if not os.path.exists(rmi4update_program):
      return data

    hid_id_glob = '*:%s:%s.*' % (data.vendor_id.upper(),
                                 data.product_id.upper())
    hidraw_nodes = glob(os.path.join(
        '/sys/bus/hid/devices/', hid_id_glob, 'hidraw/hidraw*'))
    if not hidraw_nodes:
      return data

    hidraw_dev = '/dev/' + hidraw_nodes[0].split('/')[-1]
    outcome = Shell('%s -p -d %s' % (rmi4update_program, hidraw_dev))
    if outcome.success:
      data.fw_version = outcome.stdout.strip()
    return data

  @classmethod
  def HidOverI2c(cls, filter_rule):
    """Resolves a 'hid-over-i2c' named device accepted by filter_rule."""
    return cls.GenericInput(r'hid-over-i2c.*', filter_rule=filter_rule)

  # Result of the first GetGeneric() call; None until then.
  cached_data = None

  @classmethod
  def GetGeneric(cls, vendor_fun_list):
    """Returns the first non-None result of the given probe functions.

    The result is cached in cls.cached_data; when every function returns
    None, an Obj with ident_str=None is cached and returned instead.
    """
    if cls.cached_data is not None:
      return cls.cached_data

    cls.cached_data = Obj(ident_str=None)
    for probe in vendor_fun_list:
      found = probe()
      if found is not None:
        cls.cached_data = found
        break
    return cls.cached_data
class _TouchpadData(_TouchInputData):
  """Touchpad probing; results are Objs with at least an ident_str field."""

  @classmethod
  def Synaptics(cls):
    """Probes a Synaptics touchpad via syndetect, then by device name."""

    def SynapticsSyndetect():
      detect_program = '/opt/Synaptics/bin/syndetect'
      if not os.path.exists(detect_program):
        return None
      lock_check = Shell('lsof /dev/serio_raw0 | grep -q "^X"')
      if lock_check.success and not os.getenv('DISPLAY'):
        logging.error('Synaptics touchpad detection with X in the '
                      'foreground requires DISPLAY and XAUTHORITY '
                      'to be set properly.')
        return None
      detection = Shell(detect_program)
      if not detection.success:
        return None
      # Output lines are 'key = value' pairs; other lines are ignored.
      properties = {}
      for line in detection.stdout.splitlines():
        if '=' not in line:
          continue
        key, value = line.split('=', 1)
        properties[key.strip()] = value.strip()
      model = properties.get('Model String', 'Unknown Synaptics')
      # Delete the " on xxx Port" substring, as we do not care about the port.
      model = re.sub(' on [^ ]* [Pp]ort$', '', model)
      firmware = properties.get('Firmware ID', None)
      return Obj(ident_str=model, fw_version=firmware)

    def SynapticsByName():
      return cls.SynapticsInput(r'^SYNA.*', ['fw_version'])

    return SynapticsSyndetect() or SynapticsByName()

  @classmethod
  def Cypress(cls):
    """Probes a Cypress touchpad from the mouse nodes in sysfs."""
    model_fields = ['product_id', 'hardware_version', 'protocol_version']
    for node in glob('/sys/class/input/mouse[0-9]*/device/device'):
      model_paths = [os.path.join(node, field) for field in model_fields]
      firmware_path = os.path.join(node, 'firmware_version')
      required_paths = model_paths + [firmware_path]
      if not all(os.path.exists(path) for path in required_paths):
        continue
      model_values = [_StripRead(path) for path in model_paths]
      return Obj(
          ident_str=CompactStr(model_values),
          fw_version=CompactStr(_StripRead(firmware_path)))
    return None

  @classmethod
  def Elan(cls):
    """Probes an Elan touchpad bound to the elan_i2c driver."""
    for driver_link in glob('/sys/bus/i2c/drivers/elan_i2c/*'):
      if not os.path.islink(driver_link):
        continue

      def _ReadAttr(attr_name, base=driver_link):
        return _StripRead(os.path.join(base, attr_name))

      return Obj(
          ident_str=_ReadAttr('name'),
          product_id=_ReadAttr('product_id'),
          fw_version=_ReadAttr('firmware_version'),
          fw_csum=_ReadAttr('fw_checksum'))
    return None

  @classmethod
  def I2c(cls):
    """Probes a hid-over-i2c device that evdev reports as a touchpad."""
    def is_touchpad(data):
      device = _InputDevices.GetEvdevDevice(data)
      return _InputDevices.IsTouchpadDevice(device)

    return cls.HidOverI2c(is_touchpad)

  @classmethod
  def Generic(cls):
    """Probes any input device whose name looks like a touchpad/trackpad."""
    extra_files = ['fw_version', 'hw_version', 'config_csum']
    return cls.GenericInput(r'.*[Tt](?:ouch|rack) *[Pp]ad', extra_files)

  @classmethod
  def Get(cls):
    """Returns the (cached) result of the first successful touchpad probe."""
    probes = [cls.Cypress, cls.Synaptics, cls.Elan, cls.I2c, cls.Generic]
    return cls.GetGeneric(probes)
class _TouchscreenData(_TouchInputData):  # pylint: disable=no-init
  """Touchscreen probing; results are Objs with at least an ident_str field."""

  @classmethod
  def Elan(cls):
    """Probes an i2c device that is bound to the elants_i2c driver."""
    for device_path in glob('/sys/bus/i2c/devices/*'):
      driver_link = os.path.join(device_path, 'driver')
      if not os.path.islink(driver_link):
        continue
      if os.path.basename(os.readlink(driver_link)) != 'elants_i2c':
        continue
      return Obj(
          ident_str=_StripRead(os.path.join(device_path, 'name')),
          hw_version=_StripRead(os.path.join(device_path, 'hw_version')),
          fw_version=_StripRead(os.path.join(device_path, 'fw_version')))
    return None

  @classmethod
  def Synaptics(cls):
    """Probes a Synaptics touchscreen by its 'SYTS' device name."""
    return cls.SynapticsInput(r'SYTS.*', ['fw_version'])

  @classmethod
  def Wacom(cls):
    """Probes a Wacom touchscreen; version comes from wacom_flash."""
    def put_version(result, entry):
      command = ('wacom_flash /dev/null -a i2c-%d' %
                 _InputDevices.GetI2cId(entry))
      output = process_utils.SpawnOutput(command, shell=True)
      result['version'] = output.strip()

    # Using "WCOM" device name to detect if WACOM touchscreen exists or not.
    return cls.GenericInput(r'WCOM.*', customizer=put_version)

  @classmethod
  def Weida(cls):
    """Probes a Weida touchscreen; version comes from wdt_util."""
    # Using "WDHT" device name to detect if Weida touchscreen exists or not.
    data = cls.GenericInput(r'WDHT.*')
    if not data:
      return data
    output = process_utils.SpawnOutput('wdt_util -v -c', shell=True)
    data.version = output.strip()
    return data

  @classmethod
  def Generic(cls):
    """Probes any input device whose name looks like a touchscreen."""
    extra_files = ['fw_version', 'hw_version', 'config_csum']
    return cls.GenericInput(r'.*[Tt]ouch *[Ss]creen', extra_files)

  @classmethod
  def Get(cls):
    """Returns the (cached) result of the first successful probe."""
    probes = [cls.Elan, cls.Synaptics, cls.Wacom, cls.Weida, cls.Generic]
    return cls.GetGeneric(probes)
class _StylusData(_TouchInputData):
  """Stylus probing; results are Objs with at least an ident_str field."""

  @classmethod
  def Generic(cls):
    """Probes the first input device that evdev reports as a stylus."""
    return cls.GenericInput(
        r'.*',
        filter_rule=lambda data: _InputDevices.IsStylusDevice(
            _InputDevices.GetEvdevDevice(data)))

  @classmethod
  def Get(cls):
    """Returns the (cached) stylus probe result."""
    probes = [cls.Generic]
    return cls.GetGeneric(probes)
def _ProbeFun(probe_map, probe_class, *arch_targets):
  """Decorator that populates probe_map.

  There can only be one probe function for each arch for each
  probe_class.  If no arch_targets are specified, the probe is assumed
  to be general and apply for those arches without arch specific
  probes.

  Args:
    probe_map: Map to update, shaped { arch : { class : probe function } }.
    probe_class: Probe class for which the probe fun produces results.
    arch_targets: List of arches for which the probe is relevant.

  Returns:
    A decorator that registers the decorated function in probe_map and
    returns it unchanged.  The decorator fails an assertion when a probe
    is already registered for the same arch and probe_class.
  """
  def Decorate(f):
    # An arch of None marks the generic (arch independent) probe.
    arch_list = arch_targets if arch_targets else [None]
    for arch in arch_list:
      arch_probe_map = probe_map.setdefault(arch, {})
      # The message must be %-formatted here; passing the format string and
      # its arguments as a tuple would report the raw tuple instead.
      assert probe_class not in arch_probe_map, (
          'Multiple component probe functions for %r %r' %
          (arch if arch else 'generic', probe_class))
      arch_probe_map[probe_class] = f
    return f
  return Decorate
def _ComponentProbe(probe_class, *arch_targets):
  """Decorator that registers a hardware component probe function.

  Besides adding the function to _COMPONENT_PROBE_MAP, probe_class is
  recorded in PROBEABLE_COMPONENT_CLASSES.

  Args:
    probe_class: Component class for which the function produces results.
    arch_targets: Arches for which the probe is relevant (none: generic).
  """
  PROBEABLE_COMPONENT_CLASSES.add(probe_class)
  decorator = _ProbeFun(_COMPONENT_PROBE_MAP, probe_class, *arch_targets)
  return decorator
def _InitialConfigProbe(probe_class, *arch_targets):
  """Decorator that registers an initial_config probe function.

  Args:
    probe_class: Probe class for which the function produces results.
    arch_targets: Arches for which the probe is relevant (none: generic).
  """
  decorator = _ProbeFun(_INITIAL_CONFIG_PROBE_MAP, probe_class, *arch_targets)
  return decorator
@_ComponentProbe('audio_codec')
def _ProbeAudioCodec():
  """Looks for codec strings.

  Collects /sys/kernel/debug/asoc/codecs for ASOC (ALSA SOC) drivers and
  /proc/asound for HDA codecs; when neither yields anything, falls back to
  the PCM details in /proc/asound/pcm.

  There is a set of known invalid codec names that are not included in the
  ASOC part of the return value.

  Returns:
    A list of {'name': codec_name} dicts, possibly empty.
  """
  KNOWN_INVALID_CODEC_NAMES = set([
      'snd-soc-dummy',
      'ts3a227e.4-003b',  # autonomous audiojack switch, not an audio codec
      'dw-hdmi-audio'  # this is a virtual audio codec driver
  ])

  results = []
  asoc_path = '/sys/kernel/debug/asoc/codecs'
  if os.path.exists(asoc_path):
    with open(asoc_path) as asoc_file:
      for codec in asoc_file.read().splitlines():
        if codec not in KNOWN_INVALID_CODEC_NAMES:
          results.append({'name': codec})

  hda_codecs = set()
  for line in _ShellOutput('grep -R "Codec:" /proc/asound/*').splitlines():
    hda_codecs.update(re.findall(r'.*Codec:(.*)', line))
  results.extend({'name': codec} for codec in sorted(hda_codecs) if codec)

  if results:
    return results

  # Formatted '00-00: WM??? PCM  wm???-hifi-0: ...'
  pcm_data = _StripRead('/proc/asound/pcm').split(' ')
  if len(pcm_data) > 2:
    return [{'name': pcm_data[1]}]
  return []
@_ComponentProbe('battery')
def _ProbeBattery():
  """Composes battery data from /sys/class/power_supply.

  Returns:
    A sorted list of dicts, one per power supply node of type 'Battery'
    that provides 'manufacturer', 'model_name', 'technology' and the
    detected full-design field.  Returns an empty list if no node exposes
    'charge_full_design' or 'energy_full_design'.
  """
  node_path_list = glob('/sys/class/power_supply/*')

  battery_node_list = []
  for node_path in node_path_list:
    type_data = _ReadSysfsFields(node_path, ['type'])
    # _ReadSysfsFields returns None when 'type' is missing; such a node
    # cannot be classified as a battery, so it is skipped instead of
    # crashing on the subscript.
    if type_data and type_data['type'] == 'Battery':
      battery_node_list.append(node_path)

  battery_field_list = ['manufacturer', 'model_name', 'technology']
  # probe energy_full_design or charge_full_design, battery can have either
  for candidate in ('charge_full_design', 'energy_full_design'):
    if any(os.path.exists(os.path.join(path, candidate))
           for path in node_path_list):
      battery_field_list.append(candidate)
      break
  else:
    # Neither full-design field exists on any node.
    return []

  battery_data_list = [_ReadSysfsFields(node_path, battery_field_list)
                       for node_path in battery_node_list]
  return sorted(x for x in battery_data_list if x)
@_ComponentProbe('bluetooth')
def _ProbeBluetooth():
  """Probes the bluetooth device ID from sysfs.

  Returns:
    A one-element list with the device ID of hci0 if it is available;
    otherwise the sorted, non-empty device IDs found recursively below the
    holders of the bluetooth kernel module.
  """
  # Probe in primary path
  primary_id = _ReadSysfsDeviceId('/sys/class/bluetooth/hci0/device')
  if primary_id:
    return [primary_id]

  # Use information in driver if probe failed in primary path
  holder_ids = _RecursiveProbe('/sys/module/bluetooth/holders',
                               _ReadSysfsDeviceId)
  found_ids = [item for item in holder_ids if item]
  found_ids.sort()
  return found_ids
def _GetV4L2Data(video_idx):
  """Gets information from the video4linux2 (v4l2) interface.

  This is a best-effort probe: any failure to open or query the device is
  logged at debug level and leaves the corresponding keys out.

  See /usr/include/linux/videodev2.h for definition of the consts used here.
  'ident' values are defined in include/media/v4l2-chip-ident.h

  Args:
    video_idx: Integer index N of the /dev/videoN node to query.

  Returns:
    A dict that may contain:
      'ident': 'V4L2:<ident> <revision>' when a valid chip ident was read.
      'type': 'webcam' or 'video_codec', derived from the capabilities.
  """
  info = {}

  VIDIOC_DBG_G_CHIP_IDENT = 0xc02c5651
  V4L2_DBG_CHIP_IDENT_SIZE = 11
  V4L2_INDEX_REVISION = V4L2_DBG_CHIP_IDENT_SIZE - 1
  V4L2_INDEX_IDENT = V4L2_INDEX_REVISION - 1
  V4L2_VALID_IDENT = 3  # V4L2_IDENT_UNKNOWN + 1

  # Get v4l2 capability
  V4L2_CAPABILITY_FORMAT = '<16B32B32BII4I'
  V4L2_CAPABILITY_STRUCT_SIZE = struct.calcsize(V4L2_CAPABILITY_FORMAT)
  # Dropping the trailing 'I4I' leaves the fields that precede capabilities.
  V4L2_CAPABILITIES_OFFSET = struct.calcsize(V4L2_CAPABILITY_FORMAT[0:-3])
  # struct v4l2_capability
  # {
  #   __u8  driver[16];
  #   __u8  card[32];
  #   __u8  bus_info[32];
  #   __u32 version;
  #   __u32 capabilities;  /* V4L2_CAPABILITIES_OFFSET */
  #   __u32 reserved[4];
  # };

  IOCTL_VIDIOC_QUERYCAP = 0x80685600

  # Webcam should have CAPTURE capability but no OUTPUT capability.
  V4L2_CAP_VIDEO_CAPTURE = 0x00000001
  V4L2_CAP_VIDEO_OUTPUT = 0x00000002

  # V4L2 encode/decode device should have the following capabilities.
  V4L2_CAP_VIDEO_CAPTURE_MPLANE = 0x00001000
  V4L2_CAP_VIDEO_OUTPUT_MPLANE = 0x00002000
  V4L2_CAP_STREAMING = 0x04000000
  V4L2_CAP_VIDEO_CODEC = (V4L2_CAP_VIDEO_CAPTURE_MPLANE |
                          V4L2_CAP_VIDEO_OUTPUT_MPLANE |
                          V4L2_CAP_STREAMING)

  def _TryIoctl(fileno, request, *args):
    """Try to invoke ioctl without raising an exception if it fails."""
    try:
      ioctl(fileno, request, *args)
    except Exception:  # pylint: disable=broad-except
      # A failed request leaves the caller's buffer untouched (all zeros).
      logging.debug('ioctl 0x%x failed on /dev/video%s', request, video_idx,
                    exc_info=True)

  try:
    with open('/dev/video%d' % video_idx, 'r+') as f:
      # Read chip identifier.
      buf = array('i', [0] * V4L2_DBG_CHIP_IDENT_SIZE)
      _TryIoctl(f.fileno(), VIDIOC_DBG_G_CHIP_IDENT, buf, 1)
      v4l2_ident = buf[V4L2_INDEX_IDENT]
      if v4l2_ident >= V4L2_VALID_IDENT:
        info['ident'] = 'V4L2:%04x %04x' % (v4l2_ident,
                                            buf[V4L2_INDEX_REVISION])
      # Read V4L2 capabilities.
      buf = array('B', [0] * V4L2_CAPABILITY_STRUCT_SIZE)
      _TryIoctl(f.fileno(), IOCTL_VIDIOC_QUERYCAP, buf, 1)
      capabilities = struct.unpack_from('<I', buf, V4L2_CAPABILITIES_OFFSET)[0]
      if ((capabilities & V4L2_CAP_VIDEO_CAPTURE) and
          (not capabilities & V4L2_CAP_VIDEO_OUTPUT)):
        info['type'] = 'webcam'
      elif capabilities & V4L2_CAP_VIDEO_CODEC == V4L2_CAP_VIDEO_CODEC:
        info['type'] = 'video_codec'
  except Exception:  # pylint: disable=broad-except
    # Keep probing best-effort, but do not swallow KeyboardInterrupt or
    # SystemExit, and leave a trace for debugging.
    logging.debug('Cannot query V4L2 data from /dev/video%s', video_idx,
                  exc_info=True)
  return info
@_ComponentProbe('video')
def _ProbeVideo():
  """Collects information about every /sys/class/video4linux/video* node.

  Returns:
    A list with one dict per video node, merged from the sysfs node ID,
    the max packet size of endpoint 'ep_82' (when present), the SOC control
    name (when present) and the v4l2 data.
  """
  # TODO(tammo/sheckylin): Try to replace the code below with OpenCV calls.
  collected = []
  for video_node in glob('/sys/class/video4linux/video*'):
    video_idx = re.search(r'video(\d+)$', video_node).group(1)

    info = {}
    info.update(_ReadSysfsNodeId(video_node) or {})

    # Also check video max packet size
    endpoint_dir = os.path.join(video_node, 'device', 'ep_82')
    packet_size_data = _ReadSysfsFields(endpoint_dir, ['wMaxPacketSize'])
    if packet_size_data:
      info['wMaxPacketSize'] = packet_size_data['wMaxPacketSize']

    # For SOC videos
    info.update(_ReadSysfsFields(video_node, ['device/control/name']) or {})

    # Get video4linux2 (v4l2) info.
    info.update(_GetV4L2Data(int(video_idx)) or {})

    collected.append(info)
  return collected
@_ComponentProbe('cellular')
def _ProbeCellular():
  """Probes cellular devices.

  Returns:
    A list of dicts for the detected cellular devices.  The first dict is
    extended with values parsed from 'modem status' and, for Gobi devices,
    with the firmware list and the active firmware.
  """
  # It is found that some cellular components may have their interface listed in
  # shill but not available from /sys (for example, shill Interface=no_netdev_23
  # but no /sys/class/net/no_netdev_23. Meanwhile, 'modem status' gives right
  # Device info like 'Device: /sys/devices/ff500000.usb/usb1/1-1'.
  # Unfortunately, information collected by shill, 'modem status', or the USB
  # node under Device are not always synced.
  data = (_NetworkDevices.ReadSysfsDeviceIds('cellular') or
          [dev.attributes for dev in _NetworkDevices.GetDevices('cellular')])
  if not data:
    return data

  primary = data[0]
  modem_status = _ShellOutput('modem status')
  for key in ['carrier', 'firmware_revision', 'Revision']:
    first_match = re.search(r'^\s*' + key + ': (.*)$', modem_status, re.M)
    if first_match:
      primary[key] = first_match.group(1)

  # For some chipsets we can use custom utilities for more data
  if _GobiDevices.IsDeviceGobi():
    primary['firmwares'] = ', '.join(
        '%s %s %s' % (fw.attrs, fw.build_id, fw.carrier)
        for fw in _GobiDevices.ReadFirmwareList())
    primary['active_firmware'] = str(_GobiDevices.ActiveFirmware())

  return data
@_ComponentProbe('chassis')
def _ProbeChassis():
  """Returns chassis identifier.

  Returns:
    [{'id': <chassis id>}] from 'mosys platform chassis', or an empty list
    when the command gives no output.
  """
  chassis_id = _ShellOutput('VPD_IGNORE_CACHE=1 mosys platform chassis')
  if not chassis_id:
    return []
  return [{'id': chassis_id}]
@_ComponentProbe('cpu', 'x86')
def _ProbeCpuX86():
  """Reformat /proc/cpuinfo data.

  Returns:
    [{'model': <first model name>, 'cores': <number of model name lines>}],
    or an empty list when no 'model name' line could be extracted.
  """
  # For platforms like x86, it provides names for each core.
  # Sample output for dual-core:
  #   model name : Intel(R) Atom(TM) CPU ???
  #   model name : Intel(R) Atom(TM) CPU ???
  cmd = r'sed -nr "s/^model name\s*: (.*)/\1/p" /proc/cpuinfo'
  model_names = _ShellOutput(cmd).splitlines()
  if not model_names:
    # _ShellOutput yields '' on failure; indexing [0] would raise IndexError.
    logging.error("Unable to find 'model name' field in /proc/cpuinfo, "
                  "can't determine CPU model.")
    return []
  return [{'model': model_names[0], 'cores': str(len(model_names))}]
@_ComponentProbe('cpu', 'arm')
def _ProbeCpuArm():
  """Reformats /proc/cpuinfo data for ARM platforms.

  Returns:
    A single-element list holding a dict with 'model', 'cores' (the output of
    'nproc') and 'hardware'. 'model' and 'hardware' fall back to 'unknown',
    with an error logged, when the matching field is absent.
  """
  # For platforms like arm, it sometimes gives the model name in 'Processor',
  # and sometimes in 'model name'. But they all give something like 'ARMv7
  # Processor rev 4 (v71)' only. So to uniquely identify an ARM CPU, we should
  # use the 'Hardware' field.
  with open('/proc/cpuinfo') as cpuinfo_file:
    text = cpuinfo_file.read()

  model_match = re.search(r'^(?:Processor|model name)\s*: (.*)$',
                          text, re.MULTILINE)
  if model_match is None:
    model = 'unknown'
    logging.error("Unable to find 'Processor' or 'model name' field in "
                  "/proc/cpuinfo, can't determine CPU model.")
  else:
    model = model_match.group(1)

  hardware_match = re.search(r'^Hardware\s*: (.*)$', text, re.MULTILINE)
  if hardware_match is None:
    hardware = 'unknown'
    logging.error("Unable to find 'Hardware' field in /proc/cpuinfo, "
                  "can't determine CPU hardware.")
  else:
    hardware = hardware_match.group(1)

  core_count = _ShellOutput('nproc')
  return [{'model': model, 'cores': core_count, 'hardware': hardware}]
@_ComponentProbe('display_panel')
def _ProbeDisplayPanel():
  """Collects every EDID that can be parsed from sysfs files or I2C buses.

  Returns:
    A list of the truthy results of edid.Parse() for the sysfs/debugfs EDID
    files, followed by the truthy results of edid.LoadFromI2c() for each
    /dev/i2c-* node in sorted order.
  """
  sysfs_patterns = (
      '/sys/class/drm/*LVDS*/edid',
      '/sys/kernel/debug/edid*',
  )
  edid_files = [path for pattern in sysfs_patterns for path in glob(pattern)]

  panels = []
  for edid_path in edid_files:
    with open(edid_path) as edid_file:
      from_file = edid.Parse(edid_file.read())
    if from_file:
      panels.append(from_file)

  # Load i2c_dev before scanning the /dev/i2c-* nodes; a load failure is
  # tolerated.
  sys_utils.LoadKernelModule('i2c_dev', error_on_fail=False)
  for bus_path in sorted(glob('/dev/i2c-[0-9]*')):
    from_bus = edid.LoadFromI2c(bus_path)
    if from_bus:
      panels.append(from_bus)
  return panels
@_ComponentProbe('dram')
def _ProbeDram():
  """Combines mosys memory part, timing and geometry information.

  Returns:
    A list with one dict per DIMM slot (sorted by slot), each holding 'slot',
    'part', 'size' and 'timing' (with spaces removed).
  """
  # TODO(tammo): Document why mosys cannot load i2c_dev itself.
  sys_utils.LoadKernelModule('i2c_dev', error_on_fail=False)

  id_output = _ShellOutput('mosys -k memory spd print id')
  timings_output = _ShellOutput('mosys -k memory spd print timings')
  geometry_output = _ShellOutput('mosys -k memory spd print geometry')

  # Each table maps the dimm value to the field of interest.
  part_by_slot = dict(
      re.findall('dimm="([^"]*)".*part_number="([^"]*)"', id_output))
  timing_by_slot = dict(
      re.findall('dimm="([^"]*)".*speeds="([^"]*)"', timings_output))
  size_by_slot = dict(
      re.findall('dimm="([^"]*)".*size_mb="([^"]*)"', geometry_output))

  return [{'slot': slot,
           'part': part_by_slot[slot],
           'size': size_by_slot[slot],
           'timing': timing_by_slot[slot].replace(' ', '')}
          for slot in sorted(part_by_slot)]
@_ComponentProbe('ec_flash_chip')
def _ProbeEcFlashChip():
  """Probes the flash chip IDs of the EC and PD firmware.

  Returns:
    A list of {'name': <chip id>} dicts, EC first and PD second, omitting
    whichever chip ID is unavailable.
  """
  ec_chip = crosfw.LoadEcFirmware().GetChipId()
  pd_chip = crosfw.LoadPDFirmware().GetChipId()

  chips = []
  # NOTE(review): the EC ID is only dropped when it is None, while the PD ID
  # is dropped whenever it is falsy; the asymmetry is kept as is.
  if ec_chip is not None:
    chips.append({'name': ec_chip})
  if pd_chip:
    chips.append({'name': pd_chip})
  return chips
@_ComponentProbe('embedded_controller')
def _ProbeEmbeddedController():
  """Reformats mosys output for the 'ec' and 'pd' controllers.

  Returns:
    A list of {'vendor': ..., 'name': ...} dicts, one per controller for which
    every queried field was returned as something other than None.
  """
  # Example mosys command output:
  #   vendor="VENDOR" name="CHIPNAME" fw_version="ECFWVER"
  controllers = []
  for chip in ('ec', 'pd'):
    info = {}
    for field in ('vendor', 'name'):
      info[field] = _ShellOutput(['mosys', chip, 'info', '-s', field],
                                 on_error=None)
    # Skip controllers with any field reported as None.
    if None not in info.values():
      controllers.append(info)
  return controllers
@_ComponentProbe('ethernet')
def _ProbeEthernet():
  """Probes ethernet devices, ignoring those attached over USB."""
  # Built-in ethernet devices should not be attached to USB. They are usually
  # either PCI or SOC.
  builtin_devices = _NetworkDevices.ReadSysfsDeviceIds(
      'ethernet', ignore_usb=True)
  return builtin_devices
@_ComponentProbe('flash_chip')
def _ProbeMainFlashChip():
  """Probes the flash chip ID of the main firmware.

  Returns:
    A single-element list holding {'name': <chip id>}, or an empty list when
    the chip ID is falsy.
  """
  main_chip = crosfw.LoadMainFirmware().GetChipId()
  if not main_chip:
    return []
  return [{'name': main_chip}]
def _GetFixedDevices():
  """Returns sysfs paths of all fixed storage devices on the system.

  A node under /sys/class/block qualifies when its 'removable' attribute
  exists and reads '0', and its name does not start with 'loop' or 'dm-'.
  """
  fixed_nodes = []
  for node in sorted(glob('/sys/class/block/*')):
    removable_flag = os.path.join(node, 'removable')
    is_fixed = (os.path.exists(removable_flag) and
                _StripRead(removable_flag) == '0')
    if not is_fixed:
      continue
    # Loopback or dm-verity device; skip.
    if os.path.basename(node).startswith(('loop', 'dm-')):
      continue
    fixed_nodes.append(node)
  return fixed_nodes
def _GetEMMC5FirmwareVersion(node_path):
  """Extracts eMMC 5.0 firmware version from EXT_CSD[254:261].

  Args:
    node_path: the node_path returned by _GetFixedDevices(). For example,
        '/sys/class/block/mmcblk0'.

  Returns:
    A string of the form '<16 hex digits> (<decoded value>)' when the
    firmware version fields are present in the 'mmc extcsd read' output,
    otherwise None.
  """
  device_name = os.path.basename(node_path)
  extcsd_lines = process_utils.GetLines(
      Shell('mmc extcsd read /dev/%s' % device_name).stdout)
  # The output for firmware version is encoded by hexdump of a ASCII
  # string or hexdump of hexadecimal values, always in 8 characters.
  # For example, version 'ABCDEFGH' is:
  # [FIRMWARE_VERSION[261]]: 0x48
  # [FIRMWARE_VERSION[260]]: 0x47
  # [FIRMWARE_VERSION[259]]: 0x46
  # [FIRMWARE_VERSION[258]]: 0x45
  # [FIRMWARE_VERSION[257]]: 0x44
  # [FIRMWARE_VERSION[256]]: 0x43
  # [FIRMWARE_VERSION[255]]: 0x42
  # [FIRMWARE_VERSION[254]]: 0x41
  #
  # Some vendors might use hexadecimal values for it.
  # For example, version 3 is:
  # [FIRMWARE_VERSION[261]]: 0x00
  # [FIRMWARE_VERSION[260]]: 0x00
  # [FIRMWARE_VERSION[259]]: 0x00
  # [FIRMWARE_VERSION[258]]: 0x00
  # [FIRMWARE_VERSION[257]]: 0x00
  # [FIRMWARE_VERSION[256]]: 0x00
  # [FIRMWARE_VERSION[255]]: 0x00
  # [FIRMWARE_VERSION[254]]: 0x03
  #
  # To handle both cases, this function returns a 64-bit hexadecimal value
  # and will try to decode it as a ASCII string or as a 64-bit little-endian
  # integer. It returns '4142434445464748 (ABCDEFGH)' for the first example
  # and returns '0300000000000000 (3)' for the second example.
  field_re = re.compile(r'^\[FIRMWARE_VERSION\[(\d+)\]\]: (.*)$')
  value_by_index = {}
  for line in extcsd_lines:
    matched = field_re.match(line)
    if matched:
      index, value = matched.groups()
      value_by_index[index] = value
  if not value_by_index:
    return None

  raw_bytes = [int(value_by_index[str(offset)], 0)
               for offset in range(254, 262)]
  hex_version = ''.join('%02x' % byte for byte in raw_bytes)

  # Try to decode it as a ASCII string.
  # Note vendor may choose SPACE (0x20) or NUL (0x00) to pad version string,
  # so we want to strip both in the human readable part.
  readable = ''.join(chr(byte) for byte in raw_bytes).strip(' \0')
  if readable and all(char in string.printable for char in readable):
    return '%s (%s)' % (hex_version, readable)

  # Try to decode it as a 64-bit little-endian integer.
  (as_integer,) = struct.unpack_from('<q', hex_version.decode('hex'))
  return '%s (%s)' % (hex_version, as_integer)
@_ComponentProbe('region')
def _ProbeRegion():
  """Probes the region of the DUT based on the region field in RO VPD.

  Returns:
    A single-element list holding {'region_code': ...} looked up in
    regions.REGIONS, or an empty list when RO VPD has no 'region' value.
  """
  code = ReadRoVpd().get('region', None)
  if not code:
    return []
  return [{'region_code': regions.REGIONS[code].region_code}]
@_ComponentProbe('storage')
def _ProbeStorage():
  """Compiles sysfs data for all non-removable block storage devices.

  Returns:
    A list of dicts, one per device from _GetFixedDevices() whose sysfs
    identification fields could be read.
  """
  def DescribeDevice(node_path):
    """Returns the identification dict for one block node, or None."""
    device_dir = os.path.join(node_path, 'device')
    size_file = os.path.join(os.path.dirname(device_dir), 'size')
    if os.path.exists(size_file):
      sector_count = _StripRead(size_file)
    else:
      sector_count = ''

    # Try the ATA fields first, then fall back to the eMMC fields.
    fields = _ReadSysfsFields(device_dir, ['vendor', 'model'])
    if not fields:
      # Another field 'cid' is a combination of all other fields so we should
      # not include it again.
      fields = _ReadSysfsFields(
          device_dir,
          ['type', 'name', 'hwrev', 'oemid', 'manfid'],
          ['prv'])
    if not fields:
      return None

    fw_version = _GetEMMC5FirmwareVersion(node_path)
    if fw_version is not None:
      fields['emmc5_fw_ver'] = fw_version
    fields['sectors'] = sector_count
    return fields

  identities = []
  for node in _GetFixedDevices():
    identity = DescribeDevice(node)
    if identity is not None:
      identities.append(identity)
  return identities
def _ProbeGenericTouch(cls, key_list):
  """Builds the probe result for a touch-like device class.

  Args:
    cls: an object whose Get() returns the device data; the data must expose
        an 'ident_str' attribute.
    key_list: names of attributes to copy into the result when truthy.

  Returns:
    An empty list when ident_str is None, otherwise a single-element list
    holding a dict with 'id' plus every listed attribute that is truthy.
  """
  device = cls.Get()
  if device.ident_str is None:
    return []

  probed = {'id': device.ident_str}
  candidates = ((key, getattr(device, key, '')) for key in key_list)
  probed.update((key, value) for key, value in candidates if value)
  return [probed]
@_ComponentProbe('stylus')
def _ProbeStylus():
  """Probes the stylus using the data provided by _StylusData."""
  stylus_keys = ['product_id', 'vendor_id', 'version']
  return _ProbeGenericTouch(_StylusData, stylus_keys)
@_ComponentProbe('touchpad')
def _ProbeTouchpad():
  """Probes the touchpad using the data provided by _TouchpadData."""
  touchpad_keys = [
      'product_id', 'vendor_id', 'version', 'fw_version', 'hw_version',
      'fw_csum', 'config_csum']
  return _ProbeGenericTouch(_TouchpadData, touchpad_keys)
@_ComponentProbe('touchscreen')
def _ProbeTouchscreen():
  """Probes the touchscreen using the data provided by _TouchscreenData."""
  touchscreen_keys = [
      'product_id', 'vendor_id', 'version', 'fw_version', 'hw_version',
      'fw_csum', 'config_csum']
  return _ProbeGenericTouch(_TouchscreenData, touchscreen_keys)
@_ComponentProbe('tpm')
def _ProbeTpm():
  """Extracts manufacturer info and chip version from tpm_version output.

  Returns:
    A single-element list holding {'manufacturer_info': ..., 'version': ...},
    or an empty list when either field is absent from the output.
  """
  # Each output line is split at the first ':' into a name and a value.
  fields = {}
  for line in _ShellOutput('tpm_version').splitlines():
    name, _, text = line.partition(':')
    fields[name.strip()] = text.strip()

  manufacturer = fields.get('Manufacturer Info', None)
  chip_version = fields.get('Chip Version', None)
  if manufacturer is None or chip_version is None:
    return []
  return [{'manufacturer_info': manufacturer, 'version': chip_version}]
@_ComponentProbe('usb_hosts')
def _ProbeUsbHosts():
  """Compiles USB host data from sysfs.

  Returns:
    A list of the non-None results of _ReadSysfsDeviceId() for each USB host.
  """
  # On x86, USB hosts are PCI devices, located in parent of root USB.
  # On ARM and others, use the root device itself.
  # TODO(tammo): Think of a better way to do this, without arch.
  if _ShellOutput('crossystem arch') == 'arm':
    host_relpath = '.'
  else:
    host_relpath = '..'

  # Usually there are several USB hosts, so only list the primary information.
  host_ids = []
  for bus_path in glob('/sys/bus/usb/devices/usb*'):
    host_path = os.path.join(os.path.realpath(bus_path), host_relpath)
    host_id = _ReadSysfsDeviceId(host_path)
    if host_id is not None:
      host_ids.append(host_id)
  return host_ids
@_ComponentProbe('wireless')
def _ProbeWireless():
  """Probes wifi devices through their sysfs device IDs."""
  wifi_devices = _NetworkDevices.ReadSysfsDeviceIds('wifi')
  return wifi_devices
@_ComponentProbe('pmic')
def _ProbePmic():
  """Lists platform devices whose name ends with '-pmic'.

  Returns:
    A list of {'name': <device name>} dicts; empty when nothing matches.
  """
  return [{'name': os.path.basename(device_path)}
          for device_path in glob('/sys/bus/platform/devices/*-pmic')]
@_ComponentProbe('mainboard')
def _ProbeMainBoard():
  """Probes the mainboard version reported by mosys.

  Returns:
    A single-element list holding {'version': <stripped output>} when the
    command succeeds, otherwise an empty list.
  """
  outcome = Shell('mosys platform version')
  if not outcome.success:
    return []
  return [{'version': outcome.stdout.strip()}]
@_InitialConfigProbe('cellular_fw_version')
def _ProbeCellularFirmwareVersion():
  """Returns firmware detail strings for all cellular devices.

  Returns:
    The version strings of every cellular device joined with ' ; '; devices
    for which no version could be determined are left out.
  """
  def GetVersionString(dev_attrs):
    """Use flimflam or modem status data to generate a version string.

    The fields present in the flimflam data may differ for
    partners/components.
    """
    # TODO(tammo): Document breakdown of known combinations for each
    # supported part, correspondingly document when the 'modem status'
    # fallback is necessary.
    known_formats = (
        ('Carrier', 'FirmwareRevision'),
        ('FirmwareRevision',),
        ('HardwareRevision',))
    available = set(dev_attrs)
    for fields in known_formats:
      if set(fields).issubset(available):
        # Compact all fields into single line with limited space.
        return CompactStr([dev_attrs[field] for field in fields])

    # If nothing available, try 'modem status'.
    status = _ShellOutput('modem status | grep firmware_revision')
    revisions = re.findall(r'^\s*firmware_revision:\s*(.*)', status)
    if revisions and revisions[0]:
      return revisions[0]
    return None

  versions = []
  for device in _NetworkDevices.GetDevices('cellular'):
    version = GetVersionString(device.attributes)
    if version is not None:
      versions.append(version)
  return ' ; '.join(versions)
@_InitialConfigProbe('rw_fw_key_version')
def _ProbeRwFirmwareVersion():
  """Returns RW (writable) firmware version from VBLOCK sections.

  Returns:
    The firmware version as a decimal string when VBLOCK_A and VBLOCK_B agree,
    otherwise a string of the form 'A=<version>, B=<version>'.
  """
  main_fw_file = crosfw.LoadMainFirmware().GetFileName()
  # Read through a context manager so the file handle is always closed.
  with open(main_fw_file, 'rb') as fw_file:
    image = crosfw.FirmwareImage(fw_file.read())

  def GetVersion(section_name):
    """Returns the firmware_version stored in the given VBLOCK section."""
    data = image.get_section(section_name)
    block = vblock.unpack_verification_block(data)
    return block['VbFirmwarePreambleHeader']['firmware_version']

  version_a = GetVersion('VBLOCK_A')
  version_b = GetVersion('VBLOCK_B')
  if version_a != version_b:
    # The '%' operator needs a tuple to fill several placeholders; passing the
    # list produced by map() raised TypeError whenever the versions differed.
    return 'A=%d, B=%d' % (version_a, version_b)
  return '%d' % version_a
@_InitialConfigProbe('touchpad_fw_version')
def _ProbeTouchpadFirmwareVersion():
  """Returns the touchpad's 'fw_version' instance field, or None if unset."""
  instance_fields = _TouchpadData.Get().__dict__
  return instance_fields.get('fw_version')
@_InitialConfigProbe('storage_fw_version')
def _ProbeStorageFirmwareVersion():
  """Returns firmware rev for all fixed devices.

  Returns:
    The result of CompactStr() over the firmware versions collected from
    smartctl or, when smartctl reports none, from the sysfs 'fwrev' field.
  """
  versions = []
  for node in _GetFixedDevices():
    device_file = os.path.join('/dev', os.path.basename(node))
    report = Shell('smartctl --all %s' % device_file).stdout
    from_smartctl = re.findall(r'(?m)^Firmware Version:\s+(.+)$', report)
    if not from_smartctl:
      # Use fwrev file (e.g., for eMMC where smartctl is unsupported)
      fwrev = _ReadSysfsFields(os.path.join(node, 'device'), ['fwrev'])
      if fwrev:
        versions.extend(fwrev.values())
      continue
    if re.search(r'(?m)^Device Model:\s+SanDisk', report):
      # Canonicalize SanDisk firmware versions by replacing 'CS' with '11'.
      from_smartctl = [re.sub('^CS', '11', version)
                       for version in from_smartctl]
    versions.extend(from_smartctl)
  return CompactStr(versions)
def _AddFirmwareIdTag(image, id_name='RO_FRID'):
  """Returns firmware ID in '#NAME' format if available.

  Args:
    image: firmware image object providing has_section() and get_section().
    id_name: name of the section holding the firmware ID.

  Returns:
    '#<id>' with NUL characters stripped from both ends of the section data,
    or '' when the section is missing or the stripped ID is empty.
  """
  if image.has_section(id_name):
    firmware_id = image.get_section(id_name).strip(chr(0))
    if firmware_id:
      return '#%s' % firmware_id
  return ''
def _MainRoHash(image):
  """Algorithm: sha256(fmap, RO_SECTION[-GBB]).

  The GBB section is temporarily overwritten with NUL characters while
  RO_SECTION is read, and is put back afterwards.

  Args:
    image: firmware image object providing get_fmap_blob(), get_section() and
        put_section().

  Returns:
    A dict with 'hash' (hex SHA-256 digest) and 'version' (the firmware ID
    from _AddFirmwareIdTag() without its leading '#').
  """
  hash_src = image.get_fmap_blob()
  gbb = image.get_section('GBB')
  image.put_section('GBB', chr(0) * len(gbb))
  try:
    hash_src += image.get_section('RO_SECTION')
  finally:
    # Restore the GBB even when reading RO_SECTION fails, so the caller's
    # image object is never left with a zeroed GBB.
    image.put_section('GBB', gbb)
  # pylint: disable=no-member
  return {
      'hash': hashlib.sha256(hash_src).hexdigest(),
      'version': _AddFirmwareIdTag(image).lstrip('#')}
def _EcRoHash(image):
  """Algorithm: sha256(fmap, EC_RO).

  Args:
    image: firmware image object providing get_fmap_blob() and get_section().

  Returns:
    A dict with 'hash' (hex SHA-256 digest) and 'version' (the firmware ID
    from _AddFirmwareIdTag() without its leading '#').
  """
  fmap_blob = image.get_fmap_blob()
  ec_ro = image.get_section('EC_RO')
  # pylint: disable=no-member
  digest = hashlib.sha256(fmap_blob + ec_ro).hexdigest()
  firmware_id = _AddFirmwareIdTag(image).lstrip('#')
  return {'hash': digest, 'version': firmware_id}
def _FwKeyHash(main_fw_file, key_name):
  """Hash specified GBB key, extracted by vbutil_key.

  Args:
    main_fw_file: path of the main firmware image file.
    key_name: name of the gbb_utility option selecting the key to extract.

  Returns:
    {'hash': <sha1>}, with '#<label>' appended to the SHA-1 for known
    developer keys, or None when vbutil_key does not report exactly one
    SHA-1 sum.

  Raises:
    Error: if gbb_utility fails to extract the key.
  """
  known_hashes = {
      'b11d74edd286c144e1135b49e7f0bc20cf041f10': 'devkeys/rootkey',
      'c14bd720b70d97394257e3e826bd8f43de48d4ed': 'devkeys/recovery',
  }
  with NamedTemporaryFile(prefix='gbb_%s_' % key_name) as key_file:
    extract_cmd = 'gbb_utility -g --%s=%s %s' % (
        key_name, key_file.name, main_fw_file)
    if not Shell(extract_cmd).success:
      raise Error('cannot get %s from GBB' % key_name)
    key_info = _ShellOutput('vbutil_key --unpack %s' % key_file.name)

  digests = re.findall(r'Key sha1sum:[\s]+([\w]+)', key_info)
  if len(digests) != 1:
    logging.error('Failed calling vbutil_key for firmware key hash.')
    return None

  digest = digests[0]
  label = known_hashes.get(digest)
  if label is not None:
    digest = digest + '#' + label
  return {'hash': digest}
def CalculateFirmwareHashes(fw_file_path):
  """Calculate the volatile hashes corresponding to a firmware blob.

  Given a firmware blob, determine what kind of firmware it is based
  on what sections are present. Then generate a dict containing the
  corresponding hash values.

  Args:
    fw_file_path: path of the firmware image file.

  Returns:
    A dict of hash values (empty when the image has neither an EC_RO section
    nor both GBB and RO_SECTION), or None when the file cannot be parsed as a
    firmware image.

  Raises:
    Error: if a GBB key hash cannot be calculated for a main firmware image.
  """
  # Read through a context manager so the file handle is always closed.
  with open(fw_file_path, 'rb') as fw_file:
    raw_image = fw_file.read()
  try:
    image = crosfw.FirmwareImage(raw_image)
  except Exception:  # pylint: disable=broad-except
    # Not a recognizable firmware image. Unlike a bare 'except:', this does
    # not swallow KeyboardInterrupt or SystemExit.
    return None
  hashes = {}
  if image.has_section('EC_RO'):
    hashes['ro_ec_firmware'] = _EcRoHash(image)
  elif image.has_section('GBB') and image.has_section('RO_SECTION'):
    hashes['ro_main_firmware'] = _MainRoHash(image)
    # Originally at HWID database we put all the information in firmware_field.
    # Now we decide to split key_root and key_recovery out. For compatibility,
    # we keep two copy in probed result. Should remove the old one after all of
    # the project change to new HWID style.
    for hash_name, key_name in (('key_recovery', 'recoverykey'),
                                ('key_root', 'rootkey')):
      key_hash = _FwKeyHash(fw_file_path, key_name)
      if key_hash is None:
        # _FwKeyHash() returns None when vbutil_key gives no usable output;
        # fail clearly instead of with a TypeError on the subscript below.
        raise Error('cannot calculate hash of %s in %s' %
                    (key_name, fw_file_path))
      hashes[hash_name] = key_hash
    hashes['firmware_keys'] = {
        'key_recovery': hashes['key_recovery']['hash'],
        'key_root': hashes['key_root']['hash']}
  return hashes
def ReadVpd(kind, fw_image_file=None):
  """Reads data from VPD.

  Args:
    kind: VPD section name to read.
    fw_image_file: A string for path to existing firmware image file. None to
        use the crosfw.LoadMainFirmware().

  Returns:
    A dictionary for the key-value pairs stored in VPD.
  """
  image_path = fw_image_file
  if image_path is None:
    image_path = crosfw.LoadMainFirmware().GetFileName()
  vpd_cmd = 'vpd -l -i %s -f %s' % (kind, image_path)
  # Do not log command output since this will include private data such as
  # registration codes.
  listing = Shell(vpd_cmd, log=False).stdout
  return ParseKeyValueData('"(.*)"="(.*)"$', listing)
def ReadRoVpd(fw_image_file=None):
  """Reads VPD data from the 'RO_VPD' section; see ReadVpd()."""
  return ReadVpd(kind='RO_VPD', fw_image_file=fw_image_file)
def ReadRwVpd(fw_image_file=None):
  """Reads VPD data from the 'RW_VPD' section; see ReadVpd()."""
  return ReadVpd(kind='RW_VPD', fw_image_file=fw_image_file)
def DeleteVpd(kind, keys):
  """Deletes VPD data by specified keys.

  Args:
    kind: The VPD section to select.
    keys: A list of VPD key names to delete.

  Returns:
    True if updated successfully, otherwise False.
  """
  delete_flags = ['-d %s' % key for key in keys]
  vpd_cmd = 'vpd -i %s %s' % (kind, ' '.join(delete_flags))
  return Shell(vpd_cmd).success
def UpdateVpd(kind, values):
  """Updates VPD data by given values.

  Args:
    kind: The VPD section to select.
    values: A dictionary containing VPD values to set.

  Returns:
    True if updated successfully, otherwise False.
  """
  # NOTE(review): names and values are interpolated into the command line
  # without escaping; presumably callers never pass '"' or '$' -- confirm.
  set_flags = ['-s "%s"="%s"' % (name, value)
               for name, value in values.iteritems()]
  vpd_cmd = 'vpd -i %s %s' % (kind, ' '.join(set_flags))
  return Shell(vpd_cmd).success
def DeleteRoVpd(keys):
  """Deletes the given keys from the 'RO_VPD' section; see DeleteVpd().

  This is meant for the read-only partition before it is write-protected.
  """
  return DeleteVpd(kind='RO_VPD', keys=keys)
def DeleteRwVpd(keys):
  """Deletes the given keys from the 'RW_VPD' section; see DeleteVpd()."""
  return DeleteVpd(kind='RW_VPD', keys=keys)
def UpdateRoVpd(values):
  """Sets the given values in the 'RO_VPD' section; see UpdateVpd().

  This is meant for the read-only partition before it is write-protected.
  """
  return UpdateVpd(kind='RO_VPD', values=values)
def UpdateRwVpd(values):
  """Sets the given values in the 'RW_VPD' section; see UpdateVpd()."""
  return UpdateVpd(kind='RW_VPD', values=values)
def RemoveAutoSuffix(probe_value_map):
  """Removes the '.<number>.auto' suffix from every probed value.

  When the platform adds a device, it appends a '.<number>.auto' suffix to
  the device name, so the probe module gets different values from the same
  component. The auto-generated suffix is therefore stripped.
  Please see https://bugs.chromium.org/p/chromium/issues/detail?id=659921

  Args:
    probe_value_map: a dict mapping each component class to either one dict
        of string values or a list of such dicts.

  Returns:
    A new dict of the same shape with the suffix removed from each value.
  """
  auto_suffix = re.compile(r'\.[0-9]+\.auto$')

  def _StripValues(fields):
    """Returns a copy of one probe value dict without the suffixes."""
    stripped = {}
    for name, text in fields.items():
      stripped[name] = auto_suffix.sub('', text)
    return stripped

  cleaned = {}
  for comp_cls in probe_value_map:
    value = probe_value_map[comp_cls]
    if isinstance(value, list):
      cleaned[comp_cls] = [_StripValues(item) for item in value]
    else:
      cleaned[comp_cls] = _StripValues(value)
  return cleaned
def Probe(target_comp_classes=None,
          fast_fw_probe=False,
          probe_volatile=True,
          probe_initial_config=True,
          probe_vpd=False):
  """Return device component, hash, and initial_config data.

  Run all of the available probing routines that make sense for the
  target architecture, for example if the machine being probed is x86
  then somewhat different probes would be run than for an ARM machine.

  All probe results are returned directly, without analysis. Matching
  these results against the component database or against HWID data
  can be done afterwards.

  Args:
    target_comp_classes: Which component classes to probe for. A None
      value implies all classes.
    fast_fw_probe: Do a fast probe for EC and main firmware version. Setting
      this to True implies probe_volatile, probe_initial_config, probe_vpd,
      and all probing related to VPD (for example, region) are False.
    probe_volatile: On False, do not probe for volatile data and
      return None for the corresponding field.
    probe_initial_config: On False, do not probe for initial_config
      data and return None for the corresponding field.
    probe_vpd: On True, include vpd data in the volatiles (handy for use with
      'gooftool verify_hwid --probe_results=...').

  Returns:
    A ProbeResults object holding the found component values, the missing
    component classes, the volatile values and the initial_config values.

  Raises:
    Error: if probe_volatile is set and a firmware image cannot be parsed.
  """
  if os.path.exists(FAKE_PROBE_RESULTS_FILE):
    # Overriding with results from a file (for testing).
    with open(FAKE_PROBE_RESULTS_FILE) as f:
      logging.warning('Using fake probe results in %s',
                      FAKE_PROBE_RESULTS_FILE)
      return ProbeResults.Decode(f)

  def RunProbe(probe_fun):
    """Runs one probe function; any exception is logged and yields None."""
    try:
      return probe_fun()
    except Exception:  # pylint: disable=broad-except
      logging.exception('Probe %r FAILED (see traceback), returning None.',
                        probe_fun.__name__)
      return None

  def FilterProbes(ref_probe_map, arch, probe_class_white_list):
    """Selects the probe function of each wanted class, preferring arch."""
    generic_probes = ref_probe_map.get(None, {})
    arch_probes = ref_probe_map.get(arch, {})
    if probe_class_white_list is None:
      probe_class_white_list = set(generic_probes) | set(arch_probes)
    # Firmware and key classes are filled in as volatiles below rather than
    # through per-class probe functions.
    return dict((probe_class, (arch_probes[probe_class]
                               if probe_class in arch_probes
                               else generic_probes[probe_class]))
                for probe_class in sorted(probe_class_white_list)
                if probe_class not in (
                    'ro_ec_firmware', 'ro_pd_firmware', 'ro_main_firmware',
                    'hash_gbb', 'key_recovery', 'key_root'))

  def FirmwareHashes(fw_file):
    """Returns the hashes of a firmware file, failing clearly if unparsable."""
    hashes = CalculateFirmwareHashes(fw_file)
    if hashes is None:
      # CalculateFirmwareHashes() returns None for an unparsable image; using
      # that result directly used to fail with an obscure TypeError.
      raise Error('cannot parse firmware image %s' % fw_file)
    return hashes

  arch = _ShellOutput('crossystem arch')
  comp_probes = FilterProbes(_COMPONENT_PROBE_MAP, arch, target_comp_classes)

  initial_configs = {}
  volatiles = {}

  if fast_fw_probe:
    logging.debug('fast_fw_probe enabled.')
    optional_fields = {
        'ro_ec_firmware': _ShellOutput('mosys ec info -s fw_version'),
        'ro_pd_firmware': _ShellOutput('mosys pd info -s fw_version')
    }
    for k, v in optional_fields.items():
      if v:
        volatiles[k] = {'version': v}
    volatiles['ro_main_firmware'] = {
        'version': _ShellOutput('crossystem ro_fwid')}
    # The fast probe replaces the regular volatile, initial_config and VPD
    # probing.
    probe_volatile = False
    probe_initial_config = False
    probe_vpd = False

  if probe_initial_config:
    ic_probes = FilterProbes(_INITIAL_CONFIG_PROBE_MAP, arch, None)
  else:
    ic_probes = {}

  found_probe_value_map = {}
  missing_component_classes = []
  # TODO(hungte) Extend _ComponentProbe to support filtering flashrom related
  # probing methods.
  vpd_classes = ['region']
  for comp_class, probe_fun in comp_probes.items():
    if comp_class in vpd_classes and not probe_vpd:
      logging.info('Ignored probing [%s]', comp_class)
      continue
    logging.info('probing [%s]...', comp_class)
    probe_values = RunProbe(probe_fun)
    if not probe_values:
      missing_component_classes.append(comp_class)
    elif len(probe_values) == 1:
      # A single result is stored bare rather than wrapped in a list.
      found_probe_value_map[comp_class] = probe_values.pop()
    else:
      found_probe_value_map[comp_class] = sorted(probe_values)

  for ic_class, probe_fun in ic_probes.items():
    probe_value = RunProbe(probe_fun)
    if probe_value is not None:
      initial_configs[ic_class] = probe_value

  if probe_volatile:
    main_fw_file = crosfw.LoadMainFirmware().GetFileName()
    volatiles.update(FirmwareHashes(main_fw_file))
    ec_fw_file = crosfw.LoadEcFirmware().GetFileName()
    if ec_fw_file is not None:
      volatiles.update(FirmwareHashes(ec_fw_file))
    pd_fw_file = crosfw.LoadPDFirmware().GetFileName()
    if pd_fw_file is not None:
      # Currently PD is using same FMAP layout as EC so we have to rename
      # section name to avoid conflict.
      hashes = FirmwareHashes(pd_fw_file)
      volatiles.update({'ro_pd_firmware': hashes['ro_ec_firmware']})

  if probe_vpd:
    for which, vpd_field in (('ro', ReadRoVpd()),
                             ('rw', ReadRwVpd())):
      for k, v in sorted(vpd_field.items()):
        volatiles['vpd.%s.%s' % (which, k)] = v

  # Filter the ".*.auto" suffix
  found_probe_value_map = RemoveAutoSuffix(found_probe_value_map)
  return ProbeResults(
      found_probe_value_map=found_probe_value_map,
      missing_component_classes=missing_component_classes,
      found_volatile_values=volatiles,
      initial_configs=initial_configs)