blob: 91f72a3ff295acadbff2f3f7457e1f0e70875cf5 [file] [log] [blame]
#!/usr/bin/python -B
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tools to finalize a factory bundle."""
from __future__ import print_function
import argparse
import glob
import logging
import os
import pipes
import re
import shutil
import sys
import time
import urlparse
import yaml
from distutils.version import LooseVersion
from pkg_resources import parse_version
import factory_common # pylint: disable=W0611
from chromite.lib import gs
from cros.factory.test.env import paths
from import build_board
from import get_version
from import gsutil
from import MakeUpdateBundle
from cros.factory.utils.file_utils import (
UnopenedTemporaryFile, CopyFileSkipBytes, TryUnlink, ExtractFile, Glob,
from cros.factory.utils import file_utils
from cros.factory.utils import sys_utils
from cros.factory.utils.process_utils import Spawn
from cros.factory.utils.sys_utils import MountPartition
from cros.factory.utils.type_utils import CheckDictKeys
# Default firmware labels to get version and update in README file.
# To override it, assign 'has_firmware' in MANIFEST.yaml.
# Examples:
# has_firmware: [BIOS]
# has_firmware: [EC, BIOS]
# has_firmware: [EC, BIOS, PD]
# Table for translating options from MANIFEST.yaml to lsb-factory.
# Keys are the option names accepted inside the manifest's 'cutoff_option'
# mapping; values are the variable names written into lsb-factory.
CUTOFF_OPTION = {
    'method': 'CUTOFF_METHOD',
    'check_ac': 'CUTOFF_AC_STATE',
    'min_battery_percent': 'CUTOFF_BATTERY_MIN_PERCENTAGE',
    'max_battery_percent': 'CUTOFF_BATTERY_MAX_PERCENTAGE',
    'min_battery_voltage': 'CUTOFF_BATTERY_MIN_VOLTAGE',
    'max_battery_voltage': 'CUTOFF_BATTERY_MAX_VOLTAGE',
    'shopfloor': 'SHOPFLOOR_URL'}

# Special string to use a local file instead of downloading one
# (see test_image_version).
LOCAL = 'local'

# Netboot install shims that we are using at the moment.
NETBOOT_SHIMS = ('vmlinux.uimg', 'vmlinux.bin')
def _GetReleaseVersion(mount_point):
  """Returns the release version of an image mounted at mount_point.

  Args:
    mount_point: Path at which the image partition is mounted.

  Returns:
    The value returned by get_version.GetReleaseVersion.  The program exits
    through sys.exit if that value is false (nothing could be read).
  """
  result = get_version.GetReleaseVersion(mount_point)
  if not result:
    sys.exit('Unable to read lsb-release from %s' % mount_point)
  return result
def _GetFirmwareVersions(updater, expected_firmwares):
  """Returns the firmware versions in an updater.

  Args:
    updater: Path to a firmware updater.
    expected_firmwares: a list containing the expected firmware labels to
      get version, ex: ['BIOS', 'EC'].

  Returns:
    The mapping returned by get_version.GetFirmwareVersionsWithLabel, e.g.
    {'BIOS': bios_version, 'EC': ec_version, 'PD': pd_version}.
    If the firmware is not found, the version will be None.

  The program exits through sys.exit when any label listed in
  expected_firmwares has no version in that mapping.
  """
  versions = get_version.GetFirmwareVersionsWithLabel(updater)
  for label in expected_firmwares:
    if versions.get(label) is None:
      sys.exit('Unable to read %r version from chromeos-firmwareupdater' %
               label)
  return versions
# Help text describing the bundle layout and the MANIFEST.yaml schema.
# NOTE(review): the list keys (add_files/delete_files/files) and the YAML
# nesting were restored from the manifest keys this file validates; confirm
# the exact wording against upstream.
USAGE = """
Finalizes a factory bundle. This script checks to make sure that the
bundle is valid, outputs version information into the README file, and
tars up the bundle.

The bundle directory (the DIR argument) must have a MANIFEST.yaml file
like the following:

  board: link
  bundle_name: 20121115_pvt

  # True to build a factory image based on the test image and a
  # factory toolkit. (If false, the prebuilt factory image is used.)
  use_factory_toolkit: true

  # Use a particular test image version to build the factory image
  # (applies only if use_factory_toolkit is true). This may be:
  # - unspecified to use the same version as the factory toolkit
  # - a particular version (e.g., 5123.0.0)
  # - a GS URL (to a .tar.xz tarball)
  # - the string 'local' to use the chromiumos_test_image.bin file
  #   already present in the bundle
  test_image_version: 5123.0.0

  # Files to download and add to the bundle.
  add_files:
  - install_into: release
    source: "gs://.../chromeos_recovery_image.bin"
  - install_into: firmware
    extract_files: [ec.bin, nv_image-link.bin]
    source: 'gs://.../ChromeOS-firmware-...tar.bz2'

  # Files to delete if present.
  delete_files:
  - install_shim/factory_install_shim.bin

  # Files that are expected to be in the bundle.
  files:
  - MANIFEST.yaml # This file!
  - ...

The bundle must be in a directory named
factory_bundle_${board}_${self.bundle_name} (where board and self.bundle_name
are the same as above).
"""
class FinalizeBundle(object):
  """Finalizes a factory bundle (see USAGE).

  Properties:
    args: Command-line arguments from argparse.
    bundle_dir: Path to the bundle directory.
    bundle_name: Name of the bundle (e.g., 20121115_proto).
    factory_image_path: Path to the factory image in the bundle.
    build_board: The BuildBoard object for the board.
    board: Board name (e.g., link).
    simple_board: For board name like "base_variant", simple_board is
      "variant". simple_board == board if board is not a variant board.
      This name is used in firmware and hwid.
    manifest: Parsed YAML manifest.
    expected_files: List of files expected to be in the bundle (relative
      paths).
    all_files: Set of files actually present in the bundle (relative paths).
    readme_path: Path to the README file within the bundle.
    factory_image_base_version: Build of the factory image (e.g., 3004.100.0)
    release_image_path: Path to the release image.
    install_shim_version: Build of the install shim.
    netboot_install_shim_version: Build of the netboot install shim.
    mini_omaha_script_path: Path to the script used to start the mini-Omaha
      server.
    new_factory_par: Path to a replacement factory.par.
    factory_toolkit_path: Path to the factory toolkit.
    test_image_path: Path to the test image.
    test_image_version: Version of the test image.
    toolkit_version: Version of the factory toolkit.
    gsutil: A GSUtil object.
    has_firmware: Firmware labels whose versions are read; defaults to
      DEFAULT_FIRMWARES and may be overridden by 'has_firmware' in the
      manifest.
  """
  # Class-level defaults; the methods below fill these in on the instance.
  args = None
  bundle_dir = None
  bundle_name = None
  factory_image_path = None
  build_board = None
  board = None
  simple_board = None
  manifest = None
  expected_files = None
  all_files = None
  readme_path = None
  factory_image_base_version = None
  install_shim_version = None
  netboot_install_shim_version = None
  release_image_path = None
  mini_omaha_script_path = None
  new_factory_par = None
  factory_toolkit_path = None
  test_image_path = None
  test_image_version = None
  toolkit_version = None
  gsutil = None
  has_firmware = DEFAULT_FIRMWARES
def Main(self):
  """Entry point: refuses to run outside the chroot.

  Exits through sys.exit when sys_utils.InChroot() is false.
  """
  # NOTE(review): no further steps are invoked from here in this copy of the
  # file; confirm against upstream whether calls to the other methods were
  # lost.
  if not sys_utils.InChroot():
    sys.exit('Please run this script from within the chroot.')
def ParseArgs(self):
  """Parses sys.argv into self.args and resolves self.bundle_dir.

  self.bundle_dir is the real (symlink-free) path of the DIR positional
  argument.
  """
  # NOTE(review): the ArgumentParser arguments were lost in this copy;
  # description=USAGE with a raw formatter is a reconstruction -- confirm.
  parser = argparse.ArgumentParser(
      description=USAGE,
      formatter_class=argparse.RawDescriptionHelpFormatter)
  parser.add_argument(
      '--no-download', dest='download', action='store_false',
      help="Don't download files from Google Storage (for testing only)")
  parser.add_argument(
      '--no-updater', dest='updater', action='store_false',
      help="Don't make an update bundle (for testing only)")
  parser.add_argument(
      '--no-archive', dest='archive', action='store_false',
      help="Don't make a tarball (for testing only)")
  # store_false (default True) matches how make_factory_package and
  # check_files are tested by the methods that read them.
  parser.add_argument(
      '--no-make-factory-packages', dest='make_factory_package',
      action='store_false',
      help="Don't call make_factory_package (for testing only)")
  parser.add_argument(
      '--no-check-files', dest='check_files',
      action='store_false',
      help=("Don't check for missing or extra files in the bundle "
            '(for testing only)'))
  parser.add_argument(
      '--tip-of-branch', dest='tip_of_branch', action='store_true',
      help='Use tip version of release image, install shim, and '
      'netboot install shim on the branch (for testing only)')
  parser.add_argument(
      '--test-list', dest='test_list', metavar='TEST_LIST',
      help='Set active test_list. e.g. --test-list manual_smt to set active '
      'test_list to test_list.manual_smt')
  parser.add_argument(
      '--patch', action='store_true',
      help=('Invoke patch_image before finalizing (requires '
            'patch_image_args in MANIFEST.yaml)'))
  # NOTE(review): flag spelling reconstructed from the attribute name
  # patch_image_extra_args read in PatchImage -- confirm.
  parser.add_argument(
      '--patch-image-extra-args',
      help='Extra arguments for patch_image')
  parser.add_argument(
      'dir', metavar='DIR',
      help='Directory containing the bundle')
  self.args = parser.parse_args()
  self.bundle_dir = os.path.realpath(self.args.dir)
# Loads MANIFEST.yaml and derives the bundle state kept on self.
#
# Visible behavior: registers the '!glob' YAML constructor, parses
# MANIFEST.yaml from self.args.dir into self.manifest, builds
# self.build_board / self.board / self.simple_board / self.gsutil from the
# manifest's 'board', requires bundle_name to start with eight digits and an
# underscore, compares the bundle directory name with
# factory_bundle_<board>_<bundle_name>, then works out the toolkit, test image
# and factory image versions, self.expected_files, self.readme_path and
# self.has_firmware.
#
# NOTE(review): this copy has lost its indentation and several statements:
# the call that receives the list of allowed manifest keys (presumably
# CheckDictKeys), the call receiving the directory-name message, the toolkit
# file name (now ''), the statement under the first os.path.isfile check, the
# tail of the Spawn call, and the logging calls whose format strings are fused
# onto the preceding statements.  Restore from upstream before running.
# NOTE(review): yaml.load is called without an explicit Loader and the file
# object from open() is never closed; fine only for a trusted manifest.
def LoadManifest(self):
yaml.add_constructor('!glob', Glob.Construct)
self.manifest = yaml.load(open(
os.path.join(self.args.dir, 'MANIFEST.yaml')))
self.manifest, ['board', 'bundle_name', 'add_files', 'delete_files',
'add_files_to_image', 'delete_files_from_image',
'site_tests', 'wipe_option', 'files', 'mini_omaha_url',
'patch_image_args', 'use_factory_toolkit',
'test_image_version', 'complete_script',
'has_firmware', 'external_presenter', 'cutoff_option'])
self.build_board = build_board.BuildBoard(self.manifest['board'])
self.board = self.build_board.full_name
self.simple_board = self.build_board.short_name
self.gsutil = gsutil.GSUtil(self.board)
self.bundle_name = self.manifest['bundle_name']
if not re.match(r'^\d{8}_', self.bundle_name):
sys.exit("The self.bundle_name (currently %r) should be today's date, "
'plus an underscore, plus a description of the build, e.g.: %r' %
(self.bundle_name, time.strftime('%Y%m%d_proto')))
expected_dir_name = 'factory_bundle_' + self.board + '_' + self.bundle_name
if expected_dir_name != os.path.basename(self.bundle_dir):
'bundle_name in manifest is %s, so directory name should be %s, '
'but it is %s' % (
self.bundle_name, expected_dir_name,
self.factory_image_path = os.path.join(
self.bundle_dir, 'factory_test', 'chromiumos_factory_image.bin')
# Get the version from the factory test image, or from the factory toolkit
# and test image if we will be using those
if self.manifest.get('use_factory_toolkit'):
# Try make directory here since 'factory_test' is removed due to
# deprecation of factory test image.
self.factory_toolkit_path = None
for path in ('factory_toolkit', 'factory_test'):
# On older factory branches factory toolkit is put in factory_test/
# directory. On ToT, we deprecated factory test image and factory
# toolkit is moved to factory_toolkit/.
self.factory_toolkit_path = os.path.join(
self.bundle_dir, path, '')
if os.path.isfile(self.factory_toolkit_path):
if not os.path.isfile(self.factory_toolkit_path):
raise Exception('Unable to find factory toolkit in the bundle')
output = Spawn([self.factory_toolkit_path, '--info'],
match = re.match(
r'^Identification: .+ Factory Toolkit (.+)$',
output, re.MULTILINE)
assert match, 'Unable to parse toolkit info: %r' % output
self.toolkit_version = # May be None if locally built'Toolkit version: %s', self.toolkit_version)
# Use test image version in the MANIFEST if specified; otherwise use
# the same one as the toolkit
self.test_image_version = self.manifest.get('test_image_version')
if self.test_image_version:'Test image version: %s, as specified in manifest',
if not self.toolkit_version:
raise Exception(
'Toolkit was built locally or by a tryjob; unable to '
'automatically determine which test image to download')
self.test_image_version = self.toolkit_version'Test image version: %s, same as the toolkit, since '
'no version was specified in the manifest',
self.test_image_path = os.path.join(
self.bundle_dir, 'factory_test', 'chromiumos_test_image.bin')
with MountPartition(self.factory_image_path, 3) as mount:
self.factory_image_base_version = _GetReleaseVersion(mount)'Factory image version: %s',
self.expected_files = set(map(self._SubstVars, self.manifest['files']))
self.readme_path = os.path.join(self.bundle_dir, 'README')
self.has_firmware = self.manifest.get('has_firmware', DEFAULT_FIRMWARES)
# Compares the installed gsutil version with REQUIRED_GSUTIL_VERSION.
#
# Reads self.gsutil.gs_context.gsutil_version, removes everything from 'pre'
# onward, converts the dot-separated parts to ints, and compares that list
# with REQUIRED_GSUTIL_VERSION (list comparison, element by element).
#
# NOTE(review): the call that receives the message below (presumably
# sys.exit) and the download URL inside the message are missing from this
# copy; restore from upstream.
def CheckGSUtilVersion(self):
# Check for gsutil >= 3.32.
version = self.gsutil.gs_context.gsutil_version
# Remove 'pre...' string at the end, if any
version = re.sub('pre.*', '', version)
version_split = [int(x) for x in version.split('.')]
if version_split < REQUIRED_GSUTIL_VERSION:
'gsutil version >=%s is required; you seem to have %s.\n'
'Please download and install gsutil ('
', and '
'make sure this is in your PATH before the system gsutil.' % (
'.'.join(str(x) for x in REQUIRED_GSUTIL_VERSION), version))
# Fetches the test image (when one has to be downloaded) and every
# 'add_files' manifest entry into the bundle.
#
# Visible behavior:
# - need_test_image is true when self.test_image_version is set and differs
#   from LOCAL.
# - A test_image_version starting with 'gs://' is the only URL tried;
#   otherwise candidate URLs are built per entry of `channels`.
# - The first URL that self.gsutil.gs_context.LS can list is downloaded with
#   self.gsutil.GSDownload and untarred next to self.test_image_path.
# - Each 'add_files' entry is validated with CheckDictKeys, downloaded, then
#   either extracted ('extract_files') or copied into its 'install_into'
#   directory; the resulting relative paths join self.expected_files.
#
# NOTE(review): this copy has lost its indentation and several statements
# (early return, the gsutil version check call, the condition after
# 'if' on the need_test_image line, 'else:'/'continue'/'break' lines, the
# URL template prefix and its format arguments, the tails of several calls,
# and the logging calls whose format strings are fused onto other
# statements).  Restore from upstream before running.
# NOTE(review): the inner 'for f in f[...]' loop reuses the outer loop
# variable name f.
def Download(self):
need_test_image = (
self.test_image_version and self.test_image_version != LOCAL)
if not 'add_files' in self.manifest and not need_test_image:
# Make sure gsutil is up to date; older versions are pretty broken.
if and need_test_image:
# We need to download the test image, since it is not included in the
# bundle.
channels = ['stable', 'beta', 'canary', 'dev']
if self.test_image_version.startswith('gs://'):
try_urls = [self.test_image_version]
try_urls = []
for channel in channels:
url = (
'%(version)s/ChromeOS-test-*-%(version)s-%(board)s.tar.xz' %
for url in try_urls:
try:'Looking for test image at %s', url)
output = self.gsutil.gs_context.LS(url)
except gs.GSNoSuchKey:
# Not found; try next channel
assert len(output) == 1, (
'Expected %r to matched 1 files, but it matched %r',
url, output)
# Found. Download it!
cached_file = self.gsutil.GSDownload(output[0].strip())
raise Exception('Unable to download test image from %r' % try_urls)
# Untar the test image into place
test_image_dir = os.path.dirname(self.test_image_path)'Extracting test image into %s...', test_image_dir)
Spawn(['tar', '-xvvf', cached_file, '-C', test_image_dir],
if not os.path.exists(self.test_image_path):
raise Exception('No test image at %s' % self.test_image_path)
for f in self.manifest['add_files']:
CheckDictKeys(f, ['install_into', 'source', 'extract_files'])
dest_dir = os.path.join(self.bundle_dir, f['install_into'])
if self.args.tip_of_branch:
f['source'] = self._SubstVars(f['source'])
source = self._SubstVars(f['source'])
cached_file = self.gsutil.GSDownload(source)
if f.get('extract_files'):
# Gets netboot install shim version from source url since version
# is not stored in the image.
install_into = os.path.join(self.bundle_dir, f['install_into'])
shims_to_extract = [x for x in f['extract_files']
if shims_to_extract:
self.netboot_install_shim_version = str(LooseVersion(
# Delete any existing vmlinux.uimg or vmlinux.bin to make sure we will
# not put any wrong file into the bundle, i.e. if we extract only
# vmlinux.uimg we should delete existing vmlinux.bin, and vice versa.
for path in shims_to_extract:
for shim in NETBOOT_SHIMS:
TryUnlink(os.path.join(install_into, os.path.dirname(path), shim))
ExtractFile(cached_file, install_into,
for f in f['extract_files']:
self.expected_files.add(os.path.relpath(os.path.join(install_into, f),
dest_path = os.path.join(dest_dir, os.path.basename(source))
shutil.copyfile(cached_file, dest_path)
self.expected_files.add(os.path.relpath(dest_path, self.bundle_dir))
def _ChangeTipVersion(self, add_file):
  """Changes image to the latest version for testing tip of branch.

  Changes install shim, release image, netboot install shim (vmlinux.uimg or
  vmlinux.bin) to the latest version of original branch for testing. Check
  _GSGetLatestVersion for the detail of choosing the tip version on the
  branch.

  Args:
    add_file: A dict with 'install_into' and 'source' keys and an optional
      'extract_files' list (presumably one entry of the manifest's
      'add_files' list).  Its 'source' is rewritten in place.
  """
  if add_file['install_into'] in ['factory_shim', 'release']:
    latest_source = self._GSGetLatestVersion(add_file['source'])
    logging.info('Changing %s source from %s to %s for testing tip of branch',
                 add_file['install_into'], add_file['source'], latest_source)
    self._CheckFileExistsOrDie(add_file['install_into'], latest_source)
    add_file['source'] = latest_source

  # An entry counts as a netboot shim source when any file it extracts
  # contains one of the NETBOOT_SHIMS names.
  if (add_file.get('extract_files') and
      any(any(shim in f for shim in NETBOOT_SHIMS)
          for f in add_file['extract_files'])):
    latest_source = self._GSGetLatestVersion(add_file['source'])
    logging.info('Changing netboot install shim source from %s to %s for '
                 'testing tip of branch', add_file['source'], latest_source)
    self._CheckFileExistsOrDie('netboot install shim', latest_source)
    add_file['source'] = latest_source
def _CheckFileExistsOrDie(self, image, url):
  """Exits the program unless url exists according to _GSFileExists.

  Args:
    image: Label for the image, used only in the error message.
    url: URL whose existence is checked.
  """
  if not self._GSFileExists(url):
    sys.exit(('The %s image source on tip of branch is not there, '
              '%s does not exist. Perhaps buildbot is still building it '
              'or build failed?' % (image, url)))
def _GSFileExists(self, url):
  """Returns True if url can be listed, False on gs.GSNoSuchKey.

  Args:
    url: URL to probe.
  """
  # NOTE(review): the guarded statement was lost in this copy; listing the
  # URL through gs_context.LS mirrors how Download probes for the test image
  # and catches the same exception -- confirm against upstream.
  try:
    self.gsutil.gs_context.LS(url)
  except gs.GSNoSuchKey:
    return False
  return True
def _GSGetLatestVersion(self, url):
  """Gets the latest version of image on the branch of url.

  Finds the latest version of image on the branch specified in url.
  This function only cares if input image is on master branch or not.
  If input image has a zero minor version, it is on master branch.
  For input image on master branch, the function returns the url of the
  latest image on master branch. For input image not on master branch,
  this function returns the url of the image with the same major version
  and the largest minor version. For example, there are these images:
    4100.0.0   (On master branch)
    4100.1.0   (Start of 4100.B branch)
    4100.38.1  (Start of 4100.38.B branch)
    4120.1.0   (Start of 4120.B branch)

  Example input   output      Description
    4100.0.0      4120.0.0    On master branch.
    4100.1.0      4100.39.0   On 4100.B branch.
    4100.38.0     4100.39.0   On 4100.B branch.
    4100.38.5     4100.39.0   On 4100.38.B branch but we decide
                              4100.39.0 and 4100.38.6 should be
                              compared together as sub-branch of 4100.B.
    4101.0.0      4120.0.0    On master branch.
    4120.1.0      4120.2.0    On 4120.B branch.

  Args:
    url: URL of an image; the version is the name of its parent directory,
      and the directory above that is listed for candidate versions.

  Returns:
    url with the version string replaced by the latest version selected.
  """
  # Use LooseVersion instead of StrictVersion because we want to preserve the
  # trailing 0 like 4100.0.0 instead of truncating it to 4100.0.
  version = LooseVersion(os.path.basename(os.path.dirname(url)))
  # NOTE(review): indexing and len() require parse_version to return a
  # sequence -- presumably the legacy pkg_resources tuple form, in which
  # trailing zero components are dropped so that a master-branch version has
  # at most two elements.  TODO confirm the setuptools version in use.
  parsed_version = parse_version(str(version))
  major_version = parsed_version[0]
  minor_version = parsed_version[1] if len(parsed_version) > 2 else None

  board_directory = os.path.dirname(os.path.dirname(url))
  version_url_list = self.gsutil.gs_context.LS(board_directory)

  latest_version = version
  for version_url in version_url_list:
    version_url = version_url.rstrip('/')
    candidate_version = LooseVersion(os.path.basename(version_url))
    parsed_candidate_version = parse_version(str(candidate_version))
    major_candidate_version = parsed_candidate_version[0]
    minor_candidate_version = (parsed_candidate_version[1]
                               if len(parsed_candidate_version) > 2 else None)
    # On a branch: only candidates with the same major version compete.
    if minor_version and major_candidate_version != major_version:
      continue
    # On master: candidates that carry a minor version are branch builds.
    if not minor_version and minor_candidate_version:
      continue
    if candidate_version > latest_version:
      latest_version = candidate_version
  return url.replace(str(version), str(latest_version))
# Walks the manifest's 'delete_files' entries, joining each with
# self.bundle_dir and checking whether the resulting path exists.
#
# NOTE(review): this copy appears to have lost the statement under the
# 'delete_files' membership check (presumably an early return) and the
# statement that acts on an existing path (presumably its removal); restore
# from upstream.
def DeleteFiles(self):
if not 'delete_files' in self.manifest:
for f in self.manifest['delete_files']:
path = os.path.join(self.bundle_dir, f)
if os.path.exists(path):
def BuildFactoryImage(self):
  """Builds the factory image from the test image and the factory toolkit.

  Does nothing unless the manifest sets 'use_factory_toolkit'.  Otherwise
  copies self.test_image_path to self.factory_image_path and runs the
  toolkit installer on the copy with '--yes' (plus '--no-enable-presenter'
  when the manifest sets 'external_presenter').
  """
  if not self.manifest.get('use_factory_toolkit', False):
    return

  # NOTE(review): the arguments of this log call were lost in this copy; the
  # two paths below follow the copy performed on the next line -- confirm.
  logging.info('Creating %s image from %s...',
               self.factory_image_path, self.test_image_path)
  shutil.copyfile(self.test_image_path, self.factory_image_path)

  # PYTHONPATH is set when finalize_bundle is executing from a par file.
  # Remove PYTHONPATH variable when spawning factory toolkit installer
  # since it cause factory toolkit to find the wrong module path.
  exec_env = os.environ.copy()
  exec_env.pop('PYTHONPATH', None)

  additional_args = []
  if self.manifest.get('external_presenter', False):
    additional_args = ['--no-enable-presenter']
  Spawn([self.factory_toolkit_path, self.factory_image_path, '--yes'] +
        additional_args, check_call=True, env=exec_env)
def PatchImage(self):
  """Runs patch_image on the factory image when --patch was given.

  Requires 'patch_image_args' in the manifest; exits through sys.exit when
  it is missing.  The command patches self.factory_image_path in place,
  using the manifest arguments followed by any --patch-image-extra-args.
  If the board's factory.par has a different mtime after the command ran,
  its path is stored in self.new_factory_par.
  """
  if not self.args.patch:
    return

  patch_image_args = self.manifest.get('patch_image_args')
  if not patch_image_args:
    sys.exit('--patch flag was specified, but MANIFEST.yaml has no '
             'patch_image_args')

  factory_par_path = os.path.join(
      '/build', self.manifest['board'],
      'usr', 'local', 'factory', 'bundle', 'shopfloor', 'factory.par')
  factory_par_time_old = os.stat(factory_par_path).st_mtime

  # Both argument strings are split on single spaces, so quoting inside them
  # is not interpreted.
  patch_command = ([os.path.join(paths.FACTORY_PATH, 'bin', 'patch_image'),
                    '--input', self.factory_image_path,
                    '--output', 'IN_PLACE'] +
                   patch_image_args.split(' ') +
                   (self.args.patch_image_extra_args.split(' ')
                    if self.args.patch_image_extra_args else []))
  Spawn(patch_command, log=True, check_call=True)

  factory_par_time_new = os.stat(factory_par_path).st_mtime
  if factory_par_time_old != factory_par_time_new:
    logging.info('%s has changed; will replace factory.par in bundle',
                 factory_par_path)
    self.new_factory_par = factory_par_path
# Applies manifest-driven changes to partition 1 of the factory image.
#
# Visible behavior:
# - Copies each 'add_files_to_image' entry from the bundle into
#   dev_image/<install_into> on the mounted partition (mkdir -p, cp -a, both
#   through sudo).
# - Collects 'delete_files_from_image' paths, leaves a marker file named
#   <path> + DELETION_MARKER_SUFFIX for each, removes the path with rm -rf,
#   then writes and deletes a ZERO file to clear the freed blocks (the dd is
#   expected to fail when the device is full).
# - When 'site_tests' is present, removes entries of the autotest site_tests
#   directory that are not listed.
# - When --test-list is given, sets the active test list either through an
#   ACTIVE file or through a symlink, depending on whether a file exists at
#   test_list_py.
#
# NOTE(review): this copy has lost its indentation and several statements
# (how Glob entries are expanded and appended to to_delete, the guard that
# skips marker files, the tails of several Spawn/os.path.join calls, the
# write into the ACTIVE file, file names now reduced to '', and the logging
# calls fused onto other statements).  Restore from upstream before running.
def ModifyFactoryImage(self):
add_files_to_image = self.manifest.get('add_files_to_image', [])
delete_files_from_image = self.manifest.get('delete_files_from_image', [])
if add_files_to_image or delete_files_from_image:
with MountPartition(self.factory_image_path, 1, rw=True) as mount:
for f in add_files_to_image:
dest_dir = os.path.join(mount, 'dev_image', f['install_into'])
Spawn(['mkdir', '-p', dest_dir], log=True, sudo=True, check_call=True)
Spawn(['cp', '-a', os.path.join(self.bundle_dir, f['source']),
dest_dir], log=True, sudo=True, check_call=True)
to_delete = []
for f in delete_files_from_image:
if isinstance(f, Glob):
path = os.path.join(mount, f)
if os.path.exists(path):
for f in sorted(to_delete):
# For every file x we delete, we'll leave a file called
# 'x_DELETED' so that it's clear why the file is missing.
# But we don't want to delete any of these marker files!
Spawn('echo Deleted by finalize_bundle > %s' %
pipes.quote(f + DELETION_MARKER_SUFFIX),
sudo=True, shell=True, log=True,
Spawn(['rm', '-rf', f], sudo=True, log=True)
# Write and delete a giant file full of zeroes to clear
# all the blocks that we've just deleted.
zero_file = os.path.join(mount, 'ZERO')
# This will fail eventually (no space left on device)
Spawn(['dd', 'if=/dev/zero', 'of=' + zero_file, 'bs=1M'],
sudo=True, call=True, log=True, ignore_stderr=True)
Spawn(['rm', zero_file], sudo=True, log=True, check_call=True)
# Removes unused site_tests
# suite_Factory must be preserved for /usr/local/factory/custom symlink.
site_tests = self.manifest.get('site_tests')
if site_tests is not None:
with MountPartition(self.factory_image_path, 1, rw=True) as mount:
site_tests_dir = os.path.join(mount, 'dev_image', 'autotest',
for name in os.listdir(site_tests_dir):
path = os.path.join(site_tests_dir, name)
if name not in site_tests:
Spawn(['rm', '-rf', path], log=True, sudo=True, check_call=True)
if self.args.test_list:'Setting active test_list to test_list.%s',
with MountPartition(self.factory_image_path, 1, rw=True) as mount:
test_list_py = os.path.join(mount, 'dev_image', 'factory', 'py', 'test',
'test_lists', '')
if os.path.isfile(test_list_py):'Using test_list v2 ACTIVE file')
active = os.path.join(mount, 'dev_image', 'factory', 'py', 'test',
'test_lists', 'ACTIVE')
with open(active, 'w') as f:
else:'Using test_list v1 active symlink')
test_list = 'test_list.%s' % self.args.test_list
active = os.path.join(mount, 'dev_image', 'factory', 'test_lists',
Spawn(['ln', '-sf', test_list, active], log=True, sudo=True,
def SetWipeOption(self):
  """Writes the manifest's 'wipe_option' into the factory image.

  Nothing happens when 'wipe_option' is absent or empty, or when the single
  option is 'reboot'.  Otherwise the option ('shutdown' or
  'battery_cut_off') is written to factory_wipe_option on partition 1 of the
  factory image and a deprecation warning is logged.

  Raises:
    AssertionError: if 'wipe_option' does not hold exactly one supported
      value.
  """
  wipe_option = self.manifest.get('wipe_option', [])
  if not wipe_option:
    return

  assert len(wipe_option) == 1, 'There should be one wipe_option.'
  option = wipe_option[0]
  assert option in ['shutdown', 'battery_cut_off', 'reboot']
  # No need to write option if option is reboot.
  if option == 'reboot':
    return

  with MountPartition(self.factory_image_path, 1, rw=True) as mount:
    wipe_option_path = os.path.join(mount, 'factory_wipe_option')
    WriteWithSudo(wipe_option_path, option)
  # NOTE(review): statement order is kept from the original, so the warning
  # is only reached when an option was written -- confirm intended scope.
  logging.warning('wipe_option is deprecated, use in-place wipe and cutoff_option instead.')
# Builds the factory update bundle from the factory image when
# self.args.updater is true, by calling the module-level MakeUpdateBundle
# function (the bare name inside a method resolves to the global import, not
# to this method).
#
# NOTE(review): the last component of updater_path and the closing
# parenthesis of os.path.join are missing from this copy; restore from
# upstream.
def MakeUpdateBundle(self):
# Make the factory update bundle
if self.args.updater:
updater_path = os.path.join(
self.bundle_dir, 'shopfloor', 'shopfloor_data', 'update',
MakeUpdateBundle(self.factory_image_path, updater_path)
# Rewrites the Omaha/TFTP server settings in the bundle's netboot firmware
# using the manifest's 'mini_omaha_url', through two nested helpers.
#
# Visible behavior:
# - UpdateUbootNetboot works on netboot_firmware/nv_image-<simple_board>.bin;
#   the updated image is written to '<image>.INPROGRESS' and then moved over
#   the original.
# - UpdateDepthchargeNetboot does the same for a second firmware image with
#   the boot file set to vmlinux-<board>.bin, then ensures
#   factory_shim/netboot/vmlinux.bin exists (plain copy of vmlinux.uimg when
#   self.build_board.arch is 'arm', otherwise a copy skipping the first 64
#   bytes) and copies it to vmlinux-<board>.bin.
#
# NOTE(review): this copy has lost its indentation and several statements
# (the early return when mini_omaha_url is unset, docstring terminators, the
# Spawn openers and the --tftpserverip operands, script/file names now
# reduced to '', an 'else:' before the non-ARM copy, and the calls to the two
# nested helpers).  Restore from upstream before running.
def UpdateNetbootURL(self):
"""Updates Omaha & TFTP servers' URL in netboot firmware.
It takes care of both uboot and depthcharge firmware, if presents.
mini_omaha_url = self.manifest.get('mini_omaha_url')
if not mini_omaha_url:
def UpdateUbootNetboot():
"""Updates Omaha & TFTP servers' URL in uboot netboot firmware."""
netboot_firmware_image = os.path.join(
self.bundle_dir, 'netboot_firmware',
'nv_image-%s.bin' % self.simple_board)
if os.path.exists(netboot_firmware_image):
update_firmware_vars = os.path.join(
self.bundle_dir, 'factory_setup', '')
new_netboot_firmware_image = netboot_firmware_image + '.INPROGRESS'
'-i', netboot_firmware_image,
'-o', new_netboot_firmware_image,
'--omahaserver=%s' % mini_omaha_url,
'--tftpserverip=%s' %
check_call=True, log=True)
shutil.move(new_netboot_firmware_image, netboot_firmware_image)
def UpdateDepthchargeNetboot():
"""Updates Omaha & TFTP servers' URL in depthcharge netboot firmware.
Also copy 'vmlinux.uimg', skips the first 64 bytes and stored it
as 'vmlinux.bin' if there's no existing 'vmlinux.bin' found.
netboot_firmware_image = os.path.join(
self.bundle_dir, 'netboot_firmware', '')
target_bootfile = 'vmlinux-%s.bin' % self.board
if os.path.exists(netboot_firmware_image):
update_firmware_settings = os.path.join(
self.bundle_dir, 'factory_setup', '')
new_netboot_firmware_image = netboot_firmware_image + '.INPROGRESS'
'--bootfile', target_bootfile,
'--input', netboot_firmware_image,
'--output', new_netboot_firmware_image,
'--omahaserver=%s' % mini_omaha_url,
'--tftpserverip=%s' %
check_call=True, log=True)
shutil.move(new_netboot_firmware_image, netboot_firmware_image)
target_netboot_shim = os.path.join(self.bundle_dir, 'factory_shim',
'netboot', 'vmlinux.bin')
if not os.path.exists(target_netboot_shim):
# Only generate 'vmlinux.bin' manually if it does not exist. If
# 'vmlinux.bin' is present (as changed by CL:195554), we will simply
# use it since it is already processed by
netboot_shim = os.path.join(self.bundle_dir, 'factory_shim',
'netboot', 'vmlinux.uimg')
if self.build_board.arch == 'arm':
# No special process needed for ARM-based boards; simply copy the
# file.
shutil.copyfile(netboot_shim, target_netboot_shim)
# If the board is not ARM-based, we need to copy 'vmlinux.uimg' to
# 'vmlinux.bin' and skip the first 64 bytes to strip uboot header.
# Keep both of the files so everyone can be aware of the difference.
CopyFileSkipBytes(netboot_shim, target_netboot_shim, 64)
# Finally, copy 'vmlinux.bin' to 'vmlinux-<board>.bin'.
renamed_netboot_shim = os.path.join(self.bundle_dir, 'factory_shim',
'netboot', target_bootfile)
shutil.copy(target_netboot_shim, renamed_netboot_shim)
# Patches lsb-factory in the install shim and in the netboot initrd with the
# manifest's 'mini_omaha_url' and 'cutoff_option'.
#
# Visible behavior:
# - CheckCutoffOptions validates cutoff_option keys against CUTOFF_OPTION,
#   restricts 'method' and 'check_ac' to fixed value lists, and asserts that
#   values of keys matching '.*_battery_.*' are ints.
# - PatchLSBFactory rewrites the CHROMEOS_AUSERVER / CHROMEOS_DEVSERVER lines
#   (exactly two substitutions are required), then replaces, appends or
#   deletes one line per cutoff option using the CUTOFF_OPTION names, writes
#   the file with WriteWithSudo and returns whether anything changed.
# - PatchInstallShim mounts partition 1 of a shim read-write and partition 3
#   read-only, and records self.install_shim_version.
# - The method exits when both an unsigned and a signed shim are present or
#   when more than one signed shim is found, and logs a warning when there is
#   none.
# - For factory_shim/netboot/initrd.uimg it unpacks the rootfs, patches it,
#   and when lsb-factory changed recompresses it with pigz and rebuilds the
#   image with mkimage into '<image>.INPROGRESS' before moving it into place.
#
# NOTE(review): this copy has lost its indentation and many statements
# (early return, docstring terminators, the tails of several calls and
# lists, 'else:' lines, the re.search opener in GetSigningKey, the bodies of
# both MountPartition blocks in PatchInstallShim, the calls to
# CheckCutoffOptions and PatchInstallShim, the Spawn that skips the uimg
# header before gunzip, and logging calls fused onto other statements).
# Restore from upstream before running.
# NOTE(review): dict.iteritems() exists only on Python 2.
def UpdateInstallShim(self):
mini_omaha_url = self.manifest.get('mini_omaha_url')
cutoff_option = self.manifest.get('cutoff_option', {})
if not mini_omaha_url and not cutoff_option:
def CheckCutoffOptions(cutoff_option):
CheckDictKeys(cutoff_option, CUTOFF_OPTION.keys())
if 'method' in cutoff_option:
assert cutoff_option['method'] in ['shutdown', 'reboot',
if 'check_ac' in cutoff_option:
assert cutoff_option['check_ac'] in ['remove_ac', 'connect_ac']
m = re.compile(r'.*_battery_.*')
for key, value in cutoff_option.iteritems():
if m.match(key):
assert isinstance(value, int)
def PatchLSBFactory(mount):
"""Patches lsb-factory in an image.
True if there were any changes.
lsb_factory_path = os.path.join(
mount, 'dev_image', 'etc', 'lsb-factory')'Patching lsb-factory in %s', lsb_factory_path)
orig_lsb_factory = open(lsb_factory_path).read()
lsb_factory = orig_lsb_factory
if mini_omaha_url:
lsb_factory, number_of_subs = re.subn(
r'(?m)^(CHROMEOS_(AU|DEV)SERVER=).+$', r'\1' + mini_omaha_url,
if number_of_subs != 2:
sys.exit('Unable to set mini-Omaha server in %s' % lsb_factory_path)
if cutoff_option:
for key, value in cutoff_option.iteritems():
if value:
arg = '%s=%s\n' % (CUTOFF_OPTION[key], str(value))
# Delete the argument if there is no value set.
arg = ''
lsb_factory, number_of_subs = re.subn(
r'(?m)^%s=.+\n' % CUTOFF_OPTION[key], arg, lsb_factory)
if number_of_subs == 0 and value:
lsb_factory += arg
if lsb_factory == orig_lsb_factory:
return False # No changes
WriteWithSudo(lsb_factory_path, lsb_factory)
return True
def PatchInstallShim(shim):
"""Patches lsb-factory and updates self.install_shim_version."""
def GetSigningKey(shim):
"""Derives signing key from factory install shim's file name."""
if shim.endswith('factory_install_shim.bin'):
return 'unsigned'
key_match ='channel_([\w\-]+)\.bin$', shim)
if key_match:
# Error deriving signing key
return 'undefined'
with MountPartition(shim, 1, rw=True) as mount:
with MountPartition(shim, 3) as mount:
self.install_shim_version = '%s (%s)' % (_GetReleaseVersion(mount),
if cutoff_option:
# Patch in the install shim, if present.
has_install_shim = False
unsigned_shim = os.path.join(self.bundle_dir, 'factory_shim',
if os.path.isfile(unsigned_shim):
has_install_shim = True
signed_shims = glob.glob(os.path.join(self.bundle_dir, 'factory_shim',
if has_install_shim and signed_shims:
sys.exit('Both unsigned and signed install shim exists. '
'Please remove unsigned one')
if len(signed_shims) > 1:
sys.exit('Expected to find 1 signed factory shim but found %d: %r' % (
len(signed_shims), signed_shims))
elif len(signed_shims) == 1:
has_install_shim = True
if not has_install_shim:
logging.warning('There is no install shim in the bundle.')
# Take care of the netboot initrd as well, if present.
netboot_image = os.path.join(self.bundle_dir, 'factory_shim',
'netboot', 'initrd.uimg')
if os.path.exists(netboot_image):
with UnopenedTemporaryFile(prefix='rootfs.') as rootfs:
with open(netboot_image) as netboot_image_in:
with open(rootfs, 'w') as rootfs_out:'Unpacking initrd rootfs')
['gunzip', '-c'],
stdin=netboot_image_in, stdout=rootfs_out, check_call=True)
with MountPartition(rootfs, rw=True) as mount:
lsb_factory_changed = PatchLSBFactory(
os.path.join(mount, 'mnt', 'stateful_partition'))
if lsb_factory_changed:
# Success! Zip it back up.
with UnopenedTemporaryFile(prefix='rootfs.') as rootfs_gz:
with open(rootfs_gz, 'w') as out:
Spawn(['pigz', '-9c', rootfs], stdout=out, log=True, call=True)
new_netboot_image = netboot_image + '.INPROGRESS'
Spawn(['mkimage', '-A', 'x86', '-O', 'linux', '-T', 'ramdisk',
'-a', '0x12008000', '-n', 'Factory Install RootFS',
'-C', 'gzip', '-d', rootfs_gz, new_netboot_image],
check_call=True, log=True)
shutil.move(new_netboot_image, netboot_image)
# Runs make_factory_package for the bundle and writes the mini-Omaha start
# script.
#
# Visible behavior:
# - Requires exactly one release/*.bin image; its path is stored in
#   self.release_image_path.
# - Assembles the make_factory_package command line (board, release image,
#   factory image, hwid updater, optional --complete_script and
#   --firmware_updater) and, when self.args.make_factory_package is true,
#   runs it with factory_setup/ as the working directory.
# - A 'complete_script' key in the manifest selects the completion script
#   (None disables it; a missing file raises OSError); without the key a
#   default script under factory_setup/ is used when it exists.
# - Writes a shell script at self.mini_omaha_script_path and sets its mode
#   through os.fchmod.
#
# NOTE(review): this copy has lost its indentation and several statements
# (the command name and the tails of the make_factory_package list, 'else:'
# lines, script/file names now reduced to '' or cut short, the statement
# under the os.path.exists check on the script path, and the call that
# writes the script lines).  Restore from upstream before running.
# NOTE(review): 0555 is a Python 2 octal literal; Python 3 needs 0o555.
def MakeFactoryPackages(self):
release_images = glob.glob(os.path.join(self.bundle_dir, 'release/*.bin'))
if len(release_images) != 1:
sys.exit('Expected one release image but found %d' % len(release_images))
self.release_image_path = release_images[0]
factory_setup_dir = os.path.join(self.bundle_dir, 'factory_setup')
make_factory_package = [
'--board', self.board,
'--release', os.path.relpath(self.release_image_path,
'--factory', '../factory_test/chromiumos_factory_image.bin',
'--hwid_updater', '../hwid/' %
if 'complete_script' in self.manifest:
script_base_name = self.manifest['complete_script']
if script_base_name is None:
complete_script = None
complete_script = os.path.join(self.bundle_dir, script_base_name)
if not os.path.exists(complete_script):
raise OSError('Complete script %s does not exist' % complete_script)
# Use factory_setup/, if it exists
complete_script = os.path.join(
self.bundle_dir, 'factory_setup/')
if not os.path.exists(complete_script):
complete_script = None
if complete_script:
make_factory_package.extend(['--complete_script', complete_script])
firmware_updater = os.path.join(
self.bundle_dir, 'firmware', 'chromeos-firmwareupdate')
if os.path.exists(firmware_updater):
make_factory_package += [
'--firmware_updater', os.path.relpath(
firmware_updater, factory_setup_dir)]
if self.args.make_factory_package:
Spawn(make_factory_package, cwd=factory_setup_dir,
check_call=True, log=True)
# Build the mini-Omaha startup script.
self.mini_omaha_script_path = os.path.join(
self.bundle_dir, '')
if os.path.exists(self.mini_omaha_script_path):
with open(self.mini_omaha_script_path, 'w') as f:
'set -e', # Fail on error
'cd $(dirname $(readlink -f "$0"))/factory_setup',
'cat static/miniomaha.conf',
('echo Miniomaha configuration MD5SUM: '
'$(md5sum static/miniomaha.conf)'),
'echo Validating configuration...',
('python --validate_factory_config'),
'echo Starting download server.',
'' # Add newline at EOF
os.fchmod(f.fileno(), 0555)
# Finds files in the bundle that are byte-for-byte copies of
# shopfloor/factory.par, and installs self.new_factory_par over
# shopfloor/factory.par (shutil.copy2) when one was recorded.
#
# Visible behavior: walks self.bundle_dir, skipping factory.par itself,
# symlinks, files whose size differs from factory.par, and files whose
# content differs; the remaining files are the ones to be replaced with a
# symlink.
#
# NOTE(review): this copy has lost its indentation and several statements
# (the docstring terminator, both read() calls after '=', the 'continue'
# lines under the three skip conditions, the statements that actually
# replace the copy with a symlink, and the logging calls fused onto other
# statements).  Restore from upstream before running.
def FixFactoryPar(self):
"""Fix symlinks to factory.par, and replace factory.par if necessary.
(Certain files may have been turned into real files by the buildbots.)
factory_par_path = os.path.join(self.bundle_dir,
'shopfloor', 'factory.par')
with open(factory_par_path) as f:
factory_par_data =
# Look for files that are identical copies of factory.par.
for root, _, files in os.walk(self.bundle_dir):
for f in files:
path = os.path.join(root, f)
if path == factory_par_path:
# Don't replace it with itself!
if (os.path.islink(path) or
os.path.getsize(path) != len(factory_par_data)):
# It's not a real file, or not the right size. Skip.
with open(path) as fobj:
data =
if data != factory_par_data:
# Data isn't the same. Skip.
# Replace the file with a symlink.'Replacing %s with a symlink', path)
if self.new_factory_par:'Copying %s to %s', self.new_factory_par, factory_par_path)
shutil.copy2(self.new_factory_par, factory_par_path)
def CheckFiles(self):
  """Prune junk from the bundle and verify that its file set is as expected.

  Walks self.bundle_dir, deleting backup files ('*~'), compiled Python files
  ('*.pyc') and empty directories.  Every remaining file is recorded in
  self.all_files as a path relative to self.bundle_dir.

  If self.args.check_files is false, the method stops there.  Otherwise
  self.all_files is compared with self.expected_files; any missing or
  unexpected file is logged and the process is terminated via sys.exit().

  Raises:
    SystemExit: if the file set differs from self.expected_files.
  """
  # Check that the set of files is correct
  self.all_files = set()
  for root, dirs, files in os.walk(self.bundle_dir):
    for f in files:
      # Remove backup files and compiled Python files.
      if f.endswith(('~', '.pyc')):
        os.unlink(os.path.join(root, f))
        continue
      self.all_files.add(
          os.path.relpath(os.path.join(root, f), self.bundle_dir))
    for d in dirs:
      # Remove any empty directories
      try:
        os.rmdir(os.path.join(root, d))
      except OSError:
        # rmdir refuses to remove a non-empty directory; that is the
        # normal case here, so leave the directory in place.
        pass

  if not self.args.check_files:
    logging.info('Skip files checking')
    return

  missing_files = self.expected_files - self.all_files
  extra_files = self.all_files - self.expected_files
  if missing_files:
    logging.error('Missing files in bundle: %s',
                  ' '.join(sorted(missing_files)))
    logging.error("If the files really shouldn't be there, remove them from "
                  'the "files" section in MANIFEST.yaml')
  if extra_files:
    logging.error('Unexpected extra files in bundle: %s',
                  ' '.join(sorted(extra_files)))
    logging.error('If the files are really expected, '
                  'add them to the "files" section of MANIFEST.yaml')
  if missing_files or extra_files:
    sys.exit('Incorrect file set; terminating')
def UpdateReadme(self):
"""Regenerates the generated sections of the README at self.readme_path.

The README is split into sections, each introduced by a '***' banner header.
This method exits (sys.exit) if a required section is missing or if the
CHANGES section does not contain '<bundle_name> changes:', rebuilds the
'VITAL INFORMATION' section from the bundle's images and firmware files,
rewrites the 'MINI-OMAHA SERVER' section when the README has one, and writes
the sections back to self.readme_path.

NOTE(review): this block looks damaged by extraction -- indentation is gone
and several statements or tokens appear to be missing (each spot is flagged
below).  Restore them from the upstream file before relying on this code.
"""
# Grok the README file; we'll be modifying it.
readme_sections = re.findall(
# Section header
r'(\*\*\*\n\*\n\* (.+?)\n\*\n\*\*\*\n)'
# Anything up to (but not including) the next section header
r'((?:(?!\*\*\*).)+)', open(self.readme_path).read(), re.DOTALL)
# This results in a list of tuples (a, b, c), where a is the whole
# section header string; b is the name of the section; and c is the
# contents of the section. Turn each tuple into a list; we'll be
# modifying some of them.
readme_sections = [list(x) for x in readme_sections]
readme_section_index = {} # Map of section name to index
for i, s in enumerate(readme_sections):
readme_section_index[s[1]] = i
# NOTE(review): 'k' is not bound anywhere in the visible code; a loop header
# over the required section names seems to be missing here.  Presumably it
# covers at least 'VITAL INFORMATION' and 'CHANGES', which are indexed
# unconditionally below -- confirm.
if k not in readme_section_index:
sys.exit('README is missing %s section' % k)
# Make sure that the CHANGES section contains this version.
expected_str = '%s changes:' % self.bundle_name
if expected_str not in readme_sections[readme_section_index['CHANGES']][2]:
sys.exit('The string %r was not found in the CHANGES section. '
'Please add a section for it (if this is the first '
'version, just say "initial release").' % expected_str)
# Helper: returns a list of ('<updater_name> <firmware_type>', version)
# pairs for the firmware types in self.has_firmware, as reported by
# _GetFirmwareVersions for updater_file; entries whose version is None are
# dropped.  (dict.iteritems() exists on Python 2 only.)
def _ExtractFirmwareVersions(updater_file, updater_name):
firmware_versions = _GetFirmwareVersions(updater_file, self.has_firmware)
return [('%s %s' % (updater_name, firmware_type), version)
for firmware_type, version in firmware_versions.iteritems()
if version is not None]
# Get some vital information
# vitals is an ordered list of (label, value) pairs; it becomes the body of
# the 'VITAL INFORMATION' section.
vitals = [
('Board', self.board),
('Bundle', '%s (created by %s, %s)' % (
self.bundle_name, os.environ['USER'],
time.strftime('%a %Y-%m-%d %H:%M:%S %z')))]
if self.factory_image_base_version:
vitals.append(('Factory image base', self.factory_image_base_version))
if self.toolkit_version:
vitals.append(('Factory toolkit', self.toolkit_version))
# For a local test image the version is read from the mounted factory image
# rather than taken from self.test_image_version.
# NOTE(review): the second MountPartition argument is presumably a partition
# index -- confirm against sys_utils.MountPartition.
if self.test_image_version == LOCAL:
with MountPartition(self.factory_image_path, 3) as f:
vitals.append(('Test image', '%s (local)' % _GetReleaseVersion(f)))
elif self.test_image_version:
vitals.append(('Test image', self.test_image_version))
# The mount below is reported as the stateful partition: record the factory
# updater MD5SUM stored on it plus its size and inode usage.
with MountPartition(self.factory_image_path, 1) as f:
vitals.append(('Factory updater MD5SUM', open(
os.path.join(f, 'dev_image/factory/MD5SUM')).read().strip()))
stat = os.statvfs(f)
stateful_free_bytes = stat.f_bfree * stat.f_bsize
stateful_total_bytes = stat.f_blocks * stat.f_bsize
# NOTE(review): the opening of this call (likely 'vitals.append((') appears
# to be missing before the next line -- confirm.
'Stateful partition size',
'%d MiB (%d MiB free = %d%% free)' % (
stateful_total_bytes / 1024 / 1024,
stateful_free_bytes / 1024 / 1024,
int(stateful_free_bytes * 100.0 / stateful_total_bytes))))
# NOTE(review): same as above -- the opening of this call appears to be
# missing.
'Stateful partition inodes',
'%d nodes (%d free)' % (stat.f_files, stat.f_ffree)))
if self.install_shim_version:
vitals.append(('Factory install shim', self.install_shim_version))
if self.netboot_install_shim_version:
# NOTE(review): the value half of this tuple and its closing parentheses
# appear to be missing (presumably self.netboot_install_shim_version) --
# confirm.
vitals.append(('Netboot install shim (vmlinux.uimg/vmlinux.bin)',
# Record the release image version and the versions of the firmware updater
# found inside the mounted release image.
with MountPartition(self.release_image_path, 3) as f:
vitals.append(('Release (FSI)', _GetReleaseVersion(f)))
fw_updater_file = os.path.join(f, 'usr/sbin/chromeos-firmwareupdate')
vitals.extend(_ExtractFirmwareVersions(fw_updater_file, 'Release (FSI)'))
# If we have any firmware in the tree, add them to the vitals.
# NOTE(review): firmwareupdates is never read in the visible code.
firmwareupdates = []
for f in self.all_files:
path = os.path.join(self.bundle_dir, f)
if os.path.basename(f) == 'chromeos-firmwareupdate':
vitals.extend(_ExtractFirmwareVersions(path, f))
elif os.path.basename(f) == 'ec.bin':
version = get_version.GetFirmwareBinaryVersion(path)
if not version:
sys.exit('Unable to find EC version in %s' % path)
vitals.append((f, version))
# NOTE(review): the second prefix below is an empty string, and
# str.startswith('') is true for every name, so as written this branch
# matches every remaining file.  The literal looks truncated -- restore it
# from the upstream file.
elif any(os.path.basename(f).startswith(prefix)
for prefix in ('nv_image', '')):
version = get_version.GetFirmwareBinaryVersion(path)
if not version:
sys.exit('Unable to find BIOS version in %s' % path)
vitals.append((f, version))
# Render the vitals as 'label:<padding> value' lines, padding each label to
# the width of the longest one so the values line up.
vital_lines = []
max_key_length = max(len(k) for k, v in vitals)
for k, v in vitals:
vital_lines.append('%s:%s %s' % (k, ' ' * (max_key_length - len(k)), v))
vital_contents = '\n'.join(vital_lines)
readme_sections[readme_section_index['VITAL INFORMATION']][2] = (
vital_contents + '\n\n')
# The MINI-OMAHA SERVER section is optional; it is only rewritten when the
# README already has one.
index = readme_section_index.get('MINI-OMAHA SERVER')
if index is not None:
# NOTE(review): the script name after './' and the closing ']' of this list
# appear to be missing -- confirm.
instructions = [
'To start a mini-Omaha server:',
' ./'
readme_sections[index][2] = (
'\n'.join(instructions) + '\n\n')
# Write the sections back to the README.
# NOTE(review): 'header' is unpacked but never written in the visible code,
# and the last two lines look like a write of the contents fused with the
# start of a logging call -- confirm.
with open(self.readme_path, 'w') as f:
for header, _, contents in readme_sections:
f.write(contents)'\n\nUpdated %s; vital information:\n%s\n',
self.readme_path, vital_contents)
def Archive(self):
"""Creates the bundle tarballs when self.args.archive is set.

Two archives are written next to self.bundle_dir, in this order:
'<bundle_dir>.mini.tar.bz2', built with --exclude '*.bin', and
'<bundle_dir>.tar.bz2'.  Both are produced by running tar with
'-I pbzip2' from the parent directory of self.bundle_dir.

NOTE(review): this block looks damaged by extraction -- indentation is gone
and several call openings and arguments appear to be missing (each spot is
flagged below).  Restore them from the upstream file before relying on this
code.
"""
if self.args.archive:
# NOTE(review): the second comment line below has a string literal fused
# onto it; the start of a logging call appears to have been lost.
# Done! tar it up, and encourage the poor shmuck who has to build
# the bundle to take a little break.'Just works! Creating the tarball. '
'This will take a while... meanwhile, go get %s. '
'You deserve it.',
# The list below has 5 + 7 + 2 + 2 + 8 = 24 entries, one per hour of the
# day, and is indexed by the current local hour.
(['some rest'] * 5 +
['a cup of coffee'] * 7 +
['some lunch', 'some fruit'] +
['an afternoon snack'] * 2 +
['a beer'] * 8)[time.localtime().tm_hour])
# Build the '.mini' archive (without '*.bin' files) first, then the full one.
for mini in [True, False]:
output_file = self.bundle_dir + ('.mini' if mini else '') + '.tar.bz2'
# NOTE(review): the list of members to archive appears to be missing after
# the trailing '+' (presumably the basename of self.bundle_dir, given the
# '-C' argument) -- confirm.
Spawn(['tar', '-cf', output_file,
'-I', 'pbzip2',
'-C', os.path.dirname(self.bundle_dir)] +
(['--exclude', '*.bin'] if mini else []) +
log=True, check_call=True)
# NOTE(review): the lines below look like the arguments of three separate
# logging calls whose openings were lost; the final one also seems to be
# missing the opening of a path-joining call -- confirm.
'Created %s (%.1f GiB).',
output_file, os.path.getsize(output_file) / (1024. * 1024. * 1024.))'The README file (%s) has been updated. Make sure to check '
'that it is correct!', self.readme_path)
"IMPORTANT: If you modified the README or MANIFEST.yaml, don't forget "
'to check your changes into %s.',
'src', self.build_board.overlay_relpath,
'chromeos-base', 'chromeos-factory-board',
'files', 'bundle'))
def _SubstVars(self, input_str):
"""Substitutes variables into a string.

The following substitutions are made:
${BOARD} -> the simple board name (in uppercase)
${FACTORY_IMAGE_BASE_VERSION} -> the factory image version

Placeholders are matched with the pattern '${NAME}', where NAME is one or
more word characters.

Args:
input_str: The string containing the placeholders (presumably the last
argument of the re.sub call below, which is cut off -- confirm).

Returns:
The result of the re.sub call.
"""
# NOTE(review): this block looks damaged by extraction.  The right operand of
# 'or', the closing of the dict, the lookup key inside 'subst_vars[]'
# (presumably the captured placeholder name) and the final re.sub argument
# all appear to be missing -- restore them from the upstream file.
subst_vars = {
'BOARD': self.simple_board.upper(),
'FACTORY_IMAGE_BASE_VERSION': (self.factory_image_base_version or
return re.sub(r'\$\{(\w+)\}', lambda match: subst_vars[],
if __name__ == '__main__':