blob: 11e29c106a97cf51695c8a82fa6825726dfabaf2 [file] [log] [blame]
"""Higher-level Query wrapper.
There are perhaps too many query APIs in the world.
The fundamental API here overloads the 6 comparisons operators to
represent filters on property values, and supports AND and OR
operations (implemented as functions -- Python's 'and' and 'or'
operators cannot be overloaded, and the '&' and '|' operators have a
priority that conflicts with the priority of comparison operators).
For example:
class Employee(Model):
name = StringProperty()
age = IntegerProperty()
rank = IntegerProperty()
@classmethod
def demographic(cls, min_age, max_age):
return cls.query().filter(AND(cls.age >= min_age, cls.age <= max_age))
@classmethod
def ranked(cls, rank):
return cls.query(cls.rank == rank).order(cls.age)
for emp in Employee.seniors(42, 5):
print emp.name, emp.age, emp.rank
The 'in' operator cannot be overloaded, but is supported through the
IN() method. For example:
Employee.query().filter(Employee.rank.IN([4, 5, 6]))
Sort orders are supported through the order() method; unary minus is
overloaded on the Property class to represent a descending order:
Employee.query().order(Employee.name, -Employee.age)
Besides using AND() and OR(), filters can also be combined by
repeatedly calling .filter():
q1 = Employee.query() # A query that returns all employees
q2 = q1.filter(Employee.age >= 30) # Only those over 30
q3 = q2.filter(Employee.age < 40) # Only those in their 30s
A further shortcut is calling .filter() with multiple arguments; this
implies AND():
q1 = Employee.query() # A query that returns all employees
q3 = q1.filter(Employee.age >= 30,
Employee.age < 40) # Only those in their 30s
And finally you can also pass one or more filter expressions directly
to the .query() method:
q3 = Employee.query(Employee.age >= 30,
Employee.age < 40) # Only those in their 30s
Query objects are immutable, so these methods always return a new
Query object; the above calls to filter() do not affect q1. (On the
other hand, operations that are effectively no-ops may return the
original Query object.)
Sort orders can also be combined this way, and .filter() and .order()
calls may be intermixed:
q4 = q3.order(-Employee.age)
q5 = q4.order(Employee.name)
q6 = q5.filter(Employee.rank == 5)
Again, multiple .order() calls can be combined:
q5 = q3.order(-Employee.age, Employee.name)
The simplest way to retrieve Query results is a for-loop:
for emp in q3:
print emp.name, emp.age
Some other methods to run a query and access its results:
q.iter() # Return an iterator; same as iter(q) but more flexible
q.map(callback) # Call the callback function for each query result
q.fetch(N) # Return a list of the first N results
q.get() # Return the first result
q.count(N) # Return the number of results, with a maximum of N
q.fetch_page(N, start_cursor=cursor) # Return (results, cursor, has_more)
All of the above methods take a standard set of additional query
options, either in the form of keyword arguments such as
keys_only=True, or as QueryOptions object passed with
options=QueryOptions(...). The most important query options are:
keys_only: bool, if set the results are keys instead of entities
limit: int, limits the number of results returned
offset: int, skips this many results first
start_cursor: Cursor, start returning results after this position
end_cursor: Cursor, stop returning results after this position
batch_size: int, hint for the number of results returned per RPC
prefetch_size: int, hint for the number of results in the first RPC
produce_cursors: bool, return Cursor objects with the results
For additional (obscure) query options and more details on them,
including an explanation of Cursors, see datastore_query.py.
All of the above methods except for iter() have asynchronous variants
as well, which return a Future; to get the operation's ultimate
result, yield the Future (when inside a tasklet) or call the Future's
get_result() method (outside a tasklet):
q.map_async(callback) # Callback may be a task or a plain function
q.fetch_async(N)
q.get_async()
q.count_async(N)
q.fetch_page_async(N, start_cursor=cursor)
Finally, there's an idiom to efficiently loop over the Query results
in a tasklet, properly yielding when appropriate:
it = q.iter()
while (yield it.has_next_async()):
emp = it.next()
print emp.name, emp.age
"""
from __future__ import with_statement
del with_statement # No need to export this.
__author__ = 'guido@google.com (Guido van Rossum)'
import datetime
import heapq
import itertools
import sys
from .google_imports import datastore_errors
from .google_imports import datastore_rpc
from .google_imports import datastore_types
from .google_imports import datastore_query
from .google_imports import namespace_manager
from . import model
from . import context
from . import tasklets
from . import utils
__all__ = ['Query', 'QueryOptions', 'Cursor', 'QueryIterator',
'RepeatedStructuredPropertyPredicate',
'AND', 'OR', 'ConjunctionNode', 'DisjunctionNode',
'FilterNode', 'PostFilterNode', 'FalseNode', 'Node',
'ParameterNode', 'ParameterizedThing', 'Parameter',
'ParameterizedFunction', 'gql',
]
# Re-export some useful classes from the lower-level module.
Cursor = datastore_query.Cursor
# Some local renamings.
_ASC = datastore_query.PropertyOrder.ASCENDING
_DESC = datastore_query.PropertyOrder.DESCENDING
_AND = datastore_query.CompositeFilter.AND
_KEY = datastore_types._KEY_SPECIAL_PROPERTY
# Table of supported comparison operators.
_OPS = frozenset(['=', '!=', '<', '<=', '>', '>=', 'in'])
# Default limit value. (Yes, the datastore uses int32!)
_MAX_LIMIT = 2 ** 31 - 1
class QueryOptions(context.ContextOptions, datastore_query.QueryOptions):
"""Support both context options and query options (esp. use_cache)."""
class RepeatedStructuredPropertyPredicate(datastore_query.FilterPredicate):
# Used by model.py.
def __init__(self, match_keys, pb, key_prefix):
super(RepeatedStructuredPropertyPredicate, self).__init__()
self.match_keys = match_keys
stripped_keys = []
for key in match_keys:
if not key.startswith(key_prefix):
raise ValueError('key %r does not begin with the specified prefix of %s'
% (key, key_prefix))
stripped_keys.append(key[len(key_prefix):])
value_map = datastore_query._make_key_value_map(pb, stripped_keys)
self.match_values = tuple(value_map[key][0] for key in stripped_keys)
def _get_prop_names(self):
return frozenset(self.match_keys)
def _apply(self, key_value_map):
"""Apply the filter to values extracted from an entity.
Think of self.match_keys and self.match_values as representing a
table with one row. For example:
match_keys = ('name', 'age', 'rank')
match_values = ('Joe', 24, 5)
(Except that in reality, the values are represented by tuples
produced by datastore_types.PropertyValueToKeyValue().)
represents this table:
| name | age | rank |
+---------+-------+--------+
| 'Joe' | 24 | 5 |
Think of key_value_map as a table with the same structure but
(potentially) many rows. This represents a repeated structured
property of a single entity. For example:
{'name': ['Joe', 'Jane', 'Dick'],
'age': [24, 21, 23],
'rank': [5, 1, 2]}
represents this table:
| name | age | rank |
+---------+-------+--------+
| 'Joe' | 24 | 5 |
| 'Jane' | 21 | 1 |
| 'Dick' | 23 | 2 |
We must determine wheter at least one row of the second table
exactly matches the first table. We need this class because the
datastore, when asked to find an entity with name 'Joe', age 24
and rank 5, will include entities that have 'Joe' somewhere in the
name column, 24 somewhere in the age column, and 5 somewhere in
the rank column, but not all aligned on a single row. Such an
entity should not be considered a match.
"""
columns = []
for key in self.match_keys:
column = key_value_map.get(key)
if not column: # None, or an empty list.
return False # If any column is empty there can be no match.
columns.append(column)
# Use izip to transpose the columns into rows.
return self.match_values in itertools.izip(*columns)
# Don't implement _prune()! It would mess up the row correspondence
# within columns.
class ParameterizedThing(object):
"""Base class for Parameter and ParameterizedFunction.
This exists purely for isinstance() checks.
"""
def __eq__(self, other):
raise NotImplementedError
def __ne__(self, other):
eq = self.__eq__(other)
if eq is not NotImplemented:
eq = not eq
return eq
class Parameter(ParameterizedThing):
"""Represents a bound variable in a GQL query.
Parameter(1) corresponds to a slot labeled ":1" in a GQL query.
Parameter('xyz') corresponds to a slot labeled ":xyz".
The value must be set (bound) separately by calling .set(value).
"""
def __init__(self, key):
"""Constructor.
Args:
key: The Parameter key, must be either an integer or a string.
"""
if not isinstance(key, (int, long, basestring)):
raise TypeError('Parameter key must be an integer or string, not %s' %
(key,))
self.__key = key
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.__key)
def __eq__(self, other):
if not isinstance(other, Parameter):
return NotImplemented
return self.__key == other.__key
@property
def key(self):
"""Retrieve the key."""
return self.__key
def resolve(self, bindings, used):
key = self.__key
if key not in bindings:
raise datastore_errors.BadArgumentError(
'Parameter :%s is not bound.' % key)
value = bindings[key]
used[key] = True
return value
class ParameterizedFunction(ParameterizedThing):
"""Represents a GQL function with parameterized arguments.
For example, ParameterizedFunction('key', [Parameter(1)]) stands for
the GQL syntax KEY(:1).
"""
def __init__(self, func, values):
from .google_imports import gql # Late import, to avoid name conflict.
self.__func = func
self.__values = values
# NOTE: A horrible hack using GQL private variables so we can
# reuse GQL's implementations of its built-in functions.
gqli = gql.GQL('SELECT * FROM Dummy')
gql_method = gqli._GQL__cast_operators[func]
self.__method = getattr(gqli, '_GQL' + gql_method.__name__)
def __repr__(self):
return 'ParameterizedFunction(%r, %r)' % (self.__func, self.__values)
def __eq__(self, other):
if not isinstance(other, ParameterizedFunction):
return NotImplemented
return (self.__func == other.__func and
self.__values == other.__values)
@property
def func(self):
return self.__func
@property
def values(self):
return self.__values
def is_parameterized(self):
for val in self.__values:
if isinstance(val, Parameter):
return True
return False
def resolve(self, bindings, used):
values = []
for val in self.__values:
if isinstance(val, Parameter):
val = val.resolve(bindings, used)
values.append(val)
result = self.__method(values)
# The gql module returns slightly different types in some cases.
if self.__func == 'key' and isinstance(result, datastore_types.Key):
result = model.Key.from_old_key(result)
elif self.__func == 'time' and isinstance(result, datetime.datetime):
result = datetime.time(result.hour, result.minute,
result.second, result.microsecond)
elif self.__func == 'date' and isinstance(result, datetime.datetime):
result = datetime.date(result.year, result.month, result.day)
return result
class Node(object):
"""Base class for filter expression tree nodes.
Tree nodes are considered immutable, even though they can contain
Parameter instances, which are not. In particular, two identical
trees may be represented by the same Node object in different
contexts.
"""
def __new__(cls):
if cls is Node:
raise TypeError('Cannot instantiate Node, only a subclass.')
return super(Node, cls).__new__(cls)
def __eq__(self, other):
raise NotImplementedError
def __ne__(self, other):
eq = self.__eq__(other)
if eq is not NotImplemented:
eq = not eq
return eq
def __unordered(self, unused_other):
raise TypeError('Nodes cannot be ordered')
__le__ = __lt__ = __ge__ = __gt__ = __unordered
def _to_filter(self, post=False):
"""Helper to convert to datastore_query.Filter, or None."""
raise NotImplementedError
def _post_filters(self):
"""Helper to extract post-filter Nodes, if any."""
return None
def resolve(self, bindings, used):
"""Return a Node with Parameters replaced by the selected values.
Args:
bindings: A dict mapping integers and strings to values.
used: A dict into which use of use of a binding is recorded.
Returns:
A Node instance.
"""
return self
class FalseNode(Node):
"""Tree node for an always-failing filter."""
def __eq__(self, other):
if not isinstance(other, FalseNode):
return NotImplemented
return True
def _to_filter(self, post=False):
if post:
return None
# Because there's no point submitting a query that will never
# return anything.
raise datastore_errors.BadQueryError(
'Cannot convert FalseNode to predicate')
class ParameterNode(Node):
"""Tree node for a parameterized filter."""
def __new__(cls, prop, op, param):
if not isinstance(prop, model.Property):
raise TypeError('Expected a Property, got %r' % (prop,))
if op not in _OPS:
raise TypeError('Expected a valid operator, got %r' % (op,))
if not isinstance(param, ParameterizedThing):
raise TypeError('Expected a ParameterizedThing, got %r' % (param,))
obj = super(ParameterNode, cls).__new__(cls)
obj.__prop = prop
obj.__op = op
obj.__param = param
return obj
def __repr__(self):
return 'ParameterNode(%r, %r, %r)' % (self.__prop, self.__op, self.__param)
def __eq__(self, other):
if not isinstance(other, ParameterNode):
return NotImplemented
return (self.__prop._name == other.__prop._name and
self.__op == other.__op and
self.__param == other.__param)
def _to_filter(self, post=False):
raise datastore_errors.BadArgumentError(
'Parameter :%s is not bound.' % (self.__param.key,))
def resolve(self, bindings, used):
value = self.__param.resolve(bindings, used)
if self.__op == 'in':
return self.__prop._IN(value)
else:
return self.__prop._comparison(self.__op, value)
class FilterNode(Node):
"""Tree node for a single filter expression."""
def __new__(cls, name, opsymbol, value):
if isinstance(value, model.Key):
value = value.to_old_key()
if opsymbol == '!=':
n1 = FilterNode(name, '<', value)
n2 = FilterNode(name, '>', value)
return DisjunctionNode(n1, n2)
if opsymbol == 'in':
if not isinstance(value, (list, tuple, set, frozenset)):
raise TypeError('in expected a list, tuple or set of values; '
'received %r' % value)
nodes = [FilterNode(name, '=', v) for v in value]
if not nodes:
return FalseNode()
if len(nodes) == 1:
return nodes[0]
return DisjunctionNode(*nodes)
self = super(FilterNode, cls).__new__(cls)
self.__name = name
self.__opsymbol = opsymbol
self.__value = value
return self
def __repr__(self):
return '%s(%r, %r, %r)' % (self.__class__.__name__,
self.__name, self.__opsymbol, self.__value)
def __eq__(self, other):
if not isinstance(other, FilterNode):
return NotImplemented
# TODO: Should nodes with values that compare equal but have
# different types really be considered equal? IIUC the datastore
# doesn't consider 1 equal to 1.0 when it compares property values.
return (self.__name == other.__name and
self.__opsymbol == other.__opsymbol and
self.__value == other.__value)
def _to_filter(self, post=False):
if post:
return None
if self.__opsymbol in ('!=', 'in'):
raise NotImplementedError('Inequality filters are not single filter '
'expressions and therefore cannot be converted '
'to a single filter (%r)' % self.__opsymbol)
value = self.__value
return datastore_query.make_filter(self.__name.decode('utf-8'),
self.__opsymbol, value)
class PostFilterNode(Node):
"""Tree node representing an in-memory filtering operation.
This is used to represent filters that cannot be executed by the
datastore, for example a query for a structured value.
"""
def __new__(cls, predicate):
self = super(PostFilterNode, cls).__new__(cls)
self.predicate = predicate
return self
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, self.predicate)
def __eq__(self, other):
if not isinstance(other, PostFilterNode):
return NotImplemented
return self is other
def _to_filter(self, post=False):
if post:
return self.predicate
else:
return None
class ConjunctionNode(Node):
"""Tree node representing a Boolean AND operator on two or more nodes."""
def __new__(cls, *nodes):
if not nodes:
raise TypeError('ConjunctionNode() requires at least one node.')
elif len(nodes) == 1:
return nodes[0]
clauses = [[]] # Outer: Disjunction; inner: Conjunction.
# TODO: Remove duplicates?
for node in nodes:
if not isinstance(node, Node):
raise TypeError('ConjunctionNode() expects Node instances as arguments;'
' received a non-Node instance %r' % node)
if isinstance(node, DisjunctionNode):
# Apply the distributive law: (X or Y) and (A or B) becomes
# (X and A) or (X and B) or (Y and A) or (Y and B).
new_clauses = []
for clause in clauses:
for subnode in node:
new_clause = clause + [subnode]
new_clauses.append(new_clause)
clauses = new_clauses
elif isinstance(node, ConjunctionNode):
# Apply half of the distributive law: (X or Y) and A becomes
# (X and A) or (Y and A).
for clause in clauses:
clause.extend(node.__nodes)
else:
# Ditto.
for clause in clauses:
clause.append(node)
if not clauses:
return FalseNode()
if len(clauses) > 1:
return DisjunctionNode(*[ConjunctionNode(*clause) for clause in clauses])
self = super(ConjunctionNode, cls).__new__(cls)
self.__nodes = clauses[0]
return self
def __iter__(self):
return iter(self.__nodes)
def __repr__(self):
return 'AND(%s)' % (', '.join(map(str, self.__nodes)))
def __eq__(self, other):
if not isinstance(other, ConjunctionNode):
return NotImplemented
return self.__nodes == other.__nodes
def _to_filter(self, post=False):
filters = filter(None,
(node._to_filter(post=post)
for node in self.__nodes
if isinstance(node, PostFilterNode) == post))
if not filters:
return None
if len(filters) == 1:
return filters[0]
return datastore_query.CompositeFilter(_AND, filters)
def _post_filters(self):
post_filters = [node for node in self.__nodes
if isinstance(node, PostFilterNode)]
if not post_filters:
return None
if len(post_filters) == 1:
return post_filters[0]
if post_filters == self.__nodes:
return self
return ConjunctionNode(*post_filters)
def resolve(self, bindings, used):
nodes = [node.resolve(bindings, used) for node in self.__nodes]
if nodes == self.__nodes:
return self
return ConjunctionNode(*nodes)
class DisjunctionNode(Node):
"""Tree node representing a Boolean OR operator on two or more nodes."""
def __new__(cls, *nodes):
if not nodes:
raise TypeError('DisjunctionNode() requires at least one node')
elif len(nodes) == 1:
return nodes[0]
self = super(DisjunctionNode, cls).__new__(cls)
self.__nodes = []
# TODO: Remove duplicates?
for node in nodes:
if not isinstance(node, Node):
raise TypeError('DisjunctionNode() expects Node instances as arguments;'
' received a non-Node instance %r' % node)
if isinstance(node, DisjunctionNode):
self.__nodes.extend(node.__nodes)
else:
self.__nodes.append(node)
return self
def __iter__(self):
return iter(self.__nodes)
def __repr__(self):
return 'OR(%s)' % (', '.join(map(str, self.__nodes)))
def __eq__(self, other):
if not isinstance(other, DisjunctionNode):
return NotImplemented
return self.__nodes == other.__nodes
def resolve(self, bindings, used):
nodes = [node.resolve(bindings, used) for node in self.__nodes]
if nodes == self.__nodes:
return self
return DisjunctionNode(*nodes)
# AND and OR are preferred aliases for these.
AND = ConjunctionNode
OR = DisjunctionNode
def _args_to_val(func, args):
"""Helper for GQL parsing to extract values from GQL expressions.
This can extract the value from a GQL literal, return a Parameter
for a GQL bound parameter (:1 or :foo), and interprets casts like
KEY(...) and plain lists of values like (1, 2, 3).
Args:
func: A string indicating what kind of thing this is.
args: One or more GQL values, each integer, string, or GQL literal.
"""
from .google_imports import gql # Late import, to avoid name conflict.
vals = []
for arg in args:
if isinstance(arg, (int, long, basestring)):
val = Parameter(arg)
elif isinstance(arg, gql.Literal):
val = arg.Get()
else:
raise TypeError('Unexpected arg (%r)' % arg)
vals.append(val)
if func == 'nop':
if len(vals) != 1:
raise TypeError('"nop" requires exactly one value')
return vals[0] # May be a Parameter
pfunc = ParameterizedFunction(func, vals)
if pfunc.is_parameterized():
return pfunc
else:
return pfunc.resolve({}, {})
def _get_prop_from_modelclass(modelclass, name):
"""Helper for FQL parsing to turn a property name into a property object.
Args:
modelclass: The model class specified in the query.
name: The property name. This may contain dots which indicate
sub-properties of structured properties.
Returns:
A Property object.
Raises:
KeyError if the property doesn't exist and the model clas doesn't
derive from Expando.
"""
if name == '__key__':
return modelclass._key
parts = name.split('.')
part, more = parts[0], parts[1:]
prop = modelclass._properties.get(part)
if prop is None:
if issubclass(modelclass, model.Expando):
prop = model.GenericProperty(part)
else:
raise TypeError('Model %s has no property named %r' %
(modelclass._get_kind(), part))
while more:
part = more.pop(0)
if not isinstance(prop, model.StructuredProperty):
raise TypeError('Model %s has no property named %r' %
(modelclass._get_kind(), part))
maybe = getattr(prop, part, None)
if isinstance(maybe, model.Property) and maybe._name == part:
prop = maybe
else:
maybe = prop._modelclass._properties.get(part)
if maybe is not None:
# Must get it this way to get the copy with the long name.
# (See StructuredProperty.__getattr__() for details.)
prop = getattr(prop, maybe._code_name)
else:
if issubclass(prop._modelclass, model.Expando) and not more:
prop = model.GenericProperty()
prop._name = name # Bypass the restriction on dots.
else:
raise KeyError('Model %s has no property named %r' %
(prop._modelclass._get_kind(), part))
return prop
class Query(object):
"""Query object.
Usually constructed by calling Model.query().
See module docstring for examples.
Note that not all operations on Queries are supported by _MultiQuery
instances; the latter are generated as necessary when any of the
operators !=, IN or OR is used.
"""
@utils.positional(1)
def __init__(self, kind=None, ancestor=None, filters=None, orders=None,
app=None, namespace=None, default_options=None,
projection=None, group_by=None):
"""Constructor.
Args:
kind: Optional kind string.
ancestor: Optional ancestor Key.
filters: Optional Node representing a filter expression tree.
orders: Optional datastore_query.Order object.
app: Optional app id.
namespace: Optional namespace.
default_options: Optional QueryOptions object.
projection: Optional list or tuple of properties to project.
group_by: Optional list or tuple of properties to group by.
"""
# TODO(arfuller): Accept projection=Model.key to mean keys_only.
# TODO(arfuller): Consider adding incremental function
# group_by_property(*args) and project(*args, distinct=False).
# Validating input.
if ancestor is not None:
if isinstance(ancestor, ParameterizedThing):
if isinstance(ancestor, ParameterizedFunction):
if ancestor.func != 'key':
raise TypeError('ancestor cannot be a GQL function other than KEY')
else:
if not isinstance(ancestor, model.Key):
raise TypeError('ancestor must be a Key; received %r' % (ancestor,))
if not ancestor.id():
raise ValueError('ancestor cannot be an incomplete key')
if app is not None:
if app != ancestor.app():
raise TypeError('app/ancestor mismatch')
if namespace is None:
namespace = ancestor.namespace()
else:
if namespace != ancestor.namespace():
raise TypeError('namespace/ancestor mismatch')
if filters is not None:
if not isinstance(filters, Node):
raise TypeError('filters must be a query Node or None; received %r' %
(filters,))
if orders is not None:
if not isinstance(orders, datastore_query.Order):
raise TypeError('orders must be an Order instance or None; received %r'
% (orders,))
if default_options is not None:
if not isinstance(default_options, datastore_rpc.BaseConfiguration):
raise TypeError('default_options must be a Configuration or None; '
'received %r' % (default_options,))
if projection is not None:
if default_options.projection is not None:
raise TypeError('cannot use projection= and '
'default_options.projection at the same time')
if default_options.keys_only is not None:
raise TypeError('cannot use projection= and '
'default_options.keys_only at the same time')
self.__kind = kind # String.
self.__ancestor = ancestor # Key.
self.__filters = filters # None or Node subclass.
self.__orders = orders # None or datastore_query.Order instance.
self.__app = app
self.__namespace = namespace
self.__default_options = default_options
# Checked late as _check_properties depends on local state.
self.__projection = None
if projection is not None:
if not projection:
raise TypeError('projection argument cannot be empty')
if not isinstance(projection, (tuple, list)):
raise TypeError(
'projection must be a tuple, list or None; received %r' %
(projection,))
self._check_properties(self._to_property_names(projection))
self.__projection = tuple(projection)
self.__group_by = None
if group_by is not None:
if not group_by:
raise TypeError('group_by argument cannot be empty')
if not isinstance(group_by, (tuple, list)):
raise TypeError(
'group_by must be a tuple, list or None; received %r' % (group_by,))
self._check_properties(self._to_property_names(group_by))
self.__group_by = tuple(group_by)
def __repr__(self):
args = []
if self.app is not None:
args.append('app=%r' % self.app)
if (self.namespace is not None and
self.namespace != namespace_manager.get_namespace()):
# Only show the namespace if set and not the current namespace.
# (This is similar to what Key.__repr__() does.)
args.append('namespace=%r' % self.namespace)
if self.kind is not None:
args.append('kind=%r' % self.kind)
if self.ancestor is not None:
args.append('ancestor=%r' % self.ancestor)
if self.filters is not None:
args.append('filters=%r' % self.filters)
if self.orders is not None:
# TODO: Format orders better.
args.append('orders=...') # PropertyOrder doesn't have a good repr().
if self.projection:
args.append('projection=%r' % (self._to_property_names(self.projection)))
if self.group_by:
args.append('group_by=%r' % (self._to_property_names(self.group_by)))
if self.default_options is not None:
args.append('default_options=%r' % self.default_options)
return '%s(%s)' % (self.__class__.__name__, ', '.join(args))
def _fix_namespace(self):
"""Internal helper to fix the namespace.
This is called to ensure that for queries without an explicit
namespace, the namespace used by async calls is the one in effect
at the time the async call is made, not the one in effect when the
the request is actually generated.
"""
if self.namespace is not None:
return self
namespace = namespace_manager.get_namespace()
return self.__class__(kind=self.kind, ancestor=self.ancestor,
filters=self.filters, orders=self.orders,
app=self.app, namespace=namespace,
default_options=self.default_options,
projection=self.projection, group_by=self.group_by)
def _get_query(self, connection):
self.bind() # Raises an exception if there are unbound parameters.
kind = self.kind
ancestor = self.ancestor
if ancestor is not None:
ancestor = connection.adapter.key_to_pb(ancestor)
filters = self.filters
post_filters = None
if filters is not None:
post_filters = filters._post_filters()
filters = filters._to_filter()
group_by = None
if self.group_by:
group_by = self._to_property_names(self.group_by)
dsquery = datastore_query.Query(app=self.app,
namespace=self.namespace,
kind=kind.decode('utf-8') if kind else None,
ancestor=ancestor,
filter_predicate=filters,
order=self.orders,
group_by=group_by)
if post_filters is not None:
dsquery = datastore_query._AugmentedQuery(
dsquery,
in_memory_filter=post_filters._to_filter(post=True))
return dsquery
@tasklets.tasklet
def run_to_queue(self, queue, conn, options=None, dsquery=None):
"""Run this query, putting entities into the given queue."""
try:
multiquery = self._maybe_multi_query()
if multiquery is not None:
yield multiquery.run_to_queue(queue, conn, options=options)
return
if dsquery is None:
dsquery = self._get_query(conn)
rpc = dsquery.run_async(conn, options)
while rpc is not None:
batch = yield rpc
rpc = batch.next_batch_async(options)
for i, result in enumerate(batch.results):
queue.putq((batch, i, result))
queue.complete()
except GeneratorExit:
raise
except Exception:
if not queue.done():
_, e, tb = sys.exc_info()
queue.set_exception(e, tb)
raise
@tasklets.tasklet
def _run_to_list(self, results, options=None):
# Internal version of run_to_queue(), without a queue.
ctx = tasklets.get_context()
conn = ctx._conn
dsquery = self._get_query(conn)
rpc = dsquery.run_async(conn, options)
while rpc is not None:
batch = yield rpc
if (batch.skipped_results and
datastore_query.FetchOptions.offset(options)):
offset = options.offset - batch.skipped_results
options = datastore_query.FetchOptions(offset=offset, config=options)
rpc = batch.next_batch_async(options)
for result in batch.results:
result = ctx._update_cache_from_query_result(result, options)
if result is not None:
results.append(result)
raise tasklets.Return(results)
def _needs_multi_query(self):
filters = self.filters
return filters is not None and isinstance(filters, DisjunctionNode)
def _maybe_multi_query(self):
if not self._needs_multi_query():
return None
# Switch to a _MultiQuery.
filters = self.filters
subqueries = []
for subfilter in filters:
subquery = self.__class__(kind=self.kind, ancestor=self.ancestor,
filters=subfilter, orders=self.orders,
app=self.app, namespace=self.namespace,
default_options=self.default_options,
projection=self.projection,
group_by=self.group_by)
subqueries.append(subquery)
return _MultiQuery(subqueries)
@property
def kind(self):
"""Accessor for the kind (a string or None)."""
return self.__kind
@property
def ancestor(self):
"""Accessor for the ancestor (a Key or None)."""
return self.__ancestor
@property
def filters(self):
"""Accessor for the filters (a Node or None)."""
return self.__filters
@property
def orders(self):
"""Accessor for the filters (a datastore_query.Order or None)."""
return self.__orders
@property
def app(self):
"""Accessor for the app (a string or None)."""
return self.__app
@property
def namespace(self):
"""Accessor for the namespace (a string or None)."""
return self.__namespace
@property
def default_options(self):
"""Accessor for the default_options (a QueryOptions instance or None)."""
return self.__default_options
@property
def group_by(self):
"""Accessor for the group by properties (a tuple instance or None)."""
return self.__group_by
@property
def projection(self):
"""Accessor for the projected properties (a tuple instance or None)."""
return self.__projection
@property
def is_distinct(self):
"""True if results are guaranteed to contain a unique set of property
values.
This happens when every property in the group_by is also in the projection.
"""
return bool(self.__group_by and
set(self._to_property_names(self.__group_by)) <=
set(self._to_property_names(self.__projection)))
def filter(self, *args):
"""Return a new Query with additional filter(s) applied."""
if not args:
return self
preds = []
f = self.filters
if f:
preds.append(f)
for arg in args:
if not isinstance(arg, Node):
raise TypeError('Cannot filter a non-Node argument; received %r' % arg)
preds.append(arg)
if not preds:
pred = None
elif len(preds) == 1:
pred = preds[0]
else:
pred = ConjunctionNode(*preds)
return self.__class__(kind=self.kind, ancestor=self.ancestor,
filters=pred, orders=self.orders,
app=self.app, namespace=self.namespace,
default_options=self.default_options,
projection=self.projection, group_by=self.group_by)
def order(self, *args):
"""Return a new Query with additional sort order(s) applied."""
# q.order(Employee.name, -Employee.age)
if not args:
return self
orders = []
o = self.orders
if o:
orders.append(o)
for arg in args:
if isinstance(arg, model.Property):
orders.append(datastore_query.PropertyOrder(arg._name, _ASC))
elif isinstance(arg, datastore_query.Order):
orders.append(arg)
else:
raise TypeError('order() expects a Property or query Order; '
'received %r' % arg)
if not orders:
orders = None
elif len(orders) == 1:
orders = orders[0]
else:
orders = datastore_query.CompositeOrder(orders)
return self.__class__(kind=self.kind, ancestor=self.ancestor,
filters=self.filters, orders=orders,
app=self.app, namespace=self.namespace,
default_options=self.default_options,
projection=self.projection, group_by=self.group_by)
# Datastore API using the default context.
def iter(self, **q_options):
"""Construct an iterator over the query.
Args:
**q_options: All query options keyword arguments are supported.
Returns:
A QueryIterator object.
"""
self.bind() # Raises an exception if there are unbound parameters.
return QueryIterator(self, **q_options)
__iter__ = iter
@utils.positional(2)
def map(self, callback, pass_batch_into_callback=None,
merge_future=None, **q_options):
"""Map a callback function or tasklet over the query results.
Args:
callback: A function or tasklet to be applied to each result; see below.
merge_future: Optional Future subclass; see below.
**q_options: All query options keyword arguments are supported.
Callback signature: The callback is normally called with an entity
as argument. However if keys_only=True is given, it is called
with a Key. Also, when pass_batch_into_callback is True, it is
called with three arguments: the current batch, the index within
the batch, and the entity or Key at that index. The callback can
return whatever it wants. If the callback is None, a trivial
callback is assumed that just returns the entity or key passed in
(ignoring produce_cursors).
Optional merge future: The merge_future is an advanced argument
that can be used to override how the callback results are combined
into the overall map() return value. By default a list of
callback return values is produced. By substituting one of a
small number of specialized alternatives you can arrange
otherwise. See tasklets.MultiFuture for the default
implementation and a description of the protocol the merge_future
object must implement the default. Alternatives from the same
module include QueueFuture, SerialQueueFuture and ReducingFuture.
Returns:
When the query has run to completion and all callbacks have
returned, map() returns a list of the results of all callbacks.
(But see 'optional merge future' above.)
"""
return self.map_async(callback,
pass_batch_into_callback=pass_batch_into_callback,
merge_future=merge_future,
**q_options).get_result()
@utils.positional(2)
def map_async(self, callback, pass_batch_into_callback=None,
merge_future=None, **q_options):
"""Map a callback function or tasklet over the query results.
This is the asynchronous version of Query.map().
"""
qry = self._fix_namespace()
return tasklets.get_context().map_query(
qry,
callback,
pass_batch_into_callback=pass_batch_into_callback,
options=self._make_options(q_options),
merge_future=merge_future)
@utils.positional(2)
def fetch(self, limit=None, **q_options):
"""Fetch a list of query results, up to a limit.
Args:
limit: How many results to retrieve at most.
**q_options: All query options keyword arguments are supported.
Returns:
A list of results.
"""
return self.fetch_async(limit, **q_options).get_result()
@utils.positional(2)
def fetch_async(self, limit=None, **q_options):
"""Fetch a list of query results, up to a limit.
This is the asynchronous version of Query.fetch().
"""
if limit is None:
default_options = self._make_options(q_options)
if default_options is not None and default_options.limit is not None:
limit = default_options.limit
else:
limit = _MAX_LIMIT
q_options['limit'] = limit
q_options.setdefault('batch_size', limit)
if self._needs_multi_query():
return self.map_async(None, **q_options)
# Optimization using direct batches.
options = self._make_options(q_options)
qry = self._fix_namespace()
return qry._run_to_list([], options=options)
def get(self, **q_options):
"""Get the first query result, if any.
This is similar to calling q.fetch(1) and returning the first item
of the list of results, if any, otherwise None.
Args:
**q_options: All query options keyword arguments are supported.
Returns:
A single result, or None if there are no results.
"""
return self.get_async(**q_options).get_result()
def get_async(self, **q_options):
"""Get the first query result, if any.
This is the asynchronous version of Query.get().
"""
qry = self._fix_namespace()
return qry._get_async(**q_options)
@tasklets.tasklet
def _get_async(self, **q_options):
"""Internal version of get_async()."""
res = yield self.fetch_async(1, **q_options)
if not res:
raise tasklets.Return(None)
raise tasklets.Return(res[0])
@utils.positional(2)
def count(self, limit=None, **q_options):
"""Count the number of query results, up to a limit.
This returns the same result as len(q.fetch(limit)) but more
efficiently.
Note that you must pass a maximum value to limit the amount of
work done by the query.
Args:
limit: How many results to count at most.
**q_options: All query options keyword arguments are supported.
Returns:
"""
return self.count_async(limit, **q_options).get_result()
@utils.positional(2)
def count_async(self, limit=None, **q_options):
"""Count the number of query results, up to a limit.
This is the asynchronous version of Query.count().
"""
qry = self._fix_namespace()
return qry._count_async(limit=limit, **q_options)
@tasklets.tasklet
def _count_async(self, limit=None, **q_options):
"""Internal version of count_async()."""
# TODO: Support offset by incorporating it to the limit.
if 'offset' in q_options:
raise NotImplementedError('.count() and .count_async() do not support '
'offsets at present.')
if 'limit' in q_options:
raise TypeError('Cannot specify limit as a non-keyword argument and as a '
'keyword argument simultaneously.')
elif limit is None:
limit = _MAX_LIMIT
if self._needs_multi_query():
# _MultiQuery does not support iterating over result batches,
# so just fetch results and count them.
# TODO: Use QueryIterator to avoid materializing the results list.
q_options.setdefault('batch_size', limit)
q_options.setdefault('keys_only', True)
results = yield self.fetch_async(limit, **q_options)
raise tasklets.Return(len(results))
# Issue a special query requesting 0 results at a given offset.
# The skipped_results count will tell us how many hits there were
# before that offset without fetching the items.
q_options['offset'] = limit
q_options['limit'] = 0
options = self._make_options(q_options)
conn = tasklets.get_context()._conn
dsquery = self._get_query(conn)
rpc = dsquery.run_async(conn, options)
total = 0
while rpc is not None:
batch = yield rpc
rpc = batch.next_batch_async(options)
total += batch.skipped_results
raise tasklets.Return(total)
@utils.positional(2)
def fetch_page(self, page_size, **q_options):
"""Fetch a page of results.
This is a specialized method for use by paging user interfaces.
Args:
page_size: The requested page size. At most this many results
will be returned.
In addition, any keyword argument supported by the QueryOptions
class is supported. In particular, to fetch the next page, you
pass the cursor returned by one call to the next call using
start_cursor=<cursor>. A common idiom is to pass the cursor to
the client using <cursor>.to_websafe_string() and to reconstruct
that cursor on a subsequent request using
Cursor.from_websafe_string(<string>).
Returns:
A tuple (results, cursor, more) where results is a list of query
results, cursor is a cursor pointing just after the last result
returned, and more is a bool indicating whether there are
(likely) more results after that.
"""
# NOTE: page_size can't be passed as a keyword.
return self.fetch_page_async(page_size, **q_options).get_result()
@utils.positional(2)
def fetch_page_async(self, page_size, **q_options):
"""Fetch a page of results.
This is the asynchronous version of Query.fetch_page().
"""
qry = self._fix_namespace()
return qry._fetch_page_async(page_size, **q_options)
@tasklets.tasklet
def _fetch_page_async(self, page_size, **q_options):
"""Internal version of fetch_page_async()."""
q_options.setdefault('batch_size', page_size)
q_options.setdefault('produce_cursors', True)
it = self.iter(limit=page_size + 1, **q_options)
results = []
while (yield it.has_next_async()):
results.append(it.next())
if len(results) >= page_size:
break
try:
cursor = it.cursor_after()
except datastore_errors.BadArgumentError:
cursor = None
raise tasklets.Return(results, cursor, it.probably_has_next())
def _make_options(self, q_options):
"""Helper to construct a QueryOptions object from keyword arguments.
Args:
q_options: a dict of keyword arguments.
Note that either 'options' or 'config' can be used to pass another
QueryOptions object, but not both. If another QueryOptions object is
given it provides default values.
If self.default_options is set, it is used to provide defaults,
which have a lower precedence than options set in q_options.
Returns:
A QueryOptions object, or None if q_options is empty.
"""
if not (q_options or self.__projection):
return self.default_options
if 'options' in q_options:
# Move 'options' to 'config' since that is what QueryOptions() uses.
if 'config' in q_options:
raise TypeError('You cannot use config= and options= at the same time')
q_options['config'] = q_options.pop('options')
if q_options.get('projection'):
try:
q_options['projection'] = self._to_property_names(
q_options['projection'])
except TypeError, e:
raise datastore_errors.BadArgumentError(e)
self._check_properties(q_options['projection'])
options = QueryOptions(**q_options)
# Populate projection if it hasn't been overridden.
if (options.keys_only is None and
options.projection is None and
self.__projection):
options = QueryOptions(
projection=self._to_property_names(self.__projection), config=options)
# Populate default options
if self.default_options is not None:
options = self.default_options.merge(options)
return options
def _to_property_names(self, properties):
if not isinstance(properties, (list, tuple)):
properties = [properties] # It will be type-checked below.
fixed = []
for proj in properties:
if isinstance(proj, basestring):
fixed.append(proj)
elif isinstance(proj, model.Property):
fixed.append(proj._name)
else:
raise TypeError(
'Unexpected property (%r); should be string or Property' % (proj,))
return fixed
def _check_properties(self, fixed, **kwargs):
modelclass = model.Model._kind_map.get(self.__kind)
if modelclass is not None:
modelclass._check_properties(fixed, **kwargs)
def analyze(self):
"""Return a list giving the parameters required by a query."""
class MockBindings(dict):
def __contains__(self, key):
self[key] = None
return True
bindings = MockBindings()
used = {}
ancestor = self.ancestor
if isinstance(ancestor, ParameterizedThing):
ancestor = ancestor.resolve(bindings, used)
filters = self.filters
if filters is not None:
filters = filters.resolve(bindings, used)
return sorted(used) # Returns only the keys.
def bind(self, *args, **kwds):
"""Bind parameter values. Returns a new Query object."""
return self._bind(args, kwds)
def _bind(self, args, kwds):
"""Bind parameter values. Returns a new Query object."""
bindings = dict(kwds)
for i, arg in enumerate(args):
bindings[i + 1] = arg
used = {}
ancestor = self.ancestor
if isinstance(ancestor, ParameterizedThing):
ancestor = ancestor.resolve(bindings, used)
filters = self.filters
if filters is not None:
filters = filters.resolve(bindings, used)
unused = []
for i in xrange(1, 1 + len(args)):
if i not in used:
unused.append(i)
if unused:
raise datastore_errors.BadArgumentError(
'Positional arguments %s were given but not used.' %
', '.join(str(i) for i in unused))
return self.__class__(kind=self.kind, ancestor=ancestor,
filters=filters, orders=self.orders,
app=self.app, namespace=self.namespace,
default_options=self.default_options,
projection=self.projection, group_by=self.group_by)
def gql(query_string, *args, **kwds):
"""Parse a GQL query string.
Args:
query_string: Full GQL query, e.g. 'SELECT * FROM Kind WHERE prop = 1'.
*args, **kwds: If present, used to call bind().
Returns:
An instance of query_class.
"""
qry = _gql(query_string)
if args or kwds:
qry = qry._bind(args, kwds)
return qry
@utils.positional(1)
def _gql(query_string, query_class=Query):
"""Parse a GQL query string (internal version).
Args:
query_string: Full GQL query, e.g. 'SELECT * FROM Kind WHERE prop = 1'.
query_class: Optional class to use, default Query.
Returns:
An instance of query_class.
"""
from .google_imports import gql # Late import, to avoid name conflict.
gql_qry = gql.GQL(query_string)
kind = gql_qry.kind()
if kind is None:
# The query must be lacking a "FROM <kind>" class. Let Expando
# stand in for the model class (it won't actually be used to
# construct the results).
modelclass = model.Expando
else:
modelclass = model.Model._kind_map.get(kind)
if modelclass is None:
# If the Adapter has a default model, use it; raise KindError otherwise.
ctx = tasklets.get_context()
modelclass = ctx._conn.adapter.default_model
if modelclass is None:
raise model.KindError(
"No model class found for kind %r. Did you forget to import it?" %
(kind,))
else:
# Adjust kind to the model class's kind (for PolyModel).
kind = modelclass._get_kind()
ancestor = None
flt = gql_qry.filters()
filters = list(modelclass._default_filters())
for name_op in sorted(flt):
name, op = name_op
values = flt[name_op]
op = op.lower()
if op == 'is' and name == gql.GQL._GQL__ANCESTOR:
if len(values) != 1:
raise ValueError('"is" requires exactly one value')
[(func, args)] = values
ancestor = _args_to_val(func, args)
continue
if op not in _OPS:
raise NotImplementedError('Operation %r is not supported.' % op)
for (func, args) in values:
val = _args_to_val(func, args)
prop = _get_prop_from_modelclass(modelclass, name)
if prop._name != name:
raise RuntimeError('Whoa! _get_prop_from_modelclass(%s, %r) '
'returned a property whose name is %r?!' %
(modelclass.__name__, name, prop._name))
if isinstance(val, ParameterizedThing):
node = ParameterNode(prop, op, val)
elif op == 'in':
node = prop._IN(val)
else:
node = prop._comparison(op, val)
filters.append(node)
if filters:
filters = ConjunctionNode(*filters)
else:
filters = None
orders = _orderings_to_orders(gql_qry.orderings(), modelclass)
offset = gql_qry.offset()
limit = gql_qry.limit()
if limit < 0:
limit = None
keys_only = gql_qry._keys_only
if not keys_only:
keys_only = None
options = QueryOptions(offset=offset, limit=limit, keys_only=keys_only)
projection = gql_qry.projection()
if gql_qry.is_distinct():
group_by = projection
else:
group_by = None
qry = query_class(kind=kind,
ancestor=ancestor,
filters=filters,
orders=orders,
default_options=options,
projection=projection,
group_by=group_by)
return qry
class QueryIterator(object):
"""This iterator works both for synchronous and async callers!
For synchronous callers, just use:
for entity in Account.query():
<use entity>
Async callers use this idiom:
it = iter(Account.query())
while (yield it.has_next_async()):
entity = it.next()
<use entity>
You can also use q.iter([options]) instead of iter(q); this allows
passing query options such as keys_only or produce_cursors.
When keys_only is set, it.next() returns a key instead of an entity.
When produce_cursors is set, the methods it.cursor_before() and
it.cursor_after() return Cursor objects corresponding to the query
position just before and after the item returned by it.next().
Before it.next() is called for the first time, both raise an
exception. Once the loop is exhausted, both return the cursor after
the last item returned. Calling it.has_next() does not affect the
cursors; you must call it.next() before the cursors move. Note that
sometimes requesting a cursor requires a datastore roundtrip (but
not if you happen to request a cursor corresponding to a batch
boundary). If produce_cursors is not set, both methods always raise
an exception.
Note that queries requiring in-memory merging of multiple queries
(i.e. queries using the IN, != or OR operators) do not support query
options.
"""
# When produce_cursors is set, _lookahead collects (batch, index)
# pairs passed to _extended_callback(), and (_batch, _index)
# contain the info pertaining to the current item.
_lookahead = None
_batch = None
_index = None
# Indicate the loop is exhausted.
_exhausted = False
@utils.positional(2)
def __init__(self, query, **q_options):
"""Constructor. Takes a Query and query options.
This is normally called by Query.iter() or Query.__iter__().
"""
ctx = tasklets.get_context()
options = query._make_options(q_options)
callback = self._extended_callback
self._iter = ctx.iter_query(query,
callback=callback,
pass_batch_into_callback=True,
options=options)
self._fut = None
def _extended_callback(self, batch, index, ent):
if self._exhausted:
raise RuntimeError('QueryIterator is already exhausted')
# TODO: Make _lookup a deque.
if self._lookahead is None:
self._lookahead = []
self._lookahead.append((batch, index))
return ent
def _consume_item(self):
if self._lookahead:
self._batch, self._index = self._lookahead.pop(0)
else:
self._batch = self._index = None
def cursor_before(self):
"""Return the cursor before the current item.
You must pass a QueryOptions object with produce_cursors=True
for this to work.
If there is no cursor or no current item, raise BadArgumentError.
Before next() has returned there is no cursor. Once the loop is
exhausted, this returns the cursor after the last item.
"""
if self._batch is None:
raise datastore_errors.BadArgumentError('There is no cursor currently')
# TODO: if cursor_after() was called for the previous item
# reuse that result instead of computing it from scratch.
# (Some cursor() calls make a datastore roundtrip.)
# TODO: reimplement the cursor() call to use NDB async I/O;
# perhaps even add async versions of cursor_before/after.
return self._batch.cursor(self._index + self._exhausted)
def cursor_after(self):
"""Return the cursor after the current item.
You must pass a QueryOptions object with produce_cursors=True
for this to work.
If there is no cursor or no current item, raise BadArgumentError.
Before next() has returned there is no cursor. Once the loop is
exhausted, this returns the cursor after the last item.
"""
if self._batch is None:
raise datastore_errors.BadArgumentError('There is no cursor currently')
return self._batch.cursor(self._index + 1) # TODO: inline this as async.
def index_list(self):
"""Return the list of indexes used for this query.
This returns a list of index representations, where an index
representation is the same as what is returned by get_indexes().
Before the first result, the information is unavailable, and then
None is returned. This is not the same as an empty list -- the
empty list means that no index was used to execute the query. (In
the dev_appserver, an empty list may also mean that only built-in
indexes were used; metadata queries also return an empty list
here.)
Proper use is as follows:
q = <modelclass>.query(<filters>)
i = q.iter()
try:
i.next()
except Stopiteration:
pass
indexes = i.index_list()
assert isinstance(indexes, list)
Notes:
- Forcing produce_cursors=False makes this always return None.
- This always returns None for a multi-query.
"""
# TODO: Technically it is possible to implement this for
# multi-query by merging all the index lists from each subquery.
# Return None if the batch has no attribute index_list.
# This also applies when the batch itself is None.
return getattr(self._batch, 'index_list', None)
def __iter__(self):
"""Iterator protocol: get the iterator for this iterator, i.e. self."""
return self
def probably_has_next(self):
"""Return whether a next item is (probably) available.
This is not quite the same as has_next(), because when
produce_cursors is set, some shortcuts are possible. However, in
some cases (e.g. when the query has a post_filter) we can get a
false positive (returns True but next() will raise StopIteration).
There are no false negatives, if Batch.more_results doesn't lie.
"""
if self._lookahead:
return True
if self._batch is not None:
return self._batch.more_results
return self.has_next()
def has_next(self):
"""Return whether a next item is available.
See the module docstring for the usage pattern.
"""
return self.has_next_async().get_result()
@tasklets.tasklet
def has_next_async(self):
"""Return a Future whose result will say whether a next item is available.
See the module docstring for the usage pattern.
"""
if self._fut is None:
self._fut = self._iter.getq()
flag = True
try:
yield self._fut
except EOFError:
flag = False
raise tasklets.Return(flag)
def next(self):
"""Iterator protocol: get next item or raise StopIteration."""
if self._fut is None:
self._fut = self._iter.getq()
try:
try:
ent = self._fut.get_result()
self._consume_item()
return ent
except EOFError:
self._exhausted = True
raise StopIteration
finally:
self._fut = None
class _SubQueryIteratorState(object):
"""Helper class for _MultiQuery."""
def __init__(self, batch_i_entity, iterator, dsquery, orders):
batch, index, entity = batch_i_entity
self.batch = batch
self.index = index
self.entity = entity
self.iterator = iterator
self.dsquery = dsquery
self.orders = orders
def __cmp__(self, other):
if not isinstance(other, _SubQueryIteratorState):
raise NotImplementedError('Can only compare _SubQueryIteratorState '
'instances to other _SubQueryIteratorState '
'instances; not %r' % other)
if not self.orders == other.orders:
raise NotImplementedError('Cannot compare _SubQueryIteratorStates with '
'differing orders (%r != %r)' %
(self.orders, other.orders))
lhs = self.entity._orig_pb
rhs = other.entity._orig_pb
lhs_filter = self.dsquery._filter_predicate
rhs_filter = other.dsquery._filter_predicate
names = self.orders._get_prop_names()
# TODO: In some future version, there won't be a need to add the
# filters' names.
if lhs_filter is not None:
names |= lhs_filter._get_prop_names()
if rhs_filter is not None:
names |= rhs_filter._get_prop_names()
lhs_value_map = datastore_query._make_key_value_map(lhs, names)
rhs_value_map = datastore_query._make_key_value_map(rhs, names)
if lhs_filter is not None:
lhs_filter._prune(lhs_value_map)
if rhs_filter is not None:
rhs_filter._prune(rhs_value_map)
return self.orders._cmp(lhs_value_map, rhs_value_map)
class _MultiQuery(object):
"""Helper class to run queries involving !=, IN or OR operators."""
# This is not instantiated by the user directly, but implicitly when
# iterating over a query with at least one filter using an IN, OR or
# != operator. Note that some options must be interpreted by
# _MultiQuery instead of passed to the underlying Queries' methods,
# e.g. offset (though not necessarily limit, and I'm not sure about
# cursors).
# TODO: Need a way to specify the unification of two queries that
# are identical except one has an ancestor and the other doesn't.
# The HR datastore makes that a useful special case.
def __init__(self, subqueries):
if not isinstance(subqueries, list):
raise TypeError('subqueries must be a list; received %r' % subqueries)
for subq in subqueries:
if not isinstance(subq, Query):
raise TypeError('Each subquery must be a Query instances; received %r'
% subq)
first_subquery = subqueries[0]
kind = first_subquery.kind
orders = first_subquery.orders
if not kind:
raise ValueError('Subquery kind cannot be missing')
for subq in subqueries[1:]:
if subq.kind != kind:
raise ValueError('Subqueries must be for a common kind (%s != %s)' %
(subq.kind, kind))
elif subq.orders != orders:
raise ValueError('Subqueries must have the same order(s) (%s != %s)' %
(subq.orders, orders))
# TODO: Ensure that app and namespace match, when we support them.
self.__subqueries = subqueries
self.__orders = orders
self.ancestor = None # Hack for map_query().
def _make_options(self, q_options):
return self.__subqueries[0].default_options
@property
def orders(self):
return self.__orders
@property
def default_options(self):
return self.__subqueries[0].default_options
@tasklets.tasklet
def run_to_queue(self, queue, conn, options=None):
"""Run this query, putting entities into the given queue."""
if options is None:
# Default options.
offset = None
limit = None
keys_only = None
else:
# Capture options we need to simulate.
offset = options.offset
limit = options.limit
keys_only = options.keys_only
# Cursors are supported for certain orders only.
if (options.start_cursor or options.end_cursor or
options.produce_cursors):
names = set()
if self.__orders is not None:
names = self.__orders._get_prop_names()
if '__key__' not in names:
raise datastore_errors.BadArgumentError(
'_MultiQuery with cursors requires __key__ order')
# Decide if we need to modify the options passed to subqueries.
# NOTE: It would seem we can sometimes let the datastore handle
# the offset natively, but this would thwart the duplicate key
# detection, so we always have to emulate the offset here.
# We can set the limit we pass along to offset + limit though,
# since that is the maximum number of results from a single
# subquery we will ever have to consider.
modifiers = {}
if offset:
modifiers['offset'] = None
if limit is not None:
modifiers['limit'] = min(_MAX_LIMIT, offset + limit)
if keys_only and self.__orders is not None:
modifiers['keys_only'] = None
if modifiers:
options = QueryOptions(config=options, **modifiers)
if offset is None:
offset = 0
if limit is None:
limit = _MAX_LIMIT
if self.__orders is None:
# Run the subqueries sequentially; there is no order to keep.
keys_seen = set()
for subq in self.__subqueries:
if limit <= 0:
break
subit = tasklets.SerialQueueFuture('_MultiQuery.run_to_queue[ser]')
subq.run_to_queue(subit, conn, options=options)
while limit > 0:
try:
batch, index, result = yield subit.getq()
except EOFError:
break
if keys_only:
key = result
else:
key = result._key
if key not in keys_seen:
keys_seen.add(key)
if offset > 0:
offset -= 1
else:
limit -= 1
queue.putq((None, None, result))
queue.complete()
return
# This with-statement causes the adapter to set _orig_pb on all
# entities it converts from protobuf.
# TODO: Does this interact properly with the cache?
with conn.adapter:
# Start running all the sub-queries.
todo = [] # List of (subit, dsquery) tuples.
for subq in self.__subqueries:
dsquery = subq._get_query(conn)
subit = tasklets.SerialQueueFuture('_MultiQuery.run_to_queue[par]')
subq.run_to_queue(subit, conn, options=options, dsquery=dsquery)
todo.append((subit, dsquery))
# Create a list of (first-entity, subquery-iterator) tuples.
state = [] # List of _SubQueryIteratorState instances.
for subit, dsquery in todo:
try:
thing = yield subit.getq()
except EOFError:
continue
else:
state.append(_SubQueryIteratorState(thing, subit, dsquery,
self.__orders))
# Now turn it into a sorted heap. The heapq module claims that
# calling heapify() is more efficient than calling heappush() for
# each item.
heapq.heapify(state)
# Repeatedly yield the lowest entity from the state vector,
# filtering duplicates. This is essentially a multi-way merge
# sort. One would think it should be possible to filter
# duplicates simply by dropping other entities already in the
# state vector that are equal to the lowest entity, but because of
# the weird sorting of repeated properties, we have to explicitly
# keep a set of all keys, so we can remove later occurrences.
# Note that entities will still be sorted correctly, within the
# constraints given by the sort order.
keys_seen = set()
while state and limit > 0:
item = heapq.heappop(state)
batch = item.batch
index = item.index
entity = item.entity
key = entity._key
if key not in keys_seen:
keys_seen.add(key)
if offset > 0:
offset -= 1
else:
limit -= 1
if keys_only:
queue.putq((batch, index, key))
else:
queue.putq((batch, index, entity))
subit = item.iterator
try:
batch, index, entity = yield subit.getq()
except EOFError:
pass
else:
item.batch = batch
item.index = index
item.entity = entity
heapq.heappush(state, item)
queue.complete()
# Datastore API using the default context.
def iter(self, **q_options):
return QueryIterator(self, **q_options)
__iter__ = iter
# TODO: Add fetch() etc.?
# Helper functions to convert between orders and orderings. An order
# is a datastore_query.Order instance. An ordering is a
# (property_name, direction) tuple.
def _order_to_ordering(order):
pb = order._to_pb()
return pb.property(), pb.direction() # TODO: What about UTF-8?
def _orders_to_orderings(orders):
if orders is None:
return []
if isinstance(orders, datastore_query.PropertyOrder):
return [_order_to_ordering(orders)]
if isinstance(orders, datastore_query.CompositeOrder):
# TODO: What about UTF-8?
return [(pb.property(), pb.direction()) for pb in orders._to_pbs()]
raise ValueError('Bad order: %r' % (orders,))
def _ordering_to_order(ordering, modelclass):
name, direction = ordering
prop = _get_prop_from_modelclass(modelclass, name)
if prop._name != name:
raise RuntimeError('Whoa! _get_prop_from_modelclass(%s, %r) '
'returned a property whose name is %r?!' %
(modelclass.__name__, name, prop._name))
return datastore_query.PropertyOrder(name, direction)
def _orderings_to_orders(orderings, modelclass):
orders = [_ordering_to_order(o, modelclass) for o in orderings]
if not orders:
return None
if len(orders) == 1:
return orders[0]
return datastore_query.CompositeOrder(orders)