blob: 56663f887253ecbbc7d007f010cbfcefa6d57db5 [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Utility to manipulate Chrome OS disk & firmware images for manufacturing.
Run "image_tool help" for more info and a list of subcommands.
To add a subcommand, just add a new SubCommand subclass to this file.
"""
from __future__ import print_function
import argparse
import contextlib
import glob
import inspect
import json
import logging
import os
import pipes
import re
import shutil
import subprocess
import sys
import tempfile
import textwrap
import time
import urlparse
# The edit_lsb command works better if readline enabled, but will still work if
# that is not available.
try:
import readline # pylint: disable=unused-import
except ImportError:
pass
# This file needs to run in various environments, for example a fresh Ubuntu
# installation that has neither the Chromium OS source tree nor a chroot. We
# therefore want to avoid introducing more cros.factory dependencies, except
# for a very few special modules (pygpt, fmap, netboot_firmware_settings).
# Please don't add more cros.factory modules.
import factory_common # pylint: disable=unused-import
from cros.factory.utils import fmap
from cros.factory.utils import pygpt
from cros.factory.tools import netboot_firmware_settings
# Partition index for Chrome OS stateful partition.
PART_CROS_STATEFUL = 1
# Partition index for Chrome OS kernel A.
PART_CROS_KERNEL_A = 2
# Partition index for Chrome OS rootfs A.
PART_CROS_ROOTFS_A = 3
# Special options to mount Chrome OS rootfs partitions. (-t ext2, -o ro).
FS_TYPE_CROS_ROOTFS = 'ext2'
# Relative path of firmware updater on Chrome OS disk images.
PATH_CROS_FIRMWARE_UPDATER = '/usr/sbin/chromeos-firmwareupdate'
# The name of folder must match /etc/init/cros-payloads.conf.
DIR_CROS_PAYLOADS = 'cros_payloads'
# Relative path of RMA image metadata.
PATH_CROS_RMA_METADATA = os.path.join(DIR_CROS_PAYLOADS, 'rma_metadata.json')
# Mode for new created folder, 0755 = u+rwx, go+rx
MODE_NEW_DIR = 0o755
# Regular expression for parsing LSB value, which should be sh compatible.
# The key is matched lazily so it ends at the FIRST '=' on the line (values
# such as URLs may contain '=' themselves), and the last character of the
# value may be neither a quote nor a newline so a following blank line is
# never swallowed into the value.
RE_LSB = re.compile(r'^ *(.*?)="?(.*[^"\n])"?$', re.MULTILINE)
# Key for Chrome OS board name in /etc/lsb-release.
KEY_LSB_CROS_BOARD = 'CHROMEOS_RELEASE_BOARD'
# Key for Chrome OS build version in /etc/lsb-release.
KEY_LSB_CROS_VERSION = 'CHROMEOS_RELEASE_VERSION'
# Regular expression for reading file system information from dumpe2fs.
RE_BLOCK_COUNT = re.compile(r'^Block count: *(.*)$', re.MULTILINE)
RE_BLOCK_SIZE = re.compile(r'^Block size: *(.*)$', re.MULTILINE)
# Simple constant(s)
MEGABYTE = 1048576
# The storage industry treat "mega" and "giga" differently.
GIGABYTE_STORAGE = 1000000000
# Default size of each disk block (or sector). Re-exported from pygpt so this
# module and the GPT helpers refer to the same value.
DEFAULT_BLOCK_SIZE = pygpt.GPT.DEFAULT_BLOCK_SIZE
class ArgTypes(object):
  """Namespace for argparse `type=` callables used by the subcommands."""

  @staticmethod
  def ExistsPath(path):
    """Accepts a path only if it exists on the file system.

    Args:
      path: the path given on the command line.

    Returns:
      The same path, unchanged.

    Raises:
      argparse.ArgumentTypeError: if the path does not exist.
    """
    if os.path.exists(path):
      return path
    raise argparse.ArgumentTypeError('Does not exist: %s' % path)

  @staticmethod
  def GlobPath(pattern):
    """Resolves a glob pattern into exactly one path.

    Handy for default values that contain wildcards.

    A leading '-' marks the argument as optional: when nothing matches, None
    is returned instead of raising.

    Several alternatives can be joined by '|'; they are tried in order and
    the first alternative that matches anything decides the result.

    Args:
      pattern: the glob pattern (with the optional '-' prefix and '|'
        separated alternatives).

    Returns:
      The single matched path, or None for an unmatched optional pattern.

    Raises:
      argparse.ArgumentTypeError: if nothing matched a mandatory pattern, or
        if an alternative matched more than one path.
    """
    optional = pattern.startswith('-')
    if optional:
      # Special trick to allow defaults.
      pattern = pattern[1:]

    candidates = pattern.split('|')
    last_index = len(candidates) - 1
    for index, candidate in enumerate(candidates):
      matches = glob.glob(candidate)
      if len(matches) == 1:
        return matches[0]
      if matches:
        raise argparse.ArgumentTypeError(
            'Too many files found for <%s>: %s' % (pattern, matches))
      # Nothing matched: fall through to the next alternative if there is one.
      if index < last_index:
        continue
      if optional:
        return None
      raise argparse.ArgumentTypeError('Does not exist: %s' % pattern)
class SysUtils(object):
  """Collection of system utilities."""

  @staticmethod
  def Shell(commands, sudo=False, output=False, check=True, silent=False,
            **kargs):
    """Helper to execute 'sudo' command in a shell.

    A simplified implementation. To reduce dependency, we don't want to use
    process_utils.Spawn.

    Args:
      commands: a shell command string, or a sequence of arguments that will
        be quoted (pipes.quote) and joined with spaces.
      sudo: Execute the command with 'sudo -E' if not already running as root.
      output: Returns the output from command (subprocess.check_output).
      check: False to tolerate a non-zero exit status instead of raising.
      silent: True to redirect stdout and stderr of the command to /dev/null.
      kargs: extra keyword arguments for the subprocess call; 'shell' is
        always forced to True.

    Returns:
      The command output if `output` is True, otherwise whatever
      subprocess.check_call / subprocess.call returns.
    """
    if not isinstance(commands, basestring):
      commands = ' '.join(pipes.quote(arg) for arg in commands)
    kargs['shell'] = True

    caller = subprocess.check_output if output else subprocess.check_call
    if sudo and os.geteuid() != 0:
      commands = 'sudo -E ' + commands
    if silent:
      commands += ' >/dev/null 2>&1'
    if not check:
      if output:
        # check_output always raises on failure, so neutralize the exit status
        # in the shell itself.
        commands += ' || true'
      else:
        caller = subprocess.call
    return caller(commands, **kargs)

  @staticmethod
  def Sudo(commands, **kargs):
    """Shortcut to Shell(commands, sudo=True)."""
    kargs['sudo'] = True
    return Shell(commands, **kargs)

  @staticmethod
  def SudoOutput(commands, **kargs):
    """Shortcut to Sudo(commands, output=True)."""
    kargs['output'] = True
    return Sudo(commands, **kargs)

  @staticmethod
  def FindCommand(command):
    """Returns the right path to invoke given command.

    A program sitting next to sys.argv[0] is preferred; otherwise the command
    is looked up using `which`.

    Raises:
      RuntimeError: if the command cannot be found.
    """
    provided = os.path.join(
        os.path.dirname(os.path.abspath(sys.argv[0])), command)
    if not os.path.exists(provided):
      provided = Shell(['which', command], output=True, check=False).strip()
    if not provided:
      raise RuntimeError('Cannot find program: %s' % command)
    return provided

  @classmethod
  def FindCommands(cls, *commands):
    """Find any of the given commands in order.

    Raises:
      RuntimeError: if none of the commands can be found.
    """
    for cmd in commands:
      try:
        return cls.FindCommand(cmd)
      except Exception:
        # Deliberately best-effort: try the next candidate.
        pass
    raise RuntimeError(
        'Cannot find any of the following commands: %s' % ', '.join(commands))

  @classmethod
  def FindCGPT(cls):
    """Returns the best match of `cgpt` style command.

    The `cgpt` is a native program that is hard to deploy. As an alternative, we
    have the `pygpt` that emulates most of its functions, and that is accessible
    via `image_tool gpt`.
    """
    if os.path.exists(__file__) and os.access(__file__, os.X_OK):
      return '%s gpt' % __file__

    # Are we inside PAR?
    par_path = os.environ.get('PAR_PATH')
    if par_path:
      if os.path.basename(par_path) == 'image_tool':
        return '%s gpt' % par_path
      return 'sh %s image_tool gpt' % par_path

    # Nothing more - let's try to find the real programs.
    return cls.FindCommands('pygpt', 'cgpt')

  @classmethod
  def FindBZip2(cls):
    """Returns a path to best working 'bzip2'."""
    return cls.FindCommands('lbzip2', 'pbzip2', 'bzip2')

  @staticmethod
  @contextlib.contextmanager
  def TempDirectory(prefix='imgtool_', delete=True):
    """Context manager to allocate and remove temporary folder.

    Args:
      prefix: a string as prefix of the created folder name.
      delete: True to remove the folder (with sudo) when leaving the context.
    """
    tmp_folder = None
    try:
      tmp_folder = tempfile.mkdtemp(prefix=prefix)
      yield tmp_folder
    finally:
      if tmp_folder and delete:
        Sudo(['rm', '-rf', tmp_folder], check=False)

  @staticmethod
  def PartialCopy(src_path, dest_path, count, src_offset=0, dest_offset=0,
                  buffer_size=32 * MEGABYTE, verbose=None):
    """Copy partial contents from one file to another file, like 'dd'.

    Args:
      src_path: path of the file to read from.
      dest_path: path of an existing file to write into (opened as 'r+b').
      count: number of bytes to copy.
      src_offset: byte offset in the source to start reading from.
      dest_offset: byte offset in the destination to start writing to.
      buffer_size: maximum number of bytes handled per read/write.
      verbose: True to print a progress dot per chunk to stderr; None to
        decide automatically from the number of chunks.

    Raises:
      RuntimeError: if the source ends before `count` bytes were copied.
    """
    with open(src_path, 'rb') as src:
      if verbose is None:
        verbose = count / buffer_size > 5
      with open(dest_path, 'r+b') as dest:
        src.seek(src_offset)
        dest.seek(dest_offset)
        remains = count
        while remains > 0:
          data = src.read(min(remains, buffer_size))
          if not data:
            # read() returns an empty string at EOF; without this check the
            # loop would spin forever on a truncated source.
            raise RuntimeError(
                'Unexpected end of file in %s: %d byte(s) not copied.' %
                (src_path, remains))
          dest.write(data)
          remains -= len(data)
          if verbose:
            sys.stderr.write('.')
      if verbose:
        sys.stderr.write('\n')
# Short cut to SysUtils: module-level aliases so the rest of the file (and the
# SysUtils methods themselves) can call Shell/Sudo/SudoOutput directly.
Shell = SysUtils.Shell
Sudo = SysUtils.Sudo
SudoOutput = SysUtils.SudoOutput
def Aligned(value, alignment):
  """Rounds a number up to the next multiple of the alignment.

  Args:
    value: an integer as original value.
    alignment: an integer for alignment.

  Returns:
    `value` itself when it is already a multiple of `alignment`, otherwise
    the next multiple above it.
  """
  remainder = value % alignment
  if not remainder:
    return value
  return value - remainder + alignment
class GPT(pygpt.GPT):
  """pygpt.GPT extended with helpers to map, mount and copy partitions."""

  class Partition(pygpt.GPT.Partition):
    """GPT partition that can be mapped, mounted, resized and copied."""

    @staticmethod
    @contextlib.contextmanager
    def _Map(image, offset, size, partscan=False, block_size=None):
      """Context manager to attach (losetup) a disk image or one partition.

      The loop device is un-mounted and detached when the context is left.

      Args:
        image: a path to disk image to map.
        offset: an integer as offset to partition, or None to map whole disk.
        size: an integer as partition size, or None for whole disk.
        partscan: True to scan partition table and create sub device files.
        block_size: specify the size of each logical block.

      Yields:
        The path of the loop device printed by `losetup --show`.
      """
      command = ['losetup', '--show', '--find']
      if offset is not None:
        command += ['-o', str(offset), '--sizelimit', str(size)]
      elif partscan:
        # Note "losetup -P" needs Ubuntu 15+.
        # TODO(hungte) Use partx if -P is not supported (partx -d then -a).
        command += ['-P']

      # For Linux kernel without commit "loop: add ioctl for changing logical
      # block size", calling "-b 512" will fail, so "-b" is only passed when
      # really needed, so people with standard block size can work happily
      # without upgrading their host kernel.
      custom_block_size = (
          block_size is not None and
          block_size != pygpt.GPT.DEFAULT_BLOCK_SIZE)
      if custom_block_size:
        command += ['-b', str(block_size)]
      command += [image]

      device = None
      try:
        device = SudoOutput(command).strip()
        yield device
      finally:
        if device:
          Sudo(['umount', '-R', device], check=False, silent=True)
          Sudo(['losetup', '-d', device], check=False, silent=True)

    def Map(self):
      """Maps this partition to a loop block device (context manager)."""
      image, offset, size = self.image, self.offset, self.size
      logging.debug('Map %s: %s(+%s)', self, offset, size)
      return self._Map(image, offset, size)

    @classmethod
    def MapAll(cls, image, partscan=True, block_size=None):
      """Maps a whole image, including its partitions, to loop devices.

      The image becomes /dev/loopN and, with partscan, every partition shows
      up as /dev/loopNpM where M stands for partition number.
      This is not supported by older systems.

      Args:
        image: a path to disk image to map.
        partscan: True to scan partition table and create sub device files.
        block_size: specify the size of each logical block.

      Returns:
        A context manager yielding the mapped major loop device (/dev/loopN).
      """
      return cls._Map(image, None, None, partscan, block_size)

    @contextlib.contextmanager
    def Mount(self, mount_point=None, rw=False, fs_type=None, options=None,
              auto_umount=True, silent=False):
      """Context manager to mount this partition straight from its image.

      Args:
        mount_point: directory to mount, or None to use temporary directory.
        rw: True to mount as read-write, otherwise read-only (-o ro).
        fs_type: string as file system type (-t).
        options: string or list of strings as extra mount options (-o).
        auto_umount: True to un-mount when leaving context.
        silent: True to hide all warning and error messages.

      Yields:
        The directory where the partition is mounted.
      """
      if not options:
        extra_options = []
      elif isinstance(options, basestring):
        extra_options = [options]
      else:
        extra_options = options

      access = 'rw' if rw else 'ro'
      mount_options = [access] + extra_options
      mount_options += [
          'loop', 'offset=%s' % self.offset, 'sizelimit=%s' % self.size]
      command = ['mount', '-o', ','.join(mount_options)]
      if fs_type:
        command += ['-t', fs_type]

      created_dir = None
      try:
        if not mount_point:
          created_dir = tempfile.mkdtemp(prefix='imgtool_')
          mount_point = created_dir
        command += [self.image, mount_point]
        logging.debug('Partition.Mount: %s', ' '.join(command))
        Sudo(command, silent=silent)
        yield mount_point
      finally:
        if auto_umount:
          if mount_point:
            Sudo(['umount', '-R', mount_point], check=False)
          # The temporary directory can only go away once nothing is mounted
          # on it, hence it is kept when auto_umount is off.
          if created_dir:
            os.rmdir(created_dir)

    def MountAsCrOSRootfs(self, *args, **kargs):
      """Mounts as Chrome OS root file system with rootfs verification enabled.

      A Chrome OS disk image with rootfs verification turned on has the RO bit
      set in its ext2 attributes, so it can only be mounted with the mount
      arguments "-t ext2 -o ro". Both are enforced here.
      """
      requested_rw = kargs.get('rw', False)
      assert requested_rw is False, (
          'Cannot change Chrome OS rootfs %s.' % self)
      requested_fs = kargs.get('fs_type', FS_TYPE_CROS_ROOTFS)
      assert requested_fs == FS_TYPE_CROS_ROOTFS, (
          'Chrome OS rootfs %s must be mounted as %s.' % (
              self, FS_TYPE_CROS_ROOTFS))
      kargs.update(rw=False, fs_type=FS_TYPE_CROS_ROOTFS)
      return self.Mount(*args, **kargs)

    def CopyFile(self, rel_path, dest, **mount_options):
      """Copies one file out of this partition.

      Args:
        rel_path: relative path to source on disk partition.
        dest: path of destination (file or directory).
        mount_options: anything that must be passed to Partition.Mount.

      Returns:
        The path of the copied file.
      """
      with self.Mount(**mount_options) as rootfs:
        # os.path.join discards rootfs if the second part is absolute, so
        # turn an absolute path into one relative to the mount point.
        if os.path.isabs(rel_path):
          rel_path = '.' + rel_path
        src_path = os.path.join(rootfs, rel_path)
        if os.path.isdir(dest):
          dest_path = os.path.join(dest, os.path.basename(rel_path))
        else:
          dest_path = dest
        logging.debug('Copying %s => %s ...', src_path, dest_path)
        shutil.copy(src_path, dest_path)
        return dest_path

    @staticmethod
    def _ParseExtFileSystemSize(block_dev):
      """Helper to parse ext* file system size using dumpe2fs.

      Args:
        block_dev: a path to block device.

      Returns:
        The file system size in bytes (block count * block size).
      """
      info = SudoOutput(['dumpe2fs', '-h', block_dev])
      # The 'block' in file system may be different from disk/partition
      # logical block size (LBA).
      block_count = int(RE_BLOCK_COUNT.findall(info)[0])
      block_size = int(RE_BLOCK_SIZE.findall(info)[0])
      return block_count * block_size

    def GetFileSystemSize(self):
      """Returns the (ext*) file system size.

      The file system may occupy less space than the partition; in Chrome OS
      the extra space is reserved for verity data (rootfs verification) or to
      help quick wiping in factory process.
      """
      with self.Map() as loop_device:
        return self._ParseExtFileSystemSize(loop_device)

    def ResizeFileSystem(self, new_size=None):
      """Resizes the file system in this partition.

      resize2fs may not accept size in number > INT32, so the size is given in
      a larger unit (MB); that implies the result may differ from new_size.

      Args:
        new_size: The expected new size. None to use whole partition.

      Returns:
        New size in bytes.

      Raises:
        RuntimeError: if e2fsck could not bring the file system to a clean
          state.
      """
      with self.Map() as loop_device:
        old_size = self._ParseExtFileSystemSize(loop_device)
        # File system must be clean before we can perform resize2fs.
        # e2fsck may return 1 "errors corrected" or 2 "corrected and need
        # reboot".
        fsck_status = Sudo(['e2fsck', '-y', '-f', loop_device], check=False)
        if fsck_status > 2:
          raise RuntimeError('Failed ensuring file system integrity (e2fsck).')

        resize_command = ['resize2fs', '-f', loop_device]
        if new_size:
          resize_command.append('%sM' % (new_size / MEGABYTE))
        Sudo(resize_command)

        real_size = self._ParseExtFileSystemSize(loop_device)
        requested = new_size / MEGABYTE if new_size else '(ALL)'
        logging.debug(
            '%s (%s) file system resized from %s (%sM) to %s (%sM), req = %s M',
            self, self.size, old_size, old_size / MEGABYTE,
            real_size, real_size / MEGABYTE,
            requested)
        return real_size

    def Copy(self, dest, check_equal=True):
      """Copies the contents of this partition into another partition.

      Args:
        dest: a Partition object as the destination.
        check_equal: True to raise exception if the sizes of partitions are
          different.

      Raises:
        RuntimeError: if the sizes differ while check_equal is set, or if the
          source does not fit into the destination.
      """
      if check_equal and self.size != dest.size:
        raise RuntimeError(
            'Partition size is different (%d, %d).' % (self.size, dest.size))
      if self.size > dest.size:
        raise RuntimeError(
            'Source partition (%s) is larger than destination (%s).' %
            (self.size, dest.size))
      SysUtils.PartialCopy(self.image, dest.image, self.size, self.offset,
                           dest.offset)
def Partition(image, number):
  """Loads the GPT of a disk image and returns one of its partitions.

  Args:
    image: a path to the disk image.
    number: the partition number to look up.

  Returns:
    The partition object for the given number.

  Raises:
    RuntimeError: if the partition is marked as unused.
  """
  gpt = GPT.LoadFromFile(image)
  partition = gpt.GetPartition(number)
  if not partition.IsUnused():
    return partition
  raise RuntimeError('Partition %s is unused.' % partition)
class LSBFile(object):
  """Access /etc/lsb-release file (or files in same format).

  The /etc/lsb-release can be loaded directly by shell ( . /etc/lsb-release ).
  There is no really good and easy way to parse that without sh, but fortunately
  for the fields we care, it's usually A=B or A="B C".

  Also, in Chrome OS, the /etc/lsb-release was implemented without using quotes
  (i.e., A=B C, no matter if the value contains space or not).
  """

  def __init__(self, path=None, is_cros=True):
    """Loads the given file, or starts empty when no path is given.

    Args:
      path: path of the file to load, or None for an empty object.
      is_cros: True to write values in Chrome OS style (never quoted).
    """
    self._path = path
    self._raw_data = ''
    self._dict = {}
    self._is_cros = is_cros
    if not path:
      return

    with open(path) as f:
      self._raw_data = f.read().strip()  # Remove trailing \n or \r
      self._dict = dict(RE_LSB.findall(self._raw_data))

  def AsRawData(self):
    """Returns the file contents as one string (no trailing newline)."""
    return self._raw_data

  def AsDict(self):
    """Returns the parsed key-value pairs as a dict."""
    return self._dict

  def GetPath(self):
    """Returns the path this object was loaded from (may be None)."""
    return self._path

  def FormatKeyValue(self, key, value):
    """Formats one line; non-Chrome OS files quote values containing spaces."""
    return ('%s=%s' if self._is_cros or ' ' not in value else '%s="%s"') % (
        key, value)

  def GetValue(self, key, default=None):
    """Returns the value of key, or default if the key does not exist."""
    return self._dict.get(key, default)

  def AppendValue(self, key, value):
    """Adds a new line for key at the end of the data."""
    self._dict[key] = value
    line = self.FormatKeyValue(key, value)
    # Only separate with a newline when there is something to separate from,
    # so an initially empty file does not start with a blank line.
    self._raw_data = self._raw_data + '\n' + line if self._raw_data else line

  def SetValue(self, key, value):
    """Replaces the value of an existing key, or appends it when missing."""
    if key in self._dict:
      self._dict[key] = value
      new_line = self.FormatKeyValue(key, value)
      # A callable replacement is taken literally; a plain string would have
      # its backslashes and group references interpreted by re.sub.
      self._raw_data = re.sub(
          r'^' + re.escape(key) + r'=.*', lambda unused_match: new_line,
          self._raw_data, flags=re.MULTILINE)
    else:
      self.AppendValue(key, value)

  def DeleteValue(self, key):
    """Removes key (and its line) if it exists."""
    if key not in self._dict:
      return
    self._dict.pop(key)
    self._raw_data = re.sub(r'^' + re.escape(key) + r'=.*\n*', '',
                            self._raw_data, flags=re.MULTILINE)

  def Install(self, destination, backup=False):
    """Installs the contents to the given location as lsb-release style file.

    The file will be owned by root:root, with file mode 0644.

    Args:
      destination: path of the file to create or replace.
      backup: True to keep a time-stamped copy of an existing destination.
    """
    with tempfile.NamedTemporaryFile(prefix='lsb_') as f:
      f.write(self._raw_data + '\n')
      f.flush()
      os.chmod(f.name, 0o644)
      if backup and os.path.exists(destination):
        bak_file = '%s.bak.%s' % (destination, time.strftime('%Y%m%d%H%M%S'))
        Sudo(['cp', '-pf', destination, bak_file])
      # The temporary file disappears when the context ends, so it must be
      # copied from inside the 'with' block.
      Sudo(['cp', '-pf', f.name, destination])
      Sudo(['chown', 'root:root', destination])

  def GetChromeOSBoard(self, remove_signer=True):
    """Returns the Chrome OS board name.

    Gets the value using KEY_LSB_CROS_BOARD. For test or DEV signed images, this
    is exactly the board name we passed to build commands. For PreMP/MP signed
    images, this may have suffix '-signed-KEY', where KEY is the key name like
    'mpv2'.

    Args:
      remove_signer: True to remove '-signed-XX' information.
    """
    board = self.GetValue(KEY_LSB_CROS_BOARD, '')
    if remove_signer:
      # For signed images, the board may come in $BOARD-signed-$KEY.
      signed_index = board.find('-signed-')
      if signed_index > -1:
        board = board[:signed_index]
    return board

  def GetChromeOSVersion(self, remove_timestamp=True):
    """Returns the Chrome OS build version.

    Gets the value using KEY_LSB_CROS_VERSION. For self-built images, this may
    include a time stamp.

    Args:
      remove_timestamp: Remove the timestamp like version info if available.

    Returns:
      The version string, or '' if the file has no (or an empty) version.
    """
    version = self.GetValue('CHROMEOS_RELEASE_VERSION', '')
    if remove_timestamp:
      # A missing or blank version splits into an empty list; guard against
      # indexing it.
      fields = version.split()
      version = fields[0] if fields else ''
    return version
class RMAImageBoardInfo(object):
  """Per-board record of an RMA image: board name and partition numbers."""

  __slots__ = ['board', 'kernel', 'rootfs']

  def __init__(self,
               board,
               kernel=PART_CROS_KERNEL_A,
               rootfs=PART_CROS_ROOTFS_A):
    """Stores the board name with its kernel and rootfs partition numbers.

    Args:
      board: the board name.
      kernel: partition number of the kernel for this board.
      rootfs: partition number of the rootfs for this board.
    """
    self.board, self.kernel, self.rootfs = board, kernel, rootfs

  def ToDict(self):
    """Returns all slots as a dict keyed by slot name."""
    return dict((name, getattr(self, name)) for name in self.__slots__)
def _WriteRMAMetadata(stateful, board_list):
  """Writes RMA metadata to a mounted stateful partition.

  The JSON is prepared in a temporary file (mode 0644) and then copied into
  place with sudo, after which the destination is chown'ed to root:root.

  Args:
    stateful: path of stateful partition mount point.
    board_list: a list of RMAImageBoardInfo object.
  """
  records = [info.ToDict() for info in board_list]
  target = os.path.join(stateful, PATH_CROS_RMA_METADATA)
  with tempfile.NamedTemporaryFile(prefix='metadata_') as tmp_file:
    json.dump(records, tmp_file)
    tmp_file.flush()
    os.chmod(tmp_file.name, 0o644)
    # Copy while the temporary file still exists, i.e. inside the context.
    Sudo(['cp', '-pf', tmp_file.name, target])
    Sudo(['chown', 'root:root', target])
def _ReadRMAMetadata(stateful):
  """Reads RMA metadata from a mounted stateful partition.

  Args:
    stateful: path of stateful partition mount point.

  Returns:
    A list of RMAImageBoardInfo, or None (after logging a warning) if the
    metadata file does not exist.
  """
  metadata_path = os.path.join(stateful, PATH_CROS_RMA_METADATA)
  if not os.path.exists(metadata_path):
    logging.warning('Cannot find %s/%s', stateful, PATH_CROS_RMA_METADATA)
    return None

  with open(metadata_path) as metadata_file:
    entries = json.load(metadata_file)
  return [
      RMAImageBoardInfo(board=entry['board'],
                        kernel=entry['kernel'],
                        rootfs=entry['rootfs'])
      for entry in entries]
class ChromeOSFactoryBundle(object):
"""Utilities to work with factory bundle."""
# Types of build targets (for DefineBundleArguments to use).
PREFLASH = 1
RMA = 2
BUNDLE = 3
def __init__(self, temp_dir, board, release_image, test_image, toolkit,
factory_shim=None, enable_firmware=True, firmware=None,
hwid=None, complete=None, netboot=None, setup_dir=None,
server_url=None):
self._temp_dir = temp_dir
# Member data will be looked up by getattr so we don't prefix with '_'.
self._board = board
self.release_image = release_image
self.test_image = test_image
self.toolkit = toolkit
self.factory_shim = factory_shim
self.enable_firmware = enable_firmware
self._firmware = firmware
self.hwid = hwid
self.complete = complete
self.netboot = netboot
self.setup_dir = setup_dir
self.server_url = server_url
self.components = [
'release_image', 'test_image', 'toolkit', 'firmware', 'hwid',
'complete']
@staticmethod
def DefineBundleArguments(parser, build_type):
  """Adds the command line arguments needed to describe a factory bundle.

  Args:
    parser: An argparse subparser to add argument definitions.
    build_type: one of ChromeOSFactoryBundle.PREFLASH, RMA or BUNDLE; decides
      which groups of arguments are defined.
  """
  add = parser.add_argument
  glob_path = ArgTypes.GlobPath

  # Common arguments for all types.
  add('--release_image', default='release_image/*.bin', type=glob_path,
      help=('path to a Chromium OS (release or recovery) image. '
            'default: %(default)s'))
  add('--test_image', default='test_image/*.bin', type=glob_path,
      help='path to a Chromium OS test image. default: %(default)s')
  add('--toolkit', default='toolkit/*.run', type=glob_path,
      help='path to a Chromium OS factory toolkit. default: %(default)s')
  add('--hwid', default='-hwid/*.sh', type=glob_path,
      help='path to a HWID bundle if available. default: %(default)s')

  # Arguments shared by RMA shims and full bundles.
  if build_type in (ChromeOSFactoryBundle.RMA, ChromeOSFactoryBundle.BUNDLE):
    # firmware/ may be updater*.sh or chromeos-firmwareupdate.
    add('--firmware', default='-firmware/*update*', type=glob_path,
        help=('optional path to a firmware update (chromeos-firmwareupdate); '
              'if not specified, extract firmware from --release_image '
              'unless if --no-firmware is specified'))
    add('--no-firmware', dest='enable_firmware', action='store_false',
        default=True,
        help='skip running firmware updater')
    add('--factory_shim', default='factory_shim/*.bin', type=glob_path,
        help=('path to a factory shim (build_image factory_install), '
              'default: %(default)s'))
    add('--complete_script', dest='complete', default='-complete/*.sh',
        type=glob_path,
        help='path to a script for last-step execution of factory install')
    add('--board',
        help='board name for dynamic installation')

  # Arguments only meaningful for full bundles.
  if build_type in (ChromeOSFactoryBundle.BUNDLE,):
    add('--setup_dir', default='-setup', type=glob_path,
        help='path to scripts for setup and deployment from factory zip')
    add('--netboot', default='-netboot|factory_shim/netboot', type=glob_path,
        help='path to netboot firmware (image.net.bin) and kernel (vmlinuz)')
    # TODO(hungte) Support more flexible names like 'evt2'.
    add('-p', '--phase', choices=['proto', 'evt', 'dvt', 'pvt', 'mp'],
        default='proto',
        help='build phase (evt, dvt, pvt or mp).')
    add('-s', '--server_url',
        help='URL to factory server. The host part may be used for TFTP.')
@property
def board(self):
"""Determines the right 'board' configuration."""
if self._board:
return self._board
part = Partition(self.release_image, PART_CROS_ROOTFS_A)
with part.MountAsCrOSRootfs() as rootfs:
self._board = LSBFile(
os.path.join(rootfs, 'etc', 'lsb-release')).GetChromeOSBoard()
logging.info('Detected board as %s from %s.', self._board, part)
return self._board
@property
def firmware(self):
if not self.enable_firmware:
return None
elif self._firmware is not None:
return self._firmware
part = Partition(self.release_image, PART_CROS_ROOTFS_A)
logging.info('Loaded %s from %s.', PATH_CROS_FIRMWARE_UPDATER, part)
self._firmware = part.CopyFile(
PATH_CROS_FIRMWARE_UPDATER, self._temp_dir, fs_type=FS_TYPE_CROS_ROOTFS)
return self._firmware
def CreatePayloads(self, target_dir):
  """Builds cros_payload contents into target_dir.

  This is needed to store payloads or install to another system. An empty
  JSON file named after the board is created first, then every available
  component is added through `cros_payload add`.

  Args:
    target_dir: a path to a folder for generating cros_payload contents.

  Returns:
    The JSON path in target_dir for cros_payload to use.
  """
  logging.debug('Generating cros_payload contents...')
  json_path = os.path.join(target_dir, '%s.json' % self.board)
  with open(json_path, 'wt') as json_file:
    json_file.write('{}')

  cros_payload = SysUtils.FindCommand('cros_payload')
  for component in self.components:
    resource = getattr(self, component)
    if not resource:
      print('Leaving %s component payload as empty.' % component)
      continue
    logging.debug('Add %s payloads from %s...', component, resource)
    Shell([cros_payload, 'add', json_path, component, resource])
  return json_path
def GetPMBR(self, image_path):
  """Creates a file containing PMBR contents from given image.

  Chrome OS firmware does not really need PMBR, but many legacy operating
  systems, UEFI BIOS, or particular SOC may need it, so we do want to create
  PMBR using a bootable image (for example release or factory_shim image).

  Args:
    image_path: a path to a Chromium OS disk image to read back PMBR.

  Returns:
    A file (in self._temp_dir) containing PMBR.
  """
  pmbr_path = os.path.join(self._temp_dir, '_pmbr')
  # The image is raw binary data, so it must be read in binary mode; text
  # mode could translate or decode the bytes before they are written back.
  with open(image_path, 'rb') as src:
    with open(pmbr_path, 'wb') as dest:
      # The PMBR is always less than DEFAULT_BLOCK_SIZE, no matter if the
      # disk has larger sector size.
      dest.write(src.read(DEFAULT_BLOCK_SIZE))
  return pmbr_path
def ExecutePartitionScript(
    self, image_path, block_size, pmbr_path, rootfs, verbose=False):
  """Runs write_gpt.sh from a rootfs to partition a disk image.

  To initialize (create partition table) on a new preflashed disk image for
  Chrome OS, we need to execute the write_gpt.sh included in rootfs of disk
  images.

  Args:
    image_path: the disk image to initialize partition table.
    block_size: the size of each logical block.
    pmbr_path: a path to a file with PMBR code (see GetPMBR).
    rootfs: a directory to root file system containing write_gpt.sh script.
    verbose: True to enable debug out of script execution (-x).

  Raises:
    RuntimeError: if write_gpt.sh or chromeos-common.sh is missing in rootfs.
  """
  write_gpt_path = os.path.join(rootfs, 'usr', 'sbin', 'write_gpt.sh')
  chromeos_common_path = os.path.join(
      rootfs, 'usr', 'share', 'misc', 'chromeos-common.sh')
  required_scripts = [
      (write_gpt_path, 'Missing write_gpt.sh.'),
      (chromeos_common_path, 'Missing chromeos-common.sh.')]
  for script_path, message in required_scripts:
    if not os.path.exists(script_path):
      raise RuntimeError(message)

  # pygpt is already available, but to allow write_gpt.sh access gpt
  # commands, have to find an externally executable GPT.
  cgpt_command = SysUtils.FindCGPT()

  # stateful partitions are enlarged only if the target is a block device
  # (not file), in order to reduce USB image size. As a result, we have to
  # run partition script with disk mapped.
  with GPT.Partition.MapAll(
      image_path, partscan=False, block_size=block_size) as loop_dev:
    script = []
    # Currently write_gpt.sh will load chromeos_common from a fixed path.
    # In future when it supports overriding ROOT, we can avoid sourcing
    # chromeos_common.sh explicitly below.
    script.append('. "%s"' % chromeos_common_path)
    script.append('. "%s"' % write_gpt_path)
    script.append('GPT="%s"' % cgpt_command)
    script.append('set -e')
    script.append('write_base_table "%s" "%s"' % (loop_dev, pmbr_path))
    # write_base_table will set partition #2 to S=0, T=15, P=15.
    # However, if update_engine is disabled (very common in factory) or if
    # the system has to do several quick reboots before reaching
    # chromeos-setgoodkernel, then the device may run out of tries without
    # setting S=1 and will stop booting. So we want to explicitly set S=1.
    script.append('%s add -i 2 -S 1 "%s"' % (cgpt_command, loop_dev))

    # The commands must be executed in a single invocation for '.' to work.
    trace_flag = '-x' if verbose else ''
    Sudo("bash %s -c '%s'" % (trace_flag, ' ; '.join(script)))
def InitDiskImage(self, output, sectors, sector_size, verbose=False):
  """Initializes (resize and partition) a new disk image.

  The output file is emptied, grown to the requested size, and partitioned
  with the partition script found in the release image rootfs.

  Args:
    output: a path to disk image to initialize.
    sectors: integer for new size in number of sectors.
    sector_size: size of each sector (block) in bytes.
    verbose: provide more details when calling partition execution script.

  Returns:
    An integer as the size (in bytes) of output file.
  """
  total_bytes = sectors * sector_size
  print('Initialize disk image in %s*%s bytes [%s G]' %
        (sectors, sector_size, total_bytes / GIGABYTE_STORAGE))
  pmbr_path = self.GetPMBR(self.release_image)

  # TODO(hungte) Support block device as output, and support 'preserve'.
  # Truncating to zero first discards whatever the file contained before.
  for size in ('0', str(total_bytes)):
    Shell(['truncate', '-s', size, output])

  release_rootfs = Partition(self.release_image, PART_CROS_ROOTFS_A)
  with release_rootfs.MountAsCrOSRootfs() as rootfs:
    self.ExecutePartitionScript(
        output, sector_size, pmbr_path, rootfs, verbose)
  return total_bytes
def CreateDiskImage(self, output, sectors, sector_size, stateful_free_space,
                    verbose=False):
  """Creates the installed disk image.

  This creates a complete image that can be pre-flashed to and boot from
  internal storage.

  Args:
    output: a path to disk image to initialize.
    sectors: number of sectors in disk image.
    sector_size: size of each sector in bytes.
    stateful_free_space: extra free space to claim in MB.
    verbose: provide more verbose output when initializing disk image.

  Returns:
    The size (in bytes) of the created image, as reported by InitDiskImage.
  """
  image_size = self.InitDiskImage(output, sectors, sector_size, verbose)

  payloads_dir = os.path.join(self._temp_dir, DIR_CROS_PAYLOADS)
  os.mkdir(payloads_dir)
  json_path = self.CreatePayloads(payloads_dir)
  cros_payload = SysUtils.FindCommand('cros_payload')
  install_command = [cros_payload, 'install', json_path]

  with GPT.Partition.MapAll(output) as output_dev:
    Sudo(install_command + [output_dev, 'test_image', 'release_image'])

  # output_dev (via /dev/loopX) needs root permission so we have to leave
  # previous context and resize using the real disk image file.
  stateful_part = Partition(output, PART_CROS_STATEFUL)
  wanted_size = (
      stateful_part.GetFileSystemSize() + stateful_free_space * MEGABYTE)
  stateful_part.ResizeFileSystem(wanted_size)

  with GPT.Partition.MapAll(output) as output_dev:
    targets = ['toolkit', 'release_image.crx_cache']
    if self.hwid:
      targets.append('hwid')
    Sudo(install_command + [output_dev] + targets)

  logging.debug('Add /etc/lsb-factory if not exists.')
  with stateful_part.Mount(rw=True) as stateful:
    lsb_factory = os.path.join(stateful, 'dev_image', 'etc', 'lsb-factory')
    Sudo(['touch', lsb_factory], check=False)
  return image_size
def CreateRMAImage(self, output):
  """Creates the RMA bootable installation disk image.

  This creates an RMA image that can boot and install all factory software
  resources to device. The factory shim is copied to output, enlarged by the
  size of the payloads, and the payloads are moved into its stateful
  partition.

  Args:
    output: a path to disk image to initialize.

  Raises:
    RuntimeError: if the factory shim already contains a payloads folder.
  """
  # It is possible to enlarge the disk by calculating sizes of all input
  # files, create cros_payloads folder in the disk image file, to minimize
  # execution time. However, that implies we have to shrink disk image later
  # (due to gz), and run build_payloads using root, which are all not easy.
  # As a result, here we want to create payloads in temporary folder then copy
  # into disk image.
  payloads_dir = os.path.join(self._temp_dir, DIR_CROS_PAYLOADS)
  os.mkdir(payloads_dir, MODE_NEW_DIR)
  self.CreatePayloads(payloads_dir)

  # 'du -sk' reports kilobytes.
  du_output = SudoOutput(['du', '-sk', payloads_dir])
  payloads_size = int(du_output.split()[0]) * 1024
  print('cros_payloads size: %s M' % (payloads_size / MEGABYTE))

  shutil.copyfile(self.factory_shim, output)
  block_size = Partition(output, PART_CROS_STATEFUL).block_size
  old_size = os.path.getsize(output)
  new_size = Aligned(old_size + payloads_size, block_size)
  print('Changing size: %s M => %s M' %
        (old_size / MEGABYTE, new_size / MEGABYTE))

  # Grow the file, then the partition table, then the stateful partition and
  # finally the file system inside it.
  Shell(['truncate', '-s', str(new_size), output])
  gpt = pygpt.GPT.LoadFromFile(output)
  gpt.Resize(new_size)
  gpt.ExpandPartition(PART_CROS_STATEFUL)
  gpt.WriteToFile(output)
  stateful_part = Partition(output, PART_CROS_STATEFUL)
  stateful_part.ResizeFileSystem()

  with stateful_part.Mount(rw=True) as stateful:
    print('Moving payload files to disk image...')
    if os.path.exists(os.path.join(stateful, DIR_CROS_PAYLOADS)):
      raise RuntimeError('Factory shim already contains %s - already RMA?' %
                         DIR_CROS_PAYLOADS)
    Sudo(['chown', '-R', 'root:root', payloads_dir])
    Sudo(['mv', '-f', payloads_dir, stateful])

    # Update lsb-factory file.
    lsb_path = os.path.join(stateful, 'dev_image', 'etc', 'lsb-factory')
    existing_lsb = lsb_path if os.path.exists(lsb_path) else None
    lsb_file = LSBFile(existing_lsb)
    for key in ('FACTORY_INSTALL_FROM_USB', 'USE_CROS_PAYLOAD'):
      lsb_file.AppendValue(key, '1')
    lsb_file.Install(lsb_path)

    board_info = RMAImageBoardInfo(board=self.board)
    _WriteRMAMetadata(stateful, board_list=[board_info])
    Sudo(['df', '-h', stateful])
@staticmethod
def ShowRMAImage(image):
  """Prints the boards and payload versions found in an RMA image.

  For every board listed in the RMA metadata this prints the versions
  recorded in the board's cros_payload JSON file (when present) and the
  Chrome OS version of the board's install shim rootfs.

  Args:
    image: path to the RMA disk image file.

  Raises:
    RuntimeError: if the stateful partition has no RMA metadata file or no
        cros_payloads folder.
  """
  gpt = GPT.LoadFromFile(image)
  with gpt.GetPartition(PART_CROS_STATEFUL).Mount() as stateful:
    metadata_path = os.path.join(stateful, PATH_CROS_RMA_METADATA)
    if not os.path.exists(metadata_path):
      raise RuntimeError('Cannot find file /%s, is this a RMA shim?' %
                         PATH_CROS_RMA_METADATA)
    payloads_dir = os.path.join(stateful, DIR_CROS_PAYLOADS)
    if not os.path.exists(payloads_dir):
      raise RuntimeError('Cannot find dir /%s, is this a RMA shim?' %
                         DIR_CROS_PAYLOADS)

    boards = _ReadRMAMetadata(stateful)
    print('This RMA shim contains boards: %s' % (
        ' '.join(entry.board for entry in boards)))
    separator = '-' * 25
    print(separator)

    for entry in boards:
      print('board:', entry.board)

      # Versions of the resources recorded in the board's cros_payload JSON.
      json_path = os.path.join(payloads_dir, '%s.json' % entry.board)
      if not os.path.exists(json_path):
        logging.warning('Cannot find cros-payload metadata of %s',
                        entry.board)
      else:
        with open(json_path) as f:
          resources = json.load(f)
        for resource_name, value in resources.iteritems():
          print('%s: %s' % (resource_name, value.get('version', '<unknown>')))

      # Version of the install shim, read from the board's rootfs partition.
      with gpt.GetPartition(entry.rootfs).MountAsCrOSRootfs() as rootfs:
        release = LSBFile(os.path.join(rootfs, 'etc', 'lsb-release'))
        version = release.GetChromeOSVersion(remove_timestamp=False)
        print('install_shim:', version)
      print(separator)
@staticmethod
def MergeRMAImage(output, images):
"""Merges multiple RMA (USB installation) disk images.
The RMA image should have factory_install kernel and rootfs in (2, 3) and
resources in stateful partition cros_payloads. This function extracts
all stateful partitions and then generate the output image by merging the
resource files to partition 1 and cloning partition 2/3 of each input image.
The layout of the merged output image:
1 stateful [cros_payloads from all rmaimgX]
2 kernel [install-rmaimg1]
3 rootfs [install-rmaimg1]
4 kernel [install-rmaimg2]
5 rootfs [install-rmaimg2]
6 kernel [install-rmaimg3]
7 rootfs [install-rmaimg3]
...
Args:
output: path of the merged disk image file to write.
images: list of paths to the input RMA images. The first image provides
the GPT that is cloned as the base layout and the stateful file system
that the other images' payloads are copied into.
Raises:
AssertionError: if the input images use different block sizes, or the
resized GPT cannot hold all collected partitions.
"""
stateful_parts = []
kern_rootfs_parts = []
block_size = 0
# Currently we only support merging images in same block size.
# Pass 1: collect the stateful partition of every image, plus the kernel and
# rootfs partitions of every board listed in that image's RMA metadata.
for path in images:
gpt = pygpt.GPT.LoadFromFile(path)
if block_size == 0:
block_size = gpt.block_size
assert gpt.block_size == block_size, (
'Cannot merge image %s due to different block size (%s, %s)' %
(path, block_size, gpt.block_size))
stateful_parts.append(gpt.GetPartition(PART_CROS_STATEFUL))
with Partition(path, PART_CROS_STATEFUL).Mount() as src_dir:
src_metadata = _ReadRMAMetadata(src_dir)
for board_info in src_metadata:
kern_rootfs_parts.append(gpt.GetPartition(board_info.kernel))
kern_rootfs_parts.append(gpt.GetPartition(board_info.rootfs))
# Build a new image based on first image's layout.
gpt = pygpt.GPT.LoadFromFile(images[0])
pad_blocks = gpt.header.FirstUsableLBA
# The merged stateful partition is as large as all input stateful
# partitions together.
state_blocks = sum(p.blocks for p in stateful_parts)
data_blocks = state_blocks + sum(p.blocks for p in kern_rootfs_parts)
# pad_blocks hold header and partition tables, in front and end of image.
new_size = (data_blocks + pad_blocks * 2) * block_size
# Shrink to zero bytes first, then extend to the final size.
# NOTE(review): presumably this discards any content of a pre-existing
# output file - confirm.
Shell(['truncate', '-s', '0', output])
Shell(['truncate', '-s', str(new_size), output])
gpt.Resize(new_size)
assert (gpt.header.LastUsableLBA - gpt.header.FirstUsableLBA + 1 >=
data_blocks), 'Disk image is too small.'
# Clear existing entries because this GPT was cloned from other image.
for p in gpt.partitions:
p.Zero()
# UniqueGUIDs already assigned in the output GPT, to detect duplicates.
used_guids = []
def AddPartition(number, p, begin, blocks=None):
"""Adds a clone of partition p to the output GPT as entry `number`.
The clone starts at LBA `begin` and spans `blocks` blocks (defaults to
p.blocks). Returns the first LBA after the new partition.
"""
next_lba = begin + (blocks or p.blocks)
guid = p.UniqueGUID
if guid in used_guids:
# Ideally we don't need to change UniqueGUID, but if user specified same
# disk images in images then this may cause problems, for example
# INVALID_ENTRIES in cgpt.
logging.warning(
'Duplicated UniqueGUID found from %s, replace with random.', p)
guid = pygpt.GUID.Random()
used_guids.append(guid)
# The target number location will be different so we have to clone,
# update and call UpdatePartition with new number explicitly.
p = p.Clone()
p.Update(
UniqueGUID=guid,
FirstLBA=begin,
LastLBA=next_lba - 1)
gpt.UpdatePartition(p, number=number)
return next_lba
# Partition 1 is the merged stateful partition; the kernel/rootfs partitions
# follow back to back, numbered from 2.
begin = AddPartition(
1, stateful_parts[0], gpt.header.FirstUsableLBA, state_blocks)
for i, p in enumerate(kern_rootfs_parts, 2):
begin = AddPartition(i, p, begin)
gpt.WriteToFile(output)
gpt.WriteProtectiveMBR(output, create=True)
logging.info('Creating new image file as %s M...', new_size / MEGABYTE)
# Seed the output stateful partition with the first image's stateful
# content, then grow the file system.
new_state = Partition(output, PART_CROS_STATEFUL)
old_state = Partition(images[0], PART_CROS_STATEFUL)
old_state.Copy(new_state, check_equal=False)
logging.debug('Maximize stateful file system...')
new_state.ResizeFileSystem()
# Pass 2: copy kernel/rootfs partitions of every board into their new slots,
# merge the payload files and write the combined RMA metadata.
with new_state.Mount(rw=True) as stateful:
payloads_dir = os.path.join(stateful, DIR_CROS_PAYLOADS)
board_list = []
board_set = set() # To detect duplicated boards.
board_index = 0
for i, src_path in enumerate(images):
print('Copying %s root/kernel partitions...' % src_path)
with Partition(src_path, PART_CROS_STATEFUL).Mount() as src_dir:
src_metadata = _ReadRMAMetadata(src_dir)
for board_info in src_metadata:
print('Found board: %s' % board_info.board)
# Each board takes two consecutive slots after the stateful partition.
new_kernel = board_index * 2 + PART_CROS_KERNEL_A
new_rootfs = board_index * 2 + PART_CROS_ROOTFS_A
board_index += 1
Partition(src_path, board_info.kernel).Copy(
Partition(output, new_kernel))
Partition(src_path, board_info.rootfs).Copy(
Partition(output, new_rootfs))
# A duplicated board is only warned about; it is still added.
if board_info.board in board_set:
logging.warning('Duplicated board: %r', board_info.board)
board_set.add(board_info.board)
# Record where the board's partitions live in the merged image.
board_info.kernel = new_kernel
board_info.rootfs = new_rootfs
board_list.append(board_info)
# The first image's stateful content was already cloned into the output
# above, so only the other images' payload files need copying.
if i == 0:
continue
print('Copying %s stateful resources...' % src_path)
Sudo('cp -pr %s/* %s/.' %
(os.path.join(src_dir, DIR_CROS_PAYLOADS), payloads_dir))
_WriteRMAMetadata(stateful, board_list)
@staticmethod
def GetKernelVersion(image_path):
  """Returns the kernel version that `file` reports for image_path.

  The output of `file` is split on commas; the first field starting with
  ' version' supplies the result (the text after the word 'version').
  Returns 'Unknown' when no such field exists.
  """
  description = Shell(['file', image_path], output=True)
  for field in description.split(','):
    if field.startswith(' version'):
      return field.strip().partition(' ')[2]
  return 'Unknown'
@staticmethod
def GetFirmwareVersion(image_path):
  """Reads the RO and RW firmware IDs from a firmware image file.

  Args:
    image_path: path to the firmware image file.

  Returns:
    A dict {'ro': <RO_FRID content>, 'rw': <RW_FWID or RW_FWID_A content>},
    with surrounding '\\xff' and then '\\0' padding stripped.

  Raises:
    RuntimeError: if the image has neither an RW_FWID nor an RW_FWID_A
        section.
  """
  # A firmware image is a raw binary blob, so read it in binary mode rather
  # than relying on the platform's text-mode behaviour.
  with open(image_path, 'rb') as f:
    fw_image = fmap.FirmwareImage(f.read())
  ro = fw_image.get_section('RO_FRID').strip('\xff').strip('\0')
  # Use the first RW ID section that exists; the for-else raises only when
  # none of the candidates is present.
  for rw_name in ['RW_FWID', 'RW_FWID_A']:
    if fw_image.has_section(rw_name):
      rw = fw_image.get_section(rw_name).strip('\xff').strip('\0')
      break
  else:
    raise RuntimeError('Unknown RW firmware version in %s' % image_path)
  return {'ro': ro, 'rw': rw}
@staticmethod
def GetFirmwareUpdaterVersion(updater):
  """Returns firmware versions packed in a firmware updater.

  The updater is run with --sb_extract into a temporary directory and the
  extracted 'bios.bin' / 'ec.bin' images are inspected.

  Args:
    updater: path to the firmware updater, or a false value for "none".

  Returns:
    A dict with optional keys 'main' and 'ec', each mapped to the result of
    GetFirmwareVersion for that image; {} when no updater is given.
  """
  if not updater:
    return {}

  with SysUtils.TempDirectory() as extract_dir:
    Shell([updater, '--sb_extract', extract_dir], silent=True)
    # TODO(hungte) Read VERSION.signer for signing keys.
    versions = {}
    for target, image in {'main': 'bios.bin', 'ec': 'ec.bin'}.iteritems():
      extracted = os.path.join(extract_dir, image)
      if os.path.exists(extracted):
        versions[target] = ChromeOSFactoryBundle.GetFirmwareVersion(extracted)
    return versions
def GenerateTFTP(self, tftp_root):
  """Generates TFTP data in a given folder.

  Writes a sample dnsmasq.conf next to tftp_root, an
  omahaserver_<board>.conf holding the server URL (only when self.server_url
  is set), and a chrome-bot/<board>/cmdline.sample kernel command line.

  Args:
    tftp_root: path of the TFTP root folder. The chrome-bot/<board> folder
        below it must already exist.
  """
  dnsmasq_conf = os.path.join(tftp_root, '..', 'dnsmasq.conf')
  with open(dnsmasq_conf, 'w') as f:
    f.write(textwrap.dedent(
        '''\
        # This is a sample config, can be invoked by "dnsmasq -d -C FILE".
        interface=eth2
        tftp-root=/var/tftp
        enable-tftp
        dhcp-leasefile=/tmp/dnsmasq.leases
        dhcp-range=192.168.200.50,192.168.200.150,12h
        port=0'''))

  tftp_server_ip = ''
  if self.server_url:
    tftp_server_ip = urlparse.urlparse(self.server_url).hostname
    url_conf = os.path.join(tftp_root, 'omahaserver_%s.conf' % self.board)
    with open(url_conf, 'w') as f:
      f.write(self.server_url)

  cmdline = (
      'lsm.module_locking=0 cros_netboot_ramfs cros_factory_install '
      'cros_secure cros_netboot earlyprintk cros_debug loglevel=7 '
      'console=ttyS2,115200n8')
  if tftp_server_ip:
    cmdline += ' tftpserverip=%s' % tftp_server_ip

  sample_path = os.path.join(
      tftp_root, 'chrome-bot', self.board, 'cmdline.sample')
  with open(sample_path, 'w') as f:
    f.write(cmdline)
def CreateNetbootFirmware(self, src_path, dest_path):
  """Creates a netboot firmware image with board-specific settings.

  Builds a netboot_firmware_settings command line that points the firmware
  at chrome-bot/<board>/{cmdline,vmlinuz} and, when self.server_url is set,
  at the factory server, then applies it to src_path writing dest_path.

  Args:
    src_path: path to the input netboot firmware image.
    dest_path: path to the output netboot firmware image.
  """
  parser = argparse.ArgumentParser()
  netboot_firmware_settings.DefineCommandLineArgs(parser)

  # This comes from sys-boot/chromeos-bootimage: ${PORTAGE_USER}/${BOARD_USE}
  tftp_board_dir = 'chrome-bot/%s' % self.board
  settings = [
      '--argsfile', os.path.join(tftp_board_dir, 'cmdline'),
      '--bootfile', os.path.join(tftp_board_dir, 'vmlinuz'),
      '--input', src_path,
      '--output', dest_path,
  ]
  if self.server_url:
    server_host = urlparse.urlparse(self.server_url).hostname
    settings.extend(['--factory-server-url', self.server_url,
                     '--tftpserverip', server_host])
  netboot_firmware_settings.NetbootFirmwareSettings(
      parser.parse_args(settings))
@staticmethod
def GetImageVersion(image):
  """Returns the Chrome OS version recorded in a disk image.

  The version is read from etc/lsb-release on the image's rootfs A
  partition, keeping the timestamp part.

  Args:
    image: path to the disk image, or a false value for "no image".

  Returns:
    The version string, or 'N/A' when no image is given.
  """
  if not image:
    return 'N/A'
  with Partition(image, PART_CROS_ROOTFS_A).MountAsCrOSRootfs() as rootfs:
    release = LSBFile(os.path.join(rootfs, 'etc', 'lsb-release'))
    return release.GetChromeOSVersion(remove_timestamp=False)
def GetToolkitVersion(self, toolkit=None):
  """Returns the stripped output of running the toolkit with '--lsm'.

  Args:
    toolkit: path to the toolkit executable; self.toolkit is used when this
        is not given.
  """
  installer = toolkit or self.toolkit
  lsm_output = Shell([installer, '--lsm'], output=True)
  return lsm_output.strip()
def CreateBundle(self, output_dir, phase, notes, timestamp=None):
"""Creates a bundle from given resources.
The bundle is assembled in a 'bundle' folder under self._temp_dir (README.md
plus one sub folder per resource) and then archived with tar.
Args:
output_dir: folder in which the bundle archive is created.
phase: text used as the last component of the bundle name.
notes: text written under "Additional Notes" in README.md.
timestamp: text used as the timestamp component of the bundle name;
defaults to the current local time formatted as %Y%m%d%H%M.
Returns:
Path of the created factory_bundle_<board>_<timestamp>_<phase>.tar.bz2.
"""
def FormatFirmwareVersion(info):
"""Formats a {'ro': ..., 'rw': ...} version dict for README.md.
Returns 'N/A' for an empty value, and a single version when RO and RW are
identical.
"""
if not info:
return 'N/A'
if info['ro'] == info['rw']:
return info['ro']
return 'RO: %s, RW: %s' % (info['ro'], info['rw'])
def AddResource(dir_name, resources_glob, do_copy=False):
"""Adds resources to specified sub directory under bundle_dir.
Every file matching resources_glob is symlinked (or copied, when do_copy is
true) into bundle_dir/dir_name. Returns None when resources_glob is empty.
Raises RuntimeError when the glob matches nothing.
Returns the path of last created resource.
"""
if not resources_glob:
return None
resources = glob.glob(resources_glob)
if not resources:
raise RuntimeError('Cannot find resource: %s' % resources_glob)
resource_dir = os.path.join(bundle_dir, dir_name)
if not os.path.exists(resource_dir):
os.makedirs(resource_dir)
dest_path = None
for resource in resources:
dest_name = os.path.basename(resource)
# Many files downloaded from CPFE or GoldenEye may contain '%2F' in its
# name and we want to remove them.
strip = dest_name.rfind('%2F')
if strip >= 0:
# 3 as len('%2F')
dest_name = dest_name[strip + 3:]
dest_path = os.path.join(resource_dir, dest_name)
if do_copy:
shutil.copy(resource, dest_path)
else:
# Link to the absolute path so the link does not depend on the current
# working directory.
os.symlink(os.path.abspath(resource), dest_path)
return dest_path
if timestamp is None:
timestamp = time.strftime('%Y%m%d%H%M')
bundle_name = '%s_%s_%s' % (self.board, timestamp, phase)
output_name = 'factory_bundle_%s.tar.bz2' % bundle_name
bundle_dir = os.path.join(self._temp_dir, 'bundle')
os.mkdir(bundle_dir)
# Extract the firmware updater of the release image so its firmware versions
# can be reported in README.md.
part = Partition(self.release_image, PART_CROS_ROOTFS_A)
release_firmware_updater = part.CopyFile(
PATH_CROS_FIRMWARE_UPDATER, self._temp_dir, fs_type=FS_TYPE_CROS_ROOTFS)
# The 'vmlinuz' may be in netboot/ folder (factory zip style) or
# netboot/tftp/chrome-bot/$BOARD/vmlinuz (factory bundle style).
netboot_vmlinuz = None
# True when the netboot folder uses the factory bundle style layout.
has_tftp = False
if self.netboot:
netboot_vmlinuz = os.path.join(self.netboot, 'vmlinuz')
if not os.path.exists(netboot_vmlinuz):
netboot_vmlinuz = os.path.join(
self.netboot, 'tftp', 'chrome-bot', self.board, 'vmlinuz')
has_tftp = True
# Write README.md: a list of "- key: value" lines followed by the notes.
readme_path = os.path.join(bundle_dir, 'README.md')
with open(readme_path, 'w') as f:
fw_ver = self.GetFirmwareUpdaterVersion(self.firmware)
fsi_fw_ver = self.GetFirmwareUpdaterVersion(release_firmware_updater)
info = [
('Board', self.board),
('Bundle', '%s (created by %s)' % (
bundle_name, os.environ.get('USER', 'unknown'))),
('Factory toolkit', self.GetToolkitVersion()),
('Test image', self.GetImageVersion(self.test_image)),
('Factory shim', self.GetImageVersion(self.factory_shim)),
('AP firmware', FormatFirmwareVersion(fw_ver.get('main'))),
('EC firmware', FormatFirmwareVersion(fw_ver.get('ec'))),
('Release (FSI)', self.GetImageVersion(self.release_image)),
]
# The release image's firmware is listed only when it differs from the
# firmware given to the bundle.
if fsi_fw_ver != fw_ver:
info += [
('FSI AP firmware', FormatFirmwareVersion(fsi_fw_ver.get('main'))),
('FSI EC firmware', FormatFirmwareVersion(fsi_fw_ver.get('ec')))]
if self.netboot:
info += [
('Netboot firmware', FormatFirmwareVersion(self.GetFirmwareVersion(
os.path.join(self.netboot, 'image.net.bin')))),
('Netboot kernel', self.GetKernelVersion(netboot_vmlinuz))]
info += [('Factory server URL', self.server_url or 'N/A')]
# Pad every "key:" to the longest key plus two so the values line up.
key_len = max(len(k) for (k, v) in info)
f.write(textwrap.dedent(
'''\
# Chrome OS Factory Bundle
%s
## Additional Notes
%s
''') % ('\n'.join('- %-*s%s' % (key_len + 2, k + ':', v)
for (k, v) in info), notes))
Shell(['cat', readme_path])
output_path = os.path.join(output_dir, output_name)
# Populate the bundle folder; resources are symlinked unless noted.
AddResource('toolkit', self.toolkit)
AddResource('release_image', self.release_image)
AddResource('test_image', self.test_image)
AddResource('firmware', self.firmware)
AddResource('complete', self.complete)
AddResource('hwid', self.hwid)
if self.server_url:
# The factory shim is copied instead of linked because its lsb-factory
# file is modified below.
shim_path = AddResource('factory_shim', self.factory_shim, do_copy=True)
with Partition(shim_path, PART_CROS_STATEFUL).Mount(rw=True) as stateful:
logging.info('Patching factory_shim lsb-factory file...')
lsb = LSBFile(os.path.join(stateful, 'dev_image', 'etc', 'lsb-factory'))
lsb.SetValue('CHROMEOS_AUSERVER', self.server_url)
lsb.SetValue('CHROMEOS_DEVSERVER', self.server_url)
lsb.Install(lsb.GetPath())
else:
AddResource('factory_shim', self.factory_shim)
if self.setup_dir:
AddResource('setup', os.path.join(self.setup_dir, '*'))
if self.netboot:
os.mkdir(os.path.join(bundle_dir, 'netboot'))
self.CreateNetbootFirmware(
os.path.join(self.netboot, 'image.net.bin'),
os.path.join(bundle_dir, 'netboot', 'image.net.bin'))
if has_tftp:
AddResource('netboot', os.path.join(self.netboot, 'tftp'))
else:
AddResource('netboot/tftp/chrome-bot/%s' % self.board, netboot_vmlinuz)
self.GenerateTFTP(os.path.join(bundle_dir, 'netboot', 'tftp'))
# NOTE(review): tar's -h flag should make it archive the files that the
# symlinks created by AddResource point to - confirm against tar docs.
Shell(['tar', '-I', SysUtils.FindBZip2(), '-chvf', output_path,
'-C', bundle_dir, '.'])
# Print final results again since tar may have flood screen output.
Shell(['cat', readme_path])
return output_path
# TODO(hungte) Generalize this (copied from py/tools/factory.py) for all
# commands to utilize easily.
class SubCommand(object):
  """A subcommand.

  Subclasses set `name` (and optionally `aliases`) and provide a docstring:
  its first line becomes the help text and the whole docstring becomes the
  description of the subparser.

  Properties:
    name: The name of the command (set by the subclass).
    aliases: Alternative names accepted for the command.
    parser: The ArgumentParser object.
    subparser: The subparser object created with parser.add_subparsers.
    subparsers: A collection of all subparsers.
    args: The parsed arguments.
  """
  name = None  # Overridden by subclass
  aliases = []  # Overridden by subclass

  parser = None
  args = None
  subparser = None
  subparsers = None

  def __init__(self, parser, subparsers):
    """Registers this command as a subparser named self.name."""
    assert self.name
    doc = self.__doc__
    self.parser = parser
    self.subparsers = subparsers
    self.subparser = subparsers.add_parser(
        self.name, description=doc, help=doc.splitlines()[0])
    # Lets the parsed namespace point back at the command to run.
    self.subparser.set_defaults(subcommand=self)

  def Init(self):
    """Initializes the subparser.

    May be implemented the subclass, which may use "self.subparser" to
    refer to the subparser object.
    """

  def Run(self):
    """Runs the command.

    Must be implemented by the subclass.
    """
    raise NotImplementedError
class HelpCommand(SubCommand):
  """Get help on COMMAND"""
  name = 'help'

  def Init(self):
    """Adds the optional COMMAND positional argument."""
    self.subparser.add_argument('command', metavar='COMMAND', nargs='?')

  def Run(self):
    """Prints help of the given subcommand, or the top-level help."""
    command = self.args.command
    if not command:
      self.parser.print_help()
      return
    choice = self.subparsers.choices.get(command)
    if not choice:
      sys.exit('Unknown subcommand %r' % command)
    choice.print_help()
class MountPartitionCommand(SubCommand):
  """Mounts a partition from Chromium OS disk image.
  Chrome OS rootfs with rootfs verification turned on will be mounted as
  read-only. All other file systems will be mounted as read-write."""
  name = 'mount'
  aliases = ['mount_partition']

  def Init(self):
    """Defines the command line arguments of the mount command."""
    # --rw and --ro share the 'rw' destination. Both need default=None:
    # store_true/store_false otherwise default to False/True, and argparse
    # applies the default of the first action registered for a destination,
    # so 'rw' would always be False and Run() could never tell that the user
    # gave neither flag.
    self.subparser.add_argument(
        '-rw', '--rw', action='store_true', default=None,
        help='mount partition read/write')
    self.subparser.add_argument(
        '-ro', '--ro', dest='rw', action='store_false', default=None,
        help='mount partition read-only')
    self.subparser.add_argument(
        'image', type=ArgTypes.ExistsPath,
        help='path to the Chromium OS image')
    self.subparser.add_argument(
        'partition_number', type=int,
        help='which partition (1-based) to mount')
    self.subparser.add_argument(
        'mount_point', type=ArgTypes.ExistsPath,
        help='the path to mount partition')

  def Run(self):
    """Mounts the partition and leaves it mounted.

    With --rw or --ro the partition is mounted exactly as requested. With
    neither flag a read-write mount is tried silently first; if that fails
    the partition is mounted again as a read-only Chrome OS rootfs.

    Raises:
      subprocess.CalledProcessError: if an explicitly requested mount fails.
    """
    part = Partition(self.args.image, self.args.partition_number)
    mode = ''
    # Defaults for "no flag given": try read-write quietly, then fall back.
    rw = True
    silent = True
    try_ro = True
    if self.args.rw is not None:
      rw = self.args.rw
      silent = False
      try_ro = False

    try:
      # auto_umount=False keeps the partition mounted after the with block.
      with part.Mount(self.args.mount_point, rw=rw, auto_umount=False,
                      silent=silent):
        mode = 'RW' if rw else 'RO'
    except subprocess.CalledProcessError:
      if not try_ro:
        raise
      logging.debug('Failed mounting %s, try again as ro/ext2...', part)
      with part.MountAsCrOSRootfs(self.args.mount_point, auto_umount=False):
        mode = 'RO'

    print('OK: Mounted %s as %s on %s.' % (part, mode, self.args.mount_point))
class GetFirmwareCommand(SubCommand):
  """Extracts firmware updater from a Chrome OS disk image."""
  # Only Chrome OS disk images should have firmware updater, not Chromium OS.
  name = 'get_firmware'
  aliases = ['extract_firmware_updater']

  def Init(self):
    """Defines the --image and --output_dir arguments."""
    add = self.subparser.add_argument
    add('-i', '--image', type=ArgTypes.ExistsPath, required=True,
        help='path to the Chrome OS (release) image')
    add('-o', '--output_dir', default='.',
        help='directory to save output file(s)')

  def Run(self):
    """Copies the firmware updater out of the rootfs A partition."""
    rootfs = Partition(self.args.image, PART_CROS_ROOTFS_A)
    extracted = rootfs.CopyFile(
        PATH_CROS_FIRMWARE_UPDATER, self.args.output_dir,
        fs_type=FS_TYPE_CROS_ROOTFS)
    print('OK: Extracted %s:%s to: %s' % (
        rootfs, PATH_CROS_FIRMWARE_UPDATER, extracted))
class NetbootFirmwareSettingsCommand(SubCommand):
  """Access Chrome OS netboot firmware (image.net.bin) settings."""
  name = 'netboot'
  aliases = ['netboot_firmware_settings']

  def Init(self):
    """Lets netboot_firmware_settings define the arguments."""
    subparser = self.subparser
    netboot_firmware_settings.DefineCommandLineArgs(subparser)

  def Run(self):
    """Hands the parsed arguments to netboot_firmware_settings."""
    parsed = self.args
    netboot_firmware_settings.NetbootFirmwareSettings(parsed)
class GPTCommand(SubCommand):
  """Access GPT (GUID Partition Table) with `cgpt` style commands."""
  name = 'gpt'
  aliases = ['pygpt', 'cgpt']
  # The pygpt.GPTCommands instance, created in Init().
  gpt = None

  def Init(self):
    """Creates the pygpt command set and lets it define the arguments."""
    commands = pygpt.GPTCommands()
    commands.DefineArgs(self.subparser)
    self.gpt = commands

  def Run(self):
    """Executes the selected pygpt command with the parsed arguments."""
    self.gpt.Execute(self.args)
class ResizeFileSystemCommand(SubCommand):
  """Changes file system size from a partition on a Chromium OS disk image."""
  name = 'resize'
  aliases = ['resize_image_fs']

  def Init(self):
    """Defines the image, partition and size arguments."""
    add = self.subparser.add_argument
    add('-i', '--image', type=ArgTypes.ExistsPath, required=True,
        help='path to the Chromium OS disk image')
    add('-p', '--partition_number', type=int, default=1,
        help='file system on which partition to resize')
    add('-s', '--size_mb', type=int, default=1024,
        help='file system size to change (set or add, see --append) in MB')
    add('-a', '--append', dest='append', action='store_true', default=True,
        help='append (increase) file system by +size_mb')
    add('--no-append', dest='append', action='store_false',
        help='set file system to a new size of size_mb')

  def Run(self):
    """Resizes the file system and reports the old and new sizes.

    Raises:
      RuntimeError: if the requested size is larger than the partition.
    """
    part = Partition(self.args.image, self.args.partition_number)
    curr_size = part.GetFileSystemSize()

    # With --append the given size is added to the current size; otherwise
    # it is the new absolute size.
    requested = self.args.size_mb * MEGABYTE
    target = curr_size + requested if self.args.append else requested

    if target > part.size:
      raise RuntimeError(
          'Requested size (%s MB) larger than %s partition (%s MB).' % (
              target / MEGABYTE, part, part.size / MEGABYTE))

    # Report the size returned by ResizeFileSystem rather than the request.
    final_size = part.ResizeFileSystem(target)
    print('OK: %s file system has been resized from %s to %s MB.' %
          (part, curr_size / MEGABYTE, final_size / MEGABYTE))
class CreatePreflashImageCommand(SubCommand):
  """Create a disk image for factory to pre-flash into internal storage.
  The output contains factory toolkit, release and test images.
  The manufacturing line can directly dump this image to device boot media
  (eMMC, SSD, NVMe, ... etc) using 'dd' command or copy machines.
  """
  name = 'preflash'

  def Init(self):
    """Defines the bundle arguments plus the disk geometry and output."""
    ChromeOSFactoryBundle.DefineBundleArguments(
        self.subparser, ChromeOSFactoryBundle.PREFLASH)
    add = self.subparser.add_argument
    add('--sectors', type=int, default=31277232,
        help=('size of image in sectors (see --sector-size). '
              'default: %(default)s'))
    add('--sector-size', type=int, default=DEFAULT_BLOCK_SIZE,
        help='size of each sector. default: %(default)s')
    add('--stateful_free_space', type=int, default=1024,
        help=('extra space to claim in stateful partition in MB. '
              'default: %(default)s'))
    add('-o', '--output', required=True,
        help='path to the output disk image file.')

  def Run(self):
    """Builds the pre-flash disk image and prints its size."""
    args = self.args
    with SysUtils.TempDirectory(prefix='diskimg_') as temp_dir:
      # A pre-flash image carries no factory shim, firmware or 'complete'
      # resource, and uses the fixed board name 'default'.
      bundle = ChromeOSFactoryBundle(
          temp_dir=temp_dir,
          board='default',
          release_image=args.release_image,
          test_image=args.test_image,
          toolkit=args.toolkit,
          factory_shim=None,
          enable_firmware=False,
          hwid=args.hwid,
          complete=None)
      new_size = bundle.CreateDiskImage(
          args.output, args.sectors, args.sector_size,
          args.stateful_free_space, args.verbose)
    print('OK: Generated pre-flash disk image at %s [%s G]' % (
        args.output, new_size / GIGABYTE_STORAGE))
class CreateRMAImageCommmand(SubCommand):
  """Create an RMA image for factory to boot from USB and repair device.
  The output is a special factory install shim (factory_install) with all
  resources (release, test images and toolkit). The manufacturing line or RMA
  centers can boot it from USB and install all factory software bits into
  a device.
  """
  name = 'rma-create'
  aliases = ['rma']

  def Init(self):
    """Defines the bundle arguments plus the output path."""
    ChromeOSFactoryBundle.DefineBundleArguments(
        self.subparser, ChromeOSFactoryBundle.RMA)
    self.subparser.add_argument(
        '-o', '--output', required=True,
        help='path to the output RMA image file')

  def Run(self):
    """Builds the RMA image at the requested output path."""
    args = self.args
    # TODO(hungte) always print bundle info (what files have been found)
    with SysUtils.TempDirectory(prefix='rma_') as temp_dir:
      bundle = ChromeOSFactoryBundle(
          temp_dir=temp_dir,
          board=args.board,
          release_image=args.release_image,
          test_image=args.test_image,
          toolkit=args.toolkit,
          factory_shim=args.factory_shim,
          enable_firmware=args.enable_firmware,
          firmware=args.firmware,
          hwid=args.hwid,
          complete=args.complete)
      bundle.CreateRMAImage(args.output)
    print('OK: Generated %s RMA image at %s' % (bundle.board, args.output))
class MergeRMAImageCommand(SubCommand):
  """Merge multiple RMA images into one single large image."""
  name = 'rma-merge'
  aliases = ['merge_rma']

  def Init(self):
    """Defines the --force, --output and --images arguments."""
    add = self.subparser.add_argument
    add('-f', '--force', action='store_true',
        help='Overwrite existing output image file.')
    add('-o', '--output', required=True,
        help='Path to the merged output image.')
    add('-i', '--images', required=True, nargs='+',
        type=ArgTypes.ExistsPath,
        help='Path to input RMA images')

  def Run(self):
    """Merge multiple RMA (USB installation) disk images.

    The RMA images should be created by 'image_tool rma' command, with different
    board names.

    Raises:
      RuntimeError: if the output exists and --force is not given, or fewer
          than two input images are given.
    """
    output = self.args.output
    images = self.args.images

    if not self.args.force and os.path.exists(output):
      raise RuntimeError(
          'Output already exists (add -f to overwrite): %s' % output)
    if len(images) < 2:
      raise RuntimeError('Need > 1 input image files to merge.')

    print('Scanning %s input image files...' % len(images))
    ChromeOSFactoryBundle.MergeRMAImage(output, images)
    print('OK: Merged successfully in new image: %s' % output)
class ShowRMAImageCommand(SubCommand):
  """Show the content of a RMA image."""
  name = 'rma-show'

  def Init(self):
    """Defines the positional rma_image argument."""
    # The help text previously read 'the path to mount partition', which
    # describes the mount command rather than this argument.
    self.subparser.add_argument(
        'rma_image', type=ArgTypes.ExistsPath,
        help='path to the RMA image to show')

  def Run(self):
    """Prints the content of the given RMA image."""
    ChromeOSFactoryBundle.ShowRMAImage(self.args.rma_image)
class CreateBundleCommand(SubCommand):
  """Creates a factory bundle from given arguments."""
  name = 'bundle'

  def Init(self):
    """Defines the bundle arguments plus output, timestamp and notes."""
    ChromeOSFactoryBundle.DefineBundleArguments(
        self.subparser, ChromeOSFactoryBundle.BUNDLE)
    add = self.subparser.add_argument
    add('-o', '--output_dir', default='.',
        help='directory for the output factory bundle file')
    add('--timestamp',
        help='override the timestamp field in output file name')
    add('-n', '--notes',
        help='additional notes or comments for bundle release')

  def Run(self):
    """Builds the factory bundle archive and prints its path."""
    args = self.args
    with SysUtils.TempDirectory(prefix='bundle_') as temp_dir:
      bundle = ChromeOSFactoryBundle(
          temp_dir=temp_dir,
          board=args.board,
          release_image=args.release_image,
          test_image=args.test_image,
          toolkit=args.toolkit,
          factory_shim=args.factory_shim,
          enable_firmware=args.enable_firmware,
          firmware=args.firmware,
          hwid=args.hwid,
          complete=args.complete,
          netboot=args.netboot,
          setup_dir=args.setup_dir,
          server_url=args.server_url)
      output_file = bundle.CreateBundle(
          args.output_dir, args.phase, args.notes, timestamp=args.timestamp)
    print('OK: Created %s factory bundle: %s' % (bundle.board, output_file))
class CreateDockerImageCommand(SubCommand):
  """Create a Docker image from existing Chromium OS disk image.
  The architecture of the source Chromium OS disk image should be the same as
  the docker host (basically, amd64).
  """
  name = 'docker'

  def Init(self):
    """Defines the --image argument."""
    self.subparser.add_argument(
        '-i', '--image', type=ArgTypes.ExistsPath, required=True,
        help='path to the Chromium OS image')

  def _CreateDocker(self, image, root):
    """Creates a docker image from prepared rootfs and stateful partition.

    Args:
      image: a path to raw input image.
      root: a path to prepared (mounted) Chromium OS disk image.

    Returns:
      The name (cros/<board>_test:<version>) of the imported Docker image.

    Raises:
      RuntimeError: if etc/lsb-release has no board or version.
    """
    logging.debug('Checking image board and version...')
    lsb_data = LSBFile(os.path.join(root, 'etc', 'lsb-release'))
    board = lsb_data.GetChromeOSBoard()
    version = lsb_data.GetChromeOSVersion()
    if not (board and version):
      raise RuntimeError('Input image does not have proper Chromium OS '
                         'board [%s] or version [%s] info.' % (board, version))

    docker_name = 'cros/%s_test:%s' % (board, version)
    docker_tag = 'cros/%s_test:%s' % (board, 'latest')
    print('Creating Docker image as %s ...' % docker_name)

    # Use pv if possible. It may be hard to estimate the real size of files in
    # mounted folder so we will use 2/3 of raw disk image - which works on most
    # test images.
    try:
      estimated_size = os.path.getsize(image) / 3 * 2
      pv = '%s -s %s' % (SysUtils.FindCommand('pv'), estimated_size)
    except Exception:
      # Best effort only: any failure simply drops the progress display.
      pv = 'cat'

    Sudo('tar -C "%s" -c . | %s | docker import - "%s"' %
         (root, pv, docker_name))
    Sudo(['docker', 'tag', docker_name, docker_tag])
    return docker_name

  def Run(self):
    """Mounts the image, overlays stateful folders and imports to Docker."""
    image = self.args.image
    rootfs_part = Partition(image, PART_CROS_ROOTFS_A)
    state_part = Partition(image, PART_CROS_STATEFUL)

    with state_part.Mount() as state:
      with rootfs_part.MountAsCrOSRootfs() as rootfs:
        # Bind the stateful var_overlay and dev_image folders over the
        # rootfs /var and /usr/local before archiving it.
        overlays = [
            (os.path.join(state, 'var_overlay'), os.path.join(rootfs, 'var')),
            (os.path.join(state, 'dev_image'),
             os.path.join(rootfs, 'usr', 'local')),
        ]
        for source, target in overlays:
          Sudo(['mount', '--bind', source, target])
        docker_name = self._CreateDocker(image, rootfs)

    print('OK: Successfully built docker image [%s] from %s.' %
          (docker_name, image))
class EditLSBCommand(SubCommand):
  """Edit contents of 'lsb-factory' file from a factory_install image."""
  name = 'edit_lsb'
  # Raw lsb-factory content as loaded; Write() compares against it to decide
  # whether anything has to be saved.
  old_data = ''
  # The LSBFile being edited, set by Run().
  lsb = None

  def Init(self):
    self.subparser.add_argument(
        '-i', '--image', type=ArgTypes.ExistsPath, required=True,
        help='Path to the factory_install image.')

  def _DoURL(self, title, keys, default_port=8080, suffix=''):
    # Asks for a host and port and stores 'http://host:port<suffix>' under
    # every key in keys. An empty host leaves all keys untouched.
    host = raw_input('Enter %s host: ' % title).strip()
    if not host:
      return
    port = raw_input('Enter port (default=%s): ' % default_port).strip()
    if not port:
      port = str(default_port)
    url = 'http://%s:%s%s' % (host, port, suffix)
    for key in keys:
      self.lsb.SetValue(key, url)

  def _DoOptions(self, title, key, options):
    # Shows a numbered list of options, asks until a valid number is given,
    # stores the chosen option under key and returns it.
    print('%s (%s):' % (title, key))
    for i, value in enumerate(options):
      print('(%s) %s' % (i + 1, value))

    while True:
      answer = raw_input(
          'Please select an option [1-%d]: ' % len(options)).strip().lower()
      try:
        selected = int(answer)
        if not 0 < selected <= len(options):
          raise ValueError('out of range')
      except ValueError:
        print('Invalid option: %s' % answer)
        continue
      break

    new_value = options[selected - 1]
    self.lsb.SetValue(key, new_value)
    return new_value

  def _DoOptionalNumber(self, title, key, min_value, max_value):
    # Asks for an integer and stores it under key, returning the number. An
    # empty answer deletes key and returns None. The [min_value, max_value]
    # range is enforced only when min_value is not None.
    print('%s (%s): ' % (title, key))
    while True:
      prompt = 'Enter a number%s or empty to remove this setting: ' % (
          '' if min_value is None else (
              ' in [%s, %s]' % (min_value, max_value)))
      answer = raw_input(prompt).strip()
      if not answer:
        self.lsb.DeleteValue(key)
        return None
      try:
        selected = int(answer)
        if min_value is not None and not min_value <= selected <= max_value:
          raise ValueError('out of range')
      except ValueError:
        print('Invalid option: %s' % answer)
        continue
      break

    self.lsb.SetValue(key, str(selected))
    return selected

  # NOTE: DoMenu() prints the first docstring line of the Edit*, Write and
  # Quit methods as the menu text, so those docstrings are user-visible.

  def EditServerAddress(self):
    """Modify Chrome OS Factory Server address."""
    self._DoURL(
        'Chrome OS Factory Server', ['CHROMEOS_AUSERVER', 'CHROMEOS_DEVSERVER'],
        suffix='/update')

  def EditBoardPrompt(self):
    """Enable/disable board prompt on download."""
    answer = raw_input('Enable (y) or disable (n) board prompt? ').lower()
    while not answer.strip() in ['y', 'n']:
      answer = raw_input('Please input "y" or "n": ').lower()
    self.lsb.SetValue('USER_SELECT',
                      'true' if answer.strip() == 'y' else 'false')

  def EditCutOff(self):
    """Modify cutoff method after factory reset.
    All options are defined in src/platform/factory/sh/cutoff/options.sh
    """
    answer = self._DoOptions(
        'Select cutoff method after factory reset', 'CUTOFF_METHOD',
        ['shutdown', 'reboot', 'battery_cutoff', 'ectool_cutoff'])
    # The remaining settings are asked only for the '*cutoff' methods.
    if not answer.endswith('cutoff'):
      return
    answer = self._DoOptions(
        'Select cutoff AC state', 'CUTOFF_AC_STATE',
        ['remove_ac', 'connect_ac'])
    answer = self._DoOptionalNumber(
        'Minimum allowed battery percentage', 'CUTOFF_BATTERY_MIN_PERCENTAGE',
        0, 100)
    # The maximum percentage may not be lower than the minimum just entered.
    self._DoOptionalNumber(
        'Maximum allowed battery percentage', 'CUTOFF_BATTERY_MAX_PERCENTAGE',
        0 if answer is None else answer, 100)
    # These are voltages, so the prompts say mV (they used to say mA, which
    # is a unit of current).
    self._DoOptionalNumber(
        'Minimum allowed battery voltage (mV)', 'CUTOFF_BATTERY_MIN_VOLTAGE',
        None, None)
    self._DoOptionalNumber(
        'Maximum allowed battery voltage (mV)', 'CUTOFF_BATTERY_MAX_VOLTAGE',
        None, None)
    self._DoURL(
        'Chrome OS Factory Server or Shopfloor Service for OQC ReFinalize',
        ['SHOPFLOOR_URL'])

  def DoMenu(self, *args, **kargs):
    # Interactive menu loop. Positional callables are selected by number
    # (1-based), keyword callables by their keyword. The loop ends when the
    # selected callable returns a true value.
    redo_options = True

    while True:
      if redo_options:
        # Show the current file content followed by the available actions.
        print('=' * 72)
        print(self.lsb.AsRawData())
        print('-' * 72)
        for i, arg in enumerate(args):
          print('(%d) %s' % (i + 1, arg.__doc__.splitlines()[0]))
        for k, v in kargs.iteritems():
          print('(%s) %s' % (k, v.__doc__.splitlines()[0]))
        print('=' * 72)
        redo_options = False

      answer = raw_input('Please select an option: ').strip().lower()
      if answer.isdigit():
        answer = int(answer)
        if not 1 <= answer <= len(args):
          print('Invalid option [%s].' % answer)
          continue
        selected = args[answer - 1]
      elif answer not in kargs:
        print('Invalid option [%s].' % answer)
        continue
      else:
        selected = kargs.get(answer)

      if selected():
        return
      # The action may have changed the file, so print the menu again.
      redo_options = True

  def Write(self):
    """Apply changes and exit."""
    if self.old_data == self.lsb.AsRawData():
      print('QUIT. No modifications.')
    else:
      self.lsb.Install(self.lsb.GetPath(), backup=True)
      print('DONE. All changes saved properly.')
    return True

  def Quit(self):
    """Quit without saving changes."""
    print('QUIT. No changes were applied.')
    return True

  def Run(self):
    # Mounts the stateful partition read-write, loads lsb-factory and runs
    # the menu while the partition is still mounted, so Write() can save.
    lsb_file = os.path.join('dev_image', 'etc', 'lsb-factory')
    with Partition(self.args.image, PART_CROS_STATEFUL).Mount(rw=True) as state:
      src_file = os.path.join(state, lsb_file)
      if not os.path.exists(src_file):
        raise RuntimeError(
            'No %s file in disks image: %s. Please make sure you have '
            'specified a factory_install image.' % (lsb_file, self.args.image))
      self.lsb = LSBFile(src_file)
      self.old_data = self.lsb.AsRawData()
      self.DoMenu(self.EditServerAddress,
                  self.EditBoardPrompt,
                  self.EditCutOff,
                  w=self.Write,
                  q=self.Quit)
def main():
  """Parses the command line and runs the selected subcommand.

  Every SubCommand subclass defined in this module is registered as a
  subparser. When the first command line argument is an alias of a command,
  it is replaced by that command's real name before parsing.
  """
  parser = argparse.ArgumentParser(
      prog='image_tool',
      description=(
          'Tools to manipulate Chromium OS disk images for factory. '
          'Use "image_tool help COMMAND" for more info on a '
          'subcommand.'))
  parser.add_argument('--verbose', '-v', action='count', default=0,
                      help='Verbose output')
  subparsers = parser.add_subparsers(title='subcommands')

  argv = sys.argv[1:]
  verb = argv[0] if argv else None

  # Register the commands in sorted name order and resolve aliases on the
  # way.
  real_name = None
  for unused_key, candidate in sorted(globals().items()):
    if (candidate != SubCommand and inspect.isclass(candidate) and
        issubclass(candidate, SubCommand)):
      command = candidate(parser, subparsers)
      command.Init()
      if verb in command.aliases:
        real_name = command.name

  if real_name:
    args = parser.parse_args([real_name] + argv[1:])
  else:
    args = parser.parse_args(argv)

  # Each -v lowers the logging threshold by one level, starting at WARNING.
  logging.basicConfig(level=logging.WARNING - args.verbose * 10)

  command = args.subcommand
  command.args = args
  command.Run()


if __name__ == '__main__':
  main()