| from test.test_importlib import util |
| |
| abc = util.import_importlib('importlib.abc') |
| init = util.import_importlib('importlib') |
| machinery = util.import_importlib('importlib.machinery') |
| importlib_util = util.import_importlib('importlib.util') |
| |
| import importlib.util |
| from importlib import _bootstrap_external |
| import os |
| import pathlib |
| import string |
| import sys |
| from test import support |
| from test.support import os_helper |
| import textwrap |
| import types |
| import unittest |
| import unittest.mock |
| import warnings |
| |
| try: |
| import _testsinglephase |
| except ImportError: |
| _testsinglephase = None |
| try: |
| import _testmultiphase |
| except ImportError: |
| _testmultiphase = None |
| try: |
| import _interpreters |
| except ModuleNotFoundError: |
| _interpreters = None |
| |
| |
| class DecodeSourceBytesTests: |
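    """Tests importlib.util.decode_source()."""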
| |
| source = "string ='ΓΌ'" |
| |
    def test_utf8_default(self):
| source_bytes = self.source.encode('utf-8') |
| self.assertEqual(self.util.decode_source(source_bytes), self.source) |
| |
| def test_specified_encoding(self): |
| source = '# coding=latin-1\n' + self.source |
| source_bytes = source.encode('latin-1') |
| assert source_bytes != source.encode('utf-8') |
| self.assertEqual(self.util.decode_source(source_bytes), source) |
| |
| def test_universal_newlines(self): |
| source = '\r\n'.join([self.source, self.source]) |
| source_bytes = source.encode('utf-8') |
| self.assertEqual(self.util.decode_source(source_bytes), |
| '\n'.join([self.source, self.source])) |
| |
| |
| (Frozen_DecodeSourceBytesTests, |
| Source_DecodeSourceBytesTests |
| ) = util.test_both(DecodeSourceBytesTests, util=importlib_util) |
| |
| |
| class ModuleFromSpecTests: |
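    """Tests importlib.util.module_from_spec()."""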
| |
| def test_no_create_module(self): |
| class Loader: |
| def exec_module(self, module): |
| pass |
| spec = self.machinery.ModuleSpec('test', Loader()) |
| with self.assertRaises(ImportError): |
| module = self.util.module_from_spec(spec) |
| |
| def test_create_module_returns_None(self): |
| class Loader(self.abc.Loader): |
| def create_module(self, spec): |
| return None |
| spec = self.machinery.ModuleSpec('test', Loader()) |
| module = self.util.module_from_spec(spec) |
| self.assertIsInstance(module, types.ModuleType) |
| self.assertEqual(module.__name__, spec.name) |
| |
| def test_create_module(self): |
| name = 'already set' |
| class CustomModule(types.ModuleType): |
| pass |
| class Loader(self.abc.Loader): |
| def create_module(self, spec): |
| module = CustomModule(spec.name) |
| module.__name__ = name |
| return module |
| spec = self.machinery.ModuleSpec('test', Loader()) |
| module = self.util.module_from_spec(spec) |
| self.assertIsInstance(module, CustomModule) |
| self.assertEqual(module.__name__, name) |
| |
| def test___name__(self): |
| spec = self.machinery.ModuleSpec('test', object()) |
| module = self.util.module_from_spec(spec) |
| self.assertEqual(module.__name__, spec.name) |
| |
| def test___spec__(self): |
| spec = self.machinery.ModuleSpec('test', object()) |
| module = self.util.module_from_spec(spec) |
| self.assertEqual(module.__spec__, spec) |
| |
| def test___loader__(self): |
| loader = object() |
| spec = self.machinery.ModuleSpec('test', loader) |
| module = self.util.module_from_spec(spec) |
| self.assertIs(module.__loader__, loader) |
| |
| def test___package__(self): |
| spec = self.machinery.ModuleSpec('test.pkg', object()) |
| module = self.util.module_from_spec(spec) |
| self.assertEqual(module.__package__, spec.parent) |
| |
| def test___path__(self): |
| spec = self.machinery.ModuleSpec('test', object(), is_package=True) |
| module = self.util.module_from_spec(spec) |
| self.assertEqual(module.__path__, spec.submodule_search_locations) |
| |
| def test___file__(self): |
| spec = self.machinery.ModuleSpec('test', object(), origin='some/path') |
| spec.has_location = True |
| module = self.util.module_from_spec(spec) |
| self.assertEqual(module.__file__, spec.origin) |
| |
| |
| (Frozen_ModuleFromSpecTests, |
| Source_ModuleFromSpecTests |
| ) = util.test_both(ModuleFromSpecTests, abc=abc, machinery=machinery, |
| util=importlib_util) |
| |
| |
| class ResolveNameTests: |
| |
| """Tests importlib.util.resolve_name().""" |
| |
| def test_absolute(self): |
| # bacon |
| self.assertEqual('bacon', self.util.resolve_name('bacon', None)) |
| |
| def test_absolute_within_package(self): |
| # bacon in spam |
| self.assertEqual('bacon', self.util.resolve_name('bacon', 'spam')) |
| |
| def test_no_package(self): |
| # .bacon in '' |
| with self.assertRaises(ImportError): |
| self.util.resolve_name('.bacon', '') |
| |
| def test_in_package(self): |
| # .bacon in spam |
| self.assertEqual('spam.eggs.bacon', |
| self.util.resolve_name('.bacon', 'spam.eggs')) |
| |
| def test_other_package(self): |
| # ..bacon in spam.bacon |
| self.assertEqual('spam.bacon', |
| self.util.resolve_name('..bacon', 'spam.eggs')) |
| |
| def test_escape(self): |
| # ..bacon in spam |
| with self.assertRaises(ImportError): |
| self.util.resolve_name('..bacon', 'spam') |
| |
| |
| (Frozen_ResolveNameTests, |
| Source_ResolveNameTests |
| ) = util.test_both(ResolveNameTests, util=importlib_util) |
| |
| |
| class FindSpecTests: |
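    """Tests importlib.util.find_spec()."""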
| |
| class FakeMetaFinder: |
| @staticmethod |
        def find_spec(name, path=None, target=None):
            return name, path, target
| |
| def test_sys_modules(self): |
| name = 'some_mod' |
| with util.uncache(name): |
| module = types.ModuleType(name) |
| loader = 'a loader!' |
| spec = self.machinery.ModuleSpec(name, loader) |
| module.__loader__ = loader |
| module.__spec__ = spec |
| sys.modules[name] = module |
| found = self.util.find_spec(name) |
| self.assertEqual(found, spec) |
| |
| def test_sys_modules_without___loader__(self): |
| name = 'some_mod' |
| with util.uncache(name): |
| module = types.ModuleType(name) |
| del module.__loader__ |
| loader = 'a loader!' |
| spec = self.machinery.ModuleSpec(name, loader) |
| module.__spec__ = spec |
| sys.modules[name] = module |
| found = self.util.find_spec(name) |
| self.assertEqual(found, spec) |
| |
| def test_sys_modules_spec_is_None(self): |
| name = 'some_mod' |
| with util.uncache(name): |
| module = types.ModuleType(name) |
| module.__spec__ = None |
| sys.modules[name] = module |
| with self.assertRaises(ValueError): |
| self.util.find_spec(name) |
| |
| def test_sys_modules_loader_is_None(self): |
| name = 'some_mod' |
| with util.uncache(name): |
| module = types.ModuleType(name) |
| spec = self.machinery.ModuleSpec(name, None) |
| module.__spec__ = spec |
| sys.modules[name] = module |
| found = self.util.find_spec(name) |
| self.assertEqual(found, spec) |
| |
| def test_sys_modules_spec_is_not_set(self): |
| name = 'some_mod' |
| with util.uncache(name): |
| module = types.ModuleType(name) |
| try: |
| del module.__spec__ |
| except AttributeError: |
| pass |
| sys.modules[name] = module |
| with self.assertRaises(ValueError): |
| self.util.find_spec(name) |
| |
| def test_success(self): |
| name = 'some_mod' |
| with util.uncache(name): |
| with util.import_state(meta_path=[self.FakeMetaFinder]): |
| self.assertEqual((name, None, None), |
| self.util.find_spec(name)) |
| |
| def test_nothing(self): |
        # None is returned upon failure to find a spec.
| self.assertIsNone(self.util.find_spec('nevergoingtofindthismodule')) |
| |
| def test_find_submodule(self): |
| name = 'spam' |
| subname = 'ham' |
| with util.temp_module(name, pkg=True) as pkg_dir: |
| fullname, _ = util.submodule(name, subname, pkg_dir) |
| spec = self.util.find_spec(fullname) |
| self.assertIsNot(spec, None) |
| self.assertIn(name, sorted(sys.modules)) |
| self.assertNotIn(fullname, sorted(sys.modules)) |
| # Ensure successive calls behave the same. |
| spec_again = self.util.find_spec(fullname) |
| self.assertEqual(spec_again, spec) |
| |
| def test_find_submodule_parent_already_imported(self): |
| name = 'spam' |
| subname = 'ham' |
| with util.temp_module(name, pkg=True) as pkg_dir: |
| self.init.import_module(name) |
| fullname, _ = util.submodule(name, subname, pkg_dir) |
| spec = self.util.find_spec(fullname) |
| self.assertIsNot(spec, None) |
| self.assertIn(name, sorted(sys.modules)) |
| self.assertNotIn(fullname, sorted(sys.modules)) |
| # Ensure successive calls behave the same. |
| spec_again = self.util.find_spec(fullname) |
| self.assertEqual(spec_again, spec) |
| |
| def test_find_relative_module(self): |
| name = 'spam' |
| subname = 'ham' |
| with util.temp_module(name, pkg=True) as pkg_dir: |
| fullname, _ = util.submodule(name, subname, pkg_dir) |
| relname = '.' + subname |
| spec = self.util.find_spec(relname, name) |
| self.assertIsNot(spec, None) |
| self.assertIn(name, sorted(sys.modules)) |
| self.assertNotIn(fullname, sorted(sys.modules)) |
| # Ensure successive calls behave the same. |
| spec_again = self.util.find_spec(fullname) |
| self.assertEqual(spec_again, spec) |
| |
| def test_find_relative_module_missing_package(self): |
| name = 'spam' |
| subname = 'ham' |
| with util.temp_module(name, pkg=True) as pkg_dir: |
| fullname, _ = util.submodule(name, subname, pkg_dir) |
| relname = '.' + subname |
| with self.assertRaises(ImportError): |
| self.util.find_spec(relname) |
| self.assertNotIn(name, sorted(sys.modules)) |
| self.assertNotIn(fullname, sorted(sys.modules)) |
| |
| def test_find_submodule_in_module(self): |
| # ModuleNotFoundError raised when a module is specified as |
| # a parent instead of a package. |
| with self.assertRaises(ModuleNotFoundError): |
| self.util.find_spec('module.name') |
| |
| |
| (Frozen_FindSpecTests, |
| Source_FindSpecTests |
| ) = util.test_both(FindSpecTests, init=init, util=importlib_util, |
| machinery=machinery) |
| |
| |
| class MagicNumberTests: |
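    """Tests the importlib.util.MAGIC_NUMBER constant."""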
| |
| def test_length(self): |
| # Should be 4 bytes. |
| self.assertEqual(len(self.util.MAGIC_NUMBER), 4) |
| |
| def test_incorporates_rn(self): |
        # The magic number ends with \r\n so that it comes out wrong if the
        # file is ever subjected to text-mode newline translation.
| self.assertEndsWith(self.util.MAGIC_NUMBER, b'\r\n') |
| |
| |
| (Frozen_MagicNumberTests, |
| Source_MagicNumberTests |
| ) = util.test_both(MagicNumberTests, util=importlib_util) |
| |
| |
| class PEP3147Tests: |
| |
| """Tests of PEP 3147-related functions: cache_from_source and source_from_cache.""" |
| |
| tag = sys.implementation.cache_tag |
| |
| @unittest.skipIf(sys.implementation.cache_tag is None, |
                     'requires sys.implementation.cache_tag to not be None')
| def test_cache_from_source(self): |
| # Given the path to a .py file, return the path to its PEP 3147 |
| # defined .pyc file (i.e. under __pycache__). |
| path = os.path.join('foo', 'bar', 'baz', 'qux.py') |
| expect = os.path.join('foo', 'bar', 'baz', '__pycache__', |
| 'qux.{}.pyc'.format(self.tag)) |
| self.assertEqual(self.util.cache_from_source(path, optimization=''), |
| expect) |
| |
| def test_cache_from_source_no_cache_tag(self): |
| # No cache tag means NotImplementedError. |
| with support.swap_attr(sys.implementation, 'cache_tag', None): |
| with self.assertRaises(NotImplementedError): |
| self.util.cache_from_source('whatever.py') |
| |
| def test_cache_from_source_no_dot(self): |
| # Directory with a dot, filename without dot. |
| path = os.path.join('foo.bar', 'file') |
| expect = os.path.join('foo.bar', '__pycache__', |
| 'file{}.pyc'.format(self.tag)) |
| self.assertEqual(self.util.cache_from_source(path, optimization=''), |
| expect) |
| |
| def test_cache_from_source_cwd(self): |
| path = 'foo.py' |
| expect = os.path.join('__pycache__', 'foo.{}.pyc'.format(self.tag)) |
| self.assertEqual(self.util.cache_from_source(path, optimization=''), |
| expect) |
| |
| def test_cache_from_source_optimization_empty_string(self): |
| # Setting 'optimization' to '' leads to no optimization tag (PEP 488). |
| path = 'foo.py' |
| expect = os.path.join('__pycache__', 'foo.{}.pyc'.format(self.tag)) |
| self.assertEqual(self.util.cache_from_source(path, optimization=''), |
| expect) |
| |
| def test_cache_from_source_optimization_None(self): |
| # Setting 'optimization' to None uses the interpreter's optimization. |
| # (PEP 488) |
| path = 'foo.py' |
| optimization_level = sys.flags.optimize |
| almost_expect = os.path.join('__pycache__', 'foo.{}'.format(self.tag)) |
| if optimization_level == 0: |
| expect = almost_expect + '.pyc' |
| elif optimization_level <= 2: |
| expect = almost_expect + '.opt-{}.pyc'.format(optimization_level) |
| else: |
| msg = '{!r} is a non-standard optimization level'.format(optimization_level) |
| self.skipTest(msg) |
| self.assertEqual(self.util.cache_from_source(path, optimization=None), |
| expect) |
| |
| def test_cache_from_source_optimization_set(self): |
| # The 'optimization' parameter accepts anything that has a string repr |
        # that passes str.isalnum().
| path = 'foo.py' |
| valid_characters = string.ascii_letters + string.digits |
| almost_expect = os.path.join('__pycache__', 'foo.{}'.format(self.tag)) |
| got = self.util.cache_from_source(path, optimization=valid_characters) |
| # Test all valid characters are accepted. |
| self.assertEqual(got, |
| almost_expect + '.opt-{}.pyc'.format(valid_characters)) |
| # str() should be called on argument. |
| self.assertEqual(self.util.cache_from_source(path, optimization=42), |
| almost_expect + '.opt-42.pyc') |
| # Invalid characters raise ValueError. |
| with self.assertRaises(ValueError): |
| self.util.cache_from_source(path, optimization='path/is/bad') |
| |
| def test_cache_from_source_debug_override_optimization_both_set(self): |
| # Can only set one of the optimization-related parameters. |
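        # The 'debug_override' parameter is deprecated, so silence the
        # DeprecationWarning it triggers before checking for the TypeError.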
| with warnings.catch_warnings(): |
| warnings.simplefilter('ignore') |
| with self.assertRaises(TypeError): |
| self.util.cache_from_source('foo.py', False, optimization='') |
| |
| @unittest.skipUnless(os.sep == '\\' and os.altsep == '/', |
| 'test meaningful only where os.altsep is defined') |
| def test_sep_altsep_and_sep_cache_from_source(self): |
| # Windows path and PEP 3147 where sep is right of altsep. |
| self.assertEqual( |
| self.util.cache_from_source('\\foo\\bar\\baz/qux.py', optimization=''), |
| '\\foo\\bar\\baz\\__pycache__\\qux.{}.pyc'.format(self.tag)) |
| |
| @unittest.skipIf(sys.implementation.cache_tag is None, |
                     'requires sys.implementation.cache_tag to not be None')
| def test_cache_from_source_path_like_arg(self): |
| path = pathlib.PurePath('foo', 'bar', 'baz', 'qux.py') |
| expect = os.path.join('foo', 'bar', 'baz', '__pycache__', |
| 'qux.{}.pyc'.format(self.tag)) |
| self.assertEqual(self.util.cache_from_source(path, optimization=''), |
| expect) |
| |
| @unittest.skipIf(sys.implementation.cache_tag is None, |
| 'requires sys.implementation.cache_tag to not be None') |
| def test_source_from_cache(self): |
| # Given the path to a PEP 3147 defined .pyc file, return the path to |
| # its source. This tests the good path. |
| path = os.path.join('foo', 'bar', 'baz', '__pycache__', |
| 'qux.{}.pyc'.format(self.tag)) |
| expect = os.path.join('foo', 'bar', 'baz', 'qux.py') |
| self.assertEqual(self.util.source_from_cache(path), expect) |
| |
| def test_source_from_cache_no_cache_tag(self): |
| # If sys.implementation.cache_tag is None, raise NotImplementedError. |
| path = os.path.join('blah', '__pycache__', 'whatever.pyc') |
| with support.swap_attr(sys.implementation, 'cache_tag', None): |
| with self.assertRaises(NotImplementedError): |
| self.util.source_from_cache(path) |
| |
| def test_source_from_cache_bad_path(self): |
| # When the path to a pyc file is not in PEP 3147 format, a ValueError |
| # is raised. |
| self.assertRaises( |
| ValueError, self.util.source_from_cache, '/foo/bar/bazqux.pyc') |
| |
| def test_source_from_cache_no_slash(self): |
| # No slashes at all in path -> ValueError |
| self.assertRaises( |
| ValueError, self.util.source_from_cache, 'foo.cpython-32.pyc') |
| |
| def test_source_from_cache_too_few_dots(self): |
| # Too few dots in final path component -> ValueError |
| self.assertRaises( |
| ValueError, self.util.source_from_cache, '__pycache__/foo.pyc') |
| |
| def test_source_from_cache_too_many_dots(self): |
| with self.assertRaises(ValueError): |
| self.util.source_from_cache( |
| '__pycache__/foo.cpython-32.opt-1.foo.pyc') |
| |
| def test_source_from_cache_not_opt(self): |
| # Non-`opt-` path component -> ValueError |
| self.assertRaises( |
| ValueError, self.util.source_from_cache, |
| '__pycache__/foo.cpython-32.foo.pyc') |
| |
| def test_source_from_cache_no__pycache__(self): |
| # Another problem with the path -> ValueError |
| self.assertRaises( |
| ValueError, self.util.source_from_cache, |
| '/foo/bar/foo.cpython-32.foo.pyc') |
| |
| def test_source_from_cache_optimized_bytecode(self): |
| # Optimized bytecode is not an issue. |
| path = os.path.join('__pycache__', 'foo.{}.opt-1.pyc'.format(self.tag)) |
| self.assertEqual(self.util.source_from_cache(path), 'foo.py') |
| |
| def test_source_from_cache_missing_optimization(self): |
| # An empty optimization level is a no-no. |
| path = os.path.join('__pycache__', 'foo.{}.opt-.pyc'.format(self.tag)) |
| with self.assertRaises(ValueError): |
| self.util.source_from_cache(path) |
| |
| @unittest.skipIf(sys.implementation.cache_tag is None, |
| 'requires sys.implementation.cache_tag to not be None') |
| def test_source_from_cache_path_like_arg(self): |
| path = pathlib.PurePath('foo', 'bar', 'baz', '__pycache__', |
| 'qux.{}.pyc'.format(self.tag)) |
| expect = os.path.join('foo', 'bar', 'baz', 'qux.py') |
| self.assertEqual(self.util.source_from_cache(path), expect) |
| |
| @unittest.skipIf(sys.implementation.cache_tag is None, |
| 'requires sys.implementation.cache_tag to not be None') |
| def test_cache_from_source_respects_pycache_prefix(self): |
| # If pycache_prefix is set, cache_from_source will return a bytecode |
| # path inside that directory (in a subdirectory mirroring the .py file's |
| # path) rather than in a __pycache__ dir next to the py file. |
| pycache_prefixes = [ |
| os.path.join(os.path.sep, 'tmp', 'bytecode'), |
| os.path.join(os.path.sep, 'tmp', '\u2603'), # non-ASCII in path! |
| os.path.join(os.path.sep, 'tmp', 'trailing-slash') + os.path.sep, |
| ] |
| drive = '' |
| if os.name == 'nt': |
| drive = 'C:' |
| pycache_prefixes = [ |
| f'{drive}{prefix}' for prefix in pycache_prefixes] |
| pycache_prefixes += [r'\\?\C:\foo', r'\\localhost\c$\bar'] |
| for pycache_prefix in pycache_prefixes: |
| with self.subTest(path=pycache_prefix): |
| path = drive + os.path.join( |
| os.path.sep, 'foo', 'bar', 'baz', 'qux.py') |
| expect = os.path.join( |
| pycache_prefix, 'foo', 'bar', 'baz', |
| 'qux.{}.pyc'.format(self.tag)) |
| with util.temporary_pycache_prefix(pycache_prefix): |
| self.assertEqual( |
| self.util.cache_from_source(path, optimization=''), |
| expect) |
| |
| @unittest.skipIf(sys.implementation.cache_tag is None, |
| 'requires sys.implementation.cache_tag to not be None') |
| def test_cache_from_source_respects_pycache_prefix_relative(self): |
| # If the .py path we are given is relative, we will resolve to an |
| # absolute path before prefixing with pycache_prefix, to avoid any |
| # possible ambiguity. |
| pycache_prefix = os.path.join(os.path.sep, 'tmp', 'bytecode') |
| path = os.path.join('foo', 'bar', 'baz', 'qux.py') |
| root = os.path.splitdrive(os.getcwd())[0] + os.path.sep |
| expect = os.path.join( |
| pycache_prefix, |
| os.path.relpath(os.getcwd(), root), |
| 'foo', 'bar', 'baz', f'qux.{self.tag}.pyc') |
| with util.temporary_pycache_prefix(pycache_prefix): |
| self.assertEqual( |
| self.util.cache_from_source(path, optimization=''), |
| os.path.normpath(expect)) |
| |
| @unittest.skipIf(sys.implementation.cache_tag is None, |
| 'requires sys.implementation.cache_tag to not be None') |
| def test_cache_from_source_in_root_with_pycache_prefix(self): |
| # Regression test for gh-82916 |
| pycache_prefix = os.path.join(os.path.sep, 'tmp', 'bytecode') |
| path = 'qux.py' |
| expect = os.path.join(os.path.sep, 'tmp', 'bytecode', |
| f'qux.{self.tag}.pyc') |
| with util.temporary_pycache_prefix(pycache_prefix): |
| with os_helper.change_cwd('/'): |
| self.assertEqual(self.util.cache_from_source(path), expect) |
| |
| @unittest.skipIf(sys.implementation.cache_tag is None, |
| 'requires sys.implementation.cache_tag to not be None') |
| def test_source_from_cache_inside_pycache_prefix(self): |
| # If pycache_prefix is set and the cache path we get is inside it, |
| # we return an absolute path to the py file based on the remainder of |
| # the path within pycache_prefix. |
| pycache_prefix = os.path.join(os.path.sep, 'tmp', 'bytecode') |
| path = os.path.join(pycache_prefix, 'foo', 'bar', 'baz', |
| f'qux.{self.tag}.pyc') |
| expect = os.path.join(os.path.sep, 'foo', 'bar', 'baz', 'qux.py') |
| with util.temporary_pycache_prefix(pycache_prefix): |
| self.assertEqual(self.util.source_from_cache(path), expect) |
| |
| @unittest.skipIf(sys.implementation.cache_tag is None, |
| 'requires sys.implementation.cache_tag to not be None') |
| def test_source_from_cache_outside_pycache_prefix(self): |
| # If pycache_prefix is set but the cache path we get is not inside |
| # it, just ignore it and handle the cache path according to the default |
| # behavior. |
| pycache_prefix = os.path.join(os.path.sep, 'tmp', 'bytecode') |
| path = os.path.join('foo', 'bar', 'baz', '__pycache__', |
| f'qux.{self.tag}.pyc') |
| expect = os.path.join('foo', 'bar', 'baz', 'qux.py') |
| with util.temporary_pycache_prefix(pycache_prefix): |
| self.assertEqual(self.util.source_from_cache(path), expect) |
| |
| |
| (Frozen_PEP3147Tests, |
| Source_PEP3147Tests |
| ) = util.test_both(PEP3147Tests, util=importlib_util) |
| |
| |
| class MagicNumberTests(unittest.TestCase): |
| """ |
| Test release compatibility issues relating to importlib |
| """ |
| @unittest.skipUnless( |
| sys.version_info.releaselevel in ('candidate', 'final'), |
| 'only applies to candidate or final python release levels' |
| ) |
| def test_magic_number(self): |
| # Each python minor release should generally have a MAGIC_NUMBER |
| # that does not change once the release reaches candidate status. |
| |
| # Once a release reaches candidate status, the value of the constant |
| # EXPECTED_MAGIC_NUMBER in this test should be changed. |
| # This test will then check that the actual MAGIC_NUMBER matches |
| # the expected value for the release. |
| |
| # In exceptional cases, it may be required to change the MAGIC_NUMBER |
| # for a maintenance release. In this case the change should be |
| # discussed in python-dev. If a change is required, community |
| # stakeholders such as OS package maintainers must be notified |
| # in advance. Such exceptional releases will then require an |
| # adjustment to this test case. |
| EXPECTED_MAGIC_NUMBER = 3625 |
| actual = int.from_bytes(importlib.util.MAGIC_NUMBER[:2], 'little') |
| |
| msg = ( |
| "To avoid breaking backwards compatibility with cached bytecode " |
| "files that can't be automatically regenerated by the current " |
| "user, candidate and final releases require the current " |
| "importlib.util.MAGIC_NUMBER to match the expected " |
| "magic number in this test. Set the expected " |
| "magic number in this test to the current MAGIC_NUMBER to " |
| "continue with the release.\n\n" |
| "Changing the MAGIC_NUMBER for a maintenance release " |
| "requires discussion in python-dev and notification of " |
| "community stakeholders." |
| ) |
| self.assertEqual(EXPECTED_MAGIC_NUMBER, actual, msg) |
| |
| |
| @unittest.skipIf(_interpreters is None, 'subinterpreters required') |
| class IncompatibleExtensionModuleRestrictionsTests(unittest.TestCase): |
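    """Tests importlib.util._incompatible_extension_module_restrictions()."""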
| |
| def run_with_own_gil(self, script): |
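        # An "isolated" interpreter config gives the subinterpreter its own GIL.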
| interpid = _interpreters.create('isolated') |
| def ensure_destroyed(): |
| try: |
| _interpreters.destroy(interpid) |
| except _interpreters.InterpreterNotFoundError: |
| pass |
| self.addCleanup(ensure_destroyed) |
| excsnap = _interpreters.exec(interpid, script) |
| if excsnap is not None: |
| if excsnap.type.__name__ == 'ImportError': |
| raise ImportError(excsnap.msg) |
| |
| def run_with_shared_gil(self, script): |
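        # A "legacy" interpreter config shares the main interpreter's GIL.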
| interpid = _interpreters.create('legacy') |
| def ensure_destroyed(): |
| try: |
| _interpreters.destroy(interpid) |
| except _interpreters.InterpreterNotFoundError: |
| pass |
| self.addCleanup(ensure_destroyed) |
| excsnap = _interpreters.exec(interpid, script) |
| if excsnap is not None: |
| if excsnap.type.__name__ == 'ImportError': |
| raise ImportError(excsnap.msg) |
| |
| @unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module") |
| # gh-117649: single-phase init modules are not currently supported in |
| # subinterpreters in the free-threaded build |
| @support.expected_failure_if_gil_disabled() |
| def test_single_phase_init_module(self): |
| script = textwrap.dedent(''' |
| from importlib.util import _incompatible_extension_module_restrictions |
| with _incompatible_extension_module_restrictions(disable_check=True): |
| import _testsinglephase |
| ''') |
| with self.subTest('check disabled, shared GIL'): |
| self.run_with_shared_gil(script) |
| with self.subTest('check disabled, per-interpreter GIL'): |
| self.run_with_own_gil(script) |
| |
        script = textwrap.dedent('''
| from importlib.util import _incompatible_extension_module_restrictions |
| with _incompatible_extension_module_restrictions(disable_check=False): |
| import _testsinglephase |
| ''') |
| with self.subTest('check enabled, shared GIL'): |
| with self.assertRaises(ImportError): |
| self.run_with_shared_gil(script) |
| with self.subTest('check enabled, per-interpreter GIL'): |
| with self.assertRaises(ImportError): |
| self.run_with_own_gil(script) |
| |
| @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module") |
| @support.requires_gil_enabled("gh-117649: not supported in free-threaded build") |
| def test_incomplete_multi_phase_init_module(self): |
| # Apple extensions must be distributed as frameworks. This requires |
| # a specialist loader. |
| if support.is_apple_mobile: |
| loader = "AppleFrameworkLoader" |
| else: |
| loader = "ExtensionFileLoader" |
| |
| prescript = textwrap.dedent(f''' |
| from importlib.util import spec_from_loader, module_from_spec |
| from importlib.machinery import {loader} |
| |
| name = '_test_shared_gil_only' |
| filename = {_testmultiphase.__file__!r} |
| loader = {loader}(name, filename) |
| spec = spec_from_loader(name, loader) |
| |
| ''') |
| |
| script = prescript + textwrap.dedent(''' |
| from importlib.util import _incompatible_extension_module_restrictions |
| with _incompatible_extension_module_restrictions(disable_check=True): |
| module = module_from_spec(spec) |
| loader.exec_module(module) |
| ''') |
| with self.subTest('check disabled, shared GIL'): |
| self.run_with_shared_gil(script) |
| with self.subTest('check disabled, per-interpreter GIL'): |
| self.run_with_own_gil(script) |
| |
| script = prescript + textwrap.dedent(''' |
| from importlib.util import _incompatible_extension_module_restrictions |
| with _incompatible_extension_module_restrictions(disable_check=False): |
| module = module_from_spec(spec) |
| loader.exec_module(module) |
| ''') |
| with self.subTest('check enabled, shared GIL'): |
| self.run_with_shared_gil(script) |
| with self.subTest('check enabled, per-interpreter GIL'): |
| with self.assertRaises(ImportError): |
| self.run_with_own_gil(script) |
| |
| @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module") |
| def test_complete_multi_phase_init_module(self): |
| script = textwrap.dedent(''' |
| from importlib.util import _incompatible_extension_module_restrictions |
| with _incompatible_extension_module_restrictions(disable_check=True): |
| import _testmultiphase |
| ''') |
| with self.subTest('check disabled, shared GIL'): |
| self.run_with_shared_gil(script) |
| with self.subTest('check disabled, per-interpreter GIL'): |
| self.run_with_own_gil(script) |
| |
        script = textwrap.dedent('''
| from importlib.util import _incompatible_extension_module_restrictions |
| with _incompatible_extension_module_restrictions(disable_check=False): |
| import _testmultiphase |
| ''') |
| with self.subTest('check enabled, shared GIL'): |
| self.run_with_shared_gil(script) |
| with self.subTest('check enabled, per-interpreter GIL'): |
| self.run_with_own_gil(script) |
| |
| |
| class PatchAtomicWrites: |
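    """Patch os.write (via _pyio) so that writes are truncated,
    emulating partial writes for the _write_atomic() tests."""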
| def __init__(self, truncate_at_length, never_complete=False): |
| self.truncate_at_length = truncate_at_length |
| self.never_complete = never_complete |
| self.seen_write = False |
        self.children = []
| |
| def __enter__(self): |
| import _pyio |
| |
| oldwrite = os.write |
| |
| # Emulate an os.write that only writes partial data. |
| def write(fd, data): |
| if self.seen_write and self.never_complete: |
| return None |
| self.seen_write = True |
| return oldwrite(fd, data[:self.truncate_at_length]) |
| |
| # Need to patch _io to be _pyio, so that io.FileIO is affected by the |
| # os.write patch. |
| self.children = [ |
| support.swap_attr(_bootstrap_external, '_io', _pyio), |
| support.swap_attr(os, 'write', write) |
| ] |
| for child in self.children: |
| child.__enter__() |
| return self |
| |
| def __exit__(self, exc_type, exc_val, exc_tb): |
| for child in self.children: |
| child.__exit__(exc_type, exc_val, exc_tb) |
| |
| |
| class MiscTests(unittest.TestCase): |
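    """Tests for _bootstrap_external._write_atomic()."""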
| |
| def test_atomic_write_retries_incomplete_writes(self): |
| truncate_at_length = 100 |
| length = truncate_at_length * 2 |
| |
| with PatchAtomicWrites(truncate_at_length=truncate_at_length) as cm: |
| # Make sure we write something longer than the point where we |
| # truncate. |
| content = b'x' * length |
| _bootstrap_external._write_atomic(os_helper.TESTFN, content) |
| self.assertTrue(cm.seen_write) |
| |
        self.assertEqual(os.stat(os_helper.TESTFN).st_size, length)
        os.unlink(os_helper.TESTFN)
| |
| def test_atomic_write_errors_if_unable_to_complete(self): |
| truncate_at_length = 100 |
| |
| with ( |
| PatchAtomicWrites( |
| truncate_at_length=truncate_at_length, never_complete=True, |
| ) as cm, |
| self.assertRaises(OSError) |
| ): |
| # Make sure we write something longer than the point where we |
| # truncate. |
| content = b'x' * (truncate_at_length * 2) |
| _bootstrap_external._write_atomic(os_helper.TESTFN, content) |
| self.assertTrue(cm.seen_write) |
| |
| with self.assertRaises(OSError): |
            os.stat(os_helper.TESTFN)  # Check that the file did not get written.
| |
| |
| if __name__ == '__main__': |
| unittest.main() |