# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Methods related to querying the ResultDB BigQuery tables."""
import logging
import time
from typing import Collection, Dict, Generator, Iterable, List, Optional, Tuple
# vpython-provided modules.
# pylint: disable=import-error
from google.cloud import bigquery
from google.cloud import bigquery_storage
import pandas
# pylint: enable=import-error
# //third_party/catapult/third_party/typ imports.
from typ import expectations_parser
from typ import json_results
# //testing imports.
from unexpected_passes_common import constants
from unexpected_passes_common import data_types
from unexpected_passes_common import expectations
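# Default number of builds to pull results from when the caller does not
# specify a sample size.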
DEFAULT_NUM_SAMPLES = 100
# Subquery for getting all try builds that were used for CL submission. 30 days
# is chosen because the ResultDB tables we pull data from only keep data around
# for 30 days.
PARTITIONED_SUBMITTED_BUILDS_TEMPLATE = """\
SELECT
CONCAT("build-", CAST(unnested_builds.id AS STRING)) as id
FROM
`commit-queue.{project_view}.attempts`,
UNNEST(builds) as unnested_builds,
UNNEST(gerrit_changes) as unnested_changes
WHERE
unnested_builds.host = "cr-buildbucket.appspot.com"
AND unnested_changes.submit_status = "SUCCESS"
AND start_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(),
INTERVAL 30 DAY)"""
QueryResult = pandas.Series
class BigQueryQuerier:
"""Class to handle all BigQuery queries for a script invocation."""
def __init__(self, suite: Optional[str], project: str, num_samples: int,
keep_unmatched_results: bool):
"""
Args:
suite: A string containing the name of the suite that is being queried
for. Can be None if there is no differentiation between different
suites.
project: A string containing the billing project to use for BigQuery.
num_samples: An integer containing the number of builds to pull results
from.
keep_unmatched_results: Whether to store and return unmatched results
for debugging purposes.
"""
self._suite = suite
self._project = project
self._num_samples = num_samples or DEFAULT_NUM_SAMPLES
self._keep_unmatched_results = keep_unmatched_results
assert self._num_samples > 0
def FillExpectationMapForBuilders(
self, expectation_map: data_types.TestExpectationMap,
builders: Collection[data_types.BuilderEntry]
) -> Dict[str, data_types.ResultListType]:
"""Fills |expectation_map| with results from |builders|.
Args:
expectation_map: A data_types.TestExpectationMap. Will be modified
in-place.
builders: An iterable of data_types.BuilderEntry containing the builders
to query.
Returns:
A dict containing any results that were retrieved that did not have a
matching expectation in |expectation_map| in the following format:
{
|builder_type|:|builder_name| (str): [
result1 (data_types.Result),
result2 (data_types.Result),
...
],
}
"""
start_time = time.time()
logging.debug('Starting to fill expectation map for %d builders',
len(builders))
assert isinstance(expectation_map, data_types.TestExpectationMap)
# Ensure that all the builders are of the same type since we make some
# assumptions about that later on.
assert builders
builder_type = None
for b in builders:
if builder_type is None:
builder_type = b.builder_type
else:
assert b.builder_type == builder_type
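# Public and internal builders require different queries, so determine which
# internal statuses we actually need to issue queries for.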
internal_statuses = set()
for b in builders:
internal_statuses.add(b.is_internal_builder)
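# Track which builders we have already received results for so that we can
# detect malformed queries that return multiple batches for the same builder.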
matched_builders = set()
all_unmatched_results = {}
for internal in internal_statuses:
for builder_name, results, expectation_files in (
self.GetBuilderGroupedQueryResults(builder_type, internal)):
matching_builder = None
for b in builders:
if b.name == builder_name and b.is_internal_builder == internal:
matching_builder = b
break
if not matching_builder:
logging.warning(
'Did not find a matching builder for name %s and '
'internal status %s. This is normal if the builder '
'is no longer running tests (e.g. it was '
'experimental).', builder_name, internal)
continue
if matching_builder in matched_builders:
raise RuntimeError(
f'Got query result batches matched to builder '
f'{matching_builder} twice - this is indicative of a malformed '
f'query returning results that are not sorted by builder')
matched_builders.add(matching_builder)
prefixed_builder_name = '%s/%s:%s' % (matching_builder.project,
matching_builder.builder_type,
matching_builder.name)
unmatched_results = expectation_map.AddResultList(
prefixed_builder_name, results, expectation_files)
if self._keep_unmatched_results:
if unmatched_results:
all_unmatched_results[prefixed_builder_name] = unmatched_results
else:
logging.info('Dropping %d unmatched results', len(unmatched_results))
logging.debug('Filling expectation map took %f', time.time() - start_time)
return all_unmatched_results
def GetBuilderGroupedQueryResults(
self, builder_type: str, is_internal: bool
) -> Generator[Tuple[str, data_types.ResultListType, Optional[List[str]]],
None, None]:
"""Generates results for all relevant builders grouped by builder name.
Args:
builder_type: A string specifying whether the builders are CI or try builders.
is_internal: Whether the builders are internal.
Yields:
A tuple (builder_name, results, expectation_files). |builder_name| is a string
specifying the builder that |results| came from. |results| is a
data_types.ResultListType containing all the results for |builder_name|.
|expectation_files| is a list of expectation file names relevant to |results|,
or None if all expectation files should be considered.
"""
if builder_type == constants.BuilderTypes.CI:
if is_internal:
query = self._GetInternalCiQuery()
else:
query = self._GetPublicCiQuery()
elif builder_type == constants.BuilderTypes.TRY:
if is_internal:
query = self._GetInternalTryQuery()
else:
query = self._GetPublicTryQuery()
else:
raise RuntimeError(f'Unknown builder type {builder_type}')
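# Query results are expected to be sorted by builder name, so we can stream
# through them and process all rows for one builder as soon as we encounter a
# row for the next one.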
current_builder = None
rows_for_builder = []
for row in self._GetSeriesForQuery(query):
if current_builder is None:
current_builder = row.builder_name
if row.builder_name != current_builder:
results_for_builder, expectation_files = self._ProcessRowsForBuilder(
rows_for_builder)
# The processing should have cleared out all the stored rows.
assert not rows_for_builder
yield current_builder, results_for_builder, expectation_files
current_builder = row.builder_name
rows_for_builder.append(row)
if current_builder is None:
logging.warning(
'Did not get any results for builder type %s and internal status %s. '
'Depending on where tests are run and how frequently trybots are '
'used for submission, this may be benign.', builder_type, is_internal)
if current_builder is not None and rows_for_builder:
results_for_builder, expectation_files = self._ProcessRowsForBuilder(
rows_for_builder)
assert not rows_for_builder
yield current_builder, results_for_builder, expectation_files
def _GetSeriesForQuery(self,
query: str) -> Generator[pandas.Series, None, None]:
"""Generates results for |query|.
Args:
query: A string containing the BigQuery query to run.
Yields:
A pandas.Series object for each row returned by the query. Columns can be
accessed directly as attributes.
"""
client = bigquery.Client(
project=self._project,
default_query_job_config=bigquery.QueryJobConfig(use_legacy_sql=False))
job = client.query(query)
row_iterator = job.result()
# Using a DataFrame iterator instead of directly using |row_iterator| allows
# us to use the BigQuery Storage API, which results in ~10x faster query
# result retrieval at the cost of a few more dependencies.
dataframe_iterator = row_iterator.to_dataframe_iterable(
bigquery_storage.BigQueryReadClient())
for df in dataframe_iterator:
for _, row in df.iterrows():
yield row
def _GetPublicCiQuery(self) -> str:
"""Returns the BigQuery query for public CI builder results."""
raise NotImplementedError()
def _GetInternalCiQuery(self) -> str:
"""Returns the BigQuery query for internal CI builder results."""
raise NotImplementedError()
def _GetPublicTryQuery(self) -> str:
"""Returns the BigQuery query for public try builder results."""
raise NotImplementedError()
def _GetInternalTryQuery(self) -> str:
"""Returns the BigQuery query for internal try builder results."""
raise NotImplementedError()
def _ProcessRowsForBuilder(
self, rows: List[QueryResult]
) -> Tuple[data_types.ResultListType, Optional[List[str]]]:
"""Processes rows from a query into data_types.Result representations.
Args:
rows: A list of rows from a BigQuery query.
Returns:
A tuple (results, expectation_files). |results| is a list of
data_types.Result objects. |expectation_files| is the list of expectation
files that are used by the tests in |results|, but can be None to specify
that all expectation files should be considered.
"""
# It's possible that a builder runs multiple versions of a test with
# different expectation files for each version. So, find a result for each
# unique step and get the expectation files from all of them.
results_for_each_step = {}
for r in rows:
step_name = r.step_name
if step_name not in results_for_each_step:
results_for_each_step[step_name] = r
expectation_files = set()
for r in results_for_each_step.values():
# None is a special value indicating "use all expectation files", so
# handle that.
ef = self._GetRelevantExpectationFilesForQueryResult(r)
if ef is None:
expectation_files = None
break
expectation_files |= set(ef)
if expectation_files is not None:
expectation_files = list(expectation_files)
# The query result list is potentially very large, so reduce the list as we
# iterate over it instead of using a standard for/in so that we don't
# temporarily end up with a ~2x increase in memory.
results = []
while rows:
r = rows.pop()
if self._ShouldSkipOverResult(r):
continue
results.append(self._ConvertBigQueryRowToResultObject(r))
return results, expectation_files
def _ConvertBigQueryRowToResultObject(self,
row: QueryResult) -> data_types.Result:
"""Converts a single BigQuery result row to a data_types.Result.
Args:
row: A single row from BigQuery.
Returns:
A data_types.Result object containing the information from |row|.
"""
build_id = _StripPrefixFromBuildId(row.id)
test_name = self._StripPrefixFromTestId(row.test_id)
actual_result = _ConvertActualResultToExpectationFileFormat(row.status)
tags = expectations.GetInstance().FilterToKnownTags(row.typ_tags)
step = row.step_name
return data_types.Result(test_name, tags, actual_result, step, build_id)
def _GetRelevantExpectationFilesForQueryResult(
self, query_result: QueryResult) -> Optional[Iterable[str]]:
"""Gets the relevant expectation file names for a given query result.
Args:
query_result: An object representing a row/result from a query. Columns
can be accessed via .column_name.
Returns:
An iterable of strings containing expectation file names that are
relevant to |query_result|, or None if all expectation files should be
considered relevant.
"""
raise NotImplementedError()
# Overridden by subclasses.
# pylint: disable=no-self-use
def _ShouldSkipOverResult(self, result: QueryResult) -> bool:
"""Whether |result| should be ignored and skipped over.
Args:
result: A QueryResult (pandas.Series) containing a single BigQuery result row.
Returns:
True if the result should be skipped over/ignored, otherwise False.
"""
del result
return False
# pylint: enable=no-self-use
def _StripPrefixFromTestId(self, test_id: str) -> str:
"""Strips the prefix from a test ID, leaving only the test case name.
Args:
test_id: A string containing a full ResultDB test ID, e.g.
ninja://target/directory.suite.class.test_case
Returns:
A string containing the test case name extracted from |test_id|.
"""
raise NotImplementedError()
def _StripPrefixFromBuildId(build_id: str) -> str:
# Build IDs provided by ResultDB are prefixed with "build-"
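# e.g. "build-1234" -> "1234".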
split_id = build_id.split('-')
assert len(split_id) == 2
return split_id[-1]
def _ConvertActualResultToExpectationFileFormat(actual_result: str) -> str:
# Web tests use ResultDB's ABORT value for both test timeouts and device
# failures, but Abort is not defined in typ. So, map it to timeout now.
if actual_result == 'ABORT':
actual_result = json_results.ResultType.Timeout
# The result reported to ResultDB is in the format PASS/FAIL, while the
# expected results in an expectation file are in the format Pass/Failure.
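# e.g. "FAIL" is mapped to "Failure".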
return expectations_parser.RESULT_TAGS[actual_result]