| #!/usr/bin/env python |
| # |
| # Copyright 2007 Google Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| |
| |
| |
| """Utility functions shared between the file and sqlite datastore stubs. |
| |
| This module is internal and should not be used by client applications. |
| """ |
| |
| |
| |
| |
| |
| |
| |
| |
| try: |
| import hashlib |
| _MD5_FUNC = hashlib.md5 |
| except ImportError: |
| import md5 |
| _MD5_FUNC = md5.new |
| |
| import atexit |
| import collections |
| import itertools |
| import logging |
| import os |
| import random |
| import struct |
| import threading |
| import time |
| import weakref |
| |
| from google.net.proto import ProtocolBuffer |
| from google.appengine.datastore import entity_pb |
| |
| from google.appengine.api import api_base_pb |
| from google.appengine.api import apiproxy_stub_map |
| from google.appengine.api import datastore_admin |
| from google.appengine.api import datastore_errors |
| from google.appengine.api import datastore_types |
| from google.appengine.api.taskqueue import taskqueue_service_pb |
| from google.appengine.datastore import datastore_index |
| from google.appengine.datastore import datastore_pb |
| from google.appengine.datastore import datastore_pbs |
| from google.appengine.datastore import datastore_query |
| from google.appengine.datastore import datastore_stub_index |
| from google.appengine.datastore import datastore_v4_pb |
| from google.appengine.runtime import apiproxy_errors |
| |

# The maximum number of results to return in a single query batch.
_MAXIMUM_RESULTS = 300

# An upper bound on the number of bytes of serialized entity data returned
# in a single query batch.
_MAXIMUM_QUERY_RESULT_BYTES = 2000000

# The maximum number of results a query may skip in a single request;
# larger offsets are applied across multiple batches.
_MAX_QUERY_OFFSET = 1000

# Human-readable names for the property value types, keyed by the tag
# numbers used in entity_pb.PropertyValue.
_PROPERTY_TYPE_NAMES = {
  0: 'NULL',
  entity_pb.PropertyValue.kint64Value: 'INT64',
  entity_pb.PropertyValue.kbooleanValue: 'BOOLEAN',
  entity_pb.PropertyValue.kstringValue: 'STRING',
  entity_pb.PropertyValue.kdoubleValue: 'DOUBLE',
  entity_pb.PropertyValue.kPointValueGroup: 'POINT',
  entity_pb.PropertyValue.kUserValueGroup: 'USER',
  entity_pb.PropertyValue.kReferenceValueGroup: 'REFERENCE'
}

# Entities whose 16-bit key hash falls below this threshold (out of 2**16
# possible values, i.e. roughly half of all entities) receive a __scatter__
# property.
_SCATTER_PROPORTION = 32768

# The maximum number of entity groups a cross-group (XG) transaction may
# operate on.
_MAX_EG_PER_TXN = 5

# Property meanings that hold blob-like values, which must be stored as
# raw (unindexed) properties.
_BLOB_MEANINGS = frozenset((entity_pb.Property.BLOB,
                            entity_pb.Property.ENTITY_PROTO,
                            entity_pb.Property.TEXT))

# Parameters controlling automatic retries: retries start at
# _INITIAL_RETRY_DELAY_MS and back off by _RETRY_DELAY_MULTIPLIER per
# attempt, capped at _MAX_RETRY_DELAY_MS, for at most _RETRIES attempts.
_RETRIES = 3

_INITIAL_RETRY_DELAY_MS = 100

_RETRY_DELAY_MULTIPLIER = 2

_MAX_RETRY_DELAY_MS = 120000

# Automatic id allocation policies.
SEQUENTIAL = 'sequential'
SCATTERED = 'scattered'

# Sequential ids occupy the low 2**52 values of the id space; scattered
# ids occupy the range directly above them.
_MAX_SEQUENTIAL_BIT = 52

# The maximum value of the sequential counter, which is also the maximum
# sequential id.
_MAX_SEQUENTIAL_COUNTER = (1 << _MAX_SEQUENTIAL_BIT) - 1

_MAX_SEQUENTIAL_ID = _MAX_SEQUENTIAL_COUNTER

# The maximum value of the scattered counter; scattered ids start just
# above the sequential range.
_MAX_SCATTERED_COUNTER = (1 << (_MAX_SEQUENTIAL_BIT - 1)) - 1

_MAX_SCATTERED_ID = _MAX_SEQUENTIAL_ID + 1 + _MAX_SCATTERED_COUNTER

# The shift applied to a scattered counter when spreading it across the
# id space.
_SCATTER_SHIFT = 64 - _MAX_SEQUENTIAL_BIT + 1
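

# Derived bounds of the two disjoint auto-id ranges (arithmetic from the
# constants above):
#
#   _MAX_SEQUENTIAL_ID == 2**52 - 1          sequential ids: 1 .. 2**52 - 1
#   _MAX_SCATTERED_ID  == 2**52 + 2**51 - 1  scattered ids: 2**52 and above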
| |
| |
| def _GetScatterProperty(entity_proto): |
| """Gets the scatter property for an object. |
| |
| For ease of implementation, this is not synchronized with the actual |
| value on the App Engine server, but should work equally well. |
| |
| Note: This property may change, either here or in production. No client |
| other than the mapper framework should rely on it directly. |
| |
| Returns: |
| The PropertyValue of the scatter property or None if this entity should not |
| have a scatter property. |
| """ |
| hash_obj = _MD5_FUNC() |
| for element in entity_proto.key().path().element_list(): |
| if element.has_name(): |
| hash_obj.update(element.name()) |
| elif element.has_id(): |
| hash_obj.update(str(element.id())) |
  # Interpret the first two bytes of the digest as a native-endian
  # unsigned short.
  hash_bytes = hash_obj.digest()[0:2]
  (hash_int,) = struct.unpack('H', hash_bytes)
| |
| if hash_int >= _SCATTER_PROPORTION: |
| return None |
| |
| scatter_property = entity_pb.Property() |
| scatter_property.set_name(datastore_types.SCATTER_SPECIAL_PROPERTY) |
| scatter_property.set_meaning(entity_pb.Property.BYTESTRING) |
| scatter_property.set_multiple(False) |
| property_value = scatter_property.mutable_value() |
| property_value.set_stringvalue(hash_bytes) |
| return scatter_property |
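

# Illustrative note: the threshold check above gives each entity a
# __scatter__ property with probability _SCATTER_PROPORTION / 2**16 (50%),
# and the property's value (the first two bytes of the MD5 digest of the
# key path) orders the selected entities uniformly, which is what lets the
# mapper framework pick evenly spaced split points.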
| |

# Maps special property names to a (is_visible, is_stored, property_func)
# triple: is_visible properties are returned to the user on load, is_stored
# properties are computed and written on store, and property_func computes
# the property for a given entity.
_SPECIAL_PROPERTY_MAP = {
    datastore_types.SCATTER_SPECIAL_PROPERTY: (False, True, _GetScatterProperty)
}
| |
| |
| def GetInvisibleSpecialPropertyNames(): |
| """Gets the names of all non user-visible special properties.""" |
| invisible_names = [] |
| for name, value in _SPECIAL_PROPERTY_MAP.items(): |
| is_visible, _, _ = value |
| if not is_visible: |
| invisible_names.append(name) |
| return invisible_names |
| |
| |
def _PrepareSpecialProperties(entity_proto, is_load):
  """Computes special properties for loading or storing.

  Strips other special properties.

  Args:
    entity_proto: the entity_pb.EntityProto to modify in place.
    is_load: True when preparing the entity to be returned to the user.
  """
  for i in xrange(entity_proto.property_size() - 1, -1, -1):
    if entity_proto.property(i).name() in _SPECIAL_PROPERTY_MAP:
      del entity_proto.property_list()[i]
| |
| for is_visible, is_stored, property_func in _SPECIAL_PROPERTY_MAP.values(): |
| if is_load: |
| should_process = is_visible |
| else: |
| should_process = is_stored |
| |
| if should_process: |
| special_property = property_func(entity_proto) |
| if special_property: |
| entity_proto.property_list().append(special_property) |
| |
| |
| def _GetGroupByKey(entity, property_names): |
| """Computes a key value that uniquely identifies the 'group' of an entity. |
| |
| Args: |
| entity: The entity_pb.EntityProto for which to create the group key. |
| property_names: The names of the properties in the group by clause. |
| |
| Returns: |
| A hashable value that uniquely identifies the entity's 'group'. |
| """ |
| return frozenset((prop.name(), prop.value().SerializeToString()) |
| for prop in entity.property_list() |
| if prop.name() in property_names) |
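

# Illustrative note: the group key is a frozenset of (name, serialized
# value) pairs, so two entities land in the same group exactly when they
# carry the same set of values for every group-by property:
#
#   _GetGroupByKey(e1, names) == _GetGroupByKey(e2, names)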
| |
| |
def PrepareSpecialPropertiesForStore(entity_proto):
  """Computes special properties for storing.

  Strips other special properties.
  """
  _PrepareSpecialProperties(entity_proto, False)
| |
| |
| def LoadEntity(entity, keys_only=False, property_names=None): |
| """Prepares an entity to be returned to the user. |
| |
  Args:
    entity: an entity_pb.EntityProto or None
    keys_only: whether a keys-only result should be produced
    property_names: if not None or empty, causes a projected entity
        to be produced with the given properties.

  Returns:
    A user friendly copy of entity or None.
  """
  if entity:
    clone = entity_pb.EntityProto()
    if property_names:
      # Produce a projection: the key plus exactly one value per requested
      # property.
      clone.mutable_key().CopyFrom(entity.key())
      clone.mutable_entity_group()
      seen = set()
      for prop in entity.property_list():
        if prop.name() in property_names:
          # The projection query should have already expanded multi-valued
          # properties into one result per value.
          Check(prop.name() not in seen,
                "datastore dev stub produced bad result",
                datastore_pb.Error.INTERNAL_ERROR)
          seen.add(prop.name())
          new_prop = clone.add_property()
          new_prop.set_name(prop.name())
          new_prop.set_meaning(entity_pb.Property.INDEX_VALUE)
          new_prop.mutable_value().CopyFrom(prop.value())
          new_prop.set_multiple(False)
    elif keys_only:
      # Produce a keys-only entity: just the key and entity group.
      clone.mutable_key().CopyFrom(entity.key())
      clone.mutable_entity_group()
    else:
      # Produce a full copy of the entity.
      clone.CopyFrom(entity)
| PrepareSpecialPropertiesForLoad(clone) |
| return clone |
| |
| |
| def StoreEntity(entity): |
| """Prepares an entity for storing. |
| |
  Args:
    entity: an entity_pb.EntityProto to prepare

  Returns:
    A copy of entity that should be stored in its place.
  """
  clone = entity_pb.EntityProto()
  clone.CopyFrom(entity)

  # Recompute the stored special properties (e.g. __scatter__) on the copy.
  PrepareSpecialPropertiesForStore(clone)
  return clone
| |
| |
def PrepareSpecialPropertiesForLoad(entity_proto):
  """Computes special properties that are user-visible.

  Strips other special properties.
  """
  _PrepareSpecialProperties(entity_proto, True)
| |
| |
| def Check(test, msg='', error_code=datastore_pb.Error.BAD_REQUEST): |
| """Raises an apiproxy_errors.ApplicationError if the condition is false. |
| |
| Args: |
| test: A condition to test. |
| msg: A string to return with the error. |
| error_code: One of datastore_pb.Error to use as an error code. |
| |
| Raises: |
| apiproxy_errors.ApplicationError: If test is false. |
| """ |
| if not test: |
| raise apiproxy_errors.ApplicationError(error_code, msg) |
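

# Illustrative note: Check is the stub's uniform validation primitive; a
# failed condition surfaces to the client as an ApplicationError carrying
# the given datastore error code, e.g.
#
#   Check(key.path().element_size() > 0, 'key\'s path cannot be empty')
#
# raises an ApplicationError with error code BAD_REQUEST (the default).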
| |
| |
| def CheckValidUTF8(string, desc): |
| """Check that the given string is valid UTF-8. |
| |
| Args: |
| string: the string to validate. |
| desc: a description of the string being validated. |
| |
| Raises: |
| apiproxy_errors.ApplicationError: if the string is not valid UTF-8. |
| """ |
| try: |
| string.decode('utf-8') |
| except UnicodeDecodeError: |
| Check(False, '%s is not valid UTF-8.' % desc) |
| |
| |
| def CheckAppId(request_trusted, request_app_id, app_id): |
| """Check that this is the stub for app_id. |
| |
| Args: |
| request_trusted: If the request is trusted. |
| request_app_id: The application ID of the app making the request. |
| app_id: An application ID. |
| |
| Raises: |
| apiproxy_errors.ApplicationError: if this is not the stub for app_id. |
| """ |
| |
  assert app_id
  CheckValidUTF8(app_id, 'app id')
| Check(request_trusted or app_id == request_app_id, |
| 'app "%s" cannot access app "%s"\'s data' % (request_app_id, app_id)) |
| |
| |
| def CheckReference(request_trusted, |
| request_app_id, |
| key, |
| require_id_or_name=True): |
| """Check this key. |
| |
| Args: |
| request_trusted: If the request is trusted. |
| request_app_id: The application ID of the app making the request. |
| key: entity_pb.Reference |
| require_id_or_name: Boolean indicating if we should enforce the presence of |
| an id or name in the last element of the key's path. |
| |
| Raises: |
| apiproxy_errors.ApplicationError: if the key is invalid |
| """ |
| |
| assert isinstance(key, entity_pb.Reference) |
| |
| CheckAppId(request_trusted, request_app_id, key.app()) |
| |
| Check(key.path().element_size() > 0, 'key\'s path cannot be empty') |
| |
| if require_id_or_name: |
| |
| last_element = key.path().element_list()[-1] |
| has_id_or_name = ((last_element.has_id() and last_element.id() != 0) or |
| (last_element.has_name() and last_element.name() != "")) |
| if not has_id_or_name: |
| raise datastore_errors.BadRequestError('missing key id/name') |
| |
| for elem in key.path().element_list(): |
| Check(not elem.has_id() or not elem.has_name(), |
| 'each key path element should have id or name but not both: %r' % key) |
| CheckValidUTF8(elem.type(), 'key path element type') |
| if elem.has_name(): |
| CheckValidUTF8(elem.name(), 'key path element name') |
| |
| |
| def CheckEntity(request_trusted, request_app_id, entity): |
| """Check if this entity can be stored. |
| |
| Args: |
| request_trusted: If the request is trusted. |
| request_app_id: The application ID of the app making the request. |
| entity: entity_pb.EntityProto |
| |
| Raises: |
| apiproxy_errors.ApplicationError: if the entity is invalid |
| """ |

  # The entity's key may still be incomplete (no id or name) at this point.
  CheckReference(request_trusted, request_app_id, entity.key(), False)
| for prop in entity.property_list(): |
| CheckProperty(request_trusted, request_app_id, prop) |
| for prop in entity.raw_property_list(): |
| CheckProperty(request_trusted, request_app_id, prop, indexed=False) |
| |
| |
| def CheckProperty(request_trusted, request_app_id, prop, indexed=True): |
| """Check if this property can be stored. |
| |
| Args: |
| request_trusted: If the request is trusted. |
| request_app_id: The application ID of the app making the request. |
| prop: entity_pb.Property |
| indexed: Whether the property is indexed. |
| |
| Raises: |
| apiproxy_errors.ApplicationError: if the property is invalid |
| """ |
| name = prop.name() |
| value = prop.value() |
| meaning = prop.meaning() |
| CheckValidUTF8(name, 'property name') |
| Check(request_trusted or |
| not datastore_types.RESERVED_PROPERTY_NAME.match(name), |
| 'cannot store entity with reserved property name \'%s\'' % name) |
| Check(prop.meaning() != entity_pb.Property.INDEX_VALUE, |
| 'Entities with incomplete properties cannot be written.') |
| is_blob = meaning in _BLOB_MEANINGS |
| if indexed: |
    Check(not is_blob,
          'BLOB, ENTITY_PROTO or TEXT property ' + name +
          ' must be in a raw_property field')
| max_length = datastore_types._MAX_STRING_LENGTH |
| else: |
| if is_blob: |
      Check(value.has_stringvalue(),
            'BLOB / ENTITY_PROTO / TEXT raw property ' + name +
            ' must have a string value')
| max_length = datastore_types._MAX_RAW_PROPERTY_BYTES |
| if meaning == entity_pb.Property.ATOM_LINK: |
| max_length = datastore_types._MAX_LINK_PROPERTY_LENGTH |
| |
| CheckPropertyValue(name, value, max_length, meaning) |
| |
| |
| def CheckPropertyValue(name, value, max_length, meaning): |
| """Check if this property value can be stored. |
| |
| Args: |
| name: name of the property |
| value: entity_pb.PropertyValue |
| max_length: maximum length for string values |
| meaning: meaning of the property |
| |
| Raises: |
| apiproxy_errors.ApplicationError: if the property is invalid |
| """ |
| num_values = (value.has_int64value() + |
| value.has_stringvalue() + |
| value.has_booleanvalue() + |
| value.has_doublevalue() + |
| value.has_pointvalue() + |
| value.has_uservalue() + |
| value.has_referencevalue()) |
| Check(num_values <= 1, 'PropertyValue for ' + name + |
| ' has multiple value fields set') |
| |
  if value.has_stringvalue():
    # The length limit is enforced in UTF-16 code units: re-encode the
    # value as UTF-16, subtract 2 bytes for the byte-order mark, and divide
    # by 2 bytes per code unit.
    s16 = value.stringvalue().decode('utf-8', 'replace').encode('utf-16')
    Check((len(s16) - 2) / 2 <= max_length,
          'Property %s is too long. Maximum length is %d.' % (name, max_length))
| if (meaning not in _BLOB_MEANINGS and |
| meaning != entity_pb.Property.BYTESTRING): |
| CheckValidUTF8(value.stringvalue(), 'String property value') |
| |
| |
| def CheckTransaction(request_trusted, request_app_id, transaction): |
| """Check that this transaction is valid. |
| |
| Args: |
| request_trusted: If the request is trusted. |
| request_app_id: The application ID of the app making the request. |
| transaction: datastore_pb.Transaction |
| |
| Raises: |
| apiproxy_errors.ApplicationError: if the transaction is not valid. |
| """ |
| assert isinstance(transaction, datastore_pb.Transaction) |
| CheckAppId(request_trusted, request_app_id, transaction.app()) |
| |
| |
| def CheckQuery(query, filters, orders, max_query_components): |
| """Check a datastore query with normalized filters, orders. |
| |
  Raises an ApplicationError if any of the following conditions is violated:
  - transactional queries must have an ancestor
  - queries must not be too large
    (sum of filters, orders, ancestor <= max_query_components)
  - the ancestor's (if any) app and namespace must match the query's app and
    namespace
  - kindless queries may only filter on __key__ and only sort on __key__
    ascending
  - multiple inequality (<, <=, >, >=) filters must all apply to the same
    property
  - filters on __key__ must compare to a reference in the same app and
    namespace as the query
  - if an inequality filter on prop X is used, the first order (if any) must
    be on X
| |
| Args: |
| query: query to validate |
| filters: normalized (by datastore_index.Normalize) filters from query |
| orders: normalized (by datastore_index.Normalize) orders from query |
| max_query_components: limit on query complexity |
| """ |
| Check(query.property_name_size() == 0 or not query.keys_only(), |
| 'projection and keys_only cannot both be set') |
| |
| projected_properties = set(query.property_name_list()) |
| for prop_name in query.property_name_list(): |
| Check(not datastore_types.RESERVED_PROPERTY_NAME.match(prop_name), |
| 'projections are not supported for the property: ' + prop_name) |
| Check(len(projected_properties) == len(query.property_name_list()), |
| "cannot project a property multiple times") |
| |
| key_prop_name = datastore_types.KEY_SPECIAL_PROPERTY |
| unapplied_log_timestamp_us_name = ( |
| datastore_types._UNAPPLIED_LOG_TIMESTAMP_SPECIAL_PROPERTY) |
| |
| if query.has_transaction(): |
| |
| Check(query.has_ancestor(), |
| 'Only ancestor queries are allowed inside transactions.') |
| |
| |
| num_components = len(filters) + len(orders) |
| if query.has_ancestor(): |
| num_components += 1 |
  Check(num_components <= max_query_components,
        'query is too large. may not have more than %s filters'
        ' + sort orders + ancestor total' % max_query_components)
| |
| |
| if query.has_ancestor(): |
| ancestor = query.ancestor() |
| Check(query.app() == ancestor.app(), |
| 'query app is %s but ancestor app is %s' % |
| (query.app(), ancestor.app())) |
| Check(query.name_space() == ancestor.name_space(), |
| 'query namespace is %s but ancestor namespace is %s' % |
| (query.name_space(), ancestor.name_space())) |
| |
| |
| if query.group_by_property_name_size(): |
| group_by_set = set(query.group_by_property_name_list()) |
| for order in orders: |
| if not group_by_set: |
| break |
| Check(order.property() in group_by_set, |
| 'items in the group by clause must be specified first ' |
| 'in the ordering') |
| group_by_set.remove(order.property()) |
| |
| |
| |
| ineq_prop_name = None |
| for filter in filters: |
| Check(filter.property_size() == 1, |
| 'Filter has %d properties, expected 1' % filter.property_size()) |
| |
| prop = filter.property(0) |
| prop_name = prop.name().decode('utf-8') |
| |
    if prop_name == key_prop_name:
      # __key__ filters must compare to a reference in the same app and
      # namespace as the query itself.
      Check(prop.value().has_referencevalue(),
| '%s filter value must be a Key' % key_prop_name) |
| ref_val = prop.value().referencevalue() |
| Check(ref_val.app() == query.app(), |
| '%s filter app is %s but query app is %s' % |
| (key_prop_name, ref_val.app(), query.app())) |
| Check(ref_val.name_space() == query.name_space(), |
| '%s filter namespace is %s but query namespace is %s' % |
| (key_prop_name, ref_val.name_space(), query.name_space())) |
| |
| if filter.op() in datastore_index.EQUALITY_OPERATORS: |
| Check(prop_name not in projected_properties, |
| 'cannot use projection on a property with an equality filter') |
| if (filter.op() in datastore_index.INEQUALITY_OPERATORS and |
| prop_name != unapplied_log_timestamp_us_name): |
| if ineq_prop_name is None: |
| ineq_prop_name = prop_name |
| else: |
| Check(ineq_prop_name == prop_name, |
| 'Only one inequality filter per query is supported. ' |
| 'Encountered both %s and %s' % (ineq_prop_name, prop_name)) |
| |
| if (ineq_prop_name is not None |
| and query.group_by_property_name_size() > 0 |
| and not orders): |
| |
| Check(ineq_prop_name in group_by_set, |
| 'Inequality filter on %s must also be a group by ' |
| 'property when group by properties are set.' |
| % (ineq_prop_name)) |
| |
| if ineq_prop_name is not None and orders: |
| |
| first_order_prop = orders[0].property().decode('utf-8') |
| Check(first_order_prop == ineq_prop_name, |
| 'The first sort property must be the same as the property ' |
| 'to which the inequality filter is applied. In your query ' |
| 'the first sort property is %s but the inequality filter ' |
| 'is on %s' % (first_order_prop, ineq_prop_name)) |
| |
| if not query.has_kind(): |
| |
| for filter in filters: |
| prop_name = filter.property(0).name().decode('utf-8') |
| Check(prop_name == key_prop_name or |
| prop_name == unapplied_log_timestamp_us_name, |
| 'kind is required for non-__key__ filters') |
| for order in orders: |
| prop_name = order.property().decode('utf-8') |
      Check(prop_name == key_prop_name and
            order.direction() == datastore_pb.Query_Order.ASCENDING,
| 'kind is required for all orders except __key__ ascending') |
| |
| |
| class ValueRange(object): |
| """A range of values defined by its two extremes (inclusive or exclusive).""" |
| |
| def __init__(self): |
| """Constructor. |
| |
| Creates an unlimited range. |
| """ |
| self.__start = self.__end = None |
| self.__start_inclusive = self.__end_inclusive = False |
| |
| def Update(self, rel_op, limit): |
| """Filter the range by 'rel_op limit'. |
| |
| Args: |
| rel_op: relational operator from datastore_pb.Query_Filter. |
| limit: the value to limit the range by. |
| """ |
| |
| if rel_op == datastore_pb.Query_Filter.LESS_THAN: |
| if self.__end is None or limit <= self.__end: |
| self.__end = limit |
| self.__end_inclusive = False |
| elif (rel_op == datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL or |
| rel_op == datastore_pb.Query_Filter.EQUAL): |
| if self.__end is None or limit < self.__end: |
| self.__end = limit |
| self.__end_inclusive = True |
| |
| if rel_op == datastore_pb.Query_Filter.GREATER_THAN: |
| if self.__start is None or limit >= self.__start: |
| self.__start = limit |
| self.__start_inclusive = False |
| elif (rel_op == datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL or |
| rel_op == datastore_pb.Query_Filter.EQUAL): |
| if self.__start is None or limit > self.__start: |
| self.__start = limit |
| self.__start_inclusive = True |
| |
| def Contains(self, value): |
| """Check if the range contains a specific value. |
| |
| Args: |
| value: the value to check. |
| Returns: |
| True iff value is contained in this range. |
| """ |
| if self.__start is not None: |
| if self.__start_inclusive and value < self.__start: return False |
| if not self.__start_inclusive and value <= self.__start: return False |
| if self.__end is not None: |
| if self.__end_inclusive and value > self.__end: return False |
| if not self.__end_inclusive and value >= self.__end: return False |
| return True |
| |
| def Remap(self, mapper): |
| """Transforms the range extremes with a function. |
| |
    The function mapper must preserve order, i.e.
      x rel_op y iff mapper(x) rel_op mapper(y)
| |
| Args: |
| mapper: function to apply to the range extremes. |
| """ |
| self.__start = self.__start and mapper(self.__start) |
| self.__end = self.__end and mapper(self.__end) |
| |
| def MapExtremes(self, mapper): |
| """Evaluate a function on the range extremes. |
| |
| Args: |
| mapper: function to apply to the range extremes. |
| Returns: |
| (x, y) where x = None if the range has no start, |
| mapper(start, start_inclusive, False) otherwise |
| y = None if the range has no end, |
| mapper(end, end_inclusive, True) otherwise |
| """ |
| return ( |
| self.__start and mapper(self.__start, self.__start_inclusive, False), |
| self.__end and mapper(self.__end, self.__end_inclusive, True)) |
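

# Illustrative note: ValueRange composes normalized filters into a single
# interval, e.g.
#
#   r = ValueRange()
#   r.Update(datastore_pb.Query_Filter.GREATER_THAN, 3)
#   r.Update(datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL, 7)
#   r.Contains(3)  # False: 3 is excluded by GREATER_THAN
#   r.Contains(7)  # True: 7 is included by LESS_THAN_OR_EQUAL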
| |
| |
| def ParseKeyFilteredQuery(filters, orders): |
| """Parse queries which only allow filters and ascending-orders on __key__. |
| |
| Raises exceptions for illegal queries. |
| Args: |
| filters: the normalized filters of a query. |
| orders: the normalized orders of a query. |
| Returns: |
| The key range (a ValueRange over datastore_types.Key) requested in the |
| query. |
| """ |
| |
| remaining_filters = [] |
| key_range = ValueRange() |
| key_prop = datastore_types.KEY_SPECIAL_PROPERTY |
| for f in filters: |
| op = f.op() |
| if not (f.property_size() == 1 and |
| f.property(0).name() == key_prop and |
| not (op == datastore_pb.Query_Filter.IN or |
| op == datastore_pb.Query_Filter.EXISTS)): |
| remaining_filters.append(f) |
| continue |
| |
| val = f.property(0).value() |
| Check(val.has_referencevalue(), '__key__ kind must be compared to a key') |
| limit = datastore_types.FromReferenceProperty(val) |
| key_range.Update(op, limit) |
| |
| |
| remaining_orders = [] |
| for o in orders: |
| if not (o.direction() == datastore_pb.Query_Order.ASCENDING and |
| o.property() == datastore_types.KEY_SPECIAL_PROPERTY): |
| remaining_orders.append(o) |
| else: |
| break |

  # Any other filter or order is unsupported for this kind of query.
  Check(not remaining_filters,
        'Only comparison filters on ' + key_prop + ' supported')
  Check(not remaining_orders,
        'Only ascending order on ' + key_prop + ' supported')
| |
| return key_range |
| |
| |
| def ParseKindQuery(query, filters, orders): |
| """Parse __kind__ (schema) queries. |
| |
| Raises exceptions for illegal queries. |
| Args: |
| query: A Query PB. |
| filters: the normalized filters from query. |
| orders: the normalized orders from query. |
| Returns: |
| The kind range (a ValueRange over string) requested in the query. |
| """ |
| |
| Check(not query.has_ancestor(), 'ancestor queries on __kind__ not allowed') |
| |
| key_range = ParseKeyFilteredQuery(filters, orders) |
| key_range.Remap(_KindKeyToString) |
| |
| return key_range |
| |
| |
| def _KindKeyToString(key): |
| """Extract kind name from __kind__ key. |
| |
| Raises an ApplicationError if the key is not of the form '__kind__'/name. |
| |
| Args: |
| key: a key for a __kind__ instance. |
| Returns: |
| kind specified by key. |
| """ |
| key_path = key.to_path() |
| if (len(key_path) == 2 and key_path[0] == '__kind__' and |
| isinstance(key_path[1], basestring)): |
| return key_path[1] |
| Check(False, 'invalid Key for __kind__ table') |
| |
| |
| def ParseNamespaceQuery(query, filters, orders): |
| """Parse __namespace__ queries. |
| |
| Raises exceptions for illegal queries. |
| Args: |
| query: A Query PB. |
| filters: the normalized filters from query. |
| orders: the normalized orders from query. |
| Returns: |
    The namespace range (a ValueRange over string) requested in the query.
| """ |
| |
| Check(not query.has_ancestor(), |
| 'ancestor queries on __namespace__ not allowed') |
| |
| key_range = ParseKeyFilteredQuery(filters, orders) |
| key_range.Remap(_NamespaceKeyToString) |
| |
| return key_range |
| |
| |
| def _NamespaceKeyToString(key): |
| """Extract namespace name from __namespace__ key. |
| |
| Raises an ApplicationError if the key is not of the form '__namespace__'/name |
| or '__namespace__'/_EMPTY_NAMESPACE_ID. |
| |
| Args: |
| key: a key for a __namespace__ instance. |
| Returns: |
| namespace specified by key. |
| """ |
| key_path = key.to_path() |
| if len(key_path) == 2 and key_path[0] == '__namespace__': |
| if key_path[1] == datastore_types._EMPTY_NAMESPACE_ID: |
| return '' |
| if isinstance(key_path[1], basestring): |
| return key_path[1] |
| Check(False, 'invalid Key for __namespace__ table') |
| |
| |
| def ParsePropertyQuery(query, filters, orders): |
| """Parse __property__ queries. |
| |
| Raises exceptions for illegal queries. |
| Args: |
| query: A Query PB. |
| filters: the normalized filters from query. |
| orders: the normalized orders from query. |
| Returns: |
    The property range (a ValueRange over (kind, property) pairs) requested
    in the query.
| """ |
| |
| Check(not query.has_transaction(), |
| 'transactional queries on __property__ not allowed') |
| |
| key_range = ParseKeyFilteredQuery(filters, orders) |
| key_range.Remap(lambda x: _PropertyKeyToString(x, '')) |
| |
| if query.has_ancestor(): |
| ancestor = datastore_types.Key._FromPb(query.ancestor()) |
| ancestor_kind, ancestor_property = _PropertyKeyToString(ancestor, None) |
    # An ancestor of the form __kind__/kind, __property__/property restricts
    # the range to that single property; an ancestor of just __kind__/kind
    # restricts it to all properties of that kind (kind + '\0' bounds the
    # range just past the kind).
    if ancestor_property is not None:
      key_range.Update(datastore_pb.Query_Filter.EQUAL,
                       (ancestor_kind, ancestor_property))
    else:
      key_range.Update(datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL,
                       (ancestor_kind, ''))
      key_range.Update(datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL,
                       (ancestor_kind + '\0', ''))
| query.clear_ancestor() |
| |
| return key_range |
| |
| |
| def _PropertyKeyToString(key, default_property): |
| """Extract property name from __property__ key. |
| |
| Raises an ApplicationError if the key is not of the form |
| '__kind__'/kind, '__property__'/property or '__kind__'/kind |
| |
| Args: |
| key: a key for a __property__ instance. |
| default_property: property value to return when key only has a kind. |
| Returns: |
| kind, property if key = '__kind__'/kind, '__property__'/property |
| kind, default_property if key = '__kind__'/kind |
| """ |
| key_path = key.to_path() |
| if (len(key_path) == 2 and |
| key_path[0] == '__kind__' and isinstance(key_path[1], basestring)): |
| return (key_path[1], default_property) |
| if (len(key_path) == 4 and |
| key_path[0] == '__kind__' and isinstance(key_path[1], basestring) and |
| key_path[2] == '__property__' and isinstance(key_path[3], basestring)): |
| return (key_path[1], key_path[3]) |
| |
| Check(False, 'invalid Key for __property__ table') |
| |
| |
| def SynthesizeUserId(email): |
| """Return a synthetic user ID from an email address. |
| |
| Note that this is not the same user ID found in the production system. |
| |
| Args: |
| email: An email address. |
| |
| Returns: |
| A string userid derived from the email address. |
| """ |
| |
| user_id_digest = _MD5_FUNC(email.lower()).digest() |
| user_id = '1' + ''.join(['%02d' % ord(x) for x in user_id_digest])[:20] |
| return user_id |
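

# Illustrative note: the synthetic id is deterministic and case-insensitive
# in the address, so
#
#   SynthesizeUserId('Alice@Example.com') == SynthesizeUserId('alice@example.com')
#
# always holds, while distinct addresses almost always yield distinct ids.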
| |
| |
| def FillUsersInQuery(filters): |
| """Fill in a synthetic user ID for all user properties in a set of filters. |
| |
| Args: |
| filters: The normalized filters from query. |
| """ |
| for filter in filters: |
| for property in filter.property_list(): |
| FillUser(property) |
| |
| |
| def FillUser(property): |
| """Fill in a synthetic user ID for a user properties. |
| |
| Args: |
| property: A Property which may have a user value. |
| """ |
| if property.value().has_uservalue(): |
| uid = SynthesizeUserId(property.value().uservalue().email()) |
| if uid: |
| property.mutable_value().mutable_uservalue().set_obfuscated_gaiaid(uid) |
| |
| |
| class BaseCursor(object): |
| """A base query cursor over a list of entities. |
| |
| Public properties: |
| cursor: the integer cursor. |
| app: the app for which this cursor was created. |
| keys_only: whether the query is keys_only. |
| |
| Class attributes: |
| _next_cursor: the next cursor to allocate. |
| _next_cursor_lock: protects _next_cursor. |
| """ |
| _next_cursor = 1 |
| _next_cursor_lock = threading.Lock() |
| |
| def __init__(self, query, dsquery, orders, index_list): |
| """Constructor. |
| |
| Args: |
| query: the query request proto. |
| dsquery: a datastore_query.Query over query. |
| orders: the orders of query as returned by _GuessOrders. |
| index_list: the list of indexes used by the query. |
| """ |
| |
| self.keys_only = query.keys_only() |
| self.property_names = set(query.property_name_list()) |
| self.group_by = set(query.group_by_property_name_list()) |
| self.app = query.app() |
| self.cursor = self._AcquireCursorID() |
| |
| self.__order_compare_entities = dsquery._order.cmp_for_filter( |
| dsquery._filter_predicate) |
| if self.group_by: |
| self.__cursor_properties = self.group_by |
| else: |
| self.__cursor_properties = set(order.property() for order in orders) |
| self.__cursor_properties.add('__key__') |
| self.__cursor_properties = frozenset(self.__cursor_properties) |
| |
| self.__first_sort_order = orders[0].direction() |
| self.__index_list = index_list |
| |
| def _PopulateResultMetadata(self, query_result, compile, |
| first_result, last_result): |
| query_result.set_keys_only(self.keys_only) |
| if query_result.more_results(): |
| cursor = query_result.mutable_cursor() |
| cursor.set_app(self.app) |
| cursor.set_cursor(self.cursor) |
| if compile: |
| self._EncodeCompiledCursor(last_result, |
| query_result.mutable_compiled_cursor()) |
| if first_result: |
| query_result.index_list().extend(self.__index_list) |
| |
| @classmethod |
| def _AcquireCursorID(cls): |
| """Acquires the next cursor id in a thread safe manner.""" |
| cls._next_cursor_lock.acquire() |
| try: |
| cursor_id = cls._next_cursor |
| cls._next_cursor += 1 |
| finally: |
| cls._next_cursor_lock.release() |
| return cursor_id |
| |
| def _IsBeforeCursor(self, entity, cursor): |
| """True if entity is before cursor according to the current order. |
| |
| Args: |
      entity: an entity_pb.EntityProto entity.
| cursor: a compiled cursor as returned by _DecodeCompiledCursor. |
| """ |
| comparison_entity = entity_pb.EntityProto() |
| for prop in entity.property_list(): |
| if prop.name() in self.__cursor_properties: |
| comparison_entity.add_property().MergeFrom(prop) |
| if cursor[0].has_key(): |
| comparison_entity.mutable_key().MergeFrom(entity.key()) |
| x = self.__order_compare_entities(comparison_entity, cursor[0]) |
| if cursor[1]: |
| return x < 0 |
| else: |
| return x <= 0 |
| |
| def _DecodeCompiledCursor(self, compiled_cursor): |
| """Converts a compiled_cursor into a cursor_entity. |
| |
| Args: |
| compiled_cursor: The datastore_pb.CompiledCursor to decode. |
| |
| Returns: |
      (cursor_entity, inclusive): an entity_pb.EntityProto and whether it
        should be included in the result set.
    """
    assert compiled_cursor.has_position()

    position = compiled_cursor.position()

    # A cursor position is a partial entity: the key (if present) plus the
    # values of the properties the query orders or groups by. Every cursor
    # property must be accounted for below.
    remaining_properties = set(self.__cursor_properties)
| |
| cursor_entity = entity_pb.EntityProto() |
| if position.has_key(): |
| cursor_entity.mutable_key().CopyFrom(position.key()) |
| try: |
| remaining_properties.remove('__key__') |
| except KeyError: |
| Check(False, 'Cursor does not match query: extra value __key__') |
| for indexvalue in position.indexvalue_list(): |
| property = cursor_entity.add_property() |
| property.set_name(indexvalue.property()) |
| property.mutable_value().CopyFrom(indexvalue.value()) |
| try: |
| remaining_properties.remove(indexvalue.property()) |
| except KeyError: |
| Check(False, 'Cursor does not match query: extra value %s' % |
| indexvalue.property()) |
| Check(not remaining_properties, |
| 'Cursor does not match query: missing values for %r' % |
| remaining_properties) |
| |
| |
| |
| return (cursor_entity, position.start_inclusive()) |
| |
| def _EncodeCompiledCursor(self, last_result, compiled_cursor): |
| """Converts the current state of the cursor into a compiled_cursor. |
| |
| Args: |
| last_result: the last result returned by this query. |
      compiled_cursor: an empty datastore_pb.CompiledCursor.
    """
    if last_result is not None:
      # A cursor records the position of the last result returned: the key
      # (when relevant) plus the values of the cursor properties.
      position = compiled_cursor.mutable_position()

      if '__key__' in self.__cursor_properties:
| position.mutable_key().MergeFrom(last_result.key()) |
| for prop in last_result.property_list(): |
| if prop.name() in self.__cursor_properties: |
| indexvalue = position.add_indexvalue() |
| indexvalue.set_property(prop.name()) |
| indexvalue.mutable_value().CopyFrom(prop.value()) |
| position.set_start_inclusive(False) |
| _SetBeforeAscending(position, self.__first_sort_order) |
| |
| |
| class ListCursor(BaseCursor): |
| """A query cursor over a list of entities. |
| |
| Public properties: |
| keys_only: whether the query is keys_only |
| """ |
| |
| def __init__(self, query, dsquery, orders, index_list, results): |
| """Constructor. |
| |
| Args: |
| query: the query request proto |
| dsquery: a datastore_query.Query over query. |
| orders: the orders of query as returned by _GuessOrders. |
| index_list: the list of indexes used by the query. |
| results: list of entity_pb.EntityProto |
| """ |
| super(ListCursor, self).__init__(query, dsquery, orders, index_list) |

    # With group_by, keep only the first result seen for each distinct
    # combination of group-by property values.
    if self.group_by:
| distincts = set() |
| new_results = [] |
| for result in results: |
| key_value = _GetGroupByKey(result, self.group_by) |
| if key_value not in distincts: |
| distincts.add(key_value) |
| new_results.append(result) |
| results = new_results |
| |
| if query.has_compiled_cursor() and query.compiled_cursor().has_position(): |
| start_cursor = self._DecodeCompiledCursor(query.compiled_cursor()) |
| self.__last_result = start_cursor[0] |
| start_cursor_position = self._GetCursorOffset(results, start_cursor) |
| else: |
| self.__last_result = None |
| start_cursor_position = 0 |
| |
| if query.has_end_compiled_cursor(): |
| if query.end_compiled_cursor().has_position(): |
| end_cursor = self._DecodeCompiledCursor(query.end_compiled_cursor()) |
| end_cursor_position = self._GetCursorOffset(results, end_cursor) |
| else: |
| end_cursor_position = 0 |
| else: |
| end_cursor_position = len(results) |

    # Restrict the results to the window between the start and end cursors.
    results = results[start_cursor_position:end_cursor_position]

    # Pre-trim to offset + limit; results past that point can never be
    # returned.
    if query.has_limit():
      limit = query.limit()
      if query.offset():
        limit += query.offset()
      if limit >= 0 and limit < len(results):
        results = results[:limit]
| |
| self.__results = results |
| self.__offset = 0 |
| self.__count = len(self.__results) |
| |
| def _GetCursorOffset(self, results, cursor): |
| """Converts a cursor into a offset into the result set even if the |
| cursor's entity no longer exists. |
| |
| Args: |
| results: the query's results (sequence of entity_pb.EntityProto) |
| cursor: a compiled cursor as returned by _DecodeCompiledCursor |
| Returns: |
| the integer offset |
| """ |
| lo = 0 |
| hi = len(results) |
| while lo < hi: |
| mid = (lo + hi) // 2 |
| if self._IsBeforeCursor(results[mid], cursor): |
| lo = mid + 1 |
| else: |
| hi = mid |
| return lo |
| |
| def PopulateQueryResult(self, result, count, offset, |
| compile=False, first_result=False): |
| """Populates a QueryResult with this cursor and the given number of results. |
| |
| Args: |
| result: datastore_pb.QueryResult |
| count: integer of how many results to return |
| offset: integer of how many results to skip |
| compile: boolean, whether we are compiling this query |
| first_result: whether the query result is the first for this query |
| """ |
| Check(offset >= 0, 'Offset must be >= 0') |
| |
| offset = min(offset, self.__count - self.__offset) |
| limited_offset = min(offset, _MAX_QUERY_OFFSET) |
| if limited_offset: |
| self.__offset += limited_offset |
| result.set_skipped_results(limited_offset) |
| |
| if compile and result.skipped_results() > 0: |
| self._EncodeCompiledCursor(self.__results[self.__offset - 1], |
| result.mutable_skipped_results_compiled_cursor()) |
    if offset == limited_offset and count:
      # The entire offset has been applied, so real results can be returned.
      if count > _MAXIMUM_RESULTS:
        count = _MAXIMUM_RESULTS
      results = self.__results[self.__offset:self.__offset + count]
      count = len(results)
      self.__offset += count

      # Produce projected, keys-only or full entities as requested.
      result.result_list().extend(
          LoadEntity(entity, self.keys_only, self.property_names)
          for entity in results)
| if compile: |
| for entity in results: |
| self._EncodeCompiledCursor(entity, |
| result.add_result_compiled_cursor()) |
| |
    if self.__offset:
      # The last result seen (skipped or returned) anchors the cursor.
      self.__last_result = self.__results[self.__offset - 1]
| |
| result.set_more_results(self.__offset < self.__count) |
| self._PopulateResultMetadata(result, compile, |
| first_result, self.__last_result) |
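

# Note on paging: each ListCursor.PopulateQueryResult call consumes at most
# min(offset, _MAX_QUERY_OFFSET) skipped results plus at most
# _MAXIMUM_RESULTS returned results; more_results stays True until the
# cursor's internal offset reaches the end of the result list.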
| |
| |
| def _SynchronizeTxn(function): |
| """A decorator that locks a transaction during the function call.""" |
| |
  def sync(txn, *args, **kwargs):
    # Hold the transaction's lock for the duration of the call, checking
    # first that the transaction is still usable.
    txn._lock.acquire()
    try:
      Check(txn._state is LiveTxn.ACTIVE, 'transaction closed')

      return function(txn, *args, **kwargs)
    finally:
      txn._lock.release()
  return sync
| |
| |
| def _GetEntityGroup(ref): |
| """Returns the entity group key for the given reference.""" |
| entity_group = entity_pb.Reference() |
| entity_group.CopyFrom(ref) |
| assert (entity_group.path().element_list()[0].has_id() or |
| entity_group.path().element_list()[0].has_name()) |
| del entity_group.path().element_list()[1:] |
| return entity_group |
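

# Illustrative note: for a key with path A:1 -> B:2 -> C:3,
# _GetEntityGroup returns a reference whose path is just A:1; every entity
# sharing that root element belongs to the same entity group.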
| |
| |
| def _GetKeyKind(key): |
| """Return the kind of the given key.""" |
| return key.path().element_list()[-1].type() |
| |
| |
| def _FilterIndexesByKind(key, indexes): |
| """Return only the indexes with the specified kind.""" |
| return filter((lambda index: |
| index.definition().entity_type() == _GetKeyKind(key)), indexes) |
| |
| |
| class LiveTxn(object): |
| """An in flight transaction.""" |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| ACTIVE = 1 |
| COMMITED = 2 |
| ROLLEDBACK = 3 |
| FAILED = 4 |
| |
| _state = ACTIVE |
| _commit_time_s = None |
| |
| def __init__(self, txn_manager, app, allow_multiple_eg): |
| assert isinstance(txn_manager, BaseTransactionManager) |
| assert isinstance(app, basestring) |
| |
| self._txn_manager = txn_manager |
| self._app = app |
| self._allow_multiple_eg = allow_multiple_eg |

    # Map from entity group key (datastore_types.ReferenceToKeyValue) to
    # EntityGroupTracker.
    self._entity_groups = {}

    self._lock = threading.RLock()
    self._apply_lock = threading.Lock()

    self._actions = []
    self._cost = datastore_pb.Cost()

    # Map from kind to the composite indexes that apply to entities of that
    # kind touched by this transaction.
    self._kind_to_indexes = collections.defaultdict(list)
| |
| def _GetTracker(self, reference): |
| """Gets the entity group tracker for reference. |
| |
| If this is the first time reference's entity group is seen, creates a new |
| tracker, checking that the transaction doesn't exceed the entity group |
| limit. |
| """ |
| entity_group = _GetEntityGroup(reference) |
| key = datastore_types.ReferenceToKeyValue(entity_group) |
| tracker = self._entity_groups.get(key, None) |
| if tracker is None: |
| Check(self._app == reference.app(), |
| 'Transactions cannot span applications (expected %s, got %s)' % |
| (self._app, reference.app())) |
| if self._allow_multiple_eg: |
| Check(len(self._entity_groups) < _MAX_EG_PER_TXN, |
| 'operating on too many entity groups in a single transaction.') |
| else: |
        Check(len(self._entity_groups) < 1,
              "cross-group transactions need to be explicitly "
              "specified (xg=True)")
| tracker = EntityGroupTracker(entity_group) |
| self._entity_groups[key] = tracker |
| |
| return tracker |
| |
| def _GetAllTrackers(self): |
| """Get the trackers for the transaction's entity groups. |
| |
| If no entity group has been discovered returns a 'global' entity group |
| tracker. This is possible if the txn only contains transactional tasks. |
| |
| Returns: |
| The tracker list for the entity groups used in this txn. |
| """ |
| if not self._entity_groups: |
| self._GetTracker(datastore_types.Key.from_path( |
| '__global__', 1, _app=self._app)._ToPb()) |
| return self._entity_groups.values() |
| |
| def _GrabSnapshot(self, reference): |
| """Gets snapshot for this reference, creating it if necessary. |
| |
| If no snapshot has been set for reference's entity group, a snapshot is |
| taken and stored for future reads (this also sets the read position), |
| and a CONCURRENT_TRANSACTION exception is thrown if we no longer have |
| a consistent snapshot. |
| |
    Args:
      reference: An entity_pb.Reference from which to extract the entity
          group.

    Raises:
      apiproxy_errors.ApplicationError: if the snapshot is not consistent.
| """ |
| tracker = self._GetTracker(reference) |
| check_contention = tracker._snapshot is None |
| snapshot = tracker._GrabSnapshot(self._txn_manager) |
    if check_contention:
      # This is the first read from this entity group in the transaction.
      # Verify that every other entity group already read is still at the
      # log position we saw, since otherwise no consistent multi-group
      # snapshot exists and the transaction must fail.
      candidates = [other for other in self._entity_groups.values()
                    if other._snapshot is not None and other != tracker]
| meta_data_list = [other._meta_data for other in candidates] |
| self._txn_manager._AcquireWriteLocks(meta_data_list) |
| try: |
| for other in candidates: |
| if other._meta_data._log_pos != other._read_pos: |
| self._state = self.FAILED |
| raise apiproxy_errors.ApplicationError( |
| datastore_pb.Error.CONCURRENT_TRANSACTION, |
| 'Concurrency exception.') |
| finally: |
| self._txn_manager._ReleaseWriteLocks(meta_data_list) |
| return snapshot |
| |
| @_SynchronizeTxn |
| def Get(self, reference): |
| """Returns the entity associated with the given entity_pb.Reference or None. |
| |
| Does not see any modifications in the current txn. |
| |
| Args: |
| reference: The entity_pb.Reference of the entity to look up. |
| |
| Returns: |
| The associated entity_pb.EntityProto or None if no such entity exists. |
| """ |
| snapshot = self._GrabSnapshot(reference) |
| entity = snapshot.get(datastore_types.ReferenceToKeyValue(reference)) |
| return LoadEntity(entity) |
| |
| @_SynchronizeTxn |
| def GetQueryCursor(self, query, filters, orders, index_list): |
| """Runs the given datastore_pb.Query and returns a QueryCursor for it. |
| |
| Does not see any modifications in the current txn. |
| |
| Args: |
| query: The datastore_pb.Query to run. |
| filters: A list of filters that override the ones found on query. |
| orders: A list of orders that override the ones found on query. |
| index_list: A list of indexes used by the query. |
| |
| Returns: |
| A BaseCursor that can be used to fetch query results. |
| """ |
| Check(query.has_ancestor(), |
| 'Query must have an ancestor when performed in a transaction.') |
| snapshot = self._GrabSnapshot(query.ancestor()) |
| return _ExecuteQuery(snapshot.values(), query, filters, orders, index_list) |
| |
| @_SynchronizeTxn |
| def Put(self, entity, insert, indexes): |
| """Puts the given entity. |
| |
| Args: |
| entity: The entity_pb.EntityProto to put. |
| insert: A boolean that indicates if we should fail if the entity already |
| exists. |
| indexes: The composite indexes that apply to the entity. |
| """ |
| tracker = self._GetTracker(entity.key()) |
| key = datastore_types.ReferenceToKeyValue(entity.key()) |
| tracker._delete.pop(key, None) |
| tracker._put[key] = (entity, insert) |
| self._kind_to_indexes[_GetKeyKind(entity.key())] = indexes |
| |
| @_SynchronizeTxn |
| def Delete(self, reference, indexes): |
| """Deletes the entity associated with the given reference. |
| |
| Args: |
| reference: The entity_pb.Reference of the entity to delete. |
| indexes: The composite indexes that apply to the entity. |
| """ |
| tracker = self._GetTracker(reference) |
| key = datastore_types.ReferenceToKeyValue(reference) |
| tracker._put.pop(key, None) |
| tracker._delete[key] = reference |
| self._kind_to_indexes[_GetKeyKind(reference)] = indexes |
| |
| @_SynchronizeTxn |
| def AddActions(self, actions, max_actions=None): |
| """Adds the given actions to the current txn. |
| |
| Args: |
| actions: A list of pbs to send to taskqueue.Add when the txn is applied. |
| max_actions: A number that indicates the maximum number of actions to |
| allow on this txn. |
| """ |
| Check(not max_actions or len(self._actions) + len(actions) <= max_actions, |
| 'Too many messages, maximum allowed %s' % max_actions) |
| self._actions.extend(actions) |
| |
  def Rollback(self):
    """Rolls back the current txn."""

    self._lock.acquire()
    try:
      Check(self._state is self.ACTIVE or self._state is self.FAILED,
            'transaction closed')
      self._state = self.ROLLEDBACK
    finally:
      self._txn_manager._RemoveTxn(self)
      self._lock.release()
| |
| @_SynchronizeTxn |
| def Commit(self): |
| """Commits the current txn. |
| |
| This function hands off the responsibility of calling _Apply to the owning |
| TransactionManager. |
| |
| Returns: |
| The cost of the transaction. |
| """ |
    try:
      # Snapshot every entity group and compute the cost of the writes.
      trackers = self._GetAllTrackers()
      empty = True
      for tracker in trackers:
        snapshot = tracker._GrabSnapshot(self._txn_manager)
        empty = empty and not tracker._put and not tracker._delete

        # An insert-only put must not overwrite an existing entity.
        for entity, insert in tracker._put.itervalues():
          Check(not insert or self.Get(entity.key()) is None,
                'the id allocated for a new entity was already '
                'in use, please try again')

          old_entity = None
          key = datastore_types.ReferenceToKeyValue(entity.key())
          if key in snapshot:
            old_entity = snapshot[key]
          self._AddWriteOps(old_entity, entity)

        for reference in tracker._delete.itervalues():
          # A delete only costs anything if the entity actually exists in
          # the snapshot.
          old_entity = None
          key = datastore_types.ReferenceToKeyValue(reference)
          if key in snapshot:
            old_entity = snapshot[key]
            if old_entity is not None:
              self._AddWriteOps(None, old_entity)

      # A transaction with no writes and no actions is a no-op; just roll
      # it back.
      if empty and not self._actions:
        self.Rollback()
        return datastore_pb.Cost()

      # Acquire the write locks for all involved entity groups.
      meta_data_list = [tracker._meta_data for tracker in trackers]
      self._txn_manager._AcquireWriteLocks(meta_data_list)
    except:
      # Failure before the write locks were acquired; just roll back.
      self.Rollback()
      raise
| |
    try:
      # With the write locks held, verify that no other transaction has
      # committed on any involved entity group since our snapshots.
      for tracker in trackers:
        Check(tracker._meta_data._log_pos == tracker._read_pos,
              'Concurrency exception.',
              datastore_pb.Error.CONCURRENT_TRANSACTION)

      # Log the transaction on every entity group; it is applied later.
      for tracker in trackers:
        tracker._meta_data.Log(self)
      self._state = self.COMMITED
      self._commit_time_s = time.time()
    except:
      # Failure while holding the write locks; roll back before releasing.
      self.Rollback()
      raise
    else:
      # The commit succeeded; dispatch any transactional tasks.
      for action in self._actions:
        try:
          apiproxy_stub_map.MakeSyncCall(
              'taskqueue', 'Add', action, api_base_pb.VoidProto())
        except apiproxy_errors.ApplicationError, e:
          logging.warning('Transactional task %s has been dropped, %s',
                          action, e)
      self._actions = []
    finally:
      self._txn_manager._RemoveTxn(self)
      self._txn_manager._ReleaseWriteLocks(meta_data_list)

    # The consistency policy decides when the logged txn actually applies.
    self._txn_manager._consistency_policy._OnCommit(self)
    return self._cost
| |
| def _AddWriteOps(self, old_entity, new_entity): |
| """Adds the cost of writing the new_entity to the _cost member. |
| |
| We assume that old_entity represents the current state of the Datastore. |
| |
| Args: |
      old_entity: Entity representing the current state in the Datastore.
      new_entity: Entity representing the desired state in the Datastore.
| """ |
| composite_indexes = self._kind_to_indexes[_GetKeyKind(new_entity.key())] |
| entity_writes, index_writes = _CalculateWriteOps( |
| composite_indexes, old_entity, new_entity) |
| _UpdateCost(self._cost, entity_writes, index_writes) |
| |
| def _Apply(self, meta_data): |
| """Applies the current txn on the given entity group. |
| |
| This function blindly performs the operations contained in the current txn. |
| The calling function must acquire the entity group write lock and ensure |
| transactions are applied in order. |
| """ |
| |
| self._apply_lock.acquire() |
| try: |
| |
| assert self._state == self.COMMITED |
| for tracker in self._entity_groups.values(): |
| if tracker._meta_data is meta_data: |
| break |
| else: |
| assert False |
| assert tracker._read_pos != tracker.APPLIED |
| |
| |
| for entity, insert in tracker._put.itervalues(): |
| self._txn_manager._Put(entity, insert) |
| |
| |
| for key in tracker._delete.itervalues(): |
| self._txn_manager._Delete(key) |
| |
| |
| |
| tracker._read_pos = EntityGroupTracker.APPLIED |
| |
| |
| tracker._meta_data.Unlog(self) |
| finally: |
| self._apply_lock.release() |
| |
| |
| class EntityGroupTracker(object): |
| """An entity group involved a transaction.""" |
| |
| APPLIED = -2 |
| |
| |
| |
| |
| |
| _read_pos = None |
| |
| |
| _snapshot = None |
| |
| |
| _meta_data = None |
| |
| def __init__(self, entity_group): |
| self._entity_group = entity_group |
| self._put = {} |
| self._delete = {} |
| |
| def _GrabSnapshot(self, txn_manager): |
| """Snapshot this entity group, remembering the read position.""" |
| if self._snapshot is None: |
| self._meta_data, self._read_pos, self._snapshot = ( |
| txn_manager._GrabSnapshot(self._entity_group)) |
| return self._snapshot |
| |
| |
| class EntityGroupMetaData(object): |
| """The meta_data assoicated with an entity group.""" |
| |
| |
| _log_pos = -1 |
| |
| _snapshot = None |
| |
| def __init__(self, entity_group): |
| self._entity_group = entity_group |
| self._write_lock = threading.Lock() |
| self._apply_queue = [] |
| |
| def CatchUp(self): |
| """Applies all outstanding txns.""" |
| |
| assert self._write_lock.acquire(False) is False |
| |
| while self._apply_queue: |
| self._apply_queue[0]._Apply(self) |
| |
| def Log(self, txn): |
| """Add a pending transaction to this entity group. |
| |
| Requires that the caller hold the meta data lock. |
| This also increments the current log position and clears the snapshot cache. |
| """ |
    # The caller must already hold this entity group's write lock.
    assert self._write_lock.acquire(False) is False
| self._apply_queue.append(txn) |
| self._log_pos += 1 |
| self._snapshot = None |
| |
| def Unlog(self, txn): |
| """Remove the first pending transaction from the apply queue. |
| |
| Requires that the caller hold the meta data lock. |
| This checks that the first pending transaction is indeed txn. |
| """ |
    # The caller must already hold this entity group's write lock.
    assert self._write_lock.acquire(False) is False

    Check(self._apply_queue and self._apply_queue[0] is txn,
          'Transaction cannot be applied',
          datastore_pb.Error.INTERNAL_ERROR)
| self._apply_queue.pop(0) |
| |
| |
| class BaseConsistencyPolicy(object): |
| """A base class for a consistency policy to be used with a transaction manger. |
| """ |
| |
| |
| |
| def _OnCommit(self, txn): |
| """Called after a LiveTxn has been commited. |
| |
| This function can decide whether to apply the txn right away. |
| |
| Args: |
| txn: A LiveTxn that has been commited |
| """ |
| raise NotImplementedError |
| |
| def _OnGroom(self, meta_data_list): |
| """Called once for every global query. |
| |
    This function must acquire the write lock for any meta data before
    applying any outstanding txns.
| |
| Args: |
| meta_data_list: A list of EntityGroupMetaData objects. |
| """ |
| raise NotImplementedError |
| |
| |
| class MasterSlaveConsistencyPolicy(BaseConsistencyPolicy): |
| """Enforces the Master / Slave consistency policy. |
| |
| Applies all txn on commit. |
| """ |
| |
  def _OnCommit(self, txn):
    # Apply the transaction to all involved entity groups immediately.
    for tracker in txn._GetAllTrackers():
      tracker._meta_data._write_lock.acquire()
      try:
        tracker._meta_data.CatchUp()
      finally:
        tracker._meta_data._write_lock.release()

    # Write the changes out so they are immediately visible to global
    # queries.
    txn._txn_manager.Write()

  def _OnGroom(self, meta_data_list):
    # All txns are applied as soon as they commit, so grooming is a no-op.
    pass
| |
| |
| class BaseHighReplicationConsistencyPolicy(BaseConsistencyPolicy): |
| """A base class for High Replication Datastore consistency policies. |
| |
| All txn are applied asynchronously. |
| """ |
| |
| def _OnCommit(self, txn): |
| pass |
| |
  def _OnGroom(self, meta_data_list):
    # Apply any outstanding txns that the policy now allows to be seen.
    for meta_data in meta_data_list:
      if not meta_data._apply_queue:
        continue

      # Apply txns from the head of the queue while the policy allows it.
      meta_data._write_lock.acquire()
      try:
        while meta_data._apply_queue:
          txn = meta_data._apply_queue[0]
          if self._ShouldApply(txn, meta_data):
            txn._Apply(meta_data)
          else:
            break
      finally:
        meta_data._write_lock.release()
| |
| def _ShouldApply(self, txn, meta_data): |
| """Determins if the given transaction should be applied.""" |
| raise NotImplementedError |
| |
| |
| class TimeBasedHRConsistencyPolicy(BaseHighReplicationConsistencyPolicy): |
| """A High Replication Datastore consiseny policy based on elapsed time. |
| |
| This class tries to simulate performance seen in the high replication |
| datastore using estimated probabilities of a transaction commiting after a |
| given amount of time. |
| """ |
| |
| _classification_map = [(.98, 100), |
| (.99, 300), |
| (.995, 2000), |
| (1, 240000) |
| ] |
| |
| def SetClassificationMap(self, classification_map): |
| """Set the probability a txn will be applied after a given amount of time. |
| |
| Args: |
      classification_map: A list of tuples containing (float between 0 and 1,
        number of milliseconds) that define the probability of a transaction
        applying after a given amount of time.
| """ |
| for prob, delay in classification_map: |
| if prob < 0 or prob > 1 or delay <= 0: |
| raise TypeError( |
| 'classification_map must be a list of (probability, delay) tuples, ' |
| 'found %r' % (classification_map,)) |
| |
| self._classification_map = sorted(classification_map) |
| |
| def _ShouldApplyImpl(self, elapsed_ms, classification): |
| for rate, ms in self._classification_map: |
| if classification <= rate: |
| break |
| return elapsed_ms >= ms |
| |
| def _Classify(self, txn, meta_data): |
| return random.Random(id(txn) ^ id(meta_data)).random() |
| |
| def _ShouldApply(self, txn, meta_data): |
| elapsed_ms = (time.time() - txn._commit_time_s) * 1000 |
| classification = self._Classify(txn, meta_data) |
| return self._ShouldApplyImpl(elapsed_ms, classification) |
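

# Illustrative note: with the default _classification_map, 98% of txns
# become visible to global queries once 100ms have elapsed, 99% after
# 300ms, 99.5% after 2s and all after 240s. A test can tighten this, e.g.:
#
#   policy = TimeBasedHRConsistencyPolicy()
#   policy.SetClassificationMap([(1, 100)])  # everything applies after 100ms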
| |
| |
| class PseudoRandomHRConsistencyPolicy(BaseHighReplicationConsistencyPolicy): |
| """A policy that always gives the same sequence of consistency decisions.""" |
| |
| def __init__(self, probability=.5, seed=0): |
| """Constructor. |
| |
| Args: |
| probability: A number between 0 and 1 that is the likelihood of a |
| transaction applying before a global query is executed. |
| seed: A hashable object to use as a seed. Use None to use the current |
| timestamp. |
| """ |
| self.SetProbability(probability) |
| self.SetSeed(seed) |
| |
| def SetProbability(self, probability): |
| """Change the probability of a transaction applying. |
| |
| Args: |
      probability: A number between 0 and 1 that determines the probability
        of a transaction applying before a global query is run.
| """ |
| if probability < 0 or probability > 1: |
| raise TypeError('probability must be a number between 0 and 1, found %r' % |
| probability) |
| self._probability = probability |
| |
| def SetSeed(self, seed): |
| """Reset the seed.""" |
| self._random = random.Random(seed) |
| |
| def _ShouldApply(self, txn, meta_data): |
| return self._random.random() < self._probability |
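

# Usage sketch: with probability=0, no transaction is applied during
# grooming, so global queries stay stale until Flush() is called. This makes
# eventual-consistency behavior reproducible in tests:
#
#   policy = PseudoRandomHRConsistencyPolicy(probability=0)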
| |
| |
| class BaseTransactionManager(object): |
| """A class that manages the state of transactions. |
| |
  This includes creating consistent snapshots for transactions.
| """ |
| |
| def __init__(self, consistency_policy=None): |
| super(BaseTransactionManager, self).__init__() |
| |
| self._consistency_policy = (consistency_policy or |
| MasterSlaveConsistencyPolicy()) |
| |
| |
| self._meta_data_lock = threading.Lock() |
| BaseTransactionManager.Clear(self) |
| |
| def SetConsistencyPolicy(self, policy): |
| """Set the consistency to use. |
| |
| Causes all data to be flushed. |
| |
| Args: |
      policy: An object inheriting from BaseConsistencyPolicy.
| """ |
| if not isinstance(policy, BaseConsistencyPolicy): |
| raise TypeError('policy should be of type ' |
                      'datastore_stub_util.BaseConsistencyPolicy, found %r.' %
| (policy,)) |
| self.Flush() |
| self._consistency_policy = policy |
| |
| def Clear(self): |
| """Discards any pending transactions and resets the meta data.""" |
| |
| self._meta_data = {} |
| |
| self._txn_map = {} |
| |
| def BeginTransaction(self, app, allow_multiple_eg): |
| """Start a transaction on the given app. |
| |
| Args: |
| app: A string representing the app for which to start the transaction. |
| allow_multiple_eg: True if transactions can span multiple entity groups. |
| |
| Returns: |
      A datastore_pb.Transaction for the created transaction.
| """ |
| Check(not (allow_multiple_eg and isinstance( |
| self._consistency_policy, MasterSlaveConsistencyPolicy)), |
| 'transactions on multiple entity groups only allowed with the ' |
| 'High Replication datastore') |
| txn = self._BeginTransaction(app, allow_multiple_eg) |
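    # The handle returned to the client is the Python id() of the LiveTxn;
    # GetTxn() later uses it to look the transaction up in _txn_map.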
| self._txn_map[id(txn)] = txn |
| transaction = datastore_pb.Transaction() |
| transaction.set_app(app) |
| transaction.set_handle(id(txn)) |
| return transaction |
| |
| def GetTxn(self, transaction, request_trusted, request_app): |
| """Gets the LiveTxn object associated with the given transaction. |
| |
| Args: |
| transaction: The datastore_pb.Transaction to look up. |
      request_trusted: A boolean indicating if the requesting app is trusted.
| request_app: A string representing the app making the request. |
| |
| Returns: |
| The associated LiveTxn object. |
| """ |
| request_app = datastore_types.ResolveAppId(request_app) |
| CheckTransaction(request_trusted, request_app, transaction) |
| txn = self._txn_map.get(transaction.handle()) |
| Check(txn and txn._app == transaction.app(), |
| 'Transaction(<%s>) not found' % str(transaction).replace('\n', ', ')) |
| return txn |
| |
| def Groom(self): |
| """Attempts to apply any outstanding transactions. |
| |
    The consistency policy determines whether a transaction should be applied.
| """ |
| self._meta_data_lock.acquire() |
| try: |
| self._consistency_policy._OnGroom(self._meta_data.itervalues()) |
| finally: |
| self._meta_data_lock.release() |
| |
| def Flush(self): |
| """Applies all outstanding transactions.""" |
| self._meta_data_lock.acquire() |
| try: |
| for meta_data in self._meta_data.itervalues(): |
| if not meta_data._apply_queue: |
| continue |
| |
| |
| meta_data._write_lock.acquire() |
| try: |
| meta_data.CatchUp() |
| finally: |
| meta_data._write_lock.release() |
| finally: |
| self._meta_data_lock.release() |
| |
| def _GetMetaData(self, entity_group): |
| """Safely gets the EntityGroupMetaData object for the given entity_group. |
| """ |
| self._meta_data_lock.acquire() |
| try: |
| key = datastore_types.ReferenceToKeyValue(entity_group) |
| |
| meta_data = self._meta_data.get(key, None) |
| if not meta_data: |
| meta_data = EntityGroupMetaData(entity_group) |
| self._meta_data[key] = meta_data |
| return meta_data |
| finally: |
| self._meta_data_lock.release() |
| |
| def _BeginTransaction(self, app, allow_multiple_eg): |
| """Starts a transaction without storing it in the txn_map.""" |
| return LiveTxn(self, app, allow_multiple_eg) |
| |
| def _GrabSnapshot(self, entity_group): |
| """Grabs a consistent snapshot of the given entity group. |
| |
| Args: |
      entity_group: An entity_pb.Reference of the entity group of which the
| snapshot should be taken. |
| |
| Returns: |
| A tuple of (meta_data, log_pos, snapshot) where log_pos is the current log |
| position and snapshot is a map of reference key value to |
| entity_pb.EntityProto. |
| """ |
| |
| meta_data = self._GetMetaData(entity_group) |
| meta_data._write_lock.acquire() |
| try: |
| if not meta_data._snapshot: |
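        # Build the snapshot lazily: catch up on any unapplied transactions
        # first so the snapshot reflects a fully applied entity group.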
| |
| meta_data.CatchUp() |
| meta_data._snapshot = self._GetEntitiesInEntityGroup(entity_group) |
| return meta_data, meta_data._log_pos, meta_data._snapshot |
| finally: |
| |
| meta_data._write_lock.release() |
| |
| def _AcquireWriteLocks(self, meta_data_list): |
| """Acquire the write locks for the given entity group meta data. |
| |
    These locks must be released with _ReleaseWriteLocks before returning to
    the user.
| |
| Args: |
| meta_data_list: list of EntityGroupMetaData objects. |
| """ |
| for meta_data in sorted(meta_data_list): |
| meta_data._write_lock.acquire() |
| |
| def _ReleaseWriteLocks(self, meta_data_list): |
| """Release the write locks of the given entity group meta data. |
| |
| Args: |
| meta_data_list: list of EntityGroupMetaData objects. |
| """ |
| for meta_data in sorted(meta_data_list): |
| meta_data._write_lock.release() |
| |
| def _RemoveTxn(self, txn): |
| """Removes a LiveTxn from the txn_map (if present).""" |
| self._txn_map.pop(id(txn), None) |
| |
| def _Put(self, entity, insert): |
| """Put the given entity. |
| |
    This must be implemented by a sub-class. The sub-class can assume that
    any needed consistency is enforced at a higher level (and can just put
    blindly).
| |
| Args: |
| entity: The entity_pb.EntityProto to put. |
| insert: A boolean that indicates if we should fail if the entity already |
| exists. |
| """ |
| raise NotImplementedError |
| |
| def _Delete(self, reference): |
| """Delete the entity associated with the specified reference. |
| |
| This must be implemented by a sub-class. The sub-class can assume that any |
    needed consistency is enforced at a higher level (and can just delete
| blindly). |
| |
| Args: |
| reference: The entity_pb.Reference of the entity to delete. |
| """ |
| raise NotImplementedError |
| |
| def _GetEntitiesInEntityGroup(self, entity_group): |
| """Gets the contents of a specific entity group. |
| |
    This must be implemented by a sub-class. The sub-class can assume that
    any needed consistency is enforced at a higher level (and can just
    blindly read).
| |
| Other entity groups may be modified concurrently. |
| |
| Args: |
      entity_group: An entity_pb.Reference of the entity group to get.
| |
| Returns: |
      A dict mapping datastore_types.ReferenceToKeyValue(key) to EntityProto.
| """ |
| raise NotImplementedError |
| |
| |
| class BaseIndexManager(object): |
| """A generic index manager that stores all data in memory.""" |
| |
| |
| |
| |
| |
| |
| |
| |
| WRITE_ONLY = entity_pb.CompositeIndex.WRITE_ONLY |
| READ_WRITE = entity_pb.CompositeIndex.READ_WRITE |
| DELETED = entity_pb.CompositeIndex.DELETED |
| ERROR = entity_pb.CompositeIndex.ERROR |
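
  # Allowed state transitions for a composite index: UpdateIndex() rejects
  # any state change not listed here (an unchanged state is always allowed).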
| |
| _INDEX_STATE_TRANSITIONS = { |
| WRITE_ONLY: frozenset((READ_WRITE, DELETED, ERROR)), |
| READ_WRITE: frozenset((DELETED,)), |
| ERROR: frozenset((DELETED,)), |
| DELETED: frozenset((ERROR,)), |
| } |
| |
| def __init__(self): |
| |
| |
| |
| self.__indexes = collections.defaultdict(list) |
| self.__indexes_lock = threading.Lock() |
| self.__next_index_id = 1 |
| self.__index_id_lock = threading.Lock() |
| |
| def __FindIndex(self, index): |
| """Finds an existing index by definition. |
| |
| Args: |
| index: entity_pb.CompositeIndex |
| |
| Returns: |
| entity_pb.CompositeIndex, if it exists; otherwise None |
| """ |
| app = index.app_id() |
| if app in self.__indexes: |
| for stored_index in self.__indexes[app]: |
| if index.definition() == stored_index.definition(): |
| return stored_index |
| |
| return None |
| |
| def CreateIndex(self, index, trusted=False, calling_app=None): |
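    """Creates the given composite index and returns its assigned id."""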
| calling_app = datastore_types.ResolveAppId(calling_app) |
| CheckAppId(trusted, calling_app, index.app_id()) |
| Check(index.id() == 0, 'New index id must be 0.') |
| Check(not self.__FindIndex(index), 'Index already exists.') |
| |
| |
| self.__index_id_lock.acquire() |
| index.set_id(self.__next_index_id) |
| self.__next_index_id += 1 |
| self.__index_id_lock.release() |
| |
| |
| clone = entity_pb.CompositeIndex() |
| clone.CopyFrom(index) |
| app = index.app_id() |
| clone.set_app_id(app) |
| |
| |
| self.__indexes_lock.acquire() |
| try: |
| self.__indexes[app].append(clone) |
| finally: |
| self.__indexes_lock.release() |
| |
| self._OnIndexChange(index.app_id()) |
| |
| return index.id() |
| |
| def GetIndexes(self, app, trusted=False, calling_app=None): |
| """Get the CompositeIndex objects for the given app.""" |
| calling_app = datastore_types.ResolveAppId(calling_app) |
| CheckAppId(trusted, calling_app, app) |
| |
| return self.__indexes[app] |
| |
| def UpdateIndex(self, index, trusted=False, calling_app=None): |
| CheckAppId(trusted, calling_app, index.app_id()) |
| |
| stored_index = self.__FindIndex(index) |
| Check(stored_index, 'Index does not exist.') |
| Check(index.state() == stored_index.state() or |
| index.state() in self._INDEX_STATE_TRANSITIONS[stored_index.state()], |
| 'cannot move index state from %s to %s' % |
| (entity_pb.CompositeIndex.State_Name(stored_index.state()), |
| (entity_pb.CompositeIndex.State_Name(index.state())))) |
| |
| |
| self.__indexes_lock.acquire() |
| try: |
| stored_index.set_state(index.state()) |
| finally: |
| self.__indexes_lock.release() |
| |
| self._OnIndexChange(index.app_id()) |
| |
| def DeleteIndex(self, index, trusted=False, calling_app=None): |
| CheckAppId(trusted, calling_app, index.app_id()) |
| |
| stored_index = self.__FindIndex(index) |
| Check(stored_index, 'Index does not exist.') |
| |
| |
| app = index.app_id() |
| self.__indexes_lock.acquire() |
| try: |
| self.__indexes[app].remove(stored_index) |
| finally: |
| self.__indexes_lock.release() |
| |
| self._OnIndexChange(index.app_id()) |
| |
| def _SideLoadIndex(self, index): |
    self.__indexes[index.app_id()].append(index)
| |
| def _OnIndexChange(self, app_id): |
| pass |
| |
| |
| class BaseDatastore(BaseTransactionManager, BaseIndexManager): |
| """A base implemenation of a Datastore. |
| |
| This class implements common functions associated with a datastore and |
| enforces security restrictions passed on by a stub or client. It is designed |
| to be shared by any number of threads or clients serving any number of apps. |
| |
  If an app is not specified explicitly, it is pulled from the env and assumed
| be untrusted. |
| """ |
| |
| |
| |
| _MAX_QUERY_COMPONENTS = 100 |
| |
| |
| |
| _BATCH_SIZE = 20 |
| |
| |
| |
| _MAX_ACTIONS_PER_TXN = 5 |
| |
| def __init__(self, require_indexes=False, consistency_policy=None, |
| use_atexit=True, auto_id_policy=SEQUENTIAL): |
| BaseTransactionManager.__init__(self, consistency_policy=consistency_policy) |
| BaseIndexManager.__init__(self) |
| |
| self._require_indexes = require_indexes |
| self._pseudo_kinds = {} |
| self.SetAutoIdPolicy(auto_id_policy) |
| |
    if use_atexit:
      # Write the datastore to disk on interpreter exit so pending changes
      # are not lost if the stub is shut down without an explicit Write().
      atexit.register(self.Write)
| |
| def Clear(self): |
| """Clears out all stored values.""" |
| |
| BaseTransactionManager.Clear(self) |
| |
| |
| def _RegisterPseudoKind(self, kind): |
| """Registers a pseudo kind to be used to satisfy a meta data query.""" |
| self._pseudo_kinds[kind.name] = kind |
| kind._stub = weakref.proxy(self) |
| |
| |
| |
| |
| def GetQueryCursor(self, raw_query, trusted=False, calling_app=None): |
| """Execute a query. |
| |
| Args: |
| raw_query: The non-validated datastore_pb.Query to run. |
| trusted: If the calling app is trusted. |
| calling_app: The app requesting the results or None to pull the app from |
| the environment. |
| |
| Returns: |
| A BaseCursor that can be used to retrieve results. |
| """ |
| |
| calling_app = datastore_types.ResolveAppId(calling_app) |
| CheckAppId(trusted, calling_app, raw_query.app()) |
| |
| |
| filters, orders = datastore_index.Normalize(raw_query.filter_list(), |
| raw_query.order_list(), |
| raw_query.property_name_list()) |
| |
| |
| CheckQuery(raw_query, filters, orders, self._MAX_QUERY_COMPONENTS) |
| FillUsersInQuery(filters) |
| |
| |
| self._CheckHasIndex(raw_query, trusted, calling_app) |
| |
| |
| index_list = self.__IndexListForQuery(raw_query) |
| |
| |
| if raw_query.has_transaction(): |
| |
| Check(raw_query.kind() not in self._pseudo_kinds, |
| 'transactional queries on "%s" not allowed' % raw_query.kind()) |
| txn = self.GetTxn(raw_query.transaction(), trusted, calling_app) |
| return txn.GetQueryCursor(raw_query, filters, orders, index_list) |
| |
| if raw_query.has_ancestor() and raw_query.kind() not in self._pseudo_kinds: |
| |
| txn = self._BeginTransaction(raw_query.app(), False) |
| return txn.GetQueryCursor(raw_query, filters, orders, index_list) |
| |
| |
    # Full global query: give the consistency policy a chance to apply any
    # transactions it considers due before reading.
    self.Groom()
| return self._GetQueryCursor(raw_query, filters, orders, index_list) |
| |
| def __IndexListForQuery(self, query): |
| """Get the single composite index pb used by the query, if any, as a list. |
| |
| Args: |
| query: the datastore_pb.Query to compute the index list for |
| |
| Returns: |
      A singleton list of the composite index pb used by the query, or an
      empty list if the query does not require a composite index.
| """ |
| |
| required, kind, ancestor, props = ( |
| datastore_index.CompositeIndexForQuery(query)) |
| if not required: |
| return [] |
| composite_index_pb = entity_pb.CompositeIndex() |
| composite_index_pb.set_app_id(query.app()) |
| composite_index_pb.set_id(0) |
| composite_index_pb.set_state(entity_pb.CompositeIndex.READ_WRITE) |
| index_pb = composite_index_pb.mutable_definition() |
| index_pb.set_entity_type(kind) |
| index_pb.set_ancestor(bool(ancestor)) |
| for name, direction in datastore_index.GetRecommendedIndexProperties(props): |
| prop_pb = entity_pb.Index_Property() |
| prop_pb.set_name(name) |
| prop_pb.set_direction(direction) |
| index_pb.property_list().append(prop_pb) |
| return [composite_index_pb] |
| |
| def Get(self, raw_keys, transaction=None, eventual_consistency=False, |
| trusted=False, calling_app=None): |
| """Get the entities for the given keys. |
| |
| Args: |
| raw_keys: A list of unverified entity_pb.Reference objects. |
| transaction: The datastore_pb.Transaction to use or None. |
| eventual_consistency: If we should allow stale, potentially inconsistent |
| results. |
| trusted: If the calling app is trusted. |
| calling_app: The app requesting the results or None to pull the app from |
| the environment. |
| |
| Returns: |
      A list containing, for each key, the corresponding entity or None if
      it does not exist.
| """ |
| |
| if not raw_keys: |
| return [] |
| |
| calling_app = datastore_types.ResolveAppId(calling_app) |
| |
| if not transaction and eventual_consistency: |
| |
| result = [] |
| for key in raw_keys: |
        CheckReference(trusted, calling_app, key)
| result.append(self._GetWithPseudoKinds(None, key)) |
| return result |
| |
| |
| |
| |
| grouped_keys = collections.defaultdict(list) |
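    # Group keys by entity group; each group is then read inside its own
    # transaction so results are strongly consistent per entity group.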
| for i, key in enumerate(raw_keys): |
| CheckReference(trusted, calling_app, key) |
| entity_group = _GetEntityGroup(key) |
| entity_group_key = datastore_types.ReferenceToKeyValue(entity_group) |
| grouped_keys[entity_group_key].append((key, i)) |
| |
| if transaction: |
| |
| txn = self.GetTxn(transaction, trusted, calling_app) |
| return [self._GetWithPseudoKinds(txn, key) for key in raw_keys] |
| else: |
| |
| |
| result = [None] * len(raw_keys) |
| |
| def op(txn, v): |
| key, i = v |
| result[i] = self._GetWithPseudoKinds(txn, key) |
| for keys in grouped_keys.itervalues(): |
| self._RunInTxn(keys, keys[0][0].app(), op) |
| return result |
| |
| def _GetWithPseudoKinds(self, txn, key): |
| """Fetch entity key in txn, taking account of pseudo-kinds.""" |
| pseudo_kind = self._pseudo_kinds.get(_GetKeyKind(key), None) |
| if pseudo_kind: |
| return pseudo_kind.Get(txn, key) |
| elif txn: |
| return txn.Get(key) |
| else: |
| return self._Get(key) |
| |
| def Put(self, raw_entities, cost, transaction=None, |
| trusted=False, calling_app=None): |
| """Writes the given given entities. |
| |
| Updates an entity's key and entity_group in place if needed |
| |
| Args: |
| raw_entities: A list of unverified entity_pb.EntityProto objects. |
| cost: Out param. The cost of putting the provided entities. |
| transaction: The datastore_pb.Transaction to use or None. |
| trusted: If the calling app is trusted. |
| calling_app: The app requesting the results or None to pull the app from |
| the environment. |
| Returns: |
| A list of entity_pb.Reference objects that indicates where each entity |
| was stored. |
| """ |
| if not raw_entities: |
| return [] |
| |
| calling_app = datastore_types.ResolveAppId(calling_app) |
| |
| |
| result = [None] * len(raw_entities) |
| grouped_entities = collections.defaultdict(list) |
| for i, raw_entity in enumerate(raw_entities): |
| CheckEntity(trusted, calling_app, raw_entity) |
| |
| |
| |
| entity = entity_pb.EntityProto() |
| entity.CopyFrom(raw_entity) |
| |
| |
| for prop in itertools.chain(entity.property_list(), |
| entity.raw_property_list()): |
| FillUser(prop) |
| |
| last_element = entity.key().path().element_list()[-1] |
| if not (last_element.id() or last_element.has_name()): |
| insert = True |
| |
| |
| if self._auto_id_policy == SEQUENTIAL: |
| last_element.set_id(self._AllocateSequentialIds(entity.key())[0]) |
| else: |
| full_key = self._AllocateIds([entity.key()])[0] |
| last_element.set_id(full_key.path().element_list()[-1].id()) |
| else: |
| insert = False |
| |
| entity_group = _GetEntityGroup(entity.key()) |
| entity.mutable_entity_group().CopyFrom(entity_group.path()) |
| entity_group_key = datastore_types.ReferenceToKeyValue(entity_group) |
| grouped_entities[entity_group_key].append((entity, insert)) |
| |
| |
| |
| key = entity_pb.Reference() |
| key.CopyFrom(entity.key()) |
| result[i] = key |
| |
| if transaction: |
| |
| txn = self.GetTxn(transaction, trusted, calling_app) |
| for group in grouped_entities.values(): |
| for entity, insert in group: |
| |
| indexes = _FilterIndexesByKind(entity.key(), self.GetIndexes( |
| entity.key().app(), trusted, calling_app)) |
| txn.Put(entity, insert, indexes) |
| else: |
| |
| for entities in grouped_entities.itervalues(): |
| txn_cost = self._RunInTxn( |
| entities, entities[0][0].key().app(), |
| |
| lambda txn, v: txn.Put(v[0], v[1], _FilterIndexesByKind( |
| v[0].key(), |
| self.GetIndexes(v[0].key().app(), trusted, calling_app)))) |
| _UpdateCost(cost, txn_cost.entity_writes(), txn_cost.index_writes()) |
| return result |
| |
| def Delete(self, raw_keys, cost, transaction=None, |
| trusted=False, calling_app=None): |
| """Deletes the entities associated with the given keys. |
| |
| Args: |
| raw_keys: A list of unverified entity_pb.Reference objects. |
      cost: Out param. The cost of deleting the provided entities.
| transaction: The datastore_pb.Transaction to use or None. |
| trusted: If the calling app is trusted. |
| calling_app: The app requesting the results or None to pull the app from |
| the environment. |
| """ |
| if not raw_keys: |
| return |
| |
| calling_app = datastore_types.ResolveAppId(calling_app) |
| |
| |
| grouped_keys = collections.defaultdict(list) |
| for key in raw_keys: |
| CheckReference(trusted, calling_app, key) |
| entity_group = _GetEntityGroup(key) |
| entity_group_key = datastore_types.ReferenceToKeyValue(entity_group) |
| grouped_keys[entity_group_key].append(key) |
| |
| if transaction: |
| |
| txn = self.GetTxn(transaction, trusted, calling_app) |
| for key in raw_keys: |
| |
| indexes = _FilterIndexesByKind(key, self.GetIndexes( |
| key.app(), trusted, calling_app)) |
| txn.Delete(key, indexes) |
| else: |
| |
| for keys in grouped_keys.itervalues(): |
| |
| txn_cost = self._RunInTxn( |
| keys, keys[0].app(), |
| lambda txn, key: txn.Delete(key, _FilterIndexesByKind( |
| key, self.GetIndexes(key.app(), trusted, calling_app)))) |
| _UpdateCost(cost, txn_cost.entity_writes(), txn_cost.index_writes()) |
| |
| def Touch(self, raw_keys, trusted=False, calling_app=None): |
| """Applies all outstanding writes.""" |
| calling_app = datastore_types.ResolveAppId(calling_app) |
| |
| grouped_keys = collections.defaultdict(list) |
| for key in raw_keys: |
| CheckReference(trusted, calling_app, key) |
| entity_group = _GetEntityGroup(key) |
| entity_group_key = datastore_types.ReferenceToKeyValue(entity_group) |
| grouped_keys[entity_group_key].append(key) |
| |
| for keys in grouped_keys.itervalues(): |
| self._RunInTxn(keys, keys[0].app(), lambda txn, key: None) |
| |
| def _RunInTxn(self, values, app, op): |
| """Runs the given values in a separate Txn. |
| |
| Retries up to _RETRIES times on CONCURRENT_TRANSACTION errors. |
| |
| Args: |
| values: A list of arguments to op. |
| app: The app to create the Txn on. |
| op: A function to run on each value in the Txn. |
| |
| Returns: |
| The cost of the txn. |
| """ |
| retries = 0 |
| backoff = _INITIAL_RETRY_DELAY_MS / 1000.0 |
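    # Exponential backoff on transaction conflicts: the delay starts at
    # _INITIAL_RETRY_DELAY_MS, grows by _RETRY_DELAY_MULTIPLIER per retry,
    # and is capped at _MAX_RETRY_DELAY_MS.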
| while True: |
| try: |
| txn = self._BeginTransaction(app, False) |
| for value in values: |
| op(txn, value) |
| return txn.Commit() |
| except apiproxy_errors.ApplicationError, e: |
| if e.application_error == datastore_pb.Error.CONCURRENT_TRANSACTION: |
| |
| retries += 1 |
| if retries <= _RETRIES: |
| time.sleep(backoff) |
| backoff *= _RETRY_DELAY_MULTIPLIER |
| if backoff * 1000.0 > _MAX_RETRY_DELAY_MS: |
| backoff = _MAX_RETRY_DELAY_MS / 1000.0 |
| continue |
| raise |
| |
| def _CheckHasIndex(self, query, trusted=False, calling_app=None): |
| """Checks if the query can be satisfied given the existing indexes. |
| |
| Args: |
| query: the datastore_pb.Query to check |
| trusted: True if the calling app is trusted (like dev_admin_console) |
| calling_app: app_id of the current running application |
| """ |
| if query.kind() in self._pseudo_kinds or not self._require_indexes: |
| return |
| |
| minimal_index = datastore_index.MinimalCompositeIndexForQuery(query, |
| (datastore_index.ProtoToIndexDefinition(index) |
| for index in self.GetIndexes(query.app(), trusted, calling_app) |
| if index.state() == entity_pb.CompositeIndex.READ_WRITE)) |
| if minimal_index is not None: |
| msg = ('This query requires a composite index that is not defined. ' |
| 'You must update the index.yaml file in your application root.') |
| is_most_efficient, kind, ancestor, properties = minimal_index |
| if not is_most_efficient: |
| |
| yaml = datastore_index.IndexYamlForQuery(kind, ancestor, |
| datastore_index.GetRecommendedIndexProperties(properties)) |
| msg += '\nThe following index is the minimum index required:\n' + yaml |
| raise apiproxy_errors.ApplicationError(datastore_pb.Error.NEED_INDEX, msg) |
| |
| def SetAutoIdPolicy(self, auto_id_policy): |
| """Set value of _auto_id_policy flag (default SEQUENTIAL). |
| |
| SEQUENTIAL auto ID assignment behavior will eventually be deprecated |
| and the default will be SCATTERED. |
| |
| Args: |
| auto_id_policy: string constant. |
| Raises: |
| TypeError: if auto_id_policy is not one of SEQUENTIAL or SCATTERED. |
| """ |
| valid_policies = (SEQUENTIAL, SCATTERED) |
| if auto_id_policy not in valid_policies: |
      raise TypeError('auto_id_policy must be in %s, found %s instead' %
                      (valid_policies, auto_id_policy))
| self._auto_id_policy = auto_id_policy |
| |
| |
| |
| def Write(self): |
| """Writes the datastore to disk.""" |
| self.Flush() |
| |
| def _GetQueryCursor(self, query, filters, orders, index_list): |
| """Runs the given datastore_pb.Query and returns a QueryCursor for it. |
| |
| This must be implemented by a sub-class. The sub-class does not need to |
    enforce any consistency guarantees (and can just blindly read).
| |
| Args: |
| query: The datastore_pb.Query to run. |
| filters: A list of filters that override the ones found on query. |
| orders: A list of orders that override the ones found on query. |
| index_list: A list of indexes used by the query. |
| |
| Returns: |
| A BaseCursor that can be used to fetch query results. |
| """ |
| raise NotImplementedError |
| |
| def _Get(self, reference): |
| """Get the entity for the given reference or None. |
| |
| This must be implemented by a sub-class. The sub-class does not need to |
    enforce any consistency guarantees (and can just blindly read).
| |
| Args: |
      reference: An entity_pb.Reference to look up.
| |
| Returns: |
| The entity_pb.EntityProto associated with the given reference or None. |
| """ |
| raise NotImplementedError |
| |
| def _AllocateSequentialIds(self, reference, size=1, max_id=None): |
| """Allocate sequential ids for given reference. |
| |
| Args: |
| reference: An entity_pb.Reference to allocate an id for. |
      size: The size of the range to allocate.
      max_id: The upper bound of the range to allocate.
| |
| Returns: |
| A tuple containing (min, max) of the allocated range. |
| """ |
| raise NotImplementedError |
| |
| def _AllocateIds(self, references): |
| """Allocate or reserves IDs for the v4 datastore API. |
| |
| Incomplete keys are allocated scattered IDs. Complete keys have every id in |
| their paths reserved in the appropriate ID space. |
| |
| Args: |
| references: a list of entity_pb.Reference objects to allocate or reserve |
| |
| Returns: |
| a list of complete entity_pb.Reference objects corresponding to the |
| incomplete keys in the input, with newly allocated ids. |
| """ |
| raise NotImplementedError |
| |
| |
| def _NeedsIndexes(func): |
| """A decorator for DatastoreStub methods that require or affect indexes. |
| |
| Updates indexes to match index.yaml before the call and updates index.yaml |
| after the call if require_indexes is False. If root_path is not set, this is a |
  no-op.
| """ |
| |
| def UpdateIndexesWrapper(self, *args, **kwargs): |
| self._SetupIndexes() |
| try: |
| return func(self, *args, **kwargs) |
| finally: |
| self._UpdateIndexes() |
| |
| return UpdateIndexesWrapper |
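

# Usage sketch: DatastoreStub methods that read or create composite indexes
# are wrapped with the decorator above, for example:
#
#   @_NeedsIndexes
#   def _Dynamic_RunQuery(self, query, query_result):
#     ...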
| |
| |
| class EntityGroupPseudoKind(object): |
| """A common implementation of get() for the __entity_group__ pseudo-kind. |
| |
| Public properties: |
| name: the pseudo-kind name |
| """ |
| name = '__entity_group__' |
| |
| |
| |
| |
| |
| |
| |
| |
| |
  # Base value for the reported __version__ property. Seeding it with the
  # current time in microseconds keeps versions increasing across stub
  # restarts, since in-memory commit positions restart at zero.
  base_version = int(time.time() * 1e6)
| |
| def Get(self, txn, key): |
| """Fetch key of this pseudo-kind within txn. |
| |
| Args: |
| txn: transaction within which Get occurs, may be None if this is an |
| eventually consistent Get. |
| key: key of pseudo-entity to Get. |
| |
| Returns: |
| An entity for key, or None if it doesn't exist. |
| """ |
| |
| if not txn: |
| txn = self._stub._BeginTransaction(key.app(), False) |
| try: |
| return self.Get(txn, key) |
| finally: |
| txn.Rollback() |
| |
| |
| if isinstance(txn._txn_manager._consistency_policy, |
| MasterSlaveConsistencyPolicy): |
| return None |
| |
| |
| |
| |
| |
| |
| path = key.path() |
| if path.element_size() != 2 or path.element_list()[-1].id() != 1: |
| return None |
| |
| tracker = txn._GetTracker(key) |
| tracker._GrabSnapshot(txn._txn_manager) |
| |
| eg = entity_pb.EntityProto() |
| eg.mutable_key().CopyFrom(key) |
| eg.mutable_entity_group().CopyFrom(_GetEntityGroup(key).path()) |
| version = entity_pb.Property() |
| version.set_name('__version__') |
| version.set_multiple(False) |
| version.mutable_value().set_int64value( |
| tracker._read_pos + self.base_version) |
| eg.property_list().append(version) |
| return eg |
| |
| def Query(self, query, filters, orders): |
| """Perform a query on this pseudo-kind. |
| |
| Args: |
| query: the original datastore_pb.Query. |
| filters: the filters from query. |
| orders: the orders from query. |
| |
    Raises:
      apiproxy_errors.ApplicationError: always; queries are not supported on
        this pseudo-kind.
    """
| |
| |
| raise apiproxy_errors.ApplicationError( |
| datastore_pb.Error.BAD_REQUEST, 'queries not supported on ' + self.name) |
| |
| |
| class DatastoreStub(object): |
| """A stub that maps datastore service calls on to a BaseDatastore. |
| |
| This class also keeps track of query cursors. |
| """ |
| |
| def __init__(self, |
| datastore, |
| app_id=None, |
| trusted=None, |
| root_path=None): |
| super(DatastoreStub, self).__init__() |
| self._datastore = datastore |
| self._app_id = datastore_types.ResolveAppId(app_id) |
| self._trusted = trusted |
| self._root_path = root_path |
| |
| |
| self.__query_history = {} |
| |
| |
| self.__query_ci_history = set() |
| |
| |
| |
| self._cached_yaml = (None, None, None) |
| |
| if self._require_indexes or root_path is None: |
| |
| self._index_yaml_updater = None |
| else: |
| |
| self._index_yaml_updater = datastore_stub_index.IndexYamlUpdater( |
| root_path) |
| |
| DatastoreStub.Clear(self) |
| |
| def Clear(self): |
| """Clears out all stored values.""" |
| self._query_cursors = {} |
| self.__query_history = {} |
| self.__query_ci_history = set() |
| |
| def QueryHistory(self): |
| """Returns a dict that maps Query PBs to times they've been run.""" |
| |
| return dict((pb, times) for pb, times in self.__query_history.items() |
| if pb.app() == self._app_id) |
| |
| def _QueryCompositeIndexHistoryLength(self): |
| """Returns the length of the CompositeIndex set for query history.""" |
| return len(self.__query_ci_history) |
| |
| def SetTrusted(self, trusted): |
| """Set/clear the trusted bit in the stub. |
| |
| This bit indicates that the app calling the stub is trusted. A |
| trusted app can write to datastores of other apps. |
| |
| Args: |
| trusted: boolean. |
| """ |
| self._trusted = trusted |
| |
| |
| |
| def _Dynamic_Get(self, req, res): |
| |
| |
| transaction = req.has_transaction() and req.transaction() or None |
| |
| |
    if req.allow_deferred() and req.key_size() > _MAXIMUM_RESULTS:
      # Large requests are capped: only the last _MAXIMUM_RESULTS keys are
      # fetched now; the rest are returned in deferred_list for the client
      # to request again.
      keys_to_get = req.key_list()[-_MAXIMUM_RESULTS:]
      deferred_keys = req.key_list()[:-_MAXIMUM_RESULTS]
      res.deferred_list().extend(deferred_keys)
| else: |
| |
| keys_to_get = req.key_list() |
| |
| res.set_in_order(not req.allow_deferred()) |
| |
| total_response_bytes = 0 |
| for index, entity in enumerate(self._datastore.Get(keys_to_get, |
| transaction, |
| req.has_failover_ms(), |
| self._trusted, |
| self._app_id)): |
| entity_size = entity and entity.ByteSize() or 0 |
| |
| |
| if (req.allow_deferred() |
| and index > 0 |
| and total_response_bytes + entity_size > _MAXIMUM_QUERY_RESULT_BYTES): |
| |
| res.deferred_list().extend(keys_to_get[index:]) |
| break |
| elif entity: |
| entity_result = res.add_entity() |
| entity_result.mutable_entity().CopyFrom(entity) |
| total_response_bytes += entity_size |
| else: |
| |
| entity_result = res.add_entity() |
| entity_result.mutable_key().CopyFrom(keys_to_get[index]) |
| |
| def _Dynamic_Put(self, req, res): |
| transaction = req.has_transaction() and req.transaction() or None |
| res.key_list().extend(self._datastore.Put(req.entity_list(), |
| res.mutable_cost(), |
| transaction, |
| self._trusted, self._app_id)) |
| |
| def _Dynamic_Delete(self, req, res): |
| transaction = req.has_transaction() and req.transaction() or None |
| self._datastore.Delete(req.key_list(), res.mutable_cost(), transaction, |
| self._trusted, self._app_id) |
| |
| def _Dynamic_Touch(self, req, _): |
| self._datastore.Touch(req.key_list(), self._trusted, self._app_id) |
| |
| @_NeedsIndexes |
| def _Dynamic_RunQuery(self, query, query_result): |
| self.__UpgradeCursors(query) |
| cursor = self._datastore.GetQueryCursor(query, self._trusted, self._app_id) |
| |
| if query.has_count(): |
| count = query.count() |
| elif query.has_limit(): |
| count = query.limit() |
| else: |
| count = self._BATCH_SIZE |
| |
| cursor.PopulateQueryResult(query_result, count, query.offset(), |
| query.compile(), first_result=True) |
| if query_result.has_cursor(): |
| self._query_cursors[query_result.cursor().cursor()] = cursor |
| |
| |
| if query.compile(): |
| |
| |
| compiled_query = query_result.mutable_compiled_query() |
| compiled_query.set_keys_only(query.keys_only()) |
| compiled_query.mutable_primaryscan().set_index_name(query.Encode()) |
| self.__UpdateQueryHistory(query) |
| |
| def __UpgradeCursors(self, query): |
| """Upgrades compiled cursors in place. |
| |
| If the cursor position does not specify before_ascending, populate it. |
| If before_ascending is already populated, use it and the sort direction |
| from the query to set an appropriate value for start_inclusive. |
| |
| Args: |
| query: datastore_pb.Query |
| """ |
| first_sort_direction = None |
| if query.order_list(): |
| first_sort_direction = query.order(0).direction() |
| |
| for compiled_cursor in [query.compiled_cursor(), |
| query.end_compiled_cursor()]: |
| self.__UpgradeCursor(compiled_cursor, first_sort_direction) |
| |
| def __UpgradeCursor(self, compiled_cursor, first_sort_direction): |
| """Upgrades a compiled cursor in place. |
| |
| If the cursor position does not specify before_ascending, populate it. |
| If before_ascending is already populated, use it and the provided direction |
| to set an appropriate value for start_inclusive. |
| |
| Args: |
| compiled_cursor: datastore_pb.CompiledCursor |
| first_sort_direction: first sort direction from the query or None |
| """ |
| |
| |
| if not self.__IsPlannable(compiled_cursor): |
| return |
| elif compiled_cursor.position().has_before_ascending(): |
| _SetStartInclusive(compiled_cursor.position(), first_sort_direction) |
| elif compiled_cursor.position().has_start_inclusive(): |
| _SetBeforeAscending(compiled_cursor.position(), first_sort_direction) |
| |
| def __IsPlannable(self, compiled_cursor): |
| """Returns True if compiled_cursor is plannable. |
| |
| Args: |
| compiled_cursor: datastore_pb.CompiledCursor |
| """ |
| position = compiled_cursor.position() |
| return position.has_key() or position.indexvalue_list() |
| |
| def __UpdateQueryHistory(self, query): |
| |
| clone = datastore_pb.Query() |
| clone.CopyFrom(query) |
| clone.clear_hint() |
| clone.clear_limit() |
| clone.clear_offset() |
| clone.clear_count() |
| if clone in self.__query_history: |
| self.__query_history[clone] += 1 |
| else: |
| self.__query_history[clone] = 1 |
| if clone.app() == self._app_id: |
| self.__query_ci_history.add( |
| datastore_index.CompositeIndexForQuery(clone)) |
| |
| def _Dynamic_Next(self, next_request, query_result): |
| app = next_request.cursor().app() |
| CheckAppId(self._trusted, self._app_id, app) |
| |
| cursor = self._query_cursors.get(next_request.cursor().cursor()) |
| Check(cursor and cursor.app == app, |
| 'Cursor %d not found' % next_request.cursor().cursor()) |
| |
| count = self._BATCH_SIZE |
| if next_request.has_count(): |
| count = next_request.count() |
| |
| cursor.PopulateQueryResult(query_result, count, next_request.offset(), |
| next_request.compile(), first_result=False) |
| |
| if not query_result.has_cursor(): |
| del self._query_cursors[next_request.cursor().cursor()] |
| |
| def _Dynamic_AddActions(self, request, _): |
| """Associates the creation of one or more tasks with a transaction. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueBulkAddRequest containing the |
| tasks that should be created when the transaction is committed. |
| """ |
| |
| |
| |
| |
| if not request.add_request_list(): |
| return |
| |
| transaction = request.add_request_list()[0].transaction() |
| txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id) |
| new_actions = [] |
| for add_request in request.add_request_list(): |
| |
| |
| |
| Check(add_request.transaction() == transaction, |
| 'Cannot add requests to different transactions') |
| clone = taskqueue_service_pb.TaskQueueAddRequest() |
| clone.CopyFrom(add_request) |
| clone.clear_transaction() |
| new_actions.append(clone) |
| |
| txn.AddActions(new_actions, self._MAX_ACTIONS_PER_TXN) |
| |
| def _Dynamic_BeginTransaction(self, req, transaction): |
| CheckAppId(self._trusted, self._app_id, req.app()) |
| transaction.CopyFrom(self._datastore.BeginTransaction( |
| req.app(), req.allow_multiple_eg())) |
| |
| def _Dynamic_Commit(self, transaction, res): |
| CheckAppId(self._trusted, self._app_id, transaction.app()) |
| txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id) |
| res.mutable_cost().CopyFrom(txn.Commit()) |
| |
| def _Dynamic_Rollback(self, transaction, _): |
| CheckAppId(self._trusted, self._app_id, transaction.app()) |
| txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id) |
| txn.Rollback() |
| |
| def _Dynamic_CreateIndex(self, index, id_response): |
| id_response.set_value(self._datastore.CreateIndex(index, |
| self._trusted, |
| self._app_id)) |
| |
| @_NeedsIndexes |
| def _Dynamic_GetIndices(self, app_str, composite_indices): |
| composite_indices.index_list().extend(self._datastore.GetIndexes( |
| app_str.value(), self._trusted, self._app_id)) |
| |
| def _Dynamic_UpdateIndex(self, index, _): |
| self._datastore.UpdateIndex(index, self._trusted, self._app_id) |
| |
| def _Dynamic_DeleteIndex(self, index, _): |
| self._datastore.DeleteIndex(index, self._trusted, self._app_id) |
| |
| def _Dynamic_AllocateIds(self, allocate_ids_request, allocate_ids_response): |
| Check(not allocate_ids_request.has_model_key() |
| or not allocate_ids_request.reserve_list(), |
| 'Cannot allocate and reserve IDs in the same request') |
| if allocate_ids_request.reserve_list(): |
| Check(not allocate_ids_request.has_size(), |
| 'Cannot specify size when reserving IDs') |
| Check(not allocate_ids_request.has_max(), |
| 'Cannot specify max when reserving IDs') |
| |
| if allocate_ids_request.has_model_key(): |
      CheckAppId(self._trusted, self._app_id,
                 allocate_ids_request.model_key().app())
| |
| reference = allocate_ids_request.model_key() |
| |
| (start, end) = self._datastore._AllocateSequentialIds( |
| reference, allocate_ids_request.size(), allocate_ids_request.max()) |
| |
| allocate_ids_response.set_start(start) |
| allocate_ids_response.set_end(end) |
| else: |
| for reference in allocate_ids_request.reserve_list(): |
        CheckAppId(self._trusted, self._app_id, reference.app())
| self._datastore._AllocateIds(allocate_ids_request.reserve_list()) |
| allocate_ids_response.set_start(0) |
| allocate_ids_response.set_end(0) |
| |
| def _SetupIndexes(self, _open=open): |
| """Ensure that the set of existing composite indexes matches index.yaml. |
| |
| Note: this is similar to the algorithm used by the admin console for |
| the same purpose. |
| """ |
| |
| |
| |
| if not self._root_path: |
| return |
| index_yaml_file = os.path.join(self._root_path, 'index.yaml') |
| if (self._cached_yaml[0] == index_yaml_file and |
| os.path.exists(index_yaml_file) and |
| os.path.getmtime(index_yaml_file) == self._cached_yaml[1]): |
| requested_indexes = self._cached_yaml[2] |
| else: |
| try: |
| index_yaml_mtime = os.path.getmtime(index_yaml_file) |
| fh = _open(index_yaml_file, 'r') |
| except (OSError, IOError): |
| index_yaml_data = None |
| else: |
| try: |
| index_yaml_data = fh.read() |
| finally: |
| fh.close() |
| |
| requested_indexes = [] |
| if index_yaml_data is not None: |
| |
| index_defs = datastore_index.ParseIndexDefinitions(index_yaml_data) |
| if index_defs is not None and index_defs.indexes is not None: |
| |
| requested_indexes = datastore_index.IndexDefinitionsToProtos( |
| self._app_id, |
| index_defs.indexes) |
| self._cached_yaml = (index_yaml_file, index_yaml_mtime, |
| requested_indexes) |
| |
| |
| existing_indexes = self._datastore.GetIndexes( |
| self._app_id, self._trusted, self._app_id) |
| |
| |
    # Diff requested vs. existing indexes by their encoded definitions.
    requested = dict((x.definition().Encode(), x) for x in requested_indexes)
    existing = dict((x.definition().Encode(), x) for x in existing_indexes)
| |
| |
| created = 0 |
| for key, index in requested.iteritems(): |
| if key not in existing: |
| new_index = entity_pb.CompositeIndex() |
| new_index.CopyFrom(index) |
| new_index.set_id(datastore_admin.CreateIndex(new_index)) |
| new_index.set_state(entity_pb.CompositeIndex.READ_WRITE) |
| datastore_admin.UpdateIndex(new_index) |
| created += 1 |
| |
| |
| deleted = 0 |
| for key, index in existing.iteritems(): |
| if key not in requested: |
| datastore_admin.DeleteIndex(index) |
| deleted += 1 |
| |
| |
| if created or deleted: |
| logging.debug('Created %d and deleted %d index(es); total %d', |
| created, deleted, len(requested)) |
| |
| def _UpdateIndexes(self): |
| if self._index_yaml_updater is not None: |
| self._index_yaml_updater.UpdateIndexYaml() |
| |
| |
| class StubQueryConverter(object): |
| """Converter for v3 and v4 queries suitable for use in stubs.""" |
| |
| def __init__(self, entity_converter): |
| self._entity_converter = entity_converter |
| |
| def v4_to_v3_compiled_cursor(self, v4_cursor, v3_compiled_cursor): |
| """Converts a v4 cursor string to a v3 CompiledCursor. |
| |
| Args: |
| v4_cursor: a string representing a v4 query cursor |
| v3_compiled_cursor: a datastore_pb.CompiledCursor to populate |
| """ |
| v3_compiled_cursor.Clear() |
| try: |
| v3_compiled_cursor.ParseFromString(v4_cursor) |
| except ProtocolBuffer.ProtocolBufferDecodeError: |
| raise datastore_pbs.InvalidConversionError('Invalid query cursor.') |
| |
| def v3_to_v4_compiled_cursor(self, v3_compiled_cursor): |
| """Converts a v3 CompiledCursor to a v4 cursor string. |
| |
| Args: |
| v3_compiled_cursor: a datastore_pb.CompiledCursor |
| |
| Returns: |
| a string representing a v4 query cursor |
| """ |
| return v3_compiled_cursor.SerializeToString() |
| |
| def v4_to_v3_query(self, v4_partition_id, v4_query, v3_query): |
| """Converts a v4 Query to a v3 Query. |
| |
| Args: |
| v4_partition_id: a datastore_v4_pb.PartitionId |
| v4_query: a datastore_v4_pb.Query |
| v3_query: a datastore_pb.Query to populate |
| |
| Raises: |
| InvalidConversionError if the query cannot be converted |
| """ |
| v3_query.Clear() |
| |
| if v4_partition_id.dataset_id(): |
| v3_query.set_app(v4_partition_id.dataset_id()) |
| if v4_partition_id.has_namespace(): |
| v3_query.set_name_space(v4_partition_id.namespace()) |
| |
| v3_query.set_persist_offset(True) |
| v3_query.set_require_perfect_plan(True) |
| v3_query.set_compile(True) |
| |
| |
| if v4_query.has_limit(): |
| v3_query.set_limit(v4_query.limit()) |
| if v4_query.offset(): |
| v3_query.set_offset(v4_query.offset()) |
| if v4_query.has_start_cursor(): |
| self.v4_to_v3_compiled_cursor(v4_query.start_cursor(), |
| v3_query.mutable_compiled_cursor()) |
| if v4_query.has_end_cursor(): |
| self.v4_to_v3_compiled_cursor(v4_query.end_cursor(), |
| v3_query.mutable_end_compiled_cursor()) |
| |
| |
| if v4_query.kind_list(): |
| datastore_pbs.check_conversion(len(v4_query.kind_list()) == 1, |
| 'multiple kinds not supported') |
| v3_query.set_kind(v4_query.kind(0).name()) |
| |
| |
| has_key_projection = False |
| for prop in v4_query.projection_list(): |
| if prop.property().name() == datastore_pbs.PROPERTY_NAME_KEY: |
| has_key_projection = True |
| else: |
| v3_query.add_property_name(prop.property().name()) |
| if has_key_projection and not v3_query.property_name_list(): |
| v3_query.set_keys_only(True) |
| |
| |
| for prop in v4_query.group_by_list(): |
| v3_query.add_group_by_property_name(prop.name()) |
| |
| |
| self.__populate_v3_filters(v4_query.filter(), v3_query) |
| |
| |
| for v4_order in v4_query.order_list(): |
| v3_order = v3_query.add_order() |
| v3_order.set_property(v4_order.property().name()) |
| if v4_order.has_direction(): |
| v3_order.set_direction(v4_order.direction()) |
| |
| def v3_to_v4_query(self, v3_query, v4_query): |
| """Converts a v3 Query to a v4 Query. |
| |
| Args: |
| v3_query: a datastore_pb.Query |
| v4_query: a datastore_v4_pb.Query to populate |
| |
| Raises: |
| InvalidConversionError if the query cannot be converted |
| """ |
| v4_query.Clear() |
| |
| datastore_pbs.check_conversion(not v3_query.has_distinct(), |
| 'distinct option not supported') |
| datastore_pbs.check_conversion(v3_query.require_perfect_plan(), |
| 'non-perfect plans not supported') |
| |
| |
| |
| if v3_query.has_limit(): |
| v4_query.set_limit(v3_query.limit()) |
| if v3_query.offset(): |
| v4_query.set_offset(v3_query.offset()) |
| if v3_query.has_compiled_cursor(): |
| v4_query.set_start_cursor( |
| self.v3_to_v4_compiled_cursor(v3_query.compiled_cursor())) |
| if v3_query.has_end_compiled_cursor(): |
| v4_query.set_end_cursor( |
| self.v3_to_v4_compiled_cursor(v3_query.end_compiled_cursor())) |
| |
| |
| if v3_query.has_kind(): |
| v4_query.add_kind().set_name(v3_query.kind()) |
| |
| |
| for name in v3_query.property_name_list(): |
| v4_query.add_projection().mutable_property().set_name(name) |
| if v3_query.keys_only(): |
| v4_query.add_projection().mutable_property().set_name( |
| datastore_pbs.PROPERTY_NAME_KEY) |
| |
| |
| for name in v3_query.group_by_property_name_list(): |
| v4_query.add_group_by().set_name(name) |
| |
| |
| num_v4_filters = len(v3_query.filter_list()) |
| if v3_query.has_ancestor(): |
| num_v4_filters += 1 |
| |
| if num_v4_filters == 1: |
| get_property_filter = self.__get_property_filter |
    elif num_v4_filters > 1:
| v4_query.mutable_filter().mutable_composite_filter().set_operator( |
| datastore_v4_pb.CompositeFilter.AND) |
| get_property_filter = self.__add_property_filter |
| |
| if v3_query.has_ancestor(): |
| self.__v3_query_to_v4_ancestor_filter(v3_query, |
| get_property_filter(v4_query)) |
| for v3_filter in v3_query.filter_list(): |
| self.__v3_filter_to_v4_property_filter(v3_filter, |
| get_property_filter(v4_query)) |
| |
| |
| for v3_order in v3_query.order_list(): |
| v4_order = v4_query.add_order() |
| v4_order.mutable_property().set_name(v3_order.property()) |
| if v3_order.has_direction(): |
| v4_order.set_direction(v3_order.direction()) |
| |
| def __get_property_filter(self, v4_query): |
| """Returns the PropertyFilter from the query's top-level filter.""" |
| return v4_query.mutable_filter().mutable_property_filter() |
| |
| def __add_property_filter(self, v4_query): |
| """Adds and returns a PropertyFilter from the query's composite filter.""" |
| v4_comp_filter = v4_query.mutable_filter().mutable_composite_filter() |
| return v4_comp_filter.add_filter().mutable_property_filter() |
| |
| def __populate_v3_filters(self, v4_filter, v3_query): |
| """Populates a filters for a v3 Query. |
| |
| Args: |
| v4_filter: a datastore_v4_pb.Filter |
| v3_query: a datastore_pb.Query to populate with filters |
| """ |
| if v4_filter.has_property_filter(): |
| v4_property_filter = v4_filter.property_filter() |
| if (v4_property_filter.operator() |
| == datastore_v4_pb.PropertyFilter.HAS_ANCESTOR): |
| datastore_pbs.check_conversion( |
| v4_property_filter.value().has_key_value(), |
| 'HAS_ANCESTOR requires a reference value') |
| datastore_pbs.check_conversion((v4_property_filter.property().name() |
| == datastore_pbs.PROPERTY_NAME_KEY), |
| 'unsupported property') |
| datastore_pbs.check_conversion(not v3_query.has_ancestor(), |
| 'duplicate ancestor constraint') |
| self._entity_converter.v4_to_v3_reference( |
| v4_property_filter.value().key_value(), |
| v3_query.mutable_ancestor()) |
| else: |
| v3_filter = v3_query.add_filter() |
| property_name = v4_property_filter.property().name() |
| v3_filter.set_op(v4_property_filter.operator()) |
| datastore_pbs.check_conversion( |
| not v4_property_filter.value().list_value_list(), |
| ('unsupported value type, %s, in property filter' |
| ' on "%s"' % ('list_value', property_name))) |
| prop = v3_filter.add_property() |
| prop.set_multiple(False) |
| prop.set_name(property_name) |
| self._entity_converter.v4_value_to_v3_property_value( |
| v4_property_filter.value(), prop.mutable_value()) |
| elif v4_filter.has_composite_filter(): |
| datastore_pbs.check_conversion((v4_filter.composite_filter().operator() |
| == datastore_v4_pb.CompositeFilter.AND), |
| 'unsupported composite property operator') |
| for v4_sub_filter in v4_filter.composite_filter().filter_list(): |
| self.__populate_v3_filters(v4_sub_filter, v3_query) |
| |
| def __v3_filter_to_v4_property_filter(self, v3_filter, v4_property_filter): |
| """Converts a v3 Filter to a v4 PropertyFilter. |
| |
| Args: |
| v3_filter: a datastore_pb.Filter |
| v4_property_filter: a datastore_v4_pb.PropertyFilter to populate |
| |
| Raises: |
| InvalidConversionError if the filter cannot be converted |
| """ |
| datastore_pbs.check_conversion(v3_filter.property_size() == 1, |
| 'invalid filter') |
| datastore_pbs.check_conversion(v3_filter.op() <= 5, |
| 'unsupported filter op: %d' % v3_filter.op()) |
| v4_property_filter.Clear() |
| v4_property_filter.set_operator(v3_filter.op()) |
| v4_property_filter.mutable_property().set_name(v3_filter.property(0).name()) |
| self._entity_converter.v3_property_to_v4_value( |
| v3_filter.property(0), True, v4_property_filter.mutable_value()) |
| |
| def __v3_query_to_v4_ancestor_filter(self, v3_query, v4_property_filter): |
| """Converts a v3 Query to a v4 ancestor PropertyFilter. |
| |
| Args: |
| v3_query: a datastore_pb.Query |
| v4_property_filter: a datastore_v4_pb.PropertyFilter to populate |
| """ |
| v4_property_filter.Clear() |
| v4_property_filter.set_operator( |
| datastore_v4_pb.PropertyFilter.HAS_ANCESTOR) |
| prop = v4_property_filter.mutable_property() |
| prop.set_name(datastore_pbs.PROPERTY_NAME_KEY) |
| self._entity_converter.v3_to_v4_key( |
| v3_query.ancestor(), |
| v4_property_filter.mutable_value().mutable_key_value()) |
| |
| |
| |
| __query_converter = StubQueryConverter(datastore_pbs.get_entity_converter()) |
| |
| |
| def get_query_converter(): |
| """Returns a converter for v3 and v4 queries (not suitable for production). |
| |
| This converter is suitable for use in stubs but not for production. |
| |
| Returns: |
| a StubQueryConverter |
| """ |
| return __query_converter |
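

# Usage sketch (the v4 protos here are caller-supplied):
#
#   converter = get_query_converter()
#   v3_query = datastore_pb.Query()
#   converter.v4_to_v3_query(v4_partition_id, v4_query, v3_query)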
| |
| |
| class StubServiceConverter(object): |
| """Converter for v3/v4 request/response protos suitable for use in stubs.""" |
| |
| def __init__(self, entity_converter, query_converter): |
| self._entity_converter = entity_converter |
| self._query_converter = query_converter |
| |
| def v4_to_v3_cursor(self, v4_query_handle, v3_cursor): |
| """Converts a v4 cursor string to a v3 Cursor. |
| |
| Args: |
| v4_query_handle: a string representing a v4 query handle |
| v3_cursor: a datastore_pb.Cursor to populate |
| """ |
| try: |
| v3_cursor.ParseFromString(v4_query_handle) |
| except ProtocolBuffer.ProtocolBufferDecodeError: |
| raise datastore_pbs.InvalidConversionError('Invalid query handle.') |
| return v3_cursor |
| |
| def _v3_to_v4_query_handle(self, v3_cursor): |
| """Converts a v3 Cursor to a v4 query handle string. |
| |
| Args: |
| v3_cursor: a datastore_pb.Cursor |
| |
| Returns: |
| a string representing a v4 cursor |
| """ |
| return v3_cursor.SerializeToString() |
| |
| def v4_to_v3_txn(self, v4_txn, v3_txn): |
| """Converts a v4 transaction string to a v3 Transaction. |
| |
| Args: |
| v4_txn: a string representing a v4 transaction |
| v3_txn: a datastore_pb.Transaction to populate |
| """ |
| try: |
| v3_txn.ParseFromString(v4_txn) |
| except ProtocolBuffer.ProtocolBufferDecodeError: |
| raise datastore_pbs.InvalidConversionError('Invalid transaction.') |
| return v3_txn |
| |
| def _v3_to_v4_txn(self, v3_txn): |
| """Converts a v3 Transaction to a v4 transaction string. |
| |
| Args: |
| v3_txn: a datastore_pb.Transaction |
| |
| Returns: |
| a string representing a v4 transaction |
| """ |
| return v3_txn.SerializeToString() |
| |
| |
| |
| |
| def v4_to_v3_begin_transaction_req(self, app_id, v4_req): |
| """Converts a v4 BeginTransactionRequest to a v3 BeginTransactionRequest. |
| |
| Args: |
| app_id: app id |
| v4_req: a datastore_v4_pb.BeginTransactionRequest |
| |
| Returns: |
| a datastore_pb.BeginTransactionRequest |
| """ |
| v3_req = datastore_pb.BeginTransactionRequest() |
| v3_req.set_app(app_id) |
| v3_req.set_allow_multiple_eg(v4_req.cross_group()) |
| return v3_req |
| |
| def v3_to_v4_begin_transaction_resp(self, v3_resp): |
| """Converts a v3 Transaction to a v4 BeginTransactionResponse. |
| |
| Args: |
| v3_resp: a datastore_pb.Transaction |
| |
| Returns: |
| a datastore_v4_pb.BeginTransactionResponse |
| """ |
| v4_resp = datastore_v4_pb.BeginTransactionResponse() |
| v4_resp.set_transaction(self._v3_to_v4_txn(v3_resp)) |
| return v4_resp |
| |
| |
| |
| |
| def v4_rollback_req_to_v3_txn(self, v4_req): |
| """Converts a v4 RollbackRequest to a v3 Transaction. |
| |
| Args: |
| v4_req: a datastore_v4_pb.RollbackRequest |
| |
| Returns: |
| a datastore_pb.Transaction |
| """ |
| v3_txn = datastore_pb.Transaction() |
| self.v4_to_v3_txn(v4_req.transaction(), v3_txn) |
| return v3_txn |
| |
| |
| |
| |
| def v4_commit_req_to_v3_txn(self, v4_req): |
| """Converts a v4 CommitRequest to a v3 Transaction. |
| |
| Args: |
| v4_req: a datastore_v4_pb.CommitRequest |
| |
| Returns: |
| a datastore_pb.Transaction |
| """ |
| v3_txn = datastore_pb.Transaction() |
| self.v4_to_v3_txn(v4_req.transaction(), v3_txn) |
| return v3_txn |
| |
| |
| |
| |
| def v4_run_query_req_to_v3_query(self, v4_req): |
| """Converts a v4 RunQueryRequest to a v3 Query. |
| |
| GQL is not supported. |
| |
| Args: |
| v4_req: a datastore_v4_pb.RunQueryRequest |
| |
| Returns: |
| a datastore_pb.Query |
| """ |
| |
| datastore_pbs.check_conversion(not v4_req.has_gql_query(), |
| 'GQL not supported') |
| v3_query = datastore_pb.Query() |
| self._query_converter.v4_to_v3_query(v4_req.partition_id(), v4_req.query(), |
| v3_query) |
| |
| |
| if v4_req.has_suggested_batch_size(): |
| v3_query.set_count(v4_req.suggested_batch_size()) |
| |
| |
| read_options = v4_req.read_options() |
| if read_options.has_transaction(): |
| self.v4_to_v3_txn(read_options.transaction(), |
| v3_query.mutable_transaction()) |
| elif (read_options.read_consistency() |
| == datastore_v4_pb.ReadOptions.EVENTUAL): |
| v3_query.set_strong(False) |
| v3_query.set_failover_ms(-1) |
| elif read_options.read_consistency() == datastore_v4_pb.ReadOptions.STRONG: |
| v3_query.set_strong(True) |
| |
| if v4_req.has_min_safe_time_seconds(): |
| v3_query.set_min_safe_time_seconds(v4_req.min_safe_time_seconds()) |
| |
| return v3_query |
| |
| def v3_to_v4_run_query_req(self, v3_req): |
| """Converts a v3 Query to a v4 RunQueryRequest. |
| |
| Args: |
| v3_req: a datastore_pb.Query |
| |
| Returns: |
| a datastore_v4_pb.RunQueryRequest |
| """ |
| v4_req = datastore_v4_pb.RunQueryRequest() |
| |
| |
| v4_partition_id = v4_req.mutable_partition_id() |
| v4_partition_id.set_dataset_id(v3_req.app()) |
| if v3_req.name_space(): |
| v4_partition_id.set_namespace(v3_req.name_space()) |
| |
| |
| if v3_req.has_count(): |
| v4_req.set_suggested_batch_size(v3_req.count()) |
| |
| datastore_pbs.check_conversion( |
| not (v3_req.has_transaction() and v3_req.has_failover_ms()), |
| 'Cannot set failover and transaction handle.') |
| |
| |
| if v3_req.has_transaction(): |
| v4_req.mutable_read_options().set_transaction( |
| self._v3_to_v4_txn(v3_req.transaction())) |
| elif v3_req.strong(): |
| v4_req.mutable_read_options().set_read_consistency( |
| datastore_v4_pb.ReadOptions.STRONG) |
| elif v3_req.has_failover_ms(): |
| v4_req.mutable_read_options().set_read_consistency( |
| datastore_v4_pb.ReadOptions.EVENTUAL) |
| if v3_req.has_min_safe_time_seconds(): |
| v4_req.set_min_safe_time_seconds(v3_req.min_safe_time_seconds()) |
| |
| self._query_converter.v3_to_v4_query(v3_req, v4_req.mutable_query()) |
| |
| return v4_req |
| |
| def v4_run_query_resp_to_v3_query_result(self, v4_resp): |
| """Converts a V4 RunQueryResponse to a v3 QueryResult. |
| |
| Args: |
| v4_resp: a datastore_v4_pb.QueryResult |
| |
| Returns: |
| a datastore_pb.QueryResult |
| """ |
| v3_resp = self.v4_to_v3_query_result(v4_resp.batch()) |
| |
| |
| if v4_resp.has_query_handle(): |
| self.v4_to_v3_cursor(v4_resp.query_handle(), v3_resp.mutable_cursor()) |
| |
| return v3_resp |
| |
| def v3_to_v4_run_query_resp(self, v3_resp): |
| """Converts a v3 QueryResult to a V4 RunQueryResponse. |
| |
| Args: |
| v3_resp: a datastore_pb.QueryResult |
| |
| Returns: |
| a datastore_v4_pb.RunQueryResponse |
| """ |
| v4_resp = datastore_v4_pb.RunQueryResponse() |
| self.v3_to_v4_query_result_batch(v3_resp, v4_resp.mutable_batch()) |
| |
| if v3_resp.has_cursor(): |
| v4_resp.set_query_handle( |
| self._query_converter.v3_to_v4_compiled_cursor(v3_resp.cursor())) |
| |
| return v4_resp |
| |
| |
| |
| |
| def v4_to_v3_next_req(self, v4_req): |
| """Converts a v4 ContinueQueryRequest to a v3 NextRequest. |
| |
| Args: |
| v4_req: a datastore_v4_pb.ContinueQueryRequest |
| |
| Returns: |
| a datastore_pb.NextRequest |
| """ |
| v3_req = datastore_pb.NextRequest() |
| v3_req.set_compile(True) |
| self.v4_to_v3_cursor(v4_req.query_handle(), v3_req.mutable_cursor()) |
| return v3_req |
| |
| def v3_to_v4_continue_query_resp(self, v3_resp): |
| """Converts a v3 QueryResult to a v4 ContinueQueryResponse. |
| |
| Args: |
      v3_resp: a datastore_pb.QueryResult
| |
| Returns: |
| a datastore_v4_pb.ContinueQueryResponse |
| """ |
| v4_resp = datastore_v4_pb.ContinueQueryResponse() |
| self.v3_to_v4_query_result_batch(v3_resp, v4_resp.mutable_batch()) |
| return v4_resp |
| |
| |
| |
| |
| def v4_to_v3_get_req(self, v4_req): |
| """Converts a v4 LookupRequest to a v3 GetRequest. |
| |
| Args: |
| v4_req: a datastore_v4_pb.LookupRequest |
| |
| Returns: |
| a datastore_pb.GetRequest |
| """ |
| v3_req = datastore_pb.GetRequest() |
| v3_req.set_allow_deferred(True) |
| |
| |
| if v4_req.read_options().has_transaction(): |
| self.v4_to_v3_txn(v4_req.read_options().transaction(), |
| v3_req.mutable_transaction()) |
| elif (v4_req.read_options().read_consistency() |
| == datastore_v4_pb.ReadOptions.EVENTUAL): |
| v3_req.set_strong(False) |
| v3_req.set_failover_ms(-1) |
| elif (v4_req.read_options().read_consistency() |
| == datastore_v4_pb.ReadOptions.STRONG): |
| v3_req.set_strong(True) |
| |
| for v4_key in v4_req.key_list(): |
| self._entity_converter.v4_to_v3_reference(v4_key, v3_req.add_key()) |
| |
| return v3_req |
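
  # Illustrative example (doctest-style, shown as comments only; uses the
  # shared converter returned by get_service_converter() below): EVENTUAL
  # read consistency maps onto the v3 strong/failover_ms pair.
  #
  #   >>> conv = get_service_converter()
  #   >>> v4_req = datastore_v4_pb.LookupRequest()
  #   >>> v4_req.mutable_read_options().set_read_consistency(
  #   ...     datastore_v4_pb.ReadOptions.EVENTUAL)
  #   >>> v3_req = conv.v4_to_v3_get_req(v4_req)
  #   >>> v3_req.strong(), v3_req.failover_ms()
  #   (False, -1)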
| |
| def v3_to_v4_lookup_resp(self, v3_resp): |
| """Converts a v3 GetResponse to a v4 LookupResponse. |
| |
| Args: |
| v3_resp: a datastore_pb.GetResponse |
| |
| Returns: |
| a datastore_v4_pb.LookupResponse |
| """ |
| v4_resp = datastore_v4_pb.LookupResponse() |
| |
| for v3_ref in v3_resp.deferred_list(): |
| self._entity_converter.v3_to_v4_key(v3_ref, v4_resp.add_deferred()) |
| for v3_entity in v3_resp.entity_list(): |
| if v3_entity.has_entity(): |
| self._entity_converter.v3_to_v4_entity( |
| v3_entity.entity(), |
| v4_resp.add_found().mutable_entity()) |
| if v3_entity.has_key(): |
| self._entity_converter.v3_to_v4_key( |
| v3_entity.key(), |
| v4_resp.add_missing().mutable_entity().mutable_key()) |
| |
| return v4_resp |
| |
| def v4_to_v3_query_result(self, v4_batch): |
| """Converts a v4 QueryResultBatch to a v3 QueryResult. |
| |
| Args: |
| v4_batch: a datastore_v4_pb.QueryResultBatch |
| |
| Returns: |
| a datastore_pb.QueryResult |
| """ |
| v3_result = datastore_pb.QueryResult() |
| |
| |
| v3_result.set_more_results( |
| (v4_batch.more_results() |
| == datastore_v4_pb.QueryResultBatch.NOT_FINISHED)) |
| if v4_batch.has_end_cursor(): |
| self._query_converter.v4_to_v3_compiled_cursor( |
| v4_batch.end_cursor(), v3_result.mutable_compiled_cursor()) |
| |
| |
| if v4_batch.entity_result_type() == datastore_v4_pb.EntityResult.PROJECTION: |
| v3_result.set_index_only(True) |
| elif v4_batch.entity_result_type() == datastore_v4_pb.EntityResult.KEY_ONLY: |
| v3_result.set_keys_only(True) |
| |
| |
| if v4_batch.has_skipped_results(): |
| v3_result.set_skipped_results(v4_batch.skipped_results()) |
| for v4_entity in v4_batch.entity_result_list(): |
| v3_entity = v3_result.add_result() |
| self._entity_converter.v4_to_v3_entity(v4_entity.entity(), v3_entity) |
    if v4_batch.entity_result_type() != datastore_v4_pb.EntityResult.FULL:
      # v3 results only carry an entity group for FULL results; strip it from
      # key-only and projection results.
      v3_entity.clear_entity_group()
| |
| return v3_result |
| |
| def v3_to_v4_query_result_batch(self, v3_result, v4_batch): |
| """Converts a v3 QueryResult to a v4 QueryResultBatch. |
| |
| Args: |
| v3_result: a datastore_pb.QueryResult |
| v4_batch: a datastore_v4_pb.QueryResultBatch to populate |
| """ |
| v4_batch.Clear() |
| |
| |
| if v3_result.more_results(): |
| v4_batch.set_more_results(datastore_v4_pb.QueryResultBatch.NOT_FINISHED) |
| else: |
| v4_batch.set_more_results( |
| datastore_v4_pb.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT) |
| if v3_result.has_compiled_cursor(): |
| v4_batch.set_end_cursor( |
| self._query_converter.v3_to_v4_compiled_cursor( |
| v3_result.compiled_cursor())) |
| |
| |
| if v3_result.keys_only(): |
| v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.KEY_ONLY) |
| elif v3_result.index_only(): |
| v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.PROJECTION) |
| else: |
| v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.FULL) |
| |
| |
| if v3_result.has_skipped_results(): |
| v4_batch.set_skipped_results(v3_result.skipped_results()) |
| for v3_entity in v3_result.result_list(): |
| v4_entity_result = datastore_v4_pb.EntityResult() |
| self._entity_converter.v3_to_v4_entity(v3_entity, |
| v4_entity_result.mutable_entity()) |
| v4_batch.entity_result_list().append(v4_entity_result) |
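
  # Illustrative example (doctest-style, shown as comments only): a keys-only
  # v3 result maps to the KEY_ONLY entity result type.
  #
  #   >>> conv = get_service_converter()
  #   >>> v3_result = datastore_pb.QueryResult()
  #   >>> v3_result.set_more_results(False)
  #   >>> v3_result.set_keys_only(True)
  #   >>> v4_batch = datastore_v4_pb.QueryResultBatch()
  #   >>> conv.v3_to_v4_query_result_batch(v3_result, v4_batch)
  #   >>> v4_batch.entity_result_type() == datastore_v4_pb.EntityResult.KEY_ONLY
  #   True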
| |
| |
| |
| __service_converter = StubServiceConverter( |
| datastore_pbs.get_entity_converter(), __query_converter) |
| |
| |
| def get_service_converter(): |
| """Returns a converter for v3 and v4 service request/response protos. |
| |
| This converter is suitable for use in stubs but not for production. |
| |
| Returns: |
| a StubServiceConverter |
| """ |
| return __service_converter |
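
# Illustrative usage (doctest-style, shown as comments only): stubs fetch the
# shared converter once and reuse it for all request/response translation.
#
#   >>> conv = get_service_converter()
#   >>> isinstance(conv, StubServiceConverter)
#   True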
| |
| |
| def ReverseBitsInt64(v): |
| """Reverse the bits of a 64-bit integer. |
| |
| Args: |
| v: Input integer of type 'int' or 'long'. |
| |
| Returns: |
| Bit-reversed input as 'int' on 64-bit machines or as 'long' otherwise. |
| """ |
| |
  # Parallel bit reversal: swap adjacent bits, then 2-bit pairs, nibbles,
  # bytes, and 16-bit halves, and finally exchange the two 32-bit halves.
  v = ((v >> 1) & 0x5555555555555555) | ((v & 0x5555555555555555) << 1)
  v = ((v >> 2) & 0x3333333333333333) | ((v & 0x3333333333333333) << 2)
  v = ((v >> 4) & 0x0F0F0F0F0F0F0F0F) | ((v & 0x0F0F0F0F0F0F0F0F) << 4)
  v = ((v >> 8) & 0x00FF00FF00FF00FF) | ((v & 0x00FF00FF00FF00FF) << 8)
  v = ((v >> 16) & 0x0000FFFF0000FFFF) | ((v & 0x0000FFFF0000FFFF) << 16)
  v = int((v >> 32) | (v << 32) & 0xFFFFFFFFFFFFFFFF)
  return v
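
# Illustrative example (doctest-style, shown as comments only): reversing
# bit 0 yields the top bit, and reversing twice restores any 64-bit input.
#
#   >>> ReverseBitsInt64(1) == 1 << 63
#   True
#   >>> ReverseBitsInt64(ReverseBitsInt64(12345)) == 12345
#   True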
| |
| |
| def ToScatteredId(v): |
| """Map counter value v to the scattered ID space. |
| |
  Shift the counter, reverse its bits, then offset the result into the
  scattered ID space above the sequential range.
| |
| Args: |
| v: Counter value from which to produce ID. |
| |
| Returns: |
| Integer ID. |
| |
| Raises: |
| datastore_errors.BadArgumentError if counter value exceeds the range of |
| the scattered ID space. |
| """ |
| if v >= _MAX_SCATTERED_COUNTER: |
    raise datastore_errors.BadArgumentError('counter value too large (%d)' % v)
| return _MAX_SEQUENTIAL_ID + 1 + long(ReverseBitsInt64(v << _SCATTER_SHIFT)) |
| |
| |
| def IdToCounter(k): |
| """Map ID k to the counter value from which it was generated. |
| |
  Determine whether k is a sequential or scattered ID.
| |
| Args: |
| k: ID from which to infer counter value. |
| |
  Returns:
    Tuple of integers (counter_value, id_space).

  Raises:
    datastore_errors.BadArgumentError: if k is not a positive ID.
  """
  if k > _MAX_SCATTERED_ID:
    # IDs above the scattered range carry no counter information.
    return 0, SCATTERED
| elif k > _MAX_SEQUENTIAL_ID and k <= _MAX_SCATTERED_ID: |
| return long(ReverseBitsInt64(k) >> _SCATTER_SHIFT), SCATTERED |
| elif k > 0: |
| return long(k), SEQUENTIAL |
| else: |
| raise datastore_errors.BadArgumentError('invalid id (%d)' % k) |
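
# Illustrative example (doctest-style, shown as comments only): scattered IDs
# round-trip through the counter, and small positive IDs stay sequential.
#
#   >>> IdToCounter(ToScatteredId(42)) == (42, SCATTERED)
#   True
#   >>> IdToCounter(1000) == (1000, SEQUENTIAL)
#   True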
| |
| |
| def CompareEntityPbByKey(a, b): |
| """Compare two entity protobuf's by key. |
| |
| Args: |
| a: entity_pb.EntityProto to compare |
| b: entity_pb.EntityProto to compare |

  Returns:
    <0 if a's key is before b's, =0 if they are the same key, and >0 otherwise.
| """ |
| return cmp(datastore_types.Key._FromPb(a.key()), |
| datastore_types.Key._FromPb(b.key())) |
| |
| |
| def _GuessOrders(filters, orders): |
| """Guess any implicit ordering. |
| |
  The datastore gives a logical, but not necessarily predictable, ordering when
  orders are not completely explicit. This function guesses at that ordering
  (which is better than always ordering by __key__ for tests).

  Args:
    filters: The datastore_pb.Query_Filters that have already been normalized
      and checked.
    orders: The datastore_pb.Query_Orders that have already been normalized and
      checked. Not mutated; an extended copy is returned.

  Returns:
    The list of datastore_pb.Query_Orders to use, always ending with __key__.
  """
| orders = orders[:] |
| |
| |
| if not orders: |
| for filter_pb in filters: |
| if filter_pb.op() in datastore_index.INEQUALITY_OPERATORS: |
| |
| order = datastore_pb.Query_Order() |
| order.set_property(filter_pb.property(0).name()) |
| orders.append(order) |
| break |
| |
| |
| exists_props = (filter_pb.property(0).name() for filter_pb in filters |
| if filter_pb.op() == datastore_pb.Query_Filter.EXISTS) |
| for prop in sorted(exists_props): |
| order = datastore_pb.Query_Order() |
| order.set_property(prop) |
| orders.append(order) |
| |
| |
| if not orders or orders[-1].property() != '__key__': |
| order = datastore_pb.Query_Order() |
| order.set_property('__key__') |
| orders.append(order) |
| return orders |
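
# Illustrative example (doctest-style, shown as comments only): a single
# inequality filter on 'age' with no explicit orders yields an order on 'age'
# followed by the implicit __key__ order.
#
#   >>> f = datastore_pb.Query_Filter()
#   >>> f.set_op(datastore_pb.Query_Filter.GREATER_THAN)
#   >>> f.add_property().set_name('age')
#   >>> [o.property() for o in _GuessOrders([f], [])]
#   ['age', '__key__']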
| |
| |
| def _MakeQuery(query, filters, orders): |
| """Make a datastore_query.Query for the given datastore_pb.Query. |
| |
| Overrides filters and orders in query with the specified arguments.""" |
| clone = datastore_pb.Query() |
| clone.CopyFrom(query) |
| clone.clear_filter() |
| clone.clear_order() |
| clone.filter_list().extend(filters) |
| clone.order_list().extend(orders) |
| return datastore_query.Query._from_pb(clone) |
| |
| def _CreateIndexEntities(entity, postfix_props): |
| """Creates entities for index values that would appear in prodcution. |
| |
| This function finds all multi-valued properties listed in split_props, and |
| creates a new entity for each unique combination of values. The resulting |
| entities will only have a single value for each property listed in |
| split_props. |
| |
| It reserves the right to include index data that would not be |
| seen in production, e.g. by returning the original entity when no splitting |
| is needed. LoadEntity will remove any excess fields. |
| |
| This simulates the results seen by an index scan in the datastore. |
| |
| Args: |
| entity: The entity_pb.EntityProto to split. |
| split_props: A set of property names to split on. |
| |
| Returns: |
| A list of the split entity_pb.EntityProtos. |
| """ |
| to_split = {} |
| split_required = False |
| base_props = [] |
| for prop in entity.property_list(): |
| if prop.name() in postfix_props: |
| values = to_split.get(prop.name()) |
| if values is None: |
| values = [] |
| to_split[prop.name()] = values |
      else:
        # Seeing this property name again means it is multi-valued, so a
        # split into one entity per value combination is required.
        split_required = True
| if prop.value() not in values: |
| values.append(prop.value()) |
| else: |
| base_props.append(prop) |
| |
  if not split_required:
    # Nothing to split on; the original entity stands in for its index rows
    # (LoadEntity will strip any excess fields).
    return [entity]
| |
| clone = entity_pb.EntityProto() |
| clone.CopyFrom(entity) |
| clone.clear_property() |
| clone.property_list().extend(base_props) |
| results = [clone] |
| |
| for name, splits in to_split.iteritems(): |
    if len(splits) == 1:
      # A single value does not multiply the result set; attach it in place.
      for result in results:
| prop = result.add_property() |
| prop.set_name(name) |
| prop.set_multiple(False) |
| prop.set_meaning(entity_pb.Property.INDEX_VALUE) |
| prop.mutable_value().CopyFrom(splits[0]) |
| continue |
| |
| new_results = [] |
| for result in results: |
| for split in splits: |
| clone = entity_pb.EntityProto() |
| clone.CopyFrom(result) |
| prop = clone.add_property() |
| prop.set_name(name) |
| prop.set_multiple(False) |
| prop.set_meaning(entity_pb.Property.INDEX_VALUE) |
| prop.mutable_value().CopyFrom(split) |
| new_results.append(clone) |
| results = new_results |
| return results |
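
# Illustrative example (doctest-style, shown as comments only; assumes string
# values set via entity_pb's set_stringvalue accessor): an entity with two
# values for a postfix property splits into one index entity per value.
#
#   >>> e = entity_pb.EntityProto()
#   >>> for v in ('a', 'b'):
#   ...   p = e.add_property()
#   ...   p.set_name('tag')
#   ...   p.set_multiple(True)
#   ...   p.mutable_value().set_stringvalue(v)
#   >>> len(_CreateIndexEntities(e, set(['tag'])))
#   2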
| |
| |
| def _CreateIndexOnlyQueryResults(results, postfix_props): |
| """Creates a result set similar to that returned by an index only query.""" |
| new_results = [] |
| for result in results: |
| new_results.extend(_CreateIndexEntities(result, postfix_props)) |
| return new_results |
| |
| |
| def _ExecuteQuery(results, query, filters, orders, index_list): |
| """Executes the query on a superset of its results. |
| |
| Args: |
| results: superset of results for query. |
| query: a datastore_pb.Query. |
| filters: the filters from query. |
| orders: the orders from query. |
| index_list: the list of indexes used by the query. |
| |
| Returns: |
| A ListCursor over the results of applying query to results. |
| """ |
| orders = _GuessOrders(filters, orders) |
| dsquery = _MakeQuery(query, filters, orders) |
| |
| if query.property_name_size(): |
| results = _CreateIndexOnlyQueryResults( |
| results, set(order.property() for order in orders)) |
| |
| return ListCursor(query, dsquery, orders, index_list, |
| datastore_query.apply_query(dsquery, results)) |
| |
| |
| def _UpdateCost(cost, entity_writes, index_writes): |
| """Updates the provided cost. |
| |
| Args: |
| cost: Out param. The cost object to update. |
| entity_writes: The number of entity writes to add. |
| index_writes: The number of index writes to add. |
| """ |
| cost.set_entity_writes(cost.entity_writes() + entity_writes) |
| cost.set_index_writes(cost.index_writes() + index_writes) |
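
# Illustrative example (doctest-style, shown as comments only): costs
# accumulate across calls.
#
#   >>> cost = datastore_pb.Cost()
#   >>> _UpdateCost(cost, 1, 3)
#   >>> _UpdateCost(cost, 1, 2)
#   >>> cost.entity_writes(), cost.index_writes()
#   (2, 5)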
| |
| |
| def _CalculateWriteOps(composite_indexes, old_entity, new_entity): |
| """Determines number of entity and index writes needed to write new_entity. |
| |
| We assume that old_entity represents the current state of the Datastore. |
| |
| Args: |
| composite_indexes: The composite_indexes for the kind of the entities. |
    old_entity: Entity representing the current state in the Datastore.
    new_entity: Entity representing the desired state in the Datastore.
| |
| Returns: |
| A tuple of size 2, where the first value is the number of entity writes and |
| the second value is the number of index writes. |
| """ |
| if (old_entity is not None and |
| old_entity.property_list() == new_entity.property_list() |
| and old_entity.raw_property_list() == new_entity.raw_property_list()): |
| return 0, 0 |
| |
| index_writes = _ChangedIndexRows(composite_indexes, old_entity, new_entity) |
| if old_entity is None: |
| |
| |
| |
| index_writes += 1 |
| |
| return 1, index_writes |
| |
| |
| def _ChangedIndexRows(composite_indexes, old_entity, new_entity): |
| """Determine the number of index rows that need to change. |
| |
| We assume that old_entity represents the current state of the Datastore. |
| |
| Args: |
| composite_indexes: The composite_indexes for the kind of the entities. |
| old_entity: Entity representing the current state in the Datastore. |
    new_entity: Entity representing the desired state in the Datastore.
| |
| Returns: |
| The number of index rows that need to change. |
| """ |
| |
| |
| |
| unique_old_properties = collections.defaultdict(set) |
| |
| |
| |
| |
| unique_new_properties = collections.defaultdict(set) |
| |
| if old_entity is not None: |
| for old_prop in old_entity.property_list(): |
| _PopulateUniquePropertiesSet(old_prop, unique_old_properties) |
| |
| |
| unchanged = collections.defaultdict(int) |
| |
| for new_prop in new_entity.property_list(): |
| new_prop_as_str = _PopulateUniquePropertiesSet( |
| new_prop, unique_new_properties) |
| if new_prop_as_str in unique_old_properties[new_prop.name()]: |
| unchanged[new_prop.name()] += 1 |
| |
| |
| |
| |
| all_property_names = set(unique_old_properties.iterkeys()) |
  all_property_names.update(unique_new_properties.iterkeys())
| all_property_names.update(unchanged.iterkeys()) |
| |
| all_indexes = _GetEntityByPropertyIndexes(all_property_names) |
| all_indexes.extend([comp.definition() for comp in composite_indexes]) |
| path_size = new_entity.key().path().element_size() |
| writes = 0 |
| for index in all_indexes: |
| |
| |
| |
    # Ancestor composite indexes write one row per element of the entity's
    # key path, so scale this index's writes by the path length.
    ancestor_multiplier = 1
    if index.ancestor() and index.property_size() > 1:
      ancestor_multiplier = path_size
| writes += (_CalculateWritesForCompositeIndex( |
| index, unique_old_properties, unique_new_properties, unchanged) * |
| ancestor_multiplier) |
| return writes |
| |
| |
| def _PopulateUniquePropertiesSet(prop, unique_properties): |
| """Populates a set containing unique properties. |
| |
| Args: |
| prop: An entity property. |
| unique_properties: Dictionary mapping property names to a set of unique |
| properties. |
| |
| Returns: |
| The property pb in string (hashable) form. |
| """ |
| if prop.multiple(): |
| prop = _CopyAndSetMultipleToFalse(prop) |
| |
| |
| prop_as_str = prop.SerializePartialToString() |
| unique_properties[prop.name()].add(prop_as_str) |
| return prop_as_str |
| |
| |
| def _CalculateWritesForCompositeIndex(index, unique_old_properties, |
| unique_new_properties, |
| common_properties): |
| """Calculate the number of writes required to maintain a specific Index. |
| |
| Args: |
| index: The composite index. |
| unique_old_properties: Dictionary mapping property names to a set of props |
| present on the old entity. |
| unique_new_properties: Dictionary mapping property names to a set of props |
| present on the new entity. |
| common_properties: Dictionary mapping property names to the number of |
| properties with that name that are present on both the old and new |
| entities. |
| |
| Returns: |
    The number of writes required to maintain the provided index.
| """ |
| old_count = 1 |
| new_count = 1 |
| common_count = 1 |
| for prop in index.property_list(): |
| old_count *= len(unique_old_properties[prop.name()]) |
| new_count *= len(unique_new_properties[prop.name()]) |
| common_count *= common_properties[prop.name()] |
| |
| return (old_count - common_count) + (new_count - common_count) |
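
# Illustrative example (doctest-style, shown as comments only): for a
# single-property index where the old entity had values {v1, v2}, the new
# entity has {v2, v3}, and one value is shared, the index needs
# (2 - 1) + (2 - 1) = 2 row writes (one row removed, one added).
#
#   >>> index = entity_pb.Index()
#   >>> index.add_property().set_name('tag')
#   >>> _CalculateWritesForCompositeIndex(
#   ...     index, {'tag': set(['v1', 'v2'])}, {'tag': set(['v2', 'v3'])},
#   ...     {'tag': 1})
#   2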
| |
| |
def _GetEntityByPropertyIndexes(all_property_names):
  """Creates ascending and descending single-property indexes for each name.

  Args:
    all_property_names: An iterable of property names.

  Returns:
    A list of entity_pb.Index objects, two per property name.
  """
| indexes = [] |
| for prop_name in all_property_names: |
| indexes.append( |
| _SinglePropertyIndex(prop_name, entity_pb.Index_Property.ASCENDING)) |
| indexes.append( |
| _SinglePropertyIndex(prop_name, entity_pb.Index_Property.DESCENDING)) |
| return indexes |
| |
| |
| def _SinglePropertyIndex(prop_name, direction): |
| """Creates a single property Index for the given name and direction. |
| |
| Args: |
| prop_name: The name of the single property on the Index. |
| direction: The direction of the Index. |
| |
| Returns: |
| A single property Index with the given property and direction. |
| """ |
| index = entity_pb.Index() |
| prop = index.add_property() |
| prop.set_name(prop_name) |
| prop.set_direction(direction) |
| return index |
| |
| |
| def _CopyAndSetMultipleToFalse(prop): |
| """Copy the provided Property and set its "multiple" attribute to False. |
| |
| Args: |
| prop: The Property to copy. |
| |
| Returns: |
| A copy of the given Property with its "multiple" attribute set to False. |
| """ |
| |
| |
| |
| |
| |
  # Copy rather than mutating the caller's Property; only the serialized
  # copy should have "multiple" cleared.
  prop_copy = entity_pb.Property()
| prop_copy.MergeFrom(prop) |
| prop_copy.set_multiple(False) |
| return prop_copy |
| |
| |
| def _SetStartInclusive(position, first_direction): |
| """Sets the start_inclusive field in position. |
| |
| Args: |
| position: datastore_pb.Position |
| first_direction: the first sort order from the query |
| (a datastore_pb.Query_Order) or None |
| """ |
| position.set_start_inclusive( |
| position.before_ascending() |
| != (first_direction == datastore_pb.Query_Order.DESCENDING)) |
| |
| |
| def _SetBeforeAscending(position, first_direction): |
| """Sets the before_ascending field in position. |
| |
| Args: |
| position: datastore_pb.Position |
| first_direction: the first sort order from the query |
| (a datastore_pb.Query_Order) or None |
| """ |
| position.set_before_ascending( |
| position.start_inclusive() |
| != (first_direction == datastore_pb.Query_Order.DESCENDING)) |
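
# Illustrative example (doctest-style, shown as comments only): the two
# setters are inverse XOR transforms, so deriving before_ascending and then
# start_inclusive with the same first sort direction restores the original
# start_inclusive value.
#
#   >>> pos = datastore_pb.Position()
#   >>> pos.set_start_inclusive(True)
#   >>> _SetBeforeAscending(pos, datastore_pb.Query_Order.DESCENDING)
#   >>> _SetStartInclusive(pos, datastore_pb.Query_Order.DESCENDING)
#   >>> pos.start_inclusive()
#   True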