| #!/usr/bin/env python |
| # |
| # Copyright 2007 Google Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| |
| |
| |
| """Asynchronous datastore API. |
| |
| This is designed to be the lowest-level API to be used by all Python |
| datastore client libraries. |
| |
| A refactoring is in progress to rebuild datastore.py on top of this, |
| while remaining nearly 100% backwards compatible. A new (not intended |
| to be compatible) library to replace db.py is also under development. |
| """ |
| |
| |
| |
| |
| |
| |
| |
| |
| __all__ = ['AbstractAdapter', |
| 'BaseConfiguration', |
| 'BaseConnection', |
| 'ConfigOption', |
| 'Configuration', |
| 'Connection', |
| 'IdentityAdapter', |
| 'MultiRpc', |
| 'TransactionalConnection', |
| 'TransactionOptions', |
| ] |
| |
| |
| |
| |
| import collections |
| import copy |
| import functools |
| import logging |
| |
| |
| from google.appengine.datastore import entity_pb |
| |
| |
| from google.appengine.api import api_base_pb |
| from google.appengine.api import apiproxy_rpc |
| from google.appengine.api import apiproxy_stub_map |
| |
| from google.appengine.api import datastore_errors |
| from google.appengine.api import datastore_types |
| |
| from google.appengine.api.app_identity import app_identity |
| from google.appengine.datastore import datastore_pb |
| from google.appengine.datastore import datastore_pbs |
| from google.appengine.datastore import datastore_v4_pb |
| from google.appengine.datastore import entity_v4_pb |
| from google.appengine.runtime import apiproxy_errors |
| |
| |
| |
| |
| |
| _MAX_ID_BATCH_SIZE = 1000 * 1000 * 1000 |
| |
| |
| _DATASTORE_V3 = 'datastore_v3' |
| _DATASTORE_V4 = 'datastore_v4' |
| |
| |
| |
| def _positional(max_pos_args): |
| """A decorator to declare that only the first N arguments may be positional. |
| |
| Note that for methods, n includes 'self'. |
| """ |
| def positional_decorator(wrapped): |
| @functools.wraps(wrapped) |
| def positional_wrapper(*args, **kwds): |
| if len(args) > max_pos_args: |
| plural_s = '' |
| if max_pos_args != 1: |
| plural_s = 's' |
| raise TypeError( |
| '%s() takes at most %d positional argument%s (%d given)' % |
| (wrapped.__name__, max_pos_args, plural_s, len(args))) |
| return wrapped(*args, **kwds) |
| return positional_wrapper |
| return positional_decorator |
| |
| |
| def _GetDatastoreType(app=None): |
| """Tries to get the datastore type for the given app. |
| |
| This function is only guaranteed to return something other than |
| UNKNOWN_DATASTORE when running in production and querying the current app. |
| """ |
| current_app = datastore_types.ResolveAppId(None) |
| if app not in (current_app, None): |
| return BaseConnection.UNKNOWN_DATASTORE |
| |
| |
| |
| |
| partition, _, _ = app_identity._ParseFullAppId(current_app) |
| if partition: |
| return BaseConnection.HIGH_REPLICATION_DATASTORE |
| return BaseConnection.MASTER_SLAVE_DATASTORE |
| |
| |
| class AbstractAdapter(object): |
| """Abstract interface between protobufs and user-level classes. |
| |
| This class defines conversions between the protobuf classes defined |
| in entity_pb.py on the one hand, and the corresponding user-level |
| classes (which are defined by higher-level API libraries such as |
| datastore.py or db.py) on the other hand. |
| |
| The premise is that the code in this module is agnostic about the |
| user-level classes used to represent keys and entities, while at the |
| same time provinging APIs that accept or return such user-level |
| classes. |
| |
| Higher-level libraries must subclass this abstract class and pass an |
| instance of the subclass to the Connection they want to use. |
| |
| These methods may raise datastore_errors.Error for bad inputs. |
| """ |
| |
| def pb_to_key(self, pb): |
| """Turn an entity_pb.Reference into a user-level key.""" |
| raise NotImplementedError |
| |
| def pb_v4_to_key(self, pb): |
| """Turn an entity_v4_pb.Key into a user-level key.""" |
| v3_ref = entity_pb.Reference() |
| datastore_pbs.get_entity_converter().v4_to_v3_reference(pb, v3_ref) |
| return self.pb_to_key(v3_ref) |
| |
| def pb_to_entity(self, pb): |
| """Turn an entity_pb.EntityProto into a user-level entity.""" |
| raise NotImplementedError |
| |
| def pb_v4_to_entity(self, pb): |
| """Turn an entity_v4_pb.Entity into a user-level entity.""" |
| v3_entity = entity_pb.EntityProto() |
| datastore_pbs.get_entity_converter().v4_to_v3_entity(pb, v3_entity) |
| return self.pb_to_entity(v3_entity) |
| |
| def pb_to_index(self, pb): |
| """Turn an entity_pb.CompositeIndex into a user-level Index |
| representation.""" |
| raise NotImplementedError |
| |
| def pb_to_query_result(self, pb, query_options): |
| """Turn an entity_pb.EntityProto into a user-level query result.""" |
| if query_options.keys_only: |
| return self.pb_to_key(pb.key()) |
| else: |
| return self.pb_to_entity(pb) |
| |
| def key_to_pb(self, key): |
| """Turn a user-level key into an entity_pb.Reference.""" |
| raise NotImplementedError |
| |
| def key_to_pb_v4(self, key): |
| """Turn a user-level key into an entity_v4_pb.Key.""" |
| v3_ref = self.key_to_pb(key) |
| v4_key = entity_v4_pb.Key() |
| datastore_pbs.get_entity_converter().v3_to_v4_key(v3_ref, v4_key) |
| return v4_key |
| |
| def entity_to_pb(self, entity): |
| """Turn a user-level entity into an entity_pb.EntityProto.""" |
| raise NotImplementedError |
| |
| def entity_to_pb_v4(self, entity): |
| """Turn a user-level entity into an entity_v4_pb.Key.""" |
| v3_entity = self.entity_to_pb(entity) |
| v4_entity = entity_v4_pb.Entity() |
| datastore_pbs.get_entity_converter().v3_to_v4_entity(v3_entity, v4_entity) |
| return v4_entity |
| |
| def new_key_pb(self): |
| """Create a new, empty entity_pb.Reference.""" |
| return entity_pb.Reference() |
| |
| def new_entity_pb(self): |
| """Create a new, empty entity_pb.EntityProto.""" |
| return entity_pb.EntityProto() |
| |
| |
| class IdentityAdapter(AbstractAdapter): |
| """A concrete adapter that implements the identity mapping. |
| |
| This is used as the default when a Connection is created without |
| specifying an adapter; that's primarily for testing. |
| """ |
| |
| def pb_to_key(self, pb): |
| return pb |
| |
| def pb_to_entity(self, pb): |
| return pb |
| |
| def key_to_pb(self, key): |
| return key |
| |
| def entity_to_pb(self, entity): |
| return entity |
| |
| def pb_to_index(self, pb): |
| return pb |
| |
| |
| class ConfigOption(object): |
| """A descriptor for a Configuration option. |
| |
| This class is used to create a configuration option on a class that inherits |
| from BaseConfiguration. A validator function decorated with this class will |
| be converted to a read-only descriptor and BaseConfiguration will implement |
| constructor and merging logic for that configuration option. A validator |
| function takes a single non-None value to validate and either throws |
| an exception or returns that value (or an equivalent value). A validator is |
| called once at construction time, but only if a non-None value for the |
| configuration option is specified the constructor's keyword arguments. |
| """ |
| |
| def __init__(self, validator): |
| self.validator = validator |
| |
| def __get__(self, obj, objtype): |
| if obj is None: |
| return self |
| return obj._values.get(self.validator.__name__, None) |
| |
| def __set__(self, obj, value): |
| raise AttributeError('Configuration options are immutable (%s)' % |
| (self.validator.__name__,)) |
| |
| def __call__(self, *args): |
| """Gets the first non-None value for this option from the given args. |
| |
| Args: |
| *arg: Any number of configuration objects or None values. |
| |
| Returns: |
| The first value for this ConfigOption found in the given configuration |
| objects or None. |
| |
| Raises: |
| datastore_errors.BadArgumentError if a given in object is not a |
| configuration object. |
| """ |
| name = self.validator.__name__ |
| for config in args: |
| |
| if isinstance(config, (type(None), apiproxy_stub_map.UserRPC)): |
| pass |
| elif not isinstance(config, BaseConfiguration): |
| raise datastore_errors.BadArgumentError( |
| 'invalid config argument (%r)' % (config,)) |
| elif name in config._values and self is config._options[name]: |
| return config._values[name] |
| return None |
| |
| |
| class _ConfigurationMetaClass(type): |
| """The metaclass for all Configuration types. |
| |
| This class is needed to store a class specific list of all ConfigOptions in |
| cls._options, and insert a __slots__ variable into the class dict before the |
| class is created to impose immutability. |
| """ |
| |
| def __new__(metaclass, classname, bases, classDict): |
| if classname == '_MergedConfiguration': |
| |
| return type.__new__(metaclass, classname, bases, classDict) |
| |
| |
| |
| if object in bases: |
| classDict['__slots__'] = ['_values'] |
| else: |
| classDict['__slots__'] = [] |
| cls = type.__new__(metaclass, classname, bases, classDict) |
| if object not in bases: |
| options = {} |
| for c in reversed(cls.__mro__): |
| if '_options' in c.__dict__: |
| options.update(c.__dict__['_options']) |
| cls._options = options |
| for option, value in cls.__dict__.iteritems(): |
| if isinstance(value, ConfigOption): |
| if cls._options.has_key(option): |
| raise TypeError('%s cannot be overridden (%s)' % |
| (option, cls.__name__)) |
| cls._options[option] = value |
| value._cls = cls |
| return cls |
| |
| |
| |
| |
| class BaseConfiguration(object): |
| """A base class for a configuration object. |
| |
| Subclasses should provide validation functions for every configuration option |
| they accept. Any public function decorated with ConfigOption is assumed to be |
| a validation function for an option of the same name. All validation functions |
| take a single non-None value to validate and must throw an exception or return |
| the value to store. |
| |
| This class forces subclasses to be immutable and exposes a read-only |
| property for every accepted configuration option. Configuration options set by |
| passing keyword arguments to the constructor. The constructor and merge |
| function are designed to avoid creating redundant copies and may return |
| the configuration objects passed to them if appropriate. |
| |
| Setting an option to None is the same as not specifying the option except in |
| the case where the 'config' argument is given. In this case the value on |
| 'config' of the same name is ignored. Options that are not specified will |
| return 'None' when accessed. |
| """ |
| |
| __metaclass__ = _ConfigurationMetaClass |
| _options = {} |
| |
| def __new__(cls, config=None, **kwargs): |
| """Immutable constructor. |
| |
| If 'config' is non-None all configuration options will default to the value |
| it contains unless the configuration option is explicitly set to 'None' in |
| the keyword arguments. If 'config' is None then all configuration options |
| default to None. |
| |
| Args: |
| config: Optional base configuration providing default values for |
| parameters not specified in the keyword arguments. |
| **kwargs: Configuration options to store on this object. |
| |
| Returns: |
| Either a new Configuration object or (if it would be equivalent) |
| the config argument unchanged, but never None. |
| """ |
| if config is None: |
| pass |
| elif isinstance(config, BaseConfiguration): |
| if cls is config.__class__ and config.__is_stronger(**kwargs): |
| |
| return config |
| |
| for key, value in config._values.iteritems(): |
| |
| if issubclass(cls, config._options[key]._cls): |
| kwargs.setdefault(key, value) |
| else: |
| raise datastore_errors.BadArgumentError( |
| 'config argument should be Configuration (%r)' % (config,)) |
| |
| obj = super(BaseConfiguration, cls).__new__(cls) |
| obj._values = {} |
| for key, value in kwargs.iteritems(): |
| if value is not None: |
| try: |
| config_option = obj._options[key] |
| except KeyError, err: |
| raise TypeError('Unknown configuration option (%s)' % err) |
| value = config_option.validator(value) |
| if value is not None: |
| obj._values[key] = value |
| return obj |
| |
| def __eq__(self, other): |
| if self is other: |
| return True |
| if not isinstance(other, BaseConfiguration): |
| return NotImplemented |
| return self._options == other._options and self._values == other._values |
| |
| def __ne__(self, other): |
| equal = self.__eq__(other) |
| if equal is NotImplemented: |
| return equal |
| return not equal |
| |
| def __hash__(self): |
| return (hash(frozenset(self._values.iteritems())) ^ |
| hash(frozenset(self._options.iteritems()))) |
| |
| def __repr__(self): |
| args = [] |
| for key_value in sorted(self._values.iteritems()): |
| args.append('%s=%r' % key_value) |
| return '%s(%s)' % (self.__class__.__name__, ', '.join(args)) |
| |
| def __is_stronger(self, **kwargs): |
| """Internal helper to ask whether a configuration is stronger than another. |
| |
| A configuration is stronger when it contains every name/value pair in |
| kwargs. |
| |
| Example: a configuration with: |
| (deadline=5, on_configuration=None, read_policy=EVENTUAL_CONSISTENCY) |
| is stronger than: |
| (deadline=5, on_configuration=None) |
| but not stronger than: |
| (deadline=5, on_configuration=None, read_policy=None) |
| or |
| (deadline=10, on_configuration=None, read_policy=None). |
| |
| More formally: |
| - Any value is stronger than an unset value; |
| - Any value is stronger than itself. |
| |
| Returns: |
| True if each of the self attributes is stronger than the |
| corresponding argument. |
| """ |
| for key, value in kwargs.iteritems(): |
| if key not in self._values or value != self._values[key]: |
| return False |
| return True |
| |
| @classmethod |
| def is_configuration(cls, obj): |
| """True if configuration obj handles all options of this class. |
| |
| Use this method rather than isinstance(obj, cls) to test if a |
| configuration object handles the options of cls (is_configuration |
| is handled specially for results of merge which may handle the options |
| of unrelated configuration classes). |
| |
| Args: |
| obj: the object to test. |
| """ |
| return isinstance(obj, BaseConfiguration) and obj._is_configuration(cls) |
| |
| def _is_configuration(self, cls): |
| return isinstance(self, cls) |
| |
| def merge(self, config): |
| """Merge two configurations. |
| |
| The configuration given as an argument (if any) takes priority; |
| defaults are filled in from the current configuration. |
| |
| Args: |
| config: Configuration providing overrides, or None (but cannot |
| be omitted). |
| |
| Returns: |
| Either a new configuration object or (if it would be equivalent) |
| self or the config argument unchanged, but never None. |
| |
| Raises: |
| BadArgumentError if self or config are of configurations classes |
| with conflicting options (i.e. the same option name defined in |
| two different configuration classes). |
| """ |
| if config is None or config is self: |
| |
| return self |
| |
| |
| |
| if not (isinstance(config, _MergedConfiguration) or |
| isinstance(self, _MergedConfiguration)): |
| |
| |
| |
| if isinstance(config, self.__class__): |
| for key in self._values: |
| if key not in config._values: |
| break |
| else: |
| return config |
| if isinstance(self, config.__class__): |
| if self.__is_stronger(**config._values): |
| return self |
| |
| |
| def _quick_merge(obj): |
| obj._values = self._values.copy() |
| obj._values.update(config._values) |
| return obj |
| |
| if isinstance(config, self.__class__): |
| return _quick_merge(type(config)()) |
| if isinstance(self, config.__class__): |
| return _quick_merge(type(self)()) |
| |
| |
| return _MergedConfiguration(config, self) |
| |
| def __getstate__(self): |
| return {'_values': self._values} |
| |
| def __setstate__(self, state): |
| |
| |
| obj = self.__class__(**state['_values']) |
| self._values = obj._values |
| |
| |
| class _MergedConfiguration(BaseConfiguration): |
| """Helper class to handle merges of configurations. |
| |
| Instances of _MergedConfiguration are in some sense "subclasses" of the |
| argument configurations, i.e.: |
| - they handle exactly the configuration options of the argument configurations |
| - the value of these options is taken in priority order from the arguments |
| - isinstance is true on this configuration if it is true on any of the |
| argument configurations |
| This class raises an exception if two argument configurations have an option |
| with the same name but coming from a different configuration class. |
| """ |
| __slots__ = ['_values', '_configs', '_options', '_classes'] |
| |
| def __new__(cls, *configs): |
| obj = super(BaseConfiguration, cls).__new__(cls) |
| obj._configs = configs |
| |
| |
| obj._options = {} |
| for config in configs: |
| for name, option in config._options.iteritems(): |
| if name in obj._options: |
| if option is not obj._options[name]: |
| error = ("merge conflict on '%s' from '%s' and '%s'" % |
| (name, option._cls.__name__, |
| obj._options[name]._cls.__name__)) |
| raise datastore_errors.BadArgumentError(error) |
| obj._options[name] = option |
| |
| obj._values = {} |
| for config in reversed(configs): |
| for name, value in config._values.iteritems(): |
| obj._values[name] = value |
| |
| return obj |
| |
| def __repr__(self): |
| return '%s%r' % (self.__class__.__name__, tuple(self._configs)) |
| |
| def _is_configuration(self, cls): |
| for config in self._configs: |
| if config._is_configuration(cls): |
| return True |
| return False |
| |
| def __getattr__(self, name): |
| if name in self._options: |
| if name in self._values: |
| return self._values[name] |
| else: |
| return None |
| raise AttributeError("Configuration has no attribute '%s'" % (name,)) |
| |
| def __getstate__(self): |
| return {'_configs': self._configs} |
| |
| def __setstate__(self, state): |
| |
| obj = _MergedConfiguration(*state['_configs']) |
| self._values = obj._values |
| self._configs = obj._configs |
| self._options = obj._options |
| |
| |
| class Configuration(BaseConfiguration): |
| """Configuration parameters for datastore RPCs. |
| |
| This class reserves the right to define configuration options of any name |
| except those that start with 'user_'. External subclasses should only define |
| function or variables with names that start with in 'user_'. |
| |
| The options defined on this class include generic RPC parameters (deadline) |
| but also datastore-specific parameters (on_completion and read_policy). |
| |
| Options are set by passing keyword arguments to the constructor corresponding |
| to the configuration options defined below. |
| """ |
| |
| |
| STRONG_CONSISTENCY = 0 |
| """A read consistency that will return up to date results.""" |
| |
| EVENTUAL_CONSISTENCY = 1 |
| """A read consistency that allows requests to return possibly stale results. |
| |
| This read_policy tends to be faster and less prone to unavailability/timeouts. |
| May return transactionally inconsistent results in rare cases. |
| """ |
| |
| APPLY_ALL_JOBS_CONSISTENCY = 2 |
| """A read consistency that aggressively tries to find write jobs to apply. |
| |
| Use of this read policy is strongly discouraged. |
| |
| This read_policy tends to be more costly and is only useful in a few specific |
| cases. It is equivalent to splitting a request by entity group and wrapping |
| each batch in a separate transaction. Cannot be used with non-ancestor |
| queries. |
| """ |
| |
| |
| ALL_READ_POLICIES = frozenset((STRONG_CONSISTENCY, |
| EVENTUAL_CONSISTENCY, |
| APPLY_ALL_JOBS_CONSISTENCY, |
| )) |
| |
| |
| |
| @ConfigOption |
| def deadline(value): |
| """The deadline for any RPC issued. |
| |
| If unset the system default will be used which is typically 5 seconds. |
| |
| Raises: |
| BadArgumentError if value is not a number or is less than zero. |
| """ |
| if not isinstance(value, (int, long, float)): |
| raise datastore_errors.BadArgumentError( |
| 'deadline argument should be int/long/float (%r)' % (value,)) |
| if value <= 0: |
| raise datastore_errors.BadArgumentError( |
| 'deadline argument should be > 0 (%r)' % (value,)) |
| return value |
| |
| @ConfigOption |
| def on_completion(value): |
| """A callback that is invoked when any RPC completes. |
| |
| If specified, it will be called with a UserRPC object as argument when an |
| RPC completes. |
| |
| NOTE: There is a subtle but important difference between |
| UserRPC.callback and Configuration.on_completion: on_completion is |
| called with the RPC object as its first argument, where callback is |
| called without arguments. (Because a Configuration's on_completion |
| function can be used with many UserRPC objects, it would be awkward |
| if it was called without passing the specific RPC.) |
| """ |
| |
| |
| return value |
| |
| @ConfigOption |
| def read_policy(value): |
| """The read policy to use for any relevent RPC. |
| |
| if unset STRONG_CONSISTENCY will be used. |
| |
| Raises: |
| BadArgumentError if value is not a known read policy. |
| """ |
| if value not in Configuration.ALL_READ_POLICIES: |
| raise datastore_errors.BadArgumentError( |
| 'read_policy argument invalid (%r)' % (value,)) |
| return value |
| |
| @ConfigOption |
| def force_writes(value): |
| """If a write request should succeed even if the app is read-only. |
| |
| This only applies to user controlled read-only periods. |
| """ |
| if not isinstance(value, bool): |
| raise datastore_errors.BadArgumentError( |
| 'force_writes argument invalid (%r)' % (value,)) |
| return value |
| |
| @ConfigOption |
| def max_entity_groups_per_rpc(value): |
| """The maximum number of entity groups that can be represented in one rpc. |
| |
| For a non-transactional operation that involves more entity groups than the |
| maximum, the operation will be performed by executing multiple, asynchronous |
| rpcs to the datastore, each of which has no more entity groups represented |
| than the maximum. So, if a put() operation has 8 entity groups and the |
| maximum is 3, we will send 3 rpcs, 2 with 3 entity groups and 1 with 2 |
| entity groups. This is a performance optimization - in many cases |
| multiple, small, concurrent rpcs will finish faster than a single large |
| rpc. The optimal value for this property will be application-specific, so |
| experimentation is encouraged. |
| """ |
| if not (isinstance(value, (int, long)) and value > 0): |
| raise datastore_errors.BadArgumentError( |
| 'max_entity_groups_per_rpc should be a positive integer') |
| return value |
| |
| @ConfigOption |
| def max_allocate_ids_keys(value): |
| """The maximum number of keys in a v4 AllocateIds rpc.""" |
| if not (isinstance(value, (int, long)) and value > 0): |
| raise datastore_errors.BadArgumentError( |
| 'max_allocate_ids_keys should be a positive integer') |
| return value |
| |
| @ConfigOption |
| def max_rpc_bytes(value): |
| """The maximum serialized size of a Get/Put/Delete without batching.""" |
| if not (isinstance(value, (int, long)) and value > 0): |
| raise datastore_errors.BadArgumentError( |
| 'max_rpc_bytes should be a positive integer') |
| return value |
| |
| @ConfigOption |
| def max_get_keys(value): |
| """The maximum number of keys in a Get without batching.""" |
| if not (isinstance(value, (int, long)) and value > 0): |
| raise datastore_errors.BadArgumentError( |
| 'max_get_keys should be a positive integer') |
| return value |
| |
| @ConfigOption |
| def max_put_entities(value): |
| """The maximum number of entities in a Put without batching.""" |
| if not (isinstance(value, (int, long)) and value > 0): |
| raise datastore_errors.BadArgumentError( |
| 'max_put_entities should be a positive integer') |
| return value |
| |
| @ConfigOption |
| def max_delete_keys(value): |
| """The maximum number of keys in a Delete without batching.""" |
| if not (isinstance(value, (int, long)) and value > 0): |
| raise datastore_errors.BadArgumentError( |
| 'max_delete_keys should be a positive integer') |
| return value |
| |
| |
| class _StubRpc(object): |
| """A stub RPC implementation. |
| |
| Returns a hard-coded result provided at construction time. |
| """ |
| |
| def __init__(self, result): |
| self.__result = result |
| |
| def wait(self): |
| pass |
| |
| def check_success(self): |
| pass |
| |
| def get_result(self): |
| return self.__result |
| |
| |
| class MultiRpc(object): |
| """A wrapper around multiple UserRPC objects. |
| |
| This provides an API similar to that of UserRPC, but wraps multiple |
| RPCs such that e.g. .wait() blocks until all wrapped RPCs are |
| complete, and .get_result() returns the combined results from all |
| wrapped RPCs. |
| |
| Class methods: |
| flatten(rpcs): Expand a list of UserRPCs and MultiRpcs |
| into a list of UserRPCs. |
| wait_any(rpcs): Call UserRPC.wait_any(flatten(rpcs)). |
| wait_all(rpcs): Call UserRPC.wait_all(flatten(rpcs)). |
| |
| Instance methods: |
| wait(): Wait for all RPCs. |
| check_success(): Wait and then check success for all RPCs. |
| get_result(): Wait for all, check successes, then merge |
| all results. |
| |
| Instance attributes: |
| rpcs: The list of wrapped RPCs (returns a copy). |
| state: The combined state of all RPCs. |
| """ |
| |
| def __init__(self, rpcs, extra_hook=None): |
| """Constructor. |
| |
| Args: |
| rpcs: A list of UserRPC and MultiRpc objects; it is flattened |
| before being stored. |
| extra_hook: Optional function to be applied to the final result |
| or list of results. |
| """ |
| self.__rpcs = self.flatten(rpcs) |
| self.__extra_hook = extra_hook |
| |
| @property |
| def rpcs(self): |
| """Get a flattened list containing the RPCs wrapped. |
| |
| This returns a copy to prevent users from modifying the state. |
| """ |
| return list(self.__rpcs) |
| |
| @property |
| def state(self): |
| """Get the combined state of the wrapped RPCs. |
| |
| This mimics the UserRPC.state property. If all wrapped RPCs have |
| the same state, that state is returned; otherwise, RUNNING is |
| returned (which here really means 'neither fish nor flesh'). |
| """ |
| lo = apiproxy_rpc.RPC.FINISHING |
| hi = apiproxy_rpc.RPC.IDLE |
| for rpc in self.__rpcs: |
| lo = min(lo, rpc.state) |
| hi = max(hi, rpc.state) |
| if lo == hi: |
| return lo |
| return apiproxy_rpc.RPC.RUNNING |
| |
| def wait(self): |
| """Wait for all wrapped RPCs to finish. |
| |
| This mimics the UserRPC.wait() method. |
| """ |
| apiproxy_stub_map.UserRPC.wait_all(self.__rpcs) |
| |
| def check_success(self): |
| """Check success of all wrapped RPCs, failing if any of the failed. |
| |
| This mimics the UserRPC.check_success() method. |
| |
| NOTE: This first waits for all wrapped RPCs to finish before |
| checking the success of any of them. This makes debugging easier. |
| """ |
| self.wait() |
| for rpc in self.__rpcs: |
| rpc.check_success() |
| |
| def get_result(self): |
| """Return the combined results of all wrapped RPCs. |
| |
| This mimics the UserRPC.get_results() method. Multiple results |
| are combined using the following rules: |
| |
| 1. If there are no wrapped RPCs, an empty list is returned. |
| |
| 2. If exactly one RPC is wrapped, its result is returned. |
| |
| 3. If more than one RPC is wrapped, the result is always a list, |
| which is constructed from the wrapped results as follows: |
| |
| a. A wrapped result equal to None is ignored; |
| |
| b. A wrapped result that is a list (but not any other type of |
| sequence!) has its elements added to the result list. |
| |
| c. Any other wrapped result is appended to the result list. |
| |
| After all results are combined, if __extra_hook is set, it is |
| called with the combined results and its return value becomes the |
| final result. |
| |
| NOTE: This first waits for all wrapped RPCs to finish, and then |
| checks all their success. This makes debugging easier. |
| """ |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| if len(self.__rpcs) == 1: |
| results = self.__rpcs[0].get_result() |
| else: |
| results = [] |
| |
| |
| for rpc in self.__rpcs: |
| result = rpc.get_result() |
| if isinstance(result, list): |
| results.extend(result) |
| elif result is not None: |
| results.append(result) |
| if self.__extra_hook is not None: |
| results = self.__extra_hook(results) |
| return results |
| |
| @classmethod |
| def flatten(cls, rpcs): |
| """Return a list of UserRPCs, expanding MultiRpcs in the argument list. |
| |
| For example: given 4 UserRPCs rpc1 through rpc4, |
| flatten(rpc1, MultiRpc([rpc2, rpc3], rpc4) |
| returns [rpc1, rpc2, rpc3, rpc4]. |
| |
| Args: |
| rpcs: A list of UserRPC and MultiRpc objects. |
| |
| Returns: |
| A list of UserRPC objects. |
| """ |
| flat = [] |
| for rpc in rpcs: |
| if isinstance(rpc, MultiRpc): |
| |
| |
| |
| flat.extend(rpc.__rpcs) |
| else: |
| if not isinstance(rpc, apiproxy_stub_map.UserRPC): |
| raise datastore_errors.BadArgumentError( |
| 'Expected a list of UserRPC object (%r)' % (rpc,)) |
| flat.append(rpc) |
| return flat |
| |
| @classmethod |
| def wait_any(cls, rpcs): |
| """Wait until one of the RPCs passed in is finished. |
| |
| This mimics UserRPC.wait_any(). |
| |
| Args: |
| rpcs: A list of UserRPC and MultiRpc objects. |
| |
| Returns: |
| A UserRPC object or None. |
| """ |
| return apiproxy_stub_map.UserRPC.wait_any(cls.flatten(rpcs)) |
| |
| @classmethod |
| def wait_all(cls, rpcs): |
| """Wait until all RPCs passed in are finished. |
| |
| This mimics UserRPC.wait_all(). |
| |
| Args: |
| rpcs: A list of UserRPC and MultiRpc objects. |
| """ |
| apiproxy_stub_map.UserRPC.wait_all(cls.flatten(rpcs)) |
| |
| |
| class BaseConnection(object): |
| """Datastore connection base class. |
| |
| NOTE: Do not instantiate this class; use Connection or |
| TransactionalConnection instead. |
| |
| This is not a traditional database connection -- with App Engine, in |
| the end the connection is always implicit in the process state. |
| There is also no intent to be compatible with PEP 249 (Python's |
| Database-API). But it is a useful abstraction to have an explicit |
| object that manages the database interaction, and especially |
| transactions. Other settings related to the App Engine datastore |
| are also stored here (e.g. the RPC timeout). |
| |
| A similar class in the Java API to the App Engine datastore is |
| DatastoreServiceConfig (but in Java, transaction state is always |
| held by the current thread). |
| |
| To use transactions, call connection.new_transaction(). This |
| returns a new connection (an instance of the TransactionalConnection |
| subclass) which you should use for all operations in the |
| transaction. |
| |
| This model supports multiple unrelated concurrent transactions (but |
| not nested transactions as this concept is commonly understood in |
| the relational database world). |
| |
| When the transaction is done, call .commit() or .rollback() on the |
| transactional connection. If .commit() returns False, the |
| transaction failed and none of your operations made it to the |
| datastore; if it returns True, all your operations were committed. |
| The transactional connection cannot be used once .commit() or |
| .rollback() is called. |
| |
| Transactions are created lazily. The first operation that requires |
| a transaction handle will issue the low-level BeginTransaction |
| request and wait for it to return. |
| |
| Transactions keep track of the entity group. All operations within |
| a transaction must use the same entity group. An entity group |
| (currently) comprises an app id, a namespace, and a top-level key (a |
| kind and an id or name). The first operation performed determines |
| the entity group. There is some special-casing when the first |
| operation is a put() of an entity with an incomplete key; in this case |
| the entity group is determined after the operation returns. |
| |
| NOTE: the datastore stubs in the dev_appserver currently support |
| only a single concurrent transaction. Specifically, the (old) file |
| stub locks up if an attempt is made to start a new transaction while |
| a transaction is already in use, whereas the sqlite stub fails an |
| assertion. |
| """ |
| |
| UNKNOWN_DATASTORE = 0 |
| MASTER_SLAVE_DATASTORE = 1 |
| HIGH_REPLICATION_DATASTORE = 2 |
| |
| __SUPPORTED_VERSIONS = frozenset((_DATASTORE_V3, _DATASTORE_V4)) |
| |
| @_positional(1) |
| def __init__(self, adapter=None, config=None, _api_version=_DATASTORE_V3): |
| """Constructor. |
| |
| All arguments should be specified as keyword arguments. |
| |
| Args: |
| adapter: Optional AbstractAdapter subclass instance; |
| default IdentityAdapter. |
| config: Optional Configuration object. |
| """ |
| if adapter is None: |
| adapter = IdentityAdapter() |
| if not isinstance(adapter, AbstractAdapter): |
| raise datastore_errors.BadArgumentError( |
| 'invalid adapter argument (%r)' % (adapter,)) |
| self.__adapter = adapter |
| |
| if config is None: |
| config = Configuration() |
| elif not Configuration.is_configuration(config): |
| raise datastore_errors.BadArgumentError( |
| 'invalid config argument (%r)' % (config,)) |
| self.__config = config |
| |
| if _api_version not in self.__SUPPORTED_VERSIONS: |
| raise datastore_errors.BadArgumentError( |
| 'unsupported API version (%s)' % (_api_version,)) |
| self._api_version = _api_version |
| |
| self.__pending_rpcs = set() |
| |
| |
| |
| @property |
| def adapter(self): |
| """The adapter used by this connection.""" |
| return self.__adapter |
| |
| @property |
| def config(self): |
| """The default configuration used by this connection.""" |
| return self.__config |
| |
| |
| |
| |
| def _add_pending(self, rpc): |
| """Add an RPC object to the list of pending RPCs. |
| |
| The argument must be a UserRPC object, not a MultiRpc object. |
| """ |
| assert not isinstance(rpc, MultiRpc) |
| self.__pending_rpcs.add(rpc) |
| |
| def _remove_pending(self, rpc): |
| """Remove an RPC object from the list of pending RPCs. |
| |
| If the argument is a MultiRpc object, the wrapped RPCs are removed |
| from the list of pending RPCs. |
| """ |
| if isinstance(rpc, MultiRpc): |
| |
| |
| for wrapped_rpc in rpc._MultiRpc__rpcs: |
| self._remove_pending(wrapped_rpc) |
| else: |
| try: |
| self.__pending_rpcs.remove(rpc) |
| except KeyError: |
| |
| |
| pass |
| |
| def is_pending(self, rpc): |
| """Check whether an RPC object is currently pending. |
| |
| Note that 'pending' in this context refers to an RPC associated |
| with this connection for which _remove_pending() hasn't been |
| called yet; normally this is called by check_rpc_success() which |
| itself is called by the various result hooks. A pending RPC may |
| be in the RUNNING or FINISHING state. |
| |
| If the argument is a MultiRpc object, this returns true if at least |
| one of its wrapped RPCs is pending. |
| """ |
| if isinstance(rpc, MultiRpc): |
| for wrapped_rpc in rpc._MultiRpc__rpcs: |
| if self.is_pending(wrapped_rpc): |
| return True |
| return False |
| else: |
| return rpc in self.__pending_rpcs |
| |
| def get_pending_rpcs(self): |
| """Return (a copy of) the list of currently pending RPCs.""" |
| return set(self.__pending_rpcs) |
| |
| def get_datastore_type(self, app=None): |
| """Tries to get the datastore type for the given app. |
| |
| This function is only guaranteed to return something other than |
| UNKNOWN_DATASTORE when running in production and querying the current app. |
| """ |
| return _GetDatastoreType(app) |
| |
| def wait_for_all_pending_rpcs(self): |
| """Wait for all currently pending RPCs to complete.""" |
| while self.__pending_rpcs: |
| try: |
| rpc = apiproxy_stub_map.UserRPC.wait_any(self.__pending_rpcs) |
| except Exception: |
| |
| |
| |
| |
| logging.info('wait_for_all_pending_rpcs(): exception in wait_any()', |
| exc_info=True) |
| continue |
| if rpc is None: |
| logging.debug('wait_any() returned None') |
| continue |
| assert rpc.state == apiproxy_rpc.RPC.FINISHING |
| if rpc in self.__pending_rpcs: |
| |
| |
| |
| |
| |
| |
| try: |
| self.check_rpc_success(rpc) |
| except Exception: |
| |
| logging.info('wait_for_all_pending_rpcs(): ' |
| 'exception in check_rpc_success()', |
| exc_info=True) |
| |
| |
| |
| |
| def _create_rpc(self, config=None, service_name=None): |
| """Create an RPC object using the configuration parameters. |
| |
| Internal only. |
| |
| Args: |
| config: Optional Configuration object. |
| service_name: Optional datastore service name. |
| |
| Returns: |
| A new UserRPC object with the designated settings. |
| |
| NOTES: |
| |
| (1) The RPC object returned can only be used to make a single call |
| (for details see apiproxy_stub_map.UserRPC). |
| |
| (2) To make a call, use one of the specific methods on the |
| Connection object, such as conn.put(entities). This sends the |
| call to the server but does not wait. To wait for the call to |
| finish and get the result, call rpc.get_result(). |
| """ |
| deadline = Configuration.deadline(config, self.__config) |
| on_completion = Configuration.on_completion(config, self.__config) |
| callback = None |
| if service_name is None: |
| |
| |
| |
| |
| |
| service_name = self._api_version |
| if on_completion is not None: |
| |
| |
| def callback(): |
| return on_completion(rpc) |
| rpc = apiproxy_stub_map.UserRPC(service_name, deadline, callback) |
| return rpc |
| |
| |
| create_rpc = _create_rpc |
| |
| def _set_request_read_policy(self, request, config=None): |
| """Set the read policy on a request. |
| |
| This takes the read policy from the config argument or the |
| configuration's default configuration, and sets the request's read |
| options. |
| |
| Args: |
| request: A read request protobuf. |
| config: Optional Configuration object. |
| |
| Returns: |
| True if the read policy specifies a read current request, False if it |
| specifies an eventually consistent request, None if it does |
| not specify a read consistency. |
| """ |
| |
| if isinstance(config, apiproxy_stub_map.UserRPC): |
| read_policy = getattr(config, 'read_policy', None) |
| else: |
| read_policy = Configuration.read_policy(config) |
| |
| |
| if read_policy is None: |
| read_policy = self.__config.read_policy |
| |
| if hasattr(request, 'set_failover_ms') and hasattr(request, 'strong'): |
| |
| if read_policy == Configuration.APPLY_ALL_JOBS_CONSISTENCY: |
| request.set_strong(True) |
| return True |
| elif read_policy == Configuration.EVENTUAL_CONSISTENCY: |
| request.set_strong(False) |
| |
| |
| |
| request.set_failover_ms(-1) |
| return False |
| else: |
| return None |
| elif hasattr(request, 'read_options'): |
| |
| |
| |
| if read_policy == Configuration.EVENTUAL_CONSISTENCY: |
| request.mutable_read_options().set_read_consistency( |
| datastore_v4_pb.ReadOptions.EVENTUAL) |
| return False |
| else: |
| return None |
| else: |
| raise datastore_errors.BadRequestError( |
| 'read_policy is only supported on read operations.') |
| |
| def _set_request_transaction(self, request): |
| """Set the current transaction on a request. |
| |
| NOTE: This version of the method does nothing. The version |
| overridden by TransactionalConnection is the real thing. |
| |
| Args: |
| request: A protobuf with a transaction field. |
| |
| Returns: |
| An object representing a transaction or None. |
| """ |
| return None |
| |
| def _make_rpc_call(self, config, method, request, response, |
| get_result_hook=None, user_data=None, |
| service_name=None): |
| """Make an RPC call. |
| |
| Internal only. |
| |
| Except for the added config argument, this is a thin wrapper |
| around UserRPC.make_call(). |
| |
| Args: |
| config: A Configuration object or None. Defaults are taken from |
| the connection's default configuration. |
| method: The method name. |
| request: The request protocol buffer. |
| response: The response protocol buffer. |
| get_result_hook: Optional get-result hook function. If not None, |
| this must be a function with exactly one argument, the RPC |
| object (self). Its return value is returned from get_result(). |
| user_data: Optional additional arbitrary data for the get-result |
| hook function. This can be accessed as rpc.user_data. The |
| type of this value is up to the service module. |
| |
| Returns: |
| The UserRPC object used for the call. |
| """ |
| |
| |
| if isinstance(config, apiproxy_stub_map.UserRPC): |
| rpc = config |
| else: |
| rpc = self._create_rpc(config, service_name) |
| rpc.make_call(method, request, response, get_result_hook, user_data) |
| self._add_pending(rpc) |
| return rpc |
| |
| |
| make_rpc_call = _make_rpc_call |
| |
| def check_rpc_success(self, rpc): |
| """Check for RPC success and translate exceptions. |
| |
| This wraps rpc.check_success() and should be called instead of that. |
| |
| This also removes the RPC from the list of pending RPCs, once it |
| has completed. |
| |
| Args: |
| rpc: A UserRPC or MultiRpc object. |
| |
| Raises: |
| Nothing if the call succeeded; various datastore_errors.Error |
| subclasses if ApplicationError was raised by rpc.check_success(). |
| """ |
| try: |
| rpc.wait() |
| finally: |
| |
| |
| self._remove_pending(rpc) |
| try: |
| rpc.check_success() |
| except apiproxy_errors.ApplicationError, err: |
| raise _ToDatastoreError(err) |
| |
| |
| |
| |
| |
| MAX_RPC_BYTES = 1024 * 1024 |
| MAX_GET_KEYS = 1000 |
| MAX_PUT_ENTITIES = 500 |
| MAX_DELETE_KEYS = 500 |
| MAX_ALLOCATE_IDS_KEYS = 500 |
| |
| |
| DEFAULT_MAX_ENTITY_GROUPS_PER_RPC = 10 |
| |
| |
| |
| |
| def __get_max_entity_groups_per_rpc(self, config): |
| """Internal helper: figures out max_entity_groups_per_rpc for the config.""" |
| return Configuration.max_entity_groups_per_rpc( |
| config, self.__config) or self.DEFAULT_MAX_ENTITY_GROUPS_PER_RPC |
| |
| def _extract_entity_group(self, value): |
| """Internal helper: extracts the entity group from a key or entity. |
| |
| Supports both v3 and v4 protobufs. |
| |
| Args: |
| value: an entity_pb.{Reference, EntityProto} or |
| entity_v4_pb.{Key, Entity}. |
| |
| Returns: |
| A tuple consisting of: |
| - kind |
| - name, id, or ('new', unique id) |
| """ |
| if (isinstance(value, entity_v4_pb.Entity) |
| or isinstance(value, entity_pb.EntityProto)): |
| value = value.key() |
| if isinstance(value, entity_v4_pb.Key): |
| elem = value.path_element(0) |
| kind = elem.kind() |
| else: |
| elem = value.path().element(0) |
| kind = elem.type() |
| |
| |
| return (kind, elem.id() or elem.name() or ('new', id(elem))) |
| |
| def _map_and_group(self, values, map_fn, group_fn): |
| """Internal helper: map values to keys and group by key. Here key is any |
| object derived from an input value by map_fn, and which can be grouped |
| by group_fn. |
| |
| Args: |
| values: The values to be grouped by applying get_group(to_ref(value)). |
| map_fn: a function that maps a value to a key to be grouped. |
| group_fn: a function that groups the keys output by map_fn. |
| |
| Returns: |
| A list where each element is a list of (key, index) pairs. Here |
| index is the location of the value from which the key was derived in |
| the original list. |
| """ |
| indexed_key_groups = collections.defaultdict(list) |
| for index, value in enumerate(values): |
| key = map_fn(value) |
| indexed_key_groups[group_fn(key)].append((key, index)) |
| return indexed_key_groups.values() |
| |
| def __create_result_index_pairs(self, indexes): |
| """Internal helper: build a function that ties an index with each result. |
| |
| Args: |
| indexes: A list of integers. A value x at location y in the list means |
| that the result at location y in the result list needs to be at location |
| x in the list of results returned to the user. |
| """ |
| def create_result_index_pairs(results): |
| return zip(results, indexes) |
| return create_result_index_pairs |
| |
| def __sort_result_index_pairs(self, extra_hook): |
| """Builds a function that sorts the indexed results. |
| |
| Args: |
| extra_hook: A function that the returned function will apply to its result |
| before returning. |
| |
| Returns: |
| A function that takes a list of results and reorders them to match the |
| order in which the input values associated with each results were |
| originally provided. |
| """ |
| |
| def sort_result_index_pairs(result_index_pairs): |
| results = [None] * len(result_index_pairs) |
| for result, index in result_index_pairs: |
| results[index] = result |
| if extra_hook is not None: |
| results = extra_hook(results) |
| return results |
| return sort_result_index_pairs |
| |
| def _generate_pb_lists(self, grouped_values, base_size, max_count, |
| max_groups, config): |
| """Internal helper: repeatedly yield a list of 2 elements. |
| |
| Args: |
| grouped_values: A list of lists. The inner lists consist of objects |
| grouped by e.g. entity group or id sequence. |
| |
| base_size: An integer representing the base size of an rpc. Used for |
| splitting operations across multiple RPCs due to size limitations. |
| |
| max_count: An integer representing the maximum number of objects we can |
| send in an rpc. Used for splitting operations across multiple RPCs. |
| |
| max_groups: An integer representing the maximum number of groups we can |
| have represented in an rpc. Can be None, in which case no constraint. |
| |
| config: The config object, defining max rpc size in bytes. |
| |
| Yields: |
| Repeatedly yields 2 element tuples. The first element is a list of |
| protobufs to send in one batch. The second element is a list containing |
| the original location of those protobufs (expressed as an index) in the |
| input. |
| """ |
| max_size = (Configuration.max_rpc_bytes(config, self.__config) or |
| self.MAX_RPC_BYTES) |
| pbs = [] |
| pb_indexes = [] |
| size = base_size |
| num_groups = 0 |
| for indexed_pbs in grouped_values: |
| num_groups += 1 |
| if max_groups is not None and num_groups > max_groups: |
| yield (pbs, pb_indexes) |
| pbs = [] |
| pb_indexes = [] |
| size = base_size |
| num_groups = 1 |
| for indexed_pb in indexed_pbs: |
| (pb, index) = indexed_pb |
| |
| incr_size = pb.lengthString(pb.ByteSize()) + 1 |
| |
| |
| |
| |
| if (not isinstance(config, apiproxy_stub_map.UserRPC) and |
| (len(pbs) >= max_count or (pbs and size + incr_size > max_size))): |
| yield (pbs, pb_indexes) |
| pbs = [] |
| pb_indexes = [] |
| size = base_size |
| num_groups = 1 |
| pbs.append(pb) |
| pb_indexes.append(index) |
| size += incr_size |
| yield (pbs, pb_indexes) |
| |
| def __force(self, req): |
| """Configure a request to force mutations.""" |
| if isinstance(req, datastore_v4_pb.CommitRequest): |
| req.mutable_deprecated_mutation().set_force(True) |
| else: |
| req.set_force(True) |
| |
| def get(self, keys): |
| """Synchronous Get operation. |
| |
| Args: |
| keys: An iterable of user-level key objects. |
| |
| Returns: |
| A list of user-level entity objects and None values, corresponding |
| 1:1 to the argument keys. A None means there is no entity for the |
| corresponding key. |
| """ |
| return self.async_get(None, keys).get_result() |
| |
| def async_get(self, config, keys, extra_hook=None): |
| """Asynchronous Get operation. |
| |
| Args: |
| config: A Configuration object or None. Defaults are taken from |
| the connection's default configuration. |
| keys: An iterable of user-level key objects. |
| extra_hook: Optional function to be called on the result once the |
| RPC has completed. |
| |
| Returns: |
| A MultiRpc object. |
| """ |
| |
| def make_get_call(base_req, pbs, extra_hook=None): |
| req = copy.deepcopy(base_req) |
| req.key_list().extend(pbs) |
| if self._api_version == _DATASTORE_V4: |
| method = 'Lookup' |
| resp = datastore_v4_pb.LookupResponse() |
| else: |
| method = 'Get' |
| resp = datastore_pb.GetResponse() |
| |
| |
| |
| |
| |
| |
| |
| user_data = config, pbs, extra_hook |
| return self._make_rpc_call(config, method, req, resp, |
| get_result_hook=self.__get_hook, |
| user_data=user_data, |
| service_name=self._api_version) |
| |
| if self._api_version == _DATASTORE_V4: |
| base_req = datastore_v4_pb.LookupRequest() |
| key_to_pb = self.__adapter.key_to_pb_v4 |
| else: |
| base_req = datastore_pb.GetRequest() |
| base_req.set_allow_deferred(True) |
| key_to_pb = self.__adapter.key_to_pb |
| is_read_current = self._set_request_read_policy(base_req, config) |
| txn = self._set_request_transaction(base_req) |
| |
| |
| if isinstance(config, apiproxy_stub_map.UserRPC) or len(keys) <= 1: |
| pbs = [key_to_pb(key) for key in keys] |
| return make_get_call(base_req, pbs, extra_hook) |
| |
| max_count = (Configuration.max_get_keys(config, self.__config) or |
| self.MAX_GET_KEYS) |
| |
| indexed_keys_by_entity_group = self._map_and_group( |
| keys, key_to_pb, self._extract_entity_group) |
| |
| if is_read_current is None: |
| is_read_current = (self.get_datastore_type() == |
| BaseConnection.HIGH_REPLICATION_DATASTORE) |
| |
| |
| |
| |
| if is_read_current and txn is None: |
| max_egs_per_rpc = self.__get_max_entity_groups_per_rpc(config) |
| else: |
| max_egs_per_rpc = None |
| |
| |
| |
| pbsgen = self._generate_pb_lists(indexed_keys_by_entity_group, |
| base_req.ByteSize(), max_count, |
| max_egs_per_rpc, config) |
| |
| rpcs = [] |
| for pbs, indexes in pbsgen: |
| rpcs.append(make_get_call(base_req, pbs, |
| self.__create_result_index_pairs(indexes))) |
| return MultiRpc(rpcs, self.__sort_result_index_pairs(extra_hook)) |
| |
| def __get_hook(self, rpc): |
| """Internal method used as get_result_hook for Get operation.""" |
| self.check_rpc_success(rpc) |
| |
| |
| |
| config, keys_from_request, extra_hook = rpc.user_data |
| |
| if self._api_version == _DATASTORE_V3 and rpc.response.in_order(): |
| |
| |
| entities = [] |
| for entity_result in rpc.response.entity_list(): |
| if entity_result.has_entity(): |
| entity = self.__adapter.pb_to_entity(entity_result.entity()) |
| else: |
| entity = None |
| entities.append(entity) |
| else: |
| |
| current_get_response = rpc.response |
| result_dict = {} |
| self.__add_get_response_entities_to_dict(current_get_response, |
| result_dict) |
| |
| |
| |
| if self._api_version == _DATASTORE_V4: |
| method = 'Lookup' |
| deferred_resp = datastore_v4_pb.LookupResponse() |
| else: |
| method = 'Get' |
| deferred_resp = datastore_pb.GetResponse() |
| deferred_req = copy.deepcopy(rpc.request) |
| while current_get_response.deferred_list(): |
| deferred_req.clear_key() |
| deferred_req.key_list().extend(current_get_response.deferred_list()) |
| deferred_resp.Clear() |
| deferred_rpc = self._make_rpc_call(config, method, |
| deferred_req, deferred_resp, |
| service_name=self._api_version) |
| deferred_rpc.get_result() |
| current_get_response = deferred_rpc.response |
| |
| |
| self.__add_get_response_entities_to_dict(current_get_response, |
| result_dict) |
| |
| |
| |
| entities = [result_dict.get(datastore_types.ReferenceToKeyValue(pb)) |
| for pb in keys_from_request] |
| |
| |
| |
| if extra_hook is not None: |
| entities = extra_hook(entities) |
| |
| return entities |
| |
| def __add_get_response_entities_to_dict(self, get_response, result_dict): |
| """Converts entities from the get response and adds them to the dict. |
| |
| The Key for the dict will be calculated via |
| datastore_types.ReferenceToKeyValue. There will be no entry for entities |
| that were not found. |
| |
| Args: |
| get_response: A datastore_pb.GetResponse or |
| datastore_v4_pb.LookupResponse. |
| result_dict: The dict to add results to. |
| """ |
| if isinstance(get_response, datastore_v4_pb.LookupResponse): |
| for result in get_response.found_list(): |
| v4_key = result.entity().key() |
| entity = self.__adapter.pb_v4_to_entity(result.entity()) |
| result_dict[datastore_types.ReferenceToKeyValue(v4_key)] = entity |
| else: |
| for entity_result in get_response.entity_list(): |
| |
| if entity_result.has_entity(): |
| |
| |
| |
| |
| reference_pb = entity_result.entity().key() |
| hashable_key = datastore_types.ReferenceToKeyValue(reference_pb) |
| entity = self.__adapter.pb_to_entity(entity_result.entity()) |
| result_dict[hashable_key] = entity |
| |
| def get_indexes(self): |
| """Synchronous get indexes operation. |
| |
| Returns: |
| user-level indexes representation |
| """ |
| return self.async_get_indexes(None).get_result() |
| |
| def async_get_indexes(self, config, extra_hook=None, _app=None): |
| """Asynchronous get indexes operation. |
| |
| Args: |
| config: A Configuration object or None. Defaults are taken from |
| the connection's default configuration. |
| extra_hook: Optional function to be called once the RPC has completed. |
| |
| Returns: |
| A MultiRpc object. |
| """ |
| req = api_base_pb.StringProto() |
| req.set_value(datastore_types.ResolveAppId(_app)) |
| resp = datastore_pb.CompositeIndices() |
| return self._make_rpc_call(config, 'GetIndices', req, resp, |
| get_result_hook=self.__get_indexes_hook, |
| user_data=extra_hook, |
| service_name=_DATASTORE_V3) |
| |
| def __get_indexes_hook(self, rpc): |
| """Internal method used as get_result_hook for Get operation.""" |
| self.check_rpc_success(rpc) |
| indexes = [self.__adapter.pb_to_index(index) |
| for index in rpc.response.index_list()] |
| if rpc.user_data: |
| indexes = rpc.user_data(indexes) |
| return indexes |
| |
| def put(self, entities): |
| """Synchronous Put operation. |
| |
| Args: |
| entities: An iterable of user-level entity objects. |
| |
| Returns: |
| A list of user-level key objects, corresponding 1:1 to the |
| argument entities. |
| |
| NOTE: If any of the entities has an incomplete key, this will |
| *not* patch up those entities with the complete key. |
| """ |
| return self.async_put(None, entities).get_result() |
| |
| def async_put(self, config, entities, extra_hook=None): |
| """Asynchronous Put operation. |
| |
| Args: |
| config: A Configuration object or None. Defaults are taken from |
| the connection's default configuration. |
| entities: An iterable of user-level entity objects. |
| extra_hook: Optional function to be called on the result once the |
| RPC has completed. |
| |
| Returns: |
| A MultiRpc object. |
| |
| NOTE: If any of the entities has an incomplete key, this will |
| *not* patch up those entities with the complete key. |
| """ |
| |
| def make_put_call(base_req, pbs, user_data=None): |
| req = copy.deepcopy(base_req) |
| if self._api_version == _DATASTORE_V4: |
| deprecated_mutation = req.mutable_deprecated_mutation() |
| for entity in pbs: |
| if datastore_pbs.is_complete_v4_key(entity.key()): |
| deprecated_mutation.upsert_list().append(entity) |
| else: |
| deprecated_mutation.insert_auto_id_list().append(entity) |
| method = 'Commit' |
| resp = datastore_v4_pb.CommitResponse() |
| else: |
| req.entity_list().extend(pbs) |
| method = 'Put' |
| resp = datastore_pb.PutResponse() |
| user_data = pbs, user_data |
| return self._make_rpc_call(config, method, req, resp, |
| get_result_hook=self.__put_hook, |
| user_data=user_data, |
| service_name=self._api_version) |
| |
| |
| if self._api_version == _DATASTORE_V4: |
| base_req = datastore_v4_pb.CommitRequest() |
| base_req.set_mode(datastore_v4_pb.CommitRequest.NON_TRANSACTIONAL) |
| entity_to_pb = self.__adapter.entity_to_pb_v4 |
| else: |
| base_req = datastore_pb.PutRequest() |
| entity_to_pb = self.__adapter.entity_to_pb |
| self._set_request_transaction(base_req) |
| if Configuration.force_writes(config, self.__config): |
| self.__force(base_req) |
| |
| |
| if isinstance(config, apiproxy_stub_map.UserRPC) or len(entities) <= 1: |
| pbs = [entity_to_pb(entity) for entity in entities] |
| return make_put_call(base_req, pbs, extra_hook) |
| |
| max_count = (Configuration.max_put_entities(config, self.__config) or |
| self.MAX_PUT_ENTITIES) |
| if not base_req.has_transaction(): |
| max_egs_per_rpc = self.__get_max_entity_groups_per_rpc(config) |
| else: |
| max_egs_per_rpc = None |
| |
| indexed_entities_by_entity_group = self._map_and_group( |
| entities, entity_to_pb, self._extract_entity_group) |
| |
| |
| |
| pbsgen = self._generate_pb_lists(indexed_entities_by_entity_group, |
| base_req.ByteSize(), max_count, |
| max_egs_per_rpc, config) |
| |
| rpcs = [] |
| for pbs, indexes in pbsgen: |
| rpcs.append(make_put_call(base_req, pbs, |
| self.__create_result_index_pairs(indexes))) |
| return MultiRpc(rpcs, self.__sort_result_index_pairs(extra_hook)) |
| |
| def __put_hook(self, rpc): |
| """Internal method used as get_result_hook for Put operation.""" |
| self.check_rpc_success(rpc) |
| entities_from_request, extra_hook = rpc.user_data |
| |
| if isinstance(rpc.response, datastore_v4_pb.CommitResponse): |
| keys = [] |
| i = 0 |
| for entity in entities_from_request: |
| if datastore_pbs.is_complete_v4_key(entity.key()): |
| keys.append(entity.key()) |
| else: |
| keys.append( |
| rpc.response.deprecated_mutation_result().insert_auto_id_key(i)) |
| i += 1 |
| keys = [self.__adapter.pb_v4_to_key(key) for key in keys] |
| else: |
| keys = [self.__adapter.pb_to_key(key) for key in rpc.response.key_list()] |
| |
| |
| |
| if extra_hook is not None: |
| keys = extra_hook(keys) |
| return keys |
| |
| def delete(self, keys): |
| """Synchronous Delete operation. |
| |
| Args: |
| keys: An iterable of user-level key objects. |
| |
| Returns: |
| None. |
| """ |
| return self.async_delete(None, keys).get_result() |
| |
| def async_delete(self, config, keys, extra_hook=None): |
| """Asynchronous Delete operation. |
| |
| Args: |
| config: A Configuration object or None. Defaults are taken from |
| the connection's default configuration. |
| keys: An iterable of user-level key objects. |
| extra_hook: Optional function to be called once the RPC has completed. |
| |
| Returns: |
| A MultiRpc object. |
| """ |
| |
| def make_delete_call(base_req, pbs, user_data=None): |
| req = copy.deepcopy(base_req) |
| if self._api_version == _DATASTORE_V4: |
| req.mutable_deprecated_mutation().delete_list().extend(pbs) |
| method = 'Commit' |
| resp = datastore_v4_pb.CommitResponse() |
| else: |
| req.key_list().extend(pbs) |
| method = 'Delete' |
| resp = datastore_pb.DeleteResponse() |
| return self._make_rpc_call(config, method, req, resp, |
| get_result_hook=self.__delete_hook, |
| user_data=user_data, |
| service_name=self._api_version) |
| |
| |
| if self._api_version == _DATASTORE_V4: |
| base_req = datastore_v4_pb.CommitRequest() |
| base_req.set_mode(datastore_v4_pb.CommitRequest.NON_TRANSACTIONAL) |
| key_to_pb = self.__adapter.key_to_pb_v4 |
| else: |
| base_req = datastore_pb.DeleteRequest() |
| key_to_pb = self.__adapter.key_to_pb |
| self._set_request_transaction(base_req) |
| if Configuration.force_writes(config, self.__config): |
| self.__force(base_req) |
| |
| |
| if isinstance(config, apiproxy_stub_map.UserRPC) or len(keys) <= 1: |
| pbs = [key_to_pb(key) for key in keys] |
| return make_delete_call(base_req, pbs, extra_hook) |
| |
| max_count = (Configuration.max_delete_keys(config, self.__config) or |
| self.MAX_DELETE_KEYS) |
| if not base_req.has_transaction(): |
| max_egs_per_rpc = self.__get_max_entity_groups_per_rpc(config) |
| else: |
| max_egs_per_rpc = None |
| |
| indexed_keys_by_entity_group = self._map_and_group( |
| keys, key_to_pb, self._extract_entity_group) |
| |
| |
| |
| pbsgen = self._generate_pb_lists(indexed_keys_by_entity_group, |
| base_req.ByteSize(), max_count, |
| max_egs_per_rpc, config) |
| |
| rpcs = [] |
| for pbs, _ in pbsgen: |
| rpcs.append(make_delete_call(base_req, pbs)) |
| return MultiRpc(rpcs, extra_hook) |
| |
| def __delete_hook(self, rpc): |
| """Internal method used as get_result_hook for Delete operation.""" |
| self.check_rpc_success(rpc) |
| if rpc.user_data is not None: |
| |
| rpc.user_data(None) |
| |
| |
| |
| def begin_transaction(self, app): |
| """Syncnronous BeginTransaction operation. |
| |
| NOTE: In most cases the new_transaction() method is preferred, |
| since that returns a TransactionalConnection object which will |
| begin the transaction lazily. |
| |
| Args: |
| app: Application ID. |
| |
| Returns: |
| An object representing a transaction or None. |
| """ |
| return self.async_begin_transaction(None, app).get_result() |
| |
| def async_begin_transaction(self, config, app): |
| """Asynchronous BeginTransaction operation. |
| |
| Args: |
| config: A configuration object or None. Defaults are taken from |
| the connection's default configuration. |
| app: Application ID. |
| |
| Returns: |
| A MultiRpc object. |
| """ |
| if not isinstance(app, basestring) or not app: |
| raise datastore_errors.BadArgumentError( |
| 'begin_transaction requires an application id argument (%r)' % |
| (app,)) |
| |
| if self._api_version == _DATASTORE_V4: |
| req = datastore_v4_pb.BeginTransactionRequest() |
| if TransactionOptions.xg(config, self.config): |
| req.set_cross_group(True) |
| resp = datastore_v4_pb.BeginTransactionResponse() |
| else: |
| req = datastore_pb.BeginTransactionRequest() |
| req.set_app(app) |
| if (TransactionOptions.xg(config, self.__config)): |
| req.set_allow_multiple_eg(True) |
| resp = datastore_pb.Transaction() |
| |
| return self._make_rpc_call(config, 'BeginTransaction', req, resp, |
| get_result_hook=self.__begin_transaction_hook, |
| service_name=self._api_version) |
| |
| def __begin_transaction_hook(self, rpc): |
| """Internal method used as get_result_hook for BeginTransaction.""" |
| self.check_rpc_success(rpc) |
| if isinstance(rpc.response, datastore_v4_pb.BeginTransactionResponse): |
| return rpc.response.transaction() |
| else: |
| return rpc.response |
| |
| |
| class Connection(BaseConnection): |
| """Transaction-less connection class. |
| |
| This contains those operations that are not allowed on transactional |
| connections. (Currently only allocate_ids and reserve_key_ids.) |
| """ |
| |
| @_positional(1) |
| def __init__(self, adapter=None, config=None, _api_version=_DATASTORE_V3): |
| """Constructor. |
| |
| All arguments should be specified as keyword arguments. |
| |
| Args: |
| adapter: Optional AbstractAdapter subclass instance; |
| default IdentityAdapter. |
| config: Optional Configuration object. |
| """ |
| super(Connection, self).__init__(adapter=adapter, config=config, |
| _api_version=_api_version) |
| self.__adapter = self.adapter |
| self.__config = self.config |
| |
| |
| |
| def new_transaction(self, config=None): |
| """Create a new transactional connection based on this one. |
| |
| This is different from, and usually preferred over, the |
| begin_transaction() method; new_transaction() returns a new |
| TransactionalConnection object. |
| |
| Args: |
| config: A configuration object for the new connection, merged |
| with this connection's config. |
| """ |
| config = self.__config.merge(config) |
| return TransactionalConnection(adapter=self.__adapter, config=config, |
| _api_version=self._api_version) |
| |
| |
| |
| def allocate_ids(self, key, size=None, max=None): |
| """Synchronous AllocateIds operation. |
| |
| Exactly one of size and max must be specified. |
| |
| Args: |
| key: A user-level key object. |
| size: Optional number of IDs to allocate. |
| max: Optional maximum ID to allocate. |
| |
| Returns: |
| A pair (start, end) giving the (inclusive) range of IDs allocation. |
| """ |
| return self.async_allocate_ids(None, key, size, max).get_result() |
| |
| def async_allocate_ids(self, config, key, size=None, max=None, |
| extra_hook=None): |
| """Asynchronous AllocateIds operation. |
| |
| Args: |
| config: A Configuration object or None. Defaults are taken from |
| the connection's default configuration. |
| key: A user-level key object. |
| size: Optional number of IDs to allocate. |
| max: Optional maximum ID to allocate. |
| extra_hook: Optional function to be called on the result once the |
| RPC has completed. |
| |
| Returns: |
| A MultiRpc object. |
| """ |
| if size is not None: |
| if max is not None: |
| raise datastore_errors.BadArgumentError( |
| 'Cannot allocate ids using both size and max') |
| if not isinstance(size, (int, long)): |
| raise datastore_errors.BadArgumentError('Invalid size (%r)' % (size,)) |
| if size > _MAX_ID_BATCH_SIZE: |
| raise datastore_errors.BadArgumentError( |
| 'Cannot allocate more than %s ids at a time; received %s' |
| % (_MAX_ID_BATCH_SIZE, size)) |
| if size <= 0: |
| raise datastore_errors.BadArgumentError( |
| 'Cannot allocate less than 1 id; received %s' % size) |
| if max is not None: |
| if not isinstance(max, (int, long)): |
| raise datastore_errors.BadArgumentError('Invalid max (%r)' % (max,)) |
| if max < 0: |
| raise datastore_errors.BadArgumentError( |
| 'Cannot allocate a range with a max less than 0 id; received %s' % |
| size) |
| req = datastore_pb.AllocateIdsRequest() |
| req.mutable_model_key().CopyFrom(self.__adapter.key_to_pb(key)) |
| if size is not None: |
| req.set_size(size) |
| if max is not None: |
| req.set_max(max) |
| resp = datastore_pb.AllocateIdsResponse() |
| rpc = self._make_rpc_call(config, 'AllocateIds', req, resp, |
| get_result_hook=self.__allocate_ids_hook, |
| user_data=extra_hook, |
| service_name=_DATASTORE_V3) |
| return rpc |
| |
| def __allocate_ids_hook(self, rpc): |
| """Internal method used as get_result_hook for AllocateIds.""" |
| self.check_rpc_success(rpc) |
| pair = rpc.response.start(), rpc.response.end() |
| if rpc.user_data is not None: |
| pair = rpc.user_data(pair) |
| return pair |
| |
| |
| |
| def _reserve_keys(self, keys): |
| """Synchronous AllocateIds operation to reserve the given keys. |
| |
| Sends one or more v4 AllocateIds rpcs with keys to reserve. |
| Reserved keys must be complete and must have valid ids. |
| |
| Args: |
| keys: Iterable of user-level keys. |
| """ |
| self._async_reserve_keys(None, keys).get_result() |
| |
| def _async_reserve_keys(self, config, keys, extra_hook=None): |
| """Asynchronous AllocateIds operation to reserve the given keys. |
| |
| Sends one or more v4 AllocateIds rpcs with keys to reserve. |
| Reserved keys must be complete and must have valid ids. |
| |
| Args: |
| config: A Configuration object or None to use Connection default. |
| keys: Iterable of user-level keys. |
| extra_hook: Optional function to be called on rpc result. |
| |
| Returns: |
| None, or the result of user-supplied extra_hook. |
| """ |
| def to_id_key(key): |
| if key.path().element_size() == 1: |
| return 'root_idkey' |
| else: |
| return self._extract_entity_group(key) |
| |
| keys_by_idkey = self._map_and_group(keys, self.__adapter.key_to_pb, |
| to_id_key) |
| max_count = (Configuration.max_allocate_ids_keys(config, self.__config) or |
| self.MAX_ALLOCATE_IDS_KEYS) |
| |
| rpcs = [] |
| pbsgen = self._generate_pb_lists(keys_by_idkey, 0, max_count, None, config) |
| for pbs, _ in pbsgen: |
| req = datastore_v4_pb.AllocateIdsRequest() |
| for key in pbs: |
| datastore_pbs.get_entity_converter().v3_to_v4_key(key, |
| req.add_reserve()) |
| resp = datastore_v4_pb.AllocateIdsResponse() |
| rpcs.append(self._make_rpc_call(config, 'AllocateIds', req, resp, |
| get_result_hook=self.__reserve_keys_hook, |
| user_data=extra_hook, |
| service_name=_DATASTORE_V4)) |
| return MultiRpc(rpcs) |
| |
| def __reserve_keys_hook(self, rpc): |
| """Internal get_result_hook for _reserve_keys.""" |
| self.check_rpc_success(rpc) |
| if rpc.user_data is not None: |
| return rpc.user_data(rpc.response) |
| |
| |
| class TransactionOptions(Configuration): |
| """An immutable class that contains options for a transaction.""" |
| |
| NESTED = 1 |
| """Create a nested transaction under an existing one.""" |
| |
| MANDATORY = 2 |
| """Always propagate an existing transaction, throw an exception if there is |
| no existing transaction.""" |
| |
| ALLOWED = 3 |
| """If there is an existing transaction propagate it.""" |
| |
| INDEPENDENT = 4 |
| """Always use a new transaction, pausing any existing transactions.""" |
| |
| _PROPAGATION = frozenset((NESTED, MANDATORY, ALLOWED, INDEPENDENT)) |
| |
| @ConfigOption |
| def propagation(value): |
| """How existing transactions should be handled. |
| |
| One of NESTED, MANDATORY, ALLOWED, INDEPENDENT. The interpertation of |
| these types is up to higher level run-in-transaction implementations. |
| |
| WARNING: Using anything other than NESTED for the propagation flag |
| can have strange consequences. When using ALLOWED or MANDATORY, if |
| an exception is raised, the transaction is likely not safe to |
| commit. When using INDEPENDENT it is not generally safe to return |
| values read to the caller (as they were not read in the caller's |
| transaction). |
| |
| Raises: datastore_errors.BadArgumentError if value is not reconized. |
| """ |
| if value not in TransactionOptions._PROPAGATION: |
| raise datastore_errors.BadArgumentError('Unknown propagation value (%r)' % |
| (value,)) |
| return value |
| |
| @ConfigOption |
| def xg(value): |
| """Whether to allow cross-group transactions. |
| |
| Raises: datastore_errors.BadArgumentError if value is not a bool. |
| """ |
| if not isinstance(value, bool): |
| raise datastore_errors.BadArgumentError( |
| 'xg argument should be bool (%r)' % (value,)) |
| return value |
| |
| @ConfigOption |
| def retries(value): |
| """How many retries to attempt on the transaction. |
| |
| The exact retry logic is implemented in higher level run-in-transaction |
| implementations. |
| |
| Raises: datastore_errors.BadArgumentError if value is not an integer or |
| is not greater than zero. |
| """ |
| datastore_types.ValidateInteger(value, |
| 'retries', |
| datastore_errors.BadArgumentError, |
| zero_ok=True) |
| return value |
| |
| @ConfigOption |
| def app(value): |
| """The application in which to perform the transaction. |
| |
| Raises: datastore_errors.BadArgumentError if value is not a string |
| or is the empty string. |
| """ |
| datastore_types.ValidateString(value, |
| 'app', |
| datastore_errors.BadArgumentError) |
| return value |
| |
| |
| class TransactionalConnection(BaseConnection): |
| """A connection specific to one transaction. |
| |
| It is possible to pass the transaction and entity group to the |
| constructor, but typically the transaction is lazily created by |
| _get_transaction() when the first operation is started. |
| """ |
| |
| @_positional(1) |
| def __init__(self, |
| adapter=None, config=None, transaction=None, entity_group=None, |
| _api_version=_DATASTORE_V3): |
| """Constructor. |
| |
| All arguments should be specified as keyword arguments. |
| |
| Args: |
| adapter: Optional AbstractAdapter subclass instance; |
| default IdentityAdapter. |
| config: Optional Configuration object. |
| transaction: Optional datastore_db.Transaction object. |
| entity_group: Deprecated, do not use. |
| """ |
| super(TransactionalConnection, self).__init__(adapter=adapter, |
| config=config, |
| _api_version=_api_version) |
| self.__adapter = self.adapter |
| self.__config = self.config |
| if transaction is None: |
| app = TransactionOptions.app(self.config) |
| app = datastore_types.ResolveAppId(TransactionOptions.app(self.config)) |
| self.__transaction_rpc = self.async_begin_transaction(None, app) |
| else: |
| if self._api_version == _DATASTORE_V4: |
| txn_class = str |
| else: |
| txn_class = datastore_pb.Transaction |
| if not isinstance(transaction, txn_class): |
| raise datastore_errors.BadArgumentError( |
| 'Invalid transaction (%r)' % (transaction,)) |
| self.__transaction = transaction |
| self.__transaction_rpc = None |
| self.__finished = False |
| |
| |
| self.__pending_v4_upserts = {} |
| self.__pending_v4_deletes = {} |
| |
| @property |
| def finished(self): |
| return self.__finished |
| |
| @property |
| def transaction(self): |
| if self.__transaction_rpc is not None: |
| self.__transaction = self.__transaction_rpc.get_result() |
| self.__transaction_rpc = None |
| return self.__transaction |
| |
| def _set_request_transaction(self, request): |
| """Set the current transaction on a request. |
| |
| This calls _get_transaction() (see below). The transaction object |
| returned is both set as the transaction field on the request |
| object and returned. |
| |
| Args: |
| request: A protobuf with a transaction field. |
| |
| Returns: |
| An object representing a transaction or None. |
| """ |
| if self.__finished: |
| raise datastore_errors.BadRequestError( |
| 'Cannot start a new operation in a finished transaction.') |
| transaction = self.transaction |
| if self._api_version == _DATASTORE_V4: |
| request.mutable_read_options().set_transaction(transaction) |
| |
| request.mutable_read_options().clear_read_consistency() |
| else: |
| request.mutable_transaction().CopyFrom(transaction) |
| return transaction |
| |
| def _end_transaction(self): |
| """Finish the current transaction. |
| |
| This blocks waiting for all pending RPCs to complete, and then |
| marks the connection as finished. After that no more operations |
| can be started using this connection. |
| |
| Returns: |
| An object representing a transaction or None. |
| |
| Raises: |
| datastore_errors.BadRequestError if the transaction is already |
| finished. |
| """ |
| if self.__finished: |
| raise datastore_errors.BadRequestError( |
| 'The transaction is already finished.') |
| |
| |
| self.wait_for_all_pending_rpcs() |
| assert not self.get_pending_rpcs() |
| transaction = self.transaction |
| self.__finished = True |
| self.__transaction = None |
| return transaction |
| |
| |
| |
| def async_put(self, config, entities, extra_hook=None): |
| """Transactional asynchronous Put operation. |
| |
| Args: |
| config: A Configuration object or None. Defaults are taken from |
| the connection's default configuration. |
| entities: An iterable of user-level entity objects. |
| extra_hook: Optional function to be called on the result once the |
| RPC has completed. |
| |
| Returns: |
| A MultiRpc object. |
| |
| NOTE: If any of the entities has an incomplete key, this will |
| *not* patch up those entities with the complete key. |
| """ |
| if self._api_version != _DATASTORE_V4: |
| |
| return super(TransactionalConnection, self).async_put( |
| config, entities, extra_hook) |
| |
| v4_entities = [self.adapter.entity_to_pb_v4(entity) |
| for entity in entities] |
| |
| |
| v4_req = datastore_v4_pb.AllocateIdsRequest() |
| for v4_entity in v4_entities: |
| if not datastore_pbs.is_complete_v4_key(v4_entity.key()): |
| v4_req.allocate_list().append(v4_entity.key()) |
| |
| user_data = v4_entities, extra_hook |
| |
| if not v4_req.allocate_list(): |
| |
| return _StubRpc(self.__v4_build_put_result([], user_data)) |
| |
| return self._make_rpc_call(config, 'AllocateIds', v4_req, |
| datastore_v4_pb.AllocateIdsResponse(), |
| get_result_hook=self.__v4_put_allocate_ids_hook, |
| user_data=user_data, |
| service_name=_DATASTORE_V4) |
| |
| def __v4_put_allocate_ids_hook(self, rpc): |
| """Internal method used as get_result_hook for AllocateIds call.""" |
| self.check_rpc_success(rpc) |
| v4_resp = rpc.response |
| return self.__v4_build_put_result(list(v4_resp.allocated_list()), |
| rpc.user_data) |
| |
| def __v4_build_put_result(self, v4_allocated_keys, user_data): |
| """Internal method that builds the result of a put operation. |
| |
| Converts the results from a v4 AllocateIds operation to a list of user-level |
| key objects. |
| |
| Args: |
| v4_allocated_keys: a list of datastore_v4_pb.Keys that have been allocated |
| user_data: a tuple consisting of: |
| - a list of datastore_v4_pb.Entity objects |
| - an optional extra_hook |
| """ |
| v4_entities, extra_hook = user_data |
| keys = [] |
| idx = 0 |
| for v4_entity in v4_entities: |
| |
| |
| |
| |
| |
| |
| v4_entity = copy.deepcopy(v4_entity) |
| if not datastore_pbs.is_complete_v4_key(v4_entity.key()): |
| v4_entity.key().CopyFrom(v4_allocated_keys[idx]) |
| idx += 1 |
| hashable_key = datastore_types.ReferenceToKeyValue(v4_entity.key()) |
| |
| self.__pending_v4_deletes.pop(hashable_key, None) |
| |
| |
| self.__pending_v4_upserts[hashable_key] = v4_entity |
| keys.append(self.adapter.pb_v4_to_key(copy.deepcopy(v4_entity.key()))) |
| |
| |
| if extra_hook: |
| keys = extra_hook(keys) |
| return keys |
| |
| |
| |
| def async_delete(self, config, keys, extra_hook=None): |
| """Transactional asynchronous Delete operation. |
| |
| Args: |
| config: A Configuration object or None. Defaults are taken from |
| the connection's default configuration. |
| keys: An iterable of user-level key objects. |
| extra_hook: Optional function to be called once the RPC has completed. |
| |
| Returns: |
| A MultiRpc object. |
| """ |
| if self._api_version != _DATASTORE_V4: |
| |
| return super(TransactionalConnection, self).async_delete(config, |
| keys, |
| extra_hook) |
| |
| v4_keys = [self.__adapter.key_to_pb_v4(key) for key in keys] |
| |
| for key in v4_keys: |
| hashable_key = datastore_types.ReferenceToKeyValue(key) |
| |
| self.__pending_v4_upserts.pop(hashable_key, None) |
| |
| |
| self.__pending_v4_deletes[hashable_key] = key |
| |
| |
| return _StubRpc(self.__v4_delete_hook(extra_hook)) |
| |
| def __v4_delete_hook(self, extra_hook): |
| if extra_hook: |
| extra_hook(None) |
| |
| |
| |
| def commit(self): |
| """Synchronous Commit operation. |
| |
| Returns: |
| True if the transaction was successfully committed. False if |
| the backend reported a concurrent transaction error. |
| """ |
| |
| |
| rpc = self._create_rpc(service_name=self._api_version) |
| rpc = self.async_commit(rpc) |
| if rpc is None: |
| return True |
| return rpc.get_result() |
| |
| def async_commit(self, config): |
| """Asynchronous Commit operation. |
| |
| Args: |
| config: A Configuration object or None. Defaults are taken from |
| the connection's default configuration. |
| |
| Returns: |
| A MultiRpc object. |
| """ |
| transaction = self._end_transaction() |
| if transaction is None: |
| return None |
| |
| if self._api_version == _DATASTORE_V4: |
| req = datastore_v4_pb.CommitRequest() |
| req.set_transaction(transaction) |
| if Configuration.force_writes(config, self.__config): |
| self.__force(req) |
| |
| |
| deprecated_mutation = req.mutable_deprecated_mutation() |
| deprecated_mutation.upsert_list().extend( |
| self.__pending_v4_upserts.itervalues()) |
| deprecated_mutation.delete_list().extend( |
| self.__pending_v4_deletes.itervalues()) |
| |
| |
| self.__pending_v4_upserts.clear() |
| self.__pending_v4_deletes.clear() |
| |
| resp = datastore_v4_pb.CommitResponse() |
| else: |
| req = transaction |
| resp = datastore_pb.CommitResponse() |
| |
| return self._make_rpc_call(config, 'Commit', req, resp, |
| get_result_hook=self.__commit_hook, |
| service_name=self._api_version) |
| |
| def __commit_hook(self, rpc): |
| """Internal method used as get_result_hook for Commit.""" |
| try: |
| rpc.check_success() |
| except apiproxy_errors.ApplicationError, err: |
| if err.application_error == datastore_pb.Error.CONCURRENT_TRANSACTION: |
| return False |
| else: |
| raise _ToDatastoreError(err) |
| else: |
| return True |
| |
| |
| |
| def rollback(self): |
| """Synchronous Rollback operation.""" |
| rpc = self.async_rollback(None) |
| if rpc is None: |
| return None |
| return rpc.get_result() |
| |
| def async_rollback(self, config): |
| """Asynchronous Rollback operation. |
| |
| Args: |
| config: A Configuration object or None. Defaults are taken from |
| the connection's default configuration. |
| |
| Returns: |
| A MultiRpc object. |
| """ |
| transaction = self._end_transaction() |
| if transaction is None: |
| return None |
| |
| if self._api_version == _DATASTORE_V4: |
| req = datastore_v4_pb.RollbackRequest() |
| req.set_transaction(transaction) |
| resp = datastore_v4_pb.RollbackResponse() |
| else: |
| req = transaction |
| resp = api_base_pb.VoidProto() |
| |
| return self._make_rpc_call(config, 'Rollback', req, resp, |
| get_result_hook=self.__rollback_hook, |
| service_name=self._api_version) |
| |
| def __rollback_hook(self, rpc): |
| """Internal method used as get_result_hook for Rollback.""" |
| self.check_rpc_success(rpc) |
| |
| |
| |
| |
| |
| def _ToDatastoreError(err): |
| """Converts an apiproxy.ApplicationError to an error in datastore_errors. |
| |
| Args: |
| err: An apiproxy.ApplicationError object. |
| |
| Returns: |
| An instance of a subclass of datastore_errors.Error. |
| """ |
| return _DatastoreExceptionFromErrorCodeAndDetail(err.application_error, |
| err.error_detail) |
| |
| |
| def _DatastoreExceptionFromErrorCodeAndDetail(error, detail): |
| """Converts a datastore_pb.Error into a datastore_errors.Error. |
| |
| Args: |
| error: A member of the datastore_pb.Error enumeration. |
| detail: A string providing extra details about the error. |
| |
| Returns: |
| An instance of a subclass of datastore_errors.Error. |
| """ |
| exception_class = { |
| datastore_pb.Error.BAD_REQUEST: datastore_errors.BadRequestError, |
| datastore_pb.Error.CONCURRENT_TRANSACTION: |
| datastore_errors.TransactionFailedError, |
| datastore_pb.Error.INTERNAL_ERROR: datastore_errors.InternalError, |
| datastore_pb.Error.NEED_INDEX: datastore_errors.NeedIndexError, |
| datastore_pb.Error.TIMEOUT: datastore_errors.Timeout, |
| datastore_pb.Error.BIGTABLE_ERROR: datastore_errors.Timeout, |
| datastore_pb.Error.COMMITTED_BUT_STILL_APPLYING: |
| datastore_errors.CommittedButStillApplying, |
| datastore_pb.Error.CAPABILITY_DISABLED: |
| apiproxy_errors.CapabilityDisabledError, |
| }.get(error, datastore_errors.Error) |
| |
| if detail is None: |
| return exception_class() |
| else: |
| return exception_class(detail) |