blob: 5d6b07c71d64a834728171f7852e13fb489090e4 [file] [log] [blame]
# Copyright 2021 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Functions for querying ResultDB, via the "rdb rpc" subcommand."""
import datetime
import subprocess
import json
import re
from typing import Optional, Tuple
import errors
def get_test_metadata(invocation, test_regex: str) -> Tuple[str, str]:
"""Fetch test metadata from ResultDB.
Args:
invocation: the invocation to fetch the test
test_regex: The regex to match the test id
Returns:
A tuple of (test name, filename). The test name will for example have the
form SuitName.TestName for GTest tests. The filename is the location in the
source tree where this test is defined.
"""
test_results = query_test_result(invocation=invocation, test_regex=test_regex)
if 'testResults' not in test_results:
raise errors.UserError(
f"ResultDB couldn't query for invocation: {invocation}")
if len(test_results["testResults"]) == 0:
raise errors.UserError(
f"ResultDB couldn't find test result for test regex {test_regex}")
result = test_results["testResults"][0]
try:
name = result['testMetadata']['name']
loc = result['testMetadata']['location']
repo, filename = loc['repo'], loc['fileName']
except KeyError as e:
raise errors.InternalError(
f"Malformed GetTestResult response: no key {e}") from e
if repo != 'https://chromium.googlesource.com/chromium/src':
raise errors.UserError(
f"Test is in repo '{repo}', this tool can only disable tests in " +
"chromium/chromium/src")
return name, filename
def get_test_result_history(test_id: str, page_size: int) -> dict:
"""Make a GetTestResultHistory RPC call to ResultDB.
Args:
test_id: The full test ID to query. This can be a regex.
page_size: The number of results to return within the first response.
Returns:
The GetTestResultHistoryResponse message, in dict form.
"""
now = datetime.datetime.now(datetime.timezone.utc)
request = {
'realm': 'chromium:ci',
'testIdRegexp': test_id,
'timeRange': {
'earliest': (now - datetime.timedelta(hours=6)).isoformat(),
'latest': now.isoformat(),
},
'pageSize': page_size,
}
return rdb_rpc('GetTestResultHistory', request)
def get_test_result(test_name: str) -> dict:
"""Make a GetTestResult RPC call to ResultDB.
Args:
test_name: The name of the test result to query. This specifies a result for
a particular test, within a particular test run. As returned by
GetTestResultHistory.
Returns:
The TestResult message, in dict form.
"""
return rdb_rpc('GetTestResult', {
'name': test_name,
})
def query_test_result(invocation: str, test_regex: str):
"""Make a QueryTestResults RPC call to ResultDB.
Args:
invocation_name: the name of the invocation to query for test results
test_regex: The test regex to filter
Returns:
The QueryTestResults response message, in dict form.
"""
request = {
'invocations': [invocation],
'readMask': {
'paths': ['test_id', 'test_metadata'],
},
'pageSize': 1000,
'predicate': {
'testIdRegexp': test_regex,
},
}
return rdb_rpc('QueryTestResults', request)
# Used for caching RPC responses, for development purposes.
CANNED_RESPONSE_FILE: Optional[str] = None
def rdb_rpc(method: str, request: dict) -> dict:
"""Call the given RPC method, with the given request.
Args:
method: The method to call. Must be within luci.resultdb.v1.ResultDB.
request: The request, in dict format.
Returns:
The response from ResultDB, in dict format.
"""
if CANNED_RESPONSE_FILE is not None:
try:
with open(CANNED_RESPONSE_FILE, 'r') as f:
canned_responses = json.load(f)
except Exception:
canned_responses = {}
# HACK: Strip out timestamps when caching the request. GetTestResultHistory
# includes timestamps based on the current time, which will bust the cache.
# But for e2e testing we just want to cache the result the first time and
# then keep using it.
if 'timeRange' in request:
key_request = dict(request)
del key_request['timeRange']
else:
key_request = request
key = f'{method}/{json.dumps(key_request)}'
if (response_json := canned_responses.get(key, None)) is not None:
return json.loads(response_json)
p = subprocess.Popen(['rdb', 'rpc', 'luci.resultdb.v1.ResultDB', method],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True)
stdout, stderr = p.communicate(json.dumps(request))
if p.returncode != 0:
# rdb doesn't return unique status codes for different errors, so we have to
# just match on the output.
if 'interactive login is required' in stderr:
raise errors.UserError(
"Authentication is required to fetch test metadata.\n" +
"Please run:\n\trdb auth-login\nand try again")
raise Exception(f'rdb rpc {method} failed with: {stderr}')
if CANNED_RESPONSE_FILE:
canned_responses[key] = stdout
with open(CANNED_RESPONSE_FILE, 'w') as f:
json.dump(canned_responses, f)
return json.loads(stdout)