blob: 5a924265ee9ecf90e59b2a27d87baab074166d11 [file] [log] [blame]
# Copyright 2024 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import collections
import json
import logging
from typing import TYPE_CHECKING, ClassVar, Final, Iterable, Optional, Self, \
Type
import pandas as pd
from google.protobuf.json_format import MessageToJson
from perfetto.batch_trace_processor.api import BatchTraceProcessor, \
BatchTraceProcessorConfig
from perfetto.trace_processor.api import TraceProcessor, TraceProcessorConfig
from perfetto.trace_uri_resolver.path import PathUriResolver
from perfetto.trace_uri_resolver.registry import ResolverRegistry
from typing_extensions import override
from crossbench import path as pth
from crossbench import plt
from crossbench.helper.path_finder import LlvmSymbolizerFinder, TraceconvFinder
from crossbench.parse import ObjectParser
from crossbench.probes.metric import MetricsMerger
from crossbench.probes.probe import Probe, ProbeConfigParser, ProbePriority
from crossbench.probes.results import LocalProbeResult, ProbeResult
from crossbench.probes.trace_processor.constants import MODULES_DIR, \
PROBE_NAME, QUERIES_DIR
from crossbench.probes.trace_processor.context.base import \
TraceProcessorProbeContext
from crossbench.probes.trace_processor.context.symbolizing import \
TraceProcessorSymbolizingProbeContext
from crossbench.probes.trace_processor.query_config import \
TraceProcessorQueryConfig
from crossbench.probes.trace_processor.uri_resolver import \
CrossbenchTraceUriResolver
if TYPE_CHECKING:
from crossbench.env.runner_env import RunnerEnv
from crossbench.runner.groups.browsers import BrowsersRunGroup
from crossbench.runner.run import Run
from crossbench.types import JsonDict
class TraceProcessorProbe(Probe):
  """Runs Perfetto trace_processor queries and metrics over recorded traces.

  Configured SQL queries, v1 metrics and v2 trace summaries are executed
  either per test run or — in batch mode — once at the end over all traces
  simultaneously (faster overall, but every trace is loaded into memory at
  the same time). Per-run CSV/JSON results are merged across runs and
  browsers into the group result directory.
  """

  NAME: ClassVar = PROBE_NAME
  PRIORITY: ClassVar = ProbePriority.TRACE_PROCESSOR

  @classmethod
  @override
  def config_parser(cls) -> ProbeConfigParser[Self]:
    """Declares all probe-config options accepted by this probe."""
    parser = super().config_parser()
    # NOTE(review): other boolean options below use ObjectParser.bool; plain
    # bool("false") is truthy — confirm ProbeConfigParser special-cases
    # type=bool for string config inputs.
    parser.add_argument(
        "batch",
        type=bool,
        default=False,
        help="Run queries in batch mode when all the test runs are done. This "
        "can considerably reduce the run time at the expense of higher "
        "memory usage (all traces will be loaded into memory at the same "
        "time)")
    parser.add_argument(
        "metrics",
        type=str,
        is_list=True,
        default=(),
        help="Name of metric to be run (can be any metric from Perfetto)")
    parser.add_argument(
        "metric_definitions",
        type=ObjectParser.str_or_file_contents,
        is_list=True,
        default=(),
        help=("Textproto for perfetto metrics v2 definition files. "
              "Can be inline textproto or a path to a .textproto file."))
    parser.add_argument(
        "summary_metrics",
        type=str,
        is_list=True,
        default=(),
        help=("Additional metrics to only include in the trace summary. "
              "Includes all of <metrics>. These can be v2 metrics if the "
              "corresponding metric definition is supplied."))
    parser.add_argument(
        "queries",
        type=TraceProcessorQueryConfig,
        is_list=True,
        default=(),
        help="Name of query to be run (under probes/trace_processor/queries) "
        "or { name: str, sql: str } containing the name and SQL query to run")
    parser.add_argument(
        "symbolize_profile",
        type=ObjectParser.bool,
        default=True,
        help="Auto symbolize data from system profiles for "
        "locally compiled browsers.")
    parser.add_argument(
        "module_paths",
        type=pth.LocalPath,
        is_list=True,
        default=(),
        help="Additional paths to include as trace processor modules.")
    parser.add_argument(
        "trace_processor_bin",
        aliases=("trace_processor",),
        type=plt.PLATFORM.parse_local_binary_path,
        help="Path to the trace_processor binary")
    parser.add_argument(
        "traceconv_bin",
        aliases=("traceconv",),
        type=plt.PLATFORM.parse_local_binary_path,
        help="Path to the perfetto traceconv binary")
    parser.add_argument(
        "llvm_symbolizer_bin",
        aliases=("llvm_symbolizer",),
        type=plt.PLATFORM.parse_local_binary_path,
        help="Path to the llvm-symbolizer binary")
    parser.add_argument(
        "dev_features",
        aliases=("dev",),
        type=ObjectParser.bool,
        default=True,
        help=("Enables trace_processor dev features via the --dev flags. "
              "Enabled by default."))
    return parser

  def __init__(
      self,
      batch: bool,
      metric_definitions: Iterable[str],
      summary_metrics: Iterable[str],
      metrics: Iterable[str],
      queries: Iterable[TraceProcessorQueryConfig],
      symbolize_profile: bool,
      module_paths: Iterable[pth.LocalPath],
      trace_processor_bin: Optional[pth.LocalPath] = None,
      traceconv_bin: Optional[pth.LocalPath] = None,
      llvm_symbolizer_bin: Optional[pth.LocalPath] = None,
      dev_features: bool = True,
  ) -> None:
    super().__init__()
    self._platform: Final[plt.Platform] = plt.PLATFORM
    self._batch: Final[bool] = batch
    self._dev_features: Final[bool] = dev_features
    self._metrics: Final[tuple[str, ...]] = tuple(metrics)
    self._metric_definitions: Final[tuple[str, ...]] = tuple(metric_definitions)
    # Summary metrics always include all plain metrics (see config help).
    self._summary_metrics: Final[tuple[
        str, ...]] = tuple(metrics) + tuple(summary_metrics)
    # Duplicate query names would make their output files collide, so
    # reject them up-front.
    ObjectParser.unique_sequence([query.name for query in queries],
                                 name="query names")
    self._queries: Final[tuple[TraceProcessorQueryConfig, ...]] = tuple(queries)
    self._symbolize_profile: Final[bool] = symbolize_profile
    # The built-in crossbench modules dir is always searched first.
    self._module_paths: Final[tuple[pth.LocalPath,
                                    ...]] = (MODULES_DIR,) + tuple(module_paths)
    # NOTE(review): TraceconvFinder is used to locate trace_processor_bin as
    # well — presumably both binaries ship together; confirm there is no
    # dedicated trace_processor finder.
    self._trace_processor_bin: Final[pth.LocalPath
                                     | None] = TraceconvFinder.local_binary(
                                         trace_processor_bin)
    self._traceconv_bin: Final[pth.LocalPath
                               | None] = TraceconvFinder.local_binary(
                                   traceconv_bin)
    self._llvm_symbolizer_bin: Final[
        pth.LocalPath
        | None] = LlvmSymbolizerFinder.local_binary(llvm_symbolizer_bin)

  @property
  def batch(self) -> bool:
    return self._batch

  @property
  def metrics(self) -> tuple[str, ...]:
    return self._metrics

  @property
  def queries(self) -> tuple[TraceProcessorQueryConfig, ...]:
    return self._queries

  @property
  def metric_definitions(self) -> tuple[str, ...]:
    return self._metric_definitions

  @property
  def summary_metrics(self) -> tuple[str, ...]:
    return self._summary_metrics

  @property
  def module_paths(self) -> tuple[pth.LocalPath, ...]:
    return self._module_paths

  @property
  def has_work(self) -> bool:
    """True iff any query, metric or summary metric is configured."""
    # Empty tuples are falsy, so plain truthiness covers the len() checks.
    return bool(self._queries or self._metrics or self._summary_metrics or
                self._metric_definitions)

  @property
  def needs_tp_run(self) -> bool:
    """Whether trace_processor must be run per individual test run."""
    return (not self.batch) and self.has_work

  @property
  def needs_btp_run(self) -> bool:
    """Whether a single batch trace_processor run is needed at the end."""
    return self._batch and self.has_work

  @property
  def trace_processor_bin(self) -> pth.LocalPath | None:
    return self._trace_processor_bin

  @property
  def traceconv_bin(self) -> pth.LocalPath | None:
    return self._traceconv_bin

  @property
  def llvm_symbolizer_bin(self) -> pth.LocalPath | None:
    return self._llvm_symbolizer_bin

  @property
  def symbolize_profile(self) -> bool:
    return self._symbolize_profile

  @property
  def tp_config(self) -> TraceProcessorConfig:
    """Builds the TraceProcessorConfig shared by single and batch runs."""
    extra_flags: list[str] = []
    if self._dev_features:
      extra_flags.append("--dev")
    # Register every module dir so `INCLUDE PERFETTO MODULE` can find them.
    for module_path in self.module_paths:
      extra_flags.append("--add-sql-module")
      extra_flags.append(str(module_path))
    return TraceProcessorConfig(
        bin_path=self.trace_processor_bin,
        ingest_ftrace_in_raw=True,
        resolver_registry=ResolverRegistry(
            resolvers=[CrossbenchTraceUriResolver, PathUriResolver]),
        load_timeout=10,
        extra_flags=extra_flags)

  @override
  def get_context_cls(self) -> Type[TraceProcessorProbeContext]:
    # TODO: enable on linux and android
    if self._platform.is_macos:
      return TraceProcessorSymbolizingProbeContext
    return TraceProcessorProbeContext

  @override
  def validate_env(self, env: RunnerEnv) -> None:
    super().validate_env(env)
    self._check_sql()

  def _check_sql(self) -> None:
    """
    Runs all metrics and queries on an empty trace. This will ensure that they
    are correctly defined in trace processor.
    """
    with TraceProcessor(trace="/dev/null", config=self.tp_config) as tp:
      for metric in self.metrics:
        tp.metric([metric])
      for query in self.queries:
        tp.query(query.sql)
      if self.summary_metrics:
        tp.trace_summary(
            specs=list(self.metric_definitions),
            metric_ids=list(self.summary_metrics))

  def _add_cb_columns(self, df: pd.DataFrame, run: Run) -> pd.DataFrame:
    """Tags query-result rows with the run they originated from."""
    df["cb_browser"] = run.browser.unique_name
    df["cb_story"] = run.story.name
    df["cb_temperature"] = run.temperature
    df["cb_run"] = run.repetition
    return df

  def _aggregate_results_by_query(
      self, runs: Iterable[Run]) -> dict[str, pd.DataFrame]:
    """Merges per-run query CSV outputs into one DataFrame per query name."""
    frames: dict[str, list[pd.DataFrame]] = collections.defaultdict(list)
    for run in runs:
      for file in run.results[self].csv_list:
        df = pd.read_csv(file)
        frames[file.stem].append(self._add_cb_columns(df, run))
    # Concatenate once per query instead of once per file: repeated
    # pd.concat calls copy already-merged data quadratically.
    return {name: pd.concat(dfs) for name, dfs in frames.items()}

  def _merge_json(self, runs: Iterable[Run]) -> dict[str, JsonDict]:
    """Merges per-run metric JSON outputs, keyed by metric file stem."""
    merged_metrics: dict[str,
                         MetricsMerger] = collections.defaultdict(MetricsMerger)
    for run in runs:
      for file_path in run.results[self].json_list:
        with file_path.open() as json_file:
          merged_metrics[file_path.stem].add(json.load(json_file))
    return {
        metric_name: merged.to_json()
        for metric_name, merged in merged_metrics.items()
    }

  @override
  def merge_browsers(self, group: BrowsersRunGroup) -> ProbeResult:
    if self.needs_btp_run:
      return self._run_btp(group)
    return self._merge_browser_files(group)

  def _merge_browser_files(self, group: BrowsersRunGroup) -> LocalProbeResult:
    """Merges the per-run CSV/JSON results produced by non-batch runs."""
    group_dir = group.get_local_probe_result_path(self)
    group_dir.mkdir()
    csv_files = []
    json_files = []
    for query, df in self._aggregate_results_by_query(group.runs).items():
      csv_file = group_dir / f"{pth.safe_filename(query)}.csv"
      df.to_csv(path_or_buf=csv_file, index=False)
      csv_files.append(csv_file)
    for metric, data in self._merge_json(group.runs).items():
      json_file = group_dir / f"{pth.safe_filename(metric)}.json"
      # "x" mode: fail loudly instead of clobbering an existing result.
      with json_file.open("x") as f:
        json.dump(data, f, indent=4)
      json_files.append(json_file)
    return LocalProbeResult(csv=csv_files, json=json_files)

  def _run_btp(self, group: BrowsersRunGroup) -> LocalProbeResult:
    """Runs all queries and metrics once over every trace in batch mode."""
    group_dir: pth.LocalPath = group.get_local_probe_result_path(self)
    group_dir.mkdir()
    btp_config = BatchTraceProcessorConfig(tp_config=self.tp_config)
    with BatchTraceProcessor(
        traces=CrossbenchTraceUriResolver(group.runs),
        config=btp_config) as btp:

      def run_query(query: TraceProcessorQueryConfig) -> pth.LocalPath:
        # Sanitize the query name like _merge_browser_files and run_metric
        # do, so unsafe names cannot escape group_dir.
        csv_file = group_dir / f"{pth.safe_filename(query.name)}.csv"
        btp.query_and_flatten(query.sql).to_csv(
            path_or_buf=csv_file, index=False)
        return csv_file

      csv_files = list(map(run_query, self.queries))

      def run_metric(metric: str) -> pth.LocalPath:
        json_file = group_dir / f"{pth.safe_filename(metric)}.json"
        protos = btp.metric([metric])
        with json_file.open("x") as f:
          for p in protos:
            f.write(MessageToJson(p))
        return json_file

      json_files = list(map(run_metric, self.metrics))
    return LocalProbeResult(csv=csv_files, json=json_files)

  @override
  def log_browsers_result(self, group: BrowsersRunGroup) -> None:
    logging.info("-" * 80)
    logging.critical("TraceProcessor merged traces:")
    for run in group.runs:
      logging.critical(" - %s", run.results[self].perfetto)
__all__ = [
"TraceProcessorProbe",
"QUERIES_DIR",
"MODULES_DIR",
]