blob: 023bcdbb65f5a471c5ed9869fbd8729e22aec09f [file] [log] [blame]
# Copyright 2019 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import collections
import errno
import fnmatch
import io
import json
import os
import re
import shutil
import sys
import coverage
import gevent
import gevent.queue
from google.protobuf import json_format
from recipe_engine import __path__ as RECIPE_ENGINE_PATH
# pylint: disable=import-error
from PB.recipe_engine.internal.test.runner import Description, Outcome
from ..doc.cmd import regenerate_doc, doc_diff
from . import report, test_name
from .fail_tracker import FailTracker
from .runner import RunnerThread
# TODO(crbug.com/1147793): Remove the second return value after migration.
def _push_tests(test_filter: test_name.Filter, is_train, main_repo, description_queue,
recent_fails):
"""
Returns:
* set - unused_expectation_files
"""
unused_expectation_files = set()
used_expectation_files = set()
test_filenames = collections.defaultdict(dict)
def push_test(recipe, test_case):
recipe_filenames = test_filenames[recipe]
expect_file = test_case.expect_file
used_expectation_files.add(expect_file)
if expect_file in recipe_filenames:
og_name = recipe_filenames[expect_file]
if og_name == test_case.name:
raise ValueError(
f'Emitted test with duplicate name {test_case.name!r}')
raise ValueError(
'Emitted test %r which maps to the same JSON file as %r: %r' %
(test_case.name, og_name, expect_file))
recipe_filenames[expect_file] = test_case.name
if not test_filter.full_name(f'{recipe.name}.{test_case.name}'):
return
description_queue.put(
Description(
recipe_name=recipe.name,
test_name=test_case.name))
gevent.sleep() # let any blocking threads pick this up
# If filters are enabled, we'll only clean up expectation files for recipes
# that are included by the filter.
if not test_filter:
unused_expectation_files.update(main_repo.expectation_paths)
# Handle recent fails first
deferred_tests = []
for recipe in main_repo.recipes.values():
if not test_filter.recipe_name(recipe.name):
continue
if test_filter:
unused_expectation_files.update(recipe.expectation_paths)
# Maps expect_file -> original test_name
try:
for test_case in recipe.gen_tests(): # User code, could raise
full_name = recipe.full_name.split('::')[-1] + '.' + test_case.name
if len(recent_fails) == 0 or full_name in recent_fails:
push_test(recipe, test_case)
else:
deferred_tests.append((recipe, test_case))
except KeyboardInterrupt:
raise
except:
print('USER CODE ERROR:')
print(f'Crashed while running GenTests from recipe {recipe.name}')
raise
# Test any non-recently-failed cases
for deferred_test in deferred_tests:
push_test(*deferred_test)
unused_expectation_files -= used_expectation_files
if not is_train:
return sorted(unused_expectation_files)
for path in unused_expectation_files:
os.remove(path)
return set()
def _run(test_results, recipe_deps, use_emoji, test_filter, is_train,
stop, jobs, show_warnings):
"""Run tests in py3 subprocess pools.
"""
main_repo = recipe_deps.main_repo
description_queue = gevent.queue.UnboundQueue()
# outcome_queue is written to by RunnerThreads; it will either contain Outcome
# messages, or it will contain one of our RunnerThread instances (to indicate
# to our main thread here that the RunnerThread is done).
outcome_queue = gevent.queue.UnboundQueue()
test_results.uncovered_modules.extend(sorted(
set(
module.name
for module in main_repo.modules.values()
if not (
module.uses_sloppy_coverage or module.recipes or module.warnings)
)
))
fail_tracker = FailTracker(recipe_deps.previous_test_failures_path)
reporter = report.Reporter(recipe_deps, use_emoji, is_train, fail_tracker,
show_warnings)
cov_dir = None
total_cov = coverage.Coverage(config_file=False, data_file='.total_coverage',
data_suffix=True)
total_cov.save() # Force to ensure the coverage data file is created.
try:
# in case of crash; don't want this undefined in finally clause.
live_threads = []
cov_dir, all_threads = RunnerThread.make_pool(
recipe_deps,
description_queue,
outcome_queue,
is_train,
collect_coverage=not test_filter,
jobs=jobs)
live_threads[:] = all_threads
unused_expectation_files = _push_tests(
test_filter, is_train, main_repo, description_queue,
fail_tracker.recent_fails)
test_results.unused_expectation_files.extend(unused_expectation_files)
def execute_queue():
has_fail = False
while live_threads and not (has_fail and stop):
rslt = outcome_queue.get()
if isinstance(rslt, RunnerThread):
# should be done at this point, but make sure for cleanliness sake.
gevent.wait([rslt])
live_threads.remove(rslt)
continue
test_results.MergeFrom(rslt)
has_fail = reporter.short_report(rslt, can_abort=True)
if has_fail and stop:
break
# At this point we know all subprocesses and their threads have finished
# (because outcome_queue has been closed by each worker, which is how we
# escaped the while loop above).
#
# If we don't have any filters, collect coverage data.
if (test_filter or (stop and has_fail)) is False:
data_paths = [t.cov_file for t in all_threads
if os.path.isfile(t.cov_file)]
if data_paths:
total_cov.combine(data_paths)
return has_fail
# Put None poison pill for each thread.
for thread in all_threads:
description_queue.put(None)
has_fail = execute_queue()
print()
# Don't display coverage if the --stop flag was specified and there's a
# failure
if has_fail and stop:
reporter.final_report(None, test_results)
else:
reporter.final_report(total_cov, test_results)
finally:
for thread in live_threads:
thread.kill()
thread.join()
if cov_dir:
shutil.rmtree(cov_dir, ignore_errors=True)
total_cov.erase()
def main(args):
"""Runs simulation tests on a given repo of recipes.
Args:
args: the parsed args (see add_subparser).
Returns:
Exit code
"""
is_train = args.subcommand == 'train'
ret = Outcome()
def _dump():
if args.json:
output = []
result = json_format.MessageToDict(ret, preserving_proto_field_name=True)
output.append(result)
json.dump(output, args.json)
if args.dump_timing_info:
for testname, result in ret.test_results.items():
as_string = json_format.MessageToJson(result.duration)
args.dump_timing_info.write('%s\t%s\n' % (
testname,
# Durations are encoded like "0.23s". We just want the raw number
# to be put into the file, so skip the first and last quote, and
# the 's'.
as_string[1:-2]))
repo = args.recipe_deps.main_repo
try:
_run(ret, args.recipe_deps, args.use_emoji, args.test_filter, is_train,
args.stop, args.jobs, args.show_warnings)
_dump()
except KeyboardInterrupt:
args.docs = False # skip docs
except SystemExit:
_dump()
raise
docs_enabled = (not repo.recipes_cfg_pb2.no_docs) and args.docs
is_run = args.subcommand == 'run'
if docs_enabled:
if is_run and doc_diff(repo):
print('------')
print('README.recipes.md needs to be updated. Please run:')
print()
print(' ./recipes.py doc')
print()
return 1
if is_train:
print('Generating README.recipes.md')
with open(repo.readme_path, 'w') as f:
regenerate_doc(repo, f)
return 0