blob: 9ce34d8c110270189c9b2430293a767eea0a8dca [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tools to finalize a factory bundle."""
from __future__ import print_function
import argparse
import contextlib
import errno
import glob
import logging
import os
import re
import shutil
import sys
import textwrap
import time
import urlparse
import yaml
import factory_common # pylint: disable=W0611
from cros.factory.tools import get_version
from cros.factory.tools import gsutil
from cros.factory.utils import cros_board_utils
from cros.factory.utils import file_utils
from cros.factory.utils.process_utils import Spawn
from cros.factory.utils import sys_utils
from cros.factory.utils.sys_utils import MountPartition
from cros.factory.utils.type_utils import CheckDictKeys
# Directory that contains this script, with symlinks resolved.
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
# Minimum gsutil version as [major, minor]; _CheckGSUtilVersion compares the
# parsed version of the installed gsutil against this list.
REQUIRED_GSUTIL_VERSION = [3, 32] # 3.32
# NOTE(review): not referenced anywhere in the visible part of this file.
DELETION_MARKER_SUFFIX = '_DELETED'
# Default firmware labels to get version and update in README file.
# To override it, assign 'has_firmware' in MANIFEST.yaml.
# Examples:
# has_firmware: [BIOS]
# has_firmware: [EC, BIOS]
# has_firmware: [EC, BIOS, PD]
DEFAULT_FIRMWARES = ['BIOS', 'EC']
# File name of the firmware updater, and its path relative to the root of a
# mounted image partition.
FIRMWARE_UPDATER_NAME = 'chromeos-firmwareupdate'
FIRMWARE_UPDATER_PATH = os.path.join('usr', 'sbin', FIRMWARE_UPDATER_NAME)
# Special string to use a local file instead of downloading one.
LOCAL = 'local'
# Legacy: resources may live in different places for historical reasons. To
# maintain backward compatibility, we have to search a set of directories.
# The first entry of each list is the directory new downloads are placed in.
# TODO(crbug.com/706756): once we completely remove the old directories, we can
# simply make each of these a string instead of a
# list of strings.
TEST_IMAGE_SEARCH_DIRS = ['test_image', 'factory_test']
RELEASE_IMAGE_SEARCH_DIRS = ['release_image', 'release']
TOOLKIT_SEARCH_DIRS = ['toolkit', 'factory_toolkit']
# When version is fixed, we'll try to find the resource in the following order.
RESOURCE_CHANNELS = ['stable', 'beta', 'dev', 'canary']
class FinalizeBundleException(Exception):
  """Error raised when the bundle cannot be finalized.

  Used throughout this file for invalid manifests, missing or mismatching
  resources, and versions that cannot be read.
  """
def _GetReleaseVersion(mount_point):
  """Reads the release version of the image mounted at mount_point.

  Args:
    mount_point: Path where the image partition is mounted.

  Returns:
    Whatever get_version.GetReleaseVersion reports for the mount point.

  Raises:
    FinalizeBundleException: if no (or an empty) version was reported.
  """
  release_version = get_version.GetReleaseVersion(mount_point)
  if release_version:
    return release_version
  raise FinalizeBundleException(
      'Unable to read lsb-release from %s' % mount_point)
def _GetFirmwareVersions(updater, expected_firmwares):
  """Returns the firmware versions found in an updater.

  Args:
    updater: Path to a firmware updater.
    expected_firmwares: a list of firmware labels whose version must be
      present, e.g. ['BIOS', 'EC'].

  Returns:
    The label-to-version mapping reported by
    get_version.GetFirmwareVersionsWithLabel, e.g.
    {'BIOS': bios_version, 'EC': ec_version, 'PD': pd_version}.
    Labels that are not listed in expected_firmwares may map to None.

  Raises:
    FinalizeBundleException: if the version of any expected label is missing;
      the first such label (in list order) is reported.
  """
  versions = get_version.GetFirmwareVersionsWithLabel(updater)
  unreadable = [name for name in expected_firmwares
                if versions.get(name) is None]
  if unreadable:
    raise FinalizeBundleException(
        'Unable to read %r version from %s' % (unreadable[0], updater))
  return versions
# Help text for the command line; _ParseArgs passes it to argparse as the
# parser description.
USAGE = """
Finalizes a factory bundle. This script checks to make sure that the
bundle is valid, outputs version information into the README file, and
tars up the bundle.
The input is a MANIFEST.yaml file like the following:
board: link
project: link
bundle_name: 20121115_pvt
# Specify the version of test image directly.
test_image: 9876.0.0
# Specify that a local release image should be used.
release_image: local
# Specify the version of factory toolkit directly.
toolkit: 9678.12.0
"""
class FinalizeBundle(object):
"""Finalizes a factory bundle (see USAGE).
Properties:
args: Command-line arguments from argparse.
bundle_dir: Path to the bundle directory.
bundle_name: Name of the bundle (e.g., 20121115_proto).
build_board: The BuildBoard object for the board.
board: Board name (e.g., link).
manifest: Parsed YAML manifest.
readme_path: Path to the README file within the bundle.
install_shim_version: Build of the install shim.
new_factory_par: Path to a replacement factory.par.
test_image_source: Source (LOCAL or a version) of the test image.
test_image_path: Path to the test image.
test_image_version: Version of the test image.
release_image_source: Source (LOCAL or a version) of the release image.
release_image_path: Path to the release image.
release_image_version: Version of the release image.
toolkit_source: Source (LOCAL or a version) of the factory toolkit.
toolkit_path: Path to the factory toolkit.
toolkit_version: Version of the factory toolkit.
gsutil: A GSUtil object.
work_dir: Working directory.
download: True to download files from Google Storage (for testing only).
archive: True to make a tarball (for testing only).
"""
bundle_dir = None
bundle_name = None
build_board = None
board = None
manifest = None
readme_path = None
install_shim_version = None
new_factory_par = None
project = None
test_image_source = None
test_image_path = None
test_image_version = None
release_image_source = None
release_image_path = None
release_image_version = None
toolkit_source = None
toolkit_path = None
toolkit_version = None
gsutil = None
has_firmware = DEFAULT_FIRMWARES
work_dir = None
download = None
archive = None
def __init__(self, manifest, work_dir, download=True, archive=True):
self.manifest = manifest
self.work_dir = work_dir
self.download = download
self.archive = archive
def Main(self):
self.ProcessManifest()
self.LocateResources()
self.DownloadResources()
self.AddDefaultCompleteScript()
self.AddFirmwareUpdaterAndImages()
self.GetAndSetResourceVersions()
self.PrepareNetboot()
self.UpdateInstallShim()
self.FixFactoryPar()
self.RemoveUnnecessaryFiles()
self.UpdateReadme()
self.Archive()
def ProcessManifest(self):
  """Validates the manifest and derives the bundle settings from it.

  Sets build_board, board, gsutil, project, bundle_name, bundle_dir (created
  if missing), the three resource sources, readme_path and has_firmware.

  Raises:
    FinalizeBundleException: if CheckDictKeys rejects the manifest, or if
      bundle_name does not start with eight digits and an underscore.
    KeyError: if 'board' or 'bundle_name' is absent from the manifest.
  """
  try:
    CheckDictKeys(self.manifest,
                  ['board', 'project', 'bundle_name', 'server_url',
                   'toolkit', 'test_image', 'release_image', 'firmware',
                   'hwid', 'has_firmware'])
  except ValueError as e:
    # BaseException.message is deprecated since Python 2.6 and does not
    # exist in Python 3, so let logging render the exception itself.
    logging.error('%s', e)
    raise FinalizeBundleException(
        'Invalid manifest content. '
        'Please refer to setup/BUNDLE.md (https://goo.gl/pM1pxo)')

  self.build_board = cros_board_utils.BuildBoard(self.manifest['board'])
  self.board = self.build_board.full_name
  self.gsutil = gsutil.GSUtil(self.board)
  # assume project=board for backward compatibility
  self.project = self.manifest.get('project', self.board).lower()

  self.bundle_name = self.manifest['bundle_name']
  if not re.match(r'\d{8}_', self.bundle_name):
    raise FinalizeBundleException(
        "The bundle_name (currently %r) should be today's date, "
        'plus an underscore, plus a description of the build, e.g.: %r' %
        (self.bundle_name, time.strftime('%Y%m%d_proto')))

  # If the basename of the working directory is equal to the expected name, we
  # believe that the user intentionally wants to make the bundle in the
  # working directory (this is also needed if any of the resource is assigned
  # as local). Otherwise, we'll create a directory with expected name under
  # the working directory.
  expected_dir_name = 'factory_bundle_%s_%s' % (self.project,
                                                self.bundle_name)
  logging.info('Expected bundle directory name is %r', expected_dir_name)
  if expected_dir_name == os.path.basename(self.work_dir):
    self.bundle_dir = self.work_dir
    logging.info('The working directory name matches the expected bundle '
                 'directory name, will finalized bundle directly in the '
                 'working directory %r', self.bundle_dir)
  else:
    self.bundle_dir = os.path.join(self.work_dir, expected_dir_name)
    logging.info('The working directory name does not match the expected '
                 'bundle directory name, will create a new directoy and '
                 'finalize bundle in %r', self.bundle_dir)
  self.bundle_dir = os.path.realpath(self.bundle_dir)
  file_utils.TryMakeDirs(self.bundle_dir)

  self.test_image_source = self.manifest.get('test_image')
  self.release_image_source = self.manifest.get('release_image')
  self.toolkit_source = self.manifest.get('toolkit')

  self.readme_path = os.path.join(self.bundle_dir, 'README')
  self.has_firmware = self.manifest.get('has_firmware', DEFAULT_FIRMWARES)
def _GetImageVersion(self, image_path):
  """Mounts partition 3 of image_path and returns its release version."""
  with MountPartition(image_path, 3) as mount_point:
    image_version = _GetReleaseVersion(mount_point)
  return image_version
def _MatchImageVersion(self, image_path, requested_version):
"""Returns True if an image matches the requested version, False
otherwise."""
logging.info('Checking whether the version of image %r is %r',
image_path, requested_version)
image_version = self._GetImageVersion(image_path)
logging.info('Version of image %r is %r', image_path, image_version)
return image_version == requested_version
def _LocateOneResource(self, resource_name, resource_source, search_dirs,
                       version_checker):
  """Looks for a local copy of a resource under the search directories.

  The outcome depends on how many files were found and on whether the source
  is LOCAL:

     found   LOCAL     non-LOCAL
     ------------------------------------------------------------
     =0      reject    accept (will download later)
     =1      accept    accept if version matches, reject otherwise
     >1      reject    reject

  In the "=0 / non-LOCAL" case the resource is also rejected when
  self.download is False, since it could never be fetched.

  Args:
    resource_name: name of the resource, such as 'test image' or
      'factory toolkit'; used in log and error messages.
    resource_source: source of the resource, LOCAL or a version string.
    search_dirs: a list of directories, relative to self.bundle_dir, to
      search.
    version_checker: a callable with signature
      (resource_path, requested_version) that returns True if the resource
      matches the requested version, False otherwise. Not called when the
      source is LOCAL.

  Returns:
    Path to the single resource found, or None if nothing was found and the
    resource is to be downloaded later.

  Raises:
    FinalizeBundleException: for every "reject" case above.
  """
  dirs_to_search = [os.path.join(self.bundle_dir, name)
                    for name in search_dirs]
  # TODO(crbug.com/706756): once the directory structure has been fixed, we
  #                         can just build up the path instead of searching
  #                         through all search_dirs.
  logging.info('Searching %s in %r', resource_name, search_dirs)
  candidates = FinalizeBundle._ListAllFilesIn(dirs_to_search)
  use_local = resource_source == LOCAL

  if len(candidates) > 1:
    raise FinalizeBundleException(
        'There should be only one %s in %r but found multiple: %r' % (
            resource_name, search_dirs, candidates))

  if not candidates:
    if use_local:
      raise FinalizeBundleException(
          '%s source is specified as %r but no one found under %r' % (
              resource_name.capitalize(), LOCAL, dirs_to_search))
    if not self.download:
      raise FinalizeBundleException(
          'Need %s but no files found under %r' % (
              resource_name.capitalize(), dirs_to_search))
    # Will be downloaded later.
    return None

  # Exactly one candidate.
  local_path = candidates[0]
  logging.info(
      'A local copy of %s is found at %r', resource_name, local_path)
  if not (use_local or version_checker(local_path, resource_source)):
    raise FinalizeBundleException(
        'Requested %s version is %r but found a local one with different '
        'version at %r' % (resource_name, resource_source, local_path))
  return local_path
def LocateResources(self):
  """Locates test image, release image, and factory toolkit.

  Searches self.bundle_dir for each resource and stores the result in:
  - self.test_image_path
  - self.release_image_path
  - self.toolkit_path

  Each attribute is set to the path found, or to None when nothing was found
  (_LocateOneResource raises instead if the source is LOCAL or if
  self.download is False in that case).
  """
  # TODO(crbug.com/707155): see #c1. Unlike images, we don't handle non-local
  #     case even if a local toolkit is found. It's not possible to be certain
  #     that the version of the local toolkit matches the requested one
  #     because we should not only consider the toolkit but also other
  #     resources. We have to always download. So the version_checker should
  #     always return True.
  accept_any_version = lambda unused_path, unused_version: True

  lookups = (
      ('test_image_path', 'test image', self.test_image_source,
       TEST_IMAGE_SEARCH_DIRS, self._MatchImageVersion),
      ('release_image_path', 'release image', self.release_image_source,
       RELEASE_IMAGE_SEARCH_DIRS, self._MatchImageVersion),
      ('toolkit_path', 'factory toolkit', self.toolkit_source,
       TOOLKIT_SEARCH_DIRS, accept_any_version),
  )
  for attr_name, label, source, search_dirs, checker in lookups:
    setattr(self, attr_name,
            self._LocateOneResource(label, source, search_dirs, checker))
def _CheckGSUtilVersion(self):
  """Makes sure the installed gsutil is at least REQUIRED_GSUTIL_VERSION.

  Raises:
    FinalizeBundleException: if the installed gsutil is too old.
  """
  # Check for gsutil >= 3.32.
  reported = self.gsutil.GetVersion()
  # Remove 'pre...' string at the end, if any
  version = re.sub('pre.*', '', reported)
  installed = list(map(int, version.split('.')))
  if installed >= REQUIRED_GSUTIL_VERSION:
    return
  required = '.'.join(map(str, REQUIRED_GSUTIL_VERSION))
  raise FinalizeBundleException(
      'gsutil version >=%s is required; you seem to have %s.\n'
      'Please download and install gsutil ('
      'https://developers.google.com/storage/docs/gsutil_install), and '
      'make sure this is in your PATH before the system gsutil.' % (
          required, version))
def DownloadResources(self):
  """Downloads test image, release image, factory toolkit if needed.

  An image is needed when its source is not LOCAL and LocateResources found
  no local copy. The toolkit is needed whenever its source is not LOCAL.

  Raises:
    FinalizeBundleException: if gsutil is too old, or an image is missing
      after the download step.
  """
  fetch_test_image = (self.test_image_path is None and
                      self.test_image_source != LOCAL)
  fetch_release_image = (self.release_image_path is None and
                         self.release_image_source != LOCAL)
  # TODO(crbug.com/707155): see #c1. We have to always download the factory
  #                         toolkit unless the "toolkit" source in config
  #                         refers to only the toolkit version instead of
  #                         factory.zip.
  fetch_toolkit = self.toolkit_source != LOCAL

  if not (fetch_test_image or fetch_release_image or fetch_toolkit):
    return

  # Make sure gsutil is up to date; older versions are pretty broken.
  self._CheckGSUtilVersion()

  if not self.download:
    return

  # NOTE(review): the existence checks are assumed to sit beside (not inside)
  # the corresponding "need" branches -- confirm against the original layout.
  if fetch_test_image:
    test_image_dir = os.path.join(self.bundle_dir, TEST_IMAGE_SEARCH_DIRS[0])
    self.test_image_path = self._DownloadTestImage(
        self.test_image_source, test_image_dir)
  if not os.path.exists(self.test_image_path):
    raise FinalizeBundleException(
        'No test image at %s' % self.test_image_path)

  if fetch_release_image:
    release_image_dir = os.path.join(
        self.bundle_dir, RELEASE_IMAGE_SEARCH_DIRS[0])
    self.release_image_path = self._DownloadReleaseImage(
        self.release_image_source, release_image_dir)
  if not os.path.exists(self.release_image_path):
    raise FinalizeBundleException(
        'No release image at %s' % self.release_image_path)

  if fetch_toolkit:
    self.toolkit_path = self._DownloadFactoryToolkit(
        self.toolkit_source, self.bundle_dir)
def GetAndSetResourceVersions(self):
  """Gets and sets versions of test, release image, and factory toolkit.

  Sets self.test_image_version, self.release_image_version and
  self.toolkit_version. The toolkit version is parsed from the output of
  running the toolkit with --info.

  Raises:
    FinalizeBundleException: if the toolkit info output cannot be parsed.
  """
  self.test_image_version = self._GetImageVersion(self.test_image_path)
  logging.info('Test image version: %s', self.test_image_version)

  self.release_image_version = self._GetImageVersion(self.release_image_path)
  logging.info('Release image version: %s', self.release_image_version)

  output = Spawn([self.toolkit_path, '--info'], check_output=True).stdout_data
  # re.match anchors at the start of the output, so the "Identification"
  # line must come first; re.M only lets '$' stop at the end of that line.
  match = re.match(r'^Identification: .+ Factory Toolkit (.+)$', output, re.M)
  if not match:
    # Raise explicitly rather than assert: asserts are stripped under -O,
    # which would turn this into an AttributeError on None below.
    raise FinalizeBundleException(
        'Unable to parse toolkit info: %r' % output)
  self.toolkit_version = match.group(1)
  logging.info('Toolkit version: %s', self.toolkit_version)
def AddDefaultCompleteScript(self):
  """Adds default complete script if not set.

  The complete/ directory is created if missing. If it is empty, the sample
  script from setup/ is copied into it; if it already holds one entry,
  nothing is done.

  Raises:
    FinalizeBundleException: if complete/ holds more than one entry.
  """
  complete_dir = os.path.join(self.bundle_dir, 'complete')
  file_utils.TryMakeDirs(complete_dir)

  existing_scripts = os.listdir(complete_dir)
  if len(existing_scripts) > 1:
    raise FinalizeBundleException(
        'Not having exactly one file under %s.' % complete_dir)
  if existing_scripts:
    # Complete script already provided.
    return

  sample_script = os.path.join(
      self.bundle_dir, 'setup', 'complete_script_sample.sh')
  shutil.copy(sample_script, complete_dir)
def AddFirmwareUpdaterAndImages(self):
  """Adds the firmware updater to the bundle and extracts firmware images.

  The updater ends up in firmware/ (copied from the release image unless the
  manifest says 'firmware: local'); the images it contains are extracted
  into firmware_images/.

  Raises:
    FinalizeBundleException: if the 'firmware' manifest value is invalid, or
      firmware/ does not hold exactly one entry.
  """
  firmware_src = self.manifest.get('firmware', 'release_image')
  firmware_dir = os.path.join(self.bundle_dir, 'firmware')
  file_utils.TryMakeDirs(firmware_dir)

  if firmware_src not in ('release_image', LOCAL):
    raise FinalizeBundleException(
        'firmware must be either "release_image" or "%s".' % LOCAL)
  if firmware_src == 'release_image':
    with MountPartition(self.release_image_path, 3) as rootfs:
      shutil.copy(os.path.join(rootfs, FIRMWARE_UPDATER_PATH), firmware_dir)

  updaters = os.listdir(firmware_dir)
  if len(updaters) != 1:
    raise FinalizeBundleException(
        'Not having exactly one file under %s.' % firmware_dir)
  updater_path = os.path.join(firmware_dir, updaters[0])

  firmware_images_dir = os.path.join(self.bundle_dir, 'firmware_images')
  file_utils.TryMakeDirs(firmware_images_dir)

  # TODO(phoenixshen): remove this when we don't need to support old
  # firmware updater.
  with file_utils.TempDirectory() as extract_dir:
    Spawn(['sh', updater_path, '--sb_extract', extract_dir],
          log=True, check_call=True)
    # Keep every *.bin found anywhere in the extracted tree.
    for dirpath, unused_dirs, filenames in os.walk(extract_dir):
      for bin_name in (n for n in filenames if n.endswith('.bin')):
        shutil.copy(os.path.join(dirpath, bin_name), firmware_images_dir)

  # Try to use "chromeos-firmwareupdate --mode=output" to extract bios/ec
  # firmware. This option is available for updaters extracted from image
  # version >= 9962.0.0.
  output_mode_cmd = ['sudo', 'sh', updater_path,
                     '--mode', 'output', '--model', self.project,
                     '--output_dir', firmware_images_dir]
  Spawn(output_mode_cmd, log=True, call=True)
def PrepareNetboot(self):
  """Prepares netboot resource for TFTP setup.

  Moves factory_shim/netboot/ to netboot/ (if only the former exists),
  patches the netboot firmware image with the boot/args file names and the
  server URL, and lays out a TFTP tree plus sample dnsmasq and kernel
  command line files under netboot/. Does nothing but log when the bundle
  has no netboot directory.
  """
  # TODO(hungte) Change factory_shim/netboot/ to be netboot/ in factory.zip.
  orig_netboot_dir = os.path.join(self.bundle_dir, 'factory_shim', 'netboot')
  netboot_dir = os.path.join(self.bundle_dir, 'netboot')
  if os.path.exists(orig_netboot_dir) and not os.path.exists(netboot_dir):
    shutil.move(orig_netboot_dir, netboot_dir)
  if not os.path.exists(netboot_dir):
    logging.info('No netboot resources.')
    return
  # Try same convention that sys-boot/chromeos-bootimage is doing:
  # bootfile=${PORTAGE_USERNAME}/${BOARD_USE}/vmlinuz
  # argfile=${PORTAGE_USERNAME}/${BOARD_USE}/cmdline
  files_dir = os.path.join('chrome-bot', self.board)
  target_bootfile = os.path.join(files_dir, 'vmlinuz')
  target_argsfile = os.path.join(files_dir, 'cmdline')
  netboot_firmware_settings = os.path.join(
      self.bundle_dir, 'setup', 'netboot_firmware_settings.py')
  server_url = self.manifest.get('server_url')
  # The TFTP server is taken to be the host named in the server URL.
  tftp_server_ip = (urlparse.urlparse(server_url).hostname if server_url else
                    '')
  netboot_firmware_image = os.path.join(netboot_dir, 'image.net.bin')
  if os.path.exists(netboot_firmware_image):
    # Write to a temporary name first and move it over the original only
    # after the settings tool has succeeded (check_call=True).
    new_netboot_firmware_image = netboot_firmware_image + '.INPROGRESS'
    args = ['--argsfile', target_argsfile,
            '--bootfile', target_bootfile,
            '--input', netboot_firmware_image,
            '--output', new_netboot_firmware_image]
    if server_url:
      args += ['--factory-server-url=%s' % server_url,
               '--tftpserverip=%s' % tftp_server_ip]
    Spawn([netboot_firmware_settings] + args, check_call=True, log=True)
    shutil.move(new_netboot_firmware_image, netboot_firmware_image)
  tftp_root = os.path.join(self.bundle_dir, 'netboot', 'tftp')
  tftp_board_dir = os.path.join(tftp_root, files_dir)
  file_utils.TryMakeDirs(tftp_board_dir)
  # omaha_conf is fetched by factory_installer explicitly.
  if server_url:
    # TODO(hungte) Rename omahaserver_* to factory_server_*.conf.
    omaha_conf = os.path.join(tftp_root, 'omahaserver_%s.conf' % self.board)
    file_utils.WriteFile(omaha_conf, server_url)
  # The sample dnsmasq config goes next to the tftp/ directory.
  file_utils.WriteFile(os.path.join(tftp_root, '..', 'dnsmasq.conf'),
                       textwrap.dedent('''\
# This is a sample config file to be invoked by `dnsmasq -d -C FILE`.
interface=eth2
tftp-root=/var/tftp
enable-tftp
dhcp-leasefile=/tmp/dnsmasq.leases
dhcp-range=192.168.200.50,192.168.200.150,12h
port=0
'''))
  bootfile_path = os.path.join(netboot_dir, 'vmlinuz')
  if os.path.exists(bootfile_path):
    shutil.move(bootfile_path, os.path.join(tftp_board_dir, 'vmlinuz'))
  # NOTE(review): assumed that cmdline.sample is written whether or not a
  # vmlinuz was found -- confirm against the original layout.
  tftpserverip_config = (
      ('tftpserverip=%s' % tftp_server_ip) if tftp_server_ip else '')
  file_utils.WriteFile(
      os.path.join(tftp_board_dir, 'cmdline.sample'),
      'lsm.module_locking=0 cros_netboot_ramfs cros_factory_install '
      'cros_secure cros_netboot earlyprintk cros_debug loglevel=7 '
      '%s console=ttyS2,115200n8' % tftpserverip_config)
def UpdateInstallShim(self):
  """Patches the factory server URL into the install shim, if configured.

  Does nothing when the manifest has no 'server_url'. Otherwise patches
  lsb-factory inside the unsigned shim (factory_install_shim.bin) or the
  single signed shim (chromeos_*_factory*.bin) under factory_shim/, and
  records the shim's version in self.install_shim_version.

  Raises:
    FinalizeBundleException: if the server URL cannot be set, if both an
      unsigned and a signed shim exist, or if there are several signed
      shims.
  """
  server_url = self.manifest.get('server_url')

  if not server_url:
    return

  def PatchLSBFactory(mount):
    """Patches lsb-factory in an image.

    Args:
      mount: Mount point of the shim's partition 1.

    Returns:
      True if there were any changes.
    """
    lsb_factory_path = os.path.join(
        mount, 'dev_image', 'etc', 'lsb-factory')
    logging.info('Patching lsb-factory in %s', lsb_factory_path)
    # Close the file deterministically instead of leaking the handle.
    with open(lsb_factory_path) as f:
      orig_lsb_factory = f.read()

    # Use a function as the replacement so that server_url is inserted
    # verbatim. With a template string (r'\1' + server_url) a URL starting
    # with a digit would be parsed as part of the group number, and any
    # backslash in it would be treated as an escape.
    lsb_factory, number_of_subs = re.subn(
        r'(?m)^(CHROMEOS_(AU|DEV)SERVER=).+$',
        lambda match: match.group(1) + server_url,
        orig_lsb_factory)
    # Both the AU and the DEV server entries must have been rewritten.
    if number_of_subs != 2:
      raise FinalizeBundleException(
          'Unable to set factory server URL in %s' % lsb_factory_path)

    if lsb_factory == orig_lsb_factory:
      return False  # No changes
    file_utils.WriteWithSudo(lsb_factory_path, lsb_factory)
    return True

  def PatchInstallShim(shim):
    """Patches lsb-factory and updates self.install_shim_version."""

    def GetSigningKey(shim):
      """Derives signing key from factory install shim's file name."""
      if shim.endswith('factory_install_shim.bin'):
        return 'unsigned'
      key_match = re.search(r'channel_([\w\-]+)\.bin$', shim)
      if key_match:
        return key_match.group(1)
      else:
        # Error deriving signing key
        return 'undefined'

    with MountPartition(shim, 1, rw=True) as mount:
      PatchLSBFactory(mount)

    with MountPartition(shim, 3) as mount:
      self.install_shim_version = '%s (%s)' % (_GetReleaseVersion(mount),
                                               GetSigningKey(shim))

  # Patch in the install shim, if present.
  has_install_shim = False
  unsigned_shim = os.path.join(self.bundle_dir, 'factory_shim',
                               'factory_install_shim.bin')
  if os.path.isfile(unsigned_shim):
    PatchInstallShim(unsigned_shim)
    has_install_shim = True

  signed_shims = glob.glob(os.path.join(self.bundle_dir, 'factory_shim',
                                        'chromeos_*_factory*.bin'))
  if has_install_shim and signed_shims:
    raise FinalizeBundleException(
        'Both unsigned and signed install shim exists. '
        'Please remove unsigned one')
  if len(signed_shims) > 1:
    raise FinalizeBundleException(
        'Expected to find 1 signed factory shim but found %d: %r' % (
            len(signed_shims), signed_shims))
  elif len(signed_shims) == 1:
    PatchInstallShim(signed_shims[0])
    has_install_shim = True

  if not has_install_shim:
    logging.warning('There is no install shim in the bundle.')
def FixFactoryPar(self):
  """Fix symlinks to factory.par, and replace factory.par if necessary.

  Every regular file in the bundle whose content equals
  shopfloor/factory.par is replaced by a relative symlink to it. (Certain
  files may have been turned into real files by the buildbots.) Afterwards,
  if self.new_factory_par is set, it is copied over shopfloor/factory.par.
  """
  par_path = os.path.join(self.bundle_dir, 'shopfloor', 'factory.par')
  par_data = file_utils.ReadFile(par_path)
  par_size = len(par_data)

  # Look for files that are identical copies of factory.par.
  for dirpath, _, filenames in os.walk(self.bundle_dir):
    for filename in filenames:
      candidate = os.path.join(dirpath, filename)
      if candidate == par_path:
        # Don't replace it with itself!
        continue
      if os.path.islink(candidate):
        # It's not a real file. Skip.
        continue
      if os.path.getsize(candidate) != par_size:
        # Not the right size. Skip.
        continue
      with open(candidate) as fobj:
        if fobj.read() != par_data:
          # Data isn't the same. Skip.
          continue

      # Replace the file with a symlink.
      logging.info('Replacing %s with a symlink', candidate)
      os.unlink(candidate)
      link_target = os.path.relpath(par_path, os.path.dirname(candidate))
      os.symlink(link_target, candidate)

  if self.new_factory_par:
    logging.info('Copying %s to %s', self.new_factory_par, par_path)
    shutil.copy2(self.new_factory_par, par_path)
def RemoveUnnecessaryFiles(self):
  """Removes backup files ('~' suffix), pyc files, and empty directories."""
  logging.info('Removing unnecessary files')
  for dirpath, dirnames, filenames in os.walk(self.bundle_dir):
    # Remove backup files and compiled Python files.
    for filename in filenames:
      if not filename.endswith(('~', '.pyc')):
        continue
      file_path = os.path.join(dirpath, filename)
      os.unlink(file_path)
      logging.info('Removed file %r', file_path)

    # Remove any empty directories
    for dirname in dirnames:
      dir_path = os.path.join(dirpath, dirname)
      try:
        os.rmdir(dir_path)
        logging.info('Removed empty directory %r', dir_path)
      except OSError as e:
        if e.errno == errno.ENOTEMPTY:
          # Directory still has content; keep it.
          continue
        raise
def UpdateReadme(self):
  """Rewrites the README with an up-to-date VITAL INFORMATION section.

  Parses the README into sections, checks that the required sections exist,
  warns if the CHANGES section does not mention this bundle, rebuilds the
  VITAL INFORMATION section from the bundle's resources, and writes all
  sections back to self.readme_path.

  Raises:
    FinalizeBundleException: if a required section is missing, or a firmware
      version cannot be read.
    KeyError: if the USER environment variable is not set.
  """
  REQUIRED_SECTIONS = ['VITAL INFORMATION', 'CHANGES']
  # Grok the README file; we'll be modifying it.
  try:
    readme_sections = re.findall(
        # Section header
        r'\*\*\*\n\*\n\* (.+?)\n\*\n\*\*\*\n'
        # Anything up to (but not including) the next section header
        r'((?:(?!\*\*\*).)+)',
        file_utils.ReadFile(self.readme_path), re.DOTALL)
    # This results in a list of tuples (x, y), where x is the name of the
    # section, and y is the contents of the section. Turn each tuple into a
    # list; we'll be modifying some of them.
    readme_sections = [list(s) for s in readme_sections]
  except Exception:
    # Any failure to read/parse the README falls back to a skeleton that
    # contains only the required (empty) sections.
    readme_sections = [[x, '\n'] for x in REQUIRED_SECTIONS]
  readme_section_index = {} # Map of section name to index
  for i, s in enumerate(readme_sections):
    readme_section_index[s[0]] = i
  for x in REQUIRED_SECTIONS:
    if x not in readme_section_index:
      raise FinalizeBundleException('README is missing %s section' % x)
  # Make sure that the CHANGES section contains this version.
  expected_str = '%s changes:' % self.bundle_name
  if expected_str not in readme_sections[readme_section_index['CHANGES']][1]:
    logging.warning('The string %r was not found in the CHANGES section. '
                    'Please add a section for it (if this is the first '
                    'version, just say "initial release").', expected_str)
  def _ExtractFirmwareVersions(updater_file, updater_name):
    # Returns (label, version) pairs, one per firmware type that has a
    # version; labels are prefixed with updater_name.
    firmware_versions = _GetFirmwareVersions(updater_file, self.has_firmware)
    return [('%s %s' % (updater_name, firmware_type), version)
            for firmware_type, version in firmware_versions.iteritems()
            if version is not None]
  # Get some vital information
  vitals = [
      ('Board', self.board),
      ('Project', self.project),
      ('Bundle', '%s (created by %s, %s)' % (
          self.bundle_name, os.environ['USER'],
          time.strftime('%a %Y-%m-%d %H:%M:%S %z')))]
  if self.toolkit_version:
    vitals.append(('Factory toolkit', self.toolkit_version))
  if self.test_image_source == LOCAL:
    with MountPartition(self.test_image_path, 3) as f:
      vitals.append(('Test image', '%s (local)' % _GetReleaseVersion(f)))
  elif self.test_image_source:
    vitals.append(('Test image', self.test_image_source))
  # NOTE(review): assumed that the stateful partition statistics are collected
  # regardless of the test image source -- confirm against the original
  # layout.
  with MountPartition(self.test_image_path, 1) as f:
    stat = os.statvfs(f)
    stateful_free_bytes = stat.f_bfree * stat.f_bsize
    stateful_total_bytes = stat.f_blocks * stat.f_bsize
    vitals.append((
        'Stateful partition size',
        '%d MiB (%d MiB free = %d%% free)' % (
            stateful_total_bytes / 1024 / 1024,
            stateful_free_bytes / 1024 / 1024,
            int(stateful_free_bytes * 100.0 / stateful_total_bytes))))
    vitals.append((
        'Stateful partition inodes',
        '%d nodes (%d free)' % (stat.f_files, stat.f_ffree)))
  if self.install_shim_version:
    vitals.append(('Factory install shim', self.install_shim_version))
  with MountPartition(self.release_image_path, 3) as f:
    vitals.append(('Release (FSI)', _GetReleaseVersion(f)))
    vitals.extend(_ExtractFirmwareVersions(
        os.path.join(f, FIRMWARE_UPDATER_PATH), 'Release (FSI)'))
  # If we have any firmware in the tree, add them to the vitals.
  for root, unused_dirs, files in os.walk(self.bundle_dir):
    for f in files:
      path = os.path.join(root, f)
      relpath = os.path.relpath(path, self.bundle_dir)
      if f == FIRMWARE_UPDATER_NAME:
        vitals.extend(_ExtractFirmwareVersions(path, relpath))
      elif f in ['ec.bin', 'bios.bin', 'image.bin', 'image.net.bin']:
        version = get_version.GetFirmwareBinaryVersion(path)
        if not version:
          raise FinalizeBundleException(
              'Unable to find firmware version in %s' % path)
        vitals.append((relpath, version))
  # Render "key: value" lines with the values aligned in one column.
  vital_lines = []
  max_key_length = max(len(k) for k, v in vitals)
  for k, v in vitals:
    vital_lines.append('%s:%s %s' % (k, ' ' * (max_key_length - len(k)), v))
  vital_contents = '\n'.join(vital_lines)
  readme_sections[readme_section_index['VITAL INFORMATION']][1] = (
      vital_contents + '\n\n')
  # Write every section back using the same header layout the parser above
  # expects.
  with open(self.readme_path, 'w') as f:
    for section in readme_sections:
      f.write('***\n'
              '*\n'
              '* %s\n'
              '*\n'
              '***\n'
              '%s' % tuple(section))
  logging.info('\n\nUpdated %s; vital information:\n%s\n',
               self.readme_path, vital_contents)
def Archive(self):
  """Creates <bundle_dir>.tar.bz2 (if enabled) and logs closing reminders."""
  if self.archive:
    # Done! tar it up, and encourage the poor shmuck who has to build
    # the bundle to take a little break.
    # One suggestion per hour of the day (24 entries), indexed by tm_hour.
    suggestions_by_hour = (['some rest'] * 5 +
                           ['a cup of coffee'] * 7 +
                           ['some lunch', 'some fruit'] +
                           ['an afternoon snack'] * 2 +
                           ['a beer'] * 8)
    logging.info('Just works! Creating the tarball. '
                 'This will take a while... meanwhile, go get %s. '
                 'You deserve it.',
                 suggestions_by_hour[time.localtime().tm_hour])

    tarball_path = self.bundle_dir + '.tar.bz2'
    tar_cmd = ['tar', '-cf', tarball_path,
               '-I', file_utils.GetCompressor('bz2'),
               '-C', self.bundle_dir, '.']
    Spawn(tar_cmd, log=True, check_call=True)

    size_in_gib = os.path.getsize(tarball_path) / (1024. * 1024. * 1024.)
    logging.info('Created %s (%.1f GiB).', tarball_path, size_in_gib)

  logging.info('The README file (%s) has been updated. Make sure to check '
               'that it is correct!', self.readme_path)

  board_files = (sys_utils.InChroot() and
                 self.build_board.factory_board_files)
  if board_files:
    checkin_location = os.path.join(
        self.build_board.factory_board_files, 'bundle')
  else:
    checkin_location = 'factory-board'
  logging.info(
      "IMPORTANT: If you modified the README or MANIFEST.yaml, don't forget "
      'to check your changes into %s.',
      checkin_location)
@contextlib.contextmanager
def _DownloadResource(self, possible_urls, resource_name=None, version=None):
  """Downloads a resource file from given URLs.

  This context manager downloads a resource from a list of possible URLs
  (only the first one found will be downloaded). The downloaded file is
  removed when the context exits.

  Args:
    possible_urls: a single or a list of possible GS URLs to search.
    resource_name: a human readable name of the resource, just for logging,
      won't affect the behavior of downloading.
    version: version of the resource, just for logging.

  Yields:
    A tuple (downloaded_path, found_url).

  Raises:
    FinalizeBundleException: if no file is found at any of the URLs.
  """
  resource_name = resource_name or 'resource'

  if not isinstance(possible_urls, list):
    possible_urls = [possible_urls]

  found_url = None
  # ls to see if a given URL exists.
  for url in possible_urls:
    try:
      logging.info('Looking for %s at %s', resource_name, url)
      output = self.gsutil.LS(url)
    except gsutil.NoSuchKey:  # Not found; try next
      continue

    # The message must be %-formatted here; passing a tuple would print the
    # raw tuple instead of the intended text.
    assert len(output) == 1, (
        'Expected %r to match 1 file, but it matched %r' % (url, output))

    # Found. Download it!
    found_url = output[0].strip()
    break

  if found_url is None:
    raise FinalizeBundleException(
        'No %s found for version %s' % (resource_name, version))
  logging.info('Starting to download %s...', found_url)
  downloaded_path = self.gsutil.GSDownload(found_url)

  try:
    yield (downloaded_path, found_url)
  finally:
    file_utils.TryUnlink(downloaded_path)
def _DownloadAndExtractImage(self, image_name, version, possible_urls,
                             target_dir):
  """Downloads an image and places it (extracted if needed) in target_dir.

  Args:
    image_name: human readable name of the image, for logging.
    version: version of the image, for logging.
    possible_urls: GS URLs to try, in order.
    target_dir: directory to put the image in; created if missing.

  Returns:
    Path to the image inside target_dir.

  Raises:
    ValueError: if the file found is neither a .bin nor a .tar.xz.
  """
  with self._DownloadResource(
      possible_urls, image_name, version) as (tmp_path, url):
    try:
      file_utils.TryMakeDirs(target_dir)
      remote_name = os.path.basename(url)

      if remote_name.endswith('.bin'):  # just move
        dst_path = os.path.join(target_dir, remote_name)
        logging.info('Moving %r to %r', tmp_path, dst_path)
        shutil.move(tmp_path, dst_path)
        return dst_path

      if remote_name.endswith('.tar.xz'):
        logging.info('Extracting %s image into %s...', image_name, target_dir)
        file_utils.ExtractFile(tmp_path, target_dir)
        entries = os.listdir(target_dir)
        assert len(entries) == 1, (
            'Expect only one file in %r but found multiple' % target_dir)
        extracted_path = os.path.join(target_dir, entries[0])
        # Replace '.tar.xz' with the extracted ext name ('.bin' normally).
        extracted_ext = os.path.splitext(extracted_path)[1]
        stem = remote_name[:-len('.tar.xz')]
        dst_path = os.path.join(target_dir, stem + extracted_ext)
        shutil.move(extracted_path, dst_path)
        return dst_path

      raise ValueError(
          "Don't know how to handle file extension of %r" % tmp_path)
    finally:
      file_utils.TryUnlink(tmp_path)
def _DownloadTestImage(self, requested_version, target_dir):
  """Downloads the test image of requested_version into target_dir.

  Channels are tried in RESOURCE_CHANNELS order.

  Returns:
    Path to the test image.
  """
  candidate_urls = [
      '%s/%s' % (
          FinalizeBundle._ResourceBaseURL(
              channel, self.build_board.gsutil_name, requested_version),
          '*test*.tar.xz')
      for channel in RESOURCE_CHANNELS]
  return self._DownloadAndExtractImage('test image', requested_version,
                                       candidate_urls, target_dir)
def _DownloadReleaseImage(self, requested_version, target_dir):
  """Downloads the release image of requested_version into target_dir.

  Returns:
    Path to the release image.
  """
  # Signed recovery image ends with .bin and takes higher priority, so .bin
  # must be searched first. Unsigned recovery image ends with .tar.xz.
  # Within one extension, channels are tried in RESOURCE_CHANNELS order.
  candidate_urls = [
      '%s/%s%s' % (
          FinalizeBundle._ResourceBaseURL(
              channel, self.build_board.gsutil_name, requested_version),
          '*recovery*', ext)
      for ext in ['.bin', '.tar.xz']
      for channel in RESOURCE_CHANNELS]
  return self._DownloadAndExtractImage('release image', requested_version,
                                       candidate_urls, target_dir)
def _DownloadFactoryToolkit(self, requested_version, target_dir):
  """Downloads the factory zip and extracts it into target_dir.

  Channels are tried in RESOURCE_CHANNELS order.

  Returns:
    Path to the factory toolkit, located under TOOLKIT_SEARCH_DIRS of
    self.bundle_dir after extraction.
  """
  candidate_urls = [
      '%s/%s' % (
          FinalizeBundle._ResourceBaseURL(
              channel, self.build_board.gsutil_name, requested_version),
          '*factory*.zip')
      for channel in RESOURCE_CHANNELS]

  with self._DownloadResource(
      candidate_urls, 'factory toolkit', requested_version
  ) as (zip_path, unused_url):
    file_utils.ExtractFile(zip_path, target_dir)

  # The toolkit is now a local resource; any version is accepted.
  accept_any_version = lambda unused_path, unused_version: True
  return self._LocateOneResource(
      'factory toolkit', LOCAL, TOOLKIT_SEARCH_DIRS, accept_any_version)
@staticmethod
def _ResourceBaseURL(channel, board, version):
return (
'gs://chromeos-releases/%(channel)s-channel/%(board)s/%(version)s' %
dict(channel=channel, board=board, version=version))
@staticmethod
def _ListAllFilesIn(search_dirs):
"""Returns all files under search_dirs.
Args:
search_dirs: a list of directories to search.
"""
found_entries = []
for d in search_dirs:
if os.path.isdir(d):
for f in os.listdir(d):
p = os.path.join(d, f)
if os.path.isfile(p):
found_entries.append(p)
return found_entries
@staticmethod
def FromArgs():
  """Builds a FinalizeBundle from the command line.

  The MANIFEST argument may be the manifest file itself or a directory that
  contains MANIFEST.yaml. When DIR is not given, the directory of the
  manifest is used as the working directory.

  Returns:
    A FinalizeBundle instance.

  Raises:
    Exception: whatever reading or parsing the manifest raised (logged
      first, then re-raised).
  """
  args = FinalizeBundle._ParseArgs()
  if os.path.isdir(args.manifest):
    manifest_path = os.path.join(args.manifest, 'MANIFEST.yaml')
  else:
    manifest_path = args.manifest
  try:
    # NOTE: this used to be a bare yaml.load(), which can construct arbitrary
    # Python objects from tagged input and requires an explicit Loader in
    # recent PyYAML releases. The manifest only needs plain scalars, lists
    # and mappings, so safe_load is sufficient.
    manifest = yaml.safe_load(file_utils.ReadFile(manifest_path))
  except Exception:
    logging.exception('Failed to load manifest: %s', manifest_path)
    logging.error('Please refer to setup/BUNDLE.md (https://goo.gl/pM1pxo)')
    raise
  work_dir = args.dir or os.path.dirname(os.path.realpath(manifest_path))
  return FinalizeBundle(manifest, work_dir, args.download, args.archive)
@staticmethod
def _ParseArgs():
  """Parses the command line.

  Returns:
    The argparse namespace with attributes download, archive, manifest and
    dir.
  """
  parser = argparse.ArgumentParser(
      description=USAGE,
      formatter_class=argparse.RawDescriptionHelpFormatter)

  # Both switches are on by default and turned off by their --no-* flag.
  negative_switches = (
      ('--no-download', 'download',
       "Don't download files from Google Storage (for testing only)"),
      ('--no-archive', 'archive',
       "Don't make a tarball (for testing only)"),
  )
  for flag, dest, help_text in negative_switches:
    parser.add_argument(flag, dest=dest, action='store_false', help=help_text)

  manifest_help = ('Path to the manifest file or the directory '
                   'containing MANIFEST.yaml')
  parser.add_argument('manifest', metavar='MANIFEST', help=manifest_help)
  parser.add_argument('dir', metavar='DIR', nargs='?', default=None,
                      help='Working directory')
  return parser.parse_args()
# Script entry point: finalize the bundle described on the command line.
if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO)
  try:
    FinalizeBundle.FromArgs().Main()
  except Exception:
    # Log the traceback (with an empty message) and report failure through
    # the exit status.
    logging.exception('')
    sys.exit(1)