Move gtest-parallel to gtest_parallel.py and make gtest-parallel a thin wrapper (#29)
diff --git a/gtest-parallel b/gtest-parallel
index 4f5f91b..19b0803 100755
--- a/gtest-parallel
+++ b/gtest-parallel
@@ -1,5 +1,5 @@
#!/usr/bin/env python2
-# Copyright 2013 Google Inc. All rights reserved.
+# Copyright 2017 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,574 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import cPickle
-import errno
-import gzip
-import json
-import multiprocessing
-import optparse
-import os
-import re
-import shutil
-import signal
-import subprocess
+import gtest_parallel
import sys
-import tempfile
-import thread
-import threading
-import time
-import zlib
-# An object that catches SIGINT sent to the Python process and notices
-# if processes passed to wait() die by SIGINT (we need to look for
-# both of those cases, because pressing Ctrl+C can result in either
-# the main process or one of the subprocesses getting the signal).
-#
-# Before a SIGINT is seen, wait(p) will simply call p.wait() and
-# return the result. Once a SIGINT has been seen (in the main process
-# or a subprocess, including the one the current call is waiting for),
-# wait(p) will call p.terminate() and raise ProcessWasInterrupted.
-class SigintHandler(object):
- class ProcessWasInterrupted(Exception): pass
- sigint_returncodes = {-signal.SIGINT, # Unix
- -1073741510, # Windows
- }
- def __init__(self):
- self.__lock = threading.Lock()
- self.__processes = set()
- self.__got_sigint = False
- signal.signal(signal.SIGINT, lambda signal_num, frame: self.interrupt())
- def __on_sigint(self):
- self.__got_sigint = True
- while self.__processes:
- try:
- self.__processes.pop().terminate()
- except OSError:
- pass
- def interrupt(self):
- with self.__lock:
- self.__on_sigint()
- def got_sigint(self):
- with self.__lock:
- return self.__got_sigint
- def wait(self, p):
- with self.__lock:
- if self.__got_sigint:
- p.terminate()
- self.__processes.add(p)
- code = p.wait()
- with self.__lock:
- self.__processes.discard(p)
- if code in self.sigint_returncodes:
- self.__on_sigint()
- if self.__got_sigint:
- raise self.ProcessWasInterrupted
- return code
-sigint_handler = SigintHandler()
-
-# Return the width of the terminal, or None if it couldn't be
-# determined (e.g. because we're not being run interactively).
-def term_width(out):
- if not out.isatty():
- return None
- try:
- p = subprocess.Popen(["stty", "size"],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- (out, err) = p.communicate()
- if p.returncode != 0 or err:
- return None
- return int(out.split()[1])
- except (IndexError, OSError, ValueError):
- return None
-
-# Output transient and permanent lines of text. If several transient
-# lines are written in sequence, the new will overwrite the old. We
-# use this to ensure that lots of unimportant info (tests passing)
-# won't drown out important info (tests failing).
-class Outputter(object):
- def __init__(self, out_file):
- self.__out_file = out_file
- self.__previous_line_was_transient = False
- self.__width = term_width(out_file) # Line width, or None if not a tty.
- def transient_line(self, msg):
- if self.__width is None:
- self.__out_file.write(msg + "\n")
- else:
- self.__out_file.write("\r" + msg[:self.__width].ljust(self.__width))
- self.__previous_line_was_transient = True
- def flush_transient_output(self):
- if self.__previous_line_was_transient:
- self.__out_file.write("\n")
- self.__previous_line_was_transient = False
- def permanent_line(self, msg):
- self.flush_transient_output()
- self.__out_file.write(msg + "\n")
-
-
-class Task(object):
- """Stores information about a task (single execution of a test).
-
- This class stores information about the test to be executed (gtest binary and
- test name), and its result (log file, exit code and runtime).
- Each task is uniquely identified by the gtest binary, the test name and an
- execution number that increases each time the test is executed.
- Additionaly we store the last execution time, so that next time the test is
- executed, the slowest tests are run first.
- """
- def __init__(self, test_binary, test_name, test_command,
- last_execution_time, output_dir):
- self.test_name = test_name
- self.output_dir = output_dir
- self.test_binary = test_binary
- self.test_command = test_command
- self.last_execution_time = last_execution_time
-
- self.test_id = (test_binary, test_name)
-
- self.execution_number = task_manager.get_next_execution_number(self.test_id)
- log_name = '%s-%s-%s.log' % (self.__normalize(test_binary),
- self.__normalize(test_name),
- self.execution_number)
-
- self.log_file = os.path.join(output_dir, log_name)
- self.task_id = (test_binary, test_name, self.execution_number)
-
- self.exit_code = None
- self.runtime_ms = None
-
- def __lt__(self, other):
- if self.last_execution_time is None:
- return True
- if other.last_execution_time is None:
- return False
- return self.last_execution_time > other.last_execution_time
-
- def __normalize(self, string):
- return re.sub('[^A-Za-z0-9]', '_', string)
-
- def run(self):
- begin = time.time()
- with open(self.log_file, 'w') as log:
- task = subprocess.Popen(self.test_command, stdout=log, stderr=log)
- try:
- self.exit_code = sigint_handler.wait(task)
- except sigint_handler.ProcessWasInterrupted:
- thread.exit()
- self.runtime_ms = int(1000 * (time.time() - begin))
- self.last_execution_time = None if self.exit_code else self.runtime_ms
-
-
-class TaskManager(object):
- """Executes the tasks and stores the passed, failed and interrupted tasks.
-
- When a task is run, this class keeps track if it passed, failed or was
- interrupted. After a task finishes it calls the relevant functions of the
- Logger, TestResults and TestTimes classes, and in case of failure, retries the
- test as specified by the --retry_failed flag.
- """
- def __init__(self, times, logger, test_results):
- self.times = times
- self.logger = logger
- self.test_results = test_results
-
- self.global_exit_code = 0
-
- self.passed = []
- self.failed = []
- self.started = {}
- self.execution_number = {}
-
- self.lock = threading.Lock()
-
- def get_next_execution_number(self, test_id):
- with self.lock:
- next_execution_number = self.execution_number.setdefault(test_id, 1)
- self.execution_number[test_id] += 1
- return next_execution_number
-
- def __register_start(self, task):
- with self.lock:
- self.started[task.task_id] = task
-
- def __register_exit(self, task):
- self.logger.log_exit(task)
- self.times.record_test_time(task.test_binary, task.test_name,
- task.last_execution_time)
- if self.test_results:
- test_results.log(task.test_name, task.runtime_ms,
- "PASS" if task.exit_code == 0 else "FAIL")
-
- with self.lock:
- self.started.pop(task.task_id)
- if task.exit_code == 0:
- self.passed.append(task)
- else:
- self.failed.append(task)
-
- def run_task(self, task):
- for try_number in range(options.retry_failed + 1):
- self.__register_start(task)
- task.run()
- self.__register_exit(task)
-
- if task.exit_code == 0:
- break
-
- if try_number < options.retry_failed:
- # We need create a new Task instance. Each task represents a single test
- # execution, with its own runtime, exit code and log file.
- task = Task(task.test_binary, task.test_name, task.test_command,
- task.last_execution_time, task.output_dir)
-
- with self.lock:
- if task.exit_code != 0:
- self.global_exit_code = task.exit_code
-
-
-class FilterFormat(object):
- def __init__(self, output_dir):
- if sys.stdout.isatty():
- # stdout needs to be unbuffered since the output is interactive.
- sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
-
- self.output_dir = output_dir
-
- self.total_tasks = 0
- self.finished_tasks = 0
- self.out = Outputter(sys.stdout)
- self.stdout_lock = threading.Lock()
-
- def move_to(self, destination_dir, tasks):
- destination_dir = os.path.join(self.output_dir, destination_dir)
- os.makedirs(destination_dir)
- for task in tasks:
- shutil.move(task.log_file, destination_dir)
-
-
- def print_tests(self, message, tasks):
- self.out.permanent_line("%s (%s/%s):" %
- (message, len(tasks), self.total_tasks))
- for task in sorted(tasks):
- runtime_ms = 'Interrupted'
- if task.runtime_ms is not None:
- runtime_ms = '%d ms' % task.runtime_ms
- self.out.permanent_line("%11s: %s %s (try #%d)" % (
- runtime_ms, task.test_binary, task.test_name, task.execution_number))
-
- def log_exit(self, task):
- with self.stdout_lock:
- self.finished_tasks += 1
- self.out.transient_line("[%d/%d] %s (%d ms)"
- % (self.finished_tasks, self.total_tasks,
- task.test_name, task.runtime_ms))
- if task.exit_code != 0:
- with open(task.log_file) as f:
- for line in f.readlines():
- self.out.permanent_line(line.rstrip())
- self.out.permanent_line(
- "[%d/%d] %s returned/aborted with exit code %d (%d ms)"
- % (self.finished_tasks, self.total_tasks, task.test_name,
- task.exit_code, task.runtime_ms))
-
- def log_tasks(self, total_tasks):
- self.total_tasks += total_tasks
- self.out.transient_line("[0/%d] Running tests..." % self.total_tasks)
-
- def flush(self):
- self.out.flush_transient_output()
-
-
-class CollectTestResults(object):
- def __init__(self, json_dump_filepath):
- self.test_results_lock = threading.Lock()
- self.json_dump_file = open(json_dump_filepath, 'w')
- self.test_results = {
- "interrupted": False,
- "path_delimiter": ".",
- # Third version of the file format. See the link in the flag description
- # for details.
- "version": 3,
- "seconds_since_epoch": int(time.time()),
- "num_failures_by_type": {
- "PASS": 0,
- "FAIL": 0,
- },
- "tests": {},
- }
-
- def log(self, test, runtime_ms, actual_result):
- with self.test_results_lock:
- self.test_results['num_failures_by_type'][actual_result] += 1
- results = self.test_results['tests']
- for name in test.split('.'):
- results = results.setdefault(name, {})
-
- if results:
- results['actual'] += ' ' + actual_result
- results['times'].append(runtime_ms)
- else: # This is the first invocation of the test
- results['actual'] = actual_result
- results['times'] = [runtime_ms]
- results['time'] = runtime_ms
- results['expected'] = 'PASS'
-
- def dump_to_file_and_close(self):
- json.dump(self.test_results, self.json_dump_file)
- self.json_dump_file.close()
-
-class DummyTimer(object):
- def start(self):
- pass
- def cancel(self):
- pass
-
-# Record of test runtimes. Has built-in locking.
-class TestTimes(object):
- def __init__(self, save_file):
- "Create new object seeded with saved test times from the given file."
- self.__times = {} # (test binary, test name) -> runtime in ms
-
- # Protects calls to record_test_time(); other calls are not
- # expected to be made concurrently.
- self.__lock = threading.Lock()
-
- try:
- with gzip.GzipFile(save_file, "rb") as f:
- times = cPickle.load(f)
- except (EOFError, IOError, cPickle.UnpicklingError, zlib.error):
- # File doesn't exist, isn't readable, is malformed---whatever.
- # Just ignore it.
- return
-
- # Discard saved times if the format isn't right.
- if type(times) is not dict:
- return
- for ((test_binary, test_name), runtime) in times.items():
- if (type(test_binary) is not str or type(test_name) is not str
- or type(runtime) not in {int, long, type(None)}):
- return
-
- self.__times = times
-
- def get_test_time(self, binary, testname):
- """Return the last duration for the given test as an integer number of
- milliseconds, or None if the test failed or if there's no record for it."""
- return self.__times.get((binary, testname), None)
-
- def record_test_time(self, binary, testname, runtime_ms):
- """Record that the given test ran in the specified number of
- milliseconds. If the test failed, runtime_ms should be None."""
- with self.__lock:
- self.__times[(binary, testname)] = runtime_ms
-
- def write_to_file(self, save_file):
- "Write all the times to file."
- try:
- with open(save_file, "wb") as f:
- with gzip.GzipFile("", "wb", 9, f) as gzf:
- cPickle.dump(self.__times, gzf, cPickle.HIGHEST_PROTOCOL)
- except IOError:
- pass # ignore errors---saving the times isn't that important
-
-
-def find_tests(binaries, additional_args):
- test_count = 0
- tasks = []
- for test_binary in binaries:
- command = [test_binary]
- if options.gtest_also_run_disabled_tests:
- command += ['--gtest_also_run_disabled_tests']
-
- list_command = command + ['--gtest_list_tests']
- if options.gtest_filter != '':
- list_command += ['--gtest_filter=' + options.gtest_filter]
-
- try:
- test_list = subprocess.Popen(list_command,
- stdout=subprocess.PIPE).communicate()[0]
- except OSError as e:
- sys.exit("%s: %s" % (test_binary, str(e)))
-
- command += additional_args + ['--gtest_color=' + options.gtest_color]
-
- test_group = ''
- for line in test_list.split('\n'):
- if not line.strip():
- continue
- if line[0] != " ":
- # Remove comments for typed tests and strip whitespace.
- test_group = line.split('#')[0].strip()
- continue
- # Remove comments for parameterized tests and strip whitespace.
- line = line.split('#')[0].strip()
- if not line:
- continue
-
- test_name = test_group + line
- if not options.gtest_also_run_disabled_tests and 'DISABLED_' in test_name:
- continue
-
- last_execution_time = times.get_test_time(test_binary, test_name)
- if options.failed and last_execution_time is not None:
- continue
-
- test_command = command + ['--gtest_filter=' + test_name]
- if (test_count - options.shard_index) % options.shard_count == 0:
- for _ in range(options.repeat):
- tasks.append(Task(test_binary, test_name, test_command,
- last_execution_time, options.output_dir))
-
- test_count += 1
-
- return tasks
-
-
-def execute_tasks(tasks):
- class WorkerFn(object):
- def __init__(self, tasks):
- self.task_id = 0
- self.tasks = tasks
- self.task_lock = threading.Lock()
-
- def __call__(self):
- while True:
- with self.task_lock:
- if self.task_id < len(self.tasks):
- task = self.tasks[self.task_id]
- self.task_id += 1
- else:
- return
- task_manager.run_task(task)
-
- def start_daemon(func):
- t = threading.Thread(target=func)
- t.daemon = True
- t.start()
- return t
-
- try:
- timeout.start()
- worker_fn = WorkerFn(tasks)
- workers = [start_daemon(worker_fn) for _ in range(options.workers)]
- for worker in workers:
- worker.join()
- finally:
- timeout.cancel()
-
-
-# Remove additional arguments (anything after --).
-additional_args = []
-
-for i in range(len(sys.argv)):
- if sys.argv[i] == '--':
- additional_args = sys.argv[i+1:]
- sys.argv = sys.argv[:i]
- break
-
-parser = optparse.OptionParser(
- usage = 'usage: %prog [options] binary [binary ...] -- [additional args]')
-
-parser.add_option('-d', '--output_dir', type='string',
- default=os.path.join(tempfile.gettempdir(), "gtest-parallel"),
- help='output directory for test logs')
-parser.add_option('-r', '--repeat', type='int', default=1,
- help='Number of times to execute all the tests.')
-parser.add_option('--retry_failed', type='int', default=0,
- help='Number of times to repeat failed tests.')
-parser.add_option('--failed', action='store_true', default=False,
- help='run only failed and new tests')
-parser.add_option('-w', '--workers', type='int',
- default=multiprocessing.cpu_count(),
- help='number of workers to spawn')
-parser.add_option('--gtest_color', type='string', default='yes',
- help='color output')
-parser.add_option('--gtest_filter', type='string', default='',
- help='test filter')
-parser.add_option('--gtest_also_run_disabled_tests', action='store_true',
- default=False, help='run disabled tests too')
-parser.add_option('--print_test_times', action='store_true', default=False,
- help='list the run time of each test at the end of execution')
-parser.add_option('--shard_count', type='int', default=1,
- help='total number of shards (for sharding test execution '
- 'between multiple machines)')
-parser.add_option('--shard_index', type='int', default=0,
- help='zero-indexed number identifying this shard (for '
- 'sharding test execution between multiple machines)')
-parser.add_option('--dump_json_test_results', type='string', default=None,
- help='Saves the results of the tests as a JSON machine-'
- 'readable file. The format of the file is specified at '
- 'https://www.chromium.org/developers/the-json-test-results-format')
-parser.add_option('--timeout', type='int', default=None,
- help='Interrupt all remaining processes after the given '
- 'time (in seconds).')
-
-(options, binaries) = parser.parse_args()
-
-if binaries == []:
- parser.print_usage()
- sys.exit(1)
-
-if options.shard_count < 1:
- parser.error("Invalid number of shards: %d. Must be at least 1." %
- options.shard_count)
-if not (0 <= options.shard_index < options.shard_count):
- parser.error("Invalid shard index: %d. Must be between 0 and %d "
- "(less than the number of shards)." %
- (options.shard_index, options.shard_count - 1))
-
-# Check that all test binaries have an unique basename. That way we can ensure
-# the logs are saved to unique files even when two different binaries have
-# common tests.
-unique_binaries = set(os.path.basename(binary) for binary in binaries)
-assert len(unique_binaries) == len(binaries), (
- "All test binaries must have an unique basename.")
-
-# Remove files from old test runs.
-if os.path.isdir(options.output_dir):
- shutil.rmtree(options.output_dir)
-# Create directory for test log output.
-try:
- os.makedirs(options.output_dir)
-except OSError as e:
- # Ignore errors if this directory already exists.
- if e.errno != errno.EEXIST or not os.path.isdir(options.output_dir):
- raise e
-
-timeout = (DummyTimer() if options.timeout is None
- else threading.Timer(options.timeout, sigint_handler.interrupt))
-
-test_results = None
-if options.dump_json_test_results is not None:
- test_results = CollectTestResults(options.dump_json_test_results)
-
-save_file = os.path.join(os.path.expanduser("~"), ".gtest-parallel-times")
-times = TestTimes(save_file)
-logger = FilterFormat(options.output_dir)
-
-task_manager = TaskManager(times, logger, test_results)
-
-tasks = find_tests(binaries, additional_args)
-logger.log_tasks(len(tasks))
-execute_tasks(tasks)
-
-if task_manager.passed:
- logger.move_to('passed', task_manager.passed)
- if options.print_test_times:
- logger.print_tests('PASSED TESTS', task_manager.passed)
-
-if task_manager.failed:
- logger.print_tests('FAILED TESTS', task_manager.failed)
- logger.move_to('failed', task_manager.failed)
-
-if task_manager.started:
- logger.print_tests('INTERRUPTED TESTS', task_manager.started.values())
- logger.move_to('interrupted', task_manager.started.values())
-
-logger.flush()
-times.write_to_file(save_file)
-if test_results:
- test_results.dump_to_file_and_close()
-
-if sigint_handler.got_sigint():
- task_manager.global_exit_code = -signal.SIGINT
-sys.exit(task_manager.global_exit_code)
+sys.exit(gtest_parallel.main())
diff --git a/gtest_parallel.py b/gtest_parallel.py
new file mode 100755
index 0000000..e7def05
--- /dev/null
+++ b/gtest_parallel.py
@@ -0,0 +1,594 @@
+#!/usr/bin/env python2
+# Copyright 2013 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import cPickle
+import errno
+import gzip
+import json
+import multiprocessing
+import optparse
+import os
+import re
+import shutil
+import signal
+import subprocess
+import sys
+import tempfile
+import thread
+import threading
+import time
+import zlib
+
+# An object that catches SIGINT sent to the Python process and notices
+# if processes passed to wait() die by SIGINT (we need to look for
+# both of those cases, because pressing Ctrl+C can result in either
+# the main process or one of the subprocesses getting the signal).
+#
+# Before a SIGINT is seen, wait(p) will simply call p.wait() and
+# return the result. Once a SIGINT has been seen (in the main process
+# or a subprocess, including the one the current call is waiting for),
+# wait(p) will call p.terminate() and raise ProcessWasInterrupted.
+class SigintHandler(object):
+ class ProcessWasInterrupted(Exception): pass
+ sigint_returncodes = {-signal.SIGINT, # Unix
+ -1073741510, # Windows
+ }
+ def __init__(self):
+ self.__lock = threading.Lock()
+ self.__processes = set()
+ self.__got_sigint = False
+ signal.signal(signal.SIGINT, lambda signal_num, frame: self.interrupt())
+ def __on_sigint(self):
+ self.__got_sigint = True
+ while self.__processes:
+ try:
+ self.__processes.pop().terminate()
+ except OSError:
+ pass
+ def interrupt(self):
+ with self.__lock:
+ self.__on_sigint()
+ def got_sigint(self):
+ with self.__lock:
+ return self.__got_sigint
+ def wait(self, p):
+ with self.__lock:
+ if self.__got_sigint:
+ p.terminate()
+ self.__processes.add(p)
+ code = p.wait()
+ with self.__lock:
+ self.__processes.discard(p)
+ if code in self.sigint_returncodes:
+ self.__on_sigint()
+ if self.__got_sigint:
+ raise self.ProcessWasInterrupted
+ return code
+sigint_handler = SigintHandler()
+
+# Return the width of the terminal, or None if it couldn't be
+# determined (e.g. because we're not being run interactively).
+def term_width(out):
+ if not out.isatty():
+ return None
+ try:
+ p = subprocess.Popen(["stty", "size"],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ (out, err) = p.communicate()
+ if p.returncode != 0 or err:
+ return None
+ return int(out.split()[1])
+ except (IndexError, OSError, ValueError):
+ return None
+
+# Output transient and permanent lines of text. If several transient
+# lines are written in sequence, the new will overwrite the old. We
+# use this to ensure that lots of unimportant info (tests passing)
+# won't drown out important info (tests failing).
+class Outputter(object):
+ def __init__(self, out_file):
+ self.__out_file = out_file
+ self.__previous_line_was_transient = False
+ self.__width = term_width(out_file) # Line width, or None if not a tty.
+ def transient_line(self, msg):
+ if self.__width is None:
+ self.__out_file.write(msg + "\n")
+ else:
+ self.__out_file.write("\r" + msg[:self.__width].ljust(self.__width))
+ self.__previous_line_was_transient = True
+ def flush_transient_output(self):
+ if self.__previous_line_was_transient:
+ self.__out_file.write("\n")
+ self.__previous_line_was_transient = False
+ def permanent_line(self, msg):
+ self.flush_transient_output()
+ self.__out_file.write(msg + "\n")
+
+
+class Task(object):
+ """Stores information about a task (single execution of a test).
+
+ This class stores information about the test to be executed (gtest binary and
+ test name), and its result (log file, exit code and runtime).
+ Each task is uniquely identified by the gtest binary, the test name and an
+ execution number that increases each time the test is executed.
+ Additionally we store the last execution time, so that next time the test is
+ executed, the slowest tests are run first.
+ """
+ def __init__(self, test_binary, test_name, test_command, execution_number,
+ last_execution_time, output_dir):
+ self.test_name = test_name
+ self.output_dir = output_dir
+ self.test_binary = test_binary
+ self.test_command = test_command
+ self.execution_number = execution_number
+ self.last_execution_time = last_execution_time
+
+ self.exit_code = None
+ self.runtime_ms = None
+
+ self.test_id = (test_binary, test_name)
+ self.task_id = (test_binary, test_name, self.execution_number)
+
+ log_name = '%s-%s-%s.log' % (self.__normalize(test_binary),
+ self.__normalize(test_name),
+ self.execution_number)
+
+ self.log_file = os.path.join(output_dir, log_name)
+
+ def __lt__(self, other):
+ if self.last_execution_time is None:
+ return True
+ if other.last_execution_time is None:
+ return False
+ return self.last_execution_time > other.last_execution_time
+
+ def __normalize(self, string):
+ return re.sub('[^A-Za-z0-9]', '_', string)
+
+ def run(self):
+ begin = time.time()
+ with open(self.log_file, 'w') as log:
+ task = subprocess.Popen(self.test_command, stdout=log, stderr=log)
+ try:
+ self.exit_code = sigint_handler.wait(task)
+ except sigint_handler.ProcessWasInterrupted:
+ thread.exit()
+ self.runtime_ms = int(1000 * (time.time() - begin))
+ self.last_execution_time = None if self.exit_code else self.runtime_ms
+
+
+class TaskManager(object):
+ """Executes the tasks and stores the passed, failed and interrupted tasks.
+
+ When a task is run, this class keeps track if it passed, failed or was
+ interrupted. After a task finishes it calls the relevant functions of the
+ Logger, TestResults and TestTimes classes, and in case of failure, retries the
+ test as specified by the --retry_failed flag.
+ """
+ def __init__(self, times, logger, test_results, times_to_retry,
+ initial_execution_number):
+ self.times = times
+ self.logger = logger
+ self.test_results = test_results
+ self.times_to_retry = times_to_retry
+ self.initial_execution_number = initial_execution_number
+
+ self.global_exit_code = 0
+
+ self.passed = []
+ self.failed = []
+ self.started = {}
+ self.execution_number = {}
+
+ self.lock = threading.Lock()
+
+ def __get_next_execution_number(self, test_id):
+ with self.lock:
+ next_execution_number = self.execution_number.setdefault(
+ test_id, self.initial_execution_number)
+ self.execution_number[test_id] += 1
+ return next_execution_number
+
+ def __register_start(self, task):
+ with self.lock:
+ self.started[task.task_id] = task
+
+ def __register_exit(self, task):
+ self.logger.log_exit(task)
+ self.times.record_test_time(task.test_binary, task.test_name,
+ task.last_execution_time)
+ if self.test_results:
+ test_results.log(task.test_name, task.runtime_ms,
+ "PASS" if task.exit_code == 0 else "FAIL")
+
+ with self.lock:
+ self.started.pop(task.task_id)
+ if task.exit_code == 0:
+ self.passed.append(task)
+ else:
+ self.failed.append(task)
+
+ def run_task(self, task):
+ for try_number in range(self.times_to_retry + 1):
+ self.__register_start(task)
+ task.run()
+ self.__register_exit(task)
+
+ if task.exit_code == 0:
+ break
+
+ if try_number < self.times_to_retry:
+ execution_number = self.__get_next_execution_number(task.test_id)
+ # We need to create a new Task instance. Each task represents a single test
+ # execution, with its own runtime, exit code and log file.
+ task = Task(task.test_binary, task.test_name, task.test_command,
+ execution_number, task.last_execution_time, task.output_dir)
+
+ with self.lock:
+ if task.exit_code != 0:
+ self.global_exit_code = task.exit_code
+
+
+class FilterFormat(object):
+ def __init__(self, output_dir):
+ if sys.stdout.isatty():
+ # stdout needs to be unbuffered since the output is interactive.
+ sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
+
+ self.output_dir = output_dir
+
+ self.total_tasks = 0
+ self.finished_tasks = 0
+ self.out = Outputter(sys.stdout)
+ self.stdout_lock = threading.Lock()
+
+ def move_to(self, destination_dir, tasks):
+ destination_dir = os.path.join(self.output_dir, destination_dir)
+ os.makedirs(destination_dir)
+ for task in tasks:
+ shutil.move(task.log_file, destination_dir)
+
+
+ def print_tests(self, message, tasks):
+ self.out.permanent_line("%s (%s/%s):" %
+ (message, len(tasks), self.total_tasks))
+ for task in sorted(tasks):
+ runtime_ms = 'Interrupted'
+ if task.runtime_ms is not None:
+ runtime_ms = '%d ms' % task.runtime_ms
+ self.out.permanent_line("%11s: %s %s (try #%d)" % (
+ runtime_ms, task.test_binary, task.test_name, task.execution_number))
+
+ def log_exit(self, task):
+ with self.stdout_lock:
+ self.finished_tasks += 1
+ self.out.transient_line("[%d/%d] %s (%d ms)"
+ % (self.finished_tasks, self.total_tasks,
+ task.test_name, task.runtime_ms))
+ if task.exit_code != 0:
+ with open(task.log_file) as f:
+ for line in f.readlines():
+ self.out.permanent_line(line.rstrip())
+ self.out.permanent_line(
+ "[%d/%d] %s returned/aborted with exit code %d (%d ms)"
+ % (self.finished_tasks, self.total_tasks, task.test_name,
+ task.exit_code, task.runtime_ms))
+
+ def log_tasks(self, total_tasks):
+ self.total_tasks += total_tasks
+ self.out.transient_line("[0/%d] Running tests..." % self.total_tasks)
+
+ def flush(self):
+ self.out.flush_transient_output()
+
+
+class CollectTestResults(object):
+ def __init__(self, json_dump_filepath):
+ self.test_results_lock = threading.Lock()
+ self.json_dump_file = open(json_dump_filepath, 'w')
+ self.test_results = {
+ "interrupted": False,
+ "path_delimiter": ".",
+ # Third version of the file format. See the link in the flag description
+ # for details.
+ "version": 3,
+ "seconds_since_epoch": int(time.time()),
+ "num_failures_by_type": {
+ "PASS": 0,
+ "FAIL": 0,
+ },
+ "tests": {},
+ }
+
+ def log(self, test, runtime_ms, actual_result):
+ with self.test_results_lock:
+ self.test_results['num_failures_by_type'][actual_result] += 1
+ results = self.test_results['tests']
+ for name in test.split('.'):
+ results = results.setdefault(name, {})
+
+ if results:
+ results['actual'] += ' ' + actual_result
+ results['times'].append(runtime_ms)
+ else: # This is the first invocation of the test
+ results['actual'] = actual_result
+ results['times'] = [runtime_ms]
+ results['time'] = runtime_ms
+ results['expected'] = 'PASS'
+
+ def dump_to_file_and_close(self):
+ json.dump(self.test_results, self.json_dump_file)
+ self.json_dump_file.close()
+
+class DummyTimer(object):
+ def start(self):
+ pass
+ def cancel(self):
+ pass
+
+# Record of test runtimes. Has built-in locking.
+class TestTimes(object):
+ def __init__(self, save_file):
+ "Create new object seeded with saved test times from the given file."
+ self.__times = {} # (test binary, test name) -> runtime in ms
+
+ # Protects calls to record_test_time(); other calls are not
+ # expected to be made concurrently.
+ self.__lock = threading.Lock()
+
+ try:
+ with gzip.GzipFile(save_file, "rb") as f:
+ times = cPickle.load(f)
+ except (EOFError, IOError, cPickle.UnpicklingError, zlib.error):
+ # File doesn't exist, isn't readable, is malformed---whatever.
+ # Just ignore it.
+ return
+
+ # Discard saved times if the format isn't right.
+ if type(times) is not dict:
+ return
+ for ((test_binary, test_name), runtime) in times.items():
+ if (type(test_binary) is not str or type(test_name) is not str
+ or type(runtime) not in {int, long, type(None)}):
+ return
+
+ self.__times = times
+
+ def get_test_time(self, binary, testname):
+ """Return the last duration for the given test as an integer number of
+ milliseconds, or None if the test failed or if there's no record for it."""
+ return self.__times.get((binary, testname), None)
+
+ def record_test_time(self, binary, testname, runtime_ms):
+ """Record that the given test ran in the specified number of
+ milliseconds. If the test failed, runtime_ms should be None."""
+ with self.__lock:
+ self.__times[(binary, testname)] = runtime_ms
+
+ def write_to_file(self, save_file):
+ "Write all the times to file."
+ try:
+ with open(save_file, "wb") as f:
+ with gzip.GzipFile("", "wb", 9, f) as gzf:
+ cPickle.dump(self.__times, gzf, cPickle.HIGHEST_PROTOCOL)
+ except IOError:
+ pass # ignore errors---saving the times isn't that important
+
+
+def find_tests(binaries, additional_args, options, times):
+  """Discover tests in the given gtest binaries and build the task list.
+
+  Runs each binary with --gtest_list_tests, parses its output, applies the
+  disabled/failed filters and sharding, and returns a list of Task objects
+  (options.repeat tasks per selected test).
+  """
+  test_count = 0
+  tasks = []
+  for test_binary in binaries:
+    command = [test_binary]
+    if options.gtest_also_run_disabled_tests:
+      command += ['--gtest_also_run_disabled_tests']
+
+    # Let gtest itself apply the user's filter while listing, so we only
+    # see (and count) the tests that match.
+    list_command = command + ['--gtest_list_tests']
+    if options.gtest_filter != '':
+      list_command += ['--gtest_filter=' + options.gtest_filter]
+
+    try:
+      test_list = subprocess.Popen(list_command,
+                                   stdout=subprocess.PIPE).communicate()[0]
+    except OSError as e:
+      # Binary missing or not executable: abort with a clear message.
+      sys.exit("%s: %s" % (test_binary, str(e)))
+
+    command += additional_args + ['--gtest_color=' + options.gtest_color]
+
+    # --gtest_list_tests output: unindented lines are test-case (group)
+    # names ending in '.', indented lines are test names within the most
+    # recent group. '#' starts a comment (typed/parameterized tests).
+    test_group = ''
+    for line in test_list.split('\n'):
+      if not line.strip():
+        continue
+      if line[0] != " ":
+        # Remove comments for typed tests and strip whitespace.
+        test_group = line.split('#')[0].strip()
+        continue
+      # Remove comments for parameterized tests and strip whitespace.
+      line = line.split('#')[0].strip()
+      if not line:
+        continue
+
+      test_name = test_group + line
+      # Note: matches DISABLED_ anywhere in the full name, so it catches
+      # both disabled test cases and disabled individual tests.
+      if not options.gtest_also_run_disabled_tests and 'DISABLED_' in test_name:
+        continue
+
+      last_execution_time = times.get_test_time(test_binary, test_name)
+      # With --failed, skip tests that have a recorded (passing) runtime;
+      # a None record means failed-or-new, so those still run.
+      if options.failed and last_execution_time is not None:
+        continue
+
+      test_command = command + ['--gtest_filter=' + test_name]
+      # Sharding: test_count is incremented only for tests that survive
+      # the filters above, so shards partition the *filtered* sequence.
+      if (test_count - options.shard_index) % options.shard_count == 0:
+        for execution_number in range(options.repeat):
+          tasks.append(Task(test_binary, test_name, test_command,
+                            execution_number + 1, last_execution_time,
+                            options.output_dir))
+
+      test_count += 1
+
+  return tasks
+
+
+def execute_tasks(tasks, pool_size, task_manager, timeout):
+  """Run all tasks on a pool of pool_size worker threads.
+
+  Workers pull tasks from a shared cursor under a lock; each task is run
+  via task_manager.run_task(). timeout is started before the workers and
+  cancelled in a finally block (it is expected to be a threading.Timer or
+  the DummyTimer no-op stand-in).
+  """
+  class WorkerFn(object):
+    def __init__(self, tasks):
+      # Shared next-task index; all workers claim tasks from one cursor.
+      self.task_id = 0
+      self.tasks = tasks
+      self.task_lock = threading.Lock()
+
+    def __call__(self):
+      while True:
+        # Claim the next task under the lock, but run it outside the
+        # lock so workers execute tests concurrently.
+        with self.task_lock:
+          if self.task_id < len(self.tasks):
+            task = self.tasks[self.task_id]
+            self.task_id += 1
+          else:
+            return
+        task_manager.run_task(task)
+
+  def start_daemon(func):
+    # Daemon threads so the interpreter can exit even if a worker is
+    # blocked (e.g. after a SIGINT/timeout interrupt).
+    t = threading.Thread(target=func)
+    t.daemon = True
+    t.start()
+    return t
+
+  try:
+    timeout.start()
+    # One shared WorkerFn instance: the workers cooperate through its
+    # shared task cursor rather than pre-partitioning the task list.
+    worker_fn = WorkerFn(tasks)
+    workers = [start_daemon(worker_fn) for _ in range(pool_size)]
+    for worker in workers:
+      worker.join()
+  finally:
+    # Always cancel the timer, or a pending timeout could fire after
+    # all tests have already finished.
+    timeout.cancel()
+
+
+def main():
+  """Entry point: parse options, discover tests, and run them in parallel.
+
+  Returns the process exit code: the task manager's aggregated exit code,
+  or -signal.SIGINT if the run was interrupted.
+  """
+  # Remove additional arguments (anything after --).
+  # Everything after the first bare '--' is passed through verbatim to the
+  # test binaries instead of being parsed as gtest-parallel options.
+  additional_args = []
+
+  for i in range(len(sys.argv)):
+    if sys.argv[i] == '--':
+      additional_args = sys.argv[i+1:]
+      sys.argv = sys.argv[:i]
+      break
+
+  parser = optparse.OptionParser(
+      usage = 'usage: %prog [options] binary [binary ...] -- [additional args]')
+
+  parser.add_option('-d', '--output_dir', type='string',
+                    default=os.path.join(tempfile.gettempdir(), "gtest-parallel"),
+                    help='output directory for test logs')
+  parser.add_option('-r', '--repeat', type='int', default=1,
+                    help='Number of times to execute all the tests.')
+  parser.add_option('--retry_failed', type='int', default=0,
+                    help='Number of times to repeat failed tests.')
+  parser.add_option('--failed', action='store_true', default=False,
+                    help='run only failed and new tests')
+  parser.add_option('-w', '--workers', type='int',
+                    default=multiprocessing.cpu_count(),
+                    help='number of workers to spawn')
+  parser.add_option('--gtest_color', type='string', default='yes',
+                    help='color output')
+  parser.add_option('--gtest_filter', type='string', default='',
+                    help='test filter')
+  parser.add_option('--gtest_also_run_disabled_tests', action='store_true',
+                    default=False, help='run disabled tests too')
+  parser.add_option('--print_test_times', action='store_true', default=False,
+                    help='list the run time of each test at the end of execution')
+  parser.add_option('--shard_count', type='int', default=1,
+                    help='total number of shards (for sharding test execution '
+                         'between multiple machines)')
+  parser.add_option('--shard_index', type='int', default=0,
+                    help='zero-indexed number identifying this shard (for '
+                         'sharding test execution between multiple machines)')
+  parser.add_option('--dump_json_test_results', type='string', default=None,
+                    help='Saves the results of the tests as a JSON machine-'
+                    'readable file. The format of the file is specified at '
+                    'https://www.chromium.org/developers/the-json-test-results-format')
+  parser.add_option('--timeout', type='int', default=None,
+                    help='Interrupt all remaining processes after the given '
+                    'time (in seconds).')
+
+  (options, binaries) = parser.parse_args()
+
+  if binaries == []:
+    parser.print_usage()
+    sys.exit(1)
+
+  # Validate sharding options before doing any work.
+  if options.shard_count < 1:
+    parser.error("Invalid number of shards: %d. Must be at least 1." %
+                 options.shard_count)
+  if not (0 <= options.shard_index < options.shard_count):
+    parser.error("Invalid shard index: %d. Must be between 0 and %d "
+                 "(less than the number of shards)." %
+                 (options.shard_index, options.shard_count - 1))
+
+  # Check that all test binaries have an unique basename. That way we can ensure
+  # the logs are saved to unique files even when two different binaries have
+  # common tests.
+  unique_binaries = set(os.path.basename(binary) for binary in binaries)
+  assert len(unique_binaries) == len(binaries), (
+      "All test binaries must have an unique basename.")
+
+  # Remove files from old test runs.
+  if os.path.isdir(options.output_dir):
+    shutil.rmtree(options.output_dir)
+  # Create directory for test log output.
+  try:
+    os.makedirs(options.output_dir)
+  except OSError as e:
+    # Ignore errors if this directory already exists.
+    if e.errno != errno.EEXIST or not os.path.isdir(options.output_dir):
+      raise e
+
+  # With --timeout, arm a timer that triggers the same interrupt path as
+  # Ctrl+C; without it, DummyTimer provides no-op start()/cancel().
+  timeout = (DummyTimer() if options.timeout is None
+             else threading.Timer(options.timeout, sigint_handler.interrupt))
+
+  test_results = None
+  if options.dump_json_test_results is not None:
+    test_results = CollectTestResults(options.dump_json_test_results)
+
+  # Persisted runtimes live in the user's home dir, shared across runs
+  # (used for --failed and for scheduling/printing test times).
+  save_file = os.path.join(os.path.expanduser("~"), ".gtest-parallel-times")
+  times = TestTimes(save_file)
+  logger = FilterFormat(options.output_dir)
+
+  # NOTE(review): the last argument is options.repeat + 1 — presumably a
+  # run-count bound used by TaskManager; confirm against its definition.
+  task_manager = TaskManager(times, logger, test_results, options.retry_failed,
+                             options.repeat + 1)
+
+  tasks = find_tests(binaries, additional_args, options, times)
+  logger.log_tasks(len(tasks))
+  execute_tasks(tasks, options.workers, task_manager, timeout)
+
+  # Summarize: move logs of passed/failed/interrupted tests into their
+  # respective subdirectories and print the requested reports.
+  if task_manager.passed:
+    logger.move_to('passed', task_manager.passed)
+    if options.print_test_times:
+      logger.print_tests('PASSED TESTS', task_manager.passed)
+
+  if task_manager.failed:
+    logger.print_tests('FAILED TESTS', task_manager.failed)
+    logger.move_to('failed', task_manager.failed)
+
+  # Tests still marked as started were interrupted mid-run (SIGINT/timeout).
+  if task_manager.started:
+    logger.print_tests('INTERRUPTED TESTS', task_manager.started.values())
+    logger.move_to('interrupted', task_manager.started.values())
+
+  logger.flush()
+  times.write_to_file(save_file)
+  if test_results:
+    test_results.dump_to_file_and_close()
+
+  # Mirror the subprocess convention: a run killed by SIGINT reports the
+  # negated signal number as its exit code.
+  if sigint_handler.got_sigint():
+    return -signal.SIGINT
+
+  return task_manager.global_exit_code