blob: e93859cea5d07aabcbfefaee6fc0c105652f5dcf [file] [log] [blame]
"""Prospective Search for NDB.
This reimplements all of the standard APIs with the following changes:
- A document_class argument must be an NDB Model class.
- A document must be an NDB Model instance.
- get_document() always returns an NDB Model instance.
The exceptions and public constants exported by the standard module
are re-exported here.
"""
# TODO: Ideally prospective search would support NDB natively, or
# support protobufs natively (in addition to ENTITY and MODEL).
# TODO: Should we try to support async calls as well? That can't be
# done without rewriting the standard prospective_search API module.
import base64
from .google_imports import datastore
from .google_imports import datastore_types
from .google_imports import prospective_search
from .google_imports import prospective_search_pb
from .google_imports import entity_pb
from . import model
from . import tasklets
# Re-export constants and exceptions from prospective_search.
DEFAULT_RESULT_BATCH_SIZE = prospective_search.DEFAULT_RESULT_BATCH_SIZE
DEFAULT_LEASE_DURATION_SEC = prospective_search.DEFAULT_LEASE_DURATION_SEC
DEFAULT_LIST_SUBSCRIPTIONS_MAX_RESULTS = \
prospective_search.DEFAULT_LIST_SUBSCRIPTIONS_MAX_RESULTS
DEFAULT_LIST_TOPICS_MAX_RESULTS = \
prospective_search.DEFAULT_LIST_TOPICS_MAX_RESULTS
Error = prospective_search.Error
DocumentTypeError = prospective_search.DocumentTypeError
QuerySyntaxError = prospective_search.QuerySyntaxError
SchemaError = prospective_search.SchemaError
SubscriptionDoesNotExist = prospective_search.SubscriptionDoesNotExist
TopicNotSpecified = prospective_search.TopicNotSpecified
SubscriptionState = prospective_search.SubscriptionState
subscription_state_name = prospective_search.subscription_state_name
__all__ = ['get_document',
'get_subscription',
'list_subscriptions',
'list_topics',
'match',
'unsubscribe',
'subscribe',
'subscription_state_name',
'DEFAULT_RESULT_BATCH_SIZE',
'DEFAULT_LEASE_DURATION_SEC',
'DEFAULT_LIST_SUBSCRIPTIONS_MAX_RESULTS',
'DEFAULT_LIST_TOPICS_MAX_RESULTS',
'DocumentTypeError',
'Error',
'QuerySyntaxError',
'SchemaError',
'SubscriptionDoesNotExist',
'SubscriptionState',
'TopicNotSpecified']
_doc_class = prospective_search_pb.MatchRequest # For testing get_document().
_MODEL_TYPE_TO_PYTHON_TYPE = {
model.StringProperty: str,
model.IntegerProperty: int,
model.BooleanProperty: bool,
model.FloatProperty: float,
model.TextProperty: str,
}
def _add_schema_entry(prop_class, name, schema):
"""Add single entry to SchemaEntries by invoking add_entry."""
python_type = _MODEL_TYPE_TO_PYTHON_TYPE.get(prop_class, None)
if not python_type:
return
if python_type not in schema:
schema[python_type] = [name]
else:
schema[python_type].append(name)
def _model_to_entity_schema(document_class):
"""Produce schema from NDB Model class."""
schema = {}
for name, prop in document_class._properties.iteritems():
_add_schema_entry(prop.__class__, name, schema)
return schema
def _get_document_topic(document_class, topic):
assert issubclass(document_class, model.Model)
if topic:
return topic
return document_class._get_kind()
def subscribe(document_class,
query,
sub_id,
schema=None,
topic=None,
lease_duration_sec=DEFAULT_LEASE_DURATION_SEC):
"""Subscribe a query."""
assert schema is None
topic = _get_document_topic(document_class, topic)
schema = _model_to_entity_schema(document_class)
return prospective_search.subscribe(
datastore.Entity,
query,
sub_id,
schema=schema,
topic=topic,
lease_duration_sec=lease_duration_sec)
def unsubscribe(document_class, sub_id, topic=None):
topic = _get_document_topic(document_class, topic)
prospective_search.unsubscribe(datastore.Entity, sub_id, topic=topic)
def get_subscription(document_class, sub_id, topic=None):
"""Get subscription information."""
topic = _get_document_topic(document_class, topic)
return prospective_search.get_subscription(datastore.Entity, sub_id,
topic=topic)
def list_subscriptions(document_class,
sub_id_start='',
topic=None,
max_results=DEFAULT_LIST_SUBSCRIPTIONS_MAX_RESULTS,
expires_before=None):
"""List subscriptions on a topic."""
topic = _get_document_topic(document_class, topic)
return prospective_search.list_subscriptions(
datastore.Entity,
sub_id_start=sub_id_start,
topic=topic,
max_results=max_results,
expires_before=expires_before)
list_topics = prospective_search.list_topics
def match(document,
topic=None,
result_key=None,
result_relative_url='/_ah/prospective_search',
result_task_queue='default',
result_batch_size=DEFAULT_RESULT_BATCH_SIZE,
result_return_document=True):
"""Match document with all subscribed queries on specified topic."""
# Convert document to datastore.Entity.
topic = _get_document_topic(document.__class__, topic)
pb = document._to_pb()
entity = datastore.Entity('temp-kind').FromPb(pb)
return prospective_search.match(
entity,
topic=topic,
result_key=result_key,
result_relative_url=result_relative_url,
result_task_queue=result_task_queue,
result_batch_size=result_batch_size,
result_return_document=result_return_document)
def get_document(request):
"""Decodes document from prospective_search result POST request.
Args:
request: received POST request
Returns:
document: original NDB Model document from match call.
Raises:
DocumentTypeError: if document class is not recognized.
"""
doc_class = request.get('python_document_class')
if not doc_class:
return None
entity = entity_pb.EntityProto()
entity.ParseFromString(base64.urlsafe_b64decode(
request.get('document').encode('utf-8')))
doc_class = int(doc_class)
ctx = tasklets.get_context()
adapter = ctx._conn.adapter
return adapter.pb_to_entity(entity)