| # Copyright 2019 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| import collections |
| import collections.abc |
| import errno |
| import json |
| import os |
| import re |
| import sys |
| import tempfile |
| import time |
| import traceback |
| |
| import attr |
| import difflib |
| import coverage |
| import gevent |
| |
| from builtins import range |
| from builtins import str |
| from gevent import subprocess |
| from google.protobuf import duration_pb2 |
| from google.protobuf import json_format as jsonpb |
| |
| from recipe_engine import __path__ as RECIPE_ENGINE_PATH |
| |
| # pylint: disable=import-error |
| import PB |
| from PB.recipe_engine.internal.test.runner import Description, Outcome |
| from PB.go.chromium.org.luci.buildbucket.proto.common import Status |
| |
| |
| from ... import legacy |
| |
| from .... import config_types |
| from .... import engine_types |
| |
| from ...global_shutdown import GLOBAL_SHUTDOWN |
| from ...simple_cfg import RECIPES_CFG_LOCATION_REL |
| from ...test import magic_check_fn |
| from ...test.execute_test_case import execute_test_case |
| |
| from ... import debugger |
| |
| from .expectation_conversion import transform_expectations |
| from .pipe import write_message, read_message |
| |
| |
def _merge_presentation_updates(steps_ran, presentation_steps):
  """Combines the steps which ran (from the SimulationStepRunner) with the
  annotations presented for each step (from the
  SimulationAnnotatorStreamEngine).

  Args:

    * steps_ran (Dict[str, dict]) - Maps a step name to the expectation dict
      describing how that step ran (e.g. 'cmd', 'env', etc.)
    * presentation_steps (OrderedDict[str, dict]) - Maps a step name, in the
      order the steps were presented, to the annotations collected for it.

  Returns OrderedDict[str, expectation: dict], ordered the same way as
  `presentation_steps`.

  Note that the merge happens in place: an entry taken from `steps_ran` is
  updated with its annotations, and an uninteresting '$debug' log is removed
  from the corresponding entry of `presentation_steps`.
  """
  merged = collections.OrderedDict()
  for name, presented in presentation_steps.items():
    if name is None:
      # The None key holds the root annotations; it is not a step.
      continue

    if name in steps_ran:
      expectation = steps_ran[name]
    else:
      # Nothing ran under this name, so start from a minimal expectation.
      expectation = {
          'name': name,
          # TODO(iannucci): Drop 'cmd' field for presentation-only steps.
          'cmd': [],
          'cost': None,
      }

    # Hack: $debug is only worth keeping when it recorded a crash.
    debug_text = presented.get('logs', {}).get('$debug', None)
    if debug_text and 'Unhandled exception:' not in debug_text.splitlines():
      presented['logs'].pop('$debug')

    expectation.update(presented)
    merged[name] = expectation

  return merged
| |
| |
def _check_bad_test(test_results, test_data, steps_ran, presentation_steps):
  """Check to see if the user-provided test was malformed in some way.

  Currently this only identifies issues around unconsumed or misplaced
  step_data.

  Args:

    * test_results (Outcome.Results) - The object to update in the event the
      test was bad; messages are appended to its `bad_test` field.
    * test_data (recipe_engine.recipe_test_api.TestData) - The user-provided
      test data object, after running the test. Every step name still present
      in its `step_data` is treated as unconsumed mock data.
    * steps_ran (List[str]) - The list of step names which the
      SimulationStepRunner saw. This will only include step names run via
      `api.step()`, and notably omits 'presentation only' steps such as parent
      nest steps or steps emitted by the engine for UI purposes (e.g. crash
      reports).
    * presentation_steps (List[str]) - The list of step names which the
      SimulationAnnotatorStreamEngine saw. This is the full list of steps which
      would occur on the build UI.

  Returns None; problems are reported solely through `test_results`.
  """
  # Hash the step names once so that the membership checks below do not rescan
  # the lists for every step.
  presented = set(presentation_steps)
  ran = set(steps_ran)

  for step in test_data.step_data:
    # This is an unconsumed step name.
    if step in presented:
      # If the step is unconsumed but present in presentation_steps it means
      # that the step was really a presentation-only step (like a parent
      # nesting step) and not eligible for test data.
      test_results.bad_test.extend([
          'Mock data provided for presentation only step %r.' % step,
          'Presentation-only steps (like parent nesting steps) have no',
          'subprocess associated with them and so cannot have mock data.',
          'Please change your test so that it provides mock data for one of',
          'the real steps.',
      ])
    else:
      test_results.bad_test.append(
          'Mock data provided for non-existent step %r.' % step)

  if test_results.bad_test:
    test_results.bad_test.extend([
        '',
        'For reference, this test ran the following steps:',
    ])
    # Listed in presentation order, restricted to the steps which really ran.
    test_results.bad_test.extend(
        ' ' + repr(s) for s in presentation_steps if s in ran)
| |
| |
| def _exception_class_names(exception: Exception): |
| if isinstance(exception, ExceptionGroup): |
| types = set() |
| for exc in exception.exceptions: |
| types.update(_exception_class_names(exc)) |
| return types |
| |
| return set((exception.__class__.__name__,)) |
| |
| |
| def _check_exception(test_results, expected_exceptions, |
| uncaught_exception_info): |
| """Check to see if the test run failed with an exception from RunSteps. |
| |
| Args: |
| |
| * test_results (Outcome.Results) - The Outcome object to update in |
| the event the test exception expectation was bad. |
| * expected_exceptions (sequence of str) - The name of the exceptions that |
| the test case expected. |
| * uncaught_exception_info (Tuple[type, Exception, traceback]|None) - The |
| exception info for any uncaught exception triggered by user recipe code. |
| |
| Returns CrashFailure|None. |
| """ |
| # Check to see if the user expected the recipe to crash in this test case or |
| # not. |
| if uncaught_exception_info: |
| exc_type, exc, _ = uncaught_exception_info |
| exc_name = exc_type.__name__ |
| else: |
| exc_name = exc = None |
| if expected_exceptions: |
| if not exc: |
| test_results.crash_mismatch.append( |
| 'Missing expected exception in RunSteps. `api.expect_exception` is' |
| ' specified, but the exception did not occur.' |
| ) |
| |
| else: |
| uncaught_exception_types = sorted(_exception_class_names(exc)) |
| expected_exception_types = sorted(set(expected_exceptions)) |
| |
| if uncaught_exception_types != expected_exception_types: |
| test_results.crash_mismatch.append( |
| 'Expected exception mismatch in RunSteps. The test expected ' |
| f'{expected_exception_types} but saw {uncaught_exception_types}' |
| ) |
| |
| elif exc: |
| msg_lines = [ |
| 'Unexpected exception in RunSteps. Use `api.expect_exception` if' |
| ' the crash is intentional.', |
| ] |
| |
| for line in traceback.format_exception(exc): |
| for part in line.rstrip().splitlines(): |
| msg_lines.append(part) |
| |
| msg_lines += [ |
| l.rstrip('\n') |
| for l in traceback.format_exception_only(exc_type, exc) |
| ] |
| test_results.crash_mismatch.extend(msg_lines) |
| |
| |
def _check_status(raw_result, test_data, test_results, enforce_status_check):
  """Compares the status the test finished with against the expected status.

  Args:

    * raw_result - A RawResult, including `status`.
    * test_data - The TestData object for the current test. Its
      `expected_status` is used when set; otherwise SUCCESS is expected.
    * test_results - The Outcome.Results object to update in the event the
      status is mismatched.
    * enforce_status_check (bool) - Whether a mismatch is recorded as a
      failure (`crash_mismatch`) or only as a warning (`global_warnings`).

  Side-effect: on mismatch, appends a formatted message to `test_results`.
  """
  actual = raw_result.status
  expected = test_data.expected_status or Status.SUCCESS
  if actual == expected:
    return

  names = (Status.Name(expected), Status.Name(actual))

  # TODO (crbug.com/1426908): This will need to be happen without the config
  # prop on all builds once tests are updated:
  if not enforce_status_check:
    test_results.global_warnings.append('expected %s, got %s' % names)
    return

  test_results.crash_mismatch.append(
      'Status mismatch in RunSteps. The test expected %r but '
      'the status was %r.' % names
  )
| |
| |
def _diff_test(test_results, expect_file, new_expect, is_train):
  """Compares the actual test result with the expectation stored on disk.

  Args:

    * test_results (Outcome.Results) - The object to update in case the diff
      doesn't match up.
    * expect_file (str) - Absolute path to where this test's expectation JSON
      file is.
    * new_expect (Jsonish test expectation) - What the simulation actually
      produced. None means the test case dropped its expectation data.
    * is_train (bool) - When true, the expectation on disk is brought in line
      with `new_expect` instead of reporting a diff.

  Side-effects:
    * If we're training, may write or remove the expectation on disk (and set
      `written` / `removed` on test_results accordingly).
    * Otherwise, updates test_results if there's a diff with what's on disk.
  """
  on_disk_text = None
  try:
    with open(expect_file) as fil:
      on_disk_text = fil.read()
  except IOError:
    pass  # missing, it's fine
  except Exception as ex:  # pylint: disable=broad-except
    # While training an unreadable expectation is simply overwritten below.
    if not is_train:
      test_results.internal_error.append(
          'Unexpected exception reading test expectation %r: %r' % (
              expect_file, ex))
      return

  # Occurs if the expectation is not on disk and the test case dropped the
  # expectation data.
  #
  # Otherwise the serialized text would be `null`, which doesn't match None.
  if new_expect is None and on_disk_text is None:
    return

  actual_text = json.dumps(
      _encode_decode(new_expect), sort_keys=True, indent=2)
  if actual_text == on_disk_text:
    return

  if not is_train:
    if new_expect is None:
      test_results.diff.lines.extend([
          'Test expectation exists on disk at %r.' % (expect_file,),
          'However, the test case dropped all expectation information (i.e. with a'
          ' `post_process` function). Please re-run `recipes.py test train` or '
          'delete this expectation file.',
      ])
      return

    test_results.diff.lines.extend(
        difflib.unified_diff(
            str(on_disk_text).splitlines(),
            str(actual_text).splitlines(),
            fromfile='current expectation file',
            tofile='actual test result',
            n=4, lineterm=''))
    return

  # Training from here on.
  if new_expect is None:
    # The test no longer has an expectation, so drop the stale file.
    try:
      os.remove(expect_file)
      test_results.removed = True
    except OSError:
      pass
    return

  # Try to make the expectation dir; it already existing is fine.
  try:
    os.makedirs(os.path.dirname(expect_file))
  except OSError as ex:
    if ex.errno != errno.EEXIST:
      raise

  try:
    with open(expect_file, 'w') as fil:
      fil.write(actual_text)
    test_results.written = True
  except Exception as ex:  # pylint: disable=broad-except
    test_results.internal_error.append(
        'Unexpected exception writing test expectation %r: %r' % (
            expect_file, ex))
| |
| |
def _run_test(path_cleaner, test_results, recipe_deps, test_desc, test_data,
              is_train):
  """The main 'function' run by the worker.

  Executes the test case in the recipe, runs the bad-test / exception / status
  checks, then compares/diffs/writes the expectation file. `test_results` is
  updated as a side effect.

  Args:

    * path_cleaner - Handed to `transform_expectations` along with the raw
      expectations (`main` passes the result of `_make_path_cleaner`).
    * test_results (Outcome.Results)
    * recipe_deps (RecipeDeps)
    * test_desc (Description)
    * test_data (TestData)
    * is_train (bool) - Forwarded to `_diff_test`.
  """
  began = time.time()
  outcome = execute_test_case(recipe_deps, test_desc.recipe_name, test_data)
  elapsed = time.time() - began

  # Split the elapsed time into whole seconds and the remaining nanoseconds.
  test_results.duration.CopyFrom(
      duration_pb2.Duration(
          seconds=int(elapsed), nanos=int((elapsed % 1) * (10**9))))

  for warning_name, causes in outcome.warnings.items():
    test_results.warnings[warning_name].causes.extend(causes)

  expectations = _merge_presentation_updates(
      outcome.ran_steps, outcome.annotations)

  _check_bad_test(
      test_results, test_data, list(outcome.ran_steps), list(expectations))
  _check_exception(
      test_results, test_data.expected_exceptions, outcome.uncaught_exception)
  _check_status(
      outcome.raw_result, test_data, test_results,
      recipe_deps.main_repo.recipes_cfg_pb2.enforce_test_expected_status)

  # Convert the result to a json object by dumping to json, and then parsing.
  # TODO(iannucci): Use real objects so this only needs to be serialized once.
  result_json = json.loads(jsonpb.MessageToJson(
      legacy.to_legacy_result(outcome.raw_result),
      including_default_value_fields=True,
  ))
  expectations['$result'] = result_json

  # On success, carry over the summary markdown when it is populated.
  if (not result_json.get('failure')
      and outcome.raw_result.summary_markdown):
    result_json['summaryMarkdown'] = outcome.raw_result.summary_markdown

  result_json['name'] = '$result'

  # Whatever post_process returns replaces the expectations from here on.
  expectations = magic_check_fn.post_process(
      test_results, expectations, test_data)

  transform_expectations(path_cleaner, expectations)

  _diff_test(test_results, test_data.expect_file, expectations, is_train)
| |
| |
def _cover_all_imports(main_repo):
  """Collects coverage for importing every recipe module of `main_repo`.

  Accesses the `API`, `CONFIG_CTX` and `TEST_API` attributes of each module
  while coverage is recording files matching `<modules_dir>/*/*.py`.

  Returns the collected coverage data; an empty in-memory CoverageData when
  the repo has no modules.
  """
  # If our process is supposed to collect coverage for all recipe module
  # imports, do that after we receive the first Description. This way we can
  # reply to the main process with an Outcome. Otherwise the main process
  # could be blocked on writing a Description while we're trying to write an
  # Outcome.
  if not main_repo.modules:
    # Prevents a coverage warning when there are no modules to collect coverage
    # from.
    return coverage.CoverageData(no_disk=True)

  module_glob = os.path.join(main_repo.modules_dir, '*', '*.py')
  cov = coverage.Coverage(
      config_file=False, data_file=None, include=[module_glob])

  cov.start()
  for module in main_repo.modules.values():
    # Allow exceptions to raise here; they'll be reported as a 'global'
    # failure.
    for attr_name in ('API', 'CONFIG_CTX', 'TEST_API'):
      getattr(module, attr_name)
  cov.stop()

  return cov.get_data()
| |
| # administrative stuff (main, pipe handling, etc.) |
| |
def main(recipe_deps, cov_file, is_train, cover_module_imports):
  """Runs the test runner loop.

  Reads Description messages from stdin one at a time, runs the described
  test case, and writes an Outcome message to stdout for each of them. The
  loop ends when no Description can be read, when the Outcome cannot be
  written, or after an exception outside of the test run itself.

  Args:

    * recipe_deps (RecipeDeps)
    * cov_file (str|None) - Where to write coverage data. Coverage is only
      collected per test when this is set.
    * is_train (bool) - Forwarded to `_run_test`.
    * cover_module_imports (bool) - Whether to also collect coverage for
      importing all recipe modules of the main repo before running tests.
  """
  gevent.get_hub().exception_stream = None

  repo = recipe_deps.main_repo

  collected = coverage.CoverageData(basename=cov_file)
  if cover_module_imports:
    collected.update(_cover_all_imports(repo))

  cache = {}
  path_cleaner = _make_path_cleaner(recipe_deps)

  while True:
    # Reset global state as early as possible for each test case.
    config_types.ResetGlobalVariableAssignments()
    engine_types.PerGreentletStateRegistry.clear()
    GLOBAL_SHUTDOWN.clear()

    test_desc = _read_test_desc()
    if not test_desc:
      break  # EOF or error

    outcome = Outcome()
    fatal = False
    try:
      full_name = '%s.%s' % (test_desc.recipe_name, test_desc.test_name)
      per_test = outcome.test_results[full_name]

      recipe = repo.recipes[test_desc.recipe_name]

      if cov_file:
        # We have to start coverage now because we want to cover the importation
        # of the covered recipe and/or covered recipe modules.
        cov = coverage.Coverage(config_file=False, concurrency='gevent',
                                data_file=None,
                                include=recipe.coverage_patterns)
        cov.start()  # to cover execfile of recipe/module.__init__

      test_data = _get_test_data(cache, recipe, test_desc.test_name)
      try:
        _run_test(path_cleaner, per_test, recipe_deps, test_desc, test_data,
                  is_train)
      except Exception as ex:  # pylint: disable=broad-except
        # A failure inside the test run is attributed to that test only.
        per_test.internal_error.append('Uncaught exception: %r' % (ex,))
        per_test.internal_error.extend(traceback.format_exc().splitlines())

      if cov_file:
        cov.stop()
        collected.update(cov.get_data())

    except Exception as ex:  # pylint: disable=broad-except
      # Anything else is reported on the Outcome itself and ends the loop.
      outcome.internal_error.append('Uncaught exception: %r' % (ex,))
      outcome.internal_error.extend(traceback.format_exc().splitlines())
      fatal = True

    # Always attempt to report the Outcome, even when about to stop.
    delivered = write_message(sys.stdout.buffer, outcome)
    if fatal or not delivered:
      break  # EOF

  if cov_file:
    # Write data to the cov_file.
    collected.write()
| |
| |
def _read_test_desc():
  """Reads the next Description message from stdin.

  Returns whatever `read_message` returns. If reading raises, an Outcome
  carrying the error and its traceback as `internal_error` is written to
  stdout and None is returned instead.
  """
  try:
    return read_message(sys.stdin.buffer, Description)
  except Exception as ex:  # pylint: disable=broad-except
    error_lines = ['while reading: %r' % (ex,)]
    error_lines += traceback.format_exc().splitlines()
    write_message(sys.stdout.buffer, Outcome(internal_error=error_lines))
    return None
| |
def _get_test_data(cache, recipe, test_name):
  """Returns the test data named `test_name` for `recipe`.

  On a cache miss, all tests generated by `recipe.gen_tests()` are stored in
  `cache` under `(recipe.name, test_data.name)`.

  Raises KeyError if the recipe generates no test with that name.
  """
  wanted = (recipe.name, test_name)
  if wanted not in cache:
    cache.update(
        ((recipe.name, generated.name), generated)
        for generated in recipe.gen_tests())
  return cache[wanted]
| |
| |
| # TODO(iannucci): fix test system so that non-JSONish types cannot leak into |
| # raw_expectations. |
def _encode_decode(obj):
  """For py3: ensure any bytes are decoded to str.

  Walks `obj` recursively: bytes are decoded as UTF-8 (undecodable bytes are
  replaced), mappings become plain dicts, any other non-string iterable
  becomes a list, and everything else is returned untouched.
  """
  if isinstance(obj, bytes):
    return obj.decode('utf-8', 'replace')

  # Mappings are iterable too, so they have to be handled first.
  if isinstance(obj, collections.abc.Mapping):
    return dict(
        (_encode_decode(key), _encode_decode(value))
        for key, value in obj.items())

  # Strings are iterable as well, but must be kept whole.
  if isinstance(obj, collections.abc.Iterable) and not isinstance(obj, str):
    return list(map(_encode_decode, obj))

  return obj
| |
| |
def _make_path_cleaner(recipe_deps):
  """Builds a filter which swaps real paths-on-disk for stable mock paths.

  Repo paths become `RECIPE_REPO[repo name]`; python installation paths become
  `PYTHON` or `PYTHON(site-packages)`. Only paths in double-quotes followed by
  `, line N` (e.g. as part of a stack trace) are rewritten.

  Args:

    * recipe_deps (RecipeDeps) - All of the loaded recipe dependencies.

  Returns `func(lines : List[str]) -> List[str]` which converts real on-disk
  absolute paths to mock paths.
  """
  def _ancestor(path, levels):
    # Absolute path of the directory `levels` steps above `path`.
    for _ in range(levels):
      path = os.path.dirname(path)
    return os.path.abspath(path)

  # maps path_to_replace -> replacement
  roots = {}
  # paths of all recipe_deps
  for repo in recipe_deps.repos.values():
    roots[repo.path] = 'RECIPE_REPO[%s]' % repo.name
  main_repo_root = 'RECIPE_REPO[%s]' % recipe_deps.main_repo.name

  # Derive path to python prefix. We WOULD use `sys.prefix` and
  # `sys.real_prefix` (a vpython construction), however SOME python
  # distributions have these set to unhelpful paths (like '/usr'). So, we import
  # one library known to be in the vpython prefix and one known to be in the
  # system prefix and then derive the real paths from those.
  #
  # FIXME(iannucci): This is all pretty fragile.
  import io
  # os is in the vpython root
  roots[_ancestor(os.__file__, 3)] = 'PYTHON'
  # io is in the system root
  roots[_ancestor(io.__file__, 3)] = 'PYTHON'
  # coverage is in the local site-packages in the vpython root
  roots[_ancestor(coverage.__file__, 2)] = 'PYTHON(site-packages)'

  # Replace paths from longest to shortest; because of the way the recipe engine
  # fetches dependencies (i.e. into the .recipe_deps folder) dependencies of
  # repo X will have a prefix of X's path.
  longest_first = sorted(roots, key=len, reverse=True)

  # Look for paths in double quotes (as we might see in a stack trace)
  replacer = re.compile(
      r'"(%s)([^"]*)", line (\d+)' % ('|'.join(map(re.escape, longest_first)),))

  def _root_subber(match):
    root = roots[match.group(1)]
    path = match.group(2).replace('\\', '/')
    # If this is a path from some other repo, then drop the line number as
    # it is very noisy, but the general shape of the traceback can still be
    # useful.
    if root == main_repo_root:
      line = ', line ' + match.group(3)
    else:
      line = ''
    return '"%s%s"%s' % (root, path, line)

  def _clean(lines):
    return [replacer.sub(_root_subber, line) for line in lines]

  return _clean
| |
| |
@attr.s(frozen=True)
class DescriptionWithCallback:
  """Pairs a test Description with a callback for the runner to invoke.

  When a RunnerThread pulls one of these from its description queue it sends
  `description` to the runner subprocess and calls `callback()` (with no
  arguments) once the subprocess's reply has been read.
  """

  # The Description message to hand to the runner subprocess.
  description = attr.ib()

  # Zero-argument callable; a falsy value means no callback is invoked.
  callback = attr.ib()
| |
| |
class RunnerThread(gevent.Greenlet):
  """Greenlet which owns a `recipes.py test _runner` subprocess.

  It feeds Descriptions pulled from the description queue to the subprocess
  and pushes the Outcomes it reads back onto the outcome queue. When it is
  done it pushes itself onto the outcome queue.

  Attributes:

    * cov_file (str|None) - The coverage file the subprocess was told to write.
    * exit_code (int|None) - The subprocess's exit code, set once `_run` ends.
  """

  def __init__(self, recipe_deps, description_queue, outcome_queue, is_train,
               cov_file, cover_module_imports):
    """Launches the runner subprocess; `start()` begins feeding it.

    Args:

      * recipe_deps (RecipeDeps)
      * description_queue (gevent.queue.Queue) - Source of Description (or
        DescriptionWithCallback) items; a falsy item ends the thread.
      * outcome_queue (gevent.queue.Queue) - Sink for Outcome messages.
      * is_train (bool) - Passes `--train` to the subprocess.
      * cov_file (str|None) - If set, passed via `--cov-file`.
      * cover_module_imports (bool) - Passes `--cover-module-imports`.
    """
    super().__init__()

    self.cov_file = cov_file
    self.exit_code = None

    engine_path = recipe_deps.repos['recipe_engine'].path

    cmd = [
        # NOTE: We use sys.executable to play nice with pydevd's os.exec
        # patching. If we re-invoke vpython3 here, it will be confused and
        # won't do its awful dark magic on the runners.
        #
        # If they ever introduce python4 (heaven help us all), you'll probably
        # need to generate the virtualenv for python4 here and then directly use
        # the interpreter here, rather than invoking, say `vpython4`.
        #
        # Assuming ANY of those things are still a thing at that point :).
        sys.executable,
        '-u',
        os.path.join(engine_path, 'recipe_engine', 'main.py'),
        '--package',
        os.path.join(recipe_deps.main_repo.path, RECIPES_CFG_LOCATION_REL),
        '--proto-override',
        os.path.dirname(PB.__path__[0]),
        '--log-level',
        'ERROR',
    ]
    # Carry through all repos explicitly via overrides
    for repo_name, repo in recipe_deps.repos.items():
      if repo_name == recipe_deps.main_repo.name:
        continue
      cmd.extend(['-O', '%s=%s' % (repo_name, repo.path)])

    cmd.extend(['test', '_runner'])
    if is_train:
      cmd.append('--train')
    if cov_file:
      cmd.extend(['--cov-file', cov_file])
    if cover_module_imports:
      cmd.append('--cover-module-imports')

    # Unbuffered pipes carry the messages; stderr is inherited.
    self._runner_proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE,
                                         stdout=subprocess.PIPE, stderr=None)
    self._description_queue = description_queue
    self._outcome_queue = outcome_queue

  @classmethod
  def make_pool(cls, recipe_deps, description_queue, outcome_queue, is_train,
                collect_coverage, jobs):
    """Creates and starts a pool of RunnerThread instances.

    Each RunnerThread owns a `recipes.py test _runner` subprocess and
    coordinates communication to and from that subprocess.

    This makes `jobs` runners.

    Args:

      * recipe_deps (RecipeDeps)
      * description_queue (gevent.queue.Queue) - The queue to pull Description
        messages from to feed to the runner subprocess.
      * outcome_queue (gevent.queue.Queue) - The queue to push Outcome messages
        sourced from the runner subprocess.
      * is_train (bool) - Whether or not the runner subprocess should train
        (write) expectation files to disk. If False will not write/delete
        anything on the filesystem.
      * collect_coverage (bool) - Whether or not to collect coverage. May be
        false if the user specified a test filter.
      * jobs (int) - The number of workers to use for running tests.

    Returns Tuple[str|None, List[RunnerThread]]: the freshly created temporary
    directory holding the per-runner coverage files (None when coverage is not
    collected) and the list of started runners.
    """
    cov_dir = None
    if collect_coverage:
      cov_dir = tempfile.mkdtemp('.recipe_test_coverage')

    def cov_file(tid):
      # Per-runner coverage file, or None when coverage is not collected.
      if cov_dir is None:
        return None
      return os.path.join(cov_dir, 'thread-%d.coverage' % tid)

    # We assign import coverage to (only) the first runner subprocess; there's
    # no need to duplicate this work to all runners.
    pool = [
        cls(recipe_deps,
            description_queue,
            outcome_queue,
            is_train,
            cov_file(i),
            cover_module_imports=(i == 0)) for i in range(jobs)
    ]
    for thread in pool:
      thread.start()
    return cov_dir, pool

  # pylint: disable=method-hidden
  def _run(self):
    """Greenlet body: relays messages until told to stop or a pipe breaks."""
    try:
      while True:
        test_desc = self._description_queue.get()
        if not test_desc:
          # A falsy item is the signal to shut the subprocess down.
          self._runner_proc.stdout.close()
          try:
            self._runner_proc.stdin.write(b'\0')
            self._runner_proc.stdin.close()
          except OSError:
            # Best effort: the subprocess may already be gone (e.g. EPIPE when
            # it aborted in its starting stage), so there is nobody left to
            # notify. Any other OSError is ignored here as well.
            pass
          self._runner_proc.wait()
          return

        callback = None
        if isinstance(test_desc, DescriptionWithCallback):
          callback = test_desc.callback
          test_desc = test_desc.description

        if not write_message(self._runner_proc.stdin, test_desc):
          self._outcome_queue.put(Outcome(internal_error=[
              'Unable to send test description for (%s.%s) from %r' % (
                  test_desc.recipe_name, test_desc.test_name, self.name
              )
          ]))
          return

        result = read_message(self._runner_proc.stdout, Outcome)
        # The callback fires once the reply was read, even if there was none.
        if callback:
          callback()
        if result is None:
          return

        self._outcome_queue.put(result)
    except (KeyboardInterrupt, gevent.GreenletExit):
      pass
    except Exception as ex:  # pylint: disable=broad-except
      self._outcome_queue.put(Outcome(internal_error=[
          'Uncaught exception in %r: %s' % (self.name, ex)
      ] + traceback.format_exc().splitlines()))
    finally:
      try:
        self._runner_proc.kill()
      except OSError:
        pass
      self.exit_code = self._runner_proc.wait()
      # We rely on the thread to dump coverage information to disk; if we don't
      # wait for the process to die, then our main thread will race with the
      # runner thread for the coverage information. On windows this almost
      # always causes an IOError, on *nix this will likely result in flakily
      # truncated coverage files.
      #
      # Sending ourselves down the pipe lets the main process know that we've
      # quit so it can remove us from the live threads.
      self._outcome_queue.put(self)