#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Bulkloader Config Parser and runner.
A library to read bulkloader yaml configs.
The code to interface between the bulkloader tool and the various connectors
and conversions.
"""
import copy
import os
import sys
from google.appengine.api import datastore
from google.appengine.ext.bulkload import bulkloader_errors
from google.appengine.ext.bulkload import bulkloader_parser
from google.appengine.ext.bulkload import csv_connector
from google.appengine.ext.bulkload import simpletext_connector
from google.appengine.ext.bulkload import simplexml_connector
CONNECTOR_FACTORIES = {
'csv': csv_connector.CsvConnector.create_from_options,
'simplexml': simplexml_connector.SimpleXmlConnector.create_from_options,
'simpletext': simpletext_connector.SimpleTextConnector.create_from_options,
}
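# The 'connector' field of each transformer entry in bulkloader.yaml is looked
# up in CONNECTOR_FACTORIES by create_transformer_classes below.  An
# illustrative (assumed) fragment selecting the built-in CSV connector:
#
#   transformers:
#   - kind: Person
#     connector: csv    # a key of CONNECTOR_FACTORIES, or a dotted factory
#                       # path handled by create_transformer_classes below
#     property_map:
#       ...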
class BulkloadState(object):
"""Encapsulates state which is passed to other methods used in bulk loading.
It is optionally passed to import/export transform functions.
It is passed to connector objects.
Properties:
filename: The filename flag passed on the command line.
loader_opts: The loader_opts flag passed on the command line.
exporter_opts: The exporter_opts flag passed on the command line.
current_instance: The current entity or model instance.
current_entity: On export, the current entity instance.
current_dictionary: The current input or output dictionary.
"""
def __init__(self):
self.filename = ''
self.loader_opts = None
self.exporter_opts = None
self.current_instance = None
self.current_entity = None
self.current_dictionary = None
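# Illustrative sketch (not part of the original module): a user-defined
# import or export transform referenced from bulkloader.yaml may accept the
# BulkloadState; DictConvertor passes bulkload_state= as a keyword argument
# only when the parsed transform reports supports_bulkload_state.
#
#   def prefix_with_filename(value, bulkload_state):  # hypothetical helper
#     return '%s:%s' % (bulkload_state.filename, value)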
def default_export_transform(value):
"""A default export transform if nothing else is specified.
  We assume most export connectors are string-based, so a string cast is used.
  However, casting None to a string yields 'None', so None is special-cased.
Args:
value: A value of some type.
Returns:
unicode(value), or u'' if value is None
"""
if value is None:
return u''
else:
return unicode(value)
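# For example, with the default transform above:
#   default_export_transform(None) -> u''
#   default_export_transform(42)   -> u'42'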
class DictConvertor(object):
"""Convert a dict to an App Engine model instance or entity. And back.
The constructor takes a transformer spec representing a single transformer
in a bulkloader.yaml.
The DictConvertor object has two public methods, dict_to_entity and
entity_to_dict, which do the conversion between a neutral dictionary (the
input/output of a connector) and an entity based on the spec.
  Note that the model class may be used instead of an entity during the
  transform; this adds extra validation, etc., but also incurs a performance hit.
"""
def __init__(self, transformer_spec):
"""Constructor. See class docstring for more info.
Args:
transformer_spec: A single transformer from a parsed bulkloader.yaml.
        This assumes that the transformer_spec is valid. It does not
        double-check things like use_model_on_export requiring a model.
"""
self._transformer_spec = transformer_spec
self._create_key = None
for prop in self._transformer_spec.property_map:
if prop.property == '__key__':
self._create_key = prop
def dict_to_entity(self, input_dict, bulkload_state):
"""Transform the dict to a model or entity instance(s).
Args:
input_dict: Neutral input dictionary describing a single input record.
bulkload_state: bulkload_state object describing the state.
Returns:
Entity or model instance, or collection of entity or model instances,
to be uploaded.
"""
bulkload_state_copy = copy.copy(bulkload_state)
bulkload_state_copy.current_dictionary = input_dict
instance = self.__create_instance(input_dict, bulkload_state_copy)
bulkload_state_copy.current_instance = instance
self.__run_import_transforms(input_dict, instance, bulkload_state_copy)
if self._transformer_spec.post_import_function:
post_map_instance = self._transformer_spec.post_import_function(
input_dict, instance, bulkload_state_copy)
return post_map_instance
return instance
def entity_to_dict(self, entity, bulkload_state):
"""Transform the entity to a dict, possibly via a model.
Args:
entity: An entity.
bulkload_state: bulkload_state object describing the global state.
Returns:
A neutral output dictionary describing the record to write to the
output.
In the future this may return zero or multiple output dictionaries.
"""
if self._transformer_spec.use_model_on_export:
instance = self._transformer_spec.model.from_entity(entity)
else:
instance = entity
export_dict = {}
bulkload_state.current_entity = entity
bulkload_state.current_instance = instance
bulkload_state.current_dictionary = export_dict
self.__run_export_transforms(instance, export_dict, bulkload_state)
if self._transformer_spec.post_export_function:
post_export_result = self._transformer_spec.post_export_function(
instance, export_dict, bulkload_state)
return post_export_result
return export_dict
def __dict_to_prop(self, transform, input_dict, bulkload_state):
"""Handle a single property on import.
Args:
transform: The transform spec for this property.
input_dict: Neutral input dictionary describing a single input record.
bulkload_state: bulkload_state object describing the global state.
Returns:
The value for this particular property.
"""
if transform.import_template:
value = transform.import_template % input_dict
else:
value = input_dict.get(transform.external_name)
if transform.import_transform:
if transform.import_transform.supports_bulkload_state:
value = transform.import_transform(value, bulkload_state=bulkload_state)
else:
value = transform.import_transform(value)
return value
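  # Worked example of the template branch above (illustrative values only):
  #   import_template = '%(first)s %(last)s'
  #   input_dict      = {'first': 'Ada', 'last': 'Lovelace'}
  #   value           = import_template % input_dict   # 'Ada Lovelace'
  # Any configured import_transform then runs on that interpolated string.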
def __create_instance(self, input_dict, bulkload_state):
"""Return a model instance or entity from an input_dict.
Args:
input_dict: Neutral input dictionary describing a single input record.
bulkload_state: bulkload_state object describing the global state.
Returns:
Entity or model instance, or collection of entity or model instances,
to be uploaded.
"""
key = None
if self._create_key:
key = self.__dict_to_prop(self._create_key, input_dict, bulkload_state)
if isinstance(key, (int, long)):
key = datastore.Key.from_path(self._transformer_spec.kind, key)
if self._transformer_spec.model:
if isinstance(key, datastore.Key):
return self._transformer_spec.model(key=key)
else:
return self._transformer_spec.model(key_name=key)
else:
if isinstance(key, datastore.Key):
parent = key.parent()
if key.name() is None:
return datastore.Entity(self._transformer_spec.kind,
parent=parent, id=key.id())
else:
return datastore.Entity(self._transformer_spec.kind,
parent=parent, name=key.name())
elif self._transformer_spec.model:
return self._transformer_spec.model()
return datastore.Entity(self._transformer_spec.kind, name=key)
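  # Key handling above, sketched with illustrative values:
  #   __key__ transform returns 7 (an int or long):
  #     key becomes datastore.Key.from_path(kind, 7), yielding
  #     model(key=key) or datastore.Entity(kind, id=7).
  #   __key__ transform returns 'alice' (a string):
  #     yields model(key_name='alice') or datastore.Entity(kind, name='alice').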
def __run_import_transforms(self, input_dict, instance, bulkload_state):
"""Fill in a single entity or model instance from an input_dict.
Args:
input_dict: Input dict from the connector object.
instance: Entity or model instance to fill in.
bulkload_state: Passed bulkload state.
"""
for transform in self._transformer_spec.property_map:
if transform.property == '__key__':
continue
value = self.__dict_to_prop(transform, input_dict, bulkload_state)
if self._transformer_spec.model:
setattr(instance, transform.property, value)
else:
instance[transform.property] = value
def __prop_to_dict(self, value, property_name, transform, export_dict,
bulkload_state):
"""Transform a single export-side field value to dict property.
Args:
value: Value from the entity or model instance.
property_name: Name of the value in the entity or model instance.
      transform: Transform property, either an ExportEntry or a PropertyEntry.
export_dict: output dictionary.
bulkload_state: Passed bulkload state.
Raises:
ErrorOnTransform, encapsulating an error encountered during the transform.
"""
if transform.export_transform:
try:
if transform.export_transform.supports_bulkload_state:
transformed_value = transform.export_transform(
value, bulkload_state=bulkload_state)
else:
transformed_value = transform.export_transform(value)
except Exception, err:
raise bulkloader_errors.ErrorOnTransform(
'Error on transform. '
'Property: %s External Name: %s. Code: %s Details: %s' %
(property_name, transform.external_name, transform.export_transform,
err))
else:
transformed_value = default_export_transform(value)
export_dict[transform.external_name] = transformed_value
def __run_export_transforms(self, instance, export_dict, bulkload_state):
"""Fill in export_dict for an entity or model instance.
Args:
instance: Entity or model instance
export_dict: output dictionary.
bulkload_state: Passed bulkload state.
"""
for transform in self._transformer_spec.property_map:
if transform.property == '__key__':
value = instance.key()
elif self._transformer_spec.use_model_on_export:
value = getattr(instance, transform.property, transform.default_value)
else:
value = instance.get(transform.property, transform.default_value)
if transform.export:
for prop in transform.export:
self.__prop_to_dict(value, transform.property, prop, export_dict,
bulkload_state)
elif transform.external_name:
self.__prop_to_dict(value, transform.property, transform, export_dict,
bulkload_state)
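# Illustrative (assumed) transformer entry of the kind DictConvertor
# interprets; the helpers shown come from
# google.appengine.ext.bulkload.transform and are examples, not requirements:
#
#   - kind: Person
#     connector: csv
#     property_map:
#       - property: __key__
#         external_name: key
#         export_transform: transform.key_id_or_name_as_string
#       - property: name
#         external_name: name
#       - property: birthday
#         external_name: birthday
#         import_transform: transform.import_date_time('%Y-%m-%d')
#         export_transform: transform.export_date_time('%Y-%m-%d')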
class GenericImporter(object):
"""Generic Bulkloader import class for input->dict->model transformation.
The bulkloader will call generate_records and create_entity, and
we'll delegate those to the passed in methods.
"""
def __init__(self, import_record_iterator, dict_to_entity, name,
reserve_keys):
"""Constructor.
Args:
import_record_iterator: Method which yields neutral dictionaries.
dict_to_entity: Method dict_to_entity(input_dict) returns model or entity
instance(s).
name: Name to register with the bulkloader importers (as 'kind').
reserve_keys: Method ReserveKeys(keys) which will advance the id
sequence in the datastore beyond each key.id(). Can be None.
"""
self.import_record_iterator = import_record_iterator
self.dict_to_entity = dict_to_entity
self.kind = name
self.bulkload_state = BulkloadState()
self.reserve_keys = reserve_keys
self.keys_to_reserve = []
def get_keys_to_reserve(self):
"""Required as part of the bulkloader Loader interface.
At the moment, this is not actually used by the bulkloader for import;
instead we will reserve keys if necessary in finalize.
Returns:
List of keys to reserve, currently always [].
"""
return []
def initialize(self, filename, loader_opts):
"""Performs initialization. Merely records the values for later use.
Args:
filename: The string given as the --filename flag argument.
loader_opts: The string given as the --loader_opts flag argument.
"""
self.bulkload_state.loader_opts = loader_opts
self.bulkload_state.filename = filename
def finalize(self):
"""Performs finalization actions after the upload completes.
If keys with numeric ids were used on import, this will call AllocateIds
to ensure that autogenerated IDs will not raise exceptions on conflict
with uploaded entities.
"""
if self.reserve_keys:
self.reserve_keys(self.keys_to_reserve)
def generate_records(self, filename):
"""Iterator yielding neutral dictionaries from the connector object.
Args:
filename: Filename argument passed in on the command line.
Returns:
Iterator yielding neutral dictionaries, later passed to create_entity.
"""
return self.import_record_iterator(filename, self.bulkload_state)
def generate_key(self, line_number, unused_values):
"""Bulkloader method to generate keys, mostly unused here.
This is called by the bulkloader just before it calls create_entity. The
line_number is returned to be passed to the record dict, but otherwise
unused.
Args:
line_number: Record number from the bulkloader.
unused_values: Neutral dict from generate_records; unused.
Returns:
line_number for use later on.
"""
return line_number
def __reserve_entity_key(self, entity):
"""Collect entity key to be reserved if it has a numeric id in its path.
Keys to reserve are stored in self.keys_to_reserve.
They are not tracked if self.reserve_keys is None.
Args:
entity: An entity with a key.
"""
if not self.reserve_keys:
return
if isinstance(entity, datastore.Entity):
if not entity.key():
return
elif not entity.has_key():
return
key = entity.key()
if not key.has_id_or_name():
return
for id_or_name in key.to_path()[1::2]:
if isinstance(id_or_name, (int, long)):
self.keys_to_reserve.append(key)
return
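  # Example of the numeric-id check above: a key built as
  #   datastore.Key.from_path('Parent', 7, 'Person', 'alice')
  # has to_path() == ['Parent', 7, 'Person', 'alice']; the [1::2] slice yields
  # (7, 'alice'), and the numeric 7 queues the key in self.keys_to_reserve.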
def create_entity(self, values, key_name=None, parent=None):
"""Creates entity/entities from input values via the dict_to_entity method.
Args:
values: Neutral dict from generate_records.
key_name: record number from generate_key.
parent: Always None in this implementation of a Loader.
Returns:
Entity or model instance, or collection of entity or model instances,
to be uploaded.
"""
input_dict = values
input_dict['__record_number__'] = key_name
entity = self.dict_to_entity(input_dict, self.bulkload_state)
self.__reserve_entity_key(entity)
return entity
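# Sketch of the call sequence the bulkloader tool drives on a GenericImporter,
# reconstructed from the method docstrings above rather than from the tool:
#
#   importer = importer_class()          # see create_transformer_classes
#   importer.initialize(filename, loader_opts)
#   for record in importer.generate_records(filename):
#     key = importer.generate_key(line_number, record)
#     entity = importer.create_entity(record, key_name=key)
#     ...                                # the tool uploads entity
#   importer.finalize()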
class GenericExporter(object):
"""Implements bulkloader.Exporter interface and delegates.
This will delegate to the passed in entity_to_dict method and the
methods on the export_recorder which are in the ConnectorInterface.
"""
def __init__(self, export_recorder, entity_to_dict, kind,
sort_key_from_entity):
"""Constructor.
Args:
export_recorder: Object which writes results, an implementation of
ConnectorInterface.
entity_to_dict: Method which converts a single entity to a neutral dict.
kind: Kind to identify this object to the bulkloader.
sort_key_from_entity: Optional method to return a sort key for each
entity. This key will be used to sort the downloaded entities before
        passing them to entity_to_dict.
"""
self.export_recorder = export_recorder
self.entity_to_dict = entity_to_dict
self.kind = kind
self.sort_key_from_entity = sort_key_from_entity
self.calculate_sort_key_from_entity = bool(sort_key_from_entity)
self.bulkload_state = BulkloadState()
def initialize(self, filename, exporter_opts):
"""Performs initialization and validation of the output file.
Args:
filename: The string given as the --filename flag argument.
exporter_opts: The string given as the --exporter_opts flag argument.
"""
self.bulkload_state.filename = filename
self.bulkload_state.exporter_opts = exporter_opts
self.export_recorder.initialize_export(filename, self.bulkload_state)
def output_entities(self, entity_iterator):
"""Outputs the downloaded entities.
Args:
entity_iterator: An iterator that yields the downloaded entities
in sorted order.
"""
for entity in entity_iterator:
output_dict = self.entity_to_dict(entity, self.bulkload_state)
if output_dict:
self.export_recorder.write_dict(output_dict)
def finalize(self):
"""Performs finalization actions after the download completes."""
self.export_recorder.finalize_export()
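# The corresponding export-side sequence, again reconstructed from the
# docstrings rather than from the bulkloader tool itself:
#
#   exporter = exporter_class()
#   exporter.initialize(filename, exporter_opts)
#   exporter.output_entities(downloaded_entities)  # iterator, already sorted
#   exporter.finalize()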
def create_transformer_classes(transformer_spec, config_globals, reserve_keys):
"""Create an importer and exporter class from a transformer spec.
Args:
transformer_spec: A bulkloader_parser.TransformerEntry.
config_globals: Dict to use to reference globals for code in the config.
reserve_keys: Method ReserveKeys(keys) which will advance the id
sequence in the datastore beyond each key.id(). Can be None.
Raises:
InvalidConfig: when the config is invalid.
Returns:
    Tuple, (importer class, exporter class), each of which is a wrapper
    for the GenericImporter/GenericExporter class using a DictConvertor object
    configured as per the transformer_spec.
"""
if transformer_spec.connector in CONNECTOR_FACTORIES:
connector_factory = CONNECTOR_FACTORIES[transformer_spec.connector]
elif config_globals and '.' in transformer_spec.connector:
try:
connector_factory = eval(transformer_spec.connector, config_globals)
except (NameError, AttributeError):
raise bulkloader_errors.InvalidConfiguration(
'Invalid connector specified for name=%s. Could not evaluate %s.' %
(transformer_spec.name, transformer_spec.connector))
else:
raise bulkloader_errors.InvalidConfiguration(
'Invalid connector specified for name=%s. Must be either a built in '
'connector ("%s") or a factory method in a module imported via '
'python_preamble.' %
(transformer_spec.name, '", "'.join(CONNECTOR_FACTORIES)))
options = {}
if transformer_spec.connector_options:
options = transformer_spec.connector_options.ToDict()
try:
connector_object = connector_factory(options, transformer_spec.name)
except TypeError:
raise bulkloader_errors.InvalidConfiguration(
'Invalid connector specified for name=%s. Could not initialize %s.' %
(transformer_spec.name, transformer_spec.connector))
dict_to_model_object = DictConvertor(transformer_spec)
class ImporterClass(GenericImporter):
"""Class to pass to the bulkloader, wraps the specificed configuration."""
def __init__(self):
super(self.__class__, self).__init__(
connector_object.generate_import_record,
dict_to_model_object.dict_to_entity,
transformer_spec.name,
reserve_keys)
importer_class = ImporterClass
class ExporterClass(GenericExporter):
"""Class to pass to the bulkloader, wraps the specificed configuration."""
def __init__(self):
super(self.__class__, self).__init__(
connector_object,
dict_to_model_object.entity_to_dict,
transformer_spec.kind,
transformer_spec.sort_key_from_entity)
exporter_class = ExporterClass
return importer_class, exporter_class
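# Connectors outside CONNECTOR_FACTORIES are named by a dotted path that is
# eval'd against config_globals (populated by python_preamble imports in
# bulkloader.yaml).  A minimal sketch of such a factory, using hypothetical
# names and only the connector methods this module actually calls:
#
#   class MyConnector(object):
#     @classmethod
#     def create_from_options(cls, options, name):
#       return cls()
#     def generate_import_record(self, filename, bulkload_state):
#       ...  # yield one neutral dict per input record
#     def initialize_export(self, filename, bulkload_state):
#       ...
#     def write_dict(self, dictionary):
#       ...  # write one output record
#     def finalize_export(self):
#       ...
#
# and referenced as, for example:
#   connector: mymodule.MyConnector.create_from_options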
def load_config_from_stream(stream, reserve_keys=None):
"""Parse a bulkloader.yaml file into bulkloader loader classes.
Args:
stream: A stream containing bulkloader.yaml data.
reserve_keys: Method ReserveKeys(keys) which will advance the id
sequence in the datastore beyond each key.id(). Can be None.
Returns:
importer_classes, exporter_classes: Constructors suitable to pass to the
bulkloader.
"""
config_globals = {}
config = bulkloader_parser.load_config(stream, config_globals)
importer_classes = []
exporter_classes = []
for transformer in config.transformers:
importer, exporter = create_transformer_classes(transformer, config_globals,
reserve_keys)
if importer:
importer_classes.append(importer)
if exporter:
exporter_classes.append(exporter)
return importer_classes, exporter_classes
def load_config(filename, update_path=True, reserve_keys=None):
"""Load a configuration file and create importer and exporter classes.
Args:
filename: Filename of bulkloader.yaml.
update_path: Should sys.path be extended to include the path of filename?
reserve_keys: Method ReserveKeys(keys) which will advance the id
sequence in the datastore beyond each key.id(). Can be None.
Returns:
Tuple, (importer classes, exporter classes) based on the transformers
specified in the file.
"""
if update_path:
sys.path.append(os.path.abspath(os.path.dirname(os.path.abspath(filename))))
stream = file(filename, 'r')
try:
return load_config_from_stream(stream, reserve_keys)
finally:
stream.close()
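# Typical use of this module (a sketch; registering the returned classes with
# the bulkloader tool is up to the caller):
#
#   importer_classes, exporter_classes = load_config('bulkloader.yaml')
#   for importer_class in importer_classes:
#     ...  # register with the bulkloader as a Loader
#   for exporter_class in exporter_classes:
#     ...  # register with the bulkloader as an Exporter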