| # Copyright 2018 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| from __future__ import annotations |
| |
| from recipe_engine import recipe_api |
| from recipe_engine import config_types |
| |
| |
class ArchiveApi(recipe_api.RecipeApi):
  """Provides steps to manipulate archive files (tar, zip, etc.)."""

  ARCHIVE_TYPES = ('tar', 'tgz', 'tbz', 'zip', 'tzst')

  # Lower-cased output file extension -> archive type implied by it. Consulted
  # by _archive_impl when the caller does not name an archive type explicitly.
  _ARCHIVE_TYPE_BY_EXTENSION = {
      '.tbz': 'tbz',
      '.tbz2': 'tbz',
      '.tb2': 'tbz',
      '.tar.bz2': 'tbz',
      '.tgz': 'tgz',
      '.tar.gz': 'tgz',
      '.tzst': 'tzst',
      '.tar.zst': 'tzst',
      '.tar': 'tar',
      '.zip': 'zip',
  }

  def package(self, root: config_types.Path) -> Package:
    """Returns Package object that can be used to compress a set of files.

    Usage:

        # Archive root/file and root/directory/**
        (api.archive.package(root).
            with_file(root / 'file').
            with_dir(root / 'directory').
            archive('archive step', output, 'tbz'))

        # Archive root/**
        zip_path = (
          api.archive.package(root).
          archive('archive step', api.path.start_dir / 'output.zip')
        )

    Args:

      * root: a directory that would become root of a package, all files added
        to an archive must be Paths which are under this directory. If no
        files or directories are added with 'with_file' or 'with_dir', the
        entire root directory is packaged.

    Returns:
      Package object.
    """
    return Package(self._archive_impl, root)

  def extract(self,
              step_name: str,
              archive_file: config_types.Path | str,
              output: config_types.Path | str,
              mode: str = 'safe',
              include_files: Sequence[str] = (),
              archive_type: str | None = None):
    """Step to uncompress |archive_file| into |output| directory.

    Archive will be unpacked to |output| so that root of an archive is in
    |output|, i.e. archive.tar/file.txt will become |output|/file.txt.

    NOTE(review): this docstring used to also say that the step fails if
    |output| already exists, which contradicts the description of `output`
    below. The actual behavior is decided by the `extract.py` resource script;
    confirm there before relying on either statement.

    Args:

      * step_name (str): display name of a step.
      * archive_file (Path): path to an archive file to uncompress, MUST exist.
      * output (Path): path to a directory to unpack to. The output directory
        MAY exist, in which case the extract will unpack on-top-of the existing
        files. It's an error for one of the extracted files to overlap with an
        already-present file, however.
      * mode (str): Must be either 'safe' or 'unsafe'. In safe mode, if the
        archive attempts to extract files which would escape the extraction
        `output` location, the extraction will fail (raise StepFailure)
        which contains a member `StepFailure.archive_skipped_files` (all
        other files will be extracted normally). If 'unsafe', then tarfiles
        containing paths escaping `output` will be extracted as-is.
      * include_files (List[str]) - A list of globs matching files within the
        archive. Any files not matching any of these globs will be skipped.
        If omitted, all files are extracted (the default). Globs are matched
        with the `fnmatch` module. If a file "filename" in the archive exists,
        include_files with "file*" will match it. All paths for the matcher
        are converted to posix style (forward slash).
      * archive_type (str): archive_file's archive type ("zip" or "tar"). This
        allows overriding the default detected type (based on file extension).

    Raises:
      StepFailure: if the extraction script reports skipped files. The
        exception carries the skipped names in `archive_skipped_files`.
    """
    assert mode in ('safe', 'unsafe'), 'Unknown mode %r' % (mode,)

    script_input = {
        'output': str(output),
        'archive_file': str(archive_file),
        'safe_mode': mode == 'safe',
        'include_files': list(include_files),
    }
    # Only forward an explicit type; otherwise the key is omitted entirely.
    if archive_type:
      assert archive_type in ('zip', 'tar'), ('Unknown archive_type %s' %
                                              archive_type)
      script_input['archive_type'] = archive_type

    step_result = self.m.step(
        step_name, [
            'vpython3',
            '-u',
            self.resource('extract.py'),
            '--json-input',
            self.m.json.input(script_input),
            '--json-output',
            self.m.json.output(),
        ],
        step_test_data=lambda: self.m.json.test_api.output({
            'extracted': {
                'filecount': 1337,
                'bytes': 0xbadc0ffee,
            },
        }))
    self.m.path.mock_add_paths(output)

    report = step_result.json.output

    extracted = report.get('extracted', {})
    if extracted.get('filecount'):
      step_result.presentation.step_text += (
          '<br/>extracted ' + self._format_stat(extracted))

    skipped = report.get('skipped', {})
    if skipped.get('filecount'):
      # Any skipped file turns the step into a failure and is surfaced both
      # in the step presentation and on the raised exception.
      step_result.presentation.step_text += (
          '<br/>SKIPPED ' + self._format_stat(skipped))
      step_result.presentation.logs['skipped files'] = skipped['names']
      step_result.presentation.status = self.m.step.FAILURE
      ex = self.m.step.StepFailure(step_name)
      ex.archive_skipped_files = skipped['names']
      raise ex

  def _format_stat(self, stat):
    """Renders a report section as '<count> files - <size> MB'.

    Args:
      stat: dict with 'filecount' and 'bytes' keys, as found under the
        'extracted' / 'skipped' keys of the extraction report.

    Returns:
      The formatted text; the size is `bytes` divided by 1000**2.
    """
    return '%s files - %.02f MB' % (
        stat['filecount'], stat['bytes'] / (1000.0**2))

  def _archive_impl(self, root, entries, step_name, output, archive_type):
    """Runs the archive.py resource to create an archive at `output`.

    This is the callback handed to Package objects by `package()`.

    Args:
      root: root directory of the package; passed to the script as a string.
      entries: non-empty list of entry dicts to put into the archive.
      step_name: display name of the step.
      output: path of the archive file to create.
      archive_type: one of ARCHIVE_TYPES, or None to infer the type from the
        extension of `output` (compared case-insensitively).
    """
    assert entries, 'entries is empty!'

    if archive_type is None:
      base, ext = self.m.path.splitext(output)
      # Recognize compound extensions such as '.tar.gz': splitext only yields
      # the final component, so look one level further for a '.tar'.
      if self.m.path.splitext(base)[1].lower() == '.tar':
        ext = '.tar' + ext
      # Lower-case so that e.g. 'out.ZIP' or 'out.TAR.GZ' are recognized too.
      archive_type = self._ARCHIVE_TYPE_BY_EXTENSION.get(ext.lower())
      assert archive_type is not None, (
          'Unable to infer archive_type from extension: %r' % (ext,))

    assert archive_type in self.ARCHIVE_TYPES, (
        'Unsupported archive_type %r' % (archive_type,))

    script_input = {
        'entries': entries,
        'output': str(output),
        'archive_type': archive_type,
        'root': str(root),
    }
    # The script receives its parameters as JSON on stdin.
    self.m.step(
        step_name, [
            'vpython3',
            '-u',
            self.resource('archive.py'),
        ],
        stdin=self.m.json.input(script_input))
    self.m.path.mock_add_paths(output)
| |
| |
class Package:
  """Collects the files and directories that should go into an archive.

  Obtain instances through api.archive.package().

  When nothing has been staged with 'with_file' or 'with_dir', archiving
  falls back to packaging the whole root directory.
  """

  def __init__(self, archive_callback, root):
    """Stores the archive callback and the package root.

    Args:
      archive_callback: callable invoked by `archive()` as
        `archive_callback(root, entries, step_name, output, archive_type)`.
      root: directory that acts as the root of the package.
    """
    self._root = root
    self._archive_callback = archive_callback
    self._entries = []

  @property
  def root(self):
    """The root directory this package was created with."""
    return self._root

  def _stage(self, kind, path):
    """Records one entry of the given kind and returns `self`."""
    self._entries.append({
        'type': kind,
        'path': str(path),
    })
    return self

  def with_file(self, path):
    """Stages single file to be added to the package.

    Args:
      path: absolute path to a file; the package root must be one of its
        parents.

    Returns:
      `self` to allow chaining.
    """
    is_under_root = self._root in path.parents
    assert is_under_root, (
        '%r is not a parent of %r' % (self._root, path))
    return self._stage('file', path)

  def with_dir(self, path):
    """Stages a directory with all its content to be added to the package.

    Args:
      path: absolute path to a directory; either the package root itself or
        a directory that has the root among its parents.

    Returns:
      `self` to allow chaining.
    """
    # Unlike with_file, the root itself is an acceptable directory.
    allowed = [path]
    allowed.extend(path.parents)
    assert self._root in allowed, (
        '%r is not a parent of %r' % (self._root, path))
    return self._stage('dir', path)

  def archive(self, step_name, output, archive_type=None):
    """Archives all staged files to an archive file indicated by `output`.

    If no 'with_file' or 'with_dir' calls were made, the entire root directory
    is archived instead.

    Args:
      step_name: display name of the archiving step.
      output: path to an archive file to create.
      archive_type: The type of archive to create. This may be:
        tar, tgz, tbz, zip, tzst. If None, will be inferred from the extension
        of output.

    Returns:
      `output`, for convenience.
    """
    if self._entries:
      entries = self._entries
    else:
      # Nothing staged explicitly: package the root directory as a whole.
      entries = [{'type': 'dir', 'path': str(self._root)}]
    self._archive_callback(self._root, entries, step_name, output,
                           archive_type)
    return output