blob: 41423fb2d5f253b0ddc44d5d75575760ae587ac2 [file] [log] [blame]
# Copyright 2018 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Read disk information from a CrOS image using cgpt."""
import collections
import os
from chromite.lib import chroot_lib
from chromite.lib import cros_build_lib
from chromite.lib import osutils
# MiniOS partition type GUID
MINIOS_TYPE_GUID = "09845860-705F-4BB5-B16C-8A8A099CAF52"
class Error(Exception):
"""Raised when there is an error with Cgpt."""
class MultiplePartitionLabel(Error):
"""Raised when a duplicate partition label is found."""
# Storage of partition properties as read from `cgpt show`.
Partition = collections.namedtuple(
"Partition",
("part_num", "label", "start", "size", "part_type", "uuid", "attr"),
)
class Disk:
"""GPT disk image manager.
Attributes:
image_file: cros disk image
partitions: OrderedDict of partitions, indexed by part_num
"""
def __init__(self, image_file, partitions=None) -> None:
self.image_file = image_file
self.partitions = partitions or collections.OrderedDict()
@classmethod
def FromImage(
cls, image_file, chroot: chroot_lib.Chroot = chroot_lib.Chroot()
):
"""Returns new Disk initialized from given |image_file|.
Raises:
RunCommandError: if error running cgpt command
CgptError: if error parsing out output of cgpt command
"""
if not os.path.getsize(image_file):
raise Error(f"File {image_file} is empty")
# If 'cgpt' binary doesn't exist in path, try within chroot.
enter_chroot = osutils.Which("cpgt") is None
if enter_chroot:
image_file = chroot.chroot_path(image_file)
cmd_result = cros_build_lib.run(
["cgpt", "show", "-n", image_file],
enter_chroot=enter_chroot,
chroot_args=chroot.get_enter_args(),
capture_output=True,
encoding="utf-8",
)
# Covert output to a file for processing via readline().
cgpt_result = iter(cmd_result.stdout.splitlines())
# Read header.
if not cmd_result.stdout or next(cgpt_result).split() != [
"start",
"size",
"part",
"contents",
]:
raise Error("Unable to find header in cgpt output")
partitions = collections.OrderedDict()
for line in cgpt_result:
if "Label:" not in line:
# Skip anything not a partition.
continue
# The expected output for a partition is 4 lines in the following
# format.
# <start> <size> <part_num> Label: "<label>"
# Type: <part_type>
# UUID: <uuid>
# Attr: <attr>
start, size, part_num, _, label = line.split()
start = int(start)
size = int(size)
part_num = int(part_num)
label = label.strip('"')
part_type = ""
uuid = ""
attr = ""
# Read next 3 lines for Type, UUID, and Attr.
for _ in range(3):
line = next(cgpt_result)
outputs = line.split()
if len(outputs) != 2:
raise Error("Unexpected line %r" % outputs)
if outputs[0] == "Type:":
part_type = outputs[1]
elif outputs[0] == "UUID:":
uuid = outputs[1]
elif outputs[0] == "Attr:":
attr = outputs[1]
else:
raise Error("Unexpected partition value: %r" % line.strip())
partitions[part_num] = Partition(
part_num=part_num,
label=label,
start=start,
size=size,
part_type=part_type,
uuid=uuid,
attr=attr,
)
return cls(image_file, partitions=partitions)
def GetPartitionByLabel(self, label):
"""Returns Partition with the given |label|.
Raises:
MultiplePartitionLabel: multiple partitions with that label found
KeyError: label not found
"""
part = None
for part_next in self.partitions.values():
if part_next.label == label:
if part is None:
part = part_next
else:
raise MultiplePartitionLabel(label)
if part is None:
raise KeyError(label)
return part
def GetPartitionByTypeGuid(self, type_guid):
"""Returns Partitions with the given |type_guid|.
Raises:
KeyError: if the type GUID is not found
"""
parts = [
x for x in self.partitions.values() if x.part_type == type_guid
]
if not parts:
raise KeyError(type_guid)
return parts