blob: f7cb34f4d873cda87bf91759d8e2fdc7f1adc69e [file] [log] [blame]
# Copyright 2022 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import argparse
import itertools
import json
import logging
import pathlib
from typing import Dict, Iterable, List, Optional, Tuple, Type, Union
import crossbench as cb
try:
import hjson
except ModuleNotFoundError:
logging.debug("hjson module not found")
hjson = None
class FlagGroupConfig:
"""
This object is create from configuration files and mainly contains a mapping
from flag-names to multiple values.
"""
_variants: Dict[str, Iterable[Optional[str]]]
name: str
def __init__(self, name: str,
variants: Dict[str, Union[Iterable[Optional[str]], str]]):
self.name = name
self._variants = {}
for flag_name, flag_variants_or_value in variants.items():
assert flag_name not in self._variants
assert flag_name
if isinstance(flag_variants_or_value, str):
self._variants[flag_name] = (str(flag_variants_or_value),)
else:
assert isinstance(flag_variants_or_value, Iterable)
flag_variants = tuple(flag_variants_or_value)
assert len(flag_variants) == len(set(flag_variants)), (
"Flag variant contains duplicate entries: {flag_variants}")
self._variants[flag_name] = tuple(flag_variants_or_value)
def get_variant_items(self) -> Iterable[Optional[Tuple[str, Optional[str]]]]:
for flag_name, flag_values in self._variants.items():
yield tuple(
self._map(flag_name, flag_value) for flag_value in flag_values)
@staticmethod
def _map(flag_name: str, flag_value: Optional[str]):
if flag_value is None:
return None
if flag_value == "":
return (flag_name, None)
return (flag_name, flag_value)
class BrowserConfig:
@classmethod
def load(cls, f, browser_lookup_override={}):
try:
if hjson:
config = hjson.load(f)
else:
config = json.load(f)
except ValueError as e:
raise ValueError(f"Failed to parse config file: {f}") from e
return cls(config, browser_lookup_override)
flag_groups: Dict[str, FlagGroupConfig]
variants: List[cb.browsers.Browser]
def __init__(self,
raw_config_data: Optional[Dict] = None,
browser_lookup_override: Dict[
str, Tuple[Type[cb.browsers.Browser], pathlib.Path]] = {}):
self.flag_groups = {}
self.variants = []
self._browser_lookup_override = browser_lookup_override
if raw_config_data:
if "flags" in raw_config_data:
for flag_name, group_config in raw_config_data["flags"].items():
self._parse_flag_group(flag_name, group_config)
assert "browsers" in raw_config_data, (
"Config does not provide a 'browsers' dict.")
assert len(raw_config_data["browsers"]), ("Config contains empty 'browsers' dict.")
for name, browser_config in raw_config_data["browsers"].items():
self._parse_browser(name, browser_config)
def _parse_flag_group(self, name, raw_flag_group_data):
assert name not in self.flag_groups, (f"flag-group='{name}' exists already")
variants = {}
for flag_name, values in raw_flag_group_data.items():
if not flag_name.startswith("-"):
raise Exception(f"Invalid flag name: '{flag_name}'")
if flag_name not in variants:
flag_values = variants[flag_name] = set()
else:
flag_values = variants[flag_name]
if isinstance(values, str):
values = [values]
for value in values:
assert value not in flag_values, (
"Same flag variant was specified more than once: "
f"'{value}' for entry '{flag_name}'")
flag_values.add(value)
self.flag_groups[name] = FlagGroupConfig(name, variants)
def _parse_browser(self, name, raw_browser_data):
path_or_identifier = raw_browser_data["path"]
if path_or_identifier in self._browser_lookup_override:
cls, path = self._browser_lookup_override[path_or_identifier]
else:
path = self._get_browser_path(path_or_identifier)
cls = self._get_browser_cls_from_path(path)
assert path.exists(), f"Browser='{name}' path='{path}' does not exist."
variants_flags = tuple(
cls.default_flags(flags)
for flags in self._parse_flags(name, raw_browser_data))
logging.info("Running browser '%s' with %s flag variants:", name,
len(variants_flags))
for i in range(len(variants_flags)):
logging.info(" %s: %s", i, variants_flags[i])
# pytype: disable=not-instantiable
self.variants += [
cls(label=self._flags_to_label(name, flags), path=path, flags=flags)
for flags in variants_flags
]
# pytype: enable=not-instantiable
def _flags_to_label(self, name: str, flags: cb.flags.Flags) -> str:
return f"{name}_{cb.browsers.convert_flags_to_label(*flags.get_list())}"
def _parse_flags(self, name, data):
flags_product = []
flag_group_names = data["flags"]
assert isinstance(flag_group_names,
list), (f"'flags' is not a list for browser='{name}'")
for flag_group_name in flag_group_names:
# Use temporary FlagGroupConfig for inline fixed flag definition
if flag_group_name.startswith("--"):
flag_name, flag_value = cb.flags.Flags.split(flag_group_name)
flag_group = FlagGroupConfig("temporary", {flag_name: flag_value})
assert flag_group_name not in self.flag_groups
else:
flag_group = self.flag_groups.get(flag_group_name, None)
assert flag_group is not None, (f"Flag-group='{flag_group_name}' "
f"for browser='{name}' does not exist.")
flags_product += flag_group.get_variant_items()
if len(flags_product) == 0:
# use empty default
return (dict(),)
flags_product = itertools.product(*flags_product)
# Filter out (.., None) value
flags_product = list(
list(flag_item
for flag_item in flags_items
if flag_item is not None)
for flags_items in flags_product)
assert flags_product
return flags_product
def _get_browser_cls_from_path(self, path):
path_str = str(path).lower()
if "safari" in path_str:
return cb.browsers.SafariWebDriver
if "chrome" in path_str:
return cb.browsers.ChromeWebDriver
raise Exception(f"Unsupported browser='{path}'")
def load_from_args(self, args):
path = self._get_browser_path(args.browser or "chrome")
logging.warning("SELECTED BROWSER: %s", path)
cls = self._get_browser_cls_from_path(path)
flags = cls.default_flags()
if args.enable_features:
for feature in args.enabled_features.split(","):
flags.features.enable(feature)
if args.disable_features:
for feature in args.disabled_features.split(","):
flags.features.disable(feature)
if args.js_flags:
flags.js_flags.update(args.js_flags.split(","))
for flag_str in args.other_browser_args:
flags.set(*cb.flags.Flags.split(flag_str))
label = cb.browsers.convert_flags_to_label(*flags.get_list())
browser = cls(label=label, path=path, flags=flags)
self.variants.append(browser)
def _get_browser_path(self, path_or_identifier: str) -> pathlib.Path:
identifier = path_or_identifier.lower()
# We're not using a dict-based lookup here, since not all browsers are
# available on all platforms
if identifier == "chrome" or identifier == "stable":
return cb.browsers.Chrome.stable_path
if identifier == "chrome dev" or identifier == "dev":
return cb.browsers.Chrome.dev_path
if identifier == "chrome canary" or identifier == "canary":
return cb.browsers.Chrome.canary_path
if identifier == "safari":
return cb.browsers.Safari.default_path
if identifier == "safari technology preview" or identifier == "tp":
return cb.browsers.Safari.technology_preview_path
path = pathlib.Path(path_or_identifier)
if path.exists():
return path
path = path.expanduser()
if path.exists():
return path
if len(path.parts) > 1:
raise Exception(f"Browser at '{path}' does not exist.")
raise Exception(
f"Unknown browser path or short name: '{path_or_identifier}'")
class CrossBenchCLI:
BENCHMARKS = (
cb.benchmarks.Speedometer20Runner,
cb.benchmarks.JetStream2Benchmark,
cb.benchmarks.MotionMark12Benchmark,
cb.benchmarks.PageLoadBenchmark,
)
GENERAL_PURPOSE_PROBES_BY_NAME = {
cls.NAME: cls for cls in cb.probes.GENERAL_PURPOSE_PROBES
}
RUNNER_CLS = cb.runner.Runner
def __init__(self):
self.parser = argparse.ArgumentParser()
self._setup_parser()
self._setup_subparser()
def _setup_parser(self):
self.parser.add_argument(
"-v",
"--verbose",
dest="verbosity",
action="count",
default=0,
help="Increase output verbosity (0..2)")
def _setup_subparser(self):
self.subparsers = self.parser.add_subparsers(
title="Subcommands", dest="subcommand", required=True)
for benchmark_cls in self.BENCHMARKS:
self._setup_benchmark_subparser(benchmark_cls)
describe_parser = self.subparsers.add_parser(
"describe", help="Print all benchmarks and stories")
describe_parser.set_defaults(subcommand=self.describe_subcommand)
def describe_subcommand(self, args):
data = {
"benchmarks": {
benchmark_cls.NAME: benchmark_cls.describe()
for benchmark_cls in self.BENCHMARKS
},
"probes": {
probe_cls.NAME: probe_cls.__doc__.strip()
for probe_cls in cb.probes.GENERAL_PURPOSE_PROBES
}
}
print(json.dumps(data, indent=2))
def _setup_benchmark_subparser(self, benchmark_cls):
subparser = benchmark_cls.add_cli_parser(self.subparsers)
self.RUNNER_CLS.add_cli_parser(subparser)
assert isinstance(subparser, argparse.ArgumentParser), (
f"Benchmark class {benchmark_cls}.add_cli_parser did not return "
f"an ArgumentParser: {subparser}")
subparser.add_argument(
"--dry-run",
action="store_true",
default=False,
help="Don't run any browsers or probes")
browser_group = subparser.add_mutually_exclusive_group()
browser_group.add_argument(
"--browser",
help="Browser binary. Use this to test a single browser. "
"Use a shortname [chrome, stable, dev, canary, safari] "
"for system default browsers or a full path. "
"Defaults to 'chrome'. "
"Cannot be used with --browser-config")
browser_group.add_argument(
"--browser-config",
type=pathlib.Path,
help="Browser configuration.json file. "
"Use this to run multiple browsers and/or multiple flag configurations."
"See browser.config.example.hjson on how to set up a complex "
"configuration file. "
"Cannot be used together with --browser.")
subparser.add_argument(
"--probe",
action="append",
default=[],
choices=self.GENERAL_PURPOSE_PROBES_BY_NAME.keys(),
help="Enable general purpose probes to measure data on all cb.stories "
"This argument can be specified multiple times to add more probes")
subparser.add_argument("other_browser_args", nargs="*")
chrome_args = subparser.add_argument_group(
"Chrome-forwarded Options",
"For convenience these arguments are directly are forwarded "
"directly to chrome. Any other browser option can be passed "
"after the '--' arguments separator.")
chrome_args.add_argument("--js-flags", dest="js_flags")
DOC = "See chrome's base/feature_list.h source file for more details"
chrome_args.add_argument(
"--enable-features",
help="Comma-separated list of enabled chrome features. " + DOC,
default="")
chrome_args.add_argument(
"--disable-features",
help="Command-separated list of disabled chrome features. " + DOC,
default="")
subparser.set_defaults(
subcommand=self.benchmark_subcommand, benchmark_cls=benchmark_cls)
def benchmark_subcommand(self, args):
if args.browser_config:
path = args.browser_config.expanduser()
if not path.exists():
raise argparse.ArgumentTypeError(
f"Given path '{path.absolute}' does not exist")
assert args.browser is None, (
"Cannot specify --browser and --browser-config at the same time")
with path.open() as f:
args.browser_config = BrowserConfig.load(f)
else:
args.browser_config = BrowserConfig()
args.browser_config.load_from_args(args)
args.browsers = args.browser_config.variants
benchmark_cls = args.benchmark_cls
assert issubclass(benchmark_cls, cb.benchmarks.Benchmark), (
f"benchmark_cls={benchmark_cls} is not subclass of Runner")
benchmark = benchmark_cls(**benchmark_cls.kwargs_from_cli(args))
runner_kwargs = self.RUNNER_CLS.kwargs_from_cli(args)
runner = self.RUNNER_CLS(benchmark=benchmark, **runner_kwargs)
for probe_name in args.probe:
probe = self.GENERAL_PURPOSE_PROBES_BY_NAME[probe_name]()
runner.attach_probe(probe, matching_browser_only=True)
runner.run(is_dry_run=args.dry_run)
results_json = runner.out_dir / "results.json"
print(f"RESULTS: {results_json}")
def run(self, argv):
args = self.parser.parse_args(argv)
self._initialize_logging(args)
args.subcommand(args)
def _initialize_logging(self, args):
logging.getLogger().setLevel(logging.INFO)
consoleHandler = logging.StreamHandler()
if args.verbosity == 0:
consoleHandler.setLevel(logging.WARNING)
elif args.verbosity == 1:
consoleHandler.setLevel(logging.INFO)
elif args.verbosity > 1:
consoleHandler.setLevel(logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
consoleHandler.addFilter(logging.Filter("root"))
logging.getLogger().addHandler(consoleHandler)