| # Copyright 2022 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from __future__ import annotations |
| |
| import abc |
| import atexit |
| import logging |
| import os |
| import time |
| import traceback |
| from typing import TYPE_CHECKING, Any, List, Optional, Sequence, cast |
| |
| import selenium.common.exceptions |
| import urllib3 |
| from selenium import webdriver |
| from selenium.webdriver.remote.remote_connection import RemoteConnection |
| |
| from crossbench.browsers.attributes import BrowserAttributes |
| from crossbench.browsers.browser import Browser |
| from crossbench.types import JsonDict |
| |
| if TYPE_CHECKING: |
| import datetime as dt |
| |
| from selenium.webdriver.common.timeouts import Timeouts |
| |
| from crossbench.browsers.settings import Settings |
| from crossbench.env import HostEnvironment |
| from crossbench.path import AnyPath, LocalPath |
| from crossbench.runner.groups.session import BrowserSessionRunGroup |
| |
| |
class DriverException(RuntimeError):
  """Runtime error carrying a human-friendly message in place of the raw
  WebDriver exception text.

  When a browser is attached, its string form is prepended to the message
  produced by `str()`.
  """

  def __init__(self, msg: str, browser: Optional[Browser] = None) -> None:
    super().__init__(msg)
    self._msg = msg
    self._browser = browser

  def __str__(self) -> str:
    # Without a (truthy) browser only the plain message is shown.
    if not self._browser:
      return f"{self._msg}"
    return f"browser={self._browser}: {self._msg}"
| |
| |
class WebDriverBrowser(Browser, metaclass=abc.ABCMeta):
  """Abstract base for browsers that are driven through a selenium WebDriver.

  Subclasses have to implement `_find_driver`, `_validate_driver_version` and
  `_start_driver`. This class takes care of starting the driver, adjusting
  timeouts, window setup, navigation, JS execution and shutdown.
  """
  # Active webdriver instance, assigned in start().
  _driver: webdriver.Remote
  # Taken from the settings in __init__ and replaced in setup_binary().
  _driver_path: Optional[AnyPath]
  # PID of the driver service process, set in _find_driver_pid() if the driver
  # exposes a `service` attribute.
  _driver_pid: int
  # PID of the browser process, only set in _find_driver_pid() if exactly one
  # child process of the driver matches the browser path.
  _pid: int
  # Browser log file; driver_log_file is derived from it.
  log_file: Optional[LocalPath]

  def __init__(self,
               label: str,
               path: Optional[AnyPath] = None,
               settings: Optional[Settings] = None):
    super().__init__(label, path, settings)
    self._driver_path = self._settings.driver_path

  @property
  def attributes(self) -> BrowserAttributes:
    return BrowserAttributes.WEBDRIVER

  @property
  def driver(self) -> webdriver.Remote:
    """The underlying webdriver instance (only assigned once started)."""
    return self._driver

  @property
  def driver_log_file(self) -> LocalPath:
    """Path of the driver log: `log_file` with a ".driver.log" suffix.

    Asserts that `log_file` is set.
    """
    log_file = self.log_file
    assert log_file
    return log_file.with_suffix(".driver.log")

  def setup_binary(self) -> None:
    """Locate the driver binary via `_find_driver()` and store its absolute
    path. Asserts that the resulting path exists on the host platform."""
    self._driver_path = self.platform.absolute(self._find_driver())
    # TODO: support remote chromedriver as well
    assert self.platform.host_platform.exists(self._driver_path), (
        f"Webdriver path '{self._driver_path}' does not exist")

  @abc.abstractmethod
  def _find_driver(self) -> AnyPath:
    """Return the path of the driver binary to use."""

  @abc.abstractmethod
  def _validate_driver_version(self) -> None:
    """Hook called from `validate_env()` to check the driver version."""

  def validate_env(self, env: HostEnvironment) -> None:
    super().validate_env(env)
    self._validate_driver_version()

  def start(self, session: BrowserSessionRunGroup) -> None:
    """Start the webdriver session and prepare the browser window.

    Raises:
      DriverException: if the webdriver session could not be created.
    """
    assert self._driver_path
    if timeout := self.http_request_timeout:
      logging.debug("Setting http request timeout to %s", timeout)
      # Note: this is set on the RemoteConnection class, not on an instance.
      RemoteConnection.set_timeout(timeout.total_seconds())
    try:
      self._driver = self._start_driver(session, self._driver_path)
    except selenium.common.exceptions.SessionNotCreatedException as e:
      msg = e.msg or "Could not create Webdriver session."
      raise DriverException(msg, self) from e
    self._is_running = True
    # Make sure the browser is shut down even if quit() is never called.
    atexit.register(self.force_quit)
    self._find_driver_pid()
    self._set_driver_timeouts(session)
    self._setup_window()

  def _find_driver_pid(self) -> None:
    """Record the driver PID and, if it can be identified unambiguously, the
    browser PID.

    Does nothing if the driver has no `service` attribute. The browser PID is
    only stored if exactly one child process of the driver has an executable
    path equal to the browser path.
    """
    service = getattr(self._driver, "service", None)
    if not service:
      return
    self._driver_pid = service.process.pid
    browser_exe = str(self.path)
    candidates: List[int] = [
        child["pid"]
        for child in self.platform.process_children(self._driver_pid)
        if str(child["exe"]) == browser_exe
    ]
    if len(candidates) == 1:
      self._pid = candidates[0]
    else:
      logging.debug(
          "Could not find unique browser process for webdriver: %s, got %s",
          self, candidates)

  def _set_driver_timeouts(self, session: BrowserSessionRunGroup) -> None:
    """Adjust the global webdriver timeouts if the runner has custom timeout
    unit values.

    Each of the implicit-wait, script and page-load timeouts is passed through
    `timing.timeout_timedelta()` and written back to the driver. Timeouts that
    are missing or falsy (e.g. 0) on the driver are left untouched.
    If timing.has_no_timeout each value is set to SAFE_MAX_TIMEOUT_TIMEDELTA
    (done by `timing.timeout_timedelta()`, not visible here).
    """
    timing = session.timing
    if not timing.timeout_unit:
      return
    if timing.has_no_timeout:
      logging.info("Disabling webdriver timeouts")
    else:
      factor = timing.timeout_unit.total_seconds()
      if factor != 1.0:
        logging.info("Increasing webdriver timeouts by %fx", factor)
    timeouts: Timeouts = self.driver.timeouts
    if implicit_wait := getattr(timeouts, "implicit_wait", None):
      timeouts.implicit_wait = timing.timeout_timedelta(
          implicit_wait).total_seconds()
    if script := getattr(timeouts, "script", None):
      timeouts.script = timing.timeout_timedelta(script).total_seconds()
    if page_load := getattr(timeouts, "page_load", None):
      timeouts.page_load = timing.timeout_timedelta(page_load).total_seconds()
    self.driver.timeouts = timeouts

  def _setup_window(self) -> None:
    """Bring the main window to the foreground and apply the viewport
    settings. Nothing beyond the foreground switch is done when headless."""
    # Force main window to foreground.
    self._driver.switch_to.window(self._driver.current_window_handle)
    if self.viewport.is_headless:
      return
    if self.viewport.is_fullscreen:
      self._driver.fullscreen_window()
    elif self.viewport.is_maximized:
      self._driver.maximize_window()
    else:
      self._driver.set_window_position(self.viewport.x, self.viewport.y)
      self._driver.set_window_size(self.viewport.width, self.viewport.height)

  @abc.abstractmethod
  def _start_driver(self, session: BrowserSessionRunGroup,
                    driver_path: AnyPath) -> webdriver.Remote:
    """Create and return the webdriver instance for the given driver path."""

  def details_json(self) -> JsonDict:
    """Extend the base details with the driver log path (if logging is on)."""
    details: JsonDict = super().details_json()
    log = cast(JsonDict, details["log"])
    if self.log_file:
      log["driver"] = os.fspath(self.driver_log_file)
    return details

  def show_url(self, url: str, target: Optional[str] = None) -> None:
    """Navigate to `url`.

    Args:
      url: The URL to load.
      target: "_self" or None loads the URL in the last window handle,
        "_new_tab" opens a new tab and "_new_window" a new window first.

    Raises:
      RuntimeError: for any other `target` value.
      DriverException: for the load failures recognised by
        `_wrap_webdriver_exception()`.
      selenium.common.exceptions.WebDriverException: all other driver errors
        are re-raised unchanged.
    """
    logging.debug("WebDriverBrowser.show_url(%s, %s)", url, target)
    try:
      if target in ("_self", None):
        handles = self._driver.window_handles
        assert handles, "Browser has no more opened windows."
        self._driver.switch_to.window(handles[-1])
      elif target == "_new_tab":
        self._driver.switch_to.new_window("tab")
      elif target == "_new_window":
        self._driver.switch_to.new_window("window")
      else:
        raise RuntimeError(f"unexpected target {target}")
      self._driver.get(url)
    except selenium.common.exceptions.WebDriverException as e:
      if msg := e.msg:
        # Raises a DriverException for known errors, returns otherwise.
        self._wrap_webdriver_exception(e, msg, url)
      raise

  def switch_to_new_tab(self) -> None:
    self._driver.switch_to.new_window("tab")

  def screenshot(self, path: LocalPath) -> None:
    """Save a screenshot to `path`.

    Raises:
      DriverException: if the driver reports that saving failed.
    """
    if not self._driver.get_screenshot_as_file(path.as_posix()):
      raise DriverException(
          f"Browser failed to get_screenshot_as_file to file '{path}'", self)

  def _wrap_webdriver_exception(
      self, e: selenium.common.exceptions.WebDriverException, msg: str,
      url: str) -> None:
    """Raise a more readable DriverException for known network errors.

    Returns normally if `msg` matches none of the known error codes, in which
    case the caller is expected to deal with the original exception.
    """
    if "net::ERR_CONNECTION_REFUSED" in msg:
      raise DriverException(
          f"Browser failed to load URL={url}. The URL is likely unreachable.",
          self) from e
    if "net::ERR_INTERNET_DISCONNECTED" in msg:
      raise DriverException(
          f"Browser failed to load URL={url}. "
          f"The device is not connected to the internet.", self) from e

  def js(
      self,
      script: str,
      timeout: Optional[dt.timedelta] = None,
      arguments: Sequence[object] = ()
  ) -> Any:
    """Execute `script` in the browser and return its result.

    Args:
      script: JavaScript source to run.
      timeout: Optional, strictly positive script timeout. It is applied to
        the driver via `set_script_timeout()` and not reset afterwards here.
      arguments: Values passed as positional arguments to the script.

    Raises:
      ValueError: if the driver fails to execute the script; the original
        WebDriverException is attached as the cause.
    """
    logging.debug("WebDriverBrowser.js() timeout=%s, script: %s", timeout,
                  script)
    assert self._is_running
    try:
      if timeout is not None:
        assert timeout.total_seconds() > 0, (
            f"timeout must be a positive number, got: {timeout}")
        self._driver.set_script_timeout(timeout.total_seconds())
      return self._driver.execute_script(script, *arguments)
    except selenium.common.exceptions.WebDriverException as e:
      # Chain explicitly so the traceback shows the driver error as the
      # direct cause, consistent with the DriverException wrappers above.
      raise ValueError(f"Could not execute JS: {e.msg}") from e

  def close_all_tabs(self) -> None:
    """Best-effort attempt to close every open window/tab.

    Invalid-session and connection-retry errors are only logged, since they
    indicate that the browser is already gone.
    """
    try:
      all_handles = self._driver.window_handles
      for handle in all_handles:
        self._driver.switch_to.window(handle)
        self._driver.close()
    except (selenium.common.exceptions.InvalidSessionIdException,
            urllib3.exceptions.MaxRetryError) as e:
      logging.debug("%s: Got errors while closing all tabs: %s", self, e)

  def quit(self) -> None:
    """Close all tabs and shut down the driver. Asserts a running browser."""
    assert self._is_running
    self.close_all_tabs()
    self.force_quit()

  def force_quit(self) -> None:
    """Shut down the driver, swallowing (and logging) any errors.

    No-op if the driver was never assigned or the browser is not running.
    The browser is always marked as not running afterwards.
    """
    if getattr(self, "_driver", None) is None or not self._is_running:
      return
    # The atexit hook registered in start() is no longer needed.
    atexit.unregister(self.force_quit)
    logging.debug("WebDriverBrowser.force_quit()")
    try:
      try:
        # Close the current window.
        self._driver.close()
        time.sleep(0.1)
      except selenium.common.exceptions.NoSuchWindowException:
        # No window is good.
        pass
      except selenium.common.exceptions.InvalidSessionIdException:
        # Closing the last tab will close the session as well.
        return
      try:
        self._driver.quit()
      except selenium.common.exceptions.InvalidSessionIdException:
        return
      # Sometimes a second quit is needed, ignore any warnings there
      try:
        self._driver.quit()
      except Exception as e:  # pylint: disable=broad-except
        logging.debug("Driver raised exception on quit: %s\n%s", e,
                      traceback.format_exc())
      return
    except Exception as e:  # pylint: disable=broad-except
      logging.debug("Could not quit browser: %s\n%s", e, traceback.format_exc())
    finally:
      # Runs for every exit path above, including the early returns.
      self._is_running = False
| |
| |
class RemoteWebDriver(WebDriverBrowser, Browser):
  """Browser wrapper around a WebDriver session that was started elsewhere.

  The driver instance is handed in ready to use, so nothing is looked up or
  launched here and shutting the driver down is left to its owner.
  """

  def __init__(self, label: str, driver: webdriver.Remote) -> None:
    super().__init__(label=label, path=None)
    self._driver = driver
    # The version comes from the capabilities reported by the driver; the
    # major version is the part before the first ".".
    self.version: str = driver.capabilities["browserVersion"]
    major_component, *_ = self.version.split(".")
    self.major_version: int = int(major_component)

  @property
  def type_name(self) -> str:
    return "remote"

  @property
  def attributes(self) -> BrowserAttributes:
    return BrowserAttributes.WEBDRIVER | BrowserAttributes.REMOTE

  # --- Lifecycle: the session already exists, so little is left to do. ---

  def setup_binary(self) -> None:
    """Nothing to set up, no local driver binary is used."""

  def start(self, session: BrowserSessionRunGroup) -> None:
    """Mark the already started driver as running and apply the viewport."""
    self._is_running = True
    viewport = self.viewport
    if viewport.is_fullscreen:
      self._driver.fullscreen_window()
      return
    if viewport.is_maximized:
      self._driver.maximize_window()
      return
    self._driver.set_window_position(viewport.x, viewport.y)
    self._driver.set_window_size(viewport.width, viewport.height)

  def quit(self) -> None:
    """Only mark the browser as stopped.

    External code that started the driver is responsible for shutting it
    down.
    """
    self._is_running = False

  # --- Hooks of the base classes that do not apply to a remote driver. ---

  def _validate_driver_version(self) -> None:
    """No validation is performed for a remote driver."""

  def _extract_version(self) -> str:
    raise NotImplementedError()

  def _find_driver(self) -> LocalPath:
    raise NotImplementedError()

  def _start_driver(self, session: BrowserSessionRunGroup,
                    driver_path: AnyPath) -> webdriver.Remote:
    raise NotImplementedError()