# Copyright 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Implements the interface of the results_processor module.
Provides functions to process intermediate results, and the entry point to
the standalone version of Results Processor.
"""
import json
import logging
import os
import random
import re
from py_utils import cloud_storage
from core.results_processor import command_line
from core.results_processor import compute_metrics
from core.results_processor import formatters
from core.results_processor import util
from tracing.trace_data import trace_data
from tracing.value.diagnostics import date_range
from tracing.value.diagnostics import generic_set
from tracing.value.diagnostics import reserved_infos
from tracing.value import histogram
from tracing.value import histogram_set
from tracing.value import legacy_unit_info
TELEMETRY_RESULTS = '_telemetry_results.jsonl'
MEASUREMENTS_NAME = 'measurements.json'
FORMATS_WITH_METRICS = ['csv', 'histograms', 'html']
def ProcessResults(options):
"""Process intermediate results and produce the requested outputs.
  This function takes the intermediate results generated by Telemetry after
  running benchmarks (including artifacts such as traces), and processes
  them as requested by the result processing options.
  Args:
    options: An options object with values parsed from the command line and
      adjusted by ProcessOptions.
  """
if not getattr(options, 'output_formats', None):
return 0
intermediate_results = _LoadIntermediateResults(
os.path.join(options.intermediate_dir, TELEMETRY_RESULTS))
AggregateTraces(intermediate_results)
UploadArtifacts(
intermediate_results, options.upload_bucket, options.results_label)
if any(fmt in FORMATS_WITH_METRICS for fmt in options.output_formats):
histogram_dicts = _ComputeMetrics(intermediate_results,
options.results_label)
for output_format in options.output_formats:
logging.info('Processing format: %s', output_format)
formatter = formatters.FORMATTERS[output_format]
if output_format in FORMATS_WITH_METRICS:
formatter.ProcessHistogramDicts(histogram_dicts, options)
else:
formatter.ProcessIntermediateResults(intermediate_results, options)
return GenerateExitCode(intermediate_results)
def GenerateExitCode(intermediate_results):
"""Generate an exit code as expected by callers.
Returns:
1 if there were failed tests.
-1 if all tests were skipped.
0 otherwise.
"""
if any(r['status'] == 'FAIL' for r in intermediate_results['testResults']):
return 1
if all(r['status'] == 'SKIP' for r in intermediate_results['testResults']):
return -1
return 0
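# The intermediate results file is newline-delimited JSON. As an illustrative
# sketch (field names reflect their uses in this module; the authoritative
# schema is defined by Telemetry), each line holds one of two record shapes:
#
#   {"benchmarkRun": {"startTime": ..., "diagnostics": ...}}
#   {"testResult": {"testPath": ..., "status": ..., "tags": ...,
#                   "outputArtifacts": ...}}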
def _LoadIntermediateResults(intermediate_file):
"""Load intermediate results from a file into a single dict."""
results = {'benchmarkRun': {}, 'testResults': []}
with open(intermediate_file) as f:
for line in f:
record = json.loads(line)
if 'benchmarkRun' in record:
results['benchmarkRun'].update(record['benchmarkRun'])
if 'testResult' in record:
test_result = record['testResult']
results['testResults'].append(test_result)
return results
def _AggregateTraceWorker(artifacts):
  """Merge the trace artifacts of one test run into a single HTML trace."""
  traces = [name for name in artifacts if name.startswith('trace/')]
trace_files = [artifacts.pop(name)['filePath'] for name in traces]
html_path = os.path.join(
os.path.dirname(os.path.commonprefix(trace_files)),
compute_metrics.HTML_TRACE_NAME)
trace_data.SerializeAsHtml(trace_files, html_path)
artifacts[compute_metrics.HTML_TRACE_NAME] = {
'filePath': html_path,
'contentType': 'text/html',
}
def AggregateTraces(intermediate_results):
"""Replace individual traces with an aggregate one for each test result.
For each test run with traces, generates an aggregate HTML trace. Removes
all entries for individual traces and adds one entry for aggregate one.
"""
work_list = []
for result in intermediate_results['testResults']:
artifacts = result.get('outputArtifacts', {})
    # TODO(crbug.com/981349): Stop checking for HTML_TRACE_NAME once Telemetry
    # no longer aggregates traces.
if (any(name.startswith('trace/') for name in artifacts) and
compute_metrics.HTML_TRACE_NAME not in artifacts):
work_list.append(artifacts)
if work_list:
for _ in util.ApplyInParallel(_AggregateTraceWorker, work_list):
pass
  # TODO(crbug.com/981349): This cleans up traces that have already been
  # aggregated by Telemetry. Remove it once Telemetry no longer does this.
  for result in intermediate_results['testResults']:
    artifacts = result.get('outputArtifacts', {})
    for name in list(artifacts):
      if name.startswith('trace/'):
        del artifacts[name]
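# Illustrative example of the identifier built below (the label, timestamp and
# random suffix are made up): a results_label of 'my label' and a startTime of
# '2019-10-01T12:30:42.123Z' yield something like
# 'my_label_20191001T123042_54321'.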
def _RunIdentifier(results_label, start_time):
"""Construct an identifier for the current script run"""
if results_label:
identifier_parts = [re.sub(r'\W+', '_', results_label)]
else:
identifier_parts = []
# Time is rounded to seconds and delimiters are removed.
# The first 19 chars of the string match 'YYYY-MM-DDTHH:MM:SS'.
identifier_parts.append(re.sub(r'\W+', '', start_time[:19]))
  identifier_parts.append(str(random.randint(1, 10**5)))
return '_'.join(identifier_parts)
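# UploadArtifacts stores each artifact in the upload bucket under
# '<run identifier>/<test path>/<artifact name>'. An illustrative, made-up
# example of a resulting remote name:
#   'my_label_20191001T123042_54321/system_health/load_search/trace.html'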
def UploadArtifacts(intermediate_results, upload_bucket, results_label):
"""Upload all artifacts to cloud.
For each test run, uploads all its artifacts to cloud and sets remoteUrl
fields in intermediate_results.
"""
if upload_bucket is None:
return
run_identifier = _RunIdentifier(
results_label, intermediate_results['benchmarkRun']['startTime'])
work_list = []
for result in intermediate_results['testResults']:
artifacts = result.get('outputArtifacts', {})
    for name, artifact in artifacts.items():
if 'remoteUrl' in artifact:
continue
      # TODO(crbug.com/981349): Remove this check once Telemetry no longer
      # saves histograms as an artifact.
if name == compute_metrics.HISTOGRAM_DICTS_FILE:
continue
remote_name = '/'.join([run_identifier, result['testPath'], name])
work_list.append((artifact, remote_name))
def PoolUploader(work_item):
artifact, remote_name = work_item
artifact['remoteUrl'] = cloud_storage.Insert(
upload_bucket, remote_name, artifact['filePath'])
for _ in util.ApplyInParallel(PoolUploader, work_list):
pass
  for result in intermediate_results['testResults']:
    artifacts = result.get('outputArtifacts', {})
    for name, artifact in artifacts.items():
      if 'remoteUrl' not in artifact:
        continue
      logging.info('Uploaded %s of %s to %s', name, result['testPath'],
                   artifact['remoteUrl'])
def _ComputeMetrics(intermediate_results, results_label):
histogram_dicts = compute_metrics.ComputeTBMv2Metrics(intermediate_results)
histogram_dicts += ExtractMeasurements(intermediate_results)
histogram_dicts = AddDiagnosticsToHistograms(
histogram_dicts, intermediate_results, results_label)
return histogram_dicts
def AddDiagnosticsToHistograms(histogram_dicts, intermediate_results,
results_label):
"""Add diagnostics to histogram dicts"""
histograms = histogram_set.HistogramSet()
histograms.ImportDicts(histogram_dicts)
diagnostics = intermediate_results['benchmarkRun'].get('diagnostics', {})
for name, diag in diagnostics.items():
# For now, we only support GenericSet diagnostics that are serialized
# as lists of values.
assert isinstance(diag, list)
histograms.AddSharedDiagnosticToAllHistograms(
name, generic_set.GenericSet(diag))
if results_label is not None:
histograms.AddSharedDiagnosticToAllHistograms(
reserved_infos.LABELS.name,
generic_set.GenericSet([results_label]))
histograms.DeduplicateDiagnostics()
return histograms.AsDicts()
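# Illustrative sketch of an ad-hoc measurement and its conversion below (the
# 'unit', 'samples' and 'description' keys are the ones read by the code; the
# name and values are made up):
#
#   MeasurementToHistogram('load_time',
#                          {'unit': 'ms', 'samples': [12.3, 45.6],
#                           'description': 'Time to load the page.'})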
def MeasurementToHistogram(name, measurement):
unit = measurement['unit']
samples = measurement['samples']
description = measurement.get('description')
if unit in legacy_unit_info.LEGACY_UNIT_INFO:
info = legacy_unit_info.LEGACY_UNIT_INFO[unit]
unit = info.name
samples = [s * info.conversion_factor for s in samples]
if unit not in histogram.UNIT_NAMES:
raise ValueError('Unknown unit: %s' % unit)
return histogram.Histogram.Create(name, unit, samples,
description=description)
def _GlobalDiagnostics(benchmark_run):
"""Extract diagnostics information about the whole benchmark run.
These diagnostics will be added to ad-hoc measurements recorded by
benchmarks.
"""
timestamp_ms = util.IsoTimestampToEpoch(benchmark_run['startTime']) * 1e3
return {
reserved_infos.BENCHMARK_START.name: date_range.DateRange(timestamp_ms),
}
def _StoryDiagnostics(test_result):
"""Extract diagnostics information about the specific story.
These diagnostics will be added to ad-hoc measurements recorded by
benchmarks.
"""
benchmark_name, story_name = test_result['testPath'].split('/', 1)
story_tags = [tag['value'] for tag in test_result.get('tags', [])
if tag['key'] == 'story_tag']
return {
reserved_infos.BENCHMARKS.name: generic_set.GenericSet([benchmark_name]),
reserved_infos.STORIES.name: generic_set.GenericSet([story_name]),
reserved_infos.STORY_TAGS.name: generic_set.GenericSet(story_tags),
}
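# The MEASUREMENTS_NAME artifact is expected to be a JSON file with a top-level
# 'measurements' mapping from measurement name to the per-measurement dict
# accepted by MeasurementToHistogram above. A minimal, made-up sketch:
#
#   {"measurements": {"load_time": {"unit": "ms", "samples": [12.3, 45.6]}}}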
def ExtractMeasurements(intermediate_results):
"""Add ad-hoc measurements to histogram dicts"""
histograms = histogram_set.HistogramSet()
global_diagnostics = _GlobalDiagnostics(intermediate_results['benchmarkRun'])
for result in intermediate_results['testResults']:
artifacts = result.get('outputArtifacts', {})
if MEASUREMENTS_NAME in artifacts:
with open(artifacts[MEASUREMENTS_NAME]['filePath']) as f:
measurements = json.load(f)['measurements']
diagnostics = global_diagnostics.copy()
diagnostics.update(_StoryDiagnostics(result))
      for name, measurement in measurements.items():
histograms.AddHistogram(MeasurementToHistogram(name, measurement),
diagnostics=diagnostics)
return histograms.AsDicts()
def main(args=None):
"""Entry point for the standalone version of the results_processor script."""
parser = command_line.ArgumentParser(standalone=True)
options = parser.parse_args(args)
command_line.ProcessOptions(options, standalone=True)
return ProcessResults(options)