#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""SQlite-based stub for the Python datastore API.
Entities are stored in an sqlite database in a similar fashion to the production
datastore.
Transactions are serialized through __tx_lock. Each transaction acquires it
when it begins and releases it when it commits or rolls back.
"""
import array
import itertools
import logging
import threading
import weakref
from google.appengine.datastore import entity_pb
from google.appengine.api import apiproxy_stub
from google.appengine.api import datastore_types
from google.appengine.datastore import datastore_pb
from google.appengine.datastore import datastore_stub_util
from google.appengine.datastore import sortable_pb_encoder
from google.appengine.runtime import apiproxy_errors
try:
import pysqlite2.dbapi2 as sqlite3
except ImportError:
import sqlite3
import __builtin__
buffer = __builtin__.buffer
datastore_pb.Query.__hash__ = lambda self: hash(self.Encode())
_MAX_TIMEOUT = 5.0
_OPERATOR_MAP = {
datastore_pb.Query_Filter.LESS_THAN: '<',
datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL: '<=',
datastore_pb.Query_Filter.EQUAL: '=',
datastore_pb.Query_Filter.GREATER_THAN: '>',
datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL: '>=',
'!=': '!=',
}
_ORDER_MAP = {
datastore_pb.Query_Order.ASCENDING: 'ASC',
datastore_pb.Query_Order.DESCENDING: 'DESC',
}
_CORE_SCHEMA = """
CREATE TABLE IF NOT EXISTS Apps (
app_id TEXT NOT NULL PRIMARY KEY,
indexes BLOB);
CREATE TABLE IF NOT EXISTS Namespaces (
app_id TEXT NOT NULL,
name_space TEXT NOT NULL,
PRIMARY KEY (app_id, name_space));
CREATE TABLE IF NOT EXISTS IdSeq (
prefix TEXT NOT NULL PRIMARY KEY,
next_id INT NOT NULL);
CREATE TABLE IF NOT EXISTS ScatteredIdCounters (
prefix TEXT NOT NULL PRIMARY KEY,
next_id INT NOT NULL);
"""
_NAMESPACE_SCHEMA = """
CREATE TABLE "%(prefix)s!Entities" (
__path__ BLOB NOT NULL PRIMARY KEY,
kind TEXT NOT NULL,
entity BLOB NOT NULL);
CREATE INDEX "%(prefix)s!EntitiesByKind" ON "%(prefix)s!Entities" (
kind ASC,
__path__ ASC);
CREATE TABLE "%(prefix)s!EntitiesByProperty" (
kind TEXT NOT NULL,
name TEXT NOT NULL,
value BLOB NOT NULL,
__path__ BLOB NOT NULL REFERENCES Entities,
PRIMARY KEY(kind ASC, name ASC, value ASC, __path__ ASC) ON CONFLICT IGNORE);
CREATE INDEX "%(prefix)s!EntitiesByPropertyDesc"
ON "%(prefix)s!EntitiesByProperty" (
kind ASC,
name ASC,
value DESC,
__path__ ASC);
CREATE INDEX "%(prefix)s!EntitiesByPropertyKey"
ON "%(prefix)s!EntitiesByProperty" (
__path__ ASC);
INSERT OR IGNORE INTO Apps (app_id) VALUES ('%(app_id)s');
INSERT INTO Namespaces (app_id, name_space)
VALUES ('%(app_id)s', '%(name_space)s');
INSERT OR IGNORE INTO IdSeq VALUES ('%(prefix)s', 1);
INSERT OR IGNORE INTO ScatteredIdCounters VALUES ('%(prefix)s', 1);
"""
class SQLiteCursorWrapper(sqlite3.Cursor):
"""Substitutes sqlite3.Cursor, with a cursor that logs commands.
Inherits from sqlite3.Cursor class and extends methods such as execute,
executemany and execute script, so it logs SQL calls.
"""
def execute(self, sql, *args):
"""Replaces execute() with a logging variant."""
if args:
parameters = []
for arg in args:
if isinstance(arg, buffer):
parameters.append('<blob>')
else:
parameters.append(repr(arg))
logging.debug('SQL Execute: %s - \n %s', sql,
'\n '.join(str(param) for param in parameters))
else:
logging.debug(sql)
return super(SQLiteCursorWrapper, self).execute(sql, *args)
def executemany(self, sql, seq_parameters):
"""Replaces executemany() with a logging variant."""
seq_parameters_list = list(seq_parameters)
logging.debug('SQL ExecuteMany: %s - \n %s', sql,
'\n '.join(str(param) for param in seq_parameters_list))
return super(SQLiteCursorWrapper, self).executemany(sql,
seq_parameters_list)
def executescript(self, sql):
"""Replaces executescript() with a logging variant."""
logging.debug('SQL ExecuteScript: %s', sql)
return super(SQLiteCursorWrapper, self).executescript(sql)
class SQLiteConnectionWrapper(sqlite3.Connection):
"""Substitutes sqlite3.Connection with a connection that logs commands.
Inherits from sqlite3.Connection class and overrides cursor
replacing the default cursor with an instance of SQLiteCursorWrapper. This
automatically makes execute, executemany, executescript and others use the
SQLiteCursorWrapper.
"""
def cursor(self):
"""Substitutes standard cursor() with a SQLiteCursorWrapper to log queries.
Substitutes the standard sqlite.Cursor with SQLiteCursorWrapper to ensure
all cursor requests get intercepted.
Returns:
A SQLiteCursorWrapper Instance.
"""
return super(SQLiteConnectionWrapper, self).cursor(SQLiteCursorWrapper)
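# Minimal usage sketch for the logging wrappers, assuming logging is
# configured at DEBUG level; the stub wires this in as the connection
# factory when verbose=True (see DatastoreSqliteStub.__init__):
#
#   conn = sqlite3.connect(':memory:', factory=SQLiteConnectionWrapper)
#   conn.execute('SELECT 1')  # logged through SQLiteCursorWrapper.execute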
def ReferencePropertyToReference(refprop):
ref = entity_pb.Reference()
ref.set_app(refprop.app())
if refprop.has_name_space():
ref.set_name_space(refprop.name_space())
for pathelem in refprop.pathelement_list():
ref.mutable_path().add_element().CopyFrom(pathelem)
return ref
def _DedupingEntityGenerator(cursor):
"""Generator that removes duplicate entities from the results.
Generate datastore entities from a cursor, skipping the duplicates
Args:
cursor: a SQLite3.Cursor or subclass.
Yields:
Entities that do not share a key.
"""
seen = set()
for row in cursor:
row_key, row_entity = row[:2]
encoded_row_key = str(row_key)
if encoded_row_key in seen:
continue
seen.add(encoded_row_key)
entity = entity_pb.EntityProto(row_entity)
datastore_stub_util.PrepareSpecialPropertiesForLoad(entity)
yield entity
def _ProjectionPartialEntityGenerator(cursor):
"""Generator that creates partial entities for projection.
Generate partial datastore entities from a cursor, holding only the values
being projected. These entities might share a key.
Args:
cursor: a SQLite3.Cursor or subclass.
Yields:
Partial entities resulting from the projection.
"""
for row in cursor:
entity_original = entity_pb.EntityProto(row[1])
entity = entity_pb.EntityProto()
entity.mutable_key().MergeFrom(entity_original.key())
entity.mutable_entity_group().MergeFrom(entity_original.entity_group())
for name, value_data in zip(row[2::2], row[3::2]):
prop_to_add = entity.add_property()
prop_to_add.set_name(ToUtf8(name))
value_decoder = sortable_pb_encoder.Decoder(
array.array('B', str(value_data)))
prop_to_add.mutable_value().Merge(value_decoder)
prop_to_add.set_multiple(False)
datastore_stub_util.PrepareSpecialPropertiesForLoad(entity)
yield entity
def MakeEntityForQuery(query, *path):
"""Make an entity to be returned by a pseudo-kind query.
Args:
query: the query which will return the entity.
    path: pairs of type/name-or-id values specifying the entity's key.
Returns:
An entity_pb.EntityProto with app and namespace as in query and the key
specified by path.
"""
pseudo_pb = entity_pb.EntityProto()
pseudo_pb.mutable_entity_group()
pseudo_pk = pseudo_pb.mutable_key()
pseudo_pk.set_app(query.app())
if query.has_name_space():
pseudo_pk.set_name_space(query.name_space())
for i in xrange(0, len(path), 2):
pseudo_pe = pseudo_pk.mutable_path().add_element()
pseudo_pe.set_type(path[i])
if isinstance(path[i + 1], basestring):
pseudo_pe.set_name(path[i + 1])
else:
pseudo_pe.set_id(path[i + 1])
return pseudo_pb
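# Illustrative example: KindPseudoKind builds its results as
#   MakeEntityForQuery(query, '__kind__', 'Greeting')
# which yields an entity whose key path has a single element of type
# '__kind__' and name 'Greeting' ('Greeting' being a hypothetical kind).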
def ToUtf8(s):
"""Encoded s in utf-8 if it is an unicode string."""
if isinstance(s, unicode):
return s.encode('utf-8')
else:
return s
class KindPseudoKind(object):
"""Pseudo-kind for __kind__ queries.
Provides a Query method to perform the actual query.
Public properties:
name: the pseudo-kind name
"""
name = '__kind__'
def Query(self, query, filters, orders):
"""Perform a query on this pseudo-kind.
Args:
query: the original datastore_pb.Query
filters: the filters from query
orders: the orders from query
Returns:
A query cursor to iterate over the query results, or None if the query
is invalid.
"""
kind_range = datastore_stub_util.ParseKindQuery(query, filters, orders)
conn = self._stub._GetConnection()
cursor = None
try:
prefix = self._stub._GetTablePrefix(query)
filters = []
def AddExtremeFilter(extreme, inclusive, is_end):
"""Add filter for kind start/end."""
if not is_end:
if inclusive:
op = datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL
else:
op = datastore_pb.Query_Filter.GREATER_THAN
else:
if inclusive:
op = datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL
else:
op = datastore_pb.Query_Filter.LESS_THAN
filters.append(('kind', op, extreme))
kind_range.MapExtremes(AddExtremeFilter)
params = []
sql_filters = self._stub._CreateFilterString(filters, params)
sql_stmt = ('SELECT kind FROM "%s!Entities" %s GROUP BY kind'
% (prefix, sql_filters))
c = conn.execute(sql_stmt, params)
kinds = []
for row in c.fetchall():
kinds.append(MakeEntityForQuery(query, self.name, ToUtf8(row[0])))
cursor = datastore_stub_util._ExecuteQuery(kinds, query, [], [], [])
finally:
self._stub._ReleaseConnection(conn)
return cursor
class PropertyPseudoKind(object):
"""Pseudo-kind for __property__ queries.
Provides a Query method to perform the actual query.
Public properties:
name: the pseudo-kind name
"""
name = '__property__'
def Query(self, query, filters, orders):
"""Perform a query on this pseudo-kind.
Args:
query: the original datastore_pb.Query
filters: the filters from query
orders: the orders from query
Returns:
A query cursor to iterate over the query results, or None if the query
is invalid.
"""
property_range = datastore_stub_util.ParsePropertyQuery(query, filters,
orders)
keys_only = query.keys_only()
conn = self._stub._GetConnection()
cursor = None
try:
prefix = self._stub._GetTablePrefix(query)
filters = []
def AddExtremeFilter(extreme, inclusive, is_end):
"""Add filter for kind start/end."""
if not is_end:
op = datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL
else:
op = datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL
filters.append(('kind', op, extreme[0]))
property_range.MapExtremes(AddExtremeFilter)
for name in datastore_stub_util.GetInvisibleSpecialPropertyNames():
filters.append(('name', '!=', name))
params = []
sql_filters = self._stub._CreateFilterString(filters, params)
if not keys_only:
sql_stmt = ('SELECT kind, name, value FROM "%s!EntitiesByProperty" %s '
'GROUP BY kind, name, substr(value, 1, 1) '
'ORDER BY kind, name'
% (prefix, sql_filters))
else:
sql_stmt = ('SELECT kind, name FROM "%s!EntitiesByProperty" %s '
'GROUP BY kind, name ORDER BY kind, name'
% (prefix, sql_filters))
c = conn.execute(sql_stmt, params)
properties = []
kind = None
name = None
property_pb = None
for row in c.fetchall():
if not (row[0] == kind and row[1] == name):
if not property_range.Contains((row[0], row[1])):
continue
kind, name = row[:2]
if property_pb:
properties.append(property_pb)
property_pb = MakeEntityForQuery(query, KindPseudoKind.name,
ToUtf8(kind),
self.name, ToUtf8(name))
if not keys_only:
value_data = row[2]
value_decoder = sortable_pb_encoder.Decoder(
array.array('B', str(value_data)))
raw_value_pb = entity_pb.PropertyValue()
raw_value_pb.Merge(value_decoder)
tag = datastore_types.GetPropertyValueTag(raw_value_pb)
tag_name = datastore_stub_util._PROPERTY_TYPE_NAMES[tag]
representation_pb = property_pb.add_property()
representation_pb.set_name('property_representation')
representation_pb.set_multiple(True)
representation_pb.mutable_value().set_stringvalue(tag_name)
if property_pb:
properties.append(property_pb)
cursor = datastore_stub_util._ExecuteQuery(properties, query, [], [], [])
finally:
self._stub._ReleaseConnection(conn)
return cursor
class NamespacePseudoKind(object):
"""Pseudo-kind for __namespace__ queries.
Provides a Query method to perform the actual query.
Public properties:
name: the pseudo-kind name
"""
name = '__namespace__'
def Query(self, query, filters, orders):
"""Perform a query on this pseudo-kind.
Args:
query: the original datastore_pb.Query
filters: the filters from query
orders: the orders from query
Returns:
A query cursor to iterate over the query results, or None if the query
is invalid.
"""
namespace_range = datastore_stub_util.ParseNamespaceQuery(query, filters,
orders)
app_str = query.app()
namespace_entities = []
namespaces = self._stub._DatastoreSqliteStub__namespaces
for app_id, namespace in sorted(namespaces):
if app_id == app_str and namespace_range.Contains(namespace):
if namespace:
ns_id = namespace
else:
ns_id = datastore_types._EMPTY_NAMESPACE_ID
namespace_entities.append(MakeEntityForQuery(query, self.name, ns_id))
return datastore_stub_util._ExecuteQuery(namespace_entities, query,
[], [], [])
class DatastoreSqliteStub(datastore_stub_util.BaseDatastore,
apiproxy_stub.APIProxyStub,
datastore_stub_util.DatastoreStub):
"""Persistent stub for the Python datastore API.
Stores all entities in an SQLite database. A DatastoreSqliteStub instance
handles a single app's data.
"""
_MAX_QUERY_COMPONENTS = 63
READ_ERROR_MSG = ('Data in %s is corrupt or a different version. '
'Try running with the --clear_datastore flag.\n%r')
def __init__(self,
app_id,
datastore_file,
require_indexes=False,
verbose=False,
service_name='datastore_v3',
trusted=False,
consistency_policy=None,
root_path=None,
use_atexit=True,
auto_id_policy=datastore_stub_util.SEQUENTIAL):
"""Constructor.
Initializes the SQLite database if necessary.
Args:
app_id: string
datastore_file: string, path to sqlite database. Use None to create an
in-memory database.
require_indexes: bool, default False. If True, composite indexes must
exist in index.yaml for queries that need them.
verbose: bool, default False. If True, logs all select statements.
service_name: Service name expected for all calls.
trusted: bool, default False. If True, this stub allows an app to access
the data of another app.
consistency_policy: The consistency policy to use or None to use the
default. Consistency policies can be found in
datastore_stub_util.*ConsistencyPolicy
root_path: string, the root path of the app.
use_atexit: bool, indicates if the stub should save itself atexit.
auto_id_policy: enum, datastore_stub_util.SEQUENTIAL or .SCATTERED
"""
datastore_stub_util.BaseDatastore.__init__(self, require_indexes,
consistency_policy,
use_atexit and datastore_file,
auto_id_policy)
apiproxy_stub.APIProxyStub.__init__(self, service_name)
datastore_stub_util.DatastoreStub.__init__(self, weakref.proxy(self),
app_id, trusted, root_path)
self.__datastore_file = datastore_file
self.__verbose = verbose
self.__id_map_sequential = {}
self.__id_map_scattered = {}
self.__id_counter_tables = {
datastore_stub_util.SEQUENTIAL: ('IdSeq', self.__id_map_sequential),
datastore_stub_util.SCATTERED: ('ScatteredIdCounters',
self.__id_map_scattered),
}
self.__id_lock = threading.Lock()
if self.__verbose:
sql_conn = SQLiteConnectionWrapper
else:
sql_conn = sqlite3.Connection
self.__connection = sqlite3.connect(
self.__datastore_file or ':memory:',
timeout=_MAX_TIMEOUT,
check_same_thread=False,
factory=sql_conn)
self.__connection.text_factory = lambda x: unicode(x, 'utf-8', 'ignore')
self.__connection_lock = threading.RLock()
self.__namespaces = set()
self.__query_history = {}
self._RegisterPseudoKind(KindPseudoKind())
self._RegisterPseudoKind(PropertyPseudoKind())
self._RegisterPseudoKind(NamespacePseudoKind())
self._RegisterPseudoKind(datastore_stub_util.EntityGroupPseudoKind())
try:
self.__Init()
except sqlite3.DatabaseError, e:
raise apiproxy_errors.ApplicationError(datastore_pb.Error.INTERNAL_ERROR,
self.READ_ERROR_MSG %
(self.__datastore_file, e))
def __Init(self):
self.__connection.execute('PRAGMA synchronous = OFF')
self.__connection.executescript(_CORE_SCHEMA)
self.__connection.commit()
c = self.__connection.execute('SELECT app_id, name_space FROM Namespaces')
self.__namespaces = set(c.fetchall())
for app_ns in self.__namespaces:
prefix = ('%s!%s' % app_ns).replace('"', '""')
self.__connection.execute(
'INSERT OR IGNORE INTO ScatteredIdCounters VALUES (?, ?)',
(prefix, 1))
self.__connection.commit()
c = self.__connection.execute('SELECT app_id, indexes FROM Apps')
for _, index_proto in c.fetchall():
if not index_proto:
continue
indexes = datastore_pb.CompositeIndices(index_proto)
for index in indexes.index_list():
self._SideLoadIndex(index)
def Clear(self):
"""Clears the datastore."""
conn = self._GetConnection()
try:
datastore_stub_util.BaseDatastore.Clear(self)
datastore_stub_util.DatastoreStub.Clear(self)
c = conn.execute(
"SELECT tbl_name FROM sqlite_master WHERE type = 'table'")
for row in c.fetchall():
conn.execute('DROP TABLE "%s"' % row)
finally:
self._ReleaseConnection(conn)
self.__namespaces = set()
self.__id_map_sequential = {}
self.__id_map_scattered = {}
self.__Init()
def Read(self):
"""Reads the datastore from disk.
Noop for compatibility with file stub.
"""
pass
def Close(self):
"""Closes the SQLite connection and releases the files."""
conn = self._GetConnection()
conn.close()
@staticmethod
def __MakeParamList(size):
"""Returns a comma separated list of sqlite substitution parameters.
Args:
size: Number of parameters in returned list.
Returns:
A comma separated list of substitution parameters.
"""
return ','.join('?' * size)
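  # Example: __MakeParamList(3) returns '?,?,?', ready to interpolate into
  # an IN (...) clause as __DeleteRows does below.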
@staticmethod
def __GetEntityKind(key):
"""Returns the kind of the Entity or Key.
It selects the kind of the last element of the entity_group element
list, as it contains the most specific type of the key.
Args:
key: A Key or EntityProto
Returns:
The kind of the sent Key or Entity
"""
if isinstance(key, entity_pb.EntityProto):
key = key.key()
return key.path().element_list()[-1].type()
@staticmethod
def __EncodeIndexPB(pb):
"""Encodes a protobuf using sortable_pb_encoder to preserve entity order.
Using sortable_pb_encoder, encodes the protobuf, while using
the ordering semantics for the datastore, and validating for the special
case of uservalues ordering.
Args:
pb: An Entity protobuf.
Returns:
A buffer holding the encoded protobuf.
"""
if isinstance(pb, entity_pb.PropertyValue) and pb.has_uservalue():
userval = entity_pb.PropertyValue()
userval.mutable_uservalue().set_email(pb.uservalue().email())
userval.mutable_uservalue().set_auth_domain(pb.uservalue().auth_domain())
userval.mutable_uservalue().set_gaiaid(0)
pb = userval
encoder = sortable_pb_encoder.Encoder()
pb.Output(encoder)
return buffer(encoder.buffer().tostring())
@staticmethod
def __AddQueryParam(query_params, param):
"""Adds a parameter to the query parameters."""
query_params.append(param)
return len(query_params)
@staticmethod
def _CreateFilterString(filter_list, params):
"""Transforms a filter list into an SQL WHERE clause.
Args:
filter_list: The list of (property, operator, value) filters
to transform. A value_type of -1 indicates no value type comparison
should be done.
params: out: A list of parameters to pass to the query.
Returns:
An SQL 'where' clause.
"""
clauses = []
for prop, operator, value in filter_list:
if operator == datastore_pb.Query_Filter.EXISTS:
continue
sql_op = _OPERATOR_MAP[operator]
value_index = DatastoreSqliteStub.__AddQueryParam(params, value)
clauses.append('%s %s :%d' % (prop, sql_op, value_index))
filters = ' AND '.join(clauses)
if filters:
filters = 'WHERE ' + filters
return filters
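  # Illustrative example with a hypothetical filter list:
  #   params = []
  #   _CreateFilterString(
  #       [('kind', datastore_pb.Query_Filter.EQUAL, 'Greeting')], params)
  # returns 'WHERE kind = :1' and leaves params == ['Greeting']; EXISTS
  # filters contribute no clause.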
@staticmethod
def __CreateOrderString(order_list):
"""Returns an 'ORDER BY' clause from the given list of orders.
Args:
order_list: A list of (field, order) tuples.
Returns:
An SQL ORDER BY clause.
"""
orders = ', '.join('%s %s' % (x[0], _ORDER_MAP[x[1]]) for x in order_list)
if orders:
orders = 'ORDER BY ' + orders
return orders
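  # Illustrative example: [('kind', datastore_pb.Query_Order.ASCENDING)]
  # produces 'ORDER BY kind ASC'; an empty order list yields ''.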
def _GetConnection(self):
"""Retrieves a connection to the SQLite DB.
Returns:
An SQLite connection object.
"""
self.__connection_lock.acquire()
return self.__connection
def _ReleaseConnection(self, conn):
"""Releases a connection for use by other operations.
If a transaction is supplied, no action is taken.
Args:
conn: An SQLite connection object.
"""
conn.commit()
self.__connection_lock.release()
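  # Usage pattern used throughout this stub (sketch): bracketing work
  # between _GetConnection and _ReleaseConnection guarantees the commit and
  # the lock release even on error:
  #
  #   conn = self._GetConnection()
  #   try:
  #     conn.execute(...)
  #   finally:
  #     self._ReleaseConnection(conn)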
def __ConfigureNamespace(self, conn, prefix, app_id, name_space):
"""Ensures the relevant tables and indexes exist.
Args:
conn: An SQLite database connection.
prefix: The namespace prefix to configure.
app_id: The app ID.
name_space: The per-app namespace name.
"""
format_args = {'app_id': app_id, 'name_space': name_space, 'prefix': prefix}
conn.executescript(_NAMESPACE_SCHEMA % format_args)
conn.commit()
def __WriteIndexData(self, conn, app):
"""Writes index data to disk.
Args:
conn: An SQLite connection.
app: The app ID to write indexes for.
"""
indices = datastore_pb.CompositeIndices()
for index in self.GetIndexes(app, True, self._app_id):
indices.index_list().append(index)
    conn.execute('UPDATE Apps SET indexes = ? WHERE app_id = ?',
                 (buffer(indices.Encode()), app))
def _GetTablePrefix(self, data):
"""Returns the namespace prefix for a query.
Args:
data: An Entity, Key or Query PB, or an (app_id, ns) tuple.
Returns:
A valid table prefix
"""
if isinstance(data, entity_pb.EntityProto):
data = data.key()
if not isinstance(data, tuple):
data = (data.app(), data.name_space())
prefix = ('%s!%s' % data).replace('"', '""')
if data not in self.__namespaces:
self.__namespaces.add(data)
self.__ConfigureNamespace(self.__connection, prefix, *data)
return prefix
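  # Illustrative example: an entity in app 'myapp' and namespace 'ns1' maps
  # to the prefix 'myapp!ns1'; embedded double quotes are doubled so the
  # prefix can sit safely inside quoted table names.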
def __DeleteRows(self, conn, paths, table):
"""Deletes rows from a table.
Args:
conn: An SQLite connection.
paths: Paths to delete.
table: The table to delete from.
Returns:
The number of rows deleted.
"""
c = conn.execute('DELETE FROM "%s" WHERE __path__ IN (%s)'
% (table, self.__MakeParamList(len(paths))),
paths)
return c.rowcount
def __DeleteEntityRows(self, conn, keys, table):
"""Deletes rows from the specified table that index the keys provided.
Args:
conn: A database connection.
keys: A list of keys to delete index entries for.
table: The table to delete from.
Returns:
The number of rows deleted.
"""
    keys = sorted((x.app(), x.name_space(), x) for x in keys)
    deleted_rows = 0
    for (app_id, ns), group in itertools.groupby(keys, lambda x: x[:2]):
      path_strings = [self.__EncodeIndexPB(x[2].path()) for x in group]
      prefix = self._GetTablePrefix((app_id, ns))
      deleted_rows += self.__DeleteRows(conn, path_strings,
                                        '%s!%s' % (prefix, table))
    return deleted_rows
def __DeleteIndexEntries(self, conn, keys):
"""Deletes entities from the index.
Args:
conn: An SQLite connection.
keys: A list of keys to delete.
"""
self.__DeleteEntityRows(conn, keys, 'EntitiesByProperty')
def __InsertEntities(self, conn, entities):
"""Inserts or updates entities in the DB.
Args:
conn: A database connection.
entities: A list of entities to store.
"""
def RowGenerator(entities):
for unused_prefix, e in entities:
yield (self.__EncodeIndexPB(e.key().path()),
self.__GetEntityKind(e),
buffer(e.Encode()))
entities = sorted((self._GetTablePrefix(x), x) for x in entities)
for prefix, group in itertools.groupby(entities, lambda x: x[0]):
conn.executemany(
'INSERT OR REPLACE INTO "%s!Entities" VALUES (?, ?, ?)' % prefix,
RowGenerator(group))
def __InsertIndexEntries(self, conn, entities):
"""Inserts index entries for the supplied entities.
Args:
conn: A database connection.
entities: A list of entities to create index entries for.
"""
def RowGenerator(entities):
for unused_prefix, e in entities:
for p in e.property_list():
yield (self.__GetEntityKind(e),
p.name(),
self.__EncodeIndexPB(p.value()),
self.__EncodeIndexPB(e.key().path()))
entities = sorted((self._GetTablePrefix(x), x) for x in entities)
for prefix, group in itertools.groupby(entities, lambda x: x[0]):
conn.executemany(
'INSERT INTO "%s!EntitiesByProperty" VALUES (?, ?, ?, ?)' % prefix,
RowGenerator(group))
def MakeSyncCall(self, service, call, request, response, request_id=None):
"""The main RPC entry point. service must be 'datastore_v3'."""
self.AssertPbIsInitialized(request)
try:
apiproxy_stub.APIProxyStub.MakeSyncCall(self, service, call, request,
response, request_id)
except sqlite3.OperationalError, e:
raise apiproxy_errors.ApplicationError(datastore_pb.Error.INTERNAL_ERROR,
e.args[0])
self.AssertPbIsInitialized(response)
def AssertPbIsInitialized(self, pb):
"""Raises an exception if the given PB is not initialized and valid."""
explanation = []
assert pb.IsInitialized(explanation), explanation
pb.Encode()
def __GenerateFilterInfo(self, filters, query):
"""Transform a list of filters into a more usable form.
Args:
filters: A list of filter PBs.
query: The query to generate filter info for.
Returns:
A dict mapping property names to lists of (op, value) tuples.
"""
filter_info = {}
for filt in filters:
assert filt.property_size() == 1
prop = filt.property(0)
value = prop.value()
if prop.name() == '__key__':
value = ReferencePropertyToReference(value.referencevalue())
assert value.app() == query.app()
assert value.name_space() == query.name_space()
value = value.path()
filter_info.setdefault(prop.name(), []).append(
(filt.op(), self.__EncodeIndexPB(value)))
return filter_info
def __GenerateOrderInfo(self, orders):
"""Transform a list of orders into a more usable form.
Args:
orders: A list of order PBs.
Returns:
A list of (property, direction) tuples.
"""
orders = [(order.property(), order.direction()) for order in orders]
if orders and orders[-1] == ('__key__', datastore_pb.Query_Order.ASCENDING):
orders.pop()
return orders
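  # Illustrative example: a trailing ascending __key__ order is implicit in
  # every index, so [('date', ASCENDING), ('__key__', ASCENDING)] reduces to
  # [('date', ASCENDING)] ('date' being a hypothetical property name).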
def __GetPrefixRange(self, prefix):
"""Returns a (min, max) range that encompasses the given prefix.
Args:
prefix: A string prefix to filter for. Must be a PB encodable using
__EncodeIndexPB.
Returns:
(min, max): Start and end string values to filter on.
"""
ancestor_min = self.__EncodeIndexPB(prefix)
ancestor_max = buffer(str(ancestor_min) + '\xfb\xff\xff\xff\x89')
return ancestor_min, ancestor_max
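  # Sketch of the ancestor-range trick: the encoded ancestor path is the
  # inclusive lower bound, and the appended suffix bytes sort after any
  # continuation of that encoding, so all descendants fall between the two
  # bounds (used via two __path__ inequality filters in __KindQuery).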
def __KindQuery(self, query, filter_info, order_info):
"""Performs kind only, kind and ancestor, and ancestor only queries."""
if not (set(filter_info.keys()) |
set(x[0] for x in order_info)).issubset(['__key__']):
return None
if len(order_info) > 1:
return None
filters = []
filters.extend(('__path__', op, value) for op, value
in filter_info.get('__key__', []))
if query.has_kind():
filters.append(('kind', datastore_pb.Query_Filter.EQUAL, query.kind()))
if query.has_ancestor():
amin, amax = self.__GetPrefixRange(query.ancestor().path())
filters.append(('__path__',
datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL, amin))
filters.append(('__path__', datastore_pb.Query_Filter.LESS_THAN, amax))
if order_info:
orders = [('__path__', order_info[0][1])]
else:
orders = [('__path__', datastore_pb.Query_Order.ASCENDING)]
params = []
query = ('SELECT Entities.__path__, Entities.entity '
'FROM "%s!Entities" AS Entities %s %s' % (
self._GetTablePrefix(query),
self._CreateFilterString(filters, params),
self.__CreateOrderString(orders)))
return query, params
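  # Illustrative output for a hypothetical kind-only query on 'Greeting' in
  # app 'myapp', empty namespace, no ancestor:
  #   SELECT Entities.__path__, Entities.entity
  #   FROM "myapp!!Entities" AS Entities WHERE kind = :1
  #   ORDER BY __path__ ASC
  # with params == ['Greeting'].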
def __SinglePropertyQuery(self, query, filter_info, order_info):
"""Performs queries satisfiable by the EntitiesByProperty table."""
property_names = set(filter_info.keys())
property_names.update(x[0] for x in order_info)
if len(property_names) != 1:
return None
property_name = property_names.pop()
filter_ops = filter_info.get(property_name, [])
if len([1 for o, _ in filter_ops
if o == datastore_pb.Query_Filter.EQUAL]) > 1:
return None
if len(order_info) > 1 or (order_info and order_info[0][0] == '__key__'):
return None
if query.has_ancestor():
return None
if not query.has_kind():
return None
prefix = self._GetTablePrefix(query)
filters = []
filters.append(('EntitiesByProperty.kind',
datastore_pb.Query_Filter.EQUAL, query.kind()))
filters.append(('name', datastore_pb.Query_Filter.EQUAL, property_name))
for op, value in filter_ops:
if property_name == '__key__':
filters.append(('EntitiesByProperty.__path__', op, value))
else:
filters.append(('value', op, value))
orders = [('EntitiesByProperty.kind', datastore_pb.Query_Order.ASCENDING),
('name', datastore_pb.Query_Order.ASCENDING)]
if order_info:
orders.append(('value', order_info[0][1]))
else:
orders.append(('value', datastore_pb.Query_Order.ASCENDING))
orders.append(('EntitiesByProperty.__path__',
datastore_pb.Query_Order.ASCENDING))
params = []
format_args = (
prefix,
prefix,
self._CreateFilterString(filters, params),
self.__CreateOrderString(orders))
query = ('SELECT Entities.__path__, Entities.entity, '
'EntitiesByProperty.name, EntitiesByProperty.value '
'FROM "%s!EntitiesByProperty" AS EntitiesByProperty INNER JOIN '
'"%s!Entities" AS Entities USING (__path__) %s %s' % format_args)
return query, params
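  # Illustrative output for a hypothetical query on kind 'Greeting' ordered
  # by property 'date':
  #   SELECT Entities.__path__, Entities.entity,
  #          EntitiesByProperty.name, EntitiesByProperty.value
  #   FROM "myapp!!EntitiesByProperty" AS EntitiesByProperty INNER JOIN
  #        "myapp!!Entities" AS Entities USING (__path__)
  #   WHERE EntitiesByProperty.kind = :1 AND name = :2
  #   ORDER BY EntitiesByProperty.kind ASC, name ASC, value ASC,
  #            EntitiesByProperty.__path__ ASC
  # with params == ['Greeting', 'date'].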
def __StarSchemaQueryPlan(self, query, filter_info, order_info):
"""Executes a query using a 'star schema' based on EntitiesByProperty.
A 'star schema' is a join between an objects table (Entities) and multiple
instances of a facts table (EntitiesByProperty). Ideally, this will result
in a merge join if the only filters are inequalities and the sort orders
match those in the index for the facts table; otherwise, the DB will do its
best to satisfy the query efficiently.
Args:
query: The datastore_pb.Query PB.
filter_info: A dict mapping properties filtered on to (op, value) tuples.
order_info: A list of (property, direction) tuples.
Returns:
(query, params): An SQL query string and list of parameters for it.
"""
filter_sets = []
for name, filter_ops in filter_info.items():
filter_sets.extend((name, [x]) for x in filter_ops
if x[0] == datastore_pb.Query_Filter.EQUAL)
ineq_ops = [x for x in filter_ops
if x[0] != datastore_pb.Query_Filter.EQUAL]
if ineq_ops:
filter_sets.append((name, ineq_ops))
for prop, _ in order_info:
if prop == '__key__':
continue
if prop not in filter_info:
filter_sets.append((prop, []))
prefix = self._GetTablePrefix(query)
joins = []
filters = []
join_name_map = {}
for name, filter_ops in filter_sets:
if name == '__key__':
for op, value in filter_ops:
filters.append(('Entities.__path__', op, buffer(value)))
else:
join_name = 'ebp_%d' % (len(joins),)
join_name_map.setdefault(name, join_name)
joins.append(
'INNER JOIN "%s!EntitiesByProperty" AS %s '
'ON Entities.__path__ = %s.__path__'
% (prefix, join_name, join_name))
filters.append(('%s.kind' % join_name, datastore_pb.Query_Filter.EQUAL,
query.kind()))
filters.append(('%s.name' % join_name, datastore_pb.Query_Filter.EQUAL,
name))
for op, value in filter_ops:
filters.append(('%s.value' % join_name, op, buffer(value)))
if query.has_ancestor():
amin, amax = self.__GetPrefixRange(query.ancestor().path())
filters.append(('%s.__path__' % join_name,
datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL,
amin))
filters.append(('%s.__path__' % join_name,
datastore_pb.Query_Filter.LESS_THAN, amax))
orders = []
for prop, order in order_info:
if prop == '__key__':
orders.append(('Entities.__path__', order))
else:
prop = '%s.value' % (join_name_map[prop],)
orders.append((prop, order))
if not order_info or order_info[-1][0] != '__key__':
orders.append(('Entities.__path__', datastore_pb.Query_Order.ASCENDING))
select_arg = 'Entities.__path__, Entities.entity '
if query.property_name_list():
for value_i in join_name_map.values():
select_arg += ', %s.name, %s.value' % (value_i, value_i)
params = []
format_args = (
select_arg,
prefix,
' '.join(joins),
self._CreateFilterString(filters, params),
self.__CreateOrderString(orders))
query = ('SELECT %s FROM "%s!Entities" AS Entities %s %s %s' % format_args)
logging.debug(query)
return query, params
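  # Illustrative shape of the SQL generated for a single hypothetical
  # equality filter on property 'color':
  #   SELECT Entities.__path__, Entities.entity
  #   FROM "myapp!!Entities" AS Entities
  #   INNER JOIN "myapp!!EntitiesByProperty" AS ebp_0
  #           ON Entities.__path__ = ebp_0.__path__
  #   WHERE ebp_0.kind = :1 AND ebp_0.name = :2 AND ebp_0.value = :3
  #   ORDER BY Entities.__path__ ASC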
def __MergeJoinQuery(self, query, filter_info, order_info):
if order_info:
return None
if query.has_ancestor():
return None
if not query.has_kind():
return None
for filter_ops in filter_info.values():
for op, _ in filter_ops:
if op != datastore_pb.Query_Filter.EQUAL:
return None
return self.__StarSchemaQueryPlan(query, filter_info, order_info)
def __LastResortQuery(self, query, filter_info, order_info):
"""Last resort query plan that executes queries requring composite indexes.
Args:
query: The datastore_pb.Query PB.
filter_info: A dict mapping properties filtered on to (op, value) tuples.
order_info: A list of (property, direction) tuples.
Returns:
(query, params): An SQL query string and list of parameters for it.
"""
return self.__StarSchemaQueryPlan(query, filter_info, order_info)
_QUERY_STRATEGIES = [
__KindQuery,
__SinglePropertyQuery,
__MergeJoinQuery,
__LastResortQuery,
]
def _Put(self, entity, insert):
conn = self._GetConnection()
try:
self.__DeleteIndexEntries(conn, [entity.key()])
entity = datastore_stub_util.StoreEntity(entity)
self.__InsertEntities(conn, [entity])
self.__InsertIndexEntries(conn, [entity])
finally:
self._ReleaseConnection(conn)
def _Get(self, key):
conn = self._GetConnection()
try:
prefix = self._GetTablePrefix(key)
c = conn.execute(
'SELECT entity FROM "%s!Entities" WHERE __path__ = ?' % (prefix,),
(self.__EncodeIndexPB(key.path()),))
row = c.fetchone()
if row:
entity = entity_pb.EntityProto()
entity.ParseFromString(row[0])
return datastore_stub_util.LoadEntity(entity)
finally:
self._ReleaseConnection(conn)
def _Delete(self, key):
conn = self._GetConnection()
try:
self.__DeleteIndexEntries(conn, [key])
self.__DeleteEntityRows(conn, [key], 'Entities')
finally:
self._ReleaseConnection(conn)
def _GetEntitiesInEntityGroup(self, entity_group):
query = datastore_pb.Query()
query.set_app(entity_group.app())
if entity_group.name_space():
query.set_name_space(entity_group.name_space())
query.mutable_ancestor().CopyFrom(entity_group)
filter_info = self.__GenerateFilterInfo(query.filter_list(), query)
order_info = self.__GenerateOrderInfo(query.order_list())
sql_stmt, params = self.__KindQuery(query, filter_info, order_info)
conn = self._GetConnection()
try:
db_cursor = conn.execute(sql_stmt, params)
entities = (entity_pb.EntityProto(row[1]) for row in db_cursor.fetchall())
return dict((datastore_types.ReferenceToKeyValue(entity.key()), entity)
for entity in entities)
finally:
self._ReleaseConnection(conn)
def _GetQueryCursor(self, query, filters, orders, index_list,
filter_predicate=None):
"""Returns a query cursor for the provided query.
Args:
query: The datastore_pb.Query to run.
filters: A list of filters that override the ones found on query.
orders: A list of orders that override the ones found on query.
index_list: A list of indexes used by the query.
filter_predicate: an additional filter of type
datastore_query.FilterPredicate. This is passed along to implement V4
specific filters without changing the entire stub.
Returns:
A QueryCursor object.
"""
if query.has_kind() and query.kind() in self._pseudo_kinds:
cursor = self._pseudo_kinds[query.kind()].Query(query, filters, orders)
datastore_stub_util.Check(cursor,
'Could not create query for pseudo-kind')
else:
orders = datastore_stub_util._GuessOrders(filters, orders)
filter_info = self.__GenerateFilterInfo(filters, query)
order_info = self.__GenerateOrderInfo(orders)
for strategy in DatastoreSqliteStub._QUERY_STRATEGIES:
result = strategy(self, query, filter_info, order_info)
if result:
break
else:
raise apiproxy_errors.ApplicationError(
datastore_pb.Error.BAD_REQUEST,
'No strategy found to satisfy query.')
sql_stmt, params = result
conn = self._GetConnection()
try:
if query.property_name_list():
db_cursor = _ProjectionPartialEntityGenerator(
conn.execute(sql_stmt, params))
else:
db_cursor = _DedupingEntityGenerator(conn.execute(sql_stmt, params))
dsquery = datastore_stub_util._MakeQuery(query, filters, orders,
filter_predicate)
filtered_entities = [r for r in db_cursor]
if filter_predicate:
filtered_entities = filter(filter_predicate, filtered_entities)
cursor = datastore_stub_util.ListCursor(
query, dsquery, orders, index_list, filtered_entities)
finally:
self._ReleaseConnection(conn)
return cursor
def __AllocateIdsFromBlock(self, conn, prefix, size, id_map, table):
datastore_stub_util.Check(size > 0, 'Size must be greater than 0.')
next_id, block_size = id_map.get(prefix, (0, 0))
if not block_size:
block_size = (size / 1000 + 1) * 1000
c = conn.execute('SELECT next_id FROM %s WHERE prefix = ? LIMIT 1'
% table, (prefix,))
next_id = c.fetchone()[0]
c = conn.execute('UPDATE %s SET next_id = next_id + ? WHERE prefix = ?'
% table, (block_size, prefix))
assert c.rowcount == 1
if size > block_size:
c = conn.execute('SELECT next_id FROM %s WHERE prefix = ? LIMIT 1'
% table, (prefix,))
start = c.fetchone()[0]
c = conn.execute('UPDATE %s SET next_id = next_id + ? WHERE prefix = ?'
% table, (size, prefix))
assert c.rowcount == 1
else:
      start = next_id
next_id += size
block_size -= size
id_map[prefix] = (next_id, block_size)
end = start + size - 1
return start, end
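  # Illustrative walk-through: with no cached block, a request for size=1
  # reserves (1 / 1000 + 1) * 1000 = 1000 ids from the table, returns the
  # first id as both start and end, and caches the remaining 999 ids in
  # id_map so later small allocations skip the database entirely.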
def __AdvanceIdCounter(self, conn, prefix, max_id, table):
    datastore_stub_util.Check(max_id >= 0,
                              'Max must be greater than or equal to 0.')
c = conn.execute('SELECT next_id FROM %s WHERE prefix = ? LIMIT 1'
% table, (prefix,))
start = c.fetchone()[0]
if max_id >= start:
c = conn.execute('UPDATE %s SET next_id = ? WHERE prefix = ?' % table,
(max_id + 1, prefix))
assert c.rowcount == 1
end = max(max_id, start - 1)
return start, end
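  # Illustrative example: if the stored next_id is 5 and max_id is 10, the
  # counter advances to 11 and (5, 10) is returned; if max_id is below the
  # stored counter, nothing is written and end < start in the result.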
def _AllocateSequentialIds(self, reference, size=1, max_id=None):
conn = self._GetConnection()
try:
datastore_stub_util.CheckAppId(self._trusted, self._app_id,
reference.app())
datastore_stub_util.Check(not (size and max_id),
'Both size and max cannot be set.')
prefix = self._GetTablePrefix(reference)
if size:
start, end = self.__AllocateIdsFromBlock(conn, prefix, size,
self.__id_map_sequential,
'IdSeq')
else:
start, end = self.__AdvanceIdCounter(conn, prefix, max_id, 'IdSeq')
return long(start), long(end)
finally:
self._ReleaseConnection(conn)
def _AllocateIds(self, references):
conn = self._GetConnection()
try:
full_keys = []
for key in references:
datastore_stub_util.CheckAppId(self._trusted, self._app_id, key.app())
prefix = self._GetTablePrefix(key)
last_element = key.path().element_list()[-1]
if last_element.id() or last_element.has_name():
for el in key.path().element_list():
if el.id():
count, id_space = datastore_stub_util.IdToCounter(el.id())
table, _ = self.__id_counter_tables[id_space]
self.__AdvanceIdCounter(conn, prefix, count, table)
else:
count, _ = self.__AllocateIdsFromBlock(conn, prefix, 1,
self.__id_map_scattered,
'ScatteredIdCounters')
last_element.set_id(datastore_stub_util.ToScatteredId(count))
full_keys.append(key)
return full_keys
finally:
self._ReleaseConnection(conn)
def _OnIndexChange(self, app_id):
conn = self._GetConnection()
try:
self.__WriteIndexData(conn, app_id)
finally:
self._ReleaseConnection(conn)