|  | import functools | 
|  | import inspect | 
|  | import reprlib | 
|  | import sys | 
|  | import traceback | 
|  |  | 
|  | from . import constants | 
|  |  | 
|  |  | 
|  | def _get_function_source(func): | 
|  | func = inspect.unwrap(func) | 
|  | if inspect.isfunction(func): | 
|  | code = func.__code__ | 
|  | return (code.co_filename, code.co_firstlineno) | 
|  | if isinstance(func, functools.partial): | 
|  | return _get_function_source(func.func) | 
|  | if isinstance(func, functools.partialmethod): | 
|  | return _get_function_source(func.func) | 
|  | return None | 
|  |  | 
|  |  | 
|  | def _format_callback_source(func, args): | 
|  | func_repr = _format_callback(func, args, None) | 
|  | source = _get_function_source(func) | 
|  | if source: | 
|  | func_repr += f' at {source[0]}:{source[1]}' | 
|  | return func_repr | 
|  |  | 
|  |  | 
|  | def _format_args_and_kwargs(args, kwargs): | 
|  | """Format function arguments and keyword arguments. | 
|  |  | 
|  | Special case for a single parameter: ('hello',) is formatted as ('hello'). | 
|  | """ | 
|  | # use reprlib to limit the length of the output | 
|  | items = [] | 
|  | if args: | 
|  | items.extend(reprlib.repr(arg) for arg in args) | 
|  | if kwargs: | 
|  | items.extend(f'{k}={reprlib.repr(v)}' for k, v in kwargs.items()) | 
|  | return '({})'.format(', '.join(items)) | 
|  |  | 
|  |  | 
|  | def _format_callback(func, args, kwargs, suffix=''): | 
|  | if isinstance(func, functools.partial): | 
|  | suffix = _format_args_and_kwargs(args, kwargs) + suffix | 
|  | return _format_callback(func.func, func.args, func.keywords, suffix) | 
|  |  | 
|  | if hasattr(func, '__qualname__') and func.__qualname__: | 
|  | func_repr = func.__qualname__ | 
|  | elif hasattr(func, '__name__') and func.__name__: | 
|  | func_repr = func.__name__ | 
|  | else: | 
|  | func_repr = repr(func) | 
|  |  | 
|  | func_repr += _format_args_and_kwargs(args, kwargs) | 
|  | if suffix: | 
|  | func_repr += suffix | 
|  | return func_repr | 
|  |  | 
|  |  | 
|  | def extract_stack(f=None, limit=None): | 
|  | """Replacement for traceback.extract_stack() that only does the | 
|  | necessary work for asyncio debug mode. | 
|  | """ | 
|  | if f is None: | 
|  | f = sys._getframe().f_back | 
|  | if limit is None: | 
|  | # Limit the amount of work to a reasonable amount, as extract_stack() | 
|  | # can be called for each coroutine and future in debug mode. | 
|  | limit = constants.DEBUG_STACK_DEPTH | 
|  | stack = traceback.StackSummary.extract(traceback.walk_stack(f), | 
|  | limit=limit, | 
|  | lookup_lines=False) | 
|  | stack.reverse() | 
|  | return stack |