blob: e746d5ec6247a7757f381052f5b0decabdff61e9 [file] [log] [blame]
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from collections import OrderedDict
import itertools
import logging
import math
import re
import uuid
# Import the `six` module itself for unit-testing purposes, so that the mock
# module can replace the `six.moves.input` function.
# TODO(kerker): Remove after the py3 upgrade is complete.
import six
from six import iteritems
from six import itervalues
from six.moves import xrange
import factory_common # pylint: disable=unused-import
from cros.factory.hwid.v3 import common
from cros.factory.hwid.v3.database import Database
from cros.factory.hwid.v3 import probe
from cros.factory.hwid.v3 import yaml_wrapper as yaml
# Component classes that are always created at the front of the pattern, even
# if they do not exist in the probed results.
ESSENTIAL_COMPS = ['mainboard', 'region', 'dram', 'cpu', 'storage']
# Component classes that are added to the pattern, in this order, if they
# exist in the probed results.  Each value is the minimum bit length given to
# the encoded field of that component class.
PRIORITY_COMPS = OrderedDict((
    ('ro_main_firmware', 5),
    ('firmware_keys', 3),
    ('ro_ec_firmware', 5),
))
def FilterSpecialCharacter(string):
  """Normalizes a string so that it only has letters, digits and underlines.

  Separation characters (colon, space, dot and dash) are converted to
  underlines, every other special character is dropped, repeated underlines
  are merged and one leading/trailing underline is stripped.

  Args:
    string: the string to be normalized.

  Returns:
    the normalized string, or 'unknown' if nothing is left.
  """
  # The substitutions are applied in order; each one relies on the result of
  # the previous one.
  substitutions = (
      (r'[: .-]+', '_'),
      (r'[^A-Za-z0-9_]+', ''),
      (r'[_]+', '_'),
      (r'^_|_$', ''),
  )
  for pattern, replacement in substitutions:
    string = re.sub(pattern, replacement, string)
  return string or 'unknown'
def DetermineComponentName(comp_cls, value, name_list=None):
  """Determines a component name which does not collide with existing names.

  Args:
    comp_cls: the component class name.
    value: the probed value of the component item.
    name_list: the names which are already in use, or None if there is none.

  Returns:
    the component name.
  """
  existing_names = [] if name_list is None else name_list
  return HandleCollisionName(
      _DetermineComponentName(comp_cls, value), existing_names)
def HandleCollisionName(comp_name, name_list):
  """Resolves a name collision by appending "_n" to the name.

  Args:
    comp_name: the preferred name.
    name_list: a container of the names which are already in use, or None if
        there is none.

  Returns:
    `comp_name` itself if it is not in use; otherwise `comp_name` followed by
    "_n" where n is the smallest positive integer which makes the name unused.
  """
  used_names = [] if name_list is None else name_list
  if comp_name not in used_names:
    return comp_name
  for suffix_num in itertools.count(1):
    candidate = '%s_%d' % (comp_name, suffix_num)
    if candidate not in used_names:
      return candidate
def _DetermineComponentName(comp_cls, value):
  """Determines the component name by the probed value.

  For some specific components, a meaningful name can be derived from the
  component value, for example when the value contains the vendor name or the
  part number.  When no known key is found in the value, the name is built
  from the component class and a part of a random UUID.

  Note that the function doesn't guarantee the name is deterministic or
  unique.

  Args:
    comp_cls: the component class name.
    value: the probed value of the component item.

  Returns:
    the component name.
  """
  # Known specific components.
  if comp_cls == 'firmware_keys':
    is_dev_key = 'devkeys' in value['key_root']
    return 'firmware_keys_dev' if is_dev_key else 'firmware_keys_non_dev'

  # General components: a value with only one field is named after the field.
  if len(value) == 1:
    (only_field,) = value.values()
    return FilterSpecialCharacter(str(only_field))

  # A value which has all of 'part', 'size' and 'slot' is named after them.
  try:
    part, size, slot = value['part'], value['size'], value['slot']
  except KeyError:
    pass
  else:
    return '%s_%smb_%s' % (part, size, slot)

  # Otherwise use the first known key found in the value.
  known_keys = ('id', 'version', 'model', 'manufacturer', 'part', 'name',
                'compact_str')
  matched_keys = [key for key in known_keys if key in value]
  if matched_keys:
    return FilterSpecialCharacter(str(value[matched_keys[0]]))

  # Nothing meaningful is found; fall back to a random name.
  return comp_cls + '_' + str(uuid.uuid4())[:8]
def PromptAndAsk(question_str, default_answer=True):
  """Prompts the question and asks the user to decide yes or no.

  If the first character the user enters is neither 'y' nor 'n' (case
  insensitive), the default answer is used.

  Args:
    question_str: the question prompted to ask the user.
    default_answer: the default answer of the question.

  Returns:
    True if the answer is yes; False if the answer is no.
  """
  hint_str = ' [Y/n] ' if default_answer else ' [y/N] '
  # Call the function through `six.moves` so that unit tests can mock it.
  input_str = six.moves.input(question_str + hint_str)

  first_char = input_str[:1].lower()
  ret = (first_char == 'y') if first_char in ('y', 'n') else default_answer

  logging.info('You chose: %s', 'Yes' if ret else 'No')
  return ret
def ChecksumUpdater():
  """Finds the checksum updater in the chromium source tree.

  Returns:
    the `update_checksum` module if it can be imported; otherwise None.
  """
  try:
    from cros.chromeoshwid import update_checksum
  except ImportError:
    logging.error('checksum_update is not found.')
    return None
  return update_checksum
class DatabaseBuilder(object):
  """A helper class for updating a HWID Database object.

  Properties:
    database: The Database object this class manipulates on.
    _from_empty_database: True if this builder is for creating a new database.
  """

  _DEFAULT_COMPONENT_SUFFIX = '_default'

  def __init__(self, database_path=None, project=None, image_name=None):
    """Constructor.

    If the `database_path` is given, this class will load the existing
    database; otherwise this class will create a new empty database whose
    project and the image name of image id 0 meet the given arguments
    `project` and `image_name`.

    Args:
      database_path: A path to the database to be loaded.
      project: A string of the project name.
      image_name: A string of the default image name.

    Raises:
      ValueError: if the loaded database cannot be used for encoding, or if
          no `database_path` is given and `project` or `image_name` is None.
    """
    if database_path:
      self.database = Database.LoadFile(database_path, verify_checksum=False)
      self._from_empty_database = False
      if not self.database.can_encode:
        raise ValueError('The given HWID database %r is legacy and not '
                         'supported by DatabaseBuilder.' % database_path)
    else:
      if project is None or image_name is None:
        raise ValueError('No project name.')
      self.database = self._BuildEmptyDatabase(project.upper(), image_name)
      self._from_empty_database = True

  def AddDefaultComponent(self, comp_cls):
    """Adds a default component item and corresponding rule to the database.

    The default item is named by the component class followed by
    `_DEFAULT_COMPONENT_SUFFIX`, has no probed value and is unqualified.

    Args:
      comp_cls: The component class.

    Raises:
      ValueError: if the component class already has a default component.
    """
    logging.info('Component [%s]: add a default item.', comp_cls)

    if self.database.GetDefaultComponent(comp_cls) is not None:
      raise ValueError(
          'The component class %r already has a default component.' % comp_cls)

    comp_name = comp_cls + self._DEFAULT_COMPONENT_SUFFIX
    self.database.AddComponent(
        comp_cls, comp_name, None, common.COMPONENT_STATUS.unqualified)

  def AddNullComponent(self, comp_cls):
    """Updates the database to be able to encode a device without specific
    component class.

    Args:
      comp_cls: A string of the component class name.

    Raises:
      ValueError: if the encoded field of the component class also encodes
          other component classes.
    """
    field_name = self.database.GetEncodedFieldForComponent(comp_cls)
    if not field_name:
      # No encoded field yet; create one whose only combination is "none".
      self._AddNewEncodedField(comp_cls, [])
      return

    if len(self.database.GetComponentClasses(field_name)) > 1:
      raise ValueError(
          'The encoded field %r for component %r encodes more than one '
          'component class so it\'s not trivial to mark a null %r component.  '
          'Please update the database by a real probed results.' %
          (field_name, comp_cls, comp_cls))

    # Only add the null combination if none of the existing combinations is
    # already empty for this component class.
    if all(comps[comp_cls]
           for comps in itervalues(self.database.GetEncodedField(field_name))):
      self.database.AddEncodedFieldComponents(field_name, {comp_cls: []})

  def UpdateByProbedResults(self, probed_results, device_info, vpd,
                            image_name=None):
    """Updates the database by a real probed results.

    Args:
      probed_results: The probed result obtained by probing the device.
      device_info: An empty dict or a dict contains the device information.
      vpd: An empty dict or a dict contains the vpd values.
      image_name: The image name handed to `_MayAddNewPatternAndImage` when
          an existing database is being updated.  It is ignored (with a
          warning) when this builder is creating a new database.
    """
    if self._from_empty_database and image_name:
      logging.warning('The argument `image_name` will be ignored when '
                      'DatabaseBuilder is creating the new database instead of '
                      'updating an existed database.')

    bom = self._UpdateComponents(probed_results, device_info, vpd)
    self._UpdateEncodedFields(bom)
    if not self._from_empty_database:
      self._MayAddNewPatternAndImage(image_name)
    self._UpdatePattern()

  def Render(self, database_path):
    """Renders the database to a yaml file.

    After the file is dumped, its checksum is updated if the checksum updater
    module is available.

    Args:
      database_path: the path of the output HWID database file.
    """
    self.database.DumpFile(database_path)

    checksum_updater = ChecksumUpdater()
    if checksum_updater is None:
      logging.info('Checksum is not updated.')
    else:
      logging.info('Update the checksum.')
      checksum_updater.UpdateFile(database_path)

  @classmethod
  def _BuildEmptyDatabase(cls, project, image_name):
    """Builds a database which only has image id 0 and an empty pattern.

    Args:
      project: A string of the project name.
      image_name: A string of the name of image id 0.

    Returns:
      The object returned by `Database.LoadData` for the generated yaml text.
    """
    # NOTE: `encoding_scheme` and `fields` must be indented to line up with
    # `image_ids` so that all three keys belong to the same list item of
    # `pattern`; otherwise the text is not valid yaml.
    return Database.LoadData(
        'checksum: None\n' +
        'project: %s\n' % project +
        'encoding_patterns:\n' +
        ' 0: default\n' +
        'image_id:\n' +
        ' 0: %s\n' % image_name +
        'pattern:\n' +
        ' - image_ids: [0]\n' +
        '   encoding_scheme: %s\n' % common.ENCODING_SCHEME.base8192 +
        '   fields: []\n' +
        'encoded_fields:\n' +
        ' region_field: !region_field []\n' +
        'components:\n' +
        ' region: !region_component\n' +
        'rules: []\n')

  def _AddComponent(self, comp_cls, probed_value):
    """Tries to add a item into the component.

    Args:
      comp_cls: The component class.
      probed_value: The probed value of the component.
    """
    # Set old firmware components to deprecated.
    if comp_cls in ['ro_main_firmware', 'ro_ec_firmware', 'ro_pd_firmware']:
      for comp_name in self.database.GetComponents(comp_cls):
        self.database.SetComponentStatus(
            comp_cls, comp_name, common.COMPONENT_STATUS.deprecated)

    comp_name = DetermineComponentName(
        comp_cls, probed_value, list(self.database.GetComponents(comp_cls)))

    logging.info('Component %s: add an item "%s".', comp_cls, comp_name)
    self.database.AddComponent(
        comp_cls, comp_name, probed_value, common.COMPONENT_STATUS.unqualified)

    # Mark the default component as unsupported since a real one is added.
    default_comp_name = self.database.GetDefaultComponent(comp_cls)
    if default_comp_name is not None:
      self.database.SetComponentStatus(
          comp_cls, default_comp_name, common.COMPONENT_STATUS.unsupported)

  def _AddComponents(self, comp_cls, probed_values):
    """Adds a list of components to the database.

    Args:
      comp_cls: A string of the component class name.
      probed_values: A list of probed value from the device.
    """
    def _ContainsAll(probed_value, other):
      """Returns True if every item of `other` is also in `probed_value`."""
      return all(probed_value.get(key) == value
                 for key, value in iteritems(other))

    # Only add the unique component to the database.
    # For example, if the given probed values are
    #   {"a": "A", "b": "B"},
    #   {"a": "A", "b": "B", "c": "C"},
    #   {"a": "A", "x": "X", "y": "Y"}
    # then we only add the first and the third components because the second
    # one is considered as the same as the first one.
    # For identical probed values, only the last one is added.
    for i, probed_value_i in enumerate(probed_values):
      if (any(_ContainsAll(probed_value_i, probed_values[j]) and
              probed_value_i != probed_values[j] for j in xrange(i)) or
          any(_ContainsAll(probed_value_i, probed_values[j])
              for j in xrange(i + 1, len(probed_values)))):
        continue
      self._AddComponent(comp_cls, probed_value_i)

  def _AddNewEncodedField(self, comp_cls, comp_names):
    """Adds a new encoded field for the specific component class.

    The field is named by the component class followed by "_field", with a
    numeric suffix if that name is already taken.

    Args:
      comp_cls: The component class.
      comp_names: A list of component name.
    """
    field_name = HandleCollisionName(
        comp_cls + '_field', self.database.encoded_fields)
    self.database.AddNewEncodedField(field_name, {comp_cls: comp_names})

  def _UpdateComponents(self, probed_results, device_info, vpd):
    """Updates the component part of the database.

    This function update the database by trying to generate the BOM object
    and add mis-matched components on the probed results to the database.

    Args:
      probed_results: The probed results generated by probing the device.
      device_info: The device info object.
      vpd: A dict stores the vpd values.

    Returns:
      The BOM object generated from the probed results by the updated
      database.
    """
    # Add extra components.
    existed_comp_classes = self.database.GetComponentClasses()
    for comp_cls, probed_comps in iteritems(probed_results):
      if comp_cls not in existed_comp_classes:
        # We only need the probe values here.
        probed_values = [probed_comp['values'] for probed_comp in probed_comps]
        if not probed_values:
          continue

        self._AddComponents(comp_cls, probed_values)

        # A new database has no old device to care about, so there is no
        # need to ask whether a null component is required.
        if self._from_empty_database:
          continue

        add_null = PromptAndAsk(
            'Found probed values of [%s] component\n' % comp_cls +
            ''.join(['\n' + yaml.dump(probed_value, default_flow_style=False)
                     for probed_value in probed_values]).replace('\n', '\n ') +
            '\n' +
            'to be added to the database, please confirm that:\n' +
            'If the device has a SKU without %s component, ' % comp_cls +
            'please enter "Y".\n' +
            'If the device always has %s component, ' % comp_cls +
            'please enter "N".\n',
            default_answer=True)

        if add_null:
          self.AddNullComponent(comp_cls)

    # Add mismatched components to the database.
    bom, mismatched_probed_results = probe.GenerateBOMFromProbedResults(
        self.database, probed_results, device_info, vpd,
        common.OPERATION_MODE.normal, True)

    if mismatched_probed_results:
      for comp_cls, probed_comps in iteritems(mismatched_probed_results):
        self._AddComponents(
            comp_cls, [probed_comp['values'] for probed_comp in probed_comps])

      # Generate the BOM again since new components were just added.
      bom = probe.GenerateBOMFromProbedResults(
          self.database, probed_results, device_info, vpd,
          common.OPERATION_MODE.normal, False)[0]

    # Ensure all essential components are recorded in the database.
    for comp_cls in ESSENTIAL_COMPS:
      if comp_cls == 'region':
        # Skip checking the region because it's acceptable to have a null
        # region component.
        continue

      if not bom.components.get(comp_cls):
        field_name = self.database.GetEncodedFieldForComponent(comp_cls)
        if (field_name and
            any(not comps[comp_cls] for comps
                in itervalues(self.database.GetEncodedField(field_name)))):
          # Pass if the database says that device without this component is
          # acceptable.
          continue

        # Ask user to add a default item or a null item.
        add_default = PromptAndAsk(
            'Component [%s] is essential but the probe result is missing. '
            'Do you want to add a default item?\n'
            'If the probed code is not ready yet, please enter "Y".\n'
            'If the device does not have the component, please enter "N".'
            % comp_cls, default_answer=True)

        if add_default:
          self.AddDefaultComponent(comp_cls)
        else:
          # If there's already an encoded field for this component, leave
          # the work to `_UpdateEncodedFields` method and do nothing here.
          if not field_name:
            self.AddNullComponent(comp_cls)

    return probe.GenerateBOMFromProbedResults(
        self.database, probed_results, device_info, vpd,
        common.OPERATION_MODE.normal, False)[0]

  def _UpdateEncodedFields(self, bom):
    """Updates the encoded fields so that they can encode the given BOM.

    Args:
      bom: The BOM object; `bom.components` is indexed by the component class
          to get the component names.
    """
    covered_comp_classes = set()
    for field_name in self.database.encoded_fields:
      comp_classes = self.database.GetComponentClasses(field_name)

      for comps in itervalues(self.database.GetEncodedField(field_name)):
        if all(comp_names == bom.components[comp_cls]
               for comp_cls, comp_names in iteritems(comps)):
          break
      else:
        # None of the existing combinations matches the BOM; add a new one.
        self.database.AddEncodedFieldComponents(
            field_name,
            {comp_cls: bom.components[comp_cls] for comp_cls in comp_classes})

      covered_comp_classes |= set(comp_classes)

    # Although the database allows a component recorded but not encoded by
    # any of the encoded fields, this builder always ensures that all components
    # will be encoded into the HWID string.
    for comp_cls in self.database.GetComponentClasses():
      if comp_cls not in covered_comp_classes:
        self._AddNewEncodedField(comp_cls, bom.components[comp_cls])

  def _MayAddNewPatternAndImage(self, image_name):
    """Adds a new image (and maybe a new pattern) to the database if needed.

    Args:
      image_name: The name of the new image, the name of the latest image, or
          None.  No image is added for the latter two cases.

    Raises:
      ValueError: if `image_name` is the name of an image which is not the
          latest one, or if there are encoded fields missing from the current
          pattern, no new image is requested and the user doesn't confirm
          adding them into the current pattern.
    """
    if image_name in [self.database.GetImageName(image_id)
                      for image_id in self.database.image_ids]:
      if image_name != self.database.GetImageName(
          self.database.max_image_id):
        # Report the offending name; the loop variable of the list
        # comprehension above is not accessible here in Python 3.
        raise ValueError(
            'image_id [%s] is already in the database.' % image_name)
      # Mark the image name to none if the given image name is the latest image
      # name so that the caller can specify that they don't want to create
      # an extra image by specifying the image_name to either none or the latest
      # image name.
      image_name = None

    # If the use case is to create a new HWID database, the only pattern
    # contained by the empty database is empty.
    if not self.database.GetEncodedFieldsBitLength():
      return

    extra_fields = set(self.database.encoded_fields) - set(
        self.database.GetEncodedFieldsBitLength().keys())
    if image_name:
      self.database.AddImage(self.database.max_image_id + 1, image_name,
                             common.ENCODING_SCHEME.base8192,
                             new_pattern=bool(extra_fields))

    # The field names are sorted to make the prompt deterministic.
    elif extra_fields and PromptAndAsk(
        'WARNING: Extra fields [%s] without assigning a new image_id.\n'
        'If the fields are added into the current pattern, the index of '
        'these fields will be encoded to index 0 for all old HWID string. '
        'Enter "y" if you are sure all old devices with old HWID string '
        'have the component with index 0.' %
        ','.join(sorted(extra_fields)), default_answer=False) is False:
      raise ValueError(
          'Please assign a image_id by adding "--image-id" argument.')

  def _UpdatePattern(self):
    """Updates the pattern so that it includes all encoded fields."""
    def _GetMinBitLength(field_name):
      """Returns the number of bits needed for the max index of the field."""
      return int(math.ceil(math.log(
          max(self.database.GetEncodedField(field_name).keys()) + 1, 2)))

    handled_comp_classes = set()
    handled_encoded_fields = set()

    # Put the important components at first if the pattern is a new one.
    if not self.database.GetEncodedFieldsBitLength():
      # Put the essential field first, and align the 5-3-5 bit field.
      bit_iter = itertools.cycle([5, 3, 5])
      next(bit_iter)  # Skip the first field, which is for image_id.
      for comp_cls in ESSENTIAL_COMPS:
        if comp_cls in handled_comp_classes:
          continue

        field_name = self.database.GetEncodedFieldForComponent(comp_cls)

        # Keep taking bit groups until the field is wide enough (at least
        # one bit) so that the field ends at a group boundary.
        bit_length = 0
        min_bit_length = max(_GetMinBitLength(field_name), 1)
        while bit_length < min_bit_length:
          bit_length += next(bit_iter)
        self.database.AppendEncodedFieldBit(field_name, bit_length)

        handled_comp_classes |= set(
            self.database.GetComponentClasses(field_name))
        handled_encoded_fields.add(field_name)

      # Put the priority components.
      for comp_cls, bit_length in iteritems(PRIORITY_COMPS):
        if comp_cls in handled_comp_classes:
          continue

        field_name = self.database.GetEncodedFieldForComponent(comp_cls)
        if not field_name:
          continue

        bit_length = max(bit_length, _GetMinBitLength(field_name))
        self.database.AppendEncodedFieldBit(field_name, bit_length)

        handled_comp_classes |= set(
            self.database.GetComponentClasses(field_name))
        handled_encoded_fields.add(field_name)

    # Append other encoded fields.
    curr_bit_lengths = self.database.GetEncodedFieldsBitLength()
    for field_name in self.database.encoded_fields:
      if field_name in handled_encoded_fields:
        continue

      bit_length = _GetMinBitLength(field_name)
      if (field_name in curr_bit_lengths and
          curr_bit_lengths[field_name] >= bit_length):
        # The pattern already has enough bits for this field.
        continue

      # Only append the bits which the pattern is still short of.
      self.database.AppendEncodedFieldBit(
          field_name, bit_length - curr_bit_lengths.get(field_name, 0))