blob: 76e5df0412458f26ba277d4a0832c64691914df8 [file] [log] [blame] [edit]
# Copyright 2023 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import re
from cros.factory.gooftool.common import Util
from cros.factory.gooftool import vpd_data
from cros.factory.test.l10n import regions
from cros.factory.test.rules import phase
from cros.factory.test.rules.privacy import FilterDict
from cros.factory.test.rules import registration_codes
from cros.factory.test.rules.registration_codes import RegistrationCode
from cros.factory.test.utils import smart_amp_utils
from cros.factory.utils import config_utils
from cros.factory.utils import file_utils
from cros.factory.utils.type_utils import Error
from cros.factory.external.chromeos_cli import cros_config
from cros.factory.external.chromeos_cli import vpd
# Legacy key names, still needed by factory branches older than 14675.0.0,
# which is where the change https://crrev.com/c/3527015 landed. The names are
# decoded from their hex form at import time.
# TODO(cyueh) Drop this after all factory branches before 14675.0.0 are
# removed.
non_inclusive_custom_label_tag_vpd_key = str(
    bytes.fromhex('77686974656c6162656c5f746167'), 'utf-8')
non_inclusive_custom_label_tag_cros_config_key = str(
    bytes.fromhex('77686974656c6162656c2d746167'), 'utf-8')

# Identity-related VPD keys mapped to their dash-separated counterparts
# (presumably the matching cros_config names, going by the legacy variable
# names above -- confirm against callers). The legacy pair is inserted last so
# iteration order is: custom_label_tag, customization_id, legacy key.
IDENTITY_VPD_FIELDS = {
    'custom_label_tag': 'custom-label-tag',
    'customization_id': 'customization-id',
}
IDENTITY_VPD_FIELDS[non_inclusive_custom_label_tag_vpd_key] = (
    non_inclusive_custom_label_tag_cros_config_key)
class VPDError(Error):
  """Raised when VPD data fails verification or a VPD update fails."""
class VPDUtils:
  """Helpers to inspect, verify and clean up the VPD data of a device.

  VPD access goes through `vpd.VPDTool`; custom label information is read
  through `cros_config.CrosConfig`.
  """

  def __init__(self, project):
    """Initializes the helper.

    Args:
      project: the project name. It is the default device name used when
        registration codes are checked in `VerifyVPD`.
    """
    self._util = Util()
    self._project = project
    self._vpd = vpd.VPDTool()
    self._cros_config = cros_config.CrosConfig()

  def _GetInvalidVPDFields(self, data, known_vpd, known_vpd_re):
    """Gets the invalid VPD fields from `data`.

    Invalid VPDs are VPDs with unknown keys or unmatched value pattern.

    Args:
      data: a mapping of (key, value) for VPD data.
      known_vpd: a mapping of (key, format_RE) for known data.
      known_vpd_re: a mapping of (key_re, format_RE) for known data.

    Returns:
      A list of unknown keys and a list of (key, pattern) tuple where the
      value of the key does not match the expected pattern.
    """
    unknown_keys = []
    misformat_key_pattern = []
    for k, v in data.items():
      # Exact key names take precedence over the key patterns.
      if k in known_vpd:
        if not re.fullmatch(known_vpd[k], v):
          misformat_key_pattern.append((k, known_vpd[k]))
        continue
      # Only the first matching key pattern is used to validate the value.
      for rk, rv in known_vpd_re.items():
        if re.fullmatch(rk, k):
          if not re.fullmatch(rv, v):
            misformat_key_pattern.append((k, rv))
          break
      else:
        # No exact name and no key pattern matched.
        unknown_keys.append(k)
    return unknown_keys, misformat_key_pattern

  def ClearUnknownVPDEntries(self):
    """Removes RW VPD entries whose keys are not known.

    Entries with a known key but a malformed value are left untouched.

    Returns:
      The list of unknown keys that were found (and removed).

    Raises:
      VPDError if the entries cannot be removed.
    """
    rw_vpd = self._vpd.GetAllData(partition=vpd.VPD_READWRITE_PARTITION_NAME)
    known_rw_vpd = {**vpd_data.REQUIRED_RW_DATA, **vpd_data.KNOWN_RW_DATA}
    unknown_keys, unused_misformat_key_pattern = self._GetInvalidVPDFields(
        rw_vpd, known_rw_vpd, vpd_data.KNOWN_RW_DATA_RE)

    logging.info('Current RW VPDs: %r', FilterDict(rw_vpd))
    if unknown_keys:
      self._ClearRWVPDEntries(unknown_keys)
    else:
      logging.info('No unknown RW VPDs are found. Skip clearing.')
    return unknown_keys

  def _ClearRWVPDEntries(self, keys):
    """Removes the given keys from the RW VPD.

    Args:
      keys: an iterable of VPD key names to remove.

    Raises:
      VPDError if the update fails for any reason.
    """
    # Materialize once so that views and one-shot iterables are logged as a
    # plain list and can still be consumed below.
    keys = list(keys)
    logging.info('Removing VPD entries with key %r', keys)
    try:
      # Each key is submitted with a None value to request its removal.
      self._vpd.UpdateData({k: None
                            for k in keys},
                           partition=vpd.VPD_READWRITE_PARTITION_NAME)
    except Exception as e:
      raise VPDError(f'Failed to remove VPD entries: {e!r}') from None

  def _CollectVPDErrors(self, section, data, required, optional, optional_re):
    """Builds the list of human-readable problems found in a VPD section.

    Args:
      section: a string for VPD section name, 'RO' or 'RW'.
      data: a mapping of (key, value) for VPD data.
      required: a mapping of (key, format_RE) for required data.
      optional: a mapping of (key, format_RE) for optional data.
      optional_re: a mapping of (key_re, format_RE) for optional data.

    Returns:
      A list of error messages: unexpected keys first, then malformed values,
      then a single message listing the missing required keys. The list is
      empty when `data` is valid.
    """
    known = {**required, **optional}
    unknown_keys, misformat_key_pattern = self._GetInvalidVPDFields(
        data, known, optional_re)

    errors = [f'Unexpected {section} VPD: {k}={data[k]}.' for k in unknown_keys]
    errors.extend(f'Incorrect {section} VPD: {k}={data[k]} '
                  f'(expected format: {pattern})'
                  for k, pattern in misformat_key_pattern)

    # Sorted so that the message does not depend on set iteration order.
    missing_keys = sorted(set(required) - set(data))
    if missing_keys:
      errors.append(
          f"Missing required {section} VPD values: {','.join(missing_keys)}")
    return errors

  def _CheckVPDFields(self, section, data, required, optional, optional_re):
    """Checks if all fields in data fall into given format.

    Args:
      section: a string for VPD section name, 'RO' or 'RW'.
      data: a mapping of (key, value) for VPD data.
      required: a mapping of (key, format_RE) for required data.
      optional: a mapping of (key, format_RE) for optional data.
      optional_re: a mapping of (key_re, format_RE) for optional data.

    Raises:
      VPDError if an unexpected key is found, a value does not match its
      format_RE, or a required key is missing. All problems are reported
      together, one per line.
    """
    errors = self._CollectVPDErrors(section, data, required, optional,
                                    optional_re)
    if errors:
      raise VPDError('\n'.join(errors))

  def ClearFactoryVPDEntries(self):
    """Clears factory related VPD entries in the RW VPD.

    All VPD entries with '.' in key name are considered as special.
    We collect all special names and delete entries with known prefixes,
    and fail if there are unknown entries left.

    Returns:
      A dict of the removed entries.

    Raises:
      VPDError if a special entry with an unknown prefix is found (nothing is
      removed in that case), or if the removal fails.
    """
    # These names are defined in cros.factory.test.device_data
    factory_prefixes = ('factory.', 'component.', 'serials.')

    def _IsFactoryVPD(k):
      return k.startswith(factory_prefixes)

    rw_vpd = self._vpd.GetAllData(partition=vpd.VPD_READWRITE_PARTITION_NAME)
    dot_entries = {k: v
                   for k, v in rw_vpd.items()
                   if '.' in k}
    logging.info('Current special RW VPDs: %r', FilterDict(dot_entries))

    entries = {k: v
               for k, v in dot_entries.items()
               if _IsFactoryVPD(k)}
    unknown_keys = set(dot_entries) - set(entries)
    if unknown_keys:
      raise VPDError(f'Found unexpected RW VPD(s): {unknown_keys!r}')

    if entries:
      self._ClearRWVPDEntries(list(entries))
    else:
      logging.info('No factory-related RW VPDs are found. Skip clearing.')
    return entries

  def _GetAudioVPDROData(self):
    """Return the required audio VPD RO data.

    If a DUT comes with a smart amplifier, it must be calibrated in the
    factory and the DSM-related VPD values must be set.

    Returns:
      A mapping of (key, format_RE) for the DSM calibration fields, with one
      set of fields per channel. Empty if no sound card init file is reported.
    """
    speaker_amp, sound_card_init_file, channel_names = \
        smart_amp_utils.GetSmartAmpInfo()

    if speaker_amp:
      logging.info('Amplifier %s found on DUT.', speaker_amp)
    if not sound_card_init_file:
      logging.info('No smart amplifier found! '
                   'Skip checking DSM VPD value.')
      return {}

    num_channels = len(channel_names)
    dsm_vpd_ro_data = {}
    if speaker_amp == 'TAS2563':
      logging.info(
          'The VPD RO should contain `dsm_calib_register_array` and '
          '`dsm_calib_value_N` where N ranges from 0 ~ %d.', (num_channels - 1))
      dsm_vpd_ro_data['dsm_calib_register_array'] = r'[0-9a-f]*'
      for channel in range(num_channels):
        dsm_vpd_ro_data[f'dsm_calib_value_{channel}'] = r'[0-9a-f]*'
    else:
      logging.info(
          'The VPD RO should contain `dsm_calib_r0_N` and '
          '`dsm_calib_temp_N` where N ranges from 0 ~ %d.', (num_channels - 1))
      for channel in range(num_channels):
        dsm_vpd_ro_data[f'dsm_calib_r0_{channel}'] = r'[0-9]*'
        dsm_vpd_ro_data[f'dsm_calib_temp_{channel}'] = r'[0-9]*'
    return dsm_vpd_ro_data

  def _GetDeviceNameForRegistrationCode(self, project):
    """Returns the device name to use when checking registration codes.

    The custom label tag is used only when the registration code config lists
    it under `project` with a truthy value and the device is a custom label
    device. In every other case `project` is returned.

    Args:
      project: the project name.

    Returns:
      Either the custom label tag or `project`.
    """

    def _LoadConfigJsonFile(_config):
      # Best effort: a config that cannot be loaded is treated as absent.
      try:
        return config_utils.LoadConfig(_config, validate_schema=False)
      except Exception:
        return None

    config = 'custom_label_reg_code'
    # Load config json file
    reg_code_config = _LoadConfigJsonFile(config)
    if reg_code_config is None:
      # Fallback to the legacy name.
      reg_code_config = _LoadConfigJsonFile('whitelabel_reg_code')
      if reg_code_config:
        logging.warning(
            'whitelabel_reg_code.json is deprecated, please rename it to %s',
            config)

    if reg_code_config is None:
      return project
    if project not in reg_code_config:
      return project

    # Get the custom-label-tag for custom label device
    is_custom_label, custom_label_tag = self._cros_config.GetCustomLabelTag()
    if (not is_custom_label or
        custom_label_tag not in reg_code_config[project]):
      return project
    if reg_code_config[project][custom_label_tag]:
      return custom_label_tag
    return project

  def VerifyVPD(self, skip_check_reg_code=False):
    """Verify that VPD values are set properly.

    Args:
      skip_check_reg_code: if True, the registration codes in the RW VPD are
        not checked.

    Raises:
      VPDError if the RO or RW VPD contains unexpected, malformed or missing
      fields, if the region is unknown, or if a registration code is invalid.
    """
    required_vpd_ro_data = vpd_data.REQUIRED_RO_DATA.copy()
    required_vpd_ro_data.update(self._GetAudioVPDROData())

    # Update PVS required field
    # ARM-based platforms are not supported by PVS yet, therefore should be
    # ruled out.
    if phase.GetPhase() <= phase.DVT and self._util.shell(
        'crossystem arch').stdout.strip() != 'arm':
      required_vpd_ro_data.update(vpd_data.PVS_REQUIRED_RO_DATA)

    # Check required data
    ro_vpd = self._vpd.GetAllData(partition=vpd.VPD_READONLY_PARTITION_NAME)
    rw_vpd = self._vpd.GetAllData(partition=vpd.VPD_READWRITE_PARTITION_NAME)
    self._CheckVPDFields('RO', ro_vpd, required_vpd_ro_data,
                         vpd_data.KNOWN_RO_DATA, vpd_data.KNOWN_RO_DATA_RE)
    self._CheckVPDFields('RW', rw_vpd, vpd_data.REQUIRED_RW_DATA,
                         vpd_data.KNOWN_RW_DATA, vpd_data.KNOWN_RW_DATA_RE)

    # Check known value contents.
    region = ro_vpd['region']
    if region not in regions.REGIONS:
      raise VPDError(f'Unknown region: "{region}".')

    if skip_check_reg_code:
      return

    device_name = self._GetDeviceNameForRegistrationCode(self._project)
    # RegCode should be ready since PVT
    allow_dummy = phase.GetPhase() < phase.PVT_DOGFOOD
    for type_prefix in ('UNIQUE', 'GROUP'):
      # 'UNIQUE' -> 'ubind_attribute', 'GROUP' -> 'gbind_attribute'.
      vpd_field_name = type_prefix[0].lower() + 'bind_attribute'
      type_name = getattr(RegistrationCode.Type, type_prefix + '_CODE')
      try:
        registration_codes.CheckRegistrationCode(
            rw_vpd[vpd_field_name], type=type_name, device=device_name,
            allow_dummy=allow_dummy)
      except registration_codes.RegistrationCodeException as e:
        raise VPDError(f'{vpd_field_name} is invalid: {e!r}') from None

  def VerifyCacheForIdentity(self):
    """Verifies if the identity fields in vpd are synced with the boot cache.

    crosid reads from boot cache so we have to reboot after VPD is updated and
    before some critical steps. e.g. GSCFinalize.

    Raises:
      VPDError listing every identity field whose VPD value differs from the
      boot cache value. A field with no boot cache file is treated as None.
    """
    RO_VPD_CACHE_PATH = '/sys/firmware/vpd/ro'

    out_of_sync_fields = []
    for vpd_key in IDENTITY_VPD_FIELDS:
      boot_cache_path = os.path.join(RO_VPD_CACHE_PATH, vpd_key)
      boot_cache_value = None
      if os.path.isfile(boot_cache_path):
        boot_cache_value = file_utils.ReadFile(boot_cache_path)
      vpd_value = self._vpd.GetValue(vpd_key)
      if boot_cache_value != vpd_value:
        out_of_sync_fields.append((vpd_key, vpd_value, boot_cache_value))

    if out_of_sync_fields:
      messages = ['VPD updated without reboot.'] + [
          f'{vpd_key!r} is {vpd_value!r} in vpd but is {boot_cache_value!r}'
          ' in boot cache.'
          for vpd_key, vpd_value, boot_cache_value in out_of_sync_fields
      ]
      raise VPDError('\n'.join(messages))