blob: 13a4e8de15462577f1e36c47ad38df2bf41fd31c [file] [log] [blame] [edit]
#!/usr/bin/env python3
#
# Copyright (C) 2024 Igalia S.L.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
# pylint: disable=missing-docstring,invalid-name
"""MVT WebDriver test runner"""
import os
import sys
scripts_dir = os.path.dirname(os.path.abspath(__file__))
if os.path.isdir(os.path.join(scripts_dir, 'webkitpy')):
sys.path.insert(0, scripts_dir)
import webkitpy
import argparse
import json
import logging
import traceback
import urllib3
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import TimeoutException, WebDriverException
from urllib3.exceptions import HTTPError, ReadTimeoutError
from webkitpy.common.host import Host
# Root of the checkout: two directories above the directory that holds this script.
top_level_directory = os.path.normpath(os.path.join(os.path.dirname(__file__), '..', '..'))
# Module-level logger used by every function and class in this file.
_log = logging.getLogger(__name__)
# Custom numeric log level, placed between logging.INFO (20) and logging.WARNING (30).
# configure_logging() registers it under the name 'MESSAGE'.
LOG_MESSAGE = 25
# Names of the MVT suites this runner knows about. They are the accepted values
# of --suite, each one is sent to the MVT instance as the "test_type" query
# parameter, and all of them are run in this order when --suite is not given.
TEST_SUITES = [
    "codec-support-test",
    "dash-html5-test",
    "dash-shaka-test",
    "dash-dashjs-test",
    "hls-shaka-test",
    "hls-hlsjs-test",
    "hss-html5-test",
    "hss-dashjs-test",
    "progressive-html5-test",
]
def parse_args(argument_list):
    """Parse the command line of the MVT runner.

    Everything that follows a literal ``--`` in *argument_list* is split off
    before argparse runs and is returned untouched as the list
    ``args.extra_browser_args``.  ``args.display_server`` is always present on
    the returned namespace: when the option is omitted it becomes 'xvfb' for
    the gtk platform and 'headless' otherwise.
    """

    def restricted_int_range(min_val, max_val):
        # Factory for an argparse ``type=`` callable that only accepts
        # integers within the closed interval [min_val, max_val].
        def validator(x):
            number = int(x)
            if number < min_val or number > max_val:
                raise argparse.ArgumentTypeError(f"Value must be between {min_val} and {max_val}")
            return number
        return validator

    # argparse gets confused when parsing several arguments with dashes after '--'
    # so that tail is cut out by hand here and never handed to argparse.
    try:
        separator_at = argument_list.index('--')
    except ValueError:
        extra_browser_args = []
    else:
        extra_browser_args = argument_list[separator_at + 1:]
        argument_list = argument_list[:separator_at]

    parser = argparse.ArgumentParser(
        description="Run MVT suite with WebDriver.",
        epilog="""
This script uses WebDriver to automatically run the MVT
test suite, collecting the results in a JSON file.
For more info, check https://github.com/rdkcentral/mvt
""",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )

    build_group = parser.add_mutually_exclusive_group(required=True)
    build_group.add_argument('--debug', action='store_const', const='Debug', dest='configuration', help='Use a Debug build')
    build_group.add_argument('--release', action='store_const', const='Release', dest='configuration', help='Use a Release build')

    port_group = parser.add_mutually_exclusive_group(required=True)
    port_group.add_argument('--gtk', action='store_const', dest='platform', const='gtk', help='Use the GTK port')
    port_group.add_argument('--wpe', action='store_const', dest='platform', const='wpe', help='Use the WPE port')

    expectations_group = parser.add_mutually_exclusive_group(required=False)
    expectations_group.add_argument('--update-expectations', action='store_true', help='Update the existing expectations with the new test results')
    expectations_group.add_argument('--reset-expectations', action='store_true', help='Forget the existing expectations and create new ones based on this run')

    parser.add_argument("--mvt-instance-address", default="https://mvt-rdk.igalia.com", help="MVT instance address to use")
    parser.add_argument("--suite", default=None, choices=TEST_SUITES, help="Specific suite to run. Will run all suites if no suite is specified.")
    parser.add_argument("--retry-unexpected-failures", type=restricted_int_range(0,2000), help="Number of times to retry each unexpected failure. Pass 0 to disable.", default=3)
    parser.add_argument("--retry-webdriver-timeouts", type=restricted_int_range(0,2000), help="Number of times to retry when the connection with WebDriver times out. Pass 0 to disable.", default=3)
    parser.add_argument("--timeout-to-load", type=restricted_int_range(30,2000), help="Number of seconds to wait for load/ready requests.", default=60)
    parser.add_argument("--timeout-to-run", type=restricted_int_range(120,2000), help="Number of seconds to wait for any MVT test to finish.", default=600)
    parser.add_argument("--headless", action='store_true', help=argparse.SUPPRESS)  # FIXME: this doesn't have any effect, delete it once the build master restarts.
    parser.add_argument("--display-server", choices=['xvfb', 'xorg', 'weston', 'wayland', 'headless'], default=argparse.SUPPRESS,
                        help='"xvfb": Use a virtualized X11 server. "xorg": Use the current X11 session. '
                             '"weston": Use a virtualized Weston server. "wayland": Use the current wayland session. '
                             '"headless": Headless mode in current session. (default: xvfb for gtk and headless for wpe)')
    parser.add_argument("--browser-name", choices=['MiniBrowser', 'cog'], default='MiniBrowser', help="Select the browser to use via 'run-minibrowser'.")
    parser.add_argument('--log-level', dest='log_level', choices=['minimal', 'info', 'debug'], default='info')
    parser.add_argument('--', dest='extra_browser_args', help='Pass extra arguments to the browser (run-minibrowser) after two dashes (--)')

    args = parser.parse_args(argument_list)
    args.extra_browser_args = extra_browser_args

    # --display-server uses argparse.SUPPRESS as default, so the attribute only
    # exists when the user passed the option; fill in the per-platform default.
    if 'display_server' not in vars(args):
        args.display_server = 'xvfb' if args.platform == 'gtk' else 'headless'
    return args
class MVTWebDriverRunner():
    """Run MVT tests in a browser controlled through a Selenium WebDriver session.

    The runner starts a display server through the given port object, launches
    the ``run-webdriver`` script in its own process group, and loads suites and
    individual tests from the MVT instance.  Calls that talk to the driver are
    retried, optionally after restarting the driver, when they fail with a
    WebDriver or urllib3 error.
    """

    def __init__(self, port, platform, configuration, extra_browser_args, mvt_instance_address, browser_name, max_retries_if_timeout, timeout_to_load, timeout_to_run):
        """Prepare the display server and the WebDriver options; no session is started here.

        Args:
            port: object whose ``_driver_class()`` yields the display driver class.
            platform: "gtk" or "wpe"; selects the Selenium classes to use.
            configuration: build configuration, passed as ``--<configuration>``.
            extra_browser_args: list of extra arguments for run-minibrowser.
            mvt_instance_address: base URL of the MVT instance.
            browser_name: value for the 'browserName' capability (skipped if falsy).
            max_retries_if_timeout: how many times a failing call is retried.
            timeout_to_load: seconds to wait for a page to load / a test to start.
            timeout_to_run: seconds to wait for a test to finish.

        Raises:
            NotImplementedError: if *platform* is neither "gtk" nor "wpe".
            RuntimeError: if the display driver check fails.
        """
        self.mvt_instance_address = mvt_instance_address
        # Import only the Selenium classes that match the requested port.
        if platform == "gtk":
            from selenium.webdriver import WebKitGTKOptions as WebKitOptions
            from selenium.webdriver.webkitgtk.service import Service
            from selenium.webdriver import WebKitGTK as WebKitDriver
        elif platform == "wpe":
            from selenium.webdriver import WPEWebKitOptions as WebKitOptions
            from selenium.webdriver.wpewebkit.service import Service
            from selenium.webdriver import WPEWebKit as WebKitDriver
        else:
            raise NotImplementedError(f"Unknown platform {platform}")
        self.environ_for_test = self.start_display_server(port)
        self._max_retries_if_timeout = max_retries_if_timeout
        self.timeout_to_load = timeout_to_load
        self.timeout_to_run = timeout_to_run
        self.driver_class = WebKitDriver
        self.driver_options = WebKitOptions()
        self.driver_options.binary_location = os.path.join(top_level_directory, "Tools/Scripts/run-minibrowser")
        assert isinstance(extra_browser_args, list)
        run_minibrowser_args = [f"--{configuration}", f"--{platform}", "--automation"] + extra_browser_args
        _log.debug(f'Passing the following extra arguments to run-minibrowser: "{run_minibrowser_args}"')
        for run_minibrowser_arg in run_minibrowser_args:
            self.driver_options.add_argument(run_minibrowser_arg)
        if browser_name:
            self.driver_options.set_capability('browserName', browser_name)
        self._run_webdriver_script_path = os.path.join(top_level_directory, "Tools/Scripts/run-webdriver")
        self.configuration = configuration
        self.platform = platform
        self.driver_service_class = Service
        self.driver_process_group = None
        self.driver_service = None
        self.driver = None
        _log.info(f'Starting WebDriver MVT runner for platform {self.platform.upper()}')

    def start_display_server(self, port):
        """Instantiate and check the port's display driver; return the environment it sets up."""
        self._display_driver = port._driver_class()(port, worker_number=0, pixel_tests=False, no_timeout=True)
        if not self._display_driver.check_driver(port):
            raise RuntimeError("Failed to check driver %s" % self._display_driver.__class__.__name__)
        _log.info(f'Using display server: {self._display_driver.__class__.__name__}')
        return self._display_driver._setup_environ_for_test()

    def stop_display_server(self):
        """Stop the display driver if one is active and forget it."""
        if self._display_driver:
            self._display_driver.stop()
            self._display_driver = None

    def start_driver(self, retry_if_failure_to_start=True):
        """Start the WebDriver service and open a maximized browser session.

        With *retry_if_failure_to_start* set, the start is routed through
        ``_retry_if_timeout`` (without a reset step) and re-enters this method
        with the flag cleared.
        """
        if retry_if_failure_to_start:
            return self._retry_if_timeout(self.start_driver, None, False)
        # Start the service (WebDriver) in its own process group to ensure cleaning everything at stop_driver()
        self.driver_service = self.driver_service_class(
            executable_path=self._run_webdriver_script_path,
            env=self.environ_for_test,
            service_args=[f"--{self.configuration}", f"--{self.platform}"],
            popen_kw={"process_group": 0})
        self.driver = self.driver_class(options=self.driver_options, service=self.driver_service)
        self.driver.maximize_window()
        self.driver_process_group = os.getpgid(self.driver_service.process.pid)
        _log.info(f'Started WebDriver session: {self.driver} with process group {self.driver_process_group}')

    def stop_driver(self):
        """Kill the whole WebDriver process group and clear the session state."""
        _log.info(f'Stopping WebDriver session: {self.driver} with process group {self.driver_process_group}')
        # Kill all the process directly instead of calling self.driver.quit() because:
        # 1. Selenium driver.quit() will send first the WebDriver command "/shutdown". But if the driver has hanged that will cause extra timeouts.
        # 2. WebKitGTK/WPE WebDriver does not implement the "/shutdown" command in any case, so it will simply reply with "not supported" and then
        #    Selenium will try to kill it. However, the method implemented here with process groups is more reliable than how Selenium does that.
        try:
            if self.driver_process_group:
                # Signal 9 is SIGKILL.
                os.killpg(self.driver_process_group, 9)
        except ProcessLookupError as e:
            # The group is already gone; report it (with the actual error) and carry on.
            _log.warning(f'Exception raised when trying to stop the WebDriver: {e}')
        finally:
            self.driver = None
            self.driver_service = None
            self.driver_process_group = None

    def reset_driver(self):
        """Restart the WebDriver session from scratch."""
        self.stop_driver()
        self.start_driver()

    def _suite_has_loaded(self, driver):
        """WebDriverWait predicate: the page is complete and ``globalRunner`` exists."""
        return driver.execute_script('return typeof globalRunner !== "undefined" && globalRunner !== null && document.readyState == "complete";')

    def _suite_has_started(self, driver):
        """WebDriverWait predicate: ``globalRunner`` exposes its run-state properties."""
        return driver.execute_script('return globalRunner.hasOwnProperty("testToRun") && globalRunner.hasOwnProperty("currentTestIdx") && globalRunner.hasOwnProperty("testList");')

    def _have_all_tests_finished(self, driver):
        """WebDriverWait predicate: ``globalRunner.testToRun`` has reached zero."""
        return driver.execute_script('return globalRunner.testToRun == 0;')

    def _retry_if_timeout(self, func, func_reset, *args, **kwargs):
        """Call ``func(*args, **kwargs)``, retrying on WebDriver/urllib3 errors.

        After each failure *func_reset* (if given) is called before the next
        attempt.  Once ``self._max_retries_if_timeout`` retries have been used
        the last exception is re-raised.
        """
        attempt_number = 0
        while True:
            try:
                return func(*args, **kwargs)
            except (WebDriverException, HTTPError) as e:
                who_timed_out = "MVT remote server" if isinstance(e, TimeoutException) else "WebDriver"
                _log.warning(f'The connection with the {who_timed_out} timed out at {func.__name__}(): {e}.')
                if attempt_number == self._max_retries_if_timeout:
                    _log.error(f'Maximum number of attempts to retry timeouts reached. Retry by rising --retry-webdriver-timeouts (current value {self._max_retries_if_timeout}). {e}')
                    raise
                attempt_number += 1
                if func_reset:
                    _log.info(f'Trying a restart with {func_reset.__name__}() and then retry running {func.__name__}(): [{attempt_number} of {self._max_retries_if_timeout}]')
                    func_reset()
                else:
                    _log.info(f'Trying to retry running {func.__name__}(): [{attempt_number} of {self._max_retries_if_timeout}]')

    def get_tests_from_suite(self, suite, retry_if_timeout=True):
        """Load *suite* on the MVT instance and return the list of its test names."""
        if retry_if_timeout:
            return self._retry_if_timeout(self.get_tests_from_suite, self.reset_driver, suite, False)
        test_url = f"{self.mvt_instance_address}/?test_type={suite}"
        self.driver.get(test_url)
        WebDriverWait(self.driver, self.timeout_to_load).until(self._suite_has_loaded)
        return self.driver.execute_script("return window.globalRunner.testList.map(item => item.prototype.name);")

    def run_test_from_suite(self, suite, test_id, test_name, retry_if_timeout=True):
        """Run test number *test_id* of *suite* and return its result entry.

        The returned value is the single element of the 'tests' list reported
        by the page's ``getMvtTestResults()``.  An AssertionError is raised if
        the report does not match the requested suite and test.
        """
        if retry_if_timeout:
            return self._retry_if_timeout(self.run_test_from_suite, self.reset_driver, suite, test_id, test_name, False)
        test_url = f"{self.mvt_instance_address}/?test_type={suite}&testnumbers={test_id}"
        self.driver.get(test_url)
        WebDriverWait(self.driver, self.timeout_to_load).until(self._suite_has_loaded)
        # Due to browser security policies, there are issues when trying to play
        # a video with sound when such playback is not triggered from an input
        # event that has the isTrusted bit. A mouse click from selenium/webdriver
        # does not set this bit, but a synthetic press Enter key event does.
        self.driver.find_element(By.ID, 'run-selected').send_keys(Keys.ENTER)
        WebDriverWait(self.driver, self.timeout_to_load).until(self._suite_has_started)
        WebDriverWait(self.driver, self.timeout_to_run).until(self._have_all_tests_finished)
        results = self.driver.execute_script("return getMvtTestResults();")
        assert results['name'] == suite, f"Unexpected suite name in results: {results['name']}"
        assert len(results['tests']) == 1, f"Unexpected number of test results: {len(results['tests'])}"
        assert results['tests'][0]['name'] == test_name, f"Unexpected test name in results: {results['tests'][0]['name']}"
        return results['tests'][0]
class MVTResultsExpectationsParser():
    """Compare test results with the JSON expectations file and optionally rewrite it.

    The expectations file maps each suite name to a dict that maps each test
    name to the list of results considered expected for it.
    """

    def __init__(self, platform, configuration, should_update_expectations, should_reset_expectations):
        """Load the expectations for *platform*/*configuration* unless they are being reset.

        Resetting implies updating: the file is rewritten from this run only.
        """
        self._expectations_file_path = os.path.join(top_level_directory, f"Tools/{platform}/MVT_TestExpectations_{configuration}.json")
        self._expectations_data_dict = {}
        self._tests_executed_this_run = {}
        self._new_tests_found_this_run = {}
        self._results_this_run = {}
        self.should_update_expectations = should_update_expectations or should_reset_expectations
        if should_reset_expectations:
            _log.info(f'Resetting with the result of this run the expectations file at "{self._expectations_file_path}"')
        elif os.path.isfile(self._expectations_file_path):
            if self.should_update_expectations:
                _log.info(f'Using the results of this run to update the expectations file at "{self._expectations_file_path}"')
            else:
                _log.info(f'Comparing the results of this run with the expectations file at "{self._expectations_file_path}"')
            with open(self._expectations_file_path, 'r', encoding='utf-8') as fd:
                self._expectations_data_dict = json.load(fd)
        else:
            _log.warning(f'Expectations file "{self._expectations_file_path}" not found.')
            _log.info('Pass the flag "--update-expectations" to create it with the results of this run')

    def _is_unexpected_failure(self, test_suite, test_name, test_result):
        """Return True when *test_result* is not an expected result for the test.

        A test without expectations is expected to give 'passed'.
        """
        expectations_for_suite = self._expectations_data_dict.get(test_suite)
        if expectations_for_suite and test_name in expectations_for_suite:
            return test_result not in expectations_for_suite[test_name]
        return test_result != 'passed'

    def _store_test_results_this_run(self, test_suite, test_name, test_results):
        """Record *test_results* for a test, merged with its tracked expectations.

        The stored value is a sorted list without duplicates.  The loaded
        expectations are only read, never modified, by this method.
        """
        suite_results = self._results_this_run.setdefault(test_suite, {})
        self._tests_executed_this_run.setdefault(test_suite, set()).add(test_name)

        # Start from whatever was already recorded for this test in this run.
        merged_results = set(suite_results.get(test_name, ()))
        tracked_tests = self._expectations_data_dict.get(test_suite, {})
        if test_name in tracked_tests:
            # Copy the values instead of aliasing the list, so that adding the
            # new results does not silently alter the loaded expectations.
            merged_results.update(tracked_tests[test_name])
        else:
            # store list of new tests found
            self._new_tests_found_this_run.setdefault(test_suite, set()).add(test_name)
        merged_results.update(test_results)
        suite_results[test_name] = sorted(merged_results)

    def check_for_old_tests_and_maybe_delete_old_expectations(self, test_suite):
        """Return the tracked tests of *test_suite* that were not executed this run.

        Those tests are also removed from the in-memory expectations.  A suite
        in which nothing was executed is handled as "no test executed" instead
        of raising KeyError.
        """
        tracked_tests = self._expectations_data_dict.get(test_suite)
        if not tracked_tests:
            return []
        executed_tests = self._tests_executed_this_run.get(test_suite, set())
        old_tests = [test_name for test_name in tracked_tests if test_name not in executed_tests]
        for test_name in old_tests:
            del tracked_tests[test_name]
        return old_tests

    def check_for_new_tests(self, test_suite):
        """Return, sorted, the executed tests of *test_suite* missing from its expectations.

        Nothing is reported when the suite itself has no entry in the expectations.
        """
        tracked_tests = self._expectations_data_dict.get(test_suite)
        if tracked_tests is None:
            return []
        executed_tests = self._tests_executed_this_run.get(test_suite, set())
        return sorted(test_name for test_name in executed_tests if test_name not in tracked_tests)

    # This function keeps lists and other objects in-line, pretty-printing only the dicts
    # For the typical json object used on the MVT expectations this looks better and more compact
    # than can be achieved by only using json.dumps()
    def _json_compact_printer(self, obj, indent=4, ilevel=0):
        """Serialize *obj* to JSON text: dicts one key per line (sorted), lists sorted in-line."""
        if isinstance(obj, dict):
            key_padding = ' ' * indent * (ilevel + 1)
            entries = [
                f'{key_padding}{json.dumps(key)} : {self._json_compact_printer(obj[key], indent, ilevel + 1)}'
                for key in sorted(obj)
            ]
            body = ',\n'.join(entries)
            if body:
                body += '\n'
            return '{\n' + body + ' ' * indent * ilevel + '}'
        if isinstance(obj, list):
            return json.dumps(sorted(obj))
        return json.dumps(obj)

    def maybe_update_expectations(self, did_ran_all_suites):
        """Carry over the suites that were not run and, if requested, write the file.

        A suite that is tracked but produced no results is reported as an error
        only when every suite was run; its expectations are kept either way.
        """
        for suite in self._expectations_data_dict:
            if suite not in self._results_this_run:
                if did_ran_all_suites:
                    _log.error(f'MVT suite "{suite}" is declared on the expectations file but not longer found.')
                self._results_this_run[suite] = self._expectations_data_dict[suite]
        if self.should_update_expectations:
            with open(self._expectations_file_path, 'w', encoding='utf-8') as fd:
                fd.write(self._json_compact_printer(self._results_this_run))
                fd.write('\n')
            _log.info(f'Expectations file updated at "{self._expectations_file_path}"')
def _log_indented_test_names(test_names):
    """Log each name in *test_names* on its own line, indented under a summary bullet."""
    for test_name in test_names:
        _log.info(f"{'':>8}{test_name}")


def run_mvt_one_tsuite(mvtwebdriver_runner, mvtresultsexpectations_parser, suite, args_retry_unexpected_failures):
    """Run every test of *suite*, retrying unexpected failures, and log a summary.

    Args:
        mvtwebdriver_runner: runner used to list and execute the tests.
        mvtresultsexpectations_parser: parser used to classify and store results.
        suite: name of the MVT suite to run.
        args_retry_unexpected_failures: how many times an unexpected failure is
            re-run before it is reported as such (0 disables retrying).

    Returns:
        A list of six lists of test names, in this order: tests run, unexpected
        failures, expected failures, unexpected passes, old tests and new tests.
    """
    unexpected_failures_list = []
    unexpected_passes_list = []
    expected_failures_list = []
    tests_run_list = []

    test_list = mvtwebdriver_runner.get_tests_from_suite(suite)
    total_tests_to_run = len(test_list)
    # MVT numbers the tests of a suite starting at 1.
    for test_id, test_name in enumerate(test_list, start=1):
        _log.info(f'\n[{suite.upper()}][{test_id}/{total_tests_to_run}] Running test "{test_name}"')
        test_run_data = mvtwebdriver_runner.run_test_from_suite(suite, test_id, test_name)
        test_result = test_run_data['status']
        test_results_after_retries = [test_result]
        tests_run_list.append(test_name)

        if mvtresultsexpectations_parser._is_unexpected_failure(suite, test_name, test_result):
            if test_result == 'passed':
                _log.info(f'[UNEXPECTED_PASS] {test_name}')
                unexpected_passes_list.append(test_name)
            else:
                _log.warning(f'[UNEXPECTED_FAIL] Test "{test_name}" from "{suite}" has unexpected result: {test_result}')
                _log.info(test_run_data['log'])
                found_expected_result_or_pass = False
                # The range is empty when retrying is disabled (0 retries).
                for trytestid in range(1, args_retry_unexpected_failures + 1):
                    _log.info(f"Retrying to run test {test_name}: Try {trytestid} of {args_retry_unexpected_failures}")
                    test_run_data = mvtwebdriver_runner.run_test_from_suite(suite, test_id, test_name)
                    test_result = test_run_data['status']
                    test_results_after_retries.append(test_result)
                    if test_result == 'passed':
                        _log.info(f'[FLAKY][PASS] {test_name}.')
                        found_expected_result_or_pass = True
                        break
                    if not mvtresultsexpectations_parser._is_unexpected_failure(suite, test_name, test_result):
                        _log.info(f'[FLAKY][FAIL][EXPECTED] {test_name}')
                        # Log the output of the retry that has just been run.
                        _log.info(test_run_data['log'])
                        expected_failures_list.append(test_name)
                        found_expected_result_or_pass = True
                        break
                    _log.warning(f'[UNEXPECTED_FAIL] Test "{test_name}" from "{suite}" has unexpected result: {test_result}')
                    _log.info(test_run_data['log'])
                    if trytestid < args_retry_unexpected_failures:
                        _log.info('Restarting WebDriver')
                        mvtwebdriver_runner.reset_driver()
                    else:
                        _log.warning(f'Test {test_name} continues to give unexpected result. Maximum number of retries reached. Marking test as unexpected result')
                if not found_expected_result_or_pass:
                    unexpected_failures_list.append(test_name)
        elif test_result != 'passed':
            _log.info(f'[FAIL][EXPECTED] {test_name}')
            expected_failures_list.append(test_name)
        else:
            _log.info(f'[PASS] {test_name}')

        # Update internal tracking and expectations with this results
        mvtresultsexpectations_parser._store_test_results_this_run(suite, test_name, test_results_after_retries)

    # Calculate and log the results of this suite run
    unexpected_failures = len(unexpected_failures_list)
    expected_failures = len(expected_failures_list)
    unexpected_passes = len(unexpected_passes_list)
    tests_run = len(tests_run_list)
    tests_pass = tests_run - unexpected_failures - expected_failures
    suite_summary_header_str = f"[Suite {suite}] Ran {tests_run} tests, of which:"
    _log.info('-' * len(suite_summary_header_str))
    _log.info(suite_summary_header_str)
    _log.info(f"{'-':>4} {tests_pass} tests passed.")
    if unexpected_passes > 0:
        _log.info(f"{'-':>4} {unexpected_passes} tests were unexpected passes:")
        _log_indented_test_names(unexpected_passes_list)
    if expected_failures > 0:
        _log.info(f"{'-':>4} {expected_failures} tests were expected failures:")
        _log_indented_test_names(expected_failures_list)
    if unexpected_failures > 0:
        _log.info(f"{'-':>4} {unexpected_failures} tests were unexpected failures:")
        _log_indented_test_names(unexpected_failures_list)

    # Check for old and new tests
    old_tests_list = mvtresultsexpectations_parser.check_for_old_tests_and_maybe_delete_old_expectations(suite)
    old_tests = len(old_tests_list)
    if old_tests > 0:
        _log.info(f"{'-':>4} {old_tests} tests are OLD tests (tracked in expectations but no longer found):")
        _log_indented_test_names(old_tests_list)
    new_tests_list = mvtresultsexpectations_parser.check_for_new_tests(suite)
    new_tests = len(new_tests_list)
    if new_tests > 0:
        _log.info(f"{'-':>4} {new_tests} tests are NEW tests (found but not tracked in expectations):")
        _log_indented_test_names(new_tests_list)
    _log.info('-' * len(suite_summary_header_str) + '\n')
    return [tests_run_list, unexpected_failures_list, expected_failures_list, unexpected_passes_list, old_tests_list, new_tests_list]
def run_mvt_tsuites(mvtwebdriver_runner, mvtresultsexpectations_parser, suites, args_retry_unexpected_failures, will_update_expectations):
    """Run each suite in *suites* and return the total number of unexpected failures.

    When exactly one suite is run only its own summary is logged.  Otherwise a
    global summary is logged after the last suite, followed by a hint about
    "--update-expectations" when there is something to update and
    *will_update_expectations* is false.
    """
    num_suites = len(suites)
    test_results = {}
    for suite in suites:
        test_results[suite] = run_mvt_one_tsuite(mvtwebdriver_runner, mvtresultsexpectations_parser, suite, args_retry_unexpected_failures)

    # Each value of test_results is the list returned by run_mvt_one_tsuite():
    # [run, unexpected failures, expected failures, unexpected passes, old, new]
    def total_of(position):
        return sum(len(suite_results[position]) for suite_results in test_results.values())

    # Only one suite was run so just return the result
    if num_suites == 1:
        return len(test_results[suites[0]][1])

    # All suites finished, so print a global summary
    _log.info('###########')
    _log.info('# SUMMARY #')
    _log.info('###########')
    total_tests_run = total_of(0)
    total_unexpected_failures = total_of(1)
    total_expected_failures = total_of(2)
    total_unexpected_passes = total_of(3)
    total_old_tests = total_of(4)
    total_new_tests = total_of(5)
    total_tests_pass = total_tests_run - total_unexpected_failures - total_expected_failures
    suites_str = ", ".join(suites)
    should_advice_update_expectations = False
    _log.info(f"Executed {num_suites} MVT test suites: {suites_str}")
    _log.info(f"Ran {total_tests_run} tests in total, of which:")
    _log.info(f"{'-':>4} {total_tests_pass} tests passed.")
    if total_unexpected_passes > 0:
        _log.info(f"{'-':>4} {total_unexpected_passes} tests were unexpected passes.")
    if total_expected_failures > 0:
        _log.info(f"{'-':>4} {total_expected_failures} tests were expected failures.")
    if total_unexpected_failures > 0:
        _log.info(f"{'-':>4} {total_unexpected_failures} tests were unexpected failures.")
        should_advice_update_expectations = True
    if total_old_tests > 0:
        _log.info(f"{'-':>4} {total_old_tests} tests are OLD tests (tracked in expectations but no longer found).")
        should_advice_update_expectations = True
    if total_new_tests > 0:
        _log.info(f"{'-':>4} {total_new_tests} tests are NEW tests (found but not tracked in expectations).")
        should_advice_update_expectations = True
    if should_advice_update_expectations and not will_update_expectations:
        _log.info('Pass the flag "--update-expectations" to update the expectations')
    return total_unexpected_failures
def configure_logging(selected_log_level='info'):
    """Install a stdout handler on the root logger and set its level.

    Records above the custom LOG_MESSAGE level are printed as
    "LEVELNAME: message"; the rest are printed as the bare message.

    Args:
        selected_log_level: one of 'debug', 'info', 'minimal' or 'quiet'.

    Returns:
        The handler that was added to the root logger.

    Raises:
        ValueError: if *selected_log_level* is not a known name.  This is
            checked before the root logger is touched.
    """
    class LogHandler(logging.StreamHandler):
        def format(self, record):
            if record.levelno > LOG_MESSAGE:
                return '%s: %s' % (record.levelname, record.getMessage())
            return record.getMessage()

    logging.addLevelName(LOG_MESSAGE, 'MESSAGE')
    levels_by_name = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        # NOTE(review): NOTSET on the root logger lets every record through,
        # which does not look "quiet" - confirm the intent of this mapping.
        'quiet': logging.NOTSET,
        'minimal': LOG_MESSAGE,
    }
    if selected_log_level not in levels_by_name:
        known_names = ', '.join(sorted(levels_by_name))
        raise ValueError(f'Unknown log level "{selected_log_level}". Valid values are: {known_names}')

    handler = LogHandler(sys.stdout)
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(levels_by_name[selected_log_level])
    return handler
# This monkey-patches urllib3.request so that it always runs with
# a timeout. Otherwise selenium sometimes hangs indefinitely.
def set_urllib3_request_default_timeout(timeout):
    """Replace ``RequestMethods.request`` with a wrapper that defaults to *timeout*.

    The wrapper forwards every call to the implementation that was in place
    when this function ran, adding ``timeout=timeout`` only when the caller
    did not pass a ``timeout`` keyword.

    NOTE(review): relies on ``urllib3.request`` exposing ``RequestMethods`` -
    confirm this holds for the urllib3 version in use.
    """
    unpatched_request = urllib3.request.RequestMethods.request

    def patched_request(self, method, url, *args, **kwargs):
        # A timeout chosen explicitly by the caller always wins.
        kwargs.setdefault('timeout', timeout)
        return unpatched_request(self, method, url, *args, **kwargs)

    urllib3.request.RequestMethods.request = patched_request
def main(argument_list):
    """Run the requested MVT suites and return the process exit code.

    Args:
        argument_list: command line arguments, without the program name.

    Returns:
        The number of unexpected failures, or -1 when the run was aborted by
        an error or by the user.

    Raises:
        NotImplementedError: for a platform or WPE browser this script does
            not know how to configure.
    """
    args = parse_args(argument_list)
    configure_logging(args.log_level)

    # Configure the browser type
    if args.platform == 'wpe':
        os.environ['WPE_BROWSER'] = args.browser_name
    elif args.platform == 'gtk':
        if args.browser_name != 'MiniBrowser':
            _log.warning('Only browser MiniBrowser is available for platform gtk. Using MiniBrowser')
            args.browser_name = 'MiniBrowser'
    else:
        raise NotImplementedError(f'Unknown platform {args.platform}')

    # Enable WPE headless mode
    if args.platform == 'wpe' and args.display_server == 'headless':
        if args.browser_name == 'MiniBrowser':
            args.extra_browser_args.append("--headless")
        elif args.browser_name == 'cog':
            os.environ['COG_PLATFORM_NAME'] = 'headless'
        else:
            raise NotImplementedError(f'Unknown wpe browser: {args.browser_name}')

    # Set a default timeout for all the urllib3 requests (avoid hangs)
    set_urllib3_request_default_timeout(args.timeout_to_load)

    exit_code = -1
    mvtwebdriver_runner = None
    try:
        port = Host().port_factory.get(args.platform, args)
        mvtwebdriver_runner = MVTWebDriverRunner(port, args.platform, args.configuration.lower(), args.extra_browser_args, args.mvt_instance_address, args.browser_name, args.retry_webdriver_timeouts, args.timeout_to_load, args.timeout_to_run)
        mvtwebdriver_runner.start_driver()
        mvtresultsexpectations_parser = MVTResultsExpectationsParser(args.platform, args.configuration, args.update_expectations, args.reset_expectations)
        suites = TEST_SUITES if args.suite is None else [args.suite]
        # The final summary only advises to update the expectations when this run is not going to do it.
        will_update_expectations = args.update_expectations or args.reset_expectations
        exit_code = run_mvt_tsuites(mvtwebdriver_runner, mvtresultsexpectations_parser, suites, args.retry_unexpected_failures, will_update_expectations)
        did_ran_all_suites = len(suites) == len(TEST_SUITES)
        mvtresultsexpectations_parser.maybe_update_expectations(did_ran_all_suites)
    except ReadTimeoutError as e:
        # The hidden --headless flag is a no-op, so point at the option that has an effect.
        _log.error(f'The connection with WebDriver timed out. Check if the browser works fine. Retry by passing --display-server=headless or by rising --timeout-to-load (current value {args.timeout_to_load}). {e}')
    except TimeoutException as e:
        _log.error(f'The MVT test suite timed out. Check if the remote URL has the expected content. {e}')
    except KeyboardInterrupt:
        print('KeyboardInterrupt: CTRL-C pressed. Stopping.')
    except Exception:
        # Top-level boundary: report any other error and fall through to the clean-up.
        traceback.print_exc()
    finally:
        if mvtwebdriver_runner:
            try:
                mvtwebdriver_runner.stop_driver()
            finally:
                # Stop the display server even if stopping the driver failed.
                mvtwebdriver_runner.stop_display_server()
    return exit_code
# Script entry point: the value returned by main() (the number of unexpected
# failures, or -1 when the run was aborted) is handed to sys.exit().
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))