blob: 649d3f44ea7214ead4fc78f5856fcadc5eb4f621 [file] [log] [blame]
# Copyright 2021 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module for working with BigQuery results."""
import collections
import os
from typing import List, Tuple
from flake_suppressor_common import common_typing as ct
from flake_suppressor_common import data_types
from flake_suppressor_common import expectations
from flake_suppressor_common import tag_utils
from typ import expectations_parser
class ResultProcessor():
def __init__(self, expectations_processor: expectations.ExpectationProcessor):
self._expectations_processor = expectations_processor
def AggregateResults(self,
results: ct.QueryJsonType) -> ct.AggregatedResultsType:
"""Aggregates BigQuery results.
Also filters out any results that have already been suppressed.
Args:
results: Parsed JSON results from a BigQuery query.
Returns:
A map in the following format:
{
'test_suite': {
'test_name': {
'typ_tags_as_tuple': [ 'list', 'of', 'urls' ],
},
},
}
"""
results = self._ConvertJsonResultsToResultObjects(results)
results = self._FilterOutSuppressedResults(results)
aggregated_results = {}
for r in results:
build_url = 'http://ci.chromium.org/b/%s' % r.build_id
build_url_list = aggregated_results.setdefault(r.suite, {}).setdefault(
r.test, {}).setdefault(r.tags, [])
build_url_list.append(build_url)
return aggregated_results
def _ConvertJsonResultsToResultObjects(self, results: ct.QueryJsonType
) -> List[data_types.Result]:
"""Converts JSON BigQuery results to data_types.Result objects.
Args:
results: Parsed JSON results from a BigQuery query
Returns:
The contents of |results| as a list of data_types.Result objects.
"""
object_results = []
for r in results:
suite, test_name = self.GetTestSuiteAndNameFromResultDbName(r['name'])
build_id = r['id'].split('-')[-1]
typ_tags = tuple(tag_utils.TagUtils.RemoveIgnoredTags(r['typ_tags']))
object_results.append(
data_types.Result(suite, test_name, typ_tags, build_id))
return object_results
def _FilterOutSuppressedResults(self, results: List[data_types.Result]
) -> List[data_types.Result]:
"""Filters out results that have already been suppressed in the repo.
Args:
results: A list of data_types.Result objects.
Returns:
|results| with any already-suppressed failures removed.
"""
# Get all the expectations.
origin_expectation_contents = (
self._expectations_processor.GetLocalCheckoutExpectationFileContents())
origin_expectations = collections.defaultdict(list)
for filename, contents in origin_expectation_contents.items():
list_parser = expectations_parser.TaggedTestListParser(contents)
for e in list_parser.expectations:
expectation = data_types.Expectation(e.test, e.tags, e.raw_results,
e.reason)
origin_expectations[filename].append(expectation)
# Discard any results that already have a matching expectation.
kept_results = []
for r in results:
expectation_filename = (
self._expectations_processor.GetExpectationFileForSuite(
r.suite, r.tags))
expectation_filename = os.path.basename(expectation_filename)
should_keep = True
for e in origin_expectations[expectation_filename]:
if e.AppliesToResult(r):
should_keep = False
break
if should_keep:
kept_results.append(r)
return kept_results
def GetTestSuiteAndNameFromResultDbName(self, result_db_name: str
) -> Tuple[str, str]:
raise NotImplementedError