# blob: 83f01fe117855d93b6067b427f3cfec2c72657fb [file] [log] [blame]
# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import atexit
import base64
import cgi
import json
import logging
import os
import re
import requests
import sys
import traceback
import constants
# import protos for exceptions reporting
THIS_DIR = os.path.abspath(os.path.dirname(__file__))
CHROMIUM_SRC_DIR = os.path.abspath(os.path.join(THIS_DIR, '../../../..'))
sys.path.extend([
os.path.abspath(os.path.join(CHROMIUM_SRC_DIR, 'build/util/lib/proto')),
os.path.abspath(os.path.join(CHROMIUM_SRC_DIR, 'build/util/'))
])
import measures
import exception_recorder
from google.protobuf import json_format
from google.protobuf import any_pb2
# Module-level logger named after this module.
LOGGER = logging.getLogger(__name__)
# VALID_STATUSES is the set of valid status values for test_result['status'].
# The full list can be obtained at
# https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/proto/v1/test_result.proto;drc=ca12b9f52b27f064b0fa47c39baa3b011ffa5790;l=151-174
VALID_STATUSES = {"PASS", "FAIL", "CRASH", "ABORT", "SKIP"}
# Key under which extended properties are nested in the invocation dict sent
# by the UpdateInvocation request, and the prefix of its update mask paths.
EXTENDED_PROPERTIES_KEY = 'extendedProperties'
def format_exception_stacktrace(e: Exception):
  """Formats an exception together with its traceback.

  Args:
    e: (Exception) The exception to format.

  Returns:
    The list of strings produced by traceback.format_exception for |e|, using
    the traceback attached to the exception object.
  """
  return traceback.format_exception(type(e), e, e.__traceback__)
def _compose_test_result(test_id,
                         status,
                         expected,
                         duration=None,
                         test_log=None,
                         test_loc=None,
                         tags=None,
                         file_artifacts=None):
  """Composes the test_result dict item to be posted to result sink.

  Args:
    test_id: (str) A unique identifier of the test in LUCI context.
    status: (str) Status of the test. Must be one in |VALID_STATUSES|.
    expected: (bool) Whether the status is expected.
    duration: (int) Test duration in milliseconds or None if unknown.
    test_log: (str) Log of the test. Optional.
    test_loc: (dict): Test location metadata as described in
        https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/proto/v1/test_metadata.proto;l=32;drc=37488404d1c8aa8fccca8caae4809ece08828bae
    tags: (list) List of tags. Each item in list should be a length 2 tuple of
        string as ("key", "value"). Optional.
    file_artifacts: (dict) IDs to abs paths mapping of existing files to
        report as artifact.

  Returns:
    A dict of test results with input information, conforming to
    https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/sink/proto/v1/test_result.proto

  Raises:
    AssertionError: If |status| is not in |VALID_STATUSES| or an item in
        |tags| is not a length 2 sequence of strings.
  """
  tags = tags or []
  file_artifacts = file_artifacts or {}

  assert status in VALID_STATUSES, (
      '%s is not a valid status (one in %s) for ResultSink.' %
      (status, VALID_STATUSES))

  # Both tag checks share one message. The original second message was built
  # from two adjacent literals with no space between them ("Items intags").
  tag_error = 'Items in tags should be length 2 tuples of strings'
  for tag in tags:
    assert len(tag) == 2, tag_error
    assert isinstance(tag[0], str) and isinstance(tag[1], str), tag_error

  test_result = {
      'testId': test_id,
      'status': status,
      'expected': expected,
      'tags': [{
          'key': key,
          'value': value
      } for (key, value) in tags],
      'testIdStructured': _get_struct_test_dict(test_id),
      'testMetadata': {
          'name': test_id,
          'location': test_loc,
      }
  }

  test_result['artifacts'] = {
      name: {
          'filePath': path
      } for name, path in file_artifacts.items()
  }

  if test_log:
    # b64encode takes and returns bytes. The result must be a str in order
    # for the eventual json.dumps to succeed. (No Python 2 branch is needed:
    # this function uses an f-string, so it only ever runs on Python 3.)
    encoded_log = base64.b64encode(test_log.encode('utf-8')).decode('utf-8')
    test_result['summaryHtml'] = '<text-artifact artifact-id="Test Log" />'
    test_result['artifacts']['Test Log'] = {'contents': encoded_log}

    # Assign primary error message if the host app crashed.
    if constants.CRASH_MESSAGE in test_log:
      primary_error_message = constants.CRASH_MESSAGE
      if constants.ASAN_ERROR in test_log:
        primary_error_message += f' {constants.ASAN_ERROR}'
      test_result['failureReason'] = {
          'primaryErrorMessage': primary_error_message
      }

  # Omit the artifacts field entirely when there is nothing to report.
  if not test_result['artifacts']:
    test_result.pop('artifacts')

  if duration:
    # |duration| is in milliseconds; it is reported as seconds with an 's'
    # suffix.
    test_result['duration'] = '%.9fs' % (duration / 1000.0)

  return test_result
def _get_struct_test_dict(test_id):
"""Returns a structured_test_dict with filled in fields.
Args:
test_id: A string of the test_id.
Returns:
A dictionary with the struct fields filled in.
"""
# Source comes from:
# infra/go/src/go.chromium.org/luci/resultdb/sink/proto/v1/test_result.proto
struct_test_dict = {
'coarseName': None, # Not used for gtests or xctests.
'fineName': None,
'caseNameComponents': [''],
}
found_match = False
# We may encounter gtests or XCTests which are parsed differently.
# Attempt to parse gtests based on:
# infra/go/src/infra/tools/result_adapter/gtest.go
# Type-parameterised test (e.g. MyInstantiation/FooTest/MyType.DoesBar)
re_match = re.search(r'^((\w+)/)?(\w+)/(\w+)\.(\w+)$', test_id)
if re_match:
suite = re_match.group(3)
name = re_match.group(5)
instantiation = re_match.group(2)
case_id = re_match.group(4)
found_match = True
# Value-parameterised test (e.g. MyInstantiation/FooTest.DoesBar/TestValue)
re_match = re.search(r'^((\w+)/)?(\w+)\.(\w+)/(\w+)$', test_id)
if not found_match and re_match:
suite = re_match.group(3)
name = re_match.group(4)
instantiation = re_match.group(2)
case_id = re_match.group(5)
found_match = True
# Neither type nor value-parameterised (e.g. FooTest.DoesBar)
re_match = re.search(r'^(\w+)\.(\w+)$', test_id)
if not found_match and re_match:
suite = re_match.group(1)
name = re_match.group(2)
instantiation = ""
case_id = ""
found_match = True
if found_match:
struct_test_dict['fineName'] = suite
if not case_id:
struct_test_dict['caseNameComponents'] = [name]
elif not instantiation:
struct_test_dict['caseNameComponents'] = ['%s/%s' % (name, case_id)]
else:
struct_test_dict['caseNameComponents'] = ['%s/%s.%s' % (name, instantiation, case_id)]
# XCTests format.
re_match = re.search(r'(.*)/(.*)', test_id)
if not found_match and re_match:
struct_test_dict['fineName'] = re_match.group(1)
struct_test_dict['caseNameComponents'] = [re_match.group(2)]
found_match = True
if not found_match:
logging.error(
'Test id %s did not match known format, so could not be parsed.' %
test_id)
return struct_test_dict
def _to_camel_case(s):
"""Converts the string s from snake_case to lowerCamelCase."""
elems = s.split('_')
return elems[0] + ''.join(elem.capitalize() for elem in elems[1:])
class ResultSinkClient(object):
  """Client that posts test results and extended properties to ResultSink.

  The ResultSink settings are read from the 'result_sink' entry of the JSON
  file named by the LUCI_CONTEXT environment variable. When they are not
  available |self.sink| is None and post(), post_extended_properties() and
  close() return without doing anything.
  """

  def __init__(self):
    """Loads the ResultSink settings and, if present, opens an HTTP session."""
    self.sink = None

    context_path = os.environ.get('LUCI_CONTEXT')
    if not context_path:
      logging.warning('LUCI_CONTEXT not found in environment. ResultDB'
                      ' integration disabled.')
      return

    with open(context_path) as context_file:
      luci_context = json.load(context_file)
    self.sink = luci_context.get('result_sink')
    if not self.sink:
      logging.warning('ResultSink constants not found in LUCI context.'
                      ' ResultDB integration disabled.')
      return

    self.url = ('http://%s/prpc/luci.resultsink.v1.Sink/ReportTestResults' %
                self.sink['address'])
    self.headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': 'ResultSink %s' % self.sink['auth_token'],
    }
    self._session = requests.Session()

    # Ensure session is closed at exit.
    atexit.register(self.close)

    # Raise the connection pool logger's level to WARNING while this client
    # is in use; close() sets it back to DEBUG.
    logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)

  def close(self):
    """Closes the connection to result sink server."""
    if self.sink:
      LOGGER.info('Closing connection with result sink server.')
      # Reset to default logging level of test runner scripts.
      logging.getLogger("urllib3.connectionpool").setLevel(logging.DEBUG)
      self._session.close()

  def post(self, test_id, status, expected, **kwargs):
    """Composes and posts a test and status to result sink.

    Does nothing when ResultSink is not available.

    Args:
      test_id: (str) A unique identifier of the test in LUCI context.
      status: (str) Status of the test. Must be one in |VALID_STATUSES|.
      expected: (bool) Whether the status is expected.
      **kwargs: Optional keyword args forwarded to _compose_test_result.
        Namely:
        duration: (int) Test duration in milliseconds or None if unknown.
        test_log: (str) Log of the test. Optional.
        tags: (list) List of tags. Each item in list should be a length 2 tuple
          of string as ("key", "value"). Optional.
        file_artifacts: (dict) IDs to abs paths mapping of existing files to
          report as artifact.
    """
    if not self.sink:
      return
    test_result = _compose_test_result(test_id, status, expected, **kwargs)
    self._post_test_result(test_result)

  def _post_test_result(self, test_result):
    """Posts single test result to server.

    This method assumes |self.sink| is not None. An HTTP error status in the
    response is raised by raise_for_status().

    Args:
      test_result: (dict) Conforming to protocol defined in
        https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/sink/proto/v1/test_result.proto
    """
    payload = json.dumps({'testResults': [test_result]})
    response = self._session.post(
        url=self.url,
        headers=self.headers,
        data=payload,
    )
    response.raise_for_status()

  def post_extended_properties(self):
    """Posts extended properties to server with retry.

    Makes at most two upload attempts. Does nothing when ResultSink is not
    available.
    """
    if not self.sink:
      return

    max_attempts = 2
    for attempt in range(1, max_attempts + 1):
      try:
        self._post_extended_properties()
      except Exception as err:
        logging.error("Got error %s when uploading extended properties.", err)
        if attempt == max_attempts:
          # Swallow the exception if the upload fails again and hit the max
          # try so that it won't fail the test task (and it shouldn't).
          logging.error("Hit max retry. Skip uploading extended properties.")
        else:
          # Upload can fail due to record size being too big. In this case,
          # report just the upload failure.
          exception_recorder.clear()
          measures.clear()
          exception_recorder.register(err)
      else:
        # Upload succeeded; no further attempt is needed.
        return

  def _post_extended_properties(self):
    """Posts extended properties to server.

    Assumes self.sink has been initialized.
    Packages the recorded exception occurrences and test script metrics and
    sends an UpdateInvocation post request to result sink. An HTTP error
    status in the response is raised by raise_for_status().
    """
    extended_properties = {}
    mask_paths = []

    # Sink server by default decodes payload with protojson, i.e. codecJSONV2
    # in https://source.chromium.org/search?q=f:server.go%20func:requestCodec
    # which requires lowerCamelCase names in the json request.
    # For the value for update mask, see "JSON Encoding of Field Masks" in
    # https://protobuf.dev/reference/protobuf/google.protobuf/#field-masks
    if exception_recorder.size() > 0:
      exceptions_key = exception_recorder.EXCEPTION_OCCURRENCES_KEY
      extended_properties[exceptions_key] = exception_recorder.to_dict()
      mask_paths.append(
          '%s.%s' % (EXTENDED_PROPERTIES_KEY, _to_camel_case(exceptions_key)))

    if measures.size() > 0:
      metrics_key = measures.TEST_SCRIPT_METRICS_KEY
      extended_properties[metrics_key] = measures.to_dict()
      mask_paths.append(
          '%s.%s' % (EXTENDED_PROPERTIES_KEY, _to_camel_case(metrics_key)))

    request = {
        'invocation': {
            EXTENDED_PROPERTIES_KEY: extended_properties
        },
        'updateMask': ','.join(mask_paths),
    }
    request_body = json.dumps(request, sort_keys=True)
    LOGGER.info(request_body)

    update_invocation_url = (
        'http://%s/prpc/luci.resultsink.v1.Sink/UpdateInvocation' %
        self.sink['address'])
    response = self._session.post(
        url=update_invocation_url,
        headers=self.headers,
        data=request_body,
    )
    response.raise_for_status()