blob: 629b0f07b42a9e0e656e787232c57538cfa6d81b [file] [log] [blame]
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""All functions related to manipulating paths in recipes.
Recipes handle paths a bit differently than python does. All path manipulation
in recipes revolves around Path objects. These objects store a base path (always
absolute), plus a list of components to join with it. New paths can be derived
by calling the .join method with additional components.
In this way, all paths in Recipes are absolute, and are constructed from a small
collection of anchor points. The built-in anchor points are:
* `api.path['start_dir']` - This is the directory that the recipe started in.
it's similar to `cwd`, except that it's constant.
* `api.path['cache']` - This directory is provided by whatever's running the
recipe. Files and directories created under here /may/ be evicted in between
runs of the recipe (i.e. to relieve disk pressure).
* `api.path['cleanup']` - This directory is provided by whatever's running the
recipe. Files and directories created under here /are guaranteed/ to be
evicted in between runs of the recipe. Additionally, this directory is
guaranteed to be empty when the recipe starts.
* `api.path['tmp_base']` - This directory is the system-configured temp dir.
This is a weaker form of 'cleanup', and its use should be avoided. This may
be removed in the future (or converted to an alias of 'cleanup').
* `api.path['checkout']` - This directory is set by various checkout modules
in recipes. It was originally intended to make recipes easier to read and
make code somewhat generic or homogeneous, but this was a mistake. New code
should avoid 'checkout', and instead just explicitly pass paths around. This
path may be removed in the future.
There are other anchor points which can be defined (e.g. by the
`depot_tools/infra_paths` module). Refer to those modules for additional
documentation.
"""
from __future__ import annotations
import collections
from collections.abc import Iterable
import copy
import enum
import itertools
import os
import re
import tempfile
import types
from typing import Any, Callable, Literal, cast
from recipe_engine import recipe_api, recipe_test_api
from recipe_engine import config_types
class FileType(enum.Enum):
FILE = 1
DIRECTORY = 2
CheckoutPathName = 'checkout'
class Error(Exception):
"""Error specific to path recipe module."""
def PathToString(api, test):
def PathToString_inner(path):
assert isinstance(path, config_types.Path)
return api.join(path.base.resolve(test.enabled), *path.pieces)
return PathToString_inner
class path_set:
"""Implements a set which contains all the parents folders of added
folders."""
# TODO(iannucci): Expand this to be a full fakey filesystem, including file
# contents and file types. Coordinate with the `file` module.
def __init__(self, path_mod: fake_path, initial_paths):
self._path_mod: types.ModuleType = path_mod
self._initial_paths: set[config_types.Path]|None = set(initial_paths)
# An entry in self._paths means an object exists in the mock filesystem.
# The value (either FILE or DIRECTORY) is the type of that object.
self._paths: dict[config_types.Path, FileType] = {}
def _initialize(self) -> None: # pylint: disable=method-hidden
self._initialize: Callable[[], None] = lambda: None
for path in self._initial_paths:
self.add(path, FileType.FILE)
self._initial_paths = None
self.contains: Callable[[config_types.Path], bool] = (
lambda path: path in self._paths
)
@property
def _separator(self) -> str:
return self._path_mod.sep
def _is_contained_in(self, path: config_types.Path,
root: config_types.Path, match_root: bool) -> bool:
if not path.startswith(root):
return False
if len(path) == len(root):
return match_root
return path[len(root)] == self._separator
def add(self, path: config_types.Path, kind: FileType):
path = str(path)
self._initialize()
prev_path: str|None = None
while path != prev_path:
self._paths[path] = kind
prev_path, path = path, self._path_mod.dirname(path)
kind = FileType.DIRECTORY
def copy(self, source: config_types.Path, dest: config_types.Path) -> None:
source, dest = str(source), str(dest)
self._initialize()
to_add: dict[str, FileType] = {}
for p in self._paths:
if self._is_contained_in(p, source, match_root=True):
to_add[p.replace(source, dest)] = self._paths[p]
for path, kind in to_add.items():
self.add(path, kind)
def remove(self, path: config_types.Path,
filt: Callable[[config_types.Path], bool]) -> None:
path: str = str(path)
self._initialize()
match_root: bool = True
if path[-1] == self._separator:
match_root = False
path: str = path.rstrip(self._separator)
kill_set: set[config_types.Path] = set(
p for p in self._paths
if self._is_contained_in(p, path, match_root) and filt(p))
for entry in kill_set:
del self._paths[entry]
# pylint: disable=method-hidden
def contains(self, path: config_types.Path) -> bool:
self._initialize()
return self.contains(path)
def kind(self, path: config_types.Path) -> FileType:
self._initialize()
return self._paths[path]
import ntpath
import posixpath
PathCommonModule = Literal[ntpath, posixpath]
class fake_path:
"""Standin for os.path when we're in test mode.
This class simulates the os.path interface exposed by PathApi, respecting the
current platform according to the `platform` module. This allows us to
simulate path functions according to the platform being tested, rather than
the platform which is currently running.
"""
def __init__(self, is_windows: bool, _mock_path_exists):
if is_windows:
import ntpath as pth
else:
import posixpath as pth
self._pth: PathCommonModule = pth
self._mock_path_exists = path_set(self, _mock_path_exists)
def __getattr__(self, name: str) -> Any:
return getattr(self._pth, name)
def mock_add_paths(self, path: config_types.Path, kind: FileType) -> None:
"""Adds a path and all of its parents to the set of existing paths."""
assert kind in FileType
self._mock_path_exists.add(path, kind)
def mock_copy_paths(self, source: config_types.Path,
dest: config_types.Path) -> None:
"""Duplicates a path and all of its children to another path."""
self._mock_path_exists.copy(source, dest)
def mock_remove_paths(self, path: config_types.Path,
filt: Callable[[config_types.Path], bool]) -> None:
"""Removes a path and all of its children from the set of existing paths."""
self._mock_path_exists.remove(path, filt)
def exists(self, path: config_types.Path) -> bool: # pylint: disable=E0202
"""Returns True if path refers to an existing path."""
return self._mock_path_exists.contains(path)
def isdir(self, path: config_types.Path) -> bool:
return (self.exists(path) and
self._mock_path_exists.kind(path) == FileType.DIRECTORY)
def isfile(self, path: config_types.Path) -> bool:
return (self.exists(path) and
self._mock_path_exists.kind(path) == FileType.FILE)
# This matches:
# [START_DIR]
# RECIPE[some_pkg::some_module:recipe_name]
#
# and friends at the beginning of a string.
ROOT_MATCHER = re.compile(r'^[A-Z_]*\[[^]]*\]')
def normpath(self, path: config_types.Path) -> config_types.Path:
"""Normalizes the path.
This splits off a recipe base (i.e. RECIPE[...]) so that normpath is
only called on the user-supplied portion of the path.
"""
real_normpath = self._pth.normpath
m = self.ROOT_MATCHER.match(path)
if m:
prefix = m.group(0)
rest = path[len(prefix):]
if rest == '':
# normpath turns '' into '.'
return prefix
return prefix + real_normpath(rest)
return real_normpath(path)
def abspath(self, path: config_types.Path) -> config_types.Path:
"""Returns the absolute version of path."""
return self.normpath(path)
def realpath(self, path: config_types.Path) -> config_types.Path:
"""Returns the canonical version of the path."""
return self.normpath(path)
class PathApi(recipe_api.RecipeApi):
_paths_client = recipe_api.RequireClient('paths')
# This is the literal string 'checkout'.
#
# This is only being added as an intermediate step to removing the
# dictionary-like API from the path module, and will be removed in the near
# future. Do not use this.
#
# Use the .checkout_dir @property directly, instead.
CheckoutPathName = CheckoutPathName
def get_config_defaults(self) -> dict[str, Any]:
"""Internal recipe implementation function."""
# TODO(iannucci): Completely remove config from path.
return {
'START_DIR': self._startup_cwd,
'TEMP_DIR': self._temp_dir,
'CACHE_DIR': self._cache_dir,
'CLEANUP_DIR': self._cleanup_dir,
'HOME_DIR': self._home_dir,
}
def __init__(self, path_properties, **kwargs):
super().__init__(**kwargs)
config_types.Path.set_tostring_fn(PathToString(self, self._test_data))
config_types.NamedBasePath.set_path_api(self)
self._path_properties = path_properties
# Assigned at "initialize".
# NT or POSIX path module, or "os.path" in prod.
self._path_mod: ModuleType|None = None
self._startup_cwd: config_types.Path|None = None
self._temp_dir: config_types.Path|None = None
self._cache_dir: config_types.Path|None = None
self._cleanup_dir: config_types.Path|None = None
self._home_dir: config_types.Path|None = None
# checkout_dir can be set at most once per recipe run.
self._checkout_dir: config_types.Path|None = None
# Used in mkdtemp and mkstemp when generating and checking expectations.
self._test_counter: collections.Counter = collections.Counter()
def _read_path(self, property_name, default): # pragma: no cover
"""Reads a path from a property. If absent, returns the default.
Validates that the path is absolute.
"""
value = self._path_properties.get(property_name)
if not value:
assert os.path.isabs(default), default
return default
if not os.path.isabs(value):
raise Error('Path "%s" specified by module property %s is not absolute' %
(value, property_name))
return value
def _ensure_dir(self, path: str) -> None: # pragma: no cover
os.makedirs(path, exist_ok=True)
def _split_path(self, path: config_types.Path
) -> tuple[str, ...]: # pragma: no cover
"""Relative or absolute path -> tuple of components."""
abs_path: list[str, ...] = os.path.abspath(path).split(self.sep)
# Guarantee that the first element is an absolute drive or the posix root.
if abs_path[0].endswith(':'):
abs_path[0] += '\\'
elif abs_path[0] == '':
abs_path[0] = '/'
else:
assert False, 'Got unexpected path format: %r' % abs_path
return tuple(abs_path)
def initialize(self) -> None:
"""Internal recipe implementation function."""
if not self._test_data.enabled: # pragma: no cover
self._path_mod: ModuleType = os.path
start_dir = self._paths_client.start_dir
self._startup_cwd = self._split_path(start_dir)
self._home_dir = self._split_path(self._path_mod.expanduser('~'))
tmp_dir = self._read_path('temp_dir', tempfile.gettempdir())
self._ensure_dir(tmp_dir)
self._temp_dir = self._split_path(tmp_dir)
cache_dir = self._read_path('cache_dir', os.path.join(start_dir, 'cache'))
self._ensure_dir(cache_dir)
self._cache_dir = self._split_path(cache_dir)
# If no cleanup directory is specified, assume that any directory
# underneath of the working directory is transient and will be purged in
# between builds.
cleanup_dir = self._read_path('cleanup_dir',
os.path.join(start_dir, 'recipe_cleanup'))
self._ensure_dir(cleanup_dir)
self._cleanup_dir = self._split_path(cleanup_dir)
else:
tdata = cast(recipe_test_api.ModuleTestData, self._test_data)
# HACK: The platform test_api sets platform.name specifically for the
# path module when users use api.platform.name(...) in their tests.
# This is dirty, but it avoids a LOT of interdependency complexity.
#
# In the current version of this code, we initialize _path_mod in
# `initialize` (rather than __init__) which is already late, but we also
# are calling the set_tostring_fn and set_path_api global variable hacks
# in __init__ which globally modify the behavior of NamedBasePath and Path
# across the entire process.
#
# In a subsequent CL, we will be able to move _path_mod initialization
# into __init__, and remove the set_tostring_fn/set_path_api
# interdependency, and we will also be able to return fully-encapsulated
# Path objects from this module.
is_windows: bool = tdata.get('platform.name', 'linux') == 'win'
self._path_mod = fake_path(is_windows, tdata.get('exists', []))
root: str = 'C:\\' if is_windows else '/'
self._startup_cwd = [root, 'b', 'FakeTestingCWD']
# Appended to placeholder '[TMP]' to get fake path in test.
self._temp_dir = [root]
self._cache_dir = [root, 'b', 'c']
self._cleanup_dir = [root, 'b', 'cleanup']
self._home_dir = [root, 'home', 'fake_user']
self.set_config('BASE')
def assert_absolute(self, path: config_types.Path | str) -> None:
"""Raises AssertionError if the given path is not an absolute path.
Args:
* path - The path to check.
"""
if self.abspath(path) != str(path):
raise AssertionError('%s is not absolute' % path)
def mkdtemp(self, prefix: str = tempfile.template) -> config_types.Path:
"""Makes a new temporary directory, returns Path to it.
Args:
* prefix - a tempfile template for the directory name (defaults to "tmp").
Returns a Path to the new directory.
"""
if not self._test_data.enabled: # pragma: no cover
# New path as str.
new_path = tempfile.mkdtemp(prefix=prefix, dir=str(self['cleanup']))
# Ensure it's under self._cleanup_dir, convert to Path.
new_path = self._split_path(new_path)
assert new_path[:len(self._cleanup_dir)] == self._cleanup_dir, (
'new_path: %r -- cleanup_dir: %r' % (new_path, self._cleanup_dir))
temp_dir = self['cleanup'].join(*new_path[len(self._cleanup_dir):])
else:
self._test_counter[prefix] += 1
assert isinstance(prefix, str)
temp_dir = self['cleanup'].join('%s_tmp_%d' %
(prefix, self._test_counter[prefix]))
self.mock_add_paths(temp_dir, FileType.DIRECTORY)
return temp_dir
def mkstemp(self, prefix: str = tempfile.template) -> config_types.Path:
"""Makes a new temporary file, returns Path to it.
Args:
* prefix - a tempfile template for the file name (defaults to "tmp").
Returns a Path to the new file. Unlike tempfile.mkstemp, the file's file
descriptor is closed.
"""
if not self._test_data.enabled: # pragma: no cover
# New path as str.
fd, new_path = tempfile.mkstemp(prefix=prefix, dir=str(self['cleanup']))
# Ensure it's under self._cleanup_dir, convert to Path.
split_path: list[str] = self._split_path(new_path)
assert split_path[:len(self._cleanup_dir)] == self._cleanup_dir, (
'new_path: %r -- cleanup_dir: %r' % (split_path, self._cleanup_dir))
temp_file: config_types.Path = self['cleanup'].join(
*split_path[len(self._cleanup_dir):])
os.close(fd)
else:
self._test_counter[prefix] += 1
assert isinstance(prefix, str)
temp_file: config_types.Path = self['cleanup'].join(
'%s_tmp_%d' % (prefix, self._test_counter[prefix]))
self.mock_add_paths(temp_file, FileType.FILE)
return temp_file
def abs_to_path(self, abs_string_path: str) -> config_types.Path:
"""Converts an absolute path string `abs_string_path` to a real Path
object, using the most appropriate known base path.
* abs_string_path MUST be an absolute path
* abs_string_path MUST be rooted in one of the configured base paths known
to the path module.
This method will find the longest match in all the following:
* module resource paths
* recipe resource paths
* repo paths
* checkout_dir
* base_paths
Example:
```
# assume [START_DIR] == "/basis/dir/for/recipe"
api.path.abs_to_path("/basis/dir/for/recipe/some/other/dir") ->
Path("[START_DIR]/some/other/dir")
```
Raises an ValueError if the preconditions are not met, otherwise returns the
Path object.
"""
if isinstance(abs_string_path, config_types.Path):
return abs_string_path
ap = self.abspath(abs_string_path)
if ap != abs_string_path:
raise ValueError("path is not absolute: %r v %r" % (abs_string_path, ap))
# try module/recipe/repo resource paths first
sPath, path = self._paths_client.find_longest_prefix(
abs_string_path, self.sep)
if path is None:
# try base paths now
for path_name in itertools.chain((CheckoutPathName,), self.c.base_paths):
path = self[path_name]
sPath = str(path)
if abs_string_path.startswith(sPath):
break
else:
path = None
if path is None:
raise ValueError("could not figure out a base path for %r" %
abs_string_path)
sub_path = abs_string_path[len(sPath):].strip(self.sep)
return path.join(*sub_path.split(self.sep))
def __contains__(self, pathname: str) -> bool:
if pathname == CheckoutPathName:
return bool(self.checkout_dir)
return pathname in self.c.base_paths
def __setitem__(self, pathname: Literal[CheckoutPathName], path: config_types.Path) -> None:
"""Sets the checkout path.
DEPRECATED - Use `api.path.set_checkout_dir` instead.
The only valid value of `pathname` is the literal string CheckoutPathName.
"""
if pathname != CheckoutPathName:
raise ValueError(
f'The only valid dynamic path value is `checkout`. Got {pathname!r}.'
' Use `api.path.checkout_dir = <path>` instead.')
self.checkout_dir = path
@property
def checkout_dir(self) -> config_types.Path|None:
"""Returns the Path which was assigned to this checkout_dir property."""
return self._checkout_dir
@checkout_dir.setter
def checkout_dir(self, path: config_types.Path) -> None:
"""Sets the global variable `api.path.checkout_dir` to the given path.
"""
if not isinstance(path, config_types.Path):
raise ValueError(
f'api.path.checkout_dir called with bad type: {path!r} ({type(path)})')
if (current := self._checkout_dir) is not None:
if current == path:
return
raise ValueError(
f'api.path.checkout_dir can only be set once. old:{current!r} new:{path!r}')
self._checkout_dir = path
def get(self,
name: str,
default: config_types.Path|None = None) -> config_types.Path:
"""Gets the base path named `name`. See module docstring for more info."""
if name == CheckoutPathName:
return config_types.Path(config_types.NamedBasePath(CheckoutPathName))
if name in self.c.base_paths:
return config_types.Path(config_types.NamedBasePath(name))
return default
def __getitem__(self, name: str) -> config_types.Path:
"""Gets the base path named `name`. See module docstring for more info."""
result = self.get(name)
if not result:
raise KeyError('Unknown path: %s' % name)
return result
@property
def pardir(self) -> str:
"""Equivalent to os.pardir."""
return self._path_mod.pardir
@property
def sep(self) -> str:
"""Equivalent to os.sep."""
return self._path_mod.sep
@property
def pathsep(self) -> str:
"""Equivalent to os.pathsep."""
return self._path_mod.pathsep
def abspath(self, path: config_types.Path | str):
"""Equivalent to os.abspath."""
return self._path_mod.abspath(str(path))
def basename(self, path: config_types.Path | str):
"""Equivalent to os.path.basename."""
return self._path_mod.basename(str(path))
def dirname(self, path: config_types.Path | str) -> config_types.Path | str:
"""For "foo/bar/baz", return "foo/bar".
This corresponds to os.path.dirname().
The type of the return value matches the type of the argument.
Args:
path: path to take directory name of
Returns dirname of path
"""
if isinstance(path, config_types.Path):
return self.abs_to_path(self._path_mod.dirname(str(path)))
# If path is not a Path object it's likely a string. Leave return value as a
# string.
return self._path_mod.dirname(str(path))
def join(self, path, *paths):
"""Equivalent to os.path.join.
Note that Path objects returned from this module (e.g.
api.path['start_dir']) have a built-in join method (e.g.
new_path = p.join('some', 'name')). Many recipe modules expect Path objects
rather than strings. Using this `join` method gives you raw path joining
functionality and returns a string.
If your path is rooted in one of the path module's root paths (i.e. those
retrieved with api.path[something]), then you can convert from a string path
back to a Path with the `abs_to_path` method.
"""
return self._path_mod.join(str(path), *map(str, paths))
def split(self, path):
"""For "foo/bar/baz", return ("foo/bar", "baz").
This corresponds to os.path.split().
The type of the first item in the return value matches the type of the
argument.
Args:
path (Path or str): path to split into directory name and basename
Returns (dirname(path), basename(path)).
"""
dirname, basename = self._path_mod.split(str(path))
if isinstance(path, config_types.Path):
return (self.abs_to_path(dirname), basename)
# If path is not a Path object it's likely a string. Leave both elements in
# return tuple as strings.
return (dirname, basename)
def splitext(
self, path: config_types.Path | str
) -> tuple[config_types.Path | str, str]:
"""For "foo/bar.baz", return ("foo/bar", ".baz").
This corresponds to os.path.splitext().
The type of the first item in the return value matches the type of the
argument.
Args:
path: Path to split into name and extension
Returns:
(name, extension_including_dot).
"""
name, ext = self._path_mod.splitext(str(path))
if isinstance(path, config_types.Path):
return (self.abs_to_path(name), ext)
# If path is not a Path object it's likely a string. Leave both elements in
# return tuple as strings.
return (name, ext)
def realpath(self, path: config_types.Path | str):
"""Equivalent to os.path.realpath."""
return self._path_mod.realpath(str(path))
def relpath(self, path, start):
"""Roughly equivalent to os.path.relpath.
Unlike os.path.relpath, `start` is _required_. If you want the 'current
directory', use the `recipe_engine/context` module's `cwd` property.
"""
return self._path_mod.relpath(str(path), str(start))
def normpath(self, path):
"""Equivalent to os.path.normpath."""
return self._path_mod.normpath(str(path))
def expanduser(self, path): # pragma: no cover
"""Do not use this, use `api.path['home']` instead.
This ONLY handles `path` == "~", and returns `str(api.path['home'])`.
"""
if path == "~":
return str(self['home'])
raise ValueError("expanduser only supports `~`.")
def exists(self, path):
"""Equivalent to os.path.exists.
The presence or absence of paths can be mocked during the execution of the
recipe by using the mock_* methods.
"""
return self._path_mod.exists(str(path))
def isdir(self, path):
"""Equivalent to os.path.isdir.
The presence or absence of paths can be mocked during the execution of the
recipe by using the mock_* methods.
"""
return self._path_mod.isdir(str(path))
def isfile(self, path):
"""Equivalent to os.path.isfile.
The presence or absence of paths can be mocked during the execution of the
recipe by using the mock_* methods.
"""
return self._path_mod.isfile(str(path))
def mock_add_paths(self, path: config_types.Path,
kind: FileType = FileType.FILE) -> None:
"""For testing purposes, mark that |path| exists."""
if self._test_data.enabled:
self._path_mod.mock_add_paths(path, kind)
def mock_add_file(self, path: config_types.Path) -> None:
"""For testing purposes, mark that file |path| exists."""
self.mock_add_paths(path, FileType.FILE)
def mock_add_directory(self, path: config_types.Path) -> None:
"""For testing purposes, mark that file |path| exists."""
self.mock_add_paths(path, FileType.DIRECTORY)
def mock_copy_paths(self, source: config_types.Path,
dest: config_types.Path) -> None:
"""For testing purposes, copy |source| to |dest|."""
if self._test_data.enabled:
self._path_mod.mock_copy_paths(source, dest)
def mock_remove_paths(
self,
path: config_types.Path,
should_remove: Callable[[str], bool] = lambda p: True) -> None:
"""For testing purposes, mark that |path| doesn't exist.
Args:
path: The path to remove.
should_remove: Called for every candidate path. Return True to remove this
path.
"""
if self._test_data.enabled:
self._path_mod.mock_remove_paths(path, should_remove)
def separate(self, path: config_types.Path) -> None:
"""Separate a path's pieces in-place with this platform's separator char."""
path.separate(self.sep)
def eq(self, path1: config_types.Path, path2: config_types.Path) -> bool:
"""Check whether path1 points to the same path as path2.
Under most circumstances, path equality is checked via `path1 == path2`.
However, if the paths are constructed via differently joined dirs, such as
('foo' / 'bar') vs. ('foo/bar'), that doesn't work. This method addresses
that problem by creating copies of the paths, and then separating them
according to self.sep. The original paths are not modified.
"""
path1_copy = copy.deepcopy(path1)
path2_copy = copy.deepcopy(path2)
self.separate(path1_copy)
self.separate(path2_copy)
return path1_copy == path2_copy
def is_parent_of(self, parent: config_types.Path,
child: config_types.Path) -> bool:
"""Check whether child is contained within parent.
Under most circumstances, this would be checked via
`parent.is_parent_of(child)`. However, if the paths are constructed via
differently joined dirs, such as ('foo', 'bar') vs. ('foo/bar', 'baz.txt'),
that doesn't work. This method addresses that problem by creating copies of
the paths, and then separating them according to self.sep. The original
paths are not modified.
"""
parent_copy = copy.deepcopy(parent)
child_copy = copy.deepcopy(child)
self.separate(parent_copy)
self.separate(child_copy)
return parent_copy.is_parent_of(child_copy)