# Copyright 2012 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""This module defines Isolate Server backend url handlers."""
import binascii
import hashlib
import logging
import time
import zlib
import webapp2
from google.appengine import runtime
from google.appengine.api import datastore_errors
from google.appengine.api import memcache
from google.appengine.ext import ndb
import config
import gcs
import mapreduce_jobs
import model
import stats
import template
from components import decorators
from components import utils
# The maximum number of items to delete at a time.
ITEMS_TO_DELETE_ASYNC = 100
### Utility
class Accumulator(object):
"""Accumulates output from a generator."""
def __init__(self, source):
self.accumulated = []
self._source = source
def __iter__(self):
for i in self._source:
self.accumulated.append(i)
yield i
del i
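# A minimal usage sketch for Accumulator (illustrative names, mirroring how
# InternalVerifyWorkerHandler below wraps a GS stream so the raw bytes can be
# stored in memcache after hashing):
#
#   stream = Accumulator(gcs.read_file(gs_bucket, key_id))
#   for chunk in stream:
#     digest.update(chunk)
#   raw = ''.join(stream.accumulated)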
def split_payload(request, chunk_size, max_chunks):
"""Splits a binary payload into elements of |chunk_size| length.
  Returns the list of chunks.
"""
content = request.request.body
if len(content) % chunk_size:
msg = (
'Payload must be in increments of %d bytes, had %d bytes total, last '
'chunk was of length %d' % (
chunk_size,
len(content),
len(content) % chunk_size))
logging.error(msg)
request.abort(400, detail=msg)
count = len(content) / chunk_size
if count > max_chunks:
    msg = (
        'Requested %d hash digests in a single request, exceeding the limit '
        'of %d, aborting' % (count, max_chunks))
logging.warning(msg)
request.abort(400, detail=msg)
return [content[i * chunk_size: (i + 1) * chunk_size] for i in xrange(count)]
def payload_to_hashes(request):
"""Converts a raw payload into SHA-1 hashes as bytes."""
return split_payload(
request,
hashlib.sha1().digest_size,
model.MAX_KEYS_PER_DB_OPS)
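# Note: the digests returned above are raw bytes (20 bytes each for SHA-1);
# callers hex-encode them before building datastore keys. A minimal sketch
# mirroring InternalTagWorkerHandler below (names as used there):
#
#   for d in payload_to_hashes(self):
#     key = model.get_entry_key(namespace, binascii.hexlify(d))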
def incremental_delete(query, delete, check=None):
"""Applies |delete| to objects in a query asynchrously.
This function is itself synchronous.
Arguments:
- query: iterator of items to process.
- delete: callback that accepts a list of objects to delete and returns a list
of objects that have a method .wait() to make sure all calls
completed.
- check: optional callback that can filter out items from |query| from
deletion.
Returns the number of objects found.
"""
to_delete = []
count = 0
deleted_count = 0
futures = []
for item in query:
count += 1
if not (count % 1000):
logging.debug('Found %d items', count)
if check and not check(item):
continue
to_delete.append(item)
deleted_count += 1
if len(to_delete) == ITEMS_TO_DELETE_ASYNC:
logging.info('Deleting %s entries', len(to_delete))
futures.extend(delete(to_delete) or [])
to_delete = []
# That's a lot of on-going operations which could take a significant amount
# of memory. Wait a little on the oldest operations.
# TODO(maruel): Profile memory usage to see if a few thousands of on-going
# RPC objects is a problem in practice.
while len(futures) > 10 * ITEMS_TO_DELETE_ASYNC:
futures.pop(0).wait()
if to_delete:
logging.info('Deleting %s entries', len(to_delete))
futures.extend(delete(to_delete) or [])
ndb.Future.wait_all(futures)
return deleted_count
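# A minimal usage sketch for incremental_delete(), mirroring
# InternalCleanupOldEntriesWorkerHandler below (names are the ones used there):
#
#   q = model.ContentEntry.query(
#       model.ContentEntry.expiration_ts < utils.utcnow()).iter(keys_only=True)
#   incremental_delete(q, delete=model.delete_entry_and_gs_entry)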
### Restricted handlers
class InternalCleanupOldEntriesWorkerHandler(webapp2.RequestHandler):
"""Removes the old data from the datastore.
Only a task queue task can use this handler.
"""
# pylint: disable=R0201
@decorators.silence(
datastore_errors.InternalError,
datastore_errors.Timeout,
datastore_errors.TransactionFailedError,
runtime.DeadlineExceededError)
@decorators.require_taskqueue('cleanup')
def post(self):
q = model.ContentEntry.query(
model.ContentEntry.expiration_ts < utils.utcnow()
).iter(keys_only=True)
total = incremental_delete(q, delete=model.delete_entry_and_gs_entry)
    logging.info('Deleted %d expired entries', total)
class InternalObliterateWorkerHandler(webapp2.RequestHandler):
"""Deletes all the stuff."""
# pylint: disable=R0201
@decorators.silence(
datastore_errors.InternalError,
datastore_errors.Timeout,
datastore_errors.TransactionFailedError,
runtime.DeadlineExceededError)
@decorators.require_taskqueue('cleanup')
def post(self):
logging.info('Deleting ContentEntry')
incremental_delete(
model.ContentEntry.query().iter(keys_only=True),
ndb.delete_multi_async)
gs_bucket = config.settings().gs_bucket
logging.info('Deleting GS bucket %s', gs_bucket)
incremental_delete(
(i[0] for i in gcs.list_files(gs_bucket)),
lambda filenames: gcs.delete_files(gs_bucket, filenames))
logging.info('Flushing memcache')
# High priority (.isolated files) are cached explicitly. Make sure ghosts
# are zapped too.
memcache.flush_all()
logging.info('Finally done!')
class InternalCleanupTrimLostWorkerHandler(webapp2.RequestHandler):
"""Removes the GS files that are not referenced anymore.
  This can happen, for example, when a ContentEntry is deleted without its GS
  file being deleted along with it.
Only a task queue task can use this handler.
"""
# pylint: disable=R0201
@decorators.silence(
datastore_errors.InternalError,
datastore_errors.Timeout,
datastore_errors.TransactionFailedError,
runtime.DeadlineExceededError)
@decorators.require_taskqueue('cleanup')
def post(self):
"""Enumerates all GS files and delete those that do not have an associated
ContentEntry.
"""
gs_bucket = config.settings().gs_bucket
def filter_missing():
futures = {}
cutoff = time.time() - 60*60
for filepath, filestats in gcs.list_files(gs_bucket):
# If the file was uploaded in the last hour, ignore it.
if filestats.st_ctime >= cutoff:
continue
# This must match the logic in model.get_entry_key(). Since this request
# will in practice touch every item, do not use memcache since it'll
        # mess it up by loading every item into it.
# TODO(maruel): Batch requests to use get_multi_async() similar to
# datastore_utils.page_queries().
future = model.entry_key_from_id(filepath).get_async(
use_cache=False, use_memcache=False)
futures[future] = filepath
if len(futures) > 20:
future = ndb.Future.wait_any(futures)
filepath = futures.pop(future)
if future.get_result():
continue
yield filepath
while futures:
future = ndb.Future.wait_any(futures)
filepath = futures.pop(future)
if future.get_result():
continue
yield filepath
gs_delete = lambda filenames: gcs.delete_files(gs_bucket, filenames)
total = incremental_delete(filter_missing(), gs_delete)
logging.info('Deleted %d lost GS files', total)
# TODO(maruel): Find all the empty directories that are old and remove them.
    # We need to safeguard against the race condition where a user would upload
# to this directory.
class InternalCleanupTriggerHandler(webapp2.RequestHandler):
"""Triggers a taskqueue to clean up."""
@decorators.silence(
datastore_errors.InternalError,
datastore_errors.Timeout,
datastore_errors.TransactionFailedError,
runtime.DeadlineExceededError)
@decorators.require_cronjob
def get(self, name):
if name in ('obliterate', 'old', 'orphaned', 'trim_lost'):
url = '/internal/taskqueue/cleanup/' + name
      # The push task queue name must be unique over a ~7 day period, so use
      # the date at second precision (24-hour clock, so names do not collide
      # across AM and PM); there's no point in triggering each of them more
      # than once per second anyway.
      now = utils.utcnow().strftime('%Y-%m-%d_%H-%M-%S')
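      # e.g. name='old', now='2014-06-01_13-05-07' (illustrative values only).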
if utils.enqueue_task(url, 'cleanup', name=name + '_' + now):
self.response.out.write('Triggered %s' % url)
else:
self.abort(500, 'Failed to enqueue a cleanup task, see logs')
else:
self.abort(404, 'Unknown job')
class InternalTagWorkerHandler(webapp2.RequestHandler):
"""Tags hot ContentEntry entities that were tested for presence.
Updates .expiration_ts and .next_tag_ts in ContentEntry to note that a client
tagged them.
This makes sure they are not evicted from the LRU cache too fast.
"""
@decorators.silence(
datastore_errors.InternalError,
datastore_errors.Timeout,
datastore_errors.TransactionFailedError,
runtime.DeadlineExceededError)
@decorators.require_taskqueue('tag')
def post(self, namespace, timestamp):
digests = []
now = utils.timestamp_to_datetime(long(timestamp))
expiration = config.settings().default_expiration
try:
digests = payload_to_hashes(self)
# Requests all the entities at once.
futures = ndb.get_multi_async(
model.get_entry_key(namespace, binascii.hexlify(d)) for d in digests)
to_save = []
while futures:
        # Opportunistically process whichever entity is retrieved first.
future = ndb.Future.wait_any(futures)
futures.remove(future)
item = future.get_result()
if item and item.next_tag_ts < now:
# Update the timestamp. Add a bit of pseudo randomness.
item.expiration_ts, item.next_tag_ts = model.expiration_jitter(
now, expiration)
to_save.append(item)
if to_save:
ndb.put_multi(to_save)
logging.info(
'Timestamped %d entries out of %s', len(to_save), len(digests))
except Exception as e:
logging.error('Failed to stamp entries: %s\n%d entries', e, len(digests))
raise
class InternalVerifyWorkerHandler(webapp2.RequestHandler):
"""Verify the SHA-1 matches for an object stored in Cloud Storage."""
@staticmethod
def purge_entry(entry, message, *args):
"""Logs error message, deletes |entry| from datastore and GS."""
logging.error(
'Verification failed for %s: %s', entry.key.id(), message % args)
model.delete_entry_and_gs_entry([entry.key])
@decorators.silence(
datastore_errors.InternalError,
datastore_errors.Timeout,
datastore_errors.TransactionFailedError,
gcs.TransientError,
runtime.DeadlineExceededError)
@decorators.require_taskqueue('verify')
def post(self, namespace, hash_key):
original_request = self.request.get('req')
entry = model.get_entry_key(namespace, hash_key).get()
if not entry:
logging.error('Failed to find entity\n%s', original_request)
return
if entry.is_verified:
logging.warning('Was already verified\n%s', original_request)
return
if entry.content is not None:
logging.error(
'Should not be called with inline content\n%s', original_request)
return
# Get GS file size.
gs_bucket = config.settings().gs_bucket
gs_file_info = gcs.get_file_info(gs_bucket, entry.key.id())
    # It's None if the file is missing.
if not gs_file_info:
# According to the docs, GS is read-after-write consistent, so a file is
# missing only if it wasn't stored at all or it was deleted, in any case
# it's not a valid ContentEntry.
self.purge_entry(entry, 'No such GS file\n%s', original_request)
return
# Expected stored length and actual length should match.
if gs_file_info.size != entry.compressed_size:
self.purge_entry(entry,
'Bad GS file: expected size is %d, actual size is %d\n%s',
entry.compressed_size, gs_file_info.size,
original_request)
return
save_to_memcache = (
entry.compressed_size <= model.MAX_MEMCACHE_ISOLATED and
entry.is_isolated)
expanded_size = 0
digest = hashlib.sha1()
data = None
try:
      # Read the data in blocks, hashing it as it streams in.
stream = gcs.read_file(gs_bucket, entry.key.id())
if save_to_memcache:
# Wraps stream with a generator that accumulates the data.
stream = Accumulator(stream)
for data in model.expand_content(namespace, stream):
expanded_size += len(data)
digest.update(data)
# Make sure the data is GC'ed.
del data
# Hashes should match.
if digest.hexdigest() != hash_key:
self.purge_entry(entry,
            'SHA-1 does not match the data\n'
'%d bytes, %d bytes expanded, expected %d bytes\n%s',
entry.compressed_size, expanded_size,
entry.expanded_size, original_request)
return
    except gcs.NotFoundError:
# Somebody deleted a file between get_file_info and read_file calls.
self.purge_entry(
entry, 'File was unexpectedly deleted\n%s', original_request)
return
except (gcs.ForbiddenError, gcs.AuthorizationError) as e:
      # Misconfiguration in the Google Storage ACLs. Don't delete the entry, it
      # may be fine. The ACL problem may be fixed before the next retry.
logging.warning(
'CloudStorage auth issues (%s): %s', e.__class__.__name__, e)
# Abort so the job is retried automatically.
return self.abort(500)
except (gcs.FatalError, zlib.error, IOError) as e:
# ForbiddenError and AuthorizationError inherit FatalError, so this except
# block should be last.
# It's broken or unreadable.
self.purge_entry(entry,
'Failed to read the file (%s): %s\n%s',
e.__class__.__name__, e, original_request)
return
# Verified. Data matches the hash.
entry.expanded_size = expanded_size
entry.is_verified = True
future = entry.put_async()
logging.info(
'%d bytes (%d bytes expanded) verified\n%s',
entry.compressed_size, expanded_size, original_request)
if save_to_memcache:
model.save_in_memcache(namespace, hash_key, ''.join(stream.accumulated))
future.wait()
class InternalStatsUpdateHandler(webapp2.RequestHandler):
"""Called every few minutes to update statistics."""
@decorators.require_cronjob
def get(self):
self.response.headers['Content-Type'] = 'text/plain'
minutes = stats.generate_stats()
if minutes is not None:
msg = 'Processed %d minutes' % minutes
logging.info(msg)
self.response.write(msg)
### Mapreduce related handlers
class InternalLaunchMapReduceJobWorkerHandler(webapp2.RequestHandler):
"""Called via task queue or cron to start a map reduce job."""
@decorators.require_taskqueue(mapreduce_jobs.MAPREDUCE_TASK_QUEUE)
def post(self, job_id): # pylint: disable=R0201
mapreduce_jobs.launch_job(job_id)
###
def get_routes():
"""Returns the routes to be executed on the backend."""
# Namespace can be letters, numbers, '-', '.' and '_'.
namespace = r'/<namespace:%s>' % model.NAMESPACE_RE
  # Do not enforce a length limit to support different hashing algorithms.
  # This should represent a valid hex value.
hashkey = r'/<hash_key:[a-f0-9]{4,}>'
# This means a complete key is required.
namespace_key = namespace + hashkey
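  # For illustration, a matching verify URL would look roughly like
  # /internal/taskqueue/verify/<namespace>/<hex digest>, where the digest is a
  # lowercase hex string of at least 4 characters (hypothetical example).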
return [
# Triggers a taskqueue.
webapp2.Route(
r'/internal/cron/cleanup/trigger/<name:[a-z_]+>',
InternalCleanupTriggerHandler),
# Cleanup tasks.
webapp2.Route(
r'/internal/taskqueue/cleanup/old',
InternalCleanupOldEntriesWorkerHandler),
webapp2.Route(
r'/internal/taskqueue/cleanup/obliterate',
InternalObliterateWorkerHandler),
webapp2.Route(
r'/internal/taskqueue/cleanup/trim_lost',
InternalCleanupTrimLostWorkerHandler),
# Tasks triggered by other request handlers.
webapp2.Route(
r'/internal/taskqueue/tag%s/<timestamp:\d+>' % namespace,
InternalTagWorkerHandler),
webapp2.Route(
r'/internal/taskqueue/verify%s' % namespace_key,
InternalVerifyWorkerHandler),
# Stats
webapp2.Route(
r'/internal/cron/stats/update', InternalStatsUpdateHandler),
# Mapreduce related urls.
webapp2.Route(
r'/internal/taskqueue/mapreduce/launch/<job_id:[^\/]+>',
InternalLaunchMapReduceJobWorkerHandler),
]
def create_application(debug):
"""Creates the url router for the backend.
The backend only implements urls under /internal/.
"""
# Necessary due to email sent by cron job.
template.bootstrap()
return webapp2.WSGIApplication(get_routes(), debug=debug)
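# A hedged wiring sketch (the module name below is an assumption, not defined
# in this file): the WSGI app returned here would typically be referenced from
# an app.yaml entry point, e.g.:
#
#   import handlers_backend
#   app = handlers_backend.create_application(debug=False)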