| # Copyright 2022 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from __future__ import annotations |
| |
| import argparse |
| import contextlib |
| import logging |
| import sys |
| import traceback as tb |
| from dataclasses import dataclass |
| from types import TracebackType |
| from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type |
| |
| from crossbench import helper |
| from crossbench.types import JsonList |
| |
| if TYPE_CHECKING: |
| from crossbench.types import JsonDict |
| |
| TInfoStack = Tuple[str, ...] |
| |
| TExceptionTypes = Tuple[Type[BaseException], ...] |
| |
| |
| @dataclass |
| class Entry: |
| traceback: List[str] |
| exception: BaseException |
| info_stack: TInfoStack |
| |
| |
| class MultiException(ValueError): |
| """Default exception thrown by ExceptionAnnotator.assert_success. |
| It holds on to the ExceptionAnnotator and its previously captured exceptions |
| are automatically added to active ExceptionAnnotator in an |
| ExceptionAnnotationScope.""" |
| |
| def __init__(self, message: str, exceptions: ExceptionAnnotator): |
| super().__init__(message) |
| self.exceptions = exceptions |
| |
| def __len__(self) -> int: |
| return len(self.exceptions) |
| |
| def matching(self, *args: Type[BaseException]) -> List[BaseException]: |
| return self.exceptions.matching(*args) |
| |
| @property |
| def annotator(self) -> ExceptionAnnotator: |
| return self.exceptions |
| |
| |
| class ExceptionAnnotationScope: |
| """Used in a with-scope to annotate exceptions with a TInfoStack. |
| |
| Used via the capture/annotate/info helper methods on |
| ExceptionAnnotator. |
| """ |
| |
| def __init__( |
| self, |
| annotator: ExceptionAnnotator, |
| exception_types: TExceptionTypes, |
| ignore_exception_types: TExceptionTypes, |
| entries: Tuple[str, ...], |
| throw_cls: Optional[Type[BaseException]] = None, |
| ) -> None: |
| logging.debug("ExceptionAnnotationScope: %s", entries) |
| self._annotator = annotator |
| self._exception_types = exception_types |
| self._ignore_exception_types = ignore_exception_types + ( |
| StopIteration, GeneratorExit, StopAsyncIteration) |
| self._ignore_exception_types = ignore_exception_types |
| self._added_info_stack_entries = entries |
| self._throw_cls: Optional[Type[BaseException]] = throw_cls |
| self._previous_info_stack: TInfoStack = () |
| |
| def __enter__(self) -> ExceptionAnnotationScope: |
| self._annotator._pending_exceptions.clear() |
| self._previous_info_stack = self._annotator.info_stack |
| self._annotator._info_stack = self._previous_info_stack + ( |
| self._added_info_stack_entries) |
| return self |
| |
| def __exit__(self, exception_type: Optional[Type[BaseException]], |
| exception_value: Optional[BaseException], |
| traceback: Optional[TracebackType]) -> bool: |
| if not exception_value or not exception_type: |
| self._annotator._info_stack = self._previous_info_stack |
| # False => exception not handled |
| return False |
| if issubclass(exception_type, self._ignore_exception_types) and ( |
| not issubclass(exception_type, MultiException)): |
| self._annotator._info_stack = self._previous_info_stack |
| # False => exception not handled, directly forward |
| return False |
| logging.debug("Intermediate Exception: %s:%s", exception_type, |
| exception_value) |
| if self._exception_types and exception_type and ( |
| issubclass(exception_type, MultiException) or |
| issubclass(exception_type, self._exception_types)): |
| # Handle matching exceptions directly here and prevent further |
| # exception handling by returning True. |
| self._annotator.append(exception_value) |
| self._annotator._info_stack = self._previous_info_stack |
| if self._throw_cls: |
| self._annotator.assert_success( |
| exception_cls=self._throw_cls, |
| log=False, |
| ) |
| return True |
| if exception_value not in self._annotator._pending_exceptions: |
| self._annotator._pending_exceptions[ |
| exception_value] = self._annotator.info_stack |
| # False => exception not handled |
| return False |
| |
| class ExceptionAnnotator: |
| """Collects exceptions with full backtraces and user-provided info stacks. |
| |
| Additional stack information is constructed from active |
| ExceptionAnnotationScopes. |
| """ |
| |
| def __init__(self, |
| throw: bool = False, |
| throw_cls: Optional[Type[BaseException]] = None) -> None: |
| self._exceptions: List[Entry] = [] |
| self.throw: bool = throw |
| self._throw_cls: Optional[Type[BaseException]] = throw_cls |
| # The info_stack adds additional meta information to handle exceptions. |
| # Unlike the source-based backtrace, this can contain dynamic information |
| # for easier debugging. |
| self._info_stack: TInfoStack = () |
| # Associates raised exception with the info_stack at that time for later |
| # use in the `handle` method. |
| # This is cleared whenever we enter a new ExceptionAnnotationScope. |
| self._pending_exceptions: Dict[BaseException, TInfoStack] = {} |
| |
| @property |
| def is_success(self) -> bool: |
| return len(self._exceptions) == 0 |
| |
| @property |
| def info_stack(self) -> TInfoStack: |
| return self._info_stack |
| |
| @property |
| def exceptions(self) -> List[Entry]: |
| return self._exceptions |
| |
| def __getitem__(self, key: Any) -> Entry: |
| if not isinstance(key, int): |
| raise TypeError(f"Expected int key, but got: {key}") |
| return self._exceptions[key] |
| |
| def __len__(self) -> int: |
| return len(self._exceptions) |
| |
| def matching(self, *args: Type[BaseException]) -> List[BaseException]: |
| result = [] |
| for entry in self._exceptions: |
| excption = entry.exception |
| if isinstance(excption, *args): |
| result.append(excption) |
| return result |
| |
| def assert_success(self, |
| message: Optional[str] = None, |
| exception_cls: Type[BaseException] = MultiException, |
| log: bool = True) -> None: |
| if self.is_success: |
| return |
| if log: |
| self.log() |
| if message is None: |
| message = "{}" |
| message = message.format(self) |
| if issubclass(exception_cls, MultiException): |
| exception = exception_cls(message, self) |
| raise exception |
| raise exception_cls(message) |
| |
| def info(self, *stack_entries: str) -> ExceptionAnnotationScope: |
| """Only sets info stack entries, exceptions are passed-through.""" |
| return ExceptionAnnotationScope(self, tuple(), tuple(), stack_entries) |
| |
| def capture( |
| self, |
| *stack_entries: str, |
| exceptions: TExceptionTypes = (Exception,), |
| ignore: TExceptionTypes = tuple(), |
| ) -> ExceptionAnnotationScope: |
| """Sets info stack entries and captures exceptions. |
| - Does not rethrow captured exceptions |
| - Does not directly throw a MultiExceptions, unless assert_success() |
| is called. """ |
| return ExceptionAnnotationScope(self, exceptions, ignore, stack_entries, |
| self._throw_cls) |
| |
| @contextlib.contextmanager |
| def annotate(self, |
| *stack_entries, |
| exceptions: TExceptionTypes = (Exception,), |
| ignore: TExceptionTypes = tuple()): |
| """Sets info stack entries and rethrows an annotated |
| MultiException by default .""" |
| with self.capture(*stack_entries, exceptions=exceptions, ignore=ignore): |
| yield self |
| self.assert_success() |
| |
| def extend(self, annotator: ExceptionAnnotator, |
| is_nested: bool = False) -> None: |
| if is_nested: |
| self._extend_with_prepended_stack_info(annotator) |
| else: |
| self._exceptions.extend(annotator.exceptions) |
| |
| def _extend_with_prepended_stack_info(self, |
| annotator: ExceptionAnnotator) -> None: |
| if annotator == self: |
| return |
| for entry in annotator.exceptions: |
| merged_info_stack = self.info_stack + entry.info_stack |
| merged_entry = Entry(entry.traceback, entry.exception, merged_info_stack) |
| self._exceptions.append(merged_entry) |
| |
| def append(self, exception: BaseException) -> None: |
| traceback_str = tb.format_exc() |
| logging.debug("Intermediate Exception %s:%s", type(exception), exception) |
| logging.debug(traceback_str) |
| traceback: List[str] = traceback_str.splitlines() |
| if isinstance(exception, KeyboardInterrupt): |
| # Fast exit on KeyboardInterrupts for a better user experience. |
| sys.exit(0) |
| if isinstance(exception, MultiException): |
| # Directly add exceptions from nested annotators. |
| self.extend(exception.exceptions, is_nested=True) |
| else: |
| stack = self.info_stack |
| if exception in self._pending_exceptions: |
| stack = self._pending_exceptions[exception] |
| self._exceptions.append(Entry(traceback, exception, stack)) |
| if self.throw: |
| raise # pylint: disable=misplaced-bare-raise |
| |
| def log(self) -> None: |
| if self.is_success: |
| return |
| logging.error("=" * 80) |
| logging.error("ERRORS occurred (1/%d):", len(self._exceptions)) |
| logging.error("=" * 80) |
| for entry in self._exceptions: |
| logging.debug(entry.exception) |
| logging.debug("\n".join(entry.traceback)) |
| logging.debug("-" * 80) |
| is_first_entry = True |
| grouped_entries: Dict[TInfoStack, List[Entry]] = helper.group_by( |
| self._exceptions, key=lambda entry: entry.info_stack, sort_key=None) |
| for info_stack, entries in grouped_entries.items(): |
| logging_level = logging.ERROR if is_first_entry else logging.DEBUG |
| is_first_entry = False |
| if info_stack: |
| info = "Info: " |
| joiner = "\n" + (" " * (len(info) - 2)) + "> " |
| message = f"{info}{joiner.join(info_stack)}" |
| logging.log(logging_level, message) |
| for entry in entries: |
| logging.log(logging_level, "- " * 40) |
| logging.log(logging_level, "Type: %s:", |
| helper.type_name(type(entry.exception))) |
| logging.log(logging_level, " %s", self.format_exception(entry)) |
| logging_level = logging.DEBUG |
| logging.log(logging_level, "-" * 80) |
| |
| def error_messages(self) -> List[str]: |
| return [self.format_exception(entry) for entry in self._exceptions] |
| |
| def to_json(self) -> JsonList: |
| return [{ |
| "info_stack": entry.info_stack, |
| "type": helper.type_name(type(entry.exception)), |
| "title": self.format_exception(entry), |
| "trace": entry.traceback |
| } for entry in self._exceptions] |
| |
| def format_exception(self, entry: Entry) -> str: |
| msg = str(entry.exception).strip() |
| # Try to print the source line for empty AssertionError |
| if not msg and isinstance(entry.exception, AssertionError): |
| return entry.traceback[-2].strip() |
| return msg |
| |
| def __str__(self) -> str: |
| if len(self._exceptions) == 1: |
| entry = self._exceptions[0] |
| stack = "\n\t".join(entry.info_stack) |
| return f"{stack}: {entry.exception}" |
| |
| return "\n".join( |
| f"{entry.info_stack}: {entry.exception}" for entry in self._exceptions) |
| |
| |
| # Expose simpler name |
| Annotator = ExceptionAnnotator |
| |
| def annotate( |
| *stack_entries: str, |
| exceptions: TExceptionTypes = (Exception,), |
| ignore: TExceptionTypes = tuple(), |
| throw_cls: Optional[Type[BaseException]] = MultiException |
| ) -> ExceptionAnnotationScope: |
| """Use to annotate an exception. |
| By default this will throw a MultiException which can keep track of |
| more annotations.""" |
| return ExceptionAnnotator(throw_cls=throw_cls).capture( |
| *stack_entries, exceptions=exceptions, ignore=ignore) |
| |
| |
| class ArgumentTypeMultiException(MultiException, argparse.ArgumentTypeError): |
| pass |
| |
| |
| def annotate_argparsing(*stack_entries: str, |
| exceptions: TExceptionTypes = (Exception,)): |
| """Use this to annotate argument parsing-related code blocks to get more |
| readable annotated exception back. |
| - Wraps multiple exception in an ArgumentTypeMultiException |
| - Single ArgumentTypeError are raised directly |
| """ |
| return annotate( |
| *stack_entries, |
| exceptions=exceptions, |
| throw_cls=ArgumentTypeMultiException) |
| |
| |
| class UnreachableError(RuntimeError): |
| """Used for making checker tools happy in places where it's not directly |
| obvious that we always return, for instance due to using one of the above |
| exception annotations that could in theory mute exceptions and create an |
| additional return path. |
| """ |
| |
| def __init__(self) -> None: |
| super().__init__("Unreachable Code") |