| import abc |
| import contextlib |
| import functools |
| import importlib |
| import io |
| import pathlib |
| import sys |
| import types |
| from importlib.machinery import ModuleSpec |
| from importlib.resources.abc import ResourceReader, Traversable, TraversableResources |
| |
| from test.support import import_helper, os_helper |
| |
| from . import _path |
| from . import zip as zip_ |
| |
| |
| class Reader(ResourceReader): |
| def __init__(self, **kwargs): |
| vars(self).update(kwargs) |
| |
| def get_resource_reader(self, package): |
| return self |
| |
| def open_resource(self, path): |
| self._path = path |
| if isinstance(self.file, Exception): |
| raise self.file |
| return self.file |
| |
| def resource_path(self, path_): |
| self._path = path_ |
| if isinstance(self.path, Exception): |
| raise self.path |
| return self.path |
| |
| def is_resource(self, path_): |
| self._path = path_ |
| if isinstance(self.path, Exception): |
| raise self.path |
| |
| def part(entry): |
| return entry.split('/') |
| |
| return any( |
| len(parts) == 1 and parts[0] == path_ for parts in map(part, self._contents) |
| ) |
| |
| def contents(self): |
| if isinstance(self.path, Exception): |
| raise self.path |
| yield from self._contents |
| |
| |
| def create_package_from_loader(loader, is_package=True): |
| name = 'testingpackage' |
| module = types.ModuleType(name) |
| spec = ModuleSpec(name, loader, origin='does-not-exist', is_package=is_package) |
| module.__spec__ = spec |
| module.__loader__ = loader |
| return module |
| |
| |
| def create_package(file=None, path=None, is_package=True, contents=()): |
| return create_package_from_loader( |
| Reader(file=file, path=path, _contents=contents), |
| is_package, |
| ) |
| |
| |
| class CommonTestsBase(metaclass=abc.ABCMeta): |
| """ |
| Tests shared by test_open, test_path, and test_read. |
| """ |
| |
| @abc.abstractmethod |
| def execute(self, package, path): |
| """ |
| Call the pertinent legacy API function (e.g. open_text, path) |
| on package and path. |
| """ |
| |
| def test_package_name(self): |
| """ |
| Passing in the package name should succeed. |
| """ |
| self.execute(self.data.__name__, 'utf-8.file') |
| |
| def test_package_object(self): |
| """ |
| Passing in the package itself should succeed. |
| """ |
| self.execute(self.data, 'utf-8.file') |
| |
| def test_string_path(self): |
| """ |
| Passing in a string for the path should succeed. |
| """ |
| path = 'utf-8.file' |
| self.execute(self.data, path) |
| |
| def test_pathlib_path(self): |
| """ |
| Passing in a pathlib.PurePath object for the path should succeed. |
| """ |
| path = pathlib.PurePath('utf-8.file') |
| self.execute(self.data, path) |
| |
| def test_importing_module_as_side_effect(self): |
| """ |
| The anchor package can already be imported. |
| """ |
| del sys.modules[self.data.__name__] |
| self.execute(self.data.__name__, 'utf-8.file') |
| |
| def test_missing_path(self): |
| """ |
| Attempting to open or read or request the path for a |
| non-existent path should succeed if open_resource |
| can return a viable data stream. |
| """ |
| bytes_data = io.BytesIO(b'Hello, world!') |
| package = create_package(file=bytes_data, path=FileNotFoundError()) |
| self.execute(package, 'utf-8.file') |
| assert package.__loader__._path == 'utf-8.file' |
| |
| def test_extant_path(self): |
| # Attempting to open or read or request the path when the |
| # path does exist should still succeed. Does not assert |
| # anything about the result. |
| bytes_data = io.BytesIO(b'Hello, world!') |
| # any path that exists |
| path = __file__ |
| package = create_package(file=bytes_data, path=path) |
| self.execute(package, 'utf-8.file') |
| assert package.__loader__._path == 'utf-8.file' |
| |
| def test_useless_loader(self): |
| package = create_package(file=FileNotFoundError(), path=FileNotFoundError()) |
| with self.assertRaises(FileNotFoundError): |
| self.execute(package, 'utf-8.file') |
| |
| |
| fixtures = dict( |
| data01={ |
| '__init__.py': '', |
| 'binary.file': bytes(range(4)), |
| 'utf-16.file': '\ufeffHello, UTF-16 world!\n'.encode('utf-16-le'), |
| 'utf-8.file': 'Hello, UTF-8 world!\n'.encode('utf-8'), |
| 'subdirectory': { |
| '__init__.py': '', |
| 'binary.file': bytes(range(4, 8)), |
| }, |
| }, |
| data02={ |
| '__init__.py': '', |
| 'one': {'__init__.py': '', 'resource1.txt': 'one resource'}, |
| 'two': {'__init__.py': '', 'resource2.txt': 'two resource'}, |
| 'subdirectory': {'subsubdir': {'resource.txt': 'a resource'}}, |
| }, |
| namespacedata01={ |
| 'binary.file': bytes(range(4)), |
| 'utf-16.file': '\ufeffHello, UTF-16 world!\n'.encode('utf-16-le'), |
| 'utf-8.file': 'Hello, UTF-8 world!\n'.encode('utf-8'), |
| 'subdirectory': { |
| 'binary.file': bytes(range(12, 16)), |
| }, |
| }, |
| ) |
| |
| |
| class ModuleSetup: |
| def setUp(self): |
| self.fixtures = contextlib.ExitStack() |
| self.addCleanup(self.fixtures.close) |
| |
| self.fixtures.enter_context(import_helper.isolated_modules()) |
| self.data = self.load_fixture(self.MODULE) |
| |
| def load_fixture(self, module): |
| self.tree_on_path({module: fixtures[module]}) |
| return importlib.import_module(module) |
| |
| |
| class ZipSetup(ModuleSetup): |
| MODULE = 'data01' |
| |
| def tree_on_path(self, spec): |
| temp_dir = self.fixtures.enter_context(os_helper.temp_dir()) |
| modules = pathlib.Path(temp_dir) / 'zipped modules.zip' |
| self.fixtures.enter_context( |
| import_helper.DirsOnSysPath(str(zip_.make_zip_file(spec, modules))) |
| ) |
| |
| |
| class DiskSetup(ModuleSetup): |
| MODULE = 'data01' |
| |
| def tree_on_path(self, spec): |
| temp_dir = self.fixtures.enter_context(os_helper.temp_dir()) |
| _path.build(spec, pathlib.Path(temp_dir)) |
| self.fixtures.enter_context(import_helper.DirsOnSysPath(temp_dir)) |
| |
| |
| class MemorySetup(ModuleSetup): |
| """Support loading a module in memory.""" |
| |
| MODULE = 'data01' |
| |
| def load_fixture(self, module): |
| self.fixtures.enter_context(self.augment_sys_metapath(module)) |
| return importlib.import_module(module) |
| |
| @contextlib.contextmanager |
| def augment_sys_metapath(self, module): |
| finder_instance = self.MemoryFinder(module) |
| sys.meta_path.append(finder_instance) |
| yield |
| sys.meta_path.remove(finder_instance) |
| |
| class MemoryFinder(importlib.abc.MetaPathFinder): |
| def __init__(self, module): |
| self._module = module |
| |
| def find_spec(self, fullname, path, target=None): |
| if fullname != self._module: |
| return None |
| |
| return importlib.machinery.ModuleSpec( |
| name=fullname, |
| loader=MemorySetup.MemoryLoader(self._module), |
| is_package=True, |
| ) |
| |
| class MemoryLoader(importlib.abc.Loader): |
| def __init__(self, module): |
| self._module = module |
| |
| def exec_module(self, module): |
| pass |
| |
| def get_resource_reader(self, fullname): |
| return MemorySetup.MemoryTraversableResources(self._module, fullname) |
| |
| class MemoryTraversableResources(TraversableResources): |
| def __init__(self, module, fullname): |
| self._module = module |
| self._fullname = fullname |
| |
| def files(self): |
| return MemorySetup.MemoryTraversable(self._module, self._fullname) |
| |
| class MemoryTraversable(Traversable): |
| """Implement only the abstract methods of `Traversable`. |
| |
| Besides `.__init__()`, no other methods may be implemented or overridden. |
| This is critical for validating the concrete `Traversable` implementations. |
| """ |
| |
| def __init__(self, module, fullname): |
| self._module = module |
| self._fullname = fullname |
| |
| def _resolve(self): |
| """ |
| Fully traverse the `fixtures` dictionary. |
| |
| This should be wrapped in a `try/except KeyError` |
| but it is not currently needed and lowers the code coverage numbers. |
| """ |
| path = pathlib.PurePosixPath(self._fullname) |
| return functools.reduce(lambda d, p: d[p], path.parts, fixtures) |
| |
| def iterdir(self): |
| directory = self._resolve() |
| if not isinstance(directory, dict): |
| # Filesystem openers raise OSError, and that exception is mirrored here. |
| raise OSError(f"{self._fullname} is not a directory") |
| for path in directory: |
| yield MemorySetup.MemoryTraversable( |
| self._module, f"{self._fullname}/{path}" |
| ) |
| |
| def is_dir(self) -> bool: |
| return isinstance(self._resolve(), dict) |
| |
| def is_file(self) -> bool: |
| return not self.is_dir() |
| |
| def open(self, mode='r', encoding=None, errors=None, *_, **__): |
| contents = self._resolve() |
| if isinstance(contents, dict): |
| # Filesystem openers raise OSError when attempting to open a directory, |
| # and that exception is mirrored here. |
| raise OSError(f"{self._fullname} is a directory") |
| if isinstance(contents, str): |
| contents = contents.encode("utf-8") |
| result = io.BytesIO(contents) |
| if "b" in mode: |
| return result |
| return io.TextIOWrapper(result, encoding=encoding, errors=errors) |
| |
| @property |
| def name(self): |
| return pathlib.PurePosixPath(self._fullname).name |
| |
| |
| class CommonTests(DiskSetup, CommonTestsBase): |
| pass |