blob: 444bb495b6fac6ed707cb460b2ebd38d232e905e [file]
# Copyright 2024 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import contextlib
import logging
from typing import TYPE_CHECKING, Iterable, Iterator, Optional, Sequence
from crossbench import exception
from crossbench.action_runner.action_runner_listener import \
ActionRunnerListener
from crossbench.action_runner.bond_base import BondActionRunner
from crossbench.benchmarks.loading.input_source import InputSource
from crossbench.cli import ui
if TYPE_CHECKING:
from crossbench.action_runner.action import all as i_action
from crossbench.action_runner.action.base_probe import BaseProbeAction
from crossbench.action_runner.screenshot_annotation import \
ScreenshotAnnotation
from crossbench.benchmarks.loading.config.pages import ActionBlock
from crossbench.benchmarks.loading.page.base import Page
from crossbench.benchmarks.loading.page.combined import CombinedPage
from crossbench.benchmarks.loading.page.interactive import InteractivePage
from crossbench.benchmarks.loading.tab_controller import TabController
from crossbench.runner.run import Run
class ActionNotImplementedError(NotImplementedError):
  """Raised when a runner has no implementation for an action.

  The failing runner and action are kept on the instance as `runner` and
  `action`. The message names the action's TYPE and the runner's class,
  followed by ", context: ..." when a non-empty `msg_context` is given.
  """

  def __init__(self,
               runner: ActionRunner,
               action: i_action.Action,
               msg_context: str = "") -> None:
    self.runner = runner
    self.action = action
    # Only add the context suffix when there is something to report.
    context_suffix = f", context: {msg_context}" if msg_context else ""
    runner_name = type(runner).__name__
    super().__init__(
        f"{action.TYPE!s}-action not implemented in {runner_name}"
        f"{context_suffix}")
class InputSourceNotImplementedError(ActionNotImplementedError):
  """Raised when a runner does not implement an action for an input source.

  The repr of `input_source` (plus the optional `msg_context`) is forwarded
  to ActionNotImplementedError as its message context.
  """

  def __init__(self,
               runner: ActionRunner,
               action: i_action.Action,
               input_source: InputSource,
               msg_context: str = "") -> None:
    context_suffix = f", context: {msg_context}" if msg_context else ""
    source_detail = f"Source {input_source!r} not implemented{context_suffix}"
    super().__init__(runner, action, source_detail)
class ActionRunner:
  """Base runner that dispatches actions and runs action blocks on pages.

  `wait` and `js` are implemented here; `click`, `scroll` and `text_input`
  dispatch on the action's input source. The remaining action hooks raise
  ActionNotImplementedError / InputSourceNotImplementedError and are
  presumably overridden by concrete runners.
  """

  def __init__(self) -> None:
    self._listener = ActionRunnerListener()
    # TODO: Don't share state across runs
    # Tuple of scope names for the active action; None outside any scope.
    self._info_stack: exception.TInfoStack | None = None
    # When set, every action step first waits for a user prompt.
    self._step_by_step_mode: bool = False
    # Annotations passed to screenshot_impl by failure_screenshot; reset
    # before each action step.
    self._failure_screenshot_annotations: list[ScreenshotAnnotation] = []

  def set_step_by_step_mode(self, step_by_step_mode: bool) -> None:
    """Enables or disables prompting before every action step."""
    self._step_by_step_mode = step_by_step_mode

  def set_listener(self, listener: ActionRunnerListener) -> None:
    """Replaces the listener notified during multi-tab page runs."""
    self._listener = listener

  # info_stack is a unique identifier for the currently running or most
  # recently run action.
  @property
  def info_stack(self) -> exception.TInfoStack:
    """Returns the current info stack.

    Raises:
      RuntimeError: if no info-stack scope has been entered yet.
    """
    if not self._info_stack:
      raise RuntimeError("info_stack can not be called before run_blocks")
    return self._info_stack

  @property
  def bond(self) -> BondActionRunner:
    """Returns a newly created BondActionRunner on every access."""
    return BondActionRunner()

  def run_blocks(self, run: Run, page: InteractivePage,
                 blocks: Iterable[ActionBlock]) -> None:
    """Runs each block in order via `block.run_with`."""
    for block in blocks:
      block.run_with(self, run, page)

  def run_block(self, run: Run, block: ActionBlock) -> None:
    """Runs all actions of `block` inside annotated scopes.

    Both the exception annotations and the info stack are extended with the
    block index and the 1-based action index.
    """
    block_index = block.index
    # TODO: Instead maybe just pass context down.
    # Or pass unique path to every action __init__
    with exception.annotate(f"block {block_index}: {block.label}"):
      with self._info_stack_annotate(f"block_{block_index}"):
        for action_index, action in enumerate(block, start=1):
          with self._info_stack_annotate(f"action_{action_index}"):
            with exception.annotate(f"action {action_index}: {str(action)}"):
              self._run_action_step(run, action)

  def _run_action_step(self, run: Run, action: i_action.Action) -> None:
    """Runs a single action, pausing for the user in step-by-step mode."""
    if self._step_by_step_mode:
      logging.critical("[STEP-BY-STEP MODE] Next step: %s", action.to_json())
      ui.prompt("[STEP-BY-STEP MODE] Press Enter to continue")
    # Annotations only apply to the action that registered them.
    self._failure_screenshot_annotations = []
    action.run_with(run, self)

  def wait(self, run: Run, action: i_action.WaitAction) -> None:
    """Waits for `action.duration` inside an unmeasured actions scope."""
    with run.actions("WaitAction", measure=False) as actions:
      actions.wait(action.duration)

  def js(self, run: Run, action: i_action.JsAction) -> None:
    """Runs `action.script` with `action.timeout`, unmeasured."""
    with run.actions("JS", measure=False) as actions:
      actions.js(action.script, action.timeout)

  def click(self, run: Run, action: i_action.ClickAction) -> None:
    """Dispatches a click by input source, retrying on failure.

    The click is tried up to `action.attempts` times. Failures before the
    last attempt are logged as warnings; the last failure is re-raised.

    Raises:
      RuntimeError: if the input source is not JS, TOUCH or MOUSE.
    """
    input_source = action.input_source
    if input_source is InputSource.JS:
      do_click = self.click_js
    elif input_source is InputSource.TOUCH:
      do_click = self.click_touch
    elif input_source is InputSource.MOUSE:
      do_click = self.click_mouse
    else:
      raise RuntimeError(f"Unsupported input source: '{input_source}'")
    attempts = action.attempts
    # NOTE(review): with attempts < 1 no click is performed at all;
    # presumably the action config validates this - confirm.
    for attempt in range(1, attempts + 1):
      try:
        do_click(run, action)
        return
      except Exception as e:  # pylint: disable=broad-except
        remaining = attempts - attempt
        if not remaining:
          # Out of retries: propagate the last failure unchanged.
          raise
        logging.warning("Click failed with %d attempts left: %s", remaining,
                        e)

  def scroll(self, run: Run, action: i_action.ScrollAction) -> None:
    """Dispatches a scroll to the JS, TOUCH or MOUSE implementation.

    Raises:
      RuntimeError: for any other input source.
    """
    input_source = action.input_source
    if input_source is InputSource.JS:
      self.scroll_js(run, action)
    elif input_source is InputSource.TOUCH:
      self.scroll_touch(run, action)
    elif input_source is InputSource.MOUSE:
      self.scroll_mouse(run, action)
    else:
      raise RuntimeError(f"Unsupported input source: '{input_source}'")

  def get(self, run: Run, action: i_action.GetAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise ActionNotImplementedError(self, action)

  def text_input(self, run: Run, action: i_action.TextInputAction) -> None:
    """Dispatches text input to the keyboard or JS implementation.

    The JS path is only taken when `action.keyevent` is falsy.

    Raises:
      RuntimeError: for any other input source / keyevent combination.
    """
    input_source = action.input_source
    if input_source is InputSource.KEYBOARD:
      self.text_input_keyboard(run, action)
    elif input_source is InputSource.JS and not action.keyevent:
      self.text_input_js(run, action)
    else:
      raise RuntimeError(f"Unsupported input source: '{input_source}'")

  def click_js(self, run: Run, action: i_action.ClickAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def click_touch(self, run: Run, action: i_action.ClickAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def click_mouse(self, run: Run, action: i_action.ClickAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def scroll_js(self, run: Run, action: i_action.ScrollAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def scroll_touch(self, run: Run, action: i_action.ScrollAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def scroll_mouse(self, run: Run, action: i_action.ScrollAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def text_input_js(self, run: Run, action: i_action.TextInputAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def text_input_keyboard(self, run: Run,
                          action: i_action.TextInputAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def swipe(self, run: Run, action: i_action.SwipeAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise ActionNotImplementedError(self, action)

  def wait_for_condition(self, run: Run,
                         action: i_action.WaitForConditionAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise ActionNotImplementedError(self, action)

  def wait_for_element(self, run: Run,
                       action: i_action.WaitForElementAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise ActionNotImplementedError(self, action)

  def wait_for_ready_state(self, run: Run,
                           action: i_action.WaitForReadyStateAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise ActionNotImplementedError(self, action)

  def inject_new_document_script(
      self, run: Run, action: i_action.InjectNewDocumentScriptAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise ActionNotImplementedError(self, action)

  def invoke_probe(self, run: Run, action: BaseProbeAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise ActionNotImplementedError(self, action)

  def open_devtools(self, _run: Run,
                    action: i_action.OpenDevToolsAction) -> None:
    """Not implemented in the base runner."""
    del _run
    raise ActionNotImplementedError(self, action)

  def screenshot_impl(
      self,
      run: Run,
      suffix: str,
      annotations: Optional[Sequence[ScreenshotAnnotation]] = None) -> None:
    """Screenshot hook; the base runner always raises NotImplementedError."""
    del run, suffix, annotations
    raise NotImplementedError("screenshot_impl not implemented")

  def add_failure_screenshot_annotation(
      self, annotation: ScreenshotAnnotation) -> None:
    """Registers an annotation for the next failure screenshot."""
    self._failure_screenshot_annotations.append(annotation)

  def failure_screenshot(self, run: Run, suffix: str) -> None:
    """Calls screenshot_impl with the currently registered annotations."""
    self.screenshot_impl(run, suffix, self._failure_screenshot_annotations)

  def _maybe_navigate_to_about_blank(self, run: Run, page: Page) -> None:
    """Shows about:blank and waits if the page sets about_blank_duration."""
    if duration := page.about_blank_duration:
      run.browser.show_url("about:blank")
      run.runner.wait(duration)

  def run_page_multiple_tabs(self, run: Run, tabs: TabController,
                             pages: Iterable[Page]) -> None:
    """Runs all `pages` once per iteration of `tabs`, using new tabs.

    Within one iteration every page after the first gets a fresh tab, and
    another tab is opened after the last page. Any exception is reported to
    the listener's handle_error and then re-raised.
    """
    # TODO: refactor possible logics to TabController.
    browser = run.browser
    # `pages` is walked once per tab iteration; materialize it so a one-shot
    # iterator is not exhausted after the first tab.
    pages = tuple(pages)
    for _ in tabs:
      try:
        for i, page in enumerate(pages):
          # Create a new tab for the multiple_tab case.
          if i > 0:
            browser.switch_to_new_tab()
            self._listener.handle_new_tab(run)
          page.run_with(run, self, False)
          # NOTE(review): notified once per page here; confirm this nesting
          # against the upstream file.
          self._listener.handle_page_run(run)
        browser.switch_to_new_tab()
        self._listener.handle_new_tab(run)
      except Exception as e:
        self._listener.handle_error(run, e)
        raise

  def run_combined_page(self, run: Run, page: CombinedPage,
                        multiple_tabs: bool) -> None:
    """Runs the sub-pages of `page`, in new tabs when `multiple_tabs`."""
    if multiple_tabs:
      self.run_page_multiple_tabs(run, page.tabs, page.pages)
    else:
      for sub_page in page.pages:
        sub_page.run_with(run, self, False)

  def run_interactive_page_once(self, run: Run, page: InteractivePage) -> None:
    """Runs the page's blocks once; creates failure artifacts on error."""
    try:
      self.run_blocks(run, page, page.blocks)
      self._maybe_navigate_to_about_blank(run, page)
    except Exception:
      page.create_failure_artifacts(run)
      raise

  def run_interactive_page(self, run: Run, page: InteractivePage,
                           multiple_tabs: bool) -> None:
    """Runs `page` either through the multi-tab loop or a single time."""
    if multiple_tabs:
      self.run_page_multiple_tabs(run, page.tabs, [page])
    else:
      self.run_interactive_page_once(run, page)

  def run_login(self, run: Run, page: InteractivePage,
                login: ActionBlock) -> None:
    """Runs the login block with the traffic shaper paused."""
    with self._management_block_scope(run, page, "login"):
      with run.browser.network.traffic_shaper.pause():
        login.run_with(self, run, page)

  def run_setup(self, run: Run, page: InteractivePage,
                setup: ActionBlock) -> None:
    """Runs the setup block inside a "setup" management scope."""
    with self._management_block_scope(run, page, "setup"):
      setup.run_with(self, run, page)

  def run_teardown(self, run: Run, page: InteractivePage,
                   teardown: ActionBlock) -> None:
    """Runs the teardown block inside a "teardown" management scope."""
    with self._management_block_scope(run, page, "teardown"):
      teardown.run_with(self, run, page)

  @contextlib.contextmanager
  def playback_iteration(self, i: int) -> Iterator[None]:
    """Opens the root "playback_<i>" info-stack scope.

    Asserts that no other info-stack scope is active on entry.
    """
    assert self._info_stack is None
    with self._info_stack_annotate(f"playback_{i}"):
      yield

  @contextlib.contextmanager
  def _info_stack_annotate(self, name: str) -> Iterator[None]:
    """Appends `name` to the info stack for the duration of the scope."""
    parent_info_stack = self._info_stack
    try:
      if self._info_stack is not None:
        self._info_stack = self._info_stack + (name,)
      else:
        self._info_stack = (name,)
      yield
    finally:
      # Restore the parent stack even if the body raised.
      self._info_stack = parent_info_stack

  @contextlib.contextmanager
  def _management_block_scope(self, run: Run, page: InteractivePage,
                              name: str) -> Iterator[None]:
    """Annotates a management block and creates artifacts on failure."""
    try:
      with exception.annotate(name):
        with self._info_stack_annotate(name):
          yield
    except Exception:
      page.create_failure_artifacts(run, "failure")
      raise

  def teardown(self) -> None:
    """Cleanup hook; a no-op in the base runner."""
    pass

  def switch_tab(self, run: Run, action: i_action.SwitchTabAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise ActionNotImplementedError(self, action)

  def close_tab(self, run: Run, action: i_action.CloseTabAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise ActionNotImplementedError(self, action)

  def close_all_tabs(self, run: Run,
                     action: i_action.CloseAllTabsAction) -> None:
    """Not implemented in the base runner."""
    del run
    raise ActionNotImplementedError(self, action)