| #!/usr/bin/env python3 |
| # Copyright 2020 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Find ebuilds for rust crates that have been replaced by newer versions. |
| |
| Aids the process of removing unused rust ebuilds that have been replaced |
| by newer versions: |
| |
| 1) Get the list of dev-rust ebuilds. |
| 2) Exclude the newest version of each ebuild. |
| 3) Generate a list of ebuilds that are installed for typical configurations. |
| 4) List the dev-rust ebuilds that aren't included. |
| |
| For example: |
| ./cleanup_crates.py -c --log-level=debug |
| """ |
| import collections |
| import errno |
| import logging |
| import os |
| import pickle |
| import sys |
| |
| from chromite.lib import build_target_lib |
| from chromite.lib import chroot_util |
| from chromite.lib import commandline |
| from chromite.lib import constants |
| from chromite.lib import osutils |
| from chromite.lib import portage_util |
| from chromite.lib.parser import package_info |
| |
| |
| # The path of the cache. |
| DEFAULT_CACHE_PATH = os.path.join( |
| osutils.GetGlobalTempDir(), "cleanup_crates.py" |
| ) |
| |
| # build targets to include for the host. |
| HOST_CONFIGURATIONS = { |
| "virtual/target-sdk", |
| "virtual/target-sdk-post-cross", |
| } |
| # build targets to include for each board. |
| BOARD_CONFIGURATIONS = { |
| "virtual/target-os", |
| "virtual/target-os-dev", |
| "virtual/target-os-test", |
| } |
| |
| # The set of boards to check. This only needs to be a representative set. |
| BOARDS = {"eve", "tatl"} | ( |
| set() |
| if not os.path.isdir( |
| os.path.join(constants.SOURCE_ROOT, "src", "private-overlays") |
| ) |
| else {"reven"} |
| ) |
| |
| _GEN_CONFIG = lambda boards, configs: [(b, c) for b in boards for c in configs] |
| # A tuple of (board, ebuild) pairs used to generate the set of installed |
| # packages. |
| CONFIGURATIONS = _GEN_CONFIG((None,), HOST_CONFIGURATIONS) + _GEN_CONFIG( |
| BOARDS, BOARD_CONFIGURATIONS |
| ) |
| |
| EBUILD_SUFFIX = ".ebuild" |
| |
| |
| def main(argv): |
| """List ebuilds for rust crates replaced by newer versions.""" |
| opts = get_opts(argv) |
| |
| cln = CachedPackageLists( |
| use_cache=opts.cache, |
| clear_cache=opts.clear_cache, |
| cache_dir=opts.cache_dir, |
| ) |
| |
| ebuilds = get_dev_rust_ebuilds() |
| used = cln.get_used_packages(CONFIGURATIONS) |
| if not opts.latest: |
| used.update(latest_versions(ebuilds)) |
| |
| for key, value in find_ebuild_symlinks(ebuilds).items(): |
| if key in used and value not in used: |
| logging.info("Used symlink to unused cpvr: %s -> %s", key, value) |
| used.add(value) |
| |
| used_pv = set() |
| unused_ebuilds = [] |
| for ebuild in ebuilds: |
| if ebuild.cpvr not in used: |
| unused_ebuilds.append(ebuild) |
| else: |
| used_pv.add(ebuild.pv) |
| print("\n".join(sorted(x.cpvr for x in unused_ebuilds))) |
| |
| if opts.apply: |
| remove_ebuilds(unused_ebuilds) |
| remove_manifest_entries( |
| [x for x in unused_ebuilds if x.pv not in used_pv] |
| ) |
| return 0 |
| |
| |
| def get_opts(argv): |
| """Parse the command-line options.""" |
| parser = commandline.ArgumentParser(description=__doc__) |
| parser.add_argument( |
| "-c", |
| "--cache", |
| action="store_true", |
| help="Enables caching of the results of GetPackageDependencies.", |
| ) |
| parser.add_argument( |
| "-x", |
| "--clear-cache", |
| action="store_true", |
| help="Clears the contents of the cache before executing.", |
| ) |
| parser.add_argument( |
| "-C", |
| "--cache-dir", |
| action="store", |
| default=DEFAULT_CACHE_PATH, |
| type="path", |
| help="The path to store the cache (default: %(default)s)", |
| ) |
| parser.add_argument( |
| "-A", |
| "--apply", |
| action="store_true", |
| help="Remove the ebuilds and their Manifest entries.", |
| ) |
| parser.add_argument( |
| "-L", |
| "--latest", |
| action="store_true", |
| help="Remove even the latest version of unused ebuilds", |
| ) |
| opts = parser.parse_args(argv) |
| opts.Freeze() |
| return opts |
| |
| |
| def get_dev_rust_ebuilds(): |
| """Return a list of dev-rust ebuilds excluding cros-workon packages.""" |
| results = [] |
| category = "dev-rust" |
| category_dir = os.path.join( |
| constants.SOURCE_ROOT, constants.CHROMIUMOS_OVERLAY_DIR, category |
| ) |
| for package in os.listdir(category_dir): |
| package_dir = os.path.join(category_dir, package) |
| if not os.path.isdir(package_dir): |
| continue |
| # Skip cros-workon packages. |
| if os.path.exists( |
| os.path.join(package_dir, "%s-9999.ebuild" % package) |
| ): |
| continue |
| for ebuild_name in os.listdir(package_dir): |
| if not ebuild_name.lower().endswith(EBUILD_SUFFIX): |
| continue |
| cpvr = os.path.join(category, ebuild_name[0 : -len(EBUILD_SUFFIX)]) |
| results.append(package_info.parse(cpvr)) |
| return results |
| |
| |
| def latest_versions(packages): |
| """Return a list of the pvr's with the latest version.""" |
| by_atom = collections.defaultdict(list) |
| for pkg in packages: |
| by_atom[pkg.atom].append(pkg) |
| results = [] |
| # Pick out all the old versions, but keep different revisions of the newest |
| # version to ensure we don't keep a symlink and remove the ebuild itself. |
| for pkgs in by_atom.values(): |
| pkgs.sort() |
| results.append(pkgs[-1].cpvr) |
| return results |
| |
| |
| def _get_package_dependencies(board, package): |
| """List the ebuild-version dependencies for a specific board & package.""" |
| if not board: |
| board = None |
| if board and not os.path.isdir( |
| build_target_lib.get_default_sysroot_path(board) |
| ): |
| chroot_util.SetupBoard( |
| board, |
| update_chroot=False, |
| update_host_packages=False, |
| ) |
| return portage_util.GetPackageDependencies(package, board) |
| |
| |
| def get_ebuild_path(package): |
| """Get the absolute path to a chromiumos-overlay ebuild.""" |
| return os.path.join( |
| constants.SOURCE_ROOT, |
| constants.CHROMIUMOS_OVERLAY_DIR, |
| package.relative_path, |
| ) |
| |
| |
| def chase_references(references): |
| """Flatten a pvr -> pvr dict.""" |
| to_chase = set(references.keys()) |
| while to_chase: |
| value = to_chase.pop() |
| stack = [value] |
| value = references[value] |
| while value in to_chase: |
| to_chase.remove(value) |
| stack.append(value) |
| value = references[value] |
| for key in stack: |
| references[key] = value |
| |
| |
| def find_ebuild_symlinks(packages): |
| """Resolve ebuild symlinks as a flattened pvr -> pvr dict.""" |
| links = {} |
| checked = set() |
| for package in packages: |
| if package.cp in checked: |
| continue |
| checked.add(package.cp) |
| ebuild_dir = os.path.dirname(get_ebuild_path(package)) |
| __find_ebuild_symlinks_impl(ebuild_dir, links) |
| chase_references(links) |
| return links |
| |
| |
| def __find_ebuild_symlinks_impl(ebuild_dir, links): |
| package = os.path.basename(ebuild_dir) |
| category = os.path.basename(os.path.dirname(ebuild_dir)) |
| for ebuild_name in os.listdir(ebuild_dir): |
| if not ebuild_name.lower().endswith(EBUILD_SUFFIX): |
| continue |
| ebuild_path = os.path.join(ebuild_dir, ebuild_name) |
| if not os.path.islink(ebuild_path): |
| continue |
| target = os.path.realpath(ebuild_path) |
| target_name = os.path.basename(target) |
| prefix = "%s-" % package |
| if ( |
| os.path.dirname(target) != ebuild_dir |
| or not target_name.startswith(prefix) |
| or not target_name.lower().endswith(EBUILD_SUFFIX) |
| ): |
| logging.warning("Skipping symlink: %s -> %s", ebuild_path, target) |
| continue |
| cpvr = os.path.join(category, ebuild_name[0 : -len(EBUILD_SUFFIX)]) |
| target_cpvr = os.path.join( |
| category, target_name[0 : -len(EBUILD_SUFFIX)] |
| ) |
| links[cpvr] = target_cpvr |
| |
| |
| def remove_ebuilds(packages): |
| """Removes the listed ebuilds.""" |
| for package in packages: |
| ebuild_path = get_ebuild_path(package) |
| ebuild_dir = os.path.dirname(ebuild_path) |
| logging.info("Removing: %s", ebuild_path) |
| try: |
| os.unlink(ebuild_path) |
| except OSError as e: |
| if e.errno != errno.ENOENT: |
| raise |
| logging.warning("Could not find: %s", ebuild_path) |
| try: |
| os.rmdir(ebuild_dir) |
| except OSError as e: |
| if e.errno != errno.ENOTEMPTY: |
| raise |
| |
| |
| def remove_manifest_entries(packages): |
| """Removes the manifest entries for the listed ebuilds.""" |
| for package in packages: |
| ebuild_dir = os.path.dirname(get_ebuild_path(package)) |
| manifest_path = os.path.join(ebuild_dir, "Manifest") |
| if not os.path.exists(manifest_path): |
| logging.warning("Could not find: %s", manifest_path) |
| continue |
| logging.info("Updating: %s", manifest_path) |
| with open(manifest_path, "r") as m: |
| manifest_lines = m.readlines() |
| filtered_lines = [a for a in manifest_lines if package.pv not in a] |
| if not filtered_lines: |
| os.remove(manifest_path) |
| elif len(filtered_lines) != len(manifest_lines): |
| with open(manifest_path, "w") as m: |
| for line in filtered_lines: |
| m.write(line) |
| try: |
| os.rmdir(ebuild_dir) |
| except OSError as e: |
| if e.errno != errno.ENOTEMPTY: |
| raise |
| |
| |
| class CachedPackageLists: |
| """Lists used packages with the specified cache configuration.""" |
| |
| def __init__( |
| self, use_cache=False, clear_cache=False, cache_dir=DEFAULT_CACHE_PATH |
| ): |
| """Initialize the cache if it is enabled.""" |
| self.use_cache = bool(use_cache) |
| self.clear_cache = bool(clear_cache) |
| self.cache_dir = cache_dir |
| if self.clear_cache: |
| osutils.RmDir(self.cache_dir, ignore_missing=True) |
| if self.use_cache: |
| osutils.SafeMakedirs(self.cache_dir) |
| |
| def _try_cache(self, name, fn): |
| """Caches the return value of a function.""" |
| if not self.use_cache: |
| return fn() |
| |
| try: |
| with open(os.path.join(self.cache_dir, name), "rb") as fp: |
| logging.info("cache hit: %s", name) |
| return pickle.load(fp) |
| except FileNotFoundError: |
| pass |
| |
| logging.info("cache miss: %s", name) |
| result = fn() |
| |
| with open(os.path.join(self.cache_dir, name), "wb+") as fp: |
| pickle.dump(result, fp) |
| |
| return result |
| |
| def get_used_packages(self, configurations): |
| """Return the packages installed in the specified configurations.""" |
| |
| def get_deps(board, package): |
| filename_package = package.replace("/", ":") |
| return self._try_cache( |
| f"deps:{board}:{filename_package}", |
| lambda: _get_package_dependencies(board, package), |
| ) |
| |
| used = set() |
| for board, package in configurations: |
| deps = get_deps(board, package) |
| if deps: |
| used.update(deps) |
| else: |
| logging.warning("No depts for (%s, %s)", board, package) |
| return used |
| |
| |
| if __name__ == "__main__": |
| sys.exit(main(sys.argv[1:])) |