[wptrunner] Implement `--retry-unexpected` (#35925)

This PR implements the retry-unexpected RFC: https://github.com/web-platform-tests/rfcs/blob/master/rfcs/retry_unexpected.md

After the initial repeat loop, tests whose results were unexpected in every repeat are re-run up to `--retry-unexpected` times. A retried test takes any expected status as its final result, and retries stop early once no unexpected results remain.
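For illustration, a minimal sketch of the retry semantics (not the wptrunner implementation; `run_once` and the test name below are placeholders):

```python
# Sketch of the --retry-unexpected semantics added in this PR.
# `run_once` is a placeholder for "run these tests and return the subset
# that still produced unexpected results".
def retry_unexpected(consistently_unexpected, run_once, n_retries):
    """Re-run the consistently-unexpected tests up to n_retries times.

    A test leaves the failing set as soon as one retry gives it an
    expected result; whatever is left after the retries is still failing.
    """
    failing = set(consistently_unexpected)
    for _ in range(n_retries):
        if not failing:  # every test has matched expectations at least once
            break
        failing = failing & run_once(failing)
    return failing


# Toy usage: a flaky test that becomes expected on its second retry.
attempts = {"flaky-test.html": 0}

def run_once(tests):
    still_unexpected = set()
    for t in tests:
        attempts[t] += 1
        if attempts[t] < 2:  # unexpected on the first retry only
            still_unexpected.add(t)
    return still_unexpected

print(retry_unexpected({"flaky-test.html"}, run_once, n_retries=3))  # -> set()
```

When combined with `--repeat`, only tests that were unexpected in every repeat enter the retry loop, since the runner intersects the per-iteration unexpected sets before retrying.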
diff --git a/tools/wptrunner/wptrunner/testrunner.py b/tools/wptrunner/wptrunner/testrunner.py
index 733dba0..b6e4e78 100644
--- a/tools/wptrunner/wptrunner/testrunner.py
+++ b/tools/wptrunner/wptrunner/testrunner.py
@@ -321,6 +321,7 @@
         self.test_count = 0
         self.unexpected_count = 0
         self.unexpected_pass_count = 0
+        self.unexpected_tests = set()
 
         # This may not really be what we want
         self.daemon = True
@@ -673,6 +674,9 @@
         if is_unexpected_pass:
             self.unexpected_pass_count += 1
 
+        if is_unexpected or subtest_unexpected:
+            self.unexpected_tests.add(test.id)
+
         if "assertion_count" in file_result.extra:
             assertion_count = file_result.extra["assertion_count"]
             if assertion_count is not None and assertion_count > 0:
@@ -899,12 +903,11 @@
     def run(self, test_type, tests):
         """Start all managers in the group"""
         self.logger.debug("Using %i processes" % self.size)
-        type_tests = tests[test_type]
-        if not type_tests:
+        if not tests:
             self.logger.info("No %s tests to run" % test_type)
             return
 
-        test_queue = make_test_queue(type_tests, self.test_source_cls, **self.test_source_kwargs)
+        test_queue = make_test_queue(tests, self.test_source_cls, **self.test_source_kwargs)
 
         for idx in range(self.size):
             manager = TestRunnerManager(self.suite_name,
@@ -948,3 +951,6 @@
 
     def unexpected_pass_count(self):
         return sum(manager.unexpected_pass_count for manager in self.pool)
+
+    def unexpected_tests(self):
+        return set().union(*(manager.unexpected_tests for manager in self.pool))
diff --git a/tools/wptrunner/wptrunner/wptcommandline.py b/tools/wptrunner/wptrunner/wptcommandline.py
index 115e9de..a9c63f9 100644
--- a/tools/wptrunner/wptrunner/wptcommandline.py
+++ b/tools/wptrunner/wptrunner/wptcommandline.py
@@ -180,6 +180,12 @@
                                  help="Number of times to run the tests, restarting between each run")
     debugging_group.add_argument("--repeat-until-unexpected", action="store_true", default=None,
                                  help="Run tests in a loop until one returns an unexpected result")
+    debugging_group.add_argument('--retry-unexpected', type=int, default=0,
+                                 help=('Maximum number of times to retry '
+                                       'each test that consistently runs '
+                                       'unexpectedly in the initial repeat '
+                                       'loop. A retried test takes any '
+                                       'expected status as its final result.'))
     debugging_group.add_argument('--pause-after-test', action="store_true", default=None,
                                  help="Halt the test runner after each test (this happens by default if only a single test is run)")
     debugging_group.add_argument('--no-pause-after-test', dest="pause_after_test", action="store_false",
diff --git a/tools/wptrunner/wptrunner/wptrunner.py b/tools/wptrunner/wptrunner/wptrunner.py
index 6a661db..af6651a 100644
--- a/tools/wptrunner/wptrunner/wptrunner.py
+++ b/tools/wptrunner/wptrunner/wptrunner.py
@@ -158,7 +158,7 @@
 def run_test_iteration(test_status, test_loader, test_source_kwargs, test_source_cls, run_info,
                        recording, test_environment, product, run_test_kwargs):
     """Runs the entire test suite.
-    This is called for each repeat run requested."""
+    This is called for each repeat or retry run requested."""
     tests = []
     for test_type in test_loader.test_types:
         tests.extend(test_loader.tests[test_type])
@@ -170,6 +170,12 @@
         logger.critical("Loading tests failed")
         return False
 
+    if test_status.retries_remaining:
+        for test_type, tests in dict(test_groups).items():
+            test_groups[test_type] = [test for test in tests
+                                      if test in test_status.unexpected_tests]
+
+    unexpected_tests = set()
     logger.suite_start(test_groups,
                        name='web-platform-test',
                        run_info=run_info,
@@ -178,7 +184,6 @@
         logger.info(f"Running {test_type} tests")
 
         browser_cls = product.get_browser_cls(test_type)
-
         browser_kwargs = product.get_browser_kwargs(logger,
                                                     test_type,
                                                     run_info,
@@ -203,7 +208,7 @@
             test_status.skipped += 1
 
         if test_type == "testharness":
-            run_tests = {"testharness": []}
+            tests_to_run = []
             for test in test_loader.tests["testharness"]:
                 if ((test.testdriver and not executor_cls.supports_testdriver) or
                         (test.jsshell and not executor_cls.supports_jsshell)):
@@ -211,9 +216,12 @@
                     logger.test_end(test.id, status="SKIP")
                     test_status.skipped += 1
                 else:
-                    run_tests["testharness"].append(test)
+                    tests_to_run.append(test)
         else:
-            run_tests = test_loader.tests
+            tests_to_run = test_loader.tests[test_type]
+        if test_status.retries_remaining:
+            tests_to_run = [test for test in tests_to_run
+                            if test.id in test_status.unexpected_tests]
 
         recording.pause()
         with ManagerGroup("web-platform-tests",
@@ -233,7 +241,7 @@
                           run_test_kwargs["restart_on_new_group"],
                           recording=recording) as manager_group:
             try:
-                manager_group.run(test_type, run_tests)
+                manager_group.run(test_type, tests_to_run)
             except KeyboardInterrupt:
                 logger.critical("Main thread got signal")
                 manager_group.stop()
@@ -241,6 +249,12 @@
             test_status.total_tests += manager_group.test_count()
             test_status.unexpected += manager_group.unexpected_count()
             test_status.unexpected_pass += manager_group.unexpected_pass_count()
+            unexpected_tests.update(manager_group.unexpected_tests())
+
+    if test_status.repeated_runs == 1:
+        test_status.unexpected_tests = unexpected_tests
+    else:
+        test_status.unexpected_tests &= unexpected_tests
 
     return True
 
@@ -282,6 +296,8 @@
         self.repeated_runs = 0
         self.expected_repeated_runs = 0
         self.all_skipped = False
+        self.unexpected_tests = set()
+        self.retries_remaining = 0
 
 
 def run_tests(config, test_paths, product, **kwargs):
@@ -428,10 +444,44 @@
                     test_status.all_skipped = True
                     break
 
+            if not test_status.all_skipped and kwargs["retry_unexpected"] > 0:
+                retry_success = retry_unexpected_tests(test_status, test_loader,
+                                                       test_source_kwargs,
+                                                       test_source_cls, run_info,
+                                                       recording, test_environment,
+                                                       product, kwargs)
+                if not retry_success:
+                    return False, test_status
+
     # Return the evaluation of the runs and the number of repeated iterations that were run.
     return evaluate_runs(test_status, kwargs), test_status
 
 
+def retry_unexpected_tests(test_status, test_loader, test_source_kwargs,
+                           test_source_cls, run_info, recording,
+                           test_environment, product, kwargs):
+    kwargs["rerun"] = 1
+    max_retries = kwargs["retry_unexpected"]
+    test_status.retries_remaining = max_retries
+    while (test_status.retries_remaining > 0 and not
+           evaluate_runs(test_status, kwargs)):
+        logger.info(f"Retry {max_retries - test_status.retries_remaining + 1}")
+        test_status.total_tests = 0
+        test_status.skipped = 0
+        test_status.unexpected = 0
+        test_status.unexpected_pass = 0
+        iter_success = run_test_iteration(test_status, test_loader,
+                                          test_source_kwargs, test_source_cls,
+                                          run_info, recording, test_environment,
+                                          product, kwargs)
+        if not iter_success:
+            return False
+        recording.set(["after-end"])
+        logger.suite_end()
+        test_status.retries_remaining -= 1
+    return True
+
+
 def check_stability(**kwargs):
     from . import stability
     if kwargs["stability"]: