blob: d56e31fbd7d3fa484f4d42a4d11a6fbd631bae8e [file] [log] [blame]
# Copyright 2021 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Functions for querying ResultDB, via the "rdb rpc" subcommand."""
import datetime
import subprocess
import json
import re
from typing import Optional, Tuple
import errors
def get_test_metadata(test_id: str) -> Tuple[str, str]:
"""Fetch test metadata from ResultDB.
Args:
test_id: The full ID of the test to fetch. For Chromium tests this will
begin with "ninja://"
Returns:
A tuple of (test name, filename). The test name will for example have the
form SuitName.TestName for GTest tests. The filename is the location in the
source tree where this test is defined.
"""
history = get_test_result_history(test_id, 1)
if 'entries' not in history:
raise errors.UserError(
f"ResultDB query couldn't find test with ID: {test_id}")
# TODO: Are there other statuses we need to exclude?
if history['entries'][0]['result'].get('status', '') == 'SKIP':
# Test metadata isn't populated for skipped tests. Ideally this wouldn't be
# the case, but for now we just work around it.
# On the off-chance this test is conditionally disabled, we request a bit
# more history in the hopes of finding a test run where it isn't skipped.
history = get_test_result_history(test_id, 10)
for entry in history['entries'][1:]:
if entry['result'].get('status', '') != 'SKIP':
break
else:
raise errors.UserError(
f"Unable to fetch metadata for test {test_id}, as the last 10 runs " +
"in ResultDB have status 'SKIP'. Is the test already disabled?")
else:
entry = history['entries'][0]
try:
inv_name = entry['result']['name']
except KeyError as e:
raise errors.InternalError(
f"Malformed GetTestResultHistory response: no key {e}") from e
# Ideally GetTestResultHistory would return metadata so we could avoid making
# this second RPC.
result = get_test_result(inv_name)
try:
name = result['testMetadata']['name']
loc = result['testMetadata']['location']
repo, filename = loc['repo'], loc['fileName']
except KeyError as e:
raise errors.InternalError(
f"Malformed GetTestResult response: no key {e}") from e
# Parameterised tests put "/n" on the end, where n is a number. This isn't
# reflected in source though, so we strip it.
name = re.sub('/[0-9]*$', '', name)
if repo != 'https://chromium.googlesource.com/chromium/src':
raise errors.UserError(
f"Test is in repo '{repo}', this tool can only disable tests in " +
"chromium/chromium/src")
return name, filename
def get_test_result_history(test_id: str, page_size: int) -> dict:
"""Make a GetTestResultHistory RPC call to ResultDB.
Args:
test_id: The full test ID to query. This can be a regex.
page_size: The number of results to return within the first response.
Returns:
The GetTestResultHistoryResponse message, in dict form.
"""
now = datetime.datetime.now(datetime.timezone.utc)
request = {
'realm': 'chromium:ci',
'testIdRegexp': test_id,
'timeRange': {
'earliest': (now - datetime.timedelta(hours=6)).isoformat(),
'latest': now.isoformat(),
},
'pageSize': page_size,
}
return rdb_rpc('GetTestResultHistory', request)
def get_test_result(test_name: str) -> dict:
"""Make a GetTestResult RPC call to ResultDB.
Args:
test_name: The name of the test result to query. This specifies a result for
a particular test, within a particular test run. As returned by
GetTestResultHistory.
Returns:
The TestResult message, in dict form.
"""
return rdb_rpc('GetTestResult', {
'name': test_name,
})
# Used for caching RPC responses, for development purposes.
CANNED_RESPONSE_FILE: Optional[str] = None
def rdb_rpc(method: str, request: dict) -> dict:
"""Call the given RPC method, with the given request.
Args:
method: The method to call. Must be within luci.resultdb.v1.ResultDB.
request: The request, in dict format.
Returns:
The response from ResultDB, in dict format.
"""
if CANNED_RESPONSE_FILE is not None:
try:
with open(CANNED_RESPONSE_FILE, 'r') as f:
canned_responses = json.load(f)
except Exception:
canned_responses = {}
# HACK: Strip out timestamps when caching the request. GetTestResultHistory
# includes timestamps based on the current time, which will bust the cache.
# But for e2e testing we just want to cache the result the first time and
# then keep using it.
if 'timeRange' in request:
key_request = dict(request)
del key_request['timeRange']
else:
key_request = request
key = f'{method}/{json.dumps(key_request)}'
if (response_json := canned_responses.get(key, None)) is not None:
return json.loads(response_json)
p = subprocess.Popen(['rdb', 'rpc', 'luci.resultdb.v1.ResultDB', method],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True)
stdout, stderr = p.communicate(json.dumps(request))
if p.returncode != 0:
raise Exception(f'rdb rpc {method} failed with: {stderr}')
if CANNED_RESPONSE_FILE:
canned_responses[key] = stdout
with open(CANNED_RESPONSE_FILE, 'w') as f:
json.dump(canned_responses, f)
return json.loads(stdout)