| # -*- coding: utf-8 -*- |
| # Copyright 2019 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| """Internal helpers for reporting test status to stdout.""" |
| |
| from __future__ import annotations |
| |
| import collections |
| import datetime |
| import logging |
| import os |
| import sys |
| from typing import TYPE_CHECKING |
| |
| from collections import defaultdict |
| from io import StringIO |
| from itertools import groupby |
| |
| import attr |
| import coverage |
| |
| from .fail_tracker import FailTracker |
| from ...warn.cause import CallSite, ImportSite |
| |
| if TYPE_CHECKING: |
| import PB.recipe_engine.internal.test.runner as runner_pb2 |
| |
| |
@attr.s
class Reporter:
  """Prints per-test status and the end-of-run summary to stdout.

  `short_report` prints a summary for every test in an Outcome message and
  buffers the detailed failure text; `final_report` prints the buffered
  details together with coverage, warning and duration information and exits
  the process with status 1 if anything failed.
  """
  _recipe_deps = attr.ib()

  # If True, summary icons are emoji rather than single ASCII characters.
  _use_emoji = attr.ib()
  # If True, the "you may need to re-train" hint is omitted on failure.
  _is_train = attr.ib()
  _fail_tracker = attr.ib()
  # If set, will print warning details (even if there are other fatal failures)
  _enable_warning_details = attr.ib()
  # If set, will print duration details (even if there are fatal failures).
  _enable_duration_details = attr.ib()

  # Number of columns already used on the current line of summary icons.
  _column_count = attr.ib(default=0)
  # Accumulates detailed failure text until final_report prints it.
  _error_buf = attr.ib(factory=StringIO)

  _start_time = attr.ib(factory=datetime.datetime.now)

  # default to 80 cols if we're outputting to not a tty. Otherwise, set this to
  # -1 to allow the terminal to do all wrapping.
  #
  # This allows nice presentation on the bots (i.e. 80 columns), while also
  # allowing full-width display with correct wrapping on terminals/cmd.exe.
  _column_max = attr.ib()
  @_column_max.default
  def _column_max_default(self):
    # 1 == stdout
    return -1 if os.isatty(1) else 80

  # Verbose mode prints one line per test instead of one icon per test.
  _verbose = attr.ib()
  @_verbose.default
  def _verbose_default(self):
    return logging.getLogger().level < logging.WARNING

  def _space_for_columns(self, item_columns):
    """Preemptively ensures we have space to print something which takes
    `item_columns` space.

    If the item does not fit on the current line, a newline is printed first.

    Increments self._column_count as a side effect.
    """
    if self._column_max == -1:
      # output is a tty, let it do the wrapping.
      return

    self._column_count += item_columns
    if self._column_count > self._column_max:
      print()
      # The caller writes the item right after this returns, so it already
      # occupies `item_columns` on the fresh line. Resetting to 0 here would
      # let every line after the first overflow by one item.
      self._column_count = item_columns

  def short_report(self, outcome_msg, can_abort=True):
    """Prints all test results from `outcome_msg` to stdout.

    Detailed error messages (if any) will be accumulated in this reporter.

    NOTE: Will report and then raise SystemExit if the outcome_msg contains an
    'internal_error', as this indicates that the test harness is in an invalid
    state.

    Args:

      * outcome_msg (Outcome proto) - The message to report.
      * can_abort(bool) - True by default. Check whether the program should
        abort for a global failure found in test results.

    Returns:
      * bool - If test results have any failure.

    Raises SystemExit if outcome_msg has an internal_error and can_abort is
    True.
    """
    # Global error; this means something in the actual test execution code went
    # wrong.
    if outcome_msg.internal_error:
      if can_abort:
        # This is pretty bad.
        print('ABORT ABORT ABORT')
      print('Global failure(s):')
      for failure in outcome_msg.internal_error:
        print(' ', failure)
      if can_abort:
        sys.exit(1)
      return True

    has_fail = False
    for test_name, test_result in outcome_msg.test_results.items():
      _print_summary_info(
          self._recipe_deps, self._verbose, self._use_emoji, test_name,
          test_result, self._space_for_columns)

      _print_detail_info(self._error_buf, test_name, test_result)

      # The tracker is consulted for every test, even once has_fail is True.
      has_fail = self._fail_tracker.cache_recent_fails(test_name,
                                                       test_result) or has_fail

    return has_fail


  def final_report(self, cov, outcome_msg):
    """Prints all final information about the test run to stdout.
    Raises SystemExit if the tests have failed.

    Args:

      * cov (coverage.Coverage|None) - The accumulated coverage data to report.
        If None, then no coverage analysis/report will be done. Coverage less
        than 100% counts as a test failure.
      * outcome_msg (Outcome proto) -
        Consulted for uncovered_modules and unused_expectation_files.
        coverage_percent is also populated as a side effect.
        Any uncovered_modules/unused_expectation_files count as a test failure.

    Side-effects: Populates outcome_msg.coverage_percent.

    Raises SystemExit if the tests failed.
    """
    self._fail_tracker.cleanup()

    fail = False
    # Anything buffered by short_report means at least one test failed.
    if self._error_buf.tell() > 0:
      fail = True
      sys.stdout.write(
          'Errors in %s\n' % (self._error_buf.getvalue()))

    # For some integration tests we have repos which don't actually have any
    # recipe files at all. We skip coverage measurement if cov has no data.
    if cov and cov.get_data().measured_files():
      covf = StringIO()
      # Stays 0 (and therefore fails) if the coverage report itself errors out.
      pct = 0
      try:
        pct = cov.report(file=covf, show_missing=True, skip_covered=True)
        outcome_msg.coverage_percent = pct
      except coverage.CoverageException as ex:
        print('%s: %s' % (ex.__class__.__name__, ex))
      if int(pct) != 100:
        fail = True
        print(covf.getvalue())
        print('FATAL: Insufficient total coverage (%.2f%%)' % pct)
        print()

    duration = (datetime.datetime.now() - self._start_time).total_seconds()
    print('-' * 70)
    print('Ran %d tests in %0.3fs' % (len(outcome_msg.test_results),
                                      duration))
    print()

    if outcome_msg.uncovered_modules:
      fail = True
      print('------')
      print('ERROR: The following modules lack any form of test coverage:')
      for modname in outcome_msg.uncovered_modules:
        print(' ', modname)
      print()
      print('Please add test recipes for them (e.g. recipes in the module\'s')
      print('"tests" subdirectory).')
      print()

    if outcome_msg.unused_expectation_files:
      fail = True
      print('------')
      print('ERROR: The below expectation files have no associated test case:')
      for expect_file in outcome_msg.unused_expectation_files:
        print(' ', expect_file)
      print()

    warning_result = _collect_warning_result(outcome_msg)
    if warning_result:
      print('------')
      if len(warning_result) == 1:
        print('Found 1 warning')
      else:
        print('Found %d warnings' % len(warning_result))
      print()
      # Details are suppressed on failure unless explicitly requested.
      if self._enable_warning_details or not fail:
        _print_warnings(warning_result, self._recipe_deps)
      else:
        print('Fix test failures or pass --show-warnings for details.')
      print()

    duration_result = _collect_duration_result(outcome_msg)
    if duration_result:
      print('------')
      if len(duration_result) == 1:
        print('Found 1 long-running test')
      else:
        print(f'Found {len(duration_result)} long-running tests')
      print()
      # Details are suppressed on failure unless explicitly requested.
      if self._enable_duration_details or not fail:
        _print_durations(duration_result, self._enable_duration_details)
      else:
        print('Fix test failures or pass --show-durations for details.')
      print()

    status_warning_result = _collect_global_warnings_result(outcome_msg)
    if status_warning_result:
      print('------')
      print('Found these warnings in the following tests:')
      for test_name, warning in sorted(status_warning_result):
        print('\t%s - %s' % (test_name, warning))
      print()

    if fail:
      print('------')
      print('FAILED')
      print()
      if not self._is_train:
        print('NOTE: You may need to re-train the expectation files by running')
        print()
        print(' ./recipes.py test train')
        print()
        print('This will update all the .json files to have content which')
        print('matches the current recipe logic. Review them for correctness')
        print('and include them with your CL.')
      sys.exit(1)

    print('------')
    print('TESTS OK')
| |
| |
| # Internal helper stuff |
| |
# How each top-level field of a test result (see
# recipe_engine.internal.test.Outcome) is presented. Every value is a 4-tuple:
#
#   (success, verbose message, emoji icon, text icon)
#
# Order matters: _print_summary_info walks the entries from first to last and
# displays the first one which is populated on the test result.
FIELD_TO_DISPLAY = collections.OrderedDict(
    internal_error=(False, 'internal testrunner error', '🆘', '!'),

    bad_test=(False, 'test specification was bad/invalid', '🛑', 'S'),
    crash_mismatch=(False, 'recipe crashed in an unexpected way', '🔥', 'E'),
    check=(False, 'failed post_process check(s)', '❌', 'X'),
    diff=(False, 'expectation file has diff', '⚡', 'D'),

    removed=(True, 'removed expectation file', '🌟', 'R'),
    written=(True, 'updated expectation file', '💾', 'D'),

    global_warnings=(True, 'warning emitted', '❗', 'W'),
)
| |
| |
| def _check_field(test_result, field_name): |
| for descriptor, value in test_result.ListFields(): |
| if descriptor.name == field_name: |
| return FIELD_TO_DISPLAY[field_name], value |
| |
| return (None, None, None, None), None |
| |
| |
def _print_summary_info(recipe_deps, verbose, use_emoji, test_name, test_result,
                        space_for_columns):
  """Prints the one-icon (or, if verbose, one-line) summary for a single test.

  Args:

    * recipe_deps - Consulted for `warning_definitions[name].deadline`.
    * verbose (bool) - If True, print '<test_name> ... ok|FAIL (<reason>)' on
      its own line; otherwise write a single status icon without a newline.
    * use_emoji (bool) - Selects the emoji icon instead of the text icon.
    * test_name (str) - Name shown in verbose mode.
    * test_result - Result message for the test; its populated fields and its
      `warnings` decide which icon/message is shown.
    * space_for_columns (callable) - Called with the number of terminal columns
      the icon needs (1 or 2) just before the icon is written.
  """
  # Function-scope import: only the icon width computation needs it.
  import unicodedata

  # Pick the first populated field in the TestResults.Results
  for field_name in FIELD_TO_DISPLAY:
    (success, verbose_msg, emj, txt), _ = _check_field(test_result, field_name)
    icon = emj if use_emoji else txt
    if icon:
      break

  # handle warnings and 'nothing' specially:
  if not icon:
    success = True
    for warning_name in test_result.warnings:
      verbose_msg = 'warnings'
      # Only warnings which carry a deadline get their own icon.
      if recipe_deps.warning_definitions[warning_name].deadline:
        icon = '🟡' if use_emoji else 'W'
        verbose_msg = 'warnings with deadline'
        break

  if not icon:
    icon = '.'

  if verbose:
    msg = '' if not verbose_msg else ' (%s)' % verbose_msg
    print('%s ... %s%s' % (test_name, 'ok' if success else 'FAIL', msg))
  else:
    # An emoji is a single code point, so len() alone reports 1 even though
    # the terminal renders it two columns wide. Use the East Asian Width
    # property ('W'ide / 'F'ullwidth) to detect double-width icons.
    is_wide = any(
        unicodedata.east_asian_width(ch) in ('W', 'F') for ch in icon)
    space_for_columns(1 if len(icon) == 1 and not is_wide else 2)
    sys.stdout.write(icon)
    sys.stdout.flush()
| |
| |
def _print_detail_info(err_buf, test_name, test_result):
  """Appends the detailed failure text for one test to `err_buf`.

  For each of the failure fields which is populated on `test_result`
  (internal_error, bad_test, crash_mismatch, check, diff - in that order) this
  writes a header naming the failure kind and the test, followed by the
  field's lines and a blank line. Nothing is written if none are populated.

  Args:

    * err_buf (file-like) - Text buffer to append to.
    * test_name (str) - Name of the test, shown in each header.
    * test_result - Result message for the test.
  """
  def emit(*args):
    print(*args, file=err_buf)

  def banner(reason):
    emit('=' * 70)
    emit('FAIL (%s) - %s' % (reason, test_name))
    emit('-' * 70)

  # These fields hold their lines directly.
  for field_name in ('internal_error', 'bad_test', 'crash_mismatch'):
    (_, reason, _, _), messages = _check_field(test_result, field_name)
    if not messages:
      continue
    banner(reason)
    for message in messages:
      emit(message)
    emit()

  # 'check' holds groups, each with its own `lines`; a blank line follows
  # every group.
  (_, reason, _, _), check_groups = _check_field(test_result, 'check')
  if check_groups:
    banner(reason)
    for group in check_groups:
      for message in group.lines:
        emit(message)
      emit()

  # 'diff' is a single value carrying `lines`.
  (_, reason, _, _), diff = _check_field(test_result, 'diff')
  if diff:
    banner(reason)
    for message in diff.lines:
      emit(message)
    emit()
| |
| |
@attr.s
class PerWarningResult:
  """Deduplicated causes collected for a single warning."""
  # Set of call-site causes; starts out empty for every instance.
  call_sites = attr.ib(default=attr.Factory(set))
  # Set of import-site causes; starts out empty for every instance.
  import_sites = attr.ib(default=attr.Factory(set))
| |
| |
def _collect_warning_result(outcome_msg):
  """Gathers the warnings recorded on `outcome_msg`, deduplicating the causes
  of each warning.

  Returns a defaultdict mapping warning name to PerWarningResult. A warning
  only gets an entry if it has at least one cause.
  """
  collected = defaultdict(PerWarningResult)
  for warning_name, cause_list in outcome_msg.warnings.items():
    for cause_pb in cause_list.causes:
      is_call_site = cause_pb.WhichOneof('oneof_cause') == 'call_site'
      if is_call_site:
        collected[warning_name].call_sites.add(
            CallSite.from_cause_pb(cause_pb))
        continue
      # Everything which is not a call site is recorded as an import site.
      collected[warning_name].import_sites.add(
          ImportSite.from_cause_pb(cause_pb))
  return collected
| |
| |
| def _collect_global_warnings_result(outcome_msg): |
| result = [] |
| for test_name, test_result in outcome_msg.test_results.items(): |
| _, warnings = _check_field(test_result, 'global_warnings') |
| if warnings: |
| for warning in warnings: |
| result.append((test_name, warning)) |
| return result |
| |
| |
| def _print_warnings(warning_result, recipe_deps): |
| def print_bug_links(definition): |
| bug_links = [ |
| f'https://{bug.host}/p/{bug.project}/issues/detail?id={bug.id}' |
| for bug in definition.monorail_bug |
| ] + [ |
| f'https://{iss.host}/{iss.id}' |
| for iss in definition.google_issue |
| ] |
| |
| if bug_links: |
| print() |
| if len(bug_links) == 1: |
| print(f'Bug Link: {bug_links[0]}') |
| else: |
| print('Bug Links:') |
| for link in bug_links: |
| print(f' {link}') |
| |
| |
| def print_call_sites(call_sites): |
| def stringify_frame(frame): |
| return ':'.join((os.path.normpath(frame.file), str(frame.line))) |
| |
| if not call_sites: |
| return |
| print('Call Sites:') |
| sorted_sites = sorted(call_sites, |
| key=lambda s: (s.site.file, s.site.line)) |
| if sorted_sites[0].call_stack: |
| # call site contains the full stack. |
| for call_site in sorted_sites: |
| print(' site: %s' % stringify_frame(call_site.site)) |
| print(' stack:') |
| for f in call_site.call_stack: |
| print(' ' +stringify_frame(f)) |
| print() |
| else: |
| for file_name, sites in groupby(sorted_sites, key=lambda s: s.site.file): |
| # Print sites that have the same file in a single line. |
| # E.g. /path/to/site:123 (and 456, 789) |
| site_iter = iter(sites) |
| line = stringify_frame(next(site_iter).site) |
| additional_lines = ', '.join(str(s.site.line) for s in site_iter) |
| if additional_lines: |
| line = '%s (and %s)' % (line, additional_lines) |
| print(' ' + line) |
| |
| def print_import_sites(import_sites): |
| if not import_sites: |
| return |
| print('Import Sites:') |
| for import_site in sorted(import_sites, |
| key=lambda s: ( |
| s.repo or "", s.module or "", s.recipe or "")): |
| repo = recipe_deps.repos[import_site.repo] |
| if import_site.module: |
| mod_path = repo.modules[import_site.module].path |
| print(' %s' % os.path.normpath(os.path.join(mod_path, '__init__.py'))) |
| else: |
| print(' %s' % os.path.normpath(repo.recipes[import_site.recipe].path)) |
| |
| for warning_name in sorted(warning_result): |
| causes = warning_result[warning_name] |
| print('*' * 70) |
| print('{:^70}'.format('WARNING: %s' % warning_name)) |
| print('{:^70}'.format('Found %d call sites and %d import sites' % ( |
| len(causes.call_sites), len(causes.import_sites),))) |
| print('*' * 70) |
| definition = recipe_deps.warning_definitions[warning_name] |
| if definition.description: |
| print('Description:') |
| for desc in definition.description: |
| print(' %s' % desc) |
| if definition.deadline: |
| print('Deadline: %s' % definition.deadline) |
| print_bug_links(definition) |
| print_call_sites(causes.call_sites) |
| print_import_sites(causes.import_sites) |
| |
| |
| def _collect_duration_result( |
| outcome_msg: runner_pb2.Outcome) -> dict[str, datetime.timedelta]: |
| """Collects durations from all test outcomes saves the long ones.""" |
| result = defaultdict(PerWarningResult) |
| for name, test_result in outcome_msg.test_results.items(): |
| duration = datetime.timedelta( |
| milliseconds=test_result.duration.ToMilliseconds() |
| ) |
| if duration >= datetime.timedelta(seconds=5): |
| result[name] = duration |
| |
| return result |
| |
| |
| SOFT_MAX_DURATIONS = 8 |
| HARD_MAX_DURATIONS = 12 |
| |
| |
| def _print_durations(duration_result: dict[str, datetime.timedelta], |
| full: bool): |
| durations = list(duration_result.items()) |
| durations.sort(key=lambda x: (x[1], x[0])) |
| |
| # Don't show "x long-running tests hidden" when x is a very small number. |
| # In that case, just show the tests. |
| exit_early = True |
| if full or len(durations) <= HARD_MAX_DURATIONS: |
| exit_early = False |
| |
| for i, (name, duration) in enumerate(reversed(durations)): |
| if exit_early and i >= SOFT_MAX_DURATIONS: |
| print(f'{len(durations)-i} long-running tests hidden, use ' |
| '--show-durations to show all') |
| break |
| print(f'{duration.total_seconds():8.3f}s {name}') |