blob: 1f1f059af5461bc0c81ce4e450c12490e35aefed [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import glob
import imp
import os
import pprint
import re
import sys
import threading
import flashrom_util
import gft_common
import gft_fwhash
import vblock
from gft_common import DebugMsg, VerboseMsg, WarningMsg, ErrorMsg, ErrorDie
class HardwareComponents(object):
""" Hardware Components Scanner """
# Function names in this class are used for reflection, so please don't change
# the function names even if they are not comliant to coding style guide.
version = 6
# We divide all component IDs (cids) into 5 categories:
# - enumerable: able to get the results by running specific commands;
# - probable: returns existed or not by given some pre-defined choices;
# - pure data: data for some special purpose, can't be tested;
_enumerable_cids = [
'data_display_geometry',
'hash_ec_firmware',
'hash_ro_firmware',
'part_id_3g',
'part_id_audio_codec',
'part_id_bluetooth',
'part_id_chrontel',
'part_id_chipset',
'part_id_cpu',
'part_id_display_panel',
'part_id_dram',
'part_id_ec_flash_chip',
'part_id_embedded_controller',
'part_id_ethernet',
'part_id_flash_chip',
'part_id_hwqual',
'part_id_keyboard',
'part_id_storage',
'part_id_tpm',
'part_id_usb_hosts',
'part_id_vga',
'part_id_webcam',
'part_id_wireless',
'vendor_id_touchpad',
'version_3g_firmware',
'version_rw_firmware',
# Deprcated fields:
# 'part_id_gps',
# - GPS is currently not supported by OS and no way to probe it.
# We should enable it only when OS supports it.
]
_probable_cids = [
'key_recovery',
'key_root',
'part_id_cardreader',
]
_pure_data_cids = [
'data_bitmap_fv',
'data_recovery_url',
]
# list of cids that should not be fetched asynchronously.
_non_async_cids = [
# Reading EC will become very slow and cause inaccurate results if we try to
# probe components that also fires EC command at the same time.
'part_id_ec_flash_chip',
]
# _not_test_cids and _to_be_tested_cids will be re-created for each match.
_not_test_cids = []
_to_be_tested_cids = []
# TODO(hungte) unify the 'not available' style messages
_not_present = ''
_no_match = 'No match'
_failure_list = [_not_present, _no_match, '']
# Type id for connection management (compatible to flimflam)
_type_3g = 'cellular'
_type_ethernet = 'ethernet'
_type_wireless = 'wifi'
_flimflam_dir = '/usr/local/lib/flimflam/test'
def __init__(self, verbose=False):
self._initialized = False
self._verbose = verbose
self._pp = pprint.PrettyPrinter()
# cache for firmware images
self._flashrom = flashrom_util.flashrom_util(
verbose_msg=VerboseMsg,
exception_type=gft_common.GFTError,
system_output=gft_common.SystemOutput,
system=gft_common.System)
self._temp_files = []
# variables for matching
self._enumerable_system = None
self._file_base = None
self._system = None
self._failures = None
def __del__(self):
for temp_file in self._temp_files:
try:
# DebugMsg('gft_hwcomp: delete temp file %s' % temp_file)
os.remove(temp_file)
except:
pass
# --------------------------------------------------------------------
# System Probing Utilities
def Memorize(f):
""" Decorator for thread-safe memorization """
return gft_common.ThreadSafe(gft_common.Memorize(f))
@Memorize
def load_module(self, name):
grep_cmd = ('lsmod 2>/dev/null | grep -q %s' % name)
loaded = (os.system(grep_cmd) == 0)
if not loaded:
if os.system('modprobe %s >/dev/null 2>&1' % name) != 0:
ErrorMsg("Cannot load module: %s" % name)
return loaded
@Memorize
def is_legacy_device_record(self, record):
""" Returns if a matching record looks like a legacy device. """
# Current format: [0-9a-f]{4}:[0-9a-f]{4}
return True if re.match('[0-9a-f]{4}:[0-9a-f]{4}', record) else False
@Memorize
def _get_legacy_device_list(self):
# pci: cat /proc/bus/pci/devices | cut -f 2 # 0.004s < lspci=0.012s
device_list = []
pci_device_file = '/proc/bus/pci/devices'
if os.path.exists(pci_device_file):
with open(pci_device_file) as handle:
pci_list = [data.split('\t', 2)[1]
for data in handle.readlines()]
device_list += ['%s:%s' % (entry[:4], entry[4:])
for entry in pci_list]
else:
DebugMsg('Failed to read %s. Execute lspci.' % pci_device_list)
pci_list = [entry.split()[2:4]
for entry in
gft_common.SystemOutput('lspci -n -mm').splitlines()]
device_list += ['%s:%s' % (vendor.strip('"'), product.strip('"'))
for (vendor, product) in pci_list]
# usb: realpath(/sys/bus/usb/devices/*:*)/../id* # 0.05s < lspci=0.078s
usb_devs = glob.glob('/sys/bus/usb/devices/*:*')
for dev in usb_devs:
path = os.path.join(os.path.realpath(dev), '..')
device_list += ['%s:%s' %
(gft_common.ReadOneLine(os.path.join(path, 'idVendor')),
gft_common.ReadOneLine(os.path.join(path, 'idProduct')))]
DebugMsg('Legacy device list: ' + ', '.join(device_list))
return device_list
@Memorize
def import_flimflam_module(self):
""" Tries to load flimflam module from current system """
if not os.path.exists(self._flimflam_dir):
DebugMsg('no flimflam installed in %s' % self._flimflam_dir)
return None
try:
return imp.load_module('flimflam',
*imp.find_module('flimflam', [self._flimflam_dir]))
except ImportError:
ErrorMsg('Failed to import flimflam.')
except:
ErrorMsg('Failed to load flimflam.')
return None
@Memorize
def load_flimflam(self):
"""Gets information provided by flimflam (connection manager)
Returns
(None, None) if failed to load module, otherwise
(connection_path, connection_info) where
connection_path is a dict in {type: device_path},
connection_info is a dict of {type: {attribute: value}}.
"""
flimflam = self.import_flimflam_module()
if not flimflam:
return (None, None)
path = {}
info = {}
info_attribute_names = {
self._type_3g: ['Carrier', 'FirmwareRevision', 'HardwareRevision',
'ModelID', 'Manufacturer'],
}
devices = flimflam.FlimFlam().GetObjectList('Device')
unpack = flimflam.convert_dbus_value
# TODO(hungte) support multiple devices existence in future
for device in devices:
# populate the 'path' collection
prop = device.GetProperties()
prop_type = unpack(prop['Type'])
prop_path = unpack(prop['Interface'])
if prop_type in path:
WarningMsg('Multiple network devices with same type (%s) were found.'
'Target path changed from %s to %s.' %
(prop_type, path[prop_type], prop_path))
path[prop_type] = '/sys/class/net/%s/device' % prop_path
if prop_type not in info_attribute_names:
continue
# populate the 'info' collection
info[prop_type] = dict((
(key, unpack(prop['Cellular.%s' % key]))
for key in info_attribute_names[prop_type]
if ('Cellular.%s' % key) in prop))
return (path, info)
@Memorize
def _get_all_connection_info(self):
""" Probes available connectivity and device information """
connection_info = {
self._type_wireless: '/sys/class/net/wlan0/device',
self._type_ethernet: '/sys/class/net/eth0/device',
# 3g(cellular) may also be /sys/class/net/usb0
self._type_3g: '/sys/class/tty/ttyUSB0/device',
}
(path, _) = self.load_flimflam()
if path is not None:
# trust flimflam instead.
for k in connection_info:
connection_info[k] = (path[k] if k in path else '')
return connection_info
def _get_sysfs_device_info(self, path, primary, optional=[]):
"""Gets the device information of a sysfs node.
Args
path: the sysfs device path.
primary: mandatory list of elements to read.
optional: optional list of elements to read.
Returns
[primary_values_dict, optional_values_dict]
"""
primary_values = {}
optional_values = {}
for element in primary:
element_path = os.path.join(path, element)
if not os.path.exists(element_path):
return [None, None]
primary_values[element] = gft_common.ReadOneLine(element_path)
for element in optional:
element_path = os.path.join(path, element)
if os.path.exists(element_path):
optional_values[element] = gft_common.ReadOneLine(element_path)
return [primary_values, optional_values]
def _get_pci_device_info(self, path):
""" Returns a PCI 'vendor:device' component information. """
# TODO(hungte) PCI has a 'rev' info which may be better added into info.
(info, _) = self._get_sysfs_device_info(path, ['vendor', 'device'])
return '%s:%s' % (info['vendor'].replace('0x', ''),
info['device'].replace('0x', '')) if info else None
def _get_usb_device_info(self, path):
""" Returns an USB 'idVendor:idProduct manufacturer product' info. """
# USB in sysfs is hierarchy, and usually uses the 'interface' layer.
# If we are in 'interface' layer, the product info is in real parent folder.
path = os.path.realpath(path)
while path.find('/usb') > 0:
if os.path.exists(os.path.join(path, 'idProduct')):
break
path = os.path.split(path)[0]
optional_fields = ['manufacturer', 'product']
(info, optional) = self._get_sysfs_device_info(
path, ['idVendor', 'idProduct'], optional_fields)
if not info:
return None
info_string = '%s:%s' % (info['idVendor'].replace('0x', ''),
info['idProduct'].replace('0x', ''))
for field in optional_fields:
if field in optional:
info_string += ' ' + optional[field]
return info_string
def get_sysfs_device_id(self, path):
"""Gets a sysfs device identifier. (Currently supporting USB/PCI)
Args
path: a path to sysfs device (ex, /sys/class/net/wlan0/device)
Returns
An identifier string, or self._not_present if not available.
"""
if not path:
return self._not_present
path = os.path.realpath(path)
if not os.path.isdir(path):
return self._not_present
info = (self._get_pci_device_info(path) or
self._get_usb_device_info(path))
return info or self._not_present
# --------------------------------------------------------------------
# Firmware Processing Utilities
@Memorize
def _load_firmware(self, target_name):
filename = gft_common.GetTemporaryFileName('fw_cache')
self._temp_files.append(filename)
option_map = {
'main': '-p internal:bus=spi',
'ec': '-p internal:bus=lpc',
}
assert target_name in option_map
# example output:
# Found chip "Winbond W25x16" (2048 KB, FWH) at physical address 0xfe
# TODO(hungte) maybe we don't need the -V in future -- if that can make
# the command faster.
command = 'flashrom -V %s -r %s' % (option_map[target_name], filename)
parts = []
lines = gft_common.SystemOutput(
command,
progress_messsage='Reading %s firmware: ' % target_name,
show_progress=self._verbose).splitlines()
for line in lines:
match = re.search(r'Found chip "(.*)" .* at physical address ', line)
if match:
parts.append(match.group(1))
part_id = ', '.join(parts)
# restore flashrom target bus
if target_name != 'main':
os.system('flashrom %s >/dev/null 2>&1' % option_map['main'])
return (part_id, filename)
def load_main_firmware(self):
""" Loads and cache main (BIOS) firmware image.
Returns:
A file name of cached image.
"""
(_, image_file) = self._load_firmware('main')
return image_file
def load_ec_firmware(self):
""" Loads and cache EC firmware image.
Returns:
A file name of cached image.
"""
(_, image_file) = self._load_firmware('ec')
return image_file
@Memorize
def _read_gbb_component(self, name):
image_file = self.load_main_firmware()
if not image_file:
ErrorDie('cannot load main firmware')
filename = gft_common.GetTemporaryFileName('gbb%s' % name)
if os.system('gbb_utility -g --%s=%s %s >/dev/null 2>&1' %
(name, filename, image_file)) != 0:
ErrorDie('cannot get %s from GBB' % name)
value = gft_common.ReadBinaryFile(filename)
os.remove(filename)
return value
# --------------------------------------------------------------------
# Enumerable Properties
def get_data_display_geometry(self):
# Get edid from driver.
# TODO(nsanders): this is driver specific.
# TODO(waihong): read-edid is also x86 only.
# format: Mode "1280x800" -> 1280x800
# TODO(hungte) merge this with get_part_id_display_panel
cmd = ('cat "$(find /sys/devices/ -name edid | grep LVDS)" | '
'parse-edid 2>/dev/null | grep "Mode " | cut -d\\" -f2')
output = gft_common.SystemOutput(cmd).splitlines()
return (output if output else [''])
def get_hash_ec_firmware(self):
"""
Returns a hash of Embedded Controller firmware parts,
to confirm we have proper updated version of EC firmware.
"""
image_file = self.load_ec_firmware()
if not image_file:
ErrorDie('get_hash_ec_firmware: cannot read firmware')
return gft_fwhash.GetECHash(file_source=image_file)
def get_hash_ro_firmware(self):
"""
Returns a hash of Read Only (BIOS) firmware parts,
to confirm we have proper keys / boot code / recovery image installed.
"""
image_file = self.load_main_firmware()
if not image_file:
ErrorDie('get_hash_ro_firmware: cannot read firmware')
return gft_fwhash.GetBIOSReadOnlyHash(file_source=image_file)
def get_part_id_3g(self):
device_path = self._get_all_connection_info()[self._type_3g]
return self.get_sysfs_device_id(device_path) or self._not_present
def get_part_id_audio_codec(self):
cmd = 'grep -R Codec: /proc/asound/* | head -n 1 | sed s/.\*Codec://'
part_id = gft_common.SystemOutput(
cmd, progress_messsage='Searching Audio Codecs: ',
show_progress=self._verbose).strip()
return part_id
def get_part_id_bluetooth(self):
return self.get_sysfs_device_id('/sys/class/bluetooth/hci0/device')
def get_part_id_chrontel(self):
""" Gets chrontel HDMI devices by dedicated probing """
def probe_ch7036():
self.load_module('i2c_dev')
probe_cmd = 'ch7036_monitor -p >/dev/null 2>&1'
return 'ch7036' if os.system(probe_cmd) == 0 else ''
method_list = [probe_ch7036]
part_id = ''
for method in method_list:
part_id = method()
DebugMsg('get_part_id_chrontel: %s: %s' %
(method.__name__, part_id or '<failed>'))
if part_id:
break
return part_id
def get_part_id_chipset(self):
# Host bridge is always the first PCI device.
return self.get_sysfs_device_id('/sys/bus/pci/devices/0000:00:00.0')
def get_part_id_cpu(self):
cmd = 'grep -m 1 "model name" /proc/cpuinfo | sed s/.\*://'
part_id = gft_common.SystemOutput(cmd).strip()
return part_id
def get_part_id_display_panel(self):
# format: ModelName "SEC:4231" -> SEC:4231
cmd = ('cat "$(find /sys/devices/ -name edid | grep LVDS)" | '
'parse-edid 2>/dev/null | grep ModelName | cut -d\\" -f2')
output = gft_common.SystemOutput(cmd).strip()
return (output if output else self._not_present)
def get_part_id_dram(self):
# TODO(hungte) if we only want DRAM size, maybe no need to use mosys
self.load_module('i2c_dev')
cmd = ('mosys -l memory spd print geometry | '
'grep size_mb | cut -f2 -d"|"')
part_id = gft_common.SystemOutput(cmd).strip()
if part_id != '':
return part_id
else:
return self._not_present
def get_part_id_ec_flash_chip(self):
(chip_id, _) = self._load_firmware('ec')
return chip_id
def get_part_id_embedded_controller(self):
# example output:
# Found Nuvoton WPCE775x (id=0x05, rev=0x02) at 0x2e
parts = []
res = gft_common.SystemOutput(
'superiotool',
progress_messsage='Probing Embedded Controller: ',
show_progress=self._verbose,
ignore_status=True).splitlines()
for line in res:
match = re.search(r'Found (.*) at', line)
if match:
parts.append(match.group(1))
part_id = ', '.join(parts)
return part_id
def get_part_id_ethernet(self):
device_path = self._get_all_connection_info()[self._type_ethernet]
return self.get_sysfs_device_id(device_path) or self._not_present
def get_part_id_flash_chip(self):
(chip_id, _) = self._load_firmware('main')
return chip_id
def get_part_id_hwqual(self):
part_id = gft_common.SystemOutput('crossystem hwid').strip()
return (part_id if part_id else self._not_present)
def get_part_id_storage(self):
cmd = ('cd $(find /sys/devices -name sda)/../..; '
'cat vendor model | tr "\n" " " | sed "s/ \\+/ /g"')
part_id = gft_common.SystemOutput(cmd).strip()
return part_id
def get_part_id_keyboard(self):
# VPD value "keyboard_layout"="xkb:gb:extd:eng" should be listed.
image_file = self.load_main_firmware()
part_id = gft_common.SystemOutput(
'vpd -i RO_VPD -l -f "%s" | grep keyboard_layout | cut -f4 -d\\"' %
image_file).strip()
return part_id or self._not_present
def get_part_id_tpm(self):
""" Returns Manufacturer_info : Chip_Version """
cmd = 'tpm_version'
tpm_output = gft_common.SystemOutput(cmd)
tpm_lines = tpm_output.splitlines()
tpm_dict = {}
for tpm_line in tpm_lines:
[key, colon, value] = tpm_line.partition(':')
tpm_dict[key.strip()] = value.strip()
part_id = ''
(key1, key2) = ('Manufacturer Info', 'Chip Version')
if key1 in tpm_dict and key2 in tpm_dict:
part_id = tpm_dict[key1] + ':' + tpm_dict[key2]
return part_id
def get_part_id_usb_hosts(self):
usb_bus_list = glob.glob('/sys/bus/usb/devices/usb*')
usb_host_list = [os.path.join(os.path.realpath(path), '..')
for path in usb_bus_list]
usb_host_info = [self.get_sysfs_device_id(device)
for device in usb_host_list]
usb_host_info.sort(reverse=True)
return ' '.join(usb_host_info)
def get_part_id_vga(self):
return self.get_sysfs_device_id('/sys/class/graphics/fb0/device')
def get_part_id_webcam(self):
return self.get_sysfs_device_id('/sys/class/video4linux/video0/device')
def get_part_id_wireless(self):
device_path = self._get_all_connection_info()[self._type_wireless]
return self.get_sysfs_device_id(device_path) or self._not_present
def get_vendor_id_touchpad_synaptics(self):
part_id = ''
detect_program = '/opt/Synaptics/bin/syndetect'
model_string_str = 'Model String'
firmware_id_str = 'Firmware ID'
if os.path.exists(detect_program):
(exit_code, data, _) = gft_common.ShellExecution(
detect_program,
ignore_status=True,
progress_messsage='Synaptics Touchpad: ',
show_progress=self._verbose)
if exit_code != 0:
return part_id
properties = dict(map(str.strip, line.split('=', 1))
for line in data.splitlines() if '=' in line)
model = properties.get(model_string_str, 'UnknownModel')
firmware_id = properties.get(firmware_id_str, 'UnknownFWID')
# The pattern " on xxx Port" may vary by the detection approach,
# so we need to strip it.
model = re.sub(' on [^ ]* [Pp]ort$', '', model)
# Format: Model #FirmwareId
part_id = '%s #%s' % (model, firmware_id)
return part_id
def get_vendor_id_touchpad_generic(self):
cmd_grep = 'grep -i Touchpad /proc/bus/input/devices | sed s/.\*=//'
part_id = gft_common.SystemOutput(
cmd_grep,
progress_messsage='Finding Touchpad: ',
show_progress=self._verbose).strip('"')
return part_id
def get_vendor_id_touchpad(self):
method_list = ['synaptics', 'generic']
part_id = ''
for method in method_list:
part_id = getattr(self, 'get_vendor_id_touchpad_%s' % method)()
DebugMsg('get_vendor_id_touchpad: %s: %s' %
(method, part_id or '<failed>'))
if part_id:
break
return part_id
def get_version_3g_firmware(self):
(_, attributes) = self.load_flimflam()
if attributes and (self._type_3g in attributes):
# A list of possible version info combination, since the supported fields
# may differ for partners.
version_formats = [
['Carrier', 'FirmwareRevision'],
['FirmwareRevision'],
['HardwareRevision']]
info = attributes[self._type_3g]
for version_format in version_formats:
if not set(version_format).issubset(set(info)):
continue
# compact all fields into single line with limited space
version_string = ' '.join([info[key] for key in version_format])
return re.sub('\s+', ' ', version_string)
return self._not_present
def get_version_rw_firmware(self):
"""
Returns the version of Read-Write (writable) firmware from VBLOCK sections.
If A/B has different version, that means this system needs a reboot +
firmwar update so return value is a "error report" in the form "A=x, B=y".
"""
versions = [None, None]
section_names = ['VBLOCK_A', 'VBLOCK_B']
image_file = self.load_main_firmware()
if not image_file:
ErrorDie('Cannot read system main firmware.')
flashrom = self._flashrom
base_img = open(image_file).read()
flashrom_size = len(base_img)
# we can trust base image for layout, since it's only RW.
layout = flashrom.detect_chromeos_bios_layout(flashrom_size, base_img)
if not layout:
ErrorDie('Cannot detect ChromeOS flashrom layout')
for (index, name) in enumerate(section_names):
data = flashrom.get_section(base_img, layout, name)
block = vblock.unpack_verification_block(data)
ver = block['VbFirmwarePreambleHeader']['firmware_version']
versions[index] = ver
# we embed error reports in return value.
assert len(versions) == 2
if versions[0] != versions[1]:
return 'A=%d, B=%d' % (versions[0], versions[1])
return '%d' % versions[0]
# --------------------------------------------------------------------
# Probable Properties
def probe_key_recovery(self, part_id):
current_key = self._read_gbb_component('recoverykey')
file_path = os.path.join(self._file_base, part_id)
target_key = gft_common.ReadBinaryFile(file_path)
return current_key.startswith(target_key)
def probe_key_root(self, part_id):
current_key = self._read_gbb_component('rootkey')
file_path = os.path.join(self._file_base, part_id)
target_key = gft_common.ReadBinaryFile(file_path)
return current_key.startswith(target_key)
def probe_part_id_cardreader(self, part_id):
# A cardreader is always power off until a card inserted. So checking
# it using log messages instead of lsusb can limit operator-attended.
# But note that it does not guarantee the cardreader presented during
# the time of the test.
# TODO(hungte) Grep entire /var/log/message may be slow. Currently we cache
# this result by verify_probable_component, but we should find some better
# and reliable way to detect this.
[vendor_id, product_id] = part_id.split(':')
found_pattern = ('New USB device found, idVendor=%s, idProduct=%s' %
(vendor_id, product_id))
cmd = 'grep -qs "%s" /var/log/messages*' % found_pattern
return os.system(cmd) == 0
# --------------------------------------------------------------------
# Matching
def force_get_property(self, property_name):
""" Returns property value or empty string on error. """
try:
return getattr(self, property_name)()
except gft_common.GFTError, e:
ErrorMsg("Error in probing property %s: %s" % (property_name, e.value))
return ''
except:
ErrorMsg('Exception in getting property %s' % property_name)
return ''
def update_ignored_cids(self, ignored_cids):
for cid in ignored_cids:
if cid in self._to_be_tested_cids:
self._to_be_tested_cids.remove(cid)
DebugMsg('Ignoring cid: %s' % cid)
else:
ErrorDie('The ignored cid %s is not defined' % cid)
self._not_test_cids.append(cid)
def read_approved_from_file(self, filename):
approved = gft_common.LoadComponentsDatabaseFile(filename)
for cid in self._to_be_tested_cids + self._not_test_cids:
if cid not in approved:
# If we don't have any listing for this type
# of part in HWID, it's not required.
WarningMsg('gft_hwcomp: Bypassing unlisted cid %s' % cid)
approved[cid] = '*'
return approved
def get_all_enumerable_components(self, async):
results = {}
thread_pools = []
def fetch_enumerable_component(cid):
""" Fetch an enumerable component and update the results """
if self._verbose:
sys.stdout.flush()
sys.stderr.write('<Fetching property %s>\n' % cid)
components = self.force_get_property('get_' + cid)
if not isinstance(components, list):
components = [components]
results[cid] = components
class FetchThread(threading.Thread):
""" Thread object for parallel enumerating """
def __init__(self, cid):
threading.Thread.__init__(self)
self.cid = cid
def run(self):
fetch_enumerable_component(self.cid)
for cid in self._enumerable_cids:
if async and cid not in self._non_async_cids:
thread_pools.append(FetchThread(cid))
else:
fetch_enumerable_component(cid)
# Complete the threads
for thread in thread_pools:
thread.start()
for thread in thread_pools:
thread.join()
return results
def format_failure(self, exact_values, approved_values):
message_not_present = 'Not Present'
actual = [(message_not_present
if value in self._failure_list else value)
for value in exact_values]
expected = [(message_not_present
if value in self._failure_list else value)
for value in approved_values]
return ['Actual: %s' % ', '.join(set(actual)),
'Expected: %s' % ' | '.join(set(expected))]
def check_enumerable_component(self, cid, exact_values, approved_values):
if '*' in approved_values:
return
unmatched = [value for value in exact_values
if value not in approved_values]
if not unmatched:
return
# there's some error, let's try to match them in legacy list
match_goal = [value for value in approved_values
if value not in self._failure_list]
legacy_approved = filter(self.is_legacy_device_record, match_goal)
if match_goal and (set(legacy_approved) == set(match_goal)):
DebugMsg('Start legacy search for cid: ' + cid)
# TODO(hungte) prefetch this list in async batch process.
legacy_list = self._get_legacy_device_list()
matched = list(set(legacy_list).intersection(set(match_goal)))
if matched:
DebugMsg('Changed detected list: %s->%s' % (self._system[cid], matched))
self._system[cid] = matched
return
# update with remaining error.
self._failures[cid] = self.format_failure(exact_values, approved_values)
@Memorize
def verify_probable_component(self, cid, approved_values):
if '*' in approved_values:
return (True, ['*'])
for value in approved_values:
present = getattr(self, 'probe_' + cid)(value)
if present:
return (True, [value])
return (False, [self._no_match])
def check_probable_component(self, cid, approved_values):
(probed, value) = self.verify_probable_component(cid, approved_values)
if probed:
self._system[cid] = value
else:
self._failures[cid] = self.format_failure(value, approved_values)
def pformat(self, obj):
return '\n' + self._pp.pformat(obj) + '\n'
def initialize(self, force=False, async=False):
if self._initialized and not force:
return
# probe current system components
DebugMsg('Starting to detect system components...')
self._enumerable_system = self.get_all_enumerable_components(async)
self._initialized = True
def match_current_system(self, filename, ignored_cids=[]):
""" Matches a given component list to current system.
Returns: (current, failures)
"""
# assert self._initialized, 'Not initialized.'
self._to_be_tested_cids = (self._enumerable_cids +
self._probable_cids)
self._not_test_cids = self._pure_data_cids[:]
self.update_ignored_cids(ignored_cids)
self._failures = {}
self._system = {}
self._system.update(self._enumerable_system)
self._file_base = gft_common.GetComponentsDatabaseBase(filename)
approved = self.read_approved_from_file(filename)
for cid in self._enumerable_cids:
if cid not in self._to_be_tested_cids:
VerboseMsg('gft_hwcomp: match: ignored: %s' % cid)
else:
self.check_enumerable_component(
cid, self._enumerable_system[cid], approved[cid])
for cid in self._probable_cids:
if cid not in self._to_be_tested_cids:
VerboseMsg('gft_hwcomp: match: ignored: %s' % cid)
else:
self.check_probable_component(cid, approved[cid])
return (self._system, self._failures)
#############################################################################
# Console main entry
@gft_common.GFTConsole
def _main(self_path, args):
do_async = True
# preprocess args
compdb_list = []
for arg in args:
if arg == '--sync':
do_async = False
elif arg == '--async':
do_async = True
elif not os.path.exists(arg):
print 'ERROR: unknown parameter: ' + arg
print 'Usage: %s [--sync|--async] [components_db_files...]\n' % self_path
sys.exit(1)
else:
compdb_list.append(arg)
components = HardwareComponents(verbose=True)
print 'Starting to detect%s...' % (' asynchrounously' if do_async else '')
components.initialize(async=do_async)
if not compdb_list:
print 'Enumerable properties:'
print components.pformat(components._enumerable_system)
sys.exit(0)
print 'Starting to match system...'
for arg in compdb_list:
(matched, failures) = components.match_current_system(arg)
print 'Probed (%s):' % arg
print components.pformat(matched)
print 'Failures (%s):' % arg
print components.pformat(failures)
if __name__ == '__main__':
_main(sys.argv[0], sys.argv[1:])