blob: eafd93b7bfe557e8d2fc536bc6a26aadd8bf2404 [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import array
import fcntl
import glob
import imp
import os
import pprint
import re
import sys
import threading
import time
import types
import edid
import flashrom_util
import gft_common
import gft_fwhash
import vblock
from gft_common import DebugMsg, VerboseMsg, WarningMsg, ErrorMsg, ErrorDie
class HardwareComponents(object):
""" Hardware Components Scanner """
# Function names in this class are used for reflection, so please don't change
# the function names even if they are not compliant to coding style guide.
version = 7
# We divide all component IDs (cids) into 5 categories:
# - enumerable: able to get the results by running specific commands;
# - probable: returns existed or not by given some pre-defined choices;
# - pure data: data for some special purpose, can't be tested;
_enumerable_cids = [
'data_display_geometry',
'data_release_board',
'hash_key_recovery',
'hash_key_root',
'hash_gbb',
'hash_ro_ec_firmware',
'hash_ro_main_firmware',
'part_id_audio_codec',
'part_id_bluetooth',
'part_id_camera',
'part_id_cellular',
'part_id_chipset',
'part_id_cpu',
'part_id_display_converter',
'part_id_display_panel',
'part_id_dram',
'part_id_ec_flash_chip',
'part_id_embedded_controller',
'part_id_ethernet',
'part_id_flash_chip',
'part_id_hwqual',
'part_id_keyboard',
'part_id_storage',
'part_id_touchpad',
'part_id_tpm',
'part_id_usb_hosts',
'part_id_vga',
'part_id_wireless',
'version_cellular_firmware',
'version_rw_firmware',
'version_touchpad_firmware',
# Deprecated fields:
# 'part_id_gps',
# - GPS is currently not supported by OS and no way to probe it.
# We should enable it only when OS supports it.
]
_probable_cids = [
'part_id_cardreader',
]
_pure_data_cids = [
'data_bitmap_fv',
'data_recovery_url',
]
# list of cids that should not be fetched asynchronously.
_non_async_cids = [
# Reading EC will become very slow and cause inaccurate results if we try to
# probe components that also fires EC command at the same time.
'part_id_ec_flash_chip',
]
# _not_test_cids and _to_be_tested_cids will be re-created for each match.
_not_test_cids = []
_to_be_tested_cids = []
# TODO(hungte) unify the 'not available' style messages
_not_present = ''
_no_match = 'No match'
_failure_list = [_not_present, _no_match, '']
# Type id for connection management (compatible to flimflam)
_type_cellular = 'cellular'
_type_ethernet = 'ethernet'
_type_wireless = 'wifi'
# Type id for touchpad information
_type_id = 'id'
_type_firmware = 'firmware'
_flimflam_dir = '/usr/local/lib/flimflam/test'
def __init__(self, verbose=False):
self._initialized = False
self._verbose = verbose
self._pp = pprint.PrettyPrinter()
# cache for firmware images
self._flashrom = flashrom_util.flashrom_util(
verbose_msg=VerboseMsg,
exception_type=gft_common.GFTError,
system_output=gft_common.SystemOutput,
system=gft_common.System)
self._temp_files = []
# variables for matching
self._enumerable_system = None
self._file_base = None
self._system = None
self._failures = None
def __del__(self):
for temp_file in self._temp_files:
try:
# DebugMsg('gft_hwcomp: delete temp file %s' % temp_file)
os.remove(temp_file)
except:
pass
# --------------------------------------------------------------------
# System Probing Utilities
def Memorize(f):
""" Decorator for thread-safe memorization """
return gft_common.ThreadSafe(gft_common.Memorize(f))
def EcProperty(f):
""" Decorator for properties that requires programmable EC """
def wrapper(*args, **kargs):
self = args[0]
if not self.has_ec():
return self._not_present
return f(*args, **kargs)
return wrapper
@Memorize
def load_module(self, name):
grep_cmd = ('lsmod 2>/dev/null | grep -q %s' % name)
loaded = (os.system(grep_cmd) == 0)
if not loaded:
if os.system('modprobe %s >/dev/null 2>&1' % name) != 0:
ErrorMsg("Cannot load module: %s" % name)
return loaded
@Memorize
def is_legacy_device_record(self, record):
""" Returns if a matching record looks like a legacy device. """
# Current format: [0-9a-f]{4}:[0-9a-f]{4}
return True if re.match('[0-9a-f]{4}:[0-9a-f]{4}', record) else False
@Memorize
def _get_legacy_device_list(self):
# pci: cat /proc/bus/pci/devices | cut -f 2 # 0.004s < lspci=0.012s
device_list = []
pci_device_file = '/proc/bus/pci/devices'
if os.path.exists(pci_device_file):
with open(pci_device_file) as handle:
pci_list = [data.split('\t', 2)[1]
for data in handle.readlines()]
device_list += ['%s:%s' % (entry[:4], entry[4:])
for entry in pci_list]
else:
DebugMsg('Failed to read %s. Execute lspci.' % pci_device_file)
pci_list = [entry.split()[2:4]
for entry in
gft_common.SystemOutput('lspci -n -mm').splitlines()]
device_list += ['%s:%s' % (vendor.strip('"'), product.strip('"'))
for (vendor, product) in pci_list]
# usb: realpath(/sys/bus/usb/devices/*:*)/../id* # 0.05s < lspci=0.078s
usb_devs = glob.glob('/sys/bus/usb/devices/*:*')
for dev in usb_devs:
path = os.path.join(os.path.realpath(dev), '..')
device_list += ['%s:%s' %
(gft_common.ReadOneLine(os.path.join(path, 'idVendor')),
gft_common.ReadOneLine(os.path.join(path, 'idProduct')))]
DebugMsg('Legacy device list: ' + ', '.join(device_list))
return device_list
@Memorize
def import_flimflam_module(self):
""" Tries to load flimflam module from current system """
if not os.path.exists(self._flimflam_dir):
DebugMsg('no flimflam installed in %s' % self._flimflam_dir)
return None
try:
return imp.load_module('flimflam',
*imp.find_module('flimflam', [self._flimflam_dir]))
except ImportError:
ErrorMsg('Failed to import flimflam.')
except:
ErrorMsg('Failed to load flimflam.')
return None
@Memorize
def load_flimflam(self):
"""Gets information provided by flimflam (connection manager)
Returns
(None, None) if failed to load module, otherwise
(connection_path, connection_info) where
connection_path is a dict in {type: device_path},
connection_info is a dict of {type: {attribute: value}}.
"""
flimflam = self.import_flimflam_module()
if not flimflam:
return (None, None)
path = {}
info = {}
info_attribute_names = {
self._type_cellular: ['Carrier', 'FirmwareRevision', 'HardwareRevision',
'ModelID', 'Manufacturer'],
}
devices = flimflam.FlimFlam().GetObjectList('Device')
unpack = flimflam.convert_dbus_value
# TODO(hungte) support multiple devices existence in future
for device in devices:
# populate the 'path' collection
prop = device.GetProperties()
prop_type = unpack(prop['Type'])
prop_path = unpack(prop['Interface'])
if prop_type in path:
WarningMsg('Multiple network devices with same type (%s) were found.'
'Target path changed from %s to %s.' %
(prop_type, path[prop_type], prop_path))
path[prop_type] = '/sys/class/net/%s/device' % prop_path
if prop_type not in info_attribute_names:
continue
# populate the 'info' collection
info[prop_type] = dict((
(key, unpack(prop['Cellular.%s' % key]))
for key in info_attribute_names[prop_type]
if ('Cellular.%s' % key) in prop))
return (path, info)
@Memorize
def _get_all_connection_info(self):
""" Probes available connectivity and device information """
connection_info = {
self._type_wireless: '/sys/class/net/wlan0/device',
self._type_ethernet: '/sys/class/net/eth0/device',
# cellular may also be /sys/class/net/usb0
self._type_cellular: '/sys/class/tty/ttyUSB0/device',
}
(path, _) = self.load_flimflam()
if path is not None:
# trust flimflam instead.
for k in connection_info:
connection_info[k] = (path[k] if k in path else '')
return connection_info
@Memorize
def i2cdump(self, bus, address, size):
""" Reads binary dump from i2c bus. """
# TODO(hungte) Use /usr/sbin/i2cdump if possible
self.load_module('i2c_dev')
if type(bus) is types.IntType:
bus = '/dev/i2c-%d' % bus
fd = -1
I2C_SLAVE = 0x0703
blob = None
try:
fd = os.open(bus, os.O_RDWR)
if fcntl.ioctl(fd, I2C_SLAVE, address) != 0:
return blob
time.sleep(0.05) # Wait i2c to get ready
if os.write(fd, chr(0)) == 1:
blob = os.read(fd, size)
except:
pass
finally:
if fd >= 0:
os.close(fd)
return blob
@Memorize
def load_lvds_edid(self):
""" Probes LVDS data by using EDID method. """
lvds_files = glob.glob('/sys/class/drm/*LVDS*/edid')
if lvds_files:
try:
return edid.parse_edid_file(lvds_files[0])
except ValueError, e:
ErrorMsg('EDID Parsing Error: %s' % e)
# Try i2c
self.load_module('i2c_dev')
i2c_files = glob.glob('/dev/i2c-?')
i2c_files.sort()
# LVDS are usually on address 0x50.
I2C_LVDS_ADDERSS = 0x50
for i2c_file in i2c_files:
edid_blob = self.i2cdump(i2c_file, I2C_LVDS_ADDERSS,
edid.EDID_MINIMAL_SIZE)
if edid_blob:
try:
return edid.parse_edid(edid_blob)
except ValueError, e:
WarningMsg('EDID Parsing Error: %s' % e)
# TODO(hungte) we can also try xrandr
return None
def _get_sysfs_device_info(self, path, primary, optional=[]):
"""Gets the device information of a sysfs node.
Args
path: the sysfs device path.
primary: mandatory list of elements to read.
optional: optional list of elements to read.
Returns
[primary_values_dict, optional_values_dict]
"""
primary_values = {}
optional_values = {}
for element in primary:
element_path = os.path.join(path, element)
if not os.path.exists(element_path):
return [None, None]
primary_values[element] = gft_common.ReadOneLine(element_path)
for element in optional:
element_path = os.path.join(path, element)
if os.path.exists(element_path):
optional_values[element] = gft_common.ReadOneLine(element_path)
return [primary_values, optional_values]
def _get_pci_device_info(self, path):
""" Returns a PCI 'vendor:device' component information. """
# TODO(hungte) PCI has a 'rev' info which may be better added into info.
(info, _) = self._get_sysfs_device_info(path, ['vendor', 'device'])
return '%s:%s' % (info['vendor'].replace('0x', ''),
info['device'].replace('0x', '')) if info else None
def _get_usb_device_info(self, path):
""" Returns an USB 'idVendor:idProduct manufacturer product' info. """
# USB in sysfs is hierarchy, and usually uses the 'interface' layer.
# If we are in 'interface' layer, the product info is in real parent folder.
path = os.path.realpath(path)
while path.find('/usb') > 0:
if os.path.exists(os.path.join(path, 'idProduct')):
break
path = os.path.split(path)[0]
optional_fields = ['manufacturer', 'product', 'bcdDevice']
(info, optional) = self._get_sysfs_device_info(
path, ['idVendor', 'idProduct'], optional_fields)
if not info:
return None
info_string = '%s:%s' % (info['idVendor'].replace('0x', ''),
info['idProduct'].replace('0x', ''))
for field in optional_fields:
if field in optional:
info_string += ' ' + optional[field]
return info_string
def get_sysfs_device_id(self, path):
"""Gets a sysfs device identifier. (Currently supporting USB/PCI)
Args
path: a path to sysfs device (ex, /sys/class/net/wlan0/device)
Returns
An identifier string, or self._not_present if not available.
"""
if not path:
return self._not_present
path = os.path.realpath(path)
if not os.path.isdir(path):
return self._not_present
info = (self._get_pci_device_info(path) or
self._get_usb_device_info(path))
return info or self._not_present
def get_sysfs_node_id(self, path):
"""Gets a sysfs node identifier (for devices that may exist on SOC or bus)
Args
path: a path to sysfs node (ex, /sys/class/video4linux/video0) which
contains a 'device' or 'name' property.
Returns
An identifier string, or self._not_present if not available.
"""
if not path:
return self._not_present
device_id = self.get_sysfs_device_id(os.path.join(path, 'device'))
if device_id:
return device_id
name_path = os.path.join(path, 'name')
if os.path.exists(name_path):
return gft_common.ReadOneLine(name_path)
return self._not_present
def compact_id(self, id_info):
"""Returns a identifier with white space characters compressed
Args
id_info: a string or list/tuple of identifiers
Returns
A string with all identifiers described by id_info with minimal space
"""
if type(id_info) == list or type(id_info) == tuple:
id_info = ' '.join(id_info)
return re.sub('\s+', ' ', id_info).strip()
@Memorize
def get_arch(self):
""" Gets current system architecture. """
return gft_common.GetSystemArch()
@Memorize
def get_ssd_name(self, partition_number=None):
"""Gets a proper SSD device name by platform (arch) detection.
Args
partition_number: numeric index for partition. None for whole disk.
Returns
A device name for SSD, base on self.get_arch() result.
"""
arch = self.get_arch()
partno = '' if partition_number is None else ('%d' % partition_number)
if arch in ('x86', 'amd64'):
return 'sda' + partno
elif arch in ('arm'):
if partno:
partno = 'p' + partno
return 'mmcblk0' + partno
else:
assert False, ('get_ssd_name: unknown arch: %s' % arch)
return 'sda' + partno
@Memorize
def has_ec(self):
""" Returns if current system has programmable EC chips. """
arch = self.get_arch()
if arch in ('x86', 'amd64'):
return True
elif arch in ('arm'):
return False
else:
assert False, 'unknown platform for EC information.'
return False
# --------------------------------------------------------------------
# Firmware Processing Utilities
@Memorize
def _load_firmware(self, target_name):
filename = gft_common.GetTemporaryFileName('fw_cache')
self._temp_files.append(filename)
option_map = {
'main': '-p internal:bus=spi',
'ec': '-p internal:bus=lpc',
}
assert target_name in option_map
# example output:
# Found chip "Winbond W25x16" (2048 KB, FWH) at physical address 0xfe
# TODO(hungte) maybe we don't need the -V in future -- if that can make
# the command faster.
command = 'flashrom -V %s -r %s' % (option_map[target_name], filename)
parts = []
lines = gft_common.SystemOutput(
command,
progress_message='Reading %s firmware: ' % target_name,
show_progress=self._verbose).splitlines()
for line in lines:
match = re.search(r'Found chip "(.*)" .* at physical address ', line)
if match:
parts.append(match.group(1))
part_id = ', '.join(parts)
# restore flashrom target bus
if target_name != 'main':
os.system('flashrom %s >/dev/null 2>&1' % option_map['main'])
return (part_id, filename)
def load_main_firmware(self):
""" Loads and cache main (BIOS) firmware image.
Returns
A file name of cached image.
"""
(_, image_file) = self._load_firmware('main')
return image_file
@EcProperty
def load_ec_firmware(self):
""" Loads and cache EC firmware image.
Returns
A file name of cached image.
"""
(_, image_file) = self._load_firmware('ec')
return image_file
@Memorize
def _read_gbb_component(self, name):
image_file = self.load_main_firmware()
if not image_file:
ErrorDie('cannot load main firmware')
filename = gft_common.GetTemporaryFileName('gbb%s' % name)
if os.system('gbb_utility -g --%s=%s %s >/dev/null 2>&1' %
(name, filename, image_file)) != 0:
ErrorDie('cannot get %s from GBB' % name)
value = gft_common.ReadBinaryFile(filename)
os.remove(filename)
return value
# --------------------------------------------------------------------
# Product-Specific Probing
@Memorize
def probe_touchpad(self):
""" Probes touchpad information.
Returns:
A dict of { _type_id: 'TOUCHPAD_HWINFO',
_type_firmware: 'TOUCHPAD_FWINFO' }
"""
def synaptics():
detect_program = '/opt/Synaptics/bin/syndetect'
model_string_str = 'Model String'
firmware_id_str = 'Firmware ID'
if not os.path.exists(detect_program):
return None
command_list = [detect_program]
# Determine if X is capturing touchpad
locked = os.system('lsof /dev/serio_raw0 2>/dev/null | grep -q "^X"')
if (locked == 0) and (not os.getenv('DISPLAY')):
ErrorMsg('Warning: You are trying to detect touchpad with X in '
'foreground but not configuring DISPLAY properly.\n'
'Test may fail with incorrect detection results.')
# Make a trial with default configuration (see cros/cros_ui.py and
# suite_Factory/startx.sh)
command_list.insert(0, 'DISPLAY=":0"')
xauthority_locations = ('/var/run/factory_ui.auth',
'/home/chronos/.Xauthority')
valid_xauth = [xauth for xauth in xauthority_locations
if os.path.exists(xauth)]
if valid_xauth:
command_list.insert(0, 'XAUTHORITY="%s"' % valid_xauth[0])
(exit_code, data, _) = gft_common.ShellExecution(
' '.join(command_list),
ignore_status=True,
progress_message='Synaptics Touchpad: ',
show_progress=self._verbose)
if exit_code != 0:
return None
properties = dict(map(str.strip, line.split('=', 1))
for line in data.splitlines() if '=' in line)
model = properties.get(model_string_str, 'Unknown Synaptics')
# The pattern " on xxx Port" may vary by the detection approach,
# so we need to strip it.
model = re.sub(' on [^ ]* [Pp]ort$', '', model)
firmware_id = properties.get(firmware_id_str, self._not_present)
return (model, firmware_id)
def cypress():
nodes = glob.glob('/sys/class/input/mouse[0-9]*/device/device')
for node in nodes:
id_files = ['product_id', 'hardware_version', 'protocol_version']
if not all([os.path.exists(os.path.join(node, field))
for field in id_files]):
continue
firmware_files = ['firmware_version']
model = self.compact_id(
[gft_common.ReadOneLine(os.path.join(node, field))
for field in id_files])
firmware_id = self.compact_id(
[gft_common.ReadOneLine(os.path.join(node, field))
for field in firmware_files
if os.path.exists(os.path.join(node, field))])
return (model, firmware_id)
return None
def generic():
# TODO(hungte) add more information from id/*
# format: N: Name="XXX_trackpad"
input_file = '/proc/bus/input/devices'
cmd_grep = 'grep -iE "^N.*(touch *pad|track *pad)" %s' % input_file
info = gft_common.SystemOutput(cmd_grep, ignore_status=True).splitlines()
info = [re.sub('^[^"]*"(.*)"$', r'\1', device)
for device in info]
return (', '.join(info) or self._not_present, self._not_present)
method_list = [cypress, synaptics, generic]
data = { self._type_id: self._not_present,
self._type_firmware: self._not_present }
for method in method_list:
result = method()
DebugMsg('probe_touchpad: %s: %s' % (method,result or '<failed>'))
if result:
data[self._type_id] = result[0]
data[self._type_firmware] = result[1]
return data
return data
# --------------------------------------------------------------------
# Enumerable Properties
def get_data_display_geometry(self):
# Get edid from driver.
# TODO(nsanders): this is driver specific.
# Try EDID
lvds_edid = self.load_lvds_edid()
if lvds_edid:
return '%dx%d' % (lvds_edid[edid.EDID_WIDTH],
lvds_edid[edid.EDID_HEIGHT])
# Try frame buffer
fb_modes_file = '/sys/class/graphics/fb0/modes'
if os.path.exists(fb_modes_file):
# format: U:1366x768p-0
fb_mode = gft_common.ReadOneLine(fb_modes_file)
geometry = re.search(r'\d+x\d+', fb_mode)
if geometry:
return geometry.group(0)
return self._not_present
def get_hash_gbb(self):
image_file = self.load_main_firmware()
return gft_fwhash.GetMainFirmwareGbbHash(file_source=image_file)
def get_hash_key_recovery(self):
current_key = self._read_gbb_component('recoverykey')
return gft_fwhash.GetKeyHash(current_key)
def get_hash_key_root(self):
current_key = self._read_gbb_component('rootkey')
return gft_fwhash.GetKeyHash(current_key)
@EcProperty
def get_hash_ro_ec_firmware(self):
"""
Returns a hash of Embedded Controller firmware read only parts,
to confirm we have proper updated version of EC firmware.
"""
image_file = self.load_ec_firmware()
if not image_file:
ErrorDie('get_hash_ro_ec_firmware: cannot read firmware')
return gft_fwhash.GetEcFirmwareReadOnlyHash(file_source=image_file)
def get_hash_ro_main_firmware(self):
"""
Returns a hash of main firmware (BIOS) read only parts,
to confirm we have proper keys / boot code / recovery image installed.
"""
image_file = self.load_main_firmware()
if not image_file:
ErrorDie('get_hash_ro_main_firmware: cannot read firmware')
return gft_fwhash.GetMainFirmwareReadOnlyHash(file_source=image_file)
def get_part_id_cellular(self):
device_path = self._get_all_connection_info()[self._type_cellular]
return self.get_sysfs_device_id(device_path) or self._not_present
def get_part_id_audio_codec(self):
cmd = 'grep -R Codec: /proc/asound/* | head -n 1 | sed s/.\*Codec://'
part_id = gft_common.SystemOutput(
cmd, progress_message='Searching Audio Codecs: ',
show_progress=self._verbose).strip()
# If no codec installed, try PCM.
if not part_id:
# format: 00-00: WMXXXX PCM wmxxxx-hifi-0: ...
pcm_data = gft_common.ReadOneLine('/proc/asound/pcm').split(' ')
if len(pcm_data) > 2:
part_id = pcm_data[1]
return part_id
def get_part_id_bluetooth(self):
return self.get_sysfs_device_id('/sys/class/bluetooth/hci0/device')
def get_part_id_camera(self):
info = []
camera_node = '/sys/class/video4linux/video0'
info.append(self.get_sysfs_node_id(camera_node))
# For soc-camera, "control/name" is a helpful driver name.
control_name = os.path.join(camera_node, 'device/control/name')
if os.path.exists(control_name):
info.append(gft_common.ReadOneLine(control_name))
# Try video4linux2 (v4l2) interface
camera_dev = '/dev/video0'
if os.path.exists(camera_dev):
fd = -1
try:
fd = os.open(camera_dev, os.O_RDWR)
# See /usr/include/linux/videodev2.h for definition
VIDIOC_DBG_G_CHIP_IDENT = 0xc02c5651
V4L2_DBG_CHIP_IDENT_SIZE = 11
V4L2_INDEX_REVISION = V4L2_DBG_CHIP_IDENT_SIZE - 1
V4L2_INDEX_IDENT = V4L2_INDEX_REVISION - 1
V4L2_VALID_IDENT = 3 # V4L2_IDENT_UNKNOWN + 1
v4l2_dbg_chip_ident = array.array('i', [0] * V4L2_DBG_CHIP_IDENT_SIZE)
fcntl.ioctl(fd, VIDIOC_DBG_G_CHIP_IDENT, v4l2_dbg_chip_ident, 1)
# 'ident' values are defined in include/media/v4l2-chip-ident.h
v4l2_ident = v4l2_dbg_chip_ident[V4L2_INDEX_IDENT]
if v4l2_ident >= V4L2_VALID_IDENT:
info.append('V4L2:%04x %04x' %
(v4l2_ident, v4l2_dbg_chip_ident[V4L2_INDEX_REVISION]))
except:
pass
finally:
if fd >= 0:
os.close(fd)
return self.compact_id(info)
def get_part_id_display_converter(self):
""" Gets display converter by dedicated probing """
def probe_ch7036():
self.load_module('i2c_dev')
probe_cmd = 'ch7036_monitor -p >/dev/null 2>&1'
return 'ch7036' if os.system(probe_cmd) == 0 else ''
method_list = [probe_ch7036]
part_id = self._not_present
for method in method_list:
part_id = method()
DebugMsg('get_part_id_display_converter: %s: %s' %
(method.__name__, part_id or '<failed>'))
if part_id:
break
return part_id
def get_part_id_chipset(self):
# On x86, host bridge is always the first PCI device.
# For SOC-based system, trust the first compatible list in device-tree
# (fdt).
part_id = self.get_sysfs_device_id('/sys/bus/pci/devices/0000:00:00.0')
fdt_compatible_file = '/proc/device-tree/compatible'
if (not part_id) and os.path.exists(fdt_compatible_file):
compatible_list = gft_common.ReadOneLine(fdt_compatible_file)
# format: manufacturer,model [NUL] compat-manufacturer,model [NUL] ...
info = compatible_list.strip(chr(0)).split(chr(0))
part_id = self.compact_id(info)
return part_id
def get_part_id_cpu(self):
part_id = 'Unknown'
cpu_info_map = {
# arch: (command, core_delta)
# The core_delta computation is caused by different report format.
# For platforms like x86, it provides names for each core.
# Sample output for dual-core:
# model name : Intel(R) Atom(TM) CPU XXXX
# model name : Intel(R) Atom(TM) CPU XXXX
'amd64': (r'sed -nr "s/^model name\t*: (.*)/\1/p" /proc/cpuinfo', 0),
'x86': (r'sed -nr "s/^model name\s*: (.*)/\1/p" /proc/cpuinfo', 0),
# For platforms like arm, it gives the name only in 'Processor'
# and then a numeric ID for each cores 'processor', so delta is 1.
# Sample output for dual-core:
# Processor : ARMXXXX
# processor : 0
# processor : 1
'arm': (r'sed -nr "s/^[Pp]rocessor\s*: (.*)/\1/p" /proc/cpuinfo', 1),
}
arch = self.get_arch()
if arch not in cpu_info_map:
return part_id
info = gft_common.SystemOutput(cpu_info_map[arch][0]).splitlines()
if not info:
return part_id
cores = len(info) - cpu_info_map[arch][1]
info = info[0:1]
# TODO(hungte) For backward compatbility, core number reporting on x86 is
# disabled. We need to fix HWID component lists in future.
if cores > 1 and arch != 'x86':
info.append('*%d' % cores)
return self.compact_id(info)
def get_part_id_display_panel(self):
# Try EDID
lvds_edid = self.load_lvds_edid()
if lvds_edid:
return '%s:%04x' % (lvds_edid[edid.EDID_MANUFACTURER_ID],
lvds_edid[edid.EDID_PRODUCT_ID])
# Try frame buffer
fb_filename = '/sys/class/graphics/fb0/name'
if os.path.exists(fb_filename):
return gft_common.ReadOneLine(fb_filename)
return self._not_present
def get_part_id_dram(self):
arch = self.get_arch()
if arch in ('x86', 'amd64'):
# TODO(hungte) if we only want DRAM size, maybe no need to use mosys
self.load_module('i2c_dev')
cmd = ('mosys -l memory spd print geometry | '
'grep size_mb | cut -f2 -d"|"')
part_id = gft_common.SystemOutput(cmd).strip()
elif arch in ('arm'):
# Even kernel cannot probe the memory. We can only trust the memory
# information passed to kernel.
param = gft_common.ReadOneLine('/proc/cmdline')
# Format: *mem=384M@0M (type=size@address)
part_id = '%d' % sum(
[int(size) for size in re.findall(r'\s\w*mem=(\d+)M@\d+M', param)])
return part_id if part_id else self._not_present
@EcProperty
def get_part_id_ec_flash_chip(self):
(chip_id, _) = self._load_firmware('ec')
return chip_id
@EcProperty
def get_part_id_embedded_controller(self):
# example output:
# Found Nuvoton WPCE775x (id=0x05, rev=0x02) at 0x2e
parts = []
res = gft_common.SystemOutput(
'superiotool',
progress_message='Probing Embedded Controller: ',
show_progress=self._verbose,
ignore_status=True).splitlines()
for line in res:
match = re.search(r'Found (.*) at', line)
if match:
parts.append(match.group(1))
part_id = ', '.join(parts)
return part_id
def get_part_id_ethernet(self):
device_path = self._get_all_connection_info()[self._type_ethernet]
return self.get_sysfs_device_id(device_path) or self._not_present
def get_part_id_flash_chip(self):
(chip_id, _) = self._load_firmware('main')
return chip_id
def get_part_id_hwqual(self):
part_id = gft_common.SystemOutput('crossystem hwid').strip()
# TODO(hungte) compare this with HWID in GBB.
return (part_id if part_id else self._not_present)
def get_part_id_storage(self):
part_id = self._not_present
node = '/sys/class/block/%s' % self.get_ssd_name()
path = os.path.join(node, 'device')
if not os.path.exists(path):
return part_id
size = ''
size_path = os.path.join(node, 'size')
if os.path.exists(size_path):
size = '#' + gft_common.ReadOneLine(size_path)
info_list = (
['vendor', 'model'], # ATA
['type', 'name', 'fwrev', 'hwrev', 'oemid', 'manfid'], # EMMC
)
for info in info_list:
data = [gft_common.ReadOneLine(os.path.join(path, data_file))
for data_file in info
if os.path.exists(os.path.join(path, data_file))]
if data:
return self.compact_id(data + [size])
return part_id
def get_part_id_keyboard(self):
# VPD value "keyboard_layout"="xkb:gb:extd:eng" should be listed.
image_file = self.load_main_firmware()
part_id = gft_common.SystemOutput(
'vpd -i RO_VPD -l -f "%s" | grep keyboard_layout | cut -f4 -d\\"' %
image_file).strip()
return part_id or self._not_present
def get_part_id_touchpad(self):
data = self.probe_touchpad()
return data[self._type_id]
def get_part_id_tpm(self):
""" Returns Manufacturer_info : Chip_Version """
cmd = 'tpm_version'
part_id = self._not_present
tpm_output = gft_common.SystemOutput(cmd)
tpm_lines = tpm_output.splitlines()
tpm_dict = {}
for tpm_line in tpm_lines:
[key, colon, value] = tpm_line.partition(':')
tpm_dict[key.strip()] = value.strip()
(key1, key2) = ('Manufacturer Info', 'Chip Version')
if key1 in tpm_dict and key2 in tpm_dict:
part_id = tpm_dict[key1] + ':' + tpm_dict[key2]
return part_id
def get_part_id_usb_hosts(self):
arch = self.get_arch()
if arch in ('x86', 'amd64'):
# on x86, USB hosts are PCI devices, located in parent of root USB.
relpath = '..'
else:
# on ARM and others, use the root device itself.
relpath = '.'
usb_bus_list = glob.glob('/sys/bus/usb/devices/usb*')
usb_host_list = [os.path.join(os.path.realpath(path), relpath)
for path in usb_bus_list]
# usually there are several USB hosts, so only list the primary information.
usb_host_info = [re.sub(' .*', '', self.get_sysfs_device_id(device))
for device in usb_host_list]
usb_host_info.sort(reverse=True)
return ' '.join(usb_host_info)
def get_part_id_vga(self):
return self.get_sysfs_node_id('/sys/class/graphics/fb0')
def get_part_id_wireless(self):
device_path = self._get_all_connection_info()[self._type_wireless]
return self.get_sysfs_device_id(device_path) or self._not_present
def get_version_cellular_firmware(self):
(_, attributes) = self.load_flimflam()
if attributes and (self._type_cellular in attributes):
# A list of possible version info combination, since the supported fields
# may differ for partners.
version_formats = [
['Carrier', 'FirmwareRevision'],
['FirmwareRevision'],
['HardwareRevision']]
info = attributes[self._type_cellular]
for version_format in version_formats:
if not set(version_format).issubset(set(info)):
continue
# compact all fields into single line with limited space
return self.compact_id([info[key] for key in version_format])
# If nothing available, try 'modem status'.
modem_status = gft_common.SystemOutput(
'modem status | grep firmware_revision', ignore_status=True).strip()
info = re.findall('^\s*firmware_revision:\s*(.*)', modem_status)
if info and info[0]:
return info[0]
return self._not_present
def get_version_rw_firmware(self):
"""
Returns the version of Read-Write (writable) firmware from VBLOCK sections.
If A/B has different version, that means this system needs a reboot +
firmwar update so return value is a "error report" in the form "A=x, B=y".
"""
# TODO(hungte) Support new VBLOCK, or use vbutil_firmware
versions = [None, None]
section_names = ['VBLOCK_A', 'VBLOCK_B']
image_file = self.load_main_firmware()
if not image_file:
ErrorDie('Cannot read system main firmware.')
flashrom = self._flashrom
base_img = open(image_file).read()
flashrom_size = len(base_img)
# we can trust base image for layout, since it's only RW.
layout = flashrom.detect_chromeos_bios_layout(flashrom_size, base_img)
if not layout:
ErrorDie('Cannot detect ChromeOS flashrom layout')
for (index, name) in enumerate(section_names):
data = flashrom.get_section(base_img, layout, name)
block = vblock.unpack_verification_block(data)
ver = block['VbFirmwarePreambleHeader']['firmware_version']
versions[index] = ver
# we embed error reports in return value.
assert len(versions) == 2
if versions[0] != versions[1]:
return 'A=%d, B=%d' % (versions[0], versions[1])
return '%d' % versions[0]
def get_version_touchpad_firmware(self):
data = self.probe_touchpad()
return data[self._type_firmware]
# --------------------------------------------------------------------
# Probable Properties
def probe_part_id_cardreader(self, part_id):
# A cardreader is always power off until a card inserted. So checking
# it using log messages instead of lsusb can limit operator-attended.
# But note that it does not guarantee the cardreader presented during
# the time of the test.
# TODO(hungte) Grep entire /var/log/message may be slow. Currently we cache
# this result by verify_probable_component, but we should find some better
# and reliable way to detect this.
[vendor_id, product_id] = part_id.split(':')
found_pattern = ('New USB device found, idVendor=%s, idProduct=%s' %
(vendor_id, product_id))
cmd = 'grep -qs "%s" /var/log/messages*' % found_pattern
return os.system(cmd) == 0
# --------------------------------------------------------------------
# Matching
def force_get_property(self, property_name):
""" Returns property value or empty string on error. """
try:
return getattr(self, property_name)()
except gft_common.GFTError, e:
ErrorMsg("Error in probing property %s: %s" % (property_name, e.value))
return ''
except:
ErrorMsg('Exception in getting property %s' % property_name)
return ''
def update_ignored_cids(self, ignored_cids):
for cid in ignored_cids:
if cid in self._to_be_tested_cids:
self._to_be_tested_cids.remove(cid)
DebugMsg('Ignoring cid: %s' % cid)
else:
ErrorDie('The ignored cid %s is not defined' % cid)
self._not_test_cids.append(cid)
def read_approved_from_file(self, filename):
approved = gft_common.LoadComponentsDatabaseFile(filename)
for cid in self._to_be_tested_cids + self._not_test_cids:
if cid not in approved:
# If we don't have any listing for this type
# of part in HWID, it's not required.
WarningMsg('gft_hwcomp: Bypassing unlisted cid %s' % cid)
approved[cid] = '*'
return approved
def get_all_enumerable_components(self, async):
results = {}
thread_pools = []
def fetch_enumerable_component(cid):
""" Fetch an enumerable component and update the results """
if self._verbose:
sys.stdout.flush()
sys.stderr.write('<Fetching property %s>\n' % cid)
components = self.force_get_property('get_' + cid)
if not isinstance(components, list):
components = [components]
results[cid] = components
class FetchThread(threading.Thread):
""" Thread object for parallel enumerating """
def __init__(self, cid):
threading.Thread.__init__(self)
self.cid = cid
def run(self):
fetch_enumerable_component(self.cid)
for cid in self._enumerable_cids:
if async and cid not in self._non_async_cids:
thread_pools.append(FetchThread(cid))
else:
fetch_enumerable_component(cid)
# Complete the threads
for thread in thread_pools:
thread.start()
for thread in thread_pools:
thread.join()
return results
def format_failure(self, exact_values, approved_values):
message_not_present = 'Not Present'
actual = [(message_not_present
if value in self._failure_list else value)
for value in exact_values]
expected = [(message_not_present
if value in self._failure_list else value)
for value in approved_values]
return ['Actual: %s' % ', '.join(set(actual)),
'Expected: %s' % ' | '.join(set(expected))]
def check_enumerable_component(self, cid, exact_values, approved_values):
if '*' in approved_values:
return
unmatched = [value for value in exact_values
if value not in approved_values]
if not unmatched:
return
# there's some error, let's try to match them in legacy list
match_goal = [value for value in approved_values
if value not in self._failure_list]
legacy_approved = filter(self.is_legacy_device_record, match_goal)
if match_goal and (set(legacy_approved) == set(match_goal)):
DebugMsg('Start legacy search for cid: ' + cid)
# TODO(hungte) prefetch this list in async batch process.
legacy_list = self._get_legacy_device_list()
matched = list(set(legacy_list).intersection(set(match_goal)))
if matched:
DebugMsg('Changed detected list: %s->%s' % (self._system[cid], matched))
self._system[cid] = matched
return
# update with remaining error.
self._failures[cid] = self.format_failure(exact_values, approved_values)
@Memorize
def verify_probable_component(self, cid, approved_values):
if '*' in approved_values:
return (True, ['*'])
for value in approved_values:
present = getattr(self, 'probe_' + cid)(value)
if present:
return (True, [value])
return (False, [self._no_match])
def check_probable_component(self, cid, approved_values):
(probed, value) = self.verify_probable_component(cid, approved_values)
if probed:
self._system[cid] = value
else:
self._failures[cid] = self.format_failure(value, approved_values)
def pformat(self, obj):
return '\n' + self._pp.pformat(obj) + '\n'
def initialize(self, force=False, async=False):
if self._initialized and not force:
return
# probe current system components
DebugMsg('Starting to detect system components...')
self._enumerable_system = self.get_all_enumerable_components(async)
self._initialized = True
def match_current_system(self, filename, ignored_cids=[]):
""" Matches a given component list to current system.
Returns: (current, failures)
"""
# assert self._initialized, 'Not initialized.'
self._to_be_tested_cids = (self._enumerable_cids +
self._probable_cids)
self._not_test_cids = self._pure_data_cids[:]
self.update_ignored_cids(ignored_cids)
self._failures = {}
self._system = {}
self._system.update(self._enumerable_system)
self._file_base = gft_common.GetComponentsDatabaseBase(filename)
approved = self.read_approved_from_file(filename)
for cid in self._enumerable_cids:
if cid not in self._to_be_tested_cids:
VerboseMsg('gft_hwcomp: match: ignored: %s' % cid)
else:
self.check_enumerable_component(
cid, self._enumerable_system[cid], approved[cid])
for cid in self._probable_cids:
if cid not in self._to_be_tested_cids:
VerboseMsg('gft_hwcomp: match: ignored: %s' % cid)
else:
self.check_probable_component(cid, approved[cid])
return (self._system, self._failures)
#############################################################################
# Console main entry
@gft_common.GFTConsole
def _main(self_path, args):
do_async = True
# preprocess args
compdb_list = []
for arg in args:
if arg == '--sync':
do_async = False
elif arg == '--async':
do_async = True
elif not os.path.exists(arg):
print 'ERROR: unknown parameter: ' + arg
print 'Usage: %s [--sync|--async] [components_db_files...]\n' % self_path
sys.exit(1)
else:
compdb_list.append(arg)
components = HardwareComponents(verbose=True)
print 'Starting to detect%s...' % (' asynchrounously' if do_async else '')
components.initialize(async=do_async)
if not compdb_list:
print 'Enumerable properties:'
print components.pformat(components._enumerable_system)
sys.exit(0)
print 'Starting to match system...'
for arg in compdb_list:
(matched, failures) = components.match_current_system(arg)
print 'Probed (%s):' % arg
print components.pformat(matched)
print 'Failures (%s):' % arg
print components.pformat(failures)
if __name__ == '__main__':
_main(sys.argv[0], sys.argv[1:])