# coding=utf-8
# Copyright 2014 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""This module defines Isolate Server model(s)."""

import datetime
import hashlib
import logging
import random
import zlib

from google.appengine.api import memcache
from google.appengine.ext import ndb

import config
import gcs

from components import datastore_utils
from components import utils
# The maximum number of entries that can be queried in a single request.
MAX_KEYS_PER_DB_OPS = 1000
# Maximum size of file stored in GS to be saved in memcache. The value must be
# small enough so that the whole content can safely fit in memory.
MAX_MEMCACHE_ISOLATED = 500*1024
# Valid namespace key.
NAMESPACE_RE = r'[a-z0-9A-Z\-._]+'


### Models.


class ContentEntry(ndb.Model):
  """Represents the content, keyed by its SHA-1 hash.

  Parent is a ContentShard.

  Key is '<namespace>/<hash>'.

  Eventually, the namespace could have a prefix to determine the hashing
  algorithm, like 'sha1-'.

  There are usually only two base namespaces:
  - default: The default content-addressed store.
  - temporary*: This family of namespaces is discardable; it is meant for
    testing purposes only.

  A namespace can have a suffix:
  - -deflate: The namespace contains the content in deflated format. The
    content key is the hash of the uncompressed data, not the compressed one.
    That is why it is in a separate namespace.
  """
  # Cache the file size for statistics purposes.
  compressed_size = ndb.IntegerProperty(indexed=False)

  # Cache the expanded file size for statistics purposes. Its value differs
  # from compressed_size *only* in compressed namespaces. It may be -1 if not
  # yet known.
  expanded_size = ndb.IntegerProperty(indexed=False)

  # Set to True once the entry's content has been verified to match the hash.
  is_verified = ndb.BooleanProperty()

  # The content stored inline. This is only valid if the content was smaller
  # than MIN_SIZE_FOR_GS.
  content = ndb.BlobProperty()

  # Moment when this item expires and should be cleared. This is the only
  # property that has to be indexed.
  expiration_ts = ndb.DateTimeProperty()

  # Moment when this item should have its expiration time updated.
  next_tag_ts = ndb.DateTimeProperty()

  # Moment when this item was created.
  creation_ts = ndb.DateTimeProperty(indexed=False, auto_now=True)

  # Set to True for important items, normally .isolated files.
  is_isolated = ndb.BooleanProperty(default=False)

  @property
  def is_compressed(self):
    """Returns True if the stored content was modified in some form, e.g.
    compressed, so that the SHA-1 of the stored bytes doesn't match the key.
    """
    # The namespace is the part of the key id before the '/'; the parent key
    # is a ContentShard derived from the hash, so it never carries the suffix.
    return self.key.id().split('/', 1)[0].endswith(
        ('-bzip2', '-deflate', '-gzip'))
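

# Illustrative example (not part of the server logic): an entry in the
# 'default-deflate' namespace is keyed by the SHA-1 of the *uncompressed*
# data, and is_compressed then reports True. 'sha1_hex' stands in for a real
# 40 character digest:
#
#   entry = get_entry_key('default-deflate', sha1_hex).get()
#   assert entry.is_compressed
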
### Private stuff.
# Valid hash keys.
_HASH_LETTERS = frozenset('0123456789abcdef')
### Public API.


def is_valid_hex(hex_string):
  """Returns True if the string consists only of lowercase hexadecimal
  characters.
  """
  return _HASH_LETTERS.issuperset(hex_string)


def check_hash(hash_key, length):
  """Checks the validity of a hash_key. Doesn't use a regexp for speed.

  Raises ValueError in case of non-validity.
  """
  # It is faster than running a regexp.
  if len(hash_key) != length or not is_valid_hex(hash_key):
    raise ValueError('Invalid \'%s\' as ContentEntry key' % hash_key)
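

# Illustrative example: a SHA-1 digest is 40 lowercase hex characters, so:
#
#   check_hash(hashlib.sha1('content').hexdigest(), 40)  # OK.
#   check_hash('DEADBEEF', 40)  # Raises ValueError: wrong length and case.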


def get_entry_key(namespace, hash_key):
  """Returns a valid ndb.Key for a ContentEntry."""
  if isinstance(namespace, unicode):
    namespace = namespace.encode('utf-8')
  if isinstance(hash_key, unicode):
    hash_key = hash_key.encode('utf-8')
  check_hash(hash_key, hashlib.sha1().digest_size * 2)
  return entry_key_from_id('%s/%s' % (namespace, hash_key))


def entry_key_from_id(key_id):
  """Returns the ndb.Key for the key_id."""
  hash_key = key_id.rsplit('/', 1)[1]
  N = config.settings().sharding_letters
  return ndb.Key(
      ContentEntry, key_id,
      parent=datastore_utils.shard_key(hash_key, N, 'ContentShard'))
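

# Illustrative example, assuming datastore_utils.shard_key() keys the shard on
# the first N characters of the hash; with sharding_letters == 4:
#
#   key = entry_key_from_id('default/' + 'dead' + 'a' * 36)
#   # key.parent() == ndb.Key('ContentShard', 'dead')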


def get_content(namespace, hash_key):
  """Returns the content from either memcache or datastore, when stored inline.

  This does NOT return data from GCS, it is up to the client to do that.

  Returns:
    tuple(content, ContentEntry)
    On a memcache hit, only the content is set. Otherwise the ContentEntry is
    set, and its .content is None when the data is stored in GCS.

  Raises:
    LookupError if the content cannot be found.
    ValueError if the hash_key is invalid.
  """
  memcache_entry = memcache.get(hash_key, namespace='table_%s' % namespace)
  if memcache_entry is not None:
    return (memcache_entry, None)
  # Raises ValueError on an invalid hash_key.
  key = get_entry_key(namespace, hash_key)
  entity = key.get()
  if entity is None:
    raise LookupError(
        'namespace %s, key %s does not refer to anything' %
        (namespace, hash_key))
  return (entity.content, entity)
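

# Illustrative example of a caller; when .content is None the data lives in
# GCS and must be fetched separately:
#
#   content, entry = get_content(namespace, hash_key)
#   if content is None:
#     content = read_from_gcs(entry)  # Hypothetical helper.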


def expiration_jitter(now, expiration):
  """Returns expiration/next_tag pair to set in a ContentEntry."""
  jittered = random.uniform(1, 1.2) * expiration
  expiration = now + datetime.timedelta(seconds=jittered)
  next_tag = now + datetime.timedelta(seconds=jittered*0.1)
  return expiration, next_tag
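

# Illustrative example: with a 7 day expiration, expiration_ts lands uniformly
# between 7 and 8.4 days from now, and next_tag_ts at a tenth of that delay:
#
#   expiration, next_tag = expiration_jitter(utils.utcnow(), 7*24*60*60)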


def expand_content(namespace, source):
  """Yields expanded data from source."""
  # TODO(maruel): Add bzip2.
  # TODO(maruel): Remove '-gzip' since it's a misnomer.
  if namespace.endswith(('-deflate', '-gzip')):
    zlib_state = zlib.decompressobj()
    for i in source:
      data = zlib_state.decompress(i, gcs.CHUNK_SIZE)
      yield data
      del data
      while zlib_state.unconsumed_tail:
        data = zlib_state.decompress(
            zlib_state.unconsumed_tail, gcs.CHUNK_SIZE)
        yield data
        del data
      del i
    data = zlib_state.flush()
    yield data
    del data
    # Forcibly delete the state.
    del zlib_state
  else:
    # Returns the source as-is.
    for i in source:
      yield i
      del i
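

# Illustrative example: round-tripping a small payload through a deflated
# namespace; zlib.compress() produces the zlib format decompressobj() expects:
#
#   blob = zlib.compress('hello')
#   assert ''.join(expand_content('default-deflate', [blob])) == 'hello'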


def save_in_memcache(namespace, hash_key, content, async=False):
  """Saves the content in memcache, synchronously or asynchronously.

  In the asynchronous case, returns a future; failures are the caller's to
  handle. In the synchronous case, failures are only logged.
  """
  namespace_key = 'table_%s' % namespace
  if async:
    return ndb.get_context().memcache_set(
        hash_key, content, namespace=namespace_key)
  try:
    if not memcache.set(hash_key, content, namespace=namespace_key):
      msg = 'Failed to save content to memcache.\n%s\\%s %d bytes' % (
          namespace_key, hash_key, len(content))
      # Failing to cache a small value is unexpected, so log it as an error;
      # large values are allowed to fall out.
      if len(content) < 100*1024:
        logging.error(msg)
      else:
        logging.warning(msg)
  except ValueError as e:
    logging.error(e)
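

# Illustrative example: the synchronous form only logs failures, while the
# asynchronous form returns an ndb future for the caller to wait on:
#
#   save_in_memcache(namespace, hash_key, content)
#   future = save_in_memcache(namespace, hash_key, content, async=True)
#   future.wait()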


def new_content_entry(key, **kwargs):
  """Generates a new ContentEntry for the request.

  Doesn't store it. Just creates a new ContentEntry instance.
  """
  expiration, next_tag = expiration_jitter(
      utils.utcnow(), config.settings().default_expiration)
  return ContentEntry(
      key=key, expiration_ts=expiration, next_tag_ts=next_tag, **kwargs)


def delete_entry_and_gs_entry(keys_to_delete):
  """Synchronously deletes a list of ContentEntry entities and their GS files.

  For each ContentEntry, it deletes the ContentEntry first, then the file in
  GS. The worst case is that the GS files are left behind and will be reaped
  by the task queue that cleans up lost GS files. The reverse is much worse:
  a ContentEntry pointing to a deleted GS file will lead to lookup failures.
  """
  futures = {}
  exc = None
  bucket = config.settings().gs_bucket
  # Note that some content entries may NOT have corresponding GS files. That
  # happens for small entries stored inline in the datastore or memcache. Since
  # this function operates only on keys, it can't distinguish "large" entries
  # stored in GS from "small" ones stored inline. So instead it tries to delete
  # all corresponding GS files, silently skipping ones that are not there.
  for key in keys_to_delete:
    # Always delete ContentEntry first.
    futures[key.delete_async()] = key.string_id()
    # Note: this is worst case O(n²) but will scale better than that. The goal
    # is to delete files as soon as possible.
    for f in futures.keys():
      if f.done():
        k = futures.pop(f)
        try:
          f.get_result()
          # This is synchronous.
          gcs.delete_file(bucket, k, ignore_missing=True)
        except Exception as exc:
          break
    if exc:
      break
  while futures:
    f = ndb.Future.wait_any(futures)
    k = futures.pop(f)
    try:
      f.get_result()
      # This is synchronous.
      gcs.delete_file(bucket, k, ignore_missing=True)
    except Exception as exc:
      continue
  if exc:
    raise exc  # pylint: disable=raising-bad-type
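

# Illustrative example: deleting expired entries found through the indexed
# expiration_ts property; a real cleanup job would process keys in batches of
# at most MAX_KEYS_PER_DB_OPS:
#
#   q = ContentEntry.query(ContentEntry.expiration_ts < utils.utcnow())
#   delete_entry_and_gs_entry(q.fetch(MAX_KEYS_PER_DB_OPS, keys_only=True))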