# Copyright 2024 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import csv
import gzip
import json
import logging
import os
import shutil
import stat
import zipfile
from contextlib import ExitStack
from typing import IO, TYPE_CHECKING, Any, Iterable, List, Optional, Tuple
from urllib.request import urlretrieve
from crossbench import cli_helper, exception
from crossbench import path as pth
from crossbench import plt
from crossbench.browsers.browser_helper import BROWSERS_CACHE
from crossbench.helper.path_finder import TraceProcessorFinder
from crossbench.plt.base import ListCmdArgs, Platform
from crossbench.probes import metric as cb_metric
from crossbench.probes.probe import Probe, ProbeConfigParser, ProbeContext
from crossbench.probes.results import LocalProbeResult, ProbeResult
if TYPE_CHECKING:
from crossbench.runner.groups.browsers import BrowsersRunGroup
from crossbench.runner.run import Run
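# Directories, relative to this file, holding the SQL queries, SQL modules
# and metric definitions bundled with this probe.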
_QUERIES_DIR = pth.LocalPath(__file__).parent / "queries"
_MODULES_DIR = pth.LocalPath(__file__).parent / "modules/ext"
_METRICS_DIR = pth.LocalPath(__file__).parent / "metrics"
def _is_trace_file(path: pth.LocalPath) -> bool:
  return path.name.endswith(
      (".trace.pb", ".trace.pb.gz", ".perf.data", "logcat.txt"))
TRACE_PROCESSOR_DOWNLOAD_URL = "https://get.perfetto.dev/trace_processor"
def _download_trace_processor() -> pth.LocalPath:
destination = BROWSERS_CACHE / "trace_processor"
urlretrieve(TRACE_PROCESSOR_DOWNLOAD_URL, destination)
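  # The downloaded file is not executable by default; add the owner exec bit.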
st = os.stat(destination)
os.chmod(destination, st.st_mode | stat.S_IEXEC)
return destination
# TODO(carlscab): We should use the python API to start a TraceProcessor
# instance and run all queries there, instead of creating one instance per query
class TraceProcessor:
"""
  Helper class to run trace processor queries and metrics and store the
  results in CSV files.
"""
def __init__(self,
platform: Platform,
trace_processor: pth.LocalPath):
super().__init__()
# TODO: add throw parameter.
self._exceptions = exception.Annotator(throw=True)
self._platform = platform
self._trace_processor = trace_processor
def _build_trace_processor_cmd(self,
trace_file: pth.LocalPath,
metric: Optional[str] = None,
query: Optional[str] = None) -> ListCmdArgs:
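    # Map the custom metrics directory to the virtual path "/ext" inside
    # trace_processor and register the bundled SQL modules.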
cmd: ListCmdArgs = [
self._trace_processor,
"--metric-extension",
str(_METRICS_DIR) + "@/ext",
"--add-sql-module",
_MODULES_DIR,
]
if metric:
cmd += ["--metrics-output", "json", "--run-metrics", metric]
if query:
cmd += ["--query-file", _QUERIES_DIR / f"{query}.sql"]
cmd.append(trace_file)
return cmd
def run_metrics(self, metrics: Iterable[str], trace_file: pth.LocalPath,
out_dir: pth.LocalPath) -> List[pth.LocalPath]:
if not metrics:
return []
json_files: List[pth.LocalPath] = []
self._platform.mkdir(out_dir)
for metric in metrics:
with self._exceptions.capture(f"Running metric: {metric}"):
safe_filename = pth.safe_filename(metric)
out_file = out_dir / f"{safe_filename}.json"
cmd = self._build_trace_processor_cmd(trace_file, metric=metric)
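        # trace_processor writes the metric result (JSON, per --metrics-output
        # above) to stdout; "x" mode ensures existing results are never
        # silently overwritten.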
with out_file.open("x") as f:
self._platform.sh(*cmd, stdout=f)
json_files.append(out_file)
# TODO: consume exception in the caller instead.
self._exceptions.assert_success()
return json_files
def run_queries(self, queries: Iterable[str], trace_file: pth.LocalPath,
out_dir: pth.LocalPath) -> List[pth.LocalPath]:
if not queries:
return []
csv_files: List[pth.LocalPath] = []
self._platform.mkdir(out_dir)
for query in queries:
with self._exceptions.capture(f"Running query: {query}"):
safe_filename = pth.safe_filename(query)
out_file = out_dir / f"{safe_filename}.csv"
cmd = self._build_trace_processor_cmd(trace_file, query=query)
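        # Query results likewise go to stdout and are captured as CSV.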
with out_file.open("x") as f:
self._platform.sh(*cmd, stdout=f)
csv_files.append(out_file)
# TODO: consume exception in the caller instead.
self._exceptions.assert_success()
return csv_files
def check_metrics(self, metrics: Iterable[str]) -> None:
"""
    Runs all specified metrics on an empty trace. This verifies that the
    metrics are correctly defined in trace processor.
"""
for metric in metrics:
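      # /dev/null acts as an empty trace, so any failure must come from the
      # metric definition itself (this assumes a POSIX runner platform).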
cmd = self._build_trace_processor_cmd(pth.LocalPath("/dev/null"),
metric=metric)
process = self._platform.sh(*cmd, capture_output=True, check=False)
if process.returncode != 0:
logging.error(
"Checking metric '%s' failed. Trace processor stderr:\n%s", metric,
            process.stderr.decode("utf-8", errors="replace"))
raise RuntimeError(f"Metric check failed: {metric}")
def check_queries(self, queries: Iterable[str]) -> None:
"""
    Runs all specified queries on an empty trace. This verifies that the
    query files exist and do not contain SQL syntax errors.
"""
for query in queries:
cmd = self._build_trace_processor_cmd(pth.LocalPath("/dev/null"),
query=query)
process = self._platform.sh(*cmd, capture_output=True, check=False)
if process.returncode != 0:
logging.error("Checking query '%s' failed. Trace processor stderr:\n%s",
                      query, process.stderr.decode("utf-8", errors="replace"))
raise RuntimeError(f"Query check failed: {query}")
class TraceProcessorProbe(Probe):
"""
  Probe that merges all trace files collected during a run and uses
  Perfetto's trace_processor to run SQL queries and metrics on them.
"""
NAME = "trace_processor"
@classmethod
def config_parser(cls) -> ProbeConfigParser:
parser = super().config_parser()
parser.add_argument(
"metrics",
type=str,
is_list=True,
default=tuple(),
        help="Names of the metrics to run (any built-in Perfetto metric or "
        "a custom metric from probes/trace_processor/metrics)")
parser.add_argument(
"queries",
type=str,
is_list=True,
default=tuple(),
        help="Names of the queries to run "
        "(from probes/trace_processor/queries)")
parser.add_argument(
"trace_processor_bin",
type=cli_helper.parse_local_binary_path,
required=False,
help="Path to the trace_processor binary")
return parser
def __init__(self,
metrics: Iterable[str],
queries: Iterable[str],
trace_processor_bin: Optional[pth.LocalPath] = None):
super().__init__()
self._metrics = tuple(metrics)
self._queries = tuple(queries)
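    # Resolve the trace_processor binary: an explicitly configured path wins,
    # then one found in a Chromium checkout; as a last resort the prebuilt
    # binary is downloaded from get.perfetto.dev.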
if not trace_processor_bin:
if tp_chromium_path := TraceProcessorFinder(plt.PLATFORM).path:
trace_processor_bin = pth.LocalPath(tp_chromium_path)
else:
trace_processor_bin = _download_trace_processor()
self._trace_processor_bin = cli_helper.parse_local_binary_path(
trace_processor_bin, "trace_processor")
@property
def metrics(self) -> Tuple[str, ...]:
return self._metrics
@property
def queries(self) -> Tuple[str, ...]:
return self._queries
@property
def trace_processor_bin(self) -> pth.LocalPath:
return self._trace_processor_bin
def get_context(self, run: Run) -> TraceProcessorProbeContext:
return TraceProcessorProbeContext(self, run)
def _merge_csv(self, group_dir: pth.LocalPath,
group: BrowsersRunGroup) -> List[pth.LocalPath]:
writers: dict[str, csv.DictWriter] = {}
csv_files: List[pth.LocalPath] = []
with ExitStack() as stack:
extra_columns: dict[str, Any] = {}
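      # Tag every merged row with the browser label, story name and
      # repetition it came from, so rows stay distinguishable after merging.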
for story in group.story_groups:
extra_columns["browser_label"] = story.browser.label
for rep in story.repetitions_groups:
extra_columns["cb_story_name"] = rep.story.name
for run in rep.runs:
extra_columns["repetition"] = run.repetition
for file in run.results[self].csv_list:
with file.open(newline="") as csv_file:
def skip_empty_lines(line):
return line != "\n"
reader = csv.DictReader(
filter(skip_empty_lines, csv_file),
dialect=csv.unix_dialect)
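                # Lazily create one merged output file (and writer) per
                # distinct result file name, kept open via the ExitStack.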
                if file.name not in writers:
merged_csv_file = group_dir / file.name
csv_files.append(merged_csv_file)
f = merged_csv_file.open("x", newline="")
stack.enter_context(f)
fieldnames = sorted(extra_columns.keys()) + list(
reader.fieldnames)
cli_helper.parse_unique_sequence(fieldnames, "field names",
ValueError)
w = csv.DictWriter(f, fieldnames=fieldnames)
w.writeheader()
writers[file.name] = w
w = writers[file.name]
for row in reader:
row.update(extra_columns)
w.writerow(row)
return csv_files
def _merge_json(self, group_dir: pth.LocalPath,
group: BrowsersRunGroup) -> List[pth.LocalPath]:
merged_metrics = cb_metric.MetricsMerger()
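    # Aggregate the per-run metric JSON documents into a single document.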
for run in group.runs:
for file in run.results[self].json_list:
with file.open() as json_file:
merged_metrics.add(json.load(json_file))
merged_json_file = group_dir / "metrics.json"
with merged_json_file.open("x") as f:
json.dump(merged_metrics.to_json(), f, indent=4)
return [merged_json_file]
def merge_browsers(self, group: BrowsersRunGroup) -> ProbeResult:
group_dir = group.get_local_probe_result_path(self)
group_dir.mkdir()
merged_csv = self._merge_csv(group_dir, group)
merged_json = self._merge_json(group_dir, group)
return LocalProbeResult(csv=merged_csv, json=merged_json)
def log_browsers_result(self, group: BrowsersRunGroup) -> None:
logging.info("-" * 80)
logging.critical("TraceProcessor traces:")
for run in group.runs:
logging.critical(" - %s", run.results[self].file)
class TraceProcessorProbeContext(ProbeContext[TraceProcessorProbe]):
def __init__(self, probe: TraceProcessorProbe, run: Run) -> None:
super().__init__(probe, run)
self._trace_processor = TraceProcessor(run.runner_platform,
probe.trace_processor_bin)
def get_default_result_path(self) -> pth.RemotePath:
result_dir = super().get_default_result_path()
self.runner_platform.mkdir(result_dir)
return result_dir
def setup(self) -> None:
# Before actually running the benchmark, make sure all the metrics and
# queries exist and don't contain errors.
self._trace_processor.check_metrics(self.probe.metrics)
self._trace_processor.check_queries(self.probe.queries)
def start(self) -> None:
pass
def stop(self) -> None:
pass
def teardown(self) -> ProbeResult:
with self.run.actions("TRACE_PROCESSOR: Merging trace files", verbose=True):
merged_trace = self._merge_trace_files()
with self.run.actions("TRACE_PROCESSOR: Running queries", verbose=True):
csv_files = self._trace_processor.run_queries(
queries=self.probe.queries,
trace_file=merged_trace,
out_dir=self.local_result_path)
with self.run.actions("TRACE_PROCESSOR: Running metrics", verbose=True):
json_files = self._trace_processor.run_metrics(
metrics=self.probe.metrics,
trace_file=merged_trace,
out_dir=self.local_result_path)
return LocalProbeResult(file=[merged_trace], csv=csv_files, json=json_files)
def _merge_trace_files(self) -> pth.LocalPath:
merged_trace = self.local_result_path / "merged_trace.zip"
with zipfile.ZipFile(merged_trace, "w") as zip_file:
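      # Use each trace's path relative to the run's output directory as its
      # name inside the archive.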
for f in self.run.results.all_traces():
zip_file.write(f, arcname=f.relative_to(self.run.out_dir))
return merged_trace
def _write_probe_result_traces(self, probe_name: str, output_f: IO) -> None:
# TODO: implement probe dependencies
probe_results = self.run.results.get_by_name(probe_name)
assert probe_results, (
f"Did not find results for required probe {probe_name}")
if probe_results.is_empty:
logging.warning("TRACE_PROCESSOR: No trace files found for %s",
probe_name)
return
for f in probe_results.file_list:
if not _is_trace_file(f):
continue
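      # Concatenate all trace files into output_f, transparently
      # decompressing gzip-compressed ones.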
if f.suffix == ".gz":
with gzip.open(f) as input_f:
shutil.copyfileobj(input_f, output_f)
else:
with f.open("rb") as input_f:
shutil.copyfileobj(input_f, output_f)