| # Copyright 2016 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Updates expectations and baselines when updating web-platform-tests. |
| |
| Specifically, this class fetches results from try bots for the current CL, then |
| (1) downloads new baseline files for any tests that can be rebaselined, and |
| (2) updates the generic TestExpectations file for any other failing tests. |
| """ |
| |
| import argparse |
| import contextlib |
| import copy |
| import logging |
| import os |
| import re |
| import shutil |
| import sys |
| import tempfile |
| from collections import defaultdict, namedtuple |
| |
| from blinkpy.common.memoized import memoized |
| from blinkpy.common.net.git_cl import GitCL |
| from blinkpy.common.path_finder import PathFinder |
| from blinkpy.common.system.executive import ScriptError |
| from blinkpy.common.system.log_utils import configure_logging |
| from blinkpy.w3c.wpt_manifest import WPTManifest, BASE_MANIFEST_NAME |
| from blinkpy.web_tests.models.test_expectations import ( |
| ParseError, SystemConfigurationRemover, TestExpectations) |
| from blinkpy.web_tests.models.typ_types import ResultType |
| from blinkpy.web_tests.port.android import ( |
| PRODUCTS, PRODUCTS_TO_EXPECTATION_FILE_PATHS, WPT_SMOKE_TESTS_FILE) |
| |
| _log = logging.getLogger(__name__) |
| |
| |
| # TODO(robertma): Investigate reusing web_tests.models.test_expectations and |
| # alike in this module. |
| |
| SimpleTestResult = namedtuple('SimpleTestResult', ['expected', 'actual', 'bug']) |
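| # For example (illustrative), a flaky failure from a try bot might be |
| # SimpleTestResult(expected='', actual='FAIL TIMEOUT', bug='crbug.com/626703'). |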
| |
| DesktopConfig = namedtuple('DesktopConfig', ['port_name']) |
| |
| |
| class WPTExpectationsUpdater(object): |
| MARKER_COMMENT = '# ====== New tests from wpt-importer added here ======' |
| UMBRELLA_BUG = 'crbug.com/626703' |
| |
| def __init__(self, host, args=None, wpt_manifests=None): |
| self.host = host |
| self.port = self.host.port_factory.get() |
| self.finder = PathFinder(self.host.filesystem) |
| self.git_cl = GitCL(host) |
| self.git = self.host.git(self.finder.chromium_base()) |
| self.configs_with_no_results = [] |
| self.patchset = None |
| self.wpt_manifests = ( |
| wpt_manifests or |
| [self.port.wpt_manifest(d) for d in self.port.WPT_DIRS]) |
| |
| # Get options from command line arguments. |
| parser = argparse.ArgumentParser(description=__doc__) |
| self.add_arguments(parser) |
| self.options = parser.parse_args(args or []) |
| if not (self.options.clean_up_test_expectations or |
| self.options.clean_up_test_expectations_only): |
| assert not self.options.clean_up_affected_tests_only, ( |
| 'Cannot use --clean-up-affected-tests-only without using ' |
| '--clean-up-test-expectations or ' |
| '--clean-up-test-expectations-only') |
| # Set up TestExpectations instance which contains all |
| # expectations files associated with the platform. |
| expectations_dict = {p: self.host.filesystem.read_text_file(p) |
| for p in self.expectations_files()} |
| self._test_expectations = TestExpectations( |
| self.port, expectations_dict=expectations_dict) |
| self.testid_prefix = "ninja://:blink_wpt_tests/" |
| self.test_suite = "blink_wpt_tests" |
| |
| def expectations_files(self): |
| """Returns list of expectations files. |
| |
| Each expectations file in the list will be cleaned of expectations |
| for tests that were deleted, and test names will be renamed for |
| tests that were renamed. The files may also have their expectations |
| updated using builder results. |
| """ |
| return (list(self.port.all_expectations_dict().keys()) + |
| list(PRODUCTS_TO_EXPECTATION_FILE_PATHS.values())) |
| |
| def run(self): |
| """Does required setup before calling update_expectations(). |
| |
| Do not override this function! |
| """ |
| log_level = logging.DEBUG if self.options.verbose else logging.INFO |
| configure_logging(logging_level=log_level, include_time=True) |
| |
| self.patchset = self.options.patchset |
| |
| if (self.options.clean_up_test_expectations or |
| self.options.clean_up_test_expectations_only): |
| # Remove expectations for deleted tests and rename tests in |
| # expectations for renamed tests. |
| self.cleanup_test_expectations_files() |
| |
| if not self.options.clean_up_test_expectations_only: |
| # Use try job results to update expectations and baselines |
| self.update_expectations() |
| |
| return 0 |
| |
| def add_arguments(self, parser): |
| parser.add_argument( |
| '--patchset', |
| default=None, |
| help='Patchset number to fetch new baselines from.') |
| parser.add_argument( |
| '-v', |
| '--verbose', |
| action='store_true', |
| help='More verbose logging.') |
| parser.add_argument( |
| '--clean-up-test-expectations', |
| action='store_true', |
| help='Cleanup test expectations files.') |
| parser.add_argument( |
| '--clean-up-test-expectations-only', |
| action='store_true', |
| help='Clean up expectations and then exit script.') |
| parser.add_argument( |
| '--clean-up-affected-tests-only', |
| action='store_true', |
| help='Only clean up expectations for tests deleted or renamed in ' |
| 'the current CL. If this flag is not used, expectations for all ' |
| 'deleted or renamed tests will be cleaned up.') |
| parser.add_argument( |
| '--include-unexpected-pass', |
| action='store_true', |
| help='Adds Pass to tests with failure expectations. ' |
| 'This command line argument can be used to mark tests ' |
| 'as flaky.') |
| |
| def update_expectations_for_flag_specific(self, flag_specific): |
| """Downloads new baselines and adds test expectations lines. |
| |
| Returns: |
| A pair: an empty list (baselines are not yet downloaded for |
| flag-specific builders), and a dictionary mapping tests that |
| couldn't be rebaselined to lists of expectation lines written to |
| the flag-specific test expectations file. |
| """ |
| self.port.wpt_manifest.cache_clear() |
| |
| issue_number = self.get_issue_number() |
| if issue_number == 'None': |
| raise ScriptError('No issue on current branch.') |
| |
| if flag_specific == "disable-site-isolation-trials": |
| builder_names = ["linux-rel"] |
| test_suite = "not_site_per_process_blink_wpt_tests" |
| else: |
| builder_names = self.host.builders.all_flag_specific_try_builder_names( |
| flag_specific) |
| test_suite = "blink_wpt_tests" |
| |
| build_to_status = self.git_cl.latest_try_jobs( |
| builder_names=builder_names, |
| patchset=self.patchset) |
| if not build_to_status: |
| raise ScriptError('No try job information was collected.') |
| |
| # Here we build up a dict of failing test results for all platforms. |
| test_expectations = {} |
| for build, job_status in build_to_status.items(): |
| if (job_status.result == 'SUCCESS' and |
| not self.options.include_unexpected_pass): |
| continue |
| # Temporary logging for https://crbug.com/1154650 |
| result_dicts = self.get_failing_results_dicts(build, test_suite) |
| _log.info('Merging failing results dicts for %s', build) |
| for result_dict in result_dicts: |
| test_expectations = self.merge_dicts( |
| test_expectations, result_dict) |
| |
| generic_expectations = TestExpectations(self.port) |
| |
| # Do not create baselines for flag-specific builders yet. |
| # rebaselined_tests, test_expectations = self.download_text_baselines( |
| # test_expectations, flag_specific) |
| |
| # Do not create expectations for tests which should have baselines. |
| _, test_expectations = self.get_tests_to_rebaseline( |
| test_expectations) |
| exp_lines_dict = self.write_to_test_expectations(test_expectations, |
| flag_specific, |
| generic_expectations) |
| return [], exp_lines_dict |
| |
| def update_expectations(self): |
| """Downloads new baselines and adds test expectations lines. |
| |
| Returns: |
| A pair: A set of tests that are rebaselined, and a dictionary |
| mapping tests that couldn't be rebaselined to lists of expectation |
| lines written to TestExpectations. |
| """ |
| # The wpt_manifest function in Port is cached by default, but may be out |
| # of date if this code is called during test import. An out of date |
| # manifest will cause us to mistreat newly added tests, as they will not |
| # exist in the cached manifest. To avoid this, we invalidate the cache |
| # here. See https://crbug.com/1154650 . |
| self.port.wpt_manifest.cache_clear() |
| |
| issue_number = self.get_issue_number() |
| if issue_number == 'None': |
| raise ScriptError('No issue on current branch.') |
| |
| build_to_status = self.get_latest_try_jobs(True) |
| _log.debug('Latest try jobs: %r', build_to_status) |
| if not build_to_status: |
| raise ScriptError('No try job information was collected.') |
| |
| # Here we build up a dict of failing test results for all platforms. |
| test_expectations = {} |
| for build, job_status in build_to_status.items(): |
| if (job_status.result == 'SUCCESS' and |
| not self.options.include_unexpected_pass): |
| continue |
| # Temporary logging for https://crbug.com/1154650 |
| result_dicts = self.get_failing_results_dicts(build, self.test_suite) |
| _log.info('Merging failing results dicts for %s', build) |
| for result_dict in result_dicts: |
| test_expectations = self.merge_dicts( |
| test_expectations, result_dict) |
| |
| # At this point, test_expectations looks like: { |
| # 'test-with-failing-result': { |
| # config1: SimpleTestResult, |
| # config2: SimpleTestResult, |
| # config3: AnotherSimpleTestResult |
| # } |
| # } |
| |
| self.add_results_for_configs_without_results( |
| test_expectations, self.configs_with_no_results) |
| |
| # And then we merge results for different platforms that had the same results. |
| for test_name, platform_result in test_expectations.items(): |
| # platform_result is a dict mapping platforms to results. |
| test_expectations[test_name] = self.merge_same_valued_keys( |
| platform_result) |
| |
| # At this point, test_expectations looks like: { |
| # 'test-with-failing-result': { |
| # (config1, config2): SimpleTestResult, |
| # (config3,): AnotherSimpleTestResult |
| # } |
| # } |
| |
| rebaselined_tests, test_expectations = self.download_text_baselines( |
| test_expectations) |
| exp_lines_dict = self.write_to_test_expectations(test_expectations) |
| return rebaselined_tests, exp_lines_dict |
| |
| def add_results_for_configs_without_results(self, test_expectations, |
| configs_with_no_results): |
| # Handle any platforms with missing results. |
| def os_name(port_name): |
| # Port names are typically "os-os_version", so we can get the OS by |
| # taking everything up to the last '-' (e.g. 'mac-mac10.15' -> 'mac'). |
| if '-' not in port_name: |
| return port_name |
| return port_name[:port_name.rfind('-')] |
| |
| # When a config has no results, we try to guess at what its results are |
| # based on other results. We prefer to use results from other builds on |
| # the same OS, but fall back to all other builders otherwise (e.g. |
| # there is usually only one Linux config). |
| # In both cases, we union the results across the other builders (whether |
| # same OS or all builders), so we are usually over-expecting. |
| for config_no_result in configs_with_no_results: |
| _log.warning("No results for %s, inheriting from other builds" % |
| str(config_no_result)) |
| for test_name in test_expectations: |
| # The union of all other actual statuses is used when there is |
| # no similar OS to inherit from (eg: no results on Linux, and |
| # inheriting from Mac and Win). |
| union_actual_all = set() |
| # The union of statuses on the same OS is used when there are |
| # multiple versions of the same OS with results (eg: no results |
| # on Mac10.12, and inheriting from Mac10.15 and Mac11) |
| union_actual_sameos = set() |
| config_result_dict = test_expectations[test_name] |
| for config in config_result_dict.keys(): |
| result = config_result_dict[config] |
| union_actual_all.add(result.actual) |
| if os_name(config.port_name) == os_name( |
| config_no_result.port_name): |
| union_actual_sameos.add(result.actual) |
| |
| statuses = union_actual_sameos or union_actual_all |
| union_result = SimpleTestResult(expected="", |
| actual=" ".join(sorted(statuses)), |
| bug=self.UMBRELLA_BUG) |
| _log.debug("Inheriting result for test %s on config %s. " |
| "Same-os? %s Result: %s." % |
| (test_name, config_no_result, |
| len(union_actual_sameos) > 0, union_result)) |
| test_expectations[test_name][config_no_result] = union_result |
| |
| def get_issue_number(self): |
| """Returns current CL number. Can be replaced in unit tests.""" |
| return self.git_cl.get_issue_number() |
| |
| def get_latest_try_jobs(self, exclude_flag_specific): |
| """Returns the latest finished try jobs as Build objects.""" |
| builder_names = self._get_try_bots() |
| if exclude_flag_specific: |
| all_flag_specific = self.host.builders.all_flag_specific_try_builder_names("*") |
| builder_names = [b for b in builder_names if b not in all_flag_specific] |
| |
| return self.git_cl.latest_try_jobs(builder_names=builder_names, |
| patchset=self.patchset) |
| |
| def get_failing_results_dicts(self, build, test_suite): |
| """Returns a list of nested dicts of failing test results. |
| |
| Retrieves a full list of web test results from a builder result URL. |
| Collects the builder name, platform and a list of tests that did not |
| run as expected. |
| |
| Args: |
| build: A Build object. |
| test_suite: Name of the test suite to filter results by. |
| |
| Returns: |
| A list of dictionaries that have the following structure. |
| |
| { |
| 'test-with-failing-result': { |
| config: SimpleTestResult |
| } |
| } |
| |
| If results could be fetched but none are failing, |
| this will return an empty list. |
| """ |
| |
| func = lambda x: (x["variant"]["def"]["test_suite"] == test_suite) |
| test_results_list = [] |
| predicate = {"expectancy": "VARIANTS_WITH_ONLY_UNEXPECTED_RESULTS"} |
| rv = self.host.results_fetcher.fetch_results_from_resultdb(self.host, [build], predicate) |
| rv = list(filter(func, rv)) |
| if not self.options.include_unexpected_pass: |
| # Drop tests that first failed but then passed unexpectedly. |
| passed_test_ids = set([r["testId"] for r in rv if r["status"] == "PASS"]) |
| rv = [r for r in rv if r["testId"] not in passed_test_ids] |
| # only create test expectations for tests that had enough retries, |
| # so that we don't create excessive test expectations due to bot |
| # issues. |
| test_ids = [r["testId"] for r in rv] |
| rv = [r for r in rv if test_ids.count(r["testId"]) >= 3] |
| else: |
| passed_test_ids = set([r["testId"] for r in rv if r["status"] == "PASS"]) |
| test_ids = [r["testId"] for r in rv] |
| rv = [r for r in rv if r["testId"] in passed_test_ids or test_ids.count(r["testId"]) >= 3] |
| |
| test_results_list.extend(rv) |
| |
| has_webdriver_tests = self.host.builders.has_webdriver_tests_for_builder( |
| build.builder_name) |
| |
| webdriver_test_results = [] |
| if has_webdriver_tests: |
| mst = self.host.builders.main_for_builder(build.builder_name) |
| webdriver_test_results.append( |
| self.host.results_fetcher.fetch_webdriver_test_results( |
| build, mst)) |
| |
| webdriver_test_results = filter(None, webdriver_test_results) |
| if not test_results_list and not webdriver_test_results: |
| _log.warning('No results for build %s', build) |
| self.configs_with_no_results.extend(self.get_builder_configs(build)) |
| return [] |
| |
| unexpected_test_results = [] |
| unexpected_test_results.append( |
| self.generate_failing_results_dict_from_resultdb( |
| build, test_results_list)) |
| |
| for results_set in webdriver_test_results: |
| results_dict = self.generate_failing_results_dict( |
| build, results_set) |
| if results_dict: |
| unexpected_test_results.append(results_dict) |
| unexpected_test_results = filter(None, unexpected_test_results) |
| return unexpected_test_results |
| |
| def _get_web_test_results(self, build): |
| """Gets web tests results for a builder. |
| |
| Args: |
| build: Named tuple containing builder name and number |
| |
| Returns: |
| List of web test results for each web test step |
| in build. |
| """ |
| return [self.host.results_fetcher.fetch_results(build)] |
| |
| def get_builder_configs(self, build, *_): |
| return [DesktopConfig(port_name=self.port_name(build))] |
| |
| @memoized |
| def port_name(self, build): |
| return self.host.builders.port_name_for_builder_name( |
| build.builder_name) |
| |
| def generate_failing_results_dict_from_resultdb(self, build, results_list): |
| """Makes a dict with results for one platform. |
| |
| Args: |
| build: A Build object containing builder information. |
| results_list: A list of results retrieved from ResultDB |
| |
| Returns: |
| A dictionary with the structure: { |
| 'test-name': { |
| ('full-port-name',): SimpleTestResult |
| } |
| } |
| """ |
| configs = self.get_builder_configs(build) |
| if len(configs) > 1: |
| raise ScriptError('More than one config was produced for the' |
| ' builder and web test step combination') |
| if not configs: |
| raise ScriptError('No configuration was found for the builder and' |
| ' web test step combination') |
| config = configs[0] |
| test_dict = defaultdict(set) |
| for result in results_list: |
| test_name = result["testId"][len(self.testid_prefix):] |
| if not self._is_wpt_test(test_name): |
| continue |
| if result["status"] == "SKIP": |
| continue |
| status = "TIMEOUT" if result["status"] == "ABORT" else result["status"] |
| if status == "PASS" and not self.options.include_unexpected_pass: |
| continue |
| test_dict[test_name].add(status) |
| |
| rv = {} |
| for test_name, result in test_dict.items(): |
| rv[test_name] = { |
| config: |
| # Note: we omit `expected` so that existing expectation lines |
| # don't prevent us from merging current results across platforms. |
| # Eg: if a test FAILs everywhere, it should not matter that it |
| # has a pre-existing TIMEOUT expectation on Win7. This code is |
| # not currently capable of updating that existing expectation. |
| SimpleTestResult(expected="", |
| actual=" ".join(sorted(result)), |
| bug=self.UMBRELLA_BUG) |
| } |
| return rv or None |
| |
| def generate_failing_results_dict(self, build, web_test_results): |
| """Makes a dict with results for one platform. |
| |
| Args: |
| build: A Build object containing builder information. |
| web_test_results: A list of WebTestResult objects. |
| |
| Returns: |
| A dictionary with the structure: { |
| 'test-name': { |
| ('full-port-name',): SimpleTestResult |
| } |
| } |
| """ |
| test_dict = {} |
| configs = self.get_builder_configs(build, web_test_results) |
| _log.debug( |
| 'Getting failing results dictionary for %s step in latest %s build', |
| web_test_results.step_name(), build.builder_name) |
| |
| if len(configs) > 1: |
| raise ScriptError('More than one config was produced for the' |
| ' builder and web test step combination') |
| if not configs: |
| raise ScriptError('No configuration was found for the builder and' |
| ' web test step combination') |
| config = configs[0] |
| for result in web_test_results.didnt_run_as_expected_results(): |
| # TODO(rmhasan): If a test fails unexpectedly, it is retried until it |
| # passes or a retry limit is reached. Even if it eventually passes, |
| # there may still be flaky failures that we are not creating test |
| # expectations for. Maybe we should add a mode which creates |
| # expectations for tests that are flaky but still pass in a web |
| # test step. |
| |
| # Create flaky expectations for flaky tests on Android. In order to |
| # do this we should add 'Pass' to all tests with failing |
| # expectations that pass in the patchset's try job. |
| if result.did_pass() and not self.options.include_unexpected_pass: |
| continue |
| |
| test_name = result.test_name() |
| if not self._is_wpt_test(test_name): |
| continue |
| test_dict[test_name] = { |
| config: |
| # Note: we omit `expected` so that existing expectation lines |
| # don't prevent us from merging current results across platforms. |
| # Eg: if a test FAILs everywhere, it should not matter that it |
| # has a pre-existing TIMEOUT expectation on Win7. This code is |
| # not currently capable of updating that existing expectation. |
| SimpleTestResult(expected="", |
| actual=result.actual_results(), |
| bug=self.UMBRELLA_BUG) |
| } |
| return test_dict |
| |
| def _is_wpt_test(self, test_name): |
| """Check if a web test is a WPT tests. |
| |
| In Blink web test results, each test name is relative to |
| the web_tests directory instead of the wpt directory. We |
| need to use the port.is_wpt_test() function to find out if a test |
| is from the WPT suite. |
| |
| Returns: True if a test is in the external/wpt subdirectory of |
| the web_tests directory.""" |
| return self.port.is_wpt_test(test_name) |
| |
| def merge_dicts(self, target, source, path=None): |
| """Recursively merges nested dictionaries. |
| |
| Args: |
| target: First dictionary, which is updated based on source. |
| source: Second dictionary, not modified. |
| path: A list of keys, only used for making error messages. |
| |
| Returns: |
| The updated target dictionary. |
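| |
| For example (illustrative): if target and source map the same test |
| and config to different SimpleTestResults, the merged value keeps |
| target's expected and bug fields and combines the actual statuses, |
| so actual 'FAIL' and actual 'TIMEOUT' become 'FAIL TIMEOUT'. |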
| """ |
| path = path or [] |
| for key in source: |
| if key in target: |
| if (isinstance(target[key], dict)) and isinstance( |
| source[key], dict): |
| self.merge_dicts(target[key], source[key], |
| path + [str(key)]) |
| elif target[key] == source[key]: |
| pass |
| else: |
| # We have two different SimpleTestResults for the same test |
| # from two different builders. This can happen when a CQ bot |
| # and a blink-rel bot run on the same platform. We union the |
| # actual statuses from both builders. |
| _log.info( |
| "Joining differing results for path %s, key %s\n target:%s\nsource:%s" |
| % (path, key, target[key], source[key])) |
| target[key] = SimpleTestResult( |
| expected=target[key].expected, |
| actual='%s %s' % |
| (target[key].actual, source[key].actual), |
| bug=target[key].bug) |
| else: |
| target[key] = source[key] |
| return target |
| |
| def merge_same_valued_keys(self, dictionary): |
| """Merges keys in dictionary with same value. |
| |
| Traverses through a dict and compares the values of keys to one another. |
| If the values match, the keys are combined into a tuple and the previous |
| keys are removed from the dict. |
| |
| Args: |
| dictionary: A dictionary with a dictionary as the value. |
| |
| Returns: |
| A new dictionary with updated keys to reflect matching values of keys. |
| Example: { |
| 'one': {'foo': 'bar'}, |
| 'two': {'foo': 'bar'}, |
| 'three': {'foo': 'bar'} |
| } |
| is converted to a new dictionary that contains |
| {('one', 'two', 'three'): {'foo': 'bar'}} |
| """ |
| merged_dict = {} |
| matching_value_keys = set() |
| keys = sorted(dictionary.keys()) |
| while keys: |
| current_key = keys[0] |
| found_match = False |
| if current_key == keys[-1]: |
| merged_dict[tuple([current_key])] = dictionary[current_key] |
| keys.remove(current_key) |
| break |
| current_result_set = set(dictionary[current_key].actual.split()) |
| for next_item in keys[1:]: |
| if (current_result_set == |
| set(dictionary[next_item].actual.split())): |
| found_match = True |
| matching_value_keys.update([current_key, next_item]) |
| |
| if next_item == keys[-1]: |
| if found_match: |
| merged_dict[ |
| tuple(sorted(matching_value_keys))] = dictionary[current_key] |
| keys = [ |
| k for k in keys if k not in matching_value_keys |
| ] |
| else: |
| merged_dict[tuple([current_key])] = dictionary[current_key] |
| keys.remove(current_key) |
| matching_value_keys = set() |
| return merged_dict |
| |
| def get_expectations(self, result, test_name=''): |
| """Returns a set of test expectations based on the result of a test. |
| |
| Returns a set of one or more test expectations based on the expected |
| and actual results of a given test name. This function is used to |
| decide expectations for tests that could not be rebaselined. |
| |
| Args: |
| result: A SimpleTestResult. |
| test_name: The test name string (optional). |
| |
| Returns: |
| A set of one or more test expectation strings with the first letter |
| capitalized. Example: {'Failure', 'Timeout'}. |
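| For instance (illustrative), an actual result of 'TEXT TIMEOUT' maps |
| to {'Failure', 'Timeout'}, and any result containing 'MISSING' maps |
| to {'Skip'}. |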
| """ |
| actual_results = set(result.actual.split()) |
| |
| # If the result is MISSING, this implies that the test was not |
| # rebaselined and has an actual result but no baseline. We can't |
| # add a Missing expectation (this is not allowed), but no other |
| # expectation is correct. |
| if 'MISSING' in actual_results: |
| return {'Skip'} |
| expectations = set() |
| failure_types = {'TEXT', 'IMAGE+TEXT', 'IMAGE', 'AUDIO', 'FAIL'} |
| other_types = {'TIMEOUT', 'CRASH', 'PASS'} |
| for actual in actual_results: |
| if actual in failure_types: |
| expectations.add('Failure') |
| if actual in other_types: |
| expectations.add(actual.capitalize()) |
| return expectations |
| |
| def remove_configurations(self, configs_to_remove): |
| """Removes configs from test expectations files for some tests |
| |
| Args: |
| configs_to_remove: A dict mapping test names to sets of OS versions: |
| { |
| 'test-with-failing-result': ['os1', 'os2', ...] |
| } |
| Returns: None |
| """ |
| # SystemConfigurationRemover now only works on generic test expectations |
| # files. This is good enough for now. |
| path = self.port.path_to_generic_test_expectations_file() |
| test_expectations = TestExpectations( |
| self.port, |
| expectations_dict={ |
| path: self.host.filesystem.read_text_file(path), |
| }) |
| system_remover = SystemConfigurationRemover(self.host.filesystem, test_expectations) |
| for test, versions in configs_to_remove.items(): |
| system_remover.remove_os_versions(test, versions) |
| system_remover.update_expectations() |
| |
| def create_line_dict_for_flag_specific(self, merged_results, generic_expectations): |
| """Creates list of test expectations lines for flag specific builder. |
| |
| Traverses through the given |merged_results| dictionary and parses the |
| value to create one test expectations line per key. If a test expectation |
| from generic expectations can be inherited, we will reuse that expectation |
| so that we can keep the file size small. Flag-specific expectations |
| do not use platform tags, so we don't need to handle conflicts either. |
| |
| Test expectation lines have the following format: |
| ['BUG_URL TEST_NAME [EXPECTATION(S)]'] |
| |
| Args: |
| merged_results: A dictionary with the format: |
| { |
| 'test-with-failing-result': { |
| (config1,): SimpleTestResult |
| } |
| } |
| |
| Returns: |
| line_dict: A dictionary from test names to a list of test |
| expectation lines |
| (each SimpleTestResult turns into a line). |
| configs_to_remove: An empty dictionary |
| """ |
| line_dict = defaultdict(list) |
| for test_name, test_results in sorted(merged_results.items()): |
| if not self._is_wpt_test(test_name): |
| _log.warning( |
| 'Non-WPT test "%s" unexpectedly passed to create_line_dict.', |
| test_name) |
| continue |
| expectation_line = generic_expectations.get_expectations(test_name) |
| expectations = expectation_line.results |
| for configs, result in sorted(test_results.items()): |
| new_expectations = self.get_expectations(result, test_name) |
| if 'Failure' in new_expectations: |
| new_expectations.remove('Failure') |
| new_expectations.add('FAIL') |
| if new_expectations != expectations: |
| line_dict[test_name].extend( |
| self._create_lines(test_name, [], result)) |
| # for flag-specific builders, we always have one config for each |
| # test, so quit the loop here |
| break |
| |
| return line_dict, {} |
| |
| |
| def create_line_dict(self, merged_results): |
| """Creates list of test expectations lines. |
| |
| Traverses through the given |merged_results| dictionary and parses the |
| value to create one test expectations line per key. |
| |
| Test expectation lines have the following format: |
| ['BUG_URL [PLATFORM(S)] TEST_NAME [EXPECTATION(S)]'] |
| |
| Args: |
| merged_results: A dictionary with the format: |
| { |
| 'test-with-failing-result': { |
| (config1, config2): SimpleTestResult, |
| (config3,): SimpleTestResult |
| } |
| } |
| |
| Returns: |
| line_dict: A dictionary from test names to a list of test |
| expectation lines |
| (each SimpleTestResult turns into a line). |
| configs_to_remove: A dictionary from test names to a set |
| of os specifiers |
| """ |
| line_dict = defaultdict(list) |
| configs_to_remove = defaultdict(set) |
| for test_name, test_results in sorted(merged_results.items()): |
| if not self._is_wpt_test(test_name): |
| _log.warning( |
| 'Non-WPT test "%s" unexpectedly passed to create_line_dict.', |
| test_name) |
| continue |
| for configs, result in sorted(test_results.items()): |
| line_dict[test_name].extend( |
| self._create_lines(test_name, configs, result)) |
| for config in configs: |
| configs_to_remove[test_name].add( |
| self.host.builders.version_specifier_for_port_name( |
| config.port_name)) |
| |
| return line_dict, configs_to_remove |
| |
| def _create_lines(self, test_name, configs, result): |
| """Constructs test expectation line strings. |
| |
| Args: |
| test_name: The test name string. |
| configs: A list of full configs that the line should apply to. |
| result: A SimpleTestResult. |
| |
| Returns: |
| A list of strings, each of which is a test expectation line for the given |
| |test_name|. |
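| |
| An illustrative (hypothetical) line: |
| crbug.com/626703 [ Win ] external/wpt/foo.html [ Failure Timeout ] |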
| """ |
| lines = [] |
| |
| expectations = '[ %s ]' % \ |
| ' '.join(self.get_expectations(result, test_name)) |
| for specifier in self.normalized_specifiers(test_name, configs): |
| line_parts = [] |
| if specifier: |
| line_parts.append('[ %s ]' % specifier) |
| # Escape literal asterisks for typ (https://crbug.com/1036130). |
| # TODO(weizhong): consider other escapes we added recently |
| line_parts.append(test_name.replace('*', '\\*')) |
| line_parts.append(expectations) |
| |
| # Only add the bug link if the expectations do not include SKIP. |
| if 'Skip' not in expectations and result.bug: |
| line_parts.insert(0, result.bug) |
| |
| lines.append(' '.join(line_parts)) |
| return lines |
| |
| def normalized_specifiers(self, test_name, configs): |
| """Converts and simplifies ports into platform specifiers. |
| |
| Args: |
| test_name: The test name string. |
| configs: A list of full configs that the line should apply to. |
| |
| Returns: |
| A list of specifier strings, e.g. ["Mac", "Win"]. |
| [''] will be returned if the line should apply to all platforms. |
| """ |
| if not configs: |
| return [''] |
| |
| specifiers = [] |
| for config in configs: |
| specifiers.append( |
| self.host.builders.version_specifier_for_port_name( |
| config.port_name)) |
| |
| if self.specifiers_can_extend_to_all_platforms(specifiers, test_name): |
| return [''] |
| |
| specifiers = self.simplify_specifiers( |
| specifiers, self.port.configuration_specifier_macros()) |
| if not specifiers: |
| return [''] |
| return specifiers |
| |
| def specifiers_can_extend_to_all_platforms(self, specifiers, test_name): |
| """Tests whether a list of specifiers can be extended to all platforms. |
| |
| Tries to add skipped platform specifiers to the list and tests if the |
| extended list covers all platforms. |
| """ |
| extended_specifiers = specifiers + self.skipped_specifiers(test_name) |
| # If the list is simplified to empty, then all platforms are covered. |
| return not self.simplify_specifiers( |
| extended_specifiers, self.port.configuration_specifier_macros()) |
| |
| def skipped_specifiers(self, test_name): |
| """Returns a list of platform specifiers for which the test is skipped.""" |
| specifiers = [] |
| for port in self.all_try_builder_ports(): |
| if port.skips_test(test_name): |
| specifiers.append( |
| self.host.builders.version_specifier_for_port_name( |
| port.name())) |
| return specifiers |
| |
| @memoized |
| def all_try_builder_ports(self): |
| """Returns a list of Port objects for all try builders.""" |
| return [ |
| self.host.port_factory.get_from_builder_name(name) |
| for name in self._get_try_bots() |
| ] |
| |
| def simplify_specifiers(self, specifiers, specifier_macros): |
| """Simplifies the specifier part of an expectation line if possible. |
| |
| "Simplifying" means finding the shortest list of platform specifiers |
| that is equivalent to the given list of specifiers. This can be done |
| because there are "macro specifiers" that stand in for multiple version |
| specifiers, and an empty list stands in for "all platforms". |
| |
| Args: |
| specifiers: A collection of specifiers (case insensitive). |
| specifier_macros: A dict mapping "macros" for groups of specifiers |
| to lists of version specifiers. e.g. {"win": ["win10", "win11"]}. |
| If there are versions in this dict that have no corresponding |
| try bots, they are ignored. |
| |
| Returns: |
| A shortened list of specifiers (capitalized). For example, ["win10", |
| "win11"] would be converted to ["Win"]. If the given list covers |
| all supported platforms, then an empty list is returned. |
| """ |
| specifiers = {s.lower() for s in specifiers} |
| covered_by_try_bots = self._platform_specifiers_covered_by_try_bots() |
| for macro, versions in specifier_macros.items(): |
| macro = macro.lower() |
| |
| # Only consider version specifiers that have corresponding try bots. |
| versions = { |
| s.lower() |
| for s in versions if s.lower() in covered_by_try_bots |
| } |
| if len(versions) == 0: |
| continue |
| if versions <= specifiers: |
| specifiers -= versions |
| specifiers.add(macro) |
| if specifiers == {macro.lower() for macro in specifier_macros}: |
| return [] |
| return sorted(specifier.capitalize() for specifier in specifiers) |
| |
| def _platform_specifiers_covered_by_try_bots(self): |
| all_platform_specifiers = set() |
| for builder_name in self._get_try_bots(): |
| all_platform_specifiers.add( |
| self.host.builders.platform_specifier_for_builder( |
| builder_name).lower()) |
| return frozenset(all_platform_specifiers) |
| |
| def write_to_test_expectations(self, test_expectations, |
| flag_specific=None, |
| generic_expectations=None): |
| """Writes the given lines to the TestExpectations file. |
| |
| The place in the file where the new lines are inserted is after a marker |
| comment line. If this marker comment line is not found, then everything |
| including the marker line is appended to the end of the file. |
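| |
| For example (illustrative), new lines end up directly below the |
| marker comment in the target file: |
| # ====== New tests from wpt-importer added here ====== |
| crbug.com/626703 external/wpt/foo.html [ Failure ] |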
| |
| All WontFix tests are inserted into the NeverFixTests file instead of |
| the TestExpectations file. |
| |
| Args: |
| test_expectations: A dictionary mapping test names to a dictionary |
| mapping platforms to test results. |
| Returns: |
| Dictionary mapping test names to lists of test expectation strings. |
| """ |
| if flag_specific: |
| line_dict, configs_to_remove = self.create_line_dict_for_flag_specific( |
| test_expectations, generic_expectations) |
| else: |
| line_dict, configs_to_remove = self.create_line_dict(test_expectations) |
| if not line_dict: |
| _log.info( |
| 'No lines to write to %s, WebdriverExpectations' |
| ' or NeverFixTests.' % (flag_specific or 'TestExpectations') |
| ) |
| return {} |
| |
| if configs_to_remove: |
| _log.info('Clean up stale expectations that' |
| ' could conflict with new expectations') |
| self.remove_configurations(configs_to_remove) |
| |
| line_list = [] |
| wont_fix_list = [] |
| webdriver_list = [] |
| for lines in line_dict.values(): |
| for line in lines: |
| if self.finder.webdriver_prefix() in line: |
| webdriver_list.append(line) |
| else: |
| line_list.append(line) |
| |
| if flag_specific: |
| list_to_expectation = { |
| self.port.path_to_flag_specific_expectations_file(flag_specific): line_list |
| } |
| else: |
| list_to_expectation = { |
| self.port.path_to_generic_test_expectations_file(): line_list, |
| self.port.path_to_webdriver_expectations_file(): webdriver_list |
| } |
| for expectations_file_path, lines in list_to_expectation.items(): |
| if not lines: |
| continue |
| |
| _log.info('Lines to write to %s:\n %s', expectations_file_path, |
| '\n'.join(lines)) |
| # Writes to TestExpectations file. |
| file_contents = self.host.filesystem.read_text_file( |
| expectations_file_path) |
| |
| marker_comment_index = file_contents.find(self.MARKER_COMMENT) |
| if marker_comment_index == -1: |
| file_contents += '\n%s\n' % self.MARKER_COMMENT |
| file_contents += '\n'.join(lines) |
| else: |
| end_of_marker_line = (file_contents[marker_comment_index:]. |
| find('\n')) + marker_comment_index |
| file_contents = ( |
| file_contents[:end_of_marker_line + 1] + '\n'.join(lines) + |
| file_contents[end_of_marker_line:]) |
| |
| self.host.filesystem.write_text_file(expectations_file_path, |
| file_contents) |
| |
| # only write to NeverFixTests for the generic round |
| if wont_fix_list and flag_specific is None: |
| _log.info('Lines to write to NeverFixTests:\n %s', |
| '\n'.join(wont_fix_list)) |
| # Writes to NeverFixTests file. |
| wont_fix_path = self.port.path_to_never_fix_tests_file() |
| wont_fix_file_content = self.host.filesystem.read_text_file( |
| wont_fix_path) |
| if not wont_fix_file_content.endswith('\n'): |
| wont_fix_file_content += '\n' |
| wont_fix_file_content += '\n'.join(wont_fix_list) |
| wont_fix_file_content += '\n' |
| self.host.filesystem.write_text_file(wont_fix_path, |
| wont_fix_file_content) |
| return line_dict |
| |
| @contextlib.contextmanager |
| def prepare_smoke_tests(self, chromium_git): |
| """List test cases that should be run by the smoke test builder |
| |
| Add new and modified test cases to WPT_SMOKE_TESTS_FILE so that the |
| android-weblayer-pie-x86-wpt-smoketest builder will run those tests. |
| wpt-importer will generate initial expectations for weblayer based |
| on the results. Save and restore WPT_SMOKE_TESTS_FILE as |
| necessary. |
| """ |
| _log.info('Backup file WPTSmokeTestCases.') |
| temp_dir = tempfile.gettempdir() |
| base_path = os.path.basename(WPT_SMOKE_TESTS_FILE) |
| self._saved_test_cases_file = os.path.join(temp_dir, base_path) |
| shutil.copyfile(WPT_SMOKE_TESTS_FILE, self._saved_test_cases_file) |
| tests = (self._list_add_files() |
| + self._list_modified_files()) |
| |
| manifest_path = os.path.join( |
| self.finder.web_tests_dir(), "external", BASE_MANIFEST_NAME) |
| manifest = WPTManifest(self.host, manifest_path) |
| with open(WPT_SMOKE_TESTS_FILE, 'w') as out_file: |
| _log.info('Test cases to run on smoke test builder:') |
| prelen = len('external/wpt/') |
| for test in sorted(tests): |
| if test.startswith('external/wpt/') \ |
| and manifest.is_test_file(test[prelen:]): |
| # The path should be relative to the out/Release dir, as that |
| # will be the same as on the swarming infra. We use the path |
| # instead of the test case name, because that is needed to |
| # correctly run cases defined in .js files. |
| path = '../../third_party/blink/web_tests/' + test |
| _log.info(' ' + path) |
| out_file.write(path + '\n') |
| try: |
| yield |
| finally: |
| _log.info('Restore file WPTSmokeTestCases.') |
| shutil.copyfile(self._saved_test_cases_file, WPT_SMOKE_TESTS_FILE) |
| chromium_git.commit_locally_with_message('Restore WPTSmokeTestCases') |
| |
| def skip_slow_timeout_tests(self, port): |
| """Skip any Slow and Timeout tests found in TestExpectations. |
| """ |
| _log.info('Skip Slow and Timeout tests.') |
| try: |
| test_expectations = TestExpectations(port) |
| except ParseError as err: |
| _log.warning('Error when parsing TestExpectations\n%s', str(err)) |
| return False |
| |
| path = port.path_to_generic_test_expectations_file() |
| changed = False |
| for line in test_expectations.get_updated_lines(path): |
| if not line.test or line.is_glob: |
| continue |
| if (ResultType.Timeout in line.results and |
| len(line.results) == 1 and |
| (test_expectations.get_expectations(line.test).is_slow_test or |
| port.is_slow_wpt_test(line.test))): |
| test_expectations.remove_expectations(path, [line]) |
| line.add_expectations({ResultType.Skip}) |
| test_expectations.add_expectations(path, [line], line.lineno) |
| changed = True |
| if changed: |
| test_expectations.commit_changes() |
| return changed |
| |
| def cleanup_test_expectations_files(self): |
| """Removes deleted tests from expectations files. |
| |
| Removes expectations for deleted tests or renames test names in |
| expectation files for tests that were renamed. If the |
| --clean-up-affected-tests-only command line argument is used then |
| only tests deleted in the CL will have their expectations removed |
| through this script. If that command line argument is not used then |
| expectations for test files that no longer exist will be deleted. |
| """ |
| deleted_files = self._list_deleted_files() |
| renamed_files = self._list_renamed_files() |
| modified_files = self._list_modified_files() |
| |
| for path in self._test_expectations.expectations_dict: |
| _log.info('Updating %s for any removed or renamed tests.', |
| self.host.filesystem.basename(path)) |
| if path in PRODUCTS_TO_EXPECTATION_FILE_PATHS.values(): |
| # Also delete any expectations for modified test cases on the |
| # Android side to avoid conflicts. |
| # TODO: Consider keeping the triaged expectations when results |
| # do not change. |
| self._clean_single_test_expectations_file( |
| path, deleted_files + modified_files, renamed_files) |
| else: |
| self._clean_single_test_expectations_file( |
| path, deleted_files, renamed_files) |
| self._test_expectations.commit_changes() |
| |
| def _list_files(self, diff_filter): |
| paths = self.git.run( |
| ['diff', 'origin/main', '--diff-filter=' + diff_filter, |
| '--name-only']).splitlines() |
| files = [] |
| for p in paths: |
| rel_path = self._relative_to_web_test_dir(p) |
| if rel_path: |
| files.append(rel_path) |
| return files |
| |
| def _list_add_files(self): |
| return self._list_files('A') |
| |
| def _list_modified_files(self): |
| return self._list_files('M') |
| |
| def _list_deleted_files(self): |
| return self._list_files('D') |
| |
| def _list_renamed_files(self): |
| """Returns a dictionary mapping tests to their new name. |
| |
| Regardless of the command line arguments used, this method will |
| only return a dictionary for tests affected by the current CL. |
| |
| Returns a dictionary mapping source name to destination name. |
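| |
| Each parsed line of the --name-status output has three tab-separated |
| fields (illustrative): a rename score such as 'R095', the old path, |
| and the new path. |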
| """ |
| out = self.git.run([ |
| 'diff', 'origin/main', '-M90%', '--diff-filter=R', |
| '--name-status' |
| ]) |
| renamed_tests = {} |
| for line in out.splitlines(): |
| try: |
| _, source_path, dest_path = line.split('\t') |
| except ValueError: |
| _log.info("ValueError for line: %s" % line) |
| continue |
| source_test = self._relative_to_web_test_dir(source_path) |
| dest_test = self._relative_to_web_test_dir(dest_path) |
| if source_test and dest_test: |
| renamed_tests[source_test] = dest_test |
| return renamed_tests |
| |
| def _clean_single_test_expectations_file( |
| self, path, deleted_files, renamed_files): |
| """Cleans up a single test expectations file. |
| |
| Args: |
| path: Path of expectations file that is being cleaned up. |
| deleted_files: List of file paths relative to the web tests |
| directory which were deleted. |
| renamed_files: Dictionary mapping file paths to their new file |
| name after renaming. |
| """ |
| deleted_files = set(deleted_files) |
| for line in self._test_expectations.get_updated_lines(path): |
| # If a line is a glob-type expectation, an empty line or a comment, |
| # keep it in the updated expectations file without modification. |
| if not line.test or line.is_glob: |
| continue |
| root_file = self._get_root_file(line.test) |
| if root_file in deleted_files: |
| self._test_expectations.remove_expectations(path, [line]) |
| elif root_file in renamed_files: |
| self._test_expectations.remove_expectations(path, [line]) |
| new_file_name = renamed_files[root_file] |
| if self.finder.is_webdriver_test_path(line.test): |
| _, subtest_suffix = self.port.split_webdriver_test_name(line.test) |
| line.test = self.port.add_webdriver_subtest_suffix( |
| new_file_name, subtest_suffix) |
| elif self.port.is_wpt_test(line.test): |
| # Based on logic in Base._wpt_test_urls_matching_paths |
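| # For example (illustrative): if 'foo.any.js' is renamed to |
| # 'bar.any.js', an expectation for the generated test URL |
| # 'foo.any.worker.html' becomes one for 'bar.any.worker.html'. |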
| line.test = line.test.replace( |
| re.sub(r'\.js$', '.', root_file), |
| re.sub(r'\.js$', '.', new_file_name)) |
| else: |
| line.test = new_file_name |
| self._test_expectations.add_expectations( |
| path, [line], lineno=line.lineno) |
| elif not root_file or not self.port.test_isfile(root_file): |
| if not self.options.clean_up_affected_tests_only: |
| self._test_expectations.remove_expectations(path, [line]) |
| |
| @memoized |
| def _get_root_file(self, test_name): |
| """Finds the physical file in web tests directory for a test |
| |
| If the test is a WPT test, this looks in each of the WPT manifests for |
| the physical file. If the test name cannot be found in any of the |
| manifests, the test no longer exists and the function returns None. If |
| the test is a webdriver test, this strips all subtest arguments and |
| returns the file path. If the test is a legacy web test, this returns |
| the test name. |
| |
| Args: |
| test_name: Test name which may include test arguments. |
| |
| Returns: |
| The path of the physical file that backs the test, |
| relative to the web_tests directory. |
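| |
| For example (illustrative): for the WPT test |
| 'external/wpt/foo.html?var=abc' this returns 'external/wpt/foo.html', |
| assuming the manifest maps that test URL back to that file. |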
| """ |
| if self.finder.is_webdriver_test_path(test_name): |
| root_test_file, _ = ( |
| self.port.split_webdriver_test_name(test_name)) |
| return root_test_file |
| elif self.port.is_wpt_test(test_name): |
| for wpt_manifest in self.wpt_manifests: |
| if test_name.startswith(wpt_manifest.wpt_dir): |
| wpt_test = test_name[len(wpt_manifest.wpt_dir) + 1:] |
| if wpt_manifest.is_test_url(wpt_test): |
| return self.host.filesystem.join( |
| wpt_manifest.wpt_dir, |
| wpt_manifest.file_path_for_test_url(wpt_test)) |
| # The test was not found in any of the wpt manifests, therefore |
| # the test does not exist. So we will return None in this case. |
| return None |
| else: |
| # Non WPT and non webdriver tests have no file parameters, and |
| # the physical file path is the actual name of the test. |
| return test_name |
| |
| def _relative_to_web_test_dir(self, path_relative_to_repo_root): |
| """Returns a path that's relative to the web tests directory.""" |
| abs_path = self.finder.path_from_chromium_base( |
| path_relative_to_repo_root) |
| if not abs_path.startswith(self.finder.web_tests_dir()): |
| return None |
| return self.host.filesystem.relpath( |
| abs_path, self.finder.web_tests_dir()) |
| |
| # TODO(robertma): Unit test this method. |
| def download_text_baselines(self, test_results, flag_specific=None): |
| """Fetches new baseline files for tests that should be rebaselined. |
| |
| Invokes `blink_tool.py rebaseline-cl` in order to download new baselines |
| (-expected.txt files) for testharness.js tests that did not crash or |
| time out. Then, the platform-specific test is removed from the overall |
| failure test dictionary and the resulting dictionary is returned. |
| |
| Args: |
| test_results: A dictionary of failing test results, mapping test |
| names to dicts of platforms to SimpleTestResult. |
| |
| Returns: |
| A pair: A set of tests that are rebaselined, and a modified copy of |
| the test_results dictionary containing only tests that couldn't be |
| rebaselined. |
| """ |
| tests_to_rebaseline, test_results = self.get_tests_to_rebaseline( |
| test_results) |
| if not tests_to_rebaseline: |
| _log.info('No tests to rebaseline.') |
| return tests_to_rebaseline, test_results |
| _log.info('Tests to rebaseline:') |
| for test in tests_to_rebaseline: |
| _log.info(' %s', test) |
| |
| blink_tool = self.finder.path_from_blink_tools('blink_tool.py') |
| command = [ |
| sys.executable, |
| blink_tool, |
| 'rebaseline-cl', |
| '--no-trigger-jobs', |
| '--fill-missing', |
| ] |
| if flag_specific: |
| command.append('--flag-specific=' + flag_specific) |
| if self.options.verbose: |
| command.append('--verbose') |
| if self.patchset: |
| command.append('--patchset=' + str(self.patchset)) |
| command += tests_to_rebaseline |
| rebaseline_output = self.host.executive.run_command(command) |
| _log.info( |
| "Output of rebaseline-cl:\n%s\n--end of rebaseline-cl output --" % |
| rebaseline_output) |
| return tests_to_rebaseline, test_results |
| |
| def get_tests_to_rebaseline(self, test_results): |
| """Filters failing tests that can be rebaselined. |
| |
| Creates a list of tests to rebaseline depending on the tests' platform- |
| specific results. In general, this will be non-ref tests that failed |
| due to a baseline mismatch (rather than crash or timeout). |
| |
| Args: |
| test_results: A dictionary of failing test results, mapping test |
| names to dicts of platforms to SimpleTestResult. |
| |
| Returns: |
| A pair: A set of tests to be rebaselined, and a modified copy of |
| the test_results dictionary. The tests to be rebaselined should |
| include testharness.js tests that failed due to a baseline mismatch. |
| """ |
| new_test_results = copy.deepcopy(test_results) |
| tests_to_rebaseline = set() |
| for test_name in test_results: |
| for platforms, result in test_results[test_name].items(): |
| if self.can_rebaseline(test_name, result): |
| # We always assume rebaseline is successful, so we delete |
| # the line if Failure is the only result, otherwise we |
| # change Failure to Pass. |
| # TODO: consider rebaseline failed scenario in future |
| self.update_test_results_for_rebaselined_test( |
| new_test_results, test_name, platforms) |
| tests_to_rebaseline.add(test_name) |
| |
| return sorted(tests_to_rebaseline), new_test_results |
| |
| def update_test_results_for_rebaselined_test(self, test_results, test_name, platforms): |
| """Update test results if we successfully rebaselined a test |
| |
| After rebaselining, a previously failing test will pass. If 'PASS' is |
| then the only result, we don't add an expectation line for it. |
| We assume rebaselining is always successful for now. We can |
| improve this logic in the future once rebaseline-cl can tell which |
| tests it failed to rebaseline. |
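| |
| For example (illustrative): an actual result of 'FAIL TIMEOUT' |
| becomes 'PASS TIMEOUT', while a result of just 'FAIL' causes the |
| entry to be removed entirely. |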
| """ |
| result = test_results[test_name][platforms] |
| actual = set(result.actual.split(' ')) |
| if 'FAIL' in actual: |
| actual.remove('FAIL') |
| actual.add('PASS') |
| if len(actual) == 1: |
| del test_results[test_name][platforms] |
| else: |
| test_results[test_name][platforms] = SimpleTestResult(expected=result.expected, |
| actual=' '.join(sorted(actual)), |
| bug=result.bug) |
| |
| def can_rebaseline(self, test_name, result): |
| """Checks if a test can be rebaselined. |
| |
| Args: |
| test_name: The test name string. |
| result: A SimpleTestResult. |
| """ |
| if self.is_reference_test(test_name): |
| return False |
| if any(x in result.actual for x in ('CRASH', 'TIMEOUT', 'MISSING')): |
| return False |
| if self.is_webdriver_test(test_name): |
| return False |
| return True |
| |
| def is_reference_test(self, test_name): |
| """Checks whether a given test is a reference test.""" |
| return bool(self.port.reference_files(test_name)) |
| |
| def is_webdriver_test(self, test_name): |
| """Checks whether a given test is a WebDriver test.""" |
| return self.finder.is_webdriver_test_path(test_name) |
| |
| @memoized |
| def _get_try_bots(self): |
| return self.host.builders.filter_builders( |
| is_try=True, exclude_specifiers={'android'}) |