blob: 2e609f98e4fea719a95fd060063e7bb343e01cd6 [file] [log] [blame]
# Copyright 2025 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import zipfile
from typing import TYPE_CHECKING, Optional
from google.protobuf import text_format
from google.protobuf.json_format import MessageToJson
from perfetto.trace_processor.api import TraceProcessor
from crossbench import path as pth
from crossbench.probes.probe_context import ProbeContext
from crossbench.probes.results import EmptyProbeResult, LocalProbeResult, \
ProbeResult
from crossbench.probes.trace_processor.uri_resolver import \
CrossbenchTraceUriResolver
if TYPE_CHECKING:
from crossbench.probes.trace_processor.query_config import \
TraceProcessorQueryConfig
from crossbench.probes.trace_processor.trace_processor import \
TraceProcessorProbe # noqa: F401
class TraceProcessorProbeContext(ProbeContext["TraceProcessorProbe"]):
  """Probe context that post-processes a run's traces in teardown().

  The traces collected for the run are bundled into `merged_trace_path` and,
  if the probe requests it, a TraceProcessor instance is used to write query
  results (CSV), metrics (JSON) and trace summaries (proto / textproto) into
  the local result directory.
  """

  def get_default_result_path(self) -> pth.AnyPath:
    """Returns the parent's default result path, created as a directory."""
    result_dir = super().get_default_result_path()
    self.host_platform.mkdir(result_dir)
    return result_dir

  def setup(self) -> None:
    # Nothing to prepare: all work happens in teardown().
    pass

  def start(self) -> None:
    # Nothing to start: this context only post-processes existing traces.
    pass

  def stop(self) -> None:
    # Nothing to stop: see start().
    pass

  def teardown(self) -> ProbeResult:
    """Merges the run's traces, then runs the optional trace processor step.

    The merge is evaluated first; its result is combined with whatever
    `_maybe_run_tp()` produces.
    """
    return self._merge_trace_files().merge(self._maybe_run_tp())

  def _merge_trace_files(self) -> LocalProbeResult:
    """Bundles all traces of the run under `merged_trace_path`.

    Returns:
      A LocalProbeResult listing `merged_trace_path` as its perfetto file.
    """
    with self.run.actions("TRACE_PROCESSOR: Merging trace files", verbose=True):
      traces = list(self.run.results.all_traces())
      if len(traces) == 1:
        # Symlink the existing trace to save time and space. Note that in this
        # case the ".zip" path refers to the trace itself, not to an archive.
        self.host_platform.symlink_or_copy(traces[0], self.merged_trace_path)
      else:
        # Also reached with zero traces, which produces an empty archive.
        with zipfile.ZipFile(self.merged_trace_path, "w") as zip_file:
          for f in traces:
            # Store entries relative to the run's output directory.
            zip_file.write(f, arcname=f.relative_to(self.run.out_dir))
    return LocalProbeResult(perfetto=(self.merged_trace_path,))

  def _maybe_run_tp(self) -> ProbeResult:
    """Runs queries, metrics and trace summary if the probe needs them.

    Returns:
      EmptyProbeResult when `probe.needs_tp_run` is falsy, otherwise the merged
      results of the three trace processor steps.
    """
    if not self.probe.needs_tp_run:
      return EmptyProbeResult()
    # The return stays inside the `with` block: all three steps need the live
    # TraceProcessor instance.
    with TraceProcessor(
        trace=CrossbenchTraceUriResolver(self),
        config=self.probe.tp_config) as tp:
      return self._run_queries(tp).merge(self._run_metrics(tp)).merge(
          self._summarize_trace(tp))

  def _run_queries(self, tp: TraceProcessor) -> LocalProbeResult:
    """Runs every query of the probe and stores each result as a CSV file.

    Args:
      tp: The trace processor instance to run the queries on.

    Returns:
      A LocalProbeResult with one `<query.name>.csv` file per query.
    """

    def run_query(query: TraceProcessorQueryConfig) -> pth.LocalPath:
      csv_file = self.local_result_path / f"{query.name}.csv"
      # index=False: do not emit the dataframe's row index as a column.
      tp.query(query.sql).as_pandas_dataframe().to_csv(
          path_or_buf=csv_file, index=False)
      return csv_file

    with self.run.actions("TRACE_PROCESSOR: Running queries", verbose=True):
      files = tuple(map(run_query, self.probe.queries))
    return LocalProbeResult(csv=files)

  def _run_metrics(self, tp: TraceProcessor) -> LocalProbeResult:
    """Computes every metric of the probe and stores each one as JSON.

    Args:
      tp: The trace processor instance to compute the metrics with.

    Returns:
      A LocalProbeResult with one `<safe metric name>.json` file per metric.

    Raises:
      AssertionError: If the JSON file for a metric already exists.
    """

    def run_metric(metric: str) -> pth.LocalPath:
      json_file = self.local_result_path / f"{pth.safe_filename(metric)}.json"
      # Explicit check rather than an `assert` statement so that the guard is
      # not stripped under `python -O`. It runs before the metric is computed
      # so that a name clash fails without doing the trace processor work.
      if json_file.exists():
        raise AssertionError(
            f"Cannot override previously generated metric {json_file}")
      proto = tp.metric([metric])
      json_file.write_text(MessageToJson(proto))
      return json_file

    with self.run.actions("TRACE_PROCESSOR: Running metrics", verbose=True):
      files = tuple(map(run_metric, self.probe.metrics))
    return LocalProbeResult(json=files)

  def _summarize_trace(self, tp: TraceProcessor) -> ProbeResult:
    """Runs the trace summary and stores it as binary and text proto.

    Args:
      tp: The trace processor instance to compute the summary with.

    Returns:
      EmptyProbeResult when the probe has neither summary metrics nor metric
      definitions, otherwise a LocalProbeResult with `v2_metrics.pb` and
      `v2_metrics.textproto`.
    """
    if not self.probe.summary_metrics and not self.probe.metric_definitions:
      return EmptyProbeResult()
    with self.run.actions(
        "TRACE_PROCESSOR: Running trace summary", verbose=True):
      # Trace processor interprets an empty list as 'emit no metrics' and
      # 'None' as emit all metrics specified in the metric definitions.
      # When no metric IDs are explicitly given, default to the more
      # sensible option of emitting every metric.
      metric_ids: Optional[list[str]] = None
      if self.probe.summary_metrics:
        metric_ids = list(self.probe.summary_metrics)
      proto_result = tp.trace_summary(
          specs=list(self.probe.metric_definitions), metric_ids=metric_ids)
      # Store the same message twice: compact binary and readable text form.
      proto_file = self.local_result_path / "v2_metrics.pb"
      proto_file.write_bytes(proto_result.SerializeToString())
      textproto_file = self.local_result_path / "v2_metrics.textproto"
      textproto_file.write_bytes(text_format.MessageToBytes(proto_result))
    return LocalProbeResult(file=[proto_file, textproto_file])

  @property
  def merged_trace_path(self) -> pth.LocalPath:
    """Path of the merged trace inside the local result directory."""
    return self.local_result_path / "merged_trace.zip"

  @property
  def symbolized_trace_path(self) -> pth.LocalPath:
    """Path for a symbolized trace inside the local result directory.

    Not referenced by any other member of this class.
    """
    return self.local_result_path / "symbolized_trace_path.zip"