blob: e2cd52c550d33f5c756c8aa9a9d2edf6541f9192 [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2012 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Runs a command with optional isolated input/output.
run_isolated takes cares of setting up a temporary environment, running a
command, and tearing it down.
It handles downloading and uploading isolated files, mapping CIPD packages and
reusing stateful named caches.
The isolated files, CIPD packages and named caches are kept as a global LRU
cache.
Any ${EXECUTABLE_SUFFIX} on the command line or the environment variables passed
with the --env option will be replaced with ".exe" string on Windows and "" on
other platforms.
Any ${ISOLATED_OUTDIR} on the command line or the environment variables passed
with the --env option will be replaced by the location of a temporary directory
upon execution of the command specified in the .isolated file. All content
written to this directory will be uploaded upon termination and the .isolated
file describing this directory will be printed to stdout.
Any ${SWARMING_BOT_FILE} on the command line or the environment variables passed
with the --env option will be replaced by the value of the --bot-file parameter.
This file is used by a swarming bot to communicate state of the host to tasks.
It is written to by the swarming bot's on_before_task() hook in the swarming
server's custom bot_config.py.
See
https://chromium.googlesource.com/infra/luci/luci-py.git/+/master/appengine/swarming/doc/Magic-Values.md
for all the variables.
See
https://chromium.googlesource.com/infra/luci/luci-py.git/+/master/appengine/swarming/swarming_bot/config/bot_config.py
for more information about bot_config.py.
"""
__version__ = '1.0.0'
import argparse
import base64
import collections
import contextlib
import errno
import json
import logging
import optparse
import os
import re
import sys
import tempfile
import time
from utils import tools
tools.force_local_third_party()
# third_party/
from depot_tools import fix_encoding
# pylint: disable=ungrouped-imports
import auth
import cipd
import isolate_storage
import isolateserver
import local_caching
from libs import luci_context
from utils import file_path
from utils import fs
from utils import large
from utils import logging_utils
from utils import on_error
from utils import subprocess42
# Magic variables that can be found in the isolate task command line.
ISOLATED_OUTDIR_PARAMETER = '${ISOLATED_OUTDIR}'
EXECUTABLE_SUFFIX_PARAMETER = '${EXECUTABLE_SUFFIX}'
SWARMING_BOT_FILE_PARAMETER = '${SWARMING_BOT_FILE}'
# The name of the log file to use.
RUN_ISOLATED_LOG_FILE = 'run_isolated.log'
# The name of the log to use for the run_test_cases.py command
RUN_TEST_CASES_LOG = 'run_test_cases.log'
# Use short names for temporary directories. This is driven by Windows, which
# imposes a relatively short maximum path length of 260 characters, often
# referred to as MAX_PATH. It is relatively easy to create files with longer
# path length. A use case is with recursive dependency trees like npm packages.
#
# It is recommended to start the script with a `root_dir` as short as
# possible.
# - ir stands for isolated_run
# - io stands for isolated_out
# - it stands for isolated_tmp
ISOLATED_RUN_DIR = u'ir'
ISOLATED_OUT_DIR = u'io'
ISOLATED_TMP_DIR = u'it'
# Keep synced with task_request.py
CACHE_NAME_RE = re.compile(r'^[a-z0-9_]{1,4096}$')
OUTLIVING_ZOMBIE_MSG = """\
*** Swarming tried multiple times to delete the %s directory and failed ***
*** Hard failing the task ***
Swarming detected that your testing script ran an executable, which may have
started a child executable, and the main script returned early, leaving the
children executables playing around unguided.
You don't want to leave children processes outliving the task on the Swarming
bot, do you? The Swarming bot doesn't.
How to fix?
- For any process that starts children processes, make sure all children
processes terminated properly before each parent process exits. This is
especially important in very deep process trees.
- This must be done properly both in normal successful task and in case of
task failure. Cleanup is very important.
- The Swarming bot sends a SIGTERM in case of timeout.
- You have %s seconds to comply after the signal was sent to the process
before the process is forcibly killed.
- To achieve not leaking children processes in case of signals on timeout, you
MUST handle signals in each executable / python script and propagate them to
children processes.
- When your test script (python or binary) receives a signal like SIGTERM or
CTRL_BREAK_EVENT on Windows), send it to all children processes and wait for
them to terminate before quitting.
See
https://chromium.googlesource.com/infra/luci/luci-py.git/+/master/appengine/swarming/doc/Bot.md#Graceful-termination_aka-the-SIGTERM-and-SIGKILL-dance
for more information.
*** May the SIGKILL force be with you ***
"""
# Currently hardcoded. Eventually could be exposed as a flag once there's value.
# 3 weeks
MAX_AGE_SECS = 21*24*60*60
TaskData = collections.namedtuple(
'TaskData',
[
# List of strings; the command line to use, independent of what was
# specified in the isolated file.
'command',
# Relative directory to start command into.
'relative_cwd',
# List of strings; the arguments to add to the command specified in the
# isolated file.
'extra_args',
# Hash of the .isolated file that must be retrieved to recreate the tree
# of files to run the target executable. The command specified in the
# .isolated is executed. Mutually exclusive with command argument.
'isolated_hash',
# isolateserver.Storage instance to retrieve remote objects. This object
# has a reference to an isolateserver.StorageApi, which does the actual
# I/O.
'storage',
# isolateserver.LocalCache instance to keep from retrieving the same
# objects constantly by caching the objects retrieved. Can be on-disk or
# in-memory.
'isolate_cache',
# List of paths relative to root_dir to put into the output isolated
# bundle upon task completion (see link_outputs_to_outdir).
'outputs',
# Function (run_dir) => context manager that installs named caches into
# |run_dir|.
'install_named_caches',
# If True, the temporary directory will be deliberately leaked for later
# examination.
'leak_temp_dir',
# Path to the directory to use to create the temporary directory. If not
# specified, a random temporary directory is created.
'root_dir',
# Kills the process if it lasts more than this amount of seconds.
'hard_timeout',
# Number of seconds to wait between SIGTERM and SIGKILL.
'grace_period',
# Path to a file with bot state, used in place of ${SWARMING_BOT_FILE}
# task command line argument.
'bot_file',
# Logical account to switch LUCI_CONTEXT into.
'switch_to_account',
# Context manager dir => CipdInfo, see install_client_and_packages.
'install_packages_fn',
# Create tree with symlinks instead of hardlinks.
'use_symlinks',
# Environment variables to set.
'env',
# Environment variables to mutate with relative directories.
# Example: {"ENV_KEY": ['relative', 'paths', 'to', 'prepend']}
'env_prefix',
# Lowers the task process priority.
'lower_priority',
# subprocess42.Containment instance. Can be None.
'containment',
])
def _to_str(s):
"""Downgrades a unicode instance to str. Pass str through as-is."""
if isinstance(s, str):
return s
# This is technically incorrect, especially on Windows. In theory
# sys.getfilesystemencoding() should be used to use the right 'ANSI code
# page' on Windows, but that causes other problems, as the character set
# is very limited.
return s.encode('utf-8')
def _to_unicode(s):
"""Upgrades a str instance to unicode. Pass unicode through as-is."""
if isinstance(s, unicode) or s is None:
return s
return s.decode('utf-8')
def make_temp_dir(prefix, root_dir):
"""Returns a new unique temporary directory."""
return unicode(tempfile.mkdtemp(prefix=prefix, dir=root_dir))
def change_tree_read_only(rootdir, read_only):
"""Changes the tree read-only bits according to the read_only specification.
The flag can be 0, 1 or 2, which will affect the possibility to modify files
and create or delete files.
"""
if read_only == 2:
# Files and directories (except on Windows) are marked read only. This
# inhibits modifying, creating or deleting files in the test directory,
# except on Windows where creating and deleting files is still possible.
file_path.make_tree_read_only(rootdir)
elif read_only == 1:
# Files are marked read only but not the directories. This inhibits
# modifying files but creating or deleting files is still possible.
file_path.make_tree_files_read_only(rootdir)
elif read_only in (0, None):
# Anything can be modified.
# TODO(maruel): This is currently dangerous as long as
# DiskContentAddressedCache.touch() is not yet changed to verify the hash of
# the content of the files it is looking at, so that if a test modifies an
# input file, the file must be deleted.
file_path.make_tree_writeable(rootdir)
else:
raise ValueError(
'change_tree_read_only(%s, %s): Unknown flag %s' %
(rootdir, read_only, read_only))
@contextlib.contextmanager
def set_luci_context_account(account, tmp_dir):
"""Sets LUCI_CONTEXT account to be used by the task.
If 'account' is None or '', does nothing at all. This happens when
run_isolated.py is called without '--switch-to-account' flag. In this case,
if run_isolated.py is running in some LUCI_CONTEXT environment, the task will
just inherit whatever account is already set. This may happen is users invoke
run_isolated.py explicitly from their code.
If the requested account is not defined in the context, switches to
non-authenticated access. This happens for Swarming tasks that don't use
'task' service accounts.
If not using LUCI_CONTEXT-based auth, does nothing.
If already running as requested account, does nothing.
"""
if not account:
# Not actually switching.
yield
return
local_auth = luci_context.read('local_auth')
if not local_auth:
# Not using LUCI_CONTEXT auth at all.
yield
return
# See LUCI_CONTEXT.md for the format of 'local_auth'.
if local_auth.get('default_account_id') == account:
# Already set, no need to switch.
yield
return
available = {a['id'] for a in local_auth.get('accounts') or []}
if account in available:
logging.info('Switching default LUCI_CONTEXT account to %r', account)
local_auth['default_account_id'] = account
else:
logging.warning(
'Requested LUCI_CONTEXT account %r is not available (have only %r), '
'disabling authentication', account, sorted(available))
local_auth.pop('default_account_id', None)
with luci_context.write(_tmpdir=tmp_dir, local_auth=local_auth):
yield
def process_command(command, out_dir, bot_file):
"""Replaces parameters in a command line.
Raises:
ValueError if a parameter is requested in |command| but its value is not
provided.
"""
return [replace_parameters(arg, out_dir, bot_file) for arg in command]
def replace_parameters(arg, out_dir, bot_file):
"""Replaces parameter tokens with appropriate values in a string.
Raises:
ValueError if a parameter is requested in |arg| but its value is not
provided.
"""
arg = arg.replace(EXECUTABLE_SUFFIX_PARAMETER, cipd.EXECUTABLE_SUFFIX)
replace_slash = False
if ISOLATED_OUTDIR_PARAMETER in arg:
if not out_dir:
raise ValueError(
'output directory is requested in command or env var, but not '
'provided; please specify one')
arg = arg.replace(ISOLATED_OUTDIR_PARAMETER, out_dir)
replace_slash = True
if SWARMING_BOT_FILE_PARAMETER in arg:
if bot_file:
arg = arg.replace(SWARMING_BOT_FILE_PARAMETER, bot_file)
replace_slash = True
else:
logging.warning('SWARMING_BOT_FILE_PARAMETER found in command or env '
'var, but no bot_file specified. Leaving parameter '
'unchanged.')
if replace_slash:
# Replace slashes only if parameters are present
# because of arguments like '${ISOLATED_OUTDIR}/foo/bar'
arg = arg.replace('/', os.sep)
return arg
def get_command_env(tmp_dir, cipd_info, run_dir, env, env_prefixes, out_dir,
bot_file):
"""Returns full OS environment to run a command in.
Sets up TEMP, puts directory with cipd binary in front of PATH, exposes
CIPD_CACHE_DIR env var, and installs all env_prefixes.
Args:
tmp_dir: temp directory.
cipd_info: CipdInfo object is cipd client is used, None if not.
run_dir: The root directory the isolated tree is mapped in.
env: environment variables to use
env_prefixes: {"ENV_KEY": ['cwd', 'relative', 'paths', 'to', 'prepend']}
out_dir: Isolated output directory. Required to be != None if any of the
env vars contain ISOLATED_OUTDIR_PARAMETER.
bot_file: Required to be != None if any of the env vars contain
SWARMING_BOT_FILE_PARAMETER.
"""
out = os.environ.copy()
for k, v in env.iteritems():
if not v:
out.pop(k, None)
else:
out[k] = replace_parameters(v, out_dir, bot_file)
if cipd_info:
bin_dir = os.path.dirname(cipd_info.client.binary_path)
out['PATH'] = '%s%s%s' % (_to_str(bin_dir), os.pathsep, out['PATH'])
out['CIPD_CACHE_DIR'] = _to_str(cipd_info.cache_dir)
for key, paths in env_prefixes.iteritems():
assert isinstance(paths, list), paths
paths = [os.path.normpath(os.path.join(run_dir, p)) for p in paths]
cur = out.get(key)
if cur:
paths.append(cur)
out[key] = _to_str(os.path.pathsep.join(paths))
tmp_dir = _to_str(tmp_dir)
# pylint: disable=line-too-long
# * python respects $TMPDIR, $TEMP, and $TMP in this order, regardless of
# platform. So $TMPDIR must be set on all platforms.
# https://github.com/python/cpython/blob/2.7/Lib/tempfile.py#L155
out['TMPDIR'] = tmp_dir
if sys.platform == 'win32':
# * chromium's base utils uses GetTempPath().
# https://cs.chromium.org/chromium/src/base/files/file_util_win.cc?q=GetTempPath
# * Go uses GetTempPath().
# * GetTempDir() uses %TMP%, then %TEMP%, then other stuff. So %TMP% must be
# set.
# https://docs.microsoft.com/en-us/windows/desktop/api/fileapi/nf-fileapi-gettemppathw
out['TMP'] = tmp_dir
# https://blogs.msdn.microsoft.com/oldnewthing/20150417-00/?p=44213
out['TEMP'] = tmp_dir
elif sys.platform == 'darwin':
# * Chromium uses an hack on macOS before calling into
# NSTemporaryDirectory().
# https://cs.chromium.org/chromium/src/base/files/file_util_mac.mm?q=GetTempDir
# https://developer.apple.com/documentation/foundation/1409211-nstemporarydirectory
out['MAC_CHROMIUM_TMPDIR'] = tmp_dir
else:
# TMPDIR is specified as the POSIX standard envvar for the temp directory.
# http://pubs.opengroup.org/onlinepubs/9699919799/basedefs/V1_chap08.html
# * mktemp on linux respects $TMPDIR.
# * Chromium respects $TMPDIR on linux.
# https://cs.chromium.org/chromium/src/base/files/file_util_posix.cc?q=GetTempDir
# * Go uses $TMPDIR.
# https://go.googlesource.com/go/+/go1.10.3/src/os/file_unix.go#307
pass
return out
def run_command(
command, cwd, env, hard_timeout, grace_period, lower_priority, containment):
"""Runs the command.
Returns:
tuple(process exit code, bool if had a hard timeout)
"""
logging.info(
'run_command(%s, %s, %s, %s, %s, %s)',
command, cwd, hard_timeout, grace_period, lower_priority, containment)
exit_code = None
had_hard_timeout = False
with tools.Profiler('RunTest'):
proc = None
had_signal = []
try:
# TODO(maruel): This code is imperfect. It doesn't handle well signals
# during the download phase and there's short windows were things can go
# wrong.
def handler(signum, _frame):
if proc and not had_signal:
logging.info('Received signal %d', signum)
had_signal.append(True)
raise subprocess42.TimeoutExpired(command, None)
proc = subprocess42.Popen(
command, cwd=cwd, env=env, detached=True, close_fds=True,
lower_priority=lower_priority, containment=containment)
with subprocess42.set_signal_handler(subprocess42.STOP_SIGNALS, handler):
try:
exit_code = proc.wait(hard_timeout or None)
except subprocess42.TimeoutExpired:
if not had_signal:
logging.warning('Hard timeout')
had_hard_timeout = True
logging.warning('Sending SIGTERM')
proc.terminate()
# Ignore signals in grace period. Forcibly give the grace period to the
# child process.
if exit_code is None:
ignore = lambda *_: None
with subprocess42.set_signal_handler(subprocess42.STOP_SIGNALS, ignore):
try:
exit_code = proc.wait(grace_period or None)
except subprocess42.TimeoutExpired:
# Now kill for real. The user can distinguish between the
# following states:
# - signal but process exited within grace period,
# hard_timed_out will be set but the process exit code will be
# script provided.
# - processed exited late, exit code will be -9 on posix.
logging.warning('Grace exhausted; sending SIGKILL')
proc.kill()
logging.info('Waiting for process exit')
exit_code = proc.wait()
except OSError:
# This is not considered to be an internal error. The executable simply
# does not exit.
sys.stderr.write(
'<The executable does not exist or a dependent library is missing>\n'
'<Check for missing .so/.dll in the .isolate or GN file>\n'
'<Command: %s>\n' % command)
if os.environ.get('SWARMING_TASK_ID'):
# Give an additional hint when running as a swarming task.
sys.stderr.write(
'<See the task\'s page for commands to help diagnose this issue '
'by reproducing the task locally>\n')
exit_code = 1
logging.info(
'Command finished with exit code %d (%s)',
exit_code, hex(0xffffffff & exit_code))
return exit_code, had_hard_timeout
def fetch_and_map(isolated_hash, storage, cache, outdir, use_symlinks):
"""Fetches an isolated tree, create the tree and returns (bundle, stats)."""
start = time.time()
bundle = isolateserver.fetch_isolated(
isolated_hash=isolated_hash,
storage=storage,
cache=cache,
outdir=outdir,
use_symlinks=use_symlinks)
return bundle, {
'duration': time.time() - start,
'items_cold': base64.b64encode(large.pack(sorted(cache.added))),
'items_hot': base64.b64encode(
large.pack(sorted(set(cache.used) - set(cache.added)))),
}
def link_outputs_to_outdir(run_dir, out_dir, outputs):
"""Links any named outputs to out_dir so they can be uploaded.
Raises an error if the file already exists in that directory.
"""
if not outputs:
return
isolateserver.create_directories(out_dir, outputs)
for o in outputs:
copy_recursively(os.path.join(run_dir, o), os.path.join(out_dir, o))
def copy_recursively(src, dst):
"""Efficiently copies a file or directory from src_dir to dst_dir.
`item` may be a file, directory, or a symlink to a file or directory.
All symlinks are replaced with their targets, so the resulting
directory structure in dst_dir will never have any symlinks.
To increase speed, copy_recursively hardlinks individual files into the
(newly created) directory structure if possible, unlike Python's
shutil.copytree().
"""
orig_src = src
try:
# Replace symlinks with their final target.
while fs.islink(src):
res = fs.readlink(src)
src = os.path.join(os.path.dirname(src), res)
# TODO(sadafm): Explicitly handle cyclic symlinks.
# Note that fs.isfile (which is a wrapper around os.path.isfile) throws
# an exception if src does not exist. A warning will be logged in that case.
if fs.isfile(src):
file_path.link_file(dst, src, file_path.HARDLINK_WITH_FALLBACK)
return
if not fs.exists(dst):
os.makedirs(dst)
for child in fs.listdir(src):
copy_recursively(os.path.join(src, child), os.path.join(dst, child))
except OSError as e:
if e.errno == errno.ENOENT:
logging.warning('Path %s does not exist or %s is a broken symlink',
src, orig_src)
else:
logging.info("Couldn't collect output file %s: %s", src, e)
def delete_and_upload(storage, out_dir, leak_temp_dir):
"""Deletes the temporary run directory and uploads results back.
Returns:
tuple(outputs_ref, success, stats)
- outputs_ref: a dict referring to the results archived back to the isolated
server, if applicable.
- success: False if something occurred that means that the task must
forcibly be considered a failure, e.g. zombie processes were left
behind.
- stats: uploading stats.
"""
# Upload out_dir and generate a .isolated file out of this directory. It is
# only done if files were written in the directory.
outputs_ref = None
cold = []
hot = []
start = time.time()
if fs.isdir(out_dir) and fs.listdir(out_dir):
with tools.Profiler('ArchiveOutput'):
try:
results, f_cold, f_hot = isolateserver.archive_files_to_storage(
storage, [out_dir], None)
outputs_ref = {
'isolated': results.values()[0],
'isolatedserver': storage.server_ref.url,
'namespace': storage.server_ref.namespace,
}
cold = sorted(i.size for i in f_cold)
hot = sorted(i.size for i in f_hot)
except isolateserver.Aborted:
# This happens when a signal SIGTERM was received while uploading data.
# There is 2 causes:
# - The task was too slow and was about to be killed anyway due to
# exceeding the hard timeout.
# - The amount of data uploaded back is very large and took too much
# time to archive.
sys.stderr.write('Received SIGTERM while uploading')
# Re-raise, so it will be treated as an internal failure.
raise
success = False
try:
if (not leak_temp_dir and fs.isdir(out_dir) and
not file_path.rmtree(out_dir)):
logging.error('Had difficulties removing out_dir %s', out_dir)
else:
success = True
except OSError as e:
# When this happens, it means there's a process error.
logging.exception('Had difficulties removing out_dir %s: %s', out_dir, e)
stats = {
'duration': time.time() - start,
'items_cold': base64.b64encode(large.pack(cold)),
'items_hot': base64.b64encode(large.pack(hot)),
}
return outputs_ref, success, stats
def map_and_run(data, constant_run_path):
"""Runs a command with optional isolated input/output.
Arguments:
- data: TaskData instance.
- constant_run_path: TODO
Returns metadata about the result.
"""
result = {
'duration': None,
'exit_code': None,
'had_hard_timeout': False,
'internal_failure': 'run_isolated did not complete properly',
'stats': {
'isolated': {
#'cipd': {
# 'duration': 0.,
# 'get_client_duration': 0.,
#},
'download': {
#'duration': 0.,
'initial_number_items': len(data.isolate_cache),
'initial_size': data.isolate_cache.total_size,
#'items_cold': '<large.pack()>',
#'items_hot': '<large.pack()>',
},
#'upload': {
# 'duration': 0.,
# 'items_cold': '<large.pack()>',
# 'items_hot': '<large.pack()>',
#},
},
},
#'cipd_pins': {
# 'packages': [
# {'package_name': ..., 'version': ..., 'path': ...},
# ...
# ],
# 'client_package': {'package_name': ..., 'version': ...},
#},
'outputs_ref': None,
'version': 5,
}
if data.root_dir:
file_path.ensure_tree(data.root_dir, 0o700)
elif data.isolate_cache.cache_dir:
data = data._replace(
root_dir=os.path.dirname(data.isolate_cache.cache_dir))
# See comment for these constants.
# If root_dir is not specified, it is not constant.
# TODO(maruel): This is not obvious. Change this to become an error once we
# make the constant_run_path an exposed flag.
if constant_run_path and data.root_dir:
run_dir = os.path.join(data.root_dir, ISOLATED_RUN_DIR)
if os.path.isdir(run_dir):
file_path.rmtree(run_dir)
os.mkdir(run_dir, 0o700)
else:
run_dir = make_temp_dir(ISOLATED_RUN_DIR, data.root_dir)
# storage should be normally set but don't crash if it is not. This can happen
# as Swarming task can run without an isolate server.
out_dir = make_temp_dir(
ISOLATED_OUT_DIR, data.root_dir) if data.storage else None
tmp_dir = make_temp_dir(ISOLATED_TMP_DIR, data.root_dir)
cwd = run_dir
if data.relative_cwd:
cwd = os.path.normpath(os.path.join(cwd, data.relative_cwd))
command = data.command
try:
with data.install_packages_fn(run_dir) as cipd_info:
if cipd_info:
result['stats']['cipd'] = cipd_info.stats
result['cipd_pins'] = cipd_info.pins
if data.isolated_hash:
isolated_stats = result['stats'].setdefault('isolated', {})
bundle, stats = fetch_and_map(
isolated_hash=data.isolated_hash,
storage=data.storage,
cache=data.isolate_cache,
outdir=run_dir,
use_symlinks=data.use_symlinks)
isolated_stats['download'].update(stats)
change_tree_read_only(run_dir, bundle.read_only)
# Inject the command
if not command and bundle.command:
command = bundle.command + data.extra_args
# Only set the relative directory if the isolated file specified a
# command, and no raw command was specified.
if bundle.relative_cwd:
cwd = os.path.normpath(os.path.join(cwd, bundle.relative_cwd))
if not command:
# Handle this as a task failure, not an internal failure.
sys.stderr.write(
'<No command was specified!>\n'
'<Please secify a command when triggering your Swarming task>\n')
result['exit_code'] = 1
return result
if not cwd.startswith(run_dir):
# Handle this as a task failure, not an internal failure. This is a
# 'last chance' way to gate against directory escape.
sys.stderr.write('<Relative CWD is outside of run directory!>\n')
result['exit_code'] = 1
return result
if not os.path.isdir(cwd):
# Accepts relative_cwd that does not exist.
os.makedirs(cwd, 0o700)
# If we have an explicit list of files to return, make sure their
# directories exist now.
if data.storage and data.outputs:
isolateserver.create_directories(run_dir, data.outputs)
with data.install_named_caches(run_dir):
sys.stdout.flush()
start = time.time()
try:
# Need to switch the default account before 'get_command_env' call,
# so it can grab correct value of LUCI_CONTEXT env var.
with set_luci_context_account(data.switch_to_account, tmp_dir):
env = get_command_env(
tmp_dir, cipd_info, run_dir, data.env, data.env_prefix, out_dir,
data.bot_file)
command = tools.fix_python_cmd(command, env)
command = process_command(command, out_dir, data.bot_file)
file_path.ensure_command_has_abs_path(command, cwd)
result['exit_code'], result['had_hard_timeout'] = run_command(
command, cwd, env, data.hard_timeout, data.grace_period,
data.lower_priority, data.containment)
finally:
result['duration'] = max(time.time() - start, 0)
# We successfully ran the command, set internal_failure back to
# None (even if the command failed, it's not an internal error).
result['internal_failure'] = None
except Exception as e:
# An internal error occurred. Report accordingly so the swarming task will
# be retried automatically.
logging.exception('internal failure: %s', e)
result['internal_failure'] = str(e)
on_error.report(None)
# Clean up
finally:
try:
# Try to link files to the output directory, if specified.
if out_dir:
link_outputs_to_outdir(run_dir, out_dir, data.outputs)
success = False
if data.leak_temp_dir:
success = True
logging.warning(
'Deliberately leaking %s for later examination', run_dir)
else:
# On Windows rmtree(run_dir) call above has a synchronization effect: it
# finishes only when all task child processes terminate (since a running
# process locks *.exe file). Examine out_dir only after that call
# completes (since child processes may write to out_dir too and we need
# to wait for them to finish).
if fs.isdir(run_dir):
try:
success = file_path.rmtree(run_dir)
except OSError as e:
logging.error('rmtree(%r) failed: %s', run_dir, e)
success = False
if not success:
sys.stderr.write(OUTLIVING_ZOMBIE_MSG % ('run', data.grace_period))
if result['exit_code'] == 0:
result['exit_code'] = 1
if fs.isdir(tmp_dir):
try:
success = file_path.rmtree(tmp_dir)
except OSError as e:
logging.error('rmtree(%r) failed: %s', tmp_dir, e)
success = False
if not success:
sys.stderr.write(OUTLIVING_ZOMBIE_MSG % ('temp', data.grace_period))
if result['exit_code'] == 0:
result['exit_code'] = 1
# This deletes out_dir if leak_temp_dir is not set.
if out_dir:
isolated_stats = result['stats'].setdefault('isolated', {})
result['outputs_ref'], success, isolated_stats['upload'] = (
delete_and_upload(data.storage, out_dir, data.leak_temp_dir))
if not success and result['exit_code'] == 0:
result['exit_code'] = 1
except Exception as e:
# Swallow any exception in the main finally clause.
if out_dir:
logging.exception('Leaking out_dir %s: %s', out_dir, e)
result['internal_failure'] = str(e)
return result
def run_tha_test(data, result_json):
"""Runs an executable and records execution metadata.
If isolated_hash is specified, downloads the dependencies in the cache,
hardlinks them into a temporary directory and runs the command specified in
the .isolated.
A temporary directory is created to hold the output files. The content inside
this directory will be uploaded back to |storage| packaged as a .isolated
file.
Arguments:
- data: TaskData instance.
- result_json: File path to dump result metadata into. If set, the process
exit code is always 0 unless an internal error occurred.
Returns:
Process exit code that should be used.
"""
if result_json:
# Write a json output file right away in case we get killed.
result = {
'exit_code': None,
'had_hard_timeout': False,
'internal_failure': 'Was terminated before completion',
'outputs_ref': None,
'version': 5,
}
tools.write_json(result_json, result, dense=True)
# run_isolated exit code. Depends on if result_json is used or not.
result = map_and_run(data, True)
logging.info('Result:\n%s', tools.format_json(result, dense=True))
if result_json:
# We've found tests to delete 'work' when quitting, causing an exception
# here. Try to recreate the directory if necessary.
file_path.ensure_tree(os.path.dirname(result_json))
tools.write_json(result_json, result, dense=True)
# Only return 1 if there was an internal error.
return int(bool(result['internal_failure']))
# Marshall into old-style inline output.
if result['outputs_ref']:
# pylint: disable=unsubscriptable-object
data = {
'hash': result['outputs_ref']['isolated'],
'namespace': result['outputs_ref']['namespace'],
'storage': result['outputs_ref']['isolatedserver'],
}
sys.stdout.flush()
print(
'[run_isolated_out_hack]%s[/run_isolated_out_hack]' %
tools.format_json(data, dense=True))
sys.stdout.flush()
return result['exit_code'] or int(bool(result['internal_failure']))
# Yielded by 'install_client_and_packages'.
CipdInfo = collections.namedtuple('CipdInfo', [
'client', # cipd.CipdClient object
'cache_dir', # absolute path to bot-global cipd tag and instance cache
'stats', # dict with stats to return to the server
'pins', # dict with installed cipd pins to return to the server
])
@contextlib.contextmanager
def noop_install_packages(_run_dir):
"""Placeholder for 'install_client_and_packages' if cipd is disabled."""
yield None
def _install_packages(run_dir, cipd_cache_dir, client, packages, timeout):
"""Calls 'cipd ensure' for packages.
Args:
run_dir (str): root of installation.
cipd_cache_dir (str): the directory to use for the cipd package cache.
client (CipdClient): the cipd client to use
packages: packages to install, list [(path, package_name, version), ...].
timeout: max duration in seconds that this function can take.
Returns: list of pinned packages. Looks like [
{
'path': 'subdirectory',
'package_name': 'resolved/package/name',
'version': 'deadbeef...',
},
...
]
"""
package_pins = [None]*len(packages)
def insert_pin(path, name, version, idx):
package_pins[idx] = {
'package_name': name,
# swarming deals with 'root' as '.'
'path': path or '.',
'version': version,
}
by_path = collections.defaultdict(list)
for i, (path, name, version) in enumerate(packages):
# cipd deals with 'root' as ''
if path == '.':
path = ''
by_path[path].append((name, version, i))
pins = client.ensure(
run_dir,
{
subdir: [(name, vers) for name, vers, _ in pkgs]
for subdir, pkgs in by_path.iteritems()
},
cache_dir=cipd_cache_dir,
timeout=timeout,
)
for subdir, pin_list in sorted(pins.iteritems()):
this_subdir = by_path[subdir]
for i, (name, version) in enumerate(pin_list):
insert_pin(subdir, name, version, this_subdir[i][2])
assert None not in package_pins, (packages, pins, package_pins)
return package_pins
@contextlib.contextmanager
def install_client_and_packages(
run_dir, packages, service_url, client_package_name,
client_version, cache_dir, timeout=None):
"""Bootstraps CIPD client and installs CIPD packages.
Yields CipdClient, stats, client info and pins (as single CipdInfo object).
Pins and the CIPD client info are in the form of:
[
{
"path": path, "package_name": package_name, "version": version,
},
...
]
(the CIPD client info is a single dictionary instead of a list)
such that they correspond 1:1 to all input package arguments from the command
line. These dictionaries make their all the way back to swarming, where they
become the arguments of CipdPackage.
If 'packages' list is empty, will bootstrap CIPD client, but won't install
any packages.
The bootstrapped client (regardless whether 'packages' list is empty or not),
will be made available to the task via $PATH.
Args:
run_dir (str): root of installation.
packages: packages to install, list [(path, package_name, version), ...].
service_url (str): CIPD server url, e.g.
"https://chrome-infra-packages.appspot.com."
client_package_name (str): CIPD package name of CIPD client.
client_version (str): Version of CIPD client.
cache_dir (str): where to keep cache of cipd clients, packages and tags.
timeout: max duration in seconds that this function can take.
"""
assert cache_dir
timeoutfn = tools.sliding_timeout(timeout)
start = time.time()
cache_dir = os.path.abspath(cache_dir)
cipd_cache_dir = os.path.join(cache_dir, 'cache') # tag and instance caches
run_dir = os.path.abspath(run_dir)
packages = packages or []
get_client_start = time.time()
client_manager = cipd.get_client(
service_url, client_package_name, client_version, cache_dir,
timeout=timeoutfn())
with client_manager as client:
get_client_duration = time.time() - get_client_start
package_pins = []
if packages:
package_pins = _install_packages(
run_dir, cipd_cache_dir, client, packages, timeoutfn())
file_path.make_tree_files_read_only(run_dir)
total_duration = time.time() - start
logging.info(
'Installing CIPD client and packages took %d seconds', total_duration)
yield CipdInfo(
client=client,
cache_dir=cipd_cache_dir,
stats={
'duration': total_duration,
'get_client_duration': get_client_duration,
},
pins={
'client_package': {
'package_name': client.package_name,
'version': client.instance_id,
},
'packages': package_pins,
})
def create_option_parser():
parser = logging_utils.OptionParserWithLogging(
usage='%prog <options> [command to run or extra args]',
version=__version__,
log_file=RUN_ISOLATED_LOG_FILE)
parser.add_option(
'--clean', action='store_true',
help='Cleans the cache, trimming it necessary and remove corrupted items '
'and returns without executing anything; use with -v to know what '
'was done')
parser.add_option(
'--use-symlinks', action='store_true',
help='Use symlinks instead of hardlinks')
parser.add_option(
'--json',
help='dump output metadata to json file. When used, run_isolated returns '
'non-zero only on internal failure')
parser.add_option(
'--hard-timeout', type='float', help='Enforce hard timeout in execution')
parser.add_option(
'--grace-period', type='float',
help='Grace period between SIGTERM and SIGKILL')
parser.add_option(
'--raw-cmd', action='store_true',
help='Ignore the isolated command, use the one supplied at the command '
'line')
parser.add_option(
'--relative-cwd',
help='Ignore the isolated \'relative_cwd\' and use this one instead; '
'requires --raw-cmd')
parser.add_option(
'--env', default=[], action='append',
help='Environment variables to set for the child process')
parser.add_option(
'--env-prefix', default=[], action='append',
help='Specify a VAR=./path/fragment to put in the environment variable '
'before executing the command. The path fragment must be relative '
'to the isolated run directory, and must not contain a `..` token. '
'The path will be made absolute and prepended to the indicated '
'$VAR using the OS\'s path separator. Multiple items for the same '
'$VAR will be prepended in order.')
parser.add_option(
'--bot-file',
help='Path to a file describing the state of the host. The content is '
'defined by on_before_task() in bot_config.')
parser.add_option(
'--switch-to-account',
help='If given, switches LUCI_CONTEXT to given logical service account '
'(e.g. "task" or "system") before launching the isolated process.')
parser.add_option(
'--output', action='append',
help='Specifies an output to return. If no outputs are specified, all '
'files located in $(ISOLATED_OUTDIR) will be returned; '
'otherwise, outputs in both $(ISOLATED_OUTDIR) and those '
'specified by --output option (there can be multiple) will be '
'returned. Note that if a file in OUT_DIR has the same path '
'as an --output option, the --output version will be returned.')
parser.add_option(
'-a', '--argsfile',
# This is actually handled in parse_args; it's included here purely so it
# can make it into the help text.
help='Specify a file containing a JSON array of arguments to this '
'script. If --argsfile is provided, no other argument may be '
'provided on the command line.')
group = optparse.OptionGroup(parser, 'Data source')
group.add_option(
'-s', '--isolated',
help='Hash of the .isolated to grab from the isolate server.')
isolateserver.add_isolate_server_options(group)
parser.add_option_group(group)
isolateserver.add_cache_options(parser)
cipd.add_cipd_options(parser)
group = optparse.OptionGroup(parser, 'Named caches')
group.add_option(
'--named-cache',
dest='named_caches',
action='append',
nargs=3,
default=[],
help='A named cache to request. Accepts 3 arguments: name, path, hint. '
'name identifies the cache, must match regex [a-z0-9_]{1,4096}. '
'path is a path relative to the run dir where the cache directory '
'must be put to. '
'This option can be specified more than once.')
group.add_option(
'--named-cache-root', default='named_caches',
help='Cache root directory. Default=%default')
parser.add_option_group(group)
group = optparse.OptionGroup(parser, 'Process containment')
parser.add_option(
'--lower-priority', action='store_true',
help='Lowers the child process priority')
parser.add_option(
'--containment-type', choices=('NONE', 'AUTO', 'JOB_OBJECT'),
default='NONE',
help='Type of container to use')
parser.add_option(
'--limit-processes', type='int', default=0,
help='Maximum number of active processes in the containment')
parser.add_option(
'--limit-total-committed-memory', type='int', default=0,
help='Maximum sum of committed memory in the containment')
parser.add_option_group(group)
group = optparse.OptionGroup(parser, 'Debugging')
group.add_option(
'--leak-temp-dir',
action='store_true',
help='Deliberately leak isolate\'s temp dir for later examination. '
'Default: %default')
group.add_option(
'--root-dir', help='Use a directory instead of a random one')
parser.add_option_group(group)
auth.add_auth_options(parser)
parser.set_defaults(cache='cache', cipd_cache='cipd_cache')
return parser
def process_named_cache_options(parser, options, time_fn=None):
"""Validates named cache options and returns a CacheManager."""
if options.named_caches and not options.named_cache_root:
parser.error('--named-cache is specified, but --named-cache-root is empty')
for name, path, hint in options.named_caches:
if not CACHE_NAME_RE.match(name):
parser.error(
'cache name %r does not match %r' % (name, CACHE_NAME_RE.pattern))
if not path:
parser.error('cache path cannot be empty')
try:
long(hint)
except ValueError:
parser.error('cache hint must be a number')
if options.named_cache_root:
# Make these configurable later if there is use case but for now it's fairly
# safe values.
# In practice, a fair chunk of bots are already recycled on a daily schedule
# so this code doesn't have any effect to them, unless they are preloaded
# with a really old cache.
policies = local_caching.CachePolicies(
# 1TiB.
max_cache_size=1024*1024*1024*1024,
min_free_space=options.min_free_space,
max_items=50,
max_age_secs=MAX_AGE_SECS)
root_dir = unicode(os.path.abspath(options.named_cache_root))
return local_caching.NamedCache(root_dir, policies, time_fn=time_fn)
return None
def parse_args(args):
# Create a fake mini-parser just to get out the "-a" command. Note that
# it's not documented here; instead, it's documented in create_option_parser
# even though that parser will never actually get to parse it. This is
# because --argsfile is exclusive with all other options and arguments.
file_argparse = argparse.ArgumentParser(add_help=False)
file_argparse.add_argument('-a', '--argsfile')
(file_args, nonfile_args) = file_argparse.parse_known_args(args)
if file_args.argsfile:
if nonfile_args:
file_argparse.error('Can\'t specify --argsfile with'
'any other arguments (%s)' % nonfile_args)
try:
with open(file_args.argsfile, 'r') as f:
args = json.load(f)
except (IOError, OSError, ValueError) as e:
# We don't need to error out here - "args" is now empty,
# so the call below to parser.parse_args(args) will fail
# and print the full help text.
print >> sys.stderr, 'Couldn\'t read arguments: %s' % e
# Even if we failed to read the args, just call the normal parser now since it
# will print the correct help message.
parser = create_option_parser()
options, args = parser.parse_args(args)
return (parser, options, args)
def _calc_named_cache_hint(named_cache, named_caches):
"""Returns the expected size of the missing named caches."""
present = named_cache.available
size = 0
for name, _, hint in named_caches:
if name not in present:
hint = long(hint)
if hint > 0:
size += hint
return size
def main(args):
# Warning: when --argsfile is used, the strings are unicode instances, when
# parsed normally, the strings are str instances.
(parser, options, args) = parse_args(args)
if not file_path.enable_symlink():
logging.warning('Symlink support is not enabled')
named_cache = process_named_cache_options(parser, options)
# hint is 0 if there's no named cache.
hint = _calc_named_cache_hint(named_cache, options.named_caches)
if hint:
# Increase the --min-free-space value by the hint, and recreate the
# NamedCache instance so it gets the updated CachePolicy.
options.min_free_space += hint
named_cache = process_named_cache_options(parser, options)
# TODO(maruel): CIPD caches should be defined at an higher level here too, so
# they can be cleaned the same way.
isolate_cache = isolateserver.process_cache_options(options, trim=False)
caches = []
if isolate_cache:
caches.append(isolate_cache)
if named_cache:
caches.append(named_cache)
root = caches[0].cache_dir if caches else unicode(os.getcwd())
if options.clean:
if options.isolated:
parser.error('Can\'t use --isolated with --clean.')
if options.isolate_server:
parser.error('Can\'t use --isolate-server with --clean.')
if options.json:
parser.error('Can\'t use --json with --clean.')
if options.named_caches:
parser.error('Can\t use --named-cache with --clean.')
# Trim first, then clean.
local_caching.trim_caches(
caches,
root,
min_free_space=options.min_free_space,
max_age_secs=MAX_AGE_SECS)
for c in caches:
c.cleanup()
return 0
# Trim must still be done for the following case:
# - named-cache was used
# - some entries, with a large hint, where missing
# - --min-free-space was increased accordingly, thus trimming is needed
# Otherwise, this will have no effect, as bot_main calls run_isolated with
# --clean after each task.
if hint:
logging.info('Additional trimming of %d bytes', hint)
local_caching.trim_caches(
caches,
root,
min_free_space=options.min_free_space,
max_age_secs=MAX_AGE_SECS)
if not options.isolated and not args:
parser.error('--isolated or command to run is required.')
auth.process_auth_options(parser, options)
isolateserver.process_isolate_server_options(
parser, options, True, False)
if not options.isolate_server:
if options.isolated:
parser.error('--isolated requires --isolate-server')
if ISOLATED_OUTDIR_PARAMETER in args:
parser.error(
'%s in args requires --isolate-server' % ISOLATED_OUTDIR_PARAMETER)
if options.root_dir:
options.root_dir = unicode(os.path.abspath(options.root_dir))
if options.json:
options.json = unicode(os.path.abspath(options.json))
if any('=' not in i for i in options.env):
parser.error(
'--env required key=value form. value can be skipped to delete '
'the variable')
options.env = dict(i.split('=', 1) for i in options.env)
prefixes = {}
cwd = os.path.realpath(os.getcwd())
for item in options.env_prefix:
if '=' not in item:
parser.error(
'--env-prefix %r is malformed, must be in the form `VAR=./path`'
% item)
key, opath = item.split('=', 1)
if os.path.isabs(opath):
parser.error('--env-prefix %r path is bad, must be relative.' % opath)
opath = os.path.normpath(opath)
if not os.path.realpath(os.path.join(cwd, opath)).startswith(cwd):
parser.error(
'--env-prefix %r path is bad, must be relative and not contain `..`.'
% opath)
prefixes.setdefault(key, []).append(opath)
options.env_prefix = prefixes
cipd.validate_cipd_options(parser, options)
install_packages_fn = noop_install_packages
if options.cipd_enabled:
install_packages_fn = lambda run_dir: install_client_and_packages(
run_dir, cipd.parse_package_args(options.cipd_packages),
options.cipd_server, options.cipd_client_package,
options.cipd_client_version, cache_dir=options.cipd_cache)
@contextlib.contextmanager
def install_named_caches(run_dir):
# WARNING: this function depends on "options" variable defined in the outer
# function.
assert unicode(run_dir), repr(run_dir)
assert os.path.isabs(run_dir), run_dir
named_caches = [
(os.path.join(run_dir, unicode(relpath)), name)
for name, relpath, _ in options.named_caches
]
for path, name in named_caches:
named_cache.install(path, name)
try:
yield
finally:
# Uninstall each named cache, returning it to the cache pool. If an
# uninstall fails for a given cache, it will remain in the task's
# temporary space, get cleaned up by the Swarming bot, and be lost.
#
# If the Swarming bot cannot clean up the cache, it will handle it like
# any other bot file that could not be removed.
for path, name in reversed(named_caches):
try:
# uninstall() doesn't trim but does call save() implicitly. Trimming
# *must* be done manually via periodic 'run_isolated.py --clean'.
named_cache.uninstall(path, name)
except local_caching.NamedCacheError:
logging.exception('Error while removing named cache %r at %r. '
'The cache will be lost.', path, name)
extra_args = []
command = []
if options.raw_cmd:
command = args
if options.relative_cwd:
a = os.path.normpath(os.path.abspath(options.relative_cwd))
if not a.startswith(os.getcwd()):
parser.error(
'--relative-cwd must not try to escape the working directory')
else:
if options.relative_cwd:
parser.error('--relative-cwd requires --raw-cmd')
extra_args = args
containment_type = subprocess42.Containment.NONE
if options.containment_type == 'AUTO':
containment_type = subprocess42.Containment.AUTO
if options.containment_type == 'JOB_OBJECT':
containment_type = subprocess42.Containment.JOB_OBJECT
containment = subprocess42.Containment(
containment_type=containment_type,
limit_processes=options.limit_processes,
limit_total_committed_memory=options.limit_total_committed_memory)
data = TaskData(
command=command,
relative_cwd=options.relative_cwd,
extra_args=extra_args,
isolated_hash=options.isolated,
storage=None,
isolate_cache=isolate_cache,
outputs=options.output,
install_named_caches=install_named_caches,
leak_temp_dir=options.leak_temp_dir,
root_dir=_to_unicode(options.root_dir),
hard_timeout=options.hard_timeout,
grace_period=options.grace_period,
bot_file=options.bot_file,
switch_to_account=options.switch_to_account,
install_packages_fn=install_packages_fn,
use_symlinks=bool(options.use_symlinks),
env=options.env,
env_prefix=options.env_prefix,
lower_priority=bool(options.lower_priority),
containment=containment)
try:
if options.isolate_server:
server_ref = isolate_storage.ServerRef(
options.isolate_server, options.namespace)
storage = isolateserver.get_storage(server_ref)
with storage:
data = data._replace(storage=storage)
# Hashing schemes used by |storage| and |isolate_cache| MUST match.
assert storage.server_ref.hash_algo == server_ref.hash_algo
return run_tha_test(data, options.json)
return run_tha_test(data, options.json)
except (
cipd.Error,
local_caching.NamedCacheError,
local_caching.NoMoreSpace) as ex:
print >> sys.stderr, ex.message
return 1
if __name__ == '__main__':
subprocess42.inhibit_os_error_reporting()
# Ensure that we are always running with the correct encoding.
fix_encoding.fix_encoding()
sys.exit(main(sys.argv[1:]))