# Copyright 2021 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module for interacting with expectation files."""

import base64
import collections
from datetime import timedelta, date
import itertools
import os
import posixpath
import re
from typing import Dict, List, Set, Tuple, Union
import urllib.request

from flake_suppressor_common import common_typing as ct

from typ import expectations_parser

CHROMIUM_SRC_DIR = os.path.realpath(
    os.path.join(os.path.dirname(__file__), '..', '..'))
GITILES_URL = 'https://chromium.googlesource.com/chromium/src/+/refs/heads/main'
TEXT_FORMAT_ARG = '?format=TEXT'

TAG_GROUP_REGEX = re.compile(r'# tags: \[([^\]]*)\]', re.MULTILINE | re.DOTALL)

TestToUrlsType = Dict[str, List[str]]
SuiteToTestsType = Dict[str, TestToUrlsType]
TagOrderedAggregateResultType = Dict[ct.TagTupleType, SuiteToTestsType]

def OverFailedBuildThreshold(failed_result_tuple_list: List[ct.ResultTupleType],
                             build_fail_total_number_threshold: int) -> bool:
  """Checks whether the number of failed builds in |failed_result_tuple_list|
  is equal to or more than |build_fail_total_number_threshold|.

  Args:
    failed_result_tuple_list: A list of ct.ResultTupleType failed test results.
    build_fail_total_number_threshold: Threshold based on the number of failed
      builds caused by a test.

  Returns:
    Whether the number of failed builds in |failed_result_tuple_list| is equal
    to or more than |build_fail_total_number_threshold|.
  """
  unique_build_ids = set()
  for result in failed_result_tuple_list:
    if '/' in result.build_url:
      unique_build_ids.add(result.build_url.split('/')[-1])
      if len(unique_build_ids) >= build_fail_total_number_threshold:
        return True
  return False
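
# For example (hypothetical URLs), results with the build URLs
#   https://ci.chromium.org/b/123 (seen twice) and
#   https://ci.chromium.org/b/456
# contribute two unique build IDs ('123' and '456'), so a threshold of 2 is
# met even though there are three failed results.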


def OverFailedBuildByConsecutiveDayThreshold(
    failed_result_tuple_list: List[ct.ResultTupleType],
    build_fail_consecutive_day_threshold: int) -> bool:
  """Checks whether the maximum number of consecutive days with build failures
  is equal to or more than |build_fail_consecutive_day_threshold|.

  Args:
    failed_result_tuple_list: A list of ct.ResultTupleType failed test results.
    build_fail_consecutive_day_threshold: Threshold based on the number of
      consecutive days on which a test caused build failures.

  Returns:
    Whether the maximum number of consecutive days with build failures is
    equal to or more than |build_fail_consecutive_day_threshold|.
  """
  dates = {t.date: False for t in failed_result_tuple_list}

  for cur_date, is_checked in dates.items():
    # A beginning point.
    if not is_checked:
      count = 1

      while count < build_fail_consecutive_day_threshold:
        new_date = cur_date + timedelta(days=count)
        if new_date in dates:
          count += 1
          # Mark the date as checked.
          dates[new_date] = True
        else:
          break

      if count >= build_fail_consecutive_day_threshold:
        return True

  return False
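
# As a walk-through with hypothetical dates: for failures on 2023-01-01,
# 2023-01-02, and 2023-01-04 with a threshold of 3, the run starting at 01-01
# only reaches a count of 2 because 01-03 is missing, and the later runs are
# shorter still, so the function returns False. An additional failure on
# 2023-01-03 would extend the first run to 3 and flip the result to True.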


def FailedBuildWithinRecentDayThreshold(
    failed_result_tuple_list: List[ct.ResultTupleType],
    build_fail_recent_day_threshold: int) -> bool:
  """Checks whether there are any failed builds within the most recent
  |build_fail_recent_day_threshold| days.

  Args:
    failed_result_tuple_list: A list of ct.ResultTupleType failed test results.
    build_fail_recent_day_threshold: Threshold based on how many recent days
      to check for build failures caused by the test.

  Returns:
    Whether the test caused a build failure within the most recent
    |build_fail_recent_day_threshold| days.
  """
  recent_check_day = date.today() - timedelta(
      days=build_fail_recent_day_threshold)
  for test in failed_result_tuple_list:
    if test.date >= recent_check_day:
      return True
  return False


class ExpectationProcessor():
  # pylint: disable=too-many-locals
  def IterateThroughResultsForUser(self, result_map: ct.AggregatedResultsType,
                                   group_by_tags: bool,
                                   include_all_tags: bool) -> None:
    """Iterates over |result_map| for the user to provide input.

    For each unique result, the user will be able to decide whether to ignore
    it (do nothing), mark it as flaky (add a RetryOnFailure expectation), or
    mark it as failing (add a Failure expectation). If either of the latter
    two is chosen, the user can also associate a bug with the new expectation.

    Args:
      result_map: Aggregated query results from results.AggregateResults to
        iterate over.
      group_by_tags: A boolean denoting whether to attempt to group
        expectations by tags or not. If True, expectations will be added after
        an existing expectation whose tags are the largest subset of the
        produced tags. If False, new expectations will be appended to the end
        of the file.
      include_all_tags: A boolean denoting whether all tags should be used for
        expectations or only the most specific ones.
    """
    typ_tag_ordered_result_map = self._ReorderMapByTypTags(result_map)
    for suite, test_map in result_map.items():
      if self.IsSuiteUnsupported(suite):
        continue
      for test, tag_map in test_map.items():
        for typ_tags, build_url_list in tag_map.items():

          print('')
          print('Suite: %s' % suite)
          print('Test: %s' % test)
          print('Configuration:\n %s' % '\n '.join(typ_tags))
          print('Failed builds:\n %s' % '\n '.join(build_url_list))

          other_failures_for_test = self.FindFailuresInSameTest(
              result_map, suite, test, typ_tags)
          if other_failures_for_test:
            print('Other failures in same test found on other configurations')
            for (tags, failure_count) in other_failures_for_test:
              print(' %d failures on %s' % (failure_count, ' '.join(tags)))

          other_failures_for_config = self.FindFailuresInSameConfig(
              typ_tag_ordered_result_map, suite, test, typ_tags)
          if other_failures_for_config:
            print('Other failures on same configuration found in other tests')
            for (name, failure_count) in other_failures_for_config:
              print(' %d failures in %s' % (failure_count, name))

          expected_result, bug = self.PromptUserForExpectationAction()
          if not expected_result:
            continue

          self.ModifyFileForResult(suite, test, typ_tags, bug, expected_result,
                                   group_by_tags, include_all_tags)

  # pylint: enable=too-many-locals

  # pylint: disable=too-many-locals,too-many-arguments
  def IterateThroughResultsWithThresholds(
      self, result_map: ct.AggregatedResultsType, group_by_tags: bool,
      result_counts: ct.ResultCountType, ignore_threshold: float,
      flaky_threshold: float, include_all_tags: bool) -> None:
    """Iterates over |result_map| and generates expectations based on
    thresholds.

    Args:
      result_map: Aggregated query results from results.AggregateResults to
        iterate over.
      group_by_tags: A boolean denoting whether to attempt to group
        expectations by tags or not. If True, expectations will be added after
        an existing expectation whose tags are the largest subset of the
        produced tags. If False, new expectations will be appended to the end
        of the file.
      result_counts: A dict in the format output by queries.GetResultCounts.
      ignore_threshold: A float containing the fraction of failed tests under
        which failures will be ignored.
      flaky_threshold: A float containing the fraction of failed tests under
        which failures will be suppressed with RetryOnFailure and above which
        they will be suppressed with Failure.
      include_all_tags: A boolean denoting whether all tags should be used for
        expectations or only the most specific ones.
    """
    assert isinstance(ignore_threshold, float)
    assert isinstance(flaky_threshold, float)
    for suite, test_map in result_map.items():
      if self.IsSuiteUnsupported(suite):
        continue
      for test, tag_map in test_map.items():
        for typ_tags, build_url_list in tag_map.items():
          failure_count = len(build_url_list)
          total_count = result_counts[typ_tags][test]
          fraction = failure_count / total_count
          if fraction < ignore_threshold:
            continue
          expected_result = self.GetExpectedResult(fraction, flaky_threshold)
          if expected_result:
            self.ModifyFileForResult(suite, test, typ_tags, '',
                                     expected_result, group_by_tags,
                                     include_all_tags)
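
  # For example, with ignore_threshold=0.01 and flaky_threshold=0.5, a test
  # that failed 3 of 1000 runs (fraction 0.003) is skipped entirely, while one
  # that failed 30 of 1000 runs (0.03) is passed to GetExpectedResult, whose
  # implementations map fractions below |flaky_threshold| to RetryOnFailure
  # and those above it to Failure.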

  def CreateExpectationsForAllResults(
      self, result_map: ct.AggregatedStatusResultsType, group_by_tags: bool,
      include_all_tags: bool, build_fail_total_number_threshold: int,
      build_fail_consecutive_day_threshold: int,
      build_fail_recent_day_threshold: int) -> None:
    """Iterates over |result_map|, selects tests that hit all
    build-fail*-thresholds, and adds expectations for their results. Results
    for the same test are aggregated across all builders it caused to fail
    when checking against the thresholds.

    Args:
      result_map: Aggregated query results from results.AggregateResults to
        iterate over.
      group_by_tags: A boolean denoting whether to attempt to group
        expectations by tags or not. If True, expectations will be added after
        an existing expectation whose tags are the largest subset of the
        produced tags. If False, new expectations will be appended to the end
        of the file.
      include_all_tags: A boolean denoting whether all tags should be used for
        expectations or only the most specific ones.
      build_fail_total_number_threshold: Threshold based on the number of
        failed builds caused by a test. An expectation is added only if the
        actual number is equal to or more than this threshold. All
        build-fail*-thresholds must be hit in order for a test to actually be
        suppressed.
      build_fail_consecutive_day_threshold: Threshold based on the number of
        consecutive days on which a test caused build failures. An expectation
        is added only if the number of consecutive failure days is equal to or
        more than this threshold. All build-fail*-thresholds must be hit in
        order for a test to actually be suppressed.
      build_fail_recent_day_threshold: How many days' worth of recent builds
        to check for non-hidden failures. A test will only be suppressed if
        it has non-hidden failures within this time span. All
        build-fail*-thresholds must be hit in order for a test to actually
        be suppressed.
    """
    for suite, test_map in result_map.items():
      if self.IsSuiteUnsupported(suite):
        continue
      for test, tag_map in test_map.items():
        # Results for the same test are aggregated across all builders it
        # caused to fail when checking against the thresholds.
        all_results = list(itertools.chain(*tag_map.values()))
        if (not OverFailedBuildThreshold(all_results,
                                         build_fail_total_number_threshold)
            or not OverFailedBuildByConsecutiveDayThreshold(
                all_results, build_fail_consecutive_day_threshold)):
          continue
        for typ_tags, result_tuple_list in tag_map.items():
          if not FailedBuildWithinRecentDayThreshold(
              result_tuple_list, build_fail_recent_day_threshold):
            continue
          status = set()
          for test_result in result_tuple_list:
            # Always add a Pass to flaky web tests in TestExpectations, since
            # they have passing runs as well.
            status.add('Pass')
            if test_result.status == ct.ResultStatus.CRASH:
              status.add('Crash')
            elif test_result.status == ct.ResultStatus.FAIL:
              status.add('Failure')
            elif test_result.status == ct.ResultStatus.ABORT:
              status.add('Timeout')
          if status:
            status_list = list(status)
            status_list.sort()
            self.ModifyFileForResult(suite, test, typ_tags, '',
                                     ' '.join(status_list), group_by_tags,
                                     include_all_tags)

  # pylint: enable=too-many-locals,too-many-arguments
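
  # For example, a configuration whose results include both CRASH and FAIL
  # statuses yields the status set {'Crash', 'Failure', 'Pass'}, which is
  # written to the expectation file as the sorted string 'Crash Failure Pass'.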

  def FindFailuresInSameTest(self, result_map: ct.AggregatedResultsType,
                             target_suite: str, target_test: str,
                             target_typ_tags: ct.TagTupleType
                             ) -> List[Tuple[ct.TagTupleType, int]]:
    """Finds all other failures that occurred in the given test.

    Ignores the failures for the test on the same configuration.

    Args:
      result_map: Aggregated query results from results.AggregateResults.
      target_suite: A string containing the test suite being checked.
      target_test: A string containing the target test case being checked.
      target_typ_tags: A tuple of strings containing the typ tags that the
        failure took place on.

    Returns:
      A list of tuples (typ_tags, count). |typ_tags| is a list of strings
      defining a configuration the specified test failed on. |count| is how
      many times the test failed on that configuration.
    """
    assert isinstance(target_typ_tags, tuple)
    other_failures = []
    tag_map = result_map.get(target_suite, {}).get(target_test, {})
    for typ_tags, build_url_list in tag_map.items():
      if typ_tags == target_typ_tags:
        continue
      other_failures.append((typ_tags, len(build_url_list)))
    return other_failures

  def FindFailuresInSameConfig(
      self, typ_tag_ordered_result_map: TagOrderedAggregateResultType,
      target_suite: str, target_test: str,
      target_typ_tags: ct.TagTupleType) -> List[Tuple[str, int]]:
    """Finds all other failures that occurred on the given configuration.

    Ignores the failures for the given test on the given configuration.

    Args:
      typ_tag_ordered_result_map: Aggregated query results from
        results.AggregateResults that have been reordered using
        _ReorderMapByTypTags.
      target_suite: A string containing the test suite the original failure
        was found in.
      target_test: A string containing the test case the original failure was
        found in.
      target_typ_tags: A tuple of strings containing the typ tags defining the
        configuration to find failures for.

    Returns:
      A list of tuples (full_name, count). |full_name| is a string containing
      a test suite and test case concatenated together. |count| is how many
      times |full_name| failed on the configuration specified by
      |target_typ_tags|.
    """
    assert isinstance(target_typ_tags, tuple)
    other_failures = []
    suite_map = typ_tag_ordered_result_map.get(target_typ_tags, {})
    for suite, test_map in suite_map.items():
      for test, build_url_list in test_map.items():
        if suite == target_suite and test == target_test:
          continue
        full_name = '%s.%s' % (suite, test)
        other_failures.append((full_name, len(build_url_list)))
    return other_failures

  def _ReorderMapByTypTags(self, result_map: ct.AggregatedResultsType
                           ) -> TagOrderedAggregateResultType:
    """Rearranges |result_map| to use typ tags as the top-level keys.

    Args:
      result_map: Aggregated query results from results.AggregateResults.

    Returns:
      A dict containing the same contents as |result_map|, but in the
      following format:
      {
        typ_tags (tuple of str): {
          suite (str): {
            test (str): build_url_list (list of str),
          },
        },
      }
    """
    reordered_map = {}
    for suite, test_map in result_map.items():
      for test, tag_map in test_map.items():
        for typ_tags, build_url_list in tag_map.items():
          reordered_map.setdefault(typ_tags,
                                   {}).setdefault(suite,
                                                  {})[test] = build_url_list
    return reordered_map

  def PromptUserForExpectationAction(
      self) -> Union[Tuple[str, str], Tuple[None, None]]:
    """Prompts the user on what to do to handle a failure.

    Returns:
      A tuple (expected_result, bug). |expected_result| is a string containing
      the expected result to use for the expectation, e.g. RetryOnFailure.
      |bug| is a string containing the bug to use for the expectation. If the
      user chooses to ignore the failure, both will be None. Otherwise, both
      are filled, although |bug| may be an empty string if no bug is provided.
    """
    prompt = ('How should this failure be handled? (i)gnore/(r)etry on '
              'failure/(f)ailure: ')
    valid_inputs = ['f', 'i', 'r']
    response = input(prompt).lower()
    while response not in valid_inputs:
      print('Invalid input, valid inputs are %s' % (', '.join(valid_inputs)))
      response = input(prompt).lower()

    if response == 'i':
      return (None, None)
    expected_result = 'RetryOnFailure' if response == 'r' else 'Failure'

    prompt = ('What is the bug URL that should be associated with this '
              'expectation? E.g. crbug.com/1234. ')
    response = input(prompt)
    return (expected_result, response)

  # pylint: disable=too-many-locals,too-many-arguments
  def ModifyFileForResult(self, suite: str, test: str,
                          typ_tags: ct.TagTupleType, bug: str,
                          expected_result: str, group_by_tags: bool,
                          include_all_tags: bool) -> None:
    """Adds an expectation to the appropriate expectation file.

    Args:
      suite: A string containing the suite the failure occurred in.
      test: A string containing the test case the failure occurred in.
      typ_tags: A tuple of strings containing the typ tags the test produced.
      bug: A string containing the bug to associate with the new expectation.
      expected_result: A string containing the expected result to use for the
        new expectation, e.g. RetryOnFailure.
      group_by_tags: A boolean denoting whether to attempt to group
        expectations by tags or not. If True, expectations will be added after
        an existing expectation whose tags are the largest subset of the
        produced tags. If False, new expectations will be appended to the end
        of the file.
      include_all_tags: A boolean denoting whether all tags should be used for
        expectations or only the most specific ones.
    """
    expectation_file = self.GetExpectationFileForSuite(suite, typ_tags)
    if not include_all_tags:
      typ_tags = self.FilterToMostSpecificTypTags(typ_tags, expectation_file)
    bug = '%s ' % bug if bug else bug

    def AppendExpectationToEnd():
      expectation_line = '%s[ %s ] %s [ %s ]\n' % (bug, ' '.join(
          self.ProcessTypTagsBeforeWriting(typ_tags)), test, expected_result)
      with open(expectation_file, 'a') as outfile:
        outfile.write(expectation_line)

    if group_by_tags:
      insertion_line, best_matching_tags = (
          self.FindBestInsertionLineForExpectation(typ_tags, expectation_file))
      if insertion_line == -1:
        AppendExpectationToEnd()
      else:
        # If we've already filtered tags, then use those instead of the "best
        # matching" ones.
        tags_to_use = best_matching_tags
        if not include_all_tags:
          tags_to_use = typ_tags
        # enumerate() starts at 0, but line numbers start at 1.
        insertion_line -= 1
        tags_to_use = list(self.ProcessTypTagsBeforeWriting(tags_to_use))
        tags_to_use.sort()
        expectation_line = '%s[ %s ] %s [ %s ]\n' % (
            bug, ' '.join(tags_to_use), test, expected_result)
        with open(expectation_file) as infile:
          input_contents = infile.read()
        output_contents = ''
        for lineno, line in enumerate(input_contents.splitlines(True)):
          output_contents += line
          if lineno == insertion_line:
            output_contents += expectation_line
        with open(expectation_file, 'w') as outfile:
          outfile.write(output_contents)
    else:
      AppendExpectationToEnd()

  # pylint: enable=too-many-locals,too-many-arguments
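
  # For example (hypothetical values), a call with bug='crbug.com/1234',
  # typ_tags=('win', 'nvidia'), test='foo_test', and
  # expected_result='RetryOnFailure' produces a line of the form:
  #   crbug.com/1234 [ win nvidia ] foo_test [ RetryOnFailure ]
  # When grouping by tags, the tags are additionally sorted before writing.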

  # pylint: disable=too-many-locals
  def FilterToMostSpecificTypTags(self, typ_tags: ct.TagTupleType,
                                  expectation_file: str) -> ct.TagTupleType:
    """Filters |typ_tags| down to the most specific set.

    Assumes that the tags in |expectation_file| are ordered from least
    specific to most specific within each tag group.

    Args:
      typ_tags: A tuple of strings containing the typ tags the test produced.
      expectation_file: A string containing a filepath pointing to the
        expectation file to filter tags with.

    Returns:
      A tuple containing the contents of |typ_tags| with only the most
      specific tag from each tag group remaining.
    """
    with open(expectation_file) as infile:
      contents = infile.read()

    tag_groups = self.GetTagGroups(contents)
    num_matches = 0
    tags_in_same_group = collections.defaultdict(list)
    for tag in typ_tags:
      for index, tag_group in enumerate(tag_groups):
        if tag in tag_group:
          tags_in_same_group[index].append(tag)
          num_matches += 1
          break
    if num_matches != len(typ_tags):
      all_tags = set()
      for group in tag_groups:
        all_tags |= set(group)
      raise RuntimeError('Found tags not in expectation file: %s' %
                         ' '.join(set(typ_tags) - all_tags))

    filtered_tags = []
    for index, tags in tags_in_same_group.items():
      if len(tags) == 1:
        filtered_tags.append(tags[0])
      else:
        tag_group = tag_groups[index]
        best_index = -1
        for t in tags:
          i = tag_group.index(t)
          if i > best_index:
            best_index = i
        filtered_tags.append(tag_group[best_index])

    # Sort to keep the output ordering deterministic.
    filtered_tags.sort()
    return tuple(filtered_tags)

  # pylint: enable=too-many-locals
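
  # For example (hypothetical tag groups), given a GPU group ordered
  # '# tags: [ amd amd-0x6613 ]' and a separate OS group containing 'win',
  # the tags ('amd', 'amd-0x6613', 'win') filter down to
  # ('amd-0x6613', 'win'): 'amd-0x6613' appears later in its group than
  # 'amd', and 'win' is the only tag from its own group.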

  def FindBestInsertionLineForExpectation(self, typ_tags: ct.TagTupleType,
                                          expectation_file: str
                                          ) -> Tuple[int, Set[str]]:
    """Finds the best place to insert an expectation when grouping by tags.

    Args:
      typ_tags: A tuple of strings containing typ tags that were produced by
        the failing test.
      expectation_file: A string containing a filepath to the expectation file
        to use.

    Returns:
      A tuple (insertion_line, best_matching_tags). |insertion_line| is an int
      specifying the line number to insert the expectation into.
      |best_matching_tags| is a set containing the tags of an existing
      expectation that was found to be the closest match. If no appropriate
      line is found, |insertion_line| is -1 and |best_matching_tags| is empty.
    """
    best_matching_tags = set()
    best_insertion_line = -1
    with open(expectation_file) as f:
      content = f.read()
    list_parser = expectations_parser.TaggedTestListParser(content)
    for e in list_parser.expectations:
      expectation_tags = e.tags
      if not expectation_tags.issubset(typ_tags):
        continue
      if len(expectation_tags) > len(best_matching_tags):
        best_matching_tags = expectation_tags
        best_insertion_line = e.lineno
      elif len(expectation_tags) == len(best_matching_tags):
        if best_insertion_line < e.lineno:
          best_insertion_line = e.lineno
    return best_insertion_line, best_matching_tags
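
  # For example, for produced tags ('linux', 'nvidia', 'release'), an existing
  # expectation tagged [ linux nvidia ] is a larger subset of the produced
  # tags than one tagged [ linux ], so the new expectation is inserted after
  # the former. An expectation tagged [ win nvidia ] is not a subset and is
  # never matched.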

  def GetOriginExpectationFileContents(self) -> Dict[str, str]:
    """Gets expectation file contents from origin/main.

    Returns:
      A dict of expectation file name (str) -> expectation file contents (str)
      that are available on origin/main. File paths are relative to the
      Chromium src dir and are OS paths.
    """
    origin_file_contents = {}
    expectation_files = self.ListOriginExpectationFiles()
    for f in expectation_files:
      # Get the path to the expectation file in gitiles, i.e. the POSIX path
      # relative to the Chromium src directory.
      filepath_posix = f.replace(os.sep, '/')
      origin_filepath_url = posixpath.join(GITILES_URL,
                                           filepath_posix) + TEXT_FORMAT_ARG
      response = urllib.request.urlopen(origin_filepath_url).read()
      decoded_text = base64.b64decode(response).decode('utf-8')
      # After the URL access, keep all paths as OS paths.
      origin_file_contents[f] = decoded_text

    return origin_file_contents

  def GetLocalCheckoutExpectationFileContents(self) -> Dict[str, str]:
    """Gets expectation file contents from the local checkout.

    Returns:
      A dict of expectation file name (str) -> expectation file contents (str)
      that are available from the local checkout. File paths are relative to
      the Chromium src dir and are OS paths.
    """
    local_file_contents = {}
    expectation_files = self.ListLocalCheckoutExpectationFiles()
    for f in expectation_files:
      absolute_filepath = os.path.join(CHROMIUM_SRC_DIR, f)
      with open(absolute_filepath) as infile:
        local_file_contents[f] = infile.read()
    return local_file_contents

  def AssertCheckoutIsUpToDate(self) -> None:
    """Confirms that the local checkout's expectations are up to date."""
    origin_file_contents = self.GetOriginExpectationFileContents()
    local_file_contents = self.GetLocalCheckoutExpectationFileContents()
    if origin_file_contents != local_file_contents:
      raise RuntimeError(
          'Local Chromium checkout expectations are out of date. Please '
          'perform a `git pull`.')

  def GetExpectationFileForSuite(self, suite: str,
                                 typ_tags: ct.TagTupleType) -> str:
    """Finds the correct expectation file for the given suite.

    Args:
      suite: A string containing the test suite to look for.
      typ_tags: A tuple of strings containing typ tags that were produced by
        the failing test.

    Returns:
      A string containing a filepath to the correct expectation file for
      |suite| and |typ_tags|.
    """
    raise NotImplementedError

  def ListGitilesDirectory(self, origin_dir: str) -> List[str]:
    """Gets the list of all files under |origin_dir| on origin/main.

    Args:
      origin_dir: A string containing the path to the directory containing
        expectation files. The path is relative to the Chromium src dir.

    Returns:
      A list of filename strings under |origin_dir|.
    """
    origin_dir_url = posixpath.join(GITILES_URL, origin_dir) + TEXT_FORMAT_ARG
    response = urllib.request.urlopen(origin_dir_url).read()
    # The response is a base64-encoded, newline-separated list of files in the
    # directory in the format: `mode file_type hash name`.
    files = []
    decoded_text = base64.b64decode(response).decode('utf-8')
    for line in decoded_text.splitlines():
      files.append(line.split()[-1])
    return files
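
  # For example, a decoded listing line such as (hypothetical hash):
  #   100644 blob 0123abcd test_expectations.txt
  # contributes only its trailing name, 'test_expectations.txt'.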

  def IsSuiteUnsupported(self, suite: str) -> bool:
    raise NotImplementedError

  def ListLocalCheckoutExpectationFiles(self) -> List[str]:
    """Finds the list of all expectation files in the local checkout.

    Returns:
      A list of strings containing relative file paths to expectation files.
      OS paths relative to the Chromium src dir are returned.
    """
    raise NotImplementedError

  def ListOriginExpectationFiles(self) -> List[str]:
    """Finds the list of all expectation files on origin/main.

    Returns:
      A list of strings containing relative file paths to expectation files.
      OS paths are relative to the Chromium src directory.
    """
    raise NotImplementedError

  def GetTagGroups(self, contents: str) -> List[List[str]]:
    tag_groups = []
    for match in TAG_GROUP_REGEX.findall(contents):
      tag_groups.append(match.strip().replace('#', '').split())
    return tag_groups
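
  # For example, a tag header of the form:
  #   # tags: [ linux mac win ]
  # yields the group ['linux', 'mac', 'win']. Groups spanning multiple
  # commented lines are handled as well, since TAG_GROUP_REGEX is compiled
  # with re.MULTILINE | re.DOTALL and the '#' continuation markers are
  # stripped before splitting.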

  def GetExpectedResult(self, fraction: float, flaky_threshold: float) -> str:
    raise NotImplementedError

  def ProcessTypTagsBeforeWriting(self,
                                  typ_tags: ct.TagTupleType) -> ct.TagTupleType:
    return typ_tags
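

# A minimal, self-contained usage sketch (assumptions: run ad hoc, not part of
# the production pipeline; _FakeResult is a hypothetical stand-in for
# ct.ResultTupleType that provides only the attributes these helpers read).
if __name__ == '__main__':
  _FakeResult = collections.namedtuple('_FakeResult', ['build_url', 'date'])
  fake_results = [
      _FakeResult('https://ci.chromium.org/b/8001', date(2023, 1, 1)),
      _FakeResult('https://ci.chromium.org/b/8002', date(2023, 1, 2)),
      _FakeResult('https://ci.chromium.org/b/8003', date(2023, 1, 3)),
  ]
  # Three unique build IDs and three consecutive days of failures, so both
  # checks pass with a threshold of 3.
  print(OverFailedBuildThreshold(fake_results, 3))  # True
  print(OverFailedBuildByConsecutiveDayThreshold(fake_results, 3))  # True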