| # pylint: disable=E1101 |
| # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import difflib |
| import inspect |
| import logging |
| import os |
| import yaml |
| |
| import factory_common # pylint: disable=W0611 |
| |
| |
# Warning header prepended to every datastore file written by WriteOnDiff,
# marking it as a generated artifact to be edited only via the hwid_tool.
DATA_FILE_WARNING_MESSAGE_HEADER = '''
# WARNING: This file is AUTOMATICALLY GENERATED, do not edit.
# The proper way to modify this file is using the hwid_tool.
'''.strip()
| |
| |
def YamlWrite(structured_data):
  """Serialize structured_data to YAML text in block (non-flow) style."""
  serialized = yaml.dump(structured_data, default_flow_style=False)
  return serialized
| |
| |
def YamlRead(serialized_data):
  """Deserialize YAML text into plain Python data (safe loader only)."""
  structured = yaml.safe_load(serialized_data)
  return structured
| |
| |
class InvalidDataError(ValueError):
  """Raised when data fails (de)serialization or schema validation."""
| |
| |
class YamlDatastore(object):
  """Mixin providing diff-aware persistence of generated data files."""

  def WriteOnDiff(self, path, filename, raw_data):
    """Write data to file if there are any differences, logging the diffs.

    The file will be created if it does not exist.  A warning header
    (DATA_FILE_WARNING_MESSAGE_HEADER) is prepended to the written data.

    Args:
      path: Directory in which the data file lives.
      filename: Name of the data file within that directory.
      raw_data: Serialized data to write (trailing newlines are normalized).
    """
    full_path = os.path.join(path, filename)
    internal_data = (DATA_FILE_WARNING_MESSAGE_HEADER.split('\n') +
                     raw_data.strip('\n').split('\n'))
    if os.path.exists(full_path):
      # Use a context manager so the file handle is closed deterministically
      # (the previous code leaked the handle returned by open()).
      with open(full_path, 'r') as f:
        file_data = [line.rstrip('\n') for line in f]
      diff = list(difflib.unified_diff(file_data, internal_data))
      if not diff:
        logging.debug('no differences for %s', full_path)
        return
      logging.info('updating %s with changes:\n%s', filename, '\n'.join(diff))
    else:
      logging.info('creating new data file %s', filename)
    with open(full_path, 'w') as f:
      f.write('%s\n' % '\n'.join(internal_data))
| |
| |
class _DatastoreBase(object):
  """Base class for schema-validated objects with YAML (de)serialization.

  Subclasses (normally produced by MakeDatastoreClass) carry a _schema
  dict mapping field names to type specifications; see MakeDatastoreClass
  for the schema format.
  """

  def __init__(self, **field_dict):
    """Creates object using the field data specified in field_dict."""
    self.__dict__.update(field_dict)

  def Yrepr(self, yaml_representer):
    """The object YAML representation is just its field_dict data."""
    return yaml_representer.represent_data(self.__dict__)

  def Encode(self):
    """Return the YAML string for this object and check its schema.

    After generating the output data, run decode on that to validate.
    """
    yaml_data = YamlWrite(self)
    self.Decode(yaml_data)
    return yaml_data

  @classmethod
  def New(cls):
    """Return an instance with every schema field set to an empty value."""
    field_dict = {}
    for elt_key, elt_type in cls._schema.items():
      if isinstance(elt_type, tuple):
        # Collection field: empty dict or empty list.
        elt_data = {} if elt_type[0] is dict else []
      elif issubclass(elt_type, _DatastoreBase):
        # Nested datastore object: recursively build an empty instance.
        elt_data = elt_type.New()
      else:
        # Scalar field: default to the empty string.
        elt_data = ''
      field_dict[elt_key] = elt_data
    return cls(**field_dict)

  @classmethod
  def Decode(cls, data):
    """Given YAML string, creates corresponding object and check its schema."""
    def NestedDecode(elt_type, elt_data):
      """Apply appropriate constructors to nested object data."""
      if isinstance(elt_type, tuple):
        collection_type, field_type = elt_type
        if collection_type is dict:
          return dict((field_key, NestedDecode(field_type, field))
                      for field_key, field in elt_data.items())
        if collection_type is list:
          return sorted(NestedDecode(field_type, field) for field in elt_data)
      elif isinstance(elt_type, list):
        # Type alternation: accept the first subtype that decodes cleanly.
        for elt_subtype in elt_type:
          try:
            return NestedDecode(elt_subtype, elt_data)
          except Exception:  # narrowed from bare except; still best-effort
            continue
      elif issubclass(elt_type, _DatastoreBase):
        cooked_field_dict = dict(
            (subelt_key, NestedDecode(subelt_type, elt_data[subelt_key]))
            # pylint: disable=W0212
            for subelt_key, subelt_type in elt_type._schema.items())
        return elt_type(**cooked_field_dict)
      elif isinstance(elt_data, elt_type):
        return elt_data
      raise InvalidDataError
    try:
      field_dict = YamlRead(data)
    except yaml.YAMLError as e:  # 'as' form is valid on Python 2.6+ and 3.x
      raise InvalidDataError("YAML deserialization error: %s" % e)
    cls.ValidateSchema(field_dict)
    cooked_field_dict = dict(
        (elt_key, NestedDecode(elt_type, field_dict[elt_key]))
        for elt_key, elt_type in cls._schema.items())
    return cls(**cooked_field_dict)

  @classmethod
  def FieldNames(cls):
    """Return the field names declared by the class schema."""
    return cls._schema.keys()

  @classmethod
  def ValidateSchema(cls, field_dict):
    """Ensures the layout of field_dict matches the class schema specification.

    This should be run before data is coerced into objects. When the
    schema indicates an object class type, the corresponding schema
    for that class is applied to the corresponding subset of field
    data. Long story short, field_dict should not contain any
    objects.

    Args:
      field_dict: Data which must have layout and type matching schema.

    Raises:
      InvalidDataError: If field_dict does not match the schema.
    """
    def ValidateCollection(top_level_tag, collection_type_data,
                           collection_data):
      # Validate a (collection type, field type) tuple schema element.
      if len(collection_type_data) != 2:
        raise InvalidDataError(
            '%r schema contains bad type definiton for element %r, ' %
            (cls.__name__, top_level_tag) +
            'expected (collection type, field type) tuple, '
            'found %s' % repr(collection_type_data))
      collection_type, field_type = collection_type_data
      if collection_type not in [dict, list]:
        raise InvalidDataError(
            '%r schema element %r has illegal collection type %r ' %
            (cls.__name__, top_level_tag, collection_type.__name__) +
            '(only "dict" and "list" are supported)')
      if not isinstance(collection_data, collection_type):
        raise InvalidDataError(
            '%r schema validation failed for element %r, ' %
            (cls.__name__, top_level_tag) +
            'expected type %r, found %r' %
            (collection_type.__name__, type(collection_data).__name__))
      if collection_type is dict:
        for field_key, field_data in collection_data.items():
          if not (isinstance(field_key, str) or isinstance(field_key, int)):
            raise InvalidDataError(
                '%r schema validation failed for element %r, ' %
                (cls.__name__, top_level_tag) +
                'dict key must be "str" or "int", found %r' %
                (type(field_key).__name__))
          ValidateField(top_level_tag, field_type, field_data)
      elif collection_type is list:
        for field_data in collection_data:
          ValidateField(top_level_tag, field_type, field_data)
    def ValidateField(top_level_tag, field_type, field_data):
      # Validate one schema element (tuple, alternation list, nested
      # datastore class, or plain type) against field_data.
      if isinstance(field_type, tuple):
        ValidateCollection(top_level_tag, field_type, field_data)
        return
      if isinstance(field_type, list):
        # Type alternation: accept the first subtype that validates.
        for field_subtype in field_type:
          try:
            ValidateField(top_level_tag, field_subtype, field_data)
            return
          except InvalidDataError:
            continue
      elif issubclass(field_type, _DatastoreBase):
        field_type.ValidateSchema(field_data)
        return
      elif isinstance(field_data, field_type):
        return
      raise InvalidDataError(
          '%r schema validation failed for element %r, expected type %r, found %r'
          % (cls.__name__, top_level_tag,
             field_type.__name__, type(field_data).__name__))
    # Keys must match the schema exactly -- no missing and no extra keys.
    if (set(cls._schema) ^ set(field_dict)):
      raise InvalidDataError(
          '%r schema and data dict keys do not match, ' % cls.__name__ +
          'data is missing keys: %r, ' %
          sorted(set(cls._schema) - set(field_dict)) +
          'data has extra keys: %r' %
          sorted(set(field_dict) - set(cls._schema)))
    for top_level_tag, field_type in cls._schema.items():
      ValidateField(top_level_tag, field_type, field_dict[top_level_tag])
| |
| |
def MakeDatastoreClass(class_name, class_schema):
  """Define storable object type with a schema and yaml representer.

  The new object is defined in the module scope of the calling
  function. The yaml representer is added so that yaml calls the
  appropriate Yrepr function for the object, instead of the generic
  tagged python object format.

  Args:
    class_name: Name of the class to be defined.
    class_schema: A dict describing the class schema, which will
      be enforced whenever the class data is written to the backing
      store. The dict must contain a key for each data field, and the
      corresponding value is the type of that field ; value types can
      be singleton python types, or they can be 2-tuples. For tuples,
      the lhs must be either dict or list, and the rhs can be either a
      singleton or recursively another tuple. The keys for dicts are
      implicitly enforced to always be of type str. There must be a
      field called 'name', which is used as an index by the backing
      store.
  Returns:
    Nothing.
  """
  # Attach the schema at class-creation time rather than via setattr.
  new_class = type(class_name, (_DatastoreBase,), {'_schema': class_schema})
  # Inject the class into the caller's module namespace.
  caller_frame = inspect.stack()[1][0]
  setattr(inspect.getmodule(caller_frame), class_name, new_class)
  yaml.add_representer(new_class,
                       lambda yaml_repr, obj: obj.Yrepr(yaml_repr))