| # Copyright 2021 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Module for working with BigQuery results.""" |
| |
| import collections |
| import datetime |
| import os |
| from collections import defaultdict |
| from typing import List, Tuple |
| |
| from flake_suppressor_common import common_typing as ct |
| from flake_suppressor_common import data_types |
| from flake_suppressor_common import expectations |
| from flake_suppressor_common import tag_utils |
| |
| from typ import expectations_parser |
| |
| |
class ResultProcessor:
  """Converts, filters and aggregates BigQuery test results.

  GetTestSuiteAndNameFromResultDbName raises NotImplementedError in this
  class, so it has to be overridden before any non-empty set of results can
  be converted.
  """

  def __init__(self, expectations_processor: expectations.ExpectationProcessor):
    # Queried for the local expectation file contents and for the
    # expectation file that corresponds to a given suite/tag combination.
    self._expectations_processor = expectations_processor

  def AggregateResults(self,
                       results: ct.QueryJsonType) -> ct.AggregatedResultsType:
    """Groups build URLs by suite, test and typ tags.

    Results that already have a matching expectation are dropped before
    aggregation.

    Args:
      results: Parsed JSON results from a BigQuery query.

    Returns:
      A map in the following format:
      {
        'test_suite': {
          'test_name': {
            ('typ', 'tags', 'as', 'tuple'): [ 'list', 'of', 'urls' ],
          },
        },
      }
    """
    unsuppressed = self._FilterOutSuppressedResults(
        self._ConvertJsonResultsToResultObjects(results))
    aggregated = {}
    for result in unsuppressed:
      url = 'http://ci.chromium.org/b/%s' % result.build_id
      per_suite = aggregated.setdefault(result.suite, {})
      per_test = per_suite.setdefault(result.test, {})
      per_test.setdefault(result.tags, []).append(url)
    return aggregated

  def AggregateTestStatusResults(
      self, results: ct.QueryJsonType) -> ct.AggregatedStatusResultsType:
    """Groups per-build status tuples by suite, test and typ tags.

    Results that already have a matching expectation are dropped before
    aggregation.

    Args:
      results: Parsed JSON results from a BigQuery query.

    Returns:
      A map (nested defaultdicts) in the following format:
      {
        'test_suite': {
          'test_name': {
            ('typ', 'tags', 'as', 'tuple'):
            [ (status, url, date, is_slow, typ_expectations),
              (status, url, date, is_slow, typ_expectations) ],
          },
        },
      }
    """

    def _NewTagMap():
      # Innermost level: typ tags -> list of result tuples.
      return collections.defaultdict(list)

    def _NewTestMap():
      # Middle level: test name -> tag map.
      return collections.defaultdict(_NewTagMap)

    unsuppressed = self._FilterOutSuppressedResults(
        self._ConvertJsonResultsToResultObjects(results))
    aggregated = collections.defaultdict(_NewTestMap)
    for result in unsuppressed:
      url = 'http://ci.chromium.org/b/%s' % result.build_id
      entry = ct.ResultTupleType(result.status, url, result.date,
                                 result.is_slow, result.typ_expectations)
      aggregated[result.suite][result.test][result.tags].append(entry)
    return aggregated

  def _ConvertJsonResultsToResultObjects(self, results: ct.QueryJsonType
                                         ) -> List[data_types.Result]:
    """Builds data_types.Result objects out of JSON BigQuery rows.

    Each row must provide 'name', 'id' and 'typ_tags'. The 'status', 'date',
    'is_slow' and 'typ_expectations' keys are optional and become None on the
    resulting object when a row does not have them.

    Args:
      results: Parsed JSON results from a BigQuery query

    Returns:
      The contents of |results| as a list of data_types.Result objects.
    """
    converted = []
    for row in results:
      suite, test_name = self.GetTestSuiteAndNameFromResultDbName(row['name'])
      # The build ID is whatever follows the last '-' in the row's ID.
      build_id = row['id'].split('-')[-1]
      typ_tags = tuple(tag_utils.TagUtils.RemoveIgnoredTags(row['typ_tags']))

      status = row['status'] if 'status' in row else None
      date = (datetime.date.fromisoformat(row['date'])
              if 'date' in row else None)
      is_slow = row['is_slow'] if 'is_slow' in row else None
      typ_expectations = (row['typ_expectations']
                          if 'typ_expectations' in row else None)

      converted.append(
          data_types.Result(suite, test_name, typ_tags, build_id, status, date,
                            is_slow, typ_expectations))
    return converted

  def _FilterOutSuppressedResults(self, results: List[data_types.Result]
                                  ) -> List[data_types.Result]:
    """Drops results that a local expectation file already covers.

    Args:
      results: A list of data_types.Result objects.

    Returns:
      |results| with any already-suppressed failures removed.
    """
    # Parse every local expectation file once, keyed by filename.
    file_contents = (
        self._expectations_processor.GetLocalCheckoutExpectationFileContents())
    expectations_by_file = collections.defaultdict(list)
    for filename, contents in file_contents.items():
      parser = expectations_parser.TaggedTestListParser(contents)
      expectations_by_file[filename].extend(
          data_types.Expectation(parsed.test, parsed.tags, parsed.raw_results,
                                 parsed.reason)
          for parsed in parser.expectations)

    def _IsAlreadySuppressed(result):
      # Only the expectations from the file associated with the result's
      # suite and tags are consulted; the lookup uses the file's basename.
      expectation_file = os.path.basename(
          self._expectations_processor.GetExpectationFileForSuite(
              result.suite, result.tags))
      return any(
          candidate.AppliesToResult(result)
          for candidate in expectations_by_file[expectation_file])

    return [r for r in results if not _IsAlreadySuppressed(r)]

  def GetTestSuiteAndNameFromResultDbName(self, result_db_name: str
                                          ) -> Tuple[str, str]:
    """Splits a ResultDB test name into a (suite, test name) pair.

    Not implemented in this class.

    Args:
      result_db_name: The test name as reported in the query results.

    Returns:
      A tuple (suite, test_name).

    Raises:
      NotImplementedError: Always, unless overridden.
    """
    raise NotImplementedError