blob: 9ae5ddb15a0ab1b05cd4171017979dde0791c75d [file] [log] [blame]
# Copyright 2024 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import abc
import contextlib
import datetime as dt
import logging
from typing import (TYPE_CHECKING, Generic, Iterable, List, Optional, Tuple,
TypeVar)
from crossbench.helper.state import State, StateMachine
from crossbench.probes.probe import Probe
from crossbench.probes.probe_context import BaseProbeContext
from crossbench.probes.results import EmptyProbeResult, ProbeResult
from crossbench.runner.result_origin import ResultOrigin
if TYPE_CHECKING:
from crossbench.probes.results import ProbeResultDict
# Type of the ResultOrigin whose probes a ProbeContextManager handles.
ResultOriginT = TypeVar("ResultOriginT", bound=ResultOrigin)
# Type of the probe contexts created and owned by a ProbeContextManager.
ProbeContextT = TypeVar("ProbeContextT", bound=BaseProbeContext)
class ProbeContextManager(Generic[ResultOriginT, ProbeContextT], abc.ABC):
  """Drives the setup / open / teardown lifecycle of a set of probe contexts.

  One manager is bound to a single result origin. Subclasses provide
  `get_probe_context()` to create the concrete context for each probe.

  State transitions performed by this class:
    INITIAL -> SETUP            in setup()
    SETUP   -> READY            at the end of a successful (or dry-run) setup()
    SETUP   -> DONE             when probe setup recorded an error
    READY   -> RUN              in open()
    READY or RUN -> DONE        in teardown()
  """

  def __init__(self, result_origin: ResultOriginT,
               probe_results: ProbeResultDict) -> None:
    """Binds the manager to `result_origin` and the shared result dict.

    Args:
      result_origin: Origin providing the duration and exception trackers.
      probe_results: Mapping that receives one result entry per probe.
    """
    self._state = StateMachine(State.INITIAL)
    self._origin = result_origin
    self._probe_results = probe_results
    self._probe_contexts: List[ProbeContextT] = []
    # TODO: either prefix timers or use custom duration
    self._durations = result_origin.durations
    self._exceptions = result_origin.exceptions

  @property
  def is_ready(self) -> bool:
    """True while the manager is in State.READY."""
    return self._state == State.READY

  @property
  def is_running(self) -> bool:
    """True while the manager is in State.RUN."""
    return self._state == State.RUN

  def measure(self, name: str):
    """Returns the result origin's measurement helper for `name`."""
    return self._origin.measure(name)

  @contextlib.contextmanager
  def capture(self, label: str, measure: bool = False):
    """Runs the with-body inside the origin's exception capture for `label`.

    Args:
      label: Name used for the exception capture and, if enabled, the timer.
      measure: When True the body is additionally timed under `label`.
    """
    with self._exceptions.capture(label):
      if not measure:
        yield
      else:
        with self._origin.durations.measure(label):
          yield

  @property
  def is_success(self) -> bool:
    """True as long as the origin's exception tracker reports success."""
    return self._exceptions.is_success

  def setup(self, probes: Iterable[Probe], is_dry_run: bool) -> None:
    """Creates and sets up the contexts for `probes`.

    In a dry run no probe context is created and the manager moves straight
    to READY. If probe setup records an error the method returns early and
    the state stays as left by the error handling (DONE).
    """
    self._state.transition(State.INITIAL, to=State.SETUP)
    if not is_dry_run:
      if not self._setup_probes(tuple(probes), is_dry_run):
        return
    self._state.transition(State.SETUP, to=State.READY)

  def _setup_probes(self, probes: Tuple[Probe, ...], is_dry_run: bool) -> bool:
    """Validates probes, then creates and sets up their contexts.

    Returns:
      True if no exception has been recorded, False otherwise (in which case
      the setup error handling has already run).
    """
    with self.capture("probes-setup", measure=True):
      self._validate_probes(probes)
      self._create_contexts(probes)
      self._setup_contexts()
    if not self.is_success:
      self._handle_setup_error(is_dry_run)
    return self.is_success

  def _validate_probes(self, probes: Tuple[Probe, ...]) -> None:
    """Asserts that probes are unique, attached and not yet initialized."""
    assert not self._probe_contexts, "Wrong probe context initialization order"
    probe_set = set()
    for probe in probes:
      assert probe not in probe_set, (f"Got duplicate probe name={probe.name}")
      probe_set.add(probe)
      assert probe.is_attached, (
          f"Probe {probe.name} is not properly attached to a browser")

  def _create_contexts(self, probes: Tuple[Probe, ...]) -> None:
    """Creates one context per probe via get_probe_context().

    Probes that produce data get an EmptyProbeResult placeholder first, so
    the result dict has an entry even if context creation or teardown fails.
    Probes for which get_probe_context() returns a falsy value are skipped.
    """
    for probe in probes:
      if probe.PRODUCES_DATA:
        self._probe_results[probe] = EmptyProbeResult()
      with self.capture(f"{probe.name} get_context"):
        if probe_context := self.get_probe_context(probe):
          self._probe_contexts.append(probe_context)

  def _setup_contexts(self) -> None:
    """Calls setup() on every created context, each in its own capture."""
    for probe_context in self._probe_contexts:
      with self.capture(f"probes-setup {probe_context.name}"):
        probe_context.setup()  # pytype: disable=wrong-arg-types

  def _handle_setup_error(self, is_dry_run: bool) -> None:
    """Moves to DONE and tears down only the internal probe contexts."""
    self._state.transition(State.SETUP, to=State.DONE)
    logging.debug("Handling setup error")
    assert not self.is_success
    # Special handling for crucial runner probes
    internal_probe_contexts = [
        context for context in self._probe_contexts if context.probe.is_internal
    ]
    self._teardown(internal_probe_contexts, is_dry_run, setup_error=True)

  @contextlib.contextmanager
  def open(self, is_dry_run: bool):
    """Opens all probe contexts for the duration of the with-body.

    Every context receives the same start time. In a dry run the contexts are
    not opened. The time spent entering the contexts is stored in the
    "probes-start" duration. Yields this manager.
    """
    self._state.transition(State.READY, to=State.RUN)
    probe_start_time = dt.datetime.now()
    # Enter the probe contexts while the ExitStack is already active, so that
    # contexts opened so far are closed again if a later one fails on entry.
    with contextlib.ExitStack() as combined_contexts:
      for probe_context in self._probe_contexts:
        probe_context.set_start_time(probe_start_time)
        if not is_dry_run:
          combined_contexts.enter_context(probe_context.open())
      self._durations["probes-start"] = dt.datetime.now() - probe_start_time
      yield self

  def teardown(self, is_dry_run: bool, setup_error: bool = False) -> None:
    """Tears down all probe contexts and forgets them afterwards.

    Args:
      is_dry_run: When True no probe context is actually torn down.
      setup_error: Forwarded to the internal teardown sanity check.
    """
    self._state.transition(State.READY, State.RUN, to=State.DONE)
    with self.measure("probes-teardown"):
      self._teardown(self._probe_contexts, is_dry_run, setup_error)
      self._probe_contexts = []

  def _teardown(self,
                probe_contexts: List[ProbeContextT],
                is_dry_run: bool,
                setup_error: bool = False) -> None:
    """Tears down `probe_contexts` in reverse order and stores their results.

    Requires the manager to be in State.DONE. Each teardown runs in its own
    measured capture; the returned result replaces the probe's entry in the
    result dict, and a warning is logged for empty results.
    """
    if setup_error:
      # NOTE(review): this fires if setup failed before any context was
      # created (e.g. during probe validation) - confirm this is intended.
      assert self._probe_contexts, "Invalid state"
    self._state.expect(State.DONE)
    logging.debug("PROBE SCOPE TEARDOWN")
    if is_dry_run:
      return
    for probe_context in reversed(probe_contexts):
      with self.capture(f"Probe {probe_context.name} teardown", measure=True):
        assert probe_context.result_origin == self._origin
        probe_results: ProbeResult = probe_context.teardown()
        probe = probe_context.probe
        if probe_results.is_empty:
          logging.warning("Probe did not extract any data. probe=%s in %s",
                          probe, self._origin)
        self._probe_results[probe] = probe_results

  @abc.abstractmethod
  def get_probe_context(self, probe: Probe) -> Optional[ProbeContextT]:
    """Returns the context for `probe`, or None if the probe has none."""