| # Copyright 2020 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import atexit |
| import base64 |
| import cgi |
| import json |
| import logging |
| import os |
| import re |
| import requests |
| import sys |
| import traceback |
| |
| import constants |
| |
| # import protos for exceptions reporting |
| THIS_DIR = os.path.abspath(os.path.dirname(__file__)) |
| CHROMIUM_SRC_DIR = os.path.abspath(os.path.join(THIS_DIR, '../../../..')) |
| sys.path.extend([ |
| os.path.abspath(os.path.join(CHROMIUM_SRC_DIR, 'build/util/lib/proto')), |
| os.path.abspath(os.path.join(CHROMIUM_SRC_DIR, 'build/util/')) |
| ]) |
| import measures |
| import exception_recorder |
| |
| from google.protobuf import json_format |
| from google.protobuf import any_pb2 |
| |
| LOGGER = logging.getLogger(__name__) |
| # VALID_STATUSES is a list of valid status values for test_result['status']. |
| # The full list can be obtained at |
| # https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/proto/v1/test_result.proto;drc=ca12b9f52b27f064b0fa47c39baa3b011ffa5790;l=151-174 |
| VALID_STATUSES = {"PASS", "FAIL", "CRASH", "ABORT", "SKIP"} |
| |
| EXTENDED_PROPERTIES_KEY = 'extendedProperties' |
| |
| def format_exception_stacktrace(e: Exception): |
| exception_trace = traceback.format_exception(type(e), e, e.__traceback__) |
| return exception_trace |
| |
| |
| def _compose_test_result(test_id, |
| status, |
| expected, |
| duration=None, |
| test_log=None, |
| test_loc=None, |
| tags=None, |
| file_artifacts=None): |
| """Composes the test_result dict item to be posted to result sink. |
| |
| Args: |
| test_id: (str) A unique identifier of the test in LUCI context. |
| status: (str) Status of the test. Must be one in |VALID_STATUSES|. |
| duration: (int) Test duration in milliseconds or None if unknown. |
| expected: (bool) Whether the status is expected. |
| test_log: (str) Log of the test. Optional. |
| tags: (list) List of tags. Each item in list should be a length 2 tuple of |
| string as ("key", "value"). Optional. |
| test_loc: (dict): Test location metadata as described in |
| https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/proto/v1/test_metadata.proto;l=32;drc=37488404d1c8aa8fccca8caae4809ece08828bae |
| file_artifacts: (dict) IDs to abs paths mapping of existing files to |
| report as artifact. |
| |
| Returns: |
| A dict of test results with input information, conforming to |
| https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/sink/proto/v1/test_result.proto |
| """ |
| tags = tags or [] |
| file_artifacts = file_artifacts or {} |
| |
| assert status in VALID_STATUSES, ( |
| '%s is not a valid status (one in %s) for ResultSink.' % |
| (status, VALID_STATUSES)) |
| |
| for tag in tags: |
| assert len(tag) == 2, 'Items in tags should be length 2 tuples of strings' |
| assert isinstance(tag[0], str) and isinstance( |
| tag[1], str), ('Items in' |
| 'tags should be length 2 tuples of strings') |
| |
| test_result = { |
| 'testId': test_id, |
| 'status': status, |
| 'expected': expected, |
| 'tags': [{ |
| 'key': key, |
| 'value': value |
| } for (key, value) in tags], |
| 'testIdStructured': _get_struct_test_dict(test_id), |
| 'testMetadata': { |
| 'name': test_id, |
| 'location': test_loc, |
| } |
| } |
| |
| test_result['artifacts'] = { |
| name: { |
| 'filePath': file_artifacts[name] |
| } for name in file_artifacts |
| } |
| if test_log: |
| message = '' |
| if sys.version_info.major < 3: |
| message = base64.b64encode(test_log) |
| else: |
| # Python3 b64encode takes and returns bytes. The result must be |
| # serializable in order for the eventual json.dumps to succeed |
| message = base64.b64encode(test_log.encode('utf-8')).decode('utf-8') |
| test_result['summaryHtml'] = '<text-artifact artifact-id="Test Log" />' |
| test_result['artifacts'].update({ |
| 'Test Log': { |
| 'contents': message |
| }, |
| }) |
| # assign primary error message if the host app crashed |
| if constants.CRASH_MESSAGE in test_log: |
| primary_error_message = constants.CRASH_MESSAGE |
| if constants.ASAN_ERROR in test_log: |
| primary_error_message += f' {constants.ASAN_ERROR}' |
| test_result['failureReason'] = { |
| 'primaryErrorMessage': primary_error_message |
| } |
| if not test_result['artifacts']: |
| test_result.pop('artifacts') |
| |
| if duration: |
| test_result['duration'] = '%.9fs' % (duration / 1000.0) |
| |
| return test_result |
| |
| |
| def _get_struct_test_dict(test_id): |
| """Returns a structured_test_dict with filled in fields. |
| |
| Args: |
| test_id: A string of the test_id. |
| |
| Returns: |
| A dictionary with the struct fields filled in. |
| """ |
| # Source comes from: |
| # infra/go/src/go.chromium.org/luci/resultdb/sink/proto/v1/test_result.proto |
| struct_test_dict = { |
| 'coarseName': None, # Not used for gtests or xctests. |
| 'fineName': None, |
| 'caseNameComponents': [''], |
| } |
| |
| found_match = False |
| # We may encounter gtests or XCTests which are parsed differently. |
| # Attempt to parse gtests based on: |
| # infra/go/src/infra/tools/result_adapter/gtest.go |
| # Type-parameterised test (e.g. MyInstantiation/FooTest/MyType.DoesBar) |
| re_match = re.search(r'^((\w+)/)?(\w+)/(\w+)\.(\w+)$', test_id) |
| if re_match: |
| suite = re_match.group(3) |
| name = re_match.group(5) |
| instantiation = re_match.group(2) |
| case_id = re_match.group(4) |
| found_match = True |
| |
| # Value-parameterised test (e.g. MyInstantiation/FooTest.DoesBar/TestValue) |
| re_match = re.search(r'^((\w+)/)?(\w+)\.(\w+)/(\w+)$', test_id) |
| if not found_match and re_match: |
| suite = re_match.group(3) |
| name = re_match.group(4) |
| instantiation = re_match.group(2) |
| case_id = re_match.group(5) |
| found_match = True |
| |
| # Neither type nor value-parameterised (e.g. FooTest.DoesBar) |
| re_match = re.search(r'^(\w+)\.(\w+)$', test_id) |
| if not found_match and re_match: |
| suite = re_match.group(1) |
| name = re_match.group(2) |
| instantiation = "" |
| case_id = "" |
| found_match = True |
| |
| if found_match: |
| struct_test_dict['fineName'] = suite |
| if not case_id: |
| struct_test_dict['caseNameComponents'] = [name] |
| elif not instantiation: |
| struct_test_dict['caseNameComponents'] = ['%s/%s' % (name, case_id)] |
| else: |
| struct_test_dict['caseNameComponents'] = ['%s/%s.%s' % (name, instantiation, case_id)] |
| |
| # XCTests format. |
| re_match = re.search(r'(.*)/(.*)', test_id) |
| if not found_match and re_match: |
| struct_test_dict['fineName'] = re_match.group(1) |
| struct_test_dict['caseNameComponents'] = [re_match.group(2)] |
| found_match = True |
| |
| if not found_match: |
| logging.error( |
| 'Test id %s did not match known format, so could not be parsed.' % |
| test_id) |
| |
| return struct_test_dict |
| |
| |
| def _to_camel_case(s): |
| """Converts the string s from snake_case to lowerCamelCase.""" |
| |
| elems = s.split('_') |
| return elems[0] + ''.join(elem.capitalize() for elem in elems[1:]) |
| |
| |
| class ResultSinkClient(object): |
| """Stores constants and handles posting to ResultSink.""" |
| |
| def __init__(self): |
| """Initiates and stores constants to class.""" |
| self.sink = None |
| luci_context_file = os.environ.get('LUCI_CONTEXT') |
| if not luci_context_file: |
| logging.warning('LUCI_CONTEXT not found in environment. ResultDB' |
| ' integration disabled.') |
| return |
| |
| with open(luci_context_file) as f: |
| self.sink = json.load(f).get('result_sink') |
| if not self.sink: |
| logging.warning('ResultSink constants not found in LUCI context.' |
| ' ResultDB integration disabled.') |
| return |
| |
| self.url = ('http://%s/prpc/luci.resultsink.v1.Sink/ReportTestResults' % |
| self.sink['address']) |
| self.headers = { |
| 'Content-Type': 'application/json', |
| 'Accept': 'application/json', |
| 'Authorization': 'ResultSink %s' % self.sink['auth_token'], |
| } |
| self._session = requests.Session() |
| |
| # Ensure session is closed at exit. |
| atexit.register(self.close) |
| |
| logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING) |
| |
| def close(self): |
| """Closes the connection to result sink server.""" |
| if not self.sink: |
| return |
| LOGGER.info('Closing connection with result sink server.') |
| # Reset to default logging level of test runner scripts. |
| logging.getLogger("urllib3.connectionpool").setLevel(logging.DEBUG) |
| self._session.close() |
| |
| def post(self, test_id, status, expected, **kwargs): |
| """Composes and posts a test and status to result sink. |
| |
| Args: |
| test_id: (str) A unique identifier of the test in LUCI context. |
| status: (str) Status of the test. Must be one in |VALID_STATUSES|. |
| expected: (bool) Whether the status is expected. |
| **kwargs: Optional keyword args. Namely: |
| duration: (int) Test duration in milliseconds or None if unknown. |
| test_log: (str) Log of the test. Optional. |
| tags: (list) List of tags. Each item in list should be a length 2 tuple |
| of string as ("key", "value"). Optional. |
| file_artifacts: (dict) IDs to abs paths mapping of existing files to |
| report as artifact. |
| """ |
| if not self.sink: |
| return |
| self._post_test_result( |
| _compose_test_result(test_id, status, expected, **kwargs)) |
| |
| def _post_test_result(self, test_result): |
| """Posts single test result to server. |
| |
| This method assumes |self.sink| is not None. |
| |
| Args: |
| test_result: (dict) Confirming to protocol defined in |
| https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/sink/proto/v1/test_result.proto |
| """ |
| res = self._session.post( |
| url=self.url, |
| headers=self.headers, |
| data=json.dumps({'testResults': [test_result]}), |
| ) |
| res.raise_for_status() |
| |
| def post_extended_properties(self): |
| """Posts extended properties to server with retry. |
| """ |
| if not self.sink: |
| return |
| try_count = 0 |
| try_count_max = 2 |
| while try_count < try_count_max: |
| try_count += 1 |
| try: |
| self._post_extended_properties() |
| break |
| except Exception as e: |
| logging.error("Got error %s when uploading extended properties.", e) |
| if try_count < try_count_max: |
| # Upload can fail due to record size being too big. In this case, |
| # report just the upload failure. |
| exception_recorder.clear() |
| measures.clear() |
| exception_recorder.register(e) |
| else: |
| # Swallow the exception if the upload fails again and hit the max |
| # try so that it won't fail the test task (and it shouldn't). |
| logging.error("Hit max retry. Skip uploading extended properties.") |
| |
| def _post_extended_properties(self): |
| """Posts extended properties to server. |
| |
| Assumes self.sink has been initialized. |
| |
| Packages exception_occurrences_pb2 and test_script_metrics_pb2 and sends an |
| UpdateInvocation post request to result sink. |
| """ |
| invocation = {EXTENDED_PROPERTIES_KEY: {}} |
| paths = [] |
| |
| # Sink server by default decodes payload with protojson, i.e. codecJSONV2 |
| # in https://source.chromium.org/search?q=f:server.go%20func:requestCodec |
| # which requires loweCamelCase names in the json request. |
| # For the value for update mask, see "JSON Encoding of Field Masks" in |
| # https://protobuf.dev/reference/protobuf/google.protobuf/#field-masks |
| if exception_recorder.size() > 0: |
| invocation[EXTENDED_PROPERTIES_KEY][ |
| exception_recorder.EXCEPTION_OCCURRENCES_KEY] = \ |
| exception_recorder.to_dict() |
| paths.append('%s.%s' % (EXTENDED_PROPERTIES_KEY, |
| _to_camel_case(exception_recorder.EXCEPTION_OCCURRENCES_KEY))) |
| |
| if measures.size() > 0: |
| invocation[EXTENDED_PROPERTIES_KEY][measures.TEST_SCRIPT_METRICS_KEY] = \ |
| measures.to_dict() |
| paths.append('%s.%s' % |
| (EXTENDED_PROPERTIES_KEY, _to_camel_case(measures.TEST_SCRIPT_METRICS_KEY))) |
| |
| req = {'invocation': invocation, 'updateMask': ','.join(paths)} |
| |
| inv_data = json.dumps(req, sort_keys=True) |
| |
| LOGGER.info(inv_data) |
| |
| updateInvo_url = ( |
| 'http://%s/prpc/luci.resultsink.v1.Sink/UpdateInvocation' % |
| self.sink['address']) |
| res = self._session.post( |
| url=updateInvo_url, |
| headers=self.headers, |
| data=inv_data, |
| ) |
| res.raise_for_status() |