| #!/usr/bin/env python3 |
| # |
| # Copyright (C) 2024 Igalia S.L. |
| # |
| # This program is free software; you can redistribute it and/or |
| # modify it under the terms of the GNU Lesser General Public |
| # License as published by the Free Software Foundation; either |
| # version 2.1 of the License, or (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # Lesser General Public License for more details. |
| # |
| # You should have received a copy of the GNU Lesser General Public |
| # License along with this program; if not, write to the |
| # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, |
| # Boston, MA 02110-1301, USA. |
| # pylint: disable=missing-docstring,invalid-name |
| |
| """MVT WebDriver test runner""" |
| |
| import os |
| import sys |
| |
| scripts_dir = os.path.dirname(os.path.abspath(__file__)) |
| if os.path.isdir(os.path.join(scripts_dir, 'webkitpy')): |
| sys.path.insert(0, scripts_dir) |
| import webkitpy |
| |
| import argparse |
| import json |
| import logging |
| import traceback |
| import urllib3 |
| from selenium.webdriver.common.by import By |
| from selenium.webdriver.common.keys import Keys |
| from selenium.webdriver.support.ui import WebDriverWait |
| from selenium.common.exceptions import TimeoutException, WebDriverException |
| from urllib3.exceptions import HTTPError, ReadTimeoutError |
| from webkitpy.common.host import Host |
| |
| top_level_directory = os.path.normpath(os.path.join(os.path.dirname(__file__), '..', '..')) |
| |
| _log = logging.getLogger(__name__) |
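# Custom log level between INFO (20) and WARNING (30); configure_logging()
# registers it as 'MESSAGE' and --log-level=minimal restricts output to it.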
| LOG_MESSAGE = 25 |
| TEST_SUITES = [ |
| "codec-support-test", |
| "dash-html5-test", |
| "dash-shaka-test", |
| "dash-dashjs-test", |
| "hls-shaka-test", |
| "hls-hlsjs-test", |
| "hss-html5-test", |
| "hss-dashjs-test", |
| "progressive-html5-test", |
| ] |
| |
| def parse_args(argument_list): |
| |
| def restricted_int_range(min_val, max_val): |
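        # argparse 'type' factory: builds a validator that parses the value as
        # an int and enforces min_val <= x <= max_val.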
| def validator(x): |
| x = int(x) |
| if min_val <= x <= max_val: |
| return x |
| raise argparse.ArgumentTypeError(f"Value must be between {min_val} and {max_val}") |
| return validator |
| |
| parser = argparse.ArgumentParser( |
| description="Run MVT suite with WebDriver.", |
| epilog=""" |
| This script uses WebDriver to automatically run the MVT |
| test suite, collecting the results in a JSON file. |
| For more info, check https://github.com/rdkcentral/mvt |
| """, |
| formatter_class=argparse.ArgumentDefaultsHelpFormatter |
| ) |
| configuration = parser.add_mutually_exclusive_group(required=True) |
| configuration.add_argument('--debug', action='store_const', const='Debug', dest='configuration', help='Use a Debug build') |
| configuration.add_argument('--release', action='store_const', const='Release', dest='configuration', help='Use a Release build') |
| platform = parser.add_mutually_exclusive_group(required=True) |
| platform.add_argument('--gtk', action='store_const', dest='platform', const='gtk', help='Use the GTK port') |
| platform.add_argument('--wpe', action='store_const', dest='platform', const='wpe', help='Use the WPE port') |
| expectations = parser.add_mutually_exclusive_group(required=False) |
| expectations.add_argument('--update-expectations', action='store_true', help='Update the existing expectations with the new test results') |
| expectations.add_argument('--reset-expectations', action='store_true', help='Forget the existing expectations and create new ones based on this run') |
| parser.add_argument("--mvt-instance-address", default="https://mvt-rdk.igalia.com", help="MVT instance address to use") |
| parser.add_argument("--suite", default=None, choices=TEST_SUITES, help="Specific suite to run. Will run all suites if no suite is specified.") |
| parser.add_argument("--retry-unexpected-failures", type=restricted_int_range(0,2000), help="Number of times to retry each unexpected failure. Pass 0 to disable.", default=3) |
| parser.add_argument("--retry-webdriver-timeouts", type=restricted_int_range(0,2000), help="Number of times to retry when the connection with WebDriver times out. Pass 0 to disable.", default=3) |
| parser.add_argument("--timeout-to-load", type=restricted_int_range(30,2000), help="Number of seconds to wait for load/ready requests.", default=60) |
| parser.add_argument("--timeout-to-run", type=restricted_int_range(120,2000), help="Number of seconds to wait for any MVT test to finish.", default=600) |
| parser.add_argument("--headless", action='store_true', help=argparse.SUPPRESS) # FIXME: this doesn't have any effect, delete it once the build master restarts. |
| parser.add_argument("--display-server", choices=['xvfb', 'xorg', 'weston', 'wayland', 'headless'], default=argparse.SUPPRESS, |
| help='"xvfb": Use a virtualized X11 server. "xorg": Use the current X11 session. ' |
| '"weston": Use a virtualized Weston server. "wayland": Use the current wayland session. ' |
| '"headless": Headless mode in current session. (default: xvfb for gtk and headless for wpe)') |
| parser.add_argument("--browser-name", choices=['MiniBrowser', 'cog'], default='MiniBrowser', help="Select the browser to use via 'run-minibrowser'.") |
| parser.add_argument('--log-level', dest='log_level', choices=['minimal', 'info', 'debug'], default='info') |
| parser.add_argument('--', dest='extra_browser_args', help='Pass extra arguments to the browser (run-minibrowser) after two dashes (--)') |
| |
| # argparse gets confused when parsing several arguments with dashes after '--' |
| # so manually parse that into a list before invoking argparse |
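    # e.g. an argument_list of ['--wpe', '--release', '--', '--enable-foo']
    # (flag name illustrative) leaves argparse with ['--wpe', '--release'] and
    # sets extra_browser_args to ['--enable-foo'].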
| extra_browser_args = [] |
| if '--' in argument_list: |
| dashes_ind = argument_list.index('--') |
| extra_browser_args = argument_list[dashes_ind+1:] |
| argument_list = argument_list[:dashes_ind] |
| args = parser.parse_args(argument_list) |
| args.extra_browser_args = extra_browser_args |
| if not hasattr(args, 'display_server'): |
| args.display_server = 'xvfb' if args.platform == 'gtk' else 'headless' |
| return args |
| |
| |
| class MVTWebDriverRunner(): |
| |
| def __init__(self, port, platform, configuration, extra_browser_args, mvt_instance_address, browser_name, max_retries_if_timeout, timeout_to_load, timeout_to_run): |
| self.mvt_instance_address = mvt_instance_address |
| if platform == "gtk": |
| from selenium.webdriver import WebKitGTKOptions as WebKitOptions |
| from selenium.webdriver.webkitgtk.service import Service |
| from selenium.webdriver import WebKitGTK as WebKitDriver |
| elif platform == "wpe": |
| from selenium.webdriver import WPEWebKitOptions as WebKitOptions |
| from selenium.webdriver.wpewebkit.service import Service |
| from selenium.webdriver import WPEWebKit as WebKitDriver |
| else: |
| raise NotImplementedError(f"Unknown platform {platform}") |
| self.environ_for_test = self.start_display_server(port) |
| self._max_retries_if_timeout = max_retries_if_timeout |
| self.timeout_to_load = timeout_to_load |
| self.timeout_to_run = timeout_to_run |
| self.driver_class = WebKitDriver |
| self.driver_options = WebKitOptions() |
| self.driver_options.binary_location = os.path.join(top_level_directory, "Tools/Scripts/run-minibrowser") |
| assert(isinstance(extra_browser_args, list)) |
| run_minibrowser_args = [f"--{configuration}", f"--{platform}", "--automation"] + extra_browser_args |
| _log.debug(f'Passing the following extra arguments to run-minibrowser: "{run_minibrowser_args}"') |
| for run_minibrowser_arg in run_minibrowser_args: |
| self.driver_options.add_argument(run_minibrowser_arg) |
| if browser_name: |
| self.driver_options.set_capability('browserName', browser_name) |
| self._run_webdriver_script_path = os.path.join(top_level_directory, "Tools/Scripts/run-webdriver") |
| self.configuration = configuration |
| self.platform = platform |
| self.driver_service_class = Service |
| self.driver_process_group = None |
| self.driver_service = None |
| self.driver = None |
| _log.info(f'Starting WebDriver MVT runner for platform {self.platform.upper()}') |
| |
| def start_display_server(self, port): |
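        # Reuse webkitpy's layout-test driver machinery to bring up the
        # configured display server (e.g. Xvfb or headless) and return the
        # environment the browser must inherit to find it.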
| self._display_driver = port._driver_class()(port, worker_number=0, pixel_tests=False, no_timeout=True) |
| if not self._display_driver.check_driver(port): |
| raise RuntimeError("Failed to check driver %s" % self._display_driver.__class__.__name__) |
| _log.info(f'Using display server: {self._display_driver.__class__.__name__}') |
| return self._display_driver._setup_environ_for_test() |
| |
| def stop_display_server(self): |
| if self._display_driver: |
| self._display_driver.stop() |
| self._display_driver = None |
| |
| def start_driver(self, retry_if_failure_to_start=True): |
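        # On the first call this delegates to _retry_if_timeout(), which invokes
        # this same method again with retry_if_failure_to_start=False so the
        # startup code below runs under the retry-on-timeout protection.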
| if retry_if_failure_to_start: |
| return self._retry_if_timeout(self.start_driver, None, False) |
        # Start the service (WebDriver) in its own process group so that stop_driver() can reliably clean everything up
| self.driver_service = self.driver_service_class(executable_path=self._run_webdriver_script_path, env=self.environ_for_test, |
| service_args=[f"--{self.configuration}", f"--{self.platform}"], popen_kw={"process_group": 0}) |
| self.driver = self.driver_class(options=self.driver_options, service=self.driver_service) |
| self.driver.maximize_window() |
| self.driver_process_group = os.getpgid(self.driver_service.process.pid) |
| _log.info(f'Started WebDriver session: {self.driver} with process group {self.driver_process_group}') |
| |
| def stop_driver(self): |
| _log.info(f'Stopping WebDriver session: {self.driver} with process group {self.driver_process_group}') |
        # Kill all the processes directly instead of calling self.driver.quit() because:
        # 1. Selenium's driver.quit() first sends the WebDriver "/shutdown" command, and if the driver has hung that causes extra timeouts.
        # 2. WebKitGTK/WPE WebDriver does not implement the "/shutdown" command anyway, so it simply replies "not supported" and then
        #    Selenium tries to kill it. The process-group approach implemented here is more reliable than how Selenium does that.
| try: |
| if self.driver_process_group: |
| os.killpg(self.driver_process_group, 9) |
| except ProcessLookupError as e: |
            _log.warning(f'Exception raised when trying to stop the WebDriver: {e}')
| finally: |
| self.driver = None |
| self.driver_service = None |
| self.driver_process_group = None |
| |
| def reset_driver(self): |
| self.stop_driver() |
| self.start_driver() |
| |
| def _suite_has_loaded(self, driver): |
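        # Polled by WebDriverWait: true once the MVT page has created its
        # globalRunner object and the document has finished loading.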
| return driver.execute_script('return typeof globalRunner !== "undefined" && globalRunner !== null && document.readyState == "complete";') |
| |
| def _suite_has_started(self, driver): |
| return driver.execute_script('return globalRunner.hasOwnProperty("testToRun") && globalRunner.hasOwnProperty("currentTestIdx") && globalRunner.hasOwnProperty("testList");') |
| |
| def _have_all_tests_finished(self, driver): |
| return driver.execute_script('return globalRunner.testToRun == 0;') |
| |
| def _retry_if_timeout(self, func, func_reset, *args, **kwargs): |
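        # Calls func(*args, **kwargs), retrying up to self._max_retries_if_timeout
        # times on WebDriver/HTTP errors; func_reset (if given) is invoked
        # between attempts to restart the driver.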
| attempt_number = 0 |
| while True: |
| try: |
| return func(*args, **kwargs) |
| except (WebDriverException, HTTPError) as e: |
| who_timed_out = "MVT remote server" if isinstance(e, TimeoutException) else "WebDriver" |
| _log.warning(f'The connection with the {who_timed_out} timed out at {func.__name__}(): {e}.') |
| if attempt_number == self._max_retries_if_timeout: |
                    _log.error(f'Maximum number of attempts to retry timeouts reached. Retry by raising --retry-webdriver-timeouts (current value {self._max_retries_if_timeout}). {e}')
| raise |
| attempt_number += 1 |
| if func_reset: |
                    _log.info(f'Restarting via {func_reset.__name__}() and then retrying {func.__name__}(): [attempt {attempt_number} of {self._max_retries_if_timeout}]')
| func_reset() |
| else: |
                    _log.info(f'Retrying {func.__name__}(): [attempt {attempt_number} of {self._max_retries_if_timeout}]')
| |
| def get_tests_from_suite(self, suite, retry_if_timeout=True): |
| if retry_if_timeout: |
| return self._retry_if_timeout(self.get_tests_from_suite, self.reset_driver, suite, False) |
| test_url = f"{self.mvt_instance_address}/?test_type={suite}" |
| self.driver.get(test_url) |
| WebDriverWait(self.driver, self.timeout_to_load).until(self._suite_has_loaded) |
| return self.driver.execute_script("return window.globalRunner.testList.map(item => item.prototype.name);") |
| |
| def run_test_from_suite(self, suite, test_id, test_name, retry_if_timeout=True): |
| if retry_if_timeout: |
| return self._retry_if_timeout(self.run_test_from_suite, self.reset_driver, suite, test_id, test_name, False) |
| test_url = f"{self.mvt_instance_address}/?test_type={suite}&testnumbers={test_id}" |
| self.driver.get(test_url) |
| WebDriverWait(self.driver, self.timeout_to_load).until(self._suite_has_loaded) |
        # Due to browser security policies, there are issues when trying to play
        # a video with sound when such playback is not triggered from an input
        # event that has the isTrusted bit set. A mouse click from Selenium/WebDriver
        # does not set this bit, but a synthetic Enter key press event does.
| self.driver.find_element(By.ID, 'run-selected').send_keys(Keys.ENTER) |
| WebDriverWait(self.driver, self.timeout_to_load).until(self._suite_has_started) |
| WebDriverWait(self.driver, self.timeout_to_run).until(self._have_all_tests_finished) |
| results = self.driver.execute_script("return getMvtTestResults();") |
| assert(results['name'] == suite) |
| assert(len(results['tests']) == 1), f"Unexpected number of test results: {len(results['tests'])}" |
| assert(results['tests'][0]['name'] == test_name) |
| return results['tests'][0] |
| |
| |
| class MVTResultsExpectationsParser(): |
| |
| def __init__(self, platform, configuration, should_update_expectations, should_reset_expectations): |
| self._expectations_file_path = os.path.join(top_level_directory, f"Tools/{platform}/MVT_TestExpectations_{configuration}.json") |
| self._expectations_data_dict = {} |
| self._tests_executed_this_run = {} |
| self._new_tests_found_this_run = {} |
| self._results_this_run = {} |
| self.should_update_expectations = should_update_expectations or should_reset_expectations |
| if should_reset_expectations: |
            _log.info(f'Resetting the expectations file at "{self._expectations_file_path}" with the results of this run')
| elif os.path.isfile(self._expectations_file_path): |
| if self.should_update_expectations: |
| _log.info(f'Using the results of this run to update the expectations file at "{self._expectations_file_path}"') |
| else: |
| _log.info(f'Comparing the results of this run with the expectations file at "{self._expectations_file_path}"') |
| with open(self._expectations_file_path, 'r') as fd: |
| self._expectations_data_dict = json.load(fd) |
| else: |
| _log.warning(f'Expectations file "{self._expectations_file_path}" not found.') |
| _log.info('Pass the flag "--update-expectations" to create it with the results of this run') |
| |
| def _is_unexpected_failure(self, test_suite, test_name, test_result): |
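        # A result is "expected" when the expectations file lists it for this
        # test; tests without an entry are only expected to pass.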
| expectations_for_suite = self._expectations_data_dict.get(test_suite) |
| if expectations_for_suite and test_name in expectations_for_suite: |
| return test_result not in expectations_for_suite[test_name] |
| return test_result != 'passed' |
| |
| def _store_test_results_this_run(self, test_suite, test_name, test_results): |
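        # Results accumulate as a sorted, de-duplicated list per test, so a test
        # that is flaky across retries ends up with e.g. ["failed", "passed"].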
| if test_suite not in self._results_this_run: |
| self._results_this_run[test_suite] = {} |
| if test_suite not in self._tests_executed_this_run: |
| self._tests_executed_this_run[test_suite] = set() |
| self._tests_executed_this_run[test_suite].add(test_name) |
        # First load the current result data for this test, if any
| test_is_tracked = False |
| self._results_this_run[test_suite][test_name] = [] |
| if test_suite in self._expectations_data_dict: |
| if test_name in self._expectations_data_dict[test_suite]: |
| self._results_this_run[test_suite][test_name] = self._expectations_data_dict[test_suite][test_name] |
| test_is_tracked = True |
| # store list of new tests found |
| if not test_is_tracked: |
| if test_suite not in self._new_tests_found_this_run: |
| self._new_tests_found_this_run[test_suite] = set() |
| self._new_tests_found_this_run[test_suite].add(test_name) |
| # Add the new results |
| for test_result in test_results: |
| self._results_this_run[test_suite][test_name].append(test_result) |
| # Remove duplicates |
| self._results_this_run[test_suite][test_name] = sorted(set(self._results_this_run[test_suite][test_name])) |
| |
| def check_for_old_tests_and_maybe_delete_old_expectations(self, test_suite): |
| old_tests = [] |
| if test_suite in self._expectations_data_dict: |
            # Iterate over a copy since we delete entries from the dict inside the loop
| for test_name in list(self._expectations_data_dict[test_suite]): |
| if test_name not in self._tests_executed_this_run[test_suite]: |
| old_tests.append(test_name) |
| del self._expectations_data_dict[test_suite][test_name] |
| return old_tests |
| |
| def check_for_new_tests(self, test_suite): |
| new_tests = [] |
| for test_name in self._tests_executed_this_run[test_suite]: |
| if test_suite in self._expectations_data_dict and test_name not in self._expectations_data_dict[test_suite]: |
| new_tests.append(test_name) |
| return new_tests |
| |
    # This function keeps lists and other objects in-line, pretty-printing only the dicts.
    # For the typical JSON object used for the MVT expectations this looks better and more
    # compact than what can be achieved with json.dumps() alone.
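    # Example (keys and results illustrative):
    # {"suite": {"testB": ["failed"], "testA": ["passed"]}} is rendered as
    #   {
    #       "suite" : {
    #           "testA" : ["passed"],
    #           "testB" : ["failed"]
    #       }
    #   }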
| def _json_compact_printer(self, obj, indent=4, ilevel=0): |
| if isinstance(obj, dict): |
| dict_str = '{\n' |
| ilevel += 1 |
| sorted_keys = sorted(obj.keys()) |
| for key in sorted_keys: |
                value_repr = self._json_compact_printer(obj[key], indent, ilevel)
| ending = '\n' if key == sorted_keys[-1] else ',\n' |
| dict_str += '{i}{k} : {v}{e}'.format(i=' '*indent*ilevel, k=json.dumps(key), v=value_repr, e=ending) |
| dict_str += '{i}}}'.format(i=' '*indent*(ilevel-1)) |
| return dict_str |
| elif isinstance(obj, list): |
| return json.dumps(sorted(obj)) |
| else: |
| return json.dumps(obj) |
| |
    def maybe_update_expectations(self, did_run_all_suites):
| for suite in self._expectations_data_dict: |
| if suite not in self._results_this_run: |
                if did_run_all_suites:
                    _log.error(f'MVT suite "{suite}" is declared in the expectations file but no longer found.')
| self._results_this_run[suite] = self._expectations_data_dict[suite] |
| if self.should_update_expectations: |
| with open(self._expectations_file_path, 'w') as fd: |
| fd.write(self._json_compact_printer(self._results_this_run)) |
| fd.write('\n') |
| _log.info(f'Expectations file updated at "{self._expectations_file_path}"') |
| |
| |
| def run_mvt_one_tsuite(mvtwebdriver_runner, mvtresultsexpectations_parser, suite, args_retry_unexpected_failures): |
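    # Runs each test of the suite individually, retrying unexpected failures up
    # to args_retry_unexpected_failures times, and returns six lists of test
    # names: run, unexpected failures, expected failures, unexpected passes,
    # old tests and new tests.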
| unexpected_failures_list = [] |
| unexpected_passes_list = [] |
| expected_failures_list = [] |
| tests_run_list = [] |
| test_list = mvtwebdriver_runner.get_tests_from_suite(suite) |
| total_tests_to_run = len(test_list) |
| for idx in range(0, total_tests_to_run): |
| test_id = idx+1 |
| test_name = test_list[idx] |
| _log.info(f'\n[{suite.upper()}][{test_id}/{total_tests_to_run}] Running test "{test_name}"') |
| test_run_data = mvtwebdriver_runner.run_test_from_suite(suite, test_id, test_name) |
| test_result = test_run_data['status'] |
| test_results_after_retries = [ test_result ] |
| tests_run_list.append(test_name) |
| if mvtresultsexpectations_parser._is_unexpected_failure(suite, test_name, test_result): |
| if test_result == 'passed': |
| _log.info(f'[UNEXPECTED_PASS] {test_name}') |
| unexpected_passes_list.append(test_name) |
| else: |
| _log.warning(f'[UNEXPECTED_FAIL] Test "{test_name}" from "{suite}" has unexpected result: {test_result}') |
| _log.info(test_run_data['log']) |
| found_expected_result_or_pass = False |
| if args_retry_unexpected_failures > 0: |
| for trytestid in range(1, args_retry_unexpected_failures+1): |
| _log.info(f"Retrying to run test {test_name}: Try {trytestid} of {args_retry_unexpected_failures}") |
| test_run_data = mvtwebdriver_runner.run_test_from_suite(suite, test_id, test_name) |
| test_result = test_run_data['status'] |
| test_results_after_retries.append(test_result) |
| if test_result == 'passed': |
| _log.info(f'[FLAKY][PASS] {test_name}.') |
| found_expected_result_or_pass = True |
| break |
| elif not mvtresultsexpectations_parser._is_unexpected_failure(suite, test_name, test_result): |
| _log.info(f'[FLAKY][FAIL][EXPECTED] {test_name}') |
                        _log.info(test_run_data['log'])
| expected_failures_list.append(test_name) |
| found_expected_result_or_pass = True |
| break |
| else: |
| _log.warning(f'[UNEXPECTED_FAIL] Test "{test_name}" from "{suite}" has unexpected result: {test_result}') |
| _log.info(test_run_data['log']) |
| if trytestid < args_retry_unexpected_failures: |
| _log.info('Restarting WebDriver') |
| mvtwebdriver_runner.reset_driver() |
| else: |
                            _log.warning(f'Test {test_name} continues to give an unexpected result. Maximum number of retries reached. Marking the test as an unexpected failure')
| if not found_expected_result_or_pass: |
| unexpected_failures_list.append(test_name) |
| elif test_result != 'passed': |
| _log.info(f'[FAIL][EXPECTED] {test_name}') |
| expected_failures_list.append(test_name) |
| else: |
| _log.info(f'[PASS] {test_name}') |
| |
    # Update internal tracking and expectations with these results
| mvtresultsexpectations_parser._store_test_results_this_run(suite, test_name, test_results_after_retries) |
| |
| # Calculate and log the results of this suite run |
| unexpected_failures = len(unexpected_failures_list) |
| expected_failures = len(expected_failures_list) |
| unexpected_passes = len(unexpected_passes_list) |
| tests_run = len(tests_run_list) |
| tests_pass = tests_run - unexpected_failures - expected_failures |
| suite_summary_header_str = f"[Suite {suite}] Ran {tests_run} tests, of which:" |
| _log.info('-' * len(suite_summary_header_str)) |
| _log.info(suite_summary_header_str) |
| _log.info(f"{'-':>4} {tests_pass} tests passed.") |
| if unexpected_passes > 0: |
| _log.info(f"{'-':>4} {unexpected_passes} tests were unexpected passes:") |
| for test_name in unexpected_passes_list: |
| _log.info(f"{'':>8}{test_name}") |
| if expected_failures > 0: |
| _log.info(f"{'-':>4} {expected_failures} tests were expected failures:") |
| for test_name in expected_failures_list: |
| _log.info(f"{'':>8}{test_name}") |
| if unexpected_failures > 0: |
| _log.info(f"{'-':>4} {unexpected_failures} tests were unexpected failures:") |
| for test_name in unexpected_failures_list: |
| _log.info(f"{'':>8}{test_name}") |
| # Check for old and new tests |
| old_tests_list = mvtresultsexpectations_parser.check_for_old_tests_and_maybe_delete_old_expectations(suite) |
| old_tests = len(old_tests_list) |
| if old_tests > 0: |
| _log.info(f"{'-':>4} {old_tests} tests are OLD tests (tracked in expectations but no longer found):") |
| for test_name in old_tests_list: |
| _log.info(f"{'':>8}{test_name}") |
| new_tests_list = mvtresultsexpectations_parser.check_for_new_tests(suite) |
| new_tests = len(new_tests_list) |
| if new_tests > 0: |
| _log.info(f"{'-':>4} {new_tests} tests are NEW tests (found but not tracked in expectations):") |
| for test_name in new_tests_list: |
| _log.info(f"{'':>8}{test_name}") |
| _log.info('-' * len(suite_summary_header_str) + '\n') |
| return [tests_run_list, unexpected_failures_list, expected_failures_list, unexpected_passes_list, old_tests_list, new_tests_list] |
| |
| def run_mvt_tsuites(mvtwebdriver_runner, mvtresultsexpectations_parser, suites, args_retry_unexpected_failures, will_update_expectations): |
| num_suites = len(suites) |
| test_results = {} |
| for suite in suites: |
| test_results[suite] = run_mvt_one_tsuite(mvtwebdriver_runner, mvtresultsexpectations_parser, suite, args_retry_unexpected_failures) |
| |
    # Only one suite was run, so just return its result
    if num_suites == 1:
        total_unexpected_failures = len(test_results[suites[0]][1])
        return total_unexpected_failures
| |
| # All suites finished, so print a global summary |
| _log.info('###########') |
| _log.info('# SUMMARY #') |
| _log.info('###########') |
| total_tests_run = 0 |
| total_unexpected_failures = 0 |
| total_unexpected_passes = 0 |
| total_expected_failures = 0 |
| total_old_tests = 0 |
| total_new_tests = 0 |
| for suite in test_results: |
| total_tests_run += len(test_results[suite][0]) |
| total_unexpected_failures += len(test_results[suite][1]) |
| total_expected_failures += len(test_results[suite][2]) |
| total_unexpected_passes += len(test_results[suite][3]) |
| total_old_tests += len(test_results[suite][4]) |
| total_new_tests += len(test_results[suite][5]) |
| total_tests_pass = total_tests_run - total_unexpected_failures - total_expected_failures |
| suites_str = ", ".join(suites) |
    should_advise_update_expectations = False
| _log.info(f"Executed {num_suites} MVT test suites: {suites_str}") |
| _log.info(f"Ran {total_tests_run} tests in total, of which:") |
| _log.info(f"{'-':>4} {total_tests_pass} tests passed.") |
| if total_unexpected_passes > 0: |
| _log.info(f"{'-':>4} {total_unexpected_passes} tests were unexpected passes.") |
| if total_expected_failures > 0: |
| _log.info(f"{'-':>4} {total_expected_failures} tests were expected failures.") |
| if total_unexpected_failures > 0: |
| _log.info(f"{'-':>4} {total_unexpected_failures} tests were unexpected failures.") |
        should_advise_update_expectations = True
| if total_old_tests > 0: |
| _log.info(f"{'-':>4} {total_old_tests} tests are OLD tests (tracked in expectations but no longer found).") |
        should_advise_update_expectations = True
| if total_new_tests > 0: |
| _log.info(f"{'-':>4} {total_new_tests} tests are NEW tests (found but not tracked in expectations).") |
        should_advise_update_expectations = True
    if should_advise_update_expectations and not will_update_expectations:
        _log.info('Pass the flag "--update-expectations" to update the expectations')
| return total_unexpected_failures |
| |
| |
| def configure_logging(selected_log_level='info'): |
| class LogHandler(logging.StreamHandler): |
| def __init__(self, stream): |
| super().__init__(stream) |
| |
| def format(self, record): |
| if record.levelno > LOG_MESSAGE: |
| return '%s: %s' % (record.levelname, record.getMessage()) |
| return record.getMessage() |
| |
| logging.addLevelName(LOG_MESSAGE, 'MESSAGE') |
| if selected_log_level == 'debug': |
| log_level = logging.DEBUG |
| elif selected_log_level == 'info': |
| log_level = logging.INFO |
| elif selected_log_level == 'quiet': |
| log_level = logging.NOTSET |
| elif selected_log_level == 'minimal': |
        log_level = LOG_MESSAGE
| |
| handler = LogHandler(sys.stdout) |
| logger = logging.getLogger() |
| logger.addHandler(handler) |
| logger.setLevel(log_level) |
| return handler |
| |
| |
# This monkey-patches urllib3.request to ensure that it always runs with
# a timeout. Otherwise Selenium sometimes hangs indefinitely.
| def set_urllib3_request_default_timeout(timeout): |
| original_request = urllib3.request.RequestMethods.request |
| def patched_request(self, method, url, *args, **kwargs): |
| # Only set timeout if not already specified |
| if 'timeout' not in kwargs: |
| kwargs['timeout'] = timeout |
| return original_request(self, method, url, *args, **kwargs) |
| urllib3.request.RequestMethods.request = patched_request |
| |
| |
| def main(argument_list): |
| args = parse_args(argument_list) |
| configure_logging(args.log_level) |
| |
| # Configure the browser type |
| if args.platform == 'wpe': |
| os.environ['WPE_BROWSER'] = args.browser_name |
| elif args.platform == 'gtk': |
| if args.browser_name != 'MiniBrowser': |
| _log.warning('Only browser MiniBrowser is available for platform gtk. Using MiniBrowser') |
| args.browser_name = 'MiniBrowser' |
| else: |
| raise NotImplementedError(f'Unknown platform {args.platform}') |
| |
| # Enable WPE headless mode |
| if args.platform == 'wpe' and args.display_server == 'headless': |
| if args.browser_name == 'MiniBrowser': |
| args.extra_browser_args.append("--headless") |
| elif args.browser_name == 'cog': |
| os.environ['COG_PLATFORM_NAME'] = 'headless' |
| else: |
            raise NotImplementedError(f'Unknown wpe browser: {args.browser_name}')
| |
| # Set a default timeout for all the urllib3 requests (avoid hangs) |
| set_urllib3_request_default_timeout(args.timeout_to_load) |
| exit_code = -1 |
| mvtwebdriver_runner = None |
| try: |
| port = Host().port_factory.get(args.platform, args) |
| mvtwebdriver_runner = MVTWebDriverRunner(port, args.platform, args.configuration.lower(), args.extra_browser_args, args.mvt_instance_address, args.browser_name, args.retry_webdriver_timeouts, args.timeout_to_load, args.timeout_to_run) |
| mvtwebdriver_runner.start_driver() |
| mvtresultsexpectations_parser = MVTResultsExpectationsParser(args.platform, args.configuration, args.update_expectations, args.reset_expectations) |
| suites = TEST_SUITES if args.suite is None else [args.suite] |
        exit_code = run_mvt_tsuites(mvtwebdriver_runner, mvtresultsexpectations_parser, suites, args.retry_unexpected_failures, args.update_expectations or args.reset_expectations)
        did_run_all_suites = len(suites) == len(TEST_SUITES)
        mvtresultsexpectations_parser.maybe_update_expectations(did_run_all_suites)
| except ReadTimeoutError as e: |
        _log.error(f'The connection with WebDriver timed out. Check if the browser works fine. Retry by passing --display-server=headless or by raising --timeout-to-load (current value {args.timeout_to_load}). {e}')
| except TimeoutException as e: |
| _log.error(f'The MVT test suite timed out. Check if the remote URL has the expected content. {e}') |
| except KeyboardInterrupt as e: |
| print('KeyboardInterrupt: CTRL-C pressed. Stopping.') |
    except Exception:
| traceback.print_exc() |
| finally: |
| if mvtwebdriver_runner: |
| mvtwebdriver_runner.stop_driver() |
| mvtwebdriver_runner.stop_display_server() |
| return exit_code |
| |
| |
| if __name__ == "__main__": |
| sys.exit(main(sys.argv[1:])) |