|  | from _weakrefset import WeakSet | 
|  |  | 
|  |  | 
def get_cache_token():
    """Return the current ABC cache token.

    The token is an opaque object (it supports equality testing) that
    identifies the current version of the ABC cache for virtual
    subclasses.  Its value is read from the counter that
    ``ABCMeta.register()`` bumps whenever it adds a new virtual subclass
    to any ABC.
    """
    token = ABCMeta._abc_invalidation_counter
    return token
|  |  | 
|  |  | 
class ABCMeta(type):
    """Metaclass for defining Abstract Base Classes (ABCs).

    Use this metaclass to create an ABC.  An ABC can be subclassed
    directly, and then acts as a mix-in class.  You can also register
    unrelated concrete classes (even built-in classes) and unrelated
    ABCs as 'virtual subclasses' -- these and their descendants will
    be considered subclasses of the registering ABC by the built-in
    issubclass() function, but the registering ABC won't show up in
    their MRO (Method Resolution Order) nor will method
    implementations defined by the registering ABC be callable (not
    even via super()).
    """

    # A global counter that is incremented each time a class is
    # registered as a virtual subclass of anything.  It forces the
    # negative cache to be cleared before its next use.
    # Note: this counter is private. Use `abc.get_cache_token()` for
    #       external code.
    _abc_invalidation_counter = 0

    def __new__(mcls, name, bases, namespace, **kwargs):
        """Create the class, then attach its abstract-method set and caches.

        The new class gets ``__abstractmethods__`` (a frozenset of names
        whose value carries a true ``__isabstractmethod__`` attribute),
        an empty registry, empty positive/negative caches, and a snapshot
        of the global invalidation counter.
        """
        cls = super().__new__(mcls, name, bases, namespace, **kwargs)
        # Abstract names declared directly in this class body.  Distinct
        # local names are used so the ``name`` parameter is not rebound.
        abstracts = {attr
                     for attr, value in namespace.items()
                     if getattr(value, "__isabstractmethod__", False)}
        # Abstract names inherited from the bases that are still abstract
        # when looked up on the new class (i.e. not overridden concretely).
        for base in bases:
            for attr in getattr(base, "__abstractmethods__", set()):
                value = getattr(cls, attr, None)
                if getattr(value, "__isabstractmethod__", False):
                    abstracts.add(attr)
        cls.__abstractmethods__ = frozenset(abstracts)
        # Set up inheritance registry
        cls._abc_registry = WeakSet()
        cls._abc_cache = WeakSet()
        cls._abc_negative_cache = WeakSet()
        cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter
        return cls

    def register(cls, subclass):
        """Register a virtual subclass of an ABC.

        Returns the subclass, to allow usage as a class decorator.

        Raises TypeError if ``subclass`` is not a class, and RuntimeError
        if ``cls`` is already a subclass of ``subclass`` (registering
        would create an inheritance cycle).
        """
        if not isinstance(subclass, type):
            raise TypeError("Can only register classes")
        if issubclass(subclass, cls):
            return subclass  # Already a subclass
        # Subtle: test for cycles *after* testing for "already a subclass";
        # this means we allow X.register(X) and interpret it as a no-op.
        if issubclass(cls, subclass):
            # This would create a cycle, which is bad for the algorithm below
            raise RuntimeError("Refusing to create an inheritance cycle")
        cls._abc_registry.add(subclass)
        ABCMeta._abc_invalidation_counter += 1  # Invalidate negative cache
        return subclass

    def _dump_registry(cls, file=None):
        """Debug helper to print the ABC registry.

        ``file`` is forwarded to ``print()``.  Every ``_abc_*`` entry in
        the class ``__dict__`` is printed; WeakSet values are converted
        to plain sets first.
        """
        print(f"Class: {cls.__module__}.{cls.__qualname__}", file=file)
        print(f"Inv. counter: {get_cache_token()}", file=file)
        for name in cls.__dict__:
            if name.startswith("_abc_"):
                value = getattr(cls, name)
                if isinstance(value, WeakSet):
                    value = set(value)
                print(f"{name}: {value!r}", file=file)

    def _abc_registry_clear(cls):
        """Clear the registry (for debugging or testing)."""
        cls._abc_registry.clear()

    def _abc_caches_clear(cls):
        """Clear the caches (for debugging or testing)."""
        cls._abc_cache.clear()
        cls._abc_negative_cache.clear()

    def __instancecheck__(cls, instance):
        """Override for isinstance(instance, cls)."""
        # Inline the cache checking
        subclass = instance.__class__
        if subclass in cls._abc_cache:
            return True
        subtype = type(instance)
        if subtype is subclass:
            # The negative cache is only trusted while its version still
            # matches the global invalidation counter.
            if (cls._abc_negative_cache_version ==
                    ABCMeta._abc_invalidation_counter and
                    subclass in cls._abc_negative_cache):
                return False
            # Fall back to the subclass check.
            return cls.__subclasscheck__(subclass)
        # ``__class__`` and ``type()`` disagree: accept if either passes.
        return any(cls.__subclasscheck__(c) for c in (subclass, subtype))

    def __subclasscheck__(cls, subclass):
        """Override for issubclass(subclass, cls).

        Raises TypeError if ``subclass`` is not a class, and
        AssertionError if ``cls.__subclasshook__`` returns anything other
        than True, False or NotImplemented.
        """
        if not isinstance(subclass, type):
            raise TypeError('issubclass() arg 1 must be a class')
        # Check cache
        if subclass in cls._abc_cache:
            return True
        # Check negative cache; may have to invalidate
        if cls._abc_negative_cache_version < ABCMeta._abc_invalidation_counter:
            # Invalidate the negative cache
            cls._abc_negative_cache = WeakSet()
            cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter
        elif subclass in cls._abc_negative_cache:
            return False
        # Check the subclass hook
        ok = cls.__subclasshook__(subclass)
        if ok is not NotImplemented:
            # Raise explicitly instead of using ``assert``: an assert is
            # stripped under ``python -O``, which would let a non-bool
            # hook result be cached and returned.  The exception type is
            # kept as AssertionError so existing callers are unaffected.
            if not isinstance(ok, bool):
                raise AssertionError(
                    "__subclasshook__ must return either False, True, "
                    "or NotImplemented")
            if ok:
                cls._abc_cache.add(subclass)
            else:
                cls._abc_negative_cache.add(subclass)
            return ok
        # Check if it's a direct subclass
        if cls in getattr(subclass, '__mro__', ()):
            cls._abc_cache.add(subclass)
            return True
        # Check if it's a subclass of a registered class (recursive)
        for rcls in cls._abc_registry:
            if issubclass(subclass, rcls):
                cls._abc_cache.add(subclass)
                return True
        # Check if it's a subclass of a subclass (recursive)
        for scls in cls.__subclasses__():
            if issubclass(subclass, scls):
                cls._abc_cache.add(subclass)
                return True
        # No dice; update negative cache
        cls._abc_negative_cache.add(subclass)
        return False