#!/usr/bin/env vpython
# -*- coding: utf-8 -*-
# Copyright 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import csv
import datetime
import errno
import json
import multiprocessing
import os
import platform
import re
import shutil
import subprocess
import sys
import traceback
DESCRIPTION = (
"""Tool to download and aggregate traces from a given Pinpoint job.
The tool is intended to be used in two steps:
Step 1: Fetch all traces from a Pinpoint job. Example:
> pinpoint_traces.py fetch \\
--job=https://pinpoint-dot-chromeperf.appspot.com/job/1709f5ff920000
Step 2: Extract all events with a matching name into a CSV file. Example:
> pinpoint_traces.py extract --event='.*Looper.dispatch.*' --output events.csv
NOTE: Currently only supported on Linux.
WARNING: Uses multiprocessing, with heavy memory and CPU use in each
         subprocess. This makes SIGINT handling (pressing ^C) unreliable;
         instead, suspend the job with ^Z and use shell job control to kill or
         resume it (for example: kill %1).
""")
THIS_DIRNAME = os.path.dirname(os.path.abspath(__file__))
PINPOINT_CLI = os.path.join(THIS_DIRNAME, os.pardir, 'pinpoint_cli')
HTML2TRACE = os.path.join(THIS_DIRNAME, os.pardir, os.pardir, os.pardir,
'third_party/catapult/tracing/bin/html2trace')
TRACE_PREFIX = 'https://console.developers.google.com/m/cloudstorage/b/chrome-telemetry-output/o/'
SUPPORTED_PHASES = set(['B', 'E', 'X'])
def ParseArgs():
parser = argparse.ArgumentParser(
prog='PINPOINT_TRACES',
formatter_class=argparse.RawTextHelpFormatter,
description=DESCRIPTION)
subparsers = parser.add_subparsers(dest='command')
# Subcommand to fetch traces created by a Pinpoint job.
parser_fetch_traces = subparsers.add_parser(
'fetch', help='Fetches job_results.csv and the traces mentioned in it')
required_named_fetch = parser_fetch_traces.add_argument_group(
'required named arguments')
required_named_fetch.add_argument('--job',
required=True,
help='Pinpoint job ID or URL')
# Subcommand to extract events from all fetched traces.
parser_extract_event = subparsers.add_parser(
'extract',
      help=('Extracts information about an event from all downloaded traces '
            'into a CSV file'))
required_named_extract = parser_extract_event.add_argument_group(
'required named arguments')
required_named_extract.add_argument('-e',
'--event',
required=True,
help='Regex to match event names against')
required_named_extract.add_argument('-o',
'--output',
required=True,
help='CSV output file name')
return parser.parse_args()
def EnsureDirectoryExists(path):
try:
os.makedirs(path)
except OSError as e:
if e.errno != errno.EEXIST:
raise
return path
def LastAfterSlash(name):
assert platform.system() == 'Linux'
return name.rsplit('/', 1)[-1]
def FetchTraces(job_id):
"""Fetches traces from the cloud storage and stores then in a new directory.
In order to discover the trace files, fetches the 'job results' file
(job_results.csv) using pinpoint_cli. Extracts all the trace file names from
the job results and fetches them from the predefined cloud storage bucket.
The structure of the directory is like:
~/.local/share/pinpoint_traces/latest/
├── job_results.csv
├── chromium@4efc7af
│   ├── trace1.html
│   ├── trace2.html
├── chromium@5707819
│   ├── trace3.html
│ │
│ ...
...
Args:
job_id: (str) Pinpoint job ID, extracted from the job URL.
Returns:
Full path to the job results file.
"""
timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
top_dir = os.path.expanduser('~/.local/share/pinpoint_traces')
working_dir = os.path.join(top_dir, timestamp)
assert not os.path.exists(working_dir), ('Frequent invocations are not '
'supported')
EnsureDirectoryExists(working_dir)
subprocess.check_call([PINPOINT_CLI, 'get-csv', job_id], cwd=working_dir)
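  # Atomically repoint the 'latest' symlink at the new timestamped directory:
  # create a temporary symlink first, then rename() it over the old one.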
tmp_latest = os.path.join(top_dir, 'tmp-latest')
real_latest = os.path.join(top_dir, 'latest')
try:
os.remove(tmp_latest)
except OSError:
pass
os.symlink(timestamp, tmp_latest)
os.rename(tmp_latest, real_latest)
job_results = os.path.join(real_latest, 'job_results.csv')
assert os.path.exists(job_results)
cloud_traces = set()
with open(job_results) as csv_file:
reader = csv.DictReader(csv_file)
for row in reader:
cloud_traces.add((row['run_label'], row['trace_url']))
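  # Download each trace from the bucket with gsutil into a temporary
  # directory, then move it into a per-run-label directory under a name
  # derived from its cloud path.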
for run_label, cloud_path in cloud_traces:
assert cloud_path.startswith(TRACE_PREFIX)
cloud_path = cloud_path[len(TRACE_PREFIX):]
assert LastAfterSlash(cloud_path) == 'trace.html'
trace_file_name = cloud_path.replace('/', '_')
cloud_path = 'gs://chrome-telemetry-output/' + cloud_path
tmp_dir = EnsureDirectoryExists(os.path.join(working_dir, 'tmp'))
subprocess.check_call(['gsutil.py', 'cp', cloud_path, tmp_dir])
run_label_dir = EnsureDirectoryExists(os.path.join(working_dir, run_label))
full_trace_file_name = os.path.join(run_label_dir, trace_file_name)
os.rename(os.path.join(tmp_dir, 'trace.html'), full_trace_file_name)
return job_results
class LeanEvent(object):
"""Event that holds only necessary information extracted from JSON."""
def __init__(self, trace_event):
self.name = trace_event['name']
self.phase = trace_event['ph']
self.ts = trace_event['ts']
self.tdur = 0
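    # Complete ('X') events carry their own duration ('dur') and, optionally,
    # a thread duration ('tdur'); durations for 'B'/'E' pairs are computed
    # later.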
if self.phase == 'X':
self.dur = trace_event['dur']
if 'tdur' in trace_event:
self.tdur = trace_event['tdur']
def LoadJsonFile(json_file, compiled_re):
"""From the JSON file takes only the events that match.
Note: This function loads a typical JSON file very slowly, for several
seconds. Most time is spent in the Python json module.
Args:
json_file: (str) Path to the JSON file to load
    compiled_re: A compiled regular expression; an event is kept only if its
      |name| matches.
Returns:
A dict with this internal structure:
{ tid: { (int) ts: [event, ...] } },
where:
tid: (int) Thread ID of all the events it maps to.
ts: (int) Trace timestamp (in microseconds) of the beginning of the event.
    event: (LeanEvent) One extracted event, holding only the fields needed
      for the CSV output; 'begin' and 'end' events are paired into a single
      entry later, in ExtractFromOneTraceThrowing().
"""
with open(json_file) as f:
events = json.load(f)
tids = {}
tid_to_pid = {}
for event in events['traceEvents']:
assert 'ph' in event
assert 'ts' in event
phase = event['ph']
    if phase not in SUPPORTED_PHASES:
continue
assert 'tid' in event
assert 'pid' in event
tid = event['tid']
pid = event['pid']
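    # Sanity check: a thread ID must belong to exactly one process in the
    # trace, otherwise events could not be grouped by TID alone.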
old_pid = tid_to_pid.setdefault(tid, pid)
    assert pid == old_pid, 'One TID maps to two different PIDs'
event_name = event['name']
if compiled_re.match(event_name):
ts_to_events = tids.setdefault(tid, {})
ts = event['ts']
event_list = ts_to_events.setdefault(ts, [])
event_list.append(LeanEvent(event))
return tids
def ExtractFromOneTraceThrowing(args):
"""Extracts events from a given trace file.
  First, converts the trace (fetched in HTML form) into a number of JSON files
  (usually 2) with the Catapult tool 'html2trace'. The JSON files produced by
  the conversion are cached for later invocations of this function. With the
  JSON cache, the structure of the working directory becomes:
~/.local/share/pinpoint_traces/latest/
├── job_results.csv
├── chromium@4efc7af
│   ├── trace1.html
│   ├── trace2.html
│   └── json_cache
│   ├── trace1
│   │   ├── json
│   │   └── json.1
│   ├── trace2
│   │   ├── json
│   │   ├── json.1
│   │   └── json.2
├── chromium@5707819
│ │
│ ...
...
  Since multiple JSON files are produced for a single HTML file, the events
  loaded from them are merged. The merged representation is then scanned to
  pair begin and end events so that their durations can be computed.
  Args: (passed as a single tuple to allow easier posting to multiprocessing.Pool)
label: (str) A label corresponding to the revision of the code, for example,
'chromium@4efc7af'.
trace_index: (int) The number uniquely identifying the trace html that the
JSON was generated from.
html_trace: (str) Path to the original downloaded trace file.
    json_dir: (str) The directory in which to look for the cached JSON files.
compiled_regex: Compiled regular expression, as in LoadJsonFile().
Returns:
    A tuple (label, trace_index, events), where |events| is a list of tuples
    (event_name, ts, duration, thread_duration) - the fields that will be
    written to the final CSV file.
"""
(label, trace_index, html_trace, json_dir, compiled_regex) = args
if not os.path.exists(json_dir):
print 'Converting trace to json: {}'.format(json_dir)
tmp_json_dir = json_dir + '.tmp'
if os.path.exists(tmp_json_dir) and os.path.isdir(tmp_json_dir):
shutil.rmtree(tmp_json_dir)
EnsureDirectoryExists(tmp_json_dir)
json_file = os.path.join(tmp_json_dir, 'json')
output = subprocess.check_output([HTML2TRACE, html_trace, json_file])
for line in output.splitlines():
assert LastAfterSlash(line).startswith('json')
# Rename atomically to make sure that the JSON cache is not consumed
# partially.
os.rename(tmp_json_dir, json_dir)
  # Since several JSON files are produced from the same trace, an event can
  # begin in one JSON file and end in another.
# Step 1: Merge events from all traces.
merged_tids = None
print 'Taking trace: {}'.format(html_trace)
for json_file in os.listdir(json_dir):
if not json_file.startswith('json'):
print '{} not starting with json'.format(json_file)
continue
full_json_file = os.path.join(json_dir, json_file)
tids = LoadJsonFile(full_json_file, compiled_regex)
if not merged_tids:
merged_tids = tids
else:
for tid, ts_to_events in tids.iteritems():
merged_ts_to_events = merged_tids.setdefault(tid, {})
for ts, events in ts_to_events.iteritems():
merged_event_list = merged_ts_to_events.setdefault(ts, [])
merged_event_list.extend(events)
# Step 2: Sort by ts and compute event durations.
csv_events = []
for tid, ts_to_events in merged_tids.iteritems():
begin_events = {}
for ts, events in sorted(ts_to_events.iteritems()):
for event in events:
duration = 0
thread_duration = 0
assert ts == event.ts
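        # Stash 'B' events until the matching 'E' arrives. This assumes that
        # same-named events do not nest within a thread (enforced by the
        # assert below).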
if event.phase == 'B':
          assert event.name not in begin_events
begin_events[event.name] = event
continue
elif event.phase == 'E':
begin_event = begin_events[event.name]
duration = event.ts - begin_event.ts
del begin_events[event.name]
else:
assert event.phase == 'X'
duration = event.dur
thread_duration = event.tdur
csv_events.append((event.name, ts, duration, thread_duration))
return (label, trace_index, csv_events)
def ExtractFromOneTrace(args):
try:
return ExtractFromOneTraceThrowing(args)
except:
# Print the stack trace that otherwise is not visible for this function
# because it is invoked by multiprocessing.Pool.
traceback.print_exc(file=sys.stderr)
raise
def OutputCsv(trace_tuples, writer):
for (label, trace_index, csv_events) in trace_tuples:
for (name, ts, duration, thread_duration) in csv_events:
writer.writerow([label, trace_index, name, ts, duration, thread_duration])
def ExtractEvents(regex, working_dir, csv_path):
"""Extracts all matching events and outputs them into a CSV file.
Finds the trace files by scanning the working directory.
Args:
regex: A regular expression. Selects an event if its name matches.
working_dir: (str) Path to a working directory with structure as explained
in ExtractFromOneTraceThrowing().
csv_path: (str) Name of the output file.
"""
compiled_regex = re.compile(regex)
tasks = []
for run_label in os.listdir(working_dir):
full_dir = os.path.join(working_dir, run_label)
if not os.path.isdir(full_dir):
continue
    trace_index = -1  # ID of the HTML trace; its JSON files get merged.
for html in os.listdir(full_dir):
html_trace = os.path.join(full_dir, html)
if not os.path.isfile(html_trace) or not html_trace.endswith('.html'):
continue
trace_index += 1
json_cache = os.path.join(full_dir, 'json_cache')
      # Note: str.rstrip() strips a set of characters, not a suffix, so slice
      # off the '.html' extension explicitly.
      json_dir = os.path.join(json_cache, html[:-len('.html')])
tasks.append(
(run_label, trace_index, html_trace, json_dir, compiled_regex))
# Since multiprocessing accumulates all data in memory until the last task in
# the pool has terminated, run it in batches to avoid swapping.
batch_size = multiprocessing.cpu_count()
if sys.platform == 'win32':
# TODO(crbug.com/1190269) - we can't use more than 56
# cores on Windows or Python3 may hang.
batch_size = min(batch_size, 56)
pool = multiprocessing.Pool(batch_size)
with open(csv_path, 'w') as csv_file:
writer = csv.writer(csv_file)
writer.writerow(
['label', 'trace_index', 'name', 'ts', 'duration', 'thread_duration'])
while tasks:
batch = tasks[:batch_size]
tasks = tasks[batch_size:]
OutputCsv(pool.map(ExtractFromOneTrace, batch), writer)
def main():
args = ParseArgs()
if args.command == 'fetch':
print 'Fetching job_results.csv'
csv_file = FetchTraces(LastAfterSlash(args.job))
print 'See job results in {}'.format(csv_file)
elif args.command == 'extract':
top_dir = os.path.expanduser('~/.local/share/pinpoint_traces')
working_dir = os.path.join(top_dir,
os.readlink(os.path.join(top_dir, 'latest')))
assert os.path.exists(working_dir), 'Broken symlink'
ExtractEvents(args.event, working_dir, args.output)
print 'Wrote output: {}'.format(args.output)
return 0
if __name__ == '__main__':
sys.exit(main())