| # Copyright 2019 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Implements the interface of the results_processor module. |
| |
| Provides functions to process intermediate results, and the entry point to |
| the standalone version of Results Processor. |
| """ |
| |
| import json |
| import logging |
| import os |
| import random |
| import re |
| |
| from py_utils import cloud_storage |
| from core.results_processor import command_line |
| from core.results_processor import compute_metrics |
| from core.results_processor import formatters |
| from core.results_processor import util |
| |
| from tracing.trace_data import trace_data |
| from tracing.value.diagnostics import date_range |
| from tracing.value.diagnostics import generic_set |
| from tracing.value.diagnostics import reserved_infos |
| from tracing.value import histogram |
| from tracing.value import histogram_set |
| from tracing.value import legacy_unit_info |
| |
| TELEMETRY_RESULTS = '_telemetry_results.jsonl' |
| MEASUREMENTS_NAME = 'measurements.json' |
| |
| FORMATS_WITH_METRICS = ['csv', 'histograms', 'html'] |
| |
| |
| def ProcessResults(options): |
| """Process intermediate results and produce the requested outputs. |
| |
| This function takes the intermediate results generated by Telemetry after |
running benchmarks (including artifacts such as traces), and processes
| them as requested by the result processing options. |
| |
| Args: |
| options: An options object with values parsed from the command line and |
| after any adjustments from ProcessOptions were applied. |
| """ |
| if not getattr(options, 'output_formats', None): |
| return 0 |
| |
| intermediate_results = _LoadIntermediateResults( |
| os.path.join(options.intermediate_dir, TELEMETRY_RESULTS)) |
| |
| AggregateTraces(intermediate_results) |
| |
| UploadArtifacts( |
| intermediate_results, options.upload_bucket, options.results_label) |
| |
| if any(fmt in FORMATS_WITH_METRICS for fmt in options.output_formats): |
| histogram_dicts = _ComputeMetrics(intermediate_results, |
| options.results_label) |
| |
| for output_format in options.output_formats: |
| logging.info('Processing format: %s', output_format) |
| formatter = formatters.FORMATTERS[output_format] |
| if output_format in FORMATS_WITH_METRICS: |
| formatter.ProcessHistogramDicts(histogram_dicts, options) |
| else: |
| formatter.ProcessIntermediateResults(intermediate_results, options) |
| |
| return GenerateExitCode(intermediate_results) |
| |
| |
| def GenerateExitCode(intermediate_results): |
| """Generate an exit code as expected by callers. |
| |
| Returns: |
| 1 if there were failed tests. |
    -1 if all tests were skipped (this includes the case of no tests).
| 0 otherwise. |
| """ |
| if any(r['status'] == 'FAIL' for r in intermediate_results['testResults']): |
| return 1 |
| if all(r['status'] == 'SKIP' for r in intermediate_results['testResults']): |
| return -1 |
| return 0 |
| |
| |
| def _LoadIntermediateResults(intermediate_file): |
| """Load intermediate results from a file into a single dict.""" |
| results = {'benchmarkRun': {}, 'testResults': []} |
| with open(intermediate_file) as f: |
| for line in f: |
| record = json.loads(line) |
| if 'benchmarkRun' in record: |
| results['benchmarkRun'].update(record['benchmarkRun']) |
| if 'testResult' in record: |
| test_result = record['testResult'] |
| results['testResults'].append(test_result) |
| return results |
| |
| |
| def _AggregateTraceWorker(artifacts): |
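  """Merge the 'trace/' artifacts of one test result into an HTML trace.

  Illustrative example: artifacts with hypothetical names 'trace/0.json' and
  'trace/1.json' are removed and replaced by a single entry pointing to the
  serialized HTML trace in their common parent directory.
  """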
| traces = [name for name in artifacts if name.startswith('trace/')] |
| trace_files = [artifacts.pop(name)['filePath'] for name in traces] |
| html_path = os.path.join( |
| os.path.dirname(os.path.commonprefix(trace_files)), |
| compute_metrics.HTML_TRACE_NAME) |
| trace_data.SerializeAsHtml(trace_files, html_path) |
| artifacts[compute_metrics.HTML_TRACE_NAME] = { |
| 'filePath': html_path, |
| 'contentType': 'text/html', |
| } |
| |
| |
| def AggregateTraces(intermediate_results): |
| """Replace individual traces with an aggregate one for each test result. |
| |
  For each test run with traces, generates an aggregate HTML trace. Removes
  all entries for the individual traces and adds a single entry for the
  aggregate one.
| """ |
| work_list = [] |
| for result in intermediate_results['testResults']: |
| artifacts = result.get('outputArtifacts', {}) |
| # TODO(crbug.com/981349): Stop checking for HTML_TRACE_NAME after |
| # Telemetry does not aggregate traces anymore. |
| if (any(name.startswith('trace/') for name in artifacts) and |
| compute_metrics.HTML_TRACE_NAME not in artifacts): |
| work_list.append(artifacts) |
| |
| if work_list: |
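    # ApplyInParallel is assumed to yield lazily; iterate over it to make
    # sure that all work items are actually processed.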
| for _ in util.ApplyInParallel(_AggregateTraceWorker, work_list): |
| pass |
| |
| # TODO(crbug.com/981349): This is to clean up traces that have been |
| # aggregated by Telemetry. Remove this after Telemetry no longer does this. |
| for result in intermediate_results['testResults']: |
| artifacts = result.get('outputArtifacts', {}) |
    for name in list(artifacts):
| if name.startswith('trace/'): |
| del artifacts[name] |
| |
| |
| def _RunIdentifier(results_label, start_time): |
| """Construct an identifier for the current script run""" |
| if results_label: |
| identifier_parts = [re.sub(r'\W+', '_', results_label)] |
| else: |
| identifier_parts = [] |
| # Time is rounded to seconds and delimiters are removed. |
| # The first 19 chars of the string match 'YYYY-MM-DDTHH:MM:SS'. |
| identifier_parts.append(re.sub(r'\W+', '', start_time[:19])) |
  identifier_parts.append(str(random.randint(1, 100000)))
| return '_'.join(identifier_parts) |
| |
| |
| def UploadArtifacts(intermediate_results, upload_bucket, results_label): |
| """Upload all artifacts to cloud. |
| |
| For each test run, uploads all its artifacts to cloud and sets remoteUrl |
| fields in intermediate_results. |
| """ |
| if upload_bucket is None: |
| return |
| |
| run_identifier = _RunIdentifier( |
| results_label, intermediate_results['benchmarkRun']['startTime']) |
| work_list = [] |
| |
| for result in intermediate_results['testResults']: |
| artifacts = result.get('outputArtifacts', {}) |
    for name, artifact in artifacts.items():
| if 'remoteUrl' in artifact: |
| continue |
| # TODO(crbug.com/981349): Remove this check after Telemetry does not |
| # save histograms as an artifact anymore. |
| if name == compute_metrics.HISTOGRAM_DICTS_FILE: |
| continue |
| remote_name = '/'.join([run_identifier, result['testPath'], name]) |
| work_list.append((artifact, remote_name)) |
| |
| def PoolUploader(work_item): |
| artifact, remote_name = work_item |
| artifact['remoteUrl'] = cloud_storage.Insert( |
| upload_bucket, remote_name, artifact['filePath']) |
| |
| for _ in util.ApplyInParallel(PoolUploader, work_list): |
| pass |
| |
| for result in intermediate_results['testResults']: |
| artifacts = result.get('outputArtifacts', {}) |
    for name, artifact in artifacts.items():
      if 'remoteUrl' in artifact:
        logging.info('Uploaded %s of %s to %s', name, result['testPath'],
                     artifact['remoteUrl'])
| |
| |
| def _ComputeMetrics(intermediate_results, results_label): |
| histogram_dicts = compute_metrics.ComputeTBMv2Metrics(intermediate_results) |
| histogram_dicts += ExtractMeasurements(intermediate_results) |
| histogram_dicts = AddDiagnosticsToHistograms( |
| histogram_dicts, intermediate_results, results_label) |
| return histogram_dicts |
| |
| |
| def AddDiagnosticsToHistograms(histogram_dicts, intermediate_results, |
| results_label): |
| """Add diagnostics to histogram dicts""" |
| histograms = histogram_set.HistogramSet() |
| histograms.ImportDicts(histogram_dicts) |
| diagnostics = intermediate_results['benchmarkRun'].get('diagnostics', {}) |
| for name, diag in diagnostics.items(): |
| # For now, we only support GenericSet diagnostics that are serialized |
| # as lists of values. |
| assert isinstance(diag, list) |
| histograms.AddSharedDiagnosticToAllHistograms( |
| name, generic_set.GenericSet(diag)) |
| |
| if results_label is not None: |
| histograms.AddSharedDiagnosticToAllHistograms( |
| reserved_infos.LABELS.name, |
| generic_set.GenericSet([results_label])) |
| |
| histograms.DeduplicateDiagnostics() |
| return histograms.AsDicts() |
| |
| |
| def MeasurementToHistogram(name, measurement): |
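  """Convert an ad-hoc measurement to a histogram.

  A measurement is a dict with a unit, a list of samples, and an optional
  description, e.g. (illustrative):
    {'unit': 'ms', 'samples': [12.3, 45.6], 'description': 'page load time'}
  """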
| unit = measurement['unit'] |
| samples = measurement['samples'] |
| description = measurement.get('description') |
| if unit in legacy_unit_info.LEGACY_UNIT_INFO: |
| info = legacy_unit_info.LEGACY_UNIT_INFO[unit] |
| unit = info.name |
| samples = [s * info.conversion_factor for s in samples] |
| if unit not in histogram.UNIT_NAMES: |
| raise ValueError('Unknown unit: %s' % unit) |
| return histogram.Histogram.Create(name, unit, samples, |
| description=description) |
| |
| |
| def _GlobalDiagnostics(benchmark_run): |
| """Extract diagnostics information about the whole benchmark run. |
| |
| These diagnostics will be added to ad-hoc measurements recorded by |
| benchmarks. |
| """ |
| timestamp_ms = util.IsoTimestampToEpoch(benchmark_run['startTime']) * 1e3 |
| return { |
| reserved_infos.BENCHMARK_START.name: date_range.DateRange(timestamp_ms), |
| } |
| |
| |
| def _StoryDiagnostics(test_result): |
| """Extract diagnostics information about the specific story. |
| |
| These diagnostics will be added to ad-hoc measurements recorded by |
| benchmarks. |
| """ |
| benchmark_name, story_name = test_result['testPath'].split('/', 1) |
| story_tags = [tag['value'] for tag in test_result.get('tags', []) |
| if tag['key'] == 'story_tag'] |
| return { |
| reserved_infos.BENCHMARKS.name: generic_set.GenericSet([benchmark_name]), |
| reserved_infos.STORIES.name: generic_set.GenericSet([story_name]), |
| reserved_infos.STORY_TAGS.name: generic_set.GenericSet(story_tags), |
| } |
| |
| |
| def ExtractMeasurements(intermediate_results): |
| """Add ad-hoc measurements to histogram dicts""" |
| histograms = histogram_set.HistogramSet() |
| global_diagnostics = _GlobalDiagnostics(intermediate_results['benchmarkRun']) |
| |
| for result in intermediate_results['testResults']: |
| artifacts = result.get('outputArtifacts', {}) |
| if MEASUREMENTS_NAME in artifacts: |
| with open(artifacts[MEASUREMENTS_NAME]['filePath']) as f: |
| measurements = json.load(f)['measurements'] |
| diagnostics = global_diagnostics.copy() |
| diagnostics.update(_StoryDiagnostics(result)) |
      for name, measurement in measurements.items():
| histograms.AddHistogram(MeasurementToHistogram(name, measurement), |
| diagnostics=diagnostics) |
| |
| return histograms.AsDicts() |
| |
| |
| def main(args=None): |
| """Entry point for the standalone version of the results_processor script.""" |
| parser = command_line.ArgumentParser(standalone=True) |
| options = parser.parse_args(args) |
| command_line.ProcessOptions(options, standalone=True) |
| return ProcessResults(options) |