blob: 4e22cd5fb42ed64ff4fcc82e1d2764430ae46208 [file] [log] [blame]
# Copyright 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Implements the interface of the results_processor module.
Provides functions to process intermediate results, and the entry point to
the standalone version of Results Processor.
"""
import json
import logging
import os
import random
import re
from py_utils import cloud_storage
from core.results_processor import command_line
from core.results_processor import compute_metrics
from core.results_processor import formatters
from core.results_processor import util
from tracing.trace_data import trace_data
from tracing.value.diagnostics import generic_set
from tracing.value.diagnostics import reserved_infos
from tracing.value import histogram_set
# Name of the JSON-lines file with Telemetry's intermediate results, looked up
# inside options.intermediate_dir.
TELEMETRY_RESULTS = '_telemetry_results.jsonl'
# Output formats that require histogram dicts (metrics) to be computed first.
FORMATS_WITH_METRICS = ['csv', 'histograms', 'html']
def ProcessResults(options):
  """Process intermediate results and produce the requested outputs.

  This function takes the intermediate results generated by Telemetry after
  running benchmarks (including artifacts such as traces, etc.), and processes
  them as requested by the result processing options.

  Args:
    options: An options object with values parsed from the command line and
      after any adjustments from ProcessOptions were applied.

  Returns:
    An exit code: 1 on failures, -1 if all tests were skipped, 0 otherwise
    (see GenerateExitCode).
  """
  if not getattr(options, 'output_formats', None):
    return 0

  intermediate_results = _LoadIntermediateResults(
      os.path.join(options.intermediate_dir, TELEMETRY_RESULTS))
  AggregateTraces(intermediate_results)
  UploadArtifacts(
      intermediate_results, options.upload_bucket, options.results_label)

  # Metrics are computed once and shared by all formats that need them.
  needs_metrics = any(
      fmt in FORMATS_WITH_METRICS for fmt in options.output_formats)
  if needs_metrics:
    histogram_dicts = _ComputeMetrics(intermediate_results,
                                      options.results_label)

  for output_format in options.output_formats:
    logging.info('Processing format: %s', output_format)
    formatter = formatters.FORMATTERS[output_format]
    if output_format in FORMATS_WITH_METRICS:
      formatter.ProcessHistogramDicts(histogram_dicts, options)
    else:
      formatter.ProcessIntermediateResults(intermediate_results, options)

  return GenerateExitCode(intermediate_results)
def GenerateExitCode(intermediate_results):
  """Generate an exit code as expected by callers.

  Returns:
    1 if there were failed tests.
    -1 if all tests were skipped (also when there were no tests at all).
    0 otherwise.
  """
  statuses = [result['status']
              for result in intermediate_results['testResults']]
  if 'FAIL' in statuses:
    return 1
  # Note: vacuously true for an empty status list, matching all().
  if all(status == 'SKIP' for status in statuses):
    return -1
  return 0
def _LoadIntermediateResults(intermediate_file):
  """Load intermediate results from a JSON-lines file into a single dict.

  Each line of the file is a JSON record holding either a 'benchmarkRun'
  fragment (all fragments are merged into one dict) or a 'testResult'
  (collected into a list, preserving file order).
  """
  results = {'benchmarkRun': {}, 'testResults': []}
  with open(intermediate_file) as f:
    records = [json.loads(line) for line in f]
  for record in records:
    if 'benchmarkRun' in record:
      results['benchmarkRun'].update(record['benchmarkRun'])
    if 'testResult' in record:
      test_result = record['testResult']
      # TODO(crbug.com/1011813): This is for compatibility with old version
      # of LUCI format. Remove it when Telemetry switches to a new version.
      if 'artifacts' in test_result:
        test_result['outputArtifacts'] = test_result.pop('artifacts')
      results['testResults'].append(test_result)
  return results
def AggregateTraces(intermediate_results):
  """Replace individual traces with an aggregate one for each test result.

  For each test run with traces, generates an aggregate HTML trace. Removes
  all entries for individual traces and adds one entry for the aggregate one.
  """
  for result in intermediate_results['testResults']:
    artifacts = result.get('outputArtifacts', {})
    trace_names = [name for name in artifacts if name.startswith('trace/')]
    if not trace_names:
      continue
    if compute_metrics.HTML_TRACE_NAME not in artifacts:
      trace_files = [artifacts[name]['filePath'] for name in trace_names]
      # Write the aggregate next to the common ancestor directory of the
      # individual trace files.
      html_path = os.path.join(
          os.path.dirname(os.path.commonprefix(trace_files)),
          compute_metrics.HTML_TRACE_NAME)
      trace_data.SerializeAsHtml(trace_files, html_path)
      artifacts[compute_metrics.HTML_TRACE_NAME] = {
          'filePath': html_path,
          'contentType': 'text/html',
      }
    for name in trace_names:
      del artifacts[name]
def _RunIdentifier(results_label, start_time):
  """Construct an identifier for the current script run.

  Args:
    results_label: An optional label; non-word character runs are replaced
      with underscores. Omitted from the identifier when falsy.
    start_time: An ISO-8601-like timestamp string; only the first 19
      characters ('YYYY-MM-DDTHH:MM:SS') are used.

  Returns:
    A string of underscore-joined parts: '[label_]YYYYMMDDTHHMMSS_<random>'.
  """
  if results_label:
    identifier_parts = [re.sub(r'\W+', '_', results_label)]
  else:
    identifier_parts = []
  # Time is rounded to seconds and delimiters are removed.
  # The first 19 chars of the string match 'YYYY-MM-DDTHH:MM:SS'.
  identifier_parts.append(re.sub(r'\W+', '', start_time[:19]))
  # randint requires int bounds; the previous float literal 1e5 is deprecated
  # in Python 3.10 and raises in 3.12.
  identifier_parts.append(str(random.randint(1, 10 ** 5)))
  return '_'.join(identifier_parts)
def UploadArtifacts(intermediate_results, upload_bucket, results_label):
  """Upload all artifacts to cloud.

  For each test run, uploads all its artifacts to cloud and sets remoteUrl
  fields in intermediate_results.

  Args:
    intermediate_results: Dict with merged results; artifact entries are
      mutated in place to record their 'remoteUrl'.
    upload_bucket: Cloud storage bucket name, or None to skip uploading.
    results_label: Optional label used in the remote path prefix.
  """
  if upload_bucket is None:
    return

  run_identifier = _RunIdentifier(
      results_label, intermediate_results['benchmarkRun']['startTime'])
  work_list = []

  for result in intermediate_results['testResults']:
    artifacts = result.get('outputArtifacts', {})
    # Use .items() (not the Python-2-only .iteritems()) for consistency with
    # the rest of this module and Python 3 compatibility.
    for name, artifact in artifacts.items():
      if 'remoteUrl' in artifact:
        continue
      # TODO(crbug.com/981349): Remove this check after Telemetry does not
      # save histograms as an artifact anymore.
      if name == compute_metrics.HISTOGRAM_DICTS_FILE:
        continue
      remote_name = '/'.join([run_identifier, result['testPath'], name])
      work_list.append((artifact, remote_name))

  def PoolUploader(work_item):
    artifact, remote_name = work_item
    artifact['remoteUrl'] = cloud_storage.Insert(
        upload_bucket, remote_name, artifact['filePath'])

  for _ in util.ApplyInParallel(PoolUploader, work_list):
    pass

  for result in intermediate_results['testResults']:
    artifacts = result.get('outputArtifacts', {})
    for name, artifact in artifacts.items():
      # Artifacts skipped above (e.g. the histogram dicts file) have no
      # 'remoteUrl'; indexing it unconditionally would raise KeyError.
      if 'remoteUrl' in artifact:
        logging.info('Uploaded %s of %s to %s', name, result['testPath'],
                     artifact['remoteUrl'])
def _ComputeMetrics(intermediate_results, results_label):
  """Compute TBMv2 metrics and attach benchmark-run diagnostics to them."""
  return AddDiagnosticsToHistograms(
      compute_metrics.ComputeTBMv2Metrics(intermediate_results),
      intermediate_results, results_label)
def AddDiagnosticsToHistograms(histogram_dicts, intermediate_results,
                               results_label):
  """Add shared diagnostics from the benchmark run to all histogram dicts."""
  histograms = histogram_set.HistogramSet()
  histograms.ImportDicts(histogram_dicts)

  benchmark_diagnostics = intermediate_results['benchmarkRun'].get(
      'diagnostics', {})
  for name, values in benchmark_diagnostics.items():
    # For now, we only support GenericSet diagnostics that are serialized
    # as lists of values.
    assert isinstance(values, list)
    histograms.AddSharedDiagnosticToAllHistograms(
        name, generic_set.GenericSet(values))

  if results_label is not None:
    histograms.AddSharedDiagnosticToAllHistograms(
        reserved_infos.LABELS.name, generic_set.GenericSet([results_label]))

  return histograms.AsDicts()
def main(args=None):
  """Entry point for the standalone version of the results_processor script."""
  parser = command_line.ArgumentParser(standalone=True)
  parsed_options = parser.parse_args(args)
  command_line.ProcessOptions(parsed_options, standalone=True)
  return ProcessResults(parsed_options)