| #!/usr/bin/env python |
| # |
| # Copyright 2007 Google Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| |
| |
| |
| """A thin wrapper around datastore query RPC calls. |
| |
| This provides wrappers around the internal only datastore_pb library and is |
| designed to be the lowest-level API to be used by all Python datastore client |
| libraries for executing queries. It provides a layer of protection so the actual |
| RPC syntax can change without affecting client libraries. |
| |
| Any class, function, field or argument starting with an '_' is for INTERNAL use |
| only and should not be used by developers! |
| """ |
| |
| |
| |
| |
| |
| |
| |
| __all__ = ['Batch', |
| 'Batcher', |
| 'CompositeFilter', |
| 'CompositeOrder', |
| 'CorrelationFilter', |
| 'Cursor', |
| 'FetchOptions', |
| 'FilterPredicate', |
| 'Order', |
| 'PropertyFilter', |
| 'PropertyOrder', |
| 'Query', |
| 'QueryOptions', |
| 'ResultsIterator', |
| 'make_filter', |
| 'apply_query', |
| 'inject_results', |
| ] |
| |
| import base64 |
| import pickle |
| import collections |
| |
| from google.appengine.datastore import entity_pb |
| from google.appengine.api import datastore_errors |
| from google.appengine.api import datastore_types |
| from google.appengine.datastore import datastore_index |
| from google.appengine.datastore import datastore_pb |
| from google.appengine.datastore import datastore_rpc |
| |
| |
| class _BaseComponent(object): |
| """A base class for query components. |
| |
| Currently just implements basic == and != functions. |
| """ |
| |
| def __eq__(self, other): |
| if self.__class__ is not other.__class__: |
| return NotImplemented |
| return self is other or self.__dict__ == other.__dict__ |
| |
| def __ne__(self, other): |
| equal = self.__eq__(other) |
| if equal is NotImplemented: |
| return equal |
| return not equal |
| |
| |
| def make_filter(name, op, values): |
| """Constructs a FilterPredicate from the given name, op and values. |
| |
| Args: |
| name: A non-empty string, the name of the property to filter. |
| op: One of PropertyFilter._OPERATORS.keys(), the operator to use. |
| values: A supported value, the value to compare against. |
| |
| Returns: |
| if values is a list, a CompositeFilter that uses AND to combine all |
| values, otherwise a PropertyFilter for the single value. |
| |
| Raises: |
| datastore_errors.BadPropertyError: if the property name is invalid. |
| datastore_errors.BadValueError: if the property did not validate correctly |
| or the value was an empty list. |
| Other exception types (like OverflowError): if the property value does not |
| meet type-specific criteria. |
| """ |
| datastore_types.ValidateProperty(name, values) |
| properties = datastore_types.ToPropertyPb(name, values) |
| if isinstance(properties, list): |
| filters = [PropertyFilter(op, prop) for prop in properties] |
| return CompositeFilter(CompositeFilter.AND, filters) |
| else: |
| return PropertyFilter(op, properties) |
| |
| |
| def _make_key_value_map(entity, property_names): |
| """Extracts key values from the given entity. |
| |
| Args: |
| entity: The entity_pb.EntityProto to extract values from. |
| property_names: The names of the properties from which to extract values. |
| |
| Returns: |
| A dict mapping property names to a lists of key values. |
| """ |
| value_map = dict((name, []) for name in property_names) |
| |
| |
| for prop in entity.property_list(): |
| if prop.name() in value_map: |
| value_map[prop.name()].append( |
| datastore_types.PropertyValueToKeyValue(prop.value())) |
| |
| |
| if datastore_types.KEY_SPECIAL_PROPERTY in value_map: |
| value_map[datastore_types.KEY_SPECIAL_PROPERTY] = [ |
| datastore_types.ReferenceToKeyValue(entity.key())] |
| |
| return value_map |
| |
| |
| class _PropertyComponent(_BaseComponent): |
| """A component that operates on a specific set of properties.""" |
| |
| def _get_prop_names(self): |
| """Returns a set of property names used by the filter.""" |
| raise NotImplementedError |
| |
| |
| class FilterPredicate(_PropertyComponent): |
| """An abstract base class for all query filters. |
| |
| All sub-classes must be immutable as these are often stored without creating a |
| defensive copy. |
| """ |
| |
| def __call__(self, entity): |
| """Applies the filter predicate to the given entity. |
| |
| Args: |
| entity: the datastore_pb.EntityProto to test. |
| |
| Returns: |
| True if the given entity matches the filter, False otherwise. |
| """ |
| return self._apply(_make_key_value_map(entity, self._get_prop_names())) |
| |
| def _apply(self, key_value_map): |
| """Apply the given component to the comparable value map. |
| |
| A filter matches a list of values if at least one value in the list |
| matches the filter, for example: |
| 'prop: [1, 2]' matches both 'prop = 1' and 'prop = 2' but not 'prop = 3' |
| |
| Note: the values are actually represented as tuples whose first item |
| encodes the type; see datastore_types.PropertyValueToKeyValue(). |
| |
| Args: |
| key_value_map: A dict mapping property names to a list of |
| comparable values. |
| |
| Return: |
| A boolean indicating if the given map matches the filter. |
| """ |
| raise NotImplementedError |
| |
| def _prune(self, key_value_map): |
| """Removes values from the given map that do not match the filter. |
| |
| When doing a scan in the datastore, only index values that match the filters |
| are seen. When multiple values that point to the same entity are seen, the |
| entity only appears where the first value is found. This function removes |
| all values that don't match the query so that the first value in the map |
| is the same one the datastore would see first. |
| |
| Args: |
| key_value_map: the comparable value map from which to remove |
| values. Does not need to contain values for all filtered properties. |
| |
| Returns: |
| A value that evaluates to False if every value in a single list was |
| completely removed. This effectively applies the filter but is less |
| efficient than _apply(). |
| """ |
| raise NotImplementedError |
| |
| def _to_pb(self): |
| """Internal only function to generate a pb.""" |
| raise NotImplementedError( |
| 'This filter only supports in memory operations (%r)' % self) |
| |
| def _to_pbs(self): |
| """Internal only function to generate a list of pbs.""" |
| return [self._to_pb()] |
| |
| |
| class _SinglePropertyFilter(FilterPredicate): |
| """Base class for a filter that operates on a single property.""" |
| |
| def _get_prop_name(self): |
| """Returns the name of the property being filtered.""" |
| raise NotImplementedError |
| |
| def _apply_to_value(self, value): |
| """Apply the filter to the given value. |
| |
| Args: |
| value: The comparable value to check. |
| |
| Returns: |
| A boolean indicating if the given value matches the filter. |
| """ |
| raise NotImplementedError |
| |
| def _get_prop_names(self): |
| return set([self._get_prop_name()]) |
| |
| def _apply(self, value_map): |
| for other_value in value_map[self._get_prop_name()]: |
| if self._apply_to_value(other_value): |
| return True |
| return False |
| |
| def _prune(self, value_map): |
| |
| |
| |
| |
| if self._get_prop_name() not in value_map: |
| return True |
| values = [value for value in value_map[self._get_prop_name()] |
| if self._apply_to_value(value)] |
| value_map[self._get_prop_name()] = values |
| return bool(values) |
| |
| |
| class PropertyFilter(_SinglePropertyFilter): |
| """An immutable filter predicate that constrains a single property.""" |
| |
| _OPERATORS = { |
| '<': datastore_pb.Query_Filter.LESS_THAN, |
| '<=': datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL, |
| '>': datastore_pb.Query_Filter.GREATER_THAN, |
| '>=': datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL, |
| '=': datastore_pb.Query_Filter.EQUAL, |
| } |
| |
| _OPERATORS_INVERSE = dict((value, key) |
| for key, value in _OPERATORS.iteritems()) |
| |
| _OPERATORS_TO_PYTHON_OPERATOR = { |
| datastore_pb.Query_Filter.LESS_THAN: '<', |
| datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL: '<=', |
| datastore_pb.Query_Filter.GREATER_THAN: '>', |
| datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL: '>=', |
| datastore_pb.Query_Filter.EQUAL: '==', |
| } |
| |
| _INEQUALITY_OPERATORS = frozenset(['<', '<=', '>', '>=']) |
| |
| _INEQUALITY_OPERATORS_ENUM = frozenset([ |
| datastore_pb.Query_Filter.LESS_THAN, |
| datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL, |
| datastore_pb.Query_Filter.GREATER_THAN, |
| datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL, |
| ]) |
| |
| _UPPERBOUND_INEQUALITY_OPERATORS = frozenset(['<', '<=']) |
| |
| def __init__(self, op, value): |
| """Constructor. |
| |
| Args: |
| op: A string representing the operator to use. |
| value: A entity_pb.Property, the property and value to compare against. |
| |
| Raises: |
| datastore_errors.BadArgumentError if op has an unsupported value or value |
| is not an entity_pb.Property. |
| """ |
| if op not in self._OPERATORS: |
| raise datastore_errors.BadArgumentError('unknown operator: %r' % (op,)) |
| if not isinstance(value, entity_pb.Property): |
| raise datastore_errors.BadArgumentError( |
| 'value argument should be entity_pb.Property (%r)' % (value,)) |
| |
| super(PropertyFilter, self).__init__() |
| self._filter = datastore_pb.Query_Filter() |
| self._filter.set_op(self._OPERATORS[op]) |
| self._filter.add_property().CopyFrom(value) |
| |
| @property |
| def op(self): |
| raw_op = self._filter.op() |
| return self._OPERATORS_INVERSE.get(raw_op, str(raw_op)) |
| |
| @property |
| def value(self): |
| |
| return self._filter.property(0) |
| |
| def __repr__(self): |
| prop = self.value |
| name = prop.name() |
| value = datastore_types.FromPropertyPb(prop) |
| return '%s(%r, <%r, %r>)' % (self.__class__.__name__, self.op, name, value) |
| |
| def _get_prop_name(self): |
| return self._filter.property(0).name() |
| |
| def _apply_to_value(self, value): |
| if not hasattr(self, '_cmp_value'): |
| if self._filter.op() == datastore_pb.Query_Filter.EXISTS: |
| |
| return True |
| self._cmp_value = datastore_types.PropertyValueToKeyValue( |
| self._filter.property(0).value()) |
| self._condition = ('value %s self._cmp_value' % |
| self._OPERATORS_TO_PYTHON_OPERATOR[self._filter.op()]) |
| return eval(self._condition) |
| |
| def _has_inequality(self): |
| """Returns True if the filter predicate contains inequalities filters.""" |
| return self._filter.op() in self._INEQUALITY_OPERATORS_ENUM |
| |
| @classmethod |
| def _from_pb(cls, filter_pb): |
| |
| self = cls.__new__(cls) |
| self._filter = filter_pb |
| return self |
| |
| def _to_pb(self): |
| """Returns the internal only pb representation.""" |
| return self._filter |
| |
| def __getstate__(self): |
| raise pickle.PicklingError( |
| 'Pickling of datastore_query.PropertyFilter is unsupported.') |
| |
| def __eq__(self, other): |
| |
| |
| if self.__class__ is not other.__class__: |
| if other.__class__ is _PropertyRangeFilter: |
| return [self._filter] == other._to_pbs() |
| return NotImplemented |
| return self._filter == other._filter |
| |
| |
| class _PropertyRangeFilter(_SinglePropertyFilter): |
| """A filter predicate that represents a range of values. |
| |
| Since we allow multi-valued properties there is a large difference between |
| "x > 0 AND x < 1" and "0 < x < 1." An entity with x = [-1, 2] will match the |
| first but not the second. |
| |
| Since the datastore only allows a single inequality filter, multiple |
| in-equality filters are merged into a single range filter in the |
| datastore (unlike equality filters). This class is used by |
| datastore_query.CompositeFilter to implement the same logic. |
| """ |
| |
| _start_key_value = None |
| _end_key_value = None |
| |
| @datastore_rpc._positional(1) |
| def __init__(self, start=None, start_incl=True, end=None, end_incl=True): |
| """Constructs a range filter using start and end properties. |
| |
| Args: |
| start: A entity_pb.Property to use as a lower bound or None to indicate |
| no lower bound. |
| start_incl: A boolean that indicates if the lower bound is inclusive. |
| end: A entity_pb.Property to use as an upper bound or None to indicate |
| no upper bound. |
| end_incl: A boolean that indicates if the upper bound is inclusive. |
| """ |
| if start is not None and not isinstance(start, entity_pb.Property): |
| raise datastore_errors.BadArgumentError( |
| 'start argument should be entity_pb.Property (%r)' % (start,)) |
| if end is not None and not isinstance(end, entity_pb.Property): |
| raise datastore_errors.BadArgumentError( |
| 'start argument should be entity_pb.Property (%r)' % (end,)) |
| if start and end and start.name() != end.name(): |
| raise datastore_errors.BadArgumentError( |
| 'start and end arguments must be on the same property (%s != %s)' % |
| (start.name(), end.name())) |
| if not start and not end: |
| raise datastore_errors.BadArgumentError( |
| 'Unbounded ranges are not supported.') |
| |
| super(_PropertyRangeFilter, self).__init__() |
| self._start = start |
| self._start_incl = start_incl |
| self._end = end |
| self._end_incl = end_incl |
| |
| @classmethod |
| def from_property_filter(cls, prop_filter): |
| op = prop_filter._filter.op() |
| if op == datastore_pb.Query_Filter.GREATER_THAN: |
| return cls(start=prop_filter._filter.property(0), start_incl=False) |
| elif op == datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL: |
| return cls(start=prop_filter._filter.property(0)) |
| elif op == datastore_pb.Query_Filter.LESS_THAN: |
| return cls(end=prop_filter._filter.property(0), end_incl=False) |
| elif op == datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL: |
| return cls(end=prop_filter._filter.property(0)) |
| else: |
| raise datastore_errors.BadArgumentError( |
| 'Unsupported operator (%s)' % (op,)) |
| |
| def intersect(self, other): |
| """Returns a filter representing the intersection of self and other.""" |
| if isinstance(other, PropertyFilter): |
| other = self.from_property_filter(other) |
| elif not isinstance(other, _PropertyRangeFilter): |
| raise datastore_errors.BadArgumentError( |
| 'other argument should be a _PropertyRangeFilter (%r)' % (other,)) |
| |
| if other._get_prop_name() != self._get_prop_name(): |
| raise datastore_errors.BadArgumentError( |
| 'other argument must be on the same property (%s != %s)' % |
| (other._get_prop_name(), self._get_prop_name())) |
| |
| start_source = None |
| if other._start: |
| if self._start: |
| result = cmp(self._get_start_key_value(), other._get_start_key_value()) |
| if result == 0: |
| result = cmp(other._start_incl, self._start_incl) |
| if result > 0: |
| start_source = self |
| elif result < 0: |
| start_source = other |
| else: |
| start_source = other |
| elif self._start: |
| start_source = self |
| |
| end_source = None |
| if other._end: |
| if self._end: |
| result = cmp(self._get_end_key_value(), other._get_end_key_value()) |
| if result == 0: |
| result = cmp(self._end_incl, other._end_incl) |
| if result < 0: |
| end_source = self |
| elif result > 0: |
| end_source = other |
| else: |
| end_source = other |
| elif self._end: |
| end_source = self |
| |
| if start_source: |
| if end_source in (start_source, None): |
| return start_source |
| |
| result = _PropertyRangeFilter(start=start_source._start, |
| start_incl=start_source._start_incl, |
| end=end_source._end, |
| end_incl=end_source._end_incl) |
| |
| result._start_key_value = start_source._start_key_value |
| result._end_key_value = end_source._end_key_value |
| return result |
| else: |
| return end_source or self |
| |
| def _get_start_key_value(self): |
| if self._start_key_value is None: |
| self._start_key_value = datastore_types.PropertyValueToKeyValue( |
| self._start.value()) |
| return self._start_key_value |
| |
| def _get_end_key_value(self): |
| if self._end_key_value is None: |
| self._end_key_value = datastore_types.PropertyValueToKeyValue( |
| self._end.value()) |
| return self._end_key_value |
| |
| def _apply_to_value(self, value): |
| """Apply the filter to the given value. |
| |
| Args: |
| value: The comparable value to check. |
| |
| Returns: |
| A boolean indicating if the given value matches the filter. |
| """ |
| if self._start: |
| result = cmp(self._get_start_key_value(), value) |
| if result > 0 or (result == 0 and not self._start_incl): |
| return False |
| |
| if self._end: |
| result = cmp(self._get_end_key_value(), value) |
| if result < 0 or (result == 0 and not self._end_incl): |
| return False |
| |
| return True |
| |
| def _get_prop_name(self): |
| if self._start: |
| return self._start.name() |
| if self._end: |
| return self._end.name() |
| assert False |
| |
| def _to_pbs(self): |
| pbs = [] |
| if self._start: |
| if self._start_incl: |
| op = datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL |
| else: |
| op = datastore_pb.Query_Filter.GREATER_THAN |
| pb = datastore_pb.Query_Filter() |
| pb.set_op(op) |
| pb.add_property().CopyFrom(self._start) |
| pbs.append(pb) |
| |
| if self._end: |
| if self._end_incl: |
| op = datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL |
| else: |
| op = datastore_pb.Query_Filter.LESS_THAN |
| pb = datastore_pb.Query_Filter() |
| pb.set_op(op) |
| pb.add_property().CopyFrom(self._end) |
| pbs.append(pb) |
| |
| return pbs |
| |
| def __getstate__(self): |
| raise pickle.PicklingError( |
| 'Pickling of %r is unsupported.' % self) |
| |
| def __eq__(self, other): |
| |
| |
| if self.__class__ is not other.__class__: |
| return NotImplemented |
| return (self._start == other._start and |
| self._end == other._end and |
| (self._start_incl == other._start_incl or self._start is None) and |
| (self._end_incl == other._end_incl or self._end is None)) |
| |
| |
| class _PropertyExistsFilter(FilterPredicate): |
| """A FilterPredicate that matches entities containing specific properties. |
| |
| Only works as an in-memory filter. Used internally to filter out entities |
| that don't have all properties in a given Order. |
| """ |
| |
| def __init__(self, names): |
| super(_PropertyExistsFilter, self).__init__() |
| self._names = frozenset(names) |
| |
| def _apply(self, value_map): |
| for name in self._names: |
| if not value_map.get(name): |
| return False |
| return True |
| |
| def _get_prop_names(self): |
| return self._names |
| |
| def _prune(self, _): |
| |
| raise NotImplementedError |
| |
| def __getstate__(self): |
| raise pickle.PicklingError( |
| 'Pickling of %r is unsupported.' % self) |
| |
| |
| class CorrelationFilter(FilterPredicate): |
| """A filter that isolates correlated values and applies a sub-filter on them. |
| |
| This filter assumes that every property used by the sub-filter should be |
| grouped before being passed to the sub-filter. The default grouping puts |
| each value in its own group. Consider: |
| e = {a: [1, 2], b: [2, 1, 3], c: 4} |
| |
| A correlation filter with a sub-filter that operates on (a, b) will be tested |
| against the following 3 sets of values: |
| {a: 1, b: 2} |
| {a: 2, b: 1} |
| {b: 3} |
| |
| In this case CorrelationFilter('a = 2 AND b = 2') won't match this entity but |
| CorrelationFilter('a = 2 AND b = 1') will. To apply an uncorrelated filter on |
| c, the filter must be applied in parallel to the correlation filter. For |
| example: |
| CompositeFilter(AND, [CorrelationFilter('a = 2 AND b = 1'), 'c = 3']) |
| |
| If 'c = 3' was included in the correlation filter, c would be grouped as well. |
| This would result in the following values: |
| {a: 1, b: 2, c: 3} |
| {a: 2, b: 1} |
| {b: 3} |
| |
| If any set of correlated values match the sub-filter then the entity matches |
| the correlation filter. |
| """ |
| |
| def __init__(self, subfilter): |
| """Constructor. |
| |
| Args: |
| subfilter: A FilterPredicate to apply to the correlated values |
| """ |
| self._subfilter = subfilter |
| |
| @property |
| def subfilter(self): |
| return self._subfilter |
| |
| def __repr__(self): |
| return '%s(%r)' % (self.__class__.__name__, self.subfilter) |
| |
| def _apply(self, value_map): |
| |
| |
| base_map = dict((prop, []) for prop in self._get_prop_names()) |
| |
| |
| value_maps = [] |
| for prop in base_map: |
| |
| grouped = self._group_values(prop, value_map[prop]) |
| |
| while len(value_maps) < len(grouped): |
| value_maps.append(base_map.copy()) |
| |
| for value, map in zip(grouped, value_maps): |
| map[prop] = value |
| |
| return self._apply_correlated(value_maps) |
| |
| def _apply_correlated(self, value_maps): |
| """Applies sub-filter to the correlated value maps. |
| |
| The default implementation matches when any value_map in value_maps |
| matches the sub-filter. |
| |
| Args: |
| value_maps: A list of correlated value_maps. |
| Returns: |
| True if any the entity matches the correlation filter. |
| """ |
| |
| for map in value_maps: |
| if self._subfilter._apply(map): |
| return True |
| return False |
| |
| def _group_values(self, prop, values): |
| """A function that groups the given values. |
| |
| Override this function to introduce custom grouping logic. The default |
| implementation assumes each value belongs in its own group. |
| |
| Args: |
| prop: The name of the property who's values are being grouped. |
| values: A list of opaque values. |
| |
| Returns: |
| A list of lists of grouped values. |
| """ |
| return [[value] for value in values] |
| |
| def _get_prop_names(self): |
| return self._subfilter._get_prop_names() |
| |
| |
| class CompositeFilter(FilterPredicate): |
| """An immutable filter predicate that combines other predicates. |
| |
| This class proactively merges sub-filters that are combined using the same |
| operator. For example: |
| CompositeFilter(AND, [f1, f2, CompositeFilter(AND, [f3, f4]), f5, f6]) |
| is equivalent to: |
| CompositeFilter(AND, [f1, f2, f3, f4, f5, f6]) |
| |
| Currently filters can only be combined using an AND operator. |
| """ |
| |
| AND = 'and' |
| _OPERATORS = frozenset([AND]) |
| |
| def __init__(self, op, filters): |
| """Constructor. |
| |
| Args: |
| op: The operator to use to combine the given filters |
| filters: A list of one or more filters to combine |
| |
| Raises: |
| datastore_errors.BadArgumentError if op is not in CompsiteFilter.OPERATORS |
| or filters is not a non-empty list containing only FilterPredicates. |
| """ |
| if not op in self._OPERATORS: |
| raise datastore_errors.BadArgumentError('unknown operator (%s)' % (op,)) |
| if not filters or not isinstance(filters, (list, tuple)): |
| raise datastore_errors.BadArgumentError( |
| 'filters argument should be a non-empty list (%r)' % (filters,)) |
| |
| super(CompositeFilter, self).__init__() |
| self._op = op |
| flattened = [] |
| |
| |
| for f in filters: |
| if isinstance(f, CompositeFilter) and f._op == self._op: |
| |
| |
| flattened.extend(f._filters) |
| elif isinstance(f, FilterPredicate): |
| flattened.append(f) |
| else: |
| raise datastore_errors.BadArgumentError( |
| 'filters argument must be a list of FilterPredicates, found (%r)' % |
| (f,)) |
| |
| |
| if op == self.AND: |
| filters = flattened |
| flattened = [] |
| ineq_map = {} |
| |
| for f in filters: |
| if (isinstance(f, _PropertyRangeFilter) or |
| (isinstance(f, PropertyFilter) and f._has_inequality())): |
| name = f._get_prop_name() |
| index = ineq_map.get(name) |
| if index is not None: |
| range_filter = flattened[index] |
| flattened[index] = range_filter.intersect(f) |
| else: |
| if isinstance(f, PropertyFilter): |
| range_filter = _PropertyRangeFilter.from_property_filter(f) |
| else: |
| range_filter = f |
| ineq_map[name] = len(flattened) |
| flattened.append(range_filter) |
| else: |
| flattened.append(f) |
| |
| self._filters = tuple(flattened) |
| |
| @property |
| def op(self): |
| return self._op |
| |
| @property |
| def filters(self): |
| return self._filters |
| |
| def __repr__(self): |
| op = self.op |
| if op == self.AND: |
| op = 'AND' |
| else: |
| op = str(op) |
| return '%s(%s, %r)' % (self.__class__.__name__, op, list(self.filters)) |
| |
| def _get_prop_names(self): |
| names = set() |
| for f in self._filters: |
| names |= f._get_prop_names() |
| return names |
| |
| def _apply(self, value_map): |
| if self._op == self.AND: |
| for f in self._filters: |
| if not f._apply(value_map): |
| return False |
| return True |
| raise NotImplementedError |
| |
| def _prune(self, value_map): |
| |
| |
| |
| if self._op == self.AND: |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| matches = collections.defaultdict(set) |
| for f in self._filters: |
| props = f._get_prop_names() |
| local_value_map = dict((k, v) for k, v in value_map.iteritems() |
| if k in props) |
| |
| if not f._prune(local_value_map): |
| return False |
| |
| |
| for (prop, values) in local_value_map.iteritems(): |
| matches[prop].update(values) |
| |
| |
| for prop, value_set in matches.iteritems(): |
| |
| value_map[prop] = sorted(value_set) |
| return True |
| raise NotImplementedError |
| |
| def _to_pbs(self): |
| """Returns the internal only pb representation.""" |
| |
| |
| |
| pbs = [] |
| for f in self._filters: |
| pbs.extend(f._to_pbs()) |
| return pbs |
| |
| def __eq__(self, other): |
| if self.__class__ is other.__class__: |
| return super(CompositeFilter, self).__eq__(other) |
| |
| |
| if len(self._filters) == 1: |
| result = self._filters[0].__eq__(other) |
| if result is NotImplemented and hasattr(other, '__eq__'): |
| return other.__eq__(self._filters[0]) |
| return result |
| return NotImplemented |
| |
| |
| class _IgnoreFilter(_SinglePropertyFilter): |
| """A filter that removes all entities with the given keys.""" |
| |
| def __init__(self, key_value_set): |
| super(_IgnoreFilter, self).__init__() |
| self._keys = key_value_set |
| |
| def _get_prop_name(self): |
| return datastore_types.KEY_SPECIAL_PROPERTY |
| |
| def _apply_to_value(self, value): |
| return value not in self._keys |
| |
| |
| class _DedupingFilter(_IgnoreFilter): |
| """A filter that removes duplicate keys.""" |
| |
| def __init__(self, key_value_set=None): |
| super(_DedupingFilter, self).__init__(key_value_set or set()) |
| |
| def _apply_to_value(self, value): |
| if super(_DedupingFilter, self)._apply_to_value(value): |
| self._keys.add(value) |
| return True |
| return False |
| |
| |
| class Order(_PropertyComponent): |
| """A base class that represents a sort order on a query. |
| |
| All sub-classes must be immutable as these are often stored without creating a |
| defensive copying. |
| |
| This class can be used as either the cmp or key arg in sorted() or |
| list.sort(). To provide a stable ordering a trailing key ascending order is |
| always used. |
| """ |
| |
| @datastore_rpc._positional(1) |
| def reversed(self, group_by=None): |
| """Constructs an order representing the reverse of the current order. |
| |
| This function takes into account the effects of orders on properties not in |
| the group_by clause of a query. For example, consider: |
| SELECT A, First(B) ... GROUP BY A ORDER BY A, B |
| Changing the order of B would effect which value is listed in the 'First(B)' |
| column which would actually change the results instead of just reversing |
| them. |
| |
| Args: |
| group_by: If specified, only orders on properties in group_by will be |
| reversed. |
| |
| Returns: |
| A new order representing the reverse direction. |
| """ |
| raise NotImplementedError |
| |
| def _key(self, lhs_value_map): |
| """Creates a key for the given value map.""" |
| raise NotImplementedError |
| |
| def _cmp(self, lhs_value_map, rhs_value_map): |
| """Compares the given value maps.""" |
| raise NotImplementedError |
| |
| def _to_pb(self): |
| """Internal only function to generate a filter pb.""" |
| raise NotImplementedError |
| |
| def key_for_filter(self, filter_predicate): |
| if filter_predicate: |
| return lambda x: self.key(x, filter_predicate) |
| return self.key |
| |
| def cmp_for_filter(self, filter_predicate): |
| if filter_predicate: |
| return lambda x, y: self.cmp(x, y, filter_predicate) |
| return self.cmp |
| |
| def key(self, entity, filter_predicate=None): |
| """Constructs a "key" value for the given entity based on the current order. |
| |
| This function can be used as the key argument for list.sort() and sorted(). |
| |
| Args: |
| entity: The entity_pb.EntityProto to convert |
| filter_predicate: A FilterPredicate used to prune values before comparing |
| entities or None. |
| |
| Returns: |
| A key value that identifies the position of the entity when sorted by |
| the current order. |
| """ |
| names = self._get_prop_names() |
| names.add(datastore_types.KEY_SPECIAL_PROPERTY) |
| if filter_predicate is not None: |
| names |= filter_predicate._get_prop_names() |
| |
| value_map = _make_key_value_map(entity, names) |
| if filter_predicate is not None: |
| filter_predicate._prune(value_map) |
| return (self._key(value_map), |
| value_map[datastore_types.KEY_SPECIAL_PROPERTY]) |
| |
| def cmp(self, lhs, rhs, filter_predicate=None): |
| """Compares the given values taking into account any filters. |
| |
| This function can be used as the cmp argument for list.sort() and sorted(). |
| |
| This function is slightly more efficient that Order.key when comparing two |
| entities, however it is much less efficient when sorting a list of entities. |
| |
| Args: |
| lhs: An entity_pb.EntityProto |
| rhs: An entity_pb.EntityProto |
| filter_predicate: A FilterPredicate used to prune values before comparing |
| entities or None. |
| |
| Returns: |
| An integer <, = or > 0 representing the operator that goes in between lhs |
| and rhs that to create a true statement. |
| |
| """ |
| names = self._get_prop_names() |
| if filter_predicate is not None: |
| names |= filter_predicate._get_prop_names() |
| |
| lhs_value_map = _make_key_value_map(lhs, names) |
| rhs_value_map = _make_key_value_map(rhs, names) |
| if filter_predicate is not None: |
| filter_predicate._prune(lhs_value_map) |
| filter_predicate._prune(rhs_value_map) |
| result = self._cmp(lhs_value_map, rhs_value_map) |
| if result: |
| return result |
| |
| if not lhs.has_key() and not rhs.has_key(): |
| return 0 |
| |
| |
| |
| lhs_key = (lhs_value_map.get(datastore_types.KEY_SPECIAL_PROPERTY) or |
| datastore_types.ReferenceToKeyValue(lhs.key())) |
| rhs_key = (rhs_value_map.get(datastore_types.KEY_SPECIAL_PROPERTY) or |
| datastore_types.ReferenceToKeyValue(rhs.key())) |
| |
| return cmp(lhs_key, rhs_key) |
| |
| |
| class _ReverseOrder(_BaseComponent): |
| """Reverses the comparison for the given object.""" |
| |
| def __init__(self, obj): |
| """Constructor for _ReverseOrder. |
| |
| Args: |
| obj: Any comparable and hashable object. |
| """ |
| super(_ReverseOrder, self).__init__() |
| self._obj = obj |
| |
| def __hash__(self): |
| return hash(self._obj) |
| |
| def __cmp__(self, other): |
| assert self.__class__ == other.__class__, ( |
| 'A datastore_query._ReverseOrder object can only be compared to ' |
| 'an object of the same type.') |
| return -cmp(self._obj, other._obj) |
| |
| |
| class PropertyOrder(Order): |
| """An immutable class that represents a sort order for a single property.""" |
| |
| ASCENDING = datastore_pb.Query_Order.ASCENDING |
| DESCENDING = datastore_pb.Query_Order.DESCENDING |
| _DIRECTIONS = frozenset([ASCENDING, DESCENDING]) |
| |
| def __init__(self, prop, direction=ASCENDING): |
| """Constructor. |
| |
| Args: |
| prop: the name of the prop by which to sort. |
| direction: the direction in which to sort the given prop. |
| |
| Raises: |
| datastore_errors.BadArgumentError if the prop name or direction is |
| invalid. |
| """ |
| datastore_types.ValidateString(prop, |
| 'prop', |
| datastore_errors.BadArgumentError) |
| if not direction in self._DIRECTIONS: |
| raise datastore_errors.BadArgumentError('unknown direction: %r' % |
| (direction,)) |
| super(PropertyOrder, self).__init__() |
| self.__order = datastore_pb.Query_Order() |
| self.__order.set_property(prop.encode('utf-8')) |
| self.__order.set_direction(direction) |
| |
| @property |
| def prop(self): |
| return self.__order.property() |
| |
| @property |
| def direction(self): |
| return self.__order.direction() |
| |
| def __repr__(self): |
| name = self.prop |
| direction = self.direction |
| extra = '' |
| if direction == self.DESCENDING: |
| extra = ', DESCENDING' |
| name = repr(name).encode('utf-8')[1:-1] |
| return '%s(<%s>%s)' % (self.__class__.__name__, name, extra) |
| |
| @datastore_rpc._positional(1) |
| def reversed(self, group_by=None): |
| if group_by and self.__order.property() not in group_by: |
| return self |
| |
| if self.__order.direction() == self.ASCENDING: |
| return PropertyOrder(self.__order.property().decode('utf-8'), |
| self.DESCENDING) |
| else: |
| return PropertyOrder(self.__order.property().decode('utf-8'), |
| self.ASCENDING) |
| |
| def _get_prop_names(self): |
| return set([self.__order.property()]) |
| |
| def _key(self, lhs_value_map): |
| lhs_values = lhs_value_map[self.__order.property()] |
| if not lhs_values: |
| raise datastore_errors.BadArgumentError( |
| 'Missing value for property (%s)' % self.__order.property()) |
| |
| if self.__order.direction() == self.ASCENDING: |
| return min(lhs_values) |
| else: |
| return _ReverseOrder(max(lhs_values)) |
| |
| def _cmp(self, lhs_value_map, rhs_value_map): |
| lhs_values = lhs_value_map[self.__order.property()] |
| rhs_values = rhs_value_map[self.__order.property()] |
| |
| if not lhs_values and not rhs_values: |
| return 0 |
| |
| if not lhs_values: |
| raise datastore_errors.BadArgumentError( |
| 'LHS missing value for property (%s)' % self.__order.property()) |
| |
| if not rhs_values: |
| raise datastore_errors.BadArgumentError( |
| 'RHS missing value for property (%s)' % self.__order.property()) |
| |
| if self.__order.direction() == self.ASCENDING: |
| return cmp(min(lhs_values), min(rhs_values)) |
| else: |
| return cmp(max(rhs_values), max(lhs_values)) |
| |
| @classmethod |
| def _from_pb(cls, order_pb): |
| |
| self = cls.__new__(cls) |
| self.__order = order_pb |
| return self |
| |
| def _to_pb(self): |
| """Returns the internal only pb representation.""" |
| return self.__order |
| |
| def __getstate__(self): |
| raise pickle.PicklingError( |
| 'Pickling of datastore_query.PropertyOrder is unsupported.') |
| |
| |
| class CompositeOrder(Order): |
| """An immutable class that represents a sequence of Orders. |
| |
| This class proactively flattens sub-orders that are of type CompositeOrder. |
| For example: |
| CompositeOrder([O1, CompositeOrder([02, 03]), O4]) |
| is equivalent to: |
| CompositeOrder([O1, 02, 03, O4]) |
| """ |
| |
| def __init__(self, orders): |
| """Constructor. |
| |
| Args: |
| orders: A list of Orders which are applied in order. |
| """ |
| if not isinstance(orders, (list, tuple)): |
| raise datastore_errors.BadArgumentError( |
| 'orders argument should be list or tuple (%r)' % (orders,)) |
| |
| super(CompositeOrder, self).__init__() |
| flattened = [] |
| for order in orders: |
| if isinstance(order, CompositeOrder): |
| flattened.extend(order._orders) |
| elif isinstance(order, Order): |
| flattened.append(order) |
| else: |
| raise datastore_errors.BadArgumentError( |
| 'orders argument should only contain Order (%r)' % (order,)) |
| self._orders = tuple(flattened) |
| |
| @property |
| def orders(self): |
| return self._orders |
| |
| def __repr__(self): |
| return '%s(%r)' % (self.__class__.__name__, list(self.orders)) |
| |
| @datastore_rpc._positional(1) |
| def reversed(self, group_by=None): |
| return CompositeOrder([order.reversed(group_by=group_by) |
| for order in self._orders]) |
| |
| def _get_prop_names(self): |
| names = set() |
| for order in self._orders: |
| names |= order._get_prop_names() |
| return names |
| |
| def _key(self, lhs_value_map): |
| result = [] |
| for order in self._orders: |
| result.append(order._key(lhs_value_map)) |
| return tuple(result) |
| |
| def _cmp(self, lhs_value_map, rhs_value_map): |
| for order in self._orders: |
| result = order._cmp(lhs_value_map, rhs_value_map) |
| if result != 0: |
| return result |
| return 0 |
| |
| def size(self): |
| """Returns the number of sub-orders the instance contains.""" |
| return len(self._orders) |
| |
| def _to_pbs(self): |
| """Returns an ordered list of internal only pb representations.""" |
| return [order._to_pb() for order in self._orders] |
| |
| def __eq__(self, other): |
| if self.__class__ is other.__class__: |
| return super(CompositeOrder, self).__eq__(other) |
| |
| |
| if len(self._orders) == 1: |
| result = self._orders[0].__eq__(other) |
| if result is NotImplemented and hasattr(other, '__eq__'): |
| return other.__eq__(self._orders[0]) |
| return result |
| |
| return NotImplemented |
| |
| |
| class FetchOptions(datastore_rpc.Configuration): |
| """An immutable class that contains all options for fetching results. |
| |
| These options apply to any request that pulls results from a query. |
| |
| This class reserves the right to define configuration options of any name |
| except those that start with 'user_'. External subclasses should only define |
| function or variables with names that start with in 'user_'. |
| |
| Options are set by passing keyword arguments to the constructor corresponding |
| to the configuration options defined below and in datastore_rpc.Configuration. |
| |
| This object can be used as the default config for a datastore_rpc.Connection |
| but in that case some options will be ignored, see option documentation below |
| for details. |
| """ |
| |
| @datastore_rpc.ConfigOption |
| def produce_cursors(value): |
| """If a Cursor should be returned with the fetched results. |
| |
| Raises: |
| datastore_errors.BadArgumentError if value is not a bool. |
| """ |
| if not isinstance(value, bool): |
| raise datastore_errors.BadArgumentError( |
| 'produce_cursors argument should be bool (%r)' % (value,)) |
| return value |
| |
| @datastore_rpc.ConfigOption |
| def offset(value): |
| """The number of results to skip before returning the first result. |
| |
| Only applies to the first request it is used with and is ignored if present |
| on datastore_rpc.Connection.config. |
| |
| Raises: |
| datastore_errors.BadArgumentError if value is not a integer or is less |
| than zero. |
| """ |
| datastore_types.ValidateInteger(value, |
| 'offset', |
| datastore_errors.BadArgumentError, |
| zero_ok=True) |
| return value |
| |
| @datastore_rpc.ConfigOption |
| def batch_size(value): |
| """The number of results to attempt to retrieve in a batch. |
| |
| Raises: |
| datastore_errors.BadArgumentError if value is not a integer or is not |
| greater than zero. |
| """ |
| datastore_types.ValidateInteger(value, |
| 'batch_size', |
| datastore_errors.BadArgumentError) |
| return value |
| |
| |
| class QueryOptions(FetchOptions): |
| """An immutable class that contains all options for running a query. |
| |
| This class contains options that control execution process (deadline, |
| batch_size, read_policy, etc) and what part of the query results are returned |
| (keys_only, projection, offset, limit, etc) Options that control the contents |
| of the query results are specified on the datastore_query.Query directly. |
| |
| This class reserves the right to define configuration options of any name |
| except those that start with 'user_'. External subclasses should only define |
| function or variables with names that start with in 'user_'. |
| |
| Options are set by passing keyword arguments to the constructor corresponding |
| to the configuration options defined below and in FetchOptions and |
| datastore_rpc.Configuration. |
| |
| This object can be used as the default config for a datastore_rpc.Connection |
| but in that case some options will be ignored, see below for details. |
| """ |
| |
| |
| ORDER_FIRST = datastore_pb.Query.ORDER_FIRST |
| ANCESTOR_FIRST = datastore_pb.Query.ANCESTOR_FIRST |
| FILTER_FIRST = datastore_pb.Query.FILTER_FIRST |
| _HINTS = frozenset([ORDER_FIRST, ANCESTOR_FIRST, FILTER_FIRST]) |
| |
| @datastore_rpc.ConfigOption |
| def keys_only(value): |
| """If the query should only return keys. |
| |
| Raises: |
| datastore_errors.BadArgumentError if value is not a bool. |
| """ |
| if not isinstance(value, bool): |
| raise datastore_errors.BadArgumentError( |
| 'keys_only argument should be bool (%r)' % (value,)) |
| return value |
| |
| @datastore_rpc.ConfigOption |
| def projection(value): |
| """A list or tuple of property names to project. |
| |
| If None, the entire entity is returned. |
| |
| Specifying a projection: |
| - may change the index requirements for the given query; |
| - will cause a partial entity to be returned; |
| - will cause only entities that contain those properties to be returned; |
| |
| A partial entities only contain the property name and value for properties |
| in the projection (meaning and multiple will not be set). They will also |
| only contain a single value for any multi-valued property. However, if a |
| multi-valued property is specified in the order, an inequality property, or |
| the projected properties, the entity will be returned multiple times. Once |
| for each unique combination of values. |
| |
| However, projection queries are significantly faster than normal queries. |
| |
| Raises: |
| datastore_errors.BadArgumentError if value is empty or not a list or tuple |
| of strings. |
| """ |
| if isinstance(value, list): |
| value = tuple(value) |
| elif not isinstance(value, tuple): |
| raise datastore_errors.BadArgumentError( |
| 'projection argument should be a list or tuple (%r)' % (value,)) |
| if not value: |
| raise datastore_errors.BadArgumentError( |
| 'projection argument cannot be empty') |
| for prop in value: |
| if not isinstance(prop, basestring): |
| raise datastore_errors.BadArgumentError( |
| 'projection argument should contain only strings (%r)' % (prop,)) |
| |
| return value |
| |
| @datastore_rpc.ConfigOption |
| def limit(value): |
| """Limit on the number of results to return. |
| |
| Raises: |
| datastore_errors.BadArgumentError if value is not an integer or is less |
| than zero. |
| """ |
| datastore_types.ValidateInteger(value, |
| 'limit', |
| datastore_errors.BadArgumentError, |
| zero_ok=True) |
| return value |
| |
| @datastore_rpc.ConfigOption |
| def prefetch_size(value): |
| """Number of results to attempt to return on the initial request. |
| |
| Raises: |
| datastore_errors.BadArgumentError if value is not an integer or is not |
| greater than zero. |
| """ |
| datastore_types.ValidateInteger(value, |
| 'prefetch_size', |
| datastore_errors.BadArgumentError, |
| zero_ok=True) |
| return value |
| |
| @datastore_rpc.ConfigOption |
| def start_cursor(value): |
| """Cursor to use a start position. |
| |
| Ignored if present on datastore_rpc.Connection.config. |
| |
| Raises: |
| datastore_errors.BadArgumentError if value is not a Cursor. |
| """ |
| if not isinstance(value, Cursor): |
| raise datastore_errors.BadArgumentError( |
| 'start_cursor argument should be datastore_query.Cursor (%r)' % |
| (value,)) |
| return value |
| |
| @datastore_rpc.ConfigOption |
| def end_cursor(value): |
| """Cursor to use as an end position. |
| |
| Ignored if present on datastore_rpc.Connection.config. |
| |
| Raises: |
| datastore_errors.BadArgumentError if value is not a Cursor. |
| """ |
| if not isinstance(value, Cursor): |
| raise datastore_errors.BadArgumentError( |
| 'end_cursor argument should be datastore_query.Cursor (%r)' % |
| (value,)) |
| return value |
| |
| @datastore_rpc.ConfigOption |
| def hint(value): |
| """Hint on how the datastore should plan the query. |
| |
| Raises: |
| datastore_errors.BadArgumentError if value is not a known hint. |
| """ |
| if value not in QueryOptions._HINTS: |
| raise datastore_errors.BadArgumentError('Unknown query hint (%r)' % |
| (value,)) |
| return value |
| |
| |
| class Cursor(_BaseComponent): |
| """An immutable class that represents a relative position in a query. |
| |
| The position denoted by a Cursor is relative to a result in a query even |
| if the result has been removed from the given query. Usually to position |
| immediately after the last result returned by a batch. |
| |
| A cursor should only be used on a query with an identical signature to the |
| one that produced it. |
| """ |
| |
| @datastore_rpc._positional(1) |
| def __init__(self, _cursor_pb=None, urlsafe=None): |
| """Constructor. |
| |
| A Cursor constructed with no arguments points the first result of any |
| query. If such a Cursor is used as an end_cursor no results will ever be |
| returned. |
| """ |
| |
| |
| super(Cursor, self).__init__() |
| if urlsafe is not None: |
| if _cursor_pb is not None: |
| raise datastore_errors.BadArgumentError( |
| 'Do not use _cursor_pb and urlsafe together') |
| _cursor_pb = self._bytes_to_cursor_pb(self._urlsafe_to_bytes(urlsafe)) |
| if _cursor_pb is not None: |
| if not isinstance(_cursor_pb, datastore_pb.CompiledCursor): |
| raise datastore_errors.BadArgumentError( |
| '_cursor_pb argument should be datastore_pb.CompiledCursor (%r)' % |
| (_cursor_pb,)) |
| self.__compiled_cursor = _cursor_pb |
| else: |
| self.__compiled_cursor = datastore_pb.CompiledCursor() |
| |
| def __repr__(self): |
| arg = self.to_websafe_string() |
| if arg: |
| arg = '<%s>' % arg |
| return '%s(%s)' % (self.__class__.__name__, arg) |
| |
| def reversed(self): |
| """Creates a cursor for use in a query with a reversed sort order.""" |
| for pos in self.__compiled_cursor.position_list(): |
| if pos.has_start_key(): |
| raise datastore_errors.BadRequestError('Cursor cannot be reversed.') |
| |
| rev_pb = datastore_pb.CompiledCursor() |
| rev_pb.CopyFrom(self.__compiled_cursor) |
| for pos in rev_pb.position_list(): |
| pos.set_start_inclusive(not pos.start_inclusive()) |
| return Cursor(_cursor_pb=rev_pb) |
| |
| def to_bytes(self): |
| """Serialize cursor as a byte string.""" |
| return self.__compiled_cursor.Encode() |
| |
| @staticmethod |
| def from_bytes(cursor): |
| """Gets a Cursor given its byte string serialized form. |
| |
| The serialized form of a cursor may change in a non-backwards compatible |
| way. In this case cursors must be regenerated from a new Query request. |
| |
| Args: |
| cursor: A serialized cursor as returned by .to_bytes. |
| |
| Returns: |
| A Cursor. |
| |
| Raises: |
| datastore_errors.BadValueError if the cursor argument does not represent a |
| serialized cursor. |
| """ |
| cursor_pb = Cursor._bytes_to_cursor_pb(cursor) |
| return Cursor(_cursor_pb=cursor_pb) |
| |
| @staticmethod |
| def _bytes_to_cursor_pb(cursor): |
| |
| try: |
| cursor_pb = datastore_pb.CompiledCursor(cursor) |
| except (ValueError, TypeError), e: |
| raise datastore_errors.BadValueError( |
| 'Invalid cursor (%r). Details: %s' % (cursor, e)) |
| except Exception, e: |
| |
| |
| |
| |
| |
| |
| if e.__class__.__name__ == 'ProtocolBufferDecodeError': |
| raise datastore_errors.BadValueError( |
| 'Invalid cursor %s. Details: %s' % (cursor, e)) |
| else: |
| raise |
| return cursor_pb |
| |
| def urlsafe(self): |
| """Serialize cursor as a websafe string. |
| |
| Returns: |
| A base64-encoded serialized cursor. |
| """ |
| return base64.urlsafe_b64encode(self.to_bytes()) |
| to_websafe_string = urlsafe |
| |
| @staticmethod |
| def from_websafe_string(cursor): |
| """Gets a Cursor given its websafe serialized form. |
| |
| The serialized form of a cursor may change in a non-backwards compatible |
| way. In this case cursors must be regenerated from a new Query request. |
| |
| Args: |
| cursor: A serialized cursor as returned by .to_websafe_string. |
| |
| Returns: |
| A Cursor. |
| |
| Raises: |
| datastore_errors.BadValueError if the cursor argument is not a string |
| type of does not represent a serialized cursor. |
| """ |
| decoded_bytes = Cursor._urlsafe_to_bytes(cursor) |
| return Cursor.from_bytes(decoded_bytes) |
| |
| @staticmethod |
| def _urlsafe_to_bytes(cursor): |
| |
| if not isinstance(cursor, basestring): |
| raise datastore_errors.BadValueError( |
| 'cursor argument should be str or unicode (%r)' % (cursor,)) |
| |
| try: |
| |
| |
| decoded_bytes = base64.b64decode(str(cursor).replace('-', '+').replace('_', '/')) |
| except (ValueError, TypeError), e: |
| raise datastore_errors.BadValueError( |
| 'Invalid cursor %s. Details: %s' % (cursor, e)) |
| return decoded_bytes |
| |
| @staticmethod |
| def _from_query_result(query_result): |
| if query_result.has_compiled_cursor(): |
| return Cursor(_cursor_pb=query_result.compiled_cursor()) |
| return None |
| |
| def advance(self, offset, query, conn): |
| """Advances a Cursor by the given offset. |
| |
| Args: |
| offset: The amount to advance the current query. |
| query: A Query identical to the one this cursor was created from. |
| conn: The datastore_rpc.Connection to use. |
| |
| Returns: |
| A new cursor that is advanced by offset using the given query. |
| """ |
| datastore_types.ValidateInteger(offset, |
| 'offset', |
| datastore_errors.BadArgumentError) |
| if not isinstance(query, Query): |
| raise datastore_errors.BadArgumentError( |
| 'query argument should be datastore_query.Query (%r)' % (query,)) |
| |
| query_options = QueryOptions( |
| start_cursor=self, offset=offset, limit=0, produce_cursors=True) |
| return query.run(conn, query_options).next_batch(0).cursor(0) |
| |
| def _to_pb(self): |
| """Returns the internal only pb representation.""" |
| return self.__compiled_cursor |
| |
| |
| class _QueryKeyFilter(_BaseComponent): |
| """A class that implements the key filters available on a Query.""" |
| |
| @datastore_rpc._positional(1) |
| def __init__(self, app=None, namespace=None, kind=None, ancestor=None): |
| """Constructs a _QueryKeyFilter. |
| |
| If app/namespace and ancestor are not defined, the app/namespace set in the |
| environment is used. |
| |
| Args: |
| app: a string representing the required app id or None. |
| namespace: a string representing the required namespace or None. |
| kind: a string representing the required kind or None. |
| ancestor: a entity_pb.Reference representing the required ancestor or |
| None. |
| |
| Raises: |
| datastore_erros.BadArgumentError if app and ancestor.app() do not match or |
| an unexpected type is passed in for any argument. |
| """ |
| if kind is not None: |
| datastore_types.ValidateString( |
| kind, 'kind', datastore_errors.BadArgumentError) |
| |
| if ancestor is not None: |
| if not isinstance(ancestor, entity_pb.Reference): |
| raise datastore_errors.BadArgumentError( |
| 'ancestor argument should be entity_pb.Reference (%r)' % |
| (ancestor,)) |
| if app is None: |
| app = ancestor.app() |
| elif app != ancestor.app(): |
| raise datastore_errors.BadArgumentError( |
| 'ancestor argument should match app ("%r" != "%r")' % |
| (ancestor.app(), app)) |
| |
| if namespace is None: |
| namespace = ancestor.name_space() |
| elif namespace != ancestor.name_space(): |
| raise datastore_errors.BadArgumentError( |
| 'ancestor argument should match namespace ("%r" != "%r")' % |
| (ancestor.name_space(), namespace)) |
| |
| pb = entity_pb.Reference() |
| pb.CopyFrom(ancestor) |
| ancestor = pb |
| self.__ancestor = ancestor |
| self.__path = ancestor.path().element_list() |
| else: |
| self.__ancestor = None |
| self.__path = None |
| |
| super(_QueryKeyFilter, self).__init__() |
| self.__app = datastore_types.ResolveAppId(app).encode('utf-8') |
| self.__namespace = ( |
| datastore_types.ResolveNamespace(namespace).encode('utf-8')) |
| self.__kind = kind and kind.encode('utf-8') |
| |
| @property |
| def app(self): |
| return self.__app |
| |
| @property |
| def namespace(self): |
| return self.__namespace |
| |
| @property |
| def kind(self): |
| return self.__kind |
| |
| @property |
| def ancestor(self): |
| |
| return self.__ancestor |
| |
| def __call__(self, entity_or_reference): |
| """Apply the filter. |
| |
| Accepts either an entity or a reference to avoid the need to extract keys |
| from entities when we have a list of entities (which is a common case). |
| |
| Args: |
| entity_or_reference: Either an entity_pb.EntityProto or |
| entity_pb.Reference. |
| """ |
| if isinstance(entity_or_reference, entity_pb.Reference): |
| key = entity_or_reference |
| elif isinstance(entity_or_reference, entity_pb.EntityProto): |
| key = entity_or_reference.key() |
| else: |
| raise datastore_errors.BadArgumentError( |
| 'entity_or_reference argument must be an entity_pb.EntityProto ' + |
| 'or entity_pb.Reference (%r)' % (entity_or_reference)) |
| return (key.app() == self.__app and |
| key.name_space() == self.__namespace and |
| (not self.__kind or |
| key.path().element_list()[-1].type() == self.__kind) and |
| (not self.__path or |
| key.path().element_list()[0:len(self.__path)] == self.__path)) |
| |
| def _to_pb(self): |
| pb = datastore_pb.Query() |
| |
| pb.set_app(self.__app) |
| datastore_types.SetNamespace(pb, self.__namespace) |
| if self.__kind is not None: |
| pb.set_kind(self.__kind) |
| if self.__ancestor: |
| ancestor = pb.mutable_ancestor() |
| ancestor.CopyFrom(self.__ancestor) |
| |
| return pb |
| |
| |
| class _BaseQuery(_BaseComponent): |
| """A base class for query implementations.""" |
| |
| def run(self, conn, query_options=None): |
| """Runs the query using provided datastore_rpc.Connection. |
| |
| Args: |
| conn: The datastore_rpc.Connection to use |
| query_options: Optional query options to use |
| |
| Returns: |
| A Batcher that implicitly fetches query results asynchronously. |
| |
| Raises: |
| datastore_errors.BadArgumentError if any of the arguments are invalid. |
| """ |
| return Batcher(query_options, self.run_async(conn, query_options)) |
| |
| def run_async(self, conn, query_options=None): |
| """Runs the query using the provided datastore_rpc.Connection. |
| |
| Args: |
| conn: the datastore_rpc.Connection on which to run the query. |
| query_options: Optional QueryOptions with which to run the query. |
| |
| Returns: |
| An async object that can be used to grab the first Batch. Additional |
| batches can be retrieved by calling Batch.next_batch/next_batch_async. |
| |
| Raises: |
| datastore_errors.BadArgumentError if any of the arguments are invalid. |
| """ |
| raise NotImplementedError |
| |
| def __getstate__(self): |
| raise pickle.PicklingError( |
| 'Pickling of %r is unsupported.' % self) |
| |
| |
| class Query(_BaseQuery): |
| """An immutable class that represents a query signature. |
| |
| A query signature consists of a source of entities (specified as app, |
| namespace and optionally kind and ancestor) as well as a FilterPredicate, |
| grouping and a desired ordering. |
| """ |
| |
| @datastore_rpc._positional(1) |
| def __init__(self, app=None, namespace=None, kind=None, ancestor=None, |
| filter_predicate=None, group_by=None, order=None): |
| """Constructor. |
| |
| Args: |
| app: Optional app to query, derived from the environment if not specified. |
| namespace: Optional namespace to query, derived from the environment if |
| not specified. |
| kind: Optional kind to query. |
| ancestor: Optional ancestor to query, an entity_pb.Reference. |
| filter_predicate: Optional FilterPredicate by which to restrict the query. |
| order: Optional Order in which to return results. |
| group_by: Optional list of properties to group the results by. |
| |
| Raises: |
| datastore_errors.BadArgumentError if any argument is invalid. |
| """ |
| super(Query, self).__init__() |
| |
| |
| if filter_predicate is not None and not isinstance(filter_predicate, |
| FilterPredicate): |
| raise datastore_errors.BadArgumentError( |
| 'filter_predicate should be datastore_query.FilterPredicate (%r)' % |
| (ancestor,)) |
| |
| |
| if isinstance(order, CompositeOrder): |
| if order.size() == 0: |
| order = None |
| elif isinstance(order, Order): |
| order = CompositeOrder([order]) |
| elif order is not None: |
| raise datastore_errors.BadArgumentError( |
| 'order should be Order (%r)' % (order,)) |
| |
| |
| if group_by is not None: |
| if isinstance(group_by, list): |
| group_by = tuple(group_by) |
| elif not isinstance(group_by, tuple): |
| raise datastore_errors.BadArgumentError( |
| 'group_by argument should be a list or tuple (%r)' % (group_by,)) |
| if not group_by: |
| raise datastore_errors.BadArgumentError( |
| 'group_by argument cannot be empty') |
| for prop in group_by: |
| if not isinstance(prop, basestring): |
| raise datastore_errors.BadArgumentError( |
| 'group_by argument should contain only strings (%r)' % (prop,)) |
| |
| self._key_filter = _QueryKeyFilter(app=app, namespace=namespace, kind=kind, |
| ancestor=ancestor) |
| self._order = order |
| self._filter_predicate = filter_predicate |
| self._group_by = group_by |
| |
| @property |
| def app(self): |
| return self._key_filter.app |
| |
| @property |
| def namespace(self): |
| return self._key_filter.namespace |
| |
| @property |
| def kind(self): |
| return self._key_filter.kind |
| |
| @property |
| def ancestor(self): |
| return self._key_filter.ancestor |
| |
| @property |
| def filter_predicate(self): |
| return self._filter_predicate |
| |
| @property |
| def order(self): |
| return self._order |
| |
| @property |
| def group_by(self): |
| return self._group_by |
| |
| def __repr__(self): |
| args = [] |
| args.append('app=%r' % self.app) |
| ns = self.namespace |
| if ns: |
| args.append('namespace=%r' % ns) |
| kind = self.kind |
| if kind is not None: |
| args.append('kind=%r' % kind) |
| ancestor = self.ancestor |
| if ancestor is not None: |
| websafe = base64.urlsafe_b64encode(ancestor.Encode()) |
| args.append('ancestor=<%s>' % websafe) |
| filter_predicate = self.filter_predicate |
| if filter_predicate is not None: |
| args.append('filter_predicate=%r' % filter_predicate) |
| order = self.order |
| if order is not None: |
| args.append('order=%r' % order) |
| group_by = self.group_by |
| if group_by is not None: |
| args.append('group_by=%r' % (group_by,)) |
| return '%s(%s)' % (self.__class__.__name__, ', '.join(args)) |
| |
| def run_async(self, conn, query_options=None): |
| if not isinstance(conn, datastore_rpc.BaseConnection): |
| raise datastore_errors.BadArgumentError( |
| 'conn should be a datastore_rpc.BaseConnection (%r)' % (conn,)) |
| |
| if not QueryOptions.is_configuration(query_options): |
| |
| |
| query_options = QueryOptions(config=query_options) |
| |
| start_cursor = query_options.start_cursor |
| if not start_cursor and query_options.produce_cursors: |
| start_cursor = Cursor() |
| |
| req = self._to_pb(conn, query_options) |
| return Batch.create_async(self, query_options, conn, req, |
| start_cursor=start_cursor) |
| |
| @classmethod |
| def _from_pb(cls, query_pb): |
| kind = query_pb.has_kind() and query_pb.kind().decode('utf-8') or None |
| ancestor = query_pb.has_ancestor() and query_pb.ancestor() or None |
| |
| filter_predicate = None |
| if query_pb.filter_size() > 0: |
| filter_predicate = CompositeFilter( |
| CompositeFilter.AND, |
| [PropertyFilter._from_pb(filter_pb) |
| for filter_pb in query_pb.filter_list()]) |
| |
| order = None |
| if query_pb.order_size() > 0: |
| order = CompositeOrder([PropertyOrder._from_pb(order_pb) |
| for order_pb in query_pb.order_list()]) |
| |
| group_by = None |
| if query_pb.group_by_property_name_size() > 0: |
| group_by = tuple(name.decode('utf-8') |
| for name in query_pb.group_by_property_name_list()) |
| |
| return Query(app=query_pb.app().decode('utf-8'), |
| namespace=query_pb.name_space().decode('utf-8'), |
| kind=kind, |
| ancestor=ancestor, |
| filter_predicate=filter_predicate, |
| order=order, |
| group_by=group_by) |
| |
| def _to_pb(self, conn, query_options): |
| """Returns the internal only pb representation.""" |
| pb = self._key_filter._to_pb() |
| |
| |
| if self._filter_predicate: |
| for f in self._filter_predicate._to_pbs(): |
| pb.add_filter().CopyFrom(f) |
| |
| |
| if self._order: |
| for order in self._order._to_pbs(): |
| pb.add_order().CopyFrom(order) |
| |
| |
| if self._group_by: |
| pb.group_by_property_name_list().extend(self._group_by) |
| |
| |
| if QueryOptions.keys_only(query_options, conn.config): |
| pb.set_keys_only(True) |
| |
| projection = QueryOptions.projection(query_options, conn.config) |
| if projection: |
| if self._group_by: |
| extra = set(projection) - set(self._group_by) |
| if extra: |
| raise datastore_errors.BadRequestError( |
| 'projections includes properties not in the group_by argument: %s' |
| % extra) |
| pb.property_name_list().extend(projection) |
| elif self._group_by: |
| raise datastore_errors.BadRequestError( |
| 'cannot specify group_by without a projection') |
| |
| if QueryOptions.produce_cursors(query_options, conn.config): |
| pb.set_compile(True) |
| |
| limit = QueryOptions.limit(query_options, conn.config) |
| if limit is not None: |
| pb.set_limit(limit) |
| |
| count = QueryOptions.prefetch_size(query_options, conn.config) |
| if count is None: |
| count = QueryOptions.batch_size(query_options, conn.config) |
| if count is not None: |
| pb.set_count(count) |
| |
| |
| if query_options.offset: |
| pb.set_offset(query_options.offset) |
| |
| |
| if query_options.start_cursor is not None: |
| pb.mutable_compiled_cursor().CopyFrom(query_options.start_cursor._to_pb()) |
| |
| |
| if query_options.end_cursor is not None: |
| pb.mutable_end_compiled_cursor().CopyFrom( |
| query_options.end_cursor._to_pb()) |
| |
| |
| if ((query_options.hint == QueryOptions.ORDER_FIRST and pb.order_size()) or |
| (query_options.hint == QueryOptions.ANCESTOR_FIRST and |
| pb.has_ancestor()) or |
| (query_options.hint == QueryOptions.FILTER_FIRST and |
| pb.filter_size() > 0)): |
| pb.set_hint(query_options.hint) |
| |
| |
| conn._set_request_read_policy(pb, query_options) |
| conn._set_request_transaction(pb) |
| |
| return pb |
| |
| |
| def apply_query(query, entities): |
| """Performs the given query on a set of in-memory entities. |
| |
| This function can perform queries impossible in the datastore (e.g a query |
| with multiple inequality filters on different properties) because all |
| operations are done in memory. For queries that can also be executed on the |
| the datastore, the results produced by this function may not use the same |
| implicit ordering as the datastore. To ensure compatibility, explicit |
| ordering must be used (e.g. 'ORDER BY ineq_prop, ..., __key__'). |
| |
| Order by __key__ should always be used when a consistent result is desired |
| (unless there is a sort order on another globally unique property). |
| |
| Args: |
| query: a datastore_query.Query to apply |
| entities: a list of entity_pb.EntityProto on which to apply the query. |
| |
| Returns: |
| A list of entity_pb.EntityProto contain the results of the query. |
| """ |
| if not isinstance(query, Query): |
| raise datastore_errors.BadArgumentError( |
| "query argument must be a datastore_query.Query (%r)" % (query,)) |
| |
| if not isinstance(entities, list): |
| raise datastore_errors.BadArgumentError( |
| "entities argument must be a list (%r)" % (entities,)) |
| |
| filtered_entities = filter(query._key_filter, entities) |
| |
| if not query._order: |
| |
| |
| |
| |
| if query._filter_predicate: |
| return filter(query._filter_predicate, filtered_entities) |
| return filtered_entities |
| |
| |
| |
| |
| |
| names = query._order._get_prop_names() |
| if query._filter_predicate: |
| names |= query._filter_predicate._get_prop_names() |
| |
| |
| exists_filter = _PropertyExistsFilter(names) |
| |
| value_maps = [] |
| for entity in filtered_entities: |
| value_map = _make_key_value_map(entity, names) |
| |
| |
| |
| if exists_filter._apply(value_map) and ( |
| not query._filter_predicate or |
| query._filter_predicate._prune(value_map)): |
| value_map['__entity__'] = entity |
| value_maps.append(value_map) |
| |
| value_maps.sort(query._order._cmp) |
| return [value_map['__entity__'] for value_map in value_maps] |
| |
| |
| class _AugmentedQuery(_BaseQuery): |
| """A query that combines a datastore query with in-memory filters/results.""" |
| |
| @datastore_rpc._positional(2) |
| def __init__(self, query, in_memory_results=None, in_memory_filter=None, |
| max_filtered_count=None): |
| """Constructor for _AugmentedQuery. |
| |
| Do not call directly. Use the utility functions instead (e.g. |
| datastore_query.inject_results) |
| |
| Args: |
| query: A datastore_query.Query object to augment. |
| in_memory_results: a list of pre- sorted and filtered result to add to the |
| stream of datastore results or None . |
| in_memory_filter: a set of in-memory filters to apply to the datastore |
| results or None. |
| max_filtered_count: the maximum number of datastore entities that will be |
| filtered out by in_memory_filter if known. |
| """ |
| if not isinstance(query, Query): |
| raise datastore_errors.BadArgumentError( |
| 'query argument should be datastore_query.Query (%r)' % (query,)) |
| if (in_memory_filter is not None and |
| not isinstance(in_memory_filter, FilterPredicate)): |
| raise datastore_errors.BadArgumentError( |
| 'in_memory_filter argument should be ' + |
| 'datastore_query.FilterPredicate (%r)' % (in_memory_filter,)) |
| if (in_memory_results is not None and |
| not isinstance(in_memory_results, list)): |
| raise datastore_errors.BadArgumentError( |
| 'in_memory_results argument should be a list of' + |
| 'datastore_pv.EntityProto (%r)' % (in_memory_results,)) |
| datastore_types.ValidateInteger(max_filtered_count, |
| 'max_filtered_count', |
| empty_ok=True, |
| zero_ok=True) |
| self._query = query |
| self._max_filtered_count = max_filtered_count |
| self._in_memory_filter = in_memory_filter |
| self._in_memory_results = in_memory_results |
| |
| def run_async(self, conn, query_options=None): |
| if not isinstance(conn, datastore_rpc.BaseConnection): |
| raise datastore_errors.BadArgumentError( |
| 'conn should be a datastore_rpc.BaseConnection (%r)' % (conn,)) |
| |
| if not QueryOptions.is_configuration(query_options): |
| |
| |
| query_options = QueryOptions(config=query_options) |
| |
| if self._query._order: |
| |
| |
| changes = {'keys_only': False} |
| else: |
| changes = {} |
| |
| if self._in_memory_filter or self._in_memory_results: |
| |
| |
| |
| in_memory_offset = query_options.offset |
| in_memory_limit = query_options.limit |
| |
| if in_memory_limit is not None: |
| if self._in_memory_filter is None: |
| |
| changes['limit'] = in_memory_limit |
| elif self._max_filtered_count is not None: |
| |
| |
| changes['limit'] = in_memory_limit + self._max_filtered_count |
| else: |
| |
| changes['limit'] = None |
| |
| if in_memory_offset: |
| |
| changes['offset'] = None |
| if changes.get('limit', None) is not None: |
| changes['limit'] += in_memory_offset |
| else: |
| in_memory_offset = None |
| else: |
| in_memory_offset = None |
| in_memory_limit = None |
| |
| req = self._query._to_pb( |
| conn, QueryOptions(config=query_options, **changes)) |
| |
| start_cursor = query_options.start_cursor |
| if not start_cursor and query_options.produce_cursors: |
| start_cursor = Cursor() |
| |
| return _AugmentedBatch.create_async(self, query_options, conn, req, |
| in_memory_offset=in_memory_offset, |
| in_memory_limit=in_memory_limit, |
| start_cursor=start_cursor) |
| |
| |
| @datastore_rpc._positional(1) |
| def inject_results(query, updated_entities=None, deleted_keys=None): |
| """Creates a query object that will inject changes into results. |
| |
| Args: |
| query: The datastore_query.Query to augment |
| updated_entities: A list of entity_pb.EntityProto's that have been updated |
| and should take priority over any values returned by query. |
| deleted_keys: A list of entity_pb.Reference's for entities that have been |
| deleted and should be removed from query results. |
| |
| Returns: |
| A datastore_query.AugmentedQuery if in memory filtering is requred, |
| query otherwise. |
| """ |
| if not isinstance(query, Query): |
| raise datastore_errors.BadArgumentError( |
| 'query argument should be datastore_query.Query (%r)' % (query,)) |
| |
| overriden_keys = set() |
| |
| if deleted_keys is not None: |
| if not isinstance(deleted_keys, list): |
| raise datastore_errors.BadArgumentError( |
| 'deleted_keys argument must be a list (%r)' % (deleted_keys,)) |
| deleted_keys = filter(query._key_filter, deleted_keys) |
| for key in deleted_keys: |
| overriden_keys.add(datastore_types.ReferenceToKeyValue(key)) |
| |
| if updated_entities is not None: |
| if not isinstance(updated_entities, list): |
| raise datastore_errors.BadArgumentError( |
| 'updated_entities argument must be a list (%r)' % (updated_entities,)) |
| |
| |
| updated_entities = filter(query._key_filter, updated_entities) |
| for entity in updated_entities: |
| overriden_keys.add(datastore_types.ReferenceToKeyValue(entity.key())) |
| |
| updated_entities = apply_query(query, updated_entities) |
| else: |
| updated_entities = [] |
| |
| if not overriden_keys: |
| return query |
| |
| return _AugmentedQuery(query, |
| in_memory_filter=_IgnoreFilter(overriden_keys), |
| in_memory_results=updated_entities, |
| max_filtered_count=len(overriden_keys)) |
| |
| |
| class _BatchShared(object): |
| """Data shared among the batches of a query.""" |
| |
| def __init__(self, query, query_options, conn, augmented_query=None): |
| self.__query = query |
| self.__query_options = query_options |
| self.__conn = conn |
| self.__augmented_query = augmented_query |
| self.__was_first_result_processed = False |
| |
| @property |
| def query(self): |
| return self.__query |
| |
| @property |
| def query_options(self): |
| return self.__query_options |
| |
| @property |
| def conn(self): |
| return self.__conn |
| |
| @property |
| def augmented_query(self): |
| return self.__augmented_query |
| |
| @property |
| def keys_only(self): |
| return self.__keys_only |
| |
| @property |
| def compiled_query(self): |
| return self.__compiled_query |
| |
| @property |
| def index_list(self): |
| """Returns the list of indexes used by the query. |
| Possibly None when the adapter does not implement pb_to_index. |
| """ |
| return self.__index_list |
| |
| def process_query_result_if_first(self, query_result): |
| if not self.__was_first_result_processed: |
| self.__was_first_result_processed = True |
| self.__keys_only = query_result.keys_only() |
| if query_result.has_compiled_query(): |
| self.__compiled_query = query_result.compiled_query |
| else: |
| self.__compiled_query = None |
| try: |
| self.__index_list = [self.__conn.adapter.pb_to_index(index_pb) |
| for index_pb in query_result.index_list()] |
| except NotImplementedError: |
| |
| self.__index_list = None |
| |
| |
| class Batch(object): |
| """A batch of results returned by a query. |
| |
| This class contains a batch of results returned from the datastore and |
| relevant metadata. This metadata includes: |
| query: The query that produced this batch |
| query_options: The QueryOptions used to run the query. This does not |
| contained any options passed to the .next_batch() call that created the |
| current batch. |
| start_cursor, end_cursor: These are the cursors that can be used |
| with a query to re-fetch this batch. They can also be used to |
| find all entities before or after the given batch (by use start_cursor as |
| an end cursor or vice versa). start_cursor can also be advanced to |
| point to a position within the batch using Cursor.advance(). |
| skipped_results: the number of result skipped because of the offset |
| given to the request that generated it. This can be set either on |
| the original Query.run() request or in subsequent .next_batch() calls. |
| more_results: If this is true there are more results that can be retrieved |
| either by .next_batch() or Batcher.next(). |
| |
| This class is also able to fetch the next batch of the query using |
| .next_batch(). As batches of results must be fetched serially, .next_batch() |
| can only be called once. Additional calls to .next_batch() will return None. |
| When there are no more batches .next_batch() will return None as well. Note |
| that batches returned by iterating over Batcher will always return None for |
| .next_batch() as the Bather handles fetching the next batch automatically. |
| |
| A Batch typically represents the result of a single RPC request. The datastore |
| operates on a "best effort" basis so the batch returned by .next_batch() |
| or Query.run_async().get_result() may not have satisfied the requested offset |
| or number of results (specified through FetchOptions.offset and |
| FetchOptions.batch_size respectively). To satisfy these restrictions |
| additional batches may be needed (with FetchOptions that specify the remaining |
| offset or results needed). The Batcher class hides these limitations. |
| """ |
| |
| @classmethod |
| @datastore_rpc._positional(5) |
| def create_async(cls, query, query_options, conn, req, |
| start_cursor): |
| batch_shared = _BatchShared(query, query_options, conn) |
| batch0 = cls(batch_shared, start_cursor=start_cursor) |
| return batch0._make_query_result_rpc_call('RunQuery', query_options, req) |
| |
| @datastore_rpc._positional(2) |
| def __init__(self, batch_shared, start_cursor=Cursor()): |
| """Constructor. |
| |
| This class is constructed in stages (one when an RPC is sent and another |
| when an rpc is completed) and should not be constructed directly!! |
| Use Query.run_async().get_result() to create a Batch or Query.run() |
| to use a batcher. |
| |
| This constructor does not perform verification. |
| |
| Args: |
| batch_shared: Data shared between batches for a a single query run. |
| start_cursor: Optional cursor pointing before this batch. |
| """ |
| |
| |
| self._batch_shared = batch_shared |
| self.__start_cursor = start_cursor |
| |
| @property |
| def query_options(self): |
| """The QueryOptions used to retrieve the first batch.""" |
| return self._batch_shared.query_options |
| |
| @property |
| def query(self): |
| """The query the current batch came from.""" |
| return self._batch_shared.query |
| |
| @property |
| def results(self): |
| """A list of entities in this batch.""" |
| return self.__results |
| |
| @property |
| def keys_only(self): |
| """Whether the entities in this batch only contain keys.""" |
| return self._batch_shared.keys_only |
| |
| @property |
| def index_list(self): |
| """Returns the list of indexes used to peform this batch's query. |
| Possibly None when the adapter does not implement pb_to_index. |
| """ |
| return self._batch_shared.index_list |
| |
| @property |
| def start_cursor(self): |
| """A cursor that points to the position just before the current batch.""" |
| return self.__start_cursor |
| |
| @property |
| def end_cursor(self): |
| """A cursor that points to the position just after the current batch.""" |
| return self.__end_cursor |
| |
| @property |
| def skipped_results(self): |
| """The number of results skipped because of an offset in the request. |
| |
| An offset is satisfied before any results are returned. The start_cursor |
| points to the position in the query before the skipped results. |
| """ |
| return self._skipped_results |
| |
| @property |
| def more_results(self): |
| """Whether more results can be retrieved from the query.""" |
| return self.__more_results |
| |
| def next_batch(self, fetch_options=None): |
| """Synchronously get the next batch or None if there are no more batches. |
| |
| Args: |
| fetch_options: Optional fetch options to use when fetching the next batch. |
| Merged with both the fetch options on the original call and the |
| connection. |
| |
| Returns: |
| A new Batch of results or None if either the next batch has already been |
| fetched or there are no more results. |
| """ |
| async = self.next_batch_async(fetch_options) |
| if async is None: |
| return None |
| return async.get_result() |
| |
| def _compiled_query(self): |
| return self._batch_shared.compiled_query |
| |
| def cursor(self, index): |
| """Gets the cursor that points to the result at the given index. |
| |
| The index is relative to first result in .results. Since start_cursor |
| points to the position before the first skipped result and the end_cursor |
| points to the position after the last result, the range of indexes this |
| function supports is limited to [-skipped_results, len(results)]. |
| |
| Args: |
| index: An int, the index relative to the first result before which the |
| cursor should point. |
| |
| Returns: |
| A Cursor that points just before the result at the given index which if |
| used as a start_cursor will cause the first result to result[index]. |
| """ |
| if not isinstance(index, (int, long)): |
| raise datastore_errors.BadArgumentError( |
| 'index argument should be entity_pb.Reference (%r)' % (index,)) |
| if not -self._skipped_results <= index <= len(self.__results): |
| raise datastore_errors.BadArgumentError( |
| 'index argument must be in the inclusive range [%d, %d]' % |
| (-self._skipped_results, len(self.__results))) |
| |
| if index == len(self.__results): |
| return self.__end_cursor |
| elif index == -self._skipped_results: |
| return self.__start_cursor |
| else: |
| return self.__start_cursor.advance(index + self._skipped_results, |
| self._batch_shared.query, |
| self._batch_shared.conn) |
| |
| def next_batch_async(self, fetch_options=None): |
| """Asynchronously get the next batch or None if there are no more batches. |
| |
| Args: |
| fetch_options: Optional fetch options to use when fetching the next batch. |
| Merged with both the fetch options on the original call and the |
| connection. |
| |
| Returns: |
| An async object that can be used to get the next Batch or None if either |
| the next batch has already been fetched or there are no more results. |
| """ |
| if not self.__datastore_cursor: |
| return None |
| |
| fetch_options, next_batch = self._make_next_batch(fetch_options) |
| req = self._to_pb(fetch_options) |
| |
| config = self._batch_shared.query_options.merge(fetch_options) |
| return next_batch._make_query_result_rpc_call( |
| 'Next', config, req) |
| |
| def _to_pb(self, fetch_options=None): |
| req = datastore_pb.NextRequest() |
| |
| if FetchOptions.produce_cursors(fetch_options, |
| self._batch_shared.query_options, |
| self._batch_shared.conn.config): |
| req.set_compile(True) |
| |
| count = FetchOptions.batch_size(fetch_options, |
| self._batch_shared.query_options, |
| self._batch_shared.conn.config) |
| if count is not None: |
| req.set_count(count) |
| |
| if fetch_options is not None and fetch_options.offset: |
| req.set_offset(fetch_options.offset) |
| |
| req.mutable_cursor().CopyFrom(self.__datastore_cursor) |
| self.__datastore_cursor = None |
| return req |
| |
| def _extend(self, next_batch): |
| """Combines the current batch with the next one. Called by batcher.""" |
| self.__datastore_cursor = next_batch.__datastore_cursor |
| next_batch.__datastore_cursor = None |
| self.__more_results = next_batch.__more_results |
| self.__results.extend(next_batch.__results) |
| self.__end_cursor = next_batch.__end_cursor |
| self._skipped_results += next_batch._skipped_results |
| |
| def _make_query_result_rpc_call(self, name, config, req): |
| """Makes either a RunQuery or Next call that will modify the instance. |
| |
| Args: |
| name: A string, the name of the call to invoke. |
| config: The datastore_rpc.Configuration to use for the call. |
| req: The request to send with the call. |
| |
| Returns: |
| A UserRPC object that can be used to fetch the result of the RPC. |
| """ |
| return self._batch_shared.conn._make_rpc_call(config, name, req, |
| datastore_pb.QueryResult(), |
| self.__query_result_hook) |
| |
| _need_index_header = 'The suggested index for this query is:' |
| |
| def __query_result_hook(self, rpc): |
| """Internal method used as get_result_hook for RunQuery/Next operation.""" |
| try: |
| self._batch_shared.conn.check_rpc_success(rpc) |
| except datastore_errors.NeedIndexError, exc: |
| |
| if isinstance(rpc.request, datastore_pb.Query): |
| _, kind, ancestor, props = datastore_index.CompositeIndexForQuery( |
| rpc.request) |
| |
| props = datastore_index.GetRecommendedIndexProperties(props) |
| yaml = datastore_index.IndexYamlForQuery(kind, ancestor, props) |
| xml = datastore_index.IndexXmlForQuery(kind, ancestor, props) |
| |
| |
| raise datastore_errors.NeedIndexError( |
| '\n'.join([str(exc), self._need_index_header, yaml]), |
| original_message=str(exc), header=self._need_index_header, |
| yaml_index=yaml, xml_index=xml) |
| raise |
| query_result = rpc.response |
| |
| self._batch_shared.process_query_result_if_first(query_result) |
| |
| self.__end_cursor = Cursor._from_query_result(query_result) |
| self._skipped_results = query_result.skipped_results() |
| |
| if query_result.more_results(): |
| self.__datastore_cursor = query_result.cursor() |
| self.__more_results = True |
| else: |
| self._end() |
| |
| self.__results = self._process_results(query_result.result_list()) |
| return self |
| |
| def _end(self): |
| """Changes the internal state so that no more batches can be produced.""" |
| self.__datastore_cursor = None |
| self.__more_results = False |
| |
| def _make_next_batch(self, fetch_options): |
| """Creates the object to store the next batch. |
| |
| Args: |
| fetch_options: The datastore_query.FetchOptions passed in by the user or |
| None. |
| |
| Returns: |
| A tuple containing the fetch options that should be used internally and |
| the object that should be used to contain the next batch. |
| """ |
| return fetch_options, Batch(self._batch_shared, |
| start_cursor=self.__end_cursor) |
| |
| def _process_results(self, results): |
| """Converts the datastore results into results returned to the user. |
| |
| Args: |
| results: A list of entity_pb.EntityProto's returned by the datastore |
| |
| Returns: |
| A list of results that should be returned to the user. |
| """ |
| pb_to_query_result = self._batch_shared.conn.adapter.pb_to_query_result |
| return [pb_to_query_result(result, self._batch_shared.query_options) |
| for result in results] |
| |
| def __getstate__(self): |
| raise pickle.PicklingError( |
| 'Pickling of datastore_query.Batch is unsupported.') |
| |
| |
| class _AugmentedBatch(Batch): |
| """A batch produced by a datastore_query._AugmentedQuery.""" |
| |
| @classmethod |
| @datastore_rpc._positional(5) |
| def create_async(cls, augmented_query, query_options, conn, req, |
| in_memory_offset, in_memory_limit, start_cursor): |
| batch_shared = _BatchShared(augmented_query._query, |
| query_options, |
| conn, |
| augmented_query) |
| batch0 = cls(batch_shared, |
| in_memory_offset=in_memory_offset, |
| in_memory_limit=in_memory_limit, |
| start_cursor=start_cursor) |
| return batch0._make_query_result_rpc_call('RunQuery', query_options, req) |
| |
| @datastore_rpc._positional(2) |
| def __init__(self, batch_shared, |
| in_memory_offset=None, |
| in_memory_limit=None, |
| next_index=0, |
| start_cursor=Cursor()): |
| """A Constructor for datastore_query._AugmentedBatch. |
| |
| Constructed by datastore_query._AugmentedQuery. Should not be called |
| directly. |
| """ |
| super(_AugmentedBatch, self).__init__(batch_shared, |
| start_cursor=start_cursor) |
| self.__in_memory_offset = in_memory_offset |
| self.__in_memory_limit = in_memory_limit |
| self.__next_index = next_index |
| |
| @property |
| def query(self): |
| """The query the current batch came from.""" |
| return self._batch_shared.augmented_query |
| |
| def cursor(self, index): |
| raise NotImplementedError |
| |
| def _extend(self, next_batch): |
| super(_AugmentedBatch, self)._extend(next_batch) |
| self.__in_memory_limit = next_batch.__in_memory_limit |
| self.__in_memory_offset = next_batch.__in_memory_offset |
| self.__next_index = next_batch.__next_index |
| |
| def _process_results(self, results): |
| |
| in_memory_filter = self._batch_shared.augmented_query._in_memory_filter |
| if in_memory_filter: |
| results = filter(in_memory_filter, results) |
| |
| |
| in_memory_results = self._batch_shared.augmented_query._in_memory_results |
| if in_memory_results and self.__next_index < len(in_memory_results): |
| |
| original_query = super(_AugmentedBatch, self).query |
| if original_query._order: |
| |
| if results: |
| next_result = in_memory_results[self.__next_index] |
| next_key = original_query._order.key(next_result) |
| i = 0 |
| while i < len(results): |
| result = results[i] |
| result_key = original_query._order.key(result) |
| while next_key <= result_key: |
| results.insert(i, next_result) |
| i += 1 |
| self.__next_index += 1 |
| if self.__next_index >= len(in_memory_results): |
| break |
| next_result = in_memory_results[self.__next_index] |
| next_key = original_query._order.key(next_result) |
| i += 1 |
| elif results or not super(_AugmentedBatch, self).more_results: |
| |
| results = in_memory_results + results |
| self.__next_index = len(in_memory_results) |
| |
| |
| if self.__in_memory_offset: |
| assert not self._skipped_results |
| offset = min(self.__in_memory_offset, len(results)) |
| if offset: |
| self._skipped_results += offset |
| self.__in_memory_offset -= offset |
| results = results[offset:] |
| |
| if self.__in_memory_limit is not None: |
| results = results[:self.__in_memory_limit] |
| self.__in_memory_limit -= len(results) |
| if self.__in_memory_limit <= 0: |
| self._end() |
| |
| return super(_AugmentedBatch, self)._process_results(results) |
| |
| def _make_next_batch(self, fetch_options): |
| in_memory_offset = FetchOptions.offset(fetch_options) |
| augmented_query = self._batch_shared.augmented_query |
| if in_memory_offset and (augmented_query._in_memory_filter or |
| augmented_query._in_memory_results): |
| fetch_options = FetchOptions(offset=0) |
| else: |
| in_memory_offset = None |
| return (fetch_options, |
| _AugmentedBatch(self._batch_shared, |
| in_memory_offset=in_memory_offset, |
| in_memory_limit=self.__in_memory_limit, |
| start_cursor=self.end_cursor, |
| next_index=self.__next_index)) |
| |
| |
| class Batcher(object): |
| """A class that implements the Iterator interface for Batches. |
| |
| Typically constructed by a call to Query.run(). |
| |
| The class hides the "best effort" nature of the datastore by potentially |
| making multiple requests to the datastore and merging the resulting batches. |
| This is accomplished efficiently by prefetching results and mixing both |
| non-blocking and blocking calls to the datastore as needed. |
| |
| Iterating through batches is almost always more efficient than pulling all |
| results at once as RPC latency is hidden by asynchronously prefetching |
| results. |
| |
| The batches produce by this class cannot be used to fetch the next batch |
| (through Batch.next_batch()) as before the current batch is returned the |
| request for the next batch has already been sent. |
| """ |
| |
| |
| ASYNC_ONLY = None |
| AT_LEAST_OFFSET = 0 |
| AT_LEAST_ONE = object() |
| |
| def __init__(self, query_options, first_async_batch): |
| """Constructor. |
| |
| Although this class can be manually constructed, it is preferable to use |
| Query.run(query_options). |
| |
| Args: |
| query_options: The QueryOptions used to create the first batch. |
| first_async_batch: The first batch produced by |
| Query.run_async(query_options). |
| """ |
| self.__next_batch = first_async_batch |
| self.__initial_offset = QueryOptions.offset(query_options) or 0 |
| self.__skipped_results = 0 |
| |
| def next(self): |
| """Get the next batch. See .next_batch().""" |
| return self.next_batch(self.AT_LEAST_ONE) |
| |
| def next_batch(self, min_batch_size): |
| """Get the next batch. |
| |
| The batch returned by this function cannot be used to fetch the next batch |
| (through Batch.next_batch()). Instead this function will always return None. |
| To retrieve the next batch use .next() or .next_batch(N). |
| |
| This function may return a batch larger than min_to_fetch, but will never |
| return smaller unless there are no more results. |
| |
| Special values can be used for min_batch_size: |
| ASYNC_ONLY - Do not perform any synchrounous fetches from the datastore |
| even if the this produces a batch with no results. |
| AT_LEAST_OFFSET - Only pull enough results to satifiy the offset. |
| AT_LEAST_ONE - Pull batches until at least one result is returned. |
| |
| Args: |
| min_batch_size: The minimum number of results to retrieve or one of |
| (ASYNC_ONLY, AT_LEAST_OFFSET, AT_LEAST_ONE) |
| |
| Returns: |
| The next Batch of results. |
| """ |
| if min_batch_size in (Batcher.ASYNC_ONLY, Batcher.AT_LEAST_OFFSET, |
| Batcher.AT_LEAST_ONE): |
| exact = False |
| else: |
| exact = True |
| datastore_types.ValidateInteger(min_batch_size, |
| 'min_batch_size', |
| datastore_errors.BadArgumentError) |
| if not self.__next_batch: |
| raise StopIteration |
| |
| |
| batch = self.__next_batch.get_result() |
| self.__next_batch = None |
| self.__skipped_results += batch.skipped_results |
| |
| if min_batch_size is not Batcher.ASYNC_ONLY: |
| if min_batch_size is Batcher.AT_LEAST_ONE: |
| min_batch_size = 1 |
| |
| needed_results = min_batch_size - len(batch.results) |
| while (batch.more_results and |
| (self.__skipped_results < self.__initial_offset or |
| needed_results > 0)): |
| if batch.query_options.batch_size: |
| |
| batch_size = max(batch.query_options.batch_size, needed_results) |
| elif exact: |
| batch_size = needed_results |
| else: |
| batch_size = None |
| next_batch = batch.next_batch(FetchOptions( |
| offset=max(0, self.__initial_offset - self.__skipped_results), |
| batch_size=batch_size)) |
| self.__skipped_results += next_batch.skipped_results |
| needed_results = max(0, needed_results - len(next_batch.results)) |
| batch._extend(next_batch) |
| |
| |
| |
| |
| self.__next_batch = batch.next_batch_async() |
| return batch |
| |
| def __getstate__(self): |
| raise pickle.PicklingError( |
| 'Pickling of datastore_query.Batcher is unsupported.') |
| |
| def __iter__(self): |
| return self |
| |
| |
| class ResultsIterator(object): |
| """An iterator over the results from Batches obtained from a Batcher. |
| |
| ResultsIterator implements Python's iterator protocol, so results can be |
| accessed with the for-statement: |
| |
| > it = ResultsIterator(Query(kind='Person').run()) |
| > for person in it: |
| > print 'Hi, %s!' % person['name'] |
| |
| At any time ResultsIterator.cursor() can be used to grab the Cursor that |
| points just after the last result returned by the iterator. |
| """ |
| |
| def __init__(self, batcher): |
| """Constructor. |
| |
| Args: |
| batcher: A datastore_query.Batcher |
| """ |
| if not isinstance(batcher, Batcher): |
| raise datastore_errors.BadArgumentError( |
| 'batcher argument should be datastore_query.Batcher (%r)' % |
| (batcher,)) |
| |
| self.__batcher = batcher |
| self.__current_batch = None |
| self.__current_pos = 0 |
| |
| def index_list(self): |
| """Returns the list of indexes used to perform the query. |
| Possibly None when the adapter does not implement pb_to_index. |
| """ |
| return self._ensure_current_batch().index_list |
| |
| def cursor(self): |
| """Returns a cursor that points just after the last result returned.""" |
| return self._ensure_current_batch().cursor(self.__current_pos) |
| |
| def _ensure_current_batch(self): |
| if not self.__current_batch: |
| self.__current_batch = self.__batcher.next() |
| self.__current_pos = 0 |
| return self.__current_batch |
| |
| def _compiled_query(self): |
| """Returns the compiled query associated with the iterator. |
| |
| Internal only do not use. |
| """ |
| if not self.__current_batch: |
| self.__current_batch = self.__batcher.next() |
| self.__current_pos = 0 |
| return self.__current_batch._compiled_query() |
| |
| |
| def next(self): |
| """Returns the next query result.""" |
| while (not self.__current_batch or |
| self.__current_pos >= len(self.__current_batch.results)): |
| |
| next_batch = self.__batcher.next() |
| if not next_batch: |
| |
| |
| raise StopIteration |
| |
| self.__current_pos = 0 |
| self.__current_batch = next_batch |
| |
| result = self.__current_batch.results[self.__current_pos] |
| self.__current_pos += 1 |
| return result |
| |
| def __iter__(self): |
| return self |