blob: ed11019cac5cb5c37797c0a3ddb9326c1f83ad7f [file] [log] [blame]
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Wrapper for tests that are run on builders."""
import fileinput
import optparse
import os
import re
import sys
import traceback
import urllib
import HTMLParser
import constants
import chromite.lib.cros_build_lib as cros_lib
# File name of the test image.  It is joined onto both the locally built
# image directory and the download folder, and is the image name handed to
# GrabZipAndExtractImage.
_IMAGE_TO_EXTRACT = 'chromiumos_test_image.bin'
class CrosImageDoesNotExistError(Exception):
    """Error thrown when no image can be found.

    Raised by GetLatestLinkFromPage when no link on the fetched page matches
    the requested pattern.
    """
class HTMLDirectoryParser(HTMLParser.HTMLParser):
    """HTMLParser for parsing the default apache file index.

    Collects, in document order, the text of every <a href=...> target that
    matches the regular expression given at construction time.

    Attributes:
      regex_object: Compiled pattern that href values are matched against.
      link_list: Matched href text for every matching link seen so far.
    """

    def __init__(self, regex):
        """Initializes the parser.

        Args:
          regex: Regular expression an href value must match.  Matching is
            anchored at the start of the value (re.match semantics).
        """
        # The base initializer sets up the parser state that feed() uses.
        HTMLParser.HTMLParser.__init__(self)
        self.regex_object = re.compile(regex)
        self.link_list = []

    def handle_starttag(self, tag, attrs):
        """Overrides from HTMLParser and is called at the start of every tag.

        This implementation grabs attributes from links (i.e. <a ... > </a>)
        and adds the target from href=<target> if the <target> matches the
        regex given at the start.

        Args:
          tag: Name of the tag being opened.
          attrs: List of (name, value) pairs for the tag's attributes.
        """
        if tag.lower() != 'a':
            return

        for name, value in attrs:
            # A valueless attribute (e.g. <a href>) is reported with a value
            # of None, which cannot be handed to the regex.
            if name.lower() != 'href' or value is None:
                continue

            match = self.regex_object.match(value)
            if match:
                # NOTE(review): the statement under `if match:` was missing
                # from this copy of the file; storing the matched text is
                # reconstructed from the docstring -- confirm.
                self.link_list.append(match.group(0))
def ModifyBootDesc(download_folder, redirect_file=None):
    """Modifies the boot description of a downloaded image to work with path.

    The default boot.desc from another system is specific to the directory
    it was created in.  This rewrites <download_folder>/boot.desc in place so
    that it is compatible with the download folder.

    Args:
      download_folder: Absolute path to the download folder.
      redirect_file: For testing.  Where to copy new boot desc.  When not
        given, output goes to sys.stdout as seen inside the fileinput loop.
    """
    boot_desc_path = os.path.join(download_folder, 'boot.desc')
    in_chroot_folder = cros_lib.ReinterpretPathForChroot(download_folder)

    try:
        for line in fileinput.input(boot_desc_path, inplace=1):
            # Has to be done here to get changes to sys.stdout from
            # fileinput.input.
            if not redirect_file:
                redirect_file = sys.stdout

            new_line = None
            var_part, separator, value_part = line.partition('=')
            if separator:
                potential_path = value_part.replace('"', '').strip()
                if 'output_dir' in var_part:
                    # Special case for output_dir.
                    new_line = '%s="%s"' % (var_part, in_chroot_folder)
                elif potential_path.startswith('/home'):
                    # NOTE(review): the second argument of this join was cut
                    # off in this copy of the file; re-rooting the file name
                    # under the download folder is a reconstruction --
                    # confirm against the original.
                    new_path = os.path.join(in_chroot_folder,
                                            os.path.basename(potential_path))
                    new_line = '%s="%s"' % (var_part, new_path)

            if new_line is None:
                # Line does not need to be modified.  It still has to be
                # written back, otherwise in-place mode would drop it.
                redirect_file.write(line)
            else:
                cros_lib.Info('Replacing line %s with %s' % (line, new_line))
                redirect_file.write('%s\n' % new_line)
    finally:
        # fileinput keeps module-level state; always release it so stdout is
        # restored and later fileinput.input() calls work.
        fileinput.close()
def _GreaterVersion(version_a, version_b):
    """Returns the higher version number of two version number strings.

    Each argument must contain a four-part dotted version (e.g. '0.9.131.0'),
    optionally surrounded by other text such as a file name or a trailing
    '/'.  The first such version found in each string is used, and the
    components are compared numerically from left to right.

    Args:
      version_a: First version string.
      version_b: Second version string.

    Returns:
      Whichever of the two original strings carries the higher version;
      version_a when both versions are equal.

    Raises:
      ValueError: If either string contains no four-part version.
    """
    # No leading '.*': a greedy prefix would swallow all but the last digit
    # of the first component, so '10.0.0.1' would compare as '0.0.0.1'.
    version_regex = re.compile(r'(\d+)\.(\d+)\.(\d+)\.(\d+)')

    def _Parse(version):
        """Returns the version embedded in the string as a tuple of ints."""
        match = version_regex.search(version)
        if match is None:
            raise ValueError('No four-part version found in %r' % (version,))
        return tuple(int(token) for token in match.groups())

    tokens_a = _Parse(version_a)
    tokens_b = _Parse(version_b)
    if tokens_b > tokens_a:
        return version_b
    return version_a
def GetLatestLinkFromPage(url, regex):
    """Returns the latest link from the given url that matches regex.

    Args:
      url: Url to download and parse.
      regex: Regular expression to match links against.

    Returns:
      The matching link that _GreaterVersion ranks highest.

    Raises:
      CrosImageDoesNotExistError if no image found using args.
    """
    url_file = urllib.urlopen(url)
    try:
        url_html = url_file.read()
    finally:
        # Release the connection even if the read fails.
        url_file.close()

    # Parses links with versions embedded.
    url_parser = HTMLDirectoryParser(regex=regex)
    url_parser.feed(url_html)

    # Check for "no links" explicitly rather than catching the TypeError an
    # empty reduce() raises, which would also hide unrelated TypeErrors.
    if not url_parser.link_list:
        raise CrosImageDoesNotExistError('No image found at %s' % url)

    latest_link = url_parser.link_list[0]
    for link in url_parser.link_list[1:]:
        latest_link = _GreaterVersion(latest_link, link)
    return latest_link
def GetNewestLinkFromZipBase(board, channel, zip_server_base):
    """Returns the url to the newest image from the zip server.

    Looks up the highest version directory under
    <zip_server_base>/<channel>/<board>, then the highest versioned zip
    inside that directory.

    Args:
      board: board for the image zip.
      channel: channel for the image zip.
      zip_server_base: base url for zipped images.

    Returns:
      Url of the newest image zip.

    Raises:
      CrosImageDoesNotExistError if no image found using args.
    """
    zip_base = os.path.join(zip_server_base, channel, board)
    latest_version = GetLatestLinkFromPage(zip_base, r'\d+\.\d+\.\d+\.\d+/')

    zip_dir = os.path.join(zip_base, latest_version)
    # NOTE(review): the pattern argument of this call was cut off in this
    # copy of the file.  The pattern below (a zip whose name embeds a
    # four-part version) is a reconstruction -- confirm against the original.
    zip_name = GetLatestLinkFromPage(zip_dir,
                                     r'ChromeOS-\d+\.\d+\.\d+\.\d+-.*\.zip')
    return os.path.join(zip_dir, zip_name)
def GetLatestZipUrl(board, channel, zip_server_base):
    """Returns the url of the latest image zip for the given arguments.

    If the latest does not exist, tries to find the rc equivalent.  If neither
    exist, returns None.

    Args:
      board: board for the image zip.
      channel: channel for the image zip.
      zip_server_base: base url for zipped images.

    Returns:
      Url of the latest image zip, or None when no image is found for either
      the board or its '-rc' variant.
    """
    # Try the board itself first, then fall back to its release candidate.
    for candidate_board in (board, board + '-rc'):
        try:
            return GetNewestLinkFromZipBase(candidate_board, channel,
                                            zip_server_base)
        except CrosImageDoesNotExistError:
            continue
    return None
def GrabZipAndExtractImage(zip_url, download_folder, image_name):
    """Downloads the zip and extracts the given image.

    Doesn't re-download if matching version found already in download folder.

    Args:
      zip_url - url for the image.
      download_folder - download folder to store zip file and extracted
        images.
      image_name - name of the image to extract from the zip file.
    """
    # NOTE(review): the zip file name was stripped to '' in this copy of the
    # file; 'image.zip' is a reconstruction.  It is only a local scratch name
    # used by the download and unzip steps below.
    zip_path = os.path.join(download_folder, 'image.zip')
    versioned_url_path = os.path.join(download_folder, 'download_url')
    found_cached = False

    if os.path.exists(versioned_url_path):
        with open(versioned_url_path) as fh:
            version_url = fh.read()

        image_path = os.path.join(download_folder, image_name)
        if version_url == zip_url and os.path.exists(image_path):
            cros_lib.Info('Using cached %s' % image_name)
            found_cached = True

    if not found_cached:
        cros_lib.Info('Downloading %s' % zip_url)
        cros_lib.RunCommand(['rm', '-rf', download_folder], print_cmd=False)
        # The folder was just removed; recreate it so the download has
        # somewhere to land.
        os.mkdir(download_folder)
        urllib.urlretrieve(zip_url, zip_path)

        # Using unzip because python implemented unzip in native python so
        # extraction is really slow.
        cros_lib.Info('Unzipping image %s' % image_name)
        cros_lib.RunCommand(['unzip', '-d', download_folder, zip_path],
                            print_cmd=False,
                            error_message='Failed to download %s' % zip_url)

        # Put url in version file so we don't have to do this every time.
        # Written without a trailing newline because the cache check above
        # compares the file contents to zip_url exactly.
        with open(versioned_url_path, 'w+') as fh:
            fh.write(zip_url)

    # NOTE(review): _NEW_STYLE_VERSION is not defined in the visible part of
    # this file -- confirm it exists at module level.
    version = zip_url.split('/')[-2]
    if not _GreaterVersion(version, _NEW_STYLE_VERSION) == version:
        # If the version isn't ready for new style, touch file to use old
        # style.
        old_style_touch_path = os.path.join(download_folder, '.use_e1000')
        with open(old_style_touch_path, 'w+'):
            pass
def GeneratePublicKey(private_key_path):
    """Returns the path to a newly generated public key from given private key.

    Args:
      private_key_path: Path to the private key to derive the public key
        from.

    Returns:
      Path of the generated public key ('public_key.pem', relative to the
      current working directory).
    """
    # Just output to local directory.
    public_key_path = 'public_key.pem'
    cros_lib.Info('Generating public key from private key.')
    # NOTE(review): the start of this command was missing from this copy of
    # the file (only the -in/-out arguments survived).  The openssl
    # invocation is a reconstruction -- confirm against the original.
    cros_lib.RunCommand(['openssl', 'rsa',
                         '-in', private_key_path,
                         '-pubout',
                         '-out', public_key_path,
                        ], print_cmd=False)
    return public_key_path
def RunAUTestHarness(board, channel, zip_server_base,
                     no_graphics, type, remote, clean, test_results_root):
    """Runs the auto update test harness.

    The auto update test harness encapsulates testing the auto-update
    mechanism for the latest image against the latest official image from the
    channel.  This also tests images with suite_Smoke (built-in as part of its
    verification process).

    Args:
      board: the board for the latest image.
      channel: the channel to run the au test harness against.
      zip_server_base: base url for zipped images.
      no_graphics: boolean - If True, disable graphics during vm test.
      type: which test harness to run.  Possible values: real, vm.
      remote: ip address for real test harness run.
      clean: Clean the state of test harness before running.
      test_results_root: Root directory to store au_test_harness results.
    """
    crosutils_root = os.path.join(constants.SOURCE_ROOT, 'src', 'scripts')

    # Grab the latest image we've built.
    # NOTE(review): the script name was stripped to './' in this copy of the
    # file; get_latest_image.sh is a reconstruction -- confirm.
    return_object = cros_lib.RunCommand(
        ['./get_latest_image.sh', '--board=%s' % board], cwd=crosutils_root,
        redirect_stdout=True, print_cmd=True)
    latest_image_dir = return_object.output.strip()
    target_image = os.path.join(latest_image_dir, _IMAGE_TO_EXTRACT)

    # Grab the latest official build for this board to use as the base image.
    # If it doesn't exist, run the update test against itself.
    download_folder = os.path.abspath('latest_download')
    zip_url = GetLatestZipUrl(board, channel, zip_server_base)
    if zip_url:
        GrabZipAndExtractImage(zip_url, download_folder, _IMAGE_TO_EXTRACT)
        base_image = os.path.join(download_folder, _IMAGE_TO_EXTRACT)
    else:
        base_image = target_image

    # NOTE(review): this copy of the file cut the cmd list off after
    # --remote; confirm no further flags belong here.
    cmd = ['bin/cros_au_test_harness',
           '--base_image=%s' % base_image,
           '--target_image=%s' % target_image,
           '--board=%s' % board,
           '--type=%s' % type,
           '--remote=%s' % remote,
          ]
    if test_results_root:
        cmd.append('--test_results_root=%s' % test_results_root)
    if no_graphics:
        cmd.append('--no_graphics')

    # Using keys is only compatible with clean.
    if clean:
        # NOTE(review): the last path component was cut off in this copy of
        # the file; 'update_engine' is inferred from the variable name --
        # confirm.
        update_engine_path = os.path.join(crosutils_root, '..', 'platform',
                                          'update_engine')
        private_key_path = os.path.join(update_engine_path,
                                        'unittest_key.pem')
        public_key_path = GeneratePublicKey(private_key_path)
        cmd.append('--private_key=%s' % private_key_path)
        cmd.append('--public_key=%s' % public_key_path)

    cros_lib.RunCommand(cmd, cwd=crosutils_root)
def main():
    """Parses the command line and runs the auto update test harness.

    Exits through parser.error() when positional arguments are given or when
    any of --board, --channel or --zipbase is missing.
    """
    parser = optparse.OptionParser()
    parser.add_option('-b', '--board',
                      help='board for the image to compare against.')
    parser.add_option('-c', '--channel',
                      help='channel for the image to compare against.')
    parser.add_option('--cache', default=False, action='store_true',
                      help='Cache payloads')
    parser.add_option('-z', '--zipbase',
                      help='Base url for hosted images.')
    parser.add_option('--no_graphics', action='store_true', default=False,
                      help='Disable graphics for the vm test.')
    parser.add_option('--test_results_root', default=None,
                      help='Root directory to store test results.  Should '
                           'be defined relative to chroot root.')
    parser.add_option('--type', default='vm',
                      help='type of test to run: [vm, real]. Default: vm.')
    parser.add_option('--remote', default='',
                      help='For real tests, ip address of the target machine.')

    # Set the usage to include flags.
    # NOTE(review): the statement under this comment was missing from this
    # copy of the file; this call is a reconstruction -- confirm.
    parser.set_usage(parser.format_help())

    (options, args) = parser.parse_args()

    if args:
        parser.error('Extra args found %s.' % args)
    if not options.board:
        parser.error('Need board for image to compare against.')
    if not options.channel:
        parser.error('Need channel e.g. dev-channel.')
    if not options.zipbase:
        parser.error('Need zip url base to get images.')

    # Caching payloads and cleaning the harness state are opposites, so the
    # clean argument is the negation of --cache.
    RunAUTestHarness(options.board, options.channel, options.zipbase,
                     options.no_graphics, options.type, options.remote,
                     not options.cache, options.test_results_root)
# Run the wrapper only when executed as a script, not when imported.
if __name__ == '__main__':
    main()