# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helper functions for dealing with .zip files."""
|  |  | 
|  | import os | 
|  | import pathlib | 
|  | import posixpath | 
|  | import stat | 
|  | import time | 
|  | import zipfile | 
|  |  | 
|  | _FIXED_ZIP_HEADER_LEN = 30 | 
|  |  | 
|  |  | 
|  | def _set_alignment(zip_obj, zip_info, alignment): | 
|  | """Sets a ZipInfo's extra field such that the file will be aligned. | 
|  |  | 
|  | Args: | 
|  | zip_obj: The ZipFile object that is being written. | 
|  | zip_info: The ZipInfo object about to be written. | 
|  | alignment: The amount of alignment (e.g. 4, or 4*1024). | 
|  | """ | 
|  | header_size = _FIXED_ZIP_HEADER_LEN + len(zip_info.filename) | 
|  | pos = zip_obj.fp.tell() + header_size | 
|  | padding_needed = (alignment - (pos % alignment)) % alignment | 
|  |  | 
|  | # Python writes |extra| to both the local file header and the central | 
|  | # directory's file header. Android's zipalign tool writes only to the | 
|  | # local file header, so there is more overhead in using Python to align. | 
|  | zip_info.extra = b'\0' * padding_needed | 
|  |  | 
|  |  | 
|  | def _hermetic_date_time(timestamp=None): | 
|  | if not timestamp: | 
|  | return (2001, 1, 1, 0, 0, 0) | 
|  | utc_time = time.gmtime(timestamp) | 
|  | return (utc_time.tm_year, utc_time.tm_mon, utc_time.tm_mday, utc_time.tm_hour, | 
|  | utc_time.tm_min, utc_time.tm_sec) | 
|  |  | 
|  |  | 
def _zip_has_entry(zip_file, name):
  """Returns whether |zip_file| already contains an entry called |name|."""
  try:
    zip_file.getinfo(name)
  except KeyError:
    return False
  return True


def add_to_zip_hermetic(zip_file,
                        zip_path,
                        *,
                        src_path=None,
                        data=None,
                        compress=None,
                        compress_level=1,
                        alignment=None,
                        timestamp=None):
  """Adds a file to the given ZipFile with a hard-coded modified time.

  Symlinks given via |src_path| are stored as symlink entries whose content is
  the link target. For other source files the executable bits are carried
  over into the entry's attributes.

  Args:
    zip_file: ZipFile instance to add the file to.
    zip_path: Destination path within the zip file (or ZipInfo instance). When
      a ZipInfo is passed its external_attr is kept; otherwise the entry
      starts out with 0o644 permissions.
    src_path: Path of the source file. Mutually exclusive with |data|.
    data: File data (str or bytes, as accepted by ZipFile.writestr).
    compress: Whether to enable compression. Default is taken from ZipFile
      constructor. Data shorter than 16 bytes is never compressed.
    compress_level: When compress=True, level of compression.
    alignment: If set, align the data of the entry to this many bytes.
    timestamp: The last modification date and time for the archive member, as
      a Unix timestamp. When unset, a fixed date is used.

  Raises:
    AssertionError: If both or neither of |src_path| and |data| are given, or
      if |zip_path| contains backslashes, is absolute, starts with '..', is
      not normalized, or already exists in |zip_file|.
  """
  assert (src_path is None) != (data is None), (
      '|src_path| and |data| are mutually exclusive.')
  if isinstance(zip_path, zipfile.ZipInfo):
    zipinfo = zip_path
    zip_path = zipinfo.filename
  else:
    zipinfo = zipfile.ZipInfo(filename=zip_path)
    zipinfo.external_attr = 0o644 << 16

  zipinfo.date_time = _hermetic_date_time(timestamp)

  if alignment:
    _set_alignment(zip_file, zipinfo, alignment)

  # Filenames can contain backslashes, but it is more likely that we've
  # forgotten to use forward slashes as a directory separator.
  assert '\\' not in zip_path, 'zip_path should not contain \\: ' + zip_path
  assert not posixpath.isabs(zip_path), 'Absolute zip path: ' + zip_path
  assert not zip_path.startswith('..'), 'Should not start with ..: ' + zip_path
  assert posixpath.normpath(zip_path) == zip_path, (
      f'Non-canonical zip_path: {zip_path} vs: {posixpath.normpath(zip_path)}')
  # Use a keyed lookup rather than scanning namelist(), which would make
  # building an archive quadratic in the number of entries.
  assert not _zip_has_entry(zip_file, zip_path), (
      'Tried to add a duplicate zip entry: ' + zip_path)

  if src_path:
    if os.path.islink(src_path):
      zipinfo.external_attr |= stat.S_IFLNK << 16  # mark as a symlink
      zip_file.writestr(zipinfo, os.readlink(src_path))
      return

    # Maintain the executable bit.
    st = os.stat(src_path)
    for mode in (stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH):
      if st.st_mode & mode:
        zipinfo.external_attr |= mode << 16

    with open(src_path, 'rb') as f:
      data = f.read()

  # zipfile will deflate even when it makes the file bigger. To avoid
  # growing files, disable compression at an arbitrary cut off point.
  if len(data) < 16:
    compress = False

  # None converts to ZIP_STORED, when passed explicitly rather than the
  # default passed to the ZipFile constructor.
  compress_type = zip_file.compression
  if compress is not None:
    compress_type = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
  zip_file.writestr(zipinfo, data, compress_type, compresslevel=compress_level)
|  |  | 
|  |  | 
def add_files_to_zip(inputs,
                     output,
                     *,
                     base_dir=None,
                     path_transform=None,
                     compress=None,
                     compress_level=1,
                     zip_prefix_path=None,
                     timestamp=None):
  """Creates a zip file from a list of files.

  Entries are written in the order of their zip paths as computed before
  |zip_prefix_path| and |path_transform| are applied.

  Args:
    inputs: A list of paths (str or os.PathLike) to zip, or a list of
      (zip_path, fs_path) tuples. Both forms may be mixed.
    output: Path, fileobj, or ZipFile instance to add files to. A ZipFile
      instance passed in is left open; anything else is opened for writing
      here and closed before returning.
    base_dir: Prefix to strip from inputs. Defaults to the current directory.
    path_transform: Called for each entry path. Returns a new zip path, or None
      to skip the file.
    compress: Whether to compress
    compress_level: When compress=True, level of compression.
    zip_prefix_path: Path prepended to file path in zip file.
    timestamp: Unix timestamp to use for files in the archive.
  """
  if base_dir is None:
    base_dir = '.'

  input_tuples = []
  for entry in inputs:
    # Plain paths (including pathlib objects) get their zip path derived from
    # |base_dir|; tuples already carry an explicit zip path.
    if isinstance(entry, (str, os.PathLike)):
      src_path = os.fspath(entry)
      zip_path = os.path.relpath(src_path, base_dir)
      # Zip files always use / as path separator.
      if os.path.sep != posixpath.sep:
        zip_path = pathlib.Path(zip_path).as_posix()
      entry = (zip_path, src_path)
    input_tuples.append(entry)

  # Sort by zip path to ensure stable zip ordering.
  input_tuples.sort(key=lambda tup: tup[0])

  out_zip = output
  if not isinstance(output, zipfile.ZipFile):
    out_zip = zipfile.ZipFile(output, 'w')

  try:
    for zip_path, fs_path in input_tuples:
      if zip_prefix_path:
        zip_path = posixpath.join(zip_prefix_path, zip_path)
      if path_transform:
        zip_path = path_transform(zip_path)
        if zip_path is None:
          continue
      add_to_zip_hermetic(out_zip,
                          zip_path,
                          src_path=fs_path,
                          compress=compress,
                          compress_level=compress_level,
                          timestamp=timestamp)
  finally:
    # Only close archives that were opened here.
    if output is not out_zip:
      out_zip.close()
|  |  | 
|  |  | 
def zip_directory(output, base_dir, **kwargs):
  """Zips all files in the given directory.

  Walks |base_dir| recursively and hands every file found to
  add_files_to_zip(), with |base_dir| as the prefix to strip. Only files are
  collected, so directories themselves do not become entries.

  Args:
    output: Forwarded to add_files_to_zip() as the archive to write.
    base_dir: Directory whose files should be zipped.
    **kwargs: Extra keyword arguments forwarded to add_files_to_zip().
  """
  file_paths = [
      os.path.join(dirpath, filename)
      for dirpath, _, filenames in os.walk(base_dir)
      for filename in filenames
  ]
  add_files_to_zip(file_paths, output, base_dir=base_dir, **kwargs)
|  |  | 
|  |  | 
def merge_zips(output,
               input_zips,
               path_transform=None,
               compress=None,
               compress_level=1):
  """Combines all files from |input_zips| into |output|.

  Directory entries are dropped. An entry whose destination name is already
  present (in |output| or from an earlier input) is skipped when its CRC
  matches the existing one, and is an error otherwise.

  Args:
    output: Path (str) of the zip to create, or a ZipFile instance to add
      files to. A ZipFile instance passed in is left open.
    input_zips: Iterable of paths to zip files to merge.
    path_transform: Called for each entry path. Returns a new zip path, or None
      to skip the file.
    compress: Overrides compression setting from origin zip entries.
    compress_level: When compress=True, level of compression.

  Raises:
    Exception: If the same destination path appears more than once with
      differing contents.
  """
  assert not isinstance(input_zips, str)  # Easy mistake to make.
  owns_out_zip = not isinstance(output, zipfile.ZipFile)
  if owns_out_zip:
    assert isinstance(output, str), 'Was: ' + repr(output)
    out_zip = zipfile.ZipFile(output, 'w')
    out_filename = output
  else:
    out_zip = output
    out_filename = output.filename

  # Maps each entry name written so far to (origin zip, CRC). Seeded with the
  # entries already in the output zip to avoid adding duplicate files.
  known_entries = {}
  for existing in out_zip.infolist():
    known_entries[existing.filename] = (out_filename, existing.CRC)

  try:
    for in_file in input_zips:
      with zipfile.ZipFile(in_file, 'r') as in_zip:
        for info in in_zip.infolist():
          src_name = info.filename
          # Ignore directories.
          if src_name[-1] == '/':
            continue

          dst_name = path_transform(src_name) if path_transform else src_name
          if dst_name is None:
            continue

          data = in_zip.read(info)

          # A name seen before is fine as long as the contents are identical,
          # in which case there is nothing more to add.
          previous = known_entries.get(dst_name)
          if previous is not None:
            orig_filename, orig_crc = previous
            if zipfile.crc32(data) != orig_crc:
              msg = f"""File appeared in multiple inputs with differing contents.
File: {dst_name}
Input1: {orig_filename}
Input2: {in_file}"""
              raise Exception(msg)
            continue

          # Without an explicit override, keep each entry compressed or
          # stored the way it was in its origin zip.
          if compress is None:
            compress_entry = info.compress_type != zipfile.ZIP_STORED
          else:
            compress_entry = compress
          add_to_zip_hermetic(out_zip,
                              dst_name,
                              data=data,
                              compress=compress_entry,
                              compress_level=compress_level)
          known_entries[dst_name] = (in_file, out_zip.getinfo(dst_name).CRC)
  finally:
    # Only close archives that were opened here.
    if owns_out_zip:
      out_zip.close()