Move gtest-parallel to and make gtest-parallel a thin wrapper (#29)

+#!/usr/bin/env python2
+# Copyright 2013 Google Inc. All rights reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import cPickle
+import errno
+import gzip
+import json
+import multiprocessing
+import optparse
+import os
+import re
+import shutil
+import signal
+import subprocess
+import sys
+import tempfile
+import thread
+import threading
+import time
+import zlib
+# An object that catches SIGINT sent to the Python process and notices
+# if processes passed to wait() die by SIGINT (we need to look for
+# both of those cases, because pressing Ctrl+C can result in either
+# the main process or one of the subprocesses getting the signal).
+# Before a SIGINT is seen, wait(p) will simply call p.wait() and
+# return the result. Once a SIGINT has been seen (in the main process
+# or a subprocess, including the one the current call is waiting for),
+# wait(p) will call p.terminate() and raise ProcessWasInterrupted.
+class SigintHandler(object):
+  class ProcessWasInterrupted(Exception): pass
+  sigint_returncodes = {-signal.SIGINT,  # Unix
+                        -1073741510,     # Windows
+                        }
+  def __init__(self):
+    self.__lock = threading.Lock()
+    self.__processes = set()
+    self.__got_sigint = False
+    signal.signal(signal.SIGINT, lambda signal_num, frame: self.interrupt())
+  def __on_sigint(self):
+    self.__got_sigint = True
+    while self.__processes:
+      try:
+        self.__processes.pop().terminate()
+      except OSError:
+        pass
+  def interrupt(self):
+    with self.__lock:
+      self.__on_sigint()
+  def got_sigint(self):
+    with self.__lock:
+      return self.__got_sigint
+  def wait(self, p):
+    with self.__lock:
+      if self.__got_sigint:
+        p.terminate()
+      self.__processes.add(p)
+    code = p.wait()
+    with self.__lock:
+      self.__processes.discard(p)
+      if code in self.sigint_returncodes:
+        self.__on_sigint()
+      if self.__got_sigint:
+        raise self.ProcessWasInterrupted
+    return code
+sigint_handler = SigintHandler()
+# Return the width of the terminal, or None if it couldn't be
+# determined (e.g. because we're not being run interactively).
+def term_width(out):
+  if not out.isatty():
+    return None
+  try:
+    p = subprocess.Popen(["stty", "size"],
+                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    (out, err) = p.communicate()
+    if p.returncode != 0 or err:
+      return None
+    return int(out.split()[1])
+  except (IndexError, OSError, ValueError):
+    return None
+# Output transient and permanent lines of text. If several transient
+# lines are written in sequence, the new will overwrite the old. We
+# use this to ensure that lots of unimportant info (tests passing)
+# won't drown out important info (tests failing).
+class Outputter(object):
+  def __init__(self, out_file):
+    self.__out_file = out_file
+    self.__previous_line_was_transient = False
+    self.__width = term_width(out_file)  # Line width, or None if not a tty.
+  def transient_line(self, msg):
+    if self.__width is None:
+      self.__out_file.write(msg + "\n")
+    else:
+      self.__out_file.write("\r" + msg[:self.__width].ljust(self.__width))
+      self.__previous_line_was_transient = True
+  def flush_transient_output(self):
+    if self.__previous_line_was_transient:
+      self.__out_file.write("\n")
+      self.__previous_line_was_transient = False
+  def permanent_line(self, msg):
+    self.flush_transient_output()
+    self.__out_file.write(msg + "\n")
+class Task(object):
+  """Stores information about a task (single execution of a test).
+  This class stores information about the test to be executed (gtest binary and
+  test name), and its result (log file, exit code and runtime).
+  Each task is uniquely identified by the gtest binary, the test name and an
+  execution number that increases each time the test is executed.
+  Additionaly we store the last execution time, so that next time the test is
+  executed, the slowest tests are run first.
+  """
+  def __init__(self, test_binary, test_name, test_command, execution_number,
+               last_execution_time, output_dir):
+    self.test_name = test_name
+    self.output_dir = output_dir
+    self.test_binary = test_binary
+    self.test_command = test_command
+    self.execution_number = execution_number
+    self.last_execution_time = last_execution_time
+    self.exit_code = None
+    self.runtime_ms = None
+    self.test_id = (test_binary, test_name)
+    self.task_id = (test_binary, test_name, self.execution_number)
+    log_name = '%s-%s-%s.log' % (self.__normalize(test_binary),
+                                 self.__normalize(test_name),
+                                 self.execution_number)
+    self.log_file = os.path.join(output_dir, log_name)
+  def __lt__(self, other):
+    if self.last_execution_time is None:
+      return True
+    if other.last_execution_time is None:
+      return False
+    return self.last_execution_time > other.last_execution_time
+  def __normalize(self, string):
+    return re.sub('[^A-Za-z0-9]', '_', string)
+  def run(self):
+    begin = time.time()
+    with open(self.log_file, 'w') as log:
+      task = subprocess.Popen(self.test_command, stdout=log, stderr=log)
+      try:
+        self.exit_code = sigint_handler.wait(task)
+      except sigint_handler.ProcessWasInterrupted:
+        thread.exit()
+    self.runtime_ms = int(1000 * (time.time() - begin))
+    self.last_execution_time = None if self.exit_code else self.runtime_ms
+class TaskManager(object):
+  """Executes the tasks and stores the passed, failed and interrupted tasks.
+  When a task is run, this class keeps track if it passed, failed or was
+  interrupted. After a task finishes it calls the relevant functions of the
+  Logger, TestResults and TestTimes classes, and in case of failure, retries the
+  test as specified by the --retry_failed flag.
+  """
+  def __init__(self, times, logger, test_results, times_to_retry,
+               initial_execution_number):
+    self.times = times
+    self.logger = logger
+    self.test_results = test_results
+    self.times_to_retry = times_to_retry
+    self.initial_execution_number = initial_execution_number
+    self.global_exit_code = 0
+    self.passed = []
+    self.failed = []
+    self.started = {}
+    self.execution_number = {}
+    self.lock = threading.Lock()
+  def __get_next_execution_number(self, test_id):
+    with self.lock:
+      next_execution_number = self.execution_number.setdefault(
+          test_id, self.initial_execution_number)
+      self.execution_number[test_id] += 1
+    return next_execution_number
+  def __register_start(self, task):
+    with self.lock:
+      self.started[task.task_id] = task
+  def __register_exit(self, task):
+    self.logger.log_exit(task)
+    self.times.record_test_time(task.test_binary, task.test_name,
+                                task.last_execution_time)
+    if self.test_results:
+      test_results.log(task.test_name, task.runtime_ms,
+                       "PASS" if task.exit_code == 0 else "FAIL")
+    with self.lock:
+      self.started.pop(task.task_id)
+      if task.exit_code == 0:
+        self.passed.append(task)
+      else:
+        self.failed.append(task)
+  def run_task(self, task):
+    for try_number in range(self.times_to_retry + 1):
+      self.__register_start(task)
+      self.__register_exit(task)
+      if task.exit_code == 0:
+        break
+      if try_number < self.times_to_retry:
+        execution_number = self.__get_next_execution_number(task.test_id)
+        # We need create a new Task instance. Each task represents a single test
+        # execution, with its own runtime, exit code and log file.
+        task = Task(task.test_binary, task.test_name, task.test_command,
+                    execution_number, task.last_execution_time, task.output_dir)
+    with self.lock:
+      if task.exit_code != 0:
+        self.global_exit_code = task.exit_code
+class FilterFormat(object):
+  def __init__(self, output_dir):
+    if sys.stdout.isatty():
+      # stdout needs to be unbuffered since the output is interactive.
+      sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
+    self.output_dir = output_dir
+    self.total_tasks = 0
+    self.finished_tasks = 0
+    self.out = Outputter(sys.stdout)
+    self.stdout_lock = threading.Lock()
+  def move_to(self, destination_dir, tasks):
+    destination_dir = os.path.join(self.output_dir, destination_dir)
+    os.makedirs(destination_dir)
+    for task in tasks:
+        shutil.move(task.log_file, destination_dir)
+  def print_tests(self, message, tasks):
+    self.out.permanent_line("%s (%s/%s):" %
+                            (message, len(tasks), self.total_tasks))
+    for task in sorted(tasks):
+      runtime_ms = 'Interrupted'
+      if task.runtime_ms is not None:
+        runtime_ms = '%d ms' % task.runtime_ms
+      self.out.permanent_line("%11s: %s %s (try #%d)" % (
+          runtime_ms, task.test_binary, task.test_name, task.execution_number))
+  def log_exit(self, task):
+    with self.stdout_lock:
+      self.finished_tasks += 1
+      self.out.transient_line("[%d/%d] %s (%d ms)"
+                              % (self.finished_tasks, self.total_tasks,
+                                 task.test_name, task.runtime_ms))
+      if task.exit_code != 0:
+        with open(task.log_file) as f:
+          for line in f.readlines():
+            self.out.permanent_line(line.rstrip())
+        self.out.permanent_line(
+          "[%d/%d] %s returned/aborted with exit code %d (%d ms)"
+          % (self.finished_tasks, self.total_tasks, task.test_name,
+             task.exit_code, task.runtime_ms))
+  def log_tasks(self, total_tasks):
+    self.total_tasks += total_tasks
+    self.out.transient_line("[0/%d] Running tests..." % self.total_tasks)
+  def flush(self):
+    self.out.flush_transient_output()
+class CollectTestResults(object):
+  def __init__(self, json_dump_filepath):
+    self.test_results_lock = threading.Lock()
+    self.json_dump_file = open(json_dump_filepath, 'w')
+    self.test_results = {
+        "interrupted": False,
+        "path_delimiter": ".",
+        # Third version of the file format. See the link in the flag description
+        # for details.
+        "version": 3,
+        "seconds_since_epoch": int(time.time()),
+        "num_failures_by_type": {
+            "PASS": 0,
+            "FAIL": 0,
+        },
+        "tests": {},
+    }
+  def log(self, test, runtime_ms, actual_result):
+    with self.test_results_lock:
+      self.test_results['num_failures_by_type'][actual_result] += 1
+      results = self.test_results['tests']
+      for name in test.split('.'):
+        results = results.setdefault(name, {})
+      if results:
+        results['actual'] += ' ' + actual_result
+        results['times'].append(runtime_ms)
+      else:  # This is the first invocation of the test
+        results['actual'] = actual_result
+        results['times'] = [runtime_ms]
+        results['time'] = runtime_ms
+        results['expected'] = 'PASS'
+  def dump_to_file_and_close(self):
+    json.dump(self.test_results, self.json_dump_file)
+    self.json_dump_file.close()
+class DummyTimer(object):
+  def start(self):
+    pass
+  def cancel(self):
+    pass
+# Record of test runtimes. Has built-in locking.
+class TestTimes(object):
+  def __init__(self, save_file):
+    "Create new object seeded with saved test times from the given file."
+    self.__times = {}  # (test binary, test name) -> runtime in ms
+    # Protects calls to record_test_time(); other calls are not
+    # expected to be made concurrently.
+    self.__lock = threading.Lock()
+    try:
+      with gzip.GzipFile(save_file, "rb") as f:
+        times = cPickle.load(f)
+    except (EOFError, IOError, cPickle.UnpicklingError, zlib.error):
+      # File doesn't exist, isn't readable, is malformed---whatever.
+      # Just ignore it.
+      return
+    # Discard saved times if the format isn't right.
+    if type(times) is not dict:
+      return
+    for ((test_binary, test_name), runtime) in times.items():
+      if (type(test_binary) is not str or type(test_name) is not str
+          or type(runtime) not in {int, long, type(None)}):
+        return
+    self.__times = times
+  def get_test_time(self, binary, testname):
+    """Return the last duration for the given test as an integer number of
+    milliseconds, or None if the test failed or if there's no record for it."""
+    return self.__times.get((binary, testname), None)
+  def record_test_time(self, binary, testname, runtime_ms):
+    """Record that the given test ran in the specified number of
+    milliseconds. If the test failed, runtime_ms should be None."""
+    with self.__lock:
+      self.__times[(binary, testname)] = runtime_ms
+  def write_to_file(self, save_file):
+    "Write all the times to file."
+    try:
+      with open(save_file, "wb") as f:
+        with gzip.GzipFile("", "wb", 9, f) as gzf:
+          cPickle.dump(self.__times, gzf, cPickle.HIGHEST_PROTOCOL)
+    except IOError:
+      pass  # ignore errors---saving the times isn't that important
+def find_tests(binaries, additional_args, options, times):
+  test_count = 0
+  tasks = []
+  for test_binary in binaries:
+    command = [test_binary]
+    if options.gtest_also_run_disabled_tests:
+      command += ['--gtest_also_run_disabled_tests']
+    list_command = command + ['--gtest_list_tests']
+    if options.gtest_filter != '':
+      list_command += ['--gtest_filter=' + options.gtest_filter]
+    try:
+      test_list = subprocess.Popen(list_command,
+                                   stdout=subprocess.PIPE).communicate()[0]
+    except OSError as e:
+      sys.exit("%s: %s" % (test_binary, str(e)))
+    command += additional_args + ['--gtest_color=' + options.gtest_color]
+    test_group = ''
+    for line in test_list.split('\n'):
+      if not line.strip():
+        continue
+      if line[0] != " ":
+        # Remove comments for typed tests and strip whitespace.
+        test_group = line.split('#')[0].strip()
+        continue
+      # Remove comments for parameterized tests and strip whitespace.
+      line = line.split('#')[0].strip()
+      if not line:
+        continue
+      test_name = test_group + line
+      if not options.gtest_also_run_disabled_tests and 'DISABLED_' in test_name:
+        continue
+      last_execution_time = times.get_test_time(test_binary, test_name)
+      if options.failed and last_execution_time is not None:
+        continue
+      test_command = command + ['--gtest_filter=' + test_name]
+      if (test_count - options.shard_index) % options.shard_count == 0:
+        for execution_number in range(options.repeat):
+          tasks.append(Task(test_binary, test_name, test_command,
+                            execution_number + 1, last_execution_time,
+                            options.output_dir))
+      test_count += 1
+  return tasks
+def execute_tasks(tasks, pool_size, task_manager, timeout):
+  class WorkerFn(object):
+    def __init__(self, tasks):
+      self.task_id = 0
+      self.tasks = tasks
+      self.task_lock = threading.Lock()
+    def __call__(self):
+      while True:
+        with self.task_lock:
+          if self.task_id < len(self.tasks):
+            task = self.tasks[self.task_id]
+            self.task_id += 1
+          else:
+            return
+        task_manager.run_task(task)
+  def start_daemon(func):
+    t = threading.Thread(target=func)
+    t.daemon = True
+    t.start()
+    return t
+  try:
+    timeout.start()
+    worker_fn = WorkerFn(tasks)
+    workers = [start_daemon(worker_fn) for _ in range(pool_size)]
+    for worker in workers:
+      worker.join()
+  finally:
+    timeout.cancel()
+def main():
+  # Remove additional arguments (anything after --).
+  additional_args = []
+  for i in range(len(sys.argv)):
+    if sys.argv[i] == '--':
+      additional_args = sys.argv[i+1:]
+      sys.argv = sys.argv[:i]
+      break
+  parser = optparse.OptionParser(
+      usage = 'usage: %prog [options] binary [binary ...] -- [additional args]')
+  parser.add_option('-d', '--output_dir', type='string',
+                    default=os.path.join(tempfile.gettempdir(), "gtest-parallel"),
+                    help='output directory for test logs')
+  parser.add_option('-r', '--repeat', type='int', default=1,
+                    help='Number of times to execute all the tests.')
+  parser.add_option('--retry_failed', type='int', default=0,
+                    help='Number of times to repeat failed tests.')
+  parser.add_option('--failed', action='store_true', default=False,
+                    help='run only failed and new tests')
+  parser.add_option('-w', '--workers', type='int',
+                    default=multiprocessing.cpu_count(),
+                    help='number of workers to spawn')
+  parser.add_option('--gtest_color', type='string', default='yes',
+                    help='color output')
+  parser.add_option('--gtest_filter', type='string', default='',
+                    help='test filter')
+  parser.add_option('--gtest_also_run_disabled_tests', action='store_true',
+                    default=False, help='run disabled tests too')
+  parser.add_option('--print_test_times', action='store_true', default=False,
+                    help='list the run time of each test at the end of execution')
+  parser.add_option('--shard_count', type='int', default=1,
+                    help='total number of shards (for sharding test execution '
+                         'between multiple machines)')
+  parser.add_option('--shard_index', type='int', default=0,
+                    help='zero-indexed number identifying this shard (for '
+                         'sharding test execution between multiple machines)')
+  parser.add_option('--dump_json_test_results', type='string', default=None,
+                    help='Saves the results of the tests as a JSON machine-'
+                         'readable file. The format of the file is specified at '
+                         '')
+  parser.add_option('--timeout', type='int', default=None,
+                    help='Interrupt all remaining processes after the given '
+                         'time (in seconds).')
+  (options, binaries) = parser.parse_args()
+  if binaries == []:
+    parser.print_usage()
+    sys.exit(1)
+  if options.shard_count < 1:
+    parser.error("Invalid number of shards: %d. Must be at least 1." %
+                 options.shard_count)
+  if not (0 <= options.shard_index < options.shard_count):
+    parser.error("Invalid shard index: %d. Must be between 0 and %d "
+                 "(less than the number of shards)." %
+                 (options.shard_index, options.shard_count - 1))
+  # Check that all test binaries have an unique basename. That way we can ensure
+  # the logs are saved to unique files even when two different binaries have
+  # common tests.
+  unique_binaries = set(os.path.basename(binary) for binary in binaries)
+  assert len(unique_binaries) == len(binaries), (
+      "All test binaries must have an unique basename.")
+  # Remove files from old test runs.
+  if os.path.isdir(options.output_dir):
+    shutil.rmtree(options.output_dir)
+  # Create directory for test log output.
+  try:
+    os.makedirs(options.output_dir)
+  except OSError as e:
+    # Ignore errors if this directory already exists.
+    if e.errno != errno.EEXIST or not os.path.isdir(options.output_dir):
+      raise e
+  timeout = (DummyTimer() if options.timeout is None
+             else threading.Timer(options.timeout, sigint_handler.interrupt))
+  test_results = None
+  if options.dump_json_test_results is not None:
+    test_results = CollectTestResults(options.dump_json_test_results)
+  save_file = os.path.join(os.path.expanduser("~"), ".gtest-parallel-times")
+  times = TestTimes(save_file)
+  logger = FilterFormat(options.output_dir)
+  task_manager = TaskManager(times, logger, test_results, options.retry_failed,
+                             options.repeat + 1)
+  tasks = find_tests(binaries, additional_args, options, times)
+  logger.log_tasks(len(tasks))
+  execute_tasks(tasks, options.workers, task_manager, timeout)
+  if task_manager.passed:
+    logger.move_to('passed', task_manager.passed)
+    if options.print_test_times:
+      logger.print_tests('PASSED TESTS', task_manager.passed)
+  if task_manager.failed:
+    logger.print_tests('FAILED TESTS', task_manager.failed)
+    logger.move_to('failed', task_manager.failed)
+  if task_manager.started:
+    logger.print_tests('INTERRUPTED TESTS', task_manager.started.values())
+    logger.move_to('interrupted', task_manager.started.values())
+  logger.flush()
+  times.write_to_file(save_file)
+  if test_results:
+    test_results.dump_to_file_and_close()
+  if sigint_handler.got_sigint():
+    return -signal.SIGINT
+  return task_manager.global_exit_code