Move gtest-parallel to gtest_parallel.py and make gtest-parallel a thin wrapper (#29)

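This makes the runner importable: callers can reuse the logic by importing
gtest_parallel instead of invoking the gtest-parallel script. A minimal sketch
of such a caller (the driver script and test binary path below are
hypothetical, not part of this change); main() reads its options from sys.argv
and returns the overall exit code:

    import sys

    import gtest_parallel

    # Hypothetical invocation: set up the argument vector just as the
    # command line would, then let main() parse it and run the tests.
    sys.argv = ['gtest-parallel', '--workers=4', '--repeat=2', './sample_tests']
    sys.exit(gtest_parallel.main())
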
diff --git a/gtest-parallel b/gtest-parallel
index 4f5f91b..19b0803 100755
--- a/gtest-parallel
+++ b/gtest-parallel
@@ -1,5 +1,5 @@
 #!/usr/bin/env python2
-# Copyright 2013 Google Inc. All rights reserved.
+# Copyright 2017 Google Inc. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,574 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import cPickle
-import errno
-import gzip
-import json
-import multiprocessing
-import optparse
-import os
-import re
-import shutil
-import signal
-import subprocess
+import gtest_parallel
 import sys
-import tempfile
-import thread
-import threading
-import time
-import zlib
 
-# An object that catches SIGINT sent to the Python process and notices
-# if processes passed to wait() die by SIGINT (we need to look for
-# both of those cases, because pressing Ctrl+C can result in either
-# the main process or one of the subprocesses getting the signal).
-#
-# Before a SIGINT is seen, wait(p) will simply call p.wait() and
-# return the result. Once a SIGINT has been seen (in the main process
-# or a subprocess, including the one the current call is waiting for),
-# wait(p) will call p.terminate() and raise ProcessWasInterrupted.
-class SigintHandler(object):
-  class ProcessWasInterrupted(Exception): pass
-  sigint_returncodes = {-signal.SIGINT,  # Unix
-                        -1073741510,     # Windows
-                        }
-  def __init__(self):
-    self.__lock = threading.Lock()
-    self.__processes = set()
-    self.__got_sigint = False
-    signal.signal(signal.SIGINT, lambda signal_num, frame: self.interrupt())
-  def __on_sigint(self):
-    self.__got_sigint = True
-    while self.__processes:
-      try:
-        self.__processes.pop().terminate()
-      except OSError:
-        pass
-  def interrupt(self):
-    with self.__lock:
-      self.__on_sigint()
-  def got_sigint(self):
-    with self.__lock:
-      return self.__got_sigint
-  def wait(self, p):
-    with self.__lock:
-      if self.__got_sigint:
-        p.terminate()
-      self.__processes.add(p)
-    code = p.wait()
-    with self.__lock:
-      self.__processes.discard(p)
-      if code in self.sigint_returncodes:
-        self.__on_sigint()
-      if self.__got_sigint:
-        raise self.ProcessWasInterrupted
-    return code
-sigint_handler = SigintHandler()
-
-# Return the width of the terminal, or None if it couldn't be
-# determined (e.g. because we're not being run interactively).
-def term_width(out):
-  if not out.isatty():
-    return None
-  try:
-    p = subprocess.Popen(["stty", "size"],
-                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-    (out, err) = p.communicate()
-    if p.returncode != 0 or err:
-      return None
-    return int(out.split()[1])
-  except (IndexError, OSError, ValueError):
-    return None
-
-# Output transient and permanent lines of text. If several transient
-# lines are written in sequence, the new will overwrite the old. We
-# use this to ensure that lots of unimportant info (tests passing)
-# won't drown out important info (tests failing).
-class Outputter(object):
-  def __init__(self, out_file):
-    self.__out_file = out_file
-    self.__previous_line_was_transient = False
-    self.__width = term_width(out_file)  # Line width, or None if not a tty.
-  def transient_line(self, msg):
-    if self.__width is None:
-      self.__out_file.write(msg + "\n")
-    else:
-      self.__out_file.write("\r" + msg[:self.__width].ljust(self.__width))
-      self.__previous_line_was_transient = True
-  def flush_transient_output(self):
-    if self.__previous_line_was_transient:
-      self.__out_file.write("\n")
-      self.__previous_line_was_transient = False
-  def permanent_line(self, msg):
-    self.flush_transient_output()
-    self.__out_file.write(msg + "\n")
-
-
-class Task(object):
-  """Stores information about a task (single execution of a test).
-
-  This class stores information about the test to be executed (gtest binary and
-  test name), and its result (log file, exit code and runtime).
-  Each task is uniquely identified by the gtest binary, the test name and an
-  execution number that increases each time the test is executed.
-  Additionaly we store the last execution time, so that next time the test is
-  executed, the slowest tests are run first.
-  """
-  def __init__(self, test_binary, test_name, test_command,
-               last_execution_time, output_dir):
-    self.test_name = test_name
-    self.output_dir = output_dir
-    self.test_binary = test_binary
-    self.test_command = test_command
-    self.last_execution_time = last_execution_time
-
-    self.test_id = (test_binary, test_name)
-
-    self.execution_number = task_manager.get_next_execution_number(self.test_id)
-    log_name = '%s-%s-%s.log' % (self.__normalize(test_binary),
-                                 self.__normalize(test_name),
-                                 self.execution_number)
-
-    self.log_file = os.path.join(output_dir, log_name)
-    self.task_id = (test_binary, test_name, self.execution_number)
-
-    self.exit_code = None
-    self.runtime_ms = None
-
-  def __lt__(self, other):
-    if self.last_execution_time is None:
-      return True
-    if other.last_execution_time is None:
-      return False
-    return self.last_execution_time > other.last_execution_time
-
-  def __normalize(self, string):
-    return re.sub('[^A-Za-z0-9]', '_', string)
-
-  def run(self):
-    begin = time.time()
-    with open(self.log_file, 'w') as log:
-      task = subprocess.Popen(self.test_command, stdout=log, stderr=log)
-      try:
-        self.exit_code = sigint_handler.wait(task)
-      except sigint_handler.ProcessWasInterrupted:
-        thread.exit()
-    self.runtime_ms = int(1000 * (time.time() - begin))
-    self.last_execution_time = None if self.exit_code else self.runtime_ms
-
-
-class TaskManager(object):
-  """Executes the tasks and stores the passed, failed and interrupted tasks.
-
-  When a task is run, this class keeps track if it passed, failed or was
-  interrupted. After a task finishes it calls the relevant functions of the
-  Logger, TestResults and TestTimes classes, and in case of failure, retries the
-  test as specified by the --retry_failed flag.
-  """
-  def __init__(self, times, logger, test_results):
-    self.times = times
-    self.logger = logger
-    self.test_results = test_results
-
-    self.global_exit_code = 0
-
-    self.passed = []
-    self.failed = []
-    self.started = {}
-    self.execution_number = {}
-
-    self.lock = threading.Lock()
-
-  def get_next_execution_number(self, test_id):
-    with self.lock:
-      next_execution_number = self.execution_number.setdefault(test_id, 1)
-      self.execution_number[test_id] += 1
-    return next_execution_number
-
-  def __register_start(self, task):
-    with self.lock:
-      self.started[task.task_id] = task
-
-  def __register_exit(self, task):
-    self.logger.log_exit(task)
-    self.times.record_test_time(task.test_binary, task.test_name,
-                                task.last_execution_time)
-    if self.test_results:
-      test_results.log(task.test_name, task.runtime_ms,
-                       "PASS" if task.exit_code == 0 else "FAIL")
-
-    with self.lock:
-      self.started.pop(task.task_id)
-      if task.exit_code == 0:
-        self.passed.append(task)
-      else:
-        self.failed.append(task)
-
-  def run_task(self, task):
-    for try_number in range(options.retry_failed + 1):
-      self.__register_start(task)
-      task.run()
-      self.__register_exit(task)
-
-      if task.exit_code == 0:
-        break
-
-      if try_number < options.retry_failed:
-        # We need create a new Task instance. Each task represents a single test
-        # execution, with its own runtime, exit code and log file.
-        task = Task(task.test_binary, task.test_name, task.test_command,
-                    task.last_execution_time, task.output_dir)
-
-    with self.lock:
-      if task.exit_code != 0:
-        self.global_exit_code = task.exit_code
-
-
-class FilterFormat(object):
-  def __init__(self, output_dir):
-    if sys.stdout.isatty():
-      # stdout needs to be unbuffered since the output is interactive.
-      sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
-
-    self.output_dir = output_dir
-
-    self.total_tasks = 0
-    self.finished_tasks = 0
-    self.out = Outputter(sys.stdout)
-    self.stdout_lock = threading.Lock()
-
-  def move_to(self, destination_dir, tasks):
-    destination_dir = os.path.join(self.output_dir, destination_dir)
-    os.makedirs(destination_dir)
-    for task in tasks:
-        shutil.move(task.log_file, destination_dir)
-
-
-  def print_tests(self, message, tasks):
-    self.out.permanent_line("%s (%s/%s):" %
-                            (message, len(tasks), self.total_tasks))
-    for task in sorted(tasks):
-      runtime_ms = 'Interrupted'
-      if task.runtime_ms is not None:
-        runtime_ms = '%d ms' % task.runtime_ms
-      self.out.permanent_line("%11s: %s %s (try #%d)" % (
-          runtime_ms, task.test_binary, task.test_name, task.execution_number))
-
-  def log_exit(self, task):
-    with self.stdout_lock:
-      self.finished_tasks += 1
-      self.out.transient_line("[%d/%d] %s (%d ms)"
-                              % (self.finished_tasks, self.total_tasks,
-                                 task.test_name, task.runtime_ms))
-      if task.exit_code != 0:
-        with open(task.log_file) as f:
-          for line in f.readlines():
-            self.out.permanent_line(line.rstrip())
-        self.out.permanent_line(
-          "[%d/%d] %s returned/aborted with exit code %d (%d ms)"
-          % (self.finished_tasks, self.total_tasks, task.test_name,
-             task.exit_code, task.runtime_ms))
-
-  def log_tasks(self, total_tasks):
-    self.total_tasks += total_tasks
-    self.out.transient_line("[0/%d] Running tests..." % self.total_tasks)
-
-  def flush(self):
-    self.out.flush_transient_output()
-
-
-class CollectTestResults(object):
-  def __init__(self, json_dump_filepath):
-    self.test_results_lock = threading.Lock()
-    self.json_dump_file = open(json_dump_filepath, 'w')
-    self.test_results = {
-        "interrupted": False,
-        "path_delimiter": ".",
-        # Third version of the file format. See the link in the flag description
-        # for details.
-        "version": 3,
-        "seconds_since_epoch": int(time.time()),
-        "num_failures_by_type": {
-            "PASS": 0,
-            "FAIL": 0,
-        },
-        "tests": {},
-    }
-
-  def log(self, test, runtime_ms, actual_result):
-    with self.test_results_lock:
-      self.test_results['num_failures_by_type'][actual_result] += 1
-      results = self.test_results['tests']
-      for name in test.split('.'):
-        results = results.setdefault(name, {})
-
-      if results:
-        results['actual'] += ' ' + actual_result
-        results['times'].append(runtime_ms)
-      else:  # This is the first invocation of the test
-        results['actual'] = actual_result
-        results['times'] = [runtime_ms]
-        results['time'] = runtime_ms
-        results['expected'] = 'PASS'
-
-  def dump_to_file_and_close(self):
-    json.dump(self.test_results, self.json_dump_file)
-    self.json_dump_file.close()
-
-class DummyTimer(object):
-  def start(self):
-    pass
-  def cancel(self):
-    pass
-
-# Record of test runtimes. Has built-in locking.
-class TestTimes(object):
-  def __init__(self, save_file):
-    "Create new object seeded with saved test times from the given file."
-    self.__times = {}  # (test binary, test name) -> runtime in ms
-
-    # Protects calls to record_test_time(); other calls are not
-    # expected to be made concurrently.
-    self.__lock = threading.Lock()
-
-    try:
-      with gzip.GzipFile(save_file, "rb") as f:
-        times = cPickle.load(f)
-    except (EOFError, IOError, cPickle.UnpicklingError, zlib.error):
-      # File doesn't exist, isn't readable, is malformed---whatever.
-      # Just ignore it.
-      return
-
-    # Discard saved times if the format isn't right.
-    if type(times) is not dict:
-      return
-    for ((test_binary, test_name), runtime) in times.items():
-      if (type(test_binary) is not str or type(test_name) is not str
-          or type(runtime) not in {int, long, type(None)}):
-        return
-
-    self.__times = times
-
-  def get_test_time(self, binary, testname):
-    """Return the last duration for the given test as an integer number of
-    milliseconds, or None if the test failed or if there's no record for it."""
-    return self.__times.get((binary, testname), None)
-
-  def record_test_time(self, binary, testname, runtime_ms):
-    """Record that the given test ran in the specified number of
-    milliseconds. If the test failed, runtime_ms should be None."""
-    with self.__lock:
-      self.__times[(binary, testname)] = runtime_ms
-
-  def write_to_file(self, save_file):
-    "Write all the times to file."
-    try:
-      with open(save_file, "wb") as f:
-        with gzip.GzipFile("", "wb", 9, f) as gzf:
-          cPickle.dump(self.__times, gzf, cPickle.HIGHEST_PROTOCOL)
-    except IOError:
-      pass  # ignore errors---saving the times isn't that important
-
-
-def find_tests(binaries, additional_args):
-  test_count = 0
-  tasks = []
-  for test_binary in binaries:
-    command = [test_binary]
-    if options.gtest_also_run_disabled_tests:
-      command += ['--gtest_also_run_disabled_tests']
-
-    list_command = command + ['--gtest_list_tests']
-    if options.gtest_filter != '':
-      list_command += ['--gtest_filter=' + options.gtest_filter]
-
-    try:
-      test_list = subprocess.Popen(list_command,
-                                   stdout=subprocess.PIPE).communicate()[0]
-    except OSError as e:
-      sys.exit("%s: %s" % (test_binary, str(e)))
-
-    command += additional_args + ['--gtest_color=' + options.gtest_color]
-
-    test_group = ''
-    for line in test_list.split('\n'):
-      if not line.strip():
-        continue
-      if line[0] != " ":
-        # Remove comments for typed tests and strip whitespace.
-        test_group = line.split('#')[0].strip()
-        continue
-      # Remove comments for parameterized tests and strip whitespace.
-      line = line.split('#')[0].strip()
-      if not line:
-        continue
-
-      test_name = test_group + line
-      if not options.gtest_also_run_disabled_tests and 'DISABLED_' in test_name:
-        continue
-
-      last_execution_time = times.get_test_time(test_binary, test_name)
-      if options.failed and last_execution_time is not None:
-        continue
-
-      test_command = command + ['--gtest_filter=' + test_name]
-      if (test_count - options.shard_index) % options.shard_count == 0:
-        for _ in range(options.repeat):
-          tasks.append(Task(test_binary, test_name, test_command,
-                            last_execution_time, options.output_dir))
-
-      test_count += 1
-
-  return tasks
-
-
-def execute_tasks(tasks):
-  class WorkerFn(object):
-    def __init__(self, tasks):
-      self.task_id = 0
-      self.tasks = tasks
-      self.task_lock = threading.Lock()
-
-    def __call__(self):
-      while True:
-        with self.task_lock:
-          if self.task_id < len(self.tasks):
-            task = self.tasks[self.task_id]
-            self.task_id += 1
-          else:
-            return
-        task_manager.run_task(task)
-
-  def start_daemon(func):
-    t = threading.Thread(target=func)
-    t.daemon = True
-    t.start()
-    return t
-
-  try:
-    timeout.start()
-    worker_fn = WorkerFn(tasks)
-    workers = [start_daemon(worker_fn) for _ in range(options.workers)]
-    for worker in workers:
-      worker.join()
-  finally:
-    timeout.cancel()
-
-
-# Remove additional arguments (anything after --).
-additional_args = []
-
-for i in range(len(sys.argv)):
-  if sys.argv[i] == '--':
-    additional_args = sys.argv[i+1:]
-    sys.argv = sys.argv[:i]
-    break
-
-parser = optparse.OptionParser(
-    usage = 'usage: %prog [options] binary [binary ...] -- [additional args]')
-
-parser.add_option('-d', '--output_dir', type='string',
-                  default=os.path.join(tempfile.gettempdir(), "gtest-parallel"),
-                  help='output directory for test logs')
-parser.add_option('-r', '--repeat', type='int', default=1,
-                  help='Number of times to execute all the tests.')
-parser.add_option('--retry_failed', type='int', default=0,
-                  help='Number of times to repeat failed tests.')
-parser.add_option('--failed', action='store_true', default=False,
-                  help='run only failed and new tests')
-parser.add_option('-w', '--workers', type='int',
-                  default=multiprocessing.cpu_count(),
-                  help='number of workers to spawn')
-parser.add_option('--gtest_color', type='string', default='yes',
-                  help='color output')
-parser.add_option('--gtest_filter', type='string', default='',
-                  help='test filter')
-parser.add_option('--gtest_also_run_disabled_tests', action='store_true',
-                  default=False, help='run disabled tests too')
-parser.add_option('--print_test_times', action='store_true', default=False,
-                  help='list the run time of each test at the end of execution')
-parser.add_option('--shard_count', type='int', default=1,
-                  help='total number of shards (for sharding test execution '
-                       'between multiple machines)')
-parser.add_option('--shard_index', type='int', default=0,
-                  help='zero-indexed number identifying this shard (for '
-                       'sharding test execution between multiple machines)')
-parser.add_option('--dump_json_test_results', type='string', default=None,
-                  help='Saves the results of the tests as a JSON machine-'
-                       'readable file. The format of the file is specified at '
-                       'https://www.chromium.org/developers/the-json-test-results-format')
-parser.add_option('--timeout', type='int', default=None,
-                  help='Interrupt all remaining processes after the given '
-                       'time (in seconds).')
-
-(options, binaries) = parser.parse_args()
-
-if binaries == []:
-  parser.print_usage()
-  sys.exit(1)
-
-if options.shard_count < 1:
-  parser.error("Invalid number of shards: %d. Must be at least 1." %
-               options.shard_count)
-if not (0 <= options.shard_index < options.shard_count):
-  parser.error("Invalid shard index: %d. Must be between 0 and %d "
-               "(less than the number of shards)." %
-               (options.shard_index, options.shard_count - 1))
-
-# Check that all test binaries have an unique basename. That way we can ensure
-# the logs are saved to unique files even when two different binaries have
-# common tests.
-unique_binaries = set(os.path.basename(binary) for binary in binaries)
-assert len(unique_binaries) == len(binaries), (
-    "All test binaries must have an unique basename.")
-
-# Remove files from old test runs.
-if os.path.isdir(options.output_dir):
-  shutil.rmtree(options.output_dir)
-# Create directory for test log output.
-try:
-  os.makedirs(options.output_dir)
-except OSError as e:
-  # Ignore errors if this directory already exists.
-  if e.errno != errno.EEXIST or not os.path.isdir(options.output_dir):
-    raise e
-
-timeout = (DummyTimer() if options.timeout is None
-           else threading.Timer(options.timeout, sigint_handler.interrupt))
-
-test_results = None
-if options.dump_json_test_results is not None:
-  test_results = CollectTestResults(options.dump_json_test_results)
-
-save_file = os.path.join(os.path.expanduser("~"), ".gtest-parallel-times")
-times = TestTimes(save_file)
-logger = FilterFormat(options.output_dir)
-
-task_manager = TaskManager(times, logger, test_results)
-
-tasks = find_tests(binaries, additional_args)
-logger.log_tasks(len(tasks))
-execute_tasks(tasks)
-
-if task_manager.passed:
-  logger.move_to('passed', task_manager.passed)
-  if options.print_test_times:
-    logger.print_tests('PASSED TESTS', task_manager.passed)
-
-if task_manager.failed:
-  logger.print_tests('FAILED TESTS', task_manager.failed)
-  logger.move_to('failed', task_manager.failed)
-
-if task_manager.started:
-  logger.print_tests('INTERRUPTED TESTS', task_manager.started.values())
-  logger.move_to('interrupted', task_manager.started.values())
-
-logger.flush()
-times.write_to_file(save_file)
-if test_results:
-  test_results.dump_to_file_and_close()
-
-if sigint_handler.got_sigint():
-  task_manager.global_exit_code = -signal.SIGINT
-sys.exit(task_manager.global_exit_code)
+sys.exit(gtest_parallel.main())
diff --git a/gtest_parallel.py b/gtest_parallel.py
new file mode 100755
index 0000000..e7def05
--- /dev/null
+++ b/gtest_parallel.py
@@ -0,0 +1,594 @@
+#!/usr/bin/env python2
+# Copyright 2013 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import cPickle
+import errno
+import gzip
+import json
+import multiprocessing
+import optparse
+import os
+import re
+import shutil
+import signal
+import subprocess
+import sys
+import tempfile
+import thread
+import threading
+import time
+import zlib
+
+# An object that catches SIGINT sent to the Python process and notices
+# if processes passed to wait() die by SIGINT (we need to look for
+# both of those cases, because pressing Ctrl+C can result in either
+# the main process or one of the subprocesses getting the signal).
+#
+# Before a SIGINT is seen, wait(p) will simply call p.wait() and
+# return the result. Once a SIGINT has been seen (in the main process
+# or a subprocess, including the one the current call is waiting for),
+# wait(p) will call p.terminate() and raise ProcessWasInterrupted.
+class SigintHandler(object):
+  class ProcessWasInterrupted(Exception): pass
+  sigint_returncodes = {-signal.SIGINT,  # Unix
+                        -1073741510,     # Windows
+                        }
+  def __init__(self):
+    self.__lock = threading.Lock()
+    self.__processes = set()
+    self.__got_sigint = False
+    signal.signal(signal.SIGINT, lambda signal_num, frame: self.interrupt())
+  def __on_sigint(self):
+    self.__got_sigint = True
+    while self.__processes:
+      try:
+        self.__processes.pop().terminate()
+      except OSError:
+        pass
+  def interrupt(self):
+    with self.__lock:
+      self.__on_sigint()
+  def got_sigint(self):
+    with self.__lock:
+      return self.__got_sigint
+  def wait(self, p):
+    with self.__lock:
+      if self.__got_sigint:
+        p.terminate()
+      self.__processes.add(p)
+    code = p.wait()
+    with self.__lock:
+      self.__processes.discard(p)
+      if code in self.sigint_returncodes:
+        self.__on_sigint()
+      if self.__got_sigint:
+        raise self.ProcessWasInterrupted
+    return code
+sigint_handler = SigintHandler()
+
+# Return the width of the terminal, or None if it couldn't be
+# determined (e.g. because we're not being run interactively).
+def term_width(out):
+  if not out.isatty():
+    return None
+  try:
+    p = subprocess.Popen(["stty", "size"],
+                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    (out, err) = p.communicate()
+    if p.returncode != 0 or err:
+      return None
+    return int(out.split()[1])
+  except (IndexError, OSError, ValueError):
+    return None
+
+# Output transient and permanent lines of text. If several transient
+# lines are written in sequence, each new one overwrites the old. We
+# use this to ensure that lots of unimportant info (tests passing)
+# won't drown out important info (tests failing).
+class Outputter(object):
+  def __init__(self, out_file):
+    self.__out_file = out_file
+    self.__previous_line_was_transient = False
+    self.__width = term_width(out_file)  # Line width, or None if not a tty.
+  def transient_line(self, msg):
+    if self.__width is None:
+      self.__out_file.write(msg + "\n")
+    else:
+      self.__out_file.write("\r" + msg[:self.__width].ljust(self.__width))
+      self.__previous_line_was_transient = True
+  def flush_transient_output(self):
+    if self.__previous_line_was_transient:
+      self.__out_file.write("\n")
+      self.__previous_line_was_transient = False
+  def permanent_line(self, msg):
+    self.flush_transient_output()
+    self.__out_file.write(msg + "\n")
+
+
+class Task(object):
+  """Stores information about a task (single execution of a test).
+
+  This class stores information about the test to be executed (gtest binary and
+  test name), and its result (log file, exit code and runtime).
+  Each task is uniquely identified by the gtest binary, the test name and an
+  execution number that increases each time the test is executed.
+  Additionally, we store the last execution time so that the next time the
+  tests are executed, the slowest ones are run first.
+  """
+  def __init__(self, test_binary, test_name, test_command, execution_number,
+               last_execution_time, output_dir):
+    self.test_name = test_name
+    self.output_dir = output_dir
+    self.test_binary = test_binary
+    self.test_command = test_command
+    self.execution_number = execution_number
+    self.last_execution_time = last_execution_time
+
+    self.exit_code = None
+    self.runtime_ms = None
+
+    self.test_id = (test_binary, test_name)
+    self.task_id = (test_binary, test_name, self.execution_number)
+
+    log_name = '%s-%s-%s.log' % (self.__normalize(test_binary),
+                                 self.__normalize(test_name),
+                                 self.execution_number)
+
+    self.log_file = os.path.join(output_dir, log_name)
+
+  def __lt__(self, other):
+    if self.last_execution_time is None:
+      return True
+    if other.last_execution_time is None:
+      return False
+    return self.last_execution_time > other.last_execution_time
+
+  def __normalize(self, string):
+    return re.sub('[^A-Za-z0-9]', '_', string)
+
+  def run(self):
+    begin = time.time()
+    with open(self.log_file, 'w') as log:
+      task = subprocess.Popen(self.test_command, stdout=log, stderr=log)
+      try:
+        self.exit_code = sigint_handler.wait(task)
+      except sigint_handler.ProcessWasInterrupted:
+        thread.exit()
+    self.runtime_ms = int(1000 * (time.time() - begin))
+    self.last_execution_time = None if self.exit_code else self.runtime_ms
+
+
+class TaskManager(object):
+  """Executes the tasks and stores the passed, failed and interrupted tasks.
+
+  When a task is run, this class keeps track of whether it passed, failed or
+  was interrupted. After a task finishes it calls the relevant functions of
+  the FilterFormat, CollectTestResults and TestTimes classes, and in case of
+  failure, retries the test as specified by the --retry_failed flag.
+  """
+  def __init__(self, times, logger, test_results, times_to_retry,
+               initial_execution_number):
+    self.times = times
+    self.logger = logger
+    self.test_results = test_results
+    self.times_to_retry = times_to_retry
+    self.initial_execution_number = initial_execution_number
+
+    self.global_exit_code = 0
+
+    self.passed = []
+    self.failed = []
+    self.started = {}
+    self.execution_number = {}
+
+    self.lock = threading.Lock()
+
+  def __get_next_execution_number(self, test_id):
+    with self.lock:
+      next_execution_number = self.execution_number.setdefault(
+          test_id, self.initial_execution_number)
+      self.execution_number[test_id] += 1
+    return next_execution_number
+
+  def __register_start(self, task):
+    with self.lock:
+      self.started[task.task_id] = task
+
+  def __register_exit(self, task):
+    self.logger.log_exit(task)
+    self.times.record_test_time(task.test_binary, task.test_name,
+                                task.last_execution_time)
+    if self.test_results:
+      self.test_results.log(task.test_name, task.runtime_ms,
+                            "PASS" if task.exit_code == 0 else "FAIL")
+
+    with self.lock:
+      self.started.pop(task.task_id)
+      if task.exit_code == 0:
+        self.passed.append(task)
+      else:
+        self.failed.append(task)
+
+  def run_task(self, task):
+    for try_number in range(self.times_to_retry + 1):
+      self.__register_start(task)
+      task.run()
+      self.__register_exit(task)
+
+      if task.exit_code == 0:
+        break
+
+      if try_number < self.times_to_retry:
+        execution_number = self.__get_next_execution_number(task.test_id)
+        # We need to create a new Task instance. Each task represents a single
+        # test execution, with its own runtime, exit code and log file.
+        task = Task(task.test_binary, task.test_name, task.test_command,
+                    execution_number, task.last_execution_time, task.output_dir)
+
+    with self.lock:
+      if task.exit_code != 0:
+        self.global_exit_code = task.exit_code
+
+
+class FilterFormat(object):
+  def __init__(self, output_dir):
+    if sys.stdout.isatty():
+      # stdout needs to be unbuffered since the output is interactive.
+      sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
+
+    self.output_dir = output_dir
+
+    self.total_tasks = 0
+    self.finished_tasks = 0
+    self.out = Outputter(sys.stdout)
+    self.stdout_lock = threading.Lock()
+
+  def move_to(self, destination_dir, tasks):
+    destination_dir = os.path.join(self.output_dir, destination_dir)
+    os.makedirs(destination_dir)
+    for task in tasks:
+      shutil.move(task.log_file, destination_dir)
+
+
+  def print_tests(self, message, tasks):
+    self.out.permanent_line("%s (%s/%s):" %
+                            (message, len(tasks), self.total_tasks))
+    for task in sorted(tasks):
+      runtime_ms = 'Interrupted'
+      if task.runtime_ms is not None:
+        runtime_ms = '%d ms' % task.runtime_ms
+      self.out.permanent_line("%11s: %s %s (try #%d)" % (
+          runtime_ms, task.test_binary, task.test_name, task.execution_number))
+
+  def log_exit(self, task):
+    with self.stdout_lock:
+      self.finished_tasks += 1
+      self.out.transient_line("[%d/%d] %s (%d ms)"
+                              % (self.finished_tasks, self.total_tasks,
+                                 task.test_name, task.runtime_ms))
+      if task.exit_code != 0:
+        with open(task.log_file) as f:
+          for line in f.readlines():
+            self.out.permanent_line(line.rstrip())
+        self.out.permanent_line(
+          "[%d/%d] %s returned/aborted with exit code %d (%d ms)"
+          % (self.finished_tasks, self.total_tasks, task.test_name,
+             task.exit_code, task.runtime_ms))
+
+  def log_tasks(self, total_tasks):
+    self.total_tasks += total_tasks
+    self.out.transient_line("[0/%d] Running tests..." % self.total_tasks)
+
+  def flush(self):
+    self.out.flush_transient_output()
+
+
+class CollectTestResults(object):
+  def __init__(self, json_dump_filepath):
+    self.test_results_lock = threading.Lock()
+    self.json_dump_file = open(json_dump_filepath, 'w')
+    self.test_results = {
+        "interrupted": False,
+        "path_delimiter": ".",
+        # Third version of the file format. See the link in the flag description
+        # for details.
+        "version": 3,
+        "seconds_since_epoch": int(time.time()),
+        "num_failures_by_type": {
+            "PASS": 0,
+            "FAIL": 0,
+        },
+        "tests": {},
+    }
+
+  def log(self, test, runtime_ms, actual_result):
+    with self.test_results_lock:
+      self.test_results['num_failures_by_type'][actual_result] += 1
+      results = self.test_results['tests']
+      for name in test.split('.'):
+        results = results.setdefault(name, {})
+
+      if results:
+        results['actual'] += ' ' + actual_result
+        results['times'].append(runtime_ms)
+      else:  # This is the first invocation of the test
+        results['actual'] = actual_result
+        results['times'] = [runtime_ms]
+        results['time'] = runtime_ms
+        results['expected'] = 'PASS'
+
+  def dump_to_file_and_close(self):
+    json.dump(self.test_results, self.json_dump_file)
+    self.json_dump_file.close()
+
+class DummyTimer(object):
+  def start(self):
+    pass
+  def cancel(self):
+    pass
+
+# Record of test runtimes. Has built-in locking.
+class TestTimes(object):
+  def __init__(self, save_file):
+    "Create new object seeded with saved test times from the given file."
+    self.__times = {}  # (test binary, test name) -> runtime in ms
+
+    # Protects calls to record_test_time(); other calls are not
+    # expected to be made concurrently.
+    self.__lock = threading.Lock()
+
+    try:
+      with gzip.GzipFile(save_file, "rb") as f:
+        times = cPickle.load(f)
+    except (EOFError, IOError, cPickle.UnpicklingError, zlib.error):
+      # File doesn't exist, isn't readable, is malformed---whatever.
+      # Just ignore it.
+      return
+
+    # Discard saved times if the format isn't right.
+    if type(times) is not dict:
+      return
+    for ((test_binary, test_name), runtime) in times.items():
+      if (type(test_binary) is not str or type(test_name) is not str
+          or type(runtime) not in {int, long, type(None)}):
+        return
+
+    self.__times = times
+
+  def get_test_time(self, binary, testname):
+    """Return the last duration for the given test as an integer number of
+    milliseconds, or None if the test failed or if there's no record for it."""
+    return self.__times.get((binary, testname), None)
+
+  def record_test_time(self, binary, testname, runtime_ms):
+    """Record that the given test ran in the specified number of
+    milliseconds. If the test failed, runtime_ms should be None."""
+    with self.__lock:
+      self.__times[(binary, testname)] = runtime_ms
+
+  def write_to_file(self, save_file):
+    "Write all the times to file."
+    try:
+      with open(save_file, "wb") as f:
+        with gzip.GzipFile("", "wb", 9, f) as gzf:
+          cPickle.dump(self.__times, gzf, cPickle.HIGHEST_PROTOCOL)
+    except IOError:
+      pass  # ignore errors---saving the times isn't that important
+
+
+def find_tests(binaries, additional_args, options, times):
+  test_count = 0
+  tasks = []
+  for test_binary in binaries:
+    command = [test_binary]
+    if options.gtest_also_run_disabled_tests:
+      command += ['--gtest_also_run_disabled_tests']
+
+    list_command = command + ['--gtest_list_tests']
+    if options.gtest_filter != '':
+      list_command += ['--gtest_filter=' + options.gtest_filter]
+
+    try:
+      test_list = subprocess.Popen(list_command,
+                                   stdout=subprocess.PIPE).communicate()[0]
+    except OSError as e:
+      sys.exit("%s: %s" % (test_binary, str(e)))
+
+    command += additional_args + ['--gtest_color=' + options.gtest_color]
+
+    test_group = ''
+    for line in test_list.split('\n'):
+      if not line.strip():
+        continue
+      if line[0] != " ":
+        # Remove comments for typed tests and strip whitespace.
+        test_group = line.split('#')[0].strip()
+        continue
+      # Remove comments for parameterized tests and strip whitespace.
+      line = line.split('#')[0].strip()
+      if not line:
+        continue
+
+      test_name = test_group + line
+      if not options.gtest_also_run_disabled_tests and 'DISABLED_' in test_name:
+        continue
+
+      last_execution_time = times.get_test_time(test_binary, test_name)
+      if options.failed and last_execution_time is not None:
+        continue
+
+      test_command = command + ['--gtest_filter=' + test_name]
+      if (test_count - options.shard_index) % options.shard_count == 0:
+        for execution_number in range(options.repeat):
+          tasks.append(Task(test_binary, test_name, test_command,
+                            execution_number + 1, last_execution_time,
+                            options.output_dir))
+
+      test_count += 1
+
+  return tasks
+
+
+def execute_tasks(tasks, pool_size, task_manager, timeout):
+  class WorkerFn(object):
+    def __init__(self, tasks):
+      self.task_id = 0
+      self.tasks = tasks
+      self.task_lock = threading.Lock()
+
+    def __call__(self):
+      while True:
+        with self.task_lock:
+          if self.task_id < len(self.tasks):
+            task = self.tasks[self.task_id]
+            self.task_id += 1
+          else:
+            return
+        task_manager.run_task(task)
+
+  def start_daemon(func):
+    t = threading.Thread(target=func)
+    t.daemon = True
+    t.start()
+    return t
+
+  try:
+    timeout.start()
+    worker_fn = WorkerFn(tasks)
+    workers = [start_daemon(worker_fn) for _ in range(pool_size)]
+    for worker in workers:
+      worker.join()
+  finally:
+    timeout.cancel()
+
+
+def main():
+  # Remove additional arguments (anything after --).
+  additional_args = []
+
+  for i in range(len(sys.argv)):
+    if sys.argv[i] == '--':
+      additional_args = sys.argv[i+1:]
+      sys.argv = sys.argv[:i]
+      break
+
+  parser = optparse.OptionParser(
+      usage = 'usage: %prog [options] binary [binary ...] -- [additional args]')
+
+  parser.add_option('-d', '--output_dir', type='string',
+                    default=os.path.join(tempfile.gettempdir(), "gtest-parallel"),
+                    help='output directory for test logs')
+  parser.add_option('-r', '--repeat', type='int', default=1,
+                    help='Number of times to execute all the tests.')
+  parser.add_option('--retry_failed', type='int', default=0,
+                    help='Number of times to repeat failed tests.')
+  parser.add_option('--failed', action='store_true', default=False,
+                    help='run only failed and new tests')
+  parser.add_option('-w', '--workers', type='int',
+                    default=multiprocessing.cpu_count(),
+                    help='number of workers to spawn')
+  parser.add_option('--gtest_color', type='string', default='yes',
+                    help='color output')
+  parser.add_option('--gtest_filter', type='string', default='',
+                    help='test filter')
+  parser.add_option('--gtest_also_run_disabled_tests', action='store_true',
+                    default=False, help='run disabled tests too')
+  parser.add_option('--print_test_times', action='store_true', default=False,
+                    help='list the run time of each test at the end of execution')
+  parser.add_option('--shard_count', type='int', default=1,
+                    help='total number of shards (for sharding test execution '
+                         'between multiple machines)')
+  parser.add_option('--shard_index', type='int', default=0,
+                    help='zero-indexed number identifying this shard (for '
+                         'sharding test execution between multiple machines)')
+  parser.add_option('--dump_json_test_results', type='string', default=None,
+                    help='Saves the results of the tests as a JSON machine-'
+                         'readable file. The format of the file is specified at '
+                         'https://www.chromium.org/developers/the-json-test-results-format')
+  parser.add_option('--timeout', type='int', default=None,
+                    help='Interrupt all remaining processes after the given '
+                         'time (in seconds).')
+
+  (options, binaries) = parser.parse_args()
+
+  if binaries == []:
+    parser.print_usage()
+    sys.exit(1)
+
+  if options.shard_count < 1:
+    parser.error("Invalid number of shards: %d. Must be at least 1." %
+                 options.shard_count)
+  if not (0 <= options.shard_index < options.shard_count):
+    parser.error("Invalid shard index: %d. Must be between 0 and %d "
+                 "(less than the number of shards)." %
+                 (options.shard_index, options.shard_count - 1))
+
+  # Check that all test binaries have a unique basename. That way we can ensure
+  # the logs are saved to unique files even when two different binaries have
+  # common tests.
+  unique_binaries = set(os.path.basename(binary) for binary in binaries)
+  assert len(unique_binaries) == len(binaries), (
+      "All test binaries must have an unique basename.")
+
+  # Remove files from old test runs.
+  if os.path.isdir(options.output_dir):
+    shutil.rmtree(options.output_dir)
+  # Create directory for test log output.
+  try:
+    os.makedirs(options.output_dir)
+  except OSError as e:
+    # Ignore errors if this directory already exists.
+    if e.errno != errno.EEXIST or not os.path.isdir(options.output_dir):
+      raise e
+
+  timeout = (DummyTimer() if options.timeout is None
+             else threading.Timer(options.timeout, sigint_handler.interrupt))
+
+  test_results = None
+  if options.dump_json_test_results is not None:
+    test_results = CollectTestResults(options.dump_json_test_results)
+
+  save_file = os.path.join(os.path.expanduser("~"), ".gtest-parallel-times")
+  times = TestTimes(save_file)
+  logger = FilterFormat(options.output_dir)
+
+  task_manager = TaskManager(times, logger, test_results, options.retry_failed,
+                             options.repeat + 1)
+
+  tasks = find_tests(binaries, additional_args, options, times)
+  logger.log_tasks(len(tasks))
+  execute_tasks(tasks, options.workers, task_manager, timeout)
+
+  if task_manager.passed:
+    logger.move_to('passed', task_manager.passed)
+    if options.print_test_times:
+      logger.print_tests('PASSED TESTS', task_manager.passed)
+
+  if task_manager.failed:
+    logger.print_tests('FAILED TESTS', task_manager.failed)
+    logger.move_to('failed', task_manager.failed)
+
+  if task_manager.started:
+    logger.print_tests('INTERRUPTED TESTS', task_manager.started.values())
+    logger.move_to('interrupted', task_manager.started.values())
+
+  logger.flush()
+  times.write_to_file(save_file)
+  if test_results:
+    test_results.dump_to_file_and_close()
+
+  if sigint_handler.got_sigint():
+    return -signal.SIGINT
+
+  return task_manager.global_exit_code