| # Copyright 2023 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import functools |
| import os |
| import socket |
| import subprocess |
| import sys |
| import threading |
| import time |
| |
| from contextlib import closing |
| from contextlib import contextmanager |
| from typing import List, Optional |
| |
| import attr |
| from chrome.test.variations.drivers import DriverFactory |
| from chrome.test.variations.test_utils import SRC_DIR |
| from selenium import webdriver |
| from selenium.common.exceptions import WebDriverException |
| |
| |
| # The module chromite is under third_party and imported relative to its root. |
| sys.path.append(os.path.join(SRC_DIR, 'third_party')) |
| # The module catapult/telemetry is under third_party and imported relative to |
| # its root. |
| sys.path.append(os.path.join(SRC_DIR, 'third_party', 'catapult', 'telemetry')) |
| |
| from chromite.lib import device, vm |
| from chromite.lib import remote_access |
| from telemetry.core.exceptions import BrowserConnectionGoneException |
| from telemetry.internal.browser import browser_options |
| from telemetry.internal.platform import cros_device |
| from telemetry.internal.platform.cros_platform_backend import CrosPlatformBackend |
| from telemetry.internal.backends.chrome import cros_browser_finder |
| |
| CACHE_DIR = os.path.join(SRC_DIR, "build", "cros_cache") |
| INITIAL_WAIT_TIME_SECONDS = 10 |
| |
| |
| class _PossibleCrOSBrowser(cros_browser_finder.PossibleCrOSBrowser): |
| """The CrOS browser wrapper to filter out start-up args.""" |
| |
| #override |
| def GetBrowserStartupArgs(self, browser_options): |
| startup_args = super().GetBrowserStartupArgs(browser_options) |
| removed_args = [ |
| # This flag disables features from the seed file. We need to remove this |
| # flag so the browser can load the seed file correctly. |
| '--enable-gpu-benchmarking', |
| ] |
| return [arg for arg in startup_args if arg not in removed_args] |
| |
| |
| def _launch_browser(browser_args: List[str]) -> 'Browser': |
| finder_options = browser_options.BrowserFinderOptions() |
| finder_options.browser_type = 'cros-browser' |
| finder_options.verbosity = 2 |
| finder_options.CreateParser().parse_args(args=[]) |
| |
| b_options = finder_options.browser_options |
| b_options.browser_startup_timeout = 15 |
| b_options.AppendExtraBrowserArgs(browser_args) |
| |
| device = cros_device.CrOSDevice( |
| host_name='localhost', |
| ssh_port=9222, |
| ssh_identity=finder_options.ssh_identity, |
| is_local=False) |
| platform = CrosPlatformBackend.CreatePlatformForDevice(device, None) |
| |
| possibleBrowser = _PossibleCrOSBrowser( |
| 'cros-chrome', finder_options, platform, is_guest=False) |
| possibleBrowser.SetUpEnvironment(b_options) |
| |
| try: |
| browser = possibleBrowser.Create() |
| except BrowserConnectionGoneException as e: |
| raise WebDriverException from e |
| return browser |
| |
| |
| def _wait_for_port( |
| port: int, host: str='localhost', timeout:float=5) -> bool: |
| start_time = time.perf_counter() |
| with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: |
| while(time.perf_counter() - start_time <= timeout): |
| if sock.connect_ex((host, port)) == 0: |
| return True |
| else: |
| time.sleep(0.01) |
| return False |
| |
| |
| @attr.attrs() |
| class CrOSDriverFactory(DriverFactory): |
| channel: str = attr.attrib() |
| board: str = attr.attrib() |
| server_port: int = attr.attrib() |
| |
| #override |
| def __attrs_post_init__(self): |
| super().__attrs_post_init__() |
| # We use this to check whether we have started the VM before we attempt to |
| # shut it down. |
| self._vm_started = False |
| |
| def _launch_vm(self) -> vm.VM: |
| parser = vm.VM.GetParser() |
| opts = parser.parse_args([ |
| f'--board={self.board}', |
| f'--cache-dir={CACHE_DIR}', |
| ]) |
| _device = device.Device.Create(opts) |
| |
| # VM will usually be started on a test bot already. |
| if not _device.IsRunning(): |
| _device.Start() |
| return _device |
| |
| def _ssh_forward(self, port: int, server_port: int) -> subprocess.Popen: |
| local_pfs = remote_access.PortForwardSpec(local_port=port) |
| remote_pfs = remote_access.PortForwardSpec(local_port=server_port) |
| tunnel = self.device.remote.agent.CreateTunnel( |
| to_local=[local_pfs], to_remote=[remote_pfs]) |
| if not _wait_for_port(port): |
| return None |
| return tunnel |
| |
| def _copy_seed_file(self, seed_file: str) -> str: |
| assert os.path.exists(seed_file) |
| remote_seed_path = f'/tmp/{os.path.basename(seed_file)}' |
| remote_device = self.device.remote |
| assert remote_device.IsDirWritable('/tmp/'), 'tmp dir not writable' |
| remote_device.CopyToDevice(src=seed_file, |
| dest=remote_seed_path, |
| mode='scp', |
| verbose=True) |
| assert remote_device.IfFileExists(remote_seed_path), ( |
| 'file not pushed to device' |
| ) |
| |
| # The default owner is root, we need to chmod to any user. |
| remote_device.run( |
| ['chmod', 'a+rw', remote_seed_path], remote_sudo=True, print_cmd=True) |
| return remote_seed_path |
| |
| #override |
| @property |
| def supports_startup_timeout(self) -> bool: |
| # ChromeOS is a remote driver that doesn't support browser startup timeout. |
| return False |
| |
| @functools.cached_property |
| def device(self) -> device.Device: |
| device_ = self._launch_vm() |
| self._vm_started = True |
| return device_ |
| |
| @property |
| def vm_started(self) -> bool: |
| return self._vm_started |
| |
| @contextmanager |
| def tunnel_context(self, debugging_port, server_port): |
| tunnel = self._ssh_forward(debugging_port, server_port) |
| if not tunnel: |
| raise WebDriverException(f'Unable to forward port: {debugging_port}') |
| |
| def poll(): |
| stat = tunnel.poll() |
| while stat == None: |
| stat = tunnel.poll() |
| threading.Thread(target=poll).start() |
| |
| try: |
| yield |
| finally: |
| tunnel.terminate() |
| |
| #override |
| @contextmanager |
| def create_driver( |
| self, |
| seed_file: Optional[str] = None, |
| options: Optional[webdriver.ChromeOptions] = None |
| ): |
| # This has a side-effect to boot up the VM if not yet already. |
| assert self.device, "VM fails to boot." |
| |
| browser_args = [ |
| # We need debugging connection via WebSocket with the browser. By default |
| # such connection is blocked so we add this flag to allow it. |
| '--remote-allow-origins=*', |
| ] |
| if seed_file: |
| remote_seed_path = self._copy_seed_file(seed_file) |
| browser_args.extend([ |
| f'--variations-test-seed-path="{remote_seed_path}"', |
| f'--fake-variations-channel={self.channel}', |
| '--disable-variations-safe-mode', |
| '--disable-field-trial-config', |
| # TODO(http://crbug.com/379869158) -- remove this once the new |
| # seed loading mechanism is fixed. |
| '--force-fieldtrials=SeedFileTrial/Default' |
| ]) |
| |
| browser = _launch_browser(browser_args) |
| debugging_port, _ = browser._browser_backend._FindDevToolsPortAndTarget() |
| |
| options = options or self.default_options |
| options.debugger_address=f'localhost:{debugging_port}' |
| |
| |
| with self.tunnel_context(debugging_port, self.server_port): |
| # We want to make sure that the browser window has fully started |
| # on the VM. On LUCI workers it takes more time than in the local |
| # environment, which can cause tests flakiness. |
| time.sleep(INITIAL_WAIT_TIME_SECONDS) |
| driver = webdriver.Chrome(service=self.get_driver_service(), |
| options=options) |
| # VM may not be fully ready before it returns, wait for window handle |
| # to double confirm. |
| self.wait_for_window(driver) |
| try: |
| yield driver |
| finally: |
| driver.quit() |
| |
| def close(self): |
| if self.vm_started and self.device.IsRunning(): |
| self.device.Stop() |
| pass |