| #!/usr/bin/python |
| # Copyright 2017 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """A utility to manipulate GPT on a disk image. |
| |
| Chromium OS factory software usually needs to access partitions from disk |
| images. However, there is no good, lightweight, and portable GPT utility. |
| Most Chromium OS systems use `cgpt`, but that's not by default installed on |
| Ubuntu. Most systems have parted (GNU) or partx (util-linux-ng) but they have |
| their own problems. |
| |
| For example, when a disk image is resized (usually enlarged for putting more |
| resources on stateful partition), GPT table must be updated. However, |
| - `parted` can't repair partition without interactive console in exception |
| handler. |
| - `partx` cannot fix headers nor make changes to partition table. |
| - `cgpt repair` does not fix `LastUsableLBA` so we cannot enlarge partition. |
| - `gdisk` is not installed on most systems. |
| |
| As a result, we need a dedicated tool to help process GPT. |
| |
| This pygpt.py provides a simple and customized implementation for processing |
| GPT, as a replacement for `cgpt`. |
| """ |
| |
| |
| from __future__ import print_function |
| |
| import argparse |
| import binascii |
| import collections |
| import logging |
| import os |
| import struct |
| import uuid |
| |
| |
# Layout of the GPT header, written as one "<struct code> <field name>" pair
# per line. The text is split on whitespace by BuildStructFormatAndNamedTuple,
# so only the order of the tokens matters.
#
# The CRC32 fields are declared as signed ('l') instead of unsigned ('L')
# because binascii.crc32 returns a signed integer on Python 2.
# NOTE(review): on Python 3 binascii.crc32 returns an unsigned value, so it
# has to be converted to a signed 32-bit integer before being packed with
# this format -- confirm against the interpreter this tool is run with.
# http://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_table_header_.28LBA_1.29
HEADER_FORMAT = """
8s Signature
4s Revision
L HeaderSize
l CRC32
4s Reserved
Q CurrentLBA
Q BackupLBA
Q FirstUsableLBA
Q LastUsableLBA
16s DiskGUID
Q PartitionEntriesStartingLBA
L PartitionEntriesNumber
L PartitionEntrySize
l PartitionArrayCRC32
"""

# Layout of one GPT partition entry, in the same "<struct code> <field name>"
# notation as HEADER_FORMAT.
# http://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_entries
PARTITION_FORMAT = """
16s TypeGUID
16s UniqueGUID
Q FirstLBA
Q LastLBA
Q Attributes
72s Names
"""
| |
| |
def BuildStructFormatAndNamedTuple(name, description):
  """Builds the format string for struct and create corresponding namedtuple.

  The description is a whitespace separated sequence of
  "<struct code> <attribute name>" pairs, for example "4s Magic L Size".

  Args:
    name: A string for name of the named tuple.
    description: A string with struct descriptor and attribute name.

  Returns:
    A pair of (struct_format, namedtuple_class). The struct format is always
    little-endian without padding (prefixed by '<').

  Raises:
    ValueError: If the description does not consist of complete pairs.
  """
  elements = description.split()
  # An odd number of tokens means some struct code has no attribute name (or
  # vice versa). Fail here instead of at unpack time, where the mismatch would
  # surface as a confusing namedtuple argument-count error.
  if len(elements) % 2:
    raise ValueError(
        'Description of %s must be pairs of struct code and attribute name.' %
        name)
  struct_format = '<' + ''.join(elements[::2])
  tuple_class = collections.namedtuple(name, elements[1::2])
  return (struct_format, tuple_class)
| |
| |
class GPT(object):
  """A GPT helper class.

  To load GPT from an existing disk image file, use `LoadFromFile`.
  After modifications were made, use `WriteToFile` to commit changes.

  Attributes:
    header: A namedtuple of GPT header.
    partitions: A list of GPT partition entry namedtuples.
  """

  HEADER_FORMAT, HEADER_CLASS = BuildStructFormatAndNamedTuple(
      'Header', HEADER_FORMAT)
  PARTITION_FORMAT, PARTITION_CLASS = BuildStructFormatAndNamedTuple(
      'Partition', PARTITION_FORMAT)
  BLOCK_SIZE = 512
  # Bytes literals, so they compare equal to fields unpacked by struct on
  # Python 3 as well (on Python 2 a bytes literal is a plain str).
  HEADER_SIGNATURE = b'EFI PART'
  TYPE_GUID_UNUSED = b'\x00' * 16
  TYPE_GUID_MAP = {
      '00000000-0000-0000-0000-000000000000': 'Unused',
      'EBD0A0A2-B9E5-4433-87C0-68B6B72699C7': 'Linux data',
      'FE3A2A5D-4F32-41A7-B725-ACCC3285A309': 'ChromeOS kernel',
      '3CB8E202-3B7E-47DD-8A3C-7FF2A13CFCEC': 'ChromeOS rootfs',
      '2E0A753D-9E48-43B0-8337-B15192CB1B5E': 'ChromeOS reserved',
      'CAB6E88E-ABF3-4102-A07A-D4BB9BE3C1D3': 'ChromeOS firmware',
      'C12A7328-F81F-11D2-BA4B-00A0C93EC93B': 'EFI System Partition',
  }
  TYPE_GUID_LIST_BOOTABLE = [
      'FE3A2A5D-4F32-41A7-B725-ACCC3285A309',  # ChromeOS kernel
      'C12A7328-F81F-11D2-BA4B-00A0C93EC93B',  # EFI System Partition
  ]

  def __init__(self):
    self.header = None
    self.partitions = None

  @staticmethod
  def GetAttributeSuccess(attrs):
    """Returns bit 56 (the 'successful' flag) of a partition attribute."""
    return (attrs >> 56) & 1

  @staticmethod
  def GetAttributeTries(attrs):
    """Returns bits 52-55 (the 'tries' counter) of a partition attribute."""
    return (attrs >> 52) & 0xf

  @staticmethod
  def GetAttributePriority(attrs):
    """Returns bits 48-51 (the 'priority') of a partition attribute."""
    return (attrs >> 48) & 0xf

  @staticmethod
  def NewNamedTuple(base, **dargs):
    """Builds a new named tuple based on dargs."""
    # pylint: disable=protected-access
    return base._replace(**dargs)

  @staticmethod
  def _SignedCRC32(blob):
    """Returns the CRC32 of blob as a signed 32-bit integer.

    The CRC fields are packed with the signed 'l' struct code. binascii.crc32
    is signed on Python 2 but unsigned on Python 3, so the value is normalized
    here to keep struct.pack working on both.
    """
    crc = binascii.crc32(blob) & 0xffffffff
    if crc & 0x80000000:
      crc -= 1 << 32
    return crc

  @classmethod
  def ReadHeader(cls, f):
    """Reads one GPT header from the current position of file object f."""
    return cls.HEADER_CLASS(*struct.unpack(
        cls.HEADER_FORMAT, f.read(struct.calcsize(cls.HEADER_FORMAT))))

  @classmethod
  def ReadPartitionEntry(cls, f):
    """Reads one partition entry from the current position of file object f."""
    return cls.PARTITION_CLASS(*struct.unpack(
        cls.PARTITION_FORMAT, f.read(struct.calcsize(cls.PARTITION_FORMAT))))

  @classmethod
  def GetHeaderBlob(cls, header):
    """Returns the packed (binary) representation of a header."""
    return struct.pack(cls.HEADER_FORMAT, *header)

  @classmethod
  def GetHeaderCRC32(cls, header):
    """Returns the CRC32 of a header, computed with its CRC32 field zeroed."""
    return cls._SignedCRC32(
        cls.GetHeaderBlob(cls.NewNamedTuple(header, CRC32=0)))

  @classmethod
  def GetPartitionsBlob(cls, partitions):
    """Returns the packed (binary) representation of all partition entries."""
    return b''.join(struct.pack(cls.PARTITION_FORMAT, *partition)
                    for partition in partitions)

  @classmethod
  def GetPartitionsCRC32(cls, partitions):
    """Returns the CRC32 of the packed partition entries."""
    return cls._SignedCRC32(cls.GetPartitionsBlob(partitions))

  @classmethod
  def LoadFromFile(cls, f):
    """Loads a GPT table from given disk image file object.

    Args:
      f: A file object opened in binary mode, supporting seek and read.

    Returns:
      A new instance with header and partitions read from the primary GPT.

    Raises:
      ValueError: If the signature of the primary header does not match.
    """
    gpt = cls()
    f.seek(cls.BLOCK_SIZE * 1)
    header = cls.ReadHeader(f)
    if header.Signature != cls.HEADER_SIGNATURE:
      raise ValueError('Invalid signature in GPT header.')
    f.seek(cls.BLOCK_SIZE * header.PartitionEntriesStartingLBA)
    # Entries are read back to back using the size of PARTITION_FORMAT;
    # header.PartitionEntrySize is not consulted here.
    partitions = [cls.ReadPartitionEntry(f)
                  for unused_i in range(header.PartitionEntriesNumber)]
    gpt.header = header
    gpt.partitions = partitions
    return gpt

  def GetValidPartitions(self):
    """Returns the list of partitions before entry with empty type GUID.

    In partition table, the first entry with empty type GUID indicates end of
    valid partitions. In most implementations all partitions after that should
    be zeroed.
    """
    for i, p in enumerate(self.partitions):
      if p.TypeGUID == self.TYPE_GUID_UNUSED:
        return self.partitions[:i]
    return self.partitions

  def GetMaxUsedLBA(self):
    """Returns the max LastLBA from all valid partitions.

    Raises:
      ValueError: If there is no valid partition (raised by max()).
    """
    return max(p.LastLBA for p in self.GetValidPartitions())

  def GetMinUsedLBA(self):
    """Returns the min FirstLBA from all valid partitions.

    Raises:
      ValueError: If there is no valid partition (raised by min()).
    """
    return min(p.FirstLBA for p in self.GetValidPartitions())

  def GetPartitionTableBlocks(self, header=None):
    """Returns the blocks (or LBA) of partition table from given header.

    Args:
      header: A header namedtuple, or None to use self.header.
    """
    if header is None:
      header = self.header
    size = header.PartitionEntrySize * header.PartitionEntriesNumber
    # Floor division keeps the block count an integer on Python 3 as well;
    # a partially used trailing block still counts as a whole block.
    blocks = size // self.BLOCK_SIZE
    if size % self.BLOCK_SIZE:
      blocks += 1
    return blocks

  def Resize(self, new_size):
    """Adjust GPT for a disk image in given size.

    Only BackupLBA and LastUsableLBA of the header are updated; partition
    entries are left untouched.

    Args:
      new_size: Integer for new size of disk image file.

    Raises:
      ValueError: If new_size is not a multiple of BLOCK_SIZE, or the
        partitions would overlap the primary or backup partition table.
    """
    old_size = self.BLOCK_SIZE * (self.header.BackupLBA + 1)
    if new_size % self.BLOCK_SIZE:
      raise ValueError('New file size %d is not valid for image files.' %
                       new_size)
    new_blocks = new_size // self.BLOCK_SIZE
    if old_size != new_size:
      logging.warning('Image size (%d, LBA=%d) changed from %d (LBA=%d).',
                      new_size, new_blocks, old_size,
                      old_size // self.BLOCK_SIZE)
    else:
      logging.info('Image size (%d, LBA=%d) not changed.',
                   new_size, new_blocks)

    # Re-calculate all related fields.
    backup_lba = new_blocks - 1
    partitions_blocks = self.GetPartitionTableBlocks()

    # To allow adding more blocks for partition table, we should reserve
    # same space between primary and backup partition tables and real
    # partitions.
    min_used_lba = self.GetMinUsedLBA()
    max_used_lba = self.GetMaxUsedLBA()
    primary_reserved = min_used_lba - self.header.PartitionEntriesStartingLBA
    primary_last_lba = (self.header.PartitionEntriesStartingLBA +
                        partitions_blocks - 1)

    if primary_last_lba >= min_used_lba:
      raise ValueError('Partition table overlaps partitions.')
    if max_used_lba + partitions_blocks >= backup_lba:
      raise ValueError('Partitions overlaps backup partition table.')

    # Never shrink the usable area below what partitions already occupy.
    last_usable_lba = backup_lba - primary_reserved - 1
    if last_usable_lba < max_used_lba:
      last_usable_lba = max_used_lba

    self.header = self.NewNamedTuple(
        self.header,
        BackupLBA=backup_lba,
        LastUsableLBA=last_usable_lba)

  def GetFreeSpace(self):
    """Returns the free (available) space left according to LastUsableLBA.

    Returns:
      Number of bytes between the last used LBA and LastUsableLBA.

    Raises:
      ValueError: If a valid partition ends beyond LastUsableLBA.
    """
    max_lba = self.GetMaxUsedLBA()
    # Raise explicitly instead of using assert, which is stripped with -O.
    if max_lba > self.header.LastUsableLBA:
      raise ValueError('Partitions too large.')
    return self.BLOCK_SIZE * (self.header.LastUsableLBA - max_lba)

  def ExpandPartition(self, i):
    """Expands a given partition to last usable LBA.

    Args:
      i: Index (0-based) of target partition.

    Raises:
      ValueError: If i is not a valid partition index, or the partition is
        not the one with the largest LastLBA.
    """
    # Assume no partitions overlap, we need to make sure partition[i] has
    # largest LBA.
    if i < 0 or i >= len(self.GetValidPartitions()):
      raise ValueError('Partition index %d is invalid.' % (i + 1))
    p = self.partitions[i]
    max_used_lba = self.GetMaxUsedLBA()
    if max_used_lba > p.LastLBA:
      raise ValueError('Cannot expand partition %d because it is not the last '
                       'allocated partition.' % (i + 1))

    old_blocks = p.LastLBA - p.FirstLBA + 1
    p = self.NewNamedTuple(p, LastLBA=self.header.LastUsableLBA)
    new_blocks = p.LastLBA - p.FirstLBA + 1
    self.partitions[i] = p
    logging.warning('Partition NR=%d expanded, size in LBA: %d -> %d.',
                    i + 1, old_blocks, new_blocks)

  def UpdateChecksum(self):
    """Updates all checksum values in GPT header."""
    # The header CRC covers PartitionArrayCRC32, so the partition array
    # checksum must be filled in first.
    header = self.NewNamedTuple(
        self.header,
        CRC32=0,
        PartitionArrayCRC32=self.GetPartitionsCRC32(self.partitions))
    self.header = self.NewNamedTuple(
        header,
        CRC32=self.GetHeaderCRC32(header))

  def GetBackupHeader(self):
    """Returns the backup header according to current header.

    CurrentLBA and BackupLBA are swapped, and the partition entries are
    placed in the blocks right before the backup header.
    """
    partitions_starting_lba = (
        self.header.BackupLBA - self.GetPartitionTableBlocks())
    header = self.NewNamedTuple(
        self.header,
        CRC32=0,
        BackupLBA=self.header.CurrentLBA,
        CurrentLBA=self.header.BackupLBA,
        PartitionEntriesStartingLBA=partitions_starting_lba)
    return self.NewNamedTuple(
        header,
        CRC32=self.GetHeaderCRC32(header))

  def WriteToFile(self, f):
    """Updates partition table in a disk image file.

    Checksums are refreshed, then the primary header and table are written,
    followed by the backup table and header.

    Args:
      f: A file object opened in binary mode, supporting seek and write.
    """

    def WriteData(name, blob, lba):
      """Writes a blob into given location."""
      logging.info('Writing %s in LBA %d (offset %d)',
                   name, lba, lba * self.BLOCK_SIZE)
      f.seek(lba * self.BLOCK_SIZE)
      f.write(blob)

    self.UpdateChecksum()
    WriteData('GPT Header', self.GetHeaderBlob(self.header),
              self.header.CurrentLBA)
    WriteData('GPT Partitions', self.GetPartitionsBlob(self.partitions),
              self.header.PartitionEntriesStartingLBA)
    logging.info('Usable LBA: First=%d, Last=%d',
                 self.header.FirstUsableLBA, self.header.LastUsableLBA)
    backup_header = self.GetBackupHeader()
    WriteData('Backup Partitions', self.GetPartitionsBlob(self.partitions),
              backup_header.PartitionEntriesStartingLBA)
    WriteData('Backup Header', self.GetHeaderBlob(backup_header),
              backup_header.CurrentLBA)
| |
| |
class GPTCommands(object):
  """Collection of GPT sub commands for command line to use.

  The commands are derived from `cgpt`, but not necessary to be 100% compatible
  with cgpt.

  Each sub command is a method whose name is the capitalized command name and
  that takes the parsed argparse namespace as its only argument.
  """

  # (long option name, help text) of the `show` format arguments. The first
  # character of the name is also registered as the short option.
  FORMAT_ARGS = [
      ('begin', 'beginning sector'),
      ('size', 'partition size'),
      ('type', 'type guid'),
      ('unique', 'unique guid'),
      ('label', 'label'),
      ('Successful', 'Successful flag'),
      ('Tries', 'Tries flag'),
      ('Priority', 'Priority flag'),
      ('Legacy', 'Legacy Boot flag'),
      ('Attribute', 'raw 16-bit attribute value (bits 48-63)')]

  def __init__(self):
    pass

  @classmethod
  def RegisterRepair(cls, p):
    """Registers the repair command to argparser.

    Args:
      p: An argparse parser instance.
    """
    p.add_argument(
        '--expand', action='store_true', default=False,
        help='Expands stateful partition to full disk.')
    p.add_argument('image_file', type=argparse.FileType('rb+'),
                   help='Disk image file to repair.')

  def Repair(self, args):
    """Repair damaged GPT headers and tables."""
    gpt = GPT.LoadFromFile(args.image_file)
    gpt.Resize(os.path.getsize(args.image_file.name))

    free_space = gpt.GetFreeSpace()
    if args.expand:
      if free_space:
        # Index 0 is the first partition entry (NR=1).
        gpt.ExpandPartition(0)
      else:
        logging.warning('- No extra space to expand.')
    elif free_space:
      logging.warning('Extra space found (%d, LBA=%d), '
                      'use --expand to expand partitions.',
                      free_space, free_space // gpt.BLOCK_SIZE)

    gpt.WriteToFile(args.image_file)
    print('Disk image file %s repaired.' % args.image_file.name)

  @classmethod
  def RegisterShow(cls, p):
    """Registers the show command to argparser.

    Args:
      p: An argparse parser instance.
    """
    p.add_argument('--numeric', '-n', action='store_true',
                   help='Numeric output only.')
    p.add_argument('--quick', '-q', action='store_true',
                   help='Quick output.')
    p.add_argument('--index', '-i', type=int, default=None,
                   help='Show specified partition only, with format args.')
    for name, help_str in cls.FORMAT_ARGS:
      # TODO(hungte) Alert if multiple args were specified.
      p.add_argument('--%s' % name, '-%c' % name[0], action='store_true',
                     help='[format] %s.' % help_str)
    p.add_argument('image_file', type=argparse.FileType('rb'),
                   help='Disk image file to show.')

  def Show(self, args):
    """Show partition table and entries."""

    def FormatGUID(bytes_le):
      return str(uuid.UUID(bytes_le=bytes_le)).upper()

    def FormatTypeGUID(p):
      guid_str = FormatGUID(p.TypeGUID)
      if not args.numeric:
        names = gpt.TYPE_GUID_MAP.get(guid_str)
        if names:
          return names
      return guid_str

    def FormatNames(p):
      return p.Names.decode('utf-16-le').strip('\0')

    def IsBootableType(type_guid):
      return type_guid in gpt.TYPE_GUID_LIST_BOOTABLE

    def FormatAttribute(attr):
      if args.numeric:
        return '[%x]' % (attr >> 48)
      if attr & 4:
        return 'legacy_boot=1'
      return 'priority=%d tries=%d successful=%d' % (
          gpt.GetAttributePriority(attr),
          gpt.GetAttributeTries(attr),
          gpt.GetAttributeSuccess(attr))

    def ApplyFormatArgs(p):
      # Only the first matching format argument is honored.
      if args.begin:
        return p.FirstLBA
      elif args.size:
        return p.LastLBA - p.FirstLBA + 1
      elif args.type:
        return FormatTypeGUID(p)
      elif args.unique:
        return FormatGUID(p.UniqueGUID)
      elif args.label:
        return FormatNames(p)
      elif args.Successful:
        return gpt.GetAttributeSuccess(p.Attributes)
      elif args.Priority:
        return gpt.GetAttributePriority(p.Attributes)
      elif args.Tries:
        return gpt.GetAttributeTries(p.Attributes)
      elif args.Legacy:
        raise NotImplementedError
      elif args.Attribute:
        return '[%x]' % (p.Attributes >> 48)
      else:
        return None

    def IsFormatArgsSpecified():
      return any(getattr(args, arg[0]) for arg in self.FORMAT_ARGS)

    gpt = GPT.LoadFromFile(args.image_file)
    fmt = '%12s %11s %7s %s'
    fmt2 = '%32s %s: %s'
    header = ('start', 'size', 'part', 'contents')

    # The arguments do not change while printing, so evaluate this only once.
    has_format_args = IsFormatArgsSpecified()
    if has_format_args and args.index is None:
      raise ValueError('Format arguments must be used with -i.')

    partitions = gpt.GetValidPartitions()
    # The index given on command line is 1-based.
    if not (args.index is None or 0 < args.index <= len(partitions)):
      raise ValueError('Invalid partition index: %d' % args.index)

    do_print_gpt_blocks = False
    if not (args.quick or has_format_args):
      print(fmt % header)
      if args.index is None:
        do_print_gpt_blocks = True

    if do_print_gpt_blocks:
      print(fmt % (gpt.header.CurrentLBA, 1, '', 'Pri GPT header'))
      print(fmt % (gpt.header.PartitionEntriesStartingLBA,
                   gpt.GetPartitionTableBlocks(), '', 'Pri GPT table'))

    for i, p in enumerate(partitions):
      if args.index is not None and i != args.index - 1:
        continue

      if has_format_args:
        print(ApplyFormatArgs(p))
        continue

      type_guid = FormatGUID(p.TypeGUID)
      print(fmt % (p.FirstLBA, p.LastLBA - p.FirstLBA + 1, i + 1,
                   FormatTypeGUID(p) if args.quick else
                   'Label: "%s"' % FormatNames(p)))

      if not args.quick:
        print(fmt2 % ('', 'Type', FormatTypeGUID(p)))
        print(fmt2 % ('', 'UUID', FormatGUID(p.UniqueGUID)))
        if args.numeric or IsBootableType(type_guid):
          print(fmt2 % ('', 'Attr', FormatAttribute(p.Attributes)))

    if do_print_gpt_blocks:
      # The location of the secondary table is taken from the backup header
      # stored in the image, not derived from the primary header.
      f = args.image_file
      f.seek(gpt.header.BackupLBA * gpt.BLOCK_SIZE)
      backup_header = gpt.ReadHeader(f)
      print(fmt % (backup_header.PartitionEntriesStartingLBA,
                   gpt.GetPartitionTableBlocks(backup_header), '',
                   'Sec GPT table'))
      print(fmt % (gpt.header.BackupLBA, 1, '', 'Sec GPT header'))

  def Create(self, args):
    """Create or reset GPT headers and tables."""
    del args  # Not used yet.
    raise NotImplementedError

  def Add(self, args):
    """Add, edit or remove a partition entry."""
    del args  # Not used yet.
    raise NotImplementedError

  def Boot(self, args):
    """Edit the PMBR sector for legacy BIOSes."""
    del args  # Not used yet.
    raise NotImplementedError

  def Find(self, args):
    """Locate a partition by its GUID."""
    del args  # Not used yet.
    raise NotImplementedError

  def Prioritize(self, args):
    """Reorder the priority of all kernel partitions."""
    del args  # Not used yet.
    raise NotImplementedError

  def Legacy(self, args):
    """Switch between GPT and Legacy GPT."""
    del args  # Not used yet.
    raise NotImplementedError

  @classmethod
  def RegisterAllCommands(cls, subparsers):
    """Registers all available commands to an argparser subparsers instance."""
    # Only implemented commands are registered; the docstring of each command
    # is used as its help text.
    subcommands = [('show', cls.Show, cls.RegisterShow),
                   ('repair', cls.Repair, cls.RegisterRepair)]
    for name, invocation, register_command in subcommands:
      register_command(subparsers.add_parser(name, help=invocation.__doc__))
| |
| |
def main():
  """Parses the command line and runs the selected GPT sub command.

  Any exception raised by the sub command is reported as a one-line error and
  turned into a non-zero exit status; the traceback is logged only when
  --verbose or --debug is given.
  """
  # Imported here so this function does not rely on the site-provided exit()
  # builtin, which is not available in every interpreter configuration.
  import sys

  parser = argparse.ArgumentParser(description='GPT Utility.')
  parser.add_argument('--verbose', '-v', action='count', default=0,
                      help='increase verbosity.')
  parser.add_argument('--debug', '-d', action='store_true',
                      help='enable debug output.')
  subparsers = parser.add_subparsers(help='Sub-command help.', dest='command')
  GPTCommands.RegisterAllCommands(subparsers)

  args = parser.parse_args()
  # On Python 3 a missing sub command is not rejected by argparse and leaves
  # args.command as None; report it as a usage error.
  if not args.command:
    parser.error('a sub-command is required.')

  # Each -v lowers the threshold by one level, but never below DEBUG.
  log_level = max(logging.WARNING - args.verbose * 10, logging.DEBUG)
  if args.debug:
    log_level = logging.DEBUG
  logging.basicConfig(format='%(module)s:%(funcName)s %(message)s',
                      level=log_level)
  commands = GPTCommands()
  try:
    # Sub command 'show' is implemented by GPTCommands.Show, and so on.
    getattr(commands, args.command.capitalize())(args)
  except Exception as e:  # pylint: disable=broad-except
    if args.verbose or args.debug:
      logging.exception('Failure in command [%s]', args.command)
    sys.exit('ERROR: %s' % e)


if __name__ == '__main__':
  main()