#!/usr/bin/env python
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Reads a manifest, creates a tree of hardlinks and runs the test.
Keeps a local cache.
"""
import ctypes
import hashlib
import json
import logging
import optparse
import os
import Queue
import re
import shutil
import stat
import subprocess
import sys
import tempfile
import threading
import time
import urllib
# Types of action accepted by link_file().
HARDLINK, SYMLINK, COPY = range(1, 4)
RE_IS_SHA1 = re.compile(r'^[a-fA-F0-9]{40}$')
class ConfigError(ValueError):
"""Generic failure to load a manifest."""
pass
class MappingError(OSError):
"""Failed to recreate the tree."""
pass
def get_flavor():
"""Returns the system default flavor. Copied from gyp/pylib/gyp/common.py."""
flavors = {
'cygwin': 'win',
'win32': 'win',
'darwin': 'mac',
'sunos5': 'solaris',
'freebsd7': 'freebsd',
'freebsd8': 'freebsd',
}
return flavors.get(sys.platform, 'linux')
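# For example, on a typical Linux host sys.platform is 'linux2', which is not
# in the table above, so get_flavor() falls back to 'linux'.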
def os_link(source, link_name):
"""Add support for os.link() on Windows."""
if sys.platform == 'win32':
if not ctypes.windll.kernel32.CreateHardLinkW(
unicode(link_name), unicode(source), 0):
raise OSError()
else:
os.link(source, link_name)
def readable_copy(outfile, infile):
"""Makes a copy of the file that is readable by everyone."""
shutil.copy(infile, outfile)
read_enabled_mode = (os.stat(outfile).st_mode | stat.S_IRUSR |
stat.S_IRGRP | stat.S_IROTH)
os.chmod(outfile, read_enabled_mode)
def link_file(outfile, infile, action):
"""Links a file. The type of link depends on |action|."""
logging.debug('Mapping %s to %s' % (infile, outfile))
if action not in (HARDLINK, SYMLINK, COPY):
raise ValueError('Unknown mapping action %s' % action)
if not os.path.isfile(infile):
raise MappingError('%s is missing' % infile)
if os.path.isfile(outfile):
raise MappingError(
        '%s already exists; insize:%d; outsize:%d' %
(outfile, os.stat(infile).st_size, os.stat(outfile).st_size))
if action == COPY:
readable_copy(outfile, infile)
elif action == SYMLINK and sys.platform != 'win32':
    # On Windows, symlinks are converted to hardlinks, falling back to copy.
os.symlink(infile, outfile)
else:
try:
os_link(infile, outfile)
except OSError:
# Probably a different file system.
logging.warn(
          'Failed to hardlink, falling back to copy %s to %s' % (
infile, outfile))
readable_copy(outfile, infile)
def _set_write_bit(path, read_only):
"""Sets or resets the executable bit on a file or directory."""
mode = os.lstat(path).st_mode
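  # 0500 (octal) keeps only the owner read and execute bits, making the path
  # read-only for everyone; 0200 restores the owner write bit.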
if read_only:
mode = mode & 0500
else:
mode = mode | 0200
if hasattr(os, 'lchmod'):
os.lchmod(path, mode) # pylint: disable=E1101
else:
if stat.S_ISLNK(mode):
      # Skip symlinks when lchmod() is not available.
logging.debug('Can\'t change +w bit on symlink %s' % path)
return
# TODO(maruel): Implement proper DACL modification on Windows.
os.chmod(path, mode)
def make_writable(root, read_only):
"""Toggle the writable bit on a directory tree."""
root = os.path.abspath(root)
for dirpath, dirnames, filenames in os.walk(root, topdown=True):
for filename in filenames:
_set_write_bit(os.path.join(dirpath, filename), read_only)
for dirname in dirnames:
_set_write_bit(os.path.join(dirpath, dirname), read_only)
def rmtree(root):
"""Wrapper around shutil.rmtree() to retry automatically on Windows."""
make_writable(root, False)
if sys.platform == 'win32':
for i in range(3):
try:
shutil.rmtree(root)
break
except WindowsError: # pylint: disable=E0602
delay = (i+1)*2
print >> sys.stderr, (
            'The test has a subprocess outliving it. Sleeping %d seconds.' % delay)
time.sleep(delay)
else:
shutil.rmtree(root)
def is_same_filesystem(path1, path2):
"""Returns True if both paths are on the same filesystem.
This is required to enable the use of hardlinks.
"""
assert os.path.isabs(path1), path1
assert os.path.isabs(path2), path2
if sys.platform == 'win32':
# If the drive letter mismatches, assume it's a separate partition.
# TODO(maruel): It should look at the underlying drive, a drive letter could
# be a mount point to a directory on another drive.
assert re.match(r'^[a-zA-Z]\:\\.*', path1), path1
assert re.match(r'^[a-zA-Z]\:\\.*', path2), path2
if path1[0].lower() != path2[0].lower():
return False
return os.stat(path1).st_dev == os.stat(path2).st_dev
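# For example, hardlinking from a cache on one partition into a directory on
# another fails with EXDEV, which is why make_temp_dir() below creates the
# temporary directory next to root_dir when the filesystems differ.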
def get_free_space(path):
"""Returns the number of free bytes."""
if sys.platform == 'win32':
free_bytes = ctypes.c_ulonglong(0)
ctypes.windll.kernel32.GetDiskFreeSpaceExW(
ctypes.c_wchar_p(path), None, None, ctypes.pointer(free_bytes))
return free_bytes.value
f = os.statvfs(path)
return f.f_bfree * f.f_frsize
def make_temp_dir(prefix, root_dir):
"""Returns a temporary directory on the same file system as root_dir."""
base_temp_dir = None
if not is_same_filesystem(root_dir, tempfile.gettempdir()):
base_temp_dir = os.path.dirname(root_dir)
return tempfile.mkdtemp(prefix=prefix, dir=base_temp_dir)
def load_manifest(content):
"""Verifies the manifest is valid and loads this object with the json data.
"""
try:
data = json.loads(content)
except ValueError:
raise ConfigError('Failed to parse: %s...' % content[:100])
if not isinstance(data, dict):
raise ConfigError('Expected dict, got %r' % data)
for key, value in data.iteritems():
if key == 'command':
if not isinstance(value, list):
raise ConfigError('Expected list, got %r' % value)
for subvalue in value:
if not isinstance(subvalue, basestring):
raise ConfigError('Expected string, got %r' % subvalue)
elif key == 'files':
if not isinstance(value, dict):
raise ConfigError('Expected dict, got %r' % value)
for subkey, subvalue in value.iteritems():
if not isinstance(subkey, basestring):
raise ConfigError('Expected string, got %r' % subkey)
if not isinstance(subvalue, dict):
raise ConfigError('Expected dict, got %r' % subvalue)
for subsubkey, subsubvalue in subvalue.iteritems():
if subsubkey == 'link':
if not isinstance(subsubvalue, basestring):
raise ConfigError('Expected string, got %r' % subsubvalue)
elif subsubkey == 'mode':
if not isinstance(subsubvalue, int):
raise ConfigError('Expected int, got %r' % subsubvalue)
elif subsubkey == 'sha-1':
if not RE_IS_SHA1.match(subsubvalue):
raise ConfigError('Expected sha-1, got %r' % subsubvalue)
elif subsubkey == 'size':
if not isinstance(subsubvalue, int):
raise ConfigError('Expected int, got %r' % subsubvalue)
elif subsubkey == 'timestamp':
if not isinstance(subsubvalue, int):
raise ConfigError('Expected int, got %r' % subsubvalue)
elif subsubkey == 'touched_only':
if not isinstance(subsubvalue, bool):
raise ConfigError('Expected bool, got %r' % subsubvalue)
else:
raise ConfigError('Unknown subsubkey %s' % subsubkey)
        if 'sha-1' in subvalue and 'link' in subvalue:
raise ConfigError(
'Did not expect both \'sha-1\' and \'link\', got: %r' % subvalue)
elif key == 'includes':
if not isinstance(value, list):
raise ConfigError('Expected list, got %r' % value)
for subvalue in value:
if not RE_IS_SHA1.match(subvalue):
raise ConfigError('Expected sha-1, got %r' % subvalue)
elif key == 'read_only':
if not isinstance(value, bool):
raise ConfigError('Expected bool, got %r' % value)
elif key == 'relative_cwd':
if not isinstance(value, basestring):
raise ConfigError('Expected string, got %r' % value)
elif key == 'os':
if value != get_flavor():
raise ConfigError(
'Expected \'os\' to be \'%s\' but got \'%s\'' %
(get_flavor(), value))
else:
raise ConfigError('Unknown key %s' % key)
return data
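# A minimal manifest accepted by load_manifest() on a linux host might look
# like this (all file names and the sha-1 are made-up placeholders):
#
#   {
#     "command": ["python", "run_test.py"],
#     "files": {
#       "run_test.py": {
#         "sha-1": "0123456789abcdef0123456789abcdef01234567",
#         "size": 1024,
#         "mode": 420
#       },
#       "data/link": {"link": "../real_file"}
#     },
#     "relative_cwd": "",
#     "read_only": false,
#     "os": "linux"
#   }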
def fix_python_path(cmd):
"""Returns the fixed command line to call the right python executable."""
out = cmd[:]
if out[0] == 'python':
out[0] = sys.executable
elif out[0].endswith('.py'):
out.insert(0, sys.executable)
return out
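# For example (hypothetical paths):
#   fix_python_path(['python', 'foo.py'])  ->  [sys.executable, 'foo.py']
#   fix_python_path(['foo.py', '--bar'])   ->  [sys.executable, 'foo.py', '--bar']
#   fix_python_path(['out/test.exe'])      ->  ['out/test.exe']  (unchanged)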
class Profiler(object):
def __init__(self, name):
self.name = name
self.start_time = None
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, _exc_type, _exec_value, _traceback):
time_taken = time.time() - self.start_time
logging.info('Profiling: Section %s took %3.3f seconds',
self.name, time_taken)
class Remote(object):
"""Priority based worker queue to fetch or upload files from a
content-address server. Any function may be given as the fetcher/upload,
as long as it takes two inputs (the item contents, and their relative
destination).
Supports local file system, CIFS or http remotes.
When the priority of items is equals, works in strict FIFO mode.
"""
# Initial and maximum number of worker threads.
INITIAL_WORKERS = 2
MAX_WORKERS = 16
# Priorities.
LOW, MED, HIGH = (1<<8, 2<<8, 3<<8)
INTERNAL_PRIORITY_BITS = (1<<8) - 1
RETRIES = 5
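  # The low INTERNAL_PRIORITY_BITS of a priority encode the retry count. For
  # example, an item queued at MED (2<<8 == 512) that hits IOError is requeued
  # at 513, then 514, and so on; once the low bits reach RETRIES the exception
  # is propagated to the caller instead of retrying.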
def __init__(self, destination_root):
    # Function to fetch a remote object or upload to a remote location.
self._do_item = self.get_file_handler(destination_root)
# Contains tuple(priority, index, obj, destination).
self._queue = Queue.PriorityQueue()
# Contains tuple(priority, index, obj).
self._done = Queue.PriorityQueue()
# Contains generated exceptions that haven't been handled yet.
self._exceptions = Queue.Queue()
# To keep FIFO ordering in self._queue. It is assumed xrange's iterator is
# thread-safe.
self._next_index = xrange(0, 1<<30).__iter__().next
# Control access to the following member.
self._ready_lock = threading.Lock()
# Number of threads in wait state.
self._ready = 0
# Control access to the following member.
self._workers_lock = threading.Lock()
self._workers = []
for _ in range(self.INITIAL_WORKERS):
self._add_worker()
def join(self):
"""Blocks until the queue is empty."""
self._queue.join()
def next_exception(self):
"""Returns the next unhandled exception, or None if there is
no exception."""
try:
return self._exceptions.get_nowait()
except Queue.Empty:
return None
def add_item(self, priority, obj, dest):
"""Retrieves an object from the remote data store.
The smaller |priority| gets fetched first.
Thread-safe.
"""
assert (priority & self.INTERNAL_PRIORITY_BITS) == 0
self._add_to_queue(priority, obj, dest)
def get_result(self):
"""Returns the next file that was successfully fetched."""
r = self._done.get()
if r[0] == -1:
# It's an exception.
raise r[2][0], r[2][1], r[2][2]
return r[2]
def _add_to_queue(self, priority, obj, dest):
with self._ready_lock:
start_new_worker = not self._ready
self._queue.put((priority, self._next_index(), obj, dest))
if start_new_worker:
self._add_worker()
def _add_worker(self):
"""Add one worker thread if there isn't too many. Thread-safe."""
with self._workers_lock:
if len(self._workers) >= self.MAX_WORKERS:
return False
worker = threading.Thread(target=self._run)
self._workers.append(worker)
worker.daemon = True
worker.start()
def _step_done(self, result):
"""Worker helper function"""
self._done.put(result)
self._queue.task_done()
if result[0] == -1:
self._exceptions.put(sys.exc_info())
def _run(self):
"""Worker thread loop."""
while True:
try:
with self._ready_lock:
self._ready += 1
item = self._queue.get()
finally:
with self._ready_lock:
self._ready -= 1
if not item:
return
priority, index, obj, dest = item
try:
self._do_item(obj, dest)
except IOError:
# Retry a few times, lowering the priority.
if (priority & self.INTERNAL_PRIORITY_BITS) < self.RETRIES:
self._add_to_queue(priority + 1, obj, dest)
self._queue.task_done()
continue
# Transfers the exception back. It has maximum priority.
self._step_done((-1, 0, sys.exc_info()))
except:
# Transfers the exception back. It has maximum priority.
self._step_done((-1, 0, sys.exc_info()))
else:
self._step_done((priority, index, obj))
@staticmethod
def get_file_handler(file_or_url):
"""Returns a object to retrieve objects from a remote."""
if re.match(r'^https?://.+$', file_or_url):
file_or_url = file_or_url.rstrip('/') + '/'
def download_file(item, dest):
# TODO(maruel): Reuse HTTP connections. The stdlib doesn't make this
# easy.
source = file_or_url + item
logging.debug('download_file(%s, %s)', source, dest)
urllib.urlretrieve(source, dest)
return download_file
def copy_file(item, dest):
source = os.path.join(file_or_url, item)
logging.debug('copy_file(%s, %s)', source, dest)
shutil.copy(source, dest)
return copy_file
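# A minimal usage sketch of Remote; the URL, sha-1 and destination path are
# hypothetical:
#
#   remote = Remote('http://example.com/objects')
#   remote.add_item(
#       Remote.MED, '0123456789abcdef0123456789abcdef01234567', '/tmp/out')
#   item = remote.get_result()  # Blocks until an item has been fetched.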
class CachePolicies(object):
def __init__(self, max_cache_size, min_free_space, max_items):
"""
Arguments:
- max_cache_size: Trim if the cache gets larger than this value. If 0, the
cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value. If
      0, the cache may fill the disk unconditionally.
- max_items: Maximum number of items to keep in the cache. If 0, do not
enforce a limit.
"""
self.max_cache_size = max_cache_size
self.min_free_space = min_free_space
self.max_items = max_items
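# For example, the defaults used by main() below are equivalent to:
#
#   CachePolicies(
#       max_cache_size=20*1024*1024*1024,  # 20 GiB
#       min_free_space=1*1024*1024*1024,   # 1 GiB
#       max_items=100000)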
class Cache(object):
"""Stateful LRU cache.
Saves its state as json file.
"""
STATE_FILE = 'state.json'
def __init__(self, cache_dir, remote, policies):
"""
Arguments:
- cache_dir: Directory where to place the cache.
- remote: Remote where to fetch items from.
- policies: cache retention policies.
"""
self.cache_dir = cache_dir
self.remote = remote
self.policies = policies
self.state_file = os.path.join(cache_dir, self.STATE_FILE)
    # The (file, size) tuples are kept as a list in LRU order, e.g.
    # self.state[0] is the oldest item.
self.state = []
# A lookup map to speed up searching.
self._lookup = {}
self._dirty = False
# Items currently being fetched. Keep it local to reduce lock contention.
self._pending_queue = set()
# Profiling values.
self._added = []
self._removed = []
self._free_disk = 0
if not os.path.isdir(self.cache_dir):
os.makedirs(self.cache_dir)
if os.path.isfile(self.state_file):
try:
self.state = json.load(open(self.state_file, 'r'))
except (IOError, ValueError), e:
# Too bad. The file will be overwritten and the cache cleared.
logging.error(
'Broken state file %s, ignoring.\n%s' % (self.STATE_FILE, e))
if (not isinstance(self.state, list) or
not all(
isinstance(i, (list, tuple)) and len(i) == 2 for i in self.state)):
# Discard.
self.state = []
self._dirty = True
# Ensure that all files listed in the state still exist and add new ones.
previous = set(filename for filename, _ in self.state)
if len(previous) != len(self.state):
logging.warn('Cache state is corrupted')
self._dirty = True
self.state = []
else:
added = 0
for filename in os.listdir(self.cache_dir):
if filename == self.STATE_FILE:
continue
if filename in previous:
previous.remove(filename)
continue
# An untracked file.
self._dirty = True
if not RE_IS_SHA1.match(filename):
logging.warn('Removing unknown file %s from cache', filename)
os.remove(self.path(filename))
else:
# Insert as the oldest file. It will be deleted eventually if not
# accessed.
self._add(filename, False)
added += 1
if added:
logging.warn('Added back %d unknown files', added)
self.state = [
(filename, size) for filename, size in self.state
if filename not in previous
]
self._update_lookup()
with Profiler('SetupTrimming'):
self.trim()
def __enter__(self):
return self
def __exit__(self, _exc_type, _exec_value, _traceback):
with Profiler('CleanupTrimming'):
self.trim()
logging.info(
'%4d (%7dkb) added', len(self._added), sum(self._added) / 1024)
logging.info(
'%4d (%7dkb) current',
len(self.state),
sum(i[1] for i in self.state) / 1024)
logging.info(
'%4d (%7dkb) removed', len(self._removed), sum(self._removed) / 1024)
logging.info('%7dkb free', self._free_disk / 1024)
def remove_lru_file(self):
"""Removes the last recently used file."""
try:
filename, size = self.state.pop(0)
del self._lookup[filename]
self._removed.append(size)
os.remove(self.path(filename))
self._dirty = True
except OSError as e:
logging.error('Error attempting to delete a file\n%s' % e)
def trim(self):
"""Trims anything we don't know, make sure enough free space exists."""
# Ensure maximum cache size.
if self.policies.max_cache_size and self.state:
while sum(i[1] for i in self.state) > self.policies.max_cache_size:
self.remove_lru_file()
# Ensure maximum number of items in the cache.
if self.policies.max_items and self.state:
while len(self.state) > self.policies.max_items:
self.remove_lru_file()
# Ensure enough free space.
self._free_disk = get_free_space(self.cache_dir)
while (
self.policies.min_free_space and
self.state and
self._free_disk < self.policies.min_free_space):
self.remove_lru_file()
self._free_disk = get_free_space(self.cache_dir)
self.save()
def retrieve(self, priority, item):
"""Retrieves a file from the remote, if not already cached, and adds it to
the cache.
"""
    assert '/' not in item
path = self.path(item)
index = self._lookup.get(item)
if index is None:
if item in self._pending_queue:
# Already pending. The same object could be referenced multiple times.
return
self.remote.add_item(priority, item, path)
self._pending_queue.add(item)
else:
if index != len(self.state) - 1:
        # Was already in cache. Update its LRU position by moving it to the end.
self.state.append(self.state.pop(index))
self._dirty = True
self._update_lookup()
def add(self, filepath, obj):
"""Forcibly adds a file to the cache."""
    if obj not in self._lookup:
link_file(self.path(obj), filepath, HARDLINK)
self._add(obj, True)
def path(self, item):
"""Returns the path to one item."""
return os.path.join(self.cache_dir, item)
def save(self):
"""Saves the LRU ordering."""
json.dump(self.state, open(self.state_file, 'wb'), separators=(',',':'))
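  # The resulting state.json is a JSON list of [filename, size] pairs in LRU
  # order, where each filename is the sha-1 of its content, e.g.:
  #   [["0123456789abcdef0123456789abcdef01234567", 4096], ...]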
def wait_for(self, items):
"""Starts a loop that waits for at least one of |items| to be retrieved.
Returns the first item retrieved.
"""
# Flush items already present.
for item in items:
if item in self._lookup:
return item
assert all(i in self._pending_queue for i in items), (
items, self._pending_queue)
    # Note that:
    #   len(self._pending_queue) ==
    #     (len(self.remote._workers) - self.remote._ready +
    #      len(self.remote._queue) + len(self.remote._done))
    # There is no lock-free way to verify that.
while self._pending_queue:
item = self.remote.get_result()
self._pending_queue.remove(item)
self._add(item, True)
if item in items:
return item
def _add(self, item, at_end):
"""Adds an item in the internal state.
If |at_end| is False, self._lookup becomes inconsistent and
self._update_lookup() must be called.
"""
size = os.stat(self.path(item)).st_size
self._added.append(size)
if at_end:
self.state.append((item, size))
self._lookup[item] = len(self.state) - 1
else:
self.state.insert(0, (item, size))
self._dirty = True
def _update_lookup(self):
self._lookup = dict(
(filename, index) for index, (filename, _) in enumerate(self.state))
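    # E.g. a state of [(hash_a, 10), (hash_b, 20)] yields the lookup map
    # {hash_a: 0, hash_b: 1}, where hash_a and hash_b stand for sha-1 strings.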
class Manifest(object):
"""Represents a single parsed manifest, e.g. a .results file."""
def __init__(self, obj_hash):
"""|obj_hash| is really the sha-1 of the file."""
logging.debug('Manifest(%s)' % obj_hash)
self.obj_hash = obj_hash
    # Set once all the left side of the tree is parsed. 'Tree' here means the
    # manifest and all the manifests recursively included by it via the
    # 'includes' key. The order of the manifest sha-1s in 'includes' is
    # important, as later ones are not processed until the earlier ones are
    # retrieved and read.
self.can_fetch = False
# Raw data.
self.data = {}
# A Manifest instance, one per object in self.includes.
self.children = []
# Set once the manifest is loaded.
self._manifest_parsed = False
# Set once the files are fetched.
self.files_fetched = False
def load(self, content):
"""Verifies the manifest is valid and loads this object with the json data.
"""
logging.debug('Manifest.load(%s)' % self.obj_hash)
assert not self._manifest_parsed
self.data = load_manifest(content)
self.children = [Manifest(i) for i in self.data.get('includes', [])]
self._manifest_parsed = True
def fetch_files(self, cache, files):
"""Adds files in this manifest not present in files dictionary.
Preemptively request files.
Note that |files| is modified by this function.
"""
assert self.can_fetch
if not self._manifest_parsed or self.files_fetched:
return
logging.debug('fetch_files(%s)' % self.obj_hash)
for filepath, properties in self.data.get('files', {}).iteritems():
      # The root manifest has priority over the files being mapped. In
      # particular, overridden files must not be fetched.
if filepath not in files:
files[filepath] = properties
if 'sha-1' in properties:
# Preemptively request files.
logging.debug('fetching %s' % filepath)
cache.retrieve(Remote.MED, properties['sha-1'])
self.files_fetched = True
class Settings(object):
"""Results of a completely parsed manifest."""
def __init__(self):
self.command = []
self.files = {}
self.read_only = None
self.relative_cwd = None
# The main manifest.
self.root = None
logging.debug('Settings')
def load(self, cache, root_manifest_hash):
"""Loads the manifest and all the included manifests asynchronously.
    It enables support for included manifests. They are processed in strict
    order but fetched asynchronously from the cache. This is important so that
    a file in an included manifest that is overridden by an embedding manifest
    is not fetched needlessly. The includes are fetched in one pass and the
    files are fetched as soon as all the manifests on the left side of the
    tree have been fetched.
    The prioritization is very important here for nested manifests. 'includes'
    have the highest priority and the algorithm is optimized for both deep and
    wide manifests. A deep one is a long chain of manifests, each referenced by
    a single item in the previous one's 'includes'. A wide one has a large
    number of 'includes' in a single manifest. 'Left' is defined as an included
    manifest appearing earlier in the 'includes' list, so the order of the
    elements in 'includes' matters.
"""
self.root = Manifest(root_manifest_hash)
cache.retrieve(Remote.HIGH, root_manifest_hash)
pending = {root_manifest_hash: self.root}
# Keeps the list of retrieved items to refuse recursive includes.
retrieved = [root_manifest_hash]
def update_self(node):
node.fetch_files(cache, self.files)
# Grabs properties.
if not self.command and node.data.get('command'):
self.command = node.data['command']
if self.read_only is None and node.data.get('read_only') is not None:
self.read_only = node.data['read_only']
if (self.relative_cwd is None and
node.data.get('relative_cwd') is not None):
self.relative_cwd = node.data['relative_cwd']
def traverse_tree(node):
if node.can_fetch:
if not node.files_fetched:
update_self(node)
will_break = False
for i in node.children:
if not i.can_fetch:
if will_break:
break
          # Automatically mark the first one as fetchable.
i.can_fetch = True
will_break = True
traverse_tree(i)
while pending:
item_hash = cache.wait_for(pending)
item = pending.pop(item_hash)
item.load(open(cache.path(item_hash), 'r').read())
if item_hash == root_manifest_hash:
# It's the root item.
item.can_fetch = True
for new_child in item.children:
h = new_child.obj_hash
if h in retrieved:
raise ConfigError('Manifest %s is retrieved recursively' % h)
pending[h] = new_child
cache.retrieve(Remote.HIGH, h)
# Traverse the whole tree to see if files can now be fetched.
traverse_tree(self.root)
def check(n):
return all(check(x) for x in n.children) and n.files_fetched
assert check(self.root)
self.relative_cwd = self.relative_cwd or ''
self.read_only = self.read_only or False
def run_tha_test(manifest_hash, cache_dir, remote, policies):
"""Downloads the dependencies in the cache, hardlinks them into a temporary
directory and runs the executable.
"""
settings = Settings()
with Cache(cache_dir, Remote(remote), policies) as cache:
outdir = make_temp_dir('run_tha_test', cache_dir)
try:
# Initiate all the files download.
with Profiler('GetManifests') as _prof:
# Optionally support local files.
if not RE_IS_SHA1.match(manifest_hash):
          # Adds it to the cache. While not strictly necessary, this simplifies
          # the rest.
h = hashlib.sha1(open(manifest_hash, 'r').read()).hexdigest()
cache.add(manifest_hash, h)
manifest_hash = h
settings.load(cache, manifest_hash)
if not settings.command:
print >> sys.stderr, 'No command to run'
return 1
with Profiler('GetRest') as _prof:
logging.debug('Creating directories')
        # Builds the set of directories that need to be created.
directories = set(os.path.dirname(f) for f in settings.files)
for item in list(directories):
while item:
directories.add(item)
item = os.path.dirname(item)
for d in sorted(directories):
if d:
os.mkdir(os.path.join(outdir, d))
# Creates the links if necessary.
for filepath, properties in settings.files.iteritems():
if 'link' not in properties:
continue
outfile = os.path.join(outdir, filepath)
os.symlink(properties['link'], outfile)
if 'mode' in properties:
# It's not set on Windows.
os.chmod(outfile, properties['mode'])
# Remaining files to be processed.
        # Note that some files may not have been downloaded yet at this point.
remaining = dict()
for filepath, props in settings.files.iteritems():
if 'sha-1' in props:
remaining.setdefault(props['sha-1'], []).append((filepath, props))
# Do bookkeeping while files are being downloaded in the background.
cwd = os.path.join(outdir, settings.relative_cwd)
if not os.path.isdir(cwd):
os.makedirs(cwd)
cmd = settings.command[:]
        # Ensure paths are correctly separated on Windows.
cmd[0] = cmd[0].replace('/', os.path.sep)
cmd = fix_python_path(cmd)
# Now block on the remaining files to be downloaded and mapped.
while remaining:
obj = cache.wait_for(remaining)
for filepath, properties in remaining.pop(obj):
outfile = os.path.join(outdir, filepath)
link_file(outfile, cache.path(obj), HARDLINK)
if 'mode' in properties:
# It's not set on Windows.
os.chmod(outfile, properties['mode'])
if settings.read_only:
make_writable(outdir, True)
logging.info('Running %s, cwd=%s' % (cmd, cwd))
try:
with Profiler('RunTest') as _prof:
return subprocess.call(cmd, cwd=cwd)
except OSError:
print >> sys.stderr, 'Failed to run %s; cwd=%s' % (cmd, cwd)
raise
finally:
rmtree(outdir)
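# Example invocation (the hash, remote URL and cache directory are
# hypothetical; %prog stands for this script):
#   %prog --hash 0123456789abcdef0123456789abcdef01234567 \
#     --remote http://example.com/objects --cache /tmp/cache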
def main():
parser = optparse.OptionParser(
usage='%prog <options>', description=sys.modules[__name__].__doc__)
parser.add_option(
'-v', '--verbose', action='count', default=0, help='Use multiple times')
parser.add_option('--no-run', action='store_true', help='Skip the run part')
group = optparse.OptionGroup(parser, 'Data source')
group.add_option(
'-m', '--manifest',
metavar='FILE',
help='File/url describing what to map or run')
group.add_option(
'-H', '--hash',
help='Hash of the manifest to grab from the hash table')
  group.add_option(
      '-r', '--remote', metavar='URL', help='Remote where to get the items')
  parser.add_option_group(group)
group = optparse.OptionGroup(parser, 'Cache management')
group.add_option(
'--cache',
default='cache',
metavar='DIR',
help='Cache directory, default=%default')
group.add_option(
'--max-cache-size',
type='int',
metavar='NNN',
default=20*1024*1024*1024,
help='Trim if the cache gets larger than this value, default=%default')
group.add_option(
'--min-free-space',
type='int',
metavar='NNN',
default=1*1024*1024*1024,
help='Trim if disk free space becomes lower than this value, '
'default=%default')
group.add_option(
'--max-items',
type='int',
metavar='NNN',
default=100000,
      help='Trim if more than this number of items are in the cache, '
'default=%default')
parser.add_option_group(group)
options, args = parser.parse_args()
level = [logging.ERROR, logging.INFO, logging.DEBUG][min(2, options.verbose)]
logging.basicConfig(
level=level,
format='%(levelname)5s %(module)15s(%(lineno)3d): %(message)s')
if bool(options.manifest) == bool(options.hash):
parser.error('One and only one of --manifest or --hash is required.')
if not options.remote:
parser.error('--remote is required.')
if args:
parser.error('Unsupported args %s' % ' '.join(args))
policies = CachePolicies(
options.max_cache_size, options.min_free_space, options.max_items)
try:
return run_tha_test(
options.manifest or options.hash,
os.path.abspath(options.cache),
options.remote,
policies)
except (ConfigError, MappingError), e:
print >> sys.stderr, str(e)
return 1
if __name__ == '__main__':
sys.exit(main())