blob: d37921b03a7f6b9f0678b95e6c62fa1c6f396760 [file] [log] [blame]
#!/usr/bin/python -B
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tools to finalize a factory bundle."""
from __future__ import print_function
import argparse
import contextlib
import errno
import glob
import logging
import os
import re
import shutil
import sys
import time
import urlparse
import yaml
from distutils.version import LooseVersion
import factory_common # pylint: disable=W0611
from cros.factory.tools import build_board
from cros.factory.tools import get_version
from cros.factory.tools import gsutil
from cros.factory.tools.make_update_bundle import MakeUpdateBundle
from cros.factory.utils.file_utils import (
TryUnlink, ExtractFile, WriteWithSudo)
from cros.factory.utils import file_utils
from cros.factory.utils import sys_utils
from cros.factory.utils.process_utils import Spawn
from cros.factory.utils.sys_utils import MountPartition
from cros.factory.utils.type_utils import CheckDictKeys
# Minimum gsutil version, as a list of ints so that it can be compared against
# a parsed version element by element (see _CheckGSUtilVersion).
REQUIRED_GSUTIL_VERSION = [3, 32] # 3.32
DELETION_MARKER_SUFFIX = '_DELETED'
# Default firmware labels to get version and update in README file.
# To override it, assign 'has_firmware' in MANIFEST.yaml.
# Examples:
# has_firmware: [BIOS]
# has_firmware: [EC, BIOS]
# has_firmware: [EC, BIOS, PD]
DEFAULT_FIRMWARES = ['BIOS', 'EC']
# Special string to use a local file instead of downloading one.
LOCAL = 'local'
# Netboot install shims that we are using at the moment.
NETBOOT_SHIMS = ('vmlinuz', 'vmlinux.bin')
# Legacy: resources may live in different places for historical reasons. To
# maintain backward compatibility, we have to search a set of directories.
# The first entry of each list is the directory newly downloaded images are
# placed in (see DownloadResources).
# TODO(crbug.com/706756): once we completely remove the old directories, we can
# simply make each of these a string instead of a list
# of strings.
TEST_IMAGE_SEARCH_DIRS = ['test_image', 'factory_test']
RELEASE_IMAGE_SEARCH_DIRS = ['release_image', 'release']
TOOLKIT_SEARCH_DIRS = ['toolkit', 'factory_toolkit']
# When version is fixed, we'll try to find the resource in the following order.
RESOURCE_CHANNELS = ['stable', 'beta', 'dev', 'canary']
def _GetReleaseVersion(mount_point):
  """Reads the release version of the image mounted at mount_point.

  Args:
    mount_point: Directory where the image partition is mounted.

  Returns:
    The (non-empty) value returned by get_version.GetReleaseVersion.

  Exits the program through sys.exit when no version can be read.
  """
  version = get_version.GetReleaseVersion(mount_point)
  if version:
    return version
  sys.exit('Unable to read lsb-release from %s' % mount_point)
def _GetFirmwareVersions(updater, expected_firmwares):
  """Reads the firmware versions packed in a firmware updater.

  Args:
    updater: Path to a firmware updater.
    expected_firmwares: A list of firmware labels that must have a version,
        e.g. ['BIOS', 'EC'].

  Returns:
    The label-to-version mapping produced by
    get_version.GetFirmwareVersionsWithLabel, e.g.
    {'BIOS': bios_version, 'EC': ec_version, 'PD': pd_version}.
    Labels that are not expected may map to None.

  Exits the program through sys.exit when the version of an expected label is
  missing; the first such label (in list order) is reported.
  """
  versions = get_version.GetFirmwareVersionsWithLabel(updater)
  missing_labels = [label for label in expected_firmwares
                    if versions.get(label) is None]
  if missing_labels:
    sys.exit('Unable to read %r version from chromeos-firmwareupdater' %
             missing_labels[0])
  return versions
# Help text for the command line; FinalizeBundle.ParseArgs passes it to
# argparse as the parser description.
USAGE = """
Finalizes a factory bundle. This script checks to make sure that the
bundle is valid, outputs version information into the README file, and
tars up the bundle.
The input is a MANIFEST.yaml file like the following:
board: link
bundle_name: 20121115_pvt
# Specify the version of test image directly.
test_image: 9876.0.0
# Specify that a local release image should be used.
release_image: local
# Specify the version of factory toolkit directly.
toolkit: 9678.12.0
# Files to download and add to the bundle.
add_files:
- install_into: release
source: "gs://.../chromeos_recovery_image.bin"
- install_into: firmware
extract_files: [ec.bin, nv_image-link.bin]
source: 'gs://.../ChromeOS-firmware-...tar.bz2'
"""
class FinalizeBundle(object):
"""Finalizes a factory bundle (see USAGE).
Properties:
args: Command-line arguments from argparse.
bundle_dir: Path to the bundle directory.
bundle_name: Name of the bundle (e.g., 20121115_proto).
factory_image_path: Path to the factory image in the bundle.
build_board: The BuildBoard object for the board.
board: Board name (e.g., link).
simple_board: For board name like "base_variant", simple_board is "variant".
simple_board == board if board is not a variant board.
This name is used in firmware and hwid.
manifest: Parsed YAML manifest.
readme_path: Path to the README file within the bundle.
factory_image_base_version: Build of the factory image (e.g., 3004.100.0)
install_shim_version: Build of the install shim.
netboot_install_shim_version: Build of the netboot install shim.
mini_omaha_script_path: Path to the script used to start the mini-Omaha
server.
new_factory_par: Path to a replacement factory.par.
test_image_source: Source (LOCAL or a version) of the test image.
test_image_path: Path to the test image.
test_image_version: Version of the test image.
release_image_source: Source (LOCAL or a version) of the release image.
release_image_path: Path to the release image.
release_image_version: Version of the release image.
toolkit_source: Source (LOCAL or a version) of the factory toolkit.
toolkit_path: Path to the factory toolkit.
toolkit_version: Version of the factory toolkit.
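gsutil: A GSUtil object.
has_firmware: Firmware labels whose versions must be readable from firmware
updaters (DEFAULT_FIRMWARES unless the manifest sets 'has_firmware').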
"""
args = None
bundle_dir = None
bundle_name = None
factory_image_path = None
build_board = None
board = None
simple_board = None
manifest = None
readme_path = None
factory_image_base_version = None
install_shim_version = None
netboot_install_shim_version = None
mini_omaha_script_path = None
new_factory_par = None
test_image_source = None
test_image_path = None
test_image_version = None
release_image_source = None
release_image_path = None
release_image_version = None
toolkit_source = None
toolkit_path = None
toolkit_version = None
gsutil = None
has_firmware = DEFAULT_FIRMWARES
def Main(self):
  """Runs every finalization step, in order, from argument parsing to archive.

  Each step is a method of this object; the steps are looked up and invoked one
  after another, so a failing step stops the whole pipeline.
  """
  step_names = (
      'ParseArgs',
      'LoadManifest',
      'LocateResources',
      'DownloadResources',
      'GetAndSetResourceVersions',
      'BuildFactoryImage',
      'UpdateNetbootURL',
      'UpdateInstallShim',
      'ModifyFactoryImage',
      'MakeUpdateBundle',
      'MakeFactoryPackages',
      'FixFactoryPar',
      'RemoveUnnecessaryFiles',
      'UpdateReadme',
      'Archive',
  )
  for step_name in step_names:
    getattr(self, step_name)()
def ParseArgs(self):
  """Parses the command line and stores the result in self.args.

  The positional arguments are MANIFEST and an optional DIR (defaulting to the
  current working directory). The --no-* switches turn individual steps off
  for testing, and --test-list selects the active test list.
  """
  parser = argparse.ArgumentParser(
      description=USAGE,
      formatter_class=argparse.RawDescriptionHelpFormatter)

  # (flag, destination attribute, help text) of each switch that disables a
  # step which is otherwise enabled.
  disable_switches = (
      ('--no-download', 'download',
       "Don't download files from Google Storage (for testing only)"),
      ('--no-updater', 'updater',
       "Don't make an update bundle (for testing only)"),
      ('--no-archive', 'archive',
       "Don't make a tarball (for testing only)"),
      ('--no-make-factory-packages', 'make_factory_package',
       "Don't call make_factory_package (for testing only)"),
  )
  for flag, dest, help_text in disable_switches:
    parser.add_argument(flag, dest=dest, action='store_false', help=help_text)

  parser.add_argument(
      '--test-list', dest='test_list', metavar='TEST_LIST',
      help=('Set active test_list. e.g. --test-list manual_smt to set active '
            'test_list to test_list.manual_smt'))
  parser.add_argument(
      'manifest', metavar='MANIFEST', help='Path to the manifest file')
  parser.add_argument(
      'dir', metavar='DIR', nargs='?', default=os.getcwd(),
      help='Working directory')

  self.args = parser.parse_args()
def LoadManifest(self):
  """Loads the manifest and initializes the attributes derived from it.

  Sets self.manifest, self.build_board, self.board, self.simple_board,
  self.gsutil, self.bundle_name, self.bundle_dir (created when missing),
  self.test_image_source, self.release_image_source, self.toolkit_source,
  self.readme_path and self.has_firmware.

  Exits the program through sys.exit when bundle_name does not start with
  eight digits followed by an underscore.
  """
  with open(self.args.manifest) as f:
    # NOTE(review): yaml.load can construct arbitrary Python objects. If the
    # manifest may ever come from an untrusted source, switch to
    # yaml.safe_load.
    self.manifest = yaml.load(f)
  CheckDictKeys(self.manifest,
                ['board', 'bundle_name', 'add_files', 'add_files_to_image',
                 'mini_omaha_url', 'toolkit', 'test_image', 'release_image',
                 'complete_script', 'has_firmware', 'external_presenter'])

  self.build_board = build_board.BuildBoard(self.manifest['board'])
  self.board = self.build_board.full_name
  self.simple_board = self.build_board.short_name
  self.gsutil = gsutil.GSUtil(self.board)

  self.bundle_name = self.manifest['bundle_name']
  if not re.match(r'^\d{8}_', self.bundle_name):
    sys.exit("The bundle_name (currently %r) should be today's date, "
             'plus an underscore, plus a description of the build, e.g.: %r' %
             (self.bundle_name, time.strftime('%Y%m%d_proto')))

  # If the basename of the working directory is equal to the expected name, we
  # believe that the user intentionally wants to make the bundle in the
  # working directory (this is also needed if any of the resource is assigned
  # as local). Otherwise, we'll create a directory with expected name under
  # the working directory.
  expected_dir_name = 'factory_bundle_%s_%s' % (self.board, self.bundle_name)
  logging.info('Expected bundle directory name is %r', expected_dir_name)
  # Normalize before taking the basename: os.path.basename() returns '' for a
  # path with a trailing slash and '.' for '.', either of which would make the
  # comparison fail and nest a second bundle directory inside the real one.
  working_dir_name = os.path.basename(os.path.abspath(self.args.dir))
  if expected_dir_name == working_dir_name:
    self.bundle_dir = self.args.dir
    logging.info('The working directory name matches the expected bundle '
                 'directory name, will finalized bundle directly in the '
                 'working directory %r', self.bundle_dir)
  else:
    self.bundle_dir = os.path.join(self.args.dir, expected_dir_name)
    logging.info('The working directory name does not match the expected '
                 'bundle directory name, will create a new directoy and '
                 'finalize bundle in %r', self.bundle_dir)
  self.bundle_dir = os.path.realpath(self.bundle_dir)
  file_utils.TryMakeDirs(self.bundle_dir)

  self.test_image_source = self.manifest.get('test_image')
  self.release_image_source = self.manifest.get('release_image')
  self.toolkit_source = self.manifest.get('toolkit')
  self.readme_path = os.path.join(self.bundle_dir, 'README')
  self.has_firmware = self.manifest.get('has_firmware', DEFAULT_FIRMWARES)
def _GetImageVersion(self, image_path):
  """Mounts partition 3 of image_path and returns its release version."""
  with MountPartition(image_path, 3) as mount_point:
    version = _GetReleaseVersion(mount_point)
  return version
def _MatchImageVersion(self, image_path, requested_version):
"""Returns True if an image matches the requested version, False
otherwise."""
logging.info('Checking whether the version of image %r is %r',
image_path, requested_version)
image_version = self._GetImageVersion(image_path)
logging.info('Version of image %r is %r', image_path, image_version)
return image_version == requested_version
def _LocateOneResource(self, resource_name, resource_source, search_dirs,
                       version_checker):
  """Finds the single local copy of a resource, if there is one.

  Every directory in search_dirs (taken relative to self.bundle_dir) is
  searched. What happens next depends on how many entries were found and on
  whether the source is LOCAL:

    found    LOCAL     non-LOCAL
    ------------------------------------------------------------
    =0       reject    accept (will download later)
    =1       accept    accept if version matches, reject otherwise
    >1       reject    reject

  Args:
    resource_name: Name of the resource, such as 'test image' or
        'factory toolkit'; only used in messages.
    resource_source: Source of the resource, LOCAL or a version string.
    search_dirs: Directories under self.bundle_dir to search.
    version_checker: A callable taking (resource_path, requested_version)
        and returning True when the resource matches the requested version.

  Returns:
    Path to the resource when exactly one acceptable copy exists, or None
    when nothing was found and the source is not LOCAL.

  Raises:
    Exception: For every "reject" case in the table above.
  """
  # TODO(crbug.com/706756): once the directory structure has been fixed, we
  #                         can just build up the path instead of searching
  #                         through all search_dirs.
  logging.info('Searching %s in %r', resource_name, search_dirs)
  candidates = FinalizeBundle._ListAllFilesIn(
      [os.path.join(self.bundle_dir, d) for d in search_dirs])
  candidate_count = len(candidates)

  if candidate_count > 1:
    raise Exception(
        'There should be only one %s in %r but found multiple: %r' % (
            resource_name, search_dirs, candidates))

  if candidate_count == 0:
    if resource_source == LOCAL:
      raise Exception(
          '%s source is specified as %r but no one found under %r' % (
              resource_name.capitalize(), LOCAL, search_dirs))
    return None

  # Exactly one candidate from here on.
  resource_path = candidates[0]
  logging.info(
      'A local copy of %s is found at %r', resource_name, resource_path)
  if resource_source != LOCAL and not version_checker(resource_path,
                                                      resource_source):
    raise Exception(
        'Requested %s version is %r but found a local one with different '
        'version at %r' % (resource_name, resource_source, resource_path))
  return resource_path
def LocateResources(self):
  """Locates the test image, the release image, and the factory toolkit.

  Each resource is searched for under self.bundle_dir and the result is stored
  in, respectively:
    - self.test_image_path
    - self.release_image_path
    - self.toolkit_path

  An attribute is set to the path of the resource if it is found and to None
  if it is not (when the source of that resource is LOCAL, not finding it is
  an error raised by _LocateOneResource).
  """
  def _AcceptAnyVersion(unused_path, unused_version):
    # TODO(crbug.com/707155): see #c1. Unlike images, we don't handle non-local
    #     case even if a local toolkit is found. It's not possible to be
    #     certain that the version of the local toolkit matches the requested
    #     one because we should not only consider the toolkit but also other
    #     resources. We have to always download. So the version checker for
    #     the toolkit always returns True.
    return True

  image_checker = self._MatchImageVersion
  self.test_image_path = self._LocateOneResource(
      'test image', self.test_image_source, TEST_IMAGE_SEARCH_DIRS,
      image_checker)
  self.release_image_path = self._LocateOneResource(
      'release image', self.release_image_source, RELEASE_IMAGE_SEARCH_DIRS,
      image_checker)
  self.toolkit_path = self._LocateOneResource(
      'factory toolkit', self.toolkit_source, TOOLKIT_SEARCH_DIRS,
      _AcceptAnyVersion)
def _CheckGSUtilVersion(self):
  """Exits the program unless gsutil is at least REQUIRED_GSUTIL_VERSION.

  The version string reported by self.gsutil has everything from 'pre'
  onwards dropped, is split on dots, and is compared numerically, component
  by component, against the required version.
  """
  reported_version = self.gsutil.GetVersion()
  # Remove 'pre...' string at the end, if any
  version = re.sub('pre.*', '', reported_version)
  version_numbers = [int(component) for component in version.split('.')]
  if version_numbers >= REQUIRED_GSUTIL_VERSION:
    return
  required = '.'.join(str(number) for number in REQUIRED_GSUTIL_VERSION)
  sys.exit(
      'gsutil version >=%s is required; you seem to have %s.\n'
      'Please download and install gsutil ('
      'https://developers.google.com/storage/docs/gsutil_install), and '
      'make sure this is in your PATH before the system gsutil.' % (
          required, version))
def DownloadResources(self):
  """Downloads test image, release image, factory toolkit if needed.

  Also processes the 'add_files' entries of the manifest: each entry is
  fetched and either copied into, or selectively extracted into, its
  'install_into' directory under the bundle. When netboot install shims are
  extracted, self.netboot_install_shim_version is derived from the source URL.

  Nothing is downloaded when --no-download was given (self.args.download is
  False), but destination directories are still created and stale netboot
  shims are still removed.

  Raises:
    Exception: If a downloaded image is not found at the returned path.
  """
  need_test_image = (self.test_image_source != LOCAL and
                     self.test_image_path is None)
  need_release_image = (self.release_image_source != LOCAL and
                        self.release_image_path is None)
  # TODO(crbug.com/707155): see #c1. We have to always download the factory
  #                         toolkit unless the "toolkit" source in config
  #                         refers to only the toolkit version instead of
  #                         factory.zip.
  need_toolkit = (self.toolkit_source != LOCAL)

  if ('add_files' not in self.manifest and
      not need_test_image and not need_release_image and not need_toolkit):
    return

  # Make sure gsutil is up to date; older versions are pretty broken.
  self._CheckGSUtilVersion()

  if self.args.download:
    if need_test_image:
      self.test_image_path = self._DownloadTestImage(
          self.test_image_source,
          os.path.join(self.bundle_dir, TEST_IMAGE_SEARCH_DIRS[0]))
      if not os.path.exists(self.test_image_path):
        raise Exception('No test image at %s' % self.test_image_path)
    if need_release_image:
      self.release_image_path = self._DownloadReleaseImage(
          self.release_image_source,
          os.path.join(self.bundle_dir, RELEASE_IMAGE_SEARCH_DIRS[0]))
      if not os.path.exists(self.release_image_path):
        raise Exception('No release image at %s' % self.release_image_path)
    if need_toolkit:
      self.toolkit_path = self._DownloadFactoryToolkit(self.toolkit_source,
                                                       self.bundle_dir)

  # TODO(b/36702884): remove this section once finalize_bundle supports
  #                   grabbing firmware from different sources
  # TODO(littlecvr): merge LocateResources(), DownloadResources(), and
  #                  GetAndSetResourceVersions(). If this section is removed,
  #                  DownloadResources() becomes simple enough to be merged.
  #                  One thing to note is that we should make finalize_bundle
  #                  workable without gsutil if all resources come from LOCAL.
  #                  Therefore, when merging these function, make sure we
  #                  don't call self._CheckGSUtilVersion() if not necessary.
  # 'add_files' is optional: we may get here only because an image or the
  # toolkit had to be downloaded, so a missing (or empty) key must not fail.
  for f in self.manifest.get('add_files') or []:
    CheckDictKeys(f, ['install_into', 'source', 'extract_files'])
    dest_dir = os.path.join(self.bundle_dir, f['install_into'])
    file_utils.TryMakeDirs(dest_dir)

    source = self._SubstVars(f['source'])

    if self.args.download:
      # NOTE(review): _DownloadResource is decorated with
      # @contextlib.contextmanager in this file, so this call presumably
      # returns a context manager rather than a path -- confirm that
      # ExtractFile/copyfile below receive what they expect.
      cached_file = self._DownloadResource([source])

    if f.get('extract_files'):
      # Gets netboot install shim version from source url since version
      # is not stored in the image.
      install_into = os.path.join(self.bundle_dir, f['install_into'])
      shims_to_extract = [x for x in f['extract_files']
                          if x in NETBOOT_SHIMS]
      if shims_to_extract:
        self.netboot_install_shim_version = str(LooseVersion(
            os.path.basename(os.path.dirname(source))))
        # Delete any existing vmlinuz or vmlinux.bin to make sure we will
        # not put any wrong file into the bundle, i.e. if we extract only
        # vmlinuz we should delete existing vmlinux.bin, and vice versa.
        for path in shims_to_extract:
          for shim in NETBOOT_SHIMS:
            TryUnlink(os.path.join(install_into, os.path.dirname(path), shim))
      if self.args.download:
        ExtractFile(cached_file, install_into,
                    only_extracts=f['extract_files'])
    else:
      dest_path = os.path.join(dest_dir, os.path.basename(source))
      if self.args.download:
        shutil.copyfile(cached_file, dest_path)
def GetAndSetResourceVersions(self):
  """Gets and sets versions of test image, release image, and factory toolkit.

  The image versions are read from the images themselves; the toolkit version
  is parsed from the 'Identification:' line printed by running the toolkit
  with --info. Results are stored in self.test_image_version,
  self.release_image_version and self.toolkit_version.

  Raises:
    AssertionError: If the toolkit output has no parsable identification line.
  """
  self.test_image_version = self._GetImageVersion(self.test_image_path)
  logging.info('Test image version: %s', self.test_image_version)

  self.release_image_version = self._GetImageVersion(self.release_image_path)
  logging.info('Release image version: %s', self.release_image_version)

  output = Spawn([self.toolkit_path, '--info'], check_output=True).stdout_data
  # Use re.search rather than re.match: even with re.M, re.match only tries the
  # very beginning of the string, so the identification line would be missed
  # whenever it is not the first line of the output.
  match = re.search(r'^Identification: .+ Factory Toolkit (.+)$', output, re.M)
  assert match, 'Unable to parse toolkit info: %r' % output
  # Everything after 'Factory Toolkit ' on the identification line.
  self.toolkit_version = match.group(1)
  logging.info('Toolkit version: %s', self.toolkit_version)
def BuildFactoryImage(self):
  """Creates the factory image by installing the toolkit onto a test image.

  The test image is copied to factory_image/chromiumos_factory_image.bin under
  the bundle directory (stored in self.factory_image_path), and the factory
  toolkit installer is then run against that copy.
  """
  image_path = os.path.join(self.bundle_dir, 'factory_image',
                            'chromiumos_factory_image.bin')
  self.factory_image_path = image_path
  logging.info('Creating %s image from %s...',
               os.path.basename(image_path),
               os.path.basename(self.test_image_path))
  file_utils.TryMakeDirs(os.path.dirname(image_path))
  shutil.copyfile(self.test_image_path, image_path)

  # PYTHONPATH is set when finalize_bundle is executing from a par file.
  # Drop it from the installer's environment, since it causes the factory
  # toolkit to find the wrong module path.
  installer_env = dict(os.environ)
  installer_env.pop('PYTHONPATH', None)

  installer_cmd = [self.toolkit_path, image_path, '--yes']
  if self.manifest.get('external_presenter', False):
    installer_cmd.append('--no-enable-presenter')
  Spawn(installer_cmd, check_call=True, env=installer_env)
def ModifyFactoryImage(self):
  """Applies the manifest's and command line's tweaks to the factory image.

  Two independent modifications are made to partition 1 of the factory image:
    - every 'add_files_to_image' entry of the manifest is copied (with sudo)
      from the bundle into dev_image/<install_into>;
    - if --test-list was given, its value is written to the test_lists ACTIVE
      file.
  """
  extra_files = self.manifest.get('add_files_to_image', [])
  if extra_files:
    with MountPartition(self.factory_image_path, 1, rw=True) as mount:
      for entry in extra_files:
        target_dir = os.path.join(mount, 'dev_image', entry['install_into'])
        source_path = os.path.join(self.bundle_dir, entry['source'])
        Spawn(['mkdir', '-p', target_dir],
              log=True, sudo=True, check_call=True)
        Spawn(['cp', '-a', source_path, target_dir],
              log=True, sudo=True, check_call=True)

  test_list = self.args.test_list
  if test_list:
    logging.info('Setting active test_list to %s', test_list)
    with MountPartition(self.factory_image_path, 1, rw=True) as mount:
      logging.info('Using test_list ACTIVE file')
      active_path = os.path.join(
          mount, 'dev_image', 'factory', 'py', 'test', 'test_lists', 'ACTIVE')
      with open(active_path, 'w') as active_file:
        active_file.write(test_list)
# TODO(littlecvr): remove this function once shopfloor has been deprecated.
def MakeUpdateBundle(self):
  """Builds shopfloor/shopfloor_data/update/factory.tar.bz2 in the bundle.

  The update bundle is made from the factory image. Skipped entirely when
  --no-updater was given.
  """
  if not self.args.updater:
    return
  update_dir = os.path.join(
      self.bundle_dir, 'shopfloor', 'shopfloor_data', 'update')
  file_utils.TryMakeDirs(update_dir)
  MakeUpdateBundle(self.factory_image_path,
                   os.path.join(update_dir, 'factory.tar.bz2'))
def UpdateNetbootURL(self):
  """Updates Omaha & TFTP servers' URL in depthcharge netboot firmware.

  Does nothing unless the manifest sets 'mini_omaha_url'. Otherwise:
    - if netboot_firmware/image.net.bin exists in the bundle, it is rewritten
      by setup/netboot_firmware_settings.py with the Omaha server URL, the
      TFTP server (the URL's host name) and board-specific boot/args file
      names;
    - the netboot shim (factory_shim/netboot/vmlinuz, or vmlinux.bin when
      vmlinuz is absent) is copied to vmlinuz-<board> next to it.
  """
  omaha_url = self.manifest.get('mini_omaha_url')
  if not omaha_url:
    return

  bootfile_name = 'vmlinuz-%s' % self.board
  argsfile_name = 'cmdline-%s' % self.board

  firmware_image = os.path.join(
      self.bundle_dir, 'netboot_firmware', 'image.net.bin')
  if os.path.exists(firmware_image):
    settings_tool = os.path.join(
        self.bundle_dir, 'setup', 'netboot_firmware_settings.py')
    # Write to a temporary name first and move it over the original only
    # after the tool has succeeded.
    in_progress_image = firmware_image + '.INPROGRESS'
    tftp_server = urlparse.urlparse(omaha_url).hostname
    Spawn([settings_tool,
           '--argsfile', argsfile_name,
           '--bootfile', bootfile_name,
           '--input', firmware_image,
           '--output', in_progress_image,
           '--omahaserver=%s' % omaha_url,
           '--tftpserverip=%s' % tftp_server],
          check_call=True, log=True)
    shutil.move(in_progress_image, firmware_image)

  # support both 'vmlinux.bin' and 'vmlinuz'.
  netboot_dir = os.path.join(self.bundle_dir, 'factory_shim', 'netboot')
  shim_path = os.path.join(netboot_dir, 'vmlinuz')
  if not os.path.exists(shim_path):
    shim_path = os.path.join(netboot_dir, 'vmlinux.bin')
  # Finally, copy to 'vmlinuz-<board>'.
  shutil.copy(shim_path, os.path.join(netboot_dir, bootfile_name))
def UpdateInstallShim(self):
  """Points the factory install shim at the mini-Omaha server.

  Does nothing unless the manifest sets 'mini_omaha_url'. Otherwise the
  lsb-factory file of the install shim found under factory_shim/ (either the
  unsigned factory_install_shim.bin or a single signed
  chromeos_*_factory*.bin) is patched, and self.install_shim_version is set to
  '<release version> (<signing key>)'.

  Exits the program through sys.exit when both an unsigned and a signed shim
  exist, when more than one signed shim exists, or when lsb-factory does not
  contain exactly the two expected server settings. A bundle without any
  install shim only produces a warning.
  """
  mini_omaha_url = self.manifest.get('mini_omaha_url')
  if not mini_omaha_url:
    return

  def PatchLSBFactory(mount):
    """Patches lsb-factory in an image.

    Args:
      mount: Mount point of the shim's partition 1.

    Returns:
      True if there were any changes.
    """
    lsb_factory_path = os.path.join(
        mount, 'dev_image', 'etc', 'lsb-factory')
    logging.info('Patching lsb-factory in %s', lsb_factory_path)
    with open(lsb_factory_path) as lsb_file:
      orig_lsb_factory = lsb_file.read()
    # mini_omaha_url is known to be non-empty here (checked above). A function
    # is used as the replacement so that the URL is inserted literally: with a
    # template such as r'\1' + url, a URL starting with a digit would extend
    # the group number and a backslash in it would be read as an escape.
    lsb_factory, number_of_subs = re.subn(
        r'(?m)^(CHROMEOS_(AU|DEV)SERVER=).+$',
        lambda match: match.group(1) + mini_omaha_url,
        orig_lsb_factory)
    # Both CHROMEOS_AUSERVER and CHROMEOS_DEVSERVER must have been present.
    if number_of_subs != 2:
      sys.exit('Unable to set mini-Omaha server in %s' % lsb_factory_path)
    if lsb_factory == orig_lsb_factory:
      return False  # No changes
    WriteWithSudo(lsb_factory_path, lsb_factory)
    return True

  def PatchInstallShim(shim):
    """Patches lsb-factory and updates self.install_shim_version."""
    def GetSigningKey(shim):
      """Derives signing key from factory install shim's file name."""
      if shim.endswith('factory_install_shim.bin'):
        return 'unsigned'
      key_match = re.search(r'channel_([\w\-]+)\.bin$', shim)
      if key_match:
        return key_match.group(1)
      # Error deriving signing key
      return 'undefined'

    with MountPartition(shim, 1, rw=True) as mount:
      PatchLSBFactory(mount)
    with MountPartition(shim, 3) as mount:
      self.install_shim_version = '%s (%s)' % (_GetReleaseVersion(mount),
                                               GetSigningKey(shim))

  # Patch in the install shim, if present.
  has_install_shim = False
  unsigned_shim = os.path.join(self.bundle_dir, 'factory_shim',
                               'factory_install_shim.bin')
  if os.path.isfile(unsigned_shim):
    PatchInstallShim(unsigned_shim)
    has_install_shim = True

  signed_shims = glob.glob(os.path.join(self.bundle_dir, 'factory_shim',
                                        'chromeos_*_factory*.bin'))
  if has_install_shim and signed_shims:
    sys.exit('Both unsigned and signed install shim exists. '
             'Please remove unsigned one')
  if len(signed_shims) > 1:
    sys.exit('Expected to find 1 signed factory shim but found %d: %r' % (
        len(signed_shims), signed_shims))
  elif len(signed_shims) == 1:
    PatchInstallShim(signed_shims[0])
    has_install_shim = True

  if not has_install_shim:
    logging.warning('There is no install shim in the bundle.')
def MakeFactoryPackages(self):
  """Runs make_factory_package.sh and writes the mini-Omaha startup script.

  make_factory_package.sh is run from the bundle's setup/ directory with paths
  relative to it, unless --no-make-factory-packages was given. The completion
  script comes from the manifest's 'complete_script' (None disables it);
  without that key, setup/complete_script_sample.sh is used when it exists.
  firmware/chromeos-firmwareupdate is passed along when it exists.

  start_download_server.sh is then (re)written at the top of the bundle, and
  its path is stored in self.mini_omaha_script_path.

  Raises:
    OSError: If the completion script named by the manifest does not exist.
  """
  setup_dir = os.path.join(self.bundle_dir, 'setup')
  make_factory_package = [
      './make_factory_package.sh',
      '--board', self.board,
      '--release', os.path.relpath(self.release_image_path, setup_dir),
      '--test', os.path.relpath(self.test_image_path, setup_dir),
      '--toolkit', os.path.relpath(self.toolkit_path, setup_dir),
      '--hwid', '../hwid/hwid_v3_bundle_%s.sh' %
      self.simple_board.upper()]

  if 'complete_script' in self.manifest:
    script_base_name = self.manifest['complete_script']
    if script_base_name is None:
      complete_script = None
    else:
      complete_script = os.path.join(self.bundle_dir, script_base_name)
      if not os.path.exists(complete_script):
        raise OSError('Complete script %s does not exist' % complete_script)
  else:
    # Use setup/complete_script_sample.sh, if it exists
    complete_script = os.path.join(
        self.bundle_dir, 'setup/complete_script_sample.sh')
    if not os.path.exists(complete_script):
      complete_script = None

  if complete_script:
    make_factory_package.extend(['--complete_script', complete_script])

  firmware_updater = os.path.join(
      self.bundle_dir, 'firmware', 'chromeos-firmwareupdate')
  if os.path.exists(firmware_updater):
    make_factory_package += ['--firmware',
                             os.path.relpath(firmware_updater, setup_dir)]

  if self.args.make_factory_package:
    Spawn(make_factory_package, cwd=setup_dir, check_call=True, log=True)

  # TODO(littlecvr): should be pulled outside of this file.
  # Build the mini-Omaha startup script.
  self.mini_omaha_script_path = os.path.join(
      self.bundle_dir, 'start_download_server.sh')
  # A script left over from a previous run is read-only (see the chmod below),
  # so remove it rather than trying to open it for writing.
  if os.path.exists(self.mini_omaha_script_path):
    os.unlink(self.mini_omaha_script_path)
  with open(self.mini_omaha_script_path, 'w') as f:
    f.write('\n'.join([
        '#!/bin/bash',
        'set -e',  # Fail on error
        'cd $(dirname $(readlink -f "$0"))/setup',
        'cat static/miniomaha.conf',
        ('echo Miniomaha configuration MD5SUM: '
         '$(md5sum static/miniomaha.conf)'),
        'echo Validating configuration...',
        ('python miniomaha.py --validate_factory_config'),
        'echo Starting download server.',
        'python miniomaha.py',
        ''  # Add newline at EOF
    ]))
    # 0o555 (r-xr-xr-x): the 0o prefix is accepted by Python 2.6+ and is the
    # only octal spelling Python 3 understands.
    os.fchmod(f.fileno(), 0o555)
def FixFactoryPar(self):
  """Fix symlinks to factory.par, and replace factory.par if necessary.

  (Certain files may have been turned into real files by the buildbots.)

  Every regular file in the bundle whose contents are identical to
  shopfloor/factory.par is replaced with a relative symlink to it. After that,
  if self.new_factory_par is set, that file is copied over
  shopfloor/factory.par.
  """
  factory_par_path = os.path.join(self.bundle_dir, 'shopfloor', 'factory.par')
  # factory.par is a binary archive: read it (and the candidates below) as
  # bytes so that the length matches os.path.getsize() and no text decoding
  # is attempted.
  with open(factory_par_path, 'rb') as par_file:
    factory_par_data = par_file.read()

  # Look for files that are identical copies of factory.par.
  for root, _, files in os.walk(self.bundle_dir):
    for name in files:
      path = os.path.join(root, name)
      if path == factory_par_path:
        # Don't replace it with itself!
        continue
      if (os.path.islink(path) or
          os.path.getsize(path) != len(factory_par_data)):
        # It's not a real file, or not the right size. Skip.
        continue
      with open(path, 'rb') as candidate:
        data = candidate.read()
      if data != factory_par_data:
        # Data isn't the same. Skip.
        continue

      # Replace the file with a symlink.
      logging.info('Replacing %s with a symlink', path)
      os.unlink(path)
      os.symlink(os.path.relpath(factory_par_path,
                                 os.path.dirname(path)),
                 path)

  if self.new_factory_par:
    logging.info('Copying %s to %s', self.new_factory_par, factory_par_path)
    shutil.copy2(self.new_factory_par, factory_par_path)
def RemoveUnnecessaryFiles(self):
  """Removes vim backup files, pyc files, and empty directories.

  The bundle directory itself is never removed.

  Raises:
    OSError: If removing a directory fails for a reason other than it not
        being empty.
  """
  logging.info('Removing unnecessary files')
  # Walk bottom-up: a directory is then visited (and cleaned) before its
  # parent tries to remove it, so directories that only become empty during
  # this pass -- because their files or sub-directories were just removed --
  # are removed as well.
  for root, dirs, files in os.walk(self.bundle_dir, topdown=False):
    for f in files:  # Remove backup files and compiled Python files.
      if f.endswith('~') or f.endswith('.pyc'):
        path = os.path.join(root, f)
        os.unlink(path)
        logging.info('Removed file %r', path)
    for d in dirs:  # Remove any empty directories
      path = os.path.join(root, d)
      try:
        os.rmdir(path)
      except OSError as e:
        # A non-empty directory is expected and simply kept.
        if e.errno != errno.ENOTEMPTY:
          raise
      else:
        logging.info('Removed empty directory %r', path)
def UpdateReadme(self):
  """Rewrites the generated sections of the bundle's README.

  The README is split into sections introduced by a starred header. The
  VITAL INFORMATION section is replaced with a freshly collected, aligned
  "key: value" list (board, bundle, image/toolkit/shim versions, stateful
  partition usage, firmware versions found in the release image and in the
  bundle). The MINI-OMAHA SERVER section, if present, is replaced with
  instructions for starting the server. All other sections are written back
  unchanged.

  Exits the program through sys.exit when the README lacks a VITAL INFORMATION
  or CHANGES section, or when a firmware version cannot be read. A CHANGES
  section that does not mention this bundle only produces a warning.
  """
  # Grok the README file; we'll be modifying it.
  with open(self.readme_path) as readme_file:
    readme_text = readme_file.read()
  readme_sections = re.findall(
      # Section header
      r'(\*\*\*\n\*\n\* (.+?)\n\*\n\*\*\*\n)'
      # Anything up to (but not including) the next section header
      r'((?:(?!\*\*\*).)+)', readme_text, re.DOTALL)
  # This results in a list of tuples (a, b, c), where a is the whole
  # section header string; b is the name of the section; and c is the
  # contents of the section. Turn each tuple into a list; we'll be
  # modifying some of them.
  readme_sections = [list(x) for x in readme_sections]

  # Map of section name to index
  readme_section_index = dict(
      (section[1], i) for i, section in enumerate(readme_sections))
  for k in ['VITAL INFORMATION', 'CHANGES']:
    if k not in readme_section_index:
      sys.exit('README is missing %s section' % k)

  # Make sure that the CHANGES section contains this version.
  expected_str = '%s changes:' % self.bundle_name
  if expected_str not in readme_sections[readme_section_index['CHANGES']][2]:
    logging.warning('The string %r was not found in the CHANGES section. '
                    'Please add a section for it (if this is the first '
                    'version, just say "initial release").', expected_str)

  def _ExtractFirmwareVersions(updater_file, updater_name):
    """Returns ('<updater_name> <label>', version) pairs for an updater."""
    firmware_versions = _GetFirmwareVersions(updater_file, self.has_firmware)
    return [('%s %s' % (updater_name, firmware_type), version)
            for firmware_type, version in firmware_versions.items()
            if version is not None]

  # Get some vital information
  vitals = [
      ('Board', self.board),
      ('Bundle', '%s (created by %s, %s)' % (
          self.bundle_name, os.environ['USER'],
          time.strftime('%a %Y-%m-%d %H:%M:%S %z')))]
  if self.factory_image_base_version:
    vitals.append(('Factory image base', self.factory_image_base_version))
  if self.toolkit_version:
    vitals.append(('Factory toolkit', self.toolkit_version))

  if self.test_image_source == LOCAL:
    with MountPartition(self.factory_image_path, 3) as mount:
      vitals.append(('Test image', '%s (local)' % _GetReleaseVersion(mount)))
  elif self.test_image_source:
    vitals.append(('Test image', self.test_image_source))

  with MountPartition(self.factory_image_path, 1) as mount:
    md5sum_path = os.path.join(mount, 'dev_image/factory/MD5SUM')
    with open(md5sum_path) as md5sum_file:
      vitals.append(('Factory updater MD5SUM', md5sum_file.read().strip()))
    stat = os.statvfs(mount)
    stateful_free_bytes = stat.f_bfree * stat.f_bsize
    stateful_total_bytes = stat.f_blocks * stat.f_bsize
    vitals.append((
        'Stateful partition size',
        '%d MiB (%d MiB free = %d%% free)' % (
            stateful_total_bytes / 1024 / 1024,
            stateful_free_bytes / 1024 / 1024,
            int(stateful_free_bytes * 100.0 / stateful_total_bytes))))
    vitals.append((
        'Stateful partition inodes',
        '%d nodes (%d free)' % (stat.f_files, stat.f_ffree)))

  if self.install_shim_version:
    vitals.append(('Factory install shim', self.install_shim_version))
  if self.netboot_install_shim_version:
    vitals.append(('Netboot install shim (vmlinuz/vmlinux.bin)',
                   self.netboot_install_shim_version))

  with MountPartition(self.release_image_path, 3) as mount:
    vitals.append(('Release (FSI)', _GetReleaseVersion(mount)))
    fw_updater_file = os.path.join(mount, 'usr/sbin/chromeos-firmwareupdate')
    vitals.extend(_ExtractFirmwareVersions(fw_updater_file, 'Release (FSI)'))

  # If we have any firmware in the tree, add them to the vitals.
  for root, unused_dirs, files in os.walk(self.bundle_dir):
    for name in files:
      path = os.path.join(root, name)
      relpath = os.path.relpath(path, self.bundle_dir)
      if name == 'chromeos-firmwareupdate':
        vitals.extend(_ExtractFirmwareVersions(path, relpath))
      elif name == 'ec.bin':
        version = get_version.GetFirmwareBinaryVersion(path)
        if not version:
          sys.exit('Unable to find EC version in %s' % path)
        vitals.append((relpath, version))
      elif any(name.startswith(prefix)
               for prefix in ('nv_image', 'image.net')):
        version = get_version.GetFirmwareBinaryVersion(path)
        if not version:
          sys.exit('Unable to find BIOS version in %s' % path)
        vitals.append((relpath, version))

  # Pad after each key so that all values start in the same column.
  max_key_length = max(len(k) for k, _ in vitals)
  vital_lines = ['%s:%s %s' % (k, ' ' * (max_key_length - len(k)), v)
                 for k, v in vitals]
  vital_contents = '\n'.join(vital_lines)
  readme_sections[readme_section_index['VITAL INFORMATION']][2] = (
      vital_contents + '\n\n')

  index = readme_section_index.get('MINI-OMAHA SERVER')
  if index is not None:
    instructions = [
        'To start a mini-Omaha server:',
        '',
        ' ./start_download_server.sh'
    ]
    readme_sections[index][2] = (
        '\n'.join(instructions) + '\n\n')

  with open(self.readme_path, 'w') as f:
    for header, _, contents in readme_sections:
      f.write(header)
      f.write(contents)
  logging.info('\n\nUpdated %s; vital information:\n%s\n',
               self.readme_path, vital_contents)
def Archive(self):
# Creates the bundle tarballs (when requested) and logs closing reminders.
#
# Reads self.args.archive, self.bundle_dir, self.readme_path and
# self.build_board. Produces no return value; the only outputs are the
# tarballs written next to self.bundle_dir and log messages.
#
# NOTE(review): presumably the tarball loop below is guarded by
# self.args.archive while the README/MANIFEST reminders are logged
# unconditionally -- confirm against the nesting in the real file.
if self.args.archive:
# Done! tar it up, and encourage the poor shmuck who has to build
# the bundle to take a little break.
# The suggestion list has 5 + 7 + 2 + 2 + 8 = 24 entries, one per hour of
# the day, and is indexed by the current local hour (0-23).
logging.info('Just works! Creating the tarball. '
'This will take a while... meanwhile, go get %s. '
'You deserve it.',
(['some rest'] * 5 +
['a cup of coffee'] * 7 +
['some lunch', 'some fruit'] +
['an afternoon snack'] * 2 +
['a beer'] * 8)[time.localtime().tm_hour])
# Two archives are produced: "<bundle_dir>.mini.tar.bz2" first (everything
# except files matching '*.bin'), then the full "<bundle_dir>.tar.bz2".
for mini in [True, False]:
output_file = self.bundle_dir + ('.mini' if mini else '') + '.tar.bz2'
# tar runs from the parent directory (-C) and archives the bundle by its
# basename, so the archive holds the bundle directory as a relative path.
# The program passed to -I comes from file_utils.GetCompressor('bz2');
# presumably a bzip2-compatible compressor -- verify in file_utils.
Spawn(['tar', '-cf', output_file,
'-I', file_utils.GetCompressor('bz2'),
'-C', os.path.dirname(self.bundle_dir)] +
(['--exclude', '*.bin'] if mini else []) +
[os.path.basename(self.bundle_dir)],
log=True, check_call=True)
# Report the size of the archive just written, in GiB (float division).
logging.info(
'Created %s (%.1f GiB).',
output_file, os.path.getsize(output_file) / (1024. * 1024. * 1024.))
logging.info('The README file (%s) has been updated. Make sure to check '
'that it is correct!', self.readme_path)
# Pick the location named in the final reminder: inside the chroot it is
# the 'bundle' directory under the board's factory_board_files; otherwise
# only the generic name 'factory-board' is shown.
if sys_utils.InChroot():
factory_board_bundle_path = (
os.path.join(self.build_board.factory_board_files, 'bundle'))
else:
factory_board_bundle_path = 'factory-board'
logging.info(
"IMPORTANT: If you modified the README or MANIFEST.yaml, don't forget "
'to check your changes into %s.',
factory_board_bundle_path)
def _SubstVars(self, input_str):
  """Expands ${NAME} placeholders in a string.

  Supported placeholders:
    ${BOARD}: the simple board name, converted to uppercase.
    ${FACTORY_IMAGE_BASE_VERSION}: self.factory_image_base_version, falling
      back to self.toolkit_version when the former is falsy.

  Args:
    input_str: the string in which placeholders are expanded.

  Returns:
    A copy of input_str with every ${NAME} occurrence replaced.

  Raises:
    KeyError: if input_str uses a ${NAME} that is not listed above.
  """
  board_name = self.simple_board.upper()
  base_version = self.factory_image_base_version or self.toolkit_version
  replacements = {
      'BOARD': board_name,
      'FACTORY_IMAGE_BASE_VERSION': base_version,
  }

  def _Lookup(match):
    # group(1) is the bare variable name between '${' and '}'.
    return replacements[match.group(1)]

  return re.sub(r'\$\{(\w+)\}', _Lookup, input_str)
@contextlib.contextmanager
def _DownloadResource(self, possible_urls, resource_name=None):
  """Downloads a resource file from given URLs.

  This context manager probes each candidate URL in order with gsutil "ls"
  and downloads only the first one that exists. The downloaded file is
  removed again when the context exits, so callers must move or copy
  anything they want to keep.

  Args:
    possible_urls: a single or a list of possible GS URLs to search.
    resource_name: a human readable name of the resource, just for logging,
        won't affect the behavior of downloading.

  Yields:
    A (downloaded_path, found_url) tuple: the local path of the downloaded
    file and the GS URL it was fetched from.

  Raises:
    AssertionError: if a candidate URL pattern matches anything other than
        exactly one object.
    Exception: if none of the candidate URLs exists.
  """
  resource_name = resource_name or 'resource'

  if not isinstance(possible_urls, list):
    possible_urls = [possible_urls]

  found_url = None
  # ls to see if a given URL exists.
  for url in possible_urls:
    try:
      logging.info('Looking for %s at %s', resource_name, url)
      output = self.gsutil.LS(url)
    except gsutil.NoSuchKey:  # Not found; try next
      continue

    # The message must be %-formatted here: passing a tuple as the assert
    # message would print the raw tuple instead of a readable sentence.
    assert len(output) == 1, (
        'Expected %r to match 1 file, but it matched %r' % (url, output))

    # Found. Download it!
    found_url = output[0].strip()
    break

  if found_url is None:
    raise Exception('No %s found in %r' % (resource_name, possible_urls))

  downloaded_path = self.gsutil.GSDownload(found_url)
  try:
    yield (downloaded_path, found_url)
  finally:
    # Always remove the downloaded file, even if the with-body raised.
    TryUnlink(downloaded_path)
def _DownloadAndExtractImage(self, image_name, possible_urls, target_dir):
  """Downloads an image and places it, extracted if needed, in target_dir.

  A '.bin' download is moved into target_dir unchanged. A '.tar.xz' download
  is extracted into target_dir, and the single extracted file is renamed to
  the archive's name with '.tar.xz' replaced by the extracted file's own
  extension (normally '.bin').

  Args:
    image_name: a human readable name of the image, used for logging only.
    possible_urls: a single or a list of possible GS URLs to search.
    target_dir: the directory to put the image in; created if missing. For
        '.tar.xz' downloads it must contain exactly one file after
        extraction.

  Returns:
    The path of the image file inside target_dir.

  Raises:
    AssertionError: if target_dir does not contain exactly one entry after
        extracting a '.tar.xz' archive.
    ValueError: if the found URL ends with neither '.bin' nor '.tar.xz'.
  """
  archive_suffix = '.tar.xz'
  with self._DownloadResource(
      possible_urls, image_name) as (downloaded_path, found_url):
    try:
      file_utils.TryMakeDirs(target_dir)
      image_basename = os.path.basename(found_url)

      if image_basename.endswith('.bin'):  # just move
        dst_path = os.path.join(target_dir, image_basename)
        logging.info('Moving %r to %r', downloaded_path, dst_path)
        shutil.move(downloaded_path, dst_path)
      elif image_basename.endswith(archive_suffix):
        logging.info('Extracting %s image into %s...', image_name, target_dir)
        file_utils.ExtractFile(downloaded_path, target_dir)

        extracted_files = os.listdir(target_dir)
        assert len(extracted_files) == 1, (
            'Expect only one file in %r but found %r' %
            (target_dir, extracted_files))
        extracted_path = os.path.join(target_dir, extracted_files[0])

        # Replace '.tar.xz' with the extracted ext name ('.bin' normally).
        # os.path.splitext() already returns the extension with its leading
        # dot, so it is appended as is; adding another '.' would produce
        # names such as 'image..bin'.
        unused_name, ext = os.path.splitext(extracted_path)
        dst_path = os.path.join(
            target_dir, image_basename[:-len(archive_suffix)] + ext)
        shutil.move(extracted_path, dst_path)
      else:
        # Report the URL whose extension was actually inspected.
        raise ValueError(
            "Don't know how to handle file extension of %r" % found_url)

      return dst_path
    finally:
      # Best-effort cleanup of the temporary download.
      TryUnlink(downloaded_path)
def _DownloadTestImage(self, requested_version, target_dir):
  """Downloads and extracts the test image of the requested version.

  One candidate URL pattern ('*test*.tar.xz') is built per entry of
  RESOURCE_CHANNELS, in that order, and handed to _DownloadAndExtractImage.

  Args:
    requested_version: the version of the test image to look for.
    target_dir: the directory to put the image in.

  Returns:
    Whatever _DownloadAndExtractImage returns for the 'test image'.
  """
  candidate_urls = [
      '%s/%s' % (
          FinalizeBundle._ResourceBaseURL(
              channel, self.build_board.gsutil_name, requested_version),
          '*test*.tar.xz')
      for channel in RESOURCE_CHANNELS]
  return self._DownloadAndExtractImage(
      'test image', candidate_urls, target_dir)
def _DownloadReleaseImage(self, requested_version, target_dir):
  """Downloads and extracts the release image of the requested version.

  Candidate URL patterns ('*recovery*' plus an extension) are built for every
  extension and every entry of RESOURCE_CHANNELS, with all '.bin' candidates
  listed before any '.tar.xz' candidate.

  Args:
    requested_version: the version of the release image to look for.
    target_dir: the directory to put the image in.

  Returns:
    Whatever _DownloadAndExtractImage returns for the 'release image'.
  """
  # Signed recovery image ends with .bin and takes higher priority, so .bin
  # must be searched first. Unsigned recovery image ends with .tar.xz.
  candidate_urls = [
      '%s/%s%s' % (
          FinalizeBundle._ResourceBaseURL(
              channel, self.build_board.gsutil_name, requested_version),
          '*recovery*', ext)
      for ext in ['.bin', '.tar.xz']
      for channel in RESOURCE_CHANNELS]
  return self._DownloadAndExtractImage(
      'release image', candidate_urls, target_dir)
def _DownloadFactoryToolkit(self, requested_version, target_dir):
  """Downloads the factory toolkit archive and extracts it into target_dir.

  One candidate URL pattern ('*factory*.zip') is built per entry of
  RESOURCE_CHANNELS, in that order. After extraction the toolkit is looked up
  through _LocateOneResource with an always-true version check.

  Args:
    requested_version: the version of the factory toolkit to look for.
    target_dir: the directory to extract the downloaded archive into.

  Returns:
    Whatever _LocateOneResource returns for the 'factory toolkit'.
  """
  candidate_urls = [
      '%s/%s' % (
          FinalizeBundle._ResourceBaseURL(
              channel, self.build_board.gsutil_name, requested_version),
          '*factory*.zip')
      for channel in RESOURCE_CHANNELS]

  with self._DownloadResource(
      candidate_urls, 'factory toolkit') as (archive_path, unused_url):
    file_utils.ExtractFile(archive_path, target_dir)

  def _AcceptAnyVersion(unused_path, unused_version):
    return True

  return self._LocateOneResource(
      'factory toolkit', LOCAL, TOOLKIT_SEARCH_DIRS, _AcceptAnyVersion)
@staticmethod
def _ResourceBaseURL(channel, board, version):
  """Builds the gs://chromeos-releases base URL of one build.

  Args:
    channel: the channel name; '-channel' is appended to it in the URL.
    board: the board name as used in the GS path.
    version: the version of the build.

  Returns:
    A string 'gs://chromeos-releases/<channel>-channel/<board>/<version>'.
  """
  fields = {'channel': channel, 'board': board, 'version': version}
  return (
      'gs://chromeos-releases/%(channel)s-channel/%(board)s/%(version)s' %
      fields)
@staticmethod
def _ListAllFilesIn(search_dirs):
  """Returns the regular files found directly inside search_dirs.

  Entries of search_dirs that are not existing directories are skipped, and
  sub-directories are not descended into.

  Args:
    search_dirs: a list of directories to search.

  Returns:
    A list of paths (each directory joined with a file name), grouped in the
    order of search_dirs.
  """
  regular_files = []
  for directory in search_dirs:
    if not os.path.isdir(directory):
      continue
    entry_paths = [
        os.path.join(directory, name) for name in os.listdir(directory)]
    regular_files.extend(
        path for path in entry_paths if os.path.isfile(path))
  return regular_files
# Script entry point: only runs when this file is executed directly.
if __name__ == '__main__':
# Show INFO-level messages, which the methods above use for progress output.
logging.basicConfig(level=logging.INFO)
FinalizeBundle().Main()