blob: 3009d974c35b73c9f325ec48e28adfc5251ca606 [file]
# Copyright 2024 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import datetime as dt
import logging
import time
from typing import TYPE_CHECKING, Any, Callable, Final, Optional, Sequence, \
cast
from typing_extensions import override
from crossbench.action_runner.base import ActionRunner, \
InputSourceNotImplementedError
from crossbench.action_runner.default_bond_action_runner import \
DefaultBondActionRunner
from crossbench.action_runner.element_not_found_error import \
ElementNotFoundError
from crossbench.browsers.chromium.devtools import \
DevToolsInBrowserClient as DevToolsClient
from crossbench.probes.screenshot import ScreenshotProbe, \
ScreenshotProbeContext
from crossbench.runner.probe_context_lookup_error import \
ProbeContextLookupError
if TYPE_CHECKING:
from crossbench.action_runner.action import all as i_action
from crossbench.action_runner.action.base_probe import BaseProbeAction
from crossbench.action_runner.bond_base import BondActionRunner
from crossbench.action_runner.screenshot_annotation import \
ScreenshotAnnotation
from crossbench.runner.actions import Actions
from crossbench.runner.run import Run
class DefaultActionRunner(ActionRunner):
"""Default action runner that uses JavaScript for most page interactions."""
XPATH_SELECT_ELEMENT: Final[str] = """
let elements = [];
let xpathResult = document.evaluate(arguments[0], document);
let currentElement = xpathResult.iterateNext();
let element = currentElement;
while (currentElement) {
elements.push(currentElement);
currentElement = xpathResult.iterateNext();
}
"""
CSS_SELECT_ELEMENT: Final[str] = """
let elements = document.querySelectorAll(arguments[0]);
let element = elements[0];
"""
CHECK_ELEMENT_EXISTS: Final[str] = """
if (!element) return 0;
"""
ELEMENT_SCROLL_INTO_VIEW: Final[str] = """
element.scrollIntoView({ block: 'nearest' });
"""
CHECK_ELEMENT_RECT: Final[str] = """
const rect = element.getBoundingClientRect();
if (rect.width === 0 || rect.height === 0) return 0;
"""
ELEMENT_CLICK: Final[str] = """
element.click();
"""
RETURN_SUCCESS: Final[str] = """
return elements.length;
"""
SELECT_WINDOW: Final[str] = """
let elements = [window];
let element = window;
"""
SCROLL_ELEMENT_TO: Final[str] = """
element.scrollTo({top:arguments[1], behavior:'smooth'});
"""
GET_CURRENT_SCROLL_POSITION: Final[str] = """
if (!element) return [0, 0];
return [elements.length, element[arguments[1]]];
"""
_bond: DefaultBondActionRunner | None = None
def get_selector_script(self,
selector: str,
check_element_exists: bool = False,
scroll_into_view: bool = False,
check_element_rect: bool = False,
click: bool = False,
return_on_success: bool = False) -> tuple[str, str]:
# TODO: support more selector types
script: str = ""
prefix = "xpath/"
if selector.startswith(prefix):
selector = selector[len(prefix):]
script = self.XPATH_SELECT_ELEMENT
else:
script = self.CSS_SELECT_ELEMENT
if check_element_exists:
script += self.CHECK_ELEMENT_EXISTS
if scroll_into_view:
script += self.ELEMENT_SCROLL_INTO_VIEW
if check_element_rect:
script += self.CHECK_ELEMENT_RECT
if click:
script += self.ELEMENT_CLICK
if return_on_success:
script += self.RETURN_SUCCESS
return selector, script
@property
@override
def bond(self) -> BondActionRunner:
if not self._bond:
self._bond = DefaultBondActionRunner(self)
return self._bond
@override
def teardown(self) -> None:
if self._bond:
self._bond.teardown()
@override
def get(self, run: Run, action: i_action.GetAction) -> None:
with run.actions(f"Get {action.url}", measure=False) as actions:
with actions.wait_until(action.duration):
actions.show_url(action.url, str(action.target), action.ready_state,
action.timeout)
@override
def click_js(self, run: Run, action: i_action.ClickAction) -> None:
if action.duration > dt.timedelta():
raise InputSourceNotImplementedError(self, action, action.input_source,
"Non-zero duration not implemented")
selector_config = action.position.selector
if not selector_config:
raise RuntimeError("Missing selector")
selector, script = self.get_selector_script(
selector_config.selector,
check_element_exists=True,
scroll_into_view=selector_config.scroll_into_view,
click=True,
return_on_success=True)
with run.actions("ClickAction", measure=False) as actions:
if selector_config.wait:
self.wait_for_element_impl(
actions,
selector=selector_config.selector,
timeout=action.timeout,
required=selector_config.required)
if not actions.js(
script, arguments=[selector]) and selector_config.required:
raise ElementNotFoundError(selector)
if action.verify:
self.wait_for_element_impl(
actions, selector=action.verify, timeout=action.timeout)
@override
def scroll_js(self, run: Run, action: i_action.ScrollAction) -> None:
with run.actions("ScrollAction", measure=False) as actions:
selector = ""
selector_script = self.SELECT_WINDOW
if action.selector:
selector, selector_script = self.get_selector_script(action.selector)
current_scroll_position_script = (
selector_script + self.GET_CURRENT_SCROLL_POSITION)
found_element, initial_scroll_y = actions.js(
current_scroll_position_script,
arguments=[selector,
self._get_scroll_field(bool(action.selector))])
if not found_element:
if action.required:
raise ElementNotFoundError(selector)
return
do_scroll_script = selector_script + self.SCROLL_ELEMENT_TO
duration_s = action.duration.total_seconds()
distance = action.distance
start_time = time.time()
# TODO: use the chrome.gpuBenchmarking.smoothScrollBy extension
# if available.
while True:
time_delta = time.time() - start_time
if time_delta >= duration_s:
break
scroll_y = initial_scroll_y + time_delta / duration_s * distance
actions.js(do_scroll_script, arguments=[selector, scroll_y])
actions.wait(0.2)
scroll_y = initial_scroll_y + distance
actions.js(do_scroll_script, arguments=[selector, scroll_y])
def text_input_js(self, run: Run, action: i_action.TextInputAction) -> None:
with run.actions("TextInput", measure=False) as actions:
if text := action.text:
actions.js(
"document.activeElement.value = arguments[0]", arguments=[text])
else:
raise InputSourceNotImplementedError(self, action, action.input_source)
def wait_for_element_impl(self,
actions: Actions,
selector: str,
timeout: dt.timedelta,
expected_count: int = 1,
or_more: bool = False,
scroll_into_view: bool = False,
check_element_rect: bool = False,
required: bool = True) -> None:
selector, selector_script = self.get_selector_script(
selector=selector,
check_element_exists=True,
scroll_into_view=scroll_into_view,
check_element_rect=check_element_rect,
return_on_success=True)
# TODO: if check_element_rect, we should wait for the position to be the
# same
def _exact_match(js_result: int) -> bool:
return js_result == expected_count
def _or_more_match(js_result: int) -> bool:
return js_result >= expected_count
success_condition = _exact_match
if or_more:
success_condition = _or_more_match
try:
actions.wait_js_condition(
selector_script,
min_interval=0.2,
timeout=timeout,
arguments=(selector,),
success_condition=success_condition)
except (TimeoutError, ValueError) as e:
if required:
raise
logging.debug("Element %s not found: %s", selector, e)
@override
def wait_for_element(self, run: Run,
action: i_action.WaitForElementAction) -> None:
with run.actions("WaitForElementAction", measure=False) as actions:
self.wait_for_element_impl(
actions=actions,
selector=action.selector,
expected_count=action.expected_count,
or_more=action.or_more,
timeout=action.timeout,
check_element_rect=action.check_rect)
@override
def wait_for_condition(self, run: Run,
action: i_action.WaitForConditionAction) -> None:
with run.actions("WaitForConditionAction", measure=False) as actions:
actions.wait_js_condition(
action.condition, min_interval=0.1, timeout=action.timeout)
@override
def wait_for_ready_state(self, run: Run,
action: i_action.WaitForReadyStateAction) -> None:
with run.actions(
f"Wait for ready state {action.ready_state}", measure=False) as actions:
actions.wait_for_ready_state(action.ready_state, action.timeout)
@override
def inject_new_document_script(
self, run: Run, action: i_action.InjectNewDocumentScriptAction) -> None:
run.browser.run_script_on_new_document(action.script)
def open_devtools(self, _run: Run,
action: i_action.OpenDevToolsAction) -> None:
logging.info("Opening DevTools panel '%s'...", action.panel_name)
DevToolsClient().open_frontend(_run.browser, action.panel_name)
@override
def switch_tab(self, run: Run, action: i_action.SwitchTabAction) -> None:
with run.actions("SwitchTabAction", measure=False):
run.browser.switch_tab(action.title, action.url, action.tab_index,
action.relative_tab_index, action.timeout)
@override
def close_tab(self, run: Run, action: i_action.CloseTabAction) -> None:
with run.actions("CloseTabAction", measure=False):
run.browser.close_tab(action.title, action.url, action.tab_index,
action.relative_tab_index, action.timeout)
@override
def close_all_tabs(self, run: Run,
action: i_action.CloseAllTabsAction) -> None:
del action
with run.actions("CloseAllTabsAction", measure=False):
run.browser.close_all_tabs()
def _get_scroll_field(self, has_selector: bool) -> str:
if has_selector:
return "scrollTop"
return "scrollY"
def _rate_limit_keystrokes(
self, run: Run, action: i_action.TextInputAction,
do_type_function: Callable[[Run, Actions, str], Any]) -> None:
action_text = cast(str, action.text)
character_delay_s = (action.duration / len(action_text)).total_seconds()
start_time = time.time()
action_expected_end_time = start_time + action.duration.total_seconds()
with run.actions("TextInput", measure=False) as actions:
# When no duration is specified, input the entire text at once.
if action.duration == dt.timedelta():
do_type_function(run, actions, action_text)
return
character_expected_end_time = start_time
for character in action_text:
character_expected_end_time += character_delay_s
do_type_function(run, actions, character)
expected_end_delta = character_expected_end_time - time.time()
if expected_end_delta > 0:
actions.wait(expected_end_delta)
overrun_time = time.time() - action_expected_end_time
# There will always be a slight overrun due to the overhead of the final
# actions.wait() call, but that is acceptable. Check if the overrun was
# significant.
if overrun_time > 0.01:
logging.warning(
"text_input action is behind schedule! Consider extending this "
"action's duration otherwise the action may timeout.")
@override
def invoke_probe(self, run: Run, action: BaseProbeAction) -> None:
ctx = run.get_probe_context(action.probe_cls)
if ctx is None:
raise ProbeContextLookupError(action.probe_cls)
with run.actions(f"Invoke Probe ({action.probe_cls.NAME})", measure=False):
ctx.invoke(
info_stack=self.info_stack, timeout=action.timeout, **action.kwargs)
@override
def screenshot_impl(
self,
run: Run,
suffix: str,
annotations: Optional[Sequence[ScreenshotAnnotation]] = None) -> None:
# TODO: use invoke_probe helper
ctx = run.get_probe_context(ScreenshotProbe)
if not ctx:
logging.debug("No screenshot probe for screenshot on %s",
repr(self.info_stack))
return
assert isinstance(ctx, ScreenshotProbeContext)
ctx.screenshot("_".join(self.info_stack) + f"_{suffix}", annotations)