# blob: 7f11733a3cad2433da4fe0ebc92f822bfe626aef [file] [log] [blame]
# SPDX-License-Identifier: MIT
from __future__ import absolute_import, division, print_function
import platform
import sys
import threading
import types
import warnings
# Interpreter / version feature flags, computed once at import time.
PYPY = platform.python_implementation() == "PyPy"
PY2 = sys.version_info.major == 2
PY36 = sys.version_info >= (3, 6)
PY310 = sys.version_info >= (3, 10)
# HAS_F_STRINGS is simply an alias of the 3.6+ flag.
HAS_F_STRINGS = PY36
# Pick the mapping type used wherever an ordered dict is needed: the built-in
# dict on PyPy and on Python 3.6+ (presumably because it already keeps
# insertion order there), collections.OrderedDict everywhere else.
if not (PYPY or PY36):
    from collections import OrderedDict

    ordered_dict = OrderedDict
else:
    ordered_dict = dict
if PY2:
from collections import Mapping, Sequence
from UserDict import IterableUserDict
# ``isclass`` is bundled here instead of being taken from ``inspect`` because
# importing inspect is fairly expensive (order of 10-15 ms for a modern
# machine in 2016).
def isclass(klass):
    """
    Return True if *klass* is an instance of ``type`` or ``types.ClassType``.
    """
    class_types = (type, types.ClassType)
    return isinstance(klass, class_types)
def new_class(name, bases, kwds, exec_body):
    """
    A minimal stub of types.new_class that we need for make_class.

    *exec_body* is called with a fresh, empty dict to populate; that dict
    then becomes the namespace of the class created through
    ``type(name, bases, namespace)``. *kwds* is accepted but not used.
    """
    namespace = dict()
    exec_body(namespace)
    created = type(name, bases, namespace)
    return created
# TYPE is used in exceptions. It exists because repr(int) is different on
# Python 2 and 3; this is the Python 2 spelling.
TYPE = "type"
def iteritems(d):
    """
    Return whatever ``d.iteritems()`` yields, i.e. the items of *d*.
    """
    pairs = d.iteritems()
    return pairs
# Python 2 has no read-only dict proxy of its own, so we make one!
class ReadOnlyDict(IterableUserDict):
    """
    Best-effort read-only dict wrapper.

    Every mutating method raises, gently pretending to be a Python 3
    mappingproxy.
    """

    def __setitem__(self, key, val):
        # Pretend to be a Python 3 mappingproxy.
        message = "'mappingproxy' object does not support item assignment"
        raise TypeError(message)

    def update(self, _):
        # Pretend to be a Python 3 mappingproxy.
        message = "'mappingproxy' object has no attribute 'update'"
        raise AttributeError(message)

    def __delitem__(self, _):
        # Pretend to be a Python 3 mappingproxy.
        message = "'mappingproxy' object does not support item deletion"
        raise TypeError(message)

    def clear(self):
        # Pretend to be a Python 3 mappingproxy.
        message = "'mappingproxy' object has no attribute 'clear'"
        raise AttributeError(message)

    def pop(self, key, default=None):
        # Pretend to be a Python 3 mappingproxy.
        message = "'mappingproxy' object has no attribute 'pop'"
        raise AttributeError(message)

    def popitem(self):
        # Pretend to be a Python 3 mappingproxy.
        message = "'mappingproxy' object has no attribute 'popitem'"
        raise AttributeError(message)

    def setdefault(self, key, default=None):
        # Pretend to be a Python 3 mappingproxy.
        message = "'mappingproxy' object has no attribute 'setdefault'"
        raise AttributeError(message)

    def __repr__(self):
        # Overridden to be identical to the Python 3 version.
        return "".join(("mappingproxy(", repr(self.data), ")"))
def metadata_proxy(d):
    """
    Return a new ReadOnlyDict filled with the contents of *d*.
    """
    proxy = ReadOnlyDict()
    # ReadOnlyDict.update() is blocked, so fill the underlying dict directly.
    proxy.data.update(d)
    return proxy
def just_warn(*args, **kw):  # pragma: no cover
    """
    Accept any arguments and do nothing.

    We only warn on Python 3 because we are not aware of any concrete
    consequences of not setting the cell on Python 2.
    """
    return None
class _AnnotationExtractor:
"""
Always return None, allows to keep ``if PY2``s from code.
"""
__slots__ = ["sig"]
sig = None
def __init__(self, callable):
pass
def get_first_param_type(self):
return None
def get_return_type(self):
return None
else: # Python 3 and later.
import inspect
from collections.abc import Mapping, Sequence # noqa
def just_warn(*args, **kw):
    """
    Ignore all arguments and emit a RuntimeWarning about missing code object
    introspection support.

    We only warn on Python 3 because we are not aware of any concrete
    consequences of not setting the cell on Python 2.
    """
    message = (
        "Running interpreter doesn't sufficiently support code object "
        "introspection. Some features like bare super() or accessing "
        "__class__ will not work with slotted classes."
    )
    # stacklevel=2 attributes the warning to our caller, not to this helper.
    warnings.warn(message, RuntimeWarning, stacklevel=2)
def isclass(klass):
    """
    Return True if *klass* is a class, i.e. an instance of ``type``.
    """
    is_type_instance = isinstance(klass, type)
    return is_type_instance
# TYPE is used in exceptions; this is the Python 3 spelling.
TYPE = "class"
def iteritems(d):
    """
    Return the items view of *d* as produced by ``d.items()``.
    """
    items = d.items()
    return items
# The standard library implementation is used as-is here.
new_class = types.new_class
def metadata_proxy(d):
    """
    Return a ``types.MappingProxyType`` wrapping a fresh dict built from *d*.

    Because the proxy wraps a copy, later changes to *d* are not visible
    through it.
    """
    snapshot = dict(d)
    return types.MappingProxyType(snapshot)
class _AnnotationExtractor:
"""
Extract type annotations from a callable, returning None whenever there
is none.
"""
__slots__ = ["sig"]
def __init__(self, callable):
try:
self.sig = inspect.signature(callable)
except (ValueError, TypeError): # inspect failed
self.sig = None
def get_first_param_type(self):
"""
Return the type annotation of the first argument if it's not empty.
"""
if not self.sig:
return None
params = list(self.sig.parameters.values())
if params and params[0].annotation is not inspect.Parameter.empty:
return params[0].annotation
return None
def get_return_type(self):
"""
Return the return type if it's not empty.
"""
if (
self.sig
and self.sig.return_annotation is not inspect.Signature.empty
):
return self.sig.return_annotation
return None
def make_set_closure_cell():
    """Return a function of two arguments (cell, value) which sets
    the value stored in the closure cell `cell` to `value`.

    The strategy depends on the running interpreter:

    * PyPy: call ``cell.__setstate__``.
    * Python 3.8+: assign to ``cell.cell_contents`` directly.
    * anything older: build a helper code object whose first free variable
      is bound to the target cell and call it.

    For the non-PyPy strategies the chosen setter is tried once against a
    throwaway cell. If anything raises, or the cell does not end up holding
    the new value, `just_warn` is returned instead of a setter.
    """
    # pypy makes this easy. (It also supports the logic below, but
    # why not do the easy/fast thing?)
    if PYPY:

        def set_closure_cell(cell, value):
            cell.__setstate__((value,))

        return set_closure_cell

    try:
        if sys.version_info >= (3, 8):
            # cell_contents is writable on these versions, so no code object
            # surgery is needed. The code object swap used below depends on
            # interpreter internals and reportedly no longer passes the
            # self-test on newer CPython releases (3.11+), which would
            # silently degrade to just_warn.
            def set_closure_cell(cell, value):
                cell.cell_contents = value

        else:
            # Otherwise gotta do it the hard way.

            # Create a function that will set its first cellvar to `value`.
            def set_first_cellvar_to(value):
                x = value
                return

                # This function will be eliminated as dead code, but
                # not before its reference to `x` forces `x` to be
                # represented as a closure cell rather than a local.
                def force_x_to_be_a_cell():  # pragma: no cover
                    return x

            # Extract the code object and make sure our assumptions about
            # the closure behavior are correct.
            if PY2:
                co = set_first_cellvar_to.func_code
            else:
                co = set_first_cellvar_to.__code__
            if co.co_cellvars != ("x",) or co.co_freevars != ():
                raise AssertionError  # pragma: no cover

            # Convert this code object to a code object that sets the
            # function's first _freevar_ (not cellvar) to the argument.
            # This branch only runs before 3.8, i.e. before CodeType grew
            # its posonlyargcount argument.
            args = [co.co_argcount]
            if not PY2:
                args.append(co.co_kwonlyargcount)
            args.extend(
                [
                    co.co_nlocals,
                    co.co_stacksize,
                    co.co_flags,
                    co.co_code,
                    co.co_consts,
                    co.co_names,
                    co.co_varnames,
                    co.co_filename,
                    co.co_name,
                    co.co_firstlineno,
                    co.co_lnotab,
                    # These two arguments are reversed:
                    co.co_cellvars,
                    co.co_freevars,
                ]
            )
            set_first_freevar_code = types.CodeType(*args)

            def set_closure_cell(cell, value):
                # Create a function using the set_first_freevar_code,
                # whose first closure cell is `cell`. Calling it will
                # change the value of that cell.
                setter = types.FunctionType(
                    set_first_freevar_code, {}, "setter", (), (cell,)
                )
                # And call it to set the cell.
                setter(value)

        # Make sure it works on this interpreter:
        def make_func_with_cell():
            x = None

            def func():
                return x  # pragma: no cover

            return func

        if PY2:
            cell = make_func_with_cell().func_closure[0]
        else:
            cell = make_func_with_cell().__closure__[0]
        set_closure_cell(cell, 100)
        if cell.cell_contents != 100:
            raise AssertionError  # pragma: no cover

    except Exception:
        # Deliberately broad: any failure above means this interpreter does
        # not support the trick, and the caller gets the warning fallback.
        return just_warn
    else:
        return set_closure_cell
# Resolved once, when this module is imported: either a working cell setter
# or just_warn, whichever make_set_closure_cell() returned.
set_closure_cell = make_set_closure_cell()
# Thread-local global to track attrs instances which are already being repr'd.
# This is needed because there is no other (thread-safe) way to pass info
# about the instances that are already being repr'd through the call stack
# in order to ensure we don't recurse infinitely.
#
# For instance, if an instance contains a dict which in turn contains that
# instance, we need to know that we're already repr'ing the outer instance
# from within the dict's repr() call.
#
# This lives here rather than in _make.py so that the functions in _make.py
# don't have a direct reference to the thread-local in their globals dict.
# If they had such a reference, it would break cloudpickle.
repr_context = threading.local()