| #!/usr/bin/env python3 |
| # |
| # Copyright (C) 2011, 2012, 2017 Igalia S.L. |
| # |
| # This library is free software; you can redistribute it and/or |
| # modify it under the terms of the GNU Library General Public |
| # License as published by the Free Software Foundation; either |
| # version 2 of the License, or (at your option) any later version. |
| # |
| # This library is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # Library General Public License for more details. |
| # |
| # You should have received a copy of the GNU Library General Public License |
| # along with this library; see the file COPYING.LIB. If not, write to |
| # the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, |
| # Boston, MA 02110-1301, USA. |
| |
| import os |
| import optparse |
| import errno |
| import json |
| import sys |
import re
import subprocess
| from signal import SIGKILL, SIGSEGV |
| from glib_test_runner import GLibTestRunner |
| |
| top_level_directory = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", "..")) |
| sys.path.insert(0, os.path.join(top_level_directory, "Tools", "glib")) |
| import common |
| from webkitpy.common.host import Host |
| from webkitpy.common.test_expectations import TestExpectations |
| from webkitpy.port.monadodriver import MonadoDriver # noqa: E402 |
| from webkitpy.port.westondriver import WestonDriver |
| from webkitcorepy import Timeout |
| |
| |
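# Placeholder subtest name reported when a test binary exits without emitting any subtest results.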
| UNKNOWN_CRASH_STR = "CRASH_OR_PROBLEM_IN_TEST_EXECUTABLE" |
| |
| |
| def port_options(options): |
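    # Build a minimal optparse.Values carrying only the build configuration, for passing to the port factory.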
    port_options = optparse.Values()
    if options.debug:
        port_options.configuration = 'Debug'
    elif options.release:
        port_options.configuration = 'Release'
    return port_options


class TestRunner(object):
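    # Test programs or directories, relative to the TestWebKitAPI build directory, that are run by default; ports are expected to override this.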
| TEST_TARGETS = [] |
| |
    def __init__(self, port, options, tests=None):
        tests = tests or []
| self._options = options |
| self._port = Host().port_factory.get(port, port_options(options)) |
| self._driver = self._create_driver() |
| self._weston = None |
| self._monado = None |
| |
| self._build_type = self._port.get_option('configuration') |
| common.set_build_types((self._build_type,)) |
| |
| self._programs_path = common.binary_build_path(self._port) |
| expectations_file = os.path.join(common.top_level_path(), "Tools", "TestWebKitAPI", "glib", "TestExpectations.json") |
| self._expectations = TestExpectations(self._port.name(), expectations_file, self._build_type) |
| self._initial_test_list = tests |
| self._tests = self._get_tests(tests) |
| self._disabled_tests = [] |
| |
| def _test_programs_base_dir(self): |
| return os.path.join(self._programs_path, "TestWebKitAPI") |
| |
| def _get_tests_from_dir(self, test_dir): |
| if not os.path.isdir(test_dir): |
| return [] |
| |
| tests = [] |
| for test_file in os.listdir(test_dir): |
| if not test_file.lower().startswith("test"): |
| continue |
| test_path = os.path.join(test_dir, test_file) |
| if os.path.isfile(test_path) and os.access(test_path, os.X_OK): |
| tests.append(test_path) |
| return tests |
| |
| def _get_all_valid_test_names(self): |
| test_paths = [] |
| base_dir = self._test_programs_base_dir() |
| for test_file in os.listdir(base_dir): |
| test_path = os.path.join(base_dir, test_file) |
| if os.path.isdir(test_path): |
| test_paths.extend(self._get_tests_from_dir(test_path)) |
| elif os.path.isfile(test_path) and os.access(test_path, os.X_OK): |
| test_paths.append(test_path) |
| test_dir_prefix_len = len(self._test_programs_base_dir()) + 1 |
| return (path[test_dir_prefix_len:] for path in test_paths) |
| |
| def _get_tests(self, requested_test_list): |
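        # Resolve the requested test names into absolute paths of executable test programs,
        # falling back to every default target when none of the requested tests can be found.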
| tests = [] |
| for test in requested_test_list: |
| test = test.split(':', 1)[0] |
| if not os.path.exists(test): |
| test = os.path.join(self._test_programs_base_dir(), test) |
| if os.path.isdir(test): |
| tests.extend(self._get_tests_from_dir(test)) |
| elif os.path.isfile(test) and os.access(test, os.X_OK): |
| tests.append(test) |
| else: |
| sys.stderr.write(f'WARNING: Unable to find test "{test}". Ignoring it. Pass "-l" to see the available tests\n') |
        # If no requested test was found, fall back to running all the available tests.
| if not tests: |
| for test_target in self.TEST_TARGETS: |
| absolute_test_target = os.path.join(self._test_programs_base_dir(), test_target) |
| if test_target.lower().startswith("test") and os.path.isfile(absolute_test_target) and os.access(absolute_test_target, os.X_OK): |
| tests.append(absolute_test_target) |
| else: |
| tests.extend(self._get_tests_from_dir(absolute_test_target)) |
| return sorted(set(tests)) # Remove duplicates and sort |
| |
| def _create_driver(self, port_options=[]): |
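        # Create the display-server driver that provides the environment the tests run under.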
| self._port._display_server = self._options.display_server |
| driver = self._port.create_driver(worker_number=0, no_timeout=True)._make_driver(pixel_tests=False) |
| if not driver.check_driver(self._port): |
| raise RuntimeError("Failed to check driver %s" % driver.__class__.__name__) |
| return driver |
| |
| def _setup_testing_environment_for_driver(self, driver): |
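        # Combine the driver's per-test environment with the port's API test environment.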
| test_env = driver._setup_environ_for_test() | self._port.environment_for_api_tests() |
| # The python display-server driver may set WPE_DISPLAY, but we unset it here because it causes issues with |
| # some WPE API tests like WPEPlatform/TestDisplayDefault that check the default behaviour of the APIs. |
| test_env.pop("WPE_DISPLAY", None) |
| return test_env |
| |
| def _tear_down_testing_environment(self): |
| if self._driver: |
| self._driver.stop() |
| if self._weston: |
| self._weston.stop() |
| self._weston = None |
| if self._monado: |
| self._monado.stop() |
| |
| def _test_cases_to_skip(self, test_program): |
| if self._options.skipped_action != 'skip': |
| return [] |
| |
| return self._expectations.skipped_subtests(os.path.basename(test_program)) |
| |
| def _should_run_test_program(self, test_program): |
| for disabled_test in self._disabled_tests: |
| if test_program.endswith(disabled_test): |
| return False |
| |
| if self._options.skipped_action != 'skip': |
| return True |
| |
| return os.path.basename(test_program) not in self._expectations.skipped_tests() |
| |
| def _kill_process(self, pid): |
| try: |
| os.kill(pid, SIGKILL) |
| except OSError: |
| # Process already died. |
| pass |
| |
| def _waitpid(self, pid): |
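        # Wait for the child and return its exit status, or the negated signal number if it was terminated by a signal.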
| while True: |
| try: |
| dummy, status = os.waitpid(pid, 0) |
| if os.WIFSIGNALED(status): |
| return -os.WTERMSIG(status) |
| if os.WIFEXITED(status): |
| return os.WEXITSTATUS(status) |
| |
| # Should never happen |
| raise RuntimeError("Unknown child exit status!") |
| except (OSError, IOError) as e: |
| if e.errno == errno.EINTR: |
| continue |
| if e.errno == errno.ECHILD: |
                    # This happens if SIGCHLD is set to be ignored or waiting
| # for child processes has otherwise been disabled for our |
| # process. This child is dead, we can't get the status. |
| return 0 |
| raise |
| |
| def _use_wpe_legacy_api(self): |
| return hasattr(self._options, 'wpe_legacy_api') and self._options.wpe_legacy_api |
| |
| def _run_test_glib(self, test_program, subtests, skipped_test_cases): |
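        # Run a GLib-based test program through GLibTestRunner, picking the environment that matches the kind of test (WebXR, WPE Platform Wayland or regular).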
| timeout = self._options.timeout |
| wpe_legacy_api = self._use_wpe_legacy_api() |
| if self.is_webxr_test(test_program): |
| env = self._monado_env | self._test_env |
| else: |
| env = self._weston_env if self.is_wpe_platform_wayland_test(test_program) else self._test_env |
| if self.is_wpe_platform_test(test_program): |
| # WPE Platform tests can run without swrast since nothing is rendered. |
| env = dict(env) |
            env.pop('LIBGL_ALWAYS_SOFTWARE', None)
| |
| def is_slow_test(test, subtest): |
| return self._expectations.is_slow(test, subtest) |
| |
| runner = GLibTestRunner(test_program, timeout, wpe_legacy_api, is_slow_test, timeout * 10) |
| return runner.run(subtests=subtests, skipped=skipped_test_cases, env=env) |
| |
| def _run_test_qt(self, test_program): |
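        # Run a WPEQt test program directly with subprocess and map its exit condition to a result.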
        # Copy the shared test environment so the Qt-specific variables do not leak into other tests.
        env = dict(self._test_env)
| env['XDG_SESSION_TYPE'] = 'wayland' |
| env['QML2_IMPORT_PATH'] = common.library_build_path(self._port, 'qt5', 'qml') |
| |
| name = os.path.basename(test_program) |
| |
| try: |
            subprocess.check_output([test_program], stderr=subprocess.STDOUT,
                                    env=env, timeout=self._options.timeout)
| except subprocess.CalledProcessError as exc: |
| print(exc.output) |
            result = "CRASH" if exc.returncode < 0 else "FAIL"
| except subprocess.TimeoutExpired as exp: |
| result = "TIMEOUT" |
| print(exp.output) |
| else: |
| result = "PASS" |
| print("**PASS** %s" % name) |
| return {name: result} |
| |
| def _get_tests_from_google_test_suite(self, test_program, skipped_test_cases): |
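        # Ask the GoogleTest binary for its test list with --gtest_list_tests and drop the skipped cases.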
| try: |
| output = subprocess.check_output([test_program, '--gtest_list_tests'], env=self._test_env).decode('utf-8') |
| except subprocess.CalledProcessError: |
| sys.stderr.write("ERROR: could not list available tests for binary %s.\n" % (test_program)) |
| sys.stderr.flush() |
| sys.exit(1) |
| |
        tests = []
        prefix = None
        for line in output.split('\n'):
            if not line.startswith(' '):
                prefix = line
                continue
            line = line.partition('#')[0]
            test_name = prefix + line.strip()
            if test_name not in skipped_test_cases:
                tests.append(test_name)
        return tests
| |
| def _run_google_test(self, test_program, subtest): |
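        # Run a single GoogleTest case in a forked pty so its output can be streamed while the timeout is enforced.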
| command = [test_program, '--gtest_filter=%s' % (subtest)] |
| if self._use_wpe_legacy_api() and os.path.basename(test_program) == 'TestWebKit': |
| command.append('--wpe-legacy-api') |
| |
| timeout = self._options.timeout |
| if self._expectations.is_slow(os.path.basename(test_program), subtest): |
| timeout *= 10 |
| |
| pid, fd = os.forkpty() |
| if pid == 0: |
| os.execvpe(command[0], command, self._test_env) |
| sys.exit(0) |
| |
| with Timeout(timeout): |
| try: |
| common.parse_output_lines(fd, sys.stdout.write) |
| status = self._waitpid(pid) |
| os.close(fd) |
| except Timeout.Exception: |
| self._kill_process(pid) |
| os.close(fd) |
| sys.stdout.write("**TIMEOUT** %s\n" % subtest) |
| sys.stdout.flush() |
| return {subtest: "TIMEOUT"} |
| |
| if status == -SIGSEGV: |
| sys.stdout.write("**CRASH** %s\n" % subtest) |
| sys.stdout.flush() |
| return {subtest: "CRASH"} |
| |
| if status != 0: |
| return {subtest: "FAIL"} |
| |
| return {subtest: "PASS"} |
| |
| def _run_google_test_suite(self, test_program, subtests, skipped_test_cases): |
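        # Run every non-skipped case of a GoogleTest binary, optionally restricted to the requested subtests.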
| result = {} |
| for subtest in self._get_tests_from_google_test_suite(test_program, skipped_test_cases): |
| if subtest in subtests or not subtests: |
| result.update(self._run_google_test(test_program, subtest)) |
| return result |
| |
| def is_glib_test(self, test_program): |
| raise NotImplementedError |
| |
| def is_google_test(self, test_program): |
| raise NotImplementedError |
| |
| def is_qt_test(self, test_program): |
| raise NotImplementedError |
| |
| def is_wpe_platform_test(self, test_program): |
| raise NotImplementedError |
| |
| def is_wpe_platform_wayland_test(self, test_program): |
| raise NotImplementedError |
| |
| def is_webxr_test(self, test_program): |
| return "WebXR" in os.path.basename(test_program) |
| |
| def _run_test(self, test_program, subtests, skipped_test_cases): |
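        # Dispatch the test program to the runner that knows how to execute it.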
| if self.is_glib_test(test_program): |
| return self._run_test_glib(test_program, subtests, skipped_test_cases) |
| |
| if self.is_google_test(test_program): |
| return self._run_google_test_suite(test_program, subtests, skipped_test_cases) |
| |
| # FIXME: support skipping Qt subtests |
| if self.is_qt_test(test_program): |
| return self._run_test_qt(test_program) |
| |
| sys.stderr.write("WARNING: %s doesn't seem to be a supported test program.\n" % test_program) |
| return {} |
| |
| def _has_gpu_available(self): |
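        # Use access to the DRM device nodes as a proxy for having a usable GPU.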
| return os.access("/dev/dri/card0", os.R_OK | os.W_OK) and os.access("/dev/dri/renderD128", os.R_OK | os.W_OK) |
| |
| def _get_test_short_name(self, test_path): |
| return test_path.replace(self._test_programs_base_dir(), '', 1).lstrip('/').split(':', 1)[0] |
| |
    def _get_subtests_to_run_for_test(self, requested_test_name):
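        # Collect the ":subtest" suffixes requested on the command line for this test program; an empty list means run every subtest.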
| subtests_to_run = [] |
| requested_test_name = self._get_test_short_name(requested_test_name) |
| for test_name in self._initial_test_list: |
| subtest_name = None |
| if ':' in test_name: |
| test_name, subtest_name = test_name.split(':', 1) |
| if requested_test_name == self._get_test_short_name(test_name): |
| if subtest_name: |
| subtests_to_run.append(subtest_name) |
| else: |
| return [] # If there is any entry matching without ":subtest", return [] which means run all subtests. |
| return sorted(set(subtests_to_run)) # Remove duplicates and sort |
| |
| def list_tests(self): |
| indent = ' ' * 4 |
| print(f'Tests available at {self._test_programs_base_dir()} are:') |
| sys.stdout.write(indent) |
| print(f'\n{indent}'.join(sorted(self._get_all_valid_test_names()))) |
| |
| def run_tests(self): |
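        # List the tests if requested; otherwise run every selected test program, report the unexpected results
        # and return the number of tests that unexpectedly failed, crashed or timed out.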
| if self._options.list_tests: |
| self.list_tests() |
| return 0 |
| |
| self._test_env = self._setup_testing_environment_for_driver(self._driver) |
| |
| number_of_total_tests = len(self._tests) |
| # Remove skipped tests now instead of when we find them, because |
| # some tests might be skipped while setting up the test environment. |
| self._tests = [test for test in self._tests if self._should_run_test_program(test)] |
| number_of_qt_tests = 0 |
| number_of_wpe_platform_wayland_tests = 0 |
| number_of_webxr_tests = 0 |
| for test in self._tests: |
| if self.is_qt_test(test): |
| number_of_qt_tests += 1 |
| elif self.is_wpe_platform_wayland_test(test): |
| number_of_wpe_platform_wayland_tests += 1 |
| elif self.is_webxr_test(test): |
| number_of_webxr_tests += 1 |
| |
| # Skip Qt tests if there is no GPU <https://webkit.org/b/264458> |
| if number_of_qt_tests > 0 and not self._has_gpu_available(): |
| sys.stderr.write("WARNING: Skipping %d Qt tests because this system doesn't have a working GPU (/dev/dri devices are not available).\n" % number_of_qt_tests) |
| self._tests = [test for test in self._tests if not self.is_qt_test(test)] |
| |
| # Start Weston if WPE Platform Wayland tests are going to be run. |
| if number_of_wpe_platform_wayland_tests > 0: |
| if WestonDriver.check_driver(self._port): |
| self._weston = WestonDriver(self._port, worker_number=0, pixel_tests=False, no_timeout=True) |
| self._weston_env = self._setup_testing_environment_for_driver(self._weston) |
| else: |
| # Skip tests if Weston is not available. |
| sys.stderr.write("WARNING: Skipping %d WPE Platform Wayland tests because Weston couldn't be found.\n" % number_of_wpe_platform_wayland_tests) |
| self._tests = [test for test in self._tests if not self.is_wpe_platform_wayland_test(test)] |
| |
| if number_of_webxr_tests > 0: |
| if MonadoDriver.check_driver(self._port): |
| self._monado = MonadoDriver(self._port, worker_number=0, pixel_tests=False, no_timeout=True) |
| self._monado_env = self._monado._setup_environ_for_test() |
| else: |
| # Skip WebXR tests if monado is not available |
| sys.stderr.write("WARNING: Skipping %d WebXR tests because monado couldn't be found.\n" % number_of_webxr_tests) |
| self._tests = [test for test in self._tests if not self.is_webxr_test(test)] |
| |
| number_of_executed_tests = len(self._tests) |
| |
| crashed_tests = {} |
| failed_tests = {} |
| timed_out_tests = {} |
| passed_tests = {} |
| try: |
| for test in self._tests: |
                subtests = self._get_subtests_to_run_for_test(test)
| if UNKNOWN_CRASH_STR in subtests: |
| subtests = [] # The binary runner can't run a subtest named UNKNOWN_CRASH_STR, so run all subtests if this is requested. |
| skipped_subtests = self._test_cases_to_skip(test) |
| number_of_total_tests += len(skipped_subtests if not subtests else set(skipped_subtests).intersection(subtests)) |
| results = self._run_test(test, subtests, skipped_subtests) |
| number_of_executed_subtests_for_test = len(results) |
| if number_of_executed_subtests_for_test > 0: |
                    # When adding the subtests to the total, subtract 1 because the main test was initially counted as one subtest.
| number_of_executed_tests += number_of_executed_subtests_for_test - 1 |
| number_of_total_tests += number_of_executed_subtests_for_test - 1 |
| for test_case, result in results.items(): |
| if result in self._expectations.get_expectation(os.path.basename(test), test_case): |
| continue |
| if result == "FAIL": |
| failed_tests.setdefault(test, []).append(test_case) |
| elif result == "TIMEOUT": |
| timed_out_tests.setdefault(test, []).append(test_case) |
| elif result == "CRASH": |
| crashed_tests.setdefault(test, []).append(test_case) |
| elif result == "PASS": |
| passed_tests.setdefault(test, []).append(test_case) |
| else: |
| # No subtests were emitted, either the test binary didn't exist, or we don't know how to run it, or it crashed. |
| sys.stderr.write("ERROR: %s failed to run, as it didn't emit any subtests.\n" % test) |
| crashed_tests[test] = [UNKNOWN_CRASH_STR] |
| finally: |
| self._tear_down_testing_environment() |
| |
| def number_of_tests(tests): |
| return sum(len(value) for value in tests.values()) |
| |
| def report(tests, title, base_dir): |
| if not tests: |
| return |
| sys.stdout.write("\nUnexpected %s (%d)\n" % (title, number_of_tests(tests))) |
| for test in tests: |
| sys.stdout.write(" %s\n" % (test.replace(base_dir, '', 1))) |
| for test_case in tests[test]: |
| sys.stdout.write(" %s\n" % (test_case)) |
| sys.stdout.flush() |
| |
| report(failed_tests, "failures", self._test_programs_base_dir()) |
| report(crashed_tests, "crashes", self._test_programs_base_dir()) |
| report(timed_out_tests, "timeouts", self._test_programs_base_dir()) |
| report(passed_tests, "passes", self._test_programs_base_dir()) |
| |
| def generate_test_list_for_json_output(tests): |
| test_list = [] |
| for test in tests: |
| base_name = self._get_test_short_name(test) |
| for test_case in tests[test]: |
| test_name = "%s:%s" % (base_name, test_case) |
| # FIXME: get output from failed tests |
| test_list.append({"name": test_name, "output": None}) |
| return test_list |
| |
| if self._options.json_output: |
| result_dictionary = {} |
| result_dictionary['Failed'] = generate_test_list_for_json_output(failed_tests) |
| result_dictionary['Crashed'] = generate_test_list_for_json_output(crashed_tests) |
| result_dictionary['Timedout'] = generate_test_list_for_json_output(timed_out_tests) |
| self._port.host.filesystem.write_text_file(self._options.json_output, json.dumps(result_dictionary, indent=4)) |
| |
| number_of_failed_tests = number_of_tests(failed_tests) + number_of_tests(timed_out_tests) + number_of_tests(crashed_tests) |
| number_of_successful_tests = number_of_executed_tests - number_of_failed_tests |
| |
| sys.stdout.write("\nRan %d tests of %d with %d successful\n" % (number_of_executed_tests, number_of_total_tests, number_of_successful_tests)) |
| sys.stdout.flush() |
| |
| return number_of_failed_tests |
| |
| |
| def create_option_parser(): |
| option_parser = optparse.OptionParser(usage='usage: %prog [options] [test MainTest:subtest1 ...]', |
| epilog='When passing the test name you can pass it alone (to run all subtests) ' |
| 'or you can specify specific subtests with the format: ' |
| 'MainTestName1:subtest-name1 MainTestName1:subtest-name2 ...') |
| option_parser.add_option('-r', '--release', |
| action='store_true', dest='release', |
| help='Run in Release') |
| option_parser.add_option('-d', '--debug', |
| action='store_true', dest='debug', |
| help='Run in Debug') |
| option_parser.add_option('--skipped', action='store', dest='skipped_action', |
| choices=['skip', 'ignore', 'only'], default='skip', |
| metavar='skip|ignore|only', |
| help='Specifies how to treat the skipped tests') |
| option_parser.add_option('-t', '--timeout', |
| action='store', type='int', dest='timeout', default=5, |
| help='Time in seconds until a test times out') |
| option_parser.add_option('-l', '--list-tests', |
| action='store_true', dest='list_tests', |
| help='List the tests (main tests) available to run.') |
| option_parser.add_option('--json-output', action='store', default=None, |
| help='Save test results as JSON to file') |
| return option_parser |
| |
| |
| def get_runner_args(argv): |
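    # Expand the short -d/-r/-t aliases into their long forms before the options are parsed.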
    runner_args = []
    for arg in argv:
        if arg == "-d":
            runner_args.append("--debug")
        elif arg == "-r":
            runner_args.append("--release")
        elif arg == "-t":
            runner_args.append("--timeout")
        else:
            runner_args.append(arg)
    return runner_args