| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| |
| """Umpire YAML config validator. |
| |
| To validate a YAML file 'abc.yaml': |
| from cros.factory.umpire.config import UmpireConfig |
| umpire_config = UmpireConfig('abc.yaml') |
| """ |
| |
| from __future__ import print_function |
| |
| import copy |
| import yaml |
| |
| import factory_common # pylint: disable=W0611 |
| from cros.factory.schema import FixedDict, List, Optional, Scalar |
| from cros.factory.umpire.common import UmpireError, VerifyResource |
| from cros.factory.umpire.service.umpire_service import LoadServiceModule |
| from cros.factory.umpire.service.umpire_service import GetServiceSchemata |
| |
| |
| NUMBER_SHOP_FLOOR_HANDLERS = 50 |
| |
| |
| # Ruleset matcher validator. |
| _RULE_MATCHER_SCHEMA = FixedDict( |
| 'Matcher of a rule', |
| optional_items={ |
| 'mac': List('MAC address list', |
| Scalar('Network interface MAC address', str)), |
| 'sn': List('Serial number list', |
| Scalar('Serial number', str)), |
| 'mlb_sn': List('MLB serial number list', |
| Scalar('MLB serial number', str)), |
| 'stage': List('Matched stage list', Scalar('Stage', str)), |
| 'sn_range': List( |
| 'Inclusive serial number start/end pair', |
| Scalar('Serial number or "-" as open end', str)), |
| 'mlb_sn_range': List( |
| 'Inclusive MLB serial number start/end pair', |
| Scalar('MLB serial number or "-" as open end', str))}) |
| # Factory stage range. |
| _FACTORY_STAGE_RANGE = List( |
| 'Factory stage range', |
| Optional(label='Factory stage', |
| types=[Scalar('Fixed factory stage', str)])) |
| # Rulesets enable_update validator. |
| _ENABLE_UPDATE_SCHEMA = FixedDict( |
| 'Matcher of enable update', |
| optional_items={ |
| 'device_factory_toolkit': _FACTORY_STAGE_RANGE, |
| 'firmware_bios': _FACTORY_STAGE_RANGE, |
| 'firmware_ec': _FACTORY_STAGE_RANGE, |
| 'firmware_pd': _FACTORY_STAGE_RANGE, |
| 'rootfs_release': _FACTORY_STAGE_RANGE, |
| 'rootfs_test': _FACTORY_STAGE_RANGE}) |
| # Rulesets validator. |
| _RULESETS_SCHEMA = List( |
| 'Rule sets for selecting configuration', |
| FixedDict( |
| 'Rule and description', |
| items={ |
| 'bundle_id': Scalar('The target bundle', str), |
| 'note': Scalar('Brief summary of this rule', str), |
| 'active': Scalar('Initial state of this rule', bool)}, |
| optional_items={ |
| 'match': _RULE_MATCHER_SCHEMA, |
| 'enable_update': _ENABLE_UPDATE_SCHEMA})) |
| # Resources validator. |
| _RESOURCES_SCHEMA = FixedDict( |
| 'Resource files in a bundle', |
| items={ |
| 'device_factory_toolkit': Scalar('Device package', str), |
| 'oem_partition': Scalar('OEM channel', str), |
| 'rootfs_release': Scalar('RELEASE channel', str), |
| 'rootfs_test': Scalar('TEST channel', str), |
| 'stateful_partition': Scalar('STATE channel', str)}, |
| optional_items={ |
| 'server_factory_toolkit': Scalar('Server package', str), |
| 'netboot_firmware': Scalar('Netboot BIOS image.net.bin', str), |
| 'netboot_vmlinux': Scalar('Netboot install vmlinux', str), |
| 'complete_script': Scalar('COMPLETE channel', str), |
| 'efi_partition': Scalar('EFI channel', str), |
| 'firmware': Scalar('FIRMWARE channel', str), |
| 'hwid': Scalar('HWID updater', str), |
| 'download_conf': Scalar('DOWNLOAD configuration', str)}) |
| # Single bundle validator. |
| # A valid configuration can contain multiple bundles. At any time, one device |
| # state (mac, sn, mlb_sn) can map to one bundle only. |
| _BUNDLE_SCHEMA = FixedDict( |
| 'Bundle for one device', |
| items={ |
| 'id': Scalar('Unique key for this bundle', str), |
| 'note': Scalar('Notes', str), |
| 'resources': _RESOURCES_SCHEMA, |
| 'shop_floor': FixedDict( |
| 'Shop floor handler settings', |
| items={ |
| 'handler': Scalar('Full handler package name', str)}, |
| optional_items={ |
| 'handler_config': FixedDict( |
| 'Optional handler configs', |
| optional_items={ |
| 'mount_point_smt': Scalar('SMT mount point', str), |
| 'mount_point_fatp': Scalar('FATP mount point', str) |
| })})}) |
| |
| |
| def ValidateConfig(config): |
| """Validates Umpire config dict. |
| |
| ValidateConfig() imports service modules. Validates configuration schema |
| and service parameters. |
| |
| Parameter: |
| config: Umpire config dict. |
| |
| Raises: |
| TypeError: when 'services' is not a dict. |
| KeyError: when top level key 'services' not found. |
| SchemaException: on schema validation failed. |
| """ |
| map(LoadServiceModule, config['services'].keys()) |
| schema = FixedDict( |
| 'Top level Umpire config fields', |
| items={ |
| 'board': Scalar('Board name', str), |
| 'rulesets': _RULESETS_SCHEMA, |
| 'services': GetServiceSchemata(), |
| 'bundles': List('Bundles', _BUNDLE_SCHEMA), |
| 'ip': Scalar('IP address to bind', str), |
| 'port': Scalar('Base port', int)}) |
| schema.Validate(config) |
| |
| |
| def ValidateResources(config, env): |
| """Validates resources in each active bundle. |
| |
| Args: |
| config: Umpire config dict. |
| env: UmpireEnv. |
| |
| Raises: |
| UmpireError if there's any resources for active bundles missing. |
| """ |
| active_bundles = set(r['bundle_id'] |
| for r in config['rulesets'] if r.get('active')) |
| |
| # Used to cache verified resource name |
| resource_verified = set() |
| |
| # Used to store missing or checksum mismatch resource(s). |
| error = [] |
| for bundle in config['bundles']: |
| if bundle['id'] not in active_bundles: |
| continue |
| for resource_name, resource_filename in bundle['resources'].items(): |
| if resource_filename not in resource_verified: |
| resource_verified.add(resource_filename) |
| try: |
| resource_path = env.GetResourcePath(resource_filename) |
| except IOError as e: |
| error.append('[NOT FOUND] resource %s:%s for bundle %r' % ( |
| resource_name, e.filename, bundle['id'])) |
| else: |
| if not VerifyResource(resource_path): |
| error.append('[CHECKSUM MISMATCH] resource %s:%s for bundle %r' % ( |
| resource_name, resource_path, bundle['id'])) |
| if error: |
| raise UmpireError('\n'.join(error)) |
| |
| |
| def ShowDiff(original, new): |
| """Shows difference between original and new UmpireConfig. |
| |
| Note that it only compares active bundles, i.e. bundles which are used by |
| active rulesets. |
| |
| Args: |
| original: Original UmpireConfig object. |
| new: New UmpireConfig object. |
| |
| Returns: |
| List of string showing the difference. |
| """ |
| def DumpRulesets(rulesets): |
| INDENT_SPACE = ' ' |
| for r in rulesets: |
| rule_yaml = yaml.dump(RulesetOrderedDict(r), default_flow_style=False) |
| result.extend((INDENT_SPACE + line) for line in rule_yaml.split('\n')) |
| |
| result = [] |
| original_active_rulesets = [r for r in original['rulesets'] if r['active']] |
| new_active_rulesets = [r for r in new['rulesets'] if r['active']] |
| newly_added_rulesets = [r for r in new_active_rulesets |
| if r not in original_active_rulesets] |
| deleted_rulesets = [r for r in original_active_rulesets |
| if r not in new_active_rulesets] |
| |
| if newly_added_rulesets: |
| result.append('Newly added rulesets:') |
| DumpRulesets(newly_added_rulesets) |
| |
| if deleted_rulesets: |
| result.append('Deleted rulesets:') |
| DumpRulesets(deleted_rulesets) |
| |
| return result |
| |
| |
| class UmpireOrderedDict(dict): |
| |
| """Used to output UmpireConfig with desired key order.""" |
| |
| def Omap(self): |
| result = [(k, self[k]) for k in ['board', 'ip', 'port']] |
| result.append(('rulesets', |
| [RulesetOrderedDict(r) for r in self['rulesets']])) |
| result.append(('services', ServicesOrderedDict(self['services']))) |
| result.append(('bundles', |
| [BundleOrderedDict(b) for b in self['bundles']])) |
| return result |
| |
| |
| def DictToOrderedList(d, key_order, d_name): |
| """Converts a dict to list of key, value pairs with key_order. |
| |
| Args: |
| d: dictionary |
| key_order: predefined key order |
| d_name: name of the dict. Used when raising an Exception. |
| |
| Returns: |
| [(key, value)] of the dict with key_order. |
| |
| Raises: |
| KeyError if the dict has key not defined in key_order. |
| """ |
| # Sanity check first. |
| missing_keys = [k for k in d if k not in key_order] |
| if missing_keys: |
| raise KeyError('Undefined key(s): %r in %s' % (missing_keys, d_name)) |
| |
| return [(k, d[k]) for k in key_order if k in d] |
| |
| |
| class RulesetOrderedDict(dict): |
| |
| """Used to output an UmpireConfig's ruleset with desired key order.""" |
| _KEY_ORDER = ['bundle_id', 'note', 'active', 'enable_update', 'match'] |
| |
| def Omap(self): |
| return DictToOrderedList(self, self._KEY_ORDER, 'RulesetOrderedDict') |
| |
| |
| class ServicesOrderedDict(dict): |
| |
| """Used to output an UmpireConfig's services with desired key order.""" |
| _KEY_ORDER = ['archiver', 'http', 'shop_floor', 'minijack', |
| 'mock_shop_floor_backend', 'rsync', 'dhcp', 'tftp', 'overlord'] |
| |
| def Omap(self): |
| return DictToOrderedList(self, self._KEY_ORDER, 'ServicesOrderedDict') |
| |
| |
| class BundleOrderedDict(dict): |
| |
| """Used to output an UmpireConfig's bundle with desired key order.""" |
| _KEY_ORDER = ['id', 'note', 'shop_floor', 'auto_update', 'resources'] |
| |
| def Omap(self): |
| return DictToOrderedList(self, self._KEY_ORDER, 'BundleOrderedDict') |
| |
| |
| def RepresentOmap(dumper, data): |
| """A YAML representer for ordered map with dict look.""" |
| return dumper.represent_mapping(u'tag:yaml.org,2002:map', data.Omap()) |
| |
| |
| yaml.add_representer(UmpireOrderedDict, RepresentOmap) |
| yaml.add_representer(RulesetOrderedDict, RepresentOmap) |
| yaml.add_representer(ServicesOrderedDict, RepresentOmap) |
| yaml.add_representer(BundleOrderedDict, RepresentOmap) |
| |
| |
| class UmpireConfig(dict): |
| |
| """Container of Umpire configuration. |
| |
| It reads an Umpire config file in YAML format or a dict. Then validates it. |
| |
| Once validated, the UmpireConfig object is a dict for users to access config. |
| |
| Properties: |
| bundle_map: maps bundle ID to bundle dict. |
| |
| Raises: |
| TypeError: when 'services' is not a dict. |
| KeyError: when top level key 'services' not found. |
| SchemaException: when schema validation failed. |
| |
| Example: |
| umpire_config = UmpireConfig(config_file) |
| logging.info('Reads Umpire config for boards: %s', umpire_config['board'] |
| """ |
| |
| def __init__(self, config, validate=True): |
| """Loads an UmpireConfig and validates it. |
| |
| If validate is set, it validates config with ValidateConfig() and checks |
| default bundle's existance. |
| |
| Args: |
| config: path to an Umpire config file or config content or an |
| UmpireConfig dict. |
| validate: True to validate config (schema check only; no resource check) |
| Note that it would be removed once all UmpireConfig components are |
| implemented. |
| """ |
| self.bundle_map = {} |
| if isinstance(config, str): |
| if config.find('\n') == -1: |
| # Treat single line config as file name. |
| with open(config, 'r') as f: |
| config = yaml.load(f) |
| else: |
| config = yaml.load(config) |
| elif isinstance(config, dict): |
| # As config dict has multi-layer dict, deepcopy is necessary. |
| config = copy.deepcopy(config) |
| |
| super(UmpireConfig, self).__init__(config) |
| self.BuildBundleMap() |
| |
| if validate: |
| ValidateConfig(config) |
| if not self.GetDefaultBundle(): |
| raise UmpireError('Missing default bundle') |
| |
| def BuildBundleMap(self): |
| """Builds bundle_map attribute. |
| |
| bundle_map is a dict maps bundle ID to bundle dict. |
| """ |
| self.bundle_map = {bundle['id']: bundle |
| for bundle in self.get('bundles', [])} |
| |
| def WriteFile(self, config_file): |
| """Writes UmpireConfig to a file in YAML format. |
| |
| Args: |
| config_file: path to write. |
| """ |
| with open(config_file, 'w') as f: |
| yaml.dump(UmpireOrderedDict(self), f, default_flow_style=False) |
| |
| def GetDefaultBundle(self): |
| """Gets the default bundle. |
| |
| Returns: |
| The default bundle object. None if not found. |
| """ |
| for rule in reversed(self.get('rulesets', [])): |
| if rule['active']: |
| return self.GetBundle(rule['bundle_id']) |
| return None |
| |
| def GetBundle(self, bundle_id): |
| """Gets a bundle object with specific bundle ID. |
| |
| Args: |
| bundle_id: bundle ID to get |
| |
| Returns: |
| The bundle object. None if not found. |
| """ |
| return self.bundle_map.get(bundle_id, None) |
| |
| def GetActiveBundles(self): |
| """Gets active bundles. |
| |
| Returns: |
| Iterable of active bundles. |
| """ |
| for active_rule in (r for r in self.get('rulesets', []) if r['active']): |
| bundle_id = active_rule['bundle_id'] |
| if bundle_id in self.bundle_map: |
| yield self.bundle_map[bundle_id] |