#!/usr/bin/env python
# Copyright 2012 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Runs a command with optional isolated input/output.
run_isolated takes care of setting up a temporary environment, running a
command, and tearing it down.
It handles downloading and uploading isolated files, mapping CIPD packages and
reusing stateful named caches.
The isolated files, CIPD packages and named caches are kept as a global LRU
cache.
Any ${EXECUTABLE_SUFFIX} on the command line or in the environment variables
passed with the --env option will be replaced with ".exe" on Windows and ""
on other platforms.
Any ${ISOLATED_OUTDIR} on the command line or in the environment variables
passed with the --env option will be replaced by the location of a temporary directory
upon execution of the command specified in the .isolated file. All content
written to this directory will be uploaded upon termination and the .isolated
file describing this directory will be printed to stdout.
Any ${SWARMING_BOT_FILE} on the command line or in the environment variables
passed with the --env option will be replaced by the value of the --bot-file parameter.
This file is used by a swarming bot to communicate state of the host to tasks.
It is written to by the swarming bot's on_before_task() hook in the swarming
server's custom bot_config.py.
Any ${SWARMING_TASK_ID} on the command line will be replaced by the value of
the SWARMING_TASK_ID environment variable.
See
https://chromium.googlesource.com/infra/luci/luci-py.git/+/master/appengine/swarming/doc/Magic-Values.md
for all the variables.
See
https://chromium.googlesource.com/infra/luci/luci-py.git/+/master/appengine/swarming/swarming_bot/config/bot_config.py
for more information about bot_config.py.
"""
from __future__ import print_function
__version__ = '1.0.1'
import argparse
import base64
import collections
import contextlib
import distutils.util
import errno
import json
import logging
import optparse
import os
import platform
import re
import shutil
import sys
import tempfile
import time
from utils import tools
tools.force_local_third_party()
# third_party/
from depot_tools import fix_encoding
# pylint: disable=ungrouped-imports
import auth
import cipd
import errors
import local_caching
from libs import luci_context
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import net
from utils import on_error
from utils import subprocess42
# Magic variables that can be found in the isolate task command line.
ISOLATED_OUTDIR_PARAMETER = '${ISOLATED_OUTDIR}'
EXECUTABLE_SUFFIX_PARAMETER = '${EXECUTABLE_SUFFIX}'
SWARMING_BOT_FILE_PARAMETER = '${SWARMING_BOT_FILE}'
SWARMING_TASK_ID_PARAMETER = '${SWARMING_TASK_ID}'
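# Illustrative example (hypothetical command, not from this file): a task
# command such as
#   ['./out/test_runner${EXECUTABLE_SUFFIX}',
#    '--results=${ISOLATED_OUTDIR}/results.json']
# is rewritten by process_command() below; on Windows with an output dir of
# c:\work\io it becomes
#   ['./out/test_runner.exe', '--results=c:\work\io\results.json'].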
# The name of the log file to use.
RUN_ISOLATED_LOG_FILE = 'run_isolated.log'
# Use short names for temporary directories. This is driven by Windows, which
# imposes a relatively short maximum path length of 260 characters, often
# referred to as MAX_PATH. It is relatively easy to create files with longer
# path length. A use case is with recursive dependency trees like npm packages.
#
# It is recommended to start the script with a `root_dir` as short as
# possible.
# - ir stands for isolated_run
# - io stands for isolated_out
# - it stands for isolated_tmp
ISOLATED_RUN_DIR = 'ir'
ISOLATED_OUT_DIR = 'io'
ISOLATED_TMP_DIR = 'it'
_CAS_CLIENT_DIR = 'cc'
# TODO(tikuta): take these parameters from luci-config?
_CAS_PACKAGE = 'infra/tools/luci/cas/${platform}'
_LUCI_GO_REVISION = 'git_revision:623f8d17a069eaea6d0fca13147888284ec76ff1'
# Keep synced with task_request.py
CACHE_NAME_RE = re.compile(r'^[a-z0-9_]{1,4096}$')
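# For example, 'git_chromium' and 'builder_0' match CACHE_NAME_RE above,
# while 'Git-Cache' does not (uppercase letters and '-' are rejected).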
_FREE_SPACE_BUFFER_FOR_CIPD_PACKAGES = 2 * 1024 * 1024 * 1024
OUTLIVING_ZOMBIE_MSG = """\
*** Swarming tried multiple times to delete the %s directory and failed ***
*** Hard failing the task ***
Swarming detected that your testing script ran an executable, which may have
started a child executable, and the main script returned early, leaving the
child executables running unattended.
You don't want to leave child processes outliving the task on the Swarming
bot, do you? The Swarming bot doesn't.
How to fix?
- For any process that starts child processes, make sure all child
processes have terminated properly before each parent process exits. This is
especially important in very deep process trees.
- This must be done properly both for a normal successful task and in case of
task failure. Cleanup is very important.
- The Swarming bot sends a SIGTERM in case of timeout.
- You have %s seconds to comply after the signal was sent to the process
before the process is forcibly killed.
- To avoid leaking child processes in case of signals on timeout, you
MUST handle signals in each executable / python script and propagate them to
child processes.
- When your test script (python or binary) receives a signal like SIGTERM
(or CTRL_BREAK_EVENT on Windows), send it to all child processes and wait for
them to terminate before quitting.
See
https://chromium.googlesource.com/infra/luci/luci-py.git/+/master/appengine/swarming/doc/Bot.md#Graceful-termination_aka-the-SIGTERM-and-SIGKILL-dance
for more information.
*** May the SIGKILL force be with you ***
"""
# Currently hardcoded. Eventually could be exposed as a flag once there's value.
# 3 weeks
MAX_AGE_SECS = 21*24*60*60
_CAS_KVS_CACHE_THRESHOLD = 5 * 1024 * 1024 * 1024 # 5 GiB
TaskData = collections.namedtuple(
'TaskData',
[
# List of strings; the command line to use, independent of what was
# specified in the isolated file.
'command',
# Relative directory to start the command in.
'relative_cwd',
# Digest of the input root on RBE-CAS.
'cas_digest',
# Full CAS instance name.
'cas_instance',
# List of paths relative to root_dir to put into the output isolated
# bundle upon task completion (see link_outputs_to_outdir).
'outputs',
# Function (run_dir) => context manager that installs named caches into
# |run_dir|.
'install_named_caches',
# If True, the temporary directory will be deliberately leaked for later
# examination.
'leak_temp_dir',
# Path to the directory to use to create the temporary directory. If not
# specified, a random temporary directory is created.
'root_dir',
# Kills the process if it lasts more than this number of seconds.
'hard_timeout',
# Number of seconds to wait between SIGTERM and SIGKILL.
'grace_period',
# Path to a file with bot state, used in place of ${SWARMING_BOT_FILE}
# task command line argument.
'bot_file',
# Logical account to switch LUCI_CONTEXT into.
'switch_to_account',
# Context manager dir => CipdInfo, see install_client_and_packages.
'install_packages_fn',
# Cache directory for `cas` client.
'cas_cache_dir',
# Parameters passed to `cas` client.
'cas_cache_policies',
# Parameters for kvs file used by `cas` client.
'cas_kvs',
# Environment variables to set.
'env',
# Environment variables to mutate with relative directories.
# Example: {"ENV_KEY": ['relative', 'paths', 'to', 'prepend']}
'env_prefix',
# Lowers the task process priority.
'lower_priority',
# subprocess42.Containment instance. Can be None.
'containment',
# Function to trim caches before installing cipd packages and
# downloading isolated files.
'trim_caches_fn',
])
def make_temp_dir(prefix, root_dir):
"""Returns a new unique temporary directory."""
return tempfile.mkdtemp(prefix=prefix, dir=root_dir)
@contextlib.contextmanager
def set_luci_context_account(account, tmp_dir):
"""Sets LUCI_CONTEXT account to be used by the task.
If 'account' is None or '', does nothing at all. This happens when
run_isolated.py is called without '--switch-to-account' flag. In this case,
if run_isolated.py is running in some LUCI_CONTEXT environment, the task will
just inherit whatever account is already set. This may happen if users invoke
run_isolated.py explicitly from their code.
If the requested account is not defined in the context, switches to
non-authenticated access. This happens for Swarming tasks that don't use
'task' service accounts.
If not using LUCI_CONTEXT-based auth, does nothing.
If already running as requested account, does nothing.
"""
if not account:
# Not actually switching.
yield
return
local_auth = luci_context.read('local_auth')
if not local_auth:
# Not using LUCI_CONTEXT auth at all.
yield
return
# See LUCI_CONTEXT.md for the format of 'local_auth'.
if local_auth.get('default_account_id') == account:
# Already set, no need to switch.
yield
return
available = {a['id'] for a in local_auth.get('accounts') or []}
if account in available:
logging.info('Switching default LUCI_CONTEXT account to %r', account)
local_auth['default_account_id'] = account
else:
logging.warning(
'Requested LUCI_CONTEXT account %r is not available (have only %r), '
'disabling authentication', account, sorted(available))
local_auth.pop('default_account_id', None)
with luci_context.write(_tmpdir=tmp_dir, local_auth=local_auth):
yield
def process_command(command, out_dir, bot_file):
"""Replaces parameters in a command line.
Raises:
ValueError if a parameter is requested in |command| but its value is not
provided.
"""
return [replace_parameters(arg, out_dir, bot_file) for arg in command]
def replace_parameters(arg, out_dir, bot_file):
"""Replaces parameter tokens with appropriate values in a string.
Raises:
ValueError if a parameter is requested in |arg| but its value is not
provided.
"""
arg = arg.replace(EXECUTABLE_SUFFIX_PARAMETER, cipd.EXECUTABLE_SUFFIX)
replace_slash = False
if ISOLATED_OUTDIR_PARAMETER in arg:
if not out_dir:
raise ValueError(
'output directory is requested in command or env var, but not '
'provided; please specify one')
arg = arg.replace(ISOLATED_OUTDIR_PARAMETER, out_dir)
replace_slash = True
if SWARMING_BOT_FILE_PARAMETER in arg:
if bot_file:
arg = arg.replace(SWARMING_BOT_FILE_PARAMETER, bot_file)
replace_slash = True
else:
logging.warning('SWARMING_BOT_FILE_PARAMETER found in command or env '
'var, but no bot_file specified. Leaving parameter '
'unchanged.')
if SWARMING_TASK_ID_PARAMETER in arg:
task_id = os.environ.get('SWARMING_TASK_ID')
if task_id:
arg = arg.replace(SWARMING_TASK_ID_PARAMETER, task_id)
if replace_slash:
# Replace slashes only if parameters are present
# because of arguments like '${ISOLATED_OUTDIR}/foo/bar'
arg = arg.replace('/', os.sep)
return arg
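# Illustrative use of replace_parameters() (hypothetical values): on a POSIX
# bot with out_dir='/b/io' and bot_file=None,
#   replace_parameters('${ISOLATED_OUTDIR}/logs/out.json', '/b/io', None)
# returns '/b/io/logs/out.json'. On Windows the '/' separators in that
# argument would additionally be converted to os.sep because the output
# directory parameter was present.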
def set_temp_dir(env, tmp_dir):
"""Set temp dir to given env var dictionary"""
# pylint: disable=line-too-long
# * python respects $TMPDIR, $TEMP, and $TMP in this order, regardless of
# platform. So $TMPDIR must be set on all platforms.
# https://github.com/python/cpython/blob/2.7/Lib/tempfile.py#L155
env['TMPDIR'] = tmp_dir
if sys.platform == 'win32':
# * chromium's base utils uses GetTempPath().
# https://cs.chromium.org/chromium/src/base/files/file_util_win.cc?q=GetTempPath
# * Go uses GetTempPath().
# * GetTempDir() uses %TMP%, then %TEMP%, then other stuff. So %TMP% must be
# set.
# https://docs.microsoft.com/en-us/windows/desktop/api/fileapi/nf-fileapi-gettemppathw
env['TMP'] = tmp_dir
# https://blogs.msdn.microsoft.com/oldnewthing/20150417-00/?p=44213
env['TEMP'] = tmp_dir
elif sys.platform == 'darwin':
# * Chromium uses a hack on macOS before calling into
# NSTemporaryDirectory().
# https://cs.chromium.org/chromium/src/base/files/file_util_mac.mm?q=GetTempDir
# https://developer.apple.com/documentation/foundation/1409211-nstemporarydirectory
env['MAC_CHROMIUM_TMPDIR'] = tmp_dir
else:
# TMPDIR is specified as the POSIX standard envvar for the temp directory.
# http://pubs.opengroup.org/onlinepubs/9699919799/basedefs/V1_chap08.html
# * mktemp on linux respects $TMPDIR.
# * Chromium respects $TMPDIR on linux.
# https://cs.chromium.org/chromium/src/base/files/file_util_posix.cc?q=GetTempDir
# * Go uses $TMPDIR.
# https://go.googlesource.com/go/+/go1.10.3/src/os/file_unix.go#307
pass
def get_command_env(tmp_dir, cipd_info, run_dir, env, env_prefixes, out_dir,
bot_file):
"""Returns full OS environment to run a command in.
Sets up TEMP, puts directory with cipd binary in front of PATH, exposes
CIPD_CACHE_DIR env var, and installs all env_prefixes.
Args:
tmp_dir: temp directory.
cipd_info: CipdInfo object if the cipd client is used, None if not.
run_dir: The root directory the isolated tree is mapped in.
env: environment variables to use
env_prefixes: {"ENV_KEY": ['cwd', 'relative', 'paths', 'to', 'prepend']}
out_dir: Isolated output directory. Required to be != None if any of the
env vars contain ISOLATED_OUTDIR_PARAMETER.
bot_file: Required to be != None if any of the env vars contain
SWARMING_BOT_FILE_PARAMETER.
"""
out = os.environ.copy()
for k, v in env.items():
if not v:
out.pop(k, None)
else:
out[k] = replace_parameters(v, out_dir, bot_file)
if cipd_info:
bin_dir = os.path.dirname(cipd_info.client.binary_path)
out['PATH'] = '%s%s%s' % (bin_dir, os.pathsep, out['PATH'])
out['CIPD_CACHE_DIR'] = cipd_info.cache_dir
cipd_info_path = os.path.join(tmp_dir, 'cipd_info.json')
with open(cipd_info_path, 'w') as f:
json.dump(cipd_info.pins, f)
out['ISOLATED_RESOLVED_PACKAGE_VERSIONS_FILE'] = cipd_info_path
for key, paths in env_prefixes.items():
assert isinstance(paths, list), paths
paths = [os.path.normpath(os.path.join(run_dir, p)) for p in paths]
cur = out.get(key)
if cur:
paths.append(cur)
out[key] = os.path.pathsep.join(paths)
set_temp_dir(out, tmp_dir)
return out
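# Illustrative example for get_command_env() (hypothetical values): with
# run_dir='/b/ir', env={'LANG': 'C'} and env_prefixes={'PATH': ['cipd_bin']},
# the returned environment contains LANG='C' and
# PATH='/b/ir/cipd_bin' + os.pathsep + <the inherited PATH>, plus TMPDIR
# (and TMP/TEMP on Windows) pointing at tmp_dir as set by set_temp_dir().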
def run_command(
command, cwd, env, hard_timeout, grace_period, lower_priority, containment):
"""Runs the command.
Returns:
tuple(process exit code, bool if had a hard timeout)
"""
logging_utils.user_logs('run_command(%s, %s, %s, %s, %s, %s)', command, cwd,
hard_timeout, grace_period, lower_priority,
containment)
exit_code = None
had_hard_timeout = False
with tools.Profiler('RunTest'):
proc = None
had_signal = []
try:
# TODO(maruel): This code is imperfect. It doesn't handle signals well
# during the download phase and there are short windows where things can go
# wrong.
def handler(signum, _frame):
if proc and not had_signal:
logging.info('Received signal %d', signum)
had_signal.append(True)
raise subprocess42.TimeoutExpired(command, None)
proc = subprocess42.Popen(
command, cwd=cwd, env=env, detached=True, close_fds=True,
lower_priority=lower_priority, containment=containment)
logging.info('Subprocess for command started')
with subprocess42.set_signal_handler(subprocess42.STOP_SIGNALS, handler):
try:
exit_code = proc.wait(hard_timeout or None)
logging.info("finished with exit code %d after hard_timeout %s",
exit_code, hard_timeout)
except subprocess42.TimeoutExpired:
if not had_signal:
logging.warning('Hard timeout')
had_hard_timeout = True
logging.warning('Sending SIGTERM')
proc.terminate()
kill_sent = False
# Ignore signals in grace period. Forcibly give the grace period to the
# child process.
if exit_code is None:
ignore = lambda *_: None
with subprocess42.set_signal_handler(subprocess42.STOP_SIGNALS, ignore):
try:
exit_code = proc.wait(grace_period or None)
logging.info("finished with exit code %d after grace_period %s",
exit_code, grace_period)
except subprocess42.TimeoutExpired:
# Now kill for real. The user can distinguish between the
# following states:
# - signal but the process exited within the grace period:
# had_hard_timeout will be set but the process exit code will be
# the one provided by the script.
# - the process exited late: the exit code will be -9 on posix.
logging.warning('Grace exhausted; sending SIGKILL')
proc.kill()
kill_sent = True
logging.info('Waiting for process exit')
exit_code = proc.wait()
# the process group / job object may be dangling so if we didn't kill
# it already, give it a poke now.
if not kill_sent:
proc.kill()
except OSError as e:
# This is not considered to be an internal error. The executable simply
# does not exist.
sys.stderr.write(
'<The executable does not exist, a dependent library is missing or '
'the command line is too long>\n'
'<Check for missing .so/.dll in the .isolate or GN file or length of '
'command line args>\n'
'<Command: %s>\n'
'<Exception: %s>\n' % (command, e))
if os.environ.get('SWARMING_TASK_ID'):
# Give an additional hint when running as a swarming task.
sys.stderr.write(
'<See the task\'s page for commands to help diagnose this issue '
'by reproducing the task locally>\n')
exit_code = 1
logging.info(
'Command finished with exit code %d (%s)',
exit_code, hex(0xffffffff & exit_code))
return exit_code, had_hard_timeout
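# Illustrative timeline for run_command() (assumed values): with
# hard_timeout=60 and grace_period=30, a hung child receives SIGTERM at
# t=60s; if it is still alive at t=90s it receives SIGKILL, and the function
# returns (exit_code, had_hard_timeout=True), where exit_code is typically -9
# on POSIX when the SIGKILL was needed.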
def _run_go_cmd_and_wait(cmd, tmp_dir):
"""
Runs an external Go command, `isolated` or `cas`, and waits for its completion.
While this is a generic function to launch a subprocess, it has logic that
is specific to Go `isolated` and `cas` for waiting and logging.
Returns:
The subprocess object
"""
cmd_str = ' '.join(cmd)
try:
env = os.environ.copy()
set_temp_dir(env, tmp_dir)
proc = subprocess42.Popen(cmd, env=env)
exceeded_max_timeout = True
check_period_sec = 30
max_checks = 100
# max timeout = max_checks * check_period_sec = 50 minutes
for i in range(max_checks):
# This is to prevent I/O timeout error during setup.
try:
retcode = proc.wait(check_period_sec)
if retcode != 0:
raise subprocess42.CalledProcessError(retcode, cmd=cmd_str)
exceeded_max_timeout = False
break
except subprocess42.TimeoutExpired:
print('still running (after %d seconds)' % ((i + 1) * check_period_sec))
if exceeded_max_timeout:
proc.terminate()
try:
proc.wait(check_period_sec)
except subprocess42.TimeoutExpired:
logging.exception(
"failed to terminate? timeout happened after %d seconds",
check_period_sec)
proc.kill()
proc.wait()
# Raise unconditionally, because |proc| was forcefully terminated.
raise ValueError("timedout after %d seconds (cmd=%s)" %
(check_period_sec * max_checks, cmd_str))
return proc
except Exception:
logging.exception('Failed to run Go cmd %s', cmd_str)
raise
def _fetch_and_map(cas_client, digest, instance, output_dir, cache_dir,
policies, kvs_dir, tmp_dir):
"""
Fetches a CAS tree using the cas client, creates the tree, and returns
download stats.
"""
start = time.time()
result_json_handle, result_json_path = tempfile.mkstemp(
prefix='fetch-and-map-result-', suffix='.json')
os.close(result_json_handle)
profile_dir = tempfile.mkdtemp(dir=tmp_dir)
try:
cmd = [
cas_client,
'download',
'-digest',
digest,
# flags for cache.
'-cache-dir',
cache_dir,
'-cache-max-size',
str(policies.max_cache_size),
'-cache-min-free-space',
str(policies.min_free_space),
# flags for output.
'-dir',
output_dir,
'-dump-json',
result_json_path,
'-log-level',
'info',
]
# When RUN_ISOLATED_CAS_ADDRESS is set (e.g. in test mode), use it and
# ignore the CAS instance option.
cas_addr = os.environ.get('RUN_ISOLATED_CAS_ADDRESS')
if cas_addr:
cmd.extend([
'-cas-addr',
cas_addr,
])
else:
cmd.extend([
'-cas-instance',
instance
])
if kvs_dir:
cmd.extend(['-kvs-dir', kvs_dir])
def open_json_and_check(result_json_path, cleanup_dirs):
cas_error = False
result_json = {}
error_digest = digest
try:
with open(result_json_path) as json_file:
result_json = json.load(json_file)
cas_error = result_json.get('result') in ('digest_invalid',
'authentication_error',
'arguments_invalid')
if cas_error and result_json.get('error_details'):
error_digest = result_json['error_details'].get('digest', digest)
except (IOError, ValueError):
logging.error('Failed to read json file: %s', result_json_path)
raise
finally:
if cleanup_dirs:
file_path.rmtree(kvs_dir)
file_path.rmtree(output_dir)
if cas_error:
raise errors.NonRecoverableCasException(result_json['result'],
error_digest, instance)
return result_json
try:
_run_go_cmd_and_wait(cmd, tmp_dir)
except subprocess42.CalledProcessError as ex:
if not kvs_dir:
open_json_and_check(result_json_path, False)
raise
open_json_and_check(result_json_path, True)
logging.exception('Failed to run cas, removing kvs cache dir and retrying.')
on_error.report("Failed to run cas %s" % ex)
_run_go_cmd_and_wait(cmd, tmp_dir)
result_json = open_json_and_check(result_json_path, False)
return {
'duration': time.time() - start,
'items_cold': result_json['items_cold'],
'items_hot': result_json['items_hot'],
}
finally:
fs.remove(result_json_path)
file_path.rmtree(profile_dir)
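# The dict returned by _fetch_and_map() mirrors the JSON dumped by the `cas`
# client; an illustrative (assumed) value is
#   {'duration': 12.3, 'items_cold': ..., 'items_hot': ...}
# where 'items_cold'/'items_hot' are copied verbatim from the '-dump-json'
# output and describe what was downloaded vs. served from the local cache.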
def link_outputs_to_outdir(run_dir, out_dir, outputs):
"""Links any named outputs to out_dir so they can be uploaded.
Raises an error if the file already exists in that directory.
"""
if not outputs:
return
file_path.create_directories(out_dir, outputs)
for o in outputs:
copy_recursively(os.path.join(run_dir, o), os.path.join(out_dir, o))
def copy_recursively(src, dst):
"""Efficiently copies a file or directory from src_dir to dst_dir.
`item` may be a file, directory, or a symlink to a file or directory.
All symlinks are replaced with their targets, so the resulting
directory structure in dst_dir will never have any symlinks.
To increase speed, copy_recursively hardlinks individual files into the
(newly created) directory structure if possible, unlike Python's
shutil.copytree().
"""
orig_src = src
try:
# Replace symlinks with their final target.
while fs.islink(src):
res = fs.readlink(src)
src = os.path.realpath(os.path.join(os.path.dirname(src), res))
# TODO(sadafm): Explicitly handle cyclic symlinks.
if not fs.exists(src):
logging.warning('Path %s does not exist or %s is a broken symlink', src,
orig_src)
return
if fs.isfile(src):
file_path.link_file(dst, src, file_path.HARDLINK_WITH_FALLBACK)
return
if not fs.exists(dst):
os.makedirs(dst)
for child in fs.listdir(src):
copy_recursively(os.path.join(src, child), os.path.join(dst, child))
except OSError as e:
if e.errno == errno.ENOENT:
logging.warning('Path %s does not exist or %s is a broken symlink',
src, orig_src)
else:
logging.info("Couldn't collect output file %s: %s", src, e)
def upload_outdir(cas_client, cas_instance, outdir, tmp_dir):
"""Uploads the results in |outdir|, if there is any.
Returns:
tuple(root_digest, stats)
- root_digest: a digest of the output directory.
- stats: uploading stats.
"""
if not fs.listdir(outdir):
return None, None
digest_file_handle, digest_path = tempfile.mkstemp(prefix='cas-digest',
suffix='.txt')
os.close(digest_file_handle)
stats_json_handle, stats_json_path = tempfile.mkstemp(prefix='upload-stats',
suffix='.json')
os.close(stats_json_handle)
try:
cmd = [
cas_client,
'archive',
'-log-level',
'debug',
'-paths',
# Format: <working directory>:<relative path to dir>
outdir + ':',
# output
'-dump-digest',
digest_path,
'-dump-json',
stats_json_path,
]
# When RUN_ISOLATED_CAS_ADDRESS is set (e.g. in test mode), use it and
# ignore the CAS instance option.
cas_addr = os.environ.get('RUN_ISOLATED_CAS_ADDRESS')
if cas_addr:
cmd.extend([
'-cas-addr',
cas_addr,
])
elif cas_instance:
cmd.extend([
'-cas-instance',
cas_instance
])
else:
return None, None
start = time.time()
_run_go_cmd_and_wait(cmd, tmp_dir)
with open(digest_path) as digest_file:
digest = digest_file.read()
h, s = digest.split('/')
cas_output_root = {
'cas_instance': cas_instance,
'digest': {
'hash': h,
'size_bytes': int(s)
}
}
with open(stats_json_path) as stats_file:
stats = json.load(stats_file)
stats['duration'] = time.time() - start
return cas_output_root, stats
finally:
fs.remove(digest_path)
fs.remove(stats_json_path)
def map_and_run(data, constant_run_path):
"""Runs a command with optional isolated input/output.
Arguments:
- data: TaskData instance.
- constant_run_path: TODO
Returns metadata about the result.
"""
# TODO(tikuta): take stats from state.json in this case too.
download_stats = {
# 'duration': 0.,
# 'initial_number_items': len(data.cas_cache),
# 'initial_size': data.cas_cache.total_size,
# 'items_cold': '<large.pack()>',
# 'items_hot': '<large.pack()>',
}
result = {
'duration': None,
'exit_code': None,
'had_hard_timeout': False,
'internal_failure': 'run_isolated did not complete properly',
'stats': {
'trim_caches': {
'duration': 0,
},
#'cipd': {
# 'duration': 0.,
# 'get_client_duration': 0.,
#},
'isolated': {
'download': download_stats,
#'upload': {
# 'duration': 0.,
# 'items_cold': '<large.pack()>',
# 'items_hot': '<large.pack()>',
#},
},
'named_caches': {
'install': {
'duration': 0,
},
'uninstall': {
'duration': 0,
},
},
'cleanup': {
'duration': 0,
}
},
#'cipd_pins': {
# 'packages': [
# {'package_name': ..., 'version': ..., 'path': ...},
# ...
# ],
# 'client_package': {'package_name': ..., 'version': ...},
#},
'outputs_ref': None,
'cas_output_root': None,
'version': 5,
}
assert os.path.isabs(data.root_dir), ("data.root_dir is not abs path: %s" %
data.root_dir)
file_path.ensure_tree(data.root_dir, 0o700)
# See comment for these constants.
# TODO(maruel): This is not obvious. Change this to become an error once we
# make the constant_run_path an exposed flag.
if constant_run_path and data.root_dir:
run_dir = os.path.join(data.root_dir, ISOLATED_RUN_DIR)
if os.path.isdir(run_dir):
file_path.rmtree(run_dir)
os.mkdir(run_dir, 0o700)
else:
run_dir = make_temp_dir(ISOLATED_RUN_DIR, data.root_dir)
# storage should normally be set, but don't crash if it is not. This can
# happen as a Swarming task can run without an isolate server.
out_dir = make_temp_dir(ISOLATED_OUT_DIR, data.root_dir)
tmp_dir = make_temp_dir(ISOLATED_TMP_DIR, data.root_dir)
cwd = run_dir
if data.relative_cwd:
cwd = os.path.normpath(os.path.join(cwd, data.relative_cwd))
command = data.command
cas_client_dir = make_temp_dir(_CAS_CLIENT_DIR, data.root_dir)
cas_client = os.path.join(cas_client_dir, 'cas' + cipd.EXECUTABLE_SUFFIX)
data.trim_caches_fn(result['stats']['trim_caches'])
try:
with data.install_packages_fn(run_dir, cas_client_dir) as cipd_info:
if cipd_info:
result['stats']['cipd'] = cipd_info.stats
result['cipd_pins'] = cipd_info.pins
isolated_stats = result['stats'].setdefault('isolated', {})
if data.cas_digest:
stats = _fetch_and_map(
cas_client=cas_client,
digest=data.cas_digest,
instance=data.cas_instance,
output_dir=run_dir,
cache_dir=data.cas_cache_dir,
policies=data.cas_cache_policies,
kvs_dir=data.cas_kvs,
tmp_dir=tmp_dir)
isolated_stats['download'].update(stats)
logging_utils.user_logs('Fetched CAS inputs')
if not command:
# Handle this as a task failure, not an internal failure.
sys.stderr.write(
'<No command was specified!>\n'
'<Please specify a command when triggering your Swarming task>\n')
result['exit_code'] = 1
return result
if not cwd.startswith(run_dir):
# Handle this as a task failure, not an internal failure. This is a
# 'last chance' way to gate against directory escape.
sys.stderr.write('<Relative CWD is outside of run directory!>\n')
result['exit_code'] = 1
return result
if not os.path.isdir(cwd):
# Accepts relative_cwd that does not exist.
os.makedirs(cwd, 0o700)
# If we have an explicit list of files to return, make sure their
# directories exist now.
if data.outputs:
file_path.create_directories(run_dir, data.outputs)
with data.install_named_caches(run_dir, result['stats']['named_caches']):
logging_utils.user_logs('Installed named caches')
sys.stdout.flush()
start = time.time()
try:
# Need to switch the default account before 'get_command_env' call,
# so it can grab correct value of LUCI_CONTEXT env var.
with set_luci_context_account(data.switch_to_account, tmp_dir):
env = get_command_env(
tmp_dir, cipd_info, run_dir, data.env, data.env_prefix, out_dir,
data.bot_file)
command = tools.find_executable(command, env)
command = process_command(command, out_dir, data.bot_file)
file_path.ensure_command_has_abs_path(command, cwd)
result['exit_code'], result['had_hard_timeout'] = run_command(
command, cwd, env, data.hard_timeout, data.grace_period,
data.lower_priority, data.containment)
finally:
result['duration'] = max(time.time() - start, 0)
# Try to link files to the output directory, if specified.
link_outputs_to_outdir(run_dir, out_dir, data.outputs)
isolated_stats = result['stats'].setdefault('isolated', {})
result['cas_output_root'], upload_stats = upload_outdir(
cas_client, data.cas_instance, out_dir, tmp_dir)
if upload_stats:
isolated_stats['upload'] = upload_stats
# We successfully ran the command, set internal_failure back to
# None (even if the command failed, it's not an internal error).
result['internal_failure'] = None
except errors.NonRecoverableCasException as e:
# We could not fetch the CAS inputs. The swarming task should not
# be retried automatically.
result['missing_cas'] = [e.to_dict()]
logging.exception('internal failure: %s', e)
result['internal_failure'] = str(e)
on_error.report(None)
except errors.NonRecoverableCipdException as e:
# We could not find the CIPD package. The swarming task should not
# be retried automatically
result['missing_cipd'] = [e.to_dict()]
logging.exception('internal failure: %s', e)
result['internal_failure'] = str(e)
on_error.report(None)
except Exception as e:
# An internal error occurred. Report accordingly so the swarming task will
# be retried automatically.
logging.exception('internal failure: %s', e)
result['internal_failure'] = str(e)
on_error.report(None)
# Clean up
finally:
try:
cleanup_start = time.time()
success = True
if data.leak_temp_dir:
success = True
logging.warning(
'Deliberately leaking %s for later examination', run_dir)
else:
# On Windows, the rmtree(run_dir) call below has a synchronization effect: it
# finishes only when all task child processes terminate (since a running
# process locks its *.exe file). Examine out_dir only after that call
# completes (since child processes may write to out_dir too and we need
# to wait for them to finish).
dirs_to_remove = [run_dir, tmp_dir, cas_client_dir]
if out_dir:
dirs_to_remove.append(out_dir)
for directory in dirs_to_remove:
if not fs.isdir(directory):
continue
start = time.time()
try:
file_path.rmtree(directory)
except OSError as e:
logging.error('rmtree(%r) failed: %s', directory, e)
success = False
finally:
logging.info('Cleanup: rmtree(%r) took %d seconds', directory,
time.time() - start)
if not success:
sys.stderr.write(
OUTLIVING_ZOMBIE_MSG % (directory, data.grace_period))
if sys.platform == 'win32':
subprocess42.check_call(['tasklist.exe', '/V'], stdout=sys.stderr)
else:
subprocess42.check_call(['ps', 'axu'], stdout=sys.stderr)
if result['exit_code'] == 0:
result['exit_code'] = 1
if not success and result['exit_code'] == 0:
result['exit_code'] = 1
except Exception as e:
# Swallow any exception in the main finally clause.
if out_dir:
logging.exception('Leaking out_dir %s: %s', out_dir, e)
result['internal_failure'] = str(e)
on_error.report(None)
finally:
cleanup_duration = time.time() - cleanup_start
result['stats']['cleanup']['duration'] = cleanup_duration
logging.info('Cleanup: removing directories took %d seconds',
cleanup_duration)
return result
def run_tha_test(data, result_json):
"""Runs an executable and records execution metadata.
If a CAS digest is specified in |data|, downloads the input files into the
local cache and maps them into a temporary directory before running the
command.
A temporary directory is created to hold the output files. The content inside
this directory will be uploaded to RBE-CAS upon termination of the command.
Arguments:
- data: TaskData instance.
- result_json: File path to dump result metadata into. If set, the process
exit code is always 0 unless an internal error occurred.
Returns:
Process exit code that should be used.
"""
if result_json:
# Write a json output file right away in case we get killed.
result = {
'exit_code': None,
'had_hard_timeout': False,
'internal_failure': 'Was terminated before completion',
'outputs_ref': None,
'cas_output_root': None,
'version': 5,
}
tools.write_json(result_json, result, dense=True)
# run_isolated exit code. Depends on whether result_json is used or not.
result = map_and_run(data, True)
logging.info('Result:\n%s', tools.format_json(result, dense=True))
if result_json:
# We've seen tests delete 'work' when quitting, causing an exception
# here. Try to recreate the directory if necessary.
file_path.ensure_tree(os.path.dirname(result_json))
tools.write_json(result_json, result, dense=True)
# Only return 1 if there was an internal error.
return int(bool(result['internal_failure']))
# Marshall into old-style inline output.
if result['outputs_ref']:
# pylint: disable=unsubscriptable-object
data = {
'hash': result['outputs_ref']['isolated'],
'namespace': result['outputs_ref']['namespace'],
'storage': result['outputs_ref']['isolatedserver'],
}
sys.stdout.flush()
print('[run_isolated_out_hack]%s[/run_isolated_out_hack]' %
tools.format_json(data, dense=True))
sys.stdout.flush()
return result['exit_code'] or int(bool(result['internal_failure']))
# Yielded by 'install_client_and_packages'.
CipdInfo = collections.namedtuple('CipdInfo', [
'client', # cipd.CipdClient object
'cache_dir', # absolute path to bot-global cipd tag and instance cache
'stats', # dict with stats to return to the server
'pins', # dict with installed cipd pins to return to the server
])
@contextlib.contextmanager
def copy_local_packages(_run_dir, cas_dir):
"""Copies CIPD packages from luci/luci-go dir."""
go_client_dir = os.environ.get('LUCI_GO_CLIENT_DIR')
assert go_client_dir, ('Please set LUCI_GO_CLIENT_DIR env var to install CIPD'
' packages locally.')
shutil.copy2(os.path.join(go_client_dir, 'cas' + cipd.EXECUTABLE_SUFFIX),
os.path.join(cas_dir, 'cas' + cipd.EXECUTABLE_SUFFIX))
yield None
def _install_packages(run_dir, cipd_cache_dir, client, packages, timeout=None):
"""Calls 'cipd ensure' for packages.
Args:
run_dir (str): root of installation.
cipd_cache_dir (str): the directory to use for the cipd package cache.
client (CipdClient): the cipd client to use
packages: packages to install, list [(path, package_name, version), ...].
timeout (int): if not None, timeout in seconds for cipd ensure to run.
Returns: list of pinned packages. Looks like [
{
'path': 'subdirectory',
'package_name': 'resolved/package/name',
'version': 'deadbeef...',
},
...
]
"""
package_pins = [None]*len(packages)
def insert_pin(path, name, version, idx):
package_pins[idx] = {
'package_name': name,
# swarming deals with 'root' as '.'
'path': path or '.',
'version': version,
}
by_path = collections.defaultdict(list)
for i, (path, name, version) in enumerate(packages):
# cipd deals with 'root' as ''
if path == '.':
path = ''
by_path[path].append((name, version, i))
pins = client.ensure(
run_dir,
{
subdir: [(name, vers) for name, vers, _ in pkgs]
for subdir, pkgs in by_path.items()
},
cache_dir=cipd_cache_dir,
timeout=timeout,
)
for subdir, pin_list in sorted(pins.items()):
this_subdir = by_path[subdir]
for i, (name, version) in enumerate(pin_list):
insert_pin(subdir, name, version, this_subdir[i][2])
assert None not in package_pins, (packages, pins, package_pins)
return package_pins
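# Illustrative input for _install_packages() (hypothetical package names):
#   packages=[('.', 'infra/tools/luci/vpython/${platform}', 'latest'),
#             ('bin', 'example/package', 'deadbeef...')]
# The '.' path is translated to '' for cipd and reported back as '.' in the
# returned pins.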
@contextlib.contextmanager
def install_client_and_packages(run_dir, packages, service_url,
client_package_name, client_version, cache_dir,
cas_dir):
"""Bootstraps CIPD client and installs CIPD packages.
Yields CipdClient, stats, client info and pins (as a single CipdInfo object).
Pins and the CIPD client info are in the form of:
[
{
"path": path, "package_name": package_name, "version": version,
},
...
]
(the CIPD client info is a single dictionary instead of a list)
such that they correspond 1:1 to all input package arguments from the command
line. These dictionaries make their way all the way back to swarming, where
they become the arguments of CipdPackage.
If 'packages' list is empty, will bootstrap CIPD client, but won't install
any packages.
The bootstrapped client (regardless of whether the 'packages' list is empty
or not) will be made available to the task via $PATH.
Args:
run_dir (str): root of installation.
packages: packages to install, list [(path, package_name, version), ...].
service_url (str): CIPD server url, e.g.
"https://chrome-infra-packages.appspot.com."
client_package_name (str): CIPD package name of CIPD client.
client_version (str): Version of CIPD client.
cache_dir (str): where to keep cache of cipd clients, packages and tags.
cas_dir (str): where to download cas client.
"""
assert cache_dir
start = time.time()
cache_dir = os.path.abspath(cache_dir)
cipd_cache_dir = os.path.join(cache_dir, 'cache') # tag and instance caches
run_dir = os.path.abspath(run_dir)
packages = packages or []
get_client_start = time.time()
client_manager = cipd.get_client(cache_dir, service_url, client_package_name,
client_version)
with client_manager as client:
logging_utils.user_logs('Installed CIPD client')
get_client_duration = time.time() - get_client_start
package_pins = []
if packages:
package_pins = _install_packages(run_dir, cipd_cache_dir, client,
packages)
logging_utils.user_logs('Installed task packages')
# Install cas client to |cas_dir|.
_install_packages(cas_dir,
cipd_cache_dir,
client, [('', _CAS_PACKAGE, _LUCI_GO_REVISION)],
timeout=10 * 60)
logging_utils.user_logs('Installed CAS client')
file_path.make_tree_files_read_only(run_dir)
total_duration = time.time() - start
logging_utils.user_logs(
'Installing CIPD client and packages took %d seconds', total_duration)
yield CipdInfo(
client=client,
cache_dir=cipd_cache_dir,
stats={
'duration': total_duration,
'get_client_duration': get_client_duration,
},
pins={
'client_package': {
'package_name': client.package_name,
'version': client.instance_id,
},
'packages': package_pins,
})
def create_option_parser():
parser = logging_utils.OptionParserWithLogging(
usage='%prog <options> [command to run or extra args]',
version=__version__,
log_file=RUN_ISOLATED_LOG_FILE)
parser.add_option(
'--clean',
action='store_true',
help='Cleans the cache, trimming it as necessary and removing corrupted '
'items, and returns without executing anything; use with -v to know '
'what was done')
parser.add_option(
'--json',
help='dump output metadata to json file. When used, run_isolated returns '
'non-zero only on internal failure')
parser.add_option(
'--hard-timeout', type='float', help='Enforce hard timeout in execution')
parser.add_option(
'--grace-period',
type='float',
help='Grace period between SIGTERM and SIGKILL')
parser.add_option(
'--relative-cwd',
help='Ignore the isolated \'relative_cwd\' and use this one instead')
parser.add_option(
'--env',
default=[],
action='append',
help='Environment variables to set for the child process')
parser.add_option(
'--env-prefix',
default=[],
action='append',
help='Specify a VAR=./path/fragment to put in the environment variable '
'before executing the command. The path fragment must be relative '
'to the isolated run directory, and must not contain a `..` token. '
'The path will be made absolute and prepended to the indicated '
'$VAR using the OS\'s path separator. Multiple items for the same '
'$VAR will be prepended in order.')
parser.add_option(
'--bot-file',
help='Path to a file describing the state of the host. The content is '
'defined by on_before_task() in bot_config.')
parser.add_option(
'--switch-to-account',
help='If given, switches LUCI_CONTEXT to given logical service account '
'(e.g. "task" or "system") before launching the isolated process.')
parser.add_option(
'--output',
action='append',
help='Specifies an output to return. If no outputs are specified, all '
'files located in ${ISOLATED_OUTDIR} will be returned; '
'otherwise, outputs in both ${ISOLATED_OUTDIR} and those '
'specified by the --output option (there can be multiple) will be '
'returned. Note that if a file in ${ISOLATED_OUTDIR} has the same path '
'as an --output option, the --output version will be returned.')
parser.add_option(
'-a',
'--argsfile',
# This is actually handled in parse_args; it's included here purely so it
# can make it into the help text.
help='Specify a file containing a JSON array of arguments to this '
'script. If --argsfile is provided, no other argument may be '
'provided on the command line.')
parser.add_option(
'--report-on-exception',
action='store_true',
help='Whether to report exceptions during execution to the isolate server. '
'This flag should only be used by the swarming bot.')
group = optparse.OptionGroup(parser,
'Data source - Content Addressed Storage')
group.add_option(
'--cas-instance', help='Full CAS instance name for input/output files.')
group.add_option(
'--cas-digest',
help='Digest of the input root on RBE-CAS. The format is '
'`{hash}/{size_bytes}`.')
parser.add_option_group(group)
# Cache options.
add_cas_cache_options(parser)
cipd.add_cipd_options(parser)
group = optparse.OptionGroup(parser, 'Named caches')
group.add_option(
'--named-cache',
dest='named_caches',
action='append',
nargs=3,
default=[],
help='A named cache to request. Accepts 3 arguments: name, path, hint. '
'name identifies the cache and must match regex [a-z0-9_]{1,4096}. '
'path is a path relative to the run dir where the cache directory '
'must be placed. '
'This option can be specified more than once.')
group.add_option(
'--named-cache-root',
default='named_caches',
help='Cache root directory. Default=%default')
parser.add_option_group(group)
group = optparse.OptionGroup(parser, 'Process containment')
parser.add_option(
'--lower-priority', action='store_true',
help='Lowers the child process priority')
parser.add_option('--containment-type',
choices=('NONE', 'AUTO', 'JOB_OBJECT'),
default='NONE',
help='Type of containment to use')
parser.add_option(
'--limit-processes',
type='int',
default=0,
help='Maximum number of active processes in the containment')
parser.add_option(
'--limit-total-committed-memory',
type='int',
default=0,
help='Maximum sum of committed memory in the containment')
parser.add_option_group(group)
group = optparse.OptionGroup(parser, 'Debugging')
group.add_option(
'--leak-temp-dir',
action='store_true',
help='Deliberately leak isolate\'s temp dir for later examination. '
'Default: %default')
group.add_option('--root-dir', help='Use a directory instead of a random one')
parser.add_option_group(group)
auth.add_auth_options(parser)
parser.set_defaults(cache='cache')
return parser
def add_cas_cache_options(parser):
group = optparse.OptionGroup(parser, 'CAS cache management')
group.add_option(
'--cas-cache',
metavar='DIR',
default='cas-cache',
help='Directory to keep a local cache of the files. Accelerates download '
'by reusing already downloaded files. Default=%default')
group.add_option(
'--kvs-dir',
default='',
help='CAS cache dir using kvs for small files. Default=%default')
group.add_option(
'--max-cache-size',
type='int',
metavar='NNN',
default=50 * 1024 * 1024 * 1024,
help='Trim if the cache gets larger than this value, default=%default')
group.add_option(
'--min-free-space',
type='int',
metavar='NNN',
default=2 * 1024 * 1024 * 1024,
help='Trim if disk free space becomes lower than this value, '
'default=%default')
parser.add_option_group(group)
def process_cas_cache_options(options):
if options.cas_cache:
policies = local_caching.CachePolicies(
max_cache_size=options.max_cache_size,
min_free_space=options.min_free_space,
# max_items isn't used for CAS cache for now.
max_items=None,
max_age_secs=MAX_AGE_SECS)
return local_caching.DiskContentAddressedCache(os.path.abspath(
options.cas_cache),
policies,
trim=False)
return local_caching.MemoryContentAddressedCache()
def process_named_cache_options(parser, options, time_fn=None):
"""Validates named cache options and returns a CacheManager."""
if options.named_caches and not options.named_cache_root:
parser.error('--named-cache is specified, but --named-cache-root is empty')
for name, path, hint in options.named_caches:
if not CACHE_NAME_RE.match(name):
parser.error(
'cache name %r does not match %r' % (name, CACHE_NAME_RE.pattern))
if not path:
parser.error('cache path cannot be empty')
try:
int(hint)
except ValueError:
parser.error('cache hint must be a number')
if options.named_cache_root:
# Make these configurable later if there is a use case, but for now these are
# fairly safe values.
# In practice, a fair chunk of bots are already recycled on a daily schedule
# so this code doesn't have any effect on them, unless they are preloaded
# with a really old cache.
policies = local_caching.CachePolicies(
# 1TiB.
max_cache_size=1024 * 1024 * 1024 * 1024,
min_free_space=options.min_free_space,
max_items=50,
max_age_secs=MAX_AGE_SECS)
keep = [name for name, _, _ in options.named_caches]
root_dir = os.path.abspath(options.named_cache_root)
cache = local_caching.NamedCache(root_dir,
policies,
time_fn=time_fn,
keep=keep)
# Touch any named caches we're going to use to minimize thrashing
# between tasks that request some (but not all) of the same named caches.
cache.touch(*[name for name, _, _ in options.named_caches])
return cache
return None
def parse_args(args):
# Create a fake mini-parser just to get out the "-a" command. Note that
# it's not documented here; instead, it's documented in create_option_parser
# even though that parser will never actually get to parse it. This is
# because --argsfile is exclusive with all other options and arguments.
file_argparse = argparse.ArgumentParser(add_help=False)
file_argparse.add_argument('-a', '--argsfile')
(file_args, nonfile_args) = file_argparse.parse_known_args(args)
if file_args.argsfile:
if nonfile_args:
file_argparse.error('Can\'t specify --argsfile with '
'any other arguments (%s)' % nonfile_args)
try:
with open(file_args.argsfile, 'r') as f:
args = json.load(f)
except (IOError, OSError, ValueError) as e:
# We don't need to error out here - "args" is now empty,
# so the call below to parser.parse_args(args) will fail
# and print the full help text.
print('Couldn\'t read arguments: %s' % e, file=sys.stderr)
# Even if we failed to read the args, just call the normal parser now since it
# will print the correct help message.
parser = create_option_parser()
options, args = parser.parse_args(args)
if not isinstance(options.cipd_enabled, (bool, int)):
options.cipd_enabled = distutils.util.strtobool(options.cipd_enabled)
return (parser, options, args)
def _calc_named_cache_hint(named_cache, named_caches):
"""Returns the expected size of the missing named caches."""
present = named_cache.available
size = 0
logging.info('available named cache %s', present)
for name, _, hint in named_caches:
if name not in present:
hint = int(hint)
if hint > 0:
logging.info("named cache hint: %s, %d", name, hint)
size += hint
logging.info("total size of named cache hint: %d", size)
return size
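# Illustrative example for _calc_named_cache_hint() (hypothetical values): if
# the 'git' cache is already installed but 'builder' (hint='2147483648') is
# missing, the function returns 2147483648; main() then adds that amount to
# --min-free-space before trimming so there is room to recreate the cache.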
def _clean_cmd(parser, options, caches, root):
"""Cleanup cache dirs/files."""
if options.json:
parser.error('Can\'t use --json with --clean.')
if options.named_caches:
parser.error('Can\'t use --named-cache with --clean.')
if options.cas_instance or options.cas_digest:
parser.error('Can\'t use --cas-instance, --cas-digest with --clean.')
logging.info("initial free space: %d", file_path.get_free_space(root))
if options.kvs_dir and fs.isdir(options.kvs_dir):
# Remove kvs file if its size exceeds fixed threshold.
kvs_dir = options.kvs_dir
size = file_path.get_recursive_size(kvs_dir)
if size >= _CAS_KVS_CACHE_THRESHOLD:
logging.info("remove kvs dir with size: %d", size)
file_path.rmtree(kvs_dir)
# Trim first, then clean.
local_caching.trim_caches(
caches,
root,
min_free_space=options.min_free_space,
max_age_secs=MAX_AGE_SECS)
logging.info("free space after trim: %d", file_path.get_free_space(root))
for c in caches:
c.cleanup()
logging.info("free space after cleanup: %d", file_path.get_free_space(root))
def main(args):
# Warning: when --argsfile is used, the strings are unicode instances, when
# parsed normally, the strings are str instances.
(parser, options, args) = parse_args(args)
# Adds another log level for logs which are directed to standard output;
# these logs will be uploaded to Cloud Storage.
logging_utils.set_user_level_logging()
# Must be logged after parse_args(), which eventually calls
# logging_utils.prepare_logging() which expects no logs before its call.
logging_utils.user_logs('Starting run_isolated script')
SWARMING_SERVER = os.environ.get('SWARMING_SERVER')
SWARMING_TASK_ID = os.environ.get('SWARMING_TASK_ID')
if options.report_on_exception and SWARMING_SERVER:
task_url = None
if SWARMING_TASK_ID:
task_url = '%s/task?id=%s' % (SWARMING_SERVER, SWARMING_TASK_ID)
on_error.report_on_exception_exit(SWARMING_SERVER, source=task_url)
if not file_path.enable_symlink():
logging.warning('Symlink support is not enabled')
named_cache = process_named_cache_options(parser, options)
# hint is 0 if there's no named cache.
hint = _calc_named_cache_hint(named_cache, options.named_caches)
if hint:
# Increase the --min-free-space value by the hint, and recreate the
# NamedCache instance so it gets the updated CachePolicy.
options.min_free_space += hint
named_cache = process_named_cache_options(parser, options)
# TODO(maruel): CIPD caches should be defined at a higher level here too, so
# they can be cleaned the same way.
cas_cache = process_cas_cache_options(options)
caches = []
if cas_cache:
caches.append(cas_cache)
if named_cache:
caches.append(named_cache)
root = caches[0].cache_dir if caches else os.getcwd()
if options.clean:
_clean_cmd(parser, options, caches, root)
return 0
# Trim must still be done for the following case:
# - named-cache was used
# - some entries, with a large hint, were missing
# - --min-free-space was increased accordingly, thus trimming is needed
# Otherwise, this will have no effect, as bot_main calls run_isolated with
# --clean after each task.
additional_buffer = _FREE_SPACE_BUFFER_FOR_CIPD_PACKAGES
if options.kvs_dir:
additional_buffer += _CAS_KVS_CACHE_THRESHOLD
# Add some buffer for Go CLI.
min_free_space = options.min_free_space + additional_buffer
def trim_caches_fn(stats):
start = time.time()
local_caching.trim_caches(
caches, root, min_free_space=min_free_space, max_age_secs=MAX_AGE_SECS)
duration = time.time() - start
stats['duration'] = duration
logging_utils.user_logs('trim_caches: took %d seconds', duration)
# Save the state of the cas cache so as not to overwrite the state from the go client.
if cas_cache:
cas_cache.save()
cas_cache = None
if not args:
parser.error('command to run is required.')
auth.process_auth_options(parser, options)
if ISOLATED_OUTDIR_PARAMETER in args and not options.cas_instance:
parser.error('%s in args requires --cas-instance' %
ISOLATED_OUTDIR_PARAMETER)
if options.root_dir:
options.root_dir = os.path.abspath(options.root_dir)
else:
options.root_dir = tempfile.mkdtemp(prefix='root')
if options.json:
options.json = os.path.abspath(options.json)
if any('=' not in i for i in options.env):
parser.error(
'--env requires key=value form. The value can be omitted to delete '
'the variable')
options.env = dict(i.split('=', 1) for i in options.env)
prefixes = {}
cwd = os.path.realpath(os.getcwd())
for item in options.env_prefix:
if '=' not in item:
parser.error(
'--env-prefix %r is malformed, must be in the form `VAR=./path`'
% item)
key, opath = item.split('=', 1)
if os.path.isabs(opath):
parser.error('--env-prefix %r path is bad, must be relative.' % opath)
opath = os.path.normpath(opath)
if not os.path.realpath(os.path.join(cwd, opath)).startswith(cwd):
parser.error(
'--env-prefix %r path is bad, must be relative and not contain `..`.'
% opath)
prefixes.setdefault(key, []).append(opath)
options.env_prefix = prefixes
cipd.validate_cipd_options(parser, options)
install_packages_fn = copy_local_packages
tmp_cipd_cache_dir = None
if options.cipd_enabled:
cache_dir = options.cipd_cache
if not cache_dir:
tmp_cipd_cache_dir = tempfile.mkdtemp()
cache_dir = tmp_cipd_cache_dir
install_packages_fn = (lambda run_dir, cas_dir: install_client_and_packages(
run_dir,
cipd.parse_package_args(options.cipd_packages),
options.cipd_server,
options.cipd_client_package,
options.cipd_client_version,
cache_dir=cache_dir,
cas_dir=cas_dir))
@contextlib.contextmanager
def install_named_caches(run_dir, stats):
# WARNING: this function depends on "options" variable defined in the outer
# function.
assert str(run_dir), repr(run_dir)
assert os.path.isabs(run_dir), run_dir
named_caches = [(os.path.join(run_dir, str(relpath)), name)
for name, relpath, _ in options.named_caches]
install_start = time.time()
for path, name in named_caches:
named_cache.install(path, name)
install_duration = time.time() - install_start
stats['install']['duration'] = install_duration
logging.info('named_caches: install took %d seconds', install_duration)
try:
yield
finally:
# Uninstall each named cache, returning it to the cache pool. If an
# uninstall fails for a given cache, it will remain in the task's
# temporary space, get cleaned up by the Swarming bot, and be lost.
#
# If the Swarming bot cannot clean up the cache, it will handle it like
# any other bot file that could not be removed.
uninstall_start = time.time()
for path, name in reversed(named_caches):
try:
# uninstall() doesn't trim but does call save() implicitly. Trimming
# *must* be done manually via periodic 'run_isolated.py --clean'.
named_cache.uninstall(path, name)
except local_caching.NamedCacheError:
if sys.platform == 'win32':
# Show running processes.
sys.stderr.write("running process\n")
subprocess42.check_call(['tasklist.exe', '/V'], stdout=sys.stderr)
error = (
'Error while removing named cache %r at %r. The cache will be'
' lost.' % (name, path))
logging.exception(error)
on_error.report(error)
uninstall_duration = time.time() - uninstall_start
stats['uninstall']['duration'] = uninstall_duration
logging.info('named_caches: uninstall took %d seconds',
uninstall_duration)
command = args
if options.relative_cwd:
a = os.path.normpath(os.path.abspath(options.relative_cwd))
if not a.startswith(os.getcwd()):
parser.error(
'--relative-cwd must not try to escape the working directory')
containment_type = subprocess42.Containment.NONE
if options.containment_type == 'AUTO':
containment_type = subprocess42.Containment.AUTO
if options.containment_type == 'JOB_OBJECT':
containment_type = subprocess42.Containment.JOB_OBJECT
containment = subprocess42.Containment(
containment_type=containment_type,
limit_processes=options.limit_processes,
limit_total_committed_memory=options.limit_total_committed_memory)
data = TaskData(command=command,
relative_cwd=options.relative_cwd,
cas_instance=options.cas_instance,
cas_digest=options.cas_digest,
outputs=options.output,
install_named_caches=install_named_caches,
leak_temp_dir=options.leak_temp_dir,
root_dir=options.root_dir,
hard_timeout=options.hard_timeout,
grace_period=options.grace_period,
bot_file=options.bot_file,
switch_to_account=options.switch_to_account,
install_packages_fn=install_packages_fn,
cas_cache_dir=options.cas_cache,
cas_cache_policies=local_caching.CachePolicies(
max_cache_size=options.max_cache_size,
min_free_space=options.min_free_space,
max_items=None,
max_age_secs=None,
),
cas_kvs=options.kvs_dir,
env=options.env,
env_prefix=options.env_prefix,
lower_priority=bool(options.lower_priority),
containment=containment,
trim_caches_fn=trim_caches_fn)
try:
return run_tha_test(data, options.json)
except (cipd.Error, local_caching.NamedCacheError,
local_caching.NoMoreSpace) as ex:
print(str(ex), file=sys.stderr)
on_error.report(None)
return 1
finally:
if tmp_cipd_cache_dir is not None:
try:
file_path.rmtree(tmp_cipd_cache_dir)
except OSError:
logging.exception('Remove tmp_cipd_cache_dir=%s failed',
tmp_cipd_cache_dir)
# Best-effort clean up. Failing to do so doesn't affect the outcome.
if __name__ == '__main__':
subprocess42.inhibit_os_error_reporting()
# Ensure that we are always running with the correct encoding.
fix_encoding.fix_encoding()
net.set_user_agent('run_isolated.py/' + __version__)
sys.exit(main(sys.argv[1:]))