blob: 35e263c2ae70e248ffcc386dc2b3498b8c686812 [file] [log] [blame]
# Copyright 2020 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""record warnings during recipe executions."""
import os
import types
from collections import defaultdict
from functools import cached_property
import attr
from .cause import CallSite, Frame, ImportSite
from .escape import escape_warning_predicate, IGNORE
from ..attr_util import attr_type, attr_seq_type
from ..recipe_deps import Recipe, RecipeDeps, RecipeModule
from ...engine_types import FrozenDict
from ...util import sentinel
# The sentinel that instructs recipe engine not to record warnings.
NULL_WARNING_RECORDER = sentinel('NULL_WARNING_RECORDER',
recorded_warnings=FrozenDict(),
record_execution_warning=(lambda _self, _name, _frames: None),
record_import_warning=(lambda _self, _name, _importer: None),
)
@attr.s(frozen=True, slots=True)
class _AnnotatedFrame:
"""A wrapper class over built-in frame which associates additional attributes
with the wrapped frame.
"""
# The wrapped frame
frame = attr.ib(validator=attr_type(types.FrameType))
# If set, the human-readable reason why the wrapped frame is skipped for the
# purposes of warning attribution. Examples:
# * 'user escape at /path/to/file:123
# * 'python built-in'
skip_reason = attr.ib(validator=attr.validators.optional(attr_type(str)))
@attr.s
class WarningRecorder:
"""A WarningRecorder records and analyzes warnings, preserves all unique
causes for a given warning.
There're two types of warnings; Execution warnings and Import warnings:
* Execution Warning: issued within the execution of recipe code.
* Import Warning: issued during dependency resolution (DEPS), when a recipe
or recipe module depends on a module with warning declared.
"""
# The RecipeDeps object for current recipe execution.
recipe_deps = attr.ib(validator=attr_type(RecipeDeps))
# Filter function that all execution warnings will be filtered through
# before storing. If the function returns False, the warning will be
# discarded. The function takes following two arguments and returns a bool.
# * name (str) - Fully qualified warning name e.g. 'repo/WARNING_NAME'
# * cause (warning_pb.Cause) - Cause of the warning
call_site_filter = attr.ib(default=lambda name, cause: True)
# Same functionality and function signature as call_site_filter but applies
# to import warnings.
import_site_filter = attr.ib(default=lambda name, cause: True)
# Boolean tells whether to preserve entire call stack for execution warning
# or not.
include_call_stack = attr.ib(validator=attr_type(bool), default=False)
# Internal holder for recorded warnings.
# key: fully qualified warning name (str)
# value: Set[CallSite|ImportSite] (defined in cause.py, not the proto message)
_recorded_warnings = attr.ib(init=False, factory=lambda: defaultdict(set))
@property
def recorded_warnings(self):
"""Returns all recorded warnings in the form of
{
"repo_name_1/WARNING_NAME_1": tuple[warning_pb.Cause]
"repo_name_2/WARNING_NAME_2": tuple[warning_pb.Cause]
}
cause inside the tuple is guaranteed to be unique for each warning.
"""
return {
name: tuple(site.cause_pb for site in sites)
for (name, sites) in self._recorded_warnings.items()
}
def record_execution_warning(self, name, frames):
"""Record the warning issued during recipe execution and its cause (
warning_pb.CallSite). A frame will be attributed as call site frame if it
is the first frame in the supplied frames matching the following
conditions:
* The source code of the frame is 'recipe code' (i.e. in the current
recipe repo or one of its dependencies).
* The function that the frame executes is not escaped from the issued
warning.
Args:
* name (str): Fully qualified warning name (e.g. repo_name/WARNING_NAME).
* frames (List[Frame]): List of frames captured at the time the given
warning is issued.
"""
self._validate_warning_name(name)
# TODO(yiwzhang): update proto to include skip reason and populate
call_site_frame, _ = self._attribute_call_site(name, frames)
if call_site_frame is IGNORE:
return
call_site = CallSite(
site=Frame.from_built_in_frame(call_site_frame) if (
call_site_frame) else Frame(),
)
# return if call_site_frame isn't in the main repo; We don't want to report
# warnings from other repos. It's possible to have a warning where ALL
# frames are skipped, so only do this check if we actually had an attributed
# call_site.
if call_site.site.file:
if not call_site.site.file.startswith(self._main_repo_paths):
return
if self.include_call_stack or not call_site_frame:
# Capture call stack if explicitly requested or attributing call site
# fails
call_site = attr.evolve(
call_site,
call_stack=[Frame.from_built_in_frame(f) for f in frames]
)
if (call_site not in self._recorded_warnings[name]) and (
self.call_site_filter(name, call_site.cause_pb)):
self._recorded_warnings[name].add(call_site)
def record_import_warning(self, name, importer):
"""Record the warning issued during DEPS resolution and its cause (
warning_pb.ImportSite).
Args:
* name (str): Fully qualified warning name (e.g. repo_name/WARNING_NAME).
* importer (Recipe|RecipeModule): The recipe or recipe module which
depends on a recipe module with given warning name declared.
Raise ValueError if the importer is not instance of Recipe or RecipeModule
"""
self._validate_warning_name(name)
if not isinstance(importer, (Recipe, RecipeModule)):
raise ValueError(
"Expect importer to be either type %s or %s. Got %s" % (
RecipeModule.__name__, Recipe.__name__, type(importer)))
# return if the import isn't from the main repo; We don't want to report
# warnings from other repos.
if importer.repo.name != self.recipe_deps.main_repo.name:
return
import_site = ImportSite(
repo=importer.repo.name,
module=importer.name if isinstance(importer, RecipeModule) else None,
recipe=importer.name if isinstance(importer, Recipe) else None,
)
if (import_site not in self._recorded_warnings[name]) and (
self.import_site_filter(name, import_site.cause_pb)):
self._recorded_warnings[name].add(import_site)
def _validate_warning_name(self, name):
"""Checks whether the given warning name is fully-qualified and defined in
the recipe repo.
"""
if '/' not in name:
raise ValueError('expected fully-qualified warning name, got %s' % name)
if name not in self.recipe_deps.warning_definitions:
repo, warning = name.split('/', 1)
raise ValueError(
'warning "%s" is not defined in recipe repo %s' % (warning, repo))
@cached_property
def _skip_frame_predicates(self):
"""A tuple of predicate functions to decide whether or not to skip a given
frame for warning attribution. The predicates are connected with logic OR,
meaning that if one of the predicates says to skip, the frame will be
skipped. A predicate function will have signature as follows.
Args:
* name (str) - Fully qualified warning name e.g. 'repo/WARNING_NAME'.
* frame (types.FrameType) - A frame in call stack that the predicate
function is currently evaluating against.
Returns a human-readable reason (str) why the given frame should be skipped.
Returns None if the warning can be attributed to the given frame.
Returns escape.IGNORE if the warning should be ignored.
"""
return (
self._non_recipe_code_predicate,
escape_warning_predicate
)
def _attribute_call_site(self, name, frames):
"""Walk up the given stack frames and attribute the first non-skipped frame
as call site. self._skip_frame_predicates is used to decide whether to skip
a frame or not.
Returns a tuple of (frame, List[AnnotatedFrames]) where frame is the
attributed call site and the annotated frames in the list are all skipped
frames with their skipped reasons. Call site frame will be returned as None
if all of the frames are skipped.
Returns (escape.IGNORE, escape.IGNORE) if the warning should be ignored.
"""
skipped_frames = []
for frame in frames:
lazy_skip_reasons = (p(name, frame) for p in self._skip_frame_predicates)
reason = next((r for r in lazy_skip_reasons if r is not None), None)
if reason is IGNORE:
return IGNORE, IGNORE
if reason is None:
return frame, skipped_frames # culprit found
skipped_frames.append(_AnnotatedFrame(frame=frame, skip_reason=reason))
return None, skipped_frames
@cached_property
def _main_repo_paths(self):
"""A tuple of root paths of all recipe code in the current recipe repo.
"""
return (
self.recipe_deps.main_repo.recipes_dir,
self.recipe_deps.main_repo.modules_dir,
)
@cached_property
def _all_repo_paths(self):
"""A tuple of root paths of all recipe code in the current executing
recipe deps.
"""
ret = []
for repo in self.recipe_deps.repos.values():
ret.append(repo.recipes_dir)
ret.append(repo.modules_dir)
return tuple(ret)
def _non_recipe_code_predicate(self, _name, frame):
"""A predicate that skips a frame when it is executing a code object whose
source is not in any of the recipe repos in the currently executing
recipe_deps.
"""
code_file_path = os.path.abspath(frame.f_code.co_filename)
for repo_path in self._all_repo_paths:
if code_file_path.startswith(repo_path):
return None
return 'non recipe code'