#!/usr/bin/env python3
# Copyright 2023 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Build a VM disk image for Bruschetta.

This script builds a disk image that can boot as a guest on ChromeOS with VM
integrations.
"""
import argparse
import contextlib
import json
import os
import pathlib
import shutil
import subprocess
import tempfile
from typing import Dict, List, Optional, Tuple

# pylint: disable=import-error
import jinja2
import requests
import yaml

SCRIPT_PATH = pathlib.Path(__file__).parent
DISK_CONFIG_TEMPLATE = SCRIPT_PATH / "disk_config.tpl"
SETUP_SCRIPT = SCRIPT_PATH / "setup.sh"
DATA_PATH = SCRIPT_PATH / "data"
LVM_VG_NAME = "refvm"
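
# Example invocation (illustrative; the script file name is hypothetical):
#   sudo ./build.py -o refvm.img -s 10 --debian-release trixie
# The build must run as root on a host that provides losetup, LVM, FAI's
# setup-storage, debootstrap, and eatmydata.

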
def main():
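    """Build the reference VM disk image end to end."""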
    ap = argparse.ArgumentParser()
    ap.add_argument(
        "-o",
        "--out",
        default="refvm.img",
        help="output file (default: %(default)s)",
    )
    ap.add_argument(
        "-s",
        "--size",
        default=10,
        type=int,
        help="image size in GiB (default: %(default)s)",
    )
    ap.add_argument("--cache-dir", help="directory for debootstrap caches")
    ap.add_argument(
        "--cros-version",
        type=int,
        help="install VM tools for this CrOS version",
    )
    ap.add_argument(
        "--cros-tools",
        default="release",
        choices=["release", "staging"],
        help="source of VM tools (default: %(default)s)",
    )
    ap.add_argument(
        "--debian-release",
        default="trixie",
        help="OS version to be installed (default: %(default)s)",
    )
    ap.add_argument(
        "--vg-name",
        default="refvm",
        help="name of LVM VG in installed OS (default: %(default)s)",
    )
    args = ap.parse_args()

    cache_dir = pathlib.Path(args.cache_dir) if args.cache_dir else None

    cros_bucket_name = {
        "release": "cros-packages",
        "staging": "cros-packages-staging",
    }[args.cros_tools]
    cros_version = args.cros_version or get_latest_cros_version(
        cros_bucket_name
    )
    cros_packages_url = (
        f"https://storage.googleapis.com/{cros_bucket_name}/{cros_version}/"
    )

    image_path = pathlib.Path(args.out)
    image_size = args.size * 1024**3

    if image_path.exists():
        raise Exception(f"Image file '{image_path}' already exists")

    with tempfile.TemporaryDirectory() as temp_dir_name, open(
        image_path, "wb"
    ) as image:
        temp_dir = pathlib.Path(temp_dir_name)

        image.seek(image_size)
        image.truncate()
        image.seek(0)

        with setup_loop(loop_file=image_path, vg_name=args.vg_name) as loop:
            disk_vars, fstab = setup_storage(
                temp_dir=temp_dir, target_device=loop, vg_name=args.vg_name
            )

            target = temp_dir / "target"
            with mount_target(target, disk_vars):
                run_debootstrap(
                    args.debian_release, target, cache_dir=cache_dir
                )

                with open(target / "etc/fstab", "w", encoding="utf-8") as f:
                    print("Writing fstab")
                    f.write(fstab)

                with prepare_chroot(target):
                    shutil.copytree(DATA_PATH, target / "tmp/data")
                    shutil.copy(SETUP_SCRIPT, target / "tmp/setup.sh")
                    os.chmod(target / "tmp/setup.sh", 0o755)

                    run_in_chroot(
                        target,
                        ["/tmp/setup.sh"],
                        env={
                            "CROS_PACKAGES_URL": cros_packages_url,
                            "RELEASE": args.debian_release,
                        },
                    )

        print("Install complete, syncing")
        subprocess.run(["sync"], check=True)

    print(f"Built image '{image_path}' ({os.path.getsize(image_path)} bytes)")


def get_latest_cros_version(bucket: str) -> int:
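    """Find the latest CrOS version with packages in the given GCS bucket.

    The bucket is expected to contain one top-level prefix per version
    number; the largest numeric prefix wins.
    """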
    res = requests.get(
        f"https://storage.googleapis.com/storage/v1/b/{bucket}/o"
        "?delimiter=/&matchGlob=**/"
    )
    res.raise_for_status()

    # The returned prefixes include a trailing /, remove it.
    prefixes = [p[:-1] for p in res.json()["prefixes"]]
    # And find the latest version among valid version numbers.
    version = max([int(p) for p in prefixes if p.isnumeric()])

    print(f"Detected latest CrOS version for {bucket} is {version}")
    return version


@contextlib.contextmanager
def setup_loop(loop_file: pathlib.Path, vg_name: str):
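    """Attach the image file to a loop device and yield the device path.

    On exit the LVM volume group created on the device is deactivated and
    the loop device is detached.
    """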
    res = subprocess.run(
        ["losetup", "--show", "-f", loop_file],
        capture_output=True,
        check=True,
        text=True,
    )
    loop_device = pathlib.Path(res.stdout.strip())

    try:
        yield loop_device
    finally:
        subprocess.run(["vgchange", "-an", vg_name], check=True)
        subprocess.run(["losetup", "-d", loop_device], check=True)


def setup_storage(
    temp_dir: pathlib.Path, target_device: pathlib.Path, vg_name: str
) -> Tuple[Dict[str, str], str]:
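    """Partition and format the target device with FAI's setup-storage.

    Renders the disk_config template for the given VG name, runs
    setup-storage against the device, and returns the resulting disk
    variables and fstab contents.
    """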
    jinja_env = jinja2.Environment(
        loader=jinja2.FileSystemLoader("/"),
        autoescape=False,
        keep_trailing_newline=True,
    )
    template = jinja_env.get_template(str(DISK_CONFIG_TEMPLATE))
    disk_config = template.render(vg_name=vg_name)

    ignored_vgs = [
        vg for vg in list_volume_groups() if not vg.startswith(vg_name)
    ]

    subprocess.run(
        [
            "setup-storage",
            "-X",
            "-y",
            "-L",
            temp_dir,
            "-f",
            "-",
            "-D",
            target_device.relative_to(pathlib.Path("/dev")),
        ],
        check=True,
        env={"SS_IGNORE_VG": ",".join(ignored_vgs), **os.environ},
        input=disk_config,
        text=True,
    )

    with open(
        temp_dir / "disk_var.yml",
        encoding="utf-8",
    ) as f:
        disk_vars = yaml.load(f, Loader=yaml.SafeLoader)
    with open(temp_dir / "fstab", encoding="utf-8") as f:
        fstab = f.read()

    return (disk_vars, fstab)


def list_volume_groups() -> List[str]:
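    """Return the names of all LVM volume groups visible on the host."""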
    res = subprocess.run(
        ["vgs", "--reportformat=json"],
        capture_output=True,
        check=True,
        text=True,
    )
    data = json.loads(res.stdout)

    vg_names = []
    for vg in data["report"][0]["vg"]:
        vg_names.append(vg["vg_name"])
    return vg_names


@contextlib.contextmanager
def mount_target(target: pathlib.Path, disk_vars: Dict[str, str]):
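    """Mount the new root, /boot, and /boot/efi filesystems under target.

    Everything is unmounted recursively on exit.
    """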
    try:
        location = target
        location.mkdir()
        subprocess.run(
            ["mount", disk_vars["ROOT_PARTITION"], location], check=True
        )

        location = target / "boot"
        location.mkdir()
        subprocess.run(
            ["mount", disk_vars["BOOT_PARTITION"], location], check=True
        )

        location = target / "boot/efi"
        location.mkdir()
        subprocess.run(["mount", disk_vars["ESP_DEVICE"], location], check=True)

        yield
    finally:
        subprocess.run(["umount", "-R", target], check=True)


@contextlib.contextmanager
def prepare_chroot(target: pathlib.Path):
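    """Prepare the pseudo-filesystems needed to run commands in the chroot.

    Bind-mounts /dev and /dev/pts and mounts sysfs, proc, and tmpfs
    instances inside the target; all mounts are undone in reverse order
    on exit.
    """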
    mounts = []
    try:
        mountpoint = target / "dev"
        subprocess.run(["mount", "--bind", "/dev", mountpoint], check=True)
        mounts.append(mountpoint)
        mountpoint = target / "dev/pts"
        subprocess.run(["mount", "--bind", "/dev/pts", mountpoint], check=True)
        mounts.append(mountpoint)
        mountpoint = target / "sys"
        subprocess.run(
            ["mount", "-t", "sysfs", "sysfs", mountpoint], check=True
        )
        mounts.append(mountpoint)
        mountpoint = target / "proc"
        subprocess.run(["mount", "-t", "proc", "proc", mountpoint], check=True)
        mounts.append(mountpoint)
        mountpoint = target / "tmp"
        subprocess.run(
            ["mount", "-t", "tmpfs", "tmpfs", mountpoint], check=True
        )
        mounts.append(mountpoint)
        mountpoint = target / "run"
        subprocess.run(
            ["mount", "-t", "tmpfs", "tmpfs", mountpoint], check=True
        )
        mounts.append(mountpoint)

        yield
    finally:
        for mountpoint in mounts[::-1]:
            subprocess.run(["umount", mountpoint], check=True)


def run_debootstrap(
    suite: str, target: pathlib.Path, cache_dir: Optional[pathlib.Path] = None
):
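    """Bootstrap a minimal Debian system of the given suite into target.

    If cache_dir is given, it is created if needed and passed to
    debootstrap to cache downloaded packages between runs.
    """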
    cache_args = []
    if cache_dir:
        if not cache_dir.exists():
            cache_dir.mkdir()
        cache_args += ["--cache-dir", cache_dir]

    # Run with eatmydata to improve build time.
    subprocess.run(
        [
            "eatmydata",
            "debootstrap",
            "--include=eatmydata",
            "--no-merged-usr",
            *cache_args,
            suite,
            target,
        ],
        check=True,
    )


def run_in_chroot(
    target: pathlib.Path,
    command: List[str],
    env: Optional[Dict[str, str]] = None,
):
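    """Run a command inside the target chroot under eatmydata.

    LANG is forced to C.UTF-8 so tools inside the chroot behave
    consistently.
    """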
    env = {**os.environ, "LANG": "C.UTF-8", **(env if env else {})}

    # Run with eatmydata to improve build time.
    subprocess.run(
        ["eatmydata", "chroot", target, *command], check=True, env=env
    )


if __name__ == "__main__":
    main()