blob: 8d9a5798c51ad5b294c416c7be92880718ba7660 [file] [log] [blame] [edit]
#!/usr/bin/env python3
# Copyright 2023 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""General utilities for `rust_crates`."""
import dataclasses
import hashlib
import os
from pathlib import Path
import shutil
import subprocess
import sys
import tempfile
# This sets up the paths for chromite
import chromite_init
from chromite.lib import commandline
from chromite.lib import constants
from chromite.lib import cros_build_lib
def run_inside_chroot():
"""Restart the command inside the chroot if it isn't already."""
try:
commandline.RunInsideChroot()
except commandline.ChrootRequiredError as e:
sys.exit(
cros_build_lib.run(
e.cmd,
check=False,
enter_chroot=True,
chroot_args=e.chroot_args,
extra_env=e.extra_env,
cwd=constants.SOURCE_ROOT
/ "src"
/ "third_party"
/ "rust_crates",
).returncode
)
def die_if_running_as_root():
"""Exit with an error if this script is being run as root."""
if os.geteuid() == 0:
sys.exit("Don't run this as root and/or with sudo.")
def install_tomli_and_reexec_if_unavailable():
"""Installs the tomli{,_w} modules & restarts the program if necessary."""
try:
import tomli
import tomli_w
return
except ImportError:
pass
try:
import pip
has_pip = True
except ImportError:
has_pip = False
print("toml module isn't available; autoinstalling...")
if not has_pip:
print("ensuring pip is available.")
# The pip version is bundled with python, so we don't need to worry
# about version checks there.
subprocess.run(
[
"python",
"-m",
"ensurepip",
],
check=True,
)
tempdir = Path(tempfile.mkdtemp(prefix="pip-install-tomli"))
tomli_wheels = (
(
"tomli-2.0.1-py3-none-any.whl.bz2",
"218bc14a227cce9a113d6e67e9713e9ce5272325b37373329e673f9f217832e7",
),
(
"tomli_w-1.0.0-py3-none-any.whl.bz2",
"e00d7fc56ce3072cc348e4583642848e41f7c795630f004027c4f778f729576c",
),
)
for bz2_whl_name, sha in tomli_wheels:
download_gs_file_to(
target_path=tempdir / bz2_whl_name,
gs_path=f"gs://chromeos-localmirror/distfiles/{bz2_whl_name}",
sha256=sha,
)
subprocess.run(["bzip2", "-d", bz2_whl_name], check=True, cwd=tempdir)
subprocess.run(
[
"python",
"-m",
"pip",
"--disable-pip-version-check",
"install",
"--user",
tempdir / Path(bz2_whl_name).stem,
],
check=True,
)
# Only clean this up on successful installs. It's useful for debugging, and
# lands in /tmp anyway.
shutil.rmtree(tempdir)
# Now restart from scratch. This is necessary if pip had to be installed,
# since pip adds superpowers to Python's module loader. It's not super
# clear how to dynamically add those, and this function is intended to be
# called _very_ early, so re-exec'ing isn't much of an issue.
if not has_pip:
os.execvp(sys.argv[0], sys.argv)
@dataclasses.dataclass(frozen=True)
class GitHeadAncestry:
"""Describes information about theancestry of HEAD."""
# Our upstream branch. May be inferred.
upstream_branch: str
# Whether our upstream branch was inferred.
is_upstream_assumed: bool
# True if HEAD is a direct ancestor of `upstream_branch`.
is_upstream_an_ancestor: bool
def collect_git_head_ancestry(rust_crates: Path) -> GitHeadAncestry:
"""Populates a GitHeadAncestry object."""
upstream = subprocess.run(
[
"git",
"rev-parse",
"@{u}",
],
cwd=rust_crates,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
)
if upstream.returncode:
# Assume we're at detached-head. This is technically wrong for
# branches; we highlight that later.
upstream = "cros/main"
upstream_assumed = True
else:
upstream = upstream.stdout.strip()
upstream_assumed = False
assert upstream, "`git rev-parse @{u}` unexpectedly gave no output"
merge_base = subprocess.run(
[
"git",
"merge-base",
"HEAD",
upstream,
],
cwd=rust_crates,
check=True,
stdout=subprocess.PIPE,
encoding="utf-8",
)
merge_base = merge_base.stdout.strip()
upstream_sha = subprocess.run(
[
"git",
"rev-parse",
upstream,
],
cwd=rust_crates,
check=True,
stdout=subprocess.PIPE,
encoding="utf-8",
)
upstream_sha = upstream_sha.stdout.strip()
return GitHeadAncestry(
upstream_branch=upstream,
is_upstream_assumed=upstream_assumed,
is_upstream_an_ancestor=upstream_sha == merge_base,
)
def exit_if_head_is_not_up_to_date(rust_crates: Path, disable_check_flag: str):
"""Runs `sys.exit` with helpful messages if HEAD isn't up-to-date."""
ancestry = collect_git_head_ancestry(rust_crates)
if ancestry.is_upstream_an_ancestor:
return
exit_message_lines = [
f"Error: HEAD is not a child of {ancestry.upstream_branch}.",
"This may lead to this script giving incorrect results.",
"Please run `repo sync`, or if you'd like to just update this repo, "
f"`git rebase {ancestry.upstream_branch}`. Afterward, rerun this.",
]
if ancestry.is_upstream_assumed:
exit_message_lines.append("")
exit_message_lines.append(
"Note: upstream branch assumed to be "
f"{ancestry.upstream_branch} for lack of a better option."
)
exit_message_lines.append("")
exit_message_lines.append(
f"Note: pass {disable_check_flag} to disable this check."
)
sys.exit("\n".join(exit_message_lines))
def download_gs_file_to(
target_path: Path,
gs_path: str,
sha256: str,
):
"""Downloads a file from gs://, checking its SHA against `sha256`.
Args:
target_path: the path to download the file to.
gs_path: the gs:// path to download from.
sha256: the expected sha256 of the file, in hex.
"""
print(f"Downloading {gs_path}...")
subprocess.run(
["gsutil", "-q", "cp", gs_path, target_path],
check=True,
stdin=subprocess.DEVNULL,
)
print("Verifying SHA...")
with target_path.open("rb") as f:
got_sha256 = hashlib.sha256()
for block in iter(lambda: f.read(32 * 1024), b""):
got_sha256.update(block)
got_sha256 = got_sha256.hexdigest()
if got_sha256 != sha256:
raise ValueError(
f"SHA256 mismatch for {gs_path}. Got {got_sha256}, want "
f"{sha256}"
)