| # Copyright (C) 2011 Google Inc. All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # * Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # * Redistributions in binary form must reproduce the above |
| # copyright notice, this list of conditions and the following disclaimer |
| # in the documentation and/or other materials provided with the |
| # distribution. |
| # * Neither the name of Google Inc. nor the names of its |
| # contributors may be used to endorse or promote products derived from |
| # this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| import logging |
| import math |
| import threading |
| import time |
| |
| from webkitpy.common import message_pool |
| from webkitpy.layout_tests.controllers import single_test_runner |
| from webkitpy.layout_tests.models.test_run_results import TestRunResults |
| from webkitpy.layout_tests.models import test_expectations |
| from webkitpy.layout_tests.models import test_failures |
| from webkitpy.layout_tests.models import test_results |
| from webkitpy.tool import grammar |
| |
| |
_log = logging.getLogger(__name__)


# Alias so callers can refer to TestExpectations without importing
# test_expectations themselves.
TestExpectations = test_expectations.TestExpectations

# Export this so callers don't need to know about message pools.
WorkerException = message_pool.WorkerException
| |
| |
class TestRunInterruptedException(Exception):
    """Raised when a test run should be stopped immediately."""
    def __init__(self, reason):
        # Pass the reason through to Exception so str(exc) and default
        # tracebacks include it; the old no-arg call left the message empty.
        Exception.__init__(self, reason)
        self.reason = reason
        self.msg = reason

    def __reduce__(self):
        # Make the exception picklable across the message-pool process
        # boundary while preserving the 'reason' attribute.
        return self.__class__, (self.reason,)
| |
| |
class LayoutTestRunner(object):
    """Runs layout tests across a pool of worker processes.

    The runner shards the test inputs, starts a message pool of Worker
    objects, and folds the per-test messages posted by the workers into a
    single TestRunResults object.
    """

    def __init__(self, options, port, printer, results_directory, test_is_slow_fn):
        self._options = options
        self._port = port
        self._printer = printer
        self._results_directory = results_directory
        self._test_is_slow = test_is_slow_fn
        self._sharder = Sharder(self._port.split_test, self._options.max_locked_shards)
        self._filesystem = self._port.host.filesystem

        # The remaining state is (re)initialized per call to run_tests().
        self._expectations = None
        self._test_inputs = []
        self._retrying = False

        self._current_run_results = None

    def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, retrying):
        """Run test_inputs on up to num_workers workers; return a TestRunResults.

        Args:
            expectations: TestExpectations for this run.
            test_inputs: list of TestInputs to actually execute.
            tests_to_skip: iterable of test names; these are recorded as
                (expected) SKIP results without being run.
            num_workers: upper bound on the number of worker processes.
            retrying: True when this is the retry pass for tests that
                failed the first time around.
        """
        self._expectations = expectations
        self._test_inputs = test_inputs
        self._retrying = retrying

        # FIXME: rename all variables to test_run_results or some such ...
        run_results = TestRunResults(self._expectations, len(test_inputs) + len(tests_to_skip))
        self._current_run_results = run_results
        self._printer.num_tests = len(test_inputs)
        self._printer.num_completed = 0

        if not retrying:
            self._printer.print_expected(run_results, self._expectations.get_tests_with_result_type)

        for test_name in set(tests_to_skip):
            result = test_results.TestResult(test_name)
            result.type = test_expectations.SKIP
            run_results.add(result, expected=True, test_is_slow=self._test_is_slow(test_name))

        self._printer.write_update('Sharding tests ...')
        locked_shards, unlocked_shards = self._sharder.shard_tests(test_inputs, int(self._options.child_processes), self._options.fully_parallel)

        # We don't have a good way to coordinate the workers so that they don't
        # try to run the shards that need a lock. The easiest solution is to
        # run all of the locked shards first.
        all_shards = locked_shards + unlocked_shards
        num_workers = min(num_workers, len(all_shards))
        self._printer.print_workers_and_shards(num_workers, len(all_shards), len(locked_shards))

        if self._options.dry_run:
            return run_results

        self._printer.write_update('Starting %s ...' % grammar.pluralize('worker', num_workers))

        try:
            with message_pool.get(self, self._worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool:
                pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)
        except TestRunInterruptedException as e:
            _log.warning(e.reason)
            run_results.interrupted = True
        except KeyboardInterrupt:
            self._printer.flush()
            self._printer.writeln('Interrupted, exiting ...')
            raise
        except Exception as e:
            _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
            raise

        return run_results

    def _worker_factory(self, worker_connection):
        # Retry results are kept out of the main results directory so they
        # don't clobber the first-pass output.
        results_directory = self._results_directory
        if self._retrying:
            self._filesystem.maybe_make_directory(self._filesystem.join(self._results_directory, 'retries'))
            results_directory = self._filesystem.join(self._results_directory, 'retries')
        return Worker(worker_connection, results_directory, self._options)

    def _mark_interrupted_tests_as_skipped(self, run_results):
        """Record an early-exit failure for every test that never ran."""
        for test_input in self._test_inputs:
            if test_input.test_name not in run_results.results_by_name:
                result = test_results.TestResult(test_input.test_name, [test_failures.FailureEarlyExit()])
                # FIXME: We probably need to loop here if there are multiple iterations.
                # FIXME: Also, these results are really neither expected nor unexpected. We probably
                # need a third type of result.
                run_results.add(result, expected=False, test_is_slow=self._test_is_slow(test_input.test_name))

    def _interrupt_if_at_failure_limits(self, run_results):
        # Note: The messages in this method are constructed to match old-run-webkit-tests
        # so that existing buildbot grep rules work.
        def interrupt_if_at_failure_limit(limit, failure_count, run_results, message):
            if limit and failure_count >= limit:
                message += " %d tests run." % (run_results.expected + run_results.unexpected)
                self._mark_interrupted_tests_as_skipped(run_results)
                raise TestRunInterruptedException(message)

        interrupt_if_at_failure_limit(
            self._options.exit_after_n_failures,
            run_results.unexpected_failures,
            run_results,
            "Exiting early after %d failures." % run_results.unexpected_failures)
        interrupt_if_at_failure_limit(
            self._options.exit_after_n_crashes_or_timeouts,
            run_results.unexpected_crashes + run_results.unexpected_timeouts,
            run_results,
            # This differs from ORWT because it does not include WebProcess crashes.
            "Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))

    def _update_summary_with_result(self, run_results, result):
        """Fold one finished result into run_results and report progress."""
        expected = self._expectations.matches_an_expected_result(result.test_name, result.type, self._options.pixel_tests or result.reftest_type)
        exp_str = self._expectations.get_expectations_string(result.test_name)
        got_str = self._expectations.expectation_to_string(result.type)

        run_results.add(result, expected, self._test_is_slow(result.test_name))

        self._printer.print_finished_test(result, expected, exp_str, got_str)

        self._interrupt_if_at_failure_limits(run_results)

    def handle(self, name, source, *args):
        """Dispatch a message from a worker to the matching _handle_* method.

        Raises AssertionError for messages this runner does not understand.
        """
        # Pass a default of None so that an unknown message name produces
        # the AssertionError below instead of an AttributeError from getattr.
        method = getattr(self, '_handle_' + name, None)
        if method:
            return method(source, *args)
        raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))

    def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
        self._printer.print_started_test(test_input.test_name)

    def _handle_finished_test_list(self, worker_name, list_name):
        pass

    def _handle_finished_test(self, worker_name, result, log_messages=None):
        # log_messages is accepted for message-protocol compatibility but is
        # unused; default to None rather than a shared mutable list.
        self._update_summary_with_result(self._current_run_results, result)
| |
| |
class Worker(object):
    """Runs tests inside a worker (usually child) process.

    A Worker receives 'test_list' messages from a LayoutTestRunner via the
    message pool, runs each test through a driver, and posts
    started/finished messages back through the caller connection.
    """

    def __init__(self, caller, results_directory, options):
        self._caller = caller
        self._worker_number = caller.worker_number
        self._name = caller.name
        self._results_directory = results_directory
        self._options = options

        # The remaining fields are initialized in start()
        self._host = None
        self._port = None
        self._batch_size = None
        self._batch_count = None
        self._filesystem = None
        self._driver = None
        self._num_tests = 0

    def __del__(self):
        self.stop()

    def start(self):
        """This method is called when the object is starting to be used and it is safe
        for the object to create state that does not need to be pickled (usually this means
        it is called in a child process)."""
        self._host = self._caller.host
        self._filesystem = self._host.filesystem
        self._port = self._host.port_factory.get(self._options.platform, self._options)

        self._batch_count = 0
        self._batch_size = self._options.batch_size or 0

    def handle(self, name, source, test_list_name, test_inputs):
        """Run every test in a 'test_list' message, then acknowledge."""
        assert name == 'test_list'
        for test_input in test_inputs:
            self._run_test(test_input, test_list_name)
        self._caller.post('finished_test_list', test_list_name)

    def _update_test_input(self, test_input):
        # Lazily fill in the fields only the port can compute: the reference
        # files and whether the test needs pixel output.
        if test_input.reference_files is None:
            # Lazy initialization.
            test_input.reference_files = self._port.reference_files(test_input.test_name)
        if test_input.reference_files:
            # Reftests always need pixel results to compare against the reference.
            test_input.should_run_pixel_test = True
        else:
            test_input.should_run_pixel_test = self._port.should_run_as_pixel_test(test_input)

    def _run_test(self, test_input, shard_name):
        """Run a single test, posting started/finished messages around it."""
        self._batch_count += 1

        # Restart the driver after every batch_size tests (0 disables this).
        stop_when_done = False
        if self._batch_size > 0 and self._batch_count >= self._batch_size:
            self._batch_count = 0
            stop_when_done = True

        self._update_test_input(test_input)
        test_timeout_sec = self._timeout(test_input)
        start = time.time()
        self._caller.post('started_test', test_input, test_timeout_sec)

        result = self._run_test_with_timeout(test_input, test_timeout_sec, stop_when_done)
        result.shard_name = shard_name
        result.worker_name = self._name
        result.total_run_time = time.time() - start
        result.test_number = self._num_tests
        self._num_tests += 1

        self._caller.post('finished_test', result)

        self._clean_up_after_test(test_input, result)

    def stop(self):
        """Shut down this worker's driver; safe to call more than once."""
        _log.debug("%s cleaning up" % self._name)
        self._kill_driver()

    def _timeout(self, test_input):
        """Compute the appropriate timeout value for a test, in seconds."""
        # The driver watchdog uses 2.5x the timeout; we want to be
        # larger than that. We also add a little more padding if we're
        # running tests in a separate thread.
        #
        # Note that we need to convert the test timeout from a
        # string value in milliseconds to a float for Python.
        driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
        if not self._options.run_singly:
            return driver_timeout_sec

        thread_padding_sec = 1.0
        thread_timeout_sec = driver_timeout_sec + thread_padding_sec
        return thread_timeout_sec

    def _kill_driver(self):
        # Be careful about how and when we kill the driver; if driver.stop()
        # raises an exception, this routine may get re-entered via __del__.
        driver = self._driver
        self._driver = None
        if driver:
            _log.debug("%s killing driver" % self._name)
            driver.stop()

    def _run_test_with_timeout(self, test_input, timeout, stop_when_done):
        # Per-test hard timeouts are only enforceable when running singly,
        # since that is the only mode that uses a watchdog thread.
        if self._options.run_singly:
            return self._run_test_in_another_thread(test_input, timeout, stop_when_done)
        return self._run_test_in_this_thread(test_input, stop_when_done)

    def _clean_up_after_test(self, test_input, result):
        """Log the outcome and bounce the driver if a failure requires it."""
        test_name = test_input.test_name

        if result.failures:
            # Check and kill the driver if we need to.
            if any(f.driver_needs_restart() for f in result.failures):
                self._kill_driver()
                # Reset the batch count since the shell just bounced.
                self._batch_count = 0

            # Print the error message(s).
            _log.debug("%s %s failed:" % (self._name, test_name))
            for f in result.failures:
                _log.debug("%s %s" % (self._name, f.message()))
        elif result.type == test_expectations.SKIP:
            _log.debug("%s %s skipped" % (self._name, test_name))
        else:
            _log.debug("%s %s passed" % (self._name, test_name))

    def _run_test_in_another_thread(self, test_input, thread_timeout_sec, stop_when_done):
        """Run a test in a separate thread, enforcing a hard time limit.

        Since we can only detect the termination of a thread, not any internal
        state or progress, we can only run per-test timeouts when running test
        files singly.

        Args:
            test_input: Object containing the test filename and timeout
            thread_timeout_sec: time to wait before killing the driver process.
        Returns:
            A TestResult
        """
        worker = self

        driver = self._port.create_driver(self._worker_number)

        class SingleTestThread(threading.Thread):
            def __init__(self):
                threading.Thread.__init__(self)
                self.result = None

            def run(self):
                self.result = worker._run_single_test(driver, test_input, stop_when_done)

        thread = SingleTestThread()
        thread.start()
        thread.join(thread_timeout_sec)
        result = thread.result
        failures = []
        # Use is_alive() (available since Python 2.6) instead of the
        # deprecated camelCase isAlive() alias, which was removed in 3.9.
        if thread.is_alive():
            # If join() returned with the thread still running, the
            # driver is completely hung and there's nothing
            # more we can do with it. We have to kill all the
            # drivers to free it up. If we're running more than
            # one driver thread, we'll end up killing the other
            # drivers too, introducing spurious crashes. We accept
            # that tradeoff in order to avoid losing the rest of this
            # thread's results.
            _log.error('Test thread hung: killing all drivers')
            failures = [test_failures.FailureTimeout()]

        driver.stop()

        if not result:
            result = test_results.TestResult(test_input.test_name, failures=failures, test_run_time=0)
        return result

    def _run_test_in_this_thread(self, test_input, stop_when_done):
        """Run a single test file using a shared driver process.

        Args:
            test_input: Object containing the test filename, uri and timeout

        Returns: a TestResult object.
        """
        # Replace the driver if it has crashed since the last test.
        if self._driver and self._driver.has_crashed():
            self._kill_driver()
        if not self._driver:
            self._driver = self._port.create_driver(self._worker_number)
        return self._run_single_test(self._driver, test_input, stop_when_done)

    def _run_single_test(self, driver, test_input, stop_when_done):
        return single_test_runner.run_single_test(self._port, self._options, self._results_directory,
            self._name, driver, test_input, stop_when_done)
| |
| |
class TestShard(object):
    """A test shard is a named list of TestInputs.

    The shard's requires_lock flag is taken from the first input; callers
    are expected to group inputs so a shard is uniformly locked or not.
    """

    def __init__(self, name, test_inputs):
        self.name = name
        self.test_inputs = test_inputs
        self.requires_lock = test_inputs[0].requires_lock

    def __repr__(self):
        # Close with ')' -- the original format string ended with a stray
        # quote, producing a malformed repr.
        return "TestShard(name='%s', test_inputs=%s, requires_lock=%s)" % (self.name, self.test_inputs, self.requires_lock)

    def __eq__(self, other):
        return self.name == other.name and self.test_inputs == other.test_inputs

    def __ne__(self, other):
        # Python 2 does not derive __ne__ from __eq__; define it explicitly
        # so != stays consistent with ==.
        return not self == other
| |
| |
class Sharder(object):
    """Groups TestInputs into named shards for distribution to workers."""

    def __init__(self, test_split_fn, max_locked_shards):
        # test_split_fn splits a test name into path components; the first
        # component is treated as the test's directory.
        self._split = test_split_fn
        self._max_locked_shards = max_locked_shards

    def shard_tests(self, test_inputs, num_workers, fully_parallel):
        """Groups tests into batches.
        This helps ensure that tests that depend on each other (aka bad tests!)
        continue to run together as most cross-tests dependencies tend to
        occur within the same directory.
        Return:
            Two list of TestShards. The first contains tests that must only be
            run under the server lock, the second can be run whenever.
        """

        # FIXME: Move all of the sharding logic out of manager into its
        # own class or module. Consider grouping it with the chunking logic
        # in prepare_lists as well.
        if num_workers == 1:
            return self._shard_in_two(test_inputs)
        elif fully_parallel:
            return self._shard_every_file(test_inputs)
        return self._shard_by_directory(test_inputs, num_workers)

    def _shard_in_two(self, test_inputs):
        """Returns two lists of shards, one with all the tests requiring a lock and one with the rest.

        This is used when there's only one worker, to minimize the per-shard overhead."""
        locked_inputs = []
        unlocked_inputs = []
        for test_input in test_inputs:
            if test_input.requires_lock:
                locked_inputs.append(test_input)
            else:
                unlocked_inputs.append(test_input)

        locked_shards = []
        unlocked_shards = []
        if locked_inputs:
            locked_shards = [TestShard('locked_tests', locked_inputs)]
        if unlocked_inputs:
            unlocked_shards = [TestShard('unlocked_tests', unlocked_inputs)]

        return locked_shards, unlocked_shards

    def _shard_every_file(self, test_inputs):
        """Returns two lists of shards, each shard containing a single test file.

        This mode gets maximal parallelism at the cost of much higher flakiness."""
        locked_shards = []
        unlocked_shards = []
        for test_input in test_inputs:
            # Note that we use a '.' for the shard name; the name doesn't really
            # matter, and the only other meaningful value would be the filename,
            # which would be really redundant.
            if test_input.requires_lock:
                locked_shards.append(TestShard('.', [test_input]))
            else:
                unlocked_shards.append(TestShard('.', [test_input]))

        return locked_shards, unlocked_shards

    def _shard_by_directory(self, test_inputs, num_workers):
        """Returns two lists of shards, each shard containing all the files in a directory.

        This is the default mode, and gets as much parallelism as we can while
        minimizing flakiness caused by inter-test dependencies."""
        locked_shards = []
        unlocked_shards = []
        unlocked_slow_shards = []
        tests_by_dir = {}
        # FIXME: Given that the tests are already sorted by directory,
        # we can probably rewrite this to be clearer and faster.
        for test_input in test_inputs:
            directory = self._split(test_input.test_name)[0]
            tests_by_dir.setdefault(directory, []).append(test_input)

        # Note: items() rather than the Python 2-only iteritems() so this also
        # runs under Python 3; the iteration semantics here are identical.
        # The loop variable is named directory_inputs to avoid shadowing the
        # test_inputs parameter.
        for directory, directory_inputs in tests_by_dir.items():
            shard = TestShard(directory, directory_inputs)
            if directory_inputs[0].requires_lock:
                locked_shards.append(shard)
            # In practice, virtual test suites are slow to run. It's a bit hacky, but
            # put them first since they're the long-tail of test runtime.
            elif directory.startswith('virtual'):
                unlocked_slow_shards.append(shard)
            else:
                unlocked_shards.append(shard)

        # Sort the shards by directory name.
        locked_shards.sort(key=lambda shard: shard.name)
        unlocked_slow_shards.sort(key=lambda shard: shard.name)
        unlocked_shards.sort(key=lambda shard: shard.name)

        # Put a ceiling on the number of locked shards, so that we
        # don't hammer the servers too badly.

        # FIXME: For now, limit to one shard or set it
        # with the --max-locked-shards. After testing to make sure we
        # can handle multiple shards, we should probably do something like
        # limit this to no more than a quarter of all workers, e.g.:
        # return max(math.ceil(num_workers / 4.0), 1)
        return (self._resize_shards(locked_shards, self._max_locked_shards, 'locked_shard'),
                unlocked_slow_shards + unlocked_shards)

    def _resize_shards(self, old_shards, max_new_shards, shard_name_prefix):
        """Takes a list of shards and redistributes the tests into no more
        than |max_new_shards| new shards."""

        # This implementation assumes that each input shard only contains tests from a
        # single directory, and that tests in each shard must remain together; as a
        # result, a given input shard is never split between output shards.
        #
        # Each output shard contains the tests from one or more input shards and
        # hence may contain tests from multiple directories.

        def divide_and_round_up(numerator, divisor):
            return int(math.ceil(float(numerator) / divisor))

        def extract_and_flatten(shards):
            test_inputs = []
            for shard in shards:
                test_inputs.extend(shard.test_inputs)
            return test_inputs

        def split_at(seq, index):
            return (seq[:index], seq[index:])

        num_old_per_new = divide_and_round_up(len(old_shards), max_new_shards)
        new_shards = []
        remaining_shards = old_shards
        while remaining_shards:
            some_shards, remaining_shards = split_at(remaining_shards, num_old_per_new)
            new_shards.append(TestShard('%s_%d' % (shard_name_prefix, len(new_shards) + 1), extract_and_flatten(some_shards)))
        return new_shards