blob: 444c4985881a82c03dcbc4afb065f09361b44530 [file] [log] [blame]
# Copyright 2018 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
#py3Only
"""Standalone Python script to extract an archive. Intended to be used by
the 'archive' recipe module internally. Should not be used elsewhere.
"""
# [VPYTHON:BEGIN]
# python_version: "3.11"
# wheel: <
# name: "infra/python/wheels/zstandard/${vpython_platform}"
# version: "version:0.16.0"
# >
# [VPYTHON:END]
import argparse
import fnmatch
import json
import os
import posixpath
import shutil
import stat
import sys
import tarfile
import zipfile
import zstandard
if os.name == 'nt':
def unc_path(path):
prefix = '\\\\?\\'
if path.startswith(prefix):
# Already in UNC format.
return path
return prefix + os.path.abspath(path)
else:
def unc_path(path):
return path
def untar(archive_file, output, stats, safe, include_filter):
"""Untars an archive using 'tarfile' python module.
Works everywhere where Python works (Windows and POSIX).
Args:
archive_file: absolute path to an archive to untar.
output: existing directory to untar to.
stats: the stats dict (see main() for its form)
safe (bool): If True, skips extracting files which would escape `output`.
include_filter (fn(path): bool): A function which is given the archive
path and should return True if we should extract it.
"""
# Open regular files in random-access mode, which allows seeking backwards
# (needed to extract archives containing symlinks on some platforms).
# Otherwise, we open the file in stream mode, though this may fail later
# for the aforementioned case.
unc_output = unc_path(output)
fileobj = None
if os.path.isfile(archive_file):
if os.path.basename(archive_file).endswith(('.tar.zst', '.tzst')):
dctx = zstandard.ZstdDecompressor()
archive_fh = open(archive_file, 'rb')
fileobj = dctx.stream_reader(archive_fh, closefd=True)
open_mode = 'r:'
else:
open_mode = 'r:*'
else:
open_mode = 'r|*'
with tarfile.open(archive_file, open_mode, fileobj=fileobj) as tf:
# monkeypatch the TarFile object to allow printing messages for each
# extracted file. extractall makes a single linear pass over the tarfile;
# other naive implementations (such as `getmembers`) end up doing lots of
# random access over the file. Also patch it to support Unicode filenames.
em = tf._extract_member
def _extract_member(tarinfo, targetpath, **kwargs):
unc_targetpath = unc_path(targetpath)
if safe and not unc_targetpath.startswith(unc_output):
print('Skipping %r (would escape root)' % (tarinfo.name,))
stats['skipped']['filecount'] += 1
stats['skipped']['bytes'] += tarinfo.size
stats['skipped']['names'].append(tarinfo.name)
return
if not include_filter(tarinfo.name):
print('Skipping %r (does not match include_files)' % (tarinfo.name,))
return
print('Extracting %r' % (tarinfo.name,))
stats['extracted']['filecount'] += 1
stats['extracted']['bytes'] += tarinfo.size
em(tarinfo, unc_targetpath, **kwargs)
tf._extract_member = _extract_member
tf.extractall(output)
def unzip(zip_file, output, stats, include_filter):
"""Unzips an archive using 'zipfile' python module.
Works everywhere where Python works (Windows and POSIX).
Args:
zip_file: absolute path to an archive to unzip.
output: existing directory to unzip to.
stats: the stats dict (see main() for its form)
include_filter (fn(path): bool): A function which is given the archive
path and should return True if we should extract it.
"""
with zipfile.ZipFile(zip_file) as zf:
for zipinfo in zf.infolist():
if not include_filter(zipinfo.filename):
print('Skipping %r (does not match include_files)' %
(zipinfo.filename,))
continue
print('Extracting %s' % zipinfo.filename)
stats['extracted']['filecount'] += 1
stats['extracted']['bytes'] += zipinfo.file_size
# By default, zipfile extracts a symlink file as regular file with its
# link destination as its contents. Check if the file is a symlink and
# if so, create it properly.
if stat.S_ISLNK(zipinfo.external_attr >> 16) and os.name != 'nt':
print('Creating %s as symlink' % (zipinfo.filename))
link_dest = zf.open(zipinfo).read()
os.symlink(link_dest, os.path.join(output, zipinfo.filename))
else:
zf.extract(zipinfo, unc_path(output))
if os.name != 'nt':
# POSIX may store permissions in the 16 most significant bits of the
# file's external attributes.
perms = (zipinfo.external_attr >> 16) & 0o777
fullpath = os.path.join(output, zipinfo.filename)
if perms and not os.path.islink(fullpath):
# Don't update permissions to be more restrictive.
old = os.stat(fullpath).st_mode
old_short = old & 0o777
new = old | perms
new_short = new & 0o777
if old_short < new_short:
print('Updating %s permissions (0%o -> 0%o)' %
(zipinfo.filename, old_short, new_short))
os.chmod(fullpath, new)
def main():
# See archive/api.py, def extract(...) for format of |data|.
ap = argparse.ArgumentParser()
ap.add_argument('--json-input', type=argparse.FileType('r'))
ap.add_argument('--json-output', type=argparse.FileType('w'))
opts = ap.parse_args()
data = json.load(opts.json_input)
output = data['output']
archive_file = data['archive_file']
file_type = data.get('archive_type',
'zip' if archive_file.endswith('.zip') else 'tar')
safe_mode = data['safe_mode']
include_files = data['include_files']
# Archive path should exist and be an absolute path to a file.
assert os.path.isabs(archive_file), archive_file
assert os.path.isfile(archive_file), archive_file
# Output path should be an absolute path.
assert os.path.isabs(output), output
# Normalize it to end with a path separator.
output = os.path.join(output, '')
print('Extracting %s (%s) -> %s ...' % (archive_file, file_type, output))
try:
os.makedirs(output, exist_ok=True)
stats = {
'extracted': {
'filecount': 0,
'bytes': 0,
},
'skipped': {
'filecount': 0,
'bytes': 0,
'names': [],
},
}
include_filter = lambda _path: True
if include_files:
def include_filter(path):
path = posixpath.normpath(path)
if path.startswith('./'):
path = path[2:]
for pattern in include_files:
if fnmatch.fnmatch(path, pattern):
return True
return False
if file_type == 'zip':
# NOTE: zipfile module is always safe in python 2.7.4+... it mangles
# extracted file names to ensure they don't escape the extraction root.
unzip(archive_file, output, stats, include_filter)
else:
untar(archive_file, output, stats, safe_mode, include_filter)
json.dump(stats, opts.json_output)
except:
shutil.rmtree(output, ignore_errors=True)
raise
return 0
if __name__ == '__main__':
sys.exit(main())