| # Copyright 2019 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Implements the interface of the results_processor module. |
| |
| Provides functions to process intermediate results, and the entry point to |
| the standalone version of Results Processor. |
| """ |
| |
| import json |
| import logging |
| import os |
| import random |
| import re |
| |
| from py_utils import cloud_storage |
| from core.results_processor import command_line |
| from core.results_processor import compute_metrics |
| from core.results_processor import formatters |
| from core.results_processor import util |
| |
| from tracing.trace_data import trace_data |
| from tracing.value.diagnostics import generic_set |
| from tracing.value.diagnostics import reserved_infos |
| from tracing.value import histogram_set |
| |
| TELEMETRY_RESULTS = '_telemetry_results.jsonl' |
| |
| FORMATS_WITH_METRICS = ['csv', 'histograms', 'html'] |
| |
| |
| def ProcessResults(options): |
| """Process intermediate results and produce the requested outputs. |
| |
| This function takes the intermediate results generated by Telemetry after |
| running benchmarks (including artifacts such as traces, etc.), and processes |
| them as requested by the result processing options. |
| |
| Args: |
| options: An options object with values parsed from the command line and |
| after any adjustments from ProcessOptions were applied. |
| """ |
| if not getattr(options, 'output_formats', None): |
| return 0 |
| |
| intermediate_results = _LoadIntermediateResults( |
| os.path.join(options.intermediate_dir, TELEMETRY_RESULTS)) |
| |
| AggregateTraces(intermediate_results) |
| |
| UploadArtifacts( |
| intermediate_results, options.upload_bucket, options.results_label) |
| |
| if any(fmt in FORMATS_WITH_METRICS for fmt in options.output_formats): |
| histogram_dicts = _ComputeMetrics(intermediate_results, |
| options.results_label) |
| |
| for output_format in options.output_formats: |
| logging.info('Processing format: %s', output_format) |
| formatter = formatters.FORMATTERS[output_format] |
| if output_format in FORMATS_WITH_METRICS: |
| formatter.ProcessHistogramDicts(histogram_dicts, options) |
| else: |
| formatter.ProcessIntermediateResults(intermediate_results, options) |
| |
| return GenerateExitCode(intermediate_results) |
| |
| |
| def GenerateExitCode(intermediate_results): |
| """Generate an exit code as expected by callers. |
| |
| Returns: |
| 1 if there were failed tests. |
| -1 if all tests were skipped. |
| 0 otherwise. |
| """ |
| if any(r['status'] == 'FAIL' for r in intermediate_results['testResults']): |
| return 1 |
| if all(r['status'] == 'SKIP' for r in intermediate_results['testResults']): |
| return -1 |
| return 0 |
| |
| |
| def _LoadIntermediateResults(intermediate_file): |
| """Load intermediate results from a file into a single dict.""" |
| results = {'benchmarkRun': {}, 'testResults': []} |
| with open(intermediate_file) as f: |
| for line in f: |
| record = json.loads(line) |
| if 'benchmarkRun' in record: |
| results['benchmarkRun'].update(record['benchmarkRun']) |
| if 'testResult' in record: |
| test_result = record['testResult'] |
| # TODO(crbug.com/1011813): This is for compatibility with old version |
| # of LUCI format. Remove it when Telemetry switches to a new version. |
| if 'artifacts' in test_result: |
| test_result['outputArtifacts'] = test_result.pop('artifacts') |
| results['testResults'].append(test_result) |
| return results |
| |
| |
| def AggregateTraces(intermediate_results): |
| """Replace individual traces with an aggregate one for each test result. |
| |
| For each test run with traces, generates an aggregate HTML trace. Removes |
| all entries for individual traces and adds one entry for aggregate one. |
| """ |
| for result in intermediate_results['testResults']: |
| artifacts = result.get('outputArtifacts', {}) |
| traces = [name for name in artifacts if name.startswith('trace/')] |
| if len(traces) > 0: |
| if compute_metrics.HTML_TRACE_NAME not in artifacts: |
| trace_files = [artifacts[name]['filePath'] for name in traces] |
| html_path = os.path.join( |
| os.path.dirname(os.path.commonprefix(trace_files)), |
| compute_metrics.HTML_TRACE_NAME) |
| trace_data.SerializeAsHtml(trace_files, html_path) |
| artifacts[compute_metrics.HTML_TRACE_NAME] = { |
| 'filePath': html_path, |
| 'contentType': 'text/html', |
| } |
| for trace in traces: |
| del artifacts[trace] |
| |
| |
| def _RunIdentifier(results_label, start_time): |
| """Construct an identifier for the current script run""" |
| if results_label: |
| identifier_parts = [re.sub(r'\W+', '_', results_label)] |
| else: |
| identifier_parts = [] |
| # Time is rounded to seconds and delimiters are removed. |
| # The first 19 chars of the string match 'YYYY-MM-DDTHH:MM:SS'. |
| identifier_parts.append(re.sub(r'\W+', '', start_time[:19])) |
| identifier_parts.append(str(random.randint(1, 1e5))) |
| return '_'.join(identifier_parts) |
| |
| |
| def UploadArtifacts(intermediate_results, upload_bucket, results_label): |
| """Upload all artifacts to cloud. |
| |
| For each test run, uploads all its artifacts to cloud and sets remoteUrl |
| fields in intermediate_results. |
| """ |
| if upload_bucket is None: |
| return |
| |
| run_identifier = _RunIdentifier( |
| results_label, intermediate_results['benchmarkRun']['startTime']) |
| work_list = [] |
| |
| for result in intermediate_results['testResults']: |
| artifacts = result.get('outputArtifacts', {}) |
| for name, artifact in artifacts.iteritems(): |
| if 'remoteUrl' in artifact: |
| continue |
| # TODO(crbug.com/981349): Remove this check after Telemetry does not |
| # save histograms as an artifact anymore. |
| if name == compute_metrics.HISTOGRAM_DICTS_FILE: |
| continue |
| remote_name = '/'.join([run_identifier, result['testPath'], name]) |
| work_list.append((artifact, remote_name)) |
| |
| def PoolUploader(work_item): |
| artifact, remote_name = work_item |
| artifact['remoteUrl'] = cloud_storage.Insert( |
| upload_bucket, remote_name, artifact['filePath']) |
| |
| for _ in util.ApplyInParallel(PoolUploader, work_list): |
| pass |
| |
| for result in intermediate_results['testResults']: |
| artifacts = result.get('outputArtifacts', {}) |
| for name, artifact in artifacts.iteritems(): |
| logging.info('Uploaded %s of %s to %s', name, result['testPath'], |
| artifact['remoteUrl']) |
| |
| |
| def _ComputeMetrics(intermediate_results, results_label): |
| histogram_dicts = compute_metrics.ComputeTBMv2Metrics(intermediate_results) |
| histogram_dicts = AddDiagnosticsToHistograms( |
| histogram_dicts, intermediate_results, results_label) |
| return histogram_dicts |
| |
| |
| def AddDiagnosticsToHistograms(histogram_dicts, intermediate_results, |
| results_label): |
| """Add diagnostics to histogram dicts""" |
| histograms = histogram_set.HistogramSet() |
| histograms.ImportDicts(histogram_dicts) |
| diagnostics = intermediate_results['benchmarkRun'].get('diagnostics', {}) |
| for name, diag in diagnostics.items(): |
| # For now, we only support GenericSet diagnostics that are serialized |
| # as lists of values. |
| assert isinstance(diag, list) |
| histograms.AddSharedDiagnosticToAllHistograms( |
| name, generic_set.GenericSet(diag)) |
| |
| if results_label is not None: |
| histograms.AddSharedDiagnosticToAllHistograms( |
| reserved_infos.LABELS.name, |
| generic_set.GenericSet([results_label])) |
| |
| return histograms.AsDicts() |
| |
| |
| def main(args=None): |
| """Entry point for the standalone version of the results_processor script.""" |
| parser = command_line.ArgumentParser(standalone=True) |
| options = parser.parse_args(args) |
| command_line.ProcessOptions(options, standalone=True) |
| return ProcessResults(options) |