blob: 1934cdfe92fc66d7cb040fddda24ecaef8d20d58 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2021 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Plugin for merging HWID information into ConfigBundle format"""
import copy
import logging as _logging
import re
import yaml
from chromiumos.config.api.component_pb2 import Component
from chromiumos.config.payload.config_bundle_pb2 import ConfigBundle
from common import config_bundle_utils as cbu
from .merge_plugin import MergePlugin
# Create named logger
logging = _logging.getLogger(__name__)
def _include_item(item):
"""Predicate to determine whether to include a HWID item.
Modify this to add additional exclusion conditions.
"""
# Status values that indicate we should exclude the item
bad_status_values = set([])
if not item['values']:
return False
if item.get('status', '').lower() in bad_status_values:
return False
return True
def _set_support_status(component, item):
"""Set the supported_status field of a component from its status."""
status = item.get('status')
if not status:
return component
status_map = {
'deprecated': component.SupportStatus.STATUS_DEPRECATED,
'supported': component.SupportStatus.STATUS_SUPPORTED,
'unqualified': component.SupportStatus.STATUS_UNQUALIFIED,
'unsupported': component.SupportStatus.STATUS_UNSUPPORTED,
}
component.support_status = status_map.get(
status.lower(),
component.SupportStatus.STATUS_UNKNOWN,
)
return component
def _maybe_delete(val, key):
"""Delete key from dict, but degrade to noop if it's not present."""
if key in val:
del val[key]
def _del_if_empty(val, key):
"""Delete a key from a dict if it's empty"""
if key in val and not val[key]:
del val[key]
def _non_null_items(items):
"""Unwrap a HWID item block into a dictionary for non-null values.
a HWID item block looks like:
items:
storage_device:
status: unsupported
values:
class: '0x010101'
device: '0xff00'
sectors: '5000000'
vendor: '0xbeef'
some_hardware:
values:
FAKE_RAM_CHIP:
values:
class: '0x010101'
device: '0xff00'
sectors: '250000000'
vendor: '0xabcd'
We'll iterate over the items and check whether it should be excluded
based on _include_item.
The resulting output is a dict with unwrapped item values:
{
'storage_device': {'class' : '0x010101', ...},
...
}
"""
return {key: item for key, item in items.items() if _include_item(item)}
class MergeHwid(MergePlugin):
"""Merge plugin for HWID files.
After calling merge(), residual() can be called to get an object containing
any remaining values in the HWID data.
"""
def __init__(self, hwid_path=None, hwid_data=None):
"""Create a new HWID merger.
Args:
hwid_path (str): Path to the HWID file on disk to read
hwid_data (dict): HWID data specified directly
"""
if hwid_path and hwid_data:
raise RuntimeError('Only one of hwid_path or hwid_data can be specified')
if hwid_path:
with open(hwid_path) as hwid_file:
self.data = yaml.load(hwid_file, Loader=yaml.FullLoader)
else:
self.data = copy.deepcopy(hwid_data)
# We're not interested in the details of the HWID encoding so remove those
# fields from the residual
_maybe_delete(self.data, 'pattern')
_maybe_delete(self.data, 'encoded_fields')
_maybe_delete(self.data, 'encoding_patterns')
# And remove any other fields that would just be residual noise
_maybe_delete(self.data, 'checksum')
_maybe_delete(self.data, 'image_id')
_maybe_delete(self.data, 'rules')
_maybe_delete(self.data, 'project')
def residual(self):
"""Get any remaining data that hasn't been parsed."""
return self.data
def merge(self, bundle: ConfigBundle):
"""Merge our data into the given ConfigBundle instance."""
components = self.data['components']
for component_type in list(components.keys()):
value = components[component_type]
if not value:
_del_if_empty(components, component_type)
continue
# yapf: disable
callback = {
'audio_codec': MergeHwid._merge_audio,
'battery': MergeHwid._merge_battery,
'bluetooth': MergeHwid._merge_bluetooth,
'cellular': MergeHwid._merge_cellular,
'cpu': MergeHwid._merge_cpu,
'display_panel': MergeHwid._merge_display_panel,
'dram': MergeHwid._merge_dram,
'ec_flash_chip': MergeHwid._merge_ec_flash,
'embedded_controller': MergeHwid._merge_ec,
'flash_chip': MergeHwid._merge_flash,
'storage': MergeHwid._merge_storage,
'stylus': MergeHwid._merge_stylus,
'touchpad': MergeHwid._merge_touchpad,
'tpm': MergeHwid._merge_tpm,
'touchscreen': MergeHwid._merge_touchscreen,
'usb_hosts': MergeHwid._merge_usb_hosts,
'video': MergeHwid._merge_video,
'wireless': MergeHwid._merge_wireless,
}.get(component_type)
# yapf: enable
if callback:
value['items'] = _non_null_items(value['items'])
MergeHwid._iterate_items(bundle, value['items'], callback)
_del_if_empty(value, 'items')
_del_if_empty(components, component_type)
_del_if_empty(self.data, 'components')
@staticmethod
def _iterate_items(bundle, items, callback):
"""Iterate items in an item dict and call callback on them.
Handle boilerplate for callbacks to delete empty/touched
fields as we go to leave an informative residual.
Callbacks must take a ConfigBundle and an item and return
an iterable of fields in the item that were touched/used.
Args:
bundle: ConfigBundle instance to modify
items: dict of label => [item]
callback: callback to process a single item.
"""
# Copy keys list so that we can remove from items as we go.
for label in list(items.keys()):
item = items[label]
touched = callback(bundle, label, item)
values = item.get('values', {})
for name in touched:
_maybe_delete(values, name)
_del_if_empty(item, 'values')
_del_if_empty(items, label)
@staticmethod
def _merge_audio(bundle, label, item):
"""Merge audio_codec items."""
values = item.get('values', {})
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
# save HWID values
component.hwid_type = 'audio_codec'
component.hwid_label = label
# configure component
component.name = values.get('name', label)
component.audio_codec.name = component.name
return ['name']
@staticmethod
def _merge_battery(bundle, label, item):
"""Merge battery items."""
values = item.get('values', {})
touched = set()
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
component.name = component.id.value
# save HWID values
component.hwid_type = 'battery'
component.hwid_label = label
# lookup or create manufacturer
if 'manufacturer' in values:
# We seem to have some bad manufacturer values in the battery component.
# If we can't decode them to a valid unicode string, skip them.
mfgr = values['manufacturer']
if isinstance(mfgr, bytes):
try:
mfgr = mfgr.decode()
except UnicodeDecodeError as exception:
logging.error("Error decoding '%s' to string: %s", mfgr, exception)
return set()
component.manufacturer_id.MergeFrom(
cbu.find_partner(bundle, mfgr, create=True).id)
touched.add('manufacturer')
# set model
if 'model_name' in values:
component.battery.model = values['model_name']
touched.add('model_name')
# set battery technology
if 'technology' in values:
tech = values['technology'].lower().replace('-', '')
tech_map = {
'liion': Component.Battery.LI_ION,
'lipoly': Component.Battery.LI_POLY,
}
if tech in tech_map:
component.battery.technology = tech_map[tech]
touched.add('technology')
return touched
@staticmethod
def _merge_bluetooth(bundle, label, item):
"""Merge bluetooth items."""
values = item.get('values', {})
touched = set()
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
component.name = component.id.value
# save HWID values
component.hwid_type = 'bluetooth'
component.hwid_label = label
# lookup or create manufacturer
if 'manufacturer' in values:
component.manufacturer_id.MergeFrom(
cbu.find_partner(bundle, values['manufacturer'], create=True).id)
touched.add('manufacturer')
fields = [
('vendor_id', 'idVendor'),
('product_id', 'idProduct'),
('bcd_device', 'bcdDevice'),
]
for attr, key in fields:
if key in values:
setattr(component.bluetooth.usb, attr, values[key])
touched.add(key)
return touched
@staticmethod
def _merge_cellular(bundle, label, item):
"""Merge cellular items."""
values = item.get('values', {})
touched = set()
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
component.name = component.id.value
# save HWID values
component.hwid_type = 'cellular'
component.hwid_label = label
# lookup or create manufacturer
if 'manufacturer' in values:
component.manufacturer_id.MergeFrom(
cbu.find_partner(bundle, values['manufacturer'], create=True).id)
touched.add('manufacturer')
fields = [
('vendor_id', 'idVendor'),
('product_id', 'idProduct'),
('bcd_device', 'bcdDevice'),
]
for attr, key in fields:
if key in values:
setattr(component.cellular.usb, attr, values[key])
touched.add(key)
return touched
@staticmethod
def _merge_cpu(bundle, label, item):
"""Merge cpu items."""
values = item.get('values', {})
touched = set()
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
component.name = component.id.value
# save HWID values
component.hwid_type = 'cpu'
component.hwid_label = label
model_re = re.compile( # reversed from HWID cpu model values
'(a[0-9]?-[0-9]+[a-z]?|(m3-|i3-|i5-|i7-)*[0-9y]{4,5}[uy]?|n[0-9]{4})')
component.soc.cores = int(values.get('cores', 0))
touched.add('cores')
if 'model' in values:
component.soc.model = values['model']
touched.add('model')
model_string = values['model'].lower()
if 'intel' in model_string or 'amd' in model_string:
component.soc.family.arch = component.soc.X86_64
elif 'aarch64' in model_string or 'armv8' in model_string:
component.soc.family.arch = component.soc.ARM64
elif 'armv7' in model_string:
component.soc.family.arch = component.soc.ARM
else:
logging.warning('unknown family for cpu model \'%s\'', model_string)
match = model_re.search(model_string)
if match:
component.soc.family.name = match.group(0).upper()
return touched
@staticmethod
def _merge_display_panel(bundle, label, item):
"""Merge display panel items."""
values = item.get('values', {})
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
component.name = component.id.value
# save HWID values
component.hwid_type = 'display_panel'
component.hwid_label = label
if 'vendor' in values:
component.manufacturer_id.MergeFrom(
cbu.find_partner(bundle, values['vendor'], create=True).id)
component.display_panel.product_id = values.get('product_id', '')
component.display_panel.properties.width_px = int(values.get('width', 0))
component.display_panel.properties.height_px = int(values.get('height', 0))
return set(['vendor', 'product_id', 'width', 'height'])
@staticmethod
def _merge_dram(bundle, label, item):
"""Merge dram items."""
values = item.get('values', {})
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
component.name = component.id.value
# save HWID values
component.hwid_type = 'dram'
component.hwid_label = label
component.memory.part_number = values.get('part', '')
component.memory.profile.size_megabytes = int(values.get('size', 0))
if 'timing' in values:
memory_type = values['timing'].split('-')[0]
if memory_type == 'LPDDR4':
component.memory.profile.type = component.memory.LP_DDR4
elif memory_type == 'LPDDR3':
component.memory.profile.type = component.memory.LP_DDR3
elif memory_type == 'DDR4':
component.memory.profile.type = component.memory.DDR4
elif memory_type == 'DDR3':
component.memory.profile.type = component.memory.DDR3
elif memory_type == 'DDR2':
component.memory.profile.type = component.memory.DDR2
elif memory_type == 'DDR':
component.memory.profile.type = component.memory.DDR
return set(['part', 'size', 'timing'])
@staticmethod
def _merge_ec_flash(bundle, label, item):
"""Merge EC flash items."""
values = item.get('values', {})
touched = set()
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
component.name = component.id.value
# save HWID values
component.hwid_type = 'ec_flash_chip'
component.hwid_label = label
component.ec_flash_chip.part_number = values.get('name', '')
touched.add('name')
if 'vendor' in values:
component.manufacturer_id.MergeFrom(
cbu.find_partner(bundle, values['vendor'], create=True).id)
touched.add('vendor')
return touched
@staticmethod
def _merge_ec(bundle, label, item):
"""Merge EC items."""
values = item.get('values', {})
touched = set()
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
component.name = component.id.value
# save HWID values
component.hwid_type = 'embedded_controller'
component.hwid_label = label
component.ec.part_number = values.get('name', '')
touched.add('name')
if 'vendor' in values:
component.manufacturer_id.MergeFrom(
cbu.find_partner(bundle, values['vendor'], create=True).id)
touched.add('vendor')
return touched
@staticmethod
def _merge_flash(bundle, label, item):
"""Merge flash items."""
values = item.get('values', {})
touched = set()
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
component.name = component.id.value
# save HWID values
component.hwid_type = 'flash_chip'
component.hwid_label = label
component.system_flash_chip.part_number = values.get('name', '')
touched.add('name')
if 'vendor' in values:
component.manufacturer_id.MergeFrom(
cbu.find_partner(bundle, values['vendor'], create=True).id)
touched.add('vendor')
return touched
@staticmethod
def _merge_storage(bundle, label, item):
"""Merge storage items."""
values = item.get('values', {})
touched = set()
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
component.name = values.get('model', label)
touched.add('model')
# save HWID values
component.hwid_type = 'storage'
component.hwid_label = label
component.storage.emmc5_fw_ver = values.get('emmc5_fw_ver', '')
component.storage.manfid = values.get('manfid', '')
component.storage.name = values.get('name', '')
component.storage.oemid = values.get('oemid', '')
component.storage.prv = values.get('prv', '')
component.storage.sectors = values.get('sectors', '')
touched.update(
['emmc5_fw_ver', 'manfid', 'name', 'oemid', 'prv', 'sectors'])
pcie_fields = ['class', 'device', 'vendor']
if all([field in values for field in pcie_fields]):
component.storage.type = component.storage.NVME
component.storage.pci.vendor_id = values['vendor']
component.storage.pci.device_id = values['device']
component.storage.pci.class_id = values['class']
touched.update(pcie_fields)
if values.get('type', '').lower() == 'mmc':
component.storage.type = component.storage.EMMC
touched.add('type')
if values.get('vendor', '').lower() == 'ata':
component.storage.type = component.storage.SATA
touched.add('vendor')
return touched
@staticmethod
def _merge_stylus(bundle, label, item):
"""Merge stylus items."""
values = item.get('values', {})
touched = set()
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
component.name = values.get('name', label)
touched.add('name')
# save HWID values
component.hwid_type = 'stylus'
component.hwid_label = label
i2c_keys = ['product', 'vendor']
if all(key in values for key in i2c_keys):
component.stylus.i2c.product = values['product']
component.stylus.i2c.vendor = values['vendor']
touched.update(i2c_keys)
usb_keys = ['product_id', 'vendor_id']
if all(key in values for key in usb_keys):
component.stylus.usb.product_id = values['product_id']
component.stylus.usb.vendor_id = values['vendor_id']
touched.update(usb_keys)
if 'bcd_device' in values:
component.stylus.usb.bcd_device = values['bcd_device']
touched.add('bcd_device')
if 'version' in values:
component.stylus.usb.bcd_device = values['version']
touched.add('version')
return touched
@staticmethod
def _merge_touchpad(bundle, label, item):
"""Merge touchpad items."""
values = item.get('values', {})
touched = set()
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
component.name = values.get('model', label)
touched.add('model')
# save HWID values
component.hwid_type = 'touchpad'
component.hwid_label = label
# Check for USB based touchpad
# We don't receive an explicit type for the touchpad bus type, so
# we assume that if we have a product and vendor id, that it's USB,
# otherwise it's I2C (rare)
if 'product' in values and 'vendor' in values:
component.touchpad.type = component.touchpad.USB
component.touchpad.product_id = component.name
component.touchpad.usb.vendor_id = values['vendor']
component.touchpad.usb.product_id = values['product']
touched.update(['vendor', 'product'])
elif 'fw_version' in values and 'fw_csum' in values:
# i2c based touchpad
component.touchpad.type = component.touchpad.I2C
component.touchpad.product_id = values.get('product_id', '')
component.touchpad.fw_version = values['fw_version']
component.touchpad.fw_checksum = values['fw_csum']
touched.update(['fw_version', 'fw_csum', 'product_id'])
return touched
@staticmethod
def _merge_tpm(bundle, label, item):
"""Merge tpm items."""
values = item.get('values', {})
touched = set()
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
component.name = label
# save HWID values
component.hwid_type = 'tpm'
component.hwid_label = label
component.tpm.manufacturer_info = values.get('manufacturer_info', '')
component.tpm.version = values.get('version', '')
touched.update(['manufacturer_info', 'version'])
return touched
@staticmethod
def _merge_touchscreen(bundle, label, item):
"""Merge touchscreen items."""
values = item.get('values', {})
touched = set()
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
component.name = values.get('name', label)
touched.add('name')
# save HWID values
component.hwid_type = 'touchscreen'
component.hwid_label = label
def oneof(values, keys, default=""):
for key in keys:
if key in values:
return values[key]
return default
component.touchscreen.product_id = label
component.touchscreen.usb.product_id = oneof(
values,
['product', 'product_id'],
)
component.touchscreen.usb.vendor_id = oneof(values, ['vendor', 'vendor_id'])
component.touchscreen.usb.bcd_device = values.get('bcd_device', '')
if all([
component.touchscreen.usb.product_id,
component.touchscreen.usb.vendor_id
]):
component.touchscreen.type = component.touchscreen.USB
touched.update(
['product', 'product_id', 'vendor', 'vendor_id', 'bcd_device'])
return touched
@staticmethod
def _merge_usb_hosts(bundle, label, item):
"""Merge USB host items."""
values = item.get('values', {})
touched = set()
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
component.name = values.get('product', label)
touched.add('product')
# save HWID values
component.hwid_type = 'usb_hosts'
component.hwid_label = label
def get_oneof(obj, keys, default=None):
"""Get one of a set of keys from values, or return a default value."""
for key in keys:
if key in obj:
touched.add(key)
return obj[key]
return default
if 'manufacturer' in values:
component.manufacturer_id.MergeFrom(
cbu.find_partner(bundle, values['manufacturer'], create=True).id)
touched.add('manufacturer')
host = component.usb_host
host.product_id = get_oneof(values, ['idProduct', 'device'], '')
host.vendor_id = get_oneof(values, ['idVendor', 'vendor'], '')
host.bcd_device = get_oneof(values, ['bcdDevice', 'revision_id'], '')
return touched
@staticmethod
def _merge_video(bundle, label, item):
"""Merge video items."""
values = item.get('values', {})
touched = set()
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
component.name = values.get('product', label)
touched.add('product')
# save HWID values
component.hwid_type = 'video'
component.hwid_label = label
usb_fields = ['bcdDevice', 'idProduct', 'idVendor']
pci_fields = ['vendor', 'device', 'revision_id']
if values.get('bus_type') == 'usb' or \
all(key in values for key in usb_fields):
if 'manufacturer' in values:
component.manufacturer_id.MergeFrom(
cbu.find_partner(bundle, values['manufacturer'], create=True).id)
component.camera.usb.vendor_id = values.get('idVendor', '')
component.camera.usb.product_id = values.get('idProduct', '')
component.camera.usb.bcd_device = values.get('bcdDevice', '')
touched.update(usb_fields)
if values.get('bus_type') == 'pci' or \
all(key in values for key in pci_fields):
component.camera.pci.vendor_id = values.get('vendor', '')
component.camera.pci.device_id = values.get('device', '')
component.camera.pci.revision_id = values.get('revision_id', '')
touched.update(pci_fields)
touched.add('bus_type')
return touched
@staticmethod
def _merge_wireless(bundle, label, item):
"""Merge wireless items."""
values = item.get('values', {})
touched = set()
component = cbu.find_component(bundle, id_value=label, create=True)
component = _set_support_status(component, item)
component.name = label
# save HWID values
component.hwid_type = 'wireless'
component.hwid_label = label
component.wifi.pci.vendor_id = values.get('vendor', '')
component.wifi.pci.device_id = values.get('device', '')
component.wifi.pci.revision_id = values.get('revision_id', '')
touched.update(['vendor', 'device', 'revision_id'])
return touched