blob: 277cc478e82223ecacf1ac3972668d8478f33e03 [file] [log] [blame]
# Copyright 2014 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import base64
import hashlib
import json
import logging
import re
import zlib
import httpserver
ALGO = hashlib.sha1
def hash_content(content):
return ALGO(content).hexdigest()
class FakeSigner(object):
@classmethod
def generate(cls, message, embedded):
return '%s_<<<%s>>>' % (repr(message), json.dumps(embedded))
@classmethod
def validate(cls, ticket, message):
a = re.match(r'^' + repr(message) + r'_<<<(.*)>>>$', ticket, re.DOTALL)
if not a:
raise ValueError('Message %s cannot validate ticket %s' % (
repr(message), ticket))
return json.loads(a.groups()[0])
class FakeIsolateServerHandler(httpserver.Handler):
"""An extremely minimal implementation of the isolate server API v1.0."""
def _should_push_to_gs(self, isolated, size):
max_memcache = 500 * 1024
min_direct_gs = 501
if isolated and size <= max_memcache:
return False
return size >= min_direct_gs
def _generate_signed_url(self, digest, namespace='default'):
return '%s/FAKE_GCS/%s/%s' % (self.server.url, namespace, digest)
def _generate_ticket(self, entry_dict):
"""Generates an 'upload_ticket' to reply as PreuploadStatus."""
embedded = dict(
entry_dict,
**{
'c': 'flate',
'h': 'SHA-1',
})
message = (
'gs' if self._should_push_to_gs(embedded['i'], embedded['s'])
else 'datastore')
return FakeSigner.generate(message, embedded)
def _storage_helper(self, body, finalize_gs):
"""Processes handlers_endpoints_v1.StorageRequest."""
request = json.loads(body)
message = 'gs' if finalize_gs else 'datastore'
content = request['content'] if not finalize_gs else None
embedded = FakeSigner.validate(request['upload_ticket'], message)
# Embedded is an internal format used by
# handlers_endpoints_v1.IsolateService.generate_ticket.
namespace = embedded['n']
d = embedded['d']
if self.server.store_hash_instead:
if not finalize_gs:
self.server.contents.setdefault(namespace, {})[d] = hash_content(
content)
else:
self.server.contents.setdefault(namespace, {})[d] = content
self.send_json({'ok': True})
### Faked HTTP Methods
def do_GET(self):
logging.info('GET %s', self.path)
if self.path == '/auth/api/v1/server/oauth_config':
self.send_json({
'client_id': 'c',
'client_not_so_secret': 's',
'primary_url': self.server.url})
elif self.path == '/auth/api/v1/accounts/self':
self.send_json({'identity': 'user:joe', 'xsrf_token': 'foo'})
else:
raise NotImplementedError(self.path)
def do_POST(self):
logging.info('POST %s', self.path)
body = self.read_body()
if self.path.startswith('/_ah/api/isolateservice/v1/preupload'):
response = {'items': []}
def append_entry(entry, index, li):
"""Converts a {'h', 's', 'i'} to ["<upload url>", "<finalize url>"] or
None.
"""
if entry['d'] not in self.server.contents.get(entry['n'], {}):
# handlers_endpoints_v1.PreuploadStatus
status = {
'digest': entry['d'],
'index': str(index),
'upload_ticket': self._generate_ticket(entry),
}
if self._should_push_to_gs(entry['i'], entry['s']):
status['gs_upload_url'] = self._generate_signed_url(entry['d'])
li.append(status)
# Don't use finalize url for the fake.
request = json.loads(body)
namespace = request['namespace']['namespace']
for index, i in enumerate(request['items']):
append_entry({
'd': i['digest'],
'i': i['is_isolated'],
'n': namespace,
's': i['size'],
}, index, response['items'])
logging.info('Returning %s' % response)
self.send_json(response)
elif self.path.startswith('/_ah/api/isolateservice/v1/store_inline'):
self._storage_helper(body, False)
elif self.path.startswith('/_ah/api/isolateservice/v1/finalize_gs_upload'):
self._storage_helper(body, True)
elif self.path.startswith('/_ah/api/isolateservice/v1/retrieve'):
request = json.loads(body)
namespace = request['namespace']['namespace']
data = self.server.contents[namespace].get(request['digest'])
if data is None:
logging.error(
'Failed to retrieve %s / %s', namespace, request['digest'])
self.send_json({'content': data})
elif self.path.startswith('/_ah/api/isolateservice/v1/server_details'):
self.send_json({'server_version': 'such a good version'})
else:
raise NotImplementedError(self.path)
def do_PUT(self):
logging.info('PUT %s', self.path)
if self.path.startswith('/FAKE_GCS/'):
namespace, h = self.path[len('/FAKE_GCS/'):].split('/', 1)
if self.server.store_hash_instead:
a = ALGO()
for c in self.yield_body():
a.update(c)
self.server.contents.setdefault(namespace, {})[h] = a.hexdigest()
else:
self.server.contents.setdefault(namespace, {})[h] = self.read_body()
self.send_octet_stream('')
else:
raise NotImplementedError(self.path)
class FakeIsolateServer(httpserver.Server):
_HANDLER_CLS = FakeIsolateServerHandler
def __init__(self):
super(FakeIsolateServer, self).__init__()
self._server.contents = {}
self._server.store_hash_instead = False
def store_hash_instead(self):
"""Stops saving content in memory. Used to test large files."""
self._server.store_hash_instead = True
@property
def contents(self):
return self._server.contents
def add_content_compressed(self, namespace, content):
assert not self._server.store_hash_instead
h = hash_content(content)
logging.info('add_content_compressed(%s, %s)', namespace, h)
self._server.contents.setdefault(namespace, {})[h] = base64.b64encode(
zlib.compress(content))
return h
def add_content(self, namespace, content):
assert not self._server.store_hash_instead
h = hash_content(content)
logging.info('add_content(%s, %s)', namespace, h)
self._server.contents.setdefault(namespace, {})[h] = base64.b64encode(
content)
return h