# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Methods related to querying the ResultDB BigQuery tables."""

import logging
import time
from typing import Collection, Dict, Generator, Iterable, List, Optional, Tuple

# vpython-provided modules.
# pylint: disable=import-error
from google.cloud import bigquery
from google.cloud import bigquery_storage
import pandas
# pylint: enable=import-error

# //third_party/catapult/third_party/typ imports.
from typ import expectations_parser
from typ import json_results

# //testing imports.
from unexpected_passes_common import constants
from unexpected_passes_common import data_types
from unexpected_passes_common import expectations

DEFAULT_NUM_SAMPLES = 100

# Subquery for getting all try builds that were used for CL submission. 30 days
# is chosen because the ResultDB tables we pull data from only keep data around
# for 30 days.
PARTITIONED_SUBMITTED_BUILDS_TEMPLATE = """\
  SELECT
    CONCAT("build-", CAST(unnested_builds.id AS STRING)) as id
  FROM
    `commit-queue.{project_view}.attempts`,
    UNNEST(builds) as unnested_builds,
    UNNEST(gerrit_changes) as unnested_changes
  WHERE
    unnested_builds.host = "cr-buildbucket.appspot.com"
    AND unnested_changes.submit_status = "SUCCESS"
    AND start_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(),
                                   INTERVAL 30 DAY)"""
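# Illustrative usage only (the project view name below is hypothetical): the
# {project_view} placeholder is filled in via str.format before the subquery
# is used, e.g.
#   PARTITIONED_SUBMITTED_BUILDS_TEMPLATE.format(project_view='chromium')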

QueryResult = pandas.Series


class BigQueryQuerier:
  """Class to handle all BigQuery queries for a script invocation."""

  def __init__(self, suite: Optional[str], project: str, num_samples: int,
               keep_unmatched_results: bool):
    """
    Args:
      suite: A string containing the name of the suite that is being queried
          for. Can be None if there is no differentiation between different
          suites.
      project: A string containing the billing project to use for BigQuery.
      num_samples: An integer containing the number of builds to pull results
          from.
      keep_unmatched_results: Whether to store and return unmatched results
          for debugging purposes.
    """
    self._suite = suite
    self._project = project
    self._num_samples = num_samples or DEFAULT_NUM_SAMPLES
    self._keep_unmatched_results = keep_unmatched_results

    assert self._num_samples > 0

  def FillExpectationMapForBuilders(
      self, expectation_map: data_types.TestExpectationMap,
      builders: Collection[data_types.BuilderEntry]
  ) -> Dict[str, data_types.ResultListType]:
    """Fills |expectation_map| with results from |builders|.

    Args:
      expectation_map: A data_types.TestExpectationMap. Will be modified
          in-place.
      builders: An iterable of data_types.BuilderEntry containing the builders
          to query.

    Returns:
      A dict containing any results that were retrieved that did not have a
      matching expectation in |expectation_map| in the following format:
      {
        |project|/|builder_type|:|builder_name| (str): [
          result1 (data_types.Result),
          result2 (data_types.Result),
          ...
        ],
      }
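
    Example (illustrative only; |querier| is assumed to be an instance of a
    concrete BigQueryQuerier subclass):
      unmatched = querier.FillExpectationMapForBuilders(expectation_map,
                                                        builders)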
    """
    start_time = time.time()
    logging.debug('Starting to fill expectation map for %d builders',
                  len(builders))
    assert isinstance(expectation_map, data_types.TestExpectationMap)
    # Ensure that all the builders are of the same type since we make some
    # assumptions about that later on.
    assert builders
    builder_type = None
    for b in builders:
      if builder_type is None:
        builder_type = b.builder_type
      else:
        assert b.builder_type == builder_type

    internal_statuses = set()
    for b in builders:
      internal_statuses.add(b.is_internal_builder)

    matched_builders = set()
    all_unmatched_results = {}
    for internal in internal_statuses:
      for builder_name, results, expectation_files in (
          self.GetBuilderGroupedQueryResults(builder_type, internal)):
        matching_builder = None
        for b in builders:
          if b.name == builder_name and b.is_internal_builder == internal:
            matching_builder = b
            break

        if not matching_builder:
          logging.warning(
              'Did not find a matching builder for name %s and '
              'internal status %s. This is normal if the builder '
              'is no longer running tests (e.g. it was '
              'experimental).', builder_name, internal)
          continue

        if matching_builder in matched_builders:
          raise RuntimeError(
              f'Got query result batches matched to builder '
              f'{matching_builder} twice - this is indicative of a malformed '
              f'query returning results that are not sorted by builder')
        matched_builders.add(matching_builder)

        prefixed_builder_name = '%s/%s:%s' % (matching_builder.project,
                                              matching_builder.builder_type,
                                              matching_builder.name)
        unmatched_results = expectation_map.AddResultList(
            prefixed_builder_name, results, expectation_files)
        if self._keep_unmatched_results:
          if unmatched_results:
            all_unmatched_results[prefixed_builder_name] = unmatched_results
        else:
          logging.info('Dropping %d unmatched results', len(unmatched_results))

    logging.debug('Filling expectation map took %f', time.time() - start_time)
    return all_unmatched_results

  def GetBuilderGroupedQueryResults(
      self, builder_type: str, is_internal: bool
  ) -> Generator[Tuple[str, data_types.ResultListType, Optional[List[str]]],
                 None, None]:
    """Generates results for all relevant builders grouped by builder name.

    Args:
      builder_type: Whether the builders are CI or try builders.
      is_internal: Whether the builders are internal.

    Yields:
      A tuple (builder_name, results, expectation_files). |builder_name| is a
      string specifying the builder that |results| came from. |results| is a
      data_types.ResultListType containing all the results for |builder_name|.
      |expectation_files| is an optional list of expectation file names
      relevant to those results, or None if all expectation files should be
      considered.
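
    Example (illustrative only; |querier| is assumed to be an instance of a
    concrete BigQueryQuerier subclass):
      for builder, results, expectation_files in (
          querier.GetBuilderGroupedQueryResults(
              constants.BuilderTypes.CI, False)):
        # |results| contains every queried result reported by |builder|.
        ...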
    """
    if builder_type == constants.BuilderTypes.CI:
      if is_internal:
        query = self._GetInternalCiQuery()
      else:
        query = self._GetPublicCiQuery()
    elif builder_type == constants.BuilderTypes.TRY:
      if is_internal:
        query = self._GetInternalTryQuery()
      else:
        query = self._GetPublicTryQuery()
    else:
      raise RuntimeError(f'Unknown builder type {builder_type}')

    current_builder = None
    rows_for_builder = []
    for row in self._GetSeriesForQuery(query):
      if current_builder is None:
        current_builder = row.builder_name
      if row.builder_name != current_builder:
        results_for_builder, expectation_files = self._ProcessRowsForBuilder(
            rows_for_builder)
        # The processing should have cleared out all the stored rows.
        assert not rows_for_builder
        yield current_builder, results_for_builder, expectation_files
        current_builder = row.builder_name
      rows_for_builder.append(row)

    if current_builder is None:
      logging.warning(
          'Did not get any results for builder type %s and internal status %s. '
          'Depending on where tests are run and how frequently trybots are '
          'used for submission, this may be benign.', builder_type, is_internal)

    if current_builder is not None and rows_for_builder:
      results_for_builder, expectation_files = self._ProcessRowsForBuilder(
          rows_for_builder)
      assert not rows_for_builder
      yield current_builder, results_for_builder, expectation_files

  def _GetSeriesForQuery(self,
                         query: str) -> Generator[pandas.Series, None, None]:
    """Generates results for |query|.

    Args:
      query: A string containing the BigQuery query to run.

    Yields:
      A pandas.Series object for each row returned by the query. Columns can be
      accessed directly as attributes.
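
    Example (illustrative):
      for row in self._GetSeriesForQuery(query):
        builder = row.builder_name
        test_id = row.test_id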
    """
    client = bigquery.Client(
        project=self._project,
        default_query_job_config=bigquery.QueryJobConfig(use_legacy_sql=False))
    job = client.query(query)
    row_iterator = job.result()
    # Using a Dataframe iterator instead of directly using |row_iterator| allows
    # us to use the BigQuery Storage API, which results in ~10x faster query
    # result retrieval at the cost of a few more dependencies.
    dataframe_iterator = row_iterator.to_dataframe_iterable(
        bigquery_storage.BigQueryReadClient())
    for df in dataframe_iterator:
      for _, row in df.iterrows():
        yield row

  def _GetPublicCiQuery(self) -> str:
    """Returns the BigQuery query for public CI builder results."""
    raise NotImplementedError()

  def _GetInternalCiQuery(self) -> str:
    """Returns the BigQuery query for internal CI builder results."""
    raise NotImplementedError()

  def _GetPublicTryQuery(self) -> str:
    """Returns the BigQuery query for public try builder results."""
    raise NotImplementedError()

  def _GetInternalTryQuery(self) -> str:
    """Returns the BigQuery query for internal try builder results."""
    raise NotImplementedError()

  def _ProcessRowsForBuilder(
      self, rows: List[QueryResult]
  ) -> Tuple[data_types.ResultListType, Optional[List[str]]]:
    """Processes rows from a query into data_types.Result representations.

    Args:
      rows: A list of rows from a BigQuery query.

    Returns:
      A tuple (results, expectation_files). |results| is a list of
      data_types.Result objects. |expectation_files| is the list of expectation
      files that are used by the tests in |results|, but can be None to specify
      that all expectation files should be considered.
    """
    # It's possible that a builder runs multiple versions of a test with
    # different expectation files for each version. So, find a result for each
    # unique step and get the expectation files from all of them.
    results_for_each_step = {}
    for r in rows:
      step_name = r.step_name
      if step_name not in results_for_each_step:
        results_for_each_step[step_name] = r

    expectation_files = set()
    for r in results_for_each_step.values():
      # None is a special value indicating "use all expectation files", so
      # handle that.
      ef = self._GetRelevantExpectationFilesForQueryResult(r)
      if ef is None:
        expectation_files = None
        break
      expectation_files |= set(ef)
    if expectation_files is not None:
      expectation_files = list(expectation_files)

    # The query result list is potentially very large, so reduce the list as we
    # iterate over it instead of using a standard for/in so that we don't
    # temporarily end up with a ~2x increase in memory.
    results = []
    while rows:
      r = rows.pop()
      if self._ShouldSkipOverResult(r):
        continue
      results.append(self._ConvertBigQueryRowToResultObject(r))

    return results, expectation_files

  def _ConvertBigQueryRowToResultObject(self,
                                        row: QueryResult) -> data_types.Result:
    """Converts a single BigQuery result row to a data_types.Result.

    Args:
      row: A single row from BigQuery.

    Returns:
      A data_types.Result object containing the information from |row|.
    """
    build_id = _StripPrefixFromBuildId(row.id)
    test_name = self._StripPrefixFromTestId(row.test_id)
    actual_result = _ConvertActualResultToExpectationFileFormat(row.status)
    tags = expectations.GetInstance().FilterToKnownTags(row.typ_tags)
    step = row.step_name
    return data_types.Result(test_name, tags, actual_result, step, build_id)

  def _GetRelevantExpectationFilesForQueryResult(
      self, query_result: QueryResult) -> Optional[Iterable[str]]:
    """Gets the relevant expectation file names for a given query result.

    Args:
      query_result: An object representing a row/result from a query. Columns
          can be accessed via .column_name.

    Returns:
      An iterable of strings containing expectation file names that are
      relevant to |query_result|, or None if all expectation files should be
      considered relevant.
    """
    raise NotImplementedError()

  # Overridden by subclasses.
  # pylint: disable=no-self-use
  def _ShouldSkipOverResult(self, result: QueryResult) -> bool:
    """Whether |result| should be ignored and skipped over.

    Args:
      result: A QueryResult containing a single BigQuery result row.

    Returns:
      True if the result should be skipped over/ignored, otherwise False.
    """
    del result
    return False
  # pylint: enable=no-self-use

  def _StripPrefixFromTestId(self, test_id: str) -> str:
    """Strips the prefix from a test ID, leaving only the test case name.

    Args:
      test_id: A string containing a full ResultDB test ID, e.g.
          ninja://target/directory.suite.class.test_case

    Returns:
      A string containing the test case's name extracted from |test_id|.
    """
    raise NotImplementedError()


def _StripPrefixFromBuildId(build_id: str) -> str:
  # Build IDs provided by ResultDB are prefixed with "build-"
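  # e.g. "build-1234567890" becomes "1234567890".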
  split_id = build_id.split('-')
  assert len(split_id) == 2
  return split_id[-1]


def _ConvertActualResultToExpectationFileFormat(actual_result: str) -> str:
  # Web tests use ResultDB's ABORT value for both test timeouts and device
  # failures, but Abort is not defined in typ. So, map it to timeout now.
  if actual_result == 'ABORT':
    actual_result = json_results.ResultType.Timeout
  # The result reported to ResultDB is in the format PASS/FAIL, while the
  # expected results in an expectation file are in the format Pass/Failure.
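  # For example, RESULT_TAGS maps "PASS" to "Pass" and "FAIL" to "Failure".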
  return expectations_parser.RESULT_TAGS[actual_result]