7660 Avoid CI stability checks timing out (#32202)

* allow stability checks to avoid the TC timeout by checking elapsed time between repeat runs
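
A rough sketch of the timing guard (names such as `run_iteration` and `run_with_budget` are illustrative, not the actual wptrunner API; the real logic lives in `wptrunner.run_tests` in the diff below): track the longest iteration seen so far and skip the next repeat if its padded estimate would push the job past the time budget.

```python
from datetime import datetime, timedelta
import time


def run_iteration():
    # Stand-in for one full pass over the test suite.
    time.sleep(0.1)


def run_with_budget(repeat, max_time):
    start = datetime.now()
    longest = timedelta()
    completed = 0
    for _ in range(repeat):
        # Pad the estimate by 10% so the next pass cannot push the job past the budget.
        estimate = datetime.now() + timedelta(seconds=longest.total_seconds() * 1.1)
        if estimate >= start + max_time:
            break
        iteration_start = datetime.now()
        run_iteration()
        longest = max(longest, datetime.now() - iteration_start)
        completed += 1
    return completed


print(run_with_budget(repeat=5, max_time=timedelta(minutes=1)))
```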

* fix flake8 issue

* remove empty flag to trigger stability checks

* some commenting explaining how iterations are being tracked

* add --repeat-max-time flag

* better descriptors for announcing results

If the repeat runs stop early to avoid a timeout, a message explaining so will be written in the results.

* cast max_time to timedelta

* correct syntax for kwargs

* specify kwargs source for run_test_iteration

* remove empty css flags tag to trigger stability checks

* Add clause to handle an insufficient number of iterations completing
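
The classification added to stability.py boils down to the sketch below (`classify_step` is a hypothetical helper used only for illustration; the real code appends to `step_results` and writes a TIMEOUT/PASS/FAIL summary):

```python
def classify_step(iterations, expected_iterations):
    # Fewer than two completed repeats cannot validate stability, so the step
    # is reported as a timeout rather than a pass.
    if iterations <= 1 and expected_iterations > 1:
        return "TIMEOUT"
    # A pass with fewer repeats than requested is flagged so the early stop is visible.
    if iterations != expected_iterations:
        return f"PASS * {iterations}/{expected_iterations} repeats completed"
    return "PASS"


print(classify_step(1, 10))   # TIMEOUT
print(classify_step(7, 10))   # PASS * 7/10 repeats completed
print(classify_step(10, 10))  # PASS
```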

* replace unassociated change used to trigger stability checks

* Implement changes suggested by @stephenmcgruer

* Add only necessary formatting changes to stability

* wptrunner reformatting changes suggested by @jgraham

* functional changes to stability tests suggested by @jgraham

* flake8 changes "line break before binary operator"

* change run_tests to return counts object

* ensure run_tests return values are properly accessed

Now that wptrunner's run_tests returns more than one value, callers that previously expected a single boolean receive a tuple instead. We need to ensure they pull the expected first value (the boolean) out of that tuple.
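
For illustration, the caller-side change amounts to the pattern below (`fake_run_tests` is a hypothetical stand-in for the new signature, not the real API):

```python
# Hypothetical stand-in mimicking the new signature: a (success, counts) tuple
# instead of a bare boolean.
def fake_run_tests():
    return True, {"repeated_runs": 3}


# Unpack (or index) the tuple so the boolean lands where a boolean is expected;
# wpt's entry point does the same via indexing: rv = not run_tests(**kwargs)[0]
success, test_status = fake_run_tests()
rv = 0 if success else 1
print(rv, test_status["repeated_runs"])
```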

* run_tests has consistent return values even in fail cases

wptrunner's run_tests would return a tuple only if no issues arose while running, and would return only a boolean when an expected issue occurred. Now a tuple is returned in all cases, as expected.

* Return full counts in TestStatus class

run_tests will now return a new TestStatus object rather than returning only the number of iterations run. This will allow for more robust statistics to be shown in the future.
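
A minimal mirror of the counts object, assuming the fields shown in the wptrunner.py diff below (this copy exists only to illustrate how a caller such as stability.py reads it):

```python
class TestStatus:
    """Counts accumulated across repeat runs of the test suite."""
    def __init__(self):
        self.total_tests = 0
        self.skipped = 0
        self.unexpected = 0
        self.unexpected_pass = 0
        self.repeated_runs = 0
        self.expected_repeated_runs = 0


def summarize(status):
    # stability.py only needs repeated_runs today; the other counters leave
    # room for richer statistics later.
    return (f"{status.repeated_runs}/{status.expected_repeated_runs} repeats, "
            f"{status.unexpected} unexpected ({status.unexpected_pass} passes)")


status = TestStatus()
status.expected_repeated_runs = 10
status.repeated_runs = 7
print(summarize(status))
```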

* small formatting changes

Reduce some comments and logs to take up less vertical space.

* small wording change

TestStatus docstring

* Replace counts with TestStatus object

Forgo the use of defaultdict counts for tracking test info/results and instead use the custom TestStatus class.

* convert some log strings to f-strings
diff --git a/tools/wptrunner/wptrunner/stability.py b/tools/wptrunner/wptrunner/stability.py
index eeb5af2..ad319a5 100644
--- a/tools/wptrunner/wptrunner/stability.py
+++ b/tools/wptrunner/wptrunner/stability.py
@@ -288,7 +288,10 @@
     # warning+ level logs only
     logger.add_handler(StreamHandler(log, JSONFormatter()))
 
-    wptrunner.run_tests(**kwargs)
+    # Use the number of test suite iterations that were actually run when processing
+    # the results, in case the runs were stopped early to avoid hitting the maximum run time.
+    _, test_status = wptrunner.run_tests(**kwargs)
+    iterations = test_status.repeated_runs
 
     logger._state.handlers = initial_handlers
     logger._state.running_tests = set()
@@ -311,12 +314,24 @@
         if repeat_loop:
             desc = "Running tests in a loop %d times%s" % (repeat_loop,
                                                            flags_string)
-            steps.append((desc, functools.partial(run_step, logger, repeat_loop, False, kwargs_extra)))
+            steps.append((desc,
+                          functools.partial(run_step,
+                                            logger,
+                                            repeat_loop,
+                                            False,
+                                            kwargs_extra),
+                          repeat_loop))
 
         if repeat_restart:
             desc = "Running tests in a loop with restarts %s times%s" % (repeat_restart,
                                                                          flags_string)
-            steps.append((desc, functools.partial(run_step, logger, repeat_restart, True, kwargs_extra)))
+            steps.append((desc,
+                          functools.partial(run_step,
+                                            logger,
+                                            repeat_restart,
+                                            True,
+                                            kwargs_extra),
+                          repeat_restart))
 
     return steps
 
@@ -335,6 +350,7 @@
 
     logger.info(':::')
 
+
 def check_stability(logger, repeat_loop=10, repeat_restart=5, chaos_mode=True, max_time=None,
                     output_results=True, **kwargs):
     kwargs_extras = [{}]
@@ -348,7 +364,7 @@
 
     github_checks_outputter = get_gh_checks_outputter(kwargs["github_checks_text_file"])
 
-    for desc, step_func in steps:
+    for desc, step_func, expected_iterations in steps:
         if max_time and datetime.now() - start_time > max_time:
             logger.info("::: Test verification is taking too long: Giving up!")
             logger.info("::: So far, all checks passed, but not all checks were run.")
@@ -359,6 +375,14 @@
         logger.info('::: Running test verification step "%s"...' % desc)
         logger.info(':::')
         results, inconsistent, slow, iterations = step_func(**kwargs)
+
+        if iterations <= 1 and expected_iterations > 1:
+            step_results.append((desc, "FAIL"))
+            logger.info("::: Reached iteration timeout before finishing 2 or more repeat runs.")
+            logger.info("::: At least 2 successful repeat runs are required to validate stability.")
+            write_summary(logger, step_results, "TIMEOUT")
+            return 1
+
         if output_results:
             write_results(logger.info, results, iterations)
 
@@ -378,6 +402,12 @@
             write_summary(logger, step_results, "FAIL")
             return 1
 
-        step_results.append((desc, "PASS"))
+        # If the tests passed but the number of iterations didn't match the number expected to run,
+        # it is likely that the runs were stopped early to avoid a timeout.
+        if iterations != expected_iterations:
+            result = f"PASS *  {iterations}/{expected_iterations} repeats completed"
+            step_results.append((desc, result))
+        else:
+            step_results.append((desc, "PASS"))
 
     write_summary(logger, step_results, "PASS")
diff --git a/tools/wptrunner/wptrunner/wptcommandline.py b/tools/wptrunner/wptrunner/wptcommandline.py
index 69f5794..ce50b3e 100644
--- a/tools/wptrunner/wptrunner/wptcommandline.py
+++ b/tools/wptrunner/wptrunner/wptcommandline.py
@@ -116,6 +116,10 @@
                             default=None,
                             help="The maximum number of minutes for the job to run",
                             type=lambda x: timedelta(minutes=float(x)))
+    mode_group.add_argument("--repeat-max-time", action="store",
+                            default=100,
+                            help="The maximum number of minutes for the test suite to attempt repeat runs",
+                            type=int)
     output_results_group = mode_group.add_mutually_exclusive_group()
     output_results_group.add_argument("--verify-no-output-results", action="store_false",
                                       dest="verify_output_results",
diff --git a/tools/wptrunner/wptrunner/wptrunner.py b/tools/wptrunner/wptrunner/wptrunner.py
index 76ef3db..7fbe776 100644
--- a/tools/wptrunner/wptrunner/wptrunner.py
+++ b/tools/wptrunner/wptrunner/wptrunner.py
@@ -1,6 +1,7 @@
 import json
 import os
 import sys
+from datetime import datetime, timedelta
 
 import wptserve
 from wptserve import sslutils
@@ -150,6 +151,133 @@
     return kwargs["pause_after_test"]
 
 
+def run_test_iteration(test_status, test_loader, test_source_kwargs, test_source_cls, run_info,
+                       recording, test_environment, product, run_test_kwargs):
+    """Runs the entire test suite.
+    This is called for each repeat run requested."""
+    tests = []
+    for test_type in test_loader.test_types:
+        tests.extend(test_loader.tests[test_type])
+
+    try:
+        test_groups = test_source_cls.tests_by_group(
+            tests, **test_source_kwargs)
+    except Exception:
+        logger.critical("Loading tests failed")
+        return False
+
+    logger.suite_start(test_groups,
+                       name='web-platform-test',
+                       run_info=run_info,
+                       extra={"run_by_dir": run_test_kwargs["run_by_dir"]})
+    for test_type in run_test_kwargs["test_types"]:
+        logger.info(f"Running {test_type} tests")
+
+        browser_cls = product.get_browser_cls(test_type)
+
+        browser_kwargs = product.get_browser_kwargs(logger,
+                                                    test_type,
+                                                    run_info,
+                                                    config=test_environment.config,
+                                                    num_test_groups=len(test_groups),
+                                                    **run_test_kwargs)
+
+        executor_cls = product.executor_classes.get(test_type)
+        executor_kwargs = product.get_executor_kwargs(logger,
+                                                      test_type,
+                                                      test_environment,
+                                                      run_info,
+                                                      **run_test_kwargs)
+
+        if executor_cls is None:
+            logger.error(f"Unsupported test type {test_type} for product {product.name}")
+            continue
+
+        for test in test_loader.disabled_tests[test_type]:
+            logger.test_start(test.id)
+            logger.test_end(test.id, status="SKIP")
+            test_status.skipped += 1
+
+        if test_type == "testharness":
+            run_tests = {"testharness": []}
+            for test in test_loader.tests["testharness"]:
+                if ((test.testdriver and not executor_cls.supports_testdriver) or
+                        (test.jsshell and not executor_cls.supports_jsshell)):
+                    logger.test_start(test.id)
+                    logger.test_end(test.id, status="SKIP")
+                    test_status.skipped += 1
+                else:
+                    run_tests["testharness"].append(test)
+        else:
+            run_tests = test_loader.tests
+
+        recording.pause()
+        with ManagerGroup("web-platform-tests",
+                          run_test_kwargs["processes"],
+                          test_source_cls,
+                          test_source_kwargs,
+                          browser_cls,
+                          browser_kwargs,
+                          executor_cls,
+                          executor_kwargs,
+                          run_test_kwargs["rerun"],
+                          run_test_kwargs["pause_after_test"],
+                          run_test_kwargs["pause_on_unexpected"],
+                          run_test_kwargs["restart_on_unexpected"],
+                          run_test_kwargs["debug_info"],
+                          not run_test_kwargs["no_capture_stdio"],
+                          recording=recording) as manager_group:
+            try:
+                manager_group.run(test_type, run_tests)
+            except KeyboardInterrupt:
+                logger.critical("Main thread got signal")
+                manager_group.stop()
+                raise
+            test_status.total_tests += manager_group.test_count()
+            test_status.unexpected += manager_group.unexpected_count()
+            test_status.unexpected_pass += manager_group.unexpected_pass_count()
+
+    return True
+
+
+def evaluate_runs(test_status, run_test_kwargs):
+    """Evaluates the test counts after the given number of repeat runs has finished"""
+    if test_status.total_tests == 0:
+        if test_status.skipped > 0:
+            logger.warning("All requested tests were skipped")
+        else:
+            if run_test_kwargs["default_exclude"]:
+                logger.info("No tests ran")
+                return True
+            else:
+                logger.critical("No tests ran")
+                return False
+
+    if test_status.unexpected and not run_test_kwargs["fail_on_unexpected"]:
+        logger.info(f"Tolerating {test_status.unexpected} unexpected results")
+        return True
+
+    all_unexpected_passed = (test_status.unexpected and
+                             test_status.unexpected == test_status.unexpected_pass)
+    if all_unexpected_passed and not run_test_kwargs["fail_on_unexpected_pass"]:
+        logger.info(f"Tolerating {test_status.unexpected_pass} unexpected results "
+                    "because they all PASS")
+        return True
+
+    return test_status.unexpected == 0
+
+
+class TestStatus:
+    """Class that stores information on the results of test runs for later reference"""
+    def __init__(self):
+        self.total_tests = 0
+        self.skipped = 0
+        self.unexpected = 0
+        self.unexpected_pass = 0
+        self.repeated_runs = 0
+        self.expected_repeated_runs = 0
+
+
 def run_tests(config, test_paths, product, **kwargs):
     """Set up the test environment, load the list of tests to be executed, and
     invoke the remainder of the code to execute tests"""
@@ -196,17 +324,16 @@
 
         logger.info("Using %i client processes" % kwargs["processes"])
 
-        skipped_tests = 0
-        test_total = 0
-        unexpected_total = 0
-        unexpected_pass_total = 0
+        test_status = TestStatus()
+        repeat = kwargs["repeat"]
+        test_status.expected_repeated_runs = repeat
 
         if len(test_loader.test_ids) == 0 and kwargs["test_list"]:
             logger.critical("Unable to find any tests at the path(s):")
             for path in kwargs["test_list"]:
                 logger.critical("  %s" % path)
             logger.critical("Please check spelling and make sure there are tests in the specified path(s).")
-            return False
+            return False, test_status
         kwargs["pause_after_test"] = get_pause_after_test(test_loader, **kwargs)
 
         ssl_config = {"type": kwargs["ssl_type"],
@@ -235,143 +362,65 @@
             recording.set(["startup", "ensure_environment"])
             try:
                 test_environment.ensure_started()
+                start_time = datetime.now()
             except env.TestEnvironmentError as e:
                 logger.critical("Error starting test environment: %s" % e)
                 raise
 
             recording.set(["startup"])
 
-            repeat = kwargs["repeat"]
-            repeat_count = 0
+            max_time = None
+            if "repeat_max_time" in kwargs:
+                max_time = timedelta(minutes=kwargs["repeat_max_time"])
+
             repeat_until_unexpected = kwargs["repeat_until_unexpected"]
 
-            while repeat_count < repeat or repeat_until_unexpected:
-                repeat_count += 1
+            # keep track of longest time taken to complete a test suite iteration
+            # so that the runs can be stopped to avoid a possible TC timeout.
+            longest_iteration_time = timedelta()
+
+            while test_status.repeated_runs < repeat or repeat_until_unexpected:
+                # if the next repeat run could cause the TC timeout to be reached,
+                # stop now and use the test results we have.
+                # Pad the total time by 10% to ensure ample time for the next iteration(s).
+                estimate = (datetime.now() +
+                            timedelta(seconds=(longest_iteration_time.total_seconds() * 1.1)))
+                if not repeat_until_unexpected and max_time and estimate >= start_time + max_time:
+                    logger.info(f"Ran {test_status.repeated_runs} of {repeat} iterations.")
+                    break
+
+                # begin tracking runtime of the test suite
+                iteration_start = datetime.now()
+                test_status.repeated_runs += 1
                 if repeat_until_unexpected:
-                    logger.info("Repetition %i" % (repeat_count))
+                    logger.info(f"Repetition {test_status.repeated_runs}")
                 elif repeat > 1:
-                    logger.info("Repetition %i / %i" % (repeat_count, repeat))
+                    logger.info(f"Repetition {test_status.repeated_runs} / {repeat}")
 
-                test_count = 0
-                unexpected_count = 0
-                unexpected_pass_count = 0
-
-                tests = []
-                for test_type in test_loader.test_types:
-                    tests.extend(test_loader.tests[test_type])
-
-                try:
-                    test_groups = test_source_cls.tests_by_group(tests, **test_source_kwargs)
-                except Exception:
-                    logger.critical("Loading tests failed")
-                    return False
-
-                logger.suite_start(test_groups,
-                                   name='web-platform-test',
-                                   run_info=run_info,
-                                   extra={"run_by_dir": kwargs["run_by_dir"]})
-                for test_type in kwargs["test_types"]:
-                    logger.info("Running %s tests" % test_type)
-
-                    browser_cls = product.get_browser_cls(test_type)
-
-                    browser_kwargs = product.get_browser_kwargs(logger,
-                                                                test_type,
-                                                                run_info,
-                                                                config=test_environment.config,
-                                                                num_test_groups=len(test_groups),
-                                                                **kwargs)
-
-                    executor_cls = product.executor_classes.get(test_type)
-                    executor_kwargs = product.get_executor_kwargs(logger,
-                                                                  test_type,
-                                                                  test_environment,
-                                                                  run_info,
-                                                                  **kwargs)
-
-                    if executor_cls is None:
-                        logger.error("Unsupported test type %s for product %s" %
-                                     (test_type, product.name))
-                        continue
-
-                    for test in test_loader.disabled_tests[test_type]:
-                        logger.test_start(test.id)
-                        logger.test_end(test.id, status="SKIP")
-                        skipped_tests += 1
-
-                    if test_type == "testharness":
-                        run_tests = {"testharness": []}
-                        for test in test_loader.tests["testharness"]:
-                            if ((test.testdriver and not executor_cls.supports_testdriver) or
-                                (test.jsshell and not executor_cls.supports_jsshell)):
-                                logger.test_start(test.id)
-                                logger.test_end(test.id, status="SKIP")
-                                skipped_tests += 1
-                            else:
-                                run_tests["testharness"].append(test)
-                    else:
-                        run_tests = test_loader.tests
-
-                    recording.pause()
-                    with ManagerGroup("web-platform-tests",
-                                      kwargs["processes"],
-                                      test_source_cls,
-                                      test_source_kwargs,
-                                      browser_cls,
-                                      browser_kwargs,
-                                      executor_cls,
-                                      executor_kwargs,
-                                      kwargs["rerun"],
-                                      kwargs["pause_after_test"],
-                                      kwargs["pause_on_unexpected"],
-                                      kwargs["restart_on_unexpected"],
-                                      kwargs["debug_info"],
-                                      not kwargs["no_capture_stdio"],
-                                      recording=recording) as manager_group:
-                        try:
-                            manager_group.run(test_type, run_tests)
-                        except KeyboardInterrupt:
-                            logger.critical("Main thread got signal")
-                            manager_group.stop()
-                            raise
-                        test_count += manager_group.test_count()
-                        unexpected_count += manager_group.unexpected_count()
-                        unexpected_pass_count += manager_group.unexpected_pass_count()
+                iter_success = run_test_iteration(test_status, test_loader, test_source_kwargs,
+                                                  test_source_cls, run_info, recording,
+                                                  test_environment, product, kwargs)
+                # If there were issues with the suite run (tests not loaded, etc.), return early.
+                if not iter_success:
+                    return False, test_status
                 recording.set(["after-end"])
-                test_total += test_count
-                unexpected_total += unexpected_count
-                unexpected_pass_total += unexpected_pass_count
-                logger.info("Got %i unexpected results, with %i unexpected passes" %
-                            (unexpected_count, unexpected_pass_count))
+                logger.info(f"Got {test_status.unexpected} unexpected results, "
+                    f"with {test_status.unexpected_pass} unexpected passes")
                 logger.suite_end()
-                if repeat_until_unexpected and unexpected_total > 0:
+
+                # Note this iteration's runtime
+                iteration_runtime = datetime.now() - iteration_start
+                # determine the longest test suite runtime seen.
+                longest_iteration_time = max(longest_iteration_time,
+                                             iteration_runtime)
+
+                if repeat_until_unexpected and test_status.unexpected > 0:
                     break
-                if repeat_count == 1 and len(test_loader.test_ids) == skipped_tests:
+                if test_status.repeated_runs == 1 and len(test_loader.test_ids) == test_status.skipped:
                     break
 
-    if test_total == 0:
-        if skipped_tests > 0:
-            logger.warning("All requested tests were skipped")
-        else:
-            if kwargs["default_exclude"]:
-                logger.info("No tests ran")
-                return True
-            else:
-                logger.critical("No tests ran")
-                return False
-
-    if unexpected_total and not kwargs["fail_on_unexpected"]:
-        logger.info("Tolerating %s unexpected results" % unexpected_total)
-        return True
-
-    all_unexpected_passed = (unexpected_total and
-                             unexpected_total == unexpected_pass_total)
-    if all_unexpected_passed and not kwargs["fail_on_unexpected_pass"]:
-        logger.info("Tolerating %i unexpected results because they all PASS" %
-                    unexpected_pass_total)
-        return True
-
-    return unexpected_total == 0
+    # Return the evaluation of the runs and the number of repeated iterations that were run.
+    return evaluate_runs(test_status, kwargs), test_status
 
 
 def check_stability(**kwargs):
@@ -411,7 +460,7 @@
         elif kwargs["verify"] or kwargs["stability"]:
             rv = check_stability(**kwargs) or logged_critical.has_log
         else:
-            rv = not run_tests(**kwargs) or logged_critical.has_log
+            rv = not run_tests(**kwargs)[0] or logged_critical.has_log
     finally:
         logger.remove_handler(handler)
     return rv