Add instrumentation facility to wptrunner

Add simple high-level instrumentation to record the time spent on
different kinds of tasks, in particular on individual tests and test
directories. It is based on explicit annotations in the code marking
which kind of task is currently running.

The output is intended to be compatible with flamegraph.pl, so that
visualisations of the time spent on the different tasks can be
generated from it.
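
For example, given a run instrumented with (the invocation, thread
names, and timings below are illustrative):

    wptrunner --instrument-to-file=/tmp/instruments.txt ...

each completed task produces one line in flamegraph.pl's collapsed
stack format, a semicolon-joined stack followed by the time spent in
milliseconds:

    MainThread;startup;load_tests 2500
    TestRunnerManager-0;testrunner;test;css;example.html 4300

An SVG can then be rendered with something like:

    flamegraph.pl --countname ms /tmp/instruments.txt > instruments.svg
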
diff --git a/tools/wptrunner/wptrunner/instruments.py b/tools/wptrunner/wptrunner/instruments.py
new file mode 100644
index 0000000..e939648
--- /dev/null
+++ b/tools/wptrunner/wptrunner/instruments.py
@@ -0,0 +1,125 @@
+"""Instrumentation for measuring, at a high level, the time spent on various tasks inside the runner.
+
+This is lower fidelity than an actual profile, but allows custom data to be considered,
+so that we can see the time spent in specific tests and test directories.
+
+Instruments are intended to be used as context managers, with the return value of
+__enter__ providing the user-facing API, e.g.
+
+with Instrument(*args) as recording:
+    recording.set(["init"])
+    do_init()
+    recording.pause()
+    for thread in test_threads:
+        thread.start(recording, *args)
+    for thread in test_threads:
+        thread.join()
+    recording.set(["teardown"])   # un-pauses the Instrument
+    do_teardown()
+"""
+
+import time
+import threading
+
+try:
+    from Queue import Queue  # Python 2
+except ImportError:
+    from queue import Queue  # Python 3
+
+
+class NullInstrument(object):
+    def set(self, stack):
+        """Set the current task to stack
+
+        :param stack: A list of strings defining the current task.
+                      These are interpreted like a stack trace so that ["foo"] and
+                      ["foo", "bar"] both show up as descendants of "foo"
+        """
+        pass
+
+    def pause(self):
+        """Stop recording a task on the current thread. This is useful if the thread
+        is purely waiting on the results of other threads"""
+        pass
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        return
+
+
+class InstrumentWriter(object):
+    def __init__(self, queue):
+        self.queue = queue
+
+    def set(self, stack):
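+        # Prepend the thread name so that each thread's tasks form their own root in the graph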
+        stack = [threading.current_thread().name] + list(stack)  # copy; don't mutate the caller's list
+        stack = self._check_stack(stack)
+        self.queue.put(("set", threading.current_thread().ident, time.time(), stack))
+
+    def pause(self):
+        self.queue.put(("pause", threading.current_thread().ident, time.time(), None))
+
+    def _check_stack(self, stack):
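+        # The collapsed-stack format uses a space to separate the stack from the count, so frame names must not contain spaces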
+        assert isinstance(stack, (tuple, list))
+        return [item.replace(" ", "_") for item in stack]
+
+
+class Instrument(object):
+    def __init__(self, file_path):
+        """Instrument that collects data from multiple threads and sums the time in each
+        thread. The output is in the format required by flamegraph.pl to enable visualisation
+        of the time spent in each task.
+
+        :param file_path: The path to which instrument output is written. Any existing
+                          file at that path will be overwritten.
+        """
+        self.path = file_path
+        self.queue = None
+        self.current = None
+        self.start_time = None
+        self.thread = None
+
+    def __enter__(self):
+        assert self.thread is None
+        assert self.queue is None
+        self.queue = Queue()
+        self.thread = threading.Thread(target=self.run)
+        self.thread.start()
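+        # The writer only touches the thread-safe queue, so it can be handed to other threads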
+        return InstrumentWriter(self.queue)
+
+    def __exit__(self, *args, **kwargs):
+        self.queue.put(("stop", None, time.time(), None))
+        self.thread.join()
+        self.thread = None
+        self.queue = None
+
+    def run(self):
+        known_commands = {"stop", "pause", "set"}
+        with open(self.path, "w") as f:
+            thread_data = {}
+            while True:
+                command, thread, time_stamp, stack = self.queue.get()
+                assert command in known_commands
+
+                # If we are done recording, dump the information from all threads to the file
+                # before exiting. Otherwise for either 'set' or 'pause' we only need to dump
+                # information from the current stack (if any) that was recording on the reporting
+                # thread (as that stack is no longer active).
+                items = []
+                if command == "stop":
+                    items = list(thread_data.values())  # copy the values; .values() is a view on Python 3
+                elif thread in thread_data:
+                    items.append(thread_data.pop(thread))
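+                # One line per flushed task: semicolon-joined stack, then the elapsed time in milliseconds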
+                for output_stack, start_time in items:
+                    f.write("%s %d\n" % (";".join(output_stack), int(1000 * (time_stamp - start_time))))
+
+                if command == "set":
+                    thread_data[thread] = (stack, time_stamp)
+                elif command == "stop":
+                    break
diff --git a/tools/wptrunner/wptrunner/testrunner.py b/tools/wptrunner/wptrunner/testrunner.py
index 7b6bf57..bec6ee8 100644
--- a/tools/wptrunner/wptrunner/testrunner.py
+++ b/tools/wptrunner/wptrunner/testrunner.py
@@ -280,7 +280,7 @@
     def __init__(self, suite_name, test_queue, test_source_cls, browser_cls, browser_kwargs,
                  executor_cls, executor_kwargs, stop_flag, rerun=1, pause_after_test=False,
                  pause_on_unexpected=False, restart_on_unexpected=True, debug_info=None,
-                 capture_stdio=True):
+                 capture_stdio=True, recording=None):
         """Thread that owns a single TestRunner process and any processes required
         by the TestRunner (e.g. the Firefox binary).
 
@@ -318,6 +318,8 @@
         self.debug_info = debug_info
 
         self.manager_number = next_manager_number()
+        assert recording is not None
+        self.recording = recording
 
         self.command_queue = Queue()
         self.remote_queue = Queue()
@@ -350,6 +352,7 @@
         may also have a stop flag set by the main thread indicating
         that the manager should shut down the next time the event loop
         spins."""
+        self.recording.set(["testrunner", "startup"])
         self.logger = structuredlog.StructuredLogger(self.suite_name)
         with self.browser_cls(self.logger, **self.browser_kwargs) as browser:
             self.browser = BrowserManager(self.logger,
@@ -467,6 +470,7 @@
 
     def start_init(self):
         test, test_group, group_metadata = self.get_next_test()
+        self.recording.set(["testrunner", "init"])
         if test is None:
             return RunnerManagerState.stop()
         else:
@@ -556,6 +560,8 @@
                                                  self.state.test_group,
                                                  self.state.group_metadata)
 
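+        # Split the test id into path components so each directory level nests in the output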
+        self.recording.set(["testrunner", "test"] + self.state.test.id.split("/")[1:])
         self.logger.test_start(self.state.test.id)
         if self.rerun > 1:
             self.logger.info("Run %d/%d" % (self.run_count, self.rerun))
@@ -672,6 +677,7 @@
                                ((subtest_unexpected or is_unexpected) and
                                 self.restart_on_unexpected))
 
+        self.recording.set(["testrunner", "after-test"])
         if (not file_result.status == "CRASH" and
             self.pause_after_test or
             (self.pause_on_unexpected and (subtest_unexpected or is_unexpected))):
@@ -720,6 +726,7 @@
 
     def stop_runner(self, force=False):
         """Stop the TestRunner and the browser binary."""
+        self.recording.set(["testrunner", "stop_runner"])
         if self.test_runner_proc is None:
             return
 
@@ -738,6 +745,7 @@
         self.remote_queue.close()
         self.command_queue = None
         self.remote_queue = None
+        self.recording.pause()
 
     def ensure_runner_stopped(self):
         self.logger.debug("ensure_runner_stopped")
@@ -825,7 +833,8 @@
                  pause_on_unexpected=False,
                  restart_on_unexpected=True,
                  debug_info=None,
-                 capture_stdio=True):
+                 capture_stdio=True,
+                 recording=None):
         self.suite_name = suite_name
         self.size = size
         self.test_source_cls = test_source_cls
@@ -840,6 +849,8 @@
         self.debug_info = debug_info
         self.rerun = rerun
         self.capture_stdio = capture_stdio
+        self.recording = recording
+        assert recording is not None
 
         self.pool = set()
         # Event that is polled by threads so that they can gracefully exit in the face
@@ -877,7 +888,8 @@
                                         self.pause_on_unexpected,
                                         self.restart_on_unexpected,
                                         self.debug_info,
-                                        self.capture_stdio)
+                                        self.capture_stdio,
+                                        recording=self.recording)
             manager.start()
             self.pool.add(manager)
         self.wait()
diff --git a/tools/wptrunner/wptrunner/wptcommandline.py b/tools/wptrunner/wptrunner/wptcommandline.py
index e69bc28..2e6f0c0 100644
--- a/tools/wptrunner/wptrunner/wptcommandline.py
+++ b/tools/wptrunner/wptrunner/wptcommandline.py
@@ -219,6 +219,8 @@
                               help="Run browser in headless mode", default=None)
     config_group.add_argument("--no-headless", action="store_false", dest="headless",
                               help="Don't run browser in headless mode")
+    config_group.add_argument("--instrument-to-file", action="store",
+                              help="Path to write instrumentation logs to")
 
     build_type = parser.add_mutually_exclusive_group()
     build_type.add_argument("--debug-build", dest="debug", action="store_true",
diff --git a/tools/wptrunner/wptrunner/wptrunner.py b/tools/wptrunner/wptrunner/wptrunner.py
index 75ce104..21bdd23 100644
--- a/tools/wptrunner/wptrunner/wptrunner.py
+++ b/tools/wptrunner/wptrunner/wptrunner.py
@@ -8,6 +8,7 @@
 from wptserve import sslutils
 
 from . import environment as env
+from . import instruments
 from . import products
 from . import testloader
 from . import wptcommandline
@@ -139,7 +140,12 @@
 def run_tests(config, test_paths, product, **kwargs):
     """Set up the test environment, load the list of tests to be executed, and
     invoke the remainder of the code to execute tests"""
-    with capture.CaptureIO(logger, not kwargs["no_capture_stdio"]):
+    if kwargs["instrument_to_file"] is None:
+        recorder = instruments.NullInstrument()
+    else:
+        recorder = instruments.Instrument(kwargs["instrument_to_file"])
+    with recorder as recording, capture.CaptureIO(logger, not kwargs["no_capture_stdio"]):
+        recording.set(["startup"])
         env.do_delayed_imports(logger, test_paths)
 
         product = products.load_product(config, product, load_cls=True)
@@ -155,6 +161,7 @@
                 ahem=os.path.join(test_paths["/"]["tests_path"], "fonts/Ahem.ttf")
             ))
 
+        recording.set(["startup", "load_tests"])
         run_info, test_loader = get_loader(test_paths,
                                            product.name,
                                            run_info_extras=product.run_info_extras(**kwargs),
@@ -190,6 +197,7 @@
 
         testharness_timeout_multipler = product.get_timeout_multiplier("testharness", run_info, **kwargs)
 
+        recording.set(["startup", "start_environment"])
         with env.TestEnvironment(test_paths,
                                  testharness_timeout_multipler,
                                  kwargs["pause_after_test"],
@@ -197,12 +205,15 @@
                                  product.env_options,
                                  ssl_config,
                                  env_extras) as test_environment:
+            recording.set(["startup", "ensure_environment"])
             try:
                 test_environment.ensure_started()
             except env.TestEnvironmentError as e:
                 logger.critical("Error starting test environment: %s" % e.message)
                 raise
 
+            recording.set(["startup"])
+
             repeat = kwargs["repeat"]
             repeat_count = 0
             repeat_until_unexpected = kwargs["repeat_until_unexpected"]
@@ -268,6 +279,8 @@
                     else:
                         run_tests = test_loader.tests
 
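+                    # From here the main thread just waits on the manager threads, so stop attributing time to it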
+                    recording.pause()
                     with ManagerGroup("web-platform-tests",
                                       kwargs["processes"],
                                       test_source_cls,
@@ -281,7 +293,8 @@
                                       kwargs["pause_on_unexpected"],
                                       kwargs["restart_on_unexpected"],
                                       kwargs["debug_info"],
-                                      not kwargs["no_capture_stdio"]) as manager_group:
+                                      not kwargs["no_capture_stdio"],
+                                      recording=recording) as manager_group:
                         try:
                             manager_group.run(test_type, run_tests)
                         except KeyboardInterrupt:
@@ -290,7 +303,7 @@
                             raise
                         test_count += manager_group.test_count()
                         unexpected_count += manager_group.unexpected_count()
-
+                recording.set(["after-end"])
                 test_total += test_count
                 unexpected_total += unexpected_count
                 logger.info("Got %i unexpected results" % unexpected_count)