blob: 0bedb08a308a2c9ae9e266dcc88e4b0c903eb9d9 [file] [log] [blame] [edit]
# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Wraps os, os.path and shutil functions to work around MAX_PATH on Windows."""
import builtins
import inspect
import logging
import os
import re
import shutil
import subprocess
import sys
if sys.platform == 'win32':
import ctypes
from ctypes import wintypes
from ctypes import windll
CreateSymbolicLinkW = windll.kernel32.CreateSymbolicLinkW
CreateSymbolicLinkW.argtypes = (
wintypes.LPCWSTR, wintypes.LPCWSTR, wintypes.DWORD)
CreateSymbolicLinkW.restype = wintypes.BOOL
DeleteFile = windll.kernel32.DeleteFileW
DeleteFile.argtypes = (wintypes.LPCWSTR,)
DeleteFile.restype = wintypes.BOOL
GetFileAttributesW = windll.kernel32.GetFileAttributesW
GetFileAttributesW.argtypes = (wintypes.LPCWSTR,)
GetFileAttributesW.restype = wintypes.DWORD
RemoveDirectory = windll.kernel32.RemoveDirectoryW
RemoveDirectory.argtypes = (wintypes.LPCWSTR,)
RemoveDirectory.restype = wintypes.BOOL
DeviceIoControl = windll.kernel32.DeviceIoControl
DeviceIoControl.argtypes = (
wintypes.HANDLE, wintypes.DWORD, wintypes.LPVOID, wintypes.DWORD,
wintypes.LPVOID, wintypes.DWORD, ctypes.POINTER(wintypes.DWORD),
wintypes.LPVOID)
DeviceIoControl.restype = wintypes.BOOL
# It's *much* faster to temporarily waste 32Kb of memory than call
# DeviceIoControl() one additional time to figure out the buffer size to
# allocate.
_MAX_WIN_PATH_LENGTH = 32768
class _SYMBOLIC_LINK_REPARSE_BUFFER(ctypes.Structure):
_fields_ = (
('SubstituteNameOffset', wintypes.USHORT),
('SubstituteNameLength', wintypes.USHORT),
('PrintNameOffset', wintypes.USHORT),
('PrintNameLength', wintypes.USHORT),
('Flags', wintypes.ULONG),
('PathBuffer', wintypes.WCHAR * _MAX_WIN_PATH_LENGTH),
)
class _MOUNT_POINT_REPARSE_BUFFER(ctypes.Structure):
_fields_ = (
('SubstituteNameOffset', wintypes.USHORT),
('SubstituteNameLength', wintypes.USHORT),
('PrintNameOffset', wintypes.USHORT),
('PrintNameLength', wintypes.USHORT),
('PathBuffer', wintypes.WCHAR * _MAX_WIN_PATH_LENGTH),
)
class _REPARSE_BUFFER(ctypes.Union):
_fields_ = (
('SymbolicLinkReparseBuffer', _SYMBOLIC_LINK_REPARSE_BUFFER),
('MountPointReparseBuffer', _MOUNT_POINT_REPARSE_BUFFER),
)
# https://msdn.microsoft.com/en-us/data/ff552012(v=vs.96)
class REPARSE_DATA_BUFFER(ctypes.Structure):
_fields_ = (
# A bitpacked value.
('ReparseTag', wintypes.DWORD),
('ReparseDataLength', wintypes.WORD),
('Reserved', wintypes.WORD),
('ReparseBuffer', _REPARSE_BUFFER),
)
# Flags for GetFileAttributesW().
FILE_ATTRIBUTE_REPARSE_POINT = 1024
INVALID_FILE_ATTRIBUTES = 0xFFFFFFFF
# Flags for CreateFileW().
FILE_FLAG_BACKUP_SEMANTICS = 0x2000000
FILE_FLAG_OPEN_REPARSE_POINT = 0x200000
FILE_SHARE_READ = 1
FILE_SHARE_WRITE = 2
FILE_SHARE_DELETE = 4
FILE_READ_EA = 8
OPEN_ALWAYS = 4
INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
# Flags for CreateSymbolicLinkW().
SYMBOLIC_LINK_FLAG_DIRECTORY = 1
SYMBOLIC_LINK_FLAG_ALLOW_UNPRIVILEGED_CREATE = 2
# Flag for DeviceIoControl().
FSCTL_GET_REPARSE_POINT = 0x900a8
# https://msdn.microsoft.com/en-us/library/dd541667.aspx
IO_REPARSE_TAG_MOUNT_POINT = 0xa0000003
IO_REPARSE_TAG_SYMLINK = 0xa000000c
# Set to true or false once symlink support is initialized.
_SUPPORTS_SYMLINKS = None
def _supports_unprivileged_symlinks():
"""Returns True if the OS supports sane symlinks without any user privilege
modification.
Actively work around AppCompat version lie shim. This is kinda insane to
shell out to figure out the real Windows version but this is ironically the
the most reliable way. This code was inspired by _get_os_numbers() in
//appengine/swarming/swarming_bot/api/platforms/win.py.
"""
global _SUPPORTS_SYMLINKS
if _SUPPORTS_SYMLINKS != None:
return _SUPPORTS_SYMLINKS
_SUPPORTS_SYMLINKS = False
# Windows is lying to us until python adds to its manifest:
# <supportedOS Id="{8e0f7a12-bfb3-4fe8-b9a5-48fd50a15a9a}"/>
# and it doesn't.
# So ask nicely to cmd.exe instead, which will always happily report the
# right version. Here's some sample output:
# - XP: Microsoft Windows XP [Version 5.1.2600]
# - Win10: Microsoft Windows [Version 10.0.10240]
# - Win7 or Win2K8R2: Microsoft Windows [Version 6.1.7601]
try:
out = subprocess.check_output(['cmd.exe', '/c', 'ver'], text=True).strip()
match = re.search(r'\[Version (\d+\.\d+)\.(\d+)\]', out, re.IGNORECASE)
if match:
# That's a bit gross but good enough.
major = float(match.group(1))
if major > 10:
_SUPPORTS_SYMLINKS = True
elif major == 10:
# https://blogs.windows.com/buildingapps/2016/12/02/symlinks-windows-10/
_SUPPORTS_SYMLINKS = int(match.group(2)) >= 14971
except (subprocess.CalledProcessError, OSError):
# Catastrophic issue.
pass
return _SUPPORTS_SYMLINKS
def extend(path):
"""Adds '\\\\?\\' when given an absolute path so the MAX_PATH (260) limit is
not enforced.
"""
assert os.path.isabs(path), path
assert isinstance(path, str), "%s, type: %s" % (path, type(path))
prefix = '\\\\?\\'
return path if path.startswith(prefix) else prefix + path
def trim(path):
"""Removes '\\\\?\\' when receiving a path."""
assert isinstance(path, str), "%s, type: %s" % (path, type(path))
prefix = '\\\\?\\'
if path.startswith(prefix):
path = path[len(prefix):]
assert os.path.isabs(path), path
return path
def islink(path):
"""Proper implementation of islink() for Windows.
The stdlib is broken.
https://msdn.microsoft.com/library/windows/desktop/aa365682.aspx
os.path.islink() always returns false on WindowsNT/95 and OS/2 in python2,
https://github.com/python/cpython/blob/2.7/Lib/ntpath.py#L220
and also for Windows prior to 6.0 in python3.
https://github.com/python/cpython/blob/3.9/Lib/ntpath.py#L228
"""
res = GetFileAttributesW(extend(path))
if res == INVALID_FILE_ATTRIBUTES:
return False
return bool(res & FILE_ATTRIBUTE_REPARSE_POINT)
def symlink(source, link_name):
"""Creates a symlink on Windows 7 and later.
This function will only work once SeCreateSymbolicLinkPrivilege has been
enabled. See file_path.enable_symlink().
Useful material:
CreateSymbolicLinkW:
https://msdn.microsoft.com/library/windows/desktop/aa363866.aspx
UAC and privilege stripping:
https://msdn.microsoft.com/library/bb530410.aspx
Privilege constants:
https://msdn.microsoft.com/library/windows/desktop/bb530716.aspx
Windows 10 and developer mode:
https://blogs.windows.com/buildingapps/2016/12/02/symlinks-windows-10/
"""
f = extend(link_name)
# We need to convert to absolute path, so we can test if it points to a
# directory or a file.
real_source = source
if not os.path.isabs(source):
real_source = extend(
os.path.normpath(os.path.join(os.path.dirname(link_name), source)))
# pylint: disable=undefined-variable
# isdir is generated dynamically.
flags = SYMBOLIC_LINK_FLAG_DIRECTORY if isdir(real_source) else 0
if _supports_unprivileged_symlinks():
# This enables support for this specific case:
# - Windows 10 with build 14971 or later
# - Admin account
# - UAC enabled
# - Developer mode enabled (not the default)
#
# In this specific case, file_path.enable_symlink() is unnecessary and the
# following flag make it magically work.
flags |= SYMBOLIC_LINK_FLAG_ALLOW_UNPRIVILEGED_CREATE
if not CreateSymbolicLinkW(f, source, flags):
# pylint: disable=undefined-variable
err = ctypes.GetLastError()
if err == 1314:
raise WindowsError(
('symlink(%r, %r) failed: ERROR_PRIVILEGE_NOT_HELD(1314). Try '
'calling file_path.enable_symlink() first') % (source, link_name))
raise WindowsError('symlink(%r, %r) failed: %s' %
(source, link_name, err))
def remove(path):
"""Removes a symlink on Windows 7 and later.
Does not delete the link source.
If path is not a link, but a non-empty directory, will fail with a
WindowsError.
Useful material:
CreateSymbolicLinkW:
https://msdn.microsoft.com/library/windows/desktop/aa363866.aspx
DeleteFileW:
https://msdn.microsoft.com/en-us/library/windows/desktop/aa363915(v=vs.85).aspx
RemoveDirectoryW:
https://msdn.microsoft.com/en-us/library/windows/desktop/aa365488(v=vs.85).aspx
"""
path = extend(path)
# pylint: disable=undefined-variable
# isdir is generated dynamically.
if isdir(path):
if not RemoveDirectory(path):
# pylint: disable=undefined-variable
raise WindowsError('unlink(%r): could not remove directory: %s' %
(path, ctypes.GetLastError()))
else:
if DeleteFile(path):
return
logging.warning('failed to call DeleteFile(%r): win error: %s', path,
ctypes.GetLastError())
# fallback to unlink if it fails to call DeleteFile.
os.unlink(path)
def readlink(path):
"""Reads a symlink on Windows."""
# Interestingly, when using FILE_FLAG_OPEN_REPARSE_POINT and the destination
# is not a reparse point, the actual file will be opened. It's the
# DeviceIoControl() below that will fail with 4390.
handle = windll.kernel32.CreateFileW(
extend(path),
FILE_READ_EA,
FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE,
None,
OPEN_ALWAYS,
FILE_FLAG_BACKUP_SEMANTICS|FILE_FLAG_OPEN_REPARSE_POINT,
None)
if handle == INVALID_HANDLE_VALUE:
# pylint: disable=undefined-variable
raise WindowsError('readlink(%r): failed to open: %s' %
(path, ctypes.GetLastError()))
try:
buf = REPARSE_DATA_BUFFER()
returned = wintypes.DWORD()
ret = DeviceIoControl(
handle, FSCTL_GET_REPARSE_POINT, None, 0, ctypes.byref(buf),
ctypes.sizeof(buf), ctypes.byref(returned), None)
if not ret:
err = ctypes.GetLastError()
# ERROR_MORE_DATA(234) should not happen because we preallocate the
# maximum size.
if err == 4390:
# pylint: disable=undefined-variable
raise WindowsError(
'readlink(%r): failed to read: ERROR_NOT_A_REPARSE_POINT(4390)' %
path)
# pylint: disable=undefined-variable
raise WindowsError('readlink(%r): failed to read: %s' % (path, err))
if buf.ReparseTag == IO_REPARSE_TAG_SYMLINK:
actual = buf.ReparseBuffer.SymbolicLinkReparseBuffer
elif buf.ReparseTag == IO_REPARSE_TAG_MOUNT_POINT:
actual = buf.ReparseBuffer.MountPointReparseBuffer
else:
raise WindowsError( # pylint: disable=undefined-variable
'readlink(%r): succeeded but doesn\'t know how to parse result!' %
path)
off = actual.PrintNameOffset // 2
end = off + actual.PrintNameLength // 2
return actual.PathBuffer[off:end]
finally:
windll.kernel32.CloseHandle(handle)
def walk(top, topdown=True, onerror=None, followlinks=False):
# We need to reimplement walk() here because the standard library ignores
# the flag followlinks=False. This is because ntpath.islink() is hardcoded
# to return false. On Windows, os.path is an alias to ntpath.
utop = extend(top)
try:
names = listdir(utop) # pylint: disable=undefined-variable
except os.error as err:
if onerror is not None:
onerror(err)
return
dirs, nondirs = [], []
for name in names:
if isdir(os.path.join(top, name)): # pylint: disable=undefined-variable
dirs.append(name)
else:
nondirs.append(name)
if topdown:
yield top, dirs, nondirs
for name in dirs:
new_path = os.path.join(top, name)
if followlinks or not islink(new_path):
for x in walk(new_path, topdown, onerror, followlinks):
yield x
if not topdown:
yield top, dirs, nondirs
else:
def extend(path):
"""Path extending is not needed on POSIX."""
assert os.path.isabs(path), path
assert isinstance(path, str), "%s, type: %s" % (path, type(path))
return path
def trim(path):
"""Path mangling is not needed on POSIX."""
assert os.path.isabs(path), path
assert isinstance(path, str), "%s, type: %s" % (path, type(path))
return path
def islink(path):
return os.path.islink(extend(path))
def symlink(source, link_name):
return os.symlink(source, extend(link_name))
def readlink(path):
return os.readlink(extend(path))
def walk(top, *args, **kwargs):
for root, dirs, files in os.walk(extend(top), *args, **kwargs):
yield trim(root), dirs, files
def remove(path):
return os.remove(extend(path))
## builtin
def open(path, *args, **kwargs): # pylint: disable=redefined-builtin
return builtins.open(extend(path), *args, **kwargs)
## os
def link(source, link_name):
return os.link(extend(source), extend(link_name))
def rename(old, new):
return os.rename(extend(old), extend(new))
def renames(old, new):
return os.renames(extend(old), extend(new))
def unlink(path):
return remove(path)
## shutil
def copy2(src, dst):
return shutil.copy2(extend(src), extend(dst))
def rmtree(path, *args, **kwargs):
return shutil.rmtree(extend(path), *args, **kwargs)
## The rest
def _get_lambda(func):
return lambda path, *args, **kwargs: func(extend(path), *args, **kwargs)
def _is_path_fn(func):
return (inspect.getargspec(func)[0] or [None]) == 'path'
_os_fns = (
'access', 'chdir', 'chflags', 'chroot', 'chmod', 'chown', 'lchflags',
'lchmod', 'lchown', 'listdir', 'lstat', 'mknod', 'mkdir', 'makedirs',
'removedirs', 'rmdir', 'stat', 'statvfs', 'utime')
_os_path_fns = (
'exists', 'lexists', 'getatime', 'getmtime', 'getctime', 'getsize', 'isfile',
'isdir', 'ismount')
for _fn in _os_fns:
if hasattr(os, _fn):
sys.modules[__name__].__dict__.setdefault(
_fn, _get_lambda(getattr(os, _fn)))
for _fn in _os_path_fns:
if hasattr(os.path, _fn):
sys.modules[__name__].__dict__.setdefault(
_fn, _get_lambda(getattr(os.path, _fn)))