#!/usr/bin/env python
# Copyright 2014 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
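"""Unit tests for handlers_endpoints_v1.IsolateService."""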
import base64
import hashlib
import json
import logging
import sys
import unittest
from Crypto.PublicKey import RSA
import test_env
test_env.setup_test_env()
from google.appengine.api import memcache
from google.appengine.api import taskqueue
from protorpc.remote import protojson
import webapp2
import webtest
from components import auth
from components import auth_testing
from components import utils
from test_support import test_case
import acl
import config
import gcs
import handlers_backend
import handlers_endpoints_v1
import model
def make_private_key():
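  """Generate an RSA key and store it in the config for ticket signing."""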
new_key = RSA.generate(1024)
pem_key = base64.b64encode(new_key.exportKey('PEM'))
config.settings()._ds_cfg.gs_private_key = pem_key
def hash_content(content):
"""Create and return the hash of some content in a given namespace."""
hash_algo = hashlib.sha1()
hash_algo.update(content)
return hash_algo.hexdigest()
def generate_digest(content):
"""Create a Digest from content (in a given namespace) for preupload.
Arguments:
content: the content to be hashed
Returns:
a Digest corresponding to the namespace pair
"""
return handlers_endpoints_v1.Digest(
digest=hash_content(content), size=len(content))
def generate_collection(contents, namespace=None):
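  """Return a DigestCollection containing a Digest for each content string."""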
if namespace is None:
namespace = handlers_endpoints_v1.Namespace()
return handlers_endpoints_v1.DigestCollection(
namespace=namespace,
items=[generate_digest(content) for content in contents])
def generate_embedded(namespace, digest):
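  """Return the dict expected to be embedded in an upload ticket."""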
return {
'd': digest.digest,
'i': str(int(digest.is_isolated)),
'n': namespace.namespace,
's': str(digest.size),
}
def preupload_status_to_request(preupload_status, content):
"""Create a Storage/FinalizeRequest corresponding to a PreuploadStatus."""
ticket = preupload_status.get('upload_ticket')
url = preupload_status.get('gs_upload_url', None)
if url is not None:
return handlers_endpoints_v1.FinalizeRequest(upload_ticket=ticket)
return handlers_endpoints_v1.StorageRequest(
upload_ticket=ticket, content=content)
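# Shorthand for validating an upload ticket and decoding its embedded payload.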
validate = handlers_endpoints_v1.TokenSigner.validate
def pad_string(string, size=handlers_endpoints_v1.MIN_SIZE_FOR_GS):
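  """Pad the string with zeroes so it is large enough to be pushed to GS."""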
return string + '0' * (size - len(string))
class FileInfaux(object):
"""Fake file info to mock GCS retrieval."""
def __init__(self, content):
self.size = len(content)
def get_file_info_factory(content=None):
"""Return a function to mock gcs.get_file_info."""
result = None if content is None else FileInfaux(content)
return lambda unused_bucket, unused_id: result
### Isolate Service Test
class IsolateServiceTest(test_case.EndpointsTestCase):
"""Test the IsolateService's API methods."""
api_service_cls = handlers_endpoints_v1.IsolateService
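  # Expected prefix of the signed Google Storage URLs issued by the service.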
store_prefix = 'https://sample-app.storage.googleapis.com/'
APP_DIR = test_env.APP_DIR
def setUp(self):
super(IsolateServiceTest, self).setUp()
self.testbed.init_blobstore_stub()
self.testbed.init_urlfetch_stub()
admin = auth.Identity(auth.IDENTITY_USER, 'admin@example.com')
full_access_group = config.settings().auth.full_access_group
auth.bootstrap_group(full_access_group, [admin])
auth_testing.mock_get_current_identity(self, admin)
version = utils.get_app_version()
self.mock(utils, 'get_task_queue_host', lambda: version)
self.testbed.setup_env(current_version_id='testbed.version')
self.source_ip = '127.0.0.1'
    # The backend app is needed solely for self.execute_tasks(), which
    # processes task queues on the backend application.
self.app = webtest.TestApp(
webapp2.WSGIApplication(handlers_backend.get_routes(), debug=True),
extra_environ={'REMOTE_ADDR': self.source_ip})
# add a private key; signing depends on config.settings()
make_private_key()
# Remove the check for dev server in should_push_to_gs().
self.mock(utils, 'is_local_dev_server', lambda: False)
@staticmethod
def message_to_dict(message):
"""Returns a JSON-ish dictionary corresponding to the RPC message."""
return json.loads(protojson.encode_message(message))
def store_request(self, content):
"""Generate a Storage/FinalizeRequest via preupload status."""
collection = generate_collection([content])
response = self.call_api(
'preupload', self.message_to_dict(collection), 200)
message = response.json.get(u'items', [{}])[0]
return preupload_status_to_request(message, content)
def mock_side_effect(self, original_object, attribute_name, side_effect):
"""Add a side effect to a mocked attribute, preserving functionality."""
original_function = getattr(original_object, attribute_name)
def original_with_side_effect(*args, **kwargs):
side_effect()
return original_function(*args, **kwargs)
self.mock(original_object, attribute_name, original_with_side_effect)
def test_pre_upload_ok(self):
"""Assert that preupload correctly posts a valid DigestCollection."""
good_digests = generate_collection(['a pony'])
response = self.call_api(
'preupload', self.message_to_dict(good_digests), 200)
message = response.json.get(u'items', [{}])[0]
self.assertEqual('', message.get(u'gs_upload_url', ''))
expected = validate(
message.get(u'upload_ticket', ''),
handlers_endpoints_v1.UPLOAD_MESSAGES[0])
self.assertEqual(
expected,
generate_embedded(good_digests.namespace, good_digests.items[0]))
def test_finalize_url_ok(self):
"""Assert that a finalize_url is generated when should_push_to_gs."""
digests = generate_collection([pad_string('duckling')])
response = self.call_api(
'preupload', self.message_to_dict(digests), 200)
message = response.json.get(u'items', [{}])[0]
self.assertTrue(message.get(u'gs_upload_url', '').startswith(
self.store_prefix))
expected = validate(
message.get(u'upload_ticket', ''),
handlers_endpoints_v1.UPLOAD_MESSAGES[1])
self.assertEqual(
expected,
generate_embedded(digests.namespace, digests.items[0]))
def test_pre_upload_invalid_hash(self):
"""Assert that status 400 is returned when the digest is invalid."""
bad_collection = handlers_endpoints_v1.DigestCollection(
namespace=handlers_endpoints_v1.Namespace())
bad_digest = hash_content('some stuff')
bad_digest = 'g' + bad_digest[1:] # that's not hexadecimal!
bad_collection.items.append(
handlers_endpoints_v1.Digest(digest=bad_digest, size=10))
with self.call_should_fail('400'):
self.call_api(
'preupload', self.message_to_dict(bad_collection), 200)
def test_pre_upload_invalid_namespace(self):
"""Assert that status 400 is returned when the namespace is invalid."""
bad_collection = handlers_endpoints_v1.DigestCollection(
namespace=handlers_endpoints_v1.Namespace(namespace='~tildewhatevs'))
bad_collection.items.append(generate_digest('pangolin'))
with self.call_should_fail('400'):
self.call_api(
'preupload', self.message_to_dict(bad_collection), 200)
def test_check_existing_finds_existing_entities(self):
"""Assert that existence check is working."""
collection = generate_collection(
['small content', 'larger content', 'biggest content'])
key = model.get_entry_key(
collection.namespace.namespace, collection.items[0].digest)
# guarantee that one digest already exists in the datastore
model.new_content_entry(key).put()
response = self.call_api(
'preupload', self.message_to_dict(collection), 200)
    # we should see one enqueued task and two upload tickets in the response
items = response.json['items']
self.assertEqual(2, len(items))
self.assertEqual([1, 2], [int(item['index']) for item in items])
for item in items:
self.assertIsNotNone(item.get('upload_ticket'))
# remove tasks so tearDown doesn't complain
_ = self.execute_tasks()
def test_check_existing_enqueues_tasks(self):
"""Assert that existent entities are enqueued."""
collection = handlers_endpoints_v1.DigestCollection(
namespace=handlers_endpoints_v1.Namespace())
collection.items.append(generate_digest('some content'))
key = model.get_entry_key(
collection.namespace.namespace, collection.items[0].digest)
# guarantee that one digest already exists in the datastore
model.new_content_entry(key).put()
self.call_api(
'preupload', self.message_to_dict(collection), 200)
# find enqueued tasks
enqueued_tasks = self.execute_tasks()
self.assertEqual(1, enqueued_tasks)
def test_store_inline_ok(self):
"""Assert that inline content storage completes successfully."""
request = self.store_request('sibilance')
embedded = validate(
request.upload_ticket, handlers_endpoints_v1.UPLOAD_MESSAGES[0])
key = model.get_entry_key(embedded['n'], embedded['d'])
# assert that store_inline puts the correct entity into the datastore
self.call_api(
'store_inline', self.message_to_dict(request), 200)
stored = key.get()
self.assertEqual(key, stored.key)
# assert that expected (digest, size) pair is generated by stored content
self.assertEqual(
(embedded['d'].encode('utf-8'), int(embedded['s'])),
handlers_endpoints_v1.hash_content(stored.content, embedded['n']))
def test_store_inline_empty_content(self):
"""Assert that inline content storage works when content is empty."""
request = self.store_request('')
embedded = validate(
request.upload_ticket, handlers_endpoints_v1.UPLOAD_MESSAGES[0])
key = model.get_entry_key(embedded['n'], embedded['d'])
# assert that store_inline puts the correct entity into the datastore
self.call_api(
'store_inline', self.message_to_dict(request), 200)
stored = key.get()
self.assertEqual(key, stored.key)
# assert that expected (digest, size) pair is generated by stored content
self.assertEqual(
(embedded['d'].encode('utf-8'), int(embedded['s'])),
handlers_endpoints_v1.hash_content(stored.content, embedded['n']))
def test_store_inline_bad_mac(self):
"""Assert that inline content storage fails when token is altered."""
request = self.store_request('sonority')
request.upload_ticket += '7'
with self.call_should_fail('400'):
self.call_api(
'store_inline', self.message_to_dict(request), 200)
def test_store_inline_no_upload_ticket(self):
"""Assert that inline content storage fails when there is no ticket."""
request = self.store_request('silence')
request.upload_ticket = None
with self.call_should_fail('400'):
self.call_api(
'store_inline', self.message_to_dict(request), 200)
def test_store_inline_bad_digest(self):
"""Assert that inline content storage fails when data do not match."""
request = self.store_request('anseres sacri')
request.content = ':)' + request.content[2:]
with self.call_should_fail('400'):
self.call_api(
'store_inline', self.message_to_dict(request), 200)
def test_finalized_data_in_gs(self):
"""Assert that data are actually in GS when finalized."""
# create content
content = pad_string('huge, important data')
request = self.store_request(content)
# this should succeed
self.mock(gcs, 'get_file_info', get_file_info_factory(content))
self.call_api(
'finalize_gs_upload', self.message_to_dict(request), 200)
# this should fail
self.mock(gcs, 'get_file_info', get_file_info_factory())
with self.call_should_fail('400'):
self.call_api(
'finalize_gs_upload', self.message_to_dict(request), 200)
self.assertEqual(1, self.execute_tasks())
def test_finalized_no_upload_ticket(self):
"""Assert that GS finalization fails when there is no ticket."""
request = self.store_request(pad_string('silence'))
request.upload_ticket = None
with self.call_should_fail('400'):
self.call_api(
'finalize_gs_upload', self.message_to_dict(request), 200)
def test_finalize_gs_creates_content_entry(self):
"""Assert that finalize_gs_upload creates a content entry."""
content = pad_string('empathy')
request = self.store_request(content)
embedded = validate(
request.upload_ticket, handlers_endpoints_v1.UPLOAD_MESSAGES[1])
key = model.get_entry_key(embedded['n'], embedded['d'])
# finalize_gs_upload should put a new ContentEntry into the database
self.mock(gcs, 'get_file_info', get_file_info_factory(content))
self.call_api(
'finalize_gs_upload', self.message_to_dict(request), 200)
stored = key.get()
self.assertEqual(key, stored.key)
# assert that expected attributes are present
self.assertEqual(None, stored.content)
self.assertEqual(int(embedded['s']), stored.expanded_size)
# ensure that verification occurs
self.mock(gcs, 'read_file', lambda _bucket, _key: content)
# add a side effect in execute_tasks()
# TODO(cmassaro): there must be a better way than this
def set_verified():
stored_entry = stored.key.get()
if not stored_entry.is_verified:
stored_entry.is_verified = True
self.mock_side_effect(self._taskqueue_stub, 'DeleteTask', set_verified)
# assert that verification occurs in the taskqueue
self.assertFalse(stored.key.get().is_verified)
self.assertEqual(1, self.execute_tasks())
self.assertTrue(stored.key.get().is_verified)
def test_storage_wrong_type(self):
"""Assert that GS and inline storage fail when the wrong type is sent."""
small = 'elephant'
large = pad_string('mouse')
small_request = self.store_request(small)
large_request = self.store_request(large)
# try the large entity
self.mock(gcs, 'get_file_info', get_file_info_factory(large))
with self.call_should_fail('400'):
self.call_api(
'finalize_gs_upload', self.message_to_dict(small_request), 200)
# try the inline stored entity
with self.call_should_fail('400'):
self.call_api(
'store_inline', self.message_to_dict(large_request), 200)
def test_storage_server_error(self):
"""Assert that GS storage raises appropriate error when storage fails."""
# pretend that taskqueue addition fails
def _taskqueue_add_mock(*_args, **_kwargs):
raise taskqueue.Error()
self.mock(taskqueue, 'add', _taskqueue_add_mock)
# make a GCS-sized request
big_datum = pad_string('gigas')
request = self.store_request(big_datum)
self.mock(gcs, 'get_file_info', get_file_info_factory(big_datum))
# should raise InternalServerErrorException
with self.call_should_fail('500'):
self.call_api('finalize_gs_upload', self.message_to_dict(request), 200)
def test_retrieve_memcache_ok(self):
"""Assert that content retrieval goes off swimmingly in the normal case."""
content = 'Grecian Urn'
request = self.store_request(content)
embedded = validate(
request.upload_ticket, handlers_endpoints_v1.UPLOAD_MESSAGES[0])
self.call_api(
'store_inline', self.message_to_dict(request), 200)
retrieve_request = handlers_endpoints_v1.RetrieveRequest(
digest=embedded['d'], namespace=handlers_endpoints_v1.Namespace())
response = self.call_api(
'retrieve', self.message_to_dict(retrieve_request), 200)
retrieved = response.json
self.assertEqual(content, base64.b64decode(retrieved.get(u'content', '')))
def test_retrieve_db_ok(self):
"""Assert that content retrieval works for non-memcached DB entities."""
content = 'Isabella, or the Pot of Basil'
request = self.store_request(content)
embedded = validate(
request.upload_ticket, handlers_endpoints_v1.UPLOAD_MESSAGES[0])
self.call_api(
'store_inline', self.message_to_dict(request), 200)
retrieve_request = handlers_endpoints_v1.RetrieveRequest(
digest=embedded['d'], namespace=handlers_endpoints_v1.Namespace())
memcache.flush_all()
response = self.call_api(
'retrieve', self.message_to_dict(retrieve_request), 200)
retrieved = response.json
self.assertEqual(content, base64.b64decode(retrieved.get(u'content', '')))
def test_retrieve_gs_url_ok(self):
"""Assert that URL retrieval works for GS entities."""
# get URL via preupload
content = pad_string('Lycidas')
collection = generate_collection([content])
preupload_status = self.call_api(
'preupload', self.message_to_dict(collection), 200)
message = preupload_status.json.get(u'items', [{}])[0]
# finalize GS upload
request = preupload_status_to_request(message, content)
embedded = validate(
request.upload_ticket, handlers_endpoints_v1.UPLOAD_MESSAGES[1])
self.mock(gcs, 'get_file_info', get_file_info_factory(content))
self.call_api(
'finalize_gs_upload', self.message_to_dict(request), 200)
# retrieve the upload URL
retrieve_request = handlers_endpoints_v1.RetrieveRequest(
digest=embedded['d'], namespace=handlers_endpoints_v1.Namespace())
retrieved_response = self.call_api(
'retrieve', self.message_to_dict(retrieve_request), 200)
retrieved = retrieved_response.json
self.assertNotEqual(message.get(u'gs_upload_url', ''), '')
self.assertNotEqual(retrieved.get(u'url', ''), '')
self.assertTrue(retrieved.get(u'url', '').startswith(self.store_prefix))
# clear the taskqueue
self.assertEqual(1, self.execute_tasks())
def test_retrieve_partial_ok(self):
"""Assert that content retrieval works when a range is specified."""
content = 'Song of the Andoumboulou'
offset = 5
request = self.store_request(content)
embedded = validate(
request.upload_ticket, handlers_endpoints_v1.UPLOAD_MESSAGES[0])
self.call_api(
'store_inline', self.message_to_dict(request), 200)
retrieve_request = handlers_endpoints_v1.RetrieveRequest(
digest=embedded['d'],
namespace=handlers_endpoints_v1.Namespace(),
offset=offset) # TODO(cmassaro): determine where offsets come from
response = self.call_api(
'retrieve', self.message_to_dict(retrieve_request), 200)
retrieved = response.json
self.assertEqual(content[offset:], base64.b64decode(retrieved.get(
u'content', '')))
def test_retrieve_partial_bad_offset_fails(self):
"""Assert that retrieval fails with status 416 when offset is invalid."""
content = 'Of Man\'s first Disobedience, and the Fruit'
request = self.store_request(content)
embedded = validate(
request.upload_ticket, handlers_endpoints_v1.UPLOAD_MESSAGES[0])
self.call_api(
'store_inline', self.message_to_dict(request), 200)
requests = [handlers_endpoints_v1.RetrieveRequest(
digest=embedded['d'],
namespace=handlers_endpoints_v1.Namespace(),
offset=offset) for offset in [-1, len(content) + 1]]
for request in requests:
with self.call_should_fail('400'):
self.call_api('retrieve', self.message_to_dict(request), 200)
def test_retrieve_not_found(self):
"""Assert that HTTP 404 response is served when content is absent."""
# get a valid digest
content = """\xe1\xbc\x84\xce\xbd\xce\xb4\xcf\x81\xce\xb1
\xce\xbc\xce\xbf\xce\xb9
\xe1\xbc\x94\xce\xbd\xce\xbd\xce\xb5\xcf\x80\xce\xb5"""
collection = generate_collection([content])
preupload_status = self.call_api(
'preupload', self.message_to_dict(collection), 200)
message = preupload_status.json.get(u'items', [{}])[0]
# get the digest
request = preupload_status_to_request(message, content)
embedded = validate(
request.upload_ticket, handlers_endpoints_v1.UPLOAD_MESSAGES[0])
# don't upload data; try to retrieve
retrieve_request = handlers_endpoints_v1.RetrieveRequest(
digest=embedded['d'], namespace=handlers_endpoints_v1.Namespace())
with self.call_should_fail('404'):
self.call_api(
'retrieve', self.message_to_dict(retrieve_request), 200)
def test_server_details_ok(self):
"""Assert that server_details returns the correct version."""
response = self.call_api('server_details', {}, 200).json
self.assertEqual(utils.get_app_version(), response['server_version'])
if __name__ == '__main__':
if '-v' in sys.argv:
unittest.TestCase.maxDiff = None
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.FATAL)
unittest.main()