blob: 54dc862432ab471a9c44ecd068c9e3b981975347 [file] [log] [blame]
#!/usr/bin/env vpython3
# Copyright 2017 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Utility exporting basic filesystem operations.
This file was cut from "scripts/common/chromium_utils.py" at:
91310531c31fa645256b4fb5d44b460c42b3e151
"""
import argparse
import errno
import glob
import hashlib
import itertools
import json
import os
import shutil
import subprocess
import sys
import tempfile
import time
def _RmGlob(file_wildcard, root, include_hidden):
"""Removes files matching 'file_wildcard' in root and its subdirectories, if
any exists.
An exception is thrown if root doesn't exist."""
wildcard = os.path.join(os.path.realpath(root), file_wildcard)
for item in glob.glob(wildcard, recursive=True, include_hidden=include_hidden):
try:
os.remove(item)
except OSError as e:
if e.errno != errno.ENOENT:
raise
def _RmContents(path):
  """Empties the directory at `path` but keeps the directory itself.

  Does nothing when `path` does not exist. Otherwise `path` is chmod'ed to
  0o770 and each entry is removed: directories through _RmTree(), everything
  else through os.unlink().
  """
  if not os.path.exists(path):
    return

  os.chmod(path, 0o770)
  for entry in os.listdir(path):
    child = os.path.join(path, entry)
    remover = _RmTree if os.path.isdir(child) else os.unlink
    remover(child)
def _RmTree(path):
  """Removes the file or directory tree at `path`, read-only entries included.

  A `path` that does not exist only produces a warning on stdout.

  On Windows the work is delegated to cmd.exe's `rd /q /s`, attempted up to
  three times with a three second pause after each failed attempt. If every
  attempt fails the function still returns without raising.

  Elsewhere a non-directory is removed directly. A directory is walked
  bottom-up; each visited directory is made writable before its entries are
  deleted, and symlinks are unlinked rather than followed.
  """
  if not os.path.exists(path):
    print('WARNING: Failed to find %s during rmtree. Ignoring.\n' % path)
    return

  if sys.platform == 'win32':
    # Give up and use cmd.exe's rd command.
    rd_cmd = ['cmd.exe', '/c', 'rd', '/q', '/s', os.path.normcase(path)]
    for _ in range(3):
      print('RemoveDirectory running %s' % (' '.join(rd_cmd)))
      if subprocess.call(rd_cmd) == 0:
        break
      print(' Failed')
      time.sleep(3)
    return

  # Asked to "rmtree" something that is not a directory: just delete it.
  if not os.path.isdir(path):
    os.remove(path)
    return

  def unlink_or(rmfunc, target):
    # A symlink is removed as a link; `rmfunc` only ever sees real entries.
    return os.remove(target) if os.path.islink(target) else rmfunc(target)

  def reraise_unless_listing_vanished(function, failed_path, excinfo):
    # Error hook handed to shutil.rmtree(). It is invoked while the failure is
    # still the active exception, so a bare `raise` re-raises it. The only
    # failure tolerated is os.listdir reporting ENOENT: the entry is already
    # gone and we were deleting it anyway. OSError is checked rather than
    # WindowsError because the latter is not defined off Windows.
    exc_type, exc_value = excinfo[0], excinfo[1]
    listing_vanished = (
        function is os.listdir
        and issubclass(exc_type, OSError)
        and exc_value.errno == errno.ENOENT)
    if not listing_vanished:
      raise
    print('WARNING: Failed to list %s during rmtree. Ignoring.\n' % failed_path)

  def rmtree_tolerant(target):
    return shutil.rmtree(target, onerror=reraise_unless_listing_vanished)

  for root, dirs, files in os.walk(path, topdown=False):
    # For POSIX: making the directory writable guarantees removability.
    # Windows will ignore the non-read-only bits in the chmod value.
    os.chmod(root, 0o770)
    for name in files:
      unlink_or(os.remove, os.path.join(root, name))
    for name in dirs:
      unlink_or(rmtree_tolerant, os.path.join(root, name))
  unlink_or(os.rmdir, path)
def _EnsureDir(mode, dest):
  """Makes sure `dest` exists as a directory, creating parents as needed.

  Args:
    mode: Permission bits handed to os.makedirs() when the directory has to
      be created. None (what argparse produces when `--mode` is omitted)
      selects 0o777, which is os.makedirs()'s own default.
    dest: Path of the directory to ensure.

  Raises:
    OSError: With errno EEXIST when `dest` exists but is not a directory;
      otherwise whatever os.makedirs() raises.
  """
  if os.path.isdir(dest):
    return
  if os.path.exists(dest):
    raise OSError(errno.EEXIST, os.strerror(errno.EEXIST))
  if mode is None:
    # os.makedirs() rejects None with a TypeError, so substitute its default.
    mode = 0o777
  os.makedirs(dest, mode)
def _Glob(base, pattern, include_hidden):
base = os.path.realpath(base)
hits = glob.glob(os.path.join(base, pattern), recursive=True, include_hidden=include_hidden)
if hits:
print('\n'.join(sorted((os.path.relpath(hit, start=base) for hit in hits))))
def _ListDir(base, recursive):
  """Prints the sorted entries of `base`, newline separated, to stdout.

  With `recursive` set, every file found by walking `base` is listed as a path
  relative to `base` (directories themselves are not listed). Otherwise the
  immediate entries from os.listdir() are listed. No trailing newline is
  written.
  """
  if recursive:
    entries = [
        os.path.relpath(os.path.join(dirpath, fname), base)
        for dirpath, _, fnames in os.walk(base)
        for fname in fnames
    ]
  else:
    entries = os.listdir(base)

  listing = '\n'.join(sorted(entries))
  # Round-trip through UTF-8 with 'replace' so names holding unencodable
  # characters come out with "?" instead of making print() raise.
  print(listing.encode('utf-8', 'replace').decode(), end='')
def _Remove(path):
  """Deletes the file at `path`; a file that is already absent is not an error.

  Any OSError other than ENOENT is propagated.
  """
  try:
    os.remove(path)
  except OSError as err:
    if err.errno == errno.ENOENT:
      return
    raise
def _Truncate(path, size_mb):
  """Creates (or overwrites) `path` as a file of exactly `size_mb` mebibytes.

  The file is opened for writing, which discards any previous content, and is
  then resized with truncate() to size_mb * 1024 * 1024 bytes.
  """
  with open(path, 'w') as out:
    byte_count = size_mb * 1024 * 1024
    out.truncate(byte_count)
def _FlattenSingleDirectories(path):
  """Hoists the contents of a chain of single-child directories into `path`.

  Walks down from `path` for as long as each level holds exactly one directory
  and no files. The entries of the first level that breaks that pattern are
  moved directly into `path`, and the emptied chain is then deleted. For
  example `path/a/b/{x,y}` becomes `path/{x,y}`.

  Args:
    path: Absolute path of an existing directory.

  Returns:
    0 once a level that is not a single-directory level has been handled,
    whether or not anything had to be moved. None if the walk ends without
    yielding such a level.

  Raises:
    AssertionError: If `path` is not absolute or is not a directory.
    OSError: If creating the holding directory, a rename or the final removal
      fails.
  """
  assert os.path.isabs(path), 'nonabs path: %r' % (path,)
  assert os.path.isdir(path), 'nondir path: %r' % (path,)

  first_single_dir = None
  print('flattening single directories in %r' % (path,))

  for root, dirs, files in os.walk(path):
    # if it's a single dir, we keep walking
    if len(dirs) == 1 and not files:
      if not first_single_dir:
        first_single_dir = os.path.join(path, dirs[0])
      continue

    # otherwise we found some stuff!
    if not first_single_dir:
      # if we didn't find a first_single_dir, we're still in the base directory
      # and don't have anything to do.
      print('contents appears already flattened')
      return 0

    print('found contents at: %r' % (os.path.relpath(root, path),))

    # First move first_single_dir out of the way, in case an entry we need to
    # move has a conflicting name. It is parked inside a freshly created
    # holding directory: mkdtemp() creates the directory atomically, unlike the
    # deprecated mktemp(), which only returns a name that may be taken by the
    # time it is used.
    holding_dir = tempfile.mkdtemp(dir=path)
    tmpname = os.path.join(holding_dir, os.path.basename(first_single_dir))
    print('moving root folder out of the way: %r -> %r' %
          (first_single_dir, tmpname))
    os.rename(first_single_dir, tmpname)

    # `root` was built by os.walk() by joining names onto first_single_dir, so
    # it always starts with that exact string. Swap only this leading prefix:
    # a substring replace would also rewrite later repetitions of the same
    # text inside the subtree and produce a path that does not exist.
    moved_root = tmpname + root[len(first_single_dir):]

    for name in itertools.chain(dirs, files):
      fullname = os.path.join(moved_root, name)
      to = os.path.join(path, name)
      print('mv %r %r' % (fullname, to))
      os.rename(fullname, to)
    print('moved %d dirs and %d files' % (len(dirs), len(files)))
    print('rm -rf %r' % (holding_dir,))
    shutil.rmtree(holding_dir)
    return 0
def _FileHash(sha, rel_path, base_path):
  """Feeds one file, with its relative path, into the hash object `sha`.

  The file at base_path/rel_path is opened first. Then the character length of
  `rel_path` (as decimal text) and `rel_path` itself are added, followed, for
  each chunk of up to 4096 bytes read from the file, by the chunk's length (as
  decimal text) and the chunk bytes.
  """
  with open(os.path.join(base_path, rel_path), 'rb') as stream:
    sha.update(str(len(rel_path)).encode())
    sha.update(rel_path.encode())
    while chunk := stream.read(4096):
      sha.update(str(len(chunk)).encode())
      sha.update(chunk)
def _ComputeHashPaths(base_path, *rel_paths):
  """Prints one SHA-256 hex digest covering all of the given paths.

  Each entry of `rel_paths` is resolved against `base_path`. A file is fed to
  _FileHash() directly. A directory is walked top-down with directory and file
  names visited in sorted order, and every file found is fed to _FileHash()
  under its path relative to `base_path`. Entries that are neither a file nor
  a directory contribute nothing.

  Returns:
    0, always.
  """
  digest = hashlib.sha256()
  for rel_path in rel_paths:
    target = os.path.join(base_path, rel_path)
    if os.path.isfile(target):
      _FileHash(digest, rel_path, base_path)
      continue
    if not os.path.isdir(target):
      continue
    for root, dirs, files in os.walk(target, topdown=True):
      # Sorting in place steers os.walk() so subdirectories are visited in a
      # stable order.
      dirs.sort()
      for f_name in sorted(files):
        rel_file_path = os.path.relpath(os.path.join(root, f_name), base_path)
        _FileHash(digest, rel_file_path, base_path)
  print(digest.hexdigest())
  return 0
def _CalculateHash(path):
  """Prints the SHA-256 hex digest of the file at `path`.

  The file is read in binary mode in chunks of up to 4096 bytes.

  Returns:
    0, always.
  """
  digest = hashlib.sha256()
  with open(path, 'rb') as stream:
    while chunk := stream.read(4096):
      digest.update(chunk)
  print(digest.hexdigest())
  return 0
def main(args):
  """Runs one filesystem subcommand and records the outcome as JSON.

  Args:
    args: Command-line arguments without the program name. `--json-output`
      is required and names the file that receives the result; it is followed
      by a subcommand and that subcommand's own arguments.

  Returns:
    0, always. Whether the operation succeeded is reported through the JSON
    document, which has the keys `ok` (bool), `errno_name` (symbolic errno
    for an OSError, otherwise empty) and `message` (error text, otherwise
    empty).
  """
  parser = argparse.ArgumentParser()
  parser.add_argument('--json-output', required=True,
                      type=argparse.FileType('w'),
                      help="path to JSON output file")
  subparsers = parser.add_subparsers()

  # Subcommand: rmtree
  subparser = subparsers.add_parser('rmtree',
      help='Recursively remove a directory.')
  subparser.add_argument('source', help='A path to remove.')
  subparser.set_defaults(func=lambda opts: _RmTree(opts.source))

  # Subcommand: chmod
  subparser = subparsers.add_parser('chmod', help='Run chmod on a file.')
  subparser.add_argument('path', help='A path to run chmod on.')
  subparser.add_argument(
      '--mode',
      help='The octal mode of the file or directory.',
      type=lambda s: int(s, 8))
  subparser.set_defaults(func=lambda opts: os.chmod(opts.path, opts.mode))

  # Subcommand: rmcontents
  subparser = subparsers.add_parser('rmcontents',
      help='Recursively remove the contents of a directory.')
  subparser.add_argument('source', help='The target directory.')
  subparser.set_defaults(func=lambda opts: _RmContents(opts.source))

  # Subcommand: rmglob
  subparser = subparsers.add_parser('rmglob',
      help='Recursively remove files matching a wildcard under a directory.')
  subparser.add_argument('root', help='The directory to search through.')
  subparser.add_argument('wildcard', help='The wildcard expression to remove.')
  subparser.add_argument('--hidden', action='store_true',
                         help='Include hidden files.')
  subparser.set_defaults(func=lambda opts:
                         _RmGlob(opts.wildcard, opts.root, opts.hidden))

  # Subcommand: copy
  subparser = subparsers.add_parser('copy',
      help='Copy one file to another. Behaves like shutil.copy().')
  subparser.add_argument('source', help='The file to copy.')
  subparser.add_argument('dest', help='The destination to copy to.')
  subparser.set_defaults(func=lambda opts: shutil.copy(opts.source, opts.dest))

  # Subcommand: copytree
  subparser = subparsers.add_parser('copytree',
      help='Recursively copy a file tree. Behaves like shutil.copytree().')
  subparser.add_argument('--symlinks', action='store_true',
                         help='Copy symlinks as symlinks.')
  subparser.add_argument('source', help='The directory to copy.')
  subparser.add_argument('dest', help='The destination directory to copy to.')
  subparser.set_defaults(
      func=lambda opts: shutil.copytree(opts.source, opts.dest, opts.symlinks))

  # Subcommand: move
  subparser = subparsers.add_parser('move',
      help='Moves/renames a file. Behaves like shutil.move().')
  subparser.add_argument('source', help='The item to move.')
  subparser.add_argument('dest', help='The destination name.')
  subparser.set_defaults(
      func=lambda opts: shutil.move(opts.source, opts.dest))

  # Subcommand: glob
  subparser = subparsers.add_parser('glob',
      help='Prints a list of paths, relative to the base directory, which '
           'match the pattern.')
  subparser.add_argument('base', help='The directory to glob in.')
  subparser.add_argument('pattern', help='The glob pattern to expand.')
  subparser.add_argument('--hidden', action='store_true',
                         help='Include hidden files.')
  subparser.set_defaults(func=lambda opts:
                         _Glob(opts.base, opts.pattern, opts.hidden))

  # Subcommand: remove
  subparser = subparsers.add_parser('remove',
      help='Remove a file')
  subparser.add_argument('source', help='The file to remove.')
  subparser.set_defaults(func=lambda opts: _Remove(opts.source))

  # Subcommand: listdir
  subparser = subparsers.add_parser('listdir',
      help='Print all entries in the given folder to stdout.')
  subparser.add_argument('source', help='The dir to list.')
  subparser.add_argument('--recursive', action='store_true',
                         help='Recurse into subdirectories.')
  subparser.set_defaults(
      func=lambda opts: _ListDir(opts.source, opts.recursive))

  # Subcommand: ensure-directory
  subparser = subparsers.add_parser('ensure-directory',
      help='Ensures that the given path is a directory.')
  subparser.add_argument('--mode', help='The octal mode of the directory.',
                         type=lambda s: int(s, 8))
  subparser.add_argument('dest', help='The dir to ensure.')
  subparser.set_defaults(func=lambda opts: _EnsureDir(opts.mode, opts.dest))

  # Subcommand: filesizes
  subparser = subparsers.add_parser('filesizes',
      help='Prints a list for sizes in bytes (1 per line) for each given file')
  subparser.add_argument('file', nargs='+', help='Path to a file')
  subparser.set_defaults(
      func=lambda opts: print('\n'.join(str(os.stat(f).st_size)
                                        for f in opts.file)))

  # Subcommand: symlink
  subparser = subparsers.add_parser('symlink',
      help='Creates a symlink. Behaves like os.symlink.')
  subparser.add_argument('source', help='The thing to link to.')
  subparser.add_argument('link', help='The link to create.')
  subparser.set_defaults(
      func=lambda opts: os.symlink(opts.source, opts.link))

  # Subcommand: truncate
  subparser = subparsers.add_parser(
      'truncate', help='Creates an empty file with specified size.')
  subparser.add_argument('path', help='The path to the file.')
  subparser.add_argument('size_mb', help='The size of the file in megabytes.',
                         type=int)
  subparser.set_defaults(func=lambda opts: _Truncate(opts.path, opts.size_mb))

  # Subcommand: flatten_single_directories
  subparser = subparsers.add_parser(
      'flatten_single_directories',
      help=('Moves contents of single/dir/with/contents to the top level '
            'directory.'))
  subparser.add_argument('path', help='The path to flatten from.')
  subparser.set_defaults(func=lambda opts: _FlattenSingleDirectories(opts.path))

  # Subcommand: compute_hash
  subparser = subparsers.add_parser(
      'compute_hash',
      help='Computes hash of provided absolute directories and/or files.')
  subparser.add_argument('base_path', help='Base path to normalize all files.')
  subparser.add_argument('rel_paths', nargs='+',
                         help='List of relative paths of directories '
                         'and/or files.')
  subparser.set_defaults(func=lambda opts: _ComputeHashPaths(opts.base_path,
                                                             *opts.rel_paths))

  # Subcommand: file_hash
  subparser = subparsers.add_parser(
      'file_hash',
      help='Computes hash of a file in provided absolute path.')
  subparser.add_argument('file_path', help='Absolute path for the file.')
  subparser.set_defaults(func=lambda opts: _CalculateHash(opts.file_path))

  # Parse arguments.
  opts = parser.parse_args(args)

  # Actually do the thing.
  data = {
      'ok': False,
      'errno_name': '',
      'message': '',
  }
  try:
    opts.func(opts)
    data['ok'] = True
  except shutil.Error as e:
    # Note that shutil.Error's "message" field can sometimes be a tuple, just
    # render the entire exception as a string to be safe.
    data['message'] = str(e)
  except OSError as e:
    if e.errno:
      # Use .get(): an errno value without a symbolic name must not raise
      # KeyError here, because that would abort before the JSON is written.
      data['errno_name'] = errno.errorcode.get(e.errno, '')
    data['message'] = str(e)
  except Exception as e:
    # Top-level boundary: report anything else through the JSON document
    # instead of crashing.
    data['message'] = 'UNKNOWN: %s' % e

  with opts.json_output:
    json.dump(data, opts.json_output)
  return 0
if __name__ == '__main__':
  # Hand main()'s return value to the interpreter as the process exit status.
  exit_status = main(sys.argv[1:])
  sys.exit(exit_status)