blob: b8e26e81f57b21c6bf824394e11c5d0632cb2076 [file] [log] [blame]
# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
from dataclasses import dataclass
import logging
import sys
import traceback as tb
from types import TracebackType
from typing import Dict, List, Optional, Tuple, Type
from crossbench import helper
TInfoStack = Tuple[str, ...]
TExceptionTypes = Tuple[Type[BaseException], ...]
@dataclass
class Entry:
traceback: str
exception: BaseException
info_stack: TInfoStack
class MultiException(ValueError):
"""Default exception thrown by ExceptionAnnotator.assert_success.
It holds on to the ExceptionAnnotator and its previously captured exceptions
are automatically added to active ExceptionAnnotator in an
ExceptionAnnotationScope."""
def __init__(self, message: str, exceptions: ExceptionAnnotator):
super().__init__(message)
self.exceptions = exceptions
class ExceptionAnnotationScope:
"""Used in a with-scope to annotate exceptions with a TInfoStack.
Used via the capture/annotate/info helper methods on
ExceptionAnnotator.
"""
def __init__(self,
annotator: ExceptionAnnotator,
exception_types: TExceptionTypes,
entries: Tuple[str, ...],
rethrow: bool = False) -> None:
logging.debug("ExceptionAnnotationScope: %s", entries)
self._annotator = annotator
self._exception_types = exception_types
self._added_info_stack_entries = entries
self.rethrow = rethrow
self._previous_info_stack: TInfoStack = ()
def __enter__(self) -> ExceptionAnnotationScope:
self._annotator._pending_exceptions.clear()
self._previous_info_stack = self._annotator.info_stack
self._annotator._info_stack = self._previous_info_stack + (
self._added_info_stack_entries)
return self
def __exit__(self, exception_type: Optional[Type[BaseException]],
exception_value: Optional[BaseException],
traceback: Optional[TracebackType]) -> bool:
if not exception_value:
self._annotator._info_stack = self._previous_info_stack
# False => exception not handled
return False
logging.debug("Intermediate Exception: %s:%s", exception_type,
exception_value)
if self._exception_types and exception_type and (
issubclass(exception_type, MultiException) or
issubclass(exception_type, self._exception_types)):
# Handle matching exceptions directly here and prevent further
# exception handling by returning True.
self._annotator.append(exception_value)
self._annotator._info_stack = self._previous_info_stack
if self.rethrow:
self._annotator.assert_success(log=False)
return True
if exception_value not in self._annotator._pending_exceptions:
self._annotator._pending_exceptions[
exception_value] = self._annotator.info_stack
# False => exception not handled
return False
class ExceptionAnnotator:
"""Collects exceptions with full backtraces and user-provided info stacks.
Additional stack information is constructed from active
ExceptionAnnotationScopes.
"""
def __init__(self, throw: bool = False):
self._exceptions: List[Entry] = []
self.throw: bool = throw
# The info_stack adds additional meta information to handle exceptions.
# Unlike the source-based backtrace, this can contain dynamic information
# for easier debugging.
self._info_stack: TInfoStack = ()
# Associates raised exception with the info_stack at that time for later
# use in the `handle` method.
# This is cleared whenever we enter a new ExceptionAnnotationScope.
self._pending_exceptions: Dict[BaseException, TInfoStack] = {}
@property
def is_success(self) -> bool:
return len(self._exceptions) == 0
@property
def info_stack(self) -> TInfoStack:
return self._info_stack
@property
def exceptions(self) -> List[Entry]:
return self._exceptions
def assert_success(self,
message: Optional[str] = None,
exception_cls: Type[BaseException] = MultiException,
log: bool = True) -> None:
if self.is_success:
return
if log:
self.log()
if message is None:
message = "Got Exceptions: {}"
message = message.format(self)
if issubclass(exception_cls, MultiException):
raise exception_cls(message, self)
raise exception_cls(message)
def info(self, *stack_entries: str) -> ExceptionAnnotationScope:
"""Only sets info stack entries, exceptions are passed-through."""
return ExceptionAnnotationScope(self, tuple(), stack_entries)
def capture(self,
*stack_entries: str,
exceptions: TExceptionTypes = (Exception,),
rethrow: bool = False) -> ExceptionAnnotationScope:
"""Sets info stack entries and captures exceptions."""
return ExceptionAnnotationScope(self, exceptions, stack_entries, rethrow)
def extend(self, annotator: ExceptionAnnotator,
is_nested: bool = False) -> None:
if is_nested:
self._extend_with_prepended_stack_info(annotator)
else:
self._exceptions.extend(annotator.exceptions)
def _extend_with_prepended_stack_info(self,
annotator: ExceptionAnnotator) -> None:
if annotator == self:
return
for entry in annotator.exceptions:
merged_info_stack = self.info_stack + entry.info_stack
merged_entry = Entry(entry.traceback, entry.exception, merged_info_stack)
self._exceptions.append(merged_entry)
def append(self, exception: BaseException) -> None:
traceback: str = tb.format_exc()
logging.debug("Intermediate Exception %s:%s", type(exception), exception)
logging.debug(traceback)
if isinstance(exception, KeyboardInterrupt):
# Fast exit on KeyboardInterrupts for a better user experience.
sys.exit(0)
if isinstance(exception, MultiException):
# Directly add exceptions from nested annotators.
self.extend(exception.exceptions, is_nested=True)
else:
stack = self.info_stack
if exception in self._pending_exceptions:
stack = self._pending_exceptions[exception]
self._exceptions.append(Entry(traceback, exception, stack))
if self.throw:
raise # pylint: disable=misplaced-bare-raise
def log(self) -> None:
if self.is_success:
return
logging.error("=" * 80)
logging.error("ERRORS occurred:")
logging.error("=" * 80)
for entry in self._exceptions:
logging.debug(entry.exception)
logging.debug(entry.traceback)
logging.debug("-" * 80)
for info_stack, entries in helper.group_by(
self._exceptions, key=lambda entry: tuple(entry.info_stack)).items():
if info_stack:
info = "Info: "
joiner = "\n" + (" " * (len(info) - 2)) + "> "
message = f"{info}{joiner.join(info_stack)}"
logging.error(message)
for entry in entries:
logging.error("- " * 40)
logging.error("Type: %s:", entry.exception.__class__.__name__)
logging.error(" %s", entry.exception)
logging.error("-" * 80)
def to_json(self) -> list:
return [{
"title": str(entry.exception),
"trace": str(entry.traceback).splitlines(),
"info_stack": entry.info_stack
} for entry in self._exceptions]
def __str__(self) -> str:
return "\n".join(str(entry.exception) for entry in self._exceptions)
# Expose simpler name
Annotator = ExceptionAnnotator
def annotate(*stack_entries: str,
exceptions: TExceptionTypes = (Exception,),
throw: bool = False) -> ExceptionAnnotationScope:
return ExceptionAnnotator(throw=throw).capture(
*stack_entries, exceptions=exceptions, rethrow=True)