| # Copyright 2013 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| from __future__ import annotations |
| |
| import itertools |
| |
| from dataclasses import dataclass, field |
| from typing import ClassVar, Generator, TYPE_CHECKING |
| |
| if TYPE_CHECKING: |
| from recipe_engine.internal.recipe_deps import RecipeModule, Recipe, RecipeRepo |
| |
def ResetGlobalVariableAssignments():
  """Clears the class-level state that other code assigns onto this module.

  The recipe test runner calls this before every test case it executes, so
  that values assigned while running one test case do not leak into the next.

  Two class variables are reset to None:
    * Path._OS_SEP - the currently simulated path separator.
    * CheckoutBasePath._resolved - the Path that checkout_dir resolved to.

  See the comments next to those class variables for what assigns them.
  """
  # The two resets are independent of each other.
  Path._OS_SEP = None
  CheckoutBasePath._resolved = None
| |
| |
@dataclass(frozen=True)
class CheckoutBasePath:
  """Placeholder base for Paths which are relative to api.path.checkout_dir.

  A Path is rooted at CheckoutBasePath in these situations:
    * When GenTests builds Paths that are handed to RunSteps (e.g. mocking
      paths with api.path.exists() from GenTests).
    * In select circumstances when the recipe engine's "config" subsystem
      constructs Paths.

  Paths rooted here are 'slippery': they attempt to turn into a
  ResolvedBasePath-rooted Path at almost every opportunity. That is only
  possible once the recipe has assigned checkout_dir in the
  recipe_engine/path module, which in turn assigns the
  CheckoutBasePath._resolved class variable.

  Until then, `resolve` raises a ValueError saying that checkout_dir is unset.
  """

  # HACK: The checkout_dir setter in the recipe_engine/path module assigns
  # directly to this class variable.
  #
  # ResetGlobalVariableAssignments() in this file sets it back to None; the
  # recipe tests call that before each test case.
  _resolved: ClassVar[Path | None] = None

  def maybe_resolve(self) -> Path | None:
    """Returns the real Path this placeholder stands for, or None if it cannot
    be resolved yet."""
    # A falsy `_resolved` (i.e. not assigned yet) is reported as None.
    return self._resolved or None

  def resolve(self) -> Path:
    """Returns the real Path this placeholder stands for.

    Raises:
      ValueError: if checkout_dir has not been assigned yet.
    """
    target = self.maybe_resolve()
    if target is not None:
      return target
    raise ValueError(
        'Cannot resolve CheckoutBasePath() - api.path.checkout_dir is unset.')

  def __str__(self) -> str:
    """Renders the resolved path as a string.

    A CheckoutBasePath is never rendered as anything but the real base it
    points to, so this raises ValueError while checkout_dir is unassigned.
    """
    target = self.resolve()
    return str(target)

  def __repr__(self) -> str:
    """Like __str__, but yields a fixed marker instead of raising when
    checkout_dir is unassigned."""
    return str(self) if self._resolved else 'CheckoutBasePath[UNRESOLVED]'
| |
| |
@dataclass(frozen=True, order=True)
class ResolvedBasePath:
  """A base path which has already been 'resolved' to a concrete string.

  Under test, the string is a symbolic name such as "[START_DIR]" or
  "[CACHE]"; those names come from the recipe_engine/path module.

  Outside of tests, the string is an actual absolute filesystem path.
  """
  resolved: str

  @classmethod
  def for_recipe_module(cls, test_enabled: bool,
                        module: RecipeModule) -> ResolvedBasePath:
    """Returns the base for `module`.

    In test mode this is the symbolic 'RECIPE_MODULE[<repo>::<module>]' name,
    otherwise it is `module.path`.
    """
    if test_enabled:
      # Python's module delimiter '.' is swapped for '::' here, because expect
      # tests already use '.'.
      symbolic = f'RECIPE_MODULE[{module.repo.name}::{module.name}]'
      return cls(symbolic)
    return cls(module.path)

  @classmethod
  def for_recipe_script_resources(cls, test_enabled: bool,
                                  recipe: Recipe) -> ResolvedBasePath:
    """Returns the base for the resources of `recipe`.

    In test mode this is the symbolic 'RECIPE[<full_name>].resources' name,
    otherwise it is `recipe.resources_dir`.
    """
    if test_enabled:
      symbolic = f'RECIPE[{recipe.full_name}].resources'
      return cls(symbolic)
    return cls(recipe.resources_dir)

  @classmethod
  def for_bundled_repo(cls, test_enabled: bool,
                       repo: RecipeRepo) -> ResolvedBasePath:
    """Returns the base for `repo`.

    In test mode this is the symbolic 'RECIPE_REPO[<name>]' name, otherwise it
    is `repo.path`.
    """
    if test_enabled:
      symbolic = f'RECIPE_REPO[{repo.name}]'
      return cls(symbolic)
    return cls(repo.path)

  def __repr__(self) -> str:
    """Returns the bare resolved string, with no quoting or class name."""
    return self.resolved
| |
| |
@dataclass(frozen=True)
class Path:
  """Represents an absolute path which is relative to a 'base' path.

  The `base` is either a ResolvedBasePath or a CheckoutBasePath.

  This Path is made aware of the currently simulated path separator from the
  __init__ method of the recipe_engine/path module, which assigns to this
  class's _OS_SEP variable.

  Instances are immutable. `pieces` is always stored normalized: split on
  separators, with empty, '.' and '..' entries removed (see __init__).
  """
  base: CheckoutBasePath | ResolvedBasePath
  pieces: tuple[str, ...]

  # HACK: This is directly assigned to by the recipe_engine/path module, and is
  # populated with the current path separator character (either '/' or '\\').
  #
  # This is also reset by the ResetGlobalVariableAssignments() function in this
  # file, which is called from the recipe tests prior to each test case.
  _OS_SEP: ClassVar[str | None] = None

  # This field is used to cache the output of __str__.
  #
  # Why not use @functools.cache on __str__? Unfortunately, this effectively
  # creates a global variable Path.__str__.<wrapper func>.cache, which is a dict
  # mapping Path instances to their __str__() values. This is 'fine', but it
  # ends up capturing the value of _OS_SEP which can change multiple times per
  # process run (see ResetGlobalVariableAssignments). This can still be used by
  # adding Path.__str__.cache_clear() to ResetGlobalVariableAssignments, but
  # I don't think introducing the extra global variable is necessary or
  # desirable here, especially since we know that Path is immutable.
  #
  # __init__ never assigns this, so it reads as the class-level default (None)
  # until __str__ stores a value on the instance.
  _str: str | None = field(default=None, repr=False, hash=False, compare=False)

  def __init__(self, base: CheckoutBasePath | ResolvedBasePath, *pieces: str):
    """Creates a Path.

    Args:
      base: Either a CheckoutBasePath, which represents a placeholder for
        a ResolvedBasePath, or a ResolvedBasePath.
      *pieces: The components of the path relative to base.
        - If this recipe is being run on windows, pieces with '/' or '\\' will
          be split. On non-windows, they will be split only on '/'.
        - Split pieces equaling '..' must not go above the `base`. That is, if
          you give `Path(ResolvedBasePath('[CACHE]'), '..', 'something')`, this
          will raise ValueError because the '..' would bring this Path above
          the base. However,
          `Path(ResolvedBasePath('[CACHE]'), 'something', '..')` is OK and
          would be equivalent to `Path(ResolvedBasePath('[CACHE]'))`.
        - Empty pieces and pieces which are '.' are ignored.
        - If the recipe is not yet running (e.g. you are calling Path from
          GenTests), and you include a '\\' in a piece, this will raise
          ValueError (just use '/' or separate the pieces yourself in that
          case).

    Raises:
      ValueError: if `base` has the wrong type, if any piece is not a `str`,
        if a piece contains a backslash while _OS_SEP is unset, or if a '..'
        would climb above `base`.
    """
    super().__init__()
    if not isinstance(base, (CheckoutBasePath, ResolvedBasePath)):
      raise ValueError(
          'First argument to Path must be CheckoutBasePath or ResolvedBasePath, '
          f'got {base!r} ({type(base)!r})')

    # If they gave us a CheckoutBasePath, but it's already resolvable,
    # immediately transmute it into a ResolvedBasePath.
    if isinstance(base, CheckoutBasePath):
      if resolved_path := base.maybe_resolve():
        base = resolved_path.base
        pieces = resolved_path.pieces + tuple(pieces)

    has_backslashes: bool = False
    for i, piece in enumerate(pieces):
      if not isinstance(piece, str):
        raise ValueError('Variadic arguments to Path must only be `str`, '
                         f'argument {i} was {piece!r} ({type(piece)!r})')
      has_backslashes = has_backslashes or '\\' in piece

    # NOTE: we always separate on '/', regardless of _OS_SEP, as users like to
    # pass pieces to Path constructors and join() which contain a slash already
    # (but almost never pass them with '\\').
    #
    # However, if they make a path, using backslash in pieces, during GenTests,
    # we can't tell if these should be separated or not and so raise a
    # ValueError.
    if self._OS_SEP is None and has_backslashes:
      raise ValueError(
          f'Cannot instantiate Path({base!r}, {pieces!r}) - Pieces contain'
          ' backslash and recipe_engine/path has not been initialized yet.'
          ' Please use "/" (even for windows) or pass the pieces to join'
          ' separately.')
    need_backslash_split = has_backslashes and self._OS_SEP == '\\'

    normalized_pieces: list[str] = []
    for piece in pieces:
      if need_backslash_split:
        # Splitting on '/' and then on '\\' is the same as splitting on either
        # one, so fold backslashes into '/' and split once.
        piece = piece.replace('\\', '/')
      normalized_pieces.extend(p for p in piece.split('/') if p and p != '.')

    # At this point normalized_pieces is pieces but where:
    #   * All pieces have been split by / and/or \ - there are no more
    #     splittable slashes in normalized_pieces.
    #   * All empty pieces (which are '' or '.' pieces) have been removed.

    # Next, we normalize '..' passed in - This is allowed as long as it can be
    # fully resolved within the given pieces. Otherwise we raise, because
    # a joined '..' would take us above the base of this Path.
    collapsed = self._collapse_parent_refs(normalized_pieces)
    if collapsed is None:
      raise ValueError(
          f'Unable to compute {base!r} / {pieces!r} without going above the base.'
      )

    # This is a frozen dataclass, so we have to assign using object.__setattr__.
    # Believe it or not, this is actually documented in
    # https://docs.python.org/3.11/library/dataclasses.html#frozen-instances
    object.__setattr__(self, 'base', base)
    object.__setattr__(self, 'pieces', tuple(collapsed))

  @staticmethod
  def _collapse_parent_refs(pieces: list[str]) -> list[str] | None:
    """Cancels every '..' in `pieces` against the entry preceding it.

    This is a single left-to-right pass, so every '..' is handled no matter
    where it sits (e.g. ['a', '..', 'b', '..', 'c'] collapses to ['c']).

    Args:
      pieces: Path components which are already split and contain no '' or '.'
        entries.

    Returns:
      A new list without any '..' entries, or None if some '..' has nothing
      left to cancel, i.e. it would climb above the base.
    """
    collapsed: list[str] = []
    for piece in pieces:
      if piece != '..':
        collapsed.append(piece)
      elif collapsed:
        collapsed.pop()
      else:
        return None
    return collapsed

  @property
  def parents(self) -> Generator[Path, None, None]:
    """For 'foo/bar/baz', yield 'foo/bar' then 'foo'.

    Keeps yielding `.parent` of the previous result until a Path compares
    equal to its own parent, so the last Path yielded has no pieces left.
    """
    prev: Path = self
    curr: Path = self.parent
    while prev != curr:
      yield curr
      prev, curr = curr, curr.parent

  @property
  def parent(self) -> Path:
    """For 'foo/bar/baz', return 'foo/bar'.

    For a Path without pieces this returns an equal Path.
    """
    return Path(self.base, *self.pieces[0:-1])

  @property
  def name(self) -> str:
    """For 'foo/bar/baz', return 'baz'.

    Raises:
      IndexError: if this Path has no pieces.
    """
    return self.pieces[-1]

  @property
  def stem(self) -> str:
    """For 'dir/foo.tar.gz', return 'foo.tar'.

    A name without any '.' is returned unchanged.
    """
    return self.name.rsplit('.', 1)[0]

  @property
  def suffix(self) -> str:
    """For 'dir/foo.tar.gz', return '.gz'.

    Returns '' if the name contains no '.'.
    """
    _, dot, extension = self.name.rpartition('.')
    # `dot` is empty when the name has no '.', i.e. there is no suffix at all.
    return dot + extension if dot else ''

  @property
  def suffixes(self) -> list[str]:
    """For 'dir/foo.tar.gz', return ['.tar', '.gz'].

    Returns [] if the name contains no '.'.
    """
    return [f'.{x}' for x in self.name.split('.')[1:]]

  def _resolve(self) -> Path:
    """If self.base is a ResolvedBasePath, this will return self.

    Otherwise, this will resolve self.base and return an equivalent path to
    `self` but with a ResolvedBasePath base. If CheckoutBasePath is
    unresolvable, this raises ValueError.
    """
    if not isinstance(self.base, CheckoutBasePath):
      return self
    return self.base.resolve().joinpath(*self.pieces)

  def __eq__(self, other: object) -> bool:
    """Compares against another Path, or against a rendered path string.

    Returns NotImplemented for any other operand type, so that Python falls
    back to its default handling (`==` is then False).

    Raises:
      ValueError: if exactly one side is rooted in a CheckoutBasePath which
        cannot be resolved yet. Comparing against a `str` renders `self`,
        which raises ValueError when that is not possible (see __str__).
    """
    if isinstance(other, str):
      return str(self) == other
    if not isinstance(other, Path):
      return NotImplemented

    # first, if both bases are checkout, just compare pieces, since we know that
    # CheckoutBasePath, once assigned, will always match.
    if (isinstance(self.base, CheckoutBasePath) and
        isinstance(other.base, CheckoutBasePath)):
      return self.pieces == other.pieces

    try:
      spath = self._resolve()
      opath = other._resolve()
    except ValueError as ex:
      raise ValueError('Path.__eq__ invalid for mismatched bases '
                       '(CheckoutBasePath vs ResolvedBasePath) '
                       'before checkout_dir is set') from ex
    return spath.base == opath.base and spath.pieces == opath.pieces

  def __lt__(self, other: object) -> bool:
    """Orders against another Path, or against a rendered path string.

    Paths are ordered by (base, pieces). Returns NotImplemented for any other
    operand type, so that Python raises its usual TypeError.

    Raises:
      ValueError: if exactly one side is rooted in a CheckoutBasePath which
        cannot be resolved yet. Comparing against a `str` renders `self`,
        which raises ValueError when that is not possible (see __str__).
    """
    if isinstance(other, str):
      return str(self) < other
    if not isinstance(other, Path):
      return NotImplemented

    # first, if both bases are checkout, just compare pieces, since we know that
    # CheckoutBasePath, once assigned, will always match.
    if (isinstance(self.base, CheckoutBasePath) and
        isinstance(other.base, CheckoutBasePath)):
      return self.pieces < other.pieces

    try:
      spath = self._resolve()
      opath = other._resolve()
    except ValueError as ex:
      raise ValueError('Path.__lt__ invalid for mismatched bases '
                       '(CheckoutBasePath vs ResolvedBasePath) '
                       'before checkout_dir is set') from ex
    return (spath.base, spath.pieces) < (opath.base, opath.pieces)

  def __truediv__(self, piece: str) -> Path:
    """Adds the shorthand '/'-operator for .joinpath(), returning a new path."""
    return self.joinpath(piece)

  def joinpath(self, *pieces: str) -> Path:
    """Appends *pieces to this Path, returning a new Path.

    Empty values ('', None) in pieces will be omitted.

    Args:
      pieces: The components of the path relative to base. The normal Path
        __init__ rules for '..' and '.' apply.

    Returns:
      The new Path, or `self` when called without any pieces.
    """
    if not pieces:
      return self
    return Path(
        self.base,
        *[p for p in itertools.chain(self.pieces, pieces) if p])

  def join(self, *pieces: str) -> Path:
    """Alias for joinpath()."""
    return self.joinpath(*pieces)

  def is_parent_of(self, other: Path) -> bool:
    """True if |other| is in a subdirectory of this Path.

    This is a prefix match on `pieces`, so it is also True when both Paths
    are equal.

    Raises:
      ValueError: if exactly one side is rooted in a CheckoutBasePath which
        cannot be resolved yet.
    """
    spath = self
    opath = other
    # If they are BOTH CheckoutBasePath we can use them directly, otherwise we
    # need to resolve them (which may raise)
    if not (isinstance(self.base, CheckoutBasePath) and
            isinstance(other.base, CheckoutBasePath)):
      spath = self._resolve()
      opath = other._resolve()

    # NOTE: This assumes that none of the ResolvedBasePath's overlap, which is
    # currently true, and simplifies things quite a bit.
    if spath.base != opath.base:
      return False

    # They have the same base, so return True if the paths have a matching
    # prefix.
    #
    # If spath.pieces is longer than opath.pieces, this will be False, which is
    # correct.
    return spath.pieces == opath.pieces[:len(spath.pieces)]

  def __str__(self) -> str:
    """Renders base and pieces joined with the current _OS_SEP.

    The result is computed once per instance and cached in `_str`.

    Raises:
      ValueError: if _OS_SEP has not been assigned yet. Rendering the base may
        also raise; CheckoutBasePath.__str__ raises ValueError while
        checkout_dir is unset.
    """
    if self._str is None:
      if not self._OS_SEP:
        raise ValueError('Unable to render Path to string - '
                         'recipe_engine/path has not been initialized yet.')
      str_val = self._OS_SEP.join(
          itertools.chain((str(self.base),), self.pieces))
      # Frozen dataclass: the cache has to be written via object.__setattr__.
      object.__setattr__(self, '_str', str_val)
      return str_val
    return self._str

  def __repr__(self) -> str:
    """Returns 'Path(<base>, <piece>, ...)', using the resolved form of this
    Path whenever it can be resolved."""
    # Try to resolve `self` - if it's rooted in CheckoutBasePath and
    # checkout_dir is already set, display the fully resolved path, since all
    # interactions with this Path will behave in that fashion.
    #
    # Otherwise, just use `self` as-is to allow repr(Path) to work in scenarios
    # where checkout_dir hasn't been set (for example, when reporting errors
    # involving unresolved checkout_dir paths...)
    try:
      spath = self._resolve()
    except ValueError:
      spath = self

    # NOTE: It would be good to switch to the dataclass-repr instead (or just
    # use __str__ all the time)
    s = 'Path(%r' % (spath.base,)
    if spath.pieces:
      s += ', %s' % ', '.join(repr(x) for x in spath.pieces)
    return s + ')'

  def __hash__(self) -> int:
    """Hashes the resolved (base, pieces) of this Path.

    Raises:
      ValueError: if this Path is rooted in a CheckoutBasePath which cannot be
        resolved yet.
    """
    spath = self._resolve()
    return hash(('config_types.Path', spath.base, spath.pieces))