blob: 344dfe91fb76e15ee78b1b9b7efa357d905a8583 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Database classes for HWID v3 operation."""
import collections
import copy
import hashlib
import math
import os
import pprint
import re
import yaml
import factory_common # pylint: disable=W0611
from cros.factory import rule, schema
from cros.factory.common import MakeList, MakeSet
from cros.factory.hwid import common
# Import yaml_tags to decode special YAML tags specific to HWID module.
from cros.factory.hwid import yaml_tags # pylint: disable=W0611
from cros.factory.hwid.base32 import Base32
from cros.factory.hwid.base8192 import Base8192
from cros.factory.utils import file_utils
class Database(object):
"""A class for reading in, parsing, and obtaining information of the given
device-specific component database.
Attributes:
board: A string indicating the board name.
encoding_patterns: An EncodingPatterns object.
image_id: An ImageId object.
pattern: A Pattern object.
encoded_fields: An EncodedFields object.
components: A Components object.
rules: A Rules object.
checksum: The value of the checksum field.
"""
_HWID_FORMAT = {
common.HWID.ENCODING_SCHEME.base32: re.compile(
r'^([A-Z0-9]+)' # group(0): Board
r' (' # group(1): Entire BOM.
r'(?:[A-Z2-7]{4}-)*' # Zero or more 4-character groups with
# dash.
r'[A-Z2-7]{1,4}' # Last group with 1 to 4 characters.
r')$' # End group(1)
),
common.HWID.ENCODING_SCHEME.base8192: re.compile(
r'^([A-Z0-9]+)' # group(0): Board
r' (' # group(1): Entire BOM
r'(?:[A-Z2-7][2-9][A-Z2-7]-)*' # Zero or more 3-character groups with
# dash.
r'[A-Z2-7][2-9][A-Z2-7]' # Last group with 3 characters.
r')$' # End group(1)
)}
def __init__(self, board, encoding_patterns, image_id, pattern,
encoded_fields, components, rules, checksum):
self.board = board
self.encoding_patterns = encoding_patterns
self.image_id = image_id
self.pattern = pattern
self.encoded_fields = encoded_fields
self.components = components
self.rules = rules
self.checksum = checksum
self._SanityChecks()
def _SanityChecks(self):
def _VerifyComponent(comp_cls, comp_name, label):
try:
self.components.CheckComponent(comp_cls, comp_name)
except common.HWIDException as e:
raise common.HWIDException(
'%s in %s[%r]' %
(str(e), label, comp_cls))
# Check that all the component class-name pairs in encoded_fields are valid.
for field, indexed_data in self.encoded_fields.iteritems():
for index, class_name_dict in indexed_data.iteritems():
for comp_cls, comp_names in class_name_dict.iteritems():
if comp_names is None:
_VerifyComponent(comp_cls, None,
'encoded_fields[%r][%r]' % (field, index))
continue
for comp_name in comp_names:
_VerifyComponent(comp_cls, comp_name,
'encoded_fields[%r][%r]' % (field, index))
# Check that every image ID has a corresponding pattern defined.
for image_id in self.image_id:
# Simply get the pattern for each image ID. This call will raise exception
# if it fails to find a pattern for the given image ID.
self.pattern.GetPatternByImageId(image_id=image_id)
# Check that the bit length of each encoded field in the pattern is enough
# to hold all items of the encoded field. We only check the pattern used by
# the latest image id here.
field_bit_length = self.pattern.GetFieldsBitLength()
pattern = self.pattern.GetPatternByImageId()
encoded_fields_in_pattern = set([f.keys()[0] for f in pattern['fields']])
for field in encoded_fields_in_pattern:
if field not in self.encoded_fields:
raise common.HWIDException(
'Pattern contains unknown encoded field %r' % field)
max_index = max(self.encoded_fields[field].iterkeys())
bit_length = field_bit_length[field]
if math.pow(2, bit_length) <= max_index:
raise common.HWIDException(
'Pattern does not have enough bits to hold all items for encoded '
'field %r. The maximum index of %r is %d but its bit length is '
'%d in the pattern' % (field, field, max_index, bit_length))
@staticmethod
def Load(verify_checksum=False):
"""Trys to locate the HWID database at pre-defined locations and load it.
Returns:
The loaded HWID database.
Raises:
HWIDException if no database is found.
"""
return Database.LoadFile(os.path.join(common.DEFAULT_HWID_DATA_PATH,
common.ProbeBoard().upper()),
verify_checksum=verify_checksum)
@staticmethod
def LoadFile(file_name, verify_checksum=False):
"""Loads a device-specific component database from the given file and
parses it to a Database object.
Args:
file_name: A path to a device-specific component database.
verify_checksum: Whether to verify the checksum of the database.
Returns:
A Database object containing all the settings in the database file.
Raises:
HWIDException if there is missing field in the database.
"""
db_yaml = None
with open(file_name, 'r') as f:
db_yaml = yaml.load(f)
return Database.LoadData(db_yaml,
expected_checksum=Database.Checksum(file_name)
if verify_checksum else None,
strict=verify_checksum)
@staticmethod
def Checksum(file_name):
"""Computes a SHA1 digest as the checksum of the given database file.
Args:
file_name: A path to a device-specific component database.
Returns:
The computed checksum as a string.
"""
return Database.ChecksumForText(file_utils.ReadFile(file_name))
@staticmethod
def ChecksumForText(db_text):
"""Computes a SHA1 digest as the checksum of the given database string.
Args:
db_text: The database as a string.
Returns:
The computed checksum as a string.
"""
# Ignore the 'checksum: <hash value>\n' line when calculating checksum.
db_text = re.sub(r'^checksum:.*$\n?', '', db_text, flags=re.MULTILINE)
return hashlib.sha1(db_text).hexdigest()
@staticmethod
def LoadData(db_yaml, expected_checksum=None, strict=True):
"""Loads a device-specific component database from the given database data.
Args:
db_yaml: The database in parsed dict form.
expected_checksum: The checksum value to verify the loaded data with.
A value of None disables checksum verification.
strict: Whether to insist on fully-formed databases. This should always be
true in production use, but may be set to False to accept slightly
older formats, e.g., missing checksum field.
Returns:
A Database object containing all the settings in the database file.
Raises:
HWIDException if there is missing field in the database, or database
integrity veification fails.
"""
if not db_yaml:
raise common.HWIDException('Invalid HWID database')
for key in ['board', 'encoding_patterns', 'image_id', 'pattern',
'encoded_fields', 'components', 'rules', 'checksum']:
if not db_yaml.get(key):
if (not strict) and key == 'checksum':
# That's OK, let it go
pass
else:
raise common.HWIDException(
'%r is not specified in component database' % key)
# Verify database integrity.
if (expected_checksum is not None and
db_yaml['checksum'] != expected_checksum):
raise common.HWIDException(
'HWID database %r checksum verification failed' % db_yaml['board'])
return Database(db_yaml['board'],
EncodingPatterns(db_yaml['encoding_patterns']),
ImageId(db_yaml['image_id']),
Pattern(db_yaml['pattern']),
EncodedFields(db_yaml['encoded_fields']),
Components(db_yaml['components']),
Rules(db_yaml['rules']),
db_yaml.get('checksum'))
def ProbeResultToBOM(self, probe_result, loose_matching=False):
"""Parses the given probe result into a BOM object. Each component is
represented by its corresponding encoded index in the database.
Args:
probe_result: A YAML string of the probe result, which is usually the
output of the probe command.
loose_matching: If set to True, partial match of probed results will be
accepted. For example, if the probed results only contain the
firmware version of RO main firmware but not its hash, and we want to
know if the firmware version is supported, then we can enable
loose_matching to see if the firmware version is supported in the
database.
Returns:
A BOM object.
"""
probed_bom = yaml.load(probe_result)
# encoding_pattern_index and image_id are unprobeable and should be set
# explictly. Defaults them to 0.
encoding_pattern_index = 0
image_id = 0
def LookupProbedValue(comp_cls):
for field in ['found_probe_value_map', 'found_volatile_values',
'initial_configs']:
if comp_cls in probed_bom[field]:
# We actually want to return a list of dict here.
return MakeList(probed_bom[field][comp_cls] if
isinstance(probed_bom[field][comp_cls], list) else
[probed_bom[field][comp_cls]])
# comp_cls is in probed_bom['missing_component_classes'].
return None
# Construct a dict of component classes to list of ProbedComponentResult.
probed_components = collections.defaultdict(list)
for comp_cls in self.components.GetRequiredComponents():
probed_comp_values = LookupProbedValue(comp_cls)
if probed_comp_values is not None:
for probed_value in probed_comp_values:
if comp_cls not in self.components.probeable:
probed_components[comp_cls].append(
common.ProbedComponentResult(
None, probed_value,
common.UNPROBEABLE_COMPONENT_ERROR(comp_cls)))
continue
matched_comps = self.components.MatchComponentsFromValues(
comp_cls, probed_value, loose_matching)
if matched_comps is None:
probed_components[comp_cls].append(common.ProbedComponentResult(
None, probed_value,
common.INVALID_COMPONENT_ERROR(comp_cls, probed_value)))
elif len(matched_comps) == 1:
comp_name, comp_data = matched_comps.items()[0]
comp_status = self.components.GetComponentStatus(
comp_cls, comp_name)
if comp_status == common.HWID.COMPONENT_STATUS.supported:
probed_components[comp_cls].append(
common.ProbedComponentResult(
comp_name, comp_data['values'], None))
else:
probed_components[comp_cls].append(
common.ProbedComponentResult(
comp_name, comp_data['values'],
common.UNSUPPORTED_COMPONENT_ERROR(comp_cls, comp_name,
comp_status)))
elif len(matched_comps) > 1:
probed_components[comp_cls].append(common.ProbedComponentResult(
None, probed_value,
common.AMBIGUOUS_COMPONENT_ERROR(
comp_cls, probed_value, matched_comps)))
else:
# No component of comp_cls is found in probe results.
probed_components[comp_cls].append(common.ProbedComponentResult(
None, probed_comp_values, common.MISSING_COMPONENT_ERROR(comp_cls)))
# Encode the components to a dict of encoded fields to encoded indices.
encoded_fields = {}
for field in self.encoded_fields:
encoded_fields[field] = self._GetFieldIndexFromProbedComponents(
field, probed_components)
return common.BOM(self.board, encoding_pattern_index, image_id,
probed_components, encoded_fields)
def UpdateComponentsOfBOM(self, bom, updated_components):
"""Updates the components data of the given BOM.
The components field of the given BOM is updated with the given component
class and component name, and the encoded_fields field is re-calculated.
Args:
bom: A BOM object to update.
updated_components: A dict of component classes to component names
indicating the set of components to update.
Returns:
A BOM object with updated components and encoded fields.
"""
result = bom.Duplicate()
for comp_cls, comp_name in updated_components.iteritems():
new_probed_result = []
if comp_name is None:
new_probed_result.append(common.ProbedComponentResult(
None, None, common.MISSING_COMPONENT_ERROR(comp_cls)))
else:
comp_name = MakeList(comp_name)
for name in comp_name:
comp_attrs = self.components.GetComponentAttributes(comp_cls, name)
new_probed_result.append(common.ProbedComponentResult(
name, comp_attrs['values'], None))
# Update components data of the duplicated BOM.
result.components[comp_cls] = new_probed_result
# Re-calculate all the encoded index of each encoded field.
result.encoded_fields = {}
for field in self.encoded_fields:
result.encoded_fields[field] = self._GetFieldIndexFromProbedComponents(
field, result.components)
return result
def _GetFieldIndexFromProbedComponents(self, encoded_field,
probed_components):
"""Gets the encoded index of the specified encoded field by matching
the given probed components against the definitions in the database.
Args:
encoded_field: A string indicating the encoding field of interest.
probed_components: A dict that maps a set of component classes to their
list of ProbedComponentResult.
Returns:
An int indicating the encoded index, or None if no matching encoded
index is found.
"""
if encoded_field not in self.encoded_fields:
return None
for index, db_comp_cls_names in (
self.encoded_fields[encoded_field].iteritems()):
# Iterate through all indices in the encoded_fields of the database.
found = True
for db_comp_cls, db_comp_names in db_comp_cls_names.iteritems():
# Check if every component class and value the index consists of
# matches.
if db_comp_names is None:
# Special handling for NULL component.
if (probed_components[db_comp_cls] and
probed_components[db_comp_cls][0].probed_values is not None):
found = False
break
else:
# Create a set of component names of db_comp_cls from the
# probed_components argument.
bom_component_names_of_the_class = MakeSet([
x.component_name for x in probed_components[db_comp_cls]])
# Create a set of component names of db_comp_cls from the database.
db_component_names_of_the_class = MakeSet(db_comp_names)
if (bom_component_names_of_the_class !=
db_component_names_of_the_class):
found = False
break
if found:
return index
return None
def _GetAllIndices(self, encoded_field):
"""Gets a list of all the encoded indices of the given encoded_field in the
database.
Args:
encoded_field: The encoded field of interest.
Returns:
A list of ints of the encoded indices.
"""
return [key for key in self.encoded_fields[encoded_field]
if isinstance(key, int)]
def _GetAttributesByIndex(self, encoded_field, index):
"""Gets the attributes of all the component(s) of a encoded field through
the given encoded index.
Args:
encoded_field: The encoded field of interest.
index: The index of the component.
Returns:
A dict indexed by component classes that includes a list of all the
attributes of the components represented by the encoded index, or None if
the index if not found.
"""
if encoded_field not in self.encoded_fields:
return None
if index not in self.encoded_fields[encoded_field]:
return None
result = collections.defaultdict(list)
for comp_cls, comp_names in (
self.encoded_fields[encoded_field][index].iteritems()):
if comp_names is None:
result[comp_cls] = None
else:
for name in comp_names:
# Add an additional index 'name' to record component name
new_attr = self.components.GetComponentAttributes(comp_cls, name)
new_attr['name'] = name
result[comp_cls].append(new_attr)
return result
def VerifyBinaryString(self, binary_string):
"""Verifies the binary string.
Args:
binary_string: The binary string to verify.
Raises:
HWIDException if verification fails.
"""
if set(binary_string) - set('01'):
raise common.HWIDException('Invalid binary string: %r' % binary_string)
if '1' not in binary_string:
raise common.HWIDException('Binary string %r does not have stop bit set',
binary_string)
# Truncate trailing 0s.
string_without_paddings = binary_string[:binary_string.rfind('1') + 1]
image_id = self.pattern.GetImageIdFromBinaryString(binary_string)
if len(string_without_paddings) > self.pattern.GetTotalBitLength(image_id):
raise common.HWIDException('Invalid bit string length of %r. Expected '
'length <= %d, got length %d' %
(binary_string,
self.pattern.GetTotalBitLength(image_id),
len(string_without_paddings)))
def VerifyEncodedStringFormat(self, encoded_string):
"""Verifies that the format of the given encoded string.
Checks that the string matches either base32 or base8192 format.
Args:
encoded_string: The encoded string to verify.
Raises:
HWIDException if verification fails.
"""
if not any(hwid_format.match(encoded_string) for hwid_format in
self._HWID_FORMAT.itervalues()):
raise common.HWIDException(
'HWID string %r is neither base32 nor base8192 encoded' %
encoded_string)
def VerifyEncodedString(self, encoded_string):
"""Verifies the given encoded string.
Args:
encoded_string: The encoded string to verify.
Raises:
HWIDException if verification fails.
"""
try:
image_id = self.pattern.GetImageIdFromEncodedString(encoded_string)
encoding_scheme = self.pattern.GetPatternByImageId(
image_id)['encoding_scheme']
board, bom_checksum = Database._HWID_FORMAT[encoding_scheme].findall(
encoded_string)[0]
except IndexError:
raise common.HWIDException(
'Invalid HWID string format: %r' % encoded_string)
if len(bom_checksum) < 2:
raise common.HWIDException(
'Length of encoded string %r is less than 2 characters' %
bom_checksum)
if not board == self.board.upper():
raise common.HWIDException('Invalid board name: %r' % board)
# Verify the checksum
stripped = encoded_string.replace('-', '')
hwid = stripped[:-2]
checksum = stripped[-2:]
if encoding_scheme == common.HWID.ENCODING_SCHEME.base32:
expected_checksum = Base32.Checksum(hwid)
elif encoding_scheme == common.HWID.ENCODING_SCHEME.base8192:
expected_checksum = Base8192.Checksum(hwid)
if not checksum == expected_checksum:
raise common.HWIDException('Checksum of %r mismatch (expected %r)' % (
encoded_string, expected_checksum))
def VerifyBOM(self, bom):
"""Verifies the data contained in the given BOM object matches the settings
and definitions in the database.
Args:
bom: The BOM object to verify.
Raises:
HWIDException if verification fails.
"""
# All the classes encoded in the pattern should exist in BOM.
missing_comp = []
for encoded_indices in self.encoded_fields.itervalues():
for index_content in encoded_indices.itervalues():
missing_comp.extend([comp_cls for comp_cls in index_content
if comp_cls not in bom.components])
if missing_comp:
raise common.HWIDException('Missing component classes: %r',
', '.join(sorted(missing_comp)))
bom_encoded_fields = MakeSet(bom.encoded_fields.keys())
db_encoded_fields = MakeSet(self.encoded_fields.keys())
# Every encoded field defined in the database must present in BOM.
if db_encoded_fields - bom_encoded_fields:
raise common.HWIDException('Missing encoded fields in BOM: %r',
', '.join(sorted(db_encoded_fields -
bom_encoded_fields)))
# Every encoded field the BOM has must exist in the database.
if bom_encoded_fields - db_encoded_fields:
raise common.HWIDException('Extra encoded fields in BOM: %r',
', '.join(sorted(bom_encoded_fields -
db_encoded_fields)))
if bom.board != self.board:
raise common.HWIDException('Invalid board name. Expected %r, got %r' %
(self.board, bom.board))
if bom.encoding_pattern_index not in self.encoding_patterns:
raise common.HWIDException('Invalid encoding pattern: %r' %
bom.encoding_pattern_index)
if bom.image_id not in self.image_id:
raise common.HWIDException('Invalid image id: %r' % bom.image_id)
# All the probeable component values in the BOM should exist in the
# database.
unknown_values = []
for comp_cls, probed_values in bom.components.iteritems():
for element in probed_values:
probed_values = element.probed_values
if probed_values is None or comp_cls not in self.components.probeable:
continue
found_comps = (
self.components.MatchComponentsFromValues(comp_cls, probed_values))
if not found_comps:
unknown_values.append('%s:%s' % (comp_cls, pprint.pformat(
probed_values, indent=0, width=1024)))
if unknown_values:
raise common.HWIDException('Unknown component values: %r' %
', '.join(sorted(unknown_values)))
# All the encoded index should exist in the database.
invalid_fields = []
for field, index in bom.encoded_fields.iteritems():
if index is not None and index not in self.encoded_fields[field]:
invalid_fields.append(field)
if invalid_fields:
raise common.HWIDException('Encoded fields %r have unknown indices' %
', '.join(sorted(invalid_fields)))
def VerifyComponents(self, probe_result, comp_list=None,
loose_matching=False):
"""Given a list of component classes, verify that the probed components of
all the component classes in the list are valid components in the database.
Args:
probe_result: A YAML string of the probe result, which is usually the
output of the probe command.
comp_list: An optional list of component class to be verified. Defaults to
None, which will then verify all the probeable components defined in
the database.
loose_matching: If set to True, partial match of probed results will be
accepted. For example, if the probed results only contain the
firmware version of RO main firmware but not its hash, and we want to
know if the firmware version is supported, then we can enable
loose_matching to see if the firmware version is supported in the
database.
Returns:
A dict from component class to a list of one or more
ProbedComponentResult tuples.
{component class: [ProbedComponentResult(
component_name, # The component name if found in the db, else None.
probed_values, # The actual probed string. None if probing failed.
error)]} # The error message if there is one; else None.
"""
probed_bom = self.ProbeResultToBOM(probe_result, loose_matching)
if not comp_list:
comp_list = sorted(self.components.probeable)
if not isinstance(comp_list, list):
raise common.HWIDException('Argument comp_list should be a list')
invalid_cls = set(comp_list) - set(self.components.probeable)
if invalid_cls:
raise common.HWIDException(
'%r do not have probe values and cannot be verified' %
sorted(invalid_cls))
return dict((comp_cls, probed_bom.components[comp_cls]) for comp_cls in
comp_list)
class EncodingPatterns(dict):
"""Class for parsing encoding_patterns in database.
Args:
encoding_patterns_dict: A dict of encoding patterns of the form:
{
0: 'default',
1: 'extra_encoding_pattern',
...
}
"""
def __init__(self, encoding_patterns_dict):
self.schema = schema.Dict('encoding patterns',
key_type=schema.Scalar('encoding pattern', int),
value_type=schema.Scalar('encoding scheme', str))
self.schema.Validate(encoding_patterns_dict)
super(EncodingPatterns, self).__init__(encoding_patterns_dict)
class ImageId(dict):
"""Class for parsing image_id in database.
Args:
image_id_dict: A dict of image ids of the form:
{
0: 'image_id0',
1: 'image_id1',
...
}
"""
def __init__(self, image_id_dict):
self.schema = schema.Dict('image id',
key_type=schema.Scalar('image id', int),
value_type=schema.Scalar('image name', str))
self.schema.Validate(image_id_dict)
super(ImageId, self).__init__(image_id_dict)
class EncodedFields(dict):
"""Class for parsing encoded_fields in database.
Args:
encoded_fields_dict: A dict of encoded fields of the form:
{
'encoded_field_name1': {
0: {
'component_class1': 'component_name1',
'component_class2': ['component_name2', 'component_name3']
...
}
1: {
'component_class1': 'component_name4',
'component_class2': None,
...
}
}
'encoded_field_name2':
...
}
"""
def __init__(self, encoded_fields_dict):
self.schema = schema.Dict(
'encoded fields', schema.Scalar('encoded field', str),
schema.Dict(
'encoded indices', schema.Scalar('encoded index', int),
schema.Dict(
'component classes', schema.Scalar('component class', str),
schema.Optional([schema.Scalar('component name', str),
schema.List('list of component names',
schema.Scalar('component name', str))]))))
self.schema.Validate(encoded_fields_dict)
super(EncodedFields, self).__init__(encoded_fields_dict)
# Convert string to list of string.
for field in self:
for index in self[field]:
for comp_cls in self[field][index]:
comp_value = self[field][index][comp_cls]
if isinstance(comp_value, str):
self[field][index][comp_cls] = MakeList(comp_value)
class Components(object):
"""A class for parsing and obtaining information of a pre-defined components
list.
Args:
components_dict: A dict of components of the form:
{
'component_class_1': { # Probeable component class.
'probeable': True,
'items': {
'component_name_1': {
'values': { probed values dict },
'labels': [ labels ],
'status': status
},
...
}
},
'component_class_2': { # Unprobeable component class.
'probeable': False,
'items': {
'component_name_2': {
'values': None,
'labels': [ labels ],
'status': status
},
...
}
},
...
}
Raises:
HWIDException if the given dict fails sanity checks.
"""
def __init__(self, components_dict):
self.schema = schema.Dict(
'components',
schema.Scalar('component class', str),
schema.FixedDict(
'component description',
items={
'items': schema.Dict(
'component names',
key_type=schema.Scalar('component name', str),
value_type=schema.FixedDict(
'component attributes',
items={'values': schema.Optional(
schema.Dict('probe key-value pairs',
key_type=schema.Scalar('probe key', str),
value_type=schema.AnyOf([
schema.Scalar('probe value', str),
schema.Scalar('probe value regexp',
rule.Value)])))},
optional_items={
'labels': schema.Dict(
'dict of labels',
key_type=schema.Scalar('label key', str),
value_type=schema.Scalar('label value', str)),
'status': schema.Scalar('item status', str)}))
},
optional_items={
'probeable': schema.Scalar('is component probeable', bool)
}))
self.schema.Validate(components_dict)
# Classify components based on their attributes.
self.probeable = set()
for comp_cls, comp_cls_properties in components_dict.iteritems():
if comp_cls_properties.get('probeable', True):
# Default 'probeable' to True.
self.probeable.add(comp_cls)
for comp_cls_data in components_dict.itervalues():
for comp_cls_item_attrs in comp_cls_data['items'].itervalues():
# Sanity check for component status.
status = comp_cls_item_attrs.get(
'status', common.HWID.COMPONENT_STATUS.supported)
if status not in common.HWID.COMPONENT_STATUS:
raise common.HWIDException(
'Invalid component item status: %r' % status)
# Convert all probe values to Value objects.
if comp_cls_item_attrs['values'] is None:
continue
for key, value in comp_cls_item_attrs['values'].items():
if not isinstance(value, rule.Value):
comp_cls_item_attrs['values'][key] = rule.Value(value)
self.components_dict = components_dict
def GetRequiredComponents(self):
"""Gets the list of required component classes.
Returns:
A set of component classes that are required to present on board.
"""
return set(self.components_dict.keys())
def GetComponentAttributes(self, comp_cls, comp_name):
"""Gets the attributes of the given component.
Args:
comp_cls: The component class to look up for.
comp_name: The component name to look up for.
Returns:
A copy of the dict that contains all the attributes of the given
component.
"""
self.CheckComponent(comp_cls, comp_name)
if not comp_name:
# Missing component.
return {}
return copy.deepcopy(self.components_dict[comp_cls]['items'][comp_name])
def GetComponentStatus(self, comp_cls, comp_name):
"""Gets the status of the given component.
Args:
comp_cls: The component class to look up for.
comp_name: The component name to look up for.
Returns:
One of the status in Components.STATUS indicating the status of the given
component.
"""
return self.components_dict[comp_cls]['items'][comp_name].get(
'status', common.HWID.COMPONENT_STATUS.supported)
def MatchComponentsFromValues(self, comp_cls, values_dict,
loose_matching=False):
"""Matches a list of components whose 'values' attributes match the given
'values_dict'.
Only the fields listed in the 'values' dict of each component are used as
matching keys.
For example, this may be used to look up all the dram components that are of
size 4G with {'size': '4G'} as 'values_dict' and 'dram' as 'comp_cls'.
Args:
comp_cls: The component class of interest.
values_dict: A dict of values to be used as look up key.
loose_matching: If set to True, partial match of probed results will be
accepted. For example, if the probed results only contain the
firmware version of RO main firmware but not its hash, and we want to
know if the firmware version is supported, then we can enable
loose_matching to see if the firmware version is supported in the
database.
Returns:
A dict with keys being the matched component names and values being the
dict of component attributes corresponding to the component names.
Raises:
HWIDException if the given component class is invalid.
"""
self.CheckComponent(comp_cls, None)
results = {}
for comp_name, comp_attrs in (
self.components_dict[comp_cls]['items'].iteritems()):
if comp_attrs['values'] is None and values_dict is None:
# Special handling for None values.
results[comp_name] = copy.deepcopy(comp_attrs)
elif comp_attrs['values'] is None:
continue
else:
match = True
keys_missing = set()
for key, value in comp_attrs['values'].iteritems():
# Only match the listed fields in 'values'.
if key not in values_dict:
if loose_matching:
keys_missing.add(key)
continue
else:
match = False
break
if not value.Matches(values_dict[key]):
match = False
break
if (loose_matching and
keys_missing == set(comp_attrs['values'].keys())):
match = False
if match:
results[comp_name] = copy.deepcopy(comp_attrs)
if results:
return results
return None
def CheckComponent(self, comp_cls, comp_name):
"""Checks if the given component class and component name are valid.
Args:
comp_cls: The component class to check.
comp_name: The component name to check. Set this to None will check
component class validity only.
Raises:
HWIDException if the given component class or name are invalid.
"""
if comp_cls not in self.components_dict:
raise common.HWIDException('Invalid component class %r' % comp_cls)
if comp_name and comp_name not in self.components_dict[comp_cls]['items']:
raise common.HWIDException(
'Invalid component name %r of class %r' % (comp_name, comp_cls))
class Pattern(object):
"""A class for parsing and obtaining information of a pre-defined encoding
pattern.
Args:
pattern_list: A list of dicts that maps encoded fields to their
bit length.
"""
def __init__(self, pattern_list):
self.schema = schema.List(
'pattern', schema.FixedDict(
'pattern list', items={
'image_ids': schema.List('image ids', schema.Scalar('image id',
int)),
'encoding_scheme': schema.Scalar('encoding scheme', str),
'fields': schema.List('encoded fields', schema.Dict(
'pattern field', key_type=schema.Scalar(
'encoded index', str),
value_type=schema.Scalar('bit offset', int)))}))
self.schema.Validate(pattern_list)
self.pattern = pattern_list
def GetPatternByImageId(self, image_id=None):
"""Get pattern definition by image id.
Args:
image_id: An integer of the image id to query. If not given, the latest
image id would be used.
Returns:
A dict of the pattern definiton.
"""
id_pattern_map = {}
for pattern in self.pattern:
id_pattern_map.update(dict((image_id, pattern) for image_id in
pattern['image_ids']))
if image_id is None:
return id_pattern_map[max(id_pattern_map.keys())]
if image_id not in id_pattern_map:
raise common.HWIDException(
'Pattern for image id %r is not defined' % image_id)
return id_pattern_map[image_id]
def GetImageIdFromEncodedString(self, encoded_string):
return int(Base32.Decode(encoded_string.split(' ')[1][0])[1:5], 2)
def GetImageIdFromBinaryString(self, binary_string):
return int(binary_string[1:5], 2)
def GetFieldsBitLength(self, image_id=None):
"""Gets a map for the bit length of each encoded fields defined by the
pattern. Scattered fields with the same field name are aggregated into one.
Returns:
A dict mapping each encoded field to its bit length.
"""
if self.pattern is None:
raise common.HWIDException(
'Cannot get encoded field bit length with uninitialized pattern')
ret = collections.defaultdict(int)
for element in self.GetPatternByImageId(image_id)['fields']:
for cls, length in element.iteritems():
ret[cls] += length
return ret
def GetTotalBitLength(self, image_id=None):
"""Gets the total bit length defined by the pattern. Common header and
stopper bit are included.
Args:
image_id: An integer of the image id to query. If not given, the latest
image id would be used.
Returns:
A int indicating the total bit length.
"""
if self.pattern is None:
raise common.HWIDException(
'Cannot get bit length with uninitialized pattern')
# 5 bits for header and 1 bit for stop bit
return (common.HWID.HEADER_BITS + 1 +
sum(self.GetFieldsBitLength(image_id).values()))
def GetBitMapping(self, image_id=None, binary_string_length=None):
"""Gets a map indicating the bit offset of certain encoded field a bit in a
encoded binary string corresponds to.
For example, the returned map may say that bit 5 in the encoded binary
string corresponds to the least significant bit of encoded field 'cpu'.
Args:
image_id: An integer of the image id to query. If not given, the latest
image id would be used.
binary_string_length: The length of the input binary string. If given, it
is used to check against the encoding pattern to see if there is an
incomplete bit chunk.
Returns:
A list of BitEntry objects indexed by bit position in the encoded binary
string. Each BitEntry object has attributes (field, bit_offset) indicating
which bit_offset of field this particular bit corresponds to. For example,
if ret[6] has attributes (field='cpu', bit_offset=1), then it means that
bit position 6 of the encoded binary string corresponds to the bit offset
1 (which is the second least significant bit) of encoded field 'cpu'.
"""
BitEntry = collections.namedtuple('BitEntry', ['field', 'bit_offset'])
if self.pattern is None:
raise common.HWIDException(
'Cannot construct bit mapping with uninitialized pattern')
ret = {}
index = common.HWID.HEADER_BITS # Skips the 5-bit common header.
field_offset_map = collections.defaultdict(int)
if not binary_string_length:
# Exclude stop bit.
binary_string_length = self.GetTotalBitLength(image_id=image_id) - 1
for element in self.GetPatternByImageId(image_id)['fields']:
for field, length in element.iteritems():
# Normally when one wants to extend bit length of a field, he should
# append new pattern field instead of expanding the last field.
# However, for some board, we already have cases where last pattern
# fields were expanded directly. See crosbug.com/p/30266.
#
# We check for incomplete bit string chunk at the end and adjust bit
# indices as needed here, so that we can decode correctly the HWIDs
# generated before the last pattern field was expanded in the above
# scenario.
remaining_bits = binary_string_length - index
field_offset_map[field] += min(remaining_bits, length)
first_bit_index = field_offset_map[field] - 1
# Reverse bit order.
for field_index in xrange(
first_bit_index, first_bit_index - length, -1):
ret[index] = BitEntry(field, field_index)
index += 1
return ret
def GetBitMappingSpringEVT(self, image_id=None):
"""Gets a map indicating the bit offset of certain encoded field a bit in a
encoded binary string corresponds to.
This is a hack for Spring EVT, which used the LSB first encoding pattern.
Args:
image_id: An integer of the image id to query. If not given, the latest
image id would be used.
Returns:
A list of BitEntry objects indexed by bit position in the encoded binary
string. Each BitEntry object has attributes (field, bit_offset) indicating
which bit_offset of field this particular bit corresponds to. For example,
if ret[6] has attributes (field='cpu', bit_offset=1), then it means that
bit position 6 of the encoded binary string corresponds to the bit offset
1 (which is the second least significant bit) of encoded field 'cpu'.
"""
BitEntry = collections.namedtuple('BitEntry', ['field', 'bit_offset'])
if self.pattern is None:
raise common.HWIDException(
'Cannot construct bit mapping with uninitialized pattern')
ret = {}
index = common.HWID.HEADER_BITS # Skips the 5-bit common header.
field_offset_map = collections.defaultdict(int)
for element in self.GetPatternByImageId(image_id)['fields']:
for field, length in element.iteritems():
for _ in xrange(length):
ret[index] = BitEntry(field, field_offset_map[field])
field_offset_map[field] += 1
index += 1
return ret
class Rules(object):
"""A class for parsing and evaluating rules defined in the database.
Args:
rule_list: A list of dicts that can be converted to a list of Rule objects.
"""
def __init__(self, rule_list):
self.schema = schema.List('list of rules', schema.FixedDict(
'rule', items={
'name': schema.Scalar('rule name', str),
'evaluate': schema.AnyOf([
schema.Scalar('rule function', str),
schema.List('list of rule functions',
schema.Scalar('rule function', str))])
}, optional_items={
'when': schema.Scalar('expression', str),
'otherwise': schema.AnyOf([
schema.Scalar('rule function', str),
schema.List('list of rule functions',
schema.Scalar('rule function', str))])
}))
self.schema.Validate(rule_list)
self.initialized = False
self.rule_list = [rule.Rule.CreateFromDict(r) for r in rule_list]
for r in self.rule_list:
if not any([r.name.startswith(x) for x in ('device_info.', 'verify.')]):
raise common.HWIDException(
'Invalid rule name %r; rule name must be prefixed with '
'"device_info." (evaluated when generating HWID) '
'or "verify." (evaluated when verifying HWID)' % r.name)
def _Initialize(self):
# Lazy import to avoid circular import problems. This also avoids import
# when only decoding functions are needed.
# These imports are needed to make sure all the rule functions needed by
# HWID-related operations are loaded and initialized.
# pylint: disable = W0612
from cros.factory import common_rule_functions
from cros.factory.hwid import hwid_rule_functions
self.initialized = True
def EvaluateRules(self, context, namespace=None):
"""Evaluate rules under the given context. If namespace is specified, only
those rules with names matching the specified namespace is evaluated.
Args:
context: A Context object holding all the context needed to evaluate the
rules.
namespace: A regular expression string indicating the rules to be
evaluated.
"""
if not self.initialized:
self._Initialize()
for r in self.rule_list:
if namespace is None or re.match(namespace, r.name):
r.Evaluate(context)