#!/usr/bin/env python
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Archives a set of files to a server."""
__version__ = '0.2'
import functools
import hashlib
import json
import logging
import os
import re
import sys
import threading
import time
import urllib
import zlib
from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand
from utils import net
from utils import threading_utils
from utils import tools
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'
# The number of files to check against the isolate server per /pre-upload
# query. All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. These values are optimized for the "few thousand
# files to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
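# Illustrative example (comment only, not executed): with the schedule above,
# checking 250 items sorted by decreasing size issues /pre-upload queries of
# sizes 20, 20, 50, 50, 50 and then the remaining 60. See
# Storage.batch_items_for_check below for the implementation.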
# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
'7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
'wav', 'zip'
]
# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None
# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024
# Chunk size to use when doing disk I/O.
DISK_FILE_CHUNK = 1024 * 1024
# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024
# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download is aborted.
DOWNLOAD_READ_TIMEOUT = 60
# Maximum expected delay (in seconds) between successive file fetches
# in run_tha_test. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60
# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30
# Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
# specify the names here.
SUPPORTED_ALGOS = {
'md5': hashlib.md5,
'sha-1': hashlib.sha1,
'sha-512': hashlib.sha512,
}
# Used for serialization.
SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())
class ConfigError(ValueError):
"""Generic failure to load a .isolated file."""
pass
class MappingError(OSError):
"""Failed to recreate the tree."""
pass
def is_valid_hash(value, algo):
"""Returns if the value is a valid hash for the corresponding algorithm."""
size = 2 * algo().digest_size
return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))
def hash_file(filepath, algo):
"""Calculates the hash of a file without reading it all in memory at once.
  |algo| should be one of the hashlib hashing algorithms.
"""
digest = algo()
with open(filepath, 'rb') as f:
while True:
chunk = f.read(DISK_FILE_CHUNK)
if not chunk:
break
digest.update(chunk)
return digest.hexdigest()
def stream_read(stream, chunk_size):
"""Reads chunks from |stream| and yields them."""
while True:
data = stream.read(chunk_size)
if not data:
break
yield data
def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
"""Yields file content in chunks of given |chunk_size|."""
with open(filepath, 'rb') as f:
while True:
data = f.read(chunk_size)
if not data:
break
yield data
def file_write(filepath, content_generator):
"""Writes file content as generated by content_generator.
Creates the intermediary directory as needed.
Returns the number of bytes written.
Meant to be mocked out in unit tests.
"""
filedir = os.path.dirname(filepath)
if not os.path.isdir(filedir):
os.makedirs(filedir)
total = 0
with open(filepath, 'wb') as f:
for d in content_generator:
total += len(d)
f.write(d)
return total
def zip_compress(content_generator, level=7):
"""Reads chunks from |content_generator| and yields zip compressed chunks."""
compressor = zlib.compressobj(level)
for chunk in content_generator:
compressed = compressor.compress(chunk)
if compressed:
yield compressed
tail = compressor.flush(zlib.Z_FINISH)
if tail:
yield tail
def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
"""Reads zipped data from |content_generator| and yields decompressed data.
  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.
Raises IOError if data is corrupted or incomplete.
"""
decompressor = zlib.decompressobj()
compressed_size = 0
try:
for chunk in content_generator:
compressed_size += len(chunk)
data = decompressor.decompress(chunk, chunk_size)
if data:
yield data
while decompressor.unconsumed_tail:
data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
if data:
yield data
tail = decompressor.flush()
if tail:
yield tail
except zlib.error as e:
raise IOError(
'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
# Ensure all data was read and decompressed.
if decompressor.unused_data or decompressor.unconsumed_tail:
raise IOError('Not all data was decompressed')
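# A minimal round-trip sketch for the two helpers above (kept as a comment so
# the module is unchanged; the sample data is made up):
#
#   chunks = ['some test data '] * 1000
#   compressed = list(zip_compress(iter(chunks)))
#   restored = ''.join(zip_decompress(iter(compressed)))
#   assert restored == ''.join(chunks)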
def get_zip_compression_level(filename):
"""Given a filename calculates the ideal zip compression level to use."""
  # os.path.splitext() keeps the leading dot; strip it to match the list above.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
# TODO(csharp): Profile to find what compression level works best.
return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
def create_directories(base_directory, files):
"""Creates the directory structure needed by the given list of files."""
logging.debug('create_directories(%s, %d)', base_directory, len(files))
# Creates the tree of directories to create.
directories = set(os.path.dirname(f) for f in files)
for item in list(directories):
while item:
directories.add(item)
item = os.path.dirname(item)
for d in sorted(directories):
if d:
os.mkdir(os.path.join(base_directory, d))
def create_links(base_directory, files):
"""Creates any links needed by the given set of files."""
for filepath, properties in files:
if 'l' not in properties:
continue
if sys.platform == 'win32':
      # TODO(maruel): Create junctions or empty text files similar to what
      # cygwin does?
logging.warning('Ignoring symlink %s', filepath)
continue
outfile = os.path.join(base_directory, filepath)
    # os.symlink() doesn't exist on Windows, so the 'l' (link) property should
    # never be specified in a Windows .isolated file.
os.symlink(properties['l'], outfile) # pylint: disable=E1101
if 'm' in properties:
lchmod = getattr(os, 'lchmod', None)
if lchmod:
lchmod(outfile, properties['m'])
def is_valid_file(filepath, size):
"""Determines if the given files appears valid.
Currently it just checks the file's size.
"""
if size == UNKNOWN_FILE_SIZE:
return os.path.isfile(filepath)
actual_size = os.stat(filepath).st_size
if size != actual_size:
logging.warning(
'Found invalid item %s; %d != %d',
os.path.basename(filepath), actual_size, size)
return False
return True
class WorkerPool(threading_utils.AutoRetryThreadPool):
"""Thread pool that automatically retries on IOError and runs a preconfigured
function.
"""
# Initial and maximum number of worker threads.
INITIAL_WORKERS = 2
MAX_WORKERS = 16
RETRIES = 5
def __init__(self):
super(WorkerPool, self).__init__(
[IOError],
self.RETRIES,
self.INITIAL_WORKERS,
self.MAX_WORKERS,
0,
'remote')
class Item(object):
"""An item to push to Storage.
It starts its life in a main thread, travels to 'contains' thread, then to
'push' thread and then finally back to the main thread.
It is never used concurrently from multiple threads.
"""
def __init__(self, digest, size, is_isolated=False):
self.digest = digest
self.size = size
self.is_isolated = is_isolated
self.compression_level = 6
self.push_state = None
def content(self, chunk_size):
"""Iterable with content of this item in chunks of given size.
Arguments:
chunk_size: preferred size of the chunk to produce, may be ignored.
"""
raise NotImplementedError()
class FileItem(Item):
"""A file to push to Storage."""
def __init__(self, path, digest, size, is_isolated):
super(FileItem, self).__init__(digest, size, is_isolated)
self.path = path
self.compression_level = get_zip_compression_level(path)
def content(self, chunk_size):
return file_read(self.path, chunk_size)
class BufferItem(Item):
"""A byte buffer to push to Storage."""
def __init__(self, buf, algo, is_isolated=False):
super(BufferItem, self).__init__(
algo(buf).hexdigest(), len(buf), is_isolated)
self.buffer = buf
def content(self, _chunk_size):
return [self.buffer]
class Storage(object):
"""Efficiently downloads or uploads large set of files via StorageApi."""
def __init__(self, storage_api, use_zip):
self.use_zip = use_zip
self._storage_api = storage_api
self._cpu_thread_pool = None
self._net_thread_pool = None
@property
def cpu_thread_pool(self):
"""ThreadPool for CPU-bound tasks like zipping."""
if self._cpu_thread_pool is None:
self._cpu_thread_pool = threading_utils.ThreadPool(
2, max(threading_utils.num_processors(), 2), 0, 'zip')
return self._cpu_thread_pool
@property
def net_thread_pool(self):
"""AutoRetryThreadPool for IO-bound tasks, retries IOError."""
if self._net_thread_pool is None:
self._net_thread_pool = WorkerPool()
return self._net_thread_pool
def close(self):
"""Waits for all pending tasks to finish."""
if self._cpu_thread_pool:
self._cpu_thread_pool.join()
self._cpu_thread_pool.close()
self._cpu_thread_pool = None
if self._net_thread_pool:
self._net_thread_pool.join()
self._net_thread_pool.close()
self._net_thread_pool = None
def __enter__(self):
"""Context manager interface."""
return self
def __exit__(self, _exc_type, _exc_value, _traceback):
"""Context manager interface."""
self.close()
return False
def upload_tree(self, indir, infiles):
"""Uploads the given tree to the isolate server.
Arguments:
indir: root directory the infiles are based in.
infiles: dict of files to upload from |indir|.
Returns:
List of items that were uploaded. All other items are already there.
"""
logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))
# Convert |indir| + |infiles| into a list of FileItem objects.
# Filter out symlinks, since they are not represented by items on isolate
# server side.
items = [
FileItem(
path=os.path.join(indir, filepath),
digest=metadata['h'],
size=metadata['s'],
is_isolated=metadata.get('priority') == '0')
for filepath, metadata in infiles.iteritems()
if 'l' not in metadata
]
return self.upload_items(items)
def upload_items(self, items):
"""Uploads bunch of items to the isolate server.
Will upload only items that are missing.
Arguments:
items: list of Item instances that represents data to upload.
Returns:
List of items that were uploaded. All other items are already there.
"""
# TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
# used by swarming.py. There's no need to spawn multiple threads and try to
# do stuff in parallel: there's nothing to parallelize. 'contains' check and
# 'push' should be performed sequentially in the context of current thread.
# For each digest keep only first Item that matches it. All other items
# are just indistinguishable copies from the point of view of isolate
# server (it doesn't care about paths at all, only content and digests).
seen = {}
duplicates = 0
for item in items:
if seen.setdefault(item.digest, item) is not item:
duplicates += 1
items = seen.values()
if duplicates:
logging.info('Skipped %d duplicated files', duplicates)
# Enqueue all upload tasks.
missing = set()
channel = threading_utils.TaskChannel()
for missing_item in self.get_missing_items(items):
missing.add(missing_item)
self.async_push(
channel,
WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
missing_item)
uploaded = []
# No need to spawn deadlock detector thread if there's nothing to upload.
if missing:
with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
# Wait for all started uploads to finish.
while len(uploaded) != len(missing):
detector.ping()
item = channel.pull()
uploaded.append(item)
logging.debug(
'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
logging.info('All files are uploaded')
# Print stats.
total = len(items)
total_size = sum(f.size for f in items)
logging.info(
'Total: %6d, %9.1fkb',
total,
total_size / 1024.)
cache_hit = set(items) - missing
cache_hit_size = sum(f.size for f in cache_hit)
logging.info(
'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
len(cache_hit),
cache_hit_size / 1024.,
len(cache_hit) * 100. / total,
cache_hit_size * 100. / total_size if total_size else 0)
cache_miss = missing
cache_miss_size = sum(f.size for f in cache_miss)
logging.info(
'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
len(cache_miss),
cache_miss_size / 1024.,
len(cache_miss) * 100. / total,
cache_miss_size * 100. / total_size if total_size else 0)
return uploaded
def get_fetch_url(self, digest):
"""Returns an URL that can be used to fetch an item with given digest.
Arguments:
digest: hex digest of item to fetch.
Returns:
An URL or None if underlying protocol doesn't support this.
"""
return self._storage_api.get_fetch_url(digest)
def async_push(self, channel, priority, item):
"""Starts asynchronous push to the server in a parallel thread.
Arguments:
channel: TaskChannel that receives back |item| when upload ends.
priority: thread pool task priority for the push.
item: item to upload as instance of Item class.
"""
def push(content):
"""Pushes an item and returns its id, to pass as a result to |channel|."""
self._storage_api.push(item, content)
return item
# If zipping is not required, just start a push task.
if not self.use_zip:
self.net_thread_pool.add_task_with_channel(channel, priority, push,
item.content(DISK_FILE_CHUNK))
return
# If zipping is enabled, zip in a separate thread.
def zip_and_push():
# TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
try:
stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
item.compression_level)
data = ''.join(stream)
except Exception as exc:
logging.error('Failed to zip \'%s\': %s', item, exc)
channel.send_exception(exc)
return
self.net_thread_pool.add_task_with_channel(
channel, priority, push, [data])
self.cpu_thread_pool.add_task(priority, zip_and_push)
def async_fetch(self, channel, priority, digest, size, sink):
"""Starts asynchronous fetch from the server in a parallel thread.
Arguments:
channel: TaskChannel that receives back |digest| when download ends.
priority: thread pool task priority for the fetch.
digest: hex digest of an item to download.
size: expected size of the item (after decompression).
sink: function that will be called as sink(generator).
"""
def fetch():
try:
# Prepare reading pipeline.
stream = self._storage_api.fetch(digest)
if self.use_zip:
stream = zip_decompress(stream, DISK_FILE_CHUNK)
# Run |stream| through verifier that will assert its size.
verifier = FetchStreamVerifier(stream, size)
# Verified stream goes to |sink|.
sink(verifier.run())
except Exception as err:
logging.warning('Failed to fetch %s: %s', digest, err)
raise
return digest
    # Don't bother with the CPU thread pool for decompression. Decompression
    # is really fast and most probably IO bound anyway.
self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
def get_missing_items(self, items):
"""Yields items that are missing from the server.
Issues multiple parallel queries via StorageApi's 'contains' method.
Arguments:
items: a list of Item objects to check.
Yields:
Item objects that are missing from the server.
"""
channel = threading_utils.TaskChannel()
pending = 0
# Enqueue all requests.
for batch in self.batch_items_for_check(items):
self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
self._storage_api.contains, batch)
pending += 1
# Yield results as they come in.
for _ in xrange(pending):
for missing in channel.pull():
yield missing
@staticmethod
def batch_items_for_check(items):
"""Splits list of items to check for existence on the server into batches.
Each batch corresponds to a single 'exists?' query to the server via a call
to StorageApi's 'contains' method.
Arguments:
items: a list of Item objects.
Yields:
Batches of items to query for existence in a single operation,
each batch is a list of Item objects.
"""
batch_count = 0
batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
next_queries = []
for item in sorted(items, key=lambda x: x.size, reverse=True):
next_queries.append(item)
if len(next_queries) == batch_size_limit:
yield next_queries
next_queries = []
batch_count += 1
batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
if next_queries:
yield next_queries
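# A minimal usage sketch for Storage (kept as a comment; the server URL, file
# path and namespace are placeholders, not values this module mandates;
# get_storage() is defined further below):
#
#   with get_storage('https://example.appspot.com', 'default-gzip') as storage:
#     path = '/tmp/payload.bin'
#     item = FileItem(
#         path=path,
#         digest=hash_file(path, hashlib.sha1),
#         size=os.stat(path).st_size,
#         is_isolated=False)
#     uploaded = storage.upload_items([item])  # Only missing items are pushed.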
class FetchQueue(object):
"""Fetches items from Storage and places them into LocalCache.
It manages multiple concurrent fetch operations. Acts as a bridge between
Storage and LocalCache so that Storage and LocalCache don't depend on each
other at all.
"""
def __init__(self, storage, cache):
self.storage = storage
self.cache = cache
self._channel = threading_utils.TaskChannel()
self._pending = set()
self._accessed = set()
self._fetched = cache.cached_set()
def add(self, priority, digest, size=UNKNOWN_FILE_SIZE):
"""Starts asynchronous fetch of item |digest|."""
# Fetching it now?
if digest in self._pending:
return
# Mark this file as in use, verify_all_cached will later ensure it is still
# in cache.
self._accessed.add(digest)
# Already fetched? Notify cache to update item's LRU position.
if digest in self._fetched:
# 'touch' returns True if item is in cache and not corrupted.
if self.cache.touch(digest, size):
return
# Item is corrupted, remove it from cache and fetch it again.
self._fetched.remove(digest)
self.cache.evict(digest)
# TODO(maruel): It should look at the free disk space, the current cache
# size and the size of the new item on every new item:
# - Trim the cache as more entries are listed when free disk space is low,
# otherwise if the amount of data downloaded during the run > free disk
# space, it'll crash.
# - Make sure there's enough free disk space to fit all dependencies of
# this run! If not, abort early.
# Start fetching.
self._pending.add(digest)
self.storage.async_fetch(
self._channel, priority, digest, size,
functools.partial(self.cache.write, digest))
def wait(self, digests):
"""Starts a loop that waits for at least one of |digests| to be retrieved.
Returns the first digest retrieved.
"""
# Flush any already fetched items.
for digest in digests:
if digest in self._fetched:
return digest
# Ensure all requested items are being fetched now.
assert all(digest in self._pending for digest in digests), (
digests, self._pending)
# Wait for some requested item to finish fetching.
while self._pending:
digest = self._channel.pull()
self._pending.remove(digest)
self._fetched.add(digest)
if digest in digests:
return digest
# Should never reach this point due to assert above.
raise RuntimeError('Impossible state')
def inject_local_file(self, path, algo):
"""Adds local file to the cache as if it was fetched from storage."""
with open(path, 'rb') as f:
data = f.read()
digest = algo(data).hexdigest()
self.cache.write(digest, [data])
self._fetched.add(digest)
return digest
@property
def pending_count(self):
"""Returns number of items to be fetched."""
return len(self._pending)
def verify_all_cached(self):
"""True if all accessed items are in cache."""
return self._accessed.issubset(self.cache.cached_set())
class FetchStreamVerifier(object):
"""Verifies that fetched file is valid before passing it to the LocalCache."""
def __init__(self, stream, expected_size):
self.stream = stream
self.expected_size = expected_size
self.current_size = 0
def run(self):
"""Generator that yields same items as |stream|.
Verifies |stream| is complete before yielding a last chunk to consumer.
Also wraps IOError produced by consumer into MappingError exceptions since
otherwise Storage will retry fetch on unrelated local cache errors.
"""
# Read one chunk ahead, keep it in |stored|.
# That way a complete stream can be verified before pushing last chunk
# to consumer.
stored = None
for chunk in self.stream:
assert chunk is not None
if stored is not None:
self._inspect_chunk(stored, is_last=False)
try:
yield stored
except IOError as exc:
raise MappingError('Failed to store an item in cache: %s' % exc)
stored = chunk
if stored is not None:
self._inspect_chunk(stored, is_last=True)
try:
yield stored
except IOError as exc:
raise MappingError('Failed to store an item in cache: %s' % exc)
def _inspect_chunk(self, chunk, is_last):
"""Called for each fetched chunk before passing it to consumer."""
self.current_size += len(chunk)
if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and
(self.expected_size != self.current_size)):
raise IOError('Incorrect file size: expected %d, got %d' % (
self.expected_size, self.current_size))
class StorageApi(object):
"""Interface for classes that implement low-level storage operations."""
def get_fetch_url(self, digest):
"""Returns an URL that can be used to fetch an item with given digest.
Arguments:
digest: hex digest of item to fetch.
Returns:
An URL or None if the protocol doesn't support this.
"""
raise NotImplementedError()
def fetch(self, digest):
"""Fetches an object and yields its content.
Arguments:
digest: hash digest of item to download.
Yields:
Chunks of downloaded item (as str objects).
"""
raise NotImplementedError()
def push(self, item, content):
"""Uploads an |item| with content generated by |content| generator.
Arguments:
item: Item object that holds information about an item being pushed.
content: a generator that yields chunks to push.
Returns:
None.
"""
raise NotImplementedError()
def contains(self, items):
"""Checks for existence of given |items| on the server.
    Mutates |items| by assigning an opaque implementation-specific object to
    Item's push_state attribute for entries missing from the datastore.
Arguments:
items: list of Item objects.
Returns:
A list of items missing on server as a list of Item objects.
"""
raise NotImplementedError()
class IsolateServer(StorageApi):
"""StorageApi implementation that downloads and uploads to Isolate Server.
It uploads and downloads directly from Google Storage whenever appropriate.
"""
class _PushState(object):
"""State needed to call .push(), to be stored in Item.push_state."""
def __init__(self, upload_url, finalize_url):
self.upload_url = upload_url
self.finalize_url = finalize_url
self.uploaded = False
self.finalized = False
def __init__(self, base_url, namespace):
super(IsolateServer, self).__init__()
assert base_url.startswith('http'), base_url
self.base_url = base_url.rstrip('/')
self.namespace = namespace
self._lock = threading.Lock()
self._server_caps = None
@staticmethod
def _generate_handshake_request():
"""Returns a dict to be sent as handshake request body."""
# TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
return {
'client_app_version': __version__,
'fetcher': True,
'protocol_version': ISOLATE_PROTOCOL_VERSION,
'pusher': True,
}
@staticmethod
def _validate_handshake_response(caps):
"""Validates and normalizes handshake response."""
logging.info('Protocol version: %s', caps['protocol_version'])
logging.info('Server version: %s', caps['server_app_version'])
if caps.get('error'):
raise MappingError(caps['error'])
if not caps['access_token']:
raise ValueError('access_token is missing')
return caps
@property
def _server_capabilities(self):
"""Performs handshake with the server if not yet done.
Returns:
Server capabilities dictionary as returned by /handshake endpoint.
Raises:
MappingError if server rejects the handshake.
"""
# TODO(maruel): Make this request much earlier asynchronously while the
# files are being enumerated.
with self._lock:
if self._server_caps is None:
request_body = json.dumps(
self._generate_handshake_request(), separators=(',', ':'))
response = net.url_read(
url=self.base_url + '/content-gs/handshake',
data=request_body,
content_type='application/json',
method='POST')
if response is None:
raise MappingError('Failed to perform handshake.')
try:
caps = json.loads(response)
if not isinstance(caps, dict):
raise ValueError('Expecting JSON dict')
self._server_caps = self._validate_handshake_response(caps)
except (ValueError, KeyError, TypeError) as exc:
# KeyError exception has very confusing str conversion: it's just a
# missing key value and nothing else. So print exception class name
# as well.
raise MappingError('Invalid handshake response (%s): %s' % (
exc.__class__.__name__, exc))
return self._server_caps
def get_fetch_url(self, digest):
assert isinstance(digest, basestring)
return '%s/content-gs/retrieve/%s/%s' % (
self.base_url, self.namespace, digest)
def fetch(self, digest):
source_url = self.get_fetch_url(digest)
logging.debug('download_file(%s)', source_url)
# Because the app engine DB is only eventually consistent, retry 404 errors
# because the file might just not be visible yet (even though it has been
# uploaded).
connection = net.url_open(
source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
if not connection:
raise IOError('Unable to open connection to %s' % source_url)
return stream_read(connection, NET_IO_FILE_CHUNK)
def push(self, item, content):
assert isinstance(item, Item)
assert isinstance(item.push_state, IsolateServer._PushState)
assert not item.push_state.finalized
# TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it cannot be rewound back to the
    # beginning of the stream. A retry will find it exhausted. A possible
# solution is to wrap |content| generator with some sort of caching
# restartable generator. It should be done alongside streaming support
# implementation.
    # This push operation may be a retry after a failed finalization call
    # below; no need to re-upload the contents in that case.
if not item.push_state.uploaded:
      # A cheesy way to avoid a memcpy of a (possibly huge) file, until
      # streaming upload support is implemented.
if isinstance(content, list) and len(content) == 1:
content = content[0]
else:
content = ''.join(content)
# PUT file to |upload_url|.
response = net.url_read(
url=item.push_state.upload_url,
data=content,
content_type='application/octet-stream',
method='PUT')
if response is None:
raise IOError('Failed to upload a file %s to %s' % (
item.digest, item.push_state.upload_url))
item.push_state.uploaded = True
else:
logging.info(
'A file %s already uploaded, retrying finalization only', item.digest)
# Optionally notify the server that it's done.
if item.push_state.finalize_url:
# TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
# send it to isolated server. That way isolate server can verify that
# the data safely reached Google Storage (GS provides MD5 and CRC32C of
# stored files).
response = net.url_read(
url=item.push_state.finalize_url,
data='',
content_type='application/json',
method='POST')
if response is None:
raise IOError('Failed to finalize an upload of %s' % item.digest)
item.push_state.finalized = True
def contains(self, items):
logging.info('Checking existence of %d files...', len(items))
# Request body is a json encoded list of dicts.
body = [
{
'h': item.digest,
's': item.size,
'i': int(item.is_isolated),
} for item in items
]
query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
self.base_url,
self.namespace,
urllib.quote(self._server_capabilities['access_token']))
response_body = net.url_read(
url=query_url,
data=json.dumps(body, separators=(',', ':')),
content_type='application/json',
method='POST')
if response_body is None:
raise MappingError('Failed to execute /pre-upload query')
# Response body is a list of push_urls (or null if file is already present).
try:
response = json.loads(response_body)
if not isinstance(response, list):
raise ValueError('Expecting response with json-encoded list')
if len(response) != len(items):
raise ValueError(
'Incorrect number of items in the list, expected %d, '
'but got %d' % (len(items), len(response)))
except ValueError as err:
raise MappingError(
'Invalid response from server: %s, body is %s' % (err, response_body))
# Pick Items that are missing, attach _PushState to them.
missing_items = []
for i, push_urls in enumerate(response):
if push_urls:
assert len(push_urls) == 2, str(push_urls)
item = items[i]
assert item.push_state is None
item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
missing_items.append(item)
logging.info('Queried %d files, %d cache hit',
len(items), len(items) - len(missing_items))
return missing_items
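# Illustrative /pre-upload exchange (comment only; the digest and URLs are made
# up). IsolateServer.contains() POSTs a JSON list of {'h', 's', 'i'} dicts and
# the server answers with one entry per item: null when the item is already
# present, or an [upload_url, finalize_url] pair when it is missing, e.g.:
#
#   request:  [{"h": "<40 hex chars>", "s": 1024, "i": 0}]
#   response: [["https://<storage upload url>", "https://<finalize url>"]]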
class FileSystem(StorageApi):
"""StorageApi implementation that fetches data from the file system.
  The common use case is an NFS/CIFS file server that is mounted locally and
  used to fetch files onto a local partition.
"""
def __init__(self, base_path):
super(FileSystem, self).__init__()
self.base_path = base_path
def get_fetch_url(self, digest):
return None
def fetch(self, digest):
assert isinstance(digest, basestring)
return file_read(os.path.join(self.base_path, digest))
def push(self, item, content):
assert isinstance(item, Item)
file_write(os.path.join(self.base_path, item.digest), content)
def contains(self, items):
return [
item for item in items
if not os.path.exists(os.path.join(self.base_path, item.digest))
]
class LocalCache(object):
"""Local cache that stores objects fetched via Storage.
It can be accessed concurrently from multiple threads, so it should protect
its internal state with some lock.
"""
def __enter__(self):
"""Context manager interface."""
return self
def __exit__(self, _exc_type, _exec_value, _traceback):
"""Context manager interface."""
return False
def cached_set(self):
"""Returns a set of all cached digests (always a new object)."""
raise NotImplementedError()
def touch(self, digest, size):
"""Ensures item is not corrupted and updates its LRU position.
Arguments:
digest: hash digest of item to check.
size: expected size of this item.
Returns:
True if item is in cache and not corrupted.
"""
raise NotImplementedError()
def evict(self, digest):
"""Removes item from cache if it's there."""
raise NotImplementedError()
def read(self, digest):
"""Returns contents of the cached item as a single str."""
raise NotImplementedError()
def write(self, digest, content):
"""Reads data from |content| generator and stores it in cache."""
raise NotImplementedError()
def link(self, digest, dest, file_mode=None):
"""Ensures file at |dest| has same content as cached |digest|."""
raise NotImplementedError()
class MemoryCache(LocalCache):
"""LocalCache implementation that stores everything in memory."""
def __init__(self):
super(MemoryCache, self).__init__()
# Let's not assume dict is thread safe.
self._lock = threading.Lock()
self._contents = {}
def cached_set(self):
with self._lock:
return set(self._contents)
def touch(self, digest, size):
with self._lock:
return digest in self._contents
def evict(self, digest):
with self._lock:
self._contents.pop(digest, None)
def read(self, digest):
with self._lock:
return self._contents[digest]
def write(self, digest, content):
# Assemble whole stream before taking the lock.
data = ''.join(content)
with self._lock:
self._contents[digest] = data
def link(self, digest, dest, file_mode=None):
file_write(dest, [self.read(digest)])
if file_mode is not None:
os.chmod(dest, file_mode)
def get_hash_algo(_namespace):
"""Return hash algorithm class to use when uploading to given |namespace|."""
# TODO(vadimsh): Implement this at some point.
return hashlib.sha1
def is_namespace_with_compression(namespace):
"""Returns True if given |namespace| stores compressed objects."""
return namespace.endswith(('-gzip', '-deflate'))
def get_storage_api(file_or_url, namespace):
"""Returns an object that implements StorageApi interface."""
if re.match(r'^https?://.+$', file_or_url):
return IsolateServer(file_or_url, namespace)
else:
return FileSystem(file_or_url)
def get_storage(file_or_url, namespace):
"""Returns Storage class configured with appropriate StorageApi instance."""
return Storage(
get_storage_api(file_or_url, namespace),
is_namespace_with_compression(namespace))
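# Example (illustrative; the URL, directory and namespace values below are
# placeholders): |file_or_url| selects the backend, while the namespace suffix
# decides whether Storage compresses content before pushing it.
#
#   remote = get_storage('https://example.appspot.com', 'default-gzip')
#   assert remote.use_zip  # '-gzip' and '-deflate' namespaces are compressed.
#   local = get_storage('/mnt/isolate-mirror', 'default')
#   assert not local.use_zip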
def upload_tree(base_url, indir, infiles, namespace):
"""Uploads the given tree to the given url.
Arguments:
    base_url: The base url, it is assumed that |base_url|/has/ can be used to
query if an element was already uploaded, and |base_url|/store/
can be used to upload a new element.
indir: Root directory the infiles are based in.
infiles: dict of files to upload from |indir| to |base_url|.
namespace: The namespace to use on the server.
"""
with get_storage(base_url, namespace) as storage:
storage.upload_tree(indir, infiles)
return 0
def load_isolated(content, os_flavor, algo):
"""Verifies the .isolated file is valid and loads this object with the json
data.
Arguments:
- content: raw serialized content to load.
- os_flavor: OS to load this file on. Optional.
- algo: hashlib algorithm class. Used to confirm the algorithm matches the
algorithm used on the Isolate Server.
"""
try:
data = json.loads(content)
except ValueError:
raise ConfigError('Failed to parse: %s...' % content[:100])
if not isinstance(data, dict):
raise ConfigError('Expected dict, got %r' % data)
# Check 'version' first, since it could modify the parsing after.
value = data.get('version', '1.0')
if not isinstance(value, basestring):
raise ConfigError('Expected string, got %r' % value)
if not re.match(r'^(\d+)\.(\d+)$', value):
raise ConfigError('Expected a compatible version, got %r' % value)
if value.split('.', 1)[0] != '1':
raise ConfigError('Expected compatible \'1.x\' version, got %r' % value)
if algo is None:
    # Default to the algorithm used in the .isolated file itself, falling back
    # to 'sha-1' if unspecified.
    algo = SUPPORTED_ALGOS[data.get('algo', 'sha-1')]
for key, value in data.iteritems():
if key == 'algo':
if not isinstance(value, basestring):
raise ConfigError('Expected string, got %r' % value)
if value not in SUPPORTED_ALGOS:
raise ConfigError(
'Expected one of \'%s\', got %r' %
(', '.join(sorted(SUPPORTED_ALGOS)), value))
if value != SUPPORTED_ALGOS_REVERSE[algo]:
raise ConfigError(
'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))
elif key == 'command':
if not isinstance(value, list):
raise ConfigError('Expected list, got %r' % value)
if not value:
raise ConfigError('Expected non-empty command')
for subvalue in value:
if not isinstance(subvalue, basestring):
raise ConfigError('Expected string, got %r' % subvalue)
elif key == 'files':
if not isinstance(value, dict):
raise ConfigError('Expected dict, got %r' % value)
for subkey, subvalue in value.iteritems():
if not isinstance(subkey, basestring):
raise ConfigError('Expected string, got %r' % subkey)
if not isinstance(subvalue, dict):
raise ConfigError('Expected dict, got %r' % subvalue)
for subsubkey, subsubvalue in subvalue.iteritems():
if subsubkey == 'l':
if not isinstance(subsubvalue, basestring):
raise ConfigError('Expected string, got %r' % subsubvalue)
elif subsubkey == 'm':
if not isinstance(subsubvalue, int):
raise ConfigError('Expected int, got %r' % subsubvalue)
elif subsubkey == 'h':
if not is_valid_hash(subsubvalue, algo):
raise ConfigError('Expected sha-1, got %r' % subsubvalue)
elif subsubkey == 's':
if not isinstance(subsubvalue, int):
raise ConfigError('Expected int, got %r' % subsubvalue)
else:
raise ConfigError('Unknown subsubkey %s' % subsubkey)
if bool('h' in subvalue) == bool('l' in subvalue):
raise ConfigError(
'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
subvalue)
if bool('h' in subvalue) != bool('s' in subvalue):
raise ConfigError(
'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
subvalue)
if bool('s' in subvalue) == bool('l' in subvalue):
raise ConfigError(
'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
subvalue)
if bool('l' in subvalue) and bool('m' in subvalue):
raise ConfigError(
'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
subvalue)
elif key == 'includes':
if not isinstance(value, list):
raise ConfigError('Expected list, got %r' % value)
if not value:
raise ConfigError('Expected non-empty includes list')
for subvalue in value:
if not is_valid_hash(subvalue, algo):
raise ConfigError('Expected sha-1, got %r' % subvalue)
elif key == 'read_only':
if not isinstance(value, bool):
raise ConfigError('Expected bool, got %r' % value)
elif key == 'relative_cwd':
if not isinstance(value, basestring):
raise ConfigError('Expected string, got %r' % value)
elif key == 'os':
if os_flavor and value != os_flavor:
raise ConfigError(
'Expected \'os\' to be \'%s\' but got \'%s\'' %
(os_flavor, value))
elif key == 'version':
# Already checked above.
pass
else:
raise ConfigError('Unknown key %r' % key)
  # Automatically fix os.path.sep if necessary. While .isolated files are
  # always in the native path format, someone could want to download an
  # .isolated tree from another OS.
wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
if 'files' in data:
data['files'] = dict(
(k.replace(wrong_path_sep, os.path.sep), v)
for k, v in data['files'].iteritems())
for v in data['files'].itervalues():
if 'l' in v:
v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
if 'relative_cwd' in data:
data['relative_cwd'] = data['relative_cwd'].replace(
wrong_path_sep, os.path.sep)
return data
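# A minimal .isolated document accepted by load_isolated() above (illustrative;
# the 'h' digest is a placeholder for a real 40-char sha-1 hex digest):
#
#   {
#     "algo": "sha-1",
#     "command": ["python", "run_test.py"],
#     "files": {
#       "run_test.py": {"h": "<40 hex chars>", "s": 142},
#       "data/link": {"l": "../run_test.py"}
#     },
#     "relative_cwd": ".",
#     "version": "1.0"
#   }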
class IsolatedFile(object):
"""Represents a single parsed .isolated file."""
def __init__(self, obj_hash, algo):
"""|obj_hash| is really the sha-1 of the file."""
logging.debug('IsolatedFile(%s)' % obj_hash)
self.obj_hash = obj_hash
self.algo = algo
    # Set once the whole left side of the tree is parsed. 'Tree' here means
    # the root .isolated file and all the .isolated files it recursively
    # includes via the 'includes' key. The order of the sha-1s in 'includes',
    # each representing a .isolated file in the hash table, is important, as
    # the later ones are not processed until the earlier ones are retrieved
    # and read.
self.can_fetch = False
# Raw data.
self.data = {}
    # IsolatedFile instances, one per hash in the 'includes' list.
self.children = []
# Set once the .isolated file is loaded.
self._is_parsed = False
# Set once the files are fetched.
self.files_fetched = False
def load(self, os_flavor, content):
"""Verifies the .isolated file is valid and loads this object with the json
data.
"""
logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
assert not self._is_parsed
self.data = load_isolated(content, os_flavor, self.algo)
self.children = [
IsolatedFile(i, self.algo) for i in self.data.get('includes', [])
]
self._is_parsed = True
def fetch_files(self, fetch_queue, files):
"""Adds files in this .isolated file not present in |files| dictionary.
    Preemptively requests the listed files via |fetch_queue|.
Note that |files| is modified by this function.
"""
assert self.can_fetch
if not self._is_parsed or self.files_fetched:
return
logging.debug('fetch_files(%s)' % self.obj_hash)
for filepath, properties in self.data.get('files', {}).iteritems():
      # The root .isolated has priority on the files being mapped. In
      # particular, overridden files must not be fetched.
if filepath not in files:
files[filepath] = properties
if 'h' in properties:
# Preemptively request files.
logging.debug('fetching %s' % filepath)
fetch_queue.add(WorkerPool.MED, properties['h'], properties['s'])
self.files_fetched = True
class Settings(object):
"""Results of a completely parsed .isolated file."""
def __init__(self):
self.command = []
self.files = {}
self.read_only = None
self.relative_cwd = None
# The main .isolated file, a IsolatedFile instance.
self.root = None
def load(self, fetch_queue, root_isolated_hash, os_flavor, algo):
"""Loads the .isolated and all the included .isolated asynchronously.
It enables support for "included" .isolated files. They are processed in
strict order but fetched asynchronously from the cache. This is important so
that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left side
    of the tree have been retrieved.
The prioritization is very important here for nested .isolated files.
'includes' have the highest priority and the algorithm is optimized for both
deep and wide trees. A deep one is a long link of .isolated files referenced
one at a time by one item in 'includes'. A wide one has a large number of
'includes' in a single .isolated file. 'left' is defined as an included
.isolated file earlier in the 'includes' list. So the order of the elements
in 'includes' is important.
"""
self.root = IsolatedFile(root_isolated_hash, algo)
# Isolated files being retrieved now: hash -> IsolatedFile instance.
pending = {}
# Set of hashes of already retrieved items to refuse recursive includes.
seen = set()
def retrieve(isolated_file):
h = isolated_file.obj_hash
if h in seen:
raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
assert h not in pending
seen.add(h)
pending[h] = isolated_file
fetch_queue.add(WorkerPool.HIGH, h)
retrieve(self.root)
while pending:
item_hash = fetch_queue.wait(pending)
item = pending.pop(item_hash)
item.load(os_flavor, fetch_queue.cache.read(item_hash))
if item_hash == root_isolated_hash:
# It's the root item.
item.can_fetch = True
for new_child in item.children:
retrieve(new_child)
# Traverse the whole tree to see if files can now be fetched.
self._traverse_tree(fetch_queue, self.root)
def check(n):
return all(check(x) for x in n.children) and n.files_fetched
assert check(self.root)
self.relative_cwd = self.relative_cwd or ''
self.read_only = self.read_only or False
def _traverse_tree(self, fetch_queue, node):
if node.can_fetch:
if not node.files_fetched:
self._update_self(fetch_queue, node)
will_break = False
for i in node.children:
if not i.can_fetch:
if will_break:
break
        # Automatically mark the first one as fetchable.
i.can_fetch = True
will_break = True
self._traverse_tree(fetch_queue, i)
def _update_self(self, fetch_queue, node):
node.fetch_files(fetch_queue, self.files)
# Grabs properties.
if not self.command and node.data.get('command'):
# Ensure paths are correctly separated on windows.
self.command = node.data['command']
if self.command:
self.command[0] = self.command[0].replace('/', os.path.sep)
self.command = tools.fix_python_path(self.command)
if self.read_only is None and node.data.get('read_only') is not None:
self.read_only = node.data['read_only']
if (self.relative_cwd is None and
node.data.get('relative_cwd') is not None):
self.relative_cwd = node.data['relative_cwd']
def fetch_isolated(
isolated_hash, storage, cache, algo, outdir, os_flavor, require_command):
"""Aggressively downloads the .isolated file(s), then download all the files.
Arguments:
isolated_hash: hash of the root *.isolated file.
storage: Storage class that communicates with isolate storage.
cache: LocalCache class that knows how to store and map files locally.
algo: hash algorithm to use.
outdir: Output directory to map file tree to.
os_flavor: OS flavor to choose when reading sections of *.isolated file.
require_command: Ensure *.isolated specifies a command to run.
Returns:
Settings object that holds details about loaded *.isolated file.
"""
with cache:
fetch_queue = FetchQueue(storage, cache)
settings = Settings()
with tools.Profiler('GetIsolateds'):
# Optionally support local files by manually adding them to cache.
if not is_valid_hash(isolated_hash, algo):
isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
# Load all *.isolated and start loading rest of the files.
settings.load(fetch_queue, isolated_hash, os_flavor, algo)
if require_command and not settings.command:
      # TODO(vadimsh): All fetch operations are already enqueued and there's no
# easy way to cancel them.
raise ConfigError('No command to run')
with tools.Profiler('GetRest'):
# Create file system hierarchy.
if not os.path.isdir(outdir):
os.makedirs(outdir)
create_directories(outdir, settings.files)
create_links(outdir, settings.files.iteritems())
# Ensure working directory exists.
cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
if not os.path.isdir(cwd):
os.makedirs(cwd)
# Multimap: digest -> list of pairs (path, props).
remaining = {}
for filepath, props in settings.files.iteritems():
if 'h' in props:
remaining.setdefault(props['h'], []).append((filepath, props))
# Now block on the remaining files to be downloaded and mapped.
logging.info('Retrieving remaining files (%d of them)...',
fetch_queue.pending_count)
last_update = time.time()
with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
while remaining:
detector.ping()
# Wait for any item to finish fetching to cache.
digest = fetch_queue.wait(remaining)
# Link corresponding files to a fetched item in cache.
for filepath, props in remaining.pop(digest):
cache.link(digest, os.path.join(outdir, filepath), props.get('m'))
# Report progress.
duration = time.time() - last_update
if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
msg = '%d files remaining...' % len(remaining)
print msg
logging.info(msg)
last_update = time.time()
    # If the cache evicted items we just tried to fetch, it's a fatal error.
if not fetch_queue.verify_all_cached():
raise MappingError('Cache is too small to hold all requested files')
return settings
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
"""Archives data to the server."""
options, files = parser.parse_args(args)
if files == ['-']:
files = sys.stdin.readlines()
if not files:
parser.error('Nothing to upload')
# Load the necessary metadata.
# TODO(maruel): Use a worker pool to upload as the hashing is being done.
infiles = dict(
(
f,
{
's': os.stat(f).st_size,
'h': hash_file(f, get_hash_algo(options.namespace)),
}
)
for f in files)
with tools.Profiler('Archive'):
ret = upload_tree(
base_url=options.isolate_server,
indir=os.getcwd(),
infiles=infiles,
namespace=options.namespace)
if not ret:
print '\n'.join('%s %s' % (infiles[f]['h'], f) for f in sorted(infiles))
return ret
def CMDdownload(parser, args):
"""Download data from the server.
It can either download individual files or a complete tree from a .isolated
file.
"""
parser.add_option(
'-i', '--isolated', metavar='HASH',
help='hash of an isolated file, .isolated file content is discarded, use '
'--file if you need it')
parser.add_option(
'-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
help='hash and destination of a file, can be used multiple times')
parser.add_option(
'-t', '--target', metavar='DIR', default=os.getcwd(),
help='destination directory')
options, args = parser.parse_args(args)
if args:
parser.error('Unsupported arguments: %s' % args)
if bool(options.isolated) == bool(options.file):
parser.error('Use one of --isolated or --file, and only one.')
options.target = os.path.abspath(options.target)
storage = get_storage(options.isolate_server, options.namespace)
cache = MemoryCache()
algo = get_hash_algo(options.namespace)
# Fetching individual files.
if options.file:
channel = threading_utils.TaskChannel()
pending = {}
for digest, dest in options.file:
pending[digest] = dest
storage.async_fetch(
channel,
WorkerPool.MED,
digest,
UNKNOWN_FILE_SIZE,
functools.partial(file_write, os.path.join(options.target, dest)))
while pending:
fetched = channel.pull()
dest = pending.pop(fetched)
logging.info('%s: %s', fetched, dest)
# Fetching whole isolated tree.
if options.isolated:
settings = fetch_isolated(
isolated_hash=options.isolated,
storage=storage,
cache=cache,
algo=algo,
outdir=options.target,
os_flavor=None,
require_command=False)
    rel = os.path.join(options.target, settings.relative_cwd)
    print('To run this test please run from the directory %s:' % rel)
print(' ' + ' '.join(settings.command))
return 0
class OptionParserIsolateServer(tools.OptionParserWithLogging):
def __init__(self, **kwargs):
tools.OptionParserWithLogging.__init__(self, **kwargs)
self.add_option(
'-I', '--isolate-server',
metavar='URL', default='',
help='Isolate server to use')
self.add_option(
'--namespace', default='default-gzip',
help='The namespace to use on the server, default: %default')
def parse_args(self, *args, **kwargs):
options, args = tools.OptionParserWithLogging.parse_args(
self, *args, **kwargs)
options.isolate_server = options.isolate_server.rstrip('/')
if not options.isolate_server:
self.error('--isolate-server is required.')
return options, args
def main(args):
dispatcher = subcommand.CommandDispatcher(__name__)
try:
return dispatcher.execute(
OptionParserIsolateServer(version=__version__), args)
except Exception as e:
tools.report_error(e)
return 1
if __name__ == '__main__':
fix_encoding.fix_encoding()
tools.disable_buffering()
colorama.init()
sys.exit(main(sys.argv[1:]))