# blob: b93f5618ed3978c10b2fa59e8d916242d8921ee5 [file] [log] [blame]
# Copyright 2025 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module for reporting test results."""
import dataclasses
import logging
import pathlib
import queue
import sys
import threading
import constants
sys.path.insert(0, str(constants.CHROMIUM_SRC / 'build' / 'util'))
from lib.results import result_sink
from lib.results import result_types
# Timeout, in seconds, passed to each blocking get() on ResultThread's input
# queue. It bounds how long the thread waits for a result before re-checking
# whether it has been asked to shut down.
_RESULT_THREAD_POLLING_SLEEP_DURATION = 0.5
@dataclasses.dataclass
class TestResult:
    """Represents the result of a single test run.

    Instances order by `test_file`, so a collection of results can be sorted
    by test path.
    """
    # The path to the test file that was run.
    test_file: pathlib.Path
    # Whether the test ran successfully.
    success: bool
    # The duration of the test run in seconds.
    duration: float
    # Stdout/stderr of the test.
    test_log: str

    def __lt__(self, other: 'TestResult') -> bool:
        """Orders results by their test file path.

        Args:
            other: The result to compare against.

        Returns:
            True if this result's test file sorts before |other|'s, or
            NotImplemented if |other| is not a TestResult.
        """
        # Let Python try the reflected operation (or raise TypeError) instead
        # of failing with an AttributeError on an unrelated operand.
        if not isinstance(other, TestResult):
            return NotImplemented
        return self.test_file < other.test_file
class AtomicCounter:
    """An integer counter whose reads and increments are guarded by a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._counter = 0

    def get(self) -> int:
        """Returns the current count, read while holding the lock."""
        with self._lock:
            current = self._counter
        return current

    def increment(self) -> None:
        """Adds one to the count while holding the lock."""
        with self._lock:
            self._counter = self._counter + 1
def report_result(result_sink_client: result_sink.ResultSinkClient,
                  test_result: TestResult) -> None:
    """Posts a single test result through the given result sink client.

    The test is identified by its path relative to the Chromium src root,
    written with forward slashes.

    Args:
        result_sink_client: A ResultSinkClient to use for reporting.
        test_result: A TestResult instance containing the result to report.

    Raises:
        ValueError: If the test file is not located under
            constants.CHROMIUM_SRC (raised by pathlib's relative_to()).
    """
    # as_posix() already returns a str, so it can be used directly as the ID.
    test_id = test_result.test_file.relative_to(
        constants.CHROMIUM_SRC).as_posix()

    if test_result.success:
        status = result_types.PASS
    else:
        status = result_types.FAIL

    result_sink_client.Post(
        test_id=test_id,
        status=status,
        # TestResult stores seconds; scale by 1000 before posting.
        duration=test_result.duration * 1000,
        test_log=test_result.test_log,
        test_file=f'//{test_id}')
class ResultThread(threading.Thread):
    """Daemon thread that consumes test results from a queue and reports them.

    Producers put TestResult instances on `result_input_queue`. For each one
    the thread optionally echoes the test log to stdout, posts the result via
    the result sink client when one was obtained, logs a pass/fail line,
    forwards failed results to `failed_result_output_queue` and increments
    `total_results_reported`.

    An exception raised while processing is stored and ends the thread; use
    maybe_reraise_fatal_exception() to surface it to the caller.
    """

    def __init__(self, print_output_on_success: bool, **kwargs):
        """
        Args:
            print_output_on_success: If true, test logs will always be printed
                to stdout instead of only for failed tests.
            **kwargs: Forwarded to threading.Thread. `daemon` is always set to
                True here, so it must not be passed by the caller.
        """
        super().__init__(daemon=True, **kwargs)
        # Incoming TestResult instances to process.
        self.result_input_queue = queue.Queue()
        # Failed TestResult instances, forwarded after they are reported.
        self.failed_result_output_queue = queue.Queue()
        # Number of results fully processed so far.
        self.total_results_reported = AtomicCounter()
        self._print_output_on_success = print_output_on_success
        self._shutdown_event = threading.Event()
        self._result_sink_client = result_sink.TryInitClient()
        # Exception that terminated run(), if any.
        self._fatal_exception = None

    def run(self) -> None:
        """Processes results until shutdown, storing any fatal exception."""
        try:
            self._process_incoming_results_until_shutdown()
        except Exception as e:
            # Kept for maybe_reraise_fatal_exception() rather than re-raised
            # here, where it would only terminate this thread.
            self._fatal_exception = e

    def _process_incoming_results_until_shutdown(self) -> None:
        """Handles queued results one at a time until shutdown is requested.

        The shutdown event is checked once per loop iteration, so results
        still sitting in the input queue when shutdown is observed are left
        unprocessed.
        """
        while not self._shutdown_event.is_set():
            try:
                # Use a timeout so the shutdown event is re-checked
                # periodically even when no results arrive.
                test_result = self.result_input_queue.get(
                    timeout=_RESULT_THREAD_POLLING_SLEEP_DURATION)
            except queue.Empty:
                continue

            if not test_result.success or self._print_output_on_success:
                sys.stdout.write(test_result.test_log)

            # Only report when a client was obtained in __init__.
            if self._result_sink_client:
                report_result(self._result_sink_client, test_result)

            if test_result.success:
                logging.info('Test passed in %.2f seconds: %s',
                             test_result.duration, str(test_result.test_file))
            else:
                logging.warning('Test failed in %.2f seconds: %s',
                                test_result.duration,
                                str(test_result.test_file))
                self.failed_result_output_queue.put(test_result)

            # Incremented last, after all reporting for this result is done.
            self.total_results_reported.increment()

    def shutdown(self) -> None:
        """Tells the thread to shut down gracefully."""
        self._shutdown_event.set()

    def maybe_reraise_fatal_exception(self) -> None:
        """Reraises the fatal exception that caused the thread to die.

        No-op if no exception is stored.

        Raises:
            Exception: Whatever exception was stored by run().
        """
        # Compare against None explicitly: an exception type may define
        # __bool__/__len__ and evaluate as falsy, which must still be raised.
        if self._fatal_exception is not None:
            raise self._fatal_exception