| #!/usr/bin/env python2 |
| # Copyright 2013 Google Inc. All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| import cPickle |
| import errno |
| import gzip |
| import json |
| import multiprocessing |
| import optparse |
| import os |
| import re |
| import shutil |
| import signal |
| import subprocess |
| import sys |
| import tempfile |
| import thread |
| import threading |
| import time |
| import zlib |
| |
| # An object that catches SIGINT sent to the Python process and notices |
| # if processes passed to wait() die by SIGINT (we need to look for |
| # both of those cases, because pressing Ctrl+C can result in either |
| # the main process or one of the subprocesses getting the signal). |
| # |
| # Before a SIGINT is seen, wait(p) will simply call p.wait() and |
| # return the result. Once a SIGINT has been seen (in the main process |
| # or a subprocess, including the one the current call is waiting for), |
| # wait(p) will call p.terminate() and raise ProcessWasInterrupted. |
| class SigintHandler(object): |
| class ProcessWasInterrupted(Exception): pass |
  sigint_returncodes = {-signal.SIGINT, # Unix
                        -1073741510, # Windows (STATUS_CONTROL_C_EXIT)
                        }
| def __init__(self): |
| self.__lock = threading.Lock() |
| self.__processes = set() |
| self.__got_sigint = False |
| signal.signal(signal.SIGINT, lambda signal_num, frame: self.interrupt()) |
| def __on_sigint(self): |
| self.__got_sigint = True |
| while self.__processes: |
| try: |
| self.__processes.pop().terminate() |
| except OSError: |
| pass |
| def interrupt(self): |
| with self.__lock: |
| self.__on_sigint() |
| def got_sigint(self): |
| with self.__lock: |
| return self.__got_sigint |
| def wait(self, p): |
| with self.__lock: |
| if self.__got_sigint: |
| p.terminate() |
| self.__processes.add(p) |
| code = p.wait() |
| with self.__lock: |
| self.__processes.discard(p) |
| if code in self.sigint_returncodes: |
| self.__on_sigint() |
| if self.__got_sigint: |
| raise self.ProcessWasInterrupted |
| return code |
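# Process-wide singleton; constructing it installs the SIGINT handler for the
# whole script.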
| sigint_handler = SigintHandler() |
| |
| # Return the width of the terminal, or None if it couldn't be |
| # determined (e.g. because we're not being run interactively). |
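# For example, term_width(sys.stdout) is typically 80 in an interactive
# terminal and None when the output is piped.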
| def term_width(out): |
| if not out.isatty(): |
| return None |
| try: |
| p = subprocess.Popen(["stty", "size"], |
| stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
    (stdout, stderr) = p.communicate()
    if p.returncode != 0 or stderr:
      return None
    return int(stdout.split()[1])
| except (IndexError, OSError, ValueError): |
| return None |
| |
# Output transient and permanent lines of text. If several transient
# lines are written in sequence, each new one overwrites the previous
# one. We use this to ensure that lots of unimportant info (tests
# passing) won't drown out important info (tests failing).
| class Outputter(object): |
| def __init__(self, out_file): |
| self.__out_file = out_file |
| self.__previous_line_was_transient = False |
| self.__width = term_width(out_file) # Line width, or None if not a tty. |
| def transient_line(self, msg): |
| if self.__width is None: |
| self.__out_file.write(msg + "\n") |
| else: |
| self.__out_file.write("\r" + msg[:self.__width].ljust(self.__width)) |
| self.__previous_line_was_transient = True |
| def flush_transient_output(self): |
| if self.__previous_line_was_transient: |
| self.__out_file.write("\n") |
| self.__previous_line_was_transient = False |
| def permanent_line(self, msg): |
| self.flush_transient_output() |
| self.__out_file.write(msg + "\n") |
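
# Typical Outputter use (illustrative):
#   out = Outputter(sys.stdout)
#   out.transient_line("[1/10] Foo.Bar (12 ms)")  # overwritten by the next one
#   out.permanent_line("[2/10] Foo.Baz failed")   # stays on screen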
| |
| |
| class Task(object): |
| """Stores information about a task (single execution of a test). |
| |
| This class stores information about the test to be executed (gtest binary and |
| test name), and its result (log file, exit code and runtime). |
| Each task is uniquely identified by the gtest binary, the test name and an |
| execution number that increases each time the test is executed. |
  Additionally, we store the last execution time so that on the next run the
  slowest tests are started first.
| """ |
| def __init__(self, test_binary, test_name, test_command, |
| last_execution_time, output_dir): |
| self.test_name = test_name |
| self.output_dir = output_dir |
| self.test_binary = test_binary |
| self.test_command = test_command |
| self.last_execution_time = last_execution_time |
| |
| self.test_id = (test_binary, test_name) |
| |
| self.execution_number = task_manager.get_next_execution_number(self.test_id) |
| log_name = '%s-%s-%s.log' % (self.__normalize(test_binary), |
| self.__normalize(test_name), |
| self.execution_number) |
| |
| self.log_file = os.path.join(output_dir, log_name) |
| self.task_id = (test_binary, test_name, self.execution_number) |
| |
| self.exit_code = None |
| self.runtime_ms = None |
| |
| def __lt__(self, other): |
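    """Order tasks with no recorded runtime (new or previously failed tests)
    first, then slower tests before faster ones."""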
| if self.last_execution_time is None: |
| return True |
| if other.last_execution_time is None: |
| return False |
| return self.last_execution_time > other.last_execution_time |
| |
| def __normalize(self, string): |
| return re.sub('[^A-Za-z0-9]', '_', string) |
| |
| def run(self): |
| begin = time.time() |
| with open(self.log_file, 'w') as log: |
| task = subprocess.Popen(self.test_command, stdout=log, stderr=log) |
| try: |
| self.exit_code = sigint_handler.wait(task) |
| except sigint_handler.ProcessWasInterrupted: |
| thread.exit() |
| self.runtime_ms = int(1000 * (time.time() - begin)) |
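    # A failing test gets no recorded time: --failed will then rerun it, and
    # the scheduler (see __lt__) starts it first on the next pass.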
| self.last_execution_time = None if self.exit_code else self.runtime_ms |
| |
| |
| class TaskManager(object): |
| """Executes the tasks and stores the passed, failed and interrupted tasks. |
| |
  When a task is run, this class keeps track of whether it passed, failed or
  was interrupted. After a task finishes, it calls the relevant functions of
  the Logger, TestResults and TestTimes classes and, in case of failure,
  retries the test as specified by the --retry_failed flag.
| """ |
| def __init__(self, times, logger, test_results): |
| self.times = times |
| self.logger = logger |
| self.test_results = test_results |
| |
| self.global_exit_code = 0 |
| |
| self.passed = [] |
| self.failed = [] |
| self.started = {} |
| self.execution_number = {} |
| |
| self.lock = threading.Lock() |
| |
| def get_next_execution_number(self, test_id): |
| with self.lock: |
| next_execution_number = self.execution_number.setdefault(test_id, 1) |
| self.execution_number[test_id] += 1 |
| return next_execution_number |
| |
| def __register_start(self, task): |
| with self.lock: |
| self.started[task.task_id] = task |
| |
| def __register_exit(self, task): |
| self.logger.log_exit(task) |
| self.times.record_test_time(task.test_binary, task.test_name, |
| task.last_execution_time) |
| if self.test_results: |
      self.test_results.log(task.test_name, task.runtime_ms,
                            "PASS" if task.exit_code == 0 else "FAIL")
| |
| with self.lock: |
| self.started.pop(task.task_id) |
| if task.exit_code == 0: |
| self.passed.append(task) |
| else: |
| self.failed.append(task) |
| |
| def run_task(self, task): |
| for try_number in range(options.retry_failed + 1): |
| self.__register_start(task) |
| task.run() |
| self.__register_exit(task) |
| |
| if task.exit_code == 0: |
| break |
| |
| if try_number < options.retry_failed: |
        # We need to create a new Task instance. Each task represents a single
        # test execution, with its own runtime, exit code and log file.
| task = Task(task.test_binary, task.test_name, task.test_command, |
| task.last_execution_time, task.output_dir) |
| |
| with self.lock: |
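      # If the test still failed after all retries, its exit code becomes the
      # script's overall exit code.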
| if task.exit_code != 0: |
| self.global_exit_code = task.exit_code |
| |
| |
| class FilterFormat(object): |
| def __init__(self, output_dir): |
| if sys.stdout.isatty(): |
| # stdout needs to be unbuffered since the output is interactive. |
| sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) |
| |
| self.output_dir = output_dir |
| |
| self.total_tasks = 0 |
| self.finished_tasks = 0 |
| self.out = Outputter(sys.stdout) |
| self.stdout_lock = threading.Lock() |
| |
| def move_to(self, destination_dir, tasks): |
| destination_dir = os.path.join(self.output_dir, destination_dir) |
| os.makedirs(destination_dir) |
| for task in tasks: |
      shutil.move(task.log_file, destination_dir)

  def print_tests(self, message, tasks):
| self.out.permanent_line("%s (%s/%s):" % |
| (message, len(tasks), self.total_tasks)) |
| for task in sorted(tasks): |
| runtime_ms = 'Interrupted' |
| if task.runtime_ms is not None: |
| runtime_ms = '%d ms' % task.runtime_ms |
| self.out.permanent_line("%11s: %s %s (try #%d)" % ( |
| runtime_ms, task.test_binary, task.test_name, task.execution_number)) |
| |
| def log_exit(self, task): |
| with self.stdout_lock: |
| self.finished_tasks += 1 |
| self.out.transient_line("[%d/%d] %s (%d ms)" |
| % (self.finished_tasks, self.total_tasks, |
| task.test_name, task.runtime_ms)) |
| if task.exit_code != 0: |
| with open(task.log_file) as f: |
        for line in f:
| self.out.permanent_line(line.rstrip()) |
| self.out.permanent_line( |
| "[%d/%d] %s returned/aborted with exit code %d (%d ms)" |
| % (self.finished_tasks, self.total_tasks, task.test_name, |
| task.exit_code, task.runtime_ms)) |
| |
| def log_tasks(self, total_tasks): |
| self.total_tasks += total_tasks |
| self.out.transient_line("[0/%d] Running tests..." % self.total_tasks) |
| |
| def flush(self): |
| self.out.flush_transient_output() |
| |
| |
| class CollectTestResults(object): |
| def __init__(self, json_dump_filepath): |
| self.test_results_lock = threading.Lock() |
| self.json_dump_file = open(json_dump_filepath, 'w') |
| self.test_results = { |
| "interrupted": False, |
| "path_delimiter": ".", |
| # Third version of the file format. See the link in the flag description |
| # for details. |
| "version": 3, |
| "seconds_since_epoch": int(time.time()), |
| "num_failures_by_type": { |
| "PASS": 0, |
| "FAIL": 0, |
| }, |
| "tests": {}, |
| } |
| |
| def log(self, test, runtime_ms, actual_result): |
| with self.test_results_lock: |
| self.test_results['num_failures_by_type'][actual_result] += 1 |
| results = self.test_results['tests'] |
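      # Walk the result tree one path component at a time (e.g. "Suite.Test"
      # becomes tests["Suite"]["Test"]), creating nodes as needed.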
| for name in test.split('.'): |
| results = results.setdefault(name, {}) |
| |
| if results: |
| results['actual'] += ' ' + actual_result |
| results['times'].append(runtime_ms) |
| else: # This is the first invocation of the test |
| results['actual'] = actual_result |
| results['times'] = [runtime_ms] |
| results['time'] = runtime_ms |
| results['expected'] = 'PASS' |
| |
| def dump_to_file_and_close(self): |
| json.dump(self.test_results, self.json_dump_file) |
| self.json_dump_file.close() |
| |
| class DummyTimer(object): |
| def start(self): |
| pass |
| def cancel(self): |
| pass |
| |
| # Record of test runtimes. Has built-in locking. |
| class TestTimes(object): |
| def __init__(self, save_file): |
| "Create new object seeded with saved test times from the given file." |
| self.__times = {} # (test binary, test name) -> runtime in ms |
| |
| # Protects calls to record_test_time(); other calls are not |
| # expected to be made concurrently. |
| self.__lock = threading.Lock() |
| |
| try: |
| with gzip.GzipFile(save_file, "rb") as f: |
| times = cPickle.load(f) |
| except (EOFError, IOError, cPickle.UnpicklingError, zlib.error): |
      # The file doesn't exist, isn't readable, or is malformed; whatever the
      # reason, just ignore it.
| return |
| |
| # Discard saved times if the format isn't right. |
| if type(times) is not dict: |
| return |
| for ((test_binary, test_name), runtime) in times.items(): |
| if (type(test_binary) is not str or type(test_name) is not str |
| or type(runtime) not in {int, long, type(None)}): |
| return |
| |
| self.__times = times |
| |
| def get_test_time(self, binary, testname): |
| """Return the last duration for the given test as an integer number of |
| milliseconds, or None if the test failed or if there's no record for it.""" |
| return self.__times.get((binary, testname), None) |
| |
| def record_test_time(self, binary, testname, runtime_ms): |
| """Record that the given test ran in the specified number of |
| milliseconds. If the test failed, runtime_ms should be None.""" |
| with self.__lock: |
| self.__times[(binary, testname)] = runtime_ms |
| |
| def write_to_file(self, save_file): |
| "Write all the times to file." |
| try: |
| with open(save_file, "wb") as f: |
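        # The empty filename keeps a file name out of the gzip header;
        # compresslevel 9 favors size over speed for this small cache file.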
| with gzip.GzipFile("", "wb", 9, f) as gzf: |
| cPickle.dump(self.__times, gzf, cPickle.HIGHEST_PROTOCOL) |
| except IOError: |
      pass # Ignore errors; saving the times isn't that important.
| |
| |
| def find_tests(binaries, additional_args): |
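  """Enumerate the tests in each binary via --gtest_list_tests and return one
  Task per selected test (times --repeat), honoring the --gtest_filter,
  --failed and sharding options."""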
| test_count = 0 |
| tasks = [] |
| for test_binary in binaries: |
| command = [test_binary] |
| if options.gtest_also_run_disabled_tests: |
| command += ['--gtest_also_run_disabled_tests'] |
| |
| list_command = command + ['--gtest_list_tests'] |
| if options.gtest_filter != '': |
| list_command += ['--gtest_filter=' + options.gtest_filter] |
| |
| try: |
| test_list = subprocess.Popen(list_command, |
| stdout=subprocess.PIPE).communicate()[0] |
| except OSError as e: |
| sys.exit("%s: %s" % (test_binary, str(e))) |
| |
| command += additional_args + ['--gtest_color=' + options.gtest_color] |
| |
| test_group = '' |
| for line in test_list.split('\n'): |
| if not line.strip(): |
| continue |
| if line[0] != " ": |
| # Remove comments for typed tests and strip whitespace. |
| test_group = line.split('#')[0].strip() |
| continue |
| # Remove comments for parameterized tests and strip whitespace. |
| line = line.split('#')[0].strip() |
| if not line: |
| continue |
| |
| test_name = test_group + line |
| if not options.gtest_also_run_disabled_tests and 'DISABLED_' in test_name: |
| continue |
| |
| last_execution_time = times.get_test_time(test_binary, test_name) |
| if options.failed and last_execution_time is not None: |
| continue |
| |
| test_command = command + ['--gtest_filter=' + test_name] |
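      # Round-robin sharding: this shard runs tests numbered shard_index,
      # shard_index + shard_count, shard_index + 2 * shard_count, and so on.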
| if (test_count - options.shard_index) % options.shard_count == 0: |
| for _ in range(options.repeat): |
| tasks.append(Task(test_binary, test_name, test_command, |
| last_execution_time, options.output_dir)) |
| |
| test_count += 1 |
| |
| return tasks |
| |
| |
| def execute_tasks(tasks): |
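  """Run the tasks on a pool of --workers daemon threads, stopping early if
  the global --timeout fires."""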
| class WorkerFn(object): |
| def __init__(self, tasks): |
| self.task_id = 0 |
| self.tasks = tasks |
| self.task_lock = threading.Lock() |
| |
| def __call__(self): |
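      # Claim the next unclaimed task under the lock; tasks are distributed
      # dynamically rather than pre-partitioned among the workers.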
| while True: |
| with self.task_lock: |
| if self.task_id < len(self.tasks): |
| task = self.tasks[self.task_id] |
| self.task_id += 1 |
| else: |
| return |
| task_manager.run_task(task) |
| |
| def start_daemon(func): |
| t = threading.Thread(target=func) |
| t.daemon = True |
| t.start() |
| return t |
| |
| try: |
| timeout.start() |
| worker_fn = WorkerFn(tasks) |
| workers = [start_daemon(worker_fn) for _ in range(options.workers)] |
| for worker in workers: |
| worker.join() |
| finally: |
| timeout.cancel() |
| |
| |
# Split off the additional arguments (anything after --) so the option parser
# doesn't see them; they are passed through to the test binaries.
| additional_args = [] |
| |
| for i in range(len(sys.argv)): |
| if sys.argv[i] == '--': |
| additional_args = sys.argv[i+1:] |
| sys.argv = sys.argv[:i] |
| break |
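
# Example invocation (illustrative):
#   gtest-parallel --workers=8 --repeat=3 out/foo_test out/bar_test \
#       -- --gtest_break_on_failure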
| |
| parser = optparse.OptionParser( |
| usage = 'usage: %prog [options] binary [binary ...] -- [additional args]') |
| |
| parser.add_option('-d', '--output_dir', type='string', |
| default=os.path.join(tempfile.gettempdir(), "gtest-parallel"), |
| help='output directory for test logs') |
| parser.add_option('-r', '--repeat', type='int', default=1, |
| help='Number of times to execute all the tests.') |
| parser.add_option('--retry_failed', type='int', default=0, |
| help='Number of times to repeat failed tests.') |
| parser.add_option('--failed', action='store_true', default=False, |
| help='run only failed and new tests') |
| parser.add_option('-w', '--workers', type='int', |
| default=multiprocessing.cpu_count(), |
| help='number of workers to spawn') |
| parser.add_option('--gtest_color', type='string', default='yes', |
| help='color output') |
| parser.add_option('--gtest_filter', type='string', default='', |
| help='test filter') |
| parser.add_option('--gtest_also_run_disabled_tests', action='store_true', |
| default=False, help='run disabled tests too') |
| parser.add_option('--print_test_times', action='store_true', default=False, |
| help='list the run time of each test at the end of execution') |
| parser.add_option('--shard_count', type='int', default=1, |
| help='total number of shards (for sharding test execution ' |
| 'between multiple machines)') |
| parser.add_option('--shard_index', type='int', default=0, |
| help='zero-indexed number identifying this shard (for ' |
| 'sharding test execution between multiple machines)') |
| parser.add_option('--dump_json_test_results', type='string', default=None, |
| help='Saves the results of the tests as a JSON machine-' |
| 'readable file. The format of the file is specified at ' |
| 'https://www.chromium.org/developers/the-json-test-results-format') |
| parser.add_option('--timeout', type='int', default=None, |
| help='Interrupt all remaining processes after the given ' |
| 'time (in seconds).') |
| |
| (options, binaries) = parser.parse_args() |
| |
if not binaries:
| parser.print_usage() |
| sys.exit(1) |
| |
| if options.shard_count < 1: |
| parser.error("Invalid number of shards: %d. Must be at least 1." % |
| options.shard_count) |
| if not (0 <= options.shard_index < options.shard_count): |
| parser.error("Invalid shard index: %d. Must be between 0 and %d " |
| "(less than the number of shards)." % |
| (options.shard_index, options.shard_count - 1)) |
| |
# Check that all test binaries have a unique basename. That way we can ensure
# the logs are saved to unique files even when two different binaries have
# common tests.
unique_binaries = set(os.path.basename(binary) for binary in binaries)
assert len(unique_binaries) == len(binaries), (
    "All test binaries must have a unique basename.")
| |
| # Remove files from old test runs. |
| if os.path.isdir(options.output_dir): |
| shutil.rmtree(options.output_dir) |
| # Create directory for test log output. |
| try: |
| os.makedirs(options.output_dir) |
| except OSError as e: |
| # Ignore errors if this directory already exists. |
| if e.errno != errno.EEXIST or not os.path.isdir(options.output_dir): |
    raise
| |
| timeout = (DummyTimer() if options.timeout is None |
| else threading.Timer(options.timeout, sigint_handler.interrupt)) |
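# When the timer fires it reuses the SIGINT path, so every running test
# process is terminated just as if Ctrl+C had been pressed.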
| |
| test_results = None |
| if options.dump_json_test_results is not None: |
| test_results = CollectTestResults(options.dump_json_test_results) |
| |
| save_file = os.path.join(os.path.expanduser("~"), ".gtest-parallel-times") |
| times = TestTimes(save_file) |
| logger = FilterFormat(options.output_dir) |
| |
| task_manager = TaskManager(times, logger, test_results) |
| |
| tasks = find_tests(binaries, additional_args) |
| logger.log_tasks(len(tasks)) |
| execute_tasks(tasks) |
| |
| if task_manager.passed: |
| logger.move_to('passed', task_manager.passed) |
| if options.print_test_times: |
| logger.print_tests('PASSED TESTS', task_manager.passed) |
| |
| if task_manager.failed: |
| logger.print_tests('FAILED TESTS', task_manager.failed) |
| logger.move_to('failed', task_manager.failed) |
| |
| if task_manager.started: |
| logger.print_tests('INTERRUPTED TESTS', task_manager.started.values()) |
| logger.move_to('interrupted', task_manager.started.values()) |
| |
| logger.flush() |
| times.write_to_file(save_file) |
| if test_results: |
| test_results.dump_to_file_and_close() |
| |
| if sigint_handler.got_sigint(): |
| task_manager.global_exit_code = -signal.SIGINT |
| sys.exit(task_manager.global_exit_code) |