blob: f36f0940a429e6d3878ae2e5c7e3335f6886153f [file] [log] [blame]
#!/usr/bin/python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module contains methods interfacing with pre-existing tools."""
import cb_constants
import logging
import re
import os
import shutil
from cb_archive_hashing_lib import CheckMd5, ZipExtract
from cb_name_lib import ResolveRecoveryUrl, RunWithNamingRetries
from cb_url_lib import DetermineUrl, Download
from cb_util import RunCommand
USER = os.environ['USER']
HOME_DIR = '/home/%s/trunk/src' % USER
IMG_SIGN_DIR = HOME_DIR + '/platform/vboot_reference/scripts/image_signing'
CHROOT_ROOT = '/home/%s/chromiumos/chroot' % USER
CHROOT_REL_DIR = 'tmp/bundle_tmp'
# Mapping of firmware internal name to regular expression patterns.
FIRMWARE_MAP = {
'x86-alex': {
'ec': {'name': cb_constants.EC_NAME['x86-alex'],
'pattern': 'EC image:.*(Alex.*)'},
'ec2': {'name': cb_constants.EC2_NAME['x86-alex'],
'pattern': 'Extra file:.*(Alex.*)'},
'bios': {'name': cb_constants.BIOS_NAME['x86-alex'],
'pattern': 'BIOS image:.*(Alex.*)'}
},
'stumpy': {
'bios': {'name': cb_constants.BIOS_NAME['stumpy'],
'pattern': 'BIOS image:.*(Stumpy.*)'}
},
}
def IsInsideChroot():
"""Returns True if we are inside chroot.
Method borrowed from <chromeos_root>/chromite/lib/cros_build_lib.py
Returns:
a boolean, True if we are inside chroot.
"""
return os.path.exists('/etc/debian_chroot')
def CheckEnvironment(image_name, firmware_dest, mount_point):
"""Checks requirements for the script to run successfully.
In particular:
- script is run from <ChromeOS_root>/src/scripts
- uudecode utility is available
- given SSD image name follows naming convention and is an existing file
- given firmware destination is an existing directory with write access
- mounting point is available for mounting
Args:
image_name: a string, absolute file path to SSD release image binary.
firmware_dest: a string, absolute path to directory firmware should go.
mount_point: a string, directory to mount SSD image. Defaults to
cb_constants._MOUNT_POINT
Returns:
a boolean, True when the conditions checked are all satisfied.
"""
# TODO(benwin) refactor so this check comes at the beginning of cros_bundle.py
res = True
if not re.search('/src/scripts$', os.getcwd()):
logging.error('\nPlease run this script from the src/scripts directory.\n')
res = False
cmd_result = RunCommand(['which', 'uudecode'], redirect_stdout=True)
output_string = cmd_result.output
if not output_string:
logging.error('\nMissing uudecode. Please run sudo apt-get install '
'sharutils\n')
res = False
if (not os.path.isfile(image_name) or
not re.search('.*ssd.*[.]bin$', image_name)) :
logging.error('\nBad SSD image name given : %s\n', image_name)
res = False
if not os.path.isdir(firmware_dest):
logging.error('\nFirmware destination directory %s does not exist!\n',
firmware_dest)
res = False
if not os.access(firmware_dest, os.W_OK):
logging.error('\nFirmware destination directory %s not writable.\n',
firmware_dest)
res = False
if mount_point:
if os.path.isdir(mount_point) and os.listdir(mount_point):
logging.error('\nMount point %s is not emtpy!\n', mount_point)
res = False
else:
logging.error('\nNo mount point specified!\n')
res = False
return res
def UploadToGsd(filename):
"""Uploads a file or directory to Google Storage for Developers
Assuming proper keys for gsutil are set up for current user.
Args:
filename: absolute path name of file or directory to upload
Raises:
BundlingError when file specified by filename does not exist
"""
if not (filename and os.path.exists(filename)):
raise cb_constants.BundlingError('File %s does not exist.' % filename)
RunCommand(['gsutil', 'cp', filename, cb_constants.GSD_BUCKET])
def _ExtractFirmwareFilename(fw_type, board, fw_content):
"""Parses chromeos-firmwareupdate output for proper firmware file name.
Background:
- factory ssd image comes with a binary executable 'chromeos-firmwareupdate'
- we use this executable to extract EC firmware for Alex (i.e. 'ec.bin')
- for Alex factory bundle, use an alternative name for this EC firmware
Sample output of chromeos-firmwareupdate command (truncated):
EC image: 4d02c93315c880efdfc50ef12b281c9e \
*/build/x86-alex_he/tmp/portage/chromeos-base/<SNIP>/Alex_EC_XHA002M.bin
<--snip-->
Package Content:
4d02c93315c880efdfc50ef12b281c9e *./ec.bin
In this example, we want to rename 'ec.bin' as 'Alex_EC_XHA002M.bin' in the
output bundle (both lines contain the same hash value).
Args:
fw_type: a string, type of firmware. Valid values are keys in FIRMWARE_MAP.
board: a string, target board.
fw_content: a list of strings, output of 'chromeos-firmwareupdate -V'.
Returns:
rename: a string, desired firmware filename. Or None if no match found.
"""
rename = FIRMWARE_MAP[board][fw_type]['name']
fw_pat = re.compile(FIRMWARE_MAP[board][fw_type]['pattern'])
fw_searches = [fw_pat.search(line) for line in fw_content]
fw_matches = [match.group(1) for match in fw_searches if match]
if fw_matches:
#TODO(tgao): ask factory team if this should be an error condition
if len(fw_matches) > 1:
logging.warning('Multiple matches of firmware names: fw_type = %r, '
'board = %r', fw_type, board)
if rename != fw_matches[0]:
return fw_matches[0]
#TODO(tgao): ask factory team if this should be an error condition
logging.warning('Proper renaming of firmware %s failed.', rename)
return rename
def ListFirmware(image_name, cros_fw, board):
"""Gets list of strings representing contents of firmware.
As of 11/2011, only handles Alex and Stumpy firmwares.
Args:
image_name: a string, absolute file path to SSD release image binary.
cros_fw: a string, absolute path of firmware extraction script.
board: a string, target board.
Returns:
a dict, {fw_type: fw_name}.
Raises:
BundlingError when necessary files missing.
"""
if not os.path.exists(cros_fw):
err = 'File chromeos-firmwareupdate missing from %s.' % image_name
raise cb_constants.BundlingError(err)
cmd_result = RunCommand([cros_fw, '-V'], redirect_stdout=True)
output = cmd_result.output
if not output:
err = 'Failed to get output from script %s.' % cros_fw
raise cb_constants.BundlingError(err)
logging.debug('ListFirmware(): chromeos-firmwareupdate output = %s', output)
fw_content = output.split('\n')
pat = re.compile('[.]/(.*)')
# Look for mandatory firmware files in chromeos-firmwareupdate output.
# For example, if fw_content = """
# Package Content:
# 57350ea0958cb39a715ddd4ccf2f0e92 *./bios.bin"""
# searches = [None, None, <sre.SRE_Match object at 0x...>, ]
# fw_files = ['bios.bin']
searches = [pat.search(line) for line in fw_content]
fw_files = [match.group(1) for match in searches if match]
for f in [FIRMWARE_MAP[board][k]['name'] for k in FIRMWARE_MAP[board].keys()]:
if f not in fw_files:
raise cb_constants.BundlingError('Necessary file %s missing from %s.' %
(f, cros_fw))
fw_names = dict()
for fw_type in FIRMWARE_MAP[board].keys():
fw_names[fw_type] = _ExtractFirmwareFilename(fw_type, board, fw_content)
return fw_names
def ExtractFiles(cros_fw):
"""Extract necessary firmware files from an SSD image.
Args:
cros_fw: absolute path of firmware extraction script
Returns:
a string, directory of extracted files, None on failure
"""
if not os.path.exists(cros_fw):
logging.error('Necessary firmware extraction script %s missing.', cros_fw)
return None
cmd_result = RunCommand([cros_fw, '--sb_extract'], redirect_stdout=True)
output_string = cmd_result.output
# TODO(benwin) can this regex be future-proofed?
dirsearch = re.search('/tmp/tmp[.].*', output_string)
if dirsearch:
firmdir = dirsearch.group()
if firmdir and os.path.exists(firmdir):
return firmdir
logging.warning('Failed to extract necessary firmware directory.')
return None
def ExtractFirmware(image_name, firmware_dest, mount_point, board):
"""Extract firmware from an SSD image to help prepare a factory bundle.
See docstring of CheckEnvironment() for environmental prerequisites.
Args:
image_name: a string, absolute file path to SSD release image binary.
firmware_dest: a string, absolute path to directory firmware should go.
mount_point: a string, directory to mount SSD image.
board: a string, target board.
Raises:
BundlingError when necessary tools are missing or SSD mounting fails.
"""
if not CheckEnvironment(image_name, firmware_dest, mount_point):
raise cb_constants.BundlingError(
'Environment check failed, please fix conditions listed above.')
image = os.path.basename(image_name)
try:
logging.info('Mounting SSD image.')
cmd_result = RunCommand([
'./mount_gpt_image.sh', '--read_only', '--safe',
'='.join(['--from', cb_constants.WORKDIR]),
'='.join(['--image', image]),
'='.join(['--rootfs_mountpt', mount_point])
])
if not os.path.exists(mount_point) or not os.listdir(mount_point):
err = ('Failed to mount SSD image at %s: cmd_result = %r' %
(mount_point, cmd_result))
raise cb_constants.BundlingError(err)
cros_fw = os.path.join(mount_point, 'usr', 'sbin',
'chromeos-firmwareupdate')
fw_name = ListFirmware(image_name, cros_fw, board)
firmdir = ExtractFiles(cros_fw)
if not firmdir:
raise cb_constants.BundlingError('Failed to extract firmware files.')
for k, v in FIRMWARE_MAP[board].iteritems():
src_path = os.path.join(firmdir, v['name'])
if not os.path.exists(src_path):
logging.debug('shutil: skip non-existing file %s', src_path)
continue
dst_path = os.path.join(firmware_dest, fw_name[k])
shutil.copy(src_path, dst_path)
# Per yongjaek in 11/2011, also copy chromeos-firmwareupdate shellball
shutil.copy(cros_fw, firmware_dest)
finally:
RunCommand(['./mount_gpt_image.sh', '--unmount'])
filename = os.path.join(cb_constants.WORKDIR, image_name)
md5filename = filename + '.md5'
if not CheckMd5(filename, md5filename):
raise cb_constants.BundlingError(
'SSD image MD5 check failed, image was corrupted!')
def HandleGitExists(force):
"""Detect if git directory already exists and handle overwrite confirmation.
Args:
force: a boolean, True when all existing bundle files can be deleted
Raises:
BundlingError when git directory exists and user does not confirm overwrite
"""
if os.path.exists(cb_constants.GITDIR):
if force:
shutil.rmtree(cb_constants.GITDIR)
os.mkdir(cb_constants.GITDIR)
else:
msg = ('Old recovery conversion script git repo exists, please '
'confirm overwrite')
if AskUserConfirmation(msg):
shutil.rmtree(cb_constants.GITDIR)
os.mkdir(cb_constants.GITDIR)
else:
raise cb_constants.BundlingError(
'Vboot git repo exists, use -f to update')
else:
os.mkdir(cb_constants.GITDIR)
def HandleSsdExists(ssd_name, force):
"""Detect if ssd image already exists and handle overwrite confirmation.
Args:
ssd_name: absolute path name of ssd image to check for
force: a boolean, True when all existing bundle files can be deleted
Raises:
BundlingError when ssd image exists and user does not confirm overwrite
"""
if os.path.exists(ssd_name):
if not force:
msg = 'SSD file %s already exists, please confirm overwrite' % ssd_name
if not AskUserConfirmation(msg):
raise cb_constants.BundlingError(
'File %s already exists, use -f to overwrite' % ssd_name)
def MoveCgpt(cgpt_file, dest_file):
"""Concentrate logic to move cgpt and assign permissions.
Args:
cgpt_file: absolute path to cgpt file
dest_file: absolute pathname of file destination
"""
RunCommand(['sudo', 'cp', cgpt_file, dest_file])
RunCommand(['sudo', 'chmod', '760', dest_file])
def InstallCgpt(index_page, force):
"""Install necessary cgpt utility on the sudo path.
Args:
index_page: html page to download au-generator containing correct cgpt
force: a boolean, True when all existing bundle files can be deleted
Raises:
BundlingError when resource fetch and extract fails or overwrite is denied
"""
au_gen_url = os.path.join(index_page, cb_constants.AU_GEN)
if not Download(au_gen_url):
raise cb_constants.BundlingError(
'Necessary resource %s could not be fetched.' % au_gen_url)
au_gen_name = os.path.join(cb_constants.WORKDIR, cb_constants.AU_GEN)
cgpt_name = os.path.join(cb_constants.WORKDIR, 'cgpt')
if not ZipExtract(au_gen_name, 'cgpt', path=cb_constants.WORKDIR):
raise cb_constants.BundlingError(
'Could not extract necesary resource %s from %s.' %
(cgpt_name, au_gen_name))
cgpt_dest = os.path.join(cb_constants.SUDO_DIR, 'cgpt')
if os.path.exists(cgpt_dest):
if force:
MoveCgpt(cgpt_name, cgpt_dest)
else:
msg = 'cgpt exists at %s, please confirm update' % cgpt_dest
if AskUserConfirmation(msg):
MoveCgpt(cgpt_name, cgpt_dest)
else:
raise cb_constants.BundlingError(
'Necessary utility cgpt already exists at %s, use -f to overwrite '
'with newest version.' % cgpt_dest)
else:
MoveCgpt(cgpt_name, cgpt_dest)
def ConvertRecoveryToSsd(image_name, options):
"""Converts a recovery image into an SSD image.
Default ssd option requires chroot setup and script running in src/scripts.
Args:
image_name: absolute path name of recovery image to convert
options: an object containing inputs to the script
please see cros_bundle_lib/CheckBundleInputs for possibilities
Returns:
a string, the absolute path name of the extracted SSD image
"""
if options.full_ssd:
# TODO(benwin) convert recovery image to full ssd image inside chroot
return RecoveryToFullSsdNoChroot(image_name, options)
return RecoveryToStandardSsd(image_name, options)
def RecoveryToFullSsdNoChroot(image_name, options):
"""Converts a recovery image into an SSD image with stateful partition.
This method does not depend on a chroot setup.
Args:
image_name: absolute path name of recovery image to convert
options: an object containing inputs to the script
please see cros_bundle_lib/CheckBundleInputs for possibilities
Returns:
a string, the absolute path name of the extracted SSD image
Raises:
BundlingError when resources not found or conversion fails.
"""
force = options.force
board = options.board
recovery = options.recovery
ssd_name = image_name.replace('recovery', 'ssd')
HandleSsdExists(ssd_name, force)
# fetch convert_recovery_to_full_ssd.sh
HandleGitExists(force)
RunCommand(['git', 'clone', cb_constants.GITURL, cb_constants.GITDIR])
# fetch zip containing chromiumos_base_image
(rec_url, index_page) = RunWithNamingRetries(
None, ResolveRecoveryUrl, board, recovery)
if not index_page:
raise cb_constants.BundlingError(
'All naming schemes failed attempting to resolve recovery URL '
'for recovery version %s' % recovery)
if not rec_url:
raise cb_constants.BundlingError(
'Could not find URL match for recovery version %s on page %s' %
(recovery, index_page))
rec_no = recovery.split('/')[0]
token_list = ['chromeos', rec_no, board, '.zip']
zip_url = DetermineUrl(index_page, token_list)
if not zip_url:
raise cb_constants.BundlingError(
'Failed to determine name of zip file for token_list %s on page %s' %
(token_list, index_page))
if not Download(zip_url):
raise cb_constants.BundlingError('Failed to download %s.' % zip_url)
zip_name = os.path.join(cb_constants.WORKDIR, os.path.basename(zip_url))
InstallCgpt(index_page, force)
script_name = os.path.join(cb_constants.GITDIR,
'scripts',
'image_signing',
'convert_recovery_to_full_ssd.sh')
RunCommand([script_name, image_name, zip_name, ssd_name])
# TODO(benwin) consider cleaning up resources based on command line flag
return ssd_name
def RecoveryToStandardSsd(image_name, options):
"""Converts a recovery image into an SSD image.
Assumes a chroot setup.
Requires sudo privileges to run cros_sdk.
Requires the script to run in <ChromeOS_root>/src/scripts.
Args:
image_name: absolute path name of recovery image to convert
options: an object containing inputs to the script
please see cros_bundle_lib/CheckBundleInputs for possibilities
Returns:
a string, the absolute path name of the extracted SSD image
Raises:
BundlingError when resources not found or conversion fails.
"""
force = options.force
chromeos_root = options.chromeos_root
if not re.search('/src/scripts$', os.getcwd()):
raise cb_constants.BundlingError(
'ConvertRecoveryToSsd must be run from src/scripts.')
image_dir = os.path.dirname(image_name)
ssd_name = image_name.replace('recovery', 'ssd')
HandleSsdExists(ssd_name, force)
# make copy of recovery image to consume
if not options.chromeos_root:
chroot_work_dir = os.path.join(CHROOT_ROOT, CHROOT_REL_DIR)
else:
if not (chromeos_root and os.path.isdir(chromeos_root)):
raise cb_constants.BundlingError(
'Provided ChromeOS source tree root %s does not exist or '
'is not a directory' % chromeos_root)
chroot_work_dir = os.path.join(chromeos_root, 'chroot', CHROOT_REL_DIR)
# ensure we have a chroot to work in
chroot_work_parent_dir = re.match('(.*/).*', chroot_work_dir).group(1)
if not os.path.exists(chroot_work_parent_dir):
raise cb_constants.BundlingError(
'Chroot environment could not be inferred, failed to create link %s.' %
chroot_work_dir)
if not(chroot_work_dir and os.path.isdir(chroot_work_dir)):
os.mkdir(chroot_work_dir)
ssd_chroot_name = ssd_name.replace(image_dir, chroot_work_dir)
shutil.copy(image_name, ssd_chroot_name)
cmd = (['cros_sdk',
'--',
os.path.join(IMG_SIGN_DIR, 'convert_recovery_to_ssd.sh'),
ssd_name.replace(image_dir,
ReinterpretPathForChroot(chroot_work_dir))])
if options.force:
cmd.insert(5, '--force')
RunCommand(cmd)
# move ssd out, clean up folder
shutil.move(ssd_chroot_name, ssd_name)
shutil.rmtree(chroot_work_dir)
return ssd_name
def FindRepoDir(path=None):
"""Returns the nearest higher-level repo dir from the specified path.
Copied verbatim from <ChromeOS_root>/chromite/lib/cros_build_lib.py.
Args:
path: The path to use. Defaults to cwd.
Returns:
a string, the nearest higher-level repo dir from the specified path.
"""
if path is None:
path = os.getcwd()
path = os.path.abspath(path)
while path != '/':
repo_dir = os.path.join(path, '.repo')
if os.path.isdir(repo_dir):
return repo_dir
path = os.path.dirname(path)
return None
def ReinterpretPathForChroot(path):
"""Returns reinterpreted path from outside the chroot for use inside.
Modified insignificantly from <ChromeOS_root>/chromite/lib/cros_build_lib.py.
Args:
path: The path to reinterpret. Must be in src tree.
Returns:
a string, the reinterpreted path from outside the chroot for use inside.
Raises:
BundlingError when given a path not in src tree.
"""
root_path = os.path.join(FindRepoDir(path), '..')
path_abs_path = os.path.abspath(path)
root_abs_path = os.path.abspath(root_path)
# Strip the repository root from the path and strip first /.
relative_path = path_abs_path.replace(root_abs_path, '')[1:]
if relative_path == path_abs_path:
raise cb_constants.BundlingError('Error: '
'path is outside your src tree, cannot reinterpret.')
new_path = os.path.join('/home', os.getenv('USER'), 'trunk', relative_path)
return new_path
def AskUserConfirmation(msg):
"""Interactively obtain consent from user.
Args:
msg: a string describing the permission sought
Returns:
a boolean, True when the user gives assent
"""
logging.info(msg + ' (y/n): ')
ans = str(raw_input())
return ans.lower() == 'y'