| """ |
| Low-level OS functionality wrappers used by pathlib. |
| """ |
| |
| from errno import * |
| from io import TextIOWrapper, text_encoding |
| import os |
| import sys |
| try: |
| import fcntl |
| except ImportError: |
| fcntl = None |
| try: |
| import posix |
| except ImportError: |
| posix = None |
| try: |
| import _winapi |
| except ImportError: |
| _winapi = None |
| |
| |
def _get_copy_blocksize(infd):
    """Pick the chunk size used by the Linux fast-copy loops.

    The size is the source file's length as reported by fstat(), but never
    less than 8 MiB, so that ideally the whole file goes through in one
    call. If fstat() fails with OSError, 128 MiB is used instead.

    Callers are expected to keep copying in a loop until EOF (a return of
    0), so a chunk smaller or larger than the real file size is harmless,
    even if the file changes while it is being copied.
    """
    floor = 2 ** 23  # 8 MiB lower bound
    try:
        file_size = os.fstat(infd).st_size
    except OSError:
        chunk = 2 ** 27  # 128 MiB
    else:
        chunk = max(file_size, floor)
    if sys.maxsize < 2 ** 32:
        # 32-bit architectures: cap at 1 GiB to avoid OverflowError,
        # see gh-82500.
        chunk = min(chunk, 2 ** 30)
    return chunk
| |
| |
# Stays None unless the fcntl module is importable and exposes FICLONE.
_ficlone = None

if fcntl and hasattr(fcntl, 'FICLONE'):
    def _ficlone(source_fd, target_fd):
        """
        Clone source_fd's contents into target_fd with the FICLONE ioctl.

        This is a lightweight copy in which data blocks are only duplicated
        once they are modified -- also known as Copy on Write (CoW),
        instantaneous copy or reflink.
        """
        fcntl.ioctl(target_fd, fcntl.FICLONE, source_fd)
| |
| |
# Stays None unless the posix module is importable and exposes _fcopyfile.
_fcopyfile = None

if posix and hasattr(posix, '_fcopyfile'):
    def _fcopyfile(source_fd, target_fd):
        """
        Copy the content of a regular file from source_fd to target_fd
        using the high-performance fcopyfile(3) syscall (macOS), passing
        the _COPYFILE_DATA flag.
        """
        posix._fcopyfile(source_fd, target_fd, posix._COPYFILE_DATA)
| |
| |
# Stays None on platforms where os.copy_file_range() is not provided.
_copy_file_range = None

if hasattr(os, 'copy_file_range'):
    def _copy_file_range(source_fd, target_fd):
        """
        Copy data from one regular mmap-like fd to another with the
        high-performance copy_file_range(2) syscall, which lets the
        filesystem use reflinks or a server-side copy where it can.
        This should work on Linux >= 4.5 only.

        The call is repeated until it reports 0 bytes copied (EOF). The
        running total of copied bytes is passed as the destination offset
        of each call.
        """
        chunk = _get_copy_blocksize(source_fd)
        written = 0
        while copied := os.copy_file_range(source_fd, target_fd, chunk,
                                           offset_dst=written):
            written += copied
| |
| |
# Stays None on platforms where os.sendfile() is not provided.
_sendfile = None

if hasattr(os, 'sendfile'):
    def _sendfile(source_fd, target_fd):
        """Copy data from one regular mmap-like fd to another with the
        high-performance sendfile(2) syscall.
        This should work on Linux >= 2.6.33 only.

        The call is repeated until it reports 0 bytes sent (EOF). The
        running total of sent bytes is passed as the offset argument of
        each call.
        """
        chunk = _get_copy_blocksize(source_fd)
        position = 0
        while sent := os.sendfile(target_fd, source_fd, position, chunk):
            position += sent
| |
| |
# Stays None unless the _winapi module is importable and exposes CopyFile2.
copyfile2 = None

if _winapi and hasattr(_winapi, 'CopyFile2'):
    def copyfile2(source, target):
        """
        Copy the file *source* to *target* with CopyFile2 (Windows only),
        passing 0 as the flags argument.
        """
        _winapi.CopyFile2(source, target, 0)
| |
| |
def copyfileobj(source_f, target_f):
    """
    Copy data from file-like object source_f to file-like object target_f.

    When both objects provide a file descriptor through fileno(), the
    available OS-level copy primitives are tried in this order: FICLONE
    reflink, fcopyfile(3), copy_file_range(2), sendfile(2). The first one
    that succeeds completes the copy. A primitive that fails with one of
    the errno values tolerated for it is skipped and the next one is tried.
    Any other OSError is re-raised with its 'filename' and 'filename2'
    attributes set from the 'name' attributes of source_f and target_f
    (None for an object that has no 'name').

    If no file descriptor is available, or every primitive was skipped, the
    data is copied in 1 MiB chunks with source_f.read() and
    target_f.write().
    """
    try:
        source_fd = source_f.fileno()
        target_fd = target_f.fileno()
    except Exception:
        pass  # Fall through to generic code.
    else:
        try:
            # Use OS copy-on-write where available.
            if _ficlone:
                try:
                    _ficlone(source_fd, target_fd)
                    return
                except OSError as err:
                    if err.errno not in (EBADF, EOPNOTSUPP, ETXTBSY, EXDEV):
                        raise

            # Use OS copy where available.
            if _fcopyfile:
                try:
                    _fcopyfile(source_fd, target_fd)
                    return
                except OSError as err:
                    if err.errno not in (EINVAL, ENOTSUP):
                        raise
            if _copy_file_range:
                try:
                    _copy_file_range(source_fd, target_fd)
                    return
                except OSError as err:
                    if err.errno not in (ETXTBSY, EXDEV):
                        raise
            if _sendfile:
                try:
                    _sendfile(source_fd, target_fd)
                    return
                except OSError as err:
                    if err.errno != ENOTSOCK:
                        raise
        except OSError as err:
            # Produce more useful error messages. A file object may expose
            # fileno() without a 'name' attribute; use getattr() so that
            # the original OSError is not replaced by an AttributeError.
            err.filename = getattr(source_f, 'name', None)
            err.filename2 = getattr(target_f, 'name', None)
            raise

    # Last resort: copy with fileobj read() and write().
    read_source = source_f.read
    write_target = target_f.write
    while buf := read_source(1024 * 1024):
        write_target(buf)
| |
| |
def _open_reader(obj):
    """
    Call the __open_reader__() special method defined on obj's type and
    return its result.

    The method is looked up on type(obj), not on the instance. TypeError
    is raised when the type does not define it.
    """
    kind = type(obj)
    try:
        hook = kind.__open_reader__
    except AttributeError:
        raise TypeError(
            f"{kind.__name__} can't be opened for reading") from None
    return hook(obj)
| |
| |
def _open_writer(obj, mode):
    """
    Call the __open_writer__(mode) special method defined on obj's type
    and return its result.

    The method is looked up on type(obj), not on the instance. TypeError
    is raised when the type does not define it.
    """
    kind = type(obj)
    try:
        hook = kind.__open_writer__
    except AttributeError:
        raise TypeError(
            f"{kind.__name__} can't be opened for writing") from None
    return hook(obj, mode)
| |
| |
def _open_updater(obj, mode):
    """
    Call the __open_updater__(mode) special method defined on obj's type
    and return its result.

    The method is looked up on type(obj), not on the instance. TypeError
    is raised when the type does not define it.
    """
    kind = type(obj)
    try:
        hook = kind.__open_updater__
    except AttributeError:
        raise TypeError(
            f"{kind.__name__} can't be opened for updating") from None
    return hook(obj, mode)
| |
| |
| def vfsopen(obj, mode='r', buffering=-1, encoding=None, errors=None, |
| newline=None): |
| """ |
| Open the file pointed to by this path and return a file object, as |
| the built-in open() function does. |
| |
| Unlike the built-in open() function, this function additionally accepts |
| 'openable' objects, which are objects with any of these special methods: |
| |
| __open_reader__() |
| __open_writer__(mode) |
| __open_updater__(mode) |
| |
| '__open_reader__' is called for 'r' mode; '__open_writer__' for 'a', 'w' |
| and 'x' modes; and '__open_updater__' for 'r+' and 'w+' modes. If text |
| mode is requested, the result is wrapped in an io.TextIOWrapper object. |
| """ |
| if buffering != -1: |
| raise ValueError("buffer size can't be customized") |
| text = 'b' not in mode |
| if text: |
| # Call io.text_encoding() here to ensure any warning is raised at an |
| # appropriate stack level. |
| encoding = text_encoding(encoding) |
| try: |
| return open(obj, mode, buffering, encoding, errors, newline) |
| except TypeError: |
| pass |
| if not text: |
| if encoding is not None: |
| raise ValueError("binary mode doesn't take an encoding argument") |
| if errors is not None: |
| raise ValueError("binary mode doesn't take an errors argument") |
| if newline is not None: |
| raise ValueError("binary mode doesn't take a newline argument") |
| mode = ''.join(sorted(c for c in mode if c not in 'bt')) |
| if mode == 'r': |
| stream = _open_reader(obj) |
| elif mode in ('a', 'w', 'x'): |
| stream = _open_writer(obj, mode) |
| elif mode in ('+r', '+w'): |
| stream = _open_updater(obj, mode[1]) |
| else: |
| raise ValueError(f'invalid mode: {mode}') |
| if text: |
| stream = TextIOWrapper(stream, encoding, errors, newline) |
| return stream |
| |
| |
def vfspath(obj):
    """
    Return the string representation of a virtual path object, as produced
    by the __vfspath__() special method defined on its type.

    The method is looked up on type(obj), not on the instance. TypeError
    is raised when the type does not define it.
    """
    kind = type(obj)
    try:
        hook = kind.__vfspath__
    except AttributeError:
        raise TypeError(
            f"expected JoinablePath object, not {kind.__name__}") from None
    return hook(obj)
| |
| |
def ensure_distinct_paths(source, target):
    """
    Raise OSError(EINVAL) if *target* is equal to, or located within,
    *source*. Return None otherwise.

    The raised error carries the vfspath() of *source* and *target* in its
    'filename' and 'filename2' attributes.
    """
    # Note: there is no straightforward, foolproof algorithm to determine
    # if one directory is within another (a particularly perverse example
    # would be a single network share mounted in one location via NFS, and
    # in another location via CIFS), so we simply check whether the target
    # path is lexically equal to, or within, the source path.
    if source == target:
        message = "Source and target are the same path"
    elif source in target.parents:
        message = "Source path is a parent of target path"
    else:
        return
    error = OSError(EINVAL, message)
    error.filename = vfspath(source)
    error.filename2 = vfspath(target)
    raise error
| |
| |
def ensure_different_files(source, target):
    """
    Raise OSError(EINVAL) if both paths refer to the same file. Return
    None otherwise.

    File identity is taken from the '_file_id' callables on each path's
    'info' attribute. When either is missing, the two paths themselves are
    compared instead. If computing a file id raises OSError or ValueError,
    the files are treated as different.

    The raised error carries the vfspath() of *source* and *target* in its
    'filename' and 'filename2' attributes.
    """
    try:
        get_source_id = source.info._file_id
        get_target_id = target.info._file_id
    except AttributeError:
        # No file identity available: fall back to comparing the paths.
        if source != target:
            return
    else:
        try:
            if get_source_id() != get_target_id():
                return
        except (OSError, ValueError):
            # Identity could not be determined: do not report a clash.
            return
    error = OSError(EINVAL, "Source and target are the same file")
    error.filename = vfspath(source)
    error.filename2 = vfspath(target)
    raise error