blob: 65bed1e07e6bc7a3b4e39a8b13c576d9a773738c [file] [log] [blame]
# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import argparse
import datetime as dt
import itertools
import json
import logging
import pathlib
import sys
import tempfile
import textwrap
import traceback
from typing import (TYPE_CHECKING, Any, Dict, Iterable, List, Optional,
Sequence, TextIO, Tuple, Type, Union)
import hjson
from tabulate import tabulate
import crossbench.benchmarks.all as benchmarks
from crossbench import helper
from crossbench.benchmarks.base import Benchmark
from crossbench.browsers import all as browsers
from crossbench.browsers.base import convert_flags_to_label
from crossbench.browsers.chrome import ChromeDownloader
from crossbench.cli_helper import existing_file_type, positive_float_type
from crossbench.env import (HostEnvironment, HostEnvironmentConfig,
ValidationMode)
from crossbench.exception import ExceptionAnnotator
from crossbench.flags import ChromeFlags, Flags
from crossbench.probes.all import GENERAL_PURPOSE_PROBES
from crossbench.runner import Runner, Timing
if TYPE_CHECKING:
from crossbench.browsers.base import Browser
from crossbench.probes.base import Probe
BrowserLookupTableT = Dict[str, Tuple[Type[browsers.Browser], pathlib.Path]]
FlagGroupItemT = Optional[Tuple[str, Optional[str]]]
def _map_flag_group_item(flag_name: str,
flag_value: Optional[str]) -> FlagGroupItemT:
if flag_value is None:
return None
if flag_value == "":
return (flag_name, None)
return (flag_name, flag_value)
class ConfigFileError(ValueError):
pass
class FlagGroupConfig:
"""This object corresponds to a flag-group in a configuration file.
It contains mappings from flags to multiple values.
"""
_variants: Dict[str, Iterable[Optional[str]]]
name: str
def __init__(self, name: str,
variants: Dict[str, Union[Iterable[Optional[str]], str]]):
self.name = name
self._variants = {}
for flag_name, flag_variants_or_value in variants.items():
assert flag_name not in self._variants
assert flag_name
if isinstance(flag_variants_or_value, str):
self._variants[flag_name] = (str(flag_variants_or_value),)
else:
assert isinstance(flag_variants_or_value, Iterable)
flag_variants = tuple(flag_variants_or_value)
assert len(flag_variants) == len(set(flag_variants)), (
"Flag variant contains duplicate entries: {flag_variants}")
self._variants[flag_name] = tuple(flag_variants_or_value)
def get_variant_items(self) -> Iterable[Tuple[FlagGroupItemT, ...]]:
for flag_name, flag_values in self._variants.items():
yield tuple(
_map_flag_group_item(flag_name, flag_value)
for flag_value in flag_values)
class BrowserConfig:
@classmethod
def from_cli_args(cls, args: argparse.Namespace) -> BrowserConfig:
browser_config = BrowserConfig()
if args.browser_config:
path = args.browser_config.expanduser()
with path.open(encoding="utf-8") as f:
browser_config.load(f)
else:
browser_config.load_from_args(args)
return browser_config
def __init__(self,
raw_config_data: Optional[Dict] = None,
browser_lookup_override: Optional[BrowserLookupTableT] = None):
self.flag_groups: Dict[str, FlagGroupConfig] = {}
self._variants: List[Browser] = []
self._browser_lookup_override = browser_lookup_override or {}
self._exceptions = ExceptionAnnotator()
if raw_config_data:
self.load_dict(raw_config_data)
@property
def variants(self) -> List[Browser]:
self._exceptions.assert_success(
"Could not create variants from config files: {}", ConfigFileError)
return self._variants
def load(self, f: TextIO) -> None:
with self._exceptions.capture(f"Loading browser config file: {f.name}"):
config = {}
with self._exceptions.info(f"Parsing {hjson.__name__}"):
config = hjson.load(f)
with self._exceptions.info(f"Parsing config file: {f.name}"):
self.load_dict(config)
def load_dict(self, raw_config_data: Dict) -> None:
try:
if "flags" in raw_config_data:
with self._exceptions.info("Parsing config['flags']"):
self._parse_flag_groups(raw_config_data["flags"])
if "browsers" not in raw_config_data:
raise ConfigFileError("Config does not provide a 'browsers' dict.")
if not raw_config_data["browsers"]:
raise ConfigFileError("Config contains empty 'browsers' dict.")
with self._exceptions.info("Parsing config['browsers']"):
self._parse_browsers(raw_config_data["browsers"])
except Exception as e: # pylint: disable=broad-except
self._exceptions.append(e)
def _parse_flag_groups(self, data: Dict[str, Any]) -> None:
for flag_name, group_config in data.items():
with self._exceptions.capture(
f"Parsing flag-group: flags['{flag_name}']"):
self._parse_flag_group(flag_name, group_config)
def _parse_flag_group(self, name: str,
raw_flag_group_data: Dict[str, Any]) -> None:
if name in self.flag_groups:
raise ConfigFileError(f"flag-group flags['{name}'] exists already")
variants: Dict[str, List[str]] = {}
for flag_name, values in raw_flag_group_data.items():
if not flag_name.startswith("-"):
raise ConfigFileError(f"Invalid flag name: '{flag_name}'")
if flag_name not in variants:
flag_values = variants[flag_name] = []
else:
flag_values = variants[flag_name]
if isinstance(values, str):
values = [values]
for value in values:
if value == "None,":
raise ConfigFileError(
f"Please use null instead of None for flag '{flag_name}' ")
# O(n^2) check, assuming very few values per flag.
if value in flag_values:
raise ConfigFileError(
"Same flag variant was specified more than once: "
f"'{value}' for entry '{flag_name}'")
flag_values.append(value)
self.flag_groups[name] = FlagGroupConfig(name, variants)
def _parse_browsers(self, data: Dict[str, Any]) -> None:
for name, browser_config in data.items():
with self._exceptions.info(f"Parsing browsers['{name}']"):
self._parse_browser(name, browser_config)
self._ensure_unique_browser_names()
def _parse_browser(self, name: str, raw_browser_data: Dict[str, Any]) -> None:
path_or_identifier = raw_browser_data["path"]
if path_or_identifier in self._browser_lookup_override:
browser_cls, path = self._browser_lookup_override[path_or_identifier]
else:
path = self._get_browser_path(path_or_identifier)
browser_cls = self._get_browser_cls_from_path(path)
if not path.exists():
raise ConfigFileError(f"browsers['{name}'].path='{path}' does not exist.")
raw_flags = ()
with self._exceptions.info(f"Parsing browsers['{name}'].flags"):
raw_flags = self._parse_flags(name, raw_browser_data)
variants_flags = ()
with self._exceptions.info(
f"Expand browsers['{name}'].flags into full variants"):
variants_flags = tuple(
browser_cls.default_flags(flags) for flags in raw_flags)
logging.info("SELECTED BROWSER: '%s' with %s flag variants:", name,
len(variants_flags))
for i in range(len(variants_flags)):
logging.info(" %s: %s", i, variants_flags[i])
# pytype: disable=not-instantiable
self._variants += [
browser_cls(
label=self._flags_to_label(name, flags), path=path, flags=flags)
for flags in variants_flags
]
# pytype: enable=not-instantiable
def _flags_to_label(self, name: str, flags: Flags) -> str:
return f"{name}_{convert_flags_to_label(*flags.get_list())}"
def _parse_flags(self, name: str, data: Dict[str, Any]):
flags_variants: List[Tuple[FlagGroupItemT, ...]] = []
flag_group_names = data.get("flags", [])
if isinstance(flag_group_names, str):
flag_group_names = [flag_group_names]
if not isinstance(flag_group_names, list):
raise ConfigFileError(f"'flags' is not a list for browser='{name}'")
seen_flag_group_names = set()
for flag_group_name in flag_group_names:
if flag_group_name in seen_flag_group_names:
raise ConfigFileError(
f"Duplicate group name '{flag_group_name}' for browser='{name}'")
seen_flag_group_names.add(flag_group_name)
# Use temporary FlagGroupConfig for inline fixed flag definition
if flag_group_name.startswith("--"):
flag_name, flag_value = Flags.split(flag_group_name)
# No-value-flags produce flag_value == None, convert this to the "" for
# compatibility with the flag variants, where None would mean removing
# the flag.
if flag_value is None:
flag_value = ""
flag_group = FlagGroupConfig("temporary", {flag_name: flag_value})
assert flag_group_name not in self.flag_groups
else:
flag_group = self.flag_groups.get(flag_group_name, None)
if flag_group is None:
raise ConfigFileError(f"group='{flag_group_name}' "
f"for browser='{name}' does not exist.")
flags_variants += flag_group.get_variant_items()
if len(flags_variants) == 0:
# use empty default
return ({},)
# IN: [
# (None, ("--foo", "f1")),
# (("--bar", "b1"), ("--bar", "b2")),
# ]
# OUT: [
# (None, ("--bar", "b1")),
# (None, ("--bar", "b2")),
# (("--foo", "f1"), ("--bar", "b1")),
# (("--foo", "f1"), ("--bar", "b2")),
# ]:
flags_variants = list(itertools.product(*flags_variants))
# IN: [
# (None, None)
# (None, ("--foo", "f1")),
# (("--foo", "f1"), ("--bar", "b1")),
# ]
# OUT: [
# (("--foo", "f1"),),
# (("--foo", "f1"), ("--bar", "b1")),
# ]
#
flags_variants = list(
tuple(flag_item
for flag_item in flags_items
if flag_item is not None)
for flags_items in flags_variants)
assert flags_variants
return flags_variants
def _get_browser_cls_from_path(self, path: pathlib.Path) -> Type[Browser]:
path_str = str(path).lower()
if "safari" in path_str:
return browsers.SafariWebDriver
if "chrome" in path_str:
return browsers.ChromeWebDriver
if "chromium" in path_str:
# TODO: technically this should be ChromiumWebDriver
return browsers.ChromeWebDriver
if "firefox" in path_str:
return browsers.FirefoxWebDriver
if "edge" in path_str:
return browsers.EdgeWebDriver
raise ValueError(f"Unsupported browser='{path}'")
def _ensure_unique_browser_names(self) -> None:
if self._has_unique_variant_names():
return
# Expand to full version names
for browser in self._variants:
browser.unique_name = f"{browser.type}_{browser.version}_{browser.label}"
if self._has_unique_variant_names():
return
logging.info("Got unique browser names and versions, "
"please use --browser-config for more meaningful names")
# Last resort, add index
for index, browser in enumerate(self._variants):
browser.unique_name += f"_{index}"
assert self._has_unique_variant_names()
def _has_unique_variant_names(self) -> bool:
names = [browser.unique_name for browser in self._variants]
unique_names = set(names)
return len(unique_names) == len(names)
def load_from_args(self, args: argparse.Namespace) -> None:
browser_list = args.browser or ["chrome-stable"]
assert isinstance(browser_list, list)
if len(browser_list) != len(set(browser_list)):
raise ValueError(f"Got duplicate --browser arguments: {browser_list}")
for browser in browser_list:
self._append_browser(args, browser)
self._verify_browser_flags(args)
self._ensure_unique_browser_names()
def _verify_browser_flags(self, args: argparse.Namespace) -> None:
chrome_args = {
"--enable-features": args.enable_features,
"--disable-features": args.disable_features,
"--js-flags": args.js_flags
}
for flag_name, value in chrome_args.items():
if not value:
continue
for browser in self._variants:
if not isinstance(browser, browsers.Chromium):
raise ValueError(f"Used chrome/chromium-specific flags {flag_name} "
f"for non-chrome {browser.unique_name}.\n"
"Use --browser-config for complex variants.")
if not args.other_browser_args:
return
browser_types = set(browser.type for browser in self._variants)
if len(browser_types) > 1:
raise ValueError(f"Multiple browser types {browser_types} "
"cannot be used with common extra browser flags: "
f"{args.other_browser_args}.\n"
"Use --browser-config for complex variants.")
def _append_browser(self, args: argparse.Namespace, browser: str) -> None:
assert browser, "Expected non-empty browser name"
path = self._get_browser_path(browser)
browser_cls = self._get_browser_cls_from_path(path)
flags = browser_cls.default_flags()
if issubclass(browser_cls, browsers.Chromium):
assert isinstance(flags, ChromeFlags)
self._init_chrome_flags(args, flags)
for flag_str in args.other_browser_args:
flag_name, flag_value = Flags.split(flag_str)
flags.set(flag_name, flag_value)
label = convert_flags_to_label(*flags.get_list())
browser_instance = browser_cls( # pytype: disable=not-instantiable
label=label,
path=path,
flags=flags)
logging.info("SELECTED BROWSER: name=%s path='%s' ",
browser_instance.unique_name, path)
self._variants.append(browser_instance)
def _init_chrome_flags(self, args: argparse.Namespace,
flags: ChromeFlags) -> None:
if args.enable_features:
for feature in args.enable_features.split(","):
flags.features.enable(feature)
if args.disable_features:
for feature in args.disable_features.split(","):
flags.features.disable(feature)
if args.js_flags:
for js_flag in args.js_flags.split(","):
js_flag_name, js_flag_value = Flags.split(js_flag.lstrip())
flags.js_flags.set(js_flag_name, js_flag_value)
def _get_browser_path(self, path_or_identifier: str) -> pathlib.Path:
identifier = path_or_identifier.lower()
# We're not using a dict-based lookup here, since not all browsers are
# available on all platforms
if identifier in ("chrome", "chrome-stable", "chr-stable"):
return browsers.Chrome.stable_path()
if identifier in ("chrome-beta", "chr-beta"):
return browsers.Chrome.beta_path()
if identifier in ("chrome-dev", "chr-dev"):
return browsers.Chrome.dev_path()
if identifier in ("chrome-canary", "chr-canary"):
return browsers.Chrome.canary_path()
if identifier in ("edge", "edge-stable"):
return browsers.Edge.stable_path()
if identifier == "edge-beta":
return browsers.Edge.beta_path()
if identifier == "edge-dev":
return browsers.Edge.dev_path()
if identifier == "edge-canary":
return browsers.Edge.canary_path()
if identifier in ("safari", "sf"):
return browsers.Safari.default_path()
if identifier in ("safari-technology-preview", "safari-tp", "sf-tp", "tp"):
return browsers.Safari.technology_preview_path()
if identifier in ("firefox", "ff"):
return browsers.Firefox.default_path()
if identifier in ("firefox-dev", "firefox-developer-edition", "ff-dev"):
return browsers.Firefox.developer_edition_path()
if identifier in ("firefox-nightly", "ff-nightly", "ff-trunk"):
return browsers.Firefox.nightly_path()
if ChromeDownloader.VERSION_RE.match(identifier):
return ChromeDownloader.load(identifier)
path = pathlib.Path(path_or_identifier)
if path.exists():
return path
path = path.expanduser()
if path.exists():
return path
if len(path.parts) > 1:
raise ValueError(f"Browser at '{path}' does not exist.")
raise ValueError(
f"Unknown browser path or short name: '{path_or_identifier}'")
class ProbeConfig:
LOOKUP: Dict[str, Type[Probe]] = {
cls.NAME: cls for cls in GENERAL_PURPOSE_PROBES
}
@classmethod
def from_cli_args(cls, args: argparse.Namespace) -> ProbeConfig:
if args.probe_config:
with args.probe_config.open(encoding="utf-8") as f:
return cls.load(f, throw=args.throw)
return cls(args.probe, throw=args.throw)
@classmethod
def load(cls, file: TextIO, throw: bool = False) -> ProbeConfig:
probe_config = cls(throw=throw)
probe_config.load_config_file(file)
return probe_config
def __init__(self,
probe_names_with_args: Optional[Iterable[str]] = None,
throw: bool = False):
self._exceptions = ExceptionAnnotator(throw=throw)
self._probes: List[Probe] = []
if not probe_names_with_args:
return
for probe_name_with_args in probe_names_with_args:
with self._exceptions.capture(f"Parsing --probe={probe_name_with_args}"):
self.add_probe(probe_name_with_args)
@property
def probes(self) -> List[Probe]:
self._exceptions.assert_success("Could not load probes: {}",
ConfigFileError)
return self._probes
def add_probe(self, probe_name_with_args: str) -> None:
# look for "ProbeName{json_key:json_value, ...}"
inline_config = {}
if probe_name_with_args[-1] == "}":
probe_name, json_args = probe_name_with_args.split("{", maxsplit=1)
assert json_args[-1] == "}"
inline_config = hjson.loads("{" + json_args)
else:
# Default case without the additional hjson payload
probe_name = probe_name_with_args
if probe_name not in self.LOOKUP:
self.raise_unknown_probe(probe_name)
with self._exceptions.info(
f"Parsing inline probe config: {probe_name}",
f" Use 'describe probe {probe_name}' for more details"):
probe_cls: Type[Probe] = self.LOOKUP[probe_name]
probe: Probe = probe_cls.from_config(
inline_config, throw=self._exceptions.throw)
self._probes.append(probe)
def load_config_file(self, file: TextIO) -> None:
with self._exceptions.capture(f"Loading probe config file: {file.name}"):
data = None
with self._exceptions.info(f"Parsing {hjson.__name__}"):
data = hjson.load(file)
if not isinstance(data, dict) or "probes" not in data:
raise ValueError(
"Probe config file does not contain a 'probes' dict value.")
self.load_dict(data["probes"])
def load_dict(self, data: Dict[str, Any]) -> None:
for probe_name, config_data in data.items():
with self._exceptions.info(
f"Parsing probe config probes['{probe_name}']"):
if probe_name not in self.LOOKUP:
self.raise_unknown_probe(probe_name)
probe_cls = self.LOOKUP[probe_name]
self._probes.append(probe_cls.from_config(config_data))
def raise_unknown_probe(self, probe_name: str) -> None:
raise ValueError(f"Unknown probe name: '{probe_name}'\n"
f"Options are: {list(self.LOOKUP.keys())}")
def inline_env_config(value: str) -> HostEnvironmentConfig:
if value in HostEnvironment.CONFIGS:
return HostEnvironment.CONFIGS[value]
if value[0] != "{":
raise argparse.ArgumentTypeError(
f"Invalid env config name: '{value}'. "
f"choices = {list(HostEnvironment.CONFIGS.keys())}")
# Assume hjson data
kwargs = None
msg = ""
try:
kwargs = hjson.loads(value)
return HostEnvironmentConfig(**kwargs)
except Exception as e:
msg = f"\n{e}"
raise argparse.ArgumentTypeError(
f"Invalid inline config string: {value}{msg}") from e
def env_config_file(value: str) -> HostEnvironmentConfig:
config_path = existing_file_type(value)
try:
with config_path.open(encoding="utf-8") as f:
data = hjson.load(f)
if "env" not in data:
raise argparse.ArgumentTypeError("No 'env' property found")
kwargs = data["env"]
return HostEnvironmentConfig(**kwargs)
except Exception as e:
msg = f"\n{e}"
raise argparse.ArgumentTypeError(
f"Invalid env config file: {value}{msg}") from e
BenchmarkClsT = Type[Benchmark]
class CrossBenchCLI:
BENCHMARKS: Tuple[Tuple[BenchmarkClsT, Tuple[str, ...]], ...] = (
(benchmarks.Speedometer20Benchmark, ()),
(benchmarks.Speedometer21Benchmark, ("speedometer",)),
(benchmarks.JetStream20Benchmark, ()),
(benchmarks.JetStream21Benchmark, ("jetstream",)),
(benchmarks.MotionMark12Benchmark, ("motionmark",)),
(benchmarks.PageLoadBenchmark, ()),
)
RUNNER_CLS: Type[Runner] = Runner
def __init__(self):
self._subparsers: Dict[BenchmarkClsT, argparse.ArgumentParser] = {}
self.parser = argparse.ArgumentParser()
self._setup_parser()
self._setup_subparser()
def _setup_parser(self) -> None:
self._add_verbosity_argument(self.parser)
# Disable colors by default when piped to a file.
has_color = hasattr(sys.stdout, "isatty") and sys.stdout.isatty()
self.parser.add_argument(
"--no-color",
dest="color",
action="store_false",
default=has_color,
help="Disable colored output")
def _add_verbosity_argument(self, parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"-v",
"--verbose",
dest="verbosity",
action="count",
default=0,
help="Increase output verbosity (0..2)")
def _setup_subparser(self) -> None:
self.subparsers = self.parser.add_subparsers(
title="Subcommands", dest="subcommand", required=True)
for benchmark_cls, alias in self.BENCHMARKS:
self._setup_benchmark_subparser(benchmark_cls, alias)
self._setup_describe_subparser()
def _setup_describe_subparser(self) -> None:
describe_parser = self.subparsers.add_parser(
"describe", aliases=["desc"], help="Print all benchmarks and stories")
describe_parser.add_argument(
"category",
nargs="?",
choices=["all", "benchmark", "benchmarks", "probe", "probes"],
default="all",
help="Limit output to the given category, defaults to 'all'")
describe_parser.add_argument(
"filter",
nargs="?",
help=("Only display the given item from the provided category. "
"By default all items are displayed. "
"Example: describe probes v8.log"))
describe_parser.add_argument("--json",
default=False,
action="store_true",
help="Print the data as json data")
describe_parser.set_defaults(subcommand=self.describe_subcommand)
self._add_verbosity_argument(describe_parser)
def describe_subcommand(self, args: argparse.Namespace) -> None:
benchmarks_data: Dict[str, Any] = {}
for benchmark_cls, aliases in self.BENCHMARKS:
if args.filter and benchmark_cls.NAME != args.filter:
continue
benchmark_info = benchmark_cls.describe()
benchmark_info["aliases"] = aliases or "None"
benchmark_info["help"] = f"See `{benchmark_cls.NAME} --help`"
benchmarks_data[benchmark_cls.NAME] = benchmark_info
data: Dict[str, Dict[str, Any]] = {
"benchmarks": benchmarks_data,
"probes": {
probe_cls.NAME: probe_cls.help_text()
for probe_cls in GENERAL_PURPOSE_PROBES
if not args.filter or probe_cls.NAME == args.filter
}
}
if args.json:
if args.category in ("probe", "probes"):
data = data["probes"]
elif args.category in ("benchmark", "benchmarks"):
data = data["benchmarks"]
else:
assert args.category == "all"
print(json.dumps(data, indent=2))
return
# Create tabular format
if args.category in ("all", "benchmark", "benchmarks"):
table: List[List[Optional[str]]] = [["Benchmark", "Property", "Value"]]
for benchmark_name, values in data["benchmarks"].items():
table.append([benchmark_name, ])
for name, value in values.items():
if isinstance(value, (tuple, list)):
value = "\n".join(value)
elif isinstance(value, dict):
value = tabulate(value.items(), tablefmt="plain")
table.append([None, name, value])
if len(table) > 1:
print(tabulate(table, tablefmt="grid"))
if args.category in ("all", "probe", "probes"):
table = [["Probe", "Help"]]
for probe_name, probe_desc in data["probes"].items():
table.append([probe_name, probe_desc])
if len(table) > 1:
print(tabulate(table, tablefmt="grid"))
def _setup_benchmark_subparser(self, benchmark_cls: Type[Benchmark],
aliases: Sequence[str]) -> None:
subparser = benchmark_cls.add_cli_parser(self.subparsers, aliases)
self.RUNNER_CLS.add_cli_parser(subparser)
assert isinstance(subparser, argparse.ArgumentParser), (
f"Benchmark class {benchmark_cls}.add_cli_parser did not return "
f"an ArgumentParser: {subparser}")
self._subparsers[benchmark_cls] = subparser
runner_group = subparser.add_argument_group("Runner Settings", "")
runner_group.add_argument(
"--cool-down-time",
type=positive_float_type,
default=2,
help="Time the runner waits between different runs or repetitions. "
"Increase this to let the CPU cool down between runs.")
runner_group.add_argument(
"--time-unit",
type=positive_float_type,
default=1,
help="Absolute duration of 1 time unit in the runner. "
"Increase this for slow builds or machines. "
"Default 1 time unit == 1 second.")
env_group = subparser.add_argument_group("Runner Environment Settings", "")
env_settings_group = env_group.add_mutually_exclusive_group()
env_settings_group.add_argument(
"--env",
type=inline_env_config,
help="Set default runner environment settings. "
f"Possible values: {', '.join(HostEnvironment.CONFIGS)}"
"or an inline hjson configuration (see --env-config). "
"Mutually exclusive with --env-config")
env_settings_group.add_argument(
"--env-config",
type=env_config_file,
help="Path to an env.config.hjson file that specifies detailed "
"runner environment settings and requirements. "
"See config/env.config.hjson for more details."
"Mutually exclusive with --env")
env_group.add_argument(
"--env-validation",
default="prompt",
choices=[mode.value for mode in ValidationMode],
help=textwrap.dedent("""
Set how runner env is validated (see als --env-config/--env):
throw: Strict mode, throw and abort on env issues,
prompt: Prompt to accept potential env issues,
warn: Only display a warning for env issues,
skip: Don't perform any env validation".
"""))
env_group.add_argument(
"--dry-run",
action="store_true",
default=False,
help="Don't run any browsers or probes")
browser_group = subparser.add_mutually_exclusive_group()
browser_group.add_argument(
"--browser",
action="append",
default=[],
help="Browser binary. Use this to test a simple browser variant. "
"Use [chrome, stable, dev, canary, safari] "
"for system default browsers or a full path. "
"Repeat for adding multiple browsers. "
"Defaults to 'chrome-stable'. "
"Use --browser=chrome-M107 to download the latest milestone, "
"--browser=chrome-100.0.4896.168 to download a specific version "
"(macos and googlers only)."
"Cannot be used with --browser-config")
browser_group.add_argument(
"--browser-config",
type=existing_file_type,
help="Browser configuration.json file. "
"Use this to run multiple browsers and/or multiple flag configurations."
"See config/browser.config.example.hjson on how to set up a complex "
"configuration file. "
"Cannot be used together with --browser.")
probe_group = subparser.add_mutually_exclusive_group()
probe_group.add_argument(
"--probe",
action="append",
default=[],
help="Enable general purpose probes to measure data on all cb.stories. "
"This argument can be specified multiple times to add more probes. "
"Use inline hjson (e.g. '--probe=$NAME{$CONFIG}') to configure probes. "
"Use 'describe probes' or 'describe probe $NAME' for probe "
"configuration details."
"Cannot be used together with --probe-config."
f"\n\nChoices: {', '.join(ProbeConfig.LOOKUP.keys())}")
probe_group.add_argument(
"--probe-config",
type=existing_file_type,
help="Browser configuration.json file. "
"Use this config file to specify more complex Probe settings."
"See config/probe.config.example.hjson on how to set up a complex "
"configuration file. "
"Cannot be used together with --probe.")
chrome_args = subparser.add_argument_group(
"Chrome-forwarded Options",
"For convenience these arguments are directly are forwarded "
"directly to chrome. Any other browser option can be passed "
"after the '--' arguments separator.")
chrome_args.add_argument("--js-flags", dest="js_flags")
doc_str = "See chrome's base/feature_list.h source file for more details"
chrome_args.add_argument(
"--enable-features",
help="Comma-separated list of enabled chrome features. " + doc_str,
default="")
chrome_args.add_argument(
"--disable-features",
help="Command-separated list of disabled chrome features. " + doc_str,
default="")
subparser.set_defaults(
subcommand=self.benchmark_subcommand, benchmark_cls=benchmark_cls)
self._add_verbosity_argument(subparser)
subparser.add_argument("other_browser_args", nargs="*")
def benchmark_subcommand(self, args: argparse.Namespace) -> None:
benchmark = None
runner = None
try:
self._benchmark_subcommand_helper(args)
benchmark = self._get_benchmark(args)
with tempfile.TemporaryDirectory(prefix="crossbench") as tmp_dirname:
if args.dry_run:
args.out_dir = pathlib.Path(tmp_dirname) / "results"
args.browser = self._get_browsers(args)
probes = self._get_probes(args)
env_config = self._get_env_config(args)
env_validation_mode = self._get_env_validation_mode(args)
timing = self._get_timing(args)
runner = self._get_runner(args, benchmark, env_config,
env_validation_mode, timing)
for probe in probes:
runner.attach_probe(probe, matching_browser_only=True)
self._run_benchmark(args, runner)
except KeyboardInterrupt:
sys.exit(2)
except Exception as e: # pylint: disable=broad-except
if args.throw:
raise
self._log_benchmark_subcommand_failure(benchmark, runner, e)
sys.exit(3)
def _benchmark_subcommand_helper(self, args: argparse.Namespace) -> None:
"""Handle common subcommand mistakes that are not easily implementable
with argparse.
run: => just run the benchmark
help => use --help
describe => use describe benchmark NAME
"""
if not args.other_browser_args:
return
maybe_command = args.other_browser_args[0]
if maybe_command == "run":
args.other_browser_args.pop()
return
if maybe_command == "help":
logging.error("Please use --help")
self._subparsers[args.benchmark_cls].print_help()
sys.exit(0)
if maybe_command == "describe":
logging.warning("Please use `describe benchmark %s`",
args.benchmark_cls.NAME)
# Patch args to simulate: describe benchmark BENCHMARK_NAME
args.category = "benchmarks"
args.filter = args.benchmark_cls.NAME
args.json = False
self.describe_subcommand(args)
def _log_benchmark_subcommand_failure(self, benchmark: Optional[Benchmark],
runner: Optional[Runner],
e: Exception) -> None:
logging.debug(e)
logging.error("")
logging.error("#" * 80)
logging.error("SUBCOMMAND UNSUCCESSFUL got %s:", e.__class__.__name__)
logging.error("-" * 80)
self._log_benchmark_subcommand_exception(e)
logging.error("-" * 80)
if benchmark:
logging.error("Running '%s' was not successful:", benchmark.NAME)
logging.error("- Check run results.json for detailed backtraces")
logging.error("- Use --throw to throw on the first logged exception")
logging.error("- Use -vv for detailed logging")
if runner and runner.runs:
self._log_runner_debug_hints(runner)
logging.error("#" * 80)
sys.exit(3)
def _log_benchmark_subcommand_exception(self, e: Exception) -> None:
message = str(e)
if message:
logging.error(message)
return
if isinstance(e, AssertionError):
self._log_assertion_error_statement(e)
def _log_assertion_error_statement(self, e: AssertionError) -> None:
_, exception, tb = sys.exc_info()
if exception is not e:
return
tb_info = traceback.extract_tb(tb)
filename, line, _, text = tb_info[-1]
logging.info('%s:%s: %s', filename, line, text)
def _log_runner_debug_hints(self, runner: Runner) -> None:
failed_runs = [run for run in runner.runs if not run.is_success]
if not failed_runs:
return
failed_run = failed_runs[0]
logging.error("- Check log outputs (first out of %d failed runs): %s",
len(failed_runs), failed_run.out_dir)
for log_file in failed_run.out_dir.glob("*.log"):
try:
log_file = log_file.relative_to(pathlib.Path.cwd())
finally:
pass
logging.error(" - %s", log_file)
def _run_benchmark(self, args: argparse.Namespace, runner: Runner) -> None:
try:
runner.run(is_dry_run=args.dry_run)
logging.info("")
self._log_results(args, runner, is_success=runner.is_success)
except: # pylint disable=broad-except
self._log_results(args, runner, is_success=False)
raise
finally:
if not args.out_dir:
latest = runner.out_dir.parent / "latest"
if latest.is_symlink():
latest.unlink()
if not latest.exists():
latest.symlink_to(runner.out_dir, target_is_directory=True)
else:
logging.error("Could not create %s", latest)
def _log_results(self, args: argparse.Namespace, runner: Runner,
is_success: bool) -> None:
logging.info("=" * 80)
if is_success:
logging.info("RESULTS: %s", runner.out_dir)
else:
logging.info("RESULTS (maybe incomplete/broken): %s", runner.out_dir)
logging.info("=" * 80)
for probe in runner.probes:
try:
probe.log_browsers_result(runner.browser_group)
except Exception as e: # pylint disable=broad-except
if args.throw:
raise
logging.debug("log_result_summary failed: %s", e)
def _get_browsers(self, args: argparse.Namespace) -> Sequence[Browser]:
args.browser_config = BrowserConfig.from_cli_args(args)
return args.browser_config.variants
def _get_probes(self, args: argparse.Namespace) -> Sequence[Probe]:
args.probe_config = ProbeConfig.from_cli_args(args)
return args.probe_config.probes
def _get_benchmark(self, args: argparse.Namespace) -> Benchmark:
benchmark_cls = self._get_benchmark_cls(args)
assert issubclass(
benchmark_cls,
Benchmark), (f"benchmark_cls={benchmark_cls} is not subclass of Runner")
return benchmark_cls.from_cli_args(args)
def _get_benchmark_cls(self, args: argparse.Namespace) -> Type[Benchmark]:
return args.benchmark_cls
def _get_env_validation_mode(
self,
args: argparse.Namespace,
) -> ValidationMode:
return ValidationMode[args.env_validation.upper()]
def _get_env_config(
self,
args: argparse.Namespace,
) -> HostEnvironmentConfig:
if args.env:
return args.env
if args.env_config:
return args.env_config
return HostEnvironmentConfig()
def _get_timing(
self,
args: argparse.Namespace,
) -> Timing:
assert args.cool_down_time >= 0
cool_down_time = dt.timedelta(seconds=args.cool_down_time)
assert args.time_unit > 0, "--time-unit must be > 0"
unit = dt.timedelta(seconds=args.time_unit)
return Timing(cool_down_time, unit)
def _get_runner(self, args: argparse.Namespace, benchmark: Benchmark,
env_config: HostEnvironmentConfig,
env_validation_mode: ValidationMode,
timing: Timing) -> Runner:
runner_kwargs = self.RUNNER_CLS.kwargs_from_cli(args)
return self.RUNNER_CLS(
benchmark=benchmark,
env_config=env_config,
env_validation_mode=env_validation_mode,
timing=timing,
**runner_kwargs)
def run(self, argv: Sequence[str]) -> None:
args: argparse.Namespace = self.parser.parse_args(argv)
self._initialize_logging(args)
args.subcommand(args)
def _initialize_logging(self, args: argparse.Namespace) -> None:
logging.getLogger().setLevel(logging.INFO)
console_handler = logging.StreamHandler()
if args.verbosity == 0:
console_handler.setLevel(logging.INFO)
elif args.verbosity >= 1:
console_handler.setLevel(logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
console_handler.addFilter(logging.Filter("root"))
if args.color:
console_handler.setFormatter(helper.ColoredLogFormatter())
logging.getLogger().addHandler(console_handler)