| #!/usr/bin/env python |
| # Copyright 2012 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| """Runs a command with optional isolated input/output. |
| |
| run_isolated takes cares of setting up a temporary environment, running a |
| command, and tearing it down. |
| |
| It handles downloading and uploading isolated files, mapping CIPD packages and |
| reusing stateful named caches. |
| |
| The isolated files, CIPD packages and named caches are kept as a global LRU |
| cache. |
| |
| Any ${EXECUTABLE_SUFFIX} on the command line or the environment variables passed |
| with the --env option will be replaced with ".exe" string on Windows and "" on |
| other platforms. |
| |
| Any ${ISOLATED_OUTDIR} on the command line or the environment variables passed |
| with the --env option will be replaced by the location of a temporary directory |
| upon execution of the command specified in the .isolated file. All content |
| written to this directory will be uploaded upon termination and the .isolated |
| file describing this directory will be printed to stdout. |
| |
| Any ${SWARMING_BOT_FILE} on the command line or the environment variables passed |
| with the --env option will be replaced by the value of the --bot-file parameter. |
| This file is used by a swarming bot to communicate state of the host to tasks. |
| It is written to by the swarming bot's on_before_task() hook in the swarming |
| server's custom bot_config.py. |
| |
| See |
| https://chromium.googlesource.com/infra/luci/luci-py.git/+/master/appengine/swarming/doc/Magic-Values.md |
| for all the variables. |
| |
| See |
| https://chromium.googlesource.com/infra/luci/luci-py.git/+/master/appengine/swarming/swarming_bot/config/bot_config.py |
| for more information about bot_config.py. |
| """ |
| |
| from __future__ import print_function |
| |
| __version__ = '1.0.1' |
| |
| import argparse |
| import base64 |
| import collections |
| import contextlib |
| import distutils |
| import errno |
| import json |
| import logging |
| import optparse |
| import os |
| import platform |
| import re |
| import sys |
| import tempfile |
| import time |
| |
| from utils import tools |
| tools.force_local_third_party() |
| |
| # third_party/ |
| from depot_tools import fix_encoding |
| import six |
| |
| # pylint: disable=ungrouped-imports |
| import auth |
| import cipd |
| import isolate_storage |
| import isolateserver |
| import local_caching |
| from libs import luci_context |
| from utils import file_path |
| from utils import fs |
| from utils import large |
| from utils import logging_utils |
| from utils import net |
| from utils import on_error |
| from utils import subprocess42 |
| |
| |
| # Magic variables that can be found in the isolate task command line. |
| ISOLATED_OUTDIR_PARAMETER = '${ISOLATED_OUTDIR}' |
| EXECUTABLE_SUFFIX_PARAMETER = '${EXECUTABLE_SUFFIX}' |
| SWARMING_BOT_FILE_PARAMETER = '${SWARMING_BOT_FILE}' |
| |
| |
| # The name of the log file to use. |
| RUN_ISOLATED_LOG_FILE = 'run_isolated.log' |
| |
| |
| # The name of the log to use for the run_test_cases.py command |
| RUN_TEST_CASES_LOG = 'run_test_cases.log' |
| |
| |
| # Use short names for temporary directories. This is driven by Windows, which |
| # imposes a relatively short maximum path length of 260 characters, often |
| # referred to as MAX_PATH. It is relatively easy to create files with longer |
| # path length. A use case is with recursive dependency trees like npm packages. |
| # |
| # It is recommended to start the script with a `root_dir` as short as |
| # possible. |
| # - ir stands for isolated_run |
| # - io stands for isolated_out |
| # - it stands for isolated_tmp |
| # - ic stands for isolated_client |
| ISOLATED_RUN_DIR = u'ir' |
| ISOLATED_OUT_DIR = u'io' |
| ISOLATED_TMP_DIR = u'it' |
| ISOLATED_CLIENT_DIR = u'ic' |
| |
| # TODO(tikuta): take these parameter from luci-config? |
| # Update tag by `./client/update_isolated.sh`. |
| # Or take revision from |
| # https://ci.chromium.org/p/infra-internal/g/infra-packagers/console |
| ISOLATED_PACKAGE = 'infra/tools/luci/isolated/${platform}' |
| ISOLATED_REVISION = 'git_revision:58b9c29f4c1aedddc1da81481be3a5e08b18234e' |
| |
| # Keep synced with task_request.py |
| CACHE_NAME_RE = re.compile(r'^[a-z0-9_]{1,4096}$') |
| |
| |
| OUTLIVING_ZOMBIE_MSG = """\ |
| *** Swarming tried multiple times to delete the %s directory and failed *** |
| *** Hard failing the task *** |
| |
| Swarming detected that your testing script ran an executable, which may have |
| started a child executable, and the main script returned early, leaving the |
| children executables playing around unguided. |
| |
| You don't want to leave children processes outliving the task on the Swarming |
| bot, do you? The Swarming bot doesn't. |
| |
| How to fix? |
| - For any process that starts children processes, make sure all children |
| processes terminated properly before each parent process exits. This is |
| especially important in very deep process trees. |
| - This must be done properly both in normal successful task and in case of |
| task failure. Cleanup is very important. |
| - The Swarming bot sends a SIGTERM in case of timeout. |
| - You have %s seconds to comply after the signal was sent to the process |
| before the process is forcibly killed. |
| - To achieve not leaking children processes in case of signals on timeout, you |
| MUST handle signals in each executable / python script and propagate them to |
| children processes. |
| - When your test script (python or binary) receives a signal like SIGTERM or |
| CTRL_BREAK_EVENT on Windows), send it to all children processes and wait for |
| them to terminate before quitting. |
| |
| See |
| https://chromium.googlesource.com/infra/luci/luci-py.git/+/master/appengine/swarming/doc/Bot.md#Graceful-termination_aka-the-SIGTERM-and-SIGKILL-dance |
| for more information. |
| |
| *** May the SIGKILL force be with you *** |
| """ |
| |
| |
| # Currently hardcoded. Eventually could be exposed as a flag once there's value. |
| # 3 weeks |
| MAX_AGE_SECS = 21*24*60*60 |
| |
| |
| TaskData = collections.namedtuple( |
| 'TaskData', |
| [ |
| # List of strings; the command line to use, independent of what was |
| # specified in the isolated file. |
| 'command', |
| # Relative directory to start command into. |
| 'relative_cwd', |
| # List of strings; the arguments to add to the command specified in the |
| # isolated file. |
| 'extra_args', |
| # Hash of the .isolated file that must be retrieved to recreate the tree |
| # of files to run the target executable. The command specified in the |
| # .isolated is executed. Mutually exclusive with command argument. |
| 'isolated_hash', |
| # isolateserver.Storage instance to retrieve remote objects. This object |
| # has a reference to an isolateserver.StorageApi, which does the actual |
| # I/O. |
| 'storage', |
| # isolateserver.LocalCache instance to keep from retrieving the same |
| # objects constantly by caching the objects retrieved. Can be on-disk or |
| # in-memory. |
| 'isolate_cache', |
| # List of paths relative to root_dir to put into the output isolated |
| # bundle upon task completion (see link_outputs_to_outdir). |
| 'outputs', |
| # Function (run_dir) => context manager that installs named caches into |
| # |run_dir|. |
| 'install_named_caches', |
| # If True, the temporary directory will be deliberately leaked for later |
| # examination. |
| 'leak_temp_dir', |
| # Path to the directory to use to create the temporary directory. If not |
| # specified, a random temporary directory is created. |
| 'root_dir', |
| # Kills the process if it lasts more than this amount of seconds. |
| 'hard_timeout', |
| # Number of seconds to wait between SIGTERM and SIGKILL. |
| 'grace_period', |
| # Path to a file with bot state, used in place of ${SWARMING_BOT_FILE} |
| # task command line argument. |
| 'bot_file', |
| # Logical account to switch LUCI_CONTEXT into. |
| 'switch_to_account', |
| # Context manager dir => CipdInfo, see install_client_and_packages. |
| 'install_packages_fn', |
| # Use go isolated client. |
| 'use_go_isolated', |
| # Cache directory for go isolated client. |
| 'go_cache_dir', |
| # Parameters passed to go isolated client. |
| 'go_cache_policies', |
| # Environment variables to set. |
| 'env', |
| # Environment variables to mutate with relative directories. |
| # Example: {"ENV_KEY": ['relative', 'paths', 'to', 'prepend']} |
| 'env_prefix', |
| # Lowers the task process priority. |
| 'lower_priority', |
| # subprocess42.Containment instance. Can be None. |
| 'containment', |
| ]) |
| |
| |
| def _to_str(s): |
| """Downgrades a unicode instance to str. Pass str through as-is.""" |
| if isinstance(s, str): |
| return s |
| # This is technically incorrect, especially on Windows. In theory |
| # sys.getfilesystemencoding() should be used to use the right 'ANSI code |
| # page' on Windows, but that causes other problems, as the character set |
| # is very limited. |
| return s.encode('utf-8') |
| |
| |
| def _to_unicode(s): |
| """Upgrades a str instance to unicode. Pass unicode through as-is.""" |
| if isinstance(s, six.text_type) or s is None: |
| return s |
| return s.decode('utf-8') |
| |
| |
| def make_temp_dir(prefix, root_dir): |
| """Returns a new unique temporary directory.""" |
| return six.text_type(tempfile.mkdtemp(prefix=prefix, dir=root_dir)) |
| |
| |
| @contextlib.contextmanager |
| def set_luci_context_account(account, tmp_dir): |
| """Sets LUCI_CONTEXT account to be used by the task. |
| |
| If 'account' is None or '', does nothing at all. This happens when |
| run_isolated.py is called without '--switch-to-account' flag. In this case, |
| if run_isolated.py is running in some LUCI_CONTEXT environment, the task will |
| just inherit whatever account is already set. This may happen if users invoke |
| run_isolated.py explicitly from their code. |
| |
| If the requested account is not defined in the context, switches to |
| non-authenticated access. This happens for Swarming tasks that don't use |
| 'task' service accounts. |
| |
| If not using LUCI_CONTEXT-based auth, does nothing. |
| If already running as requested account, does nothing. |
| """ |
| if not account: |
| # Not actually switching. |
| yield |
| return |
| |
| local_auth = luci_context.read('local_auth') |
| if not local_auth: |
| # Not using LUCI_CONTEXT auth at all. |
| yield |
| return |
| |
| # See LUCI_CONTEXT.md for the format of 'local_auth'. |
| if local_auth.get('default_account_id') == account: |
| # Already set, no need to switch. |
| yield |
| return |
| |
| available = {a['id'] for a in local_auth.get('accounts') or []} |
| if account in available: |
| logging.info('Switching default LUCI_CONTEXT account to %r', account) |
| local_auth['default_account_id'] = account |
| else: |
| logging.warning( |
| 'Requested LUCI_CONTEXT account %r is not available (have only %r), ' |
| 'disabling authentication', account, sorted(available)) |
| local_auth.pop('default_account_id', None) |
| |
| with luci_context.write(_tmpdir=tmp_dir, local_auth=local_auth): |
| yield |
| |
| |
| def process_command(command, out_dir, bot_file): |
| """Replaces parameters in a command line. |
| |
| Raises: |
| ValueError if a parameter is requested in |command| but its value is not |
| provided. |
| """ |
| return [replace_parameters(arg, out_dir, bot_file) for arg in command] |
| |
| |
| def replace_parameters(arg, out_dir, bot_file): |
| """Replaces parameter tokens with appropriate values in a string. |
| |
| Raises: |
| ValueError if a parameter is requested in |arg| but its value is not |
| provided. |
| """ |
| arg = arg.replace(EXECUTABLE_SUFFIX_PARAMETER, cipd.EXECUTABLE_SUFFIX) |
| replace_slash = False |
| if ISOLATED_OUTDIR_PARAMETER in arg: |
| if not out_dir: |
| raise ValueError( |
| 'output directory is requested in command or env var, but not ' |
| 'provided; please specify one') |
| arg = arg.replace(ISOLATED_OUTDIR_PARAMETER, out_dir) |
| replace_slash = True |
| if SWARMING_BOT_FILE_PARAMETER in arg: |
| if bot_file: |
| arg = arg.replace(SWARMING_BOT_FILE_PARAMETER, bot_file) |
| replace_slash = True |
| else: |
| logging.warning('SWARMING_BOT_FILE_PARAMETER found in command or env ' |
| 'var, but no bot_file specified. Leaving parameter ' |
| 'unchanged.') |
| if replace_slash: |
| # Replace slashes only if parameters are present |
| # because of arguments like '${ISOLATED_OUTDIR}/foo/bar' |
| arg = arg.replace('/', os.sep) |
| return arg |
| |
| |
| |
| def get_command_env(tmp_dir, cipd_info, run_dir, env, env_prefixes, out_dir, |
| bot_file): |
| """Returns full OS environment to run a command in. |
| |
| Sets up TEMP, puts directory with cipd binary in front of PATH, exposes |
| CIPD_CACHE_DIR env var, and installs all env_prefixes. |
| |
| Args: |
| tmp_dir: temp directory. |
| cipd_info: CipdInfo object is cipd client is used, None if not. |
| run_dir: The root directory the isolated tree is mapped in. |
| env: environment variables to use |
| env_prefixes: {"ENV_KEY": ['cwd', 'relative', 'paths', 'to', 'prepend']} |
| out_dir: Isolated output directory. Required to be != None if any of the |
| env vars contain ISOLATED_OUTDIR_PARAMETER. |
| bot_file: Required to be != None if any of the env vars contain |
| SWARMING_BOT_FILE_PARAMETER. |
| """ |
| out = os.environ.copy() |
| for k, v in env.items(): |
| if not v: |
| out.pop(k, None) |
| else: |
| out[k] = replace_parameters(v, out_dir, bot_file) |
| |
| if cipd_info: |
| bin_dir = os.path.dirname(cipd_info.client.binary_path) |
| out['PATH'] = '%s%s%s' % (_to_str(bin_dir), os.pathsep, out['PATH']) |
| out['CIPD_CACHE_DIR'] = _to_str(cipd_info.cache_dir) |
| |
| for key, paths in env_prefixes.items(): |
| assert isinstance(paths, list), paths |
| paths = [os.path.normpath(os.path.join(run_dir, p)) for p in paths] |
| cur = out.get(key) |
| if cur: |
| paths.append(cur) |
| out[key] = _to_str(os.path.pathsep.join(paths)) |
| |
| tmp_dir = _to_str(tmp_dir) |
| # pylint: disable=line-too-long |
| # * python respects $TMPDIR, $TEMP, and $TMP in this order, regardless of |
| # platform. So $TMPDIR must be set on all platforms. |
| # https://github.com/python/cpython/blob/2.7/Lib/tempfile.py#L155 |
| out['TMPDIR'] = tmp_dir |
| if sys.platform == 'win32': |
| # * chromium's base utils uses GetTempPath(). |
| # https://cs.chromium.org/chromium/src/base/files/file_util_win.cc?q=GetTempPath |
| # * Go uses GetTempPath(). |
| # * GetTempDir() uses %TMP%, then %TEMP%, then other stuff. So %TMP% must be |
| # set. |
| # https://docs.microsoft.com/en-us/windows/desktop/api/fileapi/nf-fileapi-gettemppathw |
| out['TMP'] = tmp_dir |
| # https://blogs.msdn.microsoft.com/oldnewthing/20150417-00/?p=44213 |
| out['TEMP'] = tmp_dir |
| elif sys.platform == 'darwin': |
| # * Chromium uses an hack on macOS before calling into |
| # NSTemporaryDirectory(). |
| # https://cs.chromium.org/chromium/src/base/files/file_util_mac.mm?q=GetTempDir |
| # https://developer.apple.com/documentation/foundation/1409211-nstemporarydirectory |
| out['MAC_CHROMIUM_TMPDIR'] = tmp_dir |
| else: |
| # TMPDIR is specified as the POSIX standard envvar for the temp directory. |
| # http://pubs.opengroup.org/onlinepubs/9699919799/basedefs/V1_chap08.html |
| # * mktemp on linux respects $TMPDIR. |
| # * Chromium respects $TMPDIR on linux. |
| # https://cs.chromium.org/chromium/src/base/files/file_util_posix.cc?q=GetTempDir |
| # * Go uses $TMPDIR. |
| # https://go.googlesource.com/go/+/go1.10.3/src/os/file_unix.go#307 |
| pass |
| return out |
| |
| |
| def run_command( |
| command, cwd, env, hard_timeout, grace_period, lower_priority, containment): |
| """Runs the command. |
| |
| Returns: |
| tuple(process exit code, bool if had a hard timeout) |
| """ |
| logging.info( |
| 'run_command(%s, %s, %s, %s, %s, %s)', |
| command, cwd, hard_timeout, grace_period, lower_priority, containment) |
| |
| exit_code = None |
| had_hard_timeout = False |
| with tools.Profiler('RunTest'): |
| proc = None |
| had_signal = [] |
| try: |
| # TODO(maruel): This code is imperfect. It doesn't handle well signals |
| # during the download phase and there's short windows were things can go |
| # wrong. |
| def handler(signum, _frame): |
| if proc and not had_signal: |
| logging.info('Received signal %d', signum) |
| had_signal.append(True) |
| raise subprocess42.TimeoutExpired(command, None) |
| |
| proc = subprocess42.Popen( |
| command, cwd=cwd, env=env, detached=True, close_fds=True, |
| lower_priority=lower_priority, containment=containment) |
| with subprocess42.set_signal_handler(subprocess42.STOP_SIGNALS, handler): |
| try: |
| exit_code = proc.wait(hard_timeout or None) |
| except subprocess42.TimeoutExpired: |
| if not had_signal: |
| logging.warning('Hard timeout') |
| had_hard_timeout = True |
| logging.warning('Sending SIGTERM') |
| proc.terminate() |
| |
| # Ignore signals in grace period. Forcibly give the grace period to the |
| # child process. |
| if exit_code is None: |
| ignore = lambda *_: None |
| with subprocess42.set_signal_handler(subprocess42.STOP_SIGNALS, ignore): |
| try: |
| exit_code = proc.wait(grace_period or None) |
| except subprocess42.TimeoutExpired: |
| # Now kill for real. The user can distinguish between the |
| # following states: |
| # - signal but process exited within grace period, |
| # hard_timed_out will be set but the process exit code will be |
| # script provided. |
| # - processed exited late, exit code will be -9 on posix. |
| logging.warning('Grace exhausted; sending SIGKILL') |
| proc.kill() |
| logging.info('Waiting for process exit') |
| exit_code = proc.wait() |
| except OSError as e: |
| # This is not considered to be an internal error. The executable simply |
| # does not exit. |
| sys.stderr.write( |
| '<The executable does not exist, a dependent library is missing or ' |
| 'the command line is too long>\n' |
| '<Check for missing .so/.dll in the .isolate or GN file or length of ' |
| 'command line args>\n' |
| '<Command: %s, Exception: %s>\n' % (command, e)) |
| if os.environ.get('SWARMING_TASK_ID'): |
| # Give an additional hint when running as a swarming task. |
| sys.stderr.write( |
| '<See the task\'s page for commands to help diagnose this issue ' |
| 'by reproducing the task locally>\n') |
| exit_code = 1 |
| logging.info( |
| 'Command finished with exit code %d (%s)', |
| exit_code, hex(0xffffffff & exit_code)) |
| return exit_code, had_hard_timeout |
| |
| |
| def _fetch_and_map_with_go(isolated_hash, storage, outdir, go_cache_dir, |
| policies, isolated_client): |
| """ |
| Fetches an isolated tree using go client, create the tree and returns |
| (bundle, stats). |
| """ |
| start = time.time() |
| server_ref = storage.server_ref |
| result_json_handle, result_json_path = tempfile.mkstemp( |
| prefix=u'fetch-and-map-result-', suffix=u'.json') |
| os.close(result_json_handle) |
| try: |
| cmd = [ |
| isolated_client, |
| 'download', |
| '-isolate-server', |
| server_ref.url, |
| '-namespace', |
| server_ref.namespace, |
| '-isolated', |
| isolated_hash, |
| |
| # flags for cache |
| '-cache-dir', |
| go_cache_dir, |
| '-cache-max-items', |
| str(policies.max_items), |
| '-cache-max-size', |
| str(policies.max_cache_size), |
| '-cache-min-free-space', |
| str(policies.min_free_space), |
| |
| # flags for output |
| '-output-dir', |
| outdir, |
| '-fetch-and-map-result-json', |
| result_json_path, |
| ] |
| proc = subprocess42.Popen(cmd) |
| cmd_str = ' '.join(cmd) |
| |
| exceeded_max_timeout = True |
| check_period_sec = 30 |
| max_checks = 100 |
| # max timeout = max_checks * check_period_sec = 50 minutes |
| for i in range(max_checks): |
| # This is to prevent I/O timeout error during isolated setup. |
| try: |
| retcode = proc.wait(check_period_sec) |
| if retcode != 0: |
| raise ValueError("retcode is not 0: %s (cmd=%s)" % (retcode, cmd_str)) |
| exceeded_max_timeout = False |
| break |
| except subprocess42.TimeoutExpired: |
| print('still running isolated (after %d seconds)' % ( |
| (i + 1) * check_period_sec)) |
| |
| if exceeded_max_timeout: |
| proc.terminate() |
| try: |
| proc.wait(check_period_sec) |
| except subprocess42.TimeoutExpired: |
| logging.exception( |
| "failed to terminate? timeout happened after %d seconds", |
| check_period_sec) |
| proc.kill() |
| proc.wait() |
| # Raise unconditionally, because |proc| was forcefully terminated. |
| raise ValueError("timedout after %d seconds (cmd=%s)" % |
| (check_period_sec * max_checks, cmd_str)) |
| |
| with open(result_json_path) as json_file: |
| result_json = json.load(json_file) |
| |
| isolated = result_json['isolated'] |
| bundle = isolateserver.IsolatedBundle(filter_cb=None) |
| # Only following properties are used in caller. |
| bundle.command = isolated.get('command') |
| bundle.relative_cwd = isolated.get('relative_cwd') |
| |
| return bundle, { |
| 'duration': time.time() - start, |
| 'items_cold': result_json['items_cold'], |
| 'items_hot': result_json['items_hot'], |
| } |
| finally: |
| fs.remove(result_json_path) |
| |
| |
| # TODO(crbug.com/932396): remove this function. |
| def fetch_and_map(isolated_hash, storage, cache, outdir): |
| """Fetches an isolated tree, create the tree and returns (bundle, stats).""" |
| start = time.time() |
| bundle = isolateserver.fetch_isolated( |
| isolated_hash=isolated_hash, |
| storage=storage, |
| cache=cache, |
| outdir=outdir, |
| use_symlinks=False) |
| hot = (collections.Counter(cache.used) - |
| collections.Counter(cache.added)).elements() |
| return bundle, { |
| 'duration': time.time() - start, |
| 'items_cold': base64.b64encode(large.pack(sorted(cache.added))).decode(), |
| 'items_hot': base64.b64encode(large.pack(sorted(hot))).decode(), |
| } |
| |
| |
| def link_outputs_to_outdir(run_dir, out_dir, outputs): |
| """Links any named outputs to out_dir so they can be uploaded. |
| |
| Raises an error if the file already exists in that directory. |
| """ |
| if not outputs: |
| return |
| isolateserver.create_directories(out_dir, outputs) |
| for o in outputs: |
| copy_recursively(os.path.join(run_dir, o), os.path.join(out_dir, o)) |
| |
| |
| def copy_recursively(src, dst): |
| """Efficiently copies a file or directory from src_dir to dst_dir. |
| |
| `item` may be a file, directory, or a symlink to a file or directory. |
| All symlinks are replaced with their targets, so the resulting |
| directory structure in dst_dir will never have any symlinks. |
| |
| To increase speed, copy_recursively hardlinks individual files into the |
| (newly created) directory structure if possible, unlike Python's |
| shutil.copytree(). |
| """ |
| orig_src = src |
| try: |
| # Replace symlinks with their final target. |
| while fs.islink(src): |
| res = fs.readlink(src) |
| src = os.path.join(os.path.dirname(src), res) |
| # TODO(sadafm): Explicitly handle cyclic symlinks. |
| |
| # Note that fs.isfile (which is a wrapper around os.path.isfile) throws |
| # an exception if src does not exist. A warning will be logged in that case. |
| if fs.isfile(src): |
| file_path.link_file(dst, src, file_path.HARDLINK_WITH_FALLBACK) |
| return |
| |
| if not fs.exists(dst): |
| os.makedirs(dst) |
| |
| for child in fs.listdir(src): |
| copy_recursively(os.path.join(src, child), os.path.join(dst, child)) |
| |
| except OSError as e: |
| if e.errno == errno.ENOENT: |
| logging.warning('Path %s does not exist or %s is a broken symlink', |
| src, orig_src) |
| else: |
| logging.info("Couldn't collect output file %s: %s", src, e) |
| |
| |
| def upload_then_delete(storage, out_dir, leak_temp_dir): |
| """Deletes the temporary run directory and uploads results back. |
| |
| Returns: |
| tuple(outputs_ref, success, stats) |
| - outputs_ref: a dict referring to the results archived back to the isolated |
| server, if applicable. |
| - success: False if something occurred that means that the task must |
| forcibly be considered a failure, e.g. zombie processes were left |
| behind. |
| - stats: uploading stats. |
| """ |
| # Upload out_dir and generate a .isolated file out of this directory. It is |
| # only done if files were written in the directory. |
| outputs_ref = None |
| cold = [] |
| hot = [] |
| start = time.time() |
| |
| if fs.isdir(out_dir) and fs.listdir(out_dir): |
| with tools.Profiler('ArchiveOutput'): |
| try: |
| results, f_cold, f_hot = isolateserver.archive_files_to_storage( |
| storage, [out_dir], None, verify_push=True) |
| outputs_ref = { |
| 'isolated': list(results.values())[0], |
| 'isolatedserver': storage.server_ref.url, |
| 'namespace': storage.server_ref.namespace, |
| } |
| cold = sorted(i.size for i in f_cold) |
| hot = sorted(i.size for i in f_hot) |
| except isolateserver.Aborted: |
| # This happens when a signal SIGTERM was received while uploading data. |
| # There is 2 causes: |
| # - The task was too slow and was about to be killed anyway due to |
| # exceeding the hard timeout. |
| # - The amount of data uploaded back is very large and took too much |
| # time to archive. |
| sys.stderr.write('Received SIGTERM while uploading') |
| # Re-raise, so it will be treated as an internal failure. |
| raise |
| |
| success = False |
| try: |
| if (not leak_temp_dir and fs.isdir(out_dir) and |
| not file_path.rmtree(out_dir)): |
| logging.error('Had difficulties removing out_dir %s', out_dir) |
| else: |
| success = True |
| except OSError as e: |
| # When this happens, it means there's a process error. |
| logging.exception('Had difficulties removing out_dir %s: %s', out_dir, e) |
| stats = { |
| 'duration': time.time() - start, |
| 'items_cold': base64.b64encode(large.pack(cold)).decode(), |
| 'items_hot': base64.b64encode(large.pack(hot)).decode(), |
| } |
| return outputs_ref, success, stats |
| |
| |
| def map_and_run(data, constant_run_path): |
| """Runs a command with optional isolated input/output. |
| |
| Arguments: |
| - data: TaskData instance. |
| - constant_run_path: TODO |
| |
| Returns metadata about the result. |
| """ |
| |
| if data.isolate_cache: |
| download_stats = { |
| #'duration': 0., |
| 'initial_number_items': len(data.isolate_cache), |
| 'initial_size': data.isolate_cache.total_size, |
| #'items_cold': '<large.pack()>', |
| #'items_hot': '<large.pack()>', |
| } |
| else: |
| # TODO(tikuta): take stats from state.json in this case too. |
| download_stats = {} |
| |
| result = { |
| 'duration': None, |
| 'exit_code': None, |
| 'had_hard_timeout': False, |
| 'internal_failure': 'run_isolated did not complete properly', |
| 'stats': { |
| #'cipd': { |
| # 'duration': 0., |
| # 'get_client_duration': 0., |
| #}, |
| 'isolated': { |
| 'download': download_stats, |
| #'upload': { |
| # 'duration': 0., |
| # 'items_cold': '<large.pack()>', |
| # 'items_hot': '<large.pack()>', |
| #}, |
| }, |
| }, |
| #'cipd_pins': { |
| # 'packages': [ |
| # {'package_name': ..., 'version': ..., 'path': ...}, |
| # ... |
| # ], |
| # 'client_package': {'package_name': ..., 'version': ...}, |
| #}, |
| 'outputs_ref': None, |
| 'version': 5, |
| } |
| |
| if data.root_dir: |
| file_path.ensure_tree(data.root_dir, 0o700) |
| elif data.use_go_isolated: |
| data = data._replace(root_dir=os.path.dirname(data.go_cache_dir)) |
| elif data.isolate_cache.cache_dir: |
| data = data._replace( |
| root_dir=os.path.dirname(data.isolate_cache.cache_dir)) |
| # See comment for these constants. |
| # If root_dir is not specified, it is not constant. |
| # TODO(maruel): This is not obvious. Change this to become an error once we |
| # make the constant_run_path an exposed flag. |
| if constant_run_path and data.root_dir: |
| run_dir = os.path.join(data.root_dir, ISOLATED_RUN_DIR) |
| if os.path.isdir(run_dir): |
| file_path.rmtree(run_dir) |
| os.mkdir(run_dir, 0o700) |
| else: |
| run_dir = make_temp_dir(ISOLATED_RUN_DIR, data.root_dir) |
| # storage should be normally set but don't crash if it is not. This can happen |
| # as Swarming task can run without an isolate server. |
| out_dir = make_temp_dir( |
| ISOLATED_OUT_DIR, data.root_dir) if data.storage else None |
| tmp_dir = make_temp_dir(ISOLATED_TMP_DIR, data.root_dir) |
| isolated_client_dir = make_temp_dir(ISOLATED_CLIENT_DIR, data.root_dir) |
| cwd = run_dir |
| if data.relative_cwd: |
| cwd = os.path.normpath(os.path.join(cwd, data.relative_cwd)) |
| command = data.command |
| try: |
| with data.install_packages_fn(run_dir, isolated_client_dir) as cipd_info: |
| if cipd_info: |
| result['stats']['cipd'] = cipd_info.stats |
| result['cipd_pins'] = cipd_info.pins |
| |
| if data.isolated_hash: |
| isolated_stats = result['stats'].setdefault('isolated', {}) |
| if data.use_go_isolated: |
| bundle, stats = _fetch_and_map_with_go( |
| isolated_hash=data.isolated_hash, |
| storage=data.storage, |
| outdir=run_dir, |
| go_cache_dir=data.go_cache_dir, |
| policies=data.go_cache_policies, |
| isolated_client=os.path.join(isolated_client_dir, |
| 'isolated' + cipd.EXECUTABLE_SUFFIX)) |
| else: |
| bundle, stats = fetch_and_map( |
| isolated_hash=data.isolated_hash, |
| storage=data.storage, |
| cache=data.isolate_cache, |
| outdir=run_dir) |
| isolated_stats['download'].update(stats) |
| |
| # Inject the command |
| if not command and bundle.command: |
| command = bundle.command + data.extra_args |
| # Only set the relative directory if the isolated file specified a |
| # command, and no raw command was specified. |
| if bundle.relative_cwd: |
| cwd = os.path.normpath(os.path.join(cwd, bundle.relative_cwd)) |
| |
| if not command: |
| # Handle this as a task failure, not an internal failure. |
| sys.stderr.write( |
| '<No command was specified!>\n' |
| '<Please secify a command when triggering your Swarming task>\n') |
| result['exit_code'] = 1 |
| return result |
| |
| if not cwd.startswith(run_dir): |
| # Handle this as a task failure, not an internal failure. This is a |
| # 'last chance' way to gate against directory escape. |
| sys.stderr.write('<Relative CWD is outside of run directory!>\n') |
| result['exit_code'] = 1 |
| return result |
| |
| if not os.path.isdir(cwd): |
| # Accepts relative_cwd that does not exist. |
| os.makedirs(cwd, 0o700) |
| |
| # If we have an explicit list of files to return, make sure their |
| # directories exist now. |
| if data.storage and data.outputs: |
| isolateserver.create_directories(run_dir, data.outputs) |
| |
| with data.install_named_caches(run_dir): |
| sys.stdout.flush() |
| start = time.time() |
| try: |
| # Need to switch the default account before 'get_command_env' call, |
| # so it can grab correct value of LUCI_CONTEXT env var. |
| with set_luci_context_account(data.switch_to_account, tmp_dir): |
| env = get_command_env( |
| tmp_dir, cipd_info, run_dir, data.env, data.env_prefix, out_dir, |
| data.bot_file) |
| command = tools.find_executable(command, env) |
| command = process_command(command, out_dir, data.bot_file) |
| file_path.ensure_command_has_abs_path(command, cwd) |
| |
| result['exit_code'], result['had_hard_timeout'] = run_command( |
| command, cwd, env, data.hard_timeout, data.grace_period, |
| data.lower_priority, data.containment) |
| finally: |
| result['duration'] = max(time.time() - start, 0) |
| |
| # We successfully ran the command, set internal_failure back to |
| # None (even if the command failed, it's not an internal error). |
| result['internal_failure'] = None |
| except Exception as e: |
| # An internal error occurred. Report accordingly so the swarming task will |
| # be retried automatically. |
| logging.exception('internal failure: %s', e) |
| result['internal_failure'] = str(e) |
| on_error.report(None) |
| |
| # Clean up |
| finally: |
| try: |
| # Try to link files to the output directory, if specified. |
| if out_dir: |
| link_outputs_to_outdir(run_dir, out_dir, data.outputs) |
| |
| success = False |
| if data.leak_temp_dir: |
| success = True |
| logging.warning( |
| 'Deliberately leaking %s for later examination', run_dir) |
| else: |
| # On Windows rmtree(run_dir) call above has a synchronization effect: it |
| # finishes only when all task child processes terminate (since a running |
| # process locks *.exe file). Examine out_dir only after that call |
| # completes (since child processes may write to out_dir too and we need |
| # to wait for them to finish). |
| for directory in (run_dir, tmp_dir, isolated_client_dir): |
| if not fs.isdir(directory): |
| continue |
| try: |
| success = file_path.rmtree(directory) |
| except OSError as e: |
| logging.error('rmtree(%r) failed: %s', directory, e) |
| success = False |
| if not success: |
| sys.stderr.write( |
| OUTLIVING_ZOMBIE_MSG % (directory, data.grace_period)) |
| if result['exit_code'] == 0: |
| result['exit_code'] = 1 |
| |
| # This deletes out_dir if leak_temp_dir is not set. |
| if out_dir: |
| isolated_stats = result['stats'].setdefault('isolated', {}) |
| result['outputs_ref'], success, isolated_stats['upload'] = ( |
| upload_then_delete(data.storage, out_dir, data.leak_temp_dir)) |
| if not success and result['exit_code'] == 0: |
| result['exit_code'] = 1 |
| except Exception as e: |
| # Swallow any exception in the main finally clause. |
| if out_dir: |
| logging.exception('Leaking out_dir %s: %s', out_dir, e) |
| result['internal_failure'] = str(e) |
| on_error.report(None) |
| return result |
| |
| |
| def run_tha_test(data, result_json): |
| """Runs an executable and records execution metadata. |
| |
| If isolated_hash is specified, downloads the dependencies in the cache, |
| hardlinks them into a temporary directory and runs the command specified in |
| the .isolated. |
| |
| A temporary directory is created to hold the output files. The content inside |
| this directory will be uploaded back to |storage| packaged as a .isolated |
| file. |
| |
| Arguments: |
| - data: TaskData instance. |
| - result_json: File path to dump result metadata into. If set, the process |
| exit code is always 0 unless an internal error occurred. |
| |
| Returns: |
| Process exit code that should be used. |
| """ |
| if result_json: |
| # Write a json output file right away in case we get killed. |
| result = { |
| 'exit_code': None, |
| 'had_hard_timeout': False, |
| 'internal_failure': 'Was terminated before completion', |
| 'outputs_ref': None, |
| 'version': 5, |
| } |
| tools.write_json(result_json, result, dense=True) |
| |
| # run_isolated exit code. Depends on if result_json is used or not. |
| result = map_and_run(data, True) |
| logging.info('Result:\n%s', tools.format_json(result, dense=True)) |
| |
| if result_json: |
| # We've found tests to delete 'work' when quitting, causing an exception |
| # here. Try to recreate the directory if necessary. |
| file_path.ensure_tree(os.path.dirname(result_json)) |
| tools.write_json(result_json, result, dense=True) |
| # Only return 1 if there was an internal error. |
| return int(bool(result['internal_failure'])) |
| |
| # Marshall into old-style inline output. |
| if result['outputs_ref']: |
| # pylint: disable=unsubscriptable-object |
| data = { |
| 'hash': result['outputs_ref']['isolated'], |
| 'namespace': result['outputs_ref']['namespace'], |
| 'storage': result['outputs_ref']['isolatedserver'], |
| } |
| sys.stdout.flush() |
| print('[run_isolated_out_hack]%s[/run_isolated_out_hack]' % |
| tools.format_json(data, dense=True)) |
| sys.stdout.flush() |
| return result['exit_code'] or int(bool(result['internal_failure'])) |
| |
| |
| # Yielded by 'install_client_and_packages'. |
| CipdInfo = collections.namedtuple('CipdInfo', [ |
| 'client', # cipd.CipdClient object |
| 'cache_dir', # absolute path to bot-global cipd tag and instance cache |
| 'stats', # dict with stats to return to the server |
| 'pins', # dict with installed cipd pins to return to the server |
| ]) |
| |
| |
| @contextlib.contextmanager |
| def noop_install_packages(_run_dir, _isolated_dir): |
| """Placeholder for 'install_client_and_packages' if cipd is disabled.""" |
| yield None |
| |
| |
| def _install_packages(run_dir, cipd_cache_dir, client, packages): |
| """Calls 'cipd ensure' for packages. |
| |
| Args: |
| run_dir (str): root of installation. |
| cipd_cache_dir (str): the directory to use for the cipd package cache. |
| client (CipdClient): the cipd client to use |
| packages: packages to install, list [(path, package_name, version), ...]. |
| |
| Returns: list of pinned packages. Looks like [ |
| { |
| 'path': 'subdirectory', |
| 'package_name': 'resolved/package/name', |
| 'version': 'deadbeef...', |
| }, |
| ... |
| ] |
| """ |
| package_pins = [None]*len(packages) |
| def insert_pin(path, name, version, idx): |
| package_pins[idx] = { |
| 'package_name': name, |
| # swarming deals with 'root' as '.' |
| 'path': path or '.', |
| 'version': version, |
| } |
| |
| by_path = collections.defaultdict(list) |
| for i, (path, name, version) in enumerate(packages): |
| # cipd deals with 'root' as '' |
| if path == '.': |
| path = '' |
| by_path[path].append((name, version, i)) |
| |
| pins = client.ensure( |
| run_dir, |
| { |
| subdir: [(name, vers) for name, vers, _ in pkgs |
| ] for subdir, pkgs in by_path.items() |
| }, |
| cache_dir=cipd_cache_dir, |
| ) |
| |
| for subdir, pin_list in sorted(pins.items()): |
| this_subdir = by_path[subdir] |
| for i, (name, version) in enumerate(pin_list): |
| insert_pin(subdir, name, version, this_subdir[i][2]) |
| |
| assert None not in package_pins, (packages, pins, package_pins) |
| |
| return package_pins |
| |
| |
| @contextlib.contextmanager |
| def install_client_and_packages(run_dir, packages, service_url, |
| client_package_name, client_version, cache_dir, |
| isolated_dir): |
| """Bootstraps CIPD client and installs CIPD packages. |
| |
| Yields CipdClient, stats, client info and pins (as single CipdInfo object). |
| |
| Pins and the CIPD client info are in the form of: |
| [ |
| { |
| "path": path, "package_name": package_name, "version": version, |
| }, |
| ... |
| ] |
| (the CIPD client info is a single dictionary instead of a list) |
| |
| such that they correspond 1:1 to all input package arguments from the command |
| line. These dictionaries make their all the way back to swarming, where they |
| become the arguments of CipdPackage. |
| |
| If 'packages' list is empty, will bootstrap CIPD client, but won't install |
| any packages. |
| |
| The bootstrapped client (regardless whether 'packages' list is empty or not), |
| will be made available to the task via $PATH. |
| |
| Args: |
| run_dir (str): root of installation. |
| packages: packages to install, list [(path, package_name, version), ...]. |
| service_url (str): CIPD server url, e.g. |
| "https://chrome-infra-packages.appspot.com." |
| client_package_name (str): CIPD package name of CIPD client. |
| client_version (str): Version of CIPD client. |
| cache_dir (str): where to keep cache of cipd clients, packages and tags. |
| isolated_dir (str): where to download isolated client. |
| """ |
| assert cache_dir |
| |
| start = time.time() |
| |
| cache_dir = os.path.abspath(cache_dir) |
| cipd_cache_dir = os.path.join(cache_dir, 'cache') # tag and instance caches |
| run_dir = os.path.abspath(run_dir) |
| packages = packages or [] |
| |
| get_client_start = time.time() |
| client_manager = cipd.get_client(service_url, client_package_name, |
| client_version, cache_dir) |
| |
| with client_manager as client: |
| get_client_duration = time.time() - get_client_start |
| |
| package_pins = [] |
| if packages: |
| package_pins = _install_packages(run_dir, cipd_cache_dir, client, |
| packages) |
| |
| # Install isolated client to |isolated_dir|. |
| _install_packages(isolated_dir, cipd_cache_dir, client, |
| [('', ISOLATED_PACKAGE, ISOLATED_REVISION)]) |
| |
| file_path.make_tree_files_read_only(run_dir) |
| |
| total_duration = time.time() - start |
| logging.info('Installing CIPD client and packages took %d seconds', |
| total_duration) |
| |
| yield CipdInfo( |
| client=client, |
| cache_dir=cipd_cache_dir, |
| stats={ |
| 'duration': total_duration, |
| 'get_client_duration': get_client_duration, |
| }, |
| pins={ |
| 'client_package': { |
| 'package_name': client.package_name, |
| 'version': client.instance_id, |
| }, |
| 'packages': package_pins, |
| }) |
| |
| |
| def create_option_parser(): |
| parser = logging_utils.OptionParserWithLogging( |
| usage='%prog <options> [command to run or extra args]', |
| version=__version__, |
| log_file=RUN_ISOLATED_LOG_FILE) |
| parser.add_option( |
| '--clean', |
| action='store_true', |
| help='Cleans the cache, trimming it necessary and remove corrupted items ' |
| 'and returns without executing anything; use with -v to know what ' |
| 'was done') |
| parser.add_option( |
| '--json', |
| help='dump output metadata to json file. When used, run_isolated returns ' |
| 'non-zero only on internal failure') |
| parser.add_option( |
| '--hard-timeout', type='float', help='Enforce hard timeout in execution') |
| parser.add_option( |
| '--grace-period', |
| type='float', |
| help='Grace period between SIGTERM and SIGKILL') |
| parser.add_option( |
| '--raw-cmd', |
| action='store_true', |
| help='Ignore the isolated command, use the one supplied at the command ' |
| 'line') |
| parser.add_option( |
| '--relative-cwd', |
| help='Ignore the isolated \'relative_cwd\' and use this one instead; ' |
| 'requires --raw-cmd') |
| parser.add_option( |
| '--env', |
| default=[], |
| action='append', |
| help='Environment variables to set for the child process') |
| parser.add_option( |
| '--env-prefix', |
| default=[], |
| action='append', |
| help='Specify a VAR=./path/fragment to put in the environment variable ' |
| 'before executing the command. The path fragment must be relative ' |
| 'to the isolated run directory, and must not contain a `..` token. ' |
| 'The path will be made absolute and prepended to the indicated ' |
| '$VAR using the OS\'s path separator. Multiple items for the same ' |
| '$VAR will be prepended in order.') |
| parser.add_option( |
| '--bot-file', |
| help='Path to a file describing the state of the host. The content is ' |
| 'defined by on_before_task() in bot_config.') |
| parser.add_option( |
| '--switch-to-account', |
| help='If given, switches LUCI_CONTEXT to given logical service account ' |
| '(e.g. "task" or "system") before launching the isolated process.') |
| parser.add_option( |
| '--output', |
| action='append', |
| help='Specifies an output to return. If no outputs are specified, all ' |
| 'files located in $(ISOLATED_OUTDIR) will be returned; ' |
| 'otherwise, outputs in both $(ISOLATED_OUTDIR) and those ' |
| 'specified by --output option (there can be multiple) will be ' |
| 'returned. Note that if a file in OUT_DIR has the same path ' |
| 'as an --output option, the --output version will be returned.') |
| parser.add_option( |
| '-a', |
| '--argsfile', |
| # This is actually handled in parse_args; it's included here purely so it |
| # can make it into the help text. |
| help='Specify a file containing a JSON array of arguments to this ' |
| 'script. If --argsfile is provided, no other argument may be ' |
| 'provided on the command line.') |
| parser.add_option( |
| '--report-on-exception', |
| action='store_true', |
| help='Whether report exception during execution to isolate server. ' |
| 'This flag should only be used in swarming bot.') |
| |
| group = optparse.OptionGroup(parser, 'Data source') |
| group.add_option( |
| '-s', '--isolated', |
| help='Hash of the .isolated to grab from the isolate server.') |
| isolateserver.add_isolate_server_options(group) |
| parser.add_option_group(group) |
| |
| isolateserver.add_cache_options(parser) |
| |
| cipd.add_cipd_options(parser) |
| |
| group = optparse.OptionGroup(parser, 'Named caches') |
| group.add_option( |
| '--named-cache', |
| dest='named_caches', |
| action='append', |
| nargs=3, |
| default=[], |
| help='A named cache to request. Accepts 3 arguments: name, path, hint. ' |
| 'name identifies the cache, must match regex [a-z0-9_]{1,4096}. ' |
| 'path is a path relative to the run dir where the cache directory ' |
| 'must be put to. ' |
| 'This option can be specified more than once.') |
| group.add_option( |
| '--named-cache-root', |
| default='named_caches', |
| help='Cache root directory. Default=%default') |
| parser.add_option_group(group) |
| |
| group = optparse.OptionGroup(parser, 'Process containment') |
| parser.add_option( |
| '--lower-priority', action='store_true', |
| help='Lowers the child process priority') |
| parser.add_option( |
| '--containment-type', |
| choices=('NONE', 'AUTO', 'JOB_OBJECT'), |
| default='NONE', |
| help='Type of container to use') |
| parser.add_option( |
| '--limit-processes', |
| type='int', |
| default=0, |
| help='Maximum number of active processes in the containment') |
| parser.add_option( |
| '--limit-total-committed-memory', |
| type='int', |
| default=0, |
| help='Maximum sum of committed memory in the containment') |
| parser.add_option_group(group) |
| |
| group = optparse.OptionGroup(parser, 'Debugging') |
| group.add_option( |
| '--leak-temp-dir', |
| action='store_true', |
| help='Deliberately leak isolate\'s temp dir for later examination. ' |
| 'Default: %default') |
| group.add_option('--root-dir', help='Use a directory instead of a random one') |
| parser.add_option_group(group) |
| |
| auth.add_auth_options(parser) |
| |
| parser.set_defaults(cache='cache') |
| return parser |
| |
| |
| def process_named_cache_options(parser, options, time_fn=None): |
| """Validates named cache options and returns a CacheManager.""" |
| if options.named_caches and not options.named_cache_root: |
| parser.error('--named-cache is specified, but --named-cache-root is empty') |
| for name, path, hint in options.named_caches: |
| if not CACHE_NAME_RE.match(name): |
| parser.error( |
| 'cache name %r does not match %r' % (name, CACHE_NAME_RE.pattern)) |
| if not path: |
| parser.error('cache path cannot be empty') |
| try: |
| int(hint) |
| except ValueError: |
| parser.error('cache hint must be a number') |
| if options.named_cache_root: |
| # Make these configurable later if there is use case but for now it's fairly |
| # safe values. |
| # In practice, a fair chunk of bots are already recycled on a daily schedule |
| # so this code doesn't have any effect to them, unless they are preloaded |
| # with a really old cache. |
| policies = local_caching.CachePolicies( |
| # 1TiB. |
| max_cache_size=1024*1024*1024*1024, |
| min_free_space=options.min_free_space, |
| max_items=50, |
| max_age_secs=MAX_AGE_SECS) |
| root_dir = six.text_type(os.path.abspath(options.named_cache_root)) |
| cache = local_caching.NamedCache(root_dir, policies, time_fn=time_fn) |
| # Touch any named caches we're going to use to minimize thrashing |
| # between tasks that request some (but not all) of the same named caches. |
| cache.touch(*[name for name, _, _ in options.named_caches]) |
| return cache |
| return None |
| |
| |
| def parse_args(args): |
| # Create a fake mini-parser just to get out the "-a" command. Note that |
| # it's not documented here; instead, it's documented in create_option_parser |
| # even though that parser will never actually get to parse it. This is |
| # because --argsfile is exclusive with all other options and arguments. |
| file_argparse = argparse.ArgumentParser(add_help=False) |
| file_argparse.add_argument('-a', '--argsfile') |
| (file_args, nonfile_args) = file_argparse.parse_known_args(args) |
| if file_args.argsfile: |
| if nonfile_args: |
| file_argparse.error('Can\'t specify --argsfile with' |
| 'any other arguments (%s)' % nonfile_args) |
| try: |
| with open(file_args.argsfile, 'r') as f: |
| args = json.load(f) |
| except (IOError, OSError, ValueError) as e: |
| # We don't need to error out here - "args" is now empty, |
| # so the call below to parser.parse_args(args) will fail |
| # and print the full help text. |
| print('Couldn\'t read arguments: %s' % e, file=sys.stderr) |
| |
| # Even if we failed to read the args, just call the normal parser now since it |
| # will print the correct help message. |
| parser = create_option_parser() |
| options, args = parser.parse_args(args) |
| if not isinstance(options.cipd_enabled, (bool, int)): |
| options.cipd_enabled = distutils.util.strtobool(options.cipd_enabled) |
| return (parser, options, args) |
| |
| |
| def _calc_named_cache_hint(named_cache, named_caches): |
| """Returns the expected size of the missing named caches.""" |
| present = named_cache.available |
| size = 0 |
| for name, _, hint in named_caches: |
| if name not in present: |
| hint = int(hint) |
| if hint > 0: |
| size += hint |
| return size |
| |
| |
| def main(args): |
| # Warning: when --argsfile is used, the strings are unicode instances, when |
| # parsed normally, the strings are str instances. |
| (parser, options, args) = parse_args(args) |
| |
| if options.report_on_exception and options.isolate_server: |
| on_error.report_on_exception_exit(options.isolate_server) |
| |
| if not file_path.enable_symlink(): |
| logging.warning('Symlink support is not enabled') |
| |
| named_cache = process_named_cache_options(parser, options) |
| # hint is 0 if there's no named cache. |
| hint = _calc_named_cache_hint(named_cache, options.named_caches) |
| if hint: |
| # Increase the --min-free-space value by the hint, and recreate the |
| # NamedCache instance so it gets the updated CachePolicy. |
| options.min_free_space += hint |
| named_cache = process_named_cache_options(parser, options) |
| |
| # TODO(crbug.com/932396): Remove this. |
| use_go_isolated = options.cipd_enabled |
| |
| # TODO(maruel): CIPD caches should be defined at an higher level here too, so |
| # they can be cleaned the same way. |
| if use_go_isolated and not options.clean: |
| isolate_cache = None |
| else: |
| isolate_cache = isolateserver.process_cache_options(options, trim=False) |
| |
| caches = [] |
| if isolate_cache: |
| caches.append(isolate_cache) |
| if named_cache: |
| caches.append(named_cache) |
| root = caches[0].cache_dir if caches else six.text_type(os.getcwd()) |
| if options.clean: |
| if options.isolated: |
| parser.error('Can\'t use --isolated with --clean.') |
| if options.isolate_server: |
| parser.error('Can\'t use --isolate-server with --clean.') |
| if options.json: |
| parser.error('Can\'t use --json with --clean.') |
| if options.named_caches: |
| parser.error('Can\t use --named-cache with --clean.') |
| # Trim first, then clean. |
| local_caching.trim_caches( |
| caches, |
| root, |
| min_free_space=options.min_free_space, |
| max_age_secs=MAX_AGE_SECS) |
| for c in caches: |
| c.cleanup() |
| return 0 |
| |
| # Trim must still be done for the following case: |
| # - named-cache was used |
| # - some entries, with a large hint, where missing |
| # - --min-free-space was increased accordingly, thus trimming is needed |
| # Otherwise, this will have no effect, as bot_main calls run_isolated with |
| # --clean after each task. |
| if hint: |
| logging.info('Additional trimming of %d bytes', hint) |
| local_caching.trim_caches( |
| caches, |
| root, |
| min_free_space=options.min_free_space, |
| max_age_secs=MAX_AGE_SECS) |
| |
| if not options.isolated and not args: |
| parser.error('--isolated or command to run is required.') |
| |
| auth.process_auth_options(parser, options) |
| |
| isolateserver.process_isolate_server_options(parser, options, False) |
| if not options.isolate_server: |
| if options.isolated: |
| parser.error('--isolated requires --isolate-server') |
| if ISOLATED_OUTDIR_PARAMETER in args: |
| parser.error( |
| '%s in args requires --isolate-server' % ISOLATED_OUTDIR_PARAMETER) |
| |
| if options.root_dir: |
| options.root_dir = six.text_type(os.path.abspath(options.root_dir)) |
| if options.json: |
| options.json = six.text_type(os.path.abspath(options.json)) |
| |
| if any('=' not in i for i in options.env): |
| parser.error( |
| '--env required key=value form. value can be skipped to delete ' |
| 'the variable') |
| options.env = dict(i.split('=', 1) for i in options.env) |
| |
| prefixes = {} |
| cwd = os.path.realpath(os.getcwd()) |
| for item in options.env_prefix: |
| if '=' not in item: |
| parser.error( |
| '--env-prefix %r is malformed, must be in the form `VAR=./path`' |
| % item) |
| key, opath = item.split('=', 1) |
| if os.path.isabs(opath): |
| parser.error('--env-prefix %r path is bad, must be relative.' % opath) |
| opath = os.path.normpath(opath) |
| if not os.path.realpath(os.path.join(cwd, opath)).startswith(cwd): |
| parser.error( |
| '--env-prefix %r path is bad, must be relative and not contain `..`.' |
| % opath) |
| prefixes.setdefault(key, []).append(opath) |
| options.env_prefix = prefixes |
| |
| cipd.validate_cipd_options(parser, options) |
| |
| install_packages_fn = noop_install_packages |
| tmp_cipd_cache_dir = None |
| if options.cipd_enabled: |
| cache_dir = options.cipd_cache |
| if not cache_dir: |
| tmp_cipd_cache_dir = six.text_type(tempfile.mkdtemp()) |
| cache_dir = tmp_cipd_cache_dir |
| install_packages_fn = ( |
| lambda run_dir, isolated_dir: install_client_and_packages( |
| run_dir, |
| cipd.parse_package_args(options.cipd_packages), |
| options.cipd_server, |
| options.cipd_client_package, |
| options.cipd_client_version, |
| cache_dir=cache_dir, |
| isolated_dir=isolated_dir)) |
| |
| @contextlib.contextmanager |
| def install_named_caches(run_dir): |
| # WARNING: this function depends on "options" variable defined in the outer |
| # function. |
| assert six.text_type(run_dir), repr(run_dir) |
| assert os.path.isabs(run_dir), run_dir |
| named_caches = [(os.path.join(run_dir, six.text_type(relpath)), name) |
| for name, relpath, _ in options.named_caches] |
| for path, name in named_caches: |
| named_cache.install(path, name) |
| try: |
| yield |
| finally: |
| # Uninstall each named cache, returning it to the cache pool. If an |
| # uninstall fails for a given cache, it will remain in the task's |
| # temporary space, get cleaned up by the Swarming bot, and be lost. |
| # |
| # If the Swarming bot cannot clean up the cache, it will handle it like |
| # any other bot file that could not be removed. |
| for path, name in reversed(named_caches): |
| try: |
| # uninstall() doesn't trim but does call save() implicitly. Trimming |
| # *must* be done manually via periodic 'run_isolated.py --clean'. |
| named_cache.uninstall(path, name) |
| except local_caching.NamedCacheError: |
| logging.exception('Error while removing named cache %r at %r. ' |
| 'The cache will be lost.', path, name) |
| |
| extra_args = [] |
| command = [] |
| if options.raw_cmd: |
| command = args |
| if options.relative_cwd: |
| a = os.path.normpath(os.path.abspath(options.relative_cwd)) |
| if not a.startswith(os.getcwd()): |
| parser.error( |
| '--relative-cwd must not try to escape the working directory') |
| else: |
| if options.relative_cwd: |
| parser.error('--relative-cwd requires --raw-cmd') |
| extra_args = args |
| |
| containment_type = subprocess42.Containment.NONE |
| if options.containment_type == 'AUTO': |
| containment_type = subprocess42.Containment.AUTO |
| if options.containment_type == 'JOB_OBJECT': |
| containment_type = subprocess42.Containment.JOB_OBJECT |
| containment = subprocess42.Containment( |
| containment_type=containment_type, |
| limit_processes=options.limit_processes, |
| limit_total_committed_memory=options.limit_total_committed_memory) |
| |
| data = TaskData( |
| command=command, |
| relative_cwd=options.relative_cwd, |
| extra_args=extra_args, |
| isolated_hash=options.isolated, |
| storage=None, |
| isolate_cache=isolate_cache, |
| outputs=options.output, |
| install_named_caches=install_named_caches, |
| leak_temp_dir=options.leak_temp_dir, |
| root_dir=_to_unicode(options.root_dir), |
| hard_timeout=options.hard_timeout, |
| grace_period=options.grace_period, |
| bot_file=options.bot_file, |
| switch_to_account=options.switch_to_account, |
| install_packages_fn=install_packages_fn, |
| use_go_isolated=use_go_isolated, |
| go_cache_dir=options.cache, |
| go_cache_policies=local_caching.CachePolicies( |
| max_cache_size=options.max_cache_size, |
| min_free_space=options.min_free_space, |
| max_items=options.max_items, |
| max_age_secs=None, |
| ), |
| env=options.env, |
| env_prefix=options.env_prefix, |
| lower_priority=bool(options.lower_priority), |
| containment=containment) |
| try: |
| if options.isolate_server: |
| server_ref = isolate_storage.ServerRef( |
| options.isolate_server, options.namespace) |
| storage = isolateserver.get_storage(server_ref) |
| with storage: |
| data = data._replace(storage=storage) |
| # Hashing schemes used by |storage| and |isolate_cache| MUST match. |
| assert storage.server_ref.hash_algo == server_ref.hash_algo |
| return run_tha_test(data, options.json) |
| return run_tha_test(data, options.json) |
| except (cipd.Error, local_caching.NamedCacheError, |
| local_caching.NoMoreSpace) as ex: |
| print(ex.message, file=sys.stderr) |
| return 1 |
| finally: |
| if tmp_cipd_cache_dir is not None: |
| try: |
| file_path.rmtree(tmp_cipd_cache_dir) |
| except OSError: |
| logging.exception('Remove tmp_cipd_cache_dir=%s failed', |
| tmp_cipd_cache_dir) |
| # Best effort clean up. Failed to do so doesn't affect the outcome. |
| |
| |
| if __name__ == '__main__': |
| subprocess42.inhibit_os_error_reporting() |
| # Ensure that we are always running with the correct encoding. |
| fix_encoding.fix_encoding() |
| net.set_user_agent('run_isolated.py/' + __version__) |
| sys.exit(main(sys.argv[1:])) |