| # Copyright 2020 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import atexit |
| import base64 |
| import cgi |
| import json |
| import logging |
| import os |
| import requests |
| import sys |
| import traceback |
| |
| import constants |
| |
| # import protos for exceptions reporting |
| THIS_DIR = os.path.abspath(os.path.dirname(__file__)) |
| CHROMIUM_SRC_DIR = os.path.abspath(os.path.join(THIS_DIR, '../../../..')) |
| sys.path.append( |
| os.path.abspath(os.path.join(CHROMIUM_SRC_DIR, 'build/util/lib/proto'))) |
| import exception_occurrences_pb2 |
| |
| from google.protobuf import json_format |
| from google.protobuf import any_pb2 |
| |
| LOGGER = logging.getLogger(__name__) |
| # VALID_STATUSES is a list of valid status values for test_result['status']. |
| # The full list can be obtained at |
| # https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/proto/v1/test_result.proto;drc=ca12b9f52b27f064b0fa47c39baa3b011ffa5790;l=151-174 |
| VALID_STATUSES = {"PASS", "FAIL", "CRASH", "ABORT", "SKIP"} |
| |
| |
| def format_exception_stacktrace(e: Exception): |
| exception_trace = traceback.format_exception(type(e), e, e.__traceback__) |
| return exception_trace |
| |
| |
| def _compose_test_result(test_id, |
| status, |
| expected, |
| duration=None, |
| test_log=None, |
| test_loc=None, |
| tags=None, |
| file_artifacts=None): |
| """Composes the test_result dict item to be posted to result sink. |
| |
| Args: |
| test_id: (str) A unique identifier of the test in LUCI context. |
| status: (str) Status of the test. Must be one in |VALID_STATUSES|. |
| duration: (int) Test duration in milliseconds or None if unknown. |
| expected: (bool) Whether the status is expected. |
| test_log: (str) Log of the test. Optional. |
| tags: (list) List of tags. Each item in list should be a length 2 tuple of |
| string as ("key", "value"). Optional. |
| test_loc: (dict): Test location metadata as described in |
| https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/proto/v1/test_metadata.proto;l=32;drc=37488404d1c8aa8fccca8caae4809ece08828bae |
| file_artifacts: (dict) IDs to abs paths mapping of existing files to |
| report as artifact. |
| |
| Returns: |
| A dict of test results with input information, confirming to |
| https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/sink/proto/v1/test_result.proto |
| """ |
| tags = tags or [] |
| file_artifacts = file_artifacts or {} |
| |
| assert status in VALID_STATUSES, ( |
| '%s is not a valid status (one in %s) for ResultSink.' % |
| (status, VALID_STATUSES)) |
| |
| for tag in tags: |
| assert len(tag) == 2, 'Items in tags should be length 2 tuples of strings' |
| assert isinstance(tag[0], str) and isinstance( |
| tag[1], str), ('Items in' |
| 'tags should be length 2 tuples of strings') |
| |
| test_result = { |
| 'testId': test_id, |
| 'status': status, |
| 'expected': expected, |
| 'tags': [{ |
| 'key': key, |
| 'value': value |
| } for (key, value) in tags], |
| 'testMetadata': { |
| 'name': test_id, |
| 'location': test_loc, |
| } |
| } |
| |
| test_result['artifacts'] = { |
| name: { |
| 'filePath': file_artifacts[name] |
| } for name in file_artifacts |
| } |
| if test_log: |
| message = '' |
| if sys.version_info.major < 3: |
| message = base64.b64encode(test_log) |
| else: |
| # Python3 b64encode takes and returns bytes. The result must be |
| # serializable in order for the eventual json.dumps to succeed |
| message = base64.b64encode(test_log.encode('utf-8')).decode('utf-8') |
| test_result['summaryHtml'] = '<text-artifact artifact-id="Test Log" />' |
| if constants.CRASH_MESSAGE in test_log: |
| test_result['failureReason'] = { |
| 'primaryErrorMessage': constants.CRASH_MESSAGE |
| } |
| test_result['artifacts'].update({ |
| 'Test Log': { |
| 'contents': message |
| }, |
| }) |
| if not test_result['artifacts']: |
| test_result.pop('artifacts') |
| |
| if duration: |
| test_result['duration'] = '%.9fs' % (duration / 1000.0) |
| |
| return test_result |
| |
| |
| def _compose_exception_occurrence(exception): |
| """Composes the exception_occurrence item to be posted to result sink. |
| |
| Args: |
| exception: (Exception) the exception to be posted to result sink. |
| |
| Returns: |
| exception_occurrence: Conforming to protos defined in |
| //build/util/lib/proto/exception_occurrences.proto |
| """ |
| |
| occurrence = exception_occurrences_pb2.ExceptionOccurrence( |
| name=type(exception).__name__, |
| stacktrace=format_exception_stacktrace(exception), |
| ) |
| occurrence.occurred_time.GetCurrentTime() |
| return occurrence |
| |
| |
| class ExceptionResults: |
| results = [] # Static variable to hold exception results |
| |
| @staticmethod |
| def add_result(exception): |
| ExceptionResults.results.append(exception) |
| |
| |
| class ResultSinkClient(object): |
| """Stores constants and handles posting to ResultSink.""" |
| |
| def __init__(self): |
| """Initiates and stores constants to class.""" |
| self.sink = None |
| luci_context_file = os.environ.get('LUCI_CONTEXT') |
| if not luci_context_file: |
| logging.warning('LUCI_CONTEXT not found in environment. ResultDB' |
| ' integration disabled.') |
| return |
| |
| with open(luci_context_file) as f: |
| self.sink = json.load(f).get('result_sink') |
| if not self.sink: |
| logging.warning('ResultSink constants not found in LUCI context.' |
| ' ResultDB integration disabled.') |
| return |
| |
| self.url = ('http://%s/prpc/luci.resultsink.v1.Sink/ReportTestResults' % |
| self.sink['address']) |
| self.headers = { |
| 'Content-Type': 'application/json', |
| 'Accept': 'application/json', |
| 'Authorization': 'ResultSink %s' % self.sink['auth_token'], |
| } |
| self._session = requests.Session() |
| |
| # Ensure session is closed at exit. |
| atexit.register(self.close) |
| |
| logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING) |
| |
| def close(self): |
| """Closes the connection to result sink server.""" |
| if not self.sink: |
| return |
| LOGGER.info('Closing connection with result sink server.') |
| # Reset to default logging level of test runner scripts. |
| logging.getLogger("urllib3.connectionpool").setLevel(logging.DEBUG) |
| self._session.close() |
| |
| def post(self, test_id, status, expected, **kwargs): |
| """Composes and posts a test and status to result sink. |
| |
| Args: |
| test_id: (str) A unique identifier of the test in LUCI context. |
| status: (str) Status of the test. Must be one in |VALID_STATUSES|. |
| expected: (bool) Whether the status is expected. |
| **kwargs: Optional keyword args. Namely: |
| duration: (int) Test duration in milliseconds or None if unknown. |
| test_log: (str) Log of the test. Optional. |
| tags: (list) List of tags. Each item in list should be a length 2 tuple |
| of string as ("key", "value"). Optional. |
| file_artifacts: (dict) IDs to abs paths mapping of existing files to |
| report as artifact. |
| """ |
| if not self.sink: |
| return |
| self._post_test_result( |
| _compose_test_result(test_id, status, expected, **kwargs)) |
| |
| def post_exceptions(self, exceptions): |
| """Composes and posts exception result to server. |
| |
| Args: |
| exception: [Exception] list of exceptions to be posted to result sink. |
| """ |
| if not self.sink: |
| return |
| exception_occurrences = [] |
| for exception in exceptions: |
| exception_occurrences.append(_compose_exception_occurrence(exception)) |
| self._post_exceptions(exception_occurrences) |
| |
| def _post_test_result(self, test_result): |
| """Posts single test result to server. |
| |
| This method assumes |self.sink| is not None. |
| |
| Args: |
| test_result: (dict) Confirming to protocol defined in |
| https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/sink/proto/v1/test_result.proto |
| """ |
| res = self._session.post( |
| url=self.url, |
| headers=self.headers, |
| data=json.dumps({'testResults': [test_result]}), |
| ) |
| res.raise_for_status() |
| |
| def _post_exceptions(self, exception_occurrences): |
| """Posts exception result to server. |
| |
| This method assumes |self.sink| is not None. |
| |
| Args: |
| exception_occurrences: list of exception_occurrences, |
| conforming to protos defined in |
| //build/util/lib/proto/exception_occurrences.proto |
| """ |
| |
| occurrences = exception_occurrences_pb2.ExceptionOccurrences() |
| occurrences.datapoints.extend(exception_occurrences) |
| any_msg = any_pb2.Any() |
| any_msg.Pack(occurrences) |
| inv_data = json.dumps( |
| { |
| 'invocation': { |
| 'extended_properties': { |
| 'exception_occurrences': |
| json_format.MessageToDict( |
| any_msg, preserving_proto_field_name=True) |
| } |
| }, |
| 'update_mask': { |
| 'paths': ['extended_properties.exception_occurrences'], |
| } |
| }, |
| sort_keys=True) |
| |
| LOGGER.info(inv_data) |
| |
| updateInvo_url = ( |
| 'http://%s/prpc/luci.resultsink.v1.Sink/UpdateInvocation' % |
| self.sink['address']) |
| res = self._session.post( |
| url=updateInvo_url, |
| headers=self.headers, |
| data=inv_data, |
| ) |
| res.raise_for_status() |