blob: d33223f808fcd3e4f73ac6ae66ff93786cb6e380 [file] [log] [blame]
#!/usr/bin/python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module is a generic library for factory bundle production."""
import logging
import os
import re
import shutil
from cb_archive_hashing_lib import MakeTar, GenerateMd5, MakeMd5, ZipExtract
from cb_command_lib import AskUserConfirmation, ExtractFirmware, \
ConvertRecoveryToSsd
from cb_constants import BundlingError, WORKDIR
from cb_name_lib import GetBundleDefaultName, GetReleaseName, GetRecoveryName, \
GetReleaseName, GetShimName, GetFactoryName
from cb_url_lib import DetermineThenDownloadCheckMd5, DetermineUrl, Download
from cb_util import RunCommand
def _ImageFileError(path, description):
    """Returns an error message for a missing image, or None when it is fine.

    Args:
        path: a string, path to the image file, or None/empty when not provided
        description: a string, capitalized image description, e.g. 'SSD image'
    Returns:
        a string describing the problem, or None if path is an existing file
    """
    if not path:
        return 'Bundling method needs %s name.' % description.lower()
    if not os.path.isfile(path):
        return '%s %s does not exist.' % (description, path)
    return None


def CheckBundleInputs(image_names, options):
    """Checks the input for making a factory bundle.

    In particular:
    - checks for conflicting input flags no_firmware and fsi
    - binary image names correctly passed by image fetch
    - binary image names point to existing files

    Assuming a second recovery image implies a second release image.

    Args:
        image_names: a dict, values are absolute file paths for keys:
            'ssd': release image
            'ssd2': second release image or None
            'recovery': recovery image
            'recovery2': second recovery image or None
            'factorybin': factory binary
            'shim': factory install shim
        options: an object of input arguments to the script
            possible options are:
                board: target board
                board2: optional second target board
                bundle_dir: destination root directory for factory bundle
                    files
                chromeos_root: user-provided root of ChromeOS source tree
                    checkout
                factory: factory image version/channel
                force: a boolean, True when all existing bundle files can be
                    deleted
                fsi: a boolean, True when processing for a Final Shipping
                    Image
                full_ssd: a boolean, True to make release image with stateful
                    partition
                fw: a boolean, True when script should extract firmware
                recovery: recovery image version/channel/signing_key
                recovery2: optional second recovery
                    version/channel/signing_key
                release: release candidate version/channel/signing_key
                release2: optional second release version/channel/signing_key
                tar_dir: destination directory for factory bundle tar file
                version: key and version for bundle naming, e.g. mp9x

    Raises:
        BundlingError when a check fails. All image problems found are
        reported together, one per line.
    """
    if not options.fw and not options.fsi:
        raise BundlingError('Can only skip firmware extraction for '
                            'final shipping image.')

    # (image_names key, description) pairs, in the order errors are reported
    required = [('ssd', 'SSD image'), ('recovery', 'Recovery image')]
    if not options.fsi:
        required.append(('factorybin', 'Factory image'))
        required.append(('shim', 'Factory install shim'))
    if options.recovery2:
        # we infer second release image should exist, since script options
        # might not list second release image, implying recovery to ssd
        # conversion
        required.append(('ssd2', 'Second SSD image'))
        required.append(('recovery2', 'Second recovery image'))

    errors = [_ImageFileError(image_names.get(key), description)
              for key, description in required]
    msg = [error for error in errors if error]
    if msg:
        raise BundlingError('\n'.join(msg))
def _RecreateDirectory(directory, del_ok):
    """Creates an empty directory, clearing any existing one after consent.

    Args:
        directory: a string, path of the directory to (re)create
        del_ok: a boolean, True to delete an existing directory without asking
    Raises:
        BundlingError when the directory exists and the user declines to
        overwrite it.
    """
    if os.path.exists(directory):
        if not del_ok:
            msg = ('Bundle directory %s already exists. Ok to overwrite?' %
                   directory)
            if not AskUserConfirmation(msg):
                raise BundlingError(
                    'Directory %s exists. Use -f to overwrite.' % directory)
        shutil.rmtree(directory)
    os.mkdir(directory)


def MakeFactoryBundle(image_names, options):
    """Produces a factory bundle from the downloaded images.

    Requires current directory to be <ChromeOS_root>/src/scripts.
    Requires sudoer password entry to mount SSD image.
    Bundle is named with input version as well as the current date.
    Forces exit if any bundle components exist, use flags to override.
    Only extracts firmware from one release image.
    Assuming a second recovery image implies a second release image.

    An existing bundle directory (including a user-provided one) is deleted
    and recreated, after confirmation unless options.force is set.

    Args:
        image_names: a dict, values are absolute file paths for keys:
            'ssd': release image
            'ssd2': second release image or None
            'recovery': recovery image
            'recovery2': second recovery image or None
            'factorybin': factory binary
            'shim': signed factory install shim
        options: an object of input arguments to the script
            please see CheckBundleInputs above for possibilities

    Returns:
        a string, the absolute path name of the factory bundle tar created

    Raises:
        BundlingError on bad input, inability to write, or firmware extract
        fail.
    """
    # TODO(benwin) refactor this method, it is getting long
    # shorten names
    fsi = options.fsi
    firmware = options.fw
    version = options.version
    mount_point = options.mount_point
    bundle_dir = options.bundle_dir
    tar_dir = options.tar_dir
    del_ok = options.force

    # throws BundlingError if needed resources do not exist or options
    # conflict
    CheckBundleInputs(image_names, options)
    ssd_name = image_names.get('ssd')
    ssd_name2 = image_names.get('ssd2')
    rec_name = image_names.get('recovery')
    rec_name2 = image_names.get('recovery2')
    fac_name = image_names.get('factorybin')
    shim_name = image_names.get('shim')

    if bundle_dir:
        if not os.path.isdir(bundle_dir):
            raise BundlingError('Provided directory %s does not exist.' %
                                bundle_dir)
        if not os.access(bundle_dir, os.W_OK):
            raise BundlingError('Provided directory %s not writable.' %
                                bundle_dir)
    else:
        bundle_dir = os.path.join(
            WORKDIR, GetBundleDefaultName(version=version))
    _RecreateDirectory(bundle_dir, del_ok)

    if fsi:
        dir_list = ['release', 'recovery']
    else:
        dir_list = ['release', 'recovery', 'factory', 'shim']
    dir_dict = {}
    for dir_name in dir_list:
        directory = os.path.join(bundle_dir, dir_name)
        dir_dict[dir_name] = directory
        os.mkdir(directory)

    if not tar_dir:
        # make default have cleaner output
        tar_dir = WORKDIR
    elif not os.path.isdir(tar_dir):
        # input given but bad
        logging.warning('Provided directory %s does not exist, using %s',
                        tar_dir, WORKDIR)
        tar_dir = WORKDIR

    if firmware:
        firmware_dest = os.path.join(bundle_dir, 'firmware')
        _RecreateDirectory(firmware_dest, del_ok)
        ExtractFirmware(ssd_name, firmware_dest, mount_point, options.board)
        logging.info('Successfully extracted firmware to %s', firmware_dest)

    release_dir = dir_dict['release']
    recovery_dir = dir_dict['recovery']
    shutil.copy(ssd_name, release_dir)
    shutil.copy(rec_name, recovery_dir)
    # a second ssd image is either downloaded (release2) or was converted
    # from the second recovery image; either way it still needs copying
    if options.release2 or options.recovery2:
        shutil.copy(ssd_name2, release_dir)
    if options.recovery2:
        shutil.copy(rec_name2, recovery_dir)
    if not fsi:
        shutil.copy(shim_name, dir_dict['shim'])
        shutil.copy(fac_name, dir_dict['factory'])

    MakeMd5Sums(bundle_dir)
    logging.info('Completed copying factory bundle files to %s', bundle_dir)
    logging.info('Tarring bundle files, this operation is '
                 'resource-intensive.')
    tarname = MakeTar(bundle_dir, tar_dir)
    if not tarname:
        raise BundlingError('Failed to create tar file of bundle directory.')
    # report the directory actually used, which is not always WORKDIR
    logging.info('Completed creating factory bundle tar file in %s.', tar_dir)
    return os.path.join(tar_dir, tarname)
def MakeMd5Sums(bundle_dir):
    """Generate MD5 checksums for all binary components of factory bundle.

    Looks in each immediate subdirectory of bundle_dir for files whose names
    end in .bin or .fd and writes one line per file, of the form
    "<md5> ./<subdirectory>/<filename>", to <bundle_dir>/file_checksum.md5.
    Subdirectories and files are processed in sorted order so the checksum
    file is reproducible.

    Args:
        bundle_dir: absolute path to directory containing factory bundle files

    Returns:
        a list of strings, the lines written to the checksum file, in order

    Raises:
        BundlingError on failure
    """
    binary_file_pattern = re.compile('.*[.]bin$|.*[.]fd$')
    # (absolute path, path relative to bundle_dir) for each binary found
    file_list = []
    for directory in sorted(os.listdir(bundle_dir)):
        subdir = os.path.join(bundle_dir, directory)
        # loose files at the top level (e.g. a checksum file left by an
        # earlier run) are not bundle component directories; listing them
        # would raise OSError
        if not os.path.isdir(subdir):
            continue
        for filename in sorted(os.listdir(subdir)):
            if binary_file_pattern.search(filename):
                file_list.append((os.path.join(subdir, filename),
                                  '/'.join(['.', directory, filename])))

    md5filename = os.path.join(bundle_dir, 'file_checksum.md5')
    lines_written = []
    try:
        with open(md5filename, 'w') as md5file:
            for absfilename, relfilename in file_list:
                md5sum = GenerateMd5(absfilename)
                if not md5sum:
                    raise BundlingError(
                        'Failed to compute MD5 checksum for file %s.' %
                        absfilename)
                line = md5sum + ' ' + relfilename + '\n'
                lines_written.append(line)
                md5file.write(line)
    except IOError:
        raise BundlingError('Failed to open file for writing md5 checksums.')
    return lines_written
def _GetResourceUrlAndPath(desc, get_func, *args):
    """Resolves a resource with get_func, then downloads it into WORKDIR.

    Args:
        desc: a string, resource description.
        get_func: a function object; called as get_func(*args) and expected
            to return a (url, pattern) pair.
        args: arguments to pass into get_func.

    Returns:
        url: a string, URL to the resource, as returned by get_func.
        path: a string, local path to the downloaded resource. Or None if
            error.
    """
    resource_url, name_pattern = get_func(*args)
    local_path = DetermineThenDownloadCheckMd5(
        resource_url, name_pattern, WORKDIR, desc)
    return resource_url, local_path
def _HandleFactoryImageAndShim(options, alt_naming):
    """Logic for handling factory image and shim.

    Downloads the factory image zip into WORKDIR unless it is already there,
    extracts factory_test/chromiumos_factory_image.bin from it unless already
    extracted, then fetches the factory install shim.

    Args:
        options: an object containing inputs to the script
        alt_naming: optional, see docstring for GetNameComponents in
            cb_name_lib.py

    Returns:
        absfactorybin: a string, path to factory image.
        shim_name: the result of fetching the factory install shim into
            WORKDIR (presumably its local path, or None on error - verify
            against DetermineThenDownloadCheckMd5).

    Raises:
        BundlingError when the factory image URL cannot be determined, the
        factory image cannot be downloaded, or the factory binary cannot be
        extracted.
    """
    fac_url, token_list = GetFactoryName(options.board, options.factory,
                                         alt_naming)
    fac_det_url = DetermineUrl(fac_url, token_list)
    if not fac_det_url:
        raise BundlingError('Factory image exact URL could not be determined '
                            'on page %s given pattern %s.' %
                            (fac_url, token_list))
    fac_name = os.path.join(WORKDIR, os.path.basename(fac_det_url))
    if not os.path.exists(fac_name):
        logging.info('Downloading %s', fac_det_url)
        if not Download(fac_det_url):
            raise BundlingError('Factory image could not be fetched.')
    logging.info('Resource %s is present.', fac_name)

    factorybin = os.path.join('factory_test', 'chromiumos_factory_image.bin')
    absfactorybin = os.path.join(WORKDIR, factorybin)
    if not os.path.exists(absfactorybin):
        logging.info('Extracting factory image binary')
        if not ZipExtract(fac_name, factorybin, path=WORKDIR):
            raise BundlingError('Could not find chromiumos_factory_image.bin '
                                'in factory image.')
    logging.info('Resource %s is present.', absfactorybin)

    # Factory Install Shim
    _, token_list = GetShimName(options.board, options.shim, alt_naming)
    # shim is to be found on index page of recovery image sought, even if it
    # has a name that suggests it would be on another channel index page
    # NOTE(review): the page searched here is fac_url (the factory image
    # index page), not a recovery URL - confirm which one is intended.
    shim_name = DetermineThenDownloadCheckMd5(fac_url, token_list, WORKDIR,
                                              'Factory Install Shim')
    return (absfactorybin, shim_name)
def FetchImages(options, alt_naming=0):
    """Fetches images for factory bundle specified by args input

    Assuming second recovery implies second ssd should be made through
    conversion
    Assuming install shim surfaced on index page for first recovery image
    Default ssd conversion requires chroot setup and that this method be used
    in current directory <ChromeOS_root>/src/scripts

    Args:
        options: an object containing inputs to the script
            please see CheckBundleInputs above for possibilities
        alt_naming: optional, see docstring for GetNameComponents in
            cb_name_lib.py

    Returns:
        a dict, possible values are absolute file paths for keys:
            'ssd': release image
            'ssd2': second release image or None
            'recovery': recovery image
            'recovery2': second recovery image or None
            'factorybin': factory binary
            'shim': signed factory install shim
        'factorybin' and 'shim' are only present when options.fsi is false.

    Raises:
        BundlingError when resources cannot be fetched.
    """
    # Recovery
    _, rec_name = _GetResourceUrlAndPath(
        'Recovery image', GetRecoveryName, options.board, options.recovery,
        alt_naming)

    # Release
    if options.release:
        _, rel_name = _GetResourceUrlAndPath(
            'Release image', GetReleaseName, options.board, options.release,
            alt_naming)
    else:
        # if needed, run recovery to ssd conversion now that we have recovery
        # image; a failed download yields no path, so there is nothing to
        # convert
        if not rec_name:
            raise BundlingError('Recovery image could not be fetched, '
                                'cannot convert it to a release image.')
        rel_name = ConvertRecoveryToSsd(rec_name, options)
        if not MakeMd5(rel_name, rel_name + '.md5'):
            raise BundlingError('Failed to create md5 checksum for %s' %
                                rel_name)

    # Optional Extra Release
    rel_name2 = None
    if options.release2:
        _, rel_name2 = _GetResourceUrlAndPath(
            'Second release image', GetReleaseName, options.board2,
            options.release2, alt_naming)

    # Optional Extra Recovery
    rec_name2 = None
    if options.recovery2:
        _, rec_name2 = _GetResourceUrlAndPath(
            'Second recovery image', GetRecoveryName, options.board2,
            options.recovery2, alt_naming)
        # if provided a second recovery image but no matching ssd, run
        # conversion
        if not options.release2:
            if not rec_name2:
                raise BundlingError('Second recovery image could not be '
                                    'fetched, cannot convert it to a release '
                                    'image.')
            # NOTE(review): unlike the first conversion, no .md5 file is
            # created for this converted image - confirm whether one is
            # needed.
            rel_name2 = ConvertRecoveryToSsd(rec_name2, options)

    image_names = dict(ssd=rel_name, ssd2=rel_name2, recovery=rec_name,
                       recovery2=rec_name2)

    # Factory and Shim
    if not options.fsi:
        absfactorybin, shim_name = _HandleFactoryImageAndShim(options,
                                                              alt_naming)
        image_names.update(dict(factorybin=absfactorybin, shim=shim_name))
    return image_names
def CheckParseOptions(options, parser):
    """Checks parse options input to the factory bundle script.

    Bundle-specific requirements (factory version, install shim) are not
    enforced when options.clean is set. All validation happens before sudo
    privilege is requested, so bad options never trigger a password prompt.

    Args:
        options: an object with the input options to the script
            please see CheckBundleInputs above for possibilities
        parser: the OptionParser used to parse the input options

    Raises:
        BundlingError when parse options are bad
    """
    # TODO(benwin) check that clean does not occur with any other options
    if not options.clean:
        if not options.factory:
            parser.print_help()
            raise BundlingError('\nMust specify factory zip version/channel.')
        if not options.fsi and not options.shim:
            raise BundlingError(
                '\nMust specify install shim for non-fsi bundle.')
    if options.force:
        logging.info('Detected --force option, obtaining sudo privilege now.')
        logging.info('Remove --force option to list and confirm each command.')
        RunCommand(['sudo', '-v'])