| # Copyright 2014 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Classes for running different kinds of tests. |
| |
| This module contains two main class hierarchies: test specs and tests. |
| Test specs are immutable objects that define the details of a specific |
| test and can be used to create the test object, which actually knows how |
| to execute a test. Test objects can also be decorated with test |
| wrappers, which can modify the execution of the test. |
| |
| The class `AbstractTestSpec` is the root of the class hierarchy for test |
| specs and test wrapper specs. It defines the single method `get_test` |
| which is how the test or wrapped test is obtained from the spec. |
| |
| All test spec types inherit from `TestSpec`. `TestSpec` implements the |
| `get_test` method in terms of the `test_class` property, which concrete |
| subclasses must override to return the class of the test type. All test |
wrapper types inherit from `TestWrapperSpec`. `TestWrapperSpec`
| implements the `get_test` method in terms of the `test_wrapper_class` |
| property, which concrete subclasses must override to return the class of |
| the test wrapper type. |
| |
| The class `AbstractTest` is the root of the class hierarchy for tests |
| and test wrappers. All test types inherit from `Test` and all test |
| wrapper types inherit from `TestWrapper`, which are both abstract base |
| classes. Each concrete test type or test wrapper type has an associated |
| spec type that contains the input details for the test or test wrapper. |
| """ |
| |
| from __future__ import annotations |
| |
| import abc |
| import attr |
| from collections.abc import Iterable, Set |
| import contextlib |
| from enum import StrEnum |
| import hashlib |
| import itertools |
| import inspect |
| import re |
| import struct |
| import urllib |
| |
| from recipe_engine import step_data |
| from recipe_engine.config_types import Path |
| |
| from .resultdb import ResultDB |
| |
| from PB.go.chromium.org.luci.buildbucket.proto import common as common_pb2 |
| from PB.go.chromium.org.luci.resultdb.proto.v1 import (test_result as |
| test_result_pb2) |
| |
| from RECIPE_MODULES.build import chromium_swarming |
| from RECIPE_MODULES.build.attr_utils import (attrib, attrs, command_args, enum, |
| mapping, sequence) |
| from RECIPE_MODULES.build.chromium_utr.instruction import get_utr_instruction |
| from RECIPE_MODULES.build.skylab.test_runner import TestRunner |
| from RECIPE_MODULES.build.test_utils import util |
| |
| # Pylint doesn't understand an abstract class hierarchy where a subclass will |
| # override some of the abstract methods of its base and remain abstract itself. |
| # The actual implementation of abstract base classes ensures that everything is |
| # overridden when attempting to instantiate a class, so trying to make this |
| # pylint clean isn't worth the effort. |
| # pylint: disable=abstract-method |
| |
# Chromeperf dashboard URL -- presumably where perf results get uploaded;
# confirm against the callers that consume this constant.
RESULTS_URL = 'https://chromeperf.appspot.com'

# When we retry failing tests, we try to choose a high repeat count so that
# flaky tests will produce both failures and successes. The tradeoff is with
# total run time, which we want to keep low.
REPEAT_COUNT_FOR_FAILING_TESTS = 10

# To guarantee a deterministic failure, we would extend the retry time when
# running failed tests on CI retry shards.
RETRY_LIMIT_FOR_CI_RETRY_SHARDS = 5

# Pinned version of
# https://chromium.googlesource.com/infra/infra/+/main/go/src/infra/cmd/mac_toolchain
MAC_TOOLCHAIN_PACKAGE = 'infra/tools/mac_toolchain/${platform}'
MAC_TOOLCHAIN_VERSION = (
    'git_revision:b0c0a706097c27444dbe3f84e5553f1aaa77c1a6')
MAC_TOOLCHAIN_ROOT = '.'

# The names of the result handlers that test specs are allowed to select
# (see TestSpec.results_handler_name).
ALLOWED_RESULT_HANDLER_NAMES = ('default', 'layout tests', 'fake')

# Matches the name of the new invocation that gets printed to stderr when
# calling `rdb stream -new`.
RDB_INVOCATION_NAME_RE = re.compile(r'rdb-stream: included "(\S+)" in "\S+"')

# Gerrit CL footer used to opt a try build into running ci_only tests.
INCLUDE_CI_FOOTER = 'Include-Ci-Only-Tests'

# Values returned by AbstractTest.get_status summarizing a suite's result.
INVALID_SUITE_STATUS = 'Invalid'
INCOMPLETE_SUITE_STATUS = 'Incomplete'
FAILURE_SUITE_STATUS = 'Failure'
SUCCESS_SUITE_STATUS = 'Success'
| |
| def _merge_arg(args, flag, value): |
| args = [a for a in args if not a.startswith(flag)] |
| if value is not None: |
| return args + ['%s=%s' % (flag, str(value))] |
| return args + [flag] |
| |
| |
@attrs()
class TestOptionFlags:
  """Flags for supporting TestOptions features.

  Each test type supports a different subset of the options in
  TestOptions and may spell the corresponding command-line flags
  differently. Instances of this type capture those per-test-type
  details so that the appropriate flags can be added to a command line
  for whichever features the test supports. An empty string means the
  corresponding feature is unsupported.
  """

  # Flag used to restrict which tests run
  filter_flag = attrib(str, default='')
  # Delimiter joining multiple filter values into a single flag value
  filter_delimiter = attrib(str, default='')
  # Flag controlling how many times each test is repeated
  repeat_flag = attrib(str, default='')
  # Flag capping the number of retries of a failed test
  retry_limit_flag = attrib(str, default='')
  # Flag causing disabled tests to be run as well
  run_disabled_flag = attrib(str, default='')
  # Flag controlling how many tests run within a single shard
  batch_limit_flag = attrib(str, default='')

  @classmethod
  def create(cls, **kwargs):
    """Create a TestOptionFlags, validating flag interdependencies.

    Raises:
      ValueError: If 'filter_flag' is set without the 'filter_delimiter'
        needed to join multiple filter values.
    """
    if kwargs.get('filter_flag') and not kwargs.get('filter_delimiter'):
      raise ValueError("'filter_delimiter' must be set if 'filter_flag' is")
    return cls(**kwargs)
| |
| |
# All flags empty: tests using the default flags get no option support.
_DEFAULT_OPTION_FLAGS = TestOptionFlags.create()
_GTEST_OPTION_FLAGS = TestOptionFlags.create(
    filter_flag='--gtest_filter',
    filter_delimiter=':',
    repeat_flag='--gtest_repeat',
    retry_limit_flag='--test-launcher-retry-limit',
    run_disabled_flag='--gtest_also_run_disabled_tests',
    batch_limit_flag='--test-launcher-batch-limit',
)
# Note: no run_disabled or batch_limit flags for isolated scripts.
_ISOLATED_SCRIPT_OPTION_FLAGS = TestOptionFlags.create(
    filter_flag='--isolated-script-test-filter',
    filter_delimiter='::',
    repeat_flag='--isolated-script-test-repeat',
    retry_limit_flag='--isolated-script-test-launcher-retry-limit',
)
# webkit_layout_tests were renamed to blink_web_tests, which only supports
# gtest style arguments. See crbug/831345 and crrev/c/1006067 for details.
# batch limit was never supported for webkit_layout_tests, so we'll exclude
# override of that variable.
_BLINK_WEB_TESTS_OPTION_FLAGS = TestOptionFlags.create(
    filter_flag='--gtest_filter',
    filter_delimiter=':',
    repeat_flag='--gtest_repeat',
    retry_limit_flag='--test-launcher-retry-limit',
    run_disabled_flag='--gtest_also_run_disabled_tests',
)
# Like gtest flags, but with a different retry flag and no batch limit.
_ANGLE_UNITTESTS_OPTION_FLAGS = TestOptionFlags.create(
    filter_flag='--gtest_filter',
    filter_delimiter=':',
    repeat_flag='--gtest_repeat',
    retry_limit_flag='--flaky-retries',
    run_disabled_flag='--gtest_also_run_disabled_tests',
)
| |
| |
@attrs()
class TestOptions:
  """Test-type agnostic configuration of test running options."""

  # How many times to run each test
  repeat_count = attrib(int, default=None)
  # A list of tests to restrict execution
  test_filter = attrib(sequence[str], default=())
  # Whether to run tests that have been disabled.
  run_disabled = attrib(bool, default=False)
  # How many times to retry tests until getting a pass
  retry_limit = attrib(int, default=None)
  # Whether to run all tests independently, with no state leaked between them.
  # This can significantly increase the time it takes to run tests.
  force_independent_tests = attrib(bool, default=False)

  @classmethod
  def create(cls, **kwargs):
    """Create a TestOptions with the given attribute values."""
    return cls(**kwargs)

  def for_running(self, suffix, tests_to_retry):
    """Compute the options to use for a given phase and tests to retry.

    When retrying tests without patch, the tests are run a fixed number
    of times regardless of success to see whether they fail flakily.
    Recipes that set an explicit repeat_count keep their requested
    behavior.

    Args:
      suffix: A string suffix identifying the phase.
      tests_to_retry: A container of tests to retry. An empty container
        indicates that it is not a retry and all tests should be run.

    Returns:
      A TestOptions to use for the phase; either self or an adjusted
      copy.
    """
    # Setting a repeat count for a large number of tests can cause timeouts,
    # so bail out to the unmodified options in that case. tests_to_retry can
    # be None, meaning all tests should run. Rarely it can also be the empty
    # list when an infra failure occurred even though results were valid and
    # all tests passed. https://crbug.com/910706.
    if not tests_to_retry or len(tests_to_retry) > 100:
      return self

    if suffix == 'without patch' and self.repeat_count is None:
      return attr.evolve(
          self,
          repeat_count=REPEAT_COUNT_FOR_FAILING_TESTS,
          # With 10 repeats, the default retry_limit of 3 would mean up to
          # 40 runs of a failing test, which is not the intent, so disable
          # retries entirely.
          retry_limit=0,
          # Only a small number of tests is being repeated, so force them to
          # run independently; this costs run time but gives more reliable
          # results.
          force_independent_tests=True,
      )

    # CI shard retries get a larger retry budget.
    if suffix == 'retry shards':
      return attr.evolve(self, retry_limit=RETRY_LIMIT_FOR_CI_RETRY_SHARDS)

    return self

  def add_args(self, args, flags):
    """Add arguments to the command line corresponding to the options.

    Args:
      args: A sequence of strings containing the command-line.
      flags: The TestOptionFlags instance containing the supported flags
        for the test.

    Returns:
      A list of strings containing the command-line. For each enabled
      option that has a supporting flag, the flag is appended (replacing
      any pre-existing occurrence).
    """
    updated = list(args)

    if self.test_filter and flags.filter_flag:
      joined = flags.filter_delimiter.join(self.test_filter)
      updated = _merge_arg(updated, flags.filter_flag, joined)

    if self.repeat_count and self.repeat_count > 1 and flags.repeat_flag:
      updated = _merge_arg(updated, flags.repeat_flag, self.repeat_count)

    if self.retry_limit is not None and flags.retry_limit_flag:
      updated = _merge_arg(updated, flags.retry_limit_flag, self.retry_limit)

    if self.run_disabled and flags.run_disabled_flag:
      updated = _merge_arg(updated, flags.run_disabled_flag, None)

    if self.force_independent_tests and flags.batch_limit_flag:
      updated = _merge_arg(updated, flags.batch_limit_flag, 1)

    return updated
| |
| |
| def _add_suffix(step_name, suffix): |
| if not suffix: |
| return step_name |
| return '{} ({})'.format(step_name, suffix) |
| |
| |
| def _present_info_messages(presentation, test, messages): |
| messages = list(messages) |
| if test.is_rts: |
| messages.append('Ran tests selected by RTS.') |
| if test.spec.description: |
| messages.append(test.spec.description) |
| messages.append(presentation.step_text) |
| presentation.step_text = '\n'.join(messages) |
| |
| |
class AbstractTestSpec(abc.ABC):
  """Abstract base class for specs for tests and wrapped tests."""

  @abc.abstractmethod
  def get_test(self, chromium_tests_api):
    """Get a test instance described by the spec.

    Args:
      chromium_tests_api: The api object of the chromium_tests recipe
        module, which the test will use to execute steps.

    Returns:
      An instance of either a `Test` subclass or an instance of a
      `TestWrapper` subclass.
    """
    raise NotImplementedError()  # pragma: no cover
| |
| |
class TestLocality(StrEnum):
  """Where a test executes: locally on the bot, on swarming, or on skylab."""
  LOCAL = 'local'
  SWARMING = 'swarming'
  SKYLAB = 'skylab'
| |
| |
class AbstractTest(abc.ABC):
  """Abstract base class for tests and wrapped tests."""

  @property
  @abc.abstractmethod
  def name(self) -> str:
    """The name of the test's step without a phase suffix.

    Additional suffixes may be present (e.g. os and GPU for swarming
    tests).
    """
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def spec(self) -> AbstractTestSpec:
    """The spec for the test."""
    raise NotImplementedError()  # pragma: no cover

  @spec.setter
  @abc.abstractmethod
  def spec(self, value: AbstractTestSpec) -> None:
    """The spec for the test."""
    raise NotImplementedError()  # pragma: no cover

  @property
  def is_enabled(self) -> bool:
    """Whether the test is enabled or not.

    Tests that are not enabled should still support having pre_run and
    run called and can produce empty steps to provide information in the
    build, but users should not call methods dealing with results on
    tests that are not enabled.
    """
    # NOTE(review): unlike the sibling properties, this one is not marked
    # @abc.abstractmethod, so a subclass that fails to override it errors
    # at call time instead of at instantiation -- confirm this is intended.
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def is_experimental(self) -> bool:
    """Whether the test is experimental or not.

    Failures in experimental tests should not fail the builds.
    """
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def is_ci_only(self) -> bool:
    """Whether the test is ci_only or not.

    If failures are present in ci_only tests, additional information will be
    added to the build's summary to indicate how to run them on the try builder.
    """
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def canonical_name(self) -> str:
    """Canonical name of the test, no suffix attached."""
    raise NotImplementedError()  # pragma: no cover

  @abc.abstractmethod
  def step_name(self, suffix: str) -> str:
    """Helper to uniformly combine tests's name with a suffix.

    Note this step_name is not necessarily the same as the step_name in actual
    builds, since there could be post-processing on the step_name by other
    apis, like swarming (see api.chromium_swarming.get_step_name()).
    """
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def target_name(self) -> str:
    """The ninja build target for the test."""
    raise NotImplementedError()  # pragma: no cover

  @abc.abstractmethod
  def compile_targets(self) -> Iterable[str]:
    """The compile targets needed by this test."""
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def uses_local_devices(self) -> bool:
    """Whether the test uses locally attached devices."""
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def isolate_target(self) -> str | None:
    """The name of the isolate to create for the test."""
    raise NotImplementedError()  # pragma: no cover

  @property
  def uses_isolate(self) -> bool:
    """Returns true if the test is run via an isolate.

    This does not need to be overridden in any subclasses. Overriding
    isolate_target to return a non-false value will cause the test to
    report that it uses isolate.
    """
    return bool(self.isolate_target)

  @property
  @abc.abstractmethod
  def locality(self) -> TestLocality:
    """Where the test is executed."""
    raise NotImplementedError()  # pragma: no cover

  @property
  def runs_locally(self) -> bool:
    """Whether the test runs locally."""
    return self.locality == TestLocality.LOCAL

  @property
  def runs_on_swarming(self) -> bool:
    """Whether or not the test runs on swarming."""
    return self.locality == TestLocality.SWARMING

  @property
  def runs_on_skylab(self) -> bool:
    """Whether the test runs on skylab."""
    return self.locality == TestLocality.SKYLAB

  @property
  @abc.abstractmethod
  def supports_rts(self) -> bool:
    """Determine whether the test supports RTS.

    Regression Test Selection (RTS) is a mode of operation where a subset of the
    tests are run. This should be checked before trying to set is_rts to enable
    RTS.
    """
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def is_rts(self) -> bool:
    """Determine whether the test is currently running with RTS.

    Regression Test Selection (RTS) is a mode of operation where a subset of the
    tests are run. This property determines whether this mode is enabled or not.
    """
    raise NotImplementedError()  # pragma: no cover

  @is_rts.setter
  @abc.abstractmethod
  def is_rts(self, value: bool) -> None:
    """Set whether the test is currently running with RTS.

    Regression Test Selection (RTS) is a mode of operation where a subset of the
    tests are run. This property will enable running only the tests selected by
    RTS.
    """
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def test_id_prefix(self) -> str:
    """Prefix of test_id in ResultDB. e.g.

    "ninja://chrome/test:telemetry_gpu_integration_test/trace_test/"
    """
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def option_flags(self) -> TestOptionFlags:
    """Get the flags that the test uses for TestOptions."""
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def test_options(self) -> TestOptions:
    """Get the test options that will be used when running the test."""
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def retry_only_failed_tests(self) -> bool:
    """Whether to retry only the failed tests, with patch."""
    raise NotImplementedError()  # pragma: no cover

  @test_options.setter
  @abc.abstractmethod
  def test_options(self, value: TestOptions) -> None:
    """Set the test options that will be used when running the test."""
    raise NotImplementedError()  # pragma: no cover

  @abc.abstractmethod
  def pre_run(self, suffix: str, include_utr_instruction: bool = False) -> None:
    """Steps to execute before running the test."""
    raise NotImplementedError()  # pragma: no cover

  @abc.abstractmethod
  def run(
      self,
      checkout_dir: Path,
      source_dir: Path,
      build_dir: Path,
      suffix: str,
      info_messages: Iterable[str] = (),
  ) -> None:
    """Run the test."""
    raise NotImplementedError()  # pragma: no cover

  @abc.abstractmethod
  def get_invocation_names(self, suffix: str) -> Iterable[str]:
    """Returns the invocation names tracking the test's results in RDB."""
    raise NotImplementedError()  # pragma: no cover

  @abc.abstractmethod
  def failure_on_exit(self, suffix: str) -> bool:
    """Returns True if the test (or any of its shards) exited non-zero.

    Used to determine the result of the test in the absence of anything
    uploaded to RDB.
    """
    raise NotImplementedError()  # pragma: no cover

  @abc.abstractmethod
  def did_complete(self, suffix: str) -> bool:
    """Returns True if the test had a chance to run to completion.

    False implies invalid results (see below).
    """
    raise NotImplementedError()  # pragma: no cover

  @abc.abstractmethod
  def has_valid_results(self, suffix: str) -> bool:
    """Returns True if results (failures) are valid.

    If False, this indicates the test failed but also failed to report any
    results in machine-readable format.

    With did_complete() above and deterministic_failures() below, this
    makes it possible to distinguish between the following cases:
      a) Test ran, and exited zero.
      b) Test ran, exited non-zero, and reported failures.
      c) Test ran, exited non-zero, but did not report any failures.
      d) Test was not able to run.

    Both b) and c) are often due to the code-under-test. d) is often due to an
    infrastructure failure.
    """
    raise NotImplementedError()  # pragma: no cover

  @abc.abstractmethod
  def get_rdb_results(self, suffix: str) -> util.RDBPerSuiteResults:
    """Get the RDB results tracked for the test for the given suffix."""
    raise NotImplementedError()  # pragma: no cover

  @abc.abstractmethod
  def update_rdb_results(
      self,
      suffix: str,
      results: util.RDBPerSuiteResults,
  ) -> None:
    """Set the RDB results tracked for the test for the given suffix."""
    raise NotImplementedError()  # pragma: no cover

  @abc.abstractmethod
  def deterministic_failures(self, suffix: str) -> Set[str]:
    """Names of tests that failed in every attempt for the given suffix."""
    raise NotImplementedError()  # pragma: no cover

  @abc.abstractmethod
  def notrun_failures(self, suffix: str) -> Set[str]:
    """Returns tests that had status NOTRUN/UNKNOWN.

    FindIt has special logic for handling tests with status NOTRUN/UNKNOWN.
    This method returns tests for which every test run had a result of either
    NOTRUN or UNKNOWN.

    Returns:
      not_run_tests: A set of strings. Only valid if valid_results is True.
    """
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def known_luci_analysis_flaky_failures(self) -> Set[str]:
    """Test names that are flaky or deterministically failing on ToT.

    Determined according to luci analysis criteria.
    """
    raise NotImplementedError()  # pragma: no cover

  @abc.abstractmethod
  def add_known_luci_analysis_flaky_failures(
      self,
      test_names: Iterable[str],
  ) -> None:
    """Add known flaky failures on ToT."""
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def weak_luci_analysis_flaky_failures(self) -> Set[str]:
    """Test names that only barely meet luci analysis flaky criteria."""
    raise NotImplementedError()  # pragma: no cover

  @abc.abstractmethod
  def add_weak_luci_analysis_flaky_failure(self, test_name: str) -> None:
    """Add known weak flaky failures."""
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def check_flakiness_for_new_tests(self) -> bool:
    """Whether to check flakiness for new tests in try jobs."""
    raise NotImplementedError()  # pragma: no cover

  def failures_including_retry(
      self,
      suffix: str,
  ) -> tuple[bool, Set[str] | None]:
    """Returns test failures after retries.

    This method only considers tests to be failures if every test run fails;
    if the test runner retried tests, they're still considered successes as
    long as they didn't cause step failures.

    It also considers retried shards and the known flaky tests on tip of tree
    when determining if a test failed, which is to say that a test is determined
    as a failure if and only if it succeeded neither original run or retry and
    is NOT known to be flaky on tip of tree.

    Returns: A tuple (valid_results, failures).
      valid_results: A Boolean indicating whether results are valid.
      failures: A set of strings. Only valid if valid_results is True.
    """
    original_run_valid = self.has_valid_results(suffix)
    if original_run_valid:
      failures = self.deterministic_failures(suffix)
    retry_suffix = 'retry shards'
    if suffix:
      retry_suffix = ' '.join([retry_suffix, suffix])
    retry_shards_valid = self.has_valid_results(retry_suffix)
    if retry_shards_valid:
      retry_shards_failures = self.deterministic_failures(retry_suffix)

    if original_run_valid and retry_shards_valid:
      # Returning retry_shards_failures ensures that if there were passed
      # tests in the original run that failed in the retry shards, those
      # failures are exposed in the build.
      return True, (
          set(retry_shards_failures) - self.known_luci_analysis_flaky_failures)

    if original_run_valid:
      return True, set(failures) - self.known_luci_analysis_flaky_failures

    if retry_shards_valid:
      return True, set(
          retry_shards_failures) - self.known_luci_analysis_flaky_failures

    return False, None

  def with_patch_failures_including_retry(self) -> tuple[bool, Set[str] | None]:
    return self.failures_including_retry('with patch')

  # TODO(crbug.com/1040596): Remove this method and update callers to use
  # |deterministic_failures('with patch')| once the bug is fixed.
  #
  # Currently, the semantics of this method is only a subset of
  # |deterministic_failures('with patch')| due to that it's missing tests that
  # failed "with patch", but passed in "retry shards with patch".
  def has_failures_to_summarize(self) -> bool:
    _, failures = self.failures_including_retry('with patch')
    return bool(failures or self.known_luci_analysis_flaky_failures)

  def without_patch_failures_to_ignore(self) -> tuple[bool, Set[str] | None]:
    """Returns test failures that should be ignored.

    Tests that fail in 'without patch' should be ignored, since they're failing
    without the CL patched in. If a test is flaky, it is treated as a failing
    test.

    Returns: A tuple (valid_results, failures_to_ignore).
      valid_results: A Boolean indicating whether failures_to_ignore is valid.
      failures_to_ignore: A set of strings. Only valid if valid_results is True.
    """
    results = self.get_rdb_results('without patch')
    if not self.has_valid_results('without patch') or not results:
      return (False, None)

    ignored_failures = set()
    for test in results.all_tests:
      # A test is ignored if any of its runs had an unexpected non-PASS
      # result in the 'without patch' phase.
      for i, status in enumerate(test.statuses):
        expected = test.expectednesses[i]
        if status != test_result_pb2.PASS and not expected:
          ignored_failures.add(test.test_name)
          break

    return (True, ignored_failures)

  def get_status(self, suffix: str) -> str:
    """Returns the status of the test for the given suffix.

    Determines whether the test suite has succeeded, failed, or has invalid
    results for the provided suffix, checking exonerations for without
    patch and retrying shards.

    Args:
      suffix: String suffix representing the phase of the build used to
        determine which phases can be used to exonerate the suite. Expected to
        be 'with patch' to allow 'without patch' to exonerate.

    Returns: A string designating the suite's current status.
    """
    if not self.did_complete(suffix):
      return INCOMPLETE_SUITE_STATUS
    if suffix == 'with patch':
      valid, test_failures = self.with_patch_failures_including_retry()
      if not valid:
        return INVALID_SUITE_STATUS
      if not test_failures:
        return SUCCESS_SUITE_STATUS
      # Check if the without patch exonerates this suite
      valid, without_patch_failures = self.deterministic_without_patch_failures(
      )
      if valid and not without_patch_failures:
        return SUCCESS_SUITE_STATUS
      return FAILURE_SUITE_STATUS
    valid, test_failures = self.failures_including_retry(suffix)
    if not valid:
      return INVALID_SUITE_STATUS
    if test_failures:
      return FAILURE_SUITE_STATUS
    return SUCCESS_SUITE_STATUS

  def deterministic_without_patch_failures(
      self) -> tuple[bool, Set[str] | None]:
    """Returns 'with patch' failures minus those ignorable 'without patch'.

    Returns: A tuple (valid_results, failures).
      valid_results: A Boolean indicating whether results are valid.
      failures: A set of strings. Only valid if valid_results is True.
    """
    # Check if the suite succeeded in without patch
    valid_results, ignored_failures = self.without_patch_failures_to_ignore()
    if not valid_results:
      return False, None

    valid_results, test_failures = self.with_patch_failures_including_retry()
    assert valid_results, (
        "If there were no valid results, then there was no "
        "point in running 'without patch'. This is a recipe bug.")

    # The FAILURE and NOTRUN test statuses are both considered deterministic
    # failures. But some suites can have trouble during later phases, causing
    # some tests that exited with FAILURE in the 'with patch' phase to exit
    # with NOTRUN in the 'without patch' phase. So when a 'without patch' test
    # fails with a different status, don't ignore it.
    if ignored_failures:
      with_patch_notruns = self.notrun_failures('with patch')
      without_patch_notruns = self.notrun_failures('without patch')
      for ignored_failure in ignored_failures.copy():
        if ((ignored_failure in with_patch_notruns) !=
            (ignored_failure in without_patch_notruns)):
          ignored_failures.remove(ignored_failure)
    # Remove the tests that failed wo patch
    return True, test_failures - ignored_failures
| |
| |
@attrs()
class TestSpec(AbstractTestSpec):
  """Abstract base class for specs for tests.

  Attributes:
    * name - The displayed name of the test.
    * target_name - The ninja build target for the test, a key in
      one of the gn_isolate_map.pyl files being used, e.g.
      "browser_tests".
    * full_test_target - A fully qualified Ninja target, e.g.
      "//chrome/test:browser_tests".
    * waterfall_builder_group - The matching waterfall builder group.
      For a try builder this is the builder group of the mirrored
      builder.
    * waterfall_buildername - The matching waterfall builder name. For
      a try builder this is the name of the mirrored builder.
    * resultdb - The ResultDB integration configuration. ResultDB
      integration is disabled unless `resultdb.enable` is True.
    * test_id_prefix: A prefix to be added to the test Id for the test
      e.g.
      "ninja://chrome/test:telemetry_gpu_integration_test/trace_test/".
    * retry_only_failed_tests: Whether to retry only the failed tests,
      with patch. The alternative is the status quo of retrying the
      entire shard.
  """

  _name = attrib(str)
  target_name = attrib(str)
  description = attrib(str, default=None)
  full_test_target = attrib(str, default=None)
  waterfall_builder_group = attrib(str, default=None)
  waterfall_buildername = attrib(str, default=None)
  resultdb = attrib(ResultDB, default=ResultDB.create())
  # TODO(crbug/1106965): remove test_id_prefix, if deriver gets turned down.
  test_id_prefix = attrib(str, default=None)
  check_flakiness_for_new_tests = attrib(bool, default=True)
  results_handler_name = attrib(str, default=None)
  retry_only_failed_tests = attrib(bool, default=True)

  @property
  def name(self):
    """The displayed name of the test's step, minus any phase suffix.

    Additional suffixes may be present (e.g. os and GPU for swarming
    tests).
    """
    return self._name

  @property
  def canonical_name(self):
    """The canonical name of the test, with no suffix attached."""
    return self._name

  @classmethod
  def create(cls, name, **kwargs):
    """Create a TestSpec.

    Arguments:
      * name - The name of the test. The returned spec will have this
        value for name.
      * kwargs - Additional keyword arguments used to initialize the
        attributes of the returned spec. When the `target_name` keyword
        is absent (or falsy), the spec's `target_name` attribute
        defaults to `name`.
    """
    if not kwargs.get('target_name'):
      kwargs['target_name'] = name
    return cls(name=name, **kwargs)

  @property
  @abc.abstractmethod
  def test_class(self):
    """The test class associated with the spec."""
    raise NotImplementedError()  # pragma: no cover

  def get_test(self, chromium_tests_api):
    """Instantiate the test described by this spec."""
    return self.test_class(self, chromium_tests_api)
| |
| |
class Test(AbstractTest):
  """
  Base class for a test suite that can be run locally or remotely.

  Tests consist of three components:
    * configuration
    * logic for how to run the test
    * results

  The logic for how to run a test should only depend on the configuration, and
  not on global state. For example, once a SwarmingTest has been configured,
  calling pre_run() or run() should work regardless of context.

  The only exception is for local tests that depend on file-system state. We do
  not want to add the complexity of a file-system cache to this class, so
  changes to the file-system can affect behavior of the test.

  As part of this contract, methods that access configuration or results should
  never take an "api" parameter; the configuration and results should be
  self-contained. Likewise, the logic for running tests must take an "api"
  parameter to access relevant recipe modules, but should not look up state from
  those modules; the state should already be stored in the configuration.
  """

  def __init__(self, spec, chromium_tests_api):
    super().__init__()

    # The immutable spec describing the test and the recipe API used to
    # execute it.
    self._spec = spec
    self._chromium_tests_api = chromium_tests_api

    self._test_options = TestOptions.create()

    # Set of test names that are either flaky or deterministically failing on
    # ToT, according to luci analysis criteria.
    # TODO (crbug/1314194): Update name to something else since
    # this also includes tests failing on ToT
    self._known_luci_analysis_flaky_failures = set()

    # Set of test names that barely meet luci analysis flaky criteria. These can
    # trigger a retry of the shard to avoid data cannibalization
    self._weak_luci_analysis_flaky_failures = set()

    # Used to track results of tests as reported by RDB. Separate from
    # _deterministic_failures above as that is populated by parsing the tests'
    # JSON results, while this field is populated entirely by RDB's API. Also
    # keyed via suffix like _deterministic_failures above.
    self._rdb_results = {}

    # Maps suffix to whether or not the test exited non-zero. In conjunction
    # with _rdb_results above, can safely handle any type of test failure
    # without inspecting JSON.
    self._failure_on_exit_suffix_map = {}

    # Marks the test as using RTS. When enabled this suite will only run the
    # tests chosen by RTS.
    self._is_rts = False

    # Include the UTR instructions in the reproduction instruction to run this
    # test
    self._include_utr_instruction = False

  @property
  def spec(self) -> AbstractTestSpec:
    """The spec this test was created from."""
    return self._spec

  @spec.setter
  def spec(self, value: AbstractTestSpec) -> None:
    self._spec = value

  @property
  def is_enabled(self) -> bool:
    """Whether the test will actually be executed (see TestWrapper)."""
    return True

  @property
  def is_experimental(self) -> bool:
    """Whether the test runs as an experiment (see ExperimentalTest)."""
    return False

  @property
  def is_ci_only(self) -> bool:
    """Whether the test only runs on CI builders (see CiOnlyTest)."""
    return False

  @property
  def option_flags(self) -> TestOptionFlags:
    """Flags describing which command-line options the test supports."""
    return _DEFAULT_OPTION_FLAGS

  @property
  def test_options(self) -> TestOptions:
    """Options applied when computing the test's command line."""
    return self._test_options

  @test_options.setter
  def test_options(self, value: TestOptions) -> None:
    self._test_options = value

  @property
  def name(self) -> str:
    """Name of the test, taken from the spec (may include suffixes)."""
    return self.spec.name

  @property
  def canonical_name(self) -> str:
    """Name of the test with no suffix attached."""
    return self.spec.canonical_name

  @property
  def target_name(self) -> str:
    """The ninja build target associated with the test."""
    return self.spec.target_name

  @property
  def check_flakiness_for_new_tests(self) -> bool:
    """Whether to check flakiness for new tests in try jobs.

    Default True unless specified in test spec json files.
    """
    return self.spec.check_flakiness_for_new_tests

  @property
  def test_id_prefix(self) -> str:
    """Prefix prepended to individual test IDs, taken from the spec."""
    return self.spec.test_id_prefix

  @property
  def isolate_target(self) -> str | None:
    """Returns isolate target name.

    Test types that use isolate should override this to return the
    appropriate isolate target.
    """
    return None

  @property
  def supports_rts(self) -> bool:
    """Determine whether the test supports RTS.

    Test types that support RTS should override this.
    """
    return False

  @property
  def is_rts(self) -> bool:
    """Whether the test is currently running with RTS enabled."""
    return self._is_rts

  @is_rts.setter
  def is_rts(self, value: bool) -> None:
    # RTS can only be turned on for test types that declare support.
    if value:
      assert self.supports_rts
    self._is_rts = value

  @property
  def retry_only_failed_tests(self) -> bool:
    """Whether shard retries should run only the failed tests."""
    return self.spec.retry_only_failed_tests

  @property
  def api(self):
    """Returns the chromium_tests RecipeApi object associated with the test."""
    return self._chromium_tests_api

  def get_rdb_results(self, suffix: str) -> util.RDBPerSuiteResults:
    """Returns the RDB results recorded for `suffix`, or None."""
    return self._rdb_results.get(suffix)

  def update_rdb_results(
      self,
      suffix: str,
      results: util.RDBPerSuiteResults,
  ) -> None:
    """Records the RDB results for the run identified by `suffix`."""
    self._rdb_results[suffix] = results

  @property
  def known_luci_analysis_flaky_failures(self) -> Set[str]:
    """Test names that luci analysis reports as flaky/failing on ToT."""
    return self._known_luci_analysis_flaky_failures

  @property
  def weak_luci_analysis_flaky_failures(self) -> Set[str]:
    """Test names that only barely meet luci analysis flaky criteria."""
    return self._weak_luci_analysis_flaky_failures

  def add_known_luci_analysis_flaky_failures(
      self,
      test_names: Iterable[str],
  ) -> None:
    """Adds `test_names` to the known flaky/failing-on-ToT set."""
    self._known_luci_analysis_flaky_failures.update(test_names)

  def add_weak_luci_analysis_flaky_failure(self, test_name: str) -> None:
    """Adds `test_name` to the weakly-flaky set."""
    self._weak_luci_analysis_flaky_failures.add(test_name)

  def _update_failure_on_exit(self, suffix: str, failure_on_exit: bool) -> None:
    """Records whether the run for `suffix` exited non-zero.

    Also propagates the exit status into any RDB results already
    recorded for the same suffix.
    """
    self._failure_on_exit_suffix_map[suffix] = failure_on_exit
    rdb_results = self._rdb_results.get(suffix)
    if rdb_results:
      self._rdb_results[suffix] = rdb_results.with_failure_on_exit(
          failure_on_exit)

  def failure_on_exit(self, suffix: str) -> bool:
    """Whether the run for `suffix` exited non-zero.

    Unknown suffixes are conservatively treated as failures (True).
    """
    return self._failure_on_exit_suffix_map.get(suffix, True)

  def did_complete(self, suffix: str) -> bool:
    """Whether the run for `suffix` ran to completion.

    Always True in this base class; subclasses may override.
    """
    return True

  def has_valid_results(self, suffix: str) -> bool:
    """Whether the run for `suffix` produced usable results."""
    # No recorded RDB results means nothing ran (or results were lost).
    if suffix not in self._rdb_results:
      return False

    return not self._rdb_results[suffix].invalid

  def deterministic_failures(self, suffix: str) -> Set[str]:
    """Names of tests that unexpectedly failed in the `suffix` run.

    Must only be called when results exist for `suffix` (callers should
    check has_valid_results first).
    """
    failure_msg = (
        'There is no data for the test run suffix ({0}). This should never '
        'happen as all calls to deterministic_failures() should first check '
        'that the data exists.'.format(suffix))
    assert suffix in self._rdb_results, failure_msg
    return {
        t.test_name for t in self._rdb_results[suffix].unexpected_failing_tests
    }

  def notrun_failures(self, suffix: str) -> Set[str]:
    """Names of tests that were unexpectedly skipped in the `suffix` run."""
    assert self.has_valid_results(suffix), (
        'notrun_failures must only be called when the test run is known to '
        'have valid results.')
    return set(
        t.test_name for t in self._rdb_results[suffix].unexpected_skipped_tests)

  @property
  def uses_local_devices(self) -> bool:
    """Whether the test requires locally attached devices."""
    return False

  def step_name(self, suffix: str) -> str:
    """The name of the recipe step for the `suffix` run of this test."""
    step_name = _add_suffix(self.name, suffix)
    return step_name

  def _tests_to_retry(self, suffix: str) -> Set[str] | None:
    """Computes the tests to run on an invocation of the test suite.

    Args:
      suffix: A unique identifier for this test suite invocation. Only supported
        suffix will get a list of filtered tests. Others will return None
        meaning all tests should be run.

    Returns:
      A list of tests to retry. Returning None means all tests should be run.
    """
    if suffix == 'retry shards':
      # '' is the suffix of the initial run that 'retry shards' retries.
      valid_results, failures = self.failures_including_retry('')
      # Invalid results should be treated as if every test failed.
      return failures if valid_results else None

    if (suffix == 'without patch' or
        (suffix == 'retry shards with patch' and self.retry_only_failed_tests)):
      valid_results, failures = self.with_patch_failures_including_retry()
      # Invalid results should be treated as if every test failed.
      return failures if valid_results else None

    # If we don't recognize the step, then return None. This makes it easy for
    # bugs to slip through, but this matches the previous behavior. Importantly,
    # all the tests fail to pass a suffix.
    return None

  def _present_rdb_results(self,
                           step_result,
                           rdb_results,
                           as_nested_step=False):
    """Add a summary of test failures tracked in RDB to the given step_result.

    This duplicates info present in the "Test Results" tab in the new Milo UI.
    TODO(crbug.com/1245085): Remove this if/when all users have migrated to
    the new UI.

    Args:
      step_result: Active step to display test info on.
      rdb_results: A util.RDBPerSuiteResults instance containing the suite's
        results.
      as_nested_step: True if we need to treat step_result as a nested step
        for display purposes.
    """
    # Nothing to present if there were no unexpected failures.
    if not rdb_results or not rdb_results.unexpected_failing_tests:
      return

    _, failures_text = self.api.m.test_utils.limit_failures(
        sorted([t.test_name for t in rdb_results.unexpected_failing_tests]))
    display_text = self.api.m.presentation_utils.format_step_text(
        [['deterministic failures [caused step to fail]:', failures_text]])
    # Nested steps expose step_text directly rather than via presentation.
    if as_nested_step:
      step_result.step_text += display_text
    else:
      step_result.presentation.step_text += display_text

  def _instructions_tag_for_suffix(self, instruction_type: str,
                                   suffix: str) -> str:
    """Returns a stable, opaque tag for (test, instruction_type, suffix).

    The tag is derived by hashing the hex digests of the non-empty
    identifiers, so the same inputs always yield the same tag.
    """
    identifiers = [self.name, instruction_type, suffix]
    return 'tag_' + str(
        hashlib.sha1(''.join(
            str(hashlib.sha1(identifier.encode('utf-8')).hexdigest())
            for identifier in identifiers
            if identifier).encode('utf-8')).hexdigest())
| |
| |
class AbstractSwarmingTest(AbstractTest):
  """Interface for tests that run on swarming."""

  @property
  @abc.abstractmethod
  def raw_cmd(self) -> Iterable[str]:
    """The command line the swarming task runs."""
    raise NotImplementedError()  # pragma: no cover

  @raw_cmd.setter
  @abc.abstractmethod
  def raw_cmd(self, value: Iterable[str]) -> None:
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def rts_raw_cmd(self) -> Iterable[str]:
    """The command line to use when the test runs with RTS."""
    raise NotImplementedError()  # pragma: no cover

  @rts_raw_cmd.setter
  @abc.abstractmethod
  def rts_raw_cmd(self, value: Iterable[str]) -> None:
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def relative_cwd(self) -> str:
    """Working directory for the command, relative to the task root."""
    raise NotImplementedError()  # pragma: no cover

  @relative_cwd.setter
  @abc.abstractmethod
  def relative_cwd(self, value: str) -> None:
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def isolate_profile_data(self) -> bool:
    """Whether profiling/coverage data should be isolated with results."""
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def shards(self) -> int:
    """Number of swarming shards the test runs across."""
    raise NotImplementedError()  # pragma: no cover

  @abc.abstractmethod
  def get_task(self, suffix: str) -> chromium_swarming.SwarmingTask:
    """The swarming task for the run identified by `suffix`."""
    raise NotImplementedError()  # pragma: no cover
| |
| |
class AbstractSkylabTest(AbstractTest):
  """Interface for tests that run on skylab."""

  @property
  @abc.abstractmethod
  def is_GPU_test(self) -> bool:
    """Whether this is a GPU test."""
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def ctp_build_ids(self) -> dict[str, int]:
    """CTP build IDs for this test's skylab runs (see implementors)."""
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def test_runner_builds(self) -> dict[str, TestRunner]:
    """Test-runner builds for this test's skylab runs (see implementors)."""
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def exe_rel_path(self) -> str:
    """Path of the test executable, relative to the build output."""
    raise NotImplementedError()  # pragma: no cover

  @exe_rel_path.setter
  @abc.abstractmethod
  def exe_rel_path(self, value: str) -> None:
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def lacros_gcs_path(self) -> str:
    """GCS path of the lacros artifacts used by the test."""
    raise NotImplementedError()  # pragma: no cover

  @lacros_gcs_path.setter
  @abc.abstractmethod
  def lacros_gcs_path(self, value: str) -> None:
    raise NotImplementedError()  # pragma: no cover

  @property
  @abc.abstractmethod
  def build_output_dir(self) -> str:
    """Directory containing the build outputs for the test."""
    raise NotImplementedError()  # pragma: no cover

  @build_output_dir.setter
  @abc.abstractmethod
  def build_output_dir(self, value: str) -> None:
    raise NotImplementedError()  # pragma: no cover
| |
@attrs()
class TestWrapperSpec(AbstractTestSpec):
  """Abstract base class for specs for test wrappers.

  Attributes:
    * test_spec - The spec for the wrapped test.
  """

  # The spec of the test being wrapped; exposed as `test_spec`.
  _test_spec = attrib(AbstractTestSpec)

  @classmethod
  def create(cls, test_spec, **kwargs):
    """Create a TestWrapperSpec.

    Arguments:
      * test_spec - The spec for the wrapped test.
      * kwargs - Additional keyword arguments that will be used to
        initialize the attributes of the returned spec.
    """
    return cls(test_spec, **kwargs)

  def get_test(self, chromium_tests_api):
    """Get the test described by the spec.

    The wrapped test is created from the wrapped spec first, then
    passed to the wrapper class.
    """
    return self.test_wrapper_class(self,
                                   self._test_spec.get_test(chromium_tests_api),
                                   chromium_tests_api)

  @property
  @abc.abstractmethod
  def test_wrapper_class(self):
    """The test wrapper class associated with the spec."""
    raise NotImplementedError()  # pragma: no cover
| |
| |
class _TestDelegateAbstractMeta(abc.ABCMeta):
  """A metaclass that delegates abstract methods to a wrapped test.

  When a new class is created by this metaclass, any abstractmethod
  defined in the new class' bases will be overridden with a method that
  will call the method with the same name on the instance's _test
  attribute. Properties will be similarly overridden to get/set the
  attribute of the same name from the instance's test attribute.
  """

  def __new__(mcs, class_name, bases, namespace, /, **kwargs):
    # For each abstract member inherited from a base that the new class
    # does not itself define, install a delegating implementation.
    for base in bases:
      for name, value in inspect.getmembers(base, mcs._is_abstractmethod):
        if name not in namespace:
          if isinstance(value, property):
            delegate = mcs._test_wrapper_delegate_property(name, value)
          else:
            delegate = mcs._test_wrapper_delegate_method(name)
          namespace[name] = delegate
    return super().__new__(mcs, class_name, bases, namespace, **kwargs)

  @staticmethod
  def _is_abstractmethod(obj):
    # How abc.ABC determines if a method is abstract, see
    # https://docs.python.org/3.8/library/abc.html#abc.abstractmethod
    return getattr(obj, '__isabstractmethod__', False)

  @staticmethod
  def _test_wrapper_delegate_method(name):
    # Delegates the call to the same-named method of self._test.

    def wrapped(self, *args, **kwargs):
      return getattr(self._test, name)(*args, **kwargs)

    return wrapped

  @classmethod
  def _test_wrapper_delegate_property(mcs, name, prop):
    # Only abstract accessors are replaced; concrete ones are kept so a
    # partially-abstract property keeps its concrete behavior.
    fget = prop.fget
    if mcs._is_abstractmethod(fget):

      def fget(self):
        return getattr(self._test, name)

    fset = prop.fset
    if mcs._is_abstractmethod(fset):

      def fset(self, value):
        return setattr(self._test, name, value)

    return property(fget, fset)
| |
| |
class TestWrapper(
    AbstractSwarmingTest,
    AbstractSkylabTest,
    AbstractTest,
    # This handles delegating abstract methods in the base classes to _test
    metaclass=_TestDelegateAbstractMeta,
):
  """A base class for wrapping Tests to modify behavior.

  All abstract methods in base classes are automatically overridden to
  defer to the wrapped test. Subclasses are free to override the
  behavior for these methods.

  TestWrapper implements the interface for swarming tests and skylab
  tests because arbitrary tests can be wrapped, but the corresponding
  methods should only be called if the wrapped test is of the
  corresponding type.
  """

  def __init__(self, spec, test, chromium_tests_api):
    super().__init__()
    self._wrapper_spec = spec
    # The wrapped test; the metaclass delegates abstract members to it.
    self._test = test
    self._chromium_tests_api = chromium_tests_api

  @property
  def api(self):
    """Returns the chromium_tests RecipeApi object associated with the test."""
    return self._chromium_tests_api

  @property
  def _disabled_message(self) -> str:
    """A message that explains why this test is disabled.

    An empty message indicates that the test is not disabled.
    TestWrappers that disable the test should override this to return a
    non-empty value.
    """
    return ''

  @property
  def _info_message(self) -> str:
    """Optional info to display in the step for the test.

    This will be called to provide additional information for enabled
    tests. This will not be called if disabled_message is non-empty.
    """
    return ''

  @property
  def is_enabled(self):
    """Enabled only if the wrapper doesn't disable the wrapped test."""
    return not self._disabled_message and self._test.is_enabled

  def pre_run(self, suffix: str, include_utr_instruction: bool = False) -> None:
    """Runs the wrapped test's pre_run unless the wrapper disables it."""
    if not self._disabled_message:
      return self._test.pre_run(suffix, include_utr_instruction)

  def run(
      self,
      checkout_dir: Path,
      source_dir: Path,
      build_dir: Path,
      suffix: str,
      info_messages: Iterable[str] = (),
  ) -> None:
    """Runs the wrapped test, or emits a placeholder step if disabled."""
    # Don't call methods on self that take the suffix, if the subclass performs
    # suffix modification then the suffix passed in should already be modified
    if self._disabled_message:
      info_messages = itertools.chain([self._disabled_message], info_messages)
      self.api.m.step.empty(
          self._test.step_name(suffix), step_text='\n'.join(info_messages))
      return

    if self._info_message:
      info_messages = itertools.chain([self._info_message], info_messages)
    return self._test.run(checkout_dir, source_dir, build_dir, suffix,
                          info_messages)
| |
| |
class CiOnlyTestSpec(TestWrapperSpec):
  """A spec for a test that is marked ci_only."""

  @property
  def test_wrapper_class(self):
    """The test wrapper class associated with the spec."""
    return CiOnlyTest
| |
| |
class CiOnlyTest(TestWrapper):
  """A test wrapper that runs the wrapped test that is marked ci_only.

  On try builders the wrapped test is skipped unless Mega CQ is active
  or a gerrit footer explicitly enables it; on CI builders it always
  runs.
  """

  def __init__(self, spec, test, chromium_tests_api):
    super().__init__(spec, test, chromium_tests_api)
    # Decided once up front so all later phases agree.
    self._disabled = self._compute_disabled()

  @property
  def is_ci_only(self) -> bool:
    return True

  @property
  def check_flakiness_for_new_tests(self) -> bool:
    # There isn't sufficient result history data for tests that are normally
    # CI-only.
    return False

  def _compute_disabled(self) -> bool:
    """Returns True if this build should skip the wrapped test."""
    # CI builds always run ci_only tests.
    if not self.api.m.tryserver.is_tryserver:
      return False

    # Mega CQ runs also execute ci_only tests.
    if (self.api.m.cv.active and
        self.api.m.cv.run_mode in self.api.MEGA_CQ_MODE_NAMES):
      return False

    enabled_by_builder = self.api.get_footer_enabled_ci_only_tests()

    def footer_enables(builder):
      # A footer entry can name this test or use '*' for all tests.
      enabled = enabled_by_builder.get(builder, set())
      return '*' in enabled or self.canonical_name in enabled

    # Footers can target every builder ('*') or this specific builder.
    return not (footer_enables('*') or footer_enables(self._builder_id))

  @property
  def _builder_id(self) -> str:
    spec = self._test.spec
    return f'{spec.waterfall_builder_group}:{spec.waterfall_buildername}'

  @property
  def _footer_to_enable(self) -> str:
    return f'{INCLUDE_CI_FOOTER}: {self._builder_id}|{self.canonical_name}'

  @property
  def _disabled_message(self):
    if not self._disabled:
      return ''
    return ("This test is not being run because it is marked 'ci_only'."
            f" Add '{self._footer_to_enable}' to CL footers to override.")

  @property
  def _info_message(self):
    if (self.api.m.cv.active and
        self.api.m.cv.run_mode in self.api.MEGA_CQ_MODE_NAMES):
      return 'This test is being run on Mega CQ runs'

    if self.api.m.tryserver.is_tryserver:
      return ('This test is being run due to the'
              f' {INCLUDE_CI_FOOTER} gerrit footer')
    return ('This test will not be run on try builders by default, add the '
            f'following CL footer to override: `{self._footer_to_enable}`\n')
| |
| |
class SuccessReuseTestSpec(TestWrapperSpec):
  """A spec for a test that is being skipped.

  The wrapped test is skipped because an equivalent run already
  succeeded (see SuccessReuseTest).
  """

  @property
  def test_wrapper_class(self):
    """The test wrapper class associated with the spec."""
    return SuccessReuseTest
| |
| |
class SuccessReuseTest(TestWrapper):
  """A test wrapper that provides steps for a skipped test."""

  @property
  def _disabled_message(self):
    # Always non-empty, so the wrapped test is unconditionally skipped.
    return ("This test is not being run because it has passed in the last 24 "
            "hours with the equivalent patchset")
| |
| |
@attrs()
class ExperimentalTestSpec(TestWrapperSpec):
  """A spec for a test to be executed at some percentage."""

  # Percentage chance (0-100) that the wrapped test will be executed.
  experiment_percentage = attrib(int)

  @classmethod
  def create(cls, test_spec, experiment_percentage):  # pylint: disable=line-too-long,arguments-differ
    """Create an ExperimentalTestSpec.

    Arguments:
      * test_spec - The spec of the wrapped test.
      * experiment_percentage - The percentage chance that the test will be
        executed. Clamped to the range [0, 100].
    """
    experiment_percentage = max(0, min(100, experiment_percentage))
    return super().create(
        test_spec, experiment_percentage=experiment_percentage)

  @property
  def test_wrapper_class(self):
    """The test wrapper class associated with the spec."""
    return ExperimentalTest
| |
| |
class ExperimentalTest(TestWrapper):
  """A test wrapper that runs the wrapped test on an experimental test.

  Experimental tests:
    - can run at <= 100%, depending on the experiment_percentage.
    - will not cause the build to fail.
  """

  def __init__(self, spec, test, chromium_tests_api):
    super().__init__(spec, test, chromium_tests_api)
    # Computed once so every phase of the build agrees on whether the
    # test is in the experiment.
    self._is_in_experiment = self._calculate_is_in_experiment()

  def _calculate_is_in_experiment(self):
    """Deterministically decides whether this build runs the experiment."""
    # Arbitrarily determine whether to run the test based on its experiment
    # key. Tests with the same experiment key should always either be in the
    # experiment or not; i.e., given the same key, this should always either
    # return True or False, but not both.
    #
    # The experiment key is either:
    #   - builder name + patchset + name of the test, for trybots
    #   - builder name + build number + name of the test, for CI bots
    #
    # These keys:
    #   - ensure that the same experiment configuration is always used for
    #     a given patchset
    #   - allow independent variation of experiments on the same test
    #     across different builders
    #   - allow independent variation of experiments on different tests
    #     across a single build
    #
    # The overall algorithm is copied from the CQ's implementation of
    # experimental builders, albeit with different experiment keys.
    criteria = [
        self.api.m.buildbucket.builder_name,
        (self.api.m.tryserver.gerrit_change and
         self.api.m.tryserver.gerrit_change.change) or
        self.api.m.buildbucket.build.number or '0',
        self.name,
    ]

    # Hash the key and compare the first 16 bits against the percentage,
    # scaled so the comparison is exact in integer arithmetic.
    digest = hashlib.sha1(''.join(
        str(c) for c in criteria).encode('utf-8')).digest()
    short = struct.unpack_from('<H', digest)[0]
    return self._wrapper_spec.experiment_percentage * 0xffff >= short * 100

  def _experimental_suffix(self, suffix):
    """Returns `suffix` with an 'experimental' marker appended."""
    if not suffix:
      return 'experimental'
    return f'{suffix}, experimental'

  def _actually_has_valid_results(self, suffix):
    """Check if the underlying test produced valid results.

    The ExperimentalTest reports that it always has valid results, so
    various result methods (failures, notrun_failures, etc.) will be
    called. If the underlying test does not have valid results, then
    calling the superclass version of the method would violate the
    contract, so this method indicates if calling the superclass version
    should be safe.
    """
    return super().has_valid_results(self._experimental_suffix(suffix))

  @property
  def _disabled_message(self):
    if self._is_in_experiment:
      return ''
    return 'This test was not selected for its experiment in this build'

  @property
  def _info_message(self):
    return ('This is an experimental test that was selected for this build,'
            ' failures will not cause build failures')

  @property
  def is_experimental(self) -> bool:
    return True

  def step_name(self, suffix: str) -> str:
    """Step name for the wrapped test, with the experimental suffix."""
    return self._test.step_name(self._experimental_suffix(suffix))

  #override
  def pre_run(self, suffix: str, include_utr_instruction: bool = False) -> None:
    # Failures in an experimental test must not fail the build, so any
    # step failure raised by the wrapped test is swallowed.
    try:
      return super().pre_run(
          self._experimental_suffix(suffix), include_utr_instruction)
    except self.api.m.step.StepFailure:
      pass

  #override
  def run(
      self,
      checkout_dir: Path,
      source_dir: Path,
      build_dir: Path,
      suffix: str,
      info_messages: Iterable[str] = (),
  ) -> None:
    # As in pre_run, step failures are caught so the build does not fail;
    # the failing step's result is still returned to the caller.
    try:
      return super().run(
          checkout_dir,
          source_dir,
          build_dir,
          self._experimental_suffix(suffix),
          info_messages,
      )
    except self.api.m.step.StepFailure as e:
      return e.result

  def get_task(self, suffix: str) -> chromium_swarming.SwarmingTask:
    """The wrapped test's swarming task, keyed by the experimental suffix."""
    return self._test.get_task(self._experimental_suffix(suffix))

  #override
  def has_valid_results(self, suffix: str) -> bool:
    # Call the wrapped test's implementation in case it has side effects, but
    # ignore the result.
    super().has_valid_results(self._experimental_suffix(suffix))
    return True

  #override
  def failure_on_exit(self, suffix: str) -> bool:
    # Call the wrapped test's implementation in case it has side effects, but
    # ignore the result.
    super().failure_on_exit(self._experimental_suffix(suffix))
    return False

  #override
  def deterministic_failures(self, suffix: str) -> Set[str]:
    if self._actually_has_valid_results(suffix):
      # Call the wrapped test's implementation in case it has side effects,
      # but ignore the result.
      super().deterministic_failures(self._experimental_suffix(suffix))
    # Experimental failures never fail the build, so report none. Return a
    # set (not a list) to match the declared return type and the other
    # result accessors (e.g. notrun_failures).
    return set()

  #override
  def notrun_failures(
      self,
      suffix: str,
  ) -> Set[str]:  # pragma: no cover
    if self._actually_has_valid_results(suffix):
      # Call the wrapped test's implementation in case it has side effects,
      # but ignore the result.
      super().notrun_failures(self._experimental_suffix(suffix))
    return set()

  def get_invocation_names(self, suffix: str) -> Iterable[str]:
    """Invocation names for the wrapped test's experimental run."""
    return super().get_invocation_names(self._experimental_suffix(suffix))

  def get_rdb_results(self, suffix: str) -> util.RDBPerSuiteResults:
    """RDB results for the wrapped test's experimental run."""
    return super().get_rdb_results(self._experimental_suffix(suffix))

  def update_rdb_results(
      self,
      suffix: str,
      results: util.RDBPerSuiteResults,
  ) -> None:
    """Records RDB results under the experimental suffix."""
    return super().update_rdb_results(
        self._experimental_suffix(suffix), results)
| |
| |
class LocalTest(Test):
  """Abstract class for local tests.

  This class contains logic related to running tests locally, namely for local
  RDB invocations. All of which is intended to be shared with any subclasses.
  """

  def __init__(self, spec, chromium_tests_api):
    super().__init__(spec, chromium_tests_api)
    # Maps run suffix -> RDB invocation name parsed from the run's stderr.
    self._suffix_to_invocation_names = {}

  @property
  def locality(self) -> TestLocality:
    """Local tests run on the build machine itself."""
    return TestLocality.LOCAL

  def pre_run(self, suffix: str, include_utr_instruction: bool = False) -> None:
    # Local tests have no separate trigger phase; nothing to do.
    del suffix

  def get_invocation_names(self, suffix: str) -> Iterable[str]:
    """The RDB invocation names recorded for `suffix` (zero or one)."""
    inv = self._suffix_to_invocation_names.get(suffix)
    return [inv] if inv else []

  def _prep_local_rdb(self,
                      source_dir: Path,
                      *,
                      temp=None,
                      include_artifacts=True):
    """Returns a ResultDB instance suitable for local test runs.

    Main difference between remote swarming runs and local test runs (ie:
    ScriptTests and LocalIsolatedScriptTests) is the location of a temp
    result file and the location of the result_adapter binary.

    Args:
      source_dir: The path to the top-level repo.
      temp: Path to temp file to store results.
      include_artifacts: If True, add the parent dir of temp as an artifact dir.
    """
    temp = temp or self.api.m.path.mkstemp()
    artifact_dir = self.api.m.path.dirname(temp) if include_artifacts else ''
    # Tag results with the target platform when a chromium config is set.
    base_tags = None
    if (self.api.m.chromium.c and self.api.m.chromium.c.TARGET_PLATFORM):
      base_tags = (('target_platform', self.api.m.chromium.c.TARGET_PLATFORM),)
    resultdb = attr.evolve(
        self.spec.resultdb,
        artifact_directory=artifact_dir,
        base_tags=base_tags,
        base_variant=dict(
            self.spec.resultdb.base_variant or {},
            test_suite=self.canonical_name),
        result_adapter_path=str(source_dir / 'tools/resultdb/result_adapter'),
        result_file=self.api.m.path.abspath(temp),
        # Give each local test suite its own invocation to make it easier to
        # fetch results.
        include=True)
    return resultdb

  def _update_inv_name_from_stderr(self, stderr, suffix):
    """Scans the given stderr for a local test for the test's invocation name.

    And updates self._suffix_to_invocation_names with the name.

    Args:
      stderr: stderr from the step_data.StepData obj returned from the step
        wrapped with `rdb stream -new ...`.
      suffix: String suffix representing the phase of the build.
    """
    # TODO(crbug.com/1227180): Specify our own custom invocation name rather
    # than parsing stderr.
    match = RDB_INVOCATION_NAME_RE.search(stderr)
    if match:
      inv_name = match.group(1)
      self._suffix_to_invocation_names[suffix] = inv_name
| |
| |
@attrs()
class ScriptTestSpec(TestSpec):
  """A spec for a test that runs a script.

  Attributes:
    * script - The filename of a script to run. The script must be
      located within the //testing/scripts directory of the checkout.
    * compile_targets - The compile targets that need to be built to run
      the script.
    * script_args - Arguments to be passed to the script.
  """

  script = attrib(str)
  compile_targets = attrib(sequence[str])
  script_args = attrib(command_args, default=())

  @property
  def test_class(self):
    """The test class associated with the spec."""
    return ScriptTest
| |
| |
class ScriptTest(LocalTest):
  """
  Test which uses logic from script inside chromium repo.

  This makes it possible to keep the logic src-side as opposed
  to the build repo most Chromium developers are unfamiliar with.

  Another advantage is being to test changes to these scripts
  on trybots.

  All new tests are strongly encouraged to use this infrastructure.
  """

  def compile_targets(self) -> Iterable[str]:
    """Targets that must be built before the script can run."""
    return self.spec.compile_targets

  def run(
      self,
      checkout_dir: Path,
      source_dir: Path,
      build_dir: Path,
      suffix: str,
      info_messages: Iterable[str] = (),
  ) -> None:
    """Runs the script step and records its results.

    Raises a step failure at the end if the step did not succeed.
    """
    del checkout_dir

    run_args = []

    # On retry phases, restrict the run to just the tests being retried.
    tests_to_retry = self._tests_to_retry(suffix)
    if tests_to_retry:
      run_args.extend(
          ['--filter-file',
           self.api.m.json.input(sorted(tests_to_retry))])

    resultdb = self._prep_local_rdb(source_dir)

    # Simulation data: a passing JSON result plus an rdb-stream stderr
    # line so invocation-name parsing is exercised.
    step_test_data = lambda: (
        self.api.m.json.test_api.output({
            'valid': True,
            'failures': []
        }) + self.api.m.raw_io.test_api.stream_output_text(
            'rdb-stream: included "invocations/test-name" in '
            '"invocations/build-inv"', 'stderr'))

    script_args = []
    if self.spec.script_args:
      script_args = ['--args', self.api.m.json.input(self.spec.script_args)]

    # Enforce that all scripts are in the specified directory for
    # consistency.
    common_args, paths, properties = (
        self.api.m.chromium_tests.get_common_args_for_scripts(
            source_dir, build_dir))
    cmd = ([
        'vpython3',
        (source_dir / 'testing/scripts' /
         self.api.m.path.basename(self.spec.script))
    ] + common_args + script_args +
           ['run', '--output', self.api.m.json.output()] + run_args)
    step_name = self.step_name(suffix)
    if resultdb:
      # Wrap the command with `rdb stream` so results are uploaded.
      cmd = resultdb.wrap(self.api.m, cmd, step_name=step_name)
    result = self.api.m.step(
        step_name,
        cmd=cmd,
        raise_on_failure=False,
        stderr=self.api.m.raw_io.output_text(
            add_output_log=True, name='stderr'),
        step_test_data=step_test_data)
    result.presentation.logs['paths.json'] = str(paths)
    result.presentation.logs['properties.json'] = str(properties)

    # Capture the status before presentation tweaks; it is re-raised at
    # the end of this method.
    status = result.presentation.status

    failures = None
    if result.json.output:
      failures = result.json.output.get('failures')
    if failures is None:
      # A missing 'failures' key means malformed output; fail loudly.
      self.api.m.step.empty(
          '%s with suffix %s had an invalid result' % (self.name, suffix),
          status=self.api.m.step.FAILURE,
          step_text=(
              'The recipe expected the result to contain the key \'failures\'.'
              ' Contents are:\n%s' %
              self.api.m.json.dumps(result.json.output, indent=2)))

    self._update_failure_on_exit(suffix, result.retcode != 0)

    # Trim the failure list for display purposes only.
    _, failures = self.api.m.test_utils.limit_failures(failures)
    result.presentation.step_text += (
        self.api.m.presentation_utils.format_step_text([['failures:',
                                                         failures]]))

    self._update_inv_name_from_stderr(result.stderr, suffix)

    _present_info_messages(result.presentation, self, info_messages)

    self.api.m.step.raise_on_failure(result, status)
| |
| |
@attrs()
class LocalGTestTestSpec(TestSpec):
  """A spec for a test that runs a gtest-based test locally.

  Attributes:
    * args - Arguments to be passed to the test.
    * use_xvfb - Whether to use the X virtual frame buffer. Only has an
      effect on Linux. Mostly harmless to set this, except on GPU
      builders.
  """

  # Extra command-line arguments appended to the test invocation.
  args = attrib(command_args, default=())
  # Consulted by LocalGTestTest.run via the 'xvfb' runtest kwarg.
  use_xvfb = attrib(bool, default=True)

  @property
  def test_class(self):
    """The test class associated with the spec."""
    return LocalGTestTest
| |
| |
class LocalGTestTest(LocalTest):
  """Runs a gtest-based suite directly on the bot instead of swarming."""

  @property
  def option_flags(self) -> TestOptionFlags:
    """The gtest flavor of the common test-option flags."""
    return _GTEST_OPTION_FLAGS

  @property
  def uses_local_devices(self) -> bool:
    """Local gtests may target devices attached to this bot."""
    return True

  def compile_targets(self) -> Iterable[str]:
    """Only the test's own ninja target needs to be built."""
    return [self.target_name]

  def run(
      self,
      checkout_dir: Path,
      source_dir: Path,
      build_dir: Path,
      suffix: str,
      info_messages: Iterable[str] = (),
  ) -> None:
    """Runs the gtest via chromium.runtest and raises on failure."""
    retry_filter = self._tests_to_retry(suffix)
    # pylint apparently gets confused by a property in a base class where the
    # setter is overridden
    options = self.test_options.for_running(suffix, retry_filter)  # pylint: disable=no-member
    args = options.add_args(self.spec.args, self.option_flags)
    if retry_filter:
      args = _merge_arg(args, '--gtest_filter', ':'.join(retry_filter))

    resultdb = self._prep_local_rdb(source_dir, include_artifacts=False)
    results_json = self.api.m.json.output(
        add_json_log=False, leak_to=resultdb.result_file)

    def _canned_test_data():
      # Canned gtest output plus the rdb-stream stderr line used later by
      # _update_inv_name_from_stderr in recipe simulation.
      gtest_data = self.api.m.test_utils.test_api.canned_gtest_output(True)
      stderr_data = self.api.m.raw_io.test_api.stream_output_text(
          'rdb-stream: included "invocations/some-inv-name" in '
          '"invocations/parent-inv-name"', 'stderr')
      return gtest_data + stderr_data

    step_result = self.api.m.chromium.runtest(
        checkout_dir,
        build_dir,
        self.target_name,
        builder_group=self.spec.waterfall_builder_group,
        stderr=self.api.m.raw_io.output_text(
            add_output_log=True, name='stderr'),
        raise_on_failure=False,
        name=self.step_name(suffix),
        args=args,
        step_test_data=_canned_test_data,
        resultdb=resultdb,
        parse_gtest_output=True,
        xvfb=self.spec.use_xvfb,
        test_type=self.name,
        test_launcher_summary_output=results_json)

    status = step_result.presentation.status

    # TODO(kbr): add functionality to generate_gtest to be able to force
    # running these local gtests via isolate from the src-side JSON files.
    # crbug.com/584469
    self._update_failure_on_exit(suffix, step_result.retcode != 0)

    self._update_inv_name_from_stderr(step_result.stderr, suffix)

    _present_info_messages(step_result.presentation, self, info_messages)

    self.api.m.step.raise_on_failure(step_result, status)
| |
| |
def _clean_step_name(step_name, suffix):
  """Normalizes a step name before using it for result uploads.

  Based on
  https://crrev.com/48baea8de14f5a17aef2edd7d0b8c00d7bbf7909/go/src/infra/appengine/test-results/frontend/builders.go#260
  Some tests add 'suffixes' in addition to the regular suffix, in order to
  distinguish different runs of the same test suite on different hardware. We
  don't want this to happen for layout test result uploads, since we have no
  easy way to discover this new name. So, we normalize the step name before
  uploading results.
  """
  # Everything after the first space is a hardware/run qualifier; drop it.
  # str.partition leaves the string unchanged when no space is present.
  base_name = step_name.partition(' ')[0]
  return _add_suffix(base_name, suffix)
| |
| |
def _archive_layout_test_results(api,
                                 step_name,
                                 step_suffix=None,
                                 swarm_task_ids=None):
  """Archives layout test results to GCS and links the results viewer.

  Returns:
    The URL of the uploaded results.html viewer.
  """
  # LayoutTest's special archive and upload results
  results_dir = api.path.start_dir / 'layout-test-results'

  builder_name = api.buildbucket.builder_full_name
  build_number = api.buildbucket.build.number
  gcs_bucket = 'chromium-layout-test-archives'

  cmd = [
      'python3',
      api.chromium_tests.resource('archive_layout_test_results.py'),
      '--results-dir',
      results_dir,
      '--build-number',
      build_number,
      '--builder-name',
      builder_name,
      '--gs-bucket',
      f'gs://{gcs_bucket}',
  ]
  if swarm_task_ids:
    cmd.extend(['--task-ids', ','.join(swarm_task_ids)])

  # TODO: The naming of the archive step is clunky, but the step should
  # really be triggered src-side as part of the post-collect merge and
  # upload, and so this should go away when we make that change.
  step_name = _clean_step_name(step_name, step_suffix)
  cmd.extend(['--step-name', step_name])

  cmd.extend(['--bot-utils-gsutil-py-path', api.depot_tools.gsutil_py_path])
  archive_result = api.step('archive results for ' + step_name, cmd)

  # TODO(tansell): Move this to render_results function
  sanitized_builder = re.sub('[ .()]', '_', builder_name)

  # Also link the new version of results.html which would fetch test results
  # from result DB. It will have parameters in following format:
  # ?json=<full_results_jsonp.js>
  base = f"https://{gcs_bucket}.storage.googleapis.com/results.html"
  jsonp_path = (f"{sanitized_builder}/{build_number}/"
                f"{urllib.parse.quote(step_name)}/full_results_jsonp.js")
  web_test_results = f"{base}?json={jsonp_path}"
  archive_result.presentation.links['web_test_results'] = web_test_results
  return web_test_results
| |
| |
@attrs()
class SwarmingTestSpec(TestSpec):
  """Spec for a test that runs via swarming.

  Attributes:
    * cipd_packages - The CIPD packages to be loaded for the test's
      swarming tasks.
    * containment_type - The type of containment to use for the test's
      swarming tasks. See `swarming.py trigger --help` for more info.
    * dimensions - Requested dimensions of the test. The keys are
      dimension names. The values are the value of the dimensions or
      None to clear a dimension.
    * expiration - The expiration timeout in seconds of the test's
      swarming tasks.
    * optional_dimensions - Optional dimensions that create additional
      fallback task slices. The keys are cumulative expiration times for
      the additional slices mapping to dicts of the same form as the
      `dimensions` attribute. Additional task slices will be created for
      each item, in order of the expiration time for the item using the
      dimensions specified in the value. The final slice will set the
      dimensions according to the `dimensions` attribute.
    * extra_suffix - An additional suffix applied to the test's step
      name.
    * hard_timeout - The execution timeout in seconds of the test's
      swarming tasks.
    * io_timeout - The maximum amount of time in seconds swarming will
      allow the task to be silent (no stdout or stderr).
    * trigger_script - An optional script used for triggering the test's
      swarming tasks.
    * merge - An optional script used for merging results between the
      test's swarming tasks.
    * args - Arguments to be passed to the test.
    * isolate_profile_data - Whether to isolate profile data during task
      execution.
    * named_caches - Named caches to mount for the test's swarming
      tasks. The keys are the names of the caches and the values are the
      paths relative to the swarming task's root directory where the
      caches should be mounted.
    * shards - The number of shards to trigger.
    * server - The Swarming server to run the test on. If not provided, the
      server the current task is running on will be used.
    * service_account - The service account to run the test's swarming
      tasks as.
    * idempotent - Whether to mark the test's swarming tasks as
      idempotent. If not provided, the default logic used by the
      `chromium_swarming` recipe module will be used.
    * realm - The realm to run the Swarming task in.
  """
  # pylint: disable=abstract-method

  cipd_packages = attrib(sequence[chromium_swarming.CipdPackage], default=())
  containment_type = attrib(str, default=None)
  dimensions = attrib(mapping[str, ...], default={})
  expiration = attrib(int, default=None)
  optional_dimensions = attrib(mapping[int, mapping[str, ...]], default={})
  extra_suffix = attrib(str, default=None)
  hard_timeout = attrib(int, default=None)
  io_timeout = attrib(int, default=None)
  trigger_script = attrib(chromium_swarming.TriggerScript, default=None)
  merge = attrib(chromium_swarming.MergeScript, default=None)
  args = attrib(command_args, default=())
  # Use the `default` keyword like every other declaration here; passing
  # False positionally relied on attrib's parameter order.
  isolate_profile_data = attrib(bool, default=False)
  named_caches = attrib(mapping[str, str], default={})
  shards = attrib(int, default=1)
  server = attrib(str, default=None)
  service_account = attrib(str, default=None)
  idempotent = attrib(bool, default=None)
  realm = attrib(str, default=None)

  @classmethod
  def create(cls, name, **kwargs):
    """Create a SwarmingTestSpec.

    Arguments:
      * name - The name of the test.
      * kwargs - Additional keyword arguments that will be used to
        initialize the attributes of the returned spec. If the keyword
        `extra_suffix` is not set, a value will be computed if the
        `'gpu'` dimension is specified or if the `'os'` dimension is
        `'Android'` and the `'device_type'` dimension is set.
    """
    dimensions = kwargs.get('dimensions', {})
    extra_suffix = kwargs.pop('extra_suffix', None)
    if extra_suffix is None:
      if dimensions.get('gpu'):
        extra_suffix = cls._get_gpu_suffix(dimensions)
      elif dimensions.get('os') == 'Android' and dimensions.get('device_type'):
        extra_suffix = cls._get_android_suffix(dimensions)
    return super().create(name, extra_suffix=extra_suffix, **kwargs)

  @property
  def name(self):
    """The test's name, including any extra suffix."""
    if self.extra_suffix:
      return '%s %s' % (self._name, self.extra_suffix)
    return self._name

  def with_shards(self, shards):
    """Returns a copy of this spec with the shard count replaced."""
    return attr.evolve(self, shards=int(shards))

  @staticmethod
  def _get_gpu_suffix(dimensions):
    """Computes a step-name suffix describing the requested GPU and OS."""
    all_gpu_dimensions = dimensions.get('gpu', '').split('|')
    all_gpu_identifiers = []

    # Loop-invariant: vendor-id -> human-readable vendor name.
    vendor_ids = {
        '8086': 'Intel',
        '10de': 'NVIDIA',
        '1002': 'AMD',
        'none': 'SwiftShader',  # explicit 'none' means requesting SwS
    }

    for gpu_dimension in all_gpu_dimensions:
      # Expected format if present is vendor:model-driver
      split_gpu_dimension = gpu_dimension.split(':')
      gpu_vendor_id = split_gpu_dimension[0].lower()
      gpu_model_id = ''
      if len(split_gpu_dimension) > 1:
        gpu_model_id = split_gpu_dimension[1].split('-')[0]
      if gpu_vendor_id in vendor_ids:
        if gpu_model_id:
          all_gpu_identifiers.append('%s 0x%s' %
                                     (vendor_ids[gpu_vendor_id], gpu_model_id))
        else:
          all_gpu_identifiers.append(vendor_ids[gpu_vendor_id])
      else:
        if gpu_model_id:
          all_gpu_identifiers.append('(%s:%s)' % (gpu_vendor_id, gpu_model_id))
        else:
          all_gpu_identifiers.append('(%s)' % gpu_vendor_id)
    gpu_identifier = '/'.join(all_gpu_identifiers)

    os = dimensions.get('os', '')
    if os.lower().startswith('mac'):
      if dimensions.get('hidpi', '') == '1':
        os_name = 'Mac Retina'
      else:
        os_name = 'Mac'
    elif os.lower().startswith('windows'):
      os_name = 'Windows'
    else:
      # TODO(crbug/1018836): Use distro specific name instead of Linux.
      os_name = 'Linux'

    return 'on %s GPU on %s' % (gpu_identifier, os_name)

  @staticmethod
  def _get_android_suffix(dimensions):
    """Computes a step-name suffix naming the targeted Android device."""
    device_codenames = {
        'angler': 'Nexus 6P',
        'athene': 'Moto G4',
        'blueline': 'Pixel 3',
        'bullhead': 'Nexus 5X',
        'cheetah': 'Pixel 7 Pro',
        'crosshatch': 'Pixel 3 XL',
        'dragon': 'Pixel C',
        'flame': 'Pixel 4',
        'flo': 'Nexus 7 [2013]',
        'flounder': 'Nexus 9',
        'foster': 'NVIDIA Shield',
        'fugu': 'Nexus Player',
        'goyawifi': 'Galaxy Tab 3',
        'grouper': 'Nexus 7 [2012]',
        'hammerhead': 'Nexus 5',
        'herolte': 'Galaxy S7 [Global]',
        'heroqlteatt': 'Galaxy S7 [AT&T]',
        'j5xnlte': 'Galaxy J5',
        'm0': 'Galaxy S3',
        'mako': 'Nexus 4',
        'manta': 'Nexus 10',
        'marlin': 'Pixel 1 XL',
        'oriole': 'Pixel 6',
        'panther': 'Pixel 7',
        'raven': 'Pixel 6 Pro',
        'redfin': 'Pixel 5',
        'sailfish': 'Pixel 1',
        'sargo': 'Pixel 3a',
        'shamu': 'Nexus 6',
        'shiba': 'Pixel 8',
        'sprout': 'Android One',
        'sunfish': 'Pixel 4a',
        'taimen': 'Pixel 2 XL',
        'tangorpro': 'Pixel Tablet',
        'walleye': 'Pixel 2',
        'zerofltetmo': 'Galaxy S6',
    }
    targeted_device = dimensions['device_type']
    # Fall back to the raw codename for devices not in the table.
    product_name = device_codenames.get(targeted_device, targeted_device)
    return 'on Android device %s' % product_name
| |
| |
class SwarmingTest(Test, AbstractSwarmingTest):
  """Base class for tests that are dispatched as swarming tasks."""

  # Some suffixes should have marginally higher priority. See crbug.com/937151.
  SUFFIXES_TO_INCREASE_PRIORITY = set(
      ['without patch', 'retry shards with patch'])
  # The flake endorser triggers test "shards" as different test suffixes.
  # For example, there could be an android_browsertests (check flakiness
  # shard #0) and an android_browsertests (check flakiness shard #1). Since the
  # shard # can vary, we need to check if 'check flakiness' is in the test
  # suffix being triggered.
  # Why these shards need higher priority: crbug.com/1366122
  CHECK_FLAKINESS_SUFFIX = 'check flakiness'

  def __init__(self, spec, chromium_tests_api):
    super().__init__(spec, chromium_tests_api)

    # Test-phase suffix (e.g. 'with patch') -> triggered swarming task.
    self._tasks = {}
    # Base command line for the task; populated via the raw_cmd setter.
    self._raw_cmd = []
    # Command line used when regression test selection is enabled;
    # populated via the rts_raw_cmd setter.
    self._rts_raw_cmd = []
    # Working directory relative to the task root; set via relative_cwd.
    self._relative_cwd = None
| |
| def _dispatches_to_windows(self): |
| if self.spec.dimensions: |
| os = self.spec.dimensions.get('os', '') |
| return os.startswith('Windows') |
| return False |
| |
  @property
  def locality(self) -> TestLocality:
    """Where the test executes; always swarming for this class."""
    return TestLocality.SWARMING
| |
  @property
  def isolate_target(self) -> str:
    """The isolate target to look up; identical to target_name."""
    return self.target_name
| |
  @property
  def isolate_profile_data(self) -> bool:
    """Whether profile data is isolated during execution (from the spec)."""
    return self.spec.isolate_profile_data
| |
  @property
  def raw_cmd(self) -> Iterable[str]:
    """The base command line for the swarming task; empty until assigned."""
    return self._raw_cmd

  @raw_cmd.setter
  def raw_cmd(self, value: Iterable[str]) -> None:
    self._raw_cmd = value
| |
  @property
  def rts_raw_cmd(self) -> Iterable[str]:
    """Command line used for regression test selection; gates supports_rts."""
    return self._rts_raw_cmd

  @rts_raw_cmd.setter
  def rts_raw_cmd(self, value: Iterable[str]) -> None:
    self._rts_raw_cmd = value
| |
  @property
  def relative_cwd(self) -> str | None:
    """Working directory relative to the task root; None until assigned."""
    return self._relative_cwd

  @relative_cwd.setter
  def relative_cwd(self, value: str) -> None:
    self._relative_cwd = value
| |
  @property
  def shards(self) -> int:
    """The number of shards to trigger (from the spec)."""
    return self.spec.shards
| |
  @property
  def supports_rts(self) -> bool:
    """True once an RTS command line has been assigned via rts_raw_cmd."""
    return bool(self.rts_raw_cmd)
| |
| def _add_instructions(self, suffix: str, include_utr_instruction: bool): |
| """Gets the reproduction instructions to be attached to the invocation""" |
| |
| # Attempt to find the last compile for the local step dependency |
| local_dependency = self.api.m.repro_instructions.get_dependency(r'compile') |
| |
| task = self._tasks[suffix] |
| prebuilt_task_id = task.trigger_output.get('tasks', |
| []).get(0, |
| {}).get('task_id', None) |
| cas_digest = self.api.m.isolate.isolated_tests.get(self.isolate_target) |
| remote_instruction = None |
| remote_dependency = None |
| builder = self.api.m.properties.get('orchestrator', {}).get( |
| 'builder_name', self.api.m.buildbucket.build.builder.builder) |
| |
| def _create_prebuilt_instruction(extra_args: Iterable[str]): |
| prebuilt_instruction = [] |
| if include_utr_instruction: |
| prebuilt_instruction.append('**Re-trigger in swarming:**') |
| prebuilt_instruction.append( |
| get_utr_instruction( |
| 'test', |
| self.api.m.buildbucket.build.builder.project, |
| self.api.m.led.shadowed_bucket or |
| self.api.m.buildbucket.build.builder.bucket, |
| builder, |
| [self.name], |
| utr_flags=['--reuse-task', prebuilt_task_id], |
| extra_args=extra_args, |
| )) |
| prebuilt_instruction.append( |
| '*Note: additional args can be used by extending this command*') |
| prebuilt_instruction.append('') |
| if cas_digest: |
| prebuilt_instruction.append('**Download test binary:**') |
| prebuilt_instruction.append('```./tools/luci-go/cas download ' |
| f'-cas-instance {self.api.m.cas.instance} ' |
| f'-digest {cas_digest} -dir tmp```') |
| prebuilt_instruction.append('Run `./tools/luci-go/cas login` if needed') |
| prebuilt_instruction.append( |
| '*See the local instructions tab and run from the tmp dir*') |
| return '<br/>'.join(prebuilt_instruction) |
| |
| if include_utr_instruction: |
| remote_dependency = self.api.m.repro_instructions.get_dependency( |
| r'bot_update') |
| remote_instruction = get_utr_instruction( |
| 'compile-and-test', |
| self.api.m.buildbucket.build.builder.project, |
| self.api.m.led.shadowed_bucket or |
| self.api.m.buildbucket.build.builder.bucket, |
| builder, |
| [self.name], |
| ) |
| self.api.m.repro_instructions.create_step_instruction( |
| self._instructions_tag_for_suffix('step', suffix), |
| f'{self.name} instructions', |
| local_content=task.get_local_instruction(), |
| remote_content=remote_instruction, |
| local_dependency=local_dependency, |
| remote_dependency=remote_dependency, |
| prebuilt_content=_create_prebuilt_instruction(None), |
| ) |
| |
| test_invocations = [ |
| inv if '/' not in inv else inv.split('/')[1] |
| for inv in task.get_invocation_names() |
| ] |
| |
| if test_invocations: |
| # Escaping the brackets makes the placeholder a constant string of |
| # {{test.tags.test_name}} which will be replaced in milo with the |
| # actual test name |
| filter_arg = f'{self.option_flags.filter_flag}={{{{test.tags.test_name}}}}' |
| if include_utr_instruction: |
| remote_instruction = get_utr_instruction( |
| 'compile-and-test', |
| self.api.m.buildbucket.build.builder.project, |
| self.api.m.led.shadowed_bucket or |
| self.api.m.buildbucket.build.builder.bucket, |
| self.api.m.buildbucket.build.builder.builder.replace( |
| '-compilator', ''), [self.name], |
| extra_args=['--', filter_arg]) |
| self.api.m.repro_instructions.create_test_result_instruction( |
| self._instructions_tag_for_suffix('test', suffix), |
| f'{self.name} instructions', |
| test_invocations, |
| local_content=task.get_local_instruction(extra_args=[filter_arg]), |
| remote_content=remote_instruction, |
| prebuilt_content=_create_prebuilt_instruction(['--', filter_arg]), |
| local_dependency=local_dependency, |
| remote_dependency=remote_dependency, |
| ) |
| |
| def did_complete(self, suffix) -> bool: |
| return suffix in self._tasks and not self._tasks[ |
| suffix].has_incomplete_shards |
| |
| def has_valid_results(self, suffix: str) -> bool: |
| if not super().has_valid_results(suffix): |
| return False |
| |
| rdb_results = self.get_rdb_results(suffix) |
| task = self.get_task(suffix) |
| |
| if not rdb_results or not task: |
| return False |
| |
| num_failed_shards = len(task.failed_shards) |
| |
| rdb_invocations_with_fails = { |
| test.invocation_id for test in rdb_results.unexpected_failing_tests |
| } |
| |
| # This check ensure that all shards from the task that marked as a failure |
| # have corresponding tests in RDB. This will ensure that when a shard fails |
| # and reports no results to RDB, the specific shard will be marked as invalid, |
| # labeling the whole step as invalid. |
| return num_failed_shards == len(rdb_invocations_with_fails) |
| |
  @abc.abstractmethod
  def _create_task(
      self,
      suffix: str,
      cas_input_root: str,
      include_utr_instruction: bool,
  ) -> chromium_swarming.SwarmingTask:
    """Creates a swarming task. Must be overridden in subclasses.

    Args:
      suffix: Suffix added to the test name.
      cas_input_root: Hash or digest of the isolated test to be run.
      include_utr_instruction: Whether UTR reproduction instructions
        should be included for the created task.

    Returns:
      A SwarmingTask object.
    """
    raise NotImplementedError()  # pragma: no cover
| |
  def _handle_results(
      self,
      suffix: str,
      step_result: step_data.StepData,
  ) -> None:
    """Handle step results from collecting the swarming tasks.

    Swarming tests that require additional handling after collecting the
    tasks should override this.

    Args:
      suffix: Suffix added to the test name for this phase.
      step_result: The result of the collect step.
    """
    # Intentionally a no-op in the base class.
    del suffix, step_result
| |
| def _shards_to_retry_with(self, original_num_shards, num_tests_to_retry, |
| test_options): |
| """Calculates the number of shards to run when retrying this test. |
| |
| Args: |
| original_num_shards: The number of shards used to run the test when it |
| first ran. |
| num_tests_to_retry: The number of tests we're trying to retry. |
| test_options: The TestOptions for running for a given retry. |
| |
| Returns: |
| The number of shards to use when retrying tests that failed. |
| |
| Note that this assumes this test has run 'with patch', and knows how many |
| tests ran in that case. It doesn't make sense to ask how this test should |
| run when retried, if it hasn't run already. |
| """ |
| if original_num_shards <= 1: |
| return original_num_shards |
| |
| total_tests_ran = max( |
| result.total_tests_ran for result in self._rdb_results.values()) |
| assert total_tests_ran, ( |
| "We cannot compute the total number of tests to re-run if no tests " |
| "were run 'with patch'. Expected the results tracker to contain key " |
| "'total_tests_ran', but it didn't") |
| |
| # We want to approximately match the previous shard load. Using only one |
| # shard can lead to a single shard running many more tests than it |
| # normally does. As the number of tests to retry approaches the total |
| # number of total tests ran, we get closer to running with the same number |
| # of shards as we originally were triggered with. |
| # Note that this technically breaks when we're running a tryjob on a CL |
| # which changes the number of tests to be run. |
| # Clamp to be 1 < value < original_num_shards, so that we don't trigger too |
| # many shards, or 0 shards. |
| # |
| # Since we repeat failing tests REPEAT_COUNT_FOR_FAILING_TESTS times, we |
| # artificially inflate the number of shards by that factor, since we expect |
| # tests to take that much longer to run. |
| # |
| # We never allow more than num_test_to_retry shards, since that would leave |
| # shards doing nothing. |
| return int( |
| min( |
| max( |
| original_num_shards * (test_options.repeat_count or 1) * |
| (float(num_tests_to_retry) / total_tests_ran), 1), |
| original_num_shards, |
| num_tests_to_retry, |
| )) |
| |
  def _apply_swarming_task_config(self, task, suffix, filter_flag,
                                  filter_delimiter, extra_args):
    """Applies shared configuration for swarming tasks.

    Mutates `task` in place (extra args, shard count and indices, merge
    script, env vars, CIPD packages, dimensions, timeouts, tags) and
    returns it.

    Args:
      task: The swarming task to configure.
      suffix: The test-phase suffix (e.g. 'with patch', 'retry shards').
      filter_flag: The flag used to pass a test filter to the binary.
      filter_delimiter: Delimiter joining test names in the filter value.
      extra_args: Additional command-line args for the test.

    Returns:
      The same `task` object, configured.
    """
    add_one_test_shard_enabled = False
    shards = self.spec.shards
    # When this experiment is enabled, we want to trigger suites with one
    # additional shard so that we can go back and query for test overhead
    # estimations.
    # See go/nplus1shardsproposal
    # For now, only add a shard if the suite already runs with multiple shards
    # Although rare, some suites may be swarmed but unable to work properly
    # with more than one shard.
    buildbucket_experiments = self.api.m.buildbucket.build.input.experiments
    add_one_test_shard_enabled = (
        'chromium.add_one_test_shard' in buildbucket_experiments and
        suffix in ['with patch', 'retry shards with patch'] and shards > 1)
    if add_one_test_shard_enabled:
      shards += 1

    tests_to_retry = self._tests_to_retry(suffix)
    if tests_to_retry:
      # The filter list is eventually passed to the binary over the command
      # line. On Windows, the command line max char limit is 8191 characters.
      # On other OSes, the max char limit is over 100,000 characters. We avoid
      # sending the filter list if we're close to the limit -- this causes all
      # tests to be run.
      char_limit = 6000 if self._dispatches_to_windows() else 90000
      expected_filter_length = (
          sum(len(x) for x in tests_to_retry) +
          len(tests_to_retry) * len(filter_delimiter))
      if expected_filter_length >= char_limit:
        tests_to_retry = None

    test_options = self.test_options.for_running(suffix, tests_to_retry)
    args = test_options.add_args(extra_args, self.option_flags)
    if tests_to_retry:
      test_list = filter_delimiter.join(tests_to_retry)
      # Append filter with individual tests to retry
      args = _merge_arg(args, filter_flag, test_list)
      shards = self._shards_to_retry_with(shards, len(tests_to_retry),
                                          test_options)

    task.extra_args.extend(args)
    task.shards = shards

    task_request = task.request
    task_slice = task_request[0]

    merge = self.spec.merge
    using_pgo = self.api.m.chromium_tests.m.pgo.using_pgo
    if self.isolate_profile_data or using_pgo:
      # Targets built with 'use_clang_coverage' or 'use_clang_profiling' (also
      # set by chrome_pgo_phase=1) will look at this environment variable to
      # determine where to write the profile dumps. The %Nm syntax is understood
      # by this instrumentation, see:
      # https://clang.llvm.org/docs/SourceBasedCodeCoverage.html#id4
      llvm_profile_file = '${ISOLATED_OUTDIR}/profraw/'

      # Enable the continuous mode for coverage builds only, not PGO. Coverage
      # builds have isolate_profile_data set and using_pgo *not* set.
      #
      # TODO(crbug.com/41493392): reenable the continuous mode on Windows once
      # test timeout issues are resolved.
      if not using_pgo and not self._dispatches_to_windows():
        # Enable the continuous mode, which is set by adding %c to
        # LLVM_PROFILE_FILE, for coverage builds. The continuous mode causes the
        # instrumentation to update counters in real time instead of flushing
        # them to disk at process shutdown, which recovers coverage data for
        # sandboxed processes and processes that exit abnormally (e.g. death
        # tests). See https://crbug.com/1468343.
        llvm_profile_file = llvm_profile_file + 'default-%2m%c.profraw'
      else:
        llvm_profile_file = llvm_profile_file + 'default-%2m.profraw'
      env_vars = {
          'LLVM_PROFILE_FILE': llvm_profile_file,
      }

      # crbug.com/1124774 - For PGO, we're increasing the shutdown timeout to
      # 300 seconds to allow sufficient time for all processes to finish writing
      # profiles.
      if using_pgo:
        env_vars['CHROME_SHUTDOWN_TIMEOUT'] = '300'
        if self.api.m.chromium.c.TARGET_PLATFORM == 'android':
          env_vars['CHROME_PGO_PROFILING'] = '1'

      task_slice = task_slice.with_env_vars(**env_vars)

      sparse = True
      skip_validation = False

      # code coverage runs llvm-profdata merge with --sparse. PGO does not.
      if using_pgo:
        sparse = False
        skip_validation = True
      # TODO(crbug.com/1076055) - Refactor this to the profiles recipe_module
      # Wrap the merge script specific to the test type (i.e. gtest vs isolated
      # script tests) in a wrapper that knows how to merge coverage/pgo profile
      # data. If the test object does not specify a merge script, use the one
      # defined by the swarming task in the chromium_swarm module. (The default
      # behavior for non-coverage/non-profile tests).
      merge = self.api.m.code_coverage.shard_merge(
          self.step_name(suffix),
          self.target_name,
          skip_validation=skip_validation,
          sparse=sparse,
          additional_merge=self.spec.merge or task.merge)

    if suffix.startswith('retry shards'):
      task_slice = task_slice.with_idempotent(False)
    elif self.spec.idempotent is not None:
      task_slice = task_slice.with_idempotent(self.spec.idempotent)

    # task.shard_indices dictates how many shards will be triggered
    # CI builders retry invalid shards with the suffix "retry shards", instead
    # of "retry shards with patch" so check for both options.
    # We will re-calculate the number of shards for tests_to_retry and retry the
    # same failed shard indices otherwise.
    if not tests_to_retry and suffix in [
        'retry shards', 'retry shards with patch'
    ]:
      if suffix == 'retry shards':
        # CI builders use the default '' suffix when calling run_tests()
        # in test_utils/api
        task_suffix = ''
      else:
        task_suffix = 'with patch'
      task.task_to_retry = self._tasks[task_suffix]
      assert task.task_to_retry, (
          '\'retry_shards_with_patch\' expects that the \'with patch\' phase '
          'has already run, but it apparently hasn\'t.')
      task.shard_indices = task.task_to_retry.failed_shards
      # Test suite failure is determined by merging and examining the JSON
      # output from the shards. Failed shards are determined by looking at the
      # swarming output [retcode !=0 or state != 'SUCCESS']. It is possible that
      # these are not in sync. This will cause no tasks to be dispatched for
      # 'retry shards with patch'. This error has graceful recovery: 'retry
      # shards with patch' will simply reuse results from 'with patch'.
      # Regardless, we want to emit a failing step so that the error is not
      # overlooked.
      if len(task.shard_indices) == 0:  # pragma: no cover
        self.api.m.step.empty(
            'missing failed shards',
            status=self.api.m.step.FAILURE,
            step_text=(
                "Retry shards with patch is being run on {},"
                " which has no failed shards."
                " This usually happens because of a test runner bug."
                " The test runner reports test failures, but had exit_code 0."
                .format(self.step_name(suffix='with patch'))))
    else:
      task.shard_indices = range(task.shards)

    task.build_properties = self.api.m.chromium.build_properties
    task.containment_type = self.spec.containment_type
    if merge:
      task.merge = merge

    task.trigger_script = self.spec.trigger_script

    ensure_file = task_slice.cipd_ensure_file
    for package in self.spec.cipd_packages:
      ensure_file.add_package(package.name, package.version, package.root)

    task_slice = (task_slice.with_cipd_ensure_file(ensure_file))

    task.named_caches.update(self.spec.named_caches)

    if (suffix in self.SUFFIXES_TO_INCREASE_PRIORITY or
        self.CHECK_FLAKINESS_SUFFIX in suffix):
      task_request = task_request.with_priority(task_request.priority - 1)

    if self.spec.expiration:
      task_slice = task_slice.with_expiration_secs(self.spec.expiration)

    if self.spec.hard_timeout:
      task_slice = task_slice.with_execution_timeout_secs(
          self.spec.hard_timeout)

    if self.spec.io_timeout:
      task_slice = task_slice.with_io_timeout_secs(self.spec.io_timeout)

    task_dimensions = task_slice.dimensions
    # Add custom dimensions.
    task_dimensions.update(self.spec.dimensions)
    task_slice = task_slice.with_dimensions(**task_dimensions)

    # Add optional dimensions.
    task.optional_dimensions = self.spec.optional_dimensions

    # Add tags.
    tags = {
        'ninja_target': [self.spec.full_test_target or ''],
        # TODO(crbug/1106965): remove test_id_prefix from tags, if deriver
        # gets turned down.
        'test_id_prefix': [self.test_id_prefix or ''],
        'test_suite': [self.canonical_name],
        'waterfall_builder_group': [self.spec.waterfall_builder_group or ''],
        'waterfall_buildername': [self.spec.waterfall_buildername or ''],
        'test_phase': [suffix or ''],
    }
    if add_one_test_shard_enabled:
      tags.update({
          'experimental_shard_count': [str(shards)],
          'normally_assigned_shard_count': [str(shards - 1)],
      })

    if self.spec.server:
      task.server = self.spec.server

    if self.spec.realm:
      task_request = task_request.with_realm(self.spec.realm)

    # Write the mutated slice back into the request and finalize the
    # task name, service account and tags.
    task.request = (
        task_request.with_slice(0, task_slice).with_name(
            self.step_name(suffix)).with_service_account(
                self.spec.service_account or '').with_tags(tags))
    return task
| |
  def get_task(self, suffix: str) -> chromium_swarming.SwarmingTask:
    """Returns the swarming task triggered for `suffix`, or None if none."""
    return self._tasks.get(suffix)
| |
| def get_invocation_names(self, suffix: str) -> Iterable[str]: |
| task = self.get_task(suffix) |
| if task: |
| return task.get_invocation_names() |
| return [] |
| |
  def pre_run(self, suffix: str, include_utr_instruction: bool = False) -> None:
    """Launches the test on Swarming.

    Creates and triggers the swarming task for `suffix`, then attaches
    reproduction instructions. Emits an infra-failure step (without
    raising) when the isolated input for the target is missing.
    """
    # A suffix may be triggered at most once; a repeat indicates a recipe bug.
    assert suffix not in self._tasks, ('Test %s was already triggered' %
                                       self.step_name(suffix))

    task_input = self.api.m.isolate.isolated_tests.get(self.isolate_target)
    if not task_input:
      # Nothing to run without an isolated input; report it as an infra
      # failure step instead of raising so other tests can still proceed.
      return self.api.m.step.empty(
          '[error] %s' % self.step_name(suffix),
          status=self.api.m.step.INFRA_FAILURE,
          step_text=('*.isolated file for target %s is missing' %
                     self.isolate_target))

    # Create task.
    self._tasks[suffix] = self._create_task(suffix, task_input,
                                            include_utr_instruction)

    # Export TARGET_PLATFORM to resultdb tags
    resultdb = self.spec.resultdb
    base_tags = resultdb.base_tags or tuple()
    if (self.api.m.chromium.c and self.api.m.chromium.c.TARGET_PLATFORM):
      # The resultdb spec is an immutable attrs object, so evolve a copy with
      # the extra tag rather than mutating in place.
      resultdb = attr.evolve(
          resultdb,
          base_tags=base_tags +
          (('target_platform', self.api.m.chromium.c.TARGET_PLATFORM),))

    self.api.m.chromium_swarming.trigger_task(
        self._tasks[suffix], resultdb=resultdb)

    # Add instructions now that we have invocations
    self._add_instructions(suffix, include_utr_instruction)
| |
  def run(
      self,
      checkout_dir: Path,
      source_dir: Path,
      build_dir: Path,
      suffix: str,
      info_messages: Iterable[str] = (),
  ) -> None:
    """Waits for launched test to finish and collects the results."""
    # There's no guarantee that a checkout exists for a swarming test, so
    # checkout_dir and source_dir shouldn't be used
    del checkout_dir, source_dir, build_dir

    # Collect without raising so that presentation and result handling below
    # always run; failure is recorded via _update_failure_on_exit instead.
    step_result, _ = (
        self.api.m.chromium_swarming.collect_task(
            self._tasks[suffix], raise_on_failure=False))

    metadata = self._step_metadata(suffix)
    metadata['full_step_name'] = '.'.join(step_result.name_tokens)
    step_result.presentation.logs['step_metadata'] = (self.api.m.json.dumps(
        metadata, indent=2, sort_keys=True)).splitlines()

    # Any failed shard, or a non-zero collect retcode, marks the run failed.
    self._update_failure_on_exit(
        suffix,
        (bool(self._tasks[suffix].failed_shards) or step_result.retcode != 0))

    info_message_list = list(info_messages)
    if suffix == 'retry shards with patch' and self.retry_only_failed_tests:
      info_message_list.append(
          'Ran only previously failing tests, instead of the entire shard. '
          'This is enabled on a per suite and per builder basis.\n')
    _present_info_messages(step_result.presentation, self, info_message_list)

    self._present_rdb_results(step_result, self._rdb_results.get(suffix))

    self._handle_results(suffix, step_result)
| |
| def _step_metadata(self, suffix): |
| data = { |
| 'waterfall_builder_group': self.spec.waterfall_builder_group, |
| 'waterfall_buildername': self.spec.waterfall_buildername, |
| 'canonical_step_name': self.canonical_name, |
| 'isolate_target_name': self.isolate_target, |
| } |
| if suffix is not None: |
| data['patched'] = suffix in ('with patch', 'retry shards with patch') |
| data['dimensions'] = self._tasks[suffix].request[0].dimensions |
| data['swarm_task_ids'] = self._tasks[suffix].get_task_ids() |
| return data |
| |
@attrs()
class SwarmingGTestTestSpec(SwarmingTestSpec):
  """A spec for a test that runs a gtest-based test via swarming.

  All configurable details are inherited from `SwarmingTestSpec`; this
  subclass only selects the gtest-specific test implementation.
  """

  @property
  def test_class(self):
    """The test class associated with the spec."""
    return SwarmingGTestTest
| |
| |
class SwarmingGTestTest(SwarmingTest):
  """A test that runs a gtest-based binary via swarming."""

  @property
  def option_flags(self) -> TestOptionFlags:
    """The gtest-style flags used to pass filters/repeats to the binary."""
    return _GTEST_OPTION_FLAGS

  def compile_targets(self) -> Iterable[str]:
    return [self.target_name]

  def _create_task(
      self,
      suffix: str,
      cas_input_root: str,
      include_utr_instruction: bool,
  ) -> chromium_swarming.SwarmingTask:
    """Creates the swarming task that runs this gtest for `suffix`."""
    json_override = None
    # TODO(crbug.com/1255217): Remove this android exception when logcats and
    # tombstones are in resultdb.
    if self.api.m.chromium.c.TARGET_PLATFORM != 'android':
      json_override = self.api.m.path.mkstemp()

    if self.is_rts:
      cmd = self.rts_raw_cmd
    else:
      cmd = self.raw_cmd
    # gtests only support 1 test-launcher-filter-file. Remove the filter file
    # arg from the raw command and combine it after the test spec is consumed
    # NOTE(review): `cmd` aliases the stored raw command list, so the removes
    # below mutate it in place — presumably idempotent across suffixes since
    # the filters stay removed, but worth confirming.
    cmd_filters = [arg for arg in cmd if '--test-launcher-filter-file=' in arg]
    for cmd_filter in cmd_filters:
      cmd.remove(cmd_filter)
    task = self.api.m.chromium_swarming.gtest_task(
        test_name=self.name,
        raw_cmd=cmd,
        relative_cwd=self.relative_cwd,
        cas_input_root=cas_input_root,
        collect_json_output_override=json_override,
        instructions_tag=self._instructions_tag_for_suffix('step', suffix),
        include_utr_instruction=include_utr_instruction)
    # Merge every filter file (from the raw command and from the spec args)
    # into a single semicolon-delimited --test-launcher-filter-file argument.
    extra_args = list(self.spec.args) + cmd_filters
    merged_filter_file_arg = ';'.join(
        arg[len('--test-launcher-filter-file='):]
        for arg in extra_args
        if arg.startswith('--test-launcher-filter-file='))
    if merged_filter_file_arg:
      extra_args = _merge_arg(extra_args, '--test-launcher-filter-file',
                              merged_filter_file_arg)
    self._apply_swarming_task_config(task, suffix, '--gtest_filter', ':',
                                     extra_args)
    return task
| |
| |
@attrs()
class LocalIsolatedScriptTestSpec(TestSpec):
  """Spec for a test that runs an isolated script locally.

  Attributes:
    * args - Arguments to be passed to the test.
    * results_handler_name - A name identifying the type of
      `ResultsHandler` that will be used for processing the test
      results:
      * 'default' - JSONResultsHandler
      * 'layout tests' - LayoutTestResultsHandler
      * 'fake' - FakeCustomResultsHandler
    * isolate_profile_data - Whether to isolate profile data during task
      execution.
  """

  args = attrib(command_args, default=())
  results_handler_name = attrib(
      enum(ALLOWED_RESULT_HANDLER_NAMES), default='default')
  # The second positional argument to `attrib` here is the default value.
  isolate_profile_data = attrib(bool, False)

  @property
  def test_class(self):
    """The test class associated with the spec."""
    return LocalIsolatedScriptTest
| |
| |
class LocalIsolatedScriptTest(LocalTest):
  """A test that runs an isolated script on the build machine itself."""

  def __init__(self, spec, chromium_tests_api):
    super().__init__(spec, chromium_tests_api)
    # Command and working directory are filled in later by generator code.
    self.raw_cmd = []
    self.relative_cwd = None

  @property
  def option_flags(self) -> TestOptionFlags:
    """The flag names used to pass filters/repeats to the script."""
    if 'blink_web_tests' in self.name:
      return _BLINK_WEB_TESTS_OPTION_FLAGS
    if 'angle_unittests' in self.name:
      return _ANGLE_UNITTESTS_OPTION_FLAGS
    return _ISOLATED_SCRIPT_OPTION_FLAGS

  @property
  def isolate_target(self) -> str:
    return self.target_name

  def compile_targets(self) -> Iterable[str]:
    return [self.target_name]

  # TODO(nednguyen, kbr): figure out what to do with Android.
  # (crbug.com/533480)
  def run(
      self,
      checkout_dir: Path,
      source_dir: Path,
      build_dir: Path,
      suffix: str,
      info_messages: Iterable[str] = (),
  ) -> None:
    """Runs the isolated script locally and records its results."""
    del checkout_dir, build_dir

    tests_to_retry = self._tests_to_retry(suffix)
    # pylint apparently gets confused by a property in a base class where the
    # setter is overridden
    test_options = self.test_options.for_running(suffix, tests_to_retry) # pylint: disable=no-member
    pre_args = []
    if self.relative_cwd:
      pre_args += ['--relative-cwd', self.relative_cwd]

    cmd = list(self.raw_cmd)
    cmd.extend(self.spec.args)
    args = test_options.add_args(cmd, self.option_flags)
    # TODO(nednguyen, kbr): define contract with the wrapper script to rerun
    # a subset of the tests. (crbug.com/533481)

    # Results are written to a temp file so the recipe can parse them as JSON.
    temp = self.api.m.path.mkstemp()
    json_results_file = self.api.m.json.output(leak_to=temp)
    args.extend(['--isolated-script-test-output', json_results_file])

    step_test_data = lambda: (
        self.api.m.json.test_api.output({
            'valid': True,
            'failures': []
        }) + self.api.m.raw_io.test_api.stream_output_text(
            'rdb-stream: included "invocations/test-name" in '
            '"invocations/build-inv"', 'stderr'))

    kwargs = {}
    if self.spec.isolate_profile_data:
      kwargs.update({
          # Targets built with 'use_clang_coverage' will look at this
          # environment variable to determine where to write the profile dumps.
          # The %Nm syntax is understood by this instrumentation, see:
          # https://clang.llvm.org/docs/SourceBasedCodeCoverage.html#id4
          # The %c syntax enables the continuous mode, which updates counters
          # in real time instead of flushing to disk at process exit.
          # We use one profile only as this is meant for short, single-process
          # tests. Anything longer or more complex should be running on swarming
          # instead of locally.
          'env': {
              'LLVM_PROFILE_FILE':
                  '${ISOLATED_OUTDIR}/profraw/default-%1m%c.profraw',
          },
          # The results of the script will be isolated, and the .isolate will be
          # dumped to stdout.
          'stdout': self.api.m.raw_io.output_text(),
      })

    resultdb = self._prep_local_rdb(source_dir, temp=temp)

    # Run without raising so presentation below always happens; the status is
    # re-raised at the end via raise_on_failure.
    step_result = self.api.m.isolate.run_isolated(
        self.step_name(suffix),
        self.api.m.isolate.isolated_tests[self.target_name],
        args,
        pre_args=pre_args,
        step_test_data=step_test_data,
        raise_on_failure=False,
        resultdb=resultdb if resultdb else None,
        stderr=self.api.m.raw_io.output_text(
            add_output_log=True, name='stderr'),
        **kwargs)

    status = step_result.presentation.status

    self._update_inv_name_from_stderr(step_result.stderr, suffix)
    self._update_failure_on_exit(suffix, step_result.retcode != 0)

    _present_info_messages(step_result.presentation, self, info_messages)

    self.api.m.step.raise_on_failure(step_result, status)
| |
| |
@attrs()
class SwarmingIsolatedScriptTestSpec(SwarmingTestSpec):
  """Spec for a test that runs an isolated script via swarming.

  Attributes:
    * results_handler_name - A name identifying the type of
      `ResultsHandler` that will be used for processing the test
      results:
      * 'default' - JSONResultsHandler
      * 'layout tests' - LayoutTestResultsHandler
      * 'fake' - FakeCustomResultsHandler
  """

  # 'layout tests' additionally triggers archiving of layout test results
  # after collection (see SwarmingIsolatedScriptTest._handle_results).
  results_handler_name = attrib(
      enum(ALLOWED_RESULT_HANDLER_NAMES), default='default')

  @property
  def test_class(self):
    """The test class associated with the spec."""
    return SwarmingIsolatedScriptTest
| |
| |
class SwarmingIsolatedScriptTest(SwarmingTest):
  """A test that runs an isolated script via swarming."""

  def compile_targets(self) -> Iterable[str]:
    return [self.target_name]

  @property
  def option_flags(self) -> TestOptionFlags:
    """The flag names used to pass filters/repeats to the script."""
    if 'blink_web_tests' in self.name:
      return _BLINK_WEB_TESTS_OPTION_FLAGS
    if 'angle_unittests' in self.name:
      return _ANGLE_UNITTESTS_OPTION_FLAGS
    return _ISOLATED_SCRIPT_OPTION_FLAGS

  def _create_task(
      self,
      suffix: str,
      cas_input_root: str,
      include_utr_instruction: bool,
  ) -> chromium_swarming.SwarmingTask:
    """Creates the swarming task that runs this script for `suffix`."""
    if self.is_rts:
      cmd = self.rts_raw_cmd
    else:
      cmd = self.raw_cmd

    task = self.api.m.chromium_swarming.isolated_script_task(
        raw_cmd=cmd,
        relative_cwd=self.relative_cwd,
        cas_input_root=cas_input_root,
        instructions_tag=self._instructions_tag_for_suffix('step', suffix),
        test_name=self.name,
        include_utr_instruction=include_utr_instruction)

    self._apply_swarming_task_config(task, suffix,
                                     '--isolated-script-test-filter', '::',
                                     self.spec.args)
    return task

  def _handle_results(
      self,
      suffix: str,
      step_result: step_data.StepData,
  ) -> None:
    """Archives layout test results when the spec requests that handler."""
    if self.spec.results_handler_name == 'layout tests':
      upload_step_name = step_result.name_tokens[-1]
      swarm_task_ids = self._tasks[suffix].get_task_ids()
      _archive_layout_test_results(
          self.api.m,
          upload_step_name,
          step_suffix=suffix,
          swarm_task_ids=swarm_task_ids)
| |
| |
@attrs()
class MockTestSpec(TestSpec):
  """Spec for a mock test.

  Attributes:
    * failures - The test cases to report as failures.
    * has_valid_results - Whether the test has valid results.
    * did_complete - Whether the test had complete shards.
    * per_suffix_failures - A mapping of suffix to the test cases to
      report as failures for the suffix.
    * per_suffix_valid - A mapping of suffix to whether the test has
      valid results for the suffix.
    * per_suffix_complete - A mapping of suffix to whether the test had
      complete shards.
    * runs_on_swarming - Whether the mock test emulates a swarming test.
    * shards - The number of swarming shards to emulate.
    * invocation_names - Used as return value in |MockTest|'s
      |get_invocation_names| method.
    * supports_rts - Whether the test supports regression test selection.
    * option_flags - The option flags reported by the mock test.
    * retry_only_failed_tests - Whether to only retry failed tests, instead
      of the entire shard.
  """

  failures = attrib(sequence[str], default=())
  has_valid_results = attrib(bool, default=True)
  did_complete = attrib(bool, default=True)
  per_suffix_failures = attrib(mapping[str, sequence[str]], default={})
  per_suffix_valid = attrib(mapping[str, bool], default={})
  per_suffix_complete = attrib(mapping[str, bool], default={})
  runs_on_swarming = attrib(bool, default=False)
  shards = attrib(int, default=1)
  # NOTE(review): other sequence attribs here default to (); this one uses a
  # mutable [] — presumably `attrib` normalizes defaults, but worth confirming.
  invocation_names = attrib(sequence[str], default=[])
  supports_rts = attrib(bool, default=False)
  option_flags = attrib(TestOptionFlags, default=_DEFAULT_OPTION_FLAGS)
  retry_only_failed_tests = attrib(bool, default=True)

  @property
  def test_class(self):
    """The test class associated with the spec."""
    return MockTest
| |
| |
class MockTask:
  """Stand-in for a swarming task, used by MockTest in recipe tests."""

  def __init__(self, shards: int):
    # Number of fake shards to report task ids for.
    self._shards = shards
    self.server = 'mock-swarming.appspot.com'

  def get_task_ids(self) -> Iterable[str]:
    """Returns one fake, instance-unique task id per shard."""
    prefix = f'fake-task-id-{id(self)}'
    return [f'{prefix}-{index}' for index in range(self._shards)]
| |
| |
class MockTest(AbstractSwarmingTest, Test):
  """A Test solely intended to be used in recipe tests."""

  class ExitCodes:
    # Exit codes that the mocked steps use to simulate outcomes.
    FAILURE = 1
    INFRA_FAILURE = 2

  def __init__(self, spec, chromium_tests_api):
    super().__init__(spec, chromium_tests_api)
    # We mutate the set of failures depending on the exit code of the test
    # steps, so get a mutable copy
    self._failures = list(spec.failures)
    # Tasks for if the test is mocking a swarming test
    self._tasks_by_suffix = {}
    self._raw_cmd = []
    self._rts_raw_cmd = []
    self._relative_cwd = None

  @property
  def option_flags(self) -> TestOptionFlags:
    return self.spec.option_flags

  @property
  def locality(self):
    # Mirrors real tests: swarming when configured to, local otherwise.
    return (TestLocality.SWARMING
            if self.spec.runs_on_swarming else TestLocality.LOCAL)

  @property
  def isolate_profile_data(self) -> bool:
    return False

  @property
  def raw_cmd(self) -> Iterable[str]:
    return self._raw_cmd

  @raw_cmd.setter
  def raw_cmd(self, value: Iterable[str]) -> None:
    self._raw_cmd = value

  @property
  def rts_raw_cmd(self) -> Iterable[str]:
    return self._rts_raw_cmd

  @rts_raw_cmd.setter
  def rts_raw_cmd(self, value: Iterable[str]) -> None:
    self._rts_raw_cmd = value

  @property
  def relative_cwd(self) -> str:
    return self._relative_cwd

  @relative_cwd.setter
  def relative_cwd(self, value: str) -> None:
    self._relative_cwd = value

  @property
  def shards(self):
    # Shards only make sense when emulating a swarming test.
    assert self.runs_on_swarming
    return self.spec.shards

  @property
  def retry_only_failed_tests(self) -> bool:
    return self.spec.retry_only_failed_tests

  def get_task(self, suffix):
    """Returns the MockTask created by pre_run for `suffix`."""
    assert self.runs_on_swarming
    return self._tasks_by_suffix[suffix]

  @contextlib.contextmanager
  def _mock_exit_codes(self):
    """Translates mock-step exit codes into failures/infra failures.

    Exit code INFRA_FAILURE is re-raised as an InfraFailure; any other
    step failure records a 'test_failure' and is re-raised as-is.
    """
    try:
      yield
    except self.api.m.step.StepFailure as f:
      if f.result.retcode == self.ExitCodes.INFRA_FAILURE:
        i = self.api.m.step.InfraFailure(f.name, result=f.result)
        i.result.presentation.status = self.api.m.step.EXCEPTION
        raise i from f
      self._failures.append('test_failure')
      raise

  def pre_run(self, suffix: str, include_utr_instruction: bool = False) -> None:
    """Runs a mock pre_run step and creates a MockTask when swarming."""
    with self._mock_exit_codes():
      self.api.m.step('pre_run {}'.format(self.step_name(suffix)),
                      ['mock_test.pre_run'])
    if self.runs_on_swarming:
      self._tasks_by_suffix[suffix] = MockTask(self.shards)

  def run(
      self,
      checkout_dir: Path,
      source_dir: Path,
      build_dir: Path,
      suffix: str,
      info_messages: Iterable[str] = (),
  ) -> None:
    """Runs the mock test step, honoring the mock exit-code contract."""
    del checkout_dir, source_dir, build_dir

    with self._mock_exit_codes():
      step_result = self.api.m.step(self.step_name(suffix), ['mock_test'])

      _present_info_messages(step_result.presentation, self, info_messages)


  def has_valid_results(self, suffix: str) -> bool:
    # Per-suffix overrides take precedence over the global setting.
    if suffix in self.spec.per_suffix_valid:  # pragma: no cover
      return self.spec.per_suffix_valid[suffix]
    return self.spec.has_valid_results

  def did_complete(self, suffix: str) -> bool:
    if suffix in self.spec.per_suffix_complete:  # pragma: no cover
      return self.spec.per_suffix_complete[suffix]
    return self.spec.did_complete

  def deterministic_failures(self, suffix: str) -> Set[str]:
    # NOTE(review): the per-suffix branch returns the spec's sequence
    # unconverted, while the fallback returns a set — presumably callers
    # only need membership/iteration; confirm against the Set[str] annotation.
    if suffix in self.spec.per_suffix_failures:  # pragma: no cover
      return self.spec.per_suffix_failures[suffix]
    return set(self._failures)

  def compile_targets(self) -> Iterable[str]:  # pragma: no cover
    return []

  def get_invocation_names(self, suffix: str) -> Iterable[str]:
    return self.spec.invocation_names

  @property
  def supports_rts(self) -> bool:
    return self.spec.supports_rts
| |
| |
@attrs()
class SkylabTestSpec(TestSpec):
  """Spec for a suite that runs on CrOS Skylab."""
  # The CrOS board name, e.g. eve, kevin.
  cros_board = attrib(str)
  # Build target of the ChromeOS board. If unspecified, cros_board will be used.
  # build_target is used to look for the LKGM image if needed.
  cros_build_target = attrib(str, default='')
  # The CrOS DUT model.
  cros_model = attrib(str, default='')
  # Use the LKGM version of CrOS image.
  # When this is set to true, cros_img must be empty.
  use_lkgm = attrib(bool, default=False)
  # The GS path presenting CrOS image to provision the DUT,
  # e.g. atlas-release/R88-13545.0.0
  cros_img = attrib(str, default='')
  # The optional GS bucket of CrOS image.
  bucket = attrib(str, default='')
  # The optional Public CTP Builder and luci bucket.
  # The public_builder and public_builder_bucket fields can be used when
  # default CTP builder is not sufficient/advised
  # (ex: chromium cq, satlab for partners).
  public_builder = attrib(str, default='')
  public_builder_bucket = attrib(str, default='')
  # The skylab device pool to run the test. By default the
  # quota pool, shared by all CrOS tests.
  dut_pool = attrib(str, default='')
  # The number of shards used to run the test.
  shards = attrib(int, default=1)
  # Deprecated. Skylab tests retries once for infra failure
  # same with swarming tests. For test failure retry, consider
  # shard_level_retries_on_ctp.
  retries = attrib(int, default=1)
  # Maximum number to retry a failed shard.
  # When set to zero, retries continue infinitely until timeout.
  shard_level_retries_on_ctp = attrib(int, default=-1)
  # The timeout for the test in second. Default is one hour.
  timeout_sec = attrib(int, default=3600)
  # The runtime timeout sent to the test execution environment.
  max_run_sec = attrib(int, default=0)

  # Attributes for Tast First Class requests.
  # On ChromeOS, these parameters are not limited to tast first class but all
  # tests, so not using "tast" here.
  cros_test_tags = attrib(sequence[str], default=())
  cros_test_tags_exclude = attrib(sequence[str], default=())
  cros_test_names = attrib(sequence[str], default=())
  cros_test_names_exclude = attrib(sequence[str], default=())
  cros_test_names_from_file = attrib(sequence[str], default=())
  cros_test_names_exclude_from_file = attrib(sequence[str], default=())
  cros_test_max_in_shard = attrib(int, default=0)
  cros_ctp_suite_name = attrib(str, default='')

  # Generic arguments to pass to the test command run in skylab.
  test_args = attrib(command_args, default=())

  # The name of the autotest to be executed in Skylab.
  # This is tied to an autotest control file that contains setup
  # information and runs the actual test. For tast test, an
  # autotest wrapper is required. e.g. tast.lacros
  autotest_name = attrib(str, default='')

  # Spec for the Multi-DUT tests.
  secondary_cros_board = attrib(str, default='')
  secondary_cros_img = attrib(str, default='')
  secondary_cros_build_target = attrib(str, default='')
  # Optional argument to control whether to provision browser files
  # through `secondary_lacros_gcs_path` in the `crosfleet` command.
  # If True, `skip` is put in `secondary_lacros_gcs_path`
  # in the position corresponding to a DUT.
  # e.g. [False,True] yield "skip,gs://path1"
  # If this argument is empty, by default browser files are sent
  # to all secondary DUTs.
  # The length has to match that of `secondary_cros_board`.
  should_provision_browser_files = attrib(sequence[bool], default=())

  # Spec for telemetry tests.
  benchmark = attrib(str, default='')
  story_filter = attrib(str, default='')
  results_label = attrib(str, default='')
  test_shard_map_filename = attrib(str, default='')

  # For GPU specific args.
  extra_browser_args = attrib(str, default='')

  # Strip ELF binary symbol before deploying Chrome to ChromeOS devices.
  # This option makes the deployed binary have a similar size to the real
  # release build of ChromeOS. It won't impact any test results in theory.
  # Currently only enabling for disk_usage_tests (b/40671387) to track metrics
  # of more accurate disk usage of a ChromeOS with target Chrome browser.
  strip_chrome = attrib(bool, default=False)

  @property
  def test_class(self):
    """The test class associated with the spec."""
    return SkylabTest
| |
| |
class SkylabTest(AbstractSkylabTest, Test):
  """A test that runs on CrOS hardware in the Skylab lab environment."""

  def __init__(self, spec, chromium_tests_api):
    super().__init__(spec, chromium_tests_api)
    # Dict of a cros_test_platform, aka CTP, build ID keyed with
    # suffix. The CTP build is the entrance of the CrOS hardware tests,
    # which kicks off test_runner builds for our test suite.
    # Each test suite invokes one CTP build, and the CTP build
    # initiates test_runner builds for each shard.
    self._ctp_build_ids = {}
    # Dict of skylab.TestRunner list with suffix as the key.
    # TestRunner represents the test execution in Skylab. For a suffix
    # given, there should be one test runner to represent each shard.
    self._test_runner_builds = {}

    # These fields represent the variables generated at the runtime.
    self._lacros_gcs_path = ''
    self._exe_rel_path = ''
    self._build_output_dir = ''
    # Shard index for telemetry-based tests; None when not applicable.
    self.telemetry_shard_index = None
| |
  @property
  def locality(self) -> TestLocality:
    """Skylab tests always execute in the Skylab lab environment."""
    return TestLocality.SKYLAB
| |
| @property |
| def is_tag_criteria_test(self) -> bool: |
| return bool(self.spec.cros_test_tags or self.spec.cros_test_tags_exclude or |
| self.spec.cros_test_names or |
| self.spec.cros_test_names_exclude or |
| self.spec.cros_test_names_from_file or |
| self.spec.cros_test_names_exclude_from_file) |
| |
  @property
  def is_GPU_test(self) -> bool:
    """Whether this suite runs via the GPU-specific autotest wrapper."""
    return self.spec.autotest_name == 'chromium_Graphics'
| |
  @property
  def ctp_build_ids(self) -> dict[str, int]:
    """CTP build ID per suffix, populated when the suite is scheduled."""
    return self._ctp_build_ids

  @property
  def test_runner_builds(self) -> dict[str, TestRunner]:
    """Test runner builds (one per shard) keyed by suffix."""
    return self._test_runner_builds
| |
| def did_complete(self, suffix) -> bool: |
| # Return false to trigger retry if any shard has infra failure. |
| if not self.test_runner_builds.get(suffix): |
| return False |
| for t in self.test_runner_builds[suffix]: |
| # If any attempt runner in the shard has a deterministic result, |
| # consider the shard done. |
| if not t.status in (common_pb2.FAILURE, common_pb2.SUCCESS): |
| return False |
| return True |
| |
  @property
  def exe_rel_path(self) -> str:
    """Path of the test executable, relative to the build output dir."""
    return self._exe_rel_path

  @exe_rel_path.setter
  def exe_rel_path(self, value: str) -> None:
    self._exe_rel_path = value

  @property
  def lacros_gcs_path(self) -> str:
    """GS path of the uploaded browser files to provision on the DUT."""
    return self._lacros_gcs_path

  @lacros_gcs_path.setter
  def lacros_gcs_path(self, value: str) -> None:
    self._lacros_gcs_path = value

  @property
  def build_output_dir(self) -> str:
    """The build output directory the artifacts were built into."""
    return self._build_output_dir

  @build_output_dir.setter
  def build_output_dir(self, value: str) -> None:
    self._build_output_dir = value
| |
  def _raise_failed_nested_step(self, suffix, step, status, failure_msg):
    """Marks the nested step failed with `status` and raises StepFailure.

    Also records the failure via _update_failure_on_exit so retry logic
    sees this suffix as failed.
    """
    step.status = status
    # failure_msg is surfaced on the step text, not in the exception.
    step.step_text += failure_msg
    self._update_failure_on_exit(suffix, True)
    # NOTE(review): the exception message is the status constant rather than
    # failure_msg — presumably intentional, but worth confirming.
    raise self.api.m.step.StepFailure(status)
| |
| def get_invocation_names(self, suffix: str) -> Iterable[str]: |
| if build_id := self.ctp_build_ids.get(suffix): |
| return [f'invocations/build-{build_id}'] |
| return [] |
| |
| def pre_run(self, suffix: str, include_utr_instruction: bool = False) -> None: |
| retry_shards = [] |
| runtime_excluded_tests = [] |
| runtime_override_tests = [] |
| runtime_no_retry = False |
| if self.is_tag_criteria_test: |
| if suffix == 'without patch': |
| valid, failures = self.with_patch_failures_including_retry() |
| runtime_no_retry = True |
| assert valid, "Invalid with patch result should not trigger without patch" |
| runtime_override_tests = [ |
| t.removeprefix(self.test_id_prefix or '') for t in failures |
| ] |
| rdb_results = self._rdb_results.get( |
| self.api.m.test_utils.remove_retry_shards(suffix)) |
| if rdb_results: |
| for individual_test in rdb_results.all_tests: |
| if any(individual_test.expectednesses): |
| runtime_excluded_tests.append( |
| individual_test.test_name.removeprefix(self.test_id_prefix or |
| '')) |
| else: |
| for tr in self.test_runner_builds.get( |
| self.api.m.test_utils.remove_retry_shards(suffix), []): |
| # TODO(b/364830287): Change back to not status in [SUCCESS, FAILURE] |
| if not tr.status in [common_pb2.SUCCESS] and tr.shard >= 0: |
| retry_shards.append(tr.shard) |
| self.api.m.skylab.schedule_suite( |
| self, |
| suffix, |
| retry_shards=retry_shards, |
| runtime_no_retry=runtime_no_retry, |
| runtime_override_tests=runtime_override_tests, |
| runtime_excluded_tests=runtime_excluded_tests) |
| |
| self._add_instructions(suffix, include_utr_instruction) |
| |
| def run( |
| self, |
| checkout_dir: Path, |
| source_dir: Path, |
| build_dir: Path, |
| suffix: str, |
| info_messages: Iterable[str] = (), |
| ) -> None: |
| # There's no guarantee that a checkout exists for a skylab test, so |
| # checkout_dir and source_dir shouldn't be used |
| del checkout_dir, source_dir, build_dir |
| |
| with self.api.m.step.nest(self.step_name(suffix)) as step: |
| step.tags['resultdb.instruction.id'] = self._instructions_tag_for_suffix( |
| 'step', suffix) |
| self.api.m.skylab.fetch_test_runners(self, suffix) |
| |
| _present_info_messages(step, self, info_messages) |
| |
| bb_url = 'https://ci.chromium.org/b/%d' |
| rdb_results = self._rdb_results.get(suffix) |
| if rdb_results.total_tests_ran: |
| # If any test result was reported by RDB, the test run completed |
| # its lifecycle as expected. |
| self._update_failure_on_exit(suffix, False) |
| else: |
| if ctp_id := self.ctp_build_ids.get(suffix): |
| step.links['CTP Build'] = bb_url % ctp_id |
| |
| self._raise_failed_nested_step( |
| suffix, step, self.api.m.step.EXCEPTION, |
| 'Test did not run or failed to report to ResultDB.' |
| 'Check the CTP build for details.') |
| |
| if rdb_results.unexpected_failing_tests: |
| step.status = self.api.m.step.FAILURE |
| self._present_rdb_results(step, rdb_results, as_nested_step=True) |
| |
| # RDB may not collect all failures from test runners. E.g. |
| # infra failure on one shard and did not upload its results |
| # to RDB. So iterate all shards and raise an exception |
| # if any shard did not reach a deterministic success or |
| # failure. |
| shard_steps = [] |
| test_runners = sorted( |
| self.test_runner_builds.get(suffix, []), key=lambda x: x.shard) |
| for tr in test_runners: |
| with self.api.m.step.nest( |
| f'shard: #{tr.shard}' if tr.shard >= 0 else f'shard: #{tr.name}', |
| status='last') as shard_step: |
| if tr.status == common_pb2.FAILURE: |
| shard_step.status = self.api.m.step.FAILURE |
| elif tr.status != common_pb2.SUCCESS: |
| shard_step.status = self.api.m.step.EXCEPTION |
| if tr.url: |
| shard_step.links['test results'] = (f'{tr.url}/test-results') |
| if tr.log_url: |
| shard_step.links['debug log'] = tr.log_url |
| shard_steps.append(shard_step) |
| |
| if any(not s.status in [self.api.m.step.SUCCESS, self.api.m.step.FAILURE] |
| for s in shard_steps): |
| self._raise_failed_nested_step(suffix, step, self.api.m.step.EXCEPTION, |
| 'Some shards were unsuccessful.') |
| |
  def _add_instructions(self, suffix: str, include_utr_instruction: bool):
    """Attaches UTR reproduction instructions to the step and invocation.

    No-op unless `include_utr_instruction` is set. Step-level
    instructions are always created; test-result-level instructions are
    added only when invocation names are known for this suffix.
    """

    if include_utr_instruction:
      remote_dependency = self.api.m.repro_instructions.get_dependency(
          r'bot_update')
      # Prefer the orchestrator's builder name (when running as a
      # compilator) over this build's own builder.
      builder = self.api.m.properties.get('orchestrator', {}).get(
          'builder_name', self.api.m.buildbucket.build.builder.builder)
      remote_instruction = get_utr_instruction(
          'compile-and-test',
          self.api.m.buildbucket.build.builder.project,
          self.api.m.led.shadowed_bucket or
          self.api.m.buildbucket.build.builder.bucket,
          builder,
          [self.name],
      )
      self.api.m.repro_instructions.create_step_instruction(
          self._instructions_tag_for_suffix('step', suffix),
          f'{self.name} instructions',
          remote_content=remote_instruction,
          remote_dependency=remote_dependency,
      )

      # Strip the 'invocations/' prefix: only the bare invocation id is
      # passed along to the test result instruction.
      test_invocations = [
          inv if '/' not in inv else inv.split('/')[1]
          for inv in self.get_invocation_names(suffix)
      ]

      if test_invocations:
        remote_instruction = get_utr_instruction(
            'compile-and-test',
            self.api.m.buildbucket.build.builder.project,
            self.api.m.led.shadowed_bucket or
            self.api.m.buildbucket.build.builder.bucket,
            builder,
            [self.name],
        )
        self.api.m.repro_instructions.create_test_result_instruction(
            self._instructions_tag_for_suffix('test', suffix),
            f'{self.name} instructions',
            test_invocations,
            remote_content=remote_instruction,
            remote_dependency=remote_dependency,
            recursive=True,
        )
| |
| def compile_targets(self) -> Iterable[str]: |
| t = [self.spec.target_name] |
| return t |