blob: 5ffed39d68fd88d498d6c6a5ea58c3fc3a2ce80b [file] [log] [blame]
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt
"""Control of and utilities for debugging."""
import contextlib
import inspect
import os
import re
import sys
try:
import _thread
except ImportError:
import thread as _thread
from coverage.misc import isolate_module
os = isolate_module(os)
# When debugging, it can be helpful to force some options, especially when
# debugging the configuration mechanisms you usually use to control debugging!
# This is a list of forced debugging options.
FORCED_DEBUG = []
# A hack for debugging testing in sub-processes.
_TEST_NAME_FILE = "" # "/tmp/covtest.txt"
class DebugControl(object):
"""Control and output for debugging."""
def __init__(self, options, output):
"""Configure the options and output file for debugging."""
self.options = options
self.output = output
self.suppress_callers = False
def __repr__(self):
return "<DebugControl options=%r output=%r>" % (self.options, self.output)
def should(self, option):
"""Decide whether to output debug information in category `option`."""
if option == "callers" and self.suppress_callers:
return False
return (option in self.options or option in FORCED_DEBUG)
@contextlib.contextmanager
def without_callers(self):
"""A context manager to prevent call stacks from being logged."""
old = self.suppress_callers
self.suppress_callers = True
try:
yield
finally:
self.suppress_callers = old
def write(self, msg):
"""Write a line of debug output.
`msg` is the line to write. A newline will be appended.
"""
if self.should('pid'):
msg = "pid %5d: %s" % (os.getpid(), msg)
self.output.write(msg+"\n")
if self.should('callers'):
dump_stack_frames(out=self.output, skip=1)
self.output.flush()
def write_formatted_info(self, header, info):
"""Write a sequence of (label,data) pairs nicely."""
self.write(info_header(header))
for line in info_formatter(info):
self.write(" %s" % line)
def info_header(label):
"""Make a nice header string."""
return "--{0:-<60s}".format(" "+label+" ")
def info_formatter(info):
"""Produce a sequence of formatted lines from info.
`info` is a sequence of pairs (label, data). The produced lines are
nicely formatted, ready to print.
"""
info = list(info)
if not info:
return
label_len = max(len(l) for l, _d in info)
for label, data in info:
if data == []:
data = "-none-"
if isinstance(data, (list, set, tuple)):
prefix = "%*s:" % (label_len, label)
for e in data:
yield "%*s %s" % (label_len+1, prefix, e)
prefix = ""
else:
yield "%*s: %s" % (label_len, label, data)
def short_stack(limit=None, skip=0):
"""Return a string summarizing the call stack.
The string is multi-line, with one line per stack frame. Each line shows
the function name, the file name, and the line number:
...
start_import_stop : /Users/ned/coverage/trunk/tests/coveragetest.py @95
import_local_file : /Users/ned/coverage/trunk/tests/coveragetest.py @81
import_local_file : /Users/ned/coverage/trunk/coverage/backward.py @159
...
`limit` is the number of frames to include, defaulting to all of them.
`skip` is the number of frames to skip, so that debugging functions can
call this and not be included in the result.
"""
stack = inspect.stack()[limit:skip:-1]
return "\n".join("%30s : %s @%d" % (t[3], t[1], t[2]) for t in stack)
def dump_stack_frames(limit=None, out=None, skip=0):
"""Print a summary of the stack to stdout, or some place else."""
out = out or sys.stdout
out.write(short_stack(limit=limit, skip=skip+1))
out.write("\n")
def short_id(id64):
"""Given a 64-bit id, make a shorter 16-bit one."""
id16 = 0
for offset in range(0, 64, 16):
id16 ^= id64 >> offset
return id16 & 0xFFFF
class DebugOutputFile(object): # pragma: debugging
"""A file-like object that includes pid and cwd information."""
def __init__(self, outfile):
self.outfile = outfile
self.cwd = None
SYS_MOD_NAME = '$coverage.debug.DebugOutputFile.the_one'
@classmethod
def the_one(cls):
"""Get the process-wide singleton DebugOutputFile."""
# Because of the way igor.py deletes and re-imports modules,
# this class can be defined more than once. But we really want
# a process-wide singleton. So stash it in sys.modules instead of
# on a class attribute. Yes, this is aggressively gross.
the_one = sys.modules.get(cls.SYS_MOD_NAME)
if the_one is None:
filename = os.environ.get("COVERAGE_LOG", "/tmp/covlog.txt")
sys.modules[cls.SYS_MOD_NAME] = the_one = cls(open(filename, "a"))
cmd = " ".join(getattr(sys, 'argv', ['???']))
the_one._write("New process: executable: %s\n" % (sys.executable,))
the_one._write("New process: cmd: %s\n" % (cmd,))
if hasattr(os, 'getppid'):
the_one._write("New process: parent pid: %s\n" % (os.getppid(),))
return the_one
def write(self, text, stack=False):
"""Just like file.write"""
cwd = os.getcwd()
if cwd != self.cwd:
self._write("cwd is now {0!r}\n".format(cwd))
self.cwd = cwd
self._write(text, stack=stack)
def _write(self, text, stack=False):
"""The raw text-writer, so that we can use it ourselves."""
# Thread ids are useful, but too long. Make a shorter one.
tid = "{0:04x}".format(short_id(_thread.get_ident()))
# Aspectlib prints stack traces, but includes its own frames. Scrub those out:
# <<< aspectlib/__init__.py:257:function_wrapper < igor.py:143:run_tests < ...
text = re.sub(r"(?<= )aspectlib/[^.]+\.py:\d+:\w+ < ", "", text)
self.outfile.write("{0:5d}.{1}: {2}".format(os.getpid(), tid, text))
self.outfile.flush()
if stack:
dump_stack_frames(out=self.outfile, skip=1)
def log(msg, stack=False): # pragma: debugging
"""Write a log message as forcefully as possible."""
DebugOutputFile.the_one().write(msg+"\n", stack=stack)
def enable_aspectlib_maybe(): # pragma: debugging
"""For debugging, we can use aspectlib to trace execution.
Define COVERAGE_ASPECTLIB to enable and configure aspectlib to trace
execution::
$ export COVERAGE_LOG=covaspect.txt
$ export COVERAGE_ASPECTLIB=coverage.Coverage:coverage.data.CoverageData
$ coverage run blah.py ...
This will trace all the public methods on Coverage and CoverageData,
writing the information to covaspect.txt.
"""
aspects = os.environ.get("COVERAGE_ASPECTLIB", "")
if not aspects:
return
import aspectlib # pylint: disable=import-error
import aspectlib.debug # pylint: disable=import-error
aspects_file = DebugOutputFile.the_one()
aspect_log = aspectlib.debug.log(
print_to=aspects_file, attributes=['id'], stacktrace=30, use_logging=False
)
public_methods = re.compile(r'^(__init__|[a-zA-Z].*)$')
for aspect in aspects.split(':'):
aspectlib.weave(aspect, aspect_log, methods=public_methods)