blob: 0d12e65237b8a621f8bd72b1b1bee9775862ef89 [file] [log] [blame]
# Copyright 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Contains a helper function for deploying and executing a packaged
executable on a Target."""
import common
import hashlib
import json
import logging
import multiprocessing
import os
import re
import select
import shutil
import subprocess
import sys
import tempfile
import time
import threading
import uuid
from symbolizer import SymbolizerFilter
FAR = os.path.join(common.SDK_ROOT, 'tools', 'far')
PM = os.path.join(common.SDK_ROOT, 'tools', 'pm')
_REPO_NAME = 'chrome_runner'
# Amount of time to wait for the termination of the system log output thread.
# Amount of time to wait for Amber to complete package installation, as a
# mitigation against hangs due to amber/network-related failures.
def _AttachKernelLogReader(target):
"""Attaches a kernel log reader as a long-running SSH task."""'Attaching kernel logger.')
return target.RunCommandPiped(['dlog', '-f'], stdin=open(os.devnull, 'r'),
class MergedInputStream(object):
"""Merges a number of input streams into a UNIX pipe on a dedicated thread.
Terminates when the file descriptor of the primary stream (the first in
the sequence) is closed."""
def __init__(self, streams):
assert len(streams) > 0
self._streams = streams
self._read_pipe, write_pipe = os.pipe()
self._output_stream = os.fdopen(write_pipe, 'w')
self._thread = threading.Thread(target=self._Run)
def Start(self):
"""Returns a file descriptor to the merged output stream."""
return self._read_pipe
def _Run(self):
streams_by_fd = {}
primary_fd = self._streams[0].fileno()
for s in self._streams:
streams_by_fd[s.fileno()] = s
# Set when the primary FD is closed. Input from other FDs will continue to
# be processed until select() runs dry.
flush = False
# The lifetime of the MergedInputStream is bound to the lifetime of
# |primary_fd|.
while primary_fd:
# When not flushing: block until data is read or an exception occurs.
rlist, _, xlist =, [], streams_by_fd)
if len(rlist) == 0 and flush:
for fileno in xlist:
del streams_by_fd[fileno]
if fileno == primary_fd:
primary_fd = None
for fileno in rlist:
line = streams_by_fd[fileno].readline()
if line:
self._output_stream.write(line + '\n')
del streams_by_fd[fileno]
if fileno == primary_fd:
primary_fd = None
# Flush the streams by executing nonblocking reads from the input file
# descriptors until no more data is available, or all the streams are
# closed.
while streams_by_fd:
rlist, _, _ =, [], [], 0)
if not rlist:
for fileno in rlist:
line = streams_by_fd[fileno].readline()
if line:
self._output_stream.write(line + '\n')
del streams_by_fd[fileno]
def _GetComponentUri(package_name):
return 'fuchsia-pkg://' % (package_name,
def _UnregisterAmberRepository(target):
"""Unregisters the Amber repository from the target."""
logging.debug('Unregistering Amber repository.')
target.RunCommand(['amber_ctl', 'rm_src', '-n', _REPO_NAME])
# Re-enable 'devhost' repo if it's present. This is useful for devices that
# were booted with 'fx serve'.
target.RunCommand(['amber_ctl', 'enable_src', '-n', 'devhost'], silent=True)
def _RegisterAmberRepository(target, tuf_repo, remote_port):
"""Configures a device to use a local TUF repository as an installation source
for packages.
|target|: The remote device to configure.
|tuf_repo|: The host filesystem path to the TUF repository.
|remote_port|: The reverse-forwarded port used to connect to instance of
`pm serve` that is serving the contents of |tuf_repo|."""
# Extract the public signing key for inclusion in the config file.
root_keys = []
root_json = json.load(open(os.path.join(tuf_repo, 'repository', 'root.json'),
for root_key_id in root_json['signed']['roles']['root']['keyids']:
'Type': root_json['signed']['keys'][root_key_id]['keytype'],
'Value': root_json['signed']['keys'][root_key_id]['keyval']['public']
# "pm serve" can automatically generate a "config.json" file at query time,
# but the file is unusable because it specifies URLs with port
# numbers that are unreachable from across the port forwarding boundary.
# So instead, we generate our own config file with the forwarded port numbers
# instead.
config_file = open(os.path.join(tuf_repo, 'repository', 'repo_config.json'),
'RepoURL': "" % remote_port,
'BlobRepoURL': "" % remote_port,
'RatePeriod': 10,
'RootKeys': root_keys,
'StatusConfig': {
'Enabled': True
'Auto': True
}, config_file)
# Register the repo.
return_code = target.RunCommand(
[('amber_ctl rm_src -n %s; ' +
'amber_ctl add_src -f')
% (_REPO_NAME, remote_port)])
if return_code != 0:
raise Exception('Error code %d when running amber_ctl.' % return_code)
def _DrainStreamToStdout(stream, quit_event):
"""Outputs the contents of |stream| until |quit_event| is set."""
while not quit_event.is_set():
rlist, _, _ =[ stream ], [], [], 0.1)
if rlist:
line = rlist[0].readline()
if not line:
print line.rstrip()
class RunPackageArgs:
"""RunPackage() configuration arguments structure.
install_only: If set, skips the package execution step.
symbolizer_config: A newline delimited list of source files contained
in the package. Omitting this parameter will disable symbolization.
system_logging: If set, connects a system log reader to the target.
target_staging_path: Path to which package FARs will be staged, during
installation. Defaults to staging into '/data'.
def __init__(self):
self.install_only = False
self.symbolizer_config = None
self.system_logging = False
self.target_staging_path = '/data'
def FromCommonArgs(args):
run_package_args = RunPackageArgs()
run_package_args.install_only = args.install_only
run_package_args.system_logging = args.include_system_logs
run_package_args.target_staging_path = args.target_staging_path
return run_package_args
def GetPackageInfo(package_path):
"""Returns a tuple with the name and version of a package."""
# Query the metadata file which resides next to the package file.
package_info = json.load(
open(os.path.join(os.path.dirname(package_path), 'package')))
return (package_info['name'], package_info['version'])
def PublishPackage(tuf_root, package_path):
"""Publishes a combined FAR package to a TUF repository root."""
[PM, 'publish', '-a', '-f', package_path, '-r', tuf_root, '-vt', '-v'],
def RunPackage(output_dir, target, package_path, package_name,
package_deps, package_args, args):
"""Copies the Fuchsia package at |package_path| to the target,
executes it with |package_args|, and symbolizes its output.
output_dir: The path containing the build output files.
target: The deployment Target object that will run the package.
package_path: The path to the .far package file.
package_name: The name of app specified by package metadata.
package_args: The arguments which will be passed to the Fuchsia process.
args: Structure of arguments to configure how the package will be run.
Returns the exit code of the remote package process."""
system_logger = (
_AttachKernelLogReader(target) if args.system_logging else None)
if system_logger:
# Spin up a thread to asynchronously dump the system log to stdout
# for easier diagnoses of early, pre-execution failures.
log_output_quit_event = multiprocessing.Event()
log_output_thread = threading.Thread(
target=lambda: _DrainStreamToStdout(system_logger.stdout,
log_output_thread.daemon = True
tuf_root = tempfile.mkdtemp()
pm_serve_task = None
# Publish all packages to the serving TUF repository under |tuf_root|.
subprocess.check_call([PM, 'newrepo', '-repo', tuf_root])
all_packages = [package_path] + package_deps
for next_package_path in all_packages:
PublishPackage(tuf_root, next_package_path)
# Serve the |tuf_root| using 'pm serve' and configure the target to pull
# from it.
# TODO(kmarshall): Use -q to suppress pm serve output once blob push
# is confirmed to be running stably on bots.
serve_port = common.GetAvailableTcpPort()
pm_serve_task = subprocess.Popen(
[PM, 'serve', '-d', os.path.join(tuf_root, 'repository'), '-l',
':%d' % serve_port, '-q'])
remote_port = common.ConnectPortForwardingTask(target, serve_port, 0)
_RegisterAmberRepository(target, tuf_root, remote_port)
# Install all packages.
for next_package_path in all_packages:
install_package_name, package_version = GetPackageInfo(next_package_path)'Installing %s version %s.' %
(install_package_name, package_version))
return_code = target.RunCommand(['amber_ctl', 'get_up', '-n',
install_package_name, '-v',
if return_code != 0:
raise Exception('Error while installing %s.' % install_package_name)
if system_logger:
if args.install_only:'Installation complete.')
return'Running application.')
command = ['run', _GetComponentUri(package_name)] + package_args
process = target.RunCommandPiped(command,
stdin=open(os.devnull, 'r'),
if system_logger:
output_fd = MergedInputStream([process.stdout,
output_fd = process.stdout.fileno()
# Run the log data through the symbolizer process.
build_ids_path = os.path.join(os.path.dirname(package_path), 'ids.txt')
output_stream = SymbolizerFilter(output_fd, build_ids_path)
for next_line in output_stream:
print next_line.rstrip()
if process.returncode == 0:'Process exited normally with status code 0.')
# The test runner returns an error status code if *any* tests fail,
# so we should proceed anyway.
logging.warning('Process exited with status code %d.' %
if system_logger:'Terminating kernel log reader.')
if pm_serve_task:
return process.returncode