| # Copyright 2013 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| """All functions related to manipulating paths in recipes. |
| |
| Recipes handle paths a bit differently than python does. All path manipulation |
| in recipes revolves around Path objects. These objects store a base path (always |
| absolute), plus a list of components to join with it. New paths can be derived |
| by calling the .join method with additional components. |
| |
| In this way, all paths in Recipes are absolute, and are constructed from a small |
| collection of anchor points. The built-in anchor points are: |
| * `api.path['start_dir']` - This is the directory that the recipe started in. |
| it's similar to `cwd`, except that it's constant. |
| * `api.path['cache']` - This directory is provided by whatever's running the |
| recipe. Files and directories created under here /may/ be evicted in between |
| runs of the recipe (i.e. to relieve disk pressure). |
| * `api.path['cleanup']` - This directory is provided by whatever's running the |
| recipe. Files and directories created under here /are guaranteed/ to be |
| evicted in between runs of the recipe. Additionally, this directory is |
| guaranteed to be empty when the recipe starts. |
| * `api.path['tmp_base']` - This directory is the system-configured temp dir. |
| This is a weaker form of 'cleanup', and its use should be avoided. This may |
| be removed in the future (or converted to an alias of 'cleanup'). |
| * `api.path['checkout']` - This directory is set by various 'checkout' modules |
| in recipes. It was originally intended to make recipes easier to read and |
| make code somewhat generic or homogenous, but this was a mistake. New code |
| should avoid 'checkout', and instead just explicitly pass paths around. This |
| path may be removed in the future. |
| |
| There are other anchor points which can be defined (e.g. by the |
| `depot_tools/infra_paths` module). Refer to those modules for additional |
| documentation. |
| """ |
| |
| import functools |
| import itertools |
| import os |
| import re |
| import tempfile |
| |
| from recipe_engine import recipe_api |
| from recipe_engine import config_types |
| |
| |
| class Error(Exception): |
| """Error specific to path recipe module.""" |
| |
| |
| def PathToString(api, test): |
| def PathToString_inner(path): |
| assert isinstance(path, config_types.Path) |
| base_path = path.base.resolve(test.enabled) |
| suffix = path.platform_ext.get(api.m.platform.name, '') |
| return api.join(base_path, *path.pieces) + suffix |
| return PathToString_inner |
| |
| |
| def string_filter(func): |
| @functools.wraps(func) |
| def inner(*args, **kwargs): |
| return func(*map(str, args), **kwargs) |
| return inner |
| |
| |
| class path_set(object): |
| """ implements a set which contains all the parents folders of added folders. |
| """ |
| # TODO(iannucci): Expand this to be a full fakey filesystem, including file |
| # contents and file types. Coordinate with the `file` module. |
| def __init__(self, path_mod, initial_paths): |
| self._path_mod = path_mod |
| self._initial_paths = set(initial_paths) |
| self._paths = set() |
| |
| def _initialize(self): |
| self._initialize = lambda: None |
| for path in self._initial_paths: |
| self.add(path) |
| self._initial_paths = None |
| self.contains = lambda path: path in self._paths |
| |
| def add(self, path): |
| path = str(path) |
| self._initialize() |
| prev_path = None |
| while path != prev_path: |
| self._paths.add(path) |
| prev_path, path = path, self._path_mod.dirname(path) |
| |
| def copy(self, source, dest): |
| source, dest = str(source), str(dest) |
| self._initialize() |
| to_add = set() |
| for p in self._paths: |
| if p.startswith(source): |
| to_add.add(p.replace(source, dest)) |
| self._paths |= to_add |
| |
| def remove(self, path, filt): |
| path = str(path) |
| self._initialize() |
| kill_set = set(p for p in self._paths if p.startswith(path) and filt(p)) |
| self._paths -= kill_set |
| |
| def contains(self, path): |
| self._initialize() |
| return self.contains(path) |
| |
| |
| class fake_path(object): |
| """Standin for os.path when we're in test mode. |
| |
| This class simulates the os.path interface exposed by PathApi, respecting the |
| current platform according to the `platform` module. This allows us to |
| simulate path functions according to the platform being tested, rather than |
| the platform which is currently running. |
| """ |
| |
| def __init__(self, api, _mock_path_exists): |
| self._api = api |
| self._mock_path_exists = path_set(self, _mock_path_exists) |
| self._pth = None |
| |
| def _init_pth(self): |
| if not self._pth: |
| if self._api.m.platform.is_win: |
| import ntpath as pth |
| elif self._api.m.platform.is_mac or self._api.m.platform.is_linux: |
| import posixpath as pth |
| self._pth = pth |
| |
| def __getattr__(self, name): |
| self._init_pth() |
| return getattr(self._pth, name) |
| |
| def mock_add_paths(self, path): |
| """ |
| Adds a path and all of its parents to the set of existing paths. |
| """ |
| self._mock_path_exists.add(path) |
| |
| def mock_copy_paths(self, source, dest): |
| """ |
| Duplicates a path and all of its children to another path. |
| """ |
| self._mock_path_exists.copy(source, dest) |
| |
| def mock_remove_paths(self, path, filt): |
| """ |
| Removes a path and all of its children from the set of existing paths. |
| """ |
| self._mock_path_exists.remove(path, filt) |
| |
| def exists(self, path): # pylint: disable=E0202 |
| """Return True if path refers to an existing path.""" |
| return self._mock_path_exists.contains(path) |
| |
| # This matches: |
| # [START_DIR] |
| # RECIPE[some_pkg::some_module:recipe_name] |
| # |
| # and friends at the beginning of a string. |
| ROOT_MATCHER = re.compile('^[A-Z_]*\[[^]]*\]') |
| |
| def normpath(self, path): |
| """Normalizese the path. |
| |
| This splits off a recipe base (i.e. RECIPE[...]) so that normpath is |
| only called on the user-supplied portion of the path. |
| """ |
| self._init_pth() |
| real_normpath = self._pth.normpath |
| m = self.ROOT_MATCHER.match(path) |
| if m: |
| prefix = m.group(0) |
| rest = path[len(prefix):] |
| if rest == '': |
| # normpath turns '' into '.' |
| return prefix |
| return prefix + real_normpath(rest) |
| return real_normpath(path) |
| |
| def abspath(self, path): |
| """Returns the absolute version of path.""" |
| return self.normpath(path) |
| |
| def realpath(self, path): |
| """Returns the canonical version of the path.""" |
| return self.normpath(path) |
| |
| def expanduser(self, path): |
| return path.replace('~', '[HOME]') |
| |
| |
| class PathApi(recipe_api.RecipeApi): |
| _paths_client = recipe_api.RequireClient('paths') |
| |
| # Attribute accesses that we pass through to our "_path_mod" module. |
| OK_ATTRS = ('pardir', 'sep', 'pathsep') |
| |
| # Because the native 'path' type in python is a str, we filter the *args |
| # of these methods to stringify them first (otherwise they would be getting |
| # recipe_util_types.Path instances). |
| FILTER_METHODS = ('abspath', 'basename', 'dirname', 'exists', 'expanduser', |
| 'join', 'split', 'splitext', 'realpath') |
| |
| def get_config_defaults(self): |
| """Internal recipe implementation function.""" |
| return { |
| # Needed downstream in depot_tools |
| 'PLATFORM': self.m.platform.name, |
| 'START_DIR': self._startup_cwd, |
| 'TEMP_DIR': self._temp_dir, |
| 'CACHE_DIR': self._cache_dir, |
| 'CLEANUP_DIR': self._cleanup_dir, |
| } |
| |
| def __init__(self, path_properties, **kwargs): |
| super(PathApi, self).__init__(**kwargs) |
| config_types.Path.set_tostring_fn( |
| PathToString(self, self._test_data)) |
| config_types.NamedBasePath.set_path_api(self) |
| |
| self._path_properties = path_properties |
| |
| # Assigned at "initialize". |
| self._path_mod = None # NT or POSIX path module, or "os.path" in prod. |
| self._start_dir = None |
| self._temp_dir = None |
| self._cache_dir = None |
| self._cleanup_dir = None |
| |
| # Used in mkdtemp when generating and checking expectations. |
| self._test_counter = 0 |
| |
| def _read_path(self, property_name, default): # pragma: no cover |
| """Reads a path from a property. If absent, returns the default. |
| |
| Validates that the path is absolute. |
| """ |
| value = self._path_properties.get(property_name) |
| if not value: |
| assert os.path.isabs(default), default |
| return default |
| if not os.path.isabs(value): |
| raise Error( |
| 'Path "%s" specified by module property %s is not absolute' % ( |
| value, property_name)) |
| return value |
| |
| def _ensure_dir(self, path): # pragma: no cover |
| try: |
| os.makedirs(path) |
| except os.error: |
| pass # Perhaps already exists. |
| |
| def _split_path(self, path): # pragma: no cover |
| """Relative or absolute path -> tuple of components.""" |
| abs_path = os.path.abspath(path).split(self.sep) |
| # Guarantee that the first element is an absolute drive or the posix root. |
| if abs_path[0].endswith(':'): |
| abs_path[0] += '\\' |
| elif abs_path[0] == '': |
| abs_path[0] = '/' |
| else: |
| assert False, 'Got unexpected path format: %r' % abs_path |
| return abs_path |
| |
| def initialize(self): |
| """Internal recipe implementation function.""" |
| if not self._test_data.enabled: # pragma: no cover |
| self._path_mod = os.path |
| start_dir = self._paths_client.start_dir |
| self._startup_cwd = self._split_path(start_dir) |
| |
| tmp_dir = self._read_path('temp_dir', tempfile.gettempdir()) |
| self._ensure_dir(tmp_dir) |
| self._temp_dir = self._split_path(tmp_dir) |
| |
| cache_dir = self._read_path('cache_dir', os.path.join(start_dir, 'cache')) |
| self._ensure_dir(cache_dir) |
| self._cache_dir = self._split_path(cache_dir) |
| |
| # If no cleanup directory is specified, assume that any directory |
| # underneath of the working directory is transient and will be purged in |
| # between builds. |
| cleanup_dir = self._read_path('cleanup_dir', |
| os.path.join(start_dir, 'recipe_cleanup')) |
| self._ensure_dir(cleanup_dir) |
| self._cleanup_dir = self._split_path(cleanup_dir) |
| else: |
| self._path_mod = fake_path(self, self._test_data.get('exists', [])) |
| |
| root = 'C:\\' if self.m.platform.is_win else '/' |
| self._startup_cwd = [root, 'b', 'FakeTestingCWD'] |
| # Appended to placeholder '[TMP]' to get fake path in test. |
| self._temp_dir = [root] |
| self._cache_dir = [root, 'b', 'c'] |
| self._cleanup_dir = [root, 'b', 'cleanup'] |
| |
| self.set_config('BASE') |
| |
| def mock_add_paths(self, path): |
| """For testing purposes, mark that |path| exists.""" |
| if self._test_data.enabled: |
| self._path_mod.mock_add_paths(path) |
| |
| def mock_copy_paths(self, source, dest): |
| """For testing purposes, copy |source| to |dest|.""" |
| if self._test_data.enabled: |
| self._path_mod.mock_copy_paths(source, dest) |
| |
| def mock_remove_paths(self, path, filt=lambda p: True): |
| """For testing purposes, assert that |path| doesn't exist. |
| |
| Args: |
| * path (str|Path) - The path to remove. |
| * filt (func[str] bool) - Called for every candidate path. Return |
| True to remove this path. |
| """ |
| if self._test_data.enabled: |
| self._path_mod.mock_remove_paths(path, filt) |
| |
| def assert_absolute(self, path): |
| """Raises AssertionError if the given path is not an absolute path. |
| |
| Args: |
| * path (Path|str) - The path to check. |
| """ |
| assert self.abspath(path) == str(path), '%s is not absolute' % path |
| |
| def mkdtemp(self, prefix=tempfile.template): |
| """Makes a new temporary directory, returns Path to it. |
| |
| Args: |
| * prefix (str) - a tempfile template for the directory name (defaults |
| to "tmp"). |
| |
| Returns a Path to the new directory. |
| """ |
| if not self._test_data.enabled: # pragma: no cover |
| # New path as str. |
| new_path = tempfile.mkdtemp(prefix=prefix, dir=str(self['cleanup'])) |
| # Ensure it's under self._cleanup_dir, convert to Path. |
| new_path = self._split_path(new_path) |
| assert new_path[:len(self._cleanup_dir)] == self._cleanup_dir, ( |
| 'new_path: %r -- cleanup_dir: %r' % (new_path, self._cleanup_dir)) |
| temp_dir = self['cleanup'].join(*new_path[len(self._cleanup_dir):]) |
| else: |
| self._test_counter += 1 |
| assert isinstance(prefix, basestring) |
| temp_dir = self['cleanup'].join( |
| '%s_tmp_%d' % (prefix, self._test_counter)) |
| self.mock_add_paths(temp_dir) |
| return temp_dir |
| |
| def mkstemp(self, prefix=tempfile.template): |
| """Makes a new temporary file, returns Path to it. |
| |
| Args: |
| * prefix (str) - a tempfile template for the file name (defaults to |
| "tmp"). |
| |
| Returns a Path to the new file. Unlike tempfile.mkstemp, the file's file |
| descriptor is closed. |
| """ |
| if not self._test_data.enabled: # pragma: no cover |
| # New path as str. |
| fd, new_path = tempfile.mkstemp(prefix=prefix, dir=str(self['cleanup'])) |
| # Ensure it's under self._cleanup_dir, convert to Path. |
| new_path = self._split_path(new_path) |
| assert new_path[:len(self._cleanup_dir)] == self._cleanup_dir, ( |
| 'new_path: %r -- cleanup_dir: %r' % (new_path, self._cleanup_dir)) |
| temp_file = self['cleanup'].join(*new_path[len(self._cleanup_dir):]) |
| os.close(fd) |
| else: |
| self._test_counter += 1 |
| assert isinstance(prefix, basestring) |
| temp_file = self['cleanup'].join( |
| '%s_tmp_%d' % (prefix, self._test_counter)) |
| self.mock_add_paths(temp_file) |
| return temp_file |
| |
| def abs_to_path(self, abs_string_path): |
| """Converts an absolute path string `string_path` to a real Path object, |
| using the most appropriate known base path. |
| |
| * abs_string_path MUST be an absolute path |
| * abs_string_path MUST be rooted in one of the configured base paths known |
| to the path module. |
| |
| This method will find the longest match in all the following: |
| * module resource paths |
| * recipe resource paths |
| * repo paths |
| * dynamic_paths |
| * base_paths |
| |
| Example: |
| ``` |
| # assume [START_DIR] == "/basis/dir/for/recipe" |
| api.path.abs_to_path("/basis/dir/for/recipe/some/other/dir") -> |
| Path("[START_DIR]/some/other/dir") |
| ``` |
| |
| Raises an ValueError if the preconditions are not met, otherwise returns the |
| Path object. |
| """ |
| ap = self.abspath(abs_string_path) |
| if ap != abs_string_path: |
| raise ValueError("path is not absolute: %r v %r" % (abs_string_path, ap)) |
| |
| # try module/recipe/repo resource paths first |
| sPath, path = self._paths_client.find_longest_prefix( |
| abs_string_path, self.sep) |
| if path is None: |
| # try base paths now |
| for path_name in itertools.chain(self.c.dynamic_paths, self.c.base_paths): |
| path = self[path_name] |
| sPath = str(path) |
| if abs_string_path.startswith(sPath): |
| break |
| else: |
| path = None |
| |
| if path is None: |
| raise ValueError("could not figure out a base path for %r" % |
| abs_string_path) |
| |
| sub_path = abs_string_path[len(sPath):].strip(self.sep) |
| return path.join(*sub_path.split(self.sep)) |
| |
| |
| def __contains__(self, pathname): |
| return any(path_set.get(pathname) for path_set in ( |
| self.c.dynamic_paths, self.c.base_paths)) |
| |
| def __setitem__(self, pathname, path): |
| assert isinstance(path, config_types.Path), ( |
| 'Setting dynamic path to something other than a Path: %r' % path) |
| assert pathname in self.c.dynamic_paths, ( |
| 'Must declare dynamic path (%r) in config before setting it.' % path) |
| assert isinstance(path.base, config_types.BasePath), ( |
| 'Dynamic path values must be based on a base_path' % path.base) |
| self.c.dynamic_paths[pathname] = path |
| |
| def get(self, name, default=None): |
| """Gets the base path named `name`. See module docstring for more |
| information.""" |
| if name in self.c.base_paths or name in self.c.dynamic_paths: |
| return config_types.Path(config_types.NamedBasePath(name)) |
| return default |
| |
| def __getitem__(self, name): |
| """Gets the base path named `name`. See module docstring for more |
| information.""" |
| result = self.get(name) |
| if not result: |
| raise KeyError('Unknown path: %s' % name) |
| return result |
| |
| def __getattr__(self, name): |
| # retrieve os.path attributes |
| if name in self.OK_ATTRS: |
| return getattr(self._path_mod, name) |
| if name in self.FILTER_METHODS: |
| return string_filter(getattr(self._path_mod, name)) |
| raise AttributeError("'%s' object has no attribute '%s'" % |
| (self._path_mod, name)) # pragma: no cover |
| |
| def __dir__(self): # pragma: no cover |
| # Used for helping out show_me_the_modules.py |
| return self.__dict__.keys() + list(self.OK_ATTRS + self.FILTER_METHODS) |