# Copyright 2014 The Swarming Authors. All rights reserved.
# Use of this source code is governed by the Apache v2.0 license that can be
# found in the LICENSE file.
"""This module defines Isolate Server frontend url handlers."""
import binascii
import datetime
import re
import time
from google.appengine import runtime
from google.appengine.api import datastore_errors
from google.appengine.api import memcache
from google.appengine.api import taskqueue
from google.appengine.ext import ndb
import endpoints
from protorpc import message_types
from protorpc import messages
from protorpc import remote
from components import auth
from components import utils
import config
import gcs
from handlers_api import hash_content
from handlers_api import MIN_SIZE_FOR_DIRECT_GS
import model
import stats
### Request Types


class Digest(messages.Message):
  """ProtoRPC message containing digest information."""
  digest = messages.StringField(1)
  is_isolated = messages.BooleanField(2, default=False)
  size = messages.IntegerField(3)


class Namespace(messages.Message):
  """Encapsulates namespace, compression, and hash algorithm."""
  namespace = messages.StringField(1, default='default')
  digest_hash = messages.StringField(2, default='SHA-1')
  compression = messages.StringField(3, default='flate')


class DigestCollection(messages.Message):
  """Endpoints request type analogous to the existing JSON post body."""
  items = messages.MessageField(Digest, 1, repeated=True)
  namespace = messages.MessageField(Namespace, 2, repeated=False)


class StorageRequest(messages.Message):
  """ProtoRPC message representing an entity to be added to the data store."""
  upload_ticket = messages.StringField(1)
  content = messages.BytesField(2)


class FinalizeRequest(messages.Message):
  """Request to validate upload of large Google storage entities."""
  upload_ticket = messages.StringField(1)


class RetrieveRequest(messages.Message):
  """Request to retrieve content from memcache, datastore, or GS."""
  digest = messages.StringField(1, required=True)
  namespace = messages.MessageField(Namespace, 2)
  offset = messages.IntegerField(3, default=0)

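# A minimal sketch of how a caller might build the request messages above
# (the digest value and size are made up for illustration):
#
#   request = DigestCollection(
#       items=[Digest(digest='5e3a7f0c...', size=4096)],
#       namespace=Namespace())  # defaults: 'default', 'SHA-1', 'flate'
#
# The same Namespace sub-message is reused by RetrieveRequest when reading
# content back.

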
### Response Types


class PreuploadStatus(messages.Message):
  """Endpoints response type for a single URL or pair of URLs."""
  gs_upload_url = messages.StringField(1)
  upload_ticket = messages.StringField(2)
  index = messages.IntegerField(3)


class UrlCollection(messages.Message):
  """Endpoints response type analogous to existing JSON response."""
  items = messages.MessageField(PreuploadStatus, 1, repeated=True)


class RetrievedContent(messages.Message):
  """Content retrieved from DB, or GS URL."""
  content = messages.BytesField(1)
  url = messages.StringField(2)


class PushPing(messages.Message):
  """Indicates whether data storage executed successfully."""
  ok = messages.BooleanField(1)


class ServerDetails(messages.Message):
  """Reports the current API version."""
  server_version = messages.StringField(1)

### Utility


# default expiration time for signed links
DEFAULT_LINK_EXPIRATION = datetime.timedelta(hours=4)

# messages for generating and validating upload tickets
UPLOAD_MESSAGES = ['datastore', 'gs']


class TokenSigner(auth.TokenKind):
  """Used to create upload tickets."""
  expiration_sec = DEFAULT_LINK_EXPIRATION.total_seconds()
  secret_key = auth.SecretKey('isolate_upload_token', scope='local')

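# Rough sketch of the ticket round trip implemented below: preupload embeds
# the digest metadata in a signed token, and storage_helper later validates
# it. The 'datastore'/'gs' entries of UPLOAD_MESSAGES bind a ticket to one of
# the two upload paths (values here are purely illustrative):
#
#   ticket = TokenSigner.generate('gs', {'d': 'deadbeef', 's': '123'})
#   embedded = TokenSigner.validate(ticket, 'gs')  # raises if invalid/expired

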
@ndb.transactional
def store_and_enqueue_verify_task(entry, task_queue_host):
  """Puts the entry and enqueues its verification task transactionally.

  The task is only enqueued if the datastore put commits.
  """
  entry.put()
  taskqueue.add(
      url='/internal/taskqueue/verify/%s' % entry.key.id(),
      queue_name='verify',
      headers={'Host': task_queue_host},
      transactional=True,
  )

def entry_key_or_error(namespace, digest):
  """Returns the entry key, converting ValueError to BadRequestException."""
  try:
    return model.entry_key(namespace, digest)
  except ValueError as error:
    raise endpoints.BadRequestException(error.message)

### API


@auth.endpoints_api(name='isolateservice', version='v1')
class IsolateService(remote.Service):
  """Implement API methods corresponding to handlers in handlers_api."""

  _gs_url_signer = None

  ### Endpoints Methods

  @auth.endpoints_method(DigestCollection, UrlCollection, http_method='POST')
  def preupload(self, request):
    """Checks for entry's existence and generates upload URLs.

    Arguments:
      request: the DigestCollection to be posted

    Returns:
      the UrlCollection corresponding to the posted digests

    The UrlCollection contains one PreuploadStatus per digest that is not
    already stored; its index field refers back to that digest's position in
    the request. Each PreuploadStatus carries
      * an upload_ticket to present to store_inline or finalize_gs_upload, and
      * a gs_upload_url when the content should be uploaded directly to
        Google Storage (see should_push_to_gs); absent otherwise.
    Digests that are already present are omitted from the response.
    """
    response = UrlCollection(items=[])

    # check for namespace error
    if not re.match(r'^%s$' % model.NAMESPACE_RE, request.namespace.namespace):
      raise endpoints.BadRequestException(
          'Invalid namespace; allowed keys must pass regexp "%s"' %
          model.NAMESPACE_RE)

    # check for existing elements
    new_digests, existing_digests = self.partition_collection(request)

    # process all elements; add an upload ticket for cache misses
    for index, digest_element in enumerate(request.items):
      # check for error conditions
      if not model.is_valid_hex(digest_element.digest):
        raise endpoints.BadRequestException(
            'Invalid hex code: %s' % (digest_element.digest))

      if digest_element in new_digests:
        # generate preupload ticket
        status = PreuploadStatus(
            index=index,
            upload_ticket=self.generate_ticket(
                digest_element, request.namespace))

        # generate GS upload URL if necessary
        if self.should_push_to_gs(digest_element):
          key = entry_key_or_error(
              request.namespace.namespace, digest_element.digest)
          status.gs_upload_url = self.gs_url_signer.get_upload_url(
              filename=key.id(),
              content_type='application/octet-stream',
              expiration=DEFAULT_LINK_EXPIRATION)

        response.items.append(status)

    # Tag existing entities and collect stats.
    self.tag_existing(DigestCollection(
        items=list(existing_digests), namespace=request.namespace))
    stats.add_entry(stats.LOOKUP, len(request.items), len(existing_digests))
    return response
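
  # Illustrative request/response pair for preupload (all values made up):
  #
  #   request:  DigestCollection(
  #       items=[Digest(digest='aa...', size=10),
  #              Digest(digest='bb...', size=10 * 1024 * 1024)],
  #       namespace=Namespace())
  #   response: UrlCollection(items=[
  #       PreuploadStatus(
  #           index=1,
  #           upload_ticket='...',
  #           gs_upload_url='https://storage.googleapis.com/...')])
  #
  # Here item 0 is assumed to already exist (so it is omitted) and item 1 is
  # assumed large enough to be uploaded directly to Google Storage.
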
  @auth.endpoints_method(StorageRequest, PushPing)
  def store_inline(self, request):
    """Stores relatively small entities in the datastore."""
    return self.storage_helper(request, False)

  @auth.endpoints_method(FinalizeRequest, PushPing)
  def finalize_gs_upload(self, request):
    """Informs client that large entities have been uploaded to GCS."""
    return self.storage_helper(request, True)

  @auth.endpoints_method(RetrieveRequest, RetrievedContent)
  def retrieve(self, request):
    """Retrieves content from a storage location."""
    content = None
    key = None
    offset = request.offset

    # try the memcache
    memcache_entry = memcache.get(
        request.digest, namespace='table_%s' % request.namespace.namespace)
    if memcache_entry is not None:
      content = memcache_entry
      found = 'memcache'
    # try ndb
    else:
      key = entry_key_or_error(request.namespace.namespace, request.digest)
      stored = key.get()
      if stored is None:
        raise endpoints.NotFoundException('Unable to retrieve the entry.')
      content = stored.content  # will be None if entity is in GCS
      found = 'inline'

    # Return and log stats here if something has been found.
    if content is not None:
      # make sure that offset is acceptable
      if offset < 0 or offset > len(content):
        raise endpoints.BadRequestException(
            'Invalid offset %d. Offset must be between 0 and content length.' %
            offset)
      stats.add_entry(stats.RETURN, len(content) - offset, found)
      return RetrievedContent(content=content[offset:])

    # The data is in GS; log stats and return the URL.
    if offset < 0 or offset > stored.compressed_size:
      raise endpoints.BadRequestException(
          'Invalid offset %d. Offset must be between 0 and content length.' %
          offset)
    stats.add_entry(
        stats.RETURN,
        stored.compressed_size - offset,
        'GS; %s' % stored.key.id())
    return RetrievedContent(url=self.gs_url_signer.get_download_url(
        filename=key.id(),
        expiration=DEFAULT_LINK_EXPIRATION))
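
  # Sketch of the two possible retrieve outcomes (digest made up):
  #
  #   RetrieveRequest(digest='ab...', namespace=Namespace(), offset=0)
  #     -> RetrievedContent(content='<compressed bytes>')      # stored inline
  #     -> RetrievedContent(url='https://...<signed GS URL>')  # stored in GS
  #
  # Note that the offset is only applied server-side to inline content; for
  # GS entries it is validated but not encoded into the returned URL.
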
  @auth.endpoints_method(message_types.VoidMessage, ServerDetails)
  def server_details(self, _request):
    """Returns the server app version."""
    return ServerDetails(server_version=utils.get_app_version())

  ### Utility

  def storage_helper(self, request, uploaded_to_gs):
    """Implements shared logic between store_inline and finalize_gs_upload."""
    # validate token or error out
    if not request.upload_ticket:
      raise endpoints.BadRequestException(
          'Upload ticket was empty or not provided.')
    try:
      embedded = TokenSigner.validate(
          request.upload_ticket, UPLOAD_MESSAGES[uploaded_to_gs])
    except (auth.InvalidTokenError, ValueError) as error:
      raise endpoints.BadRequestException(
          'Ticket validation failed: %s' % error.message)

    # read data and convert types
    digest = embedded['d'].encode('utf-8')
    is_isolated = bool(int(embedded['i']))
    namespace = embedded['n']
    size = int(embedded['s'])

    # create a key
    key = entry_key_or_error(namespace, digest)

    # get content and compressed size
    if uploaded_to_gs:
      # ensure that file info is uploaded to GS first
      # TODO(cmassaro): address analogous TODO from handlers_api
      file_info = gcs.get_file_info(config.settings().gs_bucket, key.id())
      if not file_info:
        raise endpoints.BadRequestException(
            'File should be in Google Storage.\nFile: \'%s\' Size: %d.' % (
                key.id(), size))
      content = None
      compressed_size = file_info.size
    else:
      content = request.content
      compressed_size = len(content)

    # all is well; create an entry
    entry = model.new_content_entry(
        key=key,
        is_isolated=is_isolated,
        compressed_size=compressed_size,
        expanded_size=size,
        is_verified=not uploaded_to_gs,
        content=content,
    )

    # DB: assert that embedded content is the data sent by the request
    if not uploaded_to_gs:
      if (digest, size) != hash_content(content, namespace):
        raise endpoints.BadRequestException(
            'Embedded digest does not match provided data.')
      entry.put()
    # GCS: enqueue verification task
    else:
      try:
        store_and_enqueue_verify_task(entry, utils.get_task_queue_host())
      except (
          datastore_errors.Error,
          runtime.apiproxy_errors.CancelledError,
          runtime.apiproxy_errors.DeadlineExceededError,
          runtime.apiproxy_errors.OverQuotaError,
          runtime.DeadlineExceededError,
          taskqueue.Error) as e:
        raise endpoints.InternalServerErrorException(
            'Unable to store the entity: %s.' % e.__class__.__name__)
    return PushPing(ok=True)
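
  # Two hypothetical calls that funnel into storage_helper (tickets are
  # placeholders issued earlier by preupload):
  #
  #   store_inline(StorageRequest(upload_ticket='<ticket>', content=blob))
  #       -> hashes and stores the inline data, replies PushPing(ok=True)
  #   finalize_gs_upload(FinalizeRequest(upload_ticket='<ticket>'))
  #       -> checks the object exists in GS, then enqueues a verify task
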
  @classmethod
  def generate_ticket(cls, digest, namespace):
    """Generates an HMAC-SHA1 signature for a given set of parameters.

    Used by preupload to sign store URLs and by storage_helper to validate
    them.

    Args:
      digest: the Digest being uploaded
      namespace: the namespace associated with the original DigestCollection

    Returns:
      base64-encoded upload ticket
    """
    uploaded_to_gs = cls.should_push_to_gs(digest)

    # get a single dictionary containing important information
    embedded = {
        'c': namespace.compression,
        'd': digest.digest,
        'h': namespace.digest_hash,
        'i': str(int(digest.is_isolated)),
        'n': namespace.namespace,
        's': str(digest.size),
    }
    message = UPLOAD_MESSAGES[uploaded_to_gs]
    try:
      result = TokenSigner.generate(message, embedded)
    except ValueError as error:
      raise endpoints.BadRequestException(
          'Ticket generation failed: %s' % error.message)
    return result
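
  # Example of the dict embedded in a ticket (illustrative values; the real
  # ones come from the client's Digest and Namespace):
  #
  #   {'c': 'flate', 'd': '<hex digest>', 'h': 'SHA-1',
  #    'i': '0', 'n': 'default', 's': '131072'}
  #
  # storage_helper reads back 'd', 'i', 'n' and 's' after validation.
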
  @classmethod
  def check_entries_exist(cls, entries):
    """Assess which entities already exist in the datastore.

    Arguments:
      entries: a DigestCollection to be posted

    Yields:
      digest, Boolean pairs, where Boolean indicates existence of the entry

    Raises:
      BadRequestException if any digest is not a valid hexadecimal number.
    """
    # Kick off all queries in parallel. Build mapping Future -> digest.
    futures = {}
    for digest in entries.items:
      # check for error conditions
      key = entry_key_or_error(entries.namespace.namespace, digest.digest)
      futures[key.get_async(use_cache=False)] = digest

    # Pick first one that finishes and yield it, rinse, repeat.
    while futures:
      future = ndb.Future.wait_any(futures)
      # TODO(maruel): For items that were present, make sure
      # future.get_result().compressed_size == digest.size.
      yield futures.pop(future), bool(future.get_result())

  @classmethod
  def partition_collection(cls, entries):
    """Partitions the collection's digests into new and existing sets.

    Returns:
      a two-element list: [new digests, existing digests].
    """
    seen_unseen = [set(), set()]
    for digest, exists in cls.check_entries_exist(entries):
      seen_unseen[exists].add(digest)
    return seen_unseen

  @classmethod
  def should_push_to_gs(cls, digest):
    """True to direct client to upload the given Digest directly to GS."""
    # Relatively small *.isolated files go through app engine to cache them.
    if digest.is_isolated and digest.size <= model.MAX_MEMCACHE_ISOLATED:
      return False
    # All other large enough files go through GS.
    return digest.size >= MIN_SIZE_FOR_DIRECT_GS
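
  # The decision above, spelled out:
  #   1. *.isolated files up to model.MAX_MEMCACHE_ISOLATED bytes stay on the
  #      inline path so they can be memcached;
  #   2. otherwise anything of at least MIN_SIZE_FOR_DIRECT_GS bytes goes
  #      straight to Google Storage;
  #   3. everything else is expected inline via store_inline.
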
  @property
  def gs_url_signer(self):
    """On demand instance of a gcs.URLSigner object."""
    if not self._gs_url_signer:
      settings = config.settings()
      self._gs_url_signer = gcs.URLSigner(
          settings.gs_bucket,
          settings.gs_client_id_email,
          settings.gs_private_key)
    return self._gs_url_signer

  @classmethod
  def tag_existing(cls, collection):
    """Tag existing digests with new timestamp.

    Arguments:
      collection: a DigestCollection containing existing digests

    Returns:
      the enqueued task if there were existing entries; None otherwise
    """
    if collection.items:
      url = '/internal/taskqueue/tag/%s/%s' % (
          collection.namespace.namespace,
          utils.datetime_to_timestamp(utils.utcnow()))
      payload = ''.join(
          binascii.unhexlify(digest.digest) for digest in collection.items)
      return utils.enqueue_task(url, 'tag', payload=payload)
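
  # A worked example of the payload built above: two SHA-1 digests become a
  # single 40-byte string of raw (unhexlified) bytes appended back to back,
  # while the URL carries the namespace and the tagging timestamp.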