blob: 622f67b6b14c570f436c94d5ef0be3528540700c [file] [log] [blame]
#!/usr/bin/python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module contains methods interfacing with pre-existing tools."""
import cb_constants
import cb_name_lib
import cb_url_lib
import cb_util_lib
import logging
import re
import os
import shutil
import subprocess
from cb_constants import BundlingError
class CommandResult(object):
  """Plain record describing the outcome of a child process.

  Borrowed from <chromeos_root>/chromite/lib/cros_build_lib.py

  Attributes (all initialised to None):
    cmd: the command associated with this result
    error: captured error output of the process
    output: captured standard output of the process
    returncode: exit status of the process
  """

  def __init__(self):
    # Every field starts unset; callers fill in what they have.
    self.cmd = self.error = self.output = self.returncode = None
def RunCommand(cmd, redirect_stdout=False, redirect_stderr=False, cwd=None):
  """Runs a command using subprocess module Popen.

  Blocks until command returns.
  Modeled on RunCommand from <chromeos_root>/chromite/lib/cros_build_lib.py

  Args:
    cmd: a list of arguments to Popen
    redirect_stdout: a boolean, True when subprocess output should be returned
    redirect_stderr: a boolean, True when subprocess errors should be returned
    cwd: working directory in which to run command
  Returns:
    a CommandResult object with cmd, output, error and returncode filled in.
    output/error are None unless the matching redirect flag was set.
  Raises:
    BundlingError when the command cannot be started or exits nonzero.
  """
  cmd_result = CommandResult()
  cmd_result.cmd = cmd
  # Only capture a stream when asked to; otherwise the child inherits ours.
  stdout = subprocess.PIPE if redirect_stdout else None
  stderr = subprocess.PIPE if redirect_stderr else None
  cmd_str = ' '.join(cmd)
  # log command run
  logging.info('Running command: %s', cmd_str)
  try:
    proc = subprocess.Popen(cmd, cwd=cwd, stdout=stdout, stderr=stderr)
  except OSError as e:
    # 'as e' (rather than tuple unpacking) is valid on Python 2.6+ and 3.x.
    # %s is used for errno because it may be None for some OSError instances.
    raise BundlingError('\n'.join(['OSError [%s] : %s' % (e.errno, e.strerror),
                                   'OSError running cmd %s' % cmd_str]))
  (cmd_result.output, cmd_result.error) = proc.communicate()
  cmd_result.returncode = proc.returncode
  if proc.returncode:
    raise BundlingError('Nonzero return code running command %s.' % cmd_str)
  return cmd_result
def IsInsideChroot():
  """Reports whether the current process is running inside the chroot.

  Method borrowed from <chromeos_root>/chromite/lib/cros_build_lib.py

  Returns:
    a boolean, True when the marker file /etc/debian_chroot exists.
  """
  chroot_marker = '/etc/debian_chroot'
  return os.path.exists(chroot_marker)
def CheckEnvironment(image_name, firmware_dest, mount_point):
  """Checks requirements for the script to run successfully.

  In particular:
  - script is run from <ChromeOS_root>/src/scripts
  - uudecode utility is available
  - given SSD image name follows naming convention and is an existing file
  - given firmware destination is an existing directory with write access
  - mounting point is available for mounting

  Every failed condition is logged, so a single call reports all problems.

  Args:
    image_name: absolute file path to SSD release image binary
    firmware_dest: absolute path to directory firmware should go
    mount_point: dir to mount SSD image, defaults to cb_constants._MOUNT_POINT
  Returns:
    a boolean, True when the conditions checked are all satisfied
  """
  # TODO(benwin) refactor so this check comes at the beginning of the script
  res = True
  if not re.search(r'/src/scripts$', os.getcwd()):
    logging.error('\nPlease run this script from the src/scripts directory.\n')
    res = False
  # RunCommand raises BundlingError on a nonzero exit status, which is how
  # 'which' reports a missing program; treat that the same as empty output so
  # this function reports the problem and returns False instead of raising.
  try:
    uudecode_path = RunCommand(['which', 'uudecode'],
                               redirect_stdout=True).output
  except BundlingError:
    uudecode_path = None
  if not uudecode_path:
    logging.error('\nMissing uudecode. Please run sudo apt-get install '
                  'sharutils\n')
    res = False
  if (not os.path.isfile(image_name) or
      not re.search(r'.*ssd.*[.]bin$', image_name)):
    logging.error('\nBad SSD image name given : %s\n', image_name)
    res = False
  if not os.path.isdir(firmware_dest):
    logging.error('\nFirmware destination directory %s does not exist!\n',
                  firmware_dest)
    res = False
  if not os.access(firmware_dest, os.W_OK):
    logging.error('\nFirmware destination directory %s not writable.\n',
                  firmware_dest)
    res = False
  if mount_point:
    # A missing mount point directory is acceptable; a populated one is not.
    if os.path.isdir(mount_point) and os.listdir(mount_point):
      logging.error('\nMount point %s is not emtpy!\n', mount_point)
      res = False
  else:
    logging.error('\nNo mount point specified!\n')
    res = False
  return res
def UploadToGsd(filename):
  """Copies a file or directory to Google Storage for Developers via gsutil.

  Assumes proper keys for gsutil are set up for the current user.

  Args:
    filename: absolute path name of file or directory to upload
  Raises:
    BundlingError when filename is empty or names a path that does not exist
  """
  if not filename or not os.path.exists(filename):
    raise BundlingError('File %s does not exist.' % filename)
  upload_cmd = ['gsutil', 'cp', filename, cb_constants.GSD_BUCKET]
  RunCommand(upload_cmd)
def ListFirmware(image_name, cros_fw):
  """Determines the names under which the EC and BIOS firmware are stored.

  Runs the firmware script with -V and parses its output. Only handles Alex
  firmware at present.

  Args:
    image_name: absolute file path to SSD release image binary
    cros_fw: absolute path of firmware extraction script
  Returns:
    a tuple of strings (ec_name, bios_name); each falls back to the default
    name from cb_constants when no Alex-specific name is found in the output
  Raises:
    BundlingError when necessary files missing.
  """
  if not os.path.exists(cros_fw):
    raise BundlingError('Necessary file chromeos-firmwareupdate missing '
                        'from %s.' % image_name)
  listing = RunCommand([cros_fw, '-V'], redirect_stdout=True).output
  if not listing:
    raise BundlingError('Failed to get output from script %s.' % cros_fw)
  lines = listing.split('\n')

  # Collect every path the script lists in './<name>' form.
  firmfiles = []
  for line in lines:
    found = re.search('[.]/(.*)', line)
    if found:
      firmfiles.append(found.group(1))

  if cb_constants.EC_NAME not in firmfiles:
    raise BundlingError('Necessary file ec.bin missing from %s.' % cros_fw)
  # TODO(benwin) add additional branching for h2c binary
  if cb_constants.BIOS_NAME not in firmfiles:
    raise BundlingError('Necessary file bios.bin missing from %s.' % cros_fw)

  # Use the first 'EC image:' line carrying an Alex name, if there is one.
  ec_name = cb_constants.EC_NAME
  ec_pat = re.compile('EC image:.*(Alex.*)')
  for line in lines:
    ec_found = ec_pat.search(line)
    if ec_found:
      ec_name = ec_found.group(1)
      break
  else:
    logging.warning('Proper renaming of ec.bin firmware failed.')

  # Same treatment for the 'BIOS image:' line.
  bios_name = cb_constants.BIOS_NAME
  bios_pat = re.compile('BIOS image:.*(Alex.*)')
  for line in lines:
    bios_found = bios_pat.search(line)
    if bios_found:
      bios_name = bios_found.group(1)
      break
  else:
    logging.warning('Proper renaming of bios.bin firmware failed.')

  return (ec_name, bios_name)
def ExtractFiles(cros_fw):
  """Runs the firmware script to unpack its payload into a temp directory.

  Args:
    cros_fw: absolute path of firmware extraction script
  Returns:
    a string, directory of extracted files, None on failure
  """
  if not os.path.exists(cros_fw):
    logging.error('Necessary firmware extraction script %s missing.', cros_fw)
    return None
  extract_output = RunCommand([cros_fw, '--sb_extract'],
                              redirect_stdout=True).output
  # The extraction directory is taken from the first /tmp/tmp.* path that
  # appears in the script's output.
  found = re.search('/tmp/tmp[.].*', extract_output)
  firmdir = found.group() if found else ''
  if firmdir and os.path.exists(firmdir):
    return firmdir
  logging.warning('Failed to extract necessary firmware directory.')
  return None
def ExtractFirmware(image_name, firmware_dest, mount_point):
  """Pulls firmware out of an SSD image to help prepare a factory bundle.

  Requires current directory to be <ChromeOS_root>/src/scripts.
  Requires sudoer password entry to mount SSD image.
  Requires use of uudecode utility available in package sharutils.
  Requires mount_point is free to mount SSD image.
  Requires firmware destination directory exists and is writable.

  Args:
    image_name: absolute file path to SSD release image binary
    firmware_dest: absolute path to directory firmware should go
    mount_point: dir to mount SSD image, defaults to cb_constants._MOUNT_POINT
  Returns:
    a boolean, True when the firmware has been successfully extracted and the
    image still matches its MD5 checksum, False when the checksum fails
  Raises:
    BundlingError when necessary tools are missing or SSD mounting fails.
  """
  if not CheckEnvironment(image_name, firmware_dest, mount_point):
    raise BundlingError('Environment check failed, please fix conditions '
                        'listed above.')
  # Bare file name of the image; the mount script gets the directory
  # separately via --from.
  image = image_name.rsplit(os.sep, 1)[-1]
  try:
    # mount SSD image at mount_point
    logging.info('Mounting SSD image.')
    mount_cmd = ['./mount_gpt_image.sh', '--read_only', '--safe',
                 '--from=' + cb_constants.TMPDIR, '--image=' + image,
                 '--rootfs_mountpt=' + mount_point]
    RunCommand(mount_cmd)
    mounted = os.path.exists(mount_point) and os.listdir(mount_point)
    if not mounted:
      raise BundlingError('Failed to mount SSD image at mount point %s' %
                          mount_point)
    cros_fw = os.path.join(mount_point, 'usr', 'sbin',
                           'chromeos-firmwareupdate')
    (ec_name, bios_name) = ListFirmware(image_name, cros_fw)
    firmdir = ExtractFiles(cros_fw)
    if not firmdir:
      raise BundlingError('Failed to extract firmware files.')
    # Copy each extracted binary under the name ListFirmware chose for it.
    for stored_name, bundle_name in ((cb_constants.EC_NAME, ec_name),
                                     (cb_constants.BIOS_NAME, bios_name)):
      shutil.copy(os.path.join(firmdir, stored_name),
                  os.path.join(firmware_dest, bundle_name))
    shutil.copy(cros_fw, firmware_dest)
  finally:
    # Always attempt to unmount, even when something above failed.
    RunCommand(['./mount_gpt_image.sh', '--unmount'])
  # Verify the image was not modified while it was mounted.
  # NOTE(review): image_name is documented as absolute, in which case
  # os.path.join returns it unchanged and TMPDIR is ignored here -- confirm
  # whether the bare file name was intended.
  filename = os.path.join(cb_constants.TMPDIR, image_name)
  if cb_util_lib.CheckMd5(filename, filename + '.md5'):
    return True
  logging.error('SSD image MD5 checksum did not match, image was corrupted!')
  return False
def ConvertRecoveryToSsd(image_name, board, recovery, force):
  """Converts a recovery image into an SSD image.

  Steps performed:
  - clone the git repo containing convert_recovery_to_full_ssd.sh
  - locate and download the release zip matching the recovery image
  - download the au-generator archive, extract cgpt and install it with sudo
  - run the conversion script

  Whenever an existing resource would be overwritten the user is asked for
  confirmation, unless force is set.

  Args:
    image_name: absolute path name of recovery image to convert
    board: board name of recovery image to convert
    recovery: recovery image version/channel/signing_key
    force: a boolean, True when any existing bundle files can be deleted
  Returns:
    a string, the absolute path name of the extracted SSD image
  Raises:
    BundlingError when resources not found or conversion fails.
  """
  # TODO(benwin) refactor to modularize
  # fetch convert_recovery_to_full_ssd.sh
  if os.path.exists(cb_constants.GITDIR):
    msg = ('Old recovery conversion script git repo exists, please '
           'confirm overwrite')
    # 'force' short-circuits so the user is only prompted when needed.
    if not (force or AskUserConfirmation(msg)):
      raise BundlingError('Vboot git repo exists, use -f to update')
    shutil.rmtree(cb_constants.GITDIR)
    os.mkdir(cb_constants.GITDIR)
  RunCommand(['git', 'clone', cb_constants.GITURL, cb_constants.GITDIR])
  # fetch zip containing chromiumos_base_image
  try:
    index_page, rec_pat = cb_name_lib.GetRecoveryName(board, recovery)
    rec_url = cb_url_lib.DetermineUrl(index_page, rec_pat)
  except cb_url_lib.NameResolutionError:
    # Retry using the alternative naming scheme.
    index_page, rec_pat = cb_name_lib.GetRecoveryName(board,
                                                      recovery,
                                                      alt_naming=True)
    rec_url = cb_url_lib.DetermineUrl(index_page, rec_pat)
  if not rec_url:
    # Report the page that was searched; rec_url itself is empty here.
    raise BundlingError('Could not find match for pattern %s at %s.' %
                        (rec_pat, index_page))
  rec_no = recovery.split('/')[0]
  zip_pat = '-'.join(['ChromeOS', rec_no, '.*', board + '.zip'])
  zip_url = cb_url_lib.DetermineUrl(index_page, zip_pat)
  if not zip_url:
    # Fail clearly instead of handing an empty URL to Download/basename.
    raise BundlingError('Could not find match for pattern %s at %s.' %
                        (zip_pat, index_page))
  if not cb_url_lib.Download(zip_url):
    raise BundlingError('Failed to download %s.' % zip_url)
  zip_name = os.path.join(cb_constants.TMPDIR, os.path.basename(zip_url))
  # NOTE(review): this replaces 'recovery' everywhere in the path, directory
  # components included -- confirm callers never pass such a path.
  ssd_name = image_name.replace('recovery', 'ssd')
  if os.path.exists(ssd_name) and not force:
    msg = 'SSD file %s already exists, please confirm overwrite' % ssd_name
    if not AskUserConfirmation(msg):
      raise BundlingError('File %s already exists, use -f to overwrite' %
                          ssd_name)
  # put cgpt in the path
  au_gen_url = os.path.join(index_page, cb_constants.AU_GEN)
  if not cb_url_lib.Download(au_gen_url):
    raise BundlingError('Necessary resource %s could not be fetched.' %
                        au_gen_url)
  au_gen_name = os.path.join(cb_constants.TMPDIR, cb_constants.AU_GEN)
  cgpt_name = os.path.join(cb_constants.TMPDIR, 'cgpt')
  if not cb_util_lib.ZipExtract(au_gen_name, 'cgpt', path=cb_constants.TMPDIR):
    raise BundlingError('Could not extract necesary resource %s from %s.' %
                        (cgpt_name, au_gen_name))
  cgpt_dest = os.path.join(cb_constants.SUDO_DIR, 'cgpt')
  if os.path.exists(cgpt_dest):
    msg = 'cgpt exists at %s, please confirm update' % cgpt_dest
    if not (force or AskUserConfirmation(msg)):
      raise BundlingError('Necessary utility cgpt already exists at %s, use '
                          '-f to overwrite with newest version.' %
                          cgpt_dest)
  RunCommand(['sudo', 'cp', cgpt_name, cgpt_dest])
  # execute script
  script_name = os.path.join(cb_constants.GITDIR,
                             'scripts',
                             'image_signing',
                             'convert_recovery_to_full_ssd.sh')
  RunCommand([script_name, image_name, zip_name, ssd_name])
  # TODO(benwin) consider cleaning up resources based on command line flag
  return ssd_name
def AskUserConfirmation(msg):
  """Interactively obtain consent from user.

  Args:
    msg: a string describing the permission sought
  Returns:
    a boolean, True when the user answers 'y' (case-insensitive, surrounding
    whitespace ignored); False for any other answer or when stdin is at EOF
  """
  logging.info(msg + ' (y/n): ')
  # raw_input exists only on Python 2; Python 3 renamed it to input.
  try:
    read_line = raw_input
  except NameError:
    read_line = input
  try:
    ans = str(read_line())
  except EOFError:
    # No interactive input available: decline, so the caller can report how
    # to proceed non-interactively rather than dying with a traceback.
    return False
  return ans.strip().lower() == 'y'