blob: 6a63e15d9875e1134aceada2a2dd3a069d097948 [file] [log] [blame]
# Copyright 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Contains a helper function for deploying and executing a packaged
executable on a Target."""
import common
import json
import logging
import multiprocessing
import os
import shutil
import subprocess
import tempfile
import threading
import uuid
import select
from symbolizer import FilterStream
FAR = os.path.join(common.SDK_ROOT, 'tools', 'far')
PM = os.path.join(common.SDK_ROOT, 'tools', 'pm')
# Amount of time to wait for the termination of the system log output thread.
_JOIN_TIMEOUT_SECS = 5
def _AttachKernelLogReader(target):
"""Attaches a kernel log reader as a long-running SSH task."""
logging.info('Attaching kernel logger.')
return target.RunCommandPiped(['dlog', '-f'], stdin=open(os.devnull, 'r'),
stdout=subprocess.PIPE)
def _ReadMergedLines(streams):
"""Creates a generator which merges the buffered line output from |streams|.
The generator is terminated when the primary (first in sequence) stream
signals EOF. Absolute output ordering is not guaranteed."""
assert len(streams) > 0
poll = select.poll()
streams_by_fd = {}
primary_fd = streams[0].fileno()
for s in streams:
poll.register(s.fileno(), select.POLLIN)
streams_by_fd[s.fileno()] = s
try:
while primary_fd != None:
events = poll.poll(1)
for fileno, event in events:
if event & select.POLLIN:
yield streams_by_fd[fileno].readline()
elif event & select.POLLHUP:
poll.unregister(fileno)
del streams_by_fd[fileno]
if fileno == primary_fd:
primary_fd = None
finally:
for fd_to_cleanup, _ in streams_by_fd.iteritems():
poll.unregister(fd_to_cleanup)
def DrainStreamToStdout(stream, quit_event):
"""Outputs the contents of |stream| until |quit_event| is set."""
poll = select.poll()
poll.register(stream.fileno(), select.POLLIN)
try:
while not quit_event.is_set():
events = poll.poll(1)
for fileno, event in events:
if event & select.POLLIN:
print stream.readline().rstrip()
elif event & select.POLLHUP:
break
finally:
poll.unregister(stream.fileno())
def RunPackage(output_dir, target, package_path, package_name, package_deps,
run_args, system_logging, install_only, symbolizer_config=None):
"""Copies the Fuchsia package at |package_path| to the target,
executes it with |run_args|, and symbolizes its output.
output_dir: The path containing the build output files.
target: The deployment Target object that will run the package.
package_path: The path to the .far package file.
package_name: The name of app specified by package metadata.
run_args: The arguments which will be passed to the Fuchsia process.
system_logging: If set, connects a system log reader to the target.
install_only: If set, skips the package execution step.
symbolizer_config: A newline delimited list of source files contained
in the package. Omitting this parameter will disable
symbolization.
Returns the exit code of the remote package process."""
system_logger = _AttachKernelLogReader(target) if system_logging else None
try:
if system_logger:
# Spin up a thread to asynchronously dump the system log to stdout
# for easier diagnoses of early, pre-execution failures.
log_output_quit_event = multiprocessing.Event()
log_output_thread = threading.Thread(
target=lambda: DrainStreamToStdout(system_logger.stdout,
log_output_quit_event))
log_output_thread.daemon = True
log_output_thread.start()
for next_package_path in ([package_path] + package_deps):
logging.info('Installing ' + os.path.basename(next_package_path) + '.')
# Copy the package archive.
install_path = os.path.join('/data', os.path.basename(next_package_path))
target.PutFile(next_package_path, install_path)
# Install the package.
p = target.RunCommandPiped(['pm', 'install', install_path],
stderr=subprocess.PIPE)
output = p.stderr.readlines()
p.wait()
if p.returncode != 0:
# Don't error out if the package already exists on the device.
if len(output) != 1 or 'ErrAlreadyExists' not in output[0]:
raise Exception('Error while installing: %s' % '\n'.join(output))
# Clean up the package archive.
target.RunCommand(['rm', install_path])
if system_logger:
log_output_quit_event.set()
log_output_thread.join(timeout=_JOIN_TIMEOUT_SECS)
if install_only:
logging.info('Installation complete.')
return
logging.info('Running application.')
command = ['run', package_name] + run_args
process = target.RunCommandPiped(command,
stdin=open(os.devnull, 'r'),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
if system_logger:
task_output = _ReadMergedLines([process.stdout, system_logger.stdout])
else:
task_output = process.stdout
if symbolizer_config:
# Decorate the process output stream with the symbolizer.
output = FilterStream(task_output, package_name, symbolizer_config,
output_dir)
else:
logging.warn('Symbolization is DISABLED.')
output = process.stdout
for next_line in output:
print next_line.rstrip()
process.wait()
if process.returncode == 0:
logging.info('Process exited normally with status code 0.')
else:
# The test runner returns an error status code if *any* tests fail,
# so we should proceed anyway.
logging.warning('Process exited with status code %d.' %
process.returncode)
finally:
if system_logger:
logging.info('Terminating kernel log reader.')
log_output_quit_event.set()
log_output_thread.join()
system_logger.kill()
return process.returncode