# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import annotations

import argparse
import datetime as dt
import enum
import inspect
import logging
from typing import (TYPE_CHECKING, Any, Dict, Iterable, List, Optional,
Sequence, Set, Tuple, Type, Union)
from crossbench import cli_helper, compat, exception, helper
from crossbench import path as pth
from crossbench import plt
from crossbench.benchmarks.base import BenchmarkProbeMixin
from crossbench.env import (HostEnvironment, HostEnvironmentConfig,
ValidationMode)
from crossbench.helper.state import BaseState, StateMachine
from crossbench.probes import all as all_probes
from crossbench.probes.internal import ResultsSummaryProbe
from crossbench.probes.probe import Probe, ProbeIncompatibleBrowser
from crossbench.probes.trace_processor.trace_processor import \
TraceProcessorProbe
from crossbench.runner.groups import (BrowserSessionRunGroup, BrowsersRunGroup,
CacheTemperatureRunGroup,
RepetitionsRunGroup, RunThreadGroup,
StoriesRunGroup)
from crossbench.runner.run import Run
from crossbench.runner.timing import Timing
if TYPE_CHECKING:
from crossbench.benchmarks.base import Benchmark
from crossbench.browsers.browser import Browser
from crossbench.stories.story import Story


class RunnerException(exception.MultiException):
  pass


@enum.unique
class ThreadMode(compat.StrEnumWithHelp):
NONE = ("none", (
"Execute all browser-sessions sequentially, default. "
"Low interference risk, use for worry-free time-critical measurements."))
PLATFORM = ("platform", (
"Execute browser-sessions from each platform in parallel threads. "
"Might cause some interference with probes that do heavy "
"post-processing."))
BROWSER = ("browser", (
"Execute browser-sessions from each browser in parallel thread. "
"High interference risk, don't use for time-critical measurements."))
SESSION = ("session", (
"Execute run from each browser-session in a parallel thread. "
"High interference risk, don't use for time-critical measurements."))
def group(self, runs: List[Run]) -> List[RunThreadGroup]:
if self == ThreadMode.NONE:
return [RunThreadGroup(runs)]
groups: Dict[Any, List[Run]] = {}
if self == ThreadMode.SESSION:
groups = helper.group_by(
runs, lambda run: run.browser_session, sort_key=None)
elif self == ThreadMode.PLATFORM:
groups = helper.group_by(
runs, lambda run: run.browser_platform, sort_key=None)
elif self == ThreadMode.BROWSER:
groups = helper.group_by(runs, lambda run: run.browser, sort_key=None)
else:
raise ValueError(f"Unexpected thread mode: {self}")
return [
RunThreadGroup(runs, index=index)
for index, runs in enumerate(groups.values())
]
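

# Illustrative sketch (hypothetical browser names): how ThreadMode.group()
# splits runs. With browsers "chrome" and "firefox" and --parallel=browser,
# each browser's runs end up in their own RunThreadGroup and thread:
#
#   thread_groups = ThreadMode.BROWSER.group(runs)
#   # => [RunThreadGroup(<chrome runs>, index=0),
#   #     RunThreadGroup(<firefox runs>, index=1)]
#
# ThreadMode.NONE keeps all runs in a single sequential RunThreadGroup.

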
@enum.unique
class RunnerState(BaseState):
INITIAL = enum.auto()
SETUP = enum.auto()
RUNNING = enum.auto()
  TEARDOWN = enum.auto()


class Runner:
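
  # For example, get_out_dir(cwd, "speedometer") yields a timestamped path
  # such as cwd / "results" / "2023-01-01_120000_speedometer" (timestamp
  # invented for illustration), while test=True always yields
  # cwd / "results" / "test".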
@classmethod
def get_out_dir(cls,
cwd: pth.LocalPath,
suffix: str = "",
test: bool = False) -> pth.LocalPath:
if test:
return cwd / "results" / "test"
if suffix:
suffix = "_" + suffix
return (cwd / "results" /
f"{dt.datetime.now().strftime('%Y-%m-%d_%H%M%S')}{suffix}")
@classmethod
def add_cli_parser(
cls, benchmark_cls: Type[Benchmark],
parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
parser.add_argument(
"--repetitions",
"--repeat",
"--invocations",
"-r",
default=1,
type=cli_helper.parse_positive_int,
help=("Number of times each benchmark story is repeated. "
"Defaults to 1. "
"Metrics are aggregated over multiple repetitions"))
parser.add_argument(
"--warmup-repetitions",
"--warmups",
default=0,
type=cli_helper.parse_positive_zero_int,
help=("Number of times each benchmark story is repeated for warmup. "
"Defaults to 0. "
"Metrics for warmup-repetitions are discarded."))
parser.add_argument(
"--cache-temperatures",
default=["default"],
const=["cold", "warm", "hot"],
action="store_const",
help=("Repeat each run with different cache temperatures without "
"closing the browser in between."))
parser.add_argument(
"--thread-mode",
"--parallel",
default=ThreadMode.NONE,
type=ThreadMode,
help=("Change how Runs are executed.\n" +
ThreadMode.help_text(indent=2)))
out_dir_group = parser.add_argument_group("Output Directory Options")
out_dir_group.add_argument(
"--no-symlinks",
"--nosymlinks",
dest="create_symlinks",
action="store_false",
default=True,
help="Do not create symlinks in the output directory.")
out_dir_xor_group = out_dir_group.add_mutually_exclusive_group()
out_dir_xor_group.add_argument(
"--out-dir",
"--output-directory",
"-o",
type=pth.LocalPath,
help=("Results will be stored in this directory. "
"Defaults to results/${DATE}_${LABEL}"))
out_dir_xor_group.add_argument(
"--label",
"--name",
type=cli_helper.parse_non_empty_str,
default=benchmark_cls.NAME,
help=("Add a name to the default output directory. "
"Defaults to the benchmark name"))
return parser
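
  # Hypothetical invocation (benchmark subcommand and label are made up)
  # showing how the flags added above map onto Runner arguments:
  #
  #   ./cb.py speedometer --repetitions=3 --warmups=1 --cache-temperatures \
  #       --parallel=browser --label=my_experiment
  #
  # kwargs_from_cli() below converts the parsed argparse.Namespace into the
  # keyword arguments consumed by Runner.__init__().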
@classmethod
def kwargs_from_cli(cls, args: argparse.Namespace) -> Dict[str, Any]:
if args.out_dir:
out_dir = args.out_dir
else:
label = args.label
assert label
root_dir = pth.LocalPath(__file__).parents[2]
out_dir = cls.get_out_dir(root_dir, label)
return {
"out_dir": out_dir,
"browsers": args.browser,
"repetitions": args.repetitions,
"warmup_repetitions": args.warmup_repetitions,
"cache_temperatures": args.cache_temperatures,
"thread_mode": args.thread_mode,
"throw": args.throw,
"create_symlinks": args.create_symlinks,
}
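
  # Constructing a Runner already creates the output directory and attaches
  # the internal default probes plus the benchmark's PROBES; no browser is
  # started and no story is executed until run() is called.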
def __init__(self,
out_dir: pth.LocalPath,
browsers: Sequence[Browser],
benchmark: Benchmark,
additional_probes: Iterable[Probe] = (),
platform: plt.Platform = plt.PLATFORM,
env_config: Optional[HostEnvironmentConfig] = None,
env_validation_mode: ValidationMode = ValidationMode.THROW,
repetitions: int = 1,
warmup_repetitions: int = 0,
cache_temperatures: Iterable[str] = ("default",),
timing: Timing = Timing(),
thread_mode: ThreadMode = ThreadMode.NONE,
throw: bool = False,
create_symlinks: bool = True):
self._state = StateMachine(RunnerState.INITIAL)
self.out_dir = out_dir
assert not self.out_dir.exists(), f"out_dir={self.out_dir} exists already"
self.out_dir.mkdir(parents=True)
self._timing = timing
self._browsers: Tuple[Browser, ...] = tuple(browsers)
self._validate_browsers()
self._benchmark = benchmark
self._stories = tuple(benchmark.stories)
self._repetitions = cli_helper.parse_positive_int(repetitions,
"repetitions")
self._warmup_repetitions = cli_helper.parse_positive_zero_int(
warmup_repetitions, "warmup repetitions")
self._cache_temperatures: Tuple[str, ...] = tuple(cache_temperatures)
self._probes: List[Probe] = []
self._default_probes: List[Probe] = []
    # Contains both measured and warmup runs:
self._all_runs: List[Run] = []
self._measured_runs: List[Run] = []
self._thread_mode = thread_mode
self._exceptions = exception.Annotator(throw)
self._platform = platform
self._env = HostEnvironment(
self, # pytype: disable=wrong-arg-types
env_config,
env_validation_mode)
self._attach_default_probes(additional_probes)
self._prepare_benchmark()
self._cache_temperature_groups: Tuple[CacheTemperatureRunGroup, ...] = ()
self._repetitions_groups: Tuple[RepetitionsRunGroup, ...] = ()
self._story_groups: Tuple[StoriesRunGroup, ...] = ()
self._browser_group: Optional[BrowsersRunGroup] = None
self._create_symlinks: bool = create_symlinks
def _prepare_benchmark(self) -> None:
benchmark_probe_cls: Type[BenchmarkProbeMixin]
for benchmark_probe_cls in self._benchmark.PROBES:
assert inspect.isclass(benchmark_probe_cls), (
f"{self._benchmark}.PROBES must contain classes only, "
f"but got {type(benchmark_probe_cls)}")
assert issubclass(
benchmark_probe_cls,
Probe), (f"Expected Probe class but got {type(benchmark_probe_cls)}")
assert issubclass(benchmark_probe_cls, BenchmarkProbeMixin), (
f"{benchmark_probe_cls} should be BenchmarkProbeMixin "
f"for {type(self._benchmark)}.PROBES")
assert benchmark_probe_cls.NAME, (
f"Expected probe.NAME for {benchmark_probe_cls}")
self.attach_probe(benchmark_probe_cls(benchmark=self._benchmark))
def _validate_browsers(self) -> None:
assert self.browsers, "No browsers provided"
browser_unique_names = [browser.unique_name for browser in self.browsers]
cli_helper.parse_unique_sequence(browser_unique_names, "browser names")
def _attach_default_probes(self, probe_list: Iterable[Probe]) -> None:
assert len(self._probes) == 0
assert len(self._default_probes) == 0
for probe_cls in all_probes.INTERNAL_PROBES:
default_probe: Probe = probe_cls() # pytype: disable=not-instantiable
self.attach_probe(default_probe)
self._default_probes.append(default_probe)
for index, probe in enumerate(probe_list):
assert (not isinstance(probe, TraceProcessorProbe) or index == 0), (
f"TraceProcessorProbe must be first in the list to be able "
f"to process other probes data. Found it at index: {index}")
self.attach_probe(probe)
# Results probe must be first in the list, and thus last to be processed
# so all other probes have data by the time we write the results summary.
assert isinstance(self._probes[0], ResultsSummaryProbe)
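
  # Minimal usage sketch (MyProbe / AndroidOnlyProbe are hypothetical Probe
  # subclasses):
  #
  #   runner.attach_probe(MyProbe())
  #   runner.attach_probe(AndroidOnlyProbe(), matching_browser_only=True)
  #
  # With matching_browser_only=True, browsers raising ProbeIncompatibleBrowser
  # are skipped with an error log instead of aborting probe attachment.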
def attach_probe(self,
probe: Probe,
matching_browser_only: bool = False) -> Probe:
if probe in self._probes:
raise ValueError(f"Cannot add the same probe twice: {probe.NAME}")
probe_was_used = False
for browser in self.browsers:
try:
probe.validate_browser(self.env, browser)
browser.attach_probe(probe)
probe_was_used = True
except ProbeIncompatibleBrowser as e:
if matching_browser_only:
logging.error("Skipping incompatible probe=%s for browser=%s:",
probe.name, browser.unique_name)
logging.error(" %s", e)
continue
raise
if probe_was_used:
self._probes.append(probe)
return probe
@property
def timing(self) -> Timing:
return self._timing
@property
def cache_temperatures(self) -> Tuple[str, ...]:
return self._cache_temperatures
@property
def browsers(self) -> Tuple[Browser, ...]:
return self._browsers
@property
def stories(self) -> Tuple[Story, ...]:
return self._stories
@property
def probes(self) -> Iterable[Probe]:
return iter(self._probes)
@property
def default_probes(self) -> Iterable[Probe]:
return iter(self._default_probes)
@property
def benchmark(self) -> Benchmark:
return self._benchmark
@property
def repetitions(self) -> int:
return self._repetitions
@property
def warmup_repetitions(self) -> int:
return self._warmup_repetitions
@property
def create_symlinks(self) -> bool:
return self._create_symlinks
@property
def exceptions(self) -> exception.Annotator:
return self._exceptions
@property
def is_success(self) -> bool:
return len(self._measured_runs) > 0 and self._exceptions.is_success
@property
def platform(self) -> plt.Platform:
return self._platform
@property
def env(self) -> HostEnvironment:
return self._env
@property
def platforms(self) -> Set[plt.Platform]:
return set(browser.platform for browser in self.browsers)
@property
def all_runs(self) -> Tuple[Run, ...]:
return tuple(self._all_runs)
@property
def runs(self) -> Tuple[Run, ...]:
return tuple(self._measured_runs)
@property
def cache_temperature_groups(self) -> Tuple[CacheTemperatureRunGroup, ...]:
assert self._cache_temperature_groups, (
f"No CacheTemperatureRunGroup in {self}")
return self._cache_temperature_groups
@property
def repetitions_groups(self) -> Tuple[RepetitionsRunGroup, ...]:
assert self._repetitions_groups, f"No RepetitionsRunGroup in {self}"
return self._repetitions_groups
@property
def story_groups(self) -> Tuple[StoriesRunGroup, ...]:
assert self._story_groups, f"No StoriesRunGroup in {self}"
return self._story_groups
@property
def browser_group(self) -> BrowsersRunGroup:
assert self._browser_group, f"No BrowsersRunGroup in {self}"
return self._browser_group
@property
def has_browser_group(self) -> bool:
return self._browser_group is not None
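
  # wait() accepts plain seconds or a dt.timedelta. Relative waits are
  # converted via self.timing.timedelta() so they respect the Timing
  # configuration; absolute waits are taken verbatim (numbers are interpreted
  # as seconds):
  #
  #   runner.wait(0.5)
  #   runner.wait(dt.timedelta(seconds=5), absolute_time=True)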
def wait(self,
time: Union[int, float, dt.timedelta],
absolute_time: bool = False) -> None:
if not absolute_time:
delta = self.timing.timedelta(time)
else:
if isinstance(time, (int, float)):
delta = dt.timedelta(seconds=time)
else:
delta = time
self._platform.sleep(delta)
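
  # run() drives the RunnerState machine INITIAL -> SETUP -> RUNNING ->
  # TEARDOWN via _setup(), _run() and (for non-dry runs) _teardown(), all
  # wrapped in a SystemSleepPreventer so the host does not suspend
  # mid-measurement.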
def run(self, is_dry_run: bool = False) -> None:
self._state.expect(RunnerState.INITIAL)
with helper.SystemSleepPreventer():
with self._exceptions.annotate("Preparing"):
self._setup()
with self._exceptions.annotate("Running"):
self._run(is_dry_run)
if self._exceptions.throw:
# Ensure that we bail out on the first exception.
self.assert_successful_sessions_and_runs()
if not is_dry_run:
self._teardown()
self.assert_successful_sessions_and_runs()
def _setup(self) -> None:
self._state.transition(RunnerState.INITIAL, to=RunnerState.SETUP)
logging.info("-" * 80)
logging.info("SETUP")
logging.info("-" * 80)
assert self.repetitions > 0, (
f"Invalid repetitions count: {self.repetitions}")
assert self.browsers, "No browsers provided: self.browsers is empty"
assert self.stories, "No stories provided: self.stories is empty"
self._setup_browsers()
self._exceptions.assert_success()
with self._exceptions.annotate("Preparing Runs"):
self._all_runs = list(self.get_runs())
assert self._all_runs, f"{type(self)}.get_runs() produced no runs"
logging.info("DISCOVERED %d RUN(S)", len(self._all_runs))
self._measured_runs = [run for run in self._all_runs if not run.is_warmup]
with self._exceptions.capture("Preparing Environment"):
self._env.setup()
with self._exceptions.annotate(
f"Preparing Benchmark: {self._benchmark.NAME}"):
self._benchmark.setup(self) # pytype: disable=wrong-arg-types
def _setup_browsers(self) -> None:
logging.info("PREPARING %d BROWSER(S)", len(self.browsers))
for browser in self.browsers:
with self._exceptions.capture(
f"Preparing browser type={browser.type_name} "
f"unique_name={browser.unique_name}"):
self._setup_browser(browser)
def _setup_browser(self, browser: Browser) -> None:
browser.setup_binary(self) # pytype: disable=wrong-arg-types
for probe in browser.probes:
assert probe in self._probes, (
f"Browser {browser} probe {probe} not in Runner.probes. "
"Use Runner.attach_probe()")
def has_any_live_network(self) -> bool:
for browser in self.browsers:
if browser.network.is_live:
return True
return False
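
  # Illustrative enumeration order for get_runs(), assuming one warmup plus
  # one measured repetition, stories A and B, a single browser and cache
  # temperatures (cold, warm, hot):
  #
  #   rep=0 (warmup):   A/cold, A/warm, A/hot, B/cold, B/warm, B/hot
  #   rep=1 (measured): A/cold, A/warm, A/hot, B/cold, B/warm, B/hot
  #
  # The cache-temperature runs of one (repetition, story, browser) combination
  # share a single BrowserSessionRunGroup.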
def get_runs(self) -> Iterable[Run]:
index = 0
session_index = 0
throw = self._exceptions.throw
total_repetitions = self.repetitions + self.warmup_repetitions
for repetition in range(total_repetitions):
is_warmup: bool = repetition < self.warmup_repetitions
for story in self.stories:
for browser in self.browsers:
# TODO: implement browser-session start/stop
browser_session = BrowserSessionRunGroup(self, browser, session_index,
self.out_dir, throw)
session_index += 1
for t_index, temperature in enumerate(self.cache_temperatures):
name_parts = [f"story={story.name}"]
if total_repetitions > 1:
name_parts.append(f"repetition={repetition}")
if len(self.cache_temperatures) > 1:
name_parts.append(f"cache={temperature}")
name_parts.append(f"index={index}")
yield self.create_run(
browser_session,
story,
repetition,
is_warmup,
f"{t_index}_{temperature}",
index,
name=", ".join(name_parts),
timeout=self.timing.run_timeout,
throw=throw)
index += 1
browser_session.set_ready()
def create_run(self, browser_session: BrowserSessionRunGroup, story: Story,
repetition: int, is_warmup: bool, temperature: str, index: int,
name: str, timeout: dt.timedelta, throw: bool) -> Run:
return Run(self, browser_session, story, repetition, is_warmup, temperature,
index, name, timeout, throw)
def assert_successful_sessions_and_runs(self) -> None:
    failed_runs = [run for run in self.runs if not run.is_success]
    self._exceptions.assert_success(
        f"Runs Failed: {len(failed_runs)}/{len(self.runs)} runs failed.",
RunnerException)
def _get_thread_groups(self) -> List[RunThreadGroup]:
# Also include warmup runs here.
return self._thread_mode.group(self._all_runs)
def _run(self, is_dry_run: bool = False) -> None:
self._state.transition(RunnerState.SETUP, to=RunnerState.RUNNING)
thread_groups: List[RunThreadGroup] = []
with self._exceptions.info("Creating thread groups for all Runs"):
thread_groups = self._get_thread_groups()
group_count = len(thread_groups)
if group_count == 1:
self._run_single_threaded(thread_groups[0])
return
with self._exceptions.annotate(f"Starting {group_count} thread groups."):
for thread_group in thread_groups:
thread_group.is_dry_run = is_dry_run
thread_group.start()
with self._exceptions.annotate(
"Waiting for all thread groups to complete."):
for thread_group in thread_groups:
thread_group.join()
def _run_single_threaded(self, thread_group: RunThreadGroup) -> None:
# Special case single thread groups
with self._exceptions.annotate("Running single thread group"):
thread_group.run()
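
  # _teardown() merges probe data bottom-up: CacheTemperatureRunGroup ->
  # RepetitionsRunGroup -> StoriesRunGroup -> BrowsersRunGroup, folding each
  # group's exceptions into the runner-level Annotator.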
def _teardown(self) -> None:
self._state.transition(RunnerState.RUNNING, to=RunnerState.TEARDOWN)
logging.info("=" * 80)
logging.info("RUNS COMPLETED")
logging.info("-" * 80)
logging.info("MERGING PROBE DATA")
throw = self._exceptions.throw
logging.debug("MERGING PROBE DATA: cache temperatures")
self._cache_temperature_groups = CacheTemperatureRunGroup.groups(
self._measured_runs, throw)
for cache_temp_group in self._cache_temperature_groups:
cache_temp_group.merge(self)
self._exceptions.extend(cache_temp_group.exceptions, is_nested=True)
logging.debug("MERGING PROBE DATA: repetitions")
self._repetitions_groups = RepetitionsRunGroup.groups(
self._cache_temperature_groups, throw)
for repetition_group in self._repetitions_groups:
repetition_group.merge(self)
self._exceptions.extend(repetition_group.exceptions, is_nested=True)
logging.debug("MERGING PROBE DATA: stories")
self._story_groups = StoriesRunGroup.groups(self._repetitions_groups, throw)
for story_group in self._story_groups:
story_group.merge(self)
self._exceptions.extend(story_group.exceptions, is_nested=True)
logging.debug("MERGING PROBE DATA: browsers")
self._browser_group = BrowsersRunGroup(self._story_groups, throw)
self._browser_group.merge(self)
self._exceptions.extend(self._browser_group.exceptions, is_nested=True)
def cool_down(self) -> None:
# Cool down between runs
if not self._platform.is_thermal_throttled():
return
logging.info("COOLDOWN")
for _ in helper.WaitRange(1, 100).wait_with_backoff(self._platform):
if not self._platform.is_thermal_throttled():
break
logging.info("COOLDOWN: still hot, waiting some more")